Merge pull request #4375 from mposolda/ispn-clientListeners-bugs

KEYCLOAK-4187 Fix LoginCrossDCTest
This commit is contained in:
Marek Posolda 2017-08-08 15:56:51 +02:00 committed by GitHub
commit 07044eeeaa
11 changed files with 428 additions and 166 deletions

View file

@ -321,14 +321,27 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
// Recursion. We should have it locally now
return getUserSessionWithPredicate(realm, id, offline, predicate);
}
}
}
log.debugf("getUserSessionWithPredicate(%s): not found", id);
} else {
log.debugf("getUserSessionWithPredicate(%s): found, but predicate doesn't pass", id);
return null;
}
} else {
log.debugf("getUserSessionWithPredicate(%s): not found", id);
// Session not available on remoteCache. Was already removed there. So removing locally too.
// TODO: Can be optimized to skip calling remoteCache.remove
removeUserSession(realm, userSession);
return null;
}
} else {
log.debugf("getUserSessionWithPredicate(%s): remote cache not available", id);
return null;
}
}
@Override

View file

@ -127,7 +127,16 @@ public class InfinispanChangelogBasedTransaction<S extends SessionEntity> extend
return wrappedEntity;
} else {
return myUpdates.getEntityWrapper();
S entity = myUpdates.getEntityWrapper().getEntity();
// If entity is scheduled for remove, we don't return it.
boolean scheduledForRemove = myUpdates.getUpdateTasks().stream().filter((SessionUpdateTask task) -> {
return task.getOperation(entity) == SessionUpdateTask.CacheOperation.REMOVE;
}).findFirst().isPresent();
return scheduledForRemove ? null : myUpdates.getEntityWrapper();
}
}

View file

@ -87,8 +87,8 @@ public class KcRemoteStore extends RemoteStore {
public boolean delete(Object key) throws PersistenceException {
logger.debugf("Calling delete for key '%s' on cache '%s'", key, cacheName);
// Optimization - we don't need to know the previous value. Also it's ok to trigger asynchronously
getRemoteCache().removeAsync(key);
// Optimization - we don't need to know the previous value.
getRemoteCache().remove(key);
return true;
}

View file

@ -28,17 +28,11 @@ import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.RemoteStore;
import org.infinispan.persistence.remote.configuration.ExhaustedAction;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
import org.junit.Assert;
import org.junit.Ignore;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
@ -128,7 +122,7 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
}
private static Worker createWorker(int threadId) {
EmbeddedCacheManager manager = createManager(threadId);
EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
System.out.println("Retrieved cache: " + threadId);
@ -140,56 +134,6 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
return new Worker(cache, threadId);
}
private static EmbeddedCacheManager createManager(int threadId) {
System.setProperty("java.net.preferIPv4Stack", "true");
System.setProperty("jgroups.tcp.port", "53715");
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
boolean clustered = false;
boolean async = false;
boolean allowDuplicateJMXDomains = true;
if (clustered) {
gcb = gcb.clusteredDefault();
gcb.transport().clusterName("test-clustering");
}
gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
return cacheManager;
}
private static Configuration getCacheBackedByRemoteStore(int threadId) {
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
int port = threadId==1 ? 12232 : 13232;
//int port = 12232;
return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
.fetchPersistentState(false)
.ignoreModifications(false)
.purgeOnStartup(false)
.preload(false)
.shared(true)
.remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
.rawValues(true)
.forceReturnValues(false)
.addServer()
.host("localhost")
.port(port)
.connectionPool()
.maxActive(20)
.exhaustedAction(ExhaustedAction.CREATE_NEW)
.async()
. enabled(false).build();
}
@ClientListener
public static class HotRodListener {

View file

@ -83,7 +83,7 @@ public class ConcurrencyJDGRemoteCacheTest {
}
private static Worker createWorker(int threadId) {
EmbeddedCacheManager manager = createManager(threadId);
EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
System.out.println("Retrieved cache: " + threadId);
@ -95,56 +95,6 @@ public class ConcurrencyJDGRemoteCacheTest {
return new Worker(cache, threadId);
}
private static EmbeddedCacheManager createManager(int threadId) {
System.setProperty("java.net.preferIPv4Stack", "true");
System.setProperty("jgroups.tcp.port", "53715");
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
boolean clustered = false;
boolean async = false;
boolean allowDuplicateJMXDomains = true;
if (clustered) {
gcb = gcb.clusteredDefault();
gcb.transport().clusterName("test-clustering");
}
gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
return cacheManager;
}
private static Configuration getCacheBackedByRemoteStore(int threadId) {
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
int port = threadId==1 ? 12232 : 13232;
//int port = 12232;
return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
.fetchPersistentState(false)
.ignoreModifications(false)
.purgeOnStartup(false)
.preload(false)
.shared(true)
.remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
.rawValues(true)
.forceReturnValues(false)
.addServer()
.host("localhost")
.port(port)
.connectionPool()
.maxActive(20)
.exhaustedAction(ExhaustedAction.CREATE_NEW)
.async()
. enabled(false).build();
}
@ClientListener
public static class HotRodListener {

View file

@ -0,0 +1,292 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.cluster.infinispan;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
import org.infinispan.context.Flag;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
/**
* Check that removing of session from remoteCache is session immediately removed on remoteCache in other DC. This is true.
*
* Also check that listeners are executed asynchronously with some delay.
*
* Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class ConcurrencyJDGRemoveSessionTest {
protected static final Logger logger = Logger.getLogger(ConcurrencyJDGRemoveSessionTest.class);
private static final int ITERATIONS = 10000;
private static RemoteCache remoteCache1;
private static RemoteCache remoteCache2;
private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);
private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0);
private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0);
private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0);
//private static Map<String, EntryInfo> state = new HashMap<>();
public static void main(String[] args) throws Exception {
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createManager(1).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME);
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createManager(2).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME);
// Create caches, listeners and finally worker threads
Thread worker1 = createWorker(cache1, 1);
Thread worker2 = createWorker(cache2, 2);
// Create 100 initial sessions
for (int i=0 ; i<ITERATIONS ; i++) {
String sessionId = String.valueOf(i);
SessionEntityWrapper<UserSessionEntity> wrappedSession = createSessionEntity(sessionId);
cache1.put(sessionId, wrappedSession);
}
logger.info("SESSIONS CREATED");
// Create 100 initial sessions
for (int i=0 ; i<ITERATIONS ; i++) {
String sessionId = String.valueOf(i);
SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
Assert.assertNotNull("Loaded wrapper for key " + sessionId, loadedWrapper);
}
logger.info("SESSIONS AVAILABLE ON DC2");
long start = System.currentTimeMillis();
try {
// Just running in current thread
worker1.run();
logger.info("SESSIONS REMOVED");
//Thread.sleep(5000);
// Doing it in opposite direction to ensure that newer are checked first.
// This us currently FAILING (expected) as listeners are executed asynchronously.
for (int i=ITERATIONS-1 ; i>=0 ; i--) {
String sessionId = String.valueOf(i);
logger.infof("Before call cache2.get: %s", sessionId);
SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
Assert.assertNull("Loaded wrapper not null for key " + sessionId, loadedWrapper);
}
logger.info("SESSIONS NOT AVAILABLE ON DC2");
// // Start and join workers
// worker1.start();
// worker2.start();
//
// worker1.join();
// worker2.join();
} finally {
Thread.sleep(2000);
// Finish JVM
cache1.getCacheManager().stop();
cache2.getCacheManager().stop();
}
long took = System.currentTimeMillis() - start;
// // Output
// for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
// System.out.println(entry.getKey() + ":::" + entry.getValue());
// worker1.cache.remove(entry.getKey());
// }
// System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
// ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
// ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() );
//
// System.out.println("Sleeping before other report");
//
// Thread.sleep(1000);
//
// System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
// ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
// ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
}
private static SessionEntityWrapper<UserSessionEntity> createSessionEntity(String sessionId) {
// Create 100 initial sessions
UserSessionEntity session = new UserSessionEntity();
session.setId(sessionId);
session.setRealm("foo");
session.setBrokerSessionId("!23123123");
session.setBrokerUserId(null);
session.setUser("foo");
session.setLoginUsername("foo");
session.setIpAddress("123.44.143.178");
session.setStarted(Time.currentTime());
session.setLastSessionRefresh(Time.currentTime());
AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity();
clientSession.setAuthMethod("saml");
clientSession.setAction("something");
clientSession.setTimestamp(1234);
clientSession.setProtocolMappers(new HashSet<>(Arrays.asList("mapper1", "mapper2")));
clientSession.setRoles(new HashSet<>(Arrays.asList("role1", "role2")));
session.getAuthenticatedClientSessions().put("client1", clientSession);
SessionEntityWrapper<UserSessionEntity> wrappedSession = new SessionEntityWrapper<>(session);
return wrappedSession;
}
private static Thread createWorker(Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, int threadId) {
System.out.println("Retrieved cache: " + threadId);
RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
if (threadId == 1) {
remoteCache1 = remoteCache;
} else {
remoteCache2 = remoteCache;
}
AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2;
HotRodListener listener = new HotRodListener(cache, remoteCache, counter);
remoteCache.addClientListener(listener);
return new RemoteCacheWorker(remoteCache, threadId);
//return new CacheWorker(cache, threadId);
}
private static EmbeddedCacheManager createManager(int threadId) {
return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
}
@ClientListener
public static class HotRodListener {
private Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache;
private RemoteCache remoteCache;
private AtomicInteger listenerCount;
public HotRodListener(Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache, RemoteCache remoteCache, AtomicInteger listenerCount) {
this.listenerCount = listenerCount;
this.remoteCache = remoteCache;
this.origCache = origCache;
}
@ClientCacheEntryCreated
public void created(ClientCacheEntryCreatedEvent event) {
String cacheKey = (String) event.getKey();
logger.infof("Listener executed for creating of session %s", cacheKey);
}
@ClientCacheEntryModified
public void modified(ClientCacheEntryModifiedEvent event) {
String cacheKey = (String) event.getKey();
logger.infof("Listener executed for modifying of session %s", cacheKey);
}
@ClientCacheEntryRemoved
public void removed(ClientCacheEntryRemovedEvent event) {
String cacheKey = (String) event.getKey();
logger.infof("Listener executed for removing of session %s", cacheKey);
// TODO: for distributed caches, ensure that it is executed just on owner OR if event.isCommandRetried
origCache
.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE)
.remove(cacheKey);
}
}
private static class RemoteCacheWorker extends Thread {
private final RemoteCache<String, Object> remoteCache;
private final int myThreadId;
private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) {
this.remoteCache = remoteCache;
this.myThreadId = myThreadId;
}
@Override
public void run() {
for (int i=0 ; i<ITERATIONS ; i++) {
String sessionId = String.valueOf(i);
remoteCache.remove(sessionId);
logger.infof("Session %s removed on DC1", sessionId);
// Check if it's immediately seen that session is removed on 2nd DC
RemoteCache secondDCRemoteCache = myThreadId == 1 ? remoteCache2 : remoteCache1;
SessionEntityWrapper thatSession = (SessionEntityWrapper) secondDCRemoteCache.get(sessionId);
Assert.assertNull("Session with ID " + sessionId + " not removed on the other DC. ThreadID: " + myThreadId, thatSession);
// Also check that it's immediatelly removed on my DC
SessionEntityWrapper mySession = (SessionEntityWrapper) remoteCache.get(sessionId);
Assert.assertNull("Session with ID " + sessionId + " not removed on the other DC. ThreadID: " + myThreadId, mySession);
}
}
}
}

View file

@ -59,7 +59,7 @@ import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
*/
public class ConcurrencyJDGSessionsCacheTest {
protected static final Logger logger = Logger.getLogger(KcRemoteStore.class);
protected static final Logger logger = Logger.getLogger(ConcurrencyJDGSessionsCacheTest.class);
private static final int ITERATION_PER_WORKER = 1000;
@ -210,56 +210,11 @@ public class ConcurrencyJDGSessionsCacheTest {
//return new CacheWorker(cache, threadId);
}
private static EmbeddedCacheManager createManager(int threadId) {
System.setProperty("java.net.preferIPv4Stack", "true");
System.setProperty("jgroups.tcp.port", "53715");
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
boolean clustered = false;
boolean async = false;
boolean allowDuplicateJMXDomains = true;
if (clustered) {
gcb = gcb.clusteredDefault();
gcb.transport().clusterName("test-clustering");
return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.SESSION_CACHE_NAME, KcRemoteStoreConfigurationBuilder.class);
}
gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
cacheManager.defineConfiguration(InfinispanConnectionProvider.SESSION_CACHE_NAME, invalidationCacheConfiguration);
return cacheManager;
}
private static Configuration getCacheBackedByRemoteStore(int threadId) {
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
int port = threadId==1 ? 12232 : 13232;
//int port = 12232;
return cacheConfigBuilder.persistence().addStore(KcRemoteStoreConfigurationBuilder.class)
.fetchPersistentState(false)
.ignoreModifications(false)
.purgeOnStartup(false)
.preload(false)
.shared(true)
.remoteCacheName(InfinispanConnectionProvider.SESSION_CACHE_NAME)
.rawValues(true)
.forceReturnValues(false)
.marshaller(KeycloakHotRodMarshallerFactory.class.getName())
.addServer()
.host("localhost")
.port(port)
.connectionPool()
.maxActive(20)
.exhaustedAction(ExhaustedAction.CREATE_NEW)
.async()
.enabled(false).build();
}
@ClientListener
public static class HotRodListener {

View file

@ -0,0 +1,85 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.cluster.infinispan;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.remote.configuration.ExhaustedAction;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
class TestCacheManagerFactory {
<T extends RemoteStoreConfigurationBuilder> EmbeddedCacheManager createManager(int threadId, String cacheName, Class<T> builderClass) {
System.setProperty("java.net.preferIPv4Stack", "true");
System.setProperty("jgroups.tcp.port", "53715");
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
boolean clustered = false;
boolean async = false;
boolean allowDuplicateJMXDomains = true;
if (clustered) {
gcb = gcb.clusteredDefault();
gcb.transport().clusterName("test-clustering");
}
gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId, cacheName, builderClass);
cacheManager.defineConfiguration(cacheName, invalidationCacheConfiguration);
return cacheManager;
}
private <T extends RemoteStoreConfigurationBuilder> Configuration getCacheBackedByRemoteStore(int threadId, String cacheName, Class<T> builderClass) {
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
int port = threadId==1 ? 12232 : 13232;
//int port = 12232;
return cacheConfigBuilder.persistence().addStore(builderClass)
.fetchPersistentState(false)
.ignoreModifications(false)
.purgeOnStartup(false)
.preload(false)
.shared(true)
.remoteCacheName(cacheName)
.rawValues(true)
.forceReturnValues(false)
.marshaller(KeycloakHotRodMarshallerFactory.class.getName())
.addServer()
.host("localhost")
.port(port)
.connectionPool()
.maxActive(20)
.exhaustedAction(ExhaustedAction.CREATE_NEW)
.async()
. enabled(false).build();
}
}

View file

@ -34,6 +34,7 @@ import org.keycloak.services.ErrorPageException;
import org.keycloak.services.managers.AuthenticationManager;
import org.keycloak.services.managers.AuthenticationSessionManager;
import org.keycloak.services.managers.ClientSessionCode;
import org.keycloak.services.managers.UserSessionCrossDCManager;
import org.keycloak.services.messages.Messages;
import org.keycloak.services.resources.LoginActionsService;
import org.keycloak.services.util.CacheControlUtil;
@ -208,7 +209,7 @@ public abstract class AuthorizationEndpointBase {
}
}
UserSessionModel userSession = authSessionId==null ? null : session.sessions().getUserSession(realm, authSessionId);
UserSessionModel userSession = authSessionId==null ? null : new UserSessionCrossDCManager(session).getUserSessionIfExistsRemotely(realm, authSessionId);
if (userSession != null) {
logger.debugf("Sent request to authz endpoint. We don't have authentication session with ID '%s' but we have userSession. Will re-create authentication session with same ID", authSessionId);

View file

@ -62,4 +62,17 @@ public class UserSessionCrossDCManager {
});
}
// Just check if userSession also exists on remoteCache. It can happen that logout happened on 2nd DC and userSession is already removed on remoteCache and this DC wasn't yet notified
public UserSessionModel getUserSessionIfExistsRemotely(RealmModel realm, String id) {
UserSessionModel userSession = kcSession.sessions().getUserSession(realm, id);
// This will remove userSession "locally" if it doesn't exists on remoteCache
kcSession.sessions().getUserSessionWithPredicate(realm, id, false, (UserSessionModel userSession2) -> {
return userSession2 == null;
});
return kcSession.sessions().getUserSession(realm, id);
}
}

View file

@ -39,8 +39,8 @@ public class LoginCrossDCTest extends AbstractAdminCrossDCTest {
//log.info("Started to sleep");
//Thread.sleep(10000000);
for (int i=0 ; i<10 ; i++) {
OAuthClient.AuthorizationEndpointResponse response1 = Retry.call(() -> oauth.doLogin("test-user@localhost", "password"), 20, 100);
for (int i=0 ; i<30 ; i++) {
OAuthClient.AuthorizationEndpointResponse response1 = oauth.doLogin("test-user@localhost", "password");
String code = response1.getCode();
OAuthClient.AccessTokenResponse response2 = oauth.doAccessTokenRequest(code, "password");
Assert.assertNotNull(response2.getAccessToken());