Copyright (c) 2016-present, RxJava Contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */
package io.reactivex.internal.subscribers; import org.reactivestreams.*; import io.reactivex.FlowableSubscriber; import io.reactivex.exceptions.Exceptions; import io.reactivex.internal.fuseable.QueueSubscription; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.plugins.RxJavaPlugins;
Base class for a fuseable intermediate subscriber.
Type parameters:
  • <T> – the upstream value type
  • <R> – the downstream value type
/** * Base class for a fuseable intermediate subscriber. * @param <T> the upstream value type * @param <R> the downstream value type */
public abstract class BasicFuseableSubscriber<T, R> implements FlowableSubscriber<T>, QueueSubscription<R> {
The downstream subscriber.
/** The downstream subscriber. */
protected final Subscriber<? super R> downstream;
The upstream subscription.
/** The upstream subscription. */
protected Subscription upstream;
The upstream's QueueSubscription if not null.
/** The upstream's QueueSubscription if not null. */
protected QueueSubscription<T> qs;
Flag indicating no further onXXX event should be accepted.
/** Flag indicating no further onXXX event should be accepted. */
protected boolean done;
Holds the established fusion mode of the upstream.
/** Holds the established fusion mode of the upstream. */
protected int sourceMode;
Construct a BasicFuseableSubscriber by wrapping the given subscriber.
Params:
  • downstream – the subscriber, not null (not verified)
/** * Construct a BasicFuseableSubscriber by wrapping the given subscriber. * @param downstream the subscriber, not null (not verified) */
public BasicFuseableSubscriber(Subscriber<? super R> downstream) { this.downstream = downstream; } // final: fixed protocol steps to support fuseable and non-fuseable upstream @SuppressWarnings("unchecked") @Override public final void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.upstream, s)) { this.upstream = s; if (s instanceof QueueSubscription) { this.qs = (QueueSubscription<T>)s; } if (beforeDownstream()) { downstream.onSubscribe(this); afterDownstream(); } } }
Override this to perform actions before the call actual.onSubscribe(this) happens.
Returns:true if onSubscribe should continue with the call
/** * Override this to perform actions before the call {@code actual.onSubscribe(this)} happens. * @return true if onSubscribe should continue with the call */
protected boolean beforeDownstream() { return true; }
Override this to perform actions after the call to actual.onSubscribe(this) happened.
/** * Override this to perform actions after the call to {@code actual.onSubscribe(this)} happened. */
protected void afterDownstream() { // default no-op } // ----------------------------------- // Convenience and state-aware methods // ----------------------------------- @Override public void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); return; } done = true; downstream.onError(t); }
Rethrows the throwable if it is a fatal exception or calls onError(Throwable).
Params:
  • t – the throwable to rethrow or signal to the actual subscriber
/** * Rethrows the throwable if it is a fatal exception or calls {@link #onError(Throwable)}. * @param t the throwable to rethrow or signal to the actual subscriber */
protected final void fail(Throwable t) { Exceptions.throwIfFatal(t); upstream.cancel(); onError(t); } @Override public void onComplete() { if (done) { return; } done = true; downstream.onComplete(); }
Calls the upstream's QueueSubscription.requestFusion with the mode and saves the established mode in BasicFuseableSubscriber<T,R>.sourceMode if that mode doesn't have the QueueFuseable.BOUNDARY flag set.

If the upstream doesn't support fusion (BasicFuseableSubscriber<T,R>.qs is null), the method returns QueueFuseable.NONE.

Params:
  • mode – the fusion mode requested
Returns:the established fusion mode
/** * Calls the upstream's QueueSubscription.requestFusion with the mode and * saves the established mode in {@link #sourceMode} if that mode doesn't * have the {@link QueueSubscription#BOUNDARY} flag set. * <p> * If the upstream doesn't support fusion ({@link #qs} is null), the method * returns {@link QueueSubscription#NONE}. * @param mode the fusion mode requested * @return the established fusion mode */
protected final int transitiveBoundaryFusion(int mode) { QueueSubscription<T> qs = this.qs; if (qs != null) { if ((mode & BOUNDARY) == 0) { int m = qs.requestFusion(mode); if (m != NONE) { sourceMode = m; } return m; } } return NONE; } // -------------------------------------------------------------- // Default implementation of the RS and QS protocol (can be overridden) // -------------------------------------------------------------- @Override public void request(long n) { upstream.request(n); } @Override public void cancel() { upstream.cancel(); } @Override public boolean isEmpty() { return qs.isEmpty(); } @Override public void clear() { qs.clear(); } // ----------------------------------------------------------- // The rest of the Queue interface methods shouldn't be called // ----------------------------------------------------------- @Override public final boolean offer(R e) { throw new UnsupportedOperationException("Should not be called!"); } @Override public final boolean offer(R v1, R v2) { throw new UnsupportedOperationException("Should not be called!"); } }