Merge pull request #4382 from mposolda/ispn-clientListeners-bugs
KEYCLOAK-4630 Improve preloading sessions at startup. Added SessionsPreloadCrossDCTest
This commit is contained in:
commit
6ff118e95c
11 changed files with 408 additions and 48 deletions
|
@ -248,12 +248,12 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
|||
|
||||
private void loadSessionsFromRemoteCaches(KeycloakSession session) {
|
||||
for (String cacheName : remoteCacheInvoker.getRemoteCacheNames()) {
|
||||
loadSessionsFromRemoteCache(session.getKeycloakSessionFactory(), cacheName, getMaxErrors());
|
||||
loadSessionsFromRemoteCache(session.getKeycloakSessionFactory(), cacheName, getSessionsPerSegment(), getMaxErrors());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void loadSessionsFromRemoteCache(final KeycloakSessionFactory sessionFactory, String cacheName, final int maxErrors) {
|
||||
private void loadSessionsFromRemoteCache(final KeycloakSessionFactory sessionFactory, String cacheName, final int sessionsPerSegment, final int maxErrors) {
|
||||
log.debugf("Check pre-loading userSessions from remote cache '%s'", cacheName);
|
||||
|
||||
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
|
||||
|
@ -263,8 +263,7 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
|||
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
|
||||
Cache<String, Serializable> workCache = connections.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
|
||||
|
||||
// Use limit for sessionsPerSegment as RemoteCache bulk load doesn't have support for pagination :/
|
||||
BaseCacheInitializer initializer = new SingleWorkerCacheInitializer(session, workCache, new RemoteCacheSessionsLoader(cacheName), "remoteCacheLoad::" + cacheName);
|
||||
InfinispanCacheInitializer initializer = new InfinispanCacheInitializer(sessionFactory, workCache, new RemoteCacheSessionsLoader(cacheName), "remoteCacheLoad::" + cacheName, sessionsPerSegment, maxErrors);
|
||||
|
||||
initializer.initCache();
|
||||
initializer.loadSessions();
|
||||
|
|
|
@ -106,7 +106,7 @@ public abstract class BaseCacheInitializer extends CacheInitializer {
|
|||
|
||||
|
||||
private InitializerState getStateFromCache() {
|
||||
// We ignore cacheStore for now, so that in Cross-DC scenario (with RemoteStore enabled) is the remoteStore ignored. This means that every DC needs to load offline sessions separately.
|
||||
// We ignore cacheStore for now, so that in Cross-DC scenario (with RemoteStore enabled) is the remoteStore ignored.
|
||||
return (InitializerState) workCache.getAdvancedCache()
|
||||
.withFlags(Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD)
|
||||
.get(stateKey);
|
||||
|
@ -122,7 +122,7 @@ public abstract class BaseCacheInitializer extends CacheInitializer {
|
|||
public void run() {
|
||||
|
||||
// Save this synchronously to ensure all nodes read correct state
|
||||
// We ignore cacheStore for now, so that in Cross-DC scenario (with RemoteStore enabled) is the remoteStore ignored. This means that every DC needs to load offline sessions separately.
|
||||
// We ignore cacheStore for now, so that in Cross-DC scenario (with RemoteStore enabled) is the remoteStore ignored.
|
||||
BaseCacheInitializer.this.workCache.getAdvancedCache().
|
||||
withFlags(Flag.IGNORE_RETURN_VALUES, Flag.FORCE_SYNCHRONOUS, Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD)
|
||||
.put(stateKey, state);
|
||||
|
|
|
@ -35,7 +35,7 @@ import java.util.concurrent.Future;
|
|||
* Startup initialization for reading persistent userSessions to be filled into infinispan/memory . In cluster,
|
||||
* the initialization is distributed among all cluster nodes, so the startup time is even faster
|
||||
*
|
||||
* TODO: Move to clusterService. Implementation is already pretty generic and doesn't contain any "userSession" specific stuff. All sessions-specific logic is in the SessionLoader implementation
|
||||
* Implementation is pretty generic and doesn't contain any "userSession" specific stuff. All logic related to how are sessions loaded is in the SessionLoader implementation
|
||||
*
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
|
|
|
@ -18,10 +18,12 @@
|
|||
package org.keycloak.models.sessions.infinispan.remotestore;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.infinispan.Cache;
|
||||
import org.infinispan.client.hotrod.RemoteCache;
|
||||
import org.infinispan.commons.marshall.Marshaller;
|
||||
import org.infinispan.context.Flag;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||
|
@ -40,8 +42,33 @@ public class RemoteCacheSessionsLoader implements SessionLoader {
|
|||
|
||||
private static final Logger log = Logger.getLogger(RemoteCacheSessionsLoader.class);
|
||||
|
||||
// Hardcoded limit for now. See if needs to be configurable (or if preloading can be enabled/disabled in configuration)
|
||||
public static final int LIMIT = 100000;
|
||||
|
||||
// Javascript to be executed on remote infinispan server (Flag CACHE_MODE_LOCAL assumes that remoteCache is replicated)
|
||||
private static final String REMOTE_SCRIPT_FOR_LOAD_SESSIONS =
|
||||
"function loadSessions() {" +
|
||||
" var flagClazz = cache.getClass().getClassLoader().loadClass(\"org.infinispan.context.Flag\"); \n" +
|
||||
" var localFlag = java.lang.Enum.valueOf(flagClazz, \"CACHE_MODE_LOCAL\"); \n" +
|
||||
" var cacheStream = cache.getAdvancedCache().withFlags([ localFlag ]).entrySet().stream();\n" +
|
||||
" var result = cacheStream.skip(first).limit(max).collect(java.util.stream.Collectors.toMap(\n" +
|
||||
" new java.util.function.Function() {\n" +
|
||||
" apply: function(entry) {\n" +
|
||||
" return entry.getKey();\n" +
|
||||
" }\n" +
|
||||
" },\n" +
|
||||
" new java.util.function.Function() {\n" +
|
||||
" apply: function(entry) {\n" +
|
||||
" return entry.getValue();\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" ));\n" +
|
||||
"\n" +
|
||||
" cacheStream.close();\n" +
|
||||
" return result;\n" +
|
||||
"};\n" +
|
||||
"\n" +
|
||||
"loadSessions();";
|
||||
|
||||
|
||||
|
||||
private final String cacheName;
|
||||
|
||||
|
@ -51,7 +78,15 @@ public class RemoteCacheSessionsLoader implements SessionLoader {
|
|||
|
||||
@Override
|
||||
public void init(KeycloakSession session) {
|
||||
RemoteCache remoteCache = InfinispanUtil.getRemoteCache(getCache(session));
|
||||
|
||||
RemoteCache<String, String> scriptCache = remoteCache.getRemoteCacheManager().getCache("___script_cache");
|
||||
|
||||
if (!scriptCache.containsKey("load-sessions.js")) {
|
||||
scriptCache.put("load-sessions.js",
|
||||
"// mode=local,language=javascript\n" +
|
||||
REMOTE_SCRIPT_FOR_LOAD_SESSIONS);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -67,21 +102,28 @@ public class RemoteCacheSessionsLoader implements SessionLoader {
|
|||
|
||||
RemoteCache<?, ?> remoteCache = InfinispanUtil.getRemoteCache(cache);
|
||||
|
||||
int size = remoteCache.size();
|
||||
log.debugf("Will do bulk load of sessions from remote cache '%s' . First: %d, max: %d", cache.getName(), first, max);
|
||||
|
||||
if (size > LIMIT) {
|
||||
log.infof("Skip bulk load of '%d' sessions from remote cache '%s'. Sessions will be retrieved lazily", size, cache.getName());
|
||||
return true;
|
||||
} else {
|
||||
log.infof("Will do bulk load of '%d' sessions from remote cache '%s'", size, cache.getName());
|
||||
}
|
||||
Map<String, Integer> remoteParams = new HashMap<>();
|
||||
remoteParams.put("first", first);
|
||||
remoteParams.put("max", max);
|
||||
Map<byte[], byte[]> remoteObjects = remoteCache.execute("load-sessions.js", remoteParams);
|
||||
|
||||
log.debugf("Successfully finished loading sessions '%s' . First: %d, max: %d", cache.getName(), first, max);
|
||||
|
||||
for (Map.Entry<?, ?> entry : remoteCache.getBulk().entrySet()) {
|
||||
SessionEntity entity = (SessionEntity) entry.getValue();
|
||||
SessionEntityWrapper entityWrapper = new SessionEntityWrapper(entity);
|
||||
Marshaller marshaller = remoteCache.getRemoteCacheManager().getMarshaller();
|
||||
|
||||
decoratedCache.putAsync(entry.getKey(), entityWrapper);
|
||||
for (Map.Entry<byte[], byte[]> entry : remoteObjects.entrySet()) {
|
||||
try {
|
||||
String key = (String) marshaller.objectFromByteBuffer(entry.getKey());
|
||||
SessionEntity entity = (SessionEntity) marshaller.objectFromByteBuffer(entry.getValue());
|
||||
|
||||
SessionEntityWrapper entityWrapper = new SessionEntityWrapper(entity);
|
||||
|
||||
decoratedCache.putAsync(key, entityWrapper);
|
||||
} catch (Exception e) {
|
||||
log.warnf("Error loading session from remote cache", e);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
|
|
|
@ -469,6 +469,16 @@ It can be useful to add additional system property to enable logging:
|
|||
|
||||
-Dkeycloak.infinispan.logging.level=debug
|
||||
|
||||
Tests from package "manual" uses manual lifecycle for all servers, so needs to be executed manually. Also needs to be executed with real DB like MySQL. You can run them with:
|
||||
|
||||
mvn -Pcache-server-infinispan -Dtest=*.crossdc.manual.* -Dmanual.mode=true \
|
||||
-Dkeycloak.connectionsJpa.url.crossdc=jdbc:mysql://localhost/keycloak -Dkeycloak.connectionsJpa.driver.crossdc=com.mysql.jdbc.Driver \
|
||||
-Dkeycloak.connectionsJpa.user=keycloak -Dkeycloak.connectionsJpa.password=keycloak \
|
||||
-pl testsuite/integration-arquillian/tests/base test
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#### Run Cross-DC Tests from Intellij IDEA
|
||||
|
@ -512,6 +522,9 @@ connects to the remoteStore provided by infinispan server configured in previous
|
|||
-Dkeycloak.connectionsInfinispan.remoteStorePort=11222 -Dkeycloak.connectionsInfinispan.remoteStorePort.2=11222 -Dkeycloak.connectionsInfinispan.sessionsOwners=1
|
||||
-Dsession.cache.owners=1 -Dkeycloak.infinispan.logging.level=debug -Dresources
|
||||
|
||||
NOTE: Tests from package "manual" (eg. SessionsPreloadCrossDCTest) needs to be executed with managed containers.
|
||||
So skip steps 1,2 and add property `-Dmanual.mode=true` and change "cache.server.lifecycle.skip" to false `-Dcache.server.lifecycle.skip=false` or remove it.
|
||||
|
||||
7) If you want to debug and test manually, the servers are running on these ports (Note that not all backend servers are running by default and some might be also unused by loadbalancer):
|
||||
|
||||
Loadbalancer -> "http://localhost:8180/auth"
|
||||
|
|
|
@ -195,7 +195,13 @@ public class SimpleUndertowLoadBalancer {
|
|||
@Override
|
||||
protected Host selectHost(HttpServerExchange exchange) {
|
||||
Host host = super.selectHost(exchange);
|
||||
log.debugf("Selected host: %s, host available: %b", host.getUri().toString(), host.isAvailable());
|
||||
|
||||
if (host != null) {
|
||||
log.debugf("Selected host: %s, host available: %b", host.getUri().toString(), host.isAvailable());
|
||||
} else {
|
||||
log.warn("No host available");
|
||||
}
|
||||
|
||||
exchange.putAttachment(SELECTED_HOST, host);
|
||||
return host;
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ import javax.ws.rs.NotFoundException;
|
|||
*/
|
||||
public class AuthServerTestEnricher {
|
||||
|
||||
protected final Logger log = Logger.getLogger(this.getClass());
|
||||
protected static final Logger log = Logger.getLogger(AuthServerTestEnricher.class);
|
||||
|
||||
@Inject
|
||||
private Instance<ContainerRegistry> containerRegistry;
|
||||
|
@ -84,6 +84,10 @@ public class AuthServerTestEnricher {
|
|||
private static final Boolean START_MIGRATION_CONTAINER = "auto".equals(System.getProperty("migration.mode")) ||
|
||||
"manual".equals(System.getProperty("migration.mode"));
|
||||
|
||||
// In manual mode are all containers despite loadbalancers started in mode "manual" and nothing is managed through "suite".
|
||||
// Useful for tests, which require restart servers etc.
|
||||
private static final String MANUAL_MODE = "manual.mode";
|
||||
|
||||
@Inject
|
||||
@SuiteScoped
|
||||
private InstanceProducer<SuiteContext> suiteContextProducer;
|
||||
|
@ -118,6 +122,9 @@ public class AuthServerTestEnricher {
|
|||
.map(ContainerInfo::new)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
// A way to specify that containers should be in mode "manual" rather then "suite"
|
||||
checkManualMode(containers);
|
||||
|
||||
suiteContext = new SuiteContext(containers);
|
||||
|
||||
if (AUTH_SERVER_CROSS_DC) {
|
||||
|
@ -148,6 +155,15 @@ public class AuthServerTestEnricher {
|
|||
suiteContext.addAuthServerBackendsInfo(Integer.valueOf(dcString), c);
|
||||
});
|
||||
|
||||
containers.stream()
|
||||
.filter(c -> c.getQualifier().startsWith("cache-server-cross-dc-"))
|
||||
.sorted((a, b) -> a.getQualifier().compareTo(b.getQualifier()))
|
||||
.forEach(containerInfo -> {
|
||||
int prefixSize = "cache-server-cross-dc-".length();
|
||||
int dcIndex = Integer.parseInt(containerInfo.getQualifier().substring(prefixSize)) -1;
|
||||
suiteContext.addCacheServerInfo(dcIndex, containerInfo);
|
||||
});
|
||||
|
||||
if (suiteContext.getDcAuthServerInfo().isEmpty()) {
|
||||
throw new RuntimeException(String.format("No auth server container matching '%s' found in arquillian.xml.", AUTH_SERVER_BACKEND));
|
||||
}
|
||||
|
@ -157,6 +173,9 @@ public class AuthServerTestEnricher {
|
|||
if (suiteContext.getDcAuthServerBackendsInfo().stream().anyMatch(List::isEmpty)) {
|
||||
throw new RuntimeException(String.format("Some data center has no auth server container matching '%s' defined in arquillian.xml.", AUTH_SERVER_BACKEND));
|
||||
}
|
||||
if (suiteContext.getCacheServersInfo().isEmpty()) {
|
||||
throw new IllegalStateException("Cache containers misconfiguration");
|
||||
}
|
||||
|
||||
log.info("Using frontend containers: " + this.suiteContext.getDcAuthServerInfo().stream()
|
||||
.map(ContainerInfo::getQualifier)
|
||||
|
@ -270,10 +289,23 @@ public class AuthServerTestEnricher {
|
|||
public void afterClass(@Observes(precedence = 2) AfterClass event) {
|
||||
TestContext testContext = testContextProducer.get();
|
||||
|
||||
List<RealmRepresentation> testRealmReps = testContext.getTestRealmReps();
|
||||
Keycloak adminClient = testContext.getAdminClient();
|
||||
KeycloakTestingClient testingClient = testContext.getTestingClient();
|
||||
|
||||
removeTestRealms(testContext, adminClient);
|
||||
|
||||
if (adminClient != null) {
|
||||
adminClient.close();
|
||||
}
|
||||
|
||||
if (testingClient != null) {
|
||||
testingClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static void removeTestRealms(TestContext testContext, Keycloak adminClient) {
|
||||
List<RealmRepresentation> testRealmReps = testContext.getTestRealmReps();
|
||||
if (testRealmReps != null) {
|
||||
log.info("removing test realms after test class");
|
||||
for (RealmRepresentation testRealm : testRealmReps) {
|
||||
|
@ -286,13 +318,20 @@ public class AuthServerTestEnricher {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (adminClient != null) {
|
||||
adminClient.close();
|
||||
}
|
||||
|
||||
if (testingClient != null) {
|
||||
testingClient.close();
|
||||
private void checkManualMode(Set<ContainerInfo> containers) {
|
||||
String manualMode = System.getProperty(MANUAL_MODE);
|
||||
|
||||
if (Boolean.parseBoolean(manualMode)) {
|
||||
|
||||
containers.stream()
|
||||
.filter(containerInfo -> !containerInfo.getQualifier().contains("balancer"))
|
||||
.forEach(containerInfo -> {
|
||||
log.infof("Container '%s' will be in manual mode", containerInfo.getQualifier());
|
||||
containerInfo.getArquillianContainer().getContainerConfiguration().setMode("manual");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -40,6 +40,8 @@ public final class SuiteContext {
|
|||
private List<ContainerInfo> authServerInfo = new LinkedList<>();
|
||||
private final List<List<ContainerInfo>> authServerBackendsInfo = new ArrayList<>();
|
||||
|
||||
private final List<ContainerInfo> cacheServersInfo = new ArrayList<>();
|
||||
|
||||
private ContainerInfo migratedAuthServerInfo;
|
||||
private final MigrationContext migrationContext = new MigrationContext();
|
||||
|
||||
|
@ -96,6 +98,13 @@ public final class SuiteContext {
|
|||
this.authServerInfo.set(dcIndex, serverInfo);
|
||||
}
|
||||
|
||||
public void addCacheServerInfo(int dcIndex, ContainerInfo serverInfo) {
|
||||
while (dcIndex >= cacheServersInfo.size()) {
|
||||
cacheServersInfo.add(null);
|
||||
}
|
||||
this.cacheServersInfo.set(dcIndex, serverInfo);
|
||||
}
|
||||
|
||||
public List<ContainerInfo> getAuthServerBackendsInfo() {
|
||||
return getAuthServerBackendsInfo(0);
|
||||
}
|
||||
|
@ -108,6 +117,10 @@ public final class SuiteContext {
|
|||
return authServerBackendsInfo;
|
||||
}
|
||||
|
||||
public List<ContainerInfo> getCacheServersInfo() {
|
||||
return cacheServersInfo;
|
||||
}
|
||||
|
||||
public void addAuthServerBackendsInfo(int dcIndex, ContainerInfo container) {
|
||||
while (dcIndex >= authServerBackendsInfo.size()) {
|
||||
authServerBackendsInfo.add(new LinkedList<>());
|
||||
|
@ -161,6 +174,10 @@ public final class SuiteContext {
|
|||
int dcIndex = i;
|
||||
getDcAuthServerBackendsInfo().get(i).forEach(bInfo -> sb.append("Backend (dc=").append(dcIndex).append("): ").append(bInfo).append("\n"));
|
||||
}
|
||||
|
||||
for (int dcIndex=0 ; dcIndex<cacheServersInfo.size() ; dcIndex++) {
|
||||
sb.append("CacheServer (dc=").append(dcIndex).append("): ").append(getCacheServersInfo().get(dcIndex)).append("\n");
|
||||
}
|
||||
} else if (isAuthServerCluster()) {
|
||||
sb.append(isAuthServerCluster() ? "\nFrontend: " : "")
|
||||
.append(getAuthServerInfo().getQualifier())
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.keycloak.testsuite.crossdc;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.keycloak.admin.client.Keycloak;
|
||||
import org.keycloak.models.Constants;
|
||||
import org.keycloak.testsuite.AbstractTestRealmKeycloakTest;
|
||||
|
@ -24,6 +25,8 @@ import org.keycloak.testsuite.arquillian.LoadBalancerController;
|
|||
import org.keycloak.testsuite.arquillian.annotation.LoadBalancer;
|
||||
import org.keycloak.testsuite.auth.page.AuthRealm;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -321,6 +324,45 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest
|
|||
return dcNodes.stream().filter(c -> ! c.isManual());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns cache server corresponding to given DC
|
||||
* @param dc
|
||||
* @return
|
||||
*/
|
||||
public ContainerInfo getCacheServer(DC dc) {
|
||||
int dcIndex = dc.ordinal();
|
||||
return this.suiteContext.getCacheServersInfo().get(dcIndex);
|
||||
}
|
||||
|
||||
|
||||
public void stopCacheServer(ContainerInfo cacheServer) {
|
||||
log.infof("Stopping %s", cacheServer.getQualifier());
|
||||
|
||||
containerController.stop(cacheServer.getQualifier());
|
||||
|
||||
// Workaround for possible arquillian bug. Needs to cleanup dir manually
|
||||
String setupCleanServerBaseDir = cacheServer.getArquillianContainer().getContainerConfiguration().getContainerProperties().get("setupCleanServerBaseDir");
|
||||
String cleanServerBaseDir = cacheServer.getArquillianContainer().getContainerConfiguration().getContainerProperties().get("cleanServerBaseDir");
|
||||
|
||||
if (Boolean.parseBoolean(setupCleanServerBaseDir)) {
|
||||
log.infof("Going to clean directory: %s", cleanServerBaseDir);
|
||||
|
||||
File dir = new File(cleanServerBaseDir);
|
||||
if (dir.exists()) {
|
||||
try {
|
||||
FileUtils.cleanDirectory(dir);
|
||||
|
||||
File deploymentsDir = new File(dir, "deployments");
|
||||
deploymentsDir.mkdir();
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException("Failed to clean directory: " + cleanServerBaseDir, ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.infof("Stopped %s", cacheServer.getQualifier());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Sets time offset on all the started containers.
|
||||
|
|
|
@ -0,0 +1,187 @@
|
|||
/*
|
||||
* Copyright 2017 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.testsuite.crossdc.manual;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.keycloak.OAuth2Constants;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||
import org.keycloak.testsuite.Assert;
|
||||
import org.keycloak.testsuite.arquillian.AuthServerTestEnricher;
|
||||
import org.keycloak.testsuite.crossdc.AbstractAdminCrossDCTest;
|
||||
import org.keycloak.testsuite.crossdc.DC;
|
||||
import org.keycloak.testsuite.util.OAuthClient;
|
||||
|
||||
/**
|
||||
* Tests userSessions and offline sessions preloading at startup
|
||||
*
|
||||
* This test requires that lifecycle of infinispan/JDG servers is managed by testsuite, so you need to run with:
|
||||
*
|
||||
* -Dmanual.mode=true
|
||||
*
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class SessionsPreloadCrossDCTest extends AbstractAdminCrossDCTest {
|
||||
|
||||
private static final int SESSIONS_COUNT = 10;
|
||||
|
||||
@Override
|
||||
public void beforeAbstractKeycloakTest() throws Exception {
|
||||
// Doublecheck we are in manual mode
|
||||
Assert.assertTrue("The test requires to be executed with manual.mode=true", suiteContext.getCacheServersInfo().get(0).isManual());
|
||||
|
||||
stopAllCacheServersAndAuthServers();
|
||||
|
||||
// Start DC1 only
|
||||
containerController.start(getCacheServer(DC.FIRST).getQualifier());
|
||||
startBackendNode(DC.FIRST, 0);
|
||||
enableLoadBalancerNode(DC.FIRST, 0);
|
||||
|
||||
super.beforeAbstractKeycloakTest();
|
||||
}
|
||||
|
||||
|
||||
// Override as we are in manual mode
|
||||
@Override
|
||||
public void enableOnlyFirstNodeInFirstDc() {
|
||||
}
|
||||
|
||||
|
||||
// Override as we are in manual mode
|
||||
@Override
|
||||
public void terminateManuallyStartedServers() {
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void afterAbstractKeycloakTest() {
|
||||
super.afterAbstractKeycloakTest();
|
||||
|
||||
// Remove realms now. In @AfterClass servers are already shutdown
|
||||
AuthServerTestEnricher.removeTestRealms(testContext, adminClient);
|
||||
testContext.setTestRealmReps(null);
|
||||
|
||||
adminClient.close();
|
||||
adminClient = null;
|
||||
testContext.setAdminClient(null);
|
||||
|
||||
stopAllCacheServersAndAuthServers();
|
||||
}
|
||||
|
||||
private void stopAllCacheServersAndAuthServers() {
|
||||
log.infof("Going to stop all auth servers");
|
||||
|
||||
stopBackendNode(DC.FIRST, 0);
|
||||
disableLoadBalancerNode(DC.FIRST, 0);
|
||||
stopBackendNode(DC.SECOND, 0);
|
||||
disableLoadBalancerNode(DC.SECOND, 0);
|
||||
|
||||
log.infof("Auth servers stopped successfully. Going to stop all cache servers");
|
||||
|
||||
suiteContext.getCacheServersInfo().stream()
|
||||
.filter(containerInfo -> containerInfo.isStarted())
|
||||
.forEach(containerInfo -> {
|
||||
stopCacheServer(containerInfo);
|
||||
});
|
||||
|
||||
log.infof("Cache servers stopped successfully");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void sessionsPreloadTest() throws Exception {
|
||||
int sessionsBefore = getTestingClientForStartedNodeInDc(0).testing().cache(InfinispanConnectionProvider.SESSION_CACHE_NAME).size();
|
||||
log.infof("sessionsBefore: %d", sessionsBefore);
|
||||
|
||||
// Create initial sessions
|
||||
createInitialSessions(false);
|
||||
|
||||
// Start 2nd DC.
|
||||
containerController.start(getCacheServer(DC.SECOND).getQualifier());
|
||||
startBackendNode(DC.SECOND, 0);
|
||||
enableLoadBalancerNode(DC.SECOND, 0);
|
||||
|
||||
// Ensure sessions are loaded in both 1st DC and 2nd DC
|
||||
int sessions01 = getTestingClientForStartedNodeInDc(0).testing().cache(InfinispanConnectionProvider.SESSION_CACHE_NAME).size();
|
||||
int sessions02 = getTestingClientForStartedNodeInDc(1).testing().cache(InfinispanConnectionProvider.SESSION_CACHE_NAME).size();
|
||||
log.infof("sessions01: %d, sessions02: %d", sessions01, sessions02);
|
||||
Assert.assertEquals(sessions01, sessionsBefore + SESSIONS_COUNT);
|
||||
Assert.assertEquals(sessions02, sessionsBefore + SESSIONS_COUNT);
|
||||
|
||||
// On DC2 sessions were preloaded from from remoteCache
|
||||
Assert.assertTrue(getTestingClientForStartedNodeInDc(1).testing().cache(InfinispanConnectionProvider.WORK_CACHE_NAME).contains("distributed::remoteCacheLoad::sessions"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void offlineSessionsPreloadTest() throws Exception {
|
||||
int offlineSessionsBefore = getTestingClientForStartedNodeInDc(0).testing().cache(InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME).size();
|
||||
log.infof("offlineSessionsBefore: %d", offlineSessionsBefore);
|
||||
|
||||
// Create initial sessions
|
||||
createInitialSessions(true);
|
||||
|
||||
int offlineSessions01 = getTestingClientForStartedNodeInDc(0).testing().cache(InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME).size();
|
||||
Assert.assertEquals(offlineSessions01, offlineSessionsBefore + SESSIONS_COUNT);
|
||||
log.infof("offlineSessions01: %d", offlineSessions01);
|
||||
|
||||
// Stop Everything
|
||||
stopAllCacheServersAndAuthServers();
|
||||
|
||||
// Start DC1. Sessions should be preloaded from DB
|
||||
containerController.start(getCacheServer(DC.FIRST).getQualifier());
|
||||
startBackendNode(DC.FIRST, 0);
|
||||
enableLoadBalancerNode(DC.FIRST, 0);
|
||||
|
||||
// Start DC2. Sessions should be preloaded from remoteCache
|
||||
containerController.start(getCacheServer(DC.SECOND).getQualifier());
|
||||
startBackendNode(DC.SECOND, 0);
|
||||
enableLoadBalancerNode(DC.SECOND, 0);
|
||||
|
||||
// Ensure sessions are loaded in both 1st DC and 2nd DC
|
||||
int offlineSessions11 = getTestingClientForStartedNodeInDc(0).testing().cache(InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME).size();
|
||||
int offlineSessions12 = getTestingClientForStartedNodeInDc(1).testing().cache(InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME).size();
|
||||
log.infof("offlineSessions11: %d, offlineSessions12: %d", offlineSessions11, offlineSessions12);
|
||||
Assert.assertEquals(offlineSessions11, offlineSessionsBefore + SESSIONS_COUNT);
|
||||
Assert.assertEquals(offlineSessions12, offlineSessionsBefore + SESSIONS_COUNT);
|
||||
|
||||
// On DC1 sessions were preloaded from DB. On DC2 sessions were preloaded from remoteCache
|
||||
Assert.assertTrue(getTestingClientForStartedNodeInDc(0).testing().cache(InfinispanConnectionProvider.WORK_CACHE_NAME).contains("distributed::offlineUserSessions"));
|
||||
Assert.assertFalse(getTestingClientForStartedNodeInDc(0).testing().cache(InfinispanConnectionProvider.WORK_CACHE_NAME).contains("distributed::remoteCacheLoad::offlineSessions"));
|
||||
|
||||
Assert.assertFalse(getTestingClientForStartedNodeInDc(1).testing().cache(InfinispanConnectionProvider.WORK_CACHE_NAME).contains("distributed::offlineUserSessions"));
|
||||
Assert.assertTrue(getTestingClientForStartedNodeInDc(1).testing().cache(InfinispanConnectionProvider.WORK_CACHE_NAME).contains("distributed::remoteCacheLoad::offlineSessions"));
|
||||
}
|
||||
|
||||
|
||||
private void createInitialSessions(boolean offline) throws Exception {
|
||||
if (offline) {
|
||||
oauth.scope(OAuth2Constants.OFFLINE_ACCESS);
|
||||
}
|
||||
|
||||
for (int i=0 ; i<SESSIONS_COUNT ; i++) {
|
||||
OAuthClient.AccessTokenResponse resp = oauth.doGrantAccessTokenRequest("password", "test-user@localhost", "password");
|
||||
Assert.assertNull(resp.getError());
|
||||
Assert.assertNotNull(resp.getAccessToken());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.keycloak.testsuite.util.cli;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.infinispan.AdvancedCache;
|
||||
import org.infinispan.Cache;
|
||||
import org.infinispan.context.Flag;
|
||||
|
@ -25,6 +27,7 @@ import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
|||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.RealmModel;
|
||||
import org.keycloak.models.UserModel;
|
||||
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
|
||||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
|
||||
import org.keycloak.models.utils.KeycloakModelUtils;
|
||||
|
@ -44,8 +47,20 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
throw new HandledException();
|
||||
}
|
||||
|
||||
Cache<String, SessionEntity> ispnCache = provider.getCache(cacheName);
|
||||
Cache<String, SessionEntityWrapper> ispnCache = provider.getCache(cacheName);
|
||||
doRunCacheCommand(session, ispnCache);
|
||||
|
||||
ispnCache.entrySet().stream().skip(0).limit(10).collect(java.util.stream.Collectors.toMap(new java.util.function.Function() {
|
||||
|
||||
public Object apply(Object entry) {
|
||||
return ((java.util.Map.Entry) entry).getKey();
|
||||
}
|
||||
}, new java.util.function.Function() {
|
||||
|
||||
public Object apply(Object entry) {
|
||||
return ((java.util.Map.Entry) entry).getValue();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
protected void printSession(String id, UserSessionEntity userSession) {
|
||||
|
@ -67,7 +82,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
return getName() + " <cache-name>";
|
||||
}
|
||||
|
||||
protected abstract void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntity> cache);
|
||||
protected abstract void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntityWrapper> cache);
|
||||
|
||||
|
||||
// IMPLS
|
||||
|
@ -80,7 +95,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntity> cache) {
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntityWrapper> cache) {
|
||||
UserSessionEntity userSession = new UserSessionEntity();
|
||||
String id = getArg(1);
|
||||
|
||||
|
@ -88,7 +103,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
userSession.setRealm(getArg(2));
|
||||
|
||||
userSession.setLastSessionRefresh(Time.currentTime());
|
||||
cache.put(id, userSession);
|
||||
cache.put(id, new SessionEntityWrapper(userSession));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -106,9 +121,9 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntity> cache) {
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntityWrapper> cache) {
|
||||
String id = getArg(1);
|
||||
UserSessionEntity userSession = (UserSessionEntity) cache.get(id);
|
||||
UserSessionEntity userSession = (UserSessionEntity) cache.get(id).getEntity();
|
||||
printSession(id, userSession);
|
||||
}
|
||||
|
||||
|
@ -127,13 +142,13 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntity> cache) {
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntityWrapper> cache) {
|
||||
String id = getArg(1);
|
||||
int count = getIntArg(2);
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
for (int i=0 ; i<count ; i++) {
|
||||
UserSessionEntity userSession = (UserSessionEntity) cache.get(id);
|
||||
UserSessionEntity userSession = (UserSessionEntity) cache.get(id).getEntity();
|
||||
//printSession(id, userSession);
|
||||
}
|
||||
long took = System.currentTimeMillis() - start;
|
||||
|
@ -155,7 +170,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntity> cache) {
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntityWrapper> cache) {
|
||||
String id = getArg(1);
|
||||
cache.remove(id);
|
||||
}
|
||||
|
@ -175,7 +190,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntity> cache) {
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntityWrapper> cache) {
|
||||
cache.clear();
|
||||
}
|
||||
}
|
||||
|
@ -189,7 +204,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntity> cache) {
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntityWrapper> cache) {
|
||||
log.info("Size: " + cache.size());
|
||||
}
|
||||
}
|
||||
|
@ -203,13 +218,13 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntity> cache) {
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntityWrapper> cache) {
|
||||
for (String id : cache.keySet()) {
|
||||
SessionEntity entity = cache.get(id);
|
||||
SessionEntity entity = cache.get(id).getEntity();
|
||||
if (!(entity instanceof UserSessionEntity)) {
|
||||
continue;
|
||||
}
|
||||
UserSessionEntity userSession = (UserSessionEntity) cache.get(id);
|
||||
UserSessionEntity userSession = (UserSessionEntity) cache.get(id).getEntity();
|
||||
log.info("list: key=" + id + ", value=" + toString(userSession));
|
||||
}
|
||||
}
|
||||
|
@ -225,10 +240,10 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
|
||||
|
||||
@Override
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntity> cache) {
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntityWrapper> cache) {
|
||||
String id = getArg(1);
|
||||
cache = ((AdvancedCache) cache).withFlags(Flag.CACHE_MODE_LOCAL);
|
||||
UserSessionEntity userSession = (UserSessionEntity) cache.get(id);
|
||||
UserSessionEntity userSession = (UserSessionEntity) cache.get(id).getEntity();
|
||||
printSession(id, userSession);
|
||||
}
|
||||
|
||||
|
@ -247,7 +262,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntity> cache) {
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntityWrapper> cache) {
|
||||
log.info("Size local: " + cache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).size());
|
||||
}
|
||||
}
|
||||
|
@ -261,7 +276,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntity> cache) {
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntityWrapper> cache) {
|
||||
String realmName = getArg(1);
|
||||
int count = getIntArg(2);
|
||||
int batchCount = getIntArg(3);
|
||||
|
@ -275,7 +290,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
userSession.setRealm(realmName);
|
||||
|
||||
userSession.setLastSessionRefresh(Time.currentTime());
|
||||
cache.put(id, userSession);
|
||||
cache.put(id, new SessionEntityWrapper(userSession));
|
||||
}
|
||||
|
||||
log.infof("Created '%d' sessions started from offset '%d'", countInIteration, firstInIteration);
|
||||
|
@ -301,7 +316,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntity> cache) {
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntityWrapper> cache) {
|
||||
String realmName = getArg(1);
|
||||
String username = getArg(2);
|
||||
int count = getIntArg(3);
|
||||
|
|
Loading…
Reference in a new issue