package io.vertx.reactivex.impl;

import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.plugins.RxJavaPlugins;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

Author:Julien Viet
/** * @author <a href="mailto:julien@julienviet.com">Julien Viet</a> */
public class AsyncResultSingle<T> extends Single<T> { public static <T> Single<T> toSingle(Consumer<Handler<AsyncResult<T>>> subscriptionConsumer) { return RxJavaPlugins.onAssembly(new AsyncResultSingle<T>(subscriptionConsumer)); } private final Consumer<Handler<AsyncResult<T>>> subscriptionConsumer; public AsyncResultSingle(Consumer<Handler<AsyncResult<T>>> subscriptionConsumer) { this.subscriptionConsumer = subscriptionConsumer; } @Override protected void subscribeActual(@NonNull SingleObserver<? super T> observer) { AtomicBoolean disposed = new AtomicBoolean(); observer.onSubscribe(new Disposable() { @Override public void dispose() { disposed.set(true); } @Override public boolean isDisposed() { return disposed.get(); } }); if (!disposed.get()) { try { subscriptionConsumer.accept(ar -> { if (!disposed.getAndSet(true)) { if (ar.succeeded()) { try { observer.onSuccess(ar.result()); } catch (Throwable t) { Exceptions.throwIfFatal(t); RxJavaPlugins.onError(t); } } else if (ar.failed()) { try { observer.onError(ar.cause()); } catch (Throwable t) { Exceptions.throwIfFatal(t); RxJavaPlugins.onError(t); } } } }); } catch (Exception e) { if (!disposed.getAndSet(true)) { try { observer.onError(e); } catch (Throwable t) { Exceptions.throwIfFatal(t); RxJavaPlugins.onError(t); } } } } } }