package io.vertx.ext.mongo.impl;
import io.vertx.core.Handler;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import java.util.Objects;
public class PublisherAdapter<T> implements ReadStream<T> {
private enum State {
IDLE, STARTED, EXHAUSTED, STOPPED
}
private final ContextInternal context;
private final Publisher<T> publisher;
private final InboundBuffer<T> internalQueue;
private final int batchSize;
private State state;
private int inFlight;
private Handler<T> handler;
private Handler<Throwable> exceptionHandler;
private Handler<Void> endHandler;
private Subscription subscription;
public PublisherAdapter(ContextInternal context, Publisher<T> publisher, int batchSize) {
Objects.requireNonNull(context, "context is null");
Objects.requireNonNull(publisher, "publisher is null");
this.context = context;
this.publisher = publisher;
this.batchSize = batchSize > 0 ? batchSize : 256;
internalQueue = new InboundBuffer<T>(context);
state = State.IDLE;
}
@Override
public synchronized ReadStream<T> exceptionHandler(Handler<Throwable> handler) {
if (state != State.STOPPED) {
exceptionHandler = handler;
}
return this;
}
@Override
public ReadStream<T> handler(Handler<T> handler) {
synchronized (this) {
if (state == State.STOPPED) {
return this;
}
}
if (handler == null) {
stop();
context.runOnContext(v -> handleEnd());
} else {
synchronized (this) {
this.handler = handler;
}
internalQueue.handler(this::handleOut);
boolean subscribe = false;
synchronized (this) {
if (state == State.IDLE) {
state = State.STARTED;
subscribe = true;
}
}
if (subscribe) {
publisher.subscribe(new Subscriber());
}
}
return this;
}
@Override
public ReadStream<T> pause() {
synchronized (this) {
if (state == State.STOPPED) {
return this;
}
}
internalQueue.pause();
return this;
}
@Override
public ReadStream<T> resume() {
synchronized (this) {
if (state == State.STOPPED) {
return this;
}
}
internalQueue.resume();
return this;
}
@Override
public synchronized ReadStream<T> fetch(long amount) {
synchronized (this) {
if (state == State.STOPPED) {
return this;
}
}
internalQueue.fetch(amount);
return this;
}
@Override
public synchronized ReadStream<T> endHandler(Handler<Void> endHandler) {
if (state != State.STOPPED) {
this.endHandler = endHandler;
}
return this;
}
private void handleIn(T item) {
synchronized (this) {
if (state == State.STOPPED) {
return;
}
inFlight++;
}
internalQueue.write(item);
}
private void handleOut(T item) {
synchronized (this) {
if (state == State.STOPPED) {
return;
}
inFlight--;
}
handler.handle(item);
State s;
synchronized (this) {
if (inFlight != 0) {
return;
}
s = state;
}
if (s == State.EXHAUSTED) {
stop();
handleEnd();
} else {
requestMore();
}
}
private void handleOnComplete() {
boolean stop;
synchronized (this) {
if (state == State.STOPPED) {
return;
}
state = State.EXHAUSTED;
stop = inFlight == 0;
}
if (stop) {
stop();
handleEnd();
}
}
private void handleException(Throwable cause) {
Handler<Throwable> h;
synchronized (this) {
h = state != State.STOPPED ? exceptionHandler : null;
}
if (h != null) {
stop();
h.handle(cause);
}
}
private void requestMore() {
Subscription s;
synchronized (this) {
if (state == State.STOPPED) {
return;
}
s = this.subscription;
}
s.request(batchSize);
}
private void handleEnd() {
Handler<Void> h;
synchronized (this) {
h = endHandler;
}
if (h != null) {
h.handle(null);
}
}
private void stop() {
Subscription s;
synchronized (this) {
state = State.STOPPED;
s = this.subscription;
}
internalQueue.handler(null).drainHandler(null);
if (s != null) {
s.cancel();
}
}
private class Subscriber implements org.reactivestreams.Subscriber<T> {
@Override
public void onSubscribe(Subscription subscription) {
context.runOnContext(v -> {
synchronized (PublisherAdapter.this) {
PublisherAdapter.this.subscription = subscription;
}
requestMore();
});
}
@Override
public void onNext(T t) {
context.runOnContext(v -> handleIn(t));
}
@Override
public void onError(Throwable t) {
context.runOnContext(v -> handleException(t));
}
@Override
public void onComplete() {
context.runOnContext(v -> handleOnComplete());
}
}
}