package io.reactivex.internal.operators.flowable;
import java.util.*;
import java.util.concurrent.atomic.*;
import org.reactivestreams.*;
import io.reactivex.Flowable;
import io.reactivex.disposables.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.SimplePlainQueue;
import io.reactivex.internal.queue.MpscLinkedQueue;
import io.reactivex.internal.subscribers.QueueDrainSubscriber;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.NotificationLite;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.UnicastProcessor;
import io.reactivex.subscribers.*;
public final class FlowableWindowBoundarySelector<T, B, V> extends AbstractFlowableWithUpstream<T, Flowable<T>> {
/** Publisher whose every item requests the opening of a new window. */
final Publisher<B> open;
/** Maps a window-opening item to the publisher whose first signal closes that window. */
final Function<? super B, ? extends Publisher<V>> close;
/** Value passed to {@code UnicastProcessor.create} for every window that is opened. */
final int bufferSize;
/**
 * Stores the operator parameters; no subscription happens until {@code subscribeActual}.
 *
 * @param source the upstream whose items are distributed into the windows
 * @param open the publisher whose items open new windows
 * @param close the function producing, per opening item, the publisher that closes the window
 * @param bufferSize the value handed to each window's {@code UnicastProcessor}
 */
public FlowableWindowBoundarySelector(
Flowable<T> source,
Publisher<B> open, Function<? super B, ? extends Publisher<V>> close,
int bufferSize) {
super(source);
this.open = open;
this.close = close;
this.bufferSize = bufferSize;
}
/**
 * Wraps the downstream in a {@code SerializedSubscriber}, builds the main
 * window subscriber around it and subscribes that subscriber to the source.
 *
 * @param s the downstream subscriber that receives the windows
 */
@Override
protected void subscribeActual(Subscriber<? super Flowable<T>> s) {
    SerializedSubscriber<Flowable<T>> serialized = new SerializedSubscriber<Flowable<T>>(s);
    WindowBoundaryMainSubscriber<T, B, V> parent =
            new WindowBoundaryMainSubscriber<T, B, V>(serialized, open, close, bufferSize);
    source.subscribe(parent);
}
static final class WindowBoundaryMainSubscriber<T, B, V>
extends QueueDrainSubscriber<T, Object, Flowable<T>>
implements Subscription {
/** Publisher whose items open new windows; subscribed to in {@code onSubscribe}. */
final Publisher<B> open;
/** Produces the closing publisher for a window from the item that opened it. */
final Function<? super B, ? extends Publisher<V>> close;
/** Value passed to {@code UnicastProcessor.create} when a window is created. */
final int bufferSize;
/** Holds the close-subscribers of the windows; entries are added in drainLoop and removed in close(). */
final CompositeDisposable resources;
/** Subscription to the main source, assigned in {@code onSubscribe}. */
Subscription upstream;
/** Holds the subscriber of the {@code open} publisher so it can be disposed. */
final AtomicReference<Disposable> boundary = new AtomicReference<Disposable>();
/** The windows that are currently open; iterated by onNext's fast path and by drainLoop. */
final List<UnicastProcessor<T>> ws;
/**
 * Counter that starts at 1, is incremented for every close-subscriber registered in drainLoop
 * and decremented by cancel(), by a main terminal event and for every window closed.
 */
final AtomicLong windows = new AtomicLong();
/** Set to true by the first cancel(); once set, drainLoop ignores requests to open windows. */
final AtomicBoolean stopWindows = new AtomicBoolean();
/**
 * Creates the main subscriber with an {@code MpscLinkedQueue} as the drain queue.
 *
 * @param actual the downstream subscriber that receives the windows
 * @param open the publisher whose items open new windows
 * @param close the function producing the closing publisher for each window
 * @param bufferSize the value handed to each window's {@code UnicastProcessor}
 */
WindowBoundaryMainSubscriber(Subscriber<? super Flowable<T>> actual,
Publisher<B> open, Function<? super B, ? extends Publisher<V>> close, int bufferSize) {
super(actual, new MpscLinkedQueue<Object>());
this.open = open;
this.close = close;
this.bufferSize = bufferSize;
this.resources = new CompositeDisposable();
this.ws = new ArrayList<UnicastProcessor<T>>();
// The counter starts at one before any window exists.
windows.lazySet(1);
}
/**
 * Accepts the main source's subscription, hands this object to the downstream
 * as its subscription and, unless already cancelled, subscribes to the
 * window-opening publisher while requesting an unbounded amount from the source.
 *
 * @param s the subscription of the main source
 */
@Override
public void onSubscribe(Subscription s) {
    if (!SubscriptionHelper.validate(this.upstream, s)) {
        return;
    }
    this.upstream = s;
    downstream.onSubscribe(this);

    // stopWindows is only set by cancel(), so this detects a cancel issued
    // during the downstream's onSubscribe call above.
    if (stopWindows.get()) {
        return;
    }

    OperatorWindowBoundaryOpenSubscriber<T, B> opener =
            new OperatorWindowBoundaryOpenSubscriber<T, B>(this);
    if (!boundary.compareAndSet(null, opener)) {
        return;
    }
    s.request(Long.MAX_VALUE);
    open.subscribe(opener);
}
/**
 * Delivers an upstream item to every open window. When the drain can be
 * entered directly, the item is emitted immediately; otherwise it is queued
 * for whichever thread currently owns the drain.
 *
 * @param t the upstream item
 */
@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    if (!fastEnter()) {
        // Slow path: enqueue and try to become the drainer.
        queue.offer(NotificationLite.next(t));
        if (!enter()) {
            return;
        }
    } else {
        // Fast path: emit straight to the open windows.
        for (UnicastProcessor<T> window : ws) {
            window.onNext(t);
        }
        if (leave(-1) == 0) {
            return;
        }
    }
    drainLoop();
}
/**
 * Terminates the operator with the main source's error: records it, drains
 * if possible, decrements the window counter (disposing the close-subscribers
 * when it reaches zero) and signals the downstream. A second terminal signal
 * is routed to {@code RxJavaPlugins.onError}.
 *
 * @param t the error from the main source
 */
@Override
public void onError(Throwable t) {
    if (!done) {
        error = t;
        done = true;

        if (enter()) {
            drainLoop();
        }

        long remaining = windows.decrementAndGet();
        if (remaining == 0) {
            resources.dispose();
        }

        downstream.onError(t);
    } else {
        RxJavaPlugins.onError(t);
    }
}
/**
 * Terminates the operator normally: marks it done, drains if possible,
 * decrements the window counter (disposing the close-subscribers when it
 * reaches zero) and completes the downstream. Ignored if already done.
 */
@Override
public void onComplete() {
    if (!done) {
        done = true;

        if (enter()) {
            drainLoop();
        }

        long remaining = windows.decrementAndGet();
        if (remaining == 0) {
            resources.dispose();
        }

        downstream.onComplete();
    }
}
/**
 * Handles a failure of the window-opening publisher or of a window-closing
 * publisher: cancels the main source, disposes the boundary and close
 * subscribers, terminates the open windows and signals the downstream.
 *
 * <p>Previously only the downstream was notified, so windows still held in
 * {@code ws} never received a terminal event and their consumers could wait
 * forever. The failure is now recorded in {@code error}/{@code done} and the
 * drain is triggered, so that {@code drainLoop}'s terminal branch delivers
 * {@code onError} to every open window.
 *
 * @param t the error signalled by a boundary publisher
 */
void error(Throwable t) {
    upstream.cancel();
    resources.dispose();
    DisposableHelper.dispose(boundary);

    if (!done) {
        error = t;
        done = true;
        // If another thread owns the drain, enter() leaves the work to it;
        // that thread will observe done and terminate the windows.
        if (enter()) {
            drainLoop();
        }
    }

    // Always forwarded, as before.
    downstream.onError(t);
}
/**
 * Forwards the downstream's request amount to the inherited {@code requested(long)}.
 *
 * @param n the number of windows requested by the downstream
 */
@Override
public void request(long n) {
requested(n);
}
/**
 * Stops the opening of further windows. Only the first call has an effect:
 * it disposes the window-opening subscriber and decrements the window
 * counter, cancelling the main source once that counter reaches zero.
 */
@Override
public void cancel() {
    if (!stopWindows.compareAndSet(false, true)) {
        return;
    }
    DisposableHelper.dispose(boundary);
    long remaining = windows.decrementAndGet();
    if (remaining == 0) {
        upstream.cancel();
    }
}
/**
 * Disposes all registered close-subscribers and the window-opening subscriber.
 * The main source's subscription is not touched here.
 */
void dispose() {
resources.dispose();
DisposableHelper.dispose(boundary);
}
/**
 * Processes the queue until it is empty and no further work was signalled.
 * Callers invoke this only after {@code enter()} or {@code fastEnter()}
 * succeeded, or after {@code leave(-1)} reported outstanding work.
 *
 * <p>The queue holds two kinds of entries:
 * <ul>
 * <li>{@code WindowOperation} with a non-null {@code w}: close that window;</li>
 * <li>{@code WindowOperation} with a null {@code w}: open a new window for
 * the item in {@code open};</li>
 * <li>anything else: an upstream value wrapped by {@code NotificationLite},
 * which is forwarded to every open window.</li>
 * </ul>
 * When {@code done} is set and the queue is empty, all open windows are
 * terminated (with {@code error} if present) and the method returns without
 * calling {@code leave}.
 */
void drainLoop() {
    final SimplePlainQueue<Object> q = queue;
    final Subscriber<? super Flowable<T>> a = downstream;
    final List<UnicastProcessor<T>> ws = this.ws;
    int missed = 1;

    for (;;) {

        for (;;) {
            boolean d = done;
            Object o = q.poll();
            boolean empty = o == null;

            if (d && empty) {
                // Terminal state: release the helpers and finish every open window.
                dispose();
                Throwable e = error;
                if (e != null) {
                    for (UnicastProcessor<T> w : ws) {
                        w.onError(e);
                    }
                } else {
                    for (UnicastProcessor<T> w : ws) {
                        w.onComplete();
                    }
                }
                ws.clear();
                return;
            }

            if (empty) {
                break;
            }

            if (o instanceof WindowOperation) {
                @SuppressWarnings("unchecked")
                WindowOperation<T, B> wo = (WindowOperation<T, B>) o;

                UnicastProcessor<T> w = wo.w;
                if (w != null) {
                    // Close request: complete the window if it is still open.
                    if (ws.remove(wo.w)) {
                        wo.w.onComplete();

                        if (windows.decrementAndGet() == 0) {
                            dispose();
                            return;
                        }
                    }
                    continue;
                }

                // Open request: ignored once the downstream has cancelled.
                if (stopWindows.get()) {
                    continue;
                }

                w = UnicastProcessor.<T>create(bufferSize);

                long r = requested();
                if (r != 0L) {
                    ws.add(w);
                    a.onNext(w);
                    if (r != Long.MAX_VALUE) {
                        produced(1);
                    }
                } else {
                    // No downstream demand for the new window.
                    cancel();
                    a.onError(new MissingBackpressureException("Could not deliver new window due to lack of requests"));
                    continue;
                }

                Publisher<V> p;

                try {
                    p = ObjectHelper.requireNonNull(close.apply(wo.open), "The publisher supplied is null");
                } catch (Throwable e) {
                    // Fatal JVM errors must be rethrown instead of being
                    // turned into a regular onError signal.
                    Exceptions.throwIfFatal(e);
                    cancel();
                    a.onError(e);
                    continue;
                }

                OperatorWindowBoundaryCloseSubscriber<T, V> cl = new OperatorWindowBoundaryCloseSubscriber<T, V>(this, w);

                // Subscribe to the closing publisher only if the container
                // accepted the close-subscriber.
                if (resources.add(cl)) {
                    windows.getAndIncrement();

                    p.subscribe(cl);
                }

                continue;
            }

            // Regular upstream value: forward it to every open window.
            for (UnicastProcessor<T> w : ws) {
                w.onNext(NotificationLite.<T>getValue(o));
            }
        }

        missed = leave(-missed);
        if (missed == 0) {
            break;
        }
    }
}
/**
 * Always returns {@code false}; this class delivers values through its own
 * {@code drainLoop} and never calls this method itself.
 *
 * @param a the downstream subscriber (unused)
 * @param v the queued value (unused)
 * @return always {@code false}
 */
@Override
public boolean accept(Subscriber<? super Flowable<T>> a, Object v) {
return false;
}
/**
 * Queues a request to open a new window for the given opening item and
 * drains if this thread obtains the drain.
 *
 * @param b the item emitted by the window-opening publisher
 */
void open(B b) {
    WindowOperation<T, B> request = new WindowOperation<T, B>(null, b);
    queue.offer(request);
    if (!enter()) {
        return;
    }
    drainLoop();
}
/**
 * Removes the finished close-subscriber from the resource container, queues
 * a request to close its window and drains if this thread obtains the drain.
 *
 * @param w the close-subscriber whose window should be closed
 */
void close(OperatorWindowBoundaryCloseSubscriber<T, V> w) {
    resources.delete(w);
    WindowOperation<T, B> request = new WindowOperation<T, B>(w.w, null);
    queue.offer(request);
    if (!enter()) {
        return;
    }
    drainLoop();
}
}
/**
 * Queue entry describing a window operation. The main subscriber creates it
 * with a null {@code w} and an opening item to open a window, or with a
 * non-null {@code w} and a null {@code open} to close that window.
 */
static final class WindowOperation<T, B> {
/** The window to close, or null when this entry opens a new window. */
final UnicastProcessor<T> w;
/** The item that opens a window; null in close entries. */
final B open;
WindowOperation(UnicastProcessor<T> w, B open) {
this.w = w;
this.open = open;
}
}
/**
 * Subscriber of the window-opening publisher; every signal is forwarded to
 * the parent main subscriber.
 */
static final class OperatorWindowBoundaryOpenSubscriber<T, B> extends DisposableSubscriber<B> {
/** The main subscriber that manages the windows. */
final WindowBoundaryMainSubscriber<T, B, ?> parent;
OperatorWindowBoundaryOpenSubscriber(WindowBoundaryMainSubscriber<T, B, ?> parent) {
this.parent = parent;
}
@Override
public void onNext(B t) {
// Each item asks the parent to open a new window.
parent.open(t);
}
@Override
public void onError(Throwable t) {
parent.error(t);
}
@Override
public void onComplete() {
// Completion of the opening publisher goes through the parent's own onComplete.
parent.onComplete();
}
}
/**
 * Subscriber of a single window's closing publisher. Its first item or its
 * completion asks the parent to close the associated window; an error is
 * reported to the parent's {@code error} method.
 */
static final class OperatorWindowBoundaryCloseSubscriber<T, V> extends DisposableSubscriber<V> {

    /** The main subscriber that manages the windows. */
    final WindowBoundaryMainSubscriber<T, ?, V> parent;

    /** The window this subscriber is responsible for closing. */
    final UnicastProcessor<T> w;

    /** Set once a terminal signal has been handled. */
    boolean done;

    OperatorWindowBoundaryCloseSubscriber(WindowBoundaryMainSubscriber<T, ?, V> parent, UnicastProcessor<T> w) {
        this.parent = parent;
        this.w = w;
    }

    @Override
    public void onNext(V t) {
        // The first item is enough: stop the closing publisher and finish.
        cancel();
        onComplete();
    }

    @Override
    public void onError(Throwable t) {
        if (!done) {
            done = true;
            parent.error(t);
        } else {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public void onComplete() {
        if (!done) {
            done = true;
            parent.close(this);
        }
    }
}
}