Make persistent sessions co-exist with remote cache feature (#30859)

Closes #30855

Signed-off-by: Alexander Schwartz <aschwart@redhat.com>
This commit is contained in:
Alexander Schwartz 2024-07-09 09:03:36 +02:00 committed by GitHub
parent 523653ba2f
commit d70f78072e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 140 additions and 63 deletions

View file

@ -321,6 +321,10 @@ jobs:
if: needs.conditional.outputs.ci-store == 'true'
runs-on: ubuntu-latest
timeout-minutes: 150
strategy:
matrix:
variant: [ "pus-ec", "pus-rc" ]
fail-fast: false
steps:
- uses: actions/checkout@v4
@ -328,11 +332,24 @@ jobs:
name: Integration test setup
uses: ./.github/actions/integration-test-setup
- name: Run base tests without cache
- name: Run base tests
run: |
TESTS=`testsuite/integration-arquillian/tests/base/testsuites/suite.sh persistent-sessions`
echo "Tests: $TESTS"
./mvnw test ${{ env.SUREFIRE_RETRY }} -Pauth-server-quarkus -Dauth.server.feature="persistent-user-sessions" -Dtest=$TESTS -pl testsuite/integration-arquillian/tests/base 2>&1 | misc/log/trimmer.sh
case "${{ matrix.variant }}" in
pus-ec)
VARIANT="-Dauth.server.feature=persistent-user-sessions"
;;
pus-rc)
VARIANT="-Pinfinispan-server -Dauth.server.feature=persistent-user-sessions,multi-site,remote-cache"
;;
*)
echo "Unknown Matrix element"
exit 1
;;
esac
echo "Variant: $VARIANT"
./mvnw test ${{ env.SUREFIRE_RETRY }} -Pauth-server-quarkus $VARIANT -Dtest=$TESTS -pl testsuite/integration-arquillian/tests/base 2>&1 | misc/log/trimmer.sh
- name: Upload JVM Heapdumps
if: always()
@ -349,7 +366,7 @@ jobs:
if: always()
uses: ./.github/actions/archive-surefire-reports
with:
job-id: store-integration-tests-${{ matrix.db }}
job-id: store-integration-tests-${{ matrix.variant }}
- name: EC2 Maven Logs
if: failure()

View file

@ -26,6 +26,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.affinity.KeyGenerator;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.persistence.remote.RemoteStore;
import org.jboss.logging.Logger;
@ -76,7 +77,9 @@ import org.keycloak.provider.ProviderEvent;
import org.keycloak.provider.ProviderEventListener;
import org.keycloak.provider.ServerInfoAwareProviderFactory;
public class InfinispanUserSessionProviderFactory implements UserSessionProviderFactory, ServerInfoAwareProviderFactory, EnvironmentDependentProviderFactory {
import static org.keycloak.common.Profile.Feature.PERSISTENT_USER_SESSIONS;
public class InfinispanUserSessionProviderFactory implements UserSessionProviderFactory<UserSessionProvider>, ServerInfoAwareProviderFactory, EnvironmentDependentProviderFactory {
private static final Logger log = Logger.getLogger(InfinispanUserSessionProviderFactory.class);
@ -87,6 +90,10 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
public static final String CONFIG_OFFLINE_CLIENT_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE = "offlineClientSessionCacheEntryLifespanOverride";
public static final String CONFIG_MAX_BATCH_SIZE = "maxBatchSize";
public static final int DEFAULT_MAX_BATCH_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 2);
public static final String CONFIG_USE_CACHES = "useCaches";
private static final boolean DEFAULT_USE_CACHES = true;
public static final String CONFIG_USE_BATCHES = "useBatches";
private static final boolean DEFAULT_USE_BATCHES = true;
private long offlineSessionCacheEntryLifespanOverride;
@ -103,17 +110,26 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
SerializeExecutionsByKey<String> serializerOfflineSession = new SerializeExecutionsByKey<>();
SerializeExecutionsByKey<UUID> serializerClientSession = new SerializeExecutionsByKey<>();
SerializeExecutionsByKey<UUID> serializerOfflineClientSession = new SerializeExecutionsByKey<>();
ArrayBlockingQueue<PersistentUpdate> asyncQueuePersistentUpdate = new ArrayBlockingQueue<>(1000);
ArrayBlockingQueue<PersistentUpdate> asyncQueuePersistentUpdate;
private PersistentSessionsWorker persistentSessionsWorker;
private int maxBatchSize;
private boolean useCaches;
private boolean useBatches;
@Override
public UserSessionProvider create(KeycloakSession session) {
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = null;
Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineSessionsCache = null;
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionCache = null;
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionsCache = null;
if (useCaches) {
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = connections.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
Cache<String, SessionEntityWrapper<UserSessionEntity>> offlineSessionsCache = connections.getCache(InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME);
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessionCache = connections.getCache(InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME);
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> offlineClientSessionsCache = connections.getCache(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME);
cache = connections.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
offlineSessionsCache = connections.getCache(InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME);
clientSessionCache = connections.getCache(InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME);
offlineClientSessionsCache = connections.getCache(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME);
}
if (Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
return new PersistentUserSessionProvider(
@ -159,6 +175,11 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
offlineSessionCacheEntryLifespanOverride = config.getInt(CONFIG_OFFLINE_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE, -1);
offlineClientSessionCacheEntryLifespanOverride = config.getInt(CONFIG_OFFLINE_CLIENT_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE, -1);
maxBatchSize = config.getInt(CONFIG_MAX_BATCH_SIZE, DEFAULT_MAX_BATCH_SIZE);
useCaches = config.getBoolean(CONFIG_USE_CACHES, DEFAULT_USE_CACHES) && InfinispanUtils.isEmbeddedInfinispan();
useBatches = config.getBoolean(CONFIG_USE_BATCHES, DEFAULT_USE_BATCHES) && Profile.isFeatureEnabled(PERSISTENT_USER_SESSIONS);
if (useBatches) {
asyncQueuePersistentUpdate = new ArrayBlockingQueue<>(1000);
}
}
@Override
@ -168,7 +189,14 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
@Override
public void onEvent(ProviderEvent event) {
if (event instanceof PostMigrationEvent) {
if (!useCaches) {
keyGenerator = new InfinispanKeyGenerator() {
@Override
protected <K> K generateKey(KeycloakSession session, Cache<K, ?> cache, KeyGenerator<K> keyGenerator) {
return keyGenerator.getKey();
}
};
} else {
int preloadTransactionTimeout = getTimeoutForPreloadingSessionsSeconds();
log.debugf("Will preload sessions with transaction timeout %d seconds", preloadTransactionTimeout);
@ -182,6 +210,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
registerClusterListeners(session);
loadSessionsFromRemoteCaches(session);
}, preloadTransactionTimeout);
}
} else if (event instanceof UserModel.UserRemovedEvent) {
UserModel.UserRemovedEvent userRemovedEvent = (UserModel.UserRemovedEvent) event;
@ -208,7 +237,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
}
}
});
if (Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
if (Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS) && useBatches) {
persistentSessionsWorker = new PersistentSessionsWorker(factory,
asyncQueuePersistentUpdate,
maxBatchSize);
@ -428,7 +457,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
@Override
public boolean isSupported(Config.Scope config) {
return InfinispanUtils.isEmbeddedInfinispan();
return InfinispanUtils.isEmbeddedInfinispan() || Profile.isFeatureEnabled(PERSISTENT_USER_SESSIONS);
}
@Override
@ -437,6 +466,8 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
info.put(CONFIG_OFFLINE_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE, Long.toString(offlineSessionCacheEntryLifespanOverride));
info.put(CONFIG_OFFLINE_CLIENT_SESSION_CACHE_ENTRY_LIFESPAN_OVERRIDE, Long.toString(offlineClientSessionCacheEntryLifespanOverride));
info.put(CONFIG_MAX_BATCH_SIZE, Integer.toString(maxBatchSize));
info.put(CONFIG_USE_CACHES, Boolean.toString(useCaches));
info.put(CONFIG_USE_BATCHES, Boolean.toString(useBatches));
return info;
}
@ -463,6 +494,12 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
.helpText("Override how long offline user sessions should be kept in memory")
.add();
builder.property()
.name(CONFIG_USE_CACHES)
.type("boolean")
.helpText("Enable or disable caches. Enabled by default unless the external feature to use only external remote caches is used")
.add();
return builder.build();
}

View file

@ -425,6 +425,10 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
// Try lookup userSession from remoteCache
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = getCache(offline);
if (cache == null) {
return null;
}
RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
if (remoteCache != null) {

View file

@ -38,6 +38,8 @@ import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
public class ClientSessionPersistentChangelogBasedTransaction extends PersistentSessionsChangelogBasedTransaction<UUID, AuthenticatedClientSessionEntity> {
private static final Logger LOG = Logger.getLogger(ClientSessionPersistentChangelogBasedTransaction.class);
@ -55,7 +57,7 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
ArrayBlockingQueue<PersistentUpdate> batchingQueue,
SerializeExecutionsByKey<UUID> serializerOnline,
SerializeExecutionsByKey<UUID> serializerOffline) {
super(session, cache, offlineCache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue, serializerOnline, serializerOffline);
super(session, CLIENT_SESSION_CACHE_NAME, cache, offlineCache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue, serializerOnline, serializerOffline);
this.userSessionTx = userSessionTx;
}
@ -63,7 +65,10 @@ public class ClientSessionPersistentChangelogBasedTransaction extends Persistent
SessionUpdatesList<AuthenticatedClientSessionEntity> myUpdates = getUpdates(offline).get(key);
if (myUpdates == null) {
SessionEntityWrapper<AuthenticatedClientSessionEntity> wrappedEntity = null;
wrappedEntity = getCache(offline).get(key);
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> cache = getCache(offline);
if (cache != null) {
wrappedEntity = cache.get(key);
}
if (wrappedEntity == null) {
LOG.debugf("client-session not found in cache for sessionId=%s, offline=%s, loading from persister", key, offline);

View file

@ -19,7 +19,6 @@ package org.keycloak.models.sessions.infinispan.changes;
import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.keycloak.common.Profile;
import org.keycloak.models.AbstractKeycloakTransaction;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
@ -27,18 +26,15 @@ import org.keycloak.models.UserSessionModel;
import org.keycloak.models.sessions.infinispan.SessionFunction;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
import org.keycloak.models.utils.KeycloakModelUtils;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.stream.Stream;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.USER_SESSION_CACHE_NAME;
abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends SessionEntity> extends AbstractKeycloakTransaction implements SessionsChangelogBasedTransaction<K, V> {
private static final Logger LOG = Logger.getLogger(PersistentSessionsChangelogBasedTransaction.class);
@ -54,6 +50,7 @@ abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends S
private final SessionFunction<V> offlineMaxIdleTimeMsLoader;
public PersistentSessionsChangelogBasedTransaction(KeycloakSession session,
String cacheName,
Cache<K, SessionEntityWrapper<V>> cache,
Cache<K, SessionEntityWrapper<V>> offlineCache,
RemoteCacheInvoker remoteCacheInvoker,
@ -66,46 +63,49 @@ abstract public class PersistentSessionsChangelogBasedTransaction<K, V extends S
SerializeExecutionsByKey<K> serializerOffline) {
kcSession = session;
if (!Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS)) {
throw new IllegalStateException("Persistent user sessions are not enabled");
changesPerformers = new LinkedList<>();
if (batchingQueue != null) {
changesPerformers.add(new JpaChangesPerformer<>(cacheName, batchingQueue));
} else {
changesPerformers.add(new JpaChangesPerformer<>(cacheName, null) {
@Override
public void applyChanges() {
KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(),
super::applyChangesSynchronously);
}
});
}
if (! (
cache.getName().equals(USER_SESSION_CACHE_NAME)
|| cache.getName().equals(CLIENT_SESSION_CACHE_NAME)
|| cache.getName().equals(OFFLINE_USER_SESSION_CACHE_NAME)
|| cache.getName().equals(OFFLINE_CLIENT_SESSION_CACHE_NAME)
)) {
throw new IllegalStateException("Cache name is not valid for persistent user sessions: " + cache.getName());
}
changesPerformers = List.of(
new JpaChangesPerformer<>(cache.getName(), batchingQueue),
new EmbeddedCachesChangesPerformer<>(cache, serializerOnline) {
if (cache != null) {
changesPerformers.add(new EmbeddedCachesChangesPerformer<>(cache, serializerOnline) {
@Override
public boolean shouldConsumeChange(V entity) {
return !entity.isOffline();
}
},
new EmbeddedCachesChangesPerformer<>(offlineCache, serializerOffline){
@Override
public boolean shouldConsumeChange(V entity) {
return entity.isOffline();
}
},
new RemoteCachesChangesPerformer<>(session, cache, remoteCacheInvoker) {
});
changesPerformers.add(new RemoteCachesChangesPerformer<>(session, cache, remoteCacheInvoker) {
@Override
public boolean shouldConsumeChange(V entity) {
return !entity.isOffline();
}
},
new RemoteCachesChangesPerformer<>(session, offlineCache, remoteCacheInvoker) {
});
}
if (offlineCache != null) {
changesPerformers.add(new EmbeddedCachesChangesPerformer<>(offlineCache, serializerOffline){
@Override
public boolean shouldConsumeChange(V entity) {
return entity.isOffline();
}
});
changesPerformers.add(new RemoteCachesChangesPerformer<>(session, offlineCache, remoteCacheInvoker) {
@Override
public boolean shouldConsumeChange(V entity) {
return entity.isOffline();
}
});
}
);
this.cache = cache;
this.offlineCache = offlineCache;

View file

@ -32,6 +32,8 @@ import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
import java.util.concurrent.ArrayBlockingQueue;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.USER_SESSION_CACHE_NAME;
public class UserSessionPersistentChangelogBasedTransaction extends PersistentSessionsChangelogBasedTransaction<String, UserSessionEntity> {
private static final Logger LOG = Logger.getLogger(UserSessionPersistentChangelogBasedTransaction.class);
@ -47,14 +49,17 @@ public class UserSessionPersistentChangelogBasedTransaction extends PersistentSe
ArrayBlockingQueue<PersistentUpdate> batchingQueue,
SerializeExecutionsByKey<String> serializerOnline,
SerializeExecutionsByKey<String> serializerOffline) {
super(session, cache, offlineCache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue, serializerOnline, serializerOffline);
super(session, USER_SESSION_CACHE_NAME, cache, offlineCache, remoteCacheInvoker, lifespanMsLoader, maxIdleTimeMsLoader, offlineLifespanMsLoader, offlineMaxIdleTimeMsLoader, batchingQueue, serializerOnline, serializerOffline);
}
public SessionEntityWrapper<UserSessionEntity> get(RealmModel realm, String key, boolean offline) {
SessionUpdatesList<UserSessionEntity> myUpdates = getUpdates(offline).get(key);
if (myUpdates == null) {
SessionEntityWrapper<UserSessionEntity> wrappedEntity = null;
wrappedEntity = getCache(offline).get(key);
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = getCache(offline);
if (cache != null) {
wrappedEntity = cache.get(key);
}
if (wrappedEntity == null) {
LOG.debugf("user-session not found in cache for sessionId=%s offline=%s, loading from persister", key, offline);
@ -110,6 +115,10 @@ public class UserSessionPersistentChangelogBasedTransaction extends PersistentSe
return null;
}
if (getCache(offline) == null) {
return ((PersistentUserSessionProvider) kcSession.getProvider(UserSessionProvider.class)).wrapPersistentEntity(persistentUserSession.getRealm(), offline, persistentUserSession);
}
LOG.debugf("Attempting to import user-session for sessionId=%s offline=%s", sessionId, offline);
SessionEntityWrapper<UserSessionEntity> ispnUserSessionEntity = ((PersistentUserSessionProvider) kcSession.getProvider(UserSessionProvider.class)).importUserSession(persistentUserSession, offline);

View file

@ -5,6 +5,7 @@ import java.util.UUID;
import org.infinispan.client.hotrod.RemoteCache;
import org.keycloak.Config;
import org.keycloak.common.Profile;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.infinispan.util.InfinispanUtils;
import org.keycloak.models.KeycloakSession;
@ -67,7 +68,7 @@ public class RemoteUserSessionProviderFactory implements UserSessionProviderFact
@Override
public boolean isSupported(Config.Scope config) {
return InfinispanUtils.isRemoteInfinispan();
return InfinispanUtils.isRemoteInfinispan() && !Profile.isFeatureEnabled(Profile.Feature.PERSISTENT_USER_SESSIONS);
}
@Override

View file

@ -52,7 +52,7 @@ public class InfinispanKeyGenerator {
}
private <K> K generateKey(KeycloakSession session, Cache<K, ?> cache, KeyGenerator<K> keyGenerator) {
protected <K> K generateKey(KeycloakSession session, Cache<K, ?> cache, KeyGenerator<K> keyGenerator) {
String cacheName = cache.getName();
// "wantsLocalKey" is true if route is not attached to the sticky session cookie. Without attached route, We want the key, which will be "owned" by this node.

View file

@ -21,6 +21,7 @@ import org.infinispan.Cache;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.keycloak.common.Profile;
import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
@ -32,6 +33,7 @@ import org.keycloak.models.sessions.infinispan.changes.sessions.SessionData;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.representations.idm.RealmRepresentation;
import org.keycloak.testsuite.AbstractKeycloakTest;
import org.keycloak.testsuite.ProfileAssume;
import org.keycloak.testsuite.runonserver.RunOnServer;
import org.keycloak.timer.TimerProvider;
@ -63,6 +65,7 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest {
@Test
public void testLastSessionRefreshCounters() {
ProfileAssume.assumeFeatureDisabled(Profile.Feature.REMOTE_CACHE);
testingClient.server().run(new LastSessionRefreshServerCounterTest());
}
@ -107,6 +110,7 @@ public class LastSessionRefreshUnitTest extends AbstractKeycloakTest {
@Test
public void testLastSessionRefreshIntervals() {
ProfileAssume.assumeFeatureDisabled(Profile.Feature.REMOTE_CACHE);
testingClient.server().run(new LastSessionRefreshServerIntervalsTest());
}