KEYCLOAK-7275 KEYCLOAK-5479 Faster offline sessions preloading at startup. Track lastSessionRefresh timestamps more properly by support bulk update to DB

This commit is contained in:
mposolda 2018-11-01 13:54:32 +01:00 committed by Marek Posolda
parent 3bdbbf41af
commit 0533782d90
44 changed files with 1381 additions and 612 deletions

View file

@ -20,7 +20,6 @@ package org.keycloak.models.sessions.infinispan;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
@ -33,7 +32,7 @@ import org.keycloak.models.sessions.infinispan.changes.ClientSessionUpdateTask;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
import org.keycloak.models.sessions.infinispan.changes.Tasks;
import org.keycloak.models.sessions.infinispan.changes.UserSessionUpdateTask;
import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshChecker;
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshChecker;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import java.util.UUID;
@ -150,7 +149,7 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
@Override
public CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper<AuthenticatedClientSessionEntity> sessionWrapper) {
return new LastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore())
return new CrossDCLastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore())
.shouldSaveClientSessionToRemoteCache(kcSession, client.getRealm(), sessionWrapper, userSession, offline, timestamp);
}

View file

@ -19,10 +19,13 @@ package org.keycloak.models.sessions.infinispan;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.context.Flag;
import org.infinispan.stream.CacheCollectors;
import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.util.ObjectUtil;
import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time;
import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
@ -35,7 +38,8 @@ import org.keycloak.models.UserSessionModel;
import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.models.sessions.infinispan.changes.Tasks;
import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStore;
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
@ -61,6 +65,8 @@ import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
import org.keycloak.models.utils.SessionTimeoutHelper;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@ -69,11 +75,11 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -100,15 +106,18 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
protected final SessionEventsSenderTransaction clusterEventsSenderTx;
protected final LastSessionRefreshStore lastSessionRefreshStore;
protected final LastSessionRefreshStore offlineLastSessionRefreshStore;
protected final CrossDCLastSessionRefreshStore lastSessionRefreshStore;
protected final CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore;
protected final PersisterLastSessionRefreshStore persisterLastSessionRefreshStore;
protected final RemoteCacheInvoker remoteCacheInvoker;
protected final InfinispanKeyGenerator keyGenerator;
public InfinispanUserSessionProvider(KeycloakSession session,
RemoteCacheInvoker remoteCacheInvoker,
LastSessionRefreshStore lastSessionRefreshStore,
LastSessionRefreshStore offlineLastSessionRefreshStore,
CrossDCLastSessionRefreshStore lastSessionRefreshStore,
CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore,
PersisterLastSessionRefreshStore persisterLastSessionRefreshStore,
InfinispanKeyGenerator keyGenerator,
Cache<String, SessionEntityWrapper<UserSessionEntity>> sessionCache,
Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineSessionCache,
@ -134,6 +143,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
this.lastSessionRefreshStore = lastSessionRefreshStore;
this.offlineLastSessionRefreshStore = offlineLastSessionRefreshStore;
this.persisterLastSessionRefreshStore = persisterLastSessionRefreshStore;
this.remoteCacheInvoker = remoteCacheInvoker;
this.keyGenerator = keyGenerator;
session.getTransactionManager().enlistAfterCompletion(clusterEventsSenderTx);
@ -160,14 +171,18 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
return offline ? offlineClientSessionTx : clientSessionTx;
}
protected LastSessionRefreshStore getLastSessionRefreshStore() {
protected CrossDCLastSessionRefreshStore getLastSessionRefreshStore() {
return lastSessionRefreshStore;
}
protected LastSessionRefreshStore getOfflineLastSessionRefreshStore() {
protected CrossDCLastSessionRefreshStore getOfflineLastSessionRefreshStore() {
return offlineLastSessionRefreshStore;
}
protected PersisterLastSessionRefreshStore getPersisterLastSessionRefreshStore() {
return persisterLastSessionRefreshStore;
}
@Override
public AuthenticatedClientSessionModel createClientSession(RealmModel realm, ClientModel client, UserSessionModel userSession) {
final UUID clientSessionId = keyGenerator.generateKeyUUID(session, clientSessionCache);
@ -535,7 +550,6 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
}
private void removeExpiredOfflineUserSessions(RealmModel realm) {
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
int expiredOffline = Time.currentTime() - realm.getOfflineSessionIdleTimeout() - SessionTimeoutHelper.PERIODIC_CLEANER_IDLE_TIMEOUT_WINDOW_SECONDS;
// Each cluster node cleanups just local sessions, which are those owned by himself (+ few more taking l1 cache into account)
@ -570,8 +584,6 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
futures.addTask(f);
});
// TODO:mposolda can be likely optimized to delete all expired at one step
persister.removeUserSession( userSessionEntity.getId(), true);
}
});
@ -796,7 +808,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
@Override
public UserSessionModel createOfflineUserSession(UserSessionModel userSession) {
UserSessionAdapter offlineUserSession = importUserSession(userSession, true, false);
UserSessionAdapter offlineUserSession = importUserSession(userSession, true);
// started and lastSessionRefresh set to current time
int currentTime = Time.currentTime();
@ -866,8 +878,117 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
return getUserSessions(realm, client, first, max, true);
}
@Override
public UserSessionAdapter importUserSession(UserSessionModel userSession, boolean offline, boolean importAuthenticatedClientSessions) {
public void importUserSessions(Collection<UserSessionModel> persistentUserSessions, boolean offline) {
if (persistentUserSessions == null || persistentUserSessions.isEmpty()) {
return;
}
Map<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionsById = new HashMap<>();
Map<String, SessionEntityWrapper<UserSessionEntity>> sessionsById = persistentUserSessions.stream()
.map((UserSessionModel persistentUserSession) -> {
UserSessionEntity userSessionEntityToImport = createUserSessionEntityInstance(persistentUserSession);
for (Map.Entry<String, AuthenticatedClientSessionModel> entry : persistentUserSession.getAuthenticatedClientSessions().entrySet()) {
String clientUUID = entry.getKey();
AuthenticatedClientSessionModel clientSession = entry.getValue();
AuthenticatedClientSessionEntity clientSessionToImport = createAuthenticatedClientSessionInstance(clientSession, userSessionEntityToImport.getRealmId(), offline);
// Update timestamp to same value as userSession. LastSessionRefresh of userSession from DB will have correct value
clientSessionToImport.setTimestamp(userSessionEntityToImport.getLastSessionRefresh());
clientSessionsById.put(clientSessionToImport.getId(), new SessionEntityWrapper<>(clientSessionToImport));
// Update userSession entity with the clientSession
AuthenticatedClientSessionStore clientSessions = userSessionEntityToImport.getAuthenticatedClientSessions();
clientSessions.put(clientUUID, clientSessionToImport.getId());
}
return userSessionEntityToImport;
})
.map(SessionEntityWrapper::new)
.collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));
// Directly put all entities to the infinispan cache
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = CacheDecorators.skipCacheLoaders(getCache(offline));
cache.putAll(sessionsById);
// put all entities to the remoteCache (if exists)
RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
if (remoteCache != null) {
Map<String, SessionEntityWrapper<UserSessionEntity>> sessionsByIdForTransport = sessionsById.values().stream()
.map(SessionEntityWrapper::forTransport)
.collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));
Retry.executeWithBackoff((int iteration) -> {
try {
remoteCache.putAll(sessionsByIdForTransport);
} catch (HotRodClientException re) {
if (log.isDebugEnabled()) {
log.debugf(re, "Failed to put import %d sessions to remoteCache. Iteration '%s'. Will try to retry the task",
sessionsByIdForTransport.size(), iteration);
}
// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
throw re;
}
}, 10, 10);
}
// Import client sessions
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessCache = offline ? offlineClientSessionCache : clientSessionCache;
clientSessCache = CacheDecorators.skipCacheLoaders(clientSessCache);
clientSessCache.putAll(clientSessionsById);
// put all entities to the remoteCache (if exists)
RemoteCache remoteCacheClientSessions = InfinispanUtil.getRemoteCache(clientSessCache);
if (remoteCacheClientSessions != null) {
Map<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> sessionsByIdForTransport = clientSessionsById.values().stream()
.map(SessionEntityWrapper::forTransport)
.collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));
Retry.executeWithBackoff((int iteration) -> {
try {
remoteCacheClientSessions.putAll(sessionsByIdForTransport);
} catch (HotRodClientException re) {
if (log.isDebugEnabled()) {
log.debugf(re, "Failed to put import %d client sessions to remoteCache. Iteration '%s'. Will try to retry the task",
sessionsByIdForTransport.size(), iteration);
}
// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
throw re;
}
}, 10, 10);
}
}
// Imports just userSession without it's clientSessions
protected UserSessionAdapter importUserSession(UserSessionModel userSession, boolean offline) {
UserSessionEntity entity = createUserSessionEntityInstance(userSession);
InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx = getTransaction(offline);
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx = getClientSessionTransaction(offline);
SessionUpdateTask<UserSessionEntity> importTask = Tasks.addIfAbsentSync();
userSessionUpdateTx.addTask(userSession.getId(), importTask, entity);
UserSessionAdapter importedSession = wrap(userSession.getRealm(), entity, offline);
return importedSession;
}
private UserSessionEntity createUserSessionEntityInstance(UserSessionModel userSession) {
UserSessionEntity entity = new UserSessionEntity();
entity.setId(userSession.getId());
entity.setRealmId(userSession.getRealm().getId());
@ -896,22 +1017,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
entity.setStarted(userSession.getStarted());
entity.setLastSessionRefresh(userSession.getLastSessionRefresh());
InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx = getTransaction(offline);
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx = getClientSessionTransaction(offline);
SessionUpdateTask<UserSessionEntity> importTask = Tasks.addIfAbsentSync();
userSessionUpdateTx.addTask(userSession.getId(), importTask, entity);
UserSessionAdapter importedSession = wrap(userSession.getRealm(), entity, offline);
// Handle client sessions
if (importAuthenticatedClientSessions) {
for (AuthenticatedClientSessionModel clientSession : userSession.getAuthenticatedClientSessions().values()) {
importClientSession(importedSession, clientSession, userSessionUpdateTx, clientSessionUpdateTx, offline);
}
}
return importedSession;
return entity;
}
@ -919,16 +1025,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx,
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx,
boolean offline) {
final UUID clientSessionId = keyGenerator.generateKeyUUID(session, getClientSessionCache(offline));
AuthenticatedClientSessionEntity entity = new AuthenticatedClientSessionEntity(clientSessionId);
entity.setRealmId(sessionToImportInto.getRealm().getId());
entity.setAction(clientSession.getAction());
entity.setAuthMethod(clientSession.getProtocol());
entity.setNotes(clientSession.getNotes() == null ? new ConcurrentHashMap<>() : clientSession.getNotes());
entity.setRedirectUri(clientSession.getRedirectUri());
entity.setTimestamp(clientSession.getTimestamp());
AuthenticatedClientSessionEntity entity = createAuthenticatedClientSessionInstance(clientSession, sessionToImportInto.getRealm().getId(), offline);
final UUID clientSessionId = entity.getId();
SessionUpdateTask<AuthenticatedClientSessionEntity> createClientSessionTask = Tasks.addIfAbsentSync();
clientSessionUpdateTx.addTask(entity.getId(), createClientSessionTask, entity);
@ -942,6 +1040,22 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
return new AuthenticatedClientSessionAdapter(session,this, entity, clientSession.getClient(), sessionToImportInto, userSessionUpdateTx, clientSessionUpdateTx, offline);
}
private AuthenticatedClientSessionEntity createAuthenticatedClientSessionInstance(AuthenticatedClientSessionModel clientSession, String realmId, boolean offline) {
final UUID clientSessionId = keyGenerator.generateKeyUUID(session, getClientSessionCache(offline));
AuthenticatedClientSessionEntity entity = new AuthenticatedClientSessionEntity(clientSessionId);
entity.setRealmId(realmId);
entity.setAction(clientSession.getAction());
entity.setAuthMethod(clientSession.getProtocol());
entity.setNotes(clientSession.getNotes() == null ? new ConcurrentHashMap<>() : clientSession.getNotes());
entity.setRedirectUri(clientSession.getRedirectUri());
entity.setTimestamp(clientSession.getTimestamp());
return entity;
}
private static class RegisterClientSessionTask implements SessionUpdateTask<UserSessionEntity> {
private final String clientUuid;

View file

@ -32,8 +32,10 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.UserSessionProviderFactory;
import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStore;
import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStoreFactory;
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory;
import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore;
import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStoreFactory;
import org.keycloak.models.sessions.infinispan.initializer.CacheInitializer;
import org.keycloak.models.sessions.infinispan.initializer.DBLockBasedCacheInitializer;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
@ -80,8 +82,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
private Config.Scope config;
private RemoteCacheInvoker remoteCacheInvoker;
private LastSessionRefreshStore lastSessionRefreshStore;
private LastSessionRefreshStore offlineLastSessionRefreshStore;
private CrossDCLastSessionRefreshStore lastSessionRefreshStore;
private CrossDCLastSessionRefreshStore offlineLastSessionRefreshStore;
private PersisterLastSessionRefreshStore persisterLastSessionRefreshStore;
private InfinispanKeyGenerator keyGenerator;
@Override
@ -93,7 +96,8 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionsCache = connections.getCache(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME);
Cache<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> loginFailures = connections.getCache(InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME);
return new InfinispanUserSessionProvider(session, remoteCacheInvoker, lastSessionRefreshStore, offlineLastSessionRefreshStore, keyGenerator,
return new InfinispanUserSessionProvider(session, remoteCacheInvoker, lastSessionRefreshStore, offlineLastSessionRefreshStore,
persisterLastSessionRefreshStore, keyGenerator,
cache, offlineSessionsCache, clientSessionCache, offlineClientSessionsCache, loginFailures);
}
@ -169,6 +173,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
initializer.initCache();
initializer.loadSessions();
// Initialize persister for periodically doing bulk DB updates of lastSessionRefresh timestamps of refreshed sessions
persisterLastSessionRefreshStore = new PersisterLastSessionRefreshStoreFactory().createAndInit(session, true);
}
});
@ -233,7 +240,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
});
if (sessionsRemoteCache) {
lastSessionRefreshStore = new LastSessionRefreshStoreFactory().createAndInit(session, sessionsCache, false);
lastSessionRefreshStore = new CrossDCLastSessionRefreshStoreFactory().createAndInit(session, sessionsCache, false);
}
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionsCache = ispn.getCache(InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME);
@ -248,7 +255,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
});
if (offlineSessionsRemoteCache) {
offlineLastSessionRefreshStore = new LastSessionRefreshStoreFactory().createAndInit(session, offlineSessionsCache, true);
offlineLastSessionRefreshStore = new CrossDCLastSessionRefreshStoreFactory().createAndInit(session, offlineSessionsCache, true);
}
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionsCache = ispn.getCache(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME);

View file

@ -24,10 +24,11 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshChecker;
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshChecker;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.Tasks;
import org.keycloak.models.sessions.infinispan.changes.UserSessionUpdateTask;
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshListener;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionStore;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
@ -42,6 +43,8 @@ import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshListener.IGNORE_REMOTE_CACHE_UPDATE;
/**
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
*/
@ -207,6 +210,15 @@ public class UserSessionAdapter implements UserSessionModel {
}
public void setLastSessionRefresh(int lastSessionRefresh) {
if (offline) {
// Received the message from the other DC that we should update the lastSessionRefresh in local cluster. Don't update DB in that case.
// The other DC already did.
Boolean ignoreRemoteCacheUpdate = (Boolean) session.getAttribute(CrossDCLastSessionRefreshListener.IGNORE_REMOTE_CACHE_UPDATE);
if (ignoreRemoteCacheUpdate == null || !ignoreRemoteCacheUpdate) {
provider.getPersisterLastSessionRefreshStore().putLastSessionRefresh(session, entity.getId(), realm.getId(), lastSessionRefresh);
}
}
UserSessionUpdateTask task = new UserSessionUpdateTask() {
@Override
@ -216,7 +228,7 @@ public class UserSessionAdapter implements UserSessionModel {
@Override
public CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper<UserSessionEntity> sessionWrapper) {
return new LastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore())
return new CrossDCLastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore())
.shouldSaveUserSessionToRemoteCache(UserSessionAdapter.this.session, UserSessionAdapter.this.realm, sessionWrapper, offline, lastSessionRefresh);
}

View file

@ -20,34 +20,27 @@ package org.keycloak.models.sessions.infinispan.changes.sessions;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.util.Time;
import org.keycloak.models.KeycloakSession;
/**
* Tracks the queue of lastSessionRefreshes, which were updated on this host. Those will be sent to the second DC in bulk, so second DC can update
* lastSessionRefreshes on it's side. Message is sent either periodically or if there are lots of stored lastSessionRefreshes.
* Abstract "store" for bulk sending of the updates related to lastSessionRefresh
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class LastSessionRefreshStore {
protected static final Logger logger = Logger.getLogger(LastSessionRefreshStore.class);
public abstract class AbstractLastSessionRefreshStore {
private final int maxIntervalBetweenMessagesSeconds;
private final int maxCount;
private final String eventKey;
private volatile Map<String, SessionData> lastSessionRefreshes = new ConcurrentHashMap<>();
private volatile int lastRun = Time.currentTime();
protected LastSessionRefreshStore(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) {
protected AbstractLastSessionRefreshStore(int maxIntervalBetweenMessagesSeconds, int maxCount) {
this.maxIntervalBetweenMessagesSeconds = maxIntervalBetweenMessagesSeconds;
this.maxCount = maxCount;
this.eventKey = eventKey;
}
@ -86,16 +79,11 @@ public class LastSessionRefreshStore {
}
protected void sendMessage(KeycloakSession kcSession, Map<String, SessionData> refreshesToSend) {
LastSessionRefreshEvent event = new LastSessionRefreshEvent(refreshesToSend);
if (logger.isDebugEnabled()) {
logger.debugf("Sending lastSessionRefreshes for key '%s'. Refreshes: %s", eventKey, event.getLastSessionRefreshes().toString());
}
// Don't notify local DC about the lastSessionRefreshes. They were processed here already
ClusterProvider cluster = kcSession.getProvider(ClusterProvider.class);
cluster.notify(eventKey, event, true, ClusterProvider.DCNotify.ALL_BUT_LOCAL_DC);
}
/**
* Bulk update the underlying store with all the user sessions, which were refreshed by Keycloak since the last call of this method
*
* @param kcSession
* @param refreshesToSend Key is userSession ID, SessionData are data about the session
*/
protected abstract void sendMessage(KeycloakSession kcSession, Map<String, SessionData> refreshesToSend);
}

View file

@ -17,19 +17,15 @@
package org.keycloak.models.sessions.infinispan.changes.sessions;
import org.infinispan.Cache;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.util.Time;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.models.utils.SessionTimeoutHelper;
import org.keycloak.timer.TimerProvider;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class LastSessionRefreshStoreFactory {
public abstract class AbstractLastSessionRefreshStoreFactory {
// Timer interval. The store will be checked every 5 seconds whether the message with stored lastSessionRefreshes should be sent
public static final long DEFAULT_TIMER_INTERVAL_MS = 5000;
@ -40,40 +36,14 @@ public class LastSessionRefreshStoreFactory {
// Max count of lastSessionRefreshes. If count of lastSessionRefreshes reach this value, the message is sent to second DC
public static final int DEFAULT_MAX_COUNT = 100;
// Name of periodic tasks to send events to the other DCs
public static final String LSR_PERIODIC_TASK_NAME = "lastSessionRefreshes";
public static final String LSR_OFFLINE_PERIODIC_TASK_NAME = "lastSessionRefreshes-offline";
public LastSessionRefreshStore createAndInit(KeycloakSession kcSession, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, boolean offline) {
return createAndInit(kcSession, cache, DEFAULT_TIMER_INTERVAL_MS, DEFAULT_MAX_INTERVAL_BETWEEN_MESSAGES_SECONDS, DEFAULT_MAX_COUNT, offline);
}
public LastSessionRefreshStore createAndInit(KeycloakSession kcSession, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, long timerIntervalMs, int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) {
String eventKey = offline ? LSR_OFFLINE_PERIODIC_TASK_NAME : LSR_PERIODIC_TASK_NAME;
LastSessionRefreshStore store = createStoreInstance(maxIntervalBetweenMessagesSeconds, maxCount, eventKey);
// Register listener
ClusterProvider cluster = kcSession.getProvider(ClusterProvider.class);
cluster.registerListener(eventKey, new LastSessionRefreshListener(kcSession, cache, offline));
// Setup periodic timer check
protected void setupPeriodicTimer(KeycloakSession kcSession, AbstractLastSessionRefreshStore store, long timerIntervalMs, String eventKey) {
TimerProvider timer = kcSession.getProvider(TimerProvider.class);
timer.scheduleTask((KeycloakSession keycloakSession) -> {
store.checkSendingMessage(keycloakSession, Time.currentTime());
}, timerIntervalMs, eventKey);
return store;
}
protected LastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) {
return new LastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, eventKey);
}
}

View file

@ -23,7 +23,6 @@ import org.jboss.logging.Logger;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.sessions.infinispan.AuthenticatedClientSessionAdapter;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
@ -32,15 +31,15 @@ import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class LastSessionRefreshChecker {
public class CrossDCLastSessionRefreshChecker {
public static final Logger logger = Logger.getLogger(LastSessionRefreshChecker.class);
public static final Logger logger = Logger.getLogger(CrossDCLastSessionRefreshChecker.class);
private final LastSessionRefreshStore store;
private final LastSessionRefreshStore offlineStore;
private final CrossDCLastSessionRefreshStore store;
private final CrossDCLastSessionRefreshStore offlineStore;
public LastSessionRefreshChecker(LastSessionRefreshStore store, LastSessionRefreshStore offlineStore) {
public CrossDCLastSessionRefreshChecker(CrossDCLastSessionRefreshStore store, CrossDCLastSessionRefreshStore offlineStore) {
this.store = store;
this.offlineStore = offlineStore;
}
@ -73,7 +72,7 @@ public class LastSessionRefreshChecker {
logger.debugf("Skip writing last session refresh to the remoteCache. Session %s newLastSessionRefresh %d", userSessionId, newLastSessionRefresh);
}
LastSessionRefreshStore storeToUse = offline ? offlineStore : store;
CrossDCLastSessionRefreshStore storeToUse = offline ? offlineStore : store;
storeToUse.putLastSessionRefresh(kcSession, userSessionId, realm.getId(), newLastSessionRefresh);
return SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED;
@ -118,13 +117,13 @@ public class LastSessionRefreshChecker {
}
// We're likely not in cross-dc environment. Doesn't matter what we return
LastSessionRefreshStore storeToUse = offline ? offlineStore : store;
CrossDCLastSessionRefreshStore storeToUse = offline ? offlineStore : store;
if (storeToUse == null) {
return SessionUpdateTask.CrossDCMessageStatus.SYNC;
}
// Received the message from the other DC that we should update the lastSessionRefresh in local cluster
Boolean ignoreRemoteCacheUpdate = (Boolean) kcSession.getAttribute(LastSessionRefreshListener.IGNORE_REMOTE_CACHE_UPDATE);
Boolean ignoreRemoteCacheUpdate = (Boolean) kcSession.getAttribute(CrossDCLastSessionRefreshListener.IGNORE_REMOTE_CACHE_UPDATE);
if (ignoreRemoteCacheUpdate != null && ignoreRemoteCacheUpdate) {
return SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED;
}

View file

@ -36,9 +36,9 @@ import org.keycloak.models.utils.KeycloakModelUtils;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class LastSessionRefreshListener implements ClusterListener {
public class CrossDCLastSessionRefreshListener implements ClusterListener {
public static final Logger logger = Logger.getLogger(LastSessionRefreshListener.class);
public static final Logger logger = Logger.getLogger(CrossDCLastSessionRefreshListener.class);
public static final String IGNORE_REMOTE_CACHE_UPDATE = "IGNORE_REMOTE_CACHE_UPDATE";
@ -48,7 +48,7 @@ public class LastSessionRefreshListener implements ClusterListener {
private final Cache<String, SessionEntityWrapper<UserSessionEntity>> cache;
private final TopologyInfo topologyInfo;
public LastSessionRefreshListener(KeycloakSession session, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, boolean offline) {
public CrossDCLastSessionRefreshListener(KeycloakSession session, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, boolean offline) {
this.sessionFactory = session.getKeycloakSessionFactory();
this.cache = cache;
this.offline = offline;

View file

@ -0,0 +1,58 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.changes.sessions;
import java.util.Map;
import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.models.KeycloakSession;
/**
* Cross-DC based CrossDCLastSessionRefreshStore
*
* Tracks the queue of lastSessionRefreshes, which were updated on this host. Those will be sent to the second DC in bulk, so second DC can update
* lastSessionRefreshes on it's side. Message is sent either periodically or if there are lots of stored lastSessionRefreshes.
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class CrossDCLastSessionRefreshStore extends AbstractLastSessionRefreshStore {
protected static final Logger logger = Logger.getLogger(CrossDCLastSessionRefreshStore.class);
private final String eventKey;
protected CrossDCLastSessionRefreshStore(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) {
super(maxIntervalBetweenMessagesSeconds, maxCount);
this.eventKey = eventKey;
}
protected void sendMessage(KeycloakSession kcSession, Map<String, SessionData> refreshesToSend) {
LastSessionRefreshEvent event = new LastSessionRefreshEvent(refreshesToSend);
if (logger.isDebugEnabled()) {
logger.debugf("Sending lastSessionRefreshes for key '%s'. Refreshes: %s", eventKey, event.getLastSessionRefreshes().toString());
}
// Don't notify local DC about the lastSessionRefreshes. They were processed here already
ClusterProvider cluster = kcSession.getProvider(ClusterProvider.class);
cluster.notify(eventKey, event, true, ClusterProvider.DCNotify.ALL_BUT_LOCAL_DC);
}
}

View file

@ -0,0 +1,63 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.changes.sessions;
import org.infinispan.Cache;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class CrossDCLastSessionRefreshStoreFactory extends AbstractLastSessionRefreshStoreFactory {
// Name of periodic tasks to send events to the other DCs
public static final String LSR_PERIODIC_TASK_NAME = "lastSessionRefreshes";
public static final String LSR_OFFLINE_PERIODIC_TASK_NAME = "lastSessionRefreshes-offline";
public CrossDCLastSessionRefreshStore createAndInit(KeycloakSession kcSession, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, boolean offline) {
return createAndInit(kcSession, cache, DEFAULT_TIMER_INTERVAL_MS, DEFAULT_MAX_INTERVAL_BETWEEN_MESSAGES_SECONDS, DEFAULT_MAX_COUNT, offline);
}
public CrossDCLastSessionRefreshStore createAndInit(KeycloakSession kcSession, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache,
long timerIntervalMs, int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) {
String eventKey = offline ? LSR_OFFLINE_PERIODIC_TASK_NAME : LSR_PERIODIC_TASK_NAME;
CrossDCLastSessionRefreshStore store = createStoreInstance(maxIntervalBetweenMessagesSeconds, maxCount, eventKey);
// Register listener
ClusterProvider cluster = kcSession.getProvider(ClusterProvider.class);
cluster.registerListener(eventKey, new CrossDCLastSessionRefreshListener(kcSession, cache, offline));
// Setup periodic timer check
setupPeriodicTimer(kcSession, store, timerIntervalMs, eventKey);
return store;
}
protected CrossDCLastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) {
return new CrossDCLastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, eventKey);
}
}

View file

@ -0,0 +1,77 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.changes.sessions;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Time;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.models.utils.SessionTimeoutHelper;
/**
* The store is supposed to do periodic bulk update of lastSessionRefresh times of all userSessions, which were refreshed during some period
* of time. The updates are sent to UserSessionPersisterProvider (DB)
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class PersisterLastSessionRefreshStore extends AbstractLastSessionRefreshStore {
protected static final Logger logger = Logger.getLogger(PersisterLastSessionRefreshStore.class);
private final boolean offline;
protected PersisterLastSessionRefreshStore(int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) {
super(maxIntervalBetweenMessagesSeconds, maxCount);
this.offline = offline;
}
protected void sendMessage(KeycloakSession kcSession, Map<String, SessionData> refreshesToSend) {
Map<String, Set<String>> sessionIdsByRealm =
refreshesToSend.entrySet().stream().collect(
Collectors.groupingBy(entry -> entry.getValue().getRealmId(),
Collectors.mapping(Map.Entry::getKey, Collectors.toSet())));
// Update DB with a bit lower value than current time to ensure 'revokeRefreshToken' will work correctly taking server
int lastSessionRefresh = Time.currentTime() - SessionTimeoutHelper.PERIODIC_TASK_INTERVAL_SECONDS;
if (logger.isDebugEnabled()) {
logger.debugf("Updating %d userSessions with lastSessionRefresh: %d", refreshesToSend.size(), lastSessionRefresh);
}
UserSessionPersisterProvider persister = kcSession.getProvider(UserSessionPersisterProvider.class);
for (Map.Entry<String, Set<String>> entry : sessionIdsByRealm.entrySet()) {
RealmModel realm = kcSession.realms().getRealm(entry.getKey());
// Case when realm was deleted in the meantime. UserSessions were already deleted as well (callback for realm deletion)
if (realm == null) {
continue;
}
Set<String> userSessionIds = entry.getValue();
persister.updateLastSessionRefreshes(realm, lastSessionRefresh, userSessionIds, offline);
}
}
}

View file

@ -0,0 +1,49 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.changes.sessions;
import org.infinispan.Cache;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class PersisterLastSessionRefreshStoreFactory extends AbstractLastSessionRefreshStoreFactory {
public PersisterLastSessionRefreshStore createAndInit(KeycloakSession kcSession, boolean offline) {
return createAndInit(kcSession, DEFAULT_TIMER_INTERVAL_MS, DEFAULT_MAX_INTERVAL_BETWEEN_MESSAGES_SECONDS, DEFAULT_MAX_COUNT, offline);
}
private PersisterLastSessionRefreshStore createAndInit(KeycloakSession kcSession,
long timerIntervalMs, int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) {
PersisterLastSessionRefreshStore store = createStoreInstance(maxIntervalBetweenMessagesSeconds, maxCount, offline);
// Setup periodic timer check
setupPeriodicTimer(kcSession, store, timerIntervalMs, "db-last-session-refresh");
return store;
}
protected PersisterLastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, boolean offline) {
return new PersisterLastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, offline);
}
}

View file

@ -84,7 +84,6 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
});
state = new InitializerState(ctx[0].getSegmentsCount());
saveStateToCache(state);
} else {
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
@ -102,7 +101,7 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
}
protected void startLoadingImpl(InitializerState state, SessionLoader.LoaderContext ctx) {
protected void startLoadingImpl(InitializerState state, SessionLoader.LoaderContext loaderCtx) {
// Assume each worker has same processor's count
int processors = Runtime.getRuntime().availableProcessors();
@ -114,6 +113,8 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
int errors = 0;
try {
List<SessionLoader.WorkerResult> previousResults = new LinkedList<>();
while (!state.isFinished()) {
int nodesCount = transport==null ? 1 : transport.getMembers().size();
int distributedWorkersCount = processors * nodesCount;
@ -126,34 +127,43 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
log.trace("unfinished segments for this iteration: " + segments);
}
List<Future<WorkerResult>> futures = new LinkedList<>();
List<Future<SessionLoader.WorkerResult>> futures = new LinkedList<>();
int workerId = 0;
for (Integer segment : segments) {
SessionLoader.WorkerContext workerCtx = sessionLoader.computeWorkerContext(loaderCtx, segment, workerId, previousResults);
SessionInitializerWorker worker = new SessionInitializerWorker();
worker.setWorkerEnvironment(segment, ctx, sessionLoader);
worker.setWorkerEnvironment(loaderCtx, workerCtx, sessionLoader);
if (!distributed) {
worker.setEnvironment(workCache, null);
}
Future<WorkerResult> future = executorService.submit(worker);
Future<SessionLoader.WorkerResult> future = executorService.submit(worker);
futures.add(future);
workerId++;
}
for (Future<WorkerResult> future : futures) {
boolean anyFailure = false;
for (Future<SessionLoader.WorkerResult> future : futures) {
try {
WorkerResult result = future.get();
SessionLoader.WorkerResult result = future.get();
previousResults.add(result);
if (result.getSuccess()) {
int computedSegment = result.getSegment();
state.markSegmentFinished(computedSegment);
} else {
if (!result.isSuccess()) {
if (log.isTraceEnabled()) {
log.tracef("Segment %d failed to compute", result.getSegment());
}
anyFailure = true;
}
} catch (InterruptedException ie) {
anyFailure = true;
errors++;
log.error("Interruped exception when computed future. Errors: " + errors, ie);
} catch (ExecutionException ee) {
anyFailure = true;
errors++;
log.error("ExecutionException when computed future. Errors: " + errors, ee);
}
@ -163,11 +173,19 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
throw new RuntimeException("Maximum count of worker errors occured. Limit was " + maxErrors + ". See server.log for details");
}
saveStateToCache(state);
// Save just if no error happened. Otherwise re-compute
if (!anyFailure) {
for (SessionLoader.WorkerResult result : previousResults) {
state.markSegmentFinished(result.getSegment());
}
log.debugf("New initializer state pushed. The state is: %s", state);
log.debugf("New initializer state is: %s", state);
}
}
// Push the state after computation is finished
saveStateToCache(state);
// Loader callback after the task is finished
this.sessionLoader.afterAllSessionsLoaded(this);
@ -179,33 +197,4 @@ public class InfinispanCacheInitializer extends BaseCacheInitializer {
}
}
public static class WorkerResult implements Serializable {
private Integer segment;
private Boolean success;
public static WorkerResult create (Integer segment, boolean success) {
WorkerResult res = new WorkerResult();
res.setSegment(segment);
res.setSuccess(success);
return res;
}
public Integer getSegment() {
return segment;
}
public void setSegment(Integer segment) {
this.segment = segment;
}
public Boolean getSuccess() {
return success;
}
public void setSuccess(Boolean success) {
this.success = success;
}
}
}

View file

@ -17,26 +17,27 @@
package org.keycloak.models.sessions.infinispan.initializer;
import java.io.Serializable;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class OfflinePersistentUserSessionLoaderContext implements SessionLoader.LoaderContext, Serializable {
public class OfflinePersistentLoaderContext extends SessionLoader.LoaderContext {
private final int sessionsTotal;
private final int segmentsCount;
private final int sessionsPerSegment;
public OfflinePersistentUserSessionLoaderContext(int sessionsTotal, int sessionsPerSegment) {
public OfflinePersistentLoaderContext(int sessionsTotal, int sessionsPerSegment) {
super(computeSegmentsCount(sessionsTotal, sessionsPerSegment));
this.sessionsTotal = sessionsTotal;
this.sessionsPerSegment = sessionsPerSegment;
}
private static int computeSegmentsCount(int sessionsTotal, int sessionsPerSegment) {
int segmentsCount = sessionsTotal / sessionsPerSegment;
if (sessionsTotal % sessionsPerSegment >= 1) {
segmentsCount = segmentsCount + 1;
}
this.segmentsCount = segmentsCount;
return segmentsCount;
}
@ -44,11 +45,6 @@ public class OfflinePersistentUserSessionLoaderContext implements SessionLoader.
return sessionsTotal;
}
@Override
public int getSegmentsCount() {
return segmentsCount;
}
public int getSessionsPerSegment() {
return sessionsPerSegment;
}
@ -56,10 +52,10 @@ public class OfflinePersistentUserSessionLoaderContext implements SessionLoader.
@Override
public String toString() {
return new StringBuilder("OfflinePersistentUserSessionLoaderContext [ ")
return new StringBuilder("OfflinePersistentLoaderContext [ ")
.append(" sessionsTotal: ").append(sessionsTotal)
.append(", sessionsPerSegment: ").append(sessionsPerSegment)
.append(", segmentsCount: ").append(segmentsCount)
.append(", segmentsCount: ").append(getSegmentsCount())
.append(" ]")
.toString();
}

View file

@ -21,8 +21,8 @@ import org.infinispan.Cache;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.context.Flag;
import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.session.UserSessionPersisterProvider;
@ -33,7 +33,11 @@ import java.util.List;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class OfflinePersistentUserSessionLoader implements SessionLoader<OfflinePersistentUserSessionLoaderContext>, Serializable {
public class OfflinePersistentUserSessionLoader implements SessionLoader<OfflinePersistentLoaderContext,
OfflinePersistentWorkerContext, OfflinePersistentWorkerResult>, Serializable {
// Placeholder String used in the searching conditions to identify very first session
private static final String FIRST_SESSION_ID = "000";
private static final Logger log = Logger.getLogger(OfflinePersistentUserSessionLoader.class);
@ -53,46 +57,67 @@ public class OfflinePersistentUserSessionLoader implements SessionLoader<Offline
@Override
public void init(KeycloakSession session) {
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
// TODO: check if update of timestamps in persister can be skipped entirely
int clusterStartupTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
log.debugf("Clearing detached sessions from persistent storage and updating timestamps to %d", clusterStartupTime);
persister.clearDetachedUserSessions();
persister.updateAllTimestamps(clusterStartupTime);
}
@Override
public OfflinePersistentUserSessionLoaderContext computeLoaderContext(KeycloakSession session) {
public OfflinePersistentLoaderContext computeLoaderContext(KeycloakSession session) {
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
int sessionsCount = persister.getUserSessionsCount(true);
return new OfflinePersistentUserSessionLoaderContext(sessionsCount, sessionsPerSegment);
return new OfflinePersistentLoaderContext(sessionsCount, sessionsPerSegment);
}
@Override
public boolean loadSessions(KeycloakSession session, OfflinePersistentUserSessionLoaderContext ctx, int segment) {
int first = ctx.getSessionsPerSegment() * segment;
int max = sessionsPerSegment;
if (log.isTraceEnabled()) {
log.tracef("Loading sessions - first: %d, max: %d", first, max);
public OfflinePersistentWorkerContext computeWorkerContext(OfflinePersistentLoaderContext loaderCtx, int segment, int workerId, List<OfflinePersistentWorkerResult> previousResults) {
int lastCreatedOn;
String lastSessionId;
if (previousResults.isEmpty()) {
lastCreatedOn = 0;
lastSessionId = FIRST_SESSION_ID;
} else {
OfflinePersistentWorkerResult lastResult = previousResults.get(previousResults.size() - 1);
lastCreatedOn = lastResult.getLastCreatedOn();
lastSessionId = lastResult.getLastSessionId();
}
// We know the last loaded session. New workers iteration will start from this place
return new OfflinePersistentWorkerContext(segment, workerId, lastCreatedOn, lastSessionId);
}
@Override
public OfflinePersistentWorkerResult createFailedWorkerResult(OfflinePersistentLoaderContext loaderContext, OfflinePersistentWorkerContext workerContext) {
return new OfflinePersistentWorkerResult(false, workerContext.getSegment(), workerContext.getWorkerId(), -1, FIRST_SESSION_ID);
}
@Override
public OfflinePersistentWorkerResult loadSessions(KeycloakSession session, OfflinePersistentLoaderContext loaderContext, OfflinePersistentWorkerContext ctx) {
int first = ctx.getWorkerId() * sessionsPerSegment;
log.tracef("Loading sessions for segment: %d", ctx.getSegment());
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
List<UserSessionModel> sessions = persister.loadUserSessions(first, max, true);
List<UserSessionModel> sessions = persister.loadUserSessions(first, sessionsPerSegment, true, ctx.getLastCreatedOn(), ctx.getLastSessionId());
for (UserSessionModel persistentSession : sessions) {
log.tracef("Sessions loaded from DB - segment: %d", ctx.getSegment());
UserSessionModel lastSession = null;
if (!sessions.isEmpty()) {
lastSession = sessions.get(sessions.size() - 1);
// Save to memory/infinispan
UserSessionModel offlineUserSession = session.sessions().importUserSession(persistentSession, true, true);
session.sessions().importUserSessions(sessions, true);
}
return true;
int lastCreatedOn = lastSession==null ? Time.currentTime() + 100000 : lastSession.getStarted();
String lastSessionId = lastSession==null ? FIRST_SESSION_ID : lastSession.getId();
log.tracef("Sessions imported to infinispan - segment: %d, lastCreatedOn: %d, lastSessionId: %s", ctx.getSegment(), lastCreatedOn, lastSessionId);
return new OfflinePersistentWorkerResult(true, ctx.getSegment(), ctx.getWorkerId(), lastCreatedOn, lastSessionId);
}

View file

@ -0,0 +1,42 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.initializer;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class OfflinePersistentWorkerContext extends SessionLoader.WorkerContext {
private final int lastCreatedOn;
private final String lastSessionId;
public OfflinePersistentWorkerContext(int segment, int workerId, int lastCreatedOn, String lastSessionId) {
super(segment, workerId);
this.lastCreatedOn = lastCreatedOn;
this.lastSessionId = lastSessionId;
}
public int getLastCreatedOn() {
return lastCreatedOn;
}
public String getLastSessionId() {
return lastSessionId;
}
}

View file

@ -0,0 +1,44 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.initializer;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class OfflinePersistentWorkerResult extends SessionLoader.WorkerResult {
private final int lastCreatedOn;
private final String lastSessionId;
public OfflinePersistentWorkerResult(boolean success, int segment, int workerId, int lastCreatedOn, String lastSessionId) {
super(success, segment, workerId);
this.lastCreatedOn = lastCreatedOn;
this.lastSessionId = lastSessionId;
}
public int getLastCreatedOn() {
return lastCreatedOn;
}
public String getLastSessionId() {
return lastSessionId;
}
}

View file

@ -31,19 +31,20 @@ import java.util.Set;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class SessionInitializerWorker implements DistributedCallable<String, Serializable, InfinispanCacheInitializer.WorkerResult>, Serializable {
public class SessionInitializerWorker implements DistributedCallable<String, Serializable, SessionLoader.WorkerResult>, Serializable {
private static final Logger log = Logger.getLogger(SessionInitializerWorker.class);
private int segment;
private SessionLoader.LoaderContext ctx;
private SessionLoader.LoaderContext loaderCtx;
private SessionLoader.WorkerContext workerCtx;
private SessionLoader sessionLoader;
private transient Cache<String, Serializable> workCache;
public void setWorkerEnvironment(int segment, SessionLoader.LoaderContext ctx, SessionLoader sessionLoader) {
this.segment = segment;
this.ctx = ctx;
public void setWorkerEnvironment(SessionLoader.LoaderContext loaderCtx, SessionLoader.WorkerContext workerCtx, SessionLoader sessionLoader) {
this.loaderCtx = loaderCtx;
this.workerCtx = workerCtx;
this.sessionLoader = sessionLoader;
}
@ -53,27 +54,28 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ser
}
@Override
public InfinispanCacheInitializer.WorkerResult call() throws Exception {
public SessionLoader.WorkerResult call() throws Exception {
if (log.isTraceEnabled()) {
log.tracef("Running computation for segment: %d", segment);
log.tracef("Running computation for segment: %s", workerCtx.toString());
}
KeycloakSessionFactory sessionFactory = workCache.getAdvancedCache().getComponentRegistry().getComponent(KeycloakSessionFactory.class);
if (sessionFactory == null) {
log.debugf("KeycloakSessionFactory not yet set in cache. Worker skipped");
return InfinispanCacheInitializer.WorkerResult.create(segment, false);
return sessionLoader.createFailedWorkerResult(loaderCtx, workerCtx);
}
SessionLoader.WorkerResult[] ref = new SessionLoader.WorkerResult[1];
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
sessionLoader.loadSessions(session, ctx, segment);
ref[0] = sessionLoader.loadSessions(session, loaderCtx, workerCtx);
}
});
return InfinispanCacheInitializer.WorkerResult.create(segment, true);
return ref[0];
}
}

View file

@ -18,13 +18,16 @@
package org.keycloak.models.sessions.infinispan.initializer;
import java.io.Serializable;
import java.util.List;
import org.keycloak.models.KeycloakSession;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContext> extends Serializable {
public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContext,
WORKER_CONTEXT extends SessionLoader.WorkerContext,
WORKER_RESULT extends SessionLoader.WorkerResult> extends Serializable {
/**
* Will be triggered just once on cluster coordinator node to perform some generic initialization tasks (Eg. update DB before starting load).
@ -38,7 +41,7 @@ public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContex
/**
*
* Will be triggered just once on cluster coordinator node to count the number of segments and other context data specific to the worker task.
* Will be triggered just once on cluster coordinator node to count the number of segments and other context data specific to whole computation.
* Each segment will be then later computed in one "worker" task
*
* This method could be expensive to call, so the "computed" loaderContext object is passed among workers/loaders and needs to be serializable
@ -49,15 +52,37 @@ public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContex
LOADER_CONTEXT computeLoaderContext(KeycloakSession session);
/**
* Compute the worker context for current iteration
*
* @param loaderCtx global loader context
* @param segment the current segment (page) to compute
* @param workerId ID of worker for current worker iteration. Usually the number 0-8 (with single cluster node)
* @param previousResults workerResults from previous computation. Can be empty list in case of the operation is triggered for the 1st time
* @return
*/
WORKER_CONTEXT computeWorkerContext(LOADER_CONTEXT loaderCtx, int segment, int workerId, List<WORKER_RESULT> previousResults);
/**
* Will be called on all cluster nodes to load the specified page.
*
* @param session
* @param loaderContext loaderContext object, which was already computed before
* @param segment to be computed
* @param loaderContext global loaderContext object, which was already computed before
* @param workerContext for current iteration
* @return
*/
boolean loadSessions(KeycloakSession session, LOADER_CONTEXT loaderContext, int segment);
WORKER_RESULT loadSessions(KeycloakSession session, LOADER_CONTEXT loaderContext, WORKER_CONTEXT workerContext);
/**
* Called when it's not possible to compute current iteration and load session for some reason (EG. infinispan not yet fully initialized)
*
* @param loaderContext
* @param workerContext
* @return
*/
WORKER_RESULT createFailedWorkerResult(LOADER_CONTEXT loaderContext, WORKER_CONTEXT workerContext);
/**
@ -81,9 +106,78 @@ public interface SessionLoader<LOADER_CONTEXT extends SessionLoader.LoaderContex
* Object, which contains some context data to be used by SessionLoader implementation. It's computed just once and then passed
* to each {@link SessionLoader}. It needs to be {@link Serializable}
*/
interface LoaderContext extends Serializable {
class LoaderContext implements Serializable {
int getSegmentsCount();
private final int segmentsCount;
public LoaderContext(int segmentsCount) {
this.segmentsCount = segmentsCount;
}
public int getSegmentsCount() {
return segmentsCount;
}
}
/**
* Object, which is computed before each worker iteration and contains some data to be used by the corresponding worker iteration.
* For example info about which segment/page should be loaded by current worker.
*/
class WorkerContext implements Serializable {
private final int segment;
private final int workerId;
public WorkerContext(int segment, int workerId) {
this.segment = segment;
this.workerId = workerId;
}
public int getSegment() {
return this.segment;
}
public int getWorkerId() {
return this.workerId;
}
}
/**
* Result of single worker iteration
*/
class WorkerResult implements Serializable {
private final boolean success;
private final int segment;
private final int workerId;
public WorkerResult(boolean success, int segment, int workerId) {
this.success = success;
this.segment = segment;
this.workerId = workerId;
}
public boolean isSuccess() {
return success;
}
public int getSegment() {
return segment;
}
public int getWorkerId() {
return workerId;
}
}
}

View file

@ -19,6 +19,7 @@ package org.keycloak.models.sessions.infinispan.remotestore;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -42,7 +43,7 @@ import static org.infinispan.client.hotrod.impl.Util.await;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessionsLoaderContext>, Serializable {
public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessionsLoaderContext, SessionLoader.WorkerContext, SessionLoader.WorkerResult>, Serializable {
private static final Logger log = Logger.getLogger(RemoteCacheSessionsLoader.class);
@ -92,26 +93,38 @@ public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessi
@Override
public boolean loadSessions(KeycloakSession session, RemoteCacheSessionsLoaderContext context, int segment) {
public WorkerContext computeWorkerContext(RemoteCacheSessionsLoaderContext loaderCtx, int segment, int workerId, List<WorkerResult> previousResults) {
return new WorkerContext(segment, workerId);
}
@Override
public WorkerResult createFailedWorkerResult(RemoteCacheSessionsLoaderContext loaderContext, WorkerContext workerContext) {
return new WorkerResult(false, workerContext.getSegment(), workerContext.getWorkerId());
}
@Override
public WorkerResult loadSessions(KeycloakSession session, RemoteCacheSessionsLoaderContext loaderContext, WorkerContext ctx) {
Cache cache = getCache(session);
Cache decoratedCache = cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE, Flag.IGNORE_RETURN_VALUES);
RemoteCache remoteCache = getRemoteCache(session);
Set<Integer> myIspnSegments = getMyIspnSegments(segment, context);
Set<Integer> myIspnSegments = getMyIspnSegments(ctx.getSegment(), loaderContext);
log.debugf("Will do bulk load of sessions from remote cache '%s' . Segment: %d", cache.getName(), segment);
log.debugf("Will do bulk load of sessions from remote cache '%s' . Segment: %d", cache.getName(), ctx.getSegment());
CloseableIterator<Map.Entry> iterator = null;
int countLoaded = 0;
try {
iterator = remoteCache.retrieveEntries(null, myIspnSegments, context.getSessionsPerSegment());
iterator = remoteCache.retrieveEntries(null, myIspnSegments, loaderContext.getSessionsPerSegment());
while (iterator.hasNext()) {
countLoaded++;
Map.Entry entry = iterator.next();
decoratedCache.putAsync(entry.getKey(), entry.getValue());
}
} catch (RuntimeException e) {
log.warnf(e, "Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), segment);
log.warnf(e, "Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), ctx.getSegment());
throw e;
} finally {
if (iterator != null) {
@ -119,9 +132,9 @@ public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessi
}
}
log.debugf("Successfully finished loading sessions from cache '%s' . Segment: %d, Count of sessions loaded: %d", cache.getName(), segment, countLoaded);
log.debugf("Successfully finished loading sessions from cache '%s' . Segment: %d, Count of sessions loaded: %d", cache.getName(), ctx.getSegment(), countLoaded);
return true;
return new WorkerResult(true, ctx.getSegment(), ctx.getWorkerId());
}

View file

@ -17,38 +17,34 @@
package org.keycloak.models.sessions.infinispan.remotestore;
import java.io.Serializable;
import org.keycloak.models.sessions.infinispan.initializer.SessionLoader;
/**
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class RemoteCacheSessionsLoaderContext implements SessionLoader.LoaderContext, Serializable {
public class RemoteCacheSessionsLoaderContext extends SessionLoader.LoaderContext {
// Count of hash segments for remote infinispan cache. It's by default 256 for distributed/replicated caches
private final int ispnSegmentsCount;
// Count of segments (worker iterations for distributedExecutionService executions on KC side). Each segment will be 1 worker iteration.
// Count of segments could be lower than "ispnSegmentsCount" and depends on the size of the cache. For example if we have cache with just 500 items,
// we don't need 256 segments and send 256 requests to remoteCache to preload thing. Instead, we will have lower number of segments (EG. 8)
// and we will map more ispnSegments into 1 worker segment (In this case 256 / 8 = 32. So 32 ISPN segments mapped to each worker segment)
private final int segmentsCount;
private final int sessionsPerSegment;
private final int sessionsTotal;
public RemoteCacheSessionsLoaderContext(int ispnSegmentsCount, int sessionsPerSegment, int sessionsTotal) {
super(computeSegmentsCount(sessionsTotal, sessionsPerSegment, ispnSegmentsCount));
this.ispnSegmentsCount = ispnSegmentsCount;
this.sessionsPerSegment = sessionsPerSegment;
this.sessionsTotal = sessionsTotal;
this.segmentsCount = computeSegmentsCount(sessionsTotal, sessionsPerSegment, ispnSegmentsCount);
}
private int computeSegmentsCount(int sessionsTotal, int sessionsPerSegment, int ispnSegments) {
// Count of segments (worker iterations for distributedExecutionService executions on KC side). Each segment will be 1 worker iteration.
// Count of segments could be lower than "ispnSegmentsCount" and depends on the size of the cache. For example if we have cache with just 500 items,
// we don't need 256 segments and send 256 requests to remoteCache to preload thing. Instead, we will have lower number of segments (EG. 8)
// and we will map more ispnSegments into 1 worker segment (In this case 256 / 8 = 32. So 32 ISPN segments mapped to each worker segment)
private static int computeSegmentsCount(int sessionsTotal, int sessionsPerSegment, int ispnSegments) {
// No support by remote ISPN cache for segments. This can happen if remoteCache is local (non-clustered)
if (ispnSegments < 0) {
return 1;
@ -68,11 +64,6 @@ public class RemoteCacheSessionsLoaderContext implements SessionLoader.LoaderCon
}
@Override
public int getSegmentsCount() {
return segmentsCount;
}
public int getIspnSegmentsCount() {
return ispnSegmentsCount;
}
@ -89,7 +80,7 @@ public class RemoteCacheSessionsLoaderContext implements SessionLoader.LoaderCon
@Override
public String toString() {
return new StringBuilder("RemoteCacheSessionsLoaderContext [ ")
.append("segmentsCount: ").append(segmentsCount)
.append("segmentsCount: ").append(getSegmentsCount())
.append(", ispnSegmentsCount: ").append(ispnSegmentsCount)
.append(", sessionsPerSegment: ").append(sessionsPerSegment)
.append(", sessionsTotal: ").append(sessionsTotal)

View file

@ -31,6 +31,7 @@ import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.models.sessions.infinispan.initializer.SessionLoader;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoader;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoaderContext;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
@ -110,7 +111,7 @@ public class RemoteCacheSessionsLoaderTest {
Set<String> visitedKeys = new HashSet<>();
for (int currentSegment=0 ; currentSegment<ctx.getSegmentsCount() ; currentSegment++) {
logger.infof("Loading segment %d", currentSegment);
loader.loadSessions(null, ctx, currentSegment);
loader.loadSessions(null, ctx, new SessionLoader.WorkerContext(currentSegment, currentSegment));
logger.infof("Loaded %d keys for segment %d", cache2.keySet().size(), currentSegment);
totalCount = totalCount + cache2.keySet().size();

View file

@ -11,6 +11,8 @@ import org.junit.Ignore;
import org.junit.Test;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -25,7 +27,12 @@ public class ConcurrencyLockingTest {
public void testLocking() throws Exception {
final DefaultCacheManager cacheManager = getVersionedCacheManager();
Cache<String, String> cache = cacheManager.getCache("COUNTER_CACHE");
cache.put("key", "init");
Map<String, String> map = new HashMap<>();
map.put("key1", "val1");
map.put("key2", "val2");
cache.putAll(map);
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(new Runnable() {
@Override

View file

@ -17,17 +17,18 @@
package org.keycloak.models.sessions.infinispan.initializer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.HashMap;
import java.util.Map;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.context.Flag;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
@ -37,7 +38,6 @@ import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import java.util.UUID;
@ -49,20 +49,36 @@ import java.util.UUID;
@Ignore
public class DistributedCacheConcurrentWritesTest {
private static final int ITERATION_PER_WORKER = 1000;
private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);
private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0);
private static final UUID CLIENT_1_UUID = UUID.randomUUID();
private static final int BATCHES_PER_WORKER = 1000;
private static final int ITEMS_IN_BATCH = 100;
public static void main(String[] args) throws Exception {
CacheWrapper<String, UserSessionEntity> cache1 = createCache("node1");
CacheWrapper<String, UserSessionEntity> cache2 = createCache("node2");
BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createCache("node1");
BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createCache("node2");
// NOTE: This setup requires infinispan servers to be up and running on localhost:12232 and localhost:13232
// BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createRemoteCache("node1");
// BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createRemoteCache("node2");
try {
testConcurrentPut(cache1, cache2);
} finally {
// Kill JVM
cache1.stop();
cache2.stop();
stopMgr(cache1);
stopMgr(cache2);
System.out.println("Managers killed");
}
}
private static SessionEntityWrapper<UserSessionEntity> createEntityInstance(String id) {
// Create initial item
UserSessionEntity session = new UserSessionEntity();
session.setId("123");
session.setId(id);
session.setRealmId("foo");
session.setBrokerSessionId("!23123123");
session.setBrokerUserId(null);
@ -76,177 +92,97 @@ public class DistributedCacheConcurrentWritesTest {
clientSession.setAuthMethod("saml");
clientSession.setAction("something");
clientSession.setTimestamp(1234);
session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId());
session.getAuthenticatedClientSessions().put("foo-client", clientSession.getId());
try {
for (int i = 0; i < 10; i++) {
testConcurrentReplaceWithRemove("key-" + i, session, cache1, cache2);
}
} finally {
// Kill JVM
cache1.getCache().stop();
cache2.getCache().stop();
cache1.getCache().getCacheManager().stop();
cache2.getCache().getCacheManager().stop();
System.out.println("Managers killed");
}
return new SessionEntityWrapper<>(session);
}
// Reproducer for KEYCLOAK-7443 and KEYCLOAK-7489. The infinite loop can happen if cache.replace(key, old, new) is called and entity was removed on one cluster node in the meantime
private static void testConcurrentReplaceWithRemove(String key, UserSessionEntity session, CacheWrapper<String, UserSessionEntity> cache1,
CacheWrapper<String, UserSessionEntity> cache2) throws InterruptedException {
cache1.put(key, session);
private static void testConcurrentPut(BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache1,
BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache2) throws InterruptedException {
// Create 2 workers for concurrent write and start them
Worker worker1 = new Worker(1, cache1, key);
Worker worker2 = new Worker(2, cache2, key);
// Create workers for concurrent write and start them
Worker worker1 = new Worker(1, cache1);
Worker worker2 = new Worker(2, cache2);
long start = System.currentTimeMillis();
System.out.println("Started clustering test for key " + key);
System.out.println("Started clustering test");
worker1.start();
//worker1.join();
worker2.start();
Thread.sleep(1000);
// Try to remove the entity after some sleep time.
cache1.wrappedCache.getAdvancedCache()
.withFlags(Flag.CACHE_MODE_LOCAL)
.remove(key);
worker1.join();
worker2.join();
long took = System.currentTimeMillis() - start;
System.out.println("Test finished for key '" + key + "'. Took: " + took + " ms");
System.out.println("Test finished. Took: " + took + " ms. Cache size: " + cache1.size());
// System.out.println("Took: " + took + " ms for key . Notes count: " + session.getNotes().size() + ", failedReplaceCounter: " + failedReplaceCounter.get()
// + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
// // JGroups statistics
// JChannel channel = (JChannel)((JGroupsTransport)cache1.wrappedCache.getAdvancedCache().getRpcManager().getTransport()).getChannel();
// System.out.println("Sent MB: " + channel.getSentBytes() / 1000000 + ", sent messages: " + channel.getSentMessages() + ", received MB: " + channel.getReceivedBytes() / 1000000 +
// ", received messages: " + channel.getReceivedMessages());
// JGroups statistics
printStats(cache1);
}
private static class Worker extends Thread {
private final CacheWrapper<String, UserSessionEntity> cache;
private final int threadId;
private final String key;
private final BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache;
private final int startIndex;
public Worker(int threadId, CacheWrapper<String, UserSessionEntity> cache, String key) {
this.threadId = threadId;
public Worker(int threadId, BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache) {
this.cache = cache;
this.key = key;
setName("th-" + key + "-" + threadId);
this.startIndex = (threadId - 1) * (ITEMS_IN_BATCH * BATCHES_PER_WORKER);
setName("th-" + threadId);
}
@Override
public void run() {
for (int i=0 ; i<ITERATION_PER_WORKER ; i++) {
for (int page = 0; page < BATCHES_PER_WORKER ; page++) {
int startPageIndex = startIndex + page * ITEMS_IN_BATCH;
String noteKey = "n-" + threadId + "-" + i;
putItemsClassic(startPageIndex);
//putItemsAll(startPageIndex);
// This code can be used to reproduce infinite loop ( KEYCLOAK-7443 )
// boolean replaced = false;
// while (!replaced) {
// SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get(key);
// oldWrapped.getEntity().getNotes().put(noteKey, "someVal");
// replaced = cacheReplace(oldWrapped, oldWrapped.getEntity());
// }
int count = 0;
boolean replaced = false;
while (!replaced && count < 25) {
count++;
SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get(key);
oldWrapped.getEntity().getNotes().put(noteKey, "someVal");
replaced = cacheReplace(oldWrapped, oldWrapped.getEntity());
}
if (!replaced) {
System.err.println("FAILED TO REPLACE ENTITY: " + key);
return;
}
System.out.println("Thread " + getName() + ": Saved items from " + startPageIndex + " to " + (startPageIndex + ITEMS_IN_BATCH - 1));
}
}
private boolean cacheReplace(SessionEntityWrapper<UserSessionEntity> oldSession, UserSessionEntity newSession) {
try {
boolean replaced = cache.replace(key, oldSession, newSession);
//cache.replace(key, newSession);
if (!replaced) {
failedReplaceCounter.incrementAndGet();
//return false;
//System.out.println("Replace failed!!!");
}
return replaced;
} catch (Exception re) {
failedReplaceCounter2.incrementAndGet();
return false;
// put items 1 by 1
private void putItemsClassic(int startPageIndex) {
for (int i = startPageIndex ; i < (startPageIndex + ITEMS_IN_BATCH) ; i++) {
String key = "key-" + startIndex + i;
SessionEntityWrapper<UserSessionEntity> session = createEntityInstance(key);
cache.put(key, session);
}
//return replaced;
}
}
// Session clone
// put all items together
private void putItemsAll(int startPageIndex) {
Map<String, SessionEntityWrapper<UserSessionEntity>> mapp = new HashMap<>();
private static UserSessionEntity cloneSession(UserSessionEntity session) {
UserSessionEntity clone = new UserSessionEntity();
clone.setId(session.getId());
clone.setRealmId(session.getRealmId());
clone.setNotes(new ConcurrentHashMap<>(session.getNotes()));
return clone;
for (int i = startPageIndex ; i < (startPageIndex + ITEMS_IN_BATCH) ; i++) {
String key = "key-" + startIndex + i;
SessionEntityWrapper<UserSessionEntity> session = createEntityInstance(key);
mapp.put(key, session);
}
cache.putAll(mapp);
}
}
// Cache creation utils
public static class CacheWrapper<K, V extends SessionEntity> {
private final Cache<K, SessionEntityWrapper<V>> wrappedCache;
public CacheWrapper(Cache<K, SessionEntityWrapper<V>> wrappedCache) {
this.wrappedCache = wrappedCache;
}
public SessionEntityWrapper<V> get(K key) {
SessionEntityWrapper<V> val = wrappedCache.get(key);
return val;
}
public void put(K key, V newVal) {
SessionEntityWrapper<V> newWrapper = new SessionEntityWrapper<>(newVal);
wrappedCache.put(key, newWrapper);
}
public boolean replace(K key, SessionEntityWrapper<V> oldVal, V newVal) {
SessionEntityWrapper<V> newWrapper = new SessionEntityWrapper<>(newVal);
return wrappedCache.replace(key, oldVal, newWrapper);
}
private Cache<K, SessionEntityWrapper<V>> getCache() {
return wrappedCache;
}
}
public static CacheWrapper<String, UserSessionEntity> createCache(String nodeName) {
public static BasicCache<String, SessionEntityWrapper<UserSessionEntity>> createCache(String nodeName) {
EmbeddedCacheManager mgr = createManager(nodeName);
Cache<String, SessionEntityWrapper<UserSessionEntity>> wrapped = mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
return new CacheWrapper<>(wrapped);
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
return cache;
}
@ -272,7 +208,7 @@ public class DistributedCacheConcurrentWritesTest {
ConfigurationBuilder distConfigBuilder = new ConfigurationBuilder();
if (clustered) {
distConfigBuilder.clustering().cacheMode(async ? CacheMode.DIST_ASYNC : CacheMode.DIST_SYNC);
distConfigBuilder.clustering().hash().numOwners(2);
distConfigBuilder.clustering().hash().numOwners(1);
// Disable L1 cache
distConfigBuilder.clustering().hash().l1().enabled(false);
@ -283,4 +219,42 @@ public class DistributedCacheConcurrentWritesTest {
return cacheManager;
}
public static BasicCache<String, SessionEntityWrapper<UserSessionEntity>> createRemoteCache(String nodeName) {
int port = ("node1".equals(nodeName)) ? 12232 : 13232;
org.infinispan.client.hotrod.configuration.ConfigurationBuilder builder = new org.infinispan.client.hotrod.configuration.ConfigurationBuilder();
org.infinispan.client.hotrod.configuration.Configuration cfg = builder
.addServer().host("localhost").port(port)
.version(ProtocolVersion.PROTOCOL_VERSION_26)
.build();
RemoteCacheManager mgr = new RemoteCacheManager(cfg);
return mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
}
// CLEANUP METHODS
private static void stopMgr(BasicCache cache) {
if (cache instanceof Cache) {
((Cache) cache).getCacheManager().stop();
} else {
((RemoteCache) cache).getRemoteCacheManager().stop();
}
}
private static void printStats(BasicCache cache) {
if (cache instanceof Cache) {
Cache cache1 = (Cache) cache;
JChannel channel = ((JGroupsTransport)cache1.getAdvancedCache().getRpcManager().getTransport()).getChannel();
System.out.println("Sent MB: " + channel.getSentBytes() / 1000000 + ", sent messages: " + channel.getSentMessages() + ", received MB: " + channel.getReceivedBytes() / 1000000 +
", received messages: " + channel.getReceivedMessages());
} else {
Map<String, String> stats = ((RemoteCache) cache).stats().getStatsMap();
System.out.println("Stats: " + stats);
}
}
}

View file

@ -19,7 +19,6 @@ package org.keycloak.models.sessions.infinispan.initializer;
import org.junit.Assert;
import org.junit.Test;
import org.keycloak.models.cache.infinispan.UserCacheSession;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoaderContext;
import org.keycloak.storage.CacheableStorageProviderModel;
@ -35,16 +34,16 @@ public class InitializerStateTest {
@Test
public void testOfflineLoaderContext() {
OfflinePersistentUserSessionLoaderContext ctx = new OfflinePersistentUserSessionLoaderContext(28, 5);
OfflinePersistentLoaderContext ctx = new OfflinePersistentLoaderContext(28, 5);
Assert.assertEquals(ctx.getSegmentsCount(), 6);
ctx = new OfflinePersistentUserSessionLoaderContext(19, 5);
ctx = new OfflinePersistentLoaderContext(19, 5);
Assert.assertEquals(ctx.getSegmentsCount(), 4);
ctx = new OfflinePersistentUserSessionLoaderContext(20, 5);
ctx = new OfflinePersistentLoaderContext(20, 5);
Assert.assertEquals(ctx.getSegmentsCount(), 4);
ctx = new OfflinePersistentUserSessionLoaderContext(21, 5);
ctx = new OfflinePersistentLoaderContext(21, 5);
Assert.assertEquals(ctx.getSegmentsCount(), 5);
}
@ -78,7 +77,7 @@ public class InitializerStateTest {
@Test
public void testComputationState() {
OfflinePersistentUserSessionLoaderContext ctx = new OfflinePersistentUserSessionLoaderContext(28, 5);
OfflinePersistentLoaderContext ctx = new OfflinePersistentLoaderContext(28, 5);
Assert.assertEquals(ctx.getSegmentsCount(), 6);
InitializerState state = new InitializerState(ctx.getSegmentsCount());

View file

@ -0,0 +1,54 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.connections.jpa.updater.liquibase.custom;
import liquibase.exception.CustomChangeException;
import liquibase.statement.core.UpdateStatement;
import liquibase.structure.core.Table;
import org.keycloak.common.util.Time;
/**
* Update CREATED_ON and LAST_SESSION_REFRESH columns to current startup time
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class JpaUpdate4_7_0_OfflineSessionsTimestamps extends CustomKeycloakTask {
@Override
protected void generateStatementsImpl() throws CustomChangeException {
String offlineUserSessionsTableName = database.correctObjectName("OFFLINE_USER_SESSION", Table.class);
try {
int currentTime = Time.currentTime();
UpdateStatement updateStatement = new UpdateStatement(null, null, offlineUserSessionsTableName)
.addNewColumnValue("LAST_SESSION_REFRESH", currentTime);
statements.add(updateStatement);
confirmationMessage.append("Updated column LAST_SESSION_REFRESH in OFFLINE_USER_SESSION table with time " + currentTime);
} catch (Exception e) {
throw new CustomChangeException(getTaskId() + ": Exception when updating data from previous version", e);
}
}
@Override
protected String getTaskId() {
return "Update 4.7.0.Final";
}
}

View file

@ -18,10 +18,10 @@
package org.keycloak.models.jpa.session;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Time;
import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.ModelException;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionModel;
@ -30,17 +30,23 @@ import org.keycloak.models.session.PersistentClientSessionModel;
import org.keycloak.models.session.PersistentUserSessionAdapter;
import org.keycloak.models.session.PersistentUserSessionModel;
import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.models.utils.SessionTimeoutHelper;
import org.keycloak.storage.StorageId;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import javax.persistence.TypedQuery;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@ -63,6 +69,7 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
PersistentUserSessionEntity entity = new PersistentUserSessionEntity();
entity.setUserSessionId(model.getUserSessionId());
entity.setCreatedOn(model.getStarted());
entity.setRealmId(adapter.getRealm().getId());
entity.setUserId(adapter.getUser().getId());
String offlineStr = offlineToString(offline);
@ -99,26 +106,6 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
em.flush();
}
@Override
public void updateUserSession(UserSessionModel userSession, boolean offline) {
PersistentUserSessionAdapter adapter;
if (userSession instanceof PersistentUserSessionAdapter) {
adapter = (PersistentUserSessionAdapter) userSession;
} else {
adapter = new PersistentUserSessionAdapter(userSession);
}
PersistentUserSessionModel model = adapter.getUpdatedModel();
String offlineStr = offlineToString(offline);
PersistentUserSessionEntity entity = em.find(PersistentUserSessionEntity.class, new PersistentUserSessionEntity.Key(userSession.getId(), offlineStr));
if (entity == null) {
throw new ModelException("UserSession with ID " + userSession.getId() + ", offline: " + offline + " not found");
}
entity.setLastSessionRefresh(model.getLastSessionRefresh());
entity.setData(model.getData());
}
@Override
public void removeUserSession(String userSessionId, boolean offline) {
String offlineStr = offlineToString(offline);
@ -200,7 +187,6 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
.setParameter("externalClientId", clientStorageId.getExternalId())
.executeUpdate();
}
num = em.createNamedQuery("deleteDetachedUserSessions").executeUpdate();
}
@Override
@ -213,24 +199,53 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
num = em.createNamedQuery("deleteUserSessionsByUser").setParameter("userId", userId).executeUpdate();
}
@Override
public void clearDetachedUserSessions() {
int num = em.createNamedQuery("deleteDetachedClientSessions").executeUpdate();
num = em.createNamedQuery("deleteDetachedUserSessions").executeUpdate();
public void updateLastSessionRefreshes(RealmModel realm, int lastSessionRefresh, Collection<String> userSessionIds, boolean offline) {
String offlineStr = offlineToString(offline);
int us = em.createNamedQuery("updateUserSessionLastSessionRefresh")
.setParameter("lastSessionRefresh", lastSessionRefresh)
.setParameter("realmId", realm.getId())
.setParameter("offline", offlineStr)
.setParameter("userSessionIds", userSessionIds)
.executeUpdate();
logger.debugf("Updated lastSessionRefresh of %d user sessions in realm '%s'", us, realm.getName());
}
@Override
public void updateAllTimestamps(int time) {
int num = em.createNamedQuery("updateClientSessionsTimestamps").setParameter("timestamp", time).executeUpdate();
num = em.createNamedQuery("updateUserSessionsTimestamps").setParameter("lastSessionRefresh", time).executeUpdate();
public void removeExpired(RealmModel realm) {
int expiredOffline = Time.currentTime() - realm.getOfflineSessionIdleTimeout() - SessionTimeoutHelper.PERIODIC_CLEANER_IDLE_TIMEOUT_WINDOW_SECONDS;
String offlineStr = offlineToString(true);
logger.tracef("Trigger removing expired user sessions for realm '%s'", realm.getName());
int cs = em.createNamedQuery("deleteExpiredClientSessions")
.setParameter("realmId", realm.getId())
.setParameter("lastSessionRefresh", expiredOffline)
.setParameter("offline", offlineStr)
.executeUpdate();
int us = em.createNamedQuery("deleteExpiredUserSessions")
.setParameter("realmId", realm.getId())
.setParameter("lastSessionRefresh", expiredOffline)
.setParameter("offline", offlineStr)
.executeUpdate();
logger.debugf("Removed %d expired user sessions and %d expired client sessions in realm '%s'", us, cs, realm.getName());
}
@Override
public List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline) {
public List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline, int lastCreatedOn, String lastUserSessionId) {
String offlineStr = offlineToString(offline);
TypedQuery<PersistentUserSessionEntity> query = em.createNamedQuery("findUserSessions", PersistentUserSessionEntity.class);
query.setParameter("offline", offlineStr);
query.setParameter("lastCreatedOn", lastCreatedOn);
query.setParameter("lastSessionId", lastUserSessionId);
if (firstResult != -1) {
query.setFirstResult(firstResult);
@ -239,26 +254,14 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
query.setMaxResults(maxResults);
}
List<PersistentUserSessionEntity> results = query.getResultList();
List<UserSessionModel> result = new ArrayList<>();
List<String> userSessionIds = new ArrayList<>();
for (PersistentUserSessionEntity entity : results) {
RealmModel realm = session.realms().getRealm(entity.getRealmId());
try {
UserModel user = session.users().getUserById(entity.getUserId(), realm);
// Case when user was deleted in the meantime
if (user == null) {
onUserRemoved(realm, entity.getUserId());
return loadUserSessions(firstResult, maxResults, offline);
}
} catch (Exception e) {
logger.debugv(e,"Failed to load user with id {0}", entity.getUserId());
}
List<PersistentUserSessionAdapter> result = query.getResultStream()
.map(this::toAdapter)
.collect(Collectors.toList());
Map<String, PersistentUserSessionAdapter> sessionsById = result.stream()
.collect(Collectors.toMap(UserSessionModel::getId, Function.identity()));
result.add(toAdapter(realm, entity));
userSessionIds.add(entity.getUserSessionId());
}
Set<String> userSessionIds = sessionsById.keySet();
Set<String> removedClientUUIDs = new HashSet<>();
@ -268,28 +271,17 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
query2.setParameter("offline", offlineStr);
List<PersistentClientSessionEntity> clientSessions = query2.getResultList();
// Assume both userSessions and clientSessions ordered by userSessionId
int j = 0;
for (UserSessionModel ss : result) {
PersistentUserSessionAdapter userSession = (PersistentUserSessionAdapter) ss;
Map<String, AuthenticatedClientSessionModel> currentClientSessions = userSession.getAuthenticatedClientSessions(); // This is empty now and we want to fill it
for (PersistentClientSessionEntity clientSession : clientSessions) {
PersistentUserSessionAdapter userSession = sessionsById.get(clientSession.getUserSessionId());
boolean next = true;
while (next && j < clientSessions.size()) {
PersistentClientSessionEntity clientSession = clientSessions.get(j);
if (clientSession.getUserSessionId().equals(userSession.getId())) {
PersistentAuthenticatedClientSessionAdapter clientSessAdapter = toAdapter(userSession.getRealm(), userSession, clientSession);
PersistentAuthenticatedClientSessionAdapter clientSessAdapter = toAdapter(userSession.getRealm(), userSession, clientSession);
Map<String, AuthenticatedClientSessionModel> currentClientSessions = userSession.getAuthenticatedClientSessions();
// Case when client was removed in the meantime
if (clientSessAdapter.getClient() == null) {
removedClientUUIDs.add(clientSession.getClientId());
} else {
currentClientSessions.put(clientSession.getClientId(), clientSessAdapter);
}
j++;
} else {
next = false;
}
// Case when client was removed in the meantime
if (clientSessAdapter.getClient() == null) {
removedClientUUIDs.add(clientSession.getClientId());
} else {
currentClientSessions.put(clientSession.getClientId(), clientSessAdapter);
}
}
}
@ -298,12 +290,18 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
onClientRemoved(clientUUID);
}
return result;
return (List) result;
}
private PersistentUserSessionAdapter toAdapter(PersistentUserSessionEntity entity) {
RealmModel realm = session.realms().getRealm(entity.getRealmId());
return toAdapter(realm, entity);
}
private PersistentUserSessionAdapter toAdapter(RealmModel realm, PersistentUserSessionEntity entity) {
PersistentUserSessionModel model = new PersistentUserSessionModel();
model.setUserSessionId(entity.getUserSessionId());
model.setStarted(entity.getCreatedOn());
model.setLastSessionRefresh(entity.getLastSessionRefresh());
model.setData(entity.getData());
model.setOffline(offlineFromString(entity.getOffline()));

View file

@ -36,10 +36,9 @@ import java.io.Serializable;
@NamedQuery(name="deleteClientSessionsByClientStorageProvider", query="delete from PersistentClientSessionEntity sess where sess.clientStorageProvider = :clientStorageProvider"),
@NamedQuery(name="deleteClientSessionsByUser", query="delete from PersistentClientSessionEntity sess where sess.userSessionId IN (select u.userSessionId from PersistentUserSessionEntity u where u.userId = :userId)"),
@NamedQuery(name="deleteClientSessionsByUserSession", query="delete from PersistentClientSessionEntity sess where sess.userSessionId = :userSessionId and sess.offline = :offline"),
@NamedQuery(name="deleteDetachedClientSessions", query="delete from PersistentClientSessionEntity sess where NOT EXISTS (select u.userSessionId from PersistentUserSessionEntity u where u.userSessionId = sess.userSessionId )"),
@NamedQuery(name="deleteExpiredClientSessions", query="delete from PersistentClientSessionEntity sess where sess.userSessionId IN (select u.userSessionId from PersistentUserSessionEntity u where u.realmId = :realmId AND u.offline = :offline AND u.lastSessionRefresh < :lastSessionRefresh)"),
@NamedQuery(name="findClientSessionsByUserSession", query="select sess from PersistentClientSessionEntity sess where sess.userSessionId=:userSessionId and sess.offline = :offline"),
@NamedQuery(name="findClientSessionsByUserSessions", query="select sess from PersistentClientSessionEntity sess where sess.offline = :offline and sess.userSessionId IN (:userSessionIds) order by sess.userSessionId"),
@NamedQuery(name="updateClientSessionsTimestamps", query="update PersistentClientSessionEntity c set timestamp = :timestamp"),
@NamedQuery(name="findClientSessionsByUserSessions", query="select sess from PersistentClientSessionEntity sess where sess.offline = :offline and sess.userSessionId IN (:userSessionIds) order by sess.userSessionId")
})
@Table(name="OFFLINE_CLIENT_SESSION")
@Entity

View file

@ -34,10 +34,13 @@ import java.io.Serializable;
@NamedQueries({
@NamedQuery(name="deleteUserSessionsByRealm", query="delete from PersistentUserSessionEntity sess where sess.realmId = :realmId"),
@NamedQuery(name="deleteUserSessionsByUser", query="delete from PersistentUserSessionEntity sess where sess.userId = :userId"),
@NamedQuery(name="deleteDetachedUserSessions", query="delete from PersistentUserSessionEntity sess where NOT EXISTS (select c.userSessionId from PersistentClientSessionEntity c where c.userSessionId = sess.userSessionId)"),
@NamedQuery(name="deleteExpiredUserSessions", query="delete from PersistentUserSessionEntity sess where sess.realmId = :realmId AND sess.offline = :offline AND sess.lastSessionRefresh < :lastSessionRefresh"),
@NamedQuery(name="updateUserSessionLastSessionRefresh", query="update PersistentUserSessionEntity sess set lastSessionRefresh = :lastSessionRefresh where sess.realmId = :realmId" +
" AND sess.offline = :offline AND sess.userSessionId IN (:userSessionIds)"),
@NamedQuery(name="findUserSessionsCount", query="select count(sess) from PersistentUserSessionEntity sess where sess.offline = :offline"),
@NamedQuery(name="findUserSessions", query="select sess from PersistentUserSessionEntity sess where sess.offline = :offline order by sess.userSessionId"),
@NamedQuery(name="updateUserSessionsTimestamps", query="update PersistentUserSessionEntity c set lastSessionRefresh = :lastSessionRefresh"),
@NamedQuery(name="findUserSessions", query="select sess from PersistentUserSessionEntity sess where sess.offline = :offline" +
" AND (sess.createdOn > :lastCreatedOn OR (sess.createdOn = :lastCreatedOn AND sess.userSessionId > :lastSessionId))" +
" order by sess.createdOn,sess.userSessionId")
})
@Table(name="OFFLINE_USER_SESSION")
@ -55,6 +58,9 @@ public class PersistentUserSessionEntity {
@Column(name="USER_ID", length = 255)
protected String userId;
@Column(name = "CREATED_ON")
protected int createdOn;
@Column(name = "LAST_SESSION_REFRESH")
protected int lastSessionRefresh;
@ -90,6 +96,14 @@ public class PersistentUserSessionEntity {
this.userId = userId;
}
public int getCreatedOn() {
return createdOn;
}
public void setCreatedOn(int createdOn) {
this.createdOn = createdOn;
}
public int getLastSessionRefresh() {
return lastSessionRefresh;
}

View file

@ -19,9 +19,30 @@
<changeSet author="sguilhen@redhat.com" id="4.7.0-KEYCLOAK-1267">
<addColumn tableName="REALM">
<column name="SSO_MAX_LIFESPAN_REMEMBER_ME" type="INT"/>
<column name="SSO_IDLE_TIMEOUT_REMEMBER_ME" type="INT"/>
<column name="SSO_MAX_LIFESPAN_REMEMBER_ME" type="INT" defaultValueNumeric="0"/>
<column name="SSO_IDLE_TIMEOUT_REMEMBER_ME" type="INT" defaultValueNumeric="0"/>
</addColumn>
</changeSet>
<changeSet author="keycloak" id="4.7.0-KEYCLOAK-7275">
<renameColumn tableName="OFFLINE_USER_SESSION" oldColumnName="LAST_SESSION_REFRESH" newColumnName="CREATED_ON" columnDataType="INT" />
<addNotNullConstraint tableName="OFFLINE_USER_SESSION" columnName="CREATED_ON" columnDataType="INT" defaultNullValue="0" />
<addColumn tableName="OFFLINE_USER_SESSION">
<column name="LAST_SESSION_REFRESH" type="INT" defaultValueNumeric="0">
<constraints nullable="false"/>
</column>
</addColumn>
<!--Update "lastSessionRefresh" to the current time when migrating from previous version-->
<customChange class="org.keycloak.connections.jpa.updater.liquibase.custom.JpaUpdate4_7_0_OfflineSessionsTimestamps"/>
<createIndex indexName="IDX_OFFLINE_USS_CREATEDON" tableName="OFFLINE_USER_SESSION">
<column name="CREATED_ON" type="INT"/>
</createIndex>
</changeSet>
</databaseChangeLog>

View file

@ -26,6 +26,7 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionModel;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -74,11 +75,6 @@ public class DisabledUserSessionPersisterProvider implements UserSessionPersiste
}
@Override
public void updateUserSession(UserSessionModel userSession, boolean offline) {
}
@Override
public void removeUserSession(String userSessionId, boolean offline) {
@ -105,17 +101,17 @@ public class DisabledUserSessionPersisterProvider implements UserSessionPersiste
}
@Override
public void clearDetachedUserSessions() {
public void updateLastSessionRefreshes(RealmModel realm, int lastSessionRefresh, Collection<String> userSessionIds, boolean offline) {
}
@Override
public void updateAllTimestamps(int time) {
public void removeExpired(RealmModel realm) {
}
@Override
public List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline) {
public List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline, int lastCreatedOn, String lastUserSessionId) {
return Collections.emptyList();
}

View file

@ -54,12 +54,12 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel {
data.setIpAddress(other.getIpAddress());
data.setNotes(other.getNotes());
data.setRememberMe(other.isRememberMe());
data.setStarted(other.getStarted());
if (other.getState() != null) {
data.setState(other.getState().toString());
}
this.model = new PersistentUserSessionModel();
this.model.setStarted(other.getStarted());
this.model.setUserSessionId(other.getId());
this.model.setLastSessionRefresh(other.getLastSessionRefresh());
@ -157,7 +157,7 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel {
@Override
public int getStarted() {
return getData().getStarted();
return model.getStarted();
}
@Override
@ -274,6 +274,7 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel {
@JsonProperty("rememberMe")
private boolean rememberMe;
// TODO: Keeping those just for backwards compatibility. @JsonIgnoreProperties doesn't work on Wildfly - probably due to classloading issues
@JsonProperty("started")
private int started;
@ -323,10 +324,12 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel {
this.rememberMe = rememberMe;
}
@Deprecated
public int getStarted() {
return started;
}
@Deprecated
public void setStarted(int started) {
this.started = started;
}

View file

@ -23,6 +23,7 @@ package org.keycloak.models.session;
public class PersistentUserSessionModel {
private String userSessionId;
private int started;
private int lastSessionRefresh;
private boolean offline;
private String data;
@ -35,6 +36,14 @@ public class PersistentUserSessionModel {
this.userSessionId = userSessionId;
}
public int getStarted() {
return started;
}
public void setStarted(int started) {
this.started = started;
}
public int getLastSessionRefresh() {
return lastSessionRefresh;
}

View file

@ -24,6 +24,7 @@ import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionModel;
import org.keycloak.provider.Provider;
import java.util.Collection;
import java.util.List;
/**
@ -37,8 +38,6 @@ public interface UserSessionPersisterProvider extends Provider {
// Assuming that corresponding userSession is already persisted
void createClientSession(AuthenticatedClientSessionModel clientSession, boolean offline);
void updateUserSession(UserSessionModel userSession, boolean offline);
// Called during logout (for online session) or during periodic expiration. It will remove all corresponding clientSessions too
void removeUserSession(String userSessionId, boolean offline);
@ -49,14 +48,14 @@ public interface UserSessionPersisterProvider extends Provider {
void onClientRemoved(RealmModel realm, ClientModel client);
void onUserRemoved(RealmModel realm, UserModel user);
// Called at startup to remove userSessions without any clientSession
void clearDetachedUserSessions();
// Bulk update of lastSessionRefresh of all specified userSessions to the given value.
void updateLastSessionRefreshes(RealmModel realm, int lastSessionRefresh, Collection<String> userSessionIds, boolean offline);
// Update "lastSessionRefresh" of all userSessions and "timestamp" of all clientSessions to specified time
void updateAllTimestamps(int time);
// Remove userSessions and clientSessions, which are expired
void removeExpired(RealmModel realm);
// Called during startup. For each userSession, it loads also clientSessions
List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline);
List<UserSessionModel> loadUserSessions(int firstResult, int maxResults, boolean offline, int lastCreatedOn, String lastUserSessionId);
int getUserSessionsCount(boolean offline);

View file

@ -19,6 +19,7 @@ package org.keycloak.models;
import org.keycloak.provider.Provider;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -63,7 +64,7 @@ public interface UserSessionProvider extends Provider {
void removeUserSession(RealmModel realm, UserSessionModel session);
void removeUserSessions(RealmModel realm, UserModel user);
/** Implementation should propagate removal of expired userSessions to userSessionPersister too **/
/** Implementation doesn't need to propagate removal of expired userSessions to userSessionPersister. Cleanup on persister will be called separately **/
void removeExpired(RealmModel realm);
void removeUserSessions(RealmModel realm);
@ -89,8 +90,8 @@ public interface UserSessionProvider extends Provider {
long getOfflineSessionsCount(RealmModel realm, ClientModel client);
List<UserSessionModel> getOfflineUserSessions(RealmModel realm, ClientModel client, int first, int max);
/** Triggered by persister during pre-load. It optionally imports authenticatedClientSessions too if requested. Otherwise the imported UserSession will have empty list of AuthenticationSessionModel **/
UserSessionModel importUserSession(UserSessionModel persistentUserSession, boolean offline, boolean importAuthenticatedClientSessions);
/** Triggered by persister during pre-load. It imports authenticatedClientSessions too **/
void importUserSessions(Collection<UserSessionModel> persistentUserSessions, boolean offline);
void close();

View file

@ -309,7 +309,7 @@ public class TokenManager {
if (clientSession.getCurrentRefreshToken() != null &&
!refreshToken.getId().equals(clientSession.getCurrentRefreshToken()) &&
refreshToken.getIssuedAt() < clientSession.getTimestamp() &&
clusterStartupTime != clientSession.getTimestamp()) {
clusterStartupTime <= clientSession.getTimestamp()) {
throw new OAuthErrorException(OAuthErrorException.INVALID_GRANT, "Stale token");
}

View file

@ -20,6 +20,7 @@ package org.keycloak.services.scheduled;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.timer.ScheduledTask;
/**
@ -35,6 +36,7 @@ public class ClearExpiredUserSessions implements ScheduledTask {
for (RealmModel realm : session.realms().getRealms()) {
sessions.removeExpired(realm);
session.authenticationSessions().removeExpired(realm);
session.getProvider(UserSessionPersisterProvider.class).removeExpired(realm);
}
}

View file

@ -44,7 +44,7 @@ import org.keycloak.models.UserCredentialModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserProvider;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStoreFactory;
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory;
import org.keycloak.models.utils.ModelToRepresentation;
import org.keycloak.protocol.oidc.OIDCLoginProtocol;
import org.keycloak.protocol.oidc.mappers.AudienceProtocolMapper;
@ -695,8 +695,8 @@ public class TestingResourceProvider implements RealmResourceProvider {
@Produces(MediaType.APPLICATION_JSON)
public Response suspendPeriodicTasks() {
suspendTask(ClearExpiredUserSessions.TASK_NAME);
suspendTask(LastSessionRefreshStoreFactory.LSR_PERIODIC_TASK_NAME);
suspendTask(LastSessionRefreshStoreFactory.LSR_OFFLINE_PERIODIC_TASK_NAME);
suspendTask(CrossDCLastSessionRefreshStoreFactory.LSR_PERIODIC_TASK_NAME);
suspendTask(CrossDCLastSessionRefreshStoreFactory.LSR_OFFLINE_PERIODIC_TASK_NAME);
return Response.noContent().build();
}

View file

@ -31,8 +31,8 @@ import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStore;
import org.keycloak.models.sessions.infinispan.changes.sessions.LastSessionRefreshStoreFactory;
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStoreFactory;
import org.keycloak.models.sessions.infinispan.changes.sessions.SessionData;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.representations.idm.RealmRepresentation;
@ -69,7 +69,7 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest {
@Override
public void run(KeycloakSession session) {
LastSessionRefreshStore customStore = createStoreInstance(session, 1000000, 1000);
CrossDCLastSessionRefreshStore customStore = createStoreInstance(session, 1000000, 1000);
System.out.println("sss");
int lastSessionRefresh = Time.currentTime();
@ -113,7 +113,7 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest {
@Override
public void run(KeycloakSession session) {
// Long timer interval. No message due the timer wasn't executed
LastSessionRefreshStore customStore1 = createStoreInstance(session, 100000, 10);
CrossDCLastSessionRefreshStore customStore1 = createStoreInstance(session, 100000, 10);
Time.setOffset(100);
try {
@ -124,7 +124,7 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest {
Assert.assertEquals(0, counter.get());
// Short timer interval 10 ms. 1 message due the interval is executed and lastRun was in the past due to Time.setOffset
LastSessionRefreshStore customStore2 = createStoreInstance(session, 10, 10);
CrossDCLastSessionRefreshStore customStore2 = createStoreInstance(session, 10, 10);
Time.setOffset(200);
Retry.execute(() -> {
@ -152,12 +152,12 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest {
AtomicInteger counter = new AtomicInteger();
LastSessionRefreshStore createStoreInstance(KeycloakSession session, long timerIntervalMs, int maxIntervalBetweenMessagesSeconds) {
LastSessionRefreshStoreFactory factory = new LastSessionRefreshStoreFactory() {
CrossDCLastSessionRefreshStore createStoreInstance(KeycloakSession session, long timerIntervalMs, int maxIntervalBetweenMessagesSeconds) {
CrossDCLastSessionRefreshStoreFactory factory = new CrossDCLastSessionRefreshStoreFactory() {
@Override
protected LastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) {
return new LastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, eventKey) {
protected CrossDCLastSessionRefreshStore createStoreInstance(int maxIntervalBetweenMessagesSeconds, int maxCount, String eventKey) {
return new CrossDCLastSessionRefreshStore(maxIntervalBetweenMessagesSeconds, maxCount, eventKey) {
@Override
protected void sendMessage(KeycloakSession kcSession, Map<String, SessionData> refreshesToSend) {

View file

@ -97,9 +97,9 @@ public class UserSessionInitializerTest {
List<UserSessionModel> loadedSessions = session.sessions().getOfflineUserSessions(realm, testApp, 0, 10);
UserSessionProviderTest.assertSessions(loadedSessions, origSessions);
UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, serverStartTime, "test-app", "third-party");
UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[1].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.2", started, serverStartTime, "test-app");
UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[2].getId(), session.users().getUserByUsername("user2", realm), "127.0.0.3", started, serverStartTime, "test-app");
UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, started, "test-app", "third-party");
UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[1].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started, "test-app");
UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[2].getId(), session.users().getUserByUsername("user2", realm), "127.0.0.3", started, started, "test-app");
}
@ -130,7 +130,7 @@ public class UserSessionInitializerTest {
List<UserSessionModel> loadedSessions = session.sessions().getOfflineUserSessions(realm, thirdparty, 0, 10);
Assert.assertEquals(1, loadedSessions.size());
UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, serverStartTime, "third-party");
UserSessionPersisterProviderTest.assertSessionLoaded(loadedSessions, origSessions[0].getId(), session.users().getUserByUsername("user1", realm), "127.0.0.1", started, started, "third-party");
// Revert client
realm.addClient("test-app");

View file

@ -22,6 +22,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.util.Time;
import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
@ -37,8 +38,12 @@ import org.keycloak.models.UserManager;
import org.keycloak.testsuite.rule.KeycloakRule;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
@ -112,51 +117,6 @@ public class UserSessionPersisterProviderTest {
assertSessionLoaded(loadedSessions, origSessions[2].getId(), session.users().getUserByUsername("user2", realm), "127.0.0.3", started, started, "test-app");
}
@Test
public void testUpdateTimestamps() {
// Create some sessions in infinispan
int started = Time.currentTime();
UserSessionModel[] origSessions = createSessions();
resetSession();
// Persist 3 created userSessions and clientSessions as offline
ClientModel testApp = realm.getClientByClientId("test-app");
List<UserSessionModel> userSessions = session.sessions().getUserSessions(realm, testApp);
for (UserSessionModel userSession : userSessions) {
persistUserSession(userSession, true);
}
// Persist 1 online session
UserSessionModel userSession = session.sessions().getUserSession(realm, origSessions[0].getId());
persistUserSession(userSession, false);
resetSession();
// update timestamps
int newTime = started + 50;
persister.updateAllTimestamps(newTime);
// Assert online session
List<UserSessionModel> loadedSessions = loadPersistedSessionsPaginated(false, 1, 1, 1);
Assert.assertEquals(2, assertTimestampsUpdated(loadedSessions, newTime));
// Assert offline sessions
loadedSessions = loadPersistedSessionsPaginated(true, 2, 2, 3);
Assert.assertEquals(4, assertTimestampsUpdated(loadedSessions, newTime));
}
private int assertTimestampsUpdated(List<UserSessionModel> loadedSessions, int expectedTime) {
int clientSessionsCount = 0;
for (UserSessionModel loadedSession : loadedSessions) {
Assert.assertEquals(expectedTime, loadedSession.getLastSessionRefresh());
for (AuthenticatedClientSessionModel clientSession : loadedSession.getAuthenticatedClientSessions().values()) {
Assert.assertEquals(expectedTime, clientSession.getTimestamp());
clientSessionsCount++;
}
}
return clientSessionsCount;
}
@Test
public void testUpdateAndRemove() {
@ -177,48 +137,30 @@ public class UserSessionPersisterProviderTest {
UserSessionModel persistedSession = loadedSessions.get(0);
UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started, "test-app");
// Update userSession
Time.setOffset(10);
try {
persistedSession.setLastSessionRefresh(Time.currentTime());
persistedSession.setNote("foo", "bar");
persistedSession.setState(UserSessionModel.State.LOGGED_IN);
persister.updateUserSession(persistedSession, true);
// create new clientSession
AuthenticatedClientSessionModel clientSession = createClientSession(realm.getClientByClientId("third-party"), session.sessions().getUserSession(realm, persistedSession.getId()),
"http://redirect", "state");
persister.createClientSession(clientSession, true);
// create new clientSession
AuthenticatedClientSessionModel clientSession = createClientSession(realm.getClientByClientId("third-party"), session.sessions().getUserSession(realm, persistedSession.getId()),
"http://redirect", "state");
persister.createClientSession(clientSession, true);
resetSession();
resetSession();
// Remove clientSession
persister.removeClientSession(userSession.getId(), realm.getClientByClientId("third-party").getId(), true);
// Assert session updated
loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
persistedSession = loadedSessions.get(0);
UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started+10, "test-app", "third-party");
Assert.assertEquals("bar", persistedSession.getNote("foo"));
Assert.assertEquals(UserSessionModel.State.LOGGED_IN, persistedSession.getState());
resetSession();
// Remove clientSession
persister.removeClientSession(userSession.getId(), realm.getClientByClientId("third-party").getId(), true);
// Assert clientSession removed
loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
persistedSession = loadedSessions.get(0);
UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started , "test-app");
resetSession();
// Remove userSession
persister.removeUserSession(persistedSession.getId(), true);
// Assert clientSession removed
loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
persistedSession = loadedSessions.get(0);
UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, started + 10, "test-app");
resetSession();
// Remove userSession
persister.removeUserSession(persistedSession.getId(), true);
resetSession();
// Assert nothing found
loadPersistedSessionsPaginated(true, 10, 0, 0);
} finally {
Time.setOffset(0);
}
// Assert nothing found
loadPersistedSessionsPaginated(true, 10, 0, 0);
}
@Test
@ -302,8 +244,8 @@ public class UserSessionPersisterProviderTest {
resetSession();
// Assert nothing loaded - userSession was removed as well because it was last userSession
loadPersistedSessionsPaginated(true, 10, 0, 0);
// Assert loading still works - last userSession is still there, but no clientSession on it
loadPersistedSessionsPaginated(true, 10, 1, 1);
// Cleanup
realmMgr = new RealmManager(session);
@ -340,23 +282,108 @@ public class UserSessionPersisterProviderTest {
UserSessionModel persistedSession = loadedSessions.get(0);
UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user2", realm), "127.0.0.3", started, started, "test-app");
// KEYCLOAK-2431 Assert that userSessionPersister is resistent even to situation, when users are deleted "directly"
// KEYCLOAK-2431 Assert that userSessionPersister is resistent even to situation, when users are deleted "directly".
// No exception will happen. However session will be still there
UserModel user2 = session.users().getUserByUsername("user2", realm);
session.users().removeUser(realm, user2);
loadedSessions = loadPersistedSessionsPaginated(true, 10, 0, 0);
loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
// Cleanup
UserSessionModel userSession = loadedSessions.get(0);
session.sessions().removeUserSession(realm, userSession);
persister.removeUserSession(userSession.getId(), userSession.isOffline());
}
// KEYCLOAK-1999
@Test
public void testNoSessions() {
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
List<UserSessionModel> sessions = persister.loadUserSessions(0, 1, true);
List<UserSessionModel> sessions = persister.loadUserSessions(0, 1, true, 0, "abc");
Assert.assertEquals(0, sessions.size());
}
@Test
public void testMoreSessions() {
// Create 10 userSessions - each having 1 clientSession
List<UserSessionModel> userSessions = new ArrayList<>();
UserModel user = session.users().getUserByUsername("user1", realm);
for (int i=0 ; i<20 ; i++) {
// Having different offsets for each session (to ensure that lastSessionRefresh is also different)
Time.setOffset(i);
UserSessionModel userSession = session.sessions().createUserSession(realm, user, "user1", "127.0.0.1", "form", true, null, null);
createClientSession(realm.getClientByClientId("test-app"), userSession, "http://redirect", "state");
userSessions.add(userSession);
}
resetSession();
for (UserSessionModel userSession : userSessions) {
UserSessionModel userSession2 = session.sessions().getUserSession(realm, userSession.getId());
persistUserSession(userSession2, true);
}
resetSession();
List<UserSessionModel> loadedSessions = loadPersistedSessionsPaginated(true, 2, 10, 20);
user = session.users().getUserByUsername("user1", realm);
ClientModel testApp = realm.getClientByClientId("test-app");
for (UserSessionModel loadedSession : loadedSessions) {
assertEquals(user.getId(), loadedSession.getUser().getId());
assertEquals("127.0.0.1", loadedSession.getIpAddress());
assertEquals(user.getUsername(), loadedSession.getLoginUsername());
assertEquals(1, loadedSession.getAuthenticatedClientSessions().size());
assertTrue(loadedSession.getAuthenticatedClientSessions().containsKey(testApp.getId()));
}
}
@Test
public void testExpiredSessions() {
// Create some sessions in infinispan
int started = Time.currentTime();
UserSessionModel[] origSessions = createSessions();
resetSession();
// Persist 2 offline sessions of 2 users
UserSessionModel userSession1 = session.sessions().getUserSession(realm, origSessions[1].getId());
UserSessionModel userSession2 = session.sessions().getUserSession(realm, origSessions[2].getId());
persistUserSession(userSession1, true);
persistUserSession(userSession2, true);
resetSession();
// Update one of the sessions with lastSessionRefresh of 20 days ahead
int lastSessionRefresh = Time.currentTime() + 1728000;
persister.updateLastSessionRefreshes(realm, lastSessionRefresh, Collections.singleton(userSession1.getId()), true);
resetSession();
// Increase time offset - 40 days
Time.setOffset(3456000);
try {
// Run expiration thread
persister.removeExpired(realm);
// Test the updated session is still in persister. Not updated session is not there anymore
List<UserSessionModel> loadedSessions = loadPersistedSessionsPaginated(true, 10, 1, 1);
UserSessionModel persistedSession = loadedSessions.get(0);
UserSessionProviderTest.assertSession(persistedSession, session.users().getUserByUsername("user1", realm), "127.0.0.2", started, lastSessionRefresh, "test-app");
} finally {
// Cleanup
Time.setOffset(0);
}
}
private AuthenticatedClientSessionModel createClientSession(ClientModel client, UserSessionModel userSession, String redirect, String state) {
AuthenticatedClientSessionModel clientSession = session.sessions().createClientSession(realm, client, userSession);
clientSession.setRedirectUri(redirect);
@ -407,19 +434,32 @@ public class UserSessionPersisterProviderTest {
private List<UserSessionModel> loadPersistedSessionsPaginated(boolean offline, int sessionsPerPage, int expectedPageCount, int expectedSessionsCount) {
int count = persister.getUserSessionsCount(offline);
int start = 0;
int pageCount = 0;
boolean next = true;
List<UserSessionModel> result = new ArrayList<>();
while (next && start < count) {
List<UserSessionModel> sess = persister.loadUserSessions(start, sessionsPerPage, offline);
if (sess.size() == 0) {
int lastCreatedOn = 0;
String lastSessionId = "abc";
while (next) {
List<UserSessionModel> sess = persister.loadUserSessions(0, sessionsPerPage, offline, lastCreatedOn, lastSessionId);
if (sess.size() < sessionsPerPage) {
next = false;
// We had at least some session
if (sess.size() > 0) {
pageCount++;
}
} else {
pageCount++;
start += sess.size();
result.addAll(sess);
UserSessionModel lastSession = sess.get(sess.size() - 1);
lastCreatedOn = lastSession.getStarted();
lastSessionId = lastSession.getId();
}
result.addAll(sess);
}
Assert.assertEquals(pageCount, expectedPageCount);

View file

@ -363,24 +363,45 @@ public class UserSessionProviderOfflineTest {
// sessions are in persister too
Assert.assertEquals(3, persister.getUserSessionsCount(true));
// Set lastSessionRefresh to session[0] to 0
session0.setLastSessionRefresh(0);
resetSession();
session.sessions().removeExpired(realm);
resetSession();
// assert session0 not found now
Assert.assertNull(session.sessions().getOfflineUserSession(realm, origSessions[0].getId()));
Assert.assertEquals(2, persister.getUserSessionsCount(true));
// Expire everything and assert nothing found
Time.setOffset(3000000);
// Increase timeOffset - 5 minutes
Time.setOffset(300);
try {
// Update lastSessionRefresh of session0. This will update lastSessionRefresh of all the sessions to DB as they were not yet updated to DB
session0.setLastSessionRefresh(Time.currentTime());
resetSession();
// Increase timeOffset - 20 days
Time.setOffset(1728000);
session0 = session.sessions().getOfflineUserSession(realm, origSessions[0].getId());
session0.setLastSessionRefresh(Time.currentTime());
resetSession();
// Increase timeOffset - 40 days
Time.setOffset(3456000);
// Expire and ensure that all sessions despite session0 were removed
session.sessions().removeExpired(realm);
persister.removeExpired(realm);
resetSession();
// assert session0 is the only session found
Assert.assertNotNull(session.sessions().getOfflineUserSession(realm, origSessions[0].getId()));
Assert.assertNull(session.sessions().getOfflineUserSession(realm, origSessions[1].getId()));
Assert.assertNull(session.sessions().getOfflineUserSession(realm, origSessions[2].getId()));
Assert.assertEquals(1, persister.getUserSessionsCount(true));
// Expire everything and assert nothing found
Time.setOffset(6000000);
session.sessions().removeExpired(realm);
persister.removeExpired(realm);
resetSession();

View file

@ -17,9 +17,17 @@
package org.keycloak.testsuite.util.cli;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.UserSessionProviderFactory;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.models.utils.KeycloakModelUtils;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@ -33,15 +41,97 @@ public class LoadPersistentSessionsCommand extends AbstractCommand {
@Override
protected void doRunCommand(KeycloakSession session) {
int sessionsPerSegment = getIntArg(0);
UserSessionProviderFactory sessionProviderFactory = (UserSessionProviderFactory) sessionFactory.getProviderFactory(UserSessionProvider.class);
sessionProviderFactory.loadPersistentSessions(sessionFactory, 10, sessionsPerSegment);
final int workersCount = getIntArg(0);
final int limit = getIntArg(1);
//int workersCount = 8;
//int limit = 64;
AtomicInteger lastCreatedOn = new AtomicInteger(0);
AtomicReference<String> lastSessionId = new AtomicReference<>("abc");
AtomicBoolean finished = new AtomicBoolean(false);
int i=0;
while (!finished.get()) {
if (i % 16 == 0) {
log.infof("Starting iteration: %s . lastCreatedOn: %d, lastSessionId: %s", i, lastCreatedOn.get(), lastSessionId.get());
}
i = i + workersCount;
List<Thread> workers = new LinkedList<>();
MyWorker lastWorker = null;
for (int workerId = 0 ; workerId < workersCount ; workerId++) {
lastWorker = new MyWorker(workerId, lastCreatedOn.get(), lastSessionId.get(), limit, sessionFactory);
Thread worker = new Thread(lastWorker);
workers.add(worker);
}
for (Thread worker : workers) {
worker.start();
}
for (Thread worker : workers) {
try {
worker.join();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
List<UserSessionModel> lastWorkerSessions = lastWorker.getLoadedSessions();
if (lastWorkerSessions.size() < limit) {
finished.set(true);
} else {
UserSessionModel lastSession = lastWorkerSessions.get(lastWorkerSessions.size() - 1);
lastCreatedOn.set(lastSession.getStarted());
lastSessionId.set(lastSession.getId());
}
}
log.info("All persistent sessions loaded successfully");
}
@Override
public String printUsage() {
return super.printUsage() + " <sessions-count-per-segment>";
return super.printUsage() + " <workers-count (for example 8)> <limit (for example 64)>";
}
private class MyWorker implements Runnable {
private final int workerId;
private final int lastCreatedOn;
private final String lastSessionId;
private final int limit;
private final KeycloakSessionFactory sessionFactory;
private List<UserSessionModel> loadedSessions = new LinkedList<>();
public MyWorker(int workerId, int lastCreatedOn, String lastSessionId, int limit, KeycloakSessionFactory sessionFactory) {
this.workerId = workerId;
this.lastCreatedOn = lastCreatedOn;
this.lastSessionId = lastSessionId;
this.limit = limit;
this.sessionFactory = sessionFactory;
}
@Override
public void run() {
KeycloakModelUtils.runJobInTransaction(sessionFactory, (keycloakSession) -> {
int offset = workerId * limit;
UserSessionPersisterProvider persister = keycloakSession.getProvider(UserSessionPersisterProvider.class);
loadedSessions = persister.loadUserSessions(offset, limit, true, lastCreatedOn, lastSessionId);
});
}
private List<UserSessionModel> getLoadedSessions() {
return loadedSessions;
}
}
}

View file

@ -19,6 +19,7 @@ package org.keycloak.testsuite.util.cli;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
@ -36,6 +37,8 @@ import org.keycloak.models.utils.KeycloakModelUtils;
*/
public class PersistSessionsCommand extends AbstractCommand {
private AtomicInteger userCounter = new AtomicInteger();
@Override
public String getName() {
return "persistSessions";
@ -75,12 +78,18 @@ public class PersistSessionsCommand extends AbstractCommand {
@Override
public void run(KeycloakSession session) {
RealmModel realm = session.realms().getRealmByName("master");
UserModel john = session.users().getUserByUsername("admin", realm);
ClientModel testApp = realm.getClientByClientId("security-admin-console");
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
for (int i = 0; i < countInThisBatch; i++) {
UserSessionModel userSession = session.sessions().createUserSession(realm, john, "john-doh@localhost", "127.0.0.2", "form", true, null, null);
String username = "john-" + userCounter.incrementAndGet();
UserModel john = session.users().getUserByUsername(username, realm);
if (john == null) {
john = session.users().addUser(realm, username);
}
UserSessionModel userSession = session.sessions().createUserSession(realm, john, username, "127.0.0.2", "form", true, null, null);
AuthenticatedClientSessionModel clientSession = session.sessions().createClientSession(realm, testApp, userSession);
clientSession.setRedirectUri("http://redirect");
clientSession.setNote("foo", "bar-" + i);