package io.reactivex.internal.operators.observable;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.*;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.Scheduler.Worker;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.observers.QueueDrainObserver;
import io.reactivex.internal.queue.MpscLinkedQueue;
import io.reactivex.internal.util.NotificationLite;
import io.reactivex.observers.SerializedObserver;
import io.reactivex.subjects.UnicastSubject;
public final class ObservableWindowTimed<T> extends AbstractObservableWithUpstream<T, Observable<T>> {
/** Window length; passed as the timer period (exact modes) or as the delay before a window is completed (skip mode). */
final long timespan;
/** Interval between window openings; when equal to {@link #timespan} one of the "exact" observers is used. */
final long timeskip;
/** Time unit applied to both {@link #timespan} and {@link #timeskip}. */
final TimeUnit unit;
/** Scheduler that supplies the periodic timer (directly or through a worker). */
final Scheduler scheduler;
/** Maximum number of items per window; {@code Long.MAX_VALUE} selects the size-unbounded observer. */
final long maxSize;
/** Capacity hint handed to {@code UnicastSubject.create} for every window. */
final int bufferSize;
/** Forwarded to the size-bounded observer, which reschedules its timer after a size-triggered window change when this is set. */
final boolean restartTimerOnMaxSize;
/**
 * Stores the windowing configuration; no work is started until subscription.
 *
 * @param source the upstream whose items are split into windows
 * @param timespan the length of each window, in {@code unit}
 * @param timeskip the interval between window openings, in {@code unit}
 * @param unit the time unit of {@code timespan} and {@code timeskip}
 * @param scheduler the scheduler providing the timers
 * @param maxSize the maximum item count per window, {@code Long.MAX_VALUE} for no limit
 * @param bufferSize the capacity hint for each window subject
 * @param restartTimerOnMaxSize whether the size-bounded mode reschedules its timer when a window is closed by size
 */
public ObservableWindowTimed(
        ObservableSource<T> source,
        long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, long maxSize,
        int bufferSize, boolean restartTimerOnMaxSize) {
    super(source);

    // time-related settings
    this.unit = unit;
    this.scheduler = scheduler;
    this.timeskip = timeskip;
    this.timespan = timespan;

    // size-related settings
    this.bufferSize = bufferSize;
    this.maxSize = maxSize;
    this.restartTimerOnMaxSize = restartTimerOnMaxSize;
}
/**
 * Wraps the downstream in a {@link SerializedObserver} and subscribes the
 * window observer that matches the configuration to the source:
 * <ul>
 * <li>{@code timespan != timeskip}: {@code WindowSkipObserver} on a fresh worker,</li>
 * <li>otherwise, {@code maxSize == Long.MAX_VALUE}: {@code WindowExactUnboundedObserver},</li>
 * <li>otherwise: {@code WindowExactBoundedObserver}.</li>
 * </ul>
 *
 * @param t the downstream observer receiving the windows
 */
@Override
public void subscribeActual(Observer<? super Observable<T>> t) {
    final SerializedObserver<Observable<T>> serialized = new SerializedObserver<Observable<T>>(t);

    // Overlapping or gapped windows need their own worker-driven observer.
    if (timespan != timeskip) {
        source.subscribe(new WindowSkipObserver<T>(
                serialized, timespan, timeskip, unit, scheduler.createWorker(), bufferSize));
        return;
    }

    // Back-to-back windows: pick the variant depending on whether a size limit is set.
    final Observer<T> exact;
    if (maxSize == Long.MAX_VALUE) {
        exact = new WindowExactUnboundedObserver<T>(
                serialized, timespan, unit, scheduler, bufferSize);
    } else {
        exact = new WindowExactBoundedObserver<T>(
                serialized, timespan, unit, scheduler,
                bufferSize, maxSize, restartTimerOnMaxSize);
    }
    source.subscribe(exact);
}
/**
 * Emits back-to-back windows that are closed only by a periodic timer
 * (no per-window size limit).
 * <p>
 * The instance itself is the timer task: every tick offers the {@link #NEXT}
 * marker to the queue and the drain loop swaps the current window when it
 * polls that marker. Upstream items are either delivered straight to the
 * current window (fast path) or queued as {@link NotificationLite} values.
 *
 * @param <T> the upstream item type
 */
static final class WindowExactUnboundedObserver<T>
        extends QueueDrainObserver<T, Object, Observable<T>>
        implements Observer<T>, Disposable, Runnable {

    final long timespan;
    final TimeUnit unit;
    final Scheduler scheduler;
    final int bufferSize;

    Disposable upstream;

    /** The window currently receiving items; set to null once the sequence terminates. */
    UnicastSubject<T> window;

    /** Holds the periodic timer task so it can be disposed. */
    final AtomicReference<Disposable> timer = new AtomicReference<Disposable>();

    /** Queue marker meaning "timer fired: close the current window and open the next". */
    static final Object NEXT = new Object();

    /** Set by the timer once it observes that the downstream has cancelled. */
    volatile boolean terminated;

    WindowExactUnboundedObserver(Observer<? super Observable<T>> actual, long timespan, TimeUnit unit,
            Scheduler scheduler, int bufferSize) {
        super(actual, new MpscLinkedQueue<Object>());
        this.timespan = timespan;
        this.unit = unit;
        this.scheduler = scheduler;
        this.bufferSize = bufferSize;
    }

    /**
     * Creates and emits the first window, then (unless the downstream
     * cancelled in the meantime) starts the periodic timer.
     */
    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;

            window = UnicastSubject.<T>create(bufferSize);

            Observer<? super Observable<T>> a = downstream;
            a.onSubscribe(this);

            a.onNext(window);

            if (!cancelled) {
                Disposable task = scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit);
                DisposableHelper.replace(timer, task);
            }
        }
    }

    /**
     * Routes an item into the current window, directly when the fast path
     * can be entered, otherwise through the queue and the drain loop.
     */
    @Override
    public void onNext(T t) {
        if (terminated) {
            return;
        }
        if (fastEnter()) {
            window.onNext(t);
            if (leave(-1) == 0) {
                return;
            }
        } else {
            queue.offer(NotificationLite.next(t));
            if (!enter()) {
                return;
            }
        }
        drainLoop();
    }

    /**
     * Records the error, lets the drain loop fail the open window, then
     * signals the downstream and finally releases the timer.
     */
    @Override
    public void onError(Throwable t) {
        error = t;
        done = true;
        if (enter()) {
            drainLoop();
        }

        // Signal first, cancel the timer afterwards: same order as the
        // bounded and skip observers in this file.
        downstream.onError(t);
        disposeTimer();
    }

    /**
     * Lets the drain loop complete the open window, then completes the
     * downstream and finally releases the timer.
     */
    @Override
    public void onComplete() {
        done = true;
        if (enter()) {
            drainLoop();
        }

        // Signal first, cancel the timer afterwards (see onError).
        downstream.onComplete();
        disposeTimer();
    }

    /** Only flags cancellation; the timer tick reacts to the flag in {@link #run()}. */
    @Override
    public void dispose() {
        cancelled = true;
    }

    @Override
    public boolean isDisposed() {
        return cancelled;
    }

    void disposeTimer() {
        DisposableHelper.dispose(timer);
    }

    /**
     * Timer tick. If the downstream has cancelled, marks this observer as
     * terminated and stops the timer; in every case the {@link #NEXT} marker
     * is queued so the drain loop can close the current window.
     */
    @Override
    public void run() {
        if (cancelled) {
            terminated = true;
            disposeTimer();
        }
        queue.offer(NEXT);
        if (enter()) {
            drainLoop();
        }
    }

    /**
     * Processes queued items and {@link #NEXT} markers.
     * <p>
     * NOTE(review): relies on the caller having acquired the drain via
     * {@code enter()}/{@code fastEnter()} from the superclass; those helpers
     * are not visible in this file.
     */
    void drainLoop() {
        final MpscLinkedQueue<Object> q = (MpscLinkedQueue<Object>)queue;
        final Observer<? super Observable<T>> a = downstream;
        UnicastSubject<T> w = window;

        int missed = 1;
        for (;;) {

            for (;;) {
                // Read the flags before polling so a marker is interpreted
                // against the state that was current when it was taken.
                boolean term = terminated;
                boolean d = done;

                Object o = q.poll();

                if (d && (o == null || o == NEXT)) {
                    // Upstream finished and all queued items were delivered:
                    // terminate the open window.
                    window = null;
                    q.clear();
                    Throwable err = error;
                    if (err != null) {
                        w.onError(err);
                    } else {
                        w.onComplete();
                    }
                    // Dispose only after the window got its terminal event so
                    // cancelling the timer task cannot disturb that delivery.
                    disposeTimer();
                    return;
                }

                if (o == null) {
                    break;
                }

                if (o == NEXT) {
                    w.onComplete();
                    if (!term) {
                        w = UnicastSubject.create(bufferSize);
                        window = w;

                        a.onNext(w);
                    } else {
                        // Downstream is gone: no new window, stop the source.
                        upstream.dispose();
                    }
                    continue;
                }

                w.onNext(NotificationLite.<T>getValue(o));
            }

            missed = leave(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
/**
 * Emits back-to-back windows that are closed either by a periodic timer or
 * by reaching {@code maxSize} items, whichever happens first.
 * <p>
 * Timer ticks arrive as {@link ConsumerIndexHolder} instances in the queue.
 * {@code producerIndex} is incremented on every size-triggered window change;
 * when {@code restartTimerOnMaxSize} is set the timer is rescheduled with a
 * holder carrying the new index, which lets the drain loop recognise ticks
 * from a timer that has since been replaced.
 *
 * @param <T> the upstream item type
 */
static final class WindowExactBoundedObserver<T>
        extends QueueDrainObserver<T, Object, Observable<T>>
        implements Disposable {

    final long timespan;
    final TimeUnit unit;
    final Scheduler scheduler;
    final int bufferSize;
    final boolean restartTimerOnMaxSize;
    final long maxSize;

    /** Non-null only when {@code restartTimerOnMaxSize} is set. */
    final Scheduler.Worker worker;

    /** Number of items delivered to the current window. */
    long count;

    /** Incremented each time a window is closed because it reached {@code maxSize}. */
    long producerIndex;

    Disposable upstream;

    /** The window currently receiving items; set to null once the sequence terminates. */
    UnicastSubject<T> window;

    /** Set by a timer tick once it observes that the downstream has cancelled. */
    volatile boolean terminated;

    /** Holds the current periodic timer task. */
    final AtomicReference<Disposable> timer = new AtomicReference<Disposable>();

    WindowExactBoundedObserver(
            Observer<? super Observable<T>> actual,
            long timespan, TimeUnit unit, Scheduler scheduler,
            int bufferSize, long maxSize, boolean restartTimerOnMaxSize) {
        super(actual, new MpscLinkedQueue<Object>());
        this.timespan = timespan;
        this.unit = unit;
        this.scheduler = scheduler;
        this.bufferSize = bufferSize;
        this.maxSize = maxSize;
        this.restartTimerOnMaxSize = restartTimerOnMaxSize;
        // A dedicated worker is only needed when the timer has to be rescheduled.
        if (restartTimerOnMaxSize) {
            worker = scheduler.createWorker();
        } else {
            worker = null;
        }
    }

    /**
     * Hands this instance to the downstream as its disposable and, unless
     * the downstream cancelled immediately, emits the first window and
     * starts the periodic timer.
     */
    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;

            Observer<? super Observable<T>> a = downstream;

            a.onSubscribe(this);

            if (cancelled) {
                return;
            }

            UnicastSubject<T> w = UnicastSubject.create(bufferSize);
            window = w;

            a.onNext(w);

            Disposable task;
            ConsumerIndexHolder consumerIndexHolder = new ConsumerIndexHolder(producerIndex, this);
            if (restartTimerOnMaxSize) {
                task = worker.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit);
            } else {
                task = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit);
            }

            DisposableHelper.replace(timer, task);
        }
    }

    /**
     * Delivers an item to the current window (directly on the fast path,
     * otherwise via the queue) and rolls over to a new window once
     * {@code maxSize} items have been delivered.
     */
    @Override
    public void onNext(T t) {
        if (terminated) {
            return;
        }

        if (fastEnter()) {
            UnicastSubject<T> w = window;
            w.onNext(t);

            long c = count + 1;

            if (c >= maxSize) {
                // Size limit reached: close this window and open the next.
                producerIndex++;
                count = 0;

                w.onComplete();

                w = UnicastSubject.create(bufferSize);
                window = w;
                downstream.onNext(w);
                if (restartTimerOnMaxSize) {
                    // Restart the period so the new window gets a full timespan.
                    Disposable tm = timer.get();
                    tm.dispose();
                    Disposable task = worker.schedulePeriodically(
                            new ConsumerIndexHolder(producerIndex, this), timespan, timespan, unit);
                    DisposableHelper.replace(timer, task);
                }
            } else {
                count = c;
            }

            if (leave(-1) == 0) {
                return;
            }
        } else {
            queue.offer(NotificationLite.next(t));
            if (!enter()) {
                return;
            }
        }
        drainLoop();
    }

    /**
     * Records the error, lets the drain loop fail the open window, signals
     * the downstream and releases the timer.
     */
    @Override
    public void onError(Throwable t) {
        error = t;
        done = true;
        if (enter()) {
            drainLoop();
        }

        downstream.onError(t);
        disposeTimer();
    }

    /**
     * Lets the drain loop complete the open window, completes the
     * downstream and releases the timer.
     */
    @Override
    public void onComplete() {
        done = true;
        if (enter()) {
            drainLoop();
        }

        downstream.onComplete();
        disposeTimer();
    }

    /** Only flags cancellation; the next timer tick reacts to the flag. */
    @Override
    public void dispose() {
        cancelled = true;
    }

    @Override
    public boolean isDisposed() {
        return cancelled;
    }

    /** Disposes the current timer task and, if one was created, the worker. */
    void disposeTimer() {
        DisposableHelper.dispose(timer);
        Worker w = worker;
        if (w != null) {
            w.dispose();
        }
    }

    /**
     * Processes queued items and timer ticks.
     * <p>
     * NOTE(review): relies on the caller having acquired the drain via
     * {@code enter()}/{@code fastEnter()} from the superclass; those helpers
     * are not visible in this file.
     */
    void drainLoop() {
        final MpscLinkedQueue<Object> q = (MpscLinkedQueue<Object>)queue;
        final Observer<? super Observable<T>> a = downstream;
        UnicastSubject<T> w = window;

        int missed = 1;
        for (;;) {

            for (;;) {
                if (terminated) {
                    // Downstream cancelled: stop the source and drop everything.
                    upstream.dispose();
                    q.clear();
                    disposeTimer();
                    return;
                }

                boolean d = done;

                Object o = q.poll();

                boolean empty = o == null;
                boolean isHolder = o instanceof ConsumerIndexHolder;

                if (d && (empty || isHolder)) {
                    // Upstream finished and all queued items were delivered:
                    // terminate the open window.
                    window = null;
                    q.clear();
                    Throwable err = error;
                    if (err != null) {
                        w.onError(err);
                    } else {
                        w.onComplete();
                    }
                    // Dispose only after the window got its terminal event so
                    // cancelling the timer/worker cannot disturb that delivery.
                    disposeTimer();
                    return;
                }

                if (empty) {
                    break;
                }

                if (isHolder) {
                    ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o;
                    // A tick closes the window when either
                    //  - the timer is never restarted: its single holder keeps the
                    //    index it was created with, so every tick must count, or
                    //  - the holder was created for the current producerIndex;
                    //    otherwise the tick comes from a timer that a size-triggered
                    //    rollover has already replaced and must be ignored.
                    if (!restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) {
                        w.onComplete();
                        count = 0;
                        w = UnicastSubject.create(bufferSize);
                        window = w;

                        a.onNext(w);
                    }
                    continue;
                }

                w.onNext(NotificationLite.<T>getValue(o));
                long c = count + 1;

                if (c >= maxSize) {
                    // Size limit reached: close this window and open the next.
                    producerIndex++;
                    count = 0;

                    w.onComplete();

                    w = UnicastSubject.create(bufferSize);
                    window = w;
                    downstream.onNext(w);

                    if (restartTimerOnMaxSize) {
                        Disposable tm = timer.get();
                        tm.dispose();

                        Disposable task = worker.schedulePeriodically(
                                new ConsumerIndexHolder(producerIndex, this), timespan, timespan, unit);
                        // Someone else changed the timer meanwhile (e.g. it was
                        // disposed): do not leave the fresh task running.
                        if (!timer.compareAndSet(tm, task)) {
                            task.dispose();
                        }
                    }
                } else {
                    count = c;
                }
            }

            missed = leave(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    /**
     * Timer task that doubles as the queue entry for a tick; {@code index}
     * is the parent's {@code producerIndex} at the time the timer was
     * scheduled.
     */
    static final class ConsumerIndexHolder implements Runnable {
        final long index;
        final WindowExactBoundedObserver<?> parent;

        ConsumerIndexHolder(long index, WindowExactBoundedObserver<?> parent) {
            this.index = index;
            this.parent = parent;
        }

        /**
         * Queues this holder as a tick, or, if the downstream has cancelled,
         * marks the parent terminated and stops the timer; then drains.
         */
        @Override
        public void run() {
            WindowExactBoundedObserver<?> p = parent;

            if (!p.cancelled) {
                p.queue.offer(this);
            } else {
                p.terminated = true;
                p.disposeTimer();
            }
            if (p.enter()) {
                p.drainLoop();
            }
        }
    }
}
/**
 * Emits windows that are opened every {@code timeskip} and completed
 * {@code timespan} after they were opened, so windows may overlap or leave
 * gaps.
 * <p>
 * The instance itself is the periodic "open" task; each opened window gets a
 * one-shot {@link CompletionTask}. Both communicate with the drain loop via
 * {@link SubjectWork} entries in the queue, while upstream items are queued
 * as-is.
 *
 * @param <T> the upstream item type
 */
static final class WindowSkipObserver<T>
        extends QueueDrainObserver<T, Object, Observable<T>>
        implements Disposable, Runnable {

    final long timespan;
    final long timeskip;
    final TimeUnit unit;
    final Scheduler.Worker worker;
    final int bufferSize;

    /** Currently open windows; touched from onSubscribe, the onNext fast path and the drain loop. */
    final List<UnicastSubject<T>> windows;

    Disposable upstream;

    /** Set by the drain loop once the downstream cancelled and the last open window was completed. */
    volatile boolean terminated;

    WindowSkipObserver(Observer<? super Observable<T>> actual,
            long timespan, long timeskip, TimeUnit unit,
            Worker worker, int bufferSize) {
        super(actual, new MpscLinkedQueue<Object>());
        this.timespan = timespan;
        this.timeskip = timeskip;
        this.unit = unit;
        this.worker = worker;
        this.bufferSize = bufferSize;
        this.windows = new LinkedList<UnicastSubject<T>>();
    }

    /**
     * Hands this instance to the downstream as its disposable and, unless
     * the downstream cancelled immediately, emits the first window,
     * schedules its completion and starts the periodic "open" task.
     */
    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;

            downstream.onSubscribe(this);

            if (cancelled) {
                return;
            }

            final UnicastSubject<T> w = UnicastSubject.create(bufferSize);
            windows.add(w);

            downstream.onNext(w);
            worker.schedule(new CompletionTask(w), timespan, unit);

            worker.schedulePeriodically(this, timeskip, timeskip, unit);
        }
    }

    /**
     * Delivers the item to every open window, directly on the fast path or
     * through the queue otherwise.
     */
    @Override
    public void onNext(T t) {
        if (fastEnter()) {
            for (UnicastSubject<T> w : windows) {
                w.onNext(t);
            }
            if (leave(-1) == 0) {
                return;
            }
        } else {
            queue.offer(t);
            if (!enter()) {
                return;
            }
        }
        drainLoop();
    }

    /**
     * Records the error, lets the drain loop fail the open windows, signals
     * the downstream and disposes the worker.
     */
    @Override
    public void onError(Throwable t) {
        error = t;
        done = true;
        if (enter()) {
            drainLoop();
        }

        downstream.onError(t);
        disposeWorker();
    }

    /**
     * Lets the drain loop complete the open windows, completes the
     * downstream and disposes the worker.
     */
    @Override
    public void onComplete() {
        done = true;
        if (enter()) {
            drainLoop();
        }

        downstream.onComplete();
        disposeWorker();
    }

    /** Only flags cancellation; the drain loop reacts once the open windows have completed. */
    @Override
    public void dispose() {
        cancelled = true;
    }

    @Override
    public boolean isDisposed() {
        return cancelled;
    }

    void disposeWorker() {
        worker.dispose();
    }

    /**
     * Requests completion of the given window by queueing a "close" entry
     * and draining.
     *
     * @param w the window whose timespan has elapsed
     */
    void complete(UnicastSubject<T> w) {
        queue.offer(new SubjectWork<T>(w, false));
        if (enter()) {
            drainLoop();
        }
    }

    /**
     * Processes queued items and open/close requests.
     * <p>
     * NOTE(review): relies on the caller having acquired the drain via
     * {@code enter()}/{@code fastEnter()} from the superclass; those helpers
     * are not visible in this file.
     */
    @SuppressWarnings("unchecked")
    void drainLoop() {
        final MpscLinkedQueue<Object> q = (MpscLinkedQueue<Object>)queue;
        final Observer<? super Observable<T>> a = downstream;
        final List<UnicastSubject<T>> ws = windows;

        int missed = 1;

        for (;;) {

            for (;;) {
                if (terminated) {
                    // Downstream cancelled and no window is left open.
                    upstream.dispose();
                    disposeWorker();
                    q.clear();
                    ws.clear();
                    return;
                }

                boolean d = done;

                Object v = q.poll();

                boolean empty = v == null;
                boolean sw = v instanceof SubjectWork;

                if (d && (empty || sw)) {
                    // Upstream finished and all queued items were delivered:
                    // terminate every open window, then release the worker.
                    q.clear();
                    Throwable e = error;
                    if (e != null) {
                        for (UnicastSubject<T> w : ws) {
                            w.onError(e);
                        }
                    } else {
                        for (UnicastSubject<T> w : ws) {
                            w.onComplete();
                        }
                    }
                    disposeWorker();
                    ws.clear();
                    return;
                }

                if (empty) {
                    break;
                }

                if (sw) {
                    SubjectWork<T> work = (SubjectWork<T>)v;

                    if (work.open) {
                        if (cancelled) {
                            // No new windows for a cancelled downstream.
                            continue;
                        }

                        // The window is created here; work.w is not read for open requests.
                        final UnicastSubject<T> w = UnicastSubject.create(bufferSize);
                        ws.add(w);
                        a.onNext(w);

                        worker.schedule(new CompletionTask(w), timespan, unit);
                    } else {
                        ws.remove(work.w);
                        work.w.onComplete();
                        if (ws.isEmpty() && cancelled) {
                            terminated = true;
                        }
                    }
                } else {
                    for (UnicastSubject<T> w : ws) {
                        w.onNext((T)v);
                    }
                }
            }

            missed = leave(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    /**
     * Periodic "open" tick: queues an open request unless the downstream
     * has cancelled, then drains.
     */
    @Override
    public void run() {
        if (!cancelled) {
            // The drain loop creates the actual window, so no subject is
            // allocated here; an open request carries no window.
            queue.offer(new SubjectWork<T>(null, true));
        }
        if (enter()) {
            drainLoop();
        }
    }

    /**
     * Queue entry describing a window request: {@code open == true} asks for
     * a new window (in which case {@code w} is not used by the drain loop),
     * {@code open == false} asks for {@code w} to be completed and removed.
     */
    static final class SubjectWork<T> {
        final UnicastSubject<T> w;
        final boolean open;
        SubjectWork(UnicastSubject<T> w, boolean open) {
            this.w = w;
            this.open = open;
        }
    }

    /** One-shot task that requests completion of the window it was created for. */
    final class CompletionTask implements Runnable {
        private final UnicastSubject<T> w;

        CompletionTask(UnicastSubject<T> w) {
            this.w = w;
        }

        @Override
        public void run() {
            complete(w);
        }
    }
}
}