diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/UserSessionTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/UserSessionTransaction.java
deleted file mode 100644
index 833fca778c..0000000000
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/UserSessionTransaction.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.keycloak.models.sessions.infinispan.changes.remote;
-
-import java.util.Objects;
-import java.util.UUID;
-
-import org.infinispan.commons.util.concurrent.CompletionStages;
-import org.keycloak.models.AbstractKeycloakTransaction;
-import org.keycloak.models.KeycloakTransaction;
-import org.keycloak.models.sessions.infinispan.changes.remote.updater.client.AuthenticatedClientSessionUpdater;
-import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.UserSessionUpdater;
-import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
-import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
-
-/**
- * A {@link KeycloakTransaction} implementation that wraps all the user and client session transactions.
- *
- * This implementation commits all modifications asynchronously and concurrently in both user and client sessions
- * transactions. Waits for all them to complete. This is an optimization to reduce the response time.
- */
-public class UserSessionTransaction extends AbstractKeycloakTransaction {
-
- private final RemoteChangeLogTransaction userSessions;
- private final RemoteChangeLogTransaction offlineUserSessions;
- private final RemoteChangeLogTransaction clientSessions;
- private final RemoteChangeLogTransaction offlineClientSessions;
-
- public UserSessionTransaction(RemoteChangeLogTransaction userSessions, RemoteChangeLogTransaction offlineUserSessions, RemoteChangeLogTransaction clientSessions, RemoteChangeLogTransaction offlineClientSessions) {
- this.userSessions = Objects.requireNonNull(userSessions);
- this.offlineUserSessions = Objects.requireNonNull(offlineUserSessions);
- this.clientSessions = Objects.requireNonNull(clientSessions);
- this.offlineClientSessions = Objects.requireNonNull(offlineClientSessions);
- }
-
- @Override
- public void begin() {
- super.begin();
- userSessions.begin();
- offlineUserSessions.begin();
- clientSessions.begin();
- offlineClientSessions.begin();
- }
-
- @Override
- protected void commitImpl() {
- var stage = CompletionStages.aggregateCompletionStage();
- userSessions.commitAsync(stage);
- offlineUserSessions.commitAsync(stage);
- clientSessions.commitAsync(stage);
- offlineClientSessions.commitAsync(stage);
- CompletionStages.join(stage.freeze());
- }
-
- @Override
- protected void rollbackImpl() {
- userSessions.rollback();
- offlineUserSessions.rollback();
- clientSessions.rollback();
- offlineClientSessions.rollback();
- }
-
- public RemoteChangeLogTransaction getClientSessions() {
- return clientSessions;
- }
-
- public RemoteChangeLogTransaction getOfflineClientSessions() {
- return offlineClientSessions;
- }
-
- public RemoteChangeLogTransaction getOfflineUserSessions() {
- return offlineUserSessions;
- }
-
- public RemoteChangeLogTransaction getUserSessions() {
- return userSessions;
- }
-}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/ConditionalRemover.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/ConditionalRemover.java
new file mode 100644
index 0000000000..5efa0f0597
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/ConditionalRemover.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2024 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.changes.remote.remover;
+
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
+import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater;
+import org.keycloak.models.sessions.infinispan.remote.transaction.RemoteChangeLogTransaction;
+
+/**
+ * It handles conditional remove operations.
+ *
+ * This class is preferred to remove an unknown amount of entries by its key and/or value state. The implement may use
+ * queries (delete statements) or perform a full cache scan to find the entries to remove.
+ *
+ * The method {@link #willRemove(Updater)} is invoked by {@link RemoteChangeLogTransaction} before perform any change
+ * tracked by the {@link Updater}. This is an optimization to prevent sending changes that would be removed by this
+ * {@link ConditionalRemover}.
+ *
+ * @param The key's type stored in the {@link RemoteCache}.
+ * @param The value's type stored in the {@link RemoteCache}.
+ */
+public interface ConditionalRemover {
+
+ /**
+ * @param key The entry's key to test.
+ * @param value The entry's value to test.
+ * @return {@code true} if the entry will be removed from the {@link RemoteCache}.
+ */
+ boolean willRemove(K key, V value);
+
+ /**
+ * @param updater The {@link Updater} to test.
+ * @return {@code true} if the entry tracked by the {@link Updater} will be removed from the {@link RemoteCache}.
+ */
+ default boolean willRemove(Updater updater) {
+ // The value can be null if the entry updated is marked as deleted.
+ // In that case, we don't have the value to check for the condition and will let the transaction perform the removal.
+ return updater.getValue() != null && willRemove(updater.getKey(), updater.getValue());
+ }
+
+ /**
+ * Executes the conditional removes in the {@link RemoteCache}.
+ *
+ * @param cache The {@link RemoteCache} to perform the remove operations.
+ * @param stage The {@link AggregateCompletionStage} to add any incomplete tasks.
+ */
+ void executeRemovals(RemoteCache cache, AggregateCompletionStage stage);
+
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/EmptyConditionalRemover.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/EmptyConditionalRemover.java
new file mode 100644
index 0000000000..d60ebf7afa
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/EmptyConditionalRemover.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2024 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.changes.remote.remover;
+
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
+
+/**
+ * A {@link ConditionalRemover} that does not remove anything.
+ *
+ * @param The key's type stored in the {@link RemoteCache}.
+ * @param The value's type stored in the {@link RemoteCache}.
+ */
+public class EmptyConditionalRemover implements ConditionalRemover {
+
+ private static final EmptyConditionalRemover, ?> INSTANCE = new EmptyConditionalRemover<>();
+
+ @SuppressWarnings("unchecked")
+ public static ConditionalRemover instance() {
+ return (ConditionalRemover) INSTANCE;
+ }
+
+
+ @Override
+ public boolean willRemove(K key, V value) {
+ return false;
+ }
+
+ @Override
+ public void executeRemovals(RemoteCache cache, AggregateCompletionStage stage) {
+ //no-op
+ }
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/iteration/ByRealmIdConditionalRemover.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/iteration/ByRealmIdConditionalRemover.java
new file mode 100644
index 0000000000..6838d6e16d
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/iteration/ByRealmIdConditionalRemover.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2024 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.infinispan.client.hotrod.RemoteCache;
+import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;
+import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
+
+/**
+ * A {@link ConditionalRemover} implementation to remove {@link SessionEntity} from a {@link RemoteCache} based on
+ * {@link SessionEntity#getRealmId()} value.
+ *
+ * @param The key's type stored in the {@link RemoteCache}.
+ * @param The value's type stored in the {@link RemoteCache}.
+ */
+public class ByRealmIdConditionalRemover extends IterationBasedConditionalRemover {
+
+ private final List realms;
+
+ public ByRealmIdConditionalRemover() {
+ realms = new ArrayList<>();
+ }
+
+ public void removeByRealmId(String realmId) {
+ realms.add(realmId);
+ }
+
+ @Override
+ boolean isEmpty() {
+ return realms.isEmpty();
+ }
+
+ @Override
+ public boolean willRemove(K key, V value) {
+ return realms.contains(value.getRealmId());
+ }
+
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/iteration/IterationBasedConditionalRemover.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/iteration/IterationBasedConditionalRemover.java
new file mode 100644
index 0000000000..43865c9ebb
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/remover/iteration/IterationBasedConditionalRemover.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2024 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration;
+
+import java.util.Map;
+
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.Flowable;
+import io.reactivex.rxjava3.functions.Predicate;
+import org.infinispan.client.hotrod.MetadataValue;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
+import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;
+
+/**
+ * An iteration based implementation of {@link ConditionalRemover}.
+ *
+ * This class is not performance efficient since it has to download the full {@link RemoteCache} content to perform the
+ * removal tests.
+ *
+ * @param The key's type stored in the {@link RemoteCache}.
+ * @param The value's type stored in the {@link RemoteCache}.
+ */
+abstract class IterationBasedConditionalRemover implements ConditionalRemover, Predicate>> {
+
+ @Override
+ public final void executeRemovals(RemoteCache cache, AggregateCompletionStage stage) {
+ if (isEmpty()) {
+ return;
+ }
+ var rmStage = Flowable.fromPublisher(cache.publishEntriesWithMetadata(null, 2048))
+ .filter(this)
+ .map(Map.Entry::getKey)
+ .flatMapCompletable(key -> Completable.fromCompletionStage(cache.removeAsync(key)))
+ .toCompletionStage(null);
+ stage.dependsOn(rmStage);
+ }
+
+ @Override
+ public final boolean test(Map.Entry> entry) throws Throwable {
+ return willRemove(entry.getKey(), entry.getValue().getValue());
+ }
+
+ /**
+ * @return {@code true} if this implementation won't remove anything. It avoids iterating over the
+ * {@link RemoteCache} contents.
+ */
+ abstract boolean isEmpty();
+
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/Updater.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/Updater.java
index 138c498415..e04d4e8208 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/Updater.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/Updater.java
@@ -16,10 +16,11 @@
*/
package org.keycloak.models.sessions.infinispan.changes.remote.updater;
-import org.keycloak.models.sessions.infinispan.changes.remote.RemoteChangeLogTransaction;
import java.util.function.BiFunction;
+import org.keycloak.models.sessions.infinispan.remote.transaction.RemoteChangeLogTransaction;
+
/**
* An interface used by {@link RemoteChangeLogTransaction}.
*
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/client/AuthenticatedClientSessionUpdater.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/client/AuthenticatedClientSessionUpdater.java
index 0d13d57c06..78457c34b3 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/client/AuthenticatedClientSessionUpdater.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/client/AuthenticatedClientSessionUpdater.java
@@ -31,13 +31,13 @@ import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserSessionModel;
-import org.keycloak.models.sessions.infinispan.changes.remote.RemoteChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.BaseUpdater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Expiration;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.helper.MapUpdater;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
+import org.keycloak.models.sessions.infinispan.remote.transaction.ClientSessionChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
/**
@@ -53,7 +53,7 @@ public class AuthenticatedClientSessionUpdater extends BaseUpdater clientTransaction;
+ private ClientSessionChangeLogTransaction clientTransaction;
private AuthenticatedClientSessionUpdater(UUID cacheKey, AuthenticatedClientSessionEntity cacheValue, long version, boolean offline, UpdaterState initialState) {
super(cacheKey, cacheValue, version, initialState);
@@ -200,10 +200,10 @@ public class AuthenticatedClientSessionUpdater extends BaseUpdater clientTransaction) {
+ public synchronized void initialize(UserSessionModel userSession, ClientModel client, ClientSessionChangeLogTransaction clientTransaction) {
this.userSession = Objects.requireNonNull(userSession);
this.client = Objects.requireNonNull(client);
this.clientTransaction = Objects.requireNonNull(clientTransaction);
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanAuthenticationSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanAuthenticationSessionProvider.java
index b522d5cb1d..7a98f66cce 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanAuthenticationSessionProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanAuthenticationSessionProvider.java
@@ -20,7 +20,6 @@ package org.keycloak.models.sessions.infinispan.remote;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.util.Time;
@@ -32,6 +31,7 @@ import org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionPr
import org.keycloak.models.sessions.infinispan.RootAuthenticationSessionAdapter;
import org.keycloak.models.sessions.infinispan.SessionEntityUpdater;
import org.keycloak.models.sessions.infinispan.entities.RootAuthenticationSessionEntity;
+import org.keycloak.models.sessions.infinispan.remote.transaction.AuthenticationSessionTransaction;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.models.utils.SessionExpiration;
import org.keycloak.sessions.AuthenticationSessionCompoundId;
@@ -41,14 +41,13 @@ import org.keycloak.sessions.RootAuthenticationSessionModel;
public class RemoteInfinispanAuthenticationSessionProvider implements AuthenticationSessionProvider {
private final KeycloakSession session;
- private final RemoteInfinispanKeycloakTransaction transaction;
+ private final AuthenticationSessionTransaction transaction;
private final int authSessionsLimit;
- public RemoteInfinispanAuthenticationSessionProvider(KeycloakSession session, RemoteInfinispanAuthenticationSessionProviderFactory factory) {
+ public RemoteInfinispanAuthenticationSessionProvider(KeycloakSession session, int authSessionsLimit, AuthenticationSessionTransaction transaction) {
this.session = Objects.requireNonNull(session);
- authSessionsLimit = Objects.requireNonNull(factory).getAuthSessionsLimit();
- transaction = new RemoteInfinispanKeycloakTransaction<>(factory.getCache());
- session.getTransactionManager().enlistAfterCompletion(transaction);
+ this.authSessionsLimit = authSessionsLimit;
+ this.transaction = Objects.requireNonNull(transaction);
}
@Override
@@ -95,8 +94,7 @@ public class RemoteInfinispanAuthenticationSessionProvider implements Authentica
@Override
public void onRealmRemoved(RealmModel realm) {
- // TODO [pruivo] [optimization] with protostream, use delete by query: DELETE FROM ...
- transaction.removeIf(new RealmFilter(realm.getId()));
+ transaction.removeByRealmId(realm.getId());
}
@Override
@@ -123,7 +121,7 @@ public class RemoteInfinispanAuthenticationSessionProvider implements Authentica
}
private record RootAuthenticationSessionUpdater(RealmModel realm, RootAuthenticationSessionEntity entity,
- RemoteInfinispanKeycloakTransaction transaction
+ AuthenticationSessionTransaction transaction
) implements SessionEntityUpdater {
@Override
@@ -142,12 +140,4 @@ public class RemoteInfinispanAuthenticationSessionProvider implements Authentica
transaction.remove(entity.getId());
}
}
-
- private record RealmFilter(String realmId) implements Predicate {
-
- @Override
- public boolean test(RootAuthenticationSessionEntity entity) {
- return Objects.equals(realmId, entity.getRealmId());
- }
- }
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanAuthenticationSessionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanAuthenticationSessionProviderFactory.java
index fea6890cd4..1d7a740c96 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanAuthenticationSessionProviderFactory.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanAuthenticationSessionProviderFactory.java
@@ -28,6 +28,7 @@ import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionProviderFactory;
import org.keycloak.models.sessions.infinispan.entities.RootAuthenticationSessionEntity;
+import org.keycloak.models.sessions.infinispan.remote.transaction.AuthenticationSessionTransaction;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.ProviderConfigProperty;
import org.keycloak.provider.ProviderConfigurationBuilder;
@@ -51,7 +52,7 @@ public class RemoteInfinispanAuthenticationSessionProviderFactory implements Aut
@Override
public RemoteInfinispanAuthenticationSessionProvider create(KeycloakSession session) {
- return new RemoteInfinispanAuthenticationSessionProvider(session, this);
+ return new RemoteInfinispanAuthenticationSessionProvider(session, authSessionsLimit, createAndEnlistTransaction(session));
}
@Override
@@ -92,11 +93,9 @@ public class RemoteInfinispanAuthenticationSessionProviderFactory implements Aut
return InfinispanUtils.PROVIDER_ORDER;
}
- public int getAuthSessionsLimit() {
- return authSessionsLimit;
- }
-
- public RemoteCache getCache() {
- return cache;
+ private AuthenticationSessionTransaction createAndEnlistTransaction(KeycloakSession session) {
+ var tx = new AuthenticationSessionTransaction(cache);
+ session.getTransactionManager().enlist(tx);
+ return tx;
}
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanSingleUseObjectProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanSingleUseObjectProvider.java
index 20ed5e9056..b95a51eb54 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanSingleUseObjectProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanSingleUseObjectProvider.java
@@ -17,27 +17,27 @@
package org.keycloak.models.sessions.infinispan.remote;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.jboss.logging.Logger;
-import org.keycloak.models.KeycloakSession;
import org.keycloak.models.SingleUseObjectProvider;
import org.keycloak.models.sessions.infinispan.entities.SingleUseObjectValueEntity;
-
-import java.lang.invoke.MethodHandles;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import org.keycloak.models.sessions.infinispan.remote.transaction.SingleUseObjectTransaction;
public class RemoteInfinispanSingleUseObjectProvider implements SingleUseObjectProvider {
private final static Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass());
- private final RemoteInfinispanKeycloakTransaction transaction;
+ private final SingleUseObjectTransaction transaction;
- public RemoteInfinispanSingleUseObjectProvider(KeycloakSession session, RemoteCache cache) {
- transaction = new RemoteInfinispanKeycloakTransaction<>(cache);
- session.getTransactionManager().enlistAfterCompletion(transaction);
+ public RemoteInfinispanSingleUseObjectProvider(SingleUseObjectTransaction transaction) {
+ this.transaction = Objects.requireNonNull(transaction);
}
@Override
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanSingleUseObjectProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanSingleUseObjectProviderFactory.java
index 5a250b37d6..48c5f680a6 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanSingleUseObjectProviderFactory.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanSingleUseObjectProviderFactory.java
@@ -27,6 +27,7 @@ import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.SingleUseObjectProviderFactory;
import org.keycloak.models.sessions.infinispan.entities.SingleUseObjectValueEntity;
+import org.keycloak.models.sessions.infinispan.remote.transaction.SingleUseObjectTransaction;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.ACTION_TOKEN_CACHE;
@@ -41,7 +42,7 @@ public class RemoteInfinispanSingleUseObjectProviderFactory implements SingleUse
@Override
public RemoteInfinispanSingleUseObjectProvider create(KeycloakSession session) {
assert cache != null;
- return new RemoteInfinispanSingleUseObjectProvider(session, cache);
+ return new RemoteInfinispanSingleUseObjectProvider(createAndEnlistTransaction(session));
}
@Override
@@ -74,4 +75,10 @@ public class RemoteInfinispanSingleUseObjectProviderFactory implements SingleUse
public boolean isSupported(Config.Scope config) {
return InfinispanUtils.isRemoteInfinispan();
}
+
+ private SingleUseObjectTransaction createAndEnlistTransaction(KeycloakSession session) {
+ var tx = new SingleUseObjectTransaction(cache);
+ session.getTransactionManager().enlistAfterCompletion(tx);
+ return tx;
+ }
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserLoginFailureProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserLoginFailureProvider.java
index 0e0b29c500..ade1c723e5 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserLoginFailureProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserLoginFailureProvider.java
@@ -23,10 +23,9 @@ import org.jboss.logging.Logger;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserLoginFailureModel;
import org.keycloak.models.UserLoginFailureProvider;
-import org.keycloak.models.sessions.infinispan.changes.remote.RemoteChangeLogTransaction;
-import org.keycloak.models.sessions.infinispan.changes.remote.updater.loginfailures.LoginFailuresUpdater;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
+import org.keycloak.models.sessions.infinispan.remote.transaction.LoginFailureChangeLogTransaction;
import static org.keycloak.common.util.StackUtil.getShortStackTrace;
@@ -35,13 +34,12 @@ public class RemoteUserLoginFailureProvider implements UserLoginFailureProvider
private static final Logger log = Logger.getLogger(MethodHandles.lookup().lookupClass());
- private final RemoteChangeLogTransaction transaction;
+ private final LoginFailureChangeLogTransaction transaction;
- public RemoteUserLoginFailureProvider(RemoteChangeLogTransaction transaction) {
+ public RemoteUserLoginFailureProvider(LoginFailureChangeLogTransaction transaction) {
this.transaction = Objects.requireNonNull(transaction);
}
-
@Override
public UserLoginFailureModel getUserLoginFailure(RealmModel realm, String userId) {
if (log.isTraceEnabled()) {
@@ -75,7 +73,7 @@ public class RemoteUserLoginFailureProvider implements UserLoginFailureProvider
log.tracef("removeAllUserLoginFailures(%s)%s", realm, getShortStackTrace());
}
- transaction.removeIf(entity -> Objects.equals(entity.getRealmId(), realm.getId()));
+ transaction.removeByRealmId(realm.getId());
}
@Override
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserLoginFailureProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserLoginFailureProviderFactory.java
index c8076f640c..701826cc82 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserLoginFailureProviderFactory.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserLoginFailureProviderFactory.java
@@ -16,6 +16,8 @@
*/
package org.keycloak.models.sessions.infinispan.remote;
+import java.lang.invoke.MethodHandles;
+
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache;
import org.jboss.logging.Logger;
@@ -26,15 +28,13 @@ import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.UserLoginFailureProvider;
import org.keycloak.models.UserLoginFailureProviderFactory;
import org.keycloak.models.UserModel;
-import org.keycloak.models.sessions.infinispan.changes.remote.RemoteChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.loginfailures.LoginFailuresUpdater;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
+import org.keycloak.models.sessions.infinispan.remote.transaction.LoginFailureChangeLogTransaction;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
-import java.lang.invoke.MethodHandles;
-
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.getRemoteCache;
@@ -46,9 +46,7 @@ public class RemoteUserLoginFailureProviderFactory implements UserLoginFailurePr
@Override
public RemoteUserLoginFailureProvider create(KeycloakSession session) {
- var tx = new RemoteChangeLogTransaction<>(this, cache);
- session.getTransactionManager().enlistAfterCompletion(tx);
- return new RemoteUserLoginFailureProvider(tx);
+ return new RemoteUserLoginFailureProvider(createAndEnlistTransaction(session));
}
@Override
@@ -102,4 +100,10 @@ public class RemoteUserLoginFailureProviderFactory implements UserLoginFailurePr
public LoginFailuresUpdater deleted(LoginFailureKey key) {
return LoginFailuresUpdater.delete(key);
}
+
+ private LoginFailureChangeLogTransaction createAndEnlistTransaction(KeycloakSession session) {
+ var tx = new LoginFailureChangeLogTransaction(this, cache);
+ session.getTransactionManager().enlistAfterCompletion(tx);
+ return tx;
+ }
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java
index 0f2ac1a5dd..9c7aa1e9f2 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java
@@ -52,8 +52,6 @@ import org.keycloak.models.UserSessionModel;
import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.light.LightweightUserAdapter;
import org.keycloak.models.session.UserSessionPersisterProvider;
-import org.keycloak.models.sessions.infinispan.changes.remote.RemoteChangeLogTransaction;
-import org.keycloak.models.sessions.infinispan.changes.remote.UserSessionTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.BaseUpdater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.client.AuthenticatedClientSessionUpdater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.ClientSessionMappingAdapter;
@@ -61,8 +59,10 @@ import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.Clien
import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.UserSessionUpdater;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionStore;
-import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
+import org.keycloak.models.sessions.infinispan.remote.transaction.ClientSessionChangeLogTransaction;
+import org.keycloak.models.sessions.infinispan.remote.transaction.UseSessionChangeLogTransaction;
+import org.keycloak.models.sessions.infinispan.remote.transaction.UserSessionTransaction;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.utils.StreamsUtil;
@@ -203,22 +203,14 @@ public class RemoteUserSessionProvider implements UserSessionProvider {
//rely on Infinispan expiration
}
- @SuppressWarnings("unchecked")
@Override
public void removeUserSessions(RealmModel realm) {
- Predicate extends SessionEntity> predicate = e -> Objects.equals(e.getRealmId(), realm.getId());
- transaction.getUserSessions().removeIf((Predicate) predicate);
- transaction.getClientSessions().removeIf((Predicate) predicate);
+ transaction.removeOnlineSessionsByRealmId(realm.getId());
}
- @SuppressWarnings("unchecked")
@Override
public void onRealmRemoved(RealmModel realm) {
- Predicate extends SessionEntity> predicate = e -> Objects.equals(e.getRealmId(), realm.getId());
- transaction.getUserSessions().removeIf((Predicate) predicate);
- transaction.getOfflineUserSessions().removeIf((Predicate) predicate);
- transaction.getClientSessions().removeIf((Predicate) predicate);
- transaction.getOfflineClientSessions().removeIf((Predicate) predicate);
+ transaction.removeAllSessionsByRealmId(realm.getId());
var database = session.getProvider(UserSessionPersisterProvider.class);
if (database != null) {
database.onRealmRemoved(realm);
@@ -297,11 +289,6 @@ public class RemoteUserSessionProvider implements UserSessionProvider {
return session;
}
- @Override
- public void importUserSessions(Collection persistentUserSessions, boolean offline) {
- //no-op
- }
-
@Override
public void close() {
@@ -407,11 +394,11 @@ public class RemoteUserSessionProvider implements UserSessionProvider {
.blockingStream(batchSize);
}
- private RemoteChangeLogTransaction getUserSessionTransaction(boolean offline) {
+ private UseSessionChangeLogTransaction getUserSessionTransaction(boolean offline) {
return offline ? transaction.getOfflineUserSessions() : transaction.getUserSessions();
}
- private RemoteChangeLogTransaction getClientSessionTransaction(boolean offline) {
+ private ClientSessionChangeLogTransaction getClientSessionTransaction(boolean offline) {
return offline ? transaction.getOfflineClientSessions() : transaction.getClientSessions();
}
@@ -516,10 +503,10 @@ public class RemoteUserSessionProvider implements UserSessionProvider {
private class RemoteClientSessionAdapterProvider implements ClientSessionProvider, UserSessionUpdater.ClientSessionAdapterFactory {
- private final RemoteChangeLogTransaction transaction;
+ private final ClientSessionChangeLogTransaction transaction;
private final UserSessionUpdater userSession;
- private RemoteClientSessionAdapterProvider(RemoteChangeLogTransaction transaction, UserSessionUpdater userSession) {
+ private RemoteClientSessionAdapterProvider(ClientSessionChangeLogTransaction transaction, UserSessionUpdater userSession) {
this.transaction = transaction;
this.userSession = userSession;
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProviderFactory.java
index c75ce13950..dc18abf898 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProviderFactory.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProviderFactory.java
@@ -14,12 +14,13 @@ import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.UserSessionProviderFactory;
import org.keycloak.models.session.UserSessionPersisterProvider;
-import org.keycloak.models.sessions.infinispan.changes.remote.RemoteChangeLogTransaction;
-import org.keycloak.models.sessions.infinispan.changes.remote.UserSessionTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.client.AuthenticatedClientSessionUpdater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.UserSessionUpdater;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
+import org.keycloak.models.sessions.infinispan.remote.transaction.ClientSessionChangeLogTransaction;
+import org.keycloak.models.sessions.infinispan.remote.transaction.UseSessionChangeLogTransaction;
+import org.keycloak.models.sessions.infinispan.remote.transaction.UserSessionTransaction;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.ProviderConfigProperty;
import org.keycloak.provider.ProviderConfigurationBuilder;
@@ -117,12 +118,12 @@ public class RemoteUserSessionProviderFactory implements UserSessionProviderFact
);
}
- private RemoteChangeLogTransaction createUserSessionTransaction(boolean offline) {
- return new RemoteChangeLogTransaction<>(UserSessionUpdater.factory(offline), remoteCacheHolder.userSessionCache(offline));
+ private UseSessionChangeLogTransaction createUserSessionTransaction(boolean offline) {
+ return new UseSessionChangeLogTransaction(UserSessionUpdater.factory(offline), remoteCacheHolder.userSessionCache(offline));
}
- private RemoteChangeLogTransaction createClientSessionTransaction(boolean offline) {
- return new RemoteChangeLogTransaction<>(AuthenticatedClientSessionUpdater.factory(offline), remoteCacheHolder.clientSessionCache(offline));
+ private ClientSessionChangeLogTransaction createClientSessionTransaction(boolean offline) {
+ return new ClientSessionChangeLogTransaction(AuthenticatedClientSessionUpdater.factory(offline), remoteCacheHolder.clientSessionCache(offline));
}
private record RemoteCacheHolder(
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/AuthenticationSessionTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/AuthenticationSessionTransaction.java
new file mode 100644
index 0000000000..de45a259b5
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/AuthenticationSessionTransaction.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2024 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.remote.transaction;
+
+import org.infinispan.client.hotrod.RemoteCache;
+import org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration.ByRealmIdConditionalRemover;
+import org.keycloak.models.sessions.infinispan.entities.RootAuthenticationSessionEntity;
+
+/**
+ * Syntactic sugar for
+ * {@code RemoteInfinispanKeycloakTransaction>}
+ */
+public class AuthenticationSessionTransaction extends RemoteInfinispanKeycloakTransaction> {
+
+ public AuthenticationSessionTransaction(RemoteCache cache) {
+ super(cache, new ByRealmIdConditionalRemover<>());
+ }
+
+ public void removeByRealmId(String realmId) {
+ getConditionalRemover().removeByRealmId(realmId);
+ }
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/ClientSessionChangeLogTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/ClientSessionChangeLogTransaction.java
new file mode 100644
index 0000000000..18ee33ef7b
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/ClientSessionChangeLogTransaction.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2024 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.remote.transaction;
+
+import java.util.UUID;
+
+import org.infinispan.client.hotrod.RemoteCache;
+import org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration.ByRealmIdConditionalRemover;
+import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
+import org.keycloak.models.sessions.infinispan.changes.remote.updater.client.AuthenticatedClientSessionUpdater;
+import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
+
+/**
+ * Syntactic sugar for
+ * {@code RemoteChangeLogTransaction>}
+ */
+public class ClientSessionChangeLogTransaction extends RemoteChangeLogTransaction> {
+
+ public ClientSessionChangeLogTransaction(UpdaterFactory factory, RemoteCache cache) {
+ super(factory, cache, new ByRealmIdConditionalRemover<>());
+ }
+
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/LoginFailureChangeLogTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/LoginFailureChangeLogTransaction.java
new file mode 100644
index 0000000000..716f83e995
--- /dev/null
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/LoginFailureChangeLogTransaction.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2024 Red Hat, Inc. and/or its affiliates
+ * and other contributors as indicated by the @author tags.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.keycloak.models.sessions.infinispan.remote.transaction;
+
+import org.infinispan.client.hotrod.RemoteCache;
+import org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration.ByRealmIdConditionalRemover;
+import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
+import org.keycloak.models.sessions.infinispan.changes.remote.updater.loginfailures.LoginFailuresUpdater;
+import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
+import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
+
+/**
+ * Syntactic sugar for
+ * {@code RemoteChangeLogTransaction>}
+ */
+public class LoginFailureChangeLogTransaction extends RemoteChangeLogTransaction> {
+
+ public LoginFailureChangeLogTransaction(UpdaterFactory factory, RemoteCache cache) {
+ super(factory, cache, new ByRealmIdConditionalRemover<>());
+ }
+
+ public void removeByRealmId(String realmId) {
+ getConditionalRemover().removeByRealmId(realmId);
+ }
+}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/RemoteChangeLogTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/RemoteChangeLogTransaction.java
similarity index 82%
rename from model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/RemoteChangeLogTransaction.java
rename to model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/RemoteChangeLogTransaction.java
index e83773281b..1df7151e44 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/RemoteChangeLogTransaction.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/RemoteChangeLogTransaction.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.keycloak.models.sessions.infinispan.changes.remote;
+package org.keycloak.models.sessions.infinispan.remote.transaction;
import java.util.Map;
import java.util.Objects;
@@ -22,10 +22,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-import io.reactivex.rxjava3.core.Completable;
-import io.reactivex.rxjava3.core.Flowable;
import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache;
@@ -34,29 +31,31 @@ import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.keycloak.models.AbstractKeycloakTransaction;
import org.keycloak.models.KeycloakTransaction;
+import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Expiration;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
/**
- * A {@link KeycloakTransaction} implementation that keeps track of changes made to entities stored
- * in a Infinispan cache.
+ * A {@link KeycloakTransaction} implementation that keeps track of changes made to entities stored in a Infinispan
+ * cache.
*
* @param The type of the Infinispan cache key.
* @param The type of the Infinispan cache value.
* @param The type of the {@link Updater} implementation.
*/
-public class RemoteChangeLogTransaction> extends AbstractKeycloakTransaction {
+public class RemoteChangeLogTransaction, R extends ConditionalRemover> extends AbstractKeycloakTransaction {
private final Map entityChanges;
private final UpdaterFactory factory;
private final RemoteCache cache;
- private Predicate removePredicate;
+ private final R conditionalRemover;
- public RemoteChangeLogTransaction(UpdaterFactory factory, RemoteCache cache) {
+ RemoteChangeLogTransaction(UpdaterFactory factory, RemoteCache cache, R conditionalRemover) {
this.factory = Objects.requireNonNull(factory);
this.cache = Objects.requireNonNull(cache);
+ this.conditionalRemover = Objects.requireNonNull(conditionalRemover);
entityChanges = new ConcurrentHashMap<>(8);
}
@@ -66,13 +65,11 @@ public class RemoteChangeLogTransaction> extends A
doCommit(stage);
CompletionStages.join(stage.freeze());
entityChanges.clear();
- removePredicate = null;
}
@Override
protected void rollbackImpl() {
entityChanges.clear();
- removePredicate = null;
}
public void commitAsync(AggregateCompletionStage stage) {
@@ -86,18 +83,10 @@ public class RemoteChangeLogTransaction> extends A
}
private void doCommit(AggregateCompletionStage stage) {
- if (removePredicate != null) {
- // TODO [pruivo] [optimization] with protostream, use delete by query: DELETE FROM ...
- var rmStage = Flowable.fromPublisher(cache.publishEntriesWithMetadata(null, 2048))
- .filter(e -> removePredicate.test(e.getValue().getValue()))
- .map(Map.Entry::getKey)
- .flatMapCompletable(this::removeKey)
- .toCompletionStage(null);
- stage.dependsOn(rmStage);
- }
+ conditionalRemover.executeRemovals(cache, stage);
for (var updater : entityChanges.values()) {
- if (updater.isReadOnly() || updater.isTransient() || (removePredicate != null && removePredicate.test(updater.getValue()))) {
+ if (updater.isReadOnly() || updater.isTransient() || conditionalRemover.willRemove(updater)) {
continue;
}
if (updater.isDeleted()) {
@@ -185,17 +174,8 @@ public class RemoteChangeLogTransaction> extends A
entityChanges.put(key, factory.deleted(key));
}
- /**
- * Removes all Infinispan cache values that satisfy the given predicate.
- *
- * @param predicate The {@link Predicate} which returns {@code true} for elements to be removed.
- */
- public void removeIf(Predicate predicate) {
- if (removePredicate == null) {
- removePredicate = predicate;
- return;
- }
- removePredicate = removePredicate.or(predicate);
+ R getConditionalRemover() {
+ return conditionalRemover;
}
public T wrap(Map.Entry> entry) {
@@ -231,8 +211,4 @@ public class RemoteChangeLogTransaction> extends A
return cache.computeIfPresentAsync(updater.getKey(), updater, expiration.lifespan(), TimeUnit.MILLISECONDS, expiration.maxIdle(), TimeUnit.MILLISECONDS);
}
- private Completable removeKey(K key) {
- return Completable.fromCompletionStage(cache.removeAsync(key));
- }
-
}
diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanKeycloakTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/RemoteInfinispanKeycloakTransaction.java
similarity index 79%
rename from model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanKeycloakTransaction.java
rename to model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/RemoteInfinispanKeycloakTransaction.java
index 1e32fc1734..7c74f45536 100644
--- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteInfinispanKeycloakTransaction.java
+++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/RemoteInfinispanKeycloakTransaction.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.keycloak.models.sessions.infinispan.remote;
+package org.keycloak.models.sessions.infinispan.remote.transaction;
import java.lang.invoke.MethodHandles;
import java.util.LinkedHashMap;
@@ -24,42 +24,32 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-import io.reactivex.rxjava3.core.Completable;
-import io.reactivex.rxjava3.core.Flowable;
-import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.jboss.logging.Logger;
import org.keycloak.models.AbstractKeycloakTransaction;
+import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;
-public class RemoteInfinispanKeycloakTransaction extends AbstractKeycloakTransaction {
+class RemoteInfinispanKeycloakTransaction> extends AbstractKeycloakTransaction {
private final static Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass());
private final Map> tasks = new LinkedHashMap<>();
private final RemoteCache cache;
- private Predicate removePredicate;
+ private final R conditionalRemover;
- public RemoteInfinispanKeycloakTransaction(RemoteCache cache) {
+ RemoteInfinispanKeycloakTransaction(RemoteCache cache, R conditionalRemover) {
this.cache = Objects.requireNonNull(cache);
+ this.conditionalRemover = Objects.requireNonNull(conditionalRemover);
}
@Override
protected void commitImpl() {
AggregateCompletionStage stage = CompletionStages.aggregateCompletionStage();
- if (removePredicate != null) {
- // TODO [pruivo] [optimization] with protostream, use delete by query: DELETE FROM ...
- var rmStage = Flowable.fromPublisher(cache.publishEntriesWithMetadata(null, 2048))
- .filter(this::shouldRemoveEntry)
- .map(Map.Entry::getKey)
- .flatMapCompletable(this::removeKey)
- .toCompletionStage(null);
- stage.dependsOn(rmStage);
- }
+ conditionalRemover.executeRemovals(cache, stage);
tasks.values().stream()
.filter(this::shouldCommitOperation)
.map(this::commitOperation)
@@ -127,21 +117,8 @@ public class RemoteInfinispanKeycloakTransaction extends AbstractKeycloakT
return cache;
}
- /**
- * Removes all Infinispan cache values that satisfy the given predicate.
- *
- * @param predicate The {@link Predicate} which returns {@code true} for elements to be removed.
- */
- public void removeIf(Predicate predicate) {
- if (removePredicate == null) {
- removePredicate = predicate;
- return;
- }
- removePredicate = removePredicate.or(predicate);
- }
-
- private Completable removeKey(K key) {
- return Completable.fromCompletionStage(cache.removeAsync(key));
+ R getConditionalRemover() {
+ return conditionalRemover;
}
private boolean shouldCommitOperation(Operation operation) {
@@ -149,15 +126,7 @@ public class RemoteInfinispanKeycloakTransaction extends AbstractKeycloakT
// 1. it is a removal operation (no value to test the predicate).
// 2. remove predicate is not present.
// 3. value does not match the remove predicate.
- return !operation.hasValue() ||
- removePredicate == null ||
- !removePredicate.test(operation.getValue());
- }
-
- private boolean shouldRemoveEntry(Map.Entry> entry) {
- // invoked by stream, so removePredicate is not null
- assert removePredicate != null;
- return removePredicate.test(entry.getValue().getValue());
+ return !operation.hasValue() || !conditionalRemover.willRemove(operation.getCacheKey(), operation.getValue());
}
private CompletionStage> commitOperation(Operation operation) {
@@ -195,6 +164,8 @@ public class RemoteInfinispanKeycloakTransaction extends AbstractKeycloakT
default V getValue() {
return null;
}
+
+ K getCacheKey();
}
private record PutOperation(K key, V value, long lifespan, TimeUnit timeUnit) implements Operation {
@@ -225,7 +196,10 @@ public class RemoteInfinispanKeycloakTransaction extends AbstractKeycloakT
return value;
}
-
+ @Override
+ public K getCacheKey() {
+ return key;
+ }
}
private record ReplaceOperation(K key, V value, long lifespan, TimeUnit timeUnit) implements Operation {
@@ -249,6 +223,11 @@ public class RemoteInfinispanKeycloakTransaction extends AbstractKeycloakT
public V getValue() {
return value;
}
+
+ @Override
+ public K getCacheKey() {
+ return key;
+ }
}
private record RemoveOperation(K key) implements Operation {
@@ -257,15 +236,25 @@ public class RemoteInfinispanKeycloakTransaction extends AbstractKeycloakT
public CompletionStage> execute(RemoteCache cache) {
return cache.removeAsync(key);
}
+
+ @Override
+ public K getCacheKey() {
+ return key;
+ }
}
- private static final Operation,?> TOMBSTONE = new Operation<>() {
+ private static final Operation, ?> TOMBSTONE = new Operation<>() {
@Override
public boolean canRemove() {
return true;
}
+ @Override
+ public Object getCacheKey() {
+ return null;
+ }
+
@Override
public CompletionStage> execute(RemoteCache