Copyright (c) 2016-present, RxJava Contributors.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is
distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
the License for the specific language governing permissions and limitations under the License.
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.observers;
import java.util.*;
import java.util.concurrent.*;
import io.reactivex.Notification;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.functions.Predicate;
import io.reactivex.internal.functions.*;
import io.reactivex.internal.util.*;
Base class with shared infrastructure to support TestSubscriber and TestObserver.
Type parameters: - <T> – the value type consumed
- <U> – the subclass of this BaseTestConsumer
/**
* Base class with shared infrastructure to support TestSubscriber and TestObserver.
* @param <T> the value type consumed
* @param <U> the subclass of this BaseTestConsumer
*/
public abstract class BaseTestConsumer<T, U extends BaseTestConsumer<T, U>> implements Disposable {
The latch that indicates an onError or onComplete has been called. /** The latch that indicates an onError or onComplete has been called. */
protected final CountDownLatch done;
The list of values received. /** The list of values received. */
protected final List<T> values;
The list of errors received. /** The list of errors received. */
protected final List<Throwable> errors;
The number of completions. /** The number of completions. */
protected long completions;
The last thread seen by the observer. /** The last thread seen by the observer. */
protected Thread lastThread;
protected boolean checkSubscriptionOnce;
protected int initialFusionMode;
protected int establishedFusionMode;
The optional tag associated with this test consumer.
Since: 2.0.7
/**
* The optional tag associated with this test consumer.
* @since 2.0.7
*/
protected CharSequence tag;
Indicates that one of the awaitX method has timed out.
Since: 2.0.7
/**
* Indicates that one of the awaitX method has timed out.
* @since 2.0.7
*/
protected boolean timeout;
public BaseTestConsumer() {
this.values = new VolatileSizeArrayList<T>();
this.errors = new VolatileSizeArrayList<Throwable>();
this.done = new CountDownLatch(1);
}
Returns the last thread which called the onXXX methods of this TestObserver/TestSubscriber.
Returns: the last thread which called the onXXX methods
/**
* Returns the last thread which called the onXXX methods of this TestObserver/TestSubscriber.
* @return the last thread which called the onXXX methods
*/
public final Thread lastThread() {
return lastThread;
}
Returns a shared list of received onNext values.
Note that accessing the items via certain methods of the List
interface while the upstream is still actively emitting more items may result in a ConcurrentModificationException
.
The List.size()
method will return the number of items already received by this TestObserver/TestSubscriber in a thread-safe manner that can be read via List.get(int)
) method (index range of 0 to List.size() - 1
).
A view of the returned List can be created via List.subList(int, int)
by using the bounds 0 (inclusive) to List.size()
(exclusive) which, when accessed in a read-only fashion, should be also thread-safe and not throw any ConcurrentModificationException
.
Returns: a list of received onNext values
/**
* Returns a shared list of received onNext values.
* <p>
* Note that accessing the items via certain methods of the {@link List}
* interface while the upstream is still actively emitting
* more items may result in a {@code ConcurrentModificationException}.
* <p>
* The {@link List#size()} method will return the number of items
* already received by this TestObserver/TestSubscriber in a thread-safe
* manner that can be read via {@link List#get(int)}) method
* (index range of 0 to {@code List.size() - 1}).
* <p>
* A view of the returned List can be created via {@link List#subList(int, int)}
* by using the bounds 0 (inclusive) to {@link List#size()} (exclusive) which,
* when accessed in a read-only fashion, should be also thread-safe and not throw any
* {@code ConcurrentModificationException}.
* @return a list of received onNext values
*/
public final List<T> values() {
return values;
}
Returns a shared list of received onError exceptions.
Note that accessing the errors via certain methods of the List
interface while the upstream is still actively emitting more items or errors may result in a ConcurrentModificationException
.
The List.size()
method will return the number of errors already received by this TestObserver/TestSubscriber in a thread-safe manner that can be read via List.get(int)
) method (index range of 0 to List.size() - 1
).
A view of the returned List can be created via List.subList(int, int)
by using the bounds 0 (inclusive) to List.size()
(exclusive) which, when accessed in a read-only fashion, should be also thread-safe and not throw any ConcurrentModificationException
.
Returns: a list of received events onError exceptions
/**
* Returns a shared list of received onError exceptions.
* <p>
* Note that accessing the errors via certain methods of the {@link List}
* interface while the upstream is still actively emitting
* more items or errors may result in a {@code ConcurrentModificationException}.
* <p>
* The {@link List#size()} method will return the number of errors
* already received by this TestObserver/TestSubscriber in a thread-safe
* manner that can be read via {@link List#get(int)}) method
* (index range of 0 to {@code List.size() - 1}).
* <p>
* A view of the returned List can be created via {@link List#subList(int, int)}
* by using the bounds 0 (inclusive) to {@link List#size()} (exclusive) which,
* when accessed in a read-only fashion, should be also thread-safe and not throw any
* {@code ConcurrentModificationException}.
* @return a list of received events onError exceptions
*/
public final List<Throwable> errors() {
return errors;
}
Returns the number of times onComplete was called.
Returns: the number of times onComplete was called
/**
* Returns the number of times onComplete was called.
* @return the number of times onComplete was called
*/
public final long completions() {
return completions;
}
Returns true if TestObserver/TestSubscriber received any onError or onComplete events.
Returns: true if TestObserver/TestSubscriber received any onError or onComplete events
/**
* Returns true if TestObserver/TestSubscriber received any onError or onComplete events.
* @return true if TestObserver/TestSubscriber received any onError or onComplete events
*/
public final boolean isTerminated() {
return done.getCount() == 0;
}
Returns the number of onNext values received.
Returns: the number of onNext values received
/**
* Returns the number of onNext values received.
* @return the number of onNext values received
*/
public final int valueCount() {
return values.size();
}
Returns the number of onError exceptions received.
Returns: the number of onError exceptions received
/**
* Returns the number of onError exceptions received.
* @return the number of onError exceptions received
*/
public final int errorCount() {
return errors.size();
}
Fail with the given message and add the sequence of errors as suppressed ones.
Note this is deliberately the only fail method. Most of the times an assertion
would fail but it is possible it was due to an exception somewhere. This construct
will capture those potential errors and report it along with the original failure.
Params: - message – the message to use
Returns: AssertionError the prepared AssertionError instance
/**
* Fail with the given message and add the sequence of errors as suppressed ones.
* <p>Note this is deliberately the only fail method. Most of the times an assertion
* would fail but it is possible it was due to an exception somewhere. This construct
* will capture those potential errors and report it along with the original failure.
*
* @param message the message to use
* @return AssertionError the prepared AssertionError instance
*/
protected final AssertionError fail(String message) {
StringBuilder b = new StringBuilder(64 + message.length());
b.append(message);
b.append(" (")
.append("latch = ").append(done.getCount()).append(", ")
.append("values = ").append(values.size()).append(", ")
.append("errors = ").append(errors.size()).append(", ")
.append("completions = ").append(completions)
;
if (timeout) {
b.append(", timeout!");
}
if (isDisposed()) {
b.append(", disposed!");
}
CharSequence tag = this.tag;
if (tag != null) {
b.append(", tag = ")
.append(tag);
}
b
.append(')')
;
AssertionError ae = new AssertionError(b.toString());
if (!errors.isEmpty()) {
if (errors.size() == 1) {
ae.initCause(errors.get(0));
} else {
CompositeException ce = new CompositeException(errors);
ae.initCause(ce);
}
}
return ae;
}
Awaits until this TestObserver/TestSubscriber receives an onError or onComplete events.
Throws: - InterruptedException – if the current thread is interrupted while waiting
See Also: Returns: this
/**
* Awaits until this TestObserver/TestSubscriber receives an onError or onComplete events.
* @return this
* @throws InterruptedException if the current thread is interrupted while waiting
* @see #awaitTerminalEvent()
*/
@SuppressWarnings("unchecked")
public final U await() throws InterruptedException {
if (done.getCount() == 0) {
return (U)this;
}
done.await();
return (U)this;
}
Awaits the specified amount of time or until this TestObserver/TestSubscriber
receives an onError or onComplete events, whichever happens first.
Params: - time – the waiting time
- unit – the time unit of the waiting time
Throws: - InterruptedException – if the current thread is interrupted while waiting
See Also: Returns: true if the TestObserver/TestSubscriber terminated, false if timeout happened
/**
* Awaits the specified amount of time or until this TestObserver/TestSubscriber
* receives an onError or onComplete events, whichever happens first.
* @param time the waiting time
* @param unit the time unit of the waiting time
* @return true if the TestObserver/TestSubscriber terminated, false if timeout happened
* @throws InterruptedException if the current thread is interrupted while waiting
* @see #awaitTerminalEvent(long, TimeUnit)
*/
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
boolean d = done.getCount() == 0 || (done.await(time, unit));
timeout = !d;
return d;
}
// assertion methods
Assert that this TestObserver/TestSubscriber received exactly one onComplete event.
Returns: this
/**
* Assert that this TestObserver/TestSubscriber received exactly one onComplete event.
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertComplete() {
long c = completions;
if (c == 0) {
throw fail("Not completed");
} else
if (c > 1) {
throw fail("Multiple completions: " + c);
}
return (U)this;
}
Assert that this TestObserver/TestSubscriber has not received any onComplete event.
Returns: this
/**
* Assert that this TestObserver/TestSubscriber has not received any onComplete event.
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertNotComplete() {
long c = completions;
if (c == 1) {
throw fail("Completed!");
} else
if (c > 1) {
throw fail("Multiple completions: " + c);
}
return (U)this;
}
Assert that this TestObserver/TestSubscriber has not received any onError event.
Returns: this
/**
* Assert that this TestObserver/TestSubscriber has not received any onError event.
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertNoErrors() {
int s = errors.size();
if (s != 0) {
throw fail("Error(s) present: " + errors);
}
return (U)this;
}
Assert that this TestObserver/TestSubscriber received exactly the specified onError event value.
The comparison is performed via Objects.equals(); since most exceptions don't implement equals(), this assertion may fail. Use the assertError(Class)
overload to test against the class of an error instead of an instance of an error or assertError(Predicate)
to test with different condition.
Params: - error – the error to check
See Also: Returns: this
/**
* Assert that this TestObserver/TestSubscriber received exactly the specified onError event value.
*
* <p>The comparison is performed via Objects.equals(); since most exceptions don't
* implement equals(), this assertion may fail. Use the {@link #assertError(Class)}
* overload to test against the class of an error instead of an instance of an error
* or {@link #assertError(Predicate)} to test with different condition.
* @param error the error to check
* @return this
* @see #assertError(Class)
* @see #assertError(Predicate)
*/
public final U assertError(Throwable error) {
return assertError(Functions.equalsWith(error));
}
Asserts that this TestObserver/TestSubscriber received exactly one onError event which is an
instance of the specified errorClass class.
Params: - errorClass – the error class to expect
Returns: this
/**
* Asserts that this TestObserver/TestSubscriber received exactly one onError event which is an
* instance of the specified errorClass class.
* @param errorClass the error class to expect
* @return this
*/
@SuppressWarnings({ "unchecked", "rawtypes", "cast" })
public final U assertError(Class<? extends Throwable> errorClass) {
return (U)assertError((Predicate)Functions.isInstanceOf(errorClass));
}
Asserts that this TestObserver/TestSubscriber received exactly one onError event for which
the provided predicate returns true.
Params: - errorPredicate –
the predicate that receives the error Throwable
and should return true for expected errors.
Returns: this
/**
* Asserts that this TestObserver/TestSubscriber received exactly one onError event for which
* the provided predicate returns true.
* @param errorPredicate
* the predicate that receives the error Throwable
* and should return true for expected errors.
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertError(Predicate<Throwable> errorPredicate) {
int s = errors.size();
if (s == 0) {
throw fail("No errors");
}
boolean found = false;
for (Throwable e : errors) {
try {
if (errorPredicate.test(e)) {
found = true;
break;
}
} catch (Exception ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
if (found) {
if (s != 1) {
throw fail("Error present but other errors as well");
}
} else {
throw fail("Error not present");
}
return (U)this;
}
Assert that this TestObserver/TestSubscriber received exactly one onNext value which is equal to
the given value with respect to Objects.equals.
Params: - value – the value to expect
Returns: this
/**
* Assert that this TestObserver/TestSubscriber received exactly one onNext value which is equal to
* the given value with respect to Objects.equals.
* @param value the value to expect
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertValue(T value) {
int s = values.size();
if (s != 1) {
throw fail("expected: " + valueAndClass(value) + " but was: " + values);
}
T v = values.get(0);
if (!ObjectHelper.equals(value, v)) {
throw fail("expected: " + valueAndClass(value) + " but was: " + valueAndClass(v));
}
return (U)this;
}
Assert that this TestObserver/TestSubscriber did not receive an onNext value which is equal to
the given value with respect to null-safe Object.equals.
History: 2.0.5 - experimental
Params: - value – the value to expect not being received
Returns: this Since: 2.1
/**
* Assert that this TestObserver/TestSubscriber did not receive an onNext value which is equal to
* the given value with respect to null-safe Object.equals.
*
* <p>History: 2.0.5 - experimental
* @param value the value to expect not being received
* @return this
* @since 2.1
*/
@SuppressWarnings("unchecked")
public final U assertNever(T value) {
int s = values.size();
for (int i = 0; i < s; i++) {
T v = this.values.get(i);
if (ObjectHelper.equals(v, value)) {
throw fail("Value at position " + i + " is equal to " + valueAndClass(value) + "; Expected them to be different");
}
}
return (U) this;
}
Asserts that this TestObserver/TestSubscriber received exactly one onNext value for which
the provided predicate returns true.
Params: - valuePredicate –
the predicate that receives the onNext value
and should return true for the expected value.
Returns: this
/**
* Asserts that this TestObserver/TestSubscriber received exactly one onNext value for which
* the provided predicate returns true.
* @param valuePredicate
* the predicate that receives the onNext value
* and should return true for the expected value.
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertValue(Predicate<T> valuePredicate) {
assertValueAt(0, valuePredicate);
if (values.size() > 1) {
throw fail("Value present but other values as well");
}
return (U)this;
}
Asserts that this TestObserver/TestSubscriber did not receive any onNext value for which
the provided predicate returns true.
History: 2.0.5 - experimental
Params: - valuePredicate – the predicate that receives the onNext value
and should return true for the expected value.
Returns: this Since: 2.1
/**
* Asserts that this TestObserver/TestSubscriber did not receive any onNext value for which
* the provided predicate returns true.
*
* <p>History: 2.0.5 - experimental
* @param valuePredicate the predicate that receives the onNext value
* and should return true for the expected value.
* @return this
* @since 2.1
*/
@SuppressWarnings("unchecked")
public final U assertNever(Predicate<? super T> valuePredicate) {
int s = values.size();
for (int i = 0; i < s; i++) {
T v = this.values.get(i);
try {
if (valuePredicate.test(v)) {
throw fail("Value at position " + i + " matches predicate " + valuePredicate.toString() + ", which was not expected.");
}
} catch (Exception ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
return (U)this;
}
Asserts that this TestObserver/TestSubscriber received an onNext value at the given index
which is equal to the given value with respect to null-safe Object.equals.
History: 2.1.3 - experimental
Params: - index – the position to assert on
- value – the value to expect
Returns: this Since: 2.2
/**
* Asserts that this TestObserver/TestSubscriber received an onNext value at the given index
* which is equal to the given value with respect to null-safe Object.equals.
* <p>History: 2.1.3 - experimental
* @param index the position to assert on
* @param value the value to expect
* @return this
* @since 2.2
*/
@SuppressWarnings("unchecked")
public final U assertValueAt(int index, T value) {
int s = values.size();
if (s == 0) {
throw fail("No values");
}
if (index >= s) {
throw fail("Invalid index: " + index);
}
T v = values.get(index);
if (!ObjectHelper.equals(value, v)) {
throw fail("expected: " + valueAndClass(value) + " but was: " + valueAndClass(v));
}
return (U)this;
}
Asserts that this TestObserver/TestSubscriber received an onNext value at the given index
for the provided predicate returns true.
Params: - index – the position to assert on
- valuePredicate –
the predicate that receives the onNext value
and should return true for the expected value.
Returns: this
/**
* Asserts that this TestObserver/TestSubscriber received an onNext value at the given index
* for the provided predicate returns true.
* @param index the position to assert on
* @param valuePredicate
* the predicate that receives the onNext value
* and should return true for the expected value.
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertValueAt(int index, Predicate<T> valuePredicate) {
int s = values.size();
if (s == 0) {
throw fail("No values");
}
if (index >= values.size()) {
throw fail("Invalid index: " + index);
}
boolean found = false;
try {
if (valuePredicate.test(values.get(index))) {
found = true;
}
} catch (Exception ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
if (!found) {
throw fail("Value not present");
}
return (U)this;
}
Appends the class name to a non-null value.
Params: - o – the object
Returns: the string representation
/**
* Appends the class name to a non-null value.
* @param o the object
* @return the string representation
*/
public static String valueAndClass(Object o) {
if (o != null) {
return o + " (class: " + o.getClass().getSimpleName() + ")";
}
return "null";
}
Assert that this TestObserver/TestSubscriber received the specified number onNext events.
Params: - count – the expected number of onNext events
Returns: this
/**
* Assert that this TestObserver/TestSubscriber received the specified number onNext events.
* @param count the expected number of onNext events
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertValueCount(int count) {
int s = values.size();
if (s != count) {
throw fail("Value counts differ; expected: " + count + " but was: " + s);
}
return (U)this;
}
Assert that this TestObserver/TestSubscriber has not received any onNext events.
Returns: this
/**
* Assert that this TestObserver/TestSubscriber has not received any onNext events.
* @return this
*/
public final U assertNoValues() {
return assertValueCount(0);
}
Assert that the TestObserver/TestSubscriber received only the specified values in the specified order.
Params: - values – the values expected
See Also: Returns: this
/**
* Assert that the TestObserver/TestSubscriber received only the specified values in the specified order.
* @param values the values expected
* @return this
* @see #assertValueSet(Collection)
*/
@SuppressWarnings("unchecked")
public final U assertValues(T... values) {
int s = this.values.size();
if (s != values.length) {
throw fail("Value count differs; expected: " + values.length + " " + Arrays.toString(values)
+ " but was: " + s + " " + this.values);
}
for (int i = 0; i < s; i++) {
T v = this.values.get(i);
T u = values[i];
if (!ObjectHelper.equals(u, v)) {
throw fail("Values at position " + i + " differ; expected: " + valueAndClass(u) + " but was: " + valueAndClass(v));
}
}
return (U)this;
}
Assert that the TestObserver/TestSubscriber received only the specified values in the specified order without terminating.
History: 2.1.4 - experimental
Params: - values – the values expected
Returns: this Since: 2.2
/**
* Assert that the TestObserver/TestSubscriber received only the specified values in the specified order without terminating.
* <p>History: 2.1.4 - experimental
* @param values the values expected
* @return this
* @since 2.2
*/
@SuppressWarnings("unchecked")
public final U assertValuesOnly(T... values) {
return assertSubscribed()
.assertValues(values)
.assertNoErrors()
.assertNotComplete();
}
Assert that the TestObserver/TestSubscriber received only items that are in the specified
collection as well, irrespective of the order they were received.
This helps asserting when the order of the values is not guaranteed, i.e., when merging
asynchronous streams.
To ensure that only the expected items have been received, no more and no less, in any order, apply assertValueCount(int)
with expected.size()
.
Params: - expected – the collection of values expected in any order
Returns: this
/**
* Assert that the TestObserver/TestSubscriber received only items that are in the specified
* collection as well, irrespective of the order they were received.
* <p>
* This helps asserting when the order of the values is not guaranteed, i.e., when merging
* asynchronous streams.
* <p>
* To ensure that only the expected items have been received, no more and no less, in any order,
* apply {@link #assertValueCount(int)} with {@code expected.size()}.
*
* @param expected the collection of values expected in any order
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertValueSet(Collection<? extends T> expected) {
if (expected.isEmpty()) {
assertNoValues();
return (U)this;
}
for (T v : this.values) {
if (!expected.contains(v)) {
throw fail("Value not in the expected collection: " + valueAndClass(v));
}
}
return (U)this;
}
Assert that the TestObserver/TestSubscriber received only the specified values in any order without terminating.
History: 2.1.14 - experimental
Params: - expected – the collection of values expected in any order
Returns: this Since: 2.2
/**
* Assert that the TestObserver/TestSubscriber received only the specified values in any order without terminating.
* <p>History: 2.1.14 - experimental
* @param expected the collection of values expected in any order
* @return this
* @since 2.2
*/
public final U assertValueSetOnly(Collection<? extends T> expected) {
return assertSubscribed()
.assertValueSet(expected)
.assertNoErrors()
.assertNotComplete();
}
Assert that the TestObserver/TestSubscriber received only the specified sequence of values in the same order.
Params: - sequence – the sequence of expected values in order
Returns: this
/**
* Assert that the TestObserver/TestSubscriber received only the specified sequence of values in the same order.
* @param sequence the sequence of expected values in order
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertValueSequence(Iterable<? extends T> sequence) {
int i = 0;
Iterator<T> actualIterator = values.iterator();
Iterator<? extends T> expectedIterator = sequence.iterator();
boolean actualNext;
boolean expectedNext;
for (;;) {
expectedNext = expectedIterator.hasNext();
actualNext = actualIterator.hasNext();
if (!actualNext || !expectedNext) {
break;
}
T u = expectedIterator.next();
T v = actualIterator.next();
if (!ObjectHelper.equals(u, v)) {
throw fail("Values at position " + i + " differ; expected: " + valueAndClass(u) + " but was: " + valueAndClass(v));
}
i++;
}
if (actualNext) {
throw fail("More values received than expected (" + i + ")");
}
if (expectedNext) {
throw fail("Fewer values received than expected (" + i + ")");
}
return (U)this;
}
Assert that the TestObserver/TestSubscriber received only the specified values in the specified order without terminating.
History: 2.1.14 - experimental
Params: - sequence – the sequence of expected values in order
Returns: this Since: 2.2
/**
* Assert that the TestObserver/TestSubscriber received only the specified values in the specified order without terminating.
* <p>History: 2.1.14 - experimental
* @param sequence the sequence of expected values in order
* @return this
* @since 2.2
*/
public final U assertValueSequenceOnly(Iterable<? extends T> sequence) {
return assertSubscribed()
.assertValueSequence(sequence)
.assertNoErrors()
.assertNotComplete();
}
Assert that the TestObserver/TestSubscriber terminated (i.e., the terminal latch reached zero).
Returns: this
/**
* Assert that the TestObserver/TestSubscriber terminated (i.e., the terminal latch reached zero).
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertTerminated() {
if (done.getCount() != 0) {
throw fail("Subscriber still running!");
}
long c = completions;
if (c > 1) {
throw fail("Terminated with multiple completions: " + c);
}
int s = errors.size();
if (s > 1) {
throw fail("Terminated with multiple errors: " + s);
}
if (c != 0 && s != 0) {
throw fail("Terminated with multiple completions and errors: " + c);
}
return (U)this;
}
Assert that the TestObserver/TestSubscriber has not terminated (i.e., the terminal latch is still non-zero).
Returns: this
/**
* Assert that the TestObserver/TestSubscriber has not terminated (i.e., the terminal latch is still non-zero).
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertNotTerminated() {
if (done.getCount() == 0) {
throw fail("Subscriber terminated!");
}
return (U)this;
}
Waits until the any terminal event has been received by this TestObserver/TestSubscriber
or returns false if the wait has been interrupted.
Returns: true if the TestObserver/TestSubscriber terminated, false if the wait has been interrupted
/**
* Waits until the any terminal event has been received by this TestObserver/TestSubscriber
* or returns false if the wait has been interrupted.
* @return true if the TestObserver/TestSubscriber terminated, false if the wait has been interrupted
*/
public final boolean awaitTerminalEvent() {
try {
await();
return true;
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return false;
}
}
Awaits the specified amount of time or until this TestObserver/TestSubscriber
receives an onError or onComplete events, whichever happens first.
Params: - duration – the waiting time
- unit – the time unit of the waiting time
Returns: true if the TestObserver/TestSubscriber terminated, false if timeout or interrupt happened
/**
* Awaits the specified amount of time or until this TestObserver/TestSubscriber
* receives an onError or onComplete events, whichever happens first.
* @param duration the waiting time
* @param unit the time unit of the waiting time
* @return true if the TestObserver/TestSubscriber terminated, false if timeout or interrupt happened
*/
public final boolean awaitTerminalEvent(long duration, TimeUnit unit) {
try {
return await(duration, unit);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return false;
}
}
Assert that there is a single error and it has the given message.
Params: - message – the message expected
Returns: this
/**
* Assert that there is a single error and it has the given message.
* @param message the message expected
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertErrorMessage(String message) {
int s = errors.size();
if (s == 0) {
throw fail("No errors");
} else
if (s == 1) {
Throwable e = errors.get(0);
String errorMessage = e.getMessage();
if (!ObjectHelper.equals(message, errorMessage)) {
throw fail("Error message differs; exptected: " + message + " but was: " + errorMessage);
}
} else {
throw fail("Multiple errors");
}
return (U)this;
}
Returns a list of 3 other lists: the first inner list contains the plain
values received; the second list contains the potential errors
and the final list contains the potential completions as Notifications.
Returns: a list of (values, errors, completion-notifications)
/**
* Returns a list of 3 other lists: the first inner list contains the plain
* values received; the second list contains the potential errors
* and the final list contains the potential completions as Notifications.
*
* @return a list of (values, errors, completion-notifications)
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public final List<List<Object>> getEvents() {
List<List<Object>> result = new ArrayList<List<Object>>();
result.add((List)values());
result.add((List)errors());
List<Object> completeList = new ArrayList<Object>();
for (long i = 0; i < completions; i++) {
completeList.add(Notification.createOnComplete());
}
result.add(completeList);
return result;
}
Assert that the onSubscribe method was called exactly once.
Returns: this
/**
* Assert that the onSubscribe method was called exactly once.
* @return this
*/
public abstract U assertSubscribed();
Assert that the onSubscribe method hasn't been called at all.
Returns: this
/**
* Assert that the onSubscribe method hasn't been called at all.
* @return this
*/
public abstract U assertNotSubscribed();
Assert that the upstream signalled the specified values in order and
completed normally.
Params: - values – the expected values, asserted in order
See Also: Returns: this
/**
* Assert that the upstream signalled the specified values in order and
* completed normally.
* @param values the expected values, asserted in order
* @return this
* @see #assertFailure(Class, Object...)
* @see #assertFailure(Predicate, Object...)
* @see #assertFailureAndMessage(Class, String, Object...)
*/
public final U assertResult(T... values) {
return assertSubscribed()
.assertValues(values)
.assertNoErrors()
.assertComplete();
}
Assert that the upstream signalled the specified values in order
and then failed with a specific class or subclass of Throwable.
Params: - error – the expected exception (parent) class
- values – the expected values, asserted in order
Returns: this
/**
* Assert that the upstream signalled the specified values in order
* and then failed with a specific class or subclass of Throwable.
* @param error the expected exception (parent) class
* @param values the expected values, asserted in order
* @return this
*/
public final U assertFailure(Class<? extends Throwable> error, T... values) {
return assertSubscribed()
.assertValues(values)
.assertError(error)
.assertNotComplete();
}
Assert that the upstream signalled the specified values in order and then failed
with a Throwable for which the provided predicate returns true.
Params: - errorPredicate –
the predicate that receives the error Throwable
and should return true for expected errors.
- values – the expected values, asserted in order
Returns: this
/**
* Assert that the upstream signalled the specified values in order and then failed
* with a Throwable for which the provided predicate returns true.
* @param errorPredicate
* the predicate that receives the error Throwable
* and should return true for expected errors.
* @param values the expected values, asserted in order
* @return this
*/
public final U assertFailure(Predicate<Throwable> errorPredicate, T... values) {
return assertSubscribed()
.assertValues(values)
.assertError(errorPredicate)
.assertNotComplete();
}
Assert that the upstream signalled the specified values in order,
then failed with a specific class or subclass of Throwable
and with the given exact error message.
Params: - error – the expected exception (parent) class
- message – the expected failure message
- values – the expected values, asserted in order
Returns: this
/**
* Assert that the upstream signalled the specified values in order,
* then failed with a specific class or subclass of Throwable
* and with the given exact error message.
* @param error the expected exception (parent) class
* @param message the expected failure message
* @param values the expected values, asserted in order
* @return this
*/
public final U assertFailureAndMessage(Class<? extends Throwable> error,
String message, T... values) {
return assertSubscribed()
.assertValues(values)
.assertError(error)
.assertErrorMessage(message)
.assertNotComplete();
}
Awaits until the internal latch is counted down.
If the wait times out or gets interrupted, the TestObserver/TestSubscriber is cancelled.
Params: - time – the waiting time
- unit – the time unit of the waiting time
Throws: - RuntimeException – wrapping an InterruptedException if the wait is interrupted
Returns: this
/**
* Awaits until the internal latch is counted down.
* <p>If the wait times out or gets interrupted, the TestObserver/TestSubscriber is cancelled.
* @param time the waiting time
* @param unit the time unit of the waiting time
* @return this
* @throws RuntimeException wrapping an InterruptedException if the wait is interrupted
*/
@SuppressWarnings("unchecked")
public final U awaitDone(long time, TimeUnit unit) {
try {
if (!done.await(time, unit)) {
timeout = true;
dispose();
}
} catch (InterruptedException ex) {
dispose();
throw ExceptionHelper.wrapOrThrow(ex);
}
return (U)this;
}
Assert that the TestObserver/TestSubscriber has received a Disposable but no other events.
Returns: this
/**
* Assert that the TestObserver/TestSubscriber has received a Disposable but no other events.
* @return this
*/
public final U assertEmpty() {
return assertSubscribed()
.assertNoValues()
.assertNoErrors()
.assertNotComplete();
}
Set the tag displayed along with an assertion failure's
other state information.
History: 2.0.7 - experimental
Params: - tag – the string to display (null won't print any tag)
Returns: this Since: 2.1
/**
* Set the tag displayed along with an assertion failure's
* other state information.
* <p>History: 2.0.7 - experimental
* @param tag the string to display (null won't print any tag)
* @return this
* @since 2.1
*/
@SuppressWarnings("unchecked")
public final U withTag(CharSequence tag) {
this.tag = tag;
return (U)this;
}
Enumeration of default wait strategies when waiting for a specific number of items in BaseTestConsumer.awaitCount(int, Runnable)
. History: 2.0.7 - experimental
Since: 2.1
/**
* Enumeration of default wait strategies when waiting for a specific number of
* items in {@link BaseTestConsumer#awaitCount(int, Runnable)}.
* <p>History: 2.0.7 - experimental
* @since 2.1
*/
public enum TestWaitStrategy implements Runnable {
The wait loop will spin as fast as possible. /** The wait loop will spin as fast as possible. */
SPIN {
@Override
public void run() {
// nothing to do
}
},
The current thread will be yielded. /** The current thread will be yielded. */
YIELD {
@Override
public void run() {
Thread.yield();
}
},
The current thread sleeps for 1 millisecond. /** The current thread sleeps for 1 millisecond. */
SLEEP_1MS {
@Override
public void run() {
sleep(1);
}
},
The current thread sleeps for 10 milliseconds. /** The current thread sleeps for 10 milliseconds. */
SLEEP_10MS {
@Override
public void run() {
sleep(10);
}
},
The current thread sleeps for 100 milliseconds. /** The current thread sleeps for 100 milliseconds. */
SLEEP_100MS {
@Override
public void run() {
sleep(100);
}
},
The current thread sleeps for 1000 milliseconds. /** The current thread sleeps for 1000 milliseconds. */
SLEEP_1000MS {
@Override
public void run() {
sleep(1000);
}
}
;
@Override
public abstract void run();
static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
Await until the TestObserver/TestSubscriber receives the given
number of items or terminates by sleeping 10 milliseconds at a time
up to 5000 milliseconds of timeout.
History: 2.0.7 - experimental
Params: - atLeast – the number of items expected at least
See Also: Returns: this Since: 2.1
/**
* Await until the TestObserver/TestSubscriber receives the given
* number of items or terminates by sleeping 10 milliseconds at a time
* up to 5000 milliseconds of timeout.
* <p>History: 2.0.7 - experimental
* @param atLeast the number of items expected at least
* @return this
* @see #awaitCount(int, Runnable, long)
* @since 2.1
*/
public final U awaitCount(int atLeast) {
return awaitCount(atLeast, TestWaitStrategy.SLEEP_10MS, 5000);
}
Await until the TestObserver/TestSubscriber receives the given
number of items or terminates by waiting according to the wait
strategy and up to 5000 milliseconds of timeout.
History: 2.0.7 - experimental
Params: - atLeast – the number of items expected at least
- waitStrategy – a Runnable called when the current received count hasn't reached the expected value and there was no terminal event either, see
TestWaitStrategy
for examples
See Also: Returns: this Since: 2.1
/**
* Await until the TestObserver/TestSubscriber receives the given
* number of items or terminates by waiting according to the wait
* strategy and up to 5000 milliseconds of timeout.
* <p>History: 2.0.7 - experimental
* @param atLeast the number of items expected at least
* @param waitStrategy a Runnable called when the current received count
* hasn't reached the expected value and there was
* no terminal event either, see {@link TestWaitStrategy}
* for examples
* @return this
* @see #awaitCount(int, Runnable, long)
* @since 2.1
*/
public final U awaitCount(int atLeast, Runnable waitStrategy) {
return awaitCount(atLeast, waitStrategy, 5000);
}
Await until the TestObserver/TestSubscriber receives the given
number of items or terminates.
History: 2.0.7 - experimental
Params: - atLeast – the number of items expected at least
- waitStrategy – a Runnable called when the current received count hasn't reached the expected value and there was no terminal event either, see
TestWaitStrategy
for examples - timeoutMillis – if positive, the await ends if the specified amount of
time has passed no matter how many items were received
Returns: this Since: 2.1
/**
* Await until the TestObserver/TestSubscriber receives the given
* number of items or terminates.
* <p>History: 2.0.7 - experimental
* @param atLeast the number of items expected at least
* @param waitStrategy a Runnable called when the current received count
* hasn't reached the expected value and there was
* no terminal event either, see {@link TestWaitStrategy}
* for examples
* @param timeoutMillis if positive, the await ends if the specified amount of
* time has passed no matter how many items were received
* @return this
* @since 2.1
*/
@SuppressWarnings("unchecked")
public final U awaitCount(int atLeast, Runnable waitStrategy, long timeoutMillis) {
long start = System.currentTimeMillis();
for (;;) {
if (timeoutMillis > 0L && System.currentTimeMillis() - start >= timeoutMillis) {
timeout = true;
break;
}
if (done.getCount() == 0L) {
break;
}
if (values.size() >= atLeast) {
break;
}
waitStrategy.run();
}
return (U)this;
}
Returns true if an await timed out.
See Also: Returns: true if one of the timeout-based await methods has timed out.
History: 2.0.7 - experimental
Since: 2.1
/**
* Returns true if an await timed out.
* @return true if one of the timeout-based await methods has timed out.
* <p>History: 2.0.7 - experimental
* @see #clearTimeout()
* @see #assertTimeout()
* @see #assertNoTimeout()
* @since 2.1
*/
public final boolean isTimeout() {
return timeout;
}
Clears the timeout flag set by the await methods when they timed out.
History: 2.0.7 - experimental
See Also: Returns: this Since: 2.1
/**
* Clears the timeout flag set by the await methods when they timed out.
* <p>History: 2.0.7 - experimental
* @return this
* @since 2.1
* @see #isTimeout()
*/
@SuppressWarnings("unchecked")
public final U clearTimeout() {
timeout = false;
return (U)this;
}
Asserts that some awaitX method has timed out.
History: 2.0.7 - experimental
Returns: this Since: 2.1
/**
* Asserts that some awaitX method has timed out.
* <p>History: 2.0.7 - experimental
* @return this
* @since 2.1
*/
@SuppressWarnings("unchecked")
public final U assertTimeout() {
if (!timeout) {
throw fail("No timeout?!");
}
return (U)this;
}
Asserts that some awaitX method has not timed out.
History: 2.0.7 - experimental
Returns: this Since: 2.1
/**
* Asserts that some awaitX method has not timed out.
* <p>History: 2.0.7 - experimental
* @return this
* @since 2.1
*/
@SuppressWarnings("unchecked")
public final U assertNoTimeout() {
if (timeout) {
throw fail("Timeout?!");
}
return (U)this;
}
}