Ensure that Infinispan shutdowns correctly at the end of the tests. Report any exceptions within another thread as a test failure.

Adding additional information like a thread dump when it doesn't shutdown as expected.

Closes #10016
This commit is contained in:
Alexander Schwartz 2022-02-17 11:22:17 +01:00 committed by Hynek Mlnařík
parent 74581b5c10
commit ebfc24d6c1
10 changed files with 260 additions and 91 deletions

View file

@ -201,28 +201,32 @@ public class InfinispanClusterProviderFactory implements ClusterProviderFactory
// Use separate thread to avoid potential deadlock // Use separate thread to avoid potential deadlock
localExecutor.execute(() -> { localExecutor.execute(() -> {
EmbeddedCacheManager cacheManager = workCache.getCacheManager(); try {
Transport transport = cacheManager.getTransport(); EmbeddedCacheManager cacheManager = workCache.getCacheManager();
Transport transport = cacheManager.getTransport();
// Coordinator makes sure that entries for outdated nodes are cleaned up // Coordinator makes sure that entries for outdated nodes are cleaned up
if (transport != null && transport.isCoordinator()) { if (transport != null && transport.isCoordinator()) {
removedNodesAddresses.removeAll(newAddresses); removedNodesAddresses.removeAll(newAddresses);
if (removedNodesAddresses.isEmpty()) { if (removedNodesAddresses.isEmpty()) {
return; return;
} }
logger.debugf("Nodes %s removed from cluster. Removing tasks locked by this nodes", removedNodesAddresses.toString()); logger.debugf("Nodes %s removed from cluster. Removing tasks locked by this nodes", removedNodesAddresses.toString());
/* /*
workaround for Infinispan 12.1.7.Final to prevent a deadlock while workaround for Infinispan 12.1.7.Final to prevent a deadlock while
DefaultInfinispanConnectionProviderFactory is shutting down PersistenceManagerImpl DefaultInfinispanConnectionProviderFactory is shutting down PersistenceManagerImpl
that acquires a writeLock and this removal that acquires a readLock. that acquires a writeLock and this removal that acquires a readLock.
https://issues.redhat.com/browse/ISPN-13664 https://issues.redhat.com/browse/ISPN-13664
*/ */
synchronized (DefaultInfinispanConnectionProviderFactory.class) { synchronized (DefaultInfinispanConnectionProviderFactory.class) {
workCache.entrySet().removeIf(new LockEntryPredicate(removedNodesAddresses)); workCache.entrySet().removeIf(new LockEntryPredicate(removedNodesAddresses));
}
} }
} catch (Throwable t) {
logger.error("caught exception in ViewChangeListener", t);
} }
}); });
} }

View file

@ -254,7 +254,8 @@ public class InfinispanNotificationsManager {
}); });
} catch (RejectedExecutionException ree) { } catch (RejectedExecutionException ree) {
logger.errorf("Rejected submitting of the event for key: %s. Value: %s, Server going to shutdown or pool exhausted. Pool: %s", key, workCache.get(key), listenersExecutor.toString()); // avoid touching the cache when creating a log message to avoid a deadlock in Infinispan 12.1.7.Final
logger.errorf("Rejected submitting of the event for key: %s. Server going to shutdown or pool exhausted. Pool: %s", key, listenersExecutor.toString());
throw ree; throw ree;
} }
} }

View file

@ -151,6 +151,8 @@
<log4j.configuration>file:${project.build.directory}/test-classes/log4j.properties</log4j.configuration> <!-- for the logging to properly work with tests in the 'other' module --> <log4j.configuration>file:${project.build.directory}/test-classes/log4j.properties</log4j.configuration> <!-- for the logging to properly work with tests in the 'other' module -->
<keycloak.profile.feature.map_storage>${keycloak.profile.feature.map_storage}</keycloak.profile.feature.map_storage> <keycloak.profile.feature.map_storage>${keycloak.profile.feature.map_storage}</keycloak.profile.feature.map_storage>
<keycloak.userSessions.infinispan.preloadOfflineSessionsFromDatabase>${keycloak.userSessions.infinispan.preloadOfflineSessionsFromDatabase}</keycloak.userSessions.infinispan.preloadOfflineSessionsFromDatabase> <keycloak.userSessions.infinispan.preloadOfflineSessionsFromDatabase>${keycloak.userSessions.infinispan.preloadOfflineSessionsFromDatabase}</keycloak.userSessions.infinispan.preloadOfflineSessionsFromDatabase>
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<org.jboss.logging.provider>log4j</org.jboss.logging.provider>
</systemPropertyVariables> </systemPropertyVariables>
</configuration> </configuration>
</plugin> </plugin>

View file

@ -16,6 +16,9 @@
*/ */
package org.keycloak.testsuite.model; package org.keycloak.testsuite.model;
import org.infinispan.commons.CacheConfigurationException;
import org.infinispan.manager.EmbeddedCacheManagerStartupException;
import org.junit.Assert;
import org.keycloak.Config.Scope; import org.keycloak.Config.Scope;
import org.keycloak.authorization.AuthorizationSpi; import org.keycloak.authorization.AuthorizationSpi;
import org.keycloak.authorization.DefaultAuthorizationProviderFactory; import org.keycloak.authorization.DefaultAuthorizationProviderFactory;
@ -49,18 +52,29 @@ import org.keycloak.services.DefaultComponentFactoryProviderFactory;
import org.keycloak.services.DefaultKeycloakSessionFactory; import org.keycloak.services.DefaultKeycloakSessionFactory;
import org.keycloak.timer.TimerSpi; import org.keycloak.timer.TimerSpi;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import java.lang.management.LockInfo;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
@ -155,7 +169,7 @@ public abstract class KeycloakModelTest {
if (getFactory().getProviderFactory(providerClass) == null) { if (getFactory().getProviderFactory(providerClass) == null) {
return new Statement() { return new Statement() {
@Override @Override
public void evaluate() throws Throwable { public void evaluate() {
throw new AssumptionViolatedException("Provider must exist: " + providerClass); throw new AssumptionViolatedException("Provider must exist: " + providerClass);
} }
}; };
@ -165,7 +179,7 @@ public abstract class KeycloakModelTest {
if (notFoundAny) { if (notFoundAny) {
return new Statement() { return new Statement() {
@Override @Override
public void evaluate() throws Throwable { public void evaluate() {
throw new AssumptionViolatedException("Provider must exist: " + providerClass + " one of [" + String.join(",", only) + "]"); throw new AssumptionViolatedException("Provider must exist: " + providerClass + " one of [" + String.join(",", only) + "]");
} }
}; };
@ -322,48 +336,139 @@ public abstract class KeycloakModelTest {
* Runs the given {@code task} in {@code numThreads} parallel threads, each thread operating * Runs the given {@code task} in {@code numThreads} parallel threads, each thread operating
* in the context of a fresh {@link KeycloakSessionFactory} independent of each other thread. * in the context of a fresh {@link KeycloakSessionFactory} independent of each other thread.
* *
* Will throw an exception when the thread throws an exception or if the thread doesn't complete in time.
*
* @see #inIndependentFactory * @see #inIndependentFactory
* *
* @param numThreads
* @param timeoutSeconds
* @param task
* @throws InterruptedException
*/ */
public static void inIndependentFactories(int numThreads, int timeoutSeconds, Runnable task) throws InterruptedException { public static void inIndependentFactories(int numThreads, int timeoutSeconds, Runnable task) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(numThreads); enabledContentionMonitoring();
// memorize threads created to be able to retrieve their stacktrace later if they don't terminate
LinkedList<Thread> threads = new LinkedList<>();
ExecutorService es = Executors.newFixedThreadPool(numThreads, new ThreadFactory() {
final ThreadFactory tf = Executors.defaultThreadFactory();
@Override
public Thread newThread(Runnable r) {
{
Thread thread = tf.newThread(r);
threads.add(thread);
return thread;
}
}
});
try { try {
/* /*
workaround for Infinispan 12.1.7.Final to prevent an internal Infinispan NullPointerException workaround for Infinispan 12.1.7.Final to prevent an internal Infinispan NullPointerException
when multiple nodes tried to join at the same time by starting them sequentially with 1 sec delay. when multiple nodes tried to join at the same time by starting them sequentially,
Already fixed in Infinispan 13. although that does not catch 100% of all occurrences.
https://issues.redhat.com/browse/ISPN-13231 Already fixed in Infinispan 13.
*/ https://issues.redhat.com/browse/ISPN-13231
*/
Semaphore sem = new Semaphore(1); Semaphore sem = new Semaphore(1);
CountDownLatch start = new CountDownLatch(numThreads);
CountDownLatch stop = new CountDownLatch(numThreads);
Callable<?> independentTask = () -> { Callable<?> independentTask = () -> {
AtomicBoolean locked = new AtomicBoolean(false);
try { try {
sem.acquire(); sem.acquire();
return inIndependentFactory(() -> { locked.set(true);
Thread.sleep(1000); Object val = inIndependentFactory(() -> {
sem.release(); sem.release();
task.run(); locked.set(false);
// use the latch to ensure that all caches are online while the transaction below runs to avoid a RemoteException
start.countDown();
start.await();
try {
task.run();
// use the latch to ensure that all caches are online while the transaction above runs to avoid a RemoteException
// otherwise might fail with "Cannot wire or start components while the registry is not running" during shutdown
// https://issues.redhat.com/browse/ISPN-9761
} finally {
stop.countDown();
}
stop.await();
sem.acquire();
locked.set(true);
return null; return null;
}); });
} catch (Exception ex) { sem.release();
LOG.error("Thread terminated with an exception", ex); locked.set(false);
return null; return val;
} finally {
if (locked.get()) {
sem.release();
}
} }
}; };
es.invokeAll(
IntStream.range(0, numThreads) // submit tasks, and wait for the results without cancelling execution so that we'll be able to analyze the thread dump
.mapToObj(i -> independentTask) List<? extends Future<?>> tasks = IntStream.range(0, numThreads)
.collect(Collectors.toList()), .mapToObj(i -> independentTask)
timeoutSeconds, TimeUnit.SECONDS .map(es::submit).collect(Collectors.toList());
); long limit = System.currentTimeMillis() + timeoutSeconds * 1000L;
for (Future<?> future : tasks) {
long limitForTask = limit - System.currentTimeMillis();
if (limitForTask > 0) {
try {
future.get(limitForTask, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
if (e.getCause() instanceof AssertionError) {
throw (AssertionError) e.getCause();
} else {
LOG.error("Execution didn't complete", e);
Assert.fail("Execution didn't complete: " + e.getMessage());
}
} catch (TimeoutException e) {
failWithThreadDump(threads, e);
}
} else {
failWithThreadDump(threads, null);
}
}
} finally { } finally {
LOG.debugf("waiting for threads to shutdown to avoid that one test pollutes another test"); es.shutdownNow();
es.shutdown();
LOG.debugf("shutdown of threads complete");
} }
// wait for shutdown executor pool, but not if there has been an exception
if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
failWithThreadDump(threads, null);
}
}
private static void enabledContentionMonitoring() {
if (!ManagementFactory.getThreadMXBean().isThreadContentionMonitoringEnabled()) {
ManagementFactory.getThreadMXBean().setThreadContentionMonitoringEnabled(true);
}
}
private static void failWithThreadDump(LinkedList<Thread> threads, Exception e) {
ThreadInfo[] infos = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
List<String> liveStacks = Arrays.stream(infos).map(thread -> {
StringBuilder sb = new StringBuilder();
if (threads.stream().anyMatch(t -> t.getId() == thread.getThreadId())) {
sb.append("[OurThreadPool] ");
}
sb.append(thread.getThreadName()).append(" (").append(thread.getThreadState()).append("):");
LockInfo lockInfo = thread.getLockInfo();
if (lockInfo != null) {
sb.append(" locked on ").append(lockInfo);
if (thread.getWaitedTime() != -1) {
sb.append(" waiting for ").append(thread.getWaitedTime()).append(" ms");
}
if (thread.getBlockedTime() != -1) {
sb.append(" blocked for ").append(thread.getBlockedTime()).append(" ms");
}
}
sb.append("\n");
for (StackTraceElement traceElement : thread.getStackTrace()) {
sb.append("\tat ").append(traceElement).append("\n");
}
return sb.toString();
}).collect(Collectors.toList());
throw new AssertionError("threads didn't terminate in time: " + liveStacks, e);
} }
/** /**
@ -376,10 +481,46 @@ public abstract class KeycloakModelTest {
throw new IllegalStateException("USE_DEFAULT_FACTORY must be false to use an independent factory"); throw new IllegalStateException("USE_DEFAULT_FACTORY must be false to use an independent factory");
} }
KeycloakSessionFactory original = getFactory(); KeycloakSessionFactory original = getFactory();
KeycloakSessionFactory factory = createKeycloakSessionFactory(); int retries = 10;
KeycloakSessionFactory factory = null;
do {
try {
factory = createKeycloakSessionFactory();
} catch (CacheConfigurationException | EmbeddedCacheManagerStartupException ex) {
if (retries > 0) {
/*
workaround for Infinispan 12.1.7.Final for a NullPointerException
when multiple nodes tried to join at the same time. Retry until this succeeds.
Already fixed in Infinispan 13.
https://issues.redhat.com/browse/ISPN-13231
*/
LOG.warn("initialization failed, retrying", ex);
--retries;
} else {
throw ex;
}
}
} while (factory == null);
try { try {
setFactory(factory); setFactory(factory);
return task.call(); do {
try {
return task.call();
} catch (CacheConfigurationException | EmbeddedCacheManagerStartupException ex) {
if (retries > 0) {
/*
workaround for Infinispan 12.1.7.Final for a NullPointerException
when multiple nodes tried to join at the same time. Retry until this succeeds.
Already fixed in Infinispan 13.
https://issues.redhat.com/browse/ISPN-13231
*/
LOG.warn("initialization failed, retrying", ex);
-- retries;
} else {
throw ex;
}
}
} while (true);
} catch (Exception ex) { } catch (Exception ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);
} finally { } finally {

View file

@ -20,6 +20,7 @@ import org.keycloak.component.ComponentModel;
import org.keycloak.models.Constants; import org.keycloak.models.Constants;
import org.keycloak.models.GroupModel; import org.keycloak.models.GroupModel;
import org.keycloak.models.KeycloakSession; import org.keycloak.models.KeycloakSession;
import org.keycloak.models.ModelException;
import org.keycloak.models.RealmModel; import org.keycloak.models.RealmModel;
import org.keycloak.models.RealmProvider; import org.keycloak.models.RealmProvider;
import org.keycloak.models.UserModel; import org.keycloak.models.UserModel;
@ -39,6 +40,8 @@ import java.util.stream.IntStream;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Test; import org.junit.Test;
import javax.naming.NamingException;
import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
@ -213,7 +216,16 @@ public class UserModelTest extends KeycloakModelTest {
log.debugf("Removing selected users from backend"); log.debugf("Removing selected users from backend");
IntStream.range(FIRST_DELETED_USER_INDEX, LAST_DELETED_USER_INDEX).forEach(j -> { IntStream.range(FIRST_DELETED_USER_INDEX, LAST_DELETED_USER_INDEX).forEach(j -> {
final UserModel user = session.users().getUserByUsername(realm, "user-" + j); final UserModel user = session.users().getUserByUsername(realm, "user-" + j);
((UserRegistrationProvider) instance).removeUser(realm, user); try {
((UserRegistrationProvider) instance).removeUser(realm, user);
} catch (ModelException ex) {
// removing user might have failed for an LDAP reason
// as this is not the main subject under test, retry once more to delete the entry
if (ex.getMessage().contains("Could not unbind DN") && ex.getCause() instanceof NamingException) {
log.warn("removing failed, retrying", ex);
((UserRegistrationProvider) instance).removeUser(realm, user);
}
}
}); });
return null; return null;
}); });

View file

@ -37,7 +37,6 @@ import java.util.regex.Pattern;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assume.assumeThat; import static org.junit.Assume.assumeThat;
@ -54,6 +53,8 @@ public class CacheExpirationTest extends KeycloakModelTest {
@Test @Test
public void testCacheExpiration() throws Exception { public void testCacheExpiration() throws Exception {
log.debugf("Number of previous instances of the class on the heap: %d", getNumberOfInstancesOfClass(AuthenticationSessionAuthNoteUpdateEvent.class));
log.debug("Put two events to the main cache"); log.debug("Put two events to the main cache");
inComittedTransaction(session -> { inComittedTransaction(session -> {
InfinispanConnectionProvider provider = session.getProvider(InfinispanConnectionProvider.class); InfinispanConnectionProvider provider = session.getProvider(InfinispanConnectionProvider.class);
@ -71,12 +72,17 @@ public class CacheExpirationTest extends KeycloakModelTest {
// Ensure that instance counting works as expected, there should be at least two instances in memory now. // Ensure that instance counting works as expected, there should be at least two instances in memory now.
// Infinispan server is decoding the client request before processing the request at the cache level, // Infinispan server is decoding the client request before processing the request at the cache level,
// therefore there are sometimes three instances of AuthenticationSessionAuthNoteUpdateEvent class in the memory // therefore there are sometimes three instances of AuthenticationSessionAuthNoteUpdateEvent class in the memory
assertThat(getNumberOfInstancesOfClass(AuthenticationSessionAuthNoteUpdateEvent.class), greaterThanOrEqualTo(2)); Integer instancesAfterInsertion = getNumberOfInstancesOfClass(AuthenticationSessionAuthNoteUpdateEvent.class);
assertThat(instancesAfterInsertion, greaterThanOrEqualTo(2));
// A third instance created when inserting the instances is never collected from GC for a yet unknown reason.
// Therefore, ignore this additional instance in the upcoming tests.
int previousInstancesOfClass = instancesAfterInsertion - 2;
log.debug("Expecting instance count to go down to " + previousInstancesOfClass);
log.debug("Starting other nodes and see that they join, receive the data and have their data expired"); log.debug("Starting other nodes and see that they join, receive the data and have their data expired");
AtomicInteger completedTests = new AtomicInteger(0); inIndependentFactories(NUM_EXTRA_FACTORIES, 2 * 60, () -> {
inIndependentFactories(NUM_EXTRA_FACTORIES, 5 * 60, () -> {
log.debug("Joining the cluster"); log.debug("Joining the cluster");
inComittedTransaction(session -> { inComittedTransaction(session -> {
InfinispanConnectionProvider provider = session.getProvider(InfinispanConnectionProvider.class); InfinispanConnectionProvider provider = session.getProvider(InfinispanConnectionProvider.class);
@ -108,15 +114,12 @@ public class CacheExpirationTest extends KeycloakModelTest {
log.debug("Waiting for garbage collection to collect the entries across all caches in JVM"); log.debug("Waiting for garbage collection to collect the entries across all caches in JVM");
do { do {
try { Thread.sleep(1000); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new RuntimeException(ex); } try { Thread.sleep(1000); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new RuntimeException(ex); }
} while (getNumberOfInstancesOfClass(AuthenticationSessionAuthNoteUpdateEvent.class) != 0); } while (getNumberOfInstancesOfClass(AuthenticationSessionAuthNoteUpdateEvent.class) > previousInstancesOfClass);
completedTests.incrementAndGet();
log.debug("Test completed"); log.debug("Test completed");
}); });
}); });
assertThat(completedTests.get(), is(NUM_EXTRA_FACTORIES));
} }
private static final Pattern JMAP_HOTSPOT_PATTERN = Pattern.compile("\\s*\\d+:\\s+(\\d+)\\s+(\\d+)\\s+(\\S+)\\s*"); private static final Pattern JMAP_HOTSPOT_PATTERN = Pattern.compile("\\s*\\d+:\\s+(\\d+)\\s+(\\d+)\\s+(\\S+)\\s*");
@ -132,7 +135,9 @@ public class CacheExpirationTest extends KeycloakModelTest {
public synchronized Integer getNumberOfInstancesOfClass(Class<?> c, String pid) { public synchronized Integer getNumberOfInstancesOfClass(Class<?> c, String pid) {
Process proc; Process proc;
try { try {
// running this command will also trigger a garbage collection on the VM // running jmap command will also trigger a garbage collection on the VM, but that might be VM specific
// a test run with adding "-verbose:gc" showed the message "GC(23) Pause Full (Heap Inspection Initiated GC)" that
// indicates a full GC run
proc = Runtime.getRuntime().exec("jmap -histo:live " + pid); proc = Runtime.getRuntime().exec("jmap -histo:live " + pid);
try (BufferedReader stdInput = new BufferedReader(new InputStreamReader(proc.getInputStream()))) { try (BufferedReader stdInput = new BufferedReader(new InputStreamReader(proc.getInputStream()))) {

View file

@ -33,11 +33,14 @@ import org.keycloak.services.managers.RealmManager;
import org.keycloak.testsuite.model.KeycloakModelTest; import org.keycloak.testsuite.model.KeycloakModelTest;
import org.keycloak.testsuite.model.RequireProvider; import org.keycloak.testsuite.model.RequireProvider;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -130,6 +133,8 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest {
} }
@Test(timeout = 90 * 1000) @Test(timeout = 90 * 1000)
@RequireProvider(UserSessionPersisterProvider.class)
@RequireProvider(value = UserSessionProvider.class, only = InfinispanUserSessionProviderFactory.PROVIDER_ID)
public void testPersistenceMultipleNodesClientSessionAtSameNode() throws InterruptedException { public void testPersistenceMultipleNodesClientSessionAtSameNode() throws InterruptedException {
List<String> clientIds = withRealm(realmId, (session, realm) -> { List<String> clientIds = withRealm(realmId, (session, realm) -> {
return IntStream.range(0, 5) return IntStream.range(0, 5)
@ -140,30 +145,30 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest {
// Shutdown factory -> enforce session persistence // Shutdown factory -> enforce session persistence
closeKeycloakSessionFactory(); closeKeycloakSessionFactory();
Set<String> clientSessionIds = Collections.newSetFromMap(new ConcurrentHashMap<>());
Map<String, List<String>> clientSessionIds = new ConcurrentHashMap<>(); inIndependentFactories(3, 60, () -> {
inIndependentFactories(3, 30, () -> {
withRealm(realmId, (session, realm) -> { withRealm(realmId, (session, realm) -> {
// Create offline sessions // Create offline sessions
userIds.forEach(userId -> createOfflineSessions(session, realm, userId, offlineUserSession -> { userIds.forEach(userId -> createOfflineSessions(session, realm, userId, offlineUserSession -> {
List<String> innerClientSessionIds = IntStream.range(0, 5) IntStream.range(0, 5)
.mapToObj(cid -> session.clients().getClientById(realm, clientIds.get(cid))) .mapToObj(cid -> session.clients().getClientById(realm, clientIds.get(cid)))
// TODO in the future: The following two lines are weird. Why an online client session needs to exist in order to create an offline one? // TODO in the future: The following two lines are weird. Why an online client session needs to exist in order to create an offline one?
.map(client -> session.sessions().createClientSession(realm, client, offlineUserSession)) .map(client -> session.sessions().createClientSession(realm, client, offlineUserSession))
.map(clientSession -> session.sessions().createOfflineClientSession(clientSession, offlineUserSession)) .map(clientSession -> session.sessions().createOfflineClientSession(clientSession, offlineUserSession))
.map(AuthenticatedClientSessionModel::getId) .map(AuthenticatedClientSessionModel::getId)
.collect(Collectors.toList()); .forEach(s -> {}); // ensure that stream is consumed
clientSessionIds.put(offlineUserSession.getId(), innerClientSessionIds); }).forEach(userSessionModel -> clientSessionIds.add(userSessionModel.getId())));
}));
return null; return null;
}); });
}); });
reinitializeKeycloakSessionFactory(); reinitializeKeycloakSessionFactory();
inIndependentFactories(4, 30, () -> assertOfflineSessionsExist(realmId, clientSessionIds.keySet())); inIndependentFactories(4, 30, () -> assertOfflineSessionsExist(realmId, clientSessionIds));
} }
@Test(timeout = 90 * 1000) @Test(timeout = 90 * 1000)
@RequireProvider(UserSessionPersisterProvider.class)
@RequireProvider(value = UserSessionProvider.class, only = InfinispanUserSessionProviderFactory.PROVIDER_ID)
public void testPersistenceMultipleNodesClientSessionsAtRandomNode() throws InterruptedException { public void testPersistenceMultipleNodesClientSessionsAtRandomNode() throws InterruptedException {
List<String> clientIds = withRealm(realmId, (session, realm) -> { List<String> clientIds = withRealm(realmId, (session, realm) -> {
return IntStream.range(0, 5) return IntStream.range(0, 5)
@ -178,7 +183,7 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest {
Map<String, List<String>> clientSessionIds = new ConcurrentHashMap<>(); Map<String, List<String>> clientSessionIds = new ConcurrentHashMap<>();
AtomicInteger i = new AtomicInteger(); AtomicInteger i = new AtomicInteger();
inIndependentFactories(3, 30, () -> { inIndependentFactories(3, 60, () -> {
for (int j = 0; j < USER_COUNT * 3; j ++) { for (int j = 0; j < USER_COUNT * 3; j ++) {
int index = i.incrementAndGet(); int index = i.incrementAndGet();
int oid = index % offlineSessionIds.size(); int oid = index % offlineSessionIds.size();
@ -187,7 +192,11 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest {
String clientSessionId = createOfflineClientSession(offlineSessionId, clientIds.get(cid)); String clientSessionId = createOfflineClientSession(offlineSessionId, clientIds.get(cid));
clientSessionIds.computeIfAbsent(offlineSessionId, a -> new LinkedList<>()).add(clientSessionId); clientSessionIds.computeIfAbsent(offlineSessionId, a -> new LinkedList<>()).add(clientSessionId);
if (index % 100 == 0) { if (index % 100 == 0) {
reinitializeKeycloakSessionFactory(); // don't re-initialize all caches at the same time to avoid an unstable cluster with no leader
// otherwise seen CacheInitializer#loadSessions to loop sleeping
synchronized (OfflineSessionPersistenceTest.class) {
reinitializeKeycloakSessionFactory();
}
} }
} }
}); });
@ -282,6 +291,8 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest {
} }
@Test(timeout = 90 * 1000) @Test(timeout = 90 * 1000)
@RequireProvider(UserSessionPersisterProvider.class)
@RequireProvider(value = UserSessionProvider.class, only = InfinispanUserSessionProviderFactory.PROVIDER_ID)
public void testPersistenceClientSessionsMultipleNodes() throws InterruptedException { public void testPersistenceClientSessionsMultipleNodes() throws InterruptedException {
// Create offline sessions // Create offline sessions
List<String> offlineSessionIds = createOfflineSessions(realmId, userIds); List<String> offlineSessionIds = createOfflineSessions(realmId, userIds);
@ -294,19 +305,16 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest {
/** /**
* Assert that all the offline sessions passed in the {@code offlineSessionIds} parameter exist * Assert that all the offline sessions passed in the {@code offlineSessionIds} parameter exist
* @param factory
* @param offlineSessionIds
* @return
*/ */
private Void assertOfflineSessionsExist(String realmId, Collection<String> offlineSessionIds) { private void assertOfflineSessionsExist(String realmId, Collection<String> offlineSessionIds) {
int foundOfflineSessions = withRealm(realmId, (session, realm) -> offlineSessionIds.stream() int foundOfflineSessions = withRealm(realmId, (session, realm) -> offlineSessionIds.stream()
.map(offlineSessionId -> session.sessions().getOfflineUserSession(realm, offlineSessionId)) .map(offlineSessionId -> session.sessions().getOfflineUserSession(realm, offlineSessionId))
.map(ous -> ous == null ? 0 : 1) .map(ous -> ous == null ? 0 : 1)
.reduce(0, Integer::sum)); .reduce(0, Integer::sum));
assertThat(foundOfflineSessions, Matchers.is(USER_COUNT * OFFLINE_SESSION_COUNT_PER_USER)); assertThat(foundOfflineSessions, Matchers.is(offlineSessionIds.size()));
// catch a programming error where an empty collection of offline session IDs is passed
return null; assertThat(foundOfflineSessions, Matchers.greaterThan(0));
} }
// ***************** Helper methods ***************** // ***************** Helper methods *****************

View file

@ -38,6 +38,7 @@ import org.keycloak.models.sessions.infinispan.InfinispanUserSessionProviderFact
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -165,7 +166,7 @@ public class UserSessionInitializerTest extends KeycloakModelTest {
Optional<HotRodServerRule> hotRodServer = getParameters(HotRodServerRule.class).findFirst(); Optional<HotRodServerRule> hotRodServer = getParameters(HotRodServerRule.class).findFirst();
inIndependentFactories(4, 300, () -> { inIndependentFactories(4, 60, () -> {
synchronized (lock) { synchronized (lock) {
if (index.incrementAndGet() == 1) { if (index.incrementAndGet() == 1) {
// create a user session in the first node // create a user session in the first node

View file

@ -314,33 +314,24 @@ public class UserSessionProviderOfflineModelTest extends KeycloakModelTest {
closeKeycloakSessionFactory(); closeKeycloakSessionFactory();
AtomicBoolean result = new AtomicBoolean(true); inIndependentFactories(4, 60, () -> {
CountDownLatch latch = new CountDownLatch(4);
inIndependentFactories(4, 300, () -> {
withRealm(realmId, (session, realm) -> { withRealm(realmId, (session, realm) -> {
final UserModel user = session.users().getUserByUsername(realm, "user1"); final UserModel user = session.users().getUserByUsername(realm, "user1");
result.set(result.get() && assertOfflineSession(offlineUserSessions, session.sessions().getOfflineUserSessionsStream(realm, user).collect(Collectors.toList()))); Assert.assertTrue(assertOfflineSession(offlineUserSessions, session.sessions().getOfflineUserSessionsStream(realm, user).collect(Collectors.toList())));
return null; return null;
}); });
latch.countDown();
awaitLatch(latch);
}); });
Assert.assertTrue(result.get());
} }
@Test @Test
public void testOfflineSessionLazyLoadingPropagationBetweenNodes() throws InterruptedException { public void testOfflineSessionLazyLoadingPropagationBetweenNodes() throws InterruptedException {
AtomicReference<List<UserSessionModel>> offlineUserSessions = new AtomicReference<>(new LinkedList<>()); AtomicReference<List<UserSessionModel>> offlineUserSessions = new AtomicReference<>(new LinkedList<>());
AtomicReference<List<AuthenticatedClientSessionModel>> offlineClientSessions = new AtomicReference<>(new LinkedList<>()); AtomicReference<List<AuthenticatedClientSessionModel>> offlineClientSessions = new AtomicReference<>(new LinkedList<>());
AtomicBoolean result = new AtomicBoolean(true);
AtomicInteger index = new AtomicInteger(); AtomicInteger index = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(4);
CountDownLatch afterFirstNodeLatch = new CountDownLatch(1); CountDownLatch afterFirstNodeLatch = new CountDownLatch(1);
inIndependentFactories(4, 300, () -> { inIndependentFactories(4, 60, () -> {
if (index.incrementAndGet() == 1) { if (index.incrementAndGet() == 1) {
createOfflineSessions("user1", 10, offlineUserSessions, offlineClientSessions); createOfflineSessions("user1", 10, offlineUserSessions, offlineClientSessions);
@ -352,25 +343,24 @@ public class UserSessionProviderOfflineModelTest extends KeycloakModelTest {
inComittedTransaction(session -> { inComittedTransaction(session -> {
InfinispanConnectionProvider provider = session.getProvider(InfinispanConnectionProvider.class); InfinispanConnectionProvider provider = session.getProvider(InfinispanConnectionProvider.class);
Cache<String, Object> cache = provider.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME); Cache<String, Object> cache = provider.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
do { while (! cache.getAdvancedCache().getDistributionManager().isJoinComplete()) {
try { Thread.sleep(1000); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new RuntimeException(ex); } try { Thread.sleep(1000); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new RuntimeException(ex); }
} while (! cache.getAdvancedCache().getDistributionManager().isJoinComplete()); }
cache.keySet().forEach(s -> {}); cache.keySet().forEach(s -> {});
}); });
log.debug("Cluster joined"); log.debug("Cluster joined");
withRealm(realmId, (session, realm) -> { withRealm(realmId, (session, realm) -> {
final UserModel user = session.users().getUserByUsername(realm, "user1"); final UserModel user = session.users().getUserByUsername(realm, "user1");
result.set(result.get() && assertOfflineSession(offlineUserSessions, session.sessions().getOfflineUserSessionsStream(realm, user).collect(Collectors.toList()))); // it might take a moment to propagate, therefore loop
while (! assertOfflineSession(offlineUserSessions, session.sessions().getOfflineUserSessionsStream(realm, user).collect(Collectors.toList()))) {
try { Thread.sleep(1000); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new RuntimeException(ex); }
}
return null; return null;
}); });
latch.countDown();
awaitLatch(latch);
}); });
Assert.assertTrue(result.get());
} }
private static Set<String> createOfflineSessionIncludeClientSessions(KeycloakSession session, UserSessionModel private static Set<String> createOfflineSessionIncludeClientSessions(KeycloakSession session, UserSessionModel
@ -418,6 +408,8 @@ public class UserSessionProviderOfflineModelTest extends KeycloakModelTest {
try { try {
latch.await(); latch.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} }
} }
} }

View file

@ -13,12 +13,15 @@ EXIT_CODE=0
mvn clean mvn clean
for I in `perl -ne 'print "$1\n" if (m,<id>([^<]+)</id>,)' pom.xml`; do for I in `perl -ne 'print "$1\n" if (m,<id>([^<]+)</id>,)' pom.xml`; do
echo "========" echo "========"
echo "======== Profile $I" echo "======== Start of Profile $I"
echo "========" echo "========"
mvn -B -Dsurefire.timeout=600 test "-P$I" "$@" 2>&1 | tee /tmp/surefire.out mvn -B -Dsurefire.timeout=900 test "-P$I" "$@" 2>&1 | tee /tmp/surefire.out
EXIT_CODE=$[$EXIT_CODE + ${PIPESTATUS[0]}] EXIT_CODE=$[$EXIT_CODE + ${PIPESTATUS[0]}]
mv target/surefire-reports "target/surefire-reports-$I" mv target/surefire-reports "target/surefire-reports-$I"
perl -ne "print '::error::| $I | Timed out.' . \"\n\" if (/There was a timeout in the fork/)" /tmp/surefire.out perl -ne "print '::error::| $I | Timed out.' . \"\n\" if (/There was a timeout in the fork/)" /tmp/surefire.out
echo "========"
echo "======== End of Profile $I"
echo "========"
done done
## If the jacoco file is present, generate reports in each of the model projects ## If the jacoco file is present, generate reports in each of the model projects