Retry remote cache operations with back off

Implement a retry mechanism for remote cache writes.

Fixes #32030

Signed-off-by: Pedro Ruivo <pruivo@redhat.com>
This commit is contained in:
Pedro Ruivo 2024-08-07 19:23:56 +01:00 committed by Alexander Schwartz
parent a38d3b2f55
commit e13c9bf462
20 changed files with 656 additions and 229 deletions

View file

@ -19,7 +19,6 @@ package org.keycloak.connections.infinispan;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.infinispan.Cache; import org.infinispan.Cache;
@ -71,7 +70,7 @@ public class DefaultInfinispanConnectionProvider implements InfinispanConnection
} }
@Override @Override
public CompletionStage<Void> migrateToProtostream() { public CompletionStage<Void> migrateToProtoStream() {
// Only the CacheStore (persistence) stores data in binary format and needs to be deleted. // Only the CacheStore (persistence) stores data in binary format and needs to be deleted.
// We assume rolling-upgrade between KC 25 and KC 26 is not available, in other words, KC 25 and KC 26 servers are not present in the same cluster. // We assume rolling-upgrade between KC 25 and KC 26 is not available, in other words, KC 25 and KC 26 servers are not present in the same cluster.
var stage = CompletionStages.aggregateCompletionStage(); var stage = CompletionStages.aggregateCompletionStage();
@ -83,17 +82,17 @@ public class DefaultInfinispanConnectionProvider implements InfinispanConnection
return stage.freeze(); return stage.freeze();
} }
@Override
public Executor getExecutor(String name) {
return GlobalComponentRegistry.componentOf(cacheManager, BlockingManager.class).asExecutor(name);
}
@Override @Override
public ScheduledExecutorService getScheduledExecutor() { public ScheduledExecutorService getScheduledExecutor() {
//noinspection removal //noinspection removal
return GlobalComponentRegistry.of(cacheManager).getComponent(ScheduledExecutorService.class, KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR); return GlobalComponentRegistry.of(cacheManager).getComponent(ScheduledExecutorService.class, KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR);
} }
@Override
public BlockingManager getBlockingManager() {
return GlobalComponentRegistry.componentOf(cacheManager, BlockingManager.class);
}
@Override @Override
public void close() { public void close() {
} }

View file

@ -25,6 +25,7 @@ import java.util.stream.Stream;
import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.util.concurrent.BlockingManager;
import org.keycloak.models.KeycloakSessionFactory; import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.provider.Provider; import org.keycloak.provider.Provider;
@ -131,11 +132,11 @@ public interface InfinispanConnectionProvider extends Provider {
TopologyInfo getTopologyInfo(); TopologyInfo getTopologyInfo();
/** /**
* Migrates the JBoss Marshalling encoding to Infinispan Protostream * Migrates the JBoss Marshalling encoding to Infinispan ProtoStream
* *
* @return A {@link CompletionStage} to signal when the operator is completed. * @return A {@link CompletionStage} to signal when the operator is completed.
*/ */
CompletionStage<Void> migrateToProtostream(); CompletionStage<Void> migrateToProtoStream();
/** /**
* Returns an executor that will run the given tasks on a blocking thread as required. * Returns an executor that will run the given tasks on a blocking thread as required.
@ -146,7 +147,9 @@ public interface InfinispanConnectionProvider extends Provider {
* @param name The name for trace logging purpose. * @param name The name for trace logging purpose.
* @return The Infinispan blocking {@link Executor}. * @return The Infinispan blocking {@link Executor}.
*/ */
Executor getExecutor(String name); default Executor getExecutor(String name) {
return getBlockingManager().asExecutor(name);
}
/** /**
* @return The Infinispan {@link ScheduledExecutorService}. Long or blocking operations must not be executed directly. * @return The Infinispan {@link ScheduledExecutorService}. Long or blocking operations must not be executed directly.
@ -164,4 +167,14 @@ public interface InfinispanConnectionProvider extends Provider {
} }
} }
/**
* Returns the Infinispan {@link BlockingManager}.
* <p>
* The {@link BlockingManager} should be used to execute blocking operation like disk I/O. It offloads the task to
* the Infinispan blocking thread pool.
*
* @return The Infinispan {@link BlockingManager}.
*/
BlockingManager getBlockingManager();
} }

View file

@ -20,7 +20,6 @@ package org.keycloak.connections.infinispan.remote;
import java.util.Arrays; import java.util.Arrays;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.infinispan.Cache; import org.infinispan.Cache;
@ -60,7 +59,7 @@ public record RemoteInfinispanConnectionProvider(EmbeddedCacheManager embeddedCa
} }
@Override @Override
public CompletionStage<Void> migrateToProtostream() { public CompletionStage<Void> migrateToProtoStream() {
// Only the CacheStore (persistence) stores data in binary format and needs to be deleted. // Only the CacheStore (persistence) stores data in binary format and needs to be deleted.
// We assume rolling-upgrade between KC 25 and KC 26 is not available, in other words, KC 25 and KC 26 servers are not present in the same cluster. // We assume rolling-upgrade between KC 25 and KC 26 is not available, in other words, KC 25 and KC 26 servers are not present in the same cluster.
var stage = CompletionStages.aggregateCompletionStage(); var stage = CompletionStages.aggregateCompletionStage();
@ -71,17 +70,17 @@ public record RemoteInfinispanConnectionProvider(EmbeddedCacheManager embeddedCa
return stage.freeze(); return stage.freeze();
} }
@Override
public Executor getExecutor(String name) {
return GlobalComponentRegistry.componentOf(embeddedCacheManager, BlockingManager.class).asExecutor(name);
}
@Override @Override
public ScheduledExecutorService getScheduledExecutor() { public ScheduledExecutorService getScheduledExecutor() {
//noinspection removal //noinspection removal
return GlobalComponentRegistry.of(embeddedCacheManager).getComponent(ScheduledExecutorService.class, KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR); return GlobalComponentRegistry.of(embeddedCacheManager).getComponent(ScheduledExecutorService.class, KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR);
} }
@Override
public BlockingManager getBlockingManager() {
return GlobalComponentRegistry.componentOf(embeddedCacheManager, BlockingManager.class);
}
@Override @Override
public void close() { public void close() {
//no-op //no-op

View file

@ -17,8 +17,12 @@
package org.keycloak.infinispan.util; package org.keycloak.infinispan.util;
import java.util.Map;
import org.keycloak.Config;
import org.keycloak.common.Profile; import org.keycloak.common.Profile;
import org.keycloak.common.util.MultiSiteUtils; import org.keycloak.common.util.MultiSiteUtils;
import org.keycloak.provider.ProviderConfigurationBuilder;
import static org.keycloak.common.Profile.Feature.REMOTE_CACHE; import static org.keycloak.common.Profile.Feature.REMOTE_CACHE;
@ -45,4 +49,48 @@ public final class InfinispanUtils {
public static boolean isEmbeddedInfinispan() { public static boolean isEmbeddedInfinispan() {
return !isRemoteInfinispan(); return !isRemoteInfinispan();
} }
// ---- Retries on Error - Exponential Back Off ----
// max number of retries on error.
public static final int DEFAULT_MAX_RETRIES = 10;
private static final String CONFIG_MAX_RETRIES = "maxRetries";
// the base back-off time in milliseconds
public static final int DEFAULT_RETRIES_BASE_TIME_MILLIS = 10;
private static final String CONFIG_RETRIES_BASE_TIME_MILLIS = "retryBaseTime";
public static void configureMaxRetries(ProviderConfigurationBuilder builder) {
builder.property()
.name(CONFIG_MAX_RETRIES)
.type("int")
.helpText("The maximum number of retries if an error occurs. A value of zero or less disable any retries.")
.defaultValue(DEFAULT_MAX_RETRIES)
.add();
}
public static void configureRetryBaseTime(ProviderConfigurationBuilder builder) {
builder.property()
.name(CONFIG_RETRIES_BASE_TIME_MILLIS)
.type("int")
.helpText("The base back-off time in milliseconds.")
.defaultValue(DEFAULT_RETRIES_BASE_TIME_MILLIS)
.add();
}
public static int getMaxRetries(Config.Scope config) {
return Math.max(0, config.getInt(CONFIG_MAX_RETRIES, DEFAULT_MAX_RETRIES));
}
public static int getRetryBaseTimeMillis(Config.Scope config) {
return Math.max(1, config.getInt(CONFIG_RETRIES_BASE_TIME_MILLIS, DEFAULT_RETRIES_BASE_TIME_MILLIS));
}
public static void maxRetriesToOperationalInfo(Map<String, String> map, int value) {
map.put(CONFIG_MAX_RETRIES, Integer.toString(value));
}
public static void retryBaseTimeMillisToOperationalInfo(Map<String, String> map, int value) {
map.put(CONFIG_RETRIES_BASE_TIME_MILLIS, Integer.toString(value));
}
} }

View file

@ -253,7 +253,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider, Sessi
// Unable to read the cached data. // Unable to read the cached data.
if ("26.0.0".equals(modelVersion)) { if ("26.0.0".equals(modelVersion)) {
log.debug("Clear caches to migrate to Infinispan Protostream"); log.debug("Clear caches to migrate to Infinispan Protostream");
CompletionStages.join(session.getProvider(InfinispanConnectionProvider.class).migrateToProtostream()); CompletionStages.join(session.getProvider(InfinispanConnectionProvider.class).migrateToProtoStream());
} }
} }

View file

@ -976,7 +976,7 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi
// Unable to read the cached data. // Unable to read the cached data.
if ("26.0.0".equals(modelVersion)) { if ("26.0.0".equals(modelVersion)) {
log.debug("Clear caches to migrate to Infinispan Protostream"); log.debug("Clear caches to migrate to Infinispan Protostream");
CompletionStages.join(session.getProvider(InfinispanConnectionProvider.class).migrateToProtostream()); CompletionStages.join(session.getProvider(InfinispanConnectionProvider.class).migrateToProtoStream());
} }
} }

View file

@ -1,56 +0,0 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration;
import java.util.ArrayList;
import java.util.List;
import org.infinispan.client.hotrod.RemoteCache;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
/**
* A {@link ConditionalRemover} implementation to remove {@link SessionEntity} from a {@link RemoteCache} based on
* {@link SessionEntity#getRealmId()} value.
*
* @param <K> The key's type stored in the {@link RemoteCache}.
* @param <V> The value's type stored in the {@link RemoteCache}.
*/
public class ByRealmIdConditionalRemover<K, V extends SessionEntity> extends IterationBasedConditionalRemover<K, V> {
private final List<String> realms;
public ByRealmIdConditionalRemover() {
realms = new ArrayList<>();
}
public void removeByRealmId(String realmId) {
realms.add(realmId);
}
@Override
boolean isEmpty() {
return realms.isEmpty();
}
@Override
public boolean willRemove(K key, V value) {
return realms.contains(value.getRealmId());
}
}

View file

@ -1,65 +0,0 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration;
import java.util.Map;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Predicate;
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;
/**
* An iteration based implementation of {@link ConditionalRemover}.
* <p>
* This class is not performance efficient since it has to download the full {@link RemoteCache} content to perform the
* removal tests.
*
* @param <K> The key's type stored in the {@link RemoteCache}.
* @param <V> The value's type stored in the {@link RemoteCache}.
*/
abstract class IterationBasedConditionalRemover<K, V> implements ConditionalRemover<K, V>, Predicate<Map.Entry<K, MetadataValue<V>>> {
@Override
public final void executeRemovals(RemoteCache<K, V> cache, AggregateCompletionStage<Void> stage) {
if (isEmpty()) {
return;
}
var rmStage = Flowable.fromPublisher(cache.publishEntriesWithMetadata(null, 2048))
.filter(this)
.map(Map.Entry::getKey)
.flatMapCompletable(key -> Completable.fromCompletionStage(cache.removeAsync(key)))
.toCompletionStage(null);
stage.dependsOn(rmStage);
}
@Override
public final boolean test(Map.Entry<K, MetadataValue<V>> entry) throws Throwable {
return willRemove(entry.getKey(), entry.getValue().getValue());
}
/**
* @return {@code true} if this implementation won't remove anything. It avoids iterating over the
* {@link RemoteCache} contents.
*/
abstract boolean isEmpty();
}

View file

@ -69,12 +69,19 @@ public class AuthenticatedClientSessionUpdater extends BaseUpdater<ClientSession
} }
/** /**
* @param offline If {@code true}, it creates offline {@link AuthenticatedClientSessionModel}. * @return The {@link UpdaterFactory} implementation to create online session instances of
* @return The {@link UpdaterFactory} implementation to create instances of
* {@link AuthenticatedClientSessionUpdater}. * {@link AuthenticatedClientSessionUpdater}.
*/ */
public static UpdaterFactory<ClientSessionKey, RemoteAuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> factory(boolean offline) { public static UpdaterFactory<ClientSessionKey, RemoteAuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> onlineFactory() {
return offline ? OFFLINE : ONLINE; return ONLINE;
}
/**
* @return The {@link UpdaterFactory} implementation to create offline session instances of
* {@link AuthenticatedClientSessionUpdater}.
*/
public static UpdaterFactory<ClientSessionKey, RemoteAuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> offlineFactory() {
return OFFLINE;
} }
@Override @Override

View file

@ -52,11 +52,19 @@ public class UserSessionUpdater extends BaseUpdater<String, RemoteUserSessionEnt
} }
/** /**
* @param offline If {@code true}, it creates offline {@link UserSessionModel}. * @return The {@link UpdaterFactory} implementation to create online sessions instances of
* @return The {@link UpdaterFactory} implementation to create instances of {@link UserSessionModel}. * {@link UserSessionModel}.
*/ */
public static UpdaterFactory<String, RemoteUserSessionEntity, UserSessionUpdater> factory(boolean offline) { public static UpdaterFactory<String, RemoteUserSessionEntity, UserSessionUpdater> onlineFactory() {
return offline ? OFFLINE : ONLINE; return ONLINE;
}
/**
* @return The {@link UpdaterFactory} implementation to create offline sessions instances of
* {@link UserSessionModel}.
*/
public static UpdaterFactory<String, RemoteUserSessionEntity, UserSessionUpdater> offlineFactory() {
return OFFLINE;
} }
@Override @Override

View file

@ -17,10 +17,15 @@
package org.keycloak.models.sessions.infinispan.remote; package org.keycloak.models.sessions.infinispan.remote;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.util.concurrent.BlockingManager;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.keycloak.Config; import org.keycloak.Config;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.infinispan.util.InfinispanUtils; import org.keycloak.infinispan.util.InfinispanUtils;
import org.keycloak.marshalling.Marshalling; import org.keycloak.marshalling.Marshalling;
import org.keycloak.models.KeycloakSession; import org.keycloak.models.KeycloakSession;
@ -34,17 +39,23 @@ import org.keycloak.models.sessions.infinispan.changes.remote.updater.loginfailu
import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity; import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey; import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
import org.keycloak.models.sessions.infinispan.remote.transaction.LoginFailureChangeLogTransaction; import org.keycloak.models.sessions.infinispan.remote.transaction.LoginFailureChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.remote.transaction.RemoteChangeLogTransaction;
import org.keycloak.provider.EnvironmentDependentProviderFactory; import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.ProviderConfigProperty;
import org.keycloak.provider.ProviderConfigurationBuilder;
import org.keycloak.provider.ServerInfoAwareProviderFactory;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME; import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.getRemoteCache;
public class RemoteUserLoginFailureProviderFactory implements UserLoginFailureProviderFactory<RemoteUserLoginFailureProvider>, UpdaterFactory<LoginFailureKey, LoginFailureEntity, LoginFailuresUpdater>, EnvironmentDependentProviderFactory { public class RemoteUserLoginFailureProviderFactory implements UserLoginFailureProviderFactory<RemoteUserLoginFailureProvider>, UpdaterFactory<LoginFailureKey, LoginFailureEntity, LoginFailuresUpdater>, EnvironmentDependentProviderFactory, RemoteChangeLogTransaction.SharedState<LoginFailureKey, LoginFailureEntity>, ServerInfoAwareProviderFactory {
private static final Logger log = Logger.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = Logger.getLogger(MethodHandles.lookup().lookupClass());
public static final String PROTO_ENTITY = Marshalling.protoEntity(LoginFailureEntity.class); public static final String PROTO_ENTITY = Marshalling.protoEntity(LoginFailureEntity.class);
private volatile RemoteCache<LoginFailureKey, LoginFailureEntity> cache; private volatile RemoteCache<LoginFailureKey, LoginFailureEntity> cache;
private volatile BlockingManager blockingManager;
private volatile int maxRetries = InfinispanUtils.DEFAULT_MAX_RETRIES;
private volatile int backOffBaseTimeMillis = InfinispanUtils.DEFAULT_RETRIES_BASE_TIME_MILLIS;
@Override @Override
public RemoteUserLoginFailureProvider create(KeycloakSession session) { public RemoteUserLoginFailureProvider create(KeycloakSession session) {
@ -53,11 +64,17 @@ public class RemoteUserLoginFailureProviderFactory implements UserLoginFailurePr
@Override @Override
public void init(Config.Scope config) { public void init(Config.Scope config) {
maxRetries = InfinispanUtils.getMaxRetries(config);
backOffBaseTimeMillis = InfinispanUtils.getRetryBaseTimeMillis(config);
} }
@Override @Override
public void postInit(final KeycloakSessionFactory factory) { public void postInit(final KeycloakSessionFactory factory) {
cache = getRemoteCache(factory, LOGIN_FAILURE_CACHE_NAME); try (var session = factory.create()) {
var provider = session.getProvider(InfinispanConnectionProvider.class);
cache = provider.getRemoteCache(LOGIN_FAILURE_CACHE_NAME);
blockingManager = provider.getBlockingManager();
}
factory.register(event -> { factory.register(event -> {
if (event instanceof UserModel.UserRemovedEvent userRemovedEvent) { if (event instanceof UserModel.UserRemovedEvent userRemovedEvent) {
UserLoginFailureProvider provider = userRemovedEvent.getKeycloakSession().getProvider(UserLoginFailureProvider.class, getId()); UserLoginFailureProvider provider = userRemovedEvent.getKeycloakSession().getProvider(UserLoginFailureProvider.class, getId());
@ -87,6 +104,22 @@ public class RemoteUserLoginFailureProviderFactory implements UserLoginFailurePr
return InfinispanUtils.isRemoteInfinispan(); return InfinispanUtils.isRemoteInfinispan();
} }
@Override
public List<ProviderConfigProperty> getConfigMetadata() {
ProviderConfigurationBuilder builder = ProviderConfigurationBuilder.create();
InfinispanUtils.configureMaxRetries(builder);
InfinispanUtils.configureRetryBaseTime(builder);
return builder.build();
}
@Override
public Map<String, String> getOperationalInfo() {
Map<String, String> map = new HashMap<>();
InfinispanUtils.maxRetriesToOperationalInfo(map, maxRetries);
InfinispanUtils.retryBaseTimeMillisToOperationalInfo(map, backOffBaseTimeMillis);
return map;
}
@Override @Override
public LoginFailuresUpdater create(LoginFailureKey key, LoginFailureEntity entity) { public LoginFailuresUpdater create(LoginFailureKey key, LoginFailureEntity entity) {
return LoginFailuresUpdater.create(key, entity); return LoginFailuresUpdater.create(key, entity);
@ -102,8 +135,32 @@ public class RemoteUserLoginFailureProviderFactory implements UserLoginFailurePr
return LoginFailuresUpdater.delete(key); return LoginFailuresUpdater.delete(key);
} }
@Override
public RemoteCache<LoginFailureKey, LoginFailureEntity> cache() {
return cache;
}
@Override
public int maxRetries() {
return maxRetries;
}
@Override
public int backOffBaseTimeMillis() {
return backOffBaseTimeMillis;
}
@Override
public BlockingManager blockingManager() {
return blockingManager;
}
public void setMaxRetries(int maxRetries) {
this.maxRetries = Math.max(0, maxRetries);
}
private LoginFailureChangeLogTransaction createAndEnlistTransaction(KeycloakSession session) { private LoginFailureChangeLogTransaction createAndEnlistTransaction(KeycloakSession session) {
var tx = new LoginFailureChangeLogTransaction(this, cache, new ByRealmIdQueryConditionalRemover<>(PROTO_ENTITY)); var tx = new LoginFailureChangeLogTransaction(this, this, new ByRealmIdQueryConditionalRemover<>(PROTO_ENTITY));
session.getTransactionManager().enlistAfterCompletion(tx); session.getTransactionManager().enlistAfterCompletion(tx);
return tx; return tx;
} }

View file

@ -58,7 +58,7 @@ import org.keycloak.models.sessions.infinispan.query.ClientSessionQueries;
import org.keycloak.models.sessions.infinispan.query.QueryHelper; import org.keycloak.models.sessions.infinispan.query.QueryHelper;
import org.keycloak.models.sessions.infinispan.query.UserSessionQueries; import org.keycloak.models.sessions.infinispan.query.UserSessionQueries;
import org.keycloak.models.sessions.infinispan.remote.transaction.ClientSessionChangeLogTransaction; import org.keycloak.models.sessions.infinispan.remote.transaction.ClientSessionChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.remote.transaction.UseSessionChangeLogTransaction; import org.keycloak.models.sessions.infinispan.remote.transaction.UserSessionChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.remote.transaction.UserSessionTransaction; import org.keycloak.models.sessions.infinispan.remote.transaction.UserSessionTransaction;
import org.keycloak.models.utils.KeycloakModelUtils; import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.utils.StreamsUtil; import org.keycloak.utils.StreamsUtil;
@ -360,7 +360,7 @@ public class RemoteUserSessionProvider implements UserSessionProvider {
transaction.removeUserSessionById(userSession.getId(), offline); transaction.removeUserSessionById(userSession.getId(), offline);
} }
private UseSessionChangeLogTransaction getUserSessionTransaction(boolean offline) { private UserSessionChangeLogTransaction getUserSessionTransaction(boolean offline) {
return transaction.getUserSessions(offline); return transaction.getUserSessions(offline);
} }

View file

@ -1,9 +1,11 @@
package org.keycloak.models.sessions.infinispan.remote; package org.keycloak.models.sessions.infinispan.remote;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor; import java.util.Map;
import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.util.concurrent.BlockingManager;
import org.keycloak.Config; import org.keycloak.Config;
import org.keycloak.common.util.MultiSiteUtils; import org.keycloak.common.util.MultiSiteUtils;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider; import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
@ -20,22 +22,30 @@ import org.keycloak.models.sessions.infinispan.entities.ClientSessionKey;
import org.keycloak.models.sessions.infinispan.entities.RemoteAuthenticatedClientSessionEntity; import org.keycloak.models.sessions.infinispan.entities.RemoteAuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.RemoteUserSessionEntity; import org.keycloak.models.sessions.infinispan.entities.RemoteUserSessionEntity;
import org.keycloak.models.sessions.infinispan.remote.transaction.ClientSessionChangeLogTransaction; import org.keycloak.models.sessions.infinispan.remote.transaction.ClientSessionChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.remote.transaction.UseSessionChangeLogTransaction; import org.keycloak.models.sessions.infinispan.remote.transaction.RemoteChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.remote.transaction.UserSessionChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.remote.transaction.UserSessionTransaction; import org.keycloak.models.sessions.infinispan.remote.transaction.UserSessionTransaction;
import org.keycloak.provider.EnvironmentDependentProviderFactory; import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.ProviderConfigProperty; import org.keycloak.provider.ProviderConfigProperty;
import org.keycloak.provider.ProviderConfigurationBuilder; import org.keycloak.provider.ProviderConfigurationBuilder;
import org.keycloak.provider.ProviderEvent; import org.keycloak.provider.ProviderEvent;
import org.keycloak.provider.ProviderEventListener; import org.keycloak.provider.ProviderEventListener;
import org.keycloak.provider.ServerInfoAwareProviderFactory;
public class RemoteUserSessionProviderFactory implements UserSessionProviderFactory<RemoteUserSessionProvider>, EnvironmentDependentProviderFactory, ProviderEventListener { public class RemoteUserSessionProviderFactory implements UserSessionProviderFactory<RemoteUserSessionProvider>, EnvironmentDependentProviderFactory, ProviderEventListener, ServerInfoAwareProviderFactory {
// Sessions are close to 1KB of data. Fetch 1MB per batch request (can be configured) // Sessions are close to 1KB of data. Fetch 1MB per batch request (can be configured)
private static final int DEFAULT_BATCH_SIZE = 1024; private static final int DEFAULT_BATCH_SIZE = 1024;
private static final String CONFIG_MAX_BATCH_SIZE = "batchSize"; private static final String CONFIG_MAX_BATCH_SIZE = "batchSize";
private volatile RemoteCacheHolder remoteCacheHolder; private volatile SharedStateImpl<String, RemoteUserSessionEntity> userSessionState;
private volatile SharedStateImpl<String, RemoteUserSessionEntity> offlineUserSessionState;
private volatile SharedStateImpl<ClientSessionKey, RemoteAuthenticatedClientSessionEntity> clientSessionState;
private volatile SharedStateImpl<ClientSessionKey, RemoteAuthenticatedClientSessionEntity> offlineClientSessionState;
private volatile BlockingManager blockingManager;
private volatile int batchSize = DEFAULT_BATCH_SIZE; private volatile int batchSize = DEFAULT_BATCH_SIZE;
private volatile int maxRetries = InfinispanUtils.DEFAULT_MAX_RETRIES;
private volatile int backOffBaseTimeMillis = InfinispanUtils.DEFAULT_RETRIES_BASE_TIME_MILLIS;
@Override @Override
public RemoteUserSessionProvider create(KeycloakSession session) { public RemoteUserSessionProvider create(KeycloakSession session) {
@ -47,6 +57,8 @@ public class RemoteUserSessionProviderFactory implements UserSessionProviderFact
@Override @Override
public void init(Config.Scope config) { public void init(Config.Scope config) {
batchSize = Math.max(1, config.getInt(CONFIG_MAX_BATCH_SIZE, DEFAULT_BATCH_SIZE)); batchSize = Math.max(1, config.getInt(CONFIG_MAX_BATCH_SIZE, DEFAULT_BATCH_SIZE));
maxRetries = InfinispanUtils.getMaxRetries(config);
backOffBaseTimeMillis = InfinispanUtils.getRetryBaseTimeMillis(config);
} }
@Override @Override
@ -55,12 +67,15 @@ public class RemoteUserSessionProviderFactory implements UserSessionProviderFact
lazyInit(session); lazyInit(session);
} }
factory.register(this); factory.register(this);
} }
@Override @Override
public void close() { public void close() {
remoteCacheHolder = null; blockingManager = null;
userSessionState = null;
offlineUserSessionState = null;
clientSessionState = null;
offlineClientSessionState = null;
} }
@Override @Override
@ -82,9 +97,22 @@ public class RemoteUserSessionProviderFactory implements UserSessionProviderFact
.helpText("Batch size when streaming session from the remote cache") .helpText("Batch size when streaming session from the remote cache")
.defaultValue(DEFAULT_BATCH_SIZE) .defaultValue(DEFAULT_BATCH_SIZE)
.add(); .add();
InfinispanUtils.configureMaxRetries(builder);
InfinispanUtils.configureRetryBaseTime(builder);
return builder.build(); return builder.build();
} }
@Override
public Map<String, String> getOperationalInfo() {
Map<String, String> map = new HashMap<>();
map.put(CONFIG_MAX_BATCH_SIZE, Integer.toString(batchSize));
InfinispanUtils.maxRetriesToOperationalInfo(map, maxRetries);
InfinispanUtils.retryBaseTimeMillisToOperationalInfo(map, backOffBaseTimeMillis);
return map;
}
@Override @Override
public void onEvent(ProviderEvent event) { public void onEvent(ProviderEvent event) {
if (event instanceof UserModel.UserRemovedEvent ure) { if (event instanceof UserModel.UserRemovedEvent ure) {
@ -98,49 +126,53 @@ public class RemoteUserSessionProviderFactory implements UserSessionProviderFact
} }
private void lazyInit(KeycloakSession session) { private void lazyInit(KeycloakSession session) {
if (remoteCacheHolder != null) { if (blockingManager != null) {
return; return;
} }
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class); var connections = session.getProvider(InfinispanConnectionProvider.class);
RemoteCache<String, RemoteUserSessionEntity> userSessionCache = connections.getRemoteCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); userSessionState = new SharedStateImpl<>(connections.getRemoteCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME));
RemoteCache<String, RemoteUserSessionEntity> offlineUserSessionsCache = connections.getRemoteCache(InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME); offlineUserSessionState = new SharedStateImpl<>(connections.getRemoteCache(InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME));
RemoteCache<ClientSessionKey, RemoteAuthenticatedClientSessionEntity> clientSessionCache = connections.getRemoteCache(InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME); clientSessionState = new SharedStateImpl<>(connections.getRemoteCache(InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME));
RemoteCache<ClientSessionKey, RemoteAuthenticatedClientSessionEntity> offlineClientSessionsCache = connections.getRemoteCache(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME); offlineClientSessionState = new SharedStateImpl<>(connections.getRemoteCache(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME));
var executor = connections.getExecutor("query-delete"); blockingManager = connections.getBlockingManager();
remoteCacheHolder = new RemoteCacheHolder(userSessionCache, offlineUserSessionsCache, clientSessionCache, offlineClientSessionsCache, executor);
} }
private UserSessionTransaction createTransaction(KeycloakSession session) { private UserSessionTransaction createTransaction(KeycloakSession session) {
lazyInit(session); lazyInit(session);
return new UserSessionTransaction( return new UserSessionTransaction(
createUserSessionTransaction(false), new UserSessionChangeLogTransaction(UserSessionUpdater.onlineFactory(), userSessionState),
createUserSessionTransaction(true), new UserSessionChangeLogTransaction(UserSessionUpdater.offlineFactory(), offlineUserSessionState),
createClientSessionTransaction(false), new ClientSessionChangeLogTransaction(AuthenticatedClientSessionUpdater.onlineFactory(), clientSessionState),
createClientSessionTransaction(true) new ClientSessionChangeLogTransaction(AuthenticatedClientSessionUpdater.offlineFactory(), offlineClientSessionState)
); );
} }
private UseSessionChangeLogTransaction createUserSessionTransaction(boolean offline) { private class SharedStateImpl<K, V> implements RemoteChangeLogTransaction.SharedState<K, V> {
return new UseSessionChangeLogTransaction(UserSessionUpdater.factory(offline), remoteCacheHolder.userSessionCache(offline));
private final RemoteCache<K, V> cache;
private SharedStateImpl(RemoteCache<K, V> cache) {
this.cache = cache;
} }
private ClientSessionChangeLogTransaction createClientSessionTransaction(boolean offline) { @Override
return new ClientSessionChangeLogTransaction(AuthenticatedClientSessionUpdater.factory(offline), remoteCacheHolder.clientSessionCache(offline)); public RemoteCache<K, V> cache() {
return cache;
} }
private record RemoteCacheHolder( @Override
RemoteCache<String, RemoteUserSessionEntity> userSession, public int maxRetries() {
RemoteCache<String, RemoteUserSessionEntity> offlineUserSession, return maxRetries;
RemoteCache<ClientSessionKey, RemoteAuthenticatedClientSessionEntity> clientSession,
RemoteCache<ClientSessionKey, RemoteAuthenticatedClientSessionEntity> offlineClientSession,
Executor executor) {
RemoteCache<String, RemoteUserSessionEntity> userSessionCache(boolean offline) {
return offline ? offlineUserSession : userSession;
} }
RemoteCache<ClientSessionKey, RemoteAuthenticatedClientSessionEntity> clientSessionCache(boolean offline) { @Override
return offline ? offlineClientSession : clientSession; public int backOffBaseTimeMillis() {
return backOffBaseTimeMillis;
}
@Override
public BlockingManager blockingManager() {
return blockingManager;
} }
} }
} }

View file

@ -19,7 +19,6 @@ package org.keycloak.models.sessions.infinispan.remote.transaction;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.infinispan.client.hotrod.RemoteCache;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.query.ClientSessionQueryConditionalRemover; import org.keycloak.models.sessions.infinispan.changes.remote.remover.query.ClientSessionQueryConditionalRemover;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.BaseUpdater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.BaseUpdater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory; import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
@ -34,8 +33,8 @@ import org.keycloak.models.sessions.infinispan.entities.RemoteAuthenticatedClien
*/ */
public class ClientSessionChangeLogTransaction extends RemoteChangeLogTransaction<ClientSessionKey, RemoteAuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater, ClientSessionQueryConditionalRemover> { public class ClientSessionChangeLogTransaction extends RemoteChangeLogTransaction<ClientSessionKey, RemoteAuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater, ClientSessionQueryConditionalRemover> {
public ClientSessionChangeLogTransaction(UpdaterFactory<ClientSessionKey, RemoteAuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> factory, RemoteCache<ClientSessionKey, RemoteAuthenticatedClientSessionEntity> cache) { public ClientSessionChangeLogTransaction(UpdaterFactory<ClientSessionKey, RemoteAuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> factory, SharedState<ClientSessionKey, RemoteAuthenticatedClientSessionEntity> sharedState) {
super(factory, cache, new ClientSessionQueryConditionalRemover()); super(factory, sharedState, new ClientSessionQueryConditionalRemover());
} }
/** /**

View file

@ -17,7 +17,6 @@
package org.keycloak.models.sessions.infinispan.remote.transaction; package org.keycloak.models.sessions.infinispan.remote.transaction;
import org.infinispan.client.hotrod.RemoteCache;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.query.ByRealmIdQueryConditionalRemover; import org.keycloak.models.sessions.infinispan.changes.remote.remover.query.ByRealmIdQueryConditionalRemover;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory; import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.loginfailures.LoginFailuresUpdater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.loginfailures.LoginFailuresUpdater;
@ -31,8 +30,8 @@ import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
*/ */
public class LoginFailureChangeLogTransaction extends RemoteChangeLogTransaction<LoginFailureKey, LoginFailureEntity, LoginFailuresUpdater, ByRealmIdQueryConditionalRemover<LoginFailureKey, LoginFailureEntity>> { public class LoginFailureChangeLogTransaction extends RemoteChangeLogTransaction<LoginFailureKey, LoginFailureEntity, LoginFailuresUpdater, ByRealmIdQueryConditionalRemover<LoginFailureKey, LoginFailureEntity>> {
public LoginFailureChangeLogTransaction(UpdaterFactory<LoginFailureKey, LoginFailureEntity, LoginFailuresUpdater> factory, RemoteCache<LoginFailureKey, LoginFailureEntity> cache, ByRealmIdQueryConditionalRemover<LoginFailureKey, LoginFailureEntity> conditionalRemover) { public LoginFailureChangeLogTransaction(UpdaterFactory<LoginFailureKey, LoginFailureEntity, LoginFailuresUpdater> factory, SharedState<LoginFailureKey, LoginFailureEntity> sharedState, ByRealmIdQueryConditionalRemover<LoginFailureKey, LoginFailureEntity> conditionalRemover) {
super(factory, cache, conditionalRemover); super(factory, sharedState, conditionalRemover);
} }
public void removeByRealmId(String realmId) { public void removeByRealmId(String realmId) {

View file

@ -29,6 +29,8 @@ import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage; import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletableFutures; import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages; import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.util.concurrent.BlockingManager;
import org.keycloak.common.util.Retry;
import org.keycloak.models.AbstractKeycloakTransaction; import org.keycloak.models.AbstractKeycloakTransaction;
import org.keycloak.models.KeycloakTransaction; import org.keycloak.models.KeycloakTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover; import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;
@ -46,26 +48,30 @@ import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFac
*/ */
public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>, R extends ConditionalRemover<K, V>> extends AbstractKeycloakTransaction { public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>, R extends ConditionalRemover<K, V>> extends AbstractKeycloakTransaction {
private static final RetryOperationSuccess<?, ?, ?> TO_NULL = (ignored1, ignored2, ignored3) -> CompletableFutures.completedNull();
private final Map<K, T> entityChanges; private final Map<K, T> entityChanges;
private final UpdaterFactory<K, V, T> factory; private final UpdaterFactory<K, V, T> factory;
private final RemoteCache<K, V> cache;
private final R conditionalRemover; private final R conditionalRemover;
private final SharedState<K, V> sharedState;
RemoteChangeLogTransaction(UpdaterFactory<K, V, T> factory, RemoteCache<K, V> cache, R conditionalRemover) { RemoteChangeLogTransaction(UpdaterFactory<K, V, T> factory, SharedState<K, V> sharedState, R conditionalRemover) {
this.factory = Objects.requireNonNull(factory); this.factory = Objects.requireNonNull(factory);
this.cache = Objects.requireNonNull(cache);
this.conditionalRemover = Objects.requireNonNull(conditionalRemover); this.conditionalRemover = Objects.requireNonNull(conditionalRemover);
this.sharedState = Objects.requireNonNull(sharedState);
entityChanges = new ConcurrentHashMap<>(8); entityChanges = new ConcurrentHashMap<>(8);
} }
@Override @Override
protected void commitImpl() { protected void commitImpl() {
try {
var stage = CompletionStages.aggregateCompletionStage(); var stage = CompletionStages.aggregateCompletionStage();
doCommit(stage); doCommit(stage);
CompletionStages.join(stage.freeze()); CompletionStages.join(stage.freeze());
} finally {
entityChanges.clear(); entityChanges.clear();
} }
}
@Override @Override
protected void rollbackImpl() { protected void rollbackImpl() {
@ -83,30 +89,30 @@ public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>, R extends
} }
private void doCommit(AggregateCompletionStage<Void> stage) { private void doCommit(AggregateCompletionStage<Void> stage) {
conditionalRemover.executeRemovals(cache, stage); conditionalRemover.executeRemovals(getCache(), stage);
for (var updater : entityChanges.values()) { for (var updater : entityChanges.values()) {
if (updater.isReadOnly() || updater.isTransient() || conditionalRemover.willRemove(updater)) { if (updater.isReadOnly() || updater.isTransient() || conditionalRemover.willRemove(updater)) {
continue; continue;
} }
if (updater.isDeleted()) { if (updater.isDeleted()) {
stage.dependsOn(cache.removeAsync(updater.getKey())); stage.dependsOn(commitRemove(updater));
continue; continue;
} }
var expiration = updater.computeExpiration(); var expiration = updater.computeExpiration();
if (expiration.isExpired()) { if (expiration.isExpired()) {
stage.dependsOn(cache.removeAsync(updater.getKey())); stage.dependsOn(commitRemove(updater));
continue; continue;
} }
if (updater.isCreated()) { if (updater.isCreated()) {
stage.dependsOn(putIfAbsent(updater, expiration)); stage.dependsOn(commitPutIfAbsent(updater, expiration));
continue; continue;
} }
stage.dependsOn(replace(updater, expiration)); stage.dependsOn(commitReplace(updater, expiration));
} }
} }
@ -114,7 +120,7 @@ public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>, R extends
* @return The {@link RemoteCache} tracked by the transaction. * @return The {@link RemoteCache} tracked by the transaction.
*/ */
public RemoteCache<K, V> getCache() { public RemoteCache<K, V> getCache() {
return cache; return sharedState.cache();
} }
/** /**
@ -130,7 +136,7 @@ public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>, R extends
if (updater != null) { if (updater != null) {
return updater.isDeleted() ? null : updater; return updater.isDeleted() ? null : updater;
} }
return onEntityFromCache(key, cache.getWithMetadata(key)); return onEntityFromCache(key, getCache().getWithMetadata(key));
} }
/** /**
@ -144,7 +150,7 @@ public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>, R extends
if (updater != null) { if (updater != null) {
return updater.isDeleted() ? CompletableFutures.completedNull() : CompletableFuture.completedFuture(updater); return updater.isDeleted() ? CompletableFutures.completedNull() : CompletableFuture.completedFuture(updater);
} }
return cache.getWithMetadataAsync(key).thenApply(e -> onEntityFromCache(key, e)); return getCache().getWithMetadataAsync(key).thenApply(e -> onEntityFromCache(key, e));
} }
/** /**
@ -199,24 +205,84 @@ public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>, R extends
return updater.isDeleted() ? null : updater; return updater.isDeleted() ? null : updater;
} }
private CompletionStage<V> putIfAbsent(Updater<K, V> updater, Expiration expiration) { @SuppressWarnings("unchecked")
return cache.withFlags(Flag.FORCE_RETURN_VALUE) private CompletionStage<Void> commitRemove(Updater<K, V> updater) {
return executeWithRetries(this::invokeCacheRemove, (RetryOperationSuccess<V, K, V>) TO_NULL, updater, null, 0);
}
private CompletionStage<Void> commitPutIfAbsent(Updater<K, V> updater, Expiration expiration) {
return executeWithRetries(this::invokeCachePutIfAbsent, this::handleBooleanResult, updater, expiration, 0);
}
private CompletionStage<Void> commitReplace(Updater<K, V> updater, Expiration expiration) {
return executeWithRetries(this::invokeCacheReplace, this::handleBooleanResult, updater, expiration, 0);
}
@SuppressWarnings("unchecked")
private CompletionStage<Void> handleBooleanResult(boolean success, Updater<K, V> updater, Expiration expiration) {
return success ?
CompletableFutures.completedNull() :
executeWithRetries(this::invokeCacheCompute, (RetryOperationSuccess<V, K, V>) TO_NULL, updater, expiration, 0);
}
private CompletionStage<V> invokeCacheRemove(Updater<K, V> updater, Expiration ignored) {
return getCache().removeAsync(updater.getKey());
}
private CompletionStage<Boolean> invokeCachePutIfAbsent(Updater<K, V> updater, Expiration expiration) {
return getCache().withFlags(Flag.FORCE_RETURN_VALUE)
.putIfAbsentAsync(updater.getKey(), updater.getValue(), expiration.lifespan(), TimeUnit.MILLISECONDS, expiration.maxIdle(), TimeUnit.MILLISECONDS) .putIfAbsentAsync(updater.getKey(), updater.getValue(), expiration.lifespan(), TimeUnit.MILLISECONDS, expiration.maxIdle(), TimeUnit.MILLISECONDS)
.thenApply(Objects::isNull) .thenApply(Objects::isNull);
.thenCompose(completed -> handleResponse(completed, updater, expiration));
} }
private CompletionStage<V> replace(Updater<K, V> updater, Expiration expiration) { private CompletionStage<Boolean> invokeCacheReplace(Updater<K, V> updater, Expiration expiration) {
return cache.replaceWithVersionAsync(updater.getKey(), updater.getValue(), updater.getVersionRead(), expiration.lifespan(), TimeUnit.MILLISECONDS, expiration.maxIdle(), TimeUnit.MILLISECONDS) return getCache().replaceWithVersionAsync(updater.getKey(), updater.getValue(), updater.getVersionRead(), expiration.lifespan(), TimeUnit.MILLISECONDS, expiration.maxIdle(), TimeUnit.MILLISECONDS);
.thenCompose(completed -> handleResponse(completed, updater, expiration));
} }
private CompletionStage<V> handleResponse(boolean completed, Updater<K, V> updater, Expiration expiration) { private CompletionStage<V> invokeCacheCompute(Updater<K, V> updater, Expiration expiration) {
return completed ? CompletableFutures.completedNull() : merge(updater, expiration); return getCache().computeIfPresentAsync(updater.getKey(), updater, expiration.lifespan(), TimeUnit.MILLISECONDS, expiration.maxIdle(), TimeUnit.MILLISECONDS);
} }
private CompletionStage<V> merge(Updater<K, V> updater, Expiration expiration) { private <OR> CompletionStage<Void> executeWithRetries(RetryOperation<OR, K, V> operation, RetryOperationSuccess<OR, K, V> onSuccessAction, Updater<K, V> updater, Expiration expiration, int retry) {
return cache.computeIfPresentAsync(updater.getKey(), updater, expiration.lifespan(), TimeUnit.MILLISECONDS, expiration.maxIdle(), TimeUnit.MILLISECONDS); return operation.execute(updater, expiration)
.handle((result, throwable) -> handleOperationResult(result, throwable, operation, onSuccessAction, updater, expiration, retry))
.thenCompose(CompletableFutures.identity());
} }
private <OR> CompletionStage<Void> handleOperationResult(OR result, Throwable throwable, RetryOperation<OR, K, V> operation, RetryOperationSuccess<OR, K, V> onSuccessAction, Updater<K, V> updater, Expiration expiration, int retry) {
if (throwable == null) {
return onSuccessAction.onSuccess(result, updater, expiration);
}
if (retry >= sharedState.maxRetries()) {
return CompletableFuture.failedFuture(CompletableFutures.extractException(throwable));
}
return backOffAndExecuteWithRetries(operation, onSuccessAction, updater, expiration, retry + 1);
}
private <OR> CompletionStage<Void> backOffAndExecuteWithRetries(RetryOperation<OR, K, V> operation, RetryOperationSuccess<OR, K, V> onSuccessAction, Updater<K, V> updater, Expiration expiration, int retry) {
var delayMillis = Retry.computeBackoffInterval(sharedState.backOffBaseTimeMillis(), retry);
return sharedState.blockingManager().scheduleRunBlocking(
() -> executeWithRetries(operation, onSuccessAction, updater, expiration, retry),
delayMillis, TimeUnit.MILLISECONDS, "retry-" + updater)
.thenCompose(CompletableFutures.identity());
}
private interface RetryOperation<R, K, V> {
CompletionStage<R> execute(Updater<K, V> updater, Expiration expiration);
}
private interface RetryOperationSuccess<R, K, V> {
CompletionStage<Void> onSuccess(R result, Updater<K, V> updater, Expiration expiration);
}
// Attempt to minimize class size. Each request creates a new instance of this class, and the shared state can be shared among those instances.
public interface SharedState<K, V> {
RemoteCache<K, V> cache();
int maxRetries();
int backOffBaseTimeMillis();
BlockingManager blockingManager();
}
} }

View file

@ -18,7 +18,6 @@
package org.keycloak.models.sessions.infinispan.remote.transaction; package org.keycloak.models.sessions.infinispan.remote.transaction;
import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Maybe;
import org.infinispan.client.hotrod.RemoteCache;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.query.UserSessionQueryConditionalRemover; import org.keycloak.models.sessions.infinispan.changes.remote.remover.query.UserSessionQueryConditionalRemover;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory; import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.UserSessionUpdater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.UserSessionUpdater;
@ -29,10 +28,10 @@ import org.keycloak.models.sessions.infinispan.entities.RemoteUserSessionEntity;
* {@code RemoteChangeLogTransaction<SessionKey, UserSessionEntity, UserSessionUpdater, * {@code RemoteChangeLogTransaction<SessionKey, UserSessionEntity, UserSessionUpdater,
* UserAndClientSessionConditionalRemover<UserSessionEntity>>} * UserAndClientSessionConditionalRemover<UserSessionEntity>>}
*/ */
public class UseSessionChangeLogTransaction extends RemoteChangeLogTransaction<String, RemoteUserSessionEntity, UserSessionUpdater, UserSessionQueryConditionalRemover> { public class UserSessionChangeLogTransaction extends RemoteChangeLogTransaction<String, RemoteUserSessionEntity, UserSessionUpdater, UserSessionQueryConditionalRemover> {
public UseSessionChangeLogTransaction(UpdaterFactory<String, RemoteUserSessionEntity, UserSessionUpdater> factory, RemoteCache<String, RemoteUserSessionEntity> cache) { public UserSessionChangeLogTransaction(UpdaterFactory<String, RemoteUserSessionEntity, UserSessionUpdater> factory, SharedState<String, RemoteUserSessionEntity> sharedState) {
super(factory, cache, new UserSessionQueryConditionalRemover()); super(factory, sharedState, new UserSessionQueryConditionalRemover());
} }
public UserSessionUpdater wrapFromProjection(Object[] projection) { public UserSessionUpdater wrapFromProjection(Object[] projection) {

View file

@ -29,12 +29,12 @@ import org.keycloak.models.KeycloakTransaction;
*/ */
public class UserSessionTransaction extends AbstractKeycloakTransaction { public class UserSessionTransaction extends AbstractKeycloakTransaction {
private final UseSessionChangeLogTransaction userSessions; private final UserSessionChangeLogTransaction userSessions;
private final ClientSessionChangeLogTransaction clientSessions; private final ClientSessionChangeLogTransaction clientSessions;
private final UseSessionChangeLogTransaction offlineUserSessions; private final UserSessionChangeLogTransaction offlineUserSessions;
private final ClientSessionChangeLogTransaction offlineClientSessions; private final ClientSessionChangeLogTransaction offlineClientSessions;
public UserSessionTransaction(UseSessionChangeLogTransaction userSessions, UseSessionChangeLogTransaction offlineUserSessions, ClientSessionChangeLogTransaction clientSessions, ClientSessionChangeLogTransaction offlineClientSessions) { public UserSessionTransaction(UserSessionChangeLogTransaction userSessions, UserSessionChangeLogTransaction offlineUserSessions, ClientSessionChangeLogTransaction clientSessions, ClientSessionChangeLogTransaction offlineClientSessions) {
this.userSessions = userSessions; this.userSessions = userSessions;
this.offlineUserSessions = offlineUserSessions; this.offlineUserSessions = offlineUserSessions;
this.clientSessions = clientSessions; this.clientSessions = clientSessions;
@ -73,7 +73,7 @@ public class UserSessionTransaction extends AbstractKeycloakTransaction {
return offline ? offlineClientSessions : clientSessions; return offline ? offlineClientSessions : clientSessions;
} }
public UseSessionChangeLogTransaction getUserSessions(boolean offline) { public UserSessionChangeLogTransaction getUserSessions(boolean offline) {
return offline ? offlineUserSessions : userSessions; return offline ? offlineUserSessions : userSessions;
} }

View file

@ -0,0 +1,188 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.testsuite.model.infinispan;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ThreadLocalRandom;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.keycloak.infinispan.util.InfinispanUtils;
import org.keycloak.models.Constants;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.RealmProvider;
import org.keycloak.models.UserLoginFailureProvider;
import org.keycloak.models.UserProvider;
import org.keycloak.models.sessions.infinispan.remote.RemoteUserLoginFailureProviderFactory;
import org.keycloak.provider.ProviderFactory;
import org.keycloak.testsuite.model.HotRodServerRule;
import org.keycloak.testsuite.model.KeycloakModelTest;
import org.keycloak.testsuite.model.RequireProvider;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME;
@RequireProvider(UserLoginFailureProvider.class)
@RequireProvider(UserProvider.class)
@RequireProvider(RealmProvider.class)
public class RetryAndBackOffTest extends KeycloakModelTest {
private static final int MAX_RETRIES = 5;
@ClassRule
public static final TestRule SKIPPED_PROFILES = (base, description) -> {
// we only want to test retries with the remote cache/multi-site
Assume.assumeTrue(InfinispanUtils.isRemoteInfinispan());
return base;
};
private String realmId;
private String userId;
private TimeOutInterceptor timeOutInterceptor;
@Override
public void createEnvironment(KeycloakSession session) {
RealmModel realm = createRealm(session, "retry-and-backoff-test");
session.getContext().setRealm(realm);
realm.setDefaultRole(session.roles().addRealmRole(realm, Constants.DEFAULT_ROLES_ROLE_PREFIX + "-" + realm.getName()));
realmId = realm.getId();
var user = session.users().addUser(realm, "retry-user");
user.setEmail("retry-user@localhost");
userId = user.getId();
timeOutInterceptor = injectInterceptor();
ProviderFactory<?> provider = session.getKeycloakSessionFactory().getProviderFactory(UserLoginFailureProvider.class);
assertTrue(provider instanceof RemoteUserLoginFailureProviderFactory);
((RemoteUserLoginFailureProviderFactory) provider).setMaxRetries(MAX_RETRIES);
}
@Override
public void cleanEnvironment(KeycloakSession s) {
RealmModel realm = s.realms().getRealm(realmId);
s.getContext().setRealm(realm);
s.realms().removeRealm(realmId);
}
private static TimeOutInterceptor injectInterceptor() {
var optRemote = getParameters(HotRodServerRule.class).findFirst();
assertTrue(optRemote.isPresent());
var cacheManager = optRemote.get().getHotRodCacheManager();
return TimeOutInterceptor.getOrInject(cacheManager.getCache(LOGIN_FAILURE_CACHE_NAME));
}
@Test
public void testRetryWithPut() {
timeOutInterceptor.timeoutPuts(randomTimeoutFailures());
inComittedTransaction(session -> {
var realm = session.realms().getRealm(realmId);
var loginFailures = session.loginFailures().addUserLoginFailure(realm, userId);
loginFailures.incrementFailures();
});
timeOutInterceptor.assertPutTimedOutExhausted();
}
@Test
public void testRetryWithReplace() {
inComittedTransaction(session -> {
var realm = session.realms().getRealm(realmId);
var loginFailures = session.loginFailures().addUserLoginFailure(realm, userId);
loginFailures.incrementFailures();
});
timeOutInterceptor.timeoutReplace(randomTimeoutFailures());
inComittedTransaction(session -> {
var realm = session.realms().getRealm(realmId);
var loginFailures = session.loginFailures().getUserLoginFailure(realm, userId);
loginFailures.incrementFailures();
});
timeOutInterceptor.assertReplaceTimedOutExhausted();
}
@Test
public void testRetryWithRemove() {
inComittedTransaction(session -> {
var realm = session.realms().getRealm(realmId);
var loginFailures = session.loginFailures().addUserLoginFailure(realm, userId);
loginFailures.incrementFailures();
});
timeOutInterceptor.timeoutRemove(randomTimeoutFailures());
inComittedTransaction(session -> {
var realm = session.realms().getRealm(realmId);
session.loginFailures().removeUserLoginFailure(realm, userId);
});
timeOutInterceptor.assertRemoveTimedOutExhausted();
}
@Test
public void testRetryWithCompute() {
// compute is implemented with get() and replace()
inComittedTransaction(session -> {
var realm = session.realms().getRealm(realmId);
var loginFailures = session.loginFailures().addUserLoginFailure(realm, userId);
loginFailures.incrementFailures();
});
// first replace returns false, it should switch to compute
timeOutInterceptor.replaceReturnsFalse(1);
timeOutInterceptor.timeoutReplace(randomTimeoutFailures());
inComittedTransaction(session -> {
var realm = session.realms().getRealm(realmId);
var loginFailures = session.loginFailures().getUserLoginFailure(realm, userId);
loginFailures.incrementFailures();
});
timeOutInterceptor.assertReplaceCheckFailExhausted();
timeOutInterceptor.assertReplaceTimedOutExhausted();
}
@Test
public void testExceptionThrown() {
timeOutInterceptor.timeoutPuts(MAX_RETRIES + 1);
try {
var ce = assertThrows(CompletionException.class,
() -> inComittedTransaction(session -> {
var realm = session.realms().getRealm(realmId);
session.loginFailures().addUserLoginFailure(realm, userId);
}));
assertTrue(String.valueOf(ce.getCause()), ce.getCause() instanceof HotRodClientException);
assertTrue(ce.getCause().getLocalizedMessage(), ce.getCause().getLocalizedMessage().contains(TimeOutInterceptor.MESSAGE));
} finally {
timeOutInterceptor.reset();
}
}
private static int randomTimeoutFailures() {
// At most, 4 failures so the test don't take a long time to run.
// Adds a little bit of randomness into the test.
return Math.max(1, ThreadLocalRandom.current().nextInt(MAX_RETRIES));
}
}

View file

@ -0,0 +1,134 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.testsuite.model.infinispan;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commons.TimeoutException;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.interceptors.BaseCustomAsyncInterceptor;
import static org.junit.Assert.assertTrue;
/**
* An Infinispan {@link AsyncInterceptor} that throws {@link TimeoutException} for several times before letting the
* operation continue normally.
*/
public class TimeOutInterceptor extends BaseCustomAsyncInterceptor {
public static final String MESSAGE = "Generated TimeOutException";
private final AtomicInteger putCount = new AtomicInteger(0);
private final AtomicInteger removeCount = new AtomicInteger(0);
private final AtomicInteger replaceCheckCount = new AtomicInteger(0);
private final AtomicInteger replaceCount = new AtomicInteger(0);
public static TimeOutInterceptor getOrInject(Cache<?, ?> cache) {
var interceptorChain = ComponentRegistry.componentOf(cache, AsyncInterceptorChain.class);
var existing = interceptorChain.findInterceptorWithClass(TimeOutInterceptor.class);
if (existing != null) {
return existing;
}
var interceptor = new TimeOutInterceptor();
interceptorChain.addInterceptor(interceptor, 0);
return interceptor;
}
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) {
timeOut(putCount);
return invokeNext(ctx, command);
}
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) {
timeOut(removeCount);
return invokeNext(ctx, command);
}
@Override
public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) {
if (replaceCheckCount.decrementAndGet() >= 0) {
return false;
}
timeOut(replaceCount);
return invokeNext(ctx, command);
}
private static void timeOut(AtomicInteger counter) {
if (counter.decrementAndGet() >= 0) {
throw new TimeoutException(MESSAGE);
}
}
private static void setCount(AtomicInteger counter, int value) {
assertTrue("number of failures must be positive: %s".formatted(value), value > 0);
counter.set(value);
}
private static void assertCountExhausted(AtomicInteger counter, String operation) {
var pending = counter.get();
assertTrue("Operation '%s' still has %s pending time out(s)".formatted(operation, pending), pending <= 0);
}
public void timeoutPuts(int numberOfTimeOuts) {
setCount(putCount, numberOfTimeOuts);
}
public void timeoutRemove(int numberOfTimeOuts) {
setCount(putCount, numberOfTimeOuts);
}
public void timeoutReplace(int numberOfTimeOuts) {
setCount(replaceCount, numberOfTimeOuts);
}
public void replaceReturnsFalse(int numberOfCheckFails) {
setCount(replaceCheckCount, numberOfCheckFails);
}
public void assertPutTimedOutExhausted() {
assertCountExhausted(putCount, "put");
}
public void assertRemoveTimedOutExhausted() {
assertCountExhausted(removeCount, "remove");
}
public void assertReplaceTimedOutExhausted() {
assertCountExhausted(replaceCount, "replace");
}
public void assertReplaceCheckFailExhausted() {
assertCountExhausted(replaceCheckCount, "replace (check)");
}
public void reset() {
putCount.set(0);
removeCount.set(0);
replaceCount.set(0);
replaceCheckCount.set(0);
}
}