Copyright (c) 2016-present, RxJava Contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */
package io.reactivex.internal.operators.flowable; import java.util.*; import io.reactivex.Flowable; import io.reactivex.internal.util.*; import io.reactivex.subscribers.DefaultSubscriber;
Returns an Iterable that always returns the item most recently emitted by an Observable, or a seed value if no item has yet been emitted.

Type parameters:
  • <T> – the value type
/** * Returns an Iterable that always returns the item most recently emitted by an Observable, or a * seed value if no item has yet been emitted. * <p> * <img width="640" height="490" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.mostRecent.png" alt=""> * * @param <T> the value type */
public final class BlockingFlowableMostRecent<T> implements Iterable<T> { final Flowable<T> source; final T initialValue; public BlockingFlowableMostRecent(Flowable<T> source, T initialValue) { this.source = source; this.initialValue = initialValue; } @Override public Iterator<T> iterator() { MostRecentSubscriber<T> mostRecentSubscriber = new MostRecentSubscriber<T>(initialValue); /** * Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain * since it is for BlockingObservable. */ source.subscribe(mostRecentSubscriber); return mostRecentSubscriber.getIterable(); } static final class MostRecentSubscriber<T> extends DefaultSubscriber<T> { volatile Object value; MostRecentSubscriber(T value) { this.value = NotificationLite.next(value); } @Override public void onComplete() { value = NotificationLite.complete(); } @Override public void onError(Throwable e) { value = NotificationLite.error(e); } @Override public void onNext(T args) { value = NotificationLite.next(args); }
The Iterator return is not thread safe. In other words don't call Iterator.hasNext() in one thread expect Iterator.next() called from a different thread to work.
Returns:the Iterator
/** * The {@link Iterator} return is not thread safe. In other words don't call {@link Iterator#hasNext()} in one * thread expect {@link Iterator#next()} called from a different thread to work. * @return the Iterator */
public Iterator getIterable() { return new Iterator(); } final class Iterator implements java.util.Iterator<T> {
buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next().
/** * buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next(). */
private Object buf; @Override public boolean hasNext() { buf = value; return !NotificationLite.isComplete(buf); } @Override public T next() { try { // if hasNext wasn't called before calling next. if (buf == null) { buf = value; } if (NotificationLite.isComplete(buf)) { throw new NoSuchElementException(); } if (NotificationLite.isError(buf)) { throw ExceptionHelper.wrapOrThrow(NotificationLite.getError(buf)); } return NotificationLite.getValue(buf); } finally { buf = null; } } @Override public void remove() { throw new UnsupportedOperationException("Read only iterator"); } } } }