package org.reactivestreams;
import java.util.concurrent.Flow;
import static java.util.Objects.requireNonNull;
public final class FlowAdapters {
/**
 * Utility class: the constructor is private and always throws, so no instance
 * can ever be obtained.
 *
 * @throws IllegalStateException always
 */
private FlowAdapters() {
throw new IllegalStateException("No instances!");
}
/**
 * Converts a {@link Flow.Publisher} into a Reactive Streams {@code Publisher}.
 * <p>
 * If the argument is a wrapper previously created around a Reactive Streams
 * publisher, the wrapped publisher is returned. If the argument already
 * implements the Reactive Streams interface, it is returned as-is. Otherwise a
 * new adapter is created.
 *
 * @param <T> the element type
 * @param flowPublisher the Flow publisher to convert, must not be {@code null}
 * @return a Reactive Streams publisher backed by {@code flowPublisher}
 * @throws NullPointerException if {@code flowPublisher} is {@code null}
 */
@SuppressWarnings("unchecked")
public static <T> org.reactivestreams.Publisher<T> toPublisher(
        Flow.Publisher<? extends T> flowPublisher) {
    requireNonNull(flowPublisher, "flowPublisher");

    if (flowPublisher instanceof FlowPublisherFromReactive) {
        // Unwrap instead of stacking a second adapter on top of the first.
        FlowPublisherFromReactive<T> wrapper = (FlowPublisherFromReactive<T>) flowPublisher;
        return (org.reactivestreams.Publisher<T>) wrapper.reactiveStreams;
    }
    if (flowPublisher instanceof org.reactivestreams.Publisher) {
        // The object speaks both APIs already; no adapter needed.
        return (org.reactivestreams.Publisher<T>) flowPublisher;
    }
    return new ReactivePublisherFromFlow<T>(flowPublisher);
}
/**
 * Converts a Reactive Streams {@code Publisher} into a {@link Flow.Publisher}.
 * <p>
 * If the argument is a wrapper previously created around a Flow publisher, the
 * wrapped publisher is returned. If the argument already implements
 * {@link Flow.Publisher}, it is returned as-is. Otherwise a new adapter is
 * created.
 *
 * @param <T> the element type
 * @param reactiveStreamsPublisher the Reactive Streams publisher to convert,
 *        must not be {@code null}
 * @return a Flow publisher backed by {@code reactiveStreamsPublisher}
 * @throws NullPointerException if {@code reactiveStreamsPublisher} is {@code null}
 */
@SuppressWarnings("unchecked")
public static <T> Flow.Publisher<T> toFlowPublisher(
        org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher) {
    requireNonNull(reactiveStreamsPublisher, "reactiveStreamsPublisher");

    if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) {
        // Unwrap instead of stacking a second adapter on top of the first.
        ReactivePublisherFromFlow<T> wrapper =
                (ReactivePublisherFromFlow<T>) reactiveStreamsPublisher;
        return (Flow.Publisher<T>) wrapper.flow;
    }
    if (reactiveStreamsPublisher instanceof Flow.Publisher) {
        // The object speaks both APIs already; no adapter needed.
        return (Flow.Publisher<T>) reactiveStreamsPublisher;
    }
    return new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
}
/**
 * Converts a {@link Flow.Processor} into a Reactive Streams {@code Processor}.
 * <p>
 * If the argument is a wrapper previously created around a Reactive Streams
 * processor, the wrapped processor is returned. If the argument already
 * implements the Reactive Streams interface, it is returned as-is. Otherwise a
 * new adapter is created.
 *
 * @param <T> the input element type
 * @param <U> the output element type
 * @param flowProcessor the Flow processor to convert, must not be {@code null}
 * @return a Reactive Streams processor backed by {@code flowProcessor}
 * @throws NullPointerException if {@code flowProcessor} is {@code null}
 */
@SuppressWarnings("unchecked")
public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
        Flow.Processor<? super T, ? extends U> flowProcessor) {
    requireNonNull(flowProcessor, "flowProcessor");

    if (flowProcessor instanceof FlowToReactiveProcessor) {
        // Unwrap instead of stacking a second adapter on top of the first.
        FlowToReactiveProcessor<T, U> wrapper = (FlowToReactiveProcessor<T, U>) flowProcessor;
        return (org.reactivestreams.Processor<T, U>) wrapper.reactiveStreams;
    }
    if (flowProcessor instanceof org.reactivestreams.Processor) {
        // The object speaks both APIs already; no adapter needed.
        return (org.reactivestreams.Processor<T, U>) flowProcessor;
    }
    return new ReactiveToFlowProcessor<T, U>(flowProcessor);
}
/**
 * Converts a Reactive Streams {@code Processor} into a {@link Flow.Processor}.
 * <p>
 * If the argument is a wrapper previously created around a Flow processor, the
 * wrapped processor is returned. If the argument already implements
 * {@link Flow.Processor}, it is returned as-is. Otherwise a new adapter is
 * created.
 *
 * @param <T> the input element type
 * @param <U> the output element type
 * @param reactiveStreamsProcessor the Reactive Streams processor to convert,
 *        must not be {@code null}
 * @return a Flow processor backed by {@code reactiveStreamsProcessor}
 * @throws NullPointerException if {@code reactiveStreamsProcessor} is {@code null}
 */
@SuppressWarnings("unchecked")
public static <T, U> Flow.Processor<T, U> toFlowProcessor(
        org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor) {
    requireNonNull(reactiveStreamsProcessor, "reactiveStreamsProcessor");

    if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) {
        // Unwrap instead of stacking a second adapter on top of the first.
        ReactiveToFlowProcessor<T, U> wrapper =
                (ReactiveToFlowProcessor<T, U>) reactiveStreamsProcessor;
        return (Flow.Processor<T, U>) wrapper.flow;
    }
    if (reactiveStreamsProcessor instanceof Flow.Processor) {
        // The object speaks both APIs already; no adapter needed.
        return (Flow.Processor<T, U>) reactiveStreamsProcessor;
    }
    return new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
}
/**
 * Converts a Reactive Streams {@code Subscriber} into a {@link Flow.Subscriber}.
 * <p>
 * If the argument is a wrapper previously created around a Flow subscriber, the
 * wrapped subscriber is returned. If the argument already implements
 * {@link Flow.Subscriber}, it is returned as-is. Otherwise a new adapter is
 * created.
 *
 * @param <T> the element type
 * @param reactiveStreamsSubscriber the Reactive Streams subscriber to convert,
 *        must not be {@code null}
 * @return a Flow subscriber backed by {@code reactiveStreamsSubscriber}
 * @throws NullPointerException if {@code reactiveStreamsSubscriber} is {@code null}
 */
@SuppressWarnings("unchecked")
public static <T> Flow.Subscriber<T> toFlowSubscriber(
        org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
    requireNonNull(reactiveStreamsSubscriber, "reactiveStreamsSubscriber");

    if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) {
        // Unwrap instead of stacking a second adapter on top of the first.
        ReactiveToFlowSubscriber<T> wrapper =
                (ReactiveToFlowSubscriber<T>) reactiveStreamsSubscriber;
        return (Flow.Subscriber<T>) wrapper.flow;
    }
    if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
        // The object speaks both APIs already; no adapter needed.
        return (Flow.Subscriber<T>) reactiveStreamsSubscriber;
    }
    return new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
}
/**
 * Converts a {@link Flow.Subscriber} into a Reactive Streams {@code Subscriber}.
 * <p>
 * If the argument is a wrapper previously created around a Reactive Streams
 * subscriber, the wrapped subscriber is returned. If the argument already
 * implements the Reactive Streams interface, it is returned as-is. Otherwise a
 * new adapter is created.
 *
 * @param <T> the element type
 * @param flowSubscriber the Flow subscriber to convert, must not be {@code null}
 * @return a Reactive Streams subscriber backed by {@code flowSubscriber}
 * @throws NullPointerException if {@code flowSubscriber} is {@code null}
 */
@SuppressWarnings("unchecked")
public static <T> org.reactivestreams.Subscriber<T> toSubscriber(
        Flow.Subscriber<T> flowSubscriber) {
    requireNonNull(flowSubscriber, "flowSubscriber");

    if (flowSubscriber instanceof FlowToReactiveSubscriber) {
        // Unwrap instead of stacking a second adapter on top of the first.
        FlowToReactiveSubscriber<T> wrapper = (FlowToReactiveSubscriber<T>) flowSubscriber;
        return (org.reactivestreams.Subscriber<T>) wrapper.reactiveStreams;
    }
    if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
        // The object speaks both APIs already; no adapter needed.
        return (org.reactivestreams.Subscriber<T>) flowSubscriber;
    }
    return new ReactiveToFlowSubscriber<T>(flowSubscriber);
}
/**
 * Exposes a Reactive Streams {@code Subscription} through the
 * {@link Flow.Subscription} interface by forwarding each call to it.
 */
static final class FlowToReactiveSubscription implements Flow.Subscription {

    /** The wrapped Reactive Streams subscription that receives every call. */
    final org.reactivestreams.Subscription reactiveStreams;

    /**
     * @param reactive the Reactive Streams subscription to forward to
     */
    public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) {
        reactiveStreams = reactive;
    }

    /** Forwards the cancellation to the wrapped subscription. */
    @Override
    public void cancel() {
        reactiveStreams.cancel();
    }

    /** Forwards the demand {@code n} unchanged to the wrapped subscription. */
    @Override
    public void request(long n) {
        reactiveStreams.request(n);
    }
}
/**
 * Exposes a {@link Flow.Subscription} through the Reactive Streams
 * {@code Subscription} interface by forwarding each call to it.
 */
static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription {

    /** The wrapped Flow subscription that receives every call. */
    final Flow.Subscription flow;

    /**
     * @param flowSubscription the Flow subscription to forward to
     */
    public ReactiveToFlowSubscription(Flow.Subscription flowSubscription) {
        flow = flowSubscription;
    }

    /** Forwards the cancellation to the wrapped subscription. */
    @Override
    public void cancel() {
        flow.cancel();
    }

    /** Forwards the demand {@code n} unchanged to the wrapped subscription. */
    @Override
    public void request(long n) {
        flow.request(n);
    }
}
/**
 * A {@link Flow.Subscriber} that relays every signal it receives to a wrapped
 * Reactive Streams {@code Subscriber}.
 *
 * @param <T> the element type
 */
static final class FlowToReactiveSubscriber<T> implements Flow.Subscriber<T> {

    /** The wrapped Reactive Streams subscriber that receives every signal. */
    final org.reactivestreams.Subscriber<? super T> reactiveStreams;

    /**
     * @param reactive the Reactive Streams subscriber to relay signals to
     */
    public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
        reactiveStreams = reactive;
    }

    /**
     * Passes the subscription downstream wrapped as a Reactive Streams
     * subscription; a {@code null} subscription is passed on as {@code null}.
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        if (subscription == null) {
            reactiveStreams.onSubscribe(null);
            return;
        }
        reactiveStreams.onSubscribe(new ReactiveToFlowSubscription(subscription));
    }

    /** Relays the item unchanged. */
    @Override
    public void onNext(T item) {
        reactiveStreams.onNext(item);
    }

    /** Relays the error unchanged. */
    @Override
    public void onError(Throwable throwable) {
        reactiveStreams.onError(throwable);
    }

    /** Relays the completion signal. */
    @Override
    public void onComplete() {
        reactiveStreams.onComplete();
    }
}
/**
 * A Reactive Streams {@code Subscriber} that relays every signal it receives
 * to a wrapped {@link Flow.Subscriber}.
 *
 * @param <T> the element type
 */
static final class ReactiveToFlowSubscriber<T> implements org.reactivestreams.Subscriber<T> {

    /** The wrapped Flow subscriber that receives every signal. */
    final Flow.Subscriber<? super T> flow;

    /**
     * @param flowSubscriber the Flow subscriber to relay signals to
     */
    public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flowSubscriber) {
        flow = flowSubscriber;
    }

    /**
     * Passes the subscription downstream wrapped as a Flow subscription; a
     * {@code null} subscription is passed on as {@code null}.
     */
    @Override
    public void onSubscribe(org.reactivestreams.Subscription subscription) {
        if (subscription == null) {
            flow.onSubscribe(null);
            return;
        }
        flow.onSubscribe(new FlowToReactiveSubscription(subscription));
    }

    /** Relays the item unchanged. */
    @Override
    public void onNext(T item) {
        flow.onNext(item);
    }

    /** Relays the error unchanged. */
    @Override
    public void onError(Throwable throwable) {
        flow.onError(throwable);
    }

    /** Relays the completion signal. */
    @Override
    public void onComplete() {
        flow.onComplete();
    }
}
/**
 * A Reactive Streams {@code Processor} backed by a {@link Flow.Processor}:
 * incoming signals and subscribe requests are forwarded to the wrapped
 * processor, with subscriptions and subscribers adapted to the Flow types.
 *
 * @param <T> the input element type
 * @param <U> the output element type
 */
static final class ReactiveToFlowProcessor<T, U> implements org.reactivestreams.Processor<T, U> {

    /** The wrapped Flow processor that does the actual work. */
    final Flow.Processor<? super T, ? extends U> flow;

    /**
     * @param flowProcessor the Flow processor to forward to
     */
    public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flowProcessor) {
        flow = flowProcessor;
    }

    /**
     * Hands the subscription to the wrapped processor as a Flow subscription;
     * a {@code null} subscription is passed on as {@code null}.
     */
    @Override
    public void onSubscribe(org.reactivestreams.Subscription subscription) {
        if (subscription == null) {
            flow.onSubscribe(null);
            return;
        }
        flow.onSubscribe(new FlowToReactiveSubscription(subscription));
    }

    /** Forwards the item unchanged. */
    @Override
    public void onNext(T t) {
        flow.onNext(t);
    }

    /** Forwards the error unchanged. */
    @Override
    public void onError(Throwable t) {
        flow.onError(t);
    }

    /** Forwards the completion signal. */
    @Override
    public void onComplete() {
        flow.onComplete();
    }

    /**
     * Subscribes {@code s} to the wrapped processor through a Flow adapter; a
     * {@code null} subscriber is passed on as {@code null}.
     */
    @Override
    public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
        if (s == null) {
            flow.subscribe(null);
            return;
        }
        flow.subscribe(new FlowToReactiveSubscriber<U>(s));
    }
}
/**
 * A {@link Flow.Processor} backed by a Reactive Streams {@code Processor}:
 * incoming signals and subscribe requests are forwarded to the wrapped
 * processor, with subscriptions and subscribers adapted to the Reactive
 * Streams types.
 *
 * @param <T> the input element type
 * @param <U> the output element type
 */
static final class FlowToReactiveProcessor<T, U> implements Flow.Processor<T, U> {

    /** The wrapped Reactive Streams processor that does the actual work. */
    final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams;

    /**
     * @param reactive the Reactive Streams processor to forward to
     */
    public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) {
        reactiveStreams = reactive;
    }

    /**
     * Hands the subscription to the wrapped processor as a Reactive Streams
     * subscription; a {@code null} subscription is passed on as {@code null}.
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        if (subscription == null) {
            reactiveStreams.onSubscribe(null);
            return;
        }
        reactiveStreams.onSubscribe(new ReactiveToFlowSubscription(subscription));
    }

    /** Forwards the item unchanged. */
    @Override
    public void onNext(T t) {
        reactiveStreams.onNext(t);
    }

    /** Forwards the error unchanged. */
    @Override
    public void onError(Throwable t) {
        reactiveStreams.onError(t);
    }

    /** Forwards the completion signal. */
    @Override
    public void onComplete() {
        reactiveStreams.onComplete();
    }

    /**
     * Subscribes {@code s} to the wrapped processor through a Reactive Streams
     * adapter; a {@code null} subscriber is passed on as {@code null}.
     */
    @Override
    public void subscribe(Flow.Subscriber<? super U> s) {
        if (s == null) {
            reactiveStreams.subscribe(null);
            return;
        }
        reactiveStreams.subscribe(new ReactiveToFlowSubscriber<U>(s));
    }
}
/**
 * A Reactive Streams {@code Publisher} backed by a {@link Flow.Publisher}.
 *
 * @param <T> the element type
 */
static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> {

    /** The wrapped Flow publisher that subscribers are forwarded to. */
    final Flow.Publisher<? extends T> flow;

    /**
     * @param flowPublisher the Flow publisher to forward subscriptions to
     */
    public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {
        flow = flowPublisher;
    }

    /**
     * Subscribes {@code reactive} to the wrapped publisher through a Flow
     * adapter; a {@code null} subscriber is passed on as {@code null}.
     */
    @Override
    public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) {
        if (reactive == null) {
            flow.subscribe(null);
            return;
        }
        flow.subscribe(new FlowToReactiveSubscriber<T>(reactive));
    }
}
/**
 * A {@link Flow.Publisher} backed by a Reactive Streams {@code Publisher}.
 *
 * @param <T> the element type
 */
static final class FlowPublisherFromReactive<T> implements Flow.Publisher<T> {

    /** The wrapped Reactive Streams publisher that subscribers are forwarded to. */
    final org.reactivestreams.Publisher<? extends T> reactiveStreams;

    /**
     * @param reactivePublisher the Reactive Streams publisher to forward subscriptions to
     */
    public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher) {
        reactiveStreams = reactivePublisher;
    }

    /**
     * Subscribes {@code flow} to the wrapped publisher through a Reactive
     * Streams adapter; a {@code null} subscriber is passed on as {@code null}.
     */
    @Override
    public void subscribe(Flow.Subscriber<? super T> flow) {
        if (flow == null) {
            reactiveStreams.subscribe(null);
            return;
        }
        reactiveStreams.subscribe(new ReactiveToFlowSubscriber<T>(flow));
    }
}
}