Batch writes originating from logins/logouts for persistent sessions
All writes for the sessions are handled by a background thread which batches them. Closes #28862 Wait for persistent-store to contain update instead of cache which has the change immediately since it is in memory + introduce new model-test profile Closes #29141 Signed-off-by: Alexander Schwartz <aschwart@redhat.com> Signed-off-by: Michal Hajas <mhajas@redhat.com> Co-authored-by: Michal Hajas <mhajas@redhat.com>
This commit is contained in:
parent
98a620e227
commit
d69872fa11
22 changed files with 365 additions and 260 deletions
|
@ -156,11 +156,6 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
|
||||||
.shouldSaveClientSessionToRemoteCache(kcSession, client.getRealm(), sessionWrapper, userSession, offline, timestamp);
|
.shouldSaveClientSessionToRemoteCache(kcSession, client.getRealm(), sessionWrapper, userSession, offline, timestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isDeferrable() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "setTimestamp(" + timestamp + ')';
|
return "setTimestamp(" + timestamp + ')';
|
||||||
|
|
|
@ -35,8 +35,8 @@ import org.keycloak.models.RealmModel;
|
||||||
import org.keycloak.models.UserModel;
|
import org.keycloak.models.UserModel;
|
||||||
import org.keycloak.models.UserSessionProvider;
|
import org.keycloak.models.UserSessionProvider;
|
||||||
import org.keycloak.models.UserSessionProviderFactory;
|
import org.keycloak.models.UserSessionProviderFactory;
|
||||||
|
import org.keycloak.models.sessions.infinispan.changes.PersistentUpdate;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
|
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.PersistentDeferredElement;
|
|
||||||
import org.keycloak.models.sessions.infinispan.changes.PersistentSessionsWorker;
|
import org.keycloak.models.sessions.infinispan.changes.PersistentSessionsWorker;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
|
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory;
|
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory;
|
||||||
|
@ -60,12 +60,15 @@ import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
|
||||||
import org.keycloak.models.utils.KeycloakModelUtils;
|
import org.keycloak.models.utils.KeycloakModelUtils;
|
||||||
import org.keycloak.models.utils.PostMigrationEvent;
|
import org.keycloak.models.utils.PostMigrationEvent;
|
||||||
import org.keycloak.models.utils.ResetTimeOffsetEvent;
|
import org.keycloak.models.utils.ResetTimeOffsetEvent;
|
||||||
|
import org.keycloak.provider.ProviderConfigProperty;
|
||||||
|
import org.keycloak.provider.ProviderConfigurationBuilder;
|
||||||
import org.keycloak.provider.ProviderEvent;
|
import org.keycloak.provider.ProviderEvent;
|
||||||
import org.keycloak.provider.ProviderEventListener;
|
import org.keycloak.provider.ProviderEventListener;
|
||||||
import org.keycloak.provider.ServerInfoAwareProviderFactory;
|
import org.keycloak.provider.ServerInfoAwareProviderFactory;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
@ -85,6 +88,10 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
||||||
public static final String CLIENT_REMOVED_SESSION_EVENT = "CLIENT_REMOVED_SESSION_SESSIONS";
|
public static final String CLIENT_REMOVED_SESSION_EVENT = "CLIENT_REMOVED_SESSION_SESSIONS";
|
||||||
|
|
||||||
public static final String REMOVE_USER_SESSIONS_EVENT = "REMOVE_USER_SESSIONS_EVENT";
|
public static final String REMOVE_USER_SESSIONS_EVENT = "REMOVE_USER_SESSIONS_EVENT";
|
||||||
|
public static final String CONFIG_OFFLINE_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE = "offlineSessionCacheEntryLifespanOverride";
|
||||||
|
public static final String CONFIG_OFFLINE_CLIENT_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE = "offlineClientSessionCacheEntryLifespanOverride";
|
||||||
|
public static final String CONFIG_MAX_BATCH_SIZE = "maxBatchSize";
|
||||||
|
public static final int DEFAULT_MAX_BATCH_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 2);
|
||||||
|
|
||||||
private long offlineSessionCacheEntryLifespanOverride;
|
private long offlineSessionCacheEntryLifespanOverride;
|
||||||
|
|
||||||
|
@ -101,11 +108,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
||||||
SerializeExecutionsByKey<String> serializerOfflineSession = new SerializeExecutionsByKey<>();
|
SerializeExecutionsByKey<String> serializerOfflineSession = new SerializeExecutionsByKey<>();
|
||||||
SerializeExecutionsByKey<UUID> serializerClientSession = new SerializeExecutionsByKey<>();
|
SerializeExecutionsByKey<UUID> serializerClientSession = new SerializeExecutionsByKey<>();
|
||||||
SerializeExecutionsByKey<UUID> serializerOfflineClientSession = new SerializeExecutionsByKey<>();
|
SerializeExecutionsByKey<UUID> serializerOfflineClientSession = new SerializeExecutionsByKey<>();
|
||||||
ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserSessions = new ArrayBlockingQueue<>(1000);
|
ArrayBlockingQueue<PersistentUpdate> asyncQueuePersistentUpdate = new ArrayBlockingQueue<>(1000);
|
||||||
ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserOfflineSessions = new ArrayBlockingQueue<>(1000);
|
|
||||||
ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientSessions = new ArrayBlockingQueue<>(1000);
|
|
||||||
ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientOfflineSessions = new ArrayBlockingQueue<>(1000);
|
|
||||||
private PersistentSessionsWorker persistentSessionsWorker;
|
private PersistentSessionsWorker persistentSessionsWorker;
|
||||||
|
private int maxBatchSize;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public UserSessionProvider create(KeycloakSession session) {
|
public UserSessionProvider create(KeycloakSession session) {
|
||||||
|
@ -133,10 +138,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
||||||
serializerOfflineSession,
|
serializerOfflineSession,
|
||||||
serializerClientSession,
|
serializerClientSession,
|
||||||
serializerOfflineClientSession,
|
serializerOfflineClientSession,
|
||||||
asyncQueueUserSessions,
|
asyncQueuePersistentUpdate
|
||||||
asyncQueueUserOfflineSessions,
|
|
||||||
asyncQueueClientSessions,
|
|
||||||
asyncQueueClientOfflineSessions
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
return new InfinispanUserSessionProvider(
|
return new InfinispanUserSessionProvider(
|
||||||
|
@ -162,8 +164,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
||||||
@Override
|
@Override
|
||||||
public void init(Config.Scope config) {
|
public void init(Config.Scope config) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
offlineSessionCacheEntryLifespanOverride = config.getInt("offlineSessionCacheEntryLifespanOverride", -1);
|
offlineSessionCacheEntryLifespanOverride = config.getInt(CONFIG_OFFLINE_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE, -1);
|
||||||
offlineClientSessionCacheEntryLifespanOverride = config.getInt("offlineClientSessionCacheEntryLifespanOverride", -1);
|
offlineClientSessionCacheEntryLifespanOverride = config.getInt(CONFIG_OFFLINE_CLIENT_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE, -1);
|
||||||
|
maxBatchSize = config.getInt(CONFIG_MAX_BATCH_SIZE, DEFAULT_MAX_BATCH_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -212,10 +215,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
persistentSessionsWorker = new PersistentSessionsWorker(factory, asyncQueueUserSessions,
|
persistentSessionsWorker = new PersistentSessionsWorker(factory,
|
||||||
asyncQueueUserOfflineSessions,
|
asyncQueuePersistentUpdate,
|
||||||
asyncQueueClientSessions,
|
maxBatchSize);
|
||||||
asyncQueueClientOfflineSessions);
|
|
||||||
persistentSessionsWorker.start();
|
persistentSessionsWorker.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -452,9 +454,37 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
||||||
@Override
|
@Override
|
||||||
public Map<String, String> getOperationalInfo() {
|
public Map<String, String> getOperationalInfo() {
|
||||||
Map<String, String> info = new HashMap<>();
|
Map<String, String> info = new HashMap<>();
|
||||||
info.put("offlineSessionCacheEntryLifespanOverride", Long.toString(offlineSessionCacheEntryLifespanOverride));
|
info.put(CONFIG_OFFLINE_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE, Long.toString(offlineSessionCacheEntryLifespanOverride));
|
||||||
info.put("offlineClientSessionCacheEntryLifespanOverride", Long.toString(offlineClientSessionCacheEntryLifespanOverride));
|
info.put(CONFIG_OFFLINE_CLIENT_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE, Long.toString(offlineClientSessionCacheEntryLifespanOverride));
|
||||||
|
info.put(CONFIG_MAX_BATCH_SIZE, Integer.toString(maxBatchSize));
|
||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ProviderConfigProperty> getConfigMetadata() {
|
||||||
|
ProviderConfigurationBuilder builder = ProviderConfigurationBuilder.create();
|
||||||
|
|
||||||
|
builder.property()
|
||||||
|
.name(CONFIG_MAX_BATCH_SIZE)
|
||||||
|
.type("int")
|
||||||
|
.helpText("Maximum size of a batch size (only applicable to persistent sessions")
|
||||||
|
.defaultValue(DEFAULT_MAX_BATCH_SIZE)
|
||||||
|
.add();
|
||||||
|
|
||||||
|
builder.property()
|
||||||
|
.name(CONFIG_OFFLINE_CLIENT_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE)
|
||||||
|
.type("int")
|
||||||
|
.helpText("Override how long offline client sessions should be kept in memory")
|
||||||
|
.add();
|
||||||
|
|
||||||
|
builder.property()
|
||||||
|
.name(CONFIG_OFFLINE_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE)
|
||||||
|
.type("int")
|
||||||
|
.helpText("Override how long offline user sessions should be kept in memory")
|
||||||
|
.add();
|
||||||
|
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,7 +43,7 @@ import org.keycloak.models.light.LightweightUserAdapter;
|
||||||
import org.keycloak.models.session.UserSessionPersisterProvider;
|
import org.keycloak.models.session.UserSessionPersisterProvider;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.ClientSessionPersistentChangelogBasedTransaction;
|
import org.keycloak.models.sessions.infinispan.changes.ClientSessionPersistentChangelogBasedTransaction;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
|
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.PersistentDeferredElement;
|
import org.keycloak.models.sessions.infinispan.changes.PersistentUpdate;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
|
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
|
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
|
||||||
|
@ -136,10 +136,7 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
||||||
SerializeExecutionsByKey<String> serializerOfflineSession,
|
SerializeExecutionsByKey<String> serializerOfflineSession,
|
||||||
SerializeExecutionsByKey<UUID> serializerClientSession,
|
SerializeExecutionsByKey<UUID> serializerClientSession,
|
||||||
SerializeExecutionsByKey<UUID> serializerOfflineClientSession,
|
SerializeExecutionsByKey<UUID> serializerOfflineClientSession,
|
||||||
ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserSessions,
|
ArrayBlockingQueue<PersistentUpdate> asyncQueuePersistentUpdate) {
|
||||||
ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserOfflineSessions,
|
|
||||||
ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientSessions,
|
|
||||||
ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientOfflineSessions) {
|
|
||||||
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
||||||
throw new IllegalStateException("Persistent user sessions are not enabled");
|
throw new IllegalStateException("Persistent user sessions are not enabled");
|
||||||
}
|
}
|
||||||
|
@ -151,11 +148,15 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
||||||
this.offlineSessionCache = offlineSessionCache;
|
this.offlineSessionCache = offlineSessionCache;
|
||||||
this.offlineClientSessionCache = offlineClientSessionCache;
|
this.offlineClientSessionCache = offlineClientSessionCache;
|
||||||
|
|
||||||
this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, false, serializerSession, asyncQueueUserSessions);
|
this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, false, serializerSession,
|
||||||
this.offlineSessionTx = new UserSessionPersistentChangelogBasedTransaction(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, true, serializerOfflineSession, asyncQueueUserOfflineSessions);
|
asyncQueuePersistentUpdate);
|
||||||
|
this.offlineSessionTx = new UserSessionPersistentChangelogBasedTransaction(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, true, serializerOfflineSession,
|
||||||
|
asyncQueuePersistentUpdate);
|
||||||
|
|
||||||
this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, false, keyGenerator, sessionTx, serializerClientSession, asyncQueueClientSessions);
|
this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, false, keyGenerator, sessionTx, serializerClientSession,
|
||||||
this.offlineClientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, true, keyGenerator, offlineSessionTx, serializerOfflineClientSession, asyncQueueClientOfflineSessions);
|
asyncQueuePersistentUpdate);
|
||||||
|
this.offlineClientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, true, keyGenerator, offlineSessionTx, serializerOfflineClientSession,
|
||||||
|
asyncQueuePersistentUpdate);
|
||||||
|
|
||||||
this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session);
|
this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session);
|
||||||
|
|
||||||
|
@ -1055,16 +1056,6 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
||||||
return idleChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG || lifetimeChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG;
|
return idleChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG || lifetimeChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void processDeferredUserSessionElements(Collection<PersistentDeferredElement<String, UserSessionEntity>> batch, boolean offline) {
|
|
||||||
UserSessionPersistentChangelogBasedTransaction transaction = getTransaction(offline);
|
|
||||||
transaction.applyDeferredBatch(batch);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void processDeferredClientSessionElements(Collection<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> batch, boolean offline) {
|
|
||||||
ClientSessionPersistentChangelogBasedTransaction transaction = getClientSessionTransaction(offline);
|
|
||||||
transaction.applyDeferredBatch(batch);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class RegisterClientSessionTask implements SessionUpdateTask<UserSessionEntity> {
|
private static class RegisterClientSessionTask implements SessionUpdateTask<UserSessionEntity> {
|
||||||
|
|
||||||
private final String clientUuid;
|
private final String clientUuid;
|
||||||
|
|
|
@ -250,11 +250,6 @@ public class UserSessionAdapter<T extends SessionRefreshStore & UserSessionProvi
|
||||||
.shouldSaveUserSessionToRemoteCache(UserSessionAdapter.this.session, UserSessionAdapter.this.realm, sessionWrapper, offline, lastSessionRefresh);
|
.shouldSaveUserSessionToRemoteCache(UserSessionAdapter.this.session, UserSessionAdapter.this.realm, sessionWrapper, offline, lastSessionRefresh);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isDeferrable() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "setLastSessionRefresh(" + lastSessionRefresh + ')';
|
return "setLastSessionRefresh(" + lastSessionRefresh + ')';
|
||||||
|
|
|
@ -46,8 +46,8 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
|
||||||
private final UserSessionPersistentChangelogBasedTransaction userSessionTx;
|
private final UserSessionPersistentChangelogBasedTransaction userSessionTx;
|
||||||
|
|
||||||
public ClientSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<AuthenticatedClientSessionEntity> lifespanMsLoader, SessionFunction<AuthenticatedClientSessionEntity> maxIdleTimeMsLoader, boolean offline, InfinispanKeyGenerator keyGenerator,
|
public ClientSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<AuthenticatedClientSessionEntity> lifespanMsLoader, SessionFunction<AuthenticatedClientSessionEntity> maxIdleTimeMsLoader, boolean offline, InfinispanKeyGenerator keyGenerator,
|
||||||
UserSessionPersistentChangelogBasedTransaction userSessionTx, SerializeExecutionsByKey<UUID> serializer, ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueue) {
|
UserSessionPersistentChangelogBasedTransaction userSessionTx, SerializeExecutionsByKey<UUID> serializer, ArrayBlockingQueue<PersistentUpdate> batchingQueue) {
|
||||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer, asyncQueue);
|
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer, batchingQueue);
|
||||||
this.keyGenerator = keyGenerator;
|
this.keyGenerator = keyGenerator;
|
||||||
this.userSessionTx = userSessionTx;
|
this.userSessionTx = userSessionTx;
|
||||||
}
|
}
|
||||||
|
@ -57,10 +57,14 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
|
||||||
if (myUpdates == null) {
|
if (myUpdates == null) {
|
||||||
SessionEntityWrapper<AuthenticatedClientSessionEntity> wrappedEntity = cache.get(key);
|
SessionEntityWrapper<AuthenticatedClientSessionEntity> wrappedEntity = cache.get(key);
|
||||||
if (wrappedEntity == null) {
|
if (wrappedEntity == null) {
|
||||||
|
LOG.debugf("client-session not found in cache for sessionId=%s, offline=%s, loading from persister", key, offline);
|
||||||
wrappedEntity = getSessionEntityFromPersister(realm, client, userSession);
|
wrappedEntity = getSessionEntityFromPersister(realm, client, userSession);
|
||||||
|
} else {
|
||||||
|
LOG.debugf("client-session found in cache for sessionId=%s, offline=%s", key, offline);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (wrappedEntity == null) {
|
if (wrappedEntity == null) {
|
||||||
|
LOG.debugf("client-session not found in persister for sessionId=%s, offline=%s", key, offline);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,6 +102,7 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
|
||||||
|
|
||||||
SessionEntityWrapper<AuthenticatedClientSessionEntity> authenticatedClientSessionEntitySessionEntityWrapper = importClientSession(realm, client, userSession, clientSession);
|
SessionEntityWrapper<AuthenticatedClientSessionEntity> authenticatedClientSessionEntitySessionEntityWrapper = importClientSession(realm, client, userSession, clientSession);
|
||||||
if (authenticatedClientSessionEntitySessionEntityWrapper == null) {
|
if (authenticatedClientSessionEntitySessionEntityWrapper == null) {
|
||||||
|
LOG.debugf("client-session not imported from persister for sessionId=%s, offline=%s, removing from persister.", clientSession.getId(), offline);
|
||||||
persister.removeClientSession(userSession.getId(), client.getId(), offline);
|
persister.removeClientSession(userSession.getId(), client.getId(), offline);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,25 +24,26 @@ import org.keycloak.connections.infinispan.InfinispanUtil;
|
||||||
import org.keycloak.models.sessions.infinispan.CacheDecorators;
|
import org.keycloak.models.sessions.infinispan.CacheDecorators;
|
||||||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implements SessionChangesPerformer<K, V> {
|
public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implements SessionChangesPerformer<K, V> {
|
||||||
|
|
||||||
private static final Logger LOG = Logger.getLogger(EmbeddedCachesChangesPerformer.class);
|
private static final Logger LOG = Logger.getLogger(EmbeddedCachesChangesPerformer.class);
|
||||||
private final Cache<K, SessionEntityWrapper<V>> cache;
|
private final Cache<K, SessionEntityWrapper<V>> cache;
|
||||||
private final SerializeExecutionsByKey<K> serializer;
|
private final List<Supplier<CompletableFuture<?>>> changes = new LinkedList<>();
|
||||||
private final List<Runnable> changes = new LinkedList<>();
|
|
||||||
|
|
||||||
public EmbeddedCachesChangesPerformer(Cache<K, SessionEntityWrapper<V>> cache, SerializeExecutionsByKey<K> serializer) {
|
public EmbeddedCachesChangesPerformer(Cache<K, SessionEntityWrapper<V>> cache) {
|
||||||
this.cache = cache;
|
this.cache = cache;
|
||||||
this.serializer = serializer;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runOperationInCluster(K key, MergedUpdate<V> task, SessionEntityWrapper<V> sessionWrapper) {
|
private CompletableFuture<?> runOperationInCluster(K key, MergedUpdate<V> task, SessionEntityWrapper<V> sessionWrapper) {
|
||||||
V session = sessionWrapper.getEntity();
|
V session = sessionWrapper.getEntity();
|
||||||
SessionUpdateTask.CacheOperation operation = task.getOperation(session);
|
SessionUpdateTask.CacheOperation operation = task.getOperation(session);
|
||||||
|
|
||||||
|
@ -52,82 +53,78 @@ public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implemen
|
||||||
switch (operation) {
|
switch (operation) {
|
||||||
case REMOVE:
|
case REMOVE:
|
||||||
// Just remove it
|
// Just remove it
|
||||||
CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache)
|
return CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache)
|
||||||
.withFlags(Flag.IGNORE_RETURN_VALUES)
|
.withFlags(Flag.IGNORE_RETURN_VALUES)
|
||||||
.remove(key);
|
.removeAsyncEntry(key);
|
||||||
break;
|
|
||||||
case ADD:
|
case ADD:
|
||||||
CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache)
|
return CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache)
|
||||||
.withFlags(Flag.IGNORE_RETURN_VALUES)
|
.withFlags(Flag.IGNORE_RETURN_VALUES)
|
||||||
.put(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS);
|
.putAsync(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS)
|
||||||
|
.thenAcceptAsync(v -> LOG.tracef("Added entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs()));
|
||||||
LOG.tracef("Added entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
|
||||||
break;
|
|
||||||
case ADD_IF_ABSENT:
|
case ADD_IF_ABSENT:
|
||||||
SessionEntityWrapper<V> existing = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).putIfAbsent(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS);
|
return CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).putIfAbsentAsync(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS)
|
||||||
if (existing != null) {
|
.thenAccept(existing -> {
|
||||||
LOG.debugf("Existing entity in cache for key: %s . Will update it", key);
|
if (existing != null) {
|
||||||
|
LOG.debugf("Existing entity in cache for key: %s . Will update it", key);
|
||||||
|
|
||||||
// Apply updates on the existing entity and replace it
|
// Apply updates on the existing entity and replace it
|
||||||
task.runUpdate(existing.getEntity());
|
task.runUpdate(existing.getEntity());
|
||||||
|
|
||||||
replace(key, task, existing, task.getLifespanMs(), task.getMaxIdleTimeMs());
|
replace(key, task, existing, task.getLifespanMs(), task.getMaxIdleTimeMs()).join();
|
||||||
} else {
|
} else {
|
||||||
LOG.tracef("Add_if_absent successfully called for entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
LOG.tracef("Add_if_absent successfully called for entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
||||||
}
|
}
|
||||||
break;
|
});
|
||||||
case REPLACE:
|
case REPLACE:
|
||||||
replace(key, task, sessionWrapper, task.getLifespanMs(), task.getMaxIdleTimeMs());
|
return replace(key, task, sessionWrapper, task.getLifespanMs(), task.getMaxIdleTimeMs());
|
||||||
break;
|
|
||||||
default:
|
default:
|
||||||
throw new IllegalStateException("Unsupported state " + operation);
|
throw new IllegalStateException("Unsupported state " + operation);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) {
|
private CompletableFuture<?> replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) {
|
||||||
serializer.runSerialized(key, () -> {
|
// make one async attempt
|
||||||
SessionEntityWrapper<V> oldVersion = oldVersionEntity;
|
return CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replaceAsync(key, oldVersionEntity, generateNewVersionAndWrapEntity(oldVersionEntity.getEntity(), oldVersionEntity.getLocalMetadata()), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS)
|
||||||
boolean replaced = false;
|
.thenAccept(replaced -> {
|
||||||
int iteration = 0;
|
int iteration = 0;
|
||||||
V session = oldVersion.getEntity();
|
SessionEntityWrapper<V> oldVersion = oldVersionEntity;
|
||||||
|
while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
|
||||||
|
iteration++;
|
||||||
|
|
||||||
while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
|
V session = oldVersion.getEntity();
|
||||||
iteration++;
|
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
|
||||||
|
|
||||||
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
|
// Atomic cluster-aware replace
|
||||||
|
replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
// Atomic cluster-aware replace
|
// Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again
|
||||||
replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
|
if (!replaced) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion());
|
||||||
|
}
|
||||||
|
backoff(iteration);
|
||||||
|
|
||||||
// Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again
|
oldVersion = cache.get(key);
|
||||||
if (!replaced) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion());
|
|
||||||
}
|
|
||||||
backoff(iteration);
|
|
||||||
|
|
||||||
oldVersion = cache.get(key);
|
if (oldVersion == null) {
|
||||||
|
LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (oldVersion == null) {
|
session = oldVersion.getEntity();
|
||||||
LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
|
|
||||||
return;
|
task.runUpdate(session);
|
||||||
|
} else {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
session = oldVersion.getEntity();
|
if (!replaced) {
|
||||||
|
LOG.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName());
|
||||||
task.runUpdate(session);
|
|
||||||
} else {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
}
|
|
||||||
|
|
||||||
if (!replaced) {
|
|
||||||
LOG.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -153,6 +150,18 @@ public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implemen
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void applyChanges() {
|
public void applyChanges() {
|
||||||
changes.forEach(Runnable::run);
|
if (!changes.isEmpty()) {
|
||||||
|
List<Throwable> exceptions = new ArrayList<>();
|
||||||
|
CompletableFuture.allOf(changes.stream().map(s -> s.get().exceptionally(throwable -> {
|
||||||
|
exceptions.add(throwable);
|
||||||
|
return null;
|
||||||
|
})).toArray(CompletableFuture[]::new)).join();
|
||||||
|
// If any of those futures has failed, add the exceptions as suppressed exceptions to our runtime exception
|
||||||
|
if (!exceptions.isEmpty()) {
|
||||||
|
RuntimeException ex = new RuntimeException("unable to complete the session updates");
|
||||||
|
exceptions.forEach(ex::addSuppressed);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
package org.keycloak.models.sessions.infinispan.changes;
|
package org.keycloak.models.sessions.infinispan.changes;
|
||||||
|
|
||||||
import org.infinispan.util.function.TriConsumer;
|
import org.infinispan.util.function.TriConsumer;
|
||||||
import org.keycloak.common.util.Retry;
|
import org.jboss.logging.Logger;
|
||||||
import org.keycloak.models.AuthenticatedClientSessionModel;
|
import org.keycloak.models.AuthenticatedClientSessionModel;
|
||||||
import org.keycloak.models.ClientModel;
|
import org.keycloak.models.ClientModel;
|
||||||
import org.keycloak.models.KeycloakSession;
|
import org.keycloak.models.KeycloakSession;
|
||||||
|
@ -33,11 +33,11 @@ import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessi
|
||||||
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionStore;
|
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionStore;
|
||||||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||||
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
|
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
|
||||||
import org.keycloak.models.utils.KeycloakModelUtils;
|
|
||||||
import org.keycloak.models.utils.RealmModelDelegate;
|
import org.keycloak.models.utils.RealmModelDelegate;
|
||||||
import org.keycloak.models.utils.UserModelDelegate;
|
import org.keycloak.models.utils.UserModelDelegate;
|
||||||
import org.keycloak.models.utils.UserSessionModelDelegate;
|
import org.keycloak.models.utils.UserSessionModelDelegate;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -45,7 +45,8 @@ import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.function.Consumer;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
|
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
|
||||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
|
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
|
||||||
|
@ -53,23 +54,29 @@ import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.O
|
||||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.USER_SESSION_CACHE_NAME;
|
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.USER_SESSION_CACHE_NAME;
|
||||||
|
|
||||||
public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionChangesPerformer<K, V> {
|
public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionChangesPerformer<K, V> {
|
||||||
|
private static final Logger LOG = Logger.getLogger(JpaChangesPerformer.class);
|
||||||
|
|
||||||
private final KeycloakSession session;
|
|
||||||
private final String cacheName;
|
private final String cacheName;
|
||||||
private final boolean offline;
|
private final boolean offline;
|
||||||
private final List<Consumer<KeycloakSession>> changes = new LinkedList<>();
|
private final List<PersistentUpdate> changes = new LinkedList<>();
|
||||||
private final TriConsumer<KeycloakSession, Map.Entry<K, SessionUpdatesList<V>>, MergedUpdate<V>> processor;
|
private final TriConsumer<KeycloakSession, Map.Entry<K, SessionUpdatesList<V>>, MergedUpdate<V>> processor;
|
||||||
|
private final ArrayBlockingQueue<PersistentUpdate> batchingQueue;
|
||||||
|
|
||||||
public JpaChangesPerformer(KeycloakSession session, String cacheName, boolean offline) {
|
public JpaChangesPerformer(String cacheName, boolean offline, ArrayBlockingQueue<PersistentUpdate> batchingQueue) {
|
||||||
this.session = session;
|
|
||||||
this.cacheName = cacheName;
|
this.cacheName = cacheName;
|
||||||
this.offline = offline;
|
this.offline = offline;
|
||||||
|
this.batchingQueue = batchingQueue;
|
||||||
processor = processor();
|
processor = processor();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean benefitsFromBatching() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void registerChange(Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
|
public void registerChange(Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
|
||||||
changes.add(innerSession -> processor.accept(innerSession, entry, merged));
|
changes.add(new PersistentUpdate(innerSession -> processor.accept(innerSession, entry, merged)));
|
||||||
}
|
}
|
||||||
|
|
||||||
private TriConsumer<KeycloakSession, Map.Entry<K, SessionUpdatesList<V>>, MergedUpdate<V>> processor() {
|
private TriConsumer<KeycloakSession, Map.Entry<K, SessionUpdatesList<V>>, MergedUpdate<V>> processor() {
|
||||||
|
@ -80,12 +87,39 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean warningShown = false;
|
||||||
|
|
||||||
|
private void offer(PersistentUpdate update) {
|
||||||
|
if (!batchingQueue.offer(update)) {
|
||||||
|
if (!warningShown) {
|
||||||
|
warningShown = true;
|
||||||
|
LOG.warn("Queue is full, will block");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
// this will block until there is a free spot in the queue
|
||||||
|
batchingQueue.put(update);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void applyChanges() {
|
public void applyChanges() {
|
||||||
if (changes.size() > 0) {
|
if (!changes.isEmpty()) {
|
||||||
Retry.executeWithBackoff(iteration -> KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(),
|
changes.forEach(this::offer);
|
||||||
innerSession -> changes.forEach(c -> c.accept(innerSession))),
|
List<Throwable> exceptions = new ArrayList<>();
|
||||||
10, 10);
|
CompletableFuture.allOf(changes.stream().map(f -> f.future().exceptionally(throwable -> {
|
||||||
|
exceptions.add(throwable);
|
||||||
|
return null;
|
||||||
|
})).toArray(CompletableFuture[]::new)).join();
|
||||||
|
// If any of those futures has failed, add the exceptions as suppressed exceptions to our runtime exception
|
||||||
|
if (!exceptions.isEmpty()) {
|
||||||
|
RuntimeException ex = new RuntimeException("unable to complete the session updates");
|
||||||
|
exceptions.forEach(ex::addSuppressed);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,12 +36,6 @@ public class MergedUpdate<S extends SessionEntity> implements SessionUpdateTask<
|
||||||
private CrossDCMessageStatus crossDCMessageStatus;
|
private CrossDCMessageStatus crossDCMessageStatus;
|
||||||
private final long lifespanMs;
|
private final long lifespanMs;
|
||||||
private final long maxIdleTimeMs;
|
private final long maxIdleTimeMs;
|
||||||
private boolean isDeferrable;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isDeferrable() {
|
|
||||||
return isDeferrable;
|
|
||||||
}
|
|
||||||
|
|
||||||
private MergedUpdate(CacheOperation operation, CrossDCMessageStatus crossDCMessageStatus, long lifespanMs, long maxIdleTimeMs) {
|
private MergedUpdate(CacheOperation operation, CrossDCMessageStatus crossDCMessageStatus, long lifespanMs, long maxIdleTimeMs) {
|
||||||
this.operation = operation;
|
this.operation = operation;
|
||||||
|
@ -83,11 +77,7 @@ public class MergedUpdate<S extends SessionEntity> implements SessionUpdateTask<
|
||||||
|
|
||||||
MergedUpdate<S> result = null;
|
MergedUpdate<S> result = null;
|
||||||
S session = sessionWrapper.getEntity();
|
S session = sessionWrapper.getEntity();
|
||||||
boolean isDeferrable = true;
|
|
||||||
for (SessionUpdateTask<S> child : childUpdates) {
|
for (SessionUpdateTask<S> child : childUpdates) {
|
||||||
if (!child.isDeferrable()) {
|
|
||||||
isDeferrable = false;
|
|
||||||
}
|
|
||||||
if (result == null) {
|
if (result == null) {
|
||||||
CacheOperation operation = child.getOperation(session);
|
CacheOperation operation = child.getOperation(session);
|
||||||
|
|
||||||
|
@ -123,20 +113,12 @@ public class MergedUpdate<S extends SessionEntity> implements SessionUpdateTask<
|
||||||
result.childUpdates.add(child);
|
result.childUpdates.add(child);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (result != null) {
|
|
||||||
result.setDeferable(isDeferrable);
|
|
||||||
}
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setDeferable(boolean isDeferrable) {
|
|
||||||
this.isDeferrable = isDeferrable;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "MergedUpdate" + childUpdates;
|
return "MergedUpdate" + childUpdates;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,8 +26,6 @@ import org.keycloak.models.sessions.infinispan.SessionFunction;
|
||||||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||||
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
|
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
@ -41,13 +39,11 @@ public class PersistentSessionsChangelogBasedTransaction<K, V extends SessionEnt
|
||||||
|
|
||||||
private final List<SessionChangesPerformer<K, V>> changesPerformers;
|
private final List<SessionChangesPerformer<K, V>> changesPerformers;
|
||||||
protected final boolean offline;
|
protected final boolean offline;
|
||||||
private final ArrayBlockingQueue<PersistentDeferredElement<K, V>> asyncQueue;
|
|
||||||
private Collection<PersistentDeferredElement<K, V>> batch;
|
|
||||||
|
|
||||||
public PersistentSessionsChangelogBasedTransaction(KeycloakSession session, Cache<K, SessionEntityWrapper<V>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<V> lifespanMsLoader, SessionFunction<V> maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey<K> serializer, ArrayBlockingQueue<PersistentDeferredElement<K, V>> asyncQueue) {
|
public PersistentSessionsChangelogBasedTransaction(KeycloakSession session, Cache<K, SessionEntityWrapper<V>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<V> lifespanMsLoader, SessionFunction<V> maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey<K> serializer,
|
||||||
|
ArrayBlockingQueue<PersistentUpdate> batchingQueue) {
|
||||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, serializer);
|
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, serializer);
|
||||||
this.offline = offline;
|
this.offline = offline;
|
||||||
this.asyncQueue = asyncQueue;
|
|
||||||
|
|
||||||
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
||||||
throw new IllegalStateException("Persistent user sessions are not enabled");
|
throw new IllegalStateException("Persistent user sessions are not enabled");
|
||||||
|
@ -56,12 +52,12 @@ public class PersistentSessionsChangelogBasedTransaction<K, V extends SessionEnt
|
||||||
if (Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS_NO_CACHE) &&
|
if (Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS_NO_CACHE) &&
|
||||||
(cache.getName().equals(USER_SESSION_CACHE_NAME) || cache.getName().equals(CLIENT_SESSION_CACHE_NAME) || cache.getName().equals(OFFLINE_USER_SESSION_CACHE_NAME) || cache.getName().equals(OFFLINE_CLIENT_SESSION_CACHE_NAME))) {
|
(cache.getName().equals(USER_SESSION_CACHE_NAME) || cache.getName().equals(CLIENT_SESSION_CACHE_NAME) || cache.getName().equals(OFFLINE_USER_SESSION_CACHE_NAME) || cache.getName().equals(OFFLINE_CLIENT_SESSION_CACHE_NAME))) {
|
||||||
changesPerformers = List.of(
|
changesPerformers = List.of(
|
||||||
new JpaChangesPerformer<>(session, cache.getName(), offline)
|
new JpaChangesPerformer<>(cache.getName(), offline, batchingQueue)
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
changesPerformers = List.of(
|
changesPerformers = List.of(
|
||||||
new JpaChangesPerformer<>(session, cache.getName(), offline),
|
new JpaChangesPerformer<>(cache.getName(), offline, batchingQueue),
|
||||||
new EmbeddedCachesChangesPerformer<>(cache, serializer),
|
new EmbeddedCachesChangesPerformer<>(cache),
|
||||||
new RemoteCachesChangesPerformer<>(session, cache, remoteCacheInvoker)
|
new RemoteCachesChangesPerformer<>(session, cache, remoteCacheInvoker)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -84,20 +80,10 @@ public class PersistentSessionsChangelogBasedTransaction<K, V extends SessionEnt
|
||||||
MergedUpdate<V> merged = MergedUpdate.computeUpdate(sessionUpdates.getUpdateTasks(), sessionWrapper, lifespanMs, maxIdleTimeMs);
|
MergedUpdate<V> merged = MergedUpdate.computeUpdate(sessionUpdates.getUpdateTasks(), sessionWrapper, lifespanMs, maxIdleTimeMs);
|
||||||
|
|
||||||
if (merged != null) {
|
if (merged != null) {
|
||||||
if (merged.isDeferrable()) {
|
changesPerformers.forEach(p -> p.registerChange(entry, merged));
|
||||||
asyncQueue.add(new PersistentDeferredElement<>(entry, merged));
|
|
||||||
} else {
|
|
||||||
changesPerformers.forEach(p -> p.registerChange(entry, merged));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (batch != null) {
|
|
||||||
batch.forEach(o -> {
|
|
||||||
changesPerformers.forEach(p -> p.registerChange(o.getEntry(), o.getMerged()));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
changesPerformers.forEach(SessionChangesPerformer::applyChanges);
|
changesPerformers.forEach(SessionChangesPerformer::applyChanges);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,11 +92,4 @@ public class PersistentSessionsChangelogBasedTransaction<K, V extends SessionEnt
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void applyDeferredBatch(Collection<PersistentDeferredElement<K, V>> batchToApply) {
|
|
||||||
if (this.batch == null) {
|
|
||||||
this.batch = new ArrayList<>(batchToApply.size());
|
|
||||||
}
|
|
||||||
batch.addAll(batchToApply);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,18 +18,14 @@
|
||||||
package org.keycloak.models.sessions.infinispan.changes;
|
package org.keycloak.models.sessions.infinispan.changes;
|
||||||
|
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
import org.keycloak.common.util.Retry;
|
||||||
import org.keycloak.models.KeycloakSessionFactory;
|
import org.keycloak.models.KeycloakSessionFactory;
|
||||||
import org.keycloak.models.UserSessionProvider;
|
|
||||||
import org.keycloak.models.sessions.infinispan.PersistentUserSessionProvider;
|
|
||||||
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
|
|
||||||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
|
||||||
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
|
|
||||||
import org.keycloak.models.utils.KeycloakModelUtils;
|
import org.keycloak.models.utils.KeycloakModelUtils;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.temporal.ChronoUnit;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -42,57 +38,35 @@ public class PersistentSessionsWorker {
|
||||||
private static final Logger LOG = Logger.getLogger(PersistentSessionsWorker.class);
|
private static final Logger LOG = Logger.getLogger(PersistentSessionsWorker.class);
|
||||||
|
|
||||||
private final KeycloakSessionFactory factory;
|
private final KeycloakSessionFactory factory;
|
||||||
private final ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserSessions;
|
private final ArrayBlockingQueue<PersistentUpdate> asyncQueuePersistentUpdate;
|
||||||
private final ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserOfflineSessions;
|
private final int maxBatchSize;
|
||||||
private final ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientSessions;
|
|
||||||
private final ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientOfflineSessions;
|
|
||||||
private final List<Thread> threads = new ArrayList<>();
|
private final List<Thread> threads = new ArrayList<>();
|
||||||
private volatile boolean stop;
|
private volatile boolean stop;
|
||||||
|
|
||||||
public PersistentSessionsWorker(KeycloakSessionFactory factory, ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserSessions, ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserOfflineSessions, ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientSessions, ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientOfflineSessions) {
|
public PersistentSessionsWorker(KeycloakSessionFactory factory,
|
||||||
|
ArrayBlockingQueue<PersistentUpdate> asyncQueuePersistentUpdate, int maxBatchSize) {
|
||||||
this.factory = factory;
|
this.factory = factory;
|
||||||
this.asyncQueueUserSessions = asyncQueueUserSessions;
|
this.asyncQueuePersistentUpdate = asyncQueuePersistentUpdate;
|
||||||
this.asyncQueueUserOfflineSessions = asyncQueueUserOfflineSessions;
|
this.maxBatchSize = maxBatchSize;
|
||||||
this.asyncQueueClientSessions = asyncQueueClientSessions;
|
|
||||||
this.asyncQueueClientOfflineSessions = asyncQueueClientOfflineSessions;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() {
|
public void start() {
|
||||||
threads.add(new WorkerUserSession(asyncQueueUserSessions, false));
|
threads.add(new BatchWorker(asyncQueuePersistentUpdate));
|
||||||
threads.add(new WorkerUserSession(asyncQueueUserOfflineSessions, true));
|
|
||||||
threads.add(new WorkerClientSession(asyncQueueClientSessions, false));
|
|
||||||
threads.add(new WorkerClientSession(asyncQueueClientOfflineSessions, true));
|
|
||||||
threads.forEach(Thread::start);
|
threads.forEach(Thread::start);
|
||||||
}
|
}
|
||||||
|
|
||||||
private class WorkerUserSession extends Worker<String, UserSessionEntity> {
|
private class BatchWorker extends Thread {
|
||||||
public WorkerUserSession(ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> queue, boolean offline) {
|
private final ArrayBlockingQueue<PersistentUpdate> queue;
|
||||||
super(queue, offline, PersistentUserSessionProvider::processDeferredUserSessionElements);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class WorkerClientSession extends Worker<UUID, AuthenticatedClientSessionEntity> {
|
public BatchWorker(ArrayBlockingQueue<PersistentUpdate> queue) {
|
||||||
public WorkerClientSession(ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> queue, boolean offline) {
|
|
||||||
super(queue, offline, PersistentUserSessionProvider::processDeferredClientSessionElements);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private class Worker<K, V extends SessionEntity> extends Thread {
|
|
||||||
private final ArrayBlockingQueue<PersistentDeferredElement<K, V>> queue;
|
|
||||||
private final boolean offline;
|
|
||||||
private final Adapter<K, V> adapter;
|
|
||||||
|
|
||||||
public Worker(ArrayBlockingQueue<PersistentDeferredElement<K, V>> queue, boolean offline, Adapter<K, V> adapter) {
|
|
||||||
this.queue = queue;
|
this.queue = queue;
|
||||||
this.offline = offline;
|
|
||||||
this.adapter = adapter;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void run() {
|
public void run() {
|
||||||
Thread.currentThread().setName(this.getClass().getName() + " for " + (offline ? "offline" : "online") + " sessions");
|
Thread.currentThread().setName(this.getClass().getName());
|
||||||
while (!stop) {
|
while (!stop) {
|
||||||
try {
|
try {
|
||||||
process(queue, offline);
|
process(queue);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
break;
|
break;
|
||||||
|
@ -100,25 +74,49 @@ public class PersistentSessionsWorker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void process(ArrayBlockingQueue<PersistentDeferredElement<K, V>> queue, boolean offline) throws InterruptedException {
|
private void process(ArrayBlockingQueue<PersistentUpdate> queue) throws InterruptedException {
|
||||||
Collection<PersistentDeferredElement<K, V>> batch = new ArrayList<>();
|
ArrayList<PersistentUpdate> batch = new ArrayList<>();
|
||||||
PersistentDeferredElement<K, V> polled = queue.poll(100, TimeUnit.MILLISECONDS);
|
PersistentUpdate polled = queue.poll(100, TimeUnit.MILLISECONDS);
|
||||||
if (polled != null) {
|
if (polled != null) {
|
||||||
batch.add(polled);
|
batch.add(polled);
|
||||||
queue.drainTo(batch, 99);
|
queue.drainTo(batch, maxBatchSize - 1);
|
||||||
try {
|
try {
|
||||||
LOG.debugf("Processing %d deferred session updates.", batch.size());
|
LOG.debugf("Processing %d deferred session updates.", batch.size());
|
||||||
KeycloakModelUtils.runJobInTransaction(factory,
|
Retry.executeWithBackoff(iteration -> {
|
||||||
session -> adapter.run(((PersistentUserSessionProvider) session.getProvider(UserSessionProvider.class)), batch, offline));
|
if (iteration < 2) {
|
||||||
|
// attempt to write whole batch in the first two attempts
|
||||||
|
KeycloakModelUtils.runJobInTransaction(factory,
|
||||||
|
innerSession -> batch.forEach(c -> c.perform(innerSession)));
|
||||||
|
batch.forEach(PersistentUpdate::complete);
|
||||||
|
} else {
|
||||||
|
LOG.warnf("Running single changes in iteration %d for %d entries", iteration, batch.size());
|
||||||
|
ArrayList<PersistentUpdate> performedChanges = new ArrayList<>();
|
||||||
|
List<Throwable> throwables = new ArrayList<>();
|
||||||
|
batch.forEach(change -> {
|
||||||
|
try {
|
||||||
|
KeycloakModelUtils.runJobInTransaction(factory,
|
||||||
|
change::perform);
|
||||||
|
change.complete();
|
||||||
|
performedChanges.add(change);
|
||||||
|
} catch (Throwable ex) {
|
||||||
|
throwables.add(ex);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
batch.removeAll(performedChanges);
|
||||||
|
if (!throwables.isEmpty()) {
|
||||||
|
RuntimeException ex = new RuntimeException("unable to complete some changes");
|
||||||
|
throwables.forEach(ex::addSuppressed);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Duration.of(10, ChronoUnit.SECONDS), 0);
|
||||||
} catch (RuntimeException ex) {
|
} catch (RuntimeException ex) {
|
||||||
LOG.warnf(ex, "Unable to write %d deferred session updates", queue.size());
|
batch.forEach(o -> o.fail(ex));
|
||||||
|
LOG.warnf(ex, "Unable to write %d deferred session updates", batch.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
interface Adapter<K, V extends SessionEntity> {
|
|
||||||
void run(PersistentUserSessionProvider sessionProvider, Collection<PersistentDeferredElement<K, V>> batch, boolean offline);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stop() {
|
public void stop() {
|
||||||
|
|
|
@ -17,29 +17,38 @@
|
||||||
|
|
||||||
package org.keycloak.models.sessions.infinispan.changes;
|
package org.keycloak.models.sessions.infinispan.changes;
|
||||||
|
|
||||||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
import org.keycloak.models.KeycloakSession;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Capture information for a deferred update of the session stores.
|
* Capture information for a deferred update of the session stores.
|
||||||
*
|
*
|
||||||
* @author Alexander Schwartz
|
* @author Alexander Schwartz
|
||||||
*/
|
*/
|
||||||
public class PersistentDeferredElement<K, V extends SessionEntity> {
|
public class PersistentUpdate {
|
||||||
private final Map.Entry<K, SessionUpdatesList<V>> entry;
|
|
||||||
private final MergedUpdate<V> merged;
|
|
||||||
|
|
||||||
public PersistentDeferredElement(Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
|
private final Consumer<KeycloakSession> task;
|
||||||
this.entry = entry;
|
private final CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
this.merged = merged;
|
|
||||||
|
public PersistentUpdate(Consumer<KeycloakSession> task) {
|
||||||
|
this.task = task;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map.Entry<K, SessionUpdatesList<V>> getEntry() {
|
public void perform(KeycloakSession session) {
|
||||||
return entry;
|
task.accept(session);
|
||||||
}
|
}
|
||||||
|
|
||||||
public MergedUpdate<V> getMerged() {
|
public void complete() {
|
||||||
return merged;
|
future.complete(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void fail(Throwable throwable) {
|
||||||
|
future.completeExceptionally(throwable);
|
||||||
|
}
|
||||||
|
|
||||||
|
public CompletableFuture<Void> future() {
|
||||||
|
return future;
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -24,5 +24,9 @@ import java.util.Map;
|
||||||
public interface SessionChangesPerformer<K, V extends SessionEntity> {
|
public interface SessionChangesPerformer<K, V extends SessionEntity> {
|
||||||
void registerChange(Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged);
|
void registerChange(Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged);
|
||||||
|
|
||||||
|
default boolean benefitsFromBatching() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
void applyChanges();
|
void applyChanges();
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,10 +30,6 @@ public interface SessionUpdateTask<S extends SessionEntity> {
|
||||||
|
|
||||||
CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper<S> sessionWrapper);
|
CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper<S> sessionWrapper);
|
||||||
|
|
||||||
default boolean isDeferrable() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
enum CacheOperation {
|
enum CacheOperation {
|
||||||
|
|
||||||
ADD,
|
ADD,
|
||||||
|
|
|
@ -42,8 +42,9 @@ import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.U
|
||||||
public class UserSessionPersistentChangelogBasedTransaction extends PersistentSessionsChangelogBasedTransaction<String, UserSessionEntity> {
|
public class UserSessionPersistentChangelogBasedTransaction extends PersistentSessionsChangelogBasedTransaction<String, UserSessionEntity> {
|
||||||
|
|
||||||
private static final Logger LOG = Logger.getLogger(UserSessionPersistentChangelogBasedTransaction.class);
|
private static final Logger LOG = Logger.getLogger(UserSessionPersistentChangelogBasedTransaction.class);
|
||||||
public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<UserSessionEntity> lifespanMsLoader, SessionFunction<UserSessionEntity> maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey<String> serializer, ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueue) {
|
public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<UserSessionEntity> lifespanMsLoader, SessionFunction<UserSessionEntity> maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey<String> serializer,
|
||||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer, asyncQueue);
|
ArrayBlockingQueue<PersistentUpdate> batchingQueue) {
|
||||||
|
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer, batchingQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
public SessionEntityWrapper<UserSessionEntity> get(RealmModel realm, String key) {
|
public SessionEntityWrapper<UserSessionEntity> get(RealmModel realm, String key) {
|
||||||
|
@ -54,10 +55,14 @@ public class UserSessionPersistentChangelogBasedTransaction extends PersistentSe
|
||||||
wrappedEntity = cache.get(key);
|
wrappedEntity = cache.get(key);
|
||||||
}
|
}
|
||||||
if (wrappedEntity == null) {
|
if (wrappedEntity == null) {
|
||||||
|
LOG.debugf("user-session not found in cache for sessionId=%s offline=%s, loading from persister", key, offline);
|
||||||
wrappedEntity = getSessionEntityFromPersister(realm, key);
|
wrappedEntity = getSessionEntityFromPersister(realm, key);
|
||||||
|
} else {
|
||||||
|
LOG.debugf("user-session found in cache for sessionId=%s offline=%s %s", key, offline, wrappedEntity.getEntity().getLastSessionRefresh());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (wrappedEntity == null) {
|
if (wrappedEntity == null) {
|
||||||
|
LOG.debugf("user-session not found in persister for sessionId=%s offline=%s", key, offline);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -165,13 +165,15 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
|
||||||
if (sessionEntity != null) {
|
if (sessionEntity != null) {
|
||||||
em.remove(sessionEntity);
|
em.remove(sessionEntity);
|
||||||
|
|
||||||
// Remove userSession if it was last clientSession
|
if (offline) {
|
||||||
List<PersistentClientSessionEntity> clientSessions = getClientSessionsByUserSession(sessionEntity.getUserSessionId(), offline);
|
// Remove userSession if it was last clientSession
|
||||||
if (clientSessions.size() == 0 && offline) {
|
List<PersistentClientSessionEntity> clientSessions = getClientSessionsByUserSession(sessionEntity.getUserSessionId(), offline);
|
||||||
offlineStr = offlineToString(offline);
|
if (clientSessions.isEmpty()) {
|
||||||
PersistentUserSessionEntity userSessionEntity = em.find(PersistentUserSessionEntity.class, new PersistentUserSessionEntity.Key(sessionEntity.getUserSessionId(), offlineStr), LockModeType.PESSIMISTIC_WRITE);
|
offlineStr = offlineToString(offline);
|
||||||
if (userSessionEntity != null) {
|
PersistentUserSessionEntity userSessionEntity = em.find(PersistentUserSessionEntity.class, new PersistentUserSessionEntity.Key(sessionEntity.getUserSessionId(), offlineStr), LockModeType.PESSIMISTIC_WRITE);
|
||||||
em.remove(userSessionEntity);
|
if (userSessionEntity != null) {
|
||||||
|
em.remove(userSessionEntity);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -211,5 +211,16 @@ public class PersistentClientSessionEntity {
|
||||||
result = 31 * result + (this.offline != null ? this.offline.hashCode() : 0);
|
result = 31 * result + (this.offline != null ? this.offline.hashCode() : 0);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "PersistentClientSessionEntity$Key[" +
|
||||||
|
"userSessionId='" + userSessionId + '\'' +
|
||||||
|
", clientId='" + clientId + '\'' +
|
||||||
|
", clientStorageProvider='" + clientStorageProvider + '\'' +
|
||||||
|
", externalClientId='" + externalClientId + '\'' +
|
||||||
|
", offline='" + offline + '\'' +
|
||||||
|
']';
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -201,5 +201,13 @@ public class PersistentUserSessionEntity {
|
||||||
result = 31 * result + (this.offline != null ? this.offline.hashCode() : 0);
|
result = 31 * result + (this.offline != null ? this.offline.hashCode() : 0);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "PersistentUserSessionEntity$Key [" +
|
||||||
|
"userSessionId='" + userSessionId + '\'' +
|
||||||
|
", offline='" + offline + '\'' +
|
||||||
|
']';
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,8 +26,6 @@ import org.jboss.logging.Logger;
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractKeycloakTransaction implements KeycloakTransaction {
|
public abstract class AbstractKeycloakTransaction implements KeycloakTransaction {
|
||||||
|
|
||||||
public static final Logger logger = Logger.getLogger(AbstractKeycloakTransaction.class);
|
|
||||||
|
|
||||||
protected TransactionState state = TransactionState.NOT_STARTED;
|
protected TransactionState state = TransactionState.NOT_STARTED;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -219,6 +219,13 @@
|
||||||
</properties>
|
</properties>
|
||||||
</profile>
|
</profile>
|
||||||
|
|
||||||
|
<profile>
|
||||||
|
<id>jpa+persistentsessions</id>
|
||||||
|
<properties>
|
||||||
|
<keycloak.model.parameters>Jpa,PersistentUserSessionsNoCache</keycloak.model.parameters>
|
||||||
|
</properties>
|
||||||
|
</profile>
|
||||||
|
|
||||||
<profile>
|
<profile>
|
||||||
<id>jpa+infinispan+client-storage</id>
|
<id>jpa+infinispan+client-storage</id>
|
||||||
<properties>
|
<properties>
|
||||||
|
|
|
@ -36,6 +36,5 @@ public class PersistentUserSessions extends KeycloakModelParameters {
|
||||||
|
|
||||||
public static void updateConfigForJpa(Config cf) {
|
public static void updateConfigForJpa(Config cf) {
|
||||||
System.getProperties().put(PropertiesProfileConfigResolver.getPropertyKey(Profile.Feature.PERSISTENT_USER_SESSIONS), "enabled");
|
System.getProperties().put(PropertiesProfileConfigResolver.getPropertyKey(Profile.Feature.PERSISTENT_USER_SESSIONS), "enabled");
|
||||||
System.getProperties().put(PropertiesProfileConfigResolver.getPropertyKey(Profile.Feature.PERSISTENT_USER_SESSIONS_NO_CACHE), "enabled");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2020 Red Hat, Inc. and/or its affiliates
|
||||||
|
* and other contributors as indicated by the @author tags.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.keycloak.testsuite.model.parameters;
|
||||||
|
|
||||||
|
import org.keycloak.common.Profile;
|
||||||
|
import org.keycloak.common.profile.PropertiesProfileConfigResolver;
|
||||||
|
import org.keycloak.testsuite.model.Config;
|
||||||
|
import org.keycloak.testsuite.model.KeycloakModelParameters;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
|
public class PersistentUserSessionsNoCache extends KeycloakModelParameters {
|
||||||
|
|
||||||
|
public PersistentUserSessionsNoCache() {
|
||||||
|
super(Collections.emptySet(), Collections.emptySet());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateConfig(Config cf) {
|
||||||
|
updateConfigForJpa(cf);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void updateConfigForJpa(Config cf) {
|
||||||
|
System.getProperties().put(PropertiesProfileConfigResolver.getPropertyKey(Profile.Feature.PERSISTENT_USER_SESSIONS), "enabled");
|
||||||
|
System.getProperties().put(PropertiesProfileConfigResolver.getPropertyKey(Profile.Feature.PERSISTENT_USER_SESSIONS_NO_CACHE), "enabled");
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,6 +21,7 @@ import org.junit.Assert;
|
||||||
import org.junit.FixMethodOrder;
|
import org.junit.FixMethodOrder;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runners.MethodSorters;
|
import org.junit.runners.MethodSorters;
|
||||||
|
import org.keycloak.common.Profile;
|
||||||
import org.keycloak.common.util.Retry;
|
import org.keycloak.common.util.Retry;
|
||||||
import org.keycloak.common.util.Time;
|
import org.keycloak.common.util.Time;
|
||||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||||
|
@ -34,6 +35,7 @@ import org.keycloak.models.UserModel;
|
||||||
import org.keycloak.models.UserSessionModel;
|
import org.keycloak.models.UserSessionModel;
|
||||||
import org.keycloak.models.UserProvider;
|
import org.keycloak.models.UserProvider;
|
||||||
import org.keycloak.models.UserSessionProvider;
|
import org.keycloak.models.UserSessionProvider;
|
||||||
|
import org.keycloak.models.session.UserSessionPersisterProvider;
|
||||||
import org.keycloak.protocol.oidc.OIDCConfigAttributes;
|
import org.keycloak.protocol.oidc.OIDCConfigAttributes;
|
||||||
import org.keycloak.protocol.oidc.OIDCLoginProtocol;
|
import org.keycloak.protocol.oidc.OIDCLoginProtocol;
|
||||||
import org.keycloak.testsuite.model.HotRodServerRule;
|
import org.keycloak.testsuite.model.HotRodServerRule;
|
||||||
|
@ -276,20 +278,25 @@ public class SessionTimeoutsTest extends KeycloakModelTest {
|
||||||
clientSession.setTimestamp(time);
|
clientSession.setTimestamp(time);
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
// The persistent session will write the update data asynchronously, wait for it to arrive.
|
|
||||||
Retry.executeWithBackoff(iteration -> {
|
if (Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
||||||
withRealm(realmId, (session, realm) -> {
|
// The persistent session will write the update data asynchronously, wait for it to arrive.
|
||||||
// refresh sessions before user session expires => both session should exist
|
Retry.executeWithBackoff(iteration -> {
|
||||||
ClientModel client = realm.getClientByClientId("test-app");
|
withRealm(realmId, (session, realm) -> {
|
||||||
UserSessionModel userSession = getUserSession(session, realm, sessions[0], offline);
|
UserSessionPersisterProvider provider = session.getProvider(UserSessionPersisterProvider.class);
|
||||||
Assert.assertNotNull(userSession);
|
UserSessionModel userSessionModel = provider.loadUserSession(realm, sessions[0], offline);
|
||||||
AuthenticatedClientSessionModel clientSession = userSession.getAuthenticatedClientSessionByClient(client.getId());
|
Assert.assertNotNull(userSessionModel);
|
||||||
Assert.assertNotNull(clientSession);
|
Assert.assertEquals(userSessionModel.getLastSessionRefresh(), time);
|
||||||
Assert.assertEquals(userSession.getLastSessionRefresh(), time);
|
|
||||||
Assert.assertEquals(clientSession.getTimestamp(), time);
|
// refresh sessions before user session expires => both session should exist
|
||||||
return null;
|
ClientModel client = realm.getClientByClientId("test-app");
|
||||||
});
|
AuthenticatedClientSessionModel clientSession = userSessionModel.getAuthenticatedClientSessionByClient(client.getId());
|
||||||
}, 10, 10);
|
Assert.assertNotNull(clientSession);
|
||||||
|
Assert.assertEquals(clientSession.getTimestamp(), time);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}, 10, 10);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
offset += 2100;
|
offset += 2100;
|
||||||
|
|
Loading…
Reference in a new issue