diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java index 7d65f25d08..75c2a42499 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remote/RemoteUserSessionProvider.java @@ -32,7 +32,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import io.reactivex.rxjava3.core.Flowable; -import io.reactivex.rxjava3.core.Maybe; import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.commons.util.concurrent.CompletionStages; import org.jboss.logging.Logger; @@ -383,11 +382,6 @@ public class RemoteUserSessionProvider implements UserSessionProvider { return initUserSessionUpdater(updater, UserSessionModel.SessionPersistenceState.PERSISTENT, realm, user, offline); } - private Maybe maybeInitUserSessionFromQuery(UserSessionUpdater updater, RealmModel realm, boolean offline) { - var model = initUserSessionFromQuery(updater, realm, null, offline); - return model == null ? Maybe.empty() : Maybe.just(model); - } - private UserSessionUpdater initUserSessionUpdater(UserSessionUpdater updater, UserSessionModel.SessionPersistenceState persistenceState, RealmModel realm, UserModel user, boolean offline) { if (user instanceof LightweightUserAdapter) { updater.initialize(persistenceState, realm, user, new ClientSessionMapping(updater)); @@ -462,9 +456,10 @@ public class RemoteUserSessionProvider implements UserSessionProvider { var userSessionTx = getUserSessionTransaction(offline); return Flowable.fromIterable(QueryHelper.toCollection(userSessionIdQuery, QueryHelper.SINGLE_PROJECTION_TO_STRING)) .flatMapMaybe(userSessionTx::maybeGet, false, MAX_CONCURRENT_REQUESTS) - .concatMapMaybe(session -> maybeInitUserSessionFromQuery(session, realm, offline)) - .map(UserSessionModel.class::cast) - .blockingStream(batchSize); + .blockingStream(batchSize) + .map(session -> initUserSessionFromQuery(session, realm, null, offline)) + .filter(Objects::nonNull) + .map(UserSessionModel.class::cast); } private static > T checkExpiration(T updater) {