Execute persistent sessions tests in CI and fix deadlock (#29236)
* Execute persistent sessions tests in CI and fix deadlock in UserSessionConcurrencyTest The deadlock was caused by a switch to reactive handling of embedded Infinispan updates introduced here #28862 Closes #29235 Signed-off-by: Alexander Schwartz <aschwart@redhat.com>
This commit is contained in:
parent
8c3f7cc6e9
commit
05b6f897ce
2 changed files with 49 additions and 74 deletions
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
|
@ -324,7 +324,7 @@ jobs:
|
||||||
run: |
|
run: |
|
||||||
TESTS=`testsuite/integration-arquillian/tests/base/testsuites/suite.sh persistent-sessions`
|
TESTS=`testsuite/integration-arquillian/tests/base/testsuites/suite.sh persistent-sessions`
|
||||||
echo "Tests: $TESTS"
|
echo "Tests: $TESTS"
|
||||||
./mvnw test ${{ env.SUREFIRE_RETRY }} -Pauth-server-quarkus -Dauth.server.features=${{ matrix.variant }} -Dtest=$TESTS -pl testsuite/integration-arquillian/tests/base 2>&1 | misc/log/trimmer.sh
|
./mvnw test ${{ env.SUREFIRE_RETRY }} -Pauth-server-quarkus -Dauth.server.feature=${{ matrix.variant }} -Dtest=$TESTS -pl testsuite/integration-arquillian/tests/base 2>&1 | misc/log/trimmer.sh
|
||||||
|
|
||||||
- name: Upload JVM Heapdumps
|
- name: Upload JVM Heapdumps
|
||||||
if: always()
|
if: always()
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
|
|
||||||
package org.keycloak.models.sessions.infinispan.changes;
|
package org.keycloak.models.sessions.infinispan.changes;
|
||||||
|
|
||||||
import org.infinispan.AdvancedCache;
|
|
||||||
import org.infinispan.Cache;
|
import org.infinispan.Cache;
|
||||||
import org.infinispan.context.Flag;
|
import org.infinispan.context.Flag;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
@ -25,25 +24,22 @@ import org.keycloak.connections.infinispan.InfinispanUtil;
|
||||||
import org.keycloak.models.sessions.infinispan.CacheDecorators;
|
import org.keycloak.models.sessions.infinispan.CacheDecorators;
|
||||||
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Supplier;
|
|
||||||
|
|
||||||
public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implements SessionChangesPerformer<K, V> {
|
public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implements SessionChangesPerformer<K, V> {
|
||||||
|
|
||||||
private static final Logger LOG = Logger.getLogger(EmbeddedCachesChangesPerformer.class);
|
private static final Logger LOG = Logger.getLogger(EmbeddedCachesChangesPerformer.class);
|
||||||
private final Cache<K, SessionEntityWrapper<V>> cache;
|
private final Cache<K, SessionEntityWrapper<V>> cache;
|
||||||
private final List<Supplier<CompletableFuture<?>>> changes = new LinkedList<>();
|
private final List<Runnable> changes = new LinkedList<>();
|
||||||
|
|
||||||
public EmbeddedCachesChangesPerformer(Cache<K, SessionEntityWrapper<V>> cache) {
|
public EmbeddedCachesChangesPerformer(Cache<K, SessionEntityWrapper<V>> cache) {
|
||||||
this.cache = cache;
|
this.cache = cache;
|
||||||
}
|
}
|
||||||
|
|
||||||
private CompletableFuture<?> runOperationInCluster(K key, MergedUpdate<V> task, SessionEntityWrapper<V> sessionWrapper) {
|
private void runOperationInCluster(K key, MergedUpdate<V> task, SessionEntityWrapper<V> sessionWrapper) {
|
||||||
V session = sessionWrapper.getEntity();
|
V session = sessionWrapper.getEntity();
|
||||||
SessionUpdateTask.CacheOperation operation = task.getOperation(session);
|
SessionUpdateTask.CacheOperation operation = task.getOperation(session);
|
||||||
|
|
||||||
|
@ -53,46 +49,49 @@ public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implemen
|
||||||
switch (operation) {
|
switch (operation) {
|
||||||
case REMOVE:
|
case REMOVE:
|
||||||
// Just remove it
|
// Just remove it
|
||||||
return CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache)
|
CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache)
|
||||||
.withFlags(Flag.IGNORE_RETURN_VALUES)
|
.withFlags(Flag.IGNORE_RETURN_VALUES)
|
||||||
.removeAsyncEntry(key);
|
.remove(key);
|
||||||
|
break;
|
||||||
case ADD:
|
case ADD:
|
||||||
return CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache)
|
CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache)
|
||||||
.withFlags(Flag.IGNORE_RETURN_VALUES)
|
.withFlags(Flag.IGNORE_RETURN_VALUES)
|
||||||
.putAsync(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS)
|
.put(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS);
|
||||||
.thenAcceptAsync(v -> LOG.tracef("Added entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs()));
|
|
||||||
|
LOG.tracef("Added entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
||||||
|
break;
|
||||||
case ADD_IF_ABSENT:
|
case ADD_IF_ABSENT:
|
||||||
return CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).putIfAbsentAsync(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS)
|
SessionEntityWrapper<V> existing = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache).putIfAbsent(key, sessionWrapper, task.getLifespanMs(), TimeUnit.MILLISECONDS, task.getMaxIdleTimeMs(), TimeUnit.MILLISECONDS);
|
||||||
.thenAccept(existing -> {
|
|
||||||
if (existing != null) {
|
if (existing != null) {
|
||||||
LOG.debugf("Existing entity in cache for key: %s . Will update it", key);
|
LOG.debugf("Existing entity in cache for key: %s . Will update it", key);
|
||||||
|
|
||||||
// Apply updates on the existing entity and replace it
|
// Apply updates on the existing entity and replace it
|
||||||
task.runUpdate(existing.getEntity());
|
task.runUpdate(existing.getEntity());
|
||||||
|
|
||||||
replace(key, task, existing, task.getLifespanMs(), task.getMaxIdleTimeMs()).join();
|
replace(key, task, existing, task.getLifespanMs(), task.getMaxIdleTimeMs());
|
||||||
} else {
|
} else {
|
||||||
LOG.tracef("Add_if_absent successfully called for entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
LOG.tracef("Add_if_absent successfully called for entity '%s' to the cache '%s' . Lifespan: %d ms, MaxIdle: %d ms", key, cache.getName(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
||||||
}
|
}
|
||||||
});
|
break;
|
||||||
case REPLACE:
|
case REPLACE:
|
||||||
return replace(key, task, sessionWrapper, task.getLifespanMs(), task.getMaxIdleTimeMs());
|
replace(key, task, sessionWrapper, task.getLifespanMs(), task.getMaxIdleTimeMs());
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
throw new IllegalStateException("Unsupported state " + operation);
|
throw new IllegalStateException("Unsupported state " + operation);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private CompletableFuture<?> replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntityFirst, long lifespanMs, long maxIdleTimeMs) {
|
private void replace(K key, MergedUpdate<V> task, SessionEntityWrapper<V> oldVersionEntity, long lifespanMs, long maxIdleTimeMs) {
|
||||||
AdvancedCache<K, SessionEntityWrapper<V>> writeCache = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache);
|
SessionEntityWrapper<V> oldVersion = oldVersionEntity;
|
||||||
// make one async attempt
|
SessionEntityWrapper<V> returnValue = null;
|
||||||
SessionEntityWrapper<V> newVersionEntityFirst = generateNewVersionAndWrapEntity(oldVersionEntityFirst.getEntity(), oldVersionEntityFirst.getLocalMetadata());
|
|
||||||
return writeCache.computeIfPresentAsync(key, new ReplaceFunction<>(oldVersionEntityFirst.getVersion(), newVersionEntityFirst), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS)
|
|
||||||
.thenAccept(returnValue -> {
|
|
||||||
int iteration = 0;
|
int iteration = 0;
|
||||||
SessionEntityWrapper<V> newVersionEntity = newVersionEntityFirst;
|
V session = oldVersion.getEntity();
|
||||||
SessionEntityWrapper<V> oldVersion = oldVersionEntityFirst;
|
var writeCache = CacheDecorators.skipCacheStoreIfRemoteCacheIsEnabled(cache);
|
||||||
|
while (iteration++ < InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
|
||||||
|
SessionEntityWrapper<V> newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
|
||||||
|
returnValue = writeCache.computeIfPresent(key, new ReplaceFunction<>(oldVersion.getVersion(), newVersionEntity), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
while (true) {
|
|
||||||
if (returnValue == null) {
|
if (returnValue == null) {
|
||||||
LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
|
LOG.debugf("Entity %s not found. Maybe removed in the meantime. Replace task will be ignored", key);
|
||||||
return;
|
return;
|
||||||
|
@ -102,27 +101,15 @@ public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implemen
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
LOG.tracef("Replace SUCCESS for entity: %s . old version: %s, new version: %s, Lifespan: %d ms, MaxIdle: %d ms", key, oldVersion.getVersion(), newVersionEntity.getVersion(), task.getLifespanMs(), task.getMaxIdleTimeMs());
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.tracef("Replace failed for entity: %s, old version %s, new version %s. Will try again", key, oldVersion.getVersion(), newVersionEntity.getVersion());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (++iteration >= InfinispanUtil.MAXIMUM_REPLACE_RETRIES) {
|
|
||||||
LOG.warnf("Failed to replace entity '%s' in cache '%s'. Expected: %s, Current: %s", key, cache.getName(), oldVersion, returnValue);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
oldVersion = returnValue;
|
oldVersion = returnValue;
|
||||||
V session = oldVersion.getEntity();
|
session = oldVersion.getEntity();
|
||||||
task.runUpdate(session);
|
task.runUpdate(session);
|
||||||
newVersionEntity = generateNewVersionAndWrapEntity(session, oldVersion.getLocalMetadata());
|
|
||||||
|
|
||||||
returnValue = writeCache.computeIfPresent(key, new ReplaceFunction<>(oldVersion.getVersion(), newVersionEntity), lifespanMs, TimeUnit.MILLISECONDS, maxIdleTimeMs, TimeUnit.MILLISECONDS);
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
LOG.warnf("Failed to replace entity '%s' in cache '%s'. Expected: %s, Current: %s", key, cache.getName(), oldVersion, returnValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
private SessionEntityWrapper<V> generateNewVersionAndWrapEntity(V entity, Map<String, String> localMetadata) {
|
private SessionEntityWrapper<V> generateNewVersionAndWrapEntity(V entity, Map<String, String> localMetadata) {
|
||||||
|
@ -136,18 +123,6 @@ public class EmbeddedCachesChangesPerformer<K, V extends SessionEntity> implemen
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void applyChanges() {
|
public void applyChanges() {
|
||||||
if (!changes.isEmpty()) {
|
changes.forEach(Runnable::run);
|
||||||
List<Throwable> exceptions = new ArrayList<>();
|
|
||||||
CompletableFuture.allOf(changes.stream().map(s -> s.get().exceptionally(throwable -> {
|
|
||||||
exceptions.add(throwable);
|
|
||||||
return null;
|
|
||||||
})).toArray(CompletableFuture[]::new)).join();
|
|
||||||
// If any of those futures has failed, add the exceptions as suppressed exceptions to our runtime exception
|
|
||||||
if (!exceptions.isEmpty()) {
|
|
||||||
RuntimeException ex = new RuntimeException("unable to complete the session updates");
|
|
||||||
exceptions.forEach(ex::addSuppressed);
|
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue