KEYCLOAK-17014 Searching all users from admin console is very slow
This commit is contained in:
parent
cffe24f815
commit
6da396821a
6 changed files with 1169 additions and 2 deletions
|
@ -0,0 +1,265 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2021 Red Hat, Inc. and/or its affiliates
|
||||||
|
* and other contributors as indicated by the @author tags.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.keycloak.utils;
|
||||||
|
|
||||||
|
import java.util.DoubleSummaryStatistics;
|
||||||
|
import java.util.OptionalDouble;
|
||||||
|
import java.util.PrimitiveIterator;
|
||||||
|
import java.util.Spliterator;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
import java.util.function.DoubleBinaryOperator;
|
||||||
|
import java.util.function.DoubleConsumer;
|
||||||
|
import java.util.function.DoubleFunction;
|
||||||
|
import java.util.function.DoublePredicate;
|
||||||
|
import java.util.function.DoubleToIntFunction;
|
||||||
|
import java.util.function.DoubleToLongFunction;
|
||||||
|
import java.util.function.DoubleUnaryOperator;
|
||||||
|
import java.util.function.ObjDoubleConsumer;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
import java.util.stream.DoubleStream;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
import java.util.stream.LongStream;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This stream will automatically close itself after terminal operation.
|
||||||
|
*/
|
||||||
|
class ClosingDoubleStream implements DoubleStream {
|
||||||
|
|
||||||
|
private final DoubleStream delegate;
|
||||||
|
|
||||||
|
public ClosingDoubleStream(DoubleStream delegate) {
|
||||||
|
this.delegate = delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream filter(DoublePredicate predicate) {
|
||||||
|
return new ClosingDoubleStream(delegate.filter(predicate));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream map(DoubleUnaryOperator mapper) {
|
||||||
|
return new ClosingDoubleStream(delegate.map(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
|
||||||
|
return new ClosingStream<>(delegate.mapToObj(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream mapToInt(DoubleToIntFunction mapper) {
|
||||||
|
return new ClosingIntStream(delegate.mapToInt(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream mapToLong(DoubleToLongFunction mapper) {
|
||||||
|
return new ClosingLongStream(delegate.mapToLong(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
|
||||||
|
return new ClosingDoubleStream(delegate.flatMap(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream distinct() {
|
||||||
|
return new ClosingDoubleStream(delegate.distinct());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream sorted() {
|
||||||
|
return new ClosingDoubleStream(delegate.sorted());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream peek(DoubleConsumer action) {
|
||||||
|
return new ClosingDoubleStream(delegate.peek(action));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream limit(long maxSize) {
|
||||||
|
return new ClosingDoubleStream(delegate.limit(maxSize));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream skip(long n) {
|
||||||
|
return new ClosingDoubleStream(delegate.skip(n));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void forEach(DoubleConsumer action) {
|
||||||
|
delegate.forEach(action);
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void forEachOrdered(DoubleConsumer action) {
|
||||||
|
delegate.forEachOrdered(action);
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public double[] toArray() {
|
||||||
|
double[] result = delegate.toArray();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public double reduce(double identity, DoubleBinaryOperator op) {
|
||||||
|
double result = delegate.reduce(identity, op);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalDouble reduce(DoubleBinaryOperator op) {
|
||||||
|
OptionalDouble result = delegate.reduce(op);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <R> R collect(Supplier<R> supplier, ObjDoubleConsumer<R> accumulator, BiConsumer<R, R> combiner) {
|
||||||
|
R result = delegate.collect(supplier, accumulator, combiner);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public double sum() {
|
||||||
|
double result = delegate.sum();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalDouble min() {
|
||||||
|
OptionalDouble result = delegate.min();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalDouble max() {
|
||||||
|
OptionalDouble result = delegate.max();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long count() {
|
||||||
|
long result = delegate.count();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalDouble average() {
|
||||||
|
OptionalDouble result = delegate.average();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleSummaryStatistics summaryStatistics() {
|
||||||
|
DoubleSummaryStatistics result = delegate.summaryStatistics();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean anyMatch(DoublePredicate predicate) {
|
||||||
|
boolean result = delegate.anyMatch(predicate);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean allMatch(DoublePredicate predicate) {
|
||||||
|
boolean result = delegate.allMatch(predicate);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean noneMatch(DoublePredicate predicate) {
|
||||||
|
boolean result = delegate.noneMatch(predicate);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalDouble findFirst() {
|
||||||
|
OptionalDouble result = delegate.findFirst();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalDouble findAny() {
|
||||||
|
OptionalDouble result = delegate.findAny();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<Double> boxed() {
|
||||||
|
return new ClosingStream<>(delegate.boxed());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream sequential() {
|
||||||
|
return new ClosingDoubleStream(delegate.sequential());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream parallel() {
|
||||||
|
return new ClosingDoubleStream(delegate.parallel());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream unordered() {
|
||||||
|
return new ClosingDoubleStream(delegate.unordered());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream onClose(Runnable closeHandler) {
|
||||||
|
return new ClosingDoubleStream(delegate.onClose(closeHandler));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
delegate.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PrimitiveIterator.OfDouble iterator() {
|
||||||
|
return delegate.iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Spliterator.OfDouble spliterator() {
|
||||||
|
return delegate.spliterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isParallel() {
|
||||||
|
return delegate.isParallel();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,280 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2021 Red Hat, Inc. and/or its affiliates
|
||||||
|
* and other contributors as indicated by the @author tags.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.keycloak.utils;
|
||||||
|
|
||||||
|
import java.util.IntSummaryStatistics;
|
||||||
|
import java.util.OptionalDouble;
|
||||||
|
import java.util.OptionalInt;
|
||||||
|
import java.util.PrimitiveIterator;
|
||||||
|
import java.util.Spliterator;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
import java.util.function.IntBinaryOperator;
|
||||||
|
import java.util.function.IntConsumer;
|
||||||
|
import java.util.function.IntFunction;
|
||||||
|
import java.util.function.IntPredicate;
|
||||||
|
import java.util.function.IntToDoubleFunction;
|
||||||
|
import java.util.function.IntToLongFunction;
|
||||||
|
import java.util.function.IntUnaryOperator;
|
||||||
|
import java.util.function.ObjIntConsumer;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
import java.util.stream.DoubleStream;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
import java.util.stream.LongStream;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This stream will automatically close itself after terminal operation.
|
||||||
|
*/
|
||||||
|
class ClosingIntStream implements IntStream {
|
||||||
|
|
||||||
|
private final IntStream delegate;
|
||||||
|
|
||||||
|
public ClosingIntStream(IntStream delegate) {
|
||||||
|
this.delegate = delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream filter(IntPredicate predicate) {
|
||||||
|
return new ClosingIntStream(delegate.filter(predicate));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream map(IntUnaryOperator mapper) {
|
||||||
|
return new ClosingIntStream(delegate.map(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) {
|
||||||
|
return new ClosingStream<>(delegate.mapToObj(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream mapToLong(IntToLongFunction mapper) {
|
||||||
|
return new ClosingLongStream(delegate.mapToLong(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream mapToDouble(IntToDoubleFunction mapper) {
|
||||||
|
return new ClosingDoubleStream(delegate.mapToDouble(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream flatMap(IntFunction<? extends IntStream> mapper) {
|
||||||
|
return new ClosingIntStream(delegate.flatMap(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream distinct() {
|
||||||
|
return new ClosingIntStream(delegate.distinct());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream sorted() {
|
||||||
|
return new ClosingIntStream(delegate.sorted());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream peek(IntConsumer action) {
|
||||||
|
return new ClosingIntStream(delegate.peek(action));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream limit(long maxSize) {
|
||||||
|
return new ClosingIntStream(delegate.limit(maxSize));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream skip(long n) {
|
||||||
|
return new ClosingIntStream(delegate.skip(n));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void forEach(IntConsumer action) {
|
||||||
|
delegate.forEach(action);
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void forEachOrdered(IntConsumer action) {
|
||||||
|
delegate.forEachOrdered(action);
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int[] toArray() {
|
||||||
|
int[] result = delegate.toArray();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int reduce(int identity, IntBinaryOperator op) {
|
||||||
|
int result = delegate.reduce(identity, op);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalInt reduce(IntBinaryOperator op) {
|
||||||
|
OptionalInt result = delegate.reduce(op);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <R> R collect(Supplier<R> supplier, ObjIntConsumer<R> accumulator, BiConsumer<R, R> combiner) {
|
||||||
|
R result = delegate.collect(supplier, accumulator, combiner);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int sum() {
|
||||||
|
int result = delegate.sum();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalInt min() {
|
||||||
|
OptionalInt result = delegate.min();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalInt max() {
|
||||||
|
OptionalInt result = delegate.max();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long count() {
|
||||||
|
long result = delegate.count();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalDouble average() {
|
||||||
|
OptionalDouble result = delegate.average();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntSummaryStatistics summaryStatistics() {
|
||||||
|
IntSummaryStatistics result = delegate.summaryStatistics();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean anyMatch(IntPredicate predicate) {
|
||||||
|
boolean result = delegate.anyMatch(predicate);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean allMatch(IntPredicate predicate) {
|
||||||
|
boolean result = delegate.allMatch(predicate);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean noneMatch(IntPredicate predicate) {
|
||||||
|
boolean result = delegate.noneMatch(predicate);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalInt findFirst() {
|
||||||
|
OptionalInt result = delegate.findFirst();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalInt findAny() {
|
||||||
|
OptionalInt result = delegate.findAny();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream asLongStream() {
|
||||||
|
LongStream result = delegate.asLongStream();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream asDoubleStream() {
|
||||||
|
DoubleStream result = delegate.asDoubleStream();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<Integer> boxed() {
|
||||||
|
return new ClosingStream<>(delegate.boxed());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream sequential() {
|
||||||
|
return new ClosingIntStream(delegate.sequential());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream parallel() {
|
||||||
|
return new ClosingIntStream(delegate.parallel());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream unordered() {
|
||||||
|
return new ClosingIntStream(delegate.unordered());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream onClose(Runnable closeHandler) {
|
||||||
|
return new ClosingIntStream(delegate.onClose(closeHandler));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
delegate.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PrimitiveIterator.OfInt iterator() {
|
||||||
|
return delegate.iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Spliterator.OfInt spliterator() {
|
||||||
|
return delegate.spliterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isParallel() {
|
||||||
|
return delegate.isParallel();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,273 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2021 Red Hat, Inc. and/or its affiliates
|
||||||
|
* and other contributors as indicated by the @author tags.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.keycloak.utils;
|
||||||
|
|
||||||
|
import java.util.LongSummaryStatistics;
|
||||||
|
import java.util.OptionalDouble;
|
||||||
|
import java.util.OptionalLong;
|
||||||
|
import java.util.PrimitiveIterator;
|
||||||
|
import java.util.Spliterator;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
import java.util.function.LongBinaryOperator;
|
||||||
|
import java.util.function.LongConsumer;
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
import java.util.function.LongPredicate;
|
||||||
|
import java.util.function.LongToDoubleFunction;
|
||||||
|
import java.util.function.LongToIntFunction;
|
||||||
|
import java.util.function.LongUnaryOperator;
|
||||||
|
import java.util.function.ObjLongConsumer;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
import java.util.stream.DoubleStream;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
import java.util.stream.LongStream;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This stream will automatically close itself after terminal operation.
|
||||||
|
*/
|
||||||
|
class ClosingLongStream implements LongStream {
|
||||||
|
|
||||||
|
private final LongStream delegate;
|
||||||
|
|
||||||
|
public ClosingLongStream(LongStream delegate) {
|
||||||
|
this.delegate = delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream filter(LongPredicate predicate) {
|
||||||
|
return new ClosingLongStream(delegate.filter(predicate));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream map(LongUnaryOperator mapper) {
|
||||||
|
return new ClosingLongStream(delegate.map(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
|
||||||
|
return new ClosingStream<>(delegate.mapToObj(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream mapToInt(LongToIntFunction mapper) {
|
||||||
|
return new ClosingIntStream(delegate.mapToInt(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream mapToDouble(LongToDoubleFunction mapper) {
|
||||||
|
return new ClosingDoubleStream(delegate.mapToDouble(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream flatMap(LongFunction<? extends LongStream> mapper) {
|
||||||
|
return new ClosingLongStream(delegate.flatMap(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream distinct() {
|
||||||
|
return new ClosingLongStream(delegate.distinct());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream sorted() {
|
||||||
|
return new ClosingLongStream(delegate.sorted());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream peek(LongConsumer action) {
|
||||||
|
return new ClosingLongStream(delegate.peek(action));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream limit(long maxSize) {
|
||||||
|
return new ClosingLongStream(delegate.limit(maxSize));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream skip(long n) {
|
||||||
|
return new ClosingLongStream(delegate.skip(n));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void forEach(LongConsumer action) {
|
||||||
|
delegate.forEach(action);
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void forEachOrdered(LongConsumer action) {
|
||||||
|
delegate.forEachOrdered(action);
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long[] toArray() {
|
||||||
|
long[] result = delegate.toArray();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long reduce(long identity, LongBinaryOperator op) {
|
||||||
|
long result = delegate.reduce(identity, op);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalLong reduce(LongBinaryOperator op) {
|
||||||
|
OptionalLong result = delegate.reduce(op);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <R> R collect(Supplier<R> supplier, ObjLongConsumer<R> accumulator, BiConsumer<R, R> combiner) {
|
||||||
|
R result = delegate.collect(supplier, accumulator, combiner);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long sum() {
|
||||||
|
long result = delegate.sum();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalLong min() {
|
||||||
|
OptionalLong result = delegate.min();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalLong max() {
|
||||||
|
OptionalLong result = delegate.max();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long count() {
|
||||||
|
long result = delegate.count();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalDouble average() {
|
||||||
|
OptionalDouble result = delegate.average();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongSummaryStatistics summaryStatistics() {
|
||||||
|
LongSummaryStatistics result = delegate.summaryStatistics();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean anyMatch(LongPredicate predicate) {
|
||||||
|
boolean result = delegate.anyMatch(predicate);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean allMatch(LongPredicate predicate) {
|
||||||
|
boolean result = delegate.allMatch(predicate);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean noneMatch(LongPredicate predicate) {
|
||||||
|
boolean result = delegate.noneMatch(predicate);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalLong findFirst() {
|
||||||
|
OptionalLong result = delegate.findFirst();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public OptionalLong findAny() {
|
||||||
|
OptionalLong result = delegate.findAny();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream asDoubleStream() {
|
||||||
|
DoubleStream result = delegate.asDoubleStream();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<Long> boxed() {
|
||||||
|
return new ClosingStream<>(delegate.boxed());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream sequential() {
|
||||||
|
return new ClosingLongStream(delegate.sequential());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream parallel() {
|
||||||
|
return new ClosingLongStream(delegate.parallel());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream unordered() {
|
||||||
|
return new ClosingLongStream(delegate.unordered());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream onClose(Runnable closeHandler) {
|
||||||
|
return new ClosingLongStream(delegate.onClose(closeHandler));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
delegate.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PrimitiveIterator.OfLong iterator() {
|
||||||
|
return delegate.iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Spliterator.OfLong spliterator() {
|
||||||
|
return delegate.spliterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isParallel() {
|
||||||
|
return delegate.isParallel();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,283 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2021 Red Hat, Inc. and/or its affiliates
|
||||||
|
* and other contributors as indicated by the @author tags.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.keycloak.utils;
|
||||||
|
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Spliterator;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
import java.util.function.BiFunction;
|
||||||
|
import java.util.function.BinaryOperator;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.function.IntFunction;
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
import java.util.function.ToDoubleFunction;
|
||||||
|
import java.util.function.ToIntFunction;
|
||||||
|
import java.util.function.ToLongFunction;
|
||||||
|
import java.util.stream.Collector;
|
||||||
|
import java.util.stream.DoubleStream;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
import java.util.stream.LongStream;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This stream will automatically close itself after terminal operation.
|
||||||
|
*/
|
||||||
|
class ClosingStream<R> implements Stream<R> {
|
||||||
|
|
||||||
|
private final Stream<R> delegate;
|
||||||
|
|
||||||
|
public ClosingStream(Stream<R> delegate) {
|
||||||
|
this.delegate = delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<R> filter(Predicate<? super R> predicate) {
|
||||||
|
return new ClosingStream<>(delegate.filter(predicate));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <R1> Stream<R1> map(Function<? super R, ? extends R1> mapper) {
|
||||||
|
return new ClosingStream<>(delegate.map(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream mapToInt(ToIntFunction<? super R> mapper) {
|
||||||
|
return new ClosingIntStream(delegate.mapToInt(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream mapToLong(ToLongFunction<? super R> mapper) {
|
||||||
|
return new ClosingLongStream(delegate.mapToLong(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream mapToDouble(ToDoubleFunction<? super R> mapper) {
|
||||||
|
return new ClosingDoubleStream(delegate.mapToDouble(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <R1> Stream<R1> flatMap(Function<? super R, ? extends Stream<? extends R1>> mapper) {
|
||||||
|
return new ClosingStream<>(delegate.flatMap(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntStream flatMapToInt(Function<? super R, ? extends IntStream> mapper) {
|
||||||
|
return new ClosingIntStream(delegate.flatMapToInt(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongStream flatMapToLong(Function<? super R, ? extends LongStream> mapper) {
|
||||||
|
return new ClosingLongStream(delegate.flatMapToLong(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DoubleStream flatMapToDouble(Function<? super R, ? extends DoubleStream> mapper) {
|
||||||
|
return new ClosingDoubleStream(delegate.flatMapToDouble(mapper));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<R> distinct() {
|
||||||
|
return new ClosingStream<>(delegate.distinct());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<R> sorted() {
|
||||||
|
return new ClosingStream<>(delegate.sorted());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<R> sorted(Comparator<? super R> comparator) {
|
||||||
|
return new ClosingStream<>(delegate.sorted(comparator));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<R> peek(Consumer<? super R> action) {
|
||||||
|
return new ClosingStream<>(delegate.peek(action));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<R> limit(long maxSize) {
|
||||||
|
return new ClosingStream<>(delegate.limit(maxSize));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<R> skip(long n) {
|
||||||
|
return new ClosingStream<>(delegate.skip(n));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void forEach(Consumer<? super R> action) {
|
||||||
|
delegate.forEach(action);
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void forEachOrdered(Consumer<? super R> action) {
|
||||||
|
delegate.forEachOrdered(action);
|
||||||
|
close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object[] toArray() {
|
||||||
|
Object[] result = delegate.toArray();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <A> A[] toArray(IntFunction<A[]> generator) {
|
||||||
|
A[] result = delegate.toArray(generator);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public R reduce(R identity, BinaryOperator<R> accumulator) {
|
||||||
|
R result = delegate.reduce(identity, accumulator);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<R> reduce(BinaryOperator<R> accumulator) {
|
||||||
|
Optional<R> result = delegate.reduce(accumulator);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <U> U reduce(U identity, BiFunction<U, ? super R, U> accumulator, BinaryOperator<U> combiner) {
|
||||||
|
U result = delegate.reduce(identity, accumulator, combiner);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <R1> R1 collect(Supplier<R1> supplier, BiConsumer<R1, ? super R> accumulator, BiConsumer<R1, R1> combiner) {
|
||||||
|
R1 result = delegate.collect(supplier, accumulator, combiner);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <R1, A> R1 collect(Collector<? super R, A, R1> collector) {
|
||||||
|
R1 result = delegate.collect(collector);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<R> min(Comparator<? super R> comparator) {
|
||||||
|
Optional<R> result = delegate.min(comparator);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<R> max(Comparator<? super R> comparator) {
|
||||||
|
Optional<R> result = delegate.max(comparator);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long count() {
|
||||||
|
long result = delegate.count();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean anyMatch(Predicate<? super R> predicate) {
|
||||||
|
boolean result = delegate.anyMatch(predicate);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean allMatch(Predicate<? super R> predicate) {
|
||||||
|
boolean result = delegate.allMatch(predicate);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean noneMatch(Predicate<? super R> predicate) {
|
||||||
|
boolean result = delegate.noneMatch(predicate);
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<R> findFirst() {
|
||||||
|
Optional<R> result = delegate.findFirst();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<R> findAny() {
|
||||||
|
Optional<R> result = delegate.findAny();
|
||||||
|
close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<R> iterator() {
|
||||||
|
return delegate.iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Spliterator<R> spliterator() {
|
||||||
|
return delegate.spliterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isParallel() {
|
||||||
|
return delegate.isParallel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<R> sequential() {
|
||||||
|
return new ClosingStream<>(delegate.sequential());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<R> parallel() {
|
||||||
|
return new ClosingStream<>(delegate.parallel());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<R> unordered() {
|
||||||
|
return new ClosingStream<>(delegate.unordered());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<R> onClose(Runnable closeHandler) {
|
||||||
|
return new ClosingStream<>(delegate.onClose(closeHandler));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
delegate.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -19,7 +19,6 @@ package org.keycloak.utils;
|
||||||
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Spliterators;
|
import java.util.Spliterators;
|
||||||
import java.util.function.Function;
|
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import java.util.stream.StreamSupport;
|
import java.util.stream.StreamSupport;
|
||||||
|
|
||||||
|
@ -34,7 +33,7 @@ public class StreamsUtil {
|
||||||
* @return stream that will be closed on terminating operation
|
* @return stream that will be closed on terminating operation
|
||||||
*/
|
*/
|
||||||
public static <T> Stream<T> closing(Stream<T> stream) {
|
public static <T> Stream<T> closing(Stream<T> stream) {
|
||||||
return Stream.of(stream).flatMap(Function.identity());
|
return new ClosingStream<>(stream);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2021 Red Hat, Inc. and/or its affiliates
|
||||||
|
* and other contributors as indicated by the @author tags.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.keycloak.utils;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class StreamsUtilTest {
|
||||||
|
|
||||||
|
private static final Consumer<Integer> NOOP = o -> {
|
||||||
|
};
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutoClosingOfClosingStream() {
|
||||||
|
AtomicBoolean closed = new AtomicBoolean();
|
||||||
|
StreamsUtil.closing(Stream.of(1, 2, 3).onClose(() -> closed.set(true))).forEach(NOOP);
|
||||||
|
|
||||||
|
Assert.assertTrue(closed.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleClosingHandlersOnClosingStream() {
|
||||||
|
AtomicInteger firstHandlerFiringCount = new AtomicInteger();
|
||||||
|
AtomicInteger secondHandlerFiringCount = new AtomicInteger();
|
||||||
|
AtomicInteger thirdHandlerFiringCount = new AtomicInteger();
|
||||||
|
|
||||||
|
StreamsUtil.closing(Stream.of(1, 2, 3).onClose(firstHandlerFiringCount::incrementAndGet))
|
||||||
|
.onClose(secondHandlerFiringCount::incrementAndGet).mapToInt(value -> value)
|
||||||
|
.onClose(thirdHandlerFiringCount::incrementAndGet).forEach(value -> {
|
||||||
|
});
|
||||||
|
|
||||||
|
Assert.assertEquals(1, firstHandlerFiringCount.get());
|
||||||
|
Assert.assertEquals(1, secondHandlerFiringCount.get());
|
||||||
|
Assert.assertEquals(1, thirdHandlerFiringCount.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLimitOnClosingStream() {
|
||||||
|
AtomicInteger numberOfFetchedElements = new AtomicInteger();
|
||||||
|
|
||||||
|
Stream.of(new Object())
|
||||||
|
.flatMap(
|
||||||
|
o -> StreamsUtil.closing(Stream.of(1, 2, 3).peek(integer -> numberOfFetchedElements.incrementAndGet())))
|
||||||
|
.limit(1).forEach(NOOP);
|
||||||
|
|
||||||
|
Assert.assertEquals(1, numberOfFetchedElements.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in a new issue