package reactor.core.publisher;
import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
/**
 * Buffers every element emitted by the source {@link Flux} into a single
 * {@link Collection} obtained from a {@link Supplier}, and hands that collection
 * downstream once the source completes.
 *
 * @param <T> the type of the elements emitted by the source
 * @param <C> the type of the collection the elements are accumulated into
 */
final class MonoCollectList<T, C extends Collection<? super T>>
        extends MonoFromFluxOperator<T, C>
        implements Fuseable {

    /** Called once per subscription to create the collection that receives the elements. */
    final Supplier<C> collectionSupplier;

    /**
     * @param source the upstream sequence to collect
     * @param collectionSupplier supplies a fresh collection for each subscriber
     */
    MonoCollectList(Flux<? extends T> source,
            Supplier<C> collectionSupplier) {
        super(source);
        this.collectionSupplier = collectionSupplier;
    }

    /**
     * Obtains a collection from {@link #collectionSupplier} and subscribes a new
     * {@link MonoBufferAllSubscriber} to the source.
     * <p>
     * If the supplier throws, or returns {@code null}, the failure is passed through
     * {@link Operators#onOperatorError(Throwable, reactor.util.context.Context)} and
     * reported to {@code actual} via {@code Operators.error}; the source is not
     * subscribed in that case.
     *
     * @param actual the downstream subscriber
     */
    @Override
    public void subscribe(CoreSubscriber<? super C> actual) {
        C collection;
        try {
            collection = Objects.requireNonNull(collectionSupplier.get(),
                    "The collectionSupplier returned a null collection");
        }
        catch (Throwable ex) {
            Operators.error(actual, Operators.onOperatorError(ex, actual.currentContext()));
            return;
        }
        source.subscribe(new MonoBufferAllSubscriber<>(actual, collection));
    }

    /**
     * Subscriber that accumulates upstream elements into a collection and completes
     * the downstream with it.
     * <p>
     * The {@code collection} field is cleared as soon as ownership of the buffer is
     * given up (terminal signal or cancellation) so that its elements are discarded at
     * most once. Note that this class performs no synchronization of its own between
     * {@link #cancel()} and the upstream signals.
     *
     * @param <T> the type of the elements received from upstream
     * @param <C> the type of the collection emitted downstream
     */
    static final class MonoBufferAllSubscriber<T, C extends Collection<? super T>>
            extends Operators.MonoSubscriber<T, C> {

        /** The buffer being filled; {@code null} once it was emitted or discarded. */
        C collection;

        /** The upstream subscription, set in {@link #onSubscribe(Subscription)}. */
        Subscription s;

        /**
         * Set when {@code onError} or {@code onComplete} has been received. Tracked
         * separately from {@code collection} because cancellation also clears the
         * buffer without being a terminal signal.
         */
        boolean done;

        MonoBufferAllSubscriber(CoreSubscriber<? super C> actual, C collection) {
            super(actual);
            this.collection = collection;
        }

        @Override
        @Nullable
        public Object scanUnsafe(Attr key) {
            if (key == Attr.PARENT) {
                return s;
            }
            if (key == Attr.TERMINATED) {
                return done;
            }
            return super.scanUnsafe(key);
        }

        /**
         * Stores the upstream subscription, signals {@code onSubscribe} downstream and
         * then requests {@link Long#MAX_VALUE} elements from upstream.
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (Operators.validate(this.s, s)) {
                this.s = s;
                actual.onSubscribe(this);
                s.request(Long.MAX_VALUE);
            }
        }

        /**
         * Adds the element to the buffer. An element received after a terminal signal
         * is reported as dropped; an element received after cancellation is discarded
         * instead of being added to a buffer that will never be emitted.
         */
        @Override
        public void onNext(T t) {
            if (done) {
                Operators.onNextDropped(t, currentContext());
                return;
            }
            C c = collection;
            if (c == null) {
                // the buffer was already given up by cancel()
                Operators.onDiscard(t, currentContext());
                return;
            }
            c.add(t);
        }

        /**
         * Discards whatever was buffered so far and propagates the error downstream.
         * An error received after a terminal signal is reported as dropped.
         */
        @Override
        public void onError(Throwable t) {
            if (done) {
                Operators.onErrorDropped(t, currentContext());
                return;
            }
            done = true;
            C c = collection;
            collection = null;
            if (c != null) {
                // null here means cancel() already discarded the buffered elements
                Operators.onDiscardMultiple(c, currentContext());
            }
            actual.onError(t);
        }

        /**
         * Hands the buffer to {@code complete(..)}. Does nothing if a terminal signal
         * was already received or if the buffer was given up by cancellation.
         */
        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            C c = collection;
            collection = null;
            if (c != null) {
                complete(c);
            }
        }

        /**
         * Discards the buffered elements (at most once, even if called repeatedly),
         * then cancels this subscriber and the upstream subscription.
         */
        @Override
        public void cancel() {
            C c = collection;
            // clear before discarding so later signals cannot discard or emit it again
            collection = null;
            if (c != null) {
                Operators.onDiscardMultiple(c, currentContext());
            }
            super.cancel();
            s.cancel();
        }
    }
}