diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java index 5b7197bbcf..b4a566a870 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java @@ -144,6 +144,9 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes @Override public void runUpdate(AuthenticatedClientSessionEntity entity) { + if (entity.getTimestamp() >= timestamp) { + return; + } entity.setTimestamp(timestamp); } @@ -153,6 +156,11 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes .shouldSaveClientSessionToRemoteCache(kcSession, client.getRealm(), sessionWrapper, userSession, offline, timestamp); } + @Override + public boolean isDeferrable() { + return true; + } + @Override public String toString() { return "setTimestamp(" + timestamp + ')'; diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java index ed52ed7aab..b144aca376 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java @@ -36,6 +36,8 @@ import org.keycloak.models.UserModel; import org.keycloak.models.UserSessionProvider; import org.keycloak.models.UserSessionProviderFactory; import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey; +import org.keycloak.models.sessions.infinispan.changes.PersistentDeferredElement; +import org.keycloak.models.sessions.infinispan.changes.PersistentSessionsWorker; import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore; import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory; import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore; @@ -67,6 +69,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; import static org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionProviderFactory.PROVIDER_PRIORITY; @@ -98,6 +101,11 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider SerializeExecutionsByKey serializerOfflineSession = new SerializeExecutionsByKey<>(); SerializeExecutionsByKey serializerClientSession = new SerializeExecutionsByKey<>(); SerializeExecutionsByKey serializerOfflineClientSession = new SerializeExecutionsByKey<>(); + ArrayBlockingQueue> asyncQueueUserSessions = new ArrayBlockingQueue<>(1000); + ArrayBlockingQueue> asyncQueueUserOfflineSessions = new ArrayBlockingQueue<>(1000); + ArrayBlockingQueue> asyncQueueClientSessions = new ArrayBlockingQueue<>(1000); + ArrayBlockingQueue> asyncQueueClientOfflineSessions = new ArrayBlockingQueue<>(1000); + private PersistentSessionsWorker persistentSessionsWorker; @Override public UserSessionProvider create(KeycloakSession session) { @@ -124,7 +132,11 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider serializerSession, serializerOfflineSession, serializerClientSession, - serializerOfflineClientSession + serializerOfflineClientSession, + asyncQueueUserSessions, + asyncQueueUserOfflineSessions, + asyncQueueClientSessions, + asyncQueueClientOfflineSessions ); } return new InfinispanUserSessionProvider( @@ -200,6 +212,11 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider } } }); + persistentSessionsWorker = new PersistentSessionsWorker(factory, asyncQueueUserSessions, + asyncQueueUserOfflineSessions, + asyncQueueClientSessions, + asyncQueueClientOfflineSessions); + persistentSessionsWorker.start(); } // Max count of worker errors. Initialization will end with exception when this number is reached @@ -419,6 +436,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider @Override public void close() { + persistentSessionsWorker.stop(); } @Override diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java index 4df31db468..9e939bb3ce 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java @@ -43,6 +43,7 @@ import org.keycloak.models.light.LightweightUserAdapter; import org.keycloak.models.session.UserSessionPersisterProvider; import org.keycloak.models.sessions.infinispan.changes.ClientSessionPersistentChangelogBasedTransaction; import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction; +import org.keycloak.models.sessions.infinispan.changes.PersistentDeferredElement; import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey; import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask; @@ -73,6 +74,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -133,7 +135,11 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi SerializeExecutionsByKey serializerSession, SerializeExecutionsByKey serializerOfflineSession, SerializeExecutionsByKey serializerClientSession, - SerializeExecutionsByKey serializerOfflineClientSession) { + SerializeExecutionsByKey serializerOfflineClientSession, + ArrayBlockingQueue> asyncQueueUserSessions, + ArrayBlockingQueue> asyncQueueUserOfflineSessions, + ArrayBlockingQueue> asyncQueueClientSessions, + ArrayBlockingQueue> asyncQueueClientOfflineSessions) { if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) { throw new IllegalStateException("Persistent user sessions are not enabled"); } @@ -145,11 +151,11 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi this.offlineSessionCache = offlineSessionCache; this.offlineClientSessionCache = offlineClientSessionCache; - this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, false, serializerSession); - this.offlineSessionTx = new UserSessionPersistentChangelogBasedTransaction(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, true, serializerOfflineSession); + this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, false, serializerSession, asyncQueueUserSessions); + this.offlineSessionTx = new UserSessionPersistentChangelogBasedTransaction(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, true, serializerOfflineSession, asyncQueueUserOfflineSessions); - this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, false, keyGenerator, sessionTx, serializerClientSession); - this.offlineClientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, true, keyGenerator, offlineSessionTx, serializerOfflineClientSession); + this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, false, keyGenerator, sessionTx, serializerClientSession, asyncQueueClientSessions); + this.offlineClientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, true, keyGenerator, offlineSessionTx, serializerOfflineClientSession, asyncQueueClientOfflineSessions); this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session); @@ -1049,6 +1055,16 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi return idleChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG || lifetimeChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG; } + public void processDeferredUserSessionElements(Collection> batch, boolean offline) { + UserSessionPersistentChangelogBasedTransaction transaction = getTransaction(offline); + transaction.applyDeferredBatch(batch); + } + + public void processDeferredClientSessionElements(Collection> batch, boolean offline) { + ClientSessionPersistentChangelogBasedTransaction transaction = getClientSessionTransaction(offline); + transaction.applyDeferredBatch(batch); + } + private static class RegisterClientSessionTask implements SessionUpdateTask { private final String clientUuid; diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java index bccbf98df5..c40e9682b9 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java @@ -238,6 +238,9 @@ public class UserSessionAdapter= lastSessionRefresh) { + return; + } entity.setLastSessionRefresh(lastSessionRefresh); } @@ -247,6 +250,11 @@ public class UserSessionAdapter { @@ -44,8 +45,9 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent private final InfinispanKeyGenerator keyGenerator; private final UserSessionPersistentChangelogBasedTransaction userSessionTx; - public ClientSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, InfinispanKeyGenerator keyGenerator, UserSessionPersistentChangelogBasedTransaction userSessionTx, SerializeExecutionsByKey serializer) { - super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer); + public ClientSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, InfinispanKeyGenerator keyGenerator, + UserSessionPersistentChangelogBasedTransaction userSessionTx, SerializeExecutionsByKey serializer, ArrayBlockingQueue> asyncQueue) { + super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer, asyncQueue); this.keyGenerator = keyGenerator; this.userSessionTx = userSessionTx; } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/JpaChangesPerformer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/JpaChangesPerformer.java index 7b6ffaa19e..780db27e30 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/JpaChangesPerformer.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/JpaChangesPerformer.java @@ -82,9 +82,11 @@ public class JpaChangesPerformer implements SessionC @Override public void applyChanges() { - Retry.executeWithBackoff(iteration -> KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(), - innerSession -> changes.forEach(c -> c.accept(innerSession))), - 10, 10); + if (changes.size() > 0) { + Retry.executeWithBackoff(iteration -> KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(), + innerSession -> changes.forEach(c -> c.accept(innerSession))), + 10, 10); + } } private void processClientSessionUpdate(KeycloakSession innerSession, Map.Entry> entry, MergedUpdate merged) { diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/MergedUpdate.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/MergedUpdate.java index eaa7bb562a..2fb323473c 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/MergedUpdate.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/MergedUpdate.java @@ -36,7 +36,12 @@ public class MergedUpdate implements SessionUpdateTask< private CrossDCMessageStatus crossDCMessageStatus; private final long lifespanMs; private final long maxIdleTimeMs; + private boolean isDeferrable; + @Override + public boolean isDeferrable() { + return isDeferrable; + } private MergedUpdate(CacheOperation operation, CrossDCMessageStatus crossDCMessageStatus, long lifespanMs, long maxIdleTimeMs) { this.operation = operation; @@ -78,7 +83,11 @@ public class MergedUpdate implements SessionUpdateTask< MergedUpdate result = null; S session = sessionWrapper.getEntity(); + boolean isDeferrable = true; for (SessionUpdateTask child : childUpdates) { + if (!child.isDeferrable()) { + isDeferrable = false; + } if (result == null) { CacheOperation operation = child.getOperation(session); @@ -114,10 +123,16 @@ public class MergedUpdate implements SessionUpdateTask< result.childUpdates.add(child); } } - + if (result != null) { + result.setDeferable(isDeferrable); + } return result; } + private void setDeferable(boolean isDeferrable) { + this.isDeferrable = isDeferrable; + } + @Override public String toString() { return "MergedUpdate" + childUpdates; diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentDeferredElement.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentDeferredElement.java new file mode 100644 index 0000000000..91133104d2 --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentDeferredElement.java @@ -0,0 +1,45 @@ +/* + * Copyright 2024 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.models.sessions.infinispan.changes; + +import org.keycloak.models.sessions.infinispan.entities.SessionEntity; + +import java.util.Map; + +/** + * Capture information for a deferred update of the session stores. + * + * @author Alexander Schwartz + */ +public class PersistentDeferredElement { + private final Map.Entry> entry; + private final MergedUpdate merged; + + public PersistentDeferredElement(Map.Entry> entry, MergedUpdate merged) { + this.entry = entry; + this.merged = merged; + } + + public Map.Entry> getEntry() { + return entry; + } + + public MergedUpdate getMerged() { + return merged; + } +} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java index d623a7220c..596a746dda 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java @@ -26,8 +26,11 @@ import org.keycloak.models.sessions.infinispan.SessionFunction; import org.keycloak.models.sessions.infinispan.entities.SessionEntity; import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME; import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME; @@ -38,10 +41,13 @@ public class PersistentSessionsChangelogBasedTransaction> changesPerformers; protected final boolean offline; + private final ArrayBlockingQueue> asyncQueue; + private Collection> batch; - public PersistentSessionsChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey serializer) { + public PersistentSessionsChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey serializer, ArrayBlockingQueue> asyncQueue) { super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, serializer); this.offline = offline; + this.asyncQueue = asyncQueue; if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) { throw new IllegalStateException("Persistent user sessions are not enabled"); @@ -78,10 +84,20 @@ public class PersistentSessionsChangelogBasedTransaction merged = MergedUpdate.computeUpdate(sessionUpdates.getUpdateTasks(), sessionWrapper, lifespanMs, maxIdleTimeMs); if (merged != null) { - changesPerformers.forEach(p -> p.registerChange(entry, merged)); + if (merged.isDeferrable()) { + asyncQueue.add(new PersistentDeferredElement<>(entry, merged)); + } else { + changesPerformers.forEach(p -> p.registerChange(entry, merged)); + } } } + if (batch != null) { + batch.forEach(o -> { + changesPerformers.forEach(p -> p.registerChange(o.getEntry(), o.getMerged())); + }); + } + changesPerformers.forEach(SessionChangesPerformer::applyChanges); } @@ -89,4 +105,12 @@ public class PersistentSessionsChangelogBasedTransaction> batchToApply) { + if (this.batch == null) { + this.batch = new ArrayList<>(batchToApply.size()); + } + batch.addAll(batchToApply); + } + } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsWorker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsWorker.java new file mode 100644 index 0000000000..9c90875950 --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsWorker.java @@ -0,0 +1,135 @@ +/* + * Copyright 2024 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.models.sessions.infinispan.changes; + +import org.jboss.logging.Logger; +import org.keycloak.models.KeycloakSessionFactory; +import org.keycloak.models.UserSessionProvider; +import org.keycloak.models.sessions.infinispan.PersistentUserSessionProvider; +import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; +import org.keycloak.models.sessions.infinispan.entities.SessionEntity; +import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; +import org.keycloak.models.utils.KeycloakModelUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Run one thread per session type and drain the queues once there is an entry. Will batch entries if possible. + * + * @author Alexander Schwartz + */ +public class PersistentSessionsWorker { + private static final Logger LOG = Logger.getLogger(PersistentSessionsWorker.class); + + private final KeycloakSessionFactory factory; + private final ArrayBlockingQueue> asyncQueueUserSessions; + private final ArrayBlockingQueue> asyncQueueUserOfflineSessions; + private final ArrayBlockingQueue> asyncQueueClientSessions; + private final ArrayBlockingQueue> asyncQueueClientOfflineSessions; + private final List threads = new ArrayList<>(); + private volatile boolean stop; + + public PersistentSessionsWorker(KeycloakSessionFactory factory, ArrayBlockingQueue> asyncQueueUserSessions, ArrayBlockingQueue> asyncQueueUserOfflineSessions, ArrayBlockingQueue> asyncQueueClientSessions, ArrayBlockingQueue> asyncQueueClientOfflineSessions) { + this.factory = factory; + this.asyncQueueUserSessions = asyncQueueUserSessions; + this.asyncQueueUserOfflineSessions = asyncQueueUserOfflineSessions; + this.asyncQueueClientSessions = asyncQueueClientSessions; + this.asyncQueueClientOfflineSessions = asyncQueueClientOfflineSessions; + } + + public void start() { + threads.add(new WorkerUserSession(asyncQueueUserSessions, false)); + threads.add(new WorkerUserSession(asyncQueueUserOfflineSessions, true)); + threads.add(new WorkerClientSession(asyncQueueClientSessions, false)); + threads.add(new WorkerClientSession(asyncQueueClientOfflineSessions, true)); + threads.forEach(Thread::start); + } + + private class WorkerUserSession extends Worker { + public WorkerUserSession(ArrayBlockingQueue> queue, boolean offline) { + super(queue, offline, PersistentUserSessionProvider::processDeferredUserSessionElements); + } + } + + private class WorkerClientSession extends Worker { + public WorkerClientSession(ArrayBlockingQueue> queue, boolean offline) { + super(queue, offline, PersistentUserSessionProvider::processDeferredClientSessionElements); + } + } + + private class Worker extends Thread { + private final ArrayBlockingQueue> queue; + private final boolean offline; + private final Adapter adapter; + + public Worker(ArrayBlockingQueue> queue, boolean offline, Adapter adapter) { + this.queue = queue; + this.offline = offline; + this.adapter = adapter; + } + + public void run() { + Thread.currentThread().setName(this.getClass().getName() + " for " + (offline ? "offline" : "online") + " sessions"); + while (!stop) { + try { + process(queue, offline); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + + private void process(ArrayBlockingQueue> queue, boolean offline) throws InterruptedException { + Collection> batch = new ArrayList<>(); + PersistentDeferredElement polled = queue.poll(100, TimeUnit.MILLISECONDS); + if (polled != null) { + batch.add(polled); + queue.drainTo(batch, 99); + try { + LOG.debugf("Processing %d deferred session updates.", batch.size()); + KeycloakModelUtils.runJobInTransaction(factory, + session -> adapter.run(((PersistentUserSessionProvider) session.getProvider(UserSessionProvider.class)), batch, offline)); + } catch (RuntimeException ex) { + LOG.warnf(ex, "Unable to write %d deferred session updates", queue.size()); + } + } + } + + interface Adapter { + void run(PersistentUserSessionProvider sessionProvider, Collection> batch, boolean offline); + } + } + + public void stop() { + stop = true; + threads.forEach(Thread::interrupt); + threads.forEach(t -> { + try { + t.join(TimeUnit.MINUTES.toMillis(1)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } +} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionUpdateTask.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionUpdateTask.java index 4a71507e5d..234d8a312e 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionUpdateTask.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/SessionUpdateTask.java @@ -30,6 +30,10 @@ public interface SessionUpdateTask { CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper sessionWrapper); + default boolean isDeferrable() { + return false; + } + enum CacheOperation { ADD, diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java index ea6b5e8fbf..99bcf96ac5 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/UserSessionPersistentChangelogBasedTransaction.java @@ -31,8 +31,8 @@ import org.keycloak.models.sessions.infinispan.entities.SessionEntity; import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker; -import java.util.Collections; import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME; import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME; @@ -42,8 +42,8 @@ import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.U public class UserSessionPersistentChangelogBasedTransaction extends PersistentSessionsChangelogBasedTransaction { private static final Logger LOG = Logger.getLogger(UserSessionPersistentChangelogBasedTransaction.class); - public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey serializer) { - super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer); + public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction lifespanMsLoader, SessionFunction maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey serializer, ArrayBlockingQueue> asyncQueue) { + super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer, asyncQueue); } public SessionEntityWrapper get(RealmModel realm, String key) { @@ -93,12 +93,7 @@ public class UserSessionPersistentChangelogBasedTransaction extends PersistentSe return null; } - SessionEntityWrapper userSessionEntitySessionEntityWrapper = importUserSession(persistentUserSession); - if (userSessionEntitySessionEntityWrapper == null) { - removeSessionEntityFromPersister(key); - } - - return userSessionEntitySessionEntityWrapper; + return importUserSession(persistentUserSession); } private void removeSessionEntityFromPersister(String key) { diff --git a/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/SessionTimeoutsTest.java b/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/SessionTimeoutsTest.java index 60fac5d669..546710a9d7 100644 --- a/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/SessionTimeoutsTest.java +++ b/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/SessionTimeoutsTest.java @@ -21,6 +21,7 @@ import org.junit.Assert; import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; +import org.keycloak.common.util.Retry; import org.keycloak.common.util.Time; import org.keycloak.connections.infinispan.InfinispanConnectionProvider; import org.keycloak.models.AuthenticatedClientSessionModel; @@ -263,6 +264,7 @@ public class SessionTimeoutsTest extends KeycloakModelTest { for (int i = 0; i < refreshTimes; i++) { offset += 1500; setTimeOffset(offset); + int time = Time.currentTime(); withRealm(realmId, (session, realm) -> { // refresh sessions before user session expires => both session should exist ClientModel client = realm.getClientByClientId("test-app"); @@ -270,10 +272,24 @@ public class SessionTimeoutsTest extends KeycloakModelTest { Assert.assertNotNull(userSession); AuthenticatedClientSessionModel clientSession = userSession.getAuthenticatedClientSessionByClient(client.getId()); Assert.assertNotNull(clientSession); - userSession.setLastSessionRefresh(Time.currentTime()); - clientSession.setTimestamp(Time.currentTime()); + userSession.setLastSessionRefresh(time); + clientSession.setTimestamp(time); return null; }); + // The persistent session will write the update data asynchronously, wait for it to arrive. + Retry.executeWithBackoff(iteration -> { + withRealm(realmId, (session, realm) -> { + // refresh sessions before user session expires => both session should exist + ClientModel client = realm.getClientByClientId("test-app"); + UserSessionModel userSession = getUserSession(session, realm, sessions[0], offline); + Assert.assertNotNull(userSession); + AuthenticatedClientSessionModel clientSession = userSession.getAuthenticatedClientSessionByClient(client.getId()); + Assert.assertNotNull(clientSession); + Assert.assertEquals(userSession.getLastSessionRefresh(), time); + Assert.assertEquals(clientSession.getTimestamp(), time); + return null; + }); + }, 10, 10); } offset += 2100;