Add support for pessimistic locking to HotRod

Closes #13273
This commit is contained in:
Michal Hajas 2023-01-25 15:33:26 +01:00
parent f6f179eaca
commit 837c64de3d
12 changed files with 368 additions and 267 deletions

View file

@ -24,26 +24,33 @@ import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.QueryFactory;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Time;
import org.keycloak.models.AbstractKeycloakTransaction;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.map.common.AbstractEntity;
import org.keycloak.models.map.common.DeepCloner;
import org.keycloak.models.map.common.ExpirableEntity;
import org.keycloak.models.map.storage.ModelEntityUtil;
import org.keycloak.models.map.storage.hotRod.common.AbstractHotRodEntity;
import org.keycloak.models.map.storage.hotRod.common.HotRodEntityDelegate;
import org.keycloak.models.map.storage.hotRod.common.HotRodEntityDescriptor;
import org.keycloak.models.map.common.StringKeyConverter;
import org.keycloak.models.map.storage.MapKeycloakTransaction;
import org.keycloak.models.map.storage.MapStorage;
import org.keycloak.models.map.storage.ModelEntityUtil;
import org.keycloak.models.map.storage.QueryParameters;
import org.keycloak.models.map.storage.chm.ConcurrentHashMapCrudOperations;
import org.keycloak.models.map.storage.chm.ConcurrentHashMapKeycloakTransaction;
import org.keycloak.models.map.storage.chm.MapFieldPredicates;
import org.keycloak.models.map.storage.chm.MapModelCriteriaBuilder;
import org.keycloak.models.map.storage.hotRod.transaction.NoActionHotRodTransactionWrapper;
import org.keycloak.models.map.storage.criteria.DefaultModelCriteria;
import org.keycloak.models.map.storage.hotRod.common.AbstractHotRodEntity;
import org.keycloak.models.map.storage.hotRod.common.HotRodEntityDelegate;
import org.keycloak.models.map.storage.hotRod.common.HotRodEntityDescriptor;
import org.keycloak.models.map.storage.hotRod.connections.DefaultHotRodConnectionProviderFactory;
import org.keycloak.models.map.storage.hotRod.connections.HotRodConnectionProvider;
import org.keycloak.models.map.storage.hotRod.locking.HotRodLocksUtils;
import org.keycloak.models.map.storage.hotRod.transaction.AllAreasHotRodTransactionsWrapper;
import org.keycloak.models.map.storage.hotRod.transaction.NoActionHotRodTransactionWrapper;
import org.keycloak.storage.SearchableModelField;
import org.keycloak.utils.LockObjectsForModification;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Spliterators;
@ -53,6 +60,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static org.keycloak.common.util.StackUtil.getShortStackTrace;
import static org.keycloak.models.map.storage.hotRod.common.HotRodUtils.paginateQuery;
import static org.keycloak.utils.StreamsUtil.closing;
@ -60,6 +68,7 @@ public class HotRodMapStorage<K, E extends AbstractHotRodEntity, V extends Abstr
private static final Logger LOG = Logger.getLogger(HotRodMapStorage.class);
private final KeycloakSession session;
private final RemoteCache<K, E> remoteCache;
protected final StringKeyConverter<K> keyConverter;
protected final HotRodEntityDescriptor<E, V> storedEntityDescriptor;
@ -67,8 +76,12 @@ public class HotRodMapStorage<K, E extends AbstractHotRodEntity, V extends Abstr
protected final DeepCloner cloner;
protected boolean isExpirableEntity;
private final AllAreasHotRodTransactionsWrapper txWrapper;
private final Map<SearchableModelField<? super M>, MapModelCriteriaBuilder.UpdatePredicatesFunc<K, V, M>> fieldPredicates;
private final Long lockTimeout;
private final RemoteCache<String, String> locksCache;
public HotRodMapStorage(RemoteCache<K, E> remoteCache, StringKeyConverter<K> keyConverter, HotRodEntityDescriptor<E, V> storedEntityDescriptor, DeepCloner cloner, AllAreasHotRodTransactionsWrapper txWrapper) {
public HotRodMapStorage(KeycloakSession session, RemoteCache<K, E> remoteCache, StringKeyConverter<K> keyConverter, HotRodEntityDescriptor<E, V> storedEntityDescriptor, DeepCloner cloner, AllAreasHotRodTransactionsWrapper txWrapper, Long lockTimeout) {
this.session = session;
this.remoteCache = remoteCache;
this.keyConverter = keyConverter;
this.storedEntityDescriptor = storedEntityDescriptor;
@ -76,6 +89,10 @@ public class HotRodMapStorage<K, E extends AbstractHotRodEntity, V extends Abstr
this.delegateProducer = storedEntityDescriptor.getHotRodDelegateProvider();
this.isExpirableEntity = ExpirableEntity.class.isAssignableFrom(ModelEntityUtil.getEntityType(storedEntityDescriptor.getModelTypeClass()));
this.txWrapper = txWrapper;
this.fieldPredicates = MapFieldPredicates.getPredicates((Class<M>) storedEntityDescriptor.getModelTypeClass());
this.lockTimeout = lockTimeout;
HotRodConnectionProvider cacheProvider = session.getProvider(HotRodConnectionProvider.class);
this.locksCache = cacheProvider.getRemoteCache(DefaultHotRodConnectionProviderFactory.HOT_ROD_LOCKS_CACHE_NAME);
}
@Override
@ -102,11 +119,32 @@ public class HotRodMapStorage<K, E extends AbstractHotRodEntity, V extends Abstr
return value;
}
private String getLockName(String key) {
return storedEntityDescriptor.getModelTypeClass().getName() + "_" + key;
}
@Override
public V read(String key) {
Objects.requireNonNull(key, "Key must be non-null");
K k = keyConverter.fromStringSafe(key);
if (LockObjectsForModification.isEnabled(session, storedEntityDescriptor.getModelTypeClass())) {
String lockName = getLockName(key);
HotRodLocksUtils.repeatPutIfAbsent(locksCache, lockName, Duration.ofMillis(lockTimeout), 50, true);
session.getTransactionManager().enlistAfterCompletion(new AbstractKeycloakTransaction() {
@Override
protected void commitImpl() {
HotRodLocksUtils.removeWithInstanceIdentifier(locksCache, lockName);
}
@Override
protected void rollbackImpl() {
HotRodLocksUtils.removeWithInstanceIdentifier(locksCache, lockName);
}
});
}
// Obtain value from Infinispan
E hotRodEntity = remoteCache.get(k);
if (hotRodEntity == null) return null;
@ -152,31 +190,76 @@ public class HotRodMapStorage<K, E extends AbstractHotRodEntity, V extends Abstr
@Override
public Stream<V> read(QueryParameters<M> queryParameters) {
IckleQueryMapModelCriteriaBuilder<E, M> iqmcb = queryParameters.getModelCriteriaBuilder()
.flashToModelCriteriaBuilder(createCriteriaBuilder());
String queryString = iqmcb.getIckleQuery();
if (!queryParameters.getOrderBy().isEmpty()) {
queryString += " ORDER BY " + queryParameters.getOrderBy().stream().map(HotRodMapStorage::toOrderString)
.collect(Collectors.joining(", "));
if (LockObjectsForModification.isEnabled(session, storedEntityDescriptor.getModelTypeClass())) {
return pessimisticQueryRead(queryParameters);
}
LOG.tracef("Executing read Ickle query: %s", queryString);
QueryFactory queryFactory = Search.getQueryFactory(remoteCache);
Query<E> query = paginateQuery(queryFactory.create(queryString), queryParameters.getOffset(),
queryParameters.getLimit());
query.setParameters(iqmcb.getParameters());
CloseableIterator<E> iterator = query.iterator();
Query<E> query = prepareQueryWithPrefixAndParameters(null, queryParameters);
CloseableIterator<E> iterator = paginateQuery(query, queryParameters.getOffset(),
queryParameters.getLimit()).iterator();
return closing(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false))
.onClose(iterator::close)
.filter(Objects::nonNull) // see https://github.com/keycloak/keycloak/issues/9271
.map(this.delegateProducer);
}
private Stream<V> pessimisticQueryRead(QueryParameters<M> queryParameters) {
DefaultModelCriteria<M> dmc = queryParameters.getModelCriteriaBuilder();
// Optimization if the criteria contains only one id
String id = (String) dmc.getSingleRestrictionArgument("id");
if (id != null) {
// We have a criteria that contains "id EQ 'some_key'". We can change this to reading only some_key using read method and then apply the rest of criteria.
MapModelCriteriaBuilder<K,V,M> mapMcb = dmc.flashToModelCriteriaBuilder(new MapModelCriteriaBuilder<>(keyConverter, fieldPredicates));
V entity = read(id);
if (entity == null) {
return Stream.empty();
}
boolean fulfillsQueryCriteria = mapMcb.getKeyFilter().test(keyConverter.fromString(id)) && mapMcb.getEntityFilter().test(entity);
if (!fulfillsQueryCriteria) {
// entity does not fulfill whole criteria, we can release lock now
HotRodLocksUtils.removeWithInstanceIdentifier(locksCache, getLockName(id));
return Stream.empty();
}
return Stream.of(entity);
}
// Criteria does not contain only one id, we need to read ids non-pessimistically and then read entities one by one pessimistically
Query<Object[]> query = prepareQueryWithPrefixAndParameters("SELECT id ", queryParameters);
CloseableIterator<Object[]> iterator = paginateQuery(query, queryParameters.getOffset(),
queryParameters.getLimit()).iterator();
return closing(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false))
.onClose(iterator::close)
// Extract ids from the result
.map(a -> a[0])
.map(String.class::cast)
// Pessimistically read
.map(this::read)
// Entity can be removed in the meanwhile, we need to check for null
.filter(Objects::nonNull);
}
private <T> Query<T> prepareQueryWithPrefixAndParameters(String prefix, QueryParameters<M> queryParameters) {
IckleQueryMapModelCriteriaBuilder<E, M> iqmcb = queryParameters.getModelCriteriaBuilder()
.flashToModelCriteriaBuilder(createCriteriaBuilder());
String queryString = (prefix != null ? prefix : "") + iqmcb.getIckleQuery();
if (!queryParameters.getOrderBy().isEmpty()) {
queryString += " ORDER BY " + queryParameters.getOrderBy().stream().map(HotRodMapStorage::toOrderString)
.collect(Collectors.joining(", "));
}
LOG.tracef("Preparing Ickle query: '%s'%s", queryString, getShortStackTrace());
QueryFactory queryFactory = Search.getQueryFactory(remoteCache);
Query<T> query = queryFactory.create(queryString);
query.setParameters(iqmcb.getParameters());
return query;
}
@Override
public long getCount(QueryParameters<M> queryParameters) {
IckleQueryMapModelCriteriaBuilder<E, M> iqmcb = queryParameters.getModelCriteriaBuilder()
@ -195,26 +278,11 @@ public class HotRodMapStorage<K, E extends AbstractHotRodEntity, V extends Abstr
@Override
public long delete(QueryParameters<M> queryParameters) {
IckleQueryMapModelCriteriaBuilder<E, M> iqmcb = queryParameters.getModelCriteriaBuilder()
.flashToModelCriteriaBuilder(createCriteriaBuilder());
String queryString = "DELETE " + iqmcb.getIckleQuery();
if (!queryParameters.getOrderBy().isEmpty()) {
queryString += " ORDER BY " + queryParameters.getOrderBy().stream().map(HotRodMapStorage::toOrderString)
.collect(Collectors.joining(", "));
}
LOG.tracef("Executing delete Ickle query: %s", queryString);
QueryFactory queryFactory = Search.getQueryFactory(remoteCache);
if (queryParameters.getLimit() != null || queryParameters.getOffset() != null) {
throw new IllegalArgumentException("HotRod storage does not support pagination for delete query");
}
Query<Object[]> query = queryFactory.create(queryString);
query.setParameters(iqmcb.getParameters());
Query<Object[]> query = prepareQueryWithPrefixAndParameters("DELETE ", queryParameters);
return query.executeStatement();
}
@ -239,7 +307,6 @@ public class HotRodMapStorage<K, E extends AbstractHotRodEntity, V extends Abstr
}
protected MapKeycloakTransaction<V, M> createTransactionInternal(KeycloakSession session) {
Map<SearchableModelField<? super M>, MapModelCriteriaBuilder.UpdatePredicatesFunc<K, V, M>> fieldPredicates = MapFieldPredicates.getPredicates((Class<M>) storedEntityDescriptor.getModelTypeClass());
return new ConcurrentHashMapKeycloakTransaction<>(this, keyConverter, cloner, fieldPredicates);
}

View file

@ -112,9 +112,12 @@ import org.keycloak.models.map.user.MapUserFederatedIdentityEntity;
import org.keycloak.models.map.userSession.MapAuthenticatedClientSessionEntity;
import org.keycloak.models.map.userSession.MapUserSessionEntity;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.ProviderConfigProperty;
import org.keycloak.provider.ProviderConfigurationBuilder;
import org.keycloak.storage.SearchableModelField;
import org.keycloak.transaction.JtaTransactionManagerLookup;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@ -130,6 +133,7 @@ public class HotRodMapStorageProviderFactory implements AmphibianProviderFactory
private static final Map<SearchableModelField<AuthenticatedClientSessionModel>, MapModelCriteriaBuilder.UpdatePredicatesFunc<Object, AbstractEntity, AuthenticatedClientSessionModel>> clientSessionPredicates = MapFieldPredicates.basePredicates(HotRodAuthenticatedClientSessionEntity.ID);
private Long lockTimeout;
private final static DeepCloner CLONER = new DeepCloner.Builder()
.constructor(MapRootAuthenticationSessionEntity.class, HotRodRootAuthenticationSessionEntityDelegate::new)
.constructor(MapAuthenticationSessionEntity.class, HotRodAuthenticationSessionEntityDelegate::new)
@ -218,12 +222,12 @@ public class HotRodMapStorageProviderFactory implements AmphibianProviderFactory
HotRodEntityDescriptor<E, V> entityDescriptor = (HotRodEntityDescriptor<E, V>) getEntityDescriptor(modelType);
if (modelType == SingleUseObjectValueModel.class) {
return (HotRodMapStorage) new SingleUseObjectHotRodMapStorage(connectionProvider.getRemoteCache(entityDescriptor.getCacheName()), StringKeyConverter.StringKey.INSTANCE, (HotRodEntityDescriptor<HotRodSingleUseObjectEntity, HotRodSingleUseObjectEntityDelegate>) getEntityDescriptor(modelType), CLONER, txWrapper);
return (HotRodMapStorage) new SingleUseObjectHotRodMapStorage(session, connectionProvider.getRemoteCache(entityDescriptor.getCacheName()), StringKeyConverter.StringKey.INSTANCE, (HotRodEntityDescriptor<HotRodSingleUseObjectEntity, HotRodSingleUseObjectEntityDelegate>) getEntityDescriptor(modelType), CLONER, txWrapper, lockTimeout);
} if (modelType == AuthenticatedClientSessionModel.class) {
return new HotRodMapStorage(connectionProvider.getRemoteCache(entityDescriptor.getCacheName()),
return new HotRodMapStorage(session, connectionProvider.getRemoteCache(entityDescriptor.getCacheName()),
StringKeyConverter.StringKey.INSTANCE,
entityDescriptor,
CLONER, txWrapper) {
CLONER, txWrapper, lockTimeout) {
@Override
protected MapKeycloakTransaction createTransactionInternal(KeycloakSession session) {
return new ConcurrentHashMapKeycloakTransaction(this, keyConverter, cloner, clientSessionPredicates);
@ -233,11 +237,10 @@ public class HotRodMapStorageProviderFactory implements AmphibianProviderFactory
// Precompute client session storage to avoid recursive initialization
HotRodMapStorage clientSessionStore = getHotRodStorage(session, AuthenticatedClientSessionModel.class, txWrapper);
clientSessionStore.createTransaction(session);
return new HotRodMapStorage(connectionProvider.getRemoteCache(entityDescriptor.getCacheName()),
return new HotRodMapStorage(session, connectionProvider.getRemoteCache(entityDescriptor.getCacheName()),
StringKeyConverter.StringKey.INSTANCE,
entityDescriptor,
CLONER, txWrapper) {
CLONER, txWrapper, lockTimeout) {
@Override
protected MapKeycloakTransaction createTransactionInternal(KeycloakSession session) {
Map<SearchableModelField<? super UserSessionModel>, MapModelCriteriaBuilder.UpdatePredicatesFunc<String, MapUserSessionEntity, UserSessionModel>> fieldPredicates = MapFieldPredicates.getPredicates((Class<UserSessionModel>) storedEntityDescriptor.getModelTypeClass());
@ -245,12 +248,12 @@ public class HotRodMapStorageProviderFactory implements AmphibianProviderFactory
}
};
}
return new HotRodMapStorage<>(connectionProvider.getRemoteCache(entityDescriptor.getCacheName()), StringKeyConverter.StringKey.INSTANCE, entityDescriptor, CLONER, txWrapper);
return new HotRodMapStorage<>(session, connectionProvider.getRemoteCache(entityDescriptor.getCacheName()), StringKeyConverter.StringKey.INSTANCE, entityDescriptor, CLONER, txWrapper, lockTimeout);
}
@Override
public void init(Config.Scope config) {
this.lockTimeout = config.getLong("lockTimeout", 10000L);
}
@Override
@ -273,4 +276,15 @@ public class HotRodMapStorageProviderFactory implements AmphibianProviderFactory
public String getHelpText() {
return "HotRod map storage";
}
@Override
public List<ProviderConfigProperty> getConfigMetadata() {
return ProviderConfigurationBuilder.create()
.property()
.name("lockTimeout")
.type("long")
.defaultValue(10000L)
.helpText("The maximum time to wait in milliseconds when waiting for acquiring a pessimistic read lock. If set to negative there is no timeout configured.")
.add().build();
}
}

View file

@ -49,10 +49,10 @@ public class SingleUseObjectHotRodMapStorage
private final HotRodEntityDescriptor<HotRodSingleUseObjectEntity, HotRodSingleUseObjectEntityDelegate> storedEntityDescriptor;
private final DeepCloner cloner;
public SingleUseObjectHotRodMapStorage(RemoteCache<String, HotRodSingleUseObjectEntity> remoteCache, StringKeyConverter<String> keyConverter,
public SingleUseObjectHotRodMapStorage(KeycloakSession session, RemoteCache<String, HotRodSingleUseObjectEntity> remoteCache, StringKeyConverter<String> keyConverter,
HotRodEntityDescriptor<HotRodSingleUseObjectEntity, HotRodSingleUseObjectEntityDelegate> storedEntityDescriptor,
DeepCloner cloner, AllAreasHotRodTransactionsWrapper txWrapper) {
super(remoteCache, keyConverter, storedEntityDescriptor, cloner, txWrapper);
DeepCloner cloner, AllAreasHotRodTransactionsWrapper txWrapper, Long lockTimeout) {
super(session, remoteCache, keyConverter, storedEntityDescriptor, cloner, txWrapper, lockTimeout);
this.keyConverter = keyConverter;
this.storedEntityDescriptor = storedEntityDescriptor;
this.cloner = cloner;

View file

@ -137,7 +137,7 @@ public class DefaultHotRodConnectionProviderFactory implements HotRodConnectionP
// Acquire initial phase lock to avoid concurrent schema update
RemoteCache<String, String> locksCache = remoteCacheManager.getCache(HOT_ROD_LOCKS_CACHE_NAME);
try {
HotRodLocksUtils.repeatPutIfAbsent(locksCache, HOT_ROD_INIT_LOCK_NAME, Duration.ofMillis(900), 50);
HotRodLocksUtils.repeatPutIfAbsent(locksCache, HOT_ROD_INIT_LOCK_NAME, Duration.ofMillis(900), 50, false);
Set<String> remoteCaches = ENTITY_DESCRIPTOR_MAP.values().stream()
.map(HotRodEntityDescriptor::getCacheName).collect(Collectors.toSet());

View file

@ -55,7 +55,7 @@ public class HotRodGlobalLockProvider implements GlobalLockProvider {
try {
LOG.debugf("Acquiring lock [%s].%s", lockName, getShortStackTrace());
HotRodLocksUtils.repeatPutIfAbsent(locksCache, lockName, timeToWaitForLock, 50);
HotRodLocksUtils.repeatPutIfAbsent(locksCache, lockName, timeToWaitForLock, 50, false);
LOG.debugf("Lock acquired [%s]. Continuing with task execution.", lockName);
return KeycloakModelUtils.runJobInTransactionWithResult(session.getKeycloakSessionFactory(), task);

View file

@ -33,13 +33,17 @@ import java.util.concurrent.atomic.AtomicReference;
public class HotRodLocksUtils {
public static final String SEPARATOR = ";";
private static final String INSTANCE_IDENTIFIER = getKeycloakInstanceIdentifier();
/**
* Repeatedly attempts to put an entry with the key {@code lockName}
* to the {@code locksCache}. Succeeds only if there is no entry with
* the same key already.
* <p/>
* The value of created entry is equal to instance identifier. It is
* possible to make the method succeed even if the value already exists
* with the same instance identifier. This behaviour is enabled using
* {@code isReentrant} switch.
* <p/>
* Execution of this method is time bounded, if this method does not
* succeed within {@code timeoutMilliseconds} it gives up and returns
* false.
@ -52,19 +56,23 @@ public class HotRodLocksUtils {
* @param timeout duration to wait until the lock is acquired
* @param repeatInterval Number of milliseconds to wait after each
* unsuccessful attempt
* @param isReentrant if this is set to true, the method succeeds also when the value for given key is
* equal to the instance identifier
* @throws LockAcquiringTimeoutException the key {@code lockName} was NOT put into the {@code map}
* within time boundaries
* @throws IllegalStateException when a {@code lock} value found in the storage has wrong format. It is expected
* the lock value has the following format {@code 'timeAcquired;keycloakInstanceIdentifier'}
*/
public static void repeatPutIfAbsent(RemoteCache<String, String> locksCache, String lockName, Duration timeout, int repeatInterval) throws LockAcquiringTimeoutException {
public static void repeatPutIfAbsent(RemoteCache<String, String> locksCache, String lockName, Duration timeout, int repeatInterval, boolean isReentrant) throws LockAcquiringTimeoutException {
final AtomicReference<String> currentOwnerRef = new AtomicReference<>(null);
try {
Retry.executeWithBackoff(i -> {
String curr = locksCache.withFlags(Flag.FORCE_RETURN_VALUE).putIfAbsent(lockName, Time.currentTimeMillis() + SEPARATOR + INSTANCE_IDENTIFIER);
String curr = locksCache.withFlags(Flag.FORCE_RETURN_VALUE).putIfAbsent(lockName, Time.currentTimeMillis() + SEPARATOR + getKeycloakInstanceIdentifier());
currentOwnerRef.set(curr);
if (curr != null) {
throw new AssertionError("Acquiring lock in iteration " + i + " was not successful");
if (!isReentrant || !curr.endsWith(SEPARATOR + getKeycloakInstanceIdentifier())) {
throw new AssertionError("Acquiring lock in iteration " + i + " was not successful");
}
}
}, timeout, repeatInterval);
} catch (AssertionError ex) {
@ -85,7 +93,8 @@ public class HotRodLocksUtils {
hostname = "unknown-host";
}
return pid + "@" + hostname;
String threadName = Thread.currentThread().getName();
return threadName + "#" + pid + "@" + hostname;
}
/**
@ -98,7 +107,7 @@ public class HotRodLocksUtils {
*/
public static boolean removeWithInstanceIdentifier(ConcurrentMap<String, String> map, String lockName) {
String value = map.get(lockName);
if (value != null && value.endsWith(INSTANCE_IDENTIFIER)) {
if (value != null && value.endsWith(getKeycloakInstanceIdentifier())) {
map.remove(lockName);
return true;
} else {

View file

@ -17,18 +17,12 @@
package org.keycloak.testsuite.admin;
import org.junit.Assume;
import org.keycloak.Config;
import org.keycloak.admin.client.resource.ComponentResource;
import org.junit.Before;
import org.junit.Test;
import org.keycloak.admin.client.resource.ComponentsResource;
import org.keycloak.admin.client.resource.RealmResource;
import org.keycloak.common.util.MultivaluedHashMap;
import org.keycloak.models.RealmSpi;
import org.keycloak.models.map.common.AbstractMapProviderFactory;
import org.keycloak.models.map.realm.MapRealmProviderFactory;
import org.keycloak.models.map.storage.hotRod.HotRodMapStorageProviderFactory;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.representations.idm.*;
import org.keycloak.testsuite.components.TestProvider;
@ -45,7 +39,6 @@ import java.util.function.BiConsumer;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.hamcrest.Matchers;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.*;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
@ -57,19 +50,8 @@ public class ComponentsTest extends AbstractAdminTest {
private ComponentsResource components;
private String realmProvider;
@Before
public void before() throws Exception {
// realmProvider is used only to prevent tests from running in certain configs, should be removed once GHI #15410 is resolved.
realmProvider = testingClient.server().fetch(session -> Config.getProvider(RealmSpi.NAME), String.class);
if (realmProvider.equals(MapRealmProviderFactory.PROVIDER_ID)) {
// append the storage provider in case of map
String mapStorageProvider = testingClient.server().fetch(session -> Config.scope(RealmSpi.NAME,
MapRealmProviderFactory.PROVIDER_ID, AbstractMapProviderFactory.CONFIG_STORAGE).get("provider"), String.class);
if (mapStorageProvider != null) realmProvider = realmProvider + "-" + mapStorageProvider;
}
components = adminClient.realm(REALM_NAME).components();
}
@ -99,11 +81,6 @@ public class ComponentsTest extends AbstractAdminTest {
@Test
public void testConcurrencyWithoutChildren() throws InterruptedException {
// remove this restriction once GHI #15410 is resolved.
Assume.assumeThat("Test does not run with HotRod after HotRod client transaction was enabled. This will be removed with pessimistic locking introduction.",
realmProvider,
not(equalTo(MapRealmProviderFactory.PROVIDER_ID + "-" + HotRodMapStorageProviderFactory.PROVIDER_ID)));
testConcurrency((s, i) -> s.submit(new CreateAndDeleteComponent(s, i)));
// Data consistency is not guaranteed with concurrent access to entities in map store.
@ -114,11 +91,6 @@ public class ComponentsTest extends AbstractAdminTest {
@Test
public void testConcurrencyWithChildren() throws InterruptedException {
// remove this restriction once GHI #15410 is resolved.
Assume.assumeThat("Test does not run with HotRod after HotRod client transaction was enabled. This will be removed with pessimistic locking introduction.",
realmProvider,
not(equalTo(MapRealmProviderFactory.PROVIDER_ID + "-" + HotRodMapStorageProviderFactory.PROVIDER_ID)));
testConcurrency((s, i) -> s.submit(new CreateAndDeleteComponentWithFlatChildren(s, i)));
// Data consistency is not guaranteed with concurrent access to entities in map store.

View file

@ -54,9 +54,8 @@ import org.keycloak.jose.jws.JWSInput;
import org.keycloak.models.UserSessionSpi;
import org.keycloak.models.map.common.AbstractMapProviderFactory;
import org.keycloak.models.map.storage.hotRod.HotRodMapStorageProviderFactory;
import org.keycloak.models.map.storage.jpa.JpaMapStorageProviderFactory;
import org.keycloak.models.map.storage.chm.ConcurrentHashMapStorageProviderFactory;
import org.keycloak.models.map.userSession.MapUserSessionProviderFactory;
import org.keycloak.models.sessions.infinispan.InfinispanUserSessionProviderFactory;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.protocol.oidc.OIDCConfigAttributes;
import org.keycloak.representations.AccessToken;
@ -128,10 +127,9 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
@Test
public void concurrentLoginSingleUser() throws Throwable {
// remove this restriction once GHI #15410 is resolved.
Assume.assumeThat("Test runs only with InfinispanUserSessionProvider or MapUserSessionProvider using JPA",
Assume.assumeThat("Test does not work with ConcurrentHashMap storage",
userSessionProvider,
Matchers.either(equalTo(InfinispanUserSessionProviderFactory.PROVIDER_ID))
.or(equalTo(MapUserSessionProviderFactory.PROVIDER_ID + "-" + JpaMapStorageProviderFactory.PROVIDER_ID)));
not(equalTo(MapUserSessionProviderFactory.PROVIDER_ID + "-" + ConcurrentHashMapStorageProviderFactory.PROVIDER_ID)));
log.info("*********************************************");
long start = System.currentTimeMillis();
@ -174,11 +172,6 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
@Test
public void concurrentLoginSingleUserSingleClient() throws Throwable {
// remove this restriction once GHI #15410 is resolved.
Assume.assumeThat("Test does not run with HotRod after HotRod client transaction was enabled. This will be removed with pessimistic locking introduction.",
userSessionProvider,
not(equalTo(MapUserSessionProviderFactory.PROVIDER_ID + "-" + HotRodMapStorageProviderFactory.PROVIDER_ID)));
log.info("*********************************************");
long start = System.currentTimeMillis();
@ -203,10 +196,9 @@ public class ConcurrentLoginTest extends AbstractConcurrencyTest {
@Test
public void concurrentLoginMultipleUsers() throws Throwable {
// remove this restriction once GHI #15410 is resolved.
Assume.assumeThat("Test runs only with InfinispanUserSessionProvider or MapUserSessionProvider using JPA",
Assume.assumeThat("Test does not work with ConcurrentHashMap storage",
userSessionProvider,
Matchers.either(equalTo(InfinispanUserSessionProviderFactory.PROVIDER_ID))
.or(equalTo(MapUserSessionProviderFactory.PROVIDER_ID + "-" + JpaMapStorageProviderFactory.PROVIDER_ID)));
not(equalTo(MapUserSessionProviderFactory.PROVIDER_ID + "-" + ConcurrentHashMapStorageProviderFactory.PROVIDER_ID)));
log.info("*********************************************");
long start = System.currentTimeMillis();

View file

@ -60,6 +60,8 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.keycloak.testsuite.model.transaction.StorageTransactionTest.LOCK_TIMEOUT_SYSTEM_PROPERTY;
/**
*
* @author hmlnarik
@ -109,7 +111,8 @@ public class HotRodMapStorage extends KeycloakModelParameters {
.config("port", hotRodContainer.getPort())
.config("username", hotRodContainer.getUsername())
.config("password", hotRodContainer.getPassword())
.config("configureRemoteCaches", "true");
.config("configureRemoteCaches", "true")
.config("lockTimeout", "${" + LOCK_TIMEOUT_SYSTEM_PROPERTY + ":}");
}
@Override

View file

@ -22,7 +22,6 @@ import org.keycloak.models.AuthenticatedClientSessionModel;
import org.keycloak.models.ClientModel;
import org.keycloak.models.Constants;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakTransaction;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.UserSessionProvider;
@ -39,8 +38,6 @@ import org.keycloak.testsuite.model.RequireProvider;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.IntStream;
@ -58,7 +55,6 @@ public class UserSessionConcurrencyTest extends KeycloakModelTest {
private String realmId;
private static final int CLIENTS_COUNT = 10;
private static final Lock SYNC_USESSION = new ReentrantLock();
private static final ThreadLocal<Boolean> wasWriting = ThreadLocal.withInitial(() -> false);
private final boolean isHotRodStore = HotRodMapStorageProviderFactory.PROVIDER_ID.equals(CONFIG.getConfig().get("userSessions.map.storage.provider"));
@ -108,17 +104,6 @@ public class UserSessionConcurrencyTest extends KeycloakModelTest {
RealmModel realm = session.realms().getRealm(realmId);
ClientModel client = realm.getClientByClientId("client" + (n % CLIENTS_COUNT));
// THIS SHOULD BE REMOVED AS PART OF ISSUE https://github.com/keycloak/keycloak/issues/13273
// Without this lock more threads can create client session but only one of them is referenced from
// user session. All others that are not referenced are basically lost and should not be created.
// In other words, this lock is to make sure only one thread creates client session, all other
// should use client session created by the first thread
//
// This is basically the same as JpaMapKeycloakTransaction#read method is doing after calling lockUserSessionsForModification() method
if (isHotRodStore) {
SYNC_USESSION.lock();
}
UserSessionModel uSession = lockUserSessionsForModification(session, () -> session.sessions().getUserSession(realm, uId));
AuthenticatedClientSessionModel cSession = uSession.getAuthenticatedClientSessionByClient(client.getId());
if (cSession == null) {
@ -128,10 +113,6 @@ public class UserSessionConcurrencyTest extends KeycloakModelTest {
cSession.setNote(OIDCLoginProtocol.STATE_PARAM, "state-" + n);
if (isHotRodStore) {
releaseLockOnTransactionCommit(session, SYNC_USESSION);
}
return null;
} finally {
cdl.countDown();
@ -162,54 +143,4 @@ public class UserSessionConcurrencyTest extends KeycloakModelTest {
});
}
}
private void releaseLockOnTransactionCommit(KeycloakSession session, Lock l) {
session.getTransactionManager().enlistAfterCompletion(new KeycloakTransaction() {
@Override
public void begin() {
}
@Override
public void commit() {
// THIS IS WORKAROUND FOR MISSING https://github.com/keycloak/keycloak/issues/13280
// It happens that calling remoteCache.put() in one thread and remoteCache.get() in another thread after
// releasing the l lock is so fast that changes are not yet present in the Infinispan server, to avoid
// this we need to leverage HotRod transactions that makes sure the changes are propagated to Infinispan
// server in commit phase
//
// In other words, we need to give Infinispan some time to process put request before we let other
// threads query client session created in this transaction
if (isHotRodStore && wasWriting.get()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
wasWriting.set(false);
}
l.unlock();
}
@Override
public void rollback() {
l.unlock();
}
@Override
public void setRollbackOnly() {
}
@Override
public boolean getRollbackOnly() {
return false;
}
@Override
public boolean isActive() {
return false;
}
});
}
}

View file

@ -23,8 +23,11 @@ import org.keycloak.models.KeycloakSession;
import org.keycloak.models.ModelException;
import org.keycloak.models.RealmModel;
import org.keycloak.models.RealmProvider;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionModel;
import org.keycloak.models.locking.LockAcquiringTimeoutException;
import org.keycloak.models.map.storage.MapStorageProvider;
import org.keycloak.models.map.storage.MapStorageSpi;
import org.keycloak.models.map.storage.hotRod.HotRodMapStorageProviderFactory;
import org.keycloak.models.map.storage.jpa.JpaMapStorageProviderFactory;
import org.keycloak.testsuite.model.KeycloakModelTest;
import org.keycloak.testsuite.model.RequireProvider;
@ -34,7 +37,10 @@ import org.keycloak.utils.LockObjectsForModification;
import javax.persistence.OptimisticLockException;
import javax.persistence.PessimisticLockException;
import java.util.function.Function;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
@ -56,6 +62,10 @@ public class StorageTransactionTest extends KeycloakModelTest {
RealmModel r = s.realms().createRealm("1");
r.setDefaultRole(s.roles().addRealmRole(r, Constants.DEFAULT_ROLES_ROLE_PREFIX + "-" + r.getName()));
r.setAttribute("k1", "v1");
r.setSsoSessionIdleTimeout(1000);
r.setSsoSessionMaxLifespan(2000);
realmId = r.getId();
}
@ -71,92 +81,109 @@ public class StorageTransactionTest extends KeycloakModelTest {
@Test
public void testTwoTransactionsSequentially() throws Exception {
TransactionController tx1 = new TransactionController(getFactory());
TransactionController tx2 = new TransactionController(getFactory());
tx1.begin();
assertThat(
tx1.runStep(session -> {
session.realms().getRealm(realmId).setAttribute("k2", "v1");
return session.realms().getRealm(realmId).getAttribute("k2");
}), equalTo("v1"));
tx1.commit();
tx2.begin();
assertThat(
tx2.runStep(session -> session.realms().getRealm(realmId).getAttribute("k2")),
equalTo("v1"));
tx2.commit();
try (TransactionController tx1 = new TransactionController(getFactory());
TransactionController tx2 = new TransactionController(getFactory())) {
tx1.begin();
assertThat(
tx1.runStep(session -> {
session.realms().getRealm(realmId).setAttribute("k2", "v1");
return session.realms().getRealm(realmId).getAttribute("k2");
}), equalTo("v1"));
tx1.commit();
tx2.begin();
assertThat(
tx2.runStep(session -> session.realms().getRealm(realmId).getAttribute("k2")),
equalTo("v1"));
tx2.commit();
}
}
@Test
public void testRepeatableRead() {
TransactionController tx1 = new TransactionController(getFactory());
TransactionController tx2 = new TransactionController(getFactory());
TransactionController tx3 = new TransactionController(getFactory());
public void testRepeatableRead() throws Exception {
try (TransactionController tx1 = new TransactionController(getFactory());
TransactionController tx2 = new TransactionController(getFactory());
TransactionController tx3 = new TransactionController(getFactory())) {
tx1.begin();
tx2.begin();
tx3.begin();
tx1.begin();
tx2.begin();
tx3.begin();
// Read original value in tx1
assertThat(
tx1.runStep(session -> session.realms().getRealm(realmId).getAttribute("k1")),
equalTo("v1"));
// Read original value in tx1
assertThat(
tx1.runStep(session -> session.realms().getRealm(realmId).getAttribute("k1")),
equalTo("v1"));
// change value to new in tx2
tx2.runStep(session -> {
session.realms().getRealm(realmId).setAttribute("k1", "v2");
return null;
// change value to new in tx2
tx2.runStep(session -> {
session.realms().getRealm(realmId).setAttribute("k1", "v2");
return null;
});
tx2.commit();
// tx1 should still return the value that already read
assertThat(
tx1.runStep(session -> session.realms().getRealm(realmId).getAttribute("k1")),
equalTo("v1"));
// tx3 should return the new value
assertThat(
tx3.runStep(session -> session.realms().getRealm(realmId).getAttribute("k1")),
equalTo("v2"));
tx1.commit();
tx3.commit();
}
}
@Test
// LockObjectForModification currently works only in map-jpa and map-hotrod
@RequireProvider(value = MapStorageProvider.class, only = { JpaMapStorageProviderFactory.PROVIDER_ID, HotRodMapStorageProviderFactory.PROVIDER_ID})
public void testLockObjectForModificationById() throws Exception {
testLockObjectForModification(session -> LockObjectsForModification.lockRealmsForModification(session, () -> session.realms().getRealm(realmId)));
}
@Test
// LockObjectForModification currently works only in map-jpa and map-hotrod
@RequireProvider(value = MapStorageProvider.class, only = { JpaMapStorageProviderFactory.PROVIDER_ID, HotRodMapStorageProviderFactory.PROVIDER_ID})
public void testLockUserSessionForModificationByQuery() throws Exception {
// Create user session
final String sessionId = withRealm(realmId, (session, realm) -> {
UserModel myUser = session.users().addUser(realm, "myUser");
return session.sessions().createUserSession(realm, myUser, "myUser", "127.0.0.1", "form", true, null, null).getId();
});
tx2.commit();
// tx1 should still return the value that already read
assertThat(
tx1.runStep(session -> session.realms().getRealm(realmId).getAttribute("k1")),
equalTo("v1"));
// tx3 should return the new value
assertThat(
tx3.runStep(session -> session.realms().getRealm(realmId).getAttribute("k1")),
equalTo("v2"));
tx1.commit();
tx3.commit();
testLockObjectForModification(session -> LockObjectsForModification.lockUserSessionsForModification(session, readUserSessionByIdUsingQueryParameters(session, sessionId)));
}
@Test
// LockObjectForModification is currently used only in map-jpa
@RequireProvider(value = MapStorageProvider.class, only = JpaMapStorageProviderFactory.PROVIDER_ID)
public void testLockObjectForModification() {
private <R> void testLockObjectForModification(Function<KeycloakSession, R> lockedExecution) throws Exception {
String originalTimeoutValue = System.getProperty(LOCK_TIMEOUT_SYSTEM_PROPERTY);
try {
System.setProperty(LOCK_TIMEOUT_SYSTEM_PROPERTY, "300");
reinitializeKeycloakSessionFactory();
try (TransactionController tx1 = new TransactionController(getFactory());
TransactionController tx2 = new TransactionController(getFactory());
TransactionController tx3 = new TransactionController(getFactory())) {
TransactionController tx1 = new TransactionController(getFactory());
TransactionController tx2 = new TransactionController(getFactory());
TransactionController tx3 = new TransactionController(getFactory());
tx1.begin();
tx2.begin();
tx1.begin();
tx2.begin();
// tx1 acquires lock
tx1.runStep(lockedExecution);
// tx1 acquires lock
tx1.runStep(session -> LockObjectsForModification.lockRealmsForModification(session, () -> session.realms().getRealm(realmId)));
// tx2 should fail as tx1 locked the realm
assertException(() -> tx2.runStep(lockedExecution),
anyOf(allOf(instanceOf(ModelException.class), hasCause(anyOf(instanceOf(PessimisticLockException.class), instanceOf(org.hibernate.PessimisticLockException.class)))),
instanceOf(LockAcquiringTimeoutException.class)));
// tx2 should fail as tx1 locked the realm
assertException(() -> tx2.runStep(session -> LockObjectsForModification.lockRealmsForModification(session, () -> session.realms().getRealm(realmId))),
allOf(instanceOf(ModelException.class),
hasCause(instanceOf(PessimisticLockException.class))));
// end both transactions
tx2.rollback();
tx1.commit();
// end both transactions
tx2.rollback();
tx1.commit();
// start new transaction and read again, it should be successful
tx3.begin();
tx3.runStep(session -> LockObjectsForModification.lockRealmsForModification(session, () -> session.realms().getRealm(realmId)));
tx3.commit();
// start new transaction and read again, it should be successful
tx3.begin();
tx3.runStep(lockedExecution);
tx3.commit();
}
} finally {
if (originalTimeoutValue == null) {
System.clearProperty(LOCK_TIMEOUT_SYSTEM_PROPERTY);
@ -167,38 +194,44 @@ public class StorageTransactionTest extends KeycloakModelTest {
}
}
private LockObjectsForModification.CallableWithoutThrowingAnException<UserSessionModel> readUserSessionByIdUsingQueryParameters(KeycloakSession session, String sessionId) {
RealmModel realm = session.realms().getRealm(realmId);
return () -> session.sessions().getUserSession(realm, sessionId);
}
@Test
// Optimistic locking works only with map-jpa
@RequireProvider(value = MapStorageProvider.class, only = JpaMapStorageProviderFactory.PROVIDER_ID)
public void testOptimisticLockingException() {
public void testOptimisticLockingException() throws Exception {
withRealm(realmId, (session, realm) -> {
realm.setDisplayName("displayName1");
return null;
});
TransactionController tx1 = new TransactionController(getFactory());
TransactionController tx2 = new TransactionController(getFactory());
try (TransactionController tx1 = new TransactionController(getFactory());
TransactionController tx2 = new TransactionController(getFactory())) {
// tx1 acquires lock
tx1.begin();
tx2.begin();
// tx1 acquires lock
tx1.begin();
tx2.begin();
// both transactions touch the same entity
tx1.runStep(session -> {
session.realms().getRealm(realmId).setDisplayName("displayName2");
return null;
});
tx2.runStep(session -> {
session.realms().getRealm(realmId).setDisplayName("displayName3");
return null;
});
// both transactions touch the same entity
tx1.runStep(session -> {
session.realms().getRealm(realmId).setDisplayName("displayName2");
return null;
});
tx2.runStep(session -> {
session.realms().getRealm(realmId).setDisplayName("displayName3");
return null;
});
// tx1 transaction should be successful
tx1.commit();
// tx1 transaction should be successful
tx1.commit();
// tx2 should fail as tx1 already changed the value
assertException(tx2::commit,
allOf(instanceOf(ModelException.class),
hasCause(instanceOf(OptimisticLockException.class))));
// tx2 should fail as tx1 already changed the value
assertException(tx2::commit,
allOf(instanceOf(ModelException.class),
hasCause(instanceOf(OptimisticLockException.class))));
}
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates
* Copyright 2022 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -21,32 +21,112 @@ import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.KeycloakTransactionManager;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
public class TransactionController {
private final KeycloakSession session;
/**
* This controller adds possibility to manually control more transaction within
* one test case.
* <p />
* It uses ExecutorService to run each transaction in a separate thread. This
* is necessary, for example, for pessimistic locking in HotRod as the locks
* needs to be reentrant by the same thread. If this is running in one thread
* the pessimistic locking does not work as all transactions are able to
* acquire the same lock repeatedly.
*/
public class TransactionController implements AutoCloseable {
private final AtomicReference<KeycloakSession> session = new AtomicReference<>();
private final ExecutorService executor;
private final AtomicReference<String> threadName = new AtomicReference<>();
public TransactionController(KeycloakSessionFactory sessionFactory) {
session = sessionFactory.create();
executor = Executors.newSingleThreadExecutor();
CountDownLatch latch = new CountDownLatch(1);
executor.execute(() -> {
threadName.set(Thread.currentThread().getName());
latch.countDown();
});
try {
if (!latch.await(1, TimeUnit.MINUTES)) {
throw new RuntimeException("Initialization of TransactionController timed out");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
executeAndWaitUntilFinished(() -> threadName.set(Thread.currentThread().getName()));
executeAndWaitUntilFinished(() -> session.set(sessionFactory.create()));
}
public void begin() {
getTransactionManager().begin();
executeAndWaitUntilFinished(() -> getTransactionManager().begin());
}
public void commit() {
getTransactionManager().commit();
executeAndWaitUntilFinished(() -> getTransactionManager().commit());
}
public void rollback() {
getTransactionManager().rollback();
executeAndWaitUntilFinished(() -> getTransactionManager().rollback());
}
public <R> R runStep(Function<KeycloakSession, R> task) {
return task.apply(session);
AtomicReference<R> result = new AtomicReference<>();
executeAndWaitUntilFinished(() -> result.set(task.apply(session.get())));
return result.get();
}
private KeycloakTransactionManager getTransactionManager() {
return session.getTransactionManager();
return session.get().getTransactionManager();
}
private void executeAndWaitUntilFinished(Runnable runnable) {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<RuntimeException> exception = new AtomicReference<>();
executor.execute(() -> {
if (!Objects.equals(threadName.get(), Thread.currentThread().getName())) {
throw new RuntimeException("Execution running in different thread");
}
try {
runnable.run();
} catch (RuntimeException ex) {
exception.set(ex);
} finally {
latch.countDown();
}
});
try {
if (!latch.await(1, TimeUnit.MINUTES)) {
throw new RuntimeException("Waiting for the operation to finish timed out");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
if (exception.get() != null) {
throw exception.get();
}
}
@Override
public void close() throws Exception {
// Shutdown executor
executor.shutdown();
try {
// Wait until it is terminated
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
// Shutdown forcefully
executor.shutdownNow();
}
} catch (InterruptedException ex) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}