package reactor.core.publisher;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
/**
 * Buffers every element emitted by the source {@link Flux} into an {@link ArrayList}
 * and, when the source completes, hands that list to {@code complete(...)} of the
 * parent {@link Operators.MonoSubscriber}.
 *
 * @param <T> the element type of the source
 */
final class MonoCollectList<T> extends MonoFromFluxOperator<T, List<T>> implements Fuseable {

    MonoCollectList(Flux<? extends T> source) {
        super(source);
    }

    /**
     * Wraps the downstream subscriber in a fresh collecting subscriber.
     *
     * @param actual the downstream subscriber that will receive the list
     * @return the subscriber to subscribe to the source with
     */
    @Override
    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super List<T>> actual) {
        return new MonoCollectListSubscriber<>(actual);
    }

    /**
     * Reports {@link Attr.RunStyle#SYNC} for {@link Attr#RUN_STYLE}; every other key is
     * delegated to the superclass.
     */
    @Override
    public Object scanUnsafe(Attr key) {
        if (key == Attr.RUN_STYLE) {
            return Attr.RunStyle.SYNC;
        }
        return super.scanUnsafe(key);
    }

    /**
     * Subscriber that accumulates incoming elements into {@link #list}.
     * <p>
     * Every read or write of {@link #list} happens while holding this instance's monitor,
     * so that {@link #cancel()} can detach the list while {@link #onNext(Object)} is
     * running on another thread. Once the list has been detached (set to {@code null}),
     * late elements are discarded instead of being buffered.
     *
     * @param <T> the element type collected
     */
    static final class MonoCollectListSubscriber<T> extends Operators.MonoSubscriber<T, List<T>> {

        /** Accumulator; {@code null} once it was handed downstream or discarded. */
        List<T> list;

        /** Upstream subscription, assigned in {@link #onSubscribe(Subscription)}. */
        Subscription s;

        /** Set by the first {@code onError}/{@code onComplete}; later signals are dropped. */
        boolean done;

        MonoCollectListSubscriber(CoreSubscriber<? super List<T>> actual) {
            super(actual);
            this.list = new ArrayList<>();
        }

        /**
         * Exposes the upstream subscription ({@link Attr#PARENT}), the terminated flag
         * ({@link Attr#TERMINATED}) and a synchronous run style ({@link Attr#RUN_STYLE});
         * every other key is delegated to the superclass.
         */
        @Override
        @Nullable
        public Object scanUnsafe(Attr key) {
            if (key == Attr.PARENT) {
                return s;
            }
            if (key == Attr.TERMINATED) {
                return done;
            }
            if (key == Attr.RUN_STYLE) {
                return Attr.RunStyle.SYNC;
            }
            return super.scanUnsafe(key);
        }

        /**
         * Stores the upstream subscription, signals {@code onSubscribe} downstream and
         * then requests an unbounded amount from upstream.
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (Operators.validate(this.s, s)) {
                this.s = s;
                // The field is assigned before downstream is notified, so a cancel()
                // issued from within actual.onSubscribe already sees the subscription.
                actual.onSubscribe(this);
                s.request(Long.MAX_VALUE);
            }
        }

        /**
         * Appends the element to the list, or discards it when the list has already
         * been detached. Elements arriving after termination are dropped.
         */
        @Override
        public void onNext(T t) {
            if (done) {
                Operators.onNextDropped(t, actual.currentContext());
                return;
            }

            final List<T> l;
            synchronized (this) {
                l = list;
                if (l != null) {
                    l.add(t);
                    return;
                }
            }

            // The list was detached concurrently (see cancel()); nobody will consume t.
            Operators.onDiscard(t, actual.currentContext());
        }

        /**
         * Detaches and discards whatever was collected so far, then forwards the error
         * downstream. A second terminal signal is reported as a dropped error.
         */
        @Override
        public void onError(Throwable t) {
            if (done) {
                Operators.onErrorDropped(t, actual.currentContext());
                return;
            }
            done = true;

            final List<T> l;
            synchronized (this) {
                l = list;
                list = null;
            }

            // l is null when cancel() already detached the list; guard like
            // onComplete() and cancel() do instead of relying on the helper's null handling.
            if (l != null) {
                discard(l);
            }
            actual.onError(t);
        }

        /**
         * Detaches the list and hands it to {@code complete(...)}; does nothing when
         * already terminated or when the list was detached earlier.
         */
        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;

            final List<T> l;
            synchronized (this) {
                l = list;
                list = null;
            }

            if (l != null) {
                complete(l);
            }
        }

        /** Routes every element of {@code v} through the discard hook of the downstream context. */
        @Override
        protected void discard(List<T> v) {
            Operators.onDiscardMultiple(v, actual.currentContext());
        }

        /**
         * Marks this subscriber as cancelled, cancels upstream (only on the first call)
         * and discards the elements collected so far.
         * <p>
         * Only the state transition and the detachment of {@link #list} happen under the
         * monitor. {@code s.cancel()} and the discard hooks execute code this class does
         * not control, so they run after the monitor has been released; otherwise a
         * concurrent {@link #onNext(Object)}, which synchronizes on the same monitor,
         * would be blocked for as long as they take.
         */
        @Override
        public void cancel() {
            final int state;
            final List<T> l;
            synchronized (this) {
                state = STATE.getAndSet(this, CANCELLED);
                // NOTE(review): STATE, CANCELLED and HAS_REQUEST_NO_VALUE are inherited; the
                // comparison presumably selects the states in which no value has been
                // emitted downstream yet - confirm against Operators.MonoSubscriber.
                if (state <= HAS_REQUEST_NO_VALUE) {
                    l = list;
                    this.value = null;
                    list = null;
                }
                else {
                    l = null;
                }
            }

            // getAndSet returns CANCELLED for every call but the first, so upstream is
            // cancelled at most once even though this now happens outside the lock.
            if (state != CANCELLED) {
                s.cancel();
            }
            if (l != null) {
                discard(l);
            }
        }
    }
}