From 0533782d90642ee5d7fca0723558cf55fe2bfa20 Mon Sep 17 00:00:00 2001 From: mposolda Date: Thu, 1 Nov 2018 13:54:32 +0100 Subject: [PATCH] KEYCLOAK-7275 KEYCLOAK-5479 Faster offline sessions preloading at startup. Track lastSessionRefresh timestamps more properly by support bulk update to DB --- .../AuthenticatedClientSessionAdapter.java | 5 +- .../InfinispanUserSessionProvider.java | 192 +++++++++++--- .../InfinispanUserSessionProviderFactory.java | 21 +- .../infinispan/UserSessionAdapter.java | 16 +- ...a => AbstractLastSessionRefreshStore.java} | 32 +-- ...stractLastSessionRefreshStoreFactory.java} | 34 +-- ... => CrossDCLastSessionRefreshChecker.java} | 17 +- ...=> CrossDCLastSessionRefreshListener.java} | 6 +- .../CrossDCLastSessionRefreshStore.java | 58 ++++ ...CrossDCLastSessionRefreshStoreFactory.java | 63 +++++ .../PersisterLastSessionRefreshStore.java | 77 ++++++ ...rsisterLastSessionRefreshStoreFactory.java | 49 ++++ .../InfinispanCacheInitializer.java | 73 +++-- ...va => OfflinePersistentLoaderContext.java} | 22 +- .../OfflinePersistentUserSessionLoader.java | 71 +++-- .../OfflinePersistentWorkerContext.java | 42 +++ .../OfflinePersistentWorkerResult.java | 44 +++ .../initializer/SessionInitializerWorker.java | 24 +- .../infinispan/initializer/SessionLoader.java | 108 +++++++- .../RemoteCacheSessionsLoader.java | 29 +- .../RemoteCacheSessionsLoaderContext.java | 25 +- .../RemoteCacheSessionsLoaderTest.java | 3 +- .../initializer/ConcurrencyLockingTest.java | 9 +- .../DistributedCacheConcurrentWritesTest.java | 250 ++++++++---------- .../initializer/InitializerStateTest.java | 11 +- ...Update4_7_0_OfflineSessionsTimestamps.java | 54 ++++ .../JpaUserSessionPersisterProvider.java | 134 +++++----- .../PersistentClientSessionEntity.java | 5 +- .../session/PersistentUserSessionEntity.java | 20 +- .../META-INF/jpa-changelog-4.7.0.xml | 25 +- .../DisabledUserSessionPersisterProvider.java | 12 +- .../session/PersistentUserSessionAdapter.java | 7 +- .../session/PersistentUserSessionModel.java | 9 + .../session/UserSessionPersisterProvider.java | 13 +- .../keycloak/models/UserSessionProvider.java | 7 +- .../keycloak/protocol/oidc/TokenManager.java | 2 +- .../scheduled/ClearExpiredUserSessions.java | 2 + .../rest/TestingResourceProvider.java | 6 +- .../session/LastSessionRefreshUnitTest.java | 18 +- .../model/UserSessionInitializerTest.java | 8 +- .../UserSessionPersisterProviderTest.java | 222 +++++++++------- .../model/UserSessionProviderOfflineTest.java | 53 ++-- .../cli/LoadPersistentSessionsCommand.java | 102 ++++++- .../util/cli/PersistSessionsCommand.java | 13 +- 44 files changed, 1381 insertions(+), 612 deletions(-) rename model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/{LastSessionRefreshStore.java => AbstractLastSessionRefreshStore.java} (68%) rename model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/{LastSessionRefreshStoreFactory.java => AbstractLastSessionRefreshStoreFactory.java} (51%) rename model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/{LastSessionRefreshChecker.java => CrossDCLastSessionRefreshChecker.java} (88%) rename model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/{LastSessionRefreshListener.java => CrossDCLastSessionRefreshListener.java} (92%) create mode 100644 model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStore.java create mode 100644 model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStoreFactory.java create mode 100644 model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStore.java create mode 100644 model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStoreFactory.java rename model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/{OfflinePersistentUserSessionLoaderContext.java => OfflinePersistentLoaderContext.java} (75%) create mode 100644 model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerContext.java create mode 100644 model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerResult.java create mode 100644 model/jpa/src/main/java/org/keycloak/connections/jpa/updater/liquibase/custom/JpaUpdate4_7_0_OfflineSessionsTimestamps.java diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java index 6551bf8d91..a76639d3fb 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/AuthenticatedClientSessionAdapter.java @@ -20,7 +20,6 @@ package org.keycloak.models.sessions.infinispan; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Set; import org.keycloak.models.AuthenticatedClientSessionModel; import org.keycloak.models.ClientModel; @@ -33,7 +32,7 @@ import org.keycloak.models.sessions.infinispan.changes.ClientSessionUpdateTask; import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask; import org.keycloak.models.sessions.infinispan.changes.Tasks; import org.keycloak.models.sessions.infinispan.changes.UserSessionUpdateTask; -import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshChecker; +import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshChecker; import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; import java.util.UUID; @@ -150,7 +149,7 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes @Override public CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper sessionWrapper) { - return new LastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore()) + return new CrossDCLastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore()) .shouldSaveClientSessionToRemoteCache(kcSession, client.getRealm(), sessionWrapper, userSession, offline, timestamp); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java index bf7b04f2d3..b1429a6391 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java @@ -19,10 +19,13 @@ package org.keycloak.models.sessions.infinispan; import org.infinispan.Cache; import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.client.hotrod.exceptions.HotRodClientException; import org.infinispan.context.Flag; import org.infinispan.stream.CacheCollectors; import org.jboss.logging.Logger; import org.keycloak.cluster.ClusterProvider; +import org.keycloak.common.util.ObjectUtil; +import org.keycloak.common.util.Retry; import org.keycloak.common.util.Time; import org.keycloak.models.AuthenticatedClientSessionModel; import org.keycloak.models.ClientModel; @@ -35,7 +38,8 @@ import org.keycloak.models.UserSessionModel; import org.keycloak.models.UserSessionProvider; import org.keycloak.models.session.UserSessionPersisterProvider; import org.keycloak.models.sessions.infinispan.changes.Tasks; -import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStore; +import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore; +import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore; import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker; import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction; @@ -61,6 +65,8 @@ import org.keycloak.models.sessions.infinispan.util.InfinispanUtil; import org.keycloak.models.utils.SessionTimeoutHelper; import java.io.Serializable; +import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -69,11 +75,11 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; -import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -100,15 +106,18 @@ public class InfinispanUserSessionProvider implements UserSessionProvider { protected final SessionEventsSenderTransaction clusterEventsSenderTx; - protected final LastSessionRefreshStore lastSessionRefreshStore; - protected final LastSessionRefreshStore offlineLastSessionRefreshStore; + protected final CrossDCLastSessionRefreshStore lastSessionRefreshStore; + protected final CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore; + protected final PersisterLastSessionRefreshStore persisterLastSessionRefreshStore; + protected final RemoteCacheInvoker remoteCacheInvoker; protected final InfinispanKeyGenerator keyGenerator; public InfinispanUserSessionProvider(KeycloakSession session, RemoteCacheInvoker remoteCacheInvoker, - LastSessionRefreshStore lastSessionRefreshStore, - LastSessionRefreshStore offlineLastSessionRefreshStore, + CrossDCLastSessionRefreshStore lastSessionRefreshStore, + CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore, + PersisterLastSessionRefreshStore persisterLastSessionRefreshStore, InfinispanKeyGenerator keyGenerator, Cache> sessionCache, Cache> offlineSessionCache, @@ -134,6 +143,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider { this.lastSessionRefreshStore = lastSessionRefreshStore; this.offlineLastSessionRefreshStore = offlineLastSessionRefreshStore; + this.persisterLastSessionRefreshStore = persisterLastSessionRefreshStore; + this.remoteCacheInvoker = remoteCacheInvoker; this.keyGenerator = keyGenerator; session.getTransactionManager().enlistAfterCompletion(clusterEventsSenderTx); @@ -160,14 +171,18 @@ public class InfinispanUserSessionProvider implements UserSessionProvider { return offline ? offlineClientSessionTx : clientSessionTx; } - protected LastSessionRefreshStore getLastSessionRefreshStore() { + protected CrossDCLastSessionRefreshStore getLastSessionRefreshStore() { return lastSessionRefreshStore; } - protected LastSessionRefreshStore getOfflineLastSessionRefreshStore() { + protected CrossDCLastSessionRefreshStore getOfflineLastSessionRefreshStore() { return offlineLastSessionRefreshStore; } + protected PersisterLastSessionRefreshStore getPersisterLastSessionRefreshStore() { + return persisterLastSessionRefreshStore; + } + @Override public AuthenticatedClientSessionModel createClientSession(RealmModel realm, ClientModel client, UserSessionModel userSession) { final UUID clientSessionId = keyGenerator.generateKeyUUID(session, clientSessionCache); @@ -535,7 +550,6 @@ public class InfinispanUserSessionProvider implements UserSessionProvider { } private void removeExpiredOfflineUserSessions(RealmModel realm) { - UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class); int expiredOffline = Time.currentTime() - realm.getOfflineSessionIdleTimeout() - SessionTimeoutHelper.PERIODIC_CLEANER_IDLE_TIMEOUT_WINDOW_SECONDS; // Each cluster node cleanups just local sessions, which are those owned by himself (+ few more taking l1 cache into account) @@ -570,8 +584,6 @@ public class InfinispanUserSessionProvider implements UserSessionProvider { futures.addTask(f); }); - // TODO:mposolda can be likely optimized to delete all expired at one step - persister.removeUserSession( userSessionEntity.getId(), true); } }); @@ -796,7 +808,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider { @Override public UserSessionModel createOfflineUserSession(UserSessionModel userSession) { - UserSessionAdapter offlineUserSession = importUserSession(userSession, true, false); + UserSessionAdapter offlineUserSession = importUserSession(userSession, true); // started and lastSessionRefresh set to current time int currentTime = Time.currentTime(); @@ -866,8 +878,117 @@ public class InfinispanUserSessionProvider implements UserSessionProvider { return getUserSessions(realm, client, first, max, true); } + @Override - public UserSessionAdapter importUserSession(UserSessionModel userSession, boolean offline, boolean importAuthenticatedClientSessions) { + public void importUserSessions(Collection persistentUserSessions, boolean offline) { + if (persistentUserSessions == null || persistentUserSessions.isEmpty()) { + return; + } + + Map> clientSessionsById = new HashMap<>(); + + Map> sessionsById = persistentUserSessions.stream() + .map((UserSessionModel persistentUserSession) -> { + + UserSessionEntity userSessionEntityToImport = createUserSessionEntityInstance(persistentUserSession); + + for (Map.Entry entry : persistentUserSession.getAuthenticatedClientSessions().entrySet()) { + String clientUUID = entry.getKey(); + AuthenticatedClientSessionModel clientSession = entry.getValue(); + AuthenticatedClientSessionEntity clientSessionToImport = createAuthenticatedClientSessionInstance(clientSession, userSessionEntityToImport.getRealmId(), offline); + + // Update timestamp to same value as userSession. LastSessionRefresh of userSession from DB will have correct value + clientSessionToImport.setTimestamp(userSessionEntityToImport.getLastSessionRefresh()); + + clientSessionsById.put(clientSessionToImport.getId(), new SessionEntityWrapper<>(clientSessionToImport)); + + // Update userSession entity with the clientSession + AuthenticatedClientSessionStore clientSessions = userSessionEntityToImport.getAuthenticatedClientSessions(); + clientSessions.put(clientUUID, clientSessionToImport.getId()); + } + + return userSessionEntityToImport; + }) + .map(SessionEntityWrapper::new) + .collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity())); + + // Directly put all entities to the infinispan cache + Cache> cache = CacheDecorators.skipCacheLoaders(getCache(offline)); + cache.putAll(sessionsById); + + // put all entities to the remoteCache (if exists) + RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache); + if (remoteCache != null) { + Map> sessionsByIdForTransport = sessionsById.values().stream() + .map(SessionEntityWrapper::forTransport) + .collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity())); + + Retry.executeWithBackoff((int iteration) -> { + + try { + remoteCache.putAll(sessionsByIdForTransport); + } catch (HotRodClientException re) { + if (log.isDebugEnabled()) { + log.debugf(re, "Failed to put import %d sessions to remoteCache. Iteration '%s'. Will try to retry the task", + sessionsByIdForTransport.size(), iteration); + } + + // Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation. + throw re; + } + + }, 10, 10); + } + + // Import client sessions + Cache> clientSessCache = offline ? offlineClientSessionCache : clientSessionCache; + clientSessCache = CacheDecorators.skipCacheLoaders(clientSessCache); + + clientSessCache.putAll(clientSessionsById); + + // put all entities to the remoteCache (if exists) + RemoteCache remoteCacheClientSessions = InfinispanUtil.getRemoteCache(clientSessCache); + if (remoteCacheClientSessions != null) { + Map> sessionsByIdForTransport = clientSessionsById.values().stream() + .map(SessionEntityWrapper::forTransport) + .collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity())); + + Retry.executeWithBackoff((int iteration) -> { + + try { + remoteCacheClientSessions.putAll(sessionsByIdForTransport); + } catch (HotRodClientException re) { + if (log.isDebugEnabled()) { + log.debugf(re, "Failed to put import %d client sessions to remoteCache. Iteration '%s'. Will try to retry the task", + sessionsByIdForTransport.size(), iteration); + } + + // Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation. + throw re; + } + + }, 10, 10); + } + } + + + // Imports just userSession without it's clientSessions + protected UserSessionAdapter importUserSession(UserSessionModel userSession, boolean offline) { + UserSessionEntity entity = createUserSessionEntityInstance(userSession); + + InfinispanChangelogBasedTransaction userSessionUpdateTx = getTransaction(offline); + InfinispanChangelogBasedTransaction clientSessionUpdateTx = getClientSessionTransaction(offline); + + SessionUpdateTask importTask = Tasks.addIfAbsentSync(); + userSessionUpdateTx.addTask(userSession.getId(), importTask, entity); + + UserSessionAdapter importedSession = wrap(userSession.getRealm(), entity, offline); + + return importedSession; + } + + + private UserSessionEntity createUserSessionEntityInstance(UserSessionModel userSession) { UserSessionEntity entity = new UserSessionEntity(); entity.setId(userSession.getId()); entity.setRealmId(userSession.getRealm().getId()); @@ -896,22 +1017,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider { entity.setStarted(userSession.getStarted()); entity.setLastSessionRefresh(userSession.getLastSessionRefresh()); - InfinispanChangelogBasedTransaction userSessionUpdateTx = getTransaction(offline); - InfinispanChangelogBasedTransaction clientSessionUpdateTx = getClientSessionTransaction(offline); - - SessionUpdateTask importTask = Tasks.addIfAbsentSync(); - userSessionUpdateTx.addTask(userSession.getId(), importTask, entity); - - UserSessionAdapter importedSession = wrap(userSession.getRealm(), entity, offline); - - // Handle client sessions - if (importAuthenticatedClientSessions) { - for (AuthenticatedClientSessionModel clientSession : userSession.getAuthenticatedClientSessions().values()) { - importClientSession(importedSession, clientSession, userSessionUpdateTx, clientSessionUpdateTx, offline); - } - } - - return importedSession; + return entity; } @@ -919,16 +1025,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider { InfinispanChangelogBasedTransaction userSessionUpdateTx, InfinispanChangelogBasedTransaction clientSessionUpdateTx, boolean offline) { - final UUID clientSessionId = keyGenerator.generateKeyUUID(session, getClientSessionCache(offline)); - AuthenticatedClientSessionEntity entity = new AuthenticatedClientSessionEntity(clientSessionId); - entity.setRealmId(sessionToImportInto.getRealm().getId()); - - entity.setAction(clientSession.getAction()); - entity.setAuthMethod(clientSession.getProtocol()); - - entity.setNotes(clientSession.getNotes() == null ? new ConcurrentHashMap<>() : clientSession.getNotes()); - entity.setRedirectUri(clientSession.getRedirectUri()); - entity.setTimestamp(clientSession.getTimestamp()); + AuthenticatedClientSessionEntity entity = createAuthenticatedClientSessionInstance(clientSession, sessionToImportInto.getRealm().getId(), offline); + final UUID clientSessionId = entity.getId(); SessionUpdateTask createClientSessionTask = Tasks.addIfAbsentSync(); clientSessionUpdateTx.addTask(entity.getId(), createClientSessionTask, entity); @@ -942,6 +1040,22 @@ public class InfinispanUserSessionProvider implements UserSessionProvider { return new AuthenticatedClientSessionAdapter(session,this, entity, clientSession.getClient(), sessionToImportInto, userSessionUpdateTx, clientSessionUpdateTx, offline); } + + private AuthenticatedClientSessionEntity createAuthenticatedClientSessionInstance(AuthenticatedClientSessionModel clientSession, String realmId, boolean offline) { + final UUID clientSessionId = keyGenerator.generateKeyUUID(session, getClientSessionCache(offline)); + AuthenticatedClientSessionEntity entity = new AuthenticatedClientSessionEntity(clientSessionId); + entity.setRealmId(realmId); + + entity.setAction(clientSession.getAction()); + entity.setAuthMethod(clientSession.getProtocol()); + + entity.setNotes(clientSession.getNotes() == null ? new ConcurrentHashMap<>() : clientSession.getNotes()); + entity.setRedirectUri(clientSession.getRedirectUri()); + entity.setTimestamp(clientSession.getTimestamp()); + + return entity; + } + private static class RegisterClientSessionTask implements SessionUpdateTask { private final String clientUuid; diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java index 15d8c25413..a59706af73 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java @@ -32,8 +32,10 @@ import org.keycloak.models.RealmModel; import org.keycloak.models.UserModel; import org.keycloak.models.UserSessionProvider; import org.keycloak.models.UserSessionProviderFactory; -import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStore; -import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStoreFactory; +import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore; +import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory; +import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore; +import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStoreFactory; import org.keycloak.models.sessions.infinispan.initializer.CacheInitializer; import org.keycloak.models.sessions.infinispan.initializer.DBLockBasedCacheInitializer; import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker; @@ -80,8 +82,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider private Config.Scope config; private RemoteCacheInvoker remoteCacheInvoker; - private LastSessionRefreshStore lastSessionRefreshStore; - private LastSessionRefreshStore offlineLastSessionRefreshStore; + private CrossDCLastSessionRefreshStore lastSessionRefreshStore; + private CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore; + private PersisterLastSessionRefreshStore persisterLastSessionRefreshStore; private InfinispanKeyGenerator keyGenerator; @Override @@ -93,7 +96,8 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider Cache> offlineClientSessionsCache = connections.getCache(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME); Cache> loginFailures = connections.getCache(InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME); - return new InfinispanUserSessionProvider(session, remoteCacheInvoker, lastSessionRefreshStore, offlineLastSessionRefreshStore, keyGenerator, + return new InfinispanUserSessionProvider(session, remoteCacheInvoker, lastSessionRefreshStore, offlineLastSessionRefreshStore, + persisterLastSessionRefreshStore, keyGenerator, cache, offlineSessionsCache, clientSessionCache, offlineClientSessionsCache, loginFailures); } @@ -169,6 +173,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider initializer.initCache(); initializer.loadSessions(); + + // Initialize persister for periodically doing bulk DB updates of lastSessionRefresh timestamps of refreshed sessions + persisterLastSessionRefreshStore = new PersisterLastSessionRefreshStoreFactory().createAndInit(session, true); } }); @@ -233,7 +240,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider }); if (sessionsRemoteCache) { - lastSessionRefreshStore = new LastSessionRefreshStoreFactory().createAndInit(session, sessionsCache, false); + lastSessionRefreshStore = new CrossDCLastSessionRefreshStoreFactory().createAndInit(session, sessionsCache, false); } Cache> clientSessionsCache = ispn.getCache(InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME); @@ -248,7 +255,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider }); if (offlineSessionsRemoteCache) { - offlineLastSessionRefreshStore = new LastSessionRefreshStoreFactory().createAndInit(session, offlineSessionsCache, true); + offlineLastSessionRefreshStore = new CrossDCLastSessionRefreshStoreFactory().createAndInit(session, offlineSessionsCache, true); } Cache> offlineClientSessionsCache = ispn.getCache(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME); diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java index a7c6c7fd4c..7355e6c96f 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/UserSessionAdapter.java @@ -24,10 +24,11 @@ import org.keycloak.models.RealmModel; import org.keycloak.models.UserModel; import org.keycloak.models.UserSessionModel; import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction; -import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshChecker; +import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshChecker; import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; import org.keycloak.models.sessions.infinispan.changes.Tasks; import org.keycloak.models.sessions.infinispan.changes.UserSessionUpdateTask; +import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshListener; import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionStore; import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; @@ -42,6 +43,8 @@ import java.util.Objects; import java.util.UUID; import java.util.stream.Collectors; +import static org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshListener.IGNORE_REMOTE_CACHE_UPDATE; + /** * @author Stian Thorgersen */ @@ -207,6 +210,15 @@ public class UserSessionAdapter implements UserSessionModel { } public void setLastSessionRefresh(int lastSessionRefresh) { + if (offline) { + // Received the message from the other DC that we should update the lastSessionRefresh in local cluster. Don't update DB in that case. + // The other DC already did. + Boolean ignoreRemoteCacheUpdate = (Boolean) session.getAttribute(CrossDCLastSessionRefreshListener.IGNORE_REMOTE_CACHE_UPDATE); + if (ignoreRemoteCacheUpdate == null || !ignoreRemoteCacheUpdate) { + provider.getPersisterLastSessionRefreshStore().putLastSessionRefresh(session, entity.getId(), realm.getId(), lastSessionRefresh); + } + } + UserSessionUpdateTask task = new UserSessionUpdateTask() { @Override @@ -216,7 +228,7 @@ public class UserSessionAdapter implements UserSessionModel { @Override public CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper sessionWrapper) { - return new LastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore()) + return new CrossDCLastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore()) .shouldSaveUserSessionToRemoteCache(UserSessionAdapter.this.session, UserSessionAdapter.this.realm, sessionWrapper, offline, lastSessionRefresh); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStore.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/AbstractLastSessionRefreshStore.java similarity index 68% rename from model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStore.java rename to model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/AbstractLastSessionRefreshStore.java index c50bcf1951..69db5a26e1 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStore.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/AbstractLastSessionRefreshStore.java @@ -20,34 +20,27 @@ package org.keycloak.models.sessions.infinispan.changes.sessions; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.jboss.logging.Logger; -import org.keycloak.cluster.ClusterProvider; import org.keycloak.common.util.Time; import org.keycloak.models.KeycloakSession; /** - * Tracks the queue of lastSessionRefreshes, which were updated on this host. Those will be sent to the second DC in bulk, so second DC can update - * lastSessionRefreshes on it's side. Message is sent either periodically or if there are lots of stored lastSessionRefreshes. + * Abstract "store" for bulk sending of the updates related to lastSessionRefresh * * @author Marek Posolda */ -public class LastSessionRefreshStore { - - protected static final Logger logger = Logger.getLogger(LastSessionRefreshStore.class); +public abstract class AbstractLastSessionRefreshStore { private final int maxIntervalBetweenMessagesSeconds; private final int maxCount; - private final String eventKey; private volatile Map lastSessionRefreshes = new ConcurrentHashMap<>(); private volatile int lastRun = Time.currentTime(); - protected LastSessionRefreshStore(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) { + protected AbstractLastSessionRefreshStore(int maxIntervalBetweenMessagesSeconds, int maxCount) { this.maxIntervalBetweenMessagesSeconds = maxIntervalBetweenMessagesSeconds; this.maxCount = maxCount; - this.eventKey = eventKey; } @@ -86,16 +79,11 @@ public class LastSessionRefreshStore { } - protected void sendMessage(KeycloakSession kcSession, Map refreshesToSend) { - LastSessionRefreshEvent event = new LastSessionRefreshEvent(refreshesToSend); - - if (logger.isDebugEnabled()) { - logger.debugf("Sending lastSessionRefreshes for key '%s'. Refreshes: %s", eventKey, event.getLastSessionRefreshes().toString()); - } - - // Don't notify local DC about the lastSessionRefreshes. They were processed here already - ClusterProvider cluster = kcSession.getProvider(ClusterProvider.class); - cluster.notify(eventKey, event, true, ClusterProvider.DCNotify.ALL_BUT_LOCAL_DC); - } - + /** + * Bulk update the underlying store with all the user sessions, which were refreshed by Keycloak since the last call of this method + * + * @param kcSession + * @param refreshesToSend Key is userSession ID, SessionData are data about the session + */ + protected abstract void sendMessage(KeycloakSession kcSession, Map refreshesToSend); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStoreFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/AbstractLastSessionRefreshStoreFactory.java similarity index 51% rename from model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStoreFactory.java rename to model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/AbstractLastSessionRefreshStoreFactory.java index 6db17d0760..3427ee3f07 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshStoreFactory.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/AbstractLastSessionRefreshStoreFactory.java @@ -17,19 +17,15 @@ package org.keycloak.models.sessions.infinispan.changes.sessions; -import org.infinispan.Cache; -import org.keycloak.cluster.ClusterProvider; import org.keycloak.common.util.Time; import org.keycloak.models.KeycloakSession; -import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; -import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; import org.keycloak.models.utils.SessionTimeoutHelper; import org.keycloak.timer.TimerProvider; /** * @author Marek Posolda */ -public class LastSessionRefreshStoreFactory { +public abstract class AbstractLastSessionRefreshStoreFactory { // Timer interval. The store will be checked every 5 seconds whether the message with stored lastSessionRefreshes should be sent public static final long DEFAULT_TIMER_INTERVAL_MS = 5000; @@ -40,40 +36,14 @@ public class LastSessionRefreshStoreFactory { // Max count of lastSessionRefreshes. If count of lastSessionRefreshes reach this value, the message is sent to second DC public static final int DEFAULT_MAX_COUNT = 100; - // Name of periodic tasks to send events to the other DCs - public static final String LSR_PERIODIC_TASK_NAME = "lastSessionRefreshes"; - public static final String LSR_OFFLINE_PERIODIC_TASK_NAME = "lastSessionRefreshes-offline"; - public LastSessionRefreshStore createAndInit(KeycloakSession kcSession, Cache> cache, boolean offline) { - return createAndInit(kcSession, cache, DEFAULT_TIMER_INTERVAL_MS, DEFAULT_MAX_INTERVAL_BETWEEN_MESSAGES_SECONDS, DEFAULT_MAX_COUNT, offline); - } - - - public LastSessionRefreshStore createAndInit(KeycloakSession kcSession, Cache> cache, long timerIntervalMs, int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) { - String eventKey = offline ? LSR_OFFLINE_PERIODIC_TASK_NAME : LSR_PERIODIC_TASK_NAME; - LastSessionRefreshStore store = createStoreInstance(maxIntervalBetweenMessagesSeconds, maxCount, eventKey); - - // Register listener - ClusterProvider cluster = kcSession.getProvider(ClusterProvider.class); - cluster.registerListener(eventKey, new LastSessionRefreshListener(kcSession, cache, offline)); - - // Setup periodic timer check + protected void setupPeriodicTimer(KeycloakSession kcSession, AbstractLastSessionRefreshStore store, long timerIntervalMs, String eventKey) { TimerProvider timer = kcSession.getProvider(TimerProvider.class); timer.scheduleTask((KeycloakSession keycloakSession) -> { store.checkSendingMessage(keycloakSession, Time.currentTime()); }, timerIntervalMs, eventKey); - - return store; } - - - protected LastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) { - return new LastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, eventKey); - } - - - } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshChecker.java similarity index 88% rename from model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java rename to model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshChecker.java index 6d2e8e2133..13adb792c5 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshChecker.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshChecker.java @@ -23,7 +23,6 @@ import org.jboss.logging.Logger; import org.keycloak.models.KeycloakSession; import org.keycloak.models.RealmModel; import org.keycloak.models.UserSessionModel; -import org.keycloak.models.sessions.infinispan.AuthenticatedClientSessionAdapter; import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask; import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; @@ -32,15 +31,15 @@ import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; /** * @author Marek Posolda */ -public class LastSessionRefreshChecker { +public class CrossDCLastSessionRefreshChecker { - public static final Logger logger = Logger.getLogger(LastSessionRefreshChecker.class); + public static final Logger logger = Logger.getLogger(CrossDCLastSessionRefreshChecker.class); - private final LastSessionRefreshStore store; - private final LastSessionRefreshStore offlineStore; + private final CrossDCLastSessionRefreshStore store; + private final CrossDCLastSessionRefreshStore offlineStore; - public LastSessionRefreshChecker(LastSessionRefreshStore store, LastSessionRefreshStore offlineStore) { + public CrossDCLastSessionRefreshChecker(CrossDCLastSessionRefreshStore store, CrossDCLastSessionRefreshStore offlineStore) { this.store = store; this.offlineStore = offlineStore; } @@ -73,7 +72,7 @@ public class LastSessionRefreshChecker { logger.debugf("Skip writing last session refresh to the remoteCache. Session %s newLastSessionRefresh %d", userSessionId, newLastSessionRefresh); } - LastSessionRefreshStore storeToUse = offline ? offlineStore : store; + CrossDCLastSessionRefreshStore storeToUse = offline ? offlineStore : store; storeToUse.putLastSessionRefresh(kcSession, userSessionId, realm.getId(), newLastSessionRefresh); return SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED; @@ -118,13 +117,13 @@ public class LastSessionRefreshChecker { } // We're likely not in cross-dc environment. Doesn't matter what we return - LastSessionRefreshStore storeToUse = offline ? offlineStore : store; + CrossDCLastSessionRefreshStore storeToUse = offline ? offlineStore : store; if (storeToUse == null) { return SessionUpdateTask.CrossDCMessageStatus.SYNC; } // Received the message from the other DC that we should update the lastSessionRefresh in local cluster - Boolean ignoreRemoteCacheUpdate = (Boolean) kcSession.getAttribute(LastSessionRefreshListener.IGNORE_REMOTE_CACHE_UPDATE); + Boolean ignoreRemoteCacheUpdate = (Boolean) kcSession.getAttribute(CrossDCLastSessionRefreshListener.IGNORE_REMOTE_CACHE_UPDATE); if (ignoreRemoteCacheUpdate != null && ignoreRemoteCacheUpdate) { return SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED; } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshListener.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshListener.java similarity index 92% rename from model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshListener.java rename to model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshListener.java index a18443884c..e4859907bf 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/LastSessionRefreshListener.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshListener.java @@ -36,9 +36,9 @@ import org.keycloak.models.utils.KeycloakModelUtils; /** * @author Marek Posolda */ -public class LastSessionRefreshListener implements ClusterListener { +public class CrossDCLastSessionRefreshListener implements ClusterListener { - public static final Logger logger = Logger.getLogger(LastSessionRefreshListener.class); + public static final Logger logger = Logger.getLogger(CrossDCLastSessionRefreshListener.class); public static final String IGNORE_REMOTE_CACHE_UPDATE = "IGNORE_REMOTE_CACHE_UPDATE"; @@ -48,7 +48,7 @@ public class LastSessionRefreshListener implements ClusterListener { private final Cache> cache; private final TopologyInfo topologyInfo; - public LastSessionRefreshListener(KeycloakSession session, Cache> cache, boolean offline) { + public CrossDCLastSessionRefreshListener(KeycloakSession session, Cache> cache, boolean offline) { this.sessionFactory = session.getKeycloakSessionFactory(); this.cache = cache; this.offline = offline; diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStore.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStore.java new file mode 100644 index 0000000000..81669d9217 --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStore.java @@ -0,0 +1,58 @@ +/* + * Copyright 2017 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.models.sessions.infinispan.changes.sessions; + +import java.util.Map; + +import org.jboss.logging.Logger; +import org.keycloak.cluster.ClusterProvider; +import org.keycloak.models.KeycloakSession; + +/** + * Cross-DC based CrossDCLastSessionRefreshStore + * + * Tracks the queue of lastSessionRefreshes, which were updated on this host. Those will be sent to the second DC in bulk, so second DC can update + * lastSessionRefreshes on it's side. Message is sent either periodically or if there are lots of stored lastSessionRefreshes. + * + * @author Marek Posolda + */ +public class CrossDCLastSessionRefreshStore extends AbstractLastSessionRefreshStore { + + protected static final Logger logger = Logger.getLogger(CrossDCLastSessionRefreshStore.class); + + private final String eventKey; + + protected CrossDCLastSessionRefreshStore(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) { + super(maxIntervalBetweenMessagesSeconds, maxCount); + this.eventKey = eventKey; + } + + + protected void sendMessage(KeycloakSession kcSession, Map refreshesToSend) { + LastSessionRefreshEvent event = new LastSessionRefreshEvent(refreshesToSend); + + if (logger.isDebugEnabled()) { + logger.debugf("Sending lastSessionRefreshes for key '%s'. Refreshes: %s", eventKey, event.getLastSessionRefreshes().toString()); + } + + // Don't notify local DC about the lastSessionRefreshes. They were processed here already + ClusterProvider cluster = kcSession.getProvider(ClusterProvider.class); + cluster.notify(eventKey, event, true, ClusterProvider.DCNotify.ALL_BUT_LOCAL_DC); + } + +} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStoreFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStoreFactory.java new file mode 100644 index 0000000000..b0d5702244 --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/CrossDCLastSessionRefreshStoreFactory.java @@ -0,0 +1,63 @@ +/* + * Copyright 2017 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.models.sessions.infinispan.changes.sessions; + +import org.infinispan.Cache; +import org.keycloak.cluster.ClusterProvider; +import org.keycloak.models.KeycloakSession; +import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; +import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; + +/** + * @author Marek Posolda + */ +public class CrossDCLastSessionRefreshStoreFactory extends AbstractLastSessionRefreshStoreFactory { + + // Name of periodic tasks to send events to the other DCs + public static final String LSR_PERIODIC_TASK_NAME = "lastSessionRefreshes"; + public static final String LSR_OFFLINE_PERIODIC_TASK_NAME = "lastSessionRefreshes-offline"; + + + public CrossDCLastSessionRefreshStore createAndInit(KeycloakSession kcSession, Cache> cache, boolean offline) { + return createAndInit(kcSession, cache, DEFAULT_TIMER_INTERVAL_MS, DEFAULT_MAX_INTERVAL_BETWEEN_MESSAGES_SECONDS, DEFAULT_MAX_COUNT, offline); + } + + + public CrossDCLastSessionRefreshStore createAndInit(KeycloakSession kcSession, Cache> cache, + long timerIntervalMs, int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) { + String eventKey = offline ? LSR_OFFLINE_PERIODIC_TASK_NAME : LSR_PERIODIC_TASK_NAME; + CrossDCLastSessionRefreshStore store = createStoreInstance(maxIntervalBetweenMessagesSeconds, maxCount, eventKey); + + // Register listener + ClusterProvider cluster = kcSession.getProvider(ClusterProvider.class); + cluster.registerListener(eventKey, new CrossDCLastSessionRefreshListener(kcSession, cache, offline)); + + // Setup periodic timer check + setupPeriodicTimer(kcSession, store, timerIntervalMs, eventKey); + + return store; + } + + + protected CrossDCLastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) { + return new CrossDCLastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, eventKey); + } + + + +} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStore.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStore.java new file mode 100644 index 0000000000..cecc417971 --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStore.java @@ -0,0 +1,77 @@ +/* + * Copyright 2017 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.models.sessions.infinispan.changes.sessions; + +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.jboss.logging.Logger; +import org.keycloak.common.util.Time; +import org.keycloak.models.KeycloakSession; +import org.keycloak.models.RealmModel; +import org.keycloak.models.session.UserSessionPersisterProvider; +import org.keycloak.models.utils.SessionTimeoutHelper; + +/** + * The store is supposed to do periodic bulk update of lastSessionRefresh times of all userSessions, which were refreshed during some period + * of time. The updates are sent to UserSessionPersisterProvider (DB) + * + * @author Marek Posolda + */ +public class PersisterLastSessionRefreshStore extends AbstractLastSessionRefreshStore { + + protected static final Logger logger = Logger.getLogger(PersisterLastSessionRefreshStore.class); + + private final boolean offline; + + protected PersisterLastSessionRefreshStore(int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) { + super(maxIntervalBetweenMessagesSeconds, maxCount); + this.offline = offline; + } + + + protected void sendMessage(KeycloakSession kcSession, Map refreshesToSend) { + Map> sessionIdsByRealm = + refreshesToSend.entrySet().stream().collect( + Collectors.groupingBy(entry -> entry.getValue().getRealmId(), + Collectors.mapping(Map.Entry::getKey, Collectors.toSet()))); + + // Update DB with a bit lower value than current time to ensure 'revokeRefreshToken' will work correctly taking server + int lastSessionRefresh = Time.currentTime() - SessionTimeoutHelper.PERIODIC_TASK_INTERVAL_SECONDS; + + if (logger.isDebugEnabled()) { + logger.debugf("Updating %d userSessions with lastSessionRefresh: %d", refreshesToSend.size(), lastSessionRefresh); + } + + UserSessionPersisterProvider persister = kcSession.getProvider(UserSessionPersisterProvider.class); + + for (Map.Entry> entry : sessionIdsByRealm.entrySet()) { + RealmModel realm = kcSession.realms().getRealm(entry.getKey()); + + // Case when realm was deleted in the meantime. UserSessions were already deleted as well (callback for realm deletion) + if (realm == null) { + continue; + } + + Set userSessionIds = entry.getValue(); + + persister.updateLastSessionRefreshes(realm, lastSessionRefresh, userSessionIds, offline); + } + } +} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStoreFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStoreFactory.java new file mode 100644 index 0000000000..d720ef818b --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/sessions/PersisterLastSessionRefreshStoreFactory.java @@ -0,0 +1,49 @@ +/* + * Copyright 2017 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.models.sessions.infinispan.changes.sessions; + +import org.infinispan.Cache; +import org.keycloak.models.KeycloakSession; +import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; +import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; + +/** + * @author Marek Posolda + */ +public class PersisterLastSessionRefreshStoreFactory extends AbstractLastSessionRefreshStoreFactory { + + public PersisterLastSessionRefreshStore createAndInit(KeycloakSession kcSession, boolean offline) { + return createAndInit(kcSession, DEFAULT_TIMER_INTERVAL_MS, DEFAULT_MAX_INTERVAL_BETWEEN_MESSAGES_SECONDS, DEFAULT_MAX_COUNT, offline); + } + + + private PersisterLastSessionRefreshStore createAndInit(KeycloakSession kcSession, + long timerIntervalMs, int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) { + PersisterLastSessionRefreshStore store = createStoreInstance(maxIntervalBetweenMessagesSeconds, maxCount, offline); + + // Setup periodic timer check + setupPeriodicTimer(kcSession, store, timerIntervalMs, "db-last-session-refresh"); + + return store; + } + + + protected PersisterLastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) { + return new PersisterLastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, offline); + } +} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java index 8dbcc20bcf..afbd37ecfe 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/InfinispanCacheInitializer.java @@ -84,7 +84,6 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer { }); state = new InitializerState(ctx[0].getSegmentsCount()); - saveStateToCache(state); } else { KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() { @Override @@ -102,7 +101,7 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer { } - protected void startLoadingImpl(InitializerState state, SessionLoader.LoaderContext ctx) { + protected void startLoadingImpl(InitializerState state, SessionLoader.LoaderContext loaderCtx) { // Assume each worker has same processor's count int processors = Runtime.getRuntime().availableProcessors(); @@ -114,6 +113,8 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer { int errors = 0; try { + List previousResults = new LinkedList<>(); + while (!state.isFinished()) { int nodesCount = transport==null ? 1 : transport.getMembers().size(); int distributedWorkersCount = processors * nodesCount; @@ -126,34 +127,43 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer { log.trace("unfinished segments for this iteration: " + segments); } - List> futures = new LinkedList<>(); + List> futures = new LinkedList<>(); + + int workerId = 0; for (Integer segment : segments) { + SessionLoader.WorkerContext workerCtx = sessionLoader.computeWorkerContext(loaderCtx, segment, workerId, previousResults); + SessionInitializerWorker worker = new SessionInitializerWorker(); - worker.setWorkerEnvironment(segment, ctx, sessionLoader); + worker.setWorkerEnvironment(loaderCtx, workerCtx, sessionLoader); + if (!distributed) { worker.setEnvironment(workCache, null); } - Future future = executorService.submit(worker); + Future future = executorService.submit(worker); futures.add(future); + + workerId++; } - for (Future future : futures) { + boolean anyFailure = false; + for (Future future : futures) { try { - WorkerResult result = future.get(); + SessionLoader.WorkerResult result = future.get(); + previousResults.add(result); - if (result.getSuccess()) { - int computedSegment = result.getSegment(); - state.markSegmentFinished(computedSegment); - } else { + if (!result.isSuccess()) { if (log.isTraceEnabled()) { log.tracef("Segment %d failed to compute", result.getSegment()); } + anyFailure = true; } } catch (InterruptedException ie) { + anyFailure = true; errors++; log.error("Interruped exception when computed future. Errors: " + errors, ie); } catch (ExecutionException ee) { + anyFailure = true; errors++; log.error("ExecutionException when computed future. Errors: " + errors, ee); } @@ -163,11 +173,19 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer { throw new RuntimeException("Maximum count of worker errors occured. Limit was " + maxErrors + ". See server.log for details"); } - saveStateToCache(state); + // Save just if no error happened. Otherwise re-compute + if (!anyFailure) { + for (SessionLoader.WorkerResult result : previousResults) { + state.markSegmentFinished(result.getSegment()); + } - log.debugf("New initializer state pushed. The state is: %s", state); + log.debugf("New initializer state is: %s", state); + } } + // Push the state after computation is finished + saveStateToCache(state); + // Loader callback after the task is finished this.sessionLoader.afterAllSessionsLoaded(this); @@ -179,33 +197,4 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer { } } - - public static class WorkerResult implements Serializable { - - private Integer segment; - private Boolean success; - - public static WorkerResult create (Integer segment, boolean success) { - WorkerResult res = new WorkerResult(); - res.setSegment(segment); - res.setSuccess(success); - return res; - } - - public Integer getSegment() { - return segment; - } - - public void setSegment(Integer segment) { - this.segment = segment; - } - - public Boolean getSuccess() { - return success; - } - - public void setSuccess(Boolean success) { - this.success = success; - } - } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoaderContext.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentLoaderContext.java similarity index 75% rename from model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoaderContext.java rename to model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentLoaderContext.java index 491239b397..a18b15a818 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoaderContext.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentLoaderContext.java @@ -17,26 +17,27 @@ package org.keycloak.models.sessions.infinispan.initializer; -import java.io.Serializable; - /** * @author Marek Posolda */ -public class OfflinePersistentUserSessionLoaderContext implements SessionLoader.LoaderContext, Serializable { +public class OfflinePersistentLoaderContext extends SessionLoader.LoaderContext { private final int sessionsTotal; - private final int segmentsCount; private final int sessionsPerSegment; - public OfflinePersistentUserSessionLoaderContext(int sessionsTotal, int sessionsPerSegment) { + public OfflinePersistentLoaderContext(int sessionsTotal, int sessionsPerSegment) { + super(computeSegmentsCount(sessionsTotal, sessionsPerSegment)); this.sessionsTotal = sessionsTotal; this.sessionsPerSegment = sessionsPerSegment; + } + + private static int computeSegmentsCount(int sessionsTotal, int sessionsPerSegment) { int segmentsCount = sessionsTotal / sessionsPerSegment; if (sessionsTotal % sessionsPerSegment >= 1) { segmentsCount = segmentsCount + 1; } - this.segmentsCount = segmentsCount; + return segmentsCount; } @@ -44,11 +45,6 @@ public class OfflinePersistentUserSessionLoaderContext implements SessionLoader. return sessionsTotal; } - @Override - public int getSegmentsCount() { - return segmentsCount; - } - public int getSessionsPerSegment() { return sessionsPerSegment; } @@ -56,10 +52,10 @@ public class OfflinePersistentUserSessionLoaderContext implements SessionLoader. @Override public String toString() { - return new StringBuilder("OfflinePersistentUserSessionLoaderContext [ ") + return new StringBuilder("OfflinePersistentLoaderContext [ ") .append(" sessionsTotal: ").append(sessionsTotal) .append(", sessionsPerSegment: ").append(sessionsPerSegment) - .append(", segmentsCount: ").append(segmentsCount) + .append(", segmentsCount: ").append(getSegmentsCount()) .append(" ]") .toString(); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoader.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoader.java index cf62230bce..74ad20a7bf 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoader.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentUserSessionLoader.java @@ -21,8 +21,8 @@ import org.infinispan.Cache; import org.infinispan.client.hotrod.exceptions.HotRodClientException; import org.infinispan.context.Flag; import org.jboss.logging.Logger; -import org.keycloak.cluster.ClusterProvider; import org.keycloak.common.util.Retry; +import org.keycloak.common.util.Time; import org.keycloak.models.KeycloakSession; import org.keycloak.models.UserSessionModel; import org.keycloak.models.session.UserSessionPersisterProvider; @@ -33,7 +33,11 @@ import java.util.List; /** * @author Marek Posolda */ -public class OfflinePersistentUserSessionLoader implements SessionLoader, Serializable { +public class OfflinePersistentUserSessionLoader implements SessionLoader, Serializable { + + // Placeholder String used in the searching conditions to identify very first session + private static final String FIRST_SESSION_ID = "000"; private static final Logger log = Logger.getLogger(OfflinePersistentUserSessionLoader.class); @@ -53,46 +57,67 @@ public class OfflinePersistentUserSessionLoader implements SessionLoader previousResults) { + int lastCreatedOn; + String lastSessionId; + if (previousResults.isEmpty()) { + lastCreatedOn = 0; + lastSessionId = FIRST_SESSION_ID; + } else { + OfflinePersistentWorkerResult lastResult = previousResults.get(previousResults.size() - 1); + lastCreatedOn = lastResult.getLastCreatedOn(); + lastSessionId = lastResult.getLastSessionId(); } + // We know the last loaded session. New workers iteration will start from this place + return new OfflinePersistentWorkerContext(segment, workerId, lastCreatedOn, lastSessionId); + } + + + @Override + public OfflinePersistentWorkerResult createFailedWorkerResult(OfflinePersistentLoaderContext loaderContext, OfflinePersistentWorkerContext workerContext) { + return new OfflinePersistentWorkerResult(false, workerContext.getSegment(), workerContext.getWorkerId(), -1, FIRST_SESSION_ID); + } + + + @Override + public OfflinePersistentWorkerResult loadSessions(KeycloakSession session, OfflinePersistentLoaderContext loaderContext, OfflinePersistentWorkerContext ctx) { + int first = ctx.getWorkerId() * sessionsPerSegment; + + log.tracef("Loading sessions for segment: %d", ctx.getSegment()); + UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class); - List sessions = persister.loadUserSessions(first, max, true); + List sessions = persister.loadUserSessions(first, sessionsPerSegment, true, ctx.getLastCreatedOn(), ctx.getLastSessionId()); - for (UserSessionModel persistentSession : sessions) { + log.tracef("Sessions loaded from DB - segment: %d", ctx.getSegment()); + + UserSessionModel lastSession = null; + if (!sessions.isEmpty()) { + lastSession = sessions.get(sessions.size() - 1); // Save to memory/infinispan - UserSessionModel offlineUserSession = session.sessions().importUserSession(persistentSession, true, true); + session.sessions().importUserSessions(sessions, true); } - return true; + int lastCreatedOn = lastSession==null ? Time.currentTime() + 100000 : lastSession.getStarted(); + String lastSessionId = lastSession==null ? FIRST_SESSION_ID : lastSession.getId(); + + log.tracef("Sessions imported to infinispan - segment: %d, lastCreatedOn: %d, lastSessionId: %s", ctx.getSegment(), lastCreatedOn, lastSessionId); + + return new OfflinePersistentWorkerResult(true, ctx.getSegment(), ctx.getWorkerId(), lastCreatedOn, lastSessionId); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerContext.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerContext.java new file mode 100644 index 0000000000..8d8e3f3ba3 --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerContext.java @@ -0,0 +1,42 @@ +/* + * Copyright 2017 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.models.sessions.infinispan.initializer; + +/** + * @author Marek Posolda + */ +public class OfflinePersistentWorkerContext extends SessionLoader.WorkerContext { + + private final int lastCreatedOn; + private final String lastSessionId; + + public OfflinePersistentWorkerContext(int segment, int workerId, int lastCreatedOn, String lastSessionId) { + super(segment, workerId); + this.lastCreatedOn = lastCreatedOn; + this.lastSessionId = lastSessionId; + } + + + public int getLastCreatedOn() { + return lastCreatedOn; + } + + public String getLastSessionId() { + return lastSessionId; + } +} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerResult.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerResult.java new file mode 100644 index 0000000000..44aa2c52cd --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/OfflinePersistentWorkerResult.java @@ -0,0 +1,44 @@ +/* + * Copyright 2017 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.models.sessions.infinispan.initializer; + +/** + * @author Marek Posolda + */ +public class OfflinePersistentWorkerResult extends SessionLoader.WorkerResult { + + private final int lastCreatedOn; + private final String lastSessionId; + + + public OfflinePersistentWorkerResult(boolean success, int segment, int workerId, int lastCreatedOn, String lastSessionId) { + super(success, segment, workerId); + this.lastCreatedOn = lastCreatedOn; + this.lastSessionId = lastSessionId; + } + + + public int getLastCreatedOn() { + return lastCreatedOn; + } + + + public String getLastSessionId() { + return lastSessionId; + } +} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java index 7df94bf04b..9bfb463320 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/SessionInitializerWorker.java @@ -31,19 +31,20 @@ import java.util.Set; /** * @author Marek Posolda */ -public class SessionInitializerWorker implements DistributedCallable, Serializable { +public class SessionInitializerWorker implements DistributedCallable, Serializable { private static final Logger log = Logger.getLogger(SessionInitializerWorker.class); - private int segment; - private SessionLoader.LoaderContext ctx; + + private SessionLoader.LoaderContext loaderCtx; + private SessionLoader.WorkerContext workerCtx; private SessionLoader sessionLoader; private transient Cache workCache; - public void setWorkerEnvironment(int segment, SessionLoader.LoaderContext ctx, SessionLoader sessionLoader) { - this.segment = segment; - this.ctx = ctx; + public void setWorkerEnvironment(SessionLoader.LoaderContext loaderCtx, SessionLoader.WorkerContext workerCtx, SessionLoader sessionLoader) { + this.loaderCtx = loaderCtx; + this.workerCtx = workerCtx; this.sessionLoader = sessionLoader; } @@ -53,27 +54,28 @@ public class SessionInitializerWorker implements DistributedCallableMarek Posolda */ -public interface SessionLoader extends Serializable { +public interface SessionLoader extends Serializable { /** * Will be triggered just once on cluster coordinator node to perform some generic initialization tasks (Eg. update DB before starting load). @@ -38,7 +41,7 @@ public interface SessionLoader previousResults); + + /** * Will be called on all cluster nodes to load the specified page. * * @param session - * @param loaderContext loaderContext object, which was already computed before - * @param segment to be computed + * @param loaderContext global loaderContext object, which was already computed before + * @param workerContext for current iteration * @return */ - boolean loadSessions(KeycloakSession session, LOADER_CONTEXT loaderContext, int segment); + WORKER_RESULT loadSessions(KeycloakSession session, LOADER_CONTEXT loaderContext, WORKER_CONTEXT workerContext); + + + /** + * Called when it's not possible to compute current iteration and load session for some reason (EG. infinispan not yet fully initialized) + * + * @param loaderContext + * @param workerContext + * @return + */ + WORKER_RESULT createFailedWorkerResult(LOADER_CONTEXT loaderContext, WORKER_CONTEXT workerContext); /** @@ -81,9 +106,78 @@ public interface SessionLoaderMarek Posolda */ -public class RemoteCacheSessionsLoader implements SessionLoader, Serializable { +public class RemoteCacheSessionsLoader implements SessionLoader, Serializable { private static final Logger log = Logger.getLogger(RemoteCacheSessionsLoader.class); @@ -92,26 +93,38 @@ public class RemoteCacheSessionsLoader implements SessionLoader previousResults) { + return new WorkerContext(segment, workerId); + } + + + @Override + public WorkerResult createFailedWorkerResult(RemoteCacheSessionsLoaderContext loaderContext, WorkerContext workerContext) { + return new WorkerResult(false, workerContext.getSegment(), workerContext.getWorkerId()); + } + + + @Override + public WorkerResult loadSessions(KeycloakSession session, RemoteCacheSessionsLoaderContext loaderContext, WorkerContext ctx) { Cache cache = getCache(session); Cache decoratedCache = cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE, Flag.IGNORE_RETURN_VALUES); RemoteCache remoteCache = getRemoteCache(session); - Set myIspnSegments = getMyIspnSegments(segment, context); + Set myIspnSegments = getMyIspnSegments(ctx.getSegment(), loaderContext); - log.debugf("Will do bulk load of sessions from remote cache '%s' . Segment: %d", cache.getName(), segment); + log.debugf("Will do bulk load of sessions from remote cache '%s' . Segment: %d", cache.getName(), ctx.getSegment()); CloseableIterator iterator = null; int countLoaded = 0; try { - iterator = remoteCache.retrieveEntries(null, myIspnSegments, context.getSessionsPerSegment()); + iterator = remoteCache.retrieveEntries(null, myIspnSegments, loaderContext.getSessionsPerSegment()); while (iterator.hasNext()) { countLoaded++; Map.Entry entry = iterator.next(); decoratedCache.putAsync(entry.getKey(), entry.getValue()); } } catch (RuntimeException e) { - log.warnf(e, "Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), segment); + log.warnf(e, "Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), ctx.getSegment()); throw e; } finally { if (iterator != null) { @@ -119,9 +132,9 @@ public class RemoteCacheSessionsLoader implements SessionLoaderMarek Posolda */ -public class RemoteCacheSessionsLoaderContext implements SessionLoader.LoaderContext, Serializable { +public class RemoteCacheSessionsLoaderContext extends SessionLoader.LoaderContext { // Count of hash segments for remote infinispan cache. It's by default 256 for distributed/replicated caches private final int ispnSegmentsCount; - // Count of segments (worker iterations for distributedExecutionService executions on KC side). Each segment will be 1 worker iteration. - // Count of segments could be lower than "ispnSegmentsCount" and depends on the size of the cache. For example if we have cache with just 500 items, - // we don't need 256 segments and send 256 requests to remoteCache to preload thing. Instead, we will have lower number of segments (EG. 8) - // and we will map more ispnSegments into 1 worker segment (In this case 256 / 8 = 32. So 32 ISPN segments mapped to each worker segment) - private final int segmentsCount; - private final int sessionsPerSegment; private final int sessionsTotal; public RemoteCacheSessionsLoaderContext(int ispnSegmentsCount, int sessionsPerSegment, int sessionsTotal) { + super(computeSegmentsCount(sessionsTotal, sessionsPerSegment, ispnSegmentsCount)); this.ispnSegmentsCount = ispnSegmentsCount; this.sessionsPerSegment = sessionsPerSegment; this.sessionsTotal = sessionsTotal; - this.segmentsCount = computeSegmentsCount(sessionsTotal, sessionsPerSegment, ispnSegmentsCount); } - private int computeSegmentsCount(int sessionsTotal, int sessionsPerSegment, int ispnSegments) { + // Count of segments (worker iterations for distributedExecutionService executions on KC side). Each segment will be 1 worker iteration. + // Count of segments could be lower than "ispnSegmentsCount" and depends on the size of the cache. For example if we have cache with just 500 items, + // we don't need 256 segments and send 256 requests to remoteCache to preload thing. Instead, we will have lower number of segments (EG. 8) + // and we will map more ispnSegments into 1 worker segment (In this case 256 / 8 = 32. So 32 ISPN segments mapped to each worker segment) + private static int computeSegmentsCount(int sessionsTotal, int sessionsPerSegment, int ispnSegments) { // No support by remote ISPN cache for segments. This can happen if remoteCache is local (non-clustered) if (ispnSegments < 0) { return 1; @@ -68,11 +64,6 @@ public class RemoteCacheSessionsLoaderContext implements SessionLoader.LoaderCon } - @Override - public int getSegmentsCount() { - return segmentsCount; - } - public int getIspnSegmentsCount() { return ispnSegmentsCount; } @@ -89,7 +80,7 @@ public class RemoteCacheSessionsLoaderContext implements SessionLoader.LoaderCon @Override public String toString() { return new StringBuilder("RemoteCacheSessionsLoaderContext [ ") - .append("segmentsCount: ").append(segmentsCount) + .append("segmentsCount: ").append(getSegmentsCount()) .append(", ispnSegmentsCount: ").append(ispnSegmentsCount) .append(", sessionsPerSegment: ").append(sessionsPerSegment) .append(", sessionsTotal: ").append(sessionsTotal) diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java index 950a4c03c5..8ae5053518 100644 --- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java +++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/RemoteCacheSessionsLoaderTest.java @@ -31,6 +31,7 @@ import org.keycloak.connections.infinispan.InfinispanConnectionProvider; import org.keycloak.models.KeycloakSession; import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; +import org.keycloak.models.sessions.infinispan.initializer.SessionLoader; import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoader; import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoaderContext; import org.keycloak.models.sessions.infinispan.util.InfinispanUtil; @@ -110,7 +111,7 @@ public class RemoteCacheSessionsLoaderTest { Set visitedKeys = new HashSet<>(); for (int currentSegment=0 ; currentSegment cache = cacheManager.getCache("COUNTER_CACHE"); - cache.put("key", "init"); + + Map map = new HashMap<>(); + map.put("key1", "val1"); + map.put("key2", "val2"); + cache.putAll(map); + ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(new Runnable() { @Override diff --git a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java index 12a3ee5cb8..719a39a562 100644 --- a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java +++ b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java @@ -17,17 +17,18 @@ package org.keycloak.models.sessions.infinispan.initializer; -import java.util.Arrays; -import java.util.HashSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.HashMap; +import java.util.Map; import org.infinispan.Cache; +import org.infinispan.client.hotrod.ProtocolVersion; +import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.client.hotrod.RemoteCacheManager; +import org.infinispan.commons.api.BasicCache; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.global.GlobalConfigurationBuilder; -import org.infinispan.context.Flag; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.remoting.transport.jgroups.JGroupsTransport; @@ -37,7 +38,6 @@ import org.keycloak.common.util.Time; import org.keycloak.connections.infinispan.InfinispanConnectionProvider; import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; -import org.keycloak.models.sessions.infinispan.entities.SessionEntity; import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; import java.util.UUID; @@ -49,20 +49,36 @@ import java.util.UUID; @Ignore public class DistributedCacheConcurrentWritesTest { - private static final int ITERATION_PER_WORKER = 1000; - - private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0); - private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0); - - private static final UUID CLIENT_1_UUID = UUID.randomUUID(); + private static final int BATCHES_PER_WORKER = 1000; + private static final int ITEMS_IN_BATCH = 100; public static void main(String[] args) throws Exception { - CacheWrapper cache1 = createCache("node1"); - CacheWrapper cache2 = createCache("node2"); + BasicCache> cache1 = createCache("node1"); + BasicCache> cache2 = createCache("node2"); + // NOTE: This setup requires infinispan servers to be up and running on localhost:12232 and localhost:13232 +// BasicCache> cache1 = createRemoteCache("node1"); +// BasicCache> cache2 = createRemoteCache("node2"); + + try { + testConcurrentPut(cache1, cache2); + } finally { + + // Kill JVM + cache1.stop(); + cache2.stop(); + stopMgr(cache1); + stopMgr(cache2); + + System.out.println("Managers killed"); + } + } + + + private static SessionEntityWrapper createEntityInstance(String id) { // Create initial item UserSessionEntity session = new UserSessionEntity(); - session.setId("123"); + session.setId(id); session.setRealmId("foo"); session.setBrokerSessionId("!23123123"); session.setBrokerUserId(null); @@ -76,177 +92,97 @@ public class DistributedCacheConcurrentWritesTest { clientSession.setAuthMethod("saml"); clientSession.setAction("something"); clientSession.setTimestamp(1234); - session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId()); + session.getAuthenticatedClientSessions().put("foo-client", clientSession.getId()); - try { - for (int i = 0; i < 10; i++) { - testConcurrentReplaceWithRemove("key-" + i, session, cache1, cache2); - } - } finally { - - // Kill JVM - cache1.getCache().stop(); - cache2.getCache().stop(); - cache1.getCache().getCacheManager().stop(); - cache2.getCache().getCacheManager().stop(); - - System.out.println("Managers killed"); - } + return new SessionEntityWrapper<>(session); } // Reproducer for KEYCLOAK-7443 and KEYCLOAK-7489. The infinite loop can happen if cache.replace(key, old, new) is called and entity was removed on one cluster node in the meantime - private static void testConcurrentReplaceWithRemove(String key, UserSessionEntity session, CacheWrapper cache1, - CacheWrapper cache2) throws InterruptedException { - cache1.put(key, session); + private static void testConcurrentPut(BasicCache> cache1, + BasicCache> cache2) throws InterruptedException { - // Create 2 workers for concurrent write and start them - Worker worker1 = new Worker(1, cache1, key); - Worker worker2 = new Worker(2, cache2, key); + // Create workers for concurrent write and start them + Worker worker1 = new Worker(1, cache1); + Worker worker2 = new Worker(2, cache2); long start = System.currentTimeMillis(); - System.out.println("Started clustering test for key " + key); + System.out.println("Started clustering test"); worker1.start(); //worker1.join(); worker2.start(); - Thread.sleep(1000); - // Try to remove the entity after some sleep time. - cache1.wrappedCache.getAdvancedCache() - .withFlags(Flag.CACHE_MODE_LOCAL) - .remove(key); - worker1.join(); worker2.join(); long took = System.currentTimeMillis() - start; - System.out.println("Test finished for key '" + key + "'. Took: " + took + " ms"); + System.out.println("Test finished. Took: " + took + " ms. Cache size: " + cache1.size()); -// System.out.println("Took: " + took + " ms for key . Notes count: " + session.getNotes().size() + ", failedReplaceCounter: " + failedReplaceCounter.get() -// + ", failedReplaceCounter2: " + failedReplaceCounter2.get()); - -// // JGroups statistics -// JChannel channel = (JChannel)((JGroupsTransport)cache1.wrappedCache.getAdvancedCache().getRpcManager().getTransport()).getChannel(); -// System.out.println("Sent MB: " + channel.getSentBytes() / 1000000 + ", sent messages: " + channel.getSentMessages() + ", received MB: " + channel.getReceivedBytes() / 1000000 + -// ", received messages: " + channel.getReceivedMessages()); + // JGroups statistics + printStats(cache1); } private static class Worker extends Thread { - private final CacheWrapper cache; - private final int threadId; - private final String key; + private final BasicCache> cache; + private final int startIndex; - public Worker(int threadId, CacheWrapper cache, String key) { - this.threadId = threadId; + public Worker(int threadId, BasicCache> cache) { this.cache = cache; - this.key = key; - setName("th-" + key + "-" + threadId); + this.startIndex = (threadId - 1) * (ITEMS_IN_BATCH * BATCHES_PER_WORKER); + setName("th-" + threadId); } @Override public void run() { - for (int i=0 ; i oldWrapped = cache.get(key); -// oldWrapped.getEntity().getNotes().put(noteKey, "someVal"); -// replaced = cacheReplace(oldWrapped, oldWrapped.getEntity()); -// } - - int count = 0; - boolean replaced = false; - while (!replaced && count < 25) { - count++; - SessionEntityWrapper oldWrapped = cache.get(key); - oldWrapped.getEntity().getNotes().put(noteKey, "someVal"); - replaced = cacheReplace(oldWrapped, oldWrapped.getEntity()); - } - if (!replaced) { - System.err.println("FAILED TO REPLACE ENTITY: " + key); - return; - } + System.out.println("Thread " + getName() + ": Saved items from " + startPageIndex + " to " + (startPageIndex + ITEMS_IN_BATCH - 1)); } - } - private boolean cacheReplace(SessionEntityWrapper oldSession, UserSessionEntity newSession) { - try { - boolean replaced = cache.replace(key, oldSession, newSession); - //cache.replace(key, newSession); - if (!replaced) { - failedReplaceCounter.incrementAndGet(); - //return false; - //System.out.println("Replace failed!!!"); - } - return replaced; - } catch (Exception re) { - failedReplaceCounter2.incrementAndGet(); - return false; + + // put items 1 by 1 + private void putItemsClassic(int startPageIndex) { + for (int i = startPageIndex ; i < (startPageIndex + ITEMS_IN_BATCH) ; i++) { + String key = "key-" + startIndex + i; + SessionEntityWrapper session = createEntityInstance(key); + cache.put(key, session); } - //return replaced; } - } - // Session clone + // put all items together + private void putItemsAll(int startPageIndex) { + Map> mapp = new HashMap<>(); - private static UserSessionEntity cloneSession(UserSessionEntity session) { - UserSessionEntity clone = new UserSessionEntity(); - clone.setId(session.getId()); - clone.setRealmId(session.getRealmId()); - clone.setNotes(new ConcurrentHashMap<>(session.getNotes())); - return clone; + for (int i = startPageIndex ; i < (startPageIndex + ITEMS_IN_BATCH) ; i++) { + String key = "key-" + startIndex + i; + SessionEntityWrapper session = createEntityInstance(key); + mapp.put(key, session); + } + + cache.putAll(mapp); + } } // Cache creation utils - public static class CacheWrapper { - private final Cache> wrappedCache; - - public CacheWrapper(Cache> wrappedCache) { - this.wrappedCache = wrappedCache; - } - - - public SessionEntityWrapper get(K key) { - SessionEntityWrapper val = wrappedCache.get(key); - return val; - } - - public void put(K key, V newVal) { - SessionEntityWrapper newWrapper = new SessionEntityWrapper<>(newVal); - wrappedCache.put(key, newWrapper); - } - - - public boolean replace(K key, SessionEntityWrapper oldVal, V newVal) { - SessionEntityWrapper newWrapper = new SessionEntityWrapper<>(newVal); - return wrappedCache.replace(key, oldVal, newWrapper); - } - - private Cache> getCache() { - return wrappedCache; - } - - } - - - public static CacheWrapper createCache(String nodeName) { + public static BasicCache> createCache(String nodeName) { EmbeddedCacheManager mgr = createManager(nodeName); - Cache> wrapped = mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - return new CacheWrapper<>(wrapped); + Cache> cache = mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); + return cache; } @@ -272,7 +208,7 @@ public class DistributedCacheConcurrentWritesTest { ConfigurationBuilder distConfigBuilder = new ConfigurationBuilder(); if (clustered) { distConfigBuilder.clustering().cacheMode(async ? CacheMode.DIST_ASYNC : CacheMode.DIST_SYNC); - distConfigBuilder.clustering().hash().numOwners(2); + distConfigBuilder.clustering().hash().numOwners(1); // Disable L1 cache distConfigBuilder.clustering().hash().l1().enabled(false); @@ -283,4 +219,42 @@ public class DistributedCacheConcurrentWritesTest { return cacheManager; } + + + public static BasicCache> createRemoteCache(String nodeName) { + int port = ("node1".equals(nodeName)) ? 12232 : 13232; + + org.infinispan.client.hotrod.configuration.ConfigurationBuilder builder = new org.infinispan.client.hotrod.configuration.ConfigurationBuilder(); + org.infinispan.client.hotrod.configuration.Configuration cfg = builder + .addServer().host("localhost").port(port) + .version(ProtocolVersion.PROTOCOL_VERSION_26) + .build(); + RemoteCacheManager mgr = new RemoteCacheManager(cfg); + return mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); + } + + // CLEANUP METHODS + + private static void stopMgr(BasicCache cache) { + if (cache instanceof Cache) { + ((Cache) cache).getCacheManager().stop(); + } else { + ((RemoteCache) cache).getRemoteCacheManager().stop(); + } + } + + + private static void printStats(BasicCache cache) { + if (cache instanceof Cache) { + Cache cache1 = (Cache) cache; + + JChannel channel = ((JGroupsTransport)cache1.getAdvancedCache().getRpcManager().getTransport()).getChannel(); + + System.out.println("Sent MB: " + channel.getSentBytes() / 1000000 + ", sent messages: " + channel.getSentMessages() + ", received MB: " + channel.getReceivedBytes() / 1000000 + + ", received messages: " + channel.getReceivedMessages()); + } else { + Map stats = ((RemoteCache) cache).stats().getStatsMap(); + System.out.println("Stats: " + stats); + } + } } diff --git a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java index 1e72aa8844..8a04231c66 100644 --- a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java +++ b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/InitializerStateTest.java @@ -19,7 +19,6 @@ package org.keycloak.models.sessions.infinispan.initializer; import org.junit.Assert; import org.junit.Test; -import org.keycloak.models.cache.infinispan.UserCacheSession; import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoaderContext; import org.keycloak.storage.CacheableStorageProviderModel; @@ -35,16 +34,16 @@ public class InitializerStateTest { @Test public void testOfflineLoaderContext() { - OfflinePersistentUserSessionLoaderContext ctx = new OfflinePersistentUserSessionLoaderContext(28, 5); + OfflinePersistentLoaderContext ctx = new OfflinePersistentLoaderContext(28, 5); Assert.assertEquals(ctx.getSegmentsCount(), 6); - ctx = new OfflinePersistentUserSessionLoaderContext(19, 5); + ctx = new OfflinePersistentLoaderContext(19, 5); Assert.assertEquals(ctx.getSegmentsCount(), 4); - ctx = new OfflinePersistentUserSessionLoaderContext(20, 5); + ctx = new OfflinePersistentLoaderContext(20, 5); Assert.assertEquals(ctx.getSegmentsCount(), 4); - ctx = new OfflinePersistentUserSessionLoaderContext(21, 5); + ctx = new OfflinePersistentLoaderContext(21, 5); Assert.assertEquals(ctx.getSegmentsCount(), 5); } @@ -78,7 +77,7 @@ public class InitializerStateTest { @Test public void testComputationState() { - OfflinePersistentUserSessionLoaderContext ctx = new OfflinePersistentUserSessionLoaderContext(28, 5); + OfflinePersistentLoaderContext ctx = new OfflinePersistentLoaderContext(28, 5); Assert.assertEquals(ctx.getSegmentsCount(), 6); InitializerState state = new InitializerState(ctx.getSegmentsCount()); diff --git a/model/jpa/src/main/java/org/keycloak/connections/jpa/updater/liquibase/custom/JpaUpdate4_7_0_OfflineSessionsTimestamps.java b/model/jpa/src/main/java/org/keycloak/connections/jpa/updater/liquibase/custom/JpaUpdate4_7_0_OfflineSessionsTimestamps.java new file mode 100644 index 0000000000..2aa1b1171d --- /dev/null +++ b/model/jpa/src/main/java/org/keycloak/connections/jpa/updater/liquibase/custom/JpaUpdate4_7_0_OfflineSessionsTimestamps.java @@ -0,0 +1,54 @@ +/* + * Copyright 2017 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.connections.jpa.updater.liquibase.custom; + +import liquibase.exception.CustomChangeException; +import liquibase.statement.core.UpdateStatement; +import liquibase.structure.core.Table; +import org.keycloak.common.util.Time; + +/** + * Update CREATED_ON and LAST_SESSION_REFRESH columns to current startup time + * + * @author Marek Posolda + */ +public class JpaUpdate4_7_0_OfflineSessionsTimestamps extends CustomKeycloakTask { + + @Override + protected void generateStatementsImpl() throws CustomChangeException { + String offlineUserSessionsTableName = database.correctObjectName("OFFLINE_USER_SESSION", Table.class); + + try { + int currentTime = Time.currentTime(); + + UpdateStatement updateStatement = new UpdateStatement(null, null, offlineUserSessionsTableName) + .addNewColumnValue("LAST_SESSION_REFRESH", currentTime); + + statements.add(updateStatement); + + confirmationMessage.append("Updated column LAST_SESSION_REFRESH in OFFLINE_USER_SESSION table with time " + currentTime); + } catch (Exception e) { + throw new CustomChangeException(getTaskId() + ": Exception when updating data from previous version", e); + } + } + + @Override + protected String getTaskId() { + return "Update 4.7.0.Final"; + } +} diff --git a/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java b/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java index 587ed7e960..132af92d0f 100644 --- a/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java +++ b/model/jpa/src/main/java/org/keycloak/models/jpa/session/JpaUserSessionPersisterProvider.java @@ -18,10 +18,10 @@ package org.keycloak.models.jpa.session; import org.jboss.logging.Logger; +import org.keycloak.common.util.Time; import org.keycloak.models.AuthenticatedClientSessionModel; import org.keycloak.models.ClientModel; import org.keycloak.models.KeycloakSession; -import org.keycloak.models.ModelException; import org.keycloak.models.RealmModel; import org.keycloak.models.UserModel; import org.keycloak.models.UserSessionModel; @@ -30,17 +30,23 @@ import org.keycloak.models.session.PersistentClientSessionModel; import org.keycloak.models.session.PersistentUserSessionAdapter; import org.keycloak.models.session.PersistentUserSessionModel; import org.keycloak.models.session.UserSessionPersisterProvider; +import org.keycloak.models.utils.SessionTimeoutHelper; import org.keycloak.storage.StorageId; import javax.persistence.EntityManager; import javax.persistence.Query; import javax.persistence.TypedQuery; + import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; /** * @author Marek Posolda @@ -63,6 +69,7 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv PersistentUserSessionEntity entity = new PersistentUserSessionEntity(); entity.setUserSessionId(model.getUserSessionId()); + entity.setCreatedOn(model.getStarted()); entity.setRealmId(adapter.getRealm().getId()); entity.setUserId(adapter.getUser().getId()); String offlineStr = offlineToString(offline); @@ -99,26 +106,6 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv em.flush(); } - @Override - public void updateUserSession(UserSessionModel userSession, boolean offline) { - PersistentUserSessionAdapter adapter; - if (userSession instanceof PersistentUserSessionAdapter) { - adapter = (PersistentUserSessionAdapter) userSession; - } else { - adapter = new PersistentUserSessionAdapter(userSession); - } - - PersistentUserSessionModel model = adapter.getUpdatedModel(); - - String offlineStr = offlineToString(offline); - PersistentUserSessionEntity entity = em.find(PersistentUserSessionEntity.class, new PersistentUserSessionEntity.Key(userSession.getId(), offlineStr)); - if (entity == null) { - throw new ModelException("UserSession with ID " + userSession.getId() + ", offline: " + offline + " not found"); - } - entity.setLastSessionRefresh(model.getLastSessionRefresh()); - entity.setData(model.getData()); - } - @Override public void removeUserSession(String userSessionId, boolean offline) { String offlineStr = offlineToString(offline); @@ -200,7 +187,6 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv .setParameter("externalClientId", clientStorageId.getExternalId()) .executeUpdate(); } - num = em.createNamedQuery("deleteDetachedUserSessions").executeUpdate(); } @Override @@ -213,24 +199,53 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv num = em.createNamedQuery("deleteUserSessionsByUser").setParameter("userId", userId).executeUpdate(); } + @Override - public void clearDetachedUserSessions() { - int num = em.createNamedQuery("deleteDetachedClientSessions").executeUpdate(); - num = em.createNamedQuery("deleteDetachedUserSessions").executeUpdate(); + public void updateLastSessionRefreshes(RealmModel realm, int lastSessionRefresh, Collection userSessionIds, boolean offline) { + String offlineStr = offlineToString(offline); + + int us = em.createNamedQuery("updateUserSessionLastSessionRefresh") + .setParameter("lastSessionRefresh", lastSessionRefresh) + .setParameter("realmId", realm.getId()) + .setParameter("offline", offlineStr) + .setParameter("userSessionIds", userSessionIds) + .executeUpdate(); + + logger.debugf("Updated lastSessionRefresh of %d user sessions in realm '%s'", us, realm.getName()); } @Override - public void updateAllTimestamps(int time) { - int num = em.createNamedQuery("updateClientSessionsTimestamps").setParameter("timestamp", time).executeUpdate(); - num = em.createNamedQuery("updateUserSessionsTimestamps").setParameter("lastSessionRefresh", time).executeUpdate(); + public void removeExpired(RealmModel realm) { + int expiredOffline = Time.currentTime() - realm.getOfflineSessionIdleTimeout() - SessionTimeoutHelper.PERIODIC_CLEANER_IDLE_TIMEOUT_WINDOW_SECONDS; + + String offlineStr = offlineToString(true); + + logger.tracef("Trigger removing expired user sessions for realm '%s'", realm.getName()); + + int cs = em.createNamedQuery("deleteExpiredClientSessions") + .setParameter("realmId", realm.getId()) + .setParameter("lastSessionRefresh", expiredOffline) + .setParameter("offline", offlineStr) + .executeUpdate(); + + int us = em.createNamedQuery("deleteExpiredUserSessions") + .setParameter("realmId", realm.getId()) + .setParameter("lastSessionRefresh", expiredOffline) + .setParameter("offline", offlineStr) + .executeUpdate(); + + logger.debugf("Removed %d expired user sessions and %d expired client sessions in realm '%s'", us, cs, realm.getName()); + } @Override - public List loadUserSessions(int firstResult, int maxResults, boolean offline) { + public List loadUserSessions(int firstResult, int maxResults, boolean offline, int lastCreatedOn, String lastUserSessionId) { String offlineStr = offlineToString(offline); TypedQuery query = em.createNamedQuery("findUserSessions", PersistentUserSessionEntity.class); query.setParameter("offline", offlineStr); + query.setParameter("lastCreatedOn", lastCreatedOn); + query.setParameter("lastSessionId", lastUserSessionId); if (firstResult != -1) { query.setFirstResult(firstResult); @@ -239,26 +254,14 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv query.setMaxResults(maxResults); } - List results = query.getResultList(); - List result = new ArrayList<>(); - List userSessionIds = new ArrayList<>(); - for (PersistentUserSessionEntity entity : results) { - RealmModel realm = session.realms().getRealm(entity.getRealmId()); - try { - UserModel user = session.users().getUserById(entity.getUserId(), realm); - // Case when user was deleted in the meantime - if (user == null) { - onUserRemoved(realm, entity.getUserId()); - return loadUserSessions(firstResult, maxResults, offline); - } - } catch (Exception e) { - logger.debugv(e,"Failed to load user with id {0}", entity.getUserId()); - } + List result = query.getResultStream() + .map(this::toAdapter) + .collect(Collectors.toList()); + Map sessionsById = result.stream() + .collect(Collectors.toMap(UserSessionModel::getId, Function.identity())); - result.add(toAdapter(realm, entity)); - userSessionIds.add(entity.getUserSessionId()); - } + Set userSessionIds = sessionsById.keySet(); Set removedClientUUIDs = new HashSet<>(); @@ -268,28 +271,17 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv query2.setParameter("offline", offlineStr); List clientSessions = query2.getResultList(); - // Assume both userSessions and clientSessions ordered by userSessionId - int j = 0; - for (UserSessionModel ss : result) { - PersistentUserSessionAdapter userSession = (PersistentUserSessionAdapter) ss; - Map currentClientSessions = userSession.getAuthenticatedClientSessions(); // This is empty now and we want to fill it + for (PersistentClientSessionEntity clientSession : clientSessions) { + PersistentUserSessionAdapter userSession = sessionsById.get(clientSession.getUserSessionId()); - boolean next = true; - while (next && j < clientSessions.size()) { - PersistentClientSessionEntity clientSession = clientSessions.get(j); - if (clientSession.getUserSessionId().equals(userSession.getId())) { - PersistentAuthenticatedClientSessionAdapter clientSessAdapter = toAdapter(userSession.getRealm(), userSession, clientSession); + PersistentAuthenticatedClientSessionAdapter clientSessAdapter = toAdapter(userSession.getRealm(), userSession, clientSession); + Map currentClientSessions = userSession.getAuthenticatedClientSessions(); - // Case when client was removed in the meantime - if (clientSessAdapter.getClient() == null) { - removedClientUUIDs.add(clientSession.getClientId()); - } else { - currentClientSessions.put(clientSession.getClientId(), clientSessAdapter); - } - j++; - } else { - next = false; - } + // Case when client was removed in the meantime + if (clientSessAdapter.getClient() == null) { + removedClientUUIDs.add(clientSession.getClientId()); + } else { + currentClientSessions.put(clientSession.getClientId(), clientSessAdapter); } } } @@ -298,12 +290,18 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv onClientRemoved(clientUUID); } - return result; + return (List) result; + } + + private PersistentUserSessionAdapter toAdapter(PersistentUserSessionEntity entity) { + RealmModel realm = session.realms().getRealm(entity.getRealmId()); + return toAdapter(realm, entity); } private PersistentUserSessionAdapter toAdapter(RealmModel realm, PersistentUserSessionEntity entity) { PersistentUserSessionModel model = new PersistentUserSessionModel(); model.setUserSessionId(entity.getUserSessionId()); + model.setStarted(entity.getCreatedOn()); model.setLastSessionRefresh(entity.getLastSessionRefresh()); model.setData(entity.getData()); model.setOffline(offlineFromString(entity.getOffline())); diff --git a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java index 44c3188c85..5703f18403 100644 --- a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java +++ b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentClientSessionEntity.java @@ -36,10 +36,9 @@ import java.io.Serializable; @NamedQuery(name="deleteClientSessionsByClientStorageProvider", query="delete from PersistentClientSessionEntity sess where sess.clientStorageProvider = :clientStorageProvider"), @NamedQuery(name="deleteClientSessionsByUser", query="delete from PersistentClientSessionEntity sess where sess.userSessionId IN (select u.userSessionId from PersistentUserSessionEntity u where u.userId = :userId)"), @NamedQuery(name="deleteClientSessionsByUserSession", query="delete from PersistentClientSessionEntity sess where sess.userSessionId = :userSessionId and sess.offline = :offline"), - @NamedQuery(name="deleteDetachedClientSessions", query="delete from PersistentClientSessionEntity sess where NOT EXISTS (select u.userSessionId from PersistentUserSessionEntity u where u.userSessionId = sess.userSessionId )"), + @NamedQuery(name="deleteExpiredClientSessions", query="delete from PersistentClientSessionEntity sess where sess.userSessionId IN (select u.userSessionId from PersistentUserSessionEntity u where u.realmId = :realmId AND u.offline = :offline AND u.lastSessionRefresh < :lastSessionRefresh)"), @NamedQuery(name="findClientSessionsByUserSession", query="select sess from PersistentClientSessionEntity sess where sess.userSessionId=:userSessionId and sess.offline = :offline"), - @NamedQuery(name="findClientSessionsByUserSessions", query="select sess from PersistentClientSessionEntity sess where sess.offline = :offline and sess.userSessionId IN (:userSessionIds) order by sess.userSessionId"), - @NamedQuery(name="updateClientSessionsTimestamps", query="update PersistentClientSessionEntity c set timestamp = :timestamp"), + @NamedQuery(name="findClientSessionsByUserSessions", query="select sess from PersistentClientSessionEntity sess where sess.offline = :offline and sess.userSessionId IN (:userSessionIds) order by sess.userSessionId") }) @Table(name="OFFLINE_CLIENT_SESSION") @Entity diff --git a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java index cc35be2822..ade3ea0e32 100644 --- a/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java +++ b/model/jpa/src/main/java/org/keycloak/models/jpa/session/PersistentUserSessionEntity.java @@ -34,10 +34,13 @@ import java.io.Serializable; @NamedQueries({ @NamedQuery(name="deleteUserSessionsByRealm", query="delete from PersistentUserSessionEntity sess where sess.realmId = :realmId"), @NamedQuery(name="deleteUserSessionsByUser", query="delete from PersistentUserSessionEntity sess where sess.userId = :userId"), - @NamedQuery(name="deleteDetachedUserSessions", query="delete from PersistentUserSessionEntity sess where NOT EXISTS (select c.userSessionId from PersistentClientSessionEntity c where c.userSessionId = sess.userSessionId)"), + @NamedQuery(name="deleteExpiredUserSessions", query="delete from PersistentUserSessionEntity sess where sess.realmId = :realmId AND sess.offline = :offline AND sess.lastSessionRefresh < :lastSessionRefresh"), + @NamedQuery(name="updateUserSessionLastSessionRefresh", query="update PersistentUserSessionEntity sess set lastSessionRefresh = :lastSessionRefresh where sess.realmId = :realmId" + + " AND sess.offline = :offline AND sess.userSessionId IN (:userSessionIds)"), @NamedQuery(name="findUserSessionsCount", query="select count(sess) from PersistentUserSessionEntity sess where sess.offline = :offline"), - @NamedQuery(name="findUserSessions", query="select sess from PersistentUserSessionEntity sess where sess.offline = :offline order by sess.userSessionId"), - @NamedQuery(name="updateUserSessionsTimestamps", query="update PersistentUserSessionEntity c set lastSessionRefresh = :lastSessionRefresh"), + @NamedQuery(name="findUserSessions", query="select sess from PersistentUserSessionEntity sess where sess.offline = :offline" + + " AND (sess.createdOn > :lastCreatedOn OR (sess.createdOn = :lastCreatedOn AND sess.userSessionId > :lastSessionId))" + + " order by sess.createdOn,sess.userSessionId") }) @Table(name="OFFLINE_USER_SESSION") @@ -55,6 +58,9 @@ public class PersistentUserSessionEntity { @Column(name="USER_ID", length = 255) protected String userId; + @Column(name = "CREATED_ON") + protected int createdOn; + @Column(name = "LAST_SESSION_REFRESH") protected int lastSessionRefresh; @@ -90,6 +96,14 @@ public class PersistentUserSessionEntity { this.userId = userId; } + public int getCreatedOn() { + return createdOn; + } + + public void setCreatedOn(int createdOn) { + this.createdOn = createdOn; + } + public int getLastSessionRefresh() { return lastSessionRefresh; } diff --git a/model/jpa/src/main/resources/META-INF/jpa-changelog-4.7.0.xml b/model/jpa/src/main/resources/META-INF/jpa-changelog-4.7.0.xml index f8c969330f..43a98713c8 100644 --- a/model/jpa/src/main/resources/META-INF/jpa-changelog-4.7.0.xml +++ b/model/jpa/src/main/resources/META-INF/jpa-changelog-4.7.0.xml @@ -19,9 +19,30 @@ - - + + + + + + + + + + + + + + + + + + + + + + + diff --git a/server-spi-private/src/main/java/org/keycloak/models/session/DisabledUserSessionPersisterProvider.java b/server-spi-private/src/main/java/org/keycloak/models/session/DisabledUserSessionPersisterProvider.java index 10b28a28ba..f818f16f27 100644 --- a/server-spi-private/src/main/java/org/keycloak/models/session/DisabledUserSessionPersisterProvider.java +++ b/server-spi-private/src/main/java/org/keycloak/models/session/DisabledUserSessionPersisterProvider.java @@ -26,6 +26,7 @@ import org.keycloak.models.RealmModel; import org.keycloak.models.UserModel; import org.keycloak.models.UserSessionModel; +import java.util.Collection; import java.util.Collections; import java.util.List; @@ -74,11 +75,6 @@ public class DisabledUserSessionPersisterProvider implements UserSessionPersiste } - @Override - public void updateUserSession(UserSessionModel userSession, boolean offline) { - - } - @Override public void removeUserSession(String userSessionId, boolean offline) { @@ -105,17 +101,17 @@ public class DisabledUserSessionPersisterProvider implements UserSessionPersiste } @Override - public void clearDetachedUserSessions() { + public void updateLastSessionRefreshes(RealmModel realm, int lastSessionRefresh, Collection userSessionIds, boolean offline) { } @Override - public void updateAllTimestamps(int time) { + public void removeExpired(RealmModel realm) { } @Override - public List loadUserSessions(int firstResult, int maxResults, boolean offline) { + public List loadUserSessions(int firstResult, int maxResults, boolean offline, int lastCreatedOn, String lastUserSessionId) { return Collections.emptyList(); } diff --git a/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionAdapter.java b/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionAdapter.java index 61d5818672..7fcec1c8be 100644 --- a/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionAdapter.java +++ b/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionAdapter.java @@ -54,12 +54,12 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel { data.setIpAddress(other.getIpAddress()); data.setNotes(other.getNotes()); data.setRememberMe(other.isRememberMe()); - data.setStarted(other.getStarted()); if (other.getState() != null) { data.setState(other.getState().toString()); } this.model = new PersistentUserSessionModel(); + this.model.setStarted(other.getStarted()); this.model.setUserSessionId(other.getId()); this.model.setLastSessionRefresh(other.getLastSessionRefresh()); @@ -157,7 +157,7 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel { @Override public int getStarted() { - return getData().getStarted(); + return model.getStarted(); } @Override @@ -274,6 +274,7 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel { @JsonProperty("rememberMe") private boolean rememberMe; + // TODO: Keeping those just for backwards compatibility. @JsonIgnoreProperties doesn't work on Wildfly - probably due to classloading issues @JsonProperty("started") private int started; @@ -323,10 +324,12 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel { this.rememberMe = rememberMe; } + @Deprecated public int getStarted() { return started; } + @Deprecated public void setStarted(int started) { this.started = started; } diff --git a/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionModel.java b/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionModel.java index ced17684a8..508b81741b 100644 --- a/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionModel.java +++ b/server-spi-private/src/main/java/org/keycloak/models/session/PersistentUserSessionModel.java @@ -23,6 +23,7 @@ package org.keycloak.models.session; public class PersistentUserSessionModel { private String userSessionId; + private int started; private int lastSessionRefresh; private boolean offline; private String data; @@ -35,6 +36,14 @@ public class PersistentUserSessionModel { this.userSessionId = userSessionId; } + public int getStarted() { + return started; + } + + public void setStarted(int started) { + this.started = started; + } + public int getLastSessionRefresh() { return lastSessionRefresh; } diff --git a/server-spi-private/src/main/java/org/keycloak/models/session/UserSessionPersisterProvider.java b/server-spi-private/src/main/java/org/keycloak/models/session/UserSessionPersisterProvider.java index ba5a595f76..11c7567d03 100644 --- a/server-spi-private/src/main/java/org/keycloak/models/session/UserSessionPersisterProvider.java +++ b/server-spi-private/src/main/java/org/keycloak/models/session/UserSessionPersisterProvider.java @@ -24,6 +24,7 @@ import org.keycloak.models.UserModel; import org.keycloak.models.UserSessionModel; import org.keycloak.provider.Provider; +import java.util.Collection; import java.util.List; /** @@ -37,8 +38,6 @@ public interface UserSessionPersisterProvider extends Provider { // Assuming that corresponding userSession is already persisted void createClientSession(AuthenticatedClientSessionModel clientSession, boolean offline); - void updateUserSession(UserSessionModel userSession, boolean offline); - // Called during logout (for online session) or during periodic expiration. It will remove all corresponding clientSessions too void removeUserSession(String userSessionId, boolean offline); @@ -49,14 +48,14 @@ public interface UserSessionPersisterProvider extends Provider { void onClientRemoved(RealmModel realm, ClientModel client); void onUserRemoved(RealmModel realm, UserModel user); - // Called at startup to remove userSessions without any clientSession - void clearDetachedUserSessions(); + // Bulk update of lastSessionRefresh of all specified userSessions to the given value. + void updateLastSessionRefreshes(RealmModel realm, int lastSessionRefresh, Collection userSessionIds, boolean offline); - // Update "lastSessionRefresh" of all userSessions and "timestamp" of all clientSessions to specified time - void updateAllTimestamps(int time); + // Remove userSessions and clientSessions, which are expired + void removeExpired(RealmModel realm); // Called during startup. For each userSession, it loads also clientSessions - List loadUserSessions(int firstResult, int maxResults, boolean offline); + List loadUserSessions(int firstResult, int maxResults, boolean offline, int lastCreatedOn, String lastUserSessionId); int getUserSessionsCount(boolean offline); diff --git a/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java b/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java index ac1c22409b..61d563db40 100755 --- a/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java +++ b/server-spi/src/main/java/org/keycloak/models/UserSessionProvider.java @@ -19,6 +19,7 @@ package org.keycloak.models; import org.keycloak.provider.Provider; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.UUID; @@ -63,7 +64,7 @@ public interface UserSessionProvider extends Provider { void removeUserSession(RealmModel realm, UserSessionModel session); void removeUserSessions(RealmModel realm, UserModel user); - /** Implementation should propagate removal of expired userSessions to userSessionPersister too **/ + /** Implementation doesn't need to propagate removal of expired userSessions to userSessionPersister. Cleanup on persister will be called separately **/ void removeExpired(RealmModel realm); void removeUserSessions(RealmModel realm); @@ -89,8 +90,8 @@ public interface UserSessionProvider extends Provider { long getOfflineSessionsCount(RealmModel realm, ClientModel client); List getOfflineUserSessions(RealmModel realm, ClientModel client, int first, int max); - /** Triggered by persister during pre-load. It optionally imports authenticatedClientSessions too if requested. Otherwise the imported UserSession will have empty list of AuthenticationSessionModel **/ - UserSessionModel importUserSession(UserSessionModel persistentUserSession, boolean offline, boolean importAuthenticatedClientSessions); + /** Triggered by persister during pre-load. It imports authenticatedClientSessions too **/ + void importUserSessions(Collection persistentUserSessions, boolean offline); void close(); diff --git a/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java b/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java index fc3c584f98..1311635471 100755 --- a/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java +++ b/services/src/main/java/org/keycloak/protocol/oidc/TokenManager.java @@ -309,7 +309,7 @@ public class TokenManager { if (clientSession.getCurrentRefreshToken() != null && !refreshToken.getId().equals(clientSession.getCurrentRefreshToken()) && refreshToken.getIssuedAt() < clientSession.getTimestamp() && - clusterStartupTime != clientSession.getTimestamp()) { + clusterStartupTime <= clientSession.getTimestamp()) { throw new OAuthErrorException(OAuthErrorException.INVALID_GRANT, "Stale token"); } diff --git a/services/src/main/java/org/keycloak/services/scheduled/ClearExpiredUserSessions.java b/services/src/main/java/org/keycloak/services/scheduled/ClearExpiredUserSessions.java index e61969ad8f..4ebd599a66 100755 --- a/services/src/main/java/org/keycloak/services/scheduled/ClearExpiredUserSessions.java +++ b/services/src/main/java/org/keycloak/services/scheduled/ClearExpiredUserSessions.java @@ -20,6 +20,7 @@ package org.keycloak.services.scheduled; import org.keycloak.models.KeycloakSession; import org.keycloak.models.RealmModel; import org.keycloak.models.UserSessionProvider; +import org.keycloak.models.session.UserSessionPersisterProvider; import org.keycloak.timer.ScheduledTask; /** @@ -35,6 +36,7 @@ public class ClearExpiredUserSessions implements ScheduledTask { for (RealmModel realm : session.realms().getRealms()) { sessions.removeExpired(realm); session.authenticationSessions().removeExpired(realm); + session.getProvider(UserSessionPersisterProvider.class).removeExpired(realm); } } diff --git a/testsuite/integration-arquillian/servers/auth-server/services/testsuite-providers/src/main/java/org/keycloak/testsuite/rest/TestingResourceProvider.java b/testsuite/integration-arquillian/servers/auth-server/services/testsuite-providers/src/main/java/org/keycloak/testsuite/rest/TestingResourceProvider.java index fca6e02c17..7e936278d8 100644 --- a/testsuite/integration-arquillian/servers/auth-server/services/testsuite-providers/src/main/java/org/keycloak/testsuite/rest/TestingResourceProvider.java +++ b/testsuite/integration-arquillian/servers/auth-server/services/testsuite-providers/src/main/java/org/keycloak/testsuite/rest/TestingResourceProvider.java @@ -44,7 +44,7 @@ import org.keycloak.models.UserCredentialModel; import org.keycloak.models.UserModel; import org.keycloak.models.UserProvider; import org.keycloak.models.UserSessionModel; -import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStoreFactory; +import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory; import org.keycloak.models.utils.ModelToRepresentation; import org.keycloak.protocol.oidc.OIDCLoginProtocol; import org.keycloak.protocol.oidc.mappers.AudienceProtocolMapper; @@ -695,8 +695,8 @@ public class TestingResourceProvider implements RealmResourceProvider { @Produces(MediaType.APPLICATION_JSON) public Response suspendPeriodicTasks() { suspendTask(ClearExpiredUserSessions.TASK_NAME); - suspendTask(LastSessionRefreshStoreFactory.LSR_PERIODIC_TASK_NAME); - suspendTask(LastSessionRefreshStoreFactory.LSR_OFFLINE_PERIODIC_TASK_NAME); + suspendTask(CrossDCLastSessionRefreshStoreFactory.LSR_PERIODIC_TASK_NAME); + suspendTask(CrossDCLastSessionRefreshStoreFactory.LSR_OFFLINE_PERIODIC_TASK_NAME); return Response.noContent().build(); } diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/session/LastSessionRefreshUnitTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/session/LastSessionRefreshUnitTest.java index f10ca46511..51eb60caab 100644 --- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/session/LastSessionRefreshUnitTest.java +++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/session/LastSessionRefreshUnitTest.java @@ -31,8 +31,8 @@ import org.keycloak.common.util.Time; import org.keycloak.connections.infinispan.InfinispanConnectionProvider; import org.keycloak.models.KeycloakSession; import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; -import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStore; -import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStoreFactory; +import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore; +import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory; import org.keycloak.models.sessions.infinispan.changes.sessions.SessionData; import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; import org.keycloak.representations.idm.RealmRepresentation; @@ -69,7 +69,7 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest { @Override public void run(KeycloakSession session) { - LastSessionRefreshStore customStore = createStoreInstance(session, 1000000, 1000); + CrossDCLastSessionRefreshStore customStore = createStoreInstance(session, 1000000, 1000); System.out.println("sss"); int lastSessionRefresh = Time.currentTime(); @@ -113,7 +113,7 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest { @Override public void run(KeycloakSession session) { // Long timer interval. No message due the timer wasn't executed - LastSessionRefreshStore customStore1 = createStoreInstance(session, 100000, 10); + CrossDCLastSessionRefreshStore customStore1 = createStoreInstance(session, 100000, 10); Time.setOffset(100); try { @@ -124,7 +124,7 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest { Assert.assertEquals(0, counter.get()); // Short timer interval 10 ms. 1 message due the interval is executed and lastRun was in the past due to Time.setOffset - LastSessionRefreshStore customStore2 = createStoreInstance(session, 10, 10); + CrossDCLastSessionRefreshStore customStore2 = createStoreInstance(session, 10, 10); Time.setOffset(200); Retry.execute(() -> { @@ -152,12 +152,12 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest { AtomicInteger counter = new AtomicInteger(); - LastSessionRefreshStore createStoreInstance(KeycloakSession session, long timerIntervalMs, int maxIntervalBetweenMessagesSeconds) { - LastSessionRefreshStoreFactory factory = new LastSessionRefreshStoreFactory() { + CrossDCLastSessionRefreshStore createStoreInstance(KeycloakSession session, long timerIntervalMs, int maxIntervalBetweenMessagesSeconds) { + CrossDCLastSessionRefreshStoreFactory factory = new CrossDCLastSessionRefreshStoreFactory() { @Override - protected LastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) { - return new LastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, eventKey) { + protected CrossDCLastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) { + return new CrossDCLastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, eventKey) { @Override protected void sendMessage(KeycloakSession kcSession, Map refreshesToSend) { diff --git a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java index 172dd3f7c2..a95e27ffbe 100644 --- a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java +++ b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionInitializerTest.java @@ -97,9 +97,9 @@ public class UserSessionInitializerTest { List loadedSessions = session.sessions().getOfflineUserSessions(realm, testApp, 0, 10); UserSessionProviderTest.assertSessions(loadedSessions, origSessions); - UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, serverStartTime, "test-app", "third-party"); - UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[1].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.2", started, serverStartTime, "test-app"); - UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[2].getId(), session.users().getUserByUsername("user2", realm), "127.0.0.3", started, serverStartTime, "test-app"); + UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, started, "test-app", "third-party"); + UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[1].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started, "test-app"); + UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[2].getId(), session.users().getUserByUsername("user2", realm), "127.0.0.3", started, started, "test-app"); } @@ -130,7 +130,7 @@ public class UserSessionInitializerTest { List loadedSessions = session.sessions().getOfflineUserSessions(realm, thirdparty, 0, 10); Assert.assertEquals(1, loadedSessions.size()); - UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, serverStartTime, "third-party"); + UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, started, "third-party"); // Revert client realm.addClient("test-app"); diff --git a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionPersisterProviderTest.java b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionPersisterProviderTest.java index 8a3ffa99ed..7c10701173 100644 --- a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionPersisterProviderTest.java +++ b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionPersisterProviderTest.java @@ -22,6 +22,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; +import org.keycloak.cluster.ClusterProvider; import org.keycloak.common.util.Time; import org.keycloak.models.AuthenticatedClientSessionModel; import org.keycloak.models.ClientModel; @@ -37,8 +38,12 @@ import org.keycloak.models.UserManager; import org.keycloak.testsuite.rule.KeycloakRule; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * @author Marek Posolda */ @@ -112,51 +117,6 @@ public class UserSessionPersisterProviderTest { assertSessionLoaded(loadedSessions, origSessions[2].getId(), session.users().getUserByUsername("user2", realm), "127.0.0.3", started, started, "test-app"); } - @Test - public void testUpdateTimestamps() { - // Create some sessions in infinispan - int started = Time.currentTime(); - UserSessionModel[] origSessions = createSessions(); - - resetSession(); - - // Persist 3 created userSessions and clientSessions as offline - ClientModel testApp = realm.getClientByClientId("test-app"); - List userSessions = session.sessions().getUserSessions(realm, testApp); - for (UserSessionModel userSession : userSessions) { - persistUserSession(userSession, true); - } - - // Persist 1 online session - UserSessionModel userSession = session.sessions().getUserSession(realm, origSessions[0].getId()); - persistUserSession(userSession, false); - - resetSession(); - - // update timestamps - int newTime = started + 50; - persister.updateAllTimestamps(newTime); - - // Assert online session - List loadedSessions = loadPersistedSessionsPaginated(false, 1, 1, 1); - Assert.assertEquals(2, assertTimestampsUpdated(loadedSessions, newTime)); - - // Assert offline sessions - loadedSessions = loadPersistedSessionsPaginated(true, 2, 2, 3); - Assert.assertEquals(4, assertTimestampsUpdated(loadedSessions, newTime)); - } - - private int assertTimestampsUpdated(List loadedSessions, int expectedTime) { - int clientSessionsCount = 0; - for (UserSessionModel loadedSession : loadedSessions) { - Assert.assertEquals(expectedTime, loadedSession.getLastSessionRefresh()); - for (AuthenticatedClientSessionModel clientSession : loadedSession.getAuthenticatedClientSessions().values()) { - Assert.assertEquals(expectedTime, clientSession.getTimestamp()); - clientSessionsCount++; - } - } - return clientSessionsCount; - } @Test public void testUpdateAndRemove() { @@ -177,48 +137,30 @@ public class UserSessionPersisterProviderTest { UserSessionModel persistedSession = loadedSessions.get(0); UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started, "test-app"); - // Update userSession - Time.setOffset(10); - try { - persistedSession.setLastSessionRefresh(Time.currentTime()); - persistedSession.setNote("foo", "bar"); - persistedSession.setState(UserSessionModel.State.LOGGED_IN); - persister.updateUserSession(persistedSession, true); + // create new clientSession + AuthenticatedClientSessionModel clientSession = createClientSession(realm.getClientByClientId("third-party"), session.sessions().getUserSession(realm, persistedSession.getId()), + "http://redirect", "state"); + persister.createClientSession(clientSession, true); - // create new clientSession - AuthenticatedClientSessionModel clientSession = createClientSession(realm.getClientByClientId("third-party"), session.sessions().getUserSession(realm, persistedSession.getId()), - "http://redirect", "state"); - persister.createClientSession(clientSession, true); + resetSession(); - resetSession(); + // Remove clientSession + persister.removeClientSession(userSession.getId(), realm.getClientByClientId("third-party").getId(), true); - // Assert session updated - loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1); - persistedSession = loadedSessions.get(0); - UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started+10, "test-app", "third-party"); - Assert.assertEquals("bar", persistedSession.getNote("foo")); - Assert.assertEquals(UserSessionModel.State.LOGGED_IN, persistedSession.getState()); + resetSession(); - // Remove clientSession - persister.removeClientSession(userSession.getId(), realm.getClientByClientId("third-party").getId(), true); + // Assert clientSession removed + loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1); + persistedSession = loadedSessions.get(0); + UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started , "test-app"); - resetSession(); + // Remove userSession + persister.removeUserSession(persistedSession.getId(), true); - // Assert clientSession removed - loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1); - persistedSession = loadedSessions.get(0); - UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started + 10, "test-app"); + resetSession(); - // Remove userSession - persister.removeUserSession(persistedSession.getId(), true); - - resetSession(); - - // Assert nothing found - loadPersistedSessionsPaginated(true, 10, 0, 0); - } finally { - Time.setOffset(0); - } + // Assert nothing found + loadPersistedSessionsPaginated(true, 10, 0, 0); } @Test @@ -302,8 +244,8 @@ public class UserSessionPersisterProviderTest { resetSession(); - // Assert nothing loaded - userSession was removed as well because it was last userSession - loadPersistedSessionsPaginated(true, 10, 0, 0); + // Assert loading still works - last userSession is still there, but no clientSession on it + loadPersistedSessionsPaginated(true, 10, 1, 1); // Cleanup realmMgr = new RealmManager(session); @@ -340,23 +282,108 @@ public class UserSessionPersisterProviderTest { UserSessionModel persistedSession = loadedSessions.get(0); UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user2", realm), "127.0.0.3", started, started, "test-app"); - // KEYCLOAK-2431 Assert that userSessionPersister is resistent even to situation, when users are deleted "directly" + // KEYCLOAK-2431 Assert that userSessionPersister is resistent even to situation, when users are deleted "directly". + // No exception will happen. However session will be still there UserModel user2 = session.users().getUserByUsername("user2", realm); session.users().removeUser(realm, user2); - loadedSessions = loadPersistedSessionsPaginated(true, 10, 0, 0); + loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1); + // Cleanup + UserSessionModel userSession = loadedSessions.get(0); + session.sessions().removeUserSession(realm, userSession); + persister.removeUserSession(userSession.getId(), userSession.isOffline()); } // KEYCLOAK-1999 @Test public void testNoSessions() { UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class); - List sessions = persister.loadUserSessions(0, 1, true); + List sessions = persister.loadUserSessions(0, 1, true, 0, "abc"); Assert.assertEquals(0, sessions.size()); } + @Test + public void testMoreSessions() { + // Create 10 userSessions - each having 1 clientSession + List userSessions = new ArrayList<>(); + UserModel user = session.users().getUserByUsername("user1", realm); + + for (int i=0 ; i<20 ; i++) { + // Having different offsets for each session (to ensure that lastSessionRefresh is also different) + Time.setOffset(i); + + UserSessionModel userSession = session.sessions().createUserSession(realm, user, "user1", "127.0.0.1", "form", true, null, null); + createClientSession(realm.getClientByClientId("test-app"), userSession, "http://redirect", "state"); + userSessions.add(userSession); + } + + resetSession(); + + for (UserSessionModel userSession : userSessions) { + UserSessionModel userSession2 = session.sessions().getUserSession(realm, userSession.getId()); + persistUserSession(userSession2, true); + } + + resetSession(); + + List loadedSessions = loadPersistedSessionsPaginated(true, 2, 10, 20); + user = session.users().getUserByUsername("user1", realm); + ClientModel testApp = realm.getClientByClientId("test-app"); + + for (UserSessionModel loadedSession : loadedSessions) { + assertEquals(user.getId(), loadedSession.getUser().getId()); + assertEquals("127.0.0.1", loadedSession.getIpAddress()); + assertEquals(user.getUsername(), loadedSession.getLoginUsername()); + + assertEquals(1, loadedSession.getAuthenticatedClientSessions().size()); + assertTrue(loadedSession.getAuthenticatedClientSessions().containsKey(testApp.getId())); + } + } + + + @Test + public void testExpiredSessions() { + // Create some sessions in infinispan + int started = Time.currentTime(); + UserSessionModel[] origSessions = createSessions(); + + resetSession(); + + // Persist 2 offline sessions of 2 users + UserSessionModel userSession1 = session.sessions().getUserSession(realm, origSessions[1].getId()); + UserSessionModel userSession2 = session.sessions().getUserSession(realm, origSessions[2].getId()); + persistUserSession(userSession1, true); + persistUserSession(userSession2, true); + + resetSession(); + + // Update one of the sessions with lastSessionRefresh of 20 days ahead + int lastSessionRefresh = Time.currentTime() + 1728000; + persister.updateLastSessionRefreshes(realm, lastSessionRefresh, Collections.singleton(userSession1.getId()), true); + + resetSession(); + + // Increase time offset - 40 days + Time.setOffset(3456000); + try { + // Run expiration thread + persister.removeExpired(realm); + + // Test the updated session is still in persister. Not updated session is not there anymore + List loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1); + UserSessionModel persistedSession = loadedSessions.get(0); + UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, lastSessionRefresh, "test-app"); + + } finally { + // Cleanup + Time.setOffset(0); + } + + } + + private AuthenticatedClientSessionModel createClientSession(ClientModel client, UserSessionModel userSession, String redirect, String state) { AuthenticatedClientSessionModel clientSession = session.sessions().createClientSession(realm, client, userSession); clientSession.setRedirectUri(redirect); @@ -407,19 +434,32 @@ public class UserSessionPersisterProviderTest { private List loadPersistedSessionsPaginated(boolean offline, int sessionsPerPage, int expectedPageCount, int expectedSessionsCount) { int count = persister.getUserSessionsCount(offline); - int start = 0; + int pageCount = 0; boolean next = true; List result = new ArrayList<>(); - while (next && start < count) { - List sess = persister.loadUserSessions(start, sessionsPerPage, offline); - if (sess.size() == 0) { + int lastCreatedOn = 0; + String lastSessionId = "abc"; + + while (next) { + List sess = persister.loadUserSessions(0, sessionsPerPage, offline, lastCreatedOn, lastSessionId); + + if (sess.size() < sessionsPerPage) { next = false; + + // We had at least some session + if (sess.size() > 0) { + pageCount++; + } } else { pageCount++; - start += sess.size(); - result.addAll(sess); + + UserSessionModel lastSession = sess.get(sess.size() - 1); + lastCreatedOn = lastSession.getStarted(); + lastSessionId = lastSession.getId(); } + + result.addAll(sess); } Assert.assertEquals(pageCount, expectedPageCount); diff --git a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionProviderOfflineTest.java b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionProviderOfflineTest.java index 4f3ca2011a..381de7569a 100644 --- a/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionProviderOfflineTest.java +++ b/testsuite/integration-deprecated/src/test/java/org/keycloak/testsuite/model/UserSessionProviderOfflineTest.java @@ -363,24 +363,45 @@ public class UserSessionProviderOfflineTest { // sessions are in persister too Assert.assertEquals(3, persister.getUserSessionsCount(true)); - // Set lastSessionRefresh to session[0] to 0 - session0.setLastSessionRefresh(0); - - resetSession(); - - session.sessions().removeExpired(realm); - - resetSession(); - - // assert session0 not found now - Assert.assertNull(session.sessions().getOfflineUserSession(realm, origSessions[0].getId())); - - Assert.assertEquals(2, persister.getUserSessionsCount(true)); - - // Expire everything and assert nothing found - Time.setOffset(3000000); + // Increase timeOffset - 5 minutes + Time.setOffset(300); try { + + // Update lastSessionRefresh of session0. This will update lastSessionRefresh of all the sessions to DB as they were not yet updated to DB + session0.setLastSessionRefresh(Time.currentTime()); + + resetSession(); + + // Increase timeOffset - 20 days + Time.setOffset(1728000); + + session0 = session.sessions().getOfflineUserSession(realm, origSessions[0].getId()); + session0.setLastSessionRefresh(Time.currentTime()); + + resetSession(); + + // Increase timeOffset - 40 days + Time.setOffset(3456000); + + // Expire and ensure that all sessions despite session0 were removed + session.sessions().removeExpired(realm); + persister.removeExpired(realm); + + resetSession(); + + // assert session0 is the only session found + Assert.assertNotNull(session.sessions().getOfflineUserSession(realm, origSessions[0].getId())); + Assert.assertNull(session.sessions().getOfflineUserSession(realm, origSessions[1].getId())); + Assert.assertNull(session.sessions().getOfflineUserSession(realm, origSessions[2].getId())); + + Assert.assertEquals(1, persister.getUserSessionsCount(true)); + + // Expire everything and assert nothing found + Time.setOffset(6000000); + + session.sessions().removeExpired(realm); + persister.removeExpired(realm); resetSession(); diff --git a/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/LoadPersistentSessionsCommand.java b/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/LoadPersistentSessionsCommand.java index 0c1a506ec4..796e3c69cf 100644 --- a/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/LoadPersistentSessionsCommand.java +++ b/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/LoadPersistentSessionsCommand.java @@ -17,9 +17,17 @@ package org.keycloak.testsuite.util.cli; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + import org.keycloak.models.KeycloakSession; -import org.keycloak.models.UserSessionProvider; -import org.keycloak.models.UserSessionProviderFactory; +import org.keycloak.models.KeycloakSessionFactory; +import org.keycloak.models.UserSessionModel; +import org.keycloak.models.session.UserSessionPersisterProvider; +import org.keycloak.models.utils.KeycloakModelUtils; /** * @author Marek Posolda @@ -33,15 +41,97 @@ public class LoadPersistentSessionsCommand extends AbstractCommand { @Override protected void doRunCommand(KeycloakSession session) { - int sessionsPerSegment = getIntArg(0); - UserSessionProviderFactory sessionProviderFactory = (UserSessionProviderFactory) sessionFactory.getProviderFactory(UserSessionProvider.class); - sessionProviderFactory.loadPersistentSessions(sessionFactory, 10, sessionsPerSegment); + final int workersCount = getIntArg(0); + final int limit = getIntArg(1); + //int workersCount = 8; + //int limit = 64; + + AtomicInteger lastCreatedOn = new AtomicInteger(0); + AtomicReference lastSessionId = new AtomicReference<>("abc"); + + AtomicBoolean finished = new AtomicBoolean(false); + int i=0; + + while (!finished.get()) { + if (i % 16 == 0) { + log.infof("Starting iteration: %s . lastCreatedOn: %d, lastSessionId: %s", i, lastCreatedOn.get(), lastSessionId.get()); + } + + i = i + workersCount; + List workers = new LinkedList<>(); + MyWorker lastWorker = null; + + for (int workerId = 0 ; workerId < workersCount ; workerId++) { + lastWorker = new MyWorker(workerId, lastCreatedOn.get(), lastSessionId.get(), limit, sessionFactory); + Thread worker = new Thread(lastWorker); + workers.add(worker); + } + + for (Thread worker : workers) { + worker.start(); + } + for (Thread worker : workers) { + try { + worker.join(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + List lastWorkerSessions = lastWorker.getLoadedSessions(); + + if (lastWorkerSessions.size() < limit) { + finished.set(true); + } else { + UserSessionModel lastSession = lastWorkerSessions.get(lastWorkerSessions.size() - 1); + lastCreatedOn.set(lastSession.getStarted()); + lastSessionId.set(lastSession.getId()); + } + + + } log.info("All persistent sessions loaded successfully"); } @Override public String printUsage() { - return super.printUsage() + " "; + return super.printUsage() + " "; + } + + + private class MyWorker implements Runnable { + + private final int workerId; + private final int lastCreatedOn; + private final String lastSessionId; + private final int limit; + private final KeycloakSessionFactory sessionFactory; + + private List loadedSessions = new LinkedList<>(); + + public MyWorker(int workerId, int lastCreatedOn, String lastSessionId, int limit, KeycloakSessionFactory sessionFactory) { + this.workerId = workerId; + this.lastCreatedOn = lastCreatedOn; + this.lastSessionId = lastSessionId; + this.limit = limit; + this.sessionFactory = sessionFactory; + } + + @Override + public void run() { + KeycloakModelUtils.runJobInTransaction(sessionFactory, (keycloakSession) -> { + int offset = workerId * limit; + + UserSessionPersisterProvider persister = keycloakSession.getProvider(UserSessionPersisterProvider.class); + loadedSessions = persister.loadUserSessions(offset, limit, true, lastCreatedOn, lastSessionId); + + }); + } + + + private List getLoadedSessions() { + return loadedSessions; + } } } diff --git a/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/PersistSessionsCommand.java b/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/PersistSessionsCommand.java index 1e4cb48e17..6b8dd25e98 100644 --- a/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/PersistSessionsCommand.java +++ b/testsuite/utils/src/main/java/org/keycloak/testsuite/util/cli/PersistSessionsCommand.java @@ -19,6 +19,7 @@ package org.keycloak.testsuite.util.cli; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.keycloak.models.AuthenticatedClientSessionModel; import org.keycloak.models.ClientModel; @@ -36,6 +37,8 @@ import org.keycloak.models.utils.KeycloakModelUtils; */ public class PersistSessionsCommand extends AbstractCommand { + private AtomicInteger userCounter = new AtomicInteger(); + @Override public String getName() { return "persistSessions"; @@ -75,12 +78,18 @@ public class PersistSessionsCommand extends AbstractCommand { @Override public void run(KeycloakSession session) { RealmModel realm = session.realms().getRealmByName("master"); - UserModel john = session.users().getUserByUsername("admin", realm); + ClientModel testApp = realm.getClientByClientId("security-admin-console"); UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class); for (int i = 0; i < countInThisBatch; i++) { - UserSessionModel userSession = session.sessions().createUserSession(realm, john, "john-doh@localhost", "127.0.0.2", "form", true, null, null); + String username = "john-" + userCounter.incrementAndGet(); + UserModel john = session.users().getUserByUsername(username, realm); + if (john == null) { + john = session.users().addUser(realm, username); + } + + UserSessionModel userSession = session.sessions().createUserSession(realm, john, username, "127.0.0.2", "form", true, null, null); AuthenticatedClientSessionModel clientSession = session.sessions().createClientSession(realm, testApp, userSession); clientSession.setRedirectUri("http://redirect"); clientSession.setNote("foo", "bar-" + i);