Add a tombstone operation to optimize multiple deletes
Closes #31699 Signed-off-by: Alexander Schwartz <aschwart@redhat.com>
This commit is contained in:
parent
5895faddaa
commit
557cf1e60e
2 changed files with 37 additions and 13 deletions
|
@ -122,10 +122,12 @@ public class InfinispanKeycloakTransaction implements KeycloakTransaction {
|
||||||
|
|
||||||
Object taskKey = getTaskKey(cache, key);
|
Object taskKey = getTaskKey(cache, key);
|
||||||
CacheTask current = tasks.get(taskKey);
|
CacheTask current = tasks.get(taskKey);
|
||||||
if (current != null) {
|
if (current != null && current != TOMBSTONE && current.getOperation() != Operation.REMOVE) {
|
||||||
if (current instanceof CacheTaskWithValue) {
|
if (current instanceof CacheTaskWithValue) {
|
||||||
((CacheTaskWithValue<V>) current).setValue(value);
|
((CacheTaskWithValue<V>) current).setValue(value);
|
||||||
((CacheTaskWithValue<V>) current).updateLifespan(lifespan, lifespanUnit);
|
((CacheTaskWithValue<V>) current).updateLifespan(lifespan, lifespanUnit);
|
||||||
|
} else {
|
||||||
|
throw new IllegalStateException("Can't replace entry: task " + current + " in progress for session");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tasks.put(taskKey, new CacheTaskWithValue<V>(value, lifespan, lifespanUnit) {
|
tasks.put(taskKey, new CacheTaskWithValue<V>(value, lifespan, lifespanUnit) {
|
||||||
|
@ -171,6 +173,10 @@ public class InfinispanKeycloakTransaction implements KeycloakTransaction {
|
||||||
return String.format("CacheTask: Operation 'remove' for key %s", key);
|
return String.format("CacheTask: Operation 'remove' for key %s", key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Operation getOperation() {
|
||||||
|
return Operation.REMOVE;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,9 +206,13 @@ public class InfinispanKeycloakTransaction implements KeycloakTransaction {
|
||||||
|
|
||||||
public interface CacheTask {
|
public interface CacheTask {
|
||||||
void execute();
|
void execute();
|
||||||
|
|
||||||
|
default Operation getOperation() {
|
||||||
|
return Operation.OTHER;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public enum Operation { PUT, OTHER }
|
public enum Operation { PUT, REMOVE, OTHER }
|
||||||
|
|
||||||
public static abstract class CacheTaskWithValue<V> implements CacheTask {
|
public static abstract class CacheTaskWithValue<V> implements CacheTask {
|
||||||
protected V value;
|
protected V value;
|
||||||
|
@ -227,10 +237,6 @@ public class InfinispanKeycloakTransaction implements KeycloakTransaction {
|
||||||
this.lifespan = lifespan;
|
this.lifespan = lifespan;
|
||||||
this.lifespanUnit = lifespanUnit;
|
this.lifespanUnit = lifespanUnit;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Operation getOperation() {
|
|
||||||
return Operation.OTHER;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ignore return values. Should have better performance within cluster / cross-dc env
|
// Ignore return values. Should have better performance within cluster / cross-dc env
|
||||||
|
|
|
@ -31,6 +31,7 @@ import io.reactivex.rxjava3.core.Flowable;
|
||||||
import org.infinispan.client.hotrod.MetadataValue;
|
import org.infinispan.client.hotrod.MetadataValue;
|
||||||
import org.infinispan.client.hotrod.RemoteCache;
|
import org.infinispan.client.hotrod.RemoteCache;
|
||||||
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
|
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
|
||||||
|
import org.infinispan.commons.util.concurrent.CompletableFutures;
|
||||||
import org.infinispan.commons.util.concurrent.CompletionStages;
|
import org.infinispan.commons.util.concurrent.CompletionStages;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
import org.keycloak.models.AbstractKeycloakTransaction;
|
import org.keycloak.models.AbstractKeycloakTransaction;
|
||||||
|
@ -76,7 +77,7 @@ public class RemoteInfinispanKeycloakTransaction<K, V> extends AbstractKeycloakT
|
||||||
logger.tracef("Adding %s.put(%S)", cache.getName(), key);
|
logger.tracef("Adding %s.put(%S)", cache.getName(), key);
|
||||||
|
|
||||||
if (tasks.containsKey(key)) {
|
if (tasks.containsKey(key)) {
|
||||||
throw new IllegalStateException("Can't add session: task in progress for session");
|
throw new IllegalStateException("Can't add entry: task " + tasks.get(key) + " in progress for session");
|
||||||
}
|
}
|
||||||
|
|
||||||
tasks.put(key, new PutOperation<>(key, value, lifespan, timeUnit));
|
tasks.put(key, new PutOperation<>(key, value, lifespan, timeUnit));
|
||||||
|
@ -86,9 +87,11 @@ public class RemoteInfinispanKeycloakTransaction<K, V> extends AbstractKeycloakT
|
||||||
logger.tracef("Adding %s.replace(%S)", cache.getName(), key);
|
logger.tracef("Adding %s.replace(%S)", cache.getName(), key);
|
||||||
|
|
||||||
Operation<K, V> existing = tasks.get(key);
|
Operation<K, V> existing = tasks.get(key);
|
||||||
if (existing != null) {
|
if (existing != null && existing != TOMBSTONE && !(existing instanceof RemoteInfinispanKeycloakTransaction.RemoveOperation<K,V>)) {
|
||||||
if (existing.hasValue()) {
|
if (existing.hasValue()) {
|
||||||
tasks.put(key, existing.update(value, lifespan, timeUnit));
|
tasks.put(key, existing.update(value, lifespan, timeUnit));
|
||||||
|
} else {
|
||||||
|
throw new IllegalStateException("Can't replace entry: task " + existing + " in progress for session");
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -101,7 +104,8 @@ public class RemoteInfinispanKeycloakTransaction<K, V> extends AbstractKeycloakT
|
||||||
|
|
||||||
Operation<K, V> existing = tasks.get(key);
|
Operation<K, V> existing = tasks.get(key);
|
||||||
if (existing != null && existing.canRemove()) {
|
if (existing != null && existing.canRemove()) {
|
||||||
tasks.remove(key);
|
//noinspection unchecked
|
||||||
|
tasks.put(key, (Operation<K, V>) TOMBSTONE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -254,4 +258,18 @@ public class RemoteInfinispanKeycloakTransaction<K, V> extends AbstractKeycloakT
|
||||||
return cache.removeAsync(key);
|
return cache.removeAsync(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final Operation<?,?> TOMBSTONE = new Operation<>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean canRemove() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletionStage<?> execute(RemoteCache<Object, Object> cache) {
|
||||||
|
return CompletableFutures.completedNull();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue