Remove outdated test code from model/infinispan module
Closes #31661

Signed-off-by: Ryan Emerson <remerson@redhat.com>

parent 4d60c91cb8
commit 69a8509f6c
@@ -1,186 +0,0 @@
/*
 * Copyright 2017 Red Hat, Inc. and/or its affiliates
 * and other contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.keycloak.cluster.infinispan;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.infinispan.Cache;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.models.sessions.infinispan.initializer.DistributedCacheConcurrentWritesTest;

/**
 * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
 */
public class ConcurrencyDistributedRemoveSessionTest {

    protected static final Logger logger = Logger.getLogger(ConcurrencyDistributedRemoveSessionTest.class);

    private static final int ITERATIONS = 10000;

    private static final AtomicInteger errorsCounter = new AtomicInteger(0);

    private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0);
    private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0);

    private static Map<String, AtomicInteger> removalCounts = new ConcurrentHashMap<>();

    private static final UUID CLIENT_1_UUID = UUID.randomUUID();

    public static void main(String[] args) throws Exception {
        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = DistributedCacheConcurrentWritesTest.createManager("node1").getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = DistributedCacheConcurrentWritesTest.createManager("node2").getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);

        // Create caches, listeners and finally worker threads
        Thread worker1 = createWorker(cache1, 1);
        Thread worker2 = createWorker(cache2, 2);
        Thread worker3 = createWorker(cache1, 1);
        Thread worker4 = createWorker(cache2, 2);

        // Create the initial sessions
        for (int i=0 ; i<ITERATIONS ; i++) {
            String sessionId = String.valueOf(i);
            SessionEntityWrapper<UserSessionEntity> wrappedSession = createSessionEntity(sessionId);
            cache1.put(sessionId, wrappedSession);

            removalCounts.put(sessionId, new AtomicInteger(0));
        }

        logger.info("SESSIONS CREATED");

        // Check that all sessions are available on the second node
        for (int i=0 ; i<ITERATIONS ; i++) {
            String sessionId = String.valueOf(i);
            SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
            Assert.assertNotNull("Loaded wrapper for key " + sessionId, loadedWrapper);
        }

        logger.info("SESSIONS AVAILABLE ON DC2");

        long start = System.currentTimeMillis();

        try {
            worker1.start();
            worker2.start();
            worker3.start();
            worker4.start();

            worker1.join();
            worker2.join();
            worker3.join();
            worker4.join();

            logger.info("SESSIONS REMOVED");

            Map<Integer, Integer> histogram = new HashMap<>();
            for (Map.Entry<String, AtomicInteger> entry : removalCounts.entrySet()) {
                int count = entry.getValue().get();

                int current = histogram.get(count) == null ? 0 : histogram.get(count);
                current++;
                histogram.put(count, current);
            }

            logger.infof("Histogram: %s", histogram.toString());
            logger.infof("Errors: %d", errorsCounter.get());

            long took = System.currentTimeMillis() - start;
            logger.infof("took %d ms", took);

        } finally {
            Thread.sleep(2000);

            // Finish JVM
            cache1.getCacheManager().stop();
            cache2.getCacheManager().stop();
        }
    }

    private static SessionEntityWrapper<UserSessionEntity> createSessionEntity(String sessionId) {
        UserSessionEntity session = new UserSessionEntity(sessionId);
        session.setRealmId("foo");
        session.setBrokerSessionId("!23123123");
        session.setBrokerUserId(null);
        session.setUser("foo");
        session.setLoginUsername("foo");
        session.setIpAddress("123.44.143.178");
        session.setStarted(Time.currentTime());
        session.setLastSessionRefresh(Time.currentTime());

        AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity(UUID.randomUUID());
        clientSession.setAuthMethod("saml");
        clientSession.setAction("something");
        clientSession.setTimestamp(1234);
        session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId());

        SessionEntityWrapper<UserSessionEntity> wrappedSession = new SessionEntityWrapper<>(session);
        return wrappedSession;
    }

    private static Thread createWorker(Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, int threadId) {
        System.out.println("Retrieved cache: " + threadId);

        return new CacheWorker(cache, threadId);
    }

    private static class CacheWorker extends Thread {

        private final Cache<String, Object> cache;

        private final int myThreadId;

        private CacheWorker(Cache cache, int myThreadId) {
            this.cache = cache;
            this.myThreadId = myThreadId;
        }

        @Override
        public void run() {

            for (int i=0 ; i<ITERATIONS ; i++) {
                String sessionId = String.valueOf(i);

                Object o = cache.remove(sessionId);

                if (o != null) {
                    removalCounts.get(sessionId).incrementAndGet();
                }
            }
        }
    }
}
@@ -1,220 +0,0 @@
/*
 * Copyright 2016 Red Hat, Inc. and/or its affiliates
 * and other contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.keycloak.cluster.infinispan;

import org.infinispan.Cache;
import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.RemoteStore;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Test concurrency for remoteStore (backed by HotRod RemoteCaches) against external JDG. Especially tests "putIfAbsent" contract.
 *
 * Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
 *
 * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
 */
public class ConcurrencyJDGCachePutTest {

    private static final Map<String, EntryInfo> state = new HashMap<>();

    public static void main(String[] args) throws Exception {
        // Init map somehow
        for (int i=0 ; i<1000 ; i++) {
            String key = "key-" + i;
            state.put(key, new EntryInfo());
        }

        // Create caches, listeners and finally worker threads
        Worker worker1 = createWorker(1);
        Worker worker2 = createWorker(2);

        long start = System.currentTimeMillis();

        // Start and join workers
        worker1.start();
        worker2.start();

        worker1.join();
        worker2.join();

        long took = System.currentTimeMillis() - start;

        Map<String, EntryInfo> failedState = new HashMap<>();

        // Output
        for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
            System.out.println(entry.getKey() + ":::" + entry.getValue());

            if (entry.getValue().th1.get() != entry.getValue().th2.get()) {
                failedState.put(entry.getKey(), entry.getValue());
            }

            worker1.cache.remove(entry.getKey());
        }

        System.out.println("\nFAILED ENTRIES. SIZE: " + failedState.size() + "\n");
        for (Map.Entry<String, EntryInfo> entry : failedState.entrySet()) {
            System.out.println(entry.getKey() + ":::" + entry.getValue());
        }

        System.out.println("Took: " + took + " ms");

        // Finish JVM
        worker1.cache.getCacheManager().stop();
        worker2.cache.getCacheManager().stop();
    }

    private static Worker createWorker(int threadId) {
        EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.USER_SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
        Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);

        System.out.println("Retrieved cache: " + threadId);

        RemoteStore<?, ?> remoteStore = ComponentRegistry.componentOf(cache, PersistenceManager.class)
                .getStores(RemoteStore.class).iterator().next();
        HotRodListener listener = new HotRodListener();
        remoteStore.getRemoteCache().addClientListener(listener);

        return new Worker(cache, threadId);
    }

    @ClientListener
    public static class HotRodListener {

        //private AtomicInteger listenerCount = new AtomicInteger(0);

        @ClientCacheEntryCreated
        public void created(ClientCacheEntryCreatedEvent<String> event) {
            String cacheKey = event.getKey();
            state.get(cacheKey).successfulListenerWrites.incrementAndGet();
        }

        @ClientCacheEntryModified
        public void updated(ClientCacheEntryModifiedEvent<String> event) {
            String cacheKey = event.getKey();
            state.get(cacheKey).successfulListenerWrites.incrementAndGet();
        }

    }

    private static class Worker extends Thread {

        private final Cache<String, Integer> cache;

        private final int myThreadId;

        private Worker(Cache<String, Integer> cache, int myThreadId) {
            this.cache = cache;
            this.myThreadId = myThreadId;
        }

        @Override
        public void run() {
            for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
                String cacheKey = entry.getKey();
                EntryInfo wrapper = state.get(cacheKey);

                int val = getClusterStartupTime(this.cache, cacheKey, wrapper, myThreadId);
                if (myThreadId == 1) {
                    wrapper.th1.set(val);
                } else {
                    wrapper.th2.set(val);
                }

            }

            System.out.println("Worker finished: " + myThreadId);
        }

    }

    public static int getClusterStartupTime(Cache<String, Integer> cache, String cacheKey, EntryInfo wrapper, int myThreadId) {
        Integer startupTime = myThreadId==1 ? Integer.valueOf(cacheKey.substring(4)) : Integer.valueOf(cacheKey.substring(4)) * 2;

        // Concurrency doesn't work correctly with this
        //Integer existingClusterStartTime = (Integer) cache.putIfAbsent(cacheKey, startupTime);

        // Concurrency works fine with this
        RemoteCache remoteCache = ComponentRegistry.componentOf(cache, PersistenceManager.class)
                .getStores(RemoteStore.class)
                .iterator().next()
                .getRemoteCache();

        Integer existingClusterStartTime = null;
        for (int i=0 ; i<10 ; i++) {
            try {
                existingClusterStartTime = (Integer) remoteCache.withFlags(Flag.FORCE_RETURN_VALUE).putIfAbsent(cacheKey, startupTime);
                break;
            } catch (HotRodClientException ce) {
                if (i == 9) {
                    throw ce;
                    //break;
                } else {
                    wrapper.exceptions.incrementAndGet();
                    System.err.println("Exception: i=" + i + " for key: " + cacheKey + " and myThreadId: " + myThreadId);
                }
            }
        }

        if (existingClusterStartTime == null
                // || startupTime.equals(remoteCache.get(cacheKey))
                ) {
            wrapper.successfulInitializations.incrementAndGet();
            return startupTime;
        } else {
            wrapper.failedInitializations.incrementAndGet();
            return existingClusterStartTime;
        }
    }

    public static class EntryInfo {
        AtomicInteger successfulInitializations = new AtomicInteger(0);
        AtomicInteger successfulListenerWrites = new AtomicInteger(0);
        AtomicInteger th1 = new AtomicInteger();
        AtomicInteger th2 = new AtomicInteger();
        AtomicInteger failedInitializations = new AtomicInteger();
        AtomicInteger exceptions = new AtomicInteger();

        @Override
        public String toString() {
            return String.format("Inits: %d, listeners: %d, failedInits: %d, exceptions: %s, th1: %d, th2: %d", successfulInitializations.get(), successfulListenerWrites.get(),
                    failedInitializations.get(), exceptions.get(), th1.get(), th2.get());
        }
    }

}
@@ -1,444 +0,0 @@
/*
 * Copyright 2016 Red Hat, Inc. and/or its affiliates
 * and other contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.keycloak.cluster.infinispan;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.VersionedValue;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.context.Flag;
import org.infinispan.manager.EmbeddedCacheManager;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.SessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.connections.infinispan.InfinispanUtil;
import java.util.UUID;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;

/**
 * Test concurrency for remoteStore (backed by HotRod RemoteCaches) against external JDG. Especially tests "replaceWithVersion" contract.
 *
 * Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
 *
 * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
 */
public class ConcurrencyJDGCacheReplaceTest {

    protected static final Logger logger = Logger.getLogger(ConcurrencyJDGCacheReplaceTest.class);

    private static final int ITERATION_PER_WORKER = 1000;

    private static RemoteCache remoteCache1;
    private static RemoteCache remoteCache2;

    private static List<ExecutorService> executors = new ArrayList<>();

    private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);
    private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0);

    private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0);
    private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0);

    private static final ConcurrencyTestHistogram histogram = new ConcurrencyTestHistogram();

    //private static Map<String, EntryInfo> state = new HashMap<>();

    private static final UUID CLIENT_1_UUID = UUID.randomUUID();

    public static void main(String[] args) throws Exception {
        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createManager(1).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createManager(2).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);

        // Create initial item
        UserSessionEntity session = new UserSessionEntity("123");
        session.setRealmId("foo");
        session.setBrokerSessionId("!23123123");
        session.setBrokerUserId(null);
        session.setUser("foo");
        session.setLoginUsername("foo");
        session.setIpAddress("123.44.143.178");
        session.setStarted(Time.currentTime());
        session.setLastSessionRefresh(Time.currentTime());

        AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity(UUID.randomUUID());
        clientSession.setAuthMethod("saml");
        clientSession.setAction("something");
        clientSession.setTimestamp(1234);
        session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId());

        SessionEntityWrapper<UserSessionEntity> wrappedSession = new SessionEntityWrapper<>(session);

        // Some dummy testing of remoteStore behaviour
        logger.info("Before put");

        cache1
                .getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL) // will still invoke remoteStore. Just doesn't propagate to cluster
                .put("123", wrappedSession);

        logger.info("After put");

        cache1.replace("123", wrappedSession);

        logger.info("After replace");

        cache1.get("123");

        logger.info("After cache1.get");

        cache2.get("123");

        logger.info("After cache2.get");

        cache1.get("123");

        logger.info("After cache1.get - second call");

        cache2.get("123");

        logger.info("After cache2.get - second call");

        cache2.replace("123", wrappedSession);

        logger.info("After replace - second call");

        cache1.get("123");

        logger.info("After cache1.get - third call");

        cache2.get("123");

        logger.info("After cache2.get - third call");

        cache1
                .getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD)
                .entrySet().stream().forEach(e -> {
        });

        logger.info("After cache1.stream");

        // Explicitly call put on remoteCache (KcRemoteCache.write ignores remote writes)
        InfinispanUtil.getRemoteCache(cache1).put("123", session);
        InfinispanUtil.getRemoteCache(cache2).replace("123", session);

        // Create caches, listeners and finally worker threads
        remoteCache1 = InfinispanUtil.getRemoteCache(cache1);
        remoteCache2 = InfinispanUtil.getRemoteCache(cache2);

        // Manual test of lifespans
        testLifespans();

        Thread worker1 = createWorker(cache1, 1);
        Thread worker2 = createWorker(cache2, 2);

        long start = System.currentTimeMillis();

        // Start and join workers
        worker1.start();
        worker2.start();

        worker1.join();
        worker2.join();

        long took = System.currentTimeMillis() - start;

//        // Output
//        for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
//            System.out.println(entry.getKey() + ":::" + entry.getValue());
//            worker1.cache.remove(entry.getKey());
//        }

        System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
                ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
                ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() );

        System.out.println("Sleeping before other report");

        Thread.sleep(2000);

        System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
                ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
                ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get());

        System.out.println("remoteCache1.notes: " + ((UserSessionEntity) remoteCache1.get("123")).getNotes().size() );
        System.out.println("remoteCache2.notes: " + ((UserSessionEntity) remoteCache2.get("123")).getNotes().size() );

        System.out.println("Histogram: ");
        //histogram.dumpStats();

        // shutdown pools
        for (ExecutorService ex : executors) {
            ex.shutdown();
        }

        // Finish JVM
        cache1.getCacheManager().stop();
        cache2.getCacheManager().stop();
    }

    private static Thread createWorker(Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, int threadId) {
        System.out.println("Retrieved cache: " + threadId);

        RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);

        if (threadId == 1) {
            remoteCache1 = remoteCache;
        } else {
            remoteCache2 = remoteCache;
        }

        AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2;
        HotRodListener listener = new HotRodListener(cache, remoteCache, counter);
        remoteCache.addClientListener(listener);

        return new RemoteCacheWorker(remoteCache, threadId);
        //return new CacheWorker(cache, threadId);
    }

    private static EmbeddedCacheManager createManager(int threadId) {
        return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.USER_SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
    }

    @ClientListener
    public static class HotRodListener {

        private Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache;
        private RemoteCache remoteCache;
        private AtomicInteger listenerCount;

        private ExecutorService executor;

        public HotRodListener(Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache, RemoteCache remoteCache, AtomicInteger listenerCount) {
            this.listenerCount = listenerCount;
            this.remoteCache = remoteCache;
            this.origCache = origCache;
            executor = Executors.newCachedThreadPool();
            executors.add(executor);
        }

        @ClientCacheEntryCreated
        public void created(ClientCacheEntryCreatedEvent event) {
            String cacheKey = (String) event.getKey();
            listenerCount.incrementAndGet();
        }

        @ClientCacheEntryModified
        public void updated(ClientCacheEntryModifiedEvent event) {
            String cacheKey = (String) event.getKey();
            listenerCount.incrementAndGet();

            executor.submit(() -> {
                // TODO: can be optimized - object sent in the event
                VersionedValue<SessionEntity> versionedVal = remoteCache.getWithMetadata(cacheKey);

                for (int i = 0; i < 10; i++) {

                    if (versionedVal.getVersion() < event.getVersion()) {
                        System.err.println("INCOMPATIBLE VERSION. event version: " + event.getVersion() + ", entity version: " + versionedVal.getVersion() + ", i=" + i);
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException ie) {
                            throw new RuntimeException(ie);
                        }

                        versionedVal = remoteCache.getWithMetadata(cacheKey);
                    } else {
                        break;
                    }
                }

                SessionEntity session = (SessionEntity) versionedVal.getValue();
                SessionEntityWrapper sessionWrapper = new SessionEntityWrapper(session);

                if (listenerCount.get() % 100 == 0) {
                    logger.infof("Listener count: " + listenerCount.get());
                }

                // TODO: for distributed caches, ensure that it is executed just on owner OR if event.isCommandRetried
                origCache
                        .getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE)
                        .replace(cacheKey, sessionWrapper);
            });
        }

    }

    private static class RemoteCacheWorker extends Thread {

        private final RemoteCache<String, UserSessionEntity> remoteCache;

        private final int myThreadId;

        private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) {
            this.remoteCache = remoteCache;
            this.myThreadId = myThreadId;
        }

        @Override
        public void run() {

            for (int i=0 ; i<ITERATION_PER_WORKER ; i++) {

                // Histogram will contain the value 1 in all places, as each iteration adds a different note and hence the session is always changed to a different value
                String noteKey = "n-" + myThreadId + "-" + i;

                // In case it's hardcoded (eg. all the replaces are doing the same change, so the session is de facto not changed), then the histogram may contain a value bigger than 1 in some places.
                //String noteKey = "some";

                ReplaceStatus replaced = ReplaceStatus.NOT_REPLACED;
                while (replaced != ReplaceStatus.REPLACED) {
                    VersionedValue<UserSessionEntity> versioned = remoteCache.getWithMetadata("123");
                    UserSessionEntity oldSession = versioned.getValue();
                    //UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession);
                    UserSessionEntity clone = oldSession;

                    // In case that an exception was thrown (ReplaceStatus.ERROR), the remoteCache may already have the note. It seems that transactions are not fully rolled back on the JDG side
                    // in case that a backup fails
                    if (replaced == ReplaceStatus.NOT_REPLACED) {
                        clone.getNotes().put(noteKey, "someVal");
                    } else if (replaced == ReplaceStatus.ERROR) {
                        if (clone.getNotes().containsKey(noteKey)) {
                            System.err.println("I HAVE THE KEY: " + noteKey);
                        } else {
                            System.err.println("I DON'T HAVE THE KEY: " + noteKey);
                            clone.getNotes().put(noteKey, "someVal");
                        }
                    }

                    //cache.replace("123", clone);
                    replaced = cacheReplace(versioned, clone);
                }

                // Try to see if the remoteCache on the 2nd DC immediately sees our change
                RemoteCache secondDCRemoteCache = myThreadId == 1 ? remoteCache2 : remoteCache1;
                //UserSessionEntity thatSession = (UserSessionEntity) secondDCRemoteCache.get("123");

                //Assert.assertEquals("someVal", thatSession.getNotes().get(noteKey));
                //System.out.println("Passed");
            }

        }

        private ReplaceStatus cacheReplace(VersionedValue<UserSessionEntity> oldSession, UserSessionEntity newSession) {
            try {
                boolean replaced = remoteCache.replaceWithVersion("123", newSession, oldSession.getVersion());
                //boolean replaced = true;
                //remoteCache.replace("123", newSession);
                if (!replaced) {
                    failedReplaceCounter.incrementAndGet();
                    //return false;
                    //System.out.println("Replace failed!!!");
                } else {
                    histogram.increaseSuccessOpsCount(oldSession.getVersion());
                }
                return replaced ? ReplaceStatus.REPLACED : ReplaceStatus.NOT_REPLACED;
            } catch (Exception re) {
                failedReplaceCounter2.incrementAndGet();
                return ReplaceStatus.ERROR;
            }
            //return replaced;
        }

    }

    private enum ReplaceStatus {
        REPLACED, NOT_REPLACED, ERROR
    }

    private static void testLifespans() throws Exception {
        long l1 = InfinispanUtil.toHotrodTimeMs(remoteCache1, 5000);
        long l2 = InfinispanUtil.toHotrodTimeMs(remoteCache2, 2592000000L);
        long l3 = InfinispanUtil.toHotrodTimeMs(remoteCache2, 2592000001L);
        //long l4 = InfinispanUtil.getLifespanMs(remoteCache1, Time.currentTimeMillis() + 5000);

        remoteCache1.put("k1", "v1", l1, TimeUnit.MILLISECONDS);
        remoteCache1.put("k2", "v2", l2, TimeUnit.MILLISECONDS);
        remoteCache1.put("k3", "v3", l3, TimeUnit.MILLISECONDS);
        remoteCache1.put("k4", "v4", Time.currentTimeMillis() + 5000, TimeUnit.MILLISECONDS);

        System.out.println("l1=" + l1 + ", l2=" + l2 + ", l3=" + l3);
        System.out.println("k1=" + remoteCache1.get("k1") + ", k2=" + remoteCache1.get("k2") + ", k3=" + remoteCache1.get("k3") + ", k4=" + remoteCache1.get("k4"));

        Thread.sleep(4000);

        System.out.println("k1=" + remoteCache1.get("k1") + ", k2=" + remoteCache1.get("k2") + ", k3=" + remoteCache1.get("k3") + ", k4=" + remoteCache1.get("k4"));

        Thread.sleep(2000);

        System.out.println("k1=" + remoteCache1.get("k1") + ", k2=" + remoteCache1.get("k2") + ", k3=" + remoteCache1.get("k3") + ", k4=" + remoteCache1.get("k4"));
    }

    /*
    // Worker, which operates on the "classic" cache and relies on operations delegated to the second cache
    private static class CacheWorker extends Thread {

        private final Cache<String, SessionEntityWrapper<UserSessionEntity>> cache;

        private final int myThreadId;

        private CacheWorker(Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, int myThreadId) {
            this.cache = cache;
            this.myThreadId = myThreadId;
        }

        @Override
        public void run() {

            for (int i=0 ; i<ITERATION_PER_WORKER ; i++) {

                String noteKey = "n-" + myThreadId + "-" + i;

                boolean replaced = false;
                while (!replaced) {
                    VersionedValue<UserSessionEntity> versioned = cache.getVersioned("123");
                    UserSessionEntity oldSession = versioned.getValue();
                    //UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession);
                    UserSessionEntity clone = oldSession;

                    clone.getNotes().put(noteKey, "someVal");
                    //cache.replace("123", clone);
                    replaced = cacheReplace(versioned, clone);
                }
            }

        }

    }*/

}
@@ -1,113 +0,0 @@
/*
 * Copyright 2017 Red Hat, Inc. and/or its affiliates
 * and other contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.keycloak.cluster.infinispan;

import java.util.concurrent.atomic.AtomicInteger;

import org.infinispan.Cache;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.context.Flag;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;

/**
 * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
 */
public class ConcurrencyJDGOfflineBackupsTest {

    protected static final Logger logger = Logger.getLogger(ConcurrencyJDGOfflineBackupsTest.class);

    public static void main(String[] args) throws Exception {

        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createManager(1).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);

        try {
            // Create initial item
            UserSessionEntity session = new UserSessionEntity("123");
            session.setRealmId("foo");
            session.setBrokerSessionId("!23123123");
            session.setBrokerUserId(null);
            session.setUser("foo");
            session.setLoginUsername("foo");
            session.setIpAddress("123.44.143.178");
            session.setStarted(Time.currentTime());
            session.setLastSessionRefresh(Time.currentTime());

//            AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity();
//            clientSession.setAuthMethod("saml");
//            clientSession.setAction("something");
//            clientSession.setTimestamp(1234);
//            clientSession.setProtocolMappers(new HashSet<>(Arrays.asList("mapper1", "mapper2")));
//            clientSession.setRoles(new HashSet<>(Arrays.asList("role1", "role2")));
//            session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId());

            SessionEntityWrapper<UserSessionEntity> wrappedSession = new SessionEntityWrapper<>(session);

            // Some dummy testing of remoteStore behaviour
            logger.info("Before put");

            AtomicInteger successCount = new AtomicInteger(0);
            AtomicInteger errorsCount = new AtomicInteger(0);
            for (int i=0 ; i<100 ; i++) {
                try {
                    cache1
                            .getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL) // will still invoke remoteStore. Just doesn't propagate to cluster
                            .put("123", wrappedSession);
                    successCount.incrementAndGet();
                    Thread.sleep(1000);
                    logger.infof("Success in the iteration: %d", i);
                } catch (HotRodClientException hrce) {
                    logger.errorf("Failed to put the item in the iteration: %d ", i);
                    errorsCount.incrementAndGet();
                }
            }

            logger.infof("SuccessCount: %d, ErrorsCount: %d", successCount.get(), errorsCount.get());

//            logger.info("After put");
//
//            cache1.replace("123", wrappedSession);
//
//            logger.info("After replace");
//
//            cache1.get("123");
//
//            logger.info("After cache1.get");

//            cache2.get("123");
//
//            logger.info("After cache2.get");

        } finally {
            // Finish JVM
            cache1.getCacheManager().stop();
        }

    }

    private static EmbeddedCacheManager createManager(int threadId) {
        return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.USER_SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
    }

}
@@ -1,260 +0,0 @@
/*
 * Copyright 2017 Red Hat, Inc. and/or its affiliates
 * and other contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.keycloak.cluster.infinispan;

import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.VersionedValue;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.RemoteStore;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
import org.junit.Assert;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.connections.infinispan.InfinispanUtil;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Test that hotrod ClientListeners are correctly executed as expected
 *
 * STEPS TO REPRODUCE:
 * - Unzip infinispan-server-9.2.4.Final to some locations ISPN1 and ISPN2
 *
 * - Edit both ISPN1/standalone/configuration/clustered.xml and ISPN2/standalone/configuration/clustered.xml . Configure cache in container "clustered":
 *
 * <replicated-cache-configuration name="sessions-cfg" mode="ASYNC" start="EAGER" batching="false">
 *     <transaction mode="NON_XA" locking="PESSIMISTIC"/>
 * </replicated-cache-configuration>
 *
 * <replicated-cache name="work" configuration="sessions-cfg" />
 *
 * - Run server1:
 *   ./standalone.sh -c clustered.xml -Djava.net.preferIPv4Stack=true -Djboss.socket.binding.port-offset=1010 -Djboss.default.multicast.address=234.56.78.99 -Djboss.node.name=cache-server
 *
 * - Run server2:
 *   ./standalone.sh -c clustered.xml -Djava.net.preferIPv4Stack=true -Djboss.socket.binding.port-offset=2010 -Djboss.default.multicast.address=234.56.78.100 -Djboss.node.name=cache-server-dc-2
 *
 * - Run this test as main class from IDE
 *
 * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
 */
public class ConcurrencyJDGRemoteCacheClientListenersTest {

    // Helper map to track if listeners were executed
    private static final Map<String, EntryInfo> state = new HashMap<>();

    private static final AtomicInteger totalListenerCalls = new AtomicInteger(0);

    private static final AtomicInteger totalErrors = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        // Init map somehow
        for (int i=0 ; i<1000 ; i++) {
            String key = "key-" + i;
            EntryInfo entryInfo = new EntryInfo();
            entryInfo.val.set(i);
            state.put(key, entryInfo);
        }

        // Create caches, listeners and finally worker threads
        Worker worker1 = createWorker(1);
        Worker worker2 = createWorker(2);

        // Note "run", so it's not executed asynchronously here!!!
        worker1.run();

//        // Start and join workers
//        worker1.start();
//        worker2.start();
//
//        worker1.join();
//        worker2.join();

        // Output
        for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
            System.out.println(entry.getKey() + ":::" + entry.getValue());
        }

        System.out.println("totalListeners: " + totalListenerCalls.get() + ", totalErrors: " + totalErrors.get());

        // Assert that ClientListener was able to read the value and save it into EntryInfo
        try {
            for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
                EntryInfo info = entry.getValue();
                Assert.assertEquals(info.val.get(), info.dc1Created.get());
                Assert.assertEquals(info.val.get(), info.dc2Created.get());
                Assert.assertEquals(info.val.get() * 2, info.dc1Updated.get());
                Assert.assertEquals(info.val.get() * 2, info.dc2Updated.get());
            }
        } finally {
            // Remove items
            for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
                worker1.cache.remove(entry.getKey());
            }

            // Finish JVM
            worker1.cache.getCacheManager().stop();
            worker2.cache.getCacheManager().stop();
        }
    }

    private static Worker createWorker(int threadId) {
        EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
        Cache<String, Integer> cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME);

        System.out.println("Retrieved cache: " + threadId);

        RemoteStore<?, ?> remoteStore = ComponentRegistry.componentOf(cache, PersistenceManager.class)
                .getStores(RemoteStore.class).iterator().next();
        HotRodListener listener = new HotRodListener(cache, threadId);
        remoteStore.getRemoteCache().addClientListener(listener);

        return new Worker(cache, threadId);
    }

    @ClientListener
    public static class HotRodListener {

        private final RemoteCache<String, Integer> remoteCache;
        private final int threadId;
        private final Executor executor;

        public HotRodListener(Cache<String, Integer> cache, int threadId) {
            this.remoteCache = InfinispanUtil.getRemoteCache(cache);
            this.threadId = threadId;
            this.executor = Executors.newCachedThreadPool();
        }

        //private AtomicInteger listenerCount = new AtomicInteger(0);

        @ClientCacheEntryCreated
        public void created(ClientCacheEntryCreatedEvent<String> event) {
            executor.execute(() -> event(event.getKey(), event.getVersion(), true));
        }

        @ClientCacheEntryModified
        public void updated(ClientCacheEntryModifiedEvent<String> event) {
            executor.execute(() -> event(event.getKey(), event.getVersion(), false));
        }

        private void event(String cacheKey, long version, boolean created) {
            EntryInfo entryInfo = state.get(cacheKey);
            entryInfo.successfulListenerWrites.incrementAndGet();

            totalListenerCalls.incrementAndGet();

            VersionedValue<Integer> versionedVal = remoteCache.getWithMetadata(cacheKey);

            if (versionedVal.getVersion() < version) {
                System.err.println("INCOMPATIBLE VERSION. event version: " + version + ", entity version: " + versionedVal.getVersion());
                totalErrors.incrementAndGet();
                return;
            }

            Integer val = versionedVal.getValue();
            if (val != null) {
                AtomicInteger dcVal;
                if (created) {
                    dcVal = threadId == 1 ? entryInfo.dc1Created : entryInfo.dc2Created;
                } else {
                    dcVal = threadId == 1 ? entryInfo.dc1Updated : entryInfo.dc2Updated;
                }
                dcVal.set(val);
            } else {
                System.err.println("NOT A VALUE FOR KEY: " + cacheKey);
                totalErrors.incrementAndGet();
            }
        }

    }

    private static void createItems(Cache<String, Integer> cache, int myThreadId) {
        for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
            String cacheKey = entry.getKey();
            Integer value = entry.getValue().val.get();

            cache.put(cacheKey, value);
        }

        System.out.println("Worker creating finished: " + myThreadId);
    }

    private static class Worker extends Thread {

        private final Cache<String, Integer> cache;

        private final int myThreadId;

        private Worker(Cache<String, Integer> cache, int myThreadId) {
            this.cache = cache;
            this.myThreadId = myThreadId;
        }

        @Override
        public void run() {
            createItems(cache, myThreadId);

            for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
                String cacheKey = entry.getKey();
                Integer value = entry.getValue().val.get() * 2;

                this.cache.replace(cacheKey, value);
            }

            System.out.println("Worker updating finished: " + myThreadId);
        }

    }

    public static class EntryInfo {
        AtomicInteger val = new AtomicInteger();
        AtomicInteger successfulListenerWrites = new AtomicInteger(0);
        AtomicInteger dc1Created = new AtomicInteger();
        AtomicInteger dc2Created = new AtomicInteger();
        AtomicInteger dc1Updated = new AtomicInteger();
        AtomicInteger dc2Updated = new AtomicInteger();

        @Override
        public String toString() {
            return String.format("val: %d, successfulListenerWrites: %d, dc1Created: %d, dc2Created: %d, dc1Updated: %d, dc2Updated: %d", val.get(), successfulListenerWrites.get(),
                    dc1Created.get(), dc2Created.get(), dc1Updated.get(), dc2Updated.get());
        }
    }
}
@@ -1,328 +0,0 @@
/*
 * Copyright 2017 Red Hat, Inc. and/or its affiliates
 * and other contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.keycloak.cluster.infinispan;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.context.Flag;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import org.keycloak.connections.infinispan.InfinispanUtil;
import java.util.UUID;

/**
 * Check that removing a session from the remoteCache immediately removes it from the remoteCache in the other DC. This is true.
 *
 * Also check that listeners are executed asynchronously with some delay.
 *
 * Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest}
 *
 * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
 */
public class ConcurrencyJDGRemoveSessionTest {

    protected static final Logger logger = Logger.getLogger(ConcurrencyJDGRemoveSessionTest.class);

    private static final int ITERATIONS = 10000;

    private static RemoteCache remoteCache1;
    private static RemoteCache remoteCache2;

    private static final AtomicInteger errorsCounter = new AtomicInteger(0);

    private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0);
    private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0);

    private static Map<String, AtomicInteger> removalCounts = new ConcurrentHashMap<>();

    private static final UUID CLIENT_1_UUID = UUID.randomUUID();

    public static void main(String[] args) throws Exception {
        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createManager(1).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createManager(2).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);

        // Create caches, listeners and finally worker threads
        Thread worker1 = createWorker(cache1, 1);
        Thread worker2 = createWorker(cache2, 2);
        Thread worker3 = createWorker(cache1, 1);
        Thread worker4 = createWorker(cache2, 2);

        // Create the initial sessions
        for (int i=0 ; i<ITERATIONS ; i++) {
            String sessionId = String.valueOf(i);
            SessionEntityWrapper<UserSessionEntity> wrappedSession = createSessionEntity(sessionId);
            cache1.put(sessionId, wrappedSession);

            removalCounts.put(sessionId, new AtomicInteger(0));
        }

        logger.info("SESSIONS CREATED");

        // Check that all sessions are available on the second DC
        for (int i=0 ; i<ITERATIONS ; i++) {
            String sessionId = String.valueOf(i);
            SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
            Assert.assertNotNull("Loaded wrapper for key " + sessionId, loadedWrapper);
        }

        logger.info("SESSIONS AVAILABLE ON DC2");

        long start = System.currentTimeMillis();

        try {
            worker1.start();
            worker2.start();
            worker3.start();
            worker4.start();

            worker1.join();
            worker2.join();
            worker3.join();
            worker4.join();

            logger.info("SESSIONS REMOVED");

            Map<Integer, Integer> histogram = new HashMap<>();
            for (Map.Entry<String, AtomicInteger> entry : removalCounts.entrySet()) {
                int count = entry.getValue().get();

                int current = histogram.get(count) == null ? 0 : histogram.get(count);
                current++;
                histogram.put(count, current);
            }

            logger.infof("Histogram: %s", histogram.toString());
            logger.infof("Errors: %d", errorsCounter.get());

            //Thread.sleep(5000);

            // Doing it in the opposite direction to ensure that newer entries are checked first.
            // This is currently FAILING (expected) as listeners are executed asynchronously.
//            for (int i=ITERATIONS-1 ; i>=0 ; i--) {
//                String sessionId = String.valueOf(i);
//
//                logger.infof("Before call cache2.get: %s", sessionId);
//
//                SessionEntityWrapper loadedWrapper = cache2.get(sessionId);
//                Assert.assertNull("Loaded wrapper not null for key " + sessionId, loadedWrapper);
//            }
//
//            logger.info("SESSIONS NOT AVAILABLE ON DC2");

            long took = System.currentTimeMillis() - start;
            logger.infof("took %d ms", took);

//            // Start and join workers
//            worker1.start();
//            worker2.start();
//
//            worker1.join();
//            worker2.join();

        } finally {
            Thread.sleep(2000);

            // Finish JVM
            cache1.getCacheManager().stop();
            cache2.getCacheManager().stop();
        }

//        // Output
//        for (Map.Entry<String, EntryInfo> entry : state.entrySet()) {
//            System.out.println(entry.getKey() + ":::" + entry.getValue());
//            worker1.cache.remove(entry.getKey());
//        }

//        System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
//                ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
//                ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() );
//
//        System.out.println("Sleeping before other report");
//
//        Thread.sleep(1000);
//
//        System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() +
//                ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() +
//                ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get());

    }

    private static SessionEntityWrapper<UserSessionEntity> createSessionEntity(String sessionId) {
        UserSessionEntity session = new UserSessionEntity(sessionId);
        session.setRealmId("foo");
        session.setBrokerSessionId("!23123123");
        session.setBrokerUserId(null);
        session.setUser("foo");
        session.setLoginUsername("foo");
        session.setIpAddress("123.44.143.178");
        session.setStarted(Time.currentTime());
        session.setLastSessionRefresh(Time.currentTime());

        AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity(UUID.randomUUID());
        clientSession.setAuthMethod("saml");
        clientSession.setAction("something");
        clientSession.setTimestamp(1234);
        session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId());

        SessionEntityWrapper<UserSessionEntity> wrappedSession = new SessionEntityWrapper<>(session);
        return wrappedSession;
    }

    private static Thread createWorker(Cache<String, SessionEntityWrapper<UserSessionEntity>> cache, int threadId) {
        System.out.println("Retrieved cache: " + threadId);

        RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache);

        if (threadId == 1) {
            remoteCache1 = remoteCache;
        } else {
            remoteCache2 = remoteCache;
        }

        AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2;
        HotRodListener listener = new HotRodListener(cache, remoteCache, counter);
        remoteCache.addClientListener(listener);

        return new RemoteCacheWorker(remoteCache, threadId);
        //return new CacheWorker(cache, threadId);
    }

    private static EmbeddedCacheManager createManager(int threadId) {
        return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.USER_SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
    }

    @ClientListener
    public static class HotRodListener {

        private Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache;
        private RemoteCache remoteCache;
        private AtomicInteger listenerCount;

        public HotRodListener(Cache<String, SessionEntityWrapper<UserSessionEntity>> origCache, RemoteCache remoteCache, AtomicInteger listenerCount) {
            this.listenerCount = listenerCount;
            this.remoteCache = remoteCache;
            this.origCache = origCache;
        }

        @ClientCacheEntryCreated
        public void created(ClientCacheEntryCreatedEvent event) {
            String cacheKey = (String) event.getKey();

            logger.infof("Listener executed for creating of session %s", cacheKey);
        }

        @ClientCacheEntryModified
        public void modified(ClientCacheEntryModifiedEvent event) {
            String cacheKey = (String) event.getKey();

            logger.infof("Listener executed for modifying of session %s", cacheKey);
        }

        @ClientCacheEntryRemoved
        public void removed(ClientCacheEntryRemovedEvent event) {
            String cacheKey = (String) event.getKey();

            logger.infof("Listener executed for removing of session %s", cacheKey);

            // TODO: for distributed caches, ensure that it is executed just on owner OR if event.isCommandRetried
|
||||
origCache
|
||||
.getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE)
|
||||
.remove(cacheKey);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class RemoteCacheWorker extends Thread {
|
||||
|
||||
private final RemoteCache<String, Object> remoteCache;
|
||||
|
||||
private final int myThreadId;
|
||||
|
||||
private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) {
|
||||
this.remoteCache = remoteCache;
|
||||
this.myThreadId = myThreadId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
for (int i=0 ; i<ITERATIONS ; i++) {
|
||||
String sessionId = String.valueOf(i);
|
||||
|
||||
try {
|
||||
Object o = remoteCache
|
||||
.withFlags(org.infinispan.client.hotrod.Flag.FORCE_RETURN_VALUE)
|
||||
.remove(sessionId);
|
||||
|
||||
if (o != null) {
|
||||
removalCounts.get(sessionId).incrementAndGet();
|
||||
}
|
||||
} catch (HotRodClientException hrce) {
|
||||
errorsCounter.incrementAndGet();
|
||||
}
|
||||
//
|
||||
//
|
||||
// logger.infof("Session %s removed on DC1", sessionId);
|
||||
//
|
||||
// // Check if it's immediately seen that session is removed on 2nd DC
|
||||
// RemoteCache secondDCRemoteCache = myThreadId == 1 ? remoteCache2 : remoteCache1;
|
||||
// SessionEntityWrapper thatSession = (SessionEntityWrapper) secondDCRemoteCache.get(sessionId);
|
||||
// Assert.assertNull("Session with ID " + sessionId + " not removed on the other DC. ThreadID: " + myThreadId, thatSession);
|
||||
//
|
||||
// // Also check that it's immediatelly removed on my DC
|
||||
// SessionEntityWrapper mySession = (SessionEntityWrapper) remoteCache.get(sessionId);
|
||||
// Assert.assertNull("Session with ID " + sessionId + " not removed on the other DC. ThreadID: " + myThreadId, mySession);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@@ -1,54 +0,0 @@
/*
 * Copyright 2017 Red Hat, Inc. and/or its affiliates
 * and other contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.keycloak.cluster.infinispan;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
 */
public class ConcurrencyTestHistogram {

    private final ConcurrentMap<Long, AtomicInteger> counters = new ConcurrentHashMap<>();


    public ConcurrencyTestHistogram() {

    }


    public void increaseSuccessOpsCount(long version) {
        AtomicInteger counter = new AtomicInteger(0);
        AtomicInteger existing = counters.putIfAbsent(version, counter);
        if (existing != null) {
            counter = existing;
        }

        counter.incrementAndGet();
    }


    public void dumpStats() {
        for (Map.Entry<Long, AtomicInteger> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + "=" + entry.getValue().get());
        }
    }
}
@@ -1,114 +0,0 @@
/*
 * Copyright 2020 Red Hat, Inc. and/or its affiliates
 * and other contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.keycloak.cluster.infinispan;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.infinispan.Cache;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder;
import org.jboss.logging.Logger;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoaderContext;
import org.keycloak.connections.infinispan.InfinispanUtil;

/**
 * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
 */
public class JDGPutTest {

    public static final Logger logger = Logger.getLogger(JDGPutTest.class);

    public static void main(String[] args) throws Exception {
        Cache<String, Object> cache1 = createManager(1).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
        Cache<String, Object> cache2 = createManager(2).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);

        try {
            //RemoteCache remoteCache1 = InfinispanUtil.getRemoteCache(cache1);
            //RemoteCache remoteCache2 = InfinispanUtil.getRemoteCache(cache2);

            //remoteCache1.put("key1", new Book("book1", "desc", 1));
            //remoteCache2.put("key2", );
            String uuidStr = UUID.randomUUID().toString();
            System.out.println(uuidStr);
            UUID uuid = UUID.fromString(uuidStr);
            AuthenticatedClientSessionEntity ace = new AuthenticatedClientSessionEntity(uuid);
            SessionEntityWrapper wrapper = new SessionEntityWrapper(ace);

            cache1.put("key1", wrapper);
            //cache1.put("key1", "val1");

            //AuthenticatedClientSessionEntity val1 = (AuthenticatedClientSessionEntity) cache2.get("key1");
            //RemoteCache remoteCache1 = InfinispanUtil.getRemoteCache(cache1);
            //remoteCache1.put("key1", "val1");
            RemoteCache remoteCache2 = InfinispanUtil.getRemoteCache(cache2);
            Object o = remoteCache2.get("key1");

            logger.info("Before retrieve entries");
            try (CloseableIterator it = remoteCache2.retrieveEntries(null, 64)) {
                Object o2 = it.next();
                logger.info("o2: " + o2);
            }

            //Object key = remoteCache2.keySet().iterator().next();
            //Object value = remoteCache2.values().iterator().next();
            //logger.info("Key: " + key + ", val: " + value);

            bulkLoadSessions(remoteCache2);
        } finally {
            Thread.sleep(2000);

            // Finish JVM
            cache1.getCacheManager().stop();
            cache2.getCacheManager().stop();
        }
    }

    private static EmbeddedCacheManager createManager(int threadId) {
        return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.USER_SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class);
    }

    private static void bulkLoadSessions(RemoteCache remoteCache) {
        RemoteCacheSessionsLoaderContext ctx = new RemoteCacheSessionsLoaderContext(64);

        Map<Object, Object> toInsert = new HashMap<>(ctx.getSessionsPerSegment());

        try (CloseableIterator<Map.Entry<Object, Object>> it = remoteCache.retrieveEntries(null, ctx.getSessionsPerSegment())) {
            while (it.hasNext()) {
                Map.Entry<?,?> entry = it.next();
                toInsert.put(entry.getKey(), entry.getValue());
            }

        } catch (RuntimeException e) {
            logger.warnf(e, "Error loading sessions from remote cache '%s'", remoteCache.getName());
            throw e;
        }

        logger.info("Loaded " + toInsert);

    }


}
@@ -1,229 +0,0 @@
/*
 * Copyright 2016 Red Hat, Inc. and/or its affiliates
 * and other contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.keycloak.models.sessions.infinispan.initializer;

import java.util.HashMap;
import java.util.Map;

import org.infinispan.Cache;
import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.junit.Ignore;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import java.util.UUID;

/**
 * Test concurrent writes to distributed cache with usage of atomic replace
 *
 * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
 */
@Ignore
public class DistributedCacheConcurrentWritesTest {

    private static final int BATCHES_PER_WORKER = 1000;
    private static final int ITEMS_IN_BATCH = 100;

    public static void main(String[] args) throws Exception {
        BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createCache("node1");
        BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createCache("node2");

        // NOTE: This setup requires infinispan servers to be up and running on localhost:12232 and localhost:13232
        // BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache1 = createRemoteCache("node1");
        // BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache2 = createRemoteCache("node2");

        try {
            testConcurrentPut(cache1, cache2);
        } finally {

            // Kill JVM
            cache1.stop();
            cache2.stop();
            stopMgr(cache1);
            stopMgr(cache2);

            System.out.println("Managers killed");
        }
    }


    private static SessionEntityWrapper<UserSessionEntity> createEntityInstance(String id) {
        // Create initial item
        UserSessionEntity session = new UserSessionEntity(id);
        session.setRealmId("foo");
        session.setBrokerSessionId("!23123123");
        session.setBrokerUserId(null);
        session.setUser("foo");
        session.setLoginUsername("foo");
        session.setIpAddress("123.44.143.178");
        session.setStarted(Time.currentTime());
        session.setLastSessionRefresh(Time.currentTime());

        AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity(UUID.randomUUID());
        clientSession.setAuthMethod("saml");
        clientSession.setAction("something");
        clientSession.setTimestamp(1234);
        session.getAuthenticatedClientSessions().put("foo-client", clientSession.getId());

        return new SessionEntityWrapper<>(session);
    }


    // Reproducer for KEYCLOAK-7443 and KEYCLOAK-7489. The infinite loop can happen if cache.replace(key, old, new) is called and entity was removed on one cluster node in the meantime
    private static void testConcurrentPut(BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache1,
                                          BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache2) throws InterruptedException {

        // Create workers for concurrent write and start them
        Worker worker1 = new Worker(1, cache1);
        Worker worker2 = new Worker(2, cache2);

        long start = System.currentTimeMillis();

        System.out.println("Started clustering test");

        worker1.start();
        //worker1.join();
        worker2.start();

        worker1.join();
        worker2.join();

        long took = System.currentTimeMillis() - start;

        System.out.println("Test finished. Took: " + took + " ms. Cache size: " + cache1.size());
    }


    private static class Worker extends Thread {

        private final BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache;
        private final int startIndex;

        public Worker(int threadId, BasicCache<String, SessionEntityWrapper<UserSessionEntity>> cache) {
            this.cache = cache;
            this.startIndex = (threadId - 1) * (ITEMS_IN_BATCH * BATCHES_PER_WORKER);
            setName("th-" + threadId);
        }

        @Override
        public void run() {

            for (int page = 0; page < BATCHES_PER_WORKER ; page++) {
                int startPageIndex = startIndex + page * ITEMS_IN_BATCH;

                putItemsClassic(startPageIndex);
                //putItemsAll(startPageIndex);

                System.out.println("Thread " + getName() + ": Saved items from " + startPageIndex + " to " + (startPageIndex + ITEMS_IN_BATCH - 1));
            }
        }


        // put items 1 by 1
        private void putItemsClassic(int startPageIndex) {
            for (int i = startPageIndex ; i < (startPageIndex + ITEMS_IN_BATCH) ; i++) {
                String key = "key-" + startIndex + i;
                SessionEntityWrapper<UserSessionEntity> session = createEntityInstance(key);
                cache.put(key, session);
            }
        }


        // put all items together
        private void putItemsAll(int startPageIndex) {
            Map<String, SessionEntityWrapper<UserSessionEntity>> mapp = new HashMap<>();

            for (int i = startPageIndex ; i < (startPageIndex + ITEMS_IN_BATCH) ; i++) {
                String key = "key-" + startIndex + i;
                SessionEntityWrapper<UserSessionEntity> session = createEntityInstance(key);
                mapp.put(key, session);
            }

            cache.putAll(mapp);
        }
    }


    // Cache creation utils


    public static BasicCache<String, SessionEntityWrapper<UserSessionEntity>> createCache(String nodeName) {
        EmbeddedCacheManager mgr = createManager(nodeName);
        Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
        return cache;
    }


    public static EmbeddedCacheManager createManager(String nodeName) {
        System.setProperty("java.net.preferIPv4Stack", "true");
        System.setProperty("jgroups.tcp.port", "53715");

        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
        gcb = gcb.clusteredDefault();
        gcb.transport().clusterName("test-clustering");
        gcb.transport().nodeName(nodeName);
        gcb.jmx().domain(InfinispanConnectionProvider.JMX_DOMAIN).enable();
        EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());

        ConfigurationBuilder distConfigBuilder = new ConfigurationBuilder();
        distConfigBuilder.clustering().cacheMode(CacheMode.DIST_SYNC);
        distConfigBuilder.clustering().hash().numOwners(1);

        // Disable L1 cache
        distConfigBuilder.clustering().hash().l1().enabled(false);
        Configuration distConfig = distConfigBuilder.build();
        cacheManager.defineConfiguration(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME, distConfig);

        return cacheManager;
    }


    public static BasicCache<String, SessionEntityWrapper<UserSessionEntity>> createRemoteCache(String nodeName) {
        int port = ("node1".equals(nodeName)) ? 12232 : 13232;

        org.infinispan.client.hotrod.configuration.ConfigurationBuilder builder = new org.infinispan.client.hotrod.configuration.ConfigurationBuilder();
        org.infinispan.client.hotrod.configuration.Configuration cfg = builder
                .addServer().host("localhost").port(port)
                .version(ProtocolVersion.PROTOCOL_VERSION_26)
                .build();
        RemoteCacheManager mgr = new RemoteCacheManager(cfg);
        return mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
    }

    // CLEANUP METHODS

    private static void stopMgr(BasicCache cache) {
        if (cache instanceof Cache) {
            ((Cache) cache).getCacheManager().stop();
        } else {
            ((RemoteCache) cache).getRemoteCacheManager().stop();
        }
    }

}
@@ -1,195 +0,0 @@
/*
 * Copyright 2016 Red Hat, Inc. and/or its affiliates
 * and other contributors as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.keycloak.models.sessions.infinispan.initializer;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.lookup.EmbeddedTransactionManagerLookup;
import org.infinispan.util.concurrent.IsolationLevel;
import org.keycloak.common.util.Time;
import org.keycloak.connections.infinispan.InfinispanConnectionProvider;
import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity;
import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity;
import java.util.UUID;

/**
 * Test concurrent writes to distributed cache with usage of write skew
 *
 * @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
 */
//@Ignore
public class DistributedCacheWriteSkewTest {

    private static final int ITERATION_PER_WORKER = 1000;

    private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0);

    private static final UUID CLIENT_1_UUID = UUID.randomUUID();

    public static void main(String[] args) throws Exception {
        Cache<String, UserSessionEntity> cache1 = createManager("node1").getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);
        Cache<String, UserSessionEntity> cache2 = createManager("node2").getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME);

        // Create initial item
        UserSessionEntity session = new UserSessionEntity("123");
        session.setRealmId("foo");
        session.setBrokerSessionId("!23123123");
        session.setBrokerUserId(null);
        session.setUser("foo");
        session.setLoginUsername("foo");
        session.setIpAddress("123.44.143.178");
        session.setStarted(Time.currentTime());
        session.setLastSessionRefresh(Time.currentTime());

        AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity(UUID.randomUUID());
        clientSession.setAuthMethod("saml");
        clientSession.setAction("something");
        clientSession.setTimestamp(1234);
        session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId());

        cache1.put("123", session);

        //cache1.replace("123", session);

        // Create 2 workers for concurrent write and start them
        Worker worker1 = new Worker(1, cache1);
        Worker worker2 = new Worker(2, cache2);

        long start = System.currentTimeMillis();

        System.out.println("Started clustering test");

        worker1.start();
        //worker1.join();
        worker2.start();

        worker1.join();
        worker2.join();

        long took = System.currentTimeMillis() - start;
        session = cache1.get("123");
        System.out.println("Took: " + took + " ms. Notes count: " + session.getNotes().size() + ", failedReplaceCounter: " + failedReplaceCounter.get());

        // Kill JVM
        cache1.stop();
        cache2.stop();
        cache1.getCacheManager().stop();
        cache2.getCacheManager().stop();

        System.out.println("Managers killed");
    }


    private static class Worker extends Thread {

        private final Cache<String, UserSessionEntity> cache;
        private final int threadId;

        public Worker(int threadId, Cache<String, UserSessionEntity> cache) {
            this.threadId = threadId;
            this.cache = cache;
        }

        @Override
        public void run() {

            for (int i=0 ; i<ITERATION_PER_WORKER ; i++) {

                String noteKey = "n-" + threadId + "-" + i;

                boolean replaced = false;
                while (!replaced) {
                    try {
                        //cache.startBatch();

                        UserSessionEntity oldSession = cache.get("123");

                        //UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession);
                        UserSessionEntity clone = oldSession;

                        clone.getNotes().put(noteKey, "someVal");

                        cache.replace("123", clone);
                        //cache.getAdvancedCache().withFlags(Flag.FAIL_SILENTLY).endBatch(true);
                        replaced = true;
                    } catch (Exception e) {
                        System.out.println(e);
                        e.printStackTrace();
                        failedReplaceCounter.incrementAndGet();
                    }

                }
            }

        }

    }


    public static EmbeddedCacheManager createManager(String nodeName) {
        System.setProperty("java.net.preferIPv4Stack", "true");
        System.setProperty("jgroups.tcp.port", "53715");

        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
        gcb = gcb.clusteredDefault();
        gcb.transport().clusterName("test-clustering");
        gcb.transport().nodeName(nodeName);
        gcb.jmx().domain(InfinispanConnectionProvider.JMX_DOMAIN).enable();
        EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build());

        ConfigurationBuilder distConfigBuilder = new ConfigurationBuilder();
        distConfigBuilder.clustering().cacheMode(CacheMode.DIST_SYNC);
        distConfigBuilder.clustering().hash().numOwners(1);

        // Disable L1 cache
        distConfigBuilder.clustering().hash().l1().enabled(false);

        //distConfigBuilder.storeAsBinary().enable().storeKeysAsBinary(false).storeValuesAsBinary(true);

        // KEYCLOAK-13692 - Per ISPN-7613 Infinispan:
        // * Automatically enables versioning when needed,
        // * writeSkewCheck automatically enabled for OPTIMISTIC and REPEATABLE_READ transactions
        // so the following explicit settings of these are not needed anymore
        // distConfigBuilder.versioning().enabled(true);
        // distConfigBuilder.versioning().scheme(VersioningScheme.SIMPLE);
        // distConfigBuilder.locking().writeSkewCheck(true);

        distConfigBuilder.locking().isolationLevel(IsolationLevel.REPEATABLE_READ);
        distConfigBuilder.locking().concurrencyLevel(32);
        distConfigBuilder.locking().lockAcquisitionTimeout(1000, TimeUnit.SECONDS);

        // distConfigBuilder.invocationBatching().enable();
        //distConfigBuilder.transaction().transactionMode(TransactionMode.TRANSACTIONAL);
        distConfigBuilder.transaction().transactionManagerLookup(new EmbeddedTransactionManagerLookup());
        distConfigBuilder.transaction().lockingMode(LockingMode.OPTIMISTIC);

        Configuration distConfig = distConfigBuilder.build();
        cacheManager.defineConfiguration(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME, distConfig);

        return cacheManager;
    }
}