/*
 * Copyright 2014 Red Hat, Inc.
 *
 * Red Hat licenses this file to you under the Apache License, version 2.0
 * (the "License"); you may not use this file except in compliance with the
 * License.  You may obtain a copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package io.vertx.reactivex.core.streams;

import java.util.Map;
import io.reactivex.Observable;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.Completable;
import io.reactivex.Maybe;

Pumps data from a ReadStream to a WriteStream and performs flow control where necessary to prevent the write stream buffer from getting overfull.

Instances of this class read items from a ReadStream and write them to a WriteStream. If data can be read faster than it can be written this could result in the write queue of the WriteStream growing without bound, eventually causing it to exhaust all available RAM.

To prevent this, after each write, instances of this class check whether the write queue of the WriteStream is full, and if so, the ReadStream is paused, and a drainHandler is set on the WriteStream.

When the WriteStream has processed half of its backlog, the drainHandler will be called, which results in the pump resuming the ReadStream.

This class can be used to pump from any ReadStream to any WriteStream, e.g. from an HttpServerRequest to an AsyncFile, or from NetSocket to a WebSocket.

Please see the documentation for more information.

NOTE: This class has been automatically generated from the original non RX-ified interface using Vert.x codegen.
/** * Pumps data from a {@link io.vertx.reactivex.core.streams.ReadStream} to a {@link io.vertx.reactivex.core.streams.WriteStream} and performs flow control where necessary to * prevent the write stream buffer from getting overfull. * <p> * Instances of this class read items from a {@link io.vertx.reactivex.core.streams.ReadStream} and write them to a {@link io.vertx.reactivex.core.streams.WriteStream}. If data * can be read faster than it can be written this could result in the write queue of the {@link io.vertx.reactivex.core.streams.WriteStream} growing * without bound, eventually causing it to exhaust all available RAM. * <p> * To prevent this, after each write, instances of this class check whether the write queue of the {@link io.vertx.reactivex.core.streams.WriteStream} is full, and if so, the {@link io.vertx.reactivex.core.streams.ReadStream} is paused, and a <code>drainHandler</code> is set on the * {@link io.vertx.reactivex.core.streams.WriteStream}. * <p> * When the {@link io.vertx.reactivex.core.streams.WriteStream} has processed half of its backlog, the <code>drainHandler</code> will be * called, which results in the pump resuming the {@link io.vertx.reactivex.core.streams.ReadStream}. * <p> * This class can be used to pump from any {@link io.vertx.reactivex.core.streams.ReadStream} to any {@link io.vertx.reactivex.core.streams.WriteStream}, * e.g. from an {@link io.vertx.reactivex.core.http.HttpServerRequest} to an {@link io.vertx.reactivex.core.file.AsyncFile}, * or from {@link io.vertx.reactivex.core.net.NetSocket} to a {@link io.vertx.reactivex.core.http.WebSocket}. * <p> * Please see the documentation for more information. * * <p/> * NOTE: This class has been automatically generated from the {@link io.vertx.core.streams.Pump original} non RX-ified interface using Vert.x codegen. */
@io.vertx.lang.rx.RxGen(io.vertx.core.streams.Pump.class) public class Pump { @Override public String toString() { return delegate.toString(); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Pump that = (Pump) o; return delegate.equals(that.delegate); } @Override public int hashCode() { return delegate.hashCode(); } public static final io.vertx.lang.rx.TypeArg<Pump> __TYPE_ARG = new io.vertx.lang.rx.TypeArg<>( obj -> new Pump((io.vertx.core.streams.Pump) obj), Pump::getDelegate ); private final io.vertx.core.streams.Pump delegate; public Pump(io.vertx.core.streams.Pump delegate) { this.delegate = delegate; } public io.vertx.core.streams.Pump getDelegate() { return delegate; }
Create a new Pump with the given ReadStream and WriteStream
Params:
  • rs – the read stream
  • ws – the write stream
Returns:the pump
/** * Create a new <code>Pump</code> with the given <code>ReadStream</code> and <code>WriteStream</code> * @param rs the read stream * @param ws the write stream * @return the pump */
public static <T> io.vertx.reactivex.core.streams.Pump pump(io.vertx.reactivex.core.streams.ReadStream<T> rs, io.vertx.reactivex.core.streams.WriteStream<T> ws) { io.vertx.reactivex.core.streams.Pump ret = io.vertx.reactivex.core.streams.Pump.newInstance(io.vertx.core.streams.Pump.pump(rs.getDelegate(), ws.getDelegate())); return ret; }
Create a new Pump with the given ReadStream and WriteStream
Params:
  • rs – the read stream
  • ws – the write stream
Returns:the pump
/** * Create a new <code>Pump</code> with the given <code>ReadStream</code> and <code>WriteStream</code> * @param rs the read stream * @param ws the write stream * @return the pump */
public static <T> io.vertx.reactivex.core.streams.Pump pump(Flowable<T> rs, io.vertx.reactivex.core.streams.WriteStream<T> ws) { io.vertx.reactivex.core.streams.Pump ret = io.vertx.reactivex.core.streams.Pump.newInstance(io.vertx.core.streams.Pump.pump(io.vertx.reactivex.impl.ReadStreamSubscriber.asReadStream(rs,java.util.function.Function.identity()).resume(), ws.getDelegate())); return ret; }
Create a new Pump with the given ReadStream and WriteStream
Params:
  • rs – the read stream
  • ws – the write stream
Returns:the pump
/** * Create a new <code>Pump</code> with the given <code>ReadStream</code> and <code>WriteStream</code> * @param rs the read stream * @param ws the write stream * @return the pump */
public static <T> io.vertx.reactivex.core.streams.Pump pump(Observable<T> rs, io.vertx.reactivex.core.streams.WriteStream<T> ws) { io.vertx.reactivex.core.streams.Pump ret = io.vertx.reactivex.core.streams.Pump.newInstance(io.vertx.core.streams.Pump.pump(io.vertx.reactivex.impl.ReadStreamSubscriber.asReadStream(rs,java.util.function.Function.identity()).resume(), ws.getDelegate())); return ret; }
Create a new Pump with the given ReadStream and WriteStream and writeQueueMaxSize
Params:
  • rs – the read stream
  • ws – the write stream
  • writeQueueMaxSize – the max size of the write queue
Returns:the pump
/** * Create a new <code>Pump</code> with the given <code>ReadStream</code> and <code>WriteStream</code> and * <code>writeQueueMaxSize</code> * @param rs the read stream * @param ws the write stream * @param writeQueueMaxSize the max size of the write queue * @return the pump */
public static <T> io.vertx.reactivex.core.streams.Pump pump(io.vertx.reactivex.core.streams.ReadStream<T> rs, io.vertx.reactivex.core.streams.WriteStream<T> ws, int writeQueueMaxSize) { io.vertx.reactivex.core.streams.Pump ret = io.vertx.reactivex.core.streams.Pump.newInstance(io.vertx.core.streams.Pump.pump(rs.getDelegate(), ws.getDelegate(), writeQueueMaxSize)); return ret; }
Create a new Pump with the given ReadStream and WriteStream and writeQueueMaxSize
Params:
  • rs – the read stream
  • ws – the write stream
  • writeQueueMaxSize – the max size of the write queue
Returns:the pump
/** * Create a new <code>Pump</code> with the given <code>ReadStream</code> and <code>WriteStream</code> and * <code>writeQueueMaxSize</code> * @param rs the read stream * @param ws the write stream * @param writeQueueMaxSize the max size of the write queue * @return the pump */
public static <T> io.vertx.reactivex.core.streams.Pump pump(Flowable<T> rs, io.vertx.reactivex.core.streams.WriteStream<T> ws, int writeQueueMaxSize) { io.vertx.reactivex.core.streams.Pump ret = io.vertx.reactivex.core.streams.Pump.newInstance(io.vertx.core.streams.Pump.pump(io.vertx.reactivex.impl.ReadStreamSubscriber.asReadStream(rs,java.util.function.Function.identity()).resume(), ws.getDelegate(), writeQueueMaxSize)); return ret; }
Create a new Pump with the given ReadStream and WriteStream and writeQueueMaxSize
Params:
  • rs – the read stream
  • ws – the write stream
  • writeQueueMaxSize – the max size of the write queue
Returns:the pump
/** * Create a new <code>Pump</code> with the given <code>ReadStream</code> and <code>WriteStream</code> and * <code>writeQueueMaxSize</code> * @param rs the read stream * @param ws the write stream * @param writeQueueMaxSize the max size of the write queue * @return the pump */
public static <T> io.vertx.reactivex.core.streams.Pump pump(Observable<T> rs, io.vertx.reactivex.core.streams.WriteStream<T> ws, int writeQueueMaxSize) { io.vertx.reactivex.core.streams.Pump ret = io.vertx.reactivex.core.streams.Pump.newInstance(io.vertx.core.streams.Pump.pump(io.vertx.reactivex.impl.ReadStreamSubscriber.asReadStream(rs,java.util.function.Function.identity()).resume(), ws.getDelegate(), writeQueueMaxSize)); return ret; }
Set the write queue max size to maxSize
Params:
  • maxSize – the max size
Returns:a reference to this, so the API can be used fluently
/** * Set the write queue max size to <code>maxSize</code> * @param maxSize the max size * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.Pump setWriteQueueMaxSize(int maxSize) { delegate.setWriteQueueMaxSize(maxSize); return this; }
Start the Pump. The Pump can be started and stopped multiple times.
Returns:a reference to this, so the API can be used fluently
/** * Start the Pump. The Pump can be started and stopped multiple times. * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.Pump start() { delegate.start(); return this; }
Stop the Pump. The Pump can be started and stopped multiple times.
Returns:a reference to this, so the API can be used fluently
/** * Stop the Pump. The Pump can be started and stopped multiple times. * @return a reference to this, so the API can be used fluently */
public io.vertx.reactivex.core.streams.Pump stop() { delegate.stop(); return this; }
Return the total number of items pumped by this pump.
Returns:
/** * Return the total number of items pumped by this pump. * @return */
public int numberPumped() { int ret = delegate.numberPumped(); return ret; } public static Pump newInstance(io.vertx.core.streams.Pump arg) { return arg != null ? new Pump(arg) : null; } }