Copyright (c) 2016-present, RxJava Contributors.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is
distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
the License for the specific language governing permissions and limitations under the License.
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex;
import java.util.NoSuchElementException;
import java.util.concurrent.*;
import org.reactivestreams.Publisher;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.*;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.observers.*;
import io.reactivex.internal.operators.completable.*;
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.operators.maybe.*;
import io.reactivex.internal.operators.mixed.*;
import io.reactivex.internal.operators.observable.*;
import io.reactivex.internal.operators.single.*;
import io.reactivex.internal.util.*;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
The Single
class implements the Reactive Pattern for a single value response. Single
behaves similarly to Observable
except that it can only emit either a single successful value or an error (there is no "onComplete" notification as there is for an Observable
).
The Single
class implements the SingleSource
base interface and the default consumer type it interacts with is the SingleObserver
via the subscribe(SingleObserver)
method.
The Single
operates with the following sequential protocol:
onSubscribe (onSuccess | onError)?
Note that onSuccess
and onError
are mutually exclusive events; unlike Observable
, onSuccess
is never followed by onError
.
Like Observable
, a running Single
can be stopped through the Disposable
instance provided to consumers through SingleObserver.onSubscribe
.
Like an Observable
, a Single
is lazy, can be either "hot" or "cold", synchronous or asynchronous. Single
instances returned by the methods of this class are cold
and there is a standard hot implementation in the form of a subject: SingleSubject
.
The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
See Flowable
or Observable
for the implementation of the Reactive Pattern for a stream or vector of values.
For more information see the ReactiveX
documentation.
Example:
Disposable d = Single.just("Hello World")
.delay(10, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableSingleObserver<String>() {
@Override
public void onStart() {
System.out.println("Started");
}
@Override
public void onSuccess(String value) {
System.out.println("Success: " + value);
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
});
Thread.sleep(5000);
d.dispose();
Note that by design, subscriptions via subscribe(SingleObserver)
can't be disposed from the outside (hence the void
return of the subscribe(SingleObserver)
method) and it is the responsibility of the implementor of the SingleObserver
to allow this to happen. RxJava supports such usage with the standard DisposableSingleObserver
instance. For convenience, the subscribeWith(SingleObserver)
method is provided as well to allow working with a SingleObserver
(or subclass) instance to be applied with in a fluent manner (such as in the example above).
Type parameters: - <T> –
the type of the item emitted by the Single
See Also: Since: 2.0
/**
* The {@code Single} class implements the Reactive Pattern for a single value response.
* <p>
* {@code Single} behaves similarly to {@link Observable} except that it can only emit either a single successful
* value or an error (there is no "onComplete" notification as there is for an {@link Observable}).
* <p>
* The {@code Single} class implements the {@link SingleSource} base interface and the default consumer
* type it interacts with is the {@link SingleObserver} via the {@link #subscribe(SingleObserver)} method.
* <p>
* The {@code Single} operates with the following sequential protocol:
* <pre>
* <code>onSubscribe (onSuccess | onError)?</code>
* </pre>
* <p>
* Note that {@code onSuccess} and {@code onError} are mutually exclusive events; unlike {@code Observable},
* {@code onSuccess} is never followed by {@code onError}.
* <p>
* Like {@code Observable}, a running {@code Single} can be stopped through the {@link Disposable} instance
* provided to consumers through {@link SingleObserver#onSubscribe}.
* <p>
* Like an {@code Observable}, a {@code Single} is lazy, can be either "hot" or "cold", synchronous or
* asynchronous. {@code Single} instances returned by the methods of this class are <em>cold</em>
* and there is a standard <em>hot</em> implementation in the form of a subject:
* {@link io.reactivex.subjects.SingleSubject SingleSubject}.
* <p>
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
* <p>
* <img width="640" height="301" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.legend.png" alt="">
* <p>
* See {@link Flowable} or {@link Observable} for the
* implementation of the Reactive Pattern for a stream or vector of values.
* <p>
* For more information see the <a href="http://reactivex.io/documentation/single.html">ReactiveX
* documentation</a>.
* <p>
* Example:
* <pre><code>
* Disposable d = Single.just("Hello World")
* .delay(10, TimeUnit.SECONDS, Schedulers.io())
* .subscribeWith(new DisposableSingleObserver<String>() {
* @Override
* public void onStart() {
* System.out.println("Started");
* }
*
* @Override
* public void onSuccess(String value) {
* System.out.println("Success: " + value);
* }
*
* @Override
* public void onError(Throwable error) {
* error.printStackTrace();
* }
* });
*
* Thread.sleep(5000);
*
* d.dispose();
* </code></pre>
* <p>
* Note that by design, subscriptions via {@link #subscribe(SingleObserver)} can't be disposed
* from the outside (hence the
* {@code void} return of the {@link #subscribe(SingleObserver)} method) and it is the
* responsibility of the implementor of the {@code SingleObserver} to allow this to happen.
* RxJava supports such usage with the standard
* {@link io.reactivex.observers.DisposableSingleObserver DisposableSingleObserver} instance.
* For convenience, the {@link #subscribeWith(SingleObserver)} method is provided as well to
* allow working with a {@code SingleObserver} (or subclass) instance to be applied with in
* a fluent manner (such as in the example above).
* @param <T>
* the type of the item emitted by the Single
* @since 2.0
* @see io.reactivex.observers.DisposableSingleObserver
*/
public abstract class Single<T> implements SingleSource<T> {
Runs multiple SingleSources and signals the events of the first one that signals (disposing
the rest).
- Scheduler:
amb
does not operate by default on a particular Scheduler
.
Params: - sources – the Iterable sequence of sources. A subscription to each source will
occur in the same order as in this Iterable.
Type parameters: - <T> – the value type
Returns: the new Single instance Since: 2.0
/**
* Runs multiple SingleSources and signals the events of the first one that signals (disposing
* the rest).
* <p>
* <img width="640" height="515" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.amb.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code amb} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the Iterable sequence of sources. A subscription to each source will
* occur in the same order as in this Iterable.
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> amb(final Iterable<? extends SingleSource<? extends T>> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new SingleAmb<T>(null, sources));
}
Runs multiple SingleSources and signals the events of the first one that signals (disposing
the rest).
- Scheduler:
ambArray
does not operate by default on a particular Scheduler
.
Params: - sources – the array of sources. A subscription to each source will
occur in the same order as in this array.
Type parameters: - <T> – the value type
Returns: the new Single instance Since: 2.0
/**
* Runs multiple SingleSources and signals the events of the first one that signals (disposing
* the rest).
* <p>
* <img width="640" height="515" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.ambArray.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code ambArray} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the array of sources. A subscription to each source will
* occur in the same order as in this array.
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T> Single<T> ambArray(final SingleSource<? extends T>... sources) {
if (sources.length == 0) {
return error(SingleInternalHelper.<T>emptyThrower());
}
if (sources.length == 1) {
return wrap((SingleSource<T>)sources[0]);
}
return RxJavaPlugins.onAssembly(new SingleAmb<T>(sources, null));
}
Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
an Iterable sequence.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
concat
does not operate by default on a particular Scheduler
.
Params: - sources – the Iterable sequence of SingleSource instances
Type parameters: - <T> – the value type
Returns: the new Flowable instance Since: 2.0
/**
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
* an Iterable sequence.
* <p>
* <img width="640" height="319" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concat.i.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the Iterable sequence of SingleSource instances
* @return the new Flowable instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T> Flowable<T> concat(Iterable<? extends SingleSource<? extends T>> sources) {
return concat(Flowable.fromIterable(sources));
}
Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
an Observable sequence.
- Scheduler:
concat
does not operate by default on a particular Scheduler
.
Params: - sources – the ObservableSource of SingleSource instances
Type parameters: - <T> – the value type
Returns: the new Observable instance Since: 2.0
/**
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
* an Observable sequence.
* <p>
* <img width="640" height="319" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concat.o.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the ObservableSource of SingleSource instances
* @return the new Observable instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Observable<T> concat(ObservableSource<? extends SingleSource<? extends T>> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, SingleInternalHelper.toObservable(), 2, ErrorMode.IMMEDIATE));
}
Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
a Publisher sequence.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer and the sources Publisher
is expected to honor it as well.
- Scheduler:
concat
does not operate by default on a particular Scheduler
.
Params: - sources – the Publisher of SingleSource instances
Type parameters: - <T> – the value type
Returns: the new Flowable instance Since: 2.0
/**
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
* a Publisher sequence.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concat.p.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer
* and the sources {@code Publisher} is expected to honor it as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the Publisher of SingleSource instances
* @return the new Flowable instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concat(Publisher<? extends SingleSource<? extends T>> sources) {
return concat(sources, 2);
}
Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
a Publisher sequence and prefetched by the specified amount.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer and the sources Publisher
is expected to honor it as well.
- Scheduler:
concat
does not operate by default on a particular Scheduler
.
Params: - sources – the Publisher of SingleSource instances
- prefetch – the number of SingleSources to prefetch from the Publisher
Type parameters: - <T> – the value type
Returns: the new Flowable instance Since: 2.0
/**
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided by
* a Publisher sequence and prefetched by the specified amount.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concat.pn.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer
* and the sources {@code Publisher} is expected to honor it as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the Publisher of SingleSource instances
* @param prefetch the number of SingleSources to prefetch from the Publisher
* @return the new Flowable instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Flowable<T> concat(Publisher<? extends SingleSource<? extends T>> sources, int prefetch) {
ObjectHelper.requireNonNull(sources, "sources is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableConcatMapPublisher(sources, SingleInternalHelper.toFlowable(), prefetch, ErrorMode.IMMEDIATE));
}
Returns a Flowable that emits the items emitted by two Singles, one after the other.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
concat
does not operate by default on a particular Scheduler
.
Params: - source1 –
a Single to be concatenated
- source2 –
a Single to be concatenated
Type parameters: - <T> – the common value type
See Also: Returns: a Flowable that emits items emitted by the two source Singles, one after the other.
/**
* Returns a Flowable that emits the items emitted by two Singles, one after the other.
* <p>
* <img width="640" height="366" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concat.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common value type
* @param source1
* a Single to be concatenated
* @param source2
* a Single to be concatenated
* @return a Flowable that emits items emitted by the two source Singles, one after the other.
* @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T> Flowable<T> concat(
SingleSource<? extends T> source1, SingleSource<? extends T> source2
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
return concat(Flowable.fromArray(source1, source2));
}
Returns a Flowable that emits the items emitted by three Singles, one after the other.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
concat
does not operate by default on a particular Scheduler
.
Params: - source1 –
a Single to be concatenated
- source2 –
a Single to be concatenated
- source3 –
a Single to be concatenated
Type parameters: - <T> – the common value type
See Also: Returns: a Flowable that emits items emitted by the three source Singles, one after the other.
/**
* Returns a Flowable that emits the items emitted by three Singles, one after the other.
* <p>
* <img width="640" height="366" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concat.o3.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common value type
* @param source1
* a Single to be concatenated
* @param source2
* a Single to be concatenated
* @param source3
* a Single to be concatenated
* @return a Flowable that emits items emitted by the three source Singles, one after the other.
* @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T> Flowable<T> concat(
SingleSource<? extends T> source1, SingleSource<? extends T> source2,
SingleSource<? extends T> source3
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
return concat(Flowable.fromArray(source1, source2, source3));
}
Returns a Flowable that emits the items emitted by four Singles, one after the other.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
concat
does not operate by default on a particular Scheduler
.
Params: - source1 –
a Single to be concatenated
- source2 –
a Single to be concatenated
- source3 –
a Single to be concatenated
- source4 –
a Single to be concatenated
Type parameters: - <T> – the common value type
See Also: Returns: a Flowable that emits items emitted by the four source Singles, one after the other.
/**
* Returns a Flowable that emits the items emitted by four Singles, one after the other.
* <p>
* <img width="640" height="362" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concat.o4.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common value type
* @param source1
* a Single to be concatenated
* @param source2
* a Single to be concatenated
* @param source3
* a Single to be concatenated
* @param source4
* a Single to be concatenated
* @return a Flowable that emits items emitted by the four source Singles, one after the other.
* @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T> Flowable<T> concat(
SingleSource<? extends T> source1, SingleSource<? extends T> source2,
SingleSource<? extends T> source3, SingleSource<? extends T> source4
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
return concat(Flowable.fromArray(source1, source2, source3, source4));
}
Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided in
an array.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
concatArray
does not operate by default on a particular Scheduler
.
Params: - sources – the array of SingleSource instances
Type parameters: - <T> – the value type
Returns: the new Flowable instance Since: 2.0
/**
* Concatenate the single values, in a non-overlapping fashion, of the SingleSources provided in
* an array.
* <p>
* <img width="640" height="319" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatArray.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatArray} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the array of SingleSource instances
* @return the new Flowable instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Flowable<T> concatArray(SingleSource<? extends T>... sources) {
return RxJavaPlugins.onAssembly(new FlowableConcatMap(Flowable.fromArray(sources), SingleInternalHelper.toFlowable(), 2, ErrorMode.BOUNDARY));
}
Concatenates a sequence of SingleSource eagerly into a single stream of values.
Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
source SingleSources. The operator buffers the value emitted by these SingleSources and then drains them
in order, each one after the previous one completes.
- Backpressure:
- The operator honors backpressure from downstream.
- Scheduler:
- This method does not operate by default on a particular
Scheduler
.
Params: - sources – a sequence of Single that need to be eagerly concatenated
Type parameters: - <T> – the value type
Returns: the new Flowable instance with the specified concatenation behavior
/**
* Concatenates a sequence of SingleSource eagerly into a single stream of values.
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source SingleSources. The operator buffers the value emitted by these SingleSources and then drains them
* in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of Single that need to be eagerly concatenated
* @return the new Flowable instance with the specified concatenation behavior
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatArrayEager(SingleSource<? extends T>... sources) {
return Flowable.fromArray(sources).concatMapEager(SingleInternalHelper.<T>toFlowable());
}
Concatenates a Publisher sequence of SingleSources eagerly into a single stream of values.
Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
emitted source Publishers as they are observed. The operator buffers the values emitted by these
Publishers and then drains them in order, each one after the previous one completes.
- Backpressure:
- Backpressure is honored towards the downstream and the outer Publisher is expected to support backpressure. Violating this assumption, the operator will signal
MissingBackpressureException
.
- Scheduler:
- This method does not operate by default on a particular
Scheduler
.
Params: - sources – a sequence of Publishers that need to be eagerly concatenated
Type parameters: - <T> – the value type
Returns: the new Publisher instance with the specified concatenation behavior
/**
* Concatenates a Publisher sequence of SingleSources eagerly into a single stream of values.
* <p>
* <img width="640" height="307" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatEager.p.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source Publishers as they are observed. The operator buffers the values emitted by these
* Publishers and then drains them in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Backpressure is honored towards the downstream and the outer Publisher is
* expected to support backpressure. Violating this assumption, the operator will
* signal {@link io.reactivex.exceptions.MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of Publishers that need to be eagerly concatenated
* @return the new Publisher instance with the specified concatenation behavior
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatEager(Publisher<? extends SingleSource<? extends T>> sources) {
return Flowable.fromPublisher(sources).concatMapEager(SingleInternalHelper.<T>toFlowable());
}
Concatenates a sequence of SingleSources eagerly into a single stream of values.
Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
source SingleSources. The operator buffers the values emitted by these SingleSources and then drains them
in order, each one after the previous one completes.
- Backpressure:
- Backpressure is honored towards the downstream.
- Scheduler:
- This method does not operate by default on a particular
Scheduler
.
Params: - sources – a sequence of SingleSource that need to be eagerly concatenated
Type parameters: - <T> – the value type
Returns: the new Flowable instance with the specified concatenation behavior
/**
* Concatenates a sequence of SingleSources eagerly into a single stream of values.
* <p>
* <img width="640" height="319" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatEager.i.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source SingleSources. The operator buffers the values emitted by these SingleSources and then drains them
* in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Backpressure is honored towards the downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of SingleSource that need to be eagerly concatenated
* @return the new Flowable instance with the specified concatenation behavior
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatEager(Iterable<? extends SingleSource<? extends T>> sources) {
return Flowable.fromIterable(sources).concatMapEager(SingleInternalHelper.<T>toFlowable());
}
Provides an API (via a cold Single) that bridges the reactive world with the callback-style world.
Example:
Single.<Event>create(emitter -> {
Callback listener = new Callback() {
@Override
public void onEvent(Event e) {
emitter.onSuccess(e);
}
@Override
public void onFailure(Exception e) {
emitter.onError(e);
}
};
AutoCloseable c = api.someMethod(listener);
emitter.setCancellable(c::close);
});
- Scheduler:
create
does not operate by default on a particular Scheduler
.
Params: - source – the emitter that is called when a SingleObserver subscribes to the returned
Single
Type parameters: - <T> – the value type
See Also: Returns: the new Single instance
/**
* Provides an API (via a cold Single) that bridges the reactive world with the callback-style world.
* <p>
* <img width="640" height="454" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.create.png" alt="">
* <p>
* Example:
* <pre><code>
* Single.<Event>create(emitter -> {
* Callback listener = new Callback() {
* @Override
* public void onEvent(Event e) {
* emitter.onSuccess(e);
* }
*
* @Override
* public void onFailure(Exception e) {
* emitter.onError(e);
* }
* };
*
* AutoCloseable c = api.someMethod(listener);
*
* emitter.setCancellable(c::close);
*
* });
* </code></pre>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code create} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param source the emitter that is called when a SingleObserver subscribes to the returned {@code Single}
* @return the new Single instance
* @see SingleOnSubscribe
* @see Cancellable
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> create(SingleOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new SingleCreate<T>(source));
}
Calls a Callable
for each individual SingleObserver
to return the actual SingleSource
to be subscribed to.
- Scheduler:
defer
does not operate by default on a particular Scheduler
.
Params: - singleSupplier – the
Callable
that is called for each individual SingleObserver
and returns a SingleSource instance to subscribe to
Type parameters: - <T> – the value type
Returns: the new Single instance
/**
* Calls a {@link Callable} for each individual {@link SingleObserver} to return the actual {@link SingleSource} to
* be subscribed to.
* <p>
* <img width="640" height="515" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.defer.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code defer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param singleSupplier the {@code Callable} that is called for each individual {@code SingleObserver} and
* returns a SingleSource instance to subscribe to
* @return the new Single instance
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> defer(final Callable<? extends SingleSource<? extends T>> singleSupplier) {
ObjectHelper.requireNonNull(singleSupplier, "singleSupplier is null");
return RxJavaPlugins.onAssembly(new SingleDefer<T>(singleSupplier));
}
Signals a Throwable returned by the callback function for each individual SingleObserver.
- Scheduler:
error
does not operate by default on a particular Scheduler
.
Params: - errorSupplier – the callable that is called for each individual SingleObserver and
returns a Throwable instance to be emitted.
Type parameters: - <T> – the value type
Returns: the new Single instance
/**
* Signals a Throwable returned by the callback function for each individual SingleObserver.
* <p>
* <img width="640" height="283" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.error.c.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code error} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param errorSupplier the callable that is called for each individual SingleObserver and
* returns a Throwable instance to be emitted.
* @return the new Single instance
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> error(final Callable<? extends Throwable> errorSupplier) {
ObjectHelper.requireNonNull(errorSupplier, "errorSupplier is null");
return RxJavaPlugins.onAssembly(new SingleError<T>(errorSupplier));
}
Returns a Single that invokes a subscriber's onError
method when the subscriber subscribes to it.
- Scheduler:
error
does not operate by default on a particular Scheduler
.
Params: - exception – the particular Throwable to pass to
onError
Type parameters: - <T> –
the type of the item (ostensibly) emitted by the Single
See Also: Returns: a Single that invokes the subscriber's onError
method when the subscriber subscribes to it
/**
* Returns a Single that invokes a subscriber's {@link SingleObserver#onError onError} method when the
* subscriber subscribes to it.
* <p>
* <img width="640" height="283" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.error.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code error} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param exception
* the particular Throwable to pass to {@link SingleObserver#onError onError}
* @param <T>
* the type of the item (ostensibly) emitted by the Single
* @return a Single that invokes the subscriber's {@link SingleObserver#onError onError} method when
* the subscriber subscribes to it
* @see <a href="http://reactivex.io/documentation/operators/empty-never-throw.html">ReactiveX operators documentation: Throw</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> error(final Throwable exception) {
ObjectHelper.requireNonNull(exception, "exception is null");
return error(Functions.justCallable(exception));
}
Returns a Single
that invokes passed function and emits its result for each new SingleObserver that subscribes. Allows you to defer execution of passed function until SingleObserver subscribes to the Single
. It makes passed function "lazy". Result of the function invocation will be emitted by the Single
.
- Scheduler:
fromCallable
does not operate by default on a particular Scheduler
.
- Error handling:
- If the
Callable
throws an exception, the respective Throwable
is delivered to the downstream via SingleObserver.onError(Throwable)
, except when the downstream has disposed this Single
source. In this latter case, the Throwable
is delivered to the global error handler via RxJavaPlugins.onError(Throwable)
as an UndeliverableException
.
Params: - callable – function which execution should be deferred, it will be invoked when SingleObserver will subscribe to the
Single
.
Type parameters: Returns: a Single
whose SingleObserver
s' subscriptions trigger an invocation of the given function.
/**
* Returns a {@link Single} that invokes passed function and emits its result for each new SingleObserver that subscribes.
* <p>
* Allows you to defer execution of passed function until SingleObserver subscribes to the {@link Single}.
* It makes passed function "lazy".
* Result of the function invocation will be emitted by the {@link Single}.
* <p>
* <img width="640" height="467" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.fromCallable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromCallable} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd> If the {@link Callable} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link SingleObserver#onError(Throwable)},
* except when the downstream has disposed this {@code Single} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
* </dd>
* </dl>
*
* @param callable
* function which execution should be deferred, it will be invoked when SingleObserver will subscribe to the {@link Single}.
* @param <T>
* the type of the item emitted by the {@link Single}.
* @return a {@link Single} whose {@link SingleObserver}s' subscriptions trigger an invocation of the given function.
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> fromCallable(final Callable<? extends T> callable) {
ObjectHelper.requireNonNull(callable, "callable is null");
return RxJavaPlugins.onAssembly(new SingleFromCallable<T>(callable));
}
Converts a Future
into a Single
.
You can convert any object that supports the Future
interface into a Single that emits the return value of the Future.get
method of that object, by passing the object into the from
method.
Important note: This Single is blocking; you cannot dispose it.
- Scheduler:
fromFuture
does not operate by default on a particular Scheduler
.
Params: - future – the source
Future
Type parameters: See Also: Returns: a Single
that emits the item from the source Future
/**
* Converts a {@link Future} into a {@code Single}.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.from.Future.png" alt="">
* <p>
* You can convert any object that supports the {@link Future} interface into a Single that emits the return
* value of the {@link Future#get} method of that object, by passing the object into the {@code from}
* method.
* <p>
* <em>Important note:</em> This Single is blocking; you cannot dispose it.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromFuture} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param future
* the source {@link Future}
* @param <T>
* the type of object that the {@link Future} returns, and also the type of item to be emitted by
* the resulting {@code Single}
* @return a {@code Single} that emits the item from the source {@link Future}
* @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> fromFuture(Future<? extends T> future) {
return toSingle(Flowable.<T>fromFuture(future));
}
Converts a Future
into a Single
, with a timeout on the Future.
You can convert any object that supports the Future
interface into a Single
that emits the return value of the Future.get
method of that object, by passing the object into the from
method.
Important note: This Single
is blocking; you cannot dispose it.
- Scheduler:
fromFuture
does not operate by default on a particular Scheduler
.
Params: Type parameters: See Also: Returns: a Single
that emits the item from the source Future
/**
* Converts a {@link Future} into a {@code Single}, with a timeout on the Future.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.from.Future.png" alt="">
* <p>
* You can convert any object that supports the {@link Future} interface into a {@code Single} that emits
* the return value of the {@link Future#get} method of that object, by passing the object into the
* {@code from} method.
* <p>
* <em>Important note:</em> This {@code Single} is blocking; you cannot dispose it.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromFuture} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param future
* the source {@link Future}
* @param timeout
* the maximum time to wait before calling {@code get}
* @param unit
* the {@link TimeUnit} of the {@code timeout} argument
* @param <T>
* the type of object that the {@link Future} returns, and also the type of item to be emitted by
* the resulting {@code Single}
* @return a {@code Single} that emits the item from the source {@link Future}
* @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit) {
return toSingle(Flowable.<T>fromFuture(future, timeout, unit));
}
Converts a Future
into a Single
, with a timeout on the Future.
You can convert any object that supports the Future
interface into a Single
that emits the return value of the Future.get
method of that object, by passing the object into the from
method.
Important note: This Single
is blocking; you cannot dispose it.
- Scheduler:
- You specify the
Scheduler
where the blocking wait will happen.
Params: Type parameters: See Also: Returns: a Single
that emits the item from the source Future
/**
* Converts a {@link Future} into a {@code Single}, with a timeout on the Future.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.from.Future.png" alt="">
* <p>
* You can convert any object that supports the {@link Future} interface into a {@code Single} that emits
* the return value of the {@link Future#get} method of that object, by passing the object into the
* {@code from} method.
* <p>
* <em>Important note:</em> This {@code Single} is blocking; you cannot dispose it.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify the {@link Scheduler} where the blocking wait will happen.</dd>
* </dl>
*
* @param future
* the source {@link Future}
* @param timeout
* the maximum time to wait before calling {@code get}
* @param unit
* the {@link TimeUnit} of the {@code timeout} argument
* @param scheduler
* the Scheduler to use for the blocking wait
* @param <T>
* the type of object that the {@link Future} returns, and also the type of item to be emitted by
* the resulting {@code Single}
* @return a {@code Single} that emits the item from the source {@link Future}
* @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static <T> Single<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler) {
return toSingle(Flowable.<T>fromFuture(future, timeout, unit, scheduler));
}
Converts a Future
, operating on a specified Scheduler
, into a Single
.
You can convert any object that supports the Future
interface into a Single
that emits the return value of the Future.get
method of that object, by passing the object into the from
method.
- Scheduler:
- You specify which
Scheduler
this operator will use.
Params: - future – the source
Future
- scheduler – the
Scheduler
to wait for the Future on. Use a Scheduler such as Schedulers.io()
that can block and wait on the Future
Type parameters: See Also: Returns: a Single
that emits the item from the source Future
/**
* Converts a {@link Future}, operating on a specified {@link Scheduler}, into a {@code Single}.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.from.Future.s.png" alt="">
* <p>
* You can convert any object that supports the {@link Future} interface into a {@code Single} that emits
* the return value of the {@link Future#get} method of that object, by passing the object into the
* {@code from} method.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
*
* @param future
* the source {@link Future}
* @param scheduler
* the {@link Scheduler} to wait for the Future on. Use a Scheduler such as
* {@link Schedulers#io()} that can block and wait on the Future
* @param <T>
* the type of object that the {@link Future} returns, and also the type of item to be emitted by
* the resulting {@code Single}
* @return a {@code Single} that emits the item from the source {@link Future}
* @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static <T> Single<T> fromFuture(Future<? extends T> future, Scheduler scheduler) {
return toSingle(Flowable.<T>fromFuture(future, scheduler));
}
Wraps a specific Publisher into a Single and signals its single element or error.
If the source Publisher is empty, a NoSuchElementException is signalled. If
the source has more than one element, an IndexOutOfBoundsException is signalled.
The Publisher
must follow the Reactive Streams specification.
Violating the specification may result in undefined behavior.
If possible, use create(SingleOnSubscribe<Object>)
to create a source-like Single
instead.
Note that even though Publisher
appears to be a functional interface, it is not recommended to implement it through a lambda as the specification requires state management that is not achievable with a stateless lambda.
- Backpressure:
- The
publisher
is consumed in an unbounded fashion but will be cancelled if it produced more than one item.
- Scheduler:
fromPublisher
does not operate by default on a particular Scheduler
.
Params: - publisher – the source Publisher instance, not null
Type parameters: - <T> – the value type
See Also: Returns: the new Single instance
/**
* Wraps a specific Publisher into a Single and signals its single element or error.
* <p>If the source Publisher is empty, a NoSuchElementException is signalled. If
* the source has more than one element, an IndexOutOfBoundsException is signalled.
* <p>
* The {@link Publisher} must follow the
* <a href="https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams">Reactive Streams specification</a>.
* Violating the specification may result in undefined behavior.
* <p>
* If possible, use {@link #create(SingleOnSubscribe)} to create a
* source-like {@code Single} instead.
* <p>
* Note that even though {@link Publisher} appears to be a functional interface, it
* is not recommended to implement it through a lambda as the specification requires
* state management that is not achievable with a stateless lambda.
* <p>
* <img width="640" height="322" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.fromPublisher.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code publisher} is consumed in an unbounded fashion but will be cancelled
* if it produced more than one item.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromPublisher} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param publisher the source Publisher instance, not null
* @return the new Single instance
* @see #create(SingleOnSubscribe)
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> fromPublisher(final Publisher<? extends T> publisher) {
ObjectHelper.requireNonNull(publisher, "publisher is null");
return RxJavaPlugins.onAssembly(new SingleFromPublisher<T>(publisher));
}
Wraps a specific ObservableSource into a Single and signals its single element or error.
If the ObservableSource is empty, a NoSuchElementException is signalled.
If the source has more than one element, an IndexOutOfBoundsException is signalled.
- Scheduler:
fromObservable
does not operate by default on a particular Scheduler
.
Params: - observableSource – the source Observable, not null
Type parameters: Returns: the new Single instance
/**
* Wraps a specific ObservableSource into a Single and signals its single element or error.
* <p>If the ObservableSource is empty, a NoSuchElementException is signalled.
* If the source has more than one element, an IndexOutOfBoundsException is signalled.
* <p>
* <img width="640" height="343" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.fromObservable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param observableSource the source Observable, not null
* @param <T>
* the type of the item emitted by the {@link Single}.
* @return the new Single instance
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> fromObservable(ObservableSource<? extends T> observableSource) {
ObjectHelper.requireNonNull(observableSource, "observableSource is null");
return RxJavaPlugins.onAssembly(new ObservableSingleSingle<T>(observableSource, null));
}
Returns a Single
that emits a specified item.
To convert any object into a Single
that emits that object, pass that object into the just
method.
- Scheduler:
just
does not operate by default on a particular Scheduler
.
Params: - item –
the item to emit
Type parameters: - <T> –
the type of that item
See Also: Returns: a Single
that emits item
/**
* Returns a {@code Single} that emits a specified item.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.just.png" alt="">
* <p>
* To convert any object into a {@code Single} that emits that object, pass that object into the
* {@code just} method.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code just} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param item
* the item to emit
* @param <T>
* the type of that item
* @return a {@code Single} that emits {@code item}
* @see <a href="http://reactivex.io/documentation/operators/just.html">ReactiveX operators documentation: Just</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Single<T> just(final T item) {
ObjectHelper.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}
Merges an Iterable sequence of SingleSource instances into a single Flowable sequence,
running all SingleSources at once.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
merge
does not operate by default on a particular Scheduler
.
- Error handling:
- If any of the source
SingleSource
s signal a Throwable
via onError
, the resulting Flowable
terminates with that Throwable
and all other source SingleSource
s are disposed. If more than one SingleSource
signals an error, the resulting Flowable
may terminate with the first one's error or, depending on the concurrency of the sources, may terminate with a CompositeException
containing two or more of the various error signals. Throwable
s that didn't make into the composite will be sent (individually) to the global error handler via RxJavaPlugins.onError(Throwable)
method as UndeliverableException
errors. Similarly, Throwable
s signaled by source(s) after the returned Flowable
has been cancelled or terminated with a (composite) error will be sent to the same global error handler. Use mergeDelayError(Iterable<? extends SingleSource<? extends Object>>)
to merge sources and terminate only when all source SingleSource
s have completed or failed with an error.
Params: - sources – the Iterable sequence of SingleSource sources
Type parameters: - <T> – the common and resulting value type
See Also: Returns: the new Flowable instance Since: 2.0
/**
* Merges an Iterable sequence of SingleSource instances into a single Flowable sequence,
* running all SingleSources at once.
* <p>
* <img width="640" height="319" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.i.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If any of the source {@code SingleSource}s signal a {@code Throwable} via {@code onError}, the resulting
* {@code Flowable} terminates with that {@code Throwable} and all other source {@code SingleSource}s are disposed.
* If more than one {@code SingleSource} signals an error, the resulting {@code Flowable} may terminate with the
* first one's error or, depending on the concurrency of the sources, may terminate with a
* {@code CompositeException} containing two or more of the various error signals.
* {@code Throwable}s that didn't make into the composite will be sent (individually) to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. Similarly, {@code Throwable}s
* signaled by source(s) after the returned {@code Flowable} has been cancelled or terminated with a
* (composite) error will be sent to the same global error handler.
* Use {@link #mergeDelayError(Iterable)} to merge sources and terminate only when all source {@code SingleSource}s
* have completed or failed with an error.
* </dd>
* </dl>
* @param <T> the common and resulting value type
* @param sources the Iterable sequence of SingleSource sources
* @return the new Flowable instance
* @since 2.0
* @see #mergeDelayError(Iterable)
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> merge(Iterable<? extends SingleSource<? extends T>> sources) {
return merge(Flowable.fromIterable(sources));
}
Merges a Flowable sequence of SingleSource instances into a single Flowable sequence,
running all SingleSources at once.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
merge
does not operate by default on a particular Scheduler
.
- Error handling:
- If any of the source
SingleSource
s signal a Throwable
via onError
, the resulting Flowable
terminates with that Throwable
and all other source SingleSource
s are disposed. If more than one SingleSource
signals an error, the resulting Flowable
may terminate with the first one's error or, depending on the concurrency of the sources, may terminate with a CompositeException
containing two or more of the various error signals. Throwable
s that didn't make into the composite will be sent (individually) to the global error handler via RxJavaPlugins.onError(Throwable)
method as UndeliverableException
errors. Similarly, Throwable
s signaled by source(s) after the returned Flowable
has been cancelled or terminated with a (composite) error will be sent to the same global error handler. Use mergeDelayError(Publisher<? extends SingleSource<? extends Object>>)
to merge sources and terminate only when all source SingleSource
s have completed or failed with an error.
Params: - sources – the Flowable sequence of SingleSource sources
Type parameters: - <T> – the common and resulting value type
See Also: Returns: the new Flowable instance Since: 2.0
/**
* Merges a Flowable sequence of SingleSource instances into a single Flowable sequence,
* running all SingleSources at once.
* <p>
* <img width="640" height="307" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.p.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If any of the source {@code SingleSource}s signal a {@code Throwable} via {@code onError}, the resulting
* {@code Flowable} terminates with that {@code Throwable} and all other source {@code SingleSource}s are disposed.
* If more than one {@code SingleSource} signals an error, the resulting {@code Flowable} may terminate with the
* first one's error or, depending on the concurrency of the sources, may terminate with a
* {@code CompositeException} containing two or more of the various error signals.
* {@code Throwable}s that didn't make into the composite will be sent (individually) to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. Similarly, {@code Throwable}s
* signaled by source(s) after the returned {@code Flowable} has been cancelled or terminated with a
* (composite) error will be sent to the same global error handler.
* Use {@link #mergeDelayError(Publisher)} to merge sources and terminate only when all source {@code SingleSource}s
* have completed or failed with an error.
* </dd>
* </dl>
* @param <T> the common and resulting value type
* @param sources the Flowable sequence of SingleSource sources
* @return the new Flowable instance
* @see #mergeDelayError(Publisher)
* @since 2.0
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Flowable<T> merge(Publisher<? extends SingleSource<? extends T>> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, SingleInternalHelper.toFlowable(), false, Integer.MAX_VALUE, Flowable.bufferSize()));
}
Flattens a Single
that emits a Single
into a single Single
that emits the item emitted by the nested Single
, without any transformation.
- Scheduler:
merge
does not operate by default on a particular Scheduler
.
- The resulting
Single
emits the outer source's or the inner SingleSource
's Throwable
as is. Unlike the other merge()
operators, this operator won't and can't produce a CompositeException
because there is only one possibility for the outer or the inner SingleSource
to emit an onError
signal. Therefore, there is no need for a mergeDelayError(SingleSource<SingleSource<T>>)
operator.
Params: - source – a
Single
that emits a Single
Type parameters: - <T> – the value type of the sources and the output
See Also: Returns: a Single
that emits the item that is the result of flattening the Single
emitted by source
/**
* Flattens a {@code Single} that emits a {@code Single} into a single {@code Single} that emits the item
* emitted by the nested {@code Single}, without any transformation.
* <p>
* <img width="640" height="412" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.oo.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* <dd>The resulting {@code Single} emits the outer source's or the inner {@code SingleSource}'s {@code Throwable} as is.
* Unlike the other {@code merge()} operators, this operator won't and can't produce a {@code CompositeException} because there is
* only one possibility for the outer or the inner {@code SingleSource} to emit an {@code onError} signal.
* Therefore, there is no need for a {@code mergeDelayError(SingleSource<SingleSource<T>>)} operator.
* </dd>
* </dl>
*
* @param <T> the value type of the sources and the output
* @param source
* a {@code Single} that emits a {@code Single}
* @return a {@code Single} that emits the item that is the result of flattening the {@code Single} emitted
* by {@code source}
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Single<T> merge(SingleSource<? extends SingleSource<? extends T>> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new SingleFlatMap<SingleSource<? extends T>, T>(source, (Function)Functions.identity()));
}
Flattens two Singles into a single Flowable, without any transformation.
You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using the merge
method.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
merge
does not operate by default on a particular Scheduler
.
- Error handling:
- If any of the source
SingleSource
s signal a Throwable
via onError
, the resulting Flowable
terminates with that Throwable
and all other source SingleSource
s are disposed. If more than one SingleSource
signals an error, the resulting Flowable
may terminate with the first one's error or, depending on the concurrency of the sources, may terminate with a CompositeException
containing two or more of the various error signals. Throwable
s that didn't make into the composite will be sent (individually) to the global error handler via RxJavaPlugins.onError(Throwable)
method as UndeliverableException
errors. Similarly, Throwable
s signaled by source(s) after the returned Flowable
has been cancelled or terminated with a (composite) error will be sent to the same global error handler. Use mergeDelayError(SingleSource<? extends Object>, SingleSource<? extends Object>)
to merge sources and terminate only when all source SingleSource
s have completed or failed with an error.
Params: - source1 –
a SingleSource to be merged
- source2 –
a SingleSource to be merged
Type parameters: - <T> – the common value type
See Also: Returns: a Flowable that emits all of the items emitted by the source Singles
/**
* Flattens two Singles into a single Flowable, without any transformation.
* <p>
* <img width="640" height="414" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by
* using the {@code merge} method.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If any of the source {@code SingleSource}s signal a {@code Throwable} via {@code onError}, the resulting
* {@code Flowable} terminates with that {@code Throwable} and all other source {@code SingleSource}s are disposed.
* If more than one {@code SingleSource} signals an error, the resulting {@code Flowable} may terminate with the
* first one's error or, depending on the concurrency of the sources, may terminate with a
* {@code CompositeException} containing two or more of the various error signals.
* {@code Throwable}s that didn't make into the composite will be sent (individually) to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. Similarly, {@code Throwable}s
* signaled by source(s) after the returned {@code Flowable} has been cancelled or terminated with a
* (composite) error will be sent to the same global error handler.
* Use {@link #mergeDelayError(SingleSource, SingleSource)} to merge sources and terminate only when all source {@code SingleSource}s
* have completed or failed with an error.
* </dd>
* </dl>
*
* @param <T> the common value type
* @param source1
* a SingleSource to be merged
* @param source2
* a SingleSource to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #mergeDelayError(SingleSource, SingleSource)
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T> Flowable<T> merge(
SingleSource<? extends T> source1, SingleSource<? extends T> source2
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
return merge(Flowable.fromArray(source1, source2));
}
Flattens three Singles into a single Flowable, without any transformation.
You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using the merge
method.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
merge
does not operate by default on a particular Scheduler
.
- Error handling:
- If any of the source
SingleSource
s signal a Throwable
via onError
, the resulting Flowable
terminates with that Throwable
and all other source SingleSource
s are disposed. If more than one SingleSource
signals an error, the resulting Flowable
may terminate with the first one's error or, depending on the concurrency of the sources, may terminate with a CompositeException
containing two or more of the various error signals. Throwable
s that didn't make into the composite will be sent (individually) to the global error handler via RxJavaPlugins.onError(Throwable)
method as UndeliverableException
errors. Similarly, Throwable
s signaled by source(s) after the returned Flowable
has been cancelled or terminated with a (composite) error will be sent to the same global error handler. Use mergeDelayError(SingleSource<? extends Object>, SingleSource<? extends Object>, SingleSource<? extends Object>)
to merge sources and terminate only when all source SingleSource
s have completed or failed with an error.
Params: - source1 –
a SingleSource to be merged
- source2 –
a SingleSource to be merged
- source3 –
a SingleSource to be merged
Type parameters: - <T> – the common value type
See Also: Returns: a Flowable that emits all of the items emitted by the source Singles
/**
* Flattens three Singles into a single Flowable, without any transformation.
* <p>
* <img width="640" height="366" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.o3.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using
* the {@code merge} method.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If any of the source {@code SingleSource}s signal a {@code Throwable} via {@code onError}, the resulting
* {@code Flowable} terminates with that {@code Throwable} and all other source {@code SingleSource}s are disposed.
* If more than one {@code SingleSource} signals an error, the resulting {@code Flowable} may terminate with the
* first one's error or, depending on the concurrency of the sources, may terminate with a
* {@code CompositeException} containing two or more of the various error signals.
* {@code Throwable}s that didn't make into the composite will be sent (individually) to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. Similarly, {@code Throwable}s
* signaled by source(s) after the returned {@code Flowable} has been cancelled or terminated with a
* (composite) error will be sent to the same global error handler.
* Use {@link #mergeDelayError(SingleSource, SingleSource, SingleSource)} to merge sources and terminate only when all source {@code SingleSource}s
* have completed or failed with an error.
* </dd>
* </dl>
*
* @param <T> the common value type
* @param source1
* a SingleSource to be merged
* @param source2
* a SingleSource to be merged
* @param source3
* a SingleSource to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #mergeDelayError(SingleSource, SingleSource, SingleSource)
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T> Flowable<T> merge(
SingleSource<? extends T> source1, SingleSource<? extends T> source2,
SingleSource<? extends T> source3
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
return merge(Flowable.fromArray(source1, source2, source3));
}
Flattens four Singles into a single Flowable, without any transformation.
You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using the merge
method.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
merge
does not operate by default on a particular Scheduler
.
- Error handling:
- If any of the source
SingleSource
s signal a Throwable
via onError
, the resulting Flowable
terminates with that Throwable
and all other source SingleSource
s are disposed. If more than one SingleSource
signals an error, the resulting Flowable
may terminate with the first one's error or, depending on the concurrency of the sources, may terminate with a CompositeException
containing two or more of the various error signals. Throwable
s that didn't make into the composite will be sent (individually) to the global error handler via RxJavaPlugins.onError(Throwable)
method as UndeliverableException
errors. Similarly, Throwable
s signaled by source(s) after the returned Flowable
has been cancelled or terminated with a (composite) error will be sent to the same global error handler. Use mergeDelayError(SingleSource<? extends Object>, SingleSource<? extends Object>, SingleSource<? extends Object>, SingleSource<? extends Object>)
to merge sources and terminate only when all source SingleSource
s have completed or failed with an error.
Params: - source1 –
a SingleSource to be merged
- source2 –
a SingleSource to be merged
- source3 –
a SingleSource to be merged
- source4 –
a SingleSource to be merged
Type parameters: - <T> – the common value type
See Also: Returns: a Flowable that emits all of the items emitted by the source Singles
/**
* Flattens four Singles into a single Flowable, without any transformation.
* <p>
* <img width="640" height="362" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.o4.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using
* the {@code merge} method.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If any of the source {@code SingleSource}s signal a {@code Throwable} via {@code onError}, the resulting
* {@code Flowable} terminates with that {@code Throwable} and all other source {@code SingleSource}s are disposed.
* If more than one {@code SingleSource} signals an error, the resulting {@code Flowable} may terminate with the
* first one's error or, depending on the concurrency of the sources, may terminate with a
* {@code CompositeException} containing two or more of the various error signals.
* {@code Throwable}s that didn't make into the composite will be sent (individually) to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. Similarly, {@code Throwable}s
* signaled by source(s) after the returned {@code Flowable} has been cancelled or terminated with a
* (composite) error will be sent to the same global error handler.
* Use {@link #mergeDelayError(SingleSource, SingleSource, SingleSource, SingleSource)} to merge sources and terminate only when all source {@code SingleSource}s
* have completed or failed with an error.
* </dd>
* </dl>
*
* @param <T> the common value type
* @param source1
* a SingleSource to be merged
* @param source2
* a SingleSource to be merged
* @param source3
* a SingleSource to be merged
* @param source4
* a SingleSource to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #mergeDelayError(SingleSource, SingleSource, SingleSource, SingleSource)
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T> Flowable<T> merge(
SingleSource<? extends T> source1, SingleSource<? extends T> source2,
SingleSource<? extends T> source3, SingleSource<? extends T> source4
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
return merge(Flowable.fromArray(source1, source2, source3, source4));
}
Merges an Iterable sequence of SingleSource instances into a single Flowable sequence,
running all SingleSources at once and delaying any error(s) until all sources succeed or fail.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
mergeDelayError
does not operate by default on a particular Scheduler
.
History: 2.1.9 - experimental
Params: - sources – the Iterable sequence of SingleSource sources
Type parameters: - <T> – the common and resulting value type
See Also: Returns: the new Flowable instance Since: 2.2
/**
* Merges an Iterable sequence of SingleSource instances into a single Flowable sequence,
* running all SingleSources at once and delaying any error(s) until all sources succeed or fail.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.9 - experimental
* @param <T> the common and resulting value type
* @param sources the Iterable sequence of SingleSource sources
* @return the new Flowable instance
* @see #merge(Iterable)
* @since 2.2
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> mergeDelayError(Iterable<? extends SingleSource<? extends T>> sources) {
return mergeDelayError(Flowable.fromIterable(sources));
}
Merges a Flowable sequence of SingleSource instances into a single Flowable sequence,
running all SingleSources at once and delaying any error(s) until all sources succeed or fail.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
mergeDelayError
does not operate by default on a particular Scheduler
.
History: 2.1.9 - experimental
Params: - sources – the Flowable sequence of SingleSource sources
Type parameters: - <T> – the common and resulting value type
See Also: Returns: the new Flowable instance Since: 2.2
/**
* Merges a Flowable sequence of SingleSource instances into a single Flowable sequence,
* running all SingleSources at once and delaying any error(s) until all sources succeed or fail.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.9 - experimental
* @param <T> the common and resulting value type
* @param sources the Flowable sequence of SingleSource sources
* @return the new Flowable instance
* @see #merge(Publisher)
* @since 2.2
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Flowable<T> mergeDelayError(Publisher<? extends SingleSource<? extends T>> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, SingleInternalHelper.toFlowable(), true, Integer.MAX_VALUE, Flowable.bufferSize()));
}
Flattens two Singles into a single Flowable, without any transformation, delaying
any error(s) until all sources succeed or fail.
You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using the mergeDelayError
method.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
mergeDelayError
does not operate by default on a particular Scheduler
.
History: 2.1.9 - experimental
Params: - source1 –
a SingleSource to be merged
- source2 –
a SingleSource to be merged
Type parameters: - <T> – the common value type
See Also: Returns: a Flowable that emits all of the items emitted by the source Singles Since: 2.2
/**
* Flattens two Singles into a single Flowable, without any transformation, delaying
* any error(s) until all sources succeed or fail.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by
* using the {@code mergeDelayError} method.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.9 - experimental
* @param <T> the common value type
* @param source1
* a SingleSource to be merged
* @param source2
* a SingleSource to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #merge(SingleSource, SingleSource)
* @since 2.2
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T> Flowable<T> mergeDelayError(
SingleSource<? extends T> source1, SingleSource<? extends T> source2
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
return mergeDelayError(Flowable.fromArray(source1, source2));
}
Flattens three Singles into a single Flowable, without any transformation, delaying
any error(s) until all sources succeed or fail.
You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using the mergeDelayError
method.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
mergeDelayError
does not operate by default on a particular Scheduler
.
History: 2.1.9 - experimental
Params: - source1 –
a SingleSource to be merged
- source2 –
a SingleSource to be merged
- source3 –
a SingleSource to be merged
Type parameters: - <T> – the common value type
See Also: Returns: a Flowable that emits all of the items emitted by the source Singles Since: 2.2
/**
* Flattens three Singles into a single Flowable, without any transformation, delaying
* any error(s) until all sources succeed or fail.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using
* the {@code mergeDelayError} method.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.9 - experimental
* @param <T> the common value type
* @param source1
* a SingleSource to be merged
* @param source2
* a SingleSource to be merged
* @param source3
* a SingleSource to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #merge(SingleSource, SingleSource, SingleSource)
* @since 2.2
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T> Flowable<T> mergeDelayError(
SingleSource<? extends T> source1, SingleSource<? extends T> source2,
SingleSource<? extends T> source3
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
return mergeDelayError(Flowable.fromArray(source1, source2, source3));
}
Flattens four Singles into a single Flowable, without any transformation, delaying
any error(s) until all sources succeed or fail.
You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using the mergeDelayError
method.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
mergeDelayError
does not operate by default on a particular Scheduler
.
History: 2.1.9 - experimental
Params: - source1 –
a SingleSource to be merged
- source2 –
a SingleSource to be merged
- source3 –
a SingleSource to be merged
- source4 –
a SingleSource to be merged
Type parameters: - <T> – the common value type
See Also: Returns: a Flowable that emits all of the items emitted by the source Singles Since: 2.2
/**
* Flattens four Singles into a single Flowable, without any transformation, delaying
* any error(s) until all sources succeed or fail.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using
* the {@code mergeDelayError} method.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.9 - experimental
* @param <T> the common value type
* @param source1
* a SingleSource to be merged
* @param source2
* a SingleSource to be merged
* @param source3
* a SingleSource to be merged
* @param source4
* a SingleSource to be merged
* @return a Flowable that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @see #merge(SingleSource, SingleSource, SingleSource, SingleSource)
* @since 2.2
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T> Flowable<T> mergeDelayError(
SingleSource<? extends T> source1, SingleSource<? extends T> source2,
SingleSource<? extends T> source3, SingleSource<? extends T> source4
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
return mergeDelayError(Flowable.fromArray(source1, source2, source3, source4));
}
Returns a singleton instance of a never-signalling Single (only calls onSubscribe).
- Scheduler:
never
does not operate by default on a particular Scheduler
.
Type parameters: - <T> – the target value type
Returns: the singleton never instance Since: 2.0
/**
* Returns a singleton instance of a never-signalling Single (only calls onSubscribe).
* <p>
* <img width="640" height="244" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.never.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code never} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the target value type
* @return the singleton never instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T> Single<T> never() {
return RxJavaPlugins.onAssembly((Single<T>) SingleNever.INSTANCE);
}
Signals success with 0L value after the given delay for each SingleObserver.
- Scheduler:
timer
operates by default on the computation
Scheduler
.
Params: - delay – the delay amount
- unit – the time unit of the delay
Returns: the new Single instance Since: 2.0
/**
* Signals success with 0L value after the given delay for each SingleObserver.
* <p>
* <img width="640" height="292" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.timer.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timer} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param delay the delay amount
* @param unit the time unit of the delay
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Single<Long> timer(long delay, TimeUnit unit) {
return timer(delay, unit, Schedulers.computation());
}
Signals success with 0L value after the given delay for each SingleObserver.
- Scheduler:
- you specify the
Scheduler
to signal on.
Params: - delay – the delay amount
- unit – the time unit of the delay
- scheduler – the scheduler where the single 0L will be emitted
Throws: - NullPointerException –
if unit is null, or
if scheduler is null
Returns: the new Single instance Since: 2.0
/**
* Signals success with 0L value after the given delay for each SingleObserver.
* <p>
* <img width="640" height="292" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.timer.s.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify the {@link Scheduler} to signal on.</dd>
* </dl>
* @param delay the delay amount
* @param unit the time unit of the delay
* @param scheduler the scheduler where the single 0L will be emitted
* @return the new Single instance
* @throws NullPointerException
* if unit is null, or
* if scheduler is null
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Single<Long> timer(final long delay, final TimeUnit unit, final Scheduler scheduler) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleTimer(delay, unit, scheduler));
}
Compares two SingleSources and emits true if they emit the same value (compared via Object.equals).
- Scheduler:
equals
does not operate by default on a particular Scheduler
.
Params: - first – the first SingleSource instance
- second – the second SingleSource instance
Type parameters: - <T> – the common value type
Returns: the new Single instance Since: 2.0
/**
* Compares two SingleSources and emits true if they emit the same value (compared via Object.equals).
* <p>
* <img width="640" height="465" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.equals.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code equals} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the common value type
* @param first the first SingleSource instance
* @param second the second SingleSource instance
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<Boolean> equals(final SingleSource<? extends T> first, final SingleSource<? extends T> second) { // NOPMD
ObjectHelper.requireNonNull(first, "first is null");
ObjectHelper.requireNonNull(second, "second is null");
return RxJavaPlugins.onAssembly(new SingleEquals<T>(first, second));
}
Advanced use only: creates a Single instance without
any safeguards by using a callback that is called with a SingleObserver.
- Scheduler:
unsafeCreate
does not operate by default on a particular Scheduler
.
Params: - onSubscribe – the function that is called with the subscribing SingleObserver
Type parameters: - <T> – the value type
Throws: - IllegalArgumentException – if
source
is a subclass of Single
; such instances don't need conversion and is possibly a port remnant from 1.x or one should use hide()
instead.
Returns: the new Single instance Since: 2.0
/**
* <strong>Advanced use only:</strong> creates a Single instance without
* any safeguards by using a callback that is called with a SingleObserver.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code unsafeCreate} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param onSubscribe the function that is called with the subscribing SingleObserver
* @return the new Single instance
* @throws IllegalArgumentException if {@code source} is a subclass of {@code Single}; such
* instances don't need conversion and is possibly a port remnant from 1.x or one should use {@link #hide()}
* instead.
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> unsafeCreate(SingleSource<T> onSubscribe) {
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
if (onSubscribe instanceof Single) {
throw new IllegalArgumentException("unsafeCreate(Single) should be upgraded");
}
return RxJavaPlugins.onAssembly(new SingleFromUnsafeSource<T>(onSubscribe));
}
Allows using and disposing a resource while running a SingleSource instance generated from
that resource (similar to a try-with-resources).
- Scheduler:
using
does not operate by default on a particular Scheduler
.
Params: - resourceSupplier – the Callable called for each SingleObserver to generate a resource Object
- singleFunction – the function called with the returned resource Object from
resourceSupplier
and should return a SingleSource instance to be run by the operator - disposer – the consumer of the generated resource that is called exactly once for
that particular resource when the generated SingleSource terminates
(successfully or with an error) or gets disposed.
Type parameters: Returns: the new Single instance Since: 2.0
/**
* Allows using and disposing a resource while running a SingleSource instance generated from
* that resource (similar to a try-with-resources).
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code using} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the SingleSource generated
* @param <U> the resource type
* @param resourceSupplier the Callable called for each SingleObserver to generate a resource Object
* @param singleFunction the function called with the returned resource
* Object from {@code resourceSupplier} and should return a SingleSource instance
* to be run by the operator
* @param disposer the consumer of the generated resource that is called exactly once for
* that particular resource when the generated SingleSource terminates
* (successfully or with an error) or gets disposed.
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, U> Single<T> using(Callable<U> resourceSupplier,
Function<? super U, ? extends SingleSource<? extends T>> singleFunction,
Consumer<? super U> disposer) {
return using(resourceSupplier, singleFunction, disposer, true);
}
Allows using and disposing a resource while running a SingleSource instance generated from
that resource (similar to a try-with-resources).
- Scheduler:
using
does not operate by default on a particular Scheduler
.
Params: - resourceSupplier – the Callable called for each SingleObserver to generate a resource Object
- singleFunction – the function called with the returned resource Object from
resourceSupplier
and should return a SingleSource instance to be run by the operator - disposer – the consumer of the generated resource that is called exactly once for
that particular resource when the generated SingleSource terminates
(successfully or with an error) or gets disposed.
- eager –
if true, the disposer is called before the terminal event is signalled
if false, the disposer is called after the terminal event is delivered to downstream
Type parameters: Returns: the new Single instance Since: 2.0
/**
* Allows using and disposing a resource while running a SingleSource instance generated from
* that resource (similar to a try-with-resources).
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code using} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the SingleSource generated
* @param <U> the resource type
* @param resourceSupplier the Callable called for each SingleObserver to generate a resource Object
* @param singleFunction the function called with the returned resource
* Object from {@code resourceSupplier} and should return a SingleSource instance
* to be run by the operator
* @param disposer the consumer of the generated resource that is called exactly once for
* that particular resource when the generated SingleSource terminates
* (successfully or with an error) or gets disposed.
* @param eager
* if true, the disposer is called before the terminal event is signalled
* if false, the disposer is called after the terminal event is delivered to downstream
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, U> Single<T> using(
final Callable<U> resourceSupplier,
final Function<? super U, ? extends SingleSource<? extends T>> singleFunction,
final Consumer<? super U> disposer,
final boolean eager) {
ObjectHelper.requireNonNull(resourceSupplier, "resourceSupplier is null");
ObjectHelper.requireNonNull(singleFunction, "singleFunction is null");
ObjectHelper.requireNonNull(disposer, "disposer is null");
return RxJavaPlugins.onAssembly(new SingleUsing<T, U>(resourceSupplier, singleFunction, disposer, eager));
}
Wraps a SingleSource instance into a new Single instance if not already a Single
instance.
- Scheduler:
wrap
does not operate by default on a particular Scheduler
.
Params: - source – the source to wrap
Type parameters: - <T> – the value type
Returns: the Single wrapper or the source cast to Single (if possible)
/**
* Wraps a SingleSource instance into a new Single instance if not already a Single
* instance.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code wrap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param source the source to wrap
* @return the Single wrapper or the source cast to Single (if possible)
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Single<T> wrap(SingleSource<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
if (source instanceof Single) {
return RxJavaPlugins.onAssembly((Single<T>)source);
}
return RxJavaPlugins.onAssembly(new SingleFromUnsafeSource<T>(source));
}
Waits until all SingleSource sources provided by the Iterable sequence signal a success
value and calls a zipper function with an array of these values to return a result
to be emitted to downstream.
If the Iterable
of SingleSource
s is empty a NoSuchElementException
error is signalled after subscription.
Note on method signature: since Java doesn't allow creating a generic array with new T[]
, the implementation of this operator has to create an Object[]
instead. Unfortunately, a Function<Integer[], R>
passed to the method would trigger a ClassCastException
.
If any of the SingleSources signal an error, all other SingleSources get disposed and the
error emitted to downstream immediately.
- Scheduler:
zip
does not operate by default on a particular Scheduler
.
Params: - sources – the Iterable sequence of SingleSource instances. An empty sequence will result in an
onError
signal of NoSuchElementException
. - zipper – the function that receives an array with values from each SingleSource
and should return a value to be emitted to downstream
Type parameters: Returns: the new Single instance Since: 2.0
/**
* Waits until all SingleSource sources provided by the Iterable sequence signal a success
* value and calls a zipper function with an array of these values to return a result
* to be emitted to downstream.
* <p>
* If the {@code Iterable} of {@link SingleSource}s is empty a {@link NoSuchElementException} error is signalled after subscription.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.png" alt="">
* <p>
* If any of the SingleSources signal an error, all other SingleSources get disposed and the
* error emitted to downstream immediately.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the common value type
* @param <R> the result value type
* @param sources the Iterable sequence of SingleSource instances. An empty sequence will result in an
* {@code onError} signal of {@link NoSuchElementException}.
* @param zipper the function that receives an array with values from each SingleSource
* and should return a value to be emitted to downstream
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Single<R> zip(final Iterable<? extends SingleSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper) {
ObjectHelper.requireNonNull(zipper, "zipper is null");
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new SingleZipIterable<T, R>(sources, zipper));
}
Returns a Single that emits the results of a specified combiner function applied to two items emitted by
two other Singles.
- Scheduler:
zip
does not operate by default on a particular Scheduler
.
Params: - source1 –
the first source Single
- source2 –
a second source Single
- zipper –
a function that, when applied to the item emitted by each of the source Singles, results in an
item that will be emitted by the resulting Single
Type parameters: See Also: Returns: a Single that emits the zipped results
/**
* Returns a Single that emits the results of a specified combiner function applied to two items emitted by
* two other Singles.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.zip.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first source Single's value type
* @param <T2> the second source Single's value type
* @param <R> the result value type
* @param source1
* the first source Single
* @param source2
* a second source Single
* @param zipper
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T1, T2, R> Single<R> zip(
SingleSource<? extends T1> source1, SingleSource<? extends T2> source2,
BiFunction<? super T1, ? super T2, ? extends R> zipper
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
return zipArray(Functions.toFunction(zipper), source1, source2);
}
Returns a Single that emits the results of a specified combiner function applied to three items emitted
by three other Singles.
- Scheduler:
zip
does not operate by default on a particular Scheduler
.
Params: - source1 –
the first source Single
- source2 –
a second source Single
- source3 –
a third source Single
- zipper –
a function that, when applied to the item emitted by each of the source Singles, results in an
item that will be emitted by the resulting Single
Type parameters: See Also: Returns: a Single that emits the zipped results
/**
* Returns a Single that emits the results of a specified combiner function applied to three items emitted
* by three other Singles.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.zip.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first source Single's value type
* @param <T2> the second source Single's value type
* @param <T3> the third source Single's value type
* @param <R> the result value type
* @param source1
* the first source Single
* @param source2
* a second source Single
* @param source3
* a third source Single
* @param zipper
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T1, T2, T3, R> Single<R> zip(
SingleSource<? extends T1> source1, SingleSource<? extends T2> source2,
SingleSource<? extends T3> source3,
Function3<? super T1, ? super T2, ? super T3, ? extends R> zipper
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
return zipArray(Functions.toFunction(zipper), source1, source2, source3);
}
Returns a Single that emits the results of a specified combiner function applied to four items
emitted by four other Singles.
- Scheduler:
zip
does not operate by default on a particular Scheduler
.
Params: - source1 –
the first source Single
- source2 –
a second source Single
- source3 –
a third source Single
- source4 –
a fourth source Single
- zipper –
a function that, when applied to the item emitted by each of the source Singles, results in an
item that will be emitted by the resulting Single
Type parameters: See Also: Returns: a Single that emits the zipped results
/**
* Returns a Single that emits the results of a specified combiner function applied to four items
* emitted by four other Singles.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.zip.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first source Single's value type
* @param <T2> the second source Single's value type
* @param <T3> the third source Single's value type
* @param <T4> the fourth source Single's value type
* @param <R> the result value type
* @param source1
* the first source Single
* @param source2
* a second source Single
* @param source3
* a third source Single
* @param source4
* a fourth source Single
* @param zipper
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T1, T2, T3, T4, R> Single<R> zip(
SingleSource<? extends T1> source1, SingleSource<? extends T2> source2,
SingleSource<? extends T3> source3, SingleSource<? extends T4> source4,
Function4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipper
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
return zipArray(Functions.toFunction(zipper), source1, source2, source3, source4);
}
Returns a Single that emits the results of a specified combiner function applied to five items
emitted by five other Singles.
- Scheduler:
zip
does not operate by default on a particular Scheduler
.
Params: - source1 –
the first source Single
- source2 –
a second source Single
- source3 –
a third source Single
- source4 –
a fourth source Single
- source5 –
a fifth source Single
- zipper –
a function that, when applied to the item emitted by each of the source Singles, results in an
item that will be emitted by the resulting Single
Type parameters: See Also: Returns: a Single that emits the zipped results
/**
* Returns a Single that emits the results of a specified combiner function applied to five items
* emitted by five other Singles.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.zip.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first source Single's value type
* @param <T2> the second source Single's value type
* @param <T3> the third source Single's value type
* @param <T4> the fourth source Single's value type
* @param <T5> the fifth source Single's value type
* @param <R> the result value type
* @param source1
* the first source Single
* @param source2
* a second source Single
* @param source3
* a third source Single
* @param source4
* a fourth source Single
* @param source5
* a fifth source Single
* @param zipper
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T1, T2, T3, T4, T5, R> Single<R> zip(
SingleSource<? extends T1> source1, SingleSource<? extends T2> source2,
SingleSource<? extends T3> source3, SingleSource<? extends T4> source4,
SingleSource<? extends T5> source5,
Function5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> zipper
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
ObjectHelper.requireNonNull(source5, "source5 is null");
return zipArray(Functions.toFunction(zipper), source1, source2, source3, source4, source5);
}
Returns a Single that emits the results of a specified combiner function applied to six items
emitted by six other Singles.
- Scheduler:
zip
does not operate by default on a particular Scheduler
.
Params: - source1 –
the first source Single
- source2 –
a second source Single
- source3 –
a third source Single
- source4 –
a fourth source Single
- source5 –
a fifth source Single
- source6 –
a sixth source Single
- zipper –
a function that, when applied to the item emitted by each of the source Singles, results in an
item that will be emitted by the resulting Single
Type parameters: See Also: Returns: a Single that emits the zipped results
/**
* Returns a Single that emits the results of a specified combiner function applied to six items
* emitted by six other Singles.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.zip.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first source Single's value type
* @param <T2> the second source Single's value type
* @param <T3> the third source Single's value type
* @param <T4> the fourth source Single's value type
* @param <T5> the fifth source Single's value type
* @param <T6> the sixth source Single's value type
* @param <R> the result value type
* @param source1
* the first source Single
* @param source2
* a second source Single
* @param source3
* a third source Single
* @param source4
* a fourth source Single
* @param source5
* a fifth source Single
* @param source6
* a sixth source Single
* @param zipper
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T1, T2, T3, T4, T5, T6, R> Single<R> zip(
SingleSource<? extends T1> source1, SingleSource<? extends T2> source2,
SingleSource<? extends T3> source3, SingleSource<? extends T4> source4,
SingleSource<? extends T5> source5, SingleSource<? extends T6> source6,
Function6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipper
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
ObjectHelper.requireNonNull(source5, "source5 is null");
ObjectHelper.requireNonNull(source6, "source6 is null");
return zipArray(Functions.toFunction(zipper), source1, source2, source3, source4, source5, source6);
}
Returns a Single that emits the results of a specified combiner function applied to seven items
emitted by seven other Singles.
- Scheduler:
zip
does not operate by default on a particular Scheduler
.
Params: - source1 –
the first source Single
- source2 –
a second source Single
- source3 –
a third source Single
- source4 –
a fourth source Single
- source5 –
a fifth source Single
- source6 –
a sixth source Single
- source7 –
a seventh source Single
- zipper –
a function that, when applied to the item emitted by each of the source Singles, results in an
item that will be emitted by the resulting Single
Type parameters: - <T1> – the first source Single's value type
- <T2> – the second source Single's value type
- <T3> – the third source Single's value type
- <T4> – the fourth source Single's value type
- <T5> – the fifth source Single's value type
- <T6> – the sixth source Single's value type
- <T7> – the seventh source Single's value type
- <R> – the result value type
See Also: Returns: a Single that emits the zipped results
/**
* Returns a Single that emits the results of a specified combiner function applied to seven items
* emitted by seven other Singles.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.zip.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first source Single's value type
* @param <T2> the second source Single's value type
* @param <T3> the third source Single's value type
* @param <T4> the fourth source Single's value type
* @param <T5> the fifth source Single's value type
* @param <T6> the sixth source Single's value type
* @param <T7> the seventh source Single's value type
* @param <R> the result value type
* @param source1
* the first source Single
* @param source2
* a second source Single
* @param source3
* a third source Single
* @param source4
* a fourth source Single
* @param source5
* a fifth source Single
* @param source6
* a sixth source Single
* @param source7
* a seventh source Single
* @param zipper
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T1, T2, T3, T4, T5, T6, T7, R> Single<R> zip(
SingleSource<? extends T1> source1, SingleSource<? extends T2> source2,
SingleSource<? extends T3> source3, SingleSource<? extends T4> source4,
SingleSource<? extends T5> source5, SingleSource<? extends T6> source6,
SingleSource<? extends T7> source7,
Function7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> zipper
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
ObjectHelper.requireNonNull(source5, "source5 is null");
ObjectHelper.requireNonNull(source6, "source6 is null");
ObjectHelper.requireNonNull(source7, "source7 is null");
return zipArray(Functions.toFunction(zipper), source1, source2, source3, source4, source5, source6, source7);
}
Returns a Single that emits the results of a specified combiner function applied to eight items
emitted by eight other Singles.
- Scheduler:
zip
does not operate by default on a particular Scheduler
.
Params: - source1 –
the first source Single
- source2 –
a second source Single
- source3 –
a third source Single
- source4 –
a fourth source Single
- source5 –
a fifth source Single
- source6 –
a sixth source Single
- source7 –
a seventh source Single
- source8 –
an eighth source Single
- zipper –
a function that, when applied to the item emitted by each of the source Singles, results in an
item that will be emitted by the resulting Single
Type parameters: - <T1> – the first source Single's value type
- <T2> – the second source Single's value type
- <T3> – the third source Single's value type
- <T4> – the fourth source Single's value type
- <T5> – the fifth source Single's value type
- <T6> – the sixth source Single's value type
- <T7> – the seventh source Single's value type
- <T8> – the eighth source Single's value type
- <R> – the result value type
See Also: Returns: a Single that emits the zipped results
/**
* Returns a Single that emits the results of a specified combiner function applied to eight items
* emitted by eight other Singles.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.zip.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first source Single's value type
* @param <T2> the second source Single's value type
* @param <T3> the third source Single's value type
* @param <T4> the fourth source Single's value type
* @param <T5> the fifth source Single's value type
* @param <T6> the sixth source Single's value type
* @param <T7> the seventh source Single's value type
* @param <T8> the eighth source Single's value type
* @param <R> the result value type
* @param source1
* the first source Single
* @param source2
* a second source Single
* @param source3
* a third source Single
* @param source4
* a fourth source Single
* @param source5
* a fifth source Single
* @param source6
* a sixth source Single
* @param source7
* a seventh source Single
* @param source8
* an eighth source Single
* @param zipper
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Single<R> zip(
SingleSource<? extends T1> source1, SingleSource<? extends T2> source2,
SingleSource<? extends T3> source3, SingleSource<? extends T4> source4,
SingleSource<? extends T5> source5, SingleSource<? extends T6> source6,
SingleSource<? extends T7> source7, SingleSource<? extends T8> source8,
Function8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> zipper
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
ObjectHelper.requireNonNull(source5, "source5 is null");
ObjectHelper.requireNonNull(source6, "source6 is null");
ObjectHelper.requireNonNull(source7, "source7 is null");
ObjectHelper.requireNonNull(source8, "source8 is null");
return zipArray(Functions.toFunction(zipper), source1, source2, source3, source4, source5, source6, source7, source8);
}
Returns a Single that emits the results of a specified combiner function applied to nine items
emitted by nine other Singles.
- Scheduler:
zip
does not operate by default on a particular Scheduler
.
Params: - source1 –
the first source Single
- source2 –
a second source Single
- source3 –
a third source Single
- source4 –
a fourth source Single
- source5 –
a fifth source Single
- source6 –
a sixth source Single
- source7 –
a seventh source Single
- source8 –
an eighth source Single
- source9 –
a ninth source Single
- zipper –
a function that, when applied to the item emitted by each of the source Singles, results in an
item that will be emitted by the resulting Single
Type parameters: - <T1> – the first source Single's value type
- <T2> – the second source Single's value type
- <T3> – the third source Single's value type
- <T4> – the fourth source Single's value type
- <T5> – the fifth source Single's value type
- <T6> – the sixth source Single's value type
- <T7> – the seventh source Single's value type
- <T8> – the eighth source Single's value type
- <T9> – the ninth source Single's value type
- <R> – the result value type
See Also: Returns: a Single that emits the zipped results
/**
* Returns a Single that emits the results of a specified combiner function applied to nine items
* emitted by nine other Singles.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.zip.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T1> the first source Single's value type
* @param <T2> the second source Single's value type
* @param <T3> the third source Single's value type
* @param <T4> the fourth source Single's value type
* @param <T5> the fifth source Single's value type
* @param <T6> the sixth source Single's value type
* @param <T7> the seventh source Single's value type
* @param <T8> the eighth source Single's value type
* @param <T9> the ninth source Single's value type
* @param <R> the result value type
* @param source1
* the first source Single
* @param source2
* a second source Single
* @param source3
* a third source Single
* @param source4
* a fourth source Single
* @param source5
* a fifth source Single
* @param source6
* a sixth source Single
* @param source7
* a seventh source Single
* @param source8
* an eighth source Single
* @param source9
* a ninth source Single
* @param zipper
* a function that, when applied to the item emitted by each of the source Singles, results in an
* item that will be emitted by the resulting Single
* @return a Single that emits the zipped results
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Single<R> zip(
SingleSource<? extends T1> source1, SingleSource<? extends T2> source2,
SingleSource<? extends T3> source3, SingleSource<? extends T4> source4,
SingleSource<? extends T5> source5, SingleSource<? extends T6> source6,
SingleSource<? extends T7> source7, SingleSource<? extends T8> source8,
SingleSource<? extends T9> source9,
Function9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipper
) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
ObjectHelper.requireNonNull(source4, "source4 is null");
ObjectHelper.requireNonNull(source5, "source5 is null");
ObjectHelper.requireNonNull(source6, "source6 is null");
ObjectHelper.requireNonNull(source7, "source7 is null");
ObjectHelper.requireNonNull(source8, "source8 is null");
ObjectHelper.requireNonNull(source9, "source9 is null");
return zipArray(Functions.toFunction(zipper), source1, source2, source3, source4, source5, source6, source7, source8, source9);
}
Waits until all SingleSource sources provided via an array signal a success
value and calls a zipper function with an array of these values to return a result
to be emitted to downstream.
If the array of SingleSource
s is empty a NoSuchElementException
error is signalled immediately.
Note on method signature: since Java doesn't allow creating a generic array with new T[]
, the implementation of this operator has to create an Object[]
instead. Unfortunately, a Function<Integer[], R>
passed to the method would trigger a ClassCastException
.
If any of the SingleSources signal an error, all other SingleSources get disposed and the
error emitted to downstream immediately.
- Scheduler:
zipArray
does not operate by default on a particular Scheduler
.
Params: - sources – the array of SingleSource instances. An empty sequence will result in an
onError
signal of NoSuchElementException
. - zipper – the function that receives an array with values from each SingleSource
and should return a value to be emitted to downstream
Type parameters: Returns: the new Single instance Since: 2.0
/**
* Waits until all SingleSource sources provided via an array signal a success
* value and calls a zipper function with an array of these values to return a result
* to be emitted to downstream.
* <p>
* If the array of {@link SingleSource}s is empty a {@link NoSuchElementException} error is signalled immediately.
* <p>
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
*
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.png" alt="">
* <p>
* If any of the SingleSources signal an error, all other SingleSources get disposed and the
* error emitted to downstream immediately.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code zipArray} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the common value type
* @param <R> the result value type
* @param sources the array of SingleSource instances. An empty sequence will result in an
* {@code onError} signal of {@link NoSuchElementException}.
* @param zipper the function that receives an array with values from each SingleSource
* and should return a value to be emitted to downstream
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, R> Single<R> zipArray(Function<? super Object[], ? extends R> zipper, SingleSource<? extends T>... sources) {
ObjectHelper.requireNonNull(zipper, "zipper is null");
ObjectHelper.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return error(new NoSuchElementException());
}
return RxJavaPlugins.onAssembly(new SingleZipArray<T, R>(sources, zipper));
}
Signals the event of this or the other SingleSource whichever signals first.
- Scheduler:
ambWith
does not operate by default on a particular Scheduler
.
Params: - other – the other SingleSource to race for the first emission of success or error
Returns: the new Single instance. A subscription to this provided source will occur after subscribing
to the current source. Since: 2.0
/**
* Signals the event of this or the other SingleSource whichever signals first.
* <p>
* <img width="640" height="463" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.ambWith.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code ambWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param other the other SingleSource to race for the first emission of success or error
* @return the new Single instance. A subscription to this provided source will occur after subscribing
* to the current source.
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public final Single<T> ambWith(SingleSource<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return ambArray(this, other);
}
Calls the specified converter function during assembly time and returns its resulting value.
This allows fluent conversion to any other type.
- Scheduler:
as
does not operate by default on a particular Scheduler
.
History: 2.1.7 - experimental
Params: - converter – the function that receives the current Single instance and returns a value
Type parameters: - <R> – the resulting object type
Throws: - NullPointerException – if converter is null
Returns: the converted value Since: 2.2
/**
* Calls the specified converter function during assembly time and returns its resulting value.
* <p>
* <img width="640" height="553" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.as.png" alt="">
* <p>
* This allows fluent conversion to any other type.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code as} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.7 - experimental
* @param <R> the resulting object type
* @param converter the function that receives the current Single instance and returns a value
* @return the converted value
* @throws NullPointerException if converter is null
* @since 2.2
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> R as(@NonNull SingleConverter<T, ? extends R> converter) {
return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
}
Hides the identity of the current Single, including the Disposable that is sent to the downstream via onSubscribe()
.
- Scheduler:
hide
does not operate by default on a particular Scheduler
.
Returns: the new Single instance Since: 2.0
/**
* Hides the identity of the current Single, including the Disposable that is sent
* to the downstream via {@code onSubscribe()}.
* <p>
* <img width="640" height="458" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.hide.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code hide} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> hide() {
return RxJavaPlugins.onAssembly(new SingleHide<T>(this));
}
Transform a Single by applying a particular Transformer function to it.
This method operates on the Single itself whereas lift
operates on the Single's SingleObservers.
If the operator you are creating is designed to act on the individual item emitted by a Single, use lift
. If your operator is designed to transform the source Single as a whole (for instance, by applying a particular set of existing RxJava operators to it) use compose
.
- Scheduler:
compose
does not operate by default on a particular Scheduler
.
Params: - transformer – the transformer function, not null
Type parameters: - <R> – the value type of the single returned by the transformer function
See Also: Returns: the source Single, transformed by the transformer function
/**
* Transform a Single by applying a particular Transformer function to it.
* <p>
* <img width="640" height="612" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.compose.png" alt="">
* <p>
* This method operates on the Single itself whereas {@link #lift} operates on the Single's SingleObservers.
* <p>
* If the operator you are creating is designed to act on the individual item emitted by a Single, use
* {@link #lift}. If your operator is designed to transform the source Single as a whole (for instance, by
* applying a particular set of existing RxJava operators to it) use {@code compose}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code compose} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the value type of the single returned by the transformer function
* @param transformer the transformer function, not null
* @return the source Single, transformed by the transformer function
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
*/
@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Single<R> compose(SingleTransformer<? super T, ? extends R> transformer) {
return wrap(((SingleTransformer<T, R>) ObjectHelper.requireNonNull(transformer, "transformer is null")).apply(this));
}
Stores the success value or exception from the current Single and replays it to late SingleObservers.
The returned Single subscribes to the current Single when the first SingleObserver subscribes.
- Scheduler:
cache
does not operate by default on a particular Scheduler
.
Returns: the new Single instance Since: 2.0
/**
* Stores the success value or exception from the current Single and replays it to late SingleObservers.
* <p>
* The returned Single subscribes to the current Single when the first SingleObserver subscribes.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code cache} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> cache() {
return RxJavaPlugins.onAssembly(new SingleCache<T>(this));
}
Casts the success value of the current Single into the target type or signals a
ClassCastException if not compatible.
- Scheduler:
cast
does not operate by default on a particular Scheduler
.
Params: - clazz – the type token to use for casting the success result from the current Single
Type parameters: - <U> – the target type
Returns: the new Single instance Since: 2.0
/**
* Casts the success value of the current Single into the target type or signals a
* ClassCastException if not compatible.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code cast} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <U> the target type
* @param clazz the type token to use for casting the success result from the current Single
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Single<U> cast(final Class<? extends U> clazz) {
ObjectHelper.requireNonNull(clazz, "clazz is null");
return map(Functions.castFunction(clazz));
}
Returns a Flowable that emits the item emitted by the source Single, then the item emitted by the
specified Single.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
concatWith
does not operate by default on a particular Scheduler
.
Params: - other –
a Single to be concatenated after the current
See Also: Returns: a Flowable that emits the item emitted by the source Single, followed by the item emitted by t1
/**
* Returns a Flowable that emits the item emitted by the source Single, then the item emitted by the
* specified Single.
* <p>
* <img width="640" height="335" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatWith.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other
* a Single to be concatenated after the current
* @return a Flowable that emits the item emitted by the source Single, followed by the item emitted by
* {@code t1}
* @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> concatWith(SingleSource<? extends T> other) {
return concat(this, other);
}
Delays the emission of the success signal from the current Single by the specified amount.
An error signal will not be delayed.
- Scheduler:
delay
operates by default on the computation
Scheduler
.
Params: - time – the amount of time the success signal should be delayed for
- unit – the time unit
Returns: the new Single instance Since: 2.0
/**
* Delays the emission of the success signal from the current Single by the specified amount.
* An error signal will not be delayed.
* <p>
* <img width="640" height="457" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.delay.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code delay} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param time the amount of time the success signal should be delayed for
* @param unit the time unit
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Single<T> delay(long time, TimeUnit unit) {
return delay(time, unit, Schedulers.computation(), false);
}
Delays the emission of the success or error signal from the current Single by the specified amount.
- Scheduler:
delay
operates by default on the computation
Scheduler
.
History: 2.1.5 - experimental
Params: - time – the amount of time the success or error signal should be delayed for
- unit – the time unit
- delayError – if true, both success and error signals are delayed. if false, only success signals are delayed.
Returns: the new Single instance Since: 2.2
/**
* Delays the emission of the success or error signal from the current Single by the specified amount.
* <p>
* <img width="640" height="457" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.delay.e.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code delay} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.5 - experimental
* @param time the amount of time the success or error signal should be delayed for
* @param unit the time unit
* @param delayError if true, both success and error signals are delayed. if false, only success signals are delayed.
* @return the new Single instance
* @since 2.2
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Single<T> delay(long time, TimeUnit unit, boolean delayError) {
return delay(time, unit, Schedulers.computation(), delayError);
}
Delays the emission of the success signal from the current Single by the specified amount.
An error signal will not be delayed.
- Scheduler:
- you specify the
Scheduler
where the non-blocking wait and emission happens
Params: - time – the amount of time the success signal should be delayed for
- unit – the time unit
- scheduler – the target scheduler to use for the non-blocking wait and emission
Throws: - NullPointerException –
if unit is null, or
if scheduler is null
Returns: the new Single instance Since: 2.0
/**
* Delays the emission of the success signal from the current Single by the specified amount.
* An error signal will not be delayed.
* <p>
* <img width="640" height="457" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.delay.s.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify the {@link Scheduler} where the non-blocking wait and emission happens</dd>
* </dl>
*
* @param time the amount of time the success signal should be delayed for
* @param unit the time unit
* @param scheduler the target scheduler to use for the non-blocking wait and emission
* @return the new Single instance
* @throws NullPointerException
* if unit is null, or
* if scheduler is null
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Single<T> delay(final long time, final TimeUnit unit, final Scheduler scheduler) {
return delay(time, unit, scheduler, false);
}
Delays the emission of the success or error signal from the current Single by the specified amount.
- Scheduler:
- you specify the
Scheduler
where the non-blocking wait and emission happens
History: 2.1.5 - experimental
Params: - time – the amount of time the success or error signal should be delayed for
- unit – the time unit
- scheduler – the target scheduler to use for the non-blocking wait and emission
- delayError – if true, both success and error signals are delayed. if false, only success signals are delayed.
Throws: - NullPointerException –
if unit is null, or
if scheduler is null
Returns: the new Single instance Since: 2.2
/**
* Delays the emission of the success or error signal from the current Single by the specified amount.
* <p>
* <img width="640" height="457" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.delay.se.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify the {@link Scheduler} where the non-blocking wait and emission happens</dd>
* </dl>
* <p>History: 2.1.5 - experimental
* @param time the amount of time the success or error signal should be delayed for
* @param unit the time unit
* @param scheduler the target scheduler to use for the non-blocking wait and emission
* @param delayError if true, both success and error signals are delayed. if false, only success signals are delayed.
* @return the new Single instance
* @throws NullPointerException
* if unit is null, or
* if scheduler is null
* @since 2.2
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Single<T> delay(final long time, final TimeUnit unit, final Scheduler scheduler, boolean delayError) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleDelay<T>(this, time, unit, scheduler, delayError));
}
Delays the actual subscription to the current Single until the given other CompletableSource
completes.
If the delaying source signals an error, that error is re-emitted and no subscription
to the current Single happens.
- Scheduler:
delaySubscription
does not operate by default on a particular Scheduler
.
Params: - other – the CompletableSource that has to complete before the subscription to the
current Single happens
Returns: the new Single instance Since: 2.0
/**
* Delays the actual subscription to the current Single until the given other CompletableSource
* completes.
* <p>If the delaying source signals an error, that error is re-emitted and no subscription
* to the current Single happens.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code delaySubscription} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param other the CompletableSource that has to complete before the subscription to the
* current Single happens
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> delaySubscription(CompletableSource other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new SingleDelayWithCompletable<T>(this, other));
}
Delays the actual subscription to the current Single until the given other SingleSource
signals success.
If the delaying source signals an error, that error is re-emitted and no subscription
to the current Single happens.
- Scheduler:
delaySubscription
does not operate by default on a particular Scheduler
.
Params: - other – the SingleSource that has to complete before the subscription to the
current Single happens
Type parameters: - <U> – the element type of the other source
Returns: the new Single instance Since: 2.0
/**
* Delays the actual subscription to the current Single until the given other SingleSource
* signals success.
* <p>If the delaying source signals an error, that error is re-emitted and no subscription
* to the current Single happens.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code delaySubscription} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <U> the element type of the other source
* @param other the SingleSource that has to complete before the subscription to the
* current Single happens
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Single<T> delaySubscription(SingleSource<U> other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new SingleDelayWithSingle<T, U>(this, other));
}
Delays the actual subscription to the current Single until the given other ObservableSource
signals its first value or completes.
If the delaying source signals an error, that error is re-emitted and no subscription
to the current Single happens.
- Scheduler:
delaySubscription
does not operate by default on a particular Scheduler
.
Params: - other – the ObservableSource that has to signal a value or complete before the
subscription to the current Single happens
Type parameters: - <U> – the element type of the other source
Returns: the new Single instance Since: 2.0
/**
* Delays the actual subscription to the current Single until the given other ObservableSource
* signals its first value or completes.
* <p>If the delaying source signals an error, that error is re-emitted and no subscription
* to the current Single happens.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code delaySubscription} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <U> the element type of the other source
* @param other the ObservableSource that has to signal a value or complete before the
* subscription to the current Single happens
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Single<T> delaySubscription(ObservableSource<U> other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new SingleDelayWithObservable<T, U>(this, other));
}
Delays the actual subscription to the current Single until the given other Publisher
signals its first value or completes.
If the delaying source signals an error, that error is re-emitted and no subscription
to the current Single happens.
The other source is consumed in an unbounded manner (requesting Long.MAX_VALUE from it).
- Backpressure:
- The
other
publisher is consumed in an unbounded fashion but will be cancelled after the first item it produced.
- Scheduler:
delaySubscription
does not operate by default on a particular Scheduler
.
Params: - other – the Publisher that has to signal a value or complete before the
subscription to the current Single happens
Type parameters: - <U> – the element type of the other source
Returns: the new Single instance Since: 2.0
/**
* Delays the actual subscription to the current Single until the given other Publisher
* signals its first value or completes.
* <p>If the delaying source signals an error, that error is re-emitted and no subscription
* to the current Single happens.
* <p>The other source is consumed in an unbounded manner (requesting Long.MAX_VALUE from it).
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code other} publisher is consumed in an unbounded fashion but will be
* cancelled after the first item it produced.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code delaySubscription} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <U> the element type of the other source
* @param other the Publisher that has to signal a value or complete before the
* subscription to the current Single happens
* @return the new Single instance
* @since 2.0
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Single<T> delaySubscription(Publisher<U> other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new SingleDelayWithPublisher<T, U>(this, other));
}
Delays the actual subscription to the current Single until the given time delay elapsed.
- Scheduler:
delaySubscription
does by default subscribe to the current Single on the computation
Scheduler
after the delay.
Params: - time – the time amount to wait with the subscription
- unit – the time unit of the waiting
Returns: the new Single instance Since: 2.0
/**
* Delays the actual subscription to the current Single until the given time delay elapsed.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code delaySubscription} does by default subscribe to the current Single
* on the {@code computation} {@link Scheduler} after the delay.</dd>
* </dl>
* @param time the time amount to wait with the subscription
* @param unit the time unit of the waiting
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Single<T> delaySubscription(long time, TimeUnit unit) {
return delaySubscription(time, unit, Schedulers.computation());
}
Delays the actual subscription to the current Single until the given time delay elapsed.
- Scheduler:
delaySubscription
does by default subscribe to the current Single on the Scheduler
you provided, after the delay.
Params: - time – the time amount to wait with the subscription
- unit – the time unit of the waiting
- scheduler – the scheduler to wait on and subscribe on to the current Single
Returns: the new Single instance Since: 2.0
/**
* Delays the actual subscription to the current Single until the given time delay elapsed.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code delaySubscription} does by default subscribe to the current Single
* on the {@link Scheduler} you provided, after the delay.</dd>
* </dl>
* @param time the time amount to wait with the subscription
* @param unit the time unit of the waiting
* @param scheduler the scheduler to wait on and subscribe on to the current Single
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Single<T> delaySubscription(long time, TimeUnit unit, Scheduler scheduler) {
return delaySubscription(Observable.timer(time, unit, scheduler));
}
Maps the Notification
success value of this Single back into normal onSuccess
, onError
or onComplete
signals as a Maybe
source. The intended use of the selector
function is to perform a type-safe identity mapping (see example) on a source that is already of type Notification<T>
. The Java language doesn't allow limiting instance methods to a certain generic argument shape, therefore, a function is used to ensure the conversion remains type safe.
- Scheduler:
dematerialize
does not operate by default on a particular Scheduler
.
Example:
Single.just(Notification.createOnNext(1))
.dematerialize(notification -> notification)
.test()
.assertResult(1);
Params: - selector – the function called with the success item and should return a
Notification
instance.
Type parameters: - <R> – the result type
See Also: Returns: the new Maybe instance Since: 2.2.4 - experimental
/**
* Maps the {@link Notification} success value of this Single back into normal
* {@code onSuccess}, {@code onError} or {@code onComplete} signals as a
* {@link Maybe} source.
* <p>
* The intended use of the {@code selector} function is to perform a
* type-safe identity mapping (see example) on a source that is already of type
* {@code Notification<T>}. The Java language doesn't allow
* limiting instance methods to a certain generic argument shape, therefore,
* a function is used to ensure the conversion remains type safe.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code dematerialize} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>
* Example:
* <pre><code>
* Single.just(Notification.createOnNext(1))
* .dematerialize(notification -> notification)
* .test()
* .assertResult(1);
* </code></pre>
* @param <R> the result type
* @param selector the function called with the success item and should
* return a {@link Notification} instance.
* @return the new Maybe instance
* @since 2.2.4 - experimental
* @see #materialize()
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final <R> Maybe<R> dematerialize(Function<? super T, Notification<R>> selector) {
ObjectHelper.requireNonNull(selector, "selector is null");
return RxJavaPlugins.onAssembly(new SingleDematerialize<T, R>(this, selector));
}
Calls the specified consumer with the success item after this item has been emitted to the downstream.
Note that the doAfterSuccess
action is shared between subscriptions and as such should be thread-safe.
- Scheduler:
doAfterSuccess
does not operate by default on a particular Scheduler
.
History: 2.0.1 - experimental
Params: - onAfterSuccess – the Consumer that will be called after emitting an item from upstream to the downstream
Returns: the new Single instance Since: 2.1
/**
* Calls the specified consumer with the success item after this item has been emitted to the downstream.
* <p>
* <img width="640" height="460" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.doAfterSuccess.png" alt="">
* <p>
* Note that the {@code doAfterSuccess} action is shared between subscriptions and as such
* should be thread-safe.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doAfterSuccess} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.0.1 - experimental
* @param onAfterSuccess the Consumer that will be called after emitting an item from upstream to the downstream
* @return the new Single instance
* @since 2.1
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> doAfterSuccess(Consumer<? super T> onAfterSuccess) {
ObjectHelper.requireNonNull(onAfterSuccess, "onAfterSuccess is null");
return RxJavaPlugins.onAssembly(new SingleDoAfterSuccess<T>(this, onAfterSuccess));
}
Registers an Action
to be called after this Single invokes either onSuccess or onError.
Note that the doAfterTerminate
action is shared between subscriptions and as such should be thread-safe.
- Scheduler:
doAfterTerminate
does not operate by default on a particular Scheduler
.
History: 2.0.6 - experimental
Params: - onAfterTerminate – an
Action
to be invoked when the source Single finishes
See Also: Returns: a Single that emits the same items as the source Single, then invokes the Action
Since: 2.1
/**
* Registers an {@link Action} to be called after this Single invokes either onSuccess or onError.
* <p>
* <img width="640" height="460" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.doAfterTerminate.png" alt="">
* <p>
* Note that the {@code doAfterTerminate} action is shared between subscriptions and as such
* should be thread-safe.</p>
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doAfterTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* <p>History: 2.0.6 - experimental
* @param onAfterTerminate
* an {@link Action} to be invoked when the source Single finishes
* @return a Single that emits the same items as the source Single, then invokes the
* {@link Action}
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @since 2.1
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> doAfterTerminate(Action onAfterTerminate) {
ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
return RxJavaPlugins.onAssembly(new SingleDoAfterTerminate<T>(this, onAfterTerminate));
}
Calls the specified action after this Single signals onSuccess or onError or gets disposed by
the downstream.
In case of a race between a terminal event and a dispose call, the provided onFinally
action is executed once per subscription.
Note that the onFinally
action is shared between subscriptions and as such should be thread-safe.
- Scheduler:
doFinally
does not operate by default on a particular Scheduler
.
History: 2.0.1 - experimental
Params: - onFinally – the action called when this Single terminates or gets disposed
Returns: the new Single instance Since: 2.1
/**
* Calls the specified action after this Single signals onSuccess or onError or gets disposed by
* the downstream.
* <p>In case of a race between a terminal event and a dispose call, the provided {@code onFinally} action
* is executed once per subscription.
* <p>Note that the {@code onFinally} action is shared between subscriptions and as such
* should be thread-safe.
* <p>
* <img width="640" height="291" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.doFinally.png" alt="">
* </p>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doFinally} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.0.1 - experimental
* @param onFinally the action called when this Single terminates or gets disposed
* @return the new Single instance
* @since 2.1
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> doFinally(Action onFinally) {
ObjectHelper.requireNonNull(onFinally, "onFinally is null");
return RxJavaPlugins.onAssembly(new SingleDoFinally<T>(this, onFinally));
}
Calls the shared consumer with the Disposable sent through the onSubscribe for each
SingleObserver that subscribes to the current Single.
- Scheduler:
doOnSubscribe
does not operate by default on a particular Scheduler
.
Params: - onSubscribe – the consumer called with the Disposable sent via onSubscribe
Returns: the new Single instance Since: 2.0
/**
* Calls the shared consumer with the Disposable sent through the onSubscribe for each
* SingleObserver that subscribes to the current Single.
* <p>
* <img width="640" height="347" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.doOnSubscribe.png" alt="">
* </p>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onSubscribe the consumer called with the Disposable sent via onSubscribe
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> doOnSubscribe(final Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
return RxJavaPlugins.onAssembly(new SingleDoOnSubscribe<T>(this, onSubscribe));
}
Returns a Single instance that calls the given onTerminate callback
just before this Single completes normally or with an exception.
This differs from doAfterTerminate
in that this happens before the onSuccess
or onError
notification.
- Scheduler:
doOnTerminate
does not operate by default on a particular Scheduler
.
Params: - onTerminate – the action to invoke when the consumer calls
onSuccess
or onError
See Also: Returns: the new Single instance Since: 2.2.7 - experimental
/**
* Returns a Single instance that calls the given onTerminate callback
* just before this Single completes normally or with an exception.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.png" alt="">
* <p>
* This differs from {@code doAfterTerminate} in that this happens <em>before</em> the {@code onSuccess} or
* {@code onError} notification.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onTerminate the action to invoke when the consumer calls {@code onSuccess} or {@code onError}
* @return the new Single instance
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @see #doOnTerminate(Action)
* @since 2.2.7 - experimental
*/
@Experimental
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> doOnTerminate(final Action onTerminate) {
ObjectHelper.requireNonNull(onTerminate, "onTerminate is null");
return RxJavaPlugins.onAssembly(new SingleDoOnTerminate<T>(this, onTerminate));
}
Calls the shared consumer with the success value sent via onSuccess for each
SingleObserver that subscribes to the current Single.
- Scheduler:
doOnSuccess
does not operate by default on a particular Scheduler
.
Params: - onSuccess – the consumer called with the success value of onSuccess
Returns: the new Single instance Since: 2.0
/**
* Calls the shared consumer with the success value sent via onSuccess for each
* SingleObserver that subscribes to the current Single.
* <p>
* <img width="640" height="347" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.doOnSuccess.2.png" alt="">
* </p>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnSuccess} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onSuccess the consumer called with the success value of onSuccess
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> doOnSuccess(final Consumer<? super T> onSuccess) {
ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
return RxJavaPlugins.onAssembly(new SingleDoOnSuccess<T>(this, onSuccess));
}
Calls the shared consumer with the error sent via onError or the value
via onSuccess for each SingleObserver that subscribes to the current Single.
- Scheduler:
doOnEvent
does not operate by default on a particular Scheduler
.
Params: - onEvent – the consumer called with the success value of onEvent
Returns: the new Single instance Since: 2.0
/**
* Calls the shared consumer with the error sent via onError or the value
* via onSuccess for each SingleObserver that subscribes to the current Single.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnEvent} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onEvent the consumer called with the success value of onEvent
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> doOnEvent(final BiConsumer<? super T, ? super Throwable> onEvent) {
ObjectHelper.requireNonNull(onEvent, "onEvent is null");
return RxJavaPlugins.onAssembly(new SingleDoOnEvent<T>(this, onEvent));
}
Calls the shared consumer with the error sent via onError for each
SingleObserver that subscribes to the current Single.
- Scheduler:
doOnError
does not operate by default on a particular Scheduler
.
Params: - onError – the consumer called with the success value of onError
Returns: the new Single instance Since: 2.0
/**
* Calls the shared consumer with the error sent via onError for each
* SingleObserver that subscribes to the current Single.
* <p>
* <img width="640" height="349" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.doOnError.2.png" alt="">
* </p>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onError the consumer called with the success value of onError
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> doOnError(final Consumer<? super Throwable> onError) {
ObjectHelper.requireNonNull(onError, "onError is null");
return RxJavaPlugins.onAssembly(new SingleDoOnError<T>(this, onError));
}
Calls the shared Action
if a SingleObserver subscribed to the current Single disposes the common Disposable it received via onSubscribe.
- Scheduler:
doOnDispose
does not operate by default on a particular Scheduler
.
Params: - onDispose – the action called when the subscription is disposed
Throws: - NullPointerException – if onDispose is null
Returns: the new Single instance Since: 2.0
/**
* Calls the shared {@code Action} if a SingleObserver subscribed to the current Single
* disposes the common Disposable it received via onSubscribe.
* <p>
* <img width="640" height="332" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.doOnDispose.png" alt="">
* </p>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnDispose} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onDispose the action called when the subscription is disposed
* @return the new Single instance
* @throws NullPointerException if onDispose is null
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> doOnDispose(final Action onDispose) {
ObjectHelper.requireNonNull(onDispose, "onDispose is null");
return RxJavaPlugins.onAssembly(new SingleDoOnDispose<T>(this, onDispose));
}
Filters the success item of the Single via a predicate function and emitting it if the predicate
returns true, completing otherwise.
- Scheduler:
filter
does not operate by default on a particular Scheduler
.
Params: - predicate – a function that evaluates the item emitted by the source Maybe, returning
true
if it passes the filter
See Also: Returns: a Maybe that emit the item emitted by the source Maybe that the filter evaluates as true
/**
* Filters the success item of the Single via a predicate function and emitting it if the predicate
* returns true, completing otherwise.
* <p>
* <img width="640" height="457" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.filter.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code filter} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param predicate
* a function that evaluates the item emitted by the source Maybe, returning {@code true}
* if it passes the filter
* @return a Maybe that emit the item emitted by the source Maybe that the filter
* evaluates as {@code true}
* @see <a href="http://reactivex.io/documentation/operators/filter.html">ReactiveX operators documentation: Filter</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Maybe<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new MaybeFilterSingle<T>(this, predicate));
}
Returns a Single that is based on applying a specified function to the item emitted by the source Single,
where that function returns a SingleSource.
- Scheduler:
flatMap
does not operate by default on a particular Scheduler
.
Params: - mapper –
a function that, when applied to the item emitted by the source Single, returns a SingleSource
Type parameters: - <R> – the result value type
See Also: Returns: the Single returned from mapper
when applied to the item emitted by the source Single
/**
* Returns a Single that is based on applying a specified function to the item emitted by the source Single,
* where that function returns a SingleSource.
* <p>
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMap.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper
* a function that, when applied to the item emitted by the source Single, returns a SingleSource
* @return the Single returned from {@code mapper} when applied to the item emitted by the source Single
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Single<R> flatMap(Function<? super T, ? extends SingleSource<? extends R>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlatMap<T, R>(this, mapper));
}
Returns a Maybe that is based on applying a specified function to the item emitted by the source Single,
where that function returns a MaybeSource.
- Scheduler:
flatMapMaybe
does not operate by default on a particular Scheduler
.
Params: - mapper –
a function that, when applied to the item emitted by the source Single, returns a MaybeSource
Type parameters: - <R> – the result value type
See Also: Returns: the Maybe returned from mapper
when applied to the item emitted by the source Single
/**
* Returns a Maybe that is based on applying a specified function to the item emitted by the source Single,
* where that function returns a MaybeSource.
* <p>
* <img width="640" height="191" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMapMaybe.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper
* a function that, when applied to the item emitted by the source Single, returns a MaybeSource
* @return the Maybe returned from {@code mapper} when applied to the item emitted by the source Single
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Maybe<R> flatMapMaybe(final Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlatMapMaybe<T, R>(this, mapper));
}
Returns a Flowable that emits items based on applying a specified function to the item emitted by the
source Single, where that function returns a Publisher.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer and the Publisher
returned by the mapper function is expected to honor it as well.
- Scheduler:
flatMapPublisher
does not operate by default on a particular Scheduler
.
Params: - mapper –
a function that, when applied to the item emitted by the source Single, returns a
Flowable
Type parameters: - <R> – the result value type
See Also: Returns: the Flowable returned from func
when applied to the item emitted by the source Single
/**
* Returns a Flowable that emits items based on applying a specified function to the item emitted by the
* source Single, where that function returns a Publisher.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMapPublisher.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer
* and the {@code Publisher} returned by the mapper function is expected to honor it as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapPublisher} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper
* a function that, when applied to the item emitted by the source Single, returns a
* Flowable
* @return the Flowable returned from {@code func} when applied to the item emitted by the source Single
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publisher<? extends R>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlatMapPublisher<T, R>(this, mapper));
}
Maps the success value of the upstream Single
into an Iterable
and emits its items as a Flowable
sequence.
- Backpressure:
- The operator honors backpressure from downstream.
- Scheduler:
flattenAsFlowable
does not operate by default on a particular Scheduler
.
Params: - mapper –
a function that returns an Iterable sequence of values for when given an item emitted by the
source Single
Type parameters: - <U> –
the type of item emitted by the resulting Iterable
See Also: Returns: the new Flowable instance
/**
* Maps the success value of the upstream {@link Single} into an {@link Iterable} and emits its items as a
* {@link Flowable} sequence.
* <p>
* <img width="640" height="373" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flattenAsFlowable.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flattenAsFlowable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <U>
* the type of item emitted by the resulting Iterable
* @param mapper
* a function that returns an Iterable sequence of values for when given an item emitted by the
* source Single
* @return the new Flowable instance
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Flowable<U> flattenAsFlowable(final Function<? super T, ? extends Iterable<? extends U>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlatMapIterableFlowable<T, U>(this, mapper));
}
Maps the success value of the upstream Single
into an Iterable
and emits its items as an Observable
sequence.
- Scheduler:
flattenAsObservable
does not operate by default on a particular Scheduler
.
Params: - mapper –
a function that returns an Iterable sequence of values for when given an item emitted by the
source Single
Type parameters: - <U> –
the type of item emitted by the resulting Iterable
See Also: Returns: the new Observable instance
/**
* Maps the success value of the upstream {@link Single} into an {@link Iterable} and emits its items as an
* {@link Observable} sequence.
* <p>
* <img width="640" height="373" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flattenAsObservable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flattenAsObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <U>
* the type of item emitted by the resulting Iterable
* @param mapper
* a function that returns an Iterable sequence of values for when given an item emitted by the
* source Single
* @return the new Observable instance
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Observable<U> flattenAsObservable(final Function<? super T, ? extends Iterable<? extends U>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlatMapIterableObservable<T, U>(this, mapper));
}
Returns an Observable that is based on applying a specified function to the item emitted by the source Single,
where that function returns an ObservableSource.
- Scheduler:
flatMapObservable
does not operate by default on a particular Scheduler
.
Params: - mapper –
a function that, when applied to the item emitted by the source Single, returns an ObservableSource
Type parameters: - <R> – the result value type
See Also: Returns: the Observable returned from func
when applied to the item emitted by the source Single
/**
* Returns an Observable that is based on applying a specified function to the item emitted by the source Single,
* where that function returns an ObservableSource.
* <p>
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMapObservable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper
* a function that, when applied to the item emitted by the source Single, returns an ObservableSource
* @return the Observable returned from {@code func} when applied to the item emitted by the source Single
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMapObservable(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlatMapObservable<T, R>(this, mapper));
}
Returns a Completable
that completes based on applying a specified function to the item emitted by the source Single
, where that function returns a Completable
.
- Scheduler:
flatMapCompletable
does not operate by default on a particular Scheduler
.
Params: - mapper –
a function that, when applied to the item emitted by the source Single, returns a
Completable
See Also: Returns: the Completable returned from func
when applied to the item emitted by the source Single Since: 2.0
/**
* Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the
* source {@link Single}, where that function returns a {@link Completable}.
* <p>
* <img width="640" height="267" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMapCompletable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param mapper
* a function that, when applied to the item emitted by the source Single, returns a
* Completable
* @return the Completable returned from {@code func} when applied to the item emitted by the source Single
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable flatMapCompletable(final Function<? super T, ? extends CompletableSource> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlatMapCompletable<T>(this, mapper));
}
Waits in a blocking fashion until the current Single signals a success value (which is returned) or
an exception (which is propagated).
- Scheduler:
blockingGet
does not operate by default on a particular Scheduler
.
- Error handling:
- If the source signals an error, the operator wraps a checked
Exception
into RuntimeException
and throws that. Otherwise, RuntimeException
s and Error
s are rethrown as they are.
Returns: the success value
/**
* Waits in a blocking fashion until the current Single signals a success value (which is returned) or
* an exception (which is propagated).
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingGet} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the source signals an error, the operator wraps a checked {@link Exception}
* into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
* {@link Error}s are rethrown as they are.</dd>
* </dl>
* @return the success value
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final T blockingGet() {
BlockingMultiObserver<T> observer = new BlockingMultiObserver<T>();
subscribe(observer);
return observer.blockingGet();
}
This method requires advanced knowledge about building operators, please consider
other standard composition methods first; Returns a Single
which, when subscribed to, invokes the apply(SingleObserver)
method of the provided SingleOperator
for each individual downstream Single
and allows the insertion of a custom operator by accessing the downstream's SingleObserver
during this subscription phase and providing a new SingleObserver
, containing the custom operator's intended business logic, that will be used in the subscription process going further upstream. Generally, such a new SingleObserver
will wrap the downstream's SingleObserver
and forwards the onSuccess
and onError
events from the upstream directly or according to the emission pattern the custom operator's business logic requires. In addition, such operator can intercept the flow control calls of dispose
and isDisposed
that would have traveled upstream and perform additional actions depending on the same business logic requirements.
Example:
// Step 1: Create the consumer type that will be returned by the SingleOperator.apply():
public final class CustomSingleObserver<T> implements SingleObserver<T>, Disposable {
// The downstream's SingleObserver that will receive the onXXX events
final SingleObserver<? super String> downstream;
// The connection to the upstream source that will call this class' onXXX methods
Disposable upstream;
// The constructor takes the downstream subscriber and usually any other parameters
public CustomSingleObserver(SingleObserver<? super String> downstream) {
this.downstream = downstream;
}
// In the subscription phase, the upstream sends a Disposable to this class
// and subsequently this class has to send a Disposable to the downstream.
// Note that relaying the upstream's Disposable directly is not allowed in RxJava
@Override
public void onSubscribe(Disposable d) {
if (upstream != null) {
d.dispose();
} else {
upstream = d;
downstream.onSubscribe(this);
}
}
// The upstream calls this with the next item and the implementation's
// responsibility is to emit an item to the downstream based on the intended
// business logic, or if it can't do so for the particular item,
// request more from the upstream
@Override
public void onSuccess(T item) {
String str = item.toString();
if (str.length() < 2) {
downstream.onSuccess(str);
} else {
// Single is usually expected to produce one of the onXXX events
downstream.onError(new NoSuchElementException());
}
}
// Some operators may handle the upstream's error while others
// could just forward it to the downstream.
@Override
public void onError(Throwable throwable) {
downstream.onError(throwable);
}
// Some operators may use their own resources which should be cleaned up if
// the downstream disposes the flow before it completed. Operators without
// resources can simply forward the dispose to the upstream.
// In some cases, a disposed flag may be set by this method so that other parts
// of this class may detect the dispose and stop sending events
// to the downstream.
@Override
public void dispose() {
upstream.dispose();
}
// Some operators may simply forward the call to the upstream while others
// can return the disposed flag set in dispose().
@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
}
// Step 2: Create a class that implements the SingleOperator interface and
// returns the custom consumer type from above in its apply() method.
// Such class may define additional parameters to be submitted to
// the custom consumer type.
final class CustomSingleOperator<T> implements SingleOperator<String> {
@Override
public SingleObserver<? super String> apply(SingleObserver<? super T> upstream) {
return new CustomSingleObserver<T>(upstream);
}
}
// Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
// or reusing an existing one.
Single.just(5)
.lift(new CustomSingleOperator<Integer>())
.test()
.assertResult("5");
Single.just(15)
.lift(new CustomSingleOperator<Integer>())
.test()
.assertFailure(NoSuchElementException.class);
Creating custom operators can be complicated and it is recommended one consults the
RxJava wiki: Writing operators page about
the tools, requirements, rules, considerations and pitfalls of implementing them.
Note that implementing custom operators via this lift()
method adds slightly more overhead by requiring an additional allocation and indirection per assembled flows. Instead, extending the abstract Single
class and creating a SingleTransformer
with it is recommended.
Note also that it is not possible to stop the subscription phase in lift()
as the apply()
method requires a non-null SingleObserver
instance to be returned, which is then unconditionally subscribed to the upstream Single
. For example, if the operator decided there is no reason to subscribe to the upstream source because of some optimization possibility or a failure to prepare the operator, it still has to return a SingleObserver
that should immediately dispose the upstream's Disposable
in its onSubscribe
method. Again, using a SingleTransformer
and extending the Single
is a better option as subscribeActual
can decide to not subscribe to its upstream after all.
- Scheduler:
lift
does not operate by default on a particular Scheduler
, however, the SingleOperator
may use a Scheduler
to support its own asynchronous behavior.
Params: - lift – the
SingleOperator
that receives the downstream's SingleObserver
and should return a SingleObserver
with custom behavior to be used as the consumer for the current Single
.
Type parameters: - <R> – the output value type
See Also: Returns: the new Single instance
/**
* <strong>This method requires advanced knowledge about building operators, please consider
* other standard composition methods first;</strong>
* Returns a {@code Single} which, when subscribed to, invokes the {@link SingleOperator#apply(SingleObserver) apply(SingleObserver)} method
* of the provided {@link SingleOperator} for each individual downstream {@link Single} and allows the
* insertion of a custom operator by accessing the downstream's {@link SingleObserver} during this subscription phase
* and providing a new {@code SingleObserver}, containing the custom operator's intended business logic, that will be
* used in the subscription process going further upstream.
* <p>
* Generally, such a new {@code SingleObserver} will wrap the downstream's {@code SingleObserver} and forwards the
* {@code onSuccess} and {@code onError} events from the upstream directly or according to the
* emission pattern the custom operator's business logic requires. In addition, such operator can intercept the
* flow control calls of {@code dispose} and {@code isDisposed} that would have traveled upstream and perform
* additional actions depending on the same business logic requirements.
* <p>
* Example:
* <pre><code>
* // Step 1: Create the consumer type that will be returned by the SingleOperator.apply():
*
* public final class CustomSingleObserver<T> implements SingleObserver<T>, Disposable {
*
* // The downstream's SingleObserver that will receive the onXXX events
* final SingleObserver<? super String> downstream;
*
* // The connection to the upstream source that will call this class' onXXX methods
* Disposable upstream;
*
* // The constructor takes the downstream subscriber and usually any other parameters
* public CustomSingleObserver(SingleObserver<? super String> downstream) {
* this.downstream = downstream;
* }
*
* // In the subscription phase, the upstream sends a Disposable to this class
* // and subsequently this class has to send a Disposable to the downstream.
* // Note that relaying the upstream's Disposable directly is not allowed in RxJava
* @Override
* public void onSubscribe(Disposable d) {
* if (upstream != null) {
* d.dispose();
* } else {
* upstream = d;
* downstream.onSubscribe(this);
* }
* }
*
* // The upstream calls this with the next item and the implementation's
* // responsibility is to emit an item to the downstream based on the intended
* // business logic, or if it can't do so for the particular item,
* // request more from the upstream
* @Override
* public void onSuccess(T item) {
* String str = item.toString();
* if (str.length() < 2) {
* downstream.onSuccess(str);
* } else {
* // Single is usually expected to produce one of the onXXX events
* downstream.onError(new NoSuchElementException());
* }
* }
*
* // Some operators may handle the upstream's error while others
* // could just forward it to the downstream.
* @Override
* public void onError(Throwable throwable) {
* downstream.onError(throwable);
* }
*
* // Some operators may use their own resources which should be cleaned up if
* // the downstream disposes the flow before it completed. Operators without
* // resources can simply forward the dispose to the upstream.
* // In some cases, a disposed flag may be set by this method so that other parts
* // of this class may detect the dispose and stop sending events
* // to the downstream.
* @Override
* public void dispose() {
* upstream.dispose();
* }
*
* // Some operators may simply forward the call to the upstream while others
* // can return the disposed flag set in dispose().
* @Override
* public boolean isDisposed() {
* return upstream.isDisposed();
* }
* }
*
* // Step 2: Create a class that implements the SingleOperator interface and
* // returns the custom consumer type from above in its apply() method.
* // Such class may define additional parameters to be submitted to
* // the custom consumer type.
*
* final class CustomSingleOperator<T> implements SingleOperator<String> {
* @Override
* public SingleObserver<? super String> apply(SingleObserver<? super T> upstream) {
* return new CustomSingleObserver<T>(upstream);
* }
* }
*
* // Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
* // or reusing an existing one.
*
* Single.just(5)
* .lift(new CustomSingleOperator<Integer>())
* .test()
* .assertResult("5");
*
* Single.just(15)
* .lift(new CustomSingleOperator<Integer>())
* .test()
* .assertFailure(NoSuchElementException.class);
* </code></pre>
* <p>
* Creating custom operators can be complicated and it is recommended one consults the
* <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a> page about
* the tools, requirements, rules, considerations and pitfalls of implementing them.
* <p>
* Note that implementing custom operators via this {@code lift()} method adds slightly more overhead by requiring
* an additional allocation and indirection per assembled flows. Instead, extending the abstract {@code Single}
* class and creating a {@link SingleTransformer} with it is recommended.
* <p>
* Note also that it is not possible to stop the subscription phase in {@code lift()} as the {@code apply()} method
* requires a non-null {@code SingleObserver} instance to be returned, which is then unconditionally subscribed to
* the upstream {@code Single}. For example, if the operator decided there is no reason to subscribe to the
* upstream source because of some optimization possibility or a failure to prepare the operator, it still has to
* return a {@code SingleObserver} that should immediately dispose the upstream's {@code Disposable} in its
* {@code onSubscribe} method. Again, using a {@code SingleTransformer} and extending the {@code Single} is
* a better option as {@link #subscribeActual} can decide to not subscribe to its upstream after all.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}, however, the
* {@link SingleOperator} may use a {@code Scheduler} to support its own asynchronous behavior.</dd>
* </dl>
*
* @param <R> the output value type
* @param lift the {@link SingleOperator} that receives the downstream's {@code SingleObserver} and should return
* a {@code SingleObserver} with custom behavior to be used as the consumer for the current
* {@code Single}.
* @return the new Single instance
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a>
* @see #compose(SingleTransformer)
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Single<R> lift(final SingleOperator<? extends R, ? super T> lift) {
ObjectHelper.requireNonNull(lift, "lift is null");
return RxJavaPlugins.onAssembly(new SingleLift<T, R>(this, lift));
}
Returns a Single that applies a specified function to the item emitted by the source Single and
emits the result of this function application.
- Scheduler:
map
does not operate by default on a particular Scheduler
.
Params: - mapper –
a function to apply to the item emitted by the Single
Type parameters: - <R> – the result value type
See Also: Returns: a Single that emits the item from the source Single, transformed by the specified function
/**
* Returns a Single that applies a specified function to the item emitted by the source Single and
* emits the result of this function application.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.map.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code map} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper
* a function to apply to the item emitted by the Single
* @return a Single that emits the item from the source Single, transformed by the specified function
* @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}
Maps the signal types of this Single into a Notification
of the same kind and emits it as a single success value to downstream.
- Scheduler:
materialize
does not operate by default on a particular Scheduler
.
See Also: Returns: the new Single instance Since: 2.2.4 - experimental
/**
* Maps the signal types of this Single into a {@link Notification} of the same kind
* and emits it as a single success value to downstream.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/materialize.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code materialize} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new Single instance
* @since 2.2.4 - experimental
* @see #dematerialize(Function)
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<Notification<T>> materialize() {
return RxJavaPlugins.onAssembly(new SingleMaterialize<T>(this));
}
Signals true if the current Single signals a success value that is Object-equals with the value
provided.
- Scheduler:
contains
does not operate by default on a particular Scheduler
.
Params: - value – the value to compare against the success value of this Single
Returns: the new Single instance Since: 2.0
/**
* Signals true if the current Single signals a success value that is Object-equals with the value
* provided.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code contains} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param value the value to compare against the success value of this Single
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<Boolean> contains(Object value) {
return contains(value, ObjectHelper.equalsPredicate());
}
Signals true if the current Single signals a success value that is equal with
the value provided by calling a bi-predicate.
- Scheduler:
contains
does not operate by default on a particular Scheduler
.
Params: - value – the value to compare against the success value of this Single
- comparer – the function that receives the success value of this Single, the value provided
and should return true if they are considered equal
Returns: the new Single instance Since: 2.0
/**
* Signals true if the current Single signals a success value that is equal with
* the value provided by calling a bi-predicate.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code contains} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param value the value to compare against the success value of this Single
* @param comparer the function that receives the success value of this Single, the value provided
* and should return true if they are considered equal
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<Boolean> contains(final Object value, final BiPredicate<Object, Object> comparer) {
ObjectHelper.requireNonNull(value, "value is null");
ObjectHelper.requireNonNull(comparer, "comparer is null");
return RxJavaPlugins.onAssembly(new SingleContains<T>(this, value, comparer));
}
Flattens this and another Single into a single Flowable, without any transformation.
You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using the mergeWith
method.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
mergeWith
does not operate by default on a particular Scheduler
.
Params: - other –
a SingleSource to be merged
See Also: Returns: that emits all of the items emitted by the source Singles
/**
* Flattens this and another Single into a single Flowable, without any transformation.
* <p>
* <img width="640" height="415" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.mergeWith.png" alt="">
* <p>
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using
* the {@code mergeWith} method.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other
* a SingleSource to be merged
* @return that emits all of the items emitted by the source Singles
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> mergeWith(SingleSource<? extends T> other) {
return merge(this, other);
}
Modifies a Single to emit its item (or notify of its error) on a specified Scheduler
, asynchronously.
- Scheduler:
- you specify which
Scheduler
this operator will use.
Params: - scheduler – the
Scheduler
to notify subscribers on
Throws: - NullPointerException – if scheduler is null
See Also: Returns: the source Single modified so that its subscribers are notified on the specified Scheduler
/**
* Modifies a Single to emit its item (or notify of its error) on a specified {@link Scheduler},
* asynchronously.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.observeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use.</dd>
* </dl>
*
* @param scheduler
* the {@link Scheduler} to notify subscribers on
* @return the source Single modified so that its subscribers are notified on the specified
* {@link Scheduler}
* @throws NullPointerException if scheduler is null
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Single<T> observeOn(final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleObserveOn<T>(this, scheduler));
}
Instructs a Single to emit an item (returned by a specified function) rather than invoking onError
if it encounters an error.
By default, when a Single encounters an error that prevents it from emitting the expected item to its subscriber, the Single invokes its subscriber's SingleObserver.onError
method, and then quits without invoking any more of its subscriber's methods. The onErrorReturn
method changes this behavior. If you pass a function (resumeFunction
) to a Single's onErrorReturn
method, if the original Single encounters an error, instead of invoking its subscriber's SingleObserver.onError
method, it will instead emit the return value of resumeFunction
.
You can use this to prevent errors from propagating or to supply fallback data should errors be
encountered.
- Scheduler:
onErrorReturn
does not operate by default on a particular Scheduler
.
Params: - resumeFunction –
a function that returns an item that the new Single will emit if the source Single encounters
an error
See Also: Returns: the original Single with appropriately modified behavior
/**
* Instructs a Single to emit an item (returned by a specified function) rather than invoking
* {@link SingleObserver#onError onError} if it encounters an error.
* <p>
* <img width="640" height="451" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.onErrorReturn.png" alt="">
* <p>
* By default, when a Single encounters an error that prevents it from emitting the expected item to its
* subscriber, the Single invokes its subscriber's {@link SingleObserver#onError} method, and then quits
* without invoking any more of its subscriber's methods. The {@code onErrorReturn} method changes this
* behavior. If you pass a function ({@code resumeFunction}) to a Single's {@code onErrorReturn} method, if
* the original Single encounters an error, instead of invoking its subscriber's
* {@link SingleObserver#onError} method, it will instead emit the return value of {@code resumeFunction}.
* <p>
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorReturn} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param resumeFunction
* a function that returns an item that the new Single will emit if the source Single encounters
* an error
* @return the original Single with appropriately modified behavior
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> onErrorReturn(final Function<Throwable, ? extends T> resumeFunction) {
ObjectHelper.requireNonNull(resumeFunction, "resumeFunction is null");
return RxJavaPlugins.onAssembly(new SingleOnErrorReturn<T>(this, resumeFunction, null));
}
Signals the specified value as success in case the current Single signals an error.
- Scheduler:
onErrorReturnItem
does not operate by default on a particular Scheduler
.
Params: - value – the value to signal if the current Single fails
Returns: the new Single instance Since: 2.0
/**
* Signals the specified value as success in case the current Single signals an error.
* <p>
* <img width="640" height="451" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.onErrorReturnItem.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorReturnItem} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param value the value to signal if the current Single fails
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> onErrorReturnItem(final T value) {
ObjectHelper.requireNonNull(value, "value is null");
return RxJavaPlugins.onAssembly(new SingleOnErrorReturn<T>(this, null, value));
}
Instructs a Single to pass control to another Single rather than invoking SingleObserver.onError(Throwable)
if it encounters an error.
By default, when a Single encounters an error that prevents it from emitting the expected item to its SingleObserver
, the Single invokes its SingleObserver's onError
method, and then quits without invoking any more of its SingleObserver's methods. The onErrorResumeNext
method changes this behavior. If you pass another Single (resumeSingleInCaseOfError
) to a Single's onErrorResumeNext
method, if the original Single encounters an error, instead of invoking its SingleObserver's onError
method, it will instead relinquish control to resumeSingleInCaseOfError
which will invoke the SingleObserver's onSuccess
method if it is able to do so. In such a case, because no Single necessarily invokes onError
, the SingleObserver may never know that an error happened.
You can use this to prevent errors from propagating or to supply fallback data should errors be
encountered.
- Scheduler:
onErrorResumeNext
does not operate by default on a particular Scheduler
.
Params: - resumeSingleInCaseOfError – a Single that will take control if source Single encounters an error.
See Also: Returns: the original Single, with appropriately modified behavior.
/**
* Instructs a Single to pass control to another Single rather than invoking
* {@link SingleObserver#onError(Throwable)} if it encounters an error.
* <p>
* <img width="640" height="451" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.onErrorResumeNext.png" alt="">
* <p>
* By default, when a Single encounters an error that prevents it from emitting the expected item to
* its {@link SingleObserver}, the Single invokes its SingleObserver's {@code onError} method, and then quits
* without invoking any more of its SingleObserver's methods. The {@code onErrorResumeNext} method changes this
* behavior. If you pass another Single ({@code resumeSingleInCaseOfError}) to a Single's
* {@code onErrorResumeNext} method, if the original Single encounters an error, instead of invoking its
* SingleObserver's {@code onError} method, it will instead relinquish control to {@code resumeSingleInCaseOfError} which
* will invoke the SingleObserver's {@link SingleObserver#onSuccess onSuccess} method if it is able to do so. In such a case,
* because no Single necessarily invokes {@code onError}, the SingleObserver may never know that an error
* happened.
* <p>
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorResumeNext} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param resumeSingleInCaseOfError a Single that will take control if source Single encounters an error.
* @return the original Single, with appropriately modified behavior.
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> onErrorResumeNext(final Single<? extends T> resumeSingleInCaseOfError) {
ObjectHelper.requireNonNull(resumeSingleInCaseOfError, "resumeSingleInCaseOfError is null");
return onErrorResumeNext(Functions.justFunction(resumeSingleInCaseOfError));
}
Instructs a Single to pass control to another Single rather than invoking SingleObserver.onError(Throwable)
if it encounters an error.
By default, when a Single encounters an error that prevents it from emitting the expected item to its SingleObserver
, the Single invokes its SingleObserver's onError
method, and then quits without invoking any more of its SingleObserver's methods. The onErrorResumeNext
method changes this behavior. If you pass a function that will return another Single (resumeFunctionInCaseOfError
) to a Single's onErrorResumeNext
method, if the original Single encounters an error, instead of invoking its SingleObserver's onError
method, it will instead relinquish control to resumeSingleInCaseOfError
which will invoke the SingleObserver's onSuccess
method if it is able to do so. In such a case, because no Single necessarily invokes onError
, the SingleObserver may never know that an error happened.
You can use this to prevent errors from propagating or to supply fallback data should errors be
encountered.
- Scheduler:
onErrorResumeNext
does not operate by default on a particular Scheduler
.
Params: - resumeFunctionInCaseOfError – a function that returns a Single that will take control if source Single encounters an error.
See Also: Returns: the original Single, with appropriately modified behavior. Since: .20
/**
* Instructs a Single to pass control to another Single rather than invoking
* {@link SingleObserver#onError(Throwable)} if it encounters an error.
* <p>
* <img width="640" height="451" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.onErrorResumeNext.f.png" alt="">
* <p>
* By default, when a Single encounters an error that prevents it from emitting the expected item to
* its {@link SingleObserver}, the Single invokes its SingleObserver's {@code onError} method, and then quits
* without invoking any more of its SingleObserver's methods. The {@code onErrorResumeNext} method changes this
* behavior. If you pass a function that will return another Single ({@code resumeFunctionInCaseOfError}) to a Single's
* {@code onErrorResumeNext} method, if the original Single encounters an error, instead of invoking its
* SingleObserver's {@code onError} method, it will instead relinquish control to {@code resumeSingleInCaseOfError} which
* will invoke the SingleObserver's {@link SingleObserver#onSuccess onSuccess} method if it is able to do so. In such a case,
* because no Single necessarily invokes {@code onError}, the SingleObserver may never know that an error
* happened.
* <p>
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorResumeNext} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param resumeFunctionInCaseOfError a function that returns a Single that will take control if source Single encounters an error.
* @return the original Single, with appropriately modified behavior.
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
* @since .20
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> onErrorResumeNext(
final Function<? super Throwable, ? extends SingleSource<? extends T>> resumeFunctionInCaseOfError) {
ObjectHelper.requireNonNull(resumeFunctionInCaseOfError, "resumeFunctionInCaseOfError is null");
return RxJavaPlugins.onAssembly(new SingleResumeNext<T>(this, resumeFunctionInCaseOfError));
}
Nulls out references to the upstream producer and downstream SingleObserver if
the sequence is terminated or downstream calls dispose().
- Scheduler:
onTerminateDetach
does not operate by default on a particular Scheduler
.
History: 2.1.5 - experimental
Returns: a Single which nulls out references to the upstream producer and downstream SingleObserver if
the sequence is terminated or downstream calls dispose() Since: 2.2
/**
* Nulls out references to the upstream producer and downstream SingleObserver if
* the sequence is terminated or downstream calls dispose().
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onTerminateDetach} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.5 - experimental
* @return a Single which nulls out references to the upstream producer and downstream SingleObserver if
* the sequence is terminated or downstream calls dispose()
* @since 2.2
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> onTerminateDetach() {
return RxJavaPlugins.onAssembly(new SingleDetach<T>(this));
}
Repeatedly re-subscribes to the current Single and emits each success value.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
repeat
does not operate by default on a particular Scheduler
.
Returns: the new Flowable instance Since: 2.0
/**
* Repeatedly re-subscribes to the current Single and emits each success value.
* <p>
* <img width="640" height="457" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.repeat.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new Flowable instance
* @since 2.0
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> repeat() {
return toFlowable().repeat();
}
Re-subscribes to the current Single at most the given number of times and emits each success value.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
repeat
does not operate by default on a particular Scheduler
.
Params: - times – the number of times to re-subscribe to the current Single
Returns: the new Flowable instance Since: 2.0
/**
* Re-subscribes to the current Single at most the given number of times and emits each success value.
* <p>
* <img width="640" height="457" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.repeat.n.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param times the number of times to re-subscribe to the current Single
* @return the new Flowable instance
* @since 2.0
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> repeat(long times) {
return toFlowable().repeat(times);
}
Re-subscribes to the current Single if
the Publisher returned by the handler function signals a value in response to a
value signalled through the Flowable the handle receives.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer. The Publisher
returned by the handler function is expected to honor backpressure as well.
- Scheduler:
repeatWhen
does not operate by default on a particular Scheduler
.
Params: - handler – the function that is called with a Flowable that signals a value when the Single
signalled a success value and returns a Publisher that has to signal a value to
trigger a resubscription to the current Single, otherwise the terminal signal of
the Publisher will be the terminal signal of the sequence as well.
Returns: the new Flowable instance Since: 2.0
/**
* Re-subscribes to the current Single if
* the Publisher returned by the handler function signals a value in response to a
* value signalled through the Flowable the handle receives.
* <p>
* <img width="640" height="1478" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.repeatWhen.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.
* The {@code Publisher} returned by the handler function is expected to honor backpressure as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeatWhen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param handler the function that is called with a Flowable that signals a value when the Single
* signalled a success value and returns a Publisher that has to signal a value to
* trigger a resubscription to the current Single, otherwise the terminal signal of
* the Publisher will be the terminal signal of the sequence as well.
* @return the new Flowable instance
* @since 2.0
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> repeatWhen(Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
return toFlowable().repeatWhen(handler);
}
Re-subscribes to the current Single until the given BooleanSupplier returns true.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
repeatUntil
does not operate by default on a particular Scheduler
.
Params: - stop – the BooleanSupplier called after the current Single succeeds and if returns false,
the Single is re-subscribed; otherwise the sequence completes.
Returns: the new Flowable instance Since: 2.0
/**
* Re-subscribes to the current Single until the given BooleanSupplier returns true.
* <p>
* <img width="640" height="463" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.repeatUntil.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeatUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param stop the BooleanSupplier called after the current Single succeeds and if returns false,
* the Single is re-subscribed; otherwise the sequence completes.
* @return the new Flowable instance
* @since 2.0
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> repeatUntil(BooleanSupplier stop) {
return toFlowable().repeatUntil(stop);
}
Repeatedly re-subscribes to the current Single indefinitely if it fails with an onError.
- Scheduler:
retry
does not operate by default on a particular Scheduler
.
Returns: the new Single instance Since: 2.0
/**
* Repeatedly re-subscribes to the current Single indefinitely if it fails with an onError.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retry() {
return toSingle(toFlowable().retry());
}
Repeatedly re-subscribe at most the specified times to the current Single
if it fails with an onError.
- Scheduler:
retry
does not operate by default on a particular Scheduler
.
Params: - times – the number of times to resubscribe if the current Single fails
Returns: the new Single instance Since: 2.0
/**
* Repeatedly re-subscribe at most the specified times to the current Single
* if it fails with an onError.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param times the number of times to resubscribe if the current Single fails
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retry(long times) {
return toSingle(toFlowable().retry(times));
}
Re-subscribe to the current Single if the given predicate returns true when the Single fails
with an onError.
- Scheduler:
retry
does not operate by default on a particular Scheduler
.
Params: - predicate – the predicate called with the resubscription count and the failure Throwable
and should return true if a resubscription should happen
Returns: the new Single instance Since: 2.0
/**
* Re-subscribe to the current Single if the given predicate returns true when the Single fails
* with an onError.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param predicate the predicate called with the resubscription count and the failure Throwable
* and should return true if a resubscription should happen
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate) {
return toSingle(toFlowable().retry(predicate));
}
Repeatedly re-subscribe at most times or until the predicate returns false, whichever happens first
if it fails with an onError.
- Scheduler:
retry
does not operate by default on a particular Scheduler
.
History: 2.1.8 - experimental
Params: - times – the number of times to resubscribe if the current Single fails
- predicate – the predicate called with the failure Throwable
and should return true if a resubscription should happen
Returns: the new Single instance Since: 2.2
/**
* Repeatedly re-subscribe at most times or until the predicate returns false, whichever happens first
* if it fails with an onError.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.8 - experimental
* @param times the number of times to resubscribe if the current Single fails
* @param predicate the predicate called with the failure Throwable
* and should return true if a resubscription should happen
* @return the new Single instance
* @since 2.2
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retry(long times, Predicate<? super Throwable> predicate) {
return toSingle(toFlowable().retry(times, predicate));
}
Re-subscribe to the current Single if the given predicate returns true when the Single fails
with an onError.
- Scheduler:
retry
does not operate by default on a particular Scheduler
.
Params: - predicate – the predicate called with the failure Throwable
and should return true if a resubscription should happen
Returns: the new Single instance Since: 2.0
/**
* Re-subscribe to the current Single if the given predicate returns true when the Single fails
* with an onError.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param predicate the predicate called with the failure Throwable
* and should return true if a resubscription should happen
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retry(Predicate<? super Throwable> predicate) {
return toSingle(toFlowable().retry(predicate));
}
Re-subscribes to the current Single if and when the Publisher returned by the handler
function signals a value.
If the Publisher signals an onComplete, the resulting Single will signal a NoSuchElementException.
Note that the inner Publisher
returned by the handler function should signal either onNext
, onError
or onComplete
in response to the received Throwable
to indicate the operator should retry or terminate. If the upstream to the operator is asynchronous, signalling onNext followed by onComplete immediately may result in the sequence to be completed immediately. Similarly, if this inner Publisher
signals onError
or onComplete
while the upstream is active, the sequence is terminated with the same signal immediately.
The following example demonstrates how to retry an asynchronous source with a delay:
Single.timer(1, TimeUnit.SECONDS)
.doOnSubscribe(s -> System.out.println("subscribing"))
.map(v -> { throw new RuntimeException(); })
.retryWhen(errors -> {
AtomicInteger counter = new AtomicInteger();
return errors
.takeWhile(e -> counter.getAndIncrement() != 3)
.flatMap(e -> {
System.out.println("delay retry by " + counter.get() + " second(s)");
return Flowable.timer(counter.get(), TimeUnit.SECONDS);
});
})
.blockingGet();
- Scheduler:
retryWhen
does not operate by default on a particular Scheduler
.
Params: - handler – the function that receives a Flowable of the error the Single emits and should
return a Publisher that should signal a normal value (in response to the
throwable the Flowable emits) to trigger a resubscription or signal an error to
be the output of the resulting Single
Returns: the new Single instance
/**
* Re-subscribes to the current Single if and when the Publisher returned by the handler
* function signals a value.
* <p>
* If the Publisher signals an onComplete, the resulting Single will signal a NoSuchElementException.
* <p>
* Note that the inner {@code Publisher} returned by the handler function should signal
* either {@code onNext}, {@code onError} or {@code onComplete} in response to the received
* {@code Throwable} to indicate the operator should retry or terminate. If the upstream to
* the operator is asynchronous, signalling onNext followed by onComplete immediately may
* result in the sequence to be completed immediately. Similarly, if this inner
* {@code Publisher} signals {@code onError} or {@code onComplete} while the upstream is
* active, the sequence is terminated with the same signal immediately.
* <p>
* The following example demonstrates how to retry an asynchronous source with a delay:
* <pre><code>
* Single.timer(1, TimeUnit.SECONDS)
* .doOnSubscribe(s -> System.out.println("subscribing"))
* .map(v -> { throw new RuntimeException(); })
* .retryWhen(errors -> {
* AtomicInteger counter = new AtomicInteger();
* return errors
* .takeWhile(e -> counter.getAndIncrement() != 3)
* .flatMap(e -> {
* System.out.println("delay retry by " + counter.get() + " second(s)");
* return Flowable.timer(counter.get(), TimeUnit.SECONDS);
* });
* })
* .blockingGet();
* </code></pre>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retryWhen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param handler the function that receives a Flowable of the error the Single emits and should
* return a Publisher that should signal a normal value (in response to the
* throwable the Flowable emits) to trigger a resubscription or signal an error to
* be the output of the resulting Single
* @return the new Single instance
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
return toSingle(toFlowable().retryWhen(handler));
}
Subscribes to a Single but ignore its emission or notification.
If the Single emits an error, it is wrapped into an OnErrorNotImplementedException
and routed to the RxJavaPlugins.onError handler.
- Scheduler:
subscribe
does not operate by default on a particular Scheduler
.
See Also: Returns: a Disposable
reference can request the Single
stop work.
/**
* Subscribes to a Single but ignore its emission or notification.
* <p>
* If the Single emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return a {@link Disposable} reference can request the {@link Single} stop work.
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING);
}
Subscribes to a Single and provides a composite callback to handle the item it emits
or any error notification it issues.
- Scheduler:
subscribe
does not operate by default on a particular Scheduler
.
Params: - onCallback –
the callback that receives either the success value or the failure Throwable
(whichever is not null)
Throws: - NullPointerException – if
onCallback
is null
See Also: Returns: a Disposable
reference can request the Single
stop work.
/**
* Subscribes to a Single and provides a composite callback to handle the item it emits
* or any error notification it issues.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onCallback
* the callback that receives either the success value or the failure Throwable
* (whichever is not null)
* @return a {@link Disposable} reference can request the {@link Single} stop work.
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
* @throws NullPointerException
* if {@code onCallback} is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable> onCallback) {
ObjectHelper.requireNonNull(onCallback, "onCallback is null");
BiConsumerSingleObserver<T> observer = new BiConsumerSingleObserver<T>(onCallback);
subscribe(observer);
return observer;
}
Subscribes to a Single and provides a callback to handle the item it emits.
If the Single emits an error, it is wrapped into an OnErrorNotImplementedException
and routed to the RxJavaPlugins.onError handler.
- Scheduler:
subscribe
does not operate by default on a particular Scheduler
.
Params: - onSuccess – the
Consumer<T>
you have designed to accept the emission from the Single
Throws: - NullPointerException – if
onSuccess
is null
See Also: Returns: a Disposable
reference can request the Single
stop work.
/**
* Subscribes to a Single and provides a callback to handle the item it emits.
* <p>
* If the Single emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onSuccess
* the {@code Consumer<T>} you have designed to accept the emission from the Single
* @return a {@link Disposable} reference can request the {@link Single} stop work.
* @throws NullPointerException
* if {@code onSuccess} is null
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onSuccess) {
return subscribe(onSuccess, Functions.ON_ERROR_MISSING);
}
Subscribes to a Single and provides callbacks to handle the item it emits or any error notification it
issues.
- Scheduler:
subscribe
does not operate by default on a particular Scheduler
.
Params: - onSuccess – the
Consumer<T>
you have designed to accept the emission from the Single - onError – the
Consumer<Throwable>
you have designed to accept any error notification from the Single
Throws: - NullPointerException – if
onSuccess
is null, or if onError
is null
See Also: Returns: a Disposable
reference can request the Single
stop work.
/**
* Subscribes to a Single and provides callbacks to handle the item it emits or any error notification it
* issues.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onSuccess
* the {@code Consumer<T>} you have designed to accept the emission from the Single
* @param onError
* the {@code Consumer<Throwable>} you have designed to accept any error notification from the
* Single
* @return a {@link Disposable} reference can request the {@link Single} stop work.
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
* @throws NullPointerException
* if {@code onSuccess} is null, or
* if {@code onError} is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ConsumerSingleObserver<T> observer = new ConsumerSingleObserver<T>(onSuccess, onError);
subscribe(observer);
return observer;
}
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(SingleObserver<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
try {
subscribeActual(observer);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
Implement this method in subclasses to handle the incoming SingleObserver
s. There is no need to call any of the plugin hooks on the current Single
instance or the SingleObserver
; all hooks and basic safeguards have been applied by subscribe(SingleObserver)
before this method gets called.
Params: - observer – the SingleObserver to handle, not null
/**
* Implement this method in subclasses to handle the incoming {@link SingleObserver}s.
* <p>There is no need to call any of the plugin hooks on the current {@code Single} instance or
* the {@code SingleObserver}; all hooks and basic safeguards have been
* applied by {@link #subscribe(SingleObserver)} before this method gets called.
* @param observer the SingleObserver to handle, not null
*/
protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);
Subscribes a given SingleObserver (subclass) to this Single and returns the given
SingleObserver as is.
Usage example:
Single<Integer> source = Single.just(1);
CompositeDisposable composite = new CompositeDisposable();
DisposableSingleObserver<Integer> ds = new DisposableSingleObserver<>() {
// ...
};
composite.add(source.subscribeWith(ds));
- Scheduler:
subscribeWith
does not operate by default on a particular Scheduler
.
Params: - observer – the SingleObserver (subclass) to use and return, not null
Type parameters: - <E> – the type of the SingleObserver to use and return
Throws: - NullPointerException – if
observer
is null
Returns: the input observer
Since: 2.0
/**
* Subscribes a given SingleObserver (subclass) to this Single and returns the given
* SingleObserver as is.
* <p>Usage example:
* <pre><code>
* Single<Integer> source = Single.just(1);
* CompositeDisposable composite = new CompositeDisposable();
*
* DisposableSingleObserver<Integer> ds = new DisposableSingleObserver<>() {
* // ...
* };
*
* composite.add(source.subscribeWith(ds));
* </code></pre>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribeWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <E> the type of the SingleObserver to use and return
* @param observer the SingleObserver (subclass) to use and return, not null
* @return the input {@code observer}
* @throws NullPointerException if {@code observer} is null
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <E extends SingleObserver<? super T>> E subscribeWith(E observer) {
subscribe(observer);
return observer;
}
Asynchronously subscribes subscribers to this Single on the specified Scheduler
.
- Scheduler:
- You specify which
Scheduler
this operator will use.
Params: - scheduler – the
Scheduler
to perform subscription actions on
See Also: Returns: the source Single modified so that its subscriptions happen on the specified Scheduler
/**
* Asynchronously subscribes subscribers to this Single on the specified {@link Scheduler}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.subscribeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
*
* @param scheduler
* the {@link Scheduler} to perform subscription actions on
* @return the source Single modified so that its subscriptions happen on the specified {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #observeOn
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Single<T> subscribeOn(final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleSubscribeOn<T>(this, scheduler));
}
Returns a Single that emits the item emitted by the source Single until a Completable terminates. Upon termination of other
, this will emit a CancellationException
rather than go to SingleObserver.onSuccess(Object)
.
- Scheduler:
takeUntil
does not operate by default on a particular Scheduler
.
Params: - other – the Completable whose termination will cause
takeUntil
to emit the item from the source Single
See Also: Returns: a Single that emits the item emitted by the source Single until such time as other
terminates.
/**
* Returns a Single that emits the item emitted by the source Single until a Completable terminates. Upon
* termination of {@code other}, this will emit a {@link CancellationException} rather than go to
* {@link SingleObserver#onSuccess(Object)}.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other
* the Completable whose termination will cause {@code takeUntil} to emit the item from the source
* Single
* @return a Single that emits the item emitted by the source Single until such time as {@code other} terminates.
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> takeUntil(final CompletableSource other) {
ObjectHelper.requireNonNull(other, "other is null");
return takeUntil(new CompletableToFlowable<T>(other));
}
Returns a Single that emits the item emitted by the source Single until a Publisher emits an item. Upon emission of an item from other
, this will emit a CancellationException
rather than go to SingleObserver.onSuccess(Object)
.
- Backpressure:
- The
other
publisher is consumed in an unbounded fashion but will be cancelled after the first item it produced.
- Scheduler:
takeUntil
does not operate by default on a particular Scheduler
.
Params: - other – the Publisher whose first emitted item will cause
takeUntil
to emit the item from the source Single
Type parameters: - <E> – the type of items emitted by
other
See Also: Returns: a Single that emits the item emitted by the source Single until such time as other
emits its first item
/**
* Returns a Single that emits the item emitted by the source Single until a Publisher emits an item. Upon
* emission of an item from {@code other}, this will emit a {@link CancellationException} rather than go to
* {@link SingleObserver#onSuccess(Object)}.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code other} publisher is consumed in an unbounded fashion but will be
* cancelled after the first item it produced.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other
* the Publisher whose first emitted item will cause {@code takeUntil} to emit the item from the source
* Single
* @param <E>
* the type of items emitted by {@code other}
* @return a Single that emits the item emitted by the source Single until such time as {@code other} emits
* its first item
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <E> Single<T> takeUntil(final Publisher<E> other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new SingleTakeUntil<T, E>(this, other));
}
Returns a Single that emits the item emitted by the source Single until a second Single emits an item. Upon emission of an item from other
, this will emit a CancellationException
rather than go to SingleObserver.onSuccess(Object)
.
- Scheduler:
takeUntil
does not operate by default on a particular Scheduler
.
Params: - other – the Single whose emitted item will cause
takeUntil
to emit the item from the source Single
Type parameters: - <E> – the type of item emitted by
other
See Also: Returns: a Single that emits the item emitted by the source Single until such time as other
emits its item
/**
* Returns a Single that emits the item emitted by the source Single until a second Single emits an item. Upon
* emission of an item from {@code other}, this will emit a {@link CancellationException} rather than go to
* {@link SingleObserver#onSuccess(Object)}.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other
* the Single whose emitted item will cause {@code takeUntil} to emit the item from the source Single
* @param <E>
* the type of item emitted by {@code other}
* @return a Single that emits the item emitted by the source Single until such time as {@code other} emits its item
* @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <E> Single<T> takeUntil(final SingleSource<? extends E> other) {
ObjectHelper.requireNonNull(other, "other is null");
return takeUntil(new SingleToFlowable<E>(other));
}
Signals a TimeoutException if the current Single doesn't signal a success value within the
specified timeout window.
- Scheduler:
timeout
signals the TimeoutException on the computation
Scheduler
.
Params: - timeout – the timeout amount
- unit – the time unit
Returns: the new Single instance Since: 2.0
/**
* Signals a TimeoutException if the current Single doesn't signal a success value within the
* specified timeout window.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timeout} signals the TimeoutException on the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param timeout the timeout amount
* @param unit the time unit
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Single<T> timeout(long timeout, TimeUnit unit) {
return timeout0(timeout, unit, Schedulers.computation(), null);
}
Signals a TimeoutException if the current Single doesn't signal a success value within the
specified timeout window.
- Scheduler:
timeout
signals the TimeoutException on the Scheduler
you specify.
Params: - timeout – the timeout amount
- unit – the time unit
- scheduler – the target scheduler where the timeout is awaited and the TimeoutException
signalled
Returns: the new Single instance Since: 2.0
/**
* Signals a TimeoutException if the current Single doesn't signal a success value within the
* specified timeout window.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timeout} signals the TimeoutException on the {@link Scheduler} you specify.</dd>
* </dl>
* @param timeout the timeout amount
* @param unit the time unit
* @param scheduler the target scheduler where the timeout is awaited and the TimeoutException
* signalled
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Single<T> timeout(long timeout, TimeUnit unit, Scheduler scheduler) {
return timeout0(timeout, unit, scheduler, null);
}
Runs the current Single and if it doesn't signal within the specified timeout window, it is
disposed and the other SingleSource subscribed to.
- Scheduler:
timeout
subscribes to the other SingleSource on the Scheduler
you specify.
Params: - timeout – the timeout amount
- unit – the time unit
- scheduler – the scheduler where the timeout is awaited and the subscription to other happens
- other – the other SingleSource that gets subscribed to if the current Single times out
Returns: the new Single instance Since: 2.0
/**
* Runs the current Single and if it doesn't signal within the specified timeout window, it is
* disposed and the other SingleSource subscribed to.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timeout} subscribes to the other SingleSource on the {@link Scheduler} you specify.</dd>
* </dl>
* @param timeout the timeout amount
* @param unit the time unit
* @param scheduler the scheduler where the timeout is awaited and the subscription to other happens
* @param other the other SingleSource that gets subscribed to if the current Single times out
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Single<T> timeout(long timeout, TimeUnit unit, Scheduler scheduler, SingleSource<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return timeout0(timeout, unit, scheduler, other);
}
Runs the current Single and if it doesn't signal within the specified timeout window, it is
disposed and the other SingleSource subscribed to.
- Scheduler:
timeout
subscribes to the other SingleSource on the computation
Scheduler
.
Params: - timeout – the timeout amount
- unit – the time unit
- other – the other SingleSource that gets subscribed to if the current Single times out
Throws: - NullPointerException –
if other is null, or
if unit is null, or
if scheduler is null
Returns: the new Single instance Since: 2.0
/**
* Runs the current Single and if it doesn't signal within the specified timeout window, it is
* disposed and the other SingleSource subscribed to.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timeout} subscribes to the other SingleSource on
* the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param timeout the timeout amount
* @param unit the time unit
* @param other the other SingleSource that gets subscribed to if the current Single times out
* @return the new Single instance
* @throws NullPointerException
* if other is null, or
* if unit is null, or
* if scheduler is null
* @since 2.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Single<T> timeout(long timeout, TimeUnit unit, SingleSource<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return timeout0(timeout, unit, Schedulers.computation(), other);
}
private Single<T> timeout0(final long timeout, final TimeUnit unit, final Scheduler scheduler, final SingleSource<? extends T> other) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleTimeout<T>(this, timeout, unit, scheduler, other));
}
Calls the specified converter function with the current Single instance
during assembly time and returns its result.
- Scheduler:
to
does not operate by default on a particular Scheduler
.
Params: - convert – the function that is called with the current Single instance during
assembly time that should return some value to be the result
Type parameters: - <R> – the result type
Returns: the value returned by the convert function
/**
* Calls the specified converter function with the current Single instance
* during assembly time and returns its result.
* <p>
* <img width="640" height="553" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.to.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code to} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the result type
* @param convert the function that is called with the current Single instance during
* assembly time that should return some value to be the result
*
* @return the value returned by the convert function
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> R to(Function<? super Single<T>, R> convert) {
try {
return ObjectHelper.requireNonNull(convert, "convert is null").apply(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
}
Returns a Completable
that discards result of the Single
and calls onComplete
when this source Single
calls onSuccess
. Error terminal event is propagated.
- Scheduler:
toCompletable
does not operate by default on a particular Scheduler
.
Returns: a Completable
that calls onComplete
on it's subscriber when the source Single
calls onSuccess
. Since: 2.0 Deprecated: see ignoreElement()
instead, will be removed in 3.0
/**
* Returns a {@link Completable} that discards result of the {@link Single}
* and calls {@code onComplete} when this source {@link Single} calls
* {@code onSuccess}. Error terminal event is propagated.
* <p>
* <img width="640" height="436" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.toCompletable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return a {@link Completable} that calls {@code onComplete} on it's subscriber when the source {@link Single}
* calls {@code onSuccess}.
* @since 2.0
* @deprecated see {@link #ignoreElement()} instead, will be removed in 3.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@Deprecated
public final Completable toCompletable() {
return RxJavaPlugins.onAssembly(new CompletableFromSingle<T>(this));
}
Returns a Completable
that ignores the success value of this Single
and calls onComplete
instead on the returned Completable
.
- Scheduler:
ignoreElement
does not operate by default on a particular Scheduler
.
Returns: a Completable
that calls onComplete
on it's observer when the source Single
calls onSuccess
. Since: 2.1.13
/**
* Returns a {@link Completable} that ignores the success value of this {@link Single}
* and calls {@code onComplete} instead on the returned {@code Completable}.
* <p>
* <img width="640" height="436" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.ignoreElement.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code ignoreElement} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return a {@link Completable} that calls {@code onComplete} on it's observer when the source {@link Single}
* calls {@code onSuccess}.
* @since 2.1.13
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable ignoreElement() {
return RxJavaPlugins.onAssembly(new CompletableFromSingle<T>(this));
}
Converts this Single into a Flowable
.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
toFlowable
does not operate by default on a particular Scheduler
.
Returns: a Flowable
that emits a single item T or an error.
/**
* Converts this Single into a {@link Flowable}.
* <p>
* <img width="640" height="462" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.toFlowable.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toFlowable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return a {@link Flowable} that emits a single item T or an error.
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public final Flowable<T> toFlowable() {
if (this instanceof FuseToFlowable) {
return ((FuseToFlowable<T>)this).fuseToFlowable();
}
return RxJavaPlugins.onAssembly(new SingleToFlowable<T>(this));
}
Returns a Future
representing the single value emitted by this Single
.
- Scheduler:
toFuture
does not operate by default on a particular Scheduler
.
See Also: Returns: a Future
that expects a single item to be emitted by this Single
/**
* Returns a {@link Future} representing the single value emitted by this {@code Single}.
* <p>
* <img width="640" height="467" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/Single.toFuture.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toFuture} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return a {@link Future} that expects a single item to be emitted by this {@code Single}
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Future<T> toFuture() {
return subscribeWith(new FutureSingleObserver<T>());
}
Converts this Single into a Maybe
.
- Scheduler:
toMaybe
does not operate by default on a particular Scheduler
.
Returns: a Maybe
that emits a single item T or an error.
/**
* Converts this Single into a {@link Maybe}.
* <p>
* <img width="640" height="463" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.toMaybe.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return a {@link Maybe} that emits a single item T or an error.
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public final Maybe<T> toMaybe() {
if (this instanceof FuseToMaybe) {
return ((FuseToMaybe<T>)this).fuseToMaybe();
}
return RxJavaPlugins.onAssembly(new MaybeFromSingle<T>(this));
}
Converts this Single into an Observable
.
- Scheduler:
toObservable
does not operate by default on a particular Scheduler
.
Returns: an Observable
that emits a single item T or an error.
/**
* Converts this Single into an {@link Observable}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.toObservable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return an {@link Observable} that emits a single item T or an error.
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public final Observable<T> toObservable() {
if (this instanceof FuseToObservable) {
return ((FuseToObservable<T>)this).fuseToObservable();
}
return RxJavaPlugins.onAssembly(new SingleToObservable<T>(this));
}
Returns a Single which makes sure when a SingleObserver disposes the Disposable,
that call is propagated up on the specified scheduler.
- Scheduler:
unsubscribeOn
calls dispose() of the upstream on the Scheduler
you specify.
History: 2.0.9 - experimental
Params: - scheduler – the target scheduler where to execute the disposal
Throws: - NullPointerException – if scheduler is null
Returns: the new Single instance Since: 2.2
/**
* Returns a Single which makes sure when a SingleObserver disposes the Disposable,
* that call is propagated up on the specified scheduler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code unsubscribeOn} calls dispose() of the upstream on the {@link Scheduler} you specify.</dd>
* </dl>
* <p>History: 2.0.9 - experimental
* @param scheduler the target scheduler where to execute the disposal
* @return the new Single instance
* @throws NullPointerException if scheduler is null
* @since 2.2
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Single<T> unsubscribeOn(final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleUnsubscribeOn<T>(this, scheduler));
}
Returns a Single that emits the result of applying a specified function to the pair of items emitted by
the source Single and another specified Single.
- Scheduler:
zipWith
does not operate by default on a particular Scheduler
.
Params: - other –
the other SingleSource
- zipper –
a function that combines the pairs of items from the two SingleSources to generate the items to
be emitted by the resulting Single
Type parameters: See Also: Returns: a Single that pairs up values from the source Single and the other
SingleSource and emits the results of zipFunction
applied to these pairs
/**
* Returns a Single that emits the result of applying a specified function to the pair of items emitted by
* the source Single and another specified Single.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.zip.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code zipWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <U>
* the type of items emitted by the {@code other} Single
* @param <R>
* the type of items emitted by the resulting Single
* @param other
* the other SingleSource
* @param zipper
* a function that combines the pairs of items from the two SingleSources to generate the items to
* be emitted by the resulting Single
* @return a Single that pairs up values from the source Single and the {@code other} SingleSource
* and emits the results of {@code zipFunction} applied to these pairs
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <U, R> Single<R> zipWith(SingleSource<U> other, BiFunction<? super T, ? super U, ? extends R> zipper) {
return zip(this, other, zipper);
}
// -------------------------------------------------------------------------
// Fluent test support, super handy and reduces test preparation boilerplate
// -------------------------------------------------------------------------
Creates a TestObserver and subscribes
it to this Single.
- Scheduler:
test
does not operate by default on a particular Scheduler
.
Returns: the new TestObserver instance Since: 2.0
/**
* Creates a TestObserver and subscribes
* it to this Single.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new TestObserver instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final TestObserver<T> test() {
TestObserver<T> to = new TestObserver<T>();
subscribe(to);
return to;
}
Creates a TestObserver optionally in cancelled state, then subscribes it to this Single.
- Scheduler:
test
does not operate by default on a particular Scheduler
.
Params: - cancelled – if true, the TestObserver will be cancelled before subscribing to this
Single.
Returns: the new TestObserver instance Since: 2.0
/**
* Creates a TestObserver optionally in cancelled state, then subscribes it to this Single.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param cancelled if true, the TestObserver will be cancelled before subscribing to this
* Single.
* @return the new TestObserver instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final TestObserver<T> test(boolean cancelled) {
TestObserver<T> to = new TestObserver<T>();
if (cancelled) {
to.cancel();
}
subscribe(to);
return to;
}
private static <T> Single<T> toSingle(Flowable<T> source) {
return RxJavaPlugins.onAssembly(new FlowableSingleSingle<T>(source, null));
}
}