KEYCLOAK-8904 Backpressure in RemoteCacheSessionsLoader

This commit is contained in:
mposolda 2018-12-10 14:38:12 +01:00 committed by Marek Posolda
parent 40071a95da
commit 04445c8a23

View file

@ -18,6 +18,7 @@
package org.keycloak.models.sessions.infinispan.remotestore; package org.keycloak.models.sessions.infinispan.remotestore;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -114,6 +115,7 @@ public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessi
log.debugf("Will do bulk load of sessions from remote cache '%s' . Segment: %d", cache.getName(), ctx.getSegment()); log.debugf("Will do bulk load of sessions from remote cache '%s' . Segment: %d", cache.getName(), ctx.getSegment());
Map<Object, Object> remoteEntries = new HashMap<>();
CloseableIterator<Map.Entry> iterator = null; CloseableIterator<Map.Entry> iterator = null;
int countLoaded = 0; int countLoaded = 0;
try { try {
@ -121,7 +123,7 @@ public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessi
while (iterator.hasNext()) { while (iterator.hasNext()) {
countLoaded++; countLoaded++;
Map.Entry entry = iterator.next(); Map.Entry entry = iterator.next();
decoratedCache.putAsync(entry.getKey(), entry.getValue()); remoteEntries.put(entry.getKey(), entry.getValue());
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
log.warnf(e, "Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), ctx.getSegment()); log.warnf(e, "Error loading sessions from remote cache '%s' for segment '%d'", remoteCache.getName(), ctx.getSegment());
@ -132,6 +134,8 @@ public class RemoteCacheSessionsLoader implements SessionLoader<RemoteCacheSessi
} }
} }
decoratedCache.putAll(remoteEntries);
log.debugf("Successfully finished loading sessions from cache '%s' . Segment: %d, Count of sessions loaded: %d", cache.getName(), ctx.getSegment(), countLoaded); log.debugf("Successfully finished loading sessions from cache '%s' . Segment: %d, Count of sessions loaded: %d", cache.getName(), ctx.getSegment(), countLoaded);
return new WorkerResult(true, ctx.getSegment(), ctx.getWorkerId()); return new WorkerResult(true, ctx.getSegment(), ctx.getWorkerId());