External Infinispan as cache - Part 1

Part 1 includes

* New experimental feature to enable the new code
* New providers using RemoteCache only
* New test profile to run the tests with the experimental feature

New providers' implementation for:
* InfinispanConnectionProvider
* AuthenticationSessionProvider
* ClusterProvider

Closes #28140

Signed-off-by: Pedro Ruivo <pruivo@redhat.com>
This commit is contained in:
Pedro Ruivo 2024-04-15 16:42:06 +01:00 committed by Alexander Schwartz
parent 2b35b4430c
commit d2ae27a1e2
39 changed files with 1502 additions and 159 deletions

View file

@ -100,6 +100,7 @@ public class Profile {
TRANSIENT_USERS("Transient users for brokering", Type.EXPERIMENTAL),
MULTI_SITE("Multi-site support", Type.DISABLED_BY_DEFAULT),
REMOTE_CACHE("Remote caches support. Requires Multi-site support to be enabled as well.", Type.EXPERIMENTAL),
CLIENT_TYPES("Client Types", Type.EXPERIMENTAL),

View file

@ -29,6 +29,7 @@ import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ClusterProviderFactory;
import org.keycloak.common.Profile;
import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.DefaultInfinispanConnectionProviderFactory;
@ -187,6 +188,11 @@ public class InfinispanClusterProviderFactory implements ClusterProviderFactory
return PROVIDER_ID;
}
@Override
public boolean isSupported(Config.Scope config) {
return !Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) || !Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE);
}
@Listener
public class ViewChangeListener {

View file

@ -26,7 +26,7 @@ import org.jboss.logging.Logger;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
class TaskCallback {
public class TaskCallback {
protected static final Logger logger = Logger.getLogger(TaskCallback.class);

View file

@ -80,7 +80,7 @@ public class WrapperClusterEvent implements ClusterEvent {
return eventKey;
}
Collection<? extends ClusterEvent> getDelegateEvents() {
public Collection<? extends ClusterEvent> getDelegateEvents() {
return events;
}

View file

@ -0,0 +1,150 @@
package org.keycloak.cluster.infinispan.remote;
import org.infinispan.client.hotrod.RemoteCache;
import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ExecutionResult;
import org.keycloak.cluster.infinispan.LockEntry;
import org.keycloak.cluster.infinispan.TaskCallback;
import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static org.keycloak.cluster.infinispan.InfinispanClusterProvider.TASK_KEY_PREFIX;
import static org.keycloak.cluster.infinispan.remote.RemoteInfinispanClusterProviderFactory.putIfAbsentWithRetries;
public class RemoteInfinispanClusterProvider implements ClusterProvider {
private static final Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass());
private final int clusterStartupTime;
private final RemoteCache<String, LockEntry> cache;
private final RemoteInfinispanNotificationManager notificationManager;
private final Executor executor;
public RemoteInfinispanClusterProvider(int clusterStartupTime, RemoteCache<String, LockEntry> cache, RemoteInfinispanNotificationManager notificationManager, Executor executor) {
this.clusterStartupTime = clusterStartupTime;
this.cache = Objects.requireNonNull(cache);
this.notificationManager = Objects.requireNonNull(notificationManager);
this.executor = Objects.requireNonNull(executor);
}
@Override
public int getClusterStartupTime() {
return clusterStartupTime;
}
@Override
public <T> ExecutionResult<T> executeIfNotExecuted(String taskKey, int taskTimeoutInSeconds, Callable<T> task) {
String cacheKey = TASK_KEY_PREFIX + taskKey;
boolean locked = tryLock(cacheKey, taskTimeoutInSeconds);
if (locked) {
try {
try {
T result = task.call();
return ExecutionResult.executed(result);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new RuntimeException("Unexpected exception when executed task " + taskKey, e);
}
} finally {
removeFromCache(cacheKey);
}
} else {
return ExecutionResult.notExecuted();
}
}
@Override
public Future<Boolean> executeIfNotExecutedAsync(String taskKey, int taskTimeoutInSeconds, Callable task) {
TaskCallback newCallback = new TaskCallback();
TaskCallback callback = notificationManager.registerTaskCallback(TASK_KEY_PREFIX + taskKey, newCallback);
// We successfully submitted our task
if (newCallback == callback) {
Supplier<Boolean> wrappedTask = () -> {
boolean executed = executeIfNotExecuted(taskKey, taskTimeoutInSeconds, task).isExecuted();
if (!executed) {
logger.infof("Task already in progress on other cluster node. Will wait until it's finished");
}
try {
callback.getTaskCompletedLatch().await(taskTimeoutInSeconds, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return callback.isSuccess();
};
callback.setFuture(CompletableFuture.supplyAsync(wrappedTask, executor));
} else {
logger.infof("Task already in progress on this cluster node. Will wait until it's finished");
}
return callback.getFuture();
}
@Override
public void registerListener(String taskKey, ClusterListener task) {
notificationManager.registerListener(taskKey, task);
}
@Override
public void notify(String taskKey, ClusterEvent event, boolean ignoreSender, DCNotify dcNotify) {
notificationManager.notify(taskKey, Collections.singleton(event), ignoreSender, dcNotify);
}
@Override
public void notify(String taskKey, Collection<? extends ClusterEvent> events, boolean ignoreSender, DCNotify dcNotify) {
notificationManager.notify(taskKey, events, ignoreSender, dcNotify);
}
@Override
public void close() {
}
private boolean tryLock(String cacheKey, int taskTimeoutInSeconds) {
LockEntry myLock = createLockEntry();
LockEntry existingLock = putIfAbsentWithRetries(cache, cacheKey, myLock, taskTimeoutInSeconds);
if (existingLock != null) {
if (logger.isTraceEnabled()) {
logger.tracef("Task %s in progress already by node %s. Ignoring task.", cacheKey, existingLock.node());
}
return false;
} else {
if (logger.isTraceEnabled()) {
logger.tracef("Successfully acquired lock for task %s. Our node is %s", cacheKey, myLock.node());
}
return true;
}
}
private LockEntry createLockEntry() {
return new LockEntry(notificationManager.getMyNodeName());
}
private void removeFromCache(String cacheKey) {
// More attempts to send the message (it may fail if some node fails in the meantime)
Retry.executeWithBackoff((int iteration) -> {
cache.remove(cacheKey);
if (logger.isTraceEnabled()) {
logger.tracef("Task %s removed from the cache", cacheKey);
}
}, 10, 10);
}
}

View file

@ -0,0 +1,119 @@
package org.keycloak.cluster.infinispan.remote;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.commons.util.ByRef;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ClusterProviderFactory;
import org.keycloak.cluster.infinispan.InfinispanClusterProvider;
import org.keycloak.cluster.infinispan.LockEntry;
import org.keycloak.common.Profile;
import org.keycloak.common.util.Retry;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.connections.infinispan.TopologyInfo;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.WORK_CACHE_NAME;
public class RemoteInfinispanClusterProviderFactory implements ClusterProviderFactory {
public static final String PROVIDER_ID = "remote-infinispan";
private static final Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass());
private RemoteCache<String, LockEntry> workCache;
private int clusterStartupTime;
private RemoteInfinispanNotificationManager notificationManager;
private Executor executor;
@Override
public ClusterProvider create(KeycloakSession session) {
assert workCache != null;
assert notificationManager != null;
assert executor != null;
return new RemoteInfinispanClusterProvider(clusterStartupTime, workCache, notificationManager, executor);
}
@Override
public void init(Config.Scope config) {
}
@Override
public synchronized void postInit(KeycloakSessionFactory factory) {
try (var session = factory.create()) {
var ispnProvider = session.getProvider(InfinispanConnectionProvider.class);
executor = ispnProvider.getExecutor("cluster-provider");
workCache = ispnProvider.getRemoteCache(WORK_CACHE_NAME);
clusterStartupTime = initClusterStartupTime(ispnProvider.getRemoteCache(WORK_CACHE_NAME), (int) (factory.getServerStartupTimestamp() / 1000));
notificationManager = new RemoteInfinispanNotificationManager(executor, ispnProvider.getRemoteCache(WORK_CACHE_NAME), getTopologyInfo(factory));
notificationManager.addClientListener();
logger.debugf("Provider initialized. Cluster startup time: %s", Time.toDate(clusterStartupTime));
}
}
@Override
public synchronized void close() {
logger.debug("Closing provider");
if (notificationManager != null) {
notificationManager.removeClientListener();
notificationManager = null;
}
// executor is managed by Infinispan, do not shutdown.
executor = null;
workCache = null;
}
@Override
public String getId() {
return PROVIDER_ID;
}
@Override
public boolean isSupported(Config.Scope config) {
return Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) && Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE);
}
private static TopologyInfo getTopologyInfo(KeycloakSessionFactory factory) {
try (var session = factory.create()) {
return session.getProvider(InfinispanConnectionProvider.class).getTopologyInfo();
}
}
private static int initClusterStartupTime(RemoteCache<String, Integer> cache, int serverStartupTime) {
Integer clusterStartupTime = putIfAbsentWithRetries(cache, InfinispanClusterProvider.CLUSTER_STARTUP_TIME_KEY, serverStartupTime, -1);
return clusterStartupTime == null ? serverStartupTime : clusterStartupTime;
}
static <V> V putIfAbsentWithRetries(RemoteCache<String, V> workCache, String key, V value, int taskTimeoutInSeconds) {
ByRef<V> ref = new ByRef<>(null);
Retry.executeWithBackoff((int iteration) -> {
try {
if (taskTimeoutInSeconds > 0) {
ref.set(workCache.putIfAbsent(key, value, taskTimeoutInSeconds, TimeUnit.SECONDS));
} else {
ref.set(workCache.putIfAbsent(key, value));
}
} catch (HotRodClientException re) {
logger.warnf(re, "Failed to write key '%s' and value '%s' in iteration '%d' . Retrying", key, value, iteration);
// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
throw re;
}
}, 10, 10);
return ref.get();
}
}

View file

@ -0,0 +1,164 @@
package org.keycloak.cluster.infinispan.remote;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.jboss.logging.Logger;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider.DCNotify;
import org.keycloak.cluster.infinispan.TaskCallback;
import org.keycloak.cluster.infinispan.WrapperClusterEvent;
import org.keycloak.common.util.ConcurrentMultivaluedHashMap;
import org.keycloak.common.util.Retry;
import org.keycloak.connections.infinispan.TopologyInfo;
import static org.keycloak.cluster.infinispan.InfinispanClusterProvider.TASK_KEY_PREFIX;
@ClientListener
public class RemoteInfinispanNotificationManager {
private static final Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass());
private final ConcurrentMap<String, TaskCallback> taskCallbacks = new ConcurrentHashMap<>();
private final ConcurrentMultivaluedHashMap<String, ClusterListener> listeners = new ConcurrentMultivaluedHashMap<>();
private final Executor executor;
private final RemoteCache<String, Object> workCache;
private final TopologyInfo topologyInfo;
public RemoteInfinispanNotificationManager(Executor executor, RemoteCache<String, Object> workCache, TopologyInfo topologyInfo) {
this.executor = executor;
this.workCache = workCache;
this.topologyInfo = topologyInfo;
}
public void addClientListener() {
workCache.addClientListener(this);
}
public void removeClientListener() {
// workaround because providers are independent and close() can be invoked in any order.
if (workCache.getRemoteCacheContainer().isStarted()) {
workCache.removeClientListener(this);
}
}
public void registerListener(String taskKey, ClusterListener task) {
listeners.add(taskKey, task);
}
public TaskCallback registerTaskCallback(String taskKey, TaskCallback callback) {
var existing = taskCallbacks.putIfAbsent(taskKey, callback);
return existing != null ? existing : callback;
}
public void notify(String taskKey, Collection<? extends ClusterEvent> events, boolean ignoreSender, DCNotify dcNotify) {
var wrappedEvent = WrapperClusterEvent.wrap(taskKey, events, topologyInfo.getMyNodeName(), topologyInfo.getMySiteName(), dcNotify, ignoreSender);
var eventKey = UUID.randomUUID().toString();
if (logger.isTraceEnabled()) {
logger.tracef("Sending event with key %s: %s", eventKey, events);
}
Retry.executeWithBackoff((int iteration) -> {
try {
workCache.put(eventKey, wrappedEvent, 120, TimeUnit.SECONDS);
} catch (HotRodClientException re) {
if (logger.isDebugEnabled()) {
logger.debugf(re, "Failed sending notification to remote cache '%s'. Key: '%s', iteration '%s'. Will try to retry the task",
workCache.getName(), eventKey, iteration);
}
// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
throw re;
}
}, 10, 10);
}
public String getMyNodeName() {
return topologyInfo.getMyNodeName();
}
@ClientCacheEntryCreated
public void created(ClientCacheEntryCreatedEvent<String> event) {
String key = event.getKey();
hotrodEventReceived(key);
}
@ClientCacheEntryModified
public void updated(ClientCacheEntryModifiedEvent<String> event) {
String key = event.getKey();
hotrodEventReceived(key);
}
@ClientCacheEntryRemoved
public void removed(ClientCacheEntryRemovedEvent<String> event) {
String key = event.getKey();
taskFinished(key);
}
private void hotrodEventReceived(String key) {
// TODO [pruivo] cache event converter may work here with protostream
workCache.getAsync(key).thenAcceptAsync(value -> eventReceived(key, value), executor);
}
private void eventReceived(String key, Object obj) {
if (!(obj instanceof WrapperClusterEvent event)) {
// Items with the TASK_KEY_PREFIX might be gone fast once the locking is complete, therefore, don't log them.
// It is still good to have the warning in case of real events return null because they have been, for example, expired
if (obj == null && !key.startsWith(TASK_KEY_PREFIX)) {
logger.warnf("Event object wasn't available in remote cache after event was received. Event key: %s", key);
}
return;
}
if (event.rejectEvent(topologyInfo.getMyNodeName(), topologyInfo.getMySiteName())) {
return;
}
String eventKey = event.getEventKey();
if (logger.isTraceEnabled()) {
logger.tracef("Received event: %s", event);
}
List<ClusterListener> myListeners = listeners.get(eventKey);
if (myListeners != null) {
for (var e : event.getDelegateEvents()) {
myListeners.forEach(e);
}
}
}
private void taskFinished(String taskKey) {
TaskCallback callback = taskCallbacks.remove(taskKey);
if (callback == null) {
return;
}
if (logger.isDebugEnabled()) {
logger.debugf("Finished task '%s' with '%b'", taskKey, true);
}
callback.setSuccess(true);
callback.getTaskCompletedLatch().countDown();
}
}

View file

@ -18,13 +18,16 @@
package org.keycloak.connections.infinispan;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.util.concurrent.BlockingManager;
/**
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
@ -77,6 +80,11 @@ public class DefaultInfinispanConnectionProvider implements InfinispanConnection
return stage.freeze();
}
@Override
public Executor getExecutor(String name) {
return GlobalComponentRegistry.componentOf(cacheManager, BlockingManager.class).asExecutor(name);
}
@Override
public void close() {
}

View file

@ -26,6 +26,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.commons.dataconversion.MediaType;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
@ -43,6 +44,9 @@ import org.keycloak.Config;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ManagedCacheManagerProvider;
import org.keycloak.common.Profile;
import org.keycloak.connections.infinispan.remote.RemoteInfinispanConnectionProvider;
import org.keycloak.marshalling.KeycloakModelSchema;
import org.keycloak.marshalling.Marshalling;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
@ -60,6 +64,7 @@ import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.A
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.AUTHORIZATION_REVISIONS_CACHE_DEFAULT_MAX;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.AUTHORIZATION_REVISIONS_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.DISTRIBUTED_REPLICATED_CACHE_NAMES;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.LOGIN_FAILURE_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME;
@ -96,10 +101,16 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
private volatile TopologyInfo topologyInfo;
private volatile RemoteCacheManager remoteCacheManager;
@Override
public InfinispanConnectionProvider create(KeycloakSession session) {
lazyInit();
if (Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) && Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE)) {
return new RemoteInfinispanConnectionProvider(cacheManager, remoteCacheManager, topologyInfo);
}
return new DefaultInfinispanConnectionProvider(cacheManager, remoteCacheProvider, topologyInfo);
}
@ -141,6 +152,7 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
@Override
public void close() {
logger.debug("Closing provider");
runWithWriteLockOnCacheManager(() -> {
if (cacheManager != null && !containerManaged) {
cacheManager.stop();
@ -148,6 +160,9 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
if (remoteCacheProvider != null) {
remoteCacheProvider.stop();
}
if (remoteCacheManager != null && !containerManaged) {
remoteCacheManager.stop();
}
});
}
@ -175,6 +190,7 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
synchronized (this) {
if (cacheManager == null) {
EmbeddedCacheManager managedCacheManager = null;
RemoteCacheManager rcm = null;
Iterator<ManagedCacheManagerProvider> providers = ServiceLoader.load(ManagedCacheManagerProvider.class, DefaultInfinispanConnectionProvider.class.getClassLoader())
.iterator();
@ -186,6 +202,9 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
}
managedCacheManager = provider.getEmbeddedCacheManager(config);
if (Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) && Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE)) {
rcm = provider.getRemoteCacheManager(config);
}
}
// store it in a locale variable first, so it is not visible to the outside, yet
@ -195,6 +214,9 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
throw new RuntimeException("No " + ManagedCacheManagerProvider.class.getName() + " found. If running in embedded mode set the [embedded] property to this provider.");
}
localCacheManager = initEmbedded();
if (Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) && Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE)) {
rcm = initRemote();
}
} else {
localCacheManager = initContainerManaged(managedCacheManager);
}
@ -204,11 +226,30 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
remoteCacheProvider = new RemoteCacheProvider(config, localCacheManager);
// only set the cache manager attribute at the very end to avoid passing a half-initialized entry callers
cacheManager = localCacheManager;
remoteCacheManager = rcm;
}
}
}
}
private RemoteCacheManager initRemote() {
var host = config.get("remoteStoreHost", "127.0.0.1");
var port = config.getInt("remoteStorePort", 11222);
org.infinispan.client.hotrod.configuration.ConfigurationBuilder builder = new org.infinispan.client.hotrod.configuration.ConfigurationBuilder();
builder.addServer().host(host).port(port);
builder.connectionPool().maxActive(16).exhaustedAction(org.infinispan.client.hotrod.configuration.ExhaustedAction.CREATE_NEW);
Marshalling.configure(builder);
RemoteCacheManager remoteCacheManager = new RemoteCacheManager(builder.build());
// establish connection to all caches
DISTRIBUTED_REPLICATED_CACHE_NAMES.forEach(remoteCacheManager::getCache);
return remoteCacheManager;
}
protected EmbeddedCacheManager initContainerManaged(EmbeddedCacheManager cacheManager) {
containerManaged = true;
@ -216,9 +257,7 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
defineRevisionCache(cacheManager, USER_CACHE_NAME, USER_REVISIONS_CACHE_NAME, USER_REVISIONS_CACHE_DEFAULT_MAX);
defineRevisionCache(cacheManager, AUTHORIZATION_CACHE_NAME, AUTHORIZATION_REVISIONS_CACHE_NAME, AUTHORIZATION_REVISIONS_CACHE_DEFAULT_MAX);
cacheManager.getCache(InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME, true);
cacheManager.getCache(InfinispanConnectionProvider.KEYS_CACHE_NAME, true);
cacheManager.getCache(InfinispanConnectionProvider.ACTION_TOKEN_CACHE, true);
this.topologyInfo = new TopologyInfo(cacheManager, config, false, getId());
@ -310,15 +349,18 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
}
defineClusteredCache(cacheManager, ACTION_TOKEN_CACHE, actionTokenBuilder.build());
defineClusteredCache(cacheManager, AUTHENTICATION_SESSIONS_CACHE_NAME, clusteredConfiguration);
var workBuilder = createCacheConfigurationBuilder()
.expiration().enableReaper().wakeUpInterval(15, TimeUnit.SECONDS);
if (clustered) {
workBuilder.simpleCache(false);
workBuilder.clustering().cacheMode(async ? CacheMode.REPL_ASYNC : CacheMode.REPL_SYNC);
if (!Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) || !Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE)) {
defineClusteredCache(cacheManager, AUTHENTICATION_SESSIONS_CACHE_NAME, clusteredConfiguration);
var workBuilder = createCacheConfigurationBuilder()
.expiration().enableReaper().wakeUpInterval(15, TimeUnit.SECONDS);
if (clustered) {
workBuilder.simpleCache(false);
workBuilder.clustering().cacheMode(async ? CacheMode.REPL_ASYNC : CacheMode.REPL_SYNC);
}
defineClusteredCache(cacheManager, WORK_CACHE_NAME, builder.build());
}
defineClusteredCache(cacheManager, WORK_CACHE_NAME, builder.build());
return cacheManager;
}

View file

@ -19,9 +19,11 @@ package org.keycloak.connections.infinispan;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.provider.Provider;
/**
@ -51,7 +53,7 @@ public interface InfinispanConnectionProvider extends Provider {
String ACTION_TOKEN_CACHE = "actionTokens";
int ACTION_TOKEN_CACHE_DEFAULT_MAX = -1;
int ACTION_TOKEN_MAX_IDLE_SECONDS = -1;
long ACTION_TOKEN_WAKE_UP_INTERVAL_SECONDS = 5 * 60 * 1000l;
long ACTION_TOKEN_WAKE_UP_INTERVAL_SECONDS = 5 * 60 * 1000L;
String KEYS_CACHE_NAME = "keys";
int KEYS_CACHE_DEFAULT_MAX = 1000;
@ -137,4 +139,26 @@ public interface InfinispanConnectionProvider extends Provider {
*/
CompletionStage<Void> migrateToProtostream();
/**
* Returns an executor that will run the given tasks on a blocking thread as required.
* <p>
* The Infinispan block {@link Executor} is used to execute blocking operation, like I/O.
* If Virtual Threads are enabled, this will be an executor with Virtual Threads.
*
* @param name The name for trace logging purpose.
* @return The Infinispan blocking {@link Executor}.
*/
Executor getExecutor(String name);
/**
* Syntactic sugar to get a {@link RemoteCache}.
*
* @see InfinispanConnectionProvider#getRemoteCache(String)
*/
static <K, V> RemoteCache<K, V> getRemoteCache(KeycloakSessionFactory factory, String cacheName) {
try (var session = factory.create()) {
return session.getProvider(InfinispanConnectionProvider.class).getRemoteCache(cacheName);
}
}
}

View file

@ -0,0 +1,63 @@
package org.keycloak.connections.infinispan.remote;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.util.concurrent.BlockingManager;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.connections.infinispan.TopologyInfo;
public record RemoteInfinispanConnectionProvider(EmbeddedCacheManager embeddedCacheManager,
RemoteCacheManager remoteCacheManager,
TopologyInfo topologyInfo) implements InfinispanConnectionProvider {
public RemoteInfinispanConnectionProvider(EmbeddedCacheManager embeddedCacheManager, RemoteCacheManager remoteCacheManager, TopologyInfo topologyInfo) {
this.embeddedCacheManager = Objects.requireNonNull(embeddedCacheManager);
this.remoteCacheManager = Objects.requireNonNull(remoteCacheManager);
this.topologyInfo = Objects.requireNonNull(topologyInfo);
}
@Override
public <K, V> Cache<K, V> getCache(String name, boolean createIfAbsent) {
return embeddedCacheManager.getCache(name, createIfAbsent);
}
@Override
public <K, V> RemoteCache<K, V> getRemoteCache(String name) {
return remoteCacheManager.getCache(name);
}
@Override
public TopologyInfo getTopologyInfo() {
return topologyInfo;
}
@Override
public CompletionStage<Void> migrateToProtostream() {
// Only the CacheStore (persistence) stores data in binary format and needs to be deleted.
// We assume rolling-upgrade between KC 25 and KC 26 is not available, in other words, KC 25 and KC 26 servers are not present in the same cluster.
var stage = CompletionStages.aggregateCompletionStage();
DISTRIBUTED_REPLICATED_CACHE_NAMES.stream()
.map(this::getRemoteCache)
.map(RemoteCache::clearAsync)
.forEach(stage::dependsOn);
return stage.freeze();
}
@Override
public Executor getExecutor(String name) {
return GlobalComponentRegistry.componentOf(embeddedCacheManager, BlockingManager.class).asExecutor(name);
}
@Override
public void close() {
//no-op
}
}

View file

@ -17,6 +17,7 @@
package org.keycloak.marshalling;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
/**
@ -151,6 +152,10 @@ public final class Marshalling {
.addContextInitializer(KeycloakModelSchema.INSTANCE);
}
public static void configure(ConfigurationBuilder builder) {
builder.addContextInitializer(KeycloakModelSchema.INSTANCE);
}
public static String emptyStringToNull(String value) {
return value == null || value.isEmpty() ? null : value;

View file

@ -19,20 +19,21 @@ package org.keycloak.models.sessions.infinispan;
import org.keycloak.common.Profile;
import org.keycloak.common.Profile.Feature;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.keycloak.models.ClientModel;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.sessions.infinispan.entities.AuthenticationSessionEntity;
import org.keycloak.models.light.LightweightUserAdapter;
import org.keycloak.models.sessions.infinispan.entities.AuthenticationSessionEntity;
import org.keycloak.sessions.AuthenticationSessionModel;
import org.keycloak.sessions.RootAuthenticationSessionModel;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static org.keycloak.models.Constants.SESSION_NOTE_LIGHTWEIGHT_USER;
import static org.keycloak.models.light.LightweightUserAdapter.isLightweightUser;
@ -46,7 +47,7 @@ public class AuthenticationSessionAdapter implements AuthenticationSessionModel
private final KeycloakSession session;
private final RootAuthenticationSessionAdapter parent;
private final String tabId;
private AuthenticationSessionEntity entity;
private final AuthenticationSessionEntity entity;
public AuthenticationSessionAdapter(KeycloakSession session, RootAuthenticationSessionAdapter parent, String tabId, AuthenticationSessionEntity entity) {
this.session = session;
@ -105,7 +106,9 @@ public class AuthenticationSessionAdapter implements AuthenticationSessionModel
@Override
public Set<String> getClientScopes() {
if (entity.getClientScopes() == null || entity.getClientScopes().isEmpty()) return Collections.emptySet();
if (entity.getClientScopes() == null || entity.getClientScopes().isEmpty()) {
return Collections.emptySet();
}
return new HashSet<>(entity.getClientScopes());
}
@ -156,10 +159,10 @@ public class AuthenticationSessionAdapter implements AuthenticationSessionModel
@Override
public Map<String, String> getClientNotes() {
if (entity.getClientNotes() == null || entity.getClientNotes().isEmpty()) return Collections.emptyMap();
Map<String, String> copy = new ConcurrentHashMap<>();
copy.putAll(entity.getClientNotes());
return copy;
if (entity.getClientNotes() == null || entity.getClientNotes().isEmpty()) {
return Collections.emptyMap();
}
return new ConcurrentHashMap<>(entity.getClientNotes());
}
@Override
@ -221,11 +224,9 @@ public class AuthenticationSessionAdapter implements AuthenticationSessionModel
@Override
public Map<String, String> getUserSessionNotes() {
if (entity.getUserSessionNotes() == null) {
return Collections.EMPTY_MAP;
return Collections.emptyMap();
}
ConcurrentHashMap<String, String> copy = new ConcurrentHashMap<>();
copy.putAll(entity.getUserSessionNotes());
return copy;
return new ConcurrentHashMap<>(entity.getUserSessionNotes());
}
@Override
@ -237,9 +238,7 @@ public class AuthenticationSessionAdapter implements AuthenticationSessionModel
@Override
public Set<String> getRequiredActions() {
Set<String> copy = new HashSet<>();
copy.addAll(entity.getRequiredActions());
return copy;
return new HashSet<>(entity.getRequiredActions());
}
@Override
@ -335,11 +334,8 @@ public class AuthenticationSessionAdapter implements AuthenticationSessionModel
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || !(o instanceof AuthenticationSessionModel)) return false;
return this == o || o instanceof AuthenticationSessionModel that && that.getTabId().equals(getTabId());
AuthenticationSessionModel that = (AuthenticationSessionModel) o;
return that.getTabId().equals(getTabId());
}
@Override

View file

@ -17,14 +17,8 @@
package org.keycloak.models.sessions.infinispan;
import org.keycloak.cluster.ClusterProvider;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.keycloak.common.util.Base64Url;
import org.keycloak.common.util.SecretGenerator;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.util.Time;
import org.keycloak.models.ClientModel;
import org.keycloak.models.KeycloakSession;
@ -40,6 +34,10 @@ import org.keycloak.sessions.AuthenticationSessionCompoundId;
import org.keycloak.sessions.AuthenticationSessionProvider;
import org.keycloak.sessions.RootAuthenticationSessionModel;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
@ -87,7 +85,7 @@ public class InfinispanAuthenticationSessionProvider implements AuthenticationSe
private RootAuthenticationSessionAdapter wrap(RealmModel realm, RootAuthenticationSessionEntity entity) {
return entity==null ? null : new RootAuthenticationSessionAdapter(session, this, cache, realm, entity, authSessionsLimit);
return entity == null ? null : new RootAuthenticationSessionAdapter(session, new RootAuthenticationSessionUpdater(entity, this, realm), realm, authSessionsLimit);
}
@ -176,8 +174,24 @@ public class InfinispanAuthenticationSessionProvider implements AuthenticationSe
return cache;
}
private record RootAuthenticationSessionUpdater(RootAuthenticationSessionEntity entity,
InfinispanAuthenticationSessionProvider provider,
RealmModel realm) implements SessionEntityUpdater<RootAuthenticationSessionEntity> {
protected String generateTabId() {
return Base64Url.encode(SecretGenerator.getInstance().randomBytes(8));
@Override
public RootAuthenticationSessionEntity getEntity() {
return entity;
}
@Override
public void onEntityUpdated() {
int expirationSeconds = entity.getTimestamp() - Time.currentTime() + SessionExpiration.getAuthSessionLifespan(realm);
provider.tx.replace(provider.cache, entity.getId(), entity, expirationSeconds, TimeUnit.SECONDS);
}
@Override
public void onEntityRemoved() {
provider.tx.remove(provider.cache, entity.getId());
}
}
}

View file

@ -18,9 +18,11 @@
package org.keycloak.models.sessions.infinispan;
import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.Profile;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
@ -36,18 +38,17 @@ import org.keycloak.provider.ProviderConfigProperty;
import org.keycloak.provider.ProviderConfigurationBuilder;
import org.keycloak.provider.ProviderEvent;
import org.keycloak.provider.ProviderEventListener;
import org.keycloak.sessions.AuthenticationSessionProvider;
import org.keycloak.sessions.AuthenticationSessionProviderFactory;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.jboss.logging.Logger;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class InfinispanAuthenticationSessionProviderFactory implements AuthenticationSessionProviderFactory {
public class InfinispanAuthenticationSessionProviderFactory implements AuthenticationSessionProviderFactory<InfinispanAuthenticationSessionProvider> {
private static final Logger log = Logger.getLogger(InfinispanAuthenticationSessionProviderFactory.class);
public static final int PROVIDER_PRIORITY = 1;
@ -70,10 +71,13 @@ public class InfinispanAuthenticationSessionProviderFactory implements Authentic
@Override
public void init(Config.Scope config) {
// get auth sessions limit from config or use default if not provided
int configInt = config.getInt(AUTH_SESSIONS_LIMIT, DEFAULT_AUTH_SESSIONS_LIMIT);
authSessionsLimit = getAuthSessionsLimit(config);
}
public static int getAuthSessionsLimit(Config.Scope config) {
var limit = config.getInt(AUTH_SESSIONS_LIMIT, DEFAULT_AUTH_SESSIONS_LIMIT);
// use default if provided value is not a positive number
authSessionsLimit = (configInt <= 0) ? DEFAULT_AUTH_SESSIONS_LIMIT : configInt;
return limit <= 0 ? DEFAULT_AUTH_SESSIONS_LIMIT : limit;
}
@ -125,17 +129,16 @@ public class InfinispanAuthenticationSessionProviderFactory implements Authentic
@Override
public AuthenticationSessionProvider create(KeycloakSession session) {
public InfinispanAuthenticationSessionProvider create(KeycloakSession session) {
lazyInit(session);
return new InfinispanAuthenticationSessionProvider(session, keyGenerator, authSessionsCache, authSessionsLimit);
}
private void updateAuthNotes(ClusterEvent clEvent) {
if (! (clEvent instanceof AuthenticationSessionAuthNoteUpdateEvent)) {
if (! (clEvent instanceof AuthenticationSessionAuthNoteUpdateEvent event)) {
return;
}
AuthenticationSessionAuthNoteUpdateEvent event = (AuthenticationSessionAuthNoteUpdateEvent) clEvent;
RootAuthenticationSessionEntity authSession = this.authSessionsCache.get(event.getAuthSessionId());
updateAuthSession(authSession, event.getTabId(), event.getAuthNotesFragment());
}
@ -195,4 +198,9 @@ public class InfinispanAuthenticationSessionProviderFactory implements Authentic
public int order() {
return PROVIDER_PRIORITY;
}
@Override
public boolean isSupported(Config.Scope config) {
return !Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) || !Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE);
}
}

View file

@ -22,6 +22,7 @@ import org.infinispan.persistence.remote.RemoteStore;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.Profile;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.connections.infinispan.InfinispanUtil;
@ -93,7 +94,10 @@ public class InfinispanUserLoginFailureProviderFactory implements UserLoginFailu
KeycloakModelUtils.runJobInTransaction(factory, (KeycloakSession session) -> {
checkRemoteCaches(session);
registerClusterListeners(session);
loadLoginFailuresFromRemoteCaches(session);
// TODO [pruivo] to remove: workaround to run the testsuite.
if (!Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) || !Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE)) {
loadLoginFailuresFromRemoteCaches(session);
}
});
} else if (event instanceof UserModel.UserRemovedEvent) {
UserModel.UserRemovedEvent userRemovedEvent = (UserModel.UserRemovedEvent) event;

View file

@ -182,7 +182,10 @@ public class InfinispanUserSessionProviderFactory implements UserSessionProvider
initializeLastSessionRefreshStore(factory);
}
registerClusterListeners(session);
loadSessionsFromRemoteCaches(session);
// TODO [pruivo] to remove: workaround to run the testsuite.
if (!Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) || !Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE)) {
loadSessionsFromRemoteCaches(session);
}
}, preloadTransactionTimeout);

View file

@ -17,24 +17,23 @@
package org.keycloak.models.sessions.infinispan;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Base64Url;
import org.keycloak.common.util.SecretGenerator;
import org.keycloak.common.util.Time;
import org.keycloak.models.ClientModel;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.sessions.infinispan.entities.AuthenticationSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.RootAuthenticationSessionEntity;
import org.keycloak.models.utils.SessionExpiration;
import org.keycloak.sessions.AuthenticationSessionModel;
import org.keycloak.sessions.RootAuthenticationSessionModel;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
@ -42,35 +41,29 @@ public class RootAuthenticationSessionAdapter implements RootAuthenticationSessi
private static final Logger log = Logger.getLogger(RootAuthenticationSessionAdapter.class);
private KeycloakSession session;
private InfinispanAuthenticationSessionProvider provider;
private Cache<String, RootAuthenticationSessionEntity> cache;
private RealmModel realm;
private RootAuthenticationSessionEntity entity;
private final KeycloakSession session;
private final RealmModel realm;
private final int authSessionsLimit;
private static Comparator<Map.Entry<String, AuthenticationSessionEntity>> TIMESTAMP_COMPARATOR =
private final SessionEntityUpdater<RootAuthenticationSessionEntity> updater;
private final static Comparator<Map.Entry<String, AuthenticationSessionEntity>> TIMESTAMP_COMPARATOR =
Comparator.comparingInt(e -> e.getValue().getTimestamp());
public RootAuthenticationSessionAdapter(KeycloakSession session, InfinispanAuthenticationSessionProvider provider,
Cache<String, RootAuthenticationSessionEntity> cache, RealmModel realm,
RootAuthenticationSessionEntity entity, int authSessionsLimt) {
public RootAuthenticationSessionAdapter(KeycloakSession session, SessionEntityUpdater<RootAuthenticationSessionEntity> updater, RealmModel realm,
int authSessionsLimit) {
this.session = session;
this.provider = provider;
this.cache = cache;
this.updater = updater;
this.realm = realm;
this.entity = entity;
this.authSessionsLimit = authSessionsLimt;
this.authSessionsLimit = authSessionsLimit;
}
void update() {
int expirationSeconds = getTimestamp() - Time.currentTime() + SessionExpiration.getAuthSessionLifespan(realm);
provider.tx.replace(cache, entity.getId(), entity, expirationSeconds, TimeUnit.SECONDS);
updater.onEntityUpdated();
}
@Override
public String getId() {
return entity.getId();
return updater.getEntity().getId();
}
@Override
@ -80,12 +73,12 @@ public class RootAuthenticationSessionAdapter implements RootAuthenticationSessi
@Override
public int getTimestamp() {
return entity.getTimestamp();
return updater.getEntity().getTimestamp();
}
@Override
public void setTimestamp(int timestamp) {
entity.setTimestamp(timestamp);
updater.getEntity().setTimestamp(timestamp);
update();
}
@ -93,7 +86,7 @@ public class RootAuthenticationSessionAdapter implements RootAuthenticationSessi
public Map<String, AuthenticationSessionModel> getAuthenticationSessions() {
Map<String, AuthenticationSessionModel> result = new HashMap<>();
for (Map.Entry<String, AuthenticationSessionEntity> entry : entity.getAuthenticationSessions().entrySet()) {
for (Map.Entry<String, AuthenticationSessionEntity> entry : updater.getEntity().getAuthenticationSessions().entrySet()) {
String tabId = entry.getKey();
result.put(tabId , new AuthenticationSessionAdapter(session, this, tabId, entry.getValue()));
}
@ -120,7 +113,7 @@ public class RootAuthenticationSessionAdapter implements RootAuthenticationSessi
public AuthenticationSessionModel createAuthenticationSession(ClientModel client) {
Objects.requireNonNull(client, "client");
Map<String, AuthenticationSessionEntity> authenticationSessions = entity.getAuthenticationSessions();
Map<String, AuthenticationSessionEntity> authenticationSessions = updater.getEntity().getAuthenticationSessions();
if (authenticationSessions.size() >= authSessionsLimit) {
String tabId = authenticationSessions.entrySet().stream().min(TIMESTAMP_COMPARATOR).map(Map.Entry::getKey).orElse(null);
@ -138,11 +131,11 @@ public class RootAuthenticationSessionAdapter implements RootAuthenticationSessi
int timestamp = Time.currentTime();
authSessionEntity.setTimestamp(timestamp);
String tabId = provider.generateTabId();
String tabId = Base64Url.encode(SecretGenerator.getInstance().randomBytes(8));
authenticationSessions.put(tabId, authSessionEntity);
// Update our timestamp when adding new authenticationSession
entity.setTimestamp(timestamp);
updater.getEntity().setTimestamp(timestamp);
update();
@ -153,12 +146,11 @@ public class RootAuthenticationSessionAdapter implements RootAuthenticationSessi
@Override
public void removeAuthenticationSessionByTabId(String tabId) {
if (entity.getAuthenticationSessions().remove(tabId) != null) {
if (entity.getAuthenticationSessions().isEmpty()) {
provider.tx.remove(cache, entity.getId());
if (updater.getEntity().getAuthenticationSessions().remove(tabId) != null) {
if (updater.getEntity().getAuthenticationSessions().isEmpty()) {
updater.onEntityRemoved();
} else {
entity.setTimestamp(Time.currentTime());
updater.getEntity().setTimestamp(Time.currentTime());
update();
}
}
@ -166,8 +158,8 @@ public class RootAuthenticationSessionAdapter implements RootAuthenticationSessi
@Override
public void restartSession(RealmModel realm) {
entity.getAuthenticationSessions().clear();
entity.setTimestamp(Time.currentTime());
updater.getEntity().getAuthenticationSessions().clear();
updater.getEntity().setTimestamp(Time.currentTime());
update();
}
}

View file

@ -0,0 +1,29 @@
package org.keycloak.models.sessions.infinispan;
/**
* An updated interface for Infinispan cache.
* <p>
* When the entity is changed, the new entity must be written (or removed) into the Infinispan cache.
* The methods {@link #onEntityUpdated()} and {@link #onEntityRemoved()} signals the entity has changed.
*
* @param <T> The entity type.
*/
public interface SessionEntityUpdater<T> {
/**
* @return The entity tracked by this {@link SessionEntityUpdater}.
* It does not fetch the value from the Infinispan cache and uses a local copy.
*/
T getEntity();
/**
* Signals that the entity was updated, and the Infinispan cache needs to be updated.
*/
void onEntityUpdated();
/**
* Signals that the entity was removed, and the Infinispan cache needs to be updated.
*/
void onEntityRemoved();
}

View file

@ -0,0 +1,135 @@
package org.keycloak.models.sessions.infinispan.remote;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.common.util.Time;
import org.keycloak.models.ClientModel;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.cache.infinispan.events.AuthenticationSessionAuthNoteUpdateEvent;
import org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionProviderFactory;
import org.keycloak.models.sessions.infinispan.RootAuthenticationSessionAdapter;
import org.keycloak.models.sessions.infinispan.SessionEntityUpdater;
import org.keycloak.models.sessions.infinispan.entities.RootAuthenticationSessionEntity;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.models.utils.SessionExpiration;
import org.keycloak.sessions.AuthenticationSessionCompoundId;
import org.keycloak.sessions.AuthenticationSessionProvider;
import org.keycloak.sessions.RootAuthenticationSessionModel;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
public class RemoteInfinispanAuthenticationSessionProvider implements AuthenticationSessionProvider {
private final KeycloakSession session;
private final RemoteInfinispanKeycloakTransaction<String, RootAuthenticationSessionEntity> transaction;
private final int authSessionsLimit;
public RemoteInfinispanAuthenticationSessionProvider(KeycloakSession session, RemoteInfinispanAuthenticationSessionProviderFactory factory) {
this.session = Objects.requireNonNull(session);
authSessionsLimit = Objects.requireNonNull(factory).getAuthSessionsLimit();
transaction = new RemoteInfinispanKeycloakTransaction<>(factory.getCache());
session.getTransactionManager().enlistAfterCompletion(transaction);
}
@Override
public void close() {
}
@Override
public RootAuthenticationSessionModel createRootAuthenticationSession(RealmModel realm) {
return createRootAuthenticationSession(realm, KeycloakModelUtils.generateId());
}
@Override
public RootAuthenticationSessionModel createRootAuthenticationSession(RealmModel realm, String id) {
RootAuthenticationSessionEntity entity = new RootAuthenticationSessionEntity(id);
entity.setRealmId(realm.getId());
entity.setTimestamp(Time.currentTime());
int expirationSeconds = SessionExpiration.getAuthSessionLifespan(realm);
transaction.put(id, entity, expirationSeconds, TimeUnit.SECONDS);
return wrap(realm, entity);
}
@Override
public RootAuthenticationSessionModel getRootAuthenticationSession(RealmModel realm, String authenticationSessionId) {
return wrap(realm, transaction.get(authenticationSessionId));
}
@Override
public void removeRootAuthenticationSession(RealmModel realm, RootAuthenticationSessionModel authenticationSession) {
transaction.remove(authenticationSession.getId());
}
@Override
public void removeAllExpired() {
// Rely on expiration of cache entries provided by infinispan. Nothing needed here.
}
@Override
public void removeExpired(RealmModel realm) {
// Rely on expiration of cache entries provided by infinispan. Nothing needed here.
}
@Override
public void onRealmRemoved(RealmModel realm) {
// TODO [pruivo] [optimization] with protostream, use delete by query: DELETE FROM ...
var cache = transaction.getCache();
try (var iterator = cache.retrieveEntries(null, 256)) {
while (iterator.hasNext()) {
var entry = iterator.next();
if (realm.getId().equals(((RootAuthenticationSessionEntity) entry.getValue()).getRealmId())) {
cache.removeAsync(entry.getKey());
}
}
}
}
@Override
public void onClientRemoved(RealmModel realm, ClientModel client) {
// No update anything on clientRemove for now. AuthenticationSessions of removed client will be handled at runtime if needed.
}
@Override
public void updateNonlocalSessionAuthNotes(AuthenticationSessionCompoundId compoundId, Map<String, String> authNotesFragment) {
if (compoundId == null) {
return;
}
session.getProvider(ClusterProvider.class).notify(
InfinispanAuthenticationSessionProviderFactory.AUTHENTICATION_SESSION_EVENTS,
AuthenticationSessionAuthNoteUpdateEvent.create(compoundId.getRootSessionId(), compoundId.getTabId(), authNotesFragment),
true,
ClusterProvider.DCNotify.ALL_BUT_LOCAL_DC
);
}
private RootAuthenticationSessionAdapter wrap(RealmModel realm, RootAuthenticationSessionEntity entity) {
return entity == null ? null : new RootAuthenticationSessionAdapter(session, new RootAuthenticationSessionUpdater(realm, entity, transaction), realm, authSessionsLimit);
}
private record RootAuthenticationSessionUpdater(RealmModel realm, RootAuthenticationSessionEntity entity,
RemoteInfinispanKeycloakTransaction<String, RootAuthenticationSessionEntity> transaction
) implements SessionEntityUpdater<RootAuthenticationSessionEntity> {
@Override
public RootAuthenticationSessionEntity getEntity() {
return entity;
}
@Override
public void onEntityUpdated() {
int expirationSeconds = entity.getTimestamp() - Time.currentTime() + SessionExpiration.getAuthSessionLifespan(realm);
transaction.replace(entity.getId(), entity, expirationSeconds, TimeUnit.SECONDS);
}
@Override
public void onEntityRemoved() {
transaction.remove(entity.getId());
}
}
}

View file

@ -0,0 +1,86 @@
package org.keycloak.models.sessions.infinispan.remote;
import java.lang.invoke.MethodHandles;
import java.util.List;
import org.infinispan.client.hotrod.RemoteCache;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.common.Profile;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionProviderFactory;
import org.keycloak.models.sessions.infinispan.entities.RootAuthenticationSessionEntity;
import org.keycloak.provider.ProviderConfigProperty;
import org.keycloak.provider.ProviderConfigurationBuilder;
import org.keycloak.sessions.AuthenticationSessionProviderFactory;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.getRemoteCache;
import static org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionProviderFactory.DEFAULT_AUTH_SESSIONS_LIMIT;
public class RemoteInfinispanAuthenticationSessionProviderFactory implements AuthenticationSessionProviderFactory<RemoteInfinispanAuthenticationSessionProvider> {
private final static Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass());
public static final String PROVIDER_ID = "remote-infinispan";
private int authSessionsLimit;
private RemoteCache<String, RootAuthenticationSessionEntity> cache;
@Override
public boolean isSupported(Config.Scope config) {
return Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) && Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE);
}
@Override
public RemoteInfinispanAuthenticationSessionProvider create(KeycloakSession session) {
return new RemoteInfinispanAuthenticationSessionProvider(session, this);
}
@Override
public void init(Config.Scope config) {
authSessionsLimit = InfinispanAuthenticationSessionProviderFactory.getAuthSessionsLimit(config);
}
@Override
public void postInit(KeycloakSessionFactory factory) {
cache = getRemoteCache(factory, AUTHENTICATION_SESSIONS_CACHE_NAME);
logger.debugf("Provided initialized. session limit=%s", authSessionsLimit);
}
@Override
public void close() {
cache = null;
}
@Override
public List<ProviderConfigProperty> getConfigMetadata() {
return ProviderConfigurationBuilder.create()
.property()
.name("authSessionsLimit")
.type("int")
.helpText("The maximum number of concurrent authentication sessions per RootAuthenticationSession.")
.defaultValue(DEFAULT_AUTH_SESSIONS_LIMIT)
.add()
.build();
}
@Override
public String getId() {
return PROVIDER_ID;
}
@Override
public int order() {
// use the same priority as the embedded based one
return InfinispanAuthenticationSessionProviderFactory.PROVIDER_PRIORITY;
}
public int getAuthSessionsLimit() {
return authSessionsLimit;
}
public RemoteCache<String, RootAuthenticationSessionEntity> getCache() {
return cache;
}
}

View file

@ -0,0 +1,227 @@
package org.keycloak.models.sessions.infinispan.remote;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.jboss.logging.Logger;
import org.keycloak.models.KeycloakTransaction;
import java.lang.invoke.MethodHandles;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class RemoteInfinispanKeycloakTransaction<K, V> implements KeycloakTransaction {
private final static Logger logger = Logger.getLogger(MethodHandles.lookup().lookupClass());
private boolean active;
private boolean rollback;
private final Map<K, Operation<K, V>> tasks = new LinkedHashMap<>();
private final RemoteCache<K, V> cache;
public RemoteInfinispanKeycloakTransaction(RemoteCache<K, V> cache) {
this.cache = Objects.requireNonNull(cache);
}
@Override
public void begin() {
active = true;
tasks.clear();
}
@Override
public void commit() {
active = false;
if (rollback) {
throw new RuntimeException("Rollback only!");
}
AggregateCompletionStage<Void> stage = CompletionStages.aggregateCompletionStage();
tasks.values().stream()
.map(this::commitOperation)
.forEach(stage::dependsOn);
try {
CompletionStages.await(stage.freeze());
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
@Override
public void rollback() {
active = false;
tasks.clear();
}
@Override
public void setRollbackOnly() {
rollback = true;
}
@Override
public boolean getRollbackOnly() {
return rollback;
}
@Override
public boolean isActive() {
return active;
}
public void put(K key, V value, int lifespan, TimeUnit timeUnit) {
logger.tracef("Adding %s.put(%S)", cache.getName(), key);
if (tasks.containsKey(key)) {
throw new IllegalStateException("Can't add session: task in progress for session");
}
tasks.put(key, new PutOperation<>(key, value, lifespan, timeUnit));
}
public void replace(K key, V value, int lifespan, TimeUnit timeUnit) {
logger.tracef("Adding %s.replace(%S)", cache.getName(), key);
Operation<K, V> existing = tasks.get(key);
if (existing != null) {
if (existing.hasValue()) {
tasks.put(key, existing.update(value, lifespan, timeUnit));
}
return;
}
tasks.put(key, new ReplaceOperation<>(key, value, lifespan, timeUnit));
}
public void remove(K key) {
logger.tracef("Adding %s.remove(%S)", cache.getName(), key);
Operation<K, V> existing = tasks.get(key);
if (existing != null && existing.canRemove()) {
tasks.remove(key);
return;
}
tasks.put(key, new RemoveOperation<>(key));
}
public V get(K key) {
var existing = tasks.get(key);
if (existing != null && existing.hasValue()) {
return existing.getValue();
}
// Should we have per-transaction cache for lookups?
return cache.get(key);
}
public RemoteCache<K, V> getCache() {
return cache;
}
private CompletionStage<?> commitOperation(Operation<K, V> operation) {
try {
return operation.execute(cache);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
private interface Operation<K, V> {
CompletionStage<?> execute(RemoteCache<K, V> cache);
/**
* Updates the operation with a new value and lifespan only if {@link #hasValue()} returns {@code true}.
*/
default Operation<K, V> update(V newValue, int newLifespan, TimeUnit newTimeUnit) {
return null;
}
/**
* @return {@code true} if the operation can be removed from the tasks map. It will skip the {@link RemoteCache} removal.
*/
default boolean canRemove() {
return false;
}
/**
* @return {@code true} if the operation has a value associated
*/
default boolean hasValue() {
return false;
}
default V getValue() {
return null;
}
}
private record PutOperation<K, V>(K key, V value, int lifespan, TimeUnit timeUnit) implements Operation<K, V> {
@Override
public CompletionStage<?> execute(RemoteCache<K, V> cache) {
return cache.putAsync(key, value, lifespan, timeUnit);
}
@Override
public Operation<K, V> update(V newValue, int newLifespan, TimeUnit newTimeUnit) {
return new PutOperation<>(key, newValue, newLifespan, newTimeUnit);
}
@Override
public boolean canRemove() {
// since it is new entry in the cache, it can be removed form the tasks map.
return true;
}
@Override
public boolean hasValue() {
return true;
}
@Override
public V getValue() {
return value;
}
}
private record ReplaceOperation<K, V>(K key, V value, int lifespan, TimeUnit timeUnit) implements Operation<K, V> {
@Override
public CompletionStage<?> execute(RemoteCache<K, V> cache) {
return cache.replaceAsync(key, value, lifespan, timeUnit);
}
@Override
public Operation<K, V> update(V newValue, int newLifespan, TimeUnit newTimeUnit) {
return new ReplaceOperation<>(key, newValue, newLifespan, newTimeUnit);
}
@Override
public boolean hasValue() {
return true;
}
@Override
public V getValue() {
return value;
}
}
private record RemoveOperation<K, V>(K key) implements Operation<K, V> {
@Override
public CompletionStage<?> execute(RemoteCache<K, V> cache) {
return cache.removeAsync(key);
}
}
}

View file

@ -15,4 +15,5 @@
# limitations under the License.
#
org.keycloak.cluster.infinispan.InfinispanClusterProviderFactory
org.keycloak.cluster.infinispan.InfinispanClusterProviderFactory
org.keycloak.cluster.infinispan.remote.RemoteInfinispanClusterProviderFactory

View file

@ -15,4 +15,5 @@
# limitations under the License.
#
org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionProviderFactory
org.keycloak.models.sessions.infinispan.InfinispanAuthenticationSessionProviderFactory
org.keycloak.models.sessions.infinispan.remote.RemoteInfinispanAuthenticationSessionProviderFactory

View file

@ -17,10 +17,11 @@
package org.keycloak.cluster;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.ProviderFactory;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public interface ClusterProviderFactory extends ProviderFactory<ClusterProvider> {
public interface ClusterProviderFactory extends ProviderFactory<ClusterProvider>, EnvironmentDependentProviderFactory {
}

View file

@ -17,21 +17,21 @@
package org.keycloak.quarkus.deployment;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.quarkus.deployment.logging.LoggingSetupBuildItem;
import jakarta.enterprise.context.ApplicationScoped;
import org.keycloak.quarkus.runtime.KeycloakRecorder;
import org.keycloak.quarkus.runtime.storage.legacy.infinispan.CacheManagerFactory;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.Consume;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.quarkus.deployment.logging.LoggingSetupBuildItem;
import jakarta.enterprise.context.ApplicationScoped;
import org.keycloak.quarkus.runtime.KeycloakRecorder;
import org.keycloak.quarkus.runtime.storage.legacy.infinispan.CacheManagerFactory;
public class CacheBuildSteps {
@Consume(ProfileBuildItem.class)
@Consume(ConfigBuildItem.class)
// Consume LoggingSetupBuildItem.class and record RUNTIME_INIT are necessary to ensure that logging is set up before the caches are initialized.
// This is to prevent the class TP in JGroups to pick up the trace logging at start up. While the logs will not appear on the console,

View file

@ -17,20 +17,6 @@
package org.keycloak.quarkus.runtime;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.annotation.Annotation;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import io.agroal.api.AgroalDataSource;
import io.quarkus.agroal.DataSource;
import io.quarkus.arc.Arc;
@ -64,6 +50,20 @@ import org.keycloak.theme.ClasspathThemeProviderFactory;
import org.keycloak.truststore.TruststoreBuilder;
import org.keycloak.userprofile.DeclarativeUserProfileProviderFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.annotation.Annotation;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.keycloak.quarkus.runtime.configuration.Configuration.getKcConfigValue;
@Recorder
@ -104,8 +104,9 @@ public class KeycloakRecorder {
public void configureLiquibase(Map<String, List<String>> services) {
ServiceLocator locator = Scope.getCurrentScope().getServiceLocator();
if (locator instanceof FastServiceLocator)
if (locator instanceof FastServiceLocator) {
((FastServiceLocator) locator).initServices(services);
}
}
public void configSessionFactory(

View file

@ -17,10 +17,20 @@
package org.keycloak.quarkus.runtime.storage.legacy.infinispan;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.micrometer.core.instrument.Metrics;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.DefaultTemplate;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;
import org.infinispan.commons.api.Lifecycle;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.HashConfiguration;
import org.infinispan.configuration.cache.PersistenceConfigurationBuilder;
@ -40,18 +50,10 @@ import org.jgroups.util.TLSClientAuth;
import org.keycloak.common.Profile;
import org.keycloak.config.CachingOptions;
import org.keycloak.config.MetricsOptions;
import org.keycloak.connections.infinispan.InfinispanUtil;
import org.keycloak.marshalling.Marshalling;
import org.keycloak.quarkus.runtime.configuration.Configuration;
import javax.net.ssl.SSLContext;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.keycloak.config.CachingOptions.CACHE_EMBEDDED_MTLS_KEYSTORE_FILE_PROPERTY;
import static org.keycloak.config.CachingOptions.CACHE_EMBEDDED_MTLS_KEYSTORE_PASSWORD_PROPERTY;
@ -61,11 +63,13 @@ import static org.keycloak.config.CachingOptions.CACHE_REMOTE_HOST_PROPERTY;
import static org.keycloak.config.CachingOptions.CACHE_REMOTE_PASSWORD_PROPERTY;
import static org.keycloak.config.CachingOptions.CACHE_REMOTE_PORT_PROPERTY;
import static org.keycloak.config.CachingOptions.CACHE_REMOTE_USERNAME_PROPERTY;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.DISTRIBUTED_REPLICATED_CACHE_NAMES;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.OFFLINE_USER_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.USER_SESSION_CACHE_NAME;
import static org.keycloak.connections.infinispan.InfinispanConnectionProvider.WORK_CACHE_NAME;
import static org.wildfly.security.sasl.util.SaslMechanismInformation.Names.SCRAM_SHA_512;
public class CacheManagerFactory {
@ -73,18 +77,39 @@ public class CacheManagerFactory {
private static final Logger logger = Logger.getLogger(CacheManagerFactory.class);
private final CompletableFuture<DefaultCacheManager> cacheManagerFuture;
private final CompletableFuture<RemoteCacheManager> remoteCacheManagerFuture;
public CacheManagerFactory(String config) {
this.cacheManagerFuture = startEmbeddedCacheManager(config);
if (isCrossSiteEnabled() && isRemoteCacheEnabled()) {
logger.debug("Remote Cache feature is enabled");
this.remoteCacheManagerFuture = CompletableFuture.supplyAsync(this::startRemoteCacheManager);
} else {
logger.debug("Remote Cache feature is disabled");
this.remoteCacheManagerFuture = CompletableFutures.completedNull();
}
}
public DefaultCacheManager getOrCreateEmbeddedCacheManager() {
return join(cacheManagerFuture);
}
public RemoteCacheManager getOrCreateRemoteCacheManager() {
return join(remoteCacheManagerFuture);
}
public void shutdown() {
logger.debug("Shutdown embedded cache manager");
logger.debug("Shutdown embedded and remote cache managers");
cacheManagerFuture.thenAccept(CacheManagerFactory::close);
remoteCacheManagerFuture.thenAccept(CacheManagerFactory::close);
}
private static boolean isCrossSiteEnabled() {
return Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE);
}
private static boolean isRemoteCacheEnabled() {
return Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE);
}
private static <T> T join(Future<T> future) {
@ -104,6 +129,54 @@ public class CacheManagerFactory {
}
}
private RemoteCacheManager startRemoteCacheManager() {
String cacheRemoteHost = requiredStringProperty(CACHE_REMOTE_HOST_PROPERTY);
Integer cacheRemotePort = Configuration.getOptionalKcValue(CACHE_REMOTE_PORT_PROPERTY)
.map(Integer::parseInt)
.orElse(ConfigurationProperties.DEFAULT_HOTROD_PORT);
String cacheRemoteUsername = requiredStringProperty(CACHE_REMOTE_USERNAME_PROPERTY);
String cacheRemotePassword = requiredStringProperty(CACHE_REMOTE_PASSWORD_PROPERTY);
org.infinispan.client.hotrod.configuration.ConfigurationBuilder builder = new org.infinispan.client.hotrod.configuration.ConfigurationBuilder();
builder.addServer().host(cacheRemoteHost).port(cacheRemotePort);
builder.connectionPool().maxActive(16).exhaustedAction(org.infinispan.client.hotrod.configuration.ExhaustedAction.CREATE_NEW);
if (isRemoteTLSEnabled()) {
builder.security().ssl()
.enable()
.sslContext(createSSLContext())
.sniHostName(cacheRemoteHost);
}
if (isRemoteAuthenticationEnabled()) {
builder.security().authentication()
.enable()
.username(cacheRemoteUsername)
.password(cacheRemotePassword)
.realm("default")
.saslMechanism(SCRAM_SHA_512);
}
Marshalling.configure(builder);
if (createRemoteCaches()) {
// fall back for distributed caches if not defined
logger.warn("Creating remote cache in external Infinispan server. It should not be used in production!");
for (String name : DISTRIBUTED_REPLICATED_CACHE_NAMES) {
builder.remoteCache(name).templateName(DefaultTemplate.DIST_SYNC);
}
}
RemoteCacheManager remoteCacheManager = new RemoteCacheManager(builder.build());
// establish connection to all caches
if (isStartEagerly()) {
DISTRIBUTED_REPLICATED_CACHE_NAMES.forEach(remoteCacheManager::getCache);
}
return remoteCacheManager;
}
private CompletableFuture<DefaultCacheManager> startEmbeddedCacheManager(String config) {
ConfigurationBuilderHolder builder = new ParserRegistry().parse(config);
@ -149,12 +222,22 @@ public class CacheManagerFactory {
}
Marshalling.configure(builder.getGlobalConfigurationBuilder());
if (isCrossSiteEnabled() && isRemoteCacheEnabled()) {
var builders = builder.getNamedConfigurationBuilders();
// remove all distributed caches
logger.debug("Removing all distributed caches.");
// TODO [pruivo] remove all distributed caches after all of them are converted
//DISTRIBUTED_REPLICATED_CACHE_NAMES.forEach(builders::remove);
builders.remove(WORK_CACHE_NAME);
builders.remove(AUTHENTICATION_SESSIONS_CACHE_NAME);
}
var start = isStartEagerly();
return CompletableFuture.supplyAsync(() -> new DefaultCacheManager(builder, start));
}
private static boolean isRemoteTLSEnabled() {
return Configuration.isTrue(CachingOptions.CACHE_REMOTE_TLS_ENABLED);
return Boolean.parseBoolean(System.getProperty("kc.cache-remote-tls-enabled", Boolean.TRUE.toString()));
}
private static boolean isRemoteAuthenticationEnabled() {
@ -162,6 +245,10 @@ public class CacheManagerFactory {
Configuration.getOptionalKcValue(CACHE_REMOTE_PASSWORD_PROPERTY).isPresent();
}
private static boolean createRemoteCaches() {
return Boolean.parseBoolean(System.getProperty("kc.cache-remote-create-caches", Boolean.FALSE.toString()));
}
private static SSLContext createSSLContext() {
try {
// uses the default Java Runtime TrustStore, or the one generated by Keycloak (see org.keycloak.truststore.TruststoreBuilder)
@ -205,6 +292,12 @@ public class CacheManagerFactory {
transportConfig.addProperty(JGroupsTransport.SOCKET_FACTORY, tls.createSocketFactory());
Logger.getLogger(CacheManagerFactory.class).info("MTLS enabled for communications for embedded caches");
}
//TODO [pruivo] disable JGroups after all distributed caches are converted
// if (isCrossSiteEnabled() && isRemoteCacheEnabled()) {
// logger.debug("Disabling JGroups between Keycloak nodes");
// builder.getGlobalConfigurationBuilder().nonClusteredDefault();
// }
}
private void validateTlsAvailable(GlobalConfiguration config) {

View file

@ -30,4 +30,9 @@ public final class QuarkusCacheManagerProvider implements ManagedCacheManagerPro
public <C> C getEmbeddedCacheManager(Config.Scope config) {
return (C) Arc.container().instance(CacheManagerFactory.class).get().getOrCreateEmbeddedCacheManager();
}
@Override
public <C> C getRemoteCacheManager(Config.Scope config) {
return (C) Arc.container().instance(CacheManagerFactory.class).get().getOrCreateRemoteCacheManager();
}
}

View file

@ -20,11 +20,16 @@ package org.keycloak.cluster;
import org.keycloak.Config;
/**
* A Service Provider Interface (SPI) that allows to plug-in an embedded cache manager instance.
* A Service Provider Interface (SPI) that allows to plug-in an embedded or remote cache manager instance.
*
* @author <a href="mailto:psilva@redhat.com">Pedro Igor</a>
*/
public interface ManagedCacheManagerProvider {
<C> C getEmbeddedCacheManager(Config.Scope config);
/**
* @return A RemoteCacheManager if the feature {@link org.keycloak.common.Profile.Feature#REMOTE_CACHE} is enabled, {@code null} otherwise.
*/
<C> C getRemoteCacheManager(Config.Scope config);
}

View file

@ -17,10 +17,11 @@
package org.keycloak.sessions;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.ProviderFactory;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public interface AuthenticationSessionProviderFactory<T extends AuthenticationSessionProvider> extends ProviderFactory<T> {
public interface AuthenticationSessionProviderFactory<T extends AuthenticationSessionProvider> extends ProviderFactory<T>, EnvironmentDependentProviderFactory {
}

View file

@ -18,13 +18,6 @@
package org.keycloak.testsuite.util;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.common.Profile;
@ -37,6 +30,13 @@ import org.keycloak.provider.ProviderManagerRegistry;
import org.keycloak.provider.Spi;
import org.keycloak.services.DefaultKeycloakSession;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* Used to dynamically reload EnvironmentDependentProviderFactories after some feature is enabled/disabled
*

View file

@ -240,6 +240,27 @@
</properties>
</profile>
<profile>
<id>jpa+remote-infinispan</id>
<properties>
<keycloak.model.parameters>RemoteInfinispan,Jpa</keycloak.model.parameters>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<keycloak.profile.feature.remote_cache>enabled</keycloak.profile.feature.remote_cache>
<keycloak.profile.feature.multi_site>enabled</keycloak.profile.feature.multi_site>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>jpa-federation+infinispan</id>
<properties>

View file

@ -82,8 +82,10 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@ -102,6 +104,8 @@ import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.keycloak.models.DeploymentStateProviderFactory;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* Base of testcases that operate on session level. The tests derived from this class
* will have access to a shared {@link KeycloakSessionFactory} in the {@link #LOCAL_FACTORY}
@ -629,4 +633,36 @@ public abstract class KeycloakModelTest {
Time.setOffset(seconds);
});
}
public static void eventually(BooleanSupplier condition) {
eventually(null, condition, 5000, 10, MILLISECONDS);
}
public static void eventually(Supplier<String> message, BooleanSupplier condition) {
eventually(message, condition, 5000, 10, MILLISECONDS);
}
public static void eventually(Supplier<String> message, BooleanSupplier condition, long timeout,
long pollInterval, TimeUnit unit) {
if (pollInterval <= 0) {
throw new IllegalArgumentException("Check interval must be positive");
}
if (message == null) {
message = () -> null;
}
try {
long expectedEndTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, unit);
long sleepMillis = MILLISECONDS.convert(pollInterval, unit);
do {
if (condition.getAsBoolean()) return;
Thread.sleep(sleepMillis);
} while (expectedEndTime - System.nanoTime() > 0);
} catch (Exception e) {
throw new RuntimeException("Unexpected!", e);
}
// last check
Assert.assertTrue(message.get(), condition.getAsBoolean());
}
}

View file

@ -19,6 +19,7 @@ package org.keycloak.testsuite.model.infinispan;
import org.infinispan.Cache;
import org.junit.Assume;
import org.junit.Test;
import org.keycloak.common.Profile;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.cache.infinispan.events.AuthenticationSessionAuthNoteUpdateEvent;
import org.keycloak.testsuite.model.KeycloakModelTest;
@ -55,6 +56,7 @@ public class CacheExpirationTest extends KeycloakModelTest {
@Test
public void testCacheExpiration() throws Exception {
assumeFalse("Embedded caches not available for testing.", Profile.isFeatureEnabled(Profile.Feature.MULTI_SITE) && Profile.isFeatureEnabled(Profile.Feature.REMOTE_CACHE));
log.debugf("Number of previous instances of the class on the heap: %d", getNumberOfInstancesOfClass(AuthenticationSessionAuthNoteUpdateEvent.class));

View file

@ -0,0 +1,110 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.testsuite.model.parameters;
import com.google.common.collect.ImmutableSet;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.keycloak.cluster.infinispan.remote.RemoteInfinispanClusterProviderFactory;
import org.keycloak.models.UserSessionSpi;
import org.keycloak.models.sessions.infinispan.InfinispanUserSessionProviderFactory;
import org.keycloak.models.sessions.infinispan.remote.RemoteInfinispanAuthenticationSessionProviderFactory;
import org.keycloak.provider.ProviderFactory;
import org.keycloak.testsuite.model.Config;
import org.keycloak.testsuite.model.HotRodServerRule;
import org.keycloak.testsuite.model.KeycloakModelParameters;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
/**
* Copied from {@link CrossDCInfinispan}.
* <p>
* Adds the new provider factories implementation
*/
public class RemoteInfinispan extends KeycloakModelParameters {
private final HotRodServerRule hotRodServerRule = new HotRodServerRule();
private static final AtomicInteger NODE_COUNTER = new AtomicInteger();
private static final String SITE_1_MCAST_ADDR = "228.5.6.7";
private static final String SITE_2_MCAST_ADDR = "228.6.7.8";
private final Object lock = new Object();
static final Set<Class<? extends ProviderFactory>> ALLOWED_FACTORIES = ImmutableSet.<Class<? extends ProviderFactory>>builder()
.addAll(Infinispan.ALLOWED_FACTORIES)
.add(RemoteInfinispanClusterProviderFactory.class)
.add(RemoteInfinispanAuthenticationSessionProviderFactory.class)
.build();
@Override
public void updateConfig(Config cf) {
synchronized (lock) {
NODE_COUNTER.incrementAndGet();
cf.spi("connectionsInfinispan")
.provider("default")
.config("embedded", "true")
.config("clustered", "true")
.config("remoteStoreEnabled", "true")
.config("useKeycloakTimeService", "true")
.config("remoteStoreSecurityEnabled", "false")
.config("nodeName", "node-" + NODE_COUNTER.get())
.config("siteName", siteName(NODE_COUNTER.get()))
.config("remoteStorePort", siteName(NODE_COUNTER.get()).equals("site-2") ? "11333" : "11222")
.config("jgroupsUdpMcastAddr", mcastAddr(NODE_COUNTER.get()))
.spi(UserSessionSpi.NAME)
.provider(InfinispanUserSessionProviderFactory.PROVIDER_ID)
.config("offlineSessionCacheEntryLifespanOverride", "43200")
.config("offlineClientSessionCacheEntryLifespanOverride", "43200");
}
}
public RemoteInfinispan() {
super(Infinispan.ALLOWED_SPIS, ALLOWED_FACTORIES);
}
@Override
public void beforeSuite(Config cf) {
hotRodServerRule.createEmbeddedHotRodServer(cf.scope("connectionsInfinispan", "default"));
}
private static String siteName(int node) {
return "site-" + (node % 2 == 0 ? 2 : 1);
}
private static String mcastAddr(int node) {
return (node % 2 == 0) ? SITE_2_MCAST_ADDR : SITE_1_MCAST_ADDR;
}
@Override
public <T> Stream<T> getParameters(Class<T> clazz) {
if (HotRodServerRule.class.isAssignableFrom(clazz)) {
return Stream.of((T) hotRodServerRule);
} else {
return Stream.empty();
}
}
@Override
public Statement classRule(Statement base, Description description) {
return hotRodServerRule.apply(base, description);
}
}

View file

@ -17,6 +17,8 @@
package org.keycloak.testsuite.model.session;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.FixMethodOrder;
import org.junit.Test;
@ -416,20 +418,10 @@ public class SessionTimeoutsTest extends KeycloakModelTest {
private void allowXSiteReplication(boolean offline) {
HotRodServerRule hotRodServer = getParameters(HotRodServerRule.class).findFirst().orElse(null);
if (hotRodServer != null) {
String cacheName = offline ? InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME : InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
while (hotRodServer.getHotRodCacheManager().getCache(cacheName).size() != hotRodServer.getHotRodCacheManager2().getCache(cacheName).size()) {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.errorf("Interrupted while waiting. Cache: %s, Cache sizes: %d vs %d",
cacheName,
hotRodServer.getHotRodCacheManager().getCache(cacheName).size(),
hotRodServer.getHotRodCacheManager2().getCache(cacheName).size()
);
throw new RuntimeException(e);
}
}
var cacheName = offline ? InfinispanConnectionProvider.OFFLINE_CLIENT_SESSION_CACHE_NAME : InfinispanConnectionProvider.CLIENT_SESSION_CACHE_NAME;
var cache1 = hotRodServer.getHotRodCacheManager().getCache(cacheName);
var cache2 = hotRodServer.getHotRodCacheManager2().getCache(cacheName);
eventually(null, () -> cache1.size() == cache2.size(), 10000, 10, TimeUnit.MILLISECONDS);
}
}
}

View file

@ -370,7 +370,7 @@ public class UserSessionProviderOfflineModelTest extends KeycloakModelTest {
log.debug("Joining the cluster");
inComittedTransaction(session -> {
InfinispanConnectionProvider provider = session.getProvider(InfinispanConnectionProvider.class);
Cache<String, Object> cache = provider.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);
Cache<String, Object> cache = provider.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
while (! cache.getAdvancedCache().getDistributionManager().isJoinComplete()) {
sleep(1000);
}

View file

@ -201,9 +201,8 @@ public class SingleUseObjectModelTest extends KeycloakModelTest {
// check if single-use object/action token is available on all nodes
inComittedTransaction(session -> {
SingleUseObjectProvider singleUseStore = session.singleUseObjects();
while (singleUseStore.get(key) == null || singleUseStore.get(actionTokenKey.get()) == null) {
sleep(1000);
}
eventually(() -> "key not found: " + key, () -> singleUseStore.get(key) != null);
eventually(() -> "key not found: " + actionTokenKey.get(), () -> singleUseStore.get(actionTokenKey.get()) != null);
replicationDone.countDown();
});
@ -226,9 +225,8 @@ public class SingleUseObjectModelTest extends KeycloakModelTest {
inComittedTransaction(session -> {
SingleUseObjectProvider singleUseStore = session.singleUseObjects();
while (singleUseStore.get(key) != null && singleUseStore.get(actionTokenKey.get()) != null) {
sleep(1000);
}
eventually(() -> "key found: " + key, () -> singleUseStore.get(key) == null);
eventually(() -> "key found: " + actionTokenKey.get(), () -> singleUseStore.get(actionTokenKey.get()) == null);
});
});
}