/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core.publisher;

import java.util.function.Function;
import java.util.function.LongConsumer;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.util.context.Context;

Wrapper API around a downstream Subscriber for emitting any number of next signals followed by zero or one onError/onComplete.

Type parameters:
  • <T> – the value type
/** * Wrapper API around a downstream Subscriber for emitting any number of * next signals followed by zero or one onError/onComplete. * <p> * @param <T> the value type */
public interface FluxSink<T> {
See Also:
  • onComplete.onComplete()
/** * @see Subscriber#onComplete() */
void complete();
Return the current subscriber Context.

Context can be enriched via Flux.subscriberContext(Function) operator or directly by a child subscriber overriding CoreSubscriber.currentContext()

Returns:the current subscriber Context.
/** * Return the current subscriber {@link Context}. * <p> * {@link Context} can be enriched via {@link Flux#subscriberContext(Function)} * operator or directly by a child subscriber overriding * {@link CoreSubscriber#currentContext()} * * @return the current subscriber {@link Context}. */
Context currentContext();
Params:
  • e – the exception to signal, not null
See Also:
  • onError.onError(Throwable)
/** * @see Subscriber#onError(Throwable) * @param e the exception to signal, not null */
void error(Throwable e);
Try emitting, might throw an unchecked exception.
Params:
  • t – the value to emit, not null
See Also:
  • onNext.onNext(Object)
/** * Try emitting, might throw an unchecked exception. * @see Subscriber#onNext(Object) * @param t the value to emit, not null */
FluxSink<T> next(T t);
The current outstanding request amount.
Returns:the current outstanding request amount
/** * The current outstanding request amount. * @return the current outstanding request amount */
long requestedFromDownstream();
Returns true if the downstream cancelled the sequence.
Returns:true if the downstream cancelled the sequence
/** * Returns true if the downstream cancelled the sequence. * @return true if the downstream cancelled the sequence */
boolean isCancelled();
Attaches a LongConsumer to this FluxSink that will be notified of any request to this sink.

For push/pull sinks created using Flux.create(Consumer<? super FluxSink<Object>>) or Flux.create(Consumer<? super FluxSink<Object>>, OverflowStrategy), the consumer is invoked for every request to enable a hybrid backpressure-enabled push/pull model. When bridging with asynchronous listener-based APIs, the onRequest callback may be used to request more data from source if required and to manage backpressure by delivering data to sink only when requests are pending.

For push-only sinks created using Flux.push(Consumer<? super FluxSink<Object>>) or Flux.push(Consumer<? super FluxSink<Object>>, OverflowStrategy), the consumer is invoked with an initial request of Long.MAX_VALUE when this method is invoked.

Params:
  • consumer – the consumer to invoke on each request
Returns:FluxSink with a consumer that is notified of requests
/** * Attaches a {@link LongConsumer} to this {@link FluxSink} that will be notified of * any request to this sink. * <p> * For push/pull sinks created using {@link Flux#create(java.util.function.Consumer)} * or {@link Flux#create(java.util.function.Consumer, FluxSink.OverflowStrategy)}, * the consumer * is invoked for every request to enable a hybrid backpressure-enabled push/pull model. * When bridging with asynchronous listener-based APIs, the {@code onRequest} callback * may be used to request more data from source if required and to manage backpressure * by delivering data to sink only when requests are pending. * <p> * For push-only sinks created using {@link Flux#push(java.util.function.Consumer)} * or {@link Flux#push(java.util.function.Consumer, FluxSink.OverflowStrategy)}, * the consumer is invoked with an initial request of {@code Long.MAX_VALUE} when this method * is invoked. * * @param consumer the consumer to invoke on each request * @return {@link FluxSink} with a consumer that is notified of requests */
FluxSink<T> onRequest(LongConsumer consumer);
Attach a Disposable as a callback for when this FluxSink is cancelled. This happens only when the downstream Subscription is cancelled.
Params:
See Also:
Returns:the FluxSink with a cancellation callback
/** * Attach a {@link Disposable} as a callback for when this {@link FluxSink} is * cancelled. This happens only when the downstream {@link Subscription} * is {@link Subscription#cancel() cancelled}. * * @param d the {@link Disposable} to use as a callback * @return the {@link FluxSink} with a cancellation callback * @see #onCancel(Disposable) onDispose(Disposable) for a callback that covers cancellation AND terminal signals */
FluxSink<T> onCancel(Disposable d);
Attach a Disposable as a callback for when this FluxSink is effectively disposed, that is it cannot be used anymore. This includes both having played terminal signals (onComplete, onError) and having been cancelled (see onCancel(Disposable)).

Note that the "dispose" term is used from the perspective of the sink. Not to be confused with Flux.subscribe()'s Disposable.dispose() method, which maps to disposing the Subscription (effectively, a Subscription.cancel() signal).

Params:
See Also:
Returns:the FluxSink with a callback invoked on any terminal signal or on cancellation
/** * Attach a {@link Disposable} as a callback for when this {@link FluxSink} is effectively * disposed, that is it cannot be used anymore. This includes both having played terminal * signals (onComplete, onError) and having been cancelled (see {@link #onCancel(Disposable)}). * <p> * Note that the "dispose" term is used from the perspective of the sink. Not to * be confused with {@link Flux#subscribe()}'s {@link Disposable#dispose()} method, which * maps to disposing the {@link Subscription} (effectively, a {@link Subscription#cancel()} * signal). * * @param d the {@link Disposable} to use as a callback * @return the {@link FluxSink} with a callback invoked on any terminal signal or on cancellation * @see #onCancel(Disposable) onCancel(Disposable) for a cancellation-only callback */
FluxSink<T> onDispose(Disposable d);
Enumeration for backpressure handling.
/** * Enumeration for backpressure handling. */
enum OverflowStrategy {
Completely ignore downstream backpressure requests.

This may yield IllegalStateException when queues get full downstream.

/** * Completely ignore downstream backpressure requests. * <p> * This may yield {@link IllegalStateException} when queues get full downstream. */
IGNORE,
Signal an IllegalStateException when the downstream can't keep up
/** * Signal an {@link IllegalStateException} when the downstream can't keep up */
ERROR,
Drop the incoming signal if the downstream is not ready to receive it.
/** * Drop the incoming signal if the downstream is not ready to receive it. */
DROP,
Downstream will get only the latest signals from upstream.
/** * Downstream will get only the latest signals from upstream. */
LATEST,
Buffer all signals if the downstream can't keep up.

Warning! This does unbounded buffering and may lead to OutOfMemoryError.

/** * Buffer all signals if the downstream can't keep up. * <p> * Warning! This does unbounded buffering and may lead to {@link OutOfMemoryError}. */
BUFFER } }