From c6e071cf07022fe7d82d7a30714596cbe3b6bbb2 Mon Sep 17 00:00:00 2001 From: Alexander Schwartz Date: Thu, 23 May 2024 14:47:32 +0200 Subject: [PATCH] Clear entries in remote caches and force events on the remote site (#29597) Closes #29592 Signed-off-by: Alexander Schwartz --- .../PersistentUserSessionProvider.java | 87 ++++++++++++------- .../remotestore/RemoteCacheInvoker.java | 14 +++ 2 files changed, 72 insertions(+), 29 deletions(-) diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java index 2c34bf6221..810cced977 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java @@ -520,10 +520,10 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi @Override public void removeUserSessions(RealmModel realm) { - // Don't send message to all DCs, just to all cluster nodes in current DC. The remoteCache will notify client listeners for removed userSessions. + // Send message to all DCs as each site might have different entries in the cache clusterEventsSenderTx.addEvent( RemoveUserSessionsEvent.createEvent(RemoveUserSessionsEvent.class, InfinispanUserSessionProviderFactory.REMOVE_USER_SESSIONS_EVENT, session, realm.getId(), true), - ClusterProvider.DCNotify.LOCAL_DC_ONLY); + ClusterProvider.DCNotify.ALL_DCS); session.getProvider(UserSessionPersisterProvider.class).removeUserSessions(realm, false); } @@ -535,8 +535,6 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi // public for usage in the testsuite public void removeLocalUserSessions(String realmId, boolean offline) { - FuturesHelper futures = new FuturesHelper(); - Cache> cache = getCache(offline); Cache> localCache = CacheDecorators.localCache(cache); Cache> clientSessionCache = getClientSessionCache(offline); @@ -546,40 +544,71 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi final AtomicInteger userSessionsSize = new AtomicInteger(); - localCacheStoreIgnore - .entrySet() - .stream() - .filter(SessionPredicate.create(realmId)) - .map(Mappers.userSessionEntity()) - .forEach(new Consumer() { + removeEntriesByRealm(realmId, localCacheStoreIgnore, userSessionsSize, localCache, localClientSessionCache); - @Override - public void accept(UserSessionEntity userSessionEntity) { - userSessionsSize.incrementAndGet(); - - // Remove session from remoteCache too. Use removeAsync for better perf - Future future = localCache.removeAsync(userSessionEntity.getId()); - futures.addTask(future); - userSessionEntity.getAuthenticatedClientSessions().forEach((clientUUID, clientSessionId) -> { - Future f = localClientSessionCache.removeAsync(clientSessionId); - futures.addTask(f); - }); - } - - }); - - - futures.waitForAllToFinish(); + // TODO: This now runs on each node on each site. Ideally it should run only once on each site. + removeEntriesByRealmRemote(realmId, InfinispanUtil.getRemoteCache(getCache(offline)), userSessionsSize, InfinispanUtil.getRemoteCache(getClientSessionCache(offline))); log.debugf("Removed %d sessions in realm %s. Offline: %b", (Object) userSessionsSize.get(), realmId, offline); } + private static void removeEntriesByRealm(String realmId, Cache> sessions, AtomicInteger userSessionsSize, Cache> localCache, Cache> clientSessions) { + FuturesHelper futures = new FuturesHelper(); + + sessions + .entrySet() + .stream() + .filter(SessionPredicate.create(realmId)) + .map(Mappers.userSessionEntity()) + .forEach((Consumer) userSessionEntity -> { + userSessionsSize.incrementAndGet(); + + // Remove session from remoteCache too. Use removeAsync for better perf + Future> future = localCache.removeAsync(userSessionEntity.getId()); + futures.addTask(future); + userSessionEntity.getAuthenticatedClientSessions().forEach((clientUUID, clientSessionId) -> { + Future> f = clientSessions.removeAsync(clientSessionId); + futures.addTask(f); + }); + }); + + futures.waitForAllToFinish(); + } + + private static void removeEntriesByRealmRemote(String realmId, RemoteCache> sessions, AtomicInteger userSessionsSize, RemoteCache> clientSessions) { + if (sessions == null) { + return; + } + + FuturesHelper futures = new FuturesHelper(); + + sessions + .entrySet() + .stream() + .filter(SessionPredicate.create(realmId)) + .map(Mappers.userSessionEntity()) + .forEach((Consumer) userSessionEntity -> { + userSessionsSize.incrementAndGet(); + + Future> future = sessions.withFlags(org.infinispan.client.hotrod.Flag.SKIP_LISTENER_NOTIFICATION).removeAsync(userSessionEntity.getId()); + futures.addTask(future); + if (clientSessions != null) { + userSessionEntity.getAuthenticatedClientSessions().forEach((clientUUID, clientSessionId) -> { + Future> f = clientSessions.withFlags(org.infinispan.client.hotrod.Flag.SKIP_LISTENER_NOTIFICATION).removeAsync(clientSessionId); + futures.addTask(f); + }); + } + }); + + futures.waitForAllToFinish(); + } + @Override public void onRealmRemoved(RealmModel realm) { - // Don't send message to all DCs, just to all cluster nodes in current DC. The remoteCache will notify client listeners for removed userSessions. + // Send message to all DCs, as each DC might have different entries in their site cache clusterEventsSenderTx.addEvent( RealmRemovedSessionEvent.createEvent(RealmRemovedSessionEvent.class, InfinispanUserSessionProviderFactory.REALM_REMOVED_SESSION_EVENT, session, realm.getId(), true), - ClusterProvider.DCNotify.LOCAL_DC_ONLY); + ClusterProvider.DCNotify.ALL_DCS); UserSessionPersisterProvider sessionsPersister = session.getProvider(UserSessionPersisterProvider.class); if (sessionsPersister != null) { diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java index 3e76df5a70..45283d7eab 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java @@ -18,6 +18,7 @@ package org.keycloak.models.sessions.infinispan.remotestore; import org.infinispan.client.hotrod.exceptions.HotRodClientException; +import org.keycloak.common.Profile; import org.keycloak.common.util.Retry; import java.util.Collections; import java.util.HashMap; @@ -38,6 +39,11 @@ import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask; import org.keycloak.models.sessions.infinispan.entities.SessionEntity; import org.keycloak.connections.infinispan.InfinispanUtil; +import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME; +import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME; +import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME; +import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.USER_SESSION_CACHE_NAME; + /** * @author Marek Posolda */ @@ -146,6 +152,14 @@ public class RemoteCacheInvoker { VersionedValue> versioned = remoteCache.getWithMetadata(key); if (versioned == null) { + if (Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS) && + (remoteCache.getName().equals(USER_SESSION_CACHE_NAME) + || remoteCache.getName().equals(CLIENT_SESSION_CACHE_NAME) + || remoteCache.getName().equals(OFFLINE_USER_SESSION_CACHE_NAME) + || remoteCache.getName().equals(OFFLINE_CLIENT_SESSION_CACHE_NAME))) { + logger.debugf("No existing entry for %s in the remote cache to remove, might have been evicted. A delete will force an eviction in the other DC.", key); + remoteCache.remove(key); + } logger.warnf("Not found entity to replace for key '%s'", key); return; }