/*
 * Copyright 2014 Red Hat, Inc.
 *
 * Red Hat licenses this file to you under the Apache License, version 2.0
 * (the "License"); you may not use this file except in compliance with the
 * License.  You may obtain a copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package io.vertx.rxjava.core.streams;

import java.util.Map;
import rx.Observable;
import rx.Single;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;

Pipes data from a ReadStream to a WriteStream and performs flow control where necessary to prevent the write stream buffer from getting overfull.

Instances of this class read items from a ReadStream and write them to a WriteStream. If data can be read faster than it can be written this could result in the write queue of the WriteStream growing without bound, eventually causing it to exhaust all available RAM.

To prevent this, after each write, instances of this class check whether the write queue of the WriteStream is full, and if so, the ReadStream is paused, and a drainHandler is set on the WriteStream.

When the WriteStream has processed half of its backlog, the drainHandler will be called, which results in the pipe resuming the ReadStream.

This class can be used to pipe from any ReadStream to any WriteStream, e.g. from an HttpServerRequest to an AsyncFile, or from NetSocket to a WebSocket.

Please see the documentation for more information.

NOTE: This class has been automatically generated from the original non RX-ified interface using Vert.x codegen.
/**
 * Pipes data from a {@link io.vertx.rxjava.core.streams.ReadStream} to a
 * {@link io.vertx.rxjava.core.streams.WriteStream}, applying flow control where necessary so that
 * the write stream buffer does not get overfull.
 * <p>
 * Instances of this class read items from a {@link io.vertx.rxjava.core.streams.ReadStream} and
 * write them to a {@link io.vertx.rxjava.core.streams.WriteStream}. If data can be read faster
 * than it can be written, the write queue of the {@link io.vertx.rxjava.core.streams.WriteStream}
 * could grow without bound and eventually exhaust all available RAM.
 * <p>
 * To prevent this, after each write, instances of this class check whether the write queue of the
 * {@link io.vertx.rxjava.core.streams.WriteStream} is full. If so, the
 * {@link io.vertx.rxjava.core.streams.ReadStream} is paused and a <code>drainHandler</code> is set
 * on the {@link io.vertx.rxjava.core.streams.WriteStream}.
 * <p>
 * When the {@link io.vertx.rxjava.core.streams.WriteStream} has processed half of its backlog, the
 * <code>drainHandler</code> is called, which results in the
 * {@link io.vertx.rxjava.core.streams.ReadStream} being resumed.
 * <p>
 * This class can be used to pipe from any {@link io.vertx.rxjava.core.streams.ReadStream} to any
 * {@link io.vertx.rxjava.core.streams.WriteStream}, e.g. from an
 * {@link io.vertx.rxjava.core.http.HttpServerRequest} to an
 * {@link io.vertx.rxjava.core.file.AsyncFile}, or from
 * {@link io.vertx.rxjava.core.net.NetSocket} to a {@link io.vertx.rxjava.core.http.WebSocket}.
 * <p>
 * Please see the documentation for more information.
 * <p>
 * NOTE: This class has been automatically generated from the
 * {@link io.vertx.core.streams.Pipe original} non RX-ified interface using Vert.x codegen.
 */
@io.vertx.lang.rx.RxGen(io.vertx.core.streams.Pipe.class)
public class Pipe<T> {

  /** Type argument descriptor that wraps a core pipe into this class and unwraps it again. */
  public static final io.vertx.lang.rx.TypeArg<Pipe> __TYPE_ARG = new io.vertx.lang.rx.TypeArg<>(
      obj -> new Pipe((io.vertx.core.streams.Pipe) obj),
      Pipe::getDelegate
  );

  /** The wrapped core pipe; every operation of this class is forwarded to it. */
  private final io.vertx.core.streams.Pipe<T> delegate;

  /** Type argument descriptor for the item type {@code T}. */
  public final io.vertx.lang.rx.TypeArg<T> __typeArg_0;

  /**
   * Wraps the given core pipe, using {@code TypeArg.unknown()} as the item type descriptor.
   *
   * @param delegate the core pipe to wrap
   */
  public Pipe(io.vertx.core.streams.Pipe delegate) {
    this.delegate = delegate;
    this.__typeArg_0 = io.vertx.lang.rx.TypeArg.unknown();
  }

  /**
   * Wraps the given core pipe with an explicit item type descriptor.
   *
   * @param delegate the core pipe to wrap
   * @param typeArg_0 the type argument descriptor for {@code T}
   */
  public Pipe(io.vertx.core.streams.Pipe delegate, io.vertx.lang.rx.TypeArg<T> typeArg_0) {
    this.delegate = delegate;
    this.__typeArg_0 = typeArg_0;
  }

  /**
   * @return the wrapped core pipe
   */
  public io.vertx.core.streams.Pipe getDelegate() {
    return delegate;
  }

  /**
   * @return the string representation of the wrapped pipe
   */
  @Override
  public String toString() {
    return delegate.toString();
  }

  /**
   * Two wrappers are equal when they have the same runtime class and their delegates are equal.
   *
   * @param o the object to compare with
   * @return {@code true} if {@code o} is a wrapper of the same class around an equal delegate
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    Pipe<?> other = (Pipe<?>) o;
    return delegate.equals(other.delegate);
  }

  /**
   * @return the hash code of the wrapped pipe
   */
  @Override
  public int hashCode() {
    return delegate.hashCode();
  }
Set to true to call WriteStream.end when the source ReadStream fails, false otherwise.
Params:
  • end – true to end the stream on a source ReadStream failure
Returns:a reference to this, so the API can be used fluently
/**
 * Configures whether {@link io.vertx.rxjava.core.streams.WriteStream#end} is called when the
 * source <code>ReadStream</code> fails.
 * <p>
 * The flag is forwarded unchanged to the wrapped pipe.
 *
 * @param end <code>true</code> to end the stream on a source <code>ReadStream</code> failure,
 *            <code>false</code> otherwise
 * @return a reference to this, so the API can be used fluently
 */
public io.vertx.rxjava.core.streams.Pipe<T> endOnFailure(boolean end) {
  delegate.endOnFailure(end);
  return this;
}
Set to true to call WriteStream.end when the source ReadStream succeeds, false otherwise.
Params:
  • end – true to end the stream on a source ReadStream success
Returns:a reference to this, so the API can be used fluently
/**
 * Configures whether {@link io.vertx.rxjava.core.streams.WriteStream#end} is called when the
 * source <code>ReadStream</code> succeeds.
 * <p>
 * The flag is forwarded unchanged to the wrapped pipe.
 *
 * @param end <code>true</code> to end the stream on a source <code>ReadStream</code> success,
 *            <code>false</code> otherwise
 * @return a reference to this, so the API can be used fluently
 */
public io.vertx.rxjava.core.streams.Pipe<T> endOnSuccess(boolean end) {
  delegate.endOnSuccess(end);
  return this;
}
Set to true to call WriteStream.end when the source ReadStream completes, false otherwise.

Calling this overwrites endOnFailure and endOnSuccess.

Params:
  • end – true to end the stream on a source ReadStream completion
Returns:a reference to this, so the API can be used fluently
/**
 * Configures whether {@link io.vertx.rxjava.core.streams.WriteStream#end} is called when the
 * source <code>ReadStream</code> completes.
 * <p>
 * Calling this overwrites {@link io.vertx.rxjava.core.streams.Pipe#endOnFailure} and
 * {@link io.vertx.rxjava.core.streams.Pipe#endOnSuccess}.
 *
 * @param end <code>true</code> to end the stream on a source <code>ReadStream</code> completion,
 *            <code>false</code> otherwise
 * @return a reference to this, so the API can be used fluently
 */
public io.vertx.rxjava.core.streams.Pipe<T> endOnComplete(boolean end) {
  delegate.endOnComplete(end);
  return this;
}
Like to(WriteStream, Handler) but without a completion handler.
Params:
  • dst – the destination write stream
/**
 * Like {@link io.vertx.rxjava.core.streams.Pipe#to} but without a completion handler.
 * <p>
 * The destination is unwrapped and handed to the wrapped pipe.
 *
 * @param dst the destination write stream
 */
public void to(io.vertx.rxjava.core.streams.WriteStream<T> dst) {
  delegate.to(dst.getDelegate());
}
Start to pipe the elements to the destination WriteStream.

When the operation fails with a write error, the source stream is resumed.

Params:
  • dst – the destination write stream
  • completionHandler – the handler called when the pipe operation completes
/**
 * Starts piping the elements to the destination <code>WriteStream</code>.
 * <p>
 * When the operation fails with a write error, the source stream is resumed.
 * <p>
 * The destination is unwrapped and handed, together with the handler, to the wrapped pipe.
 *
 * @param dst the destination write stream
 * @param completionHandler the handler called when the pipe operation completes
 */
public void to(
    io.vertx.rxjava.core.streams.WriteStream<T> dst,
    Handler<AsyncResult<Void>> completionHandler) {
  delegate.to(dst.getDelegate(), completionHandler);
}
Start to pipe the elements to the destination WriteStream.

When the operation fails with a write error, the source stream is resumed.

Params:
  • dst – the destination write stream
Returns: an Observable that receives the completion result of the pipe operation
Deprecated: use rxTo instead
/**
 * Starts piping the elements to the destination <code>WriteStream</code>.
 * <p>
 * When the operation fails with a write error, the source stream is resumed.
 * <p>
 * The pipe is started immediately by calling
 * {@link #to(io.vertx.rxjava.core.streams.WriteStream, io.vertx.core.Handler)} with the handler
 * of a newly created observable future.
 *
 * @param dst the destination write stream
 * @return the observable future whose handler was registered as the completion handler
 * @deprecated use {@link #rxTo} instead
 */
@Deprecated
public Observable<Void> toObservable(io.vertx.rxjava.core.streams.WriteStream<T> dst) {
  io.vertx.rx.java.ObservableFuture<Void> future = io.vertx.rx.java.RxHelper.observableFuture();
  to(dst, future.toHandler());
  return future;
}
Start to pipe the elements to the destination WriteStream.

When the operation fails with a write error, the source stream is resumed.

Params:
  • dst – the destination write stream
Returns: a Single that receives the completion result of the pipe operation
/**
 * Starts piping the elements to the destination <code>WriteStream</code>.
 * <p>
 * When the operation fails with a write error, the source stream is resumed.
 *
 * @param dst the destination write stream
 * @return a {@code Single} created from a {@code SingleOnSubscribeAdapter} that forwards its
 *         handler to {@link #to(io.vertx.rxjava.core.streams.WriteStream, io.vertx.core.Handler)}
 */
public Single<Void> rxTo(io.vertx.rxjava.core.streams.WriteStream<T> dst) {
  io.vertx.rx.java.SingleOnSubscribeAdapter<Void> onSubscribe =
      new io.vertx.rx.java.SingleOnSubscribeAdapter<>(handler -> to(dst, handler));
  return Single.create(onSubscribe);
}
Close the pipe.

The streams' handlers will be unset, and the read stream will be resumed unless it has already ended.

/**
 * Closes the pipe by closing the wrapped pipe.
 * <p>
 * The streams' handlers will be unset and the read stream resumed unless it is already ended.
 */
public void close() {
  delegate.close();
}

/**
 * Wraps a core pipe in an RX-ified {@code Pipe}.
 *
 * @param arg the core pipe to wrap, may be {@code null}
 * @return a new wrapper around {@code arg}, or {@code null} when {@code arg} is {@code null}
 */
public static <T> Pipe<T> newInstance(io.vertx.core.streams.Pipe arg) {
  if (arg == null) {
    return null;
  }
  return new Pipe<T>(arg);
}

/**
 * Wraps a core pipe in an RX-ified {@code Pipe} with an explicit item type descriptor.
 *
 * @param arg the core pipe to wrap, may be {@code null}
 * @param __typeArg_T the type argument descriptor for {@code T}
 * @return a new wrapper around {@code arg}, or {@code null} when {@code arg} is {@code null}
 */
public static <T> Pipe<T> newInstance(
    io.vertx.core.streams.Pipe arg,
    io.vertx.lang.rx.TypeArg<T> __typeArg_T) {
  if (arg == null) {
    return null;
  }
  return new Pipe<T>(arg, __typeArg_T);
}
}