Clear entries in remote caches and force events on the remote site (#29597)

Closes #29592

Signed-off-by: Alexander Schwartz <aschwart@redhat.com>
This commit is contained in:
Alexander Schwartz 2024-05-23 14:47:32 +02:00 committed by GitHub
parent 2efc163b89
commit c6e071cf07
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 72 additions and 29 deletions

View file

@ -520,10 +520,10 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
@Override
public void removeUserSessions(RealmModel realm) {
// Don't send message to all DCs, just to all cluster nodes in current DC. The remoteCache will notify client listeners for removed userSessions.
// Send message to all DCs as each site might have different entries in the cache
clusterEventsSenderTx.addEvent(
RemoveUserSessionsEvent.createEvent(RemoveUserSessionsEvent.class, InfinispanUserSessionProviderFactory.REMOVE_USER_SESSIONS_EVENT, session, realm.getId(), true),
ClusterProvider.DCNotify.LOCAL_DC_ONLY);
ClusterProvider.DCNotify.ALL_DCS);
session.getProvider(UserSessionPersisterProvider.class).removeUserSessions(realm, false);
}
@ -535,8 +535,6 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
// public for usage in the testsuite
public void removeLocalUserSessions(String realmId, boolean offline) {
FuturesHelper futures = new FuturesHelper();
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = getCache(offline);
Cache<String, SessionEntityWrapper<UserSessionEntity>> localCache = CacheDecorators.localCache(cache);
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionCache = getClientSessionCache(offline);
@ -546,40 +544,71 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
final AtomicInteger userSessionsSize = new AtomicInteger();
localCacheStoreIgnore
.entrySet()
.stream()
.filter(SessionPredicate.create(realmId))
.map(Mappers.userSessionEntity())
.forEach(new Consumer<UserSessionEntity>() {
removeEntriesByRealm(realmId, localCacheStoreIgnore, userSessionsSize, localCache, localClientSessionCache);
@Override
public void accept(UserSessionEntity userSessionEntity) {
userSessionsSize.incrementAndGet();
// Remove session from remoteCache too. Use removeAsync for better perf
Future future = localCache.removeAsync(userSessionEntity.getId());
futures.addTask(future);
userSessionEntity.getAuthenticatedClientSessions().forEach((clientUUID, clientSessionId) -> {
Future f = localClientSessionCache.removeAsync(clientSessionId);
futures.addTask(f);
});
}
});
futures.waitForAllToFinish();
// TODO: This now runs on each node on each site. Ideally it should run only once on each site.
removeEntriesByRealmRemote(realmId, InfinispanUtil.getRemoteCache(getCache(offline)), userSessionsSize, InfinispanUtil.getRemoteCache(getClientSessionCache(offline)));
log.debugf("Removed %d sessions in realm %s. Offline: %b", (Object) userSessionsSize.get(), realmId, offline);
}
private static void removeEntriesByRealm(String realmId, Cache<String, SessionEntityWrapper<UserSessionEntity>> sessions, AtomicInteger userSessionsSize, Cache<String, SessionEntityWrapper<UserSessionEntity>> localCache, Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessions) {
FuturesHelper futures = new FuturesHelper();
sessions
.entrySet()
.stream()
.filter(SessionPredicate.create(realmId))
.map(Mappers.userSessionEntity())
.forEach((Consumer<UserSessionEntity>) userSessionEntity -> {
userSessionsSize.incrementAndGet();
// Remove session from remoteCache too. Use removeAsync for better perf
Future<SessionEntityWrapper<UserSessionEntity>> future = localCache.removeAsync(userSessionEntity.getId());
futures.addTask(future);
userSessionEntity.getAuthenticatedClientSessions().forEach((clientUUID, clientSessionId) -> {
Future<SessionEntityWrapper<AuthenticatedClientSessionEntity>> f = clientSessions.removeAsync(clientSessionId);
futures.addTask(f);
});
});
futures.waitForAllToFinish();
}
private static void removeEntriesByRealmRemote(String realmId, RemoteCache<String, SessionEntityWrapper<UserSessionEntity>> sessions, AtomicInteger userSessionsSize, RemoteCache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessions) {
if (sessions == null) {
return;
}
FuturesHelper futures = new FuturesHelper();
sessions
.entrySet()
.stream()
.filter(SessionPredicate.create(realmId))
.map(Mappers.userSessionEntity())
.forEach((Consumer<UserSessionEntity>) userSessionEntity -> {
userSessionsSize.incrementAndGet();
Future<SessionEntityWrapper<UserSessionEntity>> future = sessions.withFlags(org.infinispan.client.hotrod.Flag.SKIP_LISTENER_NOTIFICATION).removeAsync(userSessionEntity.getId());
futures.addTask(future);
if (clientSessions != null) {
userSessionEntity.getAuthenticatedClientSessions().forEach((clientUUID, clientSessionId) -> {
Future<SessionEntityWrapper<AuthenticatedClientSessionEntity>> f = clientSessions.withFlags(org.infinispan.client.hotrod.Flag.SKIP_LISTENER_NOTIFICATION).removeAsync(clientSessionId);
futures.addTask(f);
});
}
});
futures.waitForAllToFinish();
}
@Override
public void onRealmRemoved(RealmModel realm) {
// Don't send message to all DCs, just to all cluster nodes in current DC. The remoteCache will notify client listeners for removed userSessions.
// Send message to all DCs, as each DC might have different entries in their site cache
clusterEventsSenderTx.addEvent(
RealmRemovedSessionEvent.createEvent(RealmRemovedSessionEvent.class, InfinispanUserSessionProviderFactory.REALM_REMOVED_SESSION_EVENT, session, realm.getId(), true),
ClusterProvider.DCNotify.LOCAL_DC_ONLY);
ClusterProvider.DCNotify.ALL_DCS);
UserSessionPersisterProvider sessionsPersister = session.getProvider(UserSessionPersisterProvider.class);
if (sessionsPersister != null) {

View file

@ -18,6 +18,7 @@
package org.keycloak.models.sessions.infinispan.remotestore;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.keycloak.common.Profile;
import org.keycloak.common.util.Retry;
import java.util.Collections;
import java.util.HashMap;
@ -38,6 +39,11 @@ import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.connections.infinispan.InfinispanUtil;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.USER_SESSION_CACHE_NAME;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
@ -146,6 +152,14 @@ public class RemoteCacheInvoker {
VersionedValue<SessionEntityWrapper<V>> versioned = remoteCache.getWithMetadata(key);
if (versioned == null) {
if (Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS) &&
(remoteCache.getName().equals(USER_SESSION_CACHE_NAME)
|| remoteCache.getName().equals(CLIENT_SESSION_CACHE_NAME)
|| remoteCache.getName().equals(OFFLINE_USER_SESSION_CACHE_NAME)
|| remoteCache.getName().equals(OFFLINE_CLIENT_SESSION_CACHE_NAME))) {
logger.debugf("No existing entry for %s in the remote cache to remove, might have been evicted. A delete will force an eviction in the other DC.", key);
remoteCache.remove(key);
}
logger.warnf("Not found entity to replace for key '%s'", key);
return;
}