parent
071fc03f41
commit
6566b58be1
21 changed files with 763 additions and 152 deletions
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.keycloak.common.util;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
|
@ -81,6 +82,20 @@ public class Retry {
|
|||
|
||||
|
||||
public static int executeWithBackoff(AdvancedRunnable runnable, ThrowableCallback throwableCallback, int attemptsCount, int intervalBaseMillis) {
|
||||
long duration = 0;
|
||||
for (int i = 0; i < attemptsCount; i++) {
|
||||
duration += computeIterationBase(intervalBaseMillis, i);
|
||||
}
|
||||
return executeWithBackoff(runnable, throwableCallback, Duration.ofMillis(duration), intervalBaseMillis);
|
||||
}
|
||||
|
||||
public static int executeWithBackoff(AdvancedRunnable runnable, Duration timeout, int intervalBaseMillis) {
|
||||
return executeWithBackoff(runnable, null, timeout, intervalBaseMillis);
|
||||
}
|
||||
|
||||
public static int executeWithBackoff(AdvancedRunnable runnable, ThrowableCallback throwableCallback, Duration timeout, int intervalBaseMillis) {
|
||||
long maximumTime = Time.currentTimeMillis() + timeout.toMillis();
|
||||
|
||||
int iteration = 0;
|
||||
while (true) {
|
||||
try {
|
||||
|
@ -92,9 +107,8 @@ public class Retry {
|
|||
throwableCallback.handleThrowable(iteration, e);
|
||||
}
|
||||
|
||||
attemptsCount--;
|
||||
iteration++;
|
||||
if (attemptsCount > 0) {
|
||||
if (Time.currentTimeMillis() < maximumTime) {
|
||||
try {
|
||||
if (intervalBaseMillis > 0) {
|
||||
int delay = computeBackoffInterval(intervalBaseMillis, iteration);
|
||||
|
@ -112,10 +126,12 @@ public class Retry {
|
|||
}
|
||||
|
||||
private static int computeBackoffInterval(int base, int iteration) {
|
||||
int iterationBase = base * (int)Math.pow(2, iteration);
|
||||
return new Random().nextInt(iterationBase);
|
||||
return new Random().nextInt(computeIterationBase(base, iteration));
|
||||
}
|
||||
|
||||
private static int computeIterationBase(int base, int iteration) {
|
||||
return base * (1 << iteration);
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the given {@code runnable} at most {@code attemptsCount} times until it passes,
|
||||
|
|
|
@ -18,20 +18,24 @@
|
|||
package org.keycloak.models.dblock;
|
||||
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.models.locking.GlobalLock;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakSessionTaskWithResult;
|
||||
import org.keycloak.models.locking.GlobalLockProvider;
|
||||
import org.keycloak.models.utils.KeycloakModelUtils;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.keycloak.models.locking.GlobalLock.Constants.KEYCLOAK_BOOT;
|
||||
import static org.keycloak.models.locking.GlobalLockProvider.Constants.KEYCLOAK_BOOT;
|
||||
|
||||
public class DBLockGlobalLockProvider implements GlobalLockProvider {
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(DBLockGlobalLockProvider.class);
|
||||
public static final String DATABASE = "database";
|
||||
private final KeycloakSession session;
|
||||
private final DBLockProvider dbLockProvider;
|
||||
public DBLockGlobalLockProvider(DBLockProvider dbLockProvider) {
|
||||
public DBLockGlobalLockProvider(KeycloakSession session, DBLockProvider dbLockProvider) {
|
||||
this.session = session;
|
||||
this.dbLockProvider = dbLockProvider;
|
||||
}
|
||||
|
||||
|
@ -46,15 +50,31 @@ public class DBLockGlobalLockProvider implements GlobalLockProvider {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires a new non-reentrant global lock that is visible to all Keycloak nodes. If the lock was successfully
|
||||
* acquired the method runs the {@code task} and return result to the caller.
|
||||
* <p />
|
||||
* See {@link GlobalLockProvider#withLock(String, Duration, KeycloakSessionTaskWithResult)} for more details.
|
||||
* <p />
|
||||
* This implementation does NOT meet all requirements from the JavaDoc of {@link GlobalLockProvider#withLock(String, Duration, KeycloakSessionTaskWithResult)}
|
||||
* because {@link DBLockProvider} does not provide a way to lock and unlock in separate transactions.
|
||||
* Also, the database schema update currently requires to be running in the same thread that initiated the update
|
||||
* therefore the {@code task} is also running in the caller thread/transaction.
|
||||
*/
|
||||
@Override
|
||||
public GlobalLock acquire(String lockName, Duration timeToWaitForLock) {
|
||||
public <V> V withLock(String lockName, Duration timeToWaitForLock, KeycloakSessionTaskWithResult<V> task) {
|
||||
Objects.requireNonNull(lockName, "lockName cannot be null");
|
||||
|
||||
if (timeToWaitForLock != null) {
|
||||
LOG.debug("DBLockGlobalLockProvider does not support setting timeToWaitForLock per lock.");
|
||||
}
|
||||
|
||||
try {
|
||||
dbLockProvider.waitForLock(stringToNamespace(lockName));
|
||||
return () -> releaseLock(lockName);
|
||||
return task.run(session);
|
||||
} finally {
|
||||
releaseLock(lockName);
|
||||
}
|
||||
}
|
||||
|
||||
private void releaseLock(String lockName) {
|
||||
|
|
|
@ -37,7 +37,7 @@ public class DBLockGlobalLockProviderFactory implements GlobalLockProviderFactor
|
|||
dbLockManager.checkForcedUnlock();
|
||||
}
|
||||
|
||||
return new DBLockGlobalLockProvider(dbLockManager.getDBLock());
|
||||
return new DBLockGlobalLockProvider(session, dbLockManager.getDBLock());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.keycloak.models.map.storage.hotRod.connections;
|
||||
|
||||
import org.infinispan.client.hotrod.Flag;
|
||||
import org.infinispan.client.hotrod.RemoteCache;
|
||||
import org.infinispan.client.hotrod.RemoteCacheManager;
|
||||
import org.infinispan.client.hotrod.RemoteCacheManagerAdmin;
|
||||
|
@ -28,12 +29,15 @@ import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants;
|
|||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakSessionFactory;
|
||||
import org.keycloak.models.locking.LockAcquiringTimeoutException;
|
||||
import org.keycloak.models.map.storage.hotRod.locking.HotRodLocksUtils;
|
||||
import org.keycloak.models.map.storage.hotRod.common.HotRodEntityDescriptor;
|
||||
import org.keycloak.models.map.storage.hotRod.common.CommonPrimitivesProtoSchemaInitializer;
|
||||
import org.keycloak.models.map.storage.hotRod.common.HotRodVersionUtils;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
@ -50,18 +54,23 @@ import static org.keycloak.models.map.storage.hotRod.common.HotRodVersionUtils.i
|
|||
public class DefaultHotRodConnectionProviderFactory implements HotRodConnectionProviderFactory {
|
||||
|
||||
public static final String PROVIDER_ID = "default";
|
||||
|
||||
public static final String HOT_ROD_LOCKS_CACHE_NAME = "locks";
|
||||
private static final String HOT_ROD_INIT_LOCK_NAME = "HOT_ROD_INIT_LOCK";
|
||||
private static final Logger LOG = Logger.getLogger(DefaultHotRodConnectionProviderFactory.class);
|
||||
|
||||
private org.keycloak.Config.Scope config;
|
||||
|
||||
private RemoteCacheManager remoteCacheManager;
|
||||
private volatile RemoteCacheManager remoteCacheManager;
|
||||
|
||||
@Override
|
||||
public HotRodConnectionProvider create(KeycloakSession session) {
|
||||
if (remoteCacheManager == null) {
|
||||
synchronized (this) {
|
||||
if (remoteCacheManager == null) {
|
||||
lazyInit();
|
||||
}
|
||||
}
|
||||
}
|
||||
return new DefaultHotRodConnectionProvider(remoteCacheManager);
|
||||
}
|
||||
|
||||
|
@ -111,8 +120,17 @@ public class DefaultHotRodConnectionProviderFactory implements HotRodConnectionP
|
|||
|
||||
remoteBuilder.addContextInitializer(CommonPrimitivesProtoSchemaInitializer.INSTANCE);
|
||||
ENTITY_DESCRIPTOR_MAP.values().stream().map(HotRodEntityDescriptor::getProtoSchema).forEach(remoteBuilder::addContextInitializer);
|
||||
|
||||
// Configure settings necessary for locking
|
||||
configureLocking(remoteBuilder);
|
||||
|
||||
remoteCacheManager = new RemoteCacheManager(remoteBuilder.build());
|
||||
|
||||
// Acquire initial phase lock to avoid concurrent schema update
|
||||
RemoteCache<String, String> locksCache = remoteCacheManager.getCache(HOT_ROD_LOCKS_CACHE_NAME);
|
||||
try {
|
||||
HotRodLocksUtils.repeatPutIfAbsent(locksCache, HOT_ROD_INIT_LOCK_NAME, Duration.ofMillis(900), 50);
|
||||
|
||||
Set<String> remoteCaches = ENTITY_DESCRIPTOR_MAP.values().stream()
|
||||
.map(HotRodEntityDescriptor::getCacheName).collect(Collectors.toSet());
|
||||
|
||||
|
@ -137,6 +155,18 @@ public class DefaultHotRodConnectionProviderFactory implements HotRodConnectionP
|
|||
}
|
||||
|
||||
LOG.infof("HotRod client configuration was successful.");
|
||||
} catch (LockAcquiringTimeoutException e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
if (!HotRodLocksUtils.removeWithInstanceIdentifier(locksCache, HOT_ROD_INIT_LOCK_NAME)) {
|
||||
throw new RuntimeException("Cannot release HotRod init lock");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void configureLocking(ConfigurationBuilder builder) {
|
||||
builder.remoteCache(HOT_ROD_LOCKS_CACHE_NAME)
|
||||
.configurationURI(getCacheConfigUri(HOT_ROD_LOCKS_CACHE_NAME));
|
||||
}
|
||||
|
||||
private void registerSchemata() {
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Copyright 2022 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.models.map.storage.hotRod.locking;
|
||||
|
||||
import org.infinispan.client.hotrod.Flag;
|
||||
import org.infinispan.client.hotrod.RemoteCache;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakSessionTaskWithResult;
|
||||
import org.keycloak.models.locking.GlobalLockProvider;
|
||||
import org.keycloak.models.locking.LockAcquiringTimeoutException;
|
||||
import org.keycloak.models.utils.KeycloakModelUtils;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.keycloak.common.util.StackUtil.getShortStackTrace;
|
||||
|
||||
|
||||
public class HotRodGlobalLockProvider implements GlobalLockProvider {
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(HotRodGlobalLockProvider.class);
|
||||
private final KeycloakSession session;
|
||||
private final RemoteCache<String, String> locksCache;
|
||||
private final long defaultTimeoutMilliseconds;
|
||||
|
||||
public HotRodGlobalLockProvider(KeycloakSession session, RemoteCache<String, String> locksCache, long defaultTimeoutMilliseconds) {
|
||||
this.locksCache = locksCache;
|
||||
this.defaultTimeoutMilliseconds = defaultTimeoutMilliseconds;
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> V withLock(String lockName, Duration timeToWaitForLock, KeycloakSessionTaskWithResult<V> task) throws LockAcquiringTimeoutException {
|
||||
Objects.requireNonNull(lockName, "lockName cannot be null");
|
||||
|
||||
if (timeToWaitForLock == null) {
|
||||
// Set default timeout if null provided
|
||||
timeToWaitForLock = Duration.ofMillis(defaultTimeoutMilliseconds);
|
||||
}
|
||||
|
||||
try {
|
||||
LOG.debugf("Acquiring lock [%s].%s", lockName, getShortStackTrace());
|
||||
HotRodLocksUtils.repeatPutIfAbsent(locksCache, lockName, timeToWaitForLock, 50);
|
||||
LOG.debugf("Lock acquired [%s]. Continuing with task execution.", lockName);
|
||||
|
||||
return KeycloakModelUtils.runJobInTransactionWithResult(session.getKeycloakSessionFactory(), task);
|
||||
} finally {
|
||||
LOG.debugf("Releasing lock [%s].%s", lockName, getShortStackTrace());
|
||||
boolean result = HotRodLocksUtils.removeWithInstanceIdentifier(locksCache, lockName);
|
||||
LOG.debugf("Lock [%s] release resulted with %s", lockName, result);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forceReleaseAllLocks() {
|
||||
locksCache.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Copyright 2022 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.models.map.storage.hotRod.locking;
|
||||
|
||||
import org.infinispan.client.hotrod.RemoteCache;
|
||||
import org.keycloak.Config;
|
||||
import org.keycloak.common.Profile;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakSessionFactory;
|
||||
import org.keycloak.models.locking.GlobalLockProvider;
|
||||
import org.keycloak.models.locking.GlobalLockProviderFactory;
|
||||
import org.keycloak.models.map.storage.hotRod.connections.HotRodConnectionProvider;
|
||||
import org.keycloak.provider.EnvironmentDependentProviderFactory;
|
||||
|
||||
public class HotRodGlobalLockProviderFactory implements GlobalLockProviderFactory, EnvironmentDependentProviderFactory {
|
||||
|
||||
public static final String PROVIDER_ID = "hotrod";
|
||||
protected static final String HOT_ROD_LOCKS_CACHE = "locks";
|
||||
|
||||
private RemoteCache<String, String> locksCache;
|
||||
private long defaultTimeoutMilliseconds;
|
||||
|
||||
|
||||
@Override
|
||||
public GlobalLockProvider create(KeycloakSession session) {
|
||||
if (locksCache == null) {
|
||||
lazyInit(session);
|
||||
}
|
||||
|
||||
return new HotRodGlobalLockProvider(session, locksCache, defaultTimeoutMilliseconds);
|
||||
}
|
||||
|
||||
private void lazyInit(KeycloakSession session) {
|
||||
HotRodConnectionProvider hotRodConnectionProvider = session.getProvider(HotRodConnectionProvider.class);
|
||||
locksCache = hotRodConnectionProvider.getRemoteCache(HOT_ROD_LOCKS_CACHE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Config.Scope config) {
|
||||
defaultTimeoutMilliseconds = config.getLong("defaultTimeoutMilliseconds", 5000L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postInit(KeycloakSessionFactory factory) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return PROVIDER_ID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSupported() {
|
||||
return Profile.isFeatureEnabled(Profile.Feature.MAP_STORAGE);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,108 @@
|
|||
/*
|
||||
* Copyright 2022 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.models.map.storage.hotRod.locking;
|
||||
|
||||
import org.infinispan.client.hotrod.Flag;
|
||||
import org.infinispan.client.hotrod.RemoteCache;
|
||||
import org.keycloak.common.util.Retry;
|
||||
import org.keycloak.common.util.Time;
|
||||
import org.keycloak.models.locking.LockAcquiringTimeoutException;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class HotRodLocksUtils {
|
||||
|
||||
public static final String SEPARATOR = ";";
|
||||
private static final String INSTANCE_IDENTIFIER = getKeycloakInstanceIdentifier();
|
||||
|
||||
/**
|
||||
* Repeatedly attempts to put an entry with the key {@code lockName}
|
||||
* to the {@code locksCache}. Succeeds only if there is no entry with
|
||||
* the same key already.
|
||||
* <p/>
|
||||
* Execution of this method is time bounded, if this method does not
|
||||
* succeed within {@code timeoutMilliseconds} it gives up and returns
|
||||
* false.
|
||||
* <p/>
|
||||
* There is a pause after each unsuccessful attempt equal to
|
||||
* {@code repeatInterval} milliseconds
|
||||
*
|
||||
* @param locksCache Cache that will be used for putting the value
|
||||
* @param lockName Name of the entry
|
||||
* @param timeout duration to wait until the lock is acquired
|
||||
* @param repeatInterval Number of milliseconds to wait after each
|
||||
* unsuccessful attempt
|
||||
* @throws LockAcquiringTimeoutException the key {@code lockName} was NOT put into the {@code map}
|
||||
* within time boundaries
|
||||
* @throws IllegalStateException when a {@code lock} value found in the storage has wrong format. It is expected
|
||||
* the lock value has the following format {@code 'timeAcquired;keycloakInstanceIdentifier'}
|
||||
*/
|
||||
public static void repeatPutIfAbsent(RemoteCache<String, String> locksCache, String lockName, Duration timeout, int repeatInterval) throws LockAcquiringTimeoutException {
|
||||
final AtomicReference<String> currentOwnerRef = new AtomicReference<>(null);
|
||||
try {
|
||||
Retry.executeWithBackoff(i -> {
|
||||
String curr = locksCache.withFlags(Flag.FORCE_RETURN_VALUE).putIfAbsent(lockName, Time.currentTimeMillis() + SEPARATOR + INSTANCE_IDENTIFIER);
|
||||
currentOwnerRef.set(curr);
|
||||
if (curr != null) {
|
||||
throw new AssertionError("Acquiring lock in iteration " + i + " was not successful");
|
||||
}
|
||||
}, timeout, repeatInterval);
|
||||
} catch (AssertionError ex) {
|
||||
String currentOwner = currentOwnerRef.get();
|
||||
String[] split = currentOwner == null ? null : currentOwner.split(SEPARATOR, 2);
|
||||
if (currentOwner == null || split.length != 2) throw new IllegalStateException("Bad lock value format found in storage for lock " + lockName + ". " +
|
||||
"It is expected the format to be 'timeAcquired;keycloakInstanceIdentifier' but was " + currentOwner);
|
||||
throw new LockAcquiringTimeoutException(lockName, split[1], Instant.ofEpochMilli(Long.parseLong(split[0])));
|
||||
}
|
||||
}
|
||||
|
||||
private static String getKeycloakInstanceIdentifier() {
|
||||
long pid = ProcessHandle.current().pid();
|
||||
String hostname;
|
||||
try {
|
||||
hostname = InetAddress.getLocalHost().getHostName();
|
||||
} catch (UnknownHostException e) {
|
||||
hostname = "unknown-host";
|
||||
}
|
||||
|
||||
return pid + "@" + hostname;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the entry with key {@code lockName} from map if the value
|
||||
* of the entry is equal to this node's identifier
|
||||
*
|
||||
* @param map Map that will be used for removing
|
||||
* @param lockName Name of the entry
|
||||
* @return true if the entry was removed, false otherwise
|
||||
*/
|
||||
public static boolean removeWithInstanceIdentifier(ConcurrentMap<String, String> map, String lockName) {
|
||||
String value = map.get(lockName);
|
||||
if (value != null && value.endsWith(INSTANCE_IDENTIFIER)) {
|
||||
map.remove(lockName);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
#
|
||||
# Copyright 2022 Red Hat, Inc. and/or its affiliates
|
||||
# and other contributors as indicated by the @author tags.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
org.keycloak.models.map.storage.hotRod.locking.HotRodGlobalLockProviderFactory
|
|
@ -0,0 +1,19 @@
|
|||
<!--
|
||||
~ Copyright 2022 Red Hat, Inc. and/or its affiliates
|
||||
~ and other contributors as indicated by the @author tags.
|
||||
~
|
||||
~ Licensed under the Apache License, Version 2.0 (the "License");
|
||||
~ you may not use this file except in compliance with the License.
|
||||
~ You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
<replicated-cache name="locks" mode="SYNC">
|
||||
<encoding media-type="text/plain"/>
|
||||
</replicated-cache>
|
|
@ -27,7 +27,6 @@ import java.util.LinkedHashMap;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
|
@ -75,7 +74,6 @@ import org.keycloak.models.RoleModel;
|
|||
import org.keycloak.models.UserLoginFailureModel;
|
||||
import org.keycloak.models.UserModel;
|
||||
import org.keycloak.models.UserSessionModel;
|
||||
import org.keycloak.models.locking.GlobalLock;
|
||||
import org.keycloak.models.locking.GlobalLockProvider;
|
||||
import org.keycloak.models.locking.LockAcquiringTimeoutException;
|
||||
import org.keycloak.models.map.client.MapProtocolMapperEntity;
|
||||
|
@ -146,7 +144,6 @@ import org.keycloak.models.map.storage.jpa.user.entity.JpaUserEntity;
|
|||
import org.keycloak.models.map.storage.jpa.user.entity.JpaUserFederatedIdentityEntity;
|
||||
import org.keycloak.models.map.user.MapUserCredentialEntity;
|
||||
import org.keycloak.models.map.user.MapUserCredentialEntityImpl;
|
||||
import org.keycloak.models.utils.KeycloakModelUtils;
|
||||
import org.keycloak.provider.EnvironmentDependentProviderFactory;
|
||||
import org.keycloak.sessions.RootAuthenticationSessionModel;
|
||||
import org.keycloak.transaction.JtaTransactionManagerLookup;
|
||||
|
@ -495,13 +492,13 @@ public class JpaMapStorageProviderFactory implements
|
|||
}
|
||||
|
||||
private void update(Class<?> modelType, Connection connection, KeycloakSession session) {
|
||||
KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(), (KeycloakSession lockSession) -> {
|
||||
GlobalLockProvider globalLock = session.getProvider(GlobalLockProvider.class);
|
||||
try (GlobalLock l = globalLock.acquireLock(modelType.getName())) {
|
||||
session.getProvider(MapJpaUpdaterProvider.class).update(modelType, connection, config.get("schema"));
|
||||
try {
|
||||
session.getProvider(GlobalLockProvider.class).withLock(modelType.getName(), lockedSession -> {
|
||||
lockedSession.getProvider(MapJpaUpdaterProvider.class).update(modelType, connection, config.get("schema"));
|
||||
return null;
|
||||
});
|
||||
} catch (LockAcquiringTimeoutException e) {
|
||||
throw new RuntimeException("Acquiring " + modelType.getName() + " failed.", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -327,7 +327,18 @@ final class StoragePropertyMappers {
|
|||
}
|
||||
|
||||
private static Optional<String> getGlobalLockProvider(Optional<String> storage, ConfigSourceInterceptorContext context) {
|
||||
return of(storage.isEmpty() ? "dblock" : "none");
|
||||
try {
|
||||
if (storage.isPresent()) {
|
||||
return of(storage.map(StorageType::valueOf)
|
||||
.filter(type -> type.equals(StorageType.hotrod))
|
||||
.map(StorageType::getProvider)
|
||||
.orElse("none"));
|
||||
}
|
||||
} catch (IllegalArgumentException iae) {
|
||||
throw new IllegalArgumentException("Invalid storage provider: " + storage.orElse(null), iae);
|
||||
}
|
||||
|
||||
return of("dblock");
|
||||
}
|
||||
|
||||
private static Optional<String> getUserSessionPersisterStorage(Optional<String> storage, ConfigSourceInterceptorContext context) {
|
||||
|
|
|
@ -30,7 +30,6 @@ import java.sql.Statement;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.enterprise.inject.Instance;
|
||||
import javax.persistence.EntityManager;
|
||||
|
@ -52,7 +51,6 @@ import org.keycloak.migration.ModelVersion;
|
|||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakSessionFactory;
|
||||
import org.keycloak.models.dblock.DBLockGlobalLockProvider;
|
||||
import org.keycloak.models.locking.GlobalLock;
|
||||
import org.keycloak.models.locking.GlobalLockProvider;
|
||||
import org.keycloak.models.locking.LockAcquiringTimeoutException;
|
||||
import org.keycloak.provider.EnvironmentDependentProviderFactory;
|
||||
|
@ -292,8 +290,11 @@ public class LegacyJpaConnectionProviderFactory extends AbstractJpaConnectionPro
|
|||
|
||||
private void update(Connection connection, String schema, KeycloakSession session, JpaUpdaterProvider updater) {
|
||||
GlobalLockProvider globalLock = session.getProvider(GlobalLockProvider.class);
|
||||
try (GlobalLock l = globalLock.acquireLock(DBLockGlobalLockProvider.DATABASE)) {
|
||||
try {
|
||||
globalLock.withLock(DBLockGlobalLockProvider.DATABASE, innerSession -> {
|
||||
updater.update(connection, schema);
|
||||
return null;
|
||||
});
|
||||
} catch (LockAcquiringTimeoutException e) {
|
||||
throw new RuntimeException("Acquiring database failed.", e);
|
||||
}
|
||||
|
@ -302,8 +303,11 @@ public class LegacyJpaConnectionProviderFactory extends AbstractJpaConnectionPro
|
|||
private void export(Connection connection, String schema, File databaseUpdateFile, KeycloakSession session,
|
||||
JpaUpdaterProvider updater) {
|
||||
GlobalLockProvider globalLock = session.getProvider(GlobalLockProvider.class);
|
||||
try (GlobalLock l = globalLock.acquireLock(DBLockGlobalLockProvider.DATABASE)) {
|
||||
try {
|
||||
globalLock.withLock(DBLockGlobalLockProvider.DATABASE, innerSession -> {
|
||||
updater.export(connection, schema, databaseUpdateFile);
|
||||
return null;
|
||||
});
|
||||
} catch (LockAcquiringTimeoutException e) {
|
||||
throw new RuntimeException("Acquiring database failed.", e);
|
||||
}
|
||||
|
|
|
@ -1,34 +0,0 @@
|
|||
/*
|
||||
* Copyright 2022 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.models.locking;
|
||||
|
||||
/**
|
||||
* An object of this type represents a successfully acquired global lock provided by {@link GlobalLockProvider}
|
||||
*/
|
||||
public interface GlobalLock extends AutoCloseable {
|
||||
|
||||
/**
|
||||
* Releases the lock represented by this
|
||||
*/
|
||||
@Override
|
||||
void close();
|
||||
|
||||
public static class Constants {
|
||||
public static final String KEYCLOAK_BOOT = "keycloak-boot";
|
||||
}
|
||||
}
|
|
@ -17,62 +17,80 @@
|
|||
|
||||
package org.keycloak.models.locking;
|
||||
|
||||
import org.keycloak.models.KeycloakSessionTaskWithResult;
|
||||
import org.keycloak.provider.Provider;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
public interface GlobalLockProvider extends Provider {
|
||||
|
||||
/**
|
||||
* Effectively the same as {@code acquire(lockName, null)}
|
||||
* <p />
|
||||
* This method is intended to be used in a {@code try}-with-resources block.
|
||||
*
|
||||
* @param lockName Identifier used for acquiring lock. Can be any non-null string.
|
||||
* @return Instance of {@link GlobalLock} representing successfully acquired global lock.
|
||||
* @throws LockAcquiringTimeoutException When acquiring the global lock times out
|
||||
* (see Javadoc of {@link #acquire(String, Duration)} for more details on how the time
|
||||
* duration is determined)
|
||||
* @throws NullPointerException When lockName is {@code null}.
|
||||
*/
|
||||
default GlobalLock acquireLock(String lockName) throws LockAcquiringTimeoutException {
|
||||
return acquire(lockName, null);
|
||||
class Constants {
|
||||
public static final String KEYCLOAK_BOOT = "keycloak-boot";
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires a new global lock that is visible to all Keycloak nodes. The lock is non-reentrant.
|
||||
* <p />
|
||||
* The lock is guaranteed to be kept until the returned {@link GlobalLock} is closed
|
||||
* using the {@link GlobalLock#close} method.
|
||||
* <p />
|
||||
* Some implementations may benefit from locks that are released at the end of transaction.
|
||||
* For this purpose, the lifespan of the returned lock is limited by the transaction lifespan
|
||||
* of the session which acquired this lock.
|
||||
* <p />
|
||||
* This method is intended to be used in a {@code try}-with-resources block.
|
||||
* Acquires a new non-reentrant global lock that is visible to all Keycloak nodes.
|
||||
* Effectively the same as {@code withLock(lockName, null, task)}
|
||||
*
|
||||
* @param lockName Identifier used for acquiring lock. Can be any non-null string.
|
||||
* @param task The task that will be executed under the acquired lock
|
||||
* @param <V> Type of object returned by the {@code task}
|
||||
* @return Value returned by the {@code task}
|
||||
* @throws LockAcquiringTimeoutException When acquiring the global lock times out
|
||||
* (see Javadoc of {@link #withLock(String, Duration, KeycloakSessionTaskWithResult)} for more details on how the time
|
||||
* duration is determined)
|
||||
* @throws NullPointerException When lockName is {@code null}.
|
||||
*/
|
||||
default <V> V withLock(String lockName, KeycloakSessionTaskWithResult<V> task) throws LockAcquiringTimeoutException {
|
||||
return withLock(lockName, null, task);
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires a new non-reentrant global lock that is visible to all Keycloak nodes. If the lock was successfully
|
||||
* acquired the method runs the {@code task} in a new transaction to ensure all data modified in {@code task}
|
||||
* is committed to the stores before releasing the lock and returning to the caller.
|
||||
* <p/>
|
||||
* If there is another global lock with the same identifier ({@code lockName}) already acquired, this method waits
|
||||
* until the lock is released, however, not more than {@code timeToWaitForLock} duration. If the lock is not
|
||||
* acquired after {@code timeToWaitForLock} duration, the method throws {@link LockAcquiringTimeoutException}
|
||||
* acquired after {@code timeToWaitForLock} duration, the method throws {@link LockAcquiringTimeoutException}.
|
||||
* <p/>
|
||||
* Releasing of the lock is done using instance of {@link GlobalLock} returned by this method.
|
||||
* When the execution of the {@code task} finishes, the acquired lock must be released regardless of the result.
|
||||
* <p/>
|
||||
* <b>A note to implementors of the interface:</b>
|
||||
* <p/>
|
||||
* To make sure acquiring/releasing the lock is visible to all Keycloak nodes it may be needed to run the code that
|
||||
* acquires/releases the lock in a separate transactions. This means together the method can use 3 separate
|
||||
* transactions, for example:
|
||||
* <pre>
|
||||
* try {
|
||||
* KeycloakModelUtils.runJobInTransaction(factory,
|
||||
* innerSession -> /* run code that acquires the lock *\/)
|
||||
*
|
||||
* KeycloakModelUtils.runJobInTransactionWithResult(factory, task)
|
||||
* } finally {
|
||||
* KeycloakModelUtils.runJobInTransaction(factory,
|
||||
* innerSession -> /* run code that releases the lock *\/)
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* @param lockName Identifier used for acquiring lock. Can be any non-null string.
|
||||
* @param task The task that will be executed under the acquired lock
|
||||
* @param <V> Type of object returned by the {@code task}
|
||||
* @param timeToWaitForLock Duration this method waits until it gives up acquiring the lock. If {@code null},
|
||||
* each implementation should provide some default duration, for example using
|
||||
* configuration option.
|
||||
* @return Instance of {@link GlobalLock} representing successfully acquired global lock.
|
||||
* each implementation should provide some default duration, for example, using
|
||||
* a configuration option.
|
||||
* @return Value returned by the {@code task}
|
||||
*
|
||||
* @throws LockAcquiringTimeoutException When the method waits for {@code timeToWaitForLock} duration and the lock is still
|
||||
* not available to acquire.
|
||||
* @throws NullPointerException When {@code lockName} is {@code null}.
|
||||
*/
|
||||
GlobalLock acquire(String lockName, Duration timeToWaitForLock) throws LockAcquiringTimeoutException;
|
||||
<V> V withLock(String lockName, Duration timeToWaitForLock, KeycloakSessionTaskWithResult<V> task) throws LockAcquiringTimeoutException;
|
||||
|
||||
/**
|
||||
* Releases all locks acquired by this GlobalLockProvider.
|
||||
* <p />
|
||||
* This method must unlock all existing locks acquired by this provider regardless of the thread
|
||||
* This method unlocks all existing locks acquired by this provider regardless of the thread
|
||||
* or Keycloak instance that originally acquired them.
|
||||
*/
|
||||
void forceReleaseAllLocks();
|
||||
|
|
|
@ -21,6 +21,8 @@ import org.keycloak.Config;
|
|||
import org.keycloak.common.Profile;
|
||||
import org.keycloak.models.KeycloakSession;
|
||||
import org.keycloak.models.KeycloakSessionFactory;
|
||||
import org.keycloak.models.KeycloakSessionTaskWithResult;
|
||||
import org.keycloak.models.utils.KeycloakModelUtils;
|
||||
import org.keycloak.provider.EnvironmentDependentProviderFactory;
|
||||
|
||||
import java.time.Duration;
|
||||
|
@ -31,7 +33,21 @@ public class NoneGlobalLockProviderFactory implements GlobalLockProviderFactory,
|
|||
|
||||
@Override
|
||||
public GlobalLockProvider create(KeycloakSession session) {
|
||||
return INSTANCE;
|
||||
return new GlobalLockProvider() {
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> V withLock(String lockName, Duration timeToWaitForLock, KeycloakSessionTaskWithResult<V> task) {
|
||||
return KeycloakModelUtils.runJobInTransactionWithResult(session.getKeycloakSessionFactory(), task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forceReleaseAllLocks() {
|
||||
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -58,21 +74,4 @@ public class NoneGlobalLockProviderFactory implements GlobalLockProviderFactory,
|
|||
public boolean isSupported() {
|
||||
return Profile.isFeatureEnabled(Profile.Feature.MAP_STORAGE);
|
||||
}
|
||||
|
||||
private static final GlobalLockProvider INSTANCE = new GlobalLockProvider() {
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public GlobalLock acquire(String lockName, Duration timeToWaitForLock) {
|
||||
return () -> {};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forceReleaseAllLocks() {
|
||||
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.keycloak.models.ModelDuplicateException;
|
|||
import org.keycloak.models.RealmModel;
|
||||
import org.keycloak.models.UserModel;
|
||||
import org.keycloak.models.UserProvider;
|
||||
import org.keycloak.models.locking.GlobalLock;
|
||||
import org.keycloak.models.locking.GlobalLockProvider;
|
||||
import org.keycloak.models.locking.LockAcquiringTimeoutException;
|
||||
import org.keycloak.models.utils.KeycloakModelUtils;
|
||||
|
@ -71,7 +70,6 @@ import java.util.Optional;
|
|||
import java.util.ServiceLoader;
|
||||
import java.util.Set;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
|
@ -150,8 +148,8 @@ public class KeycloakApplication extends Application {
|
|||
@Override
|
||||
public void run(KeycloakSession session) {
|
||||
GlobalLockProvider locks = session.getProvider(GlobalLockProvider.class);
|
||||
try (GlobalLock l = locks.acquireLock(GlobalLock.Constants.KEYCLOAK_BOOT)) {
|
||||
exportImportManager[0] = bootstrap();
|
||||
try {
|
||||
exportImportManager[0] = locks.withLock(GlobalLockProvider.Constants.KEYCLOAK_BOOT, innerSession -> bootstrap());
|
||||
} catch (LockAcquiringTimeoutException e) {
|
||||
throw new RuntimeException("Acquiring keycloak-boot lock failed.", e);
|
||||
}
|
||||
|
|
|
@ -791,7 +791,6 @@
|
|||
The following properties needs to be synchronized across all map-storage profiles
|
||||
-->
|
||||
<keycloak.profile.feature.map_storage>enabled</keycloak.profile.feature.map_storage>
|
||||
<keycloak.globalLock.provider>none</keycloak.globalLock.provider>
|
||||
<keycloak.realm.provider>map</keycloak.realm.provider>
|
||||
<keycloak.client.provider>map</keycloak.client.provider>
|
||||
<keycloak.clientScope.provider>map</keycloak.clientScope.provider>
|
||||
|
@ -817,6 +816,7 @@
|
|||
-->
|
||||
<keycloak.mapStorage.provider>concurrenthashmap</keycloak.mapStorage.provider>
|
||||
<auth.server.quarkus.mapStorage.profile.config>chm</auth.server.quarkus.mapStorage.profile.config>
|
||||
<keycloak.globalLock.provider>none</keycloak.globalLock.provider>
|
||||
</systemPropertyVariables>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
@ -839,7 +839,6 @@
|
|||
The following properties needs to be synchronized across all map-storage profiles
|
||||
-->
|
||||
<keycloak.profile.feature.map_storage>enabled</keycloak.profile.feature.map_storage>
|
||||
<keycloak.globalLock.provider>none</keycloak.globalLock.provider>
|
||||
<keycloak.realm.provider>map</keycloak.realm.provider>
|
||||
<keycloak.client.provider>map</keycloak.client.provider>
|
||||
<keycloak.clientScope.provider>map</keycloak.clientScope.provider>
|
||||
|
@ -883,6 +882,7 @@
|
|||
<keycloak.user.map.storage.provider>jpa</keycloak.user.map.storage.provider>
|
||||
<keycloak.userSession.map.storage.provider>jpa</keycloak.userSession.map.storage.provider>
|
||||
<auth.server.quarkus.mapStorage.profile.config>jpa</auth.server.quarkus.mapStorage.profile.config>
|
||||
<keycloak.globalLock.provider>none</keycloak.globalLock.provider>
|
||||
</systemPropertyVariables>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
@ -906,7 +906,6 @@
|
|||
The following properties needs to be synchronized across all map-storage profiles
|
||||
-->
|
||||
<keycloak.profile.feature.map_storage>enabled</keycloak.profile.feature.map_storage>
|
||||
<keycloak.globalLock.provider>none</keycloak.globalLock.provider>
|
||||
<keycloak.realm.provider>map</keycloak.realm.provider>
|
||||
<keycloak.client.provider>map</keycloak.client.provider>
|
||||
<keycloak.clientScope.provider>map</keycloak.clientScope.provider>
|
||||
|
@ -945,6 +944,7 @@
|
|||
<infinispan.version>${infinispan.version}</infinispan.version>
|
||||
<keycloak.testsuite.start-hotrod-container>${keycloak.testsuite.start-hotrod-container}</keycloak.testsuite.start-hotrod-container>
|
||||
<auth.server.quarkus.mapStorage.profile.config>hotrod</auth.server.quarkus.mapStorage.profile.config>
|
||||
<keycloak.globalLock.provider>hotrod</keycloak.globalLock.provider>
|
||||
</systemPropertyVariables>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
@ -1013,6 +1013,7 @@
|
|||
<keycloak.user.map.storage.provider>jpa</keycloak.user.map.storage.provider>
|
||||
<keycloak.userSession.map.storage.provider>jpa</keycloak.userSession.map.storage.provider>
|
||||
<auth.server.quarkus.mapStorage.profile.config>jpa</auth.server.quarkus.mapStorage.profile.config>
|
||||
<keycloak.globalLock.provider>none</keycloak.globalLock.provider>
|
||||
</systemPropertyVariables>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
|
|
@ -37,8 +37,27 @@ public @interface RequireProvider {
|
|||
|
||||
/**
|
||||
* Specifies provider IDs of mandatory provider. There must be at least one provider available
|
||||
* from those in {@code only} array to fulfil this requirement.
|
||||
* from those in {@code only} array to fulfil this requirement. If this is used together with
|
||||
* {@link #exclude()} both rules are applied.
|
||||
* <p />
|
||||
* For example,
|
||||
* When possible providers are: {@code provider1}, {@code provider2}, {@code provider3}
|
||||
* and rules: {@code @RequireProvider{value = MyFactory.class, only = [provider1, provider2], exclude = [provider2]}}
|
||||
* The test will be running only when {@code provider1} is available on the session factory
|
||||
*
|
||||
*/
|
||||
String[] only() default {};
|
||||
|
||||
/**
|
||||
* Specifies provider IDs that does not satisfy this requirement. In other words, there must be another provider
|
||||
* of type {@code value()} for satisfying this requirement. If this is used together with
|
||||
* {@link #only()} both rules are applied.
|
||||
* <p />
|
||||
* For example,
|
||||
* When possible providers are: {@code provider1}, {@code provider2}, {@code provider3}
|
||||
* and rules: {@code @RequireProvider{value = MyFactory.class, only = [provider1, provider2], exclude = [provider2]}}
|
||||
* The test will be running only when {@code provider1} is available on the session factory
|
||||
*/
|
||||
String[] exclude() default {};
|
||||
|
||||
}
|
||||
|
|
|
@ -136,9 +136,7 @@ public abstract class KeycloakModelTest {
|
|||
testClass = testClass.getSuperclass();
|
||||
}
|
||||
List<Class<? extends Provider>> notFound = st
|
||||
.filter(rp -> rp.only().length == 0
|
||||
? getFactory().getProviderFactory(rp.value()) == null
|
||||
: Stream.of(rp.only()).allMatch(provider -> getFactory().getProviderFactory(rp.value(), provider) == null))
|
||||
.filter(KeycloakModelTest::checkProviderAvailability)
|
||||
.map(RequireProvider::value)
|
||||
.collect(Collectors.toList());
|
||||
Assume.assumeThat("Some required providers not found", notFound, Matchers.empty());
|
||||
|
@ -151,6 +149,25 @@ public abstract class KeycloakModelTest {
|
|||
}
|
||||
};
|
||||
|
||||
// Returns true if annotation requirement is not met
|
||||
private static boolean checkProviderAvailability(RequireProvider annotation) {
|
||||
Set<String> allFactories = getFactory().getProviderFactoriesStream(annotation.value()).map(ProviderFactory::getId).collect(Collectors.toSet());
|
||||
List<String> only = Arrays.asList(annotation.only());
|
||||
List<String> exclude = Arrays.asList(annotation.exclude());
|
||||
|
||||
// There is no factory for required provider
|
||||
if (allFactories.isEmpty()) return true;
|
||||
|
||||
// Remove excluded ids
|
||||
allFactories.removeIf(exclude::contains);
|
||||
|
||||
// Remove not matching only
|
||||
allFactories.removeIf(id -> !only.isEmpty() && !only.contains(id));
|
||||
|
||||
// If there is no factory return true
|
||||
return allFactories.isEmpty();
|
||||
}
|
||||
|
||||
@Rule
|
||||
public final TestRule guaranteeRequiredFactoryOnMethod = new TestRule() {
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,211 @@
|
|||
/*
|
||||
* Copyright 2022 Red Hat, Inc. and/or its affiliates
|
||||
* and other contributors as indicated by the @author tags.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.keycloak.testsuite.model.globalLock;
|
||||
|
||||
import org.hamcrest.Matchers;
|
||||
import org.jboss.logging.Logger;
|
||||
import org.junit.Test;
|
||||
import org.keycloak.models.dblock.DBLockGlobalLockProviderFactory;
|
||||
import org.keycloak.models.locking.GlobalLockProvider;
|
||||
import org.keycloak.models.locking.LockAcquiringTimeoutException;
|
||||
import org.keycloak.models.locking.NoneGlobalLockProviderFactory;
|
||||
import org.keycloak.testsuite.model.KeycloakModelTest;
|
||||
import org.keycloak.testsuite.model.RequireProvider;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
@RequireProvider(value = GlobalLockProvider.class,
|
||||
exclude = { NoneGlobalLockProviderFactory.PROVIDER_ID, DBLockGlobalLockProviderFactory.PROVIDER_ID }
|
||||
)
|
||||
public class GlobalLocksTest extends KeycloakModelTest {
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(GlobalLocksTest.class);
|
||||
@Override
|
||||
protected boolean isUseSameKeycloakSessionFactoryForAllThreads() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void concurrentLockingTest() {
|
||||
final String LOCK_NAME = "simpleLockTestLockName";
|
||||
|
||||
AtomicInteger counter = new AtomicInteger();
|
||||
int numIterations = 50;
|
||||
Random rand = new Random();
|
||||
List<Integer> resultingList = new LinkedList<>();
|
||||
|
||||
IntStream.range(0, numIterations).parallel().forEach(index -> inComittedTransaction(s -> {
|
||||
GlobalLockProvider lockProvider = s.getProvider(GlobalLockProvider.class);
|
||||
LOG.infof("Iteration %d entered session", index);
|
||||
|
||||
try {
|
||||
lockProvider.withLock(LOCK_NAME, Duration.ofSeconds(60), innerSession -> {
|
||||
LOG.infof("Iteration %d entered locked block", index);
|
||||
|
||||
// Locked block
|
||||
int c = counter.getAndIncrement();
|
||||
|
||||
try {
|
||||
Thread.sleep(rand.nextInt(100));
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
resultingList.add(c);
|
||||
return null;
|
||||
});
|
||||
} catch (LockAcquiringTimeoutException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}));
|
||||
|
||||
assertThat(resultingList, hasSize(numIterations));
|
||||
assertThat(resultingList, equalTo(IntStream.range(0, 50).boxed().collect(Collectors.toList())));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void lockTimeoutExceptionTest() {
|
||||
final String LOCK_NAME = "lockTimeoutExceptionTestLock";
|
||||
AtomicInteger counter = new AtomicInteger();
|
||||
CountDownLatch waitForTheOtherThreadToFail = new CountDownLatch(1);
|
||||
|
||||
IntStream.range(0, 2).parallel().forEach(index -> inComittedTransaction(s -> {
|
||||
GlobalLockProvider lockProvider = s.getProvider(GlobalLockProvider.class);
|
||||
|
||||
try {
|
||||
lockProvider.withLock(LOCK_NAME, Duration.ofSeconds(2), innerSession -> {
|
||||
int c = counter.incrementAndGet();
|
||||
if (c == 1) {
|
||||
try {
|
||||
waitForTheOtherThreadToFail.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} else {
|
||||
LOG.infof("Lock acquired by thread %s with counter: %d", Thread.currentThread().getName(), c);
|
||||
throw new RuntimeException("Lock acquired by more than one thread.");
|
||||
}
|
||||
return null;
|
||||
});
|
||||
} catch (LockAcquiringTimeoutException e) {
|
||||
int c = counter.incrementAndGet();
|
||||
LOG.infof("Exception when acquiring lock by thread %s with counter: %d", Thread.currentThread().getName(), c);
|
||||
if (c != 2) {
|
||||
throw new RuntimeException("Acquiring lock failed by different thread than second.");
|
||||
}
|
||||
|
||||
assertThat(e.getMessage(), containsString("Lock [" + LOCK_NAME + "] already acquired by keycloak instance"));
|
||||
waitForTheOtherThreadToFail.countDown();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReleaseAllLocksMethod() throws InterruptedException {
|
||||
final int NUMBER_OF_THREADS = 4;
|
||||
ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
|
||||
|
||||
CountDownLatch locksAcquired = new CountDownLatch(NUMBER_OF_THREADS);
|
||||
CountDownLatch testFinished = new CountDownLatch(1);
|
||||
|
||||
try {
|
||||
// Acquire locks and let the threads wait until the end of this test method
|
||||
executor.submit(() -> {
|
||||
IntStream.range(0, NUMBER_OF_THREADS).parallel()
|
||||
.forEach(i ->
|
||||
inComittedTransaction(s -> {
|
||||
GlobalLockProvider lockProvider = s.getProvider(GlobalLockProvider.class);
|
||||
try {
|
||||
lockProvider.withLock("LOCK_" + i, session -> {
|
||||
locksAcquired.countDown();
|
||||
try {
|
||||
testFinished.await();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
} catch (LockAcquiringTimeoutException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
locksAcquired.await();
|
||||
|
||||
// Test no lock can be acquired because all are still hold by the executor above
|
||||
AtomicInteger counter = new AtomicInteger();
|
||||
IntStream.range(0, NUMBER_OF_THREADS).parallel()
|
||||
.forEach(i ->
|
||||
inComittedTransaction(s -> {
|
||||
GlobalLockProvider lockProvider = s.getProvider(GlobalLockProvider.class);
|
||||
try {
|
||||
lockProvider.withLock("LOCK_" + i, Duration.ofSeconds(1), is -> {
|
||||
throw new RuntimeException("Acquiring lock should not succeed as it was acquired in the first transaction");
|
||||
});
|
||||
} catch (LockAcquiringTimeoutException e) {
|
||||
counter.incrementAndGet();
|
||||
}
|
||||
})
|
||||
);
|
||||
assertThat(counter.get(), Matchers.equalTo(NUMBER_OF_THREADS));
|
||||
|
||||
// Unlock all locks forcefully
|
||||
inComittedTransaction(s -> {
|
||||
GlobalLockProvider lockProvider = s.getProvider(GlobalLockProvider.class);
|
||||
lockProvider.forceReleaseAllLocks();
|
||||
});
|
||||
|
||||
// Test all locks can be acquired again
|
||||
counter.set(0);
|
||||
IntStream.range(0, NUMBER_OF_THREADS).parallel()
|
||||
.forEach(i ->
|
||||
inComittedTransaction(s -> {
|
||||
GlobalLockProvider lockProvider = s.getProvider(GlobalLockProvider.class);
|
||||
try {
|
||||
lockProvider.withLock("LOCK_" + i, Duration.ofSeconds(1), is -> counter.incrementAndGet());
|
||||
} catch (LockAcquiringTimeoutException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
assertThat(counter.get(), Matchers.equalTo(NUMBER_OF_THREADS));
|
||||
} finally {
|
||||
testFinished.countDown();
|
||||
executor.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -24,6 +24,8 @@ import org.keycloak.models.DeploymentStateSpi;
|
|||
import org.keycloak.models.SingleUseObjectSpi;
|
||||
import org.keycloak.models.UserLoginFailureSpi;
|
||||
import org.keycloak.models.UserSessionSpi;
|
||||
import org.keycloak.models.locking.GlobalLockProviderSpi;
|
||||
import org.keycloak.models.locking.NoneGlobalLockProviderFactory;
|
||||
import org.keycloak.models.map.authSession.MapRootAuthenticationSessionProviderFactory;
|
||||
import org.keycloak.models.map.authorization.MapAuthorizationStoreFactory;
|
||||
import org.keycloak.models.map.client.MapClientProviderFactory;
|
||||
|
@ -41,6 +43,7 @@ import org.keycloak.models.map.role.MapRoleProviderFactory;
|
|||
import org.keycloak.models.map.storage.MapStorageSpi;
|
||||
import org.keycloak.models.map.storage.chm.ConcurrentHashMapStorageProviderFactory;
|
||||
import org.keycloak.models.map.storage.hotRod.HotRodMapStorageProviderFactory;
|
||||
import org.keycloak.models.map.storage.hotRod.locking.HotRodGlobalLockProviderFactory;
|
||||
import org.keycloak.models.map.user.MapUserProviderFactory;
|
||||
import org.keycloak.models.map.userSession.MapUserSessionProviderFactory;
|
||||
import org.keycloak.provider.ProviderFactory;
|
||||
|
@ -72,6 +75,7 @@ public class HotRodMapStorage extends KeycloakModelParameters {
|
|||
static final Set<Class<? extends ProviderFactory>> ALLOWED_FACTORIES = ImmutableSet.<Class<? extends ProviderFactory>>builder()
|
||||
.add(HotRodMapStorageProviderFactory.class)
|
||||
.add(HotRodConnectionProviderFactory.class)
|
||||
.add(HotRodGlobalLockProviderFactory.class)
|
||||
.build();
|
||||
|
||||
private final InfinispanContainer hotRodContainer = new InfinispanContainer();
|
||||
|
@ -92,7 +96,8 @@ public class HotRodMapStorage extends KeycloakModelParameters {
|
|||
.spi(UserSessionSpi.NAME).provider(MapUserSessionProviderFactory.PROVIDER_ID).config(STORAGE_CONFIG, HotRodMapStorageProviderFactory.PROVIDER_ID)
|
||||
.spi(UserLoginFailureSpi.NAME).provider(MapUserLoginFailureProviderFactory.PROVIDER_ID).config(STORAGE_CONFIG, HotRodMapStorageProviderFactory.PROVIDER_ID)
|
||||
.spi(EventStoreSpi.NAME).provider(MapUserSessionProviderFactory.PROVIDER_ID).config("storage-admin-events.provider", HotRodMapStorageProviderFactory.PROVIDER_ID)
|
||||
.config("storage-auth-events.provider", HotRodMapStorageProviderFactory.PROVIDER_ID);
|
||||
.config("storage-auth-events.provider", HotRodMapStorageProviderFactory.PROVIDER_ID)
|
||||
.spi(GlobalLockProviderSpi.GLOBAL_LOCK).defaultProvider(HotRodGlobalLockProviderFactory.PROVIDER_ID);
|
||||
|
||||
cf.spi(MapStorageSpi.NAME)
|
||||
.provider(ConcurrentHashMapStorageProviderFactory.PROVIDER_ID)
|
||||
|
|
Loading…
Reference in a new issue