Retry fetching event from remote cache

Closes #28303

Signed-off-by: Pedro Ruivo <pruivo@redhat.com>
Signed-off-by: Alexander Schwartz <aschwart@redhat.com>
Co-authored-by: Alexander Schwartz <aschwart@redhat.com>
This commit is contained in:
Pedro Ruivo 2024-05-06 16:27:07 +01:00 committed by GitHub
parent eb9b19abe9
commit fe5bed6191
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 41 additions and 34 deletions

View file

@ -18,7 +18,7 @@
package org.keycloak.common.util; package org.keycloak.common.util;
import java.time.Duration; import java.time.Duration;
import java.util.Random; import java.util.concurrent.ThreadLocalRandom;
/** /**
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a> * @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
@ -125,8 +125,8 @@ public class Retry {
} }
} }
private static int computeBackoffInterval(int base, int iteration) { public static int computeBackoffInterval(int base, int iteration) {
return new Random().nextInt(computeIterationBase(base, iteration)); return ThreadLocalRandom.current().nextInt(computeIterationBase(base, iteration));
} }
private static int computeIterationBase(int base, int iteration) { private static int computeIterationBase(int base, int iteration) {

View file

@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCache;
@ -36,6 +37,7 @@ import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent; import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent; import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent; import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.context.Flag; import org.infinispan.context.Flag;
import org.infinispan.notifications.Listener; import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated; import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
@ -54,7 +56,6 @@ import org.keycloak.common.util.Retry;
import org.keycloak.connections.infinispan.DefaultInfinispanConnectionProviderFactory; import org.keycloak.connections.infinispan.DefaultInfinispanConnectionProviderFactory;
import org.keycloak.executors.ExecutorsProvider; import org.keycloak.executors.ExecutorsProvider;
import org.keycloak.models.KeycloakSession; import org.keycloak.models.KeycloakSession;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import static org.keycloak.cluster.infinispan.InfinispanClusterProvider.TASK_KEY_PREFIX; import static org.keycloak.cluster.infinispan.InfinispanClusterProvider.TASK_KEY_PREFIX;
@ -67,13 +68,16 @@ public class InfinispanNotificationsManager {
protected static final Logger logger = Logger.getLogger(InfinispanNotificationsManager.class); protected static final Logger logger = Logger.getLogger(InfinispanNotificationsManager.class);
private static final int BACKOFF_BASE_MILLIS = 10;
private static final int MAX_BACKOFF_RETRIES = 10;
private final ConcurrentMultivaluedHashMap<String, ClusterListener> listeners = new ConcurrentMultivaluedHashMap<>(); private final ConcurrentMultivaluedHashMap<String, ClusterListener> listeners = new ConcurrentMultivaluedHashMap<>();
private final ConcurrentMap<String, TaskCallback> taskCallbacks = new ConcurrentHashMap<>(); private final ConcurrentMap<String, TaskCallback> taskCallbacks = new ConcurrentHashMap<>();
private final Cache<String, Serializable> workCache; private final Cache<String, Serializable> workCache;
private final RemoteCache workRemoteCache; private final RemoteCache<Object, Serializable> workRemoteCache;
private final String myAddress; private final String myAddress;
@ -81,8 +85,7 @@ public class InfinispanNotificationsManager {
private final ExecutorService listenersExecutor; private final ExecutorService listenersExecutor;
protected InfinispanNotificationsManager(Cache<String, Serializable> workCache, RemoteCache<Object, Serializable> workRemoteCache, String myAddress, String mySite, ExecutorService listenersExecutor) {
protected InfinispanNotificationsManager(Cache<String, Serializable> workCache, RemoteCache workRemoteCache, String myAddress, String mySite, ExecutorService listenersExecutor) {
this.workCache = workCache; this.workCache = workCache;
this.workRemoteCache = workRemoteCache; this.workRemoteCache = workRemoteCache;
this.myAddress = myAddress; this.myAddress = myAddress;
@ -93,7 +96,7 @@ public class InfinispanNotificationsManager {
// Create and init manager including all listeners etc // Create and init manager including all listeners etc
public static InfinispanNotificationsManager create(KeycloakSession session, Cache<String, Serializable> workCache, String myAddress, String mySite, Set<RemoteStore> remoteStores) { public static InfinispanNotificationsManager create(KeycloakSession session, Cache<String, Serializable> workCache, String myAddress, String mySite, Set<RemoteStore> remoteStores) {
RemoteCache workRemoteCache = null; RemoteCache<Object, Serializable> workRemoteCache = null;
if (!remoteStores.isEmpty()) { if (!remoteStores.isEmpty()) {
RemoteStore remoteStore = remoteStores.iterator().next(); RemoteStore remoteStore = remoteStores.iterator().next();
@ -189,12 +192,12 @@ public class InfinispanNotificationsManager {
@CacheEntryModified @CacheEntryModified
public void cacheEntryModified(CacheEntryModifiedEvent<String, Serializable> event) { public void cacheEntryModified(CacheEntryModifiedEvent<String, Serializable> event) {
eventReceived(event.getKey(), event.getValue()); eventReceived(event.getKey(), event.getNewValue());
} }
@CacheEntryRemoved @CacheEntryRemoved
public void cacheEntryRemoved(CacheEntryRemovedEvent<String, Serializable> event) { public void cacheEntryRemoved(CacheEntryRemovedEvent<String, Serializable> event) {
taskFinished(event.getKey(), true); taskFinished(event.getKey());
} }
} }
@ -203,31 +206,28 @@ public class InfinispanNotificationsManager {
@ClientListener @ClientListener
public class HotRodListener { public class HotRodListener {
private final RemoteCache<Object, Object> remoteCache; private final RemoteCache<Object, Serializable> remoteCache;
public HotRodListener(RemoteCache<Object, Object> remoteCache) { public HotRodListener(RemoteCache<Object, Serializable> remoteCache) {
this.remoteCache = remoteCache; this.remoteCache = remoteCache;
} }
@ClientCacheEntryCreated @ClientCacheEntryCreated
public void created(ClientCacheEntryCreatedEvent event) { public void created(ClientCacheEntryCreatedEvent<String> event) {
String key = event.getKey().toString(); hotrodEventReceived(event.getKey());
hotrodEventReceived(key);
} }
@ClientCacheEntryModified @ClientCacheEntryModified
public void updated(ClientCacheEntryModifiedEvent event) { public void updated(ClientCacheEntryModifiedEvent<String> event) {
String key = event.getKey().toString(); hotrodEventReceived(event.getKey());
hotrodEventReceived(key);
} }
@ClientCacheEntryRemoved @ClientCacheEntryRemoved
public void removed(ClientCacheEntryRemovedEvent event) { public void removed(ClientCacheEntryRemovedEvent<String> event) {
String key = event.getKey().toString(); taskFinished(event.getKey());
taskFinished(key, true);
} }
@ -235,11 +235,22 @@ public class InfinispanNotificationsManager {
// TODO: Look at CacheEventConverter stuff to possibly include value in the event and avoid additional remoteCache request // TODO: Look at CacheEventConverter stuff to possibly include value in the event and avoid additional remoteCache request
try { try {
listenersExecutor.submit(() -> { listenersExecutor.submit(() -> {
Object value = DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(() -> Supplier<Serializable> fetchEvent = () -> remoteCache.get(key);
// We've seen deadlocks in Infinispan 14.x when shutting down Infinispan concurrently, therefore wrapping this Serializable event = DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(fetchEvent);
remoteCache.get(key) int iteration = 0;
); // Event might have been generated from a node which is more up-to-date, so the fetch might return null.
eventReceived(key, (Serializable) value); // Retry until we find a node that is up-to-date and has the entry.
while (event == null && iteration < MAX_BACKOFF_RETRIES) {
++iteration;
try {
Thread.sleep(Retry.computeBackoffInterval(BACKOFF_BASE_MILLIS, iteration));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
event = DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(fetchEvent);
}
eventReceived(key, event);
}); });
} catch (RejectedExecutionException ree) { } catch (RejectedExecutionException ree) {
@ -254,11 +265,10 @@ public class InfinispanNotificationsManager {
} }
} }
} }
} }
private void eventReceived(String key, Serializable obj) { private void eventReceived(String key, Serializable obj) {
if (!(obj instanceof WrapperClusterEvent)) { if (!(obj instanceof WrapperClusterEvent event)) {
// Items with the TASK_KEY_PREFIX might be gone fast once the locking is complete, therefore, don't log them. // Items with the TASK_KEY_PREFIX might be gone fast once the locking is complete, therefore, don't log them.
// It is still good to have the warning in case of real events return null because they have been, for example, expired // It is still good to have the warning in case of real events return null because they have been, for example, expired
if (obj == null && !key.startsWith(TASK_KEY_PREFIX)) { if (obj == null && !key.startsWith(TASK_KEY_PREFIX)) {
@ -267,8 +277,6 @@ public class InfinispanNotificationsManager {
return; return;
} }
WrapperClusterEvent event = (WrapperClusterEvent) obj;
if (event.isIgnoreSender()) { if (event.isIgnoreSender()) {
if (this.myAddress.equals(event.getSender())) { if (this.myAddress.equals(event.getSender())) {
return; return;
@ -298,16 +306,15 @@ public class InfinispanNotificationsManager {
} }
void taskFinished(String taskKey, boolean success) { void taskFinished(String taskKey) {
TaskCallback callback = taskCallbacks.remove(taskKey); TaskCallback callback = taskCallbacks.remove(taskKey);
if (callback != null) { if (callback != null) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debugf("Finished task '%s' with '%b'", taskKey, success); logger.debugf("Finished task '%s' with '%b'", taskKey, true);
} }
callback.setSuccess(success); callback.setSuccess(true);
callback.getTaskCompletedLatch().countDown(); callback.getTaskCompletedLatch().countDown();
} }
} }
} }