Defer updates of last session updates and batch them (#28502)
Defer updates of last session refreshes and batch them Signed-off-by: Alexander Schwartz <aschwart@redhat.com> Signed-off-by: Michal Hajas <mhajas@redhat.com> Co-authored-by: Michal Hajas <mhajas@redhat.com>
This commit is contained in:
parent
a8d1d6edd7
commit
13af4f44f5
13 changed files with 313 additions and 25 deletions
|
@ -144,6 +144,9 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void runUpdate(AuthenticatedClientSessionEntity entity) {
|
public void runUpdate(AuthenticatedClientSessionEntity entity) {
|
||||||
|
if (entity.getTimestamp() >= timestamp) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
entity.setTimestamp(timestamp);
|
entity.setTimestamp(timestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,6 +156,11 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
|
||||||
.shouldSaveClientSessionToRemoteCache(kcSession, client.getRealm(), sessionWrapper, userSession, offline, timestamp);
|
.shouldSaveClientSessionToRemoteCache(kcSession, client.getRealm(), sessionWrapper, userSession, offline, timestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDeferrable() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "setTimestamp(" + timestamp + ')';
|
return "setTimestamp(" + timestamp + ')';
|
||||||
|
|
|
@ -36,6 +36,8 @@ import org.keycloak.models.UserModel;
|
||||||
import org.keycloak.models.UserSessionProvider;
|
import org.keycloak.models.UserSessionProvider;
|
||||||
import org.keycloak.models.UserSessionProviderFactory;
|
import org.keycloak.models.UserSessionProviderFactory;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
|
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
|
||||||
|
import org.keycloak.models.sessions.infinispan.changes.PersistentDeferredElement;
|
||||||
|
import org.keycloak.models.sessions.infinispan.changes.PersistentSessionsWorker;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
|
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory;
|
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore;
|
import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore;
|
||||||
|
@ -67,6 +69,7 @@ import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionProviderFactory.PROVIDER_PRIORITY;
|
import static org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionProviderFactory.PROVIDER_PRIORITY;
|
||||||
|
@ -98,6 +101,11 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
||||||
SerializeExecutionsByKey<String> serializerOfflineSession = new SerializeExecutionsByKey<>();
|
SerializeExecutionsByKey<String> serializerOfflineSession = new SerializeExecutionsByKey<>();
|
||||||
SerializeExecutionsByKey<UUID> serializerClientSession = new SerializeExecutionsByKey<>();
|
SerializeExecutionsByKey<UUID> serializerClientSession = new SerializeExecutionsByKey<>();
|
||||||
SerializeExecutionsByKey<UUID> serializerOfflineClientSession = new SerializeExecutionsByKey<>();
|
SerializeExecutionsByKey<UUID> serializerOfflineClientSession = new SerializeExecutionsByKey<>();
|
||||||
|
ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserSessions = new ArrayBlockingQueue<>(1000);
|
||||||
|
ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserOfflineSessions = new ArrayBlockingQueue<>(1000);
|
||||||
|
ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientSessions = new ArrayBlockingQueue<>(1000);
|
||||||
|
ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientOfflineSessions = new ArrayBlockingQueue<>(1000);
|
||||||
|
private PersistentSessionsWorker persistentSessionsWorker;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public UserSessionProvider create(KeycloakSession session) {
|
public UserSessionProvider create(KeycloakSession session) {
|
||||||
|
@ -124,7 +132,11 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
||||||
serializerSession,
|
serializerSession,
|
||||||
serializerOfflineSession,
|
serializerOfflineSession,
|
||||||
serializerClientSession,
|
serializerClientSession,
|
||||||
serializerOfflineClientSession
|
serializerOfflineClientSession,
|
||||||
|
asyncQueueUserSessions,
|
||||||
|
asyncQueueUserOfflineSessions,
|
||||||
|
asyncQueueClientSessions,
|
||||||
|
asyncQueueClientOfflineSessions
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
return new InfinispanUserSessionProvider(
|
return new InfinispanUserSessionProvider(
|
||||||
|
@ -200,6 +212,11 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
persistentSessionsWorker = new PersistentSessionsWorker(factory, asyncQueueUserSessions,
|
||||||
|
asyncQueueUserOfflineSessions,
|
||||||
|
asyncQueueClientSessions,
|
||||||
|
asyncQueueClientOfflineSessions);
|
||||||
|
persistentSessionsWorker.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Max count of worker errors. Initialization will end with exception when this number is reached
|
// Max count of worker errors. Initialization will end with exception when this number is reached
|
||||||
|
@ -419,6 +436,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
|
persistentSessionsWorker.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -43,6 +43,7 @@ import org.keycloak.models.light.LightweightUserAdapter;
|
||||||
import org.keycloak.models.session.UserSessionPersisterProvider;
|
import org.keycloak.models.session.UserSessionPersisterProvider;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.ClientSessionPersistentChangelogBasedTransaction;
|
import org.keycloak.models.sessions.infinispan.changes.ClientSessionPersistentChangelogBasedTransaction;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
|
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
|
||||||
|
import org.keycloak.models.sessions.infinispan.changes.PersistentDeferredElement;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
|
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
|
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
|
||||||
|
@ -73,6 +74,7 @@ import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -133,7 +135,11 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
||||||
SerializeExecutionsByKey<String> serializerSession,
|
SerializeExecutionsByKey<String> serializerSession,
|
||||||
SerializeExecutionsByKey<String> serializerOfflineSession,
|
SerializeExecutionsByKey<String> serializerOfflineSession,
|
||||||
SerializeExecutionsByKey<UUID> serializerClientSession,
|
SerializeExecutionsByKey<UUID> serializerClientSession,
|
||||||
SerializeExecutionsByKey<UUID> serializerOfflineClientSession) {
|
SerializeExecutionsByKey<UUID> serializerOfflineClientSession,
|
||||||
|
ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserSessions,
|
||||||
|
ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserOfflineSessions,
|
||||||
|
ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientSessions,
|
||||||
|
ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientOfflineSessions) {
|
||||||
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
||||||
throw new IllegalStateException("Persistent user sessions are not enabled");
|
throw new IllegalStateException("Persistent user sessions are not enabled");
|
||||||
}
|
}
|
||||||
|
@ -145,11 +151,11 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
||||||
this.offlineSessionCache = offlineSessionCache;
|
this.offlineSessionCache = offlineSessionCache;
|
||||||
this.offlineClientSessionCache = offlineClientSessionCache;
|
this.offlineClientSessionCache = offlineClientSessionCache;
|
||||||
|
|
||||||
this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, false, serializerSession);
|
this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, false, serializerSession, asyncQueueUserSessions);
|
||||||
this.offlineSessionTx = new UserSessionPersistentChangelogBasedTransaction(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, true, serializerOfflineSession);
|
this.offlineSessionTx = new UserSessionPersistentChangelogBasedTransaction(session, offlineSessionCache, remoteCacheInvoker, offlineSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineSessionMaxIdleMs, true, serializerOfflineSession, asyncQueueUserOfflineSessions);
|
||||||
|
|
||||||
this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, false, keyGenerator, sessionTx, serializerClientSession);
|
this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, false, keyGenerator, sessionTx, serializerClientSession, asyncQueueClientSessions);
|
||||||
this.offlineClientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, true, keyGenerator, offlineSessionTx, serializerOfflineClientSession);
|
this.offlineClientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, offlineClientSessionCache, remoteCacheInvoker, offlineClientSessionCacheEntryLifespanAdjuster, SessionTimeouts::getOfflineClientSessionMaxIdleMs, true, keyGenerator, offlineSessionTx, serializerOfflineClientSession, asyncQueueClientOfflineSessions);
|
||||||
|
|
||||||
this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session);
|
this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session);
|
||||||
|
|
||||||
|
@ -1049,6 +1055,16 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
||||||
return idleChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG || lifetimeChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG;
|
return idleChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG || lifetimeChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void processDeferredUserSessionElements(Collection<PersistentDeferredElement<String, UserSessionEntity>> batch, boolean offline) {
|
||||||
|
UserSessionPersistentChangelogBasedTransaction transaction = getTransaction(offline);
|
||||||
|
transaction.applyDeferredBatch(batch);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processDeferredClientSessionElements(Collection<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> batch, boolean offline) {
|
||||||
|
ClientSessionPersistentChangelogBasedTransaction transaction = getClientSessionTransaction(offline);
|
||||||
|
transaction.applyDeferredBatch(batch);
|
||||||
|
}
|
||||||
|
|
||||||
private static class RegisterClientSessionTask implements SessionUpdateTask<UserSessionEntity> {
|
private static class RegisterClientSessionTask implements SessionUpdateTask<UserSessionEntity> {
|
||||||
|
|
||||||
private final String clientUuid;
|
private final String clientUuid;
|
||||||
|
|
|
@ -238,6 +238,9 @@ public class UserSessionAdapter<T extends SessionRefreshStore & UserSessionProvi
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void runUpdate(UserSessionEntity entity) {
|
public void runUpdate(UserSessionEntity entity) {
|
||||||
|
if (entity.getLastSessionRefresh() >= lastSessionRefresh) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
entity.setLastSessionRefresh(lastSessionRefresh);
|
entity.setLastSessionRefresh(lastSessionRefresh);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -247,6 +250,11 @@ public class UserSessionAdapter<T extends SessionRefreshStore & UserSessionProvi
|
||||||
.shouldSaveUserSessionToRemoteCache(UserSessionAdapter.this.session, UserSessionAdapter.this.realm, sessionWrapper, offline, lastSessionRefresh);
|
.shouldSaveUserSessionToRemoteCache(UserSessionAdapter.this.session, UserSessionAdapter.this.realm, sessionWrapper, offline, lastSessionRefresh);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDeferrable() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "setLastSessionRefresh(" + lastSessionRefresh + ')';
|
return "setLastSessionRefresh(" + lastSessionRefresh + ')';
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.keycloak.models.sessions.infinispan.util.InfinispanKeyGenerator;
|
||||||
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
|
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
|
||||||
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
public class ClientSessionPersistentChangelogBasedTransaction extends PersistentSessionsChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> {
|
public class ClientSessionPersistentChangelogBasedTransaction extends PersistentSessionsChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> {
|
||||||
|
@ -44,8 +45,9 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
|
||||||
private final InfinispanKeyGenerator keyGenerator;
|
private final InfinispanKeyGenerator keyGenerator;
|
||||||
private final UserSessionPersistentChangelogBasedTransaction userSessionTx;
|
private final UserSessionPersistentChangelogBasedTransaction userSessionTx;
|
||||||
|
|
||||||
public ClientSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<AuthenticatedClientSessionEntity> lifespanMsLoader, SessionFunction<AuthenticatedClientSessionEntity> maxIdleTimeMsLoader, boolean offline, InfinispanKeyGenerator keyGenerator, UserSessionPersistentChangelogBasedTransaction userSessionTx, SerializeExecutionsByKey<UUID> serializer) {
|
public ClientSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<AuthenticatedClientSessionEntity> lifespanMsLoader, SessionFunction<AuthenticatedClientSessionEntity> maxIdleTimeMsLoader, boolean offline, InfinispanKeyGenerator keyGenerator,
|
||||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer);
|
UserSessionPersistentChangelogBasedTransaction userSessionTx, SerializeExecutionsByKey<UUID> serializer, ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueue) {
|
||||||
|
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer, asyncQueue);
|
||||||
this.keyGenerator = keyGenerator;
|
this.keyGenerator = keyGenerator;
|
||||||
this.userSessionTx = userSessionTx;
|
this.userSessionTx = userSessionTx;
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,10 +82,12 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void applyChanges() {
|
public void applyChanges() {
|
||||||
|
if (changes.size() > 0) {
|
||||||
Retry.executeWithBackoff(iteration -> KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(),
|
Retry.executeWithBackoff(iteration -> KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(),
|
||||||
innerSession -> changes.forEach(c -> c.accept(innerSession))),
|
innerSession -> changes.forEach(c -> c.accept(innerSession))),
|
||||||
10, 10);
|
10, 10);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void processClientSessionUpdate(KeycloakSession innerSession, Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
|
private void processClientSessionUpdate(KeycloakSession innerSession, Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
|
||||||
SessionUpdatesList<V> sessionUpdates = entry.getValue();
|
SessionUpdatesList<V> sessionUpdates = entry.getValue();
|
||||||
|
|
|
@ -36,7 +36,12 @@ public class MergedUpdate<S extends SessionEntity> implements SessionUpdateTask<
|
||||||
private CrossDCMessageStatus crossDCMessageStatus;
|
private CrossDCMessageStatus crossDCMessageStatus;
|
||||||
private final long lifespanMs;
|
private final long lifespanMs;
|
||||||
private final long maxIdleTimeMs;
|
private final long maxIdleTimeMs;
|
||||||
|
private boolean isDeferrable;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDeferrable() {
|
||||||
|
return isDeferrable;
|
||||||
|
}
|
||||||
|
|
||||||
private MergedUpdate(CacheOperation operation, CrossDCMessageStatus crossDCMessageStatus, long lifespanMs, long maxIdleTimeMs) {
|
private MergedUpdate(CacheOperation operation, CrossDCMessageStatus crossDCMessageStatus, long lifespanMs, long maxIdleTimeMs) {
|
||||||
this.operation = operation;
|
this.operation = operation;
|
||||||
|
@ -78,7 +83,11 @@ public class MergedUpdate<S extends SessionEntity> implements SessionUpdateTask<
|
||||||
|
|
||||||
MergedUpdate<S> result = null;
|
MergedUpdate<S> result = null;
|
||||||
S session = sessionWrapper.getEntity();
|
S session = sessionWrapper.getEntity();
|
||||||
|
boolean isDeferrable = true;
|
||||||
for (SessionUpdateTask<S> child : childUpdates) {
|
for (SessionUpdateTask<S> child : childUpdates) {
|
||||||
|
if (!child.isDeferrable()) {
|
||||||
|
isDeferrable = false;
|
||||||
|
}
|
||||||
if (result == null) {
|
if (result == null) {
|
||||||
CacheOperation operation = child.getOperation(session);
|
CacheOperation operation = child.getOperation(session);
|
||||||
|
|
||||||
|
@ -114,10 +123,16 @@ public class MergedUpdate<S extends SessionEntity> implements SessionUpdateTask<
|
||||||
result.childUpdates.add(child);
|
result.childUpdates.add(child);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (result != null) {
|
||||||
|
result.setDeferable(isDeferrable);
|
||||||
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setDeferable(boolean isDeferrable) {
|
||||||
|
this.isDeferrable = isDeferrable;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "MergedUpdate" + childUpdates;
|
return "MergedUpdate" + childUpdates;
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2024 Red Hat, Inc. and/or its affiliates
|
||||||
|
* and other contributors as indicated by the @author tags.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.keycloak.models.sessions.infinispan.changes;
|
||||||
|
|
||||||
|
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Capture information for a deferred update of the session stores.
|
||||||
|
*
|
||||||
|
* @author Alexander Schwartz
|
||||||
|
*/
|
||||||
|
public class PersistentDeferredElement<K, V extends SessionEntity> {
|
||||||
|
private final Map.Entry<K, SessionUpdatesList<V>> entry;
|
||||||
|
private final MergedUpdate<V> merged;
|
||||||
|
|
||||||
|
public PersistentDeferredElement(Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
|
||||||
|
this.entry = entry;
|
||||||
|
this.merged = merged;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map.Entry<K, SessionUpdatesList<V>> getEntry() {
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
public MergedUpdate<V> getMerged() {
|
||||||
|
return merged;
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,8 +26,11 @@ import org.keycloak.models.sessions.infinispan.SessionFunction;
|
||||||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||||
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
|
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
|
||||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
|
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
|
||||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
|
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
|
||||||
|
@ -38,10 +41,13 @@ public class PersistentSessionsChangelogBasedTransaction<K, V extends SessionEnt
|
||||||
|
|
||||||
private final List<SessionChangesPerformer<K, V>> changesPerformers;
|
private final List<SessionChangesPerformer<K, V>> changesPerformers;
|
||||||
protected final boolean offline;
|
protected final boolean offline;
|
||||||
|
private final ArrayBlockingQueue<PersistentDeferredElement<K, V>> asyncQueue;
|
||||||
|
private Collection<PersistentDeferredElement<K, V>> batch;
|
||||||
|
|
||||||
public PersistentSessionsChangelogBasedTransaction(KeycloakSession session, Cache<K, SessionEntityWrapper<V>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<V> lifespanMsLoader, SessionFunction<V> maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey<K> serializer) {
|
public PersistentSessionsChangelogBasedTransaction(KeycloakSession session, Cache<K, SessionEntityWrapper<V>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<V> lifespanMsLoader, SessionFunction<V> maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey<K> serializer, ArrayBlockingQueue<PersistentDeferredElement<K, V>> asyncQueue) {
|
||||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, serializer);
|
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, serializer);
|
||||||
this.offline = offline;
|
this.offline = offline;
|
||||||
|
this.asyncQueue = asyncQueue;
|
||||||
|
|
||||||
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
||||||
throw new IllegalStateException("Persistent user sessions are not enabled");
|
throw new IllegalStateException("Persistent user sessions are not enabled");
|
||||||
|
@ -78,9 +84,19 @@ public class PersistentSessionsChangelogBasedTransaction<K, V extends SessionEnt
|
||||||
MergedUpdate<V> merged = MergedUpdate.computeUpdate(sessionUpdates.getUpdateTasks(), sessionWrapper, lifespanMs, maxIdleTimeMs);
|
MergedUpdate<V> merged = MergedUpdate.computeUpdate(sessionUpdates.getUpdateTasks(), sessionWrapper, lifespanMs, maxIdleTimeMs);
|
||||||
|
|
||||||
if (merged != null) {
|
if (merged != null) {
|
||||||
|
if (merged.isDeferrable()) {
|
||||||
|
asyncQueue.add(new PersistentDeferredElement<>(entry, merged));
|
||||||
|
} else {
|
||||||
changesPerformers.forEach(p -> p.registerChange(entry, merged));
|
changesPerformers.forEach(p -> p.registerChange(entry, merged));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (batch != null) {
|
||||||
|
batch.forEach(o -> {
|
||||||
|
changesPerformers.forEach(p -> p.registerChange(o.getEntry(), o.getMerged()));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
changesPerformers.forEach(SessionChangesPerformer::applyChanges);
|
changesPerformers.forEach(SessionChangesPerformer::applyChanges);
|
||||||
}
|
}
|
||||||
|
@ -89,4 +105,12 @@ public class PersistentSessionsChangelogBasedTransaction<K, V extends SessionEnt
|
||||||
protected void rollbackImpl() {
|
protected void rollbackImpl() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void applyDeferredBatch(Collection<PersistentDeferredElement<K, V>> batchToApply) {
|
||||||
|
if (this.batch == null) {
|
||||||
|
this.batch = new ArrayList<>(batchToApply.size());
|
||||||
|
}
|
||||||
|
batch.addAll(batchToApply);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,135 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2024 Red Hat, Inc. and/or its affiliates
|
||||||
|
* and other contributors as indicated by the @author tags.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.keycloak.models.sessions.infinispan.changes;
|
||||||
|
|
||||||
|
import org.jboss.logging.Logger;
|
||||||
|
import org.keycloak.models.KeycloakSessionFactory;
|
||||||
|
import org.keycloak.models.UserSessionProvider;
|
||||||
|
import org.keycloak.models.sessions.infinispan.PersistentUserSessionProvider;
|
||||||
|
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
|
||||||
|
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||||
|
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
|
||||||
|
import org.keycloak.models.utils.KeycloakModelUtils;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run one thread per session type and drain the queues once there is an entry. Will batch entries if possible.
|
||||||
|
*
|
||||||
|
* @author Alexander Schwartz
|
||||||
|
*/
|
||||||
|
public class PersistentSessionsWorker {
|
||||||
|
private static final Logger LOG = Logger.getLogger(PersistentSessionsWorker.class);
|
||||||
|
|
||||||
|
private final KeycloakSessionFactory factory;
|
||||||
|
private final ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserSessions;
|
||||||
|
private final ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserOfflineSessions;
|
||||||
|
private final ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientSessions;
|
||||||
|
private final ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientOfflineSessions;
|
||||||
|
private final List<Thread> threads = new ArrayList<>();
|
||||||
|
private volatile boolean stop;
|
||||||
|
|
||||||
|
public PersistentSessionsWorker(KeycloakSessionFactory factory, ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserSessions, ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueueUserOfflineSessions, ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientSessions, ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> asyncQueueClientOfflineSessions) {
|
||||||
|
this.factory = factory;
|
||||||
|
this.asyncQueueUserSessions = asyncQueueUserSessions;
|
||||||
|
this.asyncQueueUserOfflineSessions = asyncQueueUserOfflineSessions;
|
||||||
|
this.asyncQueueClientSessions = asyncQueueClientSessions;
|
||||||
|
this.asyncQueueClientOfflineSessions = asyncQueueClientOfflineSessions;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() {
|
||||||
|
threads.add(new WorkerUserSession(asyncQueueUserSessions, false));
|
||||||
|
threads.add(new WorkerUserSession(asyncQueueUserOfflineSessions, true));
|
||||||
|
threads.add(new WorkerClientSession(asyncQueueClientSessions, false));
|
||||||
|
threads.add(new WorkerClientSession(asyncQueueClientOfflineSessions, true));
|
||||||
|
threads.forEach(Thread::start);
|
||||||
|
}
|
||||||
|
|
||||||
|
private class WorkerUserSession extends Worker<String, UserSessionEntity> {
|
||||||
|
public WorkerUserSession(ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> queue, boolean offline) {
|
||||||
|
super(queue, offline, PersistentUserSessionProvider::processDeferredUserSessionElements);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class WorkerClientSession extends Worker<UUID, AuthenticatedClientSessionEntity> {
|
||||||
|
public WorkerClientSession(ArrayBlockingQueue<PersistentDeferredElement<UUID, AuthenticatedClientSessionEntity>> queue, boolean offline) {
|
||||||
|
super(queue, offline, PersistentUserSessionProvider::processDeferredClientSessionElements);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class Worker<K, V extends SessionEntity> extends Thread {
|
||||||
|
private final ArrayBlockingQueue<PersistentDeferredElement<K, V>> queue;
|
||||||
|
private final boolean offline;
|
||||||
|
private final Adapter<K, V> adapter;
|
||||||
|
|
||||||
|
public Worker(ArrayBlockingQueue<PersistentDeferredElement<K, V>> queue, boolean offline, Adapter<K, V> adapter) {
|
||||||
|
this.queue = queue;
|
||||||
|
this.offline = offline;
|
||||||
|
this.adapter = adapter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
Thread.currentThread().setName(this.getClass().getName() + " for " + (offline ? "offline" : "online") + " sessions");
|
||||||
|
while (!stop) {
|
||||||
|
try {
|
||||||
|
process(queue, offline);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void process(ArrayBlockingQueue<PersistentDeferredElement<K, V>> queue, boolean offline) throws InterruptedException {
|
||||||
|
Collection<PersistentDeferredElement<K, V>> batch = new ArrayList<>();
|
||||||
|
PersistentDeferredElement<K, V> polled = queue.poll(100, TimeUnit.MILLISECONDS);
|
||||||
|
if (polled != null) {
|
||||||
|
batch.add(polled);
|
||||||
|
queue.drainTo(batch, 99);
|
||||||
|
try {
|
||||||
|
LOG.debugf("Processing %d deferred session updates.", batch.size());
|
||||||
|
KeycloakModelUtils.runJobInTransaction(factory,
|
||||||
|
session -> adapter.run(((PersistentUserSessionProvider) session.getProvider(UserSessionProvider.class)), batch, offline));
|
||||||
|
} catch (RuntimeException ex) {
|
||||||
|
LOG.warnf(ex, "Unable to write %d deferred session updates", queue.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Adapter<K, V extends SessionEntity> {
|
||||||
|
void run(PersistentUserSessionProvider sessionProvider, Collection<PersistentDeferredElement<K, V>> batch, boolean offline);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() {
|
||||||
|
stop = true;
|
||||||
|
threads.forEach(Thread::interrupt);
|
||||||
|
threads.forEach(t -> {
|
||||||
|
try {
|
||||||
|
t.join(TimeUnit.MINUTES.toMillis(1));
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
|
@ -30,6 +30,10 @@ public interface SessionUpdateTask<S extends SessionEntity> {
|
||||||
|
|
||||||
CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper<S> sessionWrapper);
|
CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper<S> sessionWrapper);
|
||||||
|
|
||||||
|
default boolean isDeferrable() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
enum CacheOperation {
|
enum CacheOperation {
|
||||||
|
|
||||||
ADD,
|
ADD,
|
||||||
|
|
|
@ -31,8 +31,8 @@ import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||||
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
|
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
|
||||||
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
|
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
|
||||||
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
|
||||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
|
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
|
||||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
|
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
|
||||||
|
@ -42,8 +42,8 @@ import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.U
|
||||||
public class UserSessionPersistentChangelogBasedTransaction extends PersistentSessionsChangelogBasedTransaction<String, UserSessionEntity> {
|
public class UserSessionPersistentChangelogBasedTransaction extends PersistentSessionsChangelogBasedTransaction<String, UserSessionEntity> {
|
||||||
|
|
||||||
private static final Logger LOG = Logger.getLogger(UserSessionPersistentChangelogBasedTransaction.class);
|
private static final Logger LOG = Logger.getLogger(UserSessionPersistentChangelogBasedTransaction.class);
|
||||||
public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<UserSessionEntity> lifespanMsLoader, SessionFunction<UserSessionEntity> maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey<String> serializer) {
|
public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<UserSessionEntity> lifespanMsLoader, SessionFunction<UserSessionEntity> maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey<String> serializer, ArrayBlockingQueue<PersistentDeferredElement<String, UserSessionEntity>> asyncQueue) {
|
||||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer);
|
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer, asyncQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
public SessionEntityWrapper<UserSessionEntity> get(RealmModel realm, String key) {
|
public SessionEntityWrapper<UserSessionEntity> get(RealmModel realm, String key) {
|
||||||
|
@ -93,12 +93,7 @@ public class UserSessionPersistentChangelogBasedTransaction extends PersistentSe
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
SessionEntityWrapper<UserSessionEntity> userSessionEntitySessionEntityWrapper = importUserSession(persistentUserSession);
|
return importUserSession(persistentUserSession);
|
||||||
if (userSessionEntitySessionEntityWrapper == null) {
|
|
||||||
removeSessionEntityFromPersister(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
return userSessionEntitySessionEntityWrapper;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeSessionEntityFromPersister(String key) {
|
private void removeSessionEntityFromPersister(String key) {
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.junit.Assert;
|
||||||
import org.junit.FixMethodOrder;
|
import org.junit.FixMethodOrder;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runners.MethodSorters;
|
import org.junit.runners.MethodSorters;
|
||||||
|
import org.keycloak.common.util.Retry;
|
||||||
import org.keycloak.common.util.Time;
|
import org.keycloak.common.util.Time;
|
||||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||||
import org.keycloak.models.AuthenticatedClientSessionModel;
|
import org.keycloak.models.AuthenticatedClientSessionModel;
|
||||||
|
@ -263,6 +264,7 @@ public class SessionTimeoutsTest extends KeycloakModelTest {
|
||||||
for (int i = 0; i < refreshTimes; i++) {
|
for (int i = 0; i < refreshTimes; i++) {
|
||||||
offset += 1500;
|
offset += 1500;
|
||||||
setTimeOffset(offset);
|
setTimeOffset(offset);
|
||||||
|
int time = Time.currentTime();
|
||||||
withRealm(realmId, (session, realm) -> {
|
withRealm(realmId, (session, realm) -> {
|
||||||
// refresh sessions before user session expires => both session should exist
|
// refresh sessions before user session expires => both session should exist
|
||||||
ClientModel client = realm.getClientByClientId("test-app");
|
ClientModel client = realm.getClientByClientId("test-app");
|
||||||
|
@ -270,10 +272,24 @@ public class SessionTimeoutsTest extends KeycloakModelTest {
|
||||||
Assert.assertNotNull(userSession);
|
Assert.assertNotNull(userSession);
|
||||||
AuthenticatedClientSessionModel clientSession = userSession.getAuthenticatedClientSessionByClient(client.getId());
|
AuthenticatedClientSessionModel clientSession = userSession.getAuthenticatedClientSessionByClient(client.getId());
|
||||||
Assert.assertNotNull(clientSession);
|
Assert.assertNotNull(clientSession);
|
||||||
userSession.setLastSessionRefresh(Time.currentTime());
|
userSession.setLastSessionRefresh(time);
|
||||||
clientSession.setTimestamp(Time.currentTime());
|
clientSession.setTimestamp(time);
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
|
// The persistent session will write the update data asynchronously, wait for it to arrive.
|
||||||
|
Retry.executeWithBackoff(iteration -> {
|
||||||
|
withRealm(realmId, (session, realm) -> {
|
||||||
|
// refresh sessions before user session expires => both session should exist
|
||||||
|
ClientModel client = realm.getClientByClientId("test-app");
|
||||||
|
UserSessionModel userSession = getUserSession(session, realm, sessions[0], offline);
|
||||||
|
Assert.assertNotNull(userSession);
|
||||||
|
AuthenticatedClientSessionModel clientSession = userSession.getAuthenticatedClientSessionByClient(client.getId());
|
||||||
|
Assert.assertNotNull(clientSession);
|
||||||
|
Assert.assertEquals(userSession.getLastSessionRefresh(), time);
|
||||||
|
Assert.assertEquals(clientSession.getTimestamp(), time);
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}, 10, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
offset += 2100;
|
offset += 2100;
|
||||||
|
|
Loading…
Reference in a new issue