package io.vertx.core.net.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.vertx.core.Handler;
import java.util.function.Function;
/**
 * Channel handler that owns a single connection object of type {@code C} and routes
 * the Netty pipeline events it receives to that connection.
 *
 * <p>The connection is produced by the factory supplied to {@link #create(Function)} at the
 * moment this handler is added to a pipeline. Optional callbacks can be registered to observe
 * when the connection is created ({@link #addHandler(Handler)}) and when the channel becomes
 * inactive ({@link #removeHandler(Handler)}).
 *
 * @param <C> the connection type managed by this handler
 */
public final class VertxHandler<C extends ConnectionBase> extends ChannelDuplexHandler {

  /** Builds the connection from the handler context when this handler is added. */
  private final Function<ChannelHandlerContext, C> connectionFactory;

  /** The connection created in {@link #handlerAdded}; {@code null} until then. */
  private C conn;

  /** Optional callback invoked with the connection right after it is created. */
  private Handler<C> addHandler;

  /** Optional callback invoked with the connection when the channel becomes inactive. */
  private Handler<C> removeHandler;

  private VertxHandler(Function<ChannelHandlerContext, C> connectionFactory) {
    this.connectionFactory = connectionFactory;
  }

  /**
   * Creates a handler that will build its connection with the given factory.
   *
   * @param connectionFactory function producing the connection from the handler context
   * @param <C> the connection type
   * @return a new handler instance
   */
  public static <C extends ConnectionBase> VertxHandler<C> create(Function<ChannelHandlerContext, C> connectionFactory) {
    return new VertxHandler<>(connectionFactory);
  }

  /**
   * Same as {@link #safeBuffer(ByteBuf, ByteBufAllocator)}, applied to the holder's content.
   *
   * @param holder the holder whose content is processed
   * @param allocator the allocator used when a heap copy is required
   * @return the resulting buffer
   */
  public static ByteBuf safeBuffer(ByteBufHolder holder, ByteBufAllocator allocator) {
    ByteBuf content = holder.content();
    return safeBuffer(content, allocator);
  }

  /**
   * Returns a buffer holding the readable bytes of {@code buf}, copying to a heap buffer when
   * {@code buf} is direct or composite.
   *
   * <ul>
   *   <li>{@link Unpooled#EMPTY_BUFFER} is returned as is.</li>
   *   <li>A buffer that is neither direct nor a {@link CompositeByteBuf} is returned as is and
   *       is not released.</li>
   *   <li>Otherwise the readable bytes are written into a new heap buffer obtained from
   *       {@code allocator} (or {@link Unpooled#EMPTY_BUFFER} is returned when nothing is
   *       readable), and {@code buf} is released in every case.</li>
   * </ul>
   *
   * @param buf the source buffer
   * @param allocator the allocator used to obtain the heap buffer
   * @return the original buffer, a heap copy of it, or the empty buffer
   */
  public static ByteBuf safeBuffer(ByteBuf buf, ByteBufAllocator allocator) {
    if (buf == Unpooled.EMPTY_BUFFER) {
      return buf;
    }
    boolean mustCopy = buf.isDirect() || buf instanceof CompositeByteBuf;
    if (!mustCopy) {
      return buf;
    }
    try {
      if (!buf.isReadable()) {
        return Unpooled.EMPTY_BUFFER;
      }
      ByteBuf heapCopy = allocator.heapBuffer(buf.readableBytes());
      heapCopy.writeBytes(buf);
      return heapCopy;
    } finally {
      // The source is released whether the copy was made, skipped, or failed.
      buf.release();
    }
  }

  /**
   * Registers the callback invoked with the connection once it has been created.
   *
   * @param handler the callback, replacing any previously registered one
   * @return this handler, for chaining
   */
  public VertxHandler<C> addHandler(Handler<C> handler) {
    addHandler = handler;
    return this;
  }

  /**
   * Registers the callback invoked with the connection when the channel becomes inactive.
   *
   * @param handler the callback, replacing any previously registered one
   * @return this handler, for chaining
   */
  public VertxHandler<C> removeHandler(Handler<C> handler) {
    removeHandler = handler;
    return this;
  }

  /**
   * @return the connection created when this handler was added, or {@code null} if that has
   *         not happened yet
   */
  public C getConnection() {
    return conn;
  }

  /**
   * Creates the connection through the factory, stores it, and notifies the add callback
   * when one is registered.
   */
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) {
    C created = connectionFactory.apply(ctx);
    conn = created;
    if (addHandler != null) {
      addHandler.handle(created);
    }
  }

  /** Tells the connection that the channel writability changed; the event is not forwarded. */
  @Override
  public void channelWritabilityChanged(ChannelHandlerContext ctx) {
    conn.handleInterestedOpsChanged();
  }

  /**
   * Reports the failure to the connection, when there is one, and then closes through the
   * given context.
   */
  @Override
  public void exceptionCaught(ChannelHandlerContext chctx, final Throwable t) {
    C current = conn;
    if (current != null) {
      current.handleException(t);
    }
    chctx.close();
  }

  /** Notifies the remove callback, if registered, and then tells the connection it is closed. */
  @Override
  public void channelInactive(ChannelHandlerContext chctx) {
    Handler<C> onRemove = removeHandler;
    if (onRemove != null) {
      onRemove.handle(conn);
    }
    conn.handleClosed();
  }

  /** Hands the inbound message to the connection; the event is not forwarded. */
  @Override
  public void channelRead(ChannelHandlerContext chctx, Object msg) {
    conn.read(msg);
  }

  /** Signals the end of a read batch to the connection; the event is not forwarded. */
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) {
    conn.endReadAndFlush();
  }

  /** Delegates the close request, together with its promise, to the connection. */
  @Override
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
    conn.close(promise);
  }

  /**
   * Routes an {@link IdleState#ALL_IDLE} event to the connection; every other user event is
   * passed on to the next handler.
   */
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    boolean allIdle = evt instanceof IdleStateEvent
      && ((IdleStateEvent) evt).state() == IdleState.ALL_IDLE;
    if (!allIdle) {
      ctx.fireUserEventTriggered(evt);
      return;
    }
    conn.handleIdle();
  }
}