From 1289e84cdb629cc58e24a1ceab3dd072ce0bdee7 Mon Sep 17 00:00:00 2001 From: mposolda Date: Fri, 11 Aug 2017 10:34:05 +0200 Subject: [PATCH] KEYCLOAK-4630 Refactor RemoteCacheSessionsLoader to use JS script for preload sessions through more pages --- .../InfinispanUserSessionProviderFactory.java | 7 +- .../initializer/BaseCacheInitializer.java | 4 +- .../RemoteCacheSessionsLoader.java | 73 +++++++++++++++---- .../util/cli/AbstractSessionCacheCommand.java | 55 +++++++++----- 4 files changed, 99 insertions(+), 40 deletions(-) diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java index 110a8124f8..489dd60d6d 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanUserSessionProviderFactory.java @@ -248,12 +248,12 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider private void loadSessionsFromRemoteCaches(KeycloakSession session) { for (String cacheName : remoteCacheInvoker.getRemoteCacheNames()) { - loadSessionsFromRemoteCache(session.getKeycloakSessionFactory(), cacheName, getMaxErrors()); + loadSessionsFromRemoteCache(session.getKeycloakSessionFactory(), cacheName, getSessionsPerSegment(), getMaxErrors()); } } - private void loadSessionsFromRemoteCache(final KeycloakSessionFactory sessionFactory, String cacheName, final int maxErrors) { + private void loadSessionsFromRemoteCache(final KeycloakSessionFactory sessionFactory, String cacheName, final int sessionsPerSegment, final int maxErrors) { log.debugf("Check pre-loading userSessions from remote cache '%s'", cacheName); KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() { @@ -263,8 +263,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class); Cache workCache = connections.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME); - // Use limit for sessionsPerSegment as RemoteCache bulk load doesn't have support for pagination :/ - BaseCacheInitializer initializer = new SingleWorkerCacheInitializer(session, workCache, new RemoteCacheSessionsLoader(cacheName), "remoteCacheLoad::" + cacheName); + InfinispanCacheInitializer initializer = new InfinispanCacheInitializer(sessionFactory, workCache, new RemoteCacheSessionsLoader(cacheName), "remoteCacheLoad::" + cacheName, sessionsPerSegment, maxErrors); initializer.initCache(); initializer.loadSessions(); diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/BaseCacheInitializer.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/BaseCacheInitializer.java index 43788d07fb..cca28cc656 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/BaseCacheInitializer.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/initializer/BaseCacheInitializer.java @@ -106,7 +106,7 @@ public abstract class BaseCacheInitializer extends CacheInitializer { private InitializerState getStateFromCache() { - // We ignore cacheStore for now, so that in Cross-DC scenario (with RemoteStore enabled) is the remoteStore ignored. This means that every DC needs to load offline sessions separately. + // We ignore cacheStore for now, so that in Cross-DC scenario (with RemoteStore enabled) is the remoteStore ignored. return (InitializerState) workCache.getAdvancedCache() .withFlags(Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD) .get(stateKey); @@ -122,7 +122,7 @@ public abstract class BaseCacheInitializer extends CacheInitializer { public void run() { // Save this synchronously to ensure all nodes read correct state - // We ignore cacheStore for now, so that in Cross-DC scenario (with RemoteStore enabled) is the remoteStore ignored. This means that every DC needs to load offline sessions separately. + // We ignore cacheStore for now, so that in Cross-DC scenario (with RemoteStore enabled) is the remoteStore ignored. BaseCacheInitializer.this.workCache.getAdvancedCache(). withFlags(Flag.IGNORE_RETURN_VALUES, Flag.FORCE_SYNCHRONOUS, Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD) .put(stateKey, state); diff --git a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java index 65c31bc77d..ba01b7148d 100644 --- a/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java +++ b/model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheSessionsLoader.java @@ -18,10 +18,12 @@ package org.keycloak.models.sessions.infinispan.remotestore; import java.io.Serializable; +import java.util.HashMap; import java.util.Map; import org.infinispan.Cache; import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.commons.marshall.Marshaller; import org.infinispan.context.Flag; import org.jboss.logging.Logger; import org.keycloak.connections.infinispan.InfinispanConnectionProvider; @@ -40,8 +42,33 @@ public class RemoteCacheSessionsLoader implements SessionLoader { private static final Logger log = Logger.getLogger(RemoteCacheSessionsLoader.class); - // Hardcoded limit for now. See if needs to be configurable (or if preloading can be enabled/disabled in configuration) - public static final int LIMIT = 100000; + + // Javascript to be executed on remote infinispan server (Flag CACHE_MODE_LOCAL assumes that remoteCache is replicated) + private static final String REMOTE_SCRIPT_FOR_LOAD_SESSIONS = + "function loadSessions() {" + + " var flagClazz = cache.getClass().getClassLoader().loadClass(\"org.infinispan.context.Flag\"); \n" + + " var localFlag = java.lang.Enum.valueOf(flagClazz, \"CACHE_MODE_LOCAL\"); \n" + + " var cacheStream = cache.getAdvancedCache().withFlags([ localFlag ]).entrySet().stream();\n" + + " var result = cacheStream.skip(first).limit(max).collect(java.util.stream.Collectors.toMap(\n" + + " new java.util.function.Function() {\n" + + " apply: function(entry) {\n" + + " return entry.getKey();\n" + + " }\n" + + " },\n" + + " new java.util.function.Function() {\n" + + " apply: function(entry) {\n" + + " return entry.getValue();\n" + + " }\n" + + " }\n" + + " ));\n" + + "\n" + + " cacheStream.close();\n" + + " return result;\n" + + "};\n" + + "\n" + + "loadSessions();"; + + private final String cacheName; @@ -51,7 +78,15 @@ public class RemoteCacheSessionsLoader implements SessionLoader { @Override public void init(KeycloakSession session) { + RemoteCache remoteCache = InfinispanUtil.getRemoteCache(getCache(session)); + RemoteCache scriptCache = remoteCache.getRemoteCacheManager().getCache("___script_cache"); + + if (!scriptCache.containsKey("load-sessions.js")) { + scriptCache.put("load-sessions.js", + "// mode=local,language=javascript\n" + + REMOTE_SCRIPT_FOR_LOAD_SESSIONS); + } } @Override @@ -67,21 +102,31 @@ public class RemoteCacheSessionsLoader implements SessionLoader { RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache); - int size = remoteCache.size(); - - if (size > LIMIT) { - log.infof("Skip bulk load of '%d' sessions from remote cache '%s'. Sessions will be retrieved lazily", size, cache.getName()); - return true; - } else { - log.infof("Will do bulk load of '%d' sessions from remote cache '%s'", size, cache.getName()); - } + // TODO:mposolda + log.infof("Will do bulk load of sessions from remote cache '%s' . First: %d, max: %d", cache.getName(), first, max); - for (Map.Entry entry : remoteCache.getBulk().entrySet()) { - SessionEntity entity = (SessionEntity) entry.getValue(); - SessionEntityWrapper entityWrapper = new SessionEntityWrapper(entity); + Map remoteParams = new HashMap<>(); + remoteParams.put("first", first); + remoteParams.put("max", max); + Map remoteObjects = remoteCache.execute("load-sessions.js", remoteParams); - decoratedCache.putAsync(entry.getKey(), entityWrapper); + // TODO:mposolda + log.infof("Finished loading sessions '%s' . First: %d, max: %d", cache.getName(), first, max); + + Marshaller marshaller = remoteCache.getRemoteCacheManager().getMarshaller(); + + for (Map.Entry entry : remoteObjects.entrySet()) { + try { + String key = (String) marshaller.objectFromByteBuffer(entry.getKey()); + SessionEntity entity = (SessionEntity) marshaller.objectFromByteBuffer(entry.getValue()); + + SessionEntityWrapper entityWrapper = new SessionEntityWrapper(entity); + + decoratedCache.putAsync(key, entityWrapper); + } catch (Exception e) { + log.warnf("Error loading session from remote cache", e); + } } return true; diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/util/cli/AbstractSessionCacheCommand.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/util/cli/AbstractSessionCacheCommand.java index f85a8e3cc5..8ea51af800 100644 --- a/testsuite/integration/src/test/java/org/keycloak/testsuite/util/cli/AbstractSessionCacheCommand.java +++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/util/cli/AbstractSessionCacheCommand.java @@ -17,6 +17,8 @@ package org.keycloak.testsuite.util.cli; +import java.util.function.Function; + import org.infinispan.AdvancedCache; import org.infinispan.Cache; import org.infinispan.context.Flag; @@ -25,6 +27,7 @@ import org.keycloak.connections.infinispan.InfinispanConnectionProvider; import org.keycloak.models.KeycloakSession; import org.keycloak.models.RealmModel; import org.keycloak.models.UserModel; +import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; import org.keycloak.models.sessions.infinispan.entities.SessionEntity; import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; import org.keycloak.models.utils.KeycloakModelUtils; @@ -44,8 +47,20 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand { throw new HandledException(); } - Cache ispnCache = provider.getCache(cacheName); + Cache ispnCache = provider.getCache(cacheName); doRunCacheCommand(session, ispnCache); + + ispnCache.entrySet().stream().skip(0).limit(10).collect(java.util.stream.Collectors.toMap(new java.util.function.Function() { + + public Object apply(Object entry) { + return ((java.util.Map.Entry) entry).getKey(); + } + }, new java.util.function.Function() { + + public Object apply(Object entry) { + return ((java.util.Map.Entry) entry).getValue(); + } + })); } protected void printSession(String id, UserSessionEntity userSession) { @@ -67,7 +82,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand { return getName() + " "; } - protected abstract void doRunCacheCommand(KeycloakSession session, Cache cache); + protected abstract void doRunCacheCommand(KeycloakSession session, Cache cache); // IMPLS @@ -80,7 +95,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand { } @Override - protected void doRunCacheCommand(KeycloakSession session, Cache cache) { + protected void doRunCacheCommand(KeycloakSession session, Cache cache) { UserSessionEntity userSession = new UserSessionEntity(); String id = getArg(1); @@ -88,7 +103,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand { userSession.setRealm(getArg(2)); userSession.setLastSessionRefresh(Time.currentTime()); - cache.put(id, userSession); + cache.put(id, new SessionEntityWrapper(userSession)); } @Override @@ -106,9 +121,9 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand { } @Override - protected void doRunCacheCommand(KeycloakSession session, Cache cache) { + protected void doRunCacheCommand(KeycloakSession session, Cache cache) { String id = getArg(1); - UserSessionEntity userSession = (UserSessionEntity) cache.get(id); + UserSessionEntity userSession = (UserSessionEntity) cache.get(id).getEntity(); printSession(id, userSession); } @@ -127,13 +142,13 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand { } @Override - protected void doRunCacheCommand(KeycloakSession session, Cache cache) { + protected void doRunCacheCommand(KeycloakSession session, Cache cache) { String id = getArg(1); int count = getIntArg(2); long start = System.currentTimeMillis(); for (int i=0 ; i cache) { + protected void doRunCacheCommand(KeycloakSession session, Cache cache) { String id = getArg(1); cache.remove(id); } @@ -175,7 +190,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand { } @Override - protected void doRunCacheCommand(KeycloakSession session, Cache cache) { + protected void doRunCacheCommand(KeycloakSession session, Cache cache) { cache.clear(); } } @@ -189,7 +204,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand { } @Override - protected void doRunCacheCommand(KeycloakSession session, Cache cache) { + protected void doRunCacheCommand(KeycloakSession session, Cache cache) { log.info("Size: " + cache.size()); } } @@ -203,13 +218,13 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand { } @Override - protected void doRunCacheCommand(KeycloakSession session, Cache cache) { + protected void doRunCacheCommand(KeycloakSession session, Cache cache) { for (String id : cache.keySet()) { - SessionEntity entity = cache.get(id); + SessionEntity entity = cache.get(id).getEntity(); if (!(entity instanceof UserSessionEntity)) { continue; } - UserSessionEntity userSession = (UserSessionEntity) cache.get(id); + UserSessionEntity userSession = (UserSessionEntity) cache.get(id).getEntity(); log.info("list: key=" + id + ", value=" + toString(userSession)); } } @@ -225,10 +240,10 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand { @Override - protected void doRunCacheCommand(KeycloakSession session, Cache cache) { + protected void doRunCacheCommand(KeycloakSession session, Cache cache) { String id = getArg(1); cache = ((AdvancedCache) cache).withFlags(Flag.CACHE_MODE_LOCAL); - UserSessionEntity userSession = (UserSessionEntity) cache.get(id); + UserSessionEntity userSession = (UserSessionEntity) cache.get(id).getEntity(); printSession(id, userSession); } @@ -247,7 +262,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand { } @Override - protected void doRunCacheCommand(KeycloakSession session, Cache cache) { + protected void doRunCacheCommand(KeycloakSession session, Cache cache) { log.info("Size local: " + cache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).size()); } } @@ -261,7 +276,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand { } @Override - protected void doRunCacheCommand(KeycloakSession session, Cache cache) { + protected void doRunCacheCommand(KeycloakSession session, Cache cache) { String realmName = getArg(1); int count = getIntArg(2); int batchCount = getIntArg(3); @@ -275,7 +290,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand { userSession.setRealm(realmName); userSession.setLastSessionRefresh(Time.currentTime()); - cache.put(id, userSession); + cache.put(id, new SessionEntityWrapper(userSession)); } log.infof("Created '%d' sessions started from offset '%d'", countInIteration, firstInIteration); @@ -301,7 +316,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand { } @Override - protected void doRunCacheCommand(KeycloakSession session, Cache cache) { + protected void doRunCacheCommand(KeycloakSession session, Cache cache) { String realmName = getArg(1); String username = getArg(2); int count = getIntArg(3);