ConditionalRemover interface for External Infinispan feature

Add a ConditionalRemover interface to remove entries from a RemoteCache
based on the key or value fields.
The default implementation provided by this PR uses streaming/iteration
to test and remove entries

On a side change, moved all the transactions to the same package and
created one transaction class per entity/cache to simplify code and
avoid writing "RemoteChangeLogTransaction" with a long list of types.

Fixes #31046

Signed-off-by: Pedro Ruivo <pruivo@redhat.com>
This commit is contained in:
Pedro Ruivo 2024-07-05 11:22:40 +01:00 committed by Alexander Schwartz
parent 814e958e11
commit e62604b1ec
25 changed files with 637 additions and 237 deletions

View file

@ -1,76 +0,0 @@
package org.keycloak.models.sessions.infinispan.changes.remote;
import java.util.Objects;
import java.util.UUID;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.keycloak.models.AbstractKeycloakTransaction;
import org.keycloak.models.KeycloakTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.client.AuthenticatedClientSessionUpdater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.UserSessionUpdater;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
/**
* A {@link KeycloakTransaction} implementation that wraps all the user and client session transactions.
* <p>
* This implementation commits all modifications asynchronously and concurrently in both user and client sessions
* transactions. Waits for all them to complete. This is an optimization to reduce the response time.
*/
public class UserSessionTransaction extends AbstractKeycloakTransaction {
private final RemoteChangeLogTransaction<String, UserSessionEntity, UserSessionUpdater> userSessions;
private final RemoteChangeLogTransaction<String, UserSessionEntity, UserSessionUpdater> offlineUserSessions;
private final RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> clientSessions;
private final RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> offlineClientSessions;
public UserSessionTransaction(RemoteChangeLogTransaction<String, UserSessionEntity, UserSessionUpdater> userSessions, RemoteChangeLogTransaction<String, UserSessionEntity, UserSessionUpdater> offlineUserSessions, RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> clientSessions, RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> offlineClientSessions) {
this.userSessions = Objects.requireNonNull(userSessions);
this.offlineUserSessions = Objects.requireNonNull(offlineUserSessions);
this.clientSessions = Objects.requireNonNull(clientSessions);
this.offlineClientSessions = Objects.requireNonNull(offlineClientSessions);
}
@Override
public void begin() {
super.begin();
userSessions.begin();
offlineUserSessions.begin();
clientSessions.begin();
offlineClientSessions.begin();
}
@Override
protected void commitImpl() {
var stage = CompletionStages.aggregateCompletionStage();
userSessions.commitAsync(stage);
offlineUserSessions.commitAsync(stage);
clientSessions.commitAsync(stage);
offlineClientSessions.commitAsync(stage);
CompletionStages.join(stage.freeze());
}
@Override
protected void rollbackImpl() {
userSessions.rollback();
offlineUserSessions.rollback();
clientSessions.rollback();
offlineClientSessions.rollback();
}
public RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> getClientSessions() {
return clientSessions;
}
public RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> getOfflineClientSessions() {
return offlineClientSessions;
}
public RemoteChangeLogTransaction<String, UserSessionEntity, UserSessionUpdater> getOfflineUserSessions() {
return offlineUserSessions;
}
public RemoteChangeLogTransaction<String, UserSessionEntity, UserSessionUpdater> getUserSessions() {
return userSessions;
}
}

View file

@ -0,0 +1,65 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.changes.remote.remover;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater;
import org.keycloak.models.sessions.infinispan.remote.transaction.RemoteChangeLogTransaction;
/**
* It handles conditional remove operations.
* <p>
* This class is preferred to remove an unknown amount of entries by its key and/or value state. The implement may use
* queries (delete statements) or perform a full cache scan to find the entries to remove.
* <p>
* The method {@link #willRemove(Updater)} is invoked by {@link RemoteChangeLogTransaction} before perform any change
* tracked by the {@link Updater}. This is an optimization to prevent sending changes that would be removed by this
* {@link ConditionalRemover}.
*
* @param <K> The key's type stored in the {@link RemoteCache}.
* @param <V> The value's type stored in the {@link RemoteCache}.
*/
public interface ConditionalRemover<K, V> {
/**
* @param key The entry's key to test.
* @param value The entry's value to test.
* @return {@code true} if the entry will be removed from the {@link RemoteCache}.
*/
boolean willRemove(K key, V value);
/**
* @param updater The {@link Updater} to test.
* @return {@code true} if the entry tracked by the {@link Updater} will be removed from the {@link RemoteCache}.
*/
default boolean willRemove(Updater<K, V> updater) {
// The value can be null if the entry updated is marked as deleted.
// In that case, we don't have the value to check for the condition and will let the transaction perform the removal.
return updater.getValue() != null && willRemove(updater.getKey(), updater.getValue());
}
/**
* Executes the conditional removes in the {@link RemoteCache}.
*
* @param cache The {@link RemoteCache} to perform the remove operations.
* @param stage The {@link AggregateCompletionStage} to add any incomplete tasks.
*/
void executeRemovals(RemoteCache<K, V> cache, AggregateCompletionStage<Void> stage);
}

View file

@ -0,0 +1,48 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.changes.remote.remover;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
/**
* A {@link ConditionalRemover} that does not remove anything.
*
* @param <K> The key's type stored in the {@link RemoteCache}.
* @param <V> The value's type stored in the {@link RemoteCache}.
*/
public class EmptyConditionalRemover<K, V> implements ConditionalRemover<K, V> {
private static final EmptyConditionalRemover<?, ?> INSTANCE = new EmptyConditionalRemover<>();
@SuppressWarnings("unchecked")
public static <K1, V1> ConditionalRemover<K1, V1> instance() {
return (ConditionalRemover<K1, V1>) INSTANCE;
}
@Override
public boolean willRemove(K key, V value) {
return false;
}
@Override
public void executeRemovals(RemoteCache<K, V> cache, AggregateCompletionStage<Void> stage) {
//no-op
}
}

View file

@ -0,0 +1,56 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration;
import java.util.ArrayList;
import java.util.List;
import org.infinispan.client.hotrod.RemoteCache;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
/**
* A {@link ConditionalRemover} implementation to remove {@link SessionEntity} from a {@link RemoteCache} based on
* {@link SessionEntity#getRealmId()} value.
*
* @param <K> The key's type stored in the {@link RemoteCache}.
* @param <V> The value's type stored in the {@link RemoteCache}.
*/
public class ByRealmIdConditionalRemover<K, V extends SessionEntity> extends IterationBasedConditionalRemover<K, V> {
private final List<String> realms;
public ByRealmIdConditionalRemover() {
realms = new ArrayList<>();
}
public void removeByRealmId(String realmId) {
realms.add(realmId);
}
@Override
boolean isEmpty() {
return realms.isEmpty();
}
@Override
public boolean willRemove(K key, V value) {
return realms.contains(value.getRealmId());
}
}

View file

@ -0,0 +1,65 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration;
import java.util.Map;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Predicate;
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;
/**
* An iteration based implementation of {@link ConditionalRemover}.
* <p>
* This class is not performance efficient since it has to download the full {@link RemoteCache} content to perform the
* removal tests.
*
* @param <K> The key's type stored in the {@link RemoteCache}.
* @param <V> The value's type stored in the {@link RemoteCache}.
*/
abstract class IterationBasedConditionalRemover<K, V> implements ConditionalRemover<K, V>, Predicate<Map.Entry<K, MetadataValue<V>>> {
@Override
public final void executeRemovals(RemoteCache<K, V> cache, AggregateCompletionStage<Void> stage) {
if (isEmpty()) {
return;
}
var rmStage = Flowable.fromPublisher(cache.publishEntriesWithMetadata(null, 2048))
.filter(this)
.map(Map.Entry::getKey)
.flatMapCompletable(key -> Completable.fromCompletionStage(cache.removeAsync(key)))
.toCompletionStage(null);
stage.dependsOn(rmStage);
}
@Override
public final boolean test(Map.Entry<K, MetadataValue<V>> entry) throws Throwable {
return willRemove(entry.getKey(), entry.getValue().getValue());
}
/**
* @return {@code true} if this implementation won't remove anything. It avoids iterating over the
* {@link RemoteCache} contents.
*/
abstract boolean isEmpty();
}

View file

@ -16,10 +16,11 @@
*/ */
package org.keycloak.models.sessions.infinispan.changes.remote.updater; package org.keycloak.models.sessions.infinispan.changes.remote.updater;
import org.keycloak.models.sessions.infinispan.changes.remote.RemoteChangeLogTransaction;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import org.keycloak.models.sessions.infinispan.remote.transaction.RemoteChangeLogTransaction;
/** /**
* An interface used by {@link RemoteChangeLogTransaction}. * An interface used by {@link RemoteChangeLogTransaction}.
* <p> * <p>

View file

@ -31,13 +31,13 @@ import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel; import org.keycloak.models.ClientModel;
import org.keycloak.models.RealmModel; import org.keycloak.models.RealmModel;
import org.keycloak.models.UserSessionModel; import org.keycloak.models.UserSessionModel;
import org.keycloak.models.sessions.infinispan.changes.remote.RemoteChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.BaseUpdater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.BaseUpdater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Expiration; import org.keycloak.models.sessions.infinispan.changes.remote.updater.Expiration;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory; import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.helper.MapUpdater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.helper.MapUpdater;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.remote.transaction.ClientSessionChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.util.SessionTimeouts; import org.keycloak.models.sessions.infinispan.util.SessionTimeouts;
/** /**
@ -53,7 +53,7 @@ public class AuthenticatedClientSessionUpdater extends BaseUpdater<UUID, Authent
private final boolean offline; private final boolean offline;
private UserSessionModel userSession; private UserSessionModel userSession;
private ClientModel client; private ClientModel client;
private RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> clientTransaction; private ClientSessionChangeLogTransaction clientTransaction;
private AuthenticatedClientSessionUpdater(UUID cacheKey, AuthenticatedClientSessionEntity cacheValue, long version, boolean offline, UpdaterState initialState) { private AuthenticatedClientSessionUpdater(UUID cacheKey, AuthenticatedClientSessionEntity cacheValue, long version, boolean offline, UpdaterState initialState) {
super(cacheKey, cacheValue, version, initialState); super(cacheKey, cacheValue, version, initialState);
@ -200,10 +200,10 @@ public class AuthenticatedClientSessionUpdater extends BaseUpdater<UUID, Authent
* *
* @param userSession The {@link UserSessionModel} associated with this client session. * @param userSession The {@link UserSessionModel} associated with this client session.
* @param client The {@link ClientModel} associated with this client session. * @param client The {@link ClientModel} associated with this client session.
* @param clientTransaction The {@link RemoteChangeLogTransaction} to perform the changes in this class into the * @param clientTransaction The {@link ClientSessionChangeLogTransaction} to perform the changes in this class into the
* {@link RemoteCache}. * {@link RemoteCache}.
*/ */
public synchronized void initialize(UserSessionModel userSession, ClientModel client, RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> clientTransaction) { public synchronized void initialize(UserSessionModel userSession, ClientModel client, ClientSessionChangeLogTransaction clientTransaction) {
this.userSession = Objects.requireNonNull(userSession); this.userSession = Objects.requireNonNull(userSession);
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.clientTransaction = Objects.requireNonNull(clientTransaction); this.clientTransaction = Objects.requireNonNull(clientTransaction);

View file

@ -20,7 +20,6 @@ package org.keycloak.models.sessions.infinispan.remote;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.keycloak.cluster.ClusterProvider; import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.util.Time; import org.keycloak.common.util.Time;
@ -32,6 +31,7 @@ import org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionPr
import org.keycloak.models.sessions.infinispan.RootAuthenticationSessionAdapter; import org.keycloak.models.sessions.infinispan.RootAuthenticationSessionAdapter;
import org.keycloak.models.sessions.infinispan.SessionEntityUpdater; import org.keycloak.models.sessions.infinispan.SessionEntityUpdater;
import org.keycloak.models.sessions.infinispan.entities.RootAuthenticationSessionEntity; import org.keycloak.models.sessions.infinispan.entities.RootAuthenticationSessionEntity;
import org.keycloak.models.sessions.infinispan.remote.transaction.AuthenticationSessionTransaction;
import org.keycloak.models.utils.KeycloakModelUtils; import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.models.utils.SessionExpiration; import org.keycloak.models.utils.SessionExpiration;
import org.keycloak.sessions.AuthenticationSessionCompoundId; import org.keycloak.sessions.AuthenticationSessionCompoundId;
@ -41,14 +41,13 @@ import org.keycloak.sessions.RootAuthenticationSessionModel;
public class RemoteInfinispanAuthenticationSessionProvider implements AuthenticationSessionProvider { public class RemoteInfinispanAuthenticationSessionProvider implements AuthenticationSessionProvider {
private final KeycloakSession session; private final KeycloakSession session;
private final RemoteInfinispanKeycloakTransaction<String, RootAuthenticationSessionEntity> transaction; private final AuthenticationSessionTransaction transaction;
private final int authSessionsLimit; private final int authSessionsLimit;
public RemoteInfinispanAuthenticationSessionProvider(KeycloakSession session, RemoteInfinispanAuthenticationSessionProviderFactory factory) { public RemoteInfinispanAuthenticationSessionProvider(KeycloakSession session, int authSessionsLimit, AuthenticationSessionTransaction transaction) {
this.session = Objects.requireNonNull(session); this.session = Objects.requireNonNull(session);
authSessionsLimit = Objects.requireNonNull(factory).getAuthSessionsLimit(); this.authSessionsLimit = authSessionsLimit;
transaction = new RemoteInfinispanKeycloakTransaction<>(factory.getCache()); this.transaction = Objects.requireNonNull(transaction);
session.getTransactionManager().enlistAfterCompletion(transaction);
} }
@Override @Override
@ -95,8 +94,7 @@ public class RemoteInfinispanAuthenticationSessionProvider implements Authentica
@Override @Override
public void onRealmRemoved(RealmModel realm) { public void onRealmRemoved(RealmModel realm) {
// TODO [pruivo] [optimization] with protostream, use delete by query: DELETE FROM ... transaction.removeByRealmId(realm.getId());
transaction.removeIf(new RealmFilter(realm.getId()));
} }
@Override @Override
@ -123,7 +121,7 @@ public class RemoteInfinispanAuthenticationSessionProvider implements Authentica
} }
private record RootAuthenticationSessionUpdater(RealmModel realm, RootAuthenticationSessionEntity entity, private record RootAuthenticationSessionUpdater(RealmModel realm, RootAuthenticationSessionEntity entity,
RemoteInfinispanKeycloakTransaction<String, RootAuthenticationSessionEntity> transaction AuthenticationSessionTransaction transaction
) implements SessionEntityUpdater<RootAuthenticationSessionEntity> { ) implements SessionEntityUpdater<RootAuthenticationSessionEntity> {
@Override @Override
@ -142,12 +140,4 @@ public class RemoteInfinispanAuthenticationSessionProvider implements Authentica
transaction.remove(entity.getId()); transaction.remove(entity.getId());
} }
} }
private record RealmFilter(String realmId) implements Predicate<RootAuthenticationSessionEntity> {
@Override
public boolean test(RootAuthenticationSessionEntity entity) {
return Objects.equals(realmId, entity.getRealmId());
}
}
} }

View file

@ -28,6 +28,7 @@ import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory; import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionProviderFactory; import org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionProviderFactory;
import org.keycloak.models.sessions.infinispan.entities.RootAuthenticationSessionEntity; import org.keycloak.models.sessions.infinispan.entities.RootAuthenticationSessionEntity;
import org.keycloak.models.sessions.infinispan.remote.transaction.AuthenticationSessionTransaction;
import org.keycloak.provider.EnvironmentDependentProviderFactory; import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.ProviderConfigProperty; import org.keycloak.provider.ProviderConfigProperty;
import org.keycloak.provider.ProviderConfigurationBuilder; import org.keycloak.provider.ProviderConfigurationBuilder;
@ -51,7 +52,7 @@ public class RemoteInfinispanAuthenticationSessionProviderFactory implements Aut
@Override @Override
public RemoteInfinispanAuthenticationSessionProvider create(KeycloakSession session) { public RemoteInfinispanAuthenticationSessionProvider create(KeycloakSession session) {
return new RemoteInfinispanAuthenticationSessionProvider(session, this); return new RemoteInfinispanAuthenticationSessionProvider(session, authSessionsLimit, createAndEnlistTransaction(session));
} }
@Override @Override
@ -92,11 +93,9 @@ public class RemoteInfinispanAuthenticationSessionProviderFactory implements Aut
return InfinispanUtils.PROVIDER_ORDER; return InfinispanUtils.PROVIDER_ORDER;
} }
public int getAuthSessionsLimit() { private AuthenticationSessionTransaction createAndEnlistTransaction(KeycloakSession session) {
return authSessionsLimit; var tx = new AuthenticationSessionTransaction(cache);
} session.getTransactionManager().enlist(tx);
return tx;
public RemoteCache<String, RootAuthenticationSessionEntity> getCache() {
return cache;
} }
} }

View file

@ -17,27 +17,27 @@
package org.keycloak.models.sessions.infinispan.remote; package org.keycloak.models.sessions.infinispan.remote;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.infinispan.client.hotrod.Flag; import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.exceptions.HotRodClientException; import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.SingleUseObjectProvider; import org.keycloak.models.SingleUseObjectProvider;
import org.keycloak.models.sessions.infinispan.entities.SingleUseObjectValueEntity; import org.keycloak.models.sessions.infinispan.entities.SingleUseObjectValueEntity;
import org.keycloak.models.sessions.infinispan.remote.transaction.SingleUseObjectTransaction;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class RemoteInfinispanSingleUseObjectProvider implements SingleUseObjectProvider { public class RemoteInfinispanSingleUseObjectProvider implements SingleUseObjectProvider {
private final static Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass()); private final static Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass());
private final RemoteInfinispanKeycloakTransaction<String, SingleUseObjectValueEntity> transaction; private final SingleUseObjectTransaction transaction;
public RemoteInfinispanSingleUseObjectProvider(KeycloakSession session, RemoteCache<String, SingleUseObjectValueEntity> cache) { public RemoteInfinispanSingleUseObjectProvider(SingleUseObjectTransaction transaction) {
transaction = new RemoteInfinispanKeycloakTransaction<>(cache); this.transaction = Objects.requireNonNull(transaction);
session.getTransactionManager().enlistAfterCompletion(transaction);
} }
@Override @Override

View file

@ -27,6 +27,7 @@ import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory; import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.SingleUseObjectProviderFactory; import org.keycloak.models.SingleUseObjectProviderFactory;
import org.keycloak.models.sessions.infinispan.entities.SingleUseObjectValueEntity; import org.keycloak.models.sessions.infinispan.entities.SingleUseObjectValueEntity;
import org.keycloak.models.sessions.infinispan.remote.transaction.SingleUseObjectTransaction;
import org.keycloak.provider.EnvironmentDependentProviderFactory; import org.keycloak.provider.EnvironmentDependentProviderFactory;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.ACTION_TOKEN_CACHE; import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.ACTION_TOKEN_CACHE;
@ -41,7 +42,7 @@ public class RemoteInfinispanSingleUseObjectProviderFactory implements SingleUse
@Override @Override
public RemoteInfinispanSingleUseObjectProvider create(KeycloakSession session) { public RemoteInfinispanSingleUseObjectProvider create(KeycloakSession session) {
assert cache != null; assert cache != null;
return new RemoteInfinispanSingleUseObjectProvider(session, cache); return new RemoteInfinispanSingleUseObjectProvider(createAndEnlistTransaction(session));
} }
@Override @Override
@ -74,4 +75,10 @@ public class RemoteInfinispanSingleUseObjectProviderFactory implements SingleUse
public boolean isSupported(Config.Scope config) { public boolean isSupported(Config.Scope config) {
return InfinispanUtils.isRemoteInfinispan(); return InfinispanUtils.isRemoteInfinispan();
} }
private SingleUseObjectTransaction createAndEnlistTransaction(KeycloakSession session) {
var tx = new SingleUseObjectTransaction(cache);
session.getTransactionManager().enlistAfterCompletion(tx);
return tx;
}
} }

View file

@ -23,10 +23,9 @@ import org.jboss.logging.Logger;
import org.keycloak.models.RealmModel; import org.keycloak.models.RealmModel;
import org.keycloak.models.UserLoginFailureModel; import org.keycloak.models.UserLoginFailureModel;
import org.keycloak.models.UserLoginFailureProvider; import org.keycloak.models.UserLoginFailureProvider;
import org.keycloak.models.sessions.infinispan.changes.remote.RemoteChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.loginfailures.LoginFailuresUpdater;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity; import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey; import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
import org.keycloak.models.sessions.infinispan.remote.transaction.LoginFailureChangeLogTransaction;
import static org.keycloak.common.util.StackUtil.getShortStackTrace; import static org.keycloak.common.util.StackUtil.getShortStackTrace;
@ -35,13 +34,12 @@ public class RemoteUserLoginFailureProvider implements UserLoginFailureProvider
private static final Logger log = Logger.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = Logger.getLogger(MethodHandles.lookup().lookupClass());
private final RemoteChangeLogTransaction<LoginFailureKey, LoginFailureEntity, LoginFailuresUpdater> transaction; private final LoginFailureChangeLogTransaction transaction;
public RemoteUserLoginFailureProvider(RemoteChangeLogTransaction<LoginFailureKey, LoginFailureEntity, LoginFailuresUpdater> transaction) { public RemoteUserLoginFailureProvider(LoginFailureChangeLogTransaction transaction) {
this.transaction = Objects.requireNonNull(transaction); this.transaction = Objects.requireNonNull(transaction);
} }
@Override @Override
public UserLoginFailureModel getUserLoginFailure(RealmModel realm, String userId) { public UserLoginFailureModel getUserLoginFailure(RealmModel realm, String userId) {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
@ -75,7 +73,7 @@ public class RemoteUserLoginFailureProvider implements UserLoginFailureProvider
log.tracef("removeAllUserLoginFailures(%s)%s", realm, getShortStackTrace()); log.tracef("removeAllUserLoginFailures(%s)%s", realm, getShortStackTrace());
} }
transaction.removeIf(entity -> Objects.equals(entity.getRealmId(), realm.getId())); transaction.removeByRealmId(realm.getId());
} }
@Override @Override

View file

@ -16,6 +16,8 @@
*/ */
package org.keycloak.models.sessions.infinispan.remote; package org.keycloak.models.sessions.infinispan.remote;
import java.lang.invoke.MethodHandles;
import org.infinispan.client.hotrod.MetadataValue; import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCache;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
@ -26,15 +28,13 @@ import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.UserLoginFailureProvider; import org.keycloak.models.UserLoginFailureProvider;
import org.keycloak.models.UserLoginFailureProviderFactory; import org.keycloak.models.UserLoginFailureProviderFactory;
import org.keycloak.models.UserModel; import org.keycloak.models.UserModel;
import org.keycloak.models.sessions.infinispan.changes.remote.RemoteChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory; import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.loginfailures.LoginFailuresUpdater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.loginfailures.LoginFailuresUpdater;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity; import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey; import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
import org.keycloak.models.sessions.infinispan.remote.transaction.LoginFailureChangeLogTransaction;
import org.keycloak.provider.EnvironmentDependentProviderFactory; import org.keycloak.provider.EnvironmentDependentProviderFactory;
import java.lang.invoke.MethodHandles;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME; import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.getRemoteCache; import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.getRemoteCache;
@ -46,9 +46,7 @@ public class RemoteUserLoginFailureProviderFactory implements UserLoginFailurePr
@Override @Override
public RemoteUserLoginFailureProvider create(KeycloakSession session) { public RemoteUserLoginFailureProvider create(KeycloakSession session) {
var tx = new RemoteChangeLogTransaction<>(this, cache); return new RemoteUserLoginFailureProvider(createAndEnlistTransaction(session));
session.getTransactionManager().enlistAfterCompletion(tx);
return new RemoteUserLoginFailureProvider(tx);
} }
@Override @Override
@ -102,4 +100,10 @@ public class RemoteUserLoginFailureProviderFactory implements UserLoginFailurePr
public LoginFailuresUpdater deleted(LoginFailureKey key) { public LoginFailuresUpdater deleted(LoginFailureKey key) {
return LoginFailuresUpdater.delete(key); return LoginFailuresUpdater.delete(key);
} }
private LoginFailureChangeLogTransaction createAndEnlistTransaction(KeycloakSession session) {
var tx = new LoginFailureChangeLogTransaction(this, cache);
session.getTransactionManager().enlistAfterCompletion(tx);
return tx;
}
} }

View file

@ -52,8 +52,6 @@ import org.keycloak.models.UserSessionModel;
import org.keycloak.models.UserSessionProvider; import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.light.LightweightUserAdapter; import org.keycloak.models.light.LightweightUserAdapter;
import org.keycloak.models.session.UserSessionPersisterProvider; import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.models.sessions.infinispan.changes.remote.RemoteChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.UserSessionTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.BaseUpdater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.BaseUpdater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.client.AuthenticatedClientSessionUpdater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.client.AuthenticatedClientSessionUpdater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.ClientSessionMappingAdapter; import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.ClientSessionMappingAdapter;
@ -61,8 +59,10 @@ import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.Clien
import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.UserSessionUpdater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.UserSessionUpdater;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionStore; import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionStore;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.models.sessions.infinispan.remote.transaction.ClientSessionChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.remote.transaction.UseSessionChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.remote.transaction.UserSessionTransaction;
import org.keycloak.models.utils.KeycloakModelUtils; import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.utils.StreamsUtil; import org.keycloak.utils.StreamsUtil;
@ -203,22 +203,14 @@ public class RemoteUserSessionProvider implements UserSessionProvider {
//rely on Infinispan expiration //rely on Infinispan expiration
} }
@SuppressWarnings("unchecked")
@Override @Override
public void removeUserSessions(RealmModel realm) { public void removeUserSessions(RealmModel realm) {
Predicate<? extends SessionEntity> predicate = e -> Objects.equals(e.getRealmId(), realm.getId()); transaction.removeOnlineSessionsByRealmId(realm.getId());
transaction.getUserSessions().removeIf((Predicate<UserSessionEntity>) predicate);
transaction.getClientSessions().removeIf((Predicate<AuthenticatedClientSessionEntity>) predicate);
} }
@SuppressWarnings("unchecked")
@Override @Override
public void onRealmRemoved(RealmModel realm) { public void onRealmRemoved(RealmModel realm) {
Predicate<? extends SessionEntity> predicate = e -> Objects.equals(e.getRealmId(), realm.getId()); transaction.removeAllSessionsByRealmId(realm.getId());
transaction.getUserSessions().removeIf((Predicate<UserSessionEntity>) predicate);
transaction.getOfflineUserSessions().removeIf((Predicate<UserSessionEntity>) predicate);
transaction.getClientSessions().removeIf((Predicate<AuthenticatedClientSessionEntity>) predicate);
transaction.getOfflineClientSessions().removeIf((Predicate<AuthenticatedClientSessionEntity>) predicate);
var database = session.getProvider(UserSessionPersisterProvider.class); var database = session.getProvider(UserSessionPersisterProvider.class);
if (database != null) { if (database != null) {
database.onRealmRemoved(realm); database.onRealmRemoved(realm);
@ -297,11 +289,6 @@ public class RemoteUserSessionProvider implements UserSessionProvider {
return session; return session;
} }
@Override
public void importUserSessions(Collection<UserSessionModel> persistentUserSessions, boolean offline) {
//no-op
}
@Override @Override
public void close() { public void close() {
@ -407,11 +394,11 @@ public class RemoteUserSessionProvider implements UserSessionProvider {
.blockingStream(batchSize); .blockingStream(batchSize);
} }
private RemoteChangeLogTransaction<String, UserSessionEntity, UserSessionUpdater> getUserSessionTransaction(boolean offline) { private UseSessionChangeLogTransaction getUserSessionTransaction(boolean offline) {
return offline ? transaction.getOfflineUserSessions() : transaction.getUserSessions(); return offline ? transaction.getOfflineUserSessions() : transaction.getUserSessions();
} }
private RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> getClientSessionTransaction(boolean offline) { private ClientSessionChangeLogTransaction getClientSessionTransaction(boolean offline) {
return offline ? transaction.getOfflineClientSessions() : transaction.getClientSessions(); return offline ? transaction.getOfflineClientSessions() : transaction.getClientSessions();
} }
@ -516,10 +503,10 @@ public class RemoteUserSessionProvider implements UserSessionProvider {
private class RemoteClientSessionAdapterProvider implements ClientSessionProvider, UserSessionUpdater.ClientSessionAdapterFactory { private class RemoteClientSessionAdapterProvider implements ClientSessionProvider, UserSessionUpdater.ClientSessionAdapterFactory {
private final RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> transaction; private final ClientSessionChangeLogTransaction transaction;
private final UserSessionUpdater userSession; private final UserSessionUpdater userSession;
private RemoteClientSessionAdapterProvider(RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> transaction, UserSessionUpdater userSession) { private RemoteClientSessionAdapterProvider(ClientSessionChangeLogTransaction transaction, UserSessionUpdater userSession) {
this.transaction = transaction; this.transaction = transaction;
this.userSession = userSession; this.userSession = userSession;
} }

View file

@ -14,12 +14,13 @@ import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionProvider; import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.UserSessionProviderFactory; import org.keycloak.models.UserSessionProviderFactory;
import org.keycloak.models.session.UserSessionPersisterProvider; import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.models.sessions.infinispan.changes.remote.RemoteChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.UserSessionTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.client.AuthenticatedClientSessionUpdater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.client.AuthenticatedClientSessionUpdater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.UserSessionUpdater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.UserSessionUpdater;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.models.sessions.infinispan.remote.transaction.ClientSessionChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.remote.transaction.UseSessionChangeLogTransaction;
import org.keycloak.models.sessions.infinispan.remote.transaction.UserSessionTransaction;
import org.keycloak.provider.EnvironmentDependentProviderFactory; import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.ProviderConfigProperty; import org.keycloak.provider.ProviderConfigProperty;
import org.keycloak.provider.ProviderConfigurationBuilder; import org.keycloak.provider.ProviderConfigurationBuilder;
@ -117,12 +118,12 @@ public class RemoteUserSessionProviderFactory implements UserSessionProviderFact
); );
} }
private RemoteChangeLogTransaction<String, UserSessionEntity, UserSessionUpdater> createUserSessionTransaction(boolean offline) { private UseSessionChangeLogTransaction createUserSessionTransaction(boolean offline) {
return new RemoteChangeLogTransaction<>(UserSessionUpdater.factory(offline), remoteCacheHolder.userSessionCache(offline)); return new UseSessionChangeLogTransaction(UserSessionUpdater.factory(offline), remoteCacheHolder.userSessionCache(offline));
} }
private RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> createClientSessionTransaction(boolean offline) { private ClientSessionChangeLogTransaction createClientSessionTransaction(boolean offline) {
return new RemoteChangeLogTransaction<>(AuthenticatedClientSessionUpdater.factory(offline), remoteCacheHolder.clientSessionCache(offline)); return new ClientSessionChangeLogTransaction(AuthenticatedClientSessionUpdater.factory(offline), remoteCacheHolder.clientSessionCache(offline));
} }
private record RemoteCacheHolder( private record RemoteCacheHolder(

View file

@ -0,0 +1,38 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.remote.transaction;
import org.infinispan.client.hotrod.RemoteCache;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration.ByRealmIdConditionalRemover;
import org.keycloak.models.sessions.infinispan.entities.RootAuthenticationSessionEntity;
/**
* Syntactic sugar for
* {@code RemoteInfinispanKeycloakTransaction<String, RootAuthenticationSessionEntity,
* ByRealmIdConditionalRemover<String, RootAuthenticationSessionEntity>>}
*/
public class AuthenticationSessionTransaction extends RemoteInfinispanKeycloakTransaction<String, RootAuthenticationSessionEntity, ByRealmIdConditionalRemover<String, RootAuthenticationSessionEntity>> {
public AuthenticationSessionTransaction(RemoteCache<String, RootAuthenticationSessionEntity> cache) {
super(cache, new ByRealmIdConditionalRemover<>());
}
public void removeByRealmId(String realmId) {
getConditionalRemover().removeByRealmId(realmId);
}
}

View file

@ -0,0 +1,39 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.remote.transaction;
import java.util.UUID;
import org.infinispan.client.hotrod.RemoteCache;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration.ByRealmIdConditionalRemover;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.client.AuthenticatedClientSessionUpdater;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
/**
* Syntactic sugar for
* {@code RemoteChangeLogTransaction<SessionKey, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater,
* UserAndClientSessionConditionalRemover<AuthenticatedClientSessionEntity>>}
*/
public class ClientSessionChangeLogTransaction extends RemoteChangeLogTransaction<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater, ByRealmIdConditionalRemover<UUID, AuthenticatedClientSessionEntity>> {
public ClientSessionChangeLogTransaction(UpdaterFactory<UUID, AuthenticatedClientSessionEntity, AuthenticatedClientSessionUpdater> factory, RemoteCache<UUID, AuthenticatedClientSessionEntity> cache) {
super(factory, cache, new ByRealmIdConditionalRemover<>());
}
}

View file

@ -0,0 +1,41 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.remote.transaction;
import org.infinispan.client.hotrod.RemoteCache;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration.ByRealmIdConditionalRemover;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.loginfailures.LoginFailuresUpdater;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
/**
* Syntactic sugar for
* {@code RemoteChangeLogTransaction<LoginFailureKey, LoginFailureEntity, LoginFailuresUpdater,
* ByRealmIdConditionalRemover<LoginFailureKey, LoginFailureEntity>>}
*/
public class LoginFailureChangeLogTransaction extends RemoteChangeLogTransaction<LoginFailureKey, LoginFailureEntity, LoginFailuresUpdater, ByRealmIdConditionalRemover<LoginFailureKey, LoginFailureEntity>> {
public LoginFailureChangeLogTransaction(UpdaterFactory<LoginFailureKey, LoginFailureEntity, LoginFailuresUpdater> factory, RemoteCache<LoginFailureKey, LoginFailureEntity> cache) {
super(factory, cache, new ByRealmIdConditionalRemover<>());
}
public void removeByRealmId(String realmId) {
getConditionalRemover().removeByRealmId(realmId);
}
}

View file

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.keycloak.models.sessions.infinispan.changes.remote; package org.keycloak.models.sessions.infinispan.remote.transaction;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -22,10 +22,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import org.infinispan.client.hotrod.Flag; import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.MetadataValue; import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCache;
@ -34,29 +31,31 @@ import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages; import org.infinispan.commons.util.concurrent.CompletionStages;
import org.keycloak.models.AbstractKeycloakTransaction; import org.keycloak.models.AbstractKeycloakTransaction;
import org.keycloak.models.KeycloakTransaction; import org.keycloak.models.KeycloakTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Expiration; import org.keycloak.models.sessions.infinispan.changes.remote.updater.Expiration;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory; import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
/** /**
* A {@link KeycloakTransaction} implementation that keeps track of changes made to entities stored * A {@link KeycloakTransaction} implementation that keeps track of changes made to entities stored in a Infinispan
* in a Infinispan cache. * cache.
* *
* @param <K> The type of the Infinispan cache key. * @param <K> The type of the Infinispan cache key.
* @param <V> The type of the Infinispan cache value. * @param <V> The type of the Infinispan cache value.
* @param <T> The type of the {@link Updater} implementation. * @param <T> The type of the {@link Updater} implementation.
*/ */
public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>> extends AbstractKeycloakTransaction { public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>, R extends ConditionalRemover<K, V>> extends AbstractKeycloakTransaction {
private final Map<K, T> entityChanges; private final Map<K, T> entityChanges;
private final UpdaterFactory<K, V, T> factory; private final UpdaterFactory<K, V, T> factory;
private final RemoteCache<K, V> cache; private final RemoteCache<K, V> cache;
private Predicate<V> removePredicate; private final R conditionalRemover;
public RemoteChangeLogTransaction(UpdaterFactory<K, V, T> factory, RemoteCache<K, V> cache) { RemoteChangeLogTransaction(UpdaterFactory<K, V, T> factory, RemoteCache<K, V> cache, R conditionalRemover) {
this.factory = Objects.requireNonNull(factory); this.factory = Objects.requireNonNull(factory);
this.cache = Objects.requireNonNull(cache); this.cache = Objects.requireNonNull(cache);
this.conditionalRemover = Objects.requireNonNull(conditionalRemover);
entityChanges = new ConcurrentHashMap<>(8); entityChanges = new ConcurrentHashMap<>(8);
} }
@ -66,13 +65,11 @@ public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>> extends A
doCommit(stage); doCommit(stage);
CompletionStages.join(stage.freeze()); CompletionStages.join(stage.freeze());
entityChanges.clear(); entityChanges.clear();
removePredicate = null;
} }
@Override @Override
protected void rollbackImpl() { protected void rollbackImpl() {
entityChanges.clear(); entityChanges.clear();
removePredicate = null;
} }
public void commitAsync(AggregateCompletionStage<Void> stage) { public void commitAsync(AggregateCompletionStage<Void> stage) {
@ -86,18 +83,10 @@ public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>> extends A
} }
private void doCommit(AggregateCompletionStage<Void> stage) { private void doCommit(AggregateCompletionStage<Void> stage) {
if (removePredicate != null) { conditionalRemover.executeRemovals(cache, stage);
// TODO [pruivo] [optimization] with protostream, use delete by query: DELETE FROM ...
var rmStage = Flowable.fromPublisher(cache.publishEntriesWithMetadata(null, 2048))
.filter(e -> removePredicate.test(e.getValue().getValue()))
.map(Map.Entry::getKey)
.flatMapCompletable(this::removeKey)
.toCompletionStage(null);
stage.dependsOn(rmStage);
}
for (var updater : entityChanges.values()) { for (var updater : entityChanges.values()) {
if (updater.isReadOnly() || updater.isTransient() || (removePredicate != null && removePredicate.test(updater.getValue()))) { if (updater.isReadOnly() || updater.isTransient() || conditionalRemover.willRemove(updater)) {
continue; continue;
} }
if (updater.isDeleted()) { if (updater.isDeleted()) {
@ -185,17 +174,8 @@ public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>> extends A
entityChanges.put(key, factory.deleted(key)); entityChanges.put(key, factory.deleted(key));
} }
/** R getConditionalRemover() {
* Removes all Infinispan cache values that satisfy the given predicate. return conditionalRemover;
*
* @param predicate The {@link Predicate} which returns {@code true} for elements to be removed.
*/
public void removeIf(Predicate<V> predicate) {
if (removePredicate == null) {
removePredicate = predicate;
return;
}
removePredicate = removePredicate.or(predicate);
} }
public T wrap(Map.Entry<K, MetadataValue<V>> entry) { public T wrap(Map.Entry<K, MetadataValue<V>> entry) {
@ -231,8 +211,4 @@ public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>> extends A
return cache.computeIfPresentAsync(updater.getKey(), updater, expiration.lifespan(), TimeUnit.MILLISECONDS, expiration.maxIdle(), TimeUnit.MILLISECONDS); return cache.computeIfPresentAsync(updater.getKey(), updater, expiration.lifespan(), TimeUnit.MILLISECONDS, expiration.maxIdle(), TimeUnit.MILLISECONDS);
} }
private Completable removeKey(K key) {
return Completable.fromCompletionStage(cache.removeAsync(key));
}
} }

View file

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.keycloak.models.sessions.infinispan.remote; package org.keycloak.models.sessions.infinispan.remote.transaction;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
@ -24,42 +24,32 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage; import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletableFutures; import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages; import org.infinispan.commons.util.concurrent.CompletionStages;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.keycloak.models.AbstractKeycloakTransaction; import org.keycloak.models.AbstractKeycloakTransaction;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;
public class RemoteInfinispanKeycloakTransaction<K, V> extends AbstractKeycloakTransaction { class RemoteInfinispanKeycloakTransaction<K, V, R extends ConditionalRemover<K, V>> extends AbstractKeycloakTransaction {
private final static Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass()); private final static Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass());
private final Map<K, Operation<K, V>> tasks = new LinkedHashMap<>(); private final Map<K, Operation<K, V>> tasks = new LinkedHashMap<>();
private final RemoteCache<K, V> cache; private final RemoteCache<K, V> cache;
private Predicate<V> removePredicate; private final R conditionalRemover;
public RemoteInfinispanKeycloakTransaction(RemoteCache<K, V> cache) { RemoteInfinispanKeycloakTransaction(RemoteCache<K, V> cache, R conditionalRemover) {
this.cache = Objects.requireNonNull(cache); this.cache = Objects.requireNonNull(cache);
this.conditionalRemover = Objects.requireNonNull(conditionalRemover);
} }
@Override @Override
protected void commitImpl() { protected void commitImpl() {
AggregateCompletionStage<Void> stage = CompletionStages.aggregateCompletionStage(); AggregateCompletionStage<Void> stage = CompletionStages.aggregateCompletionStage();
if (removePredicate != null) { conditionalRemover.executeRemovals(cache, stage);
// TODO [pruivo] [optimization] with protostream, use delete by query: DELETE FROM ...
var rmStage = Flowable.fromPublisher(cache.publishEntriesWithMetadata(null, 2048))
.filter(this::shouldRemoveEntry)
.map(Map.Entry::getKey)
.flatMapCompletable(this::removeKey)
.toCompletionStage(null);
stage.dependsOn(rmStage);
}
tasks.values().stream() tasks.values().stream()
.filter(this::shouldCommitOperation) .filter(this::shouldCommitOperation)
.map(this::commitOperation) .map(this::commitOperation)
@ -127,21 +117,8 @@ public class RemoteInfinispanKeycloakTransaction<K, V> extends AbstractKeycloakT
return cache; return cache;
} }
/** R getConditionalRemover() {
* Removes all Infinispan cache values that satisfy the given predicate. return conditionalRemover;
*
* @param predicate The {@link Predicate} which returns {@code true} for elements to be removed.
*/
public void removeIf(Predicate<V> predicate) {
if (removePredicate == null) {
removePredicate = predicate;
return;
}
removePredicate = removePredicate.or(predicate);
}
private Completable removeKey(K key) {
return Completable.fromCompletionStage(cache.removeAsync(key));
} }
private boolean shouldCommitOperation(Operation<K, V> operation) { private boolean shouldCommitOperation(Operation<K, V> operation) {
@ -149,15 +126,7 @@ public class RemoteInfinispanKeycloakTransaction<K, V> extends AbstractKeycloakT
// 1. it is a removal operation (no value to test the predicate). // 1. it is a removal operation (no value to test the predicate).
// 2. remove predicate is not present. // 2. remove predicate is not present.
// 3. value does not match the remove predicate. // 3. value does not match the remove predicate.
return !operation.hasValue() || return !operation.hasValue() || !conditionalRemover.willRemove(operation.getCacheKey(), operation.getValue());
removePredicate == null ||
!removePredicate.test(operation.getValue());
}
private boolean shouldRemoveEntry(Map.Entry<K, MetadataValue<V>> entry) {
// invoked by stream, so removePredicate is not null
assert removePredicate != null;
return removePredicate.test(entry.getValue().getValue());
} }
private CompletionStage<?> commitOperation(Operation<K, V> operation) { private CompletionStage<?> commitOperation(Operation<K, V> operation) {
@ -195,6 +164,8 @@ public class RemoteInfinispanKeycloakTransaction<K, V> extends AbstractKeycloakT
default V getValue() { default V getValue() {
return null; return null;
} }
K getCacheKey();
} }
private record PutOperation<K, V>(K key, V value, long lifespan, TimeUnit timeUnit) implements Operation<K, V> { private record PutOperation<K, V>(K key, V value, long lifespan, TimeUnit timeUnit) implements Operation<K, V> {
@ -225,7 +196,10 @@ public class RemoteInfinispanKeycloakTransaction<K, V> extends AbstractKeycloakT
return value; return value;
} }
@Override
public K getCacheKey() {
return key;
}
} }
private record ReplaceOperation<K, V>(K key, V value, long lifespan, TimeUnit timeUnit) implements Operation<K, V> { private record ReplaceOperation<K, V>(K key, V value, long lifespan, TimeUnit timeUnit) implements Operation<K, V> {
@ -249,6 +223,11 @@ public class RemoteInfinispanKeycloakTransaction<K, V> extends AbstractKeycloakT
public V getValue() { public V getValue() {
return value; return value;
} }
@Override
public K getCacheKey() {
return key;
}
} }
private record RemoveOperation<K, V>(K key) implements Operation<K, V> { private record RemoveOperation<K, V>(K key) implements Operation<K, V> {
@ -257,15 +236,25 @@ public class RemoteInfinispanKeycloakTransaction<K, V> extends AbstractKeycloakT
public CompletionStage<?> execute(RemoteCache<K, V> cache) { public CompletionStage<?> execute(RemoteCache<K, V> cache) {
return cache.removeAsync(key); return cache.removeAsync(key);
} }
@Override
public K getCacheKey() {
return key;
}
} }
private static final Operation<?,?> TOMBSTONE = new Operation<>() { private static final Operation<?, ?> TOMBSTONE = new Operation<>() {
@Override @Override
public boolean canRemove() { public boolean canRemove() {
return true; return true;
} }
@Override
public Object getCacheKey() {
return null;
}
@Override @Override
public CompletionStage<?> execute(RemoteCache<Object, Object> cache) { public CompletionStage<?> execute(RemoteCache<Object, Object> cache) {
return CompletableFutures.completedNull(); return CompletableFutures.completedNull();

View file

@ -0,0 +1,35 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.remote.transaction;
import org.infinispan.client.hotrod.RemoteCache;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.ConditionalRemover;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.EmptyConditionalRemover;
import org.keycloak.models.sessions.infinispan.entities.SingleUseObjectValueEntity;
/**
* Syntactic sugar for
* {@code RemoteInfinispanKeycloakTransaction<String, SingleUseObjectValueEntity, ConditionalRemover<String,
* SingleUseObjectValueEntity>>}
*/
public class SingleUseObjectTransaction extends RemoteInfinispanKeycloakTransaction<String, SingleUseObjectValueEntity, ConditionalRemover<String, SingleUseObjectValueEntity>> {
public SingleUseObjectTransaction(RemoteCache<String, SingleUseObjectValueEntity> cache) {
super(cache, EmptyConditionalRemover.instance());
}
}

View file

@ -0,0 +1,37 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.remote.transaction;
import org.infinispan.client.hotrod.RemoteCache;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.iteration.ByRealmIdConditionalRemover;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.UserSessionUpdater;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
/**
* Syntactic sugar for
* {@code RemoteChangeLogTransaction<SessionKey, UserSessionEntity, UserSessionUpdater,
* UserAndClientSessionConditionalRemover<UserSessionEntity>>}
*/
public class UseSessionChangeLogTransaction extends RemoteChangeLogTransaction<String, UserSessionEntity, UserSessionUpdater, ByRealmIdConditionalRemover<String, UserSessionEntity>> {
public UseSessionChangeLogTransaction(UpdaterFactory<String, UserSessionEntity, UserSessionUpdater> factory, RemoteCache<String, UserSessionEntity> cache) {
super(factory, cache, new ByRealmIdConditionalRemover<>());
}
}

View file

@ -0,0 +1,99 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.remote.transaction;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.keycloak.models.AbstractKeycloakTransaction;
import org.keycloak.models.KeycloakTransaction;
/**
* A {@link KeycloakTransaction} implementation that wraps all the user and client session transactions.
* <p>
* This implementation commits all modifications asynchronously and concurrently in both user and client sessions
* transactions. Waits for all them to complete. This is an optimization to reduce the response time.
*/
public class UserSessionTransaction extends AbstractKeycloakTransaction {
private final UseSessionChangeLogTransaction userSessions;
private final ClientSessionChangeLogTransaction clientSessions;
private final UseSessionChangeLogTransaction offlineUserSessions;
private final ClientSessionChangeLogTransaction offlineClientSessions;
public UserSessionTransaction(UseSessionChangeLogTransaction userSessions, UseSessionChangeLogTransaction offlineUserSessions, ClientSessionChangeLogTransaction clientSessions, ClientSessionChangeLogTransaction offlineClientSessions) {
this.userSessions = userSessions;
this.offlineUserSessions = offlineUserSessions;
this.clientSessions = clientSessions;
this.offlineClientSessions = offlineClientSessions;
}
@Override
public void begin() {
super.begin();
userSessions.begin();
clientSessions.begin();
offlineUserSessions.begin();
offlineClientSessions.begin();
}
@Override
protected void commitImpl() {
var stage = CompletionStages.aggregateCompletionStage();
userSessions.commitAsync(stage);
clientSessions.commitAsync(stage);
offlineUserSessions.commitAsync(stage);
offlineClientSessions.commitAsync(stage);
CompletionStages.join(stage.freeze());
}
@Override
protected void rollbackImpl() {
userSessions.rollback();
clientSessions.rollback();
offlineUserSessions.rollback();
offlineClientSessions.rollback();
}
public ClientSessionChangeLogTransaction getClientSessions() {
return clientSessions;
}
public UseSessionChangeLogTransaction getUserSessions() {
return userSessions;
}
public ClientSessionChangeLogTransaction getOfflineClientSessions() {
return offlineClientSessions;
}
public UseSessionChangeLogTransaction getOfflineUserSessions() {
return offlineUserSessions;
}
public void removeAllSessionsByRealmId(String realmId) {
clientSessions.getConditionalRemover().removeByRealmId(realmId);
userSessions.getConditionalRemover().removeByRealmId(realmId);
offlineClientSessions.getConditionalRemover().removeByRealmId(realmId);
offlineUserSessions.getConditionalRemover().removeByRealmId(realmId);
}
public void removeOnlineSessionsByRealmId(String realmId) {
clientSessions.getConditionalRemover().removeByRealmId(realmId);
userSessions.getConditionalRemover().removeByRealmId(realmId);
}
}

View file

@ -204,7 +204,7 @@ public interface UserSessionProvider extends Provider {
* @deprecated Deprecated as offline session preloading was removed in KC25. This method will be removed in KC27. * @deprecated Deprecated as offline session preloading was removed in KC25. This method will be removed in KC27.
*/ */
@Deprecated(forRemoval = true) @Deprecated(forRemoval = true)
void importUserSessions(Collection<UserSessionModel> persistentUserSessions, boolean offline); default void importUserSessions(Collection<UserSessionModel> persistentUserSessions, boolean offline) {}
void close(); void close();

View file

@ -327,7 +327,8 @@ public class SessionTimeoutsTest extends KeycloakModelTest {
Assert.assertNull(getUserSession(session, realm, sessions[0], offline)); Assert.assertNull(getUserSession(session, realm, sessions[0], offline));
return null; return null;
}); });
processExpiration(offline); processExpiration(true);
processExpiration(false);
} finally { } finally {
setTimeOffset(0); setTimeOffset(0);
} }