Use cache.compute() method to improve the replace retry loop
Closes #29073 Signed-off-by: Pedro Ruivo <pruivo@redhat.com>
This commit is contained in:
parent
ae1aaef96c
commit
17a700b6b9
5 changed files with 125 additions and 61 deletions
|
@ -17,6 +17,14 @@
|
|||
|
||||
package org.keycloak.connections.infinispan;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.infinispan.client.hotrod.ProtocolVersion;
|
||||
import org.infinispan.commons.dataconversion.MediaType;
|
||||
import org.infinispan.configuration.cache.CacheMode;
|
||||
|
@ -24,7 +32,6 @@ import org.infinispan.configuration.cache.Configuration;
|
|||
import org.infinispan.configuration.cache.ConfigurationBuilder;
|
||||
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
|
||||
import org.infinispan.eviction.EvictionStrategy;
|
||||
import org.infinispan.jboss.marshalling.core.JBossUserMarshaller;
|
||||
import org.infinispan.manager.DefaultCacheManager;
|
||||
import org.infinispan.manager.EmbeddedCacheManager;
|
||||
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
|
||||
|
@ -37,6 +44,7 @@ import org.keycloak.cluster.ClusterEvent;
|
|||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.cluster.ManagedCacheManagerProvider;
|
||||
import org.keycloak.cluster.infinispan.KeycloakHotRodMarshallerFactory;
|
||||
import org.keycloak.marshalling.Marshalling;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakSessionFactory;
|
||||
import org.keycloak.models.cache.infinispan.ClearCacheEvent;
|
||||
|
@ -47,14 +55,6 @@ import org.keycloak.models.utils.PostMigrationEvent;
|
|||
import org.keycloak.provider.InvalidationHandler.ObjectType;
|
||||
import org.keycloak.provider.ProviderEvent;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.ACTION_TOKEN_CACHE;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME;
|
||||
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.AUTHORIZATION_CACHE_NAME;
|
||||
|
@ -247,11 +247,7 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
|
|||
gcb.jmx().domain(InfinispanConnectionProvider.JMX_DOMAIN).enable();
|
||||
}
|
||||
|
||||
// For Infinispan 10, we go with the JBoss marshalling.
|
||||
// TODO: This should be replaced later with the marshalling recommended by infinispan. Probably protostream.
|
||||
// See https://infinispan.org/docs/stable/titles/developing/developing.html#marshalling for the details
|
||||
gcb.serialization().marshaller(new JBossUserMarshaller());
|
||||
|
||||
Marshalling.configure(gcb);
|
||||
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
|
||||
if (useKeycloakTimeService) {
|
||||
setTimeServiceToKeycloakTime(cacheManager);
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
package org.keycloak.marshalling;
|
||||
|
||||
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
|
||||
import org.infinispan.jboss.marshalling.core.JBossUserMarshaller;
|
||||
import org.keycloak.models.sessions.infinispan.changes.ReplaceFunction;
|
||||
|
||||
@SuppressWarnings("removal")
|
||||
public final class Marshalling {
|
||||
|
||||
private Marshalling() {
|
||||
}
|
||||
|
||||
// Note: Min ID is 2500
|
||||
public static final Integer REPLACE_FUNCTION_ID = 2500;
|
||||
|
||||
// For Infinispan 10, we go with the JBoss marshalling.
|
||||
// TODO: This should be replaced later with the marshalling recommended by infinispan. Probably protostream.
|
||||
// See https://infinispan.org/docs/stable/titles/developing/developing.html#marshalling for the details
|
||||
public static void configure(GlobalConfigurationBuilder builder) {
|
||||
builder.serialization()
|
||||
.marshaller(new JBossUserMarshaller())
|
||||
.addAdvancedExternalizer(ReplaceFunction.INSTANCE);
|
||||
}
|
||||
|
||||
}
|
|
@ -20,7 +20,6 @@ package org.keycloak.models.sessions.infinispan.changes;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.infinispan.Cache;
|
||||
|
@ -239,60 +238,35 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
|
|||
private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) {
|
||||
serializer.runSerialized(key, () -> {
|
||||
SessionEntityWrapper<V> oldVersion = oldVersionEntity;
|
||||
boolean replaced = false;
|
||||
SessionEntityWrapper<V> returnValue = null;
|
||||
int iteration = 0;
|
||||
V session = oldVersion.getEntity();
|
||||
|
||||
while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
|
||||
iteration++;
|
||||
|
||||
var writeCache = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache);
|
||||
while (iteration++ < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
|
||||
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
|
||||
returnValue = writeCache.computeIfPresent(key, new ReplaceFunction<>(oldVersion.getVersion(), newVersionEntity), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
|
||||
|
||||
// Atomic cluster-aware replace
|
||||
replaced = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).replace(key, oldVersion, newVersionEntity, lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
|
||||
if (returnValue == null) {
|
||||
logger.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
|
||||
return;
|
||||
}
|
||||
|
||||
// Replace fail. Need to load latest entity from cache, apply updates again and try to replace in cache again
|
||||
if (!replaced) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debugf("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion());
|
||||
}
|
||||
backoff(iteration);
|
||||
|
||||
oldVersion = cache.get(key);
|
||||
|
||||
if (oldVersion == null) {
|
||||
logger.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
|
||||
return;
|
||||
}
|
||||
|
||||
session = oldVersion.getEntity();
|
||||
|
||||
task.runUpdate(session);
|
||||
} else {
|
||||
if (returnValue.getVersion().equals(newVersionEntity.getVersion())){
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
oldVersion = returnValue;
|
||||
session = oldVersion.getEntity();
|
||||
task.runUpdate(session);
|
||||
}
|
||||
|
||||
if (!replaced) {
|
||||
logger.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName());
|
||||
}
|
||||
logger.warnf("Failed to replace entity '%s' in cache '%s'. Expected: %s, Current: %s", key, cache.getName(), oldVersion, returnValue);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait a random amount of time to avoid a conflict with other concurrent actors on the next attempt.
|
||||
*/
|
||||
private static void backoff(int iteration) {
|
||||
try {
|
||||
Thread.sleep(new Random().nextInt(iteration));
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollbackImpl() {
|
||||
}
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
package org.keycloak.models.sessions.infinispan.changes;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInput;
|
||||
import java.io.ObjectOutput;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import org.infinispan.commons.marshall.AdvancedExternalizer;
|
||||
import org.infinispan.commons.marshall.MarshallUtil;
|
||||
import org.keycloak.marshalling.Marshalling;
|
||||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||
|
||||
/**
|
||||
* Performs an entity replacement in Infinispan, using its versions instead of equality.
|
||||
*
|
||||
* @param <K> The Infinispan key type.
|
||||
* @param <T> The Infinispan value type (Keycloak entity)
|
||||
*/
|
||||
public class ReplaceFunction<K, T extends SessionEntity> implements BiFunction<K, SessionEntityWrapper<T>, SessionEntityWrapper<T>> {
|
||||
|
||||
@SuppressWarnings({"removal", "rawtypes"})
|
||||
public static final AdvancedExternalizer<ReplaceFunction> INSTANCE = new Externalizer();
|
||||
private final UUID expectedVersion;
|
||||
private final SessionEntityWrapper<T> newValue;
|
||||
|
||||
public ReplaceFunction(UUID expectedVersion, SessionEntityWrapper<T> newValue) {
|
||||
this.expectedVersion = Objects.requireNonNull(expectedVersion);
|
||||
this.newValue = Objects.requireNonNull(newValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SessionEntityWrapper<T> apply(K key, SessionEntityWrapper<T> currentValue) {
|
||||
assert currentValue != null;
|
||||
return expectedVersion.equals(currentValue.getVersion()) ? newValue : currentValue;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"removal", "rawtypes"})
|
||||
private static class Externalizer implements AdvancedExternalizer<ReplaceFunction> {
|
||||
|
||||
private static final SessionEntityWrapper.ExternalizerImpl EXTERNALIZER = new SessionEntityWrapper.ExternalizerImpl();
|
||||
private static final byte VERSION_1 = 1;
|
||||
|
||||
@Override
|
||||
public Set<Class<? extends ReplaceFunction>> getTypeClasses() {
|
||||
return Set.of(ReplaceFunction.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getId() {
|
||||
return Marshalling.REPLACE_FUNCTION_ID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeObject(ObjectOutput output, ReplaceFunction object) throws IOException {
|
||||
output.writeByte(VERSION_1);
|
||||
MarshallUtil.marshallUUID(object.expectedVersion, output, false);
|
||||
EXTERNALIZER.writeObject(output, object.newValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplaceFunction<?,?> readObject(ObjectInput input) throws IOException, ClassNotFoundException {
|
||||
var version = input.readByte();
|
||||
if (version != VERSION_1) {
|
||||
throw new IOException("Invalid version: " + version);
|
||||
}
|
||||
//noinspection unchecked
|
||||
return new ReplaceFunction<Object, SessionEntity>(MarshallUtil.unmarshallUUID(input, false), EXTERNALIZER.readObject(input));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -33,7 +33,6 @@ import org.infinispan.configuration.cache.PersistenceConfigurationBuilder;
|
|||
import org.infinispan.configuration.global.GlobalConfiguration;
|
||||
import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
|
||||
import org.infinispan.configuration.parsing.ParserRegistry;
|
||||
import org.infinispan.jboss.marshalling.core.JBossUserMarshaller;
|
||||
import org.infinispan.manager.DefaultCacheManager;
|
||||
import org.infinispan.metrics.config.MicrometerMeterRegisterConfigurationBuilder;
|
||||
import org.infinispan.persistence.remote.configuration.ExhaustedAction;
|
||||
|
@ -47,6 +46,7 @@ import org.jgroups.util.TLSClientAuth;
|
|||
import org.keycloak.common.Profile;
|
||||
import org.keycloak.config.CachingOptions;
|
||||
import org.keycloak.config.MetricsOptions;
|
||||
import org.keycloak.marshalling.Marshalling;
|
||||
import org.keycloak.quarkus.runtime.configuration.Configuration;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
@ -134,11 +134,7 @@ public class CacheManagerFactory {
|
|||
builder.getNamedConfigurationBuilders().forEach((s, configurationBuilder) -> configurationBuilder.statistics().enabled(true));
|
||||
}
|
||||
|
||||
// For Infinispan 10, we go with the JBoss marshalling.
|
||||
// TODO: This should be replaced later with the marshalling recommended by infinispan. Probably protostream.
|
||||
// See https://infinispan.org/docs/stable/titles/developing/developing.html#marshalling for the details
|
||||
builder.getGlobalConfigurationBuilder().serialization().marshaller(new JBossUserMarshaller());
|
||||
|
||||
Marshalling.configure(builder.getGlobalConfigurationBuilder());
|
||||
return new DefaultCacheManager(builder, isStartEagerly());
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue