Avoid conflicts when writing to session stores by checking for concurrent requests within the JVM
Closes #28388 Signed-off-by: Alexander Schwartz <aschwart@redhat.com>
This commit is contained in:
parent
8140c76147
commit
dc18bd4efb
12 changed files with 129 additions and 45 deletions
|
@ -136,9 +136,11 @@ public class RealmCacheManager extends CacheManager {
|
|||
public <T> T computeSerialized(KeycloakSession session, String id, BiFunction<String, KeycloakSession, T> compute) {
|
||||
// this locking is only to ensure that if there is a computation for the same id in the "synchronized" block below,
|
||||
// it will have the same object instance to lock the current execution until the other is finished.
|
||||
Object lock = cacheInteractions.computeIfAbsent(id, s -> id);
|
||||
String lock = cacheInteractions.computeIfAbsent(id, s -> id);
|
||||
try {
|
||||
synchronized (lock) {
|
||||
// in case the previous thread has removed the entry in the finally block
|
||||
cacheInteractions.putIfAbsent(id, lock);
|
||||
return compute.apply(id, session);
|
||||
}
|
||||
} finally {
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.keycloak.models.RealmModel;
|
|||
import org.keycloak.models.UserLoginFailureModel;
|
||||
import org.keycloak.models.UserSessionModel;
|
||||
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
|
||||
import org.keycloak.models.sessions.infinispan.changes.Tasks;
|
||||
|
@ -59,10 +60,11 @@ public class InfinispanUserLoginFailureProvider implements UserLoginFailureProvi
|
|||
|
||||
public InfinispanUserLoginFailureProvider(KeycloakSession session,
|
||||
RemoteCacheInvoker remoteCacheInvoker,
|
||||
Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> loginFailureCache) {
|
||||
Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> loginFailureCache,
|
||||
SerializeExecutionsByKey<LoginFailureKey> serializer) {
|
||||
this.session = session;
|
||||
this.loginFailureCache = loginFailureCache;
|
||||
this.loginFailuresTx = new InfinispanChangelogBasedTransaction<>(session, loginFailureCache, remoteCacheInvoker, SessionTimeouts::getLoginFailuresLifespanMs, SessionTimeouts::getLoginFailuresMaxIdleMs);
|
||||
this.loginFailuresTx = new InfinispanChangelogBasedTransaction<>(session, loginFailureCache, remoteCacheInvoker, SessionTimeouts::getLoginFailuresLifespanMs, SessionTimeouts::getLoginFailuresMaxIdleMs, serializer);
|
||||
this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session);
|
||||
|
||||
session.getTransactionManager().enlistAfterCompletion(clusterEventsSenderTx);
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.keycloak.Config;
|
|||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.common.util.Time;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||
import org.keycloak.models.ClientModel;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakSessionFactory;
|
||||
import org.keycloak.models.KeycloakSessionTask;
|
||||
|
@ -32,6 +31,7 @@ import org.keycloak.models.UserLoginFailureProvider;
|
|||
import org.keycloak.models.UserLoginFailureProviderFactory;
|
||||
import org.keycloak.models.RealmModel;
|
||||
import org.keycloak.models.UserModel;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
||||
import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
|
||||
import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
|
||||
|
@ -50,6 +50,7 @@ import org.keycloak.models.utils.PostMigrationEvent;
|
|||
|
||||
import java.io.Serializable;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionProviderFactory.PROVIDER_PRIORITY;
|
||||
|
||||
/**
|
||||
|
@ -68,13 +69,14 @@ public class InfinispanUserLoginFailureProviderFactory implements UserLoginFailu
|
|||
private Config.Scope config;
|
||||
|
||||
private RemoteCacheInvoker remoteCacheInvoker;
|
||||
SerializeExecutionsByKey<LoginFailureKey> serializer = new SerializeExecutionsByKey<>();
|
||||
|
||||
@Override
|
||||
public UserLoginFailureProvider create(KeycloakSession session) {
|
||||
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
|
||||
Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> loginFailures = connections.getCache(InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME);
|
||||
|
||||
return new InfinispanUserLoginFailureProvider(session, remoteCacheInvoker, loginFailures);
|
||||
return new InfinispanUserLoginFailureProvider(session, remoteCacheInvoker, loginFailures, serializer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.keycloak.models.UserProvider;
|
|||
import org.keycloak.models.UserSessionModel;
|
||||
import org.keycloak.models.UserSessionProvider;
|
||||
import org.keycloak.models.session.UserSessionPersisterProvider;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
|
||||
import org.keycloak.models.sessions.infinispan.changes.Tasks;
|
||||
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
|
||||
import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore;
|
||||
|
@ -130,7 +131,11 @@ public class InfinispanUserSessionProvider implements UserSessionProvider, Sessi
|
|||
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionCache,
|
||||
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionCache,
|
||||
SessionFunction<UserSessionEntity> offlineSessionCacheEntryLifespanAdjuster,
|
||||
SessionFunction<AuthenticatedClientSessionEntity> offlineClientSessionCacheEntryLifespanAdjuster) {
|
||||
SessionFunction<AuthenticatedClientSessionEntity> offlineClientSessionCacheEntryLifespanAdjuster,
|
||||
SerializeExecutionsByKey<String> serializerSession,
|
||||
SerializeExecutionsByKey<String> serializerOfflineSession,
|
||||
SerializeExecutionsByKey<UUID> serializerClientSession,
|
||||
SerializeExecutionsByKey<UUID> serializerOfflineClientSession) {
|
||||
this.session = session;
|
||||
|
||||
this.sessionCache = sessionCache;
|
||||
|
@ -138,10 +143,10 @@ public class InfinispanUserSessionProvider implements UserSessionProvider, Sessi
|
|||
this.offlineSessionCache = offlineSessionCache;
|
||||
this.offlineClientSessionCache = offlineClientSessionCache;
|
||||
|
||||
this.sessionTx = new InfinispanChangelogBasedTransaction<>(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs);
|
||||
this.offlineSessionTx = new InfinispanChangelogBasedTransaction<>(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs);
|
||||
this.clientSessionTx = new InfinispanChangelogBasedTransaction<>(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs);
|
||||
this.offlineClientSessionTx = new InfinispanChangelogBasedTransaction<>(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs);
|
||||
this.sessionTx = new InfinispanChangelogBasedTransaction<>(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, serializerSession);
|
||||
this.offlineSessionTx = new InfinispanChangelogBasedTransaction<>(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, serializerOfflineSession);
|
||||
this.clientSessionTx = new InfinispanChangelogBasedTransaction<>(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, serializerClientSession);
|
||||
this.offlineClientSessionTx = new InfinispanChangelogBasedTransaction<>(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, serializerOfflineClientSession);
|
||||
|
||||
this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session);
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.keycloak.models.RealmModel;
|
|||
import org.keycloak.models.UserModel;
|
||||
import org.keycloak.models.UserSessionProvider;
|
||||
import org.keycloak.models.UserSessionProviderFactory;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
|
||||
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
|
||||
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory;
|
||||
import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore;
|
||||
|
@ -93,6 +94,10 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
|||
private CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore;
|
||||
private PersisterLastSessionRefreshStore persisterLastSessionRefreshStore;
|
||||
private InfinispanKeyGenerator keyGenerator;
|
||||
SerializeExecutionsByKey<String> serializerSession = new SerializeExecutionsByKey<>();
|
||||
SerializeExecutionsByKey<String> serializerOfflineSession = new SerializeExecutionsByKey<>();
|
||||
SerializeExecutionsByKey<UUID> serializerClientSession = new SerializeExecutionsByKey<>();
|
||||
SerializeExecutionsByKey<UUID> serializerOfflineClientSession = new SerializeExecutionsByKey<>();
|
||||
|
||||
@Override
|
||||
public UserSessionProvider create(KeycloakSession session) {
|
||||
|
@ -115,7 +120,11 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
|||
clientSessionCache,
|
||||
offlineClientSessionsCache,
|
||||
this::deriveOfflineSessionCacheEntryLifespanMs,
|
||||
this::deriveOfflineClientSessionCacheEntryLifespanOverrideMs
|
||||
this::deriveOfflineClientSessionCacheEntryLifespanOverrideMs,
|
||||
serializerSession,
|
||||
serializerOfflineSession,
|
||||
serializerClientSession,
|
||||
serializerOfflineClientSession
|
||||
);
|
||||
}
|
||||
return new InfinispanUserSessionProvider(
|
||||
|
@ -130,7 +139,11 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
|||
clientSessionCache,
|
||||
offlineClientSessionsCache,
|
||||
this::deriveOfflineSessionCacheEntryLifespanMs,
|
||||
this::deriveOfflineClientSessionCacheEntryLifespanOverrideMs
|
||||
this::deriveOfflineClientSessionCacheEntryLifespanOverrideMs,
|
||||
serializerSession,
|
||||
serializerOfflineSession,
|
||||
serializerClientSession,
|
||||
serializerOfflineClientSession
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.keycloak.models.light.LightweightUserAdapter;
|
|||
import org.keycloak.models.session.UserSessionPersisterProvider;
|
||||
import org.keycloak.models.sessions.infinispan.changes.ClientSessionPersistentChangelogBasedTransaction;
|
||||
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
|
||||
import org.keycloak.models.sessions.infinispan.changes.Tasks;
|
||||
|
@ -128,7 +129,11 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionCache,
|
||||
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionCache,
|
||||
SessionFunction<UserSessionEntity> offlineSessionCacheEntryLifespanAdjuster,
|
||||
SessionFunction<AuthenticatedClientSessionEntity> offlineClientSessionCacheEntryLifespanAdjuster) {
|
||||
SessionFunction<AuthenticatedClientSessionEntity> offlineClientSessionCacheEntryLifespanAdjuster,
|
||||
SerializeExecutionsByKey<String> serializerSession,
|
||||
SerializeExecutionsByKey<String> serializerOfflineSession,
|
||||
SerializeExecutionsByKey<UUID> serializerClientSession,
|
||||
SerializeExecutionsByKey<UUID> serializerOfflineClientSession) {
|
||||
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
||||
throw new IllegalStateException("Persistent user sessions are not enabled");
|
||||
}
|
||||
|
@ -140,11 +145,11 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
this.offlineSessionCache = offlineSessionCache;
|
||||
this.offlineClientSessionCache = offlineClientSessionCache;
|
||||
|
||||
this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, false);
|
||||
this.offlineSessionTx = new UserSessionPersistentChangelogBasedTransaction(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, true);
|
||||
this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, false, serializerSession);
|
||||
this.offlineSessionTx = new UserSessionPersistentChangelogBasedTransaction(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, true, serializerOfflineSession);
|
||||
|
||||
this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, false, keyGenerator, sessionTx);
|
||||
this.offlineClientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, true, keyGenerator, offlineSessionTx);
|
||||
this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, false, keyGenerator, sessionTx, serializerClientSession);
|
||||
this.offlineClientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, true, keyGenerator, offlineSessionTx, serializerOfflineClientSession);
|
||||
|
||||
this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session);
|
||||
|
||||
|
|
|
@ -44,8 +44,8 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
|
|||
private final InfinispanKeyGenerator keyGenerator;
|
||||
private final UserSessionPersistentChangelogBasedTransaction userSessionTx;
|
||||
|
||||
public ClientSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<AuthenticatedClientSessionEntity> lifespanMsLoader, SessionFunction<AuthenticatedClientSessionEntity> maxIdleTimeMsLoader, boolean offline, InfinispanKeyGenerator keyGenerator, UserSessionPersistentChangelogBasedTransaction userSessionTx) {
|
||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline);
|
||||
public ClientSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<AuthenticatedClientSessionEntity> lifespanMsLoader, SessionFunction<AuthenticatedClientSessionEntity> maxIdleTimeMsLoader, boolean offline, InfinispanKeyGenerator keyGenerator, UserSessionPersistentChangelogBasedTransaction userSessionTx, SerializeExecutionsByKey<UUID> serializer) {
|
||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer);
|
||||
this.keyGenerator = keyGenerator;
|
||||
this.userSessionTx = userSessionTx;
|
||||
}
|
||||
|
|
|
@ -27,16 +27,19 @@ import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implements SessionChangesPerformer<K, V> {
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(EmbeddedCachesChangesPerformer.class);
|
||||
private final Cache<K, SessionEntityWrapper<V>> cache;
|
||||
private final SerializeExecutionsByKey<K> serializer;
|
||||
private final List<Runnable> changes = new LinkedList<>();
|
||||
|
||||
public EmbeddedCachesChangesPerformer(Cache<K, SessionEntityWrapper<V>> cache) {
|
||||
public EmbeddedCachesChangesPerformer(Cache<K, SessionEntityWrapper<V>> cache, SerializeExecutionsByKey<K> serializer) {
|
||||
this.cache = cache;
|
||||
this.serializer = serializer;
|
||||
}
|
||||
|
||||
private void runOperationInCluster(K key, MergedUpdate<V> task, SessionEntityWrapper<V> sessionWrapper) {
|
||||
|
@ -83,37 +86,39 @@ public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implemen
|
|||
}
|
||||
|
||||
private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) {
|
||||
serializer.runSerialized(key, () -> {
|
||||
SessionEntityWrapper<V> oldVersion = oldVersionEntity;
|
||||
boolean replaced = false;
|
||||
int iteration = 0;
|
||||
V session = oldVersionEntity.getEntity();
|
||||
V session = oldVersion.getEntity();
|
||||
|
||||
while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
|
||||
iteration++;
|
||||
|
||||
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersionEntity.getLocalMetadata());
|
||||
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
|
||||
|
||||
// Atomic cluster-aware replace
|
||||
replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersionEntity, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
|
||||
replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
|
||||
|
||||
// Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again
|
||||
if (!replaced) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersionEntity.getVersion(), newVersionEntity.getVersion());
|
||||
LOG.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion());
|
||||
}
|
||||
|
||||
oldVersionEntity = cache.get(key);
|
||||
oldVersion = cache.get(key);
|
||||
|
||||
if (oldVersionEntity == null) {
|
||||
if (oldVersion == null) {
|
||||
LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
|
||||
return;
|
||||
}
|
||||
|
||||
session = oldVersionEntity.getEntity();
|
||||
session = oldVersion.getEntity();
|
||||
|
||||
task.runUpdate(session);
|
||||
} else {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersionEntity.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
||||
LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -121,7 +126,7 @@ public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implemen
|
|||
if (!replaced) {
|
||||
LOG.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName());
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
private SessionEntityWrapper<V> generateNewVersionAndWrapEntity(V entity, Map<String, String> localMetadata) {
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.keycloak.models.sessions.infinispan.changes;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.infinispan.Cache;
|
||||
|
@ -55,15 +56,17 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
|
|||
|
||||
protected final SessionFunction<V> lifespanMsLoader;
|
||||
protected final SessionFunction<V> maxIdleTimeMsLoader;
|
||||
private final SerializeExecutionsByKey<K> serializer;
|
||||
|
||||
public InfinispanChangelogBasedTransaction(KeycloakSession kcSession, Cache<K, SessionEntityWrapper<V>> cache, RemoteCacheInvoker remoteCacheInvoker,
|
||||
SessionFunction<V> lifespanMsLoader, SessionFunction<V> maxIdleTimeMsLoader) {
|
||||
SessionFunction<V> lifespanMsLoader, SessionFunction<V> maxIdleTimeMsLoader, SerializeExecutionsByKey<K> serializer) {
|
||||
this.kcSession = kcSession;
|
||||
this.cacheName = cache.getName();
|
||||
this.cache = cache;
|
||||
this.remoteCacheInvoker = remoteCacheInvoker;
|
||||
this.lifespanMsLoader = lifespanMsLoader;
|
||||
this.maxIdleTimeMsLoader = maxIdleTimeMsLoader;
|
||||
this.serializer = serializer;
|
||||
}
|
||||
|
||||
|
||||
|
@ -231,39 +234,40 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
|
|||
|
||||
}
|
||||
|
||||
|
||||
private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) {
|
||||
serializer.runSerialized(key, () -> {
|
||||
SessionEntityWrapper<V> oldVersion = oldVersionEntity;
|
||||
boolean replaced = false;
|
||||
int iteration = 0;
|
||||
V session = oldVersionEntity.getEntity();
|
||||
V session = oldVersion.getEntity();
|
||||
|
||||
while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
|
||||
iteration++;
|
||||
|
||||
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersionEntity.getLocalMetadata());
|
||||
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
|
||||
|
||||
// Atomic cluster-aware replace
|
||||
replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersionEntity, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
|
||||
replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
|
||||
|
||||
// Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again
|
||||
if (!replaced) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersionEntity.getVersion(), newVersionEntity.getVersion());
|
||||
logger.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion());
|
||||
}
|
||||
|
||||
oldVersionEntity = cache.get(key);
|
||||
oldVersion = cache.get(key);
|
||||
|
||||
if (oldVersionEntity == null) {
|
||||
if (oldVersion == null) {
|
||||
logger.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
|
||||
return;
|
||||
}
|
||||
|
||||
session = oldVersionEntity.getEntity();
|
||||
session = oldVersion.getEntity();
|
||||
|
||||
task.runUpdate(session);
|
||||
} else {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersionEntity.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
||||
logger.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -271,7 +275,7 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
|
|||
if (!replaced) {
|
||||
logger.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName());
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -37,8 +37,8 @@ public class PersistentSessionsChangelogBasedTransaction<K, V extends SessionEnt
|
|||
private final List<SessionChangesPerformer<K, V>> changesPerformers;
|
||||
protected final boolean offline;
|
||||
|
||||
public PersistentSessionsChangelogBasedTransaction(KeycloakSession session, Cache<K, SessionEntityWrapper<V>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<V> lifespanMsLoader, SessionFunction<V> maxIdleTimeMsLoader, boolean offline) {
|
||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader);
|
||||
public PersistentSessionsChangelogBasedTransaction(KeycloakSession session, Cache<K, SessionEntityWrapper<V>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<V> lifespanMsLoader, SessionFunction<V> maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey<K> serializer) {
|
||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, serializer);
|
||||
this.offline = offline;
|
||||
|
||||
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
||||
|
@ -52,7 +52,7 @@ public class PersistentSessionsChangelogBasedTransaction<K, V extends SessionEnt
|
|||
} else {
|
||||
changesPerformers = List.of(
|
||||
new JpaChangesPerformer<>(session, cache.getName(), offline),
|
||||
new EmbeddedCachesChangesPerformer<>(cache),
|
||||
new EmbeddedCachesChangesPerformer<>(cache, serializer),
|
||||
new RemoteCachesChangesPerformer<>(session, cache, remoteCacheInvoker)
|
||||
);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Copyright 2024 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.models.sessions.infinispan.changes;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* Adding an in-JVM lock to prevent a best-effort concurrent executions for the same ID.
|
||||
* This should prevent a burst of requests by letting only the first request pass, and then the others will follow one-by-one.
|
||||
* Use this when the code wrapped by runSerialized is known to produce conflicts when run concurrently with the same ID.
|
||||
*
|
||||
* @author Alexander Schwartz
|
||||
*/
|
||||
public class SerializeExecutionsByKey<K> {
|
||||
private final ConcurrentHashMap<K, K> cacheInteractions = new ConcurrentHashMap<>();
|
||||
|
||||
public void runSerialized(K key, Runnable task) {
|
||||
// this locking is only to ensure that if there is a computation for the same id in the "synchronized" block below,
|
||||
// it will have the same object instance to lock the current execution until the other is finished.
|
||||
K lock = cacheInteractions.computeIfAbsent(key, s -> key);
|
||||
try {
|
||||
synchronized (lock) {
|
||||
// in case the previous thread has removed the entry in the finally block
|
||||
cacheInteractions.putIfAbsent(key, lock);
|
||||
task.run();
|
||||
}
|
||||
} finally {
|
||||
cacheInteractions.remove(lock);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -40,8 +40,8 @@ import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.U
|
|||
public class UserSessionPersistentChangelogBasedTransaction extends PersistentSessionsChangelogBasedTransaction<String, UserSessionEntity> {
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(UserSessionPersistentChangelogBasedTransaction.class);
|
||||
public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<UserSessionEntity> lifespanMsLoader, SessionFunction<UserSessionEntity> maxIdleTimeMsLoader, boolean offline) {
|
||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline);
|
||||
public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<UserSessionEntity> lifespanMsLoader, SessionFunction<UserSessionEntity> maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey<String> serializer) {
|
||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer);
|
||||
}
|
||||
|
||||
public SessionEntityWrapper<UserSessionEntity> get(RealmModel realm, String key) {
|
||||
|
|
Loading…
Reference in a new issue