Merge pull request #2234 from mposolda/master

KEYCLOAK-2412 KEYCLOAK-2495 cluster fixes and improvements
This commit is contained in:
Marek Posolda 2016-02-17 13:57:13 +01:00
commit c4144dd884
47 changed files with 1560 additions and 195 deletions

View file

@ -89,6 +89,7 @@
<local-cache name="sessions"/>
<local-cache name="offlineSessions"/>
<local-cache name="loginFailures"/>
<local-cache name="work"/>
<local-cache name="realmVersions">
<transaction mode="BATCH" locking="PESSIMISTIC"/>
</local-cache>

View file

@ -7,5 +7,8 @@ embed-server --server-config=standalone-ha.xml
/subsystem=infinispan/cache-container=keycloak/distributed-cache=sessions:add(mode="SYNC",owners="1")
/subsystem=infinispan/cache-container=keycloak/distributed-cache=offlineSessions:add(mode="SYNC",owners="1")
/subsystem=infinispan/cache-container=keycloak/distributed-cache=loginFailures:add(mode="SYNC",owners="1")
/subsystem=infinispan/cache-container=keycloak/replicated-cache=work:add(mode="SYNC")
/subsystem=infinispan/cache-container=keycloak/local-cache=realmVersions:add()
/subsystem=infinispan/cache-container=keycloak/local-cache=realmVersions/transaction=TRANSACTION:add(mode=BATCH,locking=PESSIMISTIC)
/extension=org.keycloak.keycloak-server-subsystem/:add(module=org.keycloak.keycloak-server-subsystem)
/subsystem=keycloak-server:add(web-context=auth)

View file

@ -6,5 +6,8 @@ embed-server --server-config=standalone.xml
/subsystem=infinispan/cache-container=keycloak/local-cache=sessions:add()
/subsystem=infinispan/cache-container=keycloak/local-cache=offlineSessions:add()
/subsystem=infinispan/cache-container=keycloak/local-cache=loginFailures:add()
/subsystem=infinispan/cache-container=keycloak/local-cache=work:add()
/subsystem=infinispan/cache-container=keycloak/local-cache=realmVersions:add()
/subsystem=infinispan/cache-container=keycloak/local-cache=realmVersions/transaction=TRANSACTION:add(mode=BATCH,locking=PESSIMISTIC)
/extension=org.keycloak.keycloak-server-subsystem/:add(module=org.keycloak.keycloak-server-subsystem)
/subsystem=keycloak-server:add(web-context=auth)

View file

@ -143,6 +143,61 @@ kinit hnelson@KEYCLOAK.ORG
and provide password `secret`
Now when you access `http://localhost:8081/auth/realms/master/account` you should be logged in automatically as user `hnelson` .
Now when you access `http://localhost:8081/auth/realms/master/account` you should be logged in automatically as user `hnelson` .
Create many users or offline sessions
-------------------------------------
Run testsuite with the command like this:
```
mvn exec:java -Pkeycloak-server -DstartTestsuiteCLI
```
Alternatively if you want to use your MySQL database use the command like this (replace properties values according your DB connection):
```
mvn exec:java -Pkeycloak-server -Dkeycloak.connectionsJpa.url=jdbc:mysql://localhost/keycloak -Dkeycloak.connectionsJpa.driver=com.mysql.jdbc.Driver -Dkeycloak.connectionsJpa.user=keycloak -Dkeycloak.connectionsJpa.password=keycloak -DstartTestsuiteCLI
```
Then once CLI is started, you can use command `help` to see all the available commands.
### Creating many users
For create many users you can use command `createUsers`
For example this will create 500 users `test0, test1, test2, ... , test499` in realm `demo` and each 100 users in separate transaction. All users will be granted realm roles `user` and `admin` :
```
createUsers test test demo 0 500 100 user,admin
```
Check count of users:
```
getUsersCount demo
```
Check if concrete user was really created:
```
getUser demo test499
```
### Creating many offline sessions
For create many offline sessions you can use command `persistSessions` . For example create 50000 sessions (each 500 in separate transaction) with command:
```
persistSessions 50000 500
```
Once users or sessions are created, you can restart to ensure the startup import of offline sessions will be triggered and you can see impact on startup time. After restart you can use command:
```
size
```
to doublecheck total count of sessions in infinispan (it will be 2 times as there is also 1 client session per each user session created)

View file

@ -0,0 +1,194 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.cluster.infinispan;
import java.io.Serializable;
import java.util.concurrent.Callable;
import org.infinispan.Cache;
import org.infinispan.context.Flag;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.remoting.transport.Transport;
import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ExecutionResult;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.KeycloakSession;
/**
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class InfinispanClusterProvider implements ClusterProvider {
protected static final Logger logger = Logger.getLogger(InfinispanClusterProvider.class);
public static final String CLUSTER_STARTUP_TIME_KEY = "cluster-start-time";
private static final String TASK_KEY_PREFIX = "task::";
private final InfinispanClusterProviderFactory factory;
private final KeycloakSession session;
private final Cache<String, Serializable> cache;
public InfinispanClusterProvider(InfinispanClusterProviderFactory factory, KeycloakSession session, Cache<String, Serializable> cache) {
this.factory = factory;
this.session = session;
this.cache = cache;
}
@Override
public int getClusterStartupTime() {
Integer existingClusterStartTime = (Integer) cache.get(InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY);
if (existingClusterStartTime != null) {
return existingClusterStartTime;
} else {
// clusterStartTime not yet initialized. Let's try to put our startupTime
int serverStartTime = (int) (session.getKeycloakSessionFactory().getServerStartupTimestamp() / 1000);
existingClusterStartTime = (Integer) cache.putIfAbsent(InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY, serverStartTime);
if (existingClusterStartTime == null) {
logger.debugf("Initialized cluster startup time to %s", Time.toDate(serverStartTime).toString());
return serverStartTime;
} else {
return existingClusterStartTime;
}
}
}
@Override
public void close() {
}
@Override
public <T> ExecutionResult<T> executeIfNotExecuted(String taskKey, int taskTimeoutInSeconds, Callable<T> task) {
String cacheKey = TASK_KEY_PREFIX + taskKey;
boolean locked = tryLock(cacheKey, taskTimeoutInSeconds);
if (locked) {
try {
try {
T result = task.call();
return ExecutionResult.executed(result);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new RuntimeException("Unexpected exception when executed task " + taskKey, e);
}
} finally {
removeFromCache(cacheKey);
}
} else {
return ExecutionResult.notExecuted();
}
}
@Override
public void registerListener(String taskKey, ClusterListener task) {
factory.registerListener(taskKey, task);
}
@Override
public void notify(String taskKey, ClusterEvent event) {
// Put the value to the cache to notify listeners on all the nodes
cache.put(taskKey, event);
}
private String getCurrentNode(Cache<String, Serializable> cache) {
Transport transport = cache.getCacheManager().getTransport();
return transport==null ? "local" : transport.getAddress().toString();
}
private LockEntry createLockEntry(Cache<String, Serializable> cache) {
LockEntry lock = new LockEntry();
lock.setNode(getCurrentNode(cache));
lock.setTimestamp(Time.currentTime());
return lock;
}
private boolean tryLock(String cacheKey, int taskTimeoutInSeconds) {
LockEntry myLock = createLockEntry(cache);
LockEntry existingLock = (LockEntry) cache.putIfAbsent(cacheKey, myLock);
if (existingLock != null) {
// Task likely already in progress. Check if timestamp is not outdated
int thatTime = existingLock.getTimestamp();
int currentTime = Time.currentTime();
if (thatTime + taskTimeoutInSeconds < currentTime) {
if (logger.isTraceEnabled()) {
logger.tracef("Task %s outdated when in progress by node %s. Will try to replace task with our node %s", cacheKey, existingLock.getNode(), myLock.getNode());
}
boolean replaced = cache.replace(cacheKey, existingLock, myLock);
if (!replaced) {
if (logger.isTraceEnabled()) {
logger.tracef("Failed to replace the task %s. Other thread replaced in the meantime. Ignoring task.", cacheKey);
}
}
return replaced;
} else {
if (logger.isTraceEnabled()) {
logger.tracef("Task %s in progress already by node %s. Ignoring task.", cacheKey, existingLock.getNode());
}
return false;
}
} else {
if (logger.isTraceEnabled()) {
logger.tracef("Successfully acquired lock for task %s. Our node is %s", cacheKey, myLock.getNode());
}
return true;
}
}
private void removeFromCache(String cacheKey) {
// 3 attempts to send the message (it may fail if some node fails in the meantime)
int retry = 3;
while (true) {
try {
cache.getAdvancedCache()
.withFlags(Flag.IGNORE_RETURN_VALUES, Flag.FORCE_SYNCHRONOUS)
.remove(cacheKey);
if (logger.isTraceEnabled()) {
logger.tracef("Task %s removed from the cache", cacheKey);
}
return;
} catch (RuntimeException e) {
ComponentStatus status = cache.getStatus();
if (status.isStopping() || status.isTerminated()) {
logger.warnf("Failed to remove task %s from the cache. Cache is already terminating", cacheKey);
logger.debug(e.getMessage(), e);
return;
}
retry--;
if (retry == 0) {
throw e;
}
}
}
}
}

View file

@ -0,0 +1,201 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.cluster.infinispan;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.infinispan.Cache;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ClusterProviderFactory;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class InfinispanClusterProviderFactory implements ClusterProviderFactory {
public static final String PROVIDER_ID = "infinispan";
protected static final Logger logger = Logger.getLogger(InfinispanClusterProviderFactory.class);
private volatile Cache<String, Serializable> workCache;
private Map<String, ClusterListener> listeners = new HashMap<>();
@Override
public ClusterProvider create(KeycloakSession session) {
lazyInit(session);
return new InfinispanClusterProvider(this, session, workCache);
}
private void lazyInit(KeycloakSession session) {
if (workCache == null) {
synchronized (this) {
if (workCache == null) {
workCache = session.getProvider(InfinispanConnectionProvider.class).getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
workCache.getCacheManager().addListener(new ViewChangeListener());
workCache.addListener(new CacheEntryListener());
}
}
}
}
@Override
public void init(Config.Scope config) {
}
@Override
public void postInit(KeycloakSessionFactory factory) {
}
@Override
public void close() {
}
@Override
public String getId() {
return PROVIDER_ID;
}
@Listener
public class ViewChangeListener {
@ViewChanged
public void viewChanged(ViewChangedEvent event) {
EmbeddedCacheManager cacheManager = event.getCacheManager();
Transport transport = cacheManager.getTransport();
// Coordinator makes sure that entries for outdated nodes are cleaned up
if (transport != null && transport.isCoordinator()) {
Set<String> newAddresses = convertAddresses(event.getNewMembers());
Set<String> removedNodesAddresses = convertAddresses(event.getOldMembers());
removedNodesAddresses.removeAll(newAddresses);
if (removedNodesAddresses.isEmpty()) {
return;
}
logger.debugf("Nodes %s removed from cluster. Removing tasks locked by this nodes", removedNodesAddresses.toString());
Cache<String, Serializable> cache = cacheManager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
Iterator<String> toRemove = cache.entrySet().stream().filter(new Predicate<Map.Entry<String, Serializable>>() {
@Override
public boolean test(Map.Entry<String, Serializable> entry) {
if (!(entry.getValue() instanceof LockEntry)) {
return false;
}
LockEntry lock = (LockEntry) entry.getValue();
return removedNodesAddresses.contains(lock.getNode());
}
}).map(new Function<Map.Entry<String, Serializable>, String>() {
@Override
public String apply(Map.Entry<String, Serializable> entry) {
return entry.getKey();
}
}).iterator();
while (toRemove.hasNext()) {
String rem = toRemove.next();
if (logger.isTraceEnabled()) {
logger.tracef("Removing task %s due it's node left cluster", rem);
}
cache.remove(rem);
}
}
}
private Set<String> convertAddresses(Collection<Address> addresses) {
return addresses.stream().map(new Function<Address, String>() {
@Override
public String apply(Address address) {
return address.toString();
}
}).collect(Collectors.toSet());
}
}
<T> void registerListener(String taskKey, ClusterListener task) {
listeners.put(taskKey, task);
}
@Listener
public class CacheEntryListener {
@CacheEntryCreated
public void cacheEntryCreated(CacheEntryCreatedEvent<String, Object> event) {
if (!event.isPre()) {
trigger(event.getKey(), event.getValue());
}
}
@CacheEntryModified
public void cacheEntryModified(CacheEntryModifiedEvent<String, Object> event) {
if (!event.isPre()) {
trigger(event.getKey(), event.getValue());
}
}
private void trigger(String key, Object value) {
ClusterListener task = listeners.get(key);
if (task != null) {
ClusterEvent event = (ClusterEvent) value;
task.run(event);
}
}
}
}

View file

@ -0,0 +1,45 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.cluster.infinispan;
import java.io.Serializable;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class LockEntry implements Serializable {
private String node;
private int timestamp;
public String getNode() {
return node;
}
public void setNode(String node) {
this.node = node;
}
public int getTimestamp() {
return timestamp;
}
public void setTimestamp(int timestamp) {
this.timestamp = timestamp;
}
}

View file

@ -23,6 +23,8 @@ import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.models.KeycloakSession;
@ -145,6 +147,13 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
cacheManager.defineConfiguration(InfinispanConnectionProvider.SESSION_CACHE_NAME, sessionCacheConfiguration);
cacheManager.defineConfiguration(InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME, sessionCacheConfiguration);
cacheManager.defineConfiguration(InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME, sessionCacheConfiguration);
ConfigurationBuilder replicationConfigBuilder = new ConfigurationBuilder();
if (clustered) {
replicationConfigBuilder.clustering().cacheMode(async ? CacheMode.REPL_ASYNC : CacheMode.REPL_SYNC);
}
Configuration replicationCacheConfiguration = replicationConfigBuilder.build();
cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, replicationCacheConfiguration);
}
}

View file

@ -30,6 +30,7 @@ public interface InfinispanConnectionProvider extends Provider {
static final String SESSION_CACHE_NAME = "sessions";
static final String OFFLINE_SESSION_CACHE_NAME = "offlineSessions";
static final String LOGIN_FAILURE_CACHE_NAME = "loginFailures";
static final String WORK_CACHE_NAME = "work";
<K, V> Cache<K, V> getCache(String name);

View file

@ -21,15 +21,41 @@ import org.infinispan.Cache;
import org.infinispan.CacheStream;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Time;
import org.keycloak.models.*;
import org.keycloak.models.ClientInitialAccessModel;
import org.keycloak.models.ClientModel;
import org.keycloak.models.ClientSessionModel;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakTransaction;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.UserSessionProvider;
import org.keycloak.models.UsernameLoginFailureModel;
import org.keycloak.models.session.UserSessionPersisterProvider;
import org.keycloak.models.sessions.infinispan.entities.*;
import org.keycloak.models.sessions.infinispan.initializer.TimeAwareInitializerState;
import org.keycloak.models.sessions.infinispan.stream.*;
import org.keycloak.models.sessions.infinispan.entities.ClientInitialAccessEntity;
import org.keycloak.models.sessions.infinispan.entities.ClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.models.sessions.infinispan.stream.ClientInitialAccessPredicate;
import org.keycloak.models.sessions.infinispan.stream.ClientSessionPredicate;
import org.keycloak.models.sessions.infinispan.stream.Comparators;
import org.keycloak.models.sessions.infinispan.stream.Mappers;
import org.keycloak.models.sessions.infinispan.stream.SessionPredicate;
import org.keycloak.models.sessions.infinispan.stream.UserLoginFailurePredicate;
import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.models.utils.RealmInfoUtil;
import java.util.*;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
@ -411,19 +437,6 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
loginFailureCache.remove(new LoginFailureKey(realm.getId(), user.getEmail()));
}
@Override
public int getClusterStartupTime() {
TimeAwareInitializerState state = (TimeAwareInitializerState) offlineSessionCache.get(InfinispanUserSessionProviderFactory.SESSION_INITIALIZER_STATE_KEY);
int startTime;
if (state == null) {
log.warn("Cluster startup time not yet available. Fallback to local startup time");
startTime = (int)(session.getKeycloakSessionFactory().getServerStartupTimestamp() / 1000);
} else {
startTime = state.getClusterStartupTime();
}
return startTime;
}
@Override
public void close() {
}

View file

@ -17,6 +17,8 @@
package org.keycloak.models.sessions.infinispan;
import java.io.Serializable;
import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.keycloak.Config;
@ -34,9 +36,6 @@ import org.keycloak.provider.ProviderEventListener;
public class InfinispanUserSessionProviderFactory implements UserSessionProviderFactory {
private static final String STATE_KEY_PREFIX = "initializerState";
public static final String SESSION_INITIALIZER_STATE_KEY = STATE_KEY_PREFIX + "::offlineUserSessions";
private static final Logger log = Logger.getLogger(InfinispanUserSessionProviderFactory.class);
private Config.Scope config;
@ -85,9 +84,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
@Override
public void run(KeycloakSession session) {
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
Cache<String, SessionEntity> cache = connections.getCache(InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME);
Cache<String, Serializable> cache = connections.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
InfinispanUserSessionInitializer initializer = new InfinispanUserSessionInitializer(sessionFactory, cache, new OfflineUserSessionLoader(), maxErrors, sessionsPerSegment, SESSION_INITIALIZER_STATE_KEY);
InfinispanUserSessionInitializer initializer = new InfinispanUserSessionInitializer(sessionFactory, cache, new OfflineUserSessionLoader(), maxErrors, sessionsPerSegment, "offlineUserSessions");
initializer.initCache();
initializer.loadPersistentSessions();
}

View file

@ -20,57 +20,55 @@ package org.keycloak.models.sessions.infinispan.initializer;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.context.Flag;
import org.infinispan.distexec.DefaultExecutorService;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.remoting.transport.Transport;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Time;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.KeycloakSessionTask;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.utils.KeycloakModelUtils;
/**
* Startup initialization for reading persistent userSessions/clientSessions to be filled into infinispan/memory . In cluster,
* the initialization is distributed among all cluster nodes, so the startup time is even faster
*
* TODO: Move to clusterService. Implementation is already pretty generic and doesn't contain any "userSession" specific stuff. All sessions-specific logic is in the SessionLoader implementation
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class InfinispanUserSessionInitializer {
private static final String STATE_KEY_PREFIX = "distributed::";
private static final Logger log = Logger.getLogger(InfinispanUserSessionInitializer.class);
private final KeycloakSessionFactory sessionFactory;
private final Cache<String, SessionEntity> cache;
private final Cache<String, Serializable> workCache;
private final SessionLoader sessionLoader;
private final int maxErrors;
private final int sessionsPerSegment;
private final String stateKey;
public InfinispanUserSessionInitializer(KeycloakSessionFactory sessionFactory, Cache<String, SessionEntity> cache, SessionLoader sessionLoader, int maxErrors, int sessionsPerSegment, String stateKey) {
public InfinispanUserSessionInitializer(KeycloakSessionFactory sessionFactory, Cache<String, Serializable> workCache, SessionLoader sessionLoader, int maxErrors, int sessionsPerSegment, String stateKeySuffix) {
this.sessionFactory = sessionFactory;
this.cache = cache;
this.workCache = workCache;
this.sessionLoader = sessionLoader;
this.maxErrors = maxErrors;
this.sessionsPerSegment = sessionsPerSegment;
this.stateKey = stateKey;
this.stateKey = STATE_KEY_PREFIX + stateKeySuffix;
}
public void initCache() {
this.cache.getAdvancedCache().getComponentRegistry().registerComponent(sessionFactory, KeycloakSessionFactory.class);
this.workCache.getAdvancedCache().getComponentRegistry().registerComponent(sessionFactory, KeycloakSessionFactory.class);
}
@ -94,16 +92,14 @@ public class InfinispanUserSessionInitializer {
private boolean isFinished() {
InitializerState state = (InitializerState) cache.get(stateKey);
InitializerState state = (InitializerState) workCache.get(stateKey);
return state != null && state.isFinished();
}
private InitializerState getOrCreateInitializerState() {
TimeAwareInitializerState state = (TimeAwareInitializerState) cache.get(stateKey);
InitializerState state = (InitializerState) workCache.get(stateKey);
if (state == null) {
int startTime = (int)(sessionFactory.getServerStartupTimestamp() / 1000);
final int[] count = new int[1];
// Rather use separate transactions for update and counting
@ -111,7 +107,7 @@ public class InfinispanUserSessionInitializer {
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
sessionLoader.init(session, startTime);
sessionLoader.init(session);
}
});
@ -124,9 +120,8 @@ public class InfinispanUserSessionInitializer {
});
state = new TimeAwareInitializerState();
state = new InitializerState();
state.init(count[0], sessionsPerSegment);
state.setClusterStartupTime(startTime);
saveStateToCache(state);
}
return state;
@ -143,7 +138,7 @@ public class InfinispanUserSessionInitializer {
public void run() {
// Save this synchronously to ensure all nodes read correct state
InfinispanUserSessionInitializer.this.cache.getAdvancedCache().
InfinispanUserSessionInitializer.this.workCache.getAdvancedCache().
withFlags(Flag.IGNORE_RETURN_VALUES, Flag.FORCE_SYNCHRONOUS)
.put(stateKey, state);
}
@ -153,7 +148,7 @@ public class InfinispanUserSessionInitializer {
private boolean isCoordinator() {
Transport transport = cache.getCacheManager().getTransport();
Transport transport = workCache.getCacheManager().getTransport();
return transport == null || transport.isCoordinator();
}
@ -166,9 +161,9 @@ public class InfinispanUserSessionInitializer {
int processors = Runtime.getRuntime().availableProcessors();
ExecutorService localExecutor = Executors.newCachedThreadPool();
Transport transport = cache.getCacheManager().getTransport();
Transport transport = workCache.getCacheManager().getTransport();
boolean distributed = transport != null;
ExecutorService executorService = distributed ? new DefaultExecutorService(cache, localExecutor) : localExecutor;
ExecutorService executorService = distributed ? new DefaultExecutorService(workCache, localExecutor) : localExecutor;
int errors = 0;
@ -190,7 +185,7 @@ public class InfinispanUserSessionInitializer {
SessionInitializerWorker worker = new SessionInitializerWorker();
worker.setWorkerEnvironment(segment, sessionsPerSegment, sessionLoader);
if (!distributed) {
worker.setEnvironment(cache, null);
worker.setEnvironment(workCache, null);
}
Future<WorkerResult> future = executorService.submit(worker);
@ -242,6 +237,12 @@ public class InfinispanUserSessionInitializer {
runnable.run();
return;
} catch (RuntimeException e) {
ComponentStatus status = workCache.getStatus();
if (status.isStopping() || status.isTerminated()) {
log.warn("Failed to put initializerState to the cache. Cache is already terminating");
log.debug(e.getMessage(), e);
return;
}
retry--;
if (retry == 0) {
throw e;

View file

@ -20,6 +20,7 @@ package org.keycloak.models.sessions.infinispan.initializer;
import java.util.List;
import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.models.ClientSessionModel;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.UserSessionModel;
@ -33,9 +34,12 @@ public class OfflineUserSessionLoader implements SessionLoader {
private static final Logger log = Logger.getLogger(OfflineUserSessionLoader.class);
@Override
public void init(KeycloakSession session, int clusterStartupTime) {
public void init(KeycloakSession session) {
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
// TODO: check if update of timestamps in persister can be skipped entirely
int clusterStartupTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
log.debugf("Clearing detached sessions from persistent storage and updating timestamps to %d", clusterStartupTime);
persister.clearDetachedUserSessions();

View file

@ -32,7 +32,7 @@ import org.keycloak.models.utils.KeycloakModelUtils;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class SessionInitializerWorker implements DistributedCallable<String, SessionEntity, InfinispanUserSessionInitializer.WorkerResult>, Serializable {
public class SessionInitializerWorker implements DistributedCallable<String, Serializable, InfinispanUserSessionInitializer.WorkerResult>, Serializable {
private static final Logger log = Logger.getLogger(SessionInitializerWorker.class);
@ -40,7 +40,7 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ses
private int sessionsPerSegment;
private SessionLoader sessionLoader;
private transient Cache<String, SessionEntity> cache;
private transient Cache<String, Serializable> workCache;
public void setWorkerEnvironment(int segment, int sessionsPerSegment, SessionLoader sessionLoader) {
this.segment = segment;
@ -49,8 +49,8 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ses
}
@Override
public void setEnvironment(Cache<String, SessionEntity> cache, Set<String> inputKeys) {
this.cache = cache;
public void setEnvironment(Cache<String, Serializable> workCache, Set<String> inputKeys) {
this.workCache = workCache;
}
@Override
@ -59,7 +59,7 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ses
log.tracef("Running computation for segment: %d", segment);
}
KeycloakSessionFactory sessionFactory = cache.getAdvancedCache().getComponentRegistry().getComponent(KeycloakSessionFactory.class);
KeycloakSessionFactory sessionFactory = workCache.getAdvancedCache().getComponentRegistry().getComponent(KeycloakSessionFactory.class);
if (sessionFactory == null) {
log.warnf("KeycloakSessionFactory not yet set in cache. Worker skipped");
return InfinispanUserSessionInitializer.WorkerResult.create(segment, false);

View file

@ -26,7 +26,7 @@ import org.keycloak.models.KeycloakSession;
*/
public interface SessionLoader extends Serializable {
void init(KeycloakSession session, int clusterStartupTime);
void init(KeycloakSession session);
int getSessionsCount(KeycloakSession session);

View file

@ -0,0 +1,18 @@
#
# Copyright 2016 Red Hat, Inc. and/or its affiliates
# and other contributors as indicated by the @author tags.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.keycloak.cluster.infinispan.InfinispanClusterProviderFactory

View file

@ -73,5 +73,9 @@
<dropUniqueConstraint tableName="FED_PROVIDERS" constraintName="UK_DCCIRJLIPU1478VQC89DID88C" />
<dropTable tableName="FED_PROVIDERS" />
<createIndex indexName="IDX_US_SESS_ID_ON_CL_SESS" tableName="OFFLINE_CLIENT_SESSION">
<column name="USER_SESSION_ID" type="VARCHAR(36)"/>
</createIndex>
</changeSet>
</databaseChangeLog>

View file

@ -0,0 +1,26 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.cluster;
import java.io.Serializable;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public interface ClusterEvent extends Serializable {
}

View file

@ -15,20 +15,20 @@
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.initializer;
package org.keycloak.cluster;
/**
* Task to be executed on all cluster nodes once it's notified.
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class TimeAwareInitializerState extends InitializerState {
public interface ClusterListener {
private int clusterStartupTime;
/**
* Registered task to be executed on all cluster nodes once it's notified from cache.
*
* @param event value of notification (Object added into the cache)
*/
void run(ClusterEvent event);
public int getClusterStartupTime() {
return clusterStartupTime;
}
public void setClusterStartupTime(int clusterStartupTime) {
this.clusterStartupTime = clusterStartupTime;
}
}

View file

@ -0,0 +1,66 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.cluster;
import java.util.concurrent.Callable;
import org.keycloak.provider.Provider;
/**
* Various utils related to clustering and concurrent tasks on cluster nodes
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public interface ClusterProvider extends Provider {
/**
* Same value for all cluster nodes. It will use startup time of this server in non-cluster environment.
*/
int getClusterStartupTime();
/**
* Execute given task just if it's not already in progress (either on this or any other cluster node).
*
* @param taskKey
* @param taskTimeoutInSeconds timeout for given task. If there is existing task in progress for longer time, it's considered outdated so we will start our task.
* @param task
* @param <T>
* @return result with "executed" flag specifying if execution was executed or ignored.
*/
<T> ExecutionResult<T> executeIfNotExecuted(String taskKey, int taskTimeoutInSeconds, Callable<T> task);
/**
* Register task (listener) under given key. When this key will be put to the cache on any cluster node, the task will be executed
*
* @param taskKey
* @param task
*/
void registerListener(String taskKey, ClusterListener task);
/**
* Notify registered listeners on all cluster nodes
*
* @param taskKey
* @param event
*/
void notify(String taskKey, ClusterEvent event);
}

View file

@ -0,0 +1,26 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.cluster;
import org.keycloak.provider.ProviderFactory;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public interface ClusterProviderFactory extends ProviderFactory<ClusterProvider> {
}

View file

@ -0,0 +1,48 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.cluster;
import org.keycloak.provider.Provider;
import org.keycloak.provider.ProviderFactory;
import org.keycloak.provider.Spi;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class ClusterSpi implements Spi {
@Override
public boolean isInternal() {
return true;
}
@Override
public String getName() {
return "cluster";
}
@Override
public Class<? extends Provider> getProviderClass() {
return ClusterProvider.class;
}
@Override
public Class<? extends ProviderFactory> getProviderFactoryClass() {
return ClusterProviderFactory.class;
}
}

View file

@ -0,0 +1,48 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.cluster;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class ExecutionResult<T> {
private final boolean executed;
private final T result;
private ExecutionResult(boolean executed, T result) {
this.executed = executed;
this.result = result;
}
public static <T> ExecutionResult<T> executed(T result) {
return new ExecutionResult<>(true, result);
}
public static <T> ExecutionResult<T> notExecuted() {
return new ExecutionResult<>(false, null);
}
public boolean isExecuted() {
return executed;
}
public T getResult() {
return result;
}
}

View file

@ -22,11 +22,21 @@ package org.keycloak.models;
*/
public class UserFederationSyncResult {
private boolean ignored;
private int added;
private int updated;
private int removed;
private int failed;
public boolean isIgnored() {
return ignored;
}
public void setIgnored(boolean ignored) {
this.ignored = ignored;
}
public int getAdded() {
return added;
}
@ -83,11 +93,15 @@ public class UserFederationSyncResult {
}
public String getStatus() {
String status = String.format("%d imported users, %d updated users, %d removed users", added, updated, removed);
if (failed != 0) {
status += String.format(", %d users failed sync! See server log for more details", failed);
if (ignored) {
return "Synchronization ignored as it's already in progress";
} else {
String status = String.format("%d imported users, %d updated users, %d removed users", added, updated, removed);
if (failed != 0) {
status += String.format(", %d users failed sync! See server log for more details", failed);
}
return status;
}
return status;
}
@Override
@ -98,4 +112,10 @@ public class UserFederationSyncResult {
public static UserFederationSyncResult empty() {
return new UserFederationSyncResult();
}
public static UserFederationSyncResult ignored() {
UserFederationSyncResult result = new UserFederationSyncResult();
result.setIgnored(true);
return result;
}
}

View file

@ -82,9 +82,6 @@ public interface UserSessionProvider extends Provider {
void removeClientInitialAccessModel(RealmModel realm, String id);
List<ClientInitialAccessModel> listClientInitialAccess(RealmModel realm);
// Will use startup time of this server in non-cluster environment
int getClusterStartupTime();
void close();
}

View file

@ -49,5 +49,6 @@ org.keycloak.authentication.ClientAuthenticatorSpi
org.keycloak.authentication.RequiredActionSpi
org.keycloak.authentication.FormAuthenticatorSpi
org.keycloak.authentication.FormActionSpi
org.keycloak.cluster.ClusterSpi

View file

@ -17,6 +17,7 @@
package org.keycloak.protocol.oidc;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.ClientConnection;
import org.keycloak.OAuth2Constants;
import org.keycloak.OAuthErrorException;
@ -193,7 +194,7 @@ public class TokenManager {
int currentTime = Time.currentTime();
if (realm.isRevokeRefreshToken()) {
int clusterStartupTime = session.sessions().getClusterStartupTime();
int clusterStartupTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
if (refreshToken.getIssuedAt() < validation.clientSession.getTimestamp() && (clusterStartupTime != validation.clientSession.getTimestamp())) {
throw new OAuthErrorException(OAuthErrorException.INVALID_GRANT, "Stale token");

View file

@ -231,7 +231,7 @@ public class RealmManager implements RealmImporter {
// Remove all periodic syncs for configured federation providers
UsersSyncManager usersSyncManager = new UsersSyncManager();
for (final UserFederationProviderModel fedProvider : federationProviders) {
usersSyncManager.removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), fedProvider);
usersSyncManager.notifyToRefreshPeriodicSync(session, realm, fedProvider, true);
}
}
return removed;
@ -434,7 +434,7 @@ public class RealmManager implements RealmImporter {
List<UserFederationProviderModel> federationProviders = realm.getUserFederationProviders();
UsersSyncManager usersSyncManager = new UsersSyncManager();
for (final UserFederationProviderModel fedProvider : federationProviders) {
usersSyncManager.refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), fedProvider, realm.getId());
usersSyncManager.notifyToRefreshPeriodicSync(session, realm, fedProvider, false);
}
return realm;
}

View file

@ -16,6 +16,10 @@
*/
package org.keycloak.services.managers;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ExecutionResult;
import org.keycloak.common.util.Time;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
@ -30,13 +34,17 @@ import org.keycloak.services.ServicesLogger;
import org.keycloak.timer.TimerProvider;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.Callable;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class UsersSyncManager {
private static final String FEDERATION_TASK_KEY = "federation";
protected static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
/**
@ -57,26 +65,106 @@ public class UsersSyncManager {
refreshPeriodicSyncForProvider(sessionFactory, timer, fedProvider, realm.getId());
}
}
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
clusterProvider.registerListener(FEDERATION_TASK_KEY, new UserFederationClusterListener(sessionFactory));
}
});
}
public UserFederationSyncResult syncAllUsers(final KeycloakSessionFactory sessionFactory, String realmId, final UserFederationProviderModel fedProvider) {
final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
updateLastSyncInterval(sessionFactory, fedProvider, realmId);
return fedProviderFactory.syncAllUsers(sessionFactory, realmId, fedProvider);
private class Holder {
ExecutionResult<UserFederationSyncResult> result;
}
public UserFederationSyncResult syncChangedUsers(final KeycloakSessionFactory sessionFactory, String realmId, final UserFederationProviderModel fedProvider) {
final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
public UserFederationSyncResult syncAllUsers(final KeycloakSessionFactory sessionFactory, final String realmId, final UserFederationProviderModel fedProvider) {
final Holder holder = new Holder();
// See when we did last sync.
int oldLastSync = fedProvider.getLastSync();
updateLastSyncInterval(sessionFactory, fedProvider, realmId);
return fedProviderFactory.syncChangedUsers(sessionFactory, realmId, fedProvider, Time.toDate(oldLastSync));
// Ensure not executed concurrently on this or any other cluster node
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
// shared key for "full" and "changed" . Improve if needed
String taskKey = fedProvider.getId() + "::sync";
// 30 seconds minimal timeout for now
int timeout = Math.max(30, fedProvider.getFullSyncPeriod());
holder.result = clusterProvider.executeIfNotExecuted(taskKey, timeout, new Callable<UserFederationSyncResult>() {
@Override
public UserFederationSyncResult call() throws Exception {
final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
updateLastSyncInterval(sessionFactory, fedProvider, realmId);
return fedProviderFactory.syncAllUsers(sessionFactory, realmId, fedProvider);
}
});
}
});
if (holder.result == null || !holder.result.isExecuted()) {
logger.debugf("syncAllUsers for federation provider %s was ignored as it's already in progress", fedProvider.getDisplayName());
return UserFederationSyncResult.ignored();
} else {
return holder.result.getResult();
}
}
public void refreshPeriodicSyncForProvider(final KeycloakSessionFactory sessionFactory, TimerProvider timer, final UserFederationProviderModel fedProvider, final String realmId) {
public UserFederationSyncResult syncChangedUsers(final KeycloakSessionFactory sessionFactory, final String realmId, final UserFederationProviderModel fedProvider) {
final Holder holder = new Holder();
// Ensure not executed concurrently on this or any other cluster node
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
// shared key for "full" and "changed" . Improve if needed
String taskKey = fedProvider.getId() + "::sync";
// 30 seconds minimal timeout for now
int timeout = Math.max(30, fedProvider.getChangedSyncPeriod());
holder.result = clusterProvider.executeIfNotExecuted(taskKey, timeout, new Callable<UserFederationSyncResult>() {
@Override
public UserFederationSyncResult call() throws Exception {
final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
// See when we did last sync.
int oldLastSync = fedProvider.getLastSync();
updateLastSyncInterval(sessionFactory, fedProvider, realmId);
return fedProviderFactory.syncChangedUsers(sessionFactory, realmId, fedProvider, Time.toDate(oldLastSync));
}
});
}
});
if (holder.result == null || !holder.result.isExecuted()) {
logger.debugf("syncChangedUsers for federation provider %s was ignored as it's already in progress", fedProvider.getDisplayName());
return UserFederationSyncResult.ignored();
} else {
return holder.result.getResult();
}
}
// Ensure all cluster nodes are notified
public void notifyToRefreshPeriodicSync(KeycloakSession session, RealmModel realm, UserFederationProviderModel federationProvider, boolean removed) {
FederationProviderClusterEvent event = FederationProviderClusterEvent.createEvent(removed, realm.getId(), federationProvider);
session.getProvider(ClusterProvider.class).notify(FEDERATION_TASK_KEY, event);
}
// Executed once it receives notification that some UserFederationProvider was created or updated
protected void refreshPeriodicSyncForProvider(final KeycloakSessionFactory sessionFactory, TimerProvider timer, final UserFederationProviderModel fedProvider, final String realmId) {
logger.debugf("Going to refresh periodic sync for provider '%s' . Full sync period: %d , changed users sync period: %d",
fedProvider.getDisplayName(), fedProvider.getFullSyncPeriod(), fedProvider.getChangedSyncPeriod());
if (fedProvider.getFullSyncPeriod() > 0) {
// We want periodic full sync for this provider
timer.schedule(new Runnable() {
@ -84,7 +172,12 @@ public class UsersSyncManager {
@Override
public void run() {
try {
syncAllUsers(sessionFactory, realmId, fedProvider);
boolean shouldPerformSync = shouldPerformNewPeriodicSync(fedProvider.getLastSync(), fedProvider.getChangedSyncPeriod());
if (shouldPerformSync) {
syncAllUsers(sessionFactory, realmId, fedProvider);
} else {
logger.debugf("Ignored periodic full sync with federation provider %s due small time since last sync", fedProvider.getDisplayName());
}
} catch (Throwable t) {
logger.errorDuringFullUserSync(t);
}
@ -102,7 +195,12 @@ public class UsersSyncManager {
@Override
public void run() {
try {
syncChangedUsers(sessionFactory, realmId, fedProvider);
boolean shouldPerformSync = shouldPerformNewPeriodicSync(fedProvider.getLastSync(), fedProvider.getChangedSyncPeriod());
if (shouldPerformSync) {
syncChangedUsers(sessionFactory, realmId, fedProvider);
} else {
logger.debugf("Ignored periodic changed-users sync with federation provider %s due small time since last sync", fedProvider.getDisplayName());
}
} catch (Throwable t) {
logger.errorDuringChangedUserSync(t);
}
@ -115,7 +213,21 @@ public class UsersSyncManager {
}
}
public void removePeriodicSyncForProvider(TimerProvider timer, final UserFederationProviderModel fedProvider) {
// Skip syncing if there is short time since last sync time.
private boolean shouldPerformNewPeriodicSync(int lastSyncTime, int period) {
if (lastSyncTime <= 0) {
return true;
}
int currentTime = Time.currentTime();
int timeSinceLastSync = currentTime - lastSyncTime;
return (timeSinceLastSync * 2 > period);
}
// Executed once it receives notification that some UserFederationProvider was removed
protected void removePeriodicSyncForProvider(TimerProvider timer, UserFederationProviderModel fedProvider) {
logger.debugf("Removing periodic sync for provider %s", fedProvider.getDisplayName());
timer.cancelTask(fedProvider.getId() + "-FULL");
timer.cancelTask(fedProvider.getId() + "-CHANGED");
}
@ -144,4 +256,73 @@ public class UsersSyncManager {
});
}
private class UserFederationClusterListener implements ClusterListener {
private final KeycloakSessionFactory sessionFactory;
public UserFederationClusterListener(KeycloakSessionFactory sessionFactory) {
this.sessionFactory = sessionFactory;
}
@Override
public void run(ClusterEvent event) {
final FederationProviderClusterEvent fedEvent = (FederationProviderClusterEvent) event;
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
TimerProvider timer = session.getProvider(TimerProvider.class);
if (fedEvent.isRemoved()) {
removePeriodicSyncForProvider(timer, fedEvent.getFederationProvider());
} else {
refreshPeriodicSyncForProvider(sessionFactory, timer, fedEvent.getFederationProvider(), fedEvent.getRealmId());
}
}
});
}
}
// Send to cluster during each update or remove of federationProvider, so all nodes can update sync periods
public static class FederationProviderClusterEvent implements ClusterEvent {
private boolean removed;
private String realmId;
private UserFederationProviderModel federationProvider;
public boolean isRemoved() {
return removed;
}
public void setRemoved(boolean removed) {
this.removed = removed;
}
public String getRealmId() {
return realmId;
}
public void setRealmId(String realmId) {
this.realmId = realmId;
}
public UserFederationProviderModel getFederationProvider() {
return federationProvider;
}
public void setFederationProvider(UserFederationProviderModel federationProvider) {
this.federationProvider = federationProvider;
}
public static FederationProviderClusterEvent createEvent(boolean removed, String realmId, UserFederationProviderModel fedProvider) {
FederationProviderClusterEvent notification = new FederationProviderClusterEvent();
notification.setRemoved(removed);
notification.setRealmId(realmId);
notification.setFederationProvider(fedProvider);
return notification;
}
}
}

View file

@ -38,6 +38,7 @@ import org.keycloak.services.managers.UsersSyncManager;
import org.keycloak.services.resources.admin.AdminRoot;
import org.keycloak.services.scheduled.ClearExpiredEvents;
import org.keycloak.services.scheduled.ClearExpiredUserSessions;
import org.keycloak.services.scheduled.ClusterAwareScheduledTaskRunner;
import org.keycloak.services.scheduled.ScheduledTaskRunner;
import org.keycloak.services.util.JsonConfigProvider;
import org.keycloak.services.util.ObjectMapperResolver;
@ -217,8 +218,8 @@ public class KeycloakApplication extends Application {
KeycloakSession session = sessionFactory.create();
try {
TimerProvider timer = session.getProvider(TimerProvider.class);
timer.schedule(new ScheduledTaskRunner(sessionFactory, new ClearExpiredEvents()), interval, "ClearExpiredEvents");
timer.schedule(new ScheduledTaskRunner(sessionFactory, new ClearExpiredUserSessions()), interval, "ClearExpiredUserSessions");
timer.schedule(new ClusterAwareScheduledTaskRunner(sessionFactory, new ClearExpiredEvents(), interval), interval, "ClearExpiredEvents");
timer.schedule(new ClusterAwareScheduledTaskRunner(sessionFactory, new ClearExpiredUserSessions(), interval), interval, "ClearExpiredUserSessions");
new UsersSyncManager().bootstrapPeriodic(sessionFactory, timer);
} finally {
session.close();

View file

@ -242,7 +242,7 @@ public class RealmAdminResource {
List<UserFederationProviderModel> federationProviders = realm.getUserFederationProviders();
UsersSyncManager usersSyncManager = new UsersSyncManager();
for (final UserFederationProviderModel fedProvider : federationProviders) {
usersSyncManager.refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), fedProvider, realm.getId());
usersSyncManager.notifyToRefreshPeriodicSync(session, realm, fedProvider, false);
}
adminEvent.operation(OperationType.UPDATE).representation(rep).success();

View file

@ -106,7 +106,7 @@ public class UserFederationProviderResource {
UserFederationProviderModel model = new UserFederationProviderModel(rep.getId(), rep.getProviderName(), rep.getConfig(), rep.getPriority(), displayName,
rep.getFullSyncPeriod(), rep.getChangedSyncPeriod(), rep.getLastSync());
realm.updateUserFederationProvider(model);
new UsersSyncManager().refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), model, realm.getId());
new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, model, false);
boolean kerberosCredsAdded = UserFederationProvidersResource.checkKerberosCredential(session, realm, model);
if (kerberosCredsAdded) {
logger.addedKerberosToRealmCredentials();
@ -138,7 +138,7 @@ public class UserFederationProviderResource {
auth.requireManage();
realm.removeUserFederationProvider(this.federationProviderModel);
new UsersSyncManager().removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), this.federationProviderModel);
new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, this.federationProviderModel, true);
adminEvent.operation(OperationType.DELETE).resourcePath(uriInfo).success();

View file

@ -178,7 +178,7 @@ public class UserFederationProvidersResource {
}
UserFederationProviderModel model = realm.addUserFederationProvider(rep.getProviderName(), rep.getConfig(), rep.getPriority(), displayName,
rep.getFullSyncPeriod(), rep.getChangedSyncPeriod(), rep.getLastSync());
new UsersSyncManager().refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), model, realm.getId());
new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, model, false);
boolean kerberosCredsAdded = checkKerberosCredential(session, realm, model);
if (kerberosCredsAdded) {
logger.addedKerberosToRealmCredentials();

View file

@ -0,0 +1,68 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.services.scheduled;
import java.util.concurrent.Callable;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ExecutionResult;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
/**
* Ensures that there are not concurrent executions of same task (either on this host or any other cluster host)
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class ClusterAwareScheduledTaskRunner extends ScheduledTaskRunner {
private final int intervalSecs;
public ClusterAwareScheduledTaskRunner(KeycloakSessionFactory sessionFactory, ScheduledTask task, long intervalMillis) {
super(sessionFactory, task);
this.intervalSecs = (int) (intervalMillis / 1000);
}
@Override
protected void runTask(final KeycloakSession session) {
session.getTransaction().begin();
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
String taskKey = task.getClass().getSimpleName();
ExecutionResult<Void> result = clusterProvider.executeIfNotExecuted(taskKey, intervalSecs, new Callable<Void>() {
@Override
public Void call() throws Exception {
task.run(session);
return null;
}
});
session.getTransaction().commit();
if (result.isExecuted()) {
logger.debugf("Executed scheduled task %s", taskKey);
} else {
logger.debugf("Skipped execution of task %s as other cluster node is executing it", taskKey);
}
}
}

View file

@ -17,6 +17,10 @@
package org.keycloak.services.scheduled;
import java.util.concurrent.Callable;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ExecutionResult;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.services.ServicesLogger;
@ -26,10 +30,10 @@ import org.keycloak.services.ServicesLogger;
*/
public class ScheduledTaskRunner implements Runnable {
private static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
protected static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
private final KeycloakSessionFactory sessionFactory;
private final ScheduledTask task;
protected final KeycloakSessionFactory sessionFactory;
protected final ScheduledTask task;
public ScheduledTaskRunner(KeycloakSessionFactory sessionFactory, ScheduledTask task) {
this.sessionFactory = sessionFactory;
@ -40,11 +44,7 @@ public class ScheduledTaskRunner implements Runnable {
public void run() {
KeycloakSession session = sessionFactory.create();
try {
session.getTransaction().begin();
task.run(session);
session.getTransaction().commit();
logger.debug("Executed scheduled task " + task.getClass().getSimpleName());
runTask(session);
} catch (Throwable t) {
logger.failedToRunScheduledTask(t, task.getClass().getSimpleName());
@ -58,4 +58,12 @@ public class ScheduledTaskRunner implements Runnable {
}
}
protected void runTask(KeycloakSession session) {
session.getTransaction().begin();
task.run(session);
session.getTransaction().commit();
logger.debug("Executed scheduled task " + task.getClass().getSimpleName());
}
}

View file

@ -33,7 +33,7 @@ import org.keycloak.services.filters.KeycloakSessionServletFilter;
import org.keycloak.services.managers.ApplianceBootstrap;
import org.keycloak.services.managers.RealmManager;
import org.keycloak.services.resources.KeycloakApplication;
import org.keycloak.testsuite.util.cli.InfinispanCLI;
import org.keycloak.testsuite.util.cli.TestsuiteCLI;
import org.keycloak.util.JsonSerialization;
import javax.servlet.DispatcherType;
@ -206,8 +206,8 @@ public class KeycloakServer {
}
});
if (System.getProperties().containsKey("startInfinispanCLI")) {
new InfinispanCLI(keycloak).start();
if (System.getProperties().containsKey("startTestsuiteCLI")) {
new TestsuiteCLI(keycloak).start();
}
return keycloak;

View file

@ -43,23 +43,19 @@ import org.keycloak.services.managers.UsersSyncManager;
import org.keycloak.testsuite.federation.ldap.FederationTestUtils;
import org.keycloak.testsuite.rule.KeycloakRule;
import org.keycloak.testsuite.rule.LDAPRule;
import org.keycloak.testsuite.DummyUserFederationProviderFactory;
import org.keycloak.timer.TimerProvider;
import org.keycloak.common.util.Time;
import java.util.HashMap;
import java.util.Map;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class SyncProvidersTest {
public class LDAPSyncTest {
private static LDAPRule ldapRule = new LDAPRule();
private static UserFederationProviderModel ldapModel = null;
private static UserFederationProviderModel dummyModel = null;
private static KeycloakRule keycloakRule = new KeycloakRule(new KeycloakRule.KeycloakSetup() {
@ -84,9 +80,6 @@ public class SyncProvidersTest {
LDAPObject ldapUser = FederationTestUtils.addLDAPUser(ldapFedProvider, appRealm, "user" + i, "User" + i + "FN", "User" + i + "LN", "user" + i + "@email.org", null, "12" + i);
FederationTestUtils.updateLDAPPassword(ldapFedProvider, ldapUser, "Password1");
}
// Add dummy provider
dummyModel = appRealm.addUserFederationProvider(DummyUserFederationProviderFactory.PROVIDER_NAME, new HashMap<String, String>(), 1, "test-dummy", -1, 1, 0);
}
});
@ -365,37 +358,6 @@ public class SyncProvidersTest {
}
}
@Test
public void testPeriodicSync() {
KeycloakSession session = keycloakRule.startSession();
try {
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
DummyUserFederationProviderFactory dummyFedFactory = (DummyUserFederationProviderFactory)sessionFactory.getProviderFactory(UserFederationProvider.class, DummyUserFederationProviderFactory.PROVIDER_NAME);
int full = dummyFedFactory.getFullSyncCounter();
int changed = dummyFedFactory.getChangedSyncCounter();
// Assert that after some period was DummyUserFederationProvider triggered
UsersSyncManager usersSyncManager = new UsersSyncManager();
usersSyncManager.bootstrapPeriodic(sessionFactory, session.getProvider(TimerProvider.class));
sleep(1800);
// Cancel timer
usersSyncManager.removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), dummyModel);
// Assert that DummyUserFederationProviderFactory.syncChangedUsers was invoked
int newChanged = dummyFedFactory.getChangedSyncCounter();
Assert.assertEquals(full, dummyFedFactory.getFullSyncCounter());
Assert.assertTrue(newChanged > changed);
// Assert that dummy provider won't be invoked anymore
sleep(1800);
Assert.assertEquals(full, dummyFedFactory.getFullSyncCounter());
Assert.assertEquals(newChanged, dummyFedFactory.getChangedSyncCounter());
} finally {
keycloakRule.stopSession(session, false);
}
}
private void sleep(int time) {
try {
Thread.sleep(time);

View file

@ -0,0 +1,116 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.testsuite.federation.sync;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.jboss.logging.Logger;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.KeycloakSessionTask;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserFederationProviderModel;
import org.keycloak.models.UserFederationSyncResult;
import org.keycloak.models.UserModel;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.testsuite.DummyUserFederationProviderFactory;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class SyncDummyUserFederationProviderFactory extends DummyUserFederationProviderFactory {
// Used during SyncFederationTest
static volatile CountDownLatch latch1 = new CountDownLatch(1);
static volatile CountDownLatch latch2 = new CountDownLatch(1);
static void restartLatches() {
latch1 = new CountDownLatch(1);
latch2 = new CountDownLatch(1);
}
private static final Logger logger = Logger.getLogger(SyncDummyUserFederationProviderFactory.class);
public static final String SYNC_PROVIDER_ID = "sync-dummy";
public static final String WAIT_TIME = "wait-time"; // waitTime before transaction is commited
@Override
public String getId() {
return SYNC_PROVIDER_ID;
}
@Override
public Set<String> getConfigurationOptions() {
Set<String> list = super.getConfigurationOptions();
list.add(WAIT_TIME);
return list;
}
@Override
public UserFederationSyncResult syncChangedUsers(KeycloakSessionFactory sessionFactory, final String realmId, final UserFederationProviderModel model, Date lastSync) {
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
int waitTime = Integer.parseInt(model.getConfig().get(WAIT_TIME));
logger.infof("Starting sync of changed users. Wait time is: %s", waitTime);
RealmModel realm = session.realms().getRealm(realmId);
// KEYCLOAK-2412 : Just remove and add some users for testing purposes
for (int i = 0; i < 10; i++) {
String username = "dummyuser-" + i;
UserModel user = session.userStorage().getUserByUsername(username, realm);
if (user != null) {
session.userStorage().removeUser(realm, user);
}
user = session.userStorage().addUser(realm, username);
}
logger.infof("Finished sync of changed users. Waiting now for %d seconds", waitTime);
try {
latch1.await(waitTime * 1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted!", ie);
}
logger.infof("Finished waiting");
}
});
// countDown, so the SyncFederationTest can continue
latch2.countDown();
return new UserFederationSyncResult();
}
}

View file

@ -0,0 +1,175 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.testsuite.federation.sync;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.keycloak.common.util.Time;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserFederationProvider;
import org.keycloak.models.UserFederationProviderModel;
import org.keycloak.models.UserFederationSyncResult;
import org.keycloak.services.managers.RealmManager;
import org.keycloak.services.managers.UsersSyncManager;
import org.keycloak.testsuite.DummyUserFederationProviderFactory;
import org.keycloak.testsuite.rule.KeycloakRule;
import org.keycloak.timer.TimerProvider;
/**
* Test with Dummy providers (For LDAP see {@link org.keycloak.testsuite.federation.ldap.base.LDAPSyncTest}
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class SyncFederationTest {
private static UserFederationProviderModel dummyModel = null;
@ClassRule
public static KeycloakRule keycloakRule = new KeycloakRule(new KeycloakRule.KeycloakSetup() {
@Override
public void config(RealmManager manager, RealmModel adminstrationRealm, RealmModel appRealm) {
// Other tests may left Time offset uncleared, which could cause issues
Time.setOffset(0);
}
});
@Test
public void test01PeriodicSync() {
// Enable timer for SyncDummyUserFederationProvider
keycloakRule.update(new KeycloakRule.KeycloakSetup() {
@Override
public void config(RealmManager manager, RealmModel adminstrationRealm, RealmModel appRealm) {
dummyModel = appRealm.addUserFederationProvider(DummyUserFederationProviderFactory.PROVIDER_NAME, new HashMap<String, String>(), 1, "test-sync-dummy", -1, 1, 0);
}
});
KeycloakSession session = keycloakRule.startSession();
try {
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
DummyUserFederationProviderFactory dummyFedFactory = (DummyUserFederationProviderFactory)sessionFactory.getProviderFactory(UserFederationProvider.class, DummyUserFederationProviderFactory.PROVIDER_NAME);
int full = dummyFedFactory.getFullSyncCounter();
int changed = dummyFedFactory.getChangedSyncCounter();
// Assert that after some period was DummyUserFederationProvider triggered
UsersSyncManager usersSyncManager = new UsersSyncManager();
usersSyncManager.bootstrapPeriodic(sessionFactory, session.getProvider(TimerProvider.class));
sleep(1800);
// Cancel timer
RealmModel appRealm = session.realms().getRealmByName("test");
usersSyncManager.notifyToRefreshPeriodicSync(session, appRealm, dummyModel, true);
// Assert that DummyUserFederationProviderFactory.syncChangedUsers was invoked
int newChanged = dummyFedFactory.getChangedSyncCounter();
Assert.assertEquals(full, dummyFedFactory.getFullSyncCounter());
Assert.assertTrue(newChanged > changed);
// Assert that dummy provider won't be invoked anymore
sleep(1800);
Assert.assertEquals(full, dummyFedFactory.getFullSyncCounter());
Assert.assertEquals(newChanged, dummyFedFactory.getChangedSyncCounter());
} finally {
keycloakRule.stopSession(session, true);
}
// remove dummyProvider
keycloakRule.update(new KeycloakRule.KeycloakSetup() {
@Override
public void config(RealmManager manager, RealmModel adminstrationRealm, RealmModel appRealm) {
appRealm.removeUserFederationProvider(dummyModel);
}
});
}
@Test
public void test02ConcurrentSync() throws Exception {
SyncDummyUserFederationProviderFactory.restartLatches();
// Enable timer for SyncDummyUserFederationProvider
keycloakRule.update(new KeycloakRule.KeycloakSetup() {
@Override
public void config(RealmManager manager, RealmModel adminstrationRealm, RealmModel appRealm) {
Map<String, String> config = new HashMap<>();
config.put(SyncDummyUserFederationProviderFactory.WAIT_TIME, "2000");
dummyModel = appRealm.addUserFederationProvider(SyncDummyUserFederationProviderFactory.SYNC_PROVIDER_ID, config, 1, "test-sync-dummy", -1, 1, 0);
}
});
KeycloakSession session = keycloakRule.startSession();
try {
KeycloakSessionFactory sessionFactory = session.getKeycloakSessionFactory();
// bootstrap periodic sync
UsersSyncManager usersSyncManager = new UsersSyncManager();
usersSyncManager.bootstrapPeriodic(sessionFactory, session.getProvider(TimerProvider.class));
// Wait and then trigger sync manually. Assert it will be ignored
sleep(1800);
RealmModel realm = session.realms().getRealm("test");
UserFederationSyncResult syncResult = usersSyncManager.syncChangedUsers(sessionFactory, realm.getId(), dummyModel);
Assert.assertTrue(syncResult.isIgnored());
// Cancel timer
usersSyncManager.notifyToRefreshPeriodicSync(session, realm, dummyModel, true);
// Signal to factory to finish waiting
SyncDummyUserFederationProviderFactory.latch1.countDown();
} finally {
keycloakRule.stopSession(session, true);
}
SyncDummyUserFederationProviderFactory.latch2.await(20000, TimeUnit.MILLISECONDS);
// remove provider
keycloakRule.update(new KeycloakRule.KeycloakSetup() {
@Override
public void config(RealmManager manager, RealmModel adminstrationRealm, RealmModel appRealm) {
appRealm.removeUserFederationProvider(dummyModel);
}
});
}
private void sleep(int time) {
try {
Thread.sleep(time);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
}

View file

@ -26,6 +26,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.ClientModel;
import org.keycloak.models.ClientSessionModel;
@ -83,7 +84,7 @@ public class UserSessionInitializerTest {
// Create and persist offline sessions
int started = Time.currentTime();
int serverStartTime = session.sessions().getClusterStartupTime();
int serverStartTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
for (UserSessionModel origSession : origSessions) {
UserSessionModel userSession = session.sessions().getUserSession(realm, origSession.getId());
@ -99,7 +100,7 @@ public class UserSessionInitializerTest {
// Clear ispn cache to ensure initializerState is removed as well
InfinispanConnectionProvider infinispan = session.getProvider(InfinispanConnectionProvider.class);
infinispan.getCache(InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME).clear();
infinispan.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME).clear();
resetSession();

View file

@ -35,7 +35,7 @@ public abstract class AbstractCommand {
protected List<String> args;
protected KeycloakSessionFactory sessionFactory;
public void injectProperties(List<String> args, InfinispanCLI cli, KeycloakSessionFactory sessionFactory) {
public void injectProperties(List<String> args, TestsuiteCLI cli, KeycloakSessionFactory sessionFactory) {
this.args = args;
this.sessionFactory = sessionFactory;
}

View file

@ -43,10 +43,32 @@ public class PersistSessionsCommand extends AbstractCommand {
@Override
public void doRunCommand(KeycloakSession sess) {
final int count = getIntArg(0);
final int batchCount = getIntArg(1);
int remaining = count;
while (remaining > 0) {
int createInThisBatch = Math.min(batchCount, remaining);
createSessionsBatch(createInThisBatch);
remaining = remaining - createInThisBatch;
}
// Write some summary
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
log.info("Command finished. Total number of sessions in persister: " + persister.getUserSessionsCount(true));
}
});
}
private void createSessionsBatch(final int countInThisBatch) {
final List<String> userSessionIds = new LinkedList<>();
final List<String> clientSessionIds = new LinkedList<>();
// Create sessions in separate transaction first
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
@ -56,7 +78,7 @@ public class PersistSessionsCommand extends AbstractCommand {
ClientModel testApp = realm.getClientByClientId("security-admin-console");
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
for (int i = 0; i < count; i++) {
for (int i = 0; i < countInThisBatch; i++) {
UserSessionModel userSession = session.sessions().createUserSession(realm, john, "john-doh@localhost", "127.0.0.2", "form", true, null, null);
ClientSessionModel clientSession = session.sessions().createClientSession(realm, testApp);
clientSession.setUserSession(userSession);
@ -69,9 +91,10 @@ public class PersistSessionsCommand extends AbstractCommand {
});
log.info("Sessions created in infinispan storage");
log.infof("%d sessions created in infinispan storage", countInThisBatch);
// Persist them now
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
@ -84,35 +107,17 @@ public class PersistSessionsCommand extends AbstractCommand {
counter++;
UserSessionModel userSession = session.sessions().getUserSession(realm, userSessionId);
persister.createUserSession(userSession, true);
if (counter%1000 == 0) {
log.infof("%d user sessions persisted. Continue", counter);
}
}
log.infof("All %d user sessions persisted", counter);
log.infof("%d user sessions persisted. Continue", counter);
counter = 0;
for (String clientSessionId : clientSessionIds) {
counter++;
ClientSessionModel clientSession = session.sessions().getClientSession(realm, clientSessionId);
persister.createClientSession(clientSession, true);
if (counter%1000 == 0) {
log.infof("%d client sessions persisted. Continue", counter);
}
}
log.infof("All %d client sessions persisted", counter);
}
});
// Persist them now
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
log.info("Total number of sessions in persister: " + persister.getUserSessionsCount(true));
log.infof("%d client sessions persisted. Continue", counter);
}
});
@ -120,6 +125,6 @@ public class PersistSessionsCommand extends AbstractCommand {
@Override
public String printUsage() {
return super.printUsage() + " <sessions-count>";
return super.printUsage() + " <sessions-count> <sessions-count-per-each-transaction>";
}
}

View file

@ -0,0 +1,73 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.testsuite.util.cli;
import java.util.HashMap;
import java.util.Map;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserFederationProviderModel;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.services.managers.UsersSyncManager;
import org.keycloak.testsuite.federation.sync.SyncDummyUserFederationProviderFactory;
import org.keycloak.timer.TimerProvider;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class SyncDummyFederationProviderCommand extends AbstractCommand {
@Override
protected void doRunCommand(KeycloakSession session) {
int waitTime = getIntArg(0);
int changedSyncPeriod = getIntArg(1);
RealmModel realm = session.realms().getRealmByName("master");
UserFederationProviderModel fedProviderModel = KeycloakModelUtils.findUserFederationProviderByDisplayName("cluster-dummy", realm);
if (fedProviderModel == null) {
Map<String, String> cfg = new HashMap<>();
updateConfig(cfg, waitTime);
fedProviderModel = realm.addUserFederationProvider(SyncDummyUserFederationProviderFactory.SYNC_PROVIDER_ID, cfg, 1, "cluster-dummy", -1, changedSyncPeriod, -1);
} else {
Map<String, String> cfg = fedProviderModel.getConfig();
updateConfig(cfg, waitTime);
fedProviderModel.setChangedSyncPeriod(changedSyncPeriod);
realm.updateUserFederationProvider(fedProviderModel);
}
new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, fedProviderModel, false);
log.infof("User federation provider created and sync was started", waitTime);
}
private void updateConfig(Map<String, String> cfg, int waitTime) {
cfg.put(SyncDummyUserFederationProviderFactory.WAIT_TIME, String.valueOf(waitTime));
}
@Override
public String getName() {
return "startSyncDummy";
}
@Override
public String printUsage() {
return super.printUsage() + " <wait-time-before-sync-commit-in-seconds> <changed-sync-period-in-seconds>";
}
}

View file

@ -32,27 +32,13 @@ import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.testsuite.KeycloakServer;
/**
* HOWTO USE THIS:
*
* 1) Run KeycloakServer with system properties (assuming mongo up and running on localhost):
* -Dkeycloak.realm.provider=mongo -Dkeycloak.user.provider=mongo -Dkeycloak.userSessionPersister.provider=mongo -Dkeycloak.connectionsMongo.db=keycloak -Dkeycloak.connectionsInfinispan.clustered=true -Dresources -DstartInfinispanCLI
*
* 2) Write command on STDIN to persist 50000 userSessions to mongo: persistSessions 50000
*
* 3) Run command "clear" to ensure infinispan cache is cleared. Doublecheck with command "size" is 0
*
* 4) Write command to load sessions from persistent storage - 100 sessions per worker transaction: loadPersistentSessions 100
*
* See the progress in log. Finally run command "size" to ensure size is 100001 (50000 userSessions + 50000 clientSessions + 1 initializationState item)
*
* 5) Alternative to step 3+4 - Kill the server after step 2 and start two KeycloakServer in parallel on ports 8081 and 8082 . See the progress in logs of loading persistent sessions to infinispan.
* Kill the coordinator (usually 8081 node) during startup and see the node 8082 became coordinator and took ownership of loading persistent sessions. After node 8082 fully started, the size of infinispan is again 100001
* See Testsuite.md (section how to create many users and offline sessions)
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class InfinispanCLI {
public class TestsuiteCLI {
private static final Logger log = Logger.getLogger(InfinispanCLI.class);
private static final Logger log = Logger.getLogger(TestsuiteCLI.class);
private static final Class<?>[] BUILTIN_COMMANDS = {
ExitCommand.class,
@ -69,13 +55,14 @@ public class InfinispanCLI {
UserCommands.Create.class,
UserCommands.Remove.class,
UserCommands.Count.class,
UserCommands.GetUser.class
UserCommands.GetUser.class,
SyncDummyFederationProviderCommand.class
};
private final KeycloakSessionFactory sessionFactory;
private final Map<String, Class<? extends AbstractCommand>> commands = new LinkedHashMap<>();
public InfinispanCLI(KeycloakServer server) {
public TestsuiteCLI(KeycloakServer server) {
this.sessionFactory = server.getSessionFactory();
// register builtin commands
@ -96,7 +83,7 @@ public class InfinispanCLI {
// WARNING: Stdin blocking operation
public void start() throws IOException {
log.info("Starting infinispan CLI. Exit with 'exit' . Available commands with 'help' ");
log.info("Starting testsuite CLI. Exit with 'exit' . Available commands with 'help' ");
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String line;
@ -131,7 +118,7 @@ public class InfinispanCLI {
System.out.print("$ ");
}
} finally {
log.info("Exit infinispan CLI");
log.info("Exit testsuite CLI");
reader.close();
}
}
@ -164,7 +151,7 @@ public class InfinispanCLI {
private List<String> commandNames = new ArrayList<>();
@Override
public void injectProperties(List<String> args, InfinispanCLI cli, KeycloakSessionFactory sessionFactory) {
public void injectProperties(List<String> args, TestsuiteCLI cli, KeycloakSessionFactory sessionFactory) {
for (String commandName : cli.commands.keySet()) {
commandNames.add(commandName);
}

View file

@ -15,4 +15,5 @@
# limitations under the License.
#
org.keycloak.testsuite.DummyUserFederationProviderFactory
org.keycloak.testsuite.DummyUserFederationProviderFactory
org.keycloak.testsuite.federation.sync.SyncDummyUserFederationProviderFactory

View file

@ -62,6 +62,7 @@ public class KeycloakServerDeploymentProcessor implements DeploymentUnitProcesso
st.addDependency(cacheContainerService.append("sessions"));
st.addDependency(cacheContainerService.append("offlineSessions"));
st.addDependency(cacheContainerService.append("loginFailures"));
st.addDependency(cacheContainerService.append("work"));
st.addDependency(cacheContainerService.append("realmVersions"));
}
}

View file

@ -30,6 +30,7 @@
<local-cache name="sessions"/>
<local-cache name="offlineSessions"/>
<local-cache name="loginFailures"/>
<local-cache name="work"/>
<local-cache name="realmVersions">
<transaction mode="BATCH" locking="PESSIMISTIC"/>
</local-cache>
@ -90,6 +91,7 @@
<distributed-cache name="sessions" mode="SYNC" owners="1"/>
<distributed-cache name="offlineSessions" mode="SYNC" owners="1"/>
<distributed-cache name="loginFailures" mode="SYNC" owners="1"/>
<replicated-cache name="work" mode="SYNC" />
<local-cache name="realmVersions">
<transaction mode="BATCH" locking="PESSIMISTIC"/>
</local-cache>