Optimize CPU cycles for persistent sessions (#31702)
Closes #31701 Signed-off-by: Alexander Schwartz <aschwart@redhat.com>
This commit is contained in:
parent
94784182df
commit
00d8e06f79
2 changed files with 67 additions and 48 deletions
|
@ -41,13 +41,17 @@ abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends S
|
||||||
protected final KeycloakSession kcSession;
|
protected final KeycloakSession kcSession;
|
||||||
protected final Map<K, SessionUpdatesList<V>> updates = new HashMap<>();
|
protected final Map<K, SessionUpdatesList<V>> updates = new HashMap<>();
|
||||||
protected final Map<K, SessionUpdatesList<V>> offlineUpdates = new HashMap<>();
|
protected final Map<K, SessionUpdatesList<V>> offlineUpdates = new HashMap<>();
|
||||||
private final List<SessionChangesPerformer<K, V>> changesPerformers;
|
private final String cacheName;
|
||||||
private final Cache<K, SessionEntityWrapper<V>> cache;
|
private final Cache<K, SessionEntityWrapper<V>> cache;
|
||||||
private final Cache<K, SessionEntityWrapper<V>> offlineCache;
|
private final Cache<K, SessionEntityWrapper<V>> offlineCache;
|
||||||
|
private final RemoteCacheInvoker remoteCacheInvoker;
|
||||||
private final SessionFunction<V> lifespanMsLoader;
|
private final SessionFunction<V> lifespanMsLoader;
|
||||||
private final SessionFunction<V> maxIdleTimeMsLoader;
|
private final SessionFunction<V> maxIdleTimeMsLoader;
|
||||||
private final SessionFunction<V> offlineLifespanMsLoader;
|
private final SessionFunction<V> offlineLifespanMsLoader;
|
||||||
private final SessionFunction<V> offlineMaxIdleTimeMsLoader;
|
private final SessionFunction<V> offlineMaxIdleTimeMsLoader;
|
||||||
|
private final ArrayBlockingQueue<PersistentUpdate> batchingQueue;
|
||||||
|
private final SerializeExecutionsByKey<K> serializerOnline;
|
||||||
|
private final SerializeExecutionsByKey<K> serializerOffline;
|
||||||
|
|
||||||
public PersistentSessionsChangelogBasedTransaction(KeycloakSession session,
|
public PersistentSessionsChangelogBasedTransaction(KeycloakSession session,
|
||||||
String cacheName,
|
String cacheName,
|
||||||
|
@ -62,57 +66,17 @@ abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends S
|
||||||
SerializeExecutionsByKey<K> serializerOnline,
|
SerializeExecutionsByKey<K> serializerOnline,
|
||||||
SerializeExecutionsByKey<K> serializerOffline) {
|
SerializeExecutionsByKey<K> serializerOffline) {
|
||||||
kcSession = session;
|
kcSession = session;
|
||||||
|
this.cacheName = cacheName;
|
||||||
changesPerformers = new LinkedList<>();
|
|
||||||
|
|
||||||
if (batchingQueue != null) {
|
|
||||||
changesPerformers.add(new JpaChangesPerformer<>(cacheName, batchingQueue));
|
|
||||||
} else {
|
|
||||||
changesPerformers.add(new JpaChangesPerformer<>(cacheName, null) {
|
|
||||||
@Override
|
|
||||||
public void applyChanges() {
|
|
||||||
KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(),
|
|
||||||
super::applyChangesSynchronously);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cache != null) {
|
|
||||||
changesPerformers.add(new EmbeddedCachesChangesPerformer<>(cache, serializerOnline) {
|
|
||||||
@Override
|
|
||||||
public boolean shouldConsumeChange(V entity) {
|
|
||||||
return !entity.isOffline();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
changesPerformers.add(new RemoteCachesChangesPerformer<>(session, cache, remoteCacheInvoker) {
|
|
||||||
@Override
|
|
||||||
public boolean shouldConsumeChange(V entity) {
|
|
||||||
return !entity.isOffline();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
if (offlineCache != null) {
|
|
||||||
changesPerformers.add(new EmbeddedCachesChangesPerformer<>(offlineCache, serializerOffline){
|
|
||||||
@Override
|
|
||||||
public boolean shouldConsumeChange(V entity) {
|
|
||||||
return entity.isOffline();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
changesPerformers.add(new RemoteCachesChangesPerformer<>(session, offlineCache, remoteCacheInvoker) {
|
|
||||||
@Override
|
|
||||||
public boolean shouldConsumeChange(V entity) {
|
|
||||||
return entity.isOffline();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
this.cache = cache;
|
this.cache = cache;
|
||||||
this.offlineCache = offlineCache;
|
this.offlineCache = offlineCache;
|
||||||
|
this.remoteCacheInvoker = remoteCacheInvoker;
|
||||||
this.lifespanMsLoader = lifespanMsLoader;
|
this.lifespanMsLoader = lifespanMsLoader;
|
||||||
this.maxIdleTimeMsLoader = maxIdleTimeMsLoader;
|
this.maxIdleTimeMsLoader = maxIdleTimeMsLoader;
|
||||||
this.offlineLifespanMsLoader = offlineLifespanMsLoader;
|
this.offlineLifespanMsLoader = offlineLifespanMsLoader;
|
||||||
this.offlineMaxIdleTimeMsLoader = offlineMaxIdleTimeMsLoader;
|
this.offlineMaxIdleTimeMsLoader = offlineMaxIdleTimeMsLoader;
|
||||||
|
this.batchingQueue = batchingQueue;
|
||||||
|
this.serializerOnline = serializerOnline;
|
||||||
|
this.serializerOffline = serializerOffline;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Cache<K, SessionEntityWrapper<V>> getCache(boolean offline) {
|
protected Cache<K, SessionEntityWrapper<V>> getCache(boolean offline) {
|
||||||
|
@ -174,8 +138,57 @@ abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends S
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
List<SessionChangesPerformer<K, V>> prepareChangesPerformers() {
|
||||||
|
List<SessionChangesPerformer<K, V>> changesPerformers = new LinkedList<>();
|
||||||
|
|
||||||
|
if (batchingQueue != null) {
|
||||||
|
changesPerformers.add(new JpaChangesPerformer<>(cacheName, batchingQueue));
|
||||||
|
} else {
|
||||||
|
changesPerformers.add(new JpaChangesPerformer<>(cacheName, null) {
|
||||||
|
@Override
|
||||||
|
public void applyChanges() {
|
||||||
|
KeycloakModelUtils.runJobInTransaction(kcSession.getKeycloakSessionFactory(),
|
||||||
|
super::applyChangesSynchronously);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cache != null) {
|
||||||
|
changesPerformers.add(new EmbeddedCachesChangesPerformer<>(cache, serializerOnline) {
|
||||||
|
@Override
|
||||||
|
public boolean shouldConsumeChange(V entity) {
|
||||||
|
return !entity.isOffline();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
changesPerformers.add(new RemoteCachesChangesPerformer<>(kcSession, cache, remoteCacheInvoker) {
|
||||||
|
@Override
|
||||||
|
public boolean shouldConsumeChange(V entity) {
|
||||||
|
return !entity.isOffline();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (offlineCache != null) {
|
||||||
|
changesPerformers.add(new EmbeddedCachesChangesPerformer<>(offlineCache, serializerOffline){
|
||||||
|
@Override
|
||||||
|
public boolean shouldConsumeChange(V entity) {
|
||||||
|
return entity.isOffline();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
changesPerformers.add(new RemoteCachesChangesPerformer<>(kcSession, offlineCache, remoteCacheInvoker) {
|
||||||
|
@Override
|
||||||
|
public boolean shouldConsumeChange(V entity) {
|
||||||
|
return entity.isOffline();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return changesPerformers;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void commitImpl() {
|
protected void commitImpl() {
|
||||||
|
List<SessionChangesPerformer<K, V>> changesPerformers = null;
|
||||||
for (Map.Entry<K, SessionUpdatesList<V>> entry : Stream.concat(updates.entrySet().stream(), offlineUpdates.entrySet().stream()).toList()) {
|
for (Map.Entry<K, SessionUpdatesList<V>> entry : Stream.concat(updates.entrySet().stream(), offlineUpdates.entrySet().stream()).toList()) {
|
||||||
SessionUpdatesList<V> sessionUpdates = entry.getValue();
|
SessionUpdatesList<V> sessionUpdates = entry.getValue();
|
||||||
SessionEntityWrapper<V> sessionWrapper = sessionUpdates.getEntityWrapper();
|
SessionEntityWrapper<V> sessionWrapper = sessionUpdates.getEntityWrapper();
|
||||||
|
@ -193,13 +206,18 @@ abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends S
|
||||||
MergedUpdate<V> merged = MergedUpdate.computeUpdate(sessionUpdates.getUpdateTasks(), sessionWrapper, lifespanMs, maxIdleTimeMs);
|
MergedUpdate<V> merged = MergedUpdate.computeUpdate(sessionUpdates.getUpdateTasks(), sessionWrapper, lifespanMs, maxIdleTimeMs);
|
||||||
|
|
||||||
if (merged != null) {
|
if (merged != null) {
|
||||||
|
if (changesPerformers == null) {
|
||||||
|
changesPerformers = prepareChangesPerformers();
|
||||||
|
}
|
||||||
changesPerformers.stream()
|
changesPerformers.stream()
|
||||||
.filter(performer -> performer.shouldConsumeChange(entity))
|
.filter(performer -> performer.shouldConsumeChange(entity))
|
||||||
.forEach(p -> p.registerChange(entry, merged));
|
.forEach(p -> p.registerChange(entry, merged));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
changesPerformers.forEach(SessionChangesPerformer::applyChanges);
|
if (changesPerformers != null) {
|
||||||
|
changesPerformers.forEach(SessionChangesPerformer::applyChanges);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -76,7 +76,8 @@ public class PersistentSessionsWorker {
|
||||||
|
|
||||||
private void process(ArrayBlockingQueue<PersistentUpdate> queue) throws InterruptedException {
|
private void process(ArrayBlockingQueue<PersistentUpdate> queue) throws InterruptedException {
|
||||||
ArrayList<PersistentUpdate> batch = new ArrayList<>();
|
ArrayList<PersistentUpdate> batch = new ArrayList<>();
|
||||||
PersistentUpdate polled = queue.poll(100, TimeUnit.MILLISECONDS);
|
// Timeout is only a backup if interrupting the worker task in the stop() method didn't work as expected because someone else swallowed the interrupted flag.
|
||||||
|
PersistentUpdate polled = queue.poll(1, TimeUnit.SECONDS);
|
||||||
if (polled != null) {
|
if (polled != null) {
|
||||||
batch.add(polled);
|
batch.add(polled);
|
||||||
queue.drainTo(batch, maxBatchSize - 1);
|
queue.drainTo(batch, maxBatchSize - 1);
|
||||||
|
|
Loading…
Reference in a new issue