/*
 * Copyright 2014 Red Hat, Inc.
 *
 * Red Hat licenses this file to you under the Apache License, version 2.0
 * (the "License"); you may not use this file except in compliance with the
 * License.  You may obtain a copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package io.vertx.reactivex.core.streams;

import java.util.Map;
import io.reactivex.Observable;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;

Represents a stream of items that can be read from.

Any class that implements this interface can be used by a Pump to pump data from it to a WriteStream.

Streaming mode

The stream is either in flowing or fetch mode.
    Initially the stream is in flowing mode.
  • When the stream is in flowing mode, elements are delivered to the handler.
  • When the stream is in fetch mode, only the number of requested elements will be delivered to the handler.
The mode can be changed with the pause, resume and fetch methods:
  • Calling resume sets the flowing mode
  • Calling pause sets the fetch mode and resets the demand to 0
  • Calling fetch requests a specific amount of elements and adds it to the actual demand

NOTE: This class has been automatically generated from the original non RX-ified interface using Vert.x codegen.
/** * Represents a stream of items that can be read from. * <p> * Any class that implements this interface can be used by a {@link io.vertx.reactivex.core.streams.Pump} to pump data from it * to a {@link io.vertx.reactivex.core.streams.WriteStream}. * <p> * <h3>Streaming mode</h3> * The stream is either in <i>flowing</i> or <i>fetch</i> mode. * <ul> * <i>Initially the stream is in <i>flowing</i> mode.</i> * <li>When the stream is in <i>flowing</i> mode, elements are delivered to the <code>handler</code>.</li> * <li>When the stream is in <i>fetch</i> mode, only the number of requested elements will be delivered to the <code>handler</code>.</li> * </ul> * The mode can be changed with the {@link io.vertx.reactivex.core.streams.ReadStream#pause}, {@link io.vertx.reactivex.core.streams.ReadStream#resume} and {@link io.vertx.reactivex.core.streams.ReadStream#fetch} methods: * <ul> * <li>Calling {@link io.vertx.reactivex.core.streams.ReadStream#resume} sets the <i>flowing</i> mode</li> * <li>Calling {@link io.vertx.reactivex.core.streams.ReadStream#pause} sets the <i>fetch</i> mode and resets the demand to <code>0</code></li> * <li>Calling {@link io.vertx.reactivex.core.streams.ReadStream#fetch} requests a specific amount of elements and adds it to the actual demand</li> * </ul> * * <p/> * NOTE: This class has been automatically generated from the {@link io.vertx.core.streams.ReadStream original} non RX-ified interface using Vert.x codegen. */
@io.vertx.lang.rx.RxGen(io.vertx.core.streams.ReadStream.class) public interface ReadStream<T> extends io.vertx.reactivex.core.streams.StreamBase { io.vertx.core.streams.ReadStream getDelegate();
Set an exception handler on the read stream.
Params:
  • handler – the exception handler
Returns:a reference to this, so the API can be used fluently
/** * Set an exception handler on the read stream. * @param handler the exception handler * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.ReadStream<T> exceptionHandler(Handler<Throwable> handler);
Set a data handler. As data is read, the handler will be called with the data.
Params:
  • handler –
Returns:a reference to this, so the API can be used fluently
/** * Set a data handler. As data is read, the handler will be called with the data. * @param handler * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.ReadStream<T> handler(Handler<T> handler);
Pause the ReadStream, it sets the buffer in fetch mode and clears the actual demand.

While it's paused, no data will be sent to the data handler.

Returns:a reference to this, so the API can be used fluently
/** * Pause the <code>ReadStream</code>, it sets the buffer in <code>fetch</code> mode and clears the actual demand. * <p> * While it's paused, no data will be sent to the data <code>handler</code>. * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.ReadStream<T> pause();
Resume reading, and sets the buffer in flowing mode.

If the ReadStream has been paused, reading will recommence on it.
Returns:a reference to this, so the API can be used fluently
/** * Resume reading, and sets the buffer in <code>flowing</code> mode. * <p/> * If the <code>ReadStream</code> has been paused, reading will recommence on it. * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.ReadStream<T> resume();
Fetch the specified amount of elements. If the ReadStream has been paused, reading will recommence with the specified amount of items, otherwise the specified amount will be added to the current stream demand.
Params:
  • amount –
Returns:a reference to this, so the API can be used fluently
/** * Fetch the specified <code>amount</code> of elements. If the <code>ReadStream</code> has been paused, reading will * recommence with the specified <code>amount</code> of items, otherwise the specified <code>amount</code> will * be added to the current stream demand. * @param amount * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.ReadStream<T> fetch(long amount);
Set an end handler. Once the stream has ended, and there is no more data to be read, this handler will be called.
Params:
  • endHandler –
Returns:a reference to this, so the API can be used fluently
/** * Set an end handler. Once the stream has ended, and there is no more data to be read, this handler will be called. * @param endHandler * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.ReadStream<T> endHandler(Handler<Void> endHandler);
Pause this stream and return a Pipe to transfer the elements of this stream to a destination WriteStream.

The stream will be resumed when the pipe will be wired to a WriteStream.
Returns:a pipe
/** * Pause this stream and return a {@link io.vertx.reactivex.core.streams.Pipe} to transfer the elements of this stream to a destination {@link io.vertx.reactivex.core.streams.WriteStream}. * <p/> * The stream will be resumed when the pipe will be wired to a <code>WriteStream</code>. * @return a pipe */
public io.vertx.reactivex.core.streams.Pipe<T> pipe();
Like pipeTo but with no completion handler.
Params:
  • dst –
/** * Like {@link io.vertx.reactivex.core.streams.ReadStream#pipeTo} but with no completion handler. * @param dst */
public void pipeTo(io.vertx.reactivex.core.streams.WriteStream<T> dst);
Pipe this ReadStream to the WriteStream.

Elements emitted by this stream will be written to the write stream until this stream ends or fails.

Once this stream has ended or failed, the write stream will be ended and the handler will be called with the result.

Params:
  • dst – the destination write stream
  • handler –
/** * Pipe this <code>ReadStream</code> to the <code>WriteStream</code>. * <p> * Elements emitted by this stream will be written to the write stream until this stream ends or fails. * <p> * Once this stream has ended or failed, the write stream will be ended and the <code>handler</code> will be * called with the result. * @param dst the destination write stream * @param handler */
public void pipeTo(io.vertx.reactivex.core.streams.WriteStream<T> dst, Handler<AsyncResult<Void>> handler); io.reactivex.Observable<T> toObservable(); io.reactivex.Flowable<T> toFlowable(); public static <T>ReadStream<T> newInstance(io.vertx.core.streams.ReadStream arg) { return arg != null ? new ReadStreamImpl<T>(arg) : null; } public static <T>ReadStream<T> newInstance(io.vertx.core.streams.ReadStream arg, io.vertx.lang.rx.TypeArg<T> __typeArg_T) { return arg != null ? new ReadStreamImpl<T>(arg, __typeArg_T) : null; } } class ReadStreamImpl<T> implements ReadStream<T> { private final io.vertx.core.streams.ReadStream<T> delegate; public final io.vertx.lang.rx.TypeArg<T> __typeArg_0; public ReadStreamImpl(io.vertx.core.streams.ReadStream delegate) { this.delegate = delegate; this.__typeArg_0 = io.vertx.lang.rx.TypeArg.unknown(); } public ReadStreamImpl(io.vertx.core.streams.ReadStream delegate, io.vertx.lang.rx.TypeArg<T> typeArg_0) { this.delegate = delegate; this.__typeArg_0 = typeArg_0; } public io.vertx.core.streams.ReadStream getDelegate() { return delegate; } private io.reactivex.Observable<T> observable; private io.reactivex.Flowable<T> flowable; public synchronized io.reactivex.Observable<T> toObservable() { if (observable == null) { java.util.function.Function<T, T> conv = (java.util.function.Function<T, T>) __typeArg_0.wrap; observable = io.vertx.reactivex.ObservableHelper.toObservable(delegate, conv); } return observable; } public synchronized io.reactivex.Flowable<T> toFlowable() { if (flowable == null) { java.util.function.Function<T, T> conv = (java.util.function.Function<T, T>) __typeArg_0.wrap; flowable = io.vertx.reactivex.FlowableHelper.toFlowable(delegate, conv); } return flowable; }
Set an exception handler on the read stream.
Params:
  • handler – the exception handler
Returns:a reference to this, so the API can be used fluently
/** * Set an exception handler on the read stream. * @param handler the exception handler * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.ReadStream<T> exceptionHandler(Handler<Throwable> handler) { delegate.exceptionHandler(handler); return this; }
Set a data handler. As data is read, the handler will be called with the data.
Params:
  • handler –
Returns:a reference to this, so the API can be used fluently
/** * Set a data handler. As data is read, the handler will be called with the data. * @param handler * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.ReadStream<T> handler(Handler<T> handler) { delegate.handler(new Handler<T>() { public void handle(T event) { handler.handle((T)__typeArg_0.wrap(event)); } }); return this; }
Pause the ReadStream, it sets the buffer in fetch mode and clears the actual demand.

While it's paused, no data will be sent to the data handler.

Returns:a reference to this, so the API can be used fluently
/** * Pause the <code>ReadStream</code>, it sets the buffer in <code>fetch</code> mode and clears the actual demand. * <p> * While it's paused, no data will be sent to the data <code>handler</code>. * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.ReadStream<T> pause() { delegate.pause(); return this; }
Resume reading, and sets the buffer in flowing mode.

If the ReadStream has been paused, reading will recommence on it.
Returns:a reference to this, so the API can be used fluently
/** * Resume reading, and sets the buffer in <code>flowing</code> mode. * <p/> * If the <code>ReadStream</code> has been paused, reading will recommence on it. * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.ReadStream<T> resume() { delegate.resume(); return this; }
Fetch the specified amount of elements. If the ReadStream has been paused, reading will recommence with the specified amount of items, otherwise the specified amount will be added to the current stream demand.
Params:
  • amount –
Returns:a reference to this, so the API can be used fluently
/** * Fetch the specified <code>amount</code> of elements. If the <code>ReadStream</code> has been paused, reading will * recommence with the specified <code>amount</code> of items, otherwise the specified <code>amount</code> will * be added to the current stream demand. * @param amount * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.ReadStream<T> fetch(long amount) { delegate.fetch(amount); return this; }
Set an end handler. Once the stream has ended, and there is no more data to be read, this handler will be called.
Params:
  • endHandler –
Returns:a reference to this, so the API can be used fluently
/** * Set an end handler. Once the stream has ended, and there is no more data to be read, this handler will be called. * @param endHandler * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.ReadStream<T> endHandler(Handler<Void> endHandler) { delegate.endHandler(endHandler); return this; }
Pause this stream and return a Pipe to transfer the elements of this stream to a destination WriteStream.

The stream will be resumed when the pipe will be wired to a WriteStream.
Returns:a pipe
/** * Pause this stream and return a {@link io.vertx.reactivex.core.streams.Pipe} to transfer the elements of this stream to a destination {@link io.vertx.reactivex.core.streams.WriteStream}. * <p/> * The stream will be resumed when the pipe will be wired to a <code>WriteStream</code>. * @return a pipe */
public io.vertx.reactivex.core.streams.Pipe<T> pipe() { io.vertx.reactivex.core.streams.Pipe<T> ret = io.vertx.reactivex.core.streams.Pipe.newInstance(delegate.pipe(), __typeArg_0); return ret; }
Like ReadStream.pipeTo but with no completion handler.
Params:
  • dst –
/** * Like {@link io.vertx.reactivex.core.streams.ReadStream#pipeTo} but with no completion handler. * @param dst */
public void pipeTo(io.vertx.reactivex.core.streams.WriteStream<T> dst) { delegate.pipeTo(dst.getDelegate()); }
Pipe this ReadStream to the WriteStream.

Elements emitted by this stream will be written to the write stream until this stream ends or fails.

Once this stream has ended or failed, the write stream will be ended and the handler will be called with the result.

Params:
  • dst – the destination write stream
  • handler –
/** * Pipe this <code>ReadStream</code> to the <code>WriteStream</code>. * <p> * Elements emitted by this stream will be written to the write stream until this stream ends or fails. * <p> * Once this stream has ended or failed, the write stream will be ended and the <code>handler</code> will be * called with the result. * @param dst the destination write stream * @param handler */
public void pipeTo(io.vertx.reactivex.core.streams.WriteStream<T> dst, Handler<AsyncResult<Void>> handler) { delegate.pipeTo(dst.getDelegate(), handler); }
Pipe this ReadStream to the WriteStream.

Elements emitted by this stream will be written to the write stream until this stream ends or fails.

Once this stream has ended or failed, the write stream will be ended and the handler will be called with the result.

Params:
  • dst – the destination write stream
Returns:
/** * Pipe this <code>ReadStream</code> to the <code>WriteStream</code>. * <p> * Elements emitted by this stream will be written to the write stream until this stream ends or fails. * <p> * Once this stream has ended or failed, the write stream will be ended and the <code>handler</code> will be * called with the result. * @param dst the destination write stream * @return */
public Completable rxPipeTo(io.vertx.reactivex.core.streams.WriteStream<T> dst) { return io.vertx.reactivex.impl.AsyncResultCompletable.toCompletable(handler -> { pipeTo(dst, handler); }); } }