clustered testing
This commit is contained in:
parent
abddbfb3a4
commit
a0696fcb97
4 changed files with 44 additions and 10 deletions
|
@ -93,7 +93,9 @@ import java.util.Set;
|
||||||
* - roles are tricky because of composites. Composite lists are cached too. So, when a role is removed
|
* - roles are tricky because of composites. Composite lists are cached too. So, when a role is removed
|
||||||
* we also iterate and invalidate any role or group that contains that role being removed.
|
* we also iterate and invalidate any role or group that contains that role being removed.
|
||||||
*
|
*
|
||||||
*
|
* - Clustering gotchyas. With an invalidation cache, if you remove an entry on node 1 and this entry does not exist on node 2, node 2 will not receive a @Listener invalidation event.
|
||||||
|
* so, hat we have to put a marker entry in the invalidation cache before we read from the DB, so if the DB changes in between reading and adding a cache entry, the cache will be notified and bump
|
||||||
|
* the version information.
|
||||||
*
|
*
|
||||||
* - any relationship should be resolved from session.realms(). For example if JPA.getClientByClientId() is invoked,
|
* - any relationship should be resolved from session.realms(). For example if JPA.getClientByClientId() is invoked,
|
||||||
* JPA should find the id of the client and then call session.realms().getClientById(). THis is to ensure that the cached
|
* JPA should find the id of the client and then call session.realms().getClientById(). THis is to ensure that the cached
|
||||||
|
@ -192,13 +194,16 @@ public class StreamCacheRealmProvider implements CacheRealmProvider {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void commit() {
|
public void commit() {
|
||||||
|
/* THIS WAS CAUSING DEADLOCK IN A CLUSTER
|
||||||
if (delegate == null) return;
|
if (delegate == null) return;
|
||||||
List<String> locks = new LinkedList<>();
|
List<String> locks = new LinkedList<>();
|
||||||
locks.addAll(invalidations);
|
locks.addAll(invalidations);
|
||||||
|
|
||||||
Collections.sort(locks); // lock ordering
|
Collections.sort(locks); // lock ordering
|
||||||
cache.getRevisions().startBatch();
|
cache.getRevisions().startBatch();
|
||||||
//if (!locks.isEmpty()) cache.getRevisions().getAdvancedCache().lock(locks);
|
|
||||||
|
if (!locks.isEmpty()) cache.getRevisions().getAdvancedCache().lock(locks);
|
||||||
|
*/
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidat
|
||||||
import org.infinispan.notifications.cachelistener.event.CacheEntriesEvictedEvent;
|
import org.infinispan.notifications.cachelistener.event.CacheEntriesEvictedEvent;
|
||||||
import org.infinispan.notifications.cachelistener.event.CacheEntryInvalidatedEvent;
|
import org.infinispan.notifications.cachelistener.event.CacheEntryInvalidatedEvent;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
import org.keycloak.models.cache.infinispan.entities.AbstractRevisioned;
|
||||||
import org.keycloak.models.cache.infinispan.entities.CachedClient;
|
import org.keycloak.models.cache.infinispan.entities.CachedClient;
|
||||||
import org.keycloak.models.cache.infinispan.entities.CachedClientTemplate;
|
import org.keycloak.models.cache.infinispan.entities.CachedClientTemplate;
|
||||||
import org.keycloak.models.cache.infinispan.entities.CachedGroup;
|
import org.keycloak.models.cache.infinispan.entities.CachedGroup;
|
||||||
|
@ -72,7 +73,11 @@ public class StreamRealmCache {
|
||||||
|
|
||||||
public Long getCurrentRevision(String id) {
|
public Long getCurrentRevision(String id) {
|
||||||
Long revision = revisions.get(id);
|
Long revision = revisions.get(id);
|
||||||
if (revision == null) return UpdateCounter.current();
|
if (revision == null) revision = UpdateCounter.current();
|
||||||
|
// if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event
|
||||||
|
// so, we do this to force this.
|
||||||
|
String invalidationKey = "invalidation.key" + id;
|
||||||
|
cache.putForExternalRead(invalidationKey, new AbstractRevisioned(-1L, invalidationKey));
|
||||||
return revision;
|
return revision;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,6 +109,9 @@ public class StreamRealmCache {
|
||||||
|
|
||||||
public Object invalidateObject(String id) {
|
public Object invalidateObject(String id) {
|
||||||
Revisioned removed = (Revisioned)cache.remove(id);
|
Revisioned removed = (Revisioned)cache.remove(id);
|
||||||
|
// if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event
|
||||||
|
// so, we do this to force the event.
|
||||||
|
cache.remove("invalidation.key" + id);
|
||||||
bumpVersion(id);
|
bumpVersion(id);
|
||||||
return removed;
|
return removed;
|
||||||
}
|
}
|
||||||
|
@ -261,11 +269,32 @@ public class StreamRealmCache {
|
||||||
|
|
||||||
@CacheEntryInvalidated
|
@CacheEntryInvalidated
|
||||||
public void cacheInvalidated(CacheEntryInvalidatedEvent<String, Object> event) {
|
public void cacheInvalidated(CacheEntryInvalidatedEvent<String, Object> event) {
|
||||||
if (!event.isPre()) {
|
if (event.isPre()) {
|
||||||
bumpVersion(event.getKey());
|
String key = event.getKey();
|
||||||
|
if (key.startsWith("invalidation.key")) {
|
||||||
|
// if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event
|
||||||
|
// so, we do this to force this.
|
||||||
|
String bump = key.substring("invalidation.key".length());
|
||||||
|
logger.tracev("bumping invalidation key {0}", bump);
|
||||||
|
bumpVersion(bump);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
//if (!event.isPre()) {
|
||||||
|
String key = event.getKey();
|
||||||
|
if (key.startsWith("invalidation.key")) {
|
||||||
|
// if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event
|
||||||
|
// so, we do this to force this.
|
||||||
|
String bump = key.substring("invalidation.key".length());
|
||||||
|
bumpVersion(bump);
|
||||||
|
logger.tracev("bumping invalidation key {0}", bump);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
bumpVersion(key);
|
||||||
Object object = event.getValue();
|
Object object = event.getValue();
|
||||||
if (object != null) {
|
if (object != null) {
|
||||||
bumpVersion(event.getKey());
|
bumpVersion(key);
|
||||||
Predicate<Map.Entry<String, Revisioned>> predicate = getInvalidationPredicate(object);
|
Predicate<Map.Entry<String, Revisioned>> predicate = getInvalidationPredicate(object);
|
||||||
if (predicate != null) runEvictions(predicate);
|
if (predicate != null) runEvictions(predicate);
|
||||||
logger.tracev("invalidating: {0}" + object.getClass().getName());
|
logger.tracev("invalidating: {0}" + object.getClass().getName());
|
||||||
|
|
|
@ -55,8 +55,8 @@ public class ClusteredConcurrencyTest {
|
||||||
|
|
||||||
private static final Logger log = Logger.getLogger(ClusteredConcurrencyTest.class);
|
private static final Logger log = Logger.getLogger(ClusteredConcurrencyTest.class);
|
||||||
|
|
||||||
private static final int DEFAULT_THREADS = 5;
|
private static final int DEFAULT_THREADS = 10;
|
||||||
private static final int DEFAULT_ITERATIONS = 20;
|
private static final int DEFAULT_ITERATIONS = 100;
|
||||||
|
|
||||||
// If enabled only one request is allowed at the time. Useful for checking that test is working.
|
// If enabled only one request is allowed at the time. Useful for checking that test is working.
|
||||||
private static final boolean SYNCHRONIZED = false;
|
private static final boolean SYNCHRONIZED = false;
|
||||||
|
|
|
@ -48,8 +48,8 @@ public class ConcurrencyTest extends AbstractClientTest {
|
||||||
|
|
||||||
private static final Logger log = Logger.getLogger(ConcurrencyTest.class);
|
private static final Logger log = Logger.getLogger(ConcurrencyTest.class);
|
||||||
|
|
||||||
private static final int DEFAULT_THREADS = 5;
|
private static final int DEFAULT_THREADS = 10;
|
||||||
private static final int DEFAULT_ITERATIONS = 20;
|
private static final int DEFAULT_ITERATIONS = 100;
|
||||||
|
|
||||||
// If enabled only one request is allowed at the time. Useful for checking that test is working.
|
// If enabled only one request is allowed at the time. Useful for checking that test is working.
|
||||||
private static final boolean SYNCHRONIZED = false;
|
private static final boolean SYNCHRONIZED = false;
|
||||||
|
|
Loading…
Reference in a new issue