Add a back-off period when replacing cache entries fails

Closes #28388

Signed-off-by: Alexander Schwartz <aschwart@redhat.com>
This commit is contained in:
Alexander Schwartz 2024-04-03 11:10:39 +02:00 committed by Alexander Schwartz
parent 63e7523a6d
commit 355901dfd8
2 changed files with 65 additions and 40 deletions

View file

@ -88,47 +88,60 @@ public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implemen
private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) { private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) {
serializer.runSerialized(key, () -> { serializer.runSerialized(key, () -> {
SessionEntityWrapper<V> oldVersion = oldVersionEntity; SessionEntityWrapper<V> oldVersion = oldVersionEntity;
boolean replaced = false; boolean replaced = false;
int iteration = 0; int iteration = 0;
V session = oldVersion.getEntity(); V session = oldVersion.getEntity();
while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
iteration++; iteration++;
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata()); SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
// Atomic cluster-aware replace // Atomic cluster-aware replace
replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
// Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again // Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again
if (!replaced) { if (!replaced) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion()); LOG.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion());
} }
backoff(iteration);
oldVersion = cache.get(key); oldVersion = cache.get(key);
if (oldVersion == null) { if (oldVersion == null) {
LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
return; return;
} }
session = oldVersion.getEntity(); session = oldVersion.getEntity();
task.runUpdate(session); task.runUpdate(session);
} else { } else {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs()); LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
}
} }
} }
}
if (!replaced) { if (!replaced) {
LOG.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName()); LOG.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName());
} }
}); });
} }
/**
* Wait a random amount of time to avoid a conflict with other concurrent actors on the next attempt.
*/
private static void backoff(int iteration) {
try {
Thread.sleep(new Random().nextInt(iteration));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
private SessionEntityWrapper<V> generateNewVersionAndWrapEntity(V entity, Map<String, String> localMetadata) { private SessionEntityWrapper<V> generateNewVersionAndWrapEntity(V entity, Map<String, String> localMetadata) {
return new SessionEntityWrapper<>(localMetadata, entity); return new SessionEntityWrapper<>(localMetadata, entity);
} }

View file

@ -237,47 +237,59 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) { private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) {
serializer.runSerialized(key, () -> { serializer.runSerialized(key, () -> {
SessionEntityWrapper<V> oldVersion = oldVersionEntity; SessionEntityWrapper<V> oldVersion = oldVersionEntity;
boolean replaced = false; boolean replaced = false;
int iteration = 0; int iteration = 0;
V session = oldVersion.getEntity(); V session = oldVersion.getEntity();
while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) { while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
iteration++; iteration++;
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata()); SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
// Atomic cluster-aware replace // Atomic cluster-aware replace
replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS); replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
// Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again // Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again
if (!replaced) { if (!replaced) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion()); logger.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion());
} }
backoff(iteration);
oldVersion = cache.get(key); oldVersion = cache.get(key);
if (oldVersion == null) { if (oldVersion == null) {
logger.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key); logger.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
return; return;
} }
session = oldVersion.getEntity(); session = oldVersion.getEntity();
task.runUpdate(session); task.runUpdate(session);
} else { } else {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs()); logger.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
}
} }
} }
}
if (!replaced) { if (!replaced) {
logger.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName()); logger.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName());
} }
}); });
} }
/**
* Wait a random amount of time to avoid a conflict with other concurrent actors on the next attempt.
*/
private static void backoff(int iteration) {
try {
Thread.sleep(new Random().nextInt(iteration));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
@Override @Override
protected void rollbackImpl() { protected void rollbackImpl() {