KEYCLOAK-2412 Added ClusterProvider. Avoid concurrent federation sync execution by more cluster nodes at the same time.
Clustering - more progress
This commit is contained in:
parent
61f2baf65e
commit
1328531f31
34 changed files with 1087 additions and 107 deletions
|
@ -0,0 +1,186 @@
|
|||
/*
|
||||
* Copyright 2016 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.cluster.infinispan;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.infinispan.Cache;
|
||||
import org.infinispan.context.Flag;
|
||||
import org.infinispan.lifecycle.ComponentStatus;
|
||||
import org.infinispan.remoting.transport.Transport;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.cluster.ClusterEvent;
|
||||
import org.keycloak.cluster.ClusterListener;
|
||||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.cluster.ExecutionResult;
|
||||
import org.keycloak.common.util.Time;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
|
||||
/**
|
||||
* Various utils related to clustering
|
||||
*
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class InfinispanClusterProvider implements ClusterProvider {
|
||||
|
||||
protected static final Logger logger = Logger.getLogger(InfinispanClusterProvider.class);
|
||||
|
||||
public static final String CLUSTER_STARTUP_TIME_KEY = "cluster-start-time";
|
||||
private static final String TASK_KEY_PREFIX = "task::";
|
||||
|
||||
private final InfinispanClusterProviderFactory factory;
|
||||
private final KeycloakSession session;
|
||||
private final Cache<String, Serializable> cache;
|
||||
|
||||
public InfinispanClusterProvider(InfinispanClusterProviderFactory factory, KeycloakSession session, Cache<String, Serializable> cache) {
|
||||
this.factory = factory;
|
||||
this.session = session;
|
||||
this.cache = cache;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int getClusterStartupTime() {
|
||||
Integer existingClusterStartTime = (Integer) cache.get(InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY);
|
||||
if (existingClusterStartTime != null) {
|
||||
return existingClusterStartTime;
|
||||
} else {
|
||||
// clusterStartTime not yet initialized. Let's try to put our startupTime
|
||||
int serverStartTime = (int) (session.getKeycloakSessionFactory().getServerStartupTimestamp() / 1000);
|
||||
|
||||
existingClusterStartTime = (Integer) cache.putIfAbsent(InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY, serverStartTime);
|
||||
if (existingClusterStartTime == null) {
|
||||
logger.infof("Initialized cluster startup time to %s", Time.toDate(serverStartTime).toString());
|
||||
return serverStartTime;
|
||||
} else {
|
||||
return existingClusterStartTime;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public <T> ExecutionResult<T> executeIfNotExecuted(String taskKey, int taskTimeoutInSeconds, Callable<T> task) {
|
||||
String cacheKey = TASK_KEY_PREFIX + taskKey;
|
||||
boolean locked = tryLock(cacheKey, taskTimeoutInSeconds);
|
||||
if (locked) {
|
||||
try {
|
||||
try {
|
||||
T result = task.call();
|
||||
return ExecutionResult.executed(result);
|
||||
} catch (RuntimeException re) {
|
||||
throw re;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Unexpected exception when executed task " + taskKey, e);
|
||||
}
|
||||
} finally {
|
||||
removeFromCache(cacheKey);
|
||||
}
|
||||
} else {
|
||||
return ExecutionResult.notExecuted();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void registerListener(String taskKey, ClusterListener task) {
|
||||
factory.registerListener(taskKey, task);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void notify(String taskKey, ClusterEvent event) {
|
||||
// Put the value to the cache to notify listeners on all the nodes
|
||||
cache.put(taskKey, event);
|
||||
}
|
||||
|
||||
|
||||
private String getCurrentNode(Cache<String, Serializable> cache) {
|
||||
Transport transport = cache.getCacheManager().getTransport();
|
||||
return transport==null ? "local" : transport.getAddress().toString();
|
||||
}
|
||||
|
||||
|
||||
private LockEntry createLockEntry(Cache<String, Serializable> cache) {
|
||||
LockEntry lock = new LockEntry();
|
||||
lock.setNode(getCurrentNode(cache));
|
||||
lock.setTimestamp(Time.currentTime());
|
||||
return lock;
|
||||
}
|
||||
|
||||
|
||||
private boolean tryLock(String cacheKey, int taskTimeoutInSeconds) {
|
||||
LockEntry myLock = createLockEntry(cache);
|
||||
|
||||
LockEntry existingLock = (LockEntry) cache.putIfAbsent(cacheKey, myLock);
|
||||
if (existingLock != null) {
|
||||
// Task likely already in progress. Check if timestamp is not outdated
|
||||
int thatTime = existingLock.getTimestamp();
|
||||
int currentTime = Time.currentTime();
|
||||
if (thatTime + taskTimeoutInSeconds < currentTime) {
|
||||
logger.infof("Task %s outdated when in progress by node %s. Will try to replace task with our node %s", cacheKey, existingLock.getNode(), myLock.getNode());
|
||||
boolean replaced = cache.replace(cacheKey, existingLock, myLock);
|
||||
// TODO: trace
|
||||
if (!replaced) {
|
||||
logger.infof("Failed to replace the task %s. Other thread replaced in the meantime. Ignoring task.", cacheKey);
|
||||
}
|
||||
return replaced;
|
||||
} else {
|
||||
logger.infof("Task %s in progress already by node %s. Ignoring task.", cacheKey, existingLock.getNode());
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
logger.infof("Successfully acquired lock for task %s. Our node is %s", cacheKey, myLock.getNode());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void removeFromCache(String cacheKey) {
|
||||
// 3 attempts to send the message (it may fail if some node fails in the meantime)
|
||||
int retry = 3;
|
||||
while (true) {
|
||||
try {
|
||||
cache.getAdvancedCache()
|
||||
.withFlags(Flag.IGNORE_RETURN_VALUES, Flag.FORCE_SYNCHRONOUS)
|
||||
.remove(cacheKey);
|
||||
logger.infof("Task %s removed from the cache", cacheKey);
|
||||
return;
|
||||
} catch (RuntimeException e) {
|
||||
ComponentStatus status = cache.getStatus();
|
||||
if (status.isStopping() || status.isTerminated()) {
|
||||
logger.warnf("Failed to remove task %s from the cache. Cache is already terminating", cacheKey);
|
||||
logger.debug(e.getMessage(), e);
|
||||
return;
|
||||
}
|
||||
retry--;
|
||||
if (retry == 0) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,199 @@
|
|||
/*
|
||||
* Copyright 2016 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.cluster.infinispan;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.infinispan.Cache;
|
||||
import org.infinispan.manager.EmbeddedCacheManager;
|
||||
import org.infinispan.notifications.Listener;
|
||||
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
|
||||
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
|
||||
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
|
||||
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
|
||||
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
|
||||
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
|
||||
import org.infinispan.remoting.transport.Address;
|
||||
import org.infinispan.remoting.transport.Transport;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.Config;
|
||||
import org.keycloak.cluster.ClusterEvent;
|
||||
import org.keycloak.cluster.ClusterListener;
|
||||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.cluster.ClusterProviderFactory;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakSessionFactory;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class InfinispanClusterProviderFactory implements ClusterProviderFactory {
|
||||
|
||||
public static final String PROVIDER_ID = "infinispan";
|
||||
|
||||
protected static final Logger logger = Logger.getLogger(InfinispanClusterProviderFactory.class);
|
||||
|
||||
private volatile Cache<String, Serializable> workCache;
|
||||
|
||||
private Map<String, ClusterListener> listeners = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public ClusterProvider create(KeycloakSession session) {
|
||||
lazyInit(session);
|
||||
return new InfinispanClusterProvider(this, session, workCache);
|
||||
}
|
||||
|
||||
private void lazyInit(KeycloakSession session) {
|
||||
if (workCache == null) {
|
||||
synchronized (this) {
|
||||
if (workCache == null) {
|
||||
workCache = session.getProvider(InfinispanConnectionProvider.class).getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
|
||||
workCache.getCacheManager().addListener(new ViewChangeListener());
|
||||
workCache.addListener(new CacheEntryListener());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Config.Scope config) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postInit(KeycloakSessionFactory factory) {
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return PROVIDER_ID;
|
||||
}
|
||||
|
||||
|
||||
@Listener
|
||||
public class ViewChangeListener {
|
||||
|
||||
@ViewChanged
|
||||
public void viewChanged(ViewChangedEvent event) {
|
||||
EmbeddedCacheManager cacheManager = event.getCacheManager();
|
||||
Transport transport = cacheManager.getTransport();
|
||||
|
||||
// Coordinator makes sure that entries for outdated nodes are cleaned up
|
||||
if (transport != null && transport.isCoordinator()) {
|
||||
|
||||
Set<String> newAddresses = convertAddresses(event.getNewMembers());
|
||||
Set<String> removedNodesAddresses = convertAddresses(event.getOldMembers());
|
||||
removedNodesAddresses.removeAll(newAddresses);
|
||||
|
||||
if (removedNodesAddresses.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
logger.infof("Nodes %s removed from cluster. Removing tasks locked by this nodes", removedNodesAddresses.toString());
|
||||
|
||||
Cache<String, Serializable> cache = cacheManager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
|
||||
|
||||
Iterator<String> toRemove = cache.entrySet().stream().filter(new Predicate<Map.Entry<String, Serializable>>() {
|
||||
|
||||
@Override
|
||||
public boolean test(Map.Entry<String, Serializable> entry) {
|
||||
if (!(entry.getValue() instanceof LockEntry)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
LockEntry lock = (LockEntry) entry.getValue();
|
||||
return removedNodesAddresses.contains(lock.getNode());
|
||||
}
|
||||
|
||||
}).map(new Function<Map.Entry<String, Serializable>, String>() {
|
||||
|
||||
@Override
|
||||
public String apply(Map.Entry<String, Serializable> entry) {
|
||||
return entry.getKey();
|
||||
}
|
||||
|
||||
}).iterator();
|
||||
|
||||
while (toRemove.hasNext()) {
|
||||
String rem = toRemove.next();
|
||||
logger.infof("Removing task %s due it's node left cluster", rem);
|
||||
cache.remove(rem);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Set<String> convertAddresses(Collection<Address> addresses) {
|
||||
return addresses.stream().map(new Function<Address, String>() {
|
||||
|
||||
@Override
|
||||
public String apply(Address address) {
|
||||
return address.toString();
|
||||
}
|
||||
|
||||
}).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
<T> void registerListener(String taskKey, ClusterListener task) {
|
||||
listeners.put(taskKey, task);
|
||||
}
|
||||
|
||||
@Listener
|
||||
public class CacheEntryListener {
|
||||
|
||||
@CacheEntryCreated
|
||||
public void cacheEntryCreated(CacheEntryCreatedEvent<String, Object> event) {
|
||||
if (!event.isPre()) {
|
||||
trigger(event.getKey(), event.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@CacheEntryModified
|
||||
public void cacheEntryModified(CacheEntryModifiedEvent<String, Object> event) {
|
||||
if (!event.isPre()) {
|
||||
trigger(event.getKey(), event.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
private void trigger(String key, Object value) {
|
||||
ClusterListener task = listeners.get(key);
|
||||
if (task != null) {
|
||||
ClusterEvent event = (ClusterEvent) value;
|
||||
task.run(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Copyright 2016 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.cluster.infinispan;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class LockEntry implements Serializable {
|
||||
|
||||
private String node;
|
||||
private int timestamp;
|
||||
|
||||
public String getNode() {
|
||||
return node;
|
||||
}
|
||||
|
||||
public void setNode(String node) {
|
||||
this.node = node;
|
||||
}
|
||||
|
||||
public int getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public void setTimestamp(int timestamp) {
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
}
|
|
@ -23,6 +23,8 @@ import org.infinispan.configuration.cache.ConfigurationBuilder;
|
|||
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
|
||||
import org.infinispan.manager.DefaultCacheManager;
|
||||
import org.infinispan.manager.EmbeddedCacheManager;
|
||||
import org.infinispan.transaction.LockingMode;
|
||||
import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.Config;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
|
@ -145,6 +147,13 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
|
|||
cacheManager.defineConfiguration(InfinispanConnectionProvider.SESSION_CACHE_NAME, sessionCacheConfiguration);
|
||||
cacheManager.defineConfiguration(InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME, sessionCacheConfiguration);
|
||||
cacheManager.defineConfiguration(InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME, sessionCacheConfiguration);
|
||||
|
||||
ConfigurationBuilder replicationConfigBuilder = new ConfigurationBuilder();
|
||||
if (clustered) {
|
||||
replicationConfigBuilder.clustering().cacheMode(async ? CacheMode.REPL_ASYNC : CacheMode.REPL_SYNC);
|
||||
}
|
||||
Configuration replicationCacheConfiguration = replicationConfigBuilder.build();
|
||||
cacheManager.defineConfiguration(InfinispanConnectionProvider.WORK_CACHE_NAME, replicationCacheConfiguration);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ public interface InfinispanConnectionProvider extends Provider {
|
|||
static final String SESSION_CACHE_NAME = "sessions";
|
||||
static final String OFFLINE_SESSION_CACHE_NAME = "offlineSessions";
|
||||
static final String LOGIN_FAILURE_CACHE_NAME = "loginFailures";
|
||||
static final String WORK_CACHE_NAME = "work";
|
||||
|
||||
<K, V> Cache<K, V> getCache(String name);
|
||||
|
||||
|
|
|
@ -21,15 +21,41 @@ import org.infinispan.Cache;
|
|||
import org.infinispan.CacheStream;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.common.util.Time;
|
||||
import org.keycloak.models.*;
|
||||
import org.keycloak.models.ClientInitialAccessModel;
|
||||
import org.keycloak.models.ClientModel;
|
||||
import org.keycloak.models.ClientSessionModel;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakTransaction;
|
||||
import org.keycloak.models.RealmModel;
|
||||
import org.keycloak.models.UserModel;
|
||||
import org.keycloak.models.UserSessionModel;
|
||||
import org.keycloak.models.UserSessionProvider;
|
||||
import org.keycloak.models.UsernameLoginFailureModel;
|
||||
import org.keycloak.models.session.UserSessionPersisterProvider;
|
||||
import org.keycloak.models.sessions.infinispan.entities.*;
|
||||
import org.keycloak.models.sessions.infinispan.initializer.TimeAwareInitializerState;
|
||||
import org.keycloak.models.sessions.infinispan.stream.*;
|
||||
import org.keycloak.models.sessions.infinispan.entities.ClientInitialAccessEntity;
|
||||
import org.keycloak.models.sessions.infinispan.entities.ClientSessionEntity;
|
||||
import org.keycloak.models.sessions.infinispan.entities.LoginFailureEntity;
|
||||
import org.keycloak.models.sessions.infinispan.entities.LoginFailureKey;
|
||||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
|
||||
import org.keycloak.models.sessions.infinispan.stream.ClientInitialAccessPredicate;
|
||||
import org.keycloak.models.sessions.infinispan.stream.ClientSessionPredicate;
|
||||
import org.keycloak.models.sessions.infinispan.stream.Comparators;
|
||||
import org.keycloak.models.sessions.infinispan.stream.Mappers;
|
||||
import org.keycloak.models.sessions.infinispan.stream.SessionPredicate;
|
||||
import org.keycloak.models.sessions.infinispan.stream.UserLoginFailurePredicate;
|
||||
import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate;
|
||||
import org.keycloak.models.utils.KeycloakModelUtils;
|
||||
import org.keycloak.models.utils.RealmInfoUtil;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Stream;
|
||||
|
@ -411,19 +437,6 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
|
|||
loginFailureCache.remove(new LoginFailureKey(realm.getId(), user.getEmail()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getClusterStartupTime() {
|
||||
TimeAwareInitializerState state = (TimeAwareInitializerState) offlineSessionCache.get(InfinispanUserSessionProviderFactory.SESSION_INITIALIZER_STATE_KEY);
|
||||
int startTime;
|
||||
if (state == null) {
|
||||
log.warn("Cluster startup time not yet available. Fallback to local startup time");
|
||||
startTime = (int)(session.getKeycloakSessionFactory().getServerStartupTimestamp() / 1000);
|
||||
} else {
|
||||
startTime = state.getClusterStartupTime();
|
||||
}
|
||||
return startTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.keycloak.models.sessions.infinispan;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import org.infinispan.Cache;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.Config;
|
||||
|
@ -34,9 +36,6 @@ import org.keycloak.provider.ProviderEventListener;
|
|||
|
||||
public class InfinispanUserSessionProviderFactory implements UserSessionProviderFactory {
|
||||
|
||||
private static final String STATE_KEY_PREFIX = "initializerState";
|
||||
public static final String SESSION_INITIALIZER_STATE_KEY = STATE_KEY_PREFIX + "::offlineUserSessions";
|
||||
|
||||
private static final Logger log = Logger.getLogger(InfinispanUserSessionProviderFactory.class);
|
||||
|
||||
private Config.Scope config;
|
||||
|
@ -85,9 +84,9 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
|
|||
@Override
|
||||
public void run(KeycloakSession session) {
|
||||
InfinispanConnectionProvider connections = session.getProvider(InfinispanConnectionProvider.class);
|
||||
Cache<String, SessionEntity> cache = connections.getCache(InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME);
|
||||
Cache<String, Serializable> cache = connections.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
|
||||
|
||||
InfinispanUserSessionInitializer initializer = new InfinispanUserSessionInitializer(sessionFactory, cache, new OfflineUserSessionLoader(), maxErrors, sessionsPerSegment, SESSION_INITIALIZER_STATE_KEY);
|
||||
InfinispanUserSessionInitializer initializer = new InfinispanUserSessionInitializer(sessionFactory, cache, new OfflineUserSessionLoader(), maxErrors, sessionsPerSegment, "offlineUserSessions");
|
||||
initializer.initCache();
|
||||
initializer.loadPersistentSessions();
|
||||
}
|
||||
|
|
|
@ -20,57 +20,55 @@ package org.keycloak.models.sessions.infinispan.initializer;
|
|||
import java.io.Serializable;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.infinispan.Cache;
|
||||
import org.infinispan.context.Flag;
|
||||
import org.infinispan.distexec.DefaultExecutorService;
|
||||
import org.infinispan.notifications.Listener;
|
||||
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
|
||||
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
|
||||
import org.infinispan.lifecycle.ComponentStatus;
|
||||
import org.infinispan.remoting.transport.Transport;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.common.util.Time;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakSessionFactory;
|
||||
import org.keycloak.models.KeycloakSessionTask;
|
||||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||
import org.keycloak.models.utils.KeycloakModelUtils;
|
||||
|
||||
/**
|
||||
* Startup initialization for reading persistent userSessions/clientSessions to be filled into infinispan/memory . In cluster,
|
||||
* the initialization is distributed among all cluster nodes, so the startup time is even faster
|
||||
*
|
||||
* TODO: Move to clusterService. Implementation is already pretty generic and doesn't contain any "userSession" specific stuff. All logic is in the SessionLoader implementation
|
||||
*
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class InfinispanUserSessionInitializer {
|
||||
|
||||
private static final String STATE_KEY_PREFIX = "distributed::";
|
||||
|
||||
private static final Logger log = Logger.getLogger(InfinispanUserSessionInitializer.class);
|
||||
|
||||
private final KeycloakSessionFactory sessionFactory;
|
||||
private final Cache<String, SessionEntity> cache;
|
||||
private final Cache<String, Serializable> workCache;
|
||||
private final SessionLoader sessionLoader;
|
||||
private final int maxErrors;
|
||||
private final int sessionsPerSegment;
|
||||
private final String stateKey;
|
||||
|
||||
|
||||
public InfinispanUserSessionInitializer(KeycloakSessionFactory sessionFactory, Cache<String, SessionEntity> cache, SessionLoader sessionLoader, int maxErrors, int sessionsPerSegment, String stateKey) {
|
||||
public InfinispanUserSessionInitializer(KeycloakSessionFactory sessionFactory, Cache<String, Serializable> workCache, SessionLoader sessionLoader, int maxErrors, int sessionsPerSegment, String stateKeySuffix) {
|
||||
this.sessionFactory = sessionFactory;
|
||||
this.cache = cache;
|
||||
this.workCache = workCache;
|
||||
this.sessionLoader = sessionLoader;
|
||||
this.maxErrors = maxErrors;
|
||||
this.sessionsPerSegment = sessionsPerSegment;
|
||||
this.stateKey = stateKey;
|
||||
this.stateKey = STATE_KEY_PREFIX + stateKeySuffix;
|
||||
}
|
||||
|
||||
public void initCache() {
|
||||
this.cache.getAdvancedCache().getComponentRegistry().registerComponent(sessionFactory, KeycloakSessionFactory.class);
|
||||
this.workCache.getAdvancedCache().getComponentRegistry().registerComponent(sessionFactory, KeycloakSessionFactory.class);
|
||||
}
|
||||
|
||||
|
||||
|
@ -94,16 +92,14 @@ public class InfinispanUserSessionInitializer {
|
|||
|
||||
|
||||
private boolean isFinished() {
|
||||
InitializerState state = (InitializerState) cache.get(stateKey);
|
||||
InitializerState state = (InitializerState) workCache.get(stateKey);
|
||||
return state != null && state.isFinished();
|
||||
}
|
||||
|
||||
|
||||
private InitializerState getOrCreateInitializerState() {
|
||||
TimeAwareInitializerState state = (TimeAwareInitializerState) cache.get(stateKey);
|
||||
InitializerState state = (InitializerState) workCache.get(stateKey);
|
||||
if (state == null) {
|
||||
int startTime = (int)(sessionFactory.getServerStartupTimestamp() / 1000);
|
||||
|
||||
final int[] count = new int[1];
|
||||
|
||||
// Rather use separate transactions for update and counting
|
||||
|
@ -111,7 +107,7 @@ public class InfinispanUserSessionInitializer {
|
|||
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
|
||||
@Override
|
||||
public void run(KeycloakSession session) {
|
||||
sessionLoader.init(session, startTime);
|
||||
sessionLoader.init(session);
|
||||
}
|
||||
|
||||
});
|
||||
|
@ -124,9 +120,8 @@ public class InfinispanUserSessionInitializer {
|
|||
|
||||
});
|
||||
|
||||
state = new TimeAwareInitializerState();
|
||||
state = new InitializerState();
|
||||
state.init(count[0], sessionsPerSegment);
|
||||
state.setClusterStartupTime(startTime);
|
||||
saveStateToCache(state);
|
||||
}
|
||||
return state;
|
||||
|
@ -143,7 +138,7 @@ public class InfinispanUserSessionInitializer {
|
|||
public void run() {
|
||||
|
||||
// Save this synchronously to ensure all nodes read correct state
|
||||
InfinispanUserSessionInitializer.this.cache.getAdvancedCache().
|
||||
InfinispanUserSessionInitializer.this.workCache.getAdvancedCache().
|
||||
withFlags(Flag.IGNORE_RETURN_VALUES, Flag.FORCE_SYNCHRONOUS)
|
||||
.put(stateKey, state);
|
||||
}
|
||||
|
@ -153,7 +148,7 @@ public class InfinispanUserSessionInitializer {
|
|||
|
||||
|
||||
private boolean isCoordinator() {
|
||||
Transport transport = cache.getCacheManager().getTransport();
|
||||
Transport transport = workCache.getCacheManager().getTransport();
|
||||
return transport == null || transport.isCoordinator();
|
||||
}
|
||||
|
||||
|
@ -166,9 +161,9 @@ public class InfinispanUserSessionInitializer {
|
|||
int processors = Runtime.getRuntime().availableProcessors();
|
||||
|
||||
ExecutorService localExecutor = Executors.newCachedThreadPool();
|
||||
Transport transport = cache.getCacheManager().getTransport();
|
||||
Transport transport = workCache.getCacheManager().getTransport();
|
||||
boolean distributed = transport != null;
|
||||
ExecutorService executorService = distributed ? new DefaultExecutorService(cache, localExecutor) : localExecutor;
|
||||
ExecutorService executorService = distributed ? new DefaultExecutorService(workCache, localExecutor) : localExecutor;
|
||||
|
||||
int errors = 0;
|
||||
|
||||
|
@ -190,7 +185,7 @@ public class InfinispanUserSessionInitializer {
|
|||
SessionInitializerWorker worker = new SessionInitializerWorker();
|
||||
worker.setWorkerEnvironment(segment, sessionsPerSegment, sessionLoader);
|
||||
if (!distributed) {
|
||||
worker.setEnvironment(cache, null);
|
||||
worker.setEnvironment(workCache, null);
|
||||
}
|
||||
|
||||
Future<WorkerResult> future = executorService.submit(worker);
|
||||
|
@ -242,6 +237,12 @@ public class InfinispanUserSessionInitializer {
|
|||
runnable.run();
|
||||
return;
|
||||
} catch (RuntimeException e) {
|
||||
ComponentStatus status = workCache.getStatus();
|
||||
if (status.isStopping() || status.isTerminated()) {
|
||||
log.warn("Failed to put initializerState to the cache. Cache is already terminating");
|
||||
log.debug(e.getMessage(), e);
|
||||
return;
|
||||
}
|
||||
retry--;
|
||||
if (retry == 0) {
|
||||
throw e;
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.keycloak.models.sessions.infinispan.initializer;
|
|||
import java.util.List;
|
||||
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.models.ClientSessionModel;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.UserSessionModel;
|
||||
|
@ -33,9 +34,12 @@ public class OfflineUserSessionLoader implements SessionLoader {
|
|||
private static final Logger log = Logger.getLogger(OfflineUserSessionLoader.class);
|
||||
|
||||
@Override
|
||||
public void init(KeycloakSession session, int clusterStartupTime) {
|
||||
public void init(KeycloakSession session) {
|
||||
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
|
||||
|
||||
// TODO: check if update of timestamps in persister can be skipped entirely
|
||||
int clusterStartupTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
|
||||
|
||||
log.debugf("Clearing detached sessions from persistent storage and updating timestamps to %d", clusterStartupTime);
|
||||
|
||||
persister.clearDetachedUserSessions();
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.keycloak.models.utils.KeycloakModelUtils;
|
|||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class SessionInitializerWorker implements DistributedCallable<String, SessionEntity, InfinispanUserSessionInitializer.WorkerResult>, Serializable {
|
||||
public class SessionInitializerWorker implements DistributedCallable<String, Serializable, InfinispanUserSessionInitializer.WorkerResult>, Serializable {
|
||||
|
||||
private static final Logger log = Logger.getLogger(SessionInitializerWorker.class);
|
||||
|
||||
|
@ -40,7 +40,7 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ses
|
|||
private int sessionsPerSegment;
|
||||
private SessionLoader sessionLoader;
|
||||
|
||||
private transient Cache<String, SessionEntity> cache;
|
||||
private transient Cache<String, Serializable> workCache;
|
||||
|
||||
public void setWorkerEnvironment(int segment, int sessionsPerSegment, SessionLoader sessionLoader) {
|
||||
this.segment = segment;
|
||||
|
@ -49,8 +49,8 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ses
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setEnvironment(Cache<String, SessionEntity> cache, Set<String> inputKeys) {
|
||||
this.cache = cache;
|
||||
public void setEnvironment(Cache<String, Serializable> workCache, Set<String> inputKeys) {
|
||||
this.workCache = workCache;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -59,7 +59,7 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ses
|
|||
log.tracef("Running computation for segment: %d", segment);
|
||||
}
|
||||
|
||||
KeycloakSessionFactory sessionFactory = cache.getAdvancedCache().getComponentRegistry().getComponent(KeycloakSessionFactory.class);
|
||||
KeycloakSessionFactory sessionFactory = workCache.getAdvancedCache().getComponentRegistry().getComponent(KeycloakSessionFactory.class);
|
||||
if (sessionFactory == null) {
|
||||
log.warnf("KeycloakSessionFactory not yet set in cache. Worker skipped");
|
||||
return InfinispanUserSessionInitializer.WorkerResult.create(segment, false);
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.keycloak.models.KeycloakSession;
|
|||
*/
|
||||
public interface SessionLoader extends Serializable {
|
||||
|
||||
void init(KeycloakSession session, int clusterStartupTime);
|
||||
void init(KeycloakSession session);
|
||||
|
||||
int getSessionsCount(KeycloakSession session);
|
||||
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
#
|
||||
# Copyright 2016 Red Hat, Inc. and/or its affiliates
|
||||
# and other contributors as indicated by the @author tags.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
org.keycloak.cluster.infinispan.InfinispanClusterProviderFactory
|
|
@ -15,20 +15,12 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.models.sessions.infinispan.initializer;
|
||||
package org.keycloak.cluster;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class TimeAwareInitializerState extends InitializerState {
|
||||
|
||||
private int clusterStartupTime;
|
||||
|
||||
public int getClusterStartupTime() {
|
||||
return clusterStartupTime;
|
||||
}
|
||||
|
||||
public void setClusterStartupTime(int clusterStartupTime) {
|
||||
this.clusterStartupTime = clusterStartupTime;
|
||||
}
|
||||
public interface ClusterEvent extends Serializable {
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Copyright 2016 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.cluster;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* Task to be executed on all cluster nodes once it's notified.
|
||||
*
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public interface ClusterListener {
|
||||
|
||||
/**
|
||||
* Registered task to be executed on all cluster nodes once it's notified from cache.
|
||||
*
|
||||
* @param event value of notification (Object added into the cache)
|
||||
*/
|
||||
void run(ClusterEvent event);
|
||||
|
||||
}
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Copyright 2016 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.cluster;
|
||||
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.keycloak.provider.Provider;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public interface ClusterProvider extends Provider {
|
||||
|
||||
/**
|
||||
* Will use startup time of this server in non-cluster environment. Otherwise the value is same for all cluster nodes
|
||||
*/
|
||||
int getClusterStartupTime();
|
||||
|
||||
|
||||
/**
|
||||
* Execute given task just if it's not already in progress (either on this or any other cluster node).
|
||||
*
|
||||
* @param taskKey
|
||||
* @param taskTimeoutInSeconds timeout for given task. If there is existing task in progress for longer time, it's considered outdated so we will start our task.
|
||||
* @param task
|
||||
* @param <T>
|
||||
* @return result with "executed" flag specifying if execution was executed or ignored.
|
||||
*/
|
||||
<T> ExecutionResult<T> executeIfNotExecuted(String taskKey, int taskTimeoutInSeconds, Callable<T> task);
|
||||
|
||||
|
||||
/**
|
||||
* Register task (listener) under given key. When this key will be put to the cache on any cluster node, the task will be executed
|
||||
*
|
||||
* @param taskKey
|
||||
* @param task
|
||||
*/
|
||||
void registerListener(String taskKey, ClusterListener task);
|
||||
|
||||
|
||||
/**
|
||||
* Notify registered listeners on all cluster nodes
|
||||
*
|
||||
* @param taskKey
|
||||
* @param event
|
||||
*/
|
||||
void notify(String taskKey, ClusterEvent event);
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Copyright 2016 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.cluster;
|
||||
|
||||
import org.keycloak.provider.ProviderFactory;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public interface ClusterProviderFactory extends ProviderFactory<ClusterProvider> {
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Copyright 2016 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.cluster;
|
||||
|
||||
import org.keycloak.provider.Provider;
|
||||
import org.keycloak.provider.ProviderFactory;
|
||||
import org.keycloak.provider.Spi;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class ClusterSpi implements Spi {
|
||||
|
||||
@Override
|
||||
public boolean isInternal() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "cluster";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends Provider> getProviderClass() {
|
||||
return ClusterProvider.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends ProviderFactory> getProviderFactoryClass() {
|
||||
return ClusterProviderFactory.class;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Copyright 2016 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.cluster;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class ExecutionResult<T> {
|
||||
|
||||
private final boolean executed;
|
||||
private final T result;
|
||||
|
||||
private ExecutionResult(boolean executed, T result) {
|
||||
this.executed = executed;
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
public static <T> ExecutionResult<T> executed(T result) {
|
||||
return new ExecutionResult<>(true, result);
|
||||
}
|
||||
|
||||
public static <T> ExecutionResult<T> notExecuted() {
|
||||
return new ExecutionResult<>(false, null);
|
||||
}
|
||||
|
||||
public boolean isExecuted() {
|
||||
return executed;
|
||||
}
|
||||
|
||||
public T getResult() {
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -112,4 +112,10 @@ public class UserFederationSyncResult {
|
|||
public static UserFederationSyncResult empty() {
|
||||
return new UserFederationSyncResult();
|
||||
}
|
||||
|
||||
public static UserFederationSyncResult ignored() {
|
||||
UserFederationSyncResult result = new UserFederationSyncResult();
|
||||
result.setIgnored(true);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,9 +82,6 @@ public interface UserSessionProvider extends Provider {
|
|||
void removeClientInitialAccessModel(RealmModel realm, String id);
|
||||
List<ClientInitialAccessModel> listClientInitialAccess(RealmModel realm);
|
||||
|
||||
// Will use startup time of this server in non-cluster environment
|
||||
int getClusterStartupTime();
|
||||
|
||||
void close();
|
||||
|
||||
}
|
||||
|
|
|
@ -49,5 +49,6 @@ org.keycloak.authentication.ClientAuthenticatorSpi
|
|||
org.keycloak.authentication.RequiredActionSpi
|
||||
org.keycloak.authentication.FormAuthenticatorSpi
|
||||
org.keycloak.authentication.FormActionSpi
|
||||
org.keycloak.cluster.ClusterSpi
|
||||
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.keycloak.protocol.oidc;
|
||||
|
||||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.common.ClientConnection;
|
||||
import org.keycloak.OAuth2Constants;
|
||||
import org.keycloak.OAuthErrorException;
|
||||
|
@ -193,7 +194,7 @@ public class TokenManager {
|
|||
int currentTime = Time.currentTime();
|
||||
|
||||
if (realm.isRevokeRefreshToken()) {
|
||||
int clusterStartupTime = session.sessions().getClusterStartupTime();
|
||||
int clusterStartupTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
|
||||
|
||||
if (refreshToken.getIssuedAt() < validation.clientSession.getTimestamp() && (clusterStartupTime != validation.clientSession.getTimestamp())) {
|
||||
throw new OAuthErrorException(OAuthErrorException.INVALID_GRANT, "Stale token");
|
||||
|
|
|
@ -231,7 +231,7 @@ public class RealmManager implements RealmImporter {
|
|||
// Remove all periodic syncs for configured federation providers
|
||||
UsersSyncManager usersSyncManager = new UsersSyncManager();
|
||||
for (final UserFederationProviderModel fedProvider : federationProviders) {
|
||||
usersSyncManager.removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), fedProvider);
|
||||
usersSyncManager.notifyToRefreshPeriodicSync(session, realm, fedProvider, true);
|
||||
}
|
||||
}
|
||||
return removed;
|
||||
|
@ -434,7 +434,7 @@ public class RealmManager implements RealmImporter {
|
|||
List<UserFederationProviderModel> federationProviders = realm.getUserFederationProviders();
|
||||
UsersSyncManager usersSyncManager = new UsersSyncManager();
|
||||
for (final UserFederationProviderModel fedProvider : federationProviders) {
|
||||
usersSyncManager.refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), fedProvider, realm.getId());
|
||||
usersSyncManager.notifyToRefreshPeriodicSync(session, realm, fedProvider, false);
|
||||
}
|
||||
return realm;
|
||||
}
|
||||
|
|
|
@ -16,6 +16,10 @@
|
|||
*/
|
||||
package org.keycloak.services.managers;
|
||||
|
||||
import org.keycloak.cluster.ClusterEvent;
|
||||
import org.keycloak.cluster.ClusterListener;
|
||||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.cluster.ExecutionResult;
|
||||
import org.keycloak.common.util.Time;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakSessionFactory;
|
||||
|
@ -30,13 +34,17 @@ import org.keycloak.services.ServicesLogger;
|
|||
import org.keycloak.timer.TimerProvider;
|
||||
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class UsersSyncManager {
|
||||
|
||||
private static final String FEDERATION_TASK_KEY = "federation";
|
||||
|
||||
protected static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
|
||||
|
||||
/**
|
||||
|
@ -57,26 +65,106 @@ public class UsersSyncManager {
|
|||
refreshPeriodicSyncForProvider(sessionFactory, timer, fedProvider, realm.getId());
|
||||
}
|
||||
}
|
||||
|
||||
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
|
||||
clusterProvider.registerListener(FEDERATION_TASK_KEY, new UserFederationClusterListener(sessionFactory));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public UserFederationSyncResult syncAllUsers(final KeycloakSessionFactory sessionFactory, String realmId, final UserFederationProviderModel fedProvider) {
|
||||
final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
|
||||
updateLastSyncInterval(sessionFactory, fedProvider, realmId);
|
||||
return fedProviderFactory.syncAllUsers(sessionFactory, realmId, fedProvider);
|
||||
|
||||
private class Holder {
|
||||
ExecutionResult<UserFederationSyncResult> result;
|
||||
}
|
||||
|
||||
public UserFederationSyncResult syncChangedUsers(final KeycloakSessionFactory sessionFactory, String realmId, final UserFederationProviderModel fedProvider) {
|
||||
final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
|
||||
public UserFederationSyncResult syncAllUsers(final KeycloakSessionFactory sessionFactory, final String realmId, final UserFederationProviderModel fedProvider) {
|
||||
final Holder holder = new Holder();
|
||||
|
||||
// See when we did last sync.
|
||||
int oldLastSync = fedProvider.getLastSync();
|
||||
updateLastSyncInterval(sessionFactory, fedProvider, realmId);
|
||||
return fedProviderFactory.syncChangedUsers(sessionFactory, realmId, fedProvider, Time.toDate(oldLastSync));
|
||||
// Ensure not executed concurrently on this or any other cluster node
|
||||
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
|
||||
|
||||
@Override
|
||||
public void run(KeycloakSession session) {
|
||||
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
|
||||
// shared key for "full" and "changed" . Improve if needed
|
||||
String taskKey = fedProvider.getId() + "::sync";
|
||||
|
||||
// 30 seconds minimal timeout for now
|
||||
int timeout = Math.max(30, fedProvider.getFullSyncPeriod());
|
||||
holder.result = clusterProvider.executeIfNotExecuted(taskKey, timeout, new Callable<UserFederationSyncResult>() {
|
||||
|
||||
@Override
|
||||
public UserFederationSyncResult call() throws Exception {
|
||||
final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
|
||||
updateLastSyncInterval(sessionFactory, fedProvider, realmId);
|
||||
return fedProviderFactory.syncAllUsers(sessionFactory, realmId, fedProvider);
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
if (holder.result == null || !holder.result.isExecuted()) {
|
||||
logger.infof("syncAllUsers for federation provider %s was ignored as it's already in progress", fedProvider.getDisplayName());
|
||||
return UserFederationSyncResult.ignored();
|
||||
} else {
|
||||
return holder.result.getResult();
|
||||
}
|
||||
}
|
||||
|
||||
public void refreshPeriodicSyncForProvider(final KeycloakSessionFactory sessionFactory, TimerProvider timer, final UserFederationProviderModel fedProvider, final String realmId) {
|
||||
public UserFederationSyncResult syncChangedUsers(final KeycloakSessionFactory sessionFactory, final String realmId, final UserFederationProviderModel fedProvider) {
|
||||
final Holder holder = new Holder();
|
||||
|
||||
// Ensure not executed concurrently on this or any other cluster node
|
||||
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
|
||||
|
||||
@Override
|
||||
public void run(KeycloakSession session) {
|
||||
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
|
||||
// shared key for "full" and "changed" . Improve if needed
|
||||
String taskKey = fedProvider.getId() + "::sync";
|
||||
|
||||
// 30 seconds minimal timeout for now
|
||||
int timeout = Math.max(30, fedProvider.getChangedSyncPeriod());
|
||||
holder.result = clusterProvider.executeIfNotExecuted(taskKey, timeout, new Callable<UserFederationSyncResult>() {
|
||||
|
||||
@Override
|
||||
public UserFederationSyncResult call() throws Exception {
|
||||
final UserFederationProviderFactory fedProviderFactory = (UserFederationProviderFactory) sessionFactory.getProviderFactory(UserFederationProvider.class, fedProvider.getProviderName());
|
||||
|
||||
// See when we did last sync.
|
||||
int oldLastSync = fedProvider.getLastSync();
|
||||
updateLastSyncInterval(sessionFactory, fedProvider, realmId);
|
||||
return fedProviderFactory.syncChangedUsers(sessionFactory, realmId, fedProvider, Time.toDate(oldLastSync));
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
if (holder.result == null || !holder.result.isExecuted()) {
|
||||
logger.infof("syncChangedUsers for federation provider %s was ignored as it's already in progress", fedProvider.getDisplayName());
|
||||
return UserFederationSyncResult.ignored();
|
||||
} else {
|
||||
return holder.result.getResult();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Ensure all cluster nodes are notified
|
||||
public void notifyToRefreshPeriodicSync(KeycloakSession session, RealmModel realm, UserFederationProviderModel federationProvider, boolean removed) {
|
||||
FederationProviderClusterEvent event = FederationProviderClusterEvent.createEvent(removed, realm.getId(), federationProvider);
|
||||
session.getProvider(ClusterProvider.class).notify(FEDERATION_TASK_KEY, event);
|
||||
}
|
||||
|
||||
|
||||
// Executed once it receives notification that some UserFederationProvider was created or updated
|
||||
protected void refreshPeriodicSyncForProvider(final KeycloakSessionFactory sessionFactory, TimerProvider timer, final UserFederationProviderModel fedProvider, final String realmId) {
|
||||
logger.infof("Going to refresh periodic sync for provider '%s' . Full sync period: %d , changed users sync period: %d",
|
||||
fedProvider.getDisplayName(), fedProvider.getFullSyncPeriod(), fedProvider.getChangedSyncPeriod());
|
||||
|
||||
if (fedProvider.getFullSyncPeriod() > 0) {
|
||||
// We want periodic full sync for this provider
|
||||
timer.schedule(new Runnable() {
|
||||
|
@ -84,7 +172,12 @@ public class UsersSyncManager {
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
syncAllUsers(sessionFactory, realmId, fedProvider);
|
||||
boolean shouldPerformSync = shouldPerformNewPeriodicSync(fedProvider.getLastSync(), fedProvider.getChangedSyncPeriod());
|
||||
if (shouldPerformSync) {
|
||||
syncAllUsers(sessionFactory, realmId, fedProvider);
|
||||
} else {
|
||||
logger.infof("Ignored periodic full sync with federation provider %s due small time since last sync", fedProvider.getDisplayName());
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
logger.errorDuringFullUserSync(t);
|
||||
}
|
||||
|
@ -102,7 +195,12 @@ public class UsersSyncManager {
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
syncChangedUsers(sessionFactory, realmId, fedProvider);
|
||||
boolean shouldPerformSync = shouldPerformNewPeriodicSync(fedProvider.getLastSync(), fedProvider.getChangedSyncPeriod());
|
||||
if (shouldPerformSync) {
|
||||
syncChangedUsers(sessionFactory, realmId, fedProvider);
|
||||
} else {
|
||||
logger.infof("Ignored periodic changed-users sync with federation provider %s due small time since last sync", fedProvider.getDisplayName());
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
logger.errorDuringChangedUserSync(t);
|
||||
}
|
||||
|
@ -115,7 +213,21 @@ public class UsersSyncManager {
|
|||
}
|
||||
}
|
||||
|
||||
public void removePeriodicSyncForProvider(TimerProvider timer, final UserFederationProviderModel fedProvider) {
|
||||
// Skip syncing if there is short time since last sync time.
|
||||
private boolean shouldPerformNewPeriodicSync(int lastSyncTime, int period) {
|
||||
if (lastSyncTime <= 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
int currentTime = Time.currentTime();
|
||||
int timeSinceLastSync = currentTime - lastSyncTime;
|
||||
|
||||
return (timeSinceLastSync * 2 > period);
|
||||
}
|
||||
|
||||
// Executed once it receives notification that some UserFederationProvider was removed
|
||||
protected void removePeriodicSyncForProvider(TimerProvider timer, UserFederationProviderModel fedProvider) {
|
||||
logger.infof("Removing periodic sync for provider %s", fedProvider.getDisplayName());
|
||||
timer.cancelTask(fedProvider.getId() + "-FULL");
|
||||
timer.cancelTask(fedProvider.getId() + "-CHANGED");
|
||||
}
|
||||
|
@ -144,4 +256,73 @@ public class UsersSyncManager {
|
|||
});
|
||||
}
|
||||
|
||||
|
||||
private class UserFederationClusterListener implements ClusterListener {
|
||||
|
||||
private final KeycloakSessionFactory sessionFactory;
|
||||
|
||||
public UserFederationClusterListener(KeycloakSessionFactory sessionFactory) {
|
||||
this.sessionFactory = sessionFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(ClusterEvent event) {
|
||||
final FederationProviderClusterEvent fedEvent = (FederationProviderClusterEvent) event;
|
||||
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
|
||||
|
||||
@Override
|
||||
public void run(KeycloakSession session) {
|
||||
TimerProvider timer = session.getProvider(TimerProvider.class);
|
||||
if (fedEvent.isRemoved()) {
|
||||
removePeriodicSyncForProvider(timer, fedEvent.getFederationProvider());
|
||||
} else {
|
||||
refreshPeriodicSyncForProvider(sessionFactory, timer, fedEvent.getFederationProvider(), fedEvent.getRealmId());
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Send to cluster during each update or remove of federationProvider, so all nodes can update sync periods
|
||||
public static class FederationProviderClusterEvent implements ClusterEvent {
|
||||
|
||||
private boolean removed;
|
||||
private String realmId;
|
||||
private UserFederationProviderModel federationProvider;
|
||||
|
||||
public boolean isRemoved() {
|
||||
return removed;
|
||||
}
|
||||
|
||||
public void setRemoved(boolean removed) {
|
||||
this.removed = removed;
|
||||
}
|
||||
|
||||
public String getRealmId() {
|
||||
return realmId;
|
||||
}
|
||||
|
||||
public void setRealmId(String realmId) {
|
||||
this.realmId = realmId;
|
||||
}
|
||||
|
||||
public UserFederationProviderModel getFederationProvider() {
|
||||
return federationProvider;
|
||||
}
|
||||
|
||||
public void setFederationProvider(UserFederationProviderModel federationProvider) {
|
||||
this.federationProvider = federationProvider;
|
||||
}
|
||||
|
||||
public static FederationProviderClusterEvent createEvent(boolean removed, String realmId, UserFederationProviderModel fedProvider) {
|
||||
FederationProviderClusterEvent notification = new FederationProviderClusterEvent();
|
||||
notification.setRemoved(removed);
|
||||
notification.setRealmId(realmId);
|
||||
notification.setFederationProvider(fedProvider);
|
||||
return notification;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.keycloak.services.managers.UsersSyncManager;
|
|||
import org.keycloak.services.resources.admin.AdminRoot;
|
||||
import org.keycloak.services.scheduled.ClearExpiredEvents;
|
||||
import org.keycloak.services.scheduled.ClearExpiredUserSessions;
|
||||
import org.keycloak.services.scheduled.ClusterAwareScheduledTaskRunner;
|
||||
import org.keycloak.services.scheduled.ScheduledTaskRunner;
|
||||
import org.keycloak.services.util.JsonConfigProvider;
|
||||
import org.keycloak.services.util.ObjectMapperResolver;
|
||||
|
@ -217,8 +218,8 @@ public class KeycloakApplication extends Application {
|
|||
KeycloakSession session = sessionFactory.create();
|
||||
try {
|
||||
TimerProvider timer = session.getProvider(TimerProvider.class);
|
||||
timer.schedule(new ScheduledTaskRunner(sessionFactory, new ClearExpiredEvents()), interval, "ClearExpiredEvents");
|
||||
timer.schedule(new ScheduledTaskRunner(sessionFactory, new ClearExpiredUserSessions()), interval, "ClearExpiredUserSessions");
|
||||
timer.schedule(new ClusterAwareScheduledTaskRunner(sessionFactory, new ClearExpiredEvents(), interval), interval, "ClearExpiredEvents");
|
||||
timer.schedule(new ClusterAwareScheduledTaskRunner(sessionFactory, new ClearExpiredUserSessions(), interval), interval, "ClearExpiredUserSessions");
|
||||
new UsersSyncManager().bootstrapPeriodic(sessionFactory, timer);
|
||||
} finally {
|
||||
session.close();
|
||||
|
|
|
@ -242,7 +242,7 @@ public class RealmAdminResource {
|
|||
List<UserFederationProviderModel> federationProviders = realm.getUserFederationProviders();
|
||||
UsersSyncManager usersSyncManager = new UsersSyncManager();
|
||||
for (final UserFederationProviderModel fedProvider : federationProviders) {
|
||||
usersSyncManager.refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), fedProvider, realm.getId());
|
||||
usersSyncManager.notifyToRefreshPeriodicSync(session, realm, fedProvider, false);
|
||||
}
|
||||
|
||||
adminEvent.operation(OperationType.UPDATE).representation(rep).success();
|
||||
|
|
|
@ -106,7 +106,7 @@ public class UserFederationProviderResource {
|
|||
UserFederationProviderModel model = new UserFederationProviderModel(rep.getId(), rep.getProviderName(), rep.getConfig(), rep.getPriority(), displayName,
|
||||
rep.getFullSyncPeriod(), rep.getChangedSyncPeriod(), rep.getLastSync());
|
||||
realm.updateUserFederationProvider(model);
|
||||
new UsersSyncManager().refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), model, realm.getId());
|
||||
new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, model, false);
|
||||
boolean kerberosCredsAdded = UserFederationProvidersResource.checkKerberosCredential(session, realm, model);
|
||||
if (kerberosCredsAdded) {
|
||||
logger.addedKerberosToRealmCredentials();
|
||||
|
@ -138,7 +138,7 @@ public class UserFederationProviderResource {
|
|||
auth.requireManage();
|
||||
|
||||
realm.removeUserFederationProvider(this.federationProviderModel);
|
||||
new UsersSyncManager().removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), this.federationProviderModel);
|
||||
new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, this.federationProviderModel, true);
|
||||
|
||||
adminEvent.operation(OperationType.DELETE).resourcePath(uriInfo).success();
|
||||
|
||||
|
|
|
@ -178,7 +178,7 @@ public class UserFederationProvidersResource {
|
|||
}
|
||||
UserFederationProviderModel model = realm.addUserFederationProvider(rep.getProviderName(), rep.getConfig(), rep.getPriority(), displayName,
|
||||
rep.getFullSyncPeriod(), rep.getChangedSyncPeriod(), rep.getLastSync());
|
||||
new UsersSyncManager().refreshPeriodicSyncForProvider(session.getKeycloakSessionFactory(), session.getProvider(TimerProvider.class), model, realm.getId());
|
||||
new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, model, false);
|
||||
boolean kerberosCredsAdded = checkKerberosCredential(session, realm, model);
|
||||
if (kerberosCredsAdded) {
|
||||
logger.addedKerberosToRealmCredentials();
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
/*
|
||||
* Copyright 2016 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.services.scheduled;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.cluster.ExecutionResult;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakSessionFactory;
|
||||
|
||||
/**
|
||||
* Ensures that there are not concurrent executions of same task (either on this host or any other cluster host)
|
||||
*
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class ClusterAwareScheduledTaskRunner extends ScheduledTaskRunner {
|
||||
|
||||
private final int intervalSecs;
|
||||
|
||||
public ClusterAwareScheduledTaskRunner(KeycloakSessionFactory sessionFactory, ScheduledTask task, long intervalMillis) {
|
||||
super(sessionFactory, task);
|
||||
this.intervalSecs = (int) (intervalMillis / 1000);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runTask(final KeycloakSession session) {
|
||||
session.getTransaction().begin();
|
||||
|
||||
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
|
||||
String taskKey = task.getClass().getSimpleName();
|
||||
|
||||
ExecutionResult<Void> result = clusterProvider.executeIfNotExecuted(taskKey, intervalSecs, new Callable<Void>() {
|
||||
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
task.run(session);
|
||||
return null;
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
session.getTransaction().commit();
|
||||
|
||||
if (result.isExecuted()) {
|
||||
logger.debugf("Executed scheduled task %s", taskKey);
|
||||
} else {
|
||||
logger.debugf("Skipped execution of task %s as other cluster node is executing it", taskKey);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -17,6 +17,10 @@
|
|||
|
||||
package org.keycloak.services.scheduled;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.cluster.ExecutionResult;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakSessionFactory;
|
||||
import org.keycloak.services.ServicesLogger;
|
||||
|
@ -26,10 +30,10 @@ import org.keycloak.services.ServicesLogger;
|
|||
*/
|
||||
public class ScheduledTaskRunner implements Runnable {
|
||||
|
||||
private static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
|
||||
protected static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
|
||||
|
||||
private final KeycloakSessionFactory sessionFactory;
|
||||
private final ScheduledTask task;
|
||||
protected final KeycloakSessionFactory sessionFactory;
|
||||
protected final ScheduledTask task;
|
||||
|
||||
public ScheduledTaskRunner(KeycloakSessionFactory sessionFactory, ScheduledTask task) {
|
||||
this.sessionFactory = sessionFactory;
|
||||
|
@ -40,11 +44,7 @@ public class ScheduledTaskRunner implements Runnable {
|
|||
public void run() {
|
||||
KeycloakSession session = sessionFactory.create();
|
||||
try {
|
||||
session.getTransaction().begin();
|
||||
task.run(session);
|
||||
session.getTransaction().commit();
|
||||
|
||||
logger.debug("Executed scheduled task " + task.getClass().getSimpleName());
|
||||
runTask(session);
|
||||
} catch (Throwable t) {
|
||||
logger.failedToRunScheduledTask(t, task.getClass().getSimpleName());
|
||||
|
||||
|
@ -58,4 +58,12 @@ public class ScheduledTaskRunner implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
protected void runTask(KeycloakSession session) {
|
||||
session.getTransaction().begin();
|
||||
task.run(session);
|
||||
session.getTransaction().commit();
|
||||
|
||||
logger.debug("Executed scheduled task " + task.getClass().getSimpleName());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.keycloak.testsuite.federation.sync;
|
|||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
|
@ -37,6 +39,17 @@ import org.keycloak.testsuite.DummyUserFederationProviderFactory;
|
|||
*/
|
||||
public class SyncDummyUserFederationProviderFactory extends DummyUserFederationProviderFactory {
|
||||
|
||||
// Used during SyncFederationTest
|
||||
static volatile CountDownLatch latch1 = new CountDownLatch(1);
|
||||
static volatile CountDownLatch latch2 = new CountDownLatch(1);
|
||||
|
||||
static void restartLatches() {
|
||||
latch1 = new CountDownLatch(1);
|
||||
latch2 = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static final Logger logger = Logger.getLogger(SyncDummyUserFederationProviderFactory.class);
|
||||
|
||||
public static final String SYNC_PROVIDER_ID = "sync-dummy";
|
||||
|
@ -68,7 +81,7 @@ public class SyncDummyUserFederationProviderFactory extends DummyUserFederationP
|
|||
RealmModel realm = session.realms().getRealm(realmId);
|
||||
|
||||
// KEYCLOAK-2412 : Just remove and add some users for testing purposes
|
||||
for (int i = 0; i<10; i++) {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
String username = "dummyuser-" + i;
|
||||
UserModel user = session.userStorage().getUserByUsername(username, realm);
|
||||
|
||||
|
@ -83,7 +96,7 @@ public class SyncDummyUserFederationProviderFactory extends DummyUserFederationP
|
|||
|
||||
|
||||
try {
|
||||
Thread.sleep(waitTime * 1000);
|
||||
latch1.await(waitTime * 1000, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException("Interrupted!", ie);
|
||||
|
@ -94,6 +107,9 @@ public class SyncDummyUserFederationProviderFactory extends DummyUserFederationP
|
|||
|
||||
});
|
||||
|
||||
// countDown, so the SyncFederationTest can continue
|
||||
latch2.countDown();
|
||||
|
||||
return new UserFederationSyncResult();
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.keycloak.testsuite.federation.sync;
|
|||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.ClassRule;
|
||||
|
@ -84,7 +85,8 @@ public class SyncFederationTest {
|
|||
sleep(1800);
|
||||
|
||||
// Cancel timer
|
||||
usersSyncManager.removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), dummyModel);
|
||||
RealmModel appRealm = session.realms().getRealmByName("test");
|
||||
usersSyncManager.notifyToRefreshPeriodicSync(session, appRealm, dummyModel, true);
|
||||
|
||||
// Assert that DummyUserFederationProviderFactory.syncChangedUsers was invoked
|
||||
int newChanged = dummyFedFactory.getChangedSyncCounter();
|
||||
|
@ -111,7 +113,9 @@ public class SyncFederationTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void test02ConcurrentSync() {
|
||||
public void test02ConcurrentSync() throws Exception {
|
||||
SyncDummyUserFederationProviderFactory.restartLatches();
|
||||
|
||||
// Enable timer for SyncDummyUserFederationProvider
|
||||
keycloakRule.update(new KeycloakRule.KeycloakSetup() {
|
||||
|
||||
|
@ -139,11 +143,17 @@ public class SyncFederationTest {
|
|||
Assert.assertTrue(syncResult.isIgnored());
|
||||
|
||||
// Cancel timer
|
||||
usersSyncManager.removePeriodicSyncForProvider(session.getProvider(TimerProvider.class), dummyModel);
|
||||
usersSyncManager.notifyToRefreshPeriodicSync(session, realm, dummyModel, true);
|
||||
|
||||
// Signal to factory to finish waiting
|
||||
SyncDummyUserFederationProviderFactory.latch1.countDown();
|
||||
|
||||
} finally {
|
||||
keycloakRule.stopSession(session, true);
|
||||
}
|
||||
|
||||
SyncDummyUserFederationProviderFactory.latch2.await(20000, TimeUnit.MILLISECONDS);
|
||||
|
||||
// remove provider
|
||||
keycloakRule.update(new KeycloakRule.KeycloakSetup() {
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.junit.Assert;
|
|||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||
import org.keycloak.models.ClientModel;
|
||||
import org.keycloak.models.ClientSessionModel;
|
||||
|
@ -83,7 +84,7 @@ public class UserSessionInitializerTest {
|
|||
|
||||
// Create and persist offline sessions
|
||||
int started = Time.currentTime();
|
||||
int serverStartTime = session.sessions().getClusterStartupTime();
|
||||
int serverStartTime = session.getProvider(ClusterProvider.class).getClusterStartupTime();
|
||||
|
||||
for (UserSessionModel origSession : origSessions) {
|
||||
UserSessionModel userSession = session.sessions().getUserSession(realm, origSession.getId());
|
||||
|
|
|
@ -47,10 +47,11 @@ public class SyncDummyFederationProviderCommand extends AbstractCommand {
|
|||
} else {
|
||||
Map<String, String> cfg = fedProviderModel.getConfig();
|
||||
updateConfig(cfg, waitTime);
|
||||
fedProviderModel.setChangedSyncPeriod(changedSyncPeriod);
|
||||
realm.updateUserFederationProvider(fedProviderModel);
|
||||
}
|
||||
|
||||
new UsersSyncManager().refreshPeriodicSyncForProvider(sessionFactory, session.getProvider(TimerProvider.class), fedProviderModel, "master");
|
||||
new UsersSyncManager().notifyToRefreshPeriodicSync(session, realm, fedProviderModel, false);
|
||||
|
||||
log.infof("User federation provider created and sync was started", waitTime);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue