/*
 * Copyright 2017-2020 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.micronaut.core.async.publisher;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.function.Supplier;

A Publisher that uses an ExecutorService to emit a single result.
Author:Graeme Rocher
Type parameters:
  • <T> – The argument type
Since:1.0
/** * A {@link org.reactivestreams.Publisher} that uses an {@link ExecutorService} to emit a single result. * @param <T> The argument type * @author Graeme Rocher * @since 1.0 */
public class AsyncSingleResultPublisher<T> implements Publishers.MicronautPublisher<T> { private final ExecutorService executor; private final Supplier<T> supplier;
Constructor.
Params:
  • executor – executor
  • supplier – type of supplier
/** * Constructor. * @param executor executor * @param supplier type of supplier */
public AsyncSingleResultPublisher(ExecutorService executor, Supplier<T> supplier) { this.executor = executor; this.supplier = supplier; }
Constructor.
Params:
  • supplier – type of supplier
/** * Constructor. * @param supplier type of supplier */
public AsyncSingleResultPublisher(Supplier<T> supplier) { this(ForkJoinPool.commonPool(), supplier); } @Override public final void subscribe(Subscriber<? super T> subscriber) { Objects.requireNonNull(subscriber, "Subscriber cannot be null"); subscriber.onSubscribe(new ExecutorServiceSubscription<>(subscriber, supplier, executor)); }
Subscription class.
Type parameters:
  • <S> – type of subscriber
/** * Subscription class. * @param <S> type of subscriber */
static class ExecutorServiceSubscription<S> implements Subscription { private final Subscriber<? super S> subscriber; private final ExecutorService executor; private final Supplier<S> supplier; private Future<?> future; // to allow cancellation private boolean completed;
Constructor.
Params:
  • subscriber – subscriber
  • supplier – supplier
  • executor – executor
/** * Constructor. * @param subscriber subscriber * @param supplier supplier * @param executor executor */
ExecutorServiceSubscription(Subscriber<? super S> subscriber, Supplier<S> supplier, ExecutorService executor) { this.subscriber = subscriber; this.supplier = supplier; this.executor = executor; }
Request execution.
Params:
  • n – request number
/** * Request execution. * @param n request number */
@Override public synchronized void request(long n) { if (n != 0 && !completed) { completed = true; if (n < 0) { IllegalArgumentException ex = new IllegalArgumentException(); executor.execute(() -> subscriber.onError(ex)); } else { future = executor.submit(() -> { try { S value = supplier.get(); if (value != null) { subscriber.onNext(value); } subscriber.onComplete(); } catch (Exception e) { subscriber.onError(e); } }); } } }
Cancel.
/** * Cancel. */
@Override public synchronized void cancel() { completed = true; if (future != null) { future.cancel(false); } } } }