Copyright (c) 2016-present, RxJava Contributors.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is
distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
the License for the specific language governing permissions and limitations under the License.
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex;
import java.util.concurrent.*;
import org.reactivestreams.Publisher;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.*;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.observers.*;
import io.reactivex.internal.operators.completable.*;
import io.reactivex.internal.operators.maybe.*;
import io.reactivex.internal.operators.mixed.*;
import io.reactivex.internal.operators.single.*;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
The Completable
class represents a deferred computation without any value but only indication for completion or exception. Completable
behaves similarly to Observable
except that it can only emit either a completion or error signal (there is no onNext
or onSuccess
as with the other reactive types).
The Completable
class implements the CompletableSource
base interface and the default consumer type it interacts with is the CompletableObserver
via the subscribe(CompletableObserver)
method. The Completable
operates with the following sequential protocol:
onSubscribe (onError | onComplete)?
Note that as with the Observable
protocol, onError
and onComplete
are mutually exclusive events.
Like Observable
, a running Completable
can be stopped through the Disposable
instance provided to consumers through SingleObserver.onSubscribe
.
Like an Observable
, a Completable
is lazy, can be either "hot" or "cold", synchronous or asynchronous. Completable
instances returned by the methods of this class are cold
and there is a standard hot implementation in the form of a subject: CompletableSubject
.
The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
See Flowable
or Observable
for the implementation of the Reactive Pattern for a stream or vector of values.
Example:
Disposable d = Completable.complete()
.delay(10, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableCompletableObserver() {
@Override
public void onStart() {
System.out.println("Started");
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(5000);
d.dispose();
Note that by design, subscriptions via subscribe(CompletableObserver)
can't be disposed from the outside (hence the void
return of the subscribe(CompletableObserver)
method) and it is the responsibility of the implementor of the CompletableObserver
to allow this to happen. RxJava supports such usage with the standard DisposableCompletableObserver
instance. For convenience, the subscribeWith(CompletableObserver)
method is provided as well to allow working with a CompletableObserver
(or subclass) instance to be applied with in a fluent manner (such as in the example above).
See Also:
/**
* The {@code Completable} class represents a deferred computation without any value but
* only indication for completion or exception.
* <p>
* {@code Completable} behaves similarly to {@link Observable} except that it can only emit either
* a completion or error signal (there is no {@code onNext} or {@code onSuccess} as with the other
* reactive types).
* <p>
* The {@code Completable} class implements the {@link CompletableSource} base interface and the default consumer
* type it interacts with is the {@link CompletableObserver} via the {@link #subscribe(CompletableObserver)} method.
* The {@code Completable} operates with the following sequential protocol:
* <pre><code>
* onSubscribe (onError | onComplete)?
* </code></pre>
* <p>
* Note that as with the {@code Observable} protocol, {@code onError} and {@code onComplete} are mutually exclusive events.
* <p>
* Like {@link Observable}, a running {@code Completable} can be stopped through the {@link Disposable} instance
* provided to consumers through {@link SingleObserver#onSubscribe}.
* <p>
* Like an {@code Observable}, a {@code Completable} is lazy, can be either "hot" or "cold", synchronous or
* asynchronous. {@code Completable} instances returned by the methods of this class are <em>cold</em>
* and there is a standard <em>hot</em> implementation in the form of a subject:
* {@link io.reactivex.subjects.CompletableSubject CompletableSubject}.
* <p>
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
* <p>
* <img width="640" height="577" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.png" alt="">
* <p>
* See {@link Flowable} or {@link Observable} for the
* implementation of the Reactive Pattern for a stream or vector of values.
* <p>
* Example:
* <pre><code>
* Disposable d = Completable.complete()
* .delay(10, TimeUnit.SECONDS, Schedulers.io())
* .subscribeWith(new DisposableCompletableObserver() {
* @Override
* public void onStart() {
* System.out.println("Started");
* }
*
* @Override
* public void onError(Throwable error) {
* error.printStackTrace();
* }
*
* @Override
* public void onComplete() {
* System.out.println("Done!");
* }
* });
*
* Thread.sleep(5000);
*
* d.dispose();
* </code></pre>
* <p>
* Note that by design, subscriptions via {@link #subscribe(CompletableObserver)} can't be disposed
* from the outside (hence the
* {@code void} return of the {@link #subscribe(CompletableObserver)} method) and it is the
* responsibility of the implementor of the {@code CompletableObserver} to allow this to happen.
* RxJava supports such usage with the standard
* {@link io.reactivex.observers.DisposableCompletableObserver DisposableCompletableObserver} instance.
* For convenience, the {@link #subscribeWith(CompletableObserver)} method is provided as well to
* allow working with a {@code CompletableObserver} (or subclass) instance to be applied with in
* a fluent manner (such as in the example above).
*
* @see io.reactivex.observers.DisposableCompletableObserver
*/
public abstract class Completable implements CompletableSource {
Returns a Completable which terminates as soon as one of the source Completables
terminates (normally or with an error) and disposes all other Completables.
- Scheduler:
ambArray
does not operate by default on a particular Scheduler
.
Params: - sources – the array of source Completables. A subscription to each source will
occur in the same order as in this array.
Throws: - NullPointerException – if sources is null
Returns: the new Completable instance
/**
* Returns a Completable which terminates as soon as one of the source Completables
* terminates (normally or with an error) and disposes all other Completables.
* <p>
* <img width="640" height="518" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.ambArray.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code ambArray} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the array of source Completables. A subscription to each source will
* occur in the same order as in this array.
* @return the new Completable instance
* @throws NullPointerException if sources is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable ambArray(final CompletableSource... sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return complete();
}
if (sources.length == 1) {
return wrap(sources[0]);
}
return RxJavaPlugins.onAssembly(new CompletableAmb(sources, null));
}
Returns a Completable which terminates as soon as one of the source Completables
terminates (normally or with an error) and disposes all other Completables.
- Scheduler:
amb
does not operate by default on a particular Scheduler
.
Params: - sources – the array of source Completables. A subscription to each source will
occur in the same order as in this Iterable.
Throws: - NullPointerException – if sources is null
Returns: the new Completable instance
/**
* Returns a Completable which terminates as soon as one of the source Completables
* terminates (normally or with an error) and disposes all other Completables.
* <p>
* <img width="640" height="518" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.amb.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code amb} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the array of source Completables. A subscription to each source will
* occur in the same order as in this Iterable.
* @return the new Completable instance
* @throws NullPointerException if sources is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable amb(final Iterable<? extends CompletableSource> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new CompletableAmb(null, sources));
}
Returns a Completable instance that completes immediately when subscribed to.
- Scheduler:
complete
does not operate by default on a particular Scheduler
.
Returns: a Completable instance that completes immediately
/**
* Returns a Completable instance that completes immediately when subscribed to.
* <p>
* <img width="640" height="472" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.complete.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code complete} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return a Completable instance that completes immediately
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable complete() {
return RxJavaPlugins.onAssembly(CompletableEmpty.INSTANCE);
}
Returns a Completable which completes only when all sources complete, one after another.
- Scheduler:
concatArray
does not operate by default on a particular Scheduler
.
Params: - sources – the sources to concatenate
Throws: - NullPointerException – if sources is null
Returns: the Completable instance which completes only when all sources complete
/**
* Returns a Completable which completes only when all sources complete, one after another.
* <p>
* <img width="640" height="283" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.concatArray.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatArray} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the sources to concatenate
* @return the Completable instance which completes only when all sources complete
* @throws NullPointerException if sources is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable concatArray(CompletableSource... sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return complete();
} else
if (sources.length == 1) {
return wrap(sources[0]);
}
return RxJavaPlugins.onAssembly(new CompletableConcatArray(sources));
}
Returns a Completable which completes only when all sources complete, one after another.
- Scheduler:
concat
does not operate by default on a particular Scheduler
.
Params: - sources – the sources to concatenate
Throws: - NullPointerException – if sources is null
Returns: the Completable instance which completes only when all sources complete
/**
* Returns a Completable which completes only when all sources complete, one after another.
* <p>
* <img width="640" height="303" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.concat.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the sources to concatenate
* @return the Completable instance which completes only when all sources complete
* @throws NullPointerException if sources is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable concat(Iterable<? extends CompletableSource> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new CompletableConcatIterable(sources));
}
Returns a Completable which completes only when all sources complete, one after another.
- Backpressure:
- The returned
Completable
honors the backpressure of the downstream consumer and expects the other Publisher
to honor it as well.
- Scheduler:
concat
does not operate by default on a particular Scheduler
.
Params: - sources – the sources to concatenate
Throws: - NullPointerException – if sources is null
Returns: the Completable instance which completes only when all sources complete
/**
* Returns a Completable which completes only when all sources complete, one after another.
* <p>
* <img width="640" height="237" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.concat.p.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
* and expects the other {@code Publisher} to honor it as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the sources to concatenate
* @return the Completable instance which completes only when all sources complete
* @throws NullPointerException if sources is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static Completable concat(Publisher<? extends CompletableSource> sources) {
return concat(sources, 2);
}
Returns a Completable which completes only when all sources complete, one after another.
- Backpressure:
- The returned
Completable
honors the backpressure of the downstream consumer and expects the other Publisher
to honor it as well.
- Scheduler:
concat
does not operate by default on a particular Scheduler
.
Params: - sources – the sources to concatenate
- prefetch – the number of sources to prefetch from the sources
Throws: - NullPointerException – if sources is null
Returns: the Completable instance which completes only when all sources complete
/**
* Returns a Completable which completes only when all sources complete, one after another.
* <p>
* <img width="640" height="237" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.concat.pn.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
* and expects the other {@code Publisher} to honor it as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the sources to concatenate
* @param prefetch the number of sources to prefetch from the sources
* @return the Completable instance which completes only when all sources complete
* @throws NullPointerException if sources is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static Completable concat(Publisher<? extends CompletableSource> sources, int prefetch) {
ObjectHelper.requireNonNull(sources, "sources is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new CompletableConcat(sources, prefetch));
}
Provides an API (via a cold Completable) that bridges the reactive world with the callback-style world.
Example:
Completable.create(emitter -> {
Callback listener = new Callback() {
@Override
public void onEvent(Event e) {
emitter.onComplete();
}
@Override
public void onFailure(Exception e) {
emitter.onError(e);
}
};
AutoCloseable c = api.someMethod(listener);
emitter.setCancellable(c::close);
});
- Scheduler:
create
does not operate by default on a particular Scheduler
.
Params: - source – the emitter that is called when a CompletableObserver subscribes to the returned
Completable
See Also: Returns: the new Completable instance
/**
* Provides an API (via a cold Completable) that bridges the reactive world with the callback-style world.
* <p>
* <img width="640" height="442" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.create.png" alt="">
* <p>
* Example:
* <pre><code>
* Completable.create(emitter -> {
* Callback listener = new Callback() {
* @Override
* public void onEvent(Event e) {
* emitter.onComplete();
* }
*
* @Override
* public void onFailure(Exception e) {
* emitter.onError(e);
* }
* };
*
* AutoCloseable c = api.someMethod(listener);
*
* emitter.setCancellable(c::close);
*
* });
* </code></pre>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code create} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param source the emitter that is called when a CompletableObserver subscribes to the returned {@code Completable}
* @return the new Completable instance
* @see CompletableOnSubscribe
* @see Cancellable
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable create(CompletableOnSubscribe source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new CompletableCreate(source));
}
Constructs a Completable instance by wrapping the given source callback
without any safeguards; you should manage the lifecycle and response
to downstream disposal.
- Scheduler:
unsafeCreate
does not operate by default on a particular Scheduler
.
Params: - source – the callback which will receive the CompletableObserver instances
when the Completable is subscribed to.
Throws: - NullPointerException – if source is null
Returns: the created Completable instance
/**
* Constructs a Completable instance by wrapping the given source callback
* <strong>without any safeguards; you should manage the lifecycle and response
* to downstream disposal</strong>.
* <p>
* <img width="640" height="260" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.unsafeCreate.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code unsafeCreate} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param source the callback which will receive the CompletableObserver instances
* when the Completable is subscribed to.
* @return the created Completable instance
* @throws NullPointerException if source is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable unsafeCreate(CompletableSource source) {
ObjectHelper.requireNonNull(source, "source is null");
if (source instanceof Completable) {
throw new IllegalArgumentException("Use of unsafeCreate(Completable)!");
}
return RxJavaPlugins.onAssembly(new CompletableFromUnsafeSource(source));
}
Defers the subscription to a Completable instance returned by a supplier.
- Scheduler:
defer
does not operate by default on a particular Scheduler
.
Params: - completableSupplier – the supplier that returns the Completable that will be subscribed to.
Returns: the Completable instance
/**
* Defers the subscription to a Completable instance returned by a supplier.
* <p>
* <img width="640" height="298" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.defer.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code defer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param completableSupplier the supplier that returns the Completable that will be subscribed to.
* @return the Completable instance
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable defer(final Callable<? extends CompletableSource> completableSupplier) {
ObjectHelper.requireNonNull(completableSupplier, "completableSupplier");
return RxJavaPlugins.onAssembly(new CompletableDefer(completableSupplier));
}
Creates a Completable which calls the given error supplier for each subscriber
and emits its returned Throwable.
If the errorSupplier returns null, the child CompletableObservers will receive a
NullPointerException.
- Scheduler:
error
does not operate by default on a particular Scheduler
.
Params: - errorSupplier – the error supplier, not null
Throws: - NullPointerException – if errorSupplier is null
Returns: the new Completable instance
/**
* Creates a Completable which calls the given error supplier for each subscriber
* and emits its returned Throwable.
* <p>
* <img width="640" height="462" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.error.f.png" alt="">
* <p>
* If the errorSupplier returns null, the child CompletableObservers will receive a
* NullPointerException.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code error} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param errorSupplier the error supplier, not null
* @return the new Completable instance
* @throws NullPointerException if errorSupplier is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable error(final Callable<? extends Throwable> errorSupplier) {
ObjectHelper.requireNonNull(errorSupplier, "errorSupplier is null");
return RxJavaPlugins.onAssembly(new CompletableErrorSupplier(errorSupplier));
}
Creates a Completable instance that emits the given Throwable exception to subscribers.
- Scheduler:
error
does not operate by default on a particular Scheduler
.
Params: - error – the Throwable instance to emit, not null
Throws: - NullPointerException – if error is null
Returns: the new Completable instance
/**
* Creates a Completable instance that emits the given Throwable exception to subscribers.
* <p>
* <img width="640" height="462" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.error.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code error} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param error the Throwable instance to emit, not null
* @return the new Completable instance
* @throws NullPointerException if error is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable error(final Throwable error) {
ObjectHelper.requireNonNull(error, "error is null");
return RxJavaPlugins.onAssembly(new CompletableError(error));
}
Returns a Completable instance that runs the given Action for each subscriber and
emits either an unchecked exception or simply completes.
- Scheduler:
fromAction
does not operate by default on a particular Scheduler
.
- Error handling:
- If the
Action
throws an exception, the respective Throwable
is delivered to the downstream via CompletableObserver.onError(Throwable)
, except when the downstream has disposed this Completable
source. In this latter case, the Throwable
is delivered to the global error handler via RxJavaPlugins.onError(Throwable)
as an UndeliverableException
.
Params: - run – the runnable to run for each subscriber
Throws: - NullPointerException – if run is null
Returns: the new Completable instance
/**
* Returns a Completable instance that runs the given Action for each subscriber and
* emits either an unchecked exception or simply completes.
* <p>
* <img width="640" height="297" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromAction.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromAction} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd> If the {@link Action} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link CompletableObserver#onError(Throwable)},
* except when the downstream has disposed this {@code Completable} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
* </dd>
* </dl>
* @param run the runnable to run for each subscriber
* @return the new Completable instance
* @throws NullPointerException if run is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable fromAction(final Action run) {
ObjectHelper.requireNonNull(run, "run is null");
return RxJavaPlugins.onAssembly(new CompletableFromAction(run));
}
Returns a Completable which when subscribed, executes the callable function, ignores its
normal result and emits onError or onComplete only.
- Scheduler:
fromCallable
does not operate by default on a particular Scheduler
.
- Error handling:
- If the
Callable
throws an exception, the respective Throwable
is delivered to the downstream via CompletableObserver.onError(Throwable)
, except when the downstream has disposed this Completable
source. In this latter case, the Throwable
is delivered to the global error handler via RxJavaPlugins.onError(Throwable)
as an UndeliverableException
.
Params: - callable – the callable instance to execute for each subscriber
Returns: the new Completable instance
/**
* Returns a Completable which when subscribed, executes the callable function, ignores its
* normal result and emits onError or onComplete only.
* <p>
* <img width="640" height="286" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromCallable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromCallable} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd> If the {@link Callable} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link CompletableObserver#onError(Throwable)},
* except when the downstream has disposed this {@code Completable} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
* </dd>
* </dl>
* @param callable the callable instance to execute for each subscriber
* @return the new Completable instance
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable fromCallable(final Callable<?> callable) {
ObjectHelper.requireNonNull(callable, "callable is null");
return RxJavaPlugins.onAssembly(new CompletableFromCallable(callable));
}
Returns a Completable instance that reacts to the termination of the given Future in a blocking fashion.
Note that if any of the observers to this Completable call dispose, this Completable will cancel the future.
- Scheduler:
fromFuture
does not operate by default on a particular Scheduler
.
Params: - future – the future to react to
Returns: the new Completable instance
/**
* Returns a Completable instance that reacts to the termination of the given Future in a blocking fashion.
* <p>
* <img width="640" height="628" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromFuture.png" alt="">
* <p>
* Note that if any of the observers to this Completable call dispose, this Completable will cancel the future.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromFuture} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param future the future to react to
* @return the new Completable instance
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable fromFuture(final Future<?> future) {
ObjectHelper.requireNonNull(future, "future is null");
return fromAction(Functions.futureAction(future));
}
Returns a Completable instance that when subscribed to, subscribes to the Maybe
instance and emits a completion event if the maybe emits onSuccess
/onComplete
or forwards any onError
events.
- Scheduler:
fromMaybe
does not operate by default on a particular Scheduler
.
History: 2.1.17 - beta
Params: - maybe – the Maybe instance to subscribe to, not null
Type parameters: - <T> – the value type of the
MaybeSource
element
Throws: - NullPointerException – if single is null
Returns: the new Completable instance Since: 2.2
/**
* Returns a Completable instance that when subscribed to, subscribes to the {@code Maybe} instance and
* emits a completion event if the maybe emits {@code onSuccess}/{@code onComplete} or forwards any
* {@code onError} events.
* <p>
* <img width="640" height="235" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromMaybe.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.17 - beta
* @param <T> the value type of the {@link MaybeSource} element
* @param maybe the Maybe instance to subscribe to, not null
* @return the new Completable instance
* @throws NullPointerException if single is null
* @since 2.2
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Completable fromMaybe(final MaybeSource<T> maybe) {
ObjectHelper.requireNonNull(maybe, "maybe is null");
return RxJavaPlugins.onAssembly(new MaybeIgnoreElementCompletable<T>(maybe));
}
Returns a Completable instance that runs the given Runnable for each subscriber and
emits either its exception or simply completes.
- Scheduler:
fromRunnable
does not operate by default on a particular Scheduler
.
- Error handling:
- If the
Runnable
throws an exception, the respective Throwable
is delivered to the downstream via CompletableObserver.onError(Throwable)
, except when the downstream has disposed this Completable
source. In this latter case, the Throwable
is delivered to the global error handler via RxJavaPlugins.onError(Throwable)
as an UndeliverableException
.
Params: - run – the runnable to run for each subscriber
Throws: - NullPointerException – if run is null
Returns: the new Completable instance
/**
* Returns a Completable instance that runs the given Runnable for each subscriber and
* emits either its exception or simply completes.
* <p>
* <img width="640" height="297" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromRunnable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd> If the {@link Runnable} throws an exception, the respective {@link Throwable} is
* delivered to the downstream via {@link CompletableObserver#onError(Throwable)},
* except when the downstream has disposed this {@code Completable} source.
* In this latter case, the {@code Throwable} is delivered to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} as an {@link io.reactivex.exceptions.UndeliverableException UndeliverableException}.
* </dd>
* </dl>
* @param run the runnable to run for each subscriber
* @return the new Completable instance
* @throws NullPointerException if run is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable fromRunnable(final Runnable run) {
ObjectHelper.requireNonNull(run, "run is null");
return RxJavaPlugins.onAssembly(new CompletableFromRunnable(run));
}
Returns a Completable instance that subscribes to the given Observable, ignores all values and
emits only the terminal event.
- Scheduler:
fromObservable
does not operate by default on a particular Scheduler
.
Params: - observable – the Observable instance to subscribe to, not null
Type parameters: - <T> – the type of the Observable
Throws: - NullPointerException – if flowable is null
Returns: the new Completable instance
/**
* Returns a Completable instance that subscribes to the given Observable, ignores all values and
* emits only the terminal event.
* <p>
* <img width="640" height="414" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromObservable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the type of the Observable
* @param observable the Observable instance to subscribe to, not null
* @return the new Completable instance
* @throws NullPointerException if flowable is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Completable fromObservable(final ObservableSource<T> observable) {
ObjectHelper.requireNonNull(observable, "observable is null");
return RxJavaPlugins.onAssembly(new CompletableFromObservable<T>(observable));
}
Returns a Completable instance that subscribes to the given publisher, ignores all values and
emits only the terminal event.
The Publisher
must follow the Reactive Streams specification.
Violating the specification may result in undefined behavior.
If possible, use create(CompletableOnSubscribe)
to create a source-like Completable
instead.
Note that even though Publisher
appears to be a functional interface, it is not recommended to implement it through a lambda as the specification requires state management that is not achievable with a stateless lambda.
- Backpressure:
- The returned
Completable
honors the backpressure of the downstream consumer and expects the other Publisher
to honor it as well.
- Scheduler:
fromPublisher
does not operate by default on a particular Scheduler
.
Params: - publisher – the Publisher instance to subscribe to, not null
Type parameters: - <T> – the type of the publisher
Throws: - NullPointerException – if publisher is null
See Also: Returns: the new Completable instance
/**
* Returns a Completable instance that subscribes to the given publisher, ignores all values and
* emits only the terminal event.
* <p>
* <img width="640" height="422" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromPublisher.png" alt="">
* <p>
* The {@link Publisher} must follow the
* <a href="https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams">Reactive Streams specification</a>.
* Violating the specification may result in undefined behavior.
* <p>
* If possible, use {@link #create(CompletableOnSubscribe)} to create a
* source-like {@code Completable} instead.
* <p>
* Note that even though {@link Publisher} appears to be a functional interface, it
* is not recommended to implement it through a lambda as the specification requires
* state management that is not achievable with a stateless lambda.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
* and expects the other {@code Publisher} to honor it as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromPublisher} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the type of the publisher
* @param publisher the Publisher instance to subscribe to, not null
* @return the new Completable instance
* @throws NullPointerException if publisher is null
* @see #create(CompletableOnSubscribe)
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Completable fromPublisher(final Publisher<T> publisher) {
ObjectHelper.requireNonNull(publisher, "publisher is null");
return RxJavaPlugins.onAssembly(new CompletableFromPublisher<T>(publisher));
}
Returns a Completable instance that when subscribed to, subscribes to the Single instance and
emits a completion event if the single emits onSuccess or forwards any onError events.
- Scheduler:
fromSingle
does not operate by default on a particular Scheduler
.
Params: - single – the Single instance to subscribe to, not null
Type parameters: - <T> – the value type of the Single
Throws: - NullPointerException – if single is null
Returns: the new Completable instance
/**
* Returns a Completable instance that when subscribed to, subscribes to the Single instance and
* emits a completion event if the single emits onSuccess or forwards any onError events.
* <p>
* <img width="640" height="356" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromSingle.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the Single
* @param single the Single instance to subscribe to, not null
* @return the new Completable instance
* @throws NullPointerException if single is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Completable fromSingle(final SingleSource<T> single) {
ObjectHelper.requireNonNull(single, "single is null");
return RxJavaPlugins.onAssembly(new CompletableFromSingle<T>(single));
}
Returns a Completable instance that subscribes to all sources at once and
completes only when all source Completables complete or one of them emits an error.
- Scheduler:
mergeArray
does not operate by default on a particular Scheduler
.
- Error handling:
- If any of the source
CompletableSource
s signal a Throwable
via onError
, the resulting Completable
terminates with that Throwable
and all other source CompletableSource
s are disposed. If more than one CompletableSource
signals an error, the resulting Completable
may terminate with the first one's error or, depending on the concurrency of the sources, may terminate with a CompositeException
containing two or more of the various error signals. Throwable
s that didn't make into the composite will be sent (individually) to the global error handler via RxJavaPlugins.onError(Throwable)
method as UndeliverableException
errors. Similarly, Throwable
s signaled by source(s) after the returned Completable
has been disposed or terminated with a (composite) error will be sent to the same global error handler. Use mergeArrayDelayError(CompletableSource...)
to merge sources and terminate only when all source CompletableSource
s have completed or failed with an error.
Params: - sources – the iterable sequence of sources.
Throws: - NullPointerException – if sources is null
See Also: Returns: the new Completable instance
/**
* Returns a Completable instance that subscribes to all sources at once and
* completes only when all source Completables complete or one of them emits an error.
* <p>
* <img width="640" height="270" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeArray.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeArray} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If any of the source {@code CompletableSource}s signal a {@code Throwable} via {@code onError}, the resulting
* {@code Completable} terminates with that {@code Throwable} and all other source {@code CompletableSource}s are disposed.
* If more than one {@code CompletableSource} signals an error, the resulting {@code Completable} may terminate with the
* first one's error or, depending on the concurrency of the sources, may terminate with a
* {@code CompositeException} containing two or more of the various error signals.
* {@code Throwable}s that didn't make into the composite will be sent (individually) to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. Similarly, {@code Throwable}s
* signaled by source(s) after the returned {@code Completable} has been disposed or terminated with a
* (composite) error will be sent to the same global error handler.
* Use {@link #mergeArrayDelayError(CompletableSource...)} to merge sources and terminate only when all source {@code CompletableSource}s
* have completed or failed with an error.
* </dd>
* </dl>
* @param sources the iterable sequence of sources.
* @return the new Completable instance
* @throws NullPointerException if sources is null
* @see #mergeArrayDelayError(CompletableSource...)
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable mergeArray(CompletableSource... sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return complete();
} else
if (sources.length == 1) {
return wrap(sources[0]);
}
return RxJavaPlugins.onAssembly(new CompletableMergeArray(sources));
}
Returns a Completable instance that subscribes to all sources at once and
completes only when all source Completables complete or one of them emits an error.
- Scheduler:
merge
does not operate by default on a particular Scheduler
.
- Error handling:
- If any of the source
CompletableSource
s signal a Throwable
via onError
, the resulting Completable
terminates with that Throwable
and all other source CompletableSource
s are disposed. If more than one CompletableSource
signals an error, the resulting Completable
may terminate with the first one's error or, depending on the concurrency of the sources, may terminate with a CompositeException
containing two or more of the various error signals. Throwable
s that didn't make into the composite will be sent (individually) to the global error handler via RxJavaPlugins.onError(Throwable)
method as UndeliverableException
errors. Similarly, Throwable
s signaled by source(s) after the returned Completable
has been disposed or terminated with a (composite) error will be sent to the same global error handler. Use mergeDelayError(Iterable<? extends CompletableSource>)
to merge sources and terminate only when all source CompletableSource
s have completed or failed with an error.
Params: - sources – the iterable sequence of sources.
Throws: - NullPointerException – if sources is null
See Also: Returns: the new Completable instance
/**
* Returns a Completable instance that subscribes to all sources at once and
* completes only when all source Completables complete or one of them emits an error.
* <p>
* <img width="640" height="311" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.merge.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If any of the source {@code CompletableSource}s signal a {@code Throwable} via {@code onError}, the resulting
* {@code Completable} terminates with that {@code Throwable} and all other source {@code CompletableSource}s are disposed.
* If more than one {@code CompletableSource} signals an error, the resulting {@code Completable} may terminate with the
* first one's error or, depending on the concurrency of the sources, may terminate with a
* {@code CompositeException} containing two or more of the various error signals.
* {@code Throwable}s that didn't make into the composite will be sent (individually) to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. Similarly, {@code Throwable}s
* signaled by source(s) after the returned {@code Completable} has been disposed or terminated with a
* (composite) error will be sent to the same global error handler.
* Use {@link #mergeDelayError(Iterable)} to merge sources and terminate only when all source {@code CompletableSource}s
* have completed or failed with an error.
* </dd>
* </dl>
* @param sources the iterable sequence of sources.
* @return the new Completable instance
* @throws NullPointerException if sources is null
* @see #mergeDelayError(Iterable)
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable merge(Iterable<? extends CompletableSource> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new CompletableMergeIterable(sources));
}
Returns a Completable instance that subscribes to all sources at once and
completes only when all source Completables complete or one of them emits an error.
- Backpressure:
- The returned
Completable
honors the backpressure of the downstream consumer and expects the other Publisher
to honor it as well.
- Scheduler:
merge
does not operate by default on a particular Scheduler
.
- Error handling:
- If any of the source
CompletableSource
s signal a Throwable
via onError
, the resulting Completable
terminates with that Throwable
and all other source CompletableSource
s are disposed. If more than one CompletableSource
signals an error, the resulting Completable
may terminate with the first one's error or, depending on the concurrency of the sources, may terminate with a CompositeException
containing two or more of the various error signals. Throwable
s that didn't make into the composite will be sent (individually) to the global error handler via RxJavaPlugins.onError(Throwable)
method as UndeliverableException
errors. Similarly, Throwable
s signaled by source(s) after the returned Completable
has been disposed or terminated with a (composite) error will be sent to the same global error handler. Use mergeDelayError(Publisher<? extends CompletableSource>)
to merge sources and terminate only when all source CompletableSource
s have completed or failed with an error.
Params: - sources – the iterable sequence of sources.
Throws: - NullPointerException – if sources is null
See Also: Returns: the new Completable instance
/**
* Returns a Completable instance that subscribes to all sources at once and
* completes only when all source Completables complete or one of them emits an error.
* <p>
* <img width="640" height="336" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.merge.p.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
* and expects the other {@code Publisher} to honor it as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If any of the source {@code CompletableSource}s signal a {@code Throwable} via {@code onError}, the resulting
* {@code Completable} terminates with that {@code Throwable} and all other source {@code CompletableSource}s are disposed.
* If more than one {@code CompletableSource} signals an error, the resulting {@code Completable} may terminate with the
* first one's error or, depending on the concurrency of the sources, may terminate with a
* {@code CompositeException} containing two or more of the various error signals.
* {@code Throwable}s that didn't make into the composite will be sent (individually) to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. Similarly, {@code Throwable}s
* signaled by source(s) after the returned {@code Completable} has been disposed or terminated with a
* (composite) error will be sent to the same global error handler.
* Use {@link #mergeDelayError(Publisher)} to merge sources and terminate only when all source {@code CompletableSource}s
* have completed or failed with an error.
* </dd>
* </dl>
* @param sources the iterable sequence of sources.
* @return the new Completable instance
* @throws NullPointerException if sources is null
* @see #mergeDelayError(Publisher)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
public static Completable merge(Publisher<? extends CompletableSource> sources) {
return merge0(sources, Integer.MAX_VALUE, false);
}
Returns a Completable instance that keeps subscriptions to a limited number of sources at once and
completes only when all source Completables complete or one of them emits an error.
- Backpressure:
- The returned
Completable
honors the backpressure of the downstream consumer and expects the other Publisher
to honor it as well.
- Scheduler:
merge
does not operate by default on a particular Scheduler
.
- Error handling:
- If any of the source
CompletableSource
s signal a Throwable
via onError
, the resulting Completable
terminates with that Throwable
and all other source CompletableSource
s are disposed. If more than one CompletableSource
signals an error, the resulting Completable
may terminate with the first one's error or, depending on the concurrency of the sources, may terminate with a CompositeException
containing two or more of the various error signals. Throwable
s that didn't make into the composite will be sent (individually) to the global error handler via RxJavaPlugins.onError(Throwable)
method as UndeliverableException
errors. Similarly, Throwable
s signaled by source(s) after the returned Completable
has been disposed or terminated with a (composite) error will be sent to the same global error handler. Use mergeDelayError(Publisher<? extends CompletableSource>, int)
to merge sources and terminate only when all source CompletableSource
s have completed or failed with an error.
Params: - sources – the iterable sequence of sources.
- maxConcurrency – the maximum number of concurrent subscriptions
Throws: - NullPointerException – if sources is null
- IllegalArgumentException – if maxConcurrency is less than 1
See Also: Returns: the new Completable instance
/**
* Returns a Completable instance that keeps subscriptions to a limited number of sources at once and
* completes only when all source Completables complete or one of them emits an error.
* <p>
* <img width="640" height="269" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.merge.pn.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
* and expects the other {@code Publisher} to honor it as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If any of the source {@code CompletableSource}s signal a {@code Throwable} via {@code onError}, the resulting
* {@code Completable} terminates with that {@code Throwable} and all other source {@code CompletableSource}s are disposed.
* If more than one {@code CompletableSource} signals an error, the resulting {@code Completable} may terminate with the
* first one's error or, depending on the concurrency of the sources, may terminate with a
* {@code CompositeException} containing two or more of the various error signals.
* {@code Throwable}s that didn't make into the composite will be sent (individually) to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. Similarly, {@code Throwable}s
* signaled by source(s) after the returned {@code Completable} has been disposed or terminated with a
* (composite) error will be sent to the same global error handler.
* Use {@link #mergeDelayError(Publisher, int)} to merge sources and terminate only when all source {@code CompletableSource}s
* have completed or failed with an error.
* </dd>
* </dl>
* @param sources the iterable sequence of sources.
* @param maxConcurrency the maximum number of concurrent subscriptions
* @return the new Completable instance
* @throws NullPointerException if sources is null
* @throws IllegalArgumentException if maxConcurrency is less than 1
* @see #mergeDelayError(Publisher, int)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static Completable merge(Publisher<? extends CompletableSource> sources, int maxConcurrency) {
return merge0(sources, maxConcurrency, false);
}
Returns a Completable instance that keeps subscriptions to a limited number of sources at once and
completes only when all source Completables terminate in one way or another, combining any exceptions
thrown by either the sources Observable or the inner Completable instances.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer and expects the other Publisher
to honor it as well. - Scheduler:
merge0
does not operate by default on a particular Scheduler
.
Params: - sources – the iterable sequence of sources.
- maxConcurrency – the maximum number of concurrent subscriptions
- delayErrors – delay all errors from the main source and from the inner Completables?
Throws: - NullPointerException – if sources is null
- IllegalArgumentException – if maxConcurrency is less than 1
Returns: the new Completable instance
/**
* Returns a Completable instance that keeps subscriptions to a limited number of sources at once and
* completes only when all source Completables terminate in one way or another, combining any exceptions
* thrown by either the sources Observable or the inner Completable instances.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer
* and expects the other {@code Publisher} to honor it as well.
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge0} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the iterable sequence of sources.
* @param maxConcurrency the maximum number of concurrent subscriptions
* @param delayErrors delay all errors from the main source and from the inner Completables?
* @return the new Completable instance
* @throws NullPointerException if sources is null
* @throws IllegalArgumentException if maxConcurrency is less than 1
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
private static Completable merge0(Publisher<? extends CompletableSource> sources, int maxConcurrency, boolean delayErrors) {
ObjectHelper.requireNonNull(sources, "sources is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
return RxJavaPlugins.onAssembly(new CompletableMerge(sources, maxConcurrency, delayErrors));
}
Returns a CompletableConsumable that subscribes to all Completables in the source array and delays
any error emitted by either the sources observable or any of the inner Completables until all of
them terminate in a way or another.
- Scheduler:
mergeArrayDelayError
does not operate by default on a particular Scheduler
.
Params: - sources – the array of Completables
Throws: - NullPointerException – if sources is null
Returns: the new Completable instance
/**
* Returns a CompletableConsumable that subscribes to all Completables in the source array and delays
* any error emitted by either the sources observable or any of the inner Completables until all of
* them terminate in a way or another.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeArrayDelayError.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the array of Completables
* @return the new Completable instance
* @throws NullPointerException if sources is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable mergeArrayDelayError(CompletableSource... sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new CompletableMergeDelayErrorArray(sources));
}
Returns a Completable that subscribes to all Completables in the source sequence and delays
any error emitted by either the sources observable or any of the inner Completables until all of
them terminate in a way or another.
- Scheduler:
mergeDelayError
does not operate by default on a particular Scheduler
.
Params: - sources – the sequence of Completables
Throws: - NullPointerException – if sources is null
Returns: the new Completable instance
/**
* Returns a Completable that subscribes to all Completables in the source sequence and delays
* any error emitted by either the sources observable or any of the inner Completables until all of
* them terminate in a way or another.
* <p>
* <img width="640" height="475" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeDelayError.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the sequence of Completables
* @return the new Completable instance
* @throws NullPointerException if sources is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable mergeDelayError(Iterable<? extends CompletableSource> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new CompletableMergeDelayErrorIterable(sources));
}
Returns a Completable that subscribes to all Completables in the source sequence and delays
any error emitted by either the sources observable or any of the inner Completables until all of
them terminate in a way or another.
- Backpressure:
- The returned
Completable
honors the backpressure of the downstream consumer and expects the other Publisher
to honor it as well.
- Scheduler:
mergeDelayError
does not operate by default on a particular Scheduler
.
Params: - sources – the sequence of Completables
Throws: - NullPointerException – if sources is null
Returns: the new Completable instance
/**
* Returns a Completable that subscribes to all Completables in the source sequence and delays
* any error emitted by either the sources observable or any of the inner Completables until all of
* them terminate in a way or another.
* <p>
* <img width="640" height="466" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeDelayError.p.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
* and expects the other {@code Publisher} to honor it as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the sequence of Completables
* @return the new Completable instance
* @throws NullPointerException if sources is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
public static Completable mergeDelayError(Publisher<? extends CompletableSource> sources) {
return merge0(sources, Integer.MAX_VALUE, true);
}
Returns a Completable that subscribes to a limited number of inner Completables at once in
the source sequence and delays any error emitted by either the sources
observable or any of the inner Completables until all of
them terminate in a way or another.
- Backpressure:
- The returned
Completable
honors the backpressure of the downstream consumer and expects the other Publisher
to honor it as well.
- Scheduler:
mergeDelayError
does not operate by default on a particular Scheduler
.
Params: - sources – the sequence of Completables
- maxConcurrency – the maximum number of concurrent subscriptions to Completables
Throws: - NullPointerException – if sources is null
Returns: the new Completable instance
/**
* Returns a Completable that subscribes to a limited number of inner Completables at once in
* the source sequence and delays any error emitted by either the sources
* observable or any of the inner Completables until all of
* them terminate in a way or another.
* <p>
* <img width="640" height="440" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeDelayError.pn.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
* and expects the other {@code Publisher} to honor it as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param sources the sequence of Completables
* @param maxConcurrency the maximum number of concurrent subscriptions to Completables
* @return the new Completable instance
* @throws NullPointerException if sources is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static Completable mergeDelayError(Publisher<? extends CompletableSource> sources, int maxConcurrency) {
return merge0(sources, maxConcurrency, true);
}
Returns a Completable that never calls onError or onComplete.
- Scheduler:
never
does not operate by default on a particular Scheduler
.
Returns: the singleton instance that never calls onError or onComplete
/**
* Returns a Completable that never calls onError or onComplete.
* <p>
* <img width="640" height="512" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.never.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code never} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the singleton instance that never calls onError or onComplete
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable never() {
return RxJavaPlugins.onAssembly(CompletableNever.INSTANCE);
}
Returns a Completable instance that fires its onComplete event after the given delay elapsed.
- Scheduler:
timer
does operate by default on the computation
Scheduler
.
Params: - delay – the delay time
- unit – the delay unit
Returns: the new Completable instance
/**
* Returns a Completable instance that fires its onComplete event after the given delay elapsed.
* <p>
* <img width="640" height="413" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.timer.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timer} does operate by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param delay the delay time
* @param unit the delay unit
* @return the new Completable instance
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Completable timer(long delay, TimeUnit unit) {
return timer(delay, unit, Schedulers.computation());
}
Returns a Completable instance that fires its onComplete event after the given delay elapsed
by using the supplied scheduler.
- Scheduler:
timer
operates on the Scheduler
you specify.
Params: - delay – the delay time
- unit – the delay unit
- scheduler – the scheduler where to emit the complete event
Returns: the new Completable instance
/**
* Returns a Completable instance that fires its onComplete event after the given delay elapsed
* by using the supplied scheduler.
* <p>
* <img width="640" height="413" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.timer.s.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timer} operates on the {@link Scheduler} you specify.</dd>
* </dl>
* @param delay the delay time
* @param unit the delay unit
* @param scheduler the scheduler where to emit the complete event
* @return the new Completable instance
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Completable timer(final long delay, final TimeUnit unit, final Scheduler scheduler) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new CompletableTimer(delay, unit, scheduler));
}
Creates a NullPointerException instance and sets the given Throwable as its initial cause.
Params: - ex – the Throwable instance to use as cause, not null (not verified)
Returns: the created NullPointerException
/**
* Creates a NullPointerException instance and sets the given Throwable as its initial cause.
* @param ex the Throwable instance to use as cause, not null (not verified)
* @return the created NullPointerException
*/
private static NullPointerException toNpe(Throwable ex) {
NullPointerException npe = new NullPointerException("Actually not, but can't pass out an exception otherwise...");
npe.initCause(ex);
return npe;
}
Returns a Completable instance which manages a resource along
with a custom Completable instance while the subscription is active.
This overload disposes eagerly before the terminal event is emitted.
- Scheduler:
using
does not operate by default on a particular Scheduler
.
Params: - resourceSupplier – the supplier that returns a resource to be managed.
- completableFunction – the function that given a resource returns a Completable instance that will be subscribed to
- disposer – the consumer that disposes the resource created by the resource supplier
Type parameters: - <R> – the resource type
Returns: the new Completable instance
/**
* Returns a Completable instance which manages a resource along
* with a custom Completable instance while the subscription is active.
* <p>
* <img width="640" height="388" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.using.png" alt="">
* <p>
* This overload disposes eagerly before the terminal event is emitted.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code using} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the resource type
* @param resourceSupplier the supplier that returns a resource to be managed.
* @param completableFunction the function that given a resource returns a Completable instance that will be subscribed to
* @param disposer the consumer that disposes the resource created by the resource supplier
* @return the new Completable instance
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <R> Completable using(Callable<R> resourceSupplier,
Function<? super R, ? extends CompletableSource> completableFunction,
Consumer<? super R> disposer) {
return using(resourceSupplier, completableFunction, disposer, true);
}
Returns a Completable instance which manages a resource along
with a custom Completable instance while the subscription is active and performs eager or lazy
resource disposition.
If this overload performs a lazy disposal after the terminal event is emitted.
Exceptions thrown at this time will be delivered to RxJavaPlugins only.
- Scheduler:
using
does not operate by default on a particular Scheduler
.
Params: - resourceSupplier – the supplier that returns a resource to be managed
- completableFunction – the function that given a resource returns a non-null
Completable instance that will be subscribed to
- disposer – the consumer that disposes the resource created by the resource supplier
- eager – if true, the resource is disposed before the terminal event is emitted, if false, the
resource is disposed after the terminal event has been emitted
Type parameters: - <R> – the resource type
Returns: the new Completable instance
/**
* Returns a Completable instance which manages a resource along
* with a custom Completable instance while the subscription is active and performs eager or lazy
* resource disposition.
* <p>
* <img width="640" height="332" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.using.b.png" alt="">
* <p>
* If this overload performs a lazy disposal after the terminal event is emitted.
* Exceptions thrown at this time will be delivered to RxJavaPlugins only.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code using} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the resource type
* @param resourceSupplier the supplier that returns a resource to be managed
* @param completableFunction the function that given a resource returns a non-null
* Completable instance that will be subscribed to
* @param disposer the consumer that disposes the resource created by the resource supplier
* @param eager if true, the resource is disposed before the terminal event is emitted, if false, the
* resource is disposed after the terminal event has been emitted
* @return the new Completable instance
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <R> Completable using(
final Callable<R> resourceSupplier,
final Function<? super R, ? extends CompletableSource> completableFunction,
final Consumer<? super R> disposer,
final boolean eager) {
ObjectHelper.requireNonNull(resourceSupplier, "resourceSupplier is null");
ObjectHelper.requireNonNull(completableFunction, "completableFunction is null");
ObjectHelper.requireNonNull(disposer, "disposer is null");
return RxJavaPlugins.onAssembly(new CompletableUsing<R>(resourceSupplier, completableFunction, disposer, eager));
}
Wraps the given CompletableSource into a Completable
if not already Completable.
- Scheduler:
wrap
does not operate by default on a particular Scheduler
.
Params: - source – the source to wrap
Throws: - NullPointerException – if source is null
Returns: the source or its wrapper Completable
/**
* Wraps the given CompletableSource into a Completable
* if not already Completable.
* <p>
* <img width="640" height="354" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.wrap.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code wrap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param source the source to wrap
* @return the source or its wrapper Completable
* @throws NullPointerException if source is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable wrap(CompletableSource source) {
ObjectHelper.requireNonNull(source, "source is null");
if (source instanceof Completable) {
return RxJavaPlugins.onAssembly((Completable)source);
}
return RxJavaPlugins.onAssembly(new CompletableFromUnsafeSource(source));
}
Returns a Completable that emits the a terminated event of either this Completable
or the other Completable whichever fires first.
- Scheduler:
ambWith
does not operate by default on a particular Scheduler
.
Params: - other – the other Completable, not null. A subscription to this provided source will occur after subscribing
to the current source.
Throws: - NullPointerException – if other is null
Returns: the new Completable instance
/**
* Returns a Completable that emits the a terminated event of either this Completable
* or the other Completable whichever fires first.
* <p>
* <img width="640" height="484" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.ambWith.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code ambWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param other the other Completable, not null. A subscription to this provided source will occur after subscribing
* to the current source.
* @return the new Completable instance
* @throws NullPointerException if other is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable ambWith(CompletableSource other) {
ObjectHelper.requireNonNull(other, "other is null");
return ambArray(this, other);
}
Returns an Observable which will subscribe to this Completable and once that is completed then will subscribe to the next
ObservableSource. An error event from this Completable will be propagated to the downstream subscriber and will result in skipping the subscription of the Observable.
- Scheduler:
andThen
does not operate by default on a particular Scheduler
.
Params: - next – the Observable to subscribe after this Completable is completed, not null
Type parameters: - <T> – the value type of the next ObservableSource
Throws: - NullPointerException – if next is null
Returns: Observable that composes this Completable and next
/**
* Returns an Observable which will subscribe to this Completable and once that is completed then
* will subscribe to the {@code next} ObservableSource. An error event from this Completable will be
* propagated to the downstream subscriber and will result in skipping the subscription of the
* Observable.
* <p>
* <img width="640" height="278" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.andThen.o.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code andThen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the next ObservableSource
* @param next the Observable to subscribe after this Completable is completed, not null
* @return Observable that composes this Completable and next
* @throws NullPointerException if next is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Observable<T> andThen(ObservableSource<T> next) {
ObjectHelper.requireNonNull(next, "next is null");
return RxJavaPlugins.onAssembly(new CompletableAndThenObservable<T>(this, next));
}
Returns a Flowable which will subscribe to this Completable and once that is completed then will subscribe to the next
Flowable. An error event from this Completable will be propagated to the downstream subscriber and will result in skipping the subscription of the Publisher.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer and expects the other Publisher
to honor it as well.
- Scheduler:
andThen
does not operate by default on a particular Scheduler
.
Params: - next – the Publisher to subscribe after this Completable is completed, not null
Type parameters: - <T> – the value type of the next Publisher
Throws: - NullPointerException – if next is null
Returns: Flowable that composes this Completable and next
/**
* Returns a Flowable which will subscribe to this Completable and once that is completed then
* will subscribe to the {@code next} Flowable. An error event from this Completable will be
* propagated to the downstream subscriber and will result in skipping the subscription of the
* Publisher.
* <p>
* <img width="640" height="249" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.andThen.p.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer
* and expects the other {@code Publisher} to honor it as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code andThen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the next Publisher
* @param next the Publisher to subscribe after this Completable is completed, not null
* @return Flowable that composes this Completable and next
* @throws NullPointerException if next is null
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Flowable<T> andThen(Publisher<T> next) {
ObjectHelper.requireNonNull(next, "next is null");
return RxJavaPlugins.onAssembly(new CompletableAndThenPublisher<T>(this, next));
}
Returns a Single which will subscribe to this Completable and once that is completed then will subscribe to the next
SingleSource. An error event from this Completable will be propagated to the downstream subscriber and will result in skipping the subscription of the Single.
- Scheduler:
andThen
does not operate by default on a particular Scheduler
.
Params: - next – the Single to subscribe after this Completable is completed, not null
Type parameters: - <T> – the value type of the next SingleSource
Returns: Single that composes this Completable and next
/**
* Returns a Single which will subscribe to this Completable and once that is completed then
* will subscribe to the {@code next} SingleSource. An error event from this Completable will be
* propagated to the downstream subscriber and will result in skipping the subscription of the
* Single.
* <p>
* <img width="640" height="437" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.andThen.s.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code andThen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the value type of the next SingleSource
* @param next the Single to subscribe after this Completable is completed, not null
* @return Single that composes this Completable and next
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Single<T> andThen(SingleSource<T> next) {
ObjectHelper.requireNonNull(next, "next is null");
return RxJavaPlugins.onAssembly(new SingleDelayWithCompletable<T>(next, this));
}
Returns a Maybe
which will subscribe to this Completable and once that is completed then will subscribe to the next
MaybeSource. An error event from this Completable will be propagated to the downstream subscriber and will result in skipping the subscription of the Maybe.
- Scheduler:
andThen
does not operate by default on a particular Scheduler
.
Params: - next – the Maybe to subscribe after this Completable is completed, not null
Type parameters: - <T> – the value type of the next MaybeSource
Returns: Maybe that composes this Completable and next
/**
* Returns a {@link Maybe} which will subscribe to this Completable and once that is completed then
* will subscribe to the {@code next} MaybeSource. An error event from this Completable will be
* propagated to the downstream subscriber and will result in skipping the subscription of the
* Maybe.
* <p>
* <img width="640" height="280" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.andThen.m.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code andThen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the value type of the next MaybeSource
* @param next the Maybe to subscribe after this Completable is completed, not null
* @return Maybe that composes this Completable and next
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Maybe<T> andThen(MaybeSource<T> next) {
ObjectHelper.requireNonNull(next, "next is null");
return RxJavaPlugins.onAssembly(new MaybeDelayWithCompletable<T>(next, this));
}
Returns a Completable that first runs this Completable
and then the other completable.
This is an alias for concatWith(CompletableSource)
.
- Scheduler:
andThen
does not operate by default on a particular Scheduler
.
Params: - next – the other Completable, not null
Throws: - NullPointerException – if other is null
Returns: the new Completable instance
/**
* Returns a Completable that first runs this Completable
* and then the other completable.
* <p>
* <img width="640" height="437" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.andThen.c.png" alt="">
* <p>
* This is an alias for {@link #concatWith(CompletableSource)}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code andThen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param next the other Completable, not null
* @return the new Completable instance
* @throws NullPointerException if other is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable andThen(CompletableSource next) {
ObjectHelper.requireNonNull(next, "next is null");
return RxJavaPlugins.onAssembly(new CompletableAndThenCompletable(this, next));
}
Calls the specified converter function during assembly time and returns its resulting value.
This allows fluent conversion to any other type.
- Scheduler:
as
does not operate by default on a particular Scheduler
.
History: 2.1.7 - experimental
Params: - converter – the function that receives the current Completable instance and returns a value
Type parameters: - <R> – the resulting object type
Throws: - NullPointerException – if converter is null
Returns: the converted value Since: 2.2
/**
* Calls the specified converter function during assembly time and returns its resulting value.
* <p>
* <img width="640" height="751" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.as.png" alt="">
* <p>
* This allows fluent conversion to any other type.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code as} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.7 - experimental
* @param <R> the resulting object type
* @param converter the function that receives the current Completable instance and returns a value
* @return the converted value
* @throws NullPointerException if converter is null
* @since 2.2
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> R as(@NonNull CompletableConverter<? extends R> converter) {
return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
}
Subscribes to and awaits the termination of this Completable instance in a blocking manner and
rethrows any exception emitted.
- Scheduler:
blockingAwait
does not operate by default on a particular Scheduler
.
- Error handling:
- If the source signals an error, the operator wraps a checked
Exception
into RuntimeException
and throws that. Otherwise, RuntimeException
s and Error
s are rethrown as they are.
Throws: - RuntimeException – wrapping an InterruptedException if the current thread is interrupted
/**
* Subscribes to and awaits the termination of this Completable instance in a blocking manner and
* rethrows any exception emitted.
* <p>
* <img width="640" height="432" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingAwait.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingAwait} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the source signals an error, the operator wraps a checked {@link Exception}
* into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
* {@link Error}s are rethrown as they are.</dd>
* </dl>
* @throws RuntimeException wrapping an InterruptedException if the current thread is interrupted
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingAwait() {
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<Void>();
subscribe(observer);
observer.blockingGet();
}
Subscribes to and awaits the termination of this Completable instance in a blocking manner
with a specific timeout and rethrows any exception emitted within the timeout window.
- Scheduler:
blockingAwait
does not operate by default on a particular Scheduler
.
- Error handling:
- If the source signals an error, the operator wraps a checked
Exception
into RuntimeException
and throws that. Otherwise, RuntimeException
s and Error
s are rethrown as they are.
Params: - timeout – the timeout value
- unit – the timeout unit
Throws: - RuntimeException – wrapping an InterruptedException if the current thread is interrupted
Returns: true if the this Completable instance completed normally within the time limit,
false if the timeout elapsed before this Completable terminated.
/**
* Subscribes to and awaits the termination of this Completable instance in a blocking manner
* with a specific timeout and rethrows any exception emitted within the timeout window.
* <p>
* <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingAwait.t.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingAwait} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the source signals an error, the operator wraps a checked {@link Exception}
* into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
* {@link Error}s are rethrown as they are.</dd>
* </dl>
* @param timeout the timeout value
* @param unit the timeout unit
* @return true if the this Completable instance completed normally within the time limit,
* false if the timeout elapsed before this Completable terminated.
* @throws RuntimeException wrapping an InterruptedException if the current thread is interrupted
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final boolean blockingAwait(long timeout, TimeUnit unit) {
ObjectHelper.requireNonNull(unit, "unit is null");
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<Void>();
subscribe(observer);
return observer.blockingAwait(timeout, unit);
}
Subscribes to this Completable instance and blocks until it terminates, then returns null or
the emitted exception if any.
- Scheduler:
blockingGet
does not operate by default on a particular Scheduler
.
Throws: - RuntimeException – that wraps an InterruptedException if the wait is interrupted
Returns: the throwable if this terminated with an error, null otherwise
/**
* Subscribes to this Completable instance and blocks until it terminates, then returns null or
* the emitted exception if any.
* <p>
* <img width="640" height="435" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingGet.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingGet} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the throwable if this terminated with an error, null otherwise
* @throws RuntimeException that wraps an InterruptedException if the wait is interrupted
*/
@Nullable
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Throwable blockingGet() {
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<Void>();
subscribe(observer);
return observer.blockingGetError();
}
Subscribes to this Completable instance and blocks until it terminates or the specified timeout
elapses, then returns null for normal termination or the emitted exception if any.
- Scheduler:
blockingGet
does not operate by default on a particular Scheduler
.
Params: - timeout – the timeout value
- unit – the time unit
Throws: - RuntimeException – that wraps an InterruptedException if the wait is interrupted or
TimeoutException if the specified timeout elapsed before it
Returns: the throwable if this terminated with an error, null otherwise
/**
* Subscribes to this Completable instance and blocks until it terminates or the specified timeout
* elapses, then returns null for normal termination or the emitted exception if any.
* <p>
* <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingGet.t.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingGet} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param timeout the timeout value
* @param unit the time unit
* @return the throwable if this terminated with an error, null otherwise
* @throws RuntimeException that wraps an InterruptedException if the wait is interrupted or
* TimeoutException if the specified timeout elapsed before it
*/
@Nullable
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Throwable blockingGet(long timeout, TimeUnit unit) {
ObjectHelper.requireNonNull(unit, "unit is null");
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<Void>();
subscribe(observer);
return observer.blockingGetError(timeout, unit);
}
Subscribes to this Completable only once, when the first CompletableObserver
subscribes to the result Completable, caches its terminal event
and relays/replays it to observers.
Note that this operator doesn't allow disposing the connection
of the upstream source.
- Scheduler:
cache
does not operate by default on a particular Scheduler
.
History: 2.0.4 - experimental
Returns: the new Completable instance Since: 2.1
/**
* Subscribes to this Completable only once, when the first CompletableObserver
* subscribes to the result Completable, caches its terminal event
* and relays/replays it to observers.
* <p>
* <img width="640" height="375" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.cache.png" alt="">
* <p>
* Note that this operator doesn't allow disposing the connection
* of the upstream source.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code cache} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.0.4 - experimental
* @return the new Completable instance
* @since 2.1
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable cache() {
return RxJavaPlugins.onAssembly(new CompletableCache(this));
}
Calls the given transformer function with this instance and returns the function's resulting
Completable.
- Scheduler:
compose
does not operate by default on a particular Scheduler
.
Params: - transformer – the transformer function, not null
Throws: - NullPointerException – if transformer is null
Returns: the Completable returned by the function
/**
* Calls the given transformer function with this instance and returns the function's resulting
* Completable.
* <p>
* <img width="640" height="625" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.compose.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code compose} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param transformer the transformer function, not null
* @return the Completable returned by the function
* @throws NullPointerException if transformer is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable compose(CompletableTransformer transformer) {
return wrap(ObjectHelper.requireNonNull(transformer, "transformer is null").apply(this));
}
Concatenates this Completable with another Completable.
- Scheduler:
concatWith
does not operate by default on a particular Scheduler
.
Params: - other – the other Completable, not null
Throws: - NullPointerException – if other is null
See Also: Returns: the new Completable which subscribes to this and then the other Completable
/**
* Concatenates this Completable with another Completable.
* <p>
* <img width="640" height="317" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.concatWith.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param other the other Completable, not null
* @return the new Completable which subscribes to this and then the other Completable
* @throws NullPointerException if other is null
* @see #andThen(MaybeSource)
* @see #andThen(ObservableSource)
* @see #andThen(SingleSource)
* @see #andThen(Publisher)
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable concatWith(CompletableSource other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new CompletableAndThenCompletable(this, other));
}
Returns a Completable which delays the emission of the completion event by the given time.
- Scheduler:
delay
does operate by default on the computation
Scheduler
.
Params: - delay – the delay time
- unit – the delay unit
Throws: - NullPointerException – if unit is null
Returns: the new Completable instance
/**
* Returns a Completable which delays the emission of the completion event by the given time.
* <p>
* <img width="640" height="343" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.delay.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code delay} does operate by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param delay the delay time
* @param unit the delay unit
* @return the new Completable instance
* @throws NullPointerException if unit is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Completable delay(long delay, TimeUnit unit) {
return delay(delay, unit, Schedulers.computation(), false);
}
Returns a Completable which delays the emission of the completion event by the given time while
running on the specified scheduler.
- Scheduler:
delay
operates on the Scheduler
you specify.
Params: - delay – the delay time
- unit – the delay unit
- scheduler – the scheduler to run the delayed completion on
Throws: - NullPointerException – if unit or scheduler is null
Returns: the new Completable instance
/**
* Returns a Completable which delays the emission of the completion event by the given time while
* running on the specified scheduler.
* <p>
* <img width="640" height="313" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.delay.s.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code delay} operates on the {@link Scheduler} you specify.</dd>
* </dl>
* @param delay the delay time
* @param unit the delay unit
* @param scheduler the scheduler to run the delayed completion on
* @return the new Completable instance
* @throws NullPointerException if unit or scheduler is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Completable delay(long delay, TimeUnit unit, Scheduler scheduler) {
return delay(delay, unit, scheduler, false);
}
Returns a Completable which delays the emission of the completion event, and optionally the error as well, by the given time while
running on the specified scheduler.
- Scheduler:
delay
operates on the Scheduler
you specify.
Params: - delay – the delay time
- unit – the delay unit
- scheduler – the scheduler to run the delayed completion on
- delayError – delay the error emission as well?
Throws: - NullPointerException – if unit or scheduler is null
Returns: the new Completable instance
/**
* Returns a Completable which delays the emission of the completion event, and optionally the error as well, by the given time while
* running on the specified scheduler.
* <p>
* <img width="640" height="253" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.delay.sb.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code delay} operates on the {@link Scheduler} you specify.</dd>
* </dl>
* @param delay the delay time
* @param unit the delay unit
* @param scheduler the scheduler to run the delayed completion on
* @param delayError delay the error emission as well?
* @return the new Completable instance
* @throws NullPointerException if unit or scheduler is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Completable delay(final long delay, final TimeUnit unit, final Scheduler scheduler, final boolean delayError) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new CompletableDelay(this, delay, unit, scheduler, delayError));
}
Returns a Completable that delays the subscription to the source CompletableSource by a given amount of time.
- Scheduler:
- This version of
delaySubscription
operates by default on the computation
Scheduler
.
Params: - delay – the time to delay the subscription
- unit – the time unit of
delay
See Also: Returns: a Completable that delays the subscription to the source CompletableSource by the given amount Since: 2.2.3 - experimental
/**
* Returns a Completable that delays the subscription to the source CompletableSource by a given amount of time.
* <p>
* <img width="640" height="475" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.delaySubscription.t.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code delaySubscription} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param delay the time to delay the subscription
* @param unit the time unit of {@code delay}
* @return a Completable that delays the subscription to the source CompletableSource by the given amount
* @since 2.2.3 - experimental
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
*/
@CheckReturnValue
@Experimental
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Completable delaySubscription(long delay, TimeUnit unit) {
return delaySubscription(delay, unit, Schedulers.computation());
}
Returns a Completable that delays the subscription to the source CompletableSource by a given amount of time,
both waiting and subscribing on a given Scheduler.
- Scheduler:
- You specify which
Scheduler
this operator will use.
Params: - delay – the time to delay the subscription
- unit – the time unit of
delay
- scheduler – the Scheduler on which the waiting and subscription will happen
See Also: Returns: a Completable that delays the subscription to the source CompletableSource by a given
amount, waiting and subscribing on the given Scheduler Since: 2.2.3 - experimental
/**
* Returns a Completable that delays the subscription to the source CompletableSource by a given amount of time,
* both waiting and subscribing on a given Scheduler.
* <p>
* <img width="640" height="420" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.delaySubscription.ts.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
*
* @param delay the time to delay the subscription
* @param unit the time unit of {@code delay}
* @param scheduler the Scheduler on which the waiting and subscription will happen
* @return a Completable that delays the subscription to the source CompletableSource by a given
* amount, waiting and subscribing on the given Scheduler
* @since 2.2.3 - experimental
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
*/
@CheckReturnValue
@Experimental
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Completable delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
return Completable.timer(delay, unit, scheduler).andThen(this);
}
Returns a Completable which calls the given onComplete callback if this Completable completes.
- Scheduler:
doOnComplete
does not operate by default on a particular Scheduler
.
Params: - onComplete – the callback to call when this emits an onComplete event
Throws: - NullPointerException – if onComplete is null
See Also: Returns: the new Completable instance
/**
* Returns a Completable which calls the given onComplete callback if this Completable completes.
* <p>
* <img width="640" height="304" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.doOnComplete.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnComplete} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onComplete the callback to call when this emits an onComplete event
* @return the new Completable instance
* @throws NullPointerException if onComplete is null
* @see #doFinally(Action)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable doOnComplete(Action onComplete) {
return doOnLifecycle(Functions.emptyConsumer(), Functions.emptyConsumer(),
onComplete, Functions.EMPTY_ACTION,
Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}
Calls the shared Action
if a CompletableObserver subscribed to the current Completable disposes the common Disposable it received via onSubscribe.
- Scheduler:
doOnDispose
does not operate by default on a particular Scheduler
.
Params: - onDispose – the action to call when the child subscriber disposes the subscription
Throws: - NullPointerException – if onDispose is null
Returns: the new Completable instance
/**
* Calls the shared {@code Action} if a CompletableObserver subscribed to the current
* Completable disposes the common Disposable it received via onSubscribe.
* <p>
* <img width="640" height="589" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.doOnDispose.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnDispose} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onDispose the action to call when the child subscriber disposes the subscription
* @return the new Completable instance
* @throws NullPointerException if onDispose is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable doOnDispose(Action onDispose) {
return doOnLifecycle(Functions.emptyConsumer(), Functions.emptyConsumer(),
Functions.EMPTY_ACTION, Functions.EMPTY_ACTION,
Functions.EMPTY_ACTION, onDispose);
}
Returns a Completable which calls the given onError callback if this Completable emits an error.
- Scheduler:
doOnError
does not operate by default on a particular Scheduler
.
Params: - onError – the error callback
Throws: - NullPointerException – if onError is null
See Also: Returns: the new Completable instance
/**
* Returns a Completable which calls the given onError callback if this Completable emits an error.
* <p>
* <img width="640" height="304" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.doOnError.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onError the error callback
* @return the new Completable instance
* @throws NullPointerException if onError is null
* @see #doFinally(Action)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable doOnError(Consumer<? super Throwable> onError) {
return doOnLifecycle(Functions.emptyConsumer(), onError,
Functions.EMPTY_ACTION, Functions.EMPTY_ACTION,
Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}
Returns a Completable which calls the given onEvent callback with the (throwable) for an onError
or (null) for an onComplete signal from this Completable before delivering said signal to the downstream.
- Scheduler:
doOnEvent
does not operate by default on a particular Scheduler
.
Params: - onEvent – the event callback
Throws: - NullPointerException – if onEvent is null
Returns: the new Completable instance
/**
* Returns a Completable which calls the given onEvent callback with the (throwable) for an onError
* or (null) for an onComplete signal from this Completable before delivering said signal to the downstream.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.doOnEvent.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnEvent} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onEvent the event callback
* @return the new Completable instance
* @throws NullPointerException if onEvent is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable doOnEvent(final Consumer<? super Throwable> onEvent) {
ObjectHelper.requireNonNull(onEvent, "onEvent is null");
return RxJavaPlugins.onAssembly(new CompletableDoOnEvent(this, onEvent));
}
Returns a Completable instance that calls the various callbacks on the specific
lifecycle events.
- Scheduler:
doOnLifecycle
does not operate by default on a particular Scheduler
.
Params: - onSubscribe – the consumer called when a CompletableSubscriber subscribes.
- onError – the consumer called when this emits an onError event
- onComplete – the runnable called just before when this Completable completes normally
- onAfterTerminate – the runnable called after this Completable completes normally
- onDispose – the runnable called when the child disposes the subscription
Returns: the new Completable instance
/**
* Returns a Completable instance that calls the various callbacks on the specific
* lifecycle events.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnLifecycle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onSubscribe the consumer called when a CompletableSubscriber subscribes.
* @param onError the consumer called when this emits an onError event
* @param onComplete the runnable called just before when this Completable completes normally
* @param onAfterTerminate the runnable called after this Completable completes normally
* @param onDispose the runnable called when the child disposes the subscription
* @return the new Completable instance
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
private Completable doOnLifecycle(
final Consumer<? super Disposable> onSubscribe,
final Consumer<? super Throwable> onError,
final Action onComplete,
final Action onTerminate,
final Action onAfterTerminate,
final Action onDispose) {
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onTerminate, "onTerminate is null");
ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
ObjectHelper.requireNonNull(onDispose, "onDispose is null");
return RxJavaPlugins.onAssembly(new CompletablePeek(this, onSubscribe, onError, onComplete, onTerminate, onAfterTerminate, onDispose));
}
Returns a Completable instance that calls the given onSubscribe callback with the disposable
that child subscribers receive on subscription.
- Scheduler:
doOnSubscribe
does not operate by default on a particular Scheduler
.
Params: - onSubscribe – the callback called when a child subscriber subscribes
Throws: - NullPointerException – if onSubscribe is null
Returns: the new Completable instance
/**
* Returns a Completable instance that calls the given onSubscribe callback with the disposable
* that child subscribers receive on subscription.
* <p>
* <img width="640" height="304" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.doOnSubscribe.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onSubscribe the callback called when a child subscriber subscribes
* @return the new Completable instance
* @throws NullPointerException if onSubscribe is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable doOnSubscribe(Consumer<? super Disposable> onSubscribe) {
return doOnLifecycle(onSubscribe, Functions.emptyConsumer(),
Functions.EMPTY_ACTION, Functions.EMPTY_ACTION,
Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}
Returns a Completable instance that calls the given onTerminate callback just before this Completable
completes normally or with an exception.
- Scheduler:
doOnTerminate
does not operate by default on a particular Scheduler
.
Params: - onTerminate – the callback to call just before this Completable terminates
See Also: Returns: the new Completable instance
/**
* Returns a Completable instance that calls the given onTerminate callback just before this Completable
* completes normally or with an exception.
* <p>
* <img width="640" height="304" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.doOnTerminate.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onTerminate the callback to call just before this Completable terminates
* @return the new Completable instance
* @see #doFinally(Action)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable doOnTerminate(final Action onTerminate) {
return doOnLifecycle(Functions.emptyConsumer(), Functions.emptyConsumer(),
Functions.EMPTY_ACTION, onTerminate,
Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}
Returns a Completable instance that calls the given onTerminate callback after this Completable
completes normally or with an exception.
- Scheduler:
doAfterTerminate
does not operate by default on a particular Scheduler
.
Params: - onAfterTerminate – the callback to call after this Completable terminates
See Also: Returns: the new Completable instance
/**
* Returns a Completable instance that calls the given onTerminate callback after this Completable
* completes normally or with an exception.
* <p>
* <img width="640" height="304" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.doAfterTerminate.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doAfterTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onAfterTerminate the callback to call after this Completable terminates
* @return the new Completable instance
* @see #doFinally(Action)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable doAfterTerminate(final Action onAfterTerminate) {
return doOnLifecycle(
Functions.emptyConsumer(),
Functions.emptyConsumer(),
Functions.EMPTY_ACTION,
Functions.EMPTY_ACTION,
onAfterTerminate,
Functions.EMPTY_ACTION);
}
Calls the specified action after this Completable signals onError or onComplete or gets disposed by
the downstream.
In case of a race between a terminal event and a dispose call, the provided onFinally
action is executed once per subscription.
Note that the onFinally
action is shared between subscriptions and as such should be thread-safe.
- Scheduler:
doFinally
does not operate by default on a particular Scheduler
.
History: 2.0.1 - experimental
Params: - onFinally – the action called when this Completable terminates or gets disposed
Returns: the new Completable instance Since: 2.1
/**
* Calls the specified action after this Completable signals onError or onComplete or gets disposed by
* the downstream.
* <p>
* <img width="640" height="331" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.doFinally.png" alt="">
* <p>
* In case of a race between a terminal event and a dispose call, the provided {@code onFinally} action
* is executed once per subscription.
* <p>
* Note that the {@code onFinally} action is shared between subscriptions and as such
* should be thread-safe.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doFinally} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.0.1 - experimental
* @param onFinally the action called when this Completable terminates or gets disposed
* @return the new Completable instance
* @since 2.1
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable doFinally(Action onFinally) {
ObjectHelper.requireNonNull(onFinally, "onFinally is null");
return RxJavaPlugins.onAssembly(new CompletableDoFinally(this, onFinally));
}
This method requires advanced knowledge about building operators, please consider
other standard composition methods first; Returns a Completable
which, when subscribed to, invokes the apply(CompletableObserver)
method of the provided CompletableOperator
for each individual downstream Completable
and allows the insertion of a custom operator by accessing the downstream's CompletableObserver
during this subscription phase and providing a new CompletableObserver
, containing the custom operator's intended business logic, that will be used in the subscription process going further upstream.
Generally, such a new CompletableObserver
will wrap the downstream's CompletableObserver
and forwards the onError
and onComplete
events from the upstream directly or according to the emission pattern the custom operator's business logic requires. In addition, such operator can intercept the flow control calls of dispose
and isDisposed
that would have traveled upstream and perform additional actions depending on the same business logic requirements.
Example:
// Step 1: Create the consumer type that will be returned by the CompletableOperator.apply():
public final class CustomCompletableObserver implements CompletableObserver, Disposable {
// The downstream's CompletableObserver that will receive the onXXX events
final CompletableObserver downstream;
// The connection to the upstream source that will call this class' onXXX methods
Disposable upstream;
// The constructor takes the downstream subscriber and usually any other parameters
public CustomCompletableObserver(CompletableObserver downstream) {
this.downstream = downstream;
}
// In the subscription phase, the upstream sends a Disposable to this class
// and subsequently this class has to send a Disposable to the downstream.
// Note that relaying the upstream's Disposable directly is not allowed in RxJava
@Override
public void onSubscribe(Disposable d) {
if (upstream != null) {
d.dispose();
} else {
upstream = d;
downstream.onSubscribe(this);
}
}
// Some operators may handle the upstream's error while others
// could just forward it to the downstream.
@Override
public void onError(Throwable throwable) {
downstream.onError(throwable);
}
// When the upstream completes, usually the downstream should complete as well.
// In completable, this could also mean doing some side-effects
@Override
public void onComplete() {
System.out.println("Sequence completed");
downstream.onComplete();
}
// Some operators may use their own resources which should be cleaned up if
// the downstream disposes the flow before it completed. Operators without
// resources can simply forward the dispose to the upstream.
// In some cases, a disposed flag may be set by this method so that other parts
// of this class may detect the dispose and stop sending events
// to the downstream.
@Override
public void dispose() {
upstream.dispose();
}
// Some operators may simply forward the call to the upstream while others
// can return the disposed flag set in dispose().
@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
}
// Step 2: Create a class that implements the CompletableOperator interface and
// returns the custom consumer type from above in its apply() method.
// Such class may define additional parameters to be submitted to
// the custom consumer type.
final class CustomCompletableOperator implements CompletableOperator {
@Override
public CompletableObserver apply(CompletableObserver upstream) {
return new CustomCompletableObserver(upstream);
}
}
// Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
// or reusing an existing one.
Completable.complete()
.lift(new CustomCompletableOperator())
.test()
.assertResult();
Creating custom operators can be complicated and it is recommended one consults the
RxJava wiki: Writing operators page about
the tools, requirements, rules, considerations and pitfalls of implementing them.
Note that implementing custom operators via this lift()
method adds slightly more overhead by requiring an additional allocation and indirection per assembled flows. Instead, extending the abstract Completable
class and creating a CompletableTransformer
with it is recommended.
Note also that it is not possible to stop the subscription phase in lift()
as the apply()
method requires a non-null CompletableObserver
instance to be returned, which is then unconditionally subscribed to the upstream Completable
. For example, if the operator decided there is no reason to subscribe to the upstream source because of some optimization possibility or a failure to prepare the operator, it still has to return a CompletableObserver
that should immediately dispose the upstream's Disposable
in its onSubscribe
method. Again, using a CompletableTransformer
and extending the Completable
is a better option as subscribeActual
can decide to not subscribe to its upstream after all.
- Scheduler:
lift
does not operate by default on a particular Scheduler
, however, the CompletableOperator
may use a Scheduler
to support its own asynchronous behavior.
Params: - onLift – the
CompletableOperator
that receives the downstream's CompletableObserver
and should return a CompletableObserver
with custom behavior to be used as the consumer for the current Completable
.
See Also: Returns: the new Completable instance
/**
* <strong>This method requires advanced knowledge about building operators, please consider
* other standard composition methods first;</strong>
* Returns a {@code Completable} which, when subscribed to, invokes the {@link CompletableOperator#apply(CompletableObserver) apply(CompletableObserver)} method
* of the provided {@link CompletableOperator} for each individual downstream {@link Completable} and allows the
* insertion of a custom operator by accessing the downstream's {@link CompletableObserver} during this subscription phase
* and providing a new {@code CompletableObserver}, containing the custom operator's intended business logic, that will be
* used in the subscription process going further upstream.
* <p>
* <img width="640" height="313" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.lift.png" alt="">
* <p>
* Generally, such a new {@code CompletableObserver} will wrap the downstream's {@code CompletableObserver} and forwards the
* {@code onError} and {@code onComplete} events from the upstream directly or according to the
* emission pattern the custom operator's business logic requires. In addition, such operator can intercept the
* flow control calls of {@code dispose} and {@code isDisposed} that would have traveled upstream and perform
* additional actions depending on the same business logic requirements.
* <p>
* Example:
* <pre><code>
* // Step 1: Create the consumer type that will be returned by the CompletableOperator.apply():
*
* public final class CustomCompletableObserver implements CompletableObserver, Disposable {
*
* // The downstream's CompletableObserver that will receive the onXXX events
* final CompletableObserver downstream;
*
* // The connection to the upstream source that will call this class' onXXX methods
* Disposable upstream;
*
* // The constructor takes the downstream subscriber and usually any other parameters
* public CustomCompletableObserver(CompletableObserver downstream) {
* this.downstream = downstream;
* }
*
* // In the subscription phase, the upstream sends a Disposable to this class
* // and subsequently this class has to send a Disposable to the downstream.
* // Note that relaying the upstream's Disposable directly is not allowed in RxJava
* @Override
* public void onSubscribe(Disposable d) {
* if (upstream != null) {
* d.dispose();
* } else {
* upstream = d;
* downstream.onSubscribe(this);
* }
* }
*
* // Some operators may handle the upstream's error while others
* // could just forward it to the downstream.
* @Override
* public void onError(Throwable throwable) {
* downstream.onError(throwable);
* }
*
* // When the upstream completes, usually the downstream should complete as well.
* // In completable, this could also mean doing some side-effects
* @Override
* public void onComplete() {
* System.out.println("Sequence completed");
* downstream.onComplete();
* }
*
* // Some operators may use their own resources which should be cleaned up if
* // the downstream disposes the flow before it completed. Operators without
* // resources can simply forward the dispose to the upstream.
* // In some cases, a disposed flag may be set by this method so that other parts
* // of this class may detect the dispose and stop sending events
* // to the downstream.
* @Override
* public void dispose() {
* upstream.dispose();
* }
*
* // Some operators may simply forward the call to the upstream while others
* // can return the disposed flag set in dispose().
* @Override
* public boolean isDisposed() {
* return upstream.isDisposed();
* }
* }
*
* // Step 2: Create a class that implements the CompletableOperator interface and
* // returns the custom consumer type from above in its apply() method.
* // Such class may define additional parameters to be submitted to
* // the custom consumer type.
*
* final class CustomCompletableOperator implements CompletableOperator {
* @Override
* public CompletableObserver apply(CompletableObserver upstream) {
* return new CustomCompletableObserver(upstream);
* }
* }
*
* // Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
* // or reusing an existing one.
*
* Completable.complete()
* .lift(new CustomCompletableOperator())
* .test()
* .assertResult();
* </code></pre>
* <p>
* Creating custom operators can be complicated and it is recommended one consults the
* <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a> page about
* the tools, requirements, rules, considerations and pitfalls of implementing them.
* <p>
* Note that implementing custom operators via this {@code lift()} method adds slightly more overhead by requiring
* an additional allocation and indirection per assembled flows. Instead, extending the abstract {@code Completable}
* class and creating a {@link CompletableTransformer} with it is recommended.
* <p>
* Note also that it is not possible to stop the subscription phase in {@code lift()} as the {@code apply()} method
* requires a non-null {@code CompletableObserver} instance to be returned, which is then unconditionally subscribed to
* the upstream {@code Completable}. For example, if the operator decided there is no reason to subscribe to the
* upstream source because of some optimization possibility or a failure to prepare the operator, it still has to
* return a {@code CompletableObserver} that should immediately dispose the upstream's {@code Disposable} in its
* {@code onSubscribe} method. Again, using a {@code CompletableTransformer} and extending the {@code Completable} is
* a better option as {@link #subscribeActual} can decide to not subscribe to its upstream after all.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}, however, the
* {@link CompletableOperator} may use a {@code Scheduler} to support its own asynchronous behavior.</dd>
* </dl>
*
* @param onLift the {@link CompletableOperator} that receives the downstream's {@code CompletableObserver} and should return
* a {@code CompletableObserver} with custom behavior to be used as the consumer for the current
* {@code Completable}.
* @return the new Completable instance
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a>
* @see #compose(CompletableTransformer)
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable lift(final CompletableOperator onLift) {
ObjectHelper.requireNonNull(onLift, "onLift is null");
return RxJavaPlugins.onAssembly(new CompletableLift(this, onLift));
}
Maps the signal types of this Completable into a Notification
of the same kind and emits it as a single success value to downstream.
- Scheduler:
materialize
does not operate by default on a particular Scheduler
.
Type parameters: - <T> – the intended target element type of the notification
See Also: Returns: the new Single instance Since: 2.2.4 - experimental
/**
* Maps the signal types of this Completable into a {@link Notification} of the same kind
* and emits it as a single success value to downstream.
* <p>
* <img width="640" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/materialize.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code materialize} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the intended target element type of the notification
* @return the new Single instance
* @since 2.2.4 - experimental
* @see Single#dematerialize(Function)
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Single<Notification<T>> materialize() {
return RxJavaPlugins.onAssembly(new CompletableMaterialize<T>(this));
}
Returns a Completable which subscribes to this and the other Completable and completes
when both of them complete or one emits an error.
- Scheduler:
mergeWith
does not operate by default on a particular Scheduler
.
Params: - other – the other Completable instance
Throws: - NullPointerException – if other is null
Returns: the new Completable instance
/**
* Returns a Completable which subscribes to this and the other Completable and completes
* when both of them complete or one emits an error.
* <p>
* <img width="640" height="442" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeWith.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param other the other Completable instance
* @return the new Completable instance
* @throws NullPointerException if other is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable mergeWith(CompletableSource other) {
ObjectHelper.requireNonNull(other, "other is null");
return mergeArray(this, other);
}
Returns a Completable which emits the terminal events from the thread of the specified scheduler.
- Scheduler:
observeOn
operates on a Scheduler
you specify.
Params: - scheduler – the scheduler to emit terminal events on
Throws: - NullPointerException – if scheduler is null
Returns: the new Completable instance
/**
* Returns a Completable which emits the terminal events from the thread of the specified scheduler.
* <p>
* <img width="640" height="523" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.observeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code observeOn} operates on a {@link Scheduler} you specify.</dd>
* </dl>
* @param scheduler the scheduler to emit terminal events on
* @return the new Completable instance
* @throws NullPointerException if scheduler is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Completable observeOn(final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new CompletableObserveOn(this, scheduler));
}
Returns a Completable instance that if this Completable emits an error, it will emit an onComplete
and swallow the throwable.
- Scheduler:
onErrorComplete
does not operate by default on a particular Scheduler
.
Returns: the new Completable instance
/**
* Returns a Completable instance that if this Completable emits an error, it will emit an onComplete
* and swallow the throwable.
* <p>
* <img width="640" height="585" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.onErrorComplete.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new Completable instance
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable onErrorComplete() {
return onErrorComplete(Functions.alwaysTrue());
}
Returns a Completable instance that if this Completable emits an error and the predicate returns
true, it will emit an onComplete and swallow the throwable.
- Scheduler:
onErrorComplete
does not operate by default on a particular Scheduler
.
Params: - predicate – the predicate to call when an Throwable is emitted which should return true
if the Throwable should be swallowed and replaced with an onComplete.
Returns: the new Completable instance
/**
* Returns a Completable instance that if this Completable emits an error and the predicate returns
* true, it will emit an onComplete and swallow the throwable.
* <p>
* <img width="640" height="283" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.onErrorComplete.f.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorComplete} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param predicate the predicate to call when an Throwable is emitted which should return true
* if the Throwable should be swallowed and replaced with an onComplete.
* @return the new Completable instance
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable onErrorComplete(final Predicate<? super Throwable> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new CompletableOnErrorComplete(this, predicate));
}
Returns a Completable instance that when encounters an error from this Completable, calls the
specified mapper function that returns another Completable instance for it and resumes the
execution with it.
- Scheduler:
onErrorResumeNext
does not operate by default on a particular Scheduler
.
Params: - errorMapper – the mapper function that takes the error and should return a Completable as
continuation.
Returns: the new Completable instance
/**
* Returns a Completable instance that when encounters an error from this Completable, calls the
* specified mapper function that returns another Completable instance for it and resumes the
* execution with it.
* <p>
* <img width="640" height="426" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.onErrorResumeNext.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorResumeNext} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param errorMapper the mapper function that takes the error and should return a Completable as
* continuation.
* @return the new Completable instance
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable onErrorResumeNext(final Function<? super Throwable, ? extends CompletableSource> errorMapper) {
ObjectHelper.requireNonNull(errorMapper, "errorMapper is null");
return RxJavaPlugins.onAssembly(new CompletableResumeNext(this, errorMapper));
}
Nulls out references to the upstream producer and downstream CompletableObserver if
the sequence is terminated or downstream calls dispose().
- Scheduler:
onTerminateDetach
does not operate by default on a particular Scheduler
.
History: 2.1.5 - experimental
Returns: a Completable which nulls out references to the upstream producer and downstream CompletableObserver if
the sequence is terminated or downstream calls dispose() Since: 2.2
/**
* Nulls out references to the upstream producer and downstream CompletableObserver if
* the sequence is terminated or downstream calls dispose().
* <p>
* <img width="640" height="326" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.onTerminateDetach.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onTerminateDetach} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.5 - experimental
* @return a Completable which nulls out references to the upstream producer and downstream CompletableObserver if
* the sequence is terminated or downstream calls dispose()
* @since 2.2
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable onTerminateDetach() {
return RxJavaPlugins.onAssembly(new CompletableDetach(this));
}
Returns a Completable that repeatedly subscribes to this Completable until disposed.
- Scheduler:
repeat
does not operate by default on a particular Scheduler
.
Returns: the new Completable instance
/**
* Returns a Completable that repeatedly subscribes to this Completable until disposed.
* <p>
* <img width="640" height="373" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.repeat.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new Completable instance
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable repeat() {
return fromPublisher(toFlowable().repeat());
}
Returns a Completable that subscribes repeatedly at most the given times to this Completable.
- Scheduler:
repeat
does not operate by default on a particular Scheduler
.
Params: - times – the number of times the resubscription should happen
Throws: - IllegalArgumentException – if times is less than zero
Returns: the new Completable instance
/**
* Returns a Completable that subscribes repeatedly at most the given times to this Completable.
* <p>
* <img width="640" height="408" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.repeat.n.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param times the number of times the resubscription should happen
* @return the new Completable instance
* @throws IllegalArgumentException if times is less than zero
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable repeat(long times) {
return fromPublisher(toFlowable().repeat(times));
}
Returns a Completable that repeatedly subscribes to this Completable so long as the given
stop supplier returns false.
- Scheduler:
repeatUntil
does not operate by default on a particular Scheduler
.
Params: - stop – the supplier that should return true to stop resubscribing.
Throws: - NullPointerException – if stop is null
Returns: the new Completable instance
/**
* Returns a Completable that repeatedly subscribes to this Completable so long as the given
* stop supplier returns false.
* <p>
* <img width="640" height="381" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.repeatUntil.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeatUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param stop the supplier that should return true to stop resubscribing.
* @return the new Completable instance
* @throws NullPointerException if stop is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable repeatUntil(BooleanSupplier stop) {
return fromPublisher(toFlowable().repeatUntil(stop));
}
Returns a Completable instance that repeats when the Publisher returned by the handler
emits an item or completes when this Publisher emits a completed event.
- Scheduler:
repeatWhen
does not operate by default on a particular Scheduler
.
Params: - handler – the function that transforms the stream of values indicating the completion of
this Completable and returns a Publisher that emits items for repeating or completes to indicate the
repetition should stop
Throws: - NullPointerException – if stop is null
Returns: the new Completable instance
/**
* Returns a Completable instance that repeats when the Publisher returned by the handler
* emits an item or completes when this Publisher emits a completed event.
* <p>
* <img width="640" height="586" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.repeatWhen.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code repeatWhen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param handler the function that transforms the stream of values indicating the completion of
* this Completable and returns a Publisher that emits items for repeating or completes to indicate the
* repetition should stop
* @return the new Completable instance
* @throws NullPointerException if stop is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable repeatWhen(Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
return fromPublisher(toFlowable().repeatWhen(handler));
}
Returns a Completable that retries this Completable as long as it emits an onError event.
- Scheduler:
retry
does not operate by default on a particular Scheduler
.
Returns: the new Completable instance
/**
* Returns a Completable that retries this Completable as long as it emits an onError event.
* <p>
* <img width="640" height="368" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.retry.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new Completable instance
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable retry() {
return fromPublisher(toFlowable().retry());
}
Returns a Completable that retries this Completable in case of an error as long as the predicate
returns true.
- Scheduler:
retry
does not operate by default on a particular Scheduler
.
Params: - predicate – the predicate called when this emits an error with the repeat count and the latest exception
and should return true to retry.
Returns: the new Completable instance
/**
* Returns a Completable that retries this Completable in case of an error as long as the predicate
* returns true.
* <p>
* <img width="640" height="325" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.retry.ff.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param predicate the predicate called when this emits an error with the repeat count and the latest exception
* and should return true to retry.
* @return the new Completable instance
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable retry(BiPredicate<? super Integer, ? super Throwable> predicate) {
return fromPublisher(toFlowable().retry(predicate));
}
Returns a Completable that when this Completable emits an error, retries at most the given
number of times before giving up and emitting the last error.
- Scheduler:
retry
does not operate by default on a particular Scheduler
.
Params: - times – the number of times to resubscribe if the current Completable fails
Throws: - IllegalArgumentException – if times is negative
Returns: the new Completable instance
/**
* Returns a Completable that when this Completable emits an error, retries at most the given
* number of times before giving up and emitting the last error.
* <p>
* <img width="640" height="451" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.retry.n.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param times the number of times to resubscribe if the current Completable fails
* @return the new Completable instance
* @throws IllegalArgumentException if times is negative
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable retry(long times) {
return fromPublisher(toFlowable().retry(times));
}
Returns a Completable that when this Completable emits an error, retries at most times
or until the predicate returns false, whichever happens first and emitting the last error.
- Scheduler:
retry
does not operate by default on a particular Scheduler
.
History: 2.1.8 - experimental
Params: - times – the number of times to resubscribe if the current Completable fails
- predicate – the predicate that is called with the latest throwable and should return
true to indicate the returned Completable should resubscribe to this Completable.
Throws: - NullPointerException – if predicate is null
- IllegalArgumentException – if times is negative
Returns: the new Completable instance Since: 2.2
/**
* Returns a Completable that when this Completable emits an error, retries at most times
* or until the predicate returns false, whichever happens first and emitting the last error.
* <p>
* <img width="640" height="361" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.retry.nf.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.1.8 - experimental
* @param times the number of times to resubscribe if the current Completable fails
* @param predicate the predicate that is called with the latest throwable and should return
* true to indicate the returned Completable should resubscribe to this Completable.
* @return the new Completable instance
* @throws NullPointerException if predicate is null
* @throws IllegalArgumentException if times is negative
* @since 2.2
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable retry(long times, Predicate<? super Throwable> predicate) {
return fromPublisher(toFlowable().retry(times, predicate));
}
Returns a Completable that when this Completable emits an error, calls the given predicate with
the latest exception to decide whether to resubscribe to this or not.
- Scheduler:
retry
does not operate by default on a particular Scheduler
.
Params: - predicate – the predicate that is called with the latest throwable and should return
true to indicate the returned Completable should resubscribe to this Completable.
Throws: - NullPointerException – if predicate is null
Returns: the new Completable instance
/**
* Returns a Completable that when this Completable emits an error, calls the given predicate with
* the latest exception to decide whether to resubscribe to this or not.
* <p>
* <img width="640" height="336" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.retry.f.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param predicate the predicate that is called with the latest throwable and should return
* true to indicate the returned Completable should resubscribe to this Completable.
* @return the new Completable instance
* @throws NullPointerException if predicate is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable retry(Predicate<? super Throwable> predicate) {
return fromPublisher(toFlowable().retry(predicate));
}
Returns a Completable which given a Publisher and when this Completable emits an error, delivers
that error through a Flowable and the Publisher should signal a value indicating a retry in response
or a terminal event indicating a termination.
Note that the inner Publisher
returned by the handler function should signal either onNext
, onError
or onComplete
in response to the received Throwable
to indicate the operator should retry or terminate. If the upstream to the operator is asynchronous, signalling onNext followed by onComplete immediately may result in the sequence to be completed immediately. Similarly, if this inner Publisher
signals onError
or onComplete
while the upstream is active, the sequence is terminated with the same signal immediately.
The following example demonstrates how to retry an asynchronous source with a delay:
Completable.timer(1, TimeUnit.SECONDS)
.doOnSubscribe(s -> System.out.println("subscribing"))
.doOnComplete(() -> { throw new RuntimeException(); })
.retryWhen(errors -> {
AtomicInteger counter = new AtomicInteger();
return errors
.takeWhile(e -> counter.getAndIncrement() != 3)
.flatMap(e -> {
System.out.println("delay retry by " + counter.get() + " second(s)");
return Flowable.timer(counter.get(), TimeUnit.SECONDS);
});
})
.blockingAwait();
- Scheduler:
retryWhen
does not operate by default on a particular Scheduler
.
Params: - handler – the handler that receives a Flowable delivering Throwables and should return a Publisher that
emits items to indicate retries or emits terminal events to indicate termination.
Throws: - NullPointerException – if handler is null
Returns: the new Completable instance
/**
* Returns a Completable which given a Publisher and when this Completable emits an error, delivers
* that error through a Flowable and the Publisher should signal a value indicating a retry in response
* or a terminal event indicating a termination.
* <p>
* <img width="640" height="586" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.retryWhen.png" alt="">
* <p>
* Note that the inner {@code Publisher} returned by the handler function should signal
* either {@code onNext}, {@code onError} or {@code onComplete} in response to the received
* {@code Throwable} to indicate the operator should retry or terminate. If the upstream to
* the operator is asynchronous, signalling onNext followed by onComplete immediately may
* result in the sequence to be completed immediately. Similarly, if this inner
* {@code Publisher} signals {@code onError} or {@code onComplete} while the upstream is
* active, the sequence is terminated with the same signal immediately.
* <p>
* The following example demonstrates how to retry an asynchronous source with a delay:
* <pre><code>
* Completable.timer(1, TimeUnit.SECONDS)
* .doOnSubscribe(s -> System.out.println("subscribing"))
* .doOnComplete(() -> { throw new RuntimeException(); })
* .retryWhen(errors -> {
* AtomicInteger counter = new AtomicInteger();
* return errors
* .takeWhile(e -> counter.getAndIncrement() != 3)
* .flatMap(e -> {
* System.out.println("delay retry by " + counter.get() + " second(s)");
* return Flowable.timer(counter.get(), TimeUnit.SECONDS);
* });
* })
* .blockingAwait();
* </code></pre>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code retryWhen} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param handler the handler that receives a Flowable delivering Throwables and should return a Publisher that
* emits items to indicate retries or emits terminal events to indicate termination.
* @return the new Completable instance
* @throws NullPointerException if handler is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler) {
return fromPublisher(toFlowable().retryWhen(handler));
}
Returns a Completable which first runs the other Completable
then this completable if the other completed normally.
- Scheduler:
startWith
does not operate by default on a particular Scheduler
.
Params: - other – the other completable to run first
Throws: - NullPointerException – if other is null
Returns: the new Completable instance
/**
* Returns a Completable which first runs the other Completable
* then this completable if the other completed normally.
* <p>
* <img width="640" height="437" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.startWith.c.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code startWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param other the other completable to run first
* @return the new Completable instance
* @throws NullPointerException if other is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable startWith(CompletableSource other) {
ObjectHelper.requireNonNull(other, "other is null");
return concatArray(other, this);
}
Returns an Observable which first delivers the events
of the other Observable then runs this CompletableConsumable.
- Scheduler:
startWith
does not operate by default on a particular Scheduler
.
Params: - other – the other Observable to run first
Type parameters: - <T> – the value type
Throws: - NullPointerException – if other is null
Returns: the new Observable instance
/**
* Returns an Observable which first delivers the events
* of the other Observable then runs this CompletableConsumable.
* <p>
* <img width="640" height="289" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.startWith.o.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code startWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param other the other Observable to run first
* @return the new Observable instance
* @throws NullPointerException if other is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Observable<T> startWith(Observable<T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return other.concatWith(this.<T>toObservable());
}
Returns a Flowable which first delivers the events
of the other Publisher then runs this Completable.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer and expects the other Publisher
to honor it as well.
- Scheduler:
startWith
does not operate by default on a particular Scheduler
.
Params: - other – the other Publisher to run first
Type parameters: - <T> – the value type
Throws: - NullPointerException – if other is null
Returns: the new Flowable instance
/**
* Returns a Flowable which first delivers the events
* of the other Publisher then runs this Completable.
* <p>
* <img width="640" height="250" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.startWith.p.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer
* and expects the other {@code Publisher} to honor it as well.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code startWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param other the other Publisher to run first
* @return the new Flowable instance
* @throws NullPointerException if other is null
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Flowable<T> startWith(Publisher<T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return this.<T>toFlowable().startWith(other);
}
Hides the identity of this Completable and its Disposable.
Allows preventing certain identity-based optimizations (fusion).
- Scheduler:
hide
does not operate by default on a particular Scheduler
.
History: 2.0.5 - experimental
Returns: the new Completable instance Since: 2.1
/**
* Hides the identity of this Completable and its Disposable.
* <p>
* <img width="640" height="432" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.hide.png" alt="">
* <p>
* Allows preventing certain identity-based optimizations (fusion).
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code hide} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>History: 2.0.5 - experimental
* @return the new Completable instance
* @since 2.1
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable hide() {
return RxJavaPlugins.onAssembly(new CompletableHide(this));
}
Subscribes to this CompletableConsumable and returns a Disposable which can be used to dispose
the subscription.
- Scheduler:
subscribe
does not operate by default on a particular Scheduler
.
Returns: the Disposable that allows disposing the subscription
/**
* Subscribes to this CompletableConsumable and returns a Disposable which can be used to dispose
* the subscription.
* <p>
* <img width="640" height="352" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.subscribe.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the Disposable that allows disposing the subscription
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
EmptyCompletableObserver observer = new EmptyCompletableObserver();
subscribe(observer);
return observer;
}
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(CompletableObserver observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null CompletableObserver. Please check the handler provided to RxJavaPlugins.setOnCompletableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException ex) { // NOPMD
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
throw toNpe(ex);
}
}
Implement this method to handle the incoming CompletableObserver
s and perform the business logic in your operator. There is no need to call any of the plugin hooks on the current Completable
instance or the CompletableObserver
; all hooks and basic safeguards have been applied by subscribe(CompletableObserver)
before this method gets called.
Params: - observer – the CompletableObserver instance, never null
/**
* Implement this method to handle the incoming {@link CompletableObserver}s and
* perform the business logic in your operator.
* <p>There is no need to call any of the plugin hooks on the current {@code Completable} instance or
* the {@code CompletableObserver}; all hooks and basic safeguards have been
* applied by {@link #subscribe(CompletableObserver)} before this method gets called.
* @param observer the CompletableObserver instance, never null
*/
protected abstract void subscribeActual(CompletableObserver observer);
Subscribes a given CompletableObserver (subclass) to this Completable and returns the given
CompletableObserver as is.
Usage example:
Completable source = Completable.complete().delay(1, TimeUnit.SECONDS);
CompositeDisposable composite = new CompositeDisposable();
DisposableCompletableObserver ds = new DisposableCompletableObserver() {
// ...
};
composite.add(source.subscribeWith(ds));
- Scheduler:
subscribeWith
does not operate by default on a particular Scheduler
.
Params: - observer – the CompletableObserver (subclass) to use and return, not null
Type parameters: - <E> – the type of the CompletableObserver to use and return
Throws: - NullPointerException – if
observer
is null
Returns: the input observer
Since: 2.0
/**
* Subscribes a given CompletableObserver (subclass) to this Completable and returns the given
* CompletableObserver as is.
* <p>
* <img width="640" height="349" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.subscribeWith.png" alt="">
* <p>Usage example:
* <pre><code>
* Completable source = Completable.complete().delay(1, TimeUnit.SECONDS);
* CompositeDisposable composite = new CompositeDisposable();
*
* DisposableCompletableObserver ds = new DisposableCompletableObserver() {
* // ...
* };
*
* composite.add(source.subscribeWith(ds));
* </code></pre>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribeWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <E> the type of the CompletableObserver to use and return
* @param observer the CompletableObserver (subclass) to use and return, not null
* @return the input {@code observer}
* @throws NullPointerException if {@code observer} is null
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <E extends CompletableObserver> E subscribeWith(E observer) {
subscribe(observer);
return observer;
}
Subscribes to this Completable and calls back either the onError or onComplete functions.
- Scheduler:
subscribe
does not operate by default on a particular Scheduler
.
Params: - onComplete – the runnable that is called if the Completable completes normally
- onError – the consumer that is called if this Completable emits an error
Throws: - NullPointerException – if either callback is null
Returns: the Disposable that can be used for disposing the subscription asynchronously
/**
* Subscribes to this Completable and calls back either the onError or onComplete functions.
* <p>
* <img width="640" height="352" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.subscribe.ff.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onComplete the runnable that is called if the Completable completes normally
* @param onError the consumer that is called if this Completable emits an error
* @return the Disposable that can be used for disposing the subscription asynchronously
* @throws NullPointerException if either callback is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(final Action onComplete, final Consumer<? super Throwable> onError) {
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
CallbackCompletableObserver observer = new CallbackCompletableObserver(onError, onComplete);
subscribe(observer);
return observer;
}
Subscribes to this Completable and calls the given Action when this Completable
completes normally.
If the Completable emits an error, it is wrapped into an OnErrorNotImplementedException
and routed to the RxJavaPlugins.onError handler.
- Scheduler:
subscribe
does not operate by default on a particular Scheduler
.
Params: - onComplete – the runnable called when this Completable completes normally
Returns: the Disposable that allows disposing the subscription
/**
* Subscribes to this Completable and calls the given Action when this Completable
* completes normally.
* <p>
* <img width="640" height="352" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.subscribe.f.png" alt="">
* <p>
* If the Completable emits an error, it is wrapped into an
* {@link io.reactivex.exceptions.OnErrorNotImplementedException OnErrorNotImplementedException}
* and routed to the RxJavaPlugins.onError handler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onComplete the runnable called when this Completable completes normally
* @return the Disposable that allows disposing the subscription
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(final Action onComplete) {
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
CallbackCompletableObserver observer = new CallbackCompletableObserver(onComplete);
subscribe(observer);
return observer;
}
Returns a Completable which subscribes the child subscriber on the specified scheduler, making
sure the subscription side-effects happen on that specific thread of the scheduler.
- Scheduler:
subscribeOn
operates on a Scheduler
you specify.
Params: - scheduler – the Scheduler to subscribe on
Throws: - NullPointerException – if scheduler is null
Returns: the new Completable instance
/**
* Returns a Completable which subscribes the child subscriber on the specified scheduler, making
* sure the subscription side-effects happen on that specific thread of the scheduler.
* <p>
* <img width="640" height="686" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.subscribeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribeOn} operates on a {@link Scheduler} you specify.</dd>
* </dl>
* @param scheduler the Scheduler to subscribe on
* @return the new Completable instance
* @throws NullPointerException if scheduler is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Completable subscribeOn(final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new CompletableSubscribeOn(this, scheduler));
}
Terminates the downstream if this or the other Completable
terminates (wins the termination race) while disposing the connection to the losing source.
- Scheduler:
takeUntil
does not operate by default on a particular Scheduler
.
- Error handling:
- If both this and the other sources signal an error, only one of the errors is signaled to the downstream and the other error is signaled to the global error handler via
RxJavaPlugins.onError(Throwable)
.
History: 2.1.17 - experimental
Params: - other – the other completable source to observe for the terminal signals
Returns: the new Completable instance Since: 2.2
/**
* Terminates the downstream if this or the other {@code Completable}
* terminates (wins the termination race) while disposing the connection to the losing source.
* <p>
* <img width="640" height="468" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.takeuntil.c.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code takeUntil} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If both this and the other sources signal an error, only one of the errors
* is signaled to the downstream and the other error is signaled to the global
* error handler via {@link RxJavaPlugins#onError(Throwable)}.</dd>
* </dl>
* <p>History: 2.1.17 - experimental
* @param other the other completable source to observe for the terminal signals
* @return the new Completable instance
* @since 2.2
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable takeUntil(CompletableSource other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new CompletableTakeUntilCompletable(this, other));
}
Returns a Completable that runs this Completable and emits a TimeoutException in case
this Completable doesn't complete within the given time.
- Scheduler:
timeout
signals the TimeoutException on the computation
Scheduler
.
Params: - timeout – the timeout value
- unit – the timeout unit
Throws: - NullPointerException – if unit is null
Returns: the new Completable instance
/**
* Returns a Completable that runs this Completable and emits a TimeoutException in case
* this Completable doesn't complete within the given time.
* <p>
* <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.timeout.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timeout} signals the TimeoutException on the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param timeout the timeout value
* @param unit the timeout unit
* @return the new Completable instance
* @throws NullPointerException if unit is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Completable timeout(long timeout, TimeUnit unit) {
return timeout0(timeout, unit, Schedulers.computation(), null);
}
Returns a Completable that runs this Completable and switches to the other Completable
in case this Completable doesn't complete within the given time.
- Scheduler:
timeout
subscribes to the other CompletableSource on the computation
Scheduler
.
Params: - timeout – the timeout value
- unit – the timeout unit
- other – the other Completable instance to switch to in case of a timeout
Throws: - NullPointerException – if unit or other is null
Returns: the new Completable instance
/**
* Returns a Completable that runs this Completable and switches to the other Completable
* in case this Completable doesn't complete within the given time.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.timeout.c.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timeout} subscribes to the other CompletableSource on
* the {@code computation} {@link Scheduler}.</dd>
* </dl>
* @param timeout the timeout value
* @param unit the timeout unit
* @param other the other Completable instance to switch to in case of a timeout
* @return the new Completable instance
* @throws NullPointerException if unit or other is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Completable timeout(long timeout, TimeUnit unit, CompletableSource other) {
ObjectHelper.requireNonNull(other, "other is null");
return timeout0(timeout, unit, Schedulers.computation(), other);
}
Returns a Completable that runs this Completable and emits a TimeoutException in case
this Completable doesn't complete within the given time while "waiting" on the specified
Scheduler.
- Scheduler:
timeout
signals the TimeoutException on the Scheduler
you specify.
Params: - timeout – the timeout value
- unit – the timeout unit
- scheduler – the scheduler to use to wait for completion
Throws: - NullPointerException – if unit or scheduler is null
Returns: the new Completable instance
/**
* Returns a Completable that runs this Completable and emits a TimeoutException in case
* this Completable doesn't complete within the given time while "waiting" on the specified
* Scheduler.
* <p>
* <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.timeout.s.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timeout} signals the TimeoutException on the {@link Scheduler} you specify.</dd>
* </dl>
* @param timeout the timeout value
* @param unit the timeout unit
* @param scheduler the scheduler to use to wait for completion
* @return the new Completable instance
* @throws NullPointerException if unit or scheduler is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Completable timeout(long timeout, TimeUnit unit, Scheduler scheduler) {
return timeout0(timeout, unit, scheduler, null);
}
Returns a Completable that runs this Completable and switches to the other Completable
in case this Completable doesn't complete within the given time while "waiting" on
the specified scheduler.
- Scheduler:
timeout
subscribes to the other CompletableSource on the Scheduler
you specify.
Params: - timeout – the timeout value
- unit – the timeout unit
- scheduler – the scheduler to use to wait for completion
- other – the other Completable instance to switch to in case of a timeout
Throws: - NullPointerException – if unit, scheduler or other is null
Returns: the new Completable instance
/**
* Returns a Completable that runs this Completable and switches to the other Completable
* in case this Completable doesn't complete within the given time while "waiting" on
* the specified scheduler.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.timeout.sc.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timeout} subscribes to the other CompletableSource on
* the {@link Scheduler} you specify.</dd>
* </dl>
* @param timeout the timeout value
* @param unit the timeout unit
* @param scheduler the scheduler to use to wait for completion
* @param other the other Completable instance to switch to in case of a timeout
* @return the new Completable instance
* @throws NullPointerException if unit, scheduler or other is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Completable timeout(long timeout, TimeUnit unit, Scheduler scheduler, CompletableSource other) {
ObjectHelper.requireNonNull(other, "other is null");
return timeout0(timeout, unit, scheduler, other);
}
Returns a Completable that runs this Completable and optionally switches to the other Completable
in case this Completable doesn't complete within the given time while "waiting" on
the specified scheduler.
- Scheduler:
- You specify the
Scheduler
this operator runs on.
Params: - timeout – the timeout value
- unit – the timeout unit
- scheduler – the scheduler to use to wait for completion
- other – the other Completable instance to switch to in case of a timeout,
if null a TimeoutException is emitted instead
Throws: - NullPointerException – if unit or scheduler
Returns: the new Completable instance
/**
* Returns a Completable that runs this Completable and optionally switches to the other Completable
* in case this Completable doesn't complete within the given time while "waiting" on
* the specified scheduler.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify the {@link Scheduler} this operator runs on.</dd>
* </dl>
* @param timeout the timeout value
* @param unit the timeout unit
* @param scheduler the scheduler to use to wait for completion
* @param other the other Completable instance to switch to in case of a timeout,
* if null a TimeoutException is emitted instead
* @return the new Completable instance
* @throws NullPointerException if unit or scheduler
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
private Completable timeout0(long timeout, TimeUnit unit, Scheduler scheduler, CompletableSource other) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new CompletableTimeout(this, timeout, unit, scheduler, other));
}
Allows fluent conversion to another type via a function callback.
- Scheduler:
to
does not operate by default on a particular Scheduler
.
Params: - converter – the function called with this which should return some other value.
Type parameters: - <U> – the output type
Throws: - NullPointerException – if converter is null
Returns: the converted value
/**
* Allows fluent conversion to another type via a function callback.
* <p>
* <img width="640" height="751" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.to.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code to} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <U> the output type
* @param converter the function called with this which should return some other value.
* @return the converted value
* @throws NullPointerException if converter is null
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> U to(Function<? super Completable, U> converter) {
try {
return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
}
Returns a Flowable which when subscribed to subscribes to this Completable and
relays the terminal events to the subscriber.
- Backpressure:
- The returned
Flowable
honors the backpressure of the downstream consumer.
- Scheduler:
toFlowable
does not operate by default on a particular Scheduler
.
Type parameters: - <T> – the value type
Returns: the new Flowable instance
/**
* Returns a Flowable which when subscribed to subscribes to this Completable and
* relays the terminal events to the subscriber.
* <p>
* <img width="640" height="585" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.toFlowable.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toFlowable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @return the new Flowable instance
*/
@CheckReturnValue
@SuppressWarnings("unchecked")
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Flowable<T> toFlowable() {
if (this instanceof FuseToFlowable) {
return ((FuseToFlowable<T>)this).fuseToFlowable();
}
return RxJavaPlugins.onAssembly(new CompletableToFlowable<T>(this));
}
Converts this Completable into a Maybe
.
- Scheduler:
toMaybe
does not operate by default on a particular Scheduler
.
Type parameters: - <T> – the value type
Returns: a Maybe
that only calls onComplete
or onError
, based on which one is called by the source Completable.
/**
* Converts this Completable into a {@link Maybe}.
* <p>
* <img width="640" height="585" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.toMaybe.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the value type
* @return a {@link Maybe} that only calls {@code onComplete} or {@code onError}, based on which one is
* called by the source Completable.
*/
@CheckReturnValue
@SuppressWarnings("unchecked")
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Maybe<T> toMaybe() {
if (this instanceof FuseToMaybe) {
return ((FuseToMaybe<T>)this).fuseToMaybe();
}
return RxJavaPlugins.onAssembly(new MaybeFromCompletable<T>(this));
}
Returns an Observable which when subscribed to subscribes to this Completable and
relays the terminal events to the subscriber.
- Scheduler:
toObservable
does not operate by default on a particular Scheduler
.
Type parameters: - <T> – the value type
Returns: the new Observable created
/**
* Returns an Observable which when subscribed to subscribes to this Completable and
* relays the terminal events to the subscriber.
* <p>
* <img width="640" height="293" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.toObservable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @return the new Observable created
*/
@CheckReturnValue
@SuppressWarnings("unchecked")
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Observable<T> toObservable() {
if (this instanceof FuseToObservable) {
return ((FuseToObservable<T>)this).fuseToObservable();
}
return RxJavaPlugins.onAssembly(new CompletableToObservable<T>(this));
}
Converts this Completable into a Single which when this Completable completes normally,
calls the given supplier and emits its returned value through onSuccess.
- Scheduler:
toSingle
does not operate by default on a particular Scheduler
.
Params: - completionValueSupplier – the value supplier called when this Completable completes normally
Type parameters: - <T> – the value type
Throws: - NullPointerException – if completionValueSupplier is null
Returns: the new Single instance
/**
* Converts this Completable into a Single which when this Completable completes normally,
* calls the given supplier and emits its returned value through onSuccess.
* <p>
* <img width="640" height="583" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.toSingle.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param completionValueSupplier the value supplier called when this Completable completes normally
* @return the new Single instance
* @throws NullPointerException if completionValueSupplier is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Single<T> toSingle(final Callable<? extends T> completionValueSupplier) {
ObjectHelper.requireNonNull(completionValueSupplier, "completionValueSupplier is null");
return RxJavaPlugins.onAssembly(new CompletableToSingle<T>(this, completionValueSupplier, null));
}
Converts this Completable into a Single which when this Completable completes normally,
emits the given value through onSuccess.
- Scheduler:
toSingleDefault
does not operate by default on a particular Scheduler
.
Params: - completionValue – the value to emit when this Completable completes normally
Type parameters: - <T> – the value type
Throws: - NullPointerException – if completionValue is null
Returns: the new Single instance
/**
* Converts this Completable into a Single which when this Completable completes normally,
* emits the given value through onSuccess.
* <p>
* <img width="640" height="583" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.toSingleDefault.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toSingleDefault} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param completionValue the value to emit when this Completable completes normally
* @return the new Single instance
* @throws NullPointerException if completionValue is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Single<T> toSingleDefault(final T completionValue) {
ObjectHelper.requireNonNull(completionValue, "completionValue is null");
return RxJavaPlugins.onAssembly(new CompletableToSingle<T>(this, null, completionValue));
}
Returns a Completable which makes sure when a subscriber disposes the subscription, the
dispose is called on the specified scheduler.
- Scheduler:
unsubscribeOn
calls dispose() of the upstream on the Scheduler
you specify.
Params: - scheduler – the target scheduler where to execute the disposing
Throws: - NullPointerException – if scheduler is null
Returns: the new Completable instance
/**
* Returns a Completable which makes sure when a subscriber disposes the subscription, the
* dispose is called on the specified scheduler.
* <p>
* <img width="640" height="716" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.unsubscribeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code unsubscribeOn} calls dispose() of the upstream on the {@link Scheduler} you specify.</dd>
* </dl>
* @param scheduler the target scheduler where to execute the disposing
* @return the new Completable instance
* @throws NullPointerException if scheduler is null
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Completable unsubscribeOn(final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new CompletableDisposeOn(this, scheduler));
}
// -------------------------------------------------------------------------
// Fluent test support, super handy and reduces test preparation boilerplate
// -------------------------------------------------------------------------
Creates a TestObserver and subscribes
it to this Completable.
- Scheduler:
test
does not operate by default on a particular Scheduler
.
Returns: the new TestObserver instance Since: 2.0
/**
* Creates a TestObserver and subscribes
* it to this Completable.
* <p>
* <img width="640" height="458" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.test.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new TestObserver instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final TestObserver<Void> test() {
TestObserver<Void> to = new TestObserver<Void>();
subscribe(to);
return to;
}
Creates a TestObserver optionally in cancelled state, then subscribes it to this Completable.
Params: - cancelled – if true, the TestObserver will be cancelled before subscribing to this
Completable.
- Scheduler:
test
does not operate by default on a particular Scheduler
.
Returns: the new TestObserver instance Since: 2.0
/**
* Creates a TestObserver optionally in cancelled state, then subscribes it to this Completable.
* @param cancelled if true, the TestObserver will be cancelled before subscribing to this
* Completable.
* <p>
* <img width="640" height="499" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.test.b.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new TestObserver instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final TestObserver<Void> test(boolean cancelled) {
TestObserver<Void> to = new TestObserver<Void>();
if (cancelled) {
to.cancel();
}
subscribe(to);
return to;
}
}