/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.publisher;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
/*
 * Maps each upstream value into a single true or false value provided by a generated
 * Publisher for that input value, and emits the input value if the inner Publisher
 * returned true.
 *
 * Only the first item emitted by each inner Publisher is considered. If the inner
 * Publisher is empty, no resulting item is generated for that input value.
 *
 * Authors: David Karnok, Simon Baslé
 * Type parameters: <T> - the input value type
 */
/**
* Maps each upstream value into a single {@code true} or {@code false} value provided by
* a generated Publisher for that input value and emits the input value if the inner
* Publisher returned {@code true}.
* <p>
* Only the first item emitted by each inner Publisher is considered. If
* the inner Publisher is empty, no resulting item is generated for that input value.
*
* @param <T> the input value type
*
* @author David Karnok
* @author Simon Baslé
*/
//adapted from RxJava2Extensions: https://github.com/akarnokd/RxJava2Extensions/blob/master/src/main/java/hu/akarnokd/rxjava2/operators/FlowableFilterAsync.java
class FluxFilterWhen<T> extends InternalFluxOperator<T, T> {
/** Function applied to a value to obtain the {@code Publisher<Boolean>} that decides whether the value is kept; handed to every subscriber created by this operator. */
final Function<? super T, ? extends Publisher<Boolean>> asyncPredicate;
/** Buffer size handed to every subscriber created by this operator. */
final int bufferSize;
/**
 * Creates the operator on top of the given source.
 *
 * @param source the upstream {@code Flux} passed to the superclass constructor
 * @param asyncPredicate function producing, for a value, the {@code Publisher<Boolean>} deciding whether it passes
 * @param bufferSize the buffer size forwarded to each subscriber
 */
FluxFilterWhen(Flux<T> source,
        Function<? super T, ? extends Publisher<Boolean>> asyncPredicate,
        int bufferSize) {
    super(source);
    this.bufferSize = bufferSize;
    this.asyncPredicate = asyncPredicate;
}
/**
 * Wraps the given downstream subscriber in a new {@code FluxFilterWhenSubscriber}
 * configured with this operator's predicate and buffer size.
 *
 * @param actual the downstream subscriber
 * @return the newly created filtering subscriber
 */
@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
    FluxFilterWhenSubscriber<T> filtering =
            new FluxFilterWhenSubscriber<>(actual, asyncPredicate, bufferSize);
    return filtering;
}
/**
 * Answers {@code Attr.RUN_STYLE} with {@code Attr.RunStyle.SYNC}; every other key is
 * delegated to the superclass.
 *
 * @param key the attribute being scanned
 * @return the value for the key, as described above
 */
@Override
public Object scanUnsafe(Attr key) {
    if (key != Attr.RUN_STYLE) {
        return super.scanUnsafe(key);
    }
    return Attr.RunStyle.SYNC;
}
static final class FluxFilterWhenSubscriber<T> implements InnerOperator<T, T> {
// Function applied in drain() to each buffered value to obtain the Publisher<Boolean> deciding its fate.
final Function<? super T, ? extends Publisher<Boolean>> asyncPredicate;
// Amount requested from upstream in onSubscribe; also the input of the replenish limit computed in drain().
final int bufferSize;
// Buffer of values awaiting a filtering decision; slots are addressed as (index & (length - 1)).
final AtomicReferenceArray<T> toFilter;
// Downstream subscriber.
final CoreSubscriber<? super T> actual;
// Context captured from the downstream subscriber at construction; passed to Operators.onDiscard.
final Context ctx;
// Values consumed since the last replenishing upstream.request(limit); saved by drain() between runs.
int consumed;
// Index of the next buffered value to examine; saved by drain() between runs.
long consumerIndex;
// Number of values passed to the downstream so far; saved by drain() between runs.
long emitted;
// Value delivered by the inner subscriber; stays null when the inner terminated without a value.
Boolean innerResult;
// Index at which onNext stores the next upstream value.
long producerIndex;
// Upstream subscription, assigned in onSubscribe.
Subscription upstream;
// Raised by cancel(); checked by drain().
volatile boolean cancelled;
// Inner subscriber currently evaluating a value, null when there is none, or INNER_CANCELLED once cancelInner() ran.
volatile FilterWhenInner current;
// Raised by onError and onComplete.
volatile boolean done;
// Error(s) recorded from the upstream, the predicate or the inner publishers; accessed through ERROR.
volatile Throwable error;
// Total downstream demand accumulated by request(long); drain() compares it against emitted. Accessed through REQUESTED.
volatile long requested;
// One of STATE_FRESH, STATE_RUNNING or STATE_RESULT, describing the evaluation of the value at consumerIndex.
volatile int state;
// Work-in-progress counter guarding drain(); accessed through WIP.
volatile int wip;
// Atomic accessor for the volatile 'error' field.
static final AtomicReferenceFieldUpdater<FluxFilterWhenSubscriber, Throwable> ERROR =
AtomicReferenceFieldUpdater.newUpdater(FluxFilterWhenSubscriber.class, Throwable.class, "error");
// Atomic accessor for the volatile 'requested' field.
static final AtomicLongFieldUpdater<FluxFilterWhenSubscriber> REQUESTED =
AtomicLongFieldUpdater.newUpdater(FluxFilterWhenSubscriber.class, "requested");
// Atomic accessor for the volatile 'wip' field.
static final AtomicIntegerFieldUpdater<FluxFilterWhenSubscriber> WIP =
AtomicIntegerFieldUpdater.newUpdater(FluxFilterWhenSubscriber.class, "wip");
// Atomic accessor for the volatile 'current' field.
static final AtomicReferenceFieldUpdater<FluxFilterWhenSubscriber, FilterWhenInner>CURRENT =
AtomicReferenceFieldUpdater.newUpdater(FluxFilterWhenSubscriber.class, FilterWhenInner.class, "current");
// Sentinel stored in 'current' by cancelInner(); it is built with a null parent
// (presumably the reason for the suppressed "ConstantConditions" warning).
@SuppressWarnings("ConstantConditions")
static final FilterWhenInner INNER_CANCELLED = new FilterWhenInner(null, false);
// No evaluation has been started for the value at consumerIndex.
static final int STATE_FRESH = 0;
// An inner subscriber has been subscribed for the value at consumerIndex and has not reported back yet.
static final int STATE_RUNNING = 1;
// The inner subscriber reported back (value, error or completion); drain() must consume innerResult.
static final int STATE_RESULT = 2;
FluxFilterWhenSubscriber(CoreSubscriber<? super T> actual,
Function<? super T, ? extends Publisher<Boolean>> asyncPredicate,
int bufferSize) {
this.actual = actual;
this.ctx = actual.currentContext();
this.toFilter = new AtomicReferenceArray<>(Queues.ceilingNextPowerOfTwo(bufferSize));
this.asyncPredicate = asyncPredicate;
this.bufferSize = bufferSize;
}
/**
 * @return the downstream subscriber this operator emits to
 */
@Override
public final CoreSubscriber<? super T> actual() {
return actual;
}
/**
 * Stores the upstream value in the buffer slot selected by the producer index,
 * advances the producer index and triggers a drain.
 *
 * @param t the value received from upstream
 */
@Override
public void onNext(T t) {
    final long writeIndex = producerIndex;
    final int mask = toFilter.length() - 1;
    final int slot = ((int) writeIndex) & mask;

    toFilter.lazySet(slot, t);
    producerIndex = writeIndex + 1;

    drain();
}
/**
 * Records the upstream error, marks the upstream as done and triggers a drain.
 *
 * @param t the error signalled by upstream
 */
@Override
public void onError(Throwable t) {
// Plain set: replaces whatever the error field held before.
ERROR.set(this, t);
// The error is stored before the done flag is raised, so drain() finds it when it handles termination.
done = true;
drain();
}
/**
 * Marks the upstream as done and triggers a drain; drain() signals the downstream
 * once the buffered values have been processed.
 */
@Override
public void onComplete() {
done = true;
drain();
}
/**
 * Adds downstream demand and triggers a drain. Amounts rejected by
 * {@code Operators.validate(long)} are ignored.
 *
 * @param n the amount requested by the downstream
 */
@Override
public void request(long n) {
    if (!Operators.validate(n)) {
        return;
    }
    Operators.addCap(REQUESTED, this, n);
    drain();
}
/**
 * Cancels the operator: raises the cancelled flag, cancels the upstream and any inner
 * subscriber, and clears the buffer right away when no drain is in progress
 * (otherwise the running drain observes the flag and clears it).
 */
@Override
public void cancel() {
    if (cancelled) {
        return;
    }
    cancelled = true;
    upstream.cancel();
    cancelInner();

    int previousWip = WIP.getAndIncrement(this);
    if (previousWip == 0) {
        clear();
    }
}
/**
 * Swaps the {@code INNER_CANCELLED} sentinel into {@code current} and cancels the inner
 * subscriber that was installed before, if there was a real one.
 */
void cancelInner() {
    if (CURRENT.get(this) == INNER_CANCELLED) {
        return;
    }
    FilterWhenInner previous = CURRENT.getAndSet(this, INNER_CANCELLED);
    if (previous == null || previous == INNER_CANCELLED) {
        return;
    }
    previous.cancel();
}
/**
 * Empties every buffer slot, handing each previous content to
 * {@code Operators.onDiscard}, then forgets any pending inner result.
 */
void clear() {
    final int capacity = toFilter.length();
    int slot = 0;
    while (slot < capacity) {
        T removed = toFilter.getAndSet(slot, null);
        Operators.onDiscard(removed, ctx);
        slot++;
    }
    innerResult = null;
}
/**
 * Accepts the upstream subscription if {@code Operators.validate} approves it, passes this
 * operator to the downstream as its subscription, then requests {@code bufferSize}
 * values from upstream.
 *
 * @param s the upstream subscription
 */
@Override
public void onSubscribe(Subscription s) {
    if (!Operators.validate(upstream, s)) {
        return;
    }
    upstream = s;
    actual.onSubscribe(this);
    s.request(bufferSize);
}
/**
 * Serialized drain loop: walks the buffered values in order, evaluates the async predicate
 * for each of them and passes downstream those whose predicate yielded {@code true}, while
 * honouring downstream demand, cancellation and termination.
 * <p>
 * Only the caller that moves {@code wip} away from 0 runs the loop; other callers merely
 * increment {@code wip}, which makes the running loop go around once more.
 */
@SuppressWarnings("unchecked")
void drain() {
if (WIP.getAndIncrement(this) != 0) {
return;
}
int missed = 1;
// Replenish threshold derived from bufferSize by Operators.unboundedOrLimit.
int limit = Operators.unboundedOrLimit(bufferSize);
// Work on local copies of the drain state; they are written back before wip is released.
long e = emitted;
long ci = consumerIndex;
int f = consumed;
int m = toFilter.length() - 1;
Subscriber<? super T> a = actual;
for (;;) {
long r = requested;
// Process buffered values while there is unfulfilled downstream demand.
while (e != r) {
if (cancelled) {
clear();
return;
}
// done is read before the slot, so "done and empty" means no further value is coming.
boolean d = done;
int offset = (int)ci & m;
T t = toFilter.get(offset);
boolean empty = t == null;
if (d && empty) {
// Upstream finished and the buffer is exhausted: signal the terminal event.
Throwable ex = Exceptions.terminate(ERROR, this);
if (ex == null) {
a.onComplete();
} else {
a.onError(ex);
}
return;
}
if (empty) {
break;
}
int s = state;
if (s == STATE_FRESH) {
// No evaluation started yet for this value: obtain its predicate publisher.
Publisher<Boolean> p;
try {
p = Objects.requireNonNull(asyncPredicate.apply(t), "The asyncPredicate returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
Exceptions.addThrowable(ERROR, this, ex);
p = null; //discarded as "old" below
}
if (p != null) {
if (p instanceof Callable) {
// Callable publisher: obtain the result synchronously without subscribing.
Boolean u;
try {
u = ((Callable<Boolean>)p).call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
Exceptions.addThrowable(ERROR, this, ex);
u = null; //triggers discard below
}
if (u != null && u) {
a.onNext(t);
e++;
}
else {
Operators.onDiscard(t, ctx);
}
} else {
// Asynchronous path; the inner cancels its subscription after the first value unless p is a Mono.
FilterWhenInner inner = new FilterWhenInner(this, !(p instanceof Mono));
if (CURRENT.compareAndSet(this,null, inner)) {
state = STATE_RUNNING;
p.subscribe(inner);
// Leave the emission loop; the slot is kept until the inner reports back.
break;
}
// CAS failed: 'current' was not null (e.g. INNER_CANCELLED); fall through and drop the value.
}
}
// Reached after a failed predicate, a synchronous (Callable) result or a failed CAS: free the slot and advance.
// NOTE(review): the slot content is passed to onDiscard here even when it was just emitted through a.onNext
// above, and a second time when the Callable branch already discarded it - confirm this is intended.
T old = toFilter.getAndSet(offset, null);
Operators.onDiscard(old, ctx);
ci++;
// Replenish upstream once 'limit' values have been consumed.
if (++f == limit) {
f = 0;
upstream.request(limit);
}
} else
if (s == STATE_RESULT) {
// The inner reported back: innerResult is null when it completed empty or failed.
Boolean u = innerResult;
innerResult = null;
if (u != null && u) {
a.onNext(t);
e++;
}
else {
Operators.onDiscard(t, ctx);
}
toFilter.lazySet(offset, null);
ci++;
if (++f == limit) {
f = 0;
upstream.request(limit);
}
// Ready to evaluate the next buffered value.
state = STATE_FRESH;
} else {
// STATE_RUNNING: an inner evaluation is still pending.
break;
}
}
// Demand exhausted: cancellation and termination must still be detected without further demand.
if (e == r) {
if (cancelled) {
clear();
return;
}
boolean d = done;
int offset = (int)ci & m;
T t = toFilter.get(offset);
boolean empty = t == null;
if (d && empty) {
Throwable ex = Exceptions.terminate(ERROR, this);
if (ex == null) {
a.onComplete();
} else {
a.onError(ex);
}
return;
}
}
// Missed-work accounting: if wip is unchanged since the last check, persist the local state
// and release; otherwise loop again to handle the work signalled in the meantime.
int w = wip;
if (missed == w) {
consumed = f;
consumerIndex = ci;
emitted = e;
missed = WIP.addAndGet(this, -missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}
/**
 * Resets {@code current} to {@code null} unless it holds the {@code INNER_CANCELLED}
 * sentinel; the reset is a compare-and-set against the value that was just read.
 */
void clearCurrent() {
    final FilterWhenInner snapshot = current;
    if (snapshot == INNER_CANCELLED) {
        return;
    }
    CURRENT.compareAndSet(this, snapshot, null);
}
/**
 * Called by the inner subscriber when it received a value: stores the value, switches
 * to {@code STATE_RESULT}, releases {@code current} and triggers a drain.
 *
 * @param item the value emitted by the inner publisher
 */
void innerResult(Boolean item) {
// The plain innerResult field is written before the volatile state, which drain() reads first.
innerResult = item;
state = STATE_RESULT;
clearCurrent();
drain();
}
/**
 * Called by the inner subscriber when it failed: records the error, switches to
 * {@code STATE_RESULT} (innerResult is not set here), releases {@code current}
 * and triggers a drain.
 *
 * @param ex the error signalled by the inner publisher
 */
void innerError(Throwable ex) {
Exceptions.addThrowable(ERROR, this, ex);
state = STATE_RESULT;
clearCurrent();
drain();
}
/**
 * Called by the inner subscriber when it completed without a value: switches to
 * {@code STATE_RESULT} (innerResult is not set here), releases {@code current}
 * and triggers a drain.
 */
void innerComplete() {
state = STATE_RESULT;
clearCurrent();
drain();
}
/**
 * Exposes the operator's internal state for the attributes it knows about and defers
 * to the {@code InnerOperator} default for every other key.
 *
 * @param key the attribute being scanned
 * @return the current value for the key, possibly {@code null}
 */
@Override
@Nullable
public Object scanUnsafe(Attr key) {
    if (key == Attr.PARENT) {
        return upstream;
    }
    if (key == Attr.TERMINATED) {
        return done;
    }
    if (key == Attr.CANCELLED) {
        return cancelled;
    }
    if (key == Attr.ERROR) {
        //FIXME ERROR is often reset by Exceptions.terminate :(
        return error;
    }
    if (key == Attr.REQUESTED_FROM_DOWNSTREAM) {
        return requested;
    }
    if (key == Attr.CAPACITY) {
        return toFilter.length();
    }
    if (key == Attr.LARGE_BUFFERED) {
        return producerIndex - consumerIndex;
    }
    if (key == Attr.BUFFERED) {
        // Integer.MIN_VALUE is returned when the count does not fit in an int.
        long pending = producerIndex - consumerIndex;
        if (pending > Integer.MAX_VALUE) {
            return Integer.MIN_VALUE;
        }
        return (int) pending;
    }
    if (key == Attr.PREFETCH) {
        return bufferSize;
    }
    if (key == Attr.RUN_STYLE) {
        return Attr.RunStyle.SYNC;
    }
    return InnerOperator.super.scanUnsafe(key);
}
/**
 * @return a stream holding the value of {@code current} when it is non-null,
 * otherwise an empty stream
 */
@Override
public Stream<? extends Scannable> inners() {
    FilterWhenInner inner = current;
    if (inner == null) {
        return Stream.empty();
    }
    return Stream.of(inner);
}
}
static final class FilterWhenInner implements InnerConsumer<Boolean> {
// Main subscriber notified of this inner's outcome.
final FluxFilterWhenSubscriber<?> parent;
// When true, onNext cancels the subscription after receiving the first value.
final boolean cancelOnNext;
// Set once the first onNext, onError or onComplete has been handled; later signals are ignored (errors are dropped).
boolean done;
// Subscription to the inner publisher; managed through SUB.
volatile Subscription sub;
// Atomic accessor for the volatile 'sub' field.
static final AtomicReferenceFieldUpdater<FilterWhenInner, Subscription> SUB =
AtomicReferenceFieldUpdater.newUpdater(FilterWhenInner.class, Subscription.class, "sub");
/**
 * Creates an inner subscriber reporting to the given main subscriber.
 *
 * @param parent the main subscriber to notify
 * @param cancelOnNext whether the subscription is cancelled upon the first value
 */
FilterWhenInner(FluxFilterWhenSubscriber<?> parent, boolean cancelOnNext) {
    this.cancelOnNext = cancelOnNext;
    this.parent = parent;
}
/**
 * @return the context of the main subscriber
 */
@Override
public Context currentContext() {
return parent.currentContext();
}
/**
 * Stores the subscription through {@code Operators.setOnce} and, when that succeeds,
 * requests {@code Long.MAX_VALUE} from the inner publisher.
 *
 * @param s the subscription handed over by the inner publisher
 */
@Override
public void onSubscribe(Subscription s) {
    if (!Operators.setOnce(SUB, this, s)) {
        return;
    }
    s.request(Long.MAX_VALUE);
}
/**
 * Handles the first value of the inner publisher: optionally cancels the subscription,
 * marks this inner as done and reports the value to the main subscriber. Values
 * arriving once done are ignored.
 *
 * @param t the value emitted by the inner publisher
 */
@Override
public void onNext(Boolean t) {
    if (done) {
        return;
    }
    if (cancelOnNext) {
        sub.cancel();
    }
    done = true;
    parent.innerResult(t);
}
/**
 * Reports the first terminal error to the main subscriber; an error arriving once this
 * inner is done is routed to {@code Operators.onErrorDropped}.
 *
 * @param t the error signalled by the inner publisher
 */
@Override
public void onError(Throwable t) {
    if (done) {
        Operators.onErrorDropped(t, parent.currentContext());
        return;
    }
    done = true;
    parent.innerError(t);
}
/**
 * Reports completion to the main subscriber unless a value, an error or a completion
 * has already been handled.
 */
@Override
public void onComplete() {
    if (done) {
        return;
    }
    done = true;
    parent.innerComplete();
}
/**
 * Cancels this inner subscriber by delegating to {@code Operators.terminate} on the
 * {@code sub} field.
 */
void cancel() {
Operators.terminate(SUB, this);
}
/**
 * Exposes this inner subscriber's state for introspection.
 * <p>
 * {@code Attr.PARENT} is the upstream side (the subscription to the inner publisher) and
 * {@code Attr.ACTUAL} is the downstream side (the main subscriber this inner reports to),
 * matching the main subscriber, whose {@code Attr.PARENT} is its upstream subscription.
 *
 * @param key the attribute being scanned
 * @return the current value for the key, or {@code null} for unsupported keys
 */
@Override
@Nullable
public Object scanUnsafe(Attr key) {
    // Upstream: the subscription received from the inner publisher.
    if (key == Attr.PARENT) return sub;
    // Downstream: the main subscriber that consumes this inner's result.
    if (key == Attr.ACTUAL) return parent;
    if (key == Attr.CANCELLED) return sub == Operators.cancelledSubscription();
    if (key == Attr.TERMINATED) return done;
    if (key == Attr.PREFETCH) return Integer.MAX_VALUE;
    // At most one value is of interest: 1 until a signal was handled, 0 afterwards.
    if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return done ? 0L : 1L;
    if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
    return null;
}
}
}