From 01939bcf340da47e6af51910dd571dd9f6f92a38 Mon Sep 17 00:00:00 2001 From: Alexander Schwartz Date: Tue, 9 Jan 2024 16:55:22 +0100 Subject: [PATCH] Remove concurrent loading of remote sessions as at startup time only one node is up anyway. (#25709) Closes #22082 Signed-off-by: Alexander Schwartz Co-authored-by: Martin Kanis --- .../release_notes/topics/24_0_0.adoc | 7 + .../topics/keycloak/changes-24_0_0.adoc | 12 ++ model/infinispan/pom.xml | 6 + .../initializer/BaseCacheInitializer.java | 7 +- .../InfinispanCacheInitializer.java | 152 ++++++------------ .../initializer/SessionInitializerWorker.java | 42 +---- .../RemoteCacheSessionsLoader.java | 125 +++++--------- .../RemoteCacheSessionsLoaderContext.java | 41 +---- .../cluster/infinispan/JDGPutTest.java | 84 ++-------- .../RemoteCacheSessionsLoaderTest.java | 3 - .../initializer/InitializerStateTest.java | 24 +-- .../OfflineSessionPersistenceTest.java | 2 +- 12 files changed, 136 insertions(+), 369 deletions(-) diff --git a/docs/documentation/release_notes/topics/24_0_0.adoc b/docs/documentation/release_notes/topics/24_0_0.adoc index ca26efa206..fb87d90a00 100644 --- a/docs/documentation/release_notes/topics/24_0_0.adoc +++ b/docs/documentation/release_notes/topics/24_0_0.adoc @@ -104,6 +104,13 @@ first time using the `idp-review-user-profile.ftl` template. For more details, see link:{upgradingguide_link}[{upgradingguide_name}]. += Sequential loading of offline sessions and remote sessions + +Starting with this release, the first member of a Keycloak cluster will load remote sessions sequentially instead of in parallel. +If offline session preloading is enabled, those will be loaded sequentially as well. + +For more details, see link:{upgradingguide_link}[{upgradingguide_name}]. + = Performing actions on behalf of another user is not longer possible when the user is already authenticated In this release, you can no longer perform actions such as email verification if the user is already authenticated diff --git a/docs/documentation/upgrading/topics/keycloak/changes-24_0_0.adoc b/docs/documentation/upgrading/topics/keycloak/changes-24_0_0.adoc index 4e77edf230..079ba9c7a2 100644 --- a/docs/documentation/upgrading/topics/keycloak/changes-24_0_0.adoc +++ b/docs/documentation/upgrading/topics/keycloak/changes-24_0_0.adoc @@ -144,6 +144,18 @@ and potentially introduce unexpected changes and behavior that should only affec If you have customizations to the `login-update-profile.ftl` template to customize how users update their profiles when authenticating through a broker, make sure to move your changes to the new template. += Sequential loading of offline sessions and remote sessions + +Starting with this release, the first member of a Keycloak cluster will load remote sessions sequentially instead of in parallel. +If offline session preloading is enabled, those will be loaded sequentially as well. + +The previous code led to high resource-consumption across the cluster at startup and was challenging to analyze in production environments and could lead to complex failure scenarios if a node was restarted during loading. +Therefore, it was changed to sequential session loading. + +For offline sessions, the default in this and previous versions of Keycloak is to load those sessions on demand, which scales better with a lot of offline sessions than the attempt to preload them in parallel. Setups that use this default setup are not affected by the change of the loading strategy for offline sessions. +Setups that have offline session preloading enabled should migrate to a setup where offline-session preloading is disabled. + + = Infinispan metrics use labels for cache manager and cache names When enabling metrics for {project_name}'s embedded caches, the metrics now use labels for the cache manager and the cache names. diff --git a/model/infinispan/pom.xml b/model/infinispan/pom.xml index 3ca26d1e3c..552ff4dda3 100755 --- a/model/infinispan/pom.xml +++ b/model/infinispan/pom.xml @@ -23,6 +23,12 @@ org.keycloak 999.0.0-SNAPSHOT + + 17 + 17 + 17 + + 4.0.0 keycloak-model-infinispan diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/BaseCacheInitializer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/BaseCacheInitializer.java index 196b4cfddf..10fc37044d 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/BaseCacheInitializer.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/BaseCacheInitializer.java @@ -24,10 +24,7 @@ import org.infinispan.context.Flag; import org.infinispan.lifecycle.ComponentStatus; import org.infinispan.remoting.transport.Transport; import org.jboss.logging.Logger; -import org.keycloak.models.KeycloakSession; import org.keycloak.models.KeycloakSessionFactory; -import org.keycloak.models.KeycloakSessionTask; -import org.keycloak.models.utils.KeycloakModelUtils; /** * @author Marek Posolda @@ -40,11 +37,11 @@ public abstract class BaseCacheInitializer extends CacheInitializer { protected final KeycloakSessionFactory sessionFactory; protected final Cache workCache; - protected final SessionLoader sessionLoader; + protected final SessionLoader sessionLoader; protected final int sessionsPerSegment; protected final String stateKey; - public BaseCacheInitializer(KeycloakSessionFactory sessionFactory, Cache workCache, SessionLoader sessionLoader, String stateKeySuffix, int sessionsPerSegment) { + public BaseCacheInitializer(KeycloakSessionFactory sessionFactory, Cache workCache, SessionLoader sessionLoader, String stateKeySuffix, int sessionsPerSegment) { this.sessionFactory = sessionFactory; this.workCache = workCache; this.sessionLoader = sessionLoader; diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java index affc7471ab..12340564c5 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java @@ -18,10 +18,7 @@ package org.keycloak.models.sessions.infinispan.initializer; import org.infinispan.Cache; -import org.infinispan.commons.CacheException; import org.infinispan.factories.ComponentRegistry; -import org.infinispan.manager.ClusterExecutor; -import org.infinispan.remoting.transport.Transport; import org.jboss.logging.Logger; import org.keycloak.models.KeycloakSession; import org.keycloak.models.KeycloakSessionFactory; @@ -29,18 +26,14 @@ import org.keycloak.models.KeycloakSessionTask; import org.keycloak.models.utils.KeycloakModelUtils; import java.io.Serializable; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.LinkedList; import java.util.List; import java.util.Queue; /** - * Startup initialization for reading persistent userSessions to be filled into infinispan/memory . In cluster, - * the initialization is distributed among all cluster nodes, so the startup time is even faster + * Startup initialization for reading persistent userSessions to be filled into infinispan/memory. * - * Implementation is pretty generic and doesn't contain any "userSession" specific stuff. All logic related to how are sessions loaded is in the SessionLoader implementation + * Implementation is pretty generic and doesn't contain any "userSession" specific stuff. All logic related to how sessions are loaded is in the SessionLoader implementation * * @author Marek Posolda */ @@ -119,119 +112,72 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer { } protected void startLoadingImpl(InitializerState state, SessionLoader.LoaderContext loaderCtx) { - // Assume each worker has same processor's count - int processors = Runtime.getRuntime().availableProcessors(); - - Transport transport = workCache.getCacheManager().getTransport(); - - // Every worker iteration will be executed on single node. Use 3 failover attempts for each segment (should be sufficient in all cases) - ClusterExecutor clusterExecutor = workCache.getCacheManager().executor() - .singleNodeSubmission(3); - int errors = 0; int segmentToLoad = 0; - //try { - SessionLoader.WorkerResult previousResult = null; - SessionLoader.WorkerResult nextResult = null; - int distributedWorkersCount = 0; - boolean firstTryForSegment = true; + SessionLoader.WorkerResult previousResult = null; + SessionLoader.WorkerResult nextResult = null; + int distributedWorkersCount = 1; - while (segmentToLoad < state.getSegmentsCount()) { - if (firstTryForSegment) { - // do not change the node count if it's not the first try - int nodesCount = transport==null ? 1 : transport.getMembers().size(); - distributedWorkersCount = processors * nodesCount; - } + while (segmentToLoad < state.getSegmentsCount()) { - log.debugf("Starting next iteration with %d workers", distributedWorkersCount); + log.debugf("Starting next iteration with %d workers", distributedWorkersCount); - List segments = state.getSegmentsToLoad(segmentToLoad, distributedWorkersCount); + List segments = state.getSegmentsToLoad(segmentToLoad, distributedWorkersCount); - if (log.isTraceEnabled()) { - log.trace("unfinished segments for this iteration: " + segments); - } + if (log.isTraceEnabled()) { + log.trace("unfinished segments for this iteration: " + segments); + } - List> futures = new LinkedList<>(); - final Queue results = new ConcurrentLinkedQueue<>(); + final Queue results = new ConcurrentLinkedQueue<>(); - CompletableFuture completableFuture = null; - for (Integer segment : segments) { - SessionLoader.WorkerContext workerCtx = sessionLoader.computeWorkerContext(loaderCtx, segment, segment - segmentToLoad, previousResult); + for (Integer segment : segments) { + SessionLoader.WorkerContext workerCtx = sessionLoader.computeWorkerContext(loaderCtx, segment, segment - segmentToLoad, previousResult); - SessionInitializerWorker worker = new SessionInitializerWorker(); - worker.setWorkerEnvironment(loaderCtx, workerCtx, sessionLoader, workCache.getName()); + SessionInitializerWorker worker = new SessionInitializerWorker(); + worker.setWorkerEnvironment(loaderCtx, workerCtx, sessionLoader); - completableFuture = clusterExecutor.submitConsumer(worker, (address, workerResult, throwable) -> { - log.tracef("Calling triConsumer on address %s, throwable message: %s, segment: %s", address, throwable == null ? "null" : throwable.getMessage(), - workerResult == null ? null : workerResult.getSegment()); + results.add(worker.apply(sessionFactory)); + } - if (throwable != null) { - throw new CacheException(throwable); - } - results.add(workerResult); - }); + boolean anyFailure = false; - futures.add(completableFuture); - } - - boolean anyFailure = false; - - // Make sure that all workers are finished - for (CompletableFuture future : futures) { - try { - future.get(); - } catch (InterruptedException ie) { - anyFailure = true; - errors++; - log.error("Interruped exception when computed future. Errors: " + errors, ie); - } catch (ExecutionException ee) { - anyFailure = true; - errors++; - log.error("ExecutionException when computed future. Errors: " + errors, ee); - } - } - - // Check the results - for (SessionLoader.WorkerResult result : results) { - if (result.isSuccess()) { - state.markSegmentFinished(result.getSegment()); - if (result.getSegment() == segmentToLoad + distributedWorkersCount - 1) { - // last result for next iteration when complete - nextResult = result; - } - } else { - if (log.isTraceEnabled()) { - log.tracef("Segment %d failed to compute", result.getSegment()); - } - anyFailure = true; - } - } - - if (errors >= maxErrors) { - throw new RuntimeException("Maximum count of worker errors occured. Limit was " + maxErrors + ". See server.log for details"); - } - - if (!anyFailure) { - // everything is OK, prepare the new row - segmentToLoad += distributedWorkersCount; - firstTryForSegment = true; - previousResult = nextResult; - nextResult = null; - if (log.isTraceEnabled()) { - log.debugf("New initializer state is: %s", state); + // Check the results + for (SessionLoader.WorkerResult result : results) { + if (result.isSuccess()) { + state.markSegmentFinished(result.getSegment()); + if (result.getSegment() == segmentToLoad + distributedWorkersCount - 1) { + // last result for next iteration when complete + nextResult = result; } } else { - // some segments failed, try to load unloaded segments - firstTryForSegment = false; + if (log.isTraceEnabled()) { + log.tracef("Segment %d failed to compute", result.getSegment()); + } + anyFailure = true; } } - // Push the state after computation is finished - saveStateToCache(state); + if (errors >= maxErrors) { + throw new RuntimeException("Maximum count of worker errors occured. Limit was " + maxErrors + ". See server.log for details"); + } - // Loader callback after the task is finished - this.sessionLoader.afterAllSessionsLoaded(this); + if (!anyFailure) { + // everything is OK, prepare the new row + segmentToLoad += distributedWorkersCount; + previousResult = nextResult; + nextResult = null; + if (log.isTraceEnabled()) { + log.debugf("New initializer state is: %s", state); + } + } + } + + // Push the state after computation is finished + saveStateToCache(state); + + // Loader callback after the task is finished + this.sessionLoader.afterAllSessionsLoaded(this); } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java index 4b8076d1fe..69c53dadaf 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java @@ -17,12 +17,7 @@ package org.keycloak.models.sessions.infinispan.initializer; -import org.infinispan.Cache; -import org.infinispan.manager.EmbeddedCacheManager; -import org.jboss.logging.Logger; -import org.keycloak.models.KeycloakSession; import org.keycloak.models.KeycloakSessionFactory; -import org.keycloak.models.KeycloakSessionTask; import org.keycloak.models.utils.KeycloakModelUtils; import java.io.Serializable; @@ -31,48 +26,21 @@ import java.util.function.Function; /** * @author Marek Posolda */ -public class SessionInitializerWorker implements Function, Serializable { - - private static final Logger log = Logger.getLogger(SessionInitializerWorker.class); +public class SessionInitializerWorker implements Function, Serializable { private SessionLoader.LoaderContext loaderCtx; private SessionLoader.WorkerContext workerCtx; - private SessionLoader sessionLoader; + private SessionLoader sessionLoader; - private String cacheName; - - - public void setWorkerEnvironment(SessionLoader.LoaderContext loaderCtx, SessionLoader.WorkerContext workerCtx, SessionLoader sessionLoader, String cacheName) { + public void setWorkerEnvironment(SessionLoader.LoaderContext loaderCtx, SessionLoader.WorkerContext workerCtx, SessionLoader sessionLoader) { this.loaderCtx = loaderCtx; this.workerCtx = workerCtx; this.sessionLoader = sessionLoader; - this.cacheName = cacheName; } @Override - public SessionLoader.WorkerResult apply(EmbeddedCacheManager embeddedCacheManager) { - Cache workCache = embeddedCacheManager.getCache(cacheName); - if (log.isTraceEnabled()) { - log.tracef("Running computation for segment %s with worker %s", workerCtx.getSegment(), workerCtx.getWorkerId()); - } - - KeycloakSessionFactory sessionFactory = workCache.getAdvancedCache().getComponentRegistry().getComponent(KeycloakSessionFactory.class); - if (sessionFactory == null) { - log.debugf("KeycloakSessionFactory not yet set in cache. Worker skipped"); - return sessionLoader.createFailedWorkerResult(loaderCtx, workerCtx); - } - - SessionLoader.WorkerResult[] ref = new SessionLoader.WorkerResult[1]; - KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() { - - @Override - public void run(KeycloakSession session) { - ref[0] = sessionLoader.loadSessions(session, loaderCtx, workerCtx); - } - - }); - - return ref[0]; + public SessionLoader.WorkerResult apply(KeycloakSessionFactory sessionFactory) { + return KeycloakModelUtils.runJobInTransactionWithResult(sessionFactory, (session) -> sessionLoader.loadSessions(session, loaderCtx, workerCtx)); } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java index b5fda004e7..42a8727a6e 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java @@ -18,21 +18,15 @@ package org.keycloak.models.sessions.infinispan.remotestore; import java.io.Serializable; -import java.net.SocketAddress; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import org.infinispan.Cache; import org.infinispan.client.hotrod.RemoteCache; -import org.infinispan.client.hotrod.impl.RemoteCacheImpl; -import org.infinispan.client.hotrod.impl.operations.IterationStartOperation; -import org.infinispan.client.hotrod.impl.operations.IterationStartResponse; -import org.infinispan.client.hotrod.impl.operations.OperationsFactory; import org.infinispan.commons.util.CloseableIterator; import org.infinispan.context.Flag; import org.jboss.logging.Logger; +import org.keycloak.common.util.Retry; import org.keycloak.connections.infinispan.DefaultInfinispanConnectionProviderFactory; import org.keycloak.connections.infinispan.InfinispanConnectionProvider; import org.keycloak.models.KeycloakSession; @@ -40,8 +34,6 @@ import org.keycloak.models.sessions.infinispan.initializer.BaseCacheInitializer; import org.keycloak.models.sessions.infinispan.initializer.OfflinePersistentUserSessionLoader; import org.keycloak.models.sessions.infinispan.initializer.SessionLoader; -import static org.infinispan.client.hotrod.impl.Util.await; - /** * @author Marek Posolda */ @@ -65,42 +57,10 @@ public class RemoteCacheSessionsLoader implements SessionLoader> segmentsByAddress = operationsFactory.getPrimarySegmentsByAddress(); - - for (Map.Entry> entry : segmentsByAddress.entrySet()) { - SocketAddress targetAddress = entry.getKey(); - - // Same like RemoteCloseableIterator.startInternal - IterationStartOperation iterationStartOperation = operationsFactory.newIterationStartOperation(null, null, null, sessionsPerSegment, false, null, targetAddress); - IterationStartResponse startResponse = await(iterationStartOperation.execute()); - - try { - // Could happen for non-clustered caches - if (startResponse.getSegmentConsistentHash() == null) { - return -1; - } else { - return startResponse.getSegmentConsistentHash().getNumSegments(); - } - } finally { - startResponse.getChannel().close(); - } - } - // Handle the case when primary segments owned by the address are not known - return -1; - } - - @Override public WorkerContext computeWorkerContext(RemoteCacheSessionsLoaderContext loaderCtx, int segment, int workerId, WorkerResult previousResult) { return new WorkerContext(segment, workerId); @@ -112,72 +72,59 @@ public class RemoteCacheSessionsLoader implements SessionLoader cache = getCache(session); + Cache decoratedCache = cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE, Flag.IGNORE_RETURN_VALUES); + RemoteCache remoteCache = getRemoteCache(session); - Set myIspnSegments = getMyIspnSegments(ctx.getSegment(), loaderContext); - - log.debugf("Will do bulk load of sessions from remote cache '%s' . Segment: %d", cache.getName(), ctx.getSegment()); - - Map remoteEntries = new HashMap<>(); - CloseableIterator iterator = null; int countLoaded = 0; - try { - iterator = remoteCache.retrieveEntries(null, myIspnSegments, loaderContext.getSessionsPerSegment()); - while (iterator.hasNext()) { - countLoaded++; - Map.Entry entry = iterator.next(); - remoteEntries.put(entry.getKey(), entry.getValue()); + try (CloseableIterator> it = remoteCache.retrieveEntries(null, loaderContext.getSessionsPerSegment())) { + Map toInsert = new HashMap<>(loaderContext.getSessionsPerSegment()); + int count = 0; + while (it.hasNext()) { + Map.Entry entry = it.next(); + toInsert.put(entry.getKey(), entry.getValue()); + ++countLoaded; + if (++count == loaderContext.getSessionsPerSegment()) { + insertSessions(decoratedCache, toInsert); + toInsert = new HashMap<>(loaderContext.getSessionsPerSegment()); + count = 0; + } + } + + if (!toInsert.isEmpty()) { + // last batch + insertSessions(decoratedCache, toInsert); } } catch (RuntimeException e) { log.warnf(e, "Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), ctx.getSegment()); throw e; - } finally { - if (iterator != null) { - iterator.close(); - } } - DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(() -> - // With Infinispan 14.0.21/14.0.19, we've seen deadlocks in tests where this future never completed when shutting down the internal Infinispan. - // Therefore, prevent the shutdown of the internal Infinispan during this step. - decoratedCache.putAll(remoteEntries) - ); - log.debugf("Successfully finished loading sessions from cache '%s' . Segment: %d, Count of sessions loaded: %d", cache.getName(), ctx.getSegment(), countLoaded); return new WorkerResult(true, ctx.getSegment(), ctx.getWorkerId()); } + private void insertSessions(Cache cache, Map entries) { + log.debugf("Adding %d entries to cache '%s'", entries.size(), cacheName); - // Compute set of ISPN segments into 1 "worker" segment - protected Set getMyIspnSegments(int segment, RemoteCacheSessionsLoaderContext ctx) { - // Remote cache is non-clustered - if (ctx.getIspnSegmentsCount() < 0) { - return null; - } - - if (ctx.getIspnSegmentsCount() % ctx.getSegmentsCount() > 0) { - throw new IllegalStateException("Illegal state. IspnSegmentsCount: " + ctx.getIspnSegmentsCount() + ", segmentsCount: " + ctx.getSegmentsCount()); - } - - int countPerSegment = ctx.getIspnSegmentsCount() / ctx.getSegmentsCount(); - int first = segment * countPerSegment; - int last = first + countPerSegment - 1; - - Set myIspnSegments = new HashSet<>(); - for (int i=first ; i<=last ; i++) { - myIspnSegments.add(i); - } - return myIspnSegments; + // The `putAll` operation might time out when a node becomes unavailable, therefore, retry. + Retry.executeWithBackoff( + (int iteration) -> { + DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(() -> { + // With Infinispan 14.0.21/14.0.19, we've seen deadlocks in tests where this future never completed when shutting down the internal Infinispan. + // Therefore, prevent the shutdown of the internal Infinispan during this step. + cache.putAll(entries); + }); + }, + (iteration, throwable) -> log.warnf("Unable to put entries into the cache in iteration %s", iteration, throwable), + 3, + 10); } - @Override public boolean isFinished(BaseCacheInitializer initializer) { Cache workCache = initializer.getWorkCache(); diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoaderContext.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoaderContext.java index bf8787c882..6d7b8c006a 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoaderContext.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoaderContext.java @@ -25,65 +25,34 @@ import org.keycloak.models.sessions.infinispan.initializer.SessionLoader; */ public class RemoteCacheSessionsLoaderContext extends SessionLoader.LoaderContext { - // Count of hash segments for remote infinispan cache. It's by default 256 for distributed/replicated caches - private final int ispnSegmentsCount; - private final int sessionsPerSegment; - private final int sessionsTotal; - - public RemoteCacheSessionsLoaderContext(int ispnSegmentsCount, int sessionsPerSegment, int sessionsTotal) { - super(computeSegmentsCount(sessionsTotal, sessionsPerSegment, ispnSegmentsCount)); - this.ispnSegmentsCount = ispnSegmentsCount; + public RemoteCacheSessionsLoaderContext(int sessionsPerSegment) { + super(1); this.sessionsPerSegment = sessionsPerSegment; - this.sessionsTotal = sessionsTotal; } - // Count of segments (worker iterations for distributedExecutionService executions on KC side). Each segment will be 1 worker iteration. - // Count of segments could be lower than "ispnSegmentsCount" and depends on the size of the cache. For example if we have cache with just 500 items, - // we don't need 256 segments and send 256 requests to remoteCache to preload thing. Instead, we will have lower number of segments (EG. 8) - // and we will map more ispnSegments into 1 worker segment (In this case 256 / 8 = 32. So 32 ISPN segments mapped to each worker segment) - private static int computeSegmentsCount(int sessionsTotal, int sessionsPerSegment, int ispnSegments) { + private static int computeSegmentsCount(int ispnSegments) { // No support by remote ISPN cache for segments. This can happen if remoteCache is local (non-clustered) if (ispnSegments < 0) { return 1; } - int seg = sessionsTotal / sessionsPerSegment; - if (sessionsTotal % sessionsPerSegment > 0) { - seg = seg + 1; - } - - int seg2 = 1; - while (seg2Marek Posolda */ @@ -100,81 +91,24 @@ public class JDGPutTest { } private static void bulkLoadSessions(RemoteCache remoteCache) { - int size = remoteCache.size(); - int ispnSegmentsCount = getIspnSegmentsCount(remoteCache); - RemoteCacheSessionsLoaderContext ctx = new RemoteCacheSessionsLoaderContext(ispnSegmentsCount, 64, 1); + RemoteCacheSessionsLoaderContext ctx = new RemoteCacheSessionsLoaderContext(64); - Set myIspnSegments = getMyIspnSegments(0, ctx); + Map toInsert = new HashMap<>(ctx.getSessionsPerSegment()); - Map remoteEntries = new HashMap<>(); - CloseableIterator iterator = null; - int countLoaded = 0; - try { - iterator = remoteCache.retrieveEntries(null, myIspnSegments, ctx.getSessionsPerSegment()); - while (iterator.hasNext()) { - countLoaded++; - Map.Entry entry = iterator.next(); - remoteEntries.put(entry.getKey(), entry.getValue()); + try (CloseableIterator> it = remoteCache.retrieveEntries(null, ctx.getSessionsPerSegment())) { + while (it.hasNext()) { + Map.Entry entry = it.next(); + toInsert.put(entry.getKey(), entry.getValue()); } + } catch (RuntimeException e) { - System.err.println(String.format("Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), 1)); + logger.warnf(e, "Error loading sessions from remote cache '%s'", remoteCache.getName()); throw e; - } finally { - if (iterator != null) { - iterator.close(); - } } - logger.info("Loaded " + remoteEntries); + logger.info("Loaded " + toInsert); } - protected static int getIspnSegmentsCount(RemoteCache remoteCache) { - OperationsFactory operationsFactory = ((RemoteCacheImpl) remoteCache).getOperationsFactory(); - Map> segmentsByAddress = operationsFactory.getPrimarySegmentsByAddress(); - for (Map.Entry> entry : segmentsByAddress.entrySet()) { - SocketAddress targetAddress = entry.getKey(); - - // Same like RemoteCloseableIterator.startInternal - IterationStartOperation iterationStartOperation = operationsFactory.newIterationStartOperation(null, null, null, 64, false, null, targetAddress); - IterationStartResponse startResponse = await(iterationStartOperation.execute()); - - try { - // Could happen for non-clustered caches - if (startResponse.getSegmentConsistentHash() == null) { - return -1; - } else { - return startResponse.getSegmentConsistentHash().getNumSegments(); - } - } finally { - startResponse.getChannel().close(); - } - } - // Handle the case when primary segments owned by the address are not known - return -1; - } - - // Compute set of ISPN segments into 1 "worker" segment - protected static Set getMyIspnSegments(int segment, RemoteCacheSessionsLoaderContext ctx) { - // Remote cache is non-clustered - if (ctx.getIspnSegmentsCount() < 0) { - return null; - } - - if (ctx.getIspnSegmentsCount() % ctx.getSegmentsCount() > 0) { - throw new IllegalStateException("Illegal state. IspnSegmentsCount: " + ctx.getIspnSegmentsCount() + ", segmentsCount: " + ctx.getSegmentsCount()); - } - - int countPerSegment = ctx.getIspnSegmentsCount() / ctx.getSegmentsCount(); - int first = segment * countPerSegment; - int last = first + countPerSegment - 1; - - Set myIspnSegments = new HashSet<>(); - for (int i=first ; i<=last ; i++) { - myIspnSegments.add(i); - } - return myIspnSegments; - - } } diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java index 44b6145513..7d58ed898e 100644 --- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java +++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java @@ -105,9 +105,6 @@ public class RemoteCacheSessionsLoaderTest { loader.init(null); RemoteCacheSessionsLoaderContext ctx = loader.computeLoaderContext(null); - Assert.assertEquals(ctx.getSessionsTotal(), COUNT); - Assert.assertEquals(ctx.getIspnSegmentsCount(), 256); - //Assert.assertEquals(ctx.getSegmentsCount(), 16); Assert.assertEquals(ctx.getSessionsPerSegment(), 64); int totalCount = 0; diff --git a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java index 1da635ce92..ec117670c0 100644 --- a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java +++ b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java @@ -50,27 +50,11 @@ public class InitializerStateTest { @Test public void testRemoteLoaderContext() { - assertSegmentsForRemoteLoader(0, 64, -1, 1); - assertSegmentsForRemoteLoader(0, 64, 256, 1); - assertSegmentsForRemoteLoader(5, 64, 256, 1); - assertSegmentsForRemoteLoader(63, 64, 256, 1); - assertSegmentsForRemoteLoader(64, 64, 256, 1); - assertSegmentsForRemoteLoader(65, 64, 256, 2); - assertSegmentsForRemoteLoader(127, 64, 256, 2); - assertSegmentsForRemoteLoader(1000, 64, 256, 16); - - assertSegmentsForRemoteLoader(2047, 64, 256, 32); - assertSegmentsForRemoteLoader(2048, 64, 256, 32); - assertSegmentsForRemoteLoader(2049, 64, 256, 64); - - assertSegmentsForRemoteLoader(1000, 64, 256, 16); - assertSegmentsForRemoteLoader(10000, 64, 256, 256); - assertSegmentsForRemoteLoader(1000000, 64, 256, 256); - assertSegmentsForRemoteLoader(10000000, 64, 256, 256); + assertSegmentsForRemoteLoader(64, 1); } - private void assertSegmentsForRemoteLoader(int sessionsTotal, int sessionsPerSegment, int ispnSegmentsCount, int expectedSegments) { - RemoteCacheSessionsLoaderContext ctx = new RemoteCacheSessionsLoaderContext(ispnSegmentsCount, sessionsPerSegment, sessionsTotal); + private void assertSegmentsForRemoteLoader(int sessionsPerSegment, int expectedSegments) { + RemoteCacheSessionsLoaderContext ctx = new RemoteCacheSessionsLoaderContext(sessionsPerSegment); Assert.assertEquals(expectedSegments, ctx.getSegmentsCount()); } @@ -111,7 +95,7 @@ public class InitializerStateTest { } @Test - public void testDailyTimeout() throws Exception { + public void testDailyTimeout() { Date date = new Date(CacheableStorageProviderModel.dailyTimeout(10, 30)); System.out.println(DateFormat.getDateTimeInstance(DateFormat.FULL, DateFormat.FULL).format(date)); date = new Date(CacheableStorageProviderModel.dailyTimeout(17, 45)); diff --git a/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/OfflineSessionPersistenceTest.java b/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/OfflineSessionPersistenceTest.java index 569a9ba642..11e0228d7c 100644 --- a/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/OfflineSessionPersistenceTest.java +++ b/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/OfflineSessionPersistenceTest.java @@ -343,7 +343,7 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest { // Shutdown factory -> enforce session persistence closeKeycloakSessionFactory(); - inIndependentFactories(4, 30, () -> assertOfflineSessionsExist(realmId, offlineSessionIds)); + inIndependentFactories(4, 60, () -> assertOfflineSessionsExist(realmId, offlineSessionIds)); } /**