Use ReentrantLock instead of synchronized to avoid thread pinning
+ since the runSerialized mechanism is currently on the best effort basis it is possible there are concurrent executions if T1 obtained a lock T2 removed the lock and T3 created a new lock before T1 called putIfAbsent therefore I added a debug log detecting this situation Signed-off-by: Michal Hajas <mhajas@redhat.com>
This commit is contained in:
parent
355901dfd8
commit
1bb5e14134
2 changed files with 26 additions and 14 deletions
|
@ -31,6 +31,7 @@ import org.keycloak.models.cache.infinispan.stream.InRealmPredicate;
|
|||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
/**
|
||||
|
@ -40,7 +41,7 @@ public class RealmCacheManager extends CacheManager {
|
|||
|
||||
private static final Logger logger = Logger.getLogger(RealmCacheManager.class);
|
||||
|
||||
private final ConcurrentHashMap<String, String> cacheInteractions = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, ReentrantLock> cacheInteractions = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
protected Logger getLogger() {
|
||||
|
@ -136,15 +137,19 @@ public class RealmCacheManager extends CacheManager {
|
|||
public <T> T computeSerialized(KeycloakSession session, String id, BiFunction<String, KeycloakSession, T> compute) {
|
||||
// this locking is only to ensure that if there is a computation for the same id in the "synchronized" block below,
|
||||
// it will have the same object instance to lock the current execution until the other is finished.
|
||||
String lock = cacheInteractions.computeIfAbsent(id, s -> id);
|
||||
ReentrantLock lock = cacheInteractions.computeIfAbsent(id, s -> new ReentrantLock());
|
||||
try {
|
||||
synchronized (lock) {
|
||||
// in case the previous thread has removed the entry in the finally block
|
||||
cacheInteractions.putIfAbsent(id, lock);
|
||||
return compute.apply(id, session);
|
||||
lock.lock();
|
||||
// in case the previous thread has removed the entry in the finally block
|
||||
ReentrantLock existingLock = cacheInteractions.putIfAbsent(id, lock);
|
||||
if (existingLock != lock) {
|
||||
logger.debugf("Concurrent execution detected for realm '%s'.", id);
|
||||
}
|
||||
|
||||
return compute.apply(id, session);
|
||||
} finally {
|
||||
cacheInteractions.remove(lock);
|
||||
lock.unlock();
|
||||
cacheInteractions.remove(id, lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,10 @@
|
|||
|
||||
package org.keycloak.models.sessions.infinispan.changes;
|
||||
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* Adding an in-JVM lock to prevent a best-effort concurrent executions for the same ID.
|
||||
|
@ -27,20 +30,24 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
* @author Alexander Schwartz
|
||||
*/
|
||||
public class SerializeExecutionsByKey<K> {
|
||||
private final ConcurrentHashMap<K, K> cacheInteractions = new ConcurrentHashMap<>();
|
||||
private static final Logger LOG = Logger.getLogger(SerializeExecutionsByKey.class);
|
||||
private final ConcurrentHashMap<K, ReentrantLock> cacheInteractions = new ConcurrentHashMap<>();
|
||||
|
||||
public void runSerialized(K key, Runnable task) {
|
||||
// this locking is only to ensure that if there is a computation for the same id in the "synchronized" block below,
|
||||
// it will have the same object instance to lock the current execution until the other is finished.
|
||||
K lock = cacheInteractions.computeIfAbsent(key, s -> key);
|
||||
ReentrantLock lock = cacheInteractions.computeIfAbsent(key, s -> new ReentrantLock());
|
||||
try {
|
||||
synchronized (lock) {
|
||||
// in case the previous thread has removed the entry in the finally block
|
||||
cacheInteractions.putIfAbsent(key, lock);
|
||||
task.run();
|
||||
lock.lock();
|
||||
// in case the previous thread has removed the entry in the finally block
|
||||
ReentrantLock existingLock = cacheInteractions.putIfAbsent(key, lock);
|
||||
if (existingLock != lock) {
|
||||
LOG.debugf("Concurrent execution detected for key '%s'.", key);
|
||||
}
|
||||
task.run();
|
||||
} finally {
|
||||
cacheInteractions.remove(lock);
|
||||
lock.unlock();
|
||||
cacheInteractions.remove(key, lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue