Deleting a user leads to ISPN marshalling exception

Fixes #34224

Signed-off-by: Pedro Ruivo <pruivo@redhat.com>
This commit is contained in:
Pedro Ruivo 2024-10-23 00:02:41 +01:00 committed by Alexander Schwartz
parent 42f8647e60
commit f507caae6c
10 changed files with 387 additions and 50 deletions

View file

@ -94,8 +94,13 @@ import org.keycloak.models.sessions.infinispan.events.RealmRemovedSessionEvent;
import org.keycloak.models.sessions.infinispan.events.RemoveAllUserLoginFailuresEvent;
import org.keycloak.models.sessions.infinispan.events.RemoveUserSessionsEvent;
import org.keycloak.models.sessions.infinispan.initializer.InitializerState;
import org.keycloak.models.sessions.infinispan.stream.AuthClientSessionSetMapper;
import org.keycloak.models.sessions.infinispan.stream.CollectionToStreamMapper;
import org.keycloak.models.sessions.infinispan.stream.GroupAndCountCollectorSupplier;
import org.keycloak.models.sessions.infinispan.stream.MapEntryToKeyMapper;
import org.keycloak.models.sessions.infinispan.stream.SessionPredicate;
import org.keycloak.models.sessions.infinispan.stream.SessionWrapperPredicate;
import org.keycloak.models.sessions.infinispan.stream.SessionUnwrapMapper;
import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate;
import org.keycloak.sessions.CommonClientSessionModel;
import org.keycloak.storage.UserStorageProviderModel;
@ -210,7 +215,14 @@ import org.keycloak.storage.managers.UserStorageSyncManager;
RootAuthenticationSessionEntity.class,
SingleUseObjectValueEntity.class,
UserSessionEntity.class,
ReplaceFunction.class
ReplaceFunction.class,
// sessions.infinispan.stream
AuthClientSessionSetMapper.class,
CollectionToStreamMapper.class,
GroupAndCountCollectorSupplier.class,
MapEntryToKeyMapper.class,
SessionUnwrapMapper.class,
}
)
public interface KeycloakModelSchema extends GeneratedSchema {

View file

@ -154,6 +154,12 @@ public final class Marshalling {
public static final int CLIENT_SESSION_KEY = 65606;
public static final int REMOTE_CLIENT_SESSION_ENTITY = 65607;
public static final int AUTHENTICATION_CLIENT_SESSION_KEY_SET_MAPPER = 65608;
public static final int COLLECTION_TO_STREAM_MAPPER = 65609;
public static final int GROUP_AND_COUNT_COLLECTOR_SUPPLIER = 65610;
public static final int MAP_ENTRY_TO_KEY_FUNCTION = 65611;
public static final int SESSION_UNWRAP_MAPPER = 65612;
public static void configure(GlobalConfigurationBuilder builder) {
builder.serialization()
.addContextInitializer(KeycloakModelSchema.INSTANCE);

View file

@ -17,7 +17,6 @@
package org.keycloak.models.sessions.infinispan;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@ -25,7 +24,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
@ -79,6 +77,8 @@ import org.keycloak.models.sessions.infinispan.events.RealmRemovedSessionEvent;
import org.keycloak.models.sessions.infinispan.events.RemoveUserSessionsEvent;
import org.keycloak.models.sessions.infinispan.events.SessionEventsSenderTransaction;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
import org.keycloak.models.sessions.infinispan.stream.CollectionToStreamMapper;
import org.keycloak.models.sessions.infinispan.stream.GroupAndCountCollectorSupplier;
import org.keycloak.models.sessions.infinispan.stream.Mappers;
import org.keycloak.models.sessions.infinispan.stream.SessionWrapperPredicate;
import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate;
@ -388,8 +388,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider, Sessi
// return a stream that 'wraps' the infinispan cache stream so that the cache stream's elements are read one by one
// and then mapped locally to avoid serialization issues when trying to manipulate the cache stream directly.
return StreamSupport.stream(cache.entrySet().stream().filter(predicate).spliterator(), false)
.map(Mappers.userSessionEntity())
return StreamSupport.stream(cache.entrySet().stream().filter(predicate).map(Mappers.userSessionEntity()).spliterator(), false)
.map(entity -> this.wrap(realm, entity, offline))
.filter(Objects::nonNull).map(Function.identity());
}
@ -561,12 +560,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider, Sessi
return cache.entrySet().stream()
.filter(UserSessionPredicate.create(realm.getId()))
.map(Mappers.authClientSessionSetMapper())
.flatMap((Serializable & Function<Set<String>, Stream<? extends String>>) Mappers::toStream)
.collect(
CacheCollectors.serializableCollector(
() -> Collectors.groupingBy(Function.identity(), Collectors.counting())
)
);
.flatMap(CollectionToStreamMapper.getInstance())
.collect(CacheCollectors.collector(GroupAndCountCollectorSupplier.getInstance()));
}
protected long getUserSessionsCount(RealmModel realm, ClientModel client, boolean offline) {

View file

@ -0,0 +1,54 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.stream;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.infinispan.CacheStream;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoTypeId;
import org.keycloak.marshalling.Marshalling;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
/**
* A {@link Function} to be used by {@link CacheStream} to extract the client's ID from the client sessions associated
* to a {@link UserSessionEntity}.
* <p>
* This function is marshaled with ProtoStream.
*/
@ProtoTypeId(Marshalling.AUTHENTICATION_CLIENT_SESSION_KEY_SET_MAPPER)
public class AuthClientSessionSetMapper implements Function<Map.Entry<String, SessionEntityWrapper<UserSessionEntity>>, Set<String>> {
private static final AuthClientSessionSetMapper INSTANCE = new AuthClientSessionSetMapper();
private AuthClientSessionSetMapper() {
}
@ProtoFactory
public static AuthClientSessionSetMapper getInstance() {
return INSTANCE;
}
@Override
public Set<String> apply(Map.Entry<String, SessionEntityWrapper<UserSessionEntity>> entry) {
return entry.getValue().getEntity().getAuthenticatedClientSessions().keySet();
}
}

View file

@ -0,0 +1,57 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.stream;
import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import java.util.function.Function;
import java.util.stream.Stream;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoTypeId;
import org.keycloak.marshalling.Marshalling;
/**
* A {@link Function} that converts the {@link Collection} to a {@link Stream}.
* <p>
* Same as {@code Collection::stream}.
* <p>
* Infinispan can marshall lambdas, by using {@link SerializedLambda} but it is not as efficient and ProtoStream
* marshaller.
*
* @param <T> The type of the collection elements.
*/
@ProtoTypeId(Marshalling.COLLECTION_TO_STREAM_MAPPER)
public class CollectionToStreamMapper<T> implements Function<Collection<T>, Stream<T>> {
private static final CollectionToStreamMapper<?> INSTANCE = new CollectionToStreamMapper<>();
private CollectionToStreamMapper() {
}
@ProtoFactory
@SuppressWarnings("unchecked")
public static <T1> CollectionToStreamMapper<T1> getInstance() {
return (CollectionToStreamMapper<T1>) INSTANCE;
}
@Override
public Stream<T> apply(Collection<T> collection) {
return collection.stream();
}
}

View file

@ -0,0 +1,57 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.stream;
import java.lang.invoke.SerializedLambda;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoTypeId;
import org.keycloak.marshalling.Marshalling;
/**
* A {@link Supplier} that returns a {@link Collector} to group and count elements.
* <p>
* Infinispan can marshall lambdas, by using {@link SerializedLambda} but it is not as efficient and ProtoStream
* marshaller.
*
* @param <T> The type of the elements.
*/
@ProtoTypeId(Marshalling.GROUP_AND_COUNT_COLLECTOR_SUPPLIER)
public class GroupAndCountCollectorSupplier<T> implements Supplier<Collector<T, ?, Map<T, Long>>> {
private static final GroupAndCountCollectorSupplier<?> INSTANCE = new GroupAndCountCollectorSupplier<>();
private GroupAndCountCollectorSupplier() {
}
@ProtoFactory
@SuppressWarnings("unchecked")
public static <T1> GroupAndCountCollectorSupplier<T1> getInstance() {
return (GroupAndCountCollectorSupplier<T1>) INSTANCE;
}
@Override
public Collector<T, ?, Map<T, Long>> get() {
return Collectors.groupingBy(CompletableFutures.identity(), Collectors.counting());
}
}

View file

@ -0,0 +1,58 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.stream;
import java.lang.invoke.SerializedLambda;
import java.util.Map;
import java.util.function.Function;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoTypeId;
import org.keycloak.marshalling.Marshalling;
/**
* A {@link Function} to extract the key from a {@link Map.Entry}.
* <p>
* Same as {@code Map.Entry::getKey}.
* <p>
* Infinispan can marshall lambdas, by using {@link SerializedLambda} but it is not as efficient and ProtoStream
* marshaller.
*
* @param <K>
* @param <V>
*/
@ProtoTypeId(Marshalling.MAP_ENTRY_TO_KEY_FUNCTION)
public class MapEntryToKeyMapper<K, V> implements Function<Map.Entry<K, V>, K> {
private static final MapEntryToKeyMapper<?, ?> INSTANCE = new MapEntryToKeyMapper<>();
private MapEntryToKeyMapper() {
}
@ProtoFactory
@SuppressWarnings("unchecked")
public static <K1, V1> MapEntryToKeyMapper<K1, V1> getInstance() {
return (MapEntryToKeyMapper<K1, V1>) INSTANCE;
}
@Override
public K apply(Map.Entry<K, V> entry) {
return entry.getKey();
}
}

View file

@ -22,7 +22,6 @@ import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
@ -35,45 +34,20 @@ import java.util.stream.Stream;
public class Mappers {
public static Function<Map.Entry<String, SessionEntityWrapper<UserSessionEntity>>, UserSessionEntity> userSessionEntity() {
return new UserSessionEntityMapper();
return SessionUnwrapMapper.getInstance();
}
public static Function<Map.Entry<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>>, LoginFailureKey> loginFailureId() {
return new LoginFailureIdMapper();
}
private static class UserSessionEntityMapper implements Function<Map.Entry<String, SessionEntityWrapper<UserSessionEntity>>, UserSessionEntity>, Serializable {
@Override
public UserSessionEntity apply(Map.Entry<String, SessionEntityWrapper<UserSessionEntity>> entry) {
return entry.getValue().getEntity();
}
}
private static class LoginFailureIdMapper implements Function<Map.Entry<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>>, LoginFailureKey> {
@Override
public LoginFailureKey apply(Map.Entry<LoginFailureKey, SessionEntityWrapper<LoginFailureEntity>> entry) {
return entry.getKey();
}
}
private static class AuthClientSessionSetMapper implements Function<Map.Entry<String, SessionEntityWrapper<UserSessionEntity>>, Set<String>> {
@Override
public Set<String> apply(Map.Entry<String, SessionEntityWrapper<UserSessionEntity>> entry) {
UserSessionEntity entity = entry.getValue().getEntity();
return entity.getAuthenticatedClientSessions().keySet();
}
return MapEntryToKeyMapper.getInstance();
}
@Deprecated(since = "26.0", forRemoval = true)
public static <T> Stream<T> toStream(Collection<T> collection) {
return collection.stream();
}
public static Function<Map.Entry<String, SessionEntityWrapper<UserSessionEntity>>, Set<String>> authClientSessionSetMapper() {
return new AuthClientSessionSetMapper();
return AuthClientSessionSetMapper.getInstance();
}

View file

@ -0,0 +1,55 @@
/*
* Copyright 2024 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.stream;
import java.util.Map;
import java.util.function.Function;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoTypeId;
import org.keycloak.marshalling.Marshalling;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
/**
* A {@link Function} to unwrap the {@link SessionEntity} from the {@link SessionEntityWrapper}.
* <p>
* The {@link SessionEntityWrapper} is part of the value of {@link Map.Entry}.
*
* @param <K> The key type.
* @param <V> The value type.
*/
@ProtoTypeId(Marshalling.SESSION_UNWRAP_MAPPER)
public class SessionUnwrapMapper<K, V extends SessionEntity> implements Function<Map.Entry<K, SessionEntityWrapper<V>>, V> {
private static final SessionUnwrapMapper<?, ?> INSTANCE = new SessionUnwrapMapper<>();
private SessionUnwrapMapper() {
}
@ProtoFactory
@SuppressWarnings("unchecked")
public static <K1, V1 extends SessionEntity> SessionUnwrapMapper<K1, V1> getInstance() {
return (SessionUnwrapMapper<K1, V1>) INSTANCE;
}
@Override
public V apply(Map.Entry<K, SessionEntityWrapper<V>> entry) {
return entry.getValue().getEntity();
}
}

View file

@ -16,11 +16,24 @@
*/
package org.keycloak.testsuite.model.session;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.keycloak.common.Profile;
import org.keycloak.common.util.MultiSiteUtils;
import org.keycloak.infinispan.util.InfinispanUtils;
import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
import org.keycloak.models.Constants;
@ -39,17 +52,12 @@ import org.keycloak.testsuite.model.RequireProvider;
import org.keycloak.testsuite.model.infinispan.InfinispanTestUtil;
import org.keycloak.timer.TimerProvider;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.keycloak.testsuite.model.session.UserSessionPersisterProviderTest.createClients;
import static org.keycloak.testsuite.model.session.UserSessionPersisterProviderTest.createSessions;
@ -280,4 +288,65 @@ public class UserSessionProviderModelTest extends KeycloakModelTest {
});
});
}
@Test
public void testStreamsMarshalling() throws InterruptedException {
Assume.assumeTrue(InfinispanUtils.isEmbeddedInfinispan());
closeKeycloakSessionFactory();
var clusterSize = 4;
var barrier = new CyclicBarrier(clusterSize);
inIndependentFactories(clusterSize, 30, () -> {
// populate the cache
withRealmConsumer(realmId, (keycloakSession, realm) -> {
var user = keycloakSession.users().getUserByUsername(realm, "user1");
var client = realm.getClientByClientId("test-app");
assertNotNull(user);
assertNotNull(client);
var userSession = keycloakSession.sessions().createUserSession(null, realm, user, "user1", "127.0.0.1", "form", true, null, null, UserSessionModel.SessionPersistenceState.PERSISTENT);
assertNotNull(userSession);
var clientSession = keycloakSession.sessions().createClientSession(realm, client, userSession);
assertNotNull(clientSession);
});
try {
barrier.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (BrokenBarrierException | TimeoutException e) {
throw new RuntimeException(e);
}
withRealmConsumer(realmId, (keycloakSession, realm) -> {
var user = keycloakSession.users().getUserByUsername(realm, "user1");
assertNotNull(user);
var client = realm.getClientByClientId("test-app");
assertNotNull(client);
var activeClientSessionsStats = keycloakSession.sessions().getActiveClientSessionStats(realm, false);
assertNotNull(activeClientSessionsStats);
assertEquals(1, activeClientSessionsStats.size());
assertTrue(activeClientSessionsStats.containsKey(client.getId()));
assertEquals(4L, (long) activeClientSessionsStats.get(client.getId()));
var userSessions = keycloakSession.sessions().getUserSessionsStream(realm, user).toList();
assertNotNull(userSessions);
assertEquals(4, userSessions.size());
// sync everybody here since we are going to remove everything.
try {
barrier.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (BrokenBarrierException | TimeoutException e) {
throw new RuntimeException(e);
}
keycloakSession.sessions().removeUserSessions(realm, user);
});
});
}
}