Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Bill Burke 2017-08-09 10:25:25 -04:00
commit 3470b1839d
37 changed files with 916 additions and 377 deletions

View file

@ -292,6 +292,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
// We have userSession, which passes predicate. No need for remote lookup.
if (predicate.test(userSession)) {
log.debugf("getUserSessionWithPredicate(%s): found in local cache", id);
return userSession;
}
@ -302,6 +303,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
if (remoteCache != null) {
UserSessionEntity remoteSessionEntity = (UserSessionEntity) remoteCache.get(id);
if (remoteSessionEntity != null) {
log.debugf("getUserSessionWithPredicate(%s): remote cache contains session entity %s", id, remoteSessionEntity);
UserSessionModel remoteSessionAdapter = wrap(realm, remoteSessionEntity, offline);
if (predicate.test(remoteSessionAdapter)) {
@ -319,11 +321,26 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
// Recursion. We should have it locally now
return getUserSessionWithPredicate(realm, id, offline, predicate);
}
}
}
} else {
log.debugf("getUserSessionWithPredicate(%s): found, but predicate doesn't pass", id);
return null;
return null;
}
} else {
log.debugf("getUserSessionWithPredicate(%s): not found", id);
// Session not available on remoteCache. Was already removed there. So removing locally too.
// TODO: Can be optimized to skip calling remoteCache.remove
removeUserSession(realm, userSession);
return null;
}
} else {
log.debugf("getUserSessionWithPredicate(%s): remote cache not available", id);
return null;
}
}

View file

@ -163,6 +163,11 @@ public class UserSessionAdapter implements UserSessionModel {
return new LastSessionRefreshChecker(provider.getLastSessionRefreshStore(), provider.getOfflineLastSessionRefreshStore())
.getCrossDCMessageStatus(UserSessionAdapter.this.session, UserSessionAdapter.this.realm, sessionWrapper, offline, lastSessionRefresh);
}
@Override
public String toString() {
return "setLastSessionRefresh(" + lastSessionRefresh + ')';
}
};
update(task);

View file

@ -127,7 +127,16 @@ public class InfinispanChangelogBasedTransaction<S extends SessionEntity> extend
return wrappedEntity;
} else {
return myUpdates.getEntityWrapper();
S entity = myUpdates.getEntityWrapper().getEntity();
// If entity is scheduled for remove, we don't return it.
boolean scheduledForRemove = myUpdates.getUpdateTasks().stream().filter((SessionUpdateTask task) -> {
return task.getOperation(entity) == SessionUpdateTask.CacheOperation.REMOVE;
}).findFirst().isPresent();
return scheduledForRemove ? null : myUpdates.getEntityWrapper();
}
}

View file

@ -95,5 +95,10 @@ class MergedUpdate<S extends SessionEntity> implements SessionUpdateTask<S> {
return result;
}
@Override
public String toString() {
return "MergedUpdate" + childUpdates;
}
}

View file

@ -117,6 +117,10 @@ public class SessionEntityWrapper<S extends SessionEntity> {
+ Objects.hashCode(entity);
}
@Override
public String toString() {
return "SessionEntityWrapper{" + "version=" + version + ", entity=" + entity + ", localMetadata=" + localMetadata + '}';
}
public static class ExternalizerImpl implements Externalizer<SessionEntityWrapper> {

View file

@ -41,10 +41,6 @@ public class LastSessionRefreshChecker {
}
// Metadata attribute, which contains the lastSessionRefresh available on remoteCache. Used in decide whether we need to write to remoteCache (DC) or not
public static final String LAST_SESSION_REFRESH_REMOTE = "lsrr";
public SessionUpdateTask.CrossDCMessageStatus getCrossDCMessageStatus(KeycloakSession kcSession, RealmModel realm, SessionEntityWrapper<UserSessionEntity> sessionWrapper, boolean offline, int newLastSessionRefresh) {
// revokeRefreshToken always writes everything to remoteCache immediately
if (realm.isRevokeRefreshToken()) {
@ -62,7 +58,7 @@ public class LastSessionRefreshChecker {
return SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED;
}
Integer lsrr = sessionWrapper.getLocalMetadataNoteInt(LAST_SESSION_REFRESH_REMOTE);
Integer lsrr = sessionWrapper.getLocalMetadataNoteInt(UserSessionEntity.LAST_SESSION_REFRESH_REMOTE);
if (lsrr == null) {
logger.warnf("Not available lsrr note on user session %s.", sessionWrapper.getEntity().getId());
return SessionUpdateTask.CrossDCMessageStatus.SYNC;

View file

@ -24,7 +24,7 @@ import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
/**
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
*/
public class SessionEntity implements Serializable {
public abstract class SessionEntity implements Serializable {
private String id;

View file

@ -29,6 +29,7 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
/**
@ -39,7 +40,7 @@ public class UserSessionEntity extends SessionEntity {
public static final Logger logger = Logger.getLogger(UserSessionEntity.class);
// Tracks the "lastSessionRefresh" from userSession entity from remote cache
// Metadata attribute, which contains the lastSessionRefresh available on remoteCache. Used in decide whether we need to write to remoteCache (DC) or not
public static final String LAST_SESSION_REFRESH_REMOTE = "lsrr";
private String user;
@ -163,7 +164,8 @@ public class UserSessionEntity extends SessionEntity {
@Override
public String toString() {
return String.format("UserSessionEntity [ id=%s, realm=%s, lastSessionRefresh=%d]", getId(), getRealm(), getLastSessionRefresh());
return String.format("UserSessionEntity [id=%s, realm=%s, lastSessionRefresh=%d, clients=%s]", getId(), getRealm(), getLastSessionRefresh(),
new TreeSet(this.authenticatedClientSessions.keySet()));
}
@Override
@ -194,8 +196,12 @@ public class UserSessionEntity extends SessionEntity {
public static class ExternalizerImpl implements Externalizer<UserSessionEntity> {
private static final int VERSION_1 = 1;
@Override
public void writeObject(ObjectOutput output, UserSessionEntity session) throws IOException {
output.writeByte(VERSION_1);
MarshallUtil.marshallString(session.getAuthMethod(), output);
MarshallUtil.marshallString(session.getBrokerSessionId(), output);
MarshallUtil.marshallString(session.getBrokerUserId(), output);
@ -223,6 +229,15 @@ public class UserSessionEntity extends SessionEntity {
@Override
public UserSessionEntity readObject(ObjectInput input) throws IOException, ClassNotFoundException {
switch (input.readByte()) {
case VERSION_1:
return readObjectVersion1(input);
default:
throw new IOException("Unknown version");
}
}
public UserSessionEntity readObjectVersion1(ObjectInput input) throws IOException, ClassNotFoundException {
UserSessionEntity sessionEntity = new UserSessionEntity();
sessionEntity.setAuthMethod(MarshallUtil.unmarshallString(input));

View file

@ -87,8 +87,8 @@ public class KcRemoteStore extends RemoteStore {
public boolean delete(Object key) throws PersistenceException {
logger.debugf("Calling delete for key '%s' on cache '%s'", key, cacheName);
// Optimization - we don't need to know the previous value. Also it's ok to trigger asynchronously
getRemoteCache().removeAsync(key);
// Optimization - we don't need to know the previous value.
getRemoteCache().remove(key);
return true;
}

View file

@ -17,6 +17,7 @@
package org.keycloak.models.sessions.infinispan.remotestore;
import org.keycloak.common.util.Time;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -32,6 +33,7 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@ -65,7 +67,7 @@ public class RemoteCacheInvoker {
SessionUpdateTask.CrossDCMessageStatus status = task.getCrossDCMessageStatus(sessionWrapper);
if (status == SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED) {
logger.debugf("Skip writing to remoteCache for entity '%s' of cache '%s' and operation '%s'", key, cacheName, operation.toString());
logger.debugf("Skip writing to remoteCache for entity '%s' of cache '%s' and operation '%s'", key, cacheName, operation);
return;
}
@ -76,11 +78,12 @@ public class RemoteCacheInvoker {
logger.debugf("Running task '%s' on remote cache '%s' . Key is '%s'", operation, cacheName, key);
runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, session);
runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, sessionWrapper);
}
private <S extends SessionEntity> void runOnRemoteCache(RemoteCache remoteCache, long maxIdleMs, String key, SessionUpdateTask<S> task, S session) {
private <S extends SessionEntity> void runOnRemoteCache(RemoteCache remoteCache, long maxIdleMs, String key, SessionUpdateTask<S> task, SessionEntityWrapper<S> sessionWrapper) {
S session = sessionWrapper.getEntity();
SessionUpdateTask.CacheOperation operation = task.getOperation(session);
switch (operation) {
@ -92,12 +95,14 @@ public class RemoteCacheInvoker {
remoteCache.put(key, session, task.getLifespanMs(), TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS);
break;
case ADD_IF_ABSENT:
final int currentTime = Time.currentTime();
SessionEntity existing = (SessionEntity) remoteCache
.withFlags(Flag.FORCE_RETURN_VALUE)
.putIfAbsent(key, session, -1, TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS);
if (existing != null) {
throw new IllegalStateException("There is already existing value in cache for key " + key);
}
sessionWrapper.putLocalMetadataNoteInt(UserSessionEntity.LAST_SESSION_REFRESH_REMOTE, currentTime);
break;
case REPLACE:
replace(remoteCache, task.getLifespanMs(), maxIdleMs, key, task);
@ -122,17 +127,15 @@ public class RemoteCacheInvoker {
// Run task on the remote session
task.runUpdate(session);
if (logger.isDebugEnabled()) {
logger.debugf("Before replaceWithVersion. Written entity: %s", session.toString());
}
logger.debugf("Before replaceWithVersion. Entity to write version %d: %s", versioned.getVersion(), session);
replaced = remoteCache.replaceWithVersion(key, session, versioned.getVersion(), lifespanMs, TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS);
if (!replaced) {
logger.debugf("Failed to replace entity '%s' . Will retry again", key);
logger.debugf("Failed to replace entity '%s' version %d. Will retry again", key, versioned.getVersion());
} else {
if (logger.isDebugEnabled()) {
logger.debugf("Replaced entity in remote cache: %s", session.toString());
logger.debugf("Replaced entity version %d in remote cache: %s", versioned.getVersion(), session);
}
}
}

View file

@ -36,6 +36,9 @@ import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
import java.util.Random;
import java.util.logging.Level;
import org.infinispan.client.hotrod.VersionedValue;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@ -85,25 +88,48 @@ public class RemoteCacheSessionListener {
if (shouldUpdateLocalCache(event.getType(), key, event.isCommandRetried())) {
replaceRemoteEntityInCache(key);
replaceRemoteEntityInCache(key, event.getVersion());
}
}
private static final int MAXIMUM_REPLACE_RETRIES = 10;
private void replaceRemoteEntityInCache(String key) {
private void replaceRemoteEntityInCache(String key, long eventVersion) {
// TODO can be optimized and remoteSession sent in the event itself?
SessionEntityWrapper localEntityWrapper = cache.get(key);
SessionEntity remoteSession = (SessionEntity) remoteCache.get(key);
boolean replaced = false;
int replaceRetries = 0;
int sleepInterval = 25;
do {
replaceRetries++;
if (logger.isDebugEnabled()) {
logger.debugf("Read session. Entity read from remote cache: %s", remoteSession.toString());
}
SessionEntityWrapper localEntityWrapper = cache.get(key);
VersionedValue remoteSessionVersioned = remoteCache.getVersioned(key);
if (remoteSessionVersioned == null || remoteSessionVersioned.getVersion() < eventVersion) {
try {
logger.debugf("Got replace remote entity event prematurely, will try again. Event version: %d, got: %d",
eventVersion, remoteSessionVersioned == null ? -1 : remoteSessionVersioned.getVersion());
Thread.sleep(new Random().nextInt(sleepInterval)); // using exponential backoff
continue;
} catch (InterruptedException ex) {
continue;
} finally {
sleepInterval = sleepInterval << 1;
}
}
SessionEntity remoteSession = (SessionEntity) remoteCache.get(key);
SessionEntityWrapper sessionWrapper = remoteSession.mergeRemoteEntityWithLocalEntity(localEntityWrapper);
logger.debugf("Read session%s. Entity read from remote cache: %s", replaceRetries > 1 ? "" : " again", remoteSession);
// We received event from remoteCache, so we won't update it back
cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD, Flag.IGNORE_RETURN_VALUES)
.replace(key, sessionWrapper);
SessionEntityWrapper sessionWrapper = remoteSession.mergeRemoteEntityWithLocalEntity(localEntityWrapper);
// We received event from remoteCache, so we won't update it back
replaced = cache.getAdvancedCache().withFlags(Flag.SKIP_CACHE_STORE, Flag.SKIP_CACHE_LOAD, Flag.IGNORE_RETURN_VALUES)
.replace(key, localEntityWrapper, sessionWrapper);
if (! replaced) {
logger.debugf("Did not succeed in merging sessions, will try again: %s", remoteSession);
}
} while (replaceRetries < MAXIMUM_REPLACE_RETRIES && ! replaced);
}

View file

@ -28,17 +28,11 @@ import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.RemoteStore;
import org.infinispan.persistence.remote.configuration.ExhaustedAction;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
import org.junit.Assert;
import org.junit.Ignore;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
@ -128,7 +122,7 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
}
private static Worker createWorker(int threadId) {
EmbeddedCacheManager manager = createManager(threadId);
EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
System.out.println("Retrieved cache: " + threadId);
@ -140,56 +134,6 @@ public class ConcurrencyJDGRemoteCacheClientListenersTest {
return new Worker(cache, threadId);
}
private static EmbeddedCacheManager createManager(int threadId) {
System.setProperty("java.net.preferIPv4Stack", "true");
System.setProperty("jgroups.tcp.port", "53715");
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
boolean clustered = false;
boolean async = false;
boolean allowDuplicateJMXDomains = true;
if (clustered) {
gcb = gcb.clusteredDefault();
gcb.transport().clusterName("test-clustering");
}
gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
return cacheManager;
}
private static Configuration getCacheBackedByRemoteStore(int threadId) {
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
int port = threadId==1 ? 12232 : 13232;
//int port = 12232;
return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
.fetchPersistentState(false)
.ignoreModifications(false)
.purgeOnStartup(false)
.preload(false)
.shared(true)
.remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
.rawValues(true)
.forceReturnValues(false)
.addServer()
.host("localhost")
.port(port)
.connectionPool()
.maxActive(20)
.exhaustedAction(ExhaustedAction.CREATE_NEW)
.async()
. enabled(false).build();
}
@ClientListener
public static class HotRodListener {

View file

@ -83,7 +83,7 @@ public class ConcurrencyJDGRemoteCacheTest {
}
private static Worker createWorker(int threadId) {
EmbeddedCacheManager manager = createManager(threadId);
EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
System.out.println("Retrieved cache: " + threadId);
@ -95,56 +95,6 @@ public class ConcurrencyJDGRemoteCacheTest {
return new Worker(cache, threadId);
}
private static EmbeddedCacheManager createManager(int threadId) {
System.setProperty("java.net.preferIPv4Stack", "true");
System.setProperty("jgroups.tcp.port", "53715");
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
boolean clustered = false;
boolean async = false;
boolean allowDuplicateJMXDomains = true;
if (clustered) {
gcb = gcb.clusteredDefault();
gcb.transport().clusterName("test-clustering");
}
gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, invalidationCacheConfiguration);
return cacheManager;
}
private static Configuration getCacheBackedByRemoteStore(int threadId) {
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
int port = threadId==1 ? 12232 : 13232;
//int port = 12232;
return cacheConfigBuilder.persistence().addStore(RemoteStoreConfigurationBuilder.class)
.fetchPersistentState(false)
.ignoreModifications(false)
.purgeOnStartup(false)
.preload(false)
.shared(true)
.remoteCacheName(InfinispanConnectionProvider.WORK_CACHE_NAME)
.rawValues(true)
.forceReturnValues(false)
.addServer()
.host("localhost")
.port(port)
.connectionPool()
.maxActive(20)
.exhaustedAction(ExhaustedAction.CREATE_NEW)
.async()
. enabled(false).build();
}
@ClientListener
public static class HotRodListener {

View file

@ -0,0 +1,292 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.cluster.infinispan;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
import org.infinispan.context.Flag;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
/**
* Check that removing of session from remoteCache is session immediately removed on remoteCache in other DC. This is true.
*
* Also check that listeners are executed asynchronously with some delay.
*
* Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class ConcurrencyJDGRemoveSessionTest {
protected static final Logger logger = Logger.getLogger(ConcurrencyJDGRemoveSessionTest.class);
private static final int ITERATIONS = 10000;
private static RemoteCache remoteCache1;
private static RemoteCache remoteCache2;
private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);
private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0);
private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0);
private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0);
//private static Map<String, EntryInfo> state = new HashMap<>();
public static void main(String[] args) throws Exception {
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createManager(1).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME);
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createManager(2).getCache(InfinispanConnectionProvider.SESSION_CACHE_NAME);
// Create caches, listeners and finally worker threads
Thread worker1 = createWorker(cache1, 1);
Thread worker2 = createWorker(cache2, 2);
// Create 100 initial sessions
for (int i=0 ; i<ITERATIONS ; i++) {
String sessionId = String.valueOf(i);
SessionEntityWrapper<UserSessionEntity> wrappedSession = createSessionEntity(sessionId);
cache1.put(sessionId, wrappedSession);
}
logger.info("SESSIONS CREATED");
// Create 100 initial sessions
for (int i=0 ; i<ITERATIONS ; i++) {
String sessionId = String.valueOf(i);
SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
Assert.assertNotNull("Loaded wrapper for key " + sessionId, loadedWrapper);
}
logger.info("SESSIONS AVAILABLE ON DC2");
long start = System.currentTimeMillis();
try {
// Just running in current thread
worker1.run();
logger.info("SESSIONS REMOVED");
//Thread.sleep(5000);
// Doing it in opposite direction to ensure that newer are checked first.
// This us currently FAILING (expected) as listeners are executed asynchronously.
for (int i=ITERATIONS-1 ; i>=0 ; i--) {
String sessionId = String.valueOf(i);
logger.infof("Before call cache2.get: %s", sessionId);
SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
Assert.assertNull("Loaded wrapper not null for key " + sessionId, loadedWrapper);
}
logger.info("SESSIONS NOT AVAILABLE ON DC2");
// // Start and join workers
// worker1.start();
// worker2.start();
//
// worker1.join();
// worker2.join();
} finally {
Thread.sleep(2000);
// Finish JVM
cache1.getCacheManager().stop();
cache2.getCacheManager().stop();
}
long took = System.currentTimeMillis() - start;
// // Output
// for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
// System.out.println(entry.getKey() + ":::" + entry.getValue());
// worker1.cache.remove(entry.getKey());
// }
// System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
// ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
// ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() );
//
// System.out.println("Sleeping before other report");
//
// Thread.sleep(1000);
//
// System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
// ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
// ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
}
private static SessionEntityWrapper<UserSessionEntity> createSessionEntity(String sessionId) {
// Create 100 initial sessions
UserSessionEntity session = new UserSessionEntity();
session.setId(sessionId);
session.setRealm("foo");
session.setBrokerSessionId("!23123123");
session.setBrokerUserId(null);
session.setUser("foo");
session.setLoginUsername("foo");
session.setIpAddress("123.44.143.178");
session.setStarted(Time.currentTime());
session.setLastSessionRefresh(Time.currentTime());
AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity();
clientSession.setAuthMethod("saml");
clientSession.setAction("something");
clientSession.setTimestamp(1234);
clientSession.setProtocolMappers(new HashSet<>(Arrays.asList("mapper1", "mapper2")));
clientSession.setRoles(new HashSet<>(Arrays.asList("role1", "role2")));
session.getAuthenticatedClientSessions().put("client1", clientSession);
SessionEntityWrapper<UserSessionEntity> wrappedSession = new SessionEntityWrapper<>(session);
return wrappedSession;
}
private static Thread createWorker(Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, int threadId) {
System.out.println("Retrieved cache: " + threadId);
RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);
if (threadId == 1) {
remoteCache1 = remoteCache;
} else {
remoteCache2 = remoteCache;
}
AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2;
HotRodListener listener = new HotRodListener(cache, remoteCache, counter);
remoteCache.addClientListener(listener);
return new RemoteCacheWorker(remoteCache, threadId);
//return new CacheWorker(cache, threadId);
}
private static EmbeddedCacheManager createManager(int threadId) {
return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
}
@ClientListener
public static class HotRodListener {
private Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache;
private RemoteCache remoteCache;
private AtomicInteger listenerCount;
public HotRodListener(Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache, RemoteCache remoteCache, AtomicInteger listenerCount) {
this.listenerCount = listenerCount;
this.remoteCache = remoteCache;
this.origCache = origCache;
}
@ClientCacheEntryCreated
public void created(ClientCacheEntryCreatedEvent event) {
String cacheKey = (String) event.getKey();
logger.infof("Listener executed for creating of session %s", cacheKey);
}
@ClientCacheEntryModified
public void modified(ClientCacheEntryModifiedEvent event) {
String cacheKey = (String) event.getKey();
logger.infof("Listener executed for modifying of session %s", cacheKey);
}
@ClientCacheEntryRemoved
public void removed(ClientCacheEntryRemovedEvent event) {
String cacheKey = (String) event.getKey();
logger.infof("Listener executed for removing of session %s", cacheKey);
// TODO: for distributed caches, ensure that it is executed just on owner OR if event.isCommandRetried
origCache
.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE)
.remove(cacheKey);
}
}
private static class RemoteCacheWorker extends Thread {
private final RemoteCache<String, Object> remoteCache;
private final int myThreadId;
private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) {
this.remoteCache = remoteCache;
this.myThreadId = myThreadId;
}
@Override
public void run() {
for (int i=0 ; i<ITERATIONS ; i++) {
String sessionId = String.valueOf(i);
remoteCache.remove(sessionId);
logger.infof("Session %s removed on DC1", sessionId);
// Check if it's immediately seen that session is removed on 2nd DC
RemoteCache secondDCRemoteCache = myThreadId == 1 ? remoteCache2 : remoteCache1;
SessionEntityWrapper thatSession = (SessionEntityWrapper) secondDCRemoteCache.get(sessionId);
Assert.assertNull("Session with ID " + sessionId + " not removed on the other DC. ThreadID: " + myThreadId, thatSession);
// Also check that it's immediatelly removed on my DC
SessionEntityWrapper mySession = (SessionEntityWrapper) remoteCache.get(sessionId);
Assert.assertNull("Session with ID " + sessionId + " not removed on the other DC. ThreadID: " + myThreadId, mySession);
}
}
}
}

View file

@ -59,7 +59,7 @@ import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
*/
public class ConcurrencyJDGSessionsCacheTest {
protected static final Logger logger = Logger.getLogger(KcRemoteStore.class);
protected static final Logger logger = Logger.getLogger(ConcurrencyJDGSessionsCacheTest.class);
private static final int ITERATION_PER_WORKER = 1000;
@ -210,56 +210,11 @@ public class ConcurrencyJDGSessionsCacheTest {
//return new CacheWorker(cache, threadId);
}
private static EmbeddedCacheManager createManager(int threadId) {
System.setProperty("java.net.preferIPv4Stack", "true");
System.setProperty("jgroups.tcp.port", "53715");
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
boolean clustered = false;
boolean async = false;
boolean allowDuplicateJMXDomains = true;
if (clustered) {
gcb = gcb.clusteredDefault();
gcb.transport().clusterName("test-clustering");
}
gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId);
cacheManager.defineConfiguration(InfinispanConnectionProvider.SESSION_CACHE_NAME, invalidationCacheConfiguration);
return cacheManager;
return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.SESSION_CACHE_NAME, KcRemoteStoreConfigurationBuilder.class);
}
private static Configuration getCacheBackedByRemoteStore(int threadId) {
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
int port = threadId==1 ? 12232 : 13232;
//int port = 12232;
return cacheConfigBuilder.persistence().addStore(KcRemoteStoreConfigurationBuilder.class)
.fetchPersistentState(false)
.ignoreModifications(false)
.purgeOnStartup(false)
.preload(false)
.shared(true)
.remoteCacheName(InfinispanConnectionProvider.SESSION_CACHE_NAME)
.rawValues(true)
.forceReturnValues(false)
.marshaller(KeycloakHotRodMarshallerFactory.class.getName())
.addServer()
.host("localhost")
.port(port)
.connectionPool()
.maxActive(20)
.exhaustedAction(ExhaustedAction.CREATE_NEW)
.async()
.enabled(false).build();
}
@ClientListener
public static class HotRodListener {

View file

@ -0,0 +1,85 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.cluster.infinispan;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.remote.configuration.ExhaustedAction;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
class TestCacheManagerFactory {
<T extends RemoteStoreConfigurationBuilder> EmbeddedCacheManager createManager(int threadId, String cacheName, Class<T> builderClass) {
System.setProperty("java.net.preferIPv4Stack", "true");
System.setProperty("jgroups.tcp.port", "53715");
GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
boolean clustered = false;
boolean async = false;
boolean allowDuplicateJMXDomains = true;
if (clustered) {
gcb = gcb.clusteredDefault();
gcb.transport().clusterName("test-clustering");
}
gcb.globalJmxStatistics().allowDuplicateDomains(allowDuplicateJMXDomains);
EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());
Configuration invalidationCacheConfiguration = getCacheBackedByRemoteStore(threadId, cacheName, builderClass);
cacheManager.defineConfiguration(cacheName, invalidationCacheConfiguration);
return cacheManager;
}
private <T extends RemoteStoreConfigurationBuilder> Configuration getCacheBackedByRemoteStore(int threadId, String cacheName, Class<T> builderClass) {
ConfigurationBuilder cacheConfigBuilder = new ConfigurationBuilder();
int port = threadId==1 ? 12232 : 13232;
//int port = 12232;
return cacheConfigBuilder.persistence().addStore(builderClass)
.fetchPersistentState(false)
.ignoreModifications(false)
.purgeOnStartup(false)
.preload(false)
.shared(true)
.remoteCacheName(cacheName)
.rawValues(true)
.forceReturnValues(false)
.marshaller(KeycloakHotRodMarshallerFactory.class.getName())
.addServer()
.host("localhost")
.port(port)
.connectionPool()
.maxActive(20)
.exhaustedAction(ExhaustedAction.CREATE_NEW)
.async()
. enabled(false).build();
}
}

View file

@ -34,6 +34,7 @@ import org.keycloak.services.ErrorPageException;
import org.keycloak.services.managers.AuthenticationManager;
import org.keycloak.services.managers.AuthenticationSessionManager;
import org.keycloak.services.managers.ClientSessionCode;
import org.keycloak.services.managers.UserSessionCrossDCManager;
import org.keycloak.services.messages.Messages;
import org.keycloak.services.resources.LoginActionsService;
import org.keycloak.services.util.CacheControlUtil;
@ -208,7 +209,7 @@ public abstract class AuthorizationEndpointBase {
}
}
UserSessionModel userSession = authSessionId==null ? null : session.sessions().getUserSession(realm, authSessionId);
UserSessionModel userSession = authSessionId==null ? null : new UserSessionCrossDCManager(session).getUserSessionIfExistsRemotely(realm, authSessionId);
if (userSession != null) {
logger.debugf("Sent request to authz endpoint. We don't have authentication session with ID '%s' but we have userSession. Will re-create authentication session with same ID", authSessionId);

View file

@ -386,6 +386,7 @@ public class TokenEndpoint {
}
} catch (OAuthErrorException e) {
logger.trace(e.getMessage(), e);
event.error(Errors.INVALID_TOKEN);
throw new ErrorResponseException(e.getError(), e.getDescription(), Response.Status.BAD_REQUEST);
}

View file

@ -62,4 +62,17 @@ public class UserSessionCrossDCManager {
});
}
// Just check if userSession also exists on remoteCache. It can happen that logout happened on 2nd DC and userSession is already removed on remoteCache and this DC wasn't yet notified
public UserSessionModel getUserSessionIfExistsRemotely(RealmModel realm, String id) {
UserSessionModel userSession = kcSession.sessions().getUserSession(realm, id);
// This will remove userSession "locally" if it doesn't exists on remoteCache
kcSession.sessions().getUserSessionWithPredicate(realm, id, false, (UserSessionModel userSession2) -> {
return userSession2 == null;
});
return kcSession.sessions().getUserSession(realm, id);
}
}

View file

@ -17,19 +17,61 @@
package org.keycloak.testsuite;
import java.util.function.Supplier;
/**
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
*/
public class Retry {
public static void execute(Runnable runnable, int retryCount, long intervalMillis) {
/**
* Runs the given {@code runnable} at most {@code retryCount} times until it passes,
* leaving {@code intervalMillis} milliseconds between the invocations.
* The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}.
* @param runnable
* @param attemptsCount Total number of attempts to execute the {@code runnable}
* @param intervalMillis
* @return Index of the first successful invocation, starting from 0.
*/
public static int execute(Runnable runnable, int attemptsCount, long intervalMillis) {
int executionIndex = 0;
while (true) {
try {
runnable.run();
return;
return executionIndex;
} catch (RuntimeException | AssertionError e) {
retryCount--;
if (retryCount > 0) {
attemptsCount--;
executionIndex++;
if (attemptsCount > 0) {
try {
Thread.sleep(intervalMillis);
} catch (InterruptedException ie) {
ie.addSuppressed(e);
throw new RuntimeException(ie);
}
} else {
throw e;
}
}
}
}
/**
* Runs the given {@code runnable} at most {@code retryCount} times until it passes,
* leaving {@code intervalMillis} milliseconds between the invocations.
* The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}.
* @param supplier
* @param attemptsCount Total number of attempts to execute the {@code runnable}
* @param intervalMillis
* @return Value generated by the {@code supplier}.
*/
public static <T> T call(Supplier<T> supplier, int attemptsCount, long intervalMillis) {
while (true) {
try {
return supplier.get();
} catch (RuntimeException | AssertionError e) {
attemptsCount--;
if (attemptsCount > 0) {
try {
Thread.sleep(intervalMillis);
} catch (InterruptedException ie) {

View file

@ -33,6 +33,7 @@ import java.util.Set;
import org.keycloak.testsuite.arquillian.annotation.JmxInfinispanChannelStatistics;
import org.keycloak.testsuite.arquillian.jmx.JmxConnectorRegistry;
import org.keycloak.testsuite.arquillian.undertow.KeycloakOnUndertow;
import org.keycloak.testsuite.crossdc.DC;
import java.io.NotSerializableException;
import java.lang.management.ManagementFactory;
import java.util.Objects;
@ -84,7 +85,7 @@ public class CacheStatisticsControllerEnricher implements TestEnricher {
ObjectName mbeanName = new ObjectName(String.format(
"%s:type=%s,name=\"%s(%s)\",manager=\"%s\",component=%s",
annotation.domain().isEmpty() ? getDefaultDomain(annotation.dcIndex(), annotation.dcNodeIndex()) : InfinispanConnectionProvider.JMX_DOMAIN,
annotation.domain().isEmpty() ? getDefaultDomain(annotation.dc().getDcIndex(), annotation.dcNodeIndex()) : InfinispanConnectionProvider.JMX_DOMAIN,
annotation.type(),
annotation.cacheName(),
annotation.cacheMode(),
@ -98,8 +99,8 @@ public class CacheStatisticsControllerEnricher implements TestEnricher {
try {
Retry.execute(() -> value.reset(), 2, 150);
} catch (RuntimeException ex) {
if (annotation.dcIndex() != -1 && annotation.dcNodeIndex() != -1
&& suiteContext.get().getAuthServerBackendsInfo(annotation.dcIndex()).get(annotation.dcNodeIndex()).isStarted()) {
if (annotation.dc() != DC.UNDEFINED && annotation.dcNodeIndex() != -1
&& suiteContext.get().getAuthServerBackendsInfo(annotation.dc().getDcIndex()).get(annotation.dcNodeIndex()).isStarted()) {
LOG.warn("Could not reset statistics for " + mbeanName);
}
}
@ -113,7 +114,7 @@ public class CacheStatisticsControllerEnricher implements TestEnricher {
ObjectName mbeanName = new ObjectName(String.format(
"%s:type=%s,cluster=\"%s\"",
annotation.domain().isEmpty() ? getDefaultDomain(annotation.dcIndex(), annotation.dcNodeIndex()) : InfinispanConnectionProvider.JMX_DOMAIN,
annotation.domain().isEmpty() ? getDefaultDomain(annotation.dc().getDcIndex(), annotation.dcNodeIndex()) : InfinispanConnectionProvider.JMX_DOMAIN,
annotation.type(),
annotation.cluster()
));
@ -124,8 +125,8 @@ public class CacheStatisticsControllerEnricher implements TestEnricher {
try {
Retry.execute(() -> value.reset(), 2, 150);
} catch (RuntimeException ex) {
if (annotation.dcIndex() != -1 && annotation.dcNodeIndex() != -1
&& suiteContext.get().getAuthServerBackendsInfo(annotation.dcIndex()).get(annotation.dcNodeIndex()).isStarted()) {
if (annotation.dc() != DC.UNDEFINED && annotation.dcNodeIndex() != -1
&& suiteContext.get().getAuthServerBackendsInfo(annotation.dc().getDcIndex()).get(annotation.dcNodeIndex()).isStarted()) {
LOG.warn("Could not reset statistics for " + mbeanName);
}
}
@ -170,8 +171,8 @@ public class CacheStatisticsControllerEnricher implements TestEnricher {
final String host;
final int port;
if (annotation.dcIndex() != -1 && annotation.dcNodeIndex() != -1) {
ContainerInfo node = suiteContext.get().getAuthServerBackendsInfo(annotation.dcIndex()).get(annotation.dcNodeIndex());
if (annotation.dc() != DC.UNDEFINED && annotation.dcNodeIndex() != -1) {
ContainerInfo node = suiteContext.get().getAuthServerBackendsInfo(annotation.dc().getDcIndex()).get(annotation.dcNodeIndex());
Container container = node.getArquillianContainer();
if (container.getDeployableContainer() instanceof KeycloakOnUndertow) {
return ManagementFactory.getPlatformMBeanServer();
@ -204,8 +205,8 @@ public class CacheStatisticsControllerEnricher implements TestEnricher {
final String host;
final int port;
if (annotation.dcIndex() != -1 && annotation.dcNodeIndex() != -1) {
ContainerInfo node = suiteContext.get().getAuthServerBackendsInfo(annotation.dcIndex()).get(annotation.dcNodeIndex());
if (annotation.dc() != DC.UNDEFINED && annotation.dcNodeIndex() != -1) {
ContainerInfo node = suiteContext.get().getAuthServerBackendsInfo(annotation.dc().getDcIndex()).get(annotation.dcNodeIndex());
Container container = node.getArquillianContainer();
if (container.getDeployableContainer() instanceof KeycloakOnUndertow) {
return ManagementFactory.getPlatformMBeanServer();

View file

@ -19,6 +19,7 @@ package org.keycloak.testsuite.arquillian.annotation;
import org.keycloak.testsuite.arquillian.AuthServerTestEnricher;
import org.keycloak.testsuite.arquillian.InfinispanStatistics;
import org.keycloak.testsuite.arquillian.InfinispanStatistics.Constants;
import org.keycloak.testsuite.crossdc.DC;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@ -48,7 +49,7 @@ public @interface JmxInfinispanCacheStatistics {
// Host address - either given by arrangement of DC ...
/** Index of the data center, starting from 0 */
int dcIndex() default -1;
DC dc() default DC.UNDEFINED;
/** Index of the node within data center, starting from 0. Nodes are ordered by arquillian qualifier as per {@link AuthServerTestEnricher} */
int dcNodeIndex() default -1;

View file

@ -17,6 +17,7 @@
package org.keycloak.testsuite.arquillian.annotation;
import org.keycloak.testsuite.arquillian.InfinispanStatistics.Constants;
import org.keycloak.testsuite.crossdc.DC;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@ -40,7 +41,7 @@ public @interface JmxInfinispanChannelStatistics {
// Host address - either given by arrangement of DC ...
/** Index of the data center, starting from 0 */
int dcIndex() default -1;
DC dc() default DC.UNDEFINED;
/** Index of the node within data center, starting from 0. Nodes are ordered by arquillian qualifier as per {@link AuthServerTestEnricher} */
int dcNodeIndex() default -1;

View file

@ -0,0 +1,31 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.testsuite.crossdc;
/**
* Identifier of datacentre in the testsuite
* @author hmlnarik
*/
public enum DC {
FIRST,
SECOND,
UNDEFINED;
public int getDcIndex() {
return ordinal();
}
}

View file

@ -54,6 +54,7 @@ import org.keycloak.testsuite.arquillian.SuiteContext;
import org.keycloak.util.BasicAuthHelper;
import org.keycloak.util.JsonSerialization;
import org.keycloak.util.TokenUtil;
import com.google.common.base.Charsets;
import org.openqa.selenium.By;
import org.openqa.selenium.WebDriver;
@ -203,6 +204,7 @@ public class OAuthClient {
}
public void fillLoginForm(String username, String password) {
WaitUtils.waitForPageToLoad(driver);
String src = driver.getPageSource();
try {
driver.findElement(By.id("username")).sendKeys(username);
@ -250,8 +252,7 @@ public class OAuthClient {
}
public AccessTokenResponse doAccessTokenRequest(String code, String password) {
CloseableHttpClient client = newCloseableHttpClient();
try {
try (CloseableHttpClient client = newCloseableHttpClient()) {
HttpPost post = new HttpPost(getAccessTokenUrl());
List<NameValuePair> parameters = new LinkedList<NameValuePair>();
@ -283,12 +284,7 @@ public class OAuthClient {
parameters.add(new BasicNameValuePair(OAuth2Constants.CODE_VERIFIER, codeVerifier));
}
UrlEncodedFormEntity formEntity = null;
try {
formEntity = new UrlEncodedFormEntity(parameters, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(parameters, Charsets.UTF_8);
post.setEntity(formEntity);
try {
@ -296,8 +292,8 @@ public class OAuthClient {
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve access token", e);
}
} finally {
closeClient(client);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
@ -310,8 +306,7 @@ public class OAuthClient {
}
public String introspectTokenWithClientCredential(String clientId, String clientSecret, String tokenType, String tokenToIntrospect) {
CloseableHttpClient client = new DefaultHttpClient();
try {
try (CloseableHttpClient client = new DefaultHttpClient()) {
HttpPost post = new HttpPost(getTokenIntrospectionUrl());
String authorization = BasicAuthHelper.createHeader(clientId, clientSecret);
@ -332,19 +327,16 @@ public class OAuthClient {
post.setEntity(formEntity);
try {
try (CloseableHttpResponse response = client.execute(post)) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
CloseableHttpResponse response = client.execute(post);
response.getEntity().writeTo(out);
response.close();
return new String(out.toByteArray());
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve access token", e);
}
} finally {
closeClient(client);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

View file

@ -144,6 +144,8 @@ public abstract class AbstractKeycloakTest {
updateMasterAdminPassword();
}
beforeAbstractKeycloakTestRealmImport();
if (testContext.getTestRealmReps() == null) {
importTestRealms();
@ -155,6 +157,9 @@ public abstract class AbstractKeycloakTest {
oauth.init(adminClient, driver);
}
protected void beforeAbstractKeycloakTestRealmImport() throws Exception {
}
@After
public void afterAbstractKeycloakTest() {
if (resetTimeOffset) {

View file

@ -76,7 +76,6 @@ public abstract class AbstractConcurrencyTest extends AbstractTestRealmKeycloakT
runnable.run(arrayIndex % numThreads, keycloaks.get(), keycloaks.get().realm(REALM_NAME));
} catch (Throwable ex) {
failures.add(ex);
log.error(ex.getMessage(), ex);
}
return null;
});
@ -96,6 +95,7 @@ public abstract class AbstractConcurrencyTest extends AbstractTestRealmKeycloakT
if (! failures.isEmpty()) {
RuntimeException ex = new RuntimeException("There were failures in threads. Failures count: " + failures.size());
failures.forEach(ex::addSuppressed);
failures.forEach(e -> log.error(e.getMessage(), e));
throw ex;
}
}

View file

@ -22,7 +22,6 @@ import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -46,11 +45,21 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.keycloak.OAuth2Constants;
import org.keycloak.admin.client.Keycloak;
import org.keycloak.admin.client.resource.ClientsResource;
import org.keycloak.admin.client.resource.RealmResource;
import org.keycloak.representations.AccessToken;
import org.keycloak.representations.idm.ClientRepresentation;
import org.keycloak.testsuite.Retry;
import org.keycloak.testsuite.admin.ApiUtil;
import org.keycloak.testsuite.util.ClientBuilder;
import org.keycloak.testsuite.util.OAuthClient;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.http.client.CookieStore;
import org.apache.http.impl.client.BasicCookieStore;
import org.hamcrest.Matchers;
@ -60,9 +69,9 @@ import org.hamcrest.Matchers;
*/
public class ConcurrentLoginTest extends AbstractConcurrencyTest {
private static final int DEFAULT_THREADS = 10;
private static final int CLIENTS_PER_THREAD = 10;
private static final int DEFAULT_CLIENTS_COUNT = CLIENTS_PER_THREAD * DEFAULT_THREADS;
protected static final int DEFAULT_THREADS = 4;
protected static final int CLIENTS_PER_THREAD = 30;
protected static final int DEFAULT_CLIENTS_COUNT = CLIENTS_PER_THREAD * DEFAULT_THREADS;
@Before
public void beforeTest() {
@ -70,87 +79,85 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
}
protected void createClients() {
final ClientsResource clients = adminClient.realm(REALM_NAME).clients();
for (int i = 0; i < DEFAULT_CLIENTS_COUNT; i++) {
ClientRepresentation client = new ClientRepresentation();
client.setClientId("client" + i);
client.setDirectAccessGrantsEnabled(true);
client.setRedirectUris(Arrays.asList("http://localhost:8180/auth/realms/master/app/*"));
client.setWebOrigins(Arrays.asList("http://localhost:8180"));
client.setSecret("password");
ClientRepresentation client = ClientBuilder.create()
.clientId("client" + i)
.directAccessGrants()
.redirectUris("http://localhost:8180/auth/realms/master/app/*")
.addWebOrigin("http://localhost:8180")
.secret("password")
.build();
log.debug("creating " + client.getClientId());
Response create = adminClient.realm("test").clients().create(client);
Assert.assertEquals(Response.Status.CREATED, create.getStatusInfo());
Response create = clients.create(client);
String clientId = ApiUtil.getCreatedId(create);
create.close();
getCleanup(REALM_NAME).addClientUuid(clientId);
log.debugf("created %s [uuid=%s]", client.getClientId(), clientId);
}
log.debug("clients created");
}
@Test
public void concurrentLogin() throws Throwable {
System.out.println("*********************************************");
public void concurrentLoginSingleUser() throws Throwable {
log.info("*********************************************");
long start = System.currentTimeMillis();
AtomicReference<String> userSessionId = new AtomicReference<>();
LoginTask loginTask = null;
try (CloseableHttpClient httpClient = HttpClientBuilder.create().setRedirectStrategy(new LaxRedirectStrategy()).build()) {
HttpUriRequest request = handleLogin(getPageContent(oauth.getLoginFormUrl(), httpClient, null), "test-user@localhost", "password");
log.debug("Executing login request");
Assert.assertTrue(parseAndCloseResponse(httpClient.execute(request)).contains("<title>AUTH_RESPONSE</title>"));
AtomicInteger clientIndex = new AtomicInteger();
ThreadLocal<OAuthClient> oauthClient = new ThreadLocal<OAuthClient>() {
@Override
protected OAuthClient initialValue() {
OAuthClient oauth1 = new OAuthClient();
oauth1.init(adminClient, driver);
return oauth1;
}
};
run(DEFAULT_THREADS, DEFAULT_CLIENTS_COUNT, (threadIndex, keycloak, realm) -> {
int i = clientIndex.getAndIncrement();
OAuthClient oauth1 = oauthClient.get();
oauth1.clientId("client" + i);
log.infof("%d [%s]: Accessing login page for %s", threadIndex, Thread.currentThread().getName(), oauth1.getClientId());
final HttpClientContext context = HttpClientContext.create();
String pageContent = getPageContent(oauth1.getLoginFormUrl(), httpClient, context);
String currentUrl = context.getRedirectLocations().get(0).toString();
Assert.assertThat(pageContent, Matchers.containsString("<title>AUTH_RESPONSE</title>"));
String code = getQueryFromUrl(currentUrl).get(OAuth2Constants.CODE);
OAuthClient.AccessTokenResponse accessRes = oauth1.doAccessTokenRequest(code, "password");
Assert.assertEquals("AccessTokenResponse: error: '" + accessRes.getError() + "' desc: '" + accessRes.getErrorDescription() + "'",
200, accessRes.getStatusCode());
OAuthClient.AccessTokenResponse refreshRes = oauth1.doRefreshTokenRequest(accessRes.getRefreshToken(), "password");
Assert.assertEquals("AccessTokenResponse: error: '" + refreshRes.getError() + "' desc: '" + refreshRes.getErrorDescription() + "'",
200, refreshRes.getStatusCode());
if (userSessionId.get() == null) {
AccessToken token = oauth.verifyToken(accessRes.getAccessToken());
userSessionId.set(token.getSessionState());
}
});
loginTask = new LoginTask(httpClient, userSessionId, 100, 1, Arrays.asList(
createHttpClientContextForUser(httpClient, "test-user@localhost", "password")
));
run(DEFAULT_THREADS, DEFAULT_CLIENTS_COUNT, loginTask);
int clientSessionsCount = testingClient.testing().getClientSessionsCountInUserSession("test", userSessionId.get());
Assert.assertEquals(clientSessionsCount, 1 + (DEFAULT_THREADS * CLIENTS_PER_THREAD));
Assert.assertEquals(1 + DEFAULT_CLIENTS_COUNT, clientSessionsCount);
} finally {
logStats(start);
long end = System.currentTimeMillis() - start;
log.infof("Statistics: %s", loginTask == null ? "??" : loginTask.getHistogram());
log.info("concurrentLoginSingleUser took " + (end/1000) + "s");
log.info("*********************************************");
}
}
protected void logStats(long start) {
long end = System.currentTimeMillis() - start;
log.info("concurrentLogin took " + (end/1000) + "s");
log.info("*********************************************");
protected HttpClientContext createHttpClientContextForUser(final CloseableHttpClient httpClient, String userName, String password) throws IOException {
final HttpClientContext context = HttpClientContext.create();
CookieStore cookieStore = new BasicCookieStore();
context.setCookieStore(cookieStore);
HttpUriRequest request = handleLogin(getPageContent(oauth.getLoginFormUrl(), httpClient, context), userName, password);
log.debug("Executing login request");
Assert.assertTrue(parseAndCloseResponse(httpClient.execute(request, context)).contains("<title>AUTH_RESPONSE</title>"));
return context;
}
private String getPageContent(String url, CloseableHttpClient httpClient, HttpClientContext context) throws IOException {
@Test
public void concurrentLoginMultipleUsers() throws Throwable {
log.info("*********************************************");
long start = System.currentTimeMillis();
AtomicReference<String> userSessionId = new AtomicReference<>();
LoginTask loginTask = null;
try (CloseableHttpClient httpClient = HttpClientBuilder.create().setRedirectStrategy(new LaxRedirectStrategy()).build()) {
loginTask = new LoginTask(httpClient, userSessionId, 100, 1, Arrays.asList(
createHttpClientContextForUser(httpClient, "test-user@localhost", "password"),
createHttpClientContextForUser(httpClient, "john-doh@localhost", "password"),
createHttpClientContextForUser(httpClient, "roleRichUser", "password")
));
run(DEFAULT_THREADS, DEFAULT_CLIENTS_COUNT, loginTask);
int clientSessionsCount = testingClient.testing().getClientSessionsCountInUserSession("test", userSessionId.get());
Assert.assertEquals(1 + DEFAULT_CLIENTS_COUNT / 3 + (DEFAULT_CLIENTS_COUNT % 3 <= 0 ? 0 : 1), clientSessionsCount);
} finally {
long end = System.currentTimeMillis() - start;
log.infof("Statistics: %s", loginTask == null ? "??" : loginTask.getHistogram());
log.info("concurrentLoginMultipleUsers took " + (end/1000) + "s");
log.info("*********************************************");
}
}
protected String getPageContent(String url, CloseableHttpClient httpClient, HttpClientContext context) throws IOException {
HttpGet request = new HttpGet(url);
request.setHeader("User-Agent", "Mozilla/5.0");
@ -158,15 +165,10 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
"text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8");
request.setHeader("Accept-Language", "en-US,en;q=0.5");
if (context != null) {
return parseAndCloseResponse(httpClient.execute(request, context));
} else {
return parseAndCloseResponse(httpClient.execute(request));
}
return parseAndCloseResponse(httpClient.execute(request, context));
}
private String parseAndCloseResponse(CloseableHttpResponse response) {
protected String parseAndCloseResponse(CloseableHttpResponse response) {
try {
int responseCode = response.getStatusLine().getStatusCode();
String resp = EntityUtils.toString(response.getEntity());
@ -187,9 +189,8 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
}
}
private HttpUriRequest handleLogin(String html, String username, String password) throws UnsupportedEncodingException {
System.out.println("Extracting form's data...");
protected HttpUriRequest handleLogin(String html, String username, String password) throws UnsupportedEncodingException {
log.debug("Extracting form's data...");
// Keycloak form id
Element loginform = Jsoup.parse(html).getElementById("kc-form-login");
@ -227,7 +228,7 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
}
}
private Map<String, String> getQueryFromUrl(String url) throws URISyntaxException {
private static Map<String, String> getQueryFromUrl(String url) throws URISyntaxException {
Map<String, String> m = new HashMap<>();
List<NameValuePair> pairs = URLEncodedUtils.parse(new URI(url), "UTF-8");
for (NameValuePair p : pairs) {
@ -236,5 +237,98 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
return m;
}
public class LoginTask implements KeycloakRunnable {
private final AtomicInteger clientIndex = new AtomicInteger();
private final ThreadLocal<OAuthClient> oauthClient = new ThreadLocal<OAuthClient>() {
@Override
protected OAuthClient initialValue() {
OAuthClient oauth1 = new OAuthClient();
oauth1.init(adminClient, driver);
return oauth1;
}
};
private final CloseableHttpClient httpClient;
private final AtomicReference<String> userSessionId;
private final int retryDelayMs;
private final int retryCount;
private final AtomicInteger[] retryHistogram;
private final AtomicInteger totalInvocations = new AtomicInteger();
private final List<HttpClientContext> clientContexts;
public LoginTask(CloseableHttpClient httpClient, AtomicReference<String> userSessionId, int retryDelayMs, int retryCount, List<HttpClientContext> clientContexts) {
this.httpClient = httpClient;
this.userSessionId = userSessionId;
this.retryDelayMs = retryDelayMs;
this.retryCount = retryCount;
this.retryHistogram = new AtomicInteger[retryCount];
for (int i = 0; i < retryHistogram.length; i ++) {
retryHistogram[i] = new AtomicInteger();
}
this.clientContexts = clientContexts;
}
@Override
public void run(int threadIndex, Keycloak keycloak, RealmResource realm) throws Throwable {
int i = clientIndex.getAndIncrement();
OAuthClient oauth1 = oauthClient.get();
oauth1.clientId("client" + i);
log.infof("%d [%s]: Accessing login page for %s", threadIndex, Thread.currentThread().getName(), oauth1.getClientId());
final HttpClientContext templateContext = clientContexts.get(i % clientContexts.size());
final HttpClientContext context = HttpClientContext.create();
context.setCookieStore(templateContext.getCookieStore());
String pageContent = getPageContent(oauth1.getLoginFormUrl(), httpClient, context);
Assert.assertThat(pageContent, Matchers.containsString("<title>AUTH_RESPONSE</title>"));
Assert.assertThat(context.getRedirectLocations(), Matchers.notNullValue());
Assert.assertThat(context.getRedirectLocations(), Matchers.not(Matchers.empty()));
String currentUrl = context.getRedirectLocations().get(0).toString();
String code = getQueryFromUrl(currentUrl).get(OAuth2Constants.CODE);
AtomicReference<OAuthClient.AccessTokenResponse> accessResRef = new AtomicReference<>();
totalInvocations.incrementAndGet();
// obtain access + refresh token via code-to-token flow
OAuthClient.AccessTokenResponse accessRes = oauth1.doAccessTokenRequest(code, "password");
Assert.assertEquals("AccessTokenResponse: client: " + oauth1.getClientId() + ", error: '" + accessRes.getError() + "' desc: '" + accessRes.getErrorDescription() + "'",
200, accessRes.getStatusCode());
accessResRef.set(accessRes);
// Refresh access + refresh token using refresh token
int invocationIndex = Retry.execute(() -> {
OAuthClient.AccessTokenResponse refreshRes = oauth1.doRefreshTokenRequest(accessResRef.get().getRefreshToken(), "password");
Assert.assertEquals("AccessTokenResponse: client: " + oauth1.getClientId() + ", error: '" + refreshRes.getError() + "' desc: '" + refreshRes.getErrorDescription() + "'",
200, refreshRes.getStatusCode());
}, retryCount, retryDelayMs);
retryHistogram[invocationIndex].incrementAndGet();
if (userSessionId.get() == null) {
AccessToken token = oauth1.verifyToken(accessResRef.get().getAccessToken());
userSessionId.set(token.getSessionState());
}
}
public int getRetryDelayMs() {
return retryDelayMs;
}
public int getRetryCount() {
return retryCount;
}
public Map<Integer, Integer> getHistogram() {
Map<Integer, Integer> res = new LinkedHashMap<>(retryCount);
for (int i = 0; i < retryHistogram.length; i ++) {
AtomicInteger item = retryHistogram[i];
res.put(i * retryDelayMs, item.get());
}
return res;
}
}
}

View file

@ -22,7 +22,6 @@ import java.util.List;
import org.jboss.arquillian.container.test.api.ContainerController;
import org.jboss.arquillian.test.api.ArquillianResource;
import org.junit.After;
import org.junit.Before;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.representations.idm.RealmRepresentation;
@ -75,9 +74,8 @@ public class ConcurrentLoginClusterTest extends ConcurrentLoginTest {
@Override
protected void logStats(long start) {
super.logStats(start);
public void concurrentLoginSingleUser() throws Throwable {
super.concurrentLoginSingleUser();
JGroupsStats stats = testingClient.testing().cache(InfinispanConnectionProvider.SESSION_CACHE_NAME).getJgroupsStats();
log.info("JGroups statistics: " + stats.statsAsString());
}

View file

@ -96,7 +96,7 @@ public abstract class AbstractAdminCrossDCTest extends AbstractCrossDCTest {
Matcher<? super T> matcherInstance = matcherOnOldStat.apply(oldStat);
assertThat(newStat, matcherInstance);
}, 5, 200);
}, 20, 200);
}
protected void assertStatistics(InfinispanStatistics stats, Runnable testedCode, BiConsumer<Map<String, Object>, Map<String, Object>> assertionOnStats) {

View file

@ -40,7 +40,7 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
*
* Abstract cross-data-centre test that defines primitives for handling cross-DC setup.
* @author hmlnarik
*/
public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest {
@ -63,7 +63,7 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest
@Before
public void enableOnlyFirstNodeInFirstDc() {
this.loadBalancerCtrl.disableAllBackendNodes();
loadBalancerCtrl.enableBackendNodeByName(getAutomaticallyStartedBackendNodes(0)
loadBalancerCtrl.enableBackendNodeByName(getAutomaticallyStartedBackendNodes(DC.FIRST)
.findFirst()
.orElseThrow(() -> new IllegalStateException("No node is started automatically"))
.getQualifier()
@ -84,7 +84,7 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest
}
@Before
public void InitRESTClientsForStartedNodes() {
public void initRESTClientsForStartedNodes() {
log.debug("Init REST clients for automatically started nodes");
this.suiteContext.getDcAuthServerBackendsInfo().stream()
.flatMap(List::stream)
@ -188,7 +188,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest
* Disables routing requests to the given data center in the load balancer.
* @param dcIndex
*/
public void disableDcOnLoadBalancer(int dcIndex) {
public void disableDcOnLoadBalancer(DC dc) {
int dcIndex = dc.ordinal();
log.infof("Disabling load balancer for dc=%d", dcIndex);
this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex).forEach(c -> loadBalancerCtrl.disableBackendNodeByName(c.getQualifier()));
}
@ -197,7 +198,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest
* Enables routing requests to all started nodes to the given data center in the load balancer.
* @param dcIndex
*/
public void enableDcOnLoadBalancer(int dcIndex) {
public void enableDcOnLoadBalancer(DC dc) {
int dcIndex = dc.ordinal();
log.infof("Enabling load balancer for dc=%d", dcIndex);
final List<ContainerInfo> dcNodes = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex);
if (! dcNodes.stream().anyMatch(ContainerInfo::isStarted)) {
@ -214,7 +216,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest
* @param dcIndex
* @param nodeIndex
*/
public void disableLoadBalancerNode(int dcIndex, int nodeIndex) {
public void disableLoadBalancerNode(DC dc, int nodeIndex) {
int dcIndex = dc.ordinal();
log.infof("Disabling load balancer for dc=%d, node=%d", dcIndex, nodeIndex);
loadBalancerCtrl.disableBackendNodeByName(this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex).get(nodeIndex).getQualifier());
}
@ -224,7 +227,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest
* @param dcIndex
* @param nodeIndex
*/
public void enableLoadBalancerNode(int dcIndex, int nodeIndex) {
public void enableLoadBalancerNode(DC dc, int nodeIndex) {
int dcIndex = dc.ordinal();
log.infof("Enabling load balancer for dc=%d, node=%d", dcIndex, nodeIndex);
final ContainerInfo backendNode = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex).get(nodeIndex);
if (backendNode == null) {
@ -242,7 +246,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest
* @param nodeIndex
* @return Started instance descriptor.
*/
public ContainerInfo startBackendNode(int dcIndex, int nodeIndex) {
public ContainerInfo startBackendNode(DC dc, int nodeIndex) {
int dcIndex = dc.ordinal();
assertThat((Integer) dcIndex, lessThan(this.suiteContext.getDcAuthServerBackendsInfo().size()));
final List<ContainerInfo> dcNodes = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex);
assertThat((Integer) nodeIndex, lessThan(dcNodes.size()));
@ -261,7 +266,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest
* @param nodeIndex
* @return Stopped instance descriptor.
*/
public ContainerInfo stopBackendNode(int dcIndex, int nodeIndex) {
public ContainerInfo stopBackendNode(DC dc, int nodeIndex) {
int dcIndex = dc.ordinal();
assertThat((Integer) dcIndex, lessThan(this.suiteContext.getDcAuthServerBackendsInfo().size()));
final List<ContainerInfo> dcNodes = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex);
assertThat((Integer) nodeIndex, lessThan(dcNodes.size()));
@ -279,7 +285,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest
* @param dcIndex
* @return
*/
public Stream<ContainerInfo> getManuallyStartedBackendNodes(int dcIndex) {
public Stream<ContainerInfo> getManuallyStartedBackendNodes(DC dc) {
int dcIndex = dc.ordinal();
final List<ContainerInfo> dcNodes = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex);
return dcNodes.stream().filter(ContainerInfo::isManual);
}
@ -289,7 +296,8 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest
* @param dcIndex
* @return
*/
public Stream<ContainerInfo> getAutomaticallyStartedBackendNodes(int dcIndex) {
public Stream<ContainerInfo> getAutomaticallyStartedBackendNodes(DC dc) {
int dcIndex = dc.ordinal();
final List<ContainerInfo> dcNodes = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex);
return dcNodes.stream().filter(c -> ! c.isManual());
}

View file

@ -41,6 +41,7 @@ import org.keycloak.testsuite.arquillian.annotation.JmxInfinispanCacheStatistics
import org.keycloak.testsuite.arquillian.annotation.JmxInfinispanChannelStatistics;
import org.keycloak.testsuite.arquillian.InfinispanStatistics;
import org.keycloak.testsuite.arquillian.InfinispanStatistics.Constants;
import org.keycloak.testsuite.pages.ProceedPage;
import java.util.concurrent.TimeUnit;
import org.hamcrest.Matchers;
import static org.hamcrest.Matchers.greaterThan;
@ -58,6 +59,9 @@ public class ActionTokenCrossDCTest extends AbstractAdminCrossDCTest {
@Page
protected LoginPasswordUpdatePage passwordUpdatePage;
@Page
protected ProceedPage proceedPage;
@Page
protected ErrorPage errorPage;
@ -73,11 +77,11 @@ public class ActionTokenCrossDCTest extends AbstractAdminCrossDCTest {
@Test
public void sendResetPasswordEmailSuccessWorksInCrossDc(
@JmxInfinispanCacheStatistics(dcIndex=0, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.ACTION_TOKEN_CACHE) InfinispanStatistics cacheDc0Node0Statistics,
@JmxInfinispanCacheStatistics(dcIndex=0, dcNodeIndex=1, cacheName=InfinispanConnectionProvider.ACTION_TOKEN_CACHE) InfinispanStatistics cacheDc0Node1Statistics,
@JmxInfinispanCacheStatistics(dcIndex=1, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.ACTION_TOKEN_CACHE) InfinispanStatistics cacheDc1Node0Statistics,
@JmxInfinispanCacheStatistics(dc=DC.FIRST, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.ACTION_TOKEN_CACHE) InfinispanStatistics cacheDc0Node0Statistics,
@JmxInfinispanCacheStatistics(dc=DC.FIRST, dcNodeIndex=1, cacheName=InfinispanConnectionProvider.ACTION_TOKEN_CACHE) InfinispanStatistics cacheDc0Node1Statistics,
@JmxInfinispanCacheStatistics(dc=DC.SECOND, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.ACTION_TOKEN_CACHE) InfinispanStatistics cacheDc1Node0Statistics,
@JmxInfinispanChannelStatistics() InfinispanStatistics channelStatisticsCrossDc) throws Exception {
startBackendNode(0, 1);
startBackendNode(DC.FIRST, 1);
cacheDc0Node1Statistics.waitToBecomeAvailable(10, TimeUnit.SECONDS);
Comparable originalNumberOfEntries = cacheDc0Node0Statistics.getSingleStatistics(Constants.STAT_CACHE_NUMBER_OF_ENTRIES);
@ -107,6 +111,8 @@ public class ActionTokenCrossDCTest extends AbstractAdminCrossDCTest {
Matchers::is
);
proceedPage.assertCurrent();
proceedPage.clickProceedLink();
passwordUpdatePage.assertCurrent();
// Verify that there was at least one message sent via the channel
@ -120,8 +126,8 @@ public class ActionTokenCrossDCTest extends AbstractAdminCrossDCTest {
// Verify that there was an action token added in the node which was targetted by the link
assertThat(cacheDc0Node0Statistics.getSingleStatistics(Constants.STAT_CACHE_NUMBER_OF_ENTRIES), greaterThan(originalNumberOfEntries));
disableDcOnLoadBalancer(0);
enableDcOnLoadBalancer(1);
disableDcOnLoadBalancer(DC.FIRST);
enableDcOnLoadBalancer(DC.SECOND);
// Make sure that after going to the link, the invalidated action token has been retrieved from Infinispan server cluster in the other DC
assertSingleStatistics(cacheDc1Node0Statistics, Constants.STAT_CACHE_NUMBER_OF_ENTRIES,
@ -134,7 +140,7 @@ public class ActionTokenCrossDCTest extends AbstractAdminCrossDCTest {
@Test
public void sendResetPasswordEmailAfterNewNodeAdded() throws IOException, MessagingException {
disableDcOnLoadBalancer(1);
disableDcOnLoadBalancer(DC.SECOND);
UserRepresentation userRep = new UserRepresentation();
userRep.setEnabled(true);
@ -156,14 +162,16 @@ public class ActionTokenCrossDCTest extends AbstractAdminCrossDCTest {
driver.navigate().to(link);
proceedPage.assertCurrent();
proceedPage.clickProceedLink();
passwordUpdatePage.assertCurrent();
passwordUpdatePage.changePassword("new-pass", "new-pass");
assertEquals("Your account has been updated.", driver.getTitle());
disableDcOnLoadBalancer(0);
getManuallyStartedBackendNodes(1)
disableDcOnLoadBalancer(DC.FIRST);
getManuallyStartedBackendNodes(DC.SECOND)
.findFirst()
.ifPresent(c -> {
containerController.start(c.getQualifier());

View file

@ -17,18 +17,26 @@
package org.keycloak.testsuite.crossdc;
import java.util.LinkedList;
import org.keycloak.admin.client.Keycloak;
import org.keycloak.admin.client.resource.RealmResource;
import java.util.List;
import org.jboss.arquillian.container.test.api.ContainerController;
import org.jboss.arquillian.test.api.ArquillianResource;
import org.junit.Before;
import org.keycloak.representations.idm.RealmRepresentation;
import org.keycloak.testsuite.Assert;
import org.keycloak.testsuite.admin.concurrency.ConcurrentLoginTest;
import org.keycloak.testsuite.arquillian.ContainerInfo;
import org.keycloak.testsuite.arquillian.LoadBalancerController;
import org.keycloak.testsuite.arquillian.annotation.LoadBalancer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.LaxRedirectStrategy;
import org.junit.Ignore;
import org.junit.Test;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@ -42,42 +50,64 @@ public class ConcurrentLoginCrossDCTest extends ConcurrentLoginTest {
@ArquillianResource
protected ContainerController containerController;
private static final int INVOCATIONS_BEFORE_SIMULATING_DC_FAILURE = 10;
private static final int LOGIN_TASK_DELAY_MS = 100;
private static final int LOGIN_TASK_RETRIES = 15;
// Need to postpone that
@Override
public void addTestRealms(List<RealmRepresentation> testRealms) {
}
@Before
@Override
public void beforeTest() {
log.debug("Initializing load balancer - only enabling started nodes in the first DC");
public void beforeAbstractKeycloakTestRealmImport() {
log.debug("Initializing load balancer - enabling all started nodes across DCs");
this.loadBalancerCtrl.disableAllBackendNodes();
// This should enable only the started nodes in first datacenter
this.suiteContext.getDcAuthServerBackendsInfo().get(0).stream()
this.suiteContext.getDcAuthServerBackendsInfo().stream()
.flatMap(List::stream)
.filter(ContainerInfo::isStarted)
.map(ContainerInfo::getQualifier)
.forEach(loadBalancerCtrl::enableBackendNodeByName);
this.suiteContext.getDcAuthServerBackendsInfo().get(1).stream()
.filter(ContainerInfo::isStarted)
.map(ContainerInfo::getQualifier)
.forEach(loadBalancerCtrl::enableBackendNodeByName);
// Import realms
log.info("Importing realms");
List<RealmRepresentation> testRealms = new LinkedList<>();
super.addTestRealms(testRealms);
for (RealmRepresentation testRealm : testRealms) {
importRealm(testRealm);
}
log.info("Realms imported");
// Finally create clients
createClients();
}
@Test
public void concurrentLoginWithRandomDcFailures() throws Throwable {
log.info("*********************************************");
long start = System.currentTimeMillis();
AtomicReference<String> userSessionId = new AtomicReference<>();
LoginTask loginTask = null;
try (CloseableHttpClient httpClient = HttpClientBuilder.create().setRedirectStrategy(new LaxRedirectStrategy()).build()) {
loginTask = new LoginTask(httpClient, userSessionId, LOGIN_TASK_DELAY_MS, LOGIN_TASK_RETRIES, Arrays.asList(
createHttpClientContextForUser(httpClient, "test-user@localhost", "password")
));
HttpUriRequest request = handleLogin(getPageContent(oauth.getLoginFormUrl(), httpClient, HttpClientContext.create()), "test-user@localhost", "password");
log.debug("Executing login request");
org.junit.Assert.assertTrue(parseAndCloseResponse(httpClient.execute(request)).contains("<title>AUTH_RESPONSE</title>"));
run(DEFAULT_THREADS, DEFAULT_CLIENTS_COUNT, loginTask, new SwapDcAvailability());
int clientSessionsCount = testingClient.testing().getClientSessionsCountInUserSession("test", userSessionId.get());
org.junit.Assert.assertEquals(1 + DEFAULT_CLIENTS_COUNT, clientSessionsCount);
} finally {
long end = System.currentTimeMillis() - start;
log.infof("Statistics: %s", loginTask == null ? "??" : loginTask.getHistogram());
log.info("concurrentLoginWithRandomDcFailures took " + (end/1000) + "s");
log.info("*********************************************");
}
}
private class SwapDcAvailability implements KeycloakRunnable {
private final AtomicInteger invocationCounter = new AtomicInteger();
@Override
public void run(int threadIndex, Keycloak keycloak, RealmResource realm) throws Throwable {
final int currentInvocarion = invocationCounter.getAndIncrement();
if (currentInvocarion % INVOCATIONS_BEFORE_SIMULATING_DC_FAILURE == 0) {
int failureIndex = currentInvocarion / INVOCATIONS_BEFORE_SIMULATING_DC_FAILURE;
int dcToEnable = failureIndex % 2;
int dcToDisable = (failureIndex + 1) % 2;
suiteContext.getDcAuthServerBackendsInfo().get(dcToDisable).forEach(c -> loadBalancerCtrl.disableBackendNodeByName(c.getQualifier()));
suiteContext.getDcAuthServerBackendsInfo().get(dcToEnable).forEach(c -> loadBalancerCtrl.enableBackendNodeByName(c.getQualifier()));
}
}
}
}

View file

@ -43,7 +43,7 @@ public class LastSessionRefreshCrossDCTest extends AbstractAdminCrossDCTest {
testRealm().update(realmRep);
// Enable second DC
enableDcOnLoadBalancer(1);
enableDcOnLoadBalancer(DC.SECOND);
// Login
OAuthClient.AuthorizationEndpointResponse response1 = oauth.doLogin("test-user@localhost", "password");
@ -68,7 +68,7 @@ public class LastSessionRefreshCrossDCTest extends AbstractAdminCrossDCTest {
setTimeOffset(10);
// refresh token on DC0
disableDcOnLoadBalancer(1);
disableDcOnLoadBalancer(DC.SECOND);
tokenResponse = oauth.doRefreshTokenRequest(refreshToken1, "password");
String refreshToken2 = tokenResponse.getRefreshToken();
@ -85,8 +85,8 @@ public class LastSessionRefreshCrossDCTest extends AbstractAdminCrossDCTest {
}, 50, 50);
// try refresh with old token on DC1. It should fail.
disableDcOnLoadBalancer(0);
enableDcOnLoadBalancer(1);
disableDcOnLoadBalancer(DC.FIRST);
enableDcOnLoadBalancer(DC.SECOND);
tokenResponse = oauth.doRefreshTokenRequest(refreshToken1, "password");
Assert.assertNull(tokenResponse.getAccessToken());
Assert.assertNotNull(tokenResponse.getError());
@ -106,7 +106,7 @@ public class LastSessionRefreshCrossDCTest extends AbstractAdminCrossDCTest {
@Test
public void testLastSessionRefreshUpdate() {
// Disable DC1 on loadbalancer
disableDcOnLoadBalancer(1);
disableDcOnLoadBalancer(DC.SECOND);
// Get statistics
int stores0 = getRemoteCacheStats(0).getGlobalStores();

View file

@ -22,6 +22,7 @@ import javax.ws.rs.core.Response;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.junit.Test;
import org.keycloak.testsuite.Assert;
import org.keycloak.testsuite.Retry;
import org.keycloak.testsuite.util.Matchers;
import org.keycloak.testsuite.util.OAuthClient;
@ -34,12 +35,11 @@ public class LoginCrossDCTest extends AbstractAdminCrossDCTest {
@Test
public void loginTest() throws Exception {
log.info("Started to sleep");
enableDcOnLoadBalancer(1);
enableDcOnLoadBalancer(DC.SECOND);
//log.info("Started to sleep");
//Thread.sleep(10000000);
for (int i=0 ; i<10 ; i++) {
for (int i=0 ; i<30 ; i++) {
OAuthClient.AuthorizationEndpointResponse response1 = oauth.doLogin("test-user@localhost", "password");
String code = response1.getCode();
OAuthClient.AccessTokenResponse response2 = oauth.doAccessTokenRequest(code, "password");

View file

@ -18,8 +18,9 @@
log4j.rootLogger=info, keycloak
log4j.appender.keycloak=org.apache.log4j.ConsoleAppender
log4j.appender.keycloak.layout=org.apache.log4j.PatternLayout
log4j.appender.keycloak.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p [%c] %m%n
log4j.appender.keycloak.layout=org.apache.log4j.EnhancedPatternLayout
keycloak.testsuite.logging.pattern=%d{HH:mm:ss,SSS} %-5p [%c] %m%n
log4j.appender.keycloak.layout.ConversionPattern=${keycloak.testsuite.logging.pattern}
# Logging with "info" when running test from IDE, but disabled when running test with "mvn" . Both cases can be overriden by use system property "keycloak.logging.level" (eg. -Dkeycloak.logging.level=debug )
log4j.logger.org.keycloak=${keycloak.logging.level:info}

View file

@ -85,6 +85,7 @@
<keycloak.connectionsInfinispan.remoteStorePort>12232</keycloak.connectionsInfinispan.remoteStorePort>
<keycloak.connectionsInfinispan.remoteStorePort.2>13232</keycloak.connectionsInfinispan.remoteStorePort.2>
<keycloak.connectionsJpa.url.crossdc>jdbc:h2:mem:test-dc-shared</keycloak.connectionsJpa.url.crossdc>
<keycloak.testsuite.logging.pattern>%d{HH:mm:ss,SSS} %-5p [%c] %m%n</keycloak.testsuite.logging.pattern>
<adapter.test.props/>
<migration.import.properties/>
@ -284,6 +285,7 @@
<keycloak.connectionsInfinispan.remoteStorePort>${keycloak.connectionsInfinispan.remoteStorePort}</keycloak.connectionsInfinispan.remoteStorePort>
<keycloak.connectionsInfinispan.remoteStorePort.2>${keycloak.connectionsInfinispan.remoteStorePort.2}</keycloak.connectionsInfinispan.remoteStorePort.2>
<keycloak.connectionsInfinispan.remoteStoreServer>${keycloak.connectionsInfinispan.remoteStoreServer}</keycloak.connectionsInfinispan.remoteStoreServer>
<keycloak.testsuite.logging.pattern>${keycloak.testsuite.logging.pattern}</keycloak.testsuite.logging.pattern>
<keycloak.connectionsJpa.url.crossdc>${keycloak.connectionsJpa.url.crossdc}</keycloak.connectionsJpa.url.crossdc>
</systemPropertyVariables>
@ -386,6 +388,7 @@
<auth.server.crossdc>true</auth.server.crossdc>
<cache.server.jboss>true</cache.server.jboss>
<cache.server.config.dir>${cache.server.home}/standalone/configuration</cache.server.config.dir>
<keycloak.testsuite.logging.pattern>%d{HH:mm:ss,SSS} [%t] %-5p [%c{1.}] %m%n</keycloak.testsuite.logging.pattern>
</properties>
<dependencies>
<dependency>
@ -460,6 +463,7 @@
<auth.server.crossdc>true</auth.server.crossdc>
<cache.server.jboss>true</cache.server.jboss>
<cache.server.config.dir>${cache.server.home}/standalone/configuration</cache.server.config.dir>
<keycloak.testsuite.logging.pattern>%d{HH:mm:ss,SSS} [%t] %-5p [%c{1.}] %m%n</keycloak.testsuite.logging.pattern>
</properties>
<dependencies>
<dependency>
@ -584,6 +588,8 @@
<auth.server.backend2.home>${containers.home}/auth-server-${auth.server}-backend2</auth.server.backend2.home>
<auth.server.config.dir>${auth.server.backend1.home}/standalone/configuration</auth.server.config.dir>
<keycloak.testsuite.logging.pattern>%d{HH:mm:ss,SSS} [%t] %-5p [%c{1.}] %m%n</keycloak.testsuite.logging.pattern>
</properties>
<build>
<plugins>