/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core.publisher;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.FluxGroupJoin.JoinSupport;
import reactor.core.publisher.FluxGroupJoin.LeftRightEndSubscriber;
import reactor.core.publisher.FluxGroupJoin.LeftRightSubscriber;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

See Also:
Since:3.0
/** * @see <a href="https://github.com/reactor/reactive-streams-commons">https://github.com/reactor/reactive-streams-commons</a> * @since 3.0 */
final class FluxJoin<TLeft, TRight, TLeftEnd, TRightEnd, R> extends FluxOperator<TLeft, R> { final Publisher<? extends TRight> other; final Function<? super TLeft, ? extends Publisher<TLeftEnd>> leftEnd; final Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd; final BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector; FluxJoin(Flux<TLeft> source, Publisher<? extends TRight> other, Function<? super TLeft, ? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector) { super(source); this.other = Objects.requireNonNull(other, "other"); this.leftEnd = Objects.requireNonNull(leftEnd, "leftEnd"); this.rightEnd = Objects.requireNonNull(rightEnd, "rightEnd"); this.resultSelector = Objects.requireNonNull(resultSelector, "resultSelector"); } @Override public void subscribe(CoreSubscriber<? super R> actual) { JoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R> parent = new JoinSubscription<>(actual, leftEnd, rightEnd, resultSelector); actual.onSubscribe(parent); LeftRightSubscriber left = new LeftRightSubscriber(parent, true); parent.cancellations.add(left); LeftRightSubscriber right = new LeftRightSubscriber(parent, false); parent.cancellations.add(right); source.subscribe(left); other.subscribe(right); } static final class JoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R> implements JoinSupport<R> { final Queue<Object> queue; final BiPredicate<Object, Object> queueBiOffer; final Disposable.Composite cancellations; final Map<Integer, TLeft> lefts; final Map<Integer, TRight> rights; final Function<? super TLeft, ? extends Publisher<TLeftEnd>> leftEnd; final Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd; final BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector; final CoreSubscriber<? super R> actual; volatile int wip; static final AtomicIntegerFieldUpdater<JoinSubscription> WIP = AtomicIntegerFieldUpdater.newUpdater(JoinSubscription.class, "wip"); volatile int active; static final AtomicIntegerFieldUpdater<JoinSubscription> ACTIVE = AtomicIntegerFieldUpdater.newUpdater(JoinSubscription.class, "active"); volatile long requested; static final AtomicLongFieldUpdater<JoinSubscription> REQUESTED = AtomicLongFieldUpdater.newUpdater(JoinSubscription.class, "requested"); volatile Throwable error; static final AtomicReferenceFieldUpdater<JoinSubscription, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(JoinSubscription.class, Throwable.class, "error"); int leftIndex; int rightIndex; static final Integer LEFT_VALUE = 1; static final Integer RIGHT_VALUE = 2; static final Integer LEFT_CLOSE = 3; static final Integer RIGHT_CLOSE = 4; @SuppressWarnings("unchecked") JoinSubscription(CoreSubscriber<? super R> actual, Function<? super TLeft, ? extends Publisher<TLeftEnd>> leftEnd, Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd, BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector) { this.actual = actual; this.cancellations = Disposables.composite(); this.queue = Queues.unboundedMultiproducer().get(); this.queueBiOffer = (BiPredicate) queue; this.lefts = new LinkedHashMap<>(); this.rights = new LinkedHashMap<>(); this.leftEnd = leftEnd; this.rightEnd = rightEnd; this.resultSelector = resultSelector; ACTIVE.lazySet(this, 2); } @Override public final CoreSubscriber<? super R> actual() { return actual; } @Override public Stream<? extends Scannable> inners() { return Scannable.from(cancellations).inners(); } @Override @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested; if (key == Attr.CANCELLED) return cancellations.isDisposed(); if (key == Attr.BUFFERED) return queue.size() / 2; if (key == Attr.TERMINATED) return active == 0; if (key == Attr.ERROR) return error; return JoinSupport.super.scanUnsafe(key); } @Override public void request(long n) { if (Operators.validate(n)) { Operators.addCap(REQUESTED, this, n); } } @Override public void cancel() { if (cancellations.isDisposed()) { return; } cancellations.dispose(); if (WIP.getAndIncrement(this) == 0) { queue.clear(); } } void errorAll(Subscriber<?> a) { Throwable ex = Exceptions.terminate(ERROR, this); lefts.clear(); rights.clear(); a.onError(ex); } void drain() { if (WIP.getAndIncrement(this) != 0) { return; } int missed = 1; Queue<Object> q = queue; Subscriber<? super R> a = actual; for (; ; ) { for (; ; ) { if (cancellations.isDisposed()) { q.clear(); return; } Throwable ex = error; if (ex != null) { q.clear(); cancellations.dispose(); errorAll(a); return; } boolean d = active == 0; Integer mode = (Integer) q.poll(); boolean empty = mode == null; if (d && empty) { lefts.clear(); rights.clear(); cancellations.dispose(); a.onComplete(); return; } if (empty) { break; } Object val = q.poll(); if (mode == LEFT_VALUE) { @SuppressWarnings("unchecked") TLeft left = (TLeft) val; int idx = leftIndex++; lefts.put(idx, left); Publisher<TLeftEnd> p; try { p = Objects.requireNonNull(leftEnd.apply(left), "The leftEnd returned a null Publisher"); } catch (Throwable exc) { Exceptions.addThrowable(ERROR, this, Operators.onOperatorError(this, exc, left, actual.currentContext())); errorAll(a); return; } LeftRightEndSubscriber end = new LeftRightEndSubscriber(this, true, idx); cancellations.add(end); p.subscribe(end); ex = error; if (ex != null) { q.clear(); cancellations.dispose(); errorAll(a); return; } long r = requested; long e = 0L; for (TRight right : rights.values()) { R w; try { w = Objects.requireNonNull(resultSelector.apply(left, right), "The resultSelector returned a null value"); } catch (Throwable exc) { Exceptions.addThrowable(ERROR, this, Operators.onOperatorError(this, exc, right, actual.currentContext())); errorAll(a); return; } if (e != r) { a.onNext(w); e++; } else { Exceptions.addThrowable(ERROR, this, Exceptions.failWithOverflow("Could not " + "emit value due to lack of requests")); q.clear(); cancellations.dispose(); errorAll(a); return; } } if (e != 0L) { Operators.produced(REQUESTED, this, e); } } else if (mode == RIGHT_VALUE) { @SuppressWarnings("unchecked") TRight right = (TRight) val; int idx = rightIndex++; rights.put(idx, right); Publisher<TRightEnd> p; try { p = Objects.requireNonNull(rightEnd.apply(right), "The rightEnd returned a null Publisher"); } catch (Throwable exc) { Exceptions.addThrowable(ERROR, this, Operators.onOperatorError(this, exc, right, actual.currentContext())); errorAll(a); return; } LeftRightEndSubscriber end = new LeftRightEndSubscriber(this, false, idx); cancellations.add(end); p.subscribe(end); ex = error; if (ex != null) { q.clear(); cancellations.dispose(); errorAll(a); return; } long r = requested; long e = 0L; for (TLeft left : lefts.values()) { R w; try { w = Objects.requireNonNull(resultSelector.apply(left, right), "The resultSelector returned a null value"); } catch (Throwable exc) { Exceptions.addThrowable(ERROR, this, Operators.onOperatorError(this, exc, left, actual.currentContext())); errorAll(a); return; } if (e != r) { a.onNext(w); e++; } else { Exceptions.addThrowable(ERROR, this, Exceptions.failWithOverflow("Could not emit " + "value due to lack of requests")); q.clear(); cancellations.dispose(); errorAll(a); return; } } if (e != 0L) { Operators.produced(REQUESTED, this, e); } } else if (mode == LEFT_CLOSE) { LeftRightEndSubscriber end = (LeftRightEndSubscriber) val; lefts.remove(end.index); cancellations.remove(end); } else if (mode == RIGHT_CLOSE) { LeftRightEndSubscriber end = (LeftRightEndSubscriber) val; rights.remove(end.index); cancellations.remove(end); } } missed = WIP.addAndGet(this, -missed); if (missed == 0) { break; } } } @Override public void innerError(Throwable ex) { if (Exceptions.addThrowable(ERROR, this, ex)) { ACTIVE.decrementAndGet(this); drain(); } else { Operators.onErrorDropped(ex, actual.currentContext()); } } @Override public void innerComplete(LeftRightSubscriber sender) { cancellations.remove(sender); ACTIVE.decrementAndGet(this); drain(); } @Override public void innerValue(boolean isLeft, Object o) { queueBiOffer.test(isLeft ? LEFT_VALUE : RIGHT_VALUE, o); drain(); } @Override public void innerClose(boolean isLeft, LeftRightEndSubscriber index) { queueBiOffer.test(isLeft ? LEFT_CLOSE : RIGHT_CLOSE, index); drain(); } @Override public void innerCloseError(Throwable ex) { if (Exceptions.addThrowable(ERROR, this, ex)) { drain(); } else { Operators.onErrorDropped(ex, actual.currentContext()); } } } }