From f507caae6c2a62748f085d121dabf62b96cc28ed Mon Sep 17 00:00:00 2001 From: Pedro Ruivo Date: Wed, 23 Oct 2024 00:02:41 +0100 Subject: [PATCH] Deleting a user leads to ISPN marshalling exception Fixes #34224 Signed-off-by: Pedro Ruivo --- .../marshalling/KeycloakModelSchema.java | 14 ++- .../org/keycloak/marshalling/Marshalling.java | 6 ++ .../InfinispanUserSessionProvider.java | 15 ++-- .../stream/AuthClientSessionSetMapper.java | 54 ++++++++++++ .../stream/CollectionToStreamMapper.java | 57 ++++++++++++ .../GroupAndCountCollectorSupplier.java | 57 ++++++++++++ .../stream/MapEntryToKeyMapper.java | 58 +++++++++++++ .../sessions/infinispan/stream/Mappers.java | 34 +------- .../stream/SessionUnwrapMapper.java | 55 ++++++++++++ .../session/UserSessionProviderModelTest.java | 87 +++++++++++++++++-- 10 files changed, 387 insertions(+), 50 deletions(-) create mode 100644 model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/AuthClientSessionSetMapper.java create mode 100644 model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/CollectionToStreamMapper.java create mode 100644 model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/GroupAndCountCollectorSupplier.java create mode 100644 model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/MapEntryToKeyMapper.java create mode 100644 model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/SessionUnwrapMapper.java diff --git a/model/infinispan/src/main/java/org/keycloak/marshalling/KeycloakModelSchema.java b/model/infinispan/src/main/java/org/keycloak/marshalling/KeycloakModelSchema.java index 4870cd2012..4a16b97823 100644 --- a/model/infinispan/src/main/java/org/keycloak/marshalling/KeycloakModelSchema.java +++ b/model/infinispan/src/main/java/org/keycloak/marshalling/KeycloakModelSchema.java @@ -94,8 +94,13 @@ import org.keycloak.models.sessions.infinispan.events.RealmRemovedSessionEvent; import org.keycloak.models.sessions.infinispan.events.RemoveAllUserLoginFailuresEvent; import org.keycloak.models.sessions.infinispan.events.RemoveUserSessionsEvent; import org.keycloak.models.sessions.infinispan.initializer.InitializerState; +import org.keycloak.models.sessions.infinispan.stream.AuthClientSessionSetMapper; +import org.keycloak.models.sessions.infinispan.stream.CollectionToStreamMapper; +import org.keycloak.models.sessions.infinispan.stream.GroupAndCountCollectorSupplier; +import org.keycloak.models.sessions.infinispan.stream.MapEntryToKeyMapper; import org.keycloak.models.sessions.infinispan.stream.SessionPredicate; import org.keycloak.models.sessions.infinispan.stream.SessionWrapperPredicate; +import org.keycloak.models.sessions.infinispan.stream.SessionUnwrapMapper; import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate; import org.keycloak.sessions.CommonClientSessionModel; import org.keycloak.storage.UserStorageProviderModel; @@ -210,7 +215,14 @@ import org.keycloak.storage.managers.UserStorageSyncManager; RootAuthenticationSessionEntity.class, SingleUseObjectValueEntity.class, UserSessionEntity.class, - ReplaceFunction.class + ReplaceFunction.class, + + // sessions.infinispan.stream + AuthClientSessionSetMapper.class, + CollectionToStreamMapper.class, + GroupAndCountCollectorSupplier.class, + MapEntryToKeyMapper.class, + SessionUnwrapMapper.class, } ) public interface KeycloakModelSchema extends GeneratedSchema { diff --git a/model/infinispan/src/main/java/org/keycloak/marshalling/Marshalling.java b/model/infinispan/src/main/java/org/keycloak/marshalling/Marshalling.java index 450a657cc5..d251675792 100644 --- a/model/infinispan/src/main/java/org/keycloak/marshalling/Marshalling.java +++ b/model/infinispan/src/main/java/org/keycloak/marshalling/Marshalling.java @@ -154,6 +154,12 @@ public final class Marshalling { public static final int CLIENT_SESSION_KEY = 65606; public static final int REMOTE_CLIENT_SESSION_ENTITY = 65607; + public static final int AUTHENTICATION_CLIENT_SESSION_KEY_SET_MAPPER = 65608; + public static final int COLLECTION_TO_STREAM_MAPPER = 65609; + public static final int GROUP_AND_COUNT_COLLECTOR_SUPPLIER = 65610; + public static final int MAP_ENTRY_TO_KEY_FUNCTION = 65611; + public static final int SESSION_UNWRAP_MAPPER = 65612; + public static void configure(GlobalConfigurationBuilder builder) { builder.serialization() .addContextInitializer(KeycloakModelSchema.INSTANCE); diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java index 60844c3dc0..41312fc97f 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProvider.java @@ -17,7 +17,6 @@ package org.keycloak.models.sessions.infinispan; -import java.io.Serializable; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -25,7 +24,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -79,6 +77,8 @@ import org.keycloak.models.sessions.infinispan.events.RealmRemovedSessionEvent; import org.keycloak.models.sessions.infinispan.events.RemoveUserSessionsEvent; import org.keycloak.models.sessions.infinispan.events.SessionEventsSenderTransaction; import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker; +import org.keycloak.models.sessions.infinispan.stream.CollectionToStreamMapper; +import org.keycloak.models.sessions.infinispan.stream.GroupAndCountCollectorSupplier; import org.keycloak.models.sessions.infinispan.stream.Mappers; import org.keycloak.models.sessions.infinispan.stream.SessionWrapperPredicate; import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate; @@ -388,8 +388,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider, Sessi // return a stream that 'wraps' the infinispan cache stream so that the cache stream's elements are read one by one // and then mapped locally to avoid serialization issues when trying to manipulate the cache stream directly. - return StreamSupport.stream(cache.entrySet().stream().filter(predicate).spliterator(), false) - .map(Mappers.userSessionEntity()) + return StreamSupport.stream(cache.entrySet().stream().filter(predicate).map(Mappers.userSessionEntity()).spliterator(), false) .map(entity -> this.wrap(realm, entity, offline)) .filter(Objects::nonNull).map(Function.identity()); } @@ -561,12 +560,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider, Sessi return cache.entrySet().stream() .filter(UserSessionPredicate.create(realm.getId())) .map(Mappers.authClientSessionSetMapper()) - .flatMap((Serializable & Function, Stream>) Mappers::toStream) - .collect( - CacheCollectors.serializableCollector( - () -> Collectors.groupingBy(Function.identity(), Collectors.counting()) - ) - ); + .flatMap(CollectionToStreamMapper.getInstance()) + .collect(CacheCollectors.collector(GroupAndCountCollectorSupplier.getInstance())); } protected long getUserSessionsCount(RealmModel realm, ClientModel client, boolean offline) { diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/AuthClientSessionSetMapper.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/AuthClientSessionSetMapper.java new file mode 100644 index 0000000000..ce72d9bf24 --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/AuthClientSessionSetMapper.java @@ -0,0 +1,54 @@ +/* + * Copyright 2024 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.models.sessions.infinispan.stream; + +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +import org.infinispan.CacheStream; +import org.infinispan.protostream.annotations.ProtoFactory; +import org.infinispan.protostream.annotations.ProtoTypeId; +import org.keycloak.marshalling.Marshalling; +import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; +import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; + +/** + * A {@link Function} to be used by {@link CacheStream} to extract the client's ID from the client sessions associated + * to a {@link UserSessionEntity}. + *

+ * This function is marshaled with ProtoStream. + */ +@ProtoTypeId(Marshalling.AUTHENTICATION_CLIENT_SESSION_KEY_SET_MAPPER) +public class AuthClientSessionSetMapper implements Function>, Set> { + + private static final AuthClientSessionSetMapper INSTANCE = new AuthClientSessionSetMapper(); + + private AuthClientSessionSetMapper() { + } + + @ProtoFactory + public static AuthClientSessionSetMapper getInstance() { + return INSTANCE; + } + + @Override + public Set apply(Map.Entry> entry) { + return entry.getValue().getEntity().getAuthenticatedClientSessions().keySet(); + } +} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/CollectionToStreamMapper.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/CollectionToStreamMapper.java new file mode 100644 index 0000000000..263c6ac4d5 --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/CollectionToStreamMapper.java @@ -0,0 +1,57 @@ +/* + * Copyright 2024 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.models.sessions.infinispan.stream; + +import java.lang.invoke.SerializedLambda; +import java.util.Collection; +import java.util.function.Function; +import java.util.stream.Stream; + +import org.infinispan.protostream.annotations.ProtoFactory; +import org.infinispan.protostream.annotations.ProtoTypeId; +import org.keycloak.marshalling.Marshalling; + +/** + * A {@link Function} that converts the {@link Collection} to a {@link Stream}. + *

+ * Same as {@code Collection::stream}. + *

+ * Infinispan can marshall lambdas, by using {@link SerializedLambda} but it is not as efficient and ProtoStream + * marshaller. + * + * @param The type of the collection elements. + */ +@ProtoTypeId(Marshalling.COLLECTION_TO_STREAM_MAPPER) +public class CollectionToStreamMapper implements Function, Stream> { + + private static final CollectionToStreamMapper INSTANCE = new CollectionToStreamMapper<>(); + + private CollectionToStreamMapper() { + } + + @ProtoFactory + @SuppressWarnings("unchecked") + public static CollectionToStreamMapper getInstance() { + return (CollectionToStreamMapper) INSTANCE; + } + + @Override + public Stream apply(Collection collection) { + return collection.stream(); + } +} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/GroupAndCountCollectorSupplier.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/GroupAndCountCollectorSupplier.java new file mode 100644 index 0000000000..2131a1e0c7 --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/GroupAndCountCollectorSupplier.java @@ -0,0 +1,57 @@ +/* + * Copyright 2024 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.models.sessions.infinispan.stream; + +import java.lang.invoke.SerializedLambda; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Collector; +import java.util.stream.Collectors; + +import org.infinispan.commons.util.concurrent.CompletableFutures; +import org.infinispan.protostream.annotations.ProtoFactory; +import org.infinispan.protostream.annotations.ProtoTypeId; +import org.keycloak.marshalling.Marshalling; + +/** + * A {@link Supplier} that returns a {@link Collector} to group and count elements. + *

+ * Infinispan can marshall lambdas, by using {@link SerializedLambda} but it is not as efficient and ProtoStream + * marshaller. + * + * @param The type of the elements. + */ +@ProtoTypeId(Marshalling.GROUP_AND_COUNT_COLLECTOR_SUPPLIER) +public class GroupAndCountCollectorSupplier implements Supplier>> { + + private static final GroupAndCountCollectorSupplier INSTANCE = new GroupAndCountCollectorSupplier<>(); + + private GroupAndCountCollectorSupplier() { + } + + @ProtoFactory + @SuppressWarnings("unchecked") + public static GroupAndCountCollectorSupplier getInstance() { + return (GroupAndCountCollectorSupplier) INSTANCE; + } + + @Override + public Collector> get() { + return Collectors.groupingBy(CompletableFutures.identity(), Collectors.counting()); + } +} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/MapEntryToKeyMapper.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/MapEntryToKeyMapper.java new file mode 100644 index 0000000000..668a3c1d0e --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/MapEntryToKeyMapper.java @@ -0,0 +1,58 @@ +/* + * Copyright 2024 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.models.sessions.infinispan.stream; + +import java.lang.invoke.SerializedLambda; +import java.util.Map; +import java.util.function.Function; + +import org.infinispan.protostream.annotations.ProtoFactory; +import org.infinispan.protostream.annotations.ProtoTypeId; +import org.keycloak.marshalling.Marshalling; + +/** + * A {@link Function} to extract the key from a {@link Map.Entry}. + *

+ * Same as {@code Map.Entry::getKey}. + *

+ * Infinispan can marshall lambdas, by using {@link SerializedLambda} but it is not as efficient and ProtoStream + * marshaller. + * + * @param + * @param + */ +@ProtoTypeId(Marshalling.MAP_ENTRY_TO_KEY_FUNCTION) +public class MapEntryToKeyMapper implements Function, K> { + + private static final MapEntryToKeyMapper INSTANCE = new MapEntryToKeyMapper<>(); + + private MapEntryToKeyMapper() { + } + + @ProtoFactory + @SuppressWarnings("unchecked") + public static MapEntryToKeyMapper getInstance() { + return (MapEntryToKeyMapper) INSTANCE; + } + + + @Override + public K apply(Map.Entry entry) { + return entry.getKey(); + } +} diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/Mappers.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/Mappers.java index f86a58a41a..66723690d2 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/Mappers.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/Mappers.java @@ -22,7 +22,6 @@ import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity; import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey; import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; -import java.io.Serializable; import java.util.Collection; import java.util.Map; import java.util.Set; @@ -35,45 +34,20 @@ import java.util.stream.Stream; public class Mappers { public static Function>, UserSessionEntity> userSessionEntity() { - return new UserSessionEntityMapper(); + return SessionUnwrapMapper.getInstance(); } public static Function>, LoginFailureKey> loginFailureId() { - return new LoginFailureIdMapper(); - } - - - private static class UserSessionEntityMapper implements Function>, UserSessionEntity>, Serializable { - - @Override - public UserSessionEntity apply(Map.Entry> entry) { - return entry.getValue().getEntity(); - } - - } - - private static class LoginFailureIdMapper implements Function>, LoginFailureKey> { - @Override - public LoginFailureKey apply(Map.Entry> entry) { - return entry.getKey(); - } - } - - private static class AuthClientSessionSetMapper implements Function>, Set> { - - @Override - public Set apply(Map.Entry> entry) { - UserSessionEntity entity = entry.getValue().getEntity(); - return entity.getAuthenticatedClientSessions().keySet(); - } + return MapEntryToKeyMapper.getInstance(); } + @Deprecated(since = "26.0", forRemoval = true) public static Stream toStream(Collection collection) { return collection.stream(); } public static Function>, Set> authClientSessionSetMapper() { - return new AuthClientSessionSetMapper(); + return AuthClientSessionSetMapper.getInstance(); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/SessionUnwrapMapper.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/SessionUnwrapMapper.java new file mode 100644 index 0000000000..72cce7ed75 --- /dev/null +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/stream/SessionUnwrapMapper.java @@ -0,0 +1,55 @@ +/* + * Copyright 2024 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.models.sessions.infinispan.stream; + +import java.util.Map; +import java.util.function.Function; + +import org.infinispan.protostream.annotations.ProtoFactory; +import org.infinispan.protostream.annotations.ProtoTypeId; +import org.keycloak.marshalling.Marshalling; +import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; +import org.keycloak.models.sessions.infinispan.entities.SessionEntity; + +/** + * A {@link Function} to unwrap the {@link SessionEntity} from the {@link SessionEntityWrapper}. + *

+ * The {@link SessionEntityWrapper} is part of the value of {@link Map.Entry}. + * + * @param The key type. + * @param The value type. + */ +@ProtoTypeId(Marshalling.SESSION_UNWRAP_MAPPER) +public class SessionUnwrapMapper implements Function>, V> { + + private static final SessionUnwrapMapper INSTANCE = new SessionUnwrapMapper<>(); + + private SessionUnwrapMapper() { + } + + @ProtoFactory + @SuppressWarnings("unchecked") + public static SessionUnwrapMapper getInstance() { + return (SessionUnwrapMapper) INSTANCE; + } + + @Override + public V apply(Map.Entry> entry) { + return entry.getValue().getEntity(); + } +} diff --git a/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/UserSessionProviderModelTest.java b/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/UserSessionProviderModelTest.java index d08984fd68..bf18a5e777 100644 --- a/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/UserSessionProviderModelTest.java +++ b/testsuite/model/src/test/java/org/keycloak/testsuite/model/session/UserSessionProviderModelTest.java @@ -16,11 +16,24 @@ */ package org.keycloak.testsuite.model.session; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + import org.hamcrest.Matchers; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; -import org.keycloak.common.Profile; import org.keycloak.common.util.MultiSiteUtils; +import org.keycloak.infinispan.util.InfinispanUtils; import org.keycloak.models.AuthenticatedClientSessionModel; import org.keycloak.models.ClientModel; import org.keycloak.models.Constants; @@ -39,17 +52,12 @@ import org.keycloak.testsuite.model.RequireProvider; import org.keycloak.testsuite.model.infinispan.InfinispanTestUtil; import org.keycloak.timer.TimerProvider; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.keycloak.testsuite.model.session.UserSessionPersisterProviderTest.createClients; import static org.keycloak.testsuite.model.session.UserSessionPersisterProviderTest.createSessions; @@ -280,4 +288,65 @@ public class UserSessionProviderModelTest extends KeycloakModelTest { }); }); } + + @Test + public void testStreamsMarshalling() throws InterruptedException { + Assume.assumeTrue(InfinispanUtils.isEmbeddedInfinispan()); + closeKeycloakSessionFactory(); + var clusterSize = 4; + var barrier = new CyclicBarrier(clusterSize); + + inIndependentFactories(clusterSize, 30, () -> { + // populate the cache + withRealmConsumer(realmId, (keycloakSession, realm) -> { + var user = keycloakSession.users().getUserByUsername(realm, "user1"); + var client = realm.getClientByClientId("test-app"); + assertNotNull(user); + assertNotNull(client); + var userSession = keycloakSession.sessions().createUserSession(null, realm, user, "user1", "127.0.0.1", "form", true, null, null, UserSessionModel.SessionPersistenceState.PERSISTENT); + assertNotNull(userSession); + var clientSession = keycloakSession.sessions().createClientSession(realm, client, userSession); + assertNotNull(clientSession); + }); + + try { + barrier.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } catch (BrokenBarrierException | TimeoutException e) { + throw new RuntimeException(e); + } + + withRealmConsumer(realmId, (keycloakSession, realm) -> { + var user = keycloakSession.users().getUserByUsername(realm, "user1"); + assertNotNull(user); + + var client = realm.getClientByClientId("test-app"); + assertNotNull(client); + + var activeClientSessionsStats = keycloakSession.sessions().getActiveClientSessionStats(realm, false); + assertNotNull(activeClientSessionsStats); + assertEquals(1, activeClientSessionsStats.size()); + assertTrue(activeClientSessionsStats.containsKey(client.getId())); + assertEquals(4L, (long) activeClientSessionsStats.get(client.getId())); + + var userSessions = keycloakSession.sessions().getUserSessionsStream(realm, user).toList(); + assertNotNull(userSessions); + assertEquals(4, userSessions.size()); + + // sync everybody here since we are going to remove everything. + try { + barrier.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } catch (BrokenBarrierException | TimeoutException e) { + throw new RuntimeException(e); + } + + keycloakSession.sessions().removeUserSessions(realm, user); + }); + }); + } }