Use computeIfPresent also for Persistent sessions
Follow-up-on #29073 Signed-off-by: Michal Hajas <mhajas@redhat.com>
This commit is contained in:
parent
4697cc956b
commit
e93b7d4f3a
1 changed files with 32 additions and 46 deletions
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
package org.keycloak.models.sessions.infinispan.changes;
|
package org.keycloak.models.sessions.infinispan.changes;
|
||||||
|
|
||||||
|
import org.infinispan.AdvancedCache;
|
||||||
import org.infinispan.Cache;
|
import org.infinispan.Cache;
|
||||||
import org.infinispan.context.Flag;
|
import org.infinispan.context.Flag;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
@ -28,7 +29,6 @@ import java.util.ArrayList;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
@ -82,63 +82,49 @@ public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implemen
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private CompletableFuture<?> replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) {
|
private CompletableFuture<?> replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntityFirst, long lifespanMs, long maxIdleTimeMs) {
|
||||||
|
AdvancedCache<K, SessionEntityWrapper<V>> writeCache = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache);
|
||||||
// make one async attempt
|
// make one async attempt
|
||||||
return CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replaceAsync(key, oldVersionEntity, generateNewVersionAndWrapEntity(oldVersionEntity.getEntity(), oldVersionEntity.getLocalMetadata()), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS)
|
SessionEntityWrapper<V> newVersionEntityFirst = generateNewVersionAndWrapEntity(oldVersionEntityFirst.getEntity(), oldVersionEntityFirst.getLocalMetadata());
|
||||||
.thenAccept(replaced -> {
|
return writeCache.computeIfPresentAsync(key, new ReplaceFunction<>(oldVersionEntityFirst.getVersion(), newVersionEntityFirst), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS)
|
||||||
|
.thenAccept(returnValue -> {
|
||||||
int iteration = 0;
|
int iteration = 0;
|
||||||
SessionEntityWrapper<V> oldVersion = oldVersionEntity;
|
SessionEntityWrapper<V> newVersionEntity = newVersionEntityFirst;
|
||||||
while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
|
SessionEntityWrapper<V> oldVersion = oldVersionEntityFirst;
|
||||||
iteration++;
|
|
||||||
|
|
||||||
V session = oldVersion.getEntity();
|
while (true) {
|
||||||
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
|
if (returnValue == null) {
|
||||||
|
LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Atomic cluster-aware replace
|
if (returnValue.getVersion().equals(newVersionEntity.getVersion())){
|
||||||
replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
|
|
||||||
|
|
||||||
// Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again
|
|
||||||
if (!replaced) {
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion());
|
|
||||||
}
|
|
||||||
backoff(iteration);
|
|
||||||
|
|
||||||
oldVersion = cache.get(key);
|
|
||||||
|
|
||||||
if (oldVersion == null) {
|
|
||||||
LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
session = oldVersion.getEntity();
|
|
||||||
|
|
||||||
task.runUpdate(session);
|
|
||||||
} else {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!replaced) {
|
return;
|
||||||
LOG.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName());
|
} else {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.tracef("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (++iteration >= InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
|
||||||
|
LOG.warnf("Failed to replace entity '%s' in cache '%s'. Expected: %s, Current: %s", key, cache.getName(), oldVersion, returnValue);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
oldVersion = returnValue;
|
||||||
|
V session = oldVersion.getEntity();
|
||||||
|
task.runUpdate(session);
|
||||||
|
newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
|
||||||
|
|
||||||
|
returnValue = writeCache.computeIfPresent(key, new ReplaceFunction<>(oldVersion.getVersion(), newVersionEntity), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Wait a random amount of time to avoid a conflict with other concurrent actors on the next attempt.
|
|
||||||
*/
|
|
||||||
private static void backoff(int iteration) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(new Random().nextInt(iteration));
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private SessionEntityWrapper<V> generateNewVersionAndWrapEntity(V entity, Map<String, String> localMetadata) {
|
private SessionEntityWrapper<V> generateNewVersionAndWrapEntity(V entity, Map<String, String> localMetadata) {
|
||||||
return new SessionEntityWrapper<>(localMetadata, entity);
|
return new SessionEntityWrapper<>(localMetadata, entity);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue