diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyDistributedRemoveSessionTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyDistributedRemoveSessionTest.java deleted file mode 100644 index e853274142..0000000000 --- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyDistributedRemoveSessionTest.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Copyright 2017 Red Hat, Inc. and/or its affiliates - * and other contributors as indicated by the @author tags. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.keycloak.cluster.infinispan; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -import org.infinispan.Cache; -import org.jboss.logging.Logger; -import org.junit.Assert; -import org.keycloak.common.util.Time; -import org.keycloak.connections.infinispan.InfinispanConnectionProvider; -import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; -import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; -import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; -import org.keycloak.models.sessions.infinispan.initializer.DistributedCacheConcurrentWritesTest; - -/** - * @author Marek Posolda - */ -public class ConcurrencyDistributedRemoveSessionTest { - - - protected static final Logger logger = Logger.getLogger(ConcurrencyJDGRemoveSessionTest.class); - - private static final int ITERATIONS = 10000; - - private static final AtomicInteger errorsCounter = new AtomicInteger(0); - - private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0); - private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0); - - private static Map removalCounts = new ConcurrentHashMap<>(); - - - private static final UUID CLIENT_1_UUID = UUID.randomUUID(); - - public static void main(String[] args) throws Exception { - Cache> cache1 = DistributedCacheConcurrentWritesTest.createManager("node1").getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - Cache> cache2 = DistributedCacheConcurrentWritesTest.createManager("node2").getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - - // Create caches, listeners and finally worker threads - Thread worker1 = createWorker(cache1, 1); - Thread worker2 = createWorker(cache2, 2); - Thread worker3 = createWorker(cache1, 1); - Thread worker4 = createWorker(cache2, 2); - - // Create 100 initial sessions - for (int i=0 ; i wrappedSession = createSessionEntity(sessionId); - cache1.put(sessionId, wrappedSession); - - removalCounts.put(sessionId, new AtomicInteger(0)); - } - - logger.info("SESSIONS CREATED"); - - // Create 100 initial sessions - for (int i=0 ; i histogram = new HashMap<>(); - for (Map.Entry entry : removalCounts.entrySet()) { - int count = entry.getValue().get(); - - int current = histogram.get(count) == null ? 0 : histogram.get(count); - current++; - histogram.put(count, current); - } - - logger.infof("Histogram: %s", histogram.toString()); - logger.infof("Errors: %d", errorsCounter.get()); - - long took = System.currentTimeMillis() - start; - logger.infof("took %d ms", took); - - - } finally { - Thread.sleep(2000); - - // Finish JVM - cache1.getCacheManager().stop(); - cache2.getCacheManager().stop(); - } - } - - - private static SessionEntityWrapper createSessionEntity(String sessionId) { - // Create 100 initial sessions - UserSessionEntity session = new UserSessionEntity(sessionId); - session.setRealmId("foo"); - session.setBrokerSessionId("!23123123"); - session.setBrokerUserId(null); - session.setUser("foo"); - session.setLoginUsername("foo"); - session.setIpAddress("123.44.143.178"); - session.setStarted(Time.currentTime()); - session.setLastSessionRefresh(Time.currentTime()); - - AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity(UUID.randomUUID()); - clientSession.setAuthMethod("saml"); - clientSession.setAction("something"); - clientSession.setTimestamp(1234); - session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId()); - - SessionEntityWrapper wrappedSession = new SessionEntityWrapper<>(session); - return wrappedSession; - } - - - private static Thread createWorker(Cache> cache, int threadId) { - System.out.println("Retrieved cache: " + threadId); - return new CacheWorker(cache, threadId); - } - - - private static class CacheWorker extends Thread { - - private final Cache cache; - - private final int myThreadId; - - private CacheWorker(Cache cache, int myThreadId) { - this.cache = cache; - this.myThreadId = myThreadId; - } - - - @Override - public void run() { - - for (int i=0 ; iMarek Posolda - */ -public class ConcurrencyJDGCachePutTest { - - private static final Map state = new HashMap<>(); - - public static void main(String[] args) throws Exception { - // Init map somehow - for (int i=0 ; i<1000 ; i++) { - String key = "key-" + i; - state.put(key, new EntryInfo()); - } - - // Create caches, listeners and finally worker threads - Worker worker1 = createWorker(1); - Worker worker2 = createWorker(2); - - long start = System.currentTimeMillis(); - - // Start and join workers - worker1.start(); - worker2.start(); - - worker1.join(); - worker2.join(); - - long took = System.currentTimeMillis() - start; - - Map failedState = new HashMap<>(); - - // Output - for (Map.Entry entry : state.entrySet()) { - System.out.println(entry.getKey() + ":::" + entry.getValue()); - - if (entry.getValue().th1.get() != entry.getValue().th2.get()) { - failedState.put(entry.getKey(), entry.getValue()); - } - - worker1.cache.remove(entry.getKey()); - } - - System.out.println("\nFAILED ENTRIES. SIZE: " + failedState.size() + "\n"); - for (Map.Entry entry : failedState.entrySet()) { - System.out.println(entry.getKey() + ":::" + entry.getValue()); - } - - System.out.println("Took: " + took + " ms"); - - // Finish JVM - worker1.cache.getCacheManager().stop(); - worker2.cache.getCacheManager().stop(); - } - - private static Worker createWorker(int threadId) { - EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.USER_SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class); - Cache cache = manager.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - - System.out.println("Retrieved cache: " + threadId); - - RemoteStore remoteStore = ComponentRegistry.componentOf(cache, PersistenceManager.class) - .getStores(RemoteStore.class).iterator().next(); - HotRodListener listener = new HotRodListener(); - remoteStore.getRemoteCache().addClientListener(listener); - - return new Worker(cache, threadId); - } - - - @ClientListener - public static class HotRodListener { - - //private AtomicInteger listenerCount = new AtomicInteger(0); - - @ClientCacheEntryCreated - public void created(ClientCacheEntryCreatedEvent event) { - String cacheKey = event.getKey(); - state.get(cacheKey).successfulListenerWrites.incrementAndGet(); - } - - @ClientCacheEntryModified - public void updated(ClientCacheEntryModifiedEvent event) { - String cacheKey = event.getKey(); - state.get(cacheKey).successfulListenerWrites.incrementAndGet(); - } - - } - - - private static class Worker extends Thread { - - private final Cache cache; - - private final int myThreadId; - - private Worker(Cache cache, int myThreadId) { - this.cache = cache; - this.myThreadId = myThreadId; - } - - @Override - public void run() { - for (Map.Entry entry : state.entrySet()) { - String cacheKey = entry.getKey(); - EntryInfo wrapper = state.get(cacheKey); - - int val = getClusterStartupTime(this.cache, cacheKey, wrapper, myThreadId); - if (myThreadId == 1) { - wrapper.th1.set(val); - } else { - wrapper.th2.set(val); - } - - } - - System.out.println("Worker finished: " + myThreadId); - } - - } - - public static int getClusterStartupTime(Cache cache, String cacheKey, EntryInfo wrapper, int myThreadId) { - Integer startupTime = myThreadId==1 ? Integer.valueOf(cacheKey.substring(4)) : Integer.valueOf(cacheKey.substring(4)) * 2; - - // Concurrency doesn't work correctly with this - //Integer existingClusterStartTime = (Integer) cache.putIfAbsent(cacheKey, startupTime); - - // Concurrency works fine with this - RemoteCache remoteCache = ComponentRegistry.componentOf(cache, PersistenceManager.class) - .getStores(RemoteStore.class) - .iterator().next() - .getRemoteCache(); - - Integer existingClusterStartTime = null; - for (int i=0 ; i<10 ; i++) { - try { - existingClusterStartTime = (Integer) remoteCache.withFlags(Flag.FORCE_RETURN_VALUE).putIfAbsent(cacheKey, startupTime); - break; - } catch (HotRodClientException ce) { - if (i == 9) { - throw ce; - //break; - } else { - wrapper.exceptions.incrementAndGet(); - System.err.println("Exception: i=" + i + " for key: " + cacheKey + " and myThreadId: " + myThreadId); - } - } - } - - if (existingClusterStartTime == null -// || startupTime.equals(remoteCache.get(cacheKey)) - ) { - wrapper.successfulInitializations.incrementAndGet(); - return startupTime; - } else { - wrapper.failedInitializations.incrementAndGet(); - return existingClusterStartTime; - } - } - - public static class EntryInfo { - AtomicInteger successfulInitializations = new AtomicInteger(0); - AtomicInteger successfulListenerWrites = new AtomicInteger(0); - AtomicInteger th1 = new AtomicInteger(); - AtomicInteger th2 = new AtomicInteger(); - AtomicInteger failedInitializations = new AtomicInteger(); - AtomicInteger exceptions = new AtomicInteger(); - - @Override - public String toString() { - return String.format("Inits: %d, listeners: %d, failedInits: %d, exceptions: %s, th1: %d, th2: %d", successfulInitializations.get(), successfulListenerWrites.get(), - failedInitializations.get(), exceptions.get(), th1.get(), th2.get()); - } - } - - - -} diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGCacheReplaceTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGCacheReplaceTest.java deleted file mode 100644 index 16a468a0fb..0000000000 --- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGCacheReplaceTest.java +++ /dev/null @@ -1,444 +0,0 @@ -/* - * Copyright 2016 Red Hat, Inc. and/or its affiliates - * and other contributors as indicated by the @author tags. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.keycloak.cluster.infinispan; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.infinispan.Cache; -import org.infinispan.client.hotrod.RemoteCache; -import org.infinispan.client.hotrod.VersionedValue; -import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated; -import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified; -import org.infinispan.client.hotrod.annotation.ClientListener; -import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent; -import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent; -import org.infinispan.context.Flag; -import org.infinispan.manager.EmbeddedCacheManager; -import org.jboss.logging.Logger; -import org.keycloak.common.util.Time; -import org.keycloak.connections.infinispan.InfinispanConnectionProvider; -import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; -import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; -import org.keycloak.models.sessions.infinispan.entities.SessionEntity; -import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; -import org.keycloak.connections.infinispan.InfinispanUtil; -import java.util.UUID; -import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder; - -/** - * Test concurrency for remoteStore (backed by HotRod RemoteCaches) against external JDG. Especially tests "replaceWithVersion" contract. - * - * Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest} - * - * @author Marek Posolda - */ -public class ConcurrencyJDGCacheReplaceTest { - - protected static final Logger logger = Logger.getLogger(ConcurrencyJDGCacheReplaceTest.class); - - private static final int ITERATION_PER_WORKER = 1000; - - private static RemoteCache remoteCache1; - private static RemoteCache remoteCache2; - - private static List executors = new ArrayList<>(); - - private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0); - private static final AtomicInteger failedReplaceCounter2 = new AtomicInteger(0); - - private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0); - private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0); - - private static final ConcurrencyTestHistogram histogram = new ConcurrencyTestHistogram(); - - //private static Map state = new HashMap<>(); - - private static final UUID CLIENT_1_UUID = UUID.randomUUID(); - - - public static void main(String[] args) throws Exception { - Cache> cache1 = createManager(1).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - Cache> cache2 = createManager(2).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - - // Create initial item - UserSessionEntity session = new UserSessionEntity("123"); - session.setRealmId("foo"); - session.setBrokerSessionId("!23123123"); - session.setBrokerUserId(null); - session.setUser("foo"); - session.setLoginUsername("foo"); - session.setIpAddress("123.44.143.178"); - session.setStarted(Time.currentTime()); - session.setLastSessionRefresh(Time.currentTime()); - - AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity(UUID.randomUUID()); - clientSession.setAuthMethod("saml"); - clientSession.setAction("something"); - clientSession.setTimestamp(1234); - session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId()); - - SessionEntityWrapper wrappedSession = new SessionEntityWrapper<>(session); - - // Some dummy testing of remoteStore behaviour - logger.info("Before put"); - - cache1 - .getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL) // will still invoke remoteStore . Just doesn't propagate to cluster - .put("123", wrappedSession); - - logger.info("After put"); - - cache1.replace("123", wrappedSession); - - logger.info("After replace"); - - cache1.get("123"); - - logger.info("After cache1.get"); - - cache2.get("123"); - - logger.info("After cache2.get"); - - cache1.get("123"); - - logger.info("After cache1.get - second call"); - - cache2.get("123"); - - logger.info("After cache2.get - second call"); - - cache2.replace("123", wrappedSession); - - logger.info("After replace - second call"); - - cache1.get("123"); - - logger.info("After cache1.get - third call"); - - cache2.get("123"); - - logger.info("After cache2.get - third call"); - - cache1 - .getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD) - .entrySet().stream().forEach(e -> { - }); - - logger.info("After cache1.stream"); - - // Explicitly call put on remoteCache (KcRemoteCache.write ignores remote writes) - InfinispanUtil.getRemoteCache(cache1).put("123", session); - InfinispanUtil.getRemoteCache(cache2).replace("123", session); - - // Create caches, listeners and finally worker threads - remoteCache1 = InfinispanUtil.getRemoteCache(cache1); - remoteCache2 = InfinispanUtil.getRemoteCache(cache2); - - // Manual test of lifespans - testLifespans(); - - Thread worker1 = createWorker(cache1, 1); - Thread worker2 = createWorker(cache2, 2); - - long start = System.currentTimeMillis(); - - // Start and join workers - worker1.start(); - worker2.start(); - - worker1.join(); - worker2.join(); - - long took = System.currentTimeMillis() - start; - -// // Output -// for (Map.Entry entry : state.entrySet()) { -// System.out.println(entry.getKey() + ":::" + entry.getValue()); -// worker1.cache.remove(entry.getKey()); -// } - - System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() + - ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() + - ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() ); - - System.out.println("Sleeping before other report"); - - Thread.sleep(2000); - - System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() + - ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() + - ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get()); - - System.out.println("remoteCache1.notes: " + ((UserSessionEntity) remoteCache1.get("123")).getNotes().size() ); - System.out.println("remoteCache2.notes: " + ((UserSessionEntity) remoteCache2.get("123")).getNotes().size() ); - - System.out.println("Histogram: "); - //histogram.dumpStats(); - - // shutdown pools - for (ExecutorService ex : executors) { - ex.shutdown(); - } - - // Finish JVM - cache1.getCacheManager().stop(); - cache2.getCacheManager().stop(); - } - - private static Thread createWorker(Cache> cache, int threadId) { - System.out.println("Retrieved cache: " + threadId); - - RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache); - - if (threadId == 1) { - remoteCache1 = remoteCache; - } else { - remoteCache2 = remoteCache; - } - - AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2; - HotRodListener listener = new HotRodListener(cache, remoteCache, counter); - remoteCache.addClientListener(listener); - - return new RemoteCacheWorker(remoteCache, threadId); - //return new CacheWorker(cache, threadId); - } - - - private static EmbeddedCacheManager createManager(int threadId) { - return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.USER_SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class); - } - - - @ClientListener - public static class HotRodListener { - - private Cache> origCache; - private RemoteCache remoteCache; - private AtomicInteger listenerCount; - - private ExecutorService executor; - - public HotRodListener(Cache> origCache, RemoteCache remoteCache, AtomicInteger listenerCount) { - this.listenerCount = listenerCount; - this.remoteCache = remoteCache; - this.origCache = origCache; - executor = Executors.newCachedThreadPool(); - executors.add(executor); - - } - - @ClientCacheEntryCreated - public void created(ClientCacheEntryCreatedEvent event) { - String cacheKey = (String) event.getKey(); - listenerCount.incrementAndGet(); - } - - @ClientCacheEntryModified - public void updated(ClientCacheEntryModifiedEvent event) { - String cacheKey = (String) event.getKey(); - listenerCount.incrementAndGet(); - - executor.submit(() -> { - // TODO: can be optimized - object sent in the event - VersionedValue versionedVal = remoteCache.getWithMetadata(cacheKey); - for (int i = 0; i < 10; i++) { - - if (versionedVal.getVersion() < event.getVersion()) { - System.err.println("INCOMPATIBLE VERSION. event version: " + event.getVersion() + ", entity version: " + versionedVal.getVersion() + ", i=" + i); - try { - Thread.sleep(100); - } catch (InterruptedException ie) { - throw new RuntimeException(ie); - } - - versionedVal = remoteCache.getWithMetadata(cacheKey); - } else { - break; - } - } - - SessionEntity session = (SessionEntity) versionedVal.getValue(); - SessionEntityWrapper sessionWrapper = new SessionEntityWrapper(session); - - if (listenerCount.get() % 100 == 0) { - logger.infof("Listener count: " + listenerCount.get()); - } - - // TODO: for distributed caches, ensure that it is executed just on owner OR if event.isCommandRetried - origCache - .getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE) - .replace(cacheKey, sessionWrapper); - }); - } - - - - - } - - private static class RemoteCacheWorker extends Thread { - - private final RemoteCache remoteCache; - - private final int myThreadId; - - private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) { - this.remoteCache = remoteCache; - this.myThreadId = myThreadId; - } - - @Override - public void run() { - - for (int i=0 ; i versioned = remoteCache.getWithMetadata("123"); - UserSessionEntity oldSession = versioned.getValue(); - //UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession); - UserSessionEntity clone = oldSession; - - // In case that exception was thrown (ReplaceStatus.ERROR), the remoteCache may have the note. Seems that transactions are not fully rolled-back on the JDG side - // in case that backup fails - if (replaced == ReplaceStatus.NOT_REPLACED) { - clone.getNotes().put(noteKey, "someVal"); - } else if (replaced == ReplaceStatus.ERROR) { - if (clone.getNotes().containsKey(noteKey)) { - System.err.println("I HAVE THE KEY: " + noteKey); - } else { - System.err.println("I DON'T HAVE THE KEY: " + noteKey); - clone.getNotes().put(noteKey, "someVal"); - } - } - - //cache.replace("123", clone); - replaced = cacheReplace(versioned, clone); - } - - // Try to see if remoteCache on 2nd DC is immediatelly seeing our change - RemoteCache secondDCRemoteCache = myThreadId == 1 ? remoteCache2 : remoteCache1; - //UserSessionEntity thatSession = (UserSessionEntity) secondDCRemoteCache.get("123"); - - //Assert.assertEquals("someVal", thatSession.getNotes().get(noteKey)); - //System.out.println("Passed"); - } - - } - - private ReplaceStatus cacheReplace(VersionedValue oldSession, UserSessionEntity newSession) { - try { - boolean replaced = remoteCache.replaceWithVersion("123", newSession, oldSession.getVersion()); - //boolean replaced = true; - //remoteCache.replace("123", newSession); - if (!replaced) { - failedReplaceCounter.incrementAndGet(); - //return false; - //System.out.println("Replace failed!!!"); - } else { - histogram.increaseSuccessOpsCount(oldSession.getVersion()); - } - return replaced ? ReplaceStatus.REPLACED : ReplaceStatus.NOT_REPLACED; - } catch (Exception re) { - failedReplaceCounter2.incrementAndGet(); - return ReplaceStatus.ERROR; - } - //return replaced; - } - - } - - private enum ReplaceStatus { - REPLACED, NOT_REPLACED, ERROR - } - - - private static void testLifespans() throws Exception { - long l1 = InfinispanUtil.toHotrodTimeMs(remoteCache1, 5000); - long l2 = InfinispanUtil.toHotrodTimeMs(remoteCache2, 2592000000L); - long l3 = InfinispanUtil.toHotrodTimeMs(remoteCache2, 2592000001L); - //long l4 = InfinispanUtil.getLifespanMs(remoteCache1, Time.currentTimeMillis() + 5000); - - remoteCache1.put("k1", "v1", l1, TimeUnit.MILLISECONDS); - remoteCache1.put("k2", "v2", l2, TimeUnit.MILLISECONDS); - remoteCache1.put("k3", "v3", l3, TimeUnit.MILLISECONDS); - remoteCache1.put("k4", "v4", Time.currentTimeMillis() + 5000, TimeUnit.MILLISECONDS); - - System.out.println("l1=" + l1 + ", l2=" + l2 + ", l3=" + l3); - System.out.println("k1=" + remoteCache1.get("k1") + ", k2=" + remoteCache1.get("k2") + ", k3=" + remoteCache1.get("k3") + ", k4=" + remoteCache1.get("k4")); - - Thread.sleep(4000); - - System.out.println("k1=" + remoteCache1.get("k1") + ", k2=" + remoteCache1.get("k2") + ", k3=" + remoteCache1.get("k3") + ", k4=" + remoteCache1.get("k4")); - - Thread.sleep(2000); - - System.out.println("k1=" + remoteCache1.get("k1") + ", k2=" + remoteCache1.get("k2") + ", k3=" + remoteCache1.get("k3") + ", k4=" + remoteCache1.get("k4")); - } -/* - // Worker, which operates on "classic" cache and rely on operations delegated to the second cache - private static class CacheWorker extends Thread { - - private final Cache> cache; - - private final int myThreadId; - - private CacheWorker(Cache> cache, int myThreadId) { - this.cache = cache; - this.myThreadId = myThreadId; - } - - @Override - public void run() { - - for (int i=0 ; i versioned = cache.getVersioned("123"); - UserSessionEntity oldSession = versioned.getValue(); - //UserSessionEntity clone = DistributedCacheConcurrentWritesTest.cloneSession(oldSession); - UserSessionEntity clone = oldSession; - - clone.getNotes().put(noteKey, "someVal"); - //cache.replace("123", clone); - replaced = cacheReplace(versioned, clone); - } - } - - } - - }*/ - - -} diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGOfflineBackupsTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGOfflineBackupsTest.java deleted file mode 100644 index f2094d331b..0000000000 --- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGOfflineBackupsTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright 2017 Red Hat, Inc. and/or its affiliates - * and other contributors as indicated by the @author tags. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.keycloak.cluster.infinispan; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.infinispan.Cache; -import org.infinispan.client.hotrod.exceptions.HotRodClientException; -import org.infinispan.context.Flag; -import org.infinispan.manager.EmbeddedCacheManager; -import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder; -import org.jboss.logging.Logger; -import org.keycloak.common.util.Time; -import org.keycloak.connections.infinispan.InfinispanConnectionProvider; -import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; -import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; - -/** - * @author Marek Posolda - */ -public class ConcurrencyJDGOfflineBackupsTest { - - protected static final Logger logger = Logger.getLogger(ConcurrencyJDGOfflineBackupsTest.class); - - public static void main(String[] args) throws Exception { - - Cache> cache1 = createManager(1).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - - try { - // Create initial item - UserSessionEntity session = new UserSessionEntity("123"); - session.setRealmId("foo"); - session.setBrokerSessionId("!23123123"); - session.setBrokerUserId(null); - session.setUser("foo"); - session.setLoginUsername("foo"); - session.setIpAddress("123.44.143.178"); - session.setStarted(Time.currentTime()); - session.setLastSessionRefresh(Time.currentTime()); - -// AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity(); -// clientSession.setAuthMethod("saml"); -// clientSession.setAction("something"); -// clientSession.setTimestamp(1234); -// clientSession.setProtocolMappers(new HashSet<>(Arrays.asList("mapper1", "mapper2"))); -// clientSession.setRoles(new HashSet<>(Arrays.asList("role1", "role2"))); -// session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId()); - - SessionEntityWrapper wrappedSession = new SessionEntityWrapper<>(session); - - // Some dummy testing of remoteStore behaviour - logger.info("Before put"); - - - AtomicInteger successCount = new AtomicInteger(0); - AtomicInteger errorsCount = new AtomicInteger(0); - for (int i=0 ; i<100 ; i++) { - try { - cache1 - .getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL) // will still invoke remoteStore . Just doesn't propagate to cluster - .put("123", wrappedSession); - successCount.incrementAndGet(); - Thread.sleep(1000); - logger.infof("Success in the iteration: %d", i); - } catch (HotRodClientException hrce) { - logger.errorf("Failed to put the item in the iteration: %d ", i); - errorsCount.incrementAndGet(); - } - } - - logger.infof("SuccessCount: %d, ErrorsCount: %d", successCount.get(), errorsCount.get()); - -// logger.info("After put"); -// -// cache1.replace("123", wrappedSession); -// -// logger.info("After replace"); -// -// cache1.get("123"); -// -// logger.info("After cache1.get"); - -// cache2.get("123"); -// -// logger.info("After cache2.get"); - - } finally { - // Finish JVM - cache1.getCacheManager().stop(); - } - - } - - private static EmbeddedCacheManager createManager(int threadId) { - return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.USER_SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class); - } - -} diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java deleted file mode 100644 index 99c7e8312f..0000000000 --- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoteCacheClientListenersTest.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Copyright 2017 Red Hat, Inc. and/or its affiliates - * and other contributors as indicated by the @author tags. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.keycloak.cluster.infinispan; - -import org.infinispan.Cache; -import org.infinispan.client.hotrod.RemoteCache; -import org.infinispan.client.hotrod.VersionedValue; -import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated; -import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified; -import org.infinispan.client.hotrod.annotation.ClientListener; -import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent; -import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent; -import org.infinispan.factories.ComponentRegistry; -import org.infinispan.manager.EmbeddedCacheManager; -import org.infinispan.persistence.manager.PersistenceManager; -import org.infinispan.persistence.remote.RemoteStore; -import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder; -import org.junit.Assert; -import org.keycloak.connections.infinispan.InfinispanConnectionProvider; -import org.keycloak.connections.infinispan.InfinispanUtil; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Test that hotrod ClientListeners are correctly executed as expected - * - * STEPS TO REPRODUCE: - * - Unzip infinispan-server-9.2.4.Final to some locations ISPN1 and ISPN2 - * - * - Edit both ISPN1/standalone/configuration/clustered.xml and ISPN2/standalone/configuration/clustered.xml . Configure cache in container "clustered" - * - * - - - - - - - Run server1 - ./standalone.sh -c clustered.xml -Djava.net.preferIPv4Stack=true -Djboss.socket.binding.port-offset=1010 -Djboss.default.multicast.address=234.56.78.99 -Djboss.node.name=cache-server - - - Run server2 - ./standalone.sh -c clustered.xml -Djava.net.preferIPv4Stack=true -Djboss.socket.binding.port-offset=2010 -Djboss.default.multicast.address=234.56.78.100 -Djboss.node.name=cache-server-dc-2 - - - Run this test as main class from IDE - * - * - * - * @author Marek Posolda - */ -public class ConcurrencyJDGRemoteCacheClientListenersTest { - - // Helper map to track if listeners were executed - private static final Map state = new HashMap<>(); - - private static final AtomicInteger totalListenerCalls = new AtomicInteger(0); - - private static final AtomicInteger totalErrors = new AtomicInteger(0); - - - public static void main(String[] args) throws Exception { - // Init map somehow - for (int i=0 ; i<1000 ; i++) { - String key = "key-" + i; - EntryInfo entryInfo = new EntryInfo(); - entryInfo.val.set(i); - state.put(key, entryInfo); - } - - // Create caches, listeners and finally worker threads - Worker worker1 = createWorker(1); - Worker worker2 = createWorker(2); - - // Note "run", so it's not executed asynchronously here!!! - worker1.run(); - -// -// // Start and join workers -// worker1.start(); -// worker2.start(); -// -// worker1.join(); -// worker2.join(); - - // Output - for (Map.Entry entry : state.entrySet()) { - System.out.println(entry.getKey() + ":::" + entry.getValue()); - } - - System.out.println("totalListeners: " + totalListenerCalls.get() + ", totalErrors: " + totalErrors.get()); - - - // Assert that ClientListener was able to read the value and save it into EntryInfo - try { - for (Map.Entry entry : state.entrySet()) { - EntryInfo info = entry.getValue(); - Assert.assertEquals(info.val.get(), info.dc1Created.get()); - Assert.assertEquals(info.val.get(), info.dc2Created.get()); - Assert.assertEquals(info.val.get() * 2, info.dc1Updated.get()); - Assert.assertEquals(info.val.get() * 2, info.dc2Updated.get()); - } - } finally { - // Remove items - for (Map.Entry entry : state.entrySet()) { - worker1.cache.remove(entry.getKey()); - } - - // Finish JVM - worker1.cache.getCacheManager().stop(); - worker2.cache.getCacheManager().stop(); - } - } - - private static Worker createWorker(int threadId) { - EmbeddedCacheManager manager = new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.WORK_CACHE_NAME, RemoteStoreConfigurationBuilder.class); - Cache cache = manager.getCache(InfinispanConnectionProvider.WORK_CACHE_NAME); - - System.out.println("Retrieved cache: " + threadId); - - RemoteStore remoteStore = ComponentRegistry.componentOf(cache, PersistenceManager.class) - .getStores(RemoteStore.class).iterator().next(); - HotRodListener listener = new HotRodListener(cache, threadId); - remoteStore.getRemoteCache().addClientListener(listener); - - return new Worker(cache, threadId); - } - - - @ClientListener - public static class HotRodListener { - - private final RemoteCache remoteCache; - private final int threadId; - private final Executor executor; - - public HotRodListener(Cache cache, int threadId) { - this.remoteCache = InfinispanUtil.getRemoteCache(cache); - this.threadId = threadId; - this.executor = Executors.newCachedThreadPool(); - } - - //private AtomicInteger listenerCount = new AtomicInteger(0); - - @ClientCacheEntryCreated - public void created(ClientCacheEntryCreatedEvent event) { - executor.execute(() -> event(event.getKey(), event.getVersion(), true)); - - } - - - @ClientCacheEntryModified - public void updated(ClientCacheEntryModifiedEvent event) { - executor.execute(() -> event(event.getKey(), event.getVersion(), false)); - } - - - private void event(String cacheKey, long version, boolean created) { - EntryInfo entryInfo = state.get(cacheKey); - entryInfo.successfulListenerWrites.incrementAndGet(); - - totalListenerCalls.incrementAndGet(); - - VersionedValue versionedVal = remoteCache.getWithMetadata(cacheKey); - - if (versionedVal.getVersion() < version) { - System.err.println("INCOMPATIBLE VERSION. event version: " + version + ", entity version: " + versionedVal.getVersion()); - totalErrors.incrementAndGet(); - return; - } - - Integer val = versionedVal.getValue(); - if (val != null) { - AtomicInteger dcVal; - if (created) { - dcVal = threadId == 1 ? entryInfo.dc1Created : entryInfo.dc2Created; - } else { - dcVal = threadId == 1 ? entryInfo.dc1Updated : entryInfo.dc2Updated; - } - dcVal.set(val); - } else { - System.err.println("NOT A VALUE FOR KEY: " + cacheKey); - totalErrors.incrementAndGet(); - } - } - - } - - - private static void createItems(Cache cache, int myThreadId) { - for (Map.Entry entry : state.entrySet()) { - String cacheKey = entry.getKey(); - Integer value = entry.getValue().val.get(); - - cache.put(cacheKey, value); - } - - System.out.println("Worker creating finished: " + myThreadId); - } - - private static class Worker extends Thread { - - private final Cache cache; - - private final int myThreadId; - - private Worker(Cache cache, int myThreadId) { - this.cache = cache; - this.myThreadId = myThreadId; - } - - @Override - public void run() { - createItems(cache, myThreadId); - - for (Map.Entry entry : state.entrySet()) { - String cacheKey = entry.getKey(); - Integer value = entry.getValue().val.get() * 2; - - this.cache.replace(cacheKey, value); - } - - System.out.println("Worker updating finished: " + myThreadId); - } - - } - - - public static class EntryInfo { - AtomicInteger val = new AtomicInteger(); - AtomicInteger successfulListenerWrites = new AtomicInteger(0); - AtomicInteger dc1Created = new AtomicInteger(); - AtomicInteger dc2Created = new AtomicInteger(); - AtomicInteger dc1Updated = new AtomicInteger(); - AtomicInteger dc2Updated = new AtomicInteger(); - - @Override - public String toString() { - return String.format("val: %d, successfulListenerWrites: %d, dc1Created: %d, dc2Created: %d, dc1Updated: %d, dc2Updated: %d", val.get(), successfulListenerWrites.get(), - dc1Created.get(), dc2Created.get(), dc1Updated.get(), dc2Updated.get()); - } - } -} diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java deleted file mode 100644 index 9606ba50f0..0000000000 --- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/ConcurrencyJDGRemoveSessionTest.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Copyright 2017 Red Hat, Inc. and/or its affiliates - * and other contributors as indicated by the @author tags. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.keycloak.cluster.infinispan; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -import org.infinispan.Cache; -import org.infinispan.client.hotrod.RemoteCache; -import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated; -import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified; -import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved; -import org.infinispan.client.hotrod.annotation.ClientListener; -import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent; -import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent; -import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent; -import org.infinispan.client.hotrod.exceptions.HotRodClientException; -import org.infinispan.context.Flag; -import org.infinispan.manager.EmbeddedCacheManager; -import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder; -import org.jboss.logging.Logger; -import org.junit.Assert; -import org.keycloak.common.util.Time; -import org.keycloak.connections.infinispan.InfinispanConnectionProvider; -import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; -import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; -import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; -import org.keycloak.connections.infinispan.InfinispanUtil; -import java.util.UUID; - -/** - * Check that removing of session from remoteCache is session immediately removed on remoteCache in other DC. This is true. - * - * Also check that listeners are executed asynchronously with some delay. - * - * Steps: {@see ConcurrencyJDGRemoteCacheClientListenersTest} - * - * @author Marek Posolda - */ -public class ConcurrencyJDGRemoveSessionTest { - - protected static final Logger logger = Logger.getLogger(ConcurrencyJDGRemoveSessionTest.class); - - private static final int ITERATIONS = 10000; - - private static RemoteCache remoteCache1; - private static RemoteCache remoteCache2; - - private static final AtomicInteger errorsCounter = new AtomicInteger(0); - - private static final AtomicInteger successfulListenerWrites = new AtomicInteger(0); - private static final AtomicInteger successfulListenerWrites2 = new AtomicInteger(0); - - private static Map removalCounts = new ConcurrentHashMap<>(); - - - private static final UUID CLIENT_1_UUID = UUID.randomUUID(); - - public static void main(String[] args) throws Exception { - Cache> cache1 = createManager(1).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - Cache> cache2 = createManager(2).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - - // Create caches, listeners and finally worker threads - Thread worker1 = createWorker(cache1, 1); - Thread worker2 = createWorker(cache2, 2); - Thread worker3 = createWorker(cache1, 1); - Thread worker4 = createWorker(cache2, 2); - - // Create 100 initial sessions - for (int i=0 ; i wrappedSession = createSessionEntity(sessionId); - cache1.put(sessionId, wrappedSession); - - removalCounts.put(sessionId, new AtomicInteger(0)); - } - - logger.info("SESSIONS CREATED"); - - // Create 100 initial sessions - for (int i=0 ; i histogram = new HashMap<>(); - for (Map.Entry entry : removalCounts.entrySet()) { - int count = entry.getValue().get(); - - int current = histogram.get(count) == null ? 0 : histogram.get(count); - current++; - histogram.put(count, current); - } - - logger.infof("Histogram: %s", histogram.toString()); - logger.infof("Errors: %d", errorsCounter.get()); - - //Thread.sleep(5000); - - // Doing it in opposite direction to ensure that newer are checked first. - // This us currently FAILING (expected) as listeners are executed asynchronously. -// for (int i=ITERATIONS-1 ; i>=0 ; i--) { -// String sessionId = String.valueOf(i); -// -// logger.infof("Before call cache2.get: %s", sessionId); -// -// SessionEntityWrapper loadedWrapper = cache2.get(sessionId); -// Assert.assertNull("Loaded wrapper not null for key " + sessionId, loadedWrapper); -// } -// -// logger.info("SESSIONS NOT AVAILABLE ON DC2"); - - long took = System.currentTimeMillis() - start; - logger.infof("took %d ms", took); - - // // Start and join workers -// worker1.start(); -// worker2.start(); -// -// worker1.join(); -// worker2.join(); - - } finally { - Thread.sleep(2000); - - // Finish JVM - cache1.getCacheManager().stop(); - cache2.getCacheManager().stop(); - } - -// // Output -// for (Map.Entry entry : state.entrySet()) { -// System.out.println(entry.getKey() + ":::" + entry.getValue()); -// worker1.cache.remove(entry.getKey()); -// } - -// System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() + -// ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() + -// ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get() ); -// -// System.out.println("Sleeping before other report"); -// -// Thread.sleep(1000); -// -// System.out.println("Finished. Took: " + took + " ms. Notes: " + cache1.get("123").getEntity().getNotes().size() + -// ", successfulListenerWrites: " + successfulListenerWrites.get() + ", successfulListenerWrites2: " + successfulListenerWrites2.get() + -// ", failedReplaceCounter: " + failedReplaceCounter.get() + ", failedReplaceCounter2: " + failedReplaceCounter2.get()); - - - } - - - private static SessionEntityWrapper createSessionEntity(String sessionId) { - // Create 100 initial sessions - UserSessionEntity session = new UserSessionEntity(sessionId); - session.setRealmId("foo"); - session.setBrokerSessionId("!23123123"); - session.setBrokerUserId(null); - session.setUser("foo"); - session.setLoginUsername("foo"); - session.setIpAddress("123.44.143.178"); - session.setStarted(Time.currentTime()); - session.setLastSessionRefresh(Time.currentTime()); - - AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity(UUID.randomUUID()); - clientSession.setAuthMethod("saml"); - clientSession.setAction("something"); - clientSession.setTimestamp(1234); - session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId()); - - SessionEntityWrapper wrappedSession = new SessionEntityWrapper<>(session); - return wrappedSession; - } - - - private static Thread createWorker(Cache> cache, int threadId) { - System.out.println("Retrieved cache: " + threadId); - - RemoteCache remoteCache = InfinispanUtil.getRemoteCache(cache); - - if (threadId == 1) { - remoteCache1 = remoteCache; - } else { - remoteCache2 = remoteCache; - } - - AtomicInteger counter = threadId ==1 ? successfulListenerWrites : successfulListenerWrites2; - HotRodListener listener = new HotRodListener(cache, remoteCache, counter); - remoteCache.addClientListener(listener); - - return new RemoteCacheWorker(remoteCache, threadId); - //return new CacheWorker(cache, threadId); - } - - - private static EmbeddedCacheManager createManager(int threadId) { - return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.USER_SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class); - } - - - @ClientListener - public static class HotRodListener { - - private Cache> origCache; - private RemoteCache remoteCache; - private AtomicInteger listenerCount; - - public HotRodListener(Cache> origCache, RemoteCache remoteCache, AtomicInteger listenerCount) { - this.listenerCount = listenerCount; - this.remoteCache = remoteCache; - this.origCache = origCache; - } - - - @ClientCacheEntryCreated - public void created(ClientCacheEntryCreatedEvent event) { - String cacheKey = (String) event.getKey(); - - logger.infof("Listener executed for creating of session %s", cacheKey); - } - - - @ClientCacheEntryModified - public void modified(ClientCacheEntryModifiedEvent event) { - String cacheKey = (String) event.getKey(); - - logger.infof("Listener executed for modifying of session %s", cacheKey); - } - - - @ClientCacheEntryRemoved - public void removed(ClientCacheEntryRemovedEvent event) { - String cacheKey = (String) event.getKey(); - - logger.infof("Listener executed for removing of session %s", cacheKey); - - // TODO: for distributed caches, ensure that it is executed just on owner OR if event.isCommandRetried - origCache - .getAdvancedCache().withFlags(Flag.SKIP_CACHE_LOAD, Flag.SKIP_CACHE_STORE) - .remove(cacheKey); - - } - - } - - private static class RemoteCacheWorker extends Thread { - - private final RemoteCache remoteCache; - - private final int myThreadId; - - private RemoteCacheWorker(RemoteCache remoteCache, int myThreadId) { - this.remoteCache = remoteCache; - this.myThreadId = myThreadId; - } - - @Override - public void run() { - - for (int i=0 ; iMarek Posolda - */ -public class ConcurrencyTestHistogram { - - private final ConcurrentMap counters = new ConcurrentHashMap<>(); - - - public ConcurrencyTestHistogram() { - - } - - - public void increaseSuccessOpsCount(long version) { - AtomicInteger counter = new AtomicInteger(0); - AtomicInteger existing = counters.putIfAbsent(version, counter); - if (existing != null) { - counter = existing; - } - - counter.incrementAndGet(); - } - - - public void dumpStats() { - for (Map.Entry entry : counters.entrySet()) { - System.out.println(entry.getKey() + "=" + entry.getValue().get()); - } - } -} diff --git a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/JDGPutTest.java b/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/JDGPutTest.java deleted file mode 100644 index cdf023fe57..0000000000 --- a/model/infinispan/src/test/java/org/keycloak/cluster/infinispan/JDGPutTest.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright 2020 Red Hat, Inc. and/or its affiliates - * and other contributors as indicated by the @author tags. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.keycloak.cluster.infinispan; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import org.infinispan.Cache; -import org.infinispan.client.hotrod.RemoteCache; -import org.infinispan.commons.util.CloseableIterator; -import org.infinispan.manager.EmbeddedCacheManager; -import org.infinispan.persistence.remote.configuration.RemoteStoreConfigurationBuilder; -import org.jboss.logging.Logger; -import org.keycloak.connections.infinispan.InfinispanConnectionProvider; -import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; -import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; -import org.keycloak.models.sessions.infinispan.remotestore.RemoteCacheSessionsLoaderContext; -import org.keycloak.connections.infinispan.InfinispanUtil; - -/** - * @author Marek Posolda - */ -public class JDGPutTest { - - public static final Logger logger = Logger.getLogger(JDGPutTest.class); - - public static void main(String[] args) throws Exception { - Cache cache1 = createManager(1).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - Cache cache2 = createManager(2).getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - - try { - //RemoteCache remoteCache1 = InfinispanUtil.getRemoteCache(cache1); - //RemoteCache remoteCache2 = InfinispanUtil.getRemoteCache(cache2); - - //remoteCache1.put("key1", new Book("book1", "desc", 1)); - //remoteCache2.put("key2", ); - String uuidStr = UUID.randomUUID().toString(); - System.out.println(uuidStr); - UUID uuid = UUID.fromString(uuidStr); - AuthenticatedClientSessionEntity ace = new AuthenticatedClientSessionEntity(uuid); - SessionEntityWrapper wrapper = new SessionEntityWrapper(ace); - - cache1.put("key1", wrapper); - //cache1.put("key1", "val1"); - - //AuthenticatedClientSessionEntity val1 = (AuthenticatedClientSessionEntity) cache2.get("key1"); - //RemoteCache remoteCache1 = InfinispanUtil.getRemoteCache(cache1); - //remoteCache1.put("key1", "val1"); - RemoteCache remoteCache2 = InfinispanUtil.getRemoteCache(cache2); - Object o = remoteCache2.get("key1"); - - logger.info("Before retrieve entries"); - try (CloseableIterator it = remoteCache2.retrieveEntries(null, 64)) { - Object o2 = it.next(); - logger.info("o2: " + o2); - } - - //Object key = remoteCache2.keySet().iterator().next(); - //Object value = remoteCache2.values().iterator().next(); - //logger.info("Key: " + key + ", val: " + value); - - bulkLoadSessions(remoteCache2); - } finally { - Thread.sleep(2000); - - // Finish JVM - cache1.getCacheManager().stop(); - cache2.getCacheManager().stop(); - } - } - - private static EmbeddedCacheManager createManager(int threadId) { - return new TestCacheManagerFactory().createManager(threadId, InfinispanConnectionProvider.USER_SESSION_CACHE_NAME, RemoteStoreConfigurationBuilder.class); - } - - private static void bulkLoadSessions(RemoteCache remoteCache) { - RemoteCacheSessionsLoaderContext ctx = new RemoteCacheSessionsLoaderContext(64); - - Map toInsert = new HashMap<>(ctx.getSessionsPerSegment()); - - try (CloseableIterator> it = remoteCache.retrieveEntries(null, ctx.getSessionsPerSegment())) { - while (it.hasNext()) { - Map.Entry entry = it.next(); - toInsert.put(entry.getKey(), entry.getValue()); - } - - } catch (RuntimeException e) { - logger.warnf(e, "Error loading sessions from remote cache '%s'", remoteCache.getName()); - throw e; - } - - logger.info("Loaded " + toInsert); - - } - - -} diff --git a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java deleted file mode 100644 index 52e36e411d..0000000000 --- a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheConcurrentWritesTest.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Copyright 2016 Red Hat, Inc. and/or its affiliates - * and other contributors as indicated by the @author tags. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.keycloak.models.sessions.infinispan.initializer; - -import java.util.HashMap; -import java.util.Map; - -import org.infinispan.Cache; -import org.infinispan.client.hotrod.ProtocolVersion; -import org.infinispan.client.hotrod.RemoteCache; -import org.infinispan.client.hotrod.RemoteCacheManager; -import org.infinispan.commons.api.BasicCache; -import org.infinispan.configuration.cache.CacheMode; -import org.infinispan.configuration.cache.Configuration; -import org.infinispan.configuration.cache.ConfigurationBuilder; -import org.infinispan.configuration.global.GlobalConfigurationBuilder; -import org.infinispan.manager.DefaultCacheManager; -import org.infinispan.manager.EmbeddedCacheManager; -import org.junit.Ignore; -import org.keycloak.common.util.Time; -import org.keycloak.connections.infinispan.InfinispanConnectionProvider; -import org.keycloak.models.sessions.infinispan.changes.SessionEntityWrapper; -import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; -import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; -import java.util.UUID; - -/** - * Test concurrent writes to distributed cache with usage of atomic replace - * - * @author Marek Posolda - */ -@Ignore -public class DistributedCacheConcurrentWritesTest { - - private static final int BATCHES_PER_WORKER = 1000; - private static final int ITEMS_IN_BATCH = 100; - - public static void main(String[] args) throws Exception { - BasicCache> cache1 = createCache("node1"); - BasicCache> cache2 = createCache("node2"); - - // NOTE: This setup requires infinispan servers to be up and running on localhost:12232 and localhost:13232 -// BasicCache> cache1 = createRemoteCache("node1"); -// BasicCache> cache2 = createRemoteCache("node2"); - - try { - testConcurrentPut(cache1, cache2); - } finally { - - // Kill JVM - cache1.stop(); - cache2.stop(); - stopMgr(cache1); - stopMgr(cache2); - - System.out.println("Managers killed"); - } - } - - - private static SessionEntityWrapper createEntityInstance(String id) { - // Create initial item - UserSessionEntity session = new UserSessionEntity(id); - session.setRealmId("foo"); - session.setBrokerSessionId("!23123123"); - session.setBrokerUserId(null); - session.setUser("foo"); - session.setLoginUsername("foo"); - session.setIpAddress("123.44.143.178"); - session.setStarted(Time.currentTime()); - session.setLastSessionRefresh(Time.currentTime()); - - AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity(UUID.randomUUID()); - clientSession.setAuthMethod("saml"); - clientSession.setAction("something"); - clientSession.setTimestamp(1234); - session.getAuthenticatedClientSessions().put("foo-client", clientSession.getId()); - - return new SessionEntityWrapper<>(session); - } - - - // Reproducer for KEYCLOAK-7443 and KEYCLOAK-7489. The infinite loop can happen if cache.replace(key, old, new) is called and entity was removed on one cluster node in the meantime - private static void testConcurrentPut(BasicCache> cache1, - BasicCache> cache2) throws InterruptedException { - - // Create workers for concurrent write and start them - Worker worker1 = new Worker(1, cache1); - Worker worker2 = new Worker(2, cache2); - - long start = System.currentTimeMillis(); - - System.out.println("Started clustering test"); - - worker1.start(); - //worker1.join(); - worker2.start(); - - worker1.join(); - worker2.join(); - - long took = System.currentTimeMillis() - start; - - System.out.println("Test finished. Took: " + took + " ms. Cache size: " + cache1.size()); - } - - - private static class Worker extends Thread { - - private final BasicCache> cache; - private final int startIndex; - - public Worker(int threadId, BasicCache> cache) { - this.cache = cache; - this.startIndex = (threadId - 1) * (ITEMS_IN_BATCH * BATCHES_PER_WORKER); - setName("th-" + threadId); - } - - @Override - public void run() { - - for (int page = 0; page < BATCHES_PER_WORKER ; page++) { - int startPageIndex = startIndex + page * ITEMS_IN_BATCH; - - putItemsClassic(startPageIndex); - //putItemsAll(startPageIndex); - - System.out.println("Thread " + getName() + ": Saved items from " + startPageIndex + " to " + (startPageIndex + ITEMS_IN_BATCH - 1)); - } - } - - - // put items 1 by 1 - private void putItemsClassic(int startPageIndex) { - for (int i = startPageIndex ; i < (startPageIndex + ITEMS_IN_BATCH) ; i++) { - String key = "key-" + startIndex + i; - SessionEntityWrapper session = createEntityInstance(key); - cache.put(key, session); - } - } - - - // put all items together - private void putItemsAll(int startPageIndex) { - Map> mapp = new HashMap<>(); - - for (int i = startPageIndex ; i < (startPageIndex + ITEMS_IN_BATCH) ; i++) { - String key = "key-" + startIndex + i; - SessionEntityWrapper session = createEntityInstance(key); - mapp.put(key, session); - } - - cache.putAll(mapp); - } - } - - - // Cache creation utils - - - public static BasicCache> createCache(String nodeName) { - EmbeddedCacheManager mgr = createManager(nodeName); - Cache> cache = mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - return cache; - } - - - public static EmbeddedCacheManager createManager(String nodeName) { - System.setProperty("java.net.preferIPv4Stack", "true"); - System.setProperty("jgroups.tcp.port", "53715"); - - GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder(); - gcb = gcb.clusteredDefault(); - gcb.transport().clusterName("test-clustering"); - gcb.transport().nodeName(nodeName); - gcb.jmx().domain(InfinispanConnectionProvider.JMX_DOMAIN).enable(); - EmbeddedCacheManager cacheManager = new DefaultCacheManager(gcb.build()); - - ConfigurationBuilder distConfigBuilder = new ConfigurationBuilder(); - distConfigBuilder.clustering().cacheMode(CacheMode.DIST_SYNC); - distConfigBuilder.clustering().hash().numOwners(1); - - // Disable L1 cache - distConfigBuilder.clustering().hash().l1().enabled(false); - Configuration distConfig = distConfigBuilder.build(); - cacheManager.defineConfiguration(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME, distConfig); - - return cacheManager; - } - - - public static BasicCache> createRemoteCache(String nodeName) { - int port = ("node1".equals(nodeName)) ? 12232 : 13232; - - org.infinispan.client.hotrod.configuration.ConfigurationBuilder builder = new org.infinispan.client.hotrod.configuration.ConfigurationBuilder(); - org.infinispan.client.hotrod.configuration.Configuration cfg = builder - .addServer().host("localhost").port(port) - .version(ProtocolVersion.PROTOCOL_VERSION_26) - .build(); - RemoteCacheManager mgr = new RemoteCacheManager(cfg); - return mgr.getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - } - - // CLEANUP METHODS - - private static void stopMgr(BasicCache cache) { - if (cache instanceof Cache) { - ((Cache) cache).getCacheManager().stop(); - } else { - ((RemoteCache) cache).getRemoteCacheManager().stop(); - } - } - -} diff --git a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheWriteSkewTest.java b/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheWriteSkewTest.java deleted file mode 100644 index c8a86c172f..0000000000 --- a/model/infinispan/src/test/java/org/keycloak/models/sessions/infinispan/initializer/DistributedCacheWriteSkewTest.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Copyright 2016 Red Hat, Inc. and/or its affiliates - * and other contributors as indicated by the @author tags. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.keycloak.models.sessions.infinispan.initializer; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.infinispan.Cache; -import org.infinispan.configuration.cache.CacheMode; -import org.infinispan.configuration.cache.Configuration; -import org.infinispan.configuration.cache.ConfigurationBuilder; -import org.infinispan.configuration.global.GlobalConfigurationBuilder; -import org.infinispan.manager.DefaultCacheManager; -import org.infinispan.manager.EmbeddedCacheManager; -import org.infinispan.transaction.LockingMode; -import org.infinispan.transaction.lookup.EmbeddedTransactionManagerLookup; -import org.infinispan.util.concurrent.IsolationLevel; -import org.keycloak.common.util.Time; -import org.keycloak.connections.infinispan.InfinispanConnectionProvider; -import org.keycloak.models.sessions.infinispan.entities.AuthenticatedClientSessionEntity; -import org.keycloak.models.sessions.infinispan.entities.UserSessionEntity; -import java.util.UUID; - -/** - * Test concurrent writes to distributed cache with usage of write skew - * - * @author Marek Posolda - */ -//@Ignore -public class DistributedCacheWriteSkewTest { - - private static final int ITERATION_PER_WORKER = 1000; - - private static final AtomicInteger failedReplaceCounter = new AtomicInteger(0); - - private static final UUID CLIENT_1_UUID = UUID.randomUUID(); - - public static void main(String[] args) throws Exception { - Cache cache1 = createManager("node1").getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - Cache cache2 = createManager("node2").getCache(InfinispanConnectionProvider.USER_SESSION_CACHE_NAME); - - // Create initial item - UserSessionEntity session = new UserSessionEntity("123"); - session.setRealmId("foo"); - session.setBrokerSessionId("!23123123"); - session.setBrokerUserId(null); - session.setUser("foo"); - session.setLoginUsername("foo"); - session.setIpAddress("123.44.143.178"); - session.setStarted(Time.currentTime()); - session.setLastSessionRefresh(Time.currentTime()); - - AuthenticatedClientSessionEntity clientSession = new AuthenticatedClientSessionEntity(UUID.randomUUID()); - clientSession.setAuthMethod("saml"); - clientSession.setAction("something"); - clientSession.setTimestamp(1234); - session.getAuthenticatedClientSessions().put(CLIENT_1_UUID.toString(), clientSession.getId()); - - cache1.put("123", session); - - //cache1.replace("123", session); - - // Create 2 workers for concurrent write and start them - Worker worker1 = new Worker(1, cache1); - Worker worker2 = new Worker(2, cache2); - - long start = System.currentTimeMillis(); - - System.out.println("Started clustering test"); - - worker1.start(); - //worker1.join(); - worker2.start(); - - worker1.join(); - worker2.join(); - - long took = System.currentTimeMillis() - start; - session = cache1.get("123"); - System.out.println("Took: " + took + " ms. Notes count: " + session.getNotes().size() + ", failedReplaceCounter: " + failedReplaceCounter.get()); - - // Kill JVM - cache1.stop(); - cache2.stop(); - cache1.getCacheManager().stop(); - cache2.getCacheManager().stop(); - - System.out.println("Managers killed"); - } - - - private static class Worker extends Thread { - - private final Cache cache; - private final int threadId; - - public Worker(int threadId, Cache cache) { - this.threadId = threadId; - this.cache = cache; - } - - @Override - public void run() { - - for (int i=0 ; i