/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.publisher;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.FluxConcatMap.ErrorMode;
import reactor.core.publisher.FluxOnAssembly.AssemblyLightSnapshot;
import reactor.core.publisher.FluxOnAssembly.AssemblySnapshot;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
A ParallelFlux publishes to an array of Subscribers, in parallel 'rails' (or 'groups'
). Use from
to start processing a regular Publisher in 'rails', which each cover a subset of the original Publisher's data. Flux.parallel()
is a convenient shortcut to achieve that on a Flux
.
Use runOn
to introduce where each 'rail' should run on thread-wise.
Use sequential
to merge the sources back into a single Flux
.
Use then
to listen for all rails termination in the produced Mono
subscribe(Subscriber)
if you simply want to subscribe to the merged sequence. Note that other variants like subscribe(Consumer)
instead do multiple subscribes, one on each rail (which means that the lambdas should be as stateless and side-effect free as possible).
Type parameters: - <T> – the value type
/**
* A ParallelFlux publishes to an array of Subscribers, in parallel 'rails' (or
* {@link #groups() 'groups'}).
* <p>
* Use {@link #from} to start processing a regular Publisher in 'rails', which each
* cover a subset of the original Publisher's data. {@link Flux#parallel()} is a
* convenient shortcut to achieve that on a {@link Flux}.
* <p>
* Use {@link #runOn} to introduce where each 'rail' should run on thread-wise.
* <p>
* Use {@link #sequential} to merge the sources back into a single {@link Flux}.
* <p>
* Use {@link #then} to listen for all rails termination in the produced {@link Mono}
* <p>
* {@link #subscribe(Subscriber)} if you simply want to subscribe to the merged sequence.
* Note that other variants like {@link #subscribe(Consumer)} instead do multiple
* subscribes, one on each rail (which means that the lambdas should be as stateless and
* side-effect free as possible).
*
*
* @param <T> the value type
*/
public abstract class ParallelFlux<T> implements CorePublisher<T> {
Take a Publisher and prepare to consume it on multiple 'rails' (one per CPU core) in a round-robin fashion. Equivalent to Flux.parallel
. Params: - source – the source Publisher
Type parameters: - <T> – the value type
Returns: the ParallelFlux
instance
/**
* Take a Publisher and prepare to consume it on multiple 'rails' (one per CPU core)
* in a round-robin fashion. Equivalent to {@link Flux#parallel}.
*
* @param <T> the value type
* @param source the source Publisher
*
* @return the {@link ParallelFlux} instance
*/
public static <T> ParallelFlux<T> from(Publisher<? extends T> source) {
return from(source, Schedulers.DEFAULT_POOL_SIZE, Queues.SMALL_BUFFER_SIZE,
Queues.small());
}
Take a Publisher and prepare to consume it on parallelism
number of 'rails', possibly ordered and in a round-robin fashion. Params: - source – the source Publisher
- parallelism – the number of parallel rails
Type parameters: - <T> – the value type
Returns: the new ParallelFlux
instance
/**
* Take a Publisher and prepare to consume it on {@code parallelism} number of 'rails',
* possibly ordered and in a round-robin fashion.
*
* @param <T> the value type
* @param source the source Publisher
* @param parallelism the number of parallel rails
*
* @return the new {@link ParallelFlux} instance
*/
public static <T> ParallelFlux<T> from(Publisher<? extends T> source,
int parallelism) {
return from(source,
parallelism, Queues.SMALL_BUFFER_SIZE,
Queues.small());
}
Take a Publisher and prepare to consume it on parallelism
number of 'rails' and in a round-robin fashion and use custom prefetch amount and queue for dealing with the source Publisher's values. Params: - source – the source Publisher
- parallelism – the number of parallel rails
- prefetch – the number of values to prefetch from the source
- queueSupplier – the queue structure supplier to hold the prefetched values
from the source until there is a rail ready to process it.
Type parameters: - <T> – the value type
Returns: the new ParallelFlux
instance
/**
* Take a Publisher and prepare to consume it on {@code parallelism} number of 'rails'
* and in a round-robin fashion and use custom prefetch amount and queue
* for dealing with the source Publisher's values.
*
* @param <T> the value type
* @param source the source Publisher
* @param parallelism the number of parallel rails
* @param prefetch the number of values to prefetch from the source
* @param queueSupplier the queue structure supplier to hold the prefetched values
* from the source until there is a rail ready to process it.
*
* @return the new {@link ParallelFlux} instance
*/
public static <T> ParallelFlux<T> from(Publisher<? extends T> source,
int parallelism,
int prefetch,
Supplier<Queue<T>> queueSupplier) {
Objects.requireNonNull(queueSupplier, "queueSupplier");
Objects.requireNonNull(source, "source");
return onAssembly(new ParallelSource<>(source,
parallelism,
prefetch, queueSupplier));
}
Wraps multiple Publishers into a ParallelFlux
which runs them in parallel and unordered. Params: - publishers – the array of publishers
Type parameters: - <T> – the value type
Returns: the new ParallelFlux
instance
/**
* Wraps multiple Publishers into a {@link ParallelFlux} which runs them in parallel and
* unordered.
*
* @param <T> the value type
* @param publishers the array of publishers
*
* @return the new {@link ParallelFlux} instance
*/
@SafeVarargs
public static <T> ParallelFlux<T> from(Publisher<T>... publishers) {
return onAssembly(new ParallelArraySource<>(publishers));
}
Perform a fluent transformation to a value via a converter function which receives
this ParallelFlux.
Params: - converter – the converter function from
ParallelFlux
to some type
Type parameters: - <U> – the output value type
Returns: the value returned by the converter function
/**
* Perform a fluent transformation to a value via a converter function which receives
* this ParallelFlux.
*
* @param <U> the output value type
* @param converter the converter function from {@link ParallelFlux} to some type
*
* @return the value returned by the converter function
*/
public final <U> U as(Function<? super ParallelFlux<T>, U> converter) {
return converter.apply(this);
}
Activate traceback (full assembly tracing) for this particular ParallelFlux
, in case of an error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace creation.
It should be placed towards the end of the reactive chain, as errors
triggered downstream of it cannot be observed and augmented with assembly trace.
The traceback is attached to the error as a suppressed exception
. As such, if the error is a composite one
, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable)
.
Returns: the assembly tracing ParallelFlux
/**
* Activate traceback (full assembly tracing) for this particular {@link ParallelFlux}, in case of an
* error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace
* creation.
* <p>
* It should be placed towards the end of the reactive chain, as errors
* triggered downstream of it cannot be observed and augmented with assembly trace.
* <p>
* The traceback is attached to the error as a {@link Throwable#getSuppressed() suppressed exception}.
* As such, if the error is a {@link Exceptions#isMultiple(Throwable) composite one}, the traceback
* would appear as a component of the composite. In any case, the traceback nature can be detected via
* {@link Exceptions#isTraceback(Throwable)}.
*
* @return the assembly tracing {@link ParallelFlux}
*/
public final ParallelFlux<T> checkpoint() {
AssemblySnapshot stacktrace = new AssemblySnapshot(null, Traces.callSiteSupplierFactory.get());
return new ParallelFluxOnAssembly<>(this, stacktrace);
}
Activate traceback (assembly marker) for this particular ParallelFlux
by giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint()
, this doesn't create a filled stack trace, avoiding the main cost of the operator. However, as a trade-off the description must be unique enough for the user to find out where this ParallelFlux was assembled. If you only want a generic description, and still rely on the stack trace to find the assembly site, use the checkpoint(String, boolean)
variant.
It should be placed towards the end of the reactive chain, as errors
triggered downstream of it cannot be observed and augmented with assembly trace.
The traceback is attached to the error as a suppressed exception
. As such, if the error is a composite one
, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable)
.
Params: - description – a unique enough description to include in the light assembly traceback.
Returns: the assembly marked ParallelFlux
/**
* Activate traceback (assembly marker) for this particular {@link ParallelFlux} by giving it a description that
* will be reflected in the assembly traceback in case of an error upstream of the
* checkpoint. Note that unlike {@link #checkpoint()}, this doesn't create a
* filled stack trace, avoiding the main cost of the operator.
* However, as a trade-off the description must be unique enough for the user to find
* out where this ParallelFlux was assembled. If you only want a generic description, and
* still rely on the stack trace to find the assembly site, use the
* {@link #checkpoint(String, boolean)} variant.
* <p>
* It should be placed towards the end of the reactive chain, as errors
* triggered downstream of it cannot be observed and augmented with assembly trace.
* <p>
* The traceback is attached to the error as a {@link Throwable#getSuppressed() suppressed exception}.
* As such, if the error is a {@link Exceptions#isMultiple(Throwable) composite one}, the traceback
* would appear as a component of the composite. In any case, the traceback nature can be detected via
* {@link Exceptions#isTraceback(Throwable)}.
*
* @param description a unique enough description to include in the light assembly traceback.
* @return the assembly marked {@link ParallelFlux}
*/
public final ParallelFlux<T> checkpoint(String description) {
return new ParallelFluxOnAssembly<>(this, new AssemblyLightSnapshot(description));
}
Activate traceback (full assembly tracing or the lighter assembly marking depending on the forceStackTrace
option). By setting the forceStackTrace
parameter to true, activate assembly tracing for this particular ParallelFlux
and give it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(String)
, this will incur the cost of an exception stack trace creation. The description could for example be a meaningful name for the assembled ParallelFlux or a wider correlation ID, since the stack trace will always provide enough information to locate where this ParallelFlux was assembled.
By setting forceStackTrace
to false, behaves like checkpoint(String)
and is subject to the same caveat in choosing the description.
It should be placed towards the end of the reactive chain, as errors
triggered downstream of it cannot be observed and augmented with assembly marker.
The traceback is attached to the error as a suppressed exception
. As such, if the error is a composite one
, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable)
.
Params: - description – a description (must be unique enough if forceStackTrace is set
to false).
- forceStackTrace – false to make a light checkpoint without a stacktrace, true
to use a stack trace.
Returns: the assembly marked ParallelFlux
.
/**
* Activate traceback (full assembly tracing or the lighter assembly marking depending on the
* {@code forceStackTrace} option).
* <p>
* By setting the {@code forceStackTrace} parameter to {@literal true}, activate assembly
* tracing for this particular {@link ParallelFlux} and give it a description that
* will be reflected in the assembly traceback in case of an error upstream of the
* checkpoint. Note that unlike {@link #checkpoint(String)}, this will incur
* the cost of an exception stack trace creation. The description could for
* example be a meaningful name for the assembled ParallelFlux or a wider correlation ID,
* since the stack trace will always provide enough information to locate where this
* ParallelFlux was assembled.
* <p>
* By setting {@code forceStackTrace} to {@literal false}, behaves like
* {@link #checkpoint(String)} and is subject to the same caveat in choosing the
* description.
* <p>
* It should be placed towards the end of the reactive chain, as errors
* triggered downstream of it cannot be observed and augmented with assembly marker.
* <p>
* The traceback is attached to the error as a {@link Throwable#getSuppressed() suppressed exception}.
* As such, if the error is a {@link Exceptions#isMultiple(Throwable) composite one}, the traceback
* would appear as a component of the composite. In any case, the traceback nature can be detected via
* {@link Exceptions#isTraceback(Throwable)}.
*
* @param description a description (must be unique enough if forceStackTrace is set
* to false).
* @param forceStackTrace false to make a light checkpoint without a stacktrace, true
* to use a stack trace.
* @return the assembly marked {@link ParallelFlux}.
*/
public final ParallelFlux<T> checkpoint(String description, boolean forceStackTrace) {
final AssemblySnapshot stacktrace;
if (!forceStackTrace) {
stacktrace = new AssemblyLightSnapshot(description);
}
else {
stacktrace = new AssemblySnapshot(description, Traces.callSiteSupplierFactory.get());
}
return new ParallelFluxOnAssembly<>(this, stacktrace);
}
Collect the elements in each rail into a collection supplied via a
collectionSupplier and collected into with a collector action, emitting the
collection at the end.
Params: - collectionSupplier – the supplier of the collection in each rail
- collector – the collector, taking the per-rail collection and the current
item
Type parameters: - <C> – the collection type
Returns: the new ParallelFlux
instance
/**
* Collect the elements in each rail into a collection supplied via a
* collectionSupplier and collected into with a collector action, emitting the
* collection at the end.
*
* @param <C> the collection type
* @param collectionSupplier the supplier of the collection in each rail
* @param collector the collector, taking the per-rail collection and the current
* item
*
* @return the new {@link ParallelFlux} instance
*/
public final <C> ParallelFlux<C> collect(Supplier<? extends C> collectionSupplier,
BiConsumer<? super C, ? super T> collector) {
return onAssembly(new ParallelCollect<>(this, collectionSupplier, collector));
}
Sorts the 'rails' according to the comparator and returns a full sorted list as a
Publisher.
This operator requires a finite source ParallelFlux.
Params: - comparator – the comparator to compare elements
Returns: the new Flux instance
/**
* Sorts the 'rails' according to the comparator and returns a full sorted list as a
* Publisher.
* <p>
* This operator requires a finite source ParallelFlux.
*
* @param comparator the comparator to compare elements
*
* @return the new Flux instance
*/
public final Mono<List<T>> collectSortedList(Comparator<? super T> comparator) {
return collectSortedList(comparator, 16);
}
Sorts the 'rails' according to the comparator and returns a full sorted list as a
Publisher.
This operator requires a finite source ParallelFlux.
Params: - comparator – the comparator to compare elements
- capacityHint – the expected number of total elements
Returns: the new Mono instance
/**
* Sorts the 'rails' according to the comparator and returns a full sorted list as a
* Publisher.
* <p>
* This operator requires a finite source ParallelFlux.
*
* @param comparator the comparator to compare elements
* @param capacityHint the expected number of total elements
*
* @return the new Mono instance
*/
public final Mono<List<T>> collectSortedList(Comparator<? super T> comparator,
int capacityHint) {
int ch = capacityHint / parallelism() + 1;
ParallelFlux<List<T>> railReduced =
reduce(() -> new ArrayList<>(ch), (a, b) -> {
a.add(b);
return a;
});
ParallelFlux<List<T>> railSorted = railReduced.map(list -> {
list.sort(comparator);
return list;
});
Mono<List<T>> merged = railSorted.reduce((a, b) -> sortedMerger(a, b, comparator));
return merged;
}
Generates and concatenates Publishers on each 'rail', signalling errors immediately
and generating 2 publishers upfront.
Params: - mapper – the function to map each rail's value into a Publisher source and the
inner Publishers (immediate, boundary, end)
Type parameters: - <R> – the result type
Returns: the new ParallelFlux
instance
/**
* Generates and concatenates Publishers on each 'rail', signalling errors immediately
* and generating 2 publishers upfront.
*
* @param <R> the result type
* @param mapper the function to map each rail's value into a Publisher source and the
* inner Publishers (immediate, boundary, end)
*
* @return the new {@link ParallelFlux} instance
*/
public final <R> ParallelFlux<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
return concatMap(mapper, 2, ErrorMode.IMMEDIATE);
}
Generates and concatenates Publishers on each 'rail', signalling errors immediately
and using the given prefetch amount for generating Publishers upfront.
Params: - mapper – the function to map each rail's value into a Publisher
- prefetch – the number of items to prefetch from each inner Publisher source
and the inner Publishers (immediate, boundary, end)
Type parameters: - <R> – the result type
Returns: the new ParallelFlux
instance
/**
* Generates and concatenates Publishers on each 'rail', signalling errors immediately
* and using the given prefetch amount for generating Publishers upfront.
*
* @param <R> the result type
* @param mapper the function to map each rail's value into a Publisher
* @param prefetch the number of items to prefetch from each inner Publisher source
* and the inner Publishers (immediate, boundary, end)
*
* @return the new {@link ParallelFlux} instance
*/
public final <R> ParallelFlux<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper,
int prefetch) {
return concatMap(mapper, prefetch, ErrorMode.IMMEDIATE);
}
Generates and concatenates Publishers on each 'rail', delaying errors
and generating 2 publishers upfront.
Params: - mapper – the function to map each rail's value into a Publisher
source and the inner Publishers (immediate, boundary, end)
Type parameters: - <R> – the result type
Returns: the new ParallelFlux
instance
/**
* Generates and concatenates Publishers on each 'rail', delaying errors
* and generating 2 publishers upfront.
*
* @param <R> the result type
* @param mapper the function to map each rail's value into a Publisher
* source and the inner Publishers (immediate, boundary, end)
*
* @return the new {@link ParallelFlux} instance
*/
public final <R> ParallelFlux<R> concatMapDelayError(Function<? super T, ? extends
Publisher<? extends R>> mapper) {
return concatMap(mapper, 2, ErrorMode.END);
}
Run the specified runnable when a 'rail' completes or signals an error.
Params: - afterTerminate – the callback
Returns: the new ParallelFlux
instance
/**
* Run the specified runnable when a 'rail' completes or signals an error.
*
* @param afterTerminate the callback
*
* @return the new {@link ParallelFlux} instance
*/
public final ParallelFlux<T> doAfterTerminate(Runnable afterTerminate) {
Objects.requireNonNull(afterTerminate, "afterTerminate");
return doOnSignal(this, null, null, null, null, afterTerminate, null, null, null);
}
Run the specified runnable when a 'rail' receives a cancellation.
Params: - onCancel – the callback
Returns: the new ParallelFlux
instance
/**
* Run the specified runnable when a 'rail' receives a cancellation.
*
* @param onCancel the callback
*
* @return the new {@link ParallelFlux} instance
*/
public final ParallelFlux<T> doOnCancel(Runnable onCancel) {
Objects.requireNonNull(onCancel, "onCancel");
return doOnSignal(this, null, null, null, null, null, null, null, onCancel);
}
Run the specified runnable when a 'rail' completes.
Params: - onComplete – the callback
Returns: the new ParallelFlux
instance
/**
* Run the specified runnable when a 'rail' completes.
*
* @param onComplete the callback
*
* @return the new {@link ParallelFlux} instance
*/
public final ParallelFlux<T> doOnComplete(Runnable onComplete) {
Objects.requireNonNull(onComplete, "onComplete");
return doOnSignal(this, null, null, null, onComplete, null, null, null, null);
}
Triggers side-effects when the ParallelFlux
emits an item, fails with an error or completes successfully. All these events are represented as a Signal
that is passed to the side-effect callback. Note that with ParallelFlux
and the lambda-based subscribes
or the array-based one
, onError and onComplete will be invoked as many times as there are rails, resulting in as many corresponding Signal
being seen in the callback. Use of subscribe(Subscriber)
, which calls sequential()
, might cancel some rails, resulting in less signals being observed. This is an advanced operator, typically used for monitoring of a ParallelFlux.
Params: - signalConsumer – the mandatory callback to call on
Subscriber.onNext(Object)
, Subscriber.onError(Throwable)
and Subscriber.onComplete()
See Also: Returns: an observed ParallelFlux
/**
* Triggers side-effects when the {@link ParallelFlux} emits an item, fails with an error
* or completes successfully. All these events are represented as a {@link Signal}
* that is passed to the side-effect callback. Note that with {@link ParallelFlux} and
* the {@link #subscribe(Consumer) lambda-based subscribes} or the
* {@link #subscribe(CoreSubscriber[]) array-based one}, onError and onComplete will be
* invoked as many times as there are rails, resulting in as many corresponding
* {@link Signal} being seen in the callback.
* <p>
* Use of {@link #subscribe(Subscriber)}, which calls {@link #sequential()}, might
* cancel some rails, resulting in less signals being observed. This is an advanced
* operator, typically used for monitoring of a ParallelFlux.
*
* @param signalConsumer the mandatory callback to call on
* {@link Subscriber#onNext(Object)}, {@link Subscriber#onError(Throwable)} and
* {@link Subscriber#onComplete()}
* @return an observed {@link ParallelFlux}
* @see #doOnNext(Consumer)
* @see #doOnError(Consumer)
* @see #doOnComplete(Runnable)
* @see #subscribe(CoreSubscriber[])
* @see Signal
*/
public final ParallelFlux<T> doOnEach(Consumer<? super Signal<T>> signalConsumer) {
Objects.requireNonNull(signalConsumer, "signalConsumer");
return onAssembly(new ParallelDoOnEach<>(
this,
(ctx, v) -> signalConsumer.accept(Signal.next(v, ctx)),
(ctx, e) -> signalConsumer.accept(Signal.error(e, ctx)),
ctx -> signalConsumer.accept(Signal.complete(ctx))
));
}
Call the specified consumer with the exception passing through any 'rail'.
Params: - onError – the callback
Returns: the new ParallelFlux
instance
/**
* Call the specified consumer with the exception passing through any 'rail'.
*
* @param onError the callback
*
* @return the new {@link ParallelFlux} instance
*/
public final ParallelFlux<T> doOnError(Consumer<? super Throwable> onError) {
Objects.requireNonNull(onError, "onError");
return doOnSignal(this, null, null, onError, null, null, null, null, null);
}
Call the specified callback when a 'rail' receives a Subscription from its
upstream.
This method is not intended for capturing the subscription and calling its methods, but for side effects like monitoring. For instance, the correct way to cancel a subscription is to call Disposable.dispose()
on the Disposable returned by subscribe()
.
Params: - onSubscribe – the callback
Returns: the new ParallelFlux
instance
/**
* Call the specified callback when a 'rail' receives a Subscription from its
* upstream.
* <p>
* This method is <strong>not</strong> intended for capturing the subscription and calling its methods,
* but for side effects like monitoring. For instance, the correct way to cancel a subscription is
* to call {@link Disposable#dispose()} on the Disposable returned by {@link ParallelFlux#subscribe()}.
*
* @param onSubscribe the callback
*
* @return the new {@link ParallelFlux} instance
*/
public final ParallelFlux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe) {
Objects.requireNonNull(onSubscribe, "onSubscribe");
return doOnSignal(this, null, null, null, null, null, onSubscribe, null, null);
}
Call the specified consumer with the current element passing through any 'rail'.
Params: - onNext – the callback
Returns: the new ParallelFlux
instance
/**
* Call the specified consumer with the current element passing through any 'rail'.
*
* @param onNext the callback
*
* @return the new {@link ParallelFlux} instance
*/
public final ParallelFlux<T> doOnNext(Consumer<? super T> onNext) {
Objects.requireNonNull(onNext, "onNext");
return doOnSignal(this, onNext, null, null, null, null, null, null, null);
}
Call the specified consumer with the request amount if any rail receives a
request.
Params: - onRequest – the callback
Returns: the new ParallelFlux
instance
/**
* Call the specified consumer with the request amount if any rail receives a
* request.
*
* @param onRequest the callback
*
* @return the new {@link ParallelFlux} instance
*/
public final ParallelFlux<T> doOnRequest(LongConsumer onRequest) {
Objects.requireNonNull(onRequest, "onRequest");
return doOnSignal(this, null, null, null, null, null, null, onRequest, null);
}
Triggered when the ParallelFlux
terminates, either by completing successfully or with an error. Params: - onTerminate – the callback to call on
Subscriber.onComplete
or Subscriber.onError
Returns: an observed ParallelFlux
/**
* Triggered when the {@link ParallelFlux} terminates, either by completing successfully or with an error.
* @param onTerminate the callback to call on {@link Subscriber#onComplete} or {@link Subscriber#onError}
*
* @return an observed {@link ParallelFlux}
*/
public final ParallelFlux<T> doOnTerminate(Runnable onTerminate) {
Objects.requireNonNull(onTerminate, "onTerminate");
return doOnSignal(this,
null,
null,
e -> onTerminate.run(),
onTerminate,
null,
null,
null,
null);
}
Filters the source values on each 'rail'.
Note that the same predicate may be called from multiple threads concurrently.
Params: - predicate – the function returning true to keep a value or false to drop a
value
Returns: the new ParallelFlux
instance
/**
* Filters the source values on each 'rail'.
* <p>
* Note that the same predicate may be called from multiple threads concurrently.
*
* @param predicate the function returning true to keep a value or false to drop a
* value
*
* @return the new {@link ParallelFlux} instance
*/
public final ParallelFlux<T> filter(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate, "predicate");
return onAssembly(new ParallelFilter<>(this, predicate));
}
Generates and flattens Publishers on each 'rail'.
Errors are not delayed and uses unbounded concurrency along with default inner
prefetch.
Params: - mapper – the function to map each rail's value into a Publisher
Type parameters: - <R> – the result type
Returns: the new ParallelFlux
instance
/**
* Generates and flattens Publishers on each 'rail'.
* <p>
* Errors are not delayed and uses unbounded concurrency along with default inner
* prefetch.
*
* @param <R> the result type
* @param mapper the function to map each rail's value into a Publisher
*
* @return the new {@link ParallelFlux} instance
*/
public final <R> ParallelFlux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {
return flatMap(mapper,
false,
Integer.MAX_VALUE, Queues.SMALL_BUFFER_SIZE);
}
Generates and flattens Publishers on each 'rail', optionally delaying errors.
It uses unbounded concurrency along with default inner prefetch.
Params: - mapper – the function to map each rail's value into a Publisher
- delayError – should the errors from the main and the inner sources delayed
till everybody terminates?
Type parameters: - <R> – the result type
Returns: the new ParallelFlux
instance
/**
* Generates and flattens Publishers on each 'rail', optionally delaying errors.
* <p>
* It uses unbounded concurrency along with default inner prefetch.
*
* @param <R> the result type
* @param mapper the function to map each rail's value into a Publisher
* @param delayError should the errors from the main and the inner sources delayed
* till everybody terminates?
*
* @return the new {@link ParallelFlux} instance
*/
public final <R> ParallelFlux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper,
boolean delayError) {
return flatMap(mapper,
delayError,
Integer.MAX_VALUE, Queues.SMALL_BUFFER_SIZE);
}
Generates and flattens Publishers on each 'rail', optionally delaying errors and
having a total number of simultaneous subscriptions to the inner Publishers.
It uses a default inner prefetch.
Params: - mapper – the function to map each rail's value into a Publisher
- delayError – should the errors from the main and the inner sources delayed
till everybody terminates?
- maxConcurrency – the maximum number of simultaneous subscriptions to the
generated inner Publishers
Type parameters: - <R> – the result type
Returns: the new ParallelFlux
instance
/**
* Generates and flattens Publishers on each 'rail', optionally delaying errors and
* having a total number of simultaneous subscriptions to the inner Publishers.
* <p>
* It uses a default inner prefetch.
*
* @param <R> the result type
* @param mapper the function to map each rail's value into a Publisher
* @param delayError should the errors from the main and the inner sources delayed
* till everybody terminates?
* @param maxConcurrency the maximum number of simultaneous subscriptions to the
* generated inner Publishers
*
* @return the new {@link ParallelFlux} instance
*/
public final <R> ParallelFlux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper,
boolean delayError,
int maxConcurrency) {
return flatMap(mapper,
delayError,
maxConcurrency, Queues.SMALL_BUFFER_SIZE);
}
Generates and flattens Publishers on each 'rail', optionally delaying errors,
having a total number of simultaneous subscriptions to the inner Publishers and
using the given prefetch amount for the inner Publishers.
Params: - mapper – the function to map each rail's value into a Publisher
- delayError – should the errors from the main and the inner sources delayed
till everybody terminates?
- maxConcurrency – the maximum number of simultaneous subscriptions to the
generated inner Publishers
- prefetch – the number of items to prefetch from each inner Publisher
Type parameters: - <R> – the result type
Returns: the new ParallelFlux
instance
/**
* Generates and flattens Publishers on each 'rail', optionally delaying errors,
* having a total number of simultaneous subscriptions to the inner Publishers and
* using the given prefetch amount for the inner Publishers.
*
* @param <R> the result type
* @param mapper the function to map each rail's value into a Publisher
* @param delayError should the errors from the main and the inner sources delayed
* till everybody terminates?
* @param maxConcurrency the maximum number of simultaneous subscriptions to the
* generated inner Publishers
* @param prefetch the number of items to prefetch from each inner Publisher
*
* @return the new {@link ParallelFlux} instance
*/
public final <R> ParallelFlux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper,
boolean delayError,
int maxConcurrency,
int prefetch) {
return onAssembly(new ParallelFlatMap<>(this,
mapper,
delayError,
maxConcurrency,
Queues.get(maxConcurrency),
prefetch, Queues.get(prefetch)));
}
Exposes the 'rails' as individual GroupedFlux instances, keyed by the rail
index (zero based).
Each group can be consumed only once; requests and cancellation compose through.
Note that cancelling only one rail may result in undefined behavior.
Returns: the new Flux instance
/**
* Exposes the 'rails' as individual GroupedFlux instances, keyed by the rail
* index (zero based).
* <p>
* Each group can be consumed only once; requests and cancellation compose through.
* Note that cancelling only one rail may result in undefined behavior.
*
* @return the new Flux instance
*/
public final Flux<GroupedFlux<Integer, T>> groups() {
return Flux.onAssembly(new ParallelGroup<>(this));
}
Hides the identities of this ParallelFlux
and its Subscription
as well. Returns: a new ParallelFlux
defeating any Publisher
/ Subscription
feature-detection
/**
* Hides the identities of this {@link ParallelFlux} and its {@link Subscription}
* as well.
*
* @return a new {@link ParallelFlux} defeating any {@link Publisher} / {@link Subscription} feature-detection
*/
public final ParallelFlux<T> hide() {
return new ParallelFluxHide<>(this);
}
Observe all Reactive Streams signals and use Logger
support to handle trace implementation. Default will use Level.INFO
and java.util.logging. If SLF4J is available, it will be used instead.
The default log category will be "reactor.*", a generated operator suffix will
complete, e.g. "reactor.Parallel.Map".
Returns: a new unaltered ParallelFlux
/**
* Observe all Reactive Streams signals and use {@link Logger} support to handle trace
* implementation. Default will use {@link Level#INFO} and java.util.logging. If SLF4J
* is available, it will be used instead.
* <p>
* <img class="marble" src="doc-files/marbles/logForFlux.svg" alt="">
* <p>
* The default log category will be "reactor.*", a generated operator suffix will
* complete, e.g. "reactor.Parallel.Map".
*
* @return a new unaltered {@link ParallelFlux}
*/
public final ParallelFlux<T> log() {
return log(null, Level.INFO);
}
Observe all Reactive Streams signals and use Logger
support to handle trace implementation. Default will use Level.INFO
and java.util.logging. If SLF4J is available, it will be used instead.
Params: - category – to be mapped into logger configuration (e.g. org.springframework
.reactor). If category ends with "." like "reactor.", a generated operator suffix
will complete, e.g. "reactor.Parallel.Map".
Returns: a new unaltered ParallelFlux
/**
* Observe all Reactive Streams signals and use {@link Logger} support to handle trace
* implementation. Default will use {@link Level#INFO} and java.util.logging. If SLF4J
* is available, it will be used instead.
* <p>
* <img class="marble" src="doc-files/marbles/logForFlux.svg" alt="">
* <p>
*
* @param category to be mapped into logger configuration (e.g. org.springframework
* .reactor). If category ends with "." like "reactor.", a generated operator suffix
* will complete, e.g. "reactor.Parallel.Map".
*
* @return a new unaltered {@link ParallelFlux}
*/
public final ParallelFlux<T> log(@Nullable String category) {
return log(category, Level.INFO);
}
Observe Reactive Streams signals matching the passed filter options
and use Logger
support to handle trace implementation. Default will use the passed Level
and java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine grained filtering of the traced signal, for instance to only
capture onNext and onError:
ParallelFlux.log("category", Level.INFO, SignalType.ON_NEXT,
SignalType.ON_ERROR)
Params: - category – to be mapped into logger configuration (e.g. org.springframework
.reactor). If category ends with "." like "reactor.", a generated operator
suffix will complete, e.g. "reactor.Parallel.Map".
- level – the
Level
to enforce for this tracing ParallelFlux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account) - options – a vararg
SignalType
option to filter log messages
Returns: a new unaltered ParallelFlux
/**
* Observe Reactive Streams signals matching the passed filter {@code options} and use
* {@link Logger} support to handle trace implementation. Default will use the passed
* {@link Level} and java.util.logging. If SLF4J is available, it will be used
* instead.
* <p>
* Options allow fine grained filtering of the traced signal, for instance to only
* capture onNext and onError:
* <pre>
* ParallelFlux.log("category", Level.INFO, SignalType.ON_NEXT,
* SignalType.ON_ERROR)
*
* <p>
* <img class="marble" src="doc-files/marbles/logForFlux.svg" alt="">
*
* @param category to be mapped into logger configuration (e.g. org.springframework
* .reactor). If category ends with "." like "reactor.", a generated operator
* suffix will complete, e.g. "reactor.Parallel.Map".
* @param level the {@link Level} to enforce for this tracing ParallelFlux (only
* FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
* @param options a vararg {@link SignalType} option to filter log messages
*
* @return a new unaltered {@link ParallelFlux}
*/
public final ParallelFlux<T> log(@Nullable String category,
Level level,
SignalType... options) {
return log(category, level, false, options);
}
Observe Reactive Streams signals matching the passed filter options
and use Logger
support to handle trace implementation. Default will use the passed Level
and java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine grained filtering of the traced signal, for instance to only
capture onNext and onError:
ParallelFlux.log("category", Level.INFO, SignalType.ON_NEXT,
SignalType.ON_ERROR)
Params: - category – to be mapped into logger configuration (e.g. org.springframework
.reactor). If category ends with "." like "reactor.", a generated operator
suffix will complete, e.g. "reactor.ParallelFlux.Map".
- level – the
Level
to enforce for this tracing ParallelFlux (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account) - showOperatorLine – capture the current stack to display operator
class/line number.
- options – a vararg
SignalType
option to filter log messages
Returns: a new unaltered ParallelFlux
/**
* Observe Reactive Streams signals matching the passed filter {@code options} and use
* {@link Logger} support to handle trace implementation. Default will use the passed
* {@link Level} and java.util.logging. If SLF4J is available, it will be used
* instead.
* <p>
* Options allow fine grained filtering of the traced signal, for instance to only
* capture onNext and onError:
* <pre>
* ParallelFlux.log("category", Level.INFO, SignalType.ON_NEXT,
* SignalType.ON_ERROR)
*
* <p>
* <img class="marble" src="doc-files/marbles/logForFlux.svg" alt="">
*
* @param category to be mapped into logger configuration (e.g. org.springframework
* .reactor). If category ends with "." like "reactor.", a generated operator
* suffix will complete, e.g. "reactor.ParallelFlux.Map".
* @param level the {@link Level} to enforce for this tracing ParallelFlux (only
* FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
* @param showOperatorLine capture the current stack to display operator
* class/line number.
* @param options a vararg {@link SignalType} option to filter log messages
*
* @return a new unaltered {@link ParallelFlux}
*/
public final ParallelFlux<T> log(@Nullable String category,
Level level,
boolean showOperatorLine,
SignalType... options) {
return onAssembly(new ParallelLog<>(this, new SignalLogger<>(this, category, level, showOperatorLine, options)));
}
Maps the source values on each 'rail' to another value.
Note that the same mapper function may be called from multiple threads
concurrently.
Params: - mapper – the mapper function turning Ts into Us.
Type parameters: - <U> – the output value type
Returns: the new ParallelFlux
instance
/**
* Maps the source values on each 'rail' to another value.
* <p>
* Note that the same mapper function may be called from multiple threads
* concurrently.
*
* @param <U> the output value type
* @param mapper the mapper function turning Ts into Us.
*
* @return the new {@link ParallelFlux} instance
*/
public final <U> ParallelFlux<U> map(Function<? super T, ? extends U> mapper) {
Objects.requireNonNull(mapper, "mapper");
return onAssembly(new ParallelMap<>(this, mapper));
}
Give a name to this sequence, which can be retrieved using Scannable.name()
as long as this is the first reachable Scannable.parents()
. Params: - name – a name for the sequence
Returns: the same sequence, but bearing a name
/**
* Give a name to this sequence, which can be retrieved using {@link Scannable#name()}
* as long as this is the first reachable {@link Scannable#parents()}.
*
* @param name a name for the sequence
* @return the same sequence, but bearing a name
*/
public final ParallelFlux<T> name(String name) {
return ParallelFluxName.createOrAppend(this, name);
}
Merges the values from each 'rail', but choose which one to merge by way of a provided Comparator
, picking the smallest of all rails. The result is exposed back as a Flux
. This version uses a default prefetch of Queues.SMALL_BUFFER_SIZE
.
Params: - comparator – the comparator to choose the smallest value available from all rails
See Also: Returns: the new Flux instance
/**
* Merges the values from each 'rail', but choose which one to merge by way of a
* provided {@link Comparator}, picking the smallest of all rails. The result is
* exposed back as a {@link Flux}.
* <p>
* This version uses a default prefetch of {@link Queues#SMALL_BUFFER_SIZE}.
*
* @param comparator the comparator to choose the smallest value available from all rails
* @return the new Flux instance
*
* @see ParallelFlux#ordered(Comparator, int)
*/
public final Flux<T> ordered(Comparator<? super T> comparator) {
return ordered(comparator, Queues.SMALL_BUFFER_SIZE);
}
Merges the values from each 'rail', but choose which one to merge by way of a provided Comparator
, picking the smallest of all rails. The result is exposed back as a Flux
. Params: - comparator – the comparator to choose the smallest value available from all rails
- prefetch – the prefetch to use
See Also: Returns: the new Flux instance
/**
* Merges the values from each 'rail', but choose which one to merge by way of a
* provided {@link Comparator}, picking the smallest of all rails. The result is
* exposed back as a {@link Flux}.
*
* @param comparator the comparator to choose the smallest value available from all rails
* @param prefetch the prefetch to use
* @return the new Flux instance
*
* @see ParallelFlux#ordered(Comparator)
*/
public final Flux<T> ordered(Comparator<? super T> comparator, int prefetch) {
return new ParallelMergeOrdered<>(this, prefetch, Queues.get(prefetch), comparator);
}
Returns the number of expected parallel Subscribers.
Returns: the number of expected parallel Subscribers
/**
* Returns the number of expected parallel Subscribers.
*
* @return the number of expected parallel Subscribers
*/
public abstract int parallelism();
Reduces all values within a 'rail' and across 'rails' with a reducer function into
a single sequential value.
Note that the same reducer function may be called from multiple threads
concurrently.
Params: - reducer – the function to reduce two values into one.
Returns: the new Mono instance emitting the reduced value or empty if the ParallelFlux
was empty
/**
* Reduces all values within a 'rail' and across 'rails' with a reducer function into
* a single sequential value.
* <p>
* Note that the same reducer function may be called from multiple threads
* concurrently.
*
* @param reducer the function to reduce two values into one.
*
* @return the new Mono instance emitting the reduced value or empty if the
* {@link ParallelFlux} was empty
*/
public final Mono<T> reduce(BiFunction<T, T, T> reducer) {
Objects.requireNonNull(reducer, "reducer");
return Mono.onAssembly(new ParallelMergeReduce<>(this, reducer));
}
Reduces all values within a 'rail' to a single value (with a possibly different
type) via a reducer function that is initialized on each rail from an
initialSupplier value.
Note that the same mapper function may be called from multiple threads
concurrently.
Params: - initialSupplier – the supplier for the initial value
- reducer – the function to reduce a previous output of reduce (or the initial
value supplied) with a current source value.
Type parameters: - <R> – the reduced output type
Returns: the new ParallelFlux
instance
/**
* Reduces all values within a 'rail' to a single value (with a possibly different
* type) via a reducer function that is initialized on each rail from an
* initialSupplier value.
* <p>
* Note that the same mapper function may be called from multiple threads
* concurrently.
*
* @param <R> the reduced output type
* @param initialSupplier the supplier for the initial value
* @param reducer the function to reduce a previous output of reduce (or the initial
* value supplied) with a current source value.
*
* @return the new {@link ParallelFlux} instance
*/
public final <R> ParallelFlux<R> reduce(Supplier<R> initialSupplier,
BiFunction<R, ? super T, R> reducer) {
Objects.requireNonNull(initialSupplier, "initialSupplier");
Objects.requireNonNull(reducer, "reducer");
return onAssembly(new ParallelReduceSeed<>(this, initialSupplier, reducer));
}
Specifies where each 'rail' will observe its incoming values with possible
work-stealing and default prefetch amount.
This operator uses the default prefetch size returned by
Queues.SMALL_BUFFER_SIZE
.
The operator will call Scheduler.createWorker()
as many times as this ParallelFlux's parallelism level is.
No assumptions are made about the Scheduler's parallelism level, if the Scheduler's
parallelism level is lower than the ParallelFlux's, some rails may end up on
the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own
built-in trampolining logic.
Params: - scheduler – the scheduler to use
Returns: the new ParallelFlux
instance
/**
* Specifies where each 'rail' will observe its incoming values with possible
* work-stealing and default prefetch amount.
* <p>
* This operator uses the default prefetch size returned by {@code
* Queues.SMALL_BUFFER_SIZE}.
* <p>
* The operator will call {@code Scheduler.createWorker()} as many times as this
* ParallelFlux's parallelism level is.
* <p>
* No assumptions are made about the Scheduler's parallelism level, if the Scheduler's
* parallelism level is lower than the ParallelFlux's, some rails may end up on
* the same thread/worker.
* <p>
* This operator doesn't require the Scheduler to be trampolining as it does its own
* built-in trampolining logic.
*
* @param scheduler the scheduler to use
*
* @return the new {@link ParallelFlux} instance
*/
public final ParallelFlux<T> runOn(Scheduler scheduler) {
return runOn(scheduler, Queues.SMALL_BUFFER_SIZE);
}
Specifies where each 'rail' will observe its incoming values with possible
work-stealing and a given prefetch amount.
This operator uses the default prefetch size returned by
Queues.SMALL_BUFFER_SIZE
.
The operator will call Scheduler.createWorker()
as many times as this ParallelFlux's parallelism level is.
No assumptions are made about the Scheduler's parallelism level, if the Scheduler's
parallelism level is lower than the ParallelFlux's, some rails may end up on
the same thread/worker.
This operator doesn't require the Scheduler to be trampolining as it does its own
built-in trampolining logic.
Params: - scheduler – the scheduler to use that rail's worker has run out of work.
- prefetch – the number of values to request on each 'rail' from the source
Returns: the new ParallelFlux
instance
/**
* Specifies where each 'rail' will observe its incoming values with possible
* work-stealing and a given prefetch amount.
* <p>
* This operator uses the default prefetch size returned by {@code
* Queues.SMALL_BUFFER_SIZE}.
* <p>
* The operator will call {@code Scheduler.createWorker()} as many times as this
* ParallelFlux's parallelism level is.
* <p>
* No assumptions are made about the Scheduler's parallelism level, if the Scheduler's
* parallelism level is lower than the ParallelFlux's, some rails may end up on
* the same thread/worker.
* <p>
* This operator doesn't require the Scheduler to be trampolining as it does its own
* built-in trampolining logic.
*
* @param scheduler the scheduler to use that rail's worker has run out of work.
* @param prefetch the number of values to request on each 'rail' from the source
*
* @return the new {@link ParallelFlux} instance
*/
public final ParallelFlux<T> runOn(Scheduler scheduler, int prefetch) {
Objects.requireNonNull(scheduler, "scheduler");
return onAssembly(new ParallelRunOn<>(this,
scheduler,
prefetch,
Queues.get(prefetch)));
}
Merges the values from each 'rail' in a round-robin or same-order fashion and
exposes it as a regular Publisher sequence, running with a default prefetch value
for the rails.
This operator uses the default prefetch size returned by
Queues.SMALL_BUFFER_SIZE
.
See Also: Returns: the new Flux instance
/**
* Merges the values from each 'rail' in a round-robin or same-order fashion and
* exposes it as a regular Publisher sequence, running with a default prefetch value
* for the rails.
* <p>
* This operator uses the default prefetch size returned by {@code
* Queues.SMALL_BUFFER_SIZE}.
*
* @return the new Flux instance
*
* @see ParallelFlux#sequential(int)
*/
public final Flux<T> sequential() {
return sequential(Queues.SMALL_BUFFER_SIZE);
}
Merges the values from each 'rail' in a round-robin or same-order fashion and
exposes it as a regular Publisher sequence, running with a give prefetch value for
the rails.
Params: - prefetch – the prefetch amount to use for each rail
Returns: the new Flux instance
/**
* Merges the values from each 'rail' in a round-robin or same-order fashion and
* exposes it as a regular Publisher sequence, running with a give prefetch value for
* the rails.
*
* @param prefetch the prefetch amount to use for each rail
*
* @return the new Flux instance
*/
public final Flux<T> sequential(int prefetch) {
return Flux.onAssembly(new ParallelMergeSequential<>(this,
prefetch,
Queues.get(prefetch)));
}
Sorts the 'rails' of this ParallelFlux
and returns a Publisher that sequentially picks the smallest next value from the rails.
This operator requires a finite source ParallelFlux.
Params: - comparator – the comparator to use
Returns: the new Flux instance
/**
* Sorts the 'rails' of this {@link ParallelFlux} and returns a Publisher that
* sequentially picks the smallest next value from the rails.
* <p>
* This operator requires a finite source ParallelFlux.
*
* @param comparator the comparator to use
*
* @return the new Flux instance
*/
public final Flux<T> sorted(Comparator<? super T> comparator) {
return sorted(comparator, 16);
}
Sorts the 'rails' of this ParallelFlux
and returns a Publisher that sequentially picks the smallest next value from the rails.
This operator requires a finite source ParallelFlux.
Params: - comparator – the comparator to use
- capacityHint – the expected number of total elements
Returns: the new Flux instance
/**
* Sorts the 'rails' of this {@link ParallelFlux} and returns a Publisher that
* sequentially picks the smallest next value from the rails.
* <p>
* This operator requires a finite source ParallelFlux.
*
* @param comparator the comparator to use
* @param capacityHint the expected number of total elements
*
* @return the new Flux instance
*/
public final Flux<T> sorted(Comparator<? super T> comparator, int capacityHint) {
int ch = capacityHint / parallelism() + 1;
ParallelFlux<List<T>> railReduced = reduce(() -> new ArrayList<>(ch), (a, b) -> {
a.add(b);
return a;
});
ParallelFlux<List<T>> railSorted = railReduced.map(list -> {
list.sort(comparator);
return list;
});
return Flux.onAssembly(new ParallelMergeSort<>(railSorted, comparator));
}
Subscribes an array of Subscribers to this ParallelFlux
and triggers the execution chain for all 'rails'. Params: - subscribers – the subscribers array to run in parallel, the number of items
must be equal to the parallelism level of this ParallelFlux
/**
* Subscribes an array of Subscribers to this {@link ParallelFlux} and triggers the
* execution chain for all 'rails'.
*
* @param subscribers the subscribers array to run in parallel, the number of items
* must be equal to the parallelism level of this ParallelFlux
*/
public abstract void subscribe(CoreSubscriber<? super T>[] subscribers);
Subscribes to this ParallelFlux
and triggers the execution chain for all 'rails'. /**
* Subscribes to this {@link ParallelFlux} and triggers the execution chain for all
* 'rails'.
*/
public final Disposable subscribe(){
return subscribe(null, null, null);
}
Subscribes to this ParallelFlux
by providing an onNext callback and triggers the execution chain for all 'rails'. Params: - onNext – consumer of onNext signals
/**
* Subscribes to this {@link ParallelFlux} by providing an onNext callback and
* triggers the execution chain for all 'rails'.
*
* @param onNext consumer of onNext signals
*/
public final Disposable subscribe(Consumer<? super T> onNext){
return subscribe(onNext, null, null);
}
Subscribes to this ParallelFlux
by providing an onNext and onError callback and triggers the execution chain for all 'rails'. Params: - onNext – consumer of onNext signals
- onError – consumer of error signal
/**
* Subscribes to this {@link ParallelFlux} by providing an onNext and onError callback
* and triggers the execution chain for all 'rails'.
*
* @param onNext consumer of onNext signals
* @param onError consumer of error signal
*/
public final Disposable subscribe(@Nullable Consumer<? super T> onNext, Consumer<? super Throwable>
onError){
return subscribe(onNext, onError, null);
}
Subscribes to this ParallelFlux
by providing an onNext, onError and onComplete callback and triggers the execution chain for all 'rails'. Params: - onNext – consumer of onNext signals
- onError – consumer of error signal
- onComplete – callback on completion signal
/**
* Subscribes to this {@link ParallelFlux} by providing an onNext, onError and
* onComplete callback and triggers the execution chain for all 'rails'.
*
* @param onNext consumer of onNext signals
* @param onError consumer of error signal
* @param onComplete callback on completion signal
*/
public final Disposable subscribe(
@Nullable Consumer<? super T> onNext,
@Nullable Consumer<? super Throwable> onError,
@Nullable Runnable onComplete) {
return this.subscribe(onNext, onError, onComplete, null, (Context) null);
}
@Override
@SuppressWarnings("unchecked")
public final void subscribe(CoreSubscriber<? super T> s) {
FluxHide.SuppressFuseableSubscriber<T> subscriber =
new FluxHide.SuppressFuseableSubscriber<>(Operators.toCoreSubscriber(s));
sequential().subscribe(Operators.toCoreSubscriber(subscriber));
}
Subscribes to this ParallelFlux
by providing an onNext, onError, onComplete and onSubscribe callback and triggers the execution chain for all 'rails'. Params: - onNext – consumer of onNext signals
- onError – consumer of error signal
- onComplete – callback on completion signal
- onSubscribe – consumer of the subscription signal
/**
* Subscribes to this {@link ParallelFlux} by providing an onNext, onError,
* onComplete and onSubscribe callback and triggers the execution chain for all
* 'rails'.
*
* @param onNext consumer of onNext signals
* @param onError consumer of error signal
* @param onComplete callback on completion signal
* @param onSubscribe consumer of the subscription signal
*/ //TODO maybe deprecate in 3.4, provided there is at least an alternative for tests
public final Disposable subscribe(
@Nullable Consumer<? super T> onNext,
@Nullable Consumer<? super Throwable> onError,
@Nullable Runnable onComplete,
@Nullable Consumer<? super Subscription> onSubscribe) {
return this.subscribe(onNext, onError, onComplete, onSubscribe, null);
}
Subscribes to this ParallelFlux
by providing an onNext, onError and onComplete callback as well as an initial Context
, then trigger the execution chain for all 'rails'. Params: - onNext – consumer of onNext signals
- onError – consumer of error signal
- onComplete – callback on completion signal
- initialContext –
Context
for the rails
/**
* Subscribes to this {@link ParallelFlux} by providing an onNext, onError and
* onComplete callback as well as an initial {@link Context}, then trigger the execution chain for all
* 'rails'.
*
* @param onNext consumer of onNext signals
* @param onError consumer of error signal
* @param onComplete callback on completion signal
* @param initialContext {@link Context} for the rails
*/
public final Disposable subscribe(
@Nullable Consumer<? super T> onNext,
@Nullable Consumer<? super Throwable> onError,
@Nullable Runnable onComplete,
@Nullable Context initialContext) {
return this.subscribe(onNext, onError, onComplete, null, initialContext);
}
final Disposable subscribe(
@Nullable Consumer<? super T> onNext,
@Nullable Consumer<? super Throwable> onError,
@Nullable Runnable onComplete,
@Nullable Consumer<? super Subscription> onSubscribe,
@Nullable Context initialContext) {
CorePublisher<T> publisher = Operators.onLastAssembly(this);
if (publisher instanceof ParallelFlux) {
@SuppressWarnings("unchecked")
LambdaSubscriber<? super T>[] subscribers = new LambdaSubscriber[parallelism()];
int i = 0;
while(i < subscribers.length){
subscribers[i++] =
new LambdaSubscriber<>(onNext, onError, onComplete, onSubscribe, initialContext);
}
((ParallelFlux<T>) publisher).subscribe(subscribers);
return Disposables.composite(subscribers);
}
else {
LambdaSubscriber<? super T> subscriber =
new LambdaSubscriber<>(onNext, onError, onComplete, onSubscribe, initialContext);
publisher.subscribe(Operators.toCoreSubscriber(new FluxHide.SuppressFuseableSubscriber<>(subscriber)));
return subscriber;
}
}
Merge the rails into a sequential()
Flux and subscribe
to said Flux. Params: - s – the subscriber to use on
sequential()
Flux
/**
* Merge the rails into a {@link #sequential()} Flux and
* {@link Flux#subscribe(Subscriber) subscribe} to said Flux.
*
* @param s the subscriber to use on {@link #sequential()} Flux
*/
@Override
@SuppressWarnings("unchecked")
public final void subscribe(Subscriber<? super T> s) {
FluxHide.SuppressFuseableSubscriber<T> subscriber =
new FluxHide.SuppressFuseableSubscriber<>(Operators.toCoreSubscriber(s));
Operators.onLastAssembly(sequential()).subscribe(Operators.toCoreSubscriber(subscriber));
}
Tag this ParallelFlux with a key/value pair. These can be retrieved as a Set
of all tags throughout the publisher chain by using Scannable.tags()
(as traversed by Scannable.parents()
). Params: - key – a tag key
- value – a tag value
Returns: the same sequence, but bearing tags
/**
* Tag this ParallelFlux with a key/value pair. These can be retrieved as a
* {@link Set} of
* all tags throughout the publisher chain by using {@link Scannable#tags()} (as
* traversed
* by {@link Scannable#parents()}).
*
* @param key a tag key
* @param value a tag value
* @return the same sequence, but bearing tags
*/
public final ParallelFlux<T> tag(String key, String value) {
return ParallelFluxName.createOrAppend(this, key, value);
}
Emit an onComplete or onError signal once all values across 'rails' have been observed.
Returns: the new Mono instance emitting the reduced value or empty if the ParallelFlux
was empty
/**
* Emit an onComplete or onError signal once all values across 'rails' have been observed.
*
* @return the new Mono instance emitting the reduced value or empty if the
* {@link ParallelFlux} was empty
*/
public final Mono<Void> then() {
return Mono.onAssembly(new ParallelThen(this));
}
Allows composing operators, in assembly time, on top of this ParallelFlux
and returns another ParallelFlux
with composed features. Params: - composer – the composer function from
ParallelFlux
(this) to another ParallelFlux
Type parameters: - <U> – the output value type
Returns: the ParallelFlux
returned by the function
/**
* Allows composing operators, in assembly time, on top of this {@link ParallelFlux}
* and returns another {@link ParallelFlux} with composed features.
*
* @param <U> the output value type
* @param composer the composer function from {@link ParallelFlux} (this) to another
* ParallelFlux
*
* @return the {@link ParallelFlux} returned by the function
*/
public final <U> ParallelFlux<U> transform(Function<? super ParallelFlux<T>, ParallelFlux<U>> composer) {
return onAssembly(as(composer));
}
Allows composing operators off the groups (or 'rails'), as individual GroupedFlux
instances keyed by the zero based rail's index. The transformed groups are parallelized
back once the transformation has been applied. Since groups are generated anew per each subscription, this is all done in a "lazy" fashion where each subscription trigger distinct applications of the Function
. Note that like in groups()
, requests and cancellation compose through, and cancelling only one rail may result in undefined behavior.
Params: - composer – the composition function to apply on each
rail
Type parameters: - <U> – the type of the resulting parallelized flux
Returns: a ParallelFlux
of the composed groups
/**
* Allows composing operators off the groups (or 'rails'), as individual {@link GroupedFlux}
* instances keyed by the zero based rail's index. The transformed groups are
* {@link Flux#parallel parallelized} back once the transformation has been applied.
* Since groups are generated anew per each subscription, this is all done in a "lazy"
* fashion where each subscription trigger distinct applications of the {@link Function}.
* <p>
* Note that like in {@link #groups()}, requests and cancellation compose through, and
* cancelling only one rail may result in undefined behavior.
*
* @param composer the composition function to apply on each {@link GroupedFlux rail}
* @param <U> the type of the resulting parallelized flux
* @return a {@link ParallelFlux} of the composed groups
*/
public final <U> ParallelFlux<U> transformGroups(Function<? super GroupedFlux<Integer, T>,
? extends Publisher<? extends U>> composer) {
if (getPrefetch() > -1) {
return from(groups().flatMap(composer::apply),
parallelism(), getPrefetch(),
Queues.small());
}
else {
return from(groups().flatMap(composer::apply), parallelism());
}
}
@Override
public String toString() {
return getClass().getSimpleName();
}
Validates the number of subscribers and returns true if their number matches the
parallelism level of this ParallelFlux.
Params: - subscribers – the array of Subscribers
Returns: true if the number of subscribers equals to the parallelism level
/**
* Validates the number of subscribers and returns true if their number matches the
* parallelism level of this ParallelFlux.
*
* @param subscribers the array of Subscribers
*
* @return true if the number of subscribers equals to the parallelism level
*/
protected final boolean validate(Subscriber<?>[] subscribers) {
int p = parallelism();
if (subscribers.length != p) {
IllegalArgumentException iae = new IllegalArgumentException("parallelism = " +
"" + p + ", subscribers = " + subscribers.length);
for (Subscriber<?> s : subscribers) {
Operators.error(s, iae);
}
return false;
}
return true;
}
Generates and concatenates Publishers on each 'rail', optionally delaying errors
and using the given prefetch amount for generating Publishers upfront.
Params: - mapper – the function to map each rail's value into a Publisher
- prefetch – the number of items to prefetch from each inner Publisher
- errorMode – the error handling, i.e., when to report errors from the main
source and the inner Publishers (immediate, boundary, end)
Type parameters: - <R> – the result type
Returns: the new ParallelFlux
instance
/**
* Generates and concatenates Publishers on each 'rail', optionally delaying errors
* and using the given prefetch amount for generating Publishers upfront.
*
* @param <R> the result type
* @param mapper the function to map each rail's value into a Publisher
* @param prefetch the number of items to prefetch from each inner Publisher
* @param errorMode the error handling, i.e., when to report errors from the main
* source and the inner Publishers (immediate, boundary, end)
*
* @return the new {@link ParallelFlux} instance
*/
final <R> ParallelFlux<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper,
int prefetch,
ErrorMode errorMode) {
return onAssembly(new ParallelConcatMap<>(this,
mapper,
Queues.get(prefetch),
prefetch,
errorMode));
}
Generates and concatenates Publishers on each 'rail', delaying errors
and using the given prefetch amount for generating Publishers upfront.
Params: - mapper – the function to map each rail's value into a Publisher
- delayUntilEnd – true if delayed until all sources are concatenated
- prefetch – the number of items to prefetch from each inner Publisher
source and the inner Publishers (immediate, boundary, end)
Type parameters: - <R> – the result type
Returns: the new ParallelFlux
instance
/**
* Generates and concatenates Publishers on each 'rail', delaying errors
* and using the given prefetch amount for generating Publishers upfront.
*
* @param <R> the result type
* @param mapper the function to map each rail's value into a Publisher
* @param delayUntilEnd true if delayed until all sources are concatenated
* @param prefetch the number of items to prefetch from each inner Publisher
* source and the inner Publishers (immediate, boundary, end)
*
* @return the new {@link ParallelFlux} instance
*/
final <R> ParallelFlux<R> concatMapDelayError(Function<? super T, ? extends
Publisher<? extends R>> mapper,
boolean delayUntilEnd,
int prefetch) {
return concatMap(mapper, prefetch, delayUntilEnd ? ErrorMode.END: ErrorMode.BOUNDARY);
}
Generates and concatenates Publishers on each 'rail', delaying errors
and using the given prefetch amount for generating Publishers upfront.
Params: - mapper – the function to map each rail's value into a Publisher
- prefetch – the number of items to prefetch from each inner Publisher
source and the inner Publishers (immediate, boundary, end)
Type parameters: - <R> – the result type
Returns: the new ParallelFlux
instance
/**
* Generates and concatenates Publishers on each 'rail', delaying errors
* and using the given prefetch amount for generating Publishers upfront.
*
* @param <R> the result type
* @param mapper the function to map each rail's value into a Publisher
* @param prefetch the number of items to prefetch from each inner Publisher
* source and the inner Publishers (immediate, boundary, end)
*
* @return the new {@link ParallelFlux} instance
*/
final <R> ParallelFlux<R> concatMapDelayError(Function<? super T, ? extends
Publisher<? extends R>> mapper, int prefetch) {
return concatMap(mapper, prefetch, ErrorMode.END);
}
The prefetch configuration of the component
Returns: the prefetch configuration of the component
/**
* The prefetch configuration of the component
*
* @return the prefetch configuration of the component
*/
public int getPrefetch() {
return -1;
}
Params: - source – the source to wrap
Type parameters: - <T> – the value type
Returns: the potentially wrapped source
/**
* Invoke {@link Hooks} pointcut given a {@link ParallelFlux} and returning an
* eventually new {@link ParallelFlux}
*
* @param <T> the value type
* @param source the source to wrap
*
* @return the potentially wrapped source
*/
@SuppressWarnings("unchecked")
protected static <T> ParallelFlux<T> onAssembly(ParallelFlux<T> source) {
Function<Publisher, Publisher> hook = Hooks.onEachOperatorHook;
if(hook != null) {
source = (ParallelFlux<T>) hook.apply(source);
}
if (Hooks.GLOBAL_TRACE) {
AssemblySnapshot stacktrace = new AssemblySnapshot(null, Traces.callSiteSupplierFactory.get());
source = (ParallelFlux<T>) Hooks.addAssemblyInfo(source, stacktrace);
}
return source;
}
@SuppressWarnings("unchecked")
static <T> ParallelFlux<T> doOnSignal(ParallelFlux<T> source,
@Nullable Consumer<? super T> onNext,
@Nullable Consumer<? super T> onAfterNext,
@Nullable Consumer<? super Throwable> onError,
@Nullable Runnable onComplete,
@Nullable Runnable onAfterTerminate,
@Nullable Consumer<? super Subscription> onSubscribe,
@Nullable LongConsumer onRequest,
@Nullable Runnable onCancel) {
return onAssembly(new ParallelPeek<>(source,
onNext,
onAfterNext,
onError,
onComplete,
onAfterTerminate,
onSubscribe,
onRequest,
onCancel));
}
static final <T> List<T> sortedMerger(List<T> a, List<T> b, Comparator<? super T> comparator) {
int n = a.size() + b.size();
if (n == 0) {
return new ArrayList<>();
}
List<T> both = new ArrayList<>(n);
Iterator<T> at = a.iterator();
Iterator<T> bt = b.iterator();
T s1 = at.hasNext() ? at.next() : null;
T s2 = bt.hasNext() ? bt.next() : null;
while (s1 != null && s2 != null) {
if (comparator.compare(s1, s2) < 0) { // s1 comes before s2
both.add(s1);
s1 = at.hasNext() ? at.next() : null;
}
else {
both.add(s2);
s2 = bt.hasNext() ? bt.next() : null;
}
}
if (s1 != null) {
both.add(s1);
while (at.hasNext()) {
both.add(at.next());
}
}
else if (s2 != null) {
both.add(s2);
while (bt.hasNext()) {
both.add(bt.next());
}
}
return both;
}
}