Remove version() projection from Ickle Queries

Closes #32590

Signed-off-by: Pedro Ruivo <pruivo@redhat.com>
This commit is contained in:
Pedro Ruivo 2024-08-30 18:14:02 +01:00 committed by Alexander Schwartz
parent 079242c398
commit ba861fc5d7
11 changed files with 48 additions and 39 deletions

View file

@ -31,6 +31,8 @@ import org.keycloak.models.sessions.infinispan.remote.transaction.RemoteChangeLo
*/
public interface Updater<K, V> extends BiFunction<K, V, V> {
int NO_VERSION = -1;
/**
* @return The Infinispan cache key.
*/
@ -81,4 +83,8 @@ public interface Updater<K, V> extends BiFunction<K, V, V> {
* @return The {@link Expiration} data.
*/
Expiration computeExpiration();
default boolean hasVersion() {
return getVersionRead() != NO_VERSION;
}
}

View file

@ -270,7 +270,7 @@ public class AuthenticatedClientSessionUpdater extends BaseUpdater<ClientSession
@Override
public AuthenticatedClientSessionUpdater create(ClientSessionKey key, RemoteAuthenticatedClientSessionEntity entity) {
return new AuthenticatedClientSessionUpdater(key, Objects.requireNonNull(entity), -1, offline, UpdaterState.CREATED);
return new AuthenticatedClientSessionUpdater(key, Objects.requireNonNull(entity), NO_VERSION, offline, UpdaterState.CREATED);
}
@Override
@ -280,7 +280,7 @@ public class AuthenticatedClientSessionUpdater extends BaseUpdater<ClientSession
@Override
public AuthenticatedClientSessionUpdater deleted(ClientSessionKey key) {
return new AuthenticatedClientSessionUpdater(key, null, -1, offline, UpdaterState.DELETED);
return new AuthenticatedClientSessionUpdater(key, null, NO_VERSION, offline, UpdaterState.DELETED);
}
}

View file

@ -49,7 +49,7 @@ public class LoginFailuresUpdater extends BaseUpdater<LoginFailureKey, LoginFail
}
public static LoginFailuresUpdater create(LoginFailureKey key, LoginFailureEntity entity) {
return new LoginFailuresUpdater(key, Objects.requireNonNull(entity), -1, UpdaterState.CREATED);
return new LoginFailuresUpdater(key, Objects.requireNonNull(entity), NO_VERSION, UpdaterState.CREATED);
}
public static LoginFailuresUpdater wrap(LoginFailureKey key, LoginFailureEntity value, long version) {
@ -57,7 +57,7 @@ public class LoginFailuresUpdater extends BaseUpdater<LoginFailureKey, LoginFail
}
public static LoginFailuresUpdater delete(LoginFailureKey key) {
return new LoginFailuresUpdater(key, null, -1, UpdaterState.DELETED);
return new LoginFailuresUpdater(key, null, NO_VERSION, UpdaterState.DELETED);
}
@Override

View file

@ -263,7 +263,7 @@ public class UserSessionUpdater extends BaseUpdater<String, RemoteUserSessionEnt
@Override
public UserSessionUpdater create(String key, RemoteUserSessionEntity entity) {
return new UserSessionUpdater(key, Objects.requireNonNull(entity), -1, offline, UpdaterState.CREATED);
return new UserSessionUpdater(key, Objects.requireNonNull(entity), NO_VERSION, offline, UpdaterState.CREATED);
}
@Override
@ -273,7 +273,7 @@ public class UserSessionUpdater extends BaseUpdater<String, RemoteUserSessionEnt
@Override
public UserSessionUpdater deleted(String key) {
return new UserSessionUpdater(key, null, -1, offline, UpdaterState.DELETED);
return new UserSessionUpdater(key, null, NO_VERSION, offline, UpdaterState.DELETED);
}
}
}

View file

@ -36,7 +36,7 @@ public final class ClientSessionQueries {
private static final String FETCH_USER_SESSION_ID = "SELECT e.userSessionId FROM %s as e WHERE e.realmId = :realmId && e.clientId = :clientId ORDER BY e.userSessionId".formatted(CLIENT_SESSION);
private static final String PER_CLIENT_COUNT = "SELECT e.clientId, count(e.clientId) FROM %s as e GROUP BY e.clientId ORDER BY e.clientId".formatted(CLIENT_SESSION);
private static final String CLIENT_SESSION_COUNT = "SELECT count(e) FROM %s as e WHERE e.realmId = :realmId && e.clientId = :clientId".formatted(CLIENT_SESSION);
private static final String FROM_USER_SESSION = "SELECT e, version(e) FROM %s as e WHERE e.userSessionId = :userSessionId ORDER BY e.clientId".formatted(CLIENT_SESSION);
private static final String FROM_USER_SESSION = "FROM %s as e WHERE e.userSessionId = :userSessionId ORDER BY e.clientId".formatted(CLIENT_SESSION);
/**
* Returns a projection with the user session ID for client sessions from the client {@code clientId}.
@ -67,8 +67,8 @@ public final class ClientSessionQueries {
* Returns a projection with the client session, and the version of all client sessions belonging to the user
* session ID.
*/
public static Query<Object[]> fetchClientSessions(RemoteCache<ClientSessionKey, RemoteAuthenticatedClientSessionEntity> cache, String userSessionId) {
return cache.<Object[]>query(FROM_USER_SESSION)
public static Query<RemoteAuthenticatedClientSessionEntity> fetchClientSessions(RemoteCache<ClientSessionKey, RemoteAuthenticatedClientSessionEntity> cache, String userSessionId) {
return cache.<RemoteAuthenticatedClientSessionEntity>query(FROM_USER_SESSION)
.setParameter("userSessionId", userSessionId);
}

View file

@ -32,7 +32,7 @@ public final class UserSessionQueries {
public static final String USER_SESSION = Marshalling.protoEntity(RemoteUserSessionEntity.class);
private static final String BASE_QUERY = "SELECT e, version(e) FROM %s as e ".formatted(USER_SESSION);
private static final String BASE_QUERY = "FROM %s as e ".formatted(USER_SESSION);
private static final String BY_BROKER_SESSION_ID = BASE_QUERY + "WHERE e.realmId = :realmId && e.brokerSessionId = :brokerSessionId ORDER BY e.userSessionId";
private static final String BY_USER_ID = BASE_QUERY + "WHERE e.realmId = :realmId && e.userId = :userId ORDER BY e.userSessionId";
private static final String BY_BROKER_USER_ID = BASE_QUERY + "WHERE e.realmId = :realmId && e.brokerUserId = :brokerUserId ORDER BY e.userSessionId";
@ -41,8 +41,8 @@ public final class UserSessionQueries {
* Returns a projection with the user session, and the version of all user sessions belonging to the broker session
* ID.
*/
public static Query<Object[]> searchByBrokerSessionId(RemoteCache<String, RemoteUserSessionEntity> cache, String realmId, String brokerSessionId) {
return cache.<Object[]>query(BY_BROKER_SESSION_ID)
public static Query<RemoteUserSessionEntity> searchByBrokerSessionId(RemoteCache<String, RemoteUserSessionEntity> cache, String realmId, String brokerSessionId) {
return cache.<RemoteUserSessionEntity>query(BY_BROKER_SESSION_ID)
.setParameter("realmId", realmId)
.setParameter("brokerSessionId", brokerSessionId);
}
@ -50,8 +50,8 @@ public final class UserSessionQueries {
/**
* Returns a projection with the user session, and the version of all user sessions belonging to the user ID.
*/
public static Query<Object[]> searchByUserId(RemoteCache<String, RemoteUserSessionEntity> cache, String realmId, String userId) {
return cache.<Object[]>query(BY_USER_ID)
public static Query<RemoteUserSessionEntity> searchByUserId(RemoteCache<String, RemoteUserSessionEntity> cache, String realmId, String userId) {
return cache.<RemoteUserSessionEntity>query(BY_USER_ID)
.setParameter("realmId", realmId)
.setParameter("userId", userId);
}
@ -60,8 +60,8 @@ public final class UserSessionQueries {
* Returns a projection with the user session, and the version of all user sessions belonging to the broker user
* ID.
*/
public static Query<Object[]> searchByBrokerUserId(RemoteCache<String, RemoteUserSessionEntity> cache, String realmId, String brokerUserId) {
return cache.<Object[]>query(BY_BROKER_USER_ID)
public static Query<RemoteUserSessionEntity> searchByBrokerUserId(RemoteCache<String, RemoteUserSessionEntity> cache, String realmId, String brokerUserId) {
return cache.<RemoteUserSessionEntity>query(BY_BROKER_USER_ID)
.setParameter("realmId", realmId)
.setParameter("brokerUserId", brokerUserId);
}

View file

@ -471,7 +471,7 @@ public class RemoteUserSessionProvider implements UserSessionProvider {
return updater;
}
private class ClientSessionMapping extends AbstractMap<String, AuthenticatedClientSessionModel> implements Consumer<Object[]> {
private class ClientSessionMapping extends AbstractMap<String, AuthenticatedClientSessionModel> implements Consumer<RemoteAuthenticatedClientSessionEntity> {
private final UserSessionUpdater userSession;
private boolean coldCache = true;
@ -532,8 +532,8 @@ public class RemoteUserSessionProvider implements UserSessionProvider {
}
@Override
public void accept(Object[] projections) {
getTransaction().wrapFromProjection(projections);
public void accept(RemoteAuthenticatedClientSessionEntity entity) {
getTransaction().wrapFromProjection(entity);
}
private ClientSessionChangeLogTransaction getTransaction() {

View file

@ -21,6 +21,7 @@ import java.util.stream.Stream;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.query.ClientSessionQueryConditionalRemover;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.BaseUpdater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.client.AuthenticatedClientSessionUpdater;
import org.keycloak.models.sessions.infinispan.entities.ClientSessionKey;
@ -40,10 +41,8 @@ public class ClientSessionChangeLogTransaction extends RemoteChangeLogTransactio
/**
* Wraps a Query project results, where the first argument is the entity, and the second the version.
*/
public void wrapFromProjection(Object[] projection) {
assert projection.length == 2;
RemoteAuthenticatedClientSessionEntity entity = (RemoteAuthenticatedClientSessionEntity) projection[0];
wrap(entity.createCacheKey(), entity, (long) projection[1]);
public void wrapFromProjection(RemoteAuthenticatedClientSessionEntity entity) {
wrap(entity.createCacheKey(), entity, Updater.NO_VERSION);
}
/**

View file

@ -112,7 +112,12 @@ public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>, R extends
continue;
}
if (updater.hasVersion()) {
stage.dependsOn(commitReplace(updater, expiration));
continue;
}
stage.dependsOn(commitCompute(updater, expiration));
}
}
@ -219,10 +224,14 @@ public class RemoteChangeLogTransaction<K, V, T extends Updater<K, V>, R extends
}
@SuppressWarnings("unchecked")
private CompletionStage<Void> commitCompute(Updater<K, V> updater, Expiration expiration) {
return executeWithRetries(this::invokeCacheCompute, (RetryOperationSuccess<V, K, V>) TO_NULL, updater, expiration, 0);
}
private CompletionStage<Void> handleBooleanResult(boolean success, Updater<K, V> updater, Expiration expiration) {
return success ?
CompletableFutures.completedNull() :
executeWithRetries(this::invokeCacheCompute, (RetryOperationSuccess<V, K, V>) TO_NULL, updater, expiration, 0);
commitCompute(updater, expiration);
}
private CompletionStage<V> invokeCacheRemove(Updater<K, V> updater, Expiration ignored) {

View file

@ -19,6 +19,7 @@ package org.keycloak.models.sessions.infinispan.remote.transaction;
import io.reactivex.rxjava3.core.Maybe;
import org.keycloak.models.sessions.infinispan.changes.remote.remover.query.UserSessionQueryConditionalRemover;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory;
import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.UserSessionUpdater;
import org.keycloak.models.sessions.infinispan.entities.RemoteUserSessionEntity;
@ -34,10 +35,8 @@ public class UserSessionChangeLogTransaction extends RemoteChangeLogTransaction<
super(factory, sharedState, new UserSessionQueryConditionalRemover());
}
public UserSessionUpdater wrapFromProjection(Object[] projection) {
assert projection.length == 2;
RemoteUserSessionEntity entity = (RemoteUserSessionEntity) projection[0];
return wrap(entity.getUserSessionId(), entity, (long) projection[1]);
public UserSessionUpdater wrapFromProjection(RemoteUserSessionEntity entity) {
return wrap(entity.getUserSessionId(), entity, Updater.NO_VERSION);
}
public Maybe<UserSessionUpdater> maybeGet(String userSessionId) {

View file

@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -33,7 +32,6 @@ import java.util.stream.Stream;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.api.query.Query;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.util.concurrent.WithinThreadExecutor;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.ClassRule;
@ -68,7 +66,6 @@ import org.keycloak.testsuite.model.RequireProvider;
@RequireProvider(RealmProvider.class)
public class InfinispanIckleQueryTest extends KeycloakModelTest {
private static final Executor EXECUTOR = new WithinThreadExecutor();
private static final List<String> REALMS = IntStream.range(0, 2).mapToObj(value -> "realm" + value).toList();
private static final List<String> USERS = IntStream.range(0, 2).mapToObj(value -> "user" + value).toList();
private static final List<String> BROKER_SESSIONS = IntStream.range(0, 2).mapToObj(value -> "brokerSession" + value).toList();
@ -418,22 +415,21 @@ public class InfinispanIckleQueryTest extends KeycloakModelTest {
var query = UserSessionQueries.searchByBrokerSessionId(cache, realm, brokerSession);
var expectedResults = expectUserSessionId(realm, USERS, List.of(brokerSession), BROKER_USERS);
assertQuery(query, objects -> ((RemoteUserSessionEntity) objects[0]).getUserSessionId(), expectedResults);
assertQuery(query, RemoteUserSessionEntity::getUserSessionId, expectedResults);
query = UserSessionQueries.searchByUserId(cache, realm, user);
expectedResults = expectUserSessionId(realm, List.of(user), BROKER_SESSIONS, BROKER_USERS);
assertQuery(query, objects -> ((RemoteUserSessionEntity) objects[0]).getUserSessionId(), expectedResults);
assertQuery(query, RemoteUserSessionEntity::getUserSessionId, expectedResults);
query = UserSessionQueries.searchByBrokerUserId(cache, realm, brokerUser);
expectedResults = expectUserSessionId(realm, USERS, BROKER_SESSIONS, List.of(brokerUser));
assertQuery(query, objects -> ((RemoteUserSessionEntity) objects[0]).getUserSessionId(), expectedResults);
assertQuery(query, RemoteUserSessionEntity::getUserSessionId, expectedResults);
}
@Test
public void testClientSessionQueries() {
RemoteCache<ClientSessionKey, RemoteAuthenticatedClientSessionEntity> cache = assumeAndReturnCache(InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME);
for (var realmId : REALMS) {
for (var clientId : CLIENTS) {
for (var userSessionId : USER_SESSIONS) {
@ -458,11 +454,11 @@ public class InfinispanIckleQueryTest extends KeycloakModelTest {
expectedResults = USER_SESSIONS.stream().map(s -> s + "-" + realm).collect(Collectors.toSet());
assertQuery(query, objects -> String.valueOf(objects[0]), expectedResults);
query = ClientSessionQueries.fetchClientSessions(cache, userSession);
var query2 = ClientSessionQueries.fetchClientSessions(cache, userSession);
expectedResults = CLIENTS.stream().map(s -> new ClientSessionKey(userSession, s)).map(Objects::toString).collect(Collectors.toSet());
assertQuery(query, objects -> ((RemoteAuthenticatedClientSessionEntity) objects[0]).createCacheKey().toString(), expectedResults);
assertQuery(query2, objects -> objects.createCacheKey().toString(), expectedResults);
// each client has user-session * realms number of active client sessions
// each client has user-session * realms active client sessions
query = ClientSessionQueries.activeClientCount(cache);
expectedResults = CLIENTS.stream().map(s -> String.format("%s-%s", s, USER_SESSIONS.size() * REALMS.size())).collect(Collectors.toSet());
assertQuery(query, objects -> String.format("%s-%s", objects[0], objects[1]), expectedResults);