Reduce logging of errors due to the bounded queue

Closes #10588
This commit is contained in:
Alexander Schwartz 2022-03-04 12:32:45 +01:00 committed by Hynek Mlnařík
parent 7d6c6fff17
commit 3ebfc91b75
6 changed files with 61 additions and 11 deletions

View file

@ -19,10 +19,10 @@ package org.keycloak.executors;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -189,7 +189,7 @@ public class DefaultExecutorsProviderFactory implements ExecutorsProviderFactory
// Same like Executors.newCachedThreadPool. Besides that "min" and "max" are configurable
return new ThreadPoolExecutor(min, max,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024),
new LinkedBlockingQueue<>(),
threadFactory);
}
}

View file

@ -638,4 +638,14 @@ public abstract class KeycloakModelTest {
protected boolean isUseSameKeycloakSessionFactoryForAllThreads() {
return false;
}
protected void sleep(long timeMs) {
try {
Thread.sleep(timeMs);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new RuntimeException(ex);
}
}
}

View file

@ -90,7 +90,7 @@ public class CacheExpirationTest extends KeycloakModelTest {
log.debug("Waiting for caches to join the cluster");
do {
try { Thread.sleep(1000); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new RuntimeException(ex); }
sleep(1000);
} while (! cache.getAdvancedCache().getDistributionManager().isJoinComplete());
String site = CONFIG.scope("connectionsInfinispan", "default").get("siteName");
@ -98,7 +98,7 @@ public class CacheExpirationTest extends KeycloakModelTest {
log.debug("Waiting for cache to receive the two elements within the cluster");
do {
try { Thread.sleep(1000); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new RuntimeException(ex); }
sleep(1000);
} while (cache.entrySet().stream()
.filter(me -> me.getValue() instanceof AuthenticationSessionAuthNoteUpdateEvent)
.count() != 2);
@ -113,7 +113,7 @@ public class CacheExpirationTest extends KeycloakModelTest {
// original issue: https://issues.redhat.com/browse/KEYCLOAK-18518
log.debug("Waiting for garbage collection to collect the entries across all caches in JVM");
do {
try { Thread.sleep(1000); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new RuntimeException(ex); }
sleep(1000);
} while (getNumberOfInstancesOfClass(AuthenticationSessionAuthNoteUpdateEvent.class) > previousInstancesOfClass);
log.debug("Test completed");

View file

@ -138,7 +138,7 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest {
public void testPersistenceMultipleNodesClientSessionAtSameNode() throws InterruptedException {
List<String> clientIds = withRealm(realmId, (session, realm) -> {
return IntStream.range(0, 5)
.mapToObj(cid -> (ClientModel) session.clients().addClient(realm, "client-" + cid))
.mapToObj(cid -> session.clients().addClient(realm, "client-" + cid))
.map(ClientModel::getId)
.collect(Collectors.toList());
});
@ -146,7 +146,10 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest {
// Shutdown factory -> enforce session persistence
closeKeycloakSessionFactory();
Set<String> clientSessionIds = Collections.newSetFromMap(new ConcurrentHashMap<>());
inIndependentFactories(3, 60, () -> {
int NUM_FACTORIES = 3;
CountDownLatch intermediate = new CountDownLatch(NUM_FACTORIES);
inIndependentFactories(NUM_FACTORIES, 60, () -> {
withRealm(realmId, (session, realm) -> {
// Create offline sessions
userIds.forEach(userId -> createOfflineSessions(session, realm, userId, offlineUserSession -> {
@ -160,6 +163,27 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest {
}).forEach(userSessionModel -> clientSessionIds.add(userSessionModel.getId())));
return null;
});
// ensure that all session have been created on all nodes
intermediate.countDown();
try {
intermediate.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
// defer the shutdown and check if all sessions exist to ensure that they replicate across the different nodes
// this should avoid an "org.infinispan.remoting.transport.jgroups.SuspectException: ISPN000400: Node node-XX was suspected"
while (true) {
try {
assertOfflineSessionsExist(realmId, clientSessionIds);
break;
} catch (AssertionError e) {
log.warn("assertion failed, retrying to see if all sessions exist.");
sleep(1000);
}
}
});
reinitializeKeycloakSessionFactory();
@ -172,7 +196,7 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest {
public void testPersistenceMultipleNodesClientSessionsAtRandomNode() throws InterruptedException {
List<String> clientIds = withRealm(realmId, (session, realm) -> {
return IntStream.range(0, 5)
.mapToObj(cid -> (ClientModel) session.clients().addClient(realm, "client-" + cid))
.mapToObj(cid -> session.clients().addClient(realm, "client-" + cid))
.map(ClientModel::getId)
.collect(Collectors.toList());
});
@ -189,7 +213,22 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest {
int oid = index % offlineSessionIds.size();
String offlineSessionId = offlineSessionIds.get(oid);
int cid = index % clientIds.size();
String clientSessionId = createOfflineClientSession(offlineSessionId, clientIds.get(cid));
String clientSessionId;
while (true) {
try {
clientSessionId = createOfflineClientSession(offlineSessionId, clientIds.get(cid));
break;
} catch (RuntimeException ex) {
// invocation can fail when remote cache is stopping, this is actually part of this test:
// "ISPN000217: Received exception from node-8, see cause for remote stack trace
// IllegalLifecycleStateException: ISPN000324: Cache 'clientSessions' is in 'STOPPING' state and this is an invocation not belonging to an
// on-going transaction, so it does not accept new invocations."
if (ex.getCause() != null && ex.getCause().getMessage().contains("ISPN000324")) {
log.warn("invocation failed, retrying", ex);
sleep(1000);
}
}
}
clientSessionIds.computeIfAbsent(offlineSessionId, a -> new LinkedList<>()).add(clientSessionId);
if (index % 100 == 0) {
// don't re-initialize all caches at the same time to avoid an unstable cluster with no leader

View file

@ -344,7 +344,7 @@ public class UserSessionProviderOfflineModelTest extends KeycloakModelTest {
InfinispanConnectionProvider provider = session.getProvider(InfinispanConnectionProvider.class);
Cache<String, Object> cache = provider.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
while (! cache.getAdvancedCache().getDistributionManager().isJoinComplete()) {
try { Thread.sleep(1000); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new RuntimeException(ex); }
sleep(1000);
}
cache.keySet().forEach(s -> {});
});
@ -354,7 +354,7 @@ public class UserSessionProviderOfflineModelTest extends KeycloakModelTest {
final UserModel user = session.users().getUserByUsername(realm, "user1");
// it might take a moment to propagate, therefore loop
while (! assertOfflineSession(offlineUserSessions, session.sessions().getOfflineUserSessionsStream(realm, user).collect(Collectors.toList()))) {
try { Thread.sleep(1000); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new RuntimeException(ex); }
sleep(1000);
}
return null;
});

View file

@ -55,6 +55,7 @@ log4j.logger.org.keycloak.keys.infinispan=${keycloak.infinispan.logging.level}
log4j.logger.org.keycloak.models.cache.infinispan=${keycloak.infinispan.logging.level}
log4j.logger.org.keycloak.models.sessions.infinispan=${keycloak.infinispan.logging.level}
log4j.logger.org.infinispan.CLUSTER=warn
log4j.logger.org.infinispan.server.hotrod=info
log4j.logger.org.infinispan.client.hotrod.impl=info
log4j.logger.org.infinispan.client.hotrod.event.impl=info