diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/Updater.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/Updater.java index e04d4e8208..e3f4c8af31 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/Updater.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/Updater.java @@ -31,6 +31,8 @@ import org.keycloak.models.sessions.infinispan.remote.transaction.RemoteChangeLo */ public interface Updater extends BiFunction { + int NO_VERSION = -1; + /** * @return The Infinispan cache key. */ @@ -81,4 +83,8 @@ public interface Updater extends BiFunction { * @return The {@link Expiration} data. */ Expiration computeExpiration(); + + default boolean hasVersion() { + return getVersionRead() != NO_VERSION; + } } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/client/AuthenticatedClientSessionUpdater.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/client/AuthenticatedClientSessionUpdater.java index 720d728687..fbed0174ba 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/client/AuthenticatedClientSessionUpdater.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/changes/remote/updater/client/AuthenticatedClientSessionUpdater.java @@ -270,7 +270,7 @@ public class AuthenticatedClientSessionUpdater extends BaseUpdater fetchClientSessions(RemoteCache cache, String userSessionId) { - return cache.query(FROM_USER_SESSION) + public static Query fetchClientSessions(RemoteCache cache, String userSessionId) { + return cache.query(FROM_USER_SESSION) .setParameter("userSessionId", userSessionId); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/query/UserSessionQueries.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/query/UserSessionQueries.java index 2b64b5fcec..17c0b9db4a 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/query/UserSessionQueries.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/query/UserSessionQueries.java @@ -32,7 +32,7 @@ public final class UserSessionQueries { public static final String USER_SESSION = Marshalling.protoEntity(RemoteUserSessionEntity.class); - private static final String BASE_QUERY = "SELECT e, version(e) FROM %s as e ".formatted(USER_SESSION); + private static final String BASE_QUERY = "FROM %s as e ".formatted(USER_SESSION); private static final String BY_BROKER_SESSION_ID = BASE_QUERY + "WHERE e.realmId = :realmId && e.brokerSessionId = :brokerSessionId ORDER BY e.userSessionId"; private static final String BY_USER_ID = BASE_QUERY + "WHERE e.realmId = :realmId && e.userId = :userId ORDER BY e.userSessionId"; private static final String BY_BROKER_USER_ID = BASE_QUERY + "WHERE e.realmId = :realmId && e.brokerUserId = :brokerUserId ORDER BY e.userSessionId"; @@ -41,8 +41,8 @@ public final class UserSessionQueries { * Returns a projection with the user session, and the version of all user sessions belonging to the broker session * ID. */ - public static Query searchByBrokerSessionId(RemoteCache cache, String realmId, String brokerSessionId) { - return cache.query(BY_BROKER_SESSION_ID) + public static Query searchByBrokerSessionId(RemoteCache cache, String realmId, String brokerSessionId) { + return cache.query(BY_BROKER_SESSION_ID) .setParameter("realmId", realmId) .setParameter("brokerSessionId", brokerSessionId); } @@ -50,8 +50,8 @@ public final class UserSessionQueries { /** * Returns a projection with the user session, and the version of all user sessions belonging to the user ID. */ - public static Query searchByUserId(RemoteCache cache, String realmId, String userId) { - return cache.query(BY_USER_ID) + public static Query searchByUserId(RemoteCache cache, String realmId, String userId) { + return cache.query(BY_USER_ID) .setParameter("realmId", realmId) .setParameter("userId", userId); } @@ -60,8 +60,8 @@ public final class UserSessionQueries { * Returns a projection with the user session, and the version of all user sessions belonging to the broker user * ID. */ - public static Query searchByBrokerUserId(RemoteCache cache, String realmId, String brokerUserId) { - return cache.query(BY_BROKER_USER_ID) + public static Query searchByBrokerUserId(RemoteCache cache, String realmId, String brokerUserId) { + return cache.query(BY_BROKER_USER_ID) .setParameter("realmId", realmId) .setParameter("brokerUserId", brokerUserId); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java index 75c2a42499..4ecbe801f9 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java @@ -471,7 +471,7 @@ public class RemoteUserSessionProvider implements UserSessionProvider { return updater; } - private class ClientSessionMapping extends AbstractMap implements Consumer { + private class ClientSessionMapping extends AbstractMap implements Consumer { private final UserSessionUpdater userSession; private boolean coldCache = true; @@ -532,8 +532,8 @@ public class RemoteUserSessionProvider implements UserSessionProvider { } @Override - public void accept(Object[] projections) { - getTransaction().wrapFromProjection(projections); + public void accept(RemoteAuthenticatedClientSessionEntity entity) { + getTransaction().wrapFromProjection(entity); } private ClientSessionChangeLogTransaction getTransaction() { diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/ClientSessionChangeLogTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/ClientSessionChangeLogTransaction.java index 8b30ba2777..0738a559c7 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/ClientSessionChangeLogTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/ClientSessionChangeLogTransaction.java @@ -21,6 +21,7 @@ import java.util.stream.Stream; import org.keycloak.models.sessions.infinispan.changes.remote.remover.query.ClientSessionQueryConditionalRemover; import org.keycloak.models.sessions.infinispan.changes.remote.updater.BaseUpdater; +import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory; import org.keycloak.models.sessions.infinispan.changes.remote.updater.client.AuthenticatedClientSessionUpdater; import org.keycloak.models.sessions.infinispan.entities.ClientSessionKey; @@ -40,10 +41,8 @@ public class ClientSessionChangeLogTransaction extends RemoteChangeLogTransactio /** * Wraps a Query project results, where the first argument is the entity, and the second the version. */ - public void wrapFromProjection(Object[] projection) { - assert projection.length == 2; - RemoteAuthenticatedClientSessionEntity entity = (RemoteAuthenticatedClientSessionEntity) projection[0]; - wrap(entity.createCacheKey(), entity, (long) projection[1]); + public void wrapFromProjection(RemoteAuthenticatedClientSessionEntity entity) { + wrap(entity.createCacheKey(), entity, Updater.NO_VERSION); } /** diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/RemoteChangeLogTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/RemoteChangeLogTransaction.java index ac38826620..a5905b1209 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/RemoteChangeLogTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/RemoteChangeLogTransaction.java @@ -112,7 +112,12 @@ public class RemoteChangeLogTransaction, R extends continue; } - stage.dependsOn(commitReplace(updater, expiration)); + if (updater.hasVersion()) { + stage.dependsOn(commitReplace(updater, expiration)); + continue; + } + + stage.dependsOn(commitCompute(updater, expiration)); } } @@ -219,10 +224,14 @@ public class RemoteChangeLogTransaction, R extends } @SuppressWarnings("unchecked") + private CompletionStage commitCompute(Updater updater, Expiration expiration) { + return executeWithRetries(this::invokeCacheCompute, (RetryOperationSuccess) TO_NULL, updater, expiration, 0); + } + private CompletionStage handleBooleanResult(boolean success, Updater updater, Expiration expiration) { return success ? CompletableFutures.completedNull() : - executeWithRetries(this::invokeCacheCompute, (RetryOperationSuccess) TO_NULL, updater, expiration, 0); + commitCompute(updater, expiration); } private CompletionStage invokeCacheRemove(Updater updater, Expiration ignored) { diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/UserSessionChangeLogTransaction.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/UserSessionChangeLogTransaction.java index 04b9b2a456..2b6195f033 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/UserSessionChangeLogTransaction.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/transaction/UserSessionChangeLogTransaction.java @@ -19,6 +19,7 @@ package org.keycloak.models.sessions.infinispan.remote.transaction; import io.reactivex.rxjava3.core.Maybe; import org.keycloak.models.sessions.infinispan.changes.remote.remover.query.UserSessionQueryConditionalRemover; +import org.keycloak.models.sessions.infinispan.changes.remote.updater.Updater; import org.keycloak.models.sessions.infinispan.changes.remote.updater.UpdaterFactory; import org.keycloak.models.sessions.infinispan.changes.remote.updater.user.UserSessionUpdater; import org.keycloak.models.sessions.infinispan.entities.RemoteUserSessionEntity; @@ -34,10 +35,8 @@ public class UserSessionChangeLogTransaction extends RemoteChangeLogTransaction< super(factory, sharedState, new UserSessionQueryConditionalRemover()); } - public UserSessionUpdater wrapFromProjection(Object[] projection) { - assert projection.length == 2; - RemoteUserSessionEntity entity = (RemoteUserSessionEntity) projection[0]; - return wrap(entity.getUserSessionId(), entity, (long) projection[1]); + public UserSessionUpdater wrapFromProjection(RemoteUserSessionEntity entity) { + return wrap(entity.getUserSessionId(), entity, Updater.NO_VERSION); } public Maybe maybeGet(String userSessionId) { diff --git a/testsuite/model/src/test/java/org/keycloak/testsuite/model/infinispan/InfinispanIckleQueryTest.java b/testsuite/model/src/test/java/org/keycloak/testsuite/model/infinispan/InfinispanIckleQueryTest.java index bb8f107a97..c004b24666 100644 --- a/testsuite/model/src/test/java/org/keycloak/testsuite/model/infinispan/InfinispanIckleQueryTest.java +++ b/testsuite/model/src/test/java/org/keycloak/testsuite/model/infinispan/InfinispanIckleQueryTest.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; import java.util.stream.Collectors; @@ -33,7 +32,6 @@ import java.util.stream.Stream; import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.commons.api.query.Query; import org.infinispan.commons.util.concurrent.CompletionStages; -import org.infinispan.util.concurrent.WithinThreadExecutor; import org.junit.Assert; import org.junit.Assume; import org.junit.ClassRule; @@ -68,7 +66,6 @@ import org.keycloak.testsuite.model.RequireProvider; @RequireProvider(RealmProvider.class) public class InfinispanIckleQueryTest extends KeycloakModelTest { - private static final Executor EXECUTOR = new WithinThreadExecutor(); private static final List REALMS = IntStream.range(0, 2).mapToObj(value -> "realm" + value).toList(); private static final List USERS = IntStream.range(0, 2).mapToObj(value -> "user" + value).toList(); private static final List BROKER_SESSIONS = IntStream.range(0, 2).mapToObj(value -> "brokerSession" + value).toList(); @@ -418,22 +415,21 @@ public class InfinispanIckleQueryTest extends KeycloakModelTest { var query = UserSessionQueries.searchByBrokerSessionId(cache, realm, brokerSession); var expectedResults = expectUserSessionId(realm, USERS, List.of(brokerSession), BROKER_USERS); - assertQuery(query, objects -> ((RemoteUserSessionEntity) objects[0]).getUserSessionId(), expectedResults); + assertQuery(query, RemoteUserSessionEntity::getUserSessionId, expectedResults); query = UserSessionQueries.searchByUserId(cache, realm, user); expectedResults = expectUserSessionId(realm, List.of(user), BROKER_SESSIONS, BROKER_USERS); - assertQuery(query, objects -> ((RemoteUserSessionEntity) objects[0]).getUserSessionId(), expectedResults); + assertQuery(query, RemoteUserSessionEntity::getUserSessionId, expectedResults); query = UserSessionQueries.searchByBrokerUserId(cache, realm, brokerUser); expectedResults = expectUserSessionId(realm, USERS, BROKER_SESSIONS, List.of(brokerUser)); - assertQuery(query, objects -> ((RemoteUserSessionEntity) objects[0]).getUserSessionId(), expectedResults); + assertQuery(query, RemoteUserSessionEntity::getUserSessionId, expectedResults); } @Test public void testClientSessionQueries() { RemoteCache cache = assumeAndReturnCache(InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME); - for (var realmId : REALMS) { for (var clientId : CLIENTS) { for (var userSessionId : USER_SESSIONS) { @@ -458,11 +454,11 @@ public class InfinispanIckleQueryTest extends KeycloakModelTest { expectedResults = USER_SESSIONS.stream().map(s -> s + "-" + realm).collect(Collectors.toSet()); assertQuery(query, objects -> String.valueOf(objects[0]), expectedResults); - query = ClientSessionQueries.fetchClientSessions(cache, userSession); + var query2 = ClientSessionQueries.fetchClientSessions(cache, userSession); expectedResults = CLIENTS.stream().map(s -> new ClientSessionKey(userSession, s)).map(Objects::toString).collect(Collectors.toSet()); - assertQuery(query, objects -> ((RemoteAuthenticatedClientSessionEntity) objects[0]).createCacheKey().toString(), expectedResults); + assertQuery(query2, objects -> objects.createCacheKey().toString(), expectedResults); - // each client has user-session * realms number of active client sessions + // each client has user-session * realms active client sessions query = ClientSessionQueries.activeClientCount(cache); expectedResults = CLIENTS.stream().map(s -> String.format("%s-%s", s, USER_SESSIONS.size() * REALMS.size())).collect(Collectors.toSet()); assertQuery(query, objects -> String.format("%s-%s", objects[0], objects[1]), expectedResults);