package io.vertx.reactivex.impl;

import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import io.reactivex.internal.subscriptions.BasicIntQueueSubscription;
import io.reactivex.internal.subscriptions.EmptySubscription;
import io.reactivex.processors.UnicastProcessor;
import io.vertx.core.streams.ReadStream;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

Author:Julien Viet
/** * @author <a href="mailto:julien@julienviet.com">Julien Viet</a> */
public class FlowableReadStream<T, U> extends Flowable<U> { public static final long DEFAULT_MAX_BUFFER_SIZE = 256; private final ReadStream<T> stream; private final Function<T, U> f; private final AtomicReference<Subscription> current; public FlowableReadStream(ReadStream<T> stream, long maxBufferSize, Function<T, U> f) { stream.pause(); this.stream = stream; this.f = f; this.current = new AtomicReference<>(); } private void release() { Subscription sub = current.get(); if (sub != null) { if (current.compareAndSet(sub, null)) { try { stream.exceptionHandler(null); stream.endHandler(null); stream.handler(null); } catch (Exception ignore) { } finally { stream.resume(); } } } } @Override protected void subscribeActual(Subscriber<? super U> subscriber) { Subscription sub = new Subscription() { @Override public void request(long l) { if (current.get() == this) { stream.fetch(l); } } @Override public void cancel() { release(); } }; if (!current.compareAndSet(null, sub)) { EmptySubscription.error(new IllegalStateException("This processor allows only a single Subscriber"), subscriber); return; } stream.pause(); stream.endHandler(v -> { release(); subscriber.onComplete(); }); stream.exceptionHandler(err -> { release(); subscriber.onError(err); }); stream.handler(item -> { subscriber.onNext(f.apply(item)); }); subscriber.onSubscribe(sub); } }