diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java index 1fa7321acc..6d3309bc42 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java @@ -17,6 +17,7 @@ package org.keycloak.models.sessions.infinispan.changes; +import org.infinispan.AdvancedCache; import org.infinispan.Cache; import org.infinispan.context.Flag; import org.jboss.logging.Logger; @@ -28,7 +29,6 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -82,63 +82,49 @@ public class EmbeddedCachesChangesPerformer implemen } } - private CompletableFuture replace(K key, MergedUpdate task, SessionEntityWrapper oldVersionEntity, long lifespanMs, long maxIdleTimeMs) { + private CompletableFuture replace(K key, MergedUpdate task, SessionEntityWrapper oldVersionEntityFirst, long lifespanMs, long maxIdleTimeMs) { + AdvancedCache> writeCache = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache); // make one async attempt - return CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replaceAsync(key, oldVersionEntity, generateNewVersionAndWrapEntity(oldVersionEntity.getEntity(), oldVersionEntity.getLocalMetadata()), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS) - .thenAccept(replaced -> { + SessionEntityWrapper newVersionEntityFirst = generateNewVersionAndWrapEntity(oldVersionEntityFirst.getEntity(), oldVersionEntityFirst.getLocalMetadata()); + return writeCache.computeIfPresentAsync(key, new ReplaceFunction<>(oldVersionEntityFirst.getVersion(), newVersionEntityFirst), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS) + .thenAccept(returnValue -> { int iteration = 0; - SessionEntityWrapper oldVersion = oldVersionEntity; - while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { - iteration++; + SessionEntityWrapper newVersionEntity = newVersionEntityFirst; + SessionEntityWrapper oldVersion = oldVersionEntityFirst; - V session = oldVersion.getEntity(); - SessionEntityWrapper newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata()); + while (true) { + if (returnValue == null) { + LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); + return; + } - // Atomic cluster-aware replace - replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); - - // Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again - if (!replaced) { - if (LOG.isDebugEnabled()) { - LOG.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion()); - } - backoff(iteration); - - oldVersion = cache.get(key); - - if (oldVersion == null) { - LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); - return; - } - - session = oldVersion.getEntity(); - - task.runUpdate(session); - } else { + if (returnValue.getVersion().equals(newVersionEntity.getVersion())){ if (LOG.isTraceEnabled()) { LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs()); } - } - } - if (!replaced) { - LOG.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName()); + return; + } else { + if (LOG.isTraceEnabled()) { + LOG.tracef("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion()); + } + } + + if (++iteration >= InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { + LOG.warnf("Failed to replace entity '%s' in cache '%s'. Expected: %s, Current: %s", key, cache.getName(), oldVersion, returnValue); + return; + } + + oldVersion = returnValue; + V session = oldVersion.getEntity(); + task.runUpdate(session); + newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata()); + + returnValue = writeCache.computeIfPresent(key, new ReplaceFunction<>(oldVersion.getVersion(), newVersionEntity), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); } }); } - /** - * Wait a random amount of time to avoid a conflict with other concurrent actors on the next attempt. - */ - private static void backoff(int iteration) { - try { - Thread.sleep(new Random().nextInt(iteration)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - private SessionEntityWrapper generateNewVersionAndWrapEntity(V entity, Map localMetadata) { return new SessionEntityWrapper<>(localMetadata, entity); }