parent
4d2f86202d
commit
3119566407
9 changed files with 93 additions and 6 deletions
|
@ -259,6 +259,7 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
|
|||
.l1()
|
||||
.enabled(l1Enabled)
|
||||
.lifespan(l1Lifespan)
|
||||
.stateTransfer().timeout(30, TimeUnit.SECONDS)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -181,6 +181,10 @@ public class InfinispanUserLoginFailureProviderFactory implements UserLoginFailu
|
|||
}
|
||||
}
|
||||
|
||||
private int getStalledTimeoutInSeconds(int defaultTimeout) {
|
||||
return config.getInt("stalledTimeoutInSeconds", defaultTimeout);
|
||||
}
|
||||
|
||||
private void loadLoginFailuresFromRemoteCaches(final KeycloakSessionFactory sessionFactory, String cacheName, final int sessionsPerSegment, final int maxErrors) {
|
||||
log.debugf("Check pre-loading sessions from remote cache '%s'", cacheName);
|
||||
|
||||
|
@ -190,9 +194,12 @@ public class InfinispanUserLoginFailureProviderFactory implements UserLoginFailu
|
|||
public void run(KeycloakSession session) {
|
||||
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
|
||||
Cache<String, Serializable> workCache = connections.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
|
||||
int defaultStateTransferTimeout = (int) (connections.getCache(InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME)
|
||||
.getCacheConfiguration().clustering().stateTransfer().timeout() / 1000);
|
||||
|
||||
InfinispanCacheInitializer initializer = new InfinispanCacheInitializer(sessionFactory, workCache,
|
||||
new RemoteCacheSessionsLoader(cacheName, sessionsPerSegment), "remoteCacheLoad::" + cacheName, sessionsPerSegment, maxErrors);
|
||||
new RemoteCacheSessionsLoader(cacheName, sessionsPerSegment), "remoteCacheLoad::" + cacheName, sessionsPerSegment, maxErrors,
|
||||
getStalledTimeoutInSeconds(defaultStateTransferTimeout));
|
||||
|
||||
initializer.initCache();
|
||||
initializer.loadSessions();
|
||||
|
|
|
@ -165,6 +165,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
|||
return timeout != null ? timeout : Environment.getServerStartupTimeout();
|
||||
}
|
||||
|
||||
private int getStalledTimeoutInSeconds(int defaultTimeout) {
|
||||
return config.getInt("sessionPreloadStalledTimeoutInSeconds", defaultTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadPersistentSessions(final KeycloakSessionFactory sessionFactory, final int maxErrors, final int sessionsPerSegment) {
|
||||
|
@ -180,9 +183,12 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
|||
|
||||
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
|
||||
Cache<String, Serializable> workCache = connections.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
|
||||
int defaultStateTransferTimeout = (int) (connections.getCache(InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME)
|
||||
.getCacheConfiguration().clustering().stateTransfer().timeout() / 1000);
|
||||
|
||||
InfinispanCacheInitializer ispnInitializer = new InfinispanCacheInitializer(sessionFactory, workCache,
|
||||
new OfflinePersistentUserSessionLoader(sessionsPerSegment), "offlineUserSessions", sessionsPerSegment, maxErrors);
|
||||
new OfflinePersistentUserSessionLoader(sessionsPerSegment), "offlineUserSessions", sessionsPerSegment, maxErrors,
|
||||
getStalledTimeoutInSeconds(defaultStateTransferTimeout));
|
||||
|
||||
// DB-lock to ensure that persistent sessions are loaded from DB just on one DC. The other DCs will load them from remote cache.
|
||||
CacheInitializer initializer = new DBLockBasedCacheInitializer(session, ispnInitializer);
|
||||
|
@ -326,9 +332,12 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
|||
public void run(KeycloakSession session) {
|
||||
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
|
||||
Cache<String, Serializable> workCache = connections.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
|
||||
int defaultStateTransferTimeout = (int) (connections.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME)
|
||||
.getCacheConfiguration().clustering().stateTransfer().timeout() / 1000);
|
||||
|
||||
InfinispanCacheInitializer initializer = new InfinispanCacheInitializer(sessionFactory, workCache,
|
||||
new RemoteCacheSessionsLoader(cacheName, sessionsPerSegment), "remoteCacheLoad::" + cacheName, sessionsPerSegment, maxErrors);
|
||||
new RemoteCacheSessionsLoader(cacheName, sessionsPerSegment), "remoteCacheLoad::" + cacheName, sessionsPerSegment, maxErrors,
|
||||
getStalledTimeoutInSeconds(defaultStateTransferTimeout));
|
||||
|
||||
initializer.initCache();
|
||||
initializer.loadSessions();
|
||||
|
|
|
@ -72,6 +72,11 @@ public abstract class BaseCacheInitializer extends CacheInitializer {
|
|||
return transport == null || transport.isCoordinator();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getProgressIndicator() {
|
||||
InitializerState state = getStateFromCache();
|
||||
return state == null ? 0 : state.getProgressIndicator();
|
||||
}
|
||||
|
||||
protected InitializerState getStateFromCache() {
|
||||
// We ignore cacheStore for now, so that in Cross-DC scenario (with RemoteStore enabled) is the remoteStore ignored.
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.keycloak.models.sessions.infinispan.initializer;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
|
@ -30,10 +32,32 @@ public abstract class CacheInitializer {
|
|||
}
|
||||
|
||||
public void loadSessions() {
|
||||
Instant loadingMustContinueBy = Instant.now().plusSeconds(getStalledTimeoutInSeconds());
|
||||
boolean loadingStalledInPreviousStep = false;
|
||||
int lastProgressIndicator = 0;
|
||||
while (!isFinished()) {
|
||||
if (!isCoordinator()) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
|
||||
final int progressIndicator = getProgressIndicator();
|
||||
final boolean loadingStalled = lastProgressIndicator == progressIndicator;
|
||||
if (loadingStalled) {
|
||||
if (loadingStalledInPreviousStep) {
|
||||
if (Instant.now().isAfter(loadingMustContinueBy)) {
|
||||
throw new RuntimeException("Loading sessions has stalled for " + getStalledTimeoutInSeconds() + " seconds, possibly caused by split-brain");
|
||||
}
|
||||
|
||||
log.tracef("Loading sessions stalled. Waiting until %s", loadingMustContinueBy);
|
||||
} else {
|
||||
loadingMustContinueBy = Instant.now().plusSeconds(getStalledTimeoutInSeconds());
|
||||
loadingStalledInPreviousStep = true;
|
||||
}
|
||||
} else {
|
||||
loadingStalledInPreviousStep = false;
|
||||
}
|
||||
|
||||
lastProgressIndicator = progressIndicator;
|
||||
} catch (InterruptedException ie) {
|
||||
log.error("Interrupted", ie);
|
||||
throw new RuntimeException("Loading sessions failed", ie);
|
||||
|
@ -49,8 +73,19 @@ public abstract class CacheInitializer {
|
|||
|
||||
protected abstract boolean isCoordinator();
|
||||
|
||||
/**
|
||||
* Returns an integer which captures current progress. If there is a progress in loading,
|
||||
* this indicator must be different most of the time so that it does not hit 30-seconds
|
||||
* limit.
|
||||
* @see #stalledTimeoutInSeconds
|
||||
* @return
|
||||
*/
|
||||
protected abstract int getProgressIndicator();
|
||||
|
||||
/**
|
||||
* Just coordinator will run this
|
||||
*/
|
||||
protected abstract void startLoading();
|
||||
|
||||
protected abstract int getStalledTimeoutInSeconds();
|
||||
}
|
||||
|
|
|
@ -57,6 +57,15 @@ public class DBLockBasedCacheInitializer extends CacheInitializer {
|
|||
return delegate.isCoordinator();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getProgressIndicator() {
|
||||
return delegate.getProgressIndicator();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getStalledTimeoutInSeconds() {
|
||||
return delegate.getStalledTimeoutInSeconds();
|
||||
}
|
||||
|
||||
/**
|
||||
* Just coordinator will run this. And there is DB-lock, so the delegate.startLoading() will be permitted just by the single DC
|
||||
|
|
|
@ -51,10 +51,13 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
|
|||
|
||||
private final int maxErrors;
|
||||
|
||||
// Effectively no timeout
|
||||
private final int stalledTimeoutInSeconds;
|
||||
|
||||
public InfinispanCacheInitializer(KeycloakSessionFactory sessionFactory, Cache<String, Serializable> workCache, SessionLoader sessionLoader, String stateKeySuffix, int sessionsPerSegment, int maxErrors) {
|
||||
public InfinispanCacheInitializer(KeycloakSessionFactory sessionFactory, Cache<String, Serializable> workCache, SessionLoader sessionLoader, String stateKeySuffix, int sessionsPerSegment, int maxErrors, int stalledTimeoutInSeconds) {
|
||||
super(sessionFactory, workCache, sessionLoader, stateKeySuffix, sessionsPerSegment);
|
||||
this.maxErrors = maxErrors;
|
||||
this.stalledTimeoutInSeconds = stalledTimeoutInSeconds;
|
||||
}
|
||||
|
||||
|
||||
|
@ -111,6 +114,10 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
|
|||
startLoadingImpl(state, ctx[0]);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getStalledTimeoutInSeconds() {
|
||||
return this.stalledTimeoutInSeconds;
|
||||
}
|
||||
|
||||
protected void startLoadingImpl(InitializerState state, SessionLoader.LoaderContext loaderCtx) {
|
||||
// Assume each worker has same processor's count
|
||||
|
|
|
@ -73,6 +73,11 @@ public class InitializerState extends SessionEntity {
|
|||
return segments.cardinality() == segmentsCount;
|
||||
}
|
||||
|
||||
/** Return indication of progress - changes upon progress */
|
||||
public int getProgressIndicator() {
|
||||
return segments.hashCode();
|
||||
}
|
||||
|
||||
/** Return next un-finished segments in the next row of segments.
|
||||
* @param segmentToLoad The segment we are loading
|
||||
* @param maxSegmentCount The max segment to load
|
||||
|
|
|
@ -23,6 +23,8 @@ import org.keycloak.keys.PublicKeyStorageSpi;
|
|||
import org.keycloak.keys.infinispan.InfinispanCachePublicKeyProviderFactory;
|
||||
import org.keycloak.keys.infinispan.InfinispanPublicKeyStorageProviderFactory;
|
||||
import org.keycloak.models.SingleUseObjectSpi;
|
||||
import org.keycloak.models.UserLoginFailureSpi;
|
||||
import org.keycloak.models.UserSessionSpi;
|
||||
import org.keycloak.models.cache.authorization.CachedStoreFactorySpi;
|
||||
import org.keycloak.models.cache.infinispan.authorization.InfinispanCacheStoreFactoryProviderFactory;
|
||||
import org.keycloak.models.cache.CachePublicKeyProviderSpi;
|
||||
|
@ -98,7 +100,14 @@ public class Infinispan extends KeycloakModelParameters {
|
|||
.config("embedded", "true")
|
||||
.config("clustered", "true")
|
||||
.config("useKeycloakTimeService", "true")
|
||||
.config("nodeName", "node-" + NODE_COUNTER.incrementAndGet());
|
||||
.config("nodeName", "node-" + NODE_COUNTER.incrementAndGet())
|
||||
.spi(UserLoginFailureSpi.NAME)
|
||||
.provider(InfinispanUserLoginFailureProviderFactory.PROVIDER_ID)
|
||||
.config("stalledTimeoutInSeconds", "10")
|
||||
.spi(UserSessionSpi.NAME)
|
||||
.provider(InfinispanUserSessionProviderFactory.PROVIDER_ID)
|
||||
.config("sessionPreloadStalledTimeoutInSeconds", "10")
|
||||
;
|
||||
}
|
||||
|
||||
public Infinispan() {
|
||||
|
|
Loading…
Reference in a new issue