/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.publisher;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.reactivestreams.Subscription;
import reactor.core.CorePublisher;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
/**
 * An iterable that consumes a Publisher in a blocking fashion.
 * <p>
 * It also implements methods to stream the contents via Stream
 * that also supports cancellation.
 *
 * @param <T> the value type
 */
final class BlockingIterable<T> implements Iterable<T>, Scannable {
// Upstream publisher; subscribed to once per iterator()/stream() call.
final CorePublisher<? extends T> source;
// Batch size handed to every SubscriberIterator; the constructor rejects values <= 0.
final int batchSize;
// Factory for the buffer queue of each new SubscriberIterator; a null result is rejected in createIterator().
final Supplier<Queue<T>> queueSupplier;
/**
 * Creates a blocking view over the given publisher.
 *
 * @param source the publisher to consume; must not be null
 * @param batchSize the batch size used by the iterators; must be strictly positive
 * @param queueSupplier factory for the per-iterator buffer queue; must not be null
 * @throws IllegalArgumentException if {@code batchSize} is zero or negative
 * @throws NullPointerException if {@code source} or {@code queueSupplier} is null
 */
BlockingIterable(CorePublisher<? extends T> source,
        int batchSize,
        Supplier<Queue<T>> queueSupplier) {
    // the batch size is validated first, before the null checks
    if (batchSize < 1) {
        throw new IllegalArgumentException("batchSize > 0 required but it was " + batchSize);
    }
    this.source = Objects.requireNonNull(source, "source");
    this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
    this.batchSize = batchSize;
}
/**
 * Exposes introspection attributes of this iterable.
 *
 * @param key the attribute being queried
 * @return the source for {@code PARENT}, the batch size for {@code PREFETCH},
 * {@code SYNC} for {@code RUN_STYLE}, and {@code null} for any other key
 */
@Override
@Nullable
public Object scanUnsafe(Attr key) {
    if (key == Attr.PARENT) {
        return source;
    }
    if (key == Attr.PREFETCH) {
        // batchSize is declared as an int, so it can never exceed Integer.MAX_VALUE
        return batchSize;
    }
    if (key == Attr.RUN_STYLE) {
        return Attr.RunStyle.SYNC;
    }
    return null;
}
/**
 * Creates a new subscriber-backed iterator and subscribes it to the source.
 * Every call results in a separate subscription.
 *
 * @return the iterator that was subscribed to the source
 */
@Override
public Iterator<T> iterator() {
    SubscriberIterator<T> subscriber = createIterator();
    source.subscribe(subscriber);
    return subscriber;
}
/**
 * Returns the spliterator of a freshly created {@link #stream()}.
 *
 * @return a spliterator over the values of the source
 */
@Override
public Spliterator<T> spliterator() {
    // delegating to stream() so that cancellation is composed through the stream
    Stream<T> backing = stream();
    return backing.spliterator();
}
/**
 * @return a {@link Stream} of unknown size with onClose attached to {@link
 * Subscription#cancel()}
 */
public Stream<T> stream() {
    SubscriberIterator<T> subscriber = createIterator();
    source.subscribe(subscriber);
    // size is unknown and no characteristics are advertised (0); the stream is sequential
    Stream<T> sequential = StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(subscriber, 0), false);
    // the subscriber doubles as the Runnable close handler: its run() terminates the subscription
    return sequential.onClose(subscriber);
}
/**
 * Builds a new, not yet subscribed {@code SubscriberIterator} backed by a queue
 * obtained from the queue supplier.
 * <p>
 * Anything thrown while obtaining the queue (including the null check on the
 * supplied queue) is rethrown through {@code Exceptions.propagate}.
 *
 * @return a fresh iterator using the configured batch size
 */
SubscriberIterator<T> createIterator() {
    Queue<T> buffer;
    try {
        Queue<T> supplied = queueSupplier.get();
        buffer = Objects.requireNonNull(supplied,
                "The queueSupplier returned a null queue");
    }
    catch (Throwable failure) {
        throw Exceptions.propagate(failure);
    }
    return new SubscriberIterator<>(buffer, batchSize);
}
static final class SubscriberIterator<T>
implements InnerConsumer<T>, Iterator<T>, Runnable {
// Buffer between the producer side (onNext offers) and the consumer side (next() polls).
final Queue<T> queue;
// Used to compute the initial request in onSubscribe(); also reported for Attr.PREFETCH.
final int batchSize;
// Number of consumed items after which next() issues a new request; computed by Operators.unboundedOrLimit(batchSize).
final int limit;
// Lock owning 'condition'; taken by hasNext() while waiting and by signalConsumer() while signalling.
final Lock lock;
// Awaited in hasNext() while the queue is empty and the source is not done.
final Condition condition;
// Items handed out by next() since the last replenishing request; only read and written in next().
long produced;
// Upstream subscription; set and terminated through the S field updater below.
volatile Subscription s;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<SubscriberIterator, Subscription> S =
AtomicReferenceFieldUpdater.newUpdater(SubscriberIterator.class,
Subscription.class,
"s");
// Set to true by onError() and onComplete().
volatile boolean done;
// Terminal error, if any. Not volatile: onError() writes it before the volatile write to 'done',
// and hasNext() reads it only after having read 'done'.
Throwable error;
/**
 * Creates an iterator that buffers incoming values in the given queue.
 *
 * @param queue the buffer that onNext offers into and next() polls from
 * @param batchSize the batch size from which the initial request and the
 * replenish threshold are derived
 */
SubscriberIterator(Queue<T> queue, int batchSize) {
    Lock consumerLock = new ReentrantLock();
    this.lock = consumerLock;
    this.condition = consumerLock.newCondition();
    this.queue = queue;
    this.batchSize = batchSize;
    this.limit = Operators.unboundedOrLimit(batchSize);
}
/**
 * This subscriber does not carry any context of its own.
 *
 * @return always the empty {@link Context}
 */
@Override
public Context currentContext() {
    return Context.empty();
}
/**
 * Blocks the calling thread until either a value is available in the queue or
 * the source has terminated.
 * <p>
 * If the source terminated with an error, that error is rethrown (through
 * {@code Exceptions.propagate}) as soon as termination is observed, even if
 * values are still queued.
 * <p>
 * If the waiting thread is interrupted, the subscription is terminated, the
 * thread's interrupt status is restored and the {@link InterruptedException}
 * is rethrown through {@code Exceptions.propagate}.
 *
 * @return {@code true} if a value can be obtained with {@link #next()},
 * {@code false} if the source completed and the queue is drained
 * @throws IllegalStateException if invoked from a thread that
 * {@code Schedulers.isInNonBlockingThread()} reports as non-blocking
 */
@Override
public boolean hasNext() {
    if (Schedulers.isInNonBlockingThread()) {
        throw new IllegalStateException("Iterating over a toIterable() / toStream() is blocking, which is not supported in thread " + Thread.currentThread().getName());
    }
    for (; ; ) {
        // 'done' is read before the queue: if done is already true, an empty queue
        // observed afterwards means that no further value will arrive
        boolean d = done;
        boolean empty = queue.isEmpty();
        if (d) {
            Throwable e = error;
            if (e != null) {
                throw Exceptions.propagate(e);
            }
            else if (empty) {
                return false;
            }
        }
        if (empty) {
            lock.lock();
            try {
                // NOTE(review): a wake-up caused by run() from another thread (e.g. Stream#close)
                // neither sets 'done' nor fills the queue, so this loop would await again - confirm
                // whether a concurrent close while blocked here is a supported scenario
                while (!done && queue.isEmpty()) {
                    condition.await();
                }
            }
            catch (InterruptedException ex) {
                run();
                // await() cleared the interrupt status when it threw; restore it so that
                // code further up the stack can still observe the interruption
                Thread.currentThread().interrupt();
                throw Exceptions.propagate(ex);
            }
            finally {
                lock.unlock();
            }
        }
        else {
            return true;
        }
    }
}
/**
 * Returns the next value, blocking in {@link #hasNext()} until one is available.
 * <p>
 * Once the number of values handed out since the last request reaches
 * {@code limit}, that many values are requested again from the subscription.
 *
 * @return the next value from the source
 * @throws NoSuchElementException if the source has completed and no value is left
 * @throws IllegalStateException if {@code hasNext()} reported a value but the
 * queue yielded none; the subscription is terminated before throwing
 */
@Override
public T next() {
    // hasNext() starts by checking the thread, so next() is rejected on a NONBLOCKING thread too
    if (!hasNext()) {
        throw new NoSuchElementException();
    }
    T value = queue.poll();
    if (value == null) {
        run();
        throw new IllegalStateException("Queue is empty: Expected one element to be available from the Reactive Streams source.");
    }
    long consumed = produced + 1;
    if (consumed != limit) {
        produced = consumed;
    }
    else {
        // threshold reached: reset the counter and replenish the demand
        produced = 0;
        s.request(consumed);
    }
    return value;
}
/**
 * Stores the subscription (only if {@code Operators.setOnce} accepts it) and
 * issues the initial request derived from the batch size.
 *
 * @param s the subscription handed over by the source
 */
@Override
public void onSubscribe(Subscription s) {
    if (!Operators.setOnce(S, this, s)) {
        // the subscription was not accepted: nothing is requested
        return;
    }
    s.request(Operators.unboundedOrPrefetch(batchSize));
}
/**
 * Buffers the value and wakes up the consumer. If the queue refuses the
 * value, the subscription is terminated and an overflow error is signalled
 * through {@link #onError(Throwable)}.
 *
 * @param t the value emitted by the source
 */
@Override
public void onNext(T t) {
    if (queue.offer(t)) {
        signalConsumer();
        return;
    }
    // the queue is full: stop the source and fail the consumer
    Operators.terminate(S, this);
    Throwable overflow = Operators.onOperatorError(null,
            Exceptions.failWithOverflow(Exceptions.BACKPRESSURE_ERROR_QUEUE_FULL),
            t, currentContext());
    onError(overflow);
}
/**
 * Records the terminal error, marks the iterator as done and wakes up a
 * consumer that may be blocked in hasNext().
 *
 * @param t the error to report to the consumer
 */
@Override
public void onError(Throwable t) {
// 'error' is a plain field: it is written before the volatile write to 'done'
// so that a reader observing done == true also sees the error
error = t;
done = true;
// signal only after the state change so the woken consumer observes it
signalConsumer();
}
/**
 * Marks the iterator as done and wakes up a consumer that may be blocked in
 * hasNext().
 */
@Override
public void onComplete() {
// set the flag first: the woken consumer re-checks 'done' in its wait loop
done = true;
signalConsumer();
}
/**
 * Wakes up every thread currently waiting on {@code condition}. The lock is
 * held only for the duration of the signal.
 */
void signalConsumer() {
    final Lock guard = lock;
    guard.lock();
    try {
        condition.signalAll();
    }
    finally {
        guard.unlock();
    }
}
/**
 * Terminates the subscription held in the S field (through
 * Operators.terminate) and then signals waiting threads.
 * <p>
 * This is the Runnable registered as the onClose handler in stream(); it is
 * also invoked from the failure paths of hasNext() and next().
 */
@Override
public void run() {
// terminate first, then wake up waiters
Operators.terminate(S, this);
signalConsumer();
}
/**
 * Exposes introspection attributes of this subscriber.
 *
 * @param key the attribute being queried
 * @return the value associated with the key: the done flag for
 * {@code TERMINATED}, the subscription for {@code PARENT}, whether the
 * subscription is the cancelled marker for {@code CANCELLED}, the batch size
 * for {@code PREFETCH}, the stored error for {@code ERROR}, {@code SYNC} for
 * {@code RUN_STYLE}, and {@code null} for any other key
 */
@Override
@Nullable
public Object scanUnsafe(Attr key) {
    if (key == Attr.TERMINATED) {
        return done;
    }
    if (key == Attr.PARENT) {
        return s;
    }
    if (key == Attr.CANCELLED) {
        boolean cancelled = s == Operators.cancelledSubscription();
        return cancelled;
    }
    if (key == Attr.PREFETCH) {
        return batchSize;
    }
    if (key == Attr.ERROR) {
        return error;
    }
    if (key == Attr.RUN_STYLE) {
        return Attr.RunStyle.SYNC;
    }
    return null;
}
}
}