From 355901dfd8eba8b08e436a5ed1ae6b25440d5e91 Mon Sep 17 00:00:00 2001 From: Alexander Schwartz Date: Wed, 3 Apr 2024 11:10:39 +0200 Subject: [PATCH] Add a back-off period when replacing cache entries fails Closes #28388 Signed-off-by: Alexander Schwartz --- .../EmbeddedCachesChangesPerformer.java | 53 ++++++++++++------- .../InfinispanChangelogBasedTransaction.java | 52 +++++++++++------- 2 files changed, 65 insertions(+), 40 deletions(-) diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java index 8d847c7851..c23378672d 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/EmbeddedCachesChangesPerformer.java @@ -88,47 +88,60 @@ public class EmbeddedCachesChangesPerformer implemen private void replace(K key, MergedUpdate task, SessionEntityWrapper oldVersionEntity, long lifespanMs, long maxIdleTimeMs) { serializer.runSerialized(key, () -> { SessionEntityWrapper oldVersion = oldVersionEntity; - boolean replaced = false; - int iteration = 0; + boolean replaced = false; + int iteration = 0; V session = oldVersion.getEntity(); - while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { - iteration++; + while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { + iteration++; SessionEntityWrapper newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata()); - // Atomic cluster-aware replace + // Atomic cluster-aware replace replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); - // Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again - if (!replaced) { - if (LOG.isDebugEnabled()) { - LOG.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion()); - } + // Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again + if (!replaced) { + if (LOG.isDebugEnabled()) { + LOG.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion()); + } + backoff(iteration); oldVersion = cache.get(key); if (oldVersion == null) { - LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); - return; - } + LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); + return; + } session = oldVersion.getEntity(); - task.runUpdate(session); - } else { - if (LOG.isTraceEnabled()) { + task.runUpdate(session); + } else { + if (LOG.isTraceEnabled()) { LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs()); + } } } - } - if (!replaced) { - LOG.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName()); - } + if (!replaced) { + LOG.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName()); + } }); } + /** + * Wait a random amount of time to avoid a conflict with other concurrent actors on the next attempt. + */ + private static void backoff(int iteration) { + try { + Thread.sleep(new Random().nextInt(iteration)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + private SessionEntityWrapper generateNewVersionAndWrapEntity(V entity, Map localMetadata) { return new SessionEntityWrapper<>(localMetadata, entity); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java index 47ade84a35..3cb5b2aad9 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/InfinispanChangelogBasedTransaction.java @@ -237,47 +237,59 @@ public class InfinispanChangelogBasedTransaction ext private void replace(K key, MergedUpdate task, SessionEntityWrapper oldVersionEntity, long lifespanMs, long maxIdleTimeMs) { serializer.runSerialized(key, () -> { SessionEntityWrapper oldVersion = oldVersionEntity; - boolean replaced = false; - int iteration = 0; + boolean replaced = false; + int iteration = 0; V session = oldVersion.getEntity(); - while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { - iteration++; + while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { + iteration++; SessionEntityWrapper newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata()); - // Atomic cluster-aware replace + // Atomic cluster-aware replace replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); - // Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again - if (!replaced) { - if (logger.isDebugEnabled()) { - logger.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion()); - } + // Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again + if (!replaced) { + if (logger.isDebugEnabled()) { + logger.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion()); + } + backoff(iteration); oldVersion = cache.get(key); if (oldVersion == null) { - logger.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); - return; - } + logger.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); + return; + } session = oldVersion.getEntity(); - task.runUpdate(session); - } else { - if (logger.isTraceEnabled()) { + task.runUpdate(session); + } else { + if (logger.isTraceEnabled()) { logger.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs()); + } } } - } - if (!replaced) { - logger.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName()); - } + if (!replaced) { + logger.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName()); + } }); } + /** + * Wait a random amount of time to avoid a conflict with other concurrent actors on the next attempt. + */ + private static void backoff(int iteration) { + try { + Thread.sleep(new Random().nextInt(iteration)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } @Override protected void rollbackImpl() {