Call JPA code in blocking thread (#32154)

Closes #32153

Signed-off-by: Alexander Schwartz <aschwart@redhat.com>
This commit is contained in:
Alexander Schwartz 2024-08-16 10:17:30 +02:00 committed by GitHub
parent 49d2efbfb2
commit 88904c0a01
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -32,7 +32,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.jboss.logging.Logger;
@ -383,11 +382,6 @@ public class RemoteUserSessionProvider implements UserSessionProvider {
return initUserSessionUpdater(updater, UserSessionModel.SessionPersistenceState.PERSISTENT, realm, user, offline);
}
private Maybe<UserSessionUpdater> maybeInitUserSessionFromQuery(UserSessionUpdater updater, RealmModel realm, boolean offline) {
var model = initUserSessionFromQuery(updater, realm, null, offline);
return model == null ? Maybe.empty() : Maybe.just(model);
}
private UserSessionUpdater initUserSessionUpdater(UserSessionUpdater updater, UserSessionModel.SessionPersistenceState persistenceState, RealmModel realm, UserModel user, boolean offline) {
if (user instanceof LightweightUserAdapter) {
updater.initialize(persistenceState, realm, user, new ClientSessionMapping(updater));
@ -462,9 +456,10 @@ public class RemoteUserSessionProvider implements UserSessionProvider {
var userSessionTx = getUserSessionTransaction(offline);
return Flowable.fromIterable(QueryHelper.toCollection(userSessionIdQuery, QueryHelper.SINGLE_PROJECTION_TO_STRING))
.flatMapMaybe(userSessionTx::maybeGet, false, MAX_CONCURRENT_REQUESTS)
.concatMapMaybe(session -> maybeInitUserSessionFromQuery(session, realm, offline))
.map(UserSessionModel.class::cast)
.blockingStream(batchSize);
.blockingStream(batchSize)
.map(session -> initUserSessionFromQuery(session, realm, null, offline))
.filter(Objects::nonNull)
.map(UserSessionModel.class::cast);
}
private static <K, V, T extends BaseUpdater<K, V>> T checkExpiration(T updater) {