From 701b7acd802684db3c716c4bc124c347367b96fe Mon Sep 17 00:00:00 2001 From: mposolda Date: Tue, 7 Nov 2017 11:24:17 +0100 Subject: [PATCH] KEYCLOAK-5371 More stable cross-dc tests --- .../java/org/keycloak/common/util/Retry.java | 80 +++++++++++++++++-- .../InfinispanNotificationsManager.java | 7 +- .../InfinispanCodeToTokenStoreProvider.java | 27 +++---- .../ClientListenerExecutorDecorator.java | 24 +++++- .../remotestore/RemoteCacheInvoker.java | 27 ++++--- .../DefaultExecutorsProviderFactory.java | 2 +- 6 files changed, 125 insertions(+), 42 deletions(-) diff --git a/common/src/main/java/org/keycloak/common/util/Retry.java b/common/src/main/java/org/keycloak/common/util/Retry.java index 50f916ead1..1a6191e976 100644 --- a/common/src/main/java/org/keycloak/common/util/Retry.java +++ b/common/src/main/java/org/keycloak/common/util/Retry.java @@ -17,13 +17,16 @@ package org.keycloak.common.util; +import java.util.Random; + /** * @author Stian Thorgersen */ public class Retry { + /** - * Runs the given {@code runnable} at most {@code retryCount} times until it passes, + * Runs the given {@code runnable} at most {@code attemptsCount} times until it passes, * leaving {@code intervalMillis} milliseconds between the invocations. * The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}. * @param runnable @@ -32,14 +35,14 @@ public class Retry { * @return Index of the first successful invocation, starting from 0. */ public static int execute(Runnable runnable, int attemptsCount, long intervalMillis) { - int executionIndex = 0; + int iteration = 0; while (true) { try { runnable.run(); - return executionIndex; + return iteration; } catch (RuntimeException | AssertionError e) { attemptsCount--; - executionIndex++; + iteration++; if (attemptsCount > 0) { try { if (intervalMillis > 0) { @@ -56,8 +59,56 @@ public class Retry { } } + /** - * Runs the given {@code runnable} at most {@code retryCount} times until it passes, + * Runs the given {@code runnable} at most {@code attemptsCount} times until it passes, + * leaving some increasing random delay milliseconds between the invocations. It uses Exponential backoff + jitter algorithm + * to compute the delay. More details https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ + * + * The base for delay is specified by {@code intervalBaseMillis} number. + * + * The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}. + * + * @param runnable + * @param attemptsCount Total number of attempts to execute the {@code runnable} + * @param intervalBaseMillis base for the exponential backoff + jitter + * + * @return Index of the first successful invocation, starting from 0. + */ + public static int executeWithBackoff(AdvancedRunnable runnable, int attemptsCount, int intervalBaseMillis) { + int iteration = 0; + while (true) { + try { + runnable.run(iteration); + return iteration; + } catch (RuntimeException | AssertionError e) { + attemptsCount--; + iteration++; + if (attemptsCount > 0) { + try { + if (intervalBaseMillis > 0) { + int delay = computeBackoffInterval(intervalBaseMillis, iteration); + Thread.sleep(delay); + } + } catch (InterruptedException ie) { + ie.addSuppressed(e); + throw new RuntimeException(ie); + } + } else { + throw e; + } + } + } + } + + private static int computeBackoffInterval(int base, int iteration) { + int iterationBase = base * (int)Math.pow(2, iteration); + return new Random().nextInt(iterationBase); + } + + + /** + * Runs the given {@code runnable} at most {@code attemptsCount} times until it passes, * leaving {@code intervalMillis} milliseconds between the invocations. * The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}. * @param supplier @@ -66,11 +117,13 @@ public class Retry { * @return Value generated by the {@code supplier}. */ public static T call(Supplier supplier, int attemptsCount, long intervalMillis) { + int iteration = 0; while (true) { try { - return supplier.get(); + return supplier.get(iteration); } catch (RuntimeException | AssertionError e) { attemptsCount--; + iteration++; if (attemptsCount > 0) { try { if (intervalMillis > 0) { @@ -89,7 +142,18 @@ public class Retry { /** - * Needed here just because java.util.function.Supplier defined from Java 8 + * Runnable, which provides some additional info (iteration for now) + */ + public interface AdvancedRunnable { + + void run(int iteration); + + } + + /** + * Needed here because: + * - java.util.function.Supplier defined from Java 8 + * - Adds some additional info (current iteration) */ public interface Supplier { @@ -98,7 +162,7 @@ public class Retry { * * @return a result */ - T get(); + T get(int iteration); } diff --git a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java index 65946a4efe..847e1449a3 100644 --- a/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java +++ b/model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java @@ -222,11 +222,8 @@ public class InfinispanNotificationsManager { }); } catch (RejectedExecutionException ree) { - logger.warnf("Rejected submitting of the event for key: %s. Probably server going to shutdown", key); - - if (logger.isDebugEnabled()) { - logger.debug(ree.getMessage(), ree); - } + logger.errorf("Rejected submitting of the event for key: %s. Value: %s, Server going to shutdown or pool exhausted. Pool: %s", key, workCache.get(key), listenersExecutor.toString()); + throw ree; } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanCodeToTokenStoreProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanCodeToTokenStoreProvider.java index 1928733b1b..463b777bfc 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanCodeToTokenStoreProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanCodeToTokenStoreProvider.java @@ -21,6 +21,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import org.infinispan.client.hotrod.exceptions.HotRodClientException; import org.infinispan.commons.api.BasicCache; import org.jboss.logging.Logger; import org.keycloak.common.util.Retry; @@ -49,24 +50,20 @@ public class InfinispanCodeToTokenStoreProvider implements CodeToTokenStoreProvi int lifespanInSeconds = session.getContext().getRealm().getAccessCodeLifespan(); - boolean codeAlreadyExists = Retry.call(() -> { - - try { - BasicCache cache = codeCache.get(); - ActionTokenValueEntity existing = cache.putIfAbsent(codeId, tokenValue, lifespanInSeconds, TimeUnit.SECONDS); - return existing == null; - } catch (RuntimeException re) { - if (logger.isDebugEnabled()) { - logger.debugf(re, "Failed when adding code %s", codeId); - } - - // Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation. - throw re; + try { + BasicCache cache = codeCache.get(); + ActionTokenValueEntity existing = cache.putIfAbsent(codeId, tokenValue, lifespanInSeconds, TimeUnit.SECONDS); + return existing == null; + } catch (HotRodClientException re) { + // No need to retry. The hotrod (remoteCache) has some retries in itself in case of some random network error happened. + // In case of lock conflict, we don't want to retry anyway as there was likely an attempt to use the code from different place. + if (logger.isDebugEnabled()) { + logger.debugf(re, "Failed when adding code %s", codeId); } - }, 3, 0); + return false; + } - return codeAlreadyExists; } @Override diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/ClientListenerExecutorDecorator.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/ClientListenerExecutorDecorator.java index 58b83a081c..f2221cb714 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/ClientListenerExecutorDecorator.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/ClientListenerExecutorDecorator.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent; import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent; @@ -28,6 +29,7 @@ import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent; import org.infinispan.client.hotrod.event.ClientEvent; import org.jboss.logging.Logger; import org.keycloak.common.util.MultivaluedHashMap; +import org.keycloak.common.util.Time; import static org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_CREATED; import static org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_REMOVED; @@ -94,24 +96,40 @@ public class ClientListenerExecutorDecorator { // Assume it's called from the synchronized block private void submitImpl(K key, MyClientEvent event, Runnable r) { - logger.debugf("Submitting event to the executor: %s", event.toString()); + logger.debugf("Submitting event to the executor: %s . eventsInProgress size: %d, eventsQueue size: %d", event.toString(), eventsInProgress.size(), eventsQueue.size()); eventsInProgress.put(key, event); Runnable decoratedRunnable = () -> { + Long start = null; try { + if (logger.isDebugEnabled()) { + start = Time.currentTimeMillis(); + } + r.run(); } finally { synchronized (lock) { - logger.debugf("Finished processing event by the executor: %s", event.toString()); eventsInProgress.remove(key); + if (logger.isDebugEnabled()) { + long took = Time.currentTimeMillis() - start; + logger.debugf("Finished processing event by the executor: %s, took: %d ms. EventsInProgress size: %d", event.toString(), took, eventsInProgress.size()); + } + pollQueue(key); } } }; - decorated.submit(decoratedRunnable); + try { + decorated.submit(decoratedRunnable); + } catch (RejectedExecutionException ree) { + eventsInProgress.remove(key); + + logger.errorf("Rejected execution of task for the event '%s' . Try to increase the pool size. Pool is '%s'", event.toString(), decorated.toString()); + throw ree; + } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java index 49024ac686..404d3ca6e8 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java @@ -17,6 +17,7 @@ package org.keycloak.models.sessions.infinispan.remotestore; +import org.infinispan.client.hotrod.exceptions.HotRodClientException; import org.keycloak.common.util.Retry; import org.keycloak.common.util.Time; import java.util.Collections; @@ -69,7 +70,9 @@ public class RemoteCacheInvoker { SessionUpdateTask.CrossDCMessageStatus status = task.getCrossDCMessageStatus(sessionWrapper); if (status == SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED) { - logger.debugf("Skip writing to remoteCache for entity '%s' of cache '%s' and operation '%s'", key, cacheName, operation); + if (logger.isTraceEnabled()) { + logger.tracef("Skip writing to remoteCache for entity '%s' of cache '%s' and operation '%s'", key, cacheName, operation); + } return; } @@ -78,23 +81,25 @@ public class RemoteCacheInvoker { // Double the timeout to ensure that entry won't expire on remoteCache in case that write of some entities to remoteCache is postponed (eg. userSession.lastSessionRefresh) final long maxIdleTimeMs = loadedMaxIdleTimeMs * 2; - logger.debugf("Running task '%s' on remote cache '%s' . Key is '%s'", operation, cacheName, key); + if (logger.isTraceEnabled()) { + logger.tracef("Running task '%s' on remote cache '%s' . Key is '%s'", operation, cacheName, key); + } - Retry.execute(() -> { + Retry.executeWithBackoff((int iteration) -> { try { runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, sessionWrapper); - } catch (RuntimeException re) { + } catch (HotRodClientException re) { if (logger.isDebugEnabled()) { - logger.debugf(re, "Failed running task '%s' on remote cache '%s' . Key: '%s' . Will try to retry the task", - operation, cacheName, key); + logger.debugf(re, "Failed running task '%s' on remote cache '%s' . Key: '%s', iteration '%s'. Will try to retry the task", + operation, cacheName, key, iteration); } // Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation. throw re; } - }, 10, 0); + }, 10, 10); } @@ -146,15 +151,17 @@ public class RemoteCacheInvoker { // Run task on the remote session task.runUpdate(session); - logger.debugf("Before replaceWithVersion. Entity to write version %d: %s", versioned.getVersion(), session); + if (logger.isTraceEnabled()) { + logger.tracef("Before replaceWithVersion. Entity to write version %d: %s", versioned.getVersion(), session); + } replaced = remoteCache.replaceWithVersion(key, SessionEntityWrapper.forTransport(session), versioned.getVersion(), lifespanMs, TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS); if (!replaced) { logger.debugf("Failed to replace entity '%s' version %d. Will retry again", key, versioned.getVersion()); } else { - if (logger.isDebugEnabled()) { - logger.debugf("Replaced entity version %d in remote cache: %s", versioned.getVersion(), session); + if (logger.isTraceEnabled()) { + logger.tracef("Replaced entity version %d in remote cache: %s", versioned.getVersion(), session); } } } diff --git a/services/src/main/java/org/keycloak/executors/DefaultExecutorsProviderFactory.java b/services/src/main/java/org/keycloak/executors/DefaultExecutorsProviderFactory.java index adc761dfc5..4d7ccb4152 100644 --- a/services/src/main/java/org/keycloak/executors/DefaultExecutorsProviderFactory.java +++ b/services/src/main/java/org/keycloak/executors/DefaultExecutorsProviderFactory.java @@ -45,7 +45,7 @@ public class DefaultExecutorsProviderFactory implements ExecutorsProviderFactory protected static final Logger logger = Logger.getLogger(DefaultExecutorsProviderFactory.class); private static final int DEFAULT_MIN_THREADS = 4; - private static final int DEFAULT_MAX_THREADS = 16; + private static final int DEFAULT_MAX_THREADS = 64; private static final String MANAGED_EXECUTORS_SERVICE_JNDI_PREFIX = "java:jboss/ee/concurrency/executor/";