Copyright (c) 2016-present, RxJava Contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */
package io.reactivex.processors;

import java.lang.reflect.Array;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;

import org.reactivestreams.*;

import io.reactivex.Scheduler;
import io.reactivex.annotations.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.internal.util.*;
import io.reactivex.plugins.RxJavaPlugins;
Replays events to Subscribers.

The ReplayProcessor supports the following item retainment strategies:

The ReplayProcessor can be created in bounded and unbounded mode. It can be bounded by size (maximum number of elements retained at most) and/or time (maximum age of elements replayed).

Since a ReplayProcessor is a Reactive Streams Processor, nulls are not allowed (Rule 2.13) as parameters to onNext(Object) and onError(Throwable). Such calls will result in a NullPointerException being thrown and the processor's state is not changed.

This ReplayProcessor respects the individual backpressure behavior of its Subscribers but does not coordinate their request amounts towards the upstream (because there might not be any) and consumes the upstream in an unbounded manner (requesting Long.MAX_VALUE). Note that Subscribers receive a continuous sequence of values after they subscribed even if an individual item gets delayed due to backpressure. Due to concurrency requirements, a size-bounded ReplayProcessor may hold strong references to more source emissions than specified.

When this ReplayProcessor is terminated via onError(Throwable) or onComplete(), late Subscribers will receive the retained/cached items first (if any) followed by the respective terminal event. If the ReplayProcessor has a time-bound, the age of the retained/cached items is still considered when replaying, so it is possible that no items are emitted before the terminal event.

Once a Subscriber has subscribed, it will receive items continuously from that point on. Bounds only affect how many past items a new Subscriber will receive before it catches up with the live event feed.

Even though ReplayProcessor implements the Subscriber interface, calling onSubscribe is not required (Rule 2.12) if the processor is used as a standalone source. However, calling onSubscribe after the ReplayProcessor reached its terminal state will result in the given Subscription being canceled immediately.

Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The FlowableProcessor.toSerialized() method available to all FlowableProcessors provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber consuming this processor also wants to call onNext(Object) on this processor recursively).

This ReplayProcessor supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasSubscribers() as well as means to read the retained/cached items in a non-blocking and thread-safe manner via hasValue(), getValue(), getValues() or getValues(Object[]).

Note that due to concurrency requirements, a size- and time-bounded ReplayProcessor may hold strong references to more source emissions than specified while it isn't terminated yet. Use the cleanupBuffer() to allow such inaccessible items to be cleaned up by GC once no consumer references them anymore.

Backpressure:
This ReplayProcessor respects the individual backpressure behavior of its Subscribers but does not coordinate their request amounts towards the upstream (because there might not be any) and consumes the upstream in an unbounded manner (requesting Long.MAX_VALUE). Note that Subscribers receive a continuous sequence of values after they subscribed even if an individual item gets delayed due to backpressure.
Scheduler:
ReplayProcessor does not operate by default on a particular Scheduler and the Subscribers get notified on the thread the respective onXXX methods were invoked. Time-bound ReplayProcessors use the Scheduler given to their create methods as the time source to timestamp received items for the age checks.
Error handling:
When the onError(Throwable) is called, the ReplayProcessor enters into a terminal state and emits the same Throwable instance to the last set of Subscribers. During this emission, if one or more Subscribers cancel their respective Subscriptions, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable) (multiple times if multiple Subscribers cancel at once). If there were no Subscribers subscribed to this ReplayProcessor when the onError() was called, the global error handler is not invoked.

Example usage:

 
ReplayProcessor<Object> processor = ReplayProcessor.create();
processor.onNext("one");
processor.onNext("two");
processor.onNext("three");
processor.onComplete();
// both of the following will get the onNext/onComplete calls from above
processor.subscribe(subscriber1);
processor.subscribe(subscriber2);
 
Type parameters:
  • <T> – the value type
/** * Replays events to Subscribers. * <p> * The {@code ReplayProcessor} supports the following item retainment strategies: * <ul> * <li>{@link #create()} and {@link #create(int)}: retains and replays all events to current and * future {@code Subscriber}s. * <p> * <img width="640" height="269" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.u.png" alt=""> * <p> * <img width="640" height="345" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.ue.png" alt=""> * </li> * <li>{@link #createWithSize(int)}: retains at most the given number of items and replays only these * latest items to new {@code Subscriber}s. * <p> * <img width="640" height="332" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.n.png" alt=""> * </li> * <li>{@link #createWithTime(long, TimeUnit, Scheduler)}: retains items no older than the specified time * and replays them to new {@code Subscriber}s (which could mean all items age out). * <p> * <img width="640" height="415" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.t.png" alt=""> * </li> * <li>{@link #createWithTimeAndSize(long, TimeUnit, Scheduler, int)}: retains no more than the given number of items * which are also no older than the specified time and replays them to new {@code Subscriber}s (which could mean all items age out). * <p> * <img width="640" height="404" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ReplayProcessor.nt.png" alt=""> * </li> * </ul> * <p> * The {@code ReplayProcessor} can be created in bounded and unbounded mode. It can be bounded by * size (maximum number of elements retained at most) and/or time (maximum age of elements replayed). 
* <p> * Since a {@code ReplayProcessor} is a Reactive Streams {@code Processor}, * {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>) as * parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a * {@link NullPointerException} being thrown and the processor's state is not changed. * <p> * This {@code ReplayProcessor} respects the individual backpressure behavior of its {@code Subscriber}s but * does not coordinate their request amounts towards the upstream (because there might not be any) and * consumes the upstream in an unbounded manner (requesting {@code Long.MAX_VALUE}). * Note that {@code Subscriber}s receive a continuous sequence of values after they subscribed even * if an individual item gets delayed due to backpressure. * Due to concurrency requirements, a size-bounded {@code ReplayProcessor} may hold strong references to more source * emissions than specified. * <p> * When this {@code ReplayProcessor} is terminated via {@link #onError(Throwable)} or {@link #onComplete()}, * late {@link Subscriber}s will receive the retained/cached items first (if any) followed by the respective * terminal event. If the {@code ReplayProcessor} has a time-bound, the age of the retained/cached items is still considered * when replaying, so it is possible that no items are emitted before the terminal event. * <p> * Once a {@code Subscriber} has subscribed, it will receive items continuously from that point on. Bounds only affect how * many past items a new {@code Subscriber} will receive before it catches up with the live event feed. * <p> * Even though {@code ReplayProcessor} implements the {@code Subscriber} interface, calling * {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>) * if the processor is used as a standalone source. 
However, calling {@code onSubscribe} * after the {@code ReplayProcessor} reached its terminal state will result in the * given {@code Subscription} being canceled immediately. * <p> * Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} * is required to be serialized (called from the same thread or called non-overlappingly from different threads * through external means of serialization). The {@link #toSerialized()} method available to all {@code FlowableProcessor}s * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber} * consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively). * <p> * This {@code ReplayProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()}, * {@link #getThrowable()} and {@link #hasSubscribers()} as well as means to read the retained/cached items * in a non-blocking and thread-safe manner via {@link #hasValue()}, {@link #getValue()}, * {@link #getValues()} or {@link #getValues(Object[])}. * <p> * Note that due to concurrency requirements, a size- and time-bounded {@code ReplayProcessor} may hold strong references to more * source emissions than specified while it isn't terminated yet. Use the {@link #cleanupBuffer()} to allow * such inaccessible items to be cleaned up by GC once no consumer references them anymore. * <dl> * <dt><b>Backpressure:</b></dt> * <dd>This {@code ReplayProcessor} respects the individual backpressure behavior of its {@code Subscriber}s but * does not coordinate their request amounts towards the upstream (because there might not be any) and * consumes the upstream in an unbounded manner (requesting {@code Long.MAX_VALUE}). 
 * Note that {@code Subscriber}s receive a continuous sequence of values after they subscribed even
 * if an individual item gets delayed due to backpressure.</dd>
 * <dt><b>Scheduler:</b></dt>
 * <dd>{@code ReplayProcessor} does not operate by default on a particular {@link io.reactivex.Scheduler} and
 * the {@code Subscriber}s get notified on the thread the respective {@code onXXX} methods were invoked.
 * Time-bound {@code ReplayProcessor}s use the {@code Scheduler} given to their {@code create} methods
 * as the time source to timestamp received items for the age checks.</dd>
 * <dt><b>Error handling:</b></dt>
 * <dd>When the {@link #onError(Throwable)} is called, the {@code ReplayProcessor} enters into a terminal state
 * and emits the same {@code Throwable} instance to the last set of {@code Subscriber}s. During this emission,
 * if one or more {@code Subscriber}s cancel their respective {@code Subscription}s, the
 * {@code Throwable} is delivered to the global error handler via
 * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Subscriber}s
 * cancel at once).
 * If there were no {@code Subscriber}s subscribed to this {@code ReplayProcessor} when the {@code onError()}
 * was called, the global error handler is not invoked.
 * </dd>
 * </dl>
 * <p>
 * Example usage:
 * <pre> {@code

  ReplayProcessor<Object> processor = ReplayProcessor.create();
  processor.onNext("one");
  processor.onNext("two");
  processor.onNext("three");
  processor.onComplete();

  // both of the following will get the onNext/onComplete calls from above
  processor.subscribe(subscriber1);
  processor.subscribe(subscriber2);

  } </pre>
 *
 * @param <T> the value type
 */
public final class ReplayProcessor<T> extends FlowableProcessor<T> {
An empty array to avoid allocation in getValues().
/** An empty array to avoid allocation in getValues(). */
    private static final Object[] EMPTY_ARRAY = new Object[0];

    final ReplayBuffer<T> buffer;

    boolean done;

    final AtomicReference<ReplaySubscription<T>[]> subscribers;

    @SuppressWarnings("rawtypes")
    static final ReplaySubscription[] EMPTY = new ReplaySubscription[0];

    @SuppressWarnings("rawtypes")
    static final ReplaySubscription[] TERMINATED = new ReplaySubscription[0];
Creates an unbounded ReplayProcessor.

The internal buffer is backed by an ArrayList and starts with an initial capacity of 16. Once the number of items reaches this capacity, it will grow as necessary (usually by 50%). However, as the number of items grows, this causes frequent array reallocation and copying, and may hurt performance and latency. This can be avoided with the create(int) overload which takes an initial capacity parameter and can be tuned to reduce the array reallocation frequency as needed.

Type parameters:
  • <T> – the type of items observed and emitted by the ReplayProcessor
Returns:the created ReplayProcessor
/** * Creates an unbounded ReplayProcessor. * <p> * The internal buffer is backed by an {@link ArrayList} and starts with an initial capacity of 16. Once the * number of items reaches this capacity, it will grow as necessary (usually by 50%). However, as the * number of items grows, this causes frequent array reallocation and copying, and may hurt performance * and latency. This can be avoided with the {@link #create(int)} overload which takes an initial capacity * parameter and can be tuned to reduce the array reallocation frequency as needed. * * @param <T> * the type of items observed and emitted by the ReplayProcessor * @return the created ReplayProcessor */
    @CheckReturnValue
    @NonNull
    public static <T> ReplayProcessor<T> create() {
        return new ReplayProcessor<T>(new UnboundedReplayBuffer<T>(16));
    }
Creates an unbounded ReplayProcessor with the specified initial buffer capacity.

Use this method to avoid excessive array reallocation while the internal buffer grows to accommodate new items. For example, if you know that the buffer will hold 32k items, you can ask the ReplayProcessor to preallocate its internal array with a capacity to hold that many items. Once the items start to arrive, the internal array won't need to grow, creating less garbage and no overhead due to frequent array-copying.

Params:
  • capacityHint – the initial buffer capacity
Type parameters:
  • <T> – the type of items observed and emitted by this type of processor
Returns:the created processor
/** * Creates an unbounded ReplayProcessor with the specified initial buffer capacity. * <p> * Use this method to avoid excessive array reallocation while the internal buffer grows to accommodate new * items. For example, if you know that the buffer will hold 32k items, you can ask the * {@code ReplayProcessor} to preallocate its internal array with a capacity to hold that many items. Once * the items start to arrive, the internal array won't need to grow, creating less garbage and no overhead * due to frequent array-copying. * * @param <T> * the type of items observed and emitted by this type of processor * @param capacityHint * the initial buffer capacity * @return the created processor */
    @CheckReturnValue
    @NonNull
    public static <T> ReplayProcessor<T> create(int capacityHint) {
        return new ReplayProcessor<T>(new UnboundedReplayBuffer<T>(capacityHint));
    }
Creates a size-bounded ReplayProcessor.

In this setting, the ReplayProcessor holds at most maxSize items in its internal buffer and discards the oldest item when a new item would exceed that limit.

When Subscribers subscribe to a terminated ReplayProcessor, they are guaranteed to see at most maxSize onNext events followed by a termination event.

If a Subscriber subscribes while the ReplayProcessor is active, it will observe all items in the buffer at that point in time and each item observed afterwards, even if the buffer evicts items due to the size constraint in the meantime. In other words, once a Subscriber subscribes, it will receive items without gaps in the sequence.

Params:
  • maxSize – the maximum number of buffered items
Type parameters:
  • <T> – the type of items observed and emitted by this type of processor
Returns:the created processor
    /**
     * Creates a size-bounded ReplayProcessor.
     * <p>
     * In this setting, the {@code ReplayProcessor} holds at most {@code maxSize} items in its internal buffer and
     * discards the oldest item when a new item would exceed that limit.
     * <p>
     * When {@code Subscriber}s subscribe to a terminated {@code ReplayProcessor}, they are guaranteed to see at most
     * {@code maxSize} {@code onNext} events followed by a termination event.
     * <p>
     * If a {@code Subscriber} subscribes while the {@code ReplayProcessor} is active, it will observe all items in the
     * buffer at that point in time and each item observed afterwards, even if the buffer evicts items due to
     * the size constraint in the meantime. In other words, once a {@code Subscriber} subscribes, it will receive items
     * without gaps in the sequence.
     *
     * @param <T>
     *          the type of items observed and emitted by this type of processor
     * @param maxSize
     *          the maximum number of buffered items
     * @return the created processor
     */
    @CheckReturnValue
    @NonNull
    public static <T> ReplayProcessor<T> createWithSize(int maxSize) {
        return new ReplayProcessor<T>(new SizeBoundReplayBuffer<T>(maxSize));
    }
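The size bound described for createWithSize can be pictured with a small standalone model. This is only a sketch under simplifying assumptions: the class SizeBoundedReplaySketch and its methods are hypothetical names, it is single-threaded, and it is not how SizeBoundReplayBuffer is implemented. It only shows which items a late Subscriber would be replayed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, single-threaded model of a size-bounded replay buffer; not RxJava code. */
final class SizeBoundedReplaySketch<T> {
    private final int maxSize;
    private final ArrayDeque<T> items = new ArrayDeque<>();

    SizeBoundedReplaySketch(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize > 0 required");
        }
        this.maxSize = maxSize;
    }

    /** Stores an item and evicts the oldest one if the bound is exceeded. */
    void next(T item) {
        items.addLast(item);
        if (items.size() > maxSize) {
            items.removeFirst();
        }
    }

    /** Returns what a Subscriber arriving right now would be replayed, oldest first. */
    List<T> replayForNewSubscriber() {
        return new ArrayList<>(items);
    }

    public static void main(String[] args) {
        SizeBoundedReplaySketch<Integer> buffer = new SizeBoundedReplaySketch<>(3);
        for (int i = 1; i <= 5; i++) {
            buffer.next(i);
        }
        System.out.println(buffer.replayForNewSubscriber()); // prints [3, 4, 5]
    }
}
```

A Subscriber that was already subscribed while items 1 through 5 arrived would have seen all five; the bound only limits the replayed prefix for newcomers.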
Creates an unbounded ReplayProcessor with the bounded-implementation for testing purposes.

This variant behaves like the regular unbounded ReplayProcessor created via create() but uses the structures of the bounded-implementation. This is by no means intended for the replacement of the original, array-backed and unbounded ReplayProcessor due to the additional overhead of the linked-list based internal buffer. The sole purpose is to allow testing and reasoning about the behavior of the bounded implementations without the interference of the eviction policies.

Type parameters:
  • <T> – the type of items observed and emitted by this type of processor
Returns:the created processor
/** * Creates an unbounded ReplayProcessor with the bounded-implementation for testing purposes. * <p> * This variant behaves like the regular unbounded {@code ReplayProcessor} created via {@link #create()} but * uses the structures of the bounded-implementation. This is by no means intended for the replacement of * the original, array-backed and unbounded {@code ReplayProcessor} due to the additional overhead of the * linked-list based internal buffer. The sole purpose is to allow testing and reasoning about the behavior * of the bounded implementations without the interference of the eviction policies. * * @param <T> * the type of items observed and emitted by this type of processor * @return the created processor */
    /* test */ static <T> ReplayProcessor<T> createUnbounded() {
        return new ReplayProcessor<T>(new SizeBoundReplayBuffer<T>(Integer.MAX_VALUE));
    }
Creates a time-bounded ReplayProcessor.

In this setting, the ReplayProcessor internally tags each observed item with a timestamp value supplied by the Scheduler and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination event, leaving the buffer empty.

Once the processor is terminated, Subscribers subscribing to it will receive items that remained in the buffer after the terminal event, regardless of their age.

If a Subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a Subscriber subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.

Note that terminal notifications (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete notification arrives at T=10. If a Subscriber subscribes at T=11, it will find an empty ReplayProcessor with just an onComplete notification.

Params:
  • maxAge – the maximum age of the contained items
  • unit – the time unit of maxAge
  • scheduler – the Scheduler that provides the current time
Type parameters:
  • <T> – the type of items observed and emitted by this type of processor
Returns:the created processor
    /**
     * Creates a time-bounded ReplayProcessor.
     * <p>
     * In this setting, the {@code ReplayProcessor} internally tags each observed item with a timestamp value
     * supplied by the {@link Scheduler} and keeps only those whose age is less than the supplied time value
     * converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T&gt;=5
     * this first item is then evicted by any subsequent item or termination event, leaving the buffer empty.
     * <p>
     * Once the processor is terminated, {@code Subscriber}s subscribing to it will receive items that remained in the
     * buffer after the terminal event, regardless of their age.
     * <p>
     * If a {@code Subscriber} subscribes while the {@code ReplayProcessor} is active, it will observe only those items
     * from within the buffer that have an age less than the specified time, and each item observed thereafter,
     * even if the buffer evicts items due to the time constraint in the meantime. In other words, once a
     * {@code Subscriber} subscribes, it observes items without gaps in the sequence except for any outdated items at the
     * beginning of the sequence.
     * <p>
     * Note that terminal notifications ({@code onError} and {@code onComplete}) trigger eviction as well. For
     * example, with a max age of 5, the first item is observed at T=0, then an {@code onComplete} notification
     * arrives at T=10. If a {@code Subscriber} subscribes at T=11, it will find an empty {@code ReplayProcessor} with just
     * an {@code onComplete} notification.
     *
     * @param <T>
     *          the type of items observed and emitted by this type of processor
     * @param maxAge
     *          the maximum age of the contained items
     * @param unit
     *          the time unit of {@code maxAge}
     * @param scheduler
     *          the {@link Scheduler} that provides the current time
     * @return the created processor
     */
    @CheckReturnValue
    @NonNull
    public static <T> ReplayProcessor<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler) {
        return new ReplayProcessor<T>(new SizeAndTimeBoundReplayBuffer<T>(Integer.MAX_VALUE, maxAge, unit, scheduler));
    }
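The T=0 / T=10 / T=11 timeline from the createWithTime documentation can be traced with a standalone model. This is a sketch under stated assumptions: TimeBoundedReplaySketch is a hypothetical class, a LongSupplier stands in for the Scheduler as the time source, and the code is single-threaded. It is not the actual SizeAndTimeBoundReplayBuffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical, single-threaded model of age-based retention; not RxJava code. */
final class TimeBoundedReplaySketch<T> {
    /** An item together with the time at which it was received. */
    private static final class Timed<V> {
        final V value;
        final long time;

        Timed(V value, long time) {
            this.value = value;
            this.time = time;
        }
    }

    private final long maxAge;
    private final LongSupplier clock; // assumption: stands in for the Scheduler's "now"
    private final ArrayDeque<Timed<T>> items = new ArrayDeque<>();

    TimeBoundedReplaySketch(long maxAge, LongSupplier clock) {
        if (maxAge <= 0) {
            throw new IllegalArgumentException("maxAge > 0 required");
        }
        this.maxAge = maxAge;
        this.clock = clock;
    }

    /** Timestamps and stores the item, then evicts items whose age is >= maxAge. */
    void next(T item) {
        long now = clock.getAsLong();
        items.addLast(new Timed<>(item, now));
        evict(now);
    }

    /** Models a terminal event, which triggers eviction as well. */
    void complete() {
        evict(clock.getAsLong());
    }

    private void evict(long now) {
        while (!items.isEmpty() && now - items.peekFirst().time >= maxAge) {
            items.removeFirst();
        }
    }

    /** Returns the items a new Subscriber would see: only those younger than maxAge. */
    List<T> replayForNewSubscriber() {
        long now = clock.getAsLong();
        List<T> out = new ArrayList<>();
        for (Timed<T> t : items) {
            if (now - t.time < maxAge) {
                out.add(t.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] now = { 0 };
        TimeBoundedReplaySketch<String> buffer = new TimeBoundedReplaySketch<>(5, () -> now[0]);

        buffer.next("a");                                    // received at T=0
        now[0] = 4;
        System.out.println(buffer.replayForNewSubscriber()); // prints [a]

        now[0] = 10;
        buffer.complete();                                   // terminal event at T=10 evicts "a"
        now[0] = 11;
        System.out.println(buffer.replayForNewSubscriber()); // prints []
    }
}
```

Injecting the clock mirrors the role of the Scheduler parameter: tests can advance time deterministically instead of sleeping.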
Creates a time- and size-bounded ReplayProcessor.

In this setting, the ReplayProcessor internally tags each received item with a timestamp value supplied by the Scheduler and holds at most maxSize items in its internal buffer. It evicts items from the start of the buffer if their age becomes greater than or equal to the supplied age in milliseconds or the buffer reaches its maxSize limit.

When Subscribers subscribe to a terminated ReplayProcessor, they observe the items that remained in the buffer after the terminal notification, regardless of their age, but at most maxSize items.

If a Subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time and each subsequent item, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a Subscriber subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.

Note that terminal notifications (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete notification arrives at T=10. If a Subscriber subscribes at T=11, it will find an empty ReplayProcessor with just an onComplete notification.

Params:
  • maxAge – the maximum age of the contained items
  • unit – the time unit of maxAge
  • scheduler – the Scheduler that provides the current time
  • maxSize – the maximum number of buffered items
Type parameters:
  • <T> – the type of items observed and emitted by this type of processor
Returns:the created processor
    /**
     * Creates a time- and size-bounded ReplayProcessor.
     * <p>
     * In this setting, the {@code ReplayProcessor} internally tags each received item with a timestamp value
     * supplied by the {@link Scheduler} and holds at most {@code maxSize} items in its internal buffer. It evicts
     * items from the start of the buffer if their age becomes greater than or equal to the supplied age in
     * milliseconds or the buffer reaches its {@code maxSize} limit.
     * <p>
     * When {@code Subscriber}s subscribe to a terminated {@code ReplayProcessor}, they observe the items that remained in
     * the buffer after the terminal notification, regardless of their age, but at most {@code maxSize} items.
     * <p>
     * If a {@code Subscriber} subscribes while the {@code ReplayProcessor} is active, it will observe only those items
     * from within the buffer that have an age less than the specified time and each subsequent item, even if the
     * buffer evicts items due to the time constraint in the meantime. In other words, once a {@code Subscriber}
     * subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning
     * of the sequence.
     * <p>
     * Note that terminal notifications ({@code onError} and {@code onComplete}) trigger eviction as well. For
     * example, with a max age of 5, the first item is observed at T=0, then an {@code onComplete} notification
     * arrives at T=10. If a {@code Subscriber} subscribes at T=11, it will find an empty {@code ReplayProcessor} with just
     * an {@code onComplete} notification.
     *
     * @param <T>
     *          the type of items observed and emitted by this type of processor
     * @param maxAge
     *          the maximum age of the contained items
     * @param unit
     *          the time unit of {@code maxAge}
     * @param scheduler
     *          the {@link Scheduler} that provides the current time
     * @param maxSize
     *          the maximum number of buffered items
     * @return the created processor
     */
    @CheckReturnValue
    @NonNull
    public static <T> ReplayProcessor<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize) {
        return new ReplayProcessor<T>(new SizeAndTimeBoundReplayBuffer<T>(maxSize, maxAge, unit, scheduler));
    }
Constructs a ReplayProcessor with the given custom ReplayBuffer instance.
Params:
  • buffer – the ReplayBuffer instance, not null (not verified)
/** * Constructs a ReplayProcessor with the given custom ReplayBuffer instance. * @param buffer the ReplayBuffer instance, not null (not verified) */
    @SuppressWarnings("unchecked")
    ReplayProcessor(ReplayBuffer<T> buffer) {
        this.buffer = buffer;
        this.subscribers = new AtomicReference<ReplaySubscription<T>[]>(EMPTY);
    }

    @Override
    protected void subscribeActual(Subscriber<? super T> s) {
        ReplaySubscription<T> rs = new ReplaySubscription<T>(s, this);
        s.onSubscribe(rs);

        if (add(rs)) {
            if (rs.cancelled) {
                remove(rs);
                return;
            }
        }
        buffer.replay(rs);
    }

    @Override
    public void onSubscribe(Subscription s) {
        if (done) {
            s.cancel();
            return;
        }
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T t) {
        ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");

        if (done) {
            return;
        }

        ReplayBuffer<T> b = buffer;
        b.next(t);

        for (ReplaySubscription<T> rs : subscribers.get()) {
            b.replay(rs);
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public void onError(Throwable t) {
        ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");

        if (done) {
            RxJavaPlugins.onError(t);
            return;
        }
        done = true;

        ReplayBuffer<T> b = buffer;
        b.error(t);

        for (ReplaySubscription<T> rs : subscribers.getAndSet(TERMINATED)) {
            b.replay(rs);
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;

        ReplayBuffer<T> b = buffer;
        b.complete();

        for (ReplaySubscription<T> rs : subscribers.getAndSet(TERMINATED)) {
            b.replay(rs);
        }
    }

    @Override
    public boolean hasSubscribers() {
        return subscribers.get().length != 0;
    }

    /* test */ int subscriberCount() {
        return subscribers.get().length;
    }

    @Override
    @Nullable
    public Throwable getThrowable() {
        ReplayBuffer<T> b = buffer;
        if (b.isDone()) {
            return b.getError();
        }
        return null;
    }
Makes sure the item cached by the head node in a bounded ReplayProcessor is released (as it is never part of a replay).

By default, live bounded buffers will remember one item before the currently receivable one to ensure subscribers can always receive a continuous sequence of items. A terminated ReplayProcessor automatically releases this inaccessible item.

The method must be called sequentially, similar to the standard onXXX methods.

History: 2.1.11 - experimental

Since:2.2
/** * Makes sure the item cached by the head node in a bounded * ReplayProcessor is released (as it is never part of a replay). * <p> * By default, live bounded buffers will remember one item before * the currently receivable one to ensure subscribers can always * receive a continuous sequence of items. A terminated ReplayProcessor * automatically releases this inaccessible item. * <p> * The method must be called sequentially, similar to the standard * {@code onXXX} methods. * <p>History: 2.1.11 - experimental * @since 2.2 */
    public void cleanupBuffer() {
        buffer.trimHead();
    }
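Why a live bounded buffer "remembers one item before the currently receivable one" is easier to see in a linked-list model. The following is a sketch under stated assumptions: HeadRetentionSketch, its Node type, and its fields are hypothetical names, and the model is single-threaded. It illustrates the idea of a head node that still references the last evicted item until it is trimmed. It is not the library's buffer code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, single-threaded model of a linked, size-bounded buffer; not RxJava code. */
final class HeadRetentionSketch<T> {
    static final class Node<V> {
        final V value;
        Node<V> next;

        Node(V value) {
            this.value = value;
        }
    }

    private final int maxSize;
    private int size;
    Node<T> head = new Node<>(null); // replay always starts at head.next
    private Node<T> tail = head;

    HeadRetentionSketch(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize > 0 required");
        }
        this.maxSize = maxSize;
    }

    void next(T item) {
        Node<T> n = new Node<>(item);
        tail.next = n;
        tail = n;
        if (++size > maxSize) {
            size--;
            // The evicted node becomes the new head and keeps its value reachable.
            head = head.next;
        }
    }

    /** Replaces the head with a value-less node so the evicted item can be collected. */
    void trimHead() {
        if (head.value != null) {
            Node<T> fresh = new Node<>(null);
            fresh.next = head.next;
            head = fresh;
        }
    }

    List<T> replayForNewSubscriber() {
        List<T> out = new ArrayList<>();
        for (Node<T> n = head.next; n != null; n = n.next) {
            out.add(n.value);
        }
        return out;
    }

    public static void main(String[] args) {
        HeadRetentionSketch<Integer> buffer = new HeadRetentionSketch<>(2);
        buffer.next(1);
        buffer.next(2);
        buffer.next(3);
        System.out.println(buffer.head.value + " " + buffer.replayForNewSubscriber()); // prints 1 [2, 3]

        buffer.trimHead();
        System.out.println(buffer.head.value + " " + buffer.replayForNewSubscriber()); // prints null [2, 3]
    }
}
```

In the model, trimming does not change what is replayed. It only drops the strong reference to the item that is no longer replayable.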
Returns the latest value this processor has or null if no such value exists.

The method is thread-safe.

Returns:the latest value this processor currently has or null if no such value exists
/** * Returns the latest value this processor has or null if no such value exists. * <p>The method is thread-safe. * @return the latest value this processor currently has or null if no such value exists */
    public T getValue() {
        return buffer.getValue();
    }
Returns an Object array containing a snapshot of all values of this processor.

The method is thread-safe.

Returns:the array containing the snapshot of all values of this processor
/** * Returns an Object array containing a snapshot of all values of this processor. * <p>The method is thread-safe. * @return the array containing the snapshot of all values of this processor */
    public Object[] getValues() {
        @SuppressWarnings("unchecked")
        T[] a = (T[])EMPTY_ARRAY;
        T[] b = getValues(a);
        if (b == EMPTY_ARRAY) {
            return new Object[0];
        }
        return b;
    }
Returns a typed array containing a snapshot of all values of this processor.

The method follows the conventions of Collection.toArray by setting the array element after the last value to null (if the capacity permits).

The method is thread-safe.

Params:
  • array – the target array to copy values into if it fits
Returns:the given array if the values fit into it or a new array containing all values
/** * Returns a typed array containing a snapshot of all values of this processor. * <p>The method follows the conventions of Collection.toArray by setting the array element * after the last value to null (if the capacity permits). * <p>The method is thread-safe. * @param array the target array to copy values into if it fits * @return the given array if the values fit into it or a new array containing all values */
public T[] getValues(T[] array) { return buffer.getValues(array); } @Override public boolean hasComplete() { ReplayBuffer<T> b = buffer; return b.isDone() && b.getError() == null; } @Override public boolean hasThrowable() { ReplayBuffer<T> b = buffer; return b.isDone() && b.getError() != null; }
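The Collection.toArray convention referenced for getValues(T[]) can be seen with a plain JDK list. The sketch below does not call the processor; `ToArrayConventionSketch` and `copyInto` are hypothetical names, and `List.toArray(T[])` stands in for the buffer's copy logic. It shows the two cases: a target that is large enough is reused and gets a null written after the last value, while a target that is too small is replaced by a new array of the same runtime type.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo of the Collection.toArray(T[]) convention using only the JDK.
final class ToArrayConventionSketch {

    // Copies the values into target if they fit, otherwise into a new array of the same type.
    static <T> T[] copyInto(List<T> values, T[] target) {
        return values.toArray(target);
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("a", "b");

        String[] large = { "x", "x", "x", "x" };
        String[] reused = copyInto(values, large);
        System.out.println(reused == large);          // true
        System.out.println(Arrays.toString(reused));  // [a, b, null, x]

        String[] fresh = copyInto(values, new String[0]);
        System.out.println(Arrays.toString(fresh));   // [a, b]
    }
}
```

Only the element directly after the last value is set to null; anything further along in an oversized array is left as it was, so callers that reuse arrays should stop reading at the first null.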
Returns true if this processor has any value.

The method is thread-safe.

Returns:true if the processor has any value
/** * Returns true if this processor has any value. * <p>The method is thread-safe. * @return true if the processor has any value */
public boolean hasValue() { return buffer.size() != 0; // NOPMD } /* test*/ int size() { return buffer.size(); } boolean add(ReplaySubscription<T> rs) { for (;;) { ReplaySubscription<T>[] a = subscribers.get(); if (a == TERMINATED) { return false; } int len = a.length; @SuppressWarnings("unchecked") ReplaySubscription<T>[] b = new ReplaySubscription[len + 1]; System.arraycopy(a, 0, b, 0, len); b[len] = rs; if (subscribers.compareAndSet(a, b)) { return true; } } } @SuppressWarnings("unchecked") void remove(ReplaySubscription<T> rs) { for (;;) { ReplaySubscription<T>[] a = subscribers.get(); if (a == TERMINATED || a == EMPTY) { return; } int len = a.length; int j = -1; for (int i = 0; i < len; i++) { if (a[i] == rs) { j = i; break; } } if (j < 0) { return; } ReplaySubscription<T>[] b; if (len == 1) { b = EMPTY; } else { b = new ReplaySubscription[len - 1]; System.arraycopy(a, 0, b, 0, j); System.arraycopy(a, j + 1, b, j, len - j - 1); } if (subscribers.compareAndSet(a, b)) { return; } } }
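The add and remove methods above follow a copy-on-write pattern: read the current array, build a modified copy, and publish it with a compare-and-set, retrying if another thread got there first. A minimal JDK-only sketch of that pattern follows, assuming a registry of strings in place of subscriptions. The class `CopyOnWriteRegistrySketch` and its `terminate()` helper are hypothetical and only illustrate how a TERMINATED marker could make later additions fail; the sketch also matches entries with `equals`, whereas the code above compares by identity.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical copy-on-write registry; strings stand in for subscriptions.
final class CopyOnWriteRegistrySketch {
    static final String[] EMPTY = new String[0];
    // Distinct instance used purely as an identity marker for the terminal state.
    static final String[] TERMINATED = new String[0];

    final AtomicReference<String[]> items = new AtomicReference<>(EMPTY);

    boolean add(String item) {
        for (;;) {
            String[] a = items.get();
            if (a == TERMINATED) {
                return false; // no registrations after termination
            }
            String[] b = Arrays.copyOf(a, a.length + 1);
            b[a.length] = item;
            if (items.compareAndSet(a, b)) {
                return true;
            }
            // CAS failed: another thread changed the array, so retry on the new state.
        }
    }

    void remove(String item) {
        for (;;) {
            String[] a = items.get();
            int j = -1;
            for (int i = 0; i < a.length; i++) {
                if (a[i].equals(item)) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return; // not present (also covers EMPTY and TERMINATED)
            }
            String[] b;
            if (a.length == 1) {
                b = EMPTY;
            } else {
                b = new String[a.length - 1];
                System.arraycopy(a, 0, b, 0, j);
                System.arraycopy(a, j + 1, b, j, a.length - j - 1);
            }
            if (items.compareAndSet(a, b)) {
                return;
            }
        }
    }

    // Hypothetical helper: swaps in the terminal marker and returns the last registrations.
    String[] terminate() {
        return items.getAndSet(TERMINATED);
    }

    public static void main(String[] args) {
        CopyOnWriteRegistrySketch r = new CopyOnWriteRegistrySketch();
        r.add("x");
        r.add("y");
        r.add("z");
        r.remove("y");
        System.out.println(Arrays.toString(r.items.get())); // [x, z]
        r.terminate();
        System.out.println(r.add("w"));                     // false
    }
}
```

Because each published array is never mutated afterwards, a reader that obtained the array can iterate it without locking, at the cost of an allocation on every change.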
Abstraction over a buffer that receives events and replays them to individual Subscribers.
Type parameters:
  • <T> – the value type
/** * Abstraction over a buffer that receives events and replays them to * individual Subscribers. * * @param <T> the value type */
interface ReplayBuffer<T> { void next(T value); void error(Throwable ex); void complete(); void replay(ReplaySubscription<T> rs); int size(); @Nullable T getValue(); T[] getValues(T[] array); boolean isDone(); Throwable getError();
Makes sure an old inaccessible head value is released in a bounded buffer.
/** * Makes sure an old inaccessible head value is released * in a bounded buffer. */
void trimHead(); } static final class ReplaySubscription<T> extends AtomicInteger implements Subscription { private static final long serialVersionUID = 466549804534799122L; final Subscriber<? super T> downstream; final ReplayProcessor<T> state; Object index; final AtomicLong requested; volatile boolean cancelled; long emitted; ReplaySubscription(Subscriber<? super T> actual, ReplayProcessor<T> state) { this.downstream = actual; this.state = state; this.requested = new AtomicLong(); } @Override public void request(long n) { if (SubscriptionHelper.validate(n)) { BackpressureHelper.add(requested, n); state.buffer.replay(this); } } @Override public void cancel() { if (!cancelled) { cancelled = true; state.remove(this); } } } static final class UnboundedReplayBuffer<T> implements ReplayBuffer<T> { final List<T> buffer; Throwable error; volatile boolean done; volatile int size; UnboundedReplayBuffer(int capacityHint) { this.buffer = new ArrayList<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); } @Override public void next(T value) { buffer.add(value); size++; } @Override public void error(Throwable ex) { error = ex; done = true; } @Override public void complete() { done = true; } @Override public void trimHead() { // not applicable for an unbounded buffer } @Override @Nullable public T getValue() { int s = size; if (s == 0) { return null; } return buffer.get(s - 1); } @Override @SuppressWarnings("unchecked") public T[] getValues(T[] array) { int s = size; if (s == 0) { if (array.length != 0) { array[0] = null; } return array; } List<T> b = buffer; if (array.length < s) { array = (T[])Array.newInstance(array.getClass().getComponentType(), s); } for (int i = 0; i < s; i++) { array[i] = b.get(i); } if (array.length > s) { array[s] = null; } return array; } @Override public void replay(ReplaySubscription<T> rs) { if (rs.getAndIncrement() != 0) { return; } int missed = 1; final List<T> b = buffer; final Subscriber<? 
super T> a = rs.downstream; Integer indexObject = (Integer)rs.index; int index; if (indexObject != null) { index = indexObject; } else { index = 0; rs.index = 0; } long e = rs.emitted; for (;;) { long r = rs.requested.get(); while (e != r) { if (rs.cancelled) { rs.index = null; return; } boolean d = done; int s = size; if (d && index == s) { rs.index = null; rs.cancelled = true; Throwable ex = error; if (ex == null) { a.onComplete(); } else { a.onError(ex); } return; } if (index == s) { break; } a.onNext(b.get(index)); index++; e++; } if (e == r) { if (rs.cancelled) { rs.index = null; return; } boolean d = done; int s = size; if (d && index == s) { rs.index = null; rs.cancelled = true; Throwable ex = error; if (ex == null) { a.onComplete(); } else { a.onError(ex); } return; } } rs.index = index; rs.emitted = e; missed = rs.addAndGet(-missed); if (missed == 0) { break; } } } @Override public int size() { return size; } @Override public boolean isDone() { return done; } @Override public Throwable getError() { return error; } } static final class Node<T> extends AtomicReference<Node<T>> { private static final long serialVersionUID = 6404226426336033100L; final T value; Node(T value) { this.value = value; } } static final class TimedNode<T> extends AtomicReference<TimedNode<T>> { private static final long serialVersionUID = 6404226426336033100L; final T value; final long time; TimedNode(T value, long time) { this.value = value; this.time = time; } } static final class SizeBoundReplayBuffer<T> implements ReplayBuffer<T> { final int maxSize; int size; volatile Node<T> head; Node<T> tail; Throwable error; volatile boolean done; SizeBoundReplayBuffer(int maxSize) { this.maxSize = ObjectHelper.verifyPositive(maxSize, "maxSize"); Node<T> h = new Node<T>(null); this.tail = h; this.head = h; } void trim() { if (size > maxSize) { size--; Node<T> h = head; head = h.get(); } } @Override public void next(T value) { Node<T> n = new Node<T>(value); Node<T> t = tail; tail = n; 
size++; t.set(n); // releases both the tail and size trim(); } @Override public void error(Throwable ex) { error = ex; trimHead(); done = true; } @Override public void complete() { trimHead(); done = true; } @Override public void trimHead() { if (head.value != null) { Node<T> n = new Node<T>(null); n.lazySet(head.get()); head = n; } } @Override public boolean isDone() { return done; } @Override public Throwable getError() { return error; } @Override public T getValue() { Node<T> h = head; for (;;) { Node<T> n = h.get(); if (n == null) { return h.value; } h = n; } } @Override @SuppressWarnings("unchecked") public T[] getValues(T[] array) { int s = 0; Node<T> h = head; Node<T> h0 = h; for (;;) { Node<T> next = h0.get(); if (next == null) { break; } s++; h0 = next; } if (array.length < s) { array = (T[])Array.newInstance(array.getClass().getComponentType(), s); } for (int j = 0; j < s; j++) { h = h.get(); array[j] = h.value; } if (array.length > s) { array[s] = null; } return array; } @Override @SuppressWarnings("unchecked") public void replay(ReplaySubscription<T> rs) { if (rs.getAndIncrement() != 0) { return; } int missed = 1; final Subscriber<? 
super T> a = rs.downstream; Node<T> index = (Node<T>)rs.index; if (index == null) { index = head; } long e = rs.emitted; for (;;) { long r = rs.requested.get(); while (e != r) { if (rs.cancelled) { rs.index = null; return; } boolean d = done; Node<T> next = index.get(); boolean empty = next == null; if (d && empty) { rs.index = null; rs.cancelled = true; Throwable ex = error; if (ex == null) { a.onComplete(); } else { a.onError(ex); } return; } if (empty) { break; } a.onNext(next.value); e++; index = next; } if (e == r) { if (rs.cancelled) { rs.index = null; return; } boolean d = done; if (d && index.get() == null) { rs.index = null; rs.cancelled = true; Throwable ex = error; if (ex == null) { a.onComplete(); } else { a.onError(ex); } return; } } rs.index = index; rs.emitted = e; missed = rs.addAndGet(-missed); if (missed == 0) { break; } } } @Override public int size() { int s = 0; Node<T> h = head; while (s != Integer.MAX_VALUE) { Node<T> next = h.get(); if (next == null) { break; } s++; h = next; } return s; } } static final class SizeAndTimeBoundReplayBuffer<T> implements ReplayBuffer<T> { final int maxSize; final long maxAge; final TimeUnit unit; final Scheduler scheduler; int size; volatile TimedNode<T> head; TimedNode<T> tail; Throwable error; volatile boolean done; SizeAndTimeBoundReplayBuffer(int maxSize, long maxAge, TimeUnit unit, Scheduler scheduler) { this.maxSize = ObjectHelper.verifyPositive(maxSize, "maxSize"); this.maxAge = ObjectHelper.verifyPositive(maxAge, "maxAge"); this.unit = ObjectHelper.requireNonNull(unit, "unit is null"); this.scheduler = ObjectHelper.requireNonNull(scheduler, "scheduler is null"); TimedNode<T> h = new TimedNode<T>(null, 0L); this.tail = h; this.head = h; } void trim() { if (size > maxSize) { size--; TimedNode<T> h = head; head = h.get(); } long limit = scheduler.now(unit) - maxAge; TimedNode<T> h = head; for (;;) { if (size <= 1) { head = h; break; } TimedNode<T> next = h.get(); if (next == null) { head = h; break; } if 
(next.time > limit) { head = h; break; } h = next; size--; } } void trimFinal() { long limit = scheduler.now(unit) - maxAge; TimedNode<T> h = head; for (;;) { TimedNode<T> next = h.get(); if (next == null) { if (h.value != null) { head = new TimedNode<T>(null, 0L); } else { head = h; } break; } if (next.time > limit) { if (h.value != null) { TimedNode<T> n = new TimedNode<T>(null, 0L); n.lazySet(h.get()); head = n; } else { head = h; } break; } h = next; } } @Override public void trimHead() { if (head.value != null) { TimedNode<T> n = new TimedNode<T>(null, 0L); n.lazySet(head.get()); head = n; } } @Override public void next(T value) { TimedNode<T> n = new TimedNode<T>(value, scheduler.now(unit)); TimedNode<T> t = tail; tail = n; size++; t.set(n); // releases both the tail and size trim(); } @Override public void error(Throwable ex) { trimFinal(); error = ex; done = true; } @Override public void complete() { trimFinal(); done = true; } @Override @Nullable public T getValue() { TimedNode<T> h = head; for (;;) { TimedNode<T> next = h.get(); if (next == null) { break; } h = next; } long limit = scheduler.now(unit) - maxAge; if (h.time < limit) { return null; } return h.value; } @Override @SuppressWarnings("unchecked") public T[] getValues(T[] array) { TimedNode<T> h = getHead(); int s = size(h); if (s == 0) { if (array.length != 0) { array[0] = null; } } else { if (array.length < s) { array = (T[])Array.newInstance(array.getClass().getComponentType(), s); } int i = 0; while (i != s) { TimedNode<T> next = h.get(); array[i] = next.value; i++; h = next; } if (array.length > s) { array[s] = null; } } return array; } TimedNode<T> getHead() { TimedNode<T> index = head; // skip old entries long limit = scheduler.now(unit) - maxAge; TimedNode<T> next = index.get(); while (next != null) { long ts = next.time; if (ts > limit) { break; } index = next; next = index.get(); } return index; } @Override @SuppressWarnings("unchecked") public void replay(ReplaySubscription<T> rs) { if 
(rs.getAndIncrement() != 0) { return; } int missed = 1; final Subscriber<? super T> a = rs.downstream; TimedNode<T> index = (TimedNode<T>)rs.index; if (index == null) { index = getHead(); } long e = rs.emitted; for (;;) { long r = rs.requested.get(); while (e != r) { if (rs.cancelled) { rs.index = null; return; } boolean d = done; TimedNode<T> next = index.get(); boolean empty = next == null; if (d && empty) { rs.index = null; rs.cancelled = true; Throwable ex = error; if (ex == null) { a.onComplete(); } else { a.onError(ex); } return; } if (empty) { break; } a.onNext(next.value); e++; index = next; } if (e == r) { if (rs.cancelled) { rs.index = null; return; } boolean d = done; if (d && index.get() == null) { rs.index = null; rs.cancelled = true; Throwable ex = error; if (ex == null) { a.onComplete(); } else { a.onError(ex); } return; } } rs.index = index; rs.emitted = e; missed = rs.addAndGet(-missed); if (missed == 0) { break; } } } @Override public int size() { return size(getHead()); } int size(TimedNode<T> h) { int s = 0; while (s != Integer.MAX_VALUE) { TimedNode<T> next = h.get(); if (next == null) { break; } s++; h = next; } return s; } @Override public Throwable getError() { return error; } @Override public boolean isDone() { return done; } } }
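All three replay implementations above share the same drain-loop shape: a counter is incremented on entry, only the caller that moves it from zero runs the loop, and the `missed` bookkeeping makes that caller loop again if others tried to enter in the meantime. Emission is further limited by the requested amount. The sketch below isolates that shape with JDK types only. `DrainLoopSketch`, `offer`, and the `source`/`received` lists are hypothetical stand-ins, the demo runs on a single thread, and the plain `ArrayList` used here would not be safe for concurrent producers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-consumer drain loop driven by demand; single-threaded demo only.
final class DrainLoopSketch {
    final List<Integer> source = new ArrayList<>();   // stand-in for the replay buffer
    final List<Integer> received = new ArrayList<>(); // stand-in for the downstream
    final AtomicInteger wip = new AtomicInteger();    // work-in-progress counter
    final AtomicLong requested = new AtomicLong();
    int index;     // position of the next item to emit
    long emitted;  // total number of items emitted so far

    void offer(int value) {
        source.add(value);
        drain();
    }

    void request(long n) {
        requested.addAndGet(n); // overflow capping is omitted in this sketch
        drain();
    }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // a drain is already running; it will observe the increment
        }
        int missed = 1;
        for (;;) {
            long r = requested.get();
            // Emit while there is both demand and data.
            while (emitted != r && index != source.size()) {
                received.add(source.get(index));
                index++;
                emitted++;
            }
            // Subtract the calls already accounted for; non-zero means new work arrived.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    public static void main(String[] args) {
        DrainLoopSketch d = new DrainLoopSketch();
        d.offer(1);
        d.offer(2);
        d.offer(3);
        System.out.println(d.received); // []
        d.request(2);
        System.out.println(d.received); // [1, 2]
        d.offer(4);
        System.out.println(d.received); // [1, 2]
        d.request(5);
        System.out.println(d.received); // [1, 2, 3, 4]
    }
}
```

The sketch leaves out cancellation and terminal events, which the replay methods above check inside the same loop before each emission and once more when the demand is exhausted.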