/*
* Copyright 2014 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.vertx.reactivex.core.streams;
import io.vertx.reactivex.RxHelper;
import io.vertx.reactivex.ObservableHelper;
import io.vertx.reactivex.FlowableHelper;
import io.vertx.reactivex.impl.AsyncResultMaybe;
import io.vertx.reactivex.impl.AsyncResultSingle;
import io.vertx.reactivex.impl.AsyncResultCompletable;
import io.vertx.reactivex.WriteStreamObserver;
import io.vertx.reactivex.WriteStreamSubscriber;
import java.util.Map;
import java.util.Set;
import java.util.List;
import java.util.Iterator;
import java.util.function.Function;
import java.util.stream.Collectors;
import io.vertx.core.Handler;
import io.vertx.core.AsyncResult;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import io.vertx.lang.rx.RxGen;
import io.vertx.lang.rx.TypeArg;
import io.vertx.lang.rx.MappingIterator;
// Pipes data from a ReadStream to a WriteStream and performs flow control where necessary to
// prevent the write stream buffer from getting overfull.
//
// Instances of this class read items from a ReadStream and write them to a WriteStream. If data
// can be read faster than it can be written, this could result in the write queue of the
// WriteStream growing without bound, eventually causing it to exhaust all available RAM.
//
// To prevent this, after each write, instances of this class check whether the write queue of the
// WriteStream is full, and if so, the ReadStream is paused and a drainHandler is set on the
// WriteStream.
//
// When the WriteStream has processed half of its backlog, the drainHandler will be called, which
// results in the pipe resuming the ReadStream.
//
// This class can be used to pipe from any ReadStream to any WriteStream, e.g. from an
// HttpServerRequest to an AsyncFile, or from a NetSocket to a WebSocket.
//
// Please see the documentation for more information.
//
// NOTE: This class has been automatically generated from the original non RX-ified interface
// using Vert.x codegen.
/**
* Pipe data from a {@link io.vertx.reactivex.core.streams.ReadStream} to a {@link io.vertx.reactivex.core.streams.WriteStream} and performs flow control where necessary to
* prevent the write stream buffer from getting overfull.
* <p>
* Instances of this class read items from a {@link io.vertx.reactivex.core.streams.ReadStream} and write them to a {@link io.vertx.reactivex.core.streams.WriteStream}. If data
* can be read faster than it can be written this could result in the write queue of the {@link io.vertx.reactivex.core.streams.WriteStream} growing
* without bound, eventually causing it to exhaust all available RAM.
* <p>
* To prevent this, after each write, instances of this class check whether the write queue of the {@link io.vertx.reactivex.core.streams.WriteStream} is full, and if so, the {@link io.vertx.reactivex.core.streams.ReadStream} is paused, and a <code>drainHandler</code> is set on the
* {@link io.vertx.reactivex.core.streams.WriteStream}.
* <p>
* When the {@link io.vertx.reactivex.core.streams.WriteStream} has processed half of its backlog, the <code>drainHandler</code> will be
* called, which results in the pump resuming the {@link io.vertx.reactivex.core.streams.ReadStream}.
* <p>
* This class can be used to pipe from any {@link io.vertx.reactivex.core.streams.ReadStream} to any {@link io.vertx.reactivex.core.streams.WriteStream},
* e.g. from an {@link io.vertx.reactivex.core.http.HttpServerRequest} to an {@link io.vertx.reactivex.core.file.AsyncFile},
* or from {@link io.vertx.reactivex.core.net.NetSocket} to a {@link io.vertx.reactivex.core.http.WebSocket}.
* <p>
* Please see the documentation for more information.
*
* <p>
* NOTE: This class has been automatically generated from the {@link io.vertx.core.streams.Pipe original} non RX-ified interface using Vert.x codegen.
*/
@RxGen(io.vertx.core.streams.Pipe.class)
public class Pipe<T> {
@Override
public String toString() {
return delegate.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Pipe that = (Pipe) o;
return delegate.equals(that.delegate);
}
@Override
public int hashCode() {
return delegate.hashCode();
}
public static final TypeArg<Pipe> __TYPE_ARG = new TypeArg<>( obj -> new Pipe((io.vertx.core.streams.Pipe) obj),
Pipe::getDelegate
);
private final io.vertx.core.streams.Pipe<T> delegate;
public final TypeArg<T> __typeArg_0;
public Pipe(io.vertx.core.streams.Pipe delegate) {
this.delegate = delegate;
this.__typeArg_0 = TypeArg.unknown(); }
public Pipe(Object delegate, TypeArg<T> typeArg_0) {
this.delegate = (io.vertx.core.streams.Pipe)delegate;
this.__typeArg_0 = typeArg_0;
}
public io.vertx.core.streams.Pipe getDelegate() {
return delegate;
}
Params: - end –
true
to end the stream on a source ReadStream
failure
Returns: a reference to this, so the API can be used fluently
/**
* Set to <code>true</code> to call {@link io.vertx.reactivex.core.streams.WriteStream#end} when the source <code>ReadStream</code> fails, <code>false</code> otherwise.
* @param end <code>true</code> to end the stream on a source <code>ReadStream</code> failure
* @return a reference to this, so the API can be used fluently
*/
public io.vertx.reactivex.core.streams.Pipe<T> endOnFailure(boolean end) {
delegate.endOnFailure(end);
return this;
}
Params: - end –
true
to end the stream on a source ReadStream
success
Returns: a reference to this, so the API can be used fluently
/**
* Set to <code>true</code> to call {@link io.vertx.reactivex.core.streams.WriteStream#end} when the source <code>ReadStream</code> succeeds, <code>false</code> otherwise.
* @param end <code>true</code> to end the stream on a source <code>ReadStream</code> success
* @return a reference to this, so the API can be used fluently
*/
public io.vertx.reactivex.core.streams.Pipe<T> endOnSuccess(boolean end) {
delegate.endOnSuccess(end);
return this;
}
Set to true
to call WriteStream.end
when the source ReadStream
completes, false
otherwise.
Calling this overwrites endOnFailure
and endOnSuccess
.
Params: - end –
true
to end the stream on a source ReadStream
completion
Returns: a reference to this, so the API can be used fluently
/**
* Set to <code>true</code> to call {@link io.vertx.reactivex.core.streams.WriteStream#end} when the source <code>ReadStream</code> completes, <code>false</code> otherwise.
* <p>
* Calling this overwrites {@link io.vertx.reactivex.core.streams.Pipe#endOnFailure} and {@link io.vertx.reactivex.core.streams.Pipe#endOnSuccess}.
* @param end <code>true</code> to end the stream on a source <code>ReadStream</code> completion
* @return a reference to this, so the API can be used fluently
*/
public io.vertx.reactivex.core.streams.Pipe<T> endOnComplete(boolean end) {
delegate.endOnComplete(end);
return this;
}
Start to pipe the elements to the destination WriteStream
.
When the operation fails with a write error, the source stream is resumed.
Params: - dst – the destination write stream
- completionHandler – the handler called when the pipe operation completes
/**
* Start to pipe the elements to the destination <code>WriteStream</code>.
* <p>
* When the operation fails with a write error, the source stream is resumed.
* @param dst the destination write stream
* @param completionHandler the handler called when the pipe operation completes
*/
public void to(io.vertx.reactivex.core.streams.WriteStream<T> dst, Handler<AsyncResult<Void>> completionHandler) {
delegate.to(dst.getDelegate(), completionHandler);
}
Start to pipe the elements to the destination WriteStream
.
When the operation fails with a write error, the source stream is resumed.
Params: - dst – the destination write stream
/**
* Start to pipe the elements to the destination <code>WriteStream</code>.
* <p>
* When the operation fails with a write error, the source stream is resumed.
* @param dst the destination write stream
*/
public void to(io.vertx.reactivex.core.streams.WriteStream<T> dst) {
to(dst, ar -> { });
}
Start to pipe the elements to the destination WriteStream
.
When the operation fails with a write error, the source stream is resumed.
Params: - dst – the destination write stream
Returns:
/**
* Start to pipe the elements to the destination <code>WriteStream</code>.
* <p>
* When the operation fails with a write error, the source stream is resumed.
* @param dst the destination write stream
* @return
*/
public io.reactivex.Completable rxTo(io.vertx.reactivex.core.streams.WriteStream<T> dst) {
return AsyncResultCompletable.toCompletable($handler -> {
to(dst, $handler);
});
}
Close the pipe.
The streams handlers will be unset and the read stream resumed unless it is already ended.
/**
* Close the pipe.
* <p>
* The streams handlers will be unset and the read stream resumed unless it is already ended.
*/
public void close() {
delegate.close();
}
public static <T> Pipe<T> newInstance(io.vertx.core.streams.Pipe arg) {
return arg != null ? new Pipe<T>(arg) : null;
}
public static <T> Pipe<T> newInstance(io.vertx.core.streams.Pipe arg, TypeArg<T> __typeArg_T) {
return arg != null ? new Pipe<T>(arg, __typeArg_T) : null;
}
}