package io.vertx.rx.java;
import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;
import rx.Observable;
import rx.Producer;
import rx.Subscriber;
import rx.Subscription;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
public class ObservableReadStream<T, R> implements Observable.OnSubscribe<R> {
private static final Throwable COMPLETED_SENTINEL = new Throwable();
public static final long DEFAULT_MAX_BUFFER_SIZE = 256;
private final ReadStream<T> stream;
private final Function<T, R> adapter;
private final AtomicReference<Sub> subscription = new AtomicReference<>();
private boolean subscribed;
public ObservableReadStream(ReadStream<T> stream, Function<T, R> adapter) {
this(stream, adapter, DEFAULT_MAX_BUFFER_SIZE);
}
public ObservableReadStream(ReadStream<T> stream, Function<T, R> adapter, long maxBufferSize) {
stream.pause();
this.stream = stream;
this.adapter = adapter;
}
public long getRequested() {
Sub sub = subscription.get();
return sub != null ? sub.adapter.requested() : 0;
}
private class Sub implements Subscription, Producer {
private Adapter adapter;
Sub(Adapter queue) {
this.adapter = queue;
}
@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("Cannot request negative items:" + n);
}
adapter.request(n);
}
@Override
public void unsubscribe() {
if (subscription.compareAndSet(this, null)) {
boolean resume;
synchronized (ObservableReadStream.this) {
resume = adapter.dispose();
}
RxHelper.setNullHandlers(stream);
if (resume) {
try {
stream.resume();
} catch (Exception ignore) {
}
}
subscribed = false;
}
}
@Override
public boolean isUnsubscribed() {
return subscription.get() != this;
}
}
@Override
public void call(Subscriber<? super R> subscriber) {
QueueAdapter adapter = new QueueAdapter(subscriber);
Sub sub = new Sub(adapter);
if (!subscription.compareAndSet(null, sub)) {
throw new IllegalStateException();
}
subscriber.setProducer(sub);
subscriber.add(sub);
stream.exceptionHandler(sub.adapter::end);
stream.endHandler(v -> sub.adapter.end(COMPLETED_SENTINEL));
stream.handler(sub.adapter);
subscribed = true;
long requested = adapter.requested();
stream.pause();
if (requested > 0L) {
stream.fetch(requested);
}
}
private abstract class Adapter implements Handler<T> {
protected final Subscriber<? super R> subscriber;
long requested;
Adapter(Subscriber<? super R> subscriber) {
this.subscriber = subscriber;
}
synchronized long requested() {
synchronized (ObservableReadStream.this) {
return requested;
}
}
void request(long n) {
synchronized (ObservableReadStream.this) {
if (n == Long.MAX_VALUE || (n >= Long.MAX_VALUE - requested)) {
requested = Long.MAX_VALUE;
} else {
requested += n;
}
}
}
abstract boolean dispose();
abstract void end(Throwable t);
}
private class QueueAdapter extends Adapter {
QueueAdapter(Subscriber<? super R> subscriber) {
super(subscriber);
}
@Override
boolean dispose() {
return true;
}
@Override
void end(Throwable t) {
if (t == COMPLETED_SENTINEL) {
subscriber.onCompleted();
} else {
subscriber.onError(t);
}
}
@Override
public void handle(T item) {
subscriber.onNext(adapter.apply(item));
}
@Override
void request(long n) {
super.request(n);
if (subscribed && n > 0L) {
stream.fetch(n);
}
}
}
}