From 00d8e06f7995eafe061ac2d407869372d3be3c3e Mon Sep 17 00:00:00 2001 From: Alexander Schwartz Date: Mon, 29 Jul 2024 16:34:13 +0200 Subject: [PATCH] Optimize CPU cycles for persistent sessions (#31702) Closes #31701 Signed-off-by: Alexander Schwartz --- ...tentSessionsChangelogBasedTransaction.java | 112 ++++++++++-------- .../changes/PersistentSessionsWorker.java | 3 +- 2 files changed, 67 insertions(+), 48 deletions(-) diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java index c13fffcaa8..563c538824 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsChangelogBasedTransaction.java @@ -41,13 +41,17 @@ abstract public class PersistentSessionsChangelogBasedTransaction> updates = new HashMap<>(); protected final Map> offlineUpdates = new HashMap<>(); - private final List> changesPerformers; + private final String cacheName; private final Cache> cache; private final Cache> offlineCache; + private final RemoteCacheInvoker remoteCacheInvoker; private final SessionFunction lifespanMsLoader; private final SessionFunction maxIdleTimeMsLoader; private final SessionFunction offlineLifespanMsLoader; private final SessionFunction offlineMaxIdleTimeMsLoader; + private final ArrayBlockingQueue batchingQueue; + private final SerializeExecutionsByKey serializerOnline; + private final SerializeExecutionsByKey serializerOffline; public PersistentSessionsChangelogBasedTransaction(KeycloakSession session, String cacheName, @@ -62,57 +66,17 @@ abstract public class PersistentSessionsChangelogBasedTransaction serializerOnline, SerializeExecutionsByKey serializerOffline) { kcSession = session; - - changesPerformers = new LinkedList<>(); - - if (batchingQueue != null) { - changesPerformers.add(new JpaChangesPerformer<>(cacheName, batchingQueue)); - } else { - changesPerformers.add(new JpaChangesPerformer<>(cacheName, null) { - @Override - public void applyChanges() { - KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(), - super::applyChangesSynchronously); - } - }); - } - - if (cache != null) { - changesPerformers.add(new EmbeddedCachesChangesPerformer<>(cache, serializerOnline) { - @Override - public boolean shouldConsumeChange(V entity) { - return !entity.isOffline(); - } - }); - changesPerformers.add(new RemoteCachesChangesPerformer<>(session, cache, remoteCacheInvoker) { - @Override - public boolean shouldConsumeChange(V entity) { - return !entity.isOffline(); - } - }); - } - - if (offlineCache != null) { - changesPerformers.add(new EmbeddedCachesChangesPerformer<>(offlineCache, serializerOffline){ - @Override - public boolean shouldConsumeChange(V entity) { - return entity.isOffline(); - } - }); - changesPerformers.add(new RemoteCachesChangesPerformer<>(session, offlineCache, remoteCacheInvoker) { - @Override - public boolean shouldConsumeChange(V entity) { - return entity.isOffline(); - } - }); - } - + this.cacheName = cacheName; this.cache = cache; this.offlineCache = offlineCache; + this.remoteCacheInvoker = remoteCacheInvoker; this.lifespanMsLoader = lifespanMsLoader; this.maxIdleTimeMsLoader = maxIdleTimeMsLoader; this.offlineLifespanMsLoader = offlineLifespanMsLoader; this.offlineMaxIdleTimeMsLoader = offlineMaxIdleTimeMsLoader; + this.batchingQueue = batchingQueue; + this.serializerOnline = serializerOnline; + this.serializerOffline = serializerOffline; } protected Cache> getCache(boolean offline) { @@ -174,8 +138,57 @@ abstract public class PersistentSessionsChangelogBasedTransaction> prepareChangesPerformers() { + List> changesPerformers = new LinkedList<>(); + + if (batchingQueue != null) { + changesPerformers.add(new JpaChangesPerformer<>(cacheName, batchingQueue)); + } else { + changesPerformers.add(new JpaChangesPerformer<>(cacheName, null) { + @Override + public void applyChanges() { + KeycloakModelUtils.runJobInTransaction(kcSession.getKeycloakSessionFactory(), + super::applyChangesSynchronously); + } + }); + } + + if (cache != null) { + changesPerformers.add(new EmbeddedCachesChangesPerformer<>(cache, serializerOnline) { + @Override + public boolean shouldConsumeChange(V entity) { + return !entity.isOffline(); + } + }); + changesPerformers.add(new RemoteCachesChangesPerformer<>(kcSession, cache, remoteCacheInvoker) { + @Override + public boolean shouldConsumeChange(V entity) { + return !entity.isOffline(); + } + }); + } + + if (offlineCache != null) { + changesPerformers.add(new EmbeddedCachesChangesPerformer<>(offlineCache, serializerOffline){ + @Override + public boolean shouldConsumeChange(V entity) { + return entity.isOffline(); + } + }); + changesPerformers.add(new RemoteCachesChangesPerformer<>(kcSession, offlineCache, remoteCacheInvoker) { + @Override + public boolean shouldConsumeChange(V entity) { + return entity.isOffline(); + } + }); + } + + return changesPerformers; + } + @Override protected void commitImpl() { + List> changesPerformers = null; for (Map.Entry> entry : Stream.concat(updates.entrySet().stream(), offlineUpdates.entrySet().stream()).toList()) { SessionUpdatesList sessionUpdates = entry.getValue(); SessionEntityWrapper sessionWrapper = sessionUpdates.getEntityWrapper(); @@ -193,13 +206,18 @@ abstract public class PersistentSessionsChangelogBasedTransaction merged = MergedUpdate.computeUpdate(sessionUpdates.getUpdateTasks(), sessionWrapper, lifespanMs, maxIdleTimeMs); if (merged != null) { + if (changesPerformers == null) { + changesPerformers = prepareChangesPerformers(); + } changesPerformers.stream() .filter(performer -> performer.shouldConsumeChange(entity)) .forEach(p -> p.registerChange(entry, merged)); } } - changesPerformers.forEach(SessionChangesPerformer::applyChanges); + if (changesPerformers != null) { + changesPerformers.forEach(SessionChangesPerformer::applyChanges); + } } @Override diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsWorker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsWorker.java index 4d9d0d7e6c..b29826c919 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsWorker.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/PersistentSessionsWorker.java @@ -76,7 +76,8 @@ public class PersistentSessionsWorker { private void process(ArrayBlockingQueue queue) throws InterruptedException { ArrayList batch = new ArrayList<>(); - PersistentUpdate polled = queue.poll(100, TimeUnit.MILLISECONDS); + // Timeout is only a backup if interrupting the worker task in the stop() method didn't work as expected because someone else swallowed the interrupted flag. + PersistentUpdate polled = queue.poll(1, TimeUnit.SECONDS); if (polled != null) { batch.add(polled); queue.drainTo(batch, maxBatchSize - 1);