Copyright (c) 2016-present, RxJava Contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */
package io.reactivex.internal.operators.observable; import java.util.Arrays; import java.util.concurrent.atomic.*; import io.reactivex.*; import io.reactivex.annotations.NonNull; import io.reactivex.annotations.Nullable; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins;
Combines a main sequence of values with the latest from multiple other sequences via a selector function.
Type parameters:
  • <T> – the main sequence's type
  • <R> – the output type
/** * Combines a main sequence of values with the latest from multiple other sequences via * a selector function. * * @param <T> the main sequence's type * @param <R> the output type */
public final class ObservableWithLatestFromMany<T, R> extends AbstractObservableWithUpstream<T, R> { @Nullable final ObservableSource<?>[] otherArray; @Nullable final Iterable<? extends ObservableSource<?>> otherIterable; @NonNull final Function<? super Object[], R> combiner; public ObservableWithLatestFromMany(@NonNull ObservableSource<T> source, @NonNull ObservableSource<?>[] otherArray, @NonNull Function<? super Object[], R> combiner) { super(source); this.otherArray = otherArray; this.otherIterable = null; this.combiner = combiner; } public ObservableWithLatestFromMany(@NonNull ObservableSource<T> source, @NonNull Iterable<? extends ObservableSource<?>> otherIterable, @NonNull Function<? super Object[], R> combiner) { super(source); this.otherArray = null; this.otherIterable = otherIterable; this.combiner = combiner; } @Override protected void subscribeActual(Observer<? super R> observer) { ObservableSource<?>[] others = otherArray; int n = 0; if (others == null) { others = new ObservableSource[8]; try { for (ObservableSource<?> p : otherIterable) { if (n == others.length) { others = Arrays.copyOf(others, n + (n >> 1)); } others[n++] = p; } } catch (Throwable ex) { Exceptions.throwIfFatal(ex); EmptyDisposable.error(ex, observer); return; } } else { n = others.length; } if (n == 0) { new ObservableMap<T, R>(source, new SingletonArrayFunc()).subscribeActual(observer); return; } WithLatestFromObserver<T, R> parent = new WithLatestFromObserver<T, R>(observer, combiner, n); observer.onSubscribe(parent); parent.subscribe(others, n); source.subscribe(parent); } static final class WithLatestFromObserver<T, R> extends AtomicInteger implements Observer<T>, Disposable { private static final long serialVersionUID = 1577321883966341961L; final Observer<? super R> downstream; final Function<? super Object[], R> combiner; final WithLatestInnerObserver[] observers; final AtomicReferenceArray<Object> values; final AtomicReference<Disposable> upstream; final AtomicThrowable error; volatile boolean done; WithLatestFromObserver(Observer<? super R> actual, Function<? super Object[], R> combiner, int n) { this.downstream = actual; this.combiner = combiner; WithLatestInnerObserver[] s = new WithLatestInnerObserver[n]; for (int i = 0; i < n; i++) { s[i] = new WithLatestInnerObserver(this, i); } this.observers = s; this.values = new AtomicReferenceArray<Object>(n); this.upstream = new AtomicReference<Disposable>(); this.error = new AtomicThrowable(); } void subscribe(ObservableSource<?>[] others, int n) { WithLatestInnerObserver[] observers = this.observers; AtomicReference<Disposable> upstream = this.upstream; for (int i = 0; i < n; i++) { if (DisposableHelper.isDisposed(upstream.get()) || done) { return; } others[i].subscribe(observers[i]); } } @Override public void onSubscribe(Disposable d) { DisposableHelper.setOnce(this.upstream, d); } @Override public void onNext(T t) { if (done) { return; } AtomicReferenceArray<Object> ara = values; int n = ara.length(); Object[] objects = new Object[n + 1]; objects[0] = t; for (int i = 0; i < n; i++) { Object o = ara.get(i); if (o == null) { // no latest, skip this value return; } objects[i + 1] = o; } R v; try { v = ObjectHelper.requireNonNull(combiner.apply(objects), "combiner returned a null value"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); dispose(); onError(ex); return; } HalfSerializer.onNext(downstream, v, this, error); } @Override public void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); return; } done = true; cancelAllBut(-1); HalfSerializer.onError(downstream, t, this, error); } @Override public void onComplete() { if (!done) { done = true; cancelAllBut(-1); HalfSerializer.onComplete(downstream, this, error); } } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(upstream.get()); } @Override public void dispose() { DisposableHelper.dispose(upstream); for (WithLatestInnerObserver observer : observers) { observer.dispose(); } } void innerNext(int index, Object o) { values.set(index, o); } void innerError(int index, Throwable t) { done = true; DisposableHelper.dispose(upstream); cancelAllBut(index); HalfSerializer.onError(downstream, t, this, error); } void innerComplete(int index, boolean nonEmpty) { if (!nonEmpty) { done = true; cancelAllBut(index); HalfSerializer.onComplete(downstream, this, error); } } void cancelAllBut(int index) { WithLatestInnerObserver[] observers = this.observers; for (int i = 0; i < observers.length; i++) { if (i != index) { observers[i].dispose(); } } } } static final class WithLatestInnerObserver extends AtomicReference<Disposable> implements Observer<Object> { private static final long serialVersionUID = 3256684027868224024L; final WithLatestFromObserver<?, ?> parent; final int index; boolean hasValue; WithLatestInnerObserver(WithLatestFromObserver<?, ?> parent, int index) { this.parent = parent; this.index = index; } @Override public void onSubscribe(Disposable d) { DisposableHelper.setOnce(this, d); } @Override public void onNext(Object t) { if (!hasValue) { hasValue = true; } parent.innerNext(index, t); } @Override public void onError(Throwable t) { parent.innerError(index, t); } @Override public void onComplete() { parent.innerComplete(index, hasValue); } public void dispose() { DisposableHelper.dispose(this); } } final class SingletonArrayFunc implements Function<T, R> { @Override public R apply(T t) throws Exception { return ObjectHelper.requireNonNull(combiner.apply(new Object[] { t }), "The combiner returned a null value"); } } }