package io.vertx.core.http.impl;
import io.netty.buffer.ByteBuf;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoop;
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.stream.ChunkedStream;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import java.io.RandomAccessFile;
import java.net.SocketAddress;
class FileStreamChannel extends AbstractChannel {
private static final SocketAddress LOCAL_ADDRESS = new StreamSocketAddress();
private static final SocketAddress REMOTE_ADDRESS = new StreamSocketAddress();
private static final ChannelMetadata METADATA = new ChannelMetadata(true);
private final ChannelConfig config = new DefaultChannelConfig(this);
private boolean active;
private boolean closed;
private long bytesWritten;
private final VertxHttp2Stream stream;
FileStreamChannel(
Promise<Long> result,
VertxHttp2Stream stream,
long offset,
long length) {
super(null, Id.INSTANCE);
pipeline().addLast(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof RandomAccessFile) {
ChannelFuture fut = ctx.writeAndFlush(new ChunkedFile((RandomAccessFile) evt, offset, length, 8192 ));
fut.addListener(f -> {
if (f.isSuccess()) {
result.tryComplete(bytesWritten);
} else {
result.tryFail(f.cause());
}
fut.addListener(ChannelFutureListener.CLOSE);
});
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
result.tryFail(cause);
}
});
}
});
this.stream = stream;
}
final Handler<Void> drainHandler = v -> {
flush();
};
@Override
protected void doRegister() throws Exception {
active = true;
}
@Override
protected AbstractUnsafe newUnsafe() {
return new DefaultUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return true;
}
@Override
protected SocketAddress localAddress0() {
return LOCAL_ADDRESS;
}
@Override
protected SocketAddress remoteAddress0() {
return REMOTE_ADDRESS;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose() throws Exception {
active = false;
closed = true;
}
@Override
protected void doBeginRead() throws Exception {
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
ByteBuf chunk;
while (!stream.isNotWritable() && (chunk = (ByteBuf) in.current()) != null) {
bytesWritten += chunk.readableBytes();
stream.writeData(chunk.retain(), false);
stream.handlerContext.flush();
in.remove();
}
}
@Override
public ChannelConfig config() {
return config;
}
@Override
public boolean isOpen() {
return !closed;
}
@Override
public boolean isActive() {
return active;
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
private static class StreamSocketAddress extends SocketAddress {
@Override
public String toString() {
return "stream";
}
}
private class DefaultUnsafe extends AbstractUnsafe {
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
safeSetSuccess(promise);
}
}
static class Id implements ChannelId {
static final ChannelId INSTANCE = new Id();
private Id() { }
@Override
public String asShortText() {
return toString();
}
@Override
public String asLongText() {
return toString();
}
@Override
public int compareTo(ChannelId o) {
if (o instanceof Id) {
return 0;
}
return asLongText().compareTo(o.asLongText());
}
@Override
public int hashCode() {
return 0;
}
@Override
public boolean equals(Object obj) {
return obj instanceof Id;
}
@Override
public String toString() {
return "stream";
}
}
}