/*
 * Copyright 2017-2020 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.micronaut.core.async.publisher;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.async.subscriber.Completable;
import io.micronaut.core.async.subscriber.CompletionAwareSubscriber;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.reflect.ClassUtils;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

Utilities for working with raw Publisher instances. Designed for internal use by Micronaut and not as a replacement for a reactive library such as RxJava, Reactor, Akka etc.
Author:Graeme Rocher
Since:1.0
/** * Utilities for working with raw {@link Publisher} instances. Designed for internal use by Micronaut and * not as a replacement for a reactive library such as RxJava, Reactor, Akka etc. * * @author Graeme Rocher * @since 1.0 */
@Internal public class Publishers { @SuppressWarnings("ConstantName") private static final List<Class<?>> REACTIVE_TYPES = new ArrayList<>(3); @SuppressWarnings("ConstantName") private static final List<Class<?>> SINGLE_TYPES = new ArrayList<>(3); private static final List<Class<?>> COMPLETABLE_TYPES = new ArrayList<>(3); static { ClassLoader classLoader = Publishers.class.getClassLoader(); Publishers.SINGLE_TYPES.add(CompletableFuturePublisher.class); Publishers.SINGLE_TYPES.add(JustPublisher.class); COMPLETABLE_TYPES.add(Completable.class); List<String> typeNames = Arrays.asList( "io.reactivex.Observable", "reactor.core.publisher.Flux", "kotlinx.coroutines.flow.Flow", "io.reactivex.rxjava3.core.Flowable", "io.reactivex.rxjava3.core.Observable" ); for (String name : typeNames) { Optional<Class> aClass = ClassUtils.forName(name, classLoader); aClass.ifPresent(Publishers.REACTIVE_TYPES::add); } for (String name : Arrays.asList( "io.reactivex.Single", "reactor.core.publisher.Mono", "io.reactivex.Maybe", "io.reactivex.rxjava3.core.Single", "io.reactivex.rxjava3.core.Maybe" )) { Optional<Class> aClass = ClassUtils.forName(name, classLoader); aClass.ifPresent(aClass1 -> { Publishers.SINGLE_TYPES.add(aClass1); Publishers.REACTIVE_TYPES.add(aClass1); }); } for (String name : Arrays.asList("io.reactivex.Completable", "io.reactivex.rxjava3.core.Completable")) { Optional<Class> aClass = ClassUtils.forName(name, classLoader); aClass.ifPresent(aClass1 -> { Publishers.COMPLETABLE_TYPES.add(aClass1); Publishers.REACTIVE_TYPES.add(aClass1); }); } }
Registers an additional reactive type. Should be called during application static initialization.
Params:
  • type – The type
Since:2.0
/** * Registers an additional reactive type. Should be called during application static initialization. * @param type The type * @since 2.0 */
public static void registerReactiveType(Class<?> type) { if (type != null) { REACTIVE_TYPES.add(type); } }
Registers an additional reactive single type. Should be called during application static initialization.
Params:
  • type – The type
Since:2.0
/** * Registers an additional reactive single type. Should be called during application static initialization. * @param type The type * @since 2.0 */
public static void registerReactiveSingle(Class<?> type) { if (type != null) { registerReactiveType(type); SINGLE_TYPES.add(type); } }
Registers an additional reactive completable type. Should be called during application static initialization.
Params:
  • type – The type
Since:2.0
/** * Registers an additional reactive completable type. Should be called during application static initialization. * @param type The type * @since 2.0 */
public static void registerReactiveCompletable(Class<?> type) { if (type != null) { registerReactiveType(type); COMPLETABLE_TYPES.add(type); } }
Returns:A list of known reactive types.
/** * @return A list of known reactive types. */
public static List<Class<?>> getKnownReactiveTypes() { return Collections.unmodifiableList(new ArrayList<>(REACTIVE_TYPES)); }
Build a Publisher from a CompletableFuture.
Params:
Type parameters:
  • <T> – The type of the publisher
Returns:The Publisher
/** * Build a {@link Publisher} from a {@link CompletableFuture}. * * @param futureSupplier The supplier of the {@link CompletableFuture} * @param <T> The type of the publisher * @return The {@link Publisher} */
public static <T> Publisher<T> fromCompletableFuture(Supplier<CompletableFuture<T>> futureSupplier) { return new CompletableFuturePublisher<>(futureSupplier); }
Build a Publisher from a CompletableFuture.
Params:
Type parameters:
  • <T> – The type of the publisher
Returns:The Publisher
/** * Build a {@link Publisher} from a {@link CompletableFuture}. * * @param future The {@link CompletableFuture} * @param <T> The type of the publisher * @return The {@link Publisher} */
public static <T> Publisher<T> fromCompletableFuture(CompletableFuture<T> future) { return new CompletableFuturePublisher<>(() -> future); }
A Publisher that emits a fixed single value.
Params:
  • value – The value to emit
Type parameters:
  • <T> – The value type
Returns:The Publisher
/** * A {@link Publisher} that emits a fixed single value. * * @param value The value to emit * @param <T> The value type * @return The {@link Publisher} */
public static <T> Publisher<T> just(T value) { return new JustPublisher<>(value); }
A Publisher that emits a fixed single value.
Params:
  • error – The error to emit
Type parameters:
  • <T> – The value type
Returns:The Publisher
/** * A {@link Publisher} that emits a fixed single value. * * @param error The error to emit * @param <T> The value type * @return The {@link Publisher} */
public static <T> Publisher<T> just(Throwable error) { return new JustThrowPublisher<>(error); }
A Publisher that completes without emitting any items.
Type parameters:
  • <T> – The value type
Returns:The Publisher
Since:2.0.0
/** * A {@link Publisher} that completes without emitting any items. * * @param <T> The value type * @return The {@link Publisher} * @since 2.0.0 */
public static <T> Publisher<T> empty() { return new JustCompletePublisher<>(); }
Map the result from a publisher using the given mapper.
Params:
  • publisher – The publisher
  • mapper – The mapper
Type parameters:
  • <T> – The generic type
  • <R> – The result type
Returns:The mapped publisher
/** * Map the result from a publisher using the given mapper. * * @param publisher The publisher * @param mapper The mapper * @param <T> The generic type * @param <R> The result type * @return The mapped publisher */
public static <T, R> Publisher<R> map(Publisher<T> publisher, Function<T, R> mapper) { return (MicronautPublisher<R>) actual -> publisher.subscribe(new CompletionAwareSubscriber<T>() { @Override protected void doOnSubscribe(Subscription subscription) { actual.onSubscribe(subscription); } @Override protected void doOnNext(T message) { try { R result = Objects.requireNonNull(mapper.apply(message), "The mapper returned a null value."); actual.onNext(result); } catch (Throwable e) { onError(e); } } @Override protected void doOnError(Throwable t) { actual.onError(t); } @Override protected void doOnComplete() { actual.onComplete(); } }); }
Map the result from a publisher using the given mapper.
Params:
  • publisher – The publisher
  • consumer – The mapper
Type parameters:
  • <T> – The generic type
Returns:The mapped publisher
/** * Map the result from a publisher using the given mapper. * * @param publisher The publisher * @param consumer The mapper * @param <T> The generic type * @return The mapped publisher */
public static <T> Publisher<T> then(Publisher<T> publisher, Consumer<T> consumer) { return (MicronautPublisher<T>) actual -> publisher.subscribe(new CompletionAwareSubscriber<T>() { @Override protected void doOnSubscribe(Subscription subscription) { actual.onSubscribe(subscription); } @Override protected void doOnNext(T message) { try { actual.onNext(message); consumer.accept(message); } catch (Throwable e) { onError(e); } } @Override protected void doOnError(Throwable t) { actual.onError(t); } @Override protected void doOnComplete() { actual.onComplete(); } }); }
Allow executing logic on completion of a Publisher.
Params:
  • publisher – The publisher
  • future – The runnable
Type parameters:
  • <T> – The generic type
Returns:The mapped publisher
/** * Allow executing logic on completion of a Publisher. * * @param publisher The publisher * @param future The runnable * @param <T> The generic type * @return The mapped publisher */
public static <T> Publisher<T> onComplete(Publisher<T> publisher, Supplier<CompletableFuture<Void>> future) { return (MicronautPublisher<T>) actual -> publisher.subscribe(new CompletionAwareSubscriber<T>() { @Override protected void doOnSubscribe(Subscription subscription) { actual.onSubscribe(subscription); } @Override protected void doOnNext(T message) { try { actual.onNext(message); } catch (Throwable e) { onError(e); } } @Override protected void doOnError(Throwable t) { actual.onError(t); } @Override protected void doOnComplete() { future.get().whenComplete((aVoid, throwable) -> { if (throwable != null) { actual.onError(throwable); } else { actual.onComplete(); } }); } }); }
Is the given type a Publisher or convertible to a publisher.
Params:
  • type – The type to check
Returns:True if it is
/** * Is the given type a Publisher or convertible to a publisher. * * @param type The type to check * @return True if it is */
public static boolean isConvertibleToPublisher(Class<?> type) { if (Publisher.class.isAssignableFrom(type)) { return true; } else { for (Class<?> reactiveType : REACTIVE_TYPES) { if (reactiveType.isAssignableFrom(type)) { return true; } } return false; } }
Is the given object a Publisher or convertible to a publisher.
Params:
  • object – The object
Returns:True if it is
/** * Is the given object a Publisher or convertible to a publisher. * * @param object The object * @return True if it is */
public static boolean isConvertibleToPublisher(Object object) { if (object == null) { return false; } if (object instanceof Publisher) { return true; } else { return isConvertibleToPublisher(object.getClass()); } }
Attempts to convert the publisher to the given type.
Params:
  • object – The object to convert
  • publisherType – The publisher type
Type parameters:
  • <T> – The generic type
Returns:The Resulting in publisher
/** * Attempts to convert the publisher to the given type. * * @param object The object to convert * @param publisherType The publisher type * @param <T> The generic type * @return The Resulting in publisher */
public static <T> T convertPublisher(Object object, Class<T> publisherType) { Objects.requireNonNull(object, "Argument [object] cannot be null"); Objects.requireNonNull(publisherType, "Argument [publisherType] cannot be null"); if (publisherType.isInstance(object)) { return (T) object; } if (object instanceof CompletableFuture) { @SuppressWarnings("unchecked") Publisher<T> futurePublisher = Publishers.fromCompletableFuture(() -> ((CompletableFuture) object)); return ConversionService.SHARED.convert(futurePublisher, publisherType) .orElseThrow(() -> unconvertibleError(object, publisherType)); } else if (object instanceof MicronautPublisher && MicronautPublisher.class.isAssignableFrom(publisherType)) { return (T) object; } else { return ConversionService.SHARED.convert(object, publisherType) .orElseThrow(() -> unconvertibleError(object, publisherType)); } }
Does the given reactive type emit a single result.
Params:
  • type – The type
Returns:True it does
/** * Does the given reactive type emit a single result. * * @param type The type * @return True it does */
public static boolean isSingle(Class<?> type) { for (Class<?> reactiveType : SINGLE_TYPES) { if (reactiveType.isAssignableFrom(type)) { return true; } } return false; }
Does the given reactive type emit a single result.
Params:
  • type – The type
Returns:True it does
/** * Does the given reactive type emit a single result. * * @param type The type * @return True it does */
public static boolean isCompletable(Class<?> type) { for (Class<?> reactiveType : COMPLETABLE_TYPES) { if (reactiveType.isAssignableFrom(type)) { return true; } } return false; } private static <T> IllegalArgumentException unconvertibleError(Object object, Class<T> publisherType) { return new IllegalArgumentException("Cannot convert reactive type [" + object.getClass() + "] to type [" + publisherType + "]. Ensure that you have the necessary Reactive module on your classpath. For example for Reactor you should have 'micronaut-reactor'."); }
Marker interface for any micronaut produced publishers.
Type parameters:
  • <T> – The generic type
Since:2.0.2
/** * Marker interface for any micronaut produced publishers. * * @param <T> The generic type * @since 2.0.2 */
@SuppressWarnings("ReactiveStreamsPublisherImplementation") public interface MicronautPublisher<T> extends Publisher<T> { }
A publisher for a value.
Type parameters:
  • <T> – The type
/** * A publisher for a value. * * @param <T> The type */
private static class JustPublisher<T> implements MicronautPublisher<T> { private final T value; public JustPublisher(T value) { this.value = value; } @Override public void subscribe(Subscriber<? super T> subscriber) { subscriber.onSubscribe(new Subscription() { boolean done; @Override public void request(long n) { if (done) { return; } done = true; if (value != null) { subscriber.onNext(value); } subscriber.onComplete(); } @Override public void cancel() { done = true; } }); } }
A publisher that throws an error.
Type parameters:
  • <T> – The type
/** * A publisher that throws an error. * * @param <T> The type */
private static class JustThrowPublisher<T> implements MicronautPublisher<T> { private final Throwable error; public JustThrowPublisher(Throwable error) { this.error = error; } @Override public void subscribe(Subscriber<? super T> subscriber) { subscriber.onSubscribe(new Subscription() { boolean done; @Override public void request(long n) { if (done) { return; } done = true; subscriber.onError(error); } @Override public void cancel() { done = true; } }); } }
A publisher that completes without emitting any items.
Type parameters:
  • <T> – The type
/** * A publisher that completes without emitting any items. * * @param <T> The type */
private static class JustCompletePublisher<T> implements MicronautPublisher<T> { @Override public void subscribe(Subscriber<? super T> subscriber) { subscriber.onSubscribe(new Subscription() { boolean done; @Override public void request(long n) { if (done) { return; } done = true; subscriber.onComplete(); } @Override public void cancel() { done = true; } }); } } }