Avoid conflicts when writing to session stores by checking for concurrent requests within the JVM (#29393)

Signed-off-by: Alexander Schwartz <aschwart@redhat.com>
Signed-off-by: Michal Hajas <mhajas@redhat.com>
Co-authored-by: Michal Hajas <mhajas@redhat.com>
This commit is contained in:
Alexander Schwartz 2024-05-09 10:24:43 +02:00 committed by GitHub
parent 741cb2ab1e
commit eaeffe95ac
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 58 additions and 37 deletions

View file

@ -128,7 +128,11 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
offlineSessionsCache,
clientSessionCache,
offlineClientSessionsCache,
asyncQueuePersistentUpdate
asyncQueuePersistentUpdate,
serializerSession,
serializerOfflineSession,
serializerClientSession,
serializerOfflineClientSession
);
}
return new InfinispanUserSessionProvider(

View file

@ -43,6 +43,7 @@ import org.keycloak.models.light.LightweightUserAdapter;
import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.models.sessions.infinispan.changes.ClientSessionPersistentChangelogBasedTransaction;
import org.keycloak.models.sessions.infinispan.changes.PersistentUpdate;
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
import org.keycloak.models.sessions.infinispan.changes.Tasks;
@ -119,7 +120,11 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineSessionCache,
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionCache,
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionCache,
ArrayBlockingQueue<PersistentUpdate> asyncQueuePersistentUpdate) {
ArrayBlockingQueue<PersistentUpdate> asyncQueuePersistentUpdate,
SerializeExecutionsByKey<String> serializerSession,
SerializeExecutionsByKey<String> serializerOfflineSession,
SerializeExecutionsByKey<UUID> serializerClientSession,
SerializeExecutionsByKey<UUID> serializerOfflineClientSession) {
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
throw new IllegalStateException("Persistent user sessions are not enabled");
}
@ -136,7 +141,9 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
remoteCacheInvoker,
SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs,
SessionTimeouts::getOfflineSessionLifespanMs, SessionTimeouts::getOfflineSessionMaxIdleMs,
asyncQueuePersistentUpdate);
asyncQueuePersistentUpdate,
serializerSession,
serializerOfflineSession);
this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session,
clientSessionCache, offlineClientSessionCache,
@ -144,7 +151,9 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs,
SessionTimeouts::getOfflineClientSessionLifespanMs, SessionTimeouts::getOfflineClientSessionMaxIdleMs,
sessionTx,
asyncQueuePersistentUpdate);
asyncQueuePersistentUpdate,
serializerClientSession,
serializerOfflineClientSession);
this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session);

View file

@ -19,7 +19,6 @@ package org.keycloak.models.sessions.infinispan.changes;
import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.keycloak.common.Profile;
import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
import org.keycloak.models.KeycloakSession;
@ -53,8 +52,10 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
SessionFunction<AuthenticatedClientSessionEntity> offlineLifespanMsLoader,
SessionFunction<AuthenticatedClientSessionEntity> offlineMaxIdleTimeMsLoader,
UserSessionPersistentChangelogBasedTransaction userSessionTx,
ArrayBlockingQueue<PersistentUpdate> batchingQueue) {
super(session, cache, offlineCache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue);
ArrayBlockingQueue<PersistentUpdate> batchingQueue,
SerializeExecutionsByKey<UUID> serializerOnline,
SerializeExecutionsByKey<UUID> serializerOffline) {
super(session, cache, offlineCache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue, serializerOnline, serializerOffline);
this.userSessionTx = userSessionTx;
}

View file

@ -33,10 +33,12 @@ public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implemen
private static final Logger LOG = Logger.getLogger(EmbeddedCachesChangesPerformer.class);
private final Cache<K, SessionEntityWrapper<V>> cache;
private final SerializeExecutionsByKey<K> serializer;
private final List<Runnable> changes = new LinkedList<>();
public EmbeddedCachesChangesPerformer(Cache<K, SessionEntityWrapper<V>> cache) {
public EmbeddedCachesChangesPerformer(Cache<K, SessionEntityWrapper<V>> cache, SerializeExecutionsByKey<K> serializer) {
this.cache = cache;
this.serializer = serializer;
}
private void runOperationInCluster(K key, MergedUpdate<V> task, SessionEntityWrapper<V> sessionWrapper) {
@ -82,33 +84,35 @@ public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implemen
}
private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) {
SessionEntityWrapper<V> oldVersion = oldVersionEntity;
SessionEntityWrapper<V> returnValue = null;
int iteration = 0;
V session = oldVersion.getEntity();
var writeCache = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache);
while (iteration++ < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
returnValue = writeCache.computeIfPresent(key, new ReplaceFunction<>(oldVersion.getVersion(), newVersionEntity), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
serializer.runSerialized(key, () -> {
SessionEntityWrapper<V> oldVersion = oldVersionEntity;
SessionEntityWrapper<V> returnValue = null;
int iteration = 0;
V session = oldVersion.getEntity();
var writeCache = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache);
while (iteration++ < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
returnValue = writeCache.computeIfPresent(key, new ReplaceFunction<>(oldVersion.getVersion(), newVersionEntity), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
if (returnValue == null) {
LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
return;
}
if (returnValue.getVersion().equals(newVersionEntity.getVersion())){
if (LOG.isTraceEnabled()) {
LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
if (returnValue == null) {
LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
return;
}
return;
if (returnValue.getVersion().equals(newVersionEntity.getVersion())) {
if (LOG.isTraceEnabled()) {
LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
}
return;
}
oldVersion = returnValue;
session = oldVersion.getEntity();
task.runUpdate(session);
}
oldVersion = returnValue;
session = oldVersion.getEntity();
task.runUpdate(session);
}
LOG.warnf("Failed to replace entity '%s' in cache '%s'. Expected: %s, Current: %s", key, cache.getName(), oldVersion, returnValue);
LOG.warnf("Failed to replace entity '%s' in cache '%s'. Expected: %s, Current: %s", key, cache.getName(), oldVersion, returnValue);
});
}
private SessionEntityWrapper<V> generateNewVersionAndWrapEntity(V entity, Map<String, String> localMetadata) {

View file

@ -61,7 +61,9 @@ abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends S
SessionFunction<V> maxIdleTimeMsLoader,
SessionFunction<V> offlineLifespanMsLoader,
SessionFunction<V> offlineMaxIdleTimeMsLoader,
ArrayBlockingQueue<PersistentUpdate> batchingQueue) {
ArrayBlockingQueue<PersistentUpdate> batchingQueue,
SerializeExecutionsByKey<K> serializerOnline,
SerializeExecutionsByKey<K> serializerOffline) {
kcSession = session;
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
@ -79,13 +81,13 @@ abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends S
changesPerformers = List.of(
new JpaChangesPerformer<>(cache.getName(), batchingQueue),
new EmbeddedCachesChangesPerformer<>(cache) {
new EmbeddedCachesChangesPerformer<>(cache, serializerOnline) {
@Override
public boolean shouldConsumeChange(V entity) {
return !entity.isOffline();
}
},
new EmbeddedCachesChangesPerformer<>(offlineCache){
new EmbeddedCachesChangesPerformer<>(offlineCache, serializerOffline){
@Override
public boolean shouldConsumeChange(V entity) {
return entity.isOffline();

View file

@ -19,7 +19,6 @@ package org.keycloak.models.sessions.infinispan.changes;
import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.keycloak.common.Profile;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserSessionModel;
@ -45,8 +44,10 @@ public class UserSessionPersistentChangelogBasedTransaction extends PersistentSe
SessionFunction<UserSessionEntity> maxIdleTimeMsLoader,
SessionFunction<UserSessionEntity> offlineLifespanMsLoader,
SessionFunction<UserSessionEntity> offlineMaxIdleTimeMsLoader,
ArrayBlockingQueue<PersistentUpdate> batchingQueue) {
super(session, cache, offlineCache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue);
ArrayBlockingQueue<PersistentUpdate> batchingQueue,
SerializeExecutionsByKey<String> serializerOnline,
SerializeExecutionsByKey<String> serializerOffline) {
super(session, cache, offlineCache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue, serializerOnline, serializerOffline);
}
public SessionEntityWrapper<UserSessionEntity> get(RealmModel realm, String key, boolean offline) {