package io.vertx.core.eventbus.impl;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.Message;
import io.vertx.core.streams.ReadStream;
/**
 * A {@link ReadStream} of message bodies backed by a {@link ReadStream} of {@link Message}s.
 *
 * <p>Every operation is forwarded to the wrapped stream; the only transformation applied is
 * that each delivered {@link Message} is unwrapped via {@link Message#body()} before being
 * handed to the handler registered through {@link #handler(Handler)}.
 *
 * @param <T> the type of the message body
 */
public class BodyReadStream<T> implements ReadStream<T> {

  /** The wrapped stream of messages; set once at construction. */
  private final ReadStream<Message<T>> delegate;

  /**
   * Creates a body stream on top of the given message stream.
   *
   * @param delegate the message stream to which all calls are forwarded
   */
  public BodyReadStream(ReadStream<Message<T>> delegate) {
    this.delegate = delegate;
  }

  /**
   * Registers the exception handler on the wrapped stream.
   *
   * @param handler the exception handler, passed to the delegate unchanged
   * @return this stream, so calls can be chained
   */
  @Override
  public ReadStream<T> exceptionHandler(Handler<Throwable> handler) {
    delegate.exceptionHandler(handler);
    // Return this (not null) so fluent chaining works like every other method here.
    return this;
  }

  /**
   * Registers a handler that receives the body of each message delivered by the wrapped stream.
   *
   * @param handler the body handler; when {@code null}, {@code null} is set on the delegate
   * @return this stream, so calls can be chained
   */
  @Override
  public ReadStream<T> handler(Handler<T> handler) {
    if (handler != null) {
      // Adapt the body handler into a message handler by extracting the body.
      delegate.handler(message -> handler.handle(message.body()));
    } else {
      delegate.handler(null);
    }
    return this;
  }

  /**
   * Forwards {@code pause()} to the wrapped stream.
   *
   * @return this stream, so calls can be chained
   */
  @Override
  public ReadStream<T> pause() {
    delegate.pause();
    return this;
  }

  /**
   * Forwards {@code fetch(amount)} to the wrapped stream.
   *
   * @param amount the amount passed to the delegate unchanged
   * @return this stream, so calls can be chained
   */
  @Override
  public ReadStream<T> fetch(long amount) {
    delegate.fetch(amount);
    return this;
  }

  /**
   * Forwards {@code resume()} to the wrapped stream.
   *
   * @return this stream, so calls can be chained
   */
  @Override
  public ReadStream<T> resume() {
    delegate.resume();
    return this;
  }

  /**
   * Registers the end handler on the wrapped stream.
   *
   * @param endHandler the end handler, passed to the delegate unchanged
   * @return this stream, so calls can be chained
   */
  @Override
  public ReadStream<T> endHandler(Handler<Void> endHandler) {
    delegate.endHandler(endHandler);
    return this;
  }
}