/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core.publisher;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Supplier;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Scheduler.Worker;
import reactor.util.annotation.Nullable;

Emits events on a different thread specified by a scheduler callback.
Type parameters:
  • <T> – the value type
See Also:
/** * Emits events on a different thread specified by a scheduler callback. * * @param <T> the value type * * @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a> */
final class FluxPublishOn<T> extends FluxOperator<T, T> implements Fuseable { final Scheduler scheduler; final boolean delayError; final Supplier<? extends Queue<T>> queueSupplier; final int prefetch; final int lowTide; FluxPublishOn(Flux<? extends T> source, Scheduler scheduler, boolean delayError, int prefetch, int lowTide, Supplier<? extends Queue<T>> queueSupplier) { super(source); if (prefetch <= 0) { throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch); } this.scheduler = Objects.requireNonNull(scheduler, "scheduler"); this.delayError = delayError; this.prefetch = prefetch; this.lowTide = lowTide; this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier"); } @Override public Object scanUnsafe(Attr key) { if (key == Attr.RUN_ON) return scheduler; return super.scanUnsafe(key); } @Override public int getPrefetch() { return prefetch; } @Override @SuppressWarnings("unchecked") public void subscribe(CoreSubscriber<? super T> actual) { Worker worker; try { worker = Objects.requireNonNull(scheduler.createWorker(), "The scheduler returned a null worker"); } catch (Throwable e) { Operators.error(actual, Operators.onOperatorError(e, actual.currentContext())); return; } if (actual instanceof ConditionalSubscriber) { ConditionalSubscriber<? super T> cs = (ConditionalSubscriber<? super T>) actual; source.subscribe(new PublishOnConditionalSubscriber<>(cs, scheduler, worker, delayError, prefetch, lowTide, queueSupplier)); return; } source.subscribe(new PublishOnSubscriber<>(actual, scheduler, worker, delayError, prefetch, lowTide, queueSupplier)); } static final class PublishOnSubscriber<T> implements QueueSubscription<T>, Runnable, InnerOperator<T, T> { final CoreSubscriber<? super T> actual; final Scheduler scheduler; final Worker worker; final boolean delayError; final int prefetch; final int limit; final Supplier<? extends Queue<T>> queueSupplier; Subscription s; Queue<T> queue; volatile boolean cancelled; volatile boolean done; Throwable error; volatile int wip; @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater<PublishOnSubscriber> WIP = AtomicIntegerFieldUpdater.newUpdater(PublishOnSubscriber.class, "wip"); volatile long requested; @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater<PublishOnSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(PublishOnSubscriber.class, "requested"); int sourceMode; long produced; boolean outputFused; PublishOnSubscriber(CoreSubscriber<? super T> actual, Scheduler scheduler, Worker worker, boolean delayError, int prefetch, int lowTide, Supplier<? extends Queue<T>> queueSupplier) { this.actual = actual; this.worker = worker; this.scheduler = scheduler; this.delayError = delayError; this.prefetch = prefetch; this.queueSupplier = queueSupplier; this.limit = Operators.unboundedOrLimit(prefetch, lowTide); } @Override public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { this.s = s; if (s instanceof QueueSubscription) { @SuppressWarnings("unchecked") QueueSubscription<T> f = (QueueSubscription<T>) s; int m = f.requestFusion(Fuseable.ANY | Fuseable.THREAD_BARRIER); if (m == Fuseable.SYNC) { sourceMode = Fuseable.SYNC; queue = f; done = true; actual.onSubscribe(this); return; } if (m == Fuseable.ASYNC) { sourceMode = Fuseable.ASYNC; queue = f; actual.onSubscribe(this); s.request(Operators.unboundedOrPrefetch(prefetch)); return; } } queue = queueSupplier.get(); actual.onSubscribe(this); s.request(Operators.unboundedOrPrefetch(prefetch)); } } @Override public void onNext(T t) { if (sourceMode == ASYNC) { trySchedule(this, null, null /* t always null */); return; } if (done) { Operators.onNextDropped(t, actual.currentContext()); return; } if (!queue.offer(t)) { error = Operators.onOperatorError(s, Exceptions.failWithOverflow(Exceptions.BACKPRESSURE_ERROR_QUEUE_FULL), t, actual.currentContext()); done = true; } trySchedule(this, null, t); } @Override public void onError(Throwable t) { if (done) { Operators.onErrorDropped(t, actual.currentContext()); return; } error = t; done = true; trySchedule(null, t, null); } @Override public void onComplete() { if (done) { return; } done = true; //WIP also guards, no competing onNext trySchedule(null, null, null); } @Override public void request(long n) { if (Operators.validate(n)) { Operators.addCap(REQUESTED, this, n); //WIP also guards during request and onError is possible trySchedule(this, null, null); } } @Override public void cancel() { if (cancelled) { return; } cancelled = true; s.cancel(); worker.dispose(); if (WIP.getAndIncrement(this) == 0) { Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null); } } void trySchedule( @Nullable Subscription subscription, @Nullable Throwable suppressed, @Nullable Object dataSignal) { if (WIP.getAndIncrement(this) != 0) { return; } try { worker.schedule(this); } catch (RejectedExecutionException ree) { Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null); actual.onError(Operators.onRejectedExecution(ree, subscription, suppressed, dataSignal, actual.currentContext())); } } void runSync() { int missed = 1; final Subscriber<? super T> a = actual; final Queue<T> q = queue; long e = produced; for (; ; ) { long r = requested; while (e != r) { T v; try { v = q.poll(); } catch (Throwable ex) { doError(a, Operators.onOperatorError(s, ex, actual.currentContext())); return; } if (cancelled) { Operators.onDiscardQueueWithClear(q, actual.currentContext(), null); return; } if (v == null) { doComplete(a); return; } a.onNext(v); e++; } if (cancelled) { Operators.onDiscardQueueWithClear(q, actual.currentContext(), null); return; } if (q.isEmpty()) { doComplete(a); return; } int w = wip; if (missed == w) { produced = e; missed = WIP.addAndGet(this, -missed); if (missed == 0) { break; } } else { missed = w; } } } void runAsync() { int missed = 1; final Subscriber<? super T> a = actual; final Queue<T> q = queue; long e = produced; for (; ; ) { long r = requested; while (e != r) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.cancel(); Operators.onDiscardQueueWithClear(q, actual.currentContext(), null); doError(a, Operators.onOperatorError(ex, actual.currentContext())); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } a.onNext(v); e++; if (e == limit) { if (r != Long.MAX_VALUE) { r = REQUESTED.addAndGet(this, -e); } s.request(e); e = 0L; } } if (e == r && checkTerminated(done, q.isEmpty(), a)) { return; } int w = wip; if (missed == w) { produced = e; missed = WIP.addAndGet(this, -missed); if (missed == 0) { break; } } else { missed = w; } } } void runBackfused() { int missed = 1; for (; ; ) { if (cancelled) { Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null); return; } boolean d = done; actual.onNext(null); if (d) { Throwable e = error; if (e != null) { doError(actual, e); } else { doComplete(actual); } return; } missed = WIP.addAndGet(this, -missed); if (missed == 0) { break; } } } void doComplete(Subscriber<?> a) { a.onComplete(); worker.dispose(); } void doError(Subscriber<?> a, Throwable e) { try { a.onError(e); } finally { worker.dispose(); } } @Override public void run() { if (outputFused) { runBackfused(); } else if (sourceMode == Fuseable.SYNC) { runSync(); } else { runAsync(); } } boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) { if (cancelled) { Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null); return true; } if (d) { if (delayError) { if (empty) { Throwable e = error; if (e != null) { doError(a, e); } else { doComplete(a); } return true; } } else { Throwable e = error; if (e != null) { Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null); doError(a, e); return true; } else if (empty) { doComplete(a); return true; } } } return false; } @Override @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.REQUESTED_FROM_DOWNSTREAM ) return requested; if (key == Attr.PARENT ) return s; if (key == Attr.CANCELLED) return cancelled; if (key == Attr.TERMINATED) return done; if (key == Attr.BUFFERED) return queue != null ? queue.size() : 0; if (key == Attr.ERROR) return error; if (key == Attr.DELAY_ERROR) return delayError; if (key == Attr.PREFETCH) return prefetch; if (key == Attr.RUN_ON) return worker; return InnerOperator.super.scanUnsafe(key); } @Override public CoreSubscriber<? super T> actual() { return actual; } @Override public void clear() { Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null); } @Override public boolean isEmpty() { return queue.isEmpty(); } @Override @Nullable public T poll() { T v = queue.poll(); if (v != null && sourceMode != SYNC) { long p = produced + 1; if (p == limit) { produced = 0; s.request(p); } else { produced = p; } } return v; } @Override public int requestFusion(int requestedMode) { if ((requestedMode & ASYNC) != 0) { outputFused = true; return ASYNC; } return NONE; } @Override public int size() { return queue.size(); } } static final class PublishOnConditionalSubscriber<T> implements QueueSubscription<T>, Runnable, InnerOperator<T, T> { final ConditionalSubscriber<? super T> actual; final Worker worker; final Scheduler scheduler; final boolean delayError; final int prefetch; final int limit; final Supplier<? extends Queue<T>> queueSupplier; Subscription s; Queue<T> queue; volatile boolean cancelled; volatile boolean done; Throwable error; volatile int wip; @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater<PublishOnConditionalSubscriber> WIP = AtomicIntegerFieldUpdater.newUpdater(PublishOnConditionalSubscriber.class, "wip"); volatile long requested; @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater<PublishOnConditionalSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(PublishOnConditionalSubscriber.class, "requested"); int sourceMode; long produced; long consumed; boolean outputFused; PublishOnConditionalSubscriber(ConditionalSubscriber<? super T> actual, Scheduler scheduler, Worker worker, boolean delayError, int prefetch, int lowTide, Supplier<? extends Queue<T>> queueSupplier) { this.actual = actual; this.worker = worker; this.scheduler = scheduler; this.delayError = delayError; this.prefetch = prefetch; this.queueSupplier = queueSupplier; this.limit = Operators.unboundedOrLimit(prefetch, lowTide); } @Override public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { this.s = s; if (s instanceof QueueSubscription) { @SuppressWarnings("unchecked") QueueSubscription<T> f = (QueueSubscription<T>) s; int m = f.requestFusion(Fuseable.ANY | Fuseable.THREAD_BARRIER); if (m == Fuseable.SYNC) { sourceMode = Fuseable.SYNC; queue = f; done = true; actual.onSubscribe(this); return; } if (m == Fuseable.ASYNC) { sourceMode = Fuseable.ASYNC; queue = f; actual.onSubscribe(this); s.request(Operators.unboundedOrPrefetch(prefetch)); return; } } queue = queueSupplier.get(); actual.onSubscribe(this); s.request(Operators.unboundedOrPrefetch(prefetch)); } } @Override public void onNext(T t) { if (sourceMode == ASYNC) { trySchedule(this, null, null); return; } if (done) { Operators.onNextDropped(t, actual.currentContext()); return; } if (!queue.offer(t)) { error = Operators.onOperatorError(s, Exceptions.failWithOverflow(Exceptions.BACKPRESSURE_ERROR_QUEUE_FULL), t, actual.currentContext()); done = true; } trySchedule(this, null, t); } @Override public void onError(Throwable t) { if(done){ Operators.onErrorDropped(t, actual.currentContext()); return; } error = t; done = true; trySchedule(null, t, null); } @Override public void onComplete() { if(done){ return; } done = true; trySchedule(null, null, null); } @Override public void request(long n) { if (Operators.validate(n)) { Operators.addCap(REQUESTED, this, n); trySchedule(this, null, null); } } @Override public void cancel() { if (cancelled) { return; } cancelled = true; s.cancel(); worker.dispose(); if (WIP.getAndIncrement(this) == 0) { Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null); } } void trySchedule( @Nullable Subscription subscription, @Nullable Throwable suppressed, @Nullable Object dataSignal) { if (WIP.getAndIncrement(this) != 0) { return; } try { worker.schedule(this); } catch (RejectedExecutionException ree) { Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null); actual.onError(Operators.onRejectedExecution(ree, subscription, suppressed, dataSignal, actual.currentContext())); } } void runSync() { int missed = 1; final ConditionalSubscriber<? super T> a = actual; final Queue<T> q = queue; long e = produced; for (; ; ) { long r = requested; while (e != r) { T v; try { v = q.poll(); } catch (Throwable ex) { doError(a, Operators.onOperatorError(s, ex, actual.currentContext())); return; } if (cancelled) { Operators.onDiscardQueueWithClear(q, actual.currentContext(), null); return; } if (v == null) { doComplete(a); return; } if (a.tryOnNext(v)) { e++; } } if (cancelled) { Operators.onDiscardQueueWithClear(q, actual.currentContext(), null); return; } if (q.isEmpty()) { doComplete(a); return; } int w = wip; if (missed == w) { produced = e; missed = WIP.addAndGet(this, -missed); if (missed == 0) { break; } } else { missed = w; } } } void runAsync() { int missed = 1; final ConditionalSubscriber<? super T> a = actual; final Queue<T> q = queue; long emitted = produced; long polled = consumed; for (; ; ) { long r = requested; while (emitted != r) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.cancel(); Operators.onDiscardQueueWithClear(q, actual.currentContext(), null); doError(a, Operators.onOperatorError(ex, actual.currentContext())); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } if (a.tryOnNext(v)) { emitted++; } polled++; if (polled == limit) { s.request(polled); polled = 0L; } } if (emitted == r && checkTerminated(done, q.isEmpty(), a)) { return; } int w = wip; if (missed == w) { produced = emitted; consumed = polled; missed = WIP.addAndGet(this, -missed); if (missed == 0) { break; } } else { missed = w; } } } void runBackfused() { int missed = 1; for (; ; ) { if (cancelled) { Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null); return; } boolean d = done; actual.onNext(null); if (d) { Throwable e = error; if (e != null) { doError(actual, e); } else { doComplete(actual); } return; } missed = WIP.addAndGet(this, -missed); if (missed == 0) { break; } } } @Override public void run() { if (outputFused) { runBackfused(); } else if (sourceMode == Fuseable.SYNC) { runSync(); } else { runAsync(); } } @Override public CoreSubscriber<? super T> actual() { return actual; } @Override @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested; if (key == Attr.PARENT) return s; if (key == Attr.CANCELLED) return cancelled; if (key == Attr.TERMINATED) return done; if (key == Attr.BUFFERED) return queue != null ? queue.size() : 0; if (key == Attr.ERROR) return error; if (key == Attr.DELAY_ERROR) return delayError; if (key == Attr.PREFETCH) return prefetch; if (key == Attr.RUN_ON) return worker; return InnerOperator.super.scanUnsafe(key); } void doComplete(Subscriber<?> a) { a.onComplete(); worker.dispose(); } void doError(Subscriber<?> a, Throwable e) { try { a.onError(e); } finally { worker.dispose(); } } boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) { if (cancelled) { Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null); return true; } if (d) { if (delayError) { if (empty) { Throwable e = error; if (e != null) { doError(a, e); } else { doComplete(a); } return true; } } else { Throwable e = error; if (e != null) { Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null); doError(a, e); return true; } else if (empty) { doComplete(a); return true; } } } return false; } @Override public void clear() { Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null); } @Override public boolean isEmpty() { return queue.isEmpty(); } @Override @Nullable public T poll() { T v = queue.poll(); if (v != null && sourceMode != SYNC) { long p = consumed + 1; if (p == limit) { consumed = 0; s.request(p); } else { consumed = p; } } return v; } @Override public int requestFusion(int requestedMode) { if ((requestedMode & ASYNC) != 0) { outputFused = true; return ASYNC; } return NONE; } @Override public int size() { return queue.size(); } } }