KEYCLOAK-7489 Replace failed for entity repeats infinitely

This commit is contained in:
mposolda 2018-07-03 09:50:45 +02:00 committed by Marek Posolda
parent 8c66f520af
commit 40d129cf54
4 changed files with 84 additions and 32 deletions

View file

@ -30,6 +30,7 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.sessions.infinispan.CacheDecorators;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheInvoker;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@ -205,9 +206,12 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity) {
boolean replaced = false;
int iteration = 0;
V session = oldVersionEntity.getEntity();
while (!replaced) {
while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
iteration++;
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersionEntity.getLocalMetadata());
// Atomic cluster-aware replace
@ -236,6 +240,10 @@ public class InfinispanChangelogBasedTransaction<K, V extends SessionEntity> ext
}
}
if (!replaced) {
logger.warnf("Failed to replace entity '%s' in cache '%s'", key, cache.getName());
}
}

View file

@ -34,6 +34,7 @@ import org.keycloak.models.RealmModel;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.changes.SessionUpdateTask;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
@ -132,7 +133,11 @@ public class RemoteCacheInvoker {
private <K, V extends SessionEntity> void replace(RemoteCache<K, SessionEntityWrapper<V>> remoteCache, long lifespanMs, long maxIdleMs, K key, SessionUpdateTask<V> task) {
boolean replaced = false;
while (!replaced) {
int iteration = 0;
while (!replaced && iteration < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
iteration++;
VersionedValue<SessionEntityWrapper<V>> versioned = remoteCache.getVersioned(key);
if (versioned == null) {
logger.warnf("Not found entity to replace for key '%s'", key);
@ -159,6 +164,10 @@ public class RemoteCacheInvoker {
}
}
}
if (!replaced) {
logger.warnf("Failed to replace entity '%s' in remote cache '%s'", key, remoteCache.getName());
}
}

View file

@ -34,6 +34,8 @@ import org.keycloak.models.KeycloakSession;
*/
public class InfinispanUtil {
public static final int MAXIMUM_REPLACE_RETRIES = 25;
// See if we have RemoteStore (external JDG) configured for cross-Data-Center scenario
public static Set<RemoteStore> getRemoteStores(Cache ispnCache) {
return ispnCache.getAdvancedCache().getComponentRegistry().getComponent(PersistenceManager.class).getStores(RemoteStore.class);

View file

@ -27,6 +27,7 @@ import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.context.Flag;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
@ -77,40 +78,60 @@ public class DistributedCacheConcurrentWritesTest {
clientSession.setTimestamp(1234);
session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId());
cache1.put("123", session);
try {
for (int i = 0; i < 10; i++) {
testConcurrentReplaceWithRemove("key-" + i, session, cache1, cache2);
}
} finally {
// Kill JVM
cache1.getCache().stop();
cache2.getCache().stop();
cache1.getCache().getCacheManager().stop();
cache2.getCache().getCacheManager().stop();
System.out.println("Managers killed");
}
}
// Reproducer for KEYCLOAK-7443 and KEYCLOAK-7489. The infinite loop can happen if cache.replace(key, old, new) is called and entity was removed on one cluster node in the meantime
private static void testConcurrentReplaceWithRemove(String key, UserSessionEntity session, CacheWrapper<String, UserSessionEntity> cache1,
CacheWrapper<String, UserSessionEntity> cache2) throws InterruptedException {
cache1.put(key, session);
// Create 2 workers for concurrent write and start them
Worker worker1 = new Worker(1, cache1);
Worker worker2 = new Worker(2, cache2);
Worker worker1 = new Worker(1, cache1, key);
Worker worker2 = new Worker(2, cache2, key);
long start = System.currentTimeMillis();
System.out.println("Started clustering test");
System.out.println("Started clustering test for key " + key);
worker1.start();
//worker1.join();
worker2.start();
Thread.sleep(1000);
// Try to remove the entity after some sleep time.
cache1.wrappedCache.getAdvancedCache()
.withFlags(Flag.CACHE_MODE_LOCAL)
.remove(key);
worker1.join();
worker2.join();
long took = System.currentTimeMillis() - start;
session = cache1.get("123").getEntity();
System.out.println("Took: " + took + " ms. Notes count: " + session.getNotes().size() + ", failedReplaceCounter: " + failedReplaceCounter.get()
+ ", failedReplaceCounter2: " + failedReplaceCounter2.get());
// JGroups statistics
JChannel channel = (JChannel)((JGroupsTransport)cache1.wrappedCache.getAdvancedCache().getRpcManager().getTransport()).getChannel();
System.out.println("Sent MB: " + channel.getSentBytes() / 1000000 + ", sent messages: " + channel.getSentMessages() + ", received MB: " + channel.getReceivedBytes() / 1000000 +
", received messages: " + channel.getReceivedMessages());
System.out.println("Test finished for key '" + key + "'. Took: " + took + " ms");
// Kill JVM
cache1.getCache().stop();
cache2.getCache().stop();
cache1.getCache().getCacheManager().stop();
cache2.getCache().getCacheManager().stop();
// System.out.println("Took: " + took + " ms for key . Notes count: " + session.getNotes().size() + ", failedReplaceCounter: " + failedReplaceCounter.get()
// + ", failedReplaceCounter2: " + failedReplaceCounter2.get());
System.out.println("Managers killed");
// // JGroups statistics
// JChannel channel = (JChannel)((JGroupsTransport)cache1.wrappedCache.getAdvancedCache().getRpcManager().getTransport()).getChannel();
// System.out.println("Sent MB: " + channel.getSentBytes() / 1000000 + ", sent messages: " + channel.getSentMessages() + ", received MB: " + channel.getReceivedBytes() / 1000000 +
// ", received messages: " + channel.getReceivedMessages());
}
@ -118,10 +139,13 @@ public class DistributedCacheConcurrentWritesTest {
private final CacheWrapper<String, UserSessionEntity> cache;
private final int threadId;
private final String key;
public Worker(int threadId, CacheWrapper<String, UserSessionEntity> cache) {
public Worker(int threadId, CacheWrapper<String, UserSessionEntity> cache, String key) {
this.threadId = threadId;
this.cache = cache;
this.key = key;
setName("th-" + key + "-" + threadId);
}
@Override
@ -131,16 +155,25 @@ public class DistributedCacheConcurrentWritesTest {
String noteKey = "n-" + threadId + "-" + i;
boolean replaced = false;
while (!replaced) {
SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get("123");
UserSessionEntity oldSession = oldWrapped.getEntity();
//UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession);
UserSessionEntity clone = oldSession;
// This code can be used to reproduce infinite loop ( KEYCLOAK-7443 )
// boolean replaced = false;
// while (!replaced) {
// SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get(key);
// oldWrapped.getEntity().getNotes().put(noteKey, "someVal");
// replaced = cacheReplace(oldWrapped, oldWrapped.getEntity());
// }
clone.getNotes().put(noteKey, "someVal");
//cache.replace("123", clone);
replaced = cacheReplace(oldWrapped, clone);
int count = 0;
boolean replaced = false;
while (!replaced && count < 25) {
count++;
SessionEntityWrapper<UserSessionEntity> oldWrapped = cache.get(key);
oldWrapped.getEntity().getNotes().put(noteKey, "someVal");
replaced = cacheReplace(oldWrapped, oldWrapped.getEntity());
}
if (!replaced) {
System.err.println("FAILED TO REPLACE ENTITY: " + key);
return;
}
}
@ -148,8 +181,8 @@ public class DistributedCacheConcurrentWritesTest {
private boolean cacheReplace(SessionEntityWrapper<UserSessionEntity> oldSession, UserSessionEntity newSession) {
try {
boolean replaced = cache.replace("123", oldSession, newSession);
//cache.replace("123", newSession);
boolean replaced = cache.replace(key, oldSession, newSession);
//cache.replace(key, newSession);
if (!replaced) {
failedReplaceCounter.incrementAndGet();
//return false;
@ -239,7 +272,7 @@ public class DistributedCacheConcurrentWritesTest {
ConfigurationBuilder distConfigBuilder = new ConfigurationBuilder();
if (clustered) {
distConfigBuilder.clustering().cacheMode(async ? CacheMode.DIST_ASYNC : CacheMode.DIST_SYNC);
distConfigBuilder.clustering().hash().numOwners(1);
distConfigBuilder.clustering().hash().numOwners(2);
// Disable L1 cache
distConfigBuilder.clustering().hash().l1().enabled(false);