Merge pull request #2288 from patriot1burke/master

clustered cache fixes
This commit is contained in:
Bill Burke 2016-02-26 18:22:38 -05:00
commit 1d8f3d8702
6 changed files with 318 additions and 13 deletions

View file

@ -135,7 +135,11 @@ public class Config {
@Override
public Boolean getBoolean(String key, Boolean defaultValue) {
String v = get(key, null);
return v != null ? Boolean.parseBoolean(v) : defaultValue;
if (v != null) {
return Boolean.parseBoolean(v);
} else {
return defaultValue;
}
}
@Override

View file

@ -93,7 +93,9 @@ import java.util.Set;
* - roles are tricky because of composites. Composite lists are cached too. So, when a role is removed
* we also iterate and invalidate any role or group that contains that role being removed.
*
*
* - Clustering gotchyas. With an invalidation cache, if you remove an entry on node 1 and this entry does not exist on node 2, node 2 will not receive a @Listener invalidation event.
* so, hat we have to put a marker entry in the invalidation cache before we read from the DB, so if the DB changes in between reading and adding a cache entry, the cache will be notified and bump
* the version information.
*
* - any relationship should be resolved from session.realms(). For example if JPA.getClientByClientId() is invoked,
* JPA should find the id of the client and then call session.realms().getClientById(). THis is to ensure that the cached
@ -192,16 +194,16 @@ public class StreamCacheRealmProvider implements CacheRealmProvider {
@Override
public void commit() {
/* THIS WAS CAUSING DEADLOCK IN A CLUSTER
if (delegate == null) return;
List<String> locks = new LinkedList<>();
locks.addAll(invalidations);
Collections.sort(locks); // lock ordering
cache.getRevisions().startBatch();
//if (!invalidates.isEmpty()) cache.getRevisions().getAdvancedCache().lock(invalidates);
for (String lock : locks) {
boolean success = cache.getRevisions().getAdvancedCache().lock(lock);
}
if (!locks.isEmpty()) cache.getRevisions().getAdvancedCache().lock(locks);
*/
}

View file

@ -24,6 +24,7 @@ import org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidat
import org.infinispan.notifications.cachelistener.event.CacheEntriesEvictedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryInvalidatedEvent;
import org.jboss.logging.Logger;
import org.keycloak.models.cache.infinispan.entities.AbstractRevisioned;
import org.keycloak.models.cache.infinispan.entities.CachedClient;
import org.keycloak.models.cache.infinispan.entities.CachedClientTemplate;
import org.keycloak.models.cache.infinispan.entities.CachedGroup;
@ -72,7 +73,11 @@ public class StreamRealmCache {
public Long getCurrentRevision(String id) {
Long revision = revisions.get(id);
if (revision == null) return UpdateCounter.current();
if (revision == null) revision = UpdateCounter.current();
// if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event
// so, we do this to force this.
String invalidationKey = "invalidation.key" + id;
cache.putForExternalRead(invalidationKey, new AbstractRevisioned(-1L, invalidationKey));
return revision;
}
@ -104,9 +109,16 @@ public class StreamRealmCache {
public Object invalidateObject(String id) {
Revisioned removed = (Revisioned)cache.remove(id);
// if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event
// so, we do this to force the event.
cache.remove("invalidation.key" + id);
bumpVersion(id);
return removed;
}
protected void bumpVersion(String id) {
long next = UpdateCounter.next();
Object rev = revisions.put(id, next);
return removed;
}
public void addRevisioned(Revisioned object) {
@ -257,11 +269,35 @@ public class StreamRealmCache {
@CacheEntryInvalidated
public void cacheInvalidated(CacheEntryInvalidatedEvent<String, Object> event) {
if (!event.isPre()) {
if (event.isPre()) {
String key = event.getKey();
if (key.startsWith("invalidation.key")) {
// if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event
// so, we do this to force this.
String bump = key.substring("invalidation.key".length());
logger.tracev("bumping invalidation key {0}", bump);
bumpVersion(bump);
return;
}
} else {
//if (!event.isPre()) {
String key = event.getKey();
if (key.startsWith("invalidation.key")) {
// if you do cache.remove() on node 1 and the entry doesn't exist on node 2, node 2 never receives a invalidation event
// so, we do this to force this.
String bump = key.substring("invalidation.key".length());
bumpVersion(bump);
logger.tracev("bumping invalidation key {0}", bump);
return;
}
bumpVersion(key);
Object object = event.getValue();
if (object != null) {
bumpVersion(key);
Predicate<Map.Entry<String, Revisioned>> predicate = getInvalidationPredicate(object);
if (predicate != null) runEvictions(predicate);
logger.tracev("invalidating: {0}" + object.getClass().getName());
}
}
}
@ -269,7 +305,11 @@ public class StreamRealmCache {
@CacheEntriesEvicted
public void cacheEvicted(CacheEntriesEvictedEvent<String, Object> event) {
if (!event.isPre())
for (Object object : event.getEntries().values()) {
for (Map.Entry<String, Object> entry : event.getEntries().entrySet()) {
Object object = entry.getValue();
bumpVersion(entry.getKey());
if (object == null) continue;
logger.tracev("evicting: {0}" + object.getClass().getName());
Predicate<Map.Entry<String, Revisioned>> predicate = getInvalidationPredicate(object);
if (predicate != null) runEvictions(predicate);
}
@ -278,7 +318,11 @@ public class StreamRealmCache {
public void runEvictions(Predicate<Map.Entry<String, Revisioned>> current) {
Set<String> evictions = new HashSet<>();
addInvalidations(current, evictions);
for (String key : evictions) cache.evict(key);
logger.tracev("running evictions size: {0}", evictions.size());
for (String key : evictions) {
cache.evict(key);
bumpVersion(key);
}
}
protected Predicate<Map.Entry<String, Revisioned>> getInvalidationPredicate(Object object) {

View file

@ -9,9 +9,11 @@ import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntriesEvicted;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntriesEvictedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryInvalidatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import org.junit.Ignore;
@ -61,6 +63,11 @@ public class ClusteredCacheBehaviorTest {
}
@CacheEntryCreated
public void created(CacheEntryCreatedEvent event) {
System.out.println("Listener '" + name + "' entry created " + event.getKey() + " isPre: " + event.isPre());
}
@CacheEntryRemoved
public void removed(CacheEntryRemovedEvent<String, Object> event) {
@ -91,6 +98,8 @@ public class ClusteredCacheBehaviorTest {
System.out.println("node1 create entry");
node1Cache.put("key", "node1");
System.out.println("node1 create entry");
node1Cache.put("key", "node111");
System.out.println("node2 create entry");
node2Cache.put("key", "node2");
System.out.println("node1 remove entry");

View file

@ -0,0 +1,212 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.testsuite.admin;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.keycloak.admin.client.Keycloak;
import org.keycloak.admin.client.resource.ClientResource;
import org.keycloak.admin.client.resource.RealmResource;
import org.keycloak.models.ClientModel;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.RealmModel;
import org.keycloak.representations.idm.ClientRepresentation;
import org.keycloak.representations.idm.GroupRepresentation;
import org.keycloak.representations.idm.RoleRepresentation;
import org.keycloak.services.DefaultKeycloakSessionFactory;
import org.keycloak.services.resources.KeycloakApplication;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.Response;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
/**
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
*/
@Ignore
public class ClusteredConcurrencyTest {
private static final Logger log = Logger.getLogger(ClusteredConcurrencyTest.class);
private static final int DEFAULT_THREADS = 10;
private static final int DEFAULT_ITERATIONS = 100;
// If enabled only one request is allowed at the time. Useful for checking that test is working.
private static final boolean SYNCHRONIZED = false;
boolean passedCreateClient = false;
boolean passedCreateRole = false;
public static DefaultKeycloakSessionFactory node1factory;
public static DefaultKeycloakSessionFactory node2factory;
public static DefaultKeycloakSessionFactory[] nodes = new DefaultKeycloakSessionFactory[2];
@BeforeClass
public static void initKeycloak() throws Exception {
System.setProperty("keycloak.connectionsInfinispan.clustered", "true");
System.setProperty("keycloak.connectionsInfinispan.async", "false");
KeycloakApplication.loadConfig();
node1factory = new DefaultKeycloakSessionFactory();
node1factory.init();
nodes[0] = node1factory;
node2factory = new DefaultKeycloakSessionFactory();
node2factory.init();
nodes[1] = node2factory;
KeycloakSession session = nodes[0].create();
session.getTransaction().begin();
session.realms().createRealm("testrealm");
session.getTransaction().commit();
session = nodes[1].create();
session.getTransaction().begin();
RealmModel realm = session.realms().getRealmByName("testrealm");
Assert.assertNotNull(realm);
session.getTransaction().commit();
}
@Test
public void createClient() throws Throwable {
System.out.println("***************************");
long start = System.currentTimeMillis();
run(new KeycloakRunnable() {
@Override
public void run(int threadNum, int iterationNum) {
String name = "c-" + threadNum + "-" + iterationNum;
int node1 = threadNum % 2;
int node2 = 0;
if (node1 == 0) node2 = 1;
String id = null;
{
KeycloakSession session = nodes[node1].create();
session.getTransaction().begin();
RealmModel realm = session.realms().getRealmByName("testrealm");
ClientModel client = realm.addClient(name);
id = client.getId();
session.getTransaction().commit();
}
{
KeycloakSession session = nodes[node2].create();
session.getTransaction().begin();
RealmModel realm = session.realms().getRealmByName("testrealm");
boolean found = false;
for (ClientModel client : realm.getClients()) {
if (client.getId().equals(id)) {
found = true;
}
}
session.getTransaction().commit();
if (!found) {
fail("Client " + name + " not found in client list");
}
}
{
KeycloakSession session = nodes[node1].create();
session.getTransaction().begin();
RealmModel realm = session.realms().getRealmByName("testrealm");
boolean found = false;
for (ClientModel client : realm.getClients()) {
if (client.getId().equals(id)) {
found = true;
}
}
session.getTransaction().commit();
if (!found) {
fail("Client " + name + " not found in client list");
}
}
}
});
long end = System.currentTimeMillis() - start;
System.out.println("createClient took " + end);
}
private void run(final KeycloakRunnable runnable) throws Throwable {
run(runnable, DEFAULT_THREADS, DEFAULT_ITERATIONS);
}
private void run(final KeycloakRunnable runnable, final int numThreads, final int numIterationsPerThread) throws Throwable {
final CountDownLatch latch = new CountDownLatch(numThreads);
final AtomicReference<Throwable> failed = new AtomicReference();
final List<Thread> threads = new LinkedList<>();
final Lock lock = SYNCHRONIZED ? new ReentrantLock() : null;
for (int t = 0; t < numThreads; t++) {
final int threadNum = t;
Thread thread = new Thread() {
@Override
public void run() {
try {
if (lock != null) {
lock.lock();
}
for (int i = 0; i < numIterationsPerThread && latch.getCount() > 0; i++) {
log.infov("thread {0}, iteration {1}", threadNum, i);
runnable.run(threadNum, i);
}
latch.countDown();
} catch (Throwable t) {
failed.compareAndSet(null, t);
while (latch.getCount() > 0) {
latch.countDown();
}
} finally {
if (lock != null) {
lock.unlock();
}
}
}
};
thread.start();
threads.add(thread);
}
latch.await();
for (Thread t : threads) {
t.join();
}
if (failed.get() != null) {
throw failed.get();
}
}
interface KeycloakRunnable {
void run(int threadNum, int iterationNum);
}
}

View file

@ -25,6 +25,7 @@ import org.keycloak.admin.client.Keycloak;
import org.keycloak.admin.client.resource.ClientResource;
import org.keycloak.admin.client.resource.RealmResource;
import org.keycloak.representations.idm.ClientRepresentation;
import org.keycloak.representations.idm.GroupRepresentation;
import org.keycloak.representations.idm.RoleRepresentation;
import javax.ws.rs.NotFoundException;
@ -47,8 +48,8 @@ public class ConcurrencyTest extends AbstractClientTest {
private static final Logger log = Logger.getLogger(ConcurrencyTest.class);
private static final int DEFAULT_THREADS = 5;
private static final int DEFAULT_ITERATIONS = 20;
private static final int DEFAULT_THREADS = 10;
private static final int DEFAULT_ITERATIONS = 100;
// If enabled only one request is allowed at the time. Useful for checking that test is working.
private static final boolean SYNCHRONIZED = false;
@ -122,6 +123,39 @@ public class ConcurrencyTest extends AbstractClientTest {
}
@Test
public void createGroup() throws Throwable {
System.out.println("***************************");
long start = System.currentTimeMillis();
run(new KeycloakRunnable() {
@Override
public void run(Keycloak keycloak, RealmResource realm, int threadNum, int iterationNum) {
String name = "c-" + threadNum + "-" + iterationNum;
GroupRepresentation c = new GroupRepresentation();
c.setName(name);
Response response = realm.groups().add(c);
String id = ApiUtil.getCreatedId(response);
response.close();
c = realm.groups().group(id).toRepresentation();
assertNotNull(c);
boolean found = false;
for (GroupRepresentation r : realm.groups().groups()) {
if (r.getName().equals(name)) {
found = true;
break;
}
}
if (!found) {
fail("Group " + name + " not found in group list");
}
}
});
long end = System.currentTimeMillis() - start;
System.out.println("createGroup took " + end);
}
@Test
@Ignore
public void createRemoveClient() throws Throwable {