Merge pull request #4378 from mposolda/ispn-clientListeners-bugs
KEYCLOAK-4187 Fixes and tests related to sessions expiration and bulk…
This commit is contained in:
commit
b36f0d89bc
20 changed files with 637 additions and 147 deletions
|
@ -48,6 +48,8 @@ import org.keycloak.cluster.ClusterEvent;
|
|||
import org.keycloak.cluster.ClusterListener;
|
||||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.common.util.ConcurrentMultivaluedHashMap;
|
||||
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
|
||||
|
||||
/**
|
||||
* Impl for sending infinispan messages across cluster and listening to them
|
||||
*
|
||||
|
@ -63,13 +65,16 @@ public class InfinispanNotificationsManager {
|
|||
|
||||
private final Cache<String, Serializable> workCache;
|
||||
|
||||
private final RemoteCache workRemoteCache;
|
||||
|
||||
private final String myAddress;
|
||||
|
||||
private final String mySite;
|
||||
|
||||
|
||||
protected InfinispanNotificationsManager(Cache<String, Serializable> workCache, String myAddress, String mySite) {
|
||||
protected InfinispanNotificationsManager(Cache<String, Serializable> workCache, RemoteCache workRemoteCache, String myAddress, String mySite) {
|
||||
this.workCache = workCache;
|
||||
this.workRemoteCache = workRemoteCache;
|
||||
this.myAddress = myAddress;
|
||||
this.mySite = mySite;
|
||||
}
|
||||
|
@ -77,26 +82,29 @@ public class InfinispanNotificationsManager {
|
|||
|
||||
// Create and init manager including all listeners etc
|
||||
public static InfinispanNotificationsManager create(Cache<String, Serializable> workCache, String myAddress, String mySite, Set<RemoteStore> remoteStores) {
|
||||
InfinispanNotificationsManager manager = new InfinispanNotificationsManager(workCache, myAddress, mySite);
|
||||
RemoteCache workRemoteCache = null;
|
||||
|
||||
// We need CacheEntryListener just if we don't have remoteStore. With remoteStore will be all cluster nodes notified anyway from HotRod listener
|
||||
if (remoteStores.isEmpty()) {
|
||||
workCache.addListener(manager.new CacheEntryListener());
|
||||
|
||||
logger.debugf("Added listener for infinispan cache: %s", workCache.getName());
|
||||
} else {
|
||||
for (RemoteStore remoteStore : remoteStores) {
|
||||
RemoteCache<Object, Object> remoteCache = remoteStore.getRemoteCache();
|
||||
remoteCache.addClientListener(manager.new HotRodListener(remoteCache));
|
||||
|
||||
logger.debugf("Added listener for HotRod remoteStore cache: %s", remoteCache.getName());
|
||||
}
|
||||
if (!remoteStores.isEmpty()) {
|
||||
RemoteStore remoteStore = remoteStores.iterator().next();
|
||||
workRemoteCache = remoteStore.getRemoteCache();
|
||||
|
||||
if (mySite == null) {
|
||||
throw new IllegalStateException("Multiple datacenters available, but site name is not configured! Check your configuration");
|
||||
}
|
||||
}
|
||||
|
||||
InfinispanNotificationsManager manager = new InfinispanNotificationsManager(workCache, workRemoteCache, myAddress, mySite);
|
||||
|
||||
// We need CacheEntryListener for communication within current DC
|
||||
workCache.addListener(manager.new CacheEntryListener());
|
||||
logger.debugf("Added listener for infinispan cache: %s", workCache.getName());
|
||||
|
||||
// Added listener for remoteCache to notify other DCs
|
||||
if (workRemoteCache != null) {
|
||||
workRemoteCache.addClientListener(manager.new HotRodListener(workRemoteCache));
|
||||
logger.debugf("Added listener for HotRod remoteStore cache: %s", workRemoteCache.getName());
|
||||
}
|
||||
|
||||
return manager;
|
||||
}
|
||||
|
||||
|
@ -132,13 +140,15 @@ public class InfinispanNotificationsManager {
|
|||
logger.tracef("Sending event with key %s: %s", eventKey, event);
|
||||
}
|
||||
|
||||
Flag[] flags = dcNotify == ClusterProvider.DCNotify.LOCAL_DC_ONLY
|
||||
? new Flag[] { Flag.IGNORE_RETURN_VALUES, Flag.SKIP_CACHE_STORE }
|
||||
: new Flag[] { Flag.IGNORE_RETURN_VALUES };
|
||||
|
||||
// Put the value to the cache to notify listeners on all the nodes
|
||||
workCache.getAdvancedCache().withFlags(flags)
|
||||
if (dcNotify == ClusterProvider.DCNotify.LOCAL_DC_ONLY || workRemoteCache == null) {
|
||||
// Just put it to workCache, but skip notifying remoteCache
|
||||
workCache.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES, Flag.SKIP_CACHE_STORE)
|
||||
.put(eventKey, wrappedEvent, 120, TimeUnit.SECONDS);
|
||||
} else {
|
||||
// Add directly to remoteCache. Will notify remote listeners on all nodes in all DCs
|
||||
RemoteCache remoteCache = InfinispanUtil.getRemoteCache(workCache);
|
||||
remoteCache.put(eventKey, wrappedEvent, 120, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -219,7 +229,7 @@ public class InfinispanNotificationsManager {
|
|||
}
|
||||
|
||||
if (event.isIgnoreSenderSite()) {
|
||||
if (this.mySite != null && this.mySite.equals(event.getSender())) {
|
||||
if (this.mySite == null || this.mySite.equals(event.getSenderSite())) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -135,7 +135,10 @@ public class InfinispanAuthenticationSessionProvider implements AuthenticationSe
|
|||
|
||||
@Override
|
||||
public void onRealmRemoved(RealmModel realm) {
|
||||
clusterEventsSenderTx.addEvent(InfinispanAuthenticationSessionProviderFactory.REALM_REMOVED_AUTHSESSION_EVENT, RealmRemovedSessionEvent.create(realm.getId()), true);
|
||||
// Send message to all DCs. The remoteCache will notify client listeners on all DCs for remove authentication sessions
|
||||
clusterEventsSenderTx.addEvent(
|
||||
RealmRemovedSessionEvent.createEvent(RealmRemovedSessionEvent.class, InfinispanAuthenticationSessionProviderFactory.REALM_REMOVED_AUTHSESSION_EVENT, session, realm.getId(), false),
|
||||
ClusterProvider.DCNotify.ALL_DCS);
|
||||
}
|
||||
|
||||
protected void onRealmRemovedEvent(String realmId) {
|
||||
|
@ -154,7 +157,10 @@ public class InfinispanAuthenticationSessionProvider implements AuthenticationSe
|
|||
|
||||
@Override
|
||||
public void onClientRemoved(RealmModel realm, ClientModel client) {
|
||||
clusterEventsSenderTx.addEvent(InfinispanAuthenticationSessionProviderFactory.CLIENT_REMOVED_AUTHSESSION_EVENT, ClientRemovedSessionEvent.create(realm.getId(), client.getId()), true);
|
||||
// Send message to all DCs. The remoteCache will notify client listeners on all DCs for remove authentication sessions of this client
|
||||
clusterEventsSenderTx.addEvent(
|
||||
ClientRemovedSessionEvent.create(session, InfinispanAuthenticationSessionProviderFactory.CLIENT_REMOVED_AUTHSESSION_EVENT, realm.getId(), false, client.getId()),
|
||||
ClusterProvider.DCNotify.ALL_DCS);
|
||||
}
|
||||
|
||||
protected void onClientRemovedEvent(String realmId, String clientUuid) {
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.infinispan.Cache;
|
|||
import org.infinispan.client.hotrod.RemoteCache;
|
||||
import org.infinispan.context.Flag;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.common.util.Time;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||
import org.keycloak.models.AuthenticatedClientSessionModel;
|
||||
|
@ -463,8 +464,10 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
|
|||
|
||||
@Override
|
||||
public void removeUserSessions(RealmModel realm) {
|
||||
// Don't send message to all DCs, just to all cluster nodes in current DC. The remoteCache will notify client listeners for removed userSessions. This assumes that 2nd DC contains same userSessions like current one.
|
||||
clusterEventsSenderTx.addEvent(InfinispanUserSessionProviderFactory.REMOVE_USER_SESSIONS_EVENT, RemoveUserSessionsEvent.create(realm.getId()), false);
|
||||
// Don't send message to all DCs, just to all cluster nodes in current DC. The remoteCache will notify client listeners for removed userSessions.
|
||||
clusterEventsSenderTx.addEvent(
|
||||
RemoveUserSessionsEvent.createEvent(RemoveUserSessionsEvent.class, InfinispanUserSessionProviderFactory.REMOVE_USER_SESSIONS_EVENT, session, realm.getId(), true),
|
||||
ClusterProvider.DCNotify.LOCAL_DC_ONLY);
|
||||
}
|
||||
|
||||
protected void onRemoveUserSessionsEvent(String realmId) {
|
||||
|
@ -486,6 +489,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
|
|||
|
||||
@Override
|
||||
public void accept(String sessionId) {
|
||||
// Remove session from remoteCache too
|
||||
localCache.remove(sessionId);
|
||||
}
|
||||
|
||||
|
@ -515,7 +519,9 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
|
|||
|
||||
@Override
|
||||
public void removeAllUserLoginFailures(RealmModel realm) {
|
||||
clusterEventsSenderTx.addEvent(InfinispanUserSessionProviderFactory.REMOVE_ALL_LOGIN_FAILURES_EVENT, RemoveAllUserLoginFailuresEvent.create(realm.getId()), false);
|
||||
clusterEventsSenderTx.addEvent(
|
||||
RemoveAllUserLoginFailuresEvent.createEvent(RemoveAllUserLoginFailuresEvent.class, InfinispanUserSessionProviderFactory.REMOVE_ALL_LOGIN_FAILURES_EVENT, session, realm.getId(), true),
|
||||
ClusterProvider.DCNotify.LOCAL_DC_ONLY);
|
||||
}
|
||||
|
||||
protected void onRemoveAllUserLoginFailuresEvent(String realmId) {
|
||||
|
@ -527,22 +533,23 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
|
|||
|
||||
Cache<LoginFailureKey, LoginFailureEntity> localCacheStoreIgnore = CacheDecorators.skipCacheLoaders(localCache);
|
||||
|
||||
Iterator<LoginFailureKey> itr = localCacheStoreIgnore
|
||||
localCacheStoreIgnore
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(UserLoginFailurePredicate.create(realmId))
|
||||
.map(Mappers.loginFailureId())
|
||||
.iterator();
|
||||
|
||||
while (itr.hasNext()) {
|
||||
LoginFailureKey key = itr.next();
|
||||
localCache.remove(key);
|
||||
}
|
||||
.forEach(loginFailureKey -> {
|
||||
// Remove loginFailure from remoteCache too
|
||||
localCache.remove(loginFailureKey);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRealmRemoved(RealmModel realm) {
|
||||
clusterEventsSenderTx.addEvent(InfinispanUserSessionProviderFactory.REALM_REMOVED_SESSION_EVENT, RealmRemovedSessionEvent.create(realm.getId()), false);
|
||||
// Don't send message to all DCs, just to all cluster nodes in current DC. The remoteCache will notify client listeners for removed userSessions.
|
||||
clusterEventsSenderTx.addEvent(
|
||||
RealmRemovedSessionEvent.createEvent(RealmRemovedSessionEvent.class, InfinispanUserSessionProviderFactory.REALM_REMOVED_SESSION_EVENT, session, realm.getId(), true),
|
||||
ClusterProvider.DCNotify.LOCAL_DC_ONLY);
|
||||
}
|
||||
|
||||
protected void onRealmRemovedEvent(String realmId) {
|
||||
|
@ -553,7 +560,9 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
|
|||
|
||||
@Override
|
||||
public void onClientRemoved(RealmModel realm, ClientModel client) {
|
||||
clusterEventsSenderTx.addEvent(InfinispanUserSessionProviderFactory.CLIENT_REMOVED_SESSION_EVENT, ClientRemovedSessionEvent.create(realm.getId(), client.getId()), false);
|
||||
// clusterEventsSenderTx.addEvent(
|
||||
// ClientRemovedSessionEvent.createEvent(ClientRemovedSessionEvent.class, InfinispanUserSessionProviderFactory.CLIENT_REMOVED_SESSION_EVENT, session, realm.getId(), true),
|
||||
// ClusterProvider.DCNotify.LOCAL_DC_ONLY);
|
||||
}
|
||||
|
||||
protected void onClientRemovedEvent(String realmId, String clientUuid) {
|
||||
|
|
|
@ -20,12 +20,14 @@ package org.keycloak.models.sessions.infinispan.events;
|
|||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.cluster.ClusterEvent;
|
||||
import org.keycloak.cluster.ClusterListener;
|
||||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakSessionFactory;
|
||||
import org.keycloak.models.RealmModel;
|
||||
import org.keycloak.models.UserSessionProvider;
|
||||
import org.keycloak.models.sessions.infinispan.InfinispanUserSessionProvider;
|
||||
import org.keycloak.models.sessions.infinispan.InfinispanUserSessionProviderFactory;
|
||||
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
|
||||
import org.keycloak.models.utils.KeycloakModelUtils;
|
||||
|
||||
/**
|
||||
|
@ -48,17 +50,33 @@ public abstract class AbstractUserSessionClusterListener<SE extends SessionClust
|
|||
InfinispanUserSessionProvider provider = (InfinispanUserSessionProvider) session.getProvider(UserSessionProvider.class, InfinispanUserSessionProviderFactory.PROVIDER_ID);
|
||||
SE sessionEvent = (SE) event;
|
||||
|
||||
String realmId = sessionEvent.getRealmId();
|
||||
boolean shouldResendEvent = shouldResendEvent(session, sessionEvent);
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debugf("Received user session event '%s'", sessionEvent.toString());
|
||||
log.debugf("Received user session event '%s'. Should resend event: %b", sessionEvent.toString(), shouldResendEvent);
|
||||
}
|
||||
|
||||
eventReceived(session, provider, sessionEvent);
|
||||
|
||||
if (shouldResendEvent) {
|
||||
session.getProvider(ClusterProvider.class).notify(sessionEvent.getEventKey(), event, true, ClusterProvider.DCNotify.ALL_BUT_LOCAL_DC);
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
protected abstract void eventReceived(KeycloakSession session, InfinispanUserSessionProvider provider, SE sessionEvent);
|
||||
|
||||
|
||||
private boolean shouldResendEvent(KeycloakSession session, SessionClusterEvent event) {
|
||||
if (!event.isResendingEvent()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Just the initiator will re-send the event after receiving it
|
||||
String myNode = InfinispanUtil.getMyAddress(session);
|
||||
String mySite = InfinispanUtil.getMySite(session);
|
||||
return (event.getNodeId() != null && event.getNodeId().equals(myNode) && event.getSiteId() != null && event.getSiteId().equals(mySite));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,31 +17,24 @@
|
|||
|
||||
package org.keycloak.models.sessions.infinispan.events;
|
||||
|
||||
import org.keycloak.cluster.ClusterEvent;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class ClientRemovedSessionEvent implements SessionClusterEvent {
|
||||
public class ClientRemovedSessionEvent extends SessionClusterEvent {
|
||||
|
||||
private String realmId;
|
||||
private String clientUuid;
|
||||
|
||||
public static ClientRemovedSessionEvent create(String realmId, String clientUuid) {
|
||||
ClientRemovedSessionEvent event = new ClientRemovedSessionEvent();
|
||||
event.realmId = realmId;
|
||||
public static ClientRemovedSessionEvent create(KeycloakSession session, String eventKey, String realmId, boolean resendingEvent, String clientUuid) {
|
||||
ClientRemovedSessionEvent event = ClientRemovedSessionEvent.createEvent(ClientRemovedSessionEvent.class, eventKey, session, realmId, resendingEvent);
|
||||
event.clientUuid = clientUuid;
|
||||
return event;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("ClientRemovedSessionEvent [ realmId=%s , clientUuid=%s ]", realmId, clientUuid);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRealmId() {
|
||||
return realmId;
|
||||
return String.format("ClientRemovedSessionEvent [ realmId=%s , clientUuid=%s ]", getRealmId(), clientUuid);
|
||||
}
|
||||
|
||||
public String getClientUuid() {
|
||||
|
|
|
@ -20,23 +20,5 @@ package org.keycloak.models.sessions.infinispan.events;
|
|||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class RealmRemovedSessionEvent implements SessionClusterEvent {
|
||||
|
||||
private String realmId;
|
||||
|
||||
public static RealmRemovedSessionEvent create(String realmId) {
|
||||
RealmRemovedSessionEvent event = new RealmRemovedSessionEvent();
|
||||
event.realmId = realmId;
|
||||
return event;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("RealmRemovedSessionEvent [ realmId=%s ]", realmId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRealmId() {
|
||||
return realmId;
|
||||
}
|
||||
public class RealmRemovedSessionEvent extends SessionClusterEvent {
|
||||
}
|
||||
|
|
|
@ -20,23 +20,5 @@ package org.keycloak.models.sessions.infinispan.events;
|
|||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class RemoveAllUserLoginFailuresEvent implements SessionClusterEvent {
|
||||
|
||||
private String realmId;
|
||||
|
||||
public static RemoveAllUserLoginFailuresEvent create(String realmId) {
|
||||
RemoveAllUserLoginFailuresEvent event = new RemoveAllUserLoginFailuresEvent();
|
||||
event.realmId = realmId;
|
||||
return event;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("RemoveAllUserLoginFailuresEvent [ realmId=%s ]", realmId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRealmId() {
|
||||
return realmId;
|
||||
}
|
||||
public class RemoveAllUserLoginFailuresEvent extends SessionClusterEvent {
|
||||
}
|
||||
|
|
|
@ -20,23 +20,5 @@ package org.keycloak.models.sessions.infinispan.events;
|
|||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class RemoveUserSessionsEvent implements SessionClusterEvent {
|
||||
|
||||
private String realmId;
|
||||
|
||||
public static RemoveUserSessionsEvent create(String realmId) {
|
||||
RemoveUserSessionsEvent event = new RemoveUserSessionsEvent();
|
||||
event.realmId = realmId;
|
||||
return event;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("RemoveUserSessionsEvent [ realmId=%s ]", realmId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRealmId() {
|
||||
return realmId;
|
||||
}
|
||||
public class RemoveUserSessionsEvent extends SessionClusterEvent {
|
||||
}
|
||||
|
|
|
@ -18,12 +18,64 @@
|
|||
package org.keycloak.models.sessions.infinispan.events;
|
||||
|
||||
import org.keycloak.cluster.ClusterEvent;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
|
||||
|
||||
/**
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public interface SessionClusterEvent extends ClusterEvent {
|
||||
public abstract class SessionClusterEvent implements ClusterEvent {
|
||||
|
||||
String getRealmId();
|
||||
private String realmId;
|
||||
private String eventKey;
|
||||
private boolean resendingEvent;
|
||||
private String siteId;
|
||||
private String nodeId;
|
||||
|
||||
|
||||
public static <T extends SessionClusterEvent> T createEvent(Class<T> eventClass, String eventKey, KeycloakSession session, String realmId, boolean resendingEvent) {
|
||||
try {
|
||||
T event = eventClass.newInstance();
|
||||
event.setData(session, eventKey, realmId, resendingEvent);
|
||||
return event;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void setData(KeycloakSession session, String eventKey, String realmId, boolean resendingEvent) {
|
||||
this.realmId = realmId;
|
||||
this.eventKey = eventKey;
|
||||
this.resendingEvent = resendingEvent;
|
||||
this.siteId = InfinispanUtil.getMySite(session);
|
||||
this.nodeId = InfinispanUtil.getMyAddress(session);
|
||||
}
|
||||
|
||||
|
||||
public String getRealmId() {
|
||||
return realmId;
|
||||
}
|
||||
|
||||
public String getEventKey() {
|
||||
return eventKey;
|
||||
}
|
||||
|
||||
public boolean isResendingEvent() {
|
||||
return resendingEvent;
|
||||
}
|
||||
|
||||
public String getSiteId() {
|
||||
return siteId;
|
||||
}
|
||||
|
||||
public String getNodeId() {
|
||||
return nodeId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
String simpleClassName = getClass().getSimpleName();
|
||||
return String.format("%s [ realmId=%s ]", simpleClassName, realmId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,11 +17,10 @@
|
|||
|
||||
package org.keycloak.models.sessions.infinispan.events;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.keycloak.cluster.ClusterProvider;
|
||||
import org.keycloak.common.util.MultivaluedHashMap;
|
||||
import org.keycloak.models.AbstractKeycloakTransaction;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
|
||||
|
@ -34,41 +33,41 @@ public class SessionEventsSenderTransaction extends AbstractKeycloakTransaction
|
|||
|
||||
private final KeycloakSession session;
|
||||
|
||||
private final MultivaluedHashMap<String, SessionClusterEvent> sessionEvents = new MultivaluedHashMap<>();
|
||||
private final MultivaluedHashMap<String, SessionClusterEvent> localDCSessionEvents = new MultivaluedHashMap<>();
|
||||
private final List<DCEventContext> sessionEvents = new LinkedList<>();
|
||||
|
||||
public SessionEventsSenderTransaction(KeycloakSession session) {
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
public void addEvent(String eventName, SessionClusterEvent event, boolean sendToAllDCs) {
|
||||
if (sendToAllDCs) {
|
||||
sessionEvents.add(eventName, event);
|
||||
} else {
|
||||
localDCSessionEvents.add(eventName, event);
|
||||
}
|
||||
public void addEvent(SessionClusterEvent event, ClusterProvider.DCNotify dcNotify) {
|
||||
sessionEvents.add(new DCEventContext(dcNotify, event));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void commitImpl() {
|
||||
ClusterProvider cluster = session.getProvider(ClusterProvider.class);
|
||||
|
||||
// TODO bulk notify (send whole list instead of separate events?)
|
||||
for (Map.Entry<String, List<SessionClusterEvent>> entry : sessionEvents.entrySet()) {
|
||||
for (SessionClusterEvent event : entry.getValue()) {
|
||||
cluster.notify(entry.getKey(), event, false, ClusterProvider.DCNotify.ALL_DCS);
|
||||
for (DCEventContext entry : sessionEvents) {
|
||||
cluster.notify(entry.event.getEventKey(), entry.event, false, entry.dcNotify);
|
||||
}
|
||||
}
|
||||
|
||||
for (Map.Entry<String, List<SessionClusterEvent>> entry : localDCSessionEvents.entrySet()) {
|
||||
for (SessionClusterEvent event : entry.getValue()) {
|
||||
cluster.notify(entry.getKey(), event, false, ClusterProvider.DCNotify.LOCAL_DC_ONLY);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void rollbackImpl() {
|
||||
|
||||
}
|
||||
|
||||
|
||||
private class DCEventContext {
|
||||
private final ClusterProvider.DCNotify dcNotify;
|
||||
private final SessionClusterEvent event;
|
||||
|
||||
DCEventContext(ClusterProvider.DCNotify dcNotify, SessionClusterEvent event) {
|
||||
this.dcNotify = dcNotify;
|
||||
this.event = event;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.keycloak.models.sessions.infinispan.remotestore;
|
|||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.infinispan.client.hotrod.Flag;
|
||||
import org.infinispan.commons.configuration.ConfiguredBy;
|
||||
import org.infinispan.filter.KeyFilter;
|
||||
import org.infinispan.marshall.core.MarshalledEntry;
|
||||
|
@ -70,10 +71,11 @@ public class KcRemoteStore extends RemoteStore {
|
|||
}
|
||||
|
||||
|
||||
// Don't do anything. Iterate over remoteCache.keySet() can have big performance impact. We handle bulk load by ourselves if needed.
|
||||
@Override
|
||||
public void process(KeyFilter filter, CacheLoaderTask task, Executor executor, boolean fetchValue, boolean fetchMetadata) {
|
||||
logger.infof("Calling process with filter '%s' on cache '%s'", filter, cacheName);
|
||||
super.process(filter, task, executor, fetchValue, fetchMetadata);
|
||||
logger.debugf("Skip calling process with filter '%s' on cache '%s'", filter, cacheName);
|
||||
// super.process(filter, task, executor, fetchValue, fetchMetadata);
|
||||
}
|
||||
|
||||
|
||||
|
@ -88,6 +90,7 @@ public class KcRemoteStore extends RemoteStore {
|
|||
logger.debugf("Calling delete for key '%s' on cache '%s'", key, cacheName);
|
||||
|
||||
// Optimization - we don't need to know the previous value.
|
||||
// TODO: For some usecases (bulk removal of user sessions), it may be better for performance to call removeAsync and wait for all futures to be finished
|
||||
getRemoteCache().remove(key);
|
||||
|
||||
return true;
|
||||
|
|
|
@ -500,16 +500,19 @@ development as there is no need to restart infinispan server(s) among test runs.
|
|||
|
||||
4) Setup MySQL database or some other shared database.
|
||||
|
||||
4) Run the LoginCrossDCTest (or any other test) with those properties. In shortcut, it's using MySQL database, disabled L1 lifespan and
|
||||
5) Ensure that org.wildfly.arquillian:wildfly-arquillian-container-managed is on the classpath when running test. On Intellij, it can be
|
||||
done by going to: View -> Tool Windows -> Maven projects. Then check profile "cache-server-infinispan". The tests will use this profile when executed.
|
||||
|
||||
6) Run the LoginCrossDCTest (or any other test) with those properties. In shortcut, it's using MySQL database, disabled L1 lifespan and
|
||||
connects to the remoteStore provided by infinispan server configured in previous steps:
|
||||
|
||||
-Dauth.server.crossdc=true -Dauth.server.undertow.crossdc=true -Dkeycloak.connectionsJpa.url.crossdc=jdbc:mysql://localhost/keycloak
|
||||
-Dauth.server.crossdc=true -Dauth.server.undertow.crossdc=true -Dcache.server.lifecycle.skip=true -Dkeycloak.connectionsJpa.url.crossdc=jdbc:mysql://localhost/keycloak
|
||||
-Dkeycloak.connectionsJpa.driver.crossdc=com.mysql.jdbc.Driver -Dkeycloak.connectionsJpa.user=keycloak
|
||||
-Dkeycloak.connectionsJpa.password=keycloak -Dkeycloak.connectionsInfinispan.clustered=true -Dkeycloak.connectionsInfinispan.l1Lifespan=0
|
||||
-Dkeycloak.connectionsInfinispan.remoteStorePort=11222 -Dkeycloak.connectionsInfinispan.remoteStorePort.2=11222 -Dkeycloak.connectionsInfinispan.sessionsOwners=1
|
||||
-Dsession.cache.owners=1 -Dkeycloak.infinispan.logging.level=debug -Dresources
|
||||
|
||||
5) If you want to debug and test manually, the servers are running on these ports (Note that not all backend servers are running by default and some might be also unused by loadbalancer):
|
||||
7) If you want to debug and test manually, the servers are running on these ports (Note that not all backend servers are running by default and some might be also unused by loadbalancer):
|
||||
|
||||
Loadbalancer -> "http://localhost:8180/auth"
|
||||
auth-server-undertow-cross-dc-0_1 -> "http://localhost:8101/auth"
|
||||
|
@ -517,7 +520,6 @@ connects to the remoteStore provided by infinispan server configured in previous
|
|||
auth-server-undertow-cross-dc-1_1 -> "http://localhost:8111/auth"
|
||||
auth-server-undertow-cross-dc-1_2-manual -> "http://localhost:8112/auth"
|
||||
|
||||
TODO: Tests using JMX statistics like ActionTokenCrossDCTest doesn't yet working.
|
||||
|
||||
## Run Docker Authentication test
|
||||
|
||||
|
|
|
@ -229,7 +229,10 @@ public class CacheStatisticsControllerEnricher implements TestEnricher {
|
|||
: annotation.managementPort();
|
||||
}
|
||||
|
||||
JMXServiceURL url = new JMXServiceURL("service:jmx:remote+http://" + host + ":" + port);
|
||||
String jmxUrl = "service:jmx:remote+http://" + host + ":" + port;
|
||||
LOG.infof("JMX Service URL: %s", jmxUrl);
|
||||
|
||||
JMXServiceURL url = new JMXServiceURL(jmxUrl);
|
||||
JMXConnector jmxc = jmxConnectorRegistry.get().getConnection(url);
|
||||
|
||||
return jmxc.getMBeanServerConnection();
|
||||
|
|
|
@ -135,6 +135,15 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest
|
|||
return KeycloakTestingClient.getInstance(node.getContextRoot() + "/auth");
|
||||
}
|
||||
|
||||
|
||||
protected Keycloak getAdminClientForStartedNodeInDc(int dcIndex) {
|
||||
ContainerInfo firstStartedNode = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex).stream()
|
||||
.filter(ContainerInfo::isStarted)
|
||||
.findFirst().get();
|
||||
|
||||
return getAdminClientFor(firstStartedNode);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get admin client directed to the given node.
|
||||
* @param node
|
||||
|
@ -148,6 +157,16 @@ public abstract class AbstractCrossDCTest extends AbstractTestRealmKeycloakTest
|
|||
return client;
|
||||
}
|
||||
|
||||
|
||||
protected KeycloakTestingClient getTestingClientForStartedNodeInDc(int dcIndex) {
|
||||
ContainerInfo firstStartedNode = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex).stream()
|
||||
.filter(ContainerInfo::isStarted)
|
||||
.findFirst().get();
|
||||
|
||||
return getTestingClientFor(firstStartedNode);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get testing client directed to the given node.
|
||||
* @param node
|
||||
|
|
|
@ -230,15 +230,6 @@ public class LastSessionRefreshCrossDCTest extends AbstractAdminCrossDCTest {
|
|||
}
|
||||
|
||||
|
||||
private KeycloakTestingClient getTestingClientForStartedNodeInDc(int dcIndex) {
|
||||
ContainerInfo firstStartedNode = this.suiteContext.getDcAuthServerBackendsInfo().get(dcIndex).stream()
|
||||
.filter(ContainerInfo::isStarted)
|
||||
.findFirst().get();
|
||||
|
||||
return getTestingClientFor(firstStartedNode);
|
||||
}
|
||||
|
||||
|
||||
private RemoteCacheStats getRemoteCacheStats(int dcIndex) {
|
||||
return getTestingClientForStartedNodeInDc(dcIndex).testing("test")
|
||||
.cache(InfinispanConnectionProvider.SESSION_CACHE_NAME)
|
||||
|
|
|
@ -0,0 +1,396 @@
|
|||
/*
|
||||
* Copyright 2017 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.testsuite.crossdc;
|
||||
|
||||
|
||||
import javax.ws.rs.NotFoundException;
|
||||
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.keycloak.OAuth2Constants;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||
import org.keycloak.models.Constants;
|
||||
import org.keycloak.representations.idm.ClientRepresentation;
|
||||
import org.keycloak.representations.idm.RealmRepresentation;
|
||||
import org.keycloak.representations.idm.UserRepresentation;
|
||||
import org.keycloak.testsuite.Assert;
|
||||
import org.keycloak.testsuite.Retry;
|
||||
import org.keycloak.testsuite.admin.ApiUtil;
|
||||
import org.keycloak.testsuite.arquillian.InfinispanStatistics;
|
||||
import org.keycloak.testsuite.arquillian.annotation.JmxInfinispanCacheStatistics;
|
||||
import org.keycloak.testsuite.arquillian.annotation.JmxInfinispanChannelStatistics;
|
||||
import org.keycloak.testsuite.util.ClientBuilder;
|
||||
import org.keycloak.testsuite.util.OAuthClient;
|
||||
import org.keycloak.testsuite.util.RealmBuilder;
|
||||
import org.keycloak.testsuite.util.UserBuilder;
|
||||
|
||||
/**
|
||||
* Tests the bulk removal of user sessions and expiration scenarios (eg. removing realm, removing user etc)
|
||||
*
|
||||
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
|
||||
*/
|
||||
public class SessionExpirationCrossDCTest extends AbstractAdminCrossDCTest {
|
||||
|
||||
private static final String REALM_NAME = "expiration-test";
|
||||
|
||||
private static final int SESSIONS_COUNT = 20;
|
||||
|
||||
private int sessions01;
|
||||
private int sessions02;
|
||||
private int remoteSessions01;
|
||||
private int remoteSessions02;
|
||||
|
||||
private int authSessions01;
|
||||
private int authSessions02;
|
||||
|
||||
|
||||
@Before
|
||||
public void beforeTest() {
|
||||
try {
|
||||
adminClient.realm(REALM_NAME).remove();
|
||||
} catch (NotFoundException ignore) {
|
||||
}
|
||||
|
||||
UserRepresentation user = UserBuilder.create()
|
||||
.id("login-test")
|
||||
.username("login-test")
|
||||
.email("login@test.com")
|
||||
.enabled(true)
|
||||
.password("password")
|
||||
.addRoles(Constants.OFFLINE_ACCESS_ROLE)
|
||||
.build();
|
||||
|
||||
ClientRepresentation client = ClientBuilder.create()
|
||||
.clientId("test-app")
|
||||
.directAccessGrants()
|
||||
.redirectUris("http://localhost:8180/auth/realms/master/app/*")
|
||||
.addWebOrigin("http://localhost:8180")
|
||||
.secret("password")
|
||||
.build();
|
||||
|
||||
RealmRepresentation realmRep = RealmBuilder.create()
|
||||
.name(REALM_NAME)
|
||||
.user(user)
|
||||
.client(client)
|
||||
.build();
|
||||
|
||||
adminClient.realms().create(realmRep);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRealmRemoveSessions(
|
||||
@JmxInfinispanCacheStatistics(dc=DC.FIRST, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.SESSION_CACHE_NAME) InfinispanStatistics cacheDc1Statistics,
|
||||
@JmxInfinispanCacheStatistics(dc=DC.SECOND, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.SESSION_CACHE_NAME) InfinispanStatistics cacheDc2Statistics,
|
||||
@JmxInfinispanChannelStatistics() InfinispanStatistics channelStatisticsCrossDc) throws Exception {
|
||||
createInitialSessions(InfinispanConnectionProvider.SESSION_CACHE_NAME, false, cacheDc1Statistics, cacheDc2Statistics);
|
||||
|
||||
// log.infof("Sleeping!");
|
||||
// Thread.sleep(10000000);
|
||||
|
||||
channelStatisticsCrossDc.reset();
|
||||
|
||||
// Remove test realm
|
||||
getAdminClient().realm(REALM_NAME).remove();
|
||||
|
||||
// Assert sessions removed on node1 and node2 and on remote caches. Assert that count of messages sent between DCs is not too big.
|
||||
assertStatisticsExpected("After realm remove", InfinispanConnectionProvider.SESSION_CACHE_NAME, cacheDc1Statistics, cacheDc2Statistics, channelStatisticsCrossDc,
|
||||
sessions01, sessions02, remoteSessions01, remoteSessions02, 40l);
|
||||
}
|
||||
|
||||
|
||||
// Return last used accessTokenResponse
|
||||
private OAuthClient.AccessTokenResponse createInitialSessions(String cacheName, boolean offline, InfinispanStatistics cacheDc1Statistics, InfinispanStatistics cacheDc2Statistics) throws Exception {
|
||||
|
||||
// Enable second DC
|
||||
enableDcOnLoadBalancer(DC.SECOND);
|
||||
|
||||
// Check sessions count before test
|
||||
sessions01 = getTestingClientForStartedNodeInDc(0).testing().cache(cacheName).size();
|
||||
sessions02 = getTestingClientForStartedNodeInDc(1).testing().cache(cacheName).size();
|
||||
remoteSessions01 = (Integer) cacheDc1Statistics.getSingleStatistics(InfinispanStatistics.Constants.STAT_CACHE_NUMBER_OF_ENTRIES);
|
||||
remoteSessions02 = (Integer) cacheDc2Statistics.getSingleStatistics(InfinispanStatistics.Constants.STAT_CACHE_NUMBER_OF_ENTRIES);
|
||||
log.infof("Before creating sessions: sessions01: %d, sessions02: %d, remoteSessions01: %d, remoteSessions02: %d", sessions01, sessions02, remoteSessions01, remoteSessions02);
|
||||
|
||||
// Create 20 user sessions
|
||||
oauth.realm(REALM_NAME);
|
||||
|
||||
if (offline) {
|
||||
oauth.scope(OAuth2Constants.OFFLINE_ACCESS);
|
||||
}
|
||||
|
||||
OAuthClient.AccessTokenResponse lastAccessTokenResponse = null;
|
||||
for (int i=0 ; i<SESSIONS_COUNT ; i++) {
|
||||
lastAccessTokenResponse = oauth.doGrantAccessTokenRequest("password", "login-test", "password");
|
||||
}
|
||||
|
||||
// Assert 20 sessions exists on node1 and node2 and on remote caches
|
||||
Retry.execute(() -> {
|
||||
int sessions11 = getTestingClientForStartedNodeInDc(0).testing().cache(cacheName).size();
|
||||
int sessions12 = getTestingClientForStartedNodeInDc(1).testing().cache(cacheName).size();
|
||||
int remoteSessions11 = (Integer) cacheDc1Statistics.getSingleStatistics(InfinispanStatistics.Constants.STAT_CACHE_NUMBER_OF_ENTRIES);
|
||||
int remoteSessions12 = (Integer) cacheDc2Statistics.getSingleStatistics(InfinispanStatistics.Constants.STAT_CACHE_NUMBER_OF_ENTRIES);
|
||||
log.infof("After creating sessions: sessions11: %d, sessions12: %d, remoteSessions11: %d, remoteSessions12: %d", sessions11, sessions12, remoteSessions11, remoteSessions12);
|
||||
|
||||
Assert.assertEquals(sessions11, sessions01 + SESSIONS_COUNT);
|
||||
Assert.assertEquals(sessions12, sessions02 + SESSIONS_COUNT);
|
||||
Assert.assertEquals(remoteSessions11, remoteSessions01 + SESSIONS_COUNT);
|
||||
Assert.assertEquals(remoteSessions12, remoteSessions02 + SESSIONS_COUNT);
|
||||
}, 50, 50);
|
||||
|
||||
return lastAccessTokenResponse;
|
||||
}
|
||||
|
||||
|
||||
private void assertStatisticsExpected(String messagePrefix, String cacheName, InfinispanStatistics cacheDc1Statistics, InfinispanStatistics cacheDc2Statistics, InfinispanStatistics channelStatisticsCrossDc,
|
||||
int sessions1Expected, int sessions2Expected, int remoteSessions1Expected, int remoteSessions2Expected, long sentMessagesHigherBound) {
|
||||
Retry.execute(() -> {
|
||||
int sessions1 = getTestingClientForStartedNodeInDc(0).testing().cache(cacheName).size();
|
||||
int sessions2 = getTestingClientForStartedNodeInDc(1).testing().cache(cacheName).size();
|
||||
int remoteSessions1 = (Integer) cacheDc1Statistics.getSingleStatistics(InfinispanStatistics.Constants.STAT_CACHE_NUMBER_OF_ENTRIES);
|
||||
int remoteSessions2 = (Integer) cacheDc2Statistics.getSingleStatistics(InfinispanStatistics.Constants.STAT_CACHE_NUMBER_OF_ENTRIES);
|
||||
long messagesCount = (Long) channelStatisticsCrossDc.getSingleStatistics(InfinispanStatistics.Constants.STAT_CHANNEL_SENT_MESSAGES);
|
||||
log.infof(messagePrefix + ": sessions1: %d, sessions2: %d, remoteSessions1: %d, remoteSessions2: %d, sentMessages: %d", sessions1, sessions2, remoteSessions1, remoteSessions2, messagesCount);
|
||||
|
||||
Assert.assertEquals(sessions1, sessions1Expected);
|
||||
Assert.assertEquals(sessions2, sessions2Expected);
|
||||
Assert.assertEquals(remoteSessions1, remoteSessions1Expected);
|
||||
Assert.assertEquals(remoteSessions2, remoteSessions2Expected);
|
||||
|
||||
// Workaround...
|
||||
if (sentMessagesHigherBound > 5) {
|
||||
Assert.assertThat(messagesCount, Matchers.greaterThan(0l));
|
||||
}
|
||||
|
||||
Assert.assertThat(messagesCount, Matchers.lessThan(sentMessagesHigherBound));
|
||||
}, 50, 50);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRealmRemoveOfflineSessions(
|
||||
@JmxInfinispanCacheStatistics(dc=DC.FIRST, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME) InfinispanStatistics cacheDc1Statistics,
|
||||
@JmxInfinispanCacheStatistics(dc=DC.SECOND, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME) InfinispanStatistics cacheDc2Statistics,
|
||||
@JmxInfinispanChannelStatistics() InfinispanStatistics channelStatisticsCrossDc) throws Exception {
|
||||
|
||||
createInitialSessions(InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME, true, cacheDc1Statistics, cacheDc2Statistics);
|
||||
|
||||
channelStatisticsCrossDc.reset();
|
||||
|
||||
// Remove test realm
|
||||
getAdminClient().realm(REALM_NAME).remove();
|
||||
|
||||
// Assert sessions removed on node1 and node2 and on remote caches. Assert that count of messages sent between DCs is not too big.
|
||||
assertStatisticsExpected("After realm remove", InfinispanConnectionProvider.OFFLINE_SESSION_CACHE_NAME, cacheDc1Statistics, cacheDc2Statistics, channelStatisticsCrossDc,
|
||||
sessions01, sessions02, remoteSessions01, remoteSessions02, 70l); // Might be bigger messages as online sessions removed too.
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testLogoutAllInRealm(
|
||||
@JmxInfinispanCacheStatistics(dc=DC.FIRST, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.SESSION_CACHE_NAME) InfinispanStatistics cacheDc1Statistics,
|
||||
@JmxInfinispanCacheStatistics(dc=DC.SECOND, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.SESSION_CACHE_NAME) InfinispanStatistics cacheDc2Statistics,
|
||||
@JmxInfinispanChannelStatistics() InfinispanStatistics channelStatisticsCrossDc) throws Exception {
|
||||
|
||||
createInitialSessions(InfinispanConnectionProvider.SESSION_CACHE_NAME, false, cacheDc1Statistics, cacheDc2Statistics);
|
||||
|
||||
channelStatisticsCrossDc.reset();
|
||||
|
||||
// Logout all in realm
|
||||
getAdminClient().realm(REALM_NAME).logoutAll();
|
||||
|
||||
// Assert sessions removed on node1 and node2 and on remote caches. Assert that count of messages sent between DCs is not too big.
|
||||
assertStatisticsExpected("After realm logout", InfinispanConnectionProvider.SESSION_CACHE_NAME, cacheDc1Statistics, cacheDc2Statistics, channelStatisticsCrossDc,
|
||||
sessions01, sessions02, remoteSessions01, remoteSessions02, 40l);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testPeriodicExpiration(
|
||||
@JmxInfinispanCacheStatistics(dc=DC.FIRST, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.SESSION_CACHE_NAME) InfinispanStatistics cacheDc1Statistics,
|
||||
@JmxInfinispanCacheStatistics(dc=DC.SECOND, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.SESSION_CACHE_NAME) InfinispanStatistics cacheDc2Statistics,
|
||||
@JmxInfinispanChannelStatistics() InfinispanStatistics channelStatisticsCrossDc) throws Exception {
|
||||
|
||||
OAuthClient.AccessTokenResponse lastAccessTokenResponse = createInitialSessions(InfinispanConnectionProvider.SESSION_CACHE_NAME, false, cacheDc1Statistics, cacheDc2Statistics);
|
||||
|
||||
// Assert I am able to refresh
|
||||
OAuthClient.AccessTokenResponse refreshResponse = oauth.doRefreshTokenRequest(lastAccessTokenResponse.getRefreshToken(), "password");
|
||||
Assert.assertNotNull(refreshResponse.getRefreshToken());
|
||||
Assert.assertNull(refreshResponse.getError());
|
||||
|
||||
channelStatisticsCrossDc.reset();
|
||||
|
||||
// Remove expired in DC0
|
||||
getTestingClientForStartedNodeInDc(0).testing().removeExpired(REALM_NAME);
|
||||
|
||||
// Nothing yet expired. Limit 5 for sent_messages is just if "lastSessionRefresh" periodic thread happened
|
||||
assertStatisticsExpected("After remove expired - 1", InfinispanConnectionProvider.SESSION_CACHE_NAME, cacheDc1Statistics, cacheDc2Statistics, channelStatisticsCrossDc,
|
||||
sessions01 + SESSIONS_COUNT, sessions02 + SESSIONS_COUNT, remoteSessions01 + SESSIONS_COUNT, remoteSessions02 + SESSIONS_COUNT, 5l);
|
||||
|
||||
|
||||
// Set time offset
|
||||
setTimeOffset(10000000);
|
||||
|
||||
// Assert I am not able to refresh anymore
|
||||
refreshResponse = oauth.doRefreshTokenRequest(lastAccessTokenResponse.getRefreshToken(), "password");
|
||||
Assert.assertNull(refreshResponse.getRefreshToken());
|
||||
Assert.assertNotNull(refreshResponse.getError());
|
||||
|
||||
|
||||
channelStatisticsCrossDc.reset();
|
||||
|
||||
// Remove expired in DC0
|
||||
getTestingClientForStartedNodeInDc(0).testing().removeExpired(REALM_NAME);
|
||||
|
||||
// Assert sessions removed on node1 and node2 and on remote caches. Assert that count of messages sent between DCs is not too big.
|
||||
assertStatisticsExpected("After remove expired - 2", InfinispanConnectionProvider.SESSION_CACHE_NAME, cacheDc1Statistics, cacheDc2Statistics, channelStatisticsCrossDc,
|
||||
sessions01, sessions02, remoteSessions01, remoteSessions02, 40l);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// AUTH SESSIONS
|
||||
|
||||
@Test
|
||||
public void testPeriodicExpirationAuthSessions(
|
||||
@JmxInfinispanCacheStatistics(dc=DC.FIRST, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME) InfinispanStatistics cacheDc1Statistics,
|
||||
@JmxInfinispanCacheStatistics(dc=DC.SECOND, dcNodeIndex=0, cacheName=InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME) InfinispanStatistics cacheDc2Statistics,
|
||||
@JmxInfinispanChannelStatistics() InfinispanStatistics channelStatisticsCrossDc) throws Exception {
|
||||
createInitialAuthSessions();
|
||||
|
||||
channelStatisticsCrossDc.reset();
|
||||
|
||||
// Remove expired in DC0 and DC1
|
||||
getTestingClientForStartedNodeInDc(0).testing().removeExpired(REALM_NAME);
|
||||
getTestingClientForStartedNodeInDc(1).testing().removeExpired(REALM_NAME);
|
||||
|
||||
// Nothing yet expired. Limit 5 for sent_messages is just if "lastSessionRefresh" periodic thread happened
|
||||
assertAuthSessionsStatisticsExpected("After remove expired auth sessions - 1", channelStatisticsCrossDc,
|
||||
SESSIONS_COUNT, 5l);
|
||||
|
||||
// Set time offset
|
||||
setTimeOffset(10000000);
|
||||
|
||||
channelStatisticsCrossDc.reset();
|
||||
|
||||
// Remove expired in DC0 and DC1. Need to trigger it on both!
|
||||
getTestingClientForStartedNodeInDc(0).testing().removeExpired(REALM_NAME);
|
||||
getTestingClientForStartedNodeInDc(1).testing().removeExpired(REALM_NAME);
|
||||
|
||||
// Assert sessions removed on node1 and node2 and on remote caches. Assert that count of messages sent between DCs is not too big.
|
||||
assertAuthSessionsStatisticsExpected("After remove expired auth sessions - 2", channelStatisticsCrossDc,
|
||||
0, 5l);
|
||||
|
||||
}
|
||||
|
||||
|
||||
// Return last used accessTokenResponse
|
||||
private void createInitialAuthSessions() throws Exception {
|
||||
|
||||
// Enable second DC
|
||||
enableDcOnLoadBalancer(DC.SECOND);
|
||||
|
||||
// Check sessions count before test
|
||||
authSessions01 = getTestingClientForStartedNodeInDc(0).testing().cache(InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME).size();
|
||||
authSessions02 = getTestingClientForStartedNodeInDc(1).testing().cache(InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME).size();
|
||||
log.infof("Before creating authentication sessions: authSessions01: %d, authSessions02: %d", authSessions01, authSessions02);
|
||||
|
||||
// Create 20 authentication sessions
|
||||
oauth.realm(REALM_NAME);
|
||||
|
||||
for (int i=0 ; i<SESSIONS_COUNT ; i++) {
|
||||
oauth.openLoginForm();
|
||||
driver.manage().deleteAllCookies();
|
||||
}
|
||||
|
||||
// Assert 20 authentication sessions exists on node1 and node2 and on remote caches
|
||||
Retry.execute(() -> {
|
||||
int authSessions11 = getTestingClientForStartedNodeInDc(0).testing().cache(InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME).size();
|
||||
int authSessions12 = getTestingClientForStartedNodeInDc(1).testing().cache(InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME).size();
|
||||
log.infof("After creating authentication sessions: sessions11: %d, authSessions12: %d", authSessions11, authSessions12);
|
||||
|
||||
// There are 20 new authentication sessions created totally in both datacenters
|
||||
int diff1 = authSessions11 - authSessions01;
|
||||
int diff2 = authSessions12 - authSessions02;
|
||||
Assert.assertEquals(SESSIONS_COUNT, diff1 + diff2);
|
||||
}, 50, 50);
|
||||
}
|
||||
|
||||
|
||||
private void assertAuthSessionsStatisticsExpected(String messagePrefix, InfinispanStatistics channelStatisticsCrossDc,
|
||||
int expectedAuthSessionsCountDiff, long sentMessagesHigherBound) {
|
||||
Retry.execute(() -> {
|
||||
int authSessions1 = getTestingClientForStartedNodeInDc(0).testing().cache(InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME).size();
|
||||
int authSessions2 = getTestingClientForStartedNodeInDc(1).testing().cache(InfinispanConnectionProvider.AUTHENTICATION_SESSIONS_CACHE_NAME).size();
|
||||
long messagesCount = (Long) channelStatisticsCrossDc.getSingleStatistics(InfinispanStatistics.Constants.STAT_CHANNEL_SENT_MESSAGES);
|
||||
log.infof(messagePrefix + ": authSessions1: %d, authSessions2: %d, sentMessages: %d", authSessions1, authSessions2, messagesCount);
|
||||
|
||||
int diff1 = authSessions1 - authSessions01;
|
||||
int diff2 = authSessions2 - authSessions02;
|
||||
|
||||
Assert.assertEquals(expectedAuthSessionsCountDiff, diff1 + diff2);
|
||||
|
||||
// Workaround...
|
||||
if (sentMessagesHigherBound > 5) {
|
||||
Assert.assertThat(messagesCount, Matchers.greaterThan(0l));
|
||||
}
|
||||
|
||||
Assert.assertThat(messagesCount, Matchers.lessThan(sentMessagesHigherBound));
|
||||
}, 50, 50);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRealmRemoveAuthSessions(
|
||||
@JmxInfinispanChannelStatistics() InfinispanStatistics channelStatisticsCrossDc) throws Exception {
|
||||
|
||||
createInitialAuthSessions();
|
||||
|
||||
channelStatisticsCrossDc.reset();
|
||||
|
||||
// Remove test realm
|
||||
getAdminClient().realm(REALM_NAME).remove();
|
||||
|
||||
// Assert sessions removed on node1 and node2 and on remote caches. Assert that count of messages sent between DCs is not too big, however there are some messages due to removed realm
|
||||
assertAuthSessionsStatisticsExpected("After realm removed", channelStatisticsCrossDc,
|
||||
0, 40l);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testClientRemoveAuthSessions(
|
||||
@JmxInfinispanChannelStatistics() InfinispanStatistics channelStatisticsCrossDc) throws Exception {
|
||||
|
||||
createInitialAuthSessions();
|
||||
|
||||
channelStatisticsCrossDc.reset();
|
||||
|
||||
// Remove test-app client
|
||||
ApiUtil.findClientByClientId(getAdminClient().realm(REALM_NAME), "test-app").remove();
|
||||
|
||||
// Assert sessions removed on node1 and node2 and on remote caches. Assert that count of messages sent between DCs is not too big, however there are some messages due to removed client
|
||||
assertAuthSessionsStatisticsExpected("After client removed", channelStatisticsCrossDc,
|
||||
0, 5l);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -174,7 +174,7 @@
|
|||
<group qualifier="auth-server-undertow-cross-dc">
|
||||
<container qualifier="cache-server-cross-dc-1" mode="suite" >
|
||||
<configuration>
|
||||
<property name="enabled">${auth.server.undertow.crossdc}</property>
|
||||
<property name="enabled">${auth.server.undertow.crossdc} && ! ${cache.server.lifecycle.skip}</property>
|
||||
<property name="adapterImplClass">org.jboss.as.arquillian.container.managed.ManagedDeployableContainer</property>
|
||||
<property name="jbossHome">${cache.server.home}</property>
|
||||
<property name="serverConfig">clustered.xml</property>
|
||||
|
@ -197,7 +197,7 @@
|
|||
|
||||
<container qualifier="cache-server-cross-dc-2" mode="suite" >
|
||||
<configuration>
|
||||
<property name="enabled">${auth.server.undertow.crossdc}</property>
|
||||
<property name="enabled">${auth.server.undertow.crossdc} && ! ${cache.server.lifecycle.skip}</property>
|
||||
<property name="adapterImplClass">org.jboss.as.arquillian.container.managed.ManagedDeployableContainer</property>
|
||||
<property name="jbossHome">${cache.server.home}</property>
|
||||
<property name="setupCleanServerBaseDir">true</property>
|
||||
|
|
|
@ -43,6 +43,7 @@
|
|||
<auth.server.undertow>true</auth.server.undertow>
|
||||
<auth.server.undertow.crossdc>false</auth.server.undertow.crossdc>
|
||||
<auth.server.crossdc>false</auth.server.crossdc>
|
||||
<cache.server.lifecycle.skip>false</cache.server.lifecycle.skip>
|
||||
|
||||
<auth.server.container>auth-server-${auth.server}</auth.server.container>
|
||||
<auth.server.home>${containers.home}/${auth.server.container}</auth.server.home>
|
||||
|
@ -272,6 +273,7 @@
|
|||
<!--cache server properties-->
|
||||
<auth.server.crossdc>${auth.server.crossdc}</auth.server.crossdc>
|
||||
<auth.server.undertow.crossdc>${auth.server.undertow.crossdc}</auth.server.undertow.crossdc>
|
||||
<cache.server.lifecycle.skip>${cache.server.lifecycle.skip}</cache.server.lifecycle.skip>
|
||||
|
||||
<cache.server>${cache.server}</cache.server>
|
||||
<cache.server.port.offset>${cache.server.port.offset}</cache.server.port.offset>
|
||||
|
|
|
@ -23,6 +23,8 @@ import org.infinispan.context.Flag;
|
|||
import org.keycloak.common.util.Time;
|
||||
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.RealmModel;
|
||||
import org.keycloak.models.UserModel;
|
||||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
|
||||
import org.keycloak.models.utils.KeycloakModelUtils;
|
||||
|
@ -250,6 +252,7 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public static class CreateManySessionsCommand extends AbstractSessionCacheCommand {
|
||||
|
||||
@Override
|
||||
|
@ -288,4 +291,41 @@ public abstract class AbstractSessionCacheCommand extends AbstractCommand {
|
|||
|
||||
}
|
||||
|
||||
|
||||
// This will propagate creating sessions to remoteCache too
|
||||
public static class CreateManySessionsProviderCommand extends AbstractSessionCacheCommand {
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "createManySessionsProvider";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntity> cache) {
|
||||
String realmName = getArg(1);
|
||||
String username = getArg(2);
|
||||
int count = getIntArg(3);
|
||||
int batchCount = getIntArg(4);
|
||||
|
||||
BatchTaskRunner.runInBatches(0, count, batchCount, session.getKeycloakSessionFactory(), (KeycloakSession batchSession, int firstInIteration, int countInIteration) -> {
|
||||
RealmModel realm = batchSession.realms().getRealmByName(realmName);
|
||||
UserModel user = batchSession.users().getUserByUsername(username, realm);
|
||||
|
||||
for (int i=0 ; i<countInIteration ; i++) {
|
||||
session.sessions().createUserSession(KeycloakModelUtils.generateId(), realm, user, username, "127.0.0.1", "form", false, null, null);
|
||||
}
|
||||
|
||||
log.infof("Created '%d' sessions started from offset '%d'", countInIteration, firstInIteration);
|
||||
});
|
||||
|
||||
log.infof("Created all '%d' sessions", count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String printUsage() {
|
||||
return getName() + " <cache-name> <realm-name> <user-name> <count> <count-in-batch>";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -53,6 +53,7 @@ public class TestsuiteCLI {
|
|||
AbstractSessionCacheCommand.ListCommand.class,
|
||||
AbstractSessionCacheCommand.ClearCommand.class,
|
||||
AbstractSessionCacheCommand.CreateManySessionsCommand.class,
|
||||
AbstractSessionCacheCommand.CreateManySessionsProviderCommand.class,
|
||||
PersistSessionsCommand.class,
|
||||
LoadPersistentSessionsCommand.class,
|
||||
UserCommands.Create.class,
|
||||
|
|
Loading…
Reference in a new issue