Remove concurrent loading of remote sessions as at startup time only one node is up anyway. (#25709)

Closes #22082

Signed-off-by: Alexander Schwartz <aschwart@redhat.com>
Co-authored-by: Martin Kanis <martin-kanis@users.noreply.github.com>
This commit is contained in:
Alexander Schwartz 2024-01-09 16:55:22 +01:00 committed by GitHub
parent e979ed3b30
commit 01939bcf34
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 136 additions and 369 deletions

View file

@ -104,6 +104,13 @@ first time using the `idp-review-user-profile.ftl` template.
For more details, see link:{upgradingguide_link}[{upgradingguide_name}].
= Sequential loading of offline sessions and remote sessions
Starting with this release, the first member of a Keycloak cluster will load remote sessions sequentially instead of in parallel.
If offline session preloading is enabled, those will be loaded sequentially as well.
For more details, see link:{upgradingguide_link}[{upgradingguide_name}].
= Performing actions on behalf of another user is not longer possible when the user is already authenticated
In this release, you can no longer perform actions such as email verification if the user is already authenticated

View file

@ -144,6 +144,18 @@ and potentially introduce unexpected changes and behavior that should only affec
If you have customizations to the `login-update-profile.ftl` template to customize how users update their profiles when authenticating through a broker, make sure to move your changes
to the new template.
= Sequential loading of offline sessions and remote sessions
Starting with this release, the first member of a Keycloak cluster will load remote sessions sequentially instead of in parallel.
If offline session preloading is enabled, those will be loaded sequentially as well.
The previous code led to high resource-consumption across the cluster at startup and was challenging to analyze in production environments and could lead to complex failure scenarios if a node was restarted during loading.
Therefore, it was changed to sequential session loading.
For offline sessions, the default in this and previous versions of Keycloak is to load those sessions on demand, which scales better with a lot of offline sessions than the attempt to preload them in parallel. Setups that use this default setup are not affected by the change of the loading strategy for offline sessions.
Setups that have offline session preloading enabled should migrate to a setup where offline-session preloading is disabled.
= Infinispan metrics use labels for cache manager and cache names
When enabling metrics for {project_name}'s embedded caches, the metrics now use labels for the cache manager and the cache names.

View file

@ -23,6 +23,12 @@
<groupId>org.keycloak</groupId>
<version>999.0.0-SNAPSHOT</version>
</parent>
<properties>
<maven.compiler.release>17</maven.compiler.release>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<modelVersion>4.0.0</modelVersion>
<artifactId>keycloak-model-infinispan</artifactId>

View file

@ -24,10 +24,7 @@ import org.infinispan.context.Flag;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.remoting.transport.Transport;
import org.jboss.logging.Logger;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.KeycloakSessionTask;
import org.keycloak.models.utils.KeycloakModelUtils;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@ -40,11 +37,11 @@ public abstract class BaseCacheInitializer extends CacheInitializer {
protected final KeycloakSessionFactory sessionFactory;
protected final Cache<String, Serializable> workCache;
protected final SessionLoader sessionLoader;
protected final SessionLoader<SessionLoader.LoaderContext, SessionLoader.WorkerContext, SessionLoader.WorkerResult> sessionLoader;
protected final int sessionsPerSegment;
protected final String stateKey;
public BaseCacheInitializer(KeycloakSessionFactory sessionFactory, Cache<String, Serializable> workCache, SessionLoader sessionLoader, String stateKeySuffix, int sessionsPerSegment) {
public BaseCacheInitializer(KeycloakSessionFactory sessionFactory, Cache<String, Serializable> workCache, SessionLoader<SessionLoader.LoaderContext, SessionLoader.WorkerContext, SessionLoader.WorkerResult> sessionLoader, String stateKeySuffix, int sessionsPerSegment) {
this.sessionFactory = sessionFactory;
this.workCache = workCache;
this.sessionLoader = sessionLoader;

View file

@ -18,10 +18,7 @@
package org.keycloak.models.sessions.infinispan.initializer;
import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.manager.ClusterExecutor;
import org.infinispan.remoting.transport.Transport;
import org.jboss.logging.Logger;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
@ -29,18 +26,14 @@ import org.keycloak.models.KeycloakSessionTask;
import org.keycloak.models.utils.KeycloakModelUtils;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
/**
* Startup initialization for reading persistent userSessions to be filled into infinispan/memory . In cluster,
* the initialization is distributed among all cluster nodes, so the startup time is even faster
* Startup initialization for reading persistent userSessions to be filled into infinispan/memory.
*
* Implementation is pretty generic and doesn't contain any "userSession" specific stuff. All logic related to how are sessions loaded is in the SessionLoader implementation
* Implementation is pretty generic and doesn't contain any "userSession" specific stuff. All logic related to how sessions are loaded is in the SessionLoader implementation
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
@ -119,119 +112,72 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
}
protected void startLoadingImpl(InitializerState state, SessionLoader.LoaderContext loaderCtx) {
// Assume each worker has same processor's count
int processors = Runtime.getRuntime().availableProcessors();
Transport transport = workCache.getCacheManager().getTransport();
// Every worker iteration will be executed on single node. Use 3 failover attempts for each segment (should be sufficient in all cases)
ClusterExecutor clusterExecutor = workCache.getCacheManager().executor()
.singleNodeSubmission(3);
int errors = 0;
int segmentToLoad = 0;
//try {
SessionLoader.WorkerResult previousResult = null;
SessionLoader.WorkerResult nextResult = null;
int distributedWorkersCount = 0;
boolean firstTryForSegment = true;
SessionLoader.WorkerResult previousResult = null;
SessionLoader.WorkerResult nextResult = null;
int distributedWorkersCount = 1;
while (segmentToLoad < state.getSegmentsCount()) {
if (firstTryForSegment) {
// do not change the node count if it's not the first try
int nodesCount = transport==null ? 1 : transport.getMembers().size();
distributedWorkersCount = processors * nodesCount;
}
while (segmentToLoad < state.getSegmentsCount()) {
log.debugf("Starting next iteration with %d workers", distributedWorkersCount);
log.debugf("Starting next iteration with %d workers", distributedWorkersCount);
List<Integer> segments = state.getSegmentsToLoad(segmentToLoad, distributedWorkersCount);
List<Integer> segments = state.getSegmentsToLoad(segmentToLoad, distributedWorkersCount);
if (log.isTraceEnabled()) {
log.trace("unfinished segments for this iteration: " + segments);
}
if (log.isTraceEnabled()) {
log.trace("unfinished segments for this iteration: " + segments);
}
List<CompletableFuture<Void>> futures = new LinkedList<>();
final Queue<SessionLoader.WorkerResult> results = new ConcurrentLinkedQueue<>();
final Queue<SessionLoader.WorkerResult> results = new ConcurrentLinkedQueue<>();
CompletableFuture<Void> completableFuture = null;
for (Integer segment : segments) {
SessionLoader.WorkerContext workerCtx = sessionLoader.computeWorkerContext(loaderCtx, segment, segment - segmentToLoad, previousResult);
for (Integer segment : segments) {
SessionLoader.WorkerContext workerCtx = sessionLoader.computeWorkerContext(loaderCtx, segment, segment - segmentToLoad, previousResult);
SessionInitializerWorker worker = new SessionInitializerWorker();
worker.setWorkerEnvironment(loaderCtx, workerCtx, sessionLoader, workCache.getName());
SessionInitializerWorker worker = new SessionInitializerWorker();
worker.setWorkerEnvironment(loaderCtx, workerCtx, sessionLoader);
completableFuture = clusterExecutor.submitConsumer(worker, (address, workerResult, throwable) -> {
log.tracef("Calling triConsumer on address %s, throwable message: %s, segment: %s", address, throwable == null ? "null" : throwable.getMessage(),
workerResult == null ? null : workerResult.getSegment());
results.add(worker.apply(sessionFactory));
}
if (throwable != null) {
throw new CacheException(throwable);
}
results.add(workerResult);
});
boolean anyFailure = false;
futures.add(completableFuture);
}
boolean anyFailure = false;
// Make sure that all workers are finished
for (CompletableFuture<Void> future : futures) {
try {
future.get();
} catch (InterruptedException ie) {
anyFailure = true;
errors++;
log.error("Interruped exception when computed future. Errors: " + errors, ie);
} catch (ExecutionException ee) {
anyFailure = true;
errors++;
log.error("ExecutionException when computed future. Errors: " + errors, ee);
}
}
// Check the results
for (SessionLoader.WorkerResult result : results) {
if (result.isSuccess()) {
state.markSegmentFinished(result.getSegment());
if (result.getSegment() == segmentToLoad + distributedWorkersCount - 1) {
// last result for next iteration when complete
nextResult = result;
}
} else {
if (log.isTraceEnabled()) {
log.tracef("Segment %d failed to compute", result.getSegment());
}
anyFailure = true;
}
}
if (errors >= maxErrors) {
throw new RuntimeException("Maximum count of worker errors occured. Limit was " + maxErrors + ". See server.log for details");
}
if (!anyFailure) {
// everything is OK, prepare the new row
segmentToLoad += distributedWorkersCount;
firstTryForSegment = true;
previousResult = nextResult;
nextResult = null;
if (log.isTraceEnabled()) {
log.debugf("New initializer state is: %s", state);
// Check the results
for (SessionLoader.WorkerResult result : results) {
if (result.isSuccess()) {
state.markSegmentFinished(result.getSegment());
if (result.getSegment() == segmentToLoad + distributedWorkersCount - 1) {
// last result for next iteration when complete
nextResult = result;
}
} else {
// some segments failed, try to load unloaded segments
firstTryForSegment = false;
if (log.isTraceEnabled()) {
log.tracef("Segment %d failed to compute", result.getSegment());
}
anyFailure = true;
}
}
// Push the state after computation is finished
saveStateToCache(state);
if (errors >= maxErrors) {
throw new RuntimeException("Maximum count of worker errors occured. Limit was " + maxErrors + ". See server.log for details");
}
// Loader callback after the task is finished
this.sessionLoader.afterAllSessionsLoaded(this);
if (!anyFailure) {
// everything is OK, prepare the new row
segmentToLoad += distributedWorkersCount;
previousResult = nextResult;
nextResult = null;
if (log.isTraceEnabled()) {
log.debugf("New initializer state is: %s", state);
}
}
}
// Push the state after computation is finished
saveStateToCache(state);
// Loader callback after the task is finished
this.sessionLoader.afterAllSessionsLoaded(this);
}
}

View file

@ -17,12 +17,7 @@
package org.keycloak.models.sessions.infinispan.initializer;
import org.infinispan.Cache;
import org.infinispan.manager.EmbeddedCacheManager;
import org.jboss.logging.Logger;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.KeycloakSessionTask;
import org.keycloak.models.utils.KeycloakModelUtils;
import java.io.Serializable;
@ -31,48 +26,21 @@ import java.util.function.Function;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class SessionInitializerWorker implements Function<EmbeddedCacheManager, SessionLoader.WorkerResult>, Serializable {
private static final Logger log = Logger.getLogger(SessionInitializerWorker.class);
public class SessionInitializerWorker implements Function<KeycloakSessionFactory, SessionLoader.WorkerResult>, Serializable {
private SessionLoader.LoaderContext loaderCtx;
private SessionLoader.WorkerContext workerCtx;
private SessionLoader sessionLoader;
private SessionLoader<SessionLoader.LoaderContext, SessionLoader.WorkerContext, SessionLoader.WorkerResult> sessionLoader;
private String cacheName;
public void setWorkerEnvironment(SessionLoader.LoaderContext loaderCtx, SessionLoader.WorkerContext workerCtx, SessionLoader sessionLoader, String cacheName) {
public void setWorkerEnvironment(SessionLoader.LoaderContext loaderCtx, SessionLoader.WorkerContext workerCtx, SessionLoader<SessionLoader.LoaderContext, SessionLoader.WorkerContext, SessionLoader.WorkerResult> sessionLoader) {
this.loaderCtx = loaderCtx;
this.workerCtx = workerCtx;
this.sessionLoader = sessionLoader;
this.cacheName = cacheName;
}
@Override
public SessionLoader.WorkerResult apply(EmbeddedCacheManager embeddedCacheManager) {
Cache<Object, Object> workCache = embeddedCacheManager.getCache(cacheName);
if (log.isTraceEnabled()) {
log.tracef("Running computation for segment %s with worker %s", workerCtx.getSegment(), workerCtx.getWorkerId());
}
KeycloakSessionFactory sessionFactory = workCache.getAdvancedCache().getComponentRegistry().getComponent(KeycloakSessionFactory.class);
if (sessionFactory == null) {
log.debugf("KeycloakSessionFactory not yet set in cache. Worker skipped");
return sessionLoader.createFailedWorkerResult(loaderCtx, workerCtx);
}
SessionLoader.WorkerResult[] ref = new SessionLoader.WorkerResult[1];
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
ref[0] = sessionLoader.loadSessions(session, loaderCtx, workerCtx);
}
});
return ref[0];
public SessionLoader.WorkerResult apply(KeycloakSessionFactory sessionFactory) {
return KeycloakModelUtils.runJobInTransactionWithResult(sessionFactory, (session) -> sessionLoader.loadSessions(session, loaderCtx, workerCtx));
}
}

View file

@ -18,21 +18,15 @@
package org.keycloak.models.sessions.infinispan.remotestore;
import java.io.Serializable;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
import org.infinispan.client.hotrod.impl.operations.IterationStartOperation;
import org.infinispan.client.hotrod.impl.operations.IterationStartResponse;
import org.infinispan.client.hotrod.impl.operations.OperationsFactory;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.context.Flag;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Retry;
import org.keycloak.connections.infinispan.DefaultInfinispanConnectionProviderFactory;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.KeycloakSession;
@ -40,8 +34,6 @@ import org.keycloak.models.sessions.infinispan.initializer.BaseCacheInitializer;
import org.keycloak.models.sessions.infinispan.initializer.OfflinePersistentUserSessionLoader;
import org.keycloak.models.sessions.infinispan.initializer.SessionLoader;
import static org.infinispan.client.hotrod.impl.Util.await;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
@ -65,42 +57,10 @@ public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessi
@Override
public RemoteCacheSessionsLoaderContext computeLoaderContext(KeycloakSession session) {
RemoteCache remoteCache = getRemoteCache(session);
int sessionsTotal = remoteCache.size();
int ispnSegments = getIspnSegmentsCount(remoteCache);
return new RemoteCacheSessionsLoaderContext(ispnSegments, sessionsPerSegment, sessionsTotal);
return new RemoteCacheSessionsLoaderContext(sessionsPerSegment);
}
protected int getIspnSegmentsCount(RemoteCache remoteCache) {
OperationsFactory operationsFactory = ((RemoteCacheImpl) remoteCache).getOperationsFactory();
Map<SocketAddress, Set<Integer>> segmentsByAddress = operationsFactory.getPrimarySegmentsByAddress();
for (Map.Entry<SocketAddress, Set<Integer>> entry : segmentsByAddress.entrySet()) {
SocketAddress targetAddress = entry.getKey();
// Same like RemoteCloseableIterator.startInternal
IterationStartOperation iterationStartOperation = operationsFactory.newIterationStartOperation(null, null, null, sessionsPerSegment, false, null, targetAddress);
IterationStartResponse startResponse = await(iterationStartOperation.execute());
try {
// Could happen for non-clustered caches
if (startResponse.getSegmentConsistentHash() == null) {
return -1;
} else {
return startResponse.getSegmentConsistentHash().getNumSegments();
}
} finally {
startResponse.getChannel().close();
}
}
// Handle the case when primary segments owned by the address are not known
return -1;
}
@Override
public WorkerContext computeWorkerContext(RemoteCacheSessionsLoaderContext loaderCtx, int segment, int workerId, WorkerResult previousResult) {
return new WorkerContext(segment, workerId);
@ -112,72 +72,59 @@ public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessi
return new WorkerResult(false, workerContext.getSegment(), workerContext.getWorkerId());
}
@Override
public WorkerResult loadSessions(KeycloakSession session, RemoteCacheSessionsLoaderContext loaderContext, WorkerContext ctx) {
Cache cache = getCache(session);
Cache decoratedCache = cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE, Flag.IGNORE_RETURN_VALUES);
RemoteCache remoteCache = getRemoteCache(session);
Cache<Object, Object> cache = getCache(session);
Cache<Object, Object> decoratedCache = cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE, Flag.IGNORE_RETURN_VALUES);
RemoteCache<?, ?> remoteCache = getRemoteCache(session);
Set<Integer> myIspnSegments = getMyIspnSegments(ctx.getSegment(), loaderContext);
log.debugf("Will do bulk load of sessions from remote cache '%s' . Segment: %d", cache.getName(), ctx.getSegment());
Map<Object, Object> remoteEntries = new HashMap<>();
CloseableIterator<Map.Entry> iterator = null;
int countLoaded = 0;
try {
iterator = remoteCache.retrieveEntries(null, myIspnSegments, loaderContext.getSessionsPerSegment());
while (iterator.hasNext()) {
countLoaded++;
Map.Entry entry = iterator.next();
remoteEntries.put(entry.getKey(), entry.getValue());
try (CloseableIterator<Map.Entry<Object, Object>> it = remoteCache.retrieveEntries(null, loaderContext.getSessionsPerSegment())) {
Map<Object, Object> toInsert = new HashMap<>(loaderContext.getSessionsPerSegment());
int count = 0;
while (it.hasNext()) {
Map.Entry<?,?> entry = it.next();
toInsert.put(entry.getKey(), entry.getValue());
++countLoaded;
if (++count == loaderContext.getSessionsPerSegment()) {
insertSessions(decoratedCache, toInsert);
toInsert = new HashMap<>(loaderContext.getSessionsPerSegment());
count = 0;
}
}
if (!toInsert.isEmpty()) {
// last batch
insertSessions(decoratedCache, toInsert);
}
} catch (RuntimeException e) {
log.warnf(e, "Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), ctx.getSegment());
throw e;
} finally {
if (iterator != null) {
iterator.close();
}
}
DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(() ->
// With Infinispan 14.0.21/14.0.19, we've seen deadlocks in tests where this future never completed when shutting down the internal Infinispan.
// Therefore, prevent the shutdown of the internal Infinispan during this step.
decoratedCache.putAll(remoteEntries)
);
log.debugf("Successfully finished loading sessions from cache '%s' . Segment: %d, Count of sessions loaded: %d", cache.getName(), ctx.getSegment(), countLoaded);
return new WorkerResult(true, ctx.getSegment(), ctx.getWorkerId());
}
private void insertSessions(Cache<Object, Object> cache, Map<Object, Object> entries) {
log.debugf("Adding %d entries to cache '%s'", entries.size(), cacheName);
// Compute set of ISPN segments into 1 "worker" segment
protected Set<Integer> getMyIspnSegments(int segment, RemoteCacheSessionsLoaderContext ctx) {
// Remote cache is non-clustered
if (ctx.getIspnSegmentsCount() < 0) {
return null;
}
if (ctx.getIspnSegmentsCount() % ctx.getSegmentsCount() > 0) {
throw new IllegalStateException("Illegal state. IspnSegmentsCount: " + ctx.getIspnSegmentsCount() + ", segmentsCount: " + ctx.getSegmentsCount());
}
int countPerSegment = ctx.getIspnSegmentsCount() / ctx.getSegmentsCount();
int first = segment * countPerSegment;
int last = first + countPerSegment - 1;
Set<Integer> myIspnSegments = new HashSet<>();
for (int i=first ; i<=last ; i++) {
myIspnSegments.add(i);
}
return myIspnSegments;
// The `putAll` operation might time out when a node becomes unavailable, therefore, retry.
Retry.executeWithBackoff(
(int iteration) -> {
DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(() -> {
// With Infinispan 14.0.21/14.0.19, we've seen deadlocks in tests where this future never completed when shutting down the internal Infinispan.
// Therefore, prevent the shutdown of the internal Infinispan during this step.
cache.putAll(entries);
});
},
(iteration, throwable) -> log.warnf("Unable to put entries into the cache in iteration %s", iteration, throwable),
3,
10);
}
@Override
public boolean isFinished(BaseCacheInitializer initializer) {
Cache<String, Serializable> workCache = initializer.getWorkCache();

View file

@ -25,65 +25,34 @@ import org.keycloak.models.sessions.infinispan.initializer.SessionLoader;
*/
public class RemoteCacheSessionsLoaderContext extends SessionLoader.LoaderContext {
// Count of hash segments for remote infinispan cache. It's by default 256 for distributed/replicated caches
private final int ispnSegmentsCount;
private final int sessionsPerSegment;
private final int sessionsTotal;
public RemoteCacheSessionsLoaderContext(int ispnSegmentsCount, int sessionsPerSegment, int sessionsTotal) {
super(computeSegmentsCount(sessionsTotal, sessionsPerSegment, ispnSegmentsCount));
this.ispnSegmentsCount = ispnSegmentsCount;
public RemoteCacheSessionsLoaderContext(int sessionsPerSegment) {
super(1);
this.sessionsPerSegment = sessionsPerSegment;
this.sessionsTotal = sessionsTotal;
}
// Count of segments (worker iterations for distributedExecutionService executions on KC side). Each segment will be 1 worker iteration.
// Count of segments could be lower than "ispnSegmentsCount" and depends on the size of the cache. For example if we have cache with just 500 items,
// we don't need 256 segments and send 256 requests to remoteCache to preload thing. Instead, we will have lower number of segments (EG. 8)
// and we will map more ispnSegments into 1 worker segment (In this case 256 / 8 = 32. So 32 ISPN segments mapped to each worker segment)
private static int computeSegmentsCount(int sessionsTotal, int sessionsPerSegment, int ispnSegments) {
private static int computeSegmentsCount(int ispnSegments) {
// No support by remote ISPN cache for segments. This can happen if remoteCache is local (non-clustered)
if (ispnSegments < 0) {
return 1;
}
int seg = sessionsTotal / sessionsPerSegment;
if (sessionsTotal % sessionsPerSegment > 0) {
seg = seg + 1;
}
int seg2 = 1;
while (seg2<seg && seg2<ispnSegments) {
seg2 = seg2 << 1;
}
return seg2;
// always use the same number of ISPN segments to avoid touching multiple segments at a time
return ispnSegments;
}
public int getIspnSegmentsCount() {
return ispnSegmentsCount;
}
public int getSessionsPerSegment() {
return sessionsPerSegment;
}
public int getSessionsTotal() {
return sessionsTotal;
}
@Override
public String toString() {
return new StringBuilder("RemoteCacheSessionsLoaderContext [ ")
.append("segmentsCount: ").append(getSegmentsCount())
.append(", ispnSegmentsCount: ").append(ispnSegmentsCount)
.append(", sessionsPerSegment: ").append(sessionsPerSegment)
.append(", sessionsTotal: ").append(sessionsTotal)
.append(" ]")
.toString();
}

View file

@ -18,19 +18,12 @@
package org.keycloak.cluster.infinispan;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
import org.infinispan.client.hotrod.impl.operations.IterationStartOperation;
import org.infinispan.client.hotrod.impl.operations.IterationStartResponse;
import org.infinispan.client.hotrod.impl.operations.OperationsFactory;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
@ -41,8 +34,6 @@ import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessi
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoaderContext;
import org.keycloak.connections.infinispan.InfinispanUtil;
import static org.infinispan.client.hotrod.impl.Util.await;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
@ -100,81 +91,24 @@ public class JDGPutTest {
}
private static void bulkLoadSessions(RemoteCache remoteCache) {
int size = remoteCache.size();
int ispnSegmentsCount = getIspnSegmentsCount(remoteCache);
RemoteCacheSessionsLoaderContext ctx = new RemoteCacheSessionsLoaderContext(ispnSegmentsCount, 64, 1);
RemoteCacheSessionsLoaderContext ctx = new RemoteCacheSessionsLoaderContext(64);
Set<Integer> myIspnSegments = getMyIspnSegments(0, ctx);
Map<Object, Object> toInsert = new HashMap<>(ctx.getSessionsPerSegment());
Map<Object, Object> remoteEntries = new HashMap<>();
CloseableIterator<Map.Entry> iterator = null;
int countLoaded = 0;
try {
iterator = remoteCache.retrieveEntries(null, myIspnSegments, ctx.getSessionsPerSegment());
while (iterator.hasNext()) {
countLoaded++;
Map.Entry entry = iterator.next();
remoteEntries.put(entry.getKey(), entry.getValue());
try (CloseableIterator<Map.Entry<Object, Object>> it = remoteCache.retrieveEntries(null, ctx.getSessionsPerSegment())) {
while (it.hasNext()) {
Map.Entry<?,?> entry = it.next();
toInsert.put(entry.getKey(), entry.getValue());
}
} catch (RuntimeException e) {
System.err.println(String.format("Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), 1));
logger.warnf(e, "Error loading sessions from remote cache '%s'", remoteCache.getName());
throw e;
} finally {
if (iterator != null) {
iterator.close();
}
}
logger.info("Loaded " + remoteEntries);
logger.info("Loaded " + toInsert);
}
protected static int getIspnSegmentsCount(RemoteCache remoteCache) {
OperationsFactory operationsFactory = ((RemoteCacheImpl) remoteCache).getOperationsFactory();
Map<SocketAddress, Set<Integer>> segmentsByAddress = operationsFactory.getPrimarySegmentsByAddress();
for (Map.Entry<SocketAddress, Set<Integer>> entry : segmentsByAddress.entrySet()) {
SocketAddress targetAddress = entry.getKey();
// Same like RemoteCloseableIterator.startInternal
IterationStartOperation iterationStartOperation = operationsFactory.newIterationStartOperation(null, null, null, 64, false, null, targetAddress);
IterationStartResponse startResponse = await(iterationStartOperation.execute());
try {
// Could happen for non-clustered caches
if (startResponse.getSegmentConsistentHash() == null) {
return -1;
} else {
return startResponse.getSegmentConsistentHash().getNumSegments();
}
} finally {
startResponse.getChannel().close();
}
}
// Handle the case when primary segments owned by the address are not known
return -1;
}
// Compute set of ISPN segments into 1 "worker" segment
protected static Set<Integer> getMyIspnSegments(int segment, RemoteCacheSessionsLoaderContext ctx) {
// Remote cache is non-clustered
if (ctx.getIspnSegmentsCount() < 0) {
return null;
}
if (ctx.getIspnSegmentsCount() % ctx.getSegmentsCount() > 0) {
throw new IllegalStateException("Illegal state. IspnSegmentsCount: " + ctx.getIspnSegmentsCount() + ", segmentsCount: " + ctx.getSegmentsCount());
}
int countPerSegment = ctx.getIspnSegmentsCount() / ctx.getSegmentsCount();
int first = segment * countPerSegment;
int last = first + countPerSegment - 1;
Set<Integer> myIspnSegments = new HashSet<>();
for (int i=first ; i<=last ; i++) {
myIspnSegments.add(i);
}
return myIspnSegments;
}
}

View file

@ -105,9 +105,6 @@ public class RemoteCacheSessionsLoaderTest {
loader.init(null);
RemoteCacheSessionsLoaderContext ctx = loader.computeLoaderContext(null);
Assert.assertEquals(ctx.getSessionsTotal(), COUNT);
Assert.assertEquals(ctx.getIspnSegmentsCount(), 256);
//Assert.assertEquals(ctx.getSegmentsCount(), 16);
Assert.assertEquals(ctx.getSessionsPerSegment(), 64);
int totalCount = 0;

View file

@ -50,27 +50,11 @@ public class InitializerStateTest {
@Test
public void testRemoteLoaderContext() {
assertSegmentsForRemoteLoader(0, 64, -1, 1);
assertSegmentsForRemoteLoader(0, 64, 256, 1);
assertSegmentsForRemoteLoader(5, 64, 256, 1);
assertSegmentsForRemoteLoader(63, 64, 256, 1);
assertSegmentsForRemoteLoader(64, 64, 256, 1);
assertSegmentsForRemoteLoader(65, 64, 256, 2);
assertSegmentsForRemoteLoader(127, 64, 256, 2);
assertSegmentsForRemoteLoader(1000, 64, 256, 16);
assertSegmentsForRemoteLoader(2047, 64, 256, 32);
assertSegmentsForRemoteLoader(2048, 64, 256, 32);
assertSegmentsForRemoteLoader(2049, 64, 256, 64);
assertSegmentsForRemoteLoader(1000, 64, 256, 16);
assertSegmentsForRemoteLoader(10000, 64, 256, 256);
assertSegmentsForRemoteLoader(1000000, 64, 256, 256);
assertSegmentsForRemoteLoader(10000000, 64, 256, 256);
assertSegmentsForRemoteLoader(64, 1);
}
private void assertSegmentsForRemoteLoader(int sessionsTotal, int sessionsPerSegment, int ispnSegmentsCount, int expectedSegments) {
RemoteCacheSessionsLoaderContext ctx = new RemoteCacheSessionsLoaderContext(ispnSegmentsCount, sessionsPerSegment, sessionsTotal);
private void assertSegmentsForRemoteLoader(int sessionsPerSegment, int expectedSegments) {
RemoteCacheSessionsLoaderContext ctx = new RemoteCacheSessionsLoaderContext(sessionsPerSegment);
Assert.assertEquals(expectedSegments, ctx.getSegmentsCount());
}
@ -111,7 +95,7 @@ public class InitializerStateTest {
}
@Test
public void testDailyTimeout() throws Exception {
public void testDailyTimeout() {
Date date = new Date(CacheableStorageProviderModel.dailyTimeout(10, 30));
System.out.println(DateFormat.getDateTimeInstance(DateFormat.FULL, DateFormat.FULL).format(date));
date = new Date(CacheableStorageProviderModel.dailyTimeout(17, 45));

View file

@ -343,7 +343,7 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest {
// Shutdown factory -> enforce session persistence
closeKeycloakSessionFactory();
inIndependentFactories(4, 30, () -> assertOfflineSessionsExist(realmId, offlineSessionIds));
inIndependentFactories(4, 60, () -> assertOfflineSessionsExist(realmId, offlineSessionIds));
}
/**