Merge pull request #4647 from mposolda/crossdc
KEYCLOAK-5371 More stable cross-dc tests
This commit is contained in:
commit
fce9cc205c
6 changed files with 125 additions and 42 deletions
|
@ -17,13 +17,16 @@
|
||||||
|
|
||||||
package org.keycloak.common.util;
|
package org.keycloak.common.util;
|
||||||
|
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
|
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
|
||||||
*/
|
*/
|
||||||
public class Retry {
|
public class Retry {
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs the given {@code runnable} at most {@code retryCount} times until it passes,
|
* Runs the given {@code runnable} at most {@code attemptsCount} times until it passes,
|
||||||
* leaving {@code intervalMillis} milliseconds between the invocations.
|
* leaving {@code intervalMillis} milliseconds between the invocations.
|
||||||
* The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}.
|
* The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}.
|
||||||
* @param runnable
|
* @param runnable
|
||||||
|
@ -32,14 +35,14 @@ public class Retry {
|
||||||
* @return Index of the first successful invocation, starting from 0.
|
* @return Index of the first successful invocation, starting from 0.
|
||||||
*/
|
*/
|
||||||
public static int execute(Runnable runnable, int attemptsCount, long intervalMillis) {
|
public static int execute(Runnable runnable, int attemptsCount, long intervalMillis) {
|
||||||
int executionIndex = 0;
|
int iteration = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
runnable.run();
|
runnable.run();
|
||||||
return executionIndex;
|
return iteration;
|
||||||
} catch (RuntimeException | AssertionError e) {
|
} catch (RuntimeException | AssertionError e) {
|
||||||
attemptsCount--;
|
attemptsCount--;
|
||||||
executionIndex++;
|
iteration++;
|
||||||
if (attemptsCount > 0) {
|
if (attemptsCount > 0) {
|
||||||
try {
|
try {
|
||||||
if (intervalMillis > 0) {
|
if (intervalMillis > 0) {
|
||||||
|
@ -56,8 +59,56 @@ public class Retry {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs the given {@code runnable} at most {@code retryCount} times until it passes,
|
* Runs the given {@code runnable} at most {@code attemptsCount} times until it passes,
|
||||||
|
* leaving some increasing random delay milliseconds between the invocations. It uses Exponential backoff + jitter algorithm
|
||||||
|
* to compute the delay. More details https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
|
||||||
|
*
|
||||||
|
* The base for delay is specified by {@code intervalBaseMillis} number.
|
||||||
|
*
|
||||||
|
* The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}.
|
||||||
|
*
|
||||||
|
* @param runnable
|
||||||
|
* @param attemptsCount Total number of attempts to execute the {@code runnable}
|
||||||
|
* @param intervalBaseMillis base for the exponential backoff + jitter
|
||||||
|
*
|
||||||
|
* @return Index of the first successful invocation, starting from 0.
|
||||||
|
*/
|
||||||
|
public static int executeWithBackoff(AdvancedRunnable runnable, int attemptsCount, int intervalBaseMillis) {
|
||||||
|
int iteration = 0;
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
runnable.run(iteration);
|
||||||
|
return iteration;
|
||||||
|
} catch (RuntimeException | AssertionError e) {
|
||||||
|
attemptsCount--;
|
||||||
|
iteration++;
|
||||||
|
if (attemptsCount > 0) {
|
||||||
|
try {
|
||||||
|
if (intervalBaseMillis > 0) {
|
||||||
|
int delay = computeBackoffInterval(intervalBaseMillis, iteration);
|
||||||
|
Thread.sleep(delay);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
ie.addSuppressed(e);
|
||||||
|
throw new RuntimeException(ie);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int computeBackoffInterval(int base, int iteration) {
|
||||||
|
int iterationBase = base * (int)Math.pow(2, iteration);
|
||||||
|
return new Random().nextInt(iterationBase);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs the given {@code runnable} at most {@code attemptsCount} times until it passes,
|
||||||
* leaving {@code intervalMillis} milliseconds between the invocations.
|
* leaving {@code intervalMillis} milliseconds between the invocations.
|
||||||
* The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}.
|
* The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}.
|
||||||
* @param supplier
|
* @param supplier
|
||||||
|
@ -66,11 +117,13 @@ public class Retry {
|
||||||
* @return Value generated by the {@code supplier}.
|
* @return Value generated by the {@code supplier}.
|
||||||
*/
|
*/
|
||||||
public static <T> T call(Supplier<T> supplier, int attemptsCount, long intervalMillis) {
|
public static <T> T call(Supplier<T> supplier, int attemptsCount, long intervalMillis) {
|
||||||
|
int iteration = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
return supplier.get();
|
return supplier.get(iteration);
|
||||||
} catch (RuntimeException | AssertionError e) {
|
} catch (RuntimeException | AssertionError e) {
|
||||||
attemptsCount--;
|
attemptsCount--;
|
||||||
|
iteration++;
|
||||||
if (attemptsCount > 0) {
|
if (attemptsCount > 0) {
|
||||||
try {
|
try {
|
||||||
if (intervalMillis > 0) {
|
if (intervalMillis > 0) {
|
||||||
|
@ -89,7 +142,18 @@ public class Retry {
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Needed here just because java.util.function.Supplier defined from Java 8
|
* Runnable, which provides some additional info (iteration for now)
|
||||||
|
*/
|
||||||
|
public interface AdvancedRunnable {
|
||||||
|
|
||||||
|
void run(int iteration);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Needed here because:
|
||||||
|
* - java.util.function.Supplier defined from Java 8
|
||||||
|
* - Adds some additional info (current iteration)
|
||||||
*/
|
*/
|
||||||
public interface Supplier<T> {
|
public interface Supplier<T> {
|
||||||
|
|
||||||
|
@ -98,7 +162,7 @@ public class Retry {
|
||||||
*
|
*
|
||||||
* @return a result
|
* @return a result
|
||||||
*/
|
*/
|
||||||
T get();
|
T get(int iteration);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -222,11 +222,8 @@ public class InfinispanNotificationsManager {
|
||||||
|
|
||||||
});
|
});
|
||||||
} catch (RejectedExecutionException ree) {
|
} catch (RejectedExecutionException ree) {
|
||||||
logger.warnf("Rejected submitting of the event for key: %s. Probably server going to shutdown", key);
|
logger.errorf("Rejected submitting of the event for key: %s. Value: %s, Server going to shutdown or pool exhausted. Pool: %s", key, workCache.get(key), listenersExecutor.toString());
|
||||||
|
throw ree;
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug(ree.getMessage(), ree);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
|
||||||
import org.infinispan.commons.api.BasicCache;
|
import org.infinispan.commons.api.BasicCache;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.keycloak.common.util.Retry;
|
import org.keycloak.common.util.Retry;
|
||||||
|
@ -49,24 +50,20 @@ public class InfinispanCodeToTokenStoreProvider implements CodeToTokenStoreProvi
|
||||||
|
|
||||||
int lifespanInSeconds = session.getContext().getRealm().getAccessCodeLifespan();
|
int lifespanInSeconds = session.getContext().getRealm().getAccessCodeLifespan();
|
||||||
|
|
||||||
boolean codeAlreadyExists = Retry.call(() -> {
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
BasicCache<UUID, ActionTokenValueEntity> cache = codeCache.get();
|
BasicCache<UUID, ActionTokenValueEntity> cache = codeCache.get();
|
||||||
ActionTokenValueEntity existing = cache.putIfAbsent(codeId, tokenValue, lifespanInSeconds, TimeUnit.SECONDS);
|
ActionTokenValueEntity existing = cache.putIfAbsent(codeId, tokenValue, lifespanInSeconds, TimeUnit.SECONDS);
|
||||||
return existing == null;
|
return existing == null;
|
||||||
} catch (RuntimeException re) {
|
} catch (HotRodClientException re) {
|
||||||
|
// No need to retry. The hotrod (remoteCache) has some retries in itself in case of some random network error happened.
|
||||||
|
// In case of lock conflict, we don't want to retry anyway as there was likely an attempt to use the code from different place.
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debugf(re, "Failed when adding code %s", codeId);
|
logger.debugf(re, "Failed when adding code %s", codeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
|
return false;
|
||||||
throw re;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}, 3, 0);
|
|
||||||
|
|
||||||
return codeAlreadyExists;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
|
||||||
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
|
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
|
||||||
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
|
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
|
||||||
|
@ -28,6 +29,7 @@ import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
|
||||||
import org.infinispan.client.hotrod.event.ClientEvent;
|
import org.infinispan.client.hotrod.event.ClientEvent;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.keycloak.common.util.MultivaluedHashMap;
|
import org.keycloak.common.util.MultivaluedHashMap;
|
||||||
|
import org.keycloak.common.util.Time;
|
||||||
|
|
||||||
import static org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_CREATED;
|
import static org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_CREATED;
|
||||||
import static org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_REMOVED;
|
import static org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_REMOVED;
|
||||||
|
@ -94,24 +96,40 @@ public class ClientListenerExecutorDecorator<K> {
|
||||||
|
|
||||||
// Assume it's called from the synchronized block
|
// Assume it's called from the synchronized block
|
||||||
private void submitImpl(K key, MyClientEvent event, Runnable r) {
|
private void submitImpl(K key, MyClientEvent event, Runnable r) {
|
||||||
logger.debugf("Submitting event to the executor: %s", event.toString());
|
logger.debugf("Submitting event to the executor: %s . eventsInProgress size: %d, eventsQueue size: %d", event.toString(), eventsInProgress.size(), eventsQueue.size());
|
||||||
|
|
||||||
eventsInProgress.put(key, event);
|
eventsInProgress.put(key, event);
|
||||||
|
|
||||||
Runnable decoratedRunnable = () -> {
|
Runnable decoratedRunnable = () -> {
|
||||||
|
Long start = null;
|
||||||
try {
|
try {
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
start = Time.currentTimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
r.run();
|
r.run();
|
||||||
} finally {
|
} finally {
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
logger.debugf("Finished processing event by the executor: %s", event.toString());
|
|
||||||
eventsInProgress.remove(key);
|
eventsInProgress.remove(key);
|
||||||
|
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
long took = Time.currentTimeMillis() - start;
|
||||||
|
logger.debugf("Finished processing event by the executor: %s, took: %d ms. EventsInProgress size: %d", event.toString(), took, eventsInProgress.size());
|
||||||
|
}
|
||||||
|
|
||||||
pollQueue(key);
|
pollQueue(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
try {
|
||||||
decorated.submit(decoratedRunnable);
|
decorated.submit(decoratedRunnable);
|
||||||
|
} catch (RejectedExecutionException ree) {
|
||||||
|
eventsInProgress.remove(key);
|
||||||
|
|
||||||
|
logger.errorf("Rejected execution of task for the event '%s' . Try to increase the pool size. Pool is '%s'", event.toString(), decorated.toString());
|
||||||
|
throw ree;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
package org.keycloak.models.sessions.infinispan.remotestore;
|
package org.keycloak.models.sessions.infinispan.remotestore;
|
||||||
|
|
||||||
|
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
|
||||||
import org.keycloak.common.util.Retry;
|
import org.keycloak.common.util.Retry;
|
||||||
import org.keycloak.common.util.Time;
|
import org.keycloak.common.util.Time;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -69,7 +70,9 @@ public class RemoteCacheInvoker {
|
||||||
SessionUpdateTask.CrossDCMessageStatus status = task.getCrossDCMessageStatus(sessionWrapper);
|
SessionUpdateTask.CrossDCMessageStatus status = task.getCrossDCMessageStatus(sessionWrapper);
|
||||||
|
|
||||||
if (status == SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED) {
|
if (status == SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED) {
|
||||||
logger.debugf("Skip writing to remoteCache for entity '%s' of cache '%s' and operation '%s'", key, cacheName, operation);
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.tracef("Skip writing to remoteCache for entity '%s' of cache '%s' and operation '%s'", key, cacheName, operation);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,23 +81,25 @@ public class RemoteCacheInvoker {
|
||||||
// Double the timeout to ensure that entry won't expire on remoteCache in case that write of some entities to remoteCache is postponed (eg. userSession.lastSessionRefresh)
|
// Double the timeout to ensure that entry won't expire on remoteCache in case that write of some entities to remoteCache is postponed (eg. userSession.lastSessionRefresh)
|
||||||
final long maxIdleTimeMs = loadedMaxIdleTimeMs * 2;
|
final long maxIdleTimeMs = loadedMaxIdleTimeMs * 2;
|
||||||
|
|
||||||
logger.debugf("Running task '%s' on remote cache '%s' . Key is '%s'", operation, cacheName, key);
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.tracef("Running task '%s' on remote cache '%s' . Key is '%s'", operation, cacheName, key);
|
||||||
|
}
|
||||||
|
|
||||||
Retry.execute(() -> {
|
Retry.executeWithBackoff((int iteration) -> {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, sessionWrapper);
|
runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, sessionWrapper);
|
||||||
} catch (RuntimeException re) {
|
} catch (HotRodClientException re) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debugf(re, "Failed running task '%s' on remote cache '%s' . Key: '%s' . Will try to retry the task",
|
logger.debugf(re, "Failed running task '%s' on remote cache '%s' . Key: '%s', iteration '%s'. Will try to retry the task",
|
||||||
operation, cacheName, key);
|
operation, cacheName, key, iteration);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
|
// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
|
||||||
throw re;
|
throw re;
|
||||||
}
|
}
|
||||||
|
|
||||||
}, 10, 0);
|
}, 10, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -146,15 +151,17 @@ public class RemoteCacheInvoker {
|
||||||
// Run task on the remote session
|
// Run task on the remote session
|
||||||
task.runUpdate(session);
|
task.runUpdate(session);
|
||||||
|
|
||||||
logger.debugf("Before replaceWithVersion. Entity to write version %d: %s", versioned.getVersion(), session);
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.tracef("Before replaceWithVersion. Entity to write version %d: %s", versioned.getVersion(), session);
|
||||||
|
}
|
||||||
|
|
||||||
replaced = remoteCache.replaceWithVersion(key, SessionEntityWrapper.forTransport(session), versioned.getVersion(), lifespanMs, TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS);
|
replaced = remoteCache.replaceWithVersion(key, SessionEntityWrapper.forTransport(session), versioned.getVersion(), lifespanMs, TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
if (!replaced) {
|
if (!replaced) {
|
||||||
logger.debugf("Failed to replace entity '%s' version %d. Will retry again", key, versioned.getVersion());
|
logger.debugf("Failed to replace entity '%s' version %d. Will retry again", key, versioned.getVersion());
|
||||||
} else {
|
} else {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.debugf("Replaced entity version %d in remote cache: %s", versioned.getVersion(), session);
|
logger.tracef("Replaced entity version %d in remote cache: %s", versioned.getVersion(), session);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ public class DefaultExecutorsProviderFactory implements ExecutorsProviderFactory
|
||||||
protected static final Logger logger = Logger.getLogger(DefaultExecutorsProviderFactory.class);
|
protected static final Logger logger = Logger.getLogger(DefaultExecutorsProviderFactory.class);
|
||||||
|
|
||||||
private static final int DEFAULT_MIN_THREADS = 4;
|
private static final int DEFAULT_MIN_THREADS = 4;
|
||||||
private static final int DEFAULT_MAX_THREADS = 16;
|
private static final int DEFAULT_MAX_THREADS = 64;
|
||||||
|
|
||||||
private static final String MANAGED_EXECUTORS_SERVICE_JNDI_PREFIX = "java:jboss/ee/concurrency/executor/";
|
private static final String MANAGED_EXECUTORS_SERVICE_JNDI_PREFIX = "java:jboss/ee/concurrency/executor/";
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue