Single offlineSession imported in Infinispan with correctly calculated lifespan and maxIdle parameters
Close #8776
This commit is contained in:
parent
76101e3591
commit
91d37b5686
1 changed files with 90 additions and 26 deletions
|
@ -20,6 +20,7 @@ package org.keycloak.models.sessions.infinispan;
|
||||||
import org.infinispan.Cache;
|
import org.infinispan.Cache;
|
||||||
import org.infinispan.client.hotrod.RemoteCache;
|
import org.infinispan.client.hotrod.RemoteCache;
|
||||||
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
|
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
|
||||||
|
import org.infinispan.commons.api.BasicCache;
|
||||||
import org.infinispan.context.Flag;
|
import org.infinispan.context.Flag;
|
||||||
import org.infinispan.stream.CacheCollectors;
|
import org.infinispan.stream.CacheCollectors;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
@ -42,6 +43,7 @@ import org.keycloak.models.session.UserSessionPersisterProvider;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.Tasks;
|
import org.keycloak.models.sessions.infinispan.changes.Tasks;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
|
import org.keycloak.models.sessions.infinispan.changes.sessions.CrossDCLastSessionRefreshStore;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore;
|
import org.keycloak.models.sessions.infinispan.changes.sessions.PersisterLastSessionRefreshStore;
|
||||||
|
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||||
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
|
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
||||||
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
|
import org.keycloak.models.sessions.infinispan.changes.InfinispanChangelogBasedTransaction;
|
||||||
|
@ -72,7 +74,9 @@ import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.function.BiFunction;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
@ -848,7 +852,15 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
|
||||||
|
|
||||||
// Directly put all entities to the infinispan cache
|
// Directly put all entities to the infinispan cache
|
||||||
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = CacheDecorators.skipCacheLoaders(getCache(offline));
|
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = CacheDecorators.skipCacheLoaders(getCache(offline));
|
||||||
cache.putAll(sessionsById);
|
|
||||||
|
boolean importWithExpiration = sessionsById.size() == 1;
|
||||||
|
if (importWithExpiration) {
|
||||||
|
importSessionsWithExpiration(sessionsById, cache,
|
||||||
|
offline ? SessionTimeouts::getOfflineSessionLifespanMs : SessionTimeouts::getUserSessionLifespanMs,
|
||||||
|
offline ? SessionTimeouts::getOfflineSessionMaxIdleMs : SessionTimeouts::getUserSessionMaxIdleMs);
|
||||||
|
} else {
|
||||||
|
cache.putAll(sessionsById);
|
||||||
|
}
|
||||||
|
|
||||||
// put all entities to the remoteCache (if exists)
|
// put all entities to the remoteCache (if exists)
|
||||||
RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
|
RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
|
||||||
|
@ -857,28 +869,40 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
|
||||||
.map(SessionEntityWrapper::forTransport)
|
.map(SessionEntityWrapper::forTransport)
|
||||||
.collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));
|
.collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));
|
||||||
|
|
||||||
Retry.executeWithBackoff((int iteration) -> {
|
if (importWithExpiration) {
|
||||||
|
importSessionsWithExpiration(sessionsByIdForTransport, remoteCache,
|
||||||
|
offline ? SessionTimeouts::getOfflineSessionLifespanMs : SessionTimeouts::getUserSessionLifespanMs,
|
||||||
|
offline ? SessionTimeouts::getOfflineSessionMaxIdleMs : SessionTimeouts::getUserSessionMaxIdleMs);
|
||||||
|
} else {
|
||||||
|
Retry.executeWithBackoff((int iteration) -> {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
remoteCache.putAll(sessionsByIdForTransport);
|
remoteCache.putAll(sessionsByIdForTransport);
|
||||||
} catch (HotRodClientException re) {
|
} catch (HotRodClientException re) {
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debugf(re, "Failed to put import %d sessions to remoteCache. Iteration '%s'. Will try to retry the task",
|
log.debugf(re, "Failed to put import %d sessions to remoteCache. Iteration '%s'. Will try to retry the task",
|
||||||
sessionsByIdForTransport.size(), iteration);
|
sessionsByIdForTransport.size(), iteration);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
|
||||||
|
throw re;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
|
}, 10, 10);
|
||||||
throw re;
|
}
|
||||||
}
|
|
||||||
|
|
||||||
}, 10, 10);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Import client sessions
|
// Import client sessions
|
||||||
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessCache = offline ? offlineClientSessionCache : clientSessionCache;
|
Cache<UUID, SessionEntityWrapper<AuthenticatedClientSessionEntity>> clientSessCache = offline ? offlineClientSessionCache : clientSessionCache;
|
||||||
clientSessCache = CacheDecorators.skipCacheLoaders(clientSessCache);
|
clientSessCache = CacheDecorators.skipCacheLoaders(clientSessCache);
|
||||||
|
|
||||||
clientSessCache.putAll(clientSessionsById);
|
if (importWithExpiration) {
|
||||||
|
importSessionsWithExpiration(clientSessionsById, clientSessCache,
|
||||||
|
offline ? SessionTimeouts::getOfflineClientSessionLifespanMs : SessionTimeouts::getClientSessionLifespanMs,
|
||||||
|
offline ? SessionTimeouts::getOfflineClientSessionMaxIdleMs : SessionTimeouts::getClientSessionMaxIdleMs);
|
||||||
|
} else {
|
||||||
|
clientSessCache.putAll(clientSessionsById);
|
||||||
|
}
|
||||||
|
|
||||||
// put all entities to the remoteCache (if exists)
|
// put all entities to the remoteCache (if exists)
|
||||||
RemoteCache remoteCacheClientSessions = InfinispanUtil.getRemoteCache(clientSessCache);
|
RemoteCache remoteCacheClientSessions = InfinispanUtil.getRemoteCache(clientSessCache);
|
||||||
|
@ -887,24 +911,64 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
|
||||||
.map(SessionEntityWrapper::forTransport)
|
.map(SessionEntityWrapper::forTransport)
|
||||||
.collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));
|
.collect(Collectors.toMap(sessionEntityWrapper -> sessionEntityWrapper.getEntity().getId(), Function.identity()));
|
||||||
|
|
||||||
Retry.executeWithBackoff((int iteration) -> {
|
if (importWithExpiration) {
|
||||||
|
importSessionsWithExpiration(sessionsByIdForTransport, remoteCacheClientSessions,
|
||||||
|
offline ? SessionTimeouts::getOfflineClientSessionLifespanMs : SessionTimeouts::getClientSessionLifespanMs,
|
||||||
|
offline ? SessionTimeouts::getOfflineClientSessionMaxIdleMs : SessionTimeouts::getClientSessionMaxIdleMs);
|
||||||
|
} else {
|
||||||
|
Retry.executeWithBackoff((int iteration) -> {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
remoteCacheClientSessions.putAll(sessionsByIdForTransport);
|
remoteCacheClientSessions.putAll(sessionsByIdForTransport);
|
||||||
} catch (HotRodClientException re) {
|
} catch (HotRodClientException re) {
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debugf(re, "Failed to put import %d client sessions to remoteCache. Iteration '%s'. Will try to retry the task",
|
log.debugf(re, "Failed to put import %d client sessions to remoteCache. Iteration '%s'. Will try to retry the task",
|
||||||
sessionsByIdForTransport.size(), iteration);
|
sessionsByIdForTransport.size(), iteration);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
|
||||||
|
throw re;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
|
}, 10, 10);
|
||||||
throw re;
|
}
|
||||||
}
|
|
||||||
|
|
||||||
}, 10, 10);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private <T extends SessionEntity> void importSessionsWithExpiration(Map<? extends Object, SessionEntityWrapper<T>> sessionsById,
|
||||||
|
BasicCache cache, BiFunction<RealmModel, T, Long> lifespanMsCalculator,
|
||||||
|
BiFunction<RealmModel, T, Long> maxIdleTimeMsCalculator) {
|
||||||
|
sessionsById.forEach((id, sessionEntityWrapper) -> {
|
||||||
|
|
||||||
|
T sessionEntity = sessionEntityWrapper.getEntity();
|
||||||
|
RealmModel currentRealm = session.realms().getRealm(sessionEntity.getRealmId());
|
||||||
|
long lifespan = lifespanMsCalculator.apply(currentRealm, sessionEntity);
|
||||||
|
long maxIdle = maxIdleTimeMsCalculator.apply(currentRealm, sessionEntity);
|
||||||
|
|
||||||
|
if(lifespan != SessionTimeouts.ENTRY_EXPIRED_FLAG
|
||||||
|
&& maxIdle != SessionTimeouts.ENTRY_EXPIRED_FLAG ) {
|
||||||
|
if (cache instanceof RemoteCache) {
|
||||||
|
Retry.executeWithBackoff((int iteration) -> {
|
||||||
|
|
||||||
|
try {
|
||||||
|
cache.put(id, sessionEntityWrapper, lifespan, TimeUnit.MILLISECONDS, maxIdle, TimeUnit.MILLISECONDS);
|
||||||
|
} catch (HotRodClientException re) {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debugf(re, "Failed to put import %d sessions to remoteCache. Iteration '%s'. Will try to retry the task",
|
||||||
|
sessionsById.size(), iteration);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
|
||||||
|
throw re;
|
||||||
|
}
|
||||||
|
|
||||||
|
}, 10, 10);
|
||||||
|
} else {
|
||||||
|
cache.put(id, sessionEntityWrapper, lifespan, TimeUnit.MILLISECONDS, maxIdle, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Imports just userSession without it's clientSessions
|
// Imports just userSession without it's clientSessions
|
||||||
protected UserSessionAdapter importUserSession(UserSessionModel userSession, boolean offline) {
|
protected UserSessionAdapter importUserSession(UserSessionModel userSession, boolean offline) {
|
||||||
|
|
Loading…
Reference in a new issue