From abddbfb3a4dfe0e5c10088a6108e56297bf5e81e Mon Sep 17 00:00:00 2001 From: Bill Burke Date: Fri, 26 Feb 2016 15:39:22 -0500 Subject: [PATCH 1/2] clustered testing --- core/src/main/java/org/keycloak/Config.java | 6 +- .../infinispan/StreamCacheRealmProvider.java | 5 +- .../cache/infinispan/StreamRealmCache.java | 21 +- .../ClusteredCacheBehaviorTest.java | 9 + .../admin/ClusteredConcurrencyTest.java | 212 ++++++++++++++++++ .../testsuite/admin/ConcurrencyTest.java | 34 +++ 6 files changed, 279 insertions(+), 8 deletions(-) create mode 100755 testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java diff --git a/core/src/main/java/org/keycloak/Config.java b/core/src/main/java/org/keycloak/Config.java index f619393c66..06240065a5 100755 --- a/core/src/main/java/org/keycloak/Config.java +++ b/core/src/main/java/org/keycloak/Config.java @@ -135,7 +135,11 @@ public class Config { @Override public Boolean getBoolean(String key, Boolean defaultValue) { String v = get(key, null); - return v != null ? Boolean.parseBoolean(v) : defaultValue; + if (v != null) { + return Boolean.parseBoolean(v); + } else { + return defaultValue; + } } @Override diff --git a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java index 2432cc6965..a6a14c0f6e 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java @@ -198,10 +198,7 @@ public class StreamCacheRealmProvider implements CacheRealmProvider { Collections.sort(locks); // lock ordering cache.getRevisions().startBatch(); - //if (!invalidates.isEmpty()) cache.getRevisions().getAdvancedCache().lock(invalidates); - for (String lock : locks) { - boolean success = cache.getRevisions().getAdvancedCache().lock(lock); - } + //if (!locks.isEmpty()) cache.getRevisions().getAdvancedCache().lock(locks); } diff --git a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java index 452eee489d..29fc43c03f 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java +++ b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java @@ -104,9 +104,13 @@ public class StreamRealmCache { public Object invalidateObject(String id) { Revisioned removed = (Revisioned)cache.remove(id); + bumpVersion(id); + return removed; + } + + protected void bumpVersion(String id) { long next = UpdateCounter.next(); Object rev = revisions.put(id, next); - return removed; } public void addRevisioned(Revisioned object) { @@ -258,10 +262,13 @@ public class StreamRealmCache { @CacheEntryInvalidated public void cacheInvalidated(CacheEntryInvalidatedEvent event) { if (!event.isPre()) { + bumpVersion(event.getKey()); Object object = event.getValue(); if (object != null) { + bumpVersion(event.getKey()); Predicate> predicate = getInvalidationPredicate(object); if (predicate != null) runEvictions(predicate); + logger.tracev("invalidating: {0}" + object.getClass().getName()); } } } @@ -269,7 +276,11 @@ public class StreamRealmCache { @CacheEntriesEvicted public void cacheEvicted(CacheEntriesEvictedEvent event) { if (!event.isPre()) - for (Object object : event.getEntries().values()) { + for (Map.Entry entry : event.getEntries().entrySet()) { + Object object = entry.getValue(); + bumpVersion(entry.getKey()); + if (object == null) continue; + logger.tracev("evicting: {0}" + object.getClass().getName()); Predicate> predicate = getInvalidationPredicate(object); if (predicate != null) runEvictions(predicate); } @@ -278,7 +289,11 @@ public class StreamRealmCache { public void runEvictions(Predicate> current) { Set evictions = new HashSet<>(); addInvalidations(current, evictions); - for (String key : evictions) cache.evict(key); + logger.tracev("running evictions size: {0}", evictions.size()); + for (String key : evictions) { + cache.evict(key); + bumpVersion(key); + } } protected Predicate> getInvalidationPredicate(Object object) { diff --git a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/ClusteredCacheBehaviorTest.java b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/ClusteredCacheBehaviorTest.java index bc8b4a3fdd..3dcc913f9b 100755 --- a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/ClusteredCacheBehaviorTest.java +++ b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/ClusteredCacheBehaviorTest.java @@ -9,9 +9,11 @@ import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.notifications.Listener; import org.infinispan.notifications.cachelistener.annotation.CacheEntriesEvicted; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated; import org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidated; import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved; import org.infinispan.notifications.cachelistener.event.CacheEntriesEvictedEvent; +import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent; import org.infinispan.notifications.cachelistener.event.CacheEntryInvalidatedEvent; import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent; import org.junit.Ignore; @@ -61,6 +63,11 @@ public class ClusteredCacheBehaviorTest { } + @CacheEntryCreated + public void created(CacheEntryCreatedEvent event) { + + System.out.println("Listener '" + name + "' entry created " + event.getKey() + " isPre: " + event.isPre()); + } @CacheEntryRemoved public void removed(CacheEntryRemovedEvent event) { @@ -91,6 +98,8 @@ public class ClusteredCacheBehaviorTest { System.out.println("node1 create entry"); node1Cache.put("key", "node1"); + System.out.println("node1 create entry"); + node1Cache.put("key", "node111"); System.out.println("node2 create entry"); node2Cache.put("key", "node2"); System.out.println("node1 remove entry"); diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java new file mode 100755 index 0000000000..a98121aac6 --- /dev/null +++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java @@ -0,0 +1,212 @@ +/* + * Copyright 2016 Red Hat, Inc. and/or its affiliates + * and other contributors as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.keycloak.testsuite.admin; + +import org.jboss.logging.Logger; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.keycloak.admin.client.Keycloak; +import org.keycloak.admin.client.resource.ClientResource; +import org.keycloak.admin.client.resource.RealmResource; +import org.keycloak.models.ClientModel; +import org.keycloak.models.KeycloakSession; +import org.keycloak.models.KeycloakSessionFactory; +import org.keycloak.models.RealmModel; +import org.keycloak.representations.idm.ClientRepresentation; +import org.keycloak.representations.idm.GroupRepresentation; +import org.keycloak.representations.idm.RoleRepresentation; +import org.keycloak.services.DefaultKeycloakSessionFactory; +import org.keycloak.services.resources.KeycloakApplication; + +import javax.ws.rs.NotFoundException; +import javax.ws.rs.core.Response; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +/** + * @author Stian Thorgersen + */ +@Ignore +public class ClusteredConcurrencyTest { + + private static final Logger log = Logger.getLogger(ClusteredConcurrencyTest.class); + + private static final int DEFAULT_THREADS = 5; + private static final int DEFAULT_ITERATIONS = 20; + + // If enabled only one request is allowed at the time. Useful for checking that test is working. + private static final boolean SYNCHRONIZED = false; + + boolean passedCreateClient = false; + boolean passedCreateRole = false; + + public static DefaultKeycloakSessionFactory node1factory; + public static DefaultKeycloakSessionFactory node2factory; + public static DefaultKeycloakSessionFactory[] nodes = new DefaultKeycloakSessionFactory[2]; + + @BeforeClass + public static void initKeycloak() throws Exception { + System.setProperty("keycloak.connectionsInfinispan.clustered", "true"); + System.setProperty("keycloak.connectionsInfinispan.async", "false"); + KeycloakApplication.loadConfig(); + node1factory = new DefaultKeycloakSessionFactory(); + node1factory.init(); + nodes[0] = node1factory; + node2factory = new DefaultKeycloakSessionFactory(); + node2factory.init(); + nodes[1] = node2factory; + + KeycloakSession session = nodes[0].create(); + session.getTransaction().begin(); + session.realms().createRealm("testrealm"); + session.getTransaction().commit(); + + session = nodes[1].create(); + session.getTransaction().begin(); + RealmModel realm = session.realms().getRealmByName("testrealm"); + Assert.assertNotNull(realm); + session.getTransaction().commit(); + + } + + @Test + public void createClient() throws Throwable { + System.out.println("***************************"); + long start = System.currentTimeMillis(); + run(new KeycloakRunnable() { + @Override + public void run(int threadNum, int iterationNum) { + String name = "c-" + threadNum + "-" + iterationNum; + int node1 = threadNum % 2; + int node2 = 0; + if (node1 == 0) node2 = 1; + + String id = null; + { + KeycloakSession session = nodes[node1].create(); + session.getTransaction().begin(); + RealmModel realm = session.realms().getRealmByName("testrealm"); + ClientModel client = realm.addClient(name); + id = client.getId(); + session.getTransaction().commit(); + } + { + KeycloakSession session = nodes[node2].create(); + session.getTransaction().begin(); + RealmModel realm = session.realms().getRealmByName("testrealm"); + boolean found = false; + for (ClientModel client : realm.getClients()) { + if (client.getId().equals(id)) { + found = true; + } + } + session.getTransaction().commit(); + if (!found) { + fail("Client " + name + " not found in client list"); + } + } + { + KeycloakSession session = nodes[node1].create(); + session.getTransaction().begin(); + RealmModel realm = session.realms().getRealmByName("testrealm"); + boolean found = false; + for (ClientModel client : realm.getClients()) { + if (client.getId().equals(id)) { + found = true; + } + } + session.getTransaction().commit(); + if (!found) { + fail("Client " + name + " not found in client list"); + } + } + } + }); + long end = System.currentTimeMillis() - start; + System.out.println("createClient took " + end); + + } + + private void run(final KeycloakRunnable runnable) throws Throwable { + run(runnable, DEFAULT_THREADS, DEFAULT_ITERATIONS); + } + + private void run(final KeycloakRunnable runnable, final int numThreads, final int numIterationsPerThread) throws Throwable { + final CountDownLatch latch = new CountDownLatch(numThreads); + final AtomicReference failed = new AtomicReference(); + final List threads = new LinkedList<>(); + final Lock lock = SYNCHRONIZED ? new ReentrantLock() : null; + + for (int t = 0; t < numThreads; t++) { + final int threadNum = t; + Thread thread = new Thread() { + @Override + public void run() { + try { + if (lock != null) { + lock.lock(); + } + + for (int i = 0; i < numIterationsPerThread && latch.getCount() > 0; i++) { + log.infov("thread {0}, iteration {1}", threadNum, i); + runnable.run(threadNum, i); + } + latch.countDown(); + } catch (Throwable t) { + failed.compareAndSet(null, t); + while (latch.getCount() > 0) { + latch.countDown(); + } + } finally { + if (lock != null) { + lock.unlock(); + } + } + } + }; + thread.start(); + threads.add(thread); + } + + latch.await(); + + for (Thread t : threads) { + t.join(); + } + + if (failed.get() != null) { + throw failed.get(); + } + } + + interface KeycloakRunnable { + + void run(int threadNum, int iterationNum); + + } + +} diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java index 9adebd3fc2..daaf5a7cdd 100755 --- a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java +++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java @@ -25,6 +25,7 @@ import org.keycloak.admin.client.Keycloak; import org.keycloak.admin.client.resource.ClientResource; import org.keycloak.admin.client.resource.RealmResource; import org.keycloak.representations.idm.ClientRepresentation; +import org.keycloak.representations.idm.GroupRepresentation; import org.keycloak.representations.idm.RoleRepresentation; import javax.ws.rs.NotFoundException; @@ -122,6 +123,39 @@ public class ConcurrencyTest extends AbstractClientTest { } + @Test + public void createGroup() throws Throwable { + System.out.println("***************************"); + long start = System.currentTimeMillis(); + run(new KeycloakRunnable() { + @Override + public void run(Keycloak keycloak, RealmResource realm, int threadNum, int iterationNum) { + String name = "c-" + threadNum + "-" + iterationNum; + GroupRepresentation c = new GroupRepresentation(); + c.setName(name); + Response response = realm.groups().add(c); + String id = ApiUtil.getCreatedId(response); + response.close(); + + c = realm.groups().group(id).toRepresentation(); + assertNotNull(c); + boolean found = false; + for (GroupRepresentation r : realm.groups().groups()) { + if (r.getName().equals(name)) { + found = true; + break; + } + } + if (!found) { + fail("Group " + name + " not found in group list"); + } + } + }); + long end = System.currentTimeMillis() - start; + System.out.println("createGroup took " + end); + + } + @Test @Ignore public void createRemoveClient() throws Throwable { From a0696fcb973823162ca428b2e7f883dd824d61e3 Mon Sep 17 00:00:00 2001 From: Bill Burke Date: Fri, 26 Feb 2016 16:45:58 -0500 Subject: [PATCH 2/2] clustered testing --- .../infinispan/StreamCacheRealmProvider.java | 9 ++++- .../cache/infinispan/StreamRealmCache.java | 37 +++++++++++++++++-- .../admin/ClusteredConcurrencyTest.java | 4 +- .../testsuite/admin/ConcurrencyTest.java | 4 +- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java index a6a14c0f6e..e53f510914 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java +++ b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamCacheRealmProvider.java @@ -93,7 +93,9 @@ import java.util.Set; * - roles are tricky because of composites. Composite lists are cached too. So, when a role is removed * we also iterate and invalidate any role or group that contains that role being removed. * - * + * - Clustering gotchyas. With an invalidation cache, if you remove an entry on node 1 and this entry does not exist on node 2, node 2 will not receive a @Listener invalidation event. + * so, hat we have to put a marker entry in the invalidation cache before we read from the DB, so if the DB changes in between reading and adding a cache entry, the cache will be notified and bump + * the version information. * * - any relationship should be resolved from session.realms(). For example if JPA.getClientByClientId() is invoked, * JPA should find the id of the client and then call session.realms().getClientById(). THis is to ensure that the cached @@ -192,13 +194,16 @@ public class StreamCacheRealmProvider implements CacheRealmProvider { @Override public void commit() { + /* THIS WAS CAUSING DEADLOCK IN A CLUSTER if (delegate == null) return; List locks = new LinkedList<>(); locks.addAll(invalidations); Collections.sort(locks); // lock ordering cache.getRevisions().startBatch(); - //if (!locks.isEmpty()) cache.getRevisions().getAdvancedCache().lock(locks); + + if (!locks.isEmpty()) cache.getRevisions().getAdvancedCache().lock(locks); + */ } diff --git a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java index 29fc43c03f..7ca6b7ac7c 100755 --- a/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java +++ b/model/infinispan/src/main/java/org/keycloak/models/cache/infinispan/StreamRealmCache.java @@ -24,6 +24,7 @@ import org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidat import org.infinispan.notifications.cachelistener.event.CacheEntriesEvictedEvent; import org.infinispan.notifications.cachelistener.event.CacheEntryInvalidatedEvent; import org.jboss.logging.Logger; +import org.keycloak.models.cache.infinispan.entities.AbstractRevisioned; import org.keycloak.models.cache.infinispan.entities.CachedClient; import org.keycloak.models.cache.infinispan.entities.CachedClientTemplate; import org.keycloak.models.cache.infinispan.entities.CachedGroup; @@ -72,7 +73,11 @@ public class StreamRealmCache { public Long getCurrentRevision(String id) { Long revision = revisions.get(id); - if (revision == null) return UpdateCounter.current(); + if (revision == null) revision = UpdateCounter.current(); + // if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event + // so, we do this to force this. + String invalidationKey = "invalidation.key" + id; + cache.putForExternalRead(invalidationKey, new AbstractRevisioned(-1L, invalidationKey)); return revision; } @@ -104,6 +109,9 @@ public class StreamRealmCache { public Object invalidateObject(String id) { Revisioned removed = (Revisioned)cache.remove(id); + // if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event + // so, we do this to force the event. + cache.remove("invalidation.key" + id); bumpVersion(id); return removed; } @@ -261,11 +269,32 @@ public class StreamRealmCache { @CacheEntryInvalidated public void cacheInvalidated(CacheEntryInvalidatedEvent event) { - if (!event.isPre()) { - bumpVersion(event.getKey()); + if (event.isPre()) { + String key = event.getKey(); + if (key.startsWith("invalidation.key")) { + // if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event + // so, we do this to force this. + String bump = key.substring("invalidation.key".length()); + logger.tracev("bumping invalidation key {0}", bump); + bumpVersion(bump); + return; + } + + } else { + //if (!event.isPre()) { + String key = event.getKey(); + if (key.startsWith("invalidation.key")) { + // if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event + // so, we do this to force this. + String bump = key.substring("invalidation.key".length()); + bumpVersion(bump); + logger.tracev("bumping invalidation key {0}", bump); + return; + } + bumpVersion(key); Object object = event.getValue(); if (object != null) { - bumpVersion(event.getKey()); + bumpVersion(key); Predicate> predicate = getInvalidationPredicate(object); if (predicate != null) runEvictions(predicate); logger.tracev("invalidating: {0}" + object.getClass().getName()); diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java index a98121aac6..7073797cf0 100755 --- a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java +++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ClusteredConcurrencyTest.java @@ -55,8 +55,8 @@ public class ClusteredConcurrencyTest { private static final Logger log = Logger.getLogger(ClusteredConcurrencyTest.class); - private static final int DEFAULT_THREADS = 5; - private static final int DEFAULT_ITERATIONS = 20; + private static final int DEFAULT_THREADS = 10; + private static final int DEFAULT_ITERATIONS = 100; // If enabled only one request is allowed at the time. Useful for checking that test is working. private static final boolean SYNCHRONIZED = false; diff --git a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java index daaf5a7cdd..d47e78f00b 100755 --- a/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java +++ b/testsuite/integration/src/test/java/org/keycloak/testsuite/admin/ConcurrencyTest.java @@ -48,8 +48,8 @@ public class ConcurrencyTest extends AbstractClientTest { private static final Logger log = Logger.getLogger(ConcurrencyTest.class); - private static final int DEFAULT_THREADS = 5; - private static final int DEFAULT_ITERATIONS = 20; + private static final int DEFAULT_THREADS = 10; + private static final int DEFAULT_ITERATIONS = 100; // If enabled only one request is allowed at the time. Useful for checking that test is working. private static final boolean SYNCHRONIZED = false;