package io.reactivex.internal.operators.flowable;
import java.util.Collection;
import java.util.concurrent.Callable;
import org.reactivestreams.*;
import io.reactivex.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.*;
public final class FlowableToList<T, U extends Collection<? super T>> extends AbstractFlowableWithUpstream<T, U> {
final Callable<U> collectionSupplier;
public FlowableToList(Flowable<T> source, Callable<U> collectionSupplier) {
super(source);
this.collectionSupplier = collectionSupplier;
}
@Override
protected void subscribeActual(Subscriber<? super U> s) {
U coll;
try {
coll = ObjectHelper.requireNonNull(collectionSupplier.call(), "The collectionSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptySubscription.error(e, s);
return;
}
source.subscribe(new ToListSubscriber<T, U>(s, coll));
}
static final class ToListSubscriber<T, U extends Collection<? super T>>
extends DeferredScalarSubscription<U>
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -8134157938864266736L;
Subscription upstream;
ToListSubscriber(Subscriber<? super U> actual, U collection) {
super(actual);
this.value = collection;
}
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
downstream.onSubscribe(this);
s.request(Long.MAX_VALUE);
}
}
@Override
public void onNext(T t) {
U v = value;
if (v != null) {
v.add(t);
}
}
@Override
public void onError(Throwable t) {
value = null;
downstream.onError(t);
}
@Override
public void onComplete() {
complete(value);
}
@Override
public void cancel() {
super.cancel();
upstream.cancel();
}
}
}