KEYCLOAK-3298 Bit more perf improvement for bulk removal of sessions

This commit is contained in:
mposolda 2017-08-14 12:31:07 +02:00
parent a80808c5c9
commit fc777e166c
2 changed files with 91 additions and 13 deletions

View file

@ -52,6 +52,7 @@ import org.keycloak.models.sessions.infinispan.stream.Mappers;
import org.keycloak.models.sessions.infinispan.stream.SessionPredicate;
import org.keycloak.models.sessions.infinispan.stream.UserLoginFailurePredicate;
import org.keycloak.models.sessions.infinispan.stream.UserSessionPredicate;
import org.keycloak.models.sessions.infinispan.util.FuturesHelper;
import org.keycloak.models.sessions.infinispan.util.InfinispanUtil;
import java.util.Iterator;
@ -59,6 +60,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
@ -396,11 +398,11 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
int expired = Time.currentTime() - realm.getSsoSessionMaxLifespan();
int expiredRefresh = Time.currentTime() - realm.getSsoSessionIdleTimeout();
FuturesHelper futures = new FuturesHelper();
// Each cluster node cleanups just local sessions, which are those owned by himself (+ few more taking l1 cache into account)
Cache<String, SessionEntityWrapper<UserSessionEntity>> localCache = CacheDecorators.localCache(sessionCache);
int[] counter = { 0 };
Cache<String, SessionEntityWrapper<UserSessionEntity>> localCacheStoreIgnore = CacheDecorators.skipCacheLoaders(localCache);
// Ignore remoteStore for stream iteration. But we will invoke remoteStore for userSession removal propagate
@ -413,14 +415,15 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
@Override
public void accept(String sessionId) {
counter[0]++;
tx.remove(localCache, sessionId);
Future future = localCache.removeAsync(sessionId);
futures.addTask(future);
}
});
futures.waitForAllToFinish();
log.debugf("Removed %d expired user sessions for realm '%s'", counter[0], realm.getName());
log.debugf("Removed %d expired user sessions for realm '%s'", futures.size(), realm.getName());
}
private void removeExpiredOfflineUserSessions(RealmModel realm) {
@ -432,7 +435,7 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
UserSessionPredicate predicate = UserSessionPredicate.create(realm.getId()).expired(null, expiredOffline);
final int[] counter = { 0 };
FuturesHelper futures = new FuturesHelper();
Cache<String, SessionEntityWrapper<UserSessionEntity>> localCacheStoreIgnore = CacheDecorators.skipCacheLoaders(localCache);
@ -446,8 +449,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
@Override
public void accept(UserSessionEntity userSessionEntity) {
counter[0]++;
tx.remove(localCache, userSessionEntity.getId());
Future future = localCache.removeAsync(userSessionEntity.getId());
futures.addTask(future);
// TODO:mposolda can be likely optimized to delete all expired at one step
persister.removeUserSession( userSessionEntity.getId(), true);
@ -459,7 +462,9 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
}
});
log.debugf("Removed %d expired offline user sessions for realm '%s'", counter, realm.getName());
futures.waitForAllToFinish();
log.debugf("Removed %d expired offline user sessions for realm '%s'", futures.size(), realm.getName());
}
@Override
@ -475,6 +480,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
}
private void removeLocalUserSessions(String realmId, boolean offline) {
FuturesHelper futures = new FuturesHelper();
Cache<String, SessionEntityWrapper<UserSessionEntity>> cache = getCache(offline);
Cache<String, SessionEntityWrapper<UserSessionEntity>> localCache = CacheDecorators.localCache(cache);
@ -489,11 +496,16 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
@Override
public void accept(String sessionId) {
// Remove session from remoteCache too
localCache.remove(sessionId);
// Remove session from remoteCache too. Use removeAsync for better perf
Future future = localCache.removeAsync(sessionId);
futures.addTask(future);
}
});
futures.waitForAllToFinish();
log.debugf("Removed %d sessions in realm %s. Offline: %b", (Object) futures.size(), realmId, offline);
}
@Override
@ -529,6 +541,8 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
}
private void removeAllLocalUserLoginFailuresEvent(String realmId) {
FuturesHelper futures = new FuturesHelper();
Cache<LoginFailureKey, LoginFailureEntity> localCache = CacheDecorators.localCache(loginFailureCache);
Cache<LoginFailureKey, LoginFailureEntity> localCacheStoreIgnore = CacheDecorators.skipCacheLoaders(localCache);
@ -539,9 +553,14 @@ public class InfinispanUserSessionProvider implements UserSessionProvider {
.filter(UserLoginFailurePredicate.create(realmId))
.map(Mappers.loginFailureId())
.forEach(loginFailureKey -> {
// Remove loginFailure from remoteCache too
localCache.remove(loginFailureKey);
// Remove loginFailure from remoteCache too. Use removeAsync for better perf
Future future = localCache.removeAsync(loginFailureKey);
futures.addTask(future);
});
futures.waitForAllToFinish();
log.debugf("Removed %d login failures in realm %s", futures.size(), realmId);
}
@Override

View file

@ -0,0 +1,59 @@
/*
* Copyright 2017 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.sessions.infinispan.util;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.jboss.logging.Logger;
/**
* Not thread-safe. Assumes tasks are added from single thread.
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class FuturesHelper {
private static final Logger log = Logger.getLogger(FuturesHelper.class);
private final Queue<Future> futures = new LinkedList<>();
public void addTask(Future future) {
this.futures.add(future);
}
public void waitForAllToFinish() {
for (Future future : futures) {
try {
future.get();
} catch (ExecutionException | InterruptedException ee) {
log.error("Exception when waiting for future", ee); // TODO Possibly some good mechanism to avoid swamp log with many same exceptions?
}
}
}
public int size() {
return futures.size();
}
}