/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package reactor.core.publisher;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

Samples the main source and emits its latest value whenever the other Publisher signals a value.

Termination of either Publishers will result in termination for the Subscriber as well.

Both Publishers will run in unbounded mode because the backpressure would interfere with the sampling precision.

Type parameters:
  • <T> – the input and output value type
  • <U> – the value type of the sampler (irrelevant)
See Also:
/** * Samples the main source and emits its latest value whenever the other Publisher * signals a value. * <p> * <p> * Termination of either Publishers will result in termination for the Subscriber * as well. * <p> * <p> * Both Publishers will run in unbounded mode because the backpressure * would interfere with the sampling precision. * * @param <T> the input and output value type * @param <U> the value type of the sampler (irrelevant) * @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a> */
final class FluxSample<T, U> extends InternalFluxOperator<T, T> { final Publisher<U> other; FluxSample(Flux<? extends T> source, Publisher<U> other) { super(source); this.other = Objects.requireNonNull(other, "other"); } @Override public int getPrefetch() { return Integer.MAX_VALUE; } @Override public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) { CoreSubscriber<T> serial = Operators.serialize(actual); SampleMainSubscriber<T> main = new SampleMainSubscriber<>(serial); actual.onSubscribe(main); other.subscribe(new SampleOther<>(main)); return main; } @Override public Object scanUnsafe(Attr key) { if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; return super.scanUnsafe(key); } static final class SampleMainSubscriber<T> implements InnerOperator<T, T> { final CoreSubscriber<? super T> actual; final Context ctx; volatile T value; @SuppressWarnings("rawtypes") static final AtomicReferenceFieldUpdater<SampleMainSubscriber, Object> VALUE = AtomicReferenceFieldUpdater.newUpdater(SampleMainSubscriber.class, Object.class, "value"); volatile Subscription main; @SuppressWarnings("rawtypes") static final AtomicReferenceFieldUpdater<SampleMainSubscriber, Subscription> MAIN = AtomicReferenceFieldUpdater.newUpdater(SampleMainSubscriber.class, Subscription.class, "main"); volatile Subscription other; @SuppressWarnings("rawtypes") static final AtomicReferenceFieldUpdater<SampleMainSubscriber, Subscription> OTHER = AtomicReferenceFieldUpdater.newUpdater(SampleMainSubscriber.class, Subscription.class, "other"); volatile long requested; @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater<SampleMainSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(SampleMainSubscriber.class, "requested"); SampleMainSubscriber(CoreSubscriber<? super T> actual) { this.actual = actual; this.ctx = actual.currentContext(); } @Override public final CoreSubscriber<? super T> actual() { return actual; } @Override public Stream<? extends Scannable> inners() { return Stream.of(Scannable.from(other)); } @Override @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested; if (key == Attr.PARENT) return main; if (key == Attr.CANCELLED) return main == Operators.cancelledSubscription(); if (key == Attr.BUFFERED) return value != null ? 1 : 0; if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; return InnerOperator.super.scanUnsafe(key); } @Override public void onSubscribe(Subscription s) { if (!MAIN.compareAndSet(this, null, s)) { s.cancel(); if (main != Operators.cancelledSubscription()) { Operators.reportSubscriptionSet(); } return; } s.request(Long.MAX_VALUE); } void cancelMain() { Subscription s = main; if (s != Operators.cancelledSubscription()) { s = MAIN.getAndSet(this, Operators.cancelledSubscription()); if (s != null && s != Operators.cancelledSubscription()) { s.cancel(); } } } void cancelOther() { Subscription s = other; if (s != Operators.cancelledSubscription()) { s = OTHER.getAndSet(this, Operators.cancelledSubscription()); if (s != null && s != Operators.cancelledSubscription()) { s.cancel(); } } } void setOther(Subscription s) { if (!OTHER.compareAndSet(this, null, s)) { s.cancel(); if (other != Operators.cancelledSubscription()) { Operators.reportSubscriptionSet(); } return; } s.request(Long.MAX_VALUE); } @Override public void request(long n) { if (Operators.validate(n)) { Operators.addCap(REQUESTED, this, n); } } @Override public void cancel() { cancelMain(); cancelOther(); } @Override public void onNext(T t) { Object old = VALUE.getAndSet(this, t); if (old != null) { Operators.onDiscard(old, ctx); } } @Override public void onError(Throwable t) { cancelOther(); actual.onError(t); } @Override public void onComplete() { cancelOther(); //implementation note: if the sampler triggers at the exact same time as the // last source value being emitted, it could be that the sampler "wins" and // suppresses the value by cancelling the main. In any other case, the block // below ensures last source value is always part of the sample. T v = value; if (v != null) { actual.onNext(value); } actual.onComplete(); } @SuppressWarnings("unchecked") @Nullable T getAndNullValue() { return (T) VALUE.getAndSet(this, null); } void decrement() { REQUESTED.decrementAndGet(this); } } static final class SampleOther<T, U> implements InnerConsumer<U> { final SampleMainSubscriber<T> main; SampleOther(SampleMainSubscriber<T> main) { this.main = main; } @Override @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) return main.other; if (key == Attr.ACTUAL) return main; if (key == Attr.CANCELLED) return main.other == Operators.cancelledSubscription(); if (key == Attr.PREFETCH) return Integer.MAX_VALUE; if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; return null; } @Override public Context currentContext() { return main.currentContext(); } @Override public void onSubscribe(Subscription s) { main.setOther(s); } @Override public void onNext(U t) { SampleMainSubscriber<T> m = main; T v = m.getAndNullValue(); if (v != null) { if (m.requested != 0L) { m.actual.onNext(v); if (m.requested != Long.MAX_VALUE) { m.decrement(); } return; } m.cancel(); m.actual.onError(Exceptions.failWithOverflow("Can't signal value due to lack of requests")); Operators.onDiscard(v, m.ctx); } } @Override public void onError(Throwable t) { SampleMainSubscriber<T> m = main; m.cancelMain(); m.actual.onError(t); } @Override public void onComplete() { SampleMainSubscriber<T> m = main; m.cancelMain(); m.actual.onComplete(); } } }