diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java index 84fc24256c..808ca89899 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java @@ -128,7 +128,11 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider offlineSessionsCache, clientSessionCache, offlineClientSessionsCache, - asyncQueuePersistentUpdate + asyncQueuePersistentUpdate, + serializerSession, + serializerOfflineSession, + serializerClientSession, + serializerOfflineClientSession ); } return new InfinispanUserSessionProvider( diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java index ad0b1cd71d..7b568a4acd 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java @@ -43,6 +43,7 @@ import org.keycloak.models.light.LightweightUserAdapter; import org.keycloak.models.session.UserSessionPersisterProvider; import org.keycloak.models.sessions.infinispan.changes.ClientSessionPersistentChangelogBasedTransaction; import org.keycloak.models.sessions.infinispan.changes.PersistentUpdate; +import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey; import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask; import org.keycloak.models.sessions.infinispan.changes.Tasks; @@ -119,7 +120,11 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi Cache> offlineSessionCache, Cache> clientSessionCache, Cache> offlineClientSessionCache, - ArrayBlockingQueue asyncQueuePersistentUpdate) { + ArrayBlockingQueue asyncQueuePersistentUpdate, + SerializeExecutionsByKey serializerSession, + SerializeExecutionsByKey serializerOfflineSession, + SerializeExecutionsByKey serializerClientSession, + SerializeExecutionsByKey serializerOfflineClientSession) { if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) { throw new IllegalStateException("Persistent user sessions are not enabled"); } @@ -136,7 +141,9 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, SessionTimeouts::getOfflineSessionLifespanMs, SessionTimeouts::getOfflineSessionMaxIdleMs, - asyncQueuePersistentUpdate); + asyncQueuePersistentUpdate, + serializerSession, + serializerOfflineSession); this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, clientSessionCache, offlineClientSessionCache, @@ -144,7 +151,9 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, SessionTimeouts::getOfflineClientSessionLifespanMs, SessionTimeouts::getOfflineClientSessionMaxIdleMs, sessionTx, - asyncQueuePersistentUpdate); + asyncQueuePersistentUpdate, + serializerClientSession, + serializerOfflineClientSession); this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session); diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/ClientSessionPersistentChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/ClientSessionPersistentChangelogBasedTransaction.java index 04c394d594..e3fe20bcb7 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/ClientSessionPersistentChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/ClientSessionPersistentChangelogBasedTransaction.java @@ -19,7 +19,6 @@ package org.keycloak.models.sessions.infinispan.changes; import org.infinispan.Cache; import org.jboss.logging.Logger; -import org.keycloak.common.Profile; import org.keycloak.models.AuthenticatedClientSessionModel; import org.keycloak.models.ClientModel; import org.keycloak.models.KeycloakSession; @@ -53,8 +52,10 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent SessionFunction offlineLifespanMsLoader, SessionFunction offlineMaxIdleTimeMsLoader, UserSessionPersistentChangelogBasedTransaction userSessionTx, - ArrayBlockingQueue batchingQueue) { - super(session, cache, offlineCache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue); + ArrayBlockingQueue batchingQueue, + SerializeExecutionsByKey serializerOnline, + SerializeExecutionsByKey serializerOffline) { + super(session, cache, offlineCache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue, serializerOnline, serializerOffline); this.userSessionTx = userSessionTx; } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java index f3487a2d85..46065d91cb 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java @@ -33,10 +33,12 @@ public class EmbeddedCachesChangesPerformer implemen private static final Logger LOG = Logger.getLogger(EmbeddedCachesChangesPerformer.class); private final Cache> cache; + private final SerializeExecutionsByKey serializer; private final List changes = new LinkedList<>(); - public EmbeddedCachesChangesPerformer(Cache> cache) { + public EmbeddedCachesChangesPerformer(Cache> cache, SerializeExecutionsByKey serializer) { this.cache = cache; + this.serializer = serializer; } private void runOperationInCluster(K key, MergedUpdate task, SessionEntityWrapper sessionWrapper) { @@ -82,33 +84,35 @@ public class EmbeddedCachesChangesPerformer implemen } private void replace(K key, MergedUpdate task, SessionEntityWrapper oldVersionEntity, long lifespanMs, long maxIdleTimeMs) { - SessionEntityWrapper oldVersion = oldVersionEntity; - SessionEntityWrapper returnValue = null; - int iteration = 0; - V session = oldVersion.getEntity(); - var writeCache = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache); - while (iteration++ < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { - SessionEntityWrapper newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata()); - returnValue = writeCache.computeIfPresent(key, new ReplaceFunction<>(oldVersion.getVersion(), newVersionEntity), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); + serializer.runSerialized(key, () -> { + SessionEntityWrapper oldVersion = oldVersionEntity; + SessionEntityWrapper returnValue = null; + int iteration = 0; + V session = oldVersion.getEntity(); + var writeCache = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache); + while (iteration++ < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { + SessionEntityWrapper newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata()); + returnValue = writeCache.computeIfPresent(key, new ReplaceFunction<>(oldVersion.getVersion(), newVersionEntity), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); - if (returnValue == null) { - LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); - return; - } - - if (returnValue.getVersion().equals(newVersionEntity.getVersion())){ - if (LOG.isTraceEnabled()) { - LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs()); + if (returnValue == null) { + LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); + return; } - return; + + if (returnValue.getVersion().equals(newVersionEntity.getVersion())) { + if (LOG.isTraceEnabled()) { + LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs()); + } + return; + } + + oldVersion = returnValue; + session = oldVersion.getEntity(); + task.runUpdate(session); } - oldVersion = returnValue; - session = oldVersion.getEntity(); - task.runUpdate(session); - } - - LOG.warnf("Failed to replace entity '%s' in cache '%s'. Expected: %s, Current: %s", key, cache.getName(), oldVersion, returnValue); + LOG.warnf("Failed to replace entity '%s' in cache '%s'. Expected: %s, Current: %s", key, cache.getName(), oldVersion, returnValue); + }); } private SessionEntityWrapper generateNewVersionAndWrapEntity(V entity, Map localMetadata) { diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java index f6d5a66aa1..23d3189541 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java @@ -61,7 +61,9 @@ abstract public class PersistentSessionsChangelogBasedTransaction maxIdleTimeMsLoader, SessionFunction offlineLifespanMsLoader, SessionFunction offlineMaxIdleTimeMsLoader, - ArrayBlockingQueue batchingQueue) { + ArrayBlockingQueue batchingQueue, + SerializeExecutionsByKey serializerOnline, + SerializeExecutionsByKey serializerOffline) { kcSession = session; if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) { @@ -79,13 +81,13 @@ abstract public class PersistentSessionsChangelogBasedTransaction(cache.getName(), batchingQueue), - new EmbeddedCachesChangesPerformer<>(cache) { + new EmbeddedCachesChangesPerformer<>(cache, serializerOnline) { @Override public boolean shouldConsumeChange(V entity) { return !entity.isOffline(); } }, - new EmbeddedCachesChangesPerformer<>(offlineCache){ + new EmbeddedCachesChangesPerformer<>(offlineCache, serializerOffline){ @Override public boolean shouldConsumeChange(V entity) { return entity.isOffline(); diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java index 3534e280cd..874b04e6c9 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java @@ -19,7 +19,6 @@ package org.keycloak.models.sessions.infinispan.changes; import org.infinispan.Cache; import org.jboss.logging.Logger; -import org.keycloak.common.Profile; import org.keycloak.models.KeycloakSession; import org.keycloak.models.RealmModel; import org.keycloak.models.UserSessionModel; @@ -45,8 +44,10 @@ public class UserSessionPersistentChangelogBasedTransaction extends PersistentSe SessionFunction maxIdleTimeMsLoader, SessionFunction offlineLifespanMsLoader, SessionFunction offlineMaxIdleTimeMsLoader, - ArrayBlockingQueue batchingQueue) { - super(session, cache, offlineCache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue); + ArrayBlockingQueue batchingQueue, + SerializeExecutionsByKey serializerOnline, + SerializeExecutionsByKey serializerOffline) { + super(session, cache, offlineCache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue, serializerOnline, serializerOffline); } public SessionEntityWrapper get(RealmModel realm, String key, boolean offline) {