/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.publisher;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;
Dispatches onNext, onError and onComplete signals to zero-to-many Subscribers.
Please note that, along with multiple consumers, the current implementation of
DirectProcessor supports multiple producers. However, all producers must produce
messages on the same Thread, otherwise the
Reactive Streams Spec contract is
violated.
Note: DirectProcessor does not coordinate backpressure between its
Subscribers and the upstream, but consumes its upstream in an
unbounded manner.
In the case where a downstream Subscriber is not ready to receive items (hasn't
requested yet or enough), it will be terminated with an
IllegalStateException.
Hence in terms of interaction model, DirectProcessor only supports PUSH from the
source through the processor to the Subscribers.
Note: If there are no Subscribers, upstream items are dropped and only
the terminal events are retained. A terminated DirectProcessor will emit the
terminal signal to late subscribers.
Note: The implementation ignores Subscriptions set via onSubscribe.
Type parameters: - <T> – the input and output value type
/**
* Dispatches onNext, onError and onComplete signals to zero-to-many Subscribers.
* Please note that, along with multiple consumers, the current implementation of
* DirectProcessor supports multiple producers. However, all producers must produce
* messages on the same Thread, otherwise the
* <a href="http://www.reactive-streams.org/">Reactive Streams Spec</a> contract is
* violated.
* <p>
* <img width="640" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.2.0.M2/src/docs/marble/directprocessornormal.png" alt="">
* </p>
*
* <br>
* <br>
*
* <p>
* <b>Note: </b> DirectProcessor does not coordinate backpressure between its
* Subscribers and the upstream, but consumes its upstream in an
* unbounded manner.
* In the case where a downstream Subscriber is not ready to receive items (hasn't
* requested yet or enough), it will be terminated with an
* <i>{@link IllegalStateException}</i>.
* Hence in terms of interaction model, DirectProcessor only supports PUSH from the
* source through the processor to the Subscribers.
*
* <p>
* <img width="640" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.2.0.M2/src/docs/marble/directprocessorerror.png" alt="">
* </p>
*
* <br>
* <br>
*
* <p>
* <b>Note: </b> If there are no Subscribers, upstream items are dropped and only
* the terminal events are retained. A terminated DirectProcessor will emit the
* terminal signal to late subscribers.
*
* <p>
* <img width="640" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.2.0.M2/src/docs/marble/directprocessorterminal.png" alt="">
* </p>
*
* <br>
* <br>
*
* <p>
* <b>Note: </b> The implementation ignores Subscriptions set via onSubscribe.
* </p>
*
* @param <T> the input and output value type
*/
public final class DirectProcessor<T> extends FluxProcessor<T, T> {
Create a new DirectProcessor
Type parameters: - <E> – Type of processed signals
Returns: a fresh processor
/**
 * Factory method that instantiates a new {@link DirectProcessor}.
 *
 * @param <E> the type of the signals flowing through the processor
 *
 * @return a newly constructed processor
 */
public static <E> DirectProcessor<E> create() {
    DirectProcessor<E> processor = new DirectProcessor<>();
    return processor;
}
// Zero-length array used as the initial "no subscribers" state; compared by identity.
@SuppressWarnings("rawtypes")
private static final DirectInner[] EMPTY = new DirectInner[0];
// Distinct zero-length array used as the "terminated" sentinel; compared by identity
// against the subscribers field throughout this class.
@SuppressWarnings("rawtypes")
private static final DirectInner[] TERMINATED = new DirectInner[0];
// Current subscriber array. add/remove install a freshly built array instead of
// modifying the existing one; onError/onComplete swap in TERMINATED.
@SuppressWarnings("unchecked")
private volatile DirectInner<T>[] subscribers = EMPTY;
// Atomic accessor for the subscribers field, used for the getAndSet that installs TERMINATED.
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DirectProcessor, DirectInner[]>
SUBSCRIBERS =
AtomicReferenceFieldUpdater.newUpdater(DirectProcessor.class,
DirectInner[].class,
"subscribers");
// Failure recorded by onError just before the TERMINATED swap; never assigned by onComplete.
Throwable error;
// Package-private no-op constructor; the public static create() factory calls it.
DirectProcessor() {
}
/**
 * Reports the prefetch amount of this processor.
 *
 * @return always {@link Integer#MAX_VALUE}
 */
@Override
public int getPrefetch() {
    final int unbounded = Integer.MAX_VALUE;
    return unbounded;
}
/**
 * Accepts an upstream {@link Subscription}. The subscription is cancelled when this
 * processor has already terminated; otherwise {@code Long.MAX_VALUE} is requested
 * from it.
 *
 * @param s the upstream subscription, must not be {@code null}
 */
@Override
public void onSubscribe(Subscription s) {
    Objects.requireNonNull(s, "s");
    if (subscribers == TERMINATED) {
        // Nothing more will be relayed, so release the upstream right away.
        s.cancel();
        return;
    }
    s.request(Long.MAX_VALUE);
}
/**
 * Relays a value to every subscriber present in the current subscriber array.
 * When the processor has already terminated the value is handed to
 * {@code Operators.onNextDropped} instead.
 *
 * @param t the value to dispatch, must not be {@code null}
 */
@Override
public void onNext(T t) {
    Objects.requireNonNull(t, "t");
    // Read the volatile field once so the whole dispatch works on one array.
    final DirectInner<T>[] snapshot = subscribers;
    if (snapshot != TERMINATED) {
        for (int i = 0; i < snapshot.length; i++) {
            snapshot[i].onNext(t);
        }
        return;
    }
    Operators.onNextDropped(t, currentContext());
}
/**
 * Terminates the processor with a failure and forwards it to the subscribers that
 * were registered at the moment of termination. If the processor has already
 * terminated, the error is handed to {@code Operators.onErrorDropped} instead.
 *
 * @param t the failure to propagate, must not be {@code null}
 */
@Override
public void onError(Throwable t) {
    Objects.requireNonNull(t, "t");
    if (subscribers == TERMINATED) {
        Operators.onErrorDropped(t, currentContext());
        return;
    }
    // Record the cause before installing TERMINATED so that code observing the
    // terminated state afterwards can read it.
    error = t;
    DirectInner<?>[] previous = SUBSCRIBERS.getAndSet(this, TERMINATED);
    for (DirectInner<?> inner : previous) {
        inner.onError(t);
    }
}
/**
 * Terminates the processor and signals completion to the subscribers that were
 * registered at the moment of termination. A call made after termination swaps
 * out the zero-length TERMINATED array and therefore notifies nobody.
 */
@Override
public void onComplete() {
    DirectInner<?>[] previous = SUBSCRIBERS.getAndSet(this, TERMINATED);
    for (int i = 0; i < previous.length; i++) {
        previous[i].onComplete();
    }
}
/**
 * Registers a downstream subscriber. The subscriber first receives its
 * subscription via {@code onSubscribe}; it is then added to the subscriber array.
 * If the processor has already terminated, the subscriber is immediately sent the
 * recorded error, or completion when no error was recorded.
 *
 * @param actual the subscriber to register, must not be {@code null}
 */
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
    Objects.requireNonNull(actual, "subscribe");
    DirectInner<T> inner = new DirectInner<>(actual, this);
    actual.onSubscribe(inner);

    if (!add(inner)) {
        // Registration refused: replay the terminal signal.
        Throwable failure = error;
        if (failure == null) {
            actual.onComplete();
        }
        else {
            actual.onError(failure);
        }
        return;
    }

    // The subscriber may have cancelled before it was registered (e.g. during
    // onSubscribe); in that case undo the registration.
    if (inner.cancelled) {
        remove(inner);
    }
}
/**
 * Exposes the content of the current subscriber array as a stream.
 *
 * @return a stream over the array held by the {@code subscribers} field at call time
 */
@Override
public Stream<? extends Scannable> inners() {
    DirectInner<T>[] snapshot = subscribers;
    return Stream.of(snapshot);
}
/**
 * Tells whether the processor has terminated.
 *
 * @return {@code true} when the subscriber array is the TERMINATED sentinel
 */
@Override
public boolean isTerminated() {
    DirectInner<T>[] current = subscribers;
    return current == TERMINATED;
}
/**
 * Counts the entries of the current subscriber array.
 *
 * @return the length of the array held by the {@code subscribers} field at call time
 */
@Override
public long downstreamCount() {
    DirectInner<T>[] current = subscribers;
    return current.length;
}
/**
 * Appends a subscriber to the subscriber array by installing a new, larger array.
 * <p>
 * Structural changes are serialized by the monitor of this processor, but
 * {@code onError}/{@code onComplete} install {@code TERMINATED} through a lock-free
 * {@code getAndSet}. The new array is therefore published with a compare-and-set:
 * a plain write could overwrite a {@code TERMINATED} installed between the read
 * and the write, which would leave the processor looking active again and the
 * new subscriber without a terminal signal.
 *
 * @param s the inner subscriber to register
 *
 * @return {@code true} if the subscriber was registered, {@code false} if the
 * processor is (or concurrently became) terminated
 */
boolean add(DirectInner<T> s) {
    // Cheap pre-check that avoids taking the lock once terminated.
    if (subscribers == TERMINATED) {
        return false;
    }
    synchronized (this) {
        DirectInner<T>[] a = subscribers;
        if (a == TERMINATED) {
            return false;
        }
        int len = a.length;
        @SuppressWarnings("unchecked") DirectInner<T>[] b = new DirectInner[len + 1];
        System.arraycopy(a, 0, b, 0, len);
        b[len] = s;
        // While the lock is held the only other writer is the terminal getAndSet,
        // so a failed swap means the processor terminated in the meantime.
        return SUBSCRIBERS.compareAndSet(this, a, b);
    }
}
/**
 * Removes a subscriber from the subscriber array by installing a new, smaller
 * array (or {@code EMPTY} when the last subscriber leaves). Does nothing when the
 * processor is terminated, has no subscribers, or does not contain {@code s}.
 * <p>
 * Structural changes are serialized by the monitor of this processor, but
 * {@code onError}/{@code onComplete} install {@code TERMINATED} through a lock-free
 * {@code getAndSet}. The replacement array is therefore published with a
 * compare-and-set so that a concurrently installed {@code TERMINATED} is never
 * overwritten.
 *
 * @param s the inner subscriber to unregister (matched by identity)
 */
@SuppressWarnings("unchecked")
void remove(DirectInner<T> s) {
    DirectInner<T>[] a = subscribers;
    if (a == TERMINATED || a == EMPTY) {
        return;
    }
    synchronized (this) {
        a = subscribers;
        if (a == TERMINATED || a == EMPTY) {
            return;
        }
        int len = a.length;
        int j = -1;
        for (int i = 0; i < len; i++) {
            if (a[i] == s) {
                j = i;
                break;
            }
        }
        if (j < 0) {
            return;
        }
        DirectInner<T>[] b;
        if (len == 1) {
            b = EMPTY;
        }
        else {
            b = new DirectInner[len - 1];
            System.arraycopy(a, 0, b, 0, j);
            System.arraycopy(a, j + 1, b, j, len - j - 1);
        }
        // While the lock is held the only other writer is the terminal getAndSet;
        // if the swap fails the processor terminated and there is nothing to remove.
        SUBSCRIBERS.compareAndSet(this, a, b);
    }
}
/**
 * Tells whether at least one subscriber is currently registered.
 *
 * @return {@code false} when the subscriber array is either the EMPTY or the
 * TERMINATED sentinel, {@code true} otherwise
 */
@Override
public boolean hasDownstreams() {
    DirectInner<T>[] current = subscribers;
    if (current == EMPTY) {
        return false;
    }
    return current != TERMINATED;
}
/**
 * Returns the recorded failure once the processor has terminated.
 *
 * @return the value of the {@code error} field when the subscriber array is the
 * TERMINATED sentinel, {@code null} otherwise
 */
@Override
@Nullable
public Throwable getError() {
    return subscribers == TERMINATED ? error : null;
}
/**
 * Per-subscriber state of a {@link DirectProcessor}: wraps the downstream
 * subscriber, tracks its outstanding demand and its cancellation flag, and acts as
 * the subscription handed to that subscriber.
 *
 * @param <T> the value type
 */
static final class DirectInner<T> implements InnerProducer<T> {

    /** Atomic accessor for the {@code requested} field. */
    @SuppressWarnings("rawtypes")
    static final AtomicLongFieldUpdater<DirectInner> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(DirectInner.class, "requested");

    /** The downstream subscriber receiving the signals. */
    final CoreSubscriber<? super T> actual;

    /** The processor this inner is registered with. */
    final DirectProcessor<T> parent;

    /** Set to {@code true} by {@link #cancel()}. */
    volatile boolean cancelled;

    /** Outstanding demand accumulated through {@link #request(long)}. */
    volatile long requested;

    DirectInner(CoreSubscriber<? super T> actual, DirectProcessor<T> parent) {
        this.actual = actual;
        this.parent = parent;
    }

    /**
     * Adds {@code n} to the outstanding demand via {@code Operators.addCap},
     * provided {@code Operators.validate} accepts the amount.
     */
    @Override
    public void request(long n) {
        if (!Operators.validate(n)) {
            return;
        }
        Operators.addCap(REQUESTED, this, n);
    }

    /**
     * Flags this inner as cancelled and unregisters it from the parent. Calls
     * that observe the flag already set do nothing.
     */
    @Override
    public void cancel() {
        if (cancelled) {
            return;
        }
        cancelled = true;
        parent.remove(this);
    }

    /**
     * Answers {@code PARENT} and {@code CANCELLED} directly and delegates any
     * other attribute to the {@link InnerProducer} default implementation.
     */
    @Override
    @Nullable
    public Object scanUnsafe(Attr key) {
        if (key == Attr.PARENT) {
            return parent;
        }
        if (key == Attr.CANCELLED) {
            return cancelled;
        }
        return InnerProducer.super.scanUnsafe(key);
    }

    @Override
    public CoreSubscriber<? super T> actual() {
        return actual;
    }

    /**
     * Delivers a value when there is outstanding demand, consuming one unit of it
     * unless the demand is {@code Long.MAX_VALUE}. Without demand, the inner is
     * unregistered from the parent and the subscriber is failed with an overflow
     * error.
     */
    void onNext(T value) {
        if (requested == 0L) {
            parent.remove(this);
            actual.onError(Exceptions.failWithOverflow(
                    "Can't deliver value due to lack of requests"));
            return;
        }
        actual.onNext(value);
        if (requested != Long.MAX_VALUE) {
            REQUESTED.decrementAndGet(this);
        }
    }

    /** Forwards the failure to the downstream subscriber. */
    void onError(Throwable e) {
        actual.onError(e);
    }

    /** Forwards completion to the downstream subscriber. */
    void onComplete() {
        actual.onComplete();
    }
}
}