From a0696fcb973823162ca428b2e7f883dd824d61e3 Mon Sep 17 00:00:00 2001 From: Bill Burke Date: Fri, 26 Feb 2016 16:45:58 -0500 Subject: [PATCH] clustered testing --- .../infinispan/StreamCacheRealmProvider.java | 9 ++++- .../cache/infinispan/StreamRealmCache.java | 37 +++++++++++++++++-- .../admin/ClusteredConcurrencyTest.java | 4 +- .../testsuite/admin/ConcurrencyTest.java | 4 +- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java index a6a14c0f6e..e53f510914 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java @@ -93,7 +93,9 @@ import java.util.Set; * - roles are tricky because of composites. Composite lists are cached too. So, when a role is removed * we also iterate and invalidate any role or group that contains that role being removed. * - * + * - Clustering gotchyas. With an invalidation cache, if you remove an entry on node 1 and this entry does not exist on node 2, node 2 will not receive a @Listener invalidation event. + * so, hat we have to put a marker entry in the invalidation cache before we read from the DB, so if the DB changes in between reading and adding a cache entry, the cache will be notified and bump + * the version information. * * - any relationship should be resolved from session.realms(). For example if JPA.getClientByClientId() is invoked, * JPA should find the id of the client and then call session.realms().getClientById(). THis is to ensure that the cached @@ -192,13 +194,16 @@ public class StreamCacheRealmProvider implements CacheRealmProvider { @Override public void commit() { + /* THIS WAS CAUSING DEADLOCK IN A CLUSTER if (delegate == null) return; List locks = new LinkedList<>(); locks.addAll(invalidations); Collections.sort(locks); // lock ordering cache.getRevisions().startBatch(); - //if (!locks.isEmpty()) cache.getRevisions().getAdvancedCache().lock(locks); + + if (!locks.isEmpty()) cache.getRevisions().getAdvancedCache().lock(locks); + */ } diff --git a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java index 29fc43c03f..7ca6b7ac7c 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java +++ b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java @@ -24,6 +24,7 @@ import org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidat import org.infinispan.notifications.cachelistener.event.CacheEntriesEvictedEvent; import org.infinispan.notifications.cachelistener.event.CacheEntryInvalidatedEvent; import org.jboss.logging.Logger; +import org.keycloak.models.cache.infinispan.entities.AbstractRevisioned; import org.keycloak.models.cache.infinispan.entities.CachedClient; import org.keycloak.models.cache.infinispan.entities.CachedClientTemplate; import org.keycloak.models.cache.infinispan.entities.CachedGroup; @@ -72,7 +73,11 @@ public class StreamRealmCache { public Long getCurrentRevision(String id) { Long revision = revisions.get(id); - if (revision == null) return UpdateCounter.current(); + if (revision == null) revision = UpdateCounter.current(); + // if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event + // so, we do this to force this. + String invalidationKey = "invalidation.key" + id; + cache.putForExternalRead(invalidationKey, new AbstractRevisioned(-1L, invalidationKey)); return revision; } @@ -104,6 +109,9 @@ public class StreamRealmCache { public Object invalidateObject(String id) { Revisioned removed = (Revisioned)cache.remove(id); + // if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event + // so, we do this to force the event. + cache.remove("invalidation.key" + id); bumpVersion(id); return removed; } @@ -261,11 +269,32 @@ public class StreamRealmCache { @CacheEntryInvalidated public void cacheInvalidated(CacheEntryInvalidatedEvent event) { - if (!event.isPre()) { - bumpVersion(event.getKey()); + if (event.isPre()) { + String key = event.getKey(); + if (key.startsWith("invalidation.key")) { + // if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event + // so, we do this to force this. + String bump = key.substring("invalidation.key".length()); + logger.tracev("bumping invalidation key {0}", bump); + bumpVersion(bump); + return; + } + + } else { + //if (!event.isPre()) { + String key = event.getKey(); + if (key.startsWith("invalidation.key")) { + // if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event + // so, we do this to force this. + String bump = key.substring("invalidation.key".length()); + bumpVersion(bump); + logger.tracev("bumping invalidation key {0}", bump); + return; + } + bumpVersion(key); Object object = event.getValue(); if (object != null) { - bumpVersion(event.getKey()); + bumpVersion(key); Predicate> predicate = getInvalidationPredicate(object); if (predicate != null) runEvictions(predicate); logger.tracev("invalidating: {0}" + object.getClass().getName()); diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java index a98121aac6..7073797cf0 100755 --- a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java +++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java @@ -55,8 +55,8 @@ public class ClusteredConcurrencyTest { private static final Logger log = Logger.getLogger(ClusteredConcurrencyTest.class); - private static final int DEFAULT_THREADS = 5; - private static final int DEFAULT_ITERATIONS = 20; + private static final int DEFAULT_THREADS = 10; + private static final int DEFAULT_ITERATIONS = 100; // If enabled only one request is allowed at the time. Useful for checking that test is working. private static final boolean SYNCHRONIZED = false; diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java index daaf5a7cdd..d47e78f00b 100755 --- a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java +++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java @@ -48,8 +48,8 @@ public class ConcurrencyTest extends AbstractClientTest { private static final Logger log = Logger.getLogger(ConcurrencyTest.class); - private static final int DEFAULT_THREADS = 5; - private static final int DEFAULT_ITERATIONS = 20; + private static final int DEFAULT_THREADS = 10; + private static final int DEFAULT_ITERATIONS = 100; // If enabled only one request is allowed at the time. Useful for checking that test is working. private static final boolean SYNCHRONIZED = false;