package io.reactivex.internal.operators.observable;
import java.util.concurrent.atomic.*;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.queue.MpscLinkedQueue;
import io.reactivex.internal.util.AtomicThrowable;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.UnicastSubject;
public final class ObservableWindowBoundary<T, B> extends AbstractObservableWithUpstream<T, Observable<T>> {
/** The boundary source; it is subscribed with the coordinator's boundary observer. */
final ObservableSource<B> other;
/** The capacity hint forwarded to the coordinator, which uses it when creating windows. */
final int capacityHint;

/**
 * Constructs the operator around a main source and a boundary source.
 *
 * @param source the main source whose items are distributed into windows
 * @param other the source whose items act as window boundaries
 * @param capacityHint the capacity hint handed to every window that gets created
 */
public ObservableWindowBoundary(ObservableSource<T> source, ObservableSource<B> other, int capacityHint) {
    super(source);
    this.capacityHint = capacityHint;
    this.other = other;
}
/**
 * Creates a fresh {@link WindowBoundaryMainObserver} for the given observer and connects it
 * to both sources.
 * <p>
 * The call order is significant: the downstream first receives the coordinator as its
 * {@code Disposable}, then the boundary source is subscribed, and the main source is
 * subscribed last.
 *
 * @param observer the downstream observer that will receive the windows
 */
@Override
public void subscribeActual(Observer<? super Observable<T>> observer) {
    final WindowBoundaryMainObserver<T, B> coordinator =
            new WindowBoundaryMainObserver<T, B>(observer, capacityHint);
    final WindowBoundaryInnerObserver<T, B> boundary = coordinator.boundaryObserver;

    observer.onSubscribe(coordinator);
    other.subscribe(boundary);
    source.subscribe(coordinator);
}
/**
 * Coordinates the main source, the boundary source and the downstream observer.
 * <p>
 * Items from the main source and boundary markers ({@link #NEXT_WINDOW}) are offered to a
 * shared queue and processed by {@link #drain()}. The integer value inherited from
 * {@link AtomicInteger} is used by {@code drain()} as a work-in-progress counter so that
 * only one caller executes the drain loop at a time.
 * <p>
 * The instance itself is passed to {@code UnicastSubject.create} as the {@code Runnable}
 * argument; see {@link #run()}.
 *
 * @param <T> the item type of the main source and of the windows
 * @param <B> the item type of the boundary source (the values themselves are never read here)
 */
static final class WindowBoundaryMainObserver<T, B>
extends AtomicInteger
implements Observer<T>, Disposable, Runnable {
private static final long serialVersionUID = 2233020065421370272L;
/** The observer that receives each newly opened window. */
final Observer<? super Observable<T>> downstream;
/** Passed as the first argument to {@code UnicastSubject.create} for every window. */
final int capacityHint;
/** The observer handed to the boundary source; it calls back into the inner* methods. */
final WindowBoundaryInnerObserver<T, B> boundaryObserver;
/** Holds the main source's {@code Disposable}, set once in {@link #onSubscribe(Disposable)}. */
final AtomicReference<Disposable> upstream;
/**
 * Starts at 1, is incremented for each window created in {@link #drain()} and is
 * decremented by {@link #dispose()} (at most once) and by {@link #run()}. Whoever brings
 * it to zero disposes the upstream.
 */
final AtomicInteger windows;
/** Carries both upstream items and {@link #NEXT_WINDOW} markers to the drain loop. */
final MpscLinkedQueue<Object> queue;
/** Collects the error reported by either source until the drain loop delivers it. */
final AtomicThrowable errors;
/** Set by {@link #dispose()}; once true, the drain loop opens no further windows. */
final AtomicBoolean stopWindows;
/** Queue marker meaning "close the current window and open the next one". */
static final Object NEXT_WINDOW = new Object();
/** Set when either source has terminated; read by the drain loop. */
volatile boolean done;
/** The currently open window, or null; read and written inside {@link #drain()}. */
UnicastSubject<T> window;
/**
 * Creates the coordinator together with its boundary observer.
 *
 * @param downstream the observer receiving the windows
 * @param capacityHint the capacity hint used when creating each window
 */
WindowBoundaryMainObserver(Observer<? super Observable<T>> downstream, int capacityHint) {
this.downstream = downstream;
this.capacityHint = capacityHint;
this.boundaryObserver = new WindowBoundaryInnerObserver<T, B>(this);
this.upstream = new AtomicReference<Disposable>();
// The initial count of 1 is the share released by dispose().
this.windows = new AtomicInteger(1);
this.queue = new MpscLinkedQueue<Object>();
this.errors = new AtomicThrowable();
this.stopWindows = new AtomicBoolean();
}
/**
 * Stores the main source's {@code Disposable} and, if that succeeded, requests the first
 * window by enqueueing a boundary marker.
 */
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(upstream, d)) {
innerNext();
}
}
/** Enqueues the upstream item and triggers the drain loop. */
@Override
public void onNext(T t) {
queue.offer(t);
drain();
}
/**
 * Handles an error from the main source: the boundary observer is disposed and the error
 * is recorded for the drain loop to deliver.
 */
@Override
public void onError(Throwable e) {
boundaryObserver.dispose();
if (errors.addThrowable(e)) {
done = true;
drain();
} else {
// The error could not be recorded, so it is routed to the global plugin handler.
RxJavaPlugins.onError(e);
}
}
/** Handles completion of the main source: disposes the boundary observer and drains. */
@Override
public void onComplete() {
boundaryObserver.dispose();
done = true;
drain();
}
/**
 * Stops the creation of further windows. Only the first call has an effect: it disposes
 * the boundary observer and gives up the initial share of {@link #windows}; the upstream
 * is disposed here only if that brings the count to zero.
 */
@Override
public void dispose() {
if (stopWindows.compareAndSet(false, true)) {
boundaryObserver.dispose();
if (windows.decrementAndGet() == 0) {
DisposableHelper.dispose(upstream);
}
}
}
/** Returns true once {@link #dispose()} has been called. */
@Override
public boolean isDisposed() {
return stopWindows.get();
}
/**
 * Releases one share of {@link #windows} and disposes the upstream when none remain.
 * <p>
 * NOTE(review): this instance is passed as the {@code Runnable} to
 * {@code UnicastSubject.create}; presumably the subject runs it when the window
 * terminates or is disposed - confirm against UnicastSubject.
 */
@Override
public void run() {
if (windows.decrementAndGet() == 0) {
DisposableHelper.dispose(upstream);
}
}
/** Enqueues a boundary marker and triggers the drain loop. */
void innerNext() {
queue.offer(NEXT_WINDOW);
drain();
}
/**
 * Handles an error from the boundary source: the upstream is disposed and the error is
 * recorded for the drain loop to deliver.
 */
void innerError(Throwable e) {
DisposableHelper.dispose(upstream);
if (errors.addThrowable(e)) {
done = true;
drain();
} else {
// The error could not be recorded, so it is routed to the global plugin handler.
RxJavaPlugins.onError(e);
}
}
/** Handles completion of the boundary source: disposes the upstream and drains. */
void innerComplete() {
DisposableHelper.dispose(upstream);
done = true;
drain();
}
/**
 * Processes the queue: forwards items to the current window, switches windows on each
 * boundary marker and delivers the terminal event to both the current window and the
 * downstream.
 * <p>
 * Every early {@code return} below leaves the inherited counter non-zero, so any later
 * call to this method returns at the first check without doing any work.
 */
@SuppressWarnings("unchecked")
void drain() {
// A non-zero previous value means another call is already running the loop; the
// increment is picked up by that call through the "missed" accounting at the bottom.
if (getAndIncrement() != 0) {
return;
}
int missed = 1;
Observer<? super Observable<T>> downstream = this.downstream;
MpscLinkedQueue<Object> queue = this.queue;
AtomicThrowable errors = this.errors;
for (;;) {
for (;;) {
// No shares left: drop everything that is still queued and stop for good.
if (windows.get() == 0) {
queue.clear();
window = null;
return;
}
UnicastSubject<T> w = window;
// "done" is read before polling the queue.
boolean d = done;
// A recorded error wins over items still in the queue: they are discarded.
if (d && errors.get() != null) {
queue.clear();
Throwable ex = errors.terminate();
if (w != null) {
window = null;
w.onError(ex);
}
downstream.onError(ex);
return;
}
Object v = queue.poll();
boolean empty = v == null;
// Terminated and nothing left to emit: deliver the terminal event.
if (d && empty) {
Throwable ex = errors.terminate();
if (ex == null) {
if (w != null) {
window = null;
w.onComplete();
}
downstream.onComplete();
} else {
if (w != null) {
window = null;
w.onError(ex);
}
downstream.onError(ex);
}
return;
}
// Queue exhausted but not terminated: leave the inner loop and check for missed work.
if (empty) {
break;
}
// A regular upstream item goes to the currently open window.
if (v != NEXT_WINDOW) {
w.onNext((T)v);
continue;
}
// Boundary marker: complete the current window, if any...
if (w != null) {
window = null;
w.onComplete();
}
// ...and open a new one unless dispose() has been called.
if (!stopWindows.get()) {
w = UnicastSubject.create(capacityHint, this);
window = w;
// The new window holds one share, which run() releases.
windows.getAndIncrement();
downstream.onNext(w);
}
}
// Subtract the work accounted for so far; a non-zero remainder means drain() was
// called again in the meantime and the queue must be re-examined.
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
/**
 * Observer of the boundary source that relays every signal to its parent
 * {@link WindowBoundaryMainObserver}.
 * <p>
 * The boundary values themselves are ignored; only the fact that a signal arrived is
 * forwarded. After the first terminal signal, later items and completions are dropped and
 * later errors are sent to {@code RxJavaPlugins.onError}.
 *
 * @param <T> the item type of the main source
 * @param <B> the item type of the boundary source
 */
static final class WindowBoundaryInnerObserver<T, B> extends DisposableObserver<B> {
    /** The coordinator that is notified about boundary signals. */
    final WindowBoundaryMainObserver<T, B> parent;
    /** True once a terminal signal has been forwarded to the parent. */
    boolean done;

    /**
     * @param parent the coordinator to notify
     */
    WindowBoundaryInnerObserver(WindowBoundaryMainObserver<T, B> parent) {
        this.parent = parent;
    }

    /** Asks the parent for a new window unless this observer has already terminated. */
    @Override
    public void onNext(B t) {
        if (!done) {
            parent.innerNext();
        }
    }

    /** Forwards the first error to the parent; any later one goes to the plugin handler. */
    @Override
    public void onError(Throwable t) {
        if (!done) {
            done = true;
            parent.innerError(t);
        } else {
            RxJavaPlugins.onError(t);
        }
    }

    /** Forwards the first completion to the parent; repeated terminal signals are ignored. */
    @Override
    public void onComplete() {
        if (!done) {
            done = true;
            parent.innerComplete();
        }
    }
}
}