From dc18bd4efbaa9e9afcc054261acb771aeae62ca3 Mon Sep 17 00:00:00 2001 From: Alexander Schwartz Date: Wed, 3 Apr 2024 10:55:33 +0200 Subject: [PATCH] Avoid conflicts when writing to session stores by checking for concurrent requests within the JVM Closes #28388 Signed-off-by: Alexander Schwartz --- .../cache/infinispan/RealmCacheManager.java | 4 +- .../InfinispanUserLoginFailureProvider.java | 6 ++- ...nispanUserLoginFailureProviderFactory.java | 6 ++- .../InfinispanUserSessionProvider.java | 15 ++++-- .../InfinispanUserSessionProviderFactory.java | 17 ++++++- .../PersistentUserSessionProvider.java | 15 ++++-- ...onPersistentChangelogBasedTransaction.java | 4 +- .../EmbeddedCachesChangesPerformer.java | 25 ++++++---- .../InfinispanChangelogBasedTransaction.java | 26 ++++++----- ...tentSessionsChangelogBasedTransaction.java | 6 +-- .../changes/SerializeExecutionsByKey.java | 46 +++++++++++++++++++ ...onPersistentChangelogBasedTransaction.java | 4 +- 12 files changed, 129 insertions(+), 45 deletions(-) create mode 100644 model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SerializeExecutionsByKey.java diff --git a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/RealmCacheManager.java b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/RealmCacheManager.java index ae2633fe1c..5d5d3c8c7b 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/RealmCacheManager.java +++ b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/RealmCacheManager.java @@ -136,9 +136,11 @@ public class RealmCacheManager extends CacheManager { public T computeSerialized(KeycloakSession session, String id, BiFunction compute) { // this locking is only to ensure that if there is a computation for the same id in the "synchronized" block below, // it will have the same object instance to lock the current execution until the other is finished. - Object lock = cacheInteractions.computeIfAbsent(id, s -> id); + String lock = cacheInteractions.computeIfAbsent(id, s -> id); try { synchronized (lock) { + // in case the previous thread has removed the entry in the finally block + cacheInteractions.putIfAbsent(id, lock); return compute.apply(id, session); } } finally { diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserLoginFailureProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserLoginFailureProvider.java index 78fcffc4b4..c51f2363ca 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserLoginFailureProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserLoginFailureProvider.java @@ -25,6 +25,7 @@ import org.keycloak.models.RealmModel; import org.keycloak.models.UserLoginFailureModel; import org.keycloak.models.UserSessionModel; import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction; +import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey; import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask; import org.keycloak.models.sessions.infinispan.changes.Tasks; @@ -59,10 +60,11 @@ public class InfinispanUserLoginFailureProvider implements UserLoginFailureProvi public InfinispanUserLoginFailureProvider(KeycloakSession session, RemoteCacheInvoker remoteCacheInvoker, - Cache> loginFailureCache) { + Cache> loginFailureCache, + SerializeExecutionsByKey serializer) { this.session = session; this.loginFailureCache = loginFailureCache; - this.loginFailuresTx = new InfinispanChangelogBasedTransaction<>(session, loginFailureCache, remoteCacheInvoker, SessionTimeouts::getLoginFailuresLifespanMs, SessionTimeouts::getLoginFailuresMaxIdleMs); + this.loginFailuresTx = new InfinispanChangelogBasedTransaction<>(session, loginFailureCache, remoteCacheInvoker, SessionTimeouts::getLoginFailuresLifespanMs, SessionTimeouts::getLoginFailuresMaxIdleMs, serializer); this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session); session.getTransactionManager().enlistAfterCompletion(clusterEventsSenderTx); diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserLoginFailureProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserLoginFailureProviderFactory.java index 74ddf64b0a..1aeac3f9f9 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserLoginFailureProviderFactory.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserLoginFailureProviderFactory.java @@ -24,7 +24,6 @@ import org.keycloak.Config; import org.keycloak.cluster.ClusterProvider; import org.keycloak.common.util.Time; import org.keycloak.connections.infinispan.InfinispanConnectionProvider; -import org.keycloak.models.ClientModel; import org.keycloak.models.KeycloakSession; import org.keycloak.models.KeycloakSessionFactory; import org.keycloak.models.KeycloakSessionTask; @@ -32,6 +31,7 @@ import org.keycloak.models.UserLoginFailureProvider; import org.keycloak.models.UserLoginFailureProviderFactory; import org.keycloak.models.RealmModel; import org.keycloak.models.UserModel; +import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey; import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity; import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey; @@ -50,6 +50,7 @@ import org.keycloak.models.utils.PostMigrationEvent; import java.io.Serializable; import java.util.Set; + import static org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionProviderFactory.PROVIDER_PRIORITY; /** @@ -68,13 +69,14 @@ public class InfinispanUserLoginFailureProviderFactory implements UserLoginFailu private Config.Scope config; private RemoteCacheInvoker remoteCacheInvoker; + SerializeExecutionsByKey serializer = new SerializeExecutionsByKey<>(); @Override public UserLoginFailureProvider create(KeycloakSession session) { InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class); Cache> loginFailures = connections.getCache(InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME); - return new InfinispanUserLoginFailureProvider(session, remoteCacheInvoker, loginFailures); + return new InfinispanUserLoginFailureProvider(session, remoteCacheInvoker, loginFailures, serializer); } @Override diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java index 442d861c6a..20ea25ebcc 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java @@ -40,6 +40,7 @@ import org.keycloak.models.UserProvider; import org.keycloak.models.UserSessionModel; import org.keycloak.models.UserSessionProvider; import org.keycloak.models.session.UserSessionPersisterProvider; +import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey; import org.keycloak.models.sessions.infinispan.changes.Tasks; import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore; import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore; @@ -130,7 +131,11 @@ public class InfinispanUserSessionProvider implements UserSessionProvider, Sessi Cache> clientSessionCache, Cache> offlineClientSessionCache, SessionFunction offlineSessionCacheEntryLifespanAdjuster, - SessionFunction offlineClientSessionCacheEntryLifespanAdjuster) { + SessionFunction offlineClientSessionCacheEntryLifespanAdjuster, + SerializeExecutionsByKey serializerSession, + SerializeExecutionsByKey serializerOfflineSession, + SerializeExecutionsByKey serializerClientSession, + SerializeExecutionsByKey serializerOfflineClientSession) { this.session = session; this.sessionCache = sessionCache; @@ -138,10 +143,10 @@ public class InfinispanUserSessionProvider implements UserSessionProvider, Sessi this.offlineSessionCache = offlineSessionCache; this.offlineClientSessionCache = offlineClientSessionCache; - this.sessionTx = new InfinispanChangelogBasedTransaction<>(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs); - this.offlineSessionTx = new InfinispanChangelogBasedTransaction<>(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs); - this.clientSessionTx = new InfinispanChangelogBasedTransaction<>(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs); - this.offlineClientSessionTx = new InfinispanChangelogBasedTransaction<>(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs); + this.sessionTx = new InfinispanChangelogBasedTransaction<>(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, serializerSession); + this.offlineSessionTx = new InfinispanChangelogBasedTransaction<>(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, serializerOfflineSession); + this.clientSessionTx = new InfinispanChangelogBasedTransaction<>(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, serializerClientSession); + this.offlineClientSessionTx = new InfinispanChangelogBasedTransaction<>(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, serializerOfflineClientSession); this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session); diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java index 9f79f4c733..ed52ed7aab 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java @@ -35,6 +35,7 @@ import org.keycloak.models.RealmModel; import org.keycloak.models.UserModel; import org.keycloak.models.UserSessionProvider; import org.keycloak.models.UserSessionProviderFactory; +import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey; import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore; import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory; import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore; @@ -93,6 +94,10 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider private CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore; private PersisterLastSessionRefreshStore persisterLastSessionRefreshStore; private InfinispanKeyGenerator keyGenerator; + SerializeExecutionsByKey serializerSession = new SerializeExecutionsByKey<>(); + SerializeExecutionsByKey serializerOfflineSession = new SerializeExecutionsByKey<>(); + SerializeExecutionsByKey serializerClientSession = new SerializeExecutionsByKey<>(); + SerializeExecutionsByKey serializerOfflineClientSession = new SerializeExecutionsByKey<>(); @Override public UserSessionProvider create(KeycloakSession session) { @@ -115,7 +120,11 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider clientSessionCache, offlineClientSessionsCache, this::deriveOfflineSessionCacheEntryLifespanMs, - this::deriveOfflineClientSessionCacheEntryLifespanOverrideMs + this::deriveOfflineClientSessionCacheEntryLifespanOverrideMs, + serializerSession, + serializerOfflineSession, + serializerClientSession, + serializerOfflineClientSession ); } return new InfinispanUserSessionProvider( @@ -130,7 +139,11 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider clientSessionCache, offlineClientSessionsCache, this::deriveOfflineSessionCacheEntryLifespanMs, - this::deriveOfflineClientSessionCacheEntryLifespanOverrideMs + this::deriveOfflineClientSessionCacheEntryLifespanOverrideMs, + serializerSession, + serializerOfflineSession, + serializerClientSession, + serializerOfflineClientSession ); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java index 936811e442..fcbdd5579a 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java @@ -43,6 +43,7 @@ import org.keycloak.models.light.LightweightUserAdapter; import org.keycloak.models.session.UserSessionPersisterProvider; import org.keycloak.models.sessions.infinispan.changes.ClientSessionPersistentChangelogBasedTransaction; import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction; +import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey; import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask; import org.keycloak.models.sessions.infinispan.changes.Tasks; @@ -128,7 +129,11 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi Cache> clientSessionCache, Cache> offlineClientSessionCache, SessionFunction offlineSessionCacheEntryLifespanAdjuster, - SessionFunction offlineClientSessionCacheEntryLifespanAdjuster) { + SessionFunction offlineClientSessionCacheEntryLifespanAdjuster, + SerializeExecutionsByKey serializerSession, + SerializeExecutionsByKey serializerOfflineSession, + SerializeExecutionsByKey serializerClientSession, + SerializeExecutionsByKey serializerOfflineClientSession) { if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) { throw new IllegalStateException("Persistent user sessions are not enabled"); } @@ -140,11 +145,11 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi this.offlineSessionCache = offlineSessionCache; this.offlineClientSessionCache = offlineClientSessionCache; - this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, false); - this.offlineSessionTx = new UserSessionPersistentChangelogBasedTransaction(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, true); + this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, false, serializerSession); + this.offlineSessionTx = new UserSessionPersistentChangelogBasedTransaction(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, true, serializerOfflineSession); - this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, false, keyGenerator, sessionTx); - this.offlineClientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, true, keyGenerator, offlineSessionTx); + this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, false, keyGenerator, sessionTx, serializerClientSession); + this.offlineClientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, true, keyGenerator, offlineSessionTx, serializerOfflineClientSession); this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session); diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/ClientSessionPersistentChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/ClientSessionPersistentChangelogBasedTransaction.java index 943f3d1261..640a212a8d 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/ClientSessionPersistentChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/ClientSessionPersistentChangelogBasedTransaction.java @@ -44,8 +44,8 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent private final InfinispanKeyGenerator keyGenerator; private final UserSessionPersistentChangelogBasedTransaction userSessionTx; - public ClientSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, InfinispanKeyGenerator keyGenerator, UserSessionPersistentChangelogBasedTransaction userSessionTx) { - super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline); + public ClientSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, InfinispanKeyGenerator keyGenerator, UserSessionPersistentChangelogBasedTransaction userSessionTx, SerializeExecutionsByKey serializer) { + super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer); this.keyGenerator = keyGenerator; this.userSessionTx = userSessionTx; } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java index fbd53bc94f..8d847c7851 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java @@ -27,16 +27,19 @@ import org.keycloak.models.sessions.infinispan.entities.SessionEntity; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.TimeUnit; public class EmbeddedCachesChangesPerformer implements SessionChangesPerformer { private static final Logger LOG = Logger.getLogger(EmbeddedCachesChangesPerformer.class); private final Cache> cache; + private final SerializeExecutionsByKey serializer; private final List changes = new LinkedList<>(); - public EmbeddedCachesChangesPerformer(Cache> cache) { + public EmbeddedCachesChangesPerformer(Cache> cache, SerializeExecutionsByKey serializer) { this.cache = cache; + this.serializer = serializer; } private void runOperationInCluster(K key, MergedUpdate task, SessionEntityWrapper sessionWrapper) { @@ -83,37 +86,39 @@ public class EmbeddedCachesChangesPerformer implemen } private void replace(K key, MergedUpdate task, SessionEntityWrapper oldVersionEntity, long lifespanMs, long maxIdleTimeMs) { + serializer.runSerialized(key, () -> { + SessionEntityWrapper oldVersion = oldVersionEntity; boolean replaced = false; int iteration = 0; - V session = oldVersionEntity.getEntity(); + V session = oldVersion.getEntity(); while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { iteration++; - SessionEntityWrapper newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersionEntity.getLocalMetadata()); + SessionEntityWrapper newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata()); // Atomic cluster-aware replace - replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersionEntity, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); + replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); // Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again if (!replaced) { if (LOG.isDebugEnabled()) { - LOG.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersionEntity.getVersion(), newVersionEntity.getVersion()); + LOG.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion()); } - oldVersionEntity = cache.get(key); + oldVersion = cache.get(key); - if (oldVersionEntity == null) { + if (oldVersion == null) { LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); return; } - session = oldVersionEntity.getEntity(); + session = oldVersion.getEntity(); task.runUpdate(session); } else { if (LOG.isTraceEnabled()) { - LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersionEntity.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs()); + LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs()); } } } @@ -121,7 +126,7 @@ public class EmbeddedCachesChangesPerformer implemen if (!replaced) { LOG.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName()); } - + }); } private SessionEntityWrapper generateNewVersionAndWrapEntity(V entity, Map localMetadata) { diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java index c7585abbd8..47ade84a35 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java @@ -20,6 +20,7 @@ package org.keycloak.models.sessions.infinispan.changes; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Random; import java.util.concurrent.TimeUnit; import org.infinispan.Cache; @@ -55,15 +56,17 @@ public class InfinispanChangelogBasedTransaction ext protected final SessionFunction lifespanMsLoader; protected final SessionFunction maxIdleTimeMsLoader; + private final SerializeExecutionsByKey serializer; public InfinispanChangelogBasedTransaction(KeycloakSession kcSession, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, - SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader) { + SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, SerializeExecutionsByKey serializer) { this.kcSession = kcSession; this.cacheName = cache.getName(); this.cache = cache; this.remoteCacheInvoker = remoteCacheInvoker; this.lifespanMsLoader = lifespanMsLoader; this.maxIdleTimeMsLoader = maxIdleTimeMsLoader; + this.serializer = serializer; } @@ -231,39 +234,40 @@ public class InfinispanChangelogBasedTransaction ext } - private void replace(K key, MergedUpdate task, SessionEntityWrapper oldVersionEntity, long lifespanMs, long maxIdleTimeMs) { + serializer.runSerialized(key, () -> { + SessionEntityWrapper oldVersion = oldVersionEntity; boolean replaced = false; int iteration = 0; - V session = oldVersionEntity.getEntity(); + V session = oldVersion.getEntity(); while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { iteration++; - SessionEntityWrapper newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersionEntity.getLocalMetadata()); + SessionEntityWrapper newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata()); // Atomic cluster-aware replace - replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersionEntity, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); + replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); // Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again if (!replaced) { if (logger.isDebugEnabled()) { - logger.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersionEntity.getVersion(), newVersionEntity.getVersion()); + logger.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion()); } - oldVersionEntity = cache.get(key); + oldVersion = cache.get(key); - if (oldVersionEntity == null) { + if (oldVersion == null) { logger.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); return; } - session = oldVersionEntity.getEntity(); + session = oldVersion.getEntity(); task.runUpdate(session); } else { if (logger.isTraceEnabled()) { - logger.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersionEntity.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs()); + logger.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs()); } } } @@ -271,7 +275,7 @@ public class InfinispanChangelogBasedTransaction ext if (!replaced) { logger.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName()); } - + }); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java index fbec388528..9007ce3b11 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java @@ -37,8 +37,8 @@ public class PersistentSessionsChangelogBasedTransaction> changesPerformers; protected final boolean offline; - public PersistentSessionsChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline) { - super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader); + public PersistentSessionsChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey serializer) { + super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, serializer); this.offline = offline; if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) { @@ -52,7 +52,7 @@ public class PersistentSessionsChangelogBasedTransaction(session, cache.getName(), offline), - new EmbeddedCachesChangesPerformer<>(cache), + new EmbeddedCachesChangesPerformer<>(cache, serializer), new RemoteCachesChangesPerformer<>(session, cache, remoteCacheInvoker) ); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SerializeExecutionsByKey.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SerializeExecutionsByKey.java new file mode 100644 index 0000000000..8328884040 --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SerializeExecutionsByKey.java @@ -0,0 +1,46 @@ +/* + * Copyright 2024 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.models.sessions.infinispan.changes; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * Adding an in-JVM lock to prevent a best-effort concurrent executions for the same ID. + * This should prevent a burst of requests by letting only the first request pass, and then the others will follow one-by-one. + * Use this when the code wrapped by runSerialized is known to produce conflicts when run concurrently with the same ID. + * + * @author Alexander Schwartz + */ +public class SerializeExecutionsByKey { + private final ConcurrentHashMap cacheInteractions = new ConcurrentHashMap<>(); + + public void runSerialized(K key, Runnable task) { + // this locking is only to ensure that if there is a computation for the same id in the "synchronized" block below, + // it will have the same object instance to lock the current execution until the other is finished. + K lock = cacheInteractions.computeIfAbsent(key, s -> key); + try { + synchronized (lock) { + // in case the previous thread has removed the entry in the finally block + cacheInteractions.putIfAbsent(key, lock); + task.run(); + } + } finally { + cacheInteractions.remove(lock); + } + } +} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java index adc8fa2b6c..12ecd2d900 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java @@ -40,8 +40,8 @@ import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.U public class UserSessionPersistentChangelogBasedTransaction extends PersistentSessionsChangelogBasedTransaction { private static final Logger LOG = Logger.getLogger(UserSessionPersistentChangelogBasedTransaction.class); - public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline) { - super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline); + public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey serializer) { + super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer); } public SessionEntityWrapper get(RealmModel realm, String key) {