Fix pagination issue with H6

With Hibernate ORM 6, pagination started to be unreliable: When
setting the max results only if the first row was 0 has randomly
affected other threads where first row was greater than 0. The
latter thread sometimes produced query which did *not* account
for the offset (cf. threads `-t1` and `-t2` below, while `-t2`
missed the `offset ? rows` part whic `-t3` has).

This has been fixed by setting the first row offset unconditionally.

Closes: #20202
Closes: #16570

```
2023-06-02 10:19:03.855000 TRACE [org.keycloak.models.sessions.infinispan.initializer.SessionInitializerWorker] (blocking-thread-node-2-p8-t1) Running computation for segment 0 with worker 0
2023-06-02 10:19:03.856000 TRACE [org.keycloak.models.sessions.infinispan.initializer.OfflinePersistentUserSessionLoader] (blocking-thread-node-2-p8-t1) Loading sessions for segment=0 lastSessionId=00000000-0000-0000-0000-000000000000 first=0
2023-06-02 10:19:03.856000 DEBUG [org.keycloak.models.jpa.PaginationUtils] (blocking-thread-node-2-p8-t1) Set max to 64 in org.hibernate.query.sqm.internal.QuerySqmImpl@2fb60f8b
2023-06-02 10:19:03.856000 DEBUG [org.keycloak.models.jpa.PaginationUtils] (blocking-thread-node-2-p8-t1) After pagination: 0, 64
2023-06-02 10:19:03.857000 TRACE [org.keycloak.models.sessions.infinispan.initializer.SessionInitializerWorker] (blocking-thread-node-2-p8-t2) Running computation for segment 1 with worker 1
2023-06-02 10:19:03.857000 TRACE [org.keycloak.models.sessions.infinispan.initializer.OfflinePersistentUserSessionLoader] (blocking-thread-node-2-p8-t2) Loading sessions for segment=1 lastSessionId=00000000-0000-0000-0000-000000000000 first=64
2023-06-02 10:19:03.857000 TRACE [org.keycloak.models.sessions.infinispan.initializer.SessionInitializerWorker] (blocking-thread-node-2-p8-t3) Running computation for segment 2 with worker 2
2023-06-02 10:19:03.857000 DEBUG [org.keycloak.models.jpa.PaginationUtils] (blocking-thread-node-2-p8-t2) Set first to 64 in org.hibernate.query.sqm.internal.QuerySqmImpl@71464e9f
2023-06-02 10:19:03.857000 DEBUG [org.keycloak.models.jpa.PaginationUtils] (blocking-thread-node-2-p8-t2) Set max to 64 in org.hibernate.query.sqm.internal.QuerySqmImpl@71464e9f
2023-06-02 10:19:03.857000 DEBUG [org.keycloak.models.jpa.PaginationUtils] (blocking-thread-node-2-p8-t2) After pagination: 64, 64
2023-06-02 10:19:03.857000 TRACE [org.keycloak.models.sessions.infinispan.initializer.OfflinePersistentUserSessionLoader] (blocking-thread-node-2-p8-t3) Loading sessions for segment=2 lastSessionId=00000000-0000-0000-0000-000000000000 first=128
10:19:03,859 DEBUG [org.hibernate.SQL] (blocking-thread-node-2-p8-t1)
    select
        p1_0.OFFLINE_FLAG,
        p1_0.USER_SESSION_ID,
        p1_0.CREATED_ON,
        p1_0.DATA,
        p1_0.LAST_SESSION_REFRESH,
        p1_0.REALM_ID,
        p1_0.USER_ID
    from
        OFFLINE_USER_SESSION p1_0,
        REALM r1_0
    where
        r1_0.ID=p1_0.REALM_ID
        and p1_0.OFFLINE_FLAG=?
        and p1_0.USER_SESSION_ID>?
    order by
        p1_0.USER_SESSION_ID fetch first ? rows only
10:19:03,859 DEBUG [org.hibernate.SQL] (blocking-thread-node-2-p8-t2)
    select
        p1_0.OFFLINE_FLAG,
        p1_0.USER_SESSION_ID,
        p1_0.CREATED_ON,
        p1_0.DATA,
        p1_0.LAST_SESSION_REFRESH,
        p1_0.REALM_ID,
        p1_0.USER_ID
    from
        OFFLINE_USER_SESSION p1_0,
        REALM r1_0
    where
        r1_0.ID=p1_0.REALM_ID
        and p1_0.OFFLINE_FLAG=?
        and p1_0.USER_SESSION_ID>?
    order by
        p1_0.USER_SESSION_ID fetch first ? rows only
2023-06-02 10:19:03.860000 TRACE [org.hibernate.orm.jdbc.bind] (blocking-thread-node-2-p8-t1) binding parameter [1] as [VARCHAR] - [1]
2023-06-02 10:19:03.860000 TRACE [org.hibernate.orm.jdbc.bind] (blocking-thread-node-2-p8-t1) binding parameter [2] as [VARCHAR] - [00000000-0000-0000-0000-000000000000]
2023-06-02 10:19:03.860000 TRACE [org.hibernate.orm.jdbc.bind] (blocking-thread-node-2-p8-t1) binding parameter [3] as [INTEGER] - [64]
10:19:03,860 DEBUG [org.hibernate.SQL] (blocking-thread-node-2-p8-t3)
    select
        p1_0.OFFLINE_FLAG,
        p1_0.USER_SESSION_ID,
        p1_0.CREATED_ON,
        p1_0.DATA,
        p1_0.LAST_SESSION_REFRESH,
        p1_0.REALM_ID,
        p1_0.USER_ID
    from
        OFFLINE_USER_SESSION p1_0,
        REALM r1_0
    where
        r1_0.ID=p1_0.REALM_ID
        and p1_0.OFFLINE_FLAG=?
        and p1_0.USER_SESSION_ID>?
    order by
        p1_0.USER_SESSION_ID offset ? rows fetch first ? rows only
2023-06-02 10:19:03.861000 TRACE [org.hibernate.orm.jdbc.bind] (blocking-thread-node-2-p8-t3) binding parameter [3] as [INTEGER] - [128]
2023-06-02 10:19:03.861000 TRACE [org.hibernate.orm.jdbc.bind] (blocking-thread-node-2-p8-t3) binding parameter [4] as [INTEGER] - [64]
```

Co-authored-by: mkanis <mkanis@redhat.com>
This commit is contained in:
Hynek Mlnarik 2023-06-06 14:17:54 +02:00 committed by Hynek Mlnařík
parent 7e95d57faa
commit 12dd3edb10
6 changed files with 46 additions and 9 deletions

View file

@ -94,7 +94,7 @@ public class OfflinePersistentUserSessionLoader implements SessionLoader<Offline
public OfflinePersistentWorkerResult loadSessions(KeycloakSession session, OfflinePersistentLoaderContext loaderContext, OfflinePersistentWorkerContext ctx) { public OfflinePersistentWorkerResult loadSessions(KeycloakSession session, OfflinePersistentLoaderContext loaderContext, OfflinePersistentWorkerContext ctx) {
int first = ctx.getWorkerId() * sessionsPerSegment; int first = ctx.getWorkerId() * sessionsPerSegment;
log.tracef("Loading sessions for segment=%d lastSessionId=%s", ctx.getSegment(), ctx.getLastSessionId()); log.tracef("Loading sessions for segment=%d lastSessionId=%s first=%d", ctx.getSegment(), ctx.getLastSessionId(), (Object) first);
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class); UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
List<UserSessionModel> sessions = persister List<UserSessionModel> sessions = persister

View file

@ -22,11 +22,11 @@ import jakarta.persistence.TypedQuery;
public class PaginationUtils { public class PaginationUtils {
public static final int DEFAULT_MAX_RESULTS = Integer.MAX_VALUE >> 1; public static final int DEFAULT_MAX_RESULTS = Integer.MAX_VALUE >> 1;
public static <T> TypedQuery<T> paginateQuery(TypedQuery<T> query, Integer first, Integer max) { public static <T> TypedQuery<T> paginateQuery(TypedQuery<T> query, Integer first, Integer max) {
if (first != null && first > 0) { if (first != null && first >= 0) {
query = query.setFirstResult(first); query = query.setFirstResult(first);
// Workaround for https://hibernate.atlassian.net/browse/HHH-14295 // Workaround for https://hibernate.atlassian.net/browse/HHH-14295
if (max == null || max < 0) { if (max == null || max < 0) {
max = DEFAULT_MAX_RESULTS; max = DEFAULT_MAX_RESULTS;

View file

@ -417,7 +417,6 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
* @return * @return
*/ */
private Stream<UserSessionModel> loadUserSessionsWithClientSessions(TypedQuery<PersistentUserSessionEntity> query, String offlineStr, boolean useExact) { private Stream<UserSessionModel> loadUserSessionsWithClientSessions(TypedQuery<PersistentUserSessionEntity> query, String offlineStr, boolean useExact) {
List<PersistentUserSessionAdapter> userSessionAdapters = closing(query.getResultStream() List<PersistentUserSessionAdapter> userSessionAdapters = closing(query.getResultStream()
.map(this::toAdapter) .map(this::toAdapter)
.filter(Objects::nonNull)) .filter(Objects::nonNull))
@ -463,6 +462,8 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
onClientRemoved(clientUUID); onClientRemoved(clientUUID);
} }
logger.tracef("Loaded %d user sessions (offline=%s, sessionIds=%s)", userSessionAdapters.size(), offlineStr, sessionsById.keySet());
return userSessionAdapters.stream().map(UserSessionModel.class::cast); return userSessionAdapters.stream().map(UserSessionModel.class::cast);
} }
@ -471,9 +472,12 @@ public class JpaUserSessionPersisterProvider implements UserSessionPersisterProv
PersistentAuthenticatedClientSessionAdapter clientSessAdapter = toAdapter(userSession.getRealm(), userSession, clientSessionEntity); PersistentAuthenticatedClientSessionAdapter clientSessAdapter = toAdapter(userSession.getRealm(), userSession, clientSessionEntity);
if (clientSessAdapter.getClient() == null) { if (clientSessAdapter.getClient() == null) {
logger.tracef("Not adding client session %s / %s since client is null", userSession, clientSessAdapter);
return false; return false;
} }
logger.tracef("Adding client session %s / %s", userSession, clientSessAdapter);
String clientId = clientSessionEntity.getClientId(); String clientId = clientSessionEntity.getClientId();
if (isExternalClient(clientSessionEntity)) { if (isExternalClient(clientSessionEntity)) {
clientId = getExternalClientId(clientSessionEntity); clientId = getExternalClientId(clientSessionEntity);

View file

@ -233,6 +233,11 @@ public class PersistentAuthenticatedClientSessionAdapter implements Authenticate
return getId().hashCode(); return getId().hashCode();
} }
@Override
public String toString() {
return getId();
}
protected static class PersistentClientSessionData { protected static class PersistentClientSessionData {
@JsonProperty("authMethod") @JsonProperty("authMethod")

View file

@ -257,6 +257,11 @@ public class PersistentUserSessionAdapter implements OfflineUserSessionModel {
return getId().hashCode(); return getId().hashCode();
} }
@Override
public String toString() {
return getId();
}
protected static class PersistentUserSessionData { protected static class PersistentUserSessionData {
@JsonProperty("brokerSessionId") @JsonProperty("brokerSessionId")

View file

@ -43,12 +43,15 @@ import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -57,6 +60,7 @@ import org.junit.Test;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
/** /**
* *
@ -345,16 +349,25 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest {
@RequireProvider(UserSessionPersisterProvider.class) @RequireProvider(UserSessionPersisterProvider.class)
@RequireProvider(value = UserSessionProvider.class, only = InfinispanUserSessionProviderFactory.PROVIDER_ID) @RequireProvider(value = UserSessionProvider.class, only = InfinispanUserSessionProviderFactory.PROVIDER_ID)
public void testLazyOfflineUserSessionFetching() { public void testLazyOfflineUserSessionFetching() {
List<String> offlineSessionIds = createOfflineSessions(realmId, userIds); Map<String, Set<String>> offlineSessionIdsDetailed = createOfflineSessionsDetailed(realmId, userIds);
Collection<String> offlineSessionIds = offlineSessionIdsDetailed.values().stream().flatMap(Set::stream).collect(Collectors.toCollection(TreeSet::new));
assertOfflineSessionsExist(realmId, offlineSessionIds); assertOfflineSessionsExist(realmId, offlineSessionIds);
// Simulate server restart // Simulate server restart
reinitializeKeycloakSessionFactory(); reinitializeKeycloakSessionFactory();
List<String> actualOfflineSessionIds = withRealm(realmId, (session, realm) -> session.users().getUsersStream(realm).flatMap(user -> Map<String, Set<String>> actualOfflineSessionIds = withRealm(realmId, (session, realm) -> session.users()
session.sessions().getOfflineUserSessionsStream(realm, user)).map(UserSessionModel::getId).collect(Collectors.toList())); .getUsersStream(realm)
.collect(Collectors.toMap(
UserModel::getId,
user -> session.sessions().getOfflineUserSessionsStream(realm, user).map(UserSessionModel::getId).collect(Collectors.toCollection(TreeSet::new))
))
);
assertThat(actualOfflineSessionIds, containsInAnyOrder(offlineSessionIds.toArray())); assertThat("User IDs", actualOfflineSessionIds.keySet(), equalTo(offlineSessionIdsDetailed.keySet()));
for (Entry<String, Set<String>> me : offlineSessionIdsDetailed.entrySet()) {
assertThat("Session IDs", actualOfflineSessionIds.get(me.getKey()), equalTo(me.getValue()));
}
} }
private String createOfflineClientSession(String offlineUserSessionId, String clientId) { private String createOfflineClientSession(String offlineUserSessionId, String clientId) {
@ -408,6 +421,16 @@ public class OfflineSessionPersistenceTest extends KeycloakModelTest {
); );
} }
private Map<String, Set<String>> createOfflineSessionsDetailed(String realmId, List<String> userIds) {
return withRealm(realmId, (session, realm) ->
userIds.stream()
.collect(Collectors.toMap(
Function.identity(),
userId -> createOfflineSessions(session, realm, userId, us -> {}).map(UserSessionModel::getId).collect(Collectors.toCollection(TreeSet::new))
))
);
}
/** /**
* Creates {@link #OFFLINE_SESSION_COUNT_PER_USER} offline sessions for {@code userId} user. * Creates {@link #OFFLINE_SESSION_COUNT_PER_USER} offline sessions for {@code userId} user.
*/ */