Merge offline and online sessions transactions
Closes #29139 Signed-off-by: Michal Hajas <mhajas@redhat.com>
This commit is contained in:
parent
7c427e8d38
commit
4c17c6107e
16 changed files with 523 additions and 225 deletions
|
@ -27,10 +27,10 @@ import org.keycloak.models.KeycloakSession;
|
|||
import org.keycloak.models.RealmModel;
|
||||
import org.keycloak.models.UserSessionModel;
|
||||
import org.keycloak.models.session.UserSessionPersisterProvider;
|
||||
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
||||
import org.keycloak.models.sessions.infinispan.changes.ClientSessionUpdateTask;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionsChangelogBasedTransaction;
|
||||
import org.keycloak.models.sessions.infinispan.changes.Tasks;
|
||||
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshChecker;
|
||||
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
|
||||
|
@ -46,13 +46,13 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
|
|||
private final SessionRefreshStore provider;
|
||||
private AuthenticatedClientSessionEntity entity;
|
||||
private final ClientModel client;
|
||||
private final InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx;
|
||||
private final SessionsChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx;
|
||||
private UserSessionModel userSession;
|
||||
private boolean offline;
|
||||
|
||||
public AuthenticatedClientSessionAdapter(KeycloakSession kcSession, SessionRefreshStore provider,
|
||||
AuthenticatedClientSessionEntity entity, ClientModel client, UserSessionModel userSession,
|
||||
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx, boolean offline) {
|
||||
SessionsChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx, boolean offline) {
|
||||
if (userSession == null) {
|
||||
throw new NullPointerException("userSession must not be null");
|
||||
}
|
||||
|
@ -85,7 +85,7 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
|
|||
// as nonexistent in org.keycloak.models.sessions.infinispan.UserSessionAdapter.getAuthenticatedClientSessions()
|
||||
this.userSession = null;
|
||||
|
||||
SessionUpdateTask<AuthenticatedClientSessionEntity> removeTask = Tasks.removeSync();
|
||||
SessionUpdateTask<AuthenticatedClientSessionEntity> removeTask = Tasks.removeSync(offline);
|
||||
|
||||
clientSessionUpdateTx.addTask(entity.getId(), removeTask);
|
||||
}
|
||||
|
@ -109,6 +109,10 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
|
|||
entity.setRedirectUri(uri);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
};
|
||||
|
||||
update(task);
|
||||
|
@ -156,6 +160,11 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
|
|||
.shouldSaveClientSessionToRemoteCache(kcSession, client.getRealm(), sessionWrapper, userSession, offline, timestamp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "setTimestamp(" + timestamp + ')';
|
||||
|
@ -179,6 +188,11 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
|
|||
public void runUpdate(AuthenticatedClientSessionEntity entity) {
|
||||
entity.setCurrentRefreshTokenUseCount(currentRefreshTokenUseCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
};
|
||||
|
||||
update(task);
|
||||
|
@ -197,6 +211,11 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
|
|||
public void runUpdate(AuthenticatedClientSessionEntity entity) {
|
||||
entity.setCurrentRefreshToken(currentRefreshToken);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
};
|
||||
|
||||
update(task);
|
||||
|
@ -216,6 +235,11 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
|
|||
entity.setAction(action);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
update(task);
|
||||
|
@ -235,6 +259,11 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
|
|||
entity.setAuthMethod(method);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
update(task);
|
||||
|
@ -254,6 +283,10 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
|
|||
entity.getNotes().put(name, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
};
|
||||
|
||||
update(task);
|
||||
|
@ -268,6 +301,11 @@ public class AuthenticatedClientSessionAdapter implements AuthenticatedClientSes
|
|||
entity.getNotes().remove(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
update(task);
|
||||
|
|
|
@ -132,10 +132,6 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
|||
offlineSessionsCache,
|
||||
clientSessionCache,
|
||||
offlineClientSessionsCache,
|
||||
serializerSession,
|
||||
serializerOfflineSession,
|
||||
serializerClientSession,
|
||||
serializerOfflineClientSession,
|
||||
asyncQueuePersistentUpdate
|
||||
);
|
||||
}
|
||||
|
|
|
@ -42,9 +42,7 @@ import org.keycloak.models.UserSessionProvider;
|
|||
import org.keycloak.models.light.LightweightUserAdapter;
|
||||
import org.keycloak.models.session.UserSessionPersisterProvider;
|
||||
import org.keycloak.models.sessions.infinispan.changes.ClientSessionPersistentChangelogBasedTransaction;
|
||||
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
|
||||
import org.keycloak.models.sessions.infinispan.changes.PersistentUpdate;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SerializeExecutionsByKey;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
|
||||
import org.keycloak.models.sessions.infinispan.changes.Tasks;
|
||||
|
@ -103,9 +101,7 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
protected final Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionCache;
|
||||
|
||||
protected final UserSessionPersistentChangelogBasedTransaction sessionTx;
|
||||
protected final UserSessionPersistentChangelogBasedTransaction offlineSessionTx;
|
||||
protected final ClientSessionPersistentChangelogBasedTransaction clientSessionTx;
|
||||
protected final ClientSessionPersistentChangelogBasedTransaction offlineClientSessionTx;
|
||||
|
||||
protected final SessionEventsSenderTransaction clusterEventsSenderTx;
|
||||
|
||||
|
@ -126,10 +122,6 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineSessionCache,
|
||||
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionCache,
|
||||
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionCache,
|
||||
SerializeExecutionsByKey<String> serializerSession,
|
||||
SerializeExecutionsByKey<String> serializerOfflineSession,
|
||||
SerializeExecutionsByKey<UUID> serializerClientSession,
|
||||
SerializeExecutionsByKey<UUID> serializerOfflineClientSession,
|
||||
ArrayBlockingQueue<PersistentUpdate> asyncQueuePersistentUpdate) {
|
||||
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
||||
throw new IllegalStateException("Persistent user sessions are not enabled");
|
||||
|
@ -142,14 +134,19 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
this.offlineSessionCache = offlineSessionCache;
|
||||
this.offlineClientSessionCache = offlineClientSessionCache;
|
||||
|
||||
this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session, sessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs, false, serializerSession,
|
||||
asyncQueuePersistentUpdate);
|
||||
this.offlineSessionTx = new UserSessionPersistentChangelogBasedTransaction(session, offlineSessionCache, remoteCacheInvoker, SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getOfflineSessionMaxIdleMs, true, serializerOfflineSession,
|
||||
this.sessionTx = new UserSessionPersistentChangelogBasedTransaction(session,
|
||||
sessionCache, offlineSessionCache,
|
||||
remoteCacheInvoker,
|
||||
SessionTimeouts::getUserSessionLifespanMs, SessionTimeouts::getUserSessionMaxIdleMs,
|
||||
SessionTimeouts::getOfflineSessionLifespanMs, SessionTimeouts::getOfflineSessionMaxIdleMs,
|
||||
asyncQueuePersistentUpdate);
|
||||
|
||||
this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, clientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs, false, keyGenerator, sessionTx, serializerClientSession,
|
||||
asyncQueuePersistentUpdate);
|
||||
this.offlineClientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session, offlineClientSessionCache, remoteCacheInvoker, SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getOfflineClientSessionMaxIdleMs, true, keyGenerator, offlineSessionTx, serializerOfflineClientSession,
|
||||
this.clientSessionTx = new ClientSessionPersistentChangelogBasedTransaction(session,
|
||||
clientSessionCache, offlineClientSessionCache,
|
||||
remoteCacheInvoker,
|
||||
SessionTimeouts::getClientSessionLifespanMs, SessionTimeouts::getClientSessionMaxIdleMs,
|
||||
SessionTimeouts::getOfflineClientSessionLifespanMs, SessionTimeouts::getOfflineClientSessionMaxIdleMs,
|
||||
sessionTx,
|
||||
asyncQueuePersistentUpdate);
|
||||
|
||||
this.clusterEventsSenderTx = new SessionEventsSenderTransaction(session);
|
||||
|
@ -162,27 +159,17 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
|
||||
session.getTransactionManager().enlistAfterCompletion(clusterEventsSenderTx);
|
||||
session.getTransactionManager().enlistAfterCompletion(sessionTx);
|
||||
session.getTransactionManager().enlistAfterCompletion(offlineSessionTx);
|
||||
session.getTransactionManager().enlistAfterCompletion(clientSessionTx);
|
||||
session.getTransactionManager().enlistAfterCompletion(offlineClientSessionTx);
|
||||
}
|
||||
|
||||
protected Cache<String, SessionEntityWrapper<UserSessionEntity>> getCache(boolean offline) {
|
||||
return offline ? offlineSessionCache : sessionCache;
|
||||
}
|
||||
|
||||
protected UserSessionPersistentChangelogBasedTransaction getTransaction(boolean offline) {
|
||||
return offline ? offlineSessionTx : sessionTx;
|
||||
}
|
||||
|
||||
protected Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> getClientSessionCache(boolean offline) {
|
||||
return offline ? offlineClientSessionCache : clientSessionCache;
|
||||
}
|
||||
|
||||
protected ClientSessionPersistentChangelogBasedTransaction getClientSessionTransaction(boolean offline) {
|
||||
return offline ? offlineClientSessionTx : clientSessionTx;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CrossDCLastSessionRefreshStore getLastSessionRefreshStore() {
|
||||
return lastSessionRefreshStore;
|
||||
|
@ -217,15 +204,13 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
entity.getNotes().put(AuthenticatedClientSessionModel.USER_SESSION_REMEMBER_ME_NOTE, "true");
|
||||
}
|
||||
|
||||
UserSessionPersistentChangelogBasedTransaction userSessionUpdateTx = getTransaction(false);
|
||||
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx = getClientSessionTransaction(false);
|
||||
AuthenticatedClientSessionAdapter adapter = new AuthenticatedClientSessionAdapter(session, this, entity, client, userSession, clientSessionUpdateTx, false);
|
||||
AuthenticatedClientSessionAdapter adapter = new AuthenticatedClientSessionAdapter(session, this, entity, client, userSession, clientSessionTx, false);
|
||||
|
||||
if (Profile.isFeatureEnabled(Feature.PERSISTENT_USER_SESSIONS_NO_CACHE)) {
|
||||
if (userSession.isOffline()) {
|
||||
// If this is an offline session, and the referred online session doesn't exist anymore, don't register the client session in the transaction.
|
||||
// Instead keep it transient and it will be added to the offline session only afterward. This is expected by SessionTimeoutsTest.testOfflineUserClientIdleTimeoutSmallerThanSessionOneRefresh.
|
||||
if (userSessionUpdateTx.get(realm, userSession.getId()) == null) {
|
||||
if (sessionTx.get(realm, userSession.getId(), false) == null) {
|
||||
return adapter;
|
||||
}
|
||||
}
|
||||
|
@ -236,10 +221,10 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
userSession.getPersistenceState() : UserSessionModel.SessionPersistenceState.PERSISTENT;
|
||||
|
||||
SessionUpdateTask<AuthenticatedClientSessionEntity> createClientSessionTask = Tasks.addIfAbsentSync();
|
||||
clientSessionUpdateTx.addTask(clientSessionId, createClientSessionTask, entity, persistenceState);
|
||||
clientSessionTx.addTask(clientSessionId, createClientSessionTask, entity, persistenceState);
|
||||
|
||||
SessionUpdateTask<UserSessionEntity> registerClientSessionTask = new RegisterClientSessionTask(client.getId(), clientSessionId);
|
||||
userSessionUpdateTx.addTask(userSession.getId(), registerClientSessionTask);
|
||||
SessionUpdateTask<UserSessionEntity> registerClientSessionTask = new ClientSessionPersistentChangelogBasedTransaction.RegisterClientSessionTask(client.getId(), clientSessionId, userSession.isOffline());
|
||||
sessionTx.addTask(userSession.getId(), registerClientSessionTask);
|
||||
|
||||
return adapter;
|
||||
}
|
||||
|
@ -286,41 +271,14 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
return getUserSession(realm, id, false);
|
||||
}
|
||||
|
||||
protected UserSessionAdapter getUserSession(RealmModel realm, String id, boolean offline) {
|
||||
UserSessionPersistentChangelogBasedTransaction tx = getTransaction(offline);
|
||||
SessionEntityWrapper<UserSessionEntity> entityWrapper = tx.get(realm, id);
|
||||
if (entityWrapper == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
UserSessionEntity entity = entityWrapper.getEntity();
|
||||
if (entity.getRealmId().equals(realm.getId())) {
|
||||
return wrap(realm, entity, offline);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private UserSessionEntity getUserSessionFromTx(RealmModel realm, boolean offline, UserSessionModel persistentUserSession) {
|
||||
SessionEntityWrapper<UserSessionEntity> userSessionEntitySessionEntityWrapper = getTransaction(offline).get(realm, persistentUserSession.getId());
|
||||
if (userSessionEntitySessionEntityWrapper != null) {
|
||||
return userSessionEntitySessionEntityWrapper.getEntity();
|
||||
}
|
||||
return null;
|
||||
private UserSessionAdapter getUserSession(RealmModel realm, String id, boolean offline) {
|
||||
SessionEntityWrapper<UserSessionEntity> entityWrapper = sessionTx.get(realm, id, offline);
|
||||
return entityWrapper != null ? wrap(realm, entityWrapper.getEntity(), offline) : null;
|
||||
}
|
||||
|
||||
private UserSessionEntity getUserSessionEntity(RealmModel realm, String id, boolean offline) {
|
||||
UserSessionPersistentChangelogBasedTransaction tx = getTransaction(offline);
|
||||
SessionEntityWrapper<UserSessionEntity> entityWrapper = tx.get(realm, id);
|
||||
if (entityWrapper == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
UserSessionEntity entity = entityWrapper.getEntity();
|
||||
if (!entity.getRealmId().equals(realm.getId())) {
|
||||
return null;
|
||||
}
|
||||
return entity;
|
||||
SessionEntityWrapper<UserSessionEntity> entityWrapper = sessionTx.get(realm, id, offline);
|
||||
return entityWrapper != null ? entityWrapper.getEntity() : null;
|
||||
}
|
||||
|
||||
private Stream<UserSessionModel> getUserSessionsFromPersistenceProviderStream(RealmModel realm, UserModel user, boolean offline) {
|
||||
|
@ -399,17 +357,15 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
}
|
||||
|
||||
UUID clientSessionUUID = UUID.fromString(clientSessionId);
|
||||
ClientSessionPersistentChangelogBasedTransaction clientTx = getClientSessionTransaction(offline);
|
||||
|
||||
SessionEntityWrapper<AuthenticatedClientSessionEntity> clientSessionEntity = clientTx.get(client.getRealm(), client, userSession, clientSessionUUID);
|
||||
SessionEntityWrapper<AuthenticatedClientSessionEntity> clientSessionEntity = clientSessionTx.get(client.getRealm(), client, userSession, clientSessionUUID, offline);
|
||||
if (clientSessionEntity != null) {
|
||||
return new AuthenticatedClientSessionAdapter(session, this, clientSessionEntity.getEntity(), client, userSession, clientTx, offline);
|
||||
return new AuthenticatedClientSessionAdapter(session, this, clientSessionEntity.getEntity(), client, userSession, clientSessionTx, offline);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Stream<UserSessionModel> getUserSessionsStream(final RealmModel realm, UserModel user) {
|
||||
return getUserSessionsStream(realm, UserSessionPredicate.create(realm.getId()).user(user.getId()), false);
|
||||
|
@ -469,21 +425,20 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
SessionEntityWrapper<UserSessionEntity> remoteSessionEntityWrapper = (SessionEntityWrapper<UserSessionEntity>) remoteCache.get(id);
|
||||
if (remoteSessionEntityWrapper != null) {
|
||||
UserSessionEntity remoteSessionEntity = remoteSessionEntityWrapper.getEntity();
|
||||
remoteSessionEntity.setOffline(offline);
|
||||
log.debugf("getUserSessionWithPredicate(%s): remote cache contains session entity %s", id, remoteSessionEntity);
|
||||
|
||||
UserSessionModel remoteSessionAdapter = wrap(realm, remoteSessionEntity, offline);
|
||||
if (predicate.test(remoteSessionAdapter)) {
|
||||
|
||||
InfinispanChangelogBasedTransaction<String, UserSessionEntity> tx = getTransaction(offline);
|
||||
|
||||
// Remote entity contains our predicate. Update local cache with the remote entity
|
||||
SessionEntityWrapper<UserSessionEntity> sessionWrapper = remoteSessionEntity.mergeRemoteEntityWithLocalEntity(tx.get(id));
|
||||
SessionEntityWrapper<UserSessionEntity> sessionWrapper = remoteSessionEntity.mergeRemoteEntityWithLocalEntity(sessionTx.get(id, offline));
|
||||
|
||||
// Replace entity just in ispn cache. Skip remoteStore
|
||||
cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD, Flag.IGNORE_RETURN_VALUES)
|
||||
.replace(id, sessionWrapper);
|
||||
|
||||
tx.reloadEntityInCurrentTransaction(realm, id, sessionWrapper);
|
||||
sessionTx.reloadEntityInCurrentTransaction(realm, id, sessionWrapper);
|
||||
|
||||
// Recursion. We should have it locally now
|
||||
return getUserSessionWithPredicate(realm, id, offline, predicate);
|
||||
|
@ -670,22 +625,17 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
}
|
||||
|
||||
protected void removeUserSession(UserSessionEntity sessionEntity, boolean offline) {
|
||||
InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx = getTransaction(offline);
|
||||
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx = getClientSessionTransaction(offline);
|
||||
sessionEntity.getAuthenticatedClientSessions().forEach((clientUUID, clientSessionId) -> clientSessionUpdateTx.addTask(clientSessionId, Tasks.removeSync()));
|
||||
SessionUpdateTask<UserSessionEntity> removeTask = Tasks.removeSync();
|
||||
userSessionUpdateTx.addTask(sessionEntity.getId(), removeTask);
|
||||
sessionEntity.getAuthenticatedClientSessions().forEach((clientUUID, clientSessionId) -> clientSessionTx.addTask(clientSessionId, Tasks.removeSync(offline)));
|
||||
SessionUpdateTask<UserSessionEntity> removeTask = Tasks.removeSync(offline);
|
||||
sessionTx.addTask(sessionEntity.getId(), removeTask);
|
||||
}
|
||||
|
||||
UserSessionAdapter wrap(RealmModel realm, UserSessionEntity entity, boolean offline, UserModel user) {
|
||||
InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx = getTransaction(offline);
|
||||
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx = getClientSessionTransaction(offline);
|
||||
|
||||
if (entity == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new UserSessionAdapter(session, user, this, userSessionUpdateTx, clientSessionUpdateTx, realm, entity, offline);
|
||||
return new UserSessionAdapter(session, user, this, sessionTx, clientSessionTx, realm, entity, offline);
|
||||
}
|
||||
|
||||
UserSessionAdapter wrap(RealmModel realm, UserSessionEntity entity, boolean offline) {
|
||||
|
@ -725,11 +675,10 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
@Override
|
||||
public UserSessionModel createOfflineUserSession(UserSessionModel userSession) {
|
||||
UserSessionEntity entity = createUserSessionEntityInstance(userSession);
|
||||
|
||||
InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx = getTransaction(true);
|
||||
entity.setOffline(true);
|
||||
|
||||
SessionUpdateTask<UserSessionEntity> importTask = Tasks.addIfAbsentSync();
|
||||
userSessionUpdateTx.addTask(userSession.getId(), importTask, entity, UserSessionModel.SessionPersistenceState.PERSISTENT);
|
||||
sessionTx.addTask(userSession.getId(), importTask, entity, UserSessionModel.SessionPersistenceState.PERSISTENT);
|
||||
|
||||
UserSessionAdapter offlineUserSession = wrap(userSession.getRealm(), entity, true);
|
||||
|
||||
|
@ -764,9 +713,7 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
UserSessionAdapter userSessionAdapter = (offlineUserSession instanceof UserSessionAdapter) ? (UserSessionAdapter) offlineUserSession :
|
||||
getOfflineUserSession(offlineUserSession.getRealm(), offlineUserSession.getId());
|
||||
|
||||
InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx = getTransaction(true);
|
||||
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx = getClientSessionTransaction(true);
|
||||
AuthenticatedClientSessionAdapter offlineClientSession = importClientSession(userSessionAdapter, clientSession, userSessionUpdateTx, clientSessionUpdateTx, true, false);
|
||||
AuthenticatedClientSessionAdapter offlineClientSession = importClientSession(userSessionAdapter, clientSession, true, false);
|
||||
|
||||
// update timestamp to current time
|
||||
offlineClientSession.setTimestamp(Time.currentTime());
|
||||
|
@ -939,15 +886,16 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
|
||||
entity.setStarted(userSession.getStarted());
|
||||
entity.setLastSessionRefresh(userSession.getLastSessionRefresh());
|
||||
entity.setOffline(userSession.isOffline());
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
|
||||
private AuthenticatedClientSessionAdapter importClientSession(UserSessionAdapter sessionToImportInto, AuthenticatedClientSessionModel clientSession,
|
||||
InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx,
|
||||
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx,
|
||||
boolean offline, boolean checkExpiration) {
|
||||
private AuthenticatedClientSessionAdapter importClientSession(UserSessionAdapter sessionToImportInto,
|
||||
AuthenticatedClientSessionModel clientSession,
|
||||
boolean offline,
|
||||
boolean checkExpiration) {
|
||||
AuthenticatedClientSessionEntity entity = createAuthenticatedClientSessionInstance(sessionToImportInto.getId(), clientSession,
|
||||
sessionToImportInto.getRealm().getId(), clientSession.getClient().getId(), offline);
|
||||
entity.setUserSessionId(sessionToImportInto.getId());
|
||||
|
@ -969,15 +917,15 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
final UUID clientSessionId = entity.getId();
|
||||
|
||||
SessionUpdateTask<AuthenticatedClientSessionEntity> createClientSessionTask = Tasks.addIfAbsentSync();
|
||||
clientSessionUpdateTx.addTask(entity.getId(), createClientSessionTask, entity, UserSessionModel.SessionPersistenceState.PERSISTENT);
|
||||
clientSessionTx.addTask(entity.getId(), createClientSessionTask, entity, UserSessionModel.SessionPersistenceState.PERSISTENT);
|
||||
|
||||
AuthenticatedClientSessionStore clientSessions = sessionToImportInto.getEntity().getAuthenticatedClientSessions();
|
||||
clientSessions.put(clientSession.getClient().getId(), clientSessionId);
|
||||
|
||||
SessionUpdateTask<UserSessionEntity> registerClientSessionTask = new RegisterClientSessionTask(clientSession.getClient().getId(), clientSessionId);
|
||||
userSessionUpdateTx.addTask(sessionToImportInto.getId(), registerClientSessionTask);
|
||||
SessionUpdateTask<UserSessionEntity> registerClientSessionTask = new ClientSessionPersistentChangelogBasedTransaction.RegisterClientSessionTask(clientSession.getClient().getId(), clientSessionId, offline);
|
||||
sessionTx.addTask(sessionToImportInto.getId(), registerClientSessionTask);
|
||||
|
||||
return new AuthenticatedClientSessionAdapter(session, this, entity, clientSession.getClient(), sessionToImportInto, clientSessionUpdateTx, offline);
|
||||
return new AuthenticatedClientSessionAdapter(session, this, entity, clientSession.getClient(), sessionToImportInto, clientSessionTx, offline);
|
||||
}
|
||||
|
||||
|
||||
|
@ -996,6 +944,7 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
entity.setTimestamp(clientSession.getTimestamp());
|
||||
entity.setCurrentRefreshToken(clientSession.getCurrentRefreshToken());
|
||||
entity.setCurrentRefreshTokenUseCount(clientSession.getCurrentRefreshTokenUseCount());
|
||||
entity.setOffline(offline);
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
@ -1007,10 +956,7 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
return null;
|
||||
}
|
||||
|
||||
InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx = getTransaction(offline);
|
||||
userSessionUpdateTx.addTask(userSessionEntity.getId(), null, userSessionEntity, UserSessionModel.SessionPersistenceState.PERSISTENT);
|
||||
|
||||
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx = getClientSessionTransaction(offline);
|
||||
sessionTx.addTask(userSessionEntity.getId(), null, userSessionEntity, UserSessionModel.SessionPersistenceState.PERSISTENT);
|
||||
|
||||
for (Map.Entry<String, AuthenticatedClientSessionModel> entry : persistentUserSession.getAuthenticatedClientSessions().entrySet()) {
|
||||
String clientUUID = entry.getKey();
|
||||
|
@ -1029,10 +975,10 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
// Update userSession entity with the clientSession
|
||||
AuthenticatedClientSessionStore clientSessions = userSessionEntity.getAuthenticatedClientSessions();
|
||||
clientSessions.put(clientUUID, clientSession.getId());
|
||||
clientSessionUpdateTx.addTask(clientSession.getId(), null, clientSession, UserSessionModel.SessionPersistenceState.PERSISTENT);
|
||||
clientSessionTx.addTask(clientSession.getId(), null, clientSession, UserSessionModel.SessionPersistenceState.PERSISTENT);
|
||||
}
|
||||
|
||||
return userSessionUpdateTx.get(userSessionEntity.getId());
|
||||
return sessionTx.get(userSessionEntity.getId(), offline);
|
||||
|
||||
}
|
||||
|
||||
|
@ -1048,33 +994,6 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
|
|||
return idleChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG || lifetimeChecker.apply(realm, null, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG;
|
||||
}
|
||||
|
||||
private static class RegisterClientSessionTask implements SessionUpdateTask<UserSessionEntity> {
|
||||
|
||||
private final String clientUuid;
|
||||
private final UUID clientSessionId;
|
||||
|
||||
public RegisterClientSessionTask(String clientUuid, UUID clientSessionId) {
|
||||
this.clientUuid = clientUuid;
|
||||
this.clientSessionId = clientSessionId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void runUpdate(UserSessionEntity session) {
|
||||
AuthenticatedClientSessionStore clientSessions = session.getAuthenticatedClientSessions();
|
||||
clientSessions.put(clientUuid, clientSessionId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CacheOperation getOperation(UserSessionEntity session) {
|
||||
return CacheOperation.REPLACE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper<UserSessionEntity> sessionWrapper) {
|
||||
return CrossDCMessageStatus.SYNC;
|
||||
}
|
||||
}
|
||||
|
||||
public static UUID createClientSessionUUID(String userSessionId, String clientId) {
|
||||
// This allows creating a UUID that is constant even if the entry is reloaded from the database
|
||||
return UUID.nameUUIDFromBytes((userSessionId + clientId).getBytes(StandardCharsets.UTF_8));
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.keycloak.models.RealmModel;
|
|||
import org.keycloak.models.UserModel;
|
||||
import org.keycloak.models.UserSessionModel;
|
||||
import org.keycloak.models.UserSessionProvider;
|
||||
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionsChangelogBasedTransaction;
|
||||
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshChecker;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
||||
import org.keycloak.models.sessions.infinispan.changes.Tasks;
|
||||
|
@ -53,9 +53,9 @@ public class UserSessionAdapter<T extends SessionRefreshStore & UserSessionProvi
|
|||
|
||||
private final T provider;
|
||||
|
||||
private final InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx;
|
||||
private final SessionsChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx;
|
||||
|
||||
private final InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx;
|
||||
private final SessionsChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx;
|
||||
|
||||
private final RealmModel realm;
|
||||
|
||||
|
@ -68,8 +68,8 @@ public class UserSessionAdapter<T extends SessionRefreshStore & UserSessionProvi
|
|||
private SessionPersistenceState persistenceState;
|
||||
|
||||
public UserSessionAdapter(KeycloakSession session, UserModel user, T provider,
|
||||
InfinispanChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx,
|
||||
InfinispanChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx,
|
||||
SessionsChangelogBasedTransaction<String, UserSessionEntity> userSessionUpdateTx,
|
||||
SessionsChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> clientSessionUpdateTx,
|
||||
RealmModel realm, UserSessionEntity entity, boolean offline) {
|
||||
this.session = session;
|
||||
this.user = user;
|
||||
|
@ -144,6 +144,11 @@ public class UserSessionAdapter<T extends SessionRefreshStore & UserSessionProvi
|
|||
public void runUpdate(UserSessionEntity entity) {
|
||||
removedClientUUIDS.forEach(entity.getAuthenticatedClientSessions()::remove);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
};
|
||||
update(task);
|
||||
}
|
||||
|
@ -155,7 +160,7 @@ public class UserSessionAdapter<T extends SessionRefreshStore & UserSessionProvi
|
|||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
clientSessionUuids.forEach(clientSessionId -> this.clientSessionUpdateTx.addTask(clientSessionId, Tasks.removeSync()));
|
||||
clientSessionUuids.forEach(clientSessionId -> this.clientSessionUpdateTx.addTask(clientSessionId, Tasks.removeSync(offline)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -250,6 +255,11 @@ public class UserSessionAdapter<T extends SessionRefreshStore & UserSessionProvi
|
|||
.shouldSaveUserSessionToRemoteCache(UserSessionAdapter.this.session, UserSessionAdapter.this.realm, sessionWrapper, offline, lastSessionRefresh);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "setLastSessionRefresh(" + lastSessionRefresh + ')';
|
||||
|
@ -284,6 +294,10 @@ public class UserSessionAdapter<T extends SessionRefreshStore & UserSessionProvi
|
|||
entity.getNotes().put(name, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
};
|
||||
|
||||
update(task);
|
||||
|
@ -298,6 +312,10 @@ public class UserSessionAdapter<T extends SessionRefreshStore & UserSessionProvi
|
|||
entity.getNotes().remove(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
};
|
||||
|
||||
update(task);
|
||||
|
@ -317,6 +335,11 @@ public class UserSessionAdapter<T extends SessionRefreshStore & UserSessionProvi
|
|||
public void setState(State state) {
|
||||
UserSessionUpdateTask task = new UserSessionUpdateTask() {
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void runUpdate(UserSessionEntity entity) {
|
||||
entity.setState(state);
|
||||
|
@ -340,6 +363,11 @@ public class UserSessionAdapter<T extends SessionRefreshStore & UserSessionProvi
|
|||
public void restartSession(RealmModel realm, UserModel user, String loginUsername, String ipAddress, String authMethod, boolean rememberMe, String brokerSessionId, String brokerUserId) {
|
||||
UserSessionUpdateTask task = new UserSessionUpdateTask() {
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void runUpdate(UserSessionEntity entity) {
|
||||
InfinispanUserSessionProvider.updateSessionEntity(entity, realm, user, loginUsername, ipAddress, authMethod, rememberMe, brokerSessionId, brokerUserId);
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.keycloak.models.sessions.infinispan.changes;
|
|||
|
||||
import org.infinispan.Cache;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.common.Profile;
|
||||
import org.keycloak.models.AuthenticatedClientSessionModel;
|
||||
import org.keycloak.models.ClientModel;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
|
@ -32,7 +33,6 @@ import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessi
|
|||
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionStore;
|
||||
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
|
||||
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
|
||||
import org.keycloak.models.sessions.infinispan.util.InfinispanKeyGenerator;
|
||||
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
|
||||
|
||||
import java.util.UUID;
|
||||
|
@ -42,23 +42,32 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
public class ClientSessionPersistentChangelogBasedTransaction extends PersistentSessionsChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> {
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(ClientSessionPersistentChangelogBasedTransaction.class);
|
||||
private final InfinispanKeyGenerator keyGenerator;
|
||||
private final UserSessionPersistentChangelogBasedTransaction userSessionTx;
|
||||
|
||||
public ClientSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<AuthenticatedClientSessionEntity> lifespanMsLoader, SessionFunction<AuthenticatedClientSessionEntity> maxIdleTimeMsLoader, boolean offline, InfinispanKeyGenerator keyGenerator,
|
||||
UserSessionPersistentChangelogBasedTransaction userSessionTx, SerializeExecutionsByKey<UUID> serializer, ArrayBlockingQueue<PersistentUpdate> batchingQueue) {
|
||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer, batchingQueue);
|
||||
this.keyGenerator = keyGenerator;
|
||||
public ClientSessionPersistentChangelogBasedTransaction(KeycloakSession session,
|
||||
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> cache,
|
||||
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineCache,
|
||||
RemoteCacheInvoker remoteCacheInvoker,
|
||||
SessionFunction<AuthenticatedClientSessionEntity> lifespanMsLoader,
|
||||
SessionFunction<AuthenticatedClientSessionEntity> maxIdleTimeMsLoader,
|
||||
SessionFunction<AuthenticatedClientSessionEntity> offlineLifespanMsLoader,
|
||||
SessionFunction<AuthenticatedClientSessionEntity> offlineMaxIdleTimeMsLoader,
|
||||
UserSessionPersistentChangelogBasedTransaction userSessionTx,
|
||||
ArrayBlockingQueue<PersistentUpdate> batchingQueue) {
|
||||
super(session, cache, offlineCache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue);
|
||||
this.userSessionTx = userSessionTx;
|
||||
}
|
||||
|
||||
public SessionEntityWrapper<AuthenticatedClientSessionEntity> get(RealmModel realm, ClientModel client, UserSessionModel userSession, UUID key) {
|
||||
SessionUpdatesList<AuthenticatedClientSessionEntity> myUpdates = updates.get(key);
|
||||
public SessionEntityWrapper<AuthenticatedClientSessionEntity> get(RealmModel realm, ClientModel client, UserSessionModel userSession, UUID key, boolean offline) {
|
||||
SessionUpdatesList<AuthenticatedClientSessionEntity> myUpdates = getUpdates(offline).get(key);
|
||||
if (myUpdates == null) {
|
||||
SessionEntityWrapper<AuthenticatedClientSessionEntity> wrappedEntity = cache.get(key);
|
||||
SessionEntityWrapper<AuthenticatedClientSessionEntity> wrappedEntity = null;
|
||||
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS_NO_CACHE)) {
|
||||
wrappedEntity = getCache(offline).get(key);
|
||||
}
|
||||
if (wrappedEntity == null) {
|
||||
LOG.debugf("client-session not found in cache for sessionId=%s, offline=%s, loading from persister", key, offline);
|
||||
wrappedEntity = getSessionEntityFromPersister(realm, client, userSession);
|
||||
wrappedEntity = getSessionEntityFromPersister(realm, client, userSession, offline);
|
||||
} else {
|
||||
LOG.debugf("client-session found in cache for sessionId=%s, offline=%s", key, offline);
|
||||
}
|
||||
|
@ -68,6 +77,9 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
|
|||
return null;
|
||||
}
|
||||
|
||||
// Cache does not contain the offline flag value so adding it
|
||||
wrappedEntity.getEntity().setOffline(offline);
|
||||
|
||||
RealmModel realmFromSession = kcSession.realms().getRealm(wrappedEntity.getEntity().getRealmId());
|
||||
if (!realmFromSession.getId().equals(realm.getId())) {
|
||||
LOG.warnf("Realm mismatch for session %s. Expected realm %s, but found realm %s", wrappedEntity.getEntity(), realm.getId(), realmFromSession.getId());
|
||||
|
@ -75,7 +87,7 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
|
|||
}
|
||||
|
||||
myUpdates = new SessionUpdatesList<>(realm, wrappedEntity);
|
||||
updates.put(key, myUpdates);
|
||||
getUpdates(offline).put(key, myUpdates);
|
||||
|
||||
return wrappedEntity;
|
||||
} else {
|
||||
|
@ -92,7 +104,7 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
|
|||
}
|
||||
}
|
||||
|
||||
private SessionEntityWrapper<AuthenticatedClientSessionEntity> getSessionEntityFromPersister(RealmModel realm, ClientModel client, UserSessionModel userSession) {
|
||||
private SessionEntityWrapper<AuthenticatedClientSessionEntity> getSessionEntityFromPersister(RealmModel realm, ClientModel client, UserSessionModel userSession, boolean offline) {
|
||||
UserSessionPersisterProvider persister = kcSession.getProvider(UserSessionPersisterProvider.class);
|
||||
AuthenticatedClientSessionModel clientSession = persister.loadClientSession(realm, client, userSession, offline);
|
||||
|
||||
|
@ -123,6 +135,7 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
|
|||
entity.setClientId(clientId);
|
||||
entity.setRedirectUri(clientSession.getRedirectUri());
|
||||
entity.setTimestamp(clientSession.getTimestamp());
|
||||
entity.setOffline(clientSession.getUserSession().isOffline());
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
@ -130,14 +143,15 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
|
|||
private SessionEntityWrapper<AuthenticatedClientSessionEntity> importClientSession(RealmModel realm, ClientModel client, UserSessionModel userSession, AuthenticatedClientSessionModel persistentClientSession) {
|
||||
AuthenticatedClientSessionEntity entity = createAuthenticatedClientSessionInstance(userSession.getId(), persistentClientSession,
|
||||
realm.getId(), client.getId());
|
||||
boolean offline = userSession.isOffline();
|
||||
|
||||
entity.setUserSessionId(userSession.getId());
|
||||
|
||||
// Update timestamp to same value as userSession. LastSessionRefresh of userSession from DB will have correct value
|
||||
entity.setTimestamp(userSession.getLastSessionRefresh());
|
||||
|
||||
if (maxIdleTimeMsLoader.apply(realm, client, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG
|
||||
|| lifespanMsLoader.apply(realm, client, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG) {
|
||||
if (getMaxIdleMsLoader(offline).apply(realm, client, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG
|
||||
|| getLifespanMsLoader(offline).apply(realm, client, entity) == SessionTimeouts.ENTRY_EXPIRED_FLAG) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -154,20 +168,22 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
|
|||
AuthenticatedClientSessionStore clientSessions = sessionToImportInto.getEntity().getAuthenticatedClientSessions();
|
||||
clientSessions.put(client.getId(), clientSessionId);
|
||||
|
||||
SessionUpdateTask registerClientSessionTask = new RegisterClientSessionTask(client.getId(), clientSessionId);
|
||||
SessionUpdateTask registerClientSessionTask = new RegisterClientSessionTask(client.getId(), clientSessionId, offline);
|
||||
userSessionTx.addTask(sessionToImportInto.getId(), registerClientSessionTask);
|
||||
|
||||
return new SessionEntityWrapper<>(entity);
|
||||
}
|
||||
|
||||
private static class RegisterClientSessionTask implements SessionUpdateTask<UserSessionEntity> {
|
||||
public static class RegisterClientSessionTask implements PersistentSessionUpdateTask<UserSessionEntity> {
|
||||
|
||||
private final String clientUuid;
|
||||
private final UUID clientSessionId;
|
||||
private final boolean offline;
|
||||
|
||||
public RegisterClientSessionTask(String clientUuid, UUID clientSessionId) {
|
||||
public RegisterClientSessionTask(String clientUuid, UUID clientSessionId, boolean offline) {
|
||||
this.clientUuid = clientUuid;
|
||||
this.clientSessionId = clientSessionId;
|
||||
this.offline = offline;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -185,5 +201,10 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
|
|||
public CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper<UserSessionEntity> sessionWrapper) {
|
||||
return CrossDCMessageStatus.SYNC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessi
|
|||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public abstract class ClientSessionUpdateTask implements SessionUpdateTask<AuthenticatedClientSessionEntity> {
|
||||
public abstract class ClientSessionUpdateTask implements PersistentSessionUpdateTask<AuthenticatedClientSessionEntity> {
|
||||
|
||||
@Override
|
||||
public CacheOperation getOperation(AuthenticatedClientSessionEntity session) {
|
||||
|
|
|
@ -44,7 +44,7 @@ import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.U
|
|||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> extends AbstractKeycloakTransaction {
|
||||
public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> extends AbstractKeycloakTransaction implements SessionsChangelogBasedTransaction<K, V> {
|
||||
|
||||
public static final Logger logger = Logger.getLogger(InfinispanChangelogBasedTransaction.class);
|
||||
|
||||
|
@ -71,6 +71,7 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
|
|||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void addTask(K key, SessionUpdateTask<V> task) {
|
||||
SessionUpdatesList<V> myUpdates = updates.get(key);
|
||||
if (myUpdates == null) {
|
||||
|
|
|
@ -57,23 +57,16 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
|
|||
private static final Logger LOG = Logger.getLogger(JpaChangesPerformer.class);
|
||||
|
||||
private final String cacheName;
|
||||
private final boolean offline;
|
||||
private final List<PersistentUpdate> changes = new LinkedList<>();
|
||||
private final TriConsumer<KeycloakSession, Map.Entry<K, SessionUpdatesList<V>>, MergedUpdate<V>> processor;
|
||||
private final ArrayBlockingQueue<PersistentUpdate> batchingQueue;
|
||||
|
||||
public JpaChangesPerformer(String cacheName, boolean offline, ArrayBlockingQueue<PersistentUpdate> batchingQueue) {
|
||||
public JpaChangesPerformer(String cacheName, ArrayBlockingQueue<PersistentUpdate> batchingQueue) {
|
||||
this.cacheName = cacheName;
|
||||
this.offline = offline;
|
||||
this.batchingQueue = batchingQueue;
|
||||
processor = processor();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean benefitsFromBatching() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerChange(Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged) {
|
||||
changes.add(new PersistentUpdate(innerSession -> processor.accept(innerSession, entry, merged)));
|
||||
|
@ -131,7 +124,7 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
|
|||
|
||||
if (merged.getOperation(sessionWrapper.getEntity()) == SessionUpdateTask.CacheOperation.REMOVE) {
|
||||
AuthenticatedClientSessionEntity entity = (AuthenticatedClientSessionEntity) sessionWrapper.getEntity();
|
||||
userSessionPersister.removeClientSession(entity.getUserSessionId(), entity.getClientId(), offline);
|
||||
userSessionPersister.removeClientSession(entity.getUserSessionId(), entity.getClientId(), entity.isOffline());
|
||||
} else if (merged.getOperation(sessionWrapper.getEntity()) == SessionUpdateTask.CacheOperation.ADD || merged.getOperation(sessionWrapper.getEntity()) == SessionUpdateTask.CacheOperation.ADD_IF_ABSENT){
|
||||
AuthenticatedClientSessionEntity entity = (AuthenticatedClientSessionEntity) sessionWrapper.getEntity();
|
||||
userSessionPersister.createClientSession(new AuthenticatedClientSessionModel() {
|
||||
|
@ -264,7 +257,7 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
|
|||
public void setProtocol(String method) {
|
||||
throw new IllegalStateException("not implemented");
|
||||
}
|
||||
}, offline);
|
||||
}, entity.isOffline());
|
||||
} else {
|
||||
AuthenticatedClientSessionEntity entity = (AuthenticatedClientSessionEntity) sessionWrapper.getEntity();
|
||||
ClientModel client = new ClientModelLazyDelegate(null) {
|
||||
|
@ -279,7 +272,7 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
|
|||
return entity.getUserSessionId();
|
||||
}
|
||||
};
|
||||
PersistentAuthenticatedClientSessionAdapter clientSessionModel = (PersistentAuthenticatedClientSessionAdapter) userSessionPersister.loadClientSession(realm, client, userSession, offline);
|
||||
PersistentAuthenticatedClientSessionAdapter clientSessionModel = (PersistentAuthenticatedClientSessionAdapter) userSessionPersister.loadClientSession(realm, client, userSession, entity.isOffline());
|
||||
if (clientSessionModel != null) {
|
||||
AuthenticatedClientSessionEntity authenticatedClientSessionEntity = new AuthenticatedClientSessionEntity(entity.getId()) {
|
||||
@Override
|
||||
|
@ -413,7 +406,7 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
|
|||
sessionUpdates.getUpdateTasks().forEach(vSessionUpdateTask -> {
|
||||
vSessionUpdateTask.runUpdate((V) authenticatedClientSessionEntity);
|
||||
if (vSessionUpdateTask.getOperation((V) authenticatedClientSessionEntity) == SessionUpdateTask.CacheOperation.REMOVE) {
|
||||
userSessionPersister.removeClientSession(entity.getUserSessionId(), entity.getClientId(), offline);
|
||||
userSessionPersister.removeClientSession(entity.getUserSessionId(), entity.getClientId(), entity.isOffline());
|
||||
}
|
||||
});
|
||||
clientSessionModel.getUpdatedModel();
|
||||
|
@ -427,11 +420,11 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
|
|||
SessionEntityWrapper<V> sessionWrapper = sessionUpdates.getEntityWrapper();
|
||||
RealmModel realm = sessionUpdates.getRealm();
|
||||
UserSessionPersisterProvider userSessionPersister = innerSession.getProvider(UserSessionPersisterProvider.class);
|
||||
|
||||
if (merged.getOperation(sessionWrapper.getEntity()) == SessionUpdateTask.CacheOperation.REMOVE) {
|
||||
userSessionPersister.removeUserSession(entry.getKey().toString(), offline);
|
||||
} else if (merged.getOperation(sessionWrapper.getEntity()) == SessionUpdateTask.CacheOperation.ADD || merged.getOperation(sessionWrapper.getEntity()) == SessionUpdateTask.CacheOperation.ADD_IF_ABSENT){
|
||||
UserSessionEntity entity = (UserSessionEntity) sessionWrapper.getEntity();
|
||||
|
||||
if (merged.getOperation((V) entity) == SessionUpdateTask.CacheOperation.REMOVE) {
|
||||
userSessionPersister.removeUserSession(entry.getKey().toString(), entity.isOffline());
|
||||
} else if (merged.getOperation(sessionWrapper.getEntity()) == SessionUpdateTask.CacheOperation.ADD || merged.getOperation(sessionWrapper.getEntity()) == SessionUpdateTask.CacheOperation.ADD_IF_ABSENT){
|
||||
userSessionPersister.createUserSession(new UserSessionModel() {
|
||||
@Override
|
||||
public String getId() {
|
||||
|
@ -505,7 +498,7 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
|
|||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return offline;
|
||||
return entity.isOffline();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -553,9 +546,9 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
|
|||
public void restartSession(RealmModel realm, UserModel user, String loginUsername, String ipAddress, String authMethod, boolean rememberMe, String brokerSessionId, String brokerUserId) {
|
||||
throw new IllegalStateException("not implemented");
|
||||
}
|
||||
}, offline);
|
||||
}, entity.isOffline());
|
||||
} else {
|
||||
PersistentUserSessionAdapter userSessionModel = (PersistentUserSessionAdapter) userSessionPersister.loadUserSession(realm, entry.getKey().toString(), offline);
|
||||
PersistentUserSessionAdapter userSessionModel = (PersistentUserSessionAdapter) userSessionPersister.loadUserSession(realm, entry.getKey().toString(), entity.isOffline());
|
||||
if (userSessionModel != null) {
|
||||
UserSessionEntity userSessionEntity = new UserSessionEntity() {
|
||||
@Override
|
||||
|
@ -737,7 +730,7 @@ public class JpaChangesPerformer<K, V extends SessionEntity> implements SessionC
|
|||
sessionUpdates.getUpdateTasks().forEach(vSessionUpdateTask -> {
|
||||
vSessionUpdateTask.runUpdate((V) userSessionEntity);
|
||||
if (vSessionUpdateTask.getOperation((V)userSessionEntity) == SessionUpdateTask.CacheOperation.REMOVE) {
|
||||
userSessionPersister.removeUserSession(entry.getKey().toString(), offline);
|
||||
userSessionPersister.removeUserSession(entry.getKey().toString(), entity.isOffline());
|
||||
}
|
||||
});
|
||||
userSessionModel.getUpdatedModel();
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Copyright 2024 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.models.sessions.infinispan.changes;
|
||||
|
||||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||
|
||||
/**
|
||||
* Marker interface for tasks that update persistent sessions
|
||||
*/
|
||||
public interface PersistentSessionUpdateTask<S extends SessionEntity> extends SessionUpdateTask<S> {
|
||||
boolean isOffline();
|
||||
}
|
|
@ -18,7 +18,9 @@
|
|||
package org.keycloak.models.sessions.infinispan.changes;
|
||||
|
||||
import org.infinispan.Cache;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.common.Profile;
|
||||
import org.keycloak.models.AbstractKeycloakTransaction;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.RealmModel;
|
||||
import org.keycloak.models.UserSessionModel;
|
||||
|
@ -26,67 +28,256 @@ import org.keycloak.models.sessions.infinispan.SessionFunction;
|
|||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.USER_SESSION_CACHE_NAME;
|
||||
|
||||
public class PersistentSessionsChangelogBasedTransaction<K, V extends SessionEntity> extends InfinispanChangelogBasedTransaction<K, V> {
|
||||
abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends SessionEntity> extends AbstractKeycloakTransaction implements SessionsChangelogBasedTransaction<K, V> {
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(PersistentSessionsChangelogBasedTransaction.class);
|
||||
protected final KeycloakSession kcSession;
|
||||
protected final Map<K, SessionUpdatesList<V>> updates = new HashMap<>();
|
||||
protected final Map<K, SessionUpdatesList<V>> offlineUpdates = new HashMap<>();
|
||||
private final List<SessionChangesPerformer<K, V>> changesPerformers;
|
||||
protected final boolean offline;
|
||||
private final Cache<K, SessionEntityWrapper<V>> cache;
|
||||
private final Cache<K, SessionEntityWrapper<V>> offlineCache;
|
||||
private final SessionFunction<V> lifespanMsLoader;
|
||||
private final SessionFunction<V> maxIdleTimeMsLoader;
|
||||
private final SessionFunction<V> offlineLifespanMsLoader;
|
||||
private final SessionFunction<V> offlineMaxIdleTimeMsLoader;
|
||||
|
||||
public PersistentSessionsChangelogBasedTransaction(KeycloakSession session, Cache<K, SessionEntityWrapper<V>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<V> lifespanMsLoader, SessionFunction<V> maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey<K> serializer,
|
||||
public PersistentSessionsChangelogBasedTransaction(KeycloakSession session,
|
||||
Cache<K, SessionEntityWrapper<V>> cache,
|
||||
Cache<K, SessionEntityWrapper<V>> offlineCache,
|
||||
RemoteCacheInvoker remoteCacheInvoker,
|
||||
SessionFunction<V> lifespanMsLoader,
|
||||
SessionFunction<V> maxIdleTimeMsLoader,
|
||||
SessionFunction<V> offlineLifespanMsLoader,
|
||||
SessionFunction<V> offlineMaxIdleTimeMsLoader,
|
||||
ArrayBlockingQueue<PersistentUpdate> batchingQueue) {
|
||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, serializer);
|
||||
this.offline = offline;
|
||||
kcSession = session;
|
||||
|
||||
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
||||
throw new IllegalStateException("Persistent user sessions are not enabled");
|
||||
}
|
||||
|
||||
if (Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS_NO_CACHE) &&
|
||||
(cache.getName().equals(USER_SESSION_CACHE_NAME) || cache.getName().equals(CLIENT_SESSION_CACHE_NAME) || cache.getName().equals(OFFLINE_USER_SESSION_CACHE_NAME) || cache.getName().equals(OFFLINE_CLIENT_SESSION_CACHE_NAME))) {
|
||||
if (! (
|
||||
cache.getName().equals(USER_SESSION_CACHE_NAME)
|
||||
|| cache.getName().equals(CLIENT_SESSION_CACHE_NAME)
|
||||
|| cache.getName().equals(OFFLINE_USER_SESSION_CACHE_NAME)
|
||||
|| cache.getName().equals(OFFLINE_CLIENT_SESSION_CACHE_NAME)
|
||||
)) {
|
||||
throw new IllegalStateException("Cache name is not valid for persistent user sessions: " + cache.getName());
|
||||
}
|
||||
|
||||
if (Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS_NO_CACHE)) {
|
||||
changesPerformers = List.of(
|
||||
new JpaChangesPerformer<>(cache.getName(), offline, batchingQueue)
|
||||
new JpaChangesPerformer<>(cache.getName(), batchingQueue)
|
||||
);
|
||||
} else {
|
||||
changesPerformers = List.of(
|
||||
new JpaChangesPerformer<>(cache.getName(), offline, batchingQueue),
|
||||
new EmbeddedCachesChangesPerformer<>(cache),
|
||||
new RemoteCachesChangesPerformer<>(session, cache, remoteCacheInvoker)
|
||||
new JpaChangesPerformer<>(cache.getName(), batchingQueue),
|
||||
new EmbeddedCachesChangesPerformer<>(cache) {
|
||||
@Override
|
||||
public boolean shouldConsumeChange(V entity) {
|
||||
return !entity.isOffline();
|
||||
}
|
||||
},
|
||||
new EmbeddedCachesChangesPerformer<>(offlineCache){
|
||||
@Override
|
||||
public boolean shouldConsumeChange(V entity) {
|
||||
return entity.isOffline();
|
||||
}
|
||||
},
|
||||
new RemoteCachesChangesPerformer<>(session, cache, remoteCacheInvoker) {
|
||||
@Override
|
||||
public boolean shouldConsumeChange(V entity) {
|
||||
return !entity.isOffline();
|
||||
}
|
||||
},
|
||||
new RemoteCachesChangesPerformer<>(session, offlineCache, remoteCacheInvoker) {
|
||||
@Override
|
||||
public boolean shouldConsumeChange(V entity) {
|
||||
return entity.isOffline();
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
this.cache = cache;
|
||||
this.offlineCache = offlineCache;
|
||||
this.lifespanMsLoader = lifespanMsLoader;
|
||||
this.maxIdleTimeMsLoader = maxIdleTimeMsLoader;
|
||||
this.offlineLifespanMsLoader = offlineLifespanMsLoader;
|
||||
this.offlineMaxIdleTimeMsLoader = offlineMaxIdleTimeMsLoader;
|
||||
}
|
||||
|
||||
protected Cache<K, SessionEntityWrapper<V>> getCache(boolean offline) {
|
||||
if (offline) {
|
||||
return offlineCache;
|
||||
} else {
|
||||
return cache;
|
||||
}
|
||||
}
|
||||
|
||||
protected SessionFunction<V> getLifespanMsLoader(boolean offline) {
|
||||
if (offline) {
|
||||
return offlineLifespanMsLoader;
|
||||
} else {
|
||||
return lifespanMsLoader;
|
||||
}
|
||||
}
|
||||
|
||||
protected SessionFunction<V> getMaxIdleMsLoader(boolean offline) {
|
||||
if (offline) {
|
||||
return offlineMaxIdleTimeMsLoader;
|
||||
} else {
|
||||
return maxIdleTimeMsLoader;
|
||||
}
|
||||
}
|
||||
|
||||
protected Map<K, SessionUpdatesList<V>> getUpdates(boolean offline) {
|
||||
if (offline) {
|
||||
return offlineUpdates;
|
||||
} else {
|
||||
return updates;
|
||||
}
|
||||
}
|
||||
|
||||
public SessionEntityWrapper<V> get(K key, boolean offline){
|
||||
SessionUpdatesList<V> myUpdates = getUpdates(offline).get(key);
|
||||
if (myUpdates == null) {
|
||||
SessionEntityWrapper<V> wrappedEntity = getCache(offline).get(key);
|
||||
if (wrappedEntity == null) {
|
||||
return null;
|
||||
}
|
||||
wrappedEntity.getEntity().setOffline(offline);
|
||||
|
||||
RealmModel realm = kcSession.realms().getRealm(wrappedEntity.getEntity().getRealmId());
|
||||
|
||||
myUpdates = new SessionUpdatesList<>(realm, wrappedEntity);
|
||||
getUpdates(offline).put(key, myUpdates);
|
||||
|
||||
return wrappedEntity;
|
||||
} else {
|
||||
V entity = myUpdates.getEntityWrapper().getEntity();
|
||||
|
||||
// If entity is scheduled for remove, we don't return it.
|
||||
boolean scheduledForRemove = myUpdates.getUpdateTasks().stream().filter((SessionUpdateTask task) -> {
|
||||
|
||||
return task.getOperation(entity) == SessionUpdateTask.CacheOperation.REMOVE;
|
||||
|
||||
}).findFirst().isPresent();
|
||||
|
||||
return scheduledForRemove ? null : myUpdates.getEntityWrapper();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void commitImpl() {
|
||||
for (Map.Entry<K, SessionUpdatesList<V>> entry : updates.entrySet()) {
|
||||
for (Map.Entry<K, SessionUpdatesList<V>> entry : Stream.concat(updates.entrySet().stream(), offlineUpdates.entrySet().stream()).toList()) {
|
||||
SessionUpdatesList<V> sessionUpdates = entry.getValue();
|
||||
SessionEntityWrapper<V> sessionWrapper = sessionUpdates.getEntityWrapper();
|
||||
V entity = sessionWrapper.getEntity();
|
||||
boolean isOffline = entity.isOffline();
|
||||
|
||||
// Don't save transient entities to infinispan. They are valid just for current transaction
|
||||
if (sessionUpdates.getPersistenceState() == UserSessionModel.SessionPersistenceState.TRANSIENT) continue;
|
||||
|
||||
RealmModel realm = sessionUpdates.getRealm();
|
||||
|
||||
long lifespanMs = lifespanMsLoader.apply(realm, sessionUpdates.getClient(), sessionWrapper.getEntity());
|
||||
long maxIdleTimeMs = maxIdleTimeMsLoader.apply(realm, sessionUpdates.getClient(), sessionWrapper.getEntity());
|
||||
long lifespanMs = getLifespanMsLoader(isOffline).apply(realm, sessionUpdates.getClient(), entity);
|
||||
long maxIdleTimeMs = getMaxIdleMsLoader(isOffline).apply(realm, sessionUpdates.getClient(), entity);
|
||||
|
||||
MergedUpdate<V> merged = MergedUpdate.computeUpdate(sessionUpdates.getUpdateTasks(), sessionWrapper, lifespanMs, maxIdleTimeMs);
|
||||
|
||||
if (merged != null) {
|
||||
changesPerformers.forEach(p -> p.registerChange(entry, merged));
|
||||
changesPerformers.stream()
|
||||
.filter(performer -> performer.shouldConsumeChange(entity))
|
||||
.forEach(p -> p.registerChange(entry, merged));
|
||||
}
|
||||
}
|
||||
|
||||
changesPerformers.forEach(SessionChangesPerformer::applyChanges);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addTask(K key, SessionUpdateTask<V> originalTask) {
|
||||
if (! (originalTask instanceof PersistentSessionUpdateTask)) {
|
||||
throw new IllegalArgumentException("Task must be instance of PersistentSessionUpdateTask");
|
||||
}
|
||||
|
||||
PersistentSessionUpdateTask<V> task = (PersistentSessionUpdateTask<V>) originalTask;
|
||||
SessionUpdatesList<V> myUpdates = getUpdates(task.isOffline()).get(key);
|
||||
if (myUpdates == null) {
|
||||
if (Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS_NO_CACHE)) {
|
||||
throw new IllegalStateException("Can't load from cache");
|
||||
}
|
||||
|
||||
// Lookup entity from cache
|
||||
SessionEntityWrapper<V> wrappedEntity = getCache(task.isOffline()).get(key);
|
||||
if (wrappedEntity == null) {
|
||||
LOG.tracef("Not present cache item for key %s", key);
|
||||
return;
|
||||
}
|
||||
// Cache does not contain the offline flag value so adding it
|
||||
wrappedEntity.getEntity().setOffline(task.isOffline());
|
||||
|
||||
RealmModel realm = kcSession.realms().getRealm(wrappedEntity.getEntity().getRealmId());
|
||||
|
||||
myUpdates = new SessionUpdatesList<>(realm, wrappedEntity);
|
||||
getUpdates(task.isOffline()).put(key, myUpdates);
|
||||
}
|
||||
|
||||
// Run the update now, so reader in same transaction can see it (TODO: Rollback may not work correctly. See if it's an issue..)
|
||||
task.runUpdate(myUpdates.getEntityWrapper().getEntity());
|
||||
myUpdates.add(task);
|
||||
}
|
||||
|
||||
public void addTask(K key, SessionUpdateTask<V> task, V entity, UserSessionModel.SessionPersistenceState persistenceState) {
|
||||
if (entity == null) {
|
||||
throw new IllegalArgumentException("Null entity not allowed");
|
||||
}
|
||||
|
||||
RealmModel realm = kcSession.realms().getRealm(entity.getRealmId());
|
||||
SessionEntityWrapper<V> wrappedEntity = new SessionEntityWrapper<>(entity);
|
||||
SessionUpdatesList<V> myUpdates = new SessionUpdatesList<>(realm, wrappedEntity, persistenceState);
|
||||
getUpdates(entity.isOffline()).put(key, myUpdates);
|
||||
|
||||
if (task != null) {
|
||||
// Run the update now, so reader in same transaction can see it
|
||||
task.runUpdate(entity);
|
||||
myUpdates.add(task);
|
||||
}
|
||||
}
|
||||
|
||||
public void reloadEntityInCurrentTransaction(RealmModel realm, K key, SessionEntityWrapper<V> entity) {
|
||||
if (entity == null) {
|
||||
throw new IllegalArgumentException("Null entity not allowed");
|
||||
}
|
||||
boolean offline = entity.getEntity().isOffline();
|
||||
|
||||
SessionEntityWrapper<V> latestEntity = getCache(offline).get(key);
|
||||
if (latestEntity == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
SessionUpdatesList<V> newUpdates = new SessionUpdatesList<>(realm, latestEntity);
|
||||
|
||||
SessionUpdatesList<V> existingUpdates = getUpdates(entity.getEntity().isOffline()).get(key);
|
||||
if (existingUpdates != null) {
|
||||
newUpdates.setUpdateTasks(existingUpdates.getUpdateTasks());
|
||||
}
|
||||
|
||||
getUpdates(entity.getEntity().isOffline()).put(key, newUpdates);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollbackImpl() {
|
||||
|
||||
|
|
|
@ -22,11 +22,11 @@ import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
|||
import java.util.Map;
|
||||
|
||||
public interface SessionChangesPerformer<K, V extends SessionEntity> {
|
||||
void registerChange(Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged);
|
||||
|
||||
default boolean benefitsFromBatching() {
|
||||
return false;
|
||||
default boolean shouldConsumeChange(V entity) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void registerChange(Map.Entry<K, SessionUpdatesList<V>> entry, MergedUpdate<V> merged);
|
||||
|
||||
void applyChanges();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Copyright 2024 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.models.sessions.infinispan.changes;
|
||||
|
||||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||
|
||||
public interface SessionsChangelogBasedTransaction<K, V extends SessionEntity> {
|
||||
|
||||
void addTask(K key, SessionUpdateTask<V> task);
|
||||
|
||||
}
|
|
@ -42,7 +42,7 @@ public class Tasks {
|
|||
}
|
||||
};
|
||||
|
||||
private static final SessionUpdateTask<? extends SessionEntity> REMOVE_SYNC = new SessionUpdateTask<SessionEntity>() {
|
||||
private static final SessionUpdateTask<? extends SessionEntity> REMOVE_SYNC = new PersistentSessionUpdateTask<SessionEntity>() {
|
||||
@Override
|
||||
public void runUpdate(SessionEntity entity) {
|
||||
}
|
||||
|
@ -56,6 +56,32 @@ public class Tasks {
|
|||
public CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper<SessionEntity> sessionWrapper) {
|
||||
return CrossDCMessageStatus.SYNC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
private static final SessionUpdateTask<? extends SessionEntity> OFFLINE_REMOVE_SYNC = new PersistentSessionUpdateTask<SessionEntity>() {
|
||||
@Override
|
||||
public void runUpdate(SessionEntity entity) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public CacheOperation getOperation(SessionEntity entity) {
|
||||
return CacheOperation.REMOVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CrossDCMessageStatus getCrossDCMessageStatus(SessionEntityWrapper<SessionEntity> sessionWrapper) {
|
||||
return CrossDCMessageStatus.SYNC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOffline() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -77,4 +103,18 @@ public class Tasks {
|
|||
public static <S extends SessionEntity> SessionUpdateTask<S> removeSync() {
|
||||
return (SessionUpdateTask<S>) REMOVE_SYNC;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a typed task of type {@link CacheOperation#REMOVE} that does no other update. This operation has DC message
|
||||
* status {@link CrossDCMessageStatus#SYNC}.
|
||||
*
|
||||
* @param offline whether the operation should be performed on offline or non-offline session
|
||||
* @param <S>
|
||||
* @return
|
||||
*/
|
||||
public static <S extends SessionEntity> SessionUpdateTask<S> removeSync(boolean offline) {
|
||||
return offline ? (SessionUpdateTask<S>) OFFLINE_REMOVE_SYNC : (SessionUpdateTask<S>) REMOVE_SYNC;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -31,32 +31,35 @@ import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
|||
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
|
||||
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.USER_SESSION_CACHE_NAME;
|
||||
|
||||
public class UserSessionPersistentChangelogBasedTransaction extends PersistentSessionsChangelogBasedTransaction<String, UserSessionEntity> {
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(UserSessionPersistentChangelogBasedTransaction.class);
|
||||
public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session, Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, RemoteCacheInvoker remoteCacheInvoker, SessionFunction<UserSessionEntity> lifespanMsLoader, SessionFunction<UserSessionEntity> maxIdleTimeMsLoader, boolean offline, SerializeExecutionsByKey<String> serializer,
|
||||
|
||||
public UserSessionPersistentChangelogBasedTransaction(KeycloakSession session,
|
||||
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache,
|
||||
Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineCache,
|
||||
RemoteCacheInvoker remoteCacheInvoker,
|
||||
SessionFunction<UserSessionEntity> lifespanMsLoader,
|
||||
SessionFunction<UserSessionEntity> maxIdleTimeMsLoader,
|
||||
SessionFunction<UserSessionEntity> offlineLifespanMsLoader,
|
||||
SessionFunction<UserSessionEntity> offlineMaxIdleTimeMsLoader,
|
||||
ArrayBlockingQueue<PersistentUpdate> batchingQueue) {
|
||||
super(session, cache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offline, serializer, batchingQueue);
|
||||
super(session, cache, offlineCache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue);
|
||||
}
|
||||
|
||||
public SessionEntityWrapper<UserSessionEntity> get(RealmModel realm, String key) {
|
||||
SessionUpdatesList<UserSessionEntity> myUpdates = updates.get(key);
|
||||
public SessionEntityWrapper<UserSessionEntity> get(RealmModel realm, String key, boolean offline) {
|
||||
SessionUpdatesList<UserSessionEntity> myUpdates = getUpdates(offline).get(key);
|
||||
if (myUpdates == null) {
|
||||
SessionEntityWrapper<UserSessionEntity> wrappedEntity = null;
|
||||
if (!((Objects.equals(cache.getName(), USER_SESSION_CACHE_NAME) || Objects.equals(cache.getName(), CLIENT_SESSION_CACHE_NAME) || Objects.equals(cache.getName(), OFFLINE_USER_SESSION_CACHE_NAME) || Objects.equals(cache.getName(), OFFLINE_CLIENT_SESSION_CACHE_NAME)) && Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS_NO_CACHE))) {
|
||||
wrappedEntity = cache.get(key);
|
||||
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS_NO_CACHE)) {
|
||||
wrappedEntity = getCache(offline).get(key);
|
||||
}
|
||||
|
||||
if (wrappedEntity == null) {
|
||||
LOG.debugf("user-session not found in cache for sessionId=%s offline=%s, loading from persister", key, offline);
|
||||
wrappedEntity = getSessionEntityFromPersister(realm, key);
|
||||
wrappedEntity = getSessionEntityFromPersister(realm, key, offline);
|
||||
} else {
|
||||
LOG.debugf("user-session found in cache for sessionId=%s offline=%s %s", key, offline, wrappedEntity.getEntity().getLastSessionRefresh());
|
||||
}
|
||||
|
@ -66,6 +69,9 @@ public class UserSessionPersistentChangelogBasedTransaction extends PersistentSe
|
|||
return null;
|
||||
}
|
||||
|
||||
// Cache does not contain the offline flag value so adding it
|
||||
wrappedEntity.getEntity().setOffline(offline);
|
||||
|
||||
RealmModel realmFromSession = kcSession.realms().getRealm(wrappedEntity.getEntity().getRealmId());
|
||||
if (!realmFromSession.getId().equals(realm.getId())) {
|
||||
LOG.warnf("Realm mismatch for session %s. Expected realm %s, but found realm %s", wrappedEntity.getEntity(), realm.getId(), realmFromSession.getId());
|
||||
|
@ -73,7 +79,7 @@ public class UserSessionPersistentChangelogBasedTransaction extends PersistentSe
|
|||
}
|
||||
|
||||
myUpdates = new SessionUpdatesList<>(realm, wrappedEntity);
|
||||
updates.put(key, myUpdates);
|
||||
getUpdates(offline).put(key, myUpdates);
|
||||
|
||||
return wrappedEntity;
|
||||
} else {
|
||||
|
@ -90,7 +96,7 @@ public class UserSessionPersistentChangelogBasedTransaction extends PersistentSe
|
|||
}
|
||||
}
|
||||
|
||||
public SessionEntityWrapper<UserSessionEntity> getSessionEntityFromPersister(RealmModel realm, String key) {
|
||||
private SessionEntityWrapper<UserSessionEntity> getSessionEntityFromPersister(RealmModel realm, String key, boolean offline) {
|
||||
UserSessionPersisterProvider persister = kcSession.getProvider(UserSessionPersisterProvider.class);
|
||||
UserSessionModel persistentUserSession = persister.loadUserSession(realm, key, offline);
|
||||
|
||||
|
@ -101,24 +107,20 @@ public class UserSessionPersistentChangelogBasedTransaction extends PersistentSe
|
|||
return importUserSession(persistentUserSession);
|
||||
}
|
||||
|
||||
private void removeSessionEntityFromPersister(String key) {
|
||||
UserSessionPersisterProvider persister = kcSession.getProvider(UserSessionPersisterProvider.class);
|
||||
persister.removeUserSession(key, offline);
|
||||
}
|
||||
|
||||
private SessionEntityWrapper<UserSessionEntity> importUserSession(UserSessionModel persistentUserSession) {
|
||||
String sessionId = persistentUserSession.getId();
|
||||
boolean offline = persistentUserSession.isOffline();
|
||||
|
||||
if (isScheduledForRemove(sessionId)) {
|
||||
if (isScheduledForRemove(sessionId, offline)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS_NO_CACHE) && (cache.getName().equals(USER_SESSION_CACHE_NAME) || cache.getName().equals(CLIENT_SESSION_CACHE_NAME) || cache.getName().equals(OFFLINE_USER_SESSION_CACHE_NAME) || cache.getName().equals(OFFLINE_CLIENT_SESSION_CACHE_NAME))) {
|
||||
if (Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS_NO_CACHE)) {
|
||||
return ((PersistentUserSessionProvider) kcSession.getProvider(UserSessionProvider.class)).wrapPersistentEntity(persistentUserSession.getRealm(), offline, persistentUserSession);
|
||||
}
|
||||
|
||||
LOG.debugf("Attempting to import user-session for sessionId=%s offline=%s", sessionId, offline);
|
||||
SessionEntityWrapper<UserSessionEntity> ispnUserSessionEntity = ((PersistentUserSessionProvider) kcSession.getProvider(UserSessionProvider.class)).importUserSession(persistentUserSession, offline);;
|
||||
SessionEntityWrapper<UserSessionEntity> ispnUserSessionEntity = ((PersistentUserSessionProvider) kcSession.getProvider(UserSessionProvider.class)).importUserSession(persistentUserSession, offline);
|
||||
|
||||
if (ispnUserSessionEntity != null) {
|
||||
LOG.debugf("user-session found after import for sessionId=%s offline=%s", sessionId, offline);
|
||||
|
@ -128,8 +130,9 @@ public class UserSessionPersistentChangelogBasedTransaction extends PersistentSe
|
|||
LOG.debugf("user-session could not be found after import for sessionId=%s offline=%s", sessionId, offline);
|
||||
return null;
|
||||
}
|
||||
public boolean isScheduledForRemove(String key) {
|
||||
return isScheduledForRemove(updates.get(key));
|
||||
|
||||
public boolean isScheduledForRemove(String key, boolean offline) {
|
||||
return isScheduledForRemove(getUpdates(offline).get(key));
|
||||
}
|
||||
|
||||
private static <V extends SessionEntity> boolean isScheduledForRemove(SessionUpdatesList<V> myUpdates) {
|
||||
|
|
|
@ -22,7 +22,7 @@ import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
|
|||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public abstract class UserSessionUpdateTask implements SessionUpdateTask<UserSessionEntity> {
|
||||
public abstract class UserSessionUpdateTask implements PersistentSessionUpdateTask<UserSessionEntity> {
|
||||
|
||||
@Override
|
||||
public CacheOperation getOperation(UserSessionEntity session) {
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.keycloak.models.sessions.infinispan.entities;
|
|||
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.keycloak.common.Profile;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
||||
|
||||
/**
|
||||
|
@ -32,6 +33,7 @@ import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
|||
public abstract class SessionEntity implements Serializable {
|
||||
|
||||
private String realmId;
|
||||
private boolean isOffline;
|
||||
|
||||
/**
|
||||
* Returns realmId ID.
|
||||
|
@ -66,4 +68,17 @@ public abstract class SessionEntity implements Serializable {
|
|||
@Override
|
||||
public abstract int hashCode();
|
||||
|
||||
public boolean isOffline() {
|
||||
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
||||
throw new IllegalArgumentException("Offline flags are not supported in non-persistent-session environments.");
|
||||
}
|
||||
return isOffline;
|
||||
}
|
||||
|
||||
public void setOffline(boolean offline) {
|
||||
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
|
||||
throw new IllegalArgumentException("Offline flags are not supported in non-persistent-session environments.");
|
||||
}
|
||||
isOffline = offline;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue