Set expiry and maxidle when loading entries from the remote store (#26942)
Closes #26941 Signed-off-by: Michal Hajas <mhajas@redhat.com> Signed-off-by: Alexander Schwartz <aschwart@redhat.com> Co-authored-by: Michal Hajas <mhajas@redhat.com>
This commit is contained in:
parent
3b721512c4
commit
ee3a4a6e4f
1 changed files with 57 additions and 13 deletions
|
@ -20,8 +20,10 @@ package org.keycloak.models.sessions.infinispan.remotestore;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.infinispan.Cache;
|
import org.infinispan.Cache;
|
||||||
|
import org.infinispan.client.hotrod.MetadataValue;
|
||||||
import org.infinispan.client.hotrod.RemoteCache;
|
import org.infinispan.client.hotrod.RemoteCache;
|
||||||
import org.infinispan.commons.util.CloseableIterator;
|
import org.infinispan.commons.util.CloseableIterator;
|
||||||
import org.infinispan.context.Flag;
|
import org.infinispan.context.Flag;
|
||||||
|
@ -79,23 +81,66 @@ public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessi
|
||||||
RemoteCache<?, ?> remoteCache = getRemoteCache(session);
|
RemoteCache<?, ?> remoteCache = getRemoteCache(session);
|
||||||
|
|
||||||
int countLoaded = 0;
|
int countLoaded = 0;
|
||||||
try (CloseableIterator<Map.Entry<Object, Object>> it = remoteCache.retrieveEntries(null, loaderContext.getSessionsPerSegment())) {
|
try (CloseableIterator<Map.Entry<Object, MetadataValue<Object>>> it = remoteCache.retrieveEntriesWithMetadata(null, loaderContext.getSessionsPerSegment())) {
|
||||||
Map<Object, Object> toInsert = new HashMap<>(loaderContext.getSessionsPerSegment());
|
Map<Object, Object> toInsertExpiring = new HashMap<>(loaderContext.getSessionsPerSegment());
|
||||||
|
Map<Object, Object> toInsertImmortal = new HashMap<>(loaderContext.getSessionsPerSegment());
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
int maxLifespanExpiring = 0;
|
||||||
|
int maxIdleExpiring = -1;
|
||||||
|
int maxIdleImmortal = -1;
|
||||||
while (it.hasNext()) {
|
while (it.hasNext()) {
|
||||||
Map.Entry<?,?> entry = it.next();
|
Map.Entry<Object, MetadataValue<Object>> entry = it.next();
|
||||||
toInsert.put(entry.getKey(), entry.getValue());
|
boolean isImmortal = entry.getValue().getLifespan() < 0;
|
||||||
|
boolean shouldInsert = true;
|
||||||
|
|
||||||
|
if (!isImmortal) {
|
||||||
|
// Calculate the remaining lifetime reduced by the current time, not Keycloak time as the remote Infinispan isn't on Keycloak's clock.
|
||||||
|
// The lifetime will be larger than on the remote store for those entries, but all sessions contain timestamp which will be validated anyway.
|
||||||
|
// If we don't trust the clock calculations here, we would instead use the maxLifeSpan as is, which could enlarge the expiry time significantly.
|
||||||
|
int remainingLifespan = entry.getValue().getLifespan() - (int) ((System.currentTimeMillis() - entry.getValue().getCreated()) / 1000);
|
||||||
|
maxLifespanExpiring = Math.max(maxLifespanExpiring, remainingLifespan);
|
||||||
|
if (remainingLifespan <= 0) {
|
||||||
|
shouldInsert = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (entry.getValue().getMaxIdle() > 0) {
|
||||||
|
// The max idle time on the remote store is set to the max lifetime as remote store entries are not touched on read, and therefore would otherwise expire too early.
|
||||||
|
// Still, this is the only number we have available, so we use it.
|
||||||
|
if (isImmortal) {
|
||||||
|
maxIdleImmortal = Math.max(maxIdleImmortal, entry.getValue().getMaxIdle());
|
||||||
|
} else {
|
||||||
|
maxIdleExpiring = Math.max(maxIdleExpiring, entry.getValue().getMaxIdle());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (shouldInsert) {
|
||||||
|
(isImmortal ? toInsertImmortal : toInsertExpiring).put(entry.getKey(), entry.getValue().getValue());
|
||||||
++countLoaded;
|
++countLoaded;
|
||||||
|
}
|
||||||
|
|
||||||
if (++count == loaderContext.getSessionsPerSegment()) {
|
if (++count == loaderContext.getSessionsPerSegment()) {
|
||||||
insertSessions(decoratedCache, toInsert);
|
if (!toInsertExpiring.isEmpty()) {
|
||||||
toInsert = new HashMap<>(loaderContext.getSessionsPerSegment());
|
insertSessions(decoratedCache, toInsertExpiring, maxIdleExpiring, maxLifespanExpiring);
|
||||||
|
toInsertExpiring.clear();
|
||||||
|
maxLifespanExpiring = 0;
|
||||||
|
maxIdleExpiring = -1;
|
||||||
|
}
|
||||||
|
if (!toInsertImmortal.isEmpty()) {
|
||||||
|
insertSessions(decoratedCache, toInsertImmortal, maxIdleImmortal, -1);
|
||||||
|
toInsertImmortal.clear();
|
||||||
|
maxIdleImmortal = -1;
|
||||||
|
}
|
||||||
count = 0;
|
count = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!toInsert.isEmpty()) {
|
|
||||||
// last batch
|
// last batch
|
||||||
insertSessions(decoratedCache, toInsert);
|
if (!toInsertExpiring.isEmpty()) {
|
||||||
|
insertSessions(decoratedCache, toInsertExpiring, maxIdleExpiring, maxLifespanExpiring);
|
||||||
|
}
|
||||||
|
if (!toInsertImmortal.isEmpty()) {
|
||||||
|
insertSessions(decoratedCache, toInsertImmortal, maxIdleImmortal, -1);
|
||||||
}
|
}
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
log.warnf(e, "Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), ctx.getSegment());
|
log.warnf(e, "Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), ctx.getSegment());
|
||||||
|
@ -107,7 +152,7 @@ public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessi
|
||||||
return new WorkerResult(true, ctx.getSegment(), ctx.getWorkerId());
|
return new WorkerResult(true, ctx.getSegment(), ctx.getWorkerId());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void insertSessions(Cache<Object, Object> cache, Map<Object, Object> entries) {
|
private void insertSessions(Cache<Object, Object> cache, Map<Object, Object> entries, int maxIdle, int lifespan) {
|
||||||
log.debugf("Adding %d entries to cache '%s'", entries.size(), cacheName);
|
log.debugf("Adding %d entries to cache '%s'", entries.size(), cacheName);
|
||||||
|
|
||||||
// The `putAll` operation might time out when a node becomes unavailable, therefore, retry.
|
// The `putAll` operation might time out when a node becomes unavailable, therefore, retry.
|
||||||
|
@ -116,8 +161,7 @@ public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessi
|
||||||
DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(() -> {
|
DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(() -> {
|
||||||
// With Infinispan 14.0.21/14.0.19, we've seen deadlocks in tests where this future never completed when shutting down the internal Infinispan.
|
// With Infinispan 14.0.21/14.0.19, we've seen deadlocks in tests where this future never completed when shutting down the internal Infinispan.
|
||||||
// Therefore, prevent the shutdown of the internal Infinispan during this step.
|
// Therefore, prevent the shutdown of the internal Infinispan during this step.
|
||||||
|
cache.putAll(entries, lifespan, TimeUnit.SECONDS, maxIdle, TimeUnit.SECONDS);
|
||||||
cache.putAll(entries);
|
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
(iteration, throwable) -> log.warnf("Unable to put entries into the cache in iteration %s", iteration, throwable),
|
(iteration, throwable) -> log.warnf("Unable to put entries into the cache in iteration %s", iteration, throwable),
|
||||||
|
|
Loading…
Reference in a new issue