From d69872fa11351018c59a14a0bcdb888e38b3c9ef Mon Sep 17 00:00:00 2001 From: Alexander Schwartz Date: Tue, 30 Apr 2024 14:07:35 +0200 Subject: [PATCH] Batch writes originating from logins/logouts for persistent sessions All writes for the sessions are handled by a background thread which batches them. Closes #28862 Wait for persistent-store to contain update instead of cache which has the change immediately since it is in memory + introduce new model-test profile Closes #29141 Signed-off-by: Alexander Schwartz Signed-off-by: Michal Hajas Co-authored-by: Michal Hajas --- .../AuthenticatedClientSessionAdapter.java | 5 - .../InfinispanUserSessionProviderFactory.java | 64 ++++++--- .../PersistentUserSessionProvider.java | 29 ++-- .../infinispan/UserSessionAdapter.java | 5 - ...onPersistentChangelogBasedTransaction.java | 9 +- .../EmbeddedCachesChangesPerformer.java | 131 ++++++++++-------- .../changes/JpaChangesPerformer.java | 58 ++++++-- .../infinispan/changes/MergedUpdate.java | 18 --- ...tentSessionsChangelogBasedTransaction.java | 33 +---- .../changes/PersistentSessionsWorker.java | 102 +++++++------- ...rredElement.java => PersistentUpdate.java} | 33 +++-- .../changes/SessionChangesPerformer.java | 4 + .../infinispan/changes/SessionUpdateTask.java | 4 - ...onPersistentChangelogBasedTransaction.java | 9 +- .../JpaUserSessionPersisterProvider.java | 16 ++- .../PersistentClientSessionEntity.java | 11 ++ .../session/PersistentUserSessionEntity.java | 8 ++ .../models/AbstractKeycloakTransaction.java | 2 - testsuite/model/pom.xml | 7 + .../parameters/PersistentUserSessions.java | 1 - .../PersistentUserSessionsNoCache.java | 41 ++++++ .../model/session/SessionTimeoutsTest.java | 35 +++-- 22 files changed, 365 insertions(+), 260 deletions(-) rename model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/{PersistentDeferredElement.java => PersistentUpdate.java} (55%) create mode 100644 testsuite/model/src/test/java/org/keycloak/testsuite/model/parameters/PersistentUserSessionsNoCache.java diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java index b4a566a870..c32ef85984 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java @@ -156,11 +156,6 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes .shouldSaveClientSessionToRemoteCache(kcSession, client.getRealm(), sessionWrapper, userSession, offline, timestamp); } - @Override - public boolean isDeferrable() { - return true; - } - @Override public String toString() { return "setTimestamp(" + timestamp + ')'; diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java index b144aca376..963d461062 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java @@ -35,8 +35,8 @@ import org.keycloak.models.RealmModel; import org.keycloak.models.UserModel; import org.keycloak.models.UserSessionProvider; import org.keycloak.models.UserSessionProviderFactory; +import org.keycloak.models.sessions.infinispan.changes.PersistentUpdate; import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey; -import org.keycloak.models.sessions.infinispan.changes.PersistentDeferredElement; import org.keycloak.models.sessions.infinispan.changes.PersistentSessionsWorker; import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore; import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory; @@ -60,12 +60,15 @@ import org.keycloak.models.sessions.infinispan.util.SessionTimeouts; import org.keycloak.models.utils.KeycloakModelUtils; import org.keycloak.models.utils.PostMigrationEvent; import org.keycloak.models.utils.ResetTimeOffsetEvent; +import org.keycloak.provider.ProviderConfigProperty; +import org.keycloak.provider.ProviderConfigurationBuilder; import org.keycloak.provider.ProviderEvent; import org.keycloak.provider.ProviderEventListener; import org.keycloak.provider.ServerInfoAwareProviderFactory; import java.io.Serializable; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -85,6 +88,10 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider public static final String CLIENT_REMOVED_SESSION_EVENT = "CLIENT_REMOVED_SESSION_SESSIONS"; public static final String REMOVE_USER_SESSIONS_EVENT = "REMOVE_USER_SESSIONS_EVENT"; + public static final String CONFIG_OFFLINE_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE = "offlineSessionCacheEntryLifespanOverride"; + public static final String CONFIG_OFFLINE_CLIENT_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE = "offlineClientSessionCacheEntryLifespanOverride"; + public static final String CONFIG_MAX_BATCH_SIZE = "maxBatchSize"; + public static final int DEFAULT_MAX_BATCH_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 2); private long offlineSessionCacheEntryLifespanOverride; @@ -101,11 +108,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider SerializeExecutionsByKey serializerOfflineSession = new SerializeExecutionsByKey<>(); SerializeExecutionsByKey serializerClientSession = new SerializeExecutionsByKey<>(); SerializeExecutionsByKey serializerOfflineClientSession = new SerializeExecutionsByKey<>(); - ArrayBlockingQueue> asyncQueueUserSessions = new ArrayBlockingQueue<>(1000); - ArrayBlockingQueue> asyncQueueUserOfflineSessions = new ArrayBlockingQueue<>(1000); - ArrayBlockingQueue> asyncQueueClientSessions = new ArrayBlockingQueue<>(1000); - ArrayBlockingQueue> asyncQueueClientOfflineSessions = new ArrayBlockingQueue<>(1000); + ArrayBlockingQueue asyncQueuePersistentUpdate = new ArrayBlockingQueue<>(1000); private PersistentSessionsWorker persistentSessionsWorker; + private int maxBatchSize; @Override public UserSessionProvider create(KeycloakSession session) { @@ -133,10 +138,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider serializerOfflineSession, serializerClientSession, serializerOfflineClientSession, - asyncQueueUserSessions, - asyncQueueUserOfflineSessions, - asyncQueueClientSessions, - asyncQueueClientOfflineSessions + asyncQueuePersistentUpdate ); } return new InfinispanUserSessionProvider( @@ -162,8 +164,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider @Override public void init(Config.Scope config) { this.config = config; - offlineSessionCacheEntryLifespanOverride = config.getInt("offlineSessionCacheEntryLifespanOverride", -1); - offlineClientSessionCacheEntryLifespanOverride = config.getInt("offlineClientSessionCacheEntryLifespanOverride", -1); + offlineSessionCacheEntryLifespanOverride = config.getInt(CONFIG_OFFLINE_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE, -1); + offlineClientSessionCacheEntryLifespanOverride = config.getInt(CONFIG_OFFLINE_CLIENT_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE, -1); + maxBatchSize = config.getInt(CONFIG_MAX_BATCH_SIZE, DEFAULT_MAX_BATCH_SIZE); } @Override @@ -212,10 +215,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider } } }); - persistentSessionsWorker = new PersistentSessionsWorker(factory, asyncQueueUserSessions, - asyncQueueUserOfflineSessions, - asyncQueueClientSessions, - asyncQueueClientOfflineSessions); + persistentSessionsWorker = new PersistentSessionsWorker(factory, + asyncQueuePersistentUpdate, + maxBatchSize); persistentSessionsWorker.start(); } @@ -452,9 +454,37 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider @Override public Map getOperationalInfo() { Map info = new HashMap<>(); - info.put("offlineSessionCacheEntryLifespanOverride", Long.toString(offlineSessionCacheEntryLifespanOverride)); - info.put("offlineClientSessionCacheEntryLifespanOverride", Long.toString(offlineClientSessionCacheEntryLifespanOverride)); + info.put(CONFIG_OFFLINE_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE, Long.toString(offlineSessionCacheEntryLifespanOverride)); + info.put(CONFIG_OFFLINE_CLIENT_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE, Long.toString(offlineClientSessionCacheEntryLifespanOverride)); + info.put(CONFIG_MAX_BATCH_SIZE, Integer.toString(maxBatchSize)); return info; } + + @Override + public List getConfigMetadata() { + ProviderConfigurationBuilder builder = ProviderConfigurationBuilder.create(); + + builder.property() + .name(CONFIG_MAX_BATCH_SIZE) + .type("int") + .helpText("Maximum size of a batch size (only applicable to persistent sessions") + .defaultValue(DEFAULT_MAX_BATCH_SIZE) + .add(); + + builder.property() + .name(CONFIG_OFFLINE_CLIENT_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE) + .type("int") + .helpText("Override how long offline client sessions should be kept in memory") + .add(); + + builder.property() + .name(CONFIG_OFFLINE_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE) + .type("int") + .helpText("Override how long offline user sessions should be kept in memory") + .add(); + + return builder.build(); + } + } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java index 9e939bb3ce..eeeab59f07 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java @@ -43,7 +43,7 @@ import org.keycloak.models.light.LightweightUserAdapter; import org.keycloak.models.session.UserSessionPersisterProvider; import org.keycloak.models.sessions.infinispan.changes.ClientSessionPersistentChangelogBasedTransaction; import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction; -import org.keycloak.models.sessions.infinispan.changes.PersistentDeferredElement; +import org.keycloak.models.sessions.infinispan.changes.PersistentUpdate; import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey; import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask; @@ -136,10 +136,7 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi SerializeExecutionsByKey serializerOfflineSession, SerializeExecutionsByKey serializerClientSession, SerializeExecutionsByKey serializerOfflineClientSession, - ArrayBlockingQueue> asyncQueueUserSessions, - ArrayBlockingQueue> asyncQueueUserOfflineSessions, - ArrayBlockingQueue> asyncQueueClientSessions, - ArrayBlockingQueue> asyncQueueClientOfflineSessions) { + ArrayBlockingQueue asyncQueuePersistentUpdate) { if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) { throw new IllegalStateException("Persistent user sessions are not enabled"); } @@ -151,11 +148,15 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi this.offlineSessionCache = offlineSessionCache; this.offlineClientSessionCache = offlineClientSessionCache; - this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, false, serializerSession, asyncQueueUserSessions); - this.offlineSessionTx = new UserSessionPersistentChangelogBasedTransaction(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, true, serializerOfflineSession, asyncQueueUserOfflineSessions); + this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, false, serializerSession, + asyncQueuePersistentUpdate); + this.offlineSessionTx = new UserSessionPersistentChangelogBasedTransaction(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, true, serializerOfflineSession, + asyncQueuePersistentUpdate); - this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, false, keyGenerator, sessionTx, serializerClientSession, asyncQueueClientSessions); - this.offlineClientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, true, keyGenerator, offlineSessionTx, serializerOfflineClientSession, asyncQueueClientOfflineSessions); + this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, false, keyGenerator, sessionTx, serializerClientSession, + asyncQueuePersistentUpdate); + this.offlineClientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, true, keyGenerator, offlineSessionTx, serializerOfflineClientSession, + asyncQueuePersistentUpdate); this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session); @@ -1055,16 +1056,6 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi return idleChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG || lifetimeChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG; } - public void processDeferredUserSessionElements(Collection> batch, boolean offline) { - UserSessionPersistentChangelogBasedTransaction transaction = getTransaction(offline); - transaction.applyDeferredBatch(batch); - } - - public void processDeferredClientSessionElements(Collection> batch, boolean offline) { - ClientSessionPersistentChangelogBasedTransaction transaction = getClientSessionTransaction(offline); - transaction.applyDeferredBatch(batch); - } - private static class RegisterClientSessionTask implements SessionUpdateTask { private final String clientUuid; diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java index c40e9682b9..62401613ca 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java @@ -250,11 +250,6 @@ public class UserSessionAdapter> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, InfinispanKeyGenerator keyGenerator, - UserSessionPersistentChangelogBasedTransaction userSessionTx, SerializeExecutionsByKey serializer, ArrayBlockingQueue> asyncQueue) { - super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer, asyncQueue); + UserSessionPersistentChangelogBasedTransaction userSessionTx, SerializeExecutionsByKey serializer, ArrayBlockingQueue batchingQueue) { + super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer, batchingQueue); this.keyGenerator = keyGenerator; this.userSessionTx = userSessionTx; } @@ -57,10 +57,14 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent if (myUpdates == null) { SessionEntityWrapper wrappedEntity = cache.get(key); if (wrappedEntity == null) { + LOG.debugf("client-session not found in cache for sessionId=%s, offline=%s, loading from persister", key, offline); wrappedEntity = getSessionEntityFromPersister(realm, client, userSession); + } else { + LOG.debugf("client-session found in cache for sessionId=%s, offline=%s", key, offline); } if (wrappedEntity == null) { + LOG.debugf("client-session not found in persister for sessionId=%s, offline=%s", key, offline); return null; } @@ -98,6 +102,7 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent SessionEntityWrapper authenticatedClientSessionEntitySessionEntityWrapper = importClientSession(realm, client, userSession, clientSession); if (authenticatedClientSessionEntitySessionEntityWrapper == null) { + LOG.debugf("client-session not imported from persister for sessionId=%s, offline=%s, removing from persister.", clientSession.getId(), offline); persister.removeClientSession(userSession.getId(), client.getId(), offline); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java index c23378672d..1fa7321acc 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java @@ -24,25 +24,26 @@ import org.keycloak.connections.infinispan.InfinispanUtil; import org.keycloak.models.sessions.infinispan.CacheDecorators; import org.keycloak.models.sessions.infinispan.entities.SessionEntity; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; public class EmbeddedCachesChangesPerformer implements SessionChangesPerformer { private static final Logger LOG = Logger.getLogger(EmbeddedCachesChangesPerformer.class); private final Cache> cache; - private final SerializeExecutionsByKey serializer; - private final List changes = new LinkedList<>(); + private final List>> changes = new LinkedList<>(); - public EmbeddedCachesChangesPerformer(Cache> cache, SerializeExecutionsByKey serializer) { + public EmbeddedCachesChangesPerformer(Cache> cache) { this.cache = cache; - this.serializer = serializer; } - private void runOperationInCluster(K key, MergedUpdate task, SessionEntityWrapper sessionWrapper) { + private CompletableFuture runOperationInCluster(K key, MergedUpdate task, SessionEntityWrapper sessionWrapper) { V session = sessionWrapper.getEntity(); SessionUpdateTask.CacheOperation operation = task.getOperation(session); @@ -52,82 +53,78 @@ public class EmbeddedCachesChangesPerformer implemen switch (operation) { case REMOVE: // Just remove it - CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache) + return CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache) .withFlags(Flag.IGNORE_RETURN_VALUES) - .remove(key); - break; + .removeAsyncEntry(key); case ADD: - CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache) + return CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache) .withFlags(Flag.IGNORE_RETURN_VALUES) - .put(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS); - - LOG.tracef("Added entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs()); - break; + .putAsync(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS) + .thenAcceptAsync(v -> LOG.tracef("Added entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs())); case ADD_IF_ABSENT: - SessionEntityWrapper existing = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).putIfAbsent(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS); - if (existing != null) { - LOG.debugf("Existing entity in cache for key: %s . Will update it", key); + return CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).putIfAbsentAsync(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS) + .thenAccept(existing -> { + if (existing != null) { + LOG.debugf("Existing entity in cache for key: %s . Will update it", key); - // Apply updates on the existing entity and replace it - task.runUpdate(existing.getEntity()); + // Apply updates on the existing entity and replace it + task.runUpdate(existing.getEntity()); - replace(key, task, existing, task.getLifespanMs(), task.getMaxIdleTimeMs()); - } else { - LOG.tracef("Add_if_absent successfully called for entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs()); - } - break; + replace(key, task, existing, task.getLifespanMs(), task.getMaxIdleTimeMs()).join(); + } else { + LOG.tracef("Add_if_absent successfully called for entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs()); + } + }); case REPLACE: - replace(key, task, sessionWrapper, task.getLifespanMs(), task.getMaxIdleTimeMs()); - break; + return replace(key, task, sessionWrapper, task.getLifespanMs(), task.getMaxIdleTimeMs()); default: throw new IllegalStateException("Unsupported state " + operation); } - } - private void replace(K key, MergedUpdate task, SessionEntityWrapper oldVersionEntity, long lifespanMs, long maxIdleTimeMs) { - serializer.runSerialized(key, () -> { - SessionEntityWrapper oldVersion = oldVersionEntity; - boolean replaced = false; - int iteration = 0; - V session = oldVersion.getEntity(); + private CompletableFuture replace(K key, MergedUpdate task, SessionEntityWrapper oldVersionEntity, long lifespanMs, long maxIdleTimeMs) { + // make one async attempt + return CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replaceAsync(key, oldVersionEntity, generateNewVersionAndWrapEntity(oldVersionEntity.getEntity(), oldVersionEntity.getLocalMetadata()), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS) + .thenAccept(replaced -> { + int iteration = 0; + SessionEntityWrapper oldVersion = oldVersionEntity; + while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { + iteration++; - while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { - iteration++; + V session = oldVersion.getEntity(); + SessionEntityWrapper newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata()); - SessionEntityWrapper newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata()); + // Atomic cluster-aware replace + replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); - // Atomic cluster-aware replace - replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); + // Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again + if (!replaced) { + if (LOG.isDebugEnabled()) { + LOG.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion()); + } + backoff(iteration); - // Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again - if (!replaced) { - if (LOG.isDebugEnabled()) { - LOG.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion()); - } - backoff(iteration); + oldVersion = cache.get(key); - oldVersion = cache.get(key); + if (oldVersion == null) { + LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); + return; + } - if (oldVersion == null) { - LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); - return; + session = oldVersion.getEntity(); + + task.runUpdate(session); + } else { + if (LOG.isTraceEnabled()) { + LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs()); + } + } } - session = oldVersion.getEntity(); - - task.runUpdate(session); - } else { - if (LOG.isTraceEnabled()) { - LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs()); + if (!replaced) { + LOG.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName()); } - } - } - - if (!replaced) { - LOG.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName()); - } - }); + }); } /** @@ -153,6 +150,18 @@ public class EmbeddedCachesChangesPerformer implemen @Override public void applyChanges() { - changes.forEach(Runnable::run); + if (!changes.isEmpty()) { + List exceptions = new ArrayList<>(); + CompletableFuture.allOf(changes.stream().map(s -> s.get().exceptionally(throwable -> { + exceptions.add(throwable); + return null; + })).toArray(CompletableFuture[]::new)).join(); + // If any of those futures has failed, add the exceptions as suppressed exceptions to our runtime exception + if (!exceptions.isEmpty()) { + RuntimeException ex = new RuntimeException("unable to complete the session updates"); + exceptions.forEach(ex::addSuppressed); + throw ex; + } + } } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/JpaChangesPerformer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/JpaChangesPerformer.java index 780db27e30..aad6c2cb41 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/JpaChangesPerformer.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/JpaChangesPerformer.java @@ -18,7 +18,7 @@ package org.keycloak.models.sessions.infinispan.changes; import org.infinispan.util.function.TriConsumer; -import org.keycloak.common.util.Retry; +import org.jboss.logging.Logger; import org.keycloak.models.AuthenticatedClientSessionModel; import org.keycloak.models.ClientModel; import org.keycloak.models.KeycloakSession; @@ -33,11 +33,11 @@ import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessi import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionStore; import org.keycloak.models.sessions.infinispan.entities.SessionEntity; import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; -import org.keycloak.models.utils.KeycloakModelUtils; import org.keycloak.models.utils.RealmModelDelegate; import org.keycloak.models.utils.UserModelDelegate; import org.keycloak.models.utils.UserSessionModelDelegate; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -45,7 +45,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.function.Consumer; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME; import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME; @@ -53,23 +54,29 @@ import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.O import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.USER_SESSION_CACHE_NAME; public class JpaChangesPerformer implements SessionChangesPerformer { + private static final Logger LOG = Logger.getLogger(JpaChangesPerformer.class); - private final KeycloakSession session; private final String cacheName; private final boolean offline; - private final List> changes = new LinkedList<>(); + private final List changes = new LinkedList<>(); private final TriConsumer>, MergedUpdate> processor; + private final ArrayBlockingQueue batchingQueue; - public JpaChangesPerformer(KeycloakSession session, String cacheName, boolean offline) { - this.session = session; + public JpaChangesPerformer(String cacheName, boolean offline, ArrayBlockingQueue batchingQueue) { this.cacheName = cacheName; this.offline = offline; + this.batchingQueue = batchingQueue; processor = processor(); } + @Override + public boolean benefitsFromBatching() { + return true; + } + @Override public void registerChange(Map.Entry> entry, MergedUpdate merged) { - changes.add(innerSession -> processor.accept(innerSession, entry, merged)); + changes.add(new PersistentUpdate(innerSession -> processor.accept(innerSession, entry, merged))); } private TriConsumer>, MergedUpdate> processor() { @@ -80,12 +87,39 @@ public class JpaChangesPerformer implements SessionC }; } + private boolean warningShown = false; + + private void offer(PersistentUpdate update) { + if (!batchingQueue.offer(update)) { + if (!warningShown) { + warningShown = true; + LOG.warn("Queue is full, will block"); + } + try { + // this will block until there is a free spot in the queue + batchingQueue.put(update); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } + @Override public void applyChanges() { - if (changes.size() > 0) { - Retry.executeWithBackoff(iteration -> KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(), - innerSession -> changes.forEach(c -> c.accept(innerSession))), - 10, 10); + if (!changes.isEmpty()) { + changes.forEach(this::offer); + List exceptions = new ArrayList<>(); + CompletableFuture.allOf(changes.stream().map(f -> f.future().exceptionally(throwable -> { + exceptions.add(throwable); + return null; + })).toArray(CompletableFuture[]::new)).join(); + // If any of those futures has failed, add the exceptions as suppressed exceptions to our runtime exception + if (!exceptions.isEmpty()) { + RuntimeException ex = new RuntimeException("unable to complete the session updates"); + exceptions.forEach(ex::addSuppressed); + throw ex; + } } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/MergedUpdate.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/MergedUpdate.java index 2fb323473c..3c44789414 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/MergedUpdate.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/MergedUpdate.java @@ -36,12 +36,6 @@ public class MergedUpdate implements SessionUpdateTask< private CrossDCMessageStatus crossDCMessageStatus; private final long lifespanMs; private final long maxIdleTimeMs; - private boolean isDeferrable; - - @Override - public boolean isDeferrable() { - return isDeferrable; - } private MergedUpdate(CacheOperation operation, CrossDCMessageStatus crossDCMessageStatus, long lifespanMs, long maxIdleTimeMs) { this.operation = operation; @@ -83,11 +77,7 @@ public class MergedUpdate implements SessionUpdateTask< MergedUpdate result = null; S session = sessionWrapper.getEntity(); - boolean isDeferrable = true; for (SessionUpdateTask child : childUpdates) { - if (!child.isDeferrable()) { - isDeferrable = false; - } if (result == null) { CacheOperation operation = child.getOperation(session); @@ -123,20 +113,12 @@ public class MergedUpdate implements SessionUpdateTask< result.childUpdates.add(child); } } - if (result != null) { - result.setDeferable(isDeferrable); - } return result; } - private void setDeferable(boolean isDeferrable) { - this.isDeferrable = isDeferrable; - } - @Override public String toString() { return "MergedUpdate" + childUpdates; } - } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java index 596a746dda..d9cf0027e9 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java @@ -26,8 +26,6 @@ import org.keycloak.models.sessions.infinispan.SessionFunction; import org.keycloak.models.sessions.infinispan.entities.SessionEntity; import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; @@ -41,13 +39,11 @@ public class PersistentSessionsChangelogBasedTransaction> changesPerformers; protected final boolean offline; - private final ArrayBlockingQueue> asyncQueue; - private Collection> batch; - public PersistentSessionsChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey serializer, ArrayBlockingQueue> asyncQueue) { + public PersistentSessionsChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey serializer, + ArrayBlockingQueue batchingQueue) { super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, serializer); this.offline = offline; - this.asyncQueue = asyncQueue; if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) { throw new IllegalStateException("Persistent user sessions are not enabled"); @@ -56,12 +52,12 @@ public class PersistentSessionsChangelogBasedTransaction(session, cache.getName(), offline) + new JpaChangesPerformer<>(cache.getName(), offline, batchingQueue) ); } else { changesPerformers = List.of( - new JpaChangesPerformer<>(session, cache.getName(), offline), - new EmbeddedCachesChangesPerformer<>(cache, serializer), + new JpaChangesPerformer<>(cache.getName(), offline, batchingQueue), + new EmbeddedCachesChangesPerformer<>(cache), new RemoteCachesChangesPerformer<>(session, cache, remoteCacheInvoker) ); } @@ -84,20 +80,10 @@ public class PersistentSessionsChangelogBasedTransaction merged = MergedUpdate.computeUpdate(sessionUpdates.getUpdateTasks(), sessionWrapper, lifespanMs, maxIdleTimeMs); if (merged != null) { - if (merged.isDeferrable()) { - asyncQueue.add(new PersistentDeferredElement<>(entry, merged)); - } else { - changesPerformers.forEach(p -> p.registerChange(entry, merged)); - } + changesPerformers.forEach(p -> p.registerChange(entry, merged)); } } - if (batch != null) { - batch.forEach(o -> { - changesPerformers.forEach(p -> p.registerChange(o.getEntry(), o.getMerged())); - }); - } - changesPerformers.forEach(SessionChangesPerformer::applyChanges); } @@ -106,11 +92,4 @@ public class PersistentSessionsChangelogBasedTransaction> batchToApply) { - if (this.batch == null) { - this.batch = new ArrayList<>(batchToApply.size()); - } - batch.addAll(batchToApply); - } - } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsWorker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsWorker.java index 9c90875950..4d9d0d7e6c 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsWorker.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsWorker.java @@ -18,18 +18,14 @@ package org.keycloak.models.sessions.infinispan.changes; import org.jboss.logging.Logger; +import org.keycloak.common.util.Retry; import org.keycloak.models.KeycloakSessionFactory; -import org.keycloak.models.UserSessionProvider; -import org.keycloak.models.sessions.infinispan.PersistentUserSessionProvider; -import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; -import org.keycloak.models.sessions.infinispan.entities.SessionEntity; -import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; import org.keycloak.models.utils.KeycloakModelUtils; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Collection; import java.util.List; -import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; @@ -42,57 +38,35 @@ public class PersistentSessionsWorker { private static final Logger LOG = Logger.getLogger(PersistentSessionsWorker.class); private final KeycloakSessionFactory factory; - private final ArrayBlockingQueue> asyncQueueUserSessions; - private final ArrayBlockingQueue> asyncQueueUserOfflineSessions; - private final ArrayBlockingQueue> asyncQueueClientSessions; - private final ArrayBlockingQueue> asyncQueueClientOfflineSessions; + private final ArrayBlockingQueue asyncQueuePersistentUpdate; + private final int maxBatchSize; private final List threads = new ArrayList<>(); private volatile boolean stop; - public PersistentSessionsWorker(KeycloakSessionFactory factory, ArrayBlockingQueue> asyncQueueUserSessions, ArrayBlockingQueue> asyncQueueUserOfflineSessions, ArrayBlockingQueue> asyncQueueClientSessions, ArrayBlockingQueue> asyncQueueClientOfflineSessions) { + public PersistentSessionsWorker(KeycloakSessionFactory factory, + ArrayBlockingQueue asyncQueuePersistentUpdate, int maxBatchSize) { this.factory = factory; - this.asyncQueueUserSessions = asyncQueueUserSessions; - this.asyncQueueUserOfflineSessions = asyncQueueUserOfflineSessions; - this.asyncQueueClientSessions = asyncQueueClientSessions; - this.asyncQueueClientOfflineSessions = asyncQueueClientOfflineSessions; + this.asyncQueuePersistentUpdate = asyncQueuePersistentUpdate; + this.maxBatchSize = maxBatchSize; } public void start() { - threads.add(new WorkerUserSession(asyncQueueUserSessions, false)); - threads.add(new WorkerUserSession(asyncQueueUserOfflineSessions, true)); - threads.add(new WorkerClientSession(asyncQueueClientSessions, false)); - threads.add(new WorkerClientSession(asyncQueueClientOfflineSessions, true)); + threads.add(new BatchWorker(asyncQueuePersistentUpdate)); threads.forEach(Thread::start); } - private class WorkerUserSession extends Worker { - public WorkerUserSession(ArrayBlockingQueue> queue, boolean offline) { - super(queue, offline, PersistentUserSessionProvider::processDeferredUserSessionElements); - } - } + private class BatchWorker extends Thread { + private final ArrayBlockingQueue queue; - private class WorkerClientSession extends Worker { - public WorkerClientSession(ArrayBlockingQueue> queue, boolean offline) { - super(queue, offline, PersistentUserSessionProvider::processDeferredClientSessionElements); - } - } - - private class Worker extends Thread { - private final ArrayBlockingQueue> queue; - private final boolean offline; - private final Adapter adapter; - - public Worker(ArrayBlockingQueue> queue, boolean offline, Adapter adapter) { + public BatchWorker(ArrayBlockingQueue queue) { this.queue = queue; - this.offline = offline; - this.adapter = adapter; } public void run() { - Thread.currentThread().setName(this.getClass().getName() + " for " + (offline ? "offline" : "online") + " sessions"); + Thread.currentThread().setName(this.getClass().getName()); while (!stop) { try { - process(queue, offline); + process(queue); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; @@ -100,25 +74,49 @@ public class PersistentSessionsWorker { } } - private void process(ArrayBlockingQueue> queue, boolean offline) throws InterruptedException { - Collection> batch = new ArrayList<>(); - PersistentDeferredElement polled = queue.poll(100, TimeUnit.MILLISECONDS); + private void process(ArrayBlockingQueue queue) throws InterruptedException { + ArrayList batch = new ArrayList<>(); + PersistentUpdate polled = queue.poll(100, TimeUnit.MILLISECONDS); if (polled != null) { batch.add(polled); - queue.drainTo(batch, 99); + queue.drainTo(batch, maxBatchSize - 1); try { LOG.debugf("Processing %d deferred session updates.", batch.size()); - KeycloakModelUtils.runJobInTransaction(factory, - session -> adapter.run(((PersistentUserSessionProvider) session.getProvider(UserSessionProvider.class)), batch, offline)); + Retry.executeWithBackoff(iteration -> { + if (iteration < 2) { + // attempt to write whole batch in the first two attempts + KeycloakModelUtils.runJobInTransaction(factory, + innerSession -> batch.forEach(c -> c.perform(innerSession))); + batch.forEach(PersistentUpdate::complete); + } else { + LOG.warnf("Running single changes in iteration %d for %d entries", iteration, batch.size()); + ArrayList performedChanges = new ArrayList<>(); + List throwables = new ArrayList<>(); + batch.forEach(change -> { + try { + KeycloakModelUtils.runJobInTransaction(factory, + change::perform); + change.complete(); + performedChanges.add(change); + } catch (Throwable ex) { + throwables.add(ex); + } + }); + batch.removeAll(performedChanges); + if (!throwables.isEmpty()) { + RuntimeException ex = new RuntimeException("unable to complete some changes"); + throwables.forEach(ex::addSuppressed); + throw ex; + } + } + }, + Duration.of(10, ChronoUnit.SECONDS), 0); } catch (RuntimeException ex) { - LOG.warnf(ex, "Unable to write %d deferred session updates", queue.size()); + batch.forEach(o -> o.fail(ex)); + LOG.warnf(ex, "Unable to write %d deferred session updates", batch.size()); } } } - - interface Adapter { - void run(PersistentUserSessionProvider sessionProvider, Collection> batch, boolean offline); - } } public void stop() { diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentDeferredElement.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentUpdate.java similarity index 55% rename from model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentDeferredElement.java rename to model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentUpdate.java index 91133104d2..accc5144c8 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentDeferredElement.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentUpdate.java @@ -17,29 +17,38 @@ package org.keycloak.models.sessions.infinispan.changes; -import org.keycloak.models.sessions.infinispan.entities.SessionEntity; +import org.keycloak.models.KeycloakSession; -import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; /** * Capture information for a deferred update of the session stores. * * @author Alexander Schwartz */ -public class PersistentDeferredElement { - private final Map.Entry> entry; - private final MergedUpdate merged; +public class PersistentUpdate { - public PersistentDeferredElement(Map.Entry> entry, MergedUpdate merged) { - this.entry = entry; - this.merged = merged; + private final Consumer task; + private final CompletableFuture future = new CompletableFuture<>(); + + public PersistentUpdate(Consumer task) { + this.task = task; } - public Map.Entry> getEntry() { - return entry; + public void perform(KeycloakSession session) { + task.accept(session); } - public MergedUpdate getMerged() { - return merged; + public void complete() { + future.complete(null); + } + + public void fail(Throwable throwable) { + future.completeExceptionally(throwable); + } + + public CompletableFuture future() { + return future; } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionChangesPerformer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionChangesPerformer.java index 51640c5d14..017e2f747b 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionChangesPerformer.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionChangesPerformer.java @@ -24,5 +24,9 @@ import java.util.Map; public interface SessionChangesPerformer { void registerChange(Map.Entry> entry, MergedUpdate merged); + default boolean benefitsFromBatching() { + return false; + } + void applyChanges(); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionUpdateTask.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionUpdateTask.java index 234d8a312e..4a71507e5d 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionUpdateTask.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionUpdateTask.java @@ -30,10 +30,6 @@ public interface SessionUpdateTask { CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper sessionWrapper); - default boolean isDeferrable() { - return false; - } - enum CacheOperation { ADD, diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java index 99bcf96ac5..ac191a01a3 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java @@ -42,8 +42,9 @@ import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.U public class UserSessionPersistentChangelogBasedTransaction extends PersistentSessionsChangelogBasedTransaction { private static final Logger LOG = Logger.getLogger(UserSessionPersistentChangelogBasedTransaction.class); - public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey serializer, ArrayBlockingQueue> asyncQueue) { - super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer, asyncQueue); + public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey serializer, + ArrayBlockingQueue batchingQueue) { + super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer, batchingQueue); } public SessionEntityWrapper get(RealmModel realm, String key) { @@ -54,10 +55,14 @@ public class UserSessionPersistentChangelogBasedTransaction extends PersistentSe wrappedEntity = cache.get(key); } if (wrappedEntity == null) { + LOG.debugf("user-session not found in cache for sessionId=%s offline=%s, loading from persister", key, offline); wrappedEntity = getSessionEntityFromPersister(realm, key); + } else { + LOG.debugf("user-session found in cache for sessionId=%s offline=%s %s", key, offline, wrappedEntity.getEntity().getLastSessionRefresh()); } if (wrappedEntity == null) { + LOG.debugf("user-session not found in persister for sessionId=%s offline=%s", key, offline); return null; } diff --git a/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java b/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java index 986b4bc633..4b3d376e7f 100644 --- a/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java +++ b/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java @@ -165,13 +165,15 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv if (sessionEntity != null) { em.remove(sessionEntity); - // Remove userSession if it was last clientSession - List clientSessions = getClientSessionsByUserSession(sessionEntity.getUserSessionId(), offline); - if (clientSessions.size() == 0 && offline) { - offlineStr = offlineToString(offline); - PersistentUserSessionEntity userSessionEntity = em.find(PersistentUserSessionEntity.class, new PersistentUserSessionEntity.Key(sessionEntity.getUserSessionId(), offlineStr), LockModeType.PESSIMISTIC_WRITE); - if (userSessionEntity != null) { - em.remove(userSessionEntity); + if (offline) { + // Remove userSession if it was last clientSession + List clientSessions = getClientSessionsByUserSession(sessionEntity.getUserSessionId(), offline); + if (clientSessions.isEmpty()) { + offlineStr = offlineToString(offline); + PersistentUserSessionEntity userSessionEntity = em.find(PersistentUserSessionEntity.class, new PersistentUserSessionEntity.Key(sessionEntity.getUserSessionId(), offlineStr), LockModeType.PESSIMISTIC_WRITE); + if (userSessionEntity != null) { + em.remove(userSessionEntity); + } } } diff --git a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java index 832197d4f9..9bb1ee75f0 100644 --- a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java +++ b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java @@ -211,5 +211,16 @@ public class PersistentClientSessionEntity { result = 31 * result + (this.offline != null ? this.offline.hashCode() : 0); return result; } + + @Override + public String toString() { + return "PersistentClientSessionEntity$Key[" + + "userSessionId='" + userSessionId + '\'' + + ", clientId='" + clientId + '\'' + + ", clientStorageProvider='" + clientStorageProvider + '\'' + + ", externalClientId='" + externalClientId + '\'' + + ", offline='" + offline + '\'' + + ']'; + } } } diff --git a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java index 78198658a9..4facb253db 100644 --- a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java +++ b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java @@ -201,5 +201,13 @@ public class PersistentUserSessionEntity { result = 31 * result + (this.offline != null ? this.offline.hashCode() : 0); return result; } + + @Override + public String toString() { + return "PersistentUserSessionEntity$Key [" + + "userSessionId='" + userSessionId + '\'' + + ", offline='" + offline + '\'' + + ']'; + } } } diff --git a/server-spi/src/main/java/org/keycloak/models/AbstractKeycloakTransaction.java b/server-spi/src/main/java/org/keycloak/models/AbstractKeycloakTransaction.java index a3ea54bcd4..1692bd7ed7 100644 --- a/server-spi/src/main/java/org/keycloak/models/AbstractKeycloakTransaction.java +++ b/server-spi/src/main/java/org/keycloak/models/AbstractKeycloakTransaction.java @@ -26,8 +26,6 @@ import org.jboss.logging.Logger; */ public abstract class AbstractKeycloakTransaction implements KeycloakTransaction { - public static final Logger logger = Logger.getLogger(AbstractKeycloakTransaction.class); - protected TransactionState state = TransactionState.NOT_STARTED; @Override diff --git a/testsuite/model/pom.xml b/testsuite/model/pom.xml index 4dc42c2e60..bfeab7b5e7 100644 --- a/testsuite/model/pom.xml +++ b/testsuite/model/pom.xml @@ -219,6 +219,13 @@ + + jpa+persistentsessions + + Jpa,PersistentUserSessionsNoCache + + + jpa+infinispan+client-storage diff --git a/testsuite/model/src/test/java/org/keycloak/testsuite/model/parameters/PersistentUserSessions.java b/testsuite/model/src/test/java/org/keycloak/testsuite/model/parameters/PersistentUserSessions.java index a1ae1187e5..d3c1e0aa19 100644 --- a/testsuite/model/src/test/java/org/keycloak/testsuite/model/parameters/PersistentUserSessions.java +++ b/testsuite/model/src/test/java/org/keycloak/testsuite/model/parameters/PersistentUserSessions.java @@ -36,6 +36,5 @@ public class PersistentUserSessions extends KeycloakModelParameters { public static void updateConfigForJpa(Config cf) { System.getProperties().put(PropertiesProfileConfigResolver.getPropertyKey(Profile.Feature.PERSISTENT_USER_SESSIONS), "enabled"); - System.getProperties().put(PropertiesProfileConfigResolver.getPropertyKey(Profile.Feature.PERSISTENT_USER_SESSIONS_NO_CACHE), "enabled"); } } diff --git a/testsuite/model/src/test/java/org/keycloak/testsuite/model/parameters/PersistentUserSessionsNoCache.java b/testsuite/model/src/test/java/org/keycloak/testsuite/model/parameters/PersistentUserSessionsNoCache.java new file mode 100644 index 0000000000..818db83cb5 --- /dev/null +++ b/testsuite/model/src/test/java/org/keycloak/testsuite/model/parameters/PersistentUserSessionsNoCache.java @@ -0,0 +1,41 @@ +/* + * Copyright 2020 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.keycloak.testsuite.model.parameters; + +import org.keycloak.common.Profile; +import org.keycloak.common.profile.PropertiesProfileConfigResolver; +import org.keycloak.testsuite.model.Config; +import org.keycloak.testsuite.model.KeycloakModelParameters; + +import java.util.Collections; + +public class PersistentUserSessionsNoCache extends KeycloakModelParameters { + + public PersistentUserSessionsNoCache() { + super(Collections.emptySet(), Collections.emptySet()); + } + + @Override + public void updateConfig(Config cf) { + updateConfigForJpa(cf); + } + + public static void updateConfigForJpa(Config cf) { + System.getProperties().put(PropertiesProfileConfigResolver.getPropertyKey(Profile.Feature.PERSISTENT_USER_SESSIONS), "enabled"); + System.getProperties().put(PropertiesProfileConfigResolver.getPropertyKey(Profile.Feature.PERSISTENT_USER_SESSIONS_NO_CACHE), "enabled"); + } +} diff --git a/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/SessionTimeoutsTest.java b/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/SessionTimeoutsTest.java index 546710a9d7..33749cbd12 100644 --- a/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/SessionTimeoutsTest.java +++ b/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/SessionTimeoutsTest.java @@ -21,6 +21,7 @@ import org.junit.Assert; import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; +import org.keycloak.common.Profile; import org.keycloak.common.util.Retry; import org.keycloak.common.util.Time; import org.keycloak.connections.infinispan.InfinispanConnectionProvider; @@ -34,6 +35,7 @@ import org.keycloak.models.UserModel; import org.keycloak.models.UserSessionModel; import org.keycloak.models.UserProvider; import org.keycloak.models.UserSessionProvider; +import org.keycloak.models.session.UserSessionPersisterProvider; import org.keycloak.protocol.oidc.OIDCConfigAttributes; import org.keycloak.protocol.oidc.OIDCLoginProtocol; import org.keycloak.testsuite.model.HotRodServerRule; @@ -276,20 +278,25 @@ public class SessionTimeoutsTest extends KeycloakModelTest { clientSession.setTimestamp(time); return null; }); - // The persistent session will write the update data asynchronously, wait for it to arrive. - Retry.executeWithBackoff(iteration -> { - withRealm(realmId, (session, realm) -> { - // refresh sessions before user session expires => both session should exist - ClientModel client = realm.getClientByClientId("test-app"); - UserSessionModel userSession = getUserSession(session, realm, sessions[0], offline); - Assert.assertNotNull(userSession); - AuthenticatedClientSessionModel clientSession = userSession.getAuthenticatedClientSessionByClient(client.getId()); - Assert.assertNotNull(clientSession); - Assert.assertEquals(userSession.getLastSessionRefresh(), time); - Assert.assertEquals(clientSession.getTimestamp(), time); - return null; - }); - }, 10, 10); + + if (Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) { + // The persistent session will write the update data asynchronously, wait for it to arrive. + Retry.executeWithBackoff(iteration -> { + withRealm(realmId, (session, realm) -> { + UserSessionPersisterProvider provider = session.getProvider(UserSessionPersisterProvider.class); + UserSessionModel userSessionModel = provider.loadUserSession(realm, sessions[0], offline); + Assert.assertNotNull(userSessionModel); + Assert.assertEquals(userSessionModel.getLastSessionRefresh(), time); + + // refresh sessions before user session expires => both session should exist + ClientModel client = realm.getClientByClientId("test-app"); + AuthenticatedClientSessionModel clientSession = userSessionModel.getAuthenticatedClientSessionByClient(client.getId()); + Assert.assertNotNull(clientSession); + Assert.assertEquals(clientSession.getTimestamp(), time); + return null; + }); + }, 10, 10); + } } offset += 2100;