Allow concurrent remote cache operations (#25390)
Closes #25388 Signed-off-by: Alexander Schwartz <aschwart@redhat.com>
This commit is contained in:
parent
9afa5f86ec
commit
5b1b3ca11b
4 changed files with 57 additions and 44 deletions
|
@ -215,21 +215,13 @@ public class InfinispanClusterProviderFactory implements ClusterProviderFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debugf("Nodes %s removed from cluster. Removing tasks locked by this nodes", removedNodesAddresses.toString());
|
logger.debugf("Nodes %s removed from cluster. Removing tasks locked by this nodes", removedNodesAddresses.toString());
|
||||||
/*
|
DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(() -> {
|
||||||
workaround for Infinispan 12.1.7.Final to prevent a deadlock while
|
|
||||||
DefaultInfinispanConnectionProviderFactory is shutting down PersistenceManagerImpl
|
|
||||||
that acquires a writeLock and this removal that acquires a readLock.
|
|
||||||
First seen with https://issues.redhat.com/browse/ISPN-13664 and still occurs probably due to
|
|
||||||
https://issues.redhat.com/browse/ISPN-13666 in 13.0.10
|
|
||||||
Tracked in https://github.com/keycloak/keycloak/issues/9871
|
|
||||||
*/
|
|
||||||
synchronized (DefaultInfinispanConnectionProviderFactory.class) {
|
|
||||||
if (workCache.getStatus() == ComponentStatus.RUNNING) {
|
if (workCache.getStatus() == ComponentStatus.RUNNING) {
|
||||||
workCache.entrySet().removeIf(new LockEntryPredicate(removedNodesAddresses));
|
workCache.entrySet().removeIf(new LockEntryPredicate(removedNodesAddresses));
|
||||||
} else {
|
} else {
|
||||||
logger.warn("work cache is not running, ignoring event");
|
logger.warn("work cache is not running, ignoring event");
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
logger.error("caught exception in ViewChangeListener", t);
|
logger.error("caught exception in ViewChangeListener", t);
|
||||||
|
|
|
@ -158,17 +158,9 @@ public class InfinispanNotificationsManager {
|
||||||
// Add directly to remoteCache. Will notify remote listeners on all nodes in all DCs
|
// Add directly to remoteCache. Will notify remote listeners on all nodes in all DCs
|
||||||
Retry.executeWithBackoff((int iteration) -> {
|
Retry.executeWithBackoff((int iteration) -> {
|
||||||
try {
|
try {
|
||||||
/*
|
DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(() ->
|
||||||
workaround for Infinispan 12.1.7.Final to prevent a deadlock while
|
workRemoteCache.put(eventKey, wrappedEvent, 120, TimeUnit.SECONDS)
|
||||||
DefaultInfinispanConnectionProviderFactory is shutting down PersistenceManagerImpl
|
);
|
||||||
that acquires a writeLock and this put that acquires a readLock.
|
|
||||||
First seen with https://issues.redhat.com/browse/ISPN-13664 and still occurs probably due to
|
|
||||||
https://issues.redhat.com/browse/ISPN-13666 in 13.0.10
|
|
||||||
Tracked in https://github.com/keycloak/keycloak/issues/9871
|
|
||||||
*/
|
|
||||||
synchronized (DefaultInfinispanConnectionProviderFactory.class) {
|
|
||||||
workRemoteCache.put(eventKey, wrappedEvent, 120, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
} catch (HotRodClientException re) {
|
} catch (HotRodClientException re) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debugf(re, "Failed sending notification to remote cache '%s'. Key: '%s', iteration '%s'. Will try to retry the task",
|
logger.debugf(re, "Failed sending notification to remote cache '%s'. Key: '%s', iteration '%s'. Will try to retry the task",
|
||||||
|
@ -250,10 +242,9 @@ public class InfinispanNotificationsManager {
|
||||||
https://issues.redhat.com/browse/ISPN-13666 in 13.0.10
|
https://issues.redhat.com/browse/ISPN-13666 in 13.0.10
|
||||||
Tracked in https://github.com/keycloak/keycloak/issues/9871
|
Tracked in https://github.com/keycloak/keycloak/issues/9871
|
||||||
*/
|
*/
|
||||||
Object value;
|
Object value = DefaultInfinispanConnectionProviderFactory.runWithReadLockOnCacheManager(() ->
|
||||||
synchronized (DefaultInfinispanConnectionProviderFactory.class) {
|
remoteCache.get(key)
|
||||||
value = remoteCache.get(key);
|
);
|
||||||
}
|
|
||||||
eventReceived(key, (Serializable) value);
|
eventReceived(key, (Serializable) value);
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
|
|
||||||
package org.keycloak.connections.infinispan;
|
package org.keycloak.connections.infinispan;
|
||||||
|
|
||||||
import org.infinispan.Cache;
|
|
||||||
import org.infinispan.client.hotrod.ProtocolVersion;
|
import org.infinispan.client.hotrod.ProtocolVersion;
|
||||||
import org.infinispan.commons.dataconversion.MediaType;
|
import org.infinispan.commons.dataconversion.MediaType;
|
||||||
import org.infinispan.configuration.cache.CacheMode;
|
import org.infinispan.configuration.cache.CacheMode;
|
||||||
|
@ -29,7 +28,6 @@ import org.infinispan.eviction.EvictionType;
|
||||||
import org.infinispan.jboss.marshalling.core.JBossUserMarshaller;
|
import org.infinispan.jboss.marshalling.core.JBossUserMarshaller;
|
||||||
import org.infinispan.manager.DefaultCacheManager;
|
import org.infinispan.manager.DefaultCacheManager;
|
||||||
import org.infinispan.manager.EmbeddedCacheManager;
|
import org.infinispan.manager.EmbeddedCacheManager;
|
||||||
import org.infinispan.persistence.manager.PersistenceManager;
|
|
||||||
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
|
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
|
||||||
import org.infinispan.transaction.LockingMode;
|
import org.infinispan.transaction.LockingMode;
|
||||||
import org.infinispan.transaction.TransactionMode;
|
import org.infinispan.transaction.TransactionMode;
|
||||||
|
@ -53,6 +51,10 @@ import org.keycloak.provider.ProviderEvent;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.ServiceLoader;
|
import java.util.ServiceLoader;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import static org.keycloak.connections.infinispan.InfinispanUtil.configureTransport;
|
import static org.keycloak.connections.infinispan.InfinispanUtil.configureTransport;
|
||||||
import static org.keycloak.connections.infinispan.InfinispanUtil.createCacheConfigurationBuilder;
|
import static org.keycloak.connections.infinispan.InfinispanUtil.createCacheConfigurationBuilder;
|
||||||
|
@ -66,6 +68,7 @@ import static org.keycloak.models.cache.infinispan.InfinispanCacheRealmProviderF
|
||||||
*/
|
*/
|
||||||
public class DefaultInfinispanConnectionProviderFactory implements InfinispanConnectionProviderFactory {
|
public class DefaultInfinispanConnectionProviderFactory implements InfinispanConnectionProviderFactory {
|
||||||
|
|
||||||
|
private static final ReadWriteLock READ_WRITE_LOCK = new ReentrantReadWriteLock();
|
||||||
private static final Logger logger = Logger.getLogger(DefaultInfinispanConnectionProviderFactory.class);
|
private static final Logger logger = Logger.getLogger(DefaultInfinispanConnectionProviderFactory.class);
|
||||||
|
|
||||||
private Config.Scope config;
|
private Config.Scope config;
|
||||||
|
@ -85,24 +88,54 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
|
||||||
return new DefaultInfinispanConnectionProvider(cacheManager, remoteCacheProvider, topologyInfo);
|
return new DefaultInfinispanConnectionProvider(cacheManager, remoteCacheProvider, topologyInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
workaround for Infinispan 12.1.7.Final to prevent a deadlock while
|
||||||
|
DefaultInfinispanConnectionProviderFactory is shutting down PersistenceManagerImpl
|
||||||
|
that acquires a writeLock and this removal that acquires a readLock.
|
||||||
|
First seen with https://issues.redhat.com/browse/ISPN-13664 and still occurs probably due to
|
||||||
|
https://issues.redhat.com/browse/ISPN-13666 in 13.0.10
|
||||||
|
Tracked in https://github.com/keycloak/keycloak/issues/9871
|
||||||
|
*/
|
||||||
|
public static void runWithReadLockOnCacheManager(Runnable task) {
|
||||||
|
Lock lock = DefaultInfinispanConnectionProviderFactory.READ_WRITE_LOCK.readLock();
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
task.run();
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> T runWithReadLockOnCacheManager(Supplier<T> task) {
|
||||||
|
Lock lock = DefaultInfinispanConnectionProviderFactory.READ_WRITE_LOCK.readLock();
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
return task.get();
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void runWithWriteLockOnCacheManager(Runnable task) {
|
||||||
|
Lock lock = DefaultInfinispanConnectionProviderFactory.READ_WRITE_LOCK.writeLock();
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
task.run();
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
/*
|
runWithWriteLockOnCacheManager(() -> {
|
||||||
workaround for Infinispan 12.1.7.Final to prevent a deadlock while
|
|
||||||
DefaultInfinispanConnectionProviderFactory is shutting down PersistenceManagerImpl
|
|
||||||
that acquires a writeLock and this removal that acquires a readLock.
|
|
||||||
First seen with https://issues.redhat.com/browse/ISPN-13664 and still occurs probably due to
|
|
||||||
https://issues.redhat.com/browse/ISPN-13666 in 13.0.10
|
|
||||||
Tracked in https://github.com/keycloak/keycloak/issues/9871
|
|
||||||
*/
|
|
||||||
synchronized (DefaultInfinispanConnectionProviderFactory.class) {
|
|
||||||
if (cacheManager != null && !containerManaged) {
|
if (cacheManager != null && !containerManaged) {
|
||||||
cacheManager.stop();
|
cacheManager.stop();
|
||||||
}
|
}
|
||||||
if (remoteCacheProvider != null) {
|
if (remoteCacheProvider != null) {
|
||||||
remoteCacheProvider.stop();
|
remoteCacheProvider.stop();
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -61,14 +61,11 @@ public class InfinispanKeyGenerator {
|
||||||
boolean wantsLocalKey = !session.getProvider(StickySessionEncoderProvider.class).shouldAttachRoute();
|
boolean wantsLocalKey = !session.getProvider(StickySessionEncoderProvider.class).shouldAttachRoute();
|
||||||
|
|
||||||
if (wantsLocalKey && cache.getCacheConfiguration().clustering().cacheMode().isClustered()) {
|
if (wantsLocalKey && cache.getCacheConfiguration().clustering().cacheMode().isClustered()) {
|
||||||
KeyAffinityService<K> keyAffinityService = keyAffinityServices.get(cacheName);
|
KeyAffinityService<K> keyAffinityService = keyAffinityServices.computeIfAbsent(cacheName, s -> {
|
||||||
if (keyAffinityService == null) {
|
KeyAffinityService<K> k = createKeyAffinityService(cache, keyGenerator);
|
||||||
keyAffinityService = createKeyAffinityService(cache, keyGenerator);
|
|
||||||
keyAffinityServices.put(cacheName, keyAffinityService);
|
|
||||||
|
|
||||||
log.debugf("Registered key affinity service for cache '%s'", cacheName);
|
log.debugf("Registered key affinity service for cache '%s'", cacheName);
|
||||||
}
|
return k;
|
||||||
|
});
|
||||||
return keyAffinityService.getKeyForAddress(cache.getCacheManager().getAddress());
|
return keyAffinityService.getKeyForAddress(cache.getCacheManager().getAddress());
|
||||||
} else {
|
} else {
|
||||||
return keyGenerator.getKey();
|
return keyGenerator.getKey();
|
||||||
|
|
Loading…
Reference in a new issue