Batch cluster events

Sending multiple events in a single network request should minimize
latency and traffic.

Closes #30445

Signed-off-by: Pedro Ruivo <pruivo@redhat.com>
This commit is contained in:
Pedro Ruivo 2024-06-14 14:37:55 +01:00 committed by Alexander Schwartz
parent edde31a1ca
commit 5c0dddd837
15 changed files with 117 additions and 73 deletions

View file

@ -8,3 +8,15 @@ WARNING: JBoss Marshalling and Infinispan Protostream are not compatible with ea
Consequently, all caches are cleared when upgrading to this version. Consequently, all caches are cleared when upgrading to this version.
To prevent losing user sessions upgrade to Keycloak 25 first and enable the persistent sessions feature as outlined in the migration guide for {project_name} 25. To prevent losing user sessions upgrade to Keycloak 25 first and enable the persistent sessions feature as outlined in the migration guide for {project_name} 25.
= New method in `ClusterProvider` API
The following method was added to `org.keycloak.cluster.ClusterProvider`:
* `void notify(String taskKey, Collection<? extends ClusterEvent> events, boolean ignoreSender, DCNotify dcNotify)`
When multiple events are sent to the same `taskKey`, this method batches events and just perform a single network call.
This is an optimization to reduce traffic and network related resources.
In {project_name} 26, the new method has a default implementation to keep backward compatibility with custom implementation.
The default implementation performs a single network call per an event, and it will be removed in a future version of {project_name}.

View file

@ -17,6 +17,8 @@
package org.keycloak.cluster.infinispan; package org.keycloak.cluster.infinispan;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -123,10 +125,14 @@ public class InfinispanClusterProvider implements ClusterProvider {
this.notificationsManager.registerListener(taskKey, task); this.notificationsManager.registerListener(taskKey, task);
} }
@Override @Override
public void notify(String taskKey, ClusterEvent event, boolean ignoreSender, DCNotify dcNotify) { public void notify(String taskKey, ClusterEvent event, boolean ignoreSender, DCNotify dcNotify) {
this.notificationsManager.notify(taskKey, event, ignoreSender, dcNotify); notificationsManager.notify(taskKey, Collections.singleton(event), ignoreSender, dcNotify);
}
@Override
public void notify(String taskKey, Collection<? extends ClusterEvent> events, boolean ignoreSender, DCNotify dcNotify) {
notificationsManager.notify(taskKey, events, ignoreSender, dcNotify);
} }
private boolean tryLock(String cacheKey, int taskTimeoutInSeconds) { private boolean tryLock(String cacheKey, int taskTimeoutInSeconds) {

View file

@ -17,7 +17,7 @@
package org.keycloak.cluster.infinispan; package org.keycloak.cluster.infinispan;
import java.io.Serializable; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
@ -77,7 +77,7 @@ public class InfinispanNotificationsManager {
private final Cache<String, Object> workCache; private final Cache<String, Object> workCache;
private final RemoteCache<Object, Serializable> workRemoteCache; private final RemoteCache<String, Object> workRemoteCache;
private final String myAddress; private final String myAddress;
@ -85,7 +85,7 @@ public class InfinispanNotificationsManager {
private final ExecutorService listenersExecutor; private final ExecutorService listenersExecutor;
protected InfinispanNotificationsManager(Cache<String, Object> workCache, RemoteCache<Object, Serializable> workRemoteCache, String myAddress, String mySite, ExecutorService listenersExecutor) { protected InfinispanNotificationsManager(Cache<String, Object> workCache, RemoteCache<String, Object> workRemoteCache, String myAddress, String mySite, ExecutorService listenersExecutor) {
this.workCache = workCache; this.workCache = workCache;
this.workRemoteCache = workRemoteCache; this.workRemoteCache = workRemoteCache;
this.myAddress = myAddress; this.myAddress = myAddress;
@ -96,7 +96,7 @@ public class InfinispanNotificationsManager {
// Create and init manager including all listeners etc // Create and init manager including all listeners etc
public static InfinispanNotificationsManager create(KeycloakSession session, Cache<String, Object> workCache, String myAddress, String mySite, Set<RemoteStore> remoteStores) { public static InfinispanNotificationsManager create(KeycloakSession session, Cache<String, Object> workCache, String myAddress, String mySite, Set<RemoteStore> remoteStores) {
RemoteCache<Object, Serializable> workRemoteCache = null; RemoteCache<String, Object> workRemoteCache = null;
if (!remoteStores.isEmpty()) { if (!remoteStores.isEmpty()) {
RemoteStore remoteStore = remoteStores.iterator().next(); RemoteStore remoteStore = remoteStores.iterator().next();
@ -140,13 +140,13 @@ public class InfinispanNotificationsManager {
} }
void notify(String taskKey, ClusterEvent event, boolean ignoreSender, ClusterProvider.DCNotify dcNotify) { void notify(String taskKey, Collection<? extends ClusterEvent> events, boolean ignoreSender, ClusterProvider.DCNotify dcNotify) {
var wrappedEvent = WrapperClusterEvent.wrap(taskKey, event, myAddress, mySite, dcNotify, ignoreSender); var wrappedEvent = WrapperClusterEvent.wrap(taskKey, events, myAddress, mySite, dcNotify, ignoreSender);
String eventKey = UUID.randomUUID().toString(); String eventKey = UUID.randomUUID().toString();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.tracef("Sending event with key %s: %s", eventKey, event); logger.tracef("Sending event with key %s: %s", eventKey, events);
} }
if (dcNotify == ClusterProvider.DCNotify.LOCAL_DC_ONLY || workRemoteCache == null) { if (dcNotify == ClusterProvider.DCNotify.LOCAL_DC_ONLY || workRemoteCache == null) {
@ -200,9 +200,9 @@ public class InfinispanNotificationsManager {
@ClientListener @ClientListener
public class HotRodListener { public class HotRodListener {
private final RemoteCache<Object, Serializable> remoteCache; private final RemoteCache<String, Object> remoteCache;
public HotRodListener(RemoteCache<Object, Serializable> remoteCache) { public HotRodListener(RemoteCache<String, Object> remoteCache) {
this.remoteCache = remoteCache; this.remoteCache = remoteCache;
} }
@ -229,8 +229,8 @@ public class InfinispanNotificationsManager {
// TODO: Look at CacheEventConverter stuff to possibly include value in the event and avoid additional remoteCache request // TODO: Look at CacheEventConverter stuff to possibly include value in the event and avoid additional remoteCache request
try { try {
listenersExecutor.submit(() -> { listenersExecutor.submit(() -> {
Supplier<Serializable> fetchEvent = () -> remoteCache.get(key); Supplier<Object> fetchEvent = () -> remoteCache.get(key);
Serializable event = DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(fetchEvent); Object event = DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(fetchEvent);
int iteration = 0; int iteration = 0;
// Event might have been generated from a node which is more up-to-date, so the fetch might return null. // Event might have been generated from a node which is more up-to-date, so the fetch might return null.
// Retry until we find a node that is up-to-date and has the entry. // Retry until we find a node that is up-to-date and has the entry.
@ -281,12 +281,10 @@ public class InfinispanNotificationsManager {
logger.tracef("Received event: %s", event); logger.tracef("Received event: %s", event);
} }
ClusterEvent wrappedEvent = event.getDelegateEvent();
List<ClusterListener> myListeners = listeners.get(eventKey); List<ClusterListener> myListeners = listeners.get(eventKey);
if (myListeners != null) { if (myListeners != null) {
for (ClusterListener listener : myListeners) { for (var e : event.getDelegateEvents()) {
listener.eventReceived(wrappedEvent); myListeners.forEach(e);
} }
} }
} }

View file

@ -17,6 +17,8 @@
package org.keycloak.cluster.infinispan; package org.keycloak.cluster.infinispan;
import java.util.Collection;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import org.infinispan.protostream.WrappedMessage; import org.infinispan.protostream.WrappedMessage;
@ -42,22 +44,23 @@ public class WrapperClusterEvent implements ClusterEvent {
final String senderSite; // can be null final String senderSite; // can be null
@ProtoField(4) @ProtoField(4)
final SiteFilter siteFilter; final SiteFilter siteFilter;
final ClusterEvent delegateEvent; private final Collection<? extends ClusterEvent> events;
private WrapperClusterEvent(String eventKey, String senderAddress, String senderSite, SiteFilter siteFilter, ClusterEvent delegateEvent) { private WrapperClusterEvent(String eventKey, String senderAddress, String senderSite, SiteFilter siteFilter, Collection<? extends ClusterEvent> events) {
this.eventKey = Objects.requireNonNull(eventKey); this.eventKey = Objects.requireNonNull(eventKey);
this.senderAddress = senderAddress; this.senderAddress = senderAddress;
this.senderSite = senderSite; this.senderSite = senderSite;
this.siteFilter = Objects.requireNonNull(siteFilter); this.siteFilter = Objects.requireNonNull(siteFilter);
this.delegateEvent = Objects.requireNonNull(delegateEvent); this.events = Objects.requireNonNull(events);
} }
@ProtoFactory @ProtoFactory
static WrapperClusterEvent protoFactory(String eventKey, String senderAddress, String senderSite, SiteFilter siteFilter, WrappedMessage eventPS) { static WrapperClusterEvent protoFactory(String eventKey, String senderAddress, String senderSite, SiteFilter siteFilter, List<WrappedMessage> eventPS) {
return new WrapperClusterEvent(eventKey, Marshalling.emptyStringToNull(senderAddress), Marshalling.emptyStringToNull(senderSite), siteFilter, (ClusterEvent) eventPS.getValue()); var events = eventPS.stream().map(WrappedMessage::getValue).map(ClusterEvent.class::cast).toList();
return new WrapperClusterEvent(eventKey, Marshalling.emptyStringToNull(senderAddress), Marshalling.emptyStringToNull(senderSite), siteFilter, events);
} }
public static WrapperClusterEvent wrap(String eventKey, ClusterEvent event, String senderAddress, String senderSite, ClusterProvider.DCNotify dcNotify, boolean ignoreSender) { public static WrapperClusterEvent wrap(String eventKey, Collection<? extends ClusterEvent> events, String senderAddress, String senderSite, ClusterProvider.DCNotify dcNotify, boolean ignoreSender) {
senderAddress = ignoreSender ? Objects.requireNonNull(senderAddress) : null; senderAddress = ignoreSender ? Objects.requireNonNull(senderAddress) : null;
senderSite = dcNotify == ClusterProvider.DCNotify.ALL_DCS ? null : senderSite; senderSite = dcNotify == ClusterProvider.DCNotify.ALL_DCS ? null : senderSite;
var siteNotification = switch (dcNotify) { var siteNotification = switch (dcNotify) {
@ -65,20 +68,20 @@ public class WrapperClusterEvent implements ClusterEvent {
case LOCAL_DC_ONLY -> SiteFilter.LOCAL; case LOCAL_DC_ONLY -> SiteFilter.LOCAL;
case ALL_BUT_LOCAL_DC -> SiteFilter.REMOTE; case ALL_BUT_LOCAL_DC -> SiteFilter.REMOTE;
}; };
return new WrapperClusterEvent(eventKey, senderAddress, senderSite, siteNotification, event); return new WrapperClusterEvent(eventKey, senderAddress, senderSite, siteNotification, events);
} }
@ProtoField(5) @ProtoField(5)
WrappedMessage getEventPS() { List<WrappedMessage> getEventPS() {
return new WrappedMessage(delegateEvent); return events.stream().map(WrappedMessage::new).toList();
} }
public String getEventKey() { public String getEventKey() {
return eventKey; return eventKey;
} }
public ClusterEvent getDelegateEvent() { Collection<? extends ClusterEvent> getDelegateEvents() {
return delegateEvent; return events;
} }
public boolean rejectEvent(String mySiteAddress, String mySiteName) { public boolean rejectEvent(String mySiteAddress, String mySiteName) {
@ -97,7 +100,7 @@ public class WrapperClusterEvent implements ClusterEvent {
Objects.equals(senderAddress, that.senderAddress) && Objects.equals(senderAddress, that.senderAddress) &&
Objects.equals(senderSite, that.senderSite) && Objects.equals(senderSite, that.senderSite) &&
siteFilter == that.siteFilter && siteFilter == that.siteFilter &&
delegateEvent.equals(that.delegateEvent); events.equals(that.events);
} }
@Override @Override
@ -106,13 +109,13 @@ public class WrapperClusterEvent implements ClusterEvent {
result = 31 * result + Objects.hashCode(senderAddress); result = 31 * result + Objects.hashCode(senderAddress);
result = 31 * result + Objects.hashCode(senderSite); result = 31 * result + Objects.hashCode(senderSite);
result = 31 * result + siteFilter.hashCode(); result = 31 * result + siteFilter.hashCode();
result = 31 * result + delegateEvent.hashCode(); result = 31 * result + events.hashCode();
return result; return result;
} }
@Override @Override
public String toString() { public String toString() {
return String.format("WrapperClusterEvent [ eventKey=%s, sender=%s, senderSite=%s, delegateEvent=%s ]", eventKey, senderAddress, senderSite, delegateEvent); return String.format("WrapperClusterEvent [ eventKey=%s, sender=%s, senderSite=%s, delegateEvents=%s ]", eventKey, senderAddress, senderSite, events);
} }
@Proto @Proto

View file

@ -38,7 +38,7 @@ public class InfinispanCachePublicKeyProvider implements CachePublicKeyProvider
public void clearCache() { public void clearCache() {
keys.clear(); keys.clear();
ClusterProvider cluster = session.getProvider(ClusterProvider.class); ClusterProvider cluster = session.getProvider(ClusterProvider.class);
cluster.notify(InfinispanCachePublicKeyProviderFactory.KEYS_CLEAR_CACHE_EVENTS, new ClearCacheEvent(), true, ClusterProvider.DCNotify.ALL_DCS); cluster.notify(InfinispanCachePublicKeyProviderFactory.KEYS_CLEAR_CACHE_EVENTS, ClearCacheEvent.getInstance(), true, ClusterProvider.DCNotify.ALL_DCS);
} }
@Override @Override

View file

@ -116,10 +116,11 @@ public class InfinispanPublicKeyStorageProvider implements PublicKeyStorageProvi
protected void runInvalidations() { protected void runInvalidations() {
ClusterProvider cluster = session.getProvider(ClusterProvider.class); ClusterProvider cluster = session.getProvider(ClusterProvider.class);
for (String cacheKey : invalidations) { var events = invalidations.stream()
keys.remove(cacheKey); .peek(keys::remove)
cluster.notify(InfinispanCachePublicKeyProviderFactory.PUBLIC_KEY_STORAGE_INVALIDATION_EVENT, PublicKeyStorageInvalidationEvent.create(cacheKey), true, ClusterProvider.DCNotify.ALL_DCS); .map(PublicKeyStorageInvalidationEvent::create)
} .toList();
cluster.notify(InfinispanCachePublicKeyProviderFactory.PUBLIC_KEY_STORAGE_INVALIDATION_EVENT, events, true, ClusterProvider.DCNotify.ALL_DCS);
} }
@Override @Override

View file

@ -27,6 +27,7 @@ import org.keycloak.cluster.infinispan.WrapperClusterEvent;
import org.keycloak.component.ComponentModel; import org.keycloak.component.ComponentModel;
import org.keycloak.keys.infinispan.PublicKeyStorageInvalidationEvent; import org.keycloak.keys.infinispan.PublicKeyStorageInvalidationEvent;
import org.keycloak.models.UserSessionModel; import org.keycloak.models.UserSessionModel;
import org.keycloak.models.cache.infinispan.ClearCacheEvent;
import org.keycloak.models.cache.infinispan.authorization.events.PolicyRemovedEvent; import org.keycloak.models.cache.infinispan.authorization.events.PolicyRemovedEvent;
import org.keycloak.models.cache.infinispan.authorization.events.PolicyUpdatedEvent; import org.keycloak.models.cache.infinispan.authorization.events.PolicyUpdatedEvent;
import org.keycloak.models.cache.infinispan.authorization.events.ResourceRemovedEvent; import org.keycloak.models.cache.infinispan.authorization.events.ResourceRemovedEvent;
@ -114,6 +115,9 @@ import org.keycloak.storage.managers.UserStorageSyncManager;
// keys.infinispan package // keys.infinispan package
PublicKeyStorageInvalidationEvent.class, PublicKeyStorageInvalidationEvent.class,
// models.cache.infinispan
ClearCacheEvent.class,
//models.cache.infinispan.authorization.events package //models.cache.infinispan.authorization.events package
PolicyUpdatedEvent.class, PolicyUpdatedEvent.class,
PolicyRemovedEvent.class, PolicyRemovedEvent.class,

View file

@ -143,8 +143,8 @@ public final class Marshalling {
public static final int SINGLE_USE_OBJECT_VALUE_ENTITY = 65601; public static final int SINGLE_USE_OBJECT_VALUE_ENTITY = 65601;
public static final int USER_SESSION_ENTITY = 65602; public static final int USER_SESSION_ENTITY = 65602;
public static final int CACHE_KEY_INVALIDATION_EVENT = 65603; public static final int CACHE_KEY_INVALIDATION_EVENT = 65603;
public static final int CLEAR_CACHE_EVENT = 65604;
public static void configure(GlobalConfigurationBuilder builder) { public static void configure(GlobalConfigurationBuilder builder) {
builder.serialization() builder.serialization()

View file

@ -207,12 +207,8 @@ public abstract class CacheManager {
public void sendInvalidationEvents(KeycloakSession session, Collection<InvalidationEvent> invalidationEvents, String eventKey) { public void sendInvalidationEvents(KeycloakSession session, Collection<InvalidationEvent> invalidationEvents, String eventKey) {
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class); session.getProvider(ClusterProvider.class)
.notify(eventKey, invalidationEvents, true, ClusterProvider.DCNotify.ALL_DCS);
// Maybe add InvalidationEvent, which will be collection of all invalidationEvents? That will reduce cluster traffic even more.
for (InvalidationEvent event : invalidationEvents) {
clusterProvider.notify(eventKey, event, true, ClusterProvider.DCNotify.ALL_DCS);
}
} }

View file

@ -16,13 +16,26 @@
*/ */
package org.keycloak.models.cache.infinispan; package org.keycloak.models.cache.infinispan;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoTypeId;
import org.keycloak.cluster.ClusterEvent; import org.keycloak.cluster.ClusterEvent;
import org.keycloak.marshalling.Marshalling;
/** /**
* @author <a href="mailto:bill@burkecentral.com">Bill Burke</a> * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
* @version $Revision: 1 $ * @version $Revision: 1 $
*/ */
public class ClearCacheEvent implements ClusterEvent { @ProtoTypeId(Marshalling.CLEAR_CACHE_EVENT)
public final class ClearCacheEvent implements ClusterEvent {
private static final ClearCacheEvent INSTANCE = new ClearCacheEvent();
private ClearCacheEvent() {}
@ProtoFactory
public static ClearCacheEvent getInstance() {
return INSTANCE;
}
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {

View file

@ -185,7 +185,7 @@ public class RealmCacheSession implements CacheRealmProvider {
@Override @Override
public void clear() { public void clear() {
ClusterProvider cluster = session.getProvider(ClusterProvider.class); ClusterProvider cluster = session.getProvider(ClusterProvider.class);
cluster.notify(InfinispanCacheRealmProviderFactory.REALM_CLEAR_CACHE_EVENTS, new ClearCacheEvent(), false, ClusterProvider.DCNotify.ALL_DCS); cluster.notify(InfinispanCacheRealmProviderFactory.REALM_CLEAR_CACHE_EVENTS, ClearCacheEvent.getInstance(), false, ClusterProvider.DCNotify.ALL_DCS);
} }
@Override @Override

View file

@ -112,7 +112,7 @@ public class UserCacheSession implements UserCache, OnCreateComponent, OnUpdateC
public void clear() { public void clear() {
cache.clear(); cache.clear();
ClusterProvider cluster = session.getProvider(ClusterProvider.class); ClusterProvider cluster = session.getProvider(ClusterProvider.class);
cluster.notify(InfinispanUserCacheProviderFactory.USER_CLEAR_CACHE_EVENTS, new ClearCacheEvent(), true, ClusterProvider.DCNotify.ALL_DCS); cluster.notify(InfinispanUserCacheProviderFactory.USER_CLEAR_CACHE_EVENTS, ClearCacheEvent.getInstance(), true, ClusterProvider.DCNotify.ALL_DCS);
} }
public UserProvider getDelegate() { public UserProvider getDelegate() {
@ -214,7 +214,7 @@ public class UserCacheSession implements UserCache, OnCreateComponent, OnUpdateC
if (cached != null && !cached.getRealm().equals(realm.getId())) { if (cached != null && !cached.getRealm().equals(realm.getId())) {
cached = null; cached = null;
} }
UserModel adapter = null; UserModel adapter = null;
if (cached == null) { if (cached == null) {
logger.trace("not cached"); logger.trace("not cached");
@ -344,7 +344,7 @@ public class UserCacheSession implements UserCache, OnCreateComponent, OnUpdateC
OrganizationProvider organizationProvider = session.getProvider(OrganizationProvider.class); OrganizationProvider organizationProvider = session.getProvider(OrganizationProvider.class);
OrganizationModel organization = organizationProvider.getByMember(delegate); OrganizationModel organization = organizationProvider.getByMember(delegate);
if ((organizationProvider.isEnabled() && organization != null && organization.isManaged(delegate) && !organization.isEnabled()) || if ((organizationProvider.isEnabled() && organization != null && organization.isManaged(delegate) && !organization.isEnabled()) ||
(!organizationProvider.isEnabled() && organization != null && organization.isManaged(delegate))) { (!organizationProvider.isEnabled() && organization != null && organization.isManaged(delegate))) {
return new ReadOnlyUserModelDelegate(delegate) { return new ReadOnlyUserModelDelegate(delegate) {
@Override @Override
@ -509,7 +509,7 @@ public class UserCacheSession implements UserCache, OnCreateComponent, OnUpdateC
@Override @Override
public Stream<UserModel> getRoleMembersStream(RealmModel realm, RoleModel role) { public Stream<UserModel> getRoleMembersStream(RealmModel realm, RoleModel role) {
return getDelegate().getRoleMembersStream(realm, role); return getDelegate().getRoleMembersStream(realm, role);
} }
@Override @Override
public UserModel getServiceAccount(ClientModel client) { public UserModel getServiceAccount(ClientModel client) {

View file

@ -17,9 +17,12 @@
package org.keycloak.models.sessions.infinispan.events; package org.keycloak.models.sessions.infinispan.events;
import java.util.LinkedList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterProvider; import org.keycloak.cluster.ClusterProvider;
import org.keycloak.models.AbstractKeycloakTransaction; import org.keycloak.models.AbstractKeycloakTransaction;
import org.keycloak.models.KeycloakSession; import org.keycloak.models.KeycloakSession;
@ -33,41 +36,30 @@ public class SessionEventsSenderTransaction extends AbstractKeycloakTransaction
private final KeycloakSession session; private final KeycloakSession session;
private final List<DCEventContext> sessionEvents = new LinkedList<>(); private final Map<EventGroup, List<ClusterEvent>> sessionEvents = new HashMap<>();
public SessionEventsSenderTransaction(KeycloakSession session) { public SessionEventsSenderTransaction(KeycloakSession session) {
this.session = session; this.session = session;
} }
public void addEvent(SessionClusterEvent event, ClusterProvider.DCNotify dcNotify) { public void addEvent(SessionClusterEvent event, ClusterProvider.DCNotify dcNotify) {
sessionEvents.add(new DCEventContext(dcNotify, event)); var group = new EventGroup(event.getEventKey(), dcNotify);
sessionEvents.computeIfAbsent(group, eventGroup -> new ArrayList<>()).add(event);
} }
@Override @Override
protected void commitImpl() { protected void commitImpl() {
ClusterProvider cluster = session.getProvider(ClusterProvider.class); var cluster = session.getProvider(ClusterProvider.class);
for (var entry : sessionEvents.entrySet()) {
// TODO bulk notify (send whole list instead of separate events?) cluster.notify(entry.getKey().eventKey(), entry.getValue(), false, entry.getKey().dcNotify());
for (DCEventContext entry : sessionEvents) {
cluster.notify(entry.event.getEventKey(), entry.event, false, entry.dcNotify);
} }
} }
@Override @Override
protected void rollbackImpl() { protected void rollbackImpl() {
sessionEvents.clear();
} }
private record EventGroup(String eventKey, ClusterProvider.DCNotify dcNotify) {}
private static class DCEventContext {
private final ClusterProvider.DCNotify dcNotify;
private final SessionClusterEvent event;
DCEventContext(ClusterProvider.DCNotify dcNotify, SessionClusterEvent event) {
this.dcNotify = dcNotify;
this.event = event;
}
}
} }

View file

@ -17,10 +17,15 @@
package org.keycloak.cluster; package org.keycloak.cluster;
import java.io.Serializable; import java.util.function.Consumer;
/** /**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a> * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/ */
public interface ClusterEvent extends Serializable { public interface ClusterEvent extends Consumer<ClusterListener> {
@Override
default void accept(ClusterListener listener) {
listener.eventReceived(this);
}
} }

View file

@ -18,11 +18,12 @@
package org.keycloak.cluster; package org.keycloak.cluster;
import org.keycloak.provider.Provider; import java.util.Collection;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.keycloak.provider.Provider;
/** /**
* Various utils related to clustering and concurrent tasks on cluster nodes * Various utils related to clustering and concurrent tasks on cluster nodes
* *
@ -80,6 +81,19 @@ public interface ClusterProvider extends Provider {
*/ */
void notify(String taskKey, ClusterEvent event, boolean ignoreSender, DCNotify dcNotify); void notify(String taskKey, ClusterEvent event, boolean ignoreSender, DCNotify dcNotify);
/**
* An alternative to {@link #notify(String, ClusterEvent, boolean, DCNotify)} that sends multiple events in a single
* network call.
* <p>
* Notifies registered listeners on all cluster nodes in all datacenters. It will notify listeners registered under
* given {@code taskKey}
*
* @see #notify(String, ClusterEvent, boolean, DCNotify)
*/
default void notify(String taskKey, Collection<? extends ClusterEvent> events, boolean ignoreSender, DCNotify dcNotify) {
events.forEach(event -> notify(taskKey, event, ignoreSender, dcNotify));
}
enum DCNotify { enum DCNotify {
/** Send message to all cluster nodes in all DCs **/ /** Send message to all cluster nodes in all DCs **/
ALL_DCS, ALL_DCS,