/*
 * Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core.publisher;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

import static reactor.core.publisher.FluxReplay.ReplaySubscriber.EMPTY;
import static reactor.core.publisher.FluxReplay.ReplaySubscriber.TERMINATED;

Replays all or the last N items to Subscribers.

Type parameters:
  • <T> – the value type
/** * Replays all or the last N items to Subscribers. * <p> * <img width="640" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/emitterreplay.png" * alt=""> * <p> * * @param <T> the value type */
public final class ReplayProcessor<T> extends FluxProcessor<T, T> implements Fuseable {
Create a ReplayProcessor that caches the last element it has pushed, replaying it to late subscribers. This is a buffer-based ReplayProcessor with a history size of 1.

Type parameters:
  • <T> – the type of the pushed elements
Returns:a new ReplayProcessor that replays its last pushed element to each new Subscriber
/** * Create a {@link ReplayProcessor} that caches the last element it has pushed, * replaying it to late subscribers. This is a buffer-based ReplayProcessor with * a history size of 1. * <p> * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/replaylast.png" * alt=""> * * @param <T> the type of the pushed elements * * @return a new {@link ReplayProcessor} that replays its last pushed element to each new * {@link Subscriber} */
public static <T> ReplayProcessor<T> cacheLast() {
    // No seed value: the processor starts empty and only replays what gets pushed later.
    final T noDefault = null;
    return cacheLastOrDefault(noDefault);
}
Create a ReplayProcessor that caches the last element it has pushed, replaying it to late subscribers. If a Subscriber comes in before any value has been pushed, then the defaultValue is emitted instead. This is a buffer-based ReplayProcessor with a history size of 1.

Params:
  • value – a default value to start the sequence with in case nothing has been cached yet.
Type parameters:
  • <T> – the type of the pushed elements
Returns:a new ReplayProcessor that replays its last pushed element to each new Subscriber, or a default one if nothing was pushed yet
/** * Create a {@link ReplayProcessor} that caches the last element it has pushed, * replaying it to late subscribers. If a {@link Subscriber} comes in <b>before</b> * any value has been pushed, then the {@code defaultValue} is emitted instead. * This is a buffer-based ReplayProcessor with a history size of 1. * <p> * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/replaylastd.png" * alt=""> * * @param value a default value to start the sequence with in case nothing has been * cached yet. * @param <T> the type of the pushed elements * * @return a new {@link ReplayProcessor} that replays its last pushed element to each new * {@link Subscriber}, or a default one if nothing was pushed yet */
public static <T> ReplayProcessor<T> cacheLastOrDefault(@Nullable T value) {
    // History of exactly one element: only the latest pushed value is retained.
    final ReplayProcessor<T> processor = create(1);
    if (value == null) {
        // Nothing to pre-seed, hand back the empty processor.
        return processor;
    }
    // Pre-seed the history so that early subscribers get the default value.
    processor.onNext(value);
    return processor;
}
Create a new ReplayProcessor that replays an unbounded number of elements, using a default internal Queue.
Type parameters:
  • <E> – the type of the pushed elements
Returns:a new ReplayProcessor that replays the whole history to each new Subscriber.
/** * Create a new {@link ReplayProcessor} that replays an unbounded number of elements, * using a default internal {@link Queues#SMALL_BUFFER_SIZE Queue}. * * @param <E> the type of the pushed elements * * @return a new {@link ReplayProcessor} that replays the whole history to each new * {@link Subscriber}. */
public static <E> ReplayProcessor<E> create() {
    // Keep the whole history; the small buffer size is passed on as the history size argument.
    final boolean unbounded = true;
    return create(Queues.SMALL_BUFFER_SIZE, unbounded);
}
Create a new ReplayProcessor that replays up to historySize elements.
Params:
  • historySize – the backlog size, ie. maximum items retained for replay.
Type parameters:
  • <E> – the type of the pushed elements
Returns:a new ReplayProcessor that replays a limited history to each new Subscriber.
/** * Create a new {@link ReplayProcessor} that replays up to {@code historySize} * elements. * * @param historySize the backlog size, ie. maximum items retained for replay. * @param <E> the type of the pushed elements * * @return a new {@link ReplayProcessor} that replays a limited history to each new * {@link Subscriber}. */
public static <E> ReplayProcessor<E> create(int historySize) {
    // Bounded flavour: at most historySize elements are kept for replay.
    final boolean unbounded = false;
    return create(historySize, unbounded);
}
Create a new ReplayProcessor that either replays all the elements or a limited amount of elements depending on the unbounded parameter.
Params:
  • historySize – maximum items retained if bounded, or initial link size if unbounded
  • unbounded – true if "unlimited" data store must be supplied
Type parameters:
  • <E> – the type of the pushed elements
Returns:a new ReplayProcessor that replays the whole history to each new Subscriber if configured as unbounded, a limited history otherwise.
/**
 * Create a new {@link ReplayProcessor} that either replays all the elements or a
 * limited amount of elements depending on the {@code unbounded} parameter.
 *
 * @param historySize maximum items retained if bounded, or initial link size if unbounded
 * @param unbounded true if "unlimited" data store must be supplied
 * @param <E> the type of the pushed elements
 *
 * @return a new {@link ReplayProcessor} that replays the whole history to each new
 * {@link Subscriber} if configured as unbounded, a limited history otherwise.
 */
public static <E> ReplayProcessor<E> create(int historySize, boolean unbounded) {
    if (unbounded) {
        // Whole-history store; historySize is handed to the unbounded buffer's constructor.
        FluxReplay.ReplayBuffer<E> fullHistory = new FluxReplay.UnboundedReplayBuffer<>(historySize);
        return new ReplayProcessor<>(fullHistory);
    }
    // Size-bound store; historySize is handed to the bounded buffer's constructor.
    FluxReplay.ReplayBuffer<E> limitedHistory = new FluxReplay.SizeBoundReplayBuffer<>(historySize);
    return new ReplayProcessor<>(limitedHistory);
}
Creates a time-bounded replay processor.

In this setting, the ReplayProcessor internally tags each observed item with a timestamp value supplied by the Schedulers.parallel() and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination signal, leaving the buffer empty.

Once the processor is terminated, subscribers subscribing to it will receive items that remained in the buffer after the terminal signal, regardless of their age.

If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.

Note that terminal signals (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete signal arrives at T=10. If a subscriber subscribes at T=11, it will find an empty ReplayProcessor with just an onComplete signal.

Params:
  • maxAge – the maximum age of the contained items
Type parameters:
  • <T> – the type of items observed and emitted by the Processor
Returns:a new ReplayProcessor that replays elements based on their age.
/**
 * Creates a time-bounded replay processor.
 * <p>
 * In this setting, the {@code ReplayProcessor} internally tags each observed item
 * with a timestamp value supplied by the {@link Schedulers#parallel()} and keeps only
 * those whose age is less than the supplied time value converted to milliseconds. For
 * example, an item arrives at T=0 and the max age is set to 5; at T&gt;=5 this first
 * item is then evicted by any subsequent item or termination signal, leaving the
 * buffer empty.
 * <p>
 * Once the processor is terminated, subscribers subscribing to it will receive items
 * that remained in the buffer after the terminal signal, regardless of their age.
 * <p>
 * If a subscriber subscribes while the {@code ReplayProcessor} is active, it will
 * observe only those items from within the buffer that have an age less than the
 * specified time, and each item observed thereafter, even if the buffer evicts items
 * due to the time constraint in the meantime. In other words, once a subscriber
 * subscribes, it observes items without gaps in the sequence except for any outdated
 * items at the beginning of the sequence.
 * <p>
 * Note that terminal signals ({@code onError} and {@code onComplete}) trigger
 * eviction as well. For example, with a max age of 5, the first item is observed at
 * T=0, then an {@code onComplete} signal arrives at T=10. If a subscriber subscribes
 * at T=11, it will find an empty {@code ReplayProcessor} with just an
 * {@code onComplete} signal.
 *
 * @param <T> the type of items observed and emitted by the Processor
 * @param maxAge the maximum age of the contained items
 *
 * @return a new {@link ReplayProcessor} that replays elements based on their age.
 */
public static <T> ReplayProcessor<T> createTimeout(Duration maxAge) {
    // Default time source: the shared parallel scheduler.
    final Scheduler clock = Schedulers.parallel();
    return createTimeout(maxAge, clock);
}
Creates a time-bounded replay processor.

In this setting, the ReplayProcessor internally tags each observed item with a timestamp value supplied by the Scheduler and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination signal, leaving the buffer empty.

Once the processor is terminated, subscribers subscribing to it will receive items that remained in the buffer after the terminal signal, regardless of their age.

If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.

Note that terminal signals (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete signal arrives at T=10. If a subscriber subscribes at T=11, it will find an empty ReplayProcessor with just an onComplete signal.

Params:
  • maxAge – the maximum age of the contained items
  • scheduler – the Scheduler that provides the current time
Type parameters:
  • <T> – the type of items observed and emitted by the Processor
Returns:a new ReplayProcessor that replays elements based on their age.
/**
 * Creates a time-bounded replay processor.
 * <p>
 * In this setting, the {@code ReplayProcessor} internally tags each observed item
 * with a timestamp value supplied by the {@link Scheduler} and keeps only
 * those whose age is less than the supplied time value converted to milliseconds. For
 * example, an item arrives at T=0 and the max age is set to 5; at T&gt;=5 this first
 * item is then evicted by any subsequent item or termination signal, leaving the
 * buffer empty.
 * <p>
 * Once the processor is terminated, subscribers subscribing to it will receive items
 * that remained in the buffer after the terminal signal, regardless of their age.
 * <p>
 * If a subscriber subscribes while the {@code ReplayProcessor} is active, it will
 * observe only those items from within the buffer that have an age less than the
 * specified time, and each item observed thereafter, even if the buffer evicts items
 * due to the time constraint in the meantime. In other words, once a subscriber
 * subscribes, it observes items without gaps in the sequence except for any outdated
 * items at the beginning of the sequence.
 * <p>
 * Note that terminal signals ({@code onError} and {@code onComplete}) trigger
 * eviction as well. For example, with a max age of 5, the first item is observed at
 * T=0, then an {@code onComplete} signal arrives at T=10. If a subscriber subscribes
 * at T=11, it will find an empty {@code ReplayProcessor} with just an
 * {@code onComplete} signal.
 *
 * @param <T> the type of items observed and emitted by the Processor
 * @param maxAge the maximum age of the contained items
 * @param scheduler the {@link Scheduler} that provides the current time
 *
 * @return a new {@link ReplayProcessor} that replays elements based on their age.
 */
public static <T> ReplayProcessor<T> createTimeout(Duration maxAge, Scheduler scheduler) {
    // Time-only bound: the size limit is set to the largest int value.
    final int unlimitedSize = Integer.MAX_VALUE;
    return createSizeAndTimeout(unlimitedSize, maxAge, scheduler);
}
Creates a time- and size-bounded replay processor.

In this setting, the ReplayProcessor internally tags each received item with a timestamp value supplied by the Schedulers.parallel() and holds at most size items in its internal buffer. It evicts items from the start of the buffer if their age becomes greater than or equal to the supplied age (converted to milliseconds) or the buffer reaches its size limit.

When subscribers subscribe to a terminated ReplayProcessor, they observe the items that remained in the buffer after the terminal signal, regardless of their age, but at most size items.

If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time and each subsequent item, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.

Note that terminal signals (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete signal arrives at T=10. If a Subscriber subscribes at T=11, it will find an empty ReplayProcessor with just an onComplete signal.

Params:
  • maxAge – the maximum age of the contained items
  • size – the maximum number of buffered items
Type parameters:
  • <T> – the type of items observed and emitted by the Processor
Returns:a new ReplayProcessor that replays up to size elements, but will evict them from its history based on their age.
/**
 * Creates a time- and size-bounded replay processor.
 * <p>
 * In this setting, the {@code ReplayProcessor} internally tags each received item
 * with a timestamp value supplied by the {@link Schedulers#parallel()} and holds at
 * most {@code size} items in its internal buffer. It evicts items from the start of
 * the buffer if their age becomes greater than or equal to the supplied age
 * (converted to milliseconds) or the buffer reaches its {@code size} limit.
 * <p>
 * When subscribers subscribe to a terminated {@code ReplayProcessor}, they observe
 * the items that remained in the buffer after the terminal signal, regardless of
 * their age, but at most {@code size} items.
 * <p>
 * If a subscriber subscribes while the {@code ReplayProcessor} is active, it will
 * observe only those items from within the buffer that have an age less than the
 * specified time and each subsequent item, even if the buffer evicts items due to the
 * time constraint in the meantime. In other words, once a subscriber subscribes, it
 * observes items without gaps in the sequence except for the outdated items at the
 * beginning of the sequence.
 * <p>
 * Note that terminal signals ({@code onError} and {@code onComplete}) trigger
 * eviction as well. For example, with a max age of 5, the first item is observed at
 * T=0, then an {@code onComplete} signal arrives at T=10. If a Subscriber subscribes
 * at T=11, it will find an empty {@code ReplayProcessor} with just an
 * {@code onComplete} signal.
 *
 * @param <T> the type of items observed and emitted by the Processor
 * @param maxAge the maximum age of the contained items
 * @param size the maximum number of buffered items
 *
 * @return a new {@link ReplayProcessor} that replays up to {@code size} elements, but
 * will evict them from its history based on their age.
 */
public static <T> ReplayProcessor<T> createSizeAndTimeout(int size, Duration maxAge) {
    // Default time source: the shared parallel scheduler.
    final Scheduler clock = Schedulers.parallel();
    return createSizeAndTimeout(size, maxAge, clock);
}
Creates a time- and size-bounded replay processor.

In this setting, the ReplayProcessor internally tags each received item with a timestamp value supplied by the Scheduler and holds at most size items in its internal buffer. It evicts items from the start of the buffer if their age becomes greater than or equal to the supplied age (converted to milliseconds) or the buffer reaches its size limit.

When subscribers subscribe to a terminated ReplayProcessor, they observe the items that remained in the buffer after the terminal signal, regardless of their age, but at most size items.

If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time and each subsequent item, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.

Note that terminal signals (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete signal arrives at T=10. If a Subscriber subscribes at T=11, it will find an empty ReplayProcessor with just an onComplete signal.

Params:
  • maxAge – the maximum age of the contained items (a Duration, converted to milliseconds internally)
  • size – the maximum number of buffered items
  • scheduler – the Scheduler that provides the current time
Type parameters:
  • <T> – the type of items observed and emitted by the Processor
Returns:a new ReplayProcessor that replays up to size elements, but will evict them from its history based on their age.
/**
 * Creates a time- and size-bounded replay processor.
 * <p>
 * In this setting, the {@code ReplayProcessor} internally tags each received item
 * with a timestamp value supplied by the {@link Scheduler} and holds at most
 * {@code size} items in its internal buffer. It evicts items from the start of the
 * buffer if their age becomes greater than or equal to the supplied age (converted to
 * milliseconds) or the buffer reaches its {@code size} limit.
 * <p>
 * When subscribers subscribe to a terminated {@code ReplayProcessor}, they observe
 * the items that remained in the buffer after the terminal signal, regardless of
 * their age, but at most {@code size} items.
 * <p>
 * If a subscriber subscribes while the {@code ReplayProcessor} is active, it will
 * observe only those items from within the buffer that have an age less than the
 * specified time and each subsequent item, even if the buffer evicts items due to the
 * time constraint in the meantime. In other words, once a subscriber subscribes, it
 * observes items without gaps in the sequence except for the outdated items at the
 * beginning of the sequence.
 * <p>
 * Note that terminal signals ({@code onError} and {@code onComplete}) trigger
 * eviction as well. For example, with a max age of 5, the first item is observed at
 * T=0, then an {@code onComplete} signal arrives at T=10. If a Subscriber subscribes
 * at T=11, it will find an empty {@code ReplayProcessor} with just an
 * {@code onComplete} signal.
 *
 * @param <T> the type of items observed and emitted by the Processor
 * @param maxAge the maximum age of the contained items (converted to milliseconds
 * internally)
 * @param size the maximum number of buffered items
 * @param scheduler the {@link Scheduler} that provides the current time
 *
 * @return a new {@link ReplayProcessor} that replays up to {@code size} elements, but
 * will evict them from its history based on their age.
 */
public static <T> ReplayProcessor<T> createSizeAndTimeout(int size, Duration maxAge, Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    if (size <= 0) {
        throw new IllegalArgumentException("size > 0 required but it was " + size);
    }
    // Checked at the same point where maxAge.toMillis() used to fail, but with a message.
    Objects.requireNonNull(maxAge, "maxAge is null");
    return new ReplayProcessor<>(new FluxReplay.SizeAndTimeBoundReplayBuffer<>(size,
            maxAge.toMillis(),
            scheduler));
}

/** Store of retained signals; replay and terminal-state queries are delegated to it. */
final FluxReplay.ReplayBuffer<T> buffer;

/** Upstream subscription, assigned in {@link #onSubscribe(Subscription)}. */
Subscription subscription;

/** Current inner subscriptions; the array is only ever swapped as a whole via {@link #SUBSCRIBERS}. */
volatile FluxReplay.ReplaySubscription<T>[] subscribers;

@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<ReplayProcessor, FluxReplay.ReplaySubscription[]>
        SUBSCRIBERS = AtomicReferenceFieldUpdater.newUpdater(ReplayProcessor.class,
        FluxReplay.ReplaySubscription[].class,
        "subscribers");

/**
 * Creates a processor backed by the given buffer, starting with the {@code EMPTY}
 * subscriber array.
 *
 * @param buffer the store used to retain and replay signals
 */
ReplayProcessor(FluxReplay.ReplayBuffer<T> buffer) {
    this.buffer = buffer;
    SUBSCRIBERS.lazySet(this, EMPTY);
}

/**
 * Wraps the subscriber in a {@link ReplayInner}, hands it that subscription, registers
 * it and asks the buffer to replay to it.
 *
 * @param actual the downstream subscriber
 * @throws NullPointerException if {@code actual} is null
 */
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    Objects.requireNonNull(actual, "subscribe");
    FluxReplay.ReplaySubscription<T> rs = new ReplayInner<>(actual, this);
    actual.onSubscribe(rs);

    if (add(rs)) {
        // The subscriber may have cancelled inside onSubscribe: undo the registration
        // and skip the replay.
        if (rs.isCancelled()) {
            remove(rs);
            return;
        }
    }
    // Also reached when add() refused because the processor already terminated.
    buffer.replay(rs);
}

/** Returns the error reported by the underlying buffer, if any. */
@Override
@Nullable
public Throwable getError() {
    return buffer.getError();
}

/**
 * Exposes the upstream subscription as {@code PARENT} and the buffer capacity as
 * {@code CAPACITY}; every other attribute is resolved by the superclass.
 */
@Override
@Nullable
public Object scanUnsafe(Attr key) {
    if (key == Attr.PARENT) {
        return subscription;
    }
    if (key == Attr.CAPACITY) {
        return buffer.capacity();
    }
    return super.scanUnsafe(key);
}

/** Streams the currently registered inner subscriptions. */
@Override
public Stream<? extends Scannable> inners() {
    return Stream.of(subscribers);
}

/** Returns the length of the current subscriber array. */
@Override
public long downstreamCount() {
    return subscribers.length;
}

/** Returns whether the underlying buffer reports itself as done. */
@Override
public boolean isTerminated() {
    return buffer.isDone();
}

/**
 * Appends {@code rs} to the subscriber array with a copy-and-CAS retry loop.
 *
 * @param rs the inner subscription to register
 * @return {@code false} if the array is the {@code TERMINATED} marker, {@code true}
 * once the subscription has been installed
 */
boolean add(FluxReplay.ReplaySubscription<T> rs) {
    for (; ; ) {
        FluxReplay.ReplaySubscription<T>[] a = subscribers;
        if (a == TERMINATED) {
            return false;
        }
        int n = a.length;

        @SuppressWarnings("unchecked")
        FluxReplay.ReplaySubscription<T>[] b = new ReplayInner[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = rs;
        // Retry from a fresh snapshot if another thread changed the array meanwhile.
        if (SUBSCRIBERS.compareAndSet(this, a, b)) {
            return true;
        }
    }
}

/**
 * Removes {@code rs} from the subscriber array with a copy-and-CAS retry loop. Does
 * nothing if the array is {@code TERMINATED} or {@code EMPTY}, or if {@code rs} is
 * not present.
 *
 * @param rs the inner subscription to unregister
 */
@SuppressWarnings("unchecked")
void remove(FluxReplay.ReplaySubscription<T> rs) {
    outer:
    for (; ; ) {
        FluxReplay.ReplaySubscription<T>[] a = subscribers;
        if (a == TERMINATED || a == EMPTY) {
            return;
        }
        int n = a.length;

        for (int i = 0; i < n; i++) {
            if (a[i] == rs) {
                FluxReplay.ReplaySubscription<T>[] b;

                if (n == 1) {
                    // Last subscriber leaving: reuse the shared EMPTY array.
                    b = EMPTY;
                }
                else {
                    b = new ReplayInner[n - 1];
                    System.arraycopy(a, 0, b, 0, i);
                    System.arraycopy(a, i + 1, b, i, n - i - 1);
                }

                if (SUBSCRIBERS.compareAndSet(this, a, b)) {
                    return;
                }

                // Lost the race: re-read the array and search again.
                continue outer;
            }
        }

        // rs was not found in the snapshot.
        break;
    }
}

/**
 * Cancels {@code s} if the buffer is already done; otherwise, once validated against
 * the current subscription, stores it and requests {@code Long.MAX_VALUE}.
 */
@Override
public void onSubscribe(Subscription s) {
    if (buffer.isDone()) {
        s.cancel();
    }
    else if (Operators.validate(subscription, s)) {
        subscription = s;
        s.request(Long.MAX_VALUE);
    }
}

/** Returns the context computed by {@code Operators.multiSubscribersContext} from the current subscribers. */
@Override
public Context currentContext() {
    return Operators.multiSubscribersContext(subscribers);
}

/** Always {@code Integer.MAX_VALUE}. */
@Override
public int getPrefetch() {
    return Integer.MAX_VALUE;
}

/**
 * Adds the value to the buffer and triggers a replay for every current subscriber.
 * If the buffer is already done the value is handed to
 * {@code Operators.onNextDropped} instead.
 *
 * @param t the value to push, must not be null
 * @throws NullPointerException if {@code t} is null (Reactive Streams rule 2.13)
 */
@Override
public void onNext(T t) {
    // Reject null up front so it can never be stored in the replay buffer.
    Objects.requireNonNull(t, "onNext");
    FluxReplay.ReplayBuffer<T> b = buffer;
    if (b.isDone()) {
        Operators.onNextDropped(t, currentContext());
    }
    else {
        b.add(t);
        for (FluxReplay.ReplaySubscription<T> rs : subscribers) {
            b.replay(rs);
        }
    }
}

/**
 * Records the error in the buffer, swaps the subscriber array for {@code TERMINATED}
 * and triggers a replay for each subscriber that was registered. If the buffer is
 * already done the error is handed to {@code Operators.onErrorDroppedMulticast}.
 *
 * @param t the error to signal, must not be null
 * @throws NullPointerException if {@code t} is null (Reactive Streams rule 2.13)
 */
@Override
public void onError(Throwable t) {
    // Reject null up front so it is never recorded as the terminal error.
    Objects.requireNonNull(t, "onError");
    FluxReplay.ReplayBuffer<T> b = buffer;
    if (b.isDone()) {
        Operators.onErrorDroppedMulticast(t);
    }
    else {
        b.onError(t);

        @SuppressWarnings("unchecked")
        FluxReplay.ReplaySubscription<T>[] a = SUBSCRIBERS.getAndSet(this, TERMINATED);

        for (FluxReplay.ReplaySubscription<T> rs : a) {
            b.replay(rs);
        }
    }
}

/**
 * Marks the buffer as complete, swaps the subscriber array for {@code TERMINATED} and
 * triggers a replay for each subscriber that was registered. Does nothing if the
 * buffer is already done.
 */
@Override
public void onComplete() {
    FluxReplay.ReplayBuffer<T> b = buffer;
    if (!b.isDone()) {
        b.onComplete();

        @SuppressWarnings("unchecked")
        FluxReplay.ReplaySubscription<T>[] a = SUBSCRIBERS.getAndSet(this, TERMINATED);

        for (FluxReplay.ReplaySubscription<T> rs : a) {
            b.replay(rs);
        }
    }
}

/**
 * Per-subscriber state: the downstream subscriber, its replay cursor
 * ({@code index}, {@code tailIndex}, {@code node}), its outstanding demand and a
 * work-in-progress counter. Queue-style operations are delegated to the parent's buffer.
 *
 * @param <T> the value type
 */
static final class ReplayInner<T> implements FluxReplay.ReplaySubscription<T> {

    final CoreSubscriber<? super T> actual;

    final ReplayProcessor<T> parent;

    final FluxReplay.ReplayBuffer<T> buffer;

    int index;

    int tailIndex;

    Object node;

    volatile int wip;
    @SuppressWarnings("rawtypes")
    static final AtomicIntegerFieldUpdater<ReplayInner> WIP =
            AtomicIntegerFieldUpdater.newUpdater(ReplayInner.class, "wip");

    /** Outstanding demand; {@code Long.MIN_VALUE} marks a cancelled subscription. */
    volatile long requested;
    @SuppressWarnings("rawtypes")
    static final AtomicLongFieldUpdater<ReplayInner> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(ReplayInner.class, "requested");

    int fusionMode;

    ReplayInner(CoreSubscriber<? super T> actual, ReplayProcessor<T> parent) {
        this.actual = actual;
        this.parent = parent;
        this.buffer = parent.buffer;
    }

    @Override
    public long requested() {
        return requested;
    }

    /** Returns {@code true} once {@link #cancel()} has stored the cancelled marker. */
    @Override
    public boolean isCancelled() {
        return requested == Long.MIN_VALUE;
    }

    @Override
    public CoreSubscriber<? super T> actual() {
        return actual;
    }

    /** Grants {@code ASYNC} fusion when it is requested, otherwise {@code NONE}. */
    @Override
    public int requestFusion(int requestedMode) {
        if ((requestedMode & ASYNC) != 0) {
            fusionMode = ASYNC;
            return ASYNC;
        }
        return NONE;
    }

    @Override
    @Nullable
    public T poll() {
        return buffer.poll(this);
    }

    @Override
    public void clear() {
        buffer.clear(this);
    }

    @Override
    public boolean isEmpty() {
        return buffer.isEmpty(this);
    }

    @Override
    public int size() {
        return buffer.size(this);
    }

    /**
     * Validates {@code n}, accumulates it into the demand when not fused, then
     * triggers a replay.
     */
    @Override
    public void request(long n) {
        if (Operators.validate(n)) {
            if (fusionMode() == NONE) {
                // NOTE(review): addCapCancellable presumably caps the sum and leaves the
                // cancelled marker untouched - confirm against Operators.
                Operators.addCapCancellable(REQUESTED, this, n);
            }
            buffer.replay(this);
        }
    }

    /**
     * Stores the cancelled marker; only the call that actually flips the state
     * unregisters from the parent.
     */
    @Override
    public void cancel() {
        if (REQUESTED.getAndSet(this, Long.MIN_VALUE) != Long.MIN_VALUE) {
            parent.remove(this);
            // Drop the node reference only when this call moved wip from 0.
            if (enter()) {
                node = null;
            }
        }
    }

    @Override
    public void node(@Nullable Object node) {
        this.node = node;
    }

    @Override
    public int fusionMode() {
        return fusionMode;
    }

    @Override
    @Nullable
    public Object node() {
        return node;
    }

    @Override
    public int index() {
        return index;
    }

    @Override
    public void index(int index) {
        this.index = index;
    }

    @Override
    public int tailIndex() {
        return tailIndex;
    }

    @Override
    public void tailIndex(int tailIndex) {
        this.tailIndex = tailIndex;
    }

    /** Increments {@code wip}; returns {@code true} if it was 0 before the increment. */
    @Override
    public boolean enter() {
        return WIP.getAndIncrement(this) == 0;
    }

    /** Subtracts {@code missed} from {@code wip} and returns the new value. */
    @Override
    public int leave(int missed) {
        return WIP.addAndGet(this, -missed);
    }

    /** Subtracts {@code n} from the outstanding demand. */
    @Override
    public void produced(long n) {
        REQUESTED.addAndGet(this, -n);
    }
}
}