diff --git a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProvider.java b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProvider.java index 1900565d5b..5634657aa1 100644 --- a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/DefaultInfinispanConnectionProvider.java @@ -19,7 +19,6 @@ package org.keycloak.connections.infinispan; import java.util.Arrays; import java.util.concurrent.CompletionStage; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import org.infinispan.Cache; @@ -71,7 +70,7 @@ public class DefaultInfinispanConnectionProvider implements InfinispanConnection } @Override - public CompletionStage migrateToProtostream() { + public CompletionStage migrateToProtoStream() { // Only the CacheStore (persistence) stores data in binary format and needs to be deleted. // We assume rolling-upgrade between KC 25 and KC 26 is not available, in other words, KC 25 and KC 26 servers are not present in the same cluster. var stage = CompletionStages.aggregateCompletionStage(); @@ -83,17 +82,17 @@ public class DefaultInfinispanConnectionProvider implements InfinispanConnection return stage.freeze(); } - @Override - public Executor getExecutor(String name) { - return GlobalComponentRegistry.componentOf(cacheManager, BlockingManager.class).asExecutor(name); - } - @Override public ScheduledExecutorService getScheduledExecutor() { //noinspection removal return GlobalComponentRegistry.of(cacheManager).getComponent(ScheduledExecutorService.class, KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR); } + @Override + public BlockingManager getBlockingManager() { + return GlobalComponentRegistry.componentOf(cacheManager, BlockingManager.class); + } + @Override public void close() { } diff --git a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/InfinispanConnectionProvider.java b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/InfinispanConnectionProvider.java index 0bda460fd2..3a0c1da83e 100755 --- a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/InfinispanConnectionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/InfinispanConnectionProvider.java @@ -25,6 +25,7 @@ import java.util.stream.Stream; import org.infinispan.Cache; import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.util.concurrent.BlockingManager; import org.keycloak.models.KeycloakSessionFactory; import org.keycloak.provider.Provider; @@ -131,11 +132,11 @@ public interface InfinispanConnectionProvider extends Provider { TopologyInfo getTopologyInfo(); /** - * Migrates the JBoss Marshalling encoding to Infinispan Protostream + * Migrates the JBoss Marshalling encoding to Infinispan ProtoStream * * @return A {@link CompletionStage} to signal when the operator is completed. */ - CompletionStage migrateToProtostream(); + CompletionStage migrateToProtoStream(); /** * Returns an executor that will run the given tasks on a blocking thread as required. @@ -146,7 +147,9 @@ public interface InfinispanConnectionProvider extends Provider { * @param name The name for trace logging purpose. * @return The Infinispan blocking {@link Executor}. */ - Executor getExecutor(String name); + default Executor getExecutor(String name) { + return getBlockingManager().asExecutor(name); + } /** * @return The Infinispan {@link ScheduledExecutorService}. Long or blocking operations must not be executed directly. @@ -164,4 +167,14 @@ public interface InfinispanConnectionProvider extends Provider { } } + /** + * Returns the Infinispan {@link BlockingManager}. + *

+ * The {@link BlockingManager} should be used to execute blocking operation like disk I/O. It offloads the task to + * the Infinispan blocking thread pool. + * + * @return The Infinispan {@link BlockingManager}. + */ + BlockingManager getBlockingManager(); + } diff --git a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/remote/RemoteInfinispanConnectionProvider.java b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/remote/RemoteInfinispanConnectionProvider.java index 9767a728b7..17ce8d4f21 100644 --- a/model/infinispan/src/main/java/org/keycloak/connections/infinispan/remote/RemoteInfinispanConnectionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/connections/infinispan/remote/RemoteInfinispanConnectionProvider.java @@ -20,7 +20,6 @@ package org.keycloak.connections.infinispan.remote; import java.util.Arrays; import java.util.Objects; import java.util.concurrent.CompletionStage; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import org.infinispan.Cache; @@ -60,7 +59,7 @@ public record RemoteInfinispanConnectionProvider(EmbeddedCacheManager embeddedCa } @Override - public CompletionStage migrateToProtostream() { + public CompletionStage migrateToProtoStream() { // Only the CacheStore (persistence) stores data in binary format and needs to be deleted. // We assume rolling-upgrade between KC 25 and KC 26 is not available, in other words, KC 25 and KC 26 servers are not present in the same cluster. var stage = CompletionStages.aggregateCompletionStage(); @@ -71,17 +70,17 @@ public record RemoteInfinispanConnectionProvider(EmbeddedCacheManager embeddedCa return stage.freeze(); } - @Override - public Executor getExecutor(String name) { - return GlobalComponentRegistry.componentOf(embeddedCacheManager, BlockingManager.class).asExecutor(name); - } - @Override public ScheduledExecutorService getScheduledExecutor() { //noinspection removal return GlobalComponentRegistry.of(embeddedCacheManager).getComponent(ScheduledExecutorService.class, KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR); } + @Override + public BlockingManager getBlockingManager() { + return GlobalComponentRegistry.componentOf(embeddedCacheManager, BlockingManager.class); + } + @Override public void close() { //no-op diff --git a/model/infinispan/src/main/java/org/keycloak/infinispan/util/InfinispanUtils.java b/model/infinispan/src/main/java/org/keycloak/infinispan/util/InfinispanUtils.java index ecfe9569d7..46e29a0269 100644 --- a/model/infinispan/src/main/java/org/keycloak/infinispan/util/InfinispanUtils.java +++ b/model/infinispan/src/main/java/org/keycloak/infinispan/util/InfinispanUtils.java @@ -17,8 +17,12 @@ package org.keycloak.infinispan.util; +import java.util.Map; + +import org.keycloak.Config; import org.keycloak.common.Profile; import org.keycloak.common.util.MultiSiteUtils; +import org.keycloak.provider.ProviderConfigurationBuilder; import static org.keycloak.common.Profile.Feature.REMOTE_CACHE; @@ -45,4 +49,48 @@ public final class InfinispanUtils { public static boolean isEmbeddedInfinispan() { return !isRemoteInfinispan(); } + + // ---- Retries on Error - Exponential Back Off ---- + + // max number of retries on error. + public static final int DEFAULT_MAX_RETRIES = 10; + private static final String CONFIG_MAX_RETRIES = "maxRetries"; + + // the base back-off time in milliseconds + public static final int DEFAULT_RETRIES_BASE_TIME_MILLIS = 10; + private static final String CONFIG_RETRIES_BASE_TIME_MILLIS = "retryBaseTime"; + + public static void configureMaxRetries(ProviderConfigurationBuilder builder) { + builder.property() + .name(CONFIG_MAX_RETRIES) + .type("int") + .helpText("The maximum number of retries if an error occurs. A value of zero or less disable any retries.") + .defaultValue(DEFAULT_MAX_RETRIES) + .add(); + } + + public static void configureRetryBaseTime(ProviderConfigurationBuilder builder) { + builder.property() + .name(CONFIG_RETRIES_BASE_TIME_MILLIS) + .type("int") + .helpText("The base back-off time in milliseconds.") + .defaultValue(DEFAULT_RETRIES_BASE_TIME_MILLIS) + .add(); + } + + public static int getMaxRetries(Config.Scope config) { + return Math.max(0, config.getInt(CONFIG_MAX_RETRIES, DEFAULT_MAX_RETRIES)); + } + + public static int getRetryBaseTimeMillis(Config.Scope config) { + return Math.max(1, config.getInt(CONFIG_RETRIES_BASE_TIME_MILLIS, DEFAULT_RETRIES_BASE_TIME_MILLIS)); + } + + public static void maxRetriesToOperationalInfo(Map map, int value) { + map.put(CONFIG_MAX_RETRIES, Integer.toString(value)); + } + + public static void retryBaseTimeMillisToOperationalInfo(Map map, int value) { + map.put(CONFIG_RETRIES_BASE_TIME_MILLIS, Integer.toString(value)); + } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java index 13334215f3..feb9298fa8 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java @@ -253,7 +253,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider, Sessi // Unable to read the cached data. if ("26.0.0".equals(modelVersion)) { log.debug("Clear caches to migrate to Infinispan Protostream"); - CompletionStages.join(session.getProvider(InfinispanConnectionProvider.class).migrateToProtostream()); + CompletionStages.join(session.getProvider(InfinispanConnectionProvider.class).migrateToProtoStream()); } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java index b4f7301093..7ba7d9ef98 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/PersistentUserSessionProvider.java @@ -976,7 +976,7 @@ public class PersistentUserSessionProvider implements UserSessionProvider, Sessi // Unable to read the cached data. if ("26.0.0".equals(modelVersion)) { log.debug("Clear caches to migrate to Infinispan Protostream"); - CompletionStages.join(session.getProvider(InfinispanConnectionProvider.class).migrateToProtostream()); + CompletionStages.join(session.getProvider(InfinispanConnectionProvider.class).migrateToProtoStream()); } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/iteration/ByRealmIdConditionalRemover.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/iteration/ByRealmIdConditionalRemover.java deleted file mode 100644 index 6838d6e16d..0000000000 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/iteration/ByRealmIdConditionalRemover.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2024 Red Hat, Inc. and/or its affiliates - * and other contributors as indicated by the @author tags. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration; - -import java.util.ArrayList; -import java.util.List; - -import org.infinispan.client.hotrod.RemoteCache; -import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover; -import org.keycloak.models.sessions.infinispan.entities.SessionEntity; - -/** - * A {@link ConditionalRemover} implementation to remove {@link SessionEntity} from a {@link RemoteCache} based on - * {@link SessionEntity#getRealmId()} value. - * - * @param The key's type stored in the {@link RemoteCache}. - * @param The value's type stored in the {@link RemoteCache}. - */ -public class ByRealmIdConditionalRemover extends IterationBasedConditionalRemover { - - private final List realms; - - public ByRealmIdConditionalRemover() { - realms = new ArrayList<>(); - } - - public void removeByRealmId(String realmId) { - realms.add(realmId); - } - - @Override - boolean isEmpty() { - return realms.isEmpty(); - } - - @Override - public boolean willRemove(K key, V value) { - return realms.contains(value.getRealmId()); - } - -} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/iteration/IterationBasedConditionalRemover.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/iteration/IterationBasedConditionalRemover.java deleted file mode 100644 index 43865c9ebb..0000000000 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/iteration/IterationBasedConditionalRemover.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2024 Red Hat, Inc. and/or its affiliates - * and other contributors as indicated by the @author tags. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration; - -import java.util.Map; - -import io.reactivex.rxjava3.core.Completable; -import io.reactivex.rxjava3.core.Flowable; -import io.reactivex.rxjava3.functions.Predicate; -import org.infinispan.client.hotrod.MetadataValue; -import org.infinispan.client.hotrod.RemoteCache; -import org.infinispan.commons.util.concurrent.AggregateCompletionStage; -import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover; - -/** - * An iteration based implementation of {@link ConditionalRemover}. - *

- * This class is not performance efficient since it has to download the full {@link RemoteCache} content to perform the - * removal tests. - * - * @param The key's type stored in the {@link RemoteCache}. - * @param The value's type stored in the {@link RemoteCache}. - */ -abstract class IterationBasedConditionalRemover implements ConditionalRemover, Predicate>> { - - @Override - public final void executeRemovals(RemoteCache cache, AggregateCompletionStage stage) { - if (isEmpty()) { - return; - } - var rmStage = Flowable.fromPublisher(cache.publishEntriesWithMetadata(null, 2048)) - .filter(this) - .map(Map.Entry::getKey) - .flatMapCompletable(key -> Completable.fromCompletionStage(cache.removeAsync(key))) - .toCompletionStage(null); - stage.dependsOn(rmStage); - } - - @Override - public final boolean test(Map.Entry> entry) throws Throwable { - return willRemove(entry.getKey(), entry.getValue().getValue()); - } - - /** - * @return {@code true} if this implementation won't remove anything. It avoids iterating over the - * {@link RemoteCache} contents. - */ - abstract boolean isEmpty(); - -} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/client/AuthenticatedClientSessionUpdater.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/client/AuthenticatedClientSessionUpdater.java index 671beb2e43..720d728687 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/client/AuthenticatedClientSessionUpdater.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/client/AuthenticatedClientSessionUpdater.java @@ -69,12 +69,19 @@ public class AuthenticatedClientSessionUpdater extends BaseUpdater factory(boolean offline) { - return offline ? OFFLINE : ONLINE; + public static UpdaterFactory onlineFactory() { + return ONLINE; + } + + /** + * @return The {@link UpdaterFactory} implementation to create offline session instances of + * {@link AuthenticatedClientSessionUpdater}. + */ + public static UpdaterFactory offlineFactory() { + return OFFLINE; } @Override diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/user/UserSessionUpdater.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/user/UserSessionUpdater.java index db3bccc5a2..71409c3b9f 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/user/UserSessionUpdater.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/user/UserSessionUpdater.java @@ -52,11 +52,19 @@ public class UserSessionUpdater extends BaseUpdater factory(boolean offline) { - return offline ? OFFLINE : ONLINE; + public static UpdaterFactory onlineFactory() { + return ONLINE; + } + + /** + * @return The {@link UpdaterFactory} implementation to create offline sessions instances of + * {@link UserSessionModel}. + */ + public static UpdaterFactory offlineFactory() { + return OFFLINE; } @Override diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserLoginFailureProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserLoginFailureProviderFactory.java index e20da5e885..bf76d4f670 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserLoginFailureProviderFactory.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserLoginFailureProviderFactory.java @@ -17,10 +17,15 @@ package org.keycloak.models.sessions.infinispan.remote; import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.util.concurrent.BlockingManager; import org.jboss.logging.Logger; import org.keycloak.Config; +import org.keycloak.connections.infinispan.InfinispanConnectionProvider; import org.keycloak.infinispan.util.InfinispanUtils; import org.keycloak.marshalling.Marshalling; import org.keycloak.models.KeycloakSession; @@ -34,17 +39,23 @@ import org.keycloak.models.sessions.infinispan.changes.remote.updater.loginfailu import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity; import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey; import org.keycloak.models.sessions.infinispan.remote.transaction.LoginFailureChangeLogTransaction; +import org.keycloak.models.sessions.infinispan.remote.transaction.RemoteChangeLogTransaction; import org.keycloak.provider.EnvironmentDependentProviderFactory; +import org.keycloak.provider.ProviderConfigProperty; +import org.keycloak.provider.ProviderConfigurationBuilder; +import org.keycloak.provider.ServerInfoAwareProviderFactory; import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME; -import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.getRemoteCache; -public class RemoteUserLoginFailureProviderFactory implements UserLoginFailureProviderFactory, UpdaterFactory, EnvironmentDependentProviderFactory { +public class RemoteUserLoginFailureProviderFactory implements UserLoginFailureProviderFactory, UpdaterFactory, EnvironmentDependentProviderFactory, RemoteChangeLogTransaction.SharedState, ServerInfoAwareProviderFactory { private static final Logger log = Logger.getLogger(MethodHandles.lookup().lookupClass()); public static final String PROTO_ENTITY = Marshalling.protoEntity(LoginFailureEntity.class); private volatile RemoteCache cache; + private volatile BlockingManager blockingManager; + private volatile int maxRetries = InfinispanUtils.DEFAULT_MAX_RETRIES; + private volatile int backOffBaseTimeMillis = InfinispanUtils.DEFAULT_RETRIES_BASE_TIME_MILLIS; @Override public RemoteUserLoginFailureProvider create(KeycloakSession session) { @@ -53,11 +64,17 @@ public class RemoteUserLoginFailureProviderFactory implements UserLoginFailurePr @Override public void init(Config.Scope config) { + maxRetries = InfinispanUtils.getMaxRetries(config); + backOffBaseTimeMillis = InfinispanUtils.getRetryBaseTimeMillis(config); } @Override public void postInit(final KeycloakSessionFactory factory) { - cache = getRemoteCache(factory, LOGIN_FAILURE_CACHE_NAME); + try (var session = factory.create()) { + var provider = session.getProvider(InfinispanConnectionProvider.class); + cache = provider.getRemoteCache(LOGIN_FAILURE_CACHE_NAME); + blockingManager = provider.getBlockingManager(); + } factory.register(event -> { if (event instanceof UserModel.UserRemovedEvent userRemovedEvent) { UserLoginFailureProvider provider = userRemovedEvent.getKeycloakSession().getProvider(UserLoginFailureProvider.class, getId()); @@ -87,6 +104,22 @@ public class RemoteUserLoginFailureProviderFactory implements UserLoginFailurePr return InfinispanUtils.isRemoteInfinispan(); } + @Override + public List getConfigMetadata() { + ProviderConfigurationBuilder builder = ProviderConfigurationBuilder.create(); + InfinispanUtils.configureMaxRetries(builder); + InfinispanUtils.configureRetryBaseTime(builder); + return builder.build(); + } + + @Override + public Map getOperationalInfo() { + Map map = new HashMap<>(); + InfinispanUtils.maxRetriesToOperationalInfo(map, maxRetries); + InfinispanUtils.retryBaseTimeMillisToOperationalInfo(map, backOffBaseTimeMillis); + return map; + } + @Override public LoginFailuresUpdater create(LoginFailureKey key, LoginFailureEntity entity) { return LoginFailuresUpdater.create(key, entity); @@ -102,8 +135,32 @@ public class RemoteUserLoginFailureProviderFactory implements UserLoginFailurePr return LoginFailuresUpdater.delete(key); } + @Override + public RemoteCache cache() { + return cache; + } + + @Override + public int maxRetries() { + return maxRetries; + } + + @Override + public int backOffBaseTimeMillis() { + return backOffBaseTimeMillis; + } + + @Override + public BlockingManager blockingManager() { + return blockingManager; + } + + public void setMaxRetries(int maxRetries) { + this.maxRetries = Math.max(0, maxRetries); + } + private LoginFailureChangeLogTransaction createAndEnlistTransaction(KeycloakSession session) { - var tx = new LoginFailureChangeLogTransaction(this, cache, new ByRealmIdQueryConditionalRemover<>(PROTO_ENTITY)); + var tx = new LoginFailureChangeLogTransaction(this, this, new ByRealmIdQueryConditionalRemover<>(PROTO_ENTITY)); session.getTransactionManager().enlistAfterCompletion(tx); return tx; } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java index 07c1953450..7d65f25d08 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java @@ -58,7 +58,7 @@ import org.keycloak.models.sessions.infinispan.query.ClientSessionQueries; import org.keycloak.models.sessions.infinispan.query.QueryHelper; import org.keycloak.models.sessions.infinispan.query.UserSessionQueries; import org.keycloak.models.sessions.infinispan.remote.transaction.ClientSessionChangeLogTransaction; -import org.keycloak.models.sessions.infinispan.remote.transaction.UseSessionChangeLogTransaction; +import org.keycloak.models.sessions.infinispan.remote.transaction.UserSessionChangeLogTransaction; import org.keycloak.models.sessions.infinispan.remote.transaction.UserSessionTransaction; import org.keycloak.models.utils.KeycloakModelUtils; import org.keycloak.utils.StreamsUtil; @@ -360,7 +360,7 @@ public class RemoteUserSessionProvider implements UserSessionProvider { transaction.removeUserSessionById(userSession.getId(), offline); } - private UseSessionChangeLogTransaction getUserSessionTransaction(boolean offline) { + private UserSessionChangeLogTransaction getUserSessionTransaction(boolean offline) { return transaction.getUserSessions(offline); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProviderFactory.java index 0458dbf517..c2de7c5f70 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProviderFactory.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProviderFactory.java @@ -1,9 +1,11 @@ package org.keycloak.models.sessions.infinispan.remote; +import java.util.HashMap; import java.util.List; -import java.util.concurrent.Executor; +import java.util.Map; import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.util.concurrent.BlockingManager; import org.keycloak.Config; import org.keycloak.common.util.MultiSiteUtils; import org.keycloak.connections.infinispan.InfinispanConnectionProvider; @@ -20,22 +22,30 @@ import org.keycloak.models.sessions.infinispan.entities.ClientSessionKey; import org.keycloak.models.sessions.infinispan.entities.RemoteAuthenticatedClientSessionEntity; import org.keycloak.models.sessions.infinispan.entities.RemoteUserSessionEntity; import org.keycloak.models.sessions.infinispan.remote.transaction.ClientSessionChangeLogTransaction; -import org.keycloak.models.sessions.infinispan.remote.transaction.UseSessionChangeLogTransaction; +import org.keycloak.models.sessions.infinispan.remote.transaction.RemoteChangeLogTransaction; +import org.keycloak.models.sessions.infinispan.remote.transaction.UserSessionChangeLogTransaction; import org.keycloak.models.sessions.infinispan.remote.transaction.UserSessionTransaction; import org.keycloak.provider.EnvironmentDependentProviderFactory; import org.keycloak.provider.ProviderConfigProperty; import org.keycloak.provider.ProviderConfigurationBuilder; import org.keycloak.provider.ProviderEvent; import org.keycloak.provider.ProviderEventListener; +import org.keycloak.provider.ServerInfoAwareProviderFactory; -public class RemoteUserSessionProviderFactory implements UserSessionProviderFactory, EnvironmentDependentProviderFactory, ProviderEventListener { +public class RemoteUserSessionProviderFactory implements UserSessionProviderFactory, EnvironmentDependentProviderFactory, ProviderEventListener, ServerInfoAwareProviderFactory { // Sessions are close to 1KB of data. Fetch 1MB per batch request (can be configured) private static final int DEFAULT_BATCH_SIZE = 1024; private static final String CONFIG_MAX_BATCH_SIZE = "batchSize"; - private volatile RemoteCacheHolder remoteCacheHolder; + private volatile SharedStateImpl userSessionState; + private volatile SharedStateImpl offlineUserSessionState; + private volatile SharedStateImpl clientSessionState; + private volatile SharedStateImpl offlineClientSessionState; + private volatile BlockingManager blockingManager; private volatile int batchSize = DEFAULT_BATCH_SIZE; + private volatile int maxRetries = InfinispanUtils.DEFAULT_MAX_RETRIES; + private volatile int backOffBaseTimeMillis = InfinispanUtils.DEFAULT_RETRIES_BASE_TIME_MILLIS; @Override public RemoteUserSessionProvider create(KeycloakSession session) { @@ -47,6 +57,8 @@ public class RemoteUserSessionProviderFactory implements UserSessionProviderFact @Override public void init(Config.Scope config) { batchSize = Math.max(1, config.getInt(CONFIG_MAX_BATCH_SIZE, DEFAULT_BATCH_SIZE)); + maxRetries = InfinispanUtils.getMaxRetries(config); + backOffBaseTimeMillis = InfinispanUtils.getRetryBaseTimeMillis(config); } @Override @@ -55,12 +67,15 @@ public class RemoteUserSessionProviderFactory implements UserSessionProviderFact lazyInit(session); } factory.register(this); - } @Override public void close() { - remoteCacheHolder = null; + blockingManager = null; + userSessionState = null; + offlineUserSessionState = null; + clientSessionState = null; + offlineClientSessionState = null; } @Override @@ -82,9 +97,22 @@ public class RemoteUserSessionProviderFactory implements UserSessionProviderFact .helpText("Batch size when streaming session from the remote cache") .defaultValue(DEFAULT_BATCH_SIZE) .add(); + + InfinispanUtils.configureMaxRetries(builder); + InfinispanUtils.configureRetryBaseTime(builder); + return builder.build(); } + @Override + public Map getOperationalInfo() { + Map map = new HashMap<>(); + map.put(CONFIG_MAX_BATCH_SIZE, Integer.toString(batchSize)); + InfinispanUtils.maxRetriesToOperationalInfo(map, maxRetries); + InfinispanUtils.retryBaseTimeMillisToOperationalInfo(map, backOffBaseTimeMillis); + return map; + } + @Override public void onEvent(ProviderEvent event) { if (event instanceof UserModel.UserRemovedEvent ure) { @@ -98,49 +126,53 @@ public class RemoteUserSessionProviderFactory implements UserSessionProviderFact } private void lazyInit(KeycloakSession session) { - if (remoteCacheHolder != null) { + if (blockingManager != null) { return; } - InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class); - RemoteCache userSessionCache = connections.getRemoteCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - RemoteCache offlineUserSessionsCache = connections.getRemoteCache(InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME); - RemoteCache clientSessionCache = connections.getRemoteCache(InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME); - RemoteCache offlineClientSessionsCache = connections.getRemoteCache(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME); - var executor = connections.getExecutor("query-delete"); - remoteCacheHolder = new RemoteCacheHolder(userSessionCache, offlineUserSessionsCache, clientSessionCache, offlineClientSessionsCache, executor); + var connections = session.getProvider(InfinispanConnectionProvider.class); + userSessionState = new SharedStateImpl<>(connections.getRemoteCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME)); + offlineUserSessionState = new SharedStateImpl<>(connections.getRemoteCache(InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME)); + clientSessionState = new SharedStateImpl<>(connections.getRemoteCache(InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME)); + offlineClientSessionState = new SharedStateImpl<>(connections.getRemoteCache(InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME)); + blockingManager = connections.getBlockingManager(); } private UserSessionTransaction createTransaction(KeycloakSession session) { lazyInit(session); return new UserSessionTransaction( - createUserSessionTransaction(false), - createUserSessionTransaction(true), - createClientSessionTransaction(false), - createClientSessionTransaction(true) + new UserSessionChangeLogTransaction(UserSessionUpdater.onlineFactory(), userSessionState), + new UserSessionChangeLogTransaction(UserSessionUpdater.offlineFactory(), offlineUserSessionState), + new ClientSessionChangeLogTransaction(AuthenticatedClientSessionUpdater.onlineFactory(), clientSessionState), + new ClientSessionChangeLogTransaction(AuthenticatedClientSessionUpdater.offlineFactory(), offlineClientSessionState) ); } - private UseSessionChangeLogTransaction createUserSessionTransaction(boolean offline) { - return new UseSessionChangeLogTransaction(UserSessionUpdater.factory(offline), remoteCacheHolder.userSessionCache(offline)); - } + private class SharedStateImpl implements RemoteChangeLogTransaction.SharedState { - private ClientSessionChangeLogTransaction createClientSessionTransaction(boolean offline) { - return new ClientSessionChangeLogTransaction(AuthenticatedClientSessionUpdater.factory(offline), remoteCacheHolder.clientSessionCache(offline)); - } + private final RemoteCache cache; - private record RemoteCacheHolder( - RemoteCache userSession, - RemoteCache offlineUserSession, - RemoteCache clientSession, - RemoteCache offlineClientSession, - Executor executor) { - - RemoteCache userSessionCache(boolean offline) { - return offline ? offlineUserSession : userSession; + private SharedStateImpl(RemoteCache cache) { + this.cache = cache; } - RemoteCache clientSessionCache(boolean offline) { - return offline ? offlineClientSession : clientSession; + @Override + public RemoteCache cache() { + return cache; + } + + @Override + public int maxRetries() { + return maxRetries; + } + + @Override + public int backOffBaseTimeMillis() { + return backOffBaseTimeMillis; + } + + @Override + public BlockingManager blockingManager() { + return blockingManager; } } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/ClientSessionChangeLogTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/ClientSessionChangeLogTransaction.java index 44398a6db3..8b30ba2777 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/ClientSessionChangeLogTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/ClientSessionChangeLogTransaction.java @@ -19,7 +19,6 @@ package org.keycloak.models.sessions.infinispan.remote.transaction; import java.util.stream.Stream; -import org.infinispan.client.hotrod.RemoteCache; import org.keycloak.models.sessions.infinispan.changes.remote.remover.query.ClientSessionQueryConditionalRemover; import org.keycloak.models.sessions.infinispan.changes.remote.updater.BaseUpdater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory; @@ -34,8 +33,8 @@ import org.keycloak.models.sessions.infinispan.entities.RemoteAuthenticatedClien */ public class ClientSessionChangeLogTransaction extends RemoteChangeLogTransaction { - public ClientSessionChangeLogTransaction(UpdaterFactory factory, RemoteCache cache) { - super(factory, cache, new ClientSessionQueryConditionalRemover()); + public ClientSessionChangeLogTransaction(UpdaterFactory factory, SharedState sharedState) { + super(factory, sharedState, new ClientSessionQueryConditionalRemover()); } /** diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/LoginFailureChangeLogTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/LoginFailureChangeLogTransaction.java index a1258217b3..85a95962d1 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/LoginFailureChangeLogTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/LoginFailureChangeLogTransaction.java @@ -17,7 +17,6 @@ package org.keycloak.models.sessions.infinispan.remote.transaction; -import org.infinispan.client.hotrod.RemoteCache; import org.keycloak.models.sessions.infinispan.changes.remote.remover.query.ByRealmIdQueryConditionalRemover; import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory; import org.keycloak.models.sessions.infinispan.changes.remote.updater.loginfailures.LoginFailuresUpdater; @@ -31,8 +30,8 @@ import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey; */ public class LoginFailureChangeLogTransaction extends RemoteChangeLogTransaction> { - public LoginFailureChangeLogTransaction(UpdaterFactory factory, RemoteCache cache, ByRealmIdQueryConditionalRemover conditionalRemover) { - super(factory, cache, conditionalRemover); + public LoginFailureChangeLogTransaction(UpdaterFactory factory, SharedState sharedState, ByRealmIdQueryConditionalRemover conditionalRemover) { + super(factory, sharedState, conditionalRemover); } public void removeByRealmId(String realmId) { diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/RemoteChangeLogTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/RemoteChangeLogTransaction.java index 631b24db37..ac38826620 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/RemoteChangeLogTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/RemoteChangeLogTransaction.java @@ -29,6 +29,8 @@ import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.commons.util.concurrent.AggregateCompletionStage; import org.infinispan.commons.util.concurrent.CompletableFutures; import org.infinispan.commons.util.concurrent.CompletionStages; +import org.infinispan.util.concurrent.BlockingManager; +import org.keycloak.common.util.Retry; import org.keycloak.models.AbstractKeycloakTransaction; import org.keycloak.models.KeycloakTransaction; import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover; @@ -46,25 +48,29 @@ import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFac */ public class RemoteChangeLogTransaction, R extends ConditionalRemover> extends AbstractKeycloakTransaction { + private static final RetryOperationSuccess TO_NULL = (ignored1, ignored2, ignored3) -> CompletableFutures.completedNull(); private final Map entityChanges; private final UpdaterFactory factory; - private final RemoteCache cache; private final R conditionalRemover; + private final SharedState sharedState; - RemoteChangeLogTransaction(UpdaterFactory factory, RemoteCache cache, R conditionalRemover) { + RemoteChangeLogTransaction(UpdaterFactory factory, SharedState sharedState, R conditionalRemover) { this.factory = Objects.requireNonNull(factory); - this.cache = Objects.requireNonNull(cache); this.conditionalRemover = Objects.requireNonNull(conditionalRemover); + this.sharedState = Objects.requireNonNull(sharedState); entityChanges = new ConcurrentHashMap<>(8); } @Override protected void commitImpl() { - var stage = CompletionStages.aggregateCompletionStage(); - doCommit(stage); - CompletionStages.join(stage.freeze()); - entityChanges.clear(); + try { + var stage = CompletionStages.aggregateCompletionStage(); + doCommit(stage); + CompletionStages.join(stage.freeze()); + } finally { + entityChanges.clear(); + } } @Override @@ -83,30 +89,30 @@ public class RemoteChangeLogTransaction, R extends } private void doCommit(AggregateCompletionStage stage) { - conditionalRemover.executeRemovals(cache, stage); + conditionalRemover.executeRemovals(getCache(), stage); for (var updater : entityChanges.values()) { if (updater.isReadOnly() || updater.isTransient() || conditionalRemover.willRemove(updater)) { continue; } if (updater.isDeleted()) { - stage.dependsOn(cache.removeAsync(updater.getKey())); + stage.dependsOn(commitRemove(updater)); continue; } var expiration = updater.computeExpiration(); if (expiration.isExpired()) { - stage.dependsOn(cache.removeAsync(updater.getKey())); + stage.dependsOn(commitRemove(updater)); continue; } if (updater.isCreated()) { - stage.dependsOn(putIfAbsent(updater, expiration)); + stage.dependsOn(commitPutIfAbsent(updater, expiration)); continue; } - stage.dependsOn(replace(updater, expiration)); + stage.dependsOn(commitReplace(updater, expiration)); } } @@ -114,7 +120,7 @@ public class RemoteChangeLogTransaction, R extends * @return The {@link RemoteCache} tracked by the transaction. */ public RemoteCache getCache() { - return cache; + return sharedState.cache(); } /** @@ -130,7 +136,7 @@ public class RemoteChangeLogTransaction, R extends if (updater != null) { return updater.isDeleted() ? null : updater; } - return onEntityFromCache(key, cache.getWithMetadata(key)); + return onEntityFromCache(key, getCache().getWithMetadata(key)); } /** @@ -144,7 +150,7 @@ public class RemoteChangeLogTransaction, R extends if (updater != null) { return updater.isDeleted() ? CompletableFutures.completedNull() : CompletableFuture.completedFuture(updater); } - return cache.getWithMetadataAsync(key).thenApply(e -> onEntityFromCache(key, e)); + return getCache().getWithMetadataAsync(key).thenApply(e -> onEntityFromCache(key, e)); } /** @@ -199,24 +205,84 @@ public class RemoteChangeLogTransaction, R extends return updater.isDeleted() ? null : updater; } - private CompletionStage putIfAbsent(Updater updater, Expiration expiration) { - return cache.withFlags(Flag.FORCE_RETURN_VALUE) + @SuppressWarnings("unchecked") + private CompletionStage commitRemove(Updater updater) { + return executeWithRetries(this::invokeCacheRemove, (RetryOperationSuccess) TO_NULL, updater, null, 0); + } + + private CompletionStage commitPutIfAbsent(Updater updater, Expiration expiration) { + return executeWithRetries(this::invokeCachePutIfAbsent, this::handleBooleanResult, updater, expiration, 0); + } + + private CompletionStage commitReplace(Updater updater, Expiration expiration) { + return executeWithRetries(this::invokeCacheReplace, this::handleBooleanResult, updater, expiration, 0); + } + + @SuppressWarnings("unchecked") + private CompletionStage handleBooleanResult(boolean success, Updater updater, Expiration expiration) { + return success ? + CompletableFutures.completedNull() : + executeWithRetries(this::invokeCacheCompute, (RetryOperationSuccess) TO_NULL, updater, expiration, 0); + } + + private CompletionStage invokeCacheRemove(Updater updater, Expiration ignored) { + return getCache().removeAsync(updater.getKey()); + } + + private CompletionStage invokeCachePutIfAbsent(Updater updater, Expiration expiration) { + return getCache().withFlags(Flag.FORCE_RETURN_VALUE) .putIfAbsentAsync(updater.getKey(), updater.getValue(), expiration.lifespan(), TimeUnit.MILLISECONDS, expiration.maxIdle(), TimeUnit.MILLISECONDS) - .thenApply(Objects::isNull) - .thenCompose(completed -> handleResponse(completed, updater, expiration)); + .thenApply(Objects::isNull); } - private CompletionStage replace(Updater updater, Expiration expiration) { - return cache.replaceWithVersionAsync(updater.getKey(), updater.getValue(), updater.getVersionRead(), expiration.lifespan(), TimeUnit.MILLISECONDS, expiration.maxIdle(), TimeUnit.MILLISECONDS) - .thenCompose(completed -> handleResponse(completed, updater, expiration)); + private CompletionStage invokeCacheReplace(Updater updater, Expiration expiration) { + return getCache().replaceWithVersionAsync(updater.getKey(), updater.getValue(), updater.getVersionRead(), expiration.lifespan(), TimeUnit.MILLISECONDS, expiration.maxIdle(), TimeUnit.MILLISECONDS); } - private CompletionStage handleResponse(boolean completed, Updater updater, Expiration expiration) { - return completed ? CompletableFutures.completedNull() : merge(updater, expiration); + private CompletionStage invokeCacheCompute(Updater updater, Expiration expiration) { + return getCache().computeIfPresentAsync(updater.getKey(), updater, expiration.lifespan(), TimeUnit.MILLISECONDS, expiration.maxIdle(), TimeUnit.MILLISECONDS); } - private CompletionStage merge(Updater updater, Expiration expiration) { - return cache.computeIfPresentAsync(updater.getKey(), updater, expiration.lifespan(), TimeUnit.MILLISECONDS, expiration.maxIdle(), TimeUnit.MILLISECONDS); + private CompletionStage executeWithRetries(RetryOperation operation, RetryOperationSuccess onSuccessAction, Updater updater, Expiration expiration, int retry) { + return operation.execute(updater, expiration) + .handle((result, throwable) -> handleOperationResult(result, throwable, operation, onSuccessAction, updater, expiration, retry)) + .thenCompose(CompletableFutures.identity()); } + private CompletionStage handleOperationResult(OR result, Throwable throwable, RetryOperation operation, RetryOperationSuccess onSuccessAction, Updater updater, Expiration expiration, int retry) { + if (throwable == null) { + return onSuccessAction.onSuccess(result, updater, expiration); + } + if (retry >= sharedState.maxRetries()) { + return CompletableFuture.failedFuture(CompletableFutures.extractException(throwable)); + } + return backOffAndExecuteWithRetries(operation, onSuccessAction, updater, expiration, retry + 1); + } + + private CompletionStage backOffAndExecuteWithRetries(RetryOperation operation, RetryOperationSuccess onSuccessAction, Updater updater, Expiration expiration, int retry) { + var delayMillis = Retry.computeBackoffInterval(sharedState.backOffBaseTimeMillis(), retry); + return sharedState.blockingManager().scheduleRunBlocking( + () -> executeWithRetries(operation, onSuccessAction, updater, expiration, retry), + delayMillis, TimeUnit.MILLISECONDS, "retry-" + updater) + .thenCompose(CompletableFutures.identity()); + } + + private interface RetryOperation { + CompletionStage execute(Updater updater, Expiration expiration); + } + + private interface RetryOperationSuccess { + CompletionStage onSuccess(R result, Updater updater, Expiration expiration); + } + + // Attempt to minimize class size. Each request creates a new instance of this class, and the shared state can be shared among those instances. + public interface SharedState { + RemoteCache cache(); + + int maxRetries(); + + int backOffBaseTimeMillis(); + + BlockingManager blockingManager(); + } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/UseSessionChangeLogTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/UserSessionChangeLogTransaction.java similarity index 79% rename from model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/UseSessionChangeLogTransaction.java rename to model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/UserSessionChangeLogTransaction.java index 89e6c1f5ca..04b9b2a456 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/UseSessionChangeLogTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/UserSessionChangeLogTransaction.java @@ -18,7 +18,6 @@ package org.keycloak.models.sessions.infinispan.remote.transaction; import io.reactivex.rxjava3.core.Maybe; -import org.infinispan.client.hotrod.RemoteCache; import org.keycloak.models.sessions.infinispan.changes.remote.remover.query.UserSessionQueryConditionalRemover; import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory; import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.UserSessionUpdater; @@ -29,10 +28,10 @@ import org.keycloak.models.sessions.infinispan.entities.RemoteUserSessionEntity; * {@code RemoteChangeLogTransaction>} */ -public class UseSessionChangeLogTransaction extends RemoteChangeLogTransaction { +public class UserSessionChangeLogTransaction extends RemoteChangeLogTransaction { - public UseSessionChangeLogTransaction(UpdaterFactory factory, RemoteCache cache) { - super(factory, cache, new UserSessionQueryConditionalRemover()); + public UserSessionChangeLogTransaction(UpdaterFactory factory, SharedState sharedState) { + super(factory, sharedState, new UserSessionQueryConditionalRemover()); } public UserSessionUpdater wrapFromProjection(Object[] projection) { diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/UserSessionTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/UserSessionTransaction.java index e12c4f4fe7..7de29ccd88 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/UserSessionTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/UserSessionTransaction.java @@ -29,12 +29,12 @@ import org.keycloak.models.KeycloakTransaction; */ public class UserSessionTransaction extends AbstractKeycloakTransaction { - private final UseSessionChangeLogTransaction userSessions; + private final UserSessionChangeLogTransaction userSessions; private final ClientSessionChangeLogTransaction clientSessions; - private final UseSessionChangeLogTransaction offlineUserSessions; + private final UserSessionChangeLogTransaction offlineUserSessions; private final ClientSessionChangeLogTransaction offlineClientSessions; - public UserSessionTransaction(UseSessionChangeLogTransaction userSessions, UseSessionChangeLogTransaction offlineUserSessions, ClientSessionChangeLogTransaction clientSessions, ClientSessionChangeLogTransaction offlineClientSessions) { + public UserSessionTransaction(UserSessionChangeLogTransaction userSessions, UserSessionChangeLogTransaction offlineUserSessions, ClientSessionChangeLogTransaction clientSessions, ClientSessionChangeLogTransaction offlineClientSessions) { this.userSessions = userSessions; this.offlineUserSessions = offlineUserSessions; this.clientSessions = clientSessions; @@ -73,7 +73,7 @@ public class UserSessionTransaction extends AbstractKeycloakTransaction { return offline ? offlineClientSessions : clientSessions; } - public UseSessionChangeLogTransaction getUserSessions(boolean offline) { + public UserSessionChangeLogTransaction getUserSessions(boolean offline) { return offline ? offlineUserSessions : userSessions; } diff --git a/testsuite/model/src/test/java/org/keycloak/testsuite/model/infinispan/RetryAndBackOffTest.java b/testsuite/model/src/test/java/org/keycloak/testsuite/model/infinispan/RetryAndBackOffTest.java new file mode 100644 index 0000000000..0cd98afd03 --- /dev/null +++ b/testsuite/model/src/test/java/org/keycloak/testsuite/model/infinispan/RetryAndBackOffTest.java @@ -0,0 +1,188 @@ +/* + * Copyright 2024 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.testsuite.model.infinispan; + +import java.util.concurrent.CompletionException; +import java.util.concurrent.ThreadLocalRandom; + +import org.infinispan.client.hotrod.exceptions.HotRodClientException; +import org.junit.Assume; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.keycloak.infinispan.util.InfinispanUtils; +import org.keycloak.models.Constants; +import org.keycloak.models.KeycloakSession; +import org.keycloak.models.RealmModel; +import org.keycloak.models.RealmProvider; +import org.keycloak.models.UserLoginFailureProvider; +import org.keycloak.models.UserProvider; +import org.keycloak.models.sessions.infinispan.remote.RemoteUserLoginFailureProviderFactory; +import org.keycloak.provider.ProviderFactory; +import org.keycloak.testsuite.model.HotRodServerRule; +import org.keycloak.testsuite.model.KeycloakModelTest; +import org.keycloak.testsuite.model.RequireProvider; + +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME; + +@RequireProvider(UserLoginFailureProvider.class) +@RequireProvider(UserProvider.class) +@RequireProvider(RealmProvider.class) +public class RetryAndBackOffTest extends KeycloakModelTest { + + private static final int MAX_RETRIES = 5; + + @ClassRule + public static final TestRule SKIPPED_PROFILES = (base, description) -> { + // we only want to test retries with the remote cache/multi-site + Assume.assumeTrue(InfinispanUtils.isRemoteInfinispan()); + return base; + }; + + private String realmId; + private String userId; + private TimeOutInterceptor timeOutInterceptor; + + @Override + public void createEnvironment(KeycloakSession session) { + RealmModel realm = createRealm(session, "retry-and-backoff-test"); + session.getContext().setRealm(realm); + realm.setDefaultRole(session.roles().addRealmRole(realm, Constants.DEFAULT_ROLES_ROLE_PREFIX + "-" + realm.getName())); + realmId = realm.getId(); + var user = session.users().addUser(realm, "retry-user"); + user.setEmail("retry-user@localhost"); + userId = user.getId(); + timeOutInterceptor = injectInterceptor(); + ProviderFactory provider = session.getKeycloakSessionFactory().getProviderFactory(UserLoginFailureProvider.class); + assertTrue(provider instanceof RemoteUserLoginFailureProviderFactory); + ((RemoteUserLoginFailureProviderFactory) provider).setMaxRetries(MAX_RETRIES); + } + + @Override + public void cleanEnvironment(KeycloakSession s) { + RealmModel realm = s.realms().getRealm(realmId); + s.getContext().setRealm(realm); + s.realms().removeRealm(realmId); + } + + private static TimeOutInterceptor injectInterceptor() { + var optRemote = getParameters(HotRodServerRule.class).findFirst(); + assertTrue(optRemote.isPresent()); + var cacheManager = optRemote.get().getHotRodCacheManager(); + return TimeOutInterceptor.getOrInject(cacheManager.getCache(LOGIN_FAILURE_CACHE_NAME)); + } + + @Test + public void testRetryWithPut() { + timeOutInterceptor.timeoutPuts(randomTimeoutFailures()); + + inComittedTransaction(session -> { + var realm = session.realms().getRealm(realmId); + var loginFailures = session.loginFailures().addUserLoginFailure(realm, userId); + loginFailures.incrementFailures(); + }); + + timeOutInterceptor.assertPutTimedOutExhausted(); + } + + @Test + public void testRetryWithReplace() { + inComittedTransaction(session -> { + var realm = session.realms().getRealm(realmId); + var loginFailures = session.loginFailures().addUserLoginFailure(realm, userId); + loginFailures.incrementFailures(); + }); + + timeOutInterceptor.timeoutReplace(randomTimeoutFailures()); + + inComittedTransaction(session -> { + var realm = session.realms().getRealm(realmId); + var loginFailures = session.loginFailures().getUserLoginFailure(realm, userId); + loginFailures.incrementFailures(); + }); + + timeOutInterceptor.assertReplaceTimedOutExhausted(); + } + + @Test + public void testRetryWithRemove() { + inComittedTransaction(session -> { + var realm = session.realms().getRealm(realmId); + var loginFailures = session.loginFailures().addUserLoginFailure(realm, userId); + loginFailures.incrementFailures(); + }); + + timeOutInterceptor.timeoutRemove(randomTimeoutFailures()); + + inComittedTransaction(session -> { + var realm = session.realms().getRealm(realmId); + session.loginFailures().removeUserLoginFailure(realm, userId); + }); + + timeOutInterceptor.assertRemoveTimedOutExhausted(); + } + + @Test + public void testRetryWithCompute() { + // compute is implemented with get() and replace() + inComittedTransaction(session -> { + var realm = session.realms().getRealm(realmId); + var loginFailures = session.loginFailures().addUserLoginFailure(realm, userId); + loginFailures.incrementFailures(); + }); + + // first replace returns false, it should switch to compute + timeOutInterceptor.replaceReturnsFalse(1); + timeOutInterceptor.timeoutReplace(randomTimeoutFailures()); + + inComittedTransaction(session -> { + var realm = session.realms().getRealm(realmId); + var loginFailures = session.loginFailures().getUserLoginFailure(realm, userId); + loginFailures.incrementFailures(); + }); + + timeOutInterceptor.assertReplaceCheckFailExhausted(); + timeOutInterceptor.assertReplaceTimedOutExhausted(); + } + + @Test + public void testExceptionThrown() { + timeOutInterceptor.timeoutPuts(MAX_RETRIES + 1); + + try { + var ce = assertThrows(CompletionException.class, + () -> inComittedTransaction(session -> { + var realm = session.realms().getRealm(realmId); + session.loginFailures().addUserLoginFailure(realm, userId); + })); + assertTrue(String.valueOf(ce.getCause()), ce.getCause() instanceof HotRodClientException); + assertTrue(ce.getCause().getLocalizedMessage(), ce.getCause().getLocalizedMessage().contains(TimeOutInterceptor.MESSAGE)); + } finally { + timeOutInterceptor.reset(); + } + } + + private static int randomTimeoutFailures() { + // At most, 4 failures so the test don't take a long time to run. + // Adds a little bit of randomness into the test. + return Math.max(1, ThreadLocalRandom.current().nextInt(MAX_RETRIES)); + } + +} diff --git a/testsuite/model/src/test/java/org/keycloak/testsuite/model/infinispan/TimeOutInterceptor.java b/testsuite/model/src/test/java/org/keycloak/testsuite/model/infinispan/TimeOutInterceptor.java new file mode 100644 index 0000000000..686293f98b --- /dev/null +++ b/testsuite/model/src/test/java/org/keycloak/testsuite/model/infinispan/TimeOutInterceptor.java @@ -0,0 +1,134 @@ +/* + * Copyright 2024 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.testsuite.model.infinispan; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.infinispan.Cache; +import org.infinispan.commands.write.PutKeyValueCommand; +import org.infinispan.commands.write.RemoveCommand; +import org.infinispan.commands.write.ReplaceCommand; +import org.infinispan.commons.TimeoutException; +import org.infinispan.context.InvocationContext; +import org.infinispan.factories.ComponentRegistry; +import org.infinispan.interceptors.AsyncInterceptor; +import org.infinispan.interceptors.AsyncInterceptorChain; +import org.infinispan.interceptors.BaseCustomAsyncInterceptor; + +import static org.junit.Assert.assertTrue; + +/** + * An Infinispan {@link AsyncInterceptor} that throws {@link TimeoutException} for several times before letting the + * operation continue normally. + */ +public class TimeOutInterceptor extends BaseCustomAsyncInterceptor { + + public static final String MESSAGE = "Generated TimeOutException"; + + private final AtomicInteger putCount = new AtomicInteger(0); + private final AtomicInteger removeCount = new AtomicInteger(0); + private final AtomicInteger replaceCheckCount = new AtomicInteger(0); + private final AtomicInteger replaceCount = new AtomicInteger(0); + + public static TimeOutInterceptor getOrInject(Cache cache) { + var interceptorChain = ComponentRegistry.componentOf(cache, AsyncInterceptorChain.class); + var existing = interceptorChain.findInterceptorWithClass(TimeOutInterceptor.class); + if (existing != null) { + return existing; + } + var interceptor = new TimeOutInterceptor(); + interceptorChain.addInterceptor(interceptor, 0); + return interceptor; + } + + @Override + public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) { + timeOut(putCount); + return invokeNext(ctx, command); + } + + @Override + public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) { + timeOut(removeCount); + return invokeNext(ctx, command); + } + + @Override + public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) { + if (replaceCheckCount.decrementAndGet() >= 0) { + return false; + } + timeOut(replaceCount); + return invokeNext(ctx, command); + } + + private static void timeOut(AtomicInteger counter) { + if (counter.decrementAndGet() >= 0) { + throw new TimeoutException(MESSAGE); + } + } + + private static void setCount(AtomicInteger counter, int value) { + assertTrue("number of failures must be positive: %s".formatted(value), value > 0); + counter.set(value); + } + + private static void assertCountExhausted(AtomicInteger counter, String operation) { + var pending = counter.get(); + assertTrue("Operation '%s' still has %s pending time out(s)".formatted(operation, pending), pending <= 0); + } + + public void timeoutPuts(int numberOfTimeOuts) { + setCount(putCount, numberOfTimeOuts); + } + + public void timeoutRemove(int numberOfTimeOuts) { + setCount(putCount, numberOfTimeOuts); + } + + public void timeoutReplace(int numberOfTimeOuts) { + setCount(replaceCount, numberOfTimeOuts); + } + + public void replaceReturnsFalse(int numberOfCheckFails) { + setCount(replaceCheckCount, numberOfCheckFails); + } + + public void assertPutTimedOutExhausted() { + assertCountExhausted(putCount, "put"); + } + + public void assertRemoveTimedOutExhausted() { + assertCountExhausted(removeCount, "remove"); + } + + public void assertReplaceTimedOutExhausted() { + assertCountExhausted(replaceCount, "replace"); + } + + public void assertReplaceCheckFailExhausted() { + assertCountExhausted(replaceCheckCount, "replace (check)"); + } + + public void reset() { + putCount.set(0); + removeCount.set(0); + replaceCount.set(0); + replaceCheckCount.set(0); + } +}