KEYCLOAK-4066 TimeoutException in cluster environment in ClearExpiredSessions

This commit is contained in:
mposolda 2017-01-10 18:18:36 +01:00
parent 811d532b48
commit 7098daaf72
7 changed files with 206 additions and 10 deletions

View file

@ -164,9 +164,16 @@ public class DefaultInfinispanConnectionProviderFactory implements InfinispanCon
throw new RuntimeException("Invalid value for sessionsMode");
}
sessionConfigBuilder.clustering().hash()
.numOwners(config.getInt("sessionsOwners", 2))
.numSegments(config.getInt("sessionsSegments", 60)).build();
int l1Lifespan = config.getInt("l1Lifespan", 600000);
boolean l1Enabled = l1Lifespan > 0;
sessionConfigBuilder.clustering()
.hash()
.numOwners(config.getInt("sessionsOwners", 2))
.numSegments(config.getInt("sessionsSegments", 60))
.l1()
.enabled(l1Enabled)
.lifespan(l1Lifespan)
.build();
}
Configuration sessionCacheConfiguration = sessionConfigBuilder.build();

View file

@ -19,6 +19,7 @@ package org.keycloak.models.sessions.infinispan;
import org.infinispan.Cache;
import org.infinispan.CacheStream;
import org.infinispan.context.Flag;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Time;
import org.keycloak.models.ClientInitialAccessModel;
@ -291,6 +292,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
@Override
public void removeExpired(RealmModel realm) {
log.debugf("Removing expired sessions");
removeExpiredUserSessions(realm);
removeExpiredClientSessions(realm);
removeExpiredOfflineUserSessions(realm);
@ -302,9 +304,13 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
int expired = Time.currentTime() - realm.getSsoSessionMaxLifespan();
int expiredRefresh = Time.currentTime() - realm.getSsoSessionIdleTimeout();
Iterator<Map.Entry<String, SessionEntity>> itr = sessionCache.entrySet().stream().filter(UserSessionPredicate.create(realm.getId()).expired(expired, expiredRefresh)).iterator();
// Each cluster node cleanups just local sessions, which are those owned by himself (+ few more taking l1 cache into account)
Iterator<Map.Entry<String, SessionEntity>> itr = sessionCache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL)
.entrySet().stream().filter(UserSessionPredicate.create(realm.getId()).expired(expired, expiredRefresh)).iterator();
int counter = 0;
while (itr.hasNext()) {
counter++;
UserSessionEntity entity = (UserSessionEntity) itr.next().getValue();
tx.remove(sessionCache, entity.getId());
@ -314,23 +320,38 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
}
}
}
log.debugf("Removed %d expired user sessions for realm '%s'", counter, realm.getName());
}
private void removeExpiredClientSessions(RealmModel realm) {
int expiredDettachedClientSession = Time.currentTime() - RealmInfoUtil.getDettachedClientSessionLifespan(realm);
Iterator<Map.Entry<String, SessionEntity>> itr = sessionCache.entrySet().stream().filter(ClientSessionPredicate.create(realm.getId()).expiredRefresh(expiredDettachedClientSession).requireNullUserSession()).iterator();
// Each cluster node cleanups just local sessions, which are those owned by himself (+ few more taking l1 cache into account)
Iterator<Map.Entry<String, SessionEntity>> itr = sessionCache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL)
.entrySet().stream().filter(ClientSessionPredicate.create(realm.getId()).expiredRefresh(expiredDettachedClientSession).requireNullUserSession()).iterator();
int counter = 0;
while (itr.hasNext()) {
counter++;
tx.remove(sessionCache, itr.next().getKey());
}
log.debugf("Removed %d expired client sessions for realm '%s'", counter, realm.getName());
}
private void removeExpiredOfflineUserSessions(RealmModel realm) {
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
int expiredOffline = Time.currentTime() - realm.getOfflineSessionIdleTimeout();
Iterator<Map.Entry<String, SessionEntity>> itr = offlineSessionCache.entrySet().stream().filter(UserSessionPredicate.create(realm.getId()).expired(null, expiredOffline)).iterator();
// Each cluster node cleanups just local sessions, which are those owned by himself (+ few more taking l1 cache into account)
UserSessionPredicate predicate = UserSessionPredicate.create(realm.getId()).expired(null, expiredOffline);
Iterator<Map.Entry<String, SessionEntity>> itr = offlineSessionCache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL)
.entrySet().stream().filter(predicate).iterator();
int counter = 0;
while (itr.hasNext()) {
counter++;
UserSessionEntity entity = (UserSessionEntity) itr.next().getValue();
tx.remove(offlineSessionCache, entity.getId());
@ -340,22 +361,32 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
tx.remove(offlineSessionCache, clientSessionId);
}
}
log.debugf("Removed %d expired offline user sessions for realm '%s'", counter, realm.getName());
}
private void removeExpiredOfflineClientSessions(RealmModel realm) {
UserSessionPersisterProvider persister = session.getProvider(UserSessionPersisterProvider.class);
int expiredOffline = Time.currentTime() - realm.getOfflineSessionIdleTimeout();
Iterator<String> itr = offlineSessionCache.entrySet().stream().filter(ClientSessionPredicate.create(realm.getId()).expiredRefresh(expiredOffline)).map(Mappers.sessionId()).iterator();
// Each cluster node cleanups just local sessions, which are those owned by himself (+ few more taking l1 cache into account)
Iterator<String> itr = offlineSessionCache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL)
.entrySet().stream().filter(ClientSessionPredicate.create(realm.getId()).expiredRefresh(expiredOffline)).map(Mappers.sessionId()).iterator();
int counter = 0;
while (itr.hasNext()) {
counter++;
String sessionId = itr.next();
tx.remove(offlineSessionCache, sessionId);
persister.removeClientSession(sessionId, true);
}
log.debugf("Removed %d expired offline client sessions for realm '%s'", counter, realm.getName());
}
private void removeExpiredClientInitialAccess(RealmModel realm) {
Iterator<String> itr = sessionCache.entrySet().stream().filter(ClientInitialAccessPredicate.create(realm.getId()).expired(Time.currentTime())).map(Mappers.sessionId()).iterator();
Iterator<String> itr = sessionCache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL)
.entrySet().stream().filter(ClientInitialAccessPredicate.create(realm.getId()).expired(Time.currentTime())).map(Mappers.sessionId()).iterator();
while (itr.hasNext()) {
tx.remove(sessionCache, itr.next());
}

View file

@ -60,7 +60,7 @@ public class SessionInitializerWorker implements DistributedCallable<String, Ser
KeycloakSessionFactory sessionFactory = workCache.getAdvancedCache().getComponentRegistry().getComponent(KeycloakSessionFactory.class);
if (sessionFactory == null) {
log.warnf("KeycloakSessionFactory not yet set in cache. Worker skipped");
log.debugf("KeycloakSessionFactory not yet set in cache. Worker skipped");
return InfinispanUserSessionInitializer.WorkerResult.create(segment, false);
}

View file

@ -0,0 +1,142 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.testsuite.model;
import java.util.List;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.keycloak.common.util.Time;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
import org.keycloak.models.UserSessionModel;
import org.keycloak.testsuite.KeycloakServer;
import org.keycloak.testsuite.rule.KeycloakRule;
/**
* Run test with shared MySQL DB and in cluster:
*
* -Dkeycloak.connectionsJpa.url=jdbc:mysql://localhost/keycloak -Dkeycloak.connectionsJpa.driver=com.mysql.jdbc.Driver -Dkeycloak.connectionsJpa.user=keycloak
* -Dkeycloak.connectionsJpa.password=keycloak -Dkeycloak.connectionsInfinispan.clustered=true
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
@Ignore
public class ClusterSessionCleanerTest {
protected static final Logger logger = Logger.getLogger(ClusterSessionCleanerTest.class);
private static final String REALM_NAME = "test";
@ClassRule
public static KeycloakRule server1 = new KeycloakRule();
@ClassRule
public static KeycloakRule server2 = new KeycloakRule() {
@Override
protected void configureServer(KeycloakServer server) {
server.getConfig().setPort(8082);
}
@Override
protected void importRealm() {
}
@Override
protected void removeTestRealms() {
}
};
@Test
public void testClusterPeriodicSessionCleanups() throws Exception {
// Add some userSessions on server1
KeycloakSession session1 = server1.startSession();
RealmModel realm1 = session1.realms().getRealmByName(REALM_NAME);
UserModel user1 = session1.users().getUserByUsername("test-user@localhost", realm1);
for (int i=0 ; i<15 ; i++) {
session1.sessions().createUserSession(realm1, user1, user1.getUsername(), "127.0.0.1", "form", true, null, null);
}
session1 = commit(server1, session1);
// Add some userSessions on server2
KeycloakSession session2 = server2.startSession();
RealmModel realm2 = session2.realms().getRealmByName(REALM_NAME);
UserModel user2 = session2.users().getUserByUsername("test-user@localhost", realm2);
// Check we are really in cluster (same user ids)
Assert.assertEquals(user2.getId(), user1.getId());
for (int i=0 ; i<15 ; i++) {
session2.sessions().createUserSession(realm2, user2, user2.getUsername(), "127.0.0.1", "form", true, null, null);
}
session2 = commit(server2, session2);
// Assert sessions on both nodes
List<UserSessionModel> sessions1 = getSessions(session1);
List<UserSessionModel> sessions2 = getSessions(session2);
Assert.assertEquals(30, sessions1.size());
Assert.assertEquals(30, sessions2.size());
logger.info("Before offset: sessions1 : " + sessions1.size());
logger.info("Before offset: sessions2 : " + sessions2.size());
// set Time offset and run periodic cleaner on server1
Time.setOffset(999999);
realm1 = session1.realms().getRealmByName(REALM_NAME);
session1.sessions().removeExpired(realm1);
session1 = commit(server1, session1);
// Ensure some sessions still there
sessions1 = getSessions(session1);
sessions2 = getSessions(session2);
logger.info("After server1 periodic clean: sessions1 : " + sessions1.size());
logger.info("After server1 periodic clean: sessions2 : " + sessions2.size());
// Run periodic cleaner on server2
realm2 = session2.realms().getRealmByName(REALM_NAME);
session2.sessions().removeExpired(realm2);
session2 = commit(server1, session2);
// Ensure there are no sessions on server1 or server2
sessions1 = getSessions(session1);
sessions2 = getSessions(session2);
Assert.assertTrue(sessions1.isEmpty());
Assert.assertTrue(sessions2.isEmpty());
logger.info("After both periodic cleans: sessions1 : " + sessions1.size());
logger.info("After both periodic cleans: sessions2 : " + sessions2.size());
}
private List<UserSessionModel> getSessions(KeycloakSession session) {
RealmModel realm = session.realms().getRealmByName(REALM_NAME);
UserModel user = session.users().getUserByUsername("test-user@localhost", realm);
return session.sessions().getUserSessions(realm, user);
}
private KeycloakSession commit(KeycloakRule rule, KeycloakSession session) throws Exception {
session.getTransactionManager().commit();
session.close();
return rule.startSession();
}
}

View file

@ -223,4 +223,18 @@ public abstract class AbstractOfflineCacheCommand extends AbstractCommand {
}
}
public static class SizeLocalCommand extends AbstractOfflineCacheCommand {
@Override
public String getName() {
return "sizeLocal";
}
@Override
protected void doRunCacheCommand(KeycloakSession session, Cache<String, SessionEntity> cache) {
log.info("Size local: " + cache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).size());
}
}
}

View file

@ -47,6 +47,7 @@ public class TestsuiteCLI {
AbstractOfflineCacheCommand.GetCommand.class,
AbstractOfflineCacheCommand.GetMultipleCommand.class,
AbstractOfflineCacheCommand.GetLocalCommand.class,
AbstractOfflineCacheCommand.SizeLocalCommand.class,
AbstractOfflineCacheCommand.RemoveCommand.class,
AbstractOfflineCacheCommand.SizeCommand.class,
AbstractOfflineCacheCommand.ListCommand.class,

View file

@ -97,7 +97,8 @@
"default": {
"clustered": "${keycloak.connectionsInfinispan.clustered:false}",
"async": "${keycloak.connectionsInfinispan.async:false}",
"sessionsOwners": "${keycloak.connectionsInfinispan.sessionsOwners:2}",
"sessionsOwners": "${keycloak.connectionsInfinispan.sessionsOwners:1}",
"l1Lifespan": "${keycloak.connectionsInfinispan.l1Lifespan:600000}",
"remoteStoreEnabled": "${keycloak.connectionsInfinispan.remoteStoreEnabled:false}",
"remoteStoreHost": "${keycloak.connectionsInfinispan.remoteStoreHost:localhost}",
"remoteStorePort": "${keycloak.connectionsInfinispan.remoteStorePort:11222}"