KEYCLOAK-4187 Fix LastSessionRefreshCrossDCTest and ConcurrentLoginCrossDCTest

This commit is contained in:
mposolda 2017-08-03 20:21:54 +02:00
parent 3fce14d9ce
commit 251b41a7ac
7 changed files with 349 additions and 32 deletions

View file

@ -23,28 +23,23 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
import org.infinispan.context.Flag;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryExpiredEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import org.infinispan.persistence.remote.RemoteStore;
@ -52,8 +47,7 @@ import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.util.MultivaluedHashMap;
import org.keycloak.common.util.ConcurrentMultivaluedHashMap;
/**
* Impl for sending infinispan messages across cluster and listening to them
*
@ -63,7 +57,7 @@ public class InfinispanNotificationsManager {
protected static final Logger logger = Logger.getLogger(InfinispanNotificationsManager.class);
private final MultivaluedHashMap<String, ClusterListener> listeners = new MultivaluedHashMap<>();
private final ConcurrentMultivaluedHashMap<String, ClusterListener> listeners = new ConcurrentMultivaluedHashMap<>();
private final ConcurrentMap<String, TaskCallback> taskCallbacks = new ConcurrentHashMap<>();
@ -132,8 +126,10 @@ public class InfinispanNotificationsManager {
wrappedEvent.setSender(myAddress);
wrappedEvent.setSenderSite(mySite);
String eventKey = UUID.randomUUID().toString();
if (logger.isTraceEnabled()) {
logger.tracef("Sending event: %s", event);
logger.tracef("Sending event with key %s: %s", eventKey, event);
}
Flag[] flags = dcNotify == ClusterProvider.DCNotify.LOCAL_DC_ONLY
@ -142,7 +138,7 @@ public class InfinispanNotificationsManager {
// Put the value to the cache to notify listeners on all the nodes
workCache.getAdvancedCache().withFlags(flags)
.put(UUID.randomUUID().toString(), wrappedEvent, 120, TimeUnit.SECONDS);
.put(eventKey, wrappedEvent, 120, TimeUnit.SECONDS);
}
@ -208,6 +204,9 @@ public class InfinispanNotificationsManager {
private void eventReceived(String key, Serializable obj) {
if (!(obj instanceof WrapperClusterEvent)) {
if (obj == null) {
logger.warnf("Event object wasn't available in remote cache after event was received. Event key: %s", key);
}
return;
}

View file

@ -90,7 +90,7 @@ public class LastSessionRefreshStore {
LastSessionRefreshEvent event = new LastSessionRefreshEvent(refreshesToSend);
if (logger.isDebugEnabled()) {
logger.debugf("Sending lastSessionRefreshes: %s", event.getLastSessionRefreshes().toString());
logger.debugf("Sending lastSessionRefreshes for key '%s'. Refreshes: %s", eventKey, event.getLastSessionRefreshes().toString());
}
// Don't notify local DC about the lastSessionRefreshes. They were processed here already

View file

@ -0,0 +1,295 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.cluster.infinispan;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.RemoteStore;
import org.infinispan.persistence.remote.configuration.ExhaustedAction;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
import org.junit.Assert;
import org.junit.Ignore;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
/**
* Test that hotrod ClientListeners are correctly executed as expected
*
* STEPS TO REPRODUCE:
* - Unzip infinispan-server-8.2.6.Final to some locations ISPN1 and ISPN2
*
* - Edit both ISPN1/standalone/configuration/clustered.xml and ISPN2/standalone/configuration/clustered.xml . Configure cache in container "clustered"
*
* <replicated-cache-configuration name="sessions-cfg" mode="ASYNC" start="EAGER" batching="false">
<transaction mode="NON_XA" locking="PESSIMISTIC"/>
</replicated-cache-configuration>
<replicated-cache name="work" configuration="sessions-cfg" />
- Run server1
./standalone.sh -c clustered.xml -Djava.net.preferIPv4Stack=true -Djboss.socket.binding.port-offset=1010 -Djboss.default.multicast.address=234.56.78.99 -Djboss.node.name=cache-server
- Run server2
./standalone.sh -c clustered.xml -Djava.net.preferIPv4Stack=true -Djboss.socket.binding.port-offset=2010 -Djboss.default.multicast.address=234.56.78.99 -Djboss.node.name=cache-server-dc-2
- Run this test as main class from IDE
*
*
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class ConcurrencyJDGRemoteCacheClientListenersTest {
// Helper map to track if listeners were executed
private static Map<String, EntryInfo> state = new HashMap<>();
private static AtomicInteger totalListenerCalls = new AtomicInteger(0);
private static AtomicInteger totalErrors = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
// Init map somehow
for (int i=0 ; i<1000 ; i++) {
String key = "key-" + i;
EntryInfo entryInfo = new EntryInfo();
entryInfo.val.set(i);
state.put(key, entryInfo);
}
// Create caches, listeners and finally worker threads
Worker worker1 = createWorker(1);
Worker worker2 = createWorker(2);
// Note "run", so it's not executed asynchronously here!!!
worker1.run();
//
// // Start and join workers
// worker1.start();
// worker2.start();
//
// worker1.join();
// worker2.join();
// Output
for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
System.out.println(entry.getKey() + ":::" + entry.getValue());
}
System.out.println("totalListeners: " + totalListenerCalls.get() + ", totalErrors: " + totalErrors.get());
// Assert that ClientListener was able to read the value and save it into EntryInfo
try {
for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
EntryInfo info = entry.getValue();
Assert.assertEquals(info.val.get(), info.dc1Created.get());
Assert.assertEquals(info.val.get(), info.dc2Created.get());
Assert.assertEquals(info.val.get() * 2, info.dc1Updated.get());
Assert.assertEquals(info.val.get() * 2, info.dc2Updated.get());
worker1.cache.remove(entry.getKey());
}
} finally {
// Finish JVM
worker1.cache.getCacheManager().stop();
worker2.cache.getCacheManager().stop();
}
}
private static Worker createWorker(int threadId) {
EmbeddedCacheManager manager = createManager(threadId);
Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
System.out.println("Retrieved cache: " + threadId);
RemoteStore remoteStore = cache.getAdvancedCache().getComponentRegistry().getComponent(PersistenceManager.class).getStores(RemoteStore.class).iterator().next();
HotRodListener listener = new HotRodListener(cache, threadId);
remoteStore.getRemoteCache().addClientListener(listener);
return new Worker(cache, threadId);
}
private static EmbeddedCacheManager createManager(int threadId) {
System.setProperty("java.net.preferIPv4Stack", "true");
System.setProperty("jgroups.tcp.port", "53715");
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
boolean clustered = false;
boolean async = false;
boolean allowDuplicateJMXDomains = true;
if (clustered) {
gcb = gcb.clusteredDefault();
gcb.transport().clusterName("test-clustering");
}
gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
return cacheManager;
}
private static Configuration getCacheBackedByRemoteStore(int threadId) {
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
int port = threadId==1 ? 12232 : 13232;
//int port = 12232;
return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
.fetchPersistentState(false)
.ignoreModifications(false)
.purgeOnStartup(false)
.preload(false)
.shared(true)
.remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
.rawValues(true)
.forceReturnValues(false)
.addServer()
.host("localhost")
.port(port)
.connectionPool()
.maxActive(20)
.exhaustedAction(ExhaustedAction.CREATE_NEW)
.async()
. enabled(false).build();
}
@ClientListener
public static class HotRodListener {
private final RemoteCache<String, Integer> remoteCache;
private final int threadId;
public HotRodListener(Cache<String, Integer> cache, int threadId) {
this.remoteCache = InfinispanUtil.getRemoteCache(cache);
this.threadId = threadId;
}
//private AtomicInteger listenerCount = new AtomicInteger(0);
@ClientCacheEntryCreated
public void created(ClientCacheEntryCreatedEvent event) {
String cacheKey = (String) event.getKey();
event(cacheKey, true);
}
@ClientCacheEntryModified
public void updated(ClientCacheEntryModifiedEvent event) {
String cacheKey = (String) event.getKey();
event(cacheKey, false);
}
private void event(String cacheKey, boolean created) {
EntryInfo entryInfo = state.get(cacheKey);
entryInfo.successfulListenerWrites.incrementAndGet();
totalListenerCalls.incrementAndGet();
Integer val = remoteCache.get(cacheKey);
if (val != null) {
AtomicInteger dcVal;
if (created) {
dcVal = threadId == 1 ? entryInfo.dc1Created : entryInfo.dc2Created;
} else {
dcVal = threadId == 1 ? entryInfo.dc1Updated : entryInfo.dc2Updated;
}
dcVal.set(val);
} else {
System.err.println("NOT A VALUE FOR KEY: " + cacheKey);
totalErrors.incrementAndGet();
}
}
}
private static class Worker extends Thread {
private final Cache<String, Integer> cache;
private final int myThreadId;
private Worker(Cache<String, Integer> cache, int myThreadId) {
this.cache = cache;
this.myThreadId = myThreadId;
}
@Override
public void run() {
for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
String cacheKey = entry.getKey();
Integer value = entry.getValue().val.get();
this.cache.put(cacheKey, value);
}
System.out.println("Worker creating finished: " + myThreadId);
for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
String cacheKey = entry.getKey();
Integer value = entry.getValue().val.get() * 2;
this.cache.replace(cacheKey, value);
}
System.out.println("Worker updating finished: " + myThreadId);
}
}
public static class EntryInfo {
AtomicInteger val = new AtomicInteger();
AtomicInteger successfulListenerWrites = new AtomicInteger(0);
AtomicInteger dc1Created = new AtomicInteger();
AtomicInteger dc2Created = new AtomicInteger();
AtomicInteger dc1Updated = new AtomicInteger();
AtomicInteger dc2Updated = new AtomicInteger();
@Override
public String toString() {
return String.format("val: %d, successfulListenerWrites: %d, dc1Created: %d, dc2Created: %d, dc1Updated: %d, dc2Updated: %d", val.get(), successfulListenerWrites.get(),
dc1Created.get(), dc2Created.get(), dc1Updated.get(), dc2Updated.get());
}
}
}

View file

@ -43,11 +43,12 @@ import org.junit.Ignore;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
/**
* Test concurrency for remoteStore (backed by HotRod RemoteCaches) against external JDG
* Test concurrency for remoteStore (backed by HotRod RemoteCaches) against external JDG. Especially tests "putIfAbsent" contract.
*
* Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
@Ignore
public class ConcurrencyJDGRemoteCacheTest {
private static Map<String, EntryInfo> state = new HashMap<>();
@ -122,8 +123,8 @@ public class ConcurrencyJDGRemoteCacheTest {
private static Configuration getCacheBackedByRemoteStore(int threadId) {
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
// int port = threadId==1 ? 11222 : 11322;
int port = 11222;
int port = threadId==1 ? 12232 : 13232;
//int port = 12232;
return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
.fetchPersistentState(false)

View file

@ -39,6 +39,7 @@ import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.RemoteStore;
import org.infinispan.persistence.remote.configuration.ExhaustedAction;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
@ -50,13 +51,9 @@ import org.keycloak.models.sessions.infinispan.remotestore.KcRemoteStoreConfigur
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
/**
* Test requires to prepare 2 JDG (or infinispan servers) before it's runned.
* Steps:
* - In JDG1/standalone/configuration/clustered.xml add this: <replicated-cache name="sessions" mode="SYNC" start="EAGER"/>
* - Same in JDG2
* - Run JDG1 with: ./standalone.sh -c clustered.xml
* - Run JDG2 with: ./standalone.sh -c clustered.xml -Djboss.socket.binding.port-offset=100
* - Run this test
* Test concurrency for remoteStore (backed by HotRod RemoteCaches) against external JDG. Especially tests "replaceWithVersion" contract.
*
* Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
@ -66,6 +63,9 @@ public class ConcurrencyJDGSessionsCacheTest {
private static final int ITERATION_PER_WORKER = 1000;
private static RemoteCache remoteCache1;
private static RemoteCache remoteCache2;
private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);
private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0);
@ -176,6 +176,16 @@ public class ConcurrencyJDGSessionsCacheTest {
", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() );
System.out.println("Sleeping before other report");
Thread.sleep(1000);
System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
// Finish JVM
cache1.getCacheManager().stop();
cache2.getCacheManager().stop();
@ -186,7 +196,11 @@ public class ConcurrencyJDGSessionsCacheTest {
RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
remoteCache.keySet();
if (threadId == 1) {
remoteCache1 = remoteCache;
} else {
remoteCache2 = remoteCache;
}
AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2;
HotRodListener listener = new HotRodListener(cache, remoteCache, counter);
@ -224,8 +238,8 @@ public class ConcurrencyJDGSessionsCacheTest {
private static Configuration getCacheBackedByRemoteStore(int threadId) {
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
//int port = threadId==1 ? 11222 : 11322;
int port = 11222;
int port = threadId==1 ? 12232 : 13232;
//int port = 12232;
return cacheConfigBuilder.persistence().addStore(KcRemoteStoreConfigurationBuilder.class)
.fetchPersistentState(false)
@ -288,12 +302,12 @@ public class ConcurrencyJDGSessionsCacheTest {
private static class RemoteCacheWorker extends Thread {
private final RemoteCache<String, UserSessionEntity> cache;
private final RemoteCache<String, UserSessionEntity> remoteCache;
private final int myThreadId;
private RemoteCacheWorker(RemoteCache cache, int myThreadId) {
this.cache = cache;
private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) {
this.remoteCache = remoteCache;
this.myThreadId = myThreadId;
}
@ -306,7 +320,7 @@ public class ConcurrencyJDGSessionsCacheTest {
boolean replaced = false;
while (!replaced) {
VersionedValue<UserSessionEntity> versioned = cache.getVersioned("123");
VersionedValue<UserSessionEntity> versioned = remoteCache.getVersioned("123");
UserSessionEntity oldSession = versioned.getValue();
//UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession);
UserSessionEntity clone = oldSession;
@ -315,13 +329,20 @@ public class ConcurrencyJDGSessionsCacheTest {
//cache.replace("123", clone);
replaced = cacheReplace(versioned, clone);
}
// Try to see if remoteCache on 2nd DC is immediatelly seeing our change
RemoteCache secondDCRemoteCache = myThreadId == 1 ? remoteCache2 : remoteCache1;
UserSessionEntity thatSession = (UserSessionEntity) secondDCRemoteCache.get("123");
Assert.assertEquals("someVal", thatSession.getNotes().get(noteKey));
//System.out.println("Passed");
}
}
private boolean cacheReplace(VersionedValue<UserSessionEntity> oldSession, UserSessionEntity newSession) {
try {
boolean replaced = cache.replaceWithVersion("123", newSession, oldSession.getVersion());
boolean replaced = remoteCache.replaceWithVersion("123", newSession, oldSession.getVersion());
//cache.replace("123", newSession);
if (!replaced) {
failedReplaceCounter.incrementAndGet();

View file

@ -47,7 +47,7 @@
<xsl:copy>
<xsl:apply-templates select="@* | node()" />
<replicated-cache-configuration name="sessions-cfg" mode="SYNC" start="EAGER" batching="false">
<replicated-cache-configuration name="sessions-cfg" mode="ASYNC" start="EAGER" batching="false">
<transaction mode="NON_XA" locking="PESSIMISTIC"/>
</replicated-cache-configuration>

View file

@ -76,6 +76,7 @@ public abstract class AbstractConcurrencyTest extends AbstractTestRealmKeycloakT
runnable.run(arrayIndex % numThreads, keycloaks.get(), keycloaks.get().realm(REALM_NAME));
} catch (Throwable ex) {
failures.add(ex);
log.error(ex.getMessage(), ex);
}
return null;
});
@ -93,7 +94,7 @@ public abstract class AbstractConcurrencyTest extends AbstractTestRealmKeycloakT
}
if (! failures.isEmpty()) {
RuntimeException ex = new RuntimeException("There were failures in threads");
RuntimeException ex = new RuntimeException("There were failures in threads. Failures count: " + failures.size());
failures.forEach(ex::addSuppressed);
throw ex;
}