package io.vertx.reactivex.impl;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.ArrayDeque;
import java.util.function.Function;

An RxJava Subscriber that turns an Observable into a ReadStream.

The stream implements the pause() and resume() operation by maintaining a buffer of ReadStreamSubscriber<R,J>.BUFFER_SIZE elements between the Observable and the ReadStream.

When the subscriber is created it requests 0 elements to activate the subscriber's back-pressure. Setting the handler initially on the ReadStream triggers a request of ReadStreamSubscriber<R,J>.BUFFER_SIZE elements. When the item buffer is half empty, new elements are requested to fill the buffer back to ReadStreamSubscriber<R,J>.BUFFER_SIZE elements.

The #endHandler(Handler<Void>) is called when the Observable is completed or has failed and no pending elements, emitted before the completion or failure, are still in the buffer, i.e the handler is not called when the stream is paused.

Author:Julien Viet
/** * An RxJava {@code Subscriber} that turns an {@code Observable} into a {@link ReadStream}. * <p> * The stream implements the {@link #pause()} and {@link #resume()} operation by maintaining * a buffer of {@link #BUFFER_SIZE} elements between the {@code Observable} and the {@code ReadStream}. * <p> * When the subscriber is created it requests {@code 0} elements to activate the subscriber's back-pressure. * Setting the handler initially on the {@code ReadStream} triggers a request of {@link #BUFFER_SIZE} elements. * When the item buffer is half empty, new elements are requested to fill the buffer back to {@link #BUFFER_SIZE} * elements. * <p> * The {@link #endHandler(Handler<Void>)} is called when the {@code Observable} is completed or has failed and * no pending elements, emitted before the completion or failure, are still in the buffer, i.e the handler * is not called when the stream is paused. * * @author <a href="mailto:julien@julienviet.com">Julien Viet</a> */
public class ReadStreamSubscriber<R, J> implements Subscriber<R>, ReadStream<J> { private static final Runnable NOOP_ACTION = () -> {}; private static final Throwable DONE_SENTINEL = new Throwable(); public static final int BUFFER_SIZE = 16; public static <R, J> ReadStream<J> asReadStream(Flowable<R> flowable, Function<R, J> adapter) { ReadStreamSubscriber<R, J> observer = new ReadStreamSubscriber<>(adapter); flowable.subscribe(observer); return observer; } public static <R, J> ReadStream<J> asReadStream(Observable<R> observable, Function<R, J> adapter) { return asReadStream(observable.toFlowable(BackpressureStrategy.BUFFER), adapter); } private final Function<R, J> adapter; private Handler<Void> endHandler; private Handler<Throwable> exceptionHandler; private Handler<J> elementHandler; private boolean paused = false; private Throwable completed; private ArrayDeque<R> pending = new ArrayDeque<>(); private int requested = 0; private Subscription subscription; public ReadStreamSubscriber(Function<R, J> adapter) { this.adapter = adapter; } @Override public ReadStream<J> handler(Handler<J> handler) { synchronized (this) { elementHandler = handler; } checkStatus(); return this; } @Override public ReadStream<J> pause() { synchronized (this) { paused = true; } return this; } @Override public ReadStream<J> fetch(long amount) { throw new UnsupportedOperationException("todo"); } @Override public ReadStream<J> resume() { synchronized (this) { paused = false; } checkStatus(); return this; } @Override public void onSubscribe(Subscription s) { synchronized (this) { subscription = s; } checkStatus(); } private void checkStatus() { Runnable action = NOOP_ACTION; while (true) { J adapted; Handler<J> handler; synchronized (this) { if (!paused && (handler = elementHandler) != null && pending.size() > 0) { requested--; R item = pending.poll(); adapted = adapter.apply(item); } else { if (completed != null) { if (pending.isEmpty()) { Handler<Throwable> onError; Throwable result; if (completed != DONE_SENTINEL) { onError = exceptionHandler; result = completed; exceptionHandler = null; } else { onError = null; result = null; } Handler<Void> onCompleted = endHandler; endHandler = null; action = () -> { try { if (onError != null) { onError.handle(result); } } finally { if (onCompleted != null) { onCompleted.handle(null); } } }; } } else if (elementHandler != null && requested < BUFFER_SIZE / 2) { int request = BUFFER_SIZE - requested; action = () -> subscription.request(request); requested = BUFFER_SIZE; } break; } } handler.handle(adapted); } action.run(); } @Override public ReadStream<J> endHandler(Handler<Void> handler) { synchronized (this) { if (completed == null || pending.size() > 0) { endHandler = handler; } else { if (handler != null) { throw new IllegalStateException(); } } } return this; } @Override public ReadStream<J> exceptionHandler(Handler<Throwable> handler) { synchronized (this) { if (completed == null || pending.size() > 0) { exceptionHandler = handler; } else { if (handler != null) { throw new IllegalStateException(); } } } return this; } @Override public void onComplete() { onError(DONE_SENTINEL); } @Override public void onError(Throwable e) { synchronized (this) { if (completed != null) { return; } completed = e; } checkStatus(); } @Override public void onNext(R item) { synchronized (this) { pending.add(item); } checkStatus(); } }