Ensure that concatenated Stream is closed once read

Fixes: #15781
This commit is contained in:
Hynek Mlnarik 2023-02-15 13:53:26 +01:00 committed by Hynek Mlnařík
parent 1c79a5666d
commit e30e1eca68
5 changed files with 418 additions and 8 deletions

View file

@ -16,6 +16,7 @@
*/ */
package org.keycloak.utils; package org.keycloak.utils;
import java.util.Comparator;
import java.util.DoubleSummaryStatistics; import java.util.DoubleSummaryStatistics;
import java.util.OptionalDouble; import java.util.OptionalDouble;
import java.util.PrimitiveIterator; import java.util.PrimitiveIterator;
@ -250,16 +251,110 @@ class ClosingDoubleStream implements DoubleStream {
@Override @Override
public PrimitiveIterator.OfDouble iterator() { public PrimitiveIterator.OfDouble iterator() {
return delegate.iterator(); return new ClosingIterator(delegate.iterator());
} }
@Override @Override
public Spliterator.OfDouble spliterator() { public Spliterator.OfDouble spliterator() {
return delegate.spliterator(); return new ClosingSpliterator(delegate.spliterator());
} }
@Override @Override
public boolean isParallel() { public boolean isParallel() {
return delegate.isParallel(); return delegate.isParallel();
} }
private class ClosingIterator implements PrimitiveIterator.OfDouble {
private final PrimitiveIterator.OfDouble iterator;
public ClosingIterator(PrimitiveIterator.OfDouble iterator) {
this.iterator = iterator;
}
@Override
public boolean hasNext() {
final boolean res = iterator.hasNext();
if (! res) {
close();
}
return res;
}
@Override
public Double next() {
return iterator.next();
}
@Override
public void remove() {
iterator.remove();
}
@Override
public void forEachRemaining(DoubleConsumer action) {
iterator.forEachRemaining(action);
close();
}
@Override
public double nextDouble() {
return iterator.nextDouble();
}
}
private class ClosingSpliterator implements Spliterator.OfDouble {
private final Spliterator.OfDouble spliterator;
public ClosingSpliterator(Spliterator.OfDouble spliterator) {
this.spliterator = spliterator;
}
@Override
public boolean tryAdvance(DoubleConsumer action) {
final boolean res = spliterator.tryAdvance(action);
if (! res) {
close();
}
return res;
}
@Override
public void forEachRemaining(DoubleConsumer action) {
spliterator.forEachRemaining(action);
close();
}
@Override
public Spliterator.OfDouble trySplit() {
return spliterator.trySplit();
}
@Override
public long estimateSize() {
return spliterator.estimateSize();
}
@Override
public long getExactSizeIfKnown() {
return spliterator.getExactSizeIfKnown();
}
@Override
public int characteristics() {
return spliterator.characteristics();
}
@Override
public boolean hasCharacteristics(int characteristics) {
return spliterator.hasCharacteristics(characteristics);
}
@Override
public Comparator<? super Double> getComparator() {
return spliterator.getComparator();
}
}
} }

View file

@ -16,6 +16,7 @@
*/ */
package org.keycloak.utils; package org.keycloak.utils;
import java.util.Comparator;
import java.util.IntSummaryStatistics; import java.util.IntSummaryStatistics;
import java.util.OptionalDouble; import java.util.OptionalDouble;
import java.util.OptionalInt; import java.util.OptionalInt;
@ -265,16 +266,110 @@ class ClosingIntStream implements IntStream {
@Override @Override
public PrimitiveIterator.OfInt iterator() { public PrimitiveIterator.OfInt iterator() {
return delegate.iterator(); return new ClosingIterator(delegate.iterator());
} }
@Override @Override
public Spliterator.OfInt spliterator() { public Spliterator.OfInt spliterator() {
return delegate.spliterator(); return new ClosingSpliterator(delegate.spliterator());
} }
@Override @Override
public boolean isParallel() { public boolean isParallel() {
return delegate.isParallel(); return delegate.isParallel();
} }
private class ClosingIterator implements PrimitiveIterator.OfInt {
private final PrimitiveIterator.OfInt iterator;
public ClosingIterator(PrimitiveIterator.OfInt iterator) {
this.iterator = iterator;
}
@Override
public boolean hasNext() {
final boolean res = iterator.hasNext();
if (! res) {
close();
}
return res;
}
@Override
public Integer next() {
return iterator.next();
}
@Override
public void remove() {
iterator.remove();
}
@Override
public void forEachRemaining(IntConsumer action) {
iterator.forEachRemaining(action);
close();
}
@Override
public int nextInt() {
return iterator.nextInt();
}
}
private class ClosingSpliterator implements Spliterator.OfInt {
private final Spliterator.OfInt spliterator;
public ClosingSpliterator(Spliterator.OfInt spliterator) {
this.spliterator = spliterator;
}
@Override
public boolean tryAdvance(IntConsumer action) {
final boolean res = spliterator.tryAdvance(action);
if (! res) {
close();
}
return res;
}
@Override
public void forEachRemaining(IntConsumer action) {
spliterator.forEachRemaining(action);
close();
}
@Override
public Spliterator.OfInt trySplit() {
return spliterator.trySplit();
}
@Override
public long estimateSize() {
return spliterator.estimateSize();
}
@Override
public long getExactSizeIfKnown() {
return spliterator.getExactSizeIfKnown();
}
@Override
public int characteristics() {
return spliterator.characteristics();
}
@Override
public boolean hasCharacteristics(int characteristics) {
return spliterator.hasCharacteristics(characteristics);
}
@Override
public Comparator<? super Integer> getComparator() {
return spliterator.getComparator();
}
}
} }

View file

@ -16,6 +16,7 @@
*/ */
package org.keycloak.utils; package org.keycloak.utils;
import java.util.Comparator;
import java.util.LongSummaryStatistics; import java.util.LongSummaryStatistics;
import java.util.OptionalDouble; import java.util.OptionalDouble;
import java.util.OptionalLong; import java.util.OptionalLong;
@ -258,16 +259,110 @@ class ClosingLongStream implements LongStream {
@Override @Override
public PrimitiveIterator.OfLong iterator() { public PrimitiveIterator.OfLong iterator() {
return delegate.iterator(); return new ClosingIterator(delegate.iterator());
} }
@Override @Override
public Spliterator.OfLong spliterator() { public Spliterator.OfLong spliterator() {
return delegate.spliterator(); return new ClosingSpliterator(delegate.spliterator());
} }
@Override @Override
public boolean isParallel() { public boolean isParallel() {
return delegate.isParallel(); return delegate.isParallel();
} }
private class ClosingIterator implements PrimitiveIterator.OfLong {
private final PrimitiveIterator.OfLong iterator;
public ClosingIterator(PrimitiveIterator.OfLong iterator) {
this.iterator = iterator;
}
@Override
public boolean hasNext() {
final boolean res = iterator.hasNext();
if (! res) {
close();
}
return res;
}
@Override
public Long next() {
return iterator.next();
}
@Override
public void remove() {
iterator.remove();
}
@Override
public void forEachRemaining(LongConsumer action) {
iterator.forEachRemaining(action);
close();
}
@Override
public long nextLong() {
return iterator.nextLong();
}
}
private class ClosingSpliterator implements Spliterator.OfLong {
private final Spliterator.OfLong spliterator;
public ClosingSpliterator(Spliterator.OfLong spliterator) {
this.spliterator = spliterator;
}
@Override
public boolean tryAdvance(LongConsumer action) {
final boolean res = spliterator.tryAdvance(action);
if (! res) {
close();
}
return res;
}
@Override
public void forEachRemaining(LongConsumer action) {
spliterator.forEachRemaining(action);
close();
}
@Override
public Spliterator.OfLong trySplit() {
return spliterator.trySplit();
}
@Override
public long estimateSize() {
return spliterator.estimateSize();
}
@Override
public long getExactSizeIfKnown() {
return spliterator.getExactSizeIfKnown();
}
@Override
public int characteristics() {
return spliterator.characteristics();
}
@Override
public boolean hasCharacteristics(int characteristics) {
return spliterator.hasCharacteristics(characteristics);
}
@Override
public Comparator<? super Long> getComparator() {
return spliterator.getComparator();
}
}
} }

View file

@ -242,12 +242,12 @@ class ClosingStream<R> implements Stream<R> {
@Override @Override
public Iterator<R> iterator() { public Iterator<R> iterator() {
return delegate.iterator(); return new ClosingIterator(delegate.iterator());
} }
@Override @Override
public Spliterator<R> spliterator() { public Spliterator<R> spliterator() {
return delegate.spliterator(); return new ClosingSpliterator(delegate.spliterator());
} }
@Override @Override
@ -280,4 +280,92 @@ class ClosingStream<R> implements Stream<R> {
delegate.close(); delegate.close();
} }
private class ClosingIterator implements Iterator<R> {
private final Iterator<R> iterator;
public ClosingIterator(Iterator<R> iterator) {
this.iterator = iterator;
}
@Override
public boolean hasNext() {
final boolean res = iterator.hasNext();
if (! res) {
close();
}
return res;
}
@Override
public R next() {
return iterator.next();
}
@Override
public void remove() {
iterator.remove();
}
@Override
public void forEachRemaining(Consumer<? super R> action) {
iterator.forEachRemaining(action);
close();
}
}
private class ClosingSpliterator implements Spliterator<R> {
private final Spliterator<R> spliterator;
public ClosingSpliterator(Spliterator<R> spliterator) {
this.spliterator = spliterator;
}
@Override
public boolean tryAdvance(Consumer<? super R> action) {
final boolean res = spliterator.tryAdvance(action);
if (! res) {
close();
}
return res;
}
@Override
public void forEachRemaining(Consumer<? super R> action) {
spliterator.forEachRemaining(action);
close();
}
@Override
public Spliterator<R> trySplit() {
return spliterator.trySplit();
}
@Override
public long estimateSize() {
return spliterator.estimateSize();
}
@Override
public long getExactSizeIfKnown() {
return spliterator.getExactSizeIfKnown();
}
@Override
public int characteristics() {
return spliterator.characteristics();
}
@Override
public boolean hasCharacteristics(int characteristics) {
return spliterator.hasCharacteristics(characteristics);
}
@Override
public Comparator<? super R> getComparator() {
return spliterator.getComparator();
}
}
} }

View file

@ -36,6 +36,43 @@ public class StreamsUtilTest {
Assert.assertTrue(closed.get()); Assert.assertTrue(closed.get());
} }
@Test
public void testAutoClosingOfClosingStreamOuter() {
AtomicBoolean closed = new AtomicBoolean();
StreamsUtil.closing(Stream.of(1, 2, 3)).onClose(() -> closed.set(true)).forEach(NOOP);
Assert.assertTrue(closed.get());
}
@Test
public void testAutoClosingOfClosingStreamFlatMap() {
AtomicBoolean closed = new AtomicBoolean();
Stream.of("value")
.flatMap(v -> StreamsUtil.closing(Stream.of(1, 2, 3)).onClose(() -> closed.set(true)))
.forEach(NOOP);
Assert.assertTrue(closed.get());
}
@Test
public void testAutoClosingOfClosingUsingIterator() {
AtomicBoolean closed = new AtomicBoolean();
StreamsUtil.closing(Stream.of(1, 2, 3).onClose(() -> closed.set(true))).iterator().forEachRemaining(NOOP);
Assert.assertTrue(closed.get());
}
@Test
public void testAutoClosingOfClosingUsingConcat() {
AtomicBoolean closed = new AtomicBoolean();
Stream.concat(
Stream.of(4, 5),
StreamsUtil.closing(Stream.of(1, 2, 3).onClose(() -> closed.set(true)))
).iterator().forEachRemaining(NOOP);
Assert.assertTrue(closed.get());
}
@Test @Test
public void testMultipleClosingHandlersOnClosingStream() { public void testMultipleClosingHandlersOnClosingStream() {
AtomicInteger firstHandlerFiringCount = new AtomicInteger(); AtomicInteger firstHandlerFiringCount = new AtomicInteger();