From a72c297d5d246e17eb30e6b65f780638800f52cf Mon Sep 17 00:00:00 2001 From: mposolda Date: Tue, 8 Aug 2017 10:53:25 +0200 Subject: [PATCH] KEYCLOAK-4187 Fix LoginCrossDCTest --- .../InfinispanUserSessionProvider.java | 21 +- .../InfinispanChangelogBasedTransaction.java | 11 +- .../infinispan/remotestore/KcRemoteStore.java | 4 +- ...encyJDGRemoteCacheClientListenersTest.java | 58 +--- .../ConcurrencyJDGRemoteCacheTest.java | 52 +--- .../ConcurrencyJDGRemoveSessionTest.java | 292 ++++++++++++++++++ .../ConcurrencyJDGSessionsCacheTest.java | 51 +-- .../infinispan/TestCacheManagerFactory.java | 85 +++++ .../protocol/AuthorizationEndpointBase.java | 3 +- .../managers/UserSessionCrossDCManager.java | 13 + .../testsuite/crossdc/LoginCrossDCTest.java | 4 +- 11 files changed, 428 insertions(+), 166 deletions(-) create mode 100644 model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java create mode 100644 model/infinispan/src/test/java/org/keycloak/cluster/infinispan/TestCacheManagerFactory.java diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java index 8d04a50c80..7f10f9ecdb 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java @@ -321,13 +321,26 @@ public class InfinispanUserSessionProvider implements UserSessionProvider { // Recursion. We should have it locally now return getUserSessionWithPredicate(realm, id, offline, predicate); + } else { + log.debugf("getUserSessionWithPredicate(%s): found, but predicate doesn't pass", id); + + return null; } + } else { + log.debugf("getUserSessionWithPredicate(%s): not found", id); + + // Session not available on remoteCache. Was already removed there. So removing locally too. + // TODO: Can be optimized to skip calling remoteCache.remove + removeUserSession(realm, userSession); + + return null; } + } else { + + log.debugf("getUserSessionWithPredicate(%s): remote cache not available", id); + + return null; } - - log.debugf("getUserSessionWithPredicate(%s): not found", id); - - return null; } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java index d3bcacc122..43bb2b3929 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java @@ -127,7 +127,16 @@ public class InfinispanChangelogBasedTransaction extend return wrappedEntity; } else { - return myUpdates.getEntityWrapper(); + S entity = myUpdates.getEntityWrapper().getEntity(); + + // If entity is scheduled for remove, we don't return it. + boolean scheduledForRemove = myUpdates.getUpdateTasks().stream().filter((SessionUpdateTask task) -> { + + return task.getOperation(entity) == SessionUpdateTask.CacheOperation.REMOVE; + + }).findFirst().isPresent(); + + return scheduledForRemove ? null : myUpdates.getEntityWrapper(); } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java index 3d311222e1..60c34bf2d7 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/KcRemoteStore.java @@ -87,8 +87,8 @@ public class KcRemoteStore extends RemoteStore { public boolean delete(Object key) throws PersistenceException { logger.debugf("Calling delete for key '%s' on cache '%s'", key, cacheName); - // Optimization - we don't need to know the previous value. Also it's ok to trigger asynchronously - getRemoteCache().removeAsync(key); + // Optimization - we don't need to know the previous value. + getRemoteCache().remove(key); return true; } diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java index f18d8d34fb..b5248850ae 100644 --- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java +++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java @@ -28,17 +28,11 @@ import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified; import org.infinispan.client.hotrod.annotation.ClientListener; import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent; import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent; -import org.infinispan.configuration.cache.Configuration; -import org.infinispan.configuration.cache.ConfigurationBuilder; -import org.infinispan.configuration.global.GlobalConfigurationBuilder; -import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.persistence.manager.PersistenceManager; import org.infinispan.persistence.remote.RemoteStore; -import org.infinispan.persistence.remote.configuration.ExhaustedAction; import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder; import org.junit.Assert; -import org.junit.Ignore; import org.keycloak.connections.infinispan.InfinispanConnectionProvider; import org.keycloak.models.sessions.infinispan.util.InfinispanUtil; @@ -128,7 +122,7 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest { } private static Worker createWorker(int threadId) { - EmbeddedCacheManager manager = createManager(threadId); + EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class); Cache cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME); System.out.println("Retrieved cache: " + threadId); @@ -140,56 +134,6 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest { return new Worker(cache, threadId); } - private static EmbeddedCacheManager createManager(int threadId) { - System.setProperty("java.net.preferIPv4Stack", "true"); - System.setProperty("jgroups.tcp.port", "53715"); - GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder(); - - boolean clustered = false; - boolean async = false; - boolean allowDuplicateJMXDomains = true; - - if (clustered) { - gcb = gcb.clusteredDefault(); - gcb.transport().clusterName("test-clustering"); - } - - gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains); - - EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build()); - - Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId); - - cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration); - return cacheManager; - - } - - private static Configuration getCacheBackedByRemoteStore(int threadId) { - ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder(); - - int port = threadId==1 ? 12232 : 13232; - //int port = 12232; - - return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class) - .fetchPersistentState(false) - .ignoreModifications(false) - .purgeOnStartup(false) - .preload(false) - .shared(true) - .remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME) - .rawValues(true) - .forceReturnValues(false) - .addServer() - .host("localhost") - .port(port) - .connectionPool() - .maxActive(20) - .exhaustedAction(ExhaustedAction.CREATE_NEW) - .async() - . enabled(false).build(); - } - @ClientListener public static class HotRodListener { diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java index df1b80ed52..9c23452e06 100644 --- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java +++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheTest.java @@ -83,7 +83,7 @@ public class ConcurrencyJDGRemoteCacheTest { } private static Worker createWorker(int threadId) { - EmbeddedCacheManager manager = createManager(threadId); + EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class); Cache cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME); System.out.println("Retrieved cache: " + threadId); @@ -95,56 +95,6 @@ public class ConcurrencyJDGRemoteCacheTest { return new Worker(cache, threadId); } - private static EmbeddedCacheManager createManager(int threadId) { - System.setProperty("java.net.preferIPv4Stack", "true"); - System.setProperty("jgroups.tcp.port", "53715"); - GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder(); - - boolean clustered = false; - boolean async = false; - boolean allowDuplicateJMXDomains = true; - - if (clustered) { - gcb = gcb.clusteredDefault(); - gcb.transport().clusterName("test-clustering"); - } - - gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains); - - EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build()); - - Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId); - - cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration); - return cacheManager; - - } - - private static Configuration getCacheBackedByRemoteStore(int threadId) { - ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder(); - - int port = threadId==1 ? 12232 : 13232; - //int port = 12232; - - return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class) - .fetchPersistentState(false) - .ignoreModifications(false) - .purgeOnStartup(false) - .preload(false) - .shared(true) - .remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME) - .rawValues(true) - .forceReturnValues(false) - .addServer() - .host("localhost") - .port(port) - .connectionPool() - .maxActive(20) - .exhaustedAction(ExhaustedAction.CREATE_NEW) - .async() - . enabled(false).build(); - } - @ClientListener public static class HotRodListener { diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java new file mode 100644 index 0000000000..ff4c3ce83e --- /dev/null +++ b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java @@ -0,0 +1,292 @@ +/* + * Copyright 2017 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.cluster.infinispan; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicInteger; + +import org.infinispan.Cache; +import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified; +import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved; +import org.infinispan.client.hotrod.annotation.ClientListener; +import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent; +import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent; +import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent; +import org.infinispan.context.Flag; +import org.infinispan.manager.EmbeddedCacheManager; +import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder; +import org.jboss.logging.Logger; +import org.junit.Assert; +import org.keycloak.common.util.Time; +import org.keycloak.connections.infinispan.InfinispanConnectionProvider; +import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; +import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; +import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; +import org.keycloak.models.sessions.infinispan.util.InfinispanUtil; + +/** + * Check that removing of session from remoteCache is session immediately removed on remoteCache in other DC. This is true. + * + * Also check that listeners are executed asynchronously with some delay. + * + * Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest} + * + * @author Marek Posolda + */ +public class ConcurrencyJDGRemoveSessionTest { + + protected static final Logger logger = Logger.getLogger(ConcurrencyJDGRemoveSessionTest.class); + + private static final int ITERATIONS = 10000; + + private static RemoteCache remoteCache1; + private static RemoteCache remoteCache2; + + private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0); + private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0); + + private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0); + private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0); + + //private static Map state = new HashMap<>(); + + public static void main(String[] args) throws Exception { + Cache> cache1 = createManager(1).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME); + Cache> cache2 = createManager(2).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME); + + // Create caches, listeners and finally worker threads + Thread worker1 = createWorker(cache1, 1); + Thread worker2 = createWorker(cache2, 2); + + // Create 100 initial sessions + for (int i=0 ; i wrappedSession = createSessionEntity(sessionId); + cache1.put(sessionId, wrappedSession); + } + + logger.info("SESSIONS CREATED"); + + // Create 100 initial sessions + for (int i=0 ; i=0 ; i--) { + String sessionId = String.valueOf(i); + + logger.infof("Before call cache2.get: %s", sessionId); + + SessionEntityWrapper loadedWrapper = cache2.get(sessionId); + Assert.assertNull("Loaded wrapper not null for key " + sessionId, loadedWrapper); + } + + logger.info("SESSIONS NOT AVAILABLE ON DC2"); + + + // // Start and join workers +// worker1.start(); +// worker2.start(); +// +// worker1.join(); +// worker2.join(); + + } finally { + Thread.sleep(2000); + + // Finish JVM + cache1.getCacheManager().stop(); + cache2.getCacheManager().stop(); + } + + long took = System.currentTimeMillis() - start; + +// // Output +// for (Map.Entry entry : state.entrySet()) { +// System.out.println(entry.getKey() + ":::" + entry.getValue()); +// worker1.cache.remove(entry.getKey()); +// } + +// System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() + +// ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() + +// ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() ); +// +// System.out.println("Sleeping before other report"); +// +// Thread.sleep(1000); +// +// System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() + +// ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() + +// ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get()); + + + } + + + private static SessionEntityWrapper createSessionEntity(String sessionId) { + // Create 100 initial sessions + UserSessionEntity session = new UserSessionEntity(); + session.setId(sessionId); + session.setRealm("foo"); + session.setBrokerSessionId("!23123123"); + session.setBrokerUserId(null); + session.setUser("foo"); + session.setLoginUsername("foo"); + session.setIpAddress("123.44.143.178"); + session.setStarted(Time.currentTime()); + session.setLastSessionRefresh(Time.currentTime()); + + AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity(); + clientSession.setAuthMethod("saml"); + clientSession.setAction("something"); + clientSession.setTimestamp(1234); + clientSession.setProtocolMappers(new HashSet<>(Arrays.asList("mapper1", "mapper2"))); + clientSession.setRoles(new HashSet<>(Arrays.asList("role1", "role2"))); + session.getAuthenticatedClientSessions().put("client1", clientSession); + + SessionEntityWrapper wrappedSession = new SessionEntityWrapper<>(session); + return wrappedSession; + } + + + private static Thread createWorker(Cache> cache, int threadId) { + System.out.println("Retrieved cache: " + threadId); + + RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache); + + if (threadId == 1) { + remoteCache1 = remoteCache; + } else { + remoteCache2 = remoteCache; + } + + AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2; + HotRodListener listener = new HotRodListener(cache, remoteCache, counter); + remoteCache.addClientListener(listener); + + return new RemoteCacheWorker(remoteCache, threadId); + //return new CacheWorker(cache, threadId); + } + + + private static EmbeddedCacheManager createManager(int threadId) { + return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class); + } + + + @ClientListener + public static class HotRodListener { + + private Cache> origCache; + private RemoteCache remoteCache; + private AtomicInteger listenerCount; + + public HotRodListener(Cache> origCache, RemoteCache remoteCache, AtomicInteger listenerCount) { + this.listenerCount = listenerCount; + this.remoteCache = remoteCache; + this.origCache = origCache; + } + + + @ClientCacheEntryCreated + public void created(ClientCacheEntryCreatedEvent event) { + String cacheKey = (String) event.getKey(); + + logger.infof("Listener executed for creating of session %s", cacheKey); + } + + + @ClientCacheEntryModified + public void modified(ClientCacheEntryModifiedEvent event) { + String cacheKey = (String) event.getKey(); + + logger.infof("Listener executed for modifying of session %s", cacheKey); + } + + + @ClientCacheEntryRemoved + public void removed(ClientCacheEntryRemovedEvent event) { + String cacheKey = (String) event.getKey(); + + logger.infof("Listener executed for removing of session %s", cacheKey); + + // TODO: for distributed caches, ensure that it is executed just on owner OR if event.isCommandRetried + origCache + .getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE) + .remove(cacheKey); + + } + + } + + private static class RemoteCacheWorker extends Thread { + + private final RemoteCache remoteCache; + + private final int myThreadId; + + private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) { + this.remoteCache = remoteCache; + this.myThreadId = myThreadId; + } + + @Override + public void run() { + + for (int i=0 ; iMarek Posolda + */ +class TestCacheManagerFactory { + + + EmbeddedCacheManager createManager(int threadId, String cacheName, Class builderClass) { + System.setProperty("java.net.preferIPv4Stack", "true"); + System.setProperty("jgroups.tcp.port", "53715"); + GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder(); + + boolean clustered = false; + boolean async = false; + boolean allowDuplicateJMXDomains = true; + + if (clustered) { + gcb = gcb.clusteredDefault(); + gcb.transport().clusterName("test-clustering"); + } + + gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains); + + EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build()); + + Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId, cacheName, builderClass); + + cacheManager.defineConfiguration(cacheName, invalidationCacheConfiguration); + return cacheManager; + + } + + + private Configuration getCacheBackedByRemoteStore(int threadId, String cacheName, Class builderClass) { + ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder(); + + int port = threadId==1 ? 12232 : 13232; + //int port = 12232; + + return cacheConfigBuilder.persistence().addStore(builderClass) + .fetchPersistentState(false) + .ignoreModifications(false) + .purgeOnStartup(false) + .preload(false) + .shared(true) + .remoteCacheName(cacheName) + .rawValues(true) + .forceReturnValues(false) + .marshaller(KeycloakHotRodMarshallerFactory.class.getName()) + .addServer() + .host("localhost") + .port(port) + .connectionPool() + .maxActive(20) + .exhaustedAction(ExhaustedAction.CREATE_NEW) + .async() + . enabled(false).build(); + } +} diff --git a/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java b/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java index 11d44af84a..c06caa0737 100755 --- a/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java +++ b/services/src/main/java/org/keycloak/protocol/AuthorizationEndpointBase.java @@ -34,6 +34,7 @@ import org.keycloak.services.ErrorPageException; import org.keycloak.services.managers.AuthenticationManager; import org.keycloak.services.managers.AuthenticationSessionManager; import org.keycloak.services.managers.ClientSessionCode; +import org.keycloak.services.managers.UserSessionCrossDCManager; import org.keycloak.services.messages.Messages; import org.keycloak.services.resources.LoginActionsService; import org.keycloak.services.util.CacheControlUtil; @@ -208,7 +209,7 @@ public abstract class AuthorizationEndpointBase { } } - UserSessionModel userSession = authSessionId==null ? null : session.sessions().getUserSession(realm, authSessionId); + UserSessionModel userSession = authSessionId==null ? null : new UserSessionCrossDCManager(session).getUserSessionIfExistsRemotely(realm, authSessionId); if (userSession != null) { logger.debugf("Sent request to authz endpoint. We don't have authentication session with ID '%s' but we have userSession. Will re-create authentication session with same ID", authSessionId); diff --git a/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java b/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java index 1d1a3bec7f..11795e5456 100644 --- a/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java +++ b/services/src/main/java/org/keycloak/services/managers/UserSessionCrossDCManager.java @@ -62,4 +62,17 @@ public class UserSessionCrossDCManager { }); } + + + // Just check if userSession also exists on remoteCache. It can happen that logout happened on 2nd DC and userSession is already removed on remoteCache and this DC wasn't yet notified + public UserSessionModel getUserSessionIfExistsRemotely(RealmModel realm, String id) { + UserSessionModel userSession = kcSession.sessions().getUserSession(realm, id); + + // This will remove userSession "locally" if it doesn't exists on remoteCache + kcSession.sessions().getUserSessionWithPredicate(realm, id, false, (UserSessionModel userSession2) -> { + return userSession2 == null; + }); + + return kcSession.sessions().getUserSession(realm, id); + } } diff --git a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java index f305013f7f..c0eb849048 100644 --- a/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java +++ b/testsuite/integration-arquillian/tests/base/src/test/java/org/keycloak/testsuite/crossdc/LoginCrossDCTest.java @@ -39,8 +39,8 @@ public class LoginCrossDCTest extends AbstractAdminCrossDCTest { //log.info("Started to sleep"); //Thread.sleep(10000000); - for (int i=0 ; i<10 ; i++) { - OAuthClient.AuthorizationEndpointResponse response1 = Retry.call(() -> oauth.doLogin("test-user@localhost", "password"), 20, 100); + for (int i=0 ; i<30 ; i++) { + OAuthClient.AuthorizationEndpointResponse response1 = oauth.doLogin("test-user@localhost", "password"); String code = response1.getCode(); OAuthClient.AccessTokenResponse response2 = oauth.doAccessTokenRequest(code, "password"); Assert.assertNotNull(response2.getAccessToken());