KEYCLOAK-4187 Fix LoginCrossDCTest
This commit is contained in:
parent
6d003555ea
commit
a72c297d5d
11 changed files with 428 additions and 166 deletions
|
@ -321,13 +321,26 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
|
|||
|
||||
// Recursion. We should have it locally now
|
||||
return getUserSessionWithPredicate(realm, id, offline, predicate);
|
||||
} else {
|
||||
log.debugf("getUserSessionWithPredicate(%s): found, but predicate doesn't pass", id);
|
||||
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
log.debugf("getUserSessionWithPredicate(%s): not found", id);
|
||||
|
||||
// Session not available on remoteCache. Was already removed there. So removing locally too.
|
||||
// TODO: Can be optimized to skip calling remoteCache.remove
|
||||
removeUserSession(realm, userSession);
|
||||
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
|
||||
log.debugf("getUserSessionWithPredicate(%s): remote cache not available", id);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
log.debugf("getUserSessionWithPredicate(%s): not found", id);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -127,7 +127,16 @@ public class InfinispanChangelogBasedTransaction<S extends SessionEntity> extend
|
|||
|
||||
return wrappedEntity;
|
||||
} else {
|
||||
return myUpdates.getEntityWrapper();
|
||||
S entity = myUpdates.getEntityWrapper().getEntity();
|
||||
|
||||
// If entity is scheduled for remove, we don't return it.
|
||||
boolean scheduledForRemove = myUpdates.getUpdateTasks().stream().filter((SessionUpdateTask task) -> {
|
||||
|
||||
return task.getOperation(entity) == SessionUpdateTask.CacheOperation.REMOVE;
|
||||
|
||||
}).findFirst().isPresent();
|
||||
|
||||
return scheduledForRemove ? null : myUpdates.getEntityWrapper();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -87,8 +87,8 @@ public class KcRemoteStore extends RemoteStore {
|
|||
public boolean delete(Object key) throws PersistenceException {
|
||||
logger.debugf("Calling delete for key '%s' on cache '%s'", key, cacheName);
|
||||
|
||||
// Optimization - we don't need to know the previous value. Also it's ok to trigger asynchronously
|
||||
getRemoteCache().removeAsync(key);
|
||||
// Optimization - we don't need to know the previous value.
|
||||
getRemoteCache().remove(key);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -28,17 +28,11 @@ import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
|
|||
import org.infinispan.client.hotrod.annotation.ClientListener;
|
||||
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
|
||||
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
|
||||
import org.infinispan.configuration.cache.Configuration;
|
||||
import org.infinispan.configuration.cache.ConfigurationBuilder;
|
||||
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
|
||||
import org.infinispan.manager.DefaultCacheManager;
|
||||
import org.infinispan.manager.EmbeddedCacheManager;
|
||||
import org.infinispan.persistence.manager.PersistenceManager;
|
||||
import org.infinispan.persistence.remote.RemoteStore;
|
||||
import org.infinispan.persistence.remote.configuration.ExhaustedAction;
|
||||
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
|
||||
|
||||
|
@ -128,7 +122,7 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
|
|||
}
|
||||
|
||||
private static Worker createWorker(int threadId) {
|
||||
EmbeddedCacheManager manager = createManager(threadId);
|
||||
EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
|
||||
Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
|
||||
|
||||
System.out.println("Retrieved cache: " + threadId);
|
||||
|
@ -140,56 +134,6 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
|
|||
return new Worker(cache, threadId);
|
||||
}
|
||||
|
||||
private static EmbeddedCacheManager createManager(int threadId) {
|
||||
System.setProperty("java.net.preferIPv4Stack", "true");
|
||||
System.setProperty("jgroups.tcp.port", "53715");
|
||||
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
|
||||
|
||||
boolean clustered = false;
|
||||
boolean async = false;
|
||||
boolean allowDuplicateJMXDomains = true;
|
||||
|
||||
if (clustered) {
|
||||
gcb = gcb.clusteredDefault();
|
||||
gcb.transport().clusterName("test-clustering");
|
||||
}
|
||||
|
||||
gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
|
||||
|
||||
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
|
||||
|
||||
Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
|
||||
|
||||
cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
|
||||
return cacheManager;
|
||||
|
||||
}
|
||||
|
||||
private static Configuration getCacheBackedByRemoteStore(int threadId) {
|
||||
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
|
||||
|
||||
int port = threadId==1 ? 12232 : 13232;
|
||||
//int port = 12232;
|
||||
|
||||
return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
|
||||
.fetchPersistentState(false)
|
||||
.ignoreModifications(false)
|
||||
.purgeOnStartup(false)
|
||||
.preload(false)
|
||||
.shared(true)
|
||||
.remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
|
||||
.rawValues(true)
|
||||
.forceReturnValues(false)
|
||||
.addServer()
|
||||
.host("localhost")
|
||||
.port(port)
|
||||
.connectionPool()
|
||||
.maxActive(20)
|
||||
.exhaustedAction(ExhaustedAction.CREATE_NEW)
|
||||
.async()
|
||||
. enabled(false).build();
|
||||
}
|
||||
|
||||
|
||||
@ClientListener
|
||||
public static class HotRodListener {
|
||||
|
|
|
@ -83,7 +83,7 @@ public class ConcurrencyJDGRemoteCacheTest {
|
|||
}
|
||||
|
||||
private static Worker createWorker(int threadId) {
|
||||
EmbeddedCacheManager manager = createManager(threadId);
|
||||
EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
|
||||
Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
|
||||
|
||||
System.out.println("Retrieved cache: " + threadId);
|
||||
|
@ -95,56 +95,6 @@ public class ConcurrencyJDGRemoteCacheTest {
|
|||
return new Worker(cache, threadId);
|
||||
}
|
||||
|
||||
private static EmbeddedCacheManager createManager(int threadId) {
|
||||
System.setProperty("java.net.preferIPv4Stack", "true");
|
||||
System.setProperty("jgroups.tcp.port", "53715");
|
||||
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
|
||||
|
||||
boolean clustered = false;
|
||||
boolean async = false;
|
||||
boolean allowDuplicateJMXDomains = true;
|
||||
|
||||
if (clustered) {
|
||||
gcb = gcb.clusteredDefault();
|
||||
gcb.transport().clusterName("test-clustering");
|
||||
}
|
||||
|
||||
gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
|
||||
|
||||
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
|
||||
|
||||
Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
|
||||
|
||||
cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
|
||||
return cacheManager;
|
||||
|
||||
}
|
||||
|
||||
private static Configuration getCacheBackedByRemoteStore(int threadId) {
|
||||
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
|
||||
|
||||
int port = threadId==1 ? 12232 : 13232;
|
||||
//int port = 12232;
|
||||
|
||||
return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
|
||||
.fetchPersistentState(false)
|
||||
.ignoreModifications(false)
|
||||
.purgeOnStartup(false)
|
||||
.preload(false)
|
||||
.shared(true)
|
||||
.remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
|
||||
.rawValues(true)
|
||||
.forceReturnValues(false)
|
||||
.addServer()
|
||||
.host("localhost")
|
||||
.port(port)
|
||||
.connectionPool()
|
||||
.maxActive(20)
|
||||
.exhaustedAction(ExhaustedAction.CREATE_NEW)
|
||||
.async()
|
||||
. enabled(false).build();
|
||||
}
|
||||
|
||||
|
||||
@ClientListener
|
||||
public static class HotRodListener {
|
||||
|
|
|
@ -0,0 +1,292 @@
|
|||
/*
|
||||
* Copyright 2017 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.cluster.infinispan;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.infinispan.Cache;
|
||||
import org.infinispan.client.hotrod.RemoteCache;
|
||||
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
|
||||
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
|
||||
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
|
||||
import org.infinispan.client.hotrod.annotation.ClientListener;
|
||||
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
|
||||
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
|
||||
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
|
||||
import org.infinispan.context.Flag;
|
||||
import org.infinispan.manager.EmbeddedCacheManager;
|
||||
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.keycloak.common.util.Time;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
||||
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
|
||||
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
|
||||
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
|
||||
|
||||
/**
|
||||
* Check that removing of session from remoteCache is session immediately removed on remoteCache in other DC. This is true.
|
||||
*
|
||||
* Also check that listeners are executed asynchronously with some delay.
|
||||
*
|
||||
* Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
|
||||
*
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class ConcurrencyJDGRemoveSessionTest {
|
||||
|
||||
protected static final Logger logger = Logger.getLogger(ConcurrencyJDGRemoveSessionTest.class);
|
||||
|
||||
private static final int ITERATIONS = 10000;
|
||||
|
||||
private static RemoteCache remoteCache1;
|
||||
private static RemoteCache remoteCache2;
|
||||
|
||||
private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);
|
||||
private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0);
|
||||
|
||||
private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0);
|
||||
private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0);
|
||||
|
||||
//private static Map<String, EntryInfo> state = new HashMap<>();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createManager(1).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME);
|
||||
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createManager(2).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME);
|
||||
|
||||
// Create caches, listeners and finally worker threads
|
||||
Thread worker1 = createWorker(cache1, 1);
|
||||
Thread worker2 = createWorker(cache2, 2);
|
||||
|
||||
// Create 100 initial sessions
|
||||
for (int i=0 ; i<ITERATIONS ; i++) {
|
||||
String sessionId = String.valueOf(i);
|
||||
SessionEntityWrapper<UserSessionEntity> wrappedSession = createSessionEntity(sessionId);
|
||||
cache1.put(sessionId, wrappedSession);
|
||||
}
|
||||
|
||||
logger.info("SESSIONS CREATED");
|
||||
|
||||
// Create 100 initial sessions
|
||||
for (int i=0 ; i<ITERATIONS ; i++) {
|
||||
String sessionId = String.valueOf(i);
|
||||
SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
|
||||
Assert.assertNotNull("Loaded wrapper for key " + sessionId, loadedWrapper);
|
||||
}
|
||||
|
||||
logger.info("SESSIONS AVAILABLE ON DC2");
|
||||
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
try {
|
||||
// Just running in current thread
|
||||
worker1.run();
|
||||
|
||||
logger.info("SESSIONS REMOVED");
|
||||
|
||||
//Thread.sleep(5000);
|
||||
|
||||
// Doing it in opposite direction to ensure that newer are checked first.
|
||||
// This us currently FAILING (expected) as listeners are executed asynchronously.
|
||||
for (int i=ITERATIONS-1 ; i>=0 ; i--) {
|
||||
String sessionId = String.valueOf(i);
|
||||
|
||||
logger.infof("Before call cache2.get: %s", sessionId);
|
||||
|
||||
SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
|
||||
Assert.assertNull("Loaded wrapper not null for key " + sessionId, loadedWrapper);
|
||||
}
|
||||
|
||||
logger.info("SESSIONS NOT AVAILABLE ON DC2");
|
||||
|
||||
|
||||
// // Start and join workers
|
||||
// worker1.start();
|
||||
// worker2.start();
|
||||
//
|
||||
// worker1.join();
|
||||
// worker2.join();
|
||||
|
||||
} finally {
|
||||
Thread.sleep(2000);
|
||||
|
||||
// Finish JVM
|
||||
cache1.getCacheManager().stop();
|
||||
cache2.getCacheManager().stop();
|
||||
}
|
||||
|
||||
long took = System.currentTimeMillis() - start;
|
||||
|
||||
// // Output
|
||||
// for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
|
||||
// System.out.println(entry.getKey() + ":::" + entry.getValue());
|
||||
// worker1.cache.remove(entry.getKey());
|
||||
// }
|
||||
|
||||
// System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
|
||||
// ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
|
||||
// ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() );
|
||||
//
|
||||
// System.out.println("Sleeping before other report");
|
||||
//
|
||||
// Thread.sleep(1000);
|
||||
//
|
||||
// System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
|
||||
// ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
|
||||
// ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
private static SessionEntityWrapper<UserSessionEntity> createSessionEntity(String sessionId) {
|
||||
// Create 100 initial sessions
|
||||
UserSessionEntity session = new UserSessionEntity();
|
||||
session.setId(sessionId);
|
||||
session.setRealm("foo");
|
||||
session.setBrokerSessionId("!23123123");
|
||||
session.setBrokerUserId(null);
|
||||
session.setUser("foo");
|
||||
session.setLoginUsername("foo");
|
||||
session.setIpAddress("123.44.143.178");
|
||||
session.setStarted(Time.currentTime());
|
||||
session.setLastSessionRefresh(Time.currentTime());
|
||||
|
||||
AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity();
|
||||
clientSession.setAuthMethod("saml");
|
||||
clientSession.setAction("something");
|
||||
clientSession.setTimestamp(1234);
|
||||
clientSession.setProtocolMappers(new HashSet<>(Arrays.asList("mapper1", "mapper2")));
|
||||
clientSession.setRoles(new HashSet<>(Arrays.asList("role1", "role2")));
|
||||
session.getAuthenticatedClientSessions().put("client1", clientSession);
|
||||
|
||||
SessionEntityWrapper<UserSessionEntity> wrappedSession = new SessionEntityWrapper<>(session);
|
||||
return wrappedSession;
|
||||
}
|
||||
|
||||
|
||||
private static Thread createWorker(Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, int threadId) {
|
||||
System.out.println("Retrieved cache: " + threadId);
|
||||
|
||||
RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
|
||||
|
||||
if (threadId == 1) {
|
||||
remoteCache1 = remoteCache;
|
||||
} else {
|
||||
remoteCache2 = remoteCache;
|
||||
}
|
||||
|
||||
AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2;
|
||||
HotRodListener listener = new HotRodListener(cache, remoteCache, counter);
|
||||
remoteCache.addClientListener(listener);
|
||||
|
||||
return new RemoteCacheWorker(remoteCache, threadId);
|
||||
//return new CacheWorker(cache, threadId);
|
||||
}
|
||||
|
||||
|
||||
private static EmbeddedCacheManager createManager(int threadId) {
|
||||
return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
|
||||
}
|
||||
|
||||
|
||||
@ClientListener
|
||||
public static class HotRodListener {
|
||||
|
||||
private Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache;
|
||||
private RemoteCache remoteCache;
|
||||
private AtomicInteger listenerCount;
|
||||
|
||||
public HotRodListener(Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache, RemoteCache remoteCache, AtomicInteger listenerCount) {
|
||||
this.listenerCount = listenerCount;
|
||||
this.remoteCache = remoteCache;
|
||||
this.origCache = origCache;
|
||||
}
|
||||
|
||||
|
||||
@ClientCacheEntryCreated
|
||||
public void created(ClientCacheEntryCreatedEvent event) {
|
||||
String cacheKey = (String) event.getKey();
|
||||
|
||||
logger.infof("Listener executed for creating of session %s", cacheKey);
|
||||
}
|
||||
|
||||
|
||||
@ClientCacheEntryModified
|
||||
public void modified(ClientCacheEntryModifiedEvent event) {
|
||||
String cacheKey = (String) event.getKey();
|
||||
|
||||
logger.infof("Listener executed for modifying of session %s", cacheKey);
|
||||
}
|
||||
|
||||
|
||||
@ClientCacheEntryRemoved
|
||||
public void removed(ClientCacheEntryRemovedEvent event) {
|
||||
String cacheKey = (String) event.getKey();
|
||||
|
||||
logger.infof("Listener executed for removing of session %s", cacheKey);
|
||||
|
||||
// TODO: for distributed caches, ensure that it is executed just on owner OR if event.isCommandRetried
|
||||
origCache
|
||||
.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE)
|
||||
.remove(cacheKey);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class RemoteCacheWorker extends Thread {
|
||||
|
||||
private final RemoteCache<String, Object> remoteCache;
|
||||
|
||||
private final int myThreadId;
|
||||
|
||||
private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) {
|
||||
this.remoteCache = remoteCache;
|
||||
this.myThreadId = myThreadId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
for (int i=0 ; i<ITERATIONS ; i++) {
|
||||
String sessionId = String.valueOf(i);
|
||||
remoteCache.remove(sessionId);
|
||||
|
||||
|
||||
logger.infof("Session %s removed on DC1", sessionId);
|
||||
|
||||
// Check if it's immediately seen that session is removed on 2nd DC
|
||||
RemoteCache secondDCRemoteCache = myThreadId == 1 ? remoteCache2 : remoteCache1;
|
||||
SessionEntityWrapper thatSession = (SessionEntityWrapper) secondDCRemoteCache.get(sessionId);
|
||||
Assert.assertNull("Session with ID " + sessionId + " not removed on the other DC. ThreadID: " + myThreadId, thatSession);
|
||||
|
||||
// Also check that it's immediatelly removed on my DC
|
||||
SessionEntityWrapper mySession = (SessionEntityWrapper) remoteCache.get(sessionId);
|
||||
Assert.assertNull("Session with ID " + sessionId + " not removed on the other DC. ThreadID: " + myThreadId, mySession);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -59,7 +59,7 @@ import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
|
|||
*/
|
||||
public class ConcurrencyJDGSessionsCacheTest {
|
||||
|
||||
protected static final Logger logger = Logger.getLogger(KcRemoteStore.class);
|
||||
protected static final Logger logger = Logger.getLogger(ConcurrencyJDGSessionsCacheTest.class);
|
||||
|
||||
private static final int ITERATION_PER_WORKER = 1000;
|
||||
|
||||
|
@ -210,56 +210,11 @@ public class ConcurrencyJDGSessionsCacheTest {
|
|||
//return new CacheWorker(cache, threadId);
|
||||
}
|
||||
|
||||
|
||||
private static EmbeddedCacheManager createManager(int threadId) {
|
||||
System.setProperty("java.net.preferIPv4Stack", "true");
|
||||
System.setProperty("jgroups.tcp.port", "53715");
|
||||
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
|
||||
|
||||
boolean clustered = false;
|
||||
boolean async = false;
|
||||
boolean allowDuplicateJMXDomains = true;
|
||||
|
||||
if (clustered) {
|
||||
gcb = gcb.clusteredDefault();
|
||||
gcb.transport().clusterName("test-clustering");
|
||||
}
|
||||
|
||||
gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
|
||||
|
||||
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
|
||||
|
||||
Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
|
||||
|
||||
cacheManager.defineConfiguration(InfinispanConnectionProvider.SESSION_CACHE_NAME, invalidationCacheConfiguration);
|
||||
return cacheManager;
|
||||
|
||||
return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.SESSION_CACHE_NAME, KcRemoteStoreConfigurationBuilder.class);
|
||||
}
|
||||
|
||||
private static Configuration getCacheBackedByRemoteStore(int threadId) {
|
||||
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
|
||||
|
||||
int port = threadId==1 ? 12232 : 13232;
|
||||
//int port = 12232;
|
||||
|
||||
return cacheConfigBuilder.persistence().addStore(KcRemoteStoreConfigurationBuilder.class)
|
||||
.fetchPersistentState(false)
|
||||
.ignoreModifications(false)
|
||||
.purgeOnStartup(false)
|
||||
.preload(false)
|
||||
.shared(true)
|
||||
.remoteCacheName(InfinispanConnectionProvider.SESSION_CACHE_NAME)
|
||||
.rawValues(true)
|
||||
.forceReturnValues(false)
|
||||
.marshaller(KeycloakHotRodMarshallerFactory.class.getName())
|
||||
.addServer()
|
||||
.host("localhost")
|
||||
.port(port)
|
||||
.connectionPool()
|
||||
.maxActive(20)
|
||||
.exhaustedAction(ExhaustedAction.CREATE_NEW)
|
||||
.async()
|
||||
.enabled(false).build();
|
||||
}
|
||||
|
||||
@ClientListener
|
||||
public static class HotRodListener {
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Copyright 2017 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.cluster.infinispan;
|
||||
|
||||
import org.infinispan.configuration.cache.Configuration;
|
||||
import org.infinispan.configuration.cache.ConfigurationBuilder;
|
||||
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
|
||||
import org.infinispan.manager.DefaultCacheManager;
|
||||
import org.infinispan.manager.EmbeddedCacheManager;
|
||||
import org.infinispan.persistence.remote.configuration.ExhaustedAction;
|
||||
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
class TestCacheManagerFactory {
|
||||
|
||||
|
||||
<T extends RemoteStoreConfigurationBuilder> EmbeddedCacheManager createManager(int threadId, String cacheName, Class<T> builderClass) {
|
||||
System.setProperty("java.net.preferIPv4Stack", "true");
|
||||
System.setProperty("jgroups.tcp.port", "53715");
|
||||
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
|
||||
|
||||
boolean clustered = false;
|
||||
boolean async = false;
|
||||
boolean allowDuplicateJMXDomains = true;
|
||||
|
||||
if (clustered) {
|
||||
gcb = gcb.clusteredDefault();
|
||||
gcb.transport().clusterName("test-clustering");
|
||||
}
|
||||
|
||||
gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
|
||||
|
||||
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
|
||||
|
||||
Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId, cacheName, builderClass);
|
||||
|
||||
cacheManager.defineConfiguration(cacheName, invalidationCacheConfiguration);
|
||||
return cacheManager;
|
||||
|
||||
}
|
||||
|
||||
|
||||
private <T extends RemoteStoreConfigurationBuilder> Configuration getCacheBackedByRemoteStore(int threadId, String cacheName, Class<T> builderClass) {
|
||||
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
|
||||
|
||||
int port = threadId==1 ? 12232 : 13232;
|
||||
//int port = 12232;
|
||||
|
||||
return cacheConfigBuilder.persistence().addStore(builderClass)
|
||||
.fetchPersistentState(false)
|
||||
.ignoreModifications(false)
|
||||
.purgeOnStartup(false)
|
||||
.preload(false)
|
||||
.shared(true)
|
||||
.remoteCacheName(cacheName)
|
||||
.rawValues(true)
|
||||
.forceReturnValues(false)
|
||||
.marshaller(KeycloakHotRodMarshallerFactory.class.getName())
|
||||
.addServer()
|
||||
.host("localhost")
|
||||
.port(port)
|
||||
.connectionPool()
|
||||
.maxActive(20)
|
||||
.exhaustedAction(ExhaustedAction.CREATE_NEW)
|
||||
.async()
|
||||
. enabled(false).build();
|
||||
}
|
||||
}
|
|
@ -34,6 +34,7 @@ import org.keycloak.services.ErrorPageException;
|
|||
import org.keycloak.services.managers.AuthenticationManager;
|
||||
import org.keycloak.services.managers.AuthenticationSessionManager;
|
||||
import org.keycloak.services.managers.ClientSessionCode;
|
||||
import org.keycloak.services.managers.UserSessionCrossDCManager;
|
||||
import org.keycloak.services.messages.Messages;
|
||||
import org.keycloak.services.resources.LoginActionsService;
|
||||
import org.keycloak.services.util.CacheControlUtil;
|
||||
|
@ -208,7 +209,7 @@ public abstract class AuthorizationEndpointBase {
|
|||
}
|
||||
}
|
||||
|
||||
UserSessionModel userSession = authSessionId==null ? null : session.sessions().getUserSession(realm, authSessionId);
|
||||
UserSessionModel userSession = authSessionId==null ? null : new UserSessionCrossDCManager(session).getUserSessionIfExistsRemotely(realm, authSessionId);
|
||||
|
||||
if (userSession != null) {
|
||||
logger.debugf("Sent request to authz endpoint. We don't have authentication session with ID '%s' but we have userSession. Will re-create authentication session with same ID", authSessionId);
|
||||
|
|
|
@ -62,4 +62,17 @@ public class UserSessionCrossDCManager {
|
|||
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
// Just check if userSession also exists on remoteCache. It can happen that logout happened on 2nd DC and userSession is already removed on remoteCache and this DC wasn't yet notified
|
||||
public UserSessionModel getUserSessionIfExistsRemotely(RealmModel realm, String id) {
|
||||
UserSessionModel userSession = kcSession.sessions().getUserSession(realm, id);
|
||||
|
||||
// This will remove userSession "locally" if it doesn't exists on remoteCache
|
||||
kcSession.sessions().getUserSessionWithPredicate(realm, id, false, (UserSessionModel userSession2) -> {
|
||||
return userSession2 == null;
|
||||
});
|
||||
|
||||
return kcSession.sessions().getUserSession(realm, id);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,8 +39,8 @@ public class LoginCrossDCTest extends AbstractAdminCrossDCTest {
|
|||
|
||||
//log.info("Started to sleep");
|
||||
//Thread.sleep(10000000);
|
||||
for (int i=0 ; i<10 ; i++) {
|
||||
OAuthClient.AuthorizationEndpointResponse response1 = Retry.call(() -> oauth.doLogin("test-user@localhost", "password"), 20, 100);
|
||||
for (int i=0 ; i<30 ; i++) {
|
||||
OAuthClient.AuthorizationEndpointResponse response1 = oauth.doLogin("test-user@localhost", "password");
|
||||
String code = response1.getCode();
|
||||
OAuthClient.AccessTokenResponse response2 = oauth.doAccessTokenRequest(code, "password");
|
||||
Assert.assertNotNull(response2.getAccessToken());
|
||||
|
|
Loading…
Reference in a new issue