package io.vertx.core.net.impl;
import io.netty.buffer.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.vertx.core.Handler;
import io.vertx.core.impl.ContextInternal;
import java.util.function.Function;
/**
 * Netty duplex handler that ties a channel to a {@link ConnectionBase}.
 *
 * <p>The connection is obtained from a factory when the handler is added to a pipeline;
 * afterwards the channel events handled here (reads, writability changes, idle events,
 * exceptions and inactivity) are forwarded to that connection, most of them through
 * {@code ContextInternal#executeFromIO}.
 *
 * @param <C> the concrete connection type served by this handler
 */
public final class VertxHandler<C extends ConnectionBase> extends ChannelDuplexHandler {

  /** Message handler that discards its argument; installed by {@link #fail(Throwable)}. */
  private static final Handler<Object> NULL_HANDLER = m -> { };

  private final ContextInternal context;
  private final Function<ChannelHandlerContext, C> connectionFactory;

  /** Connection produced by the factory in {@link #handlerAdded}; {@code null} before that. */
  private C conn;
  /** Optional callback invoked with the connection right after it has been set. */
  private Handler<C> addHandler;
  /** Optional callback invoked with the connection when the channel becomes inactive. */
  private Handler<C> removeHandler;
  /** Target of every message received in {@link #channelRead}. */
  private Handler<Object> messageHandler;

  private VertxHandler(ContextInternal context, Function<ChannelHandlerContext, C> connectionFactory) {
    this.connectionFactory = connectionFactory;
    this.context = context;
  }

  /**
   * Creates a handler serving an already existing connection, using that connection's context.
   *
   * @param connection the connection to expose
   * @param <C> the connection type
   * @return a new handler whose factory always yields {@code connection}
   */
  public static <C extends ConnectionBase> VertxHandler<C> create(C connection) {
    return create(connection.context, chctx -> connection);
  }

  /**
   * Creates a handler whose connection is built lazily from the channel handler context.
   *
   * @param context the context used to dispatch events
   * @param connectionFactory builds the connection once the handler is added to a pipeline
   * @param <C> the connection type
   * @return a new handler
   */
  public static <C extends ConnectionBase> VertxHandler<C> create(ContextInternal context, Function<ChannelHandlerContext, C> connectionFactory) {
    return new VertxHandler<>(context, connectionFactory);
  }

  /**
   * Same as {@link #safeBuffer(ByteBuf, ByteBufAllocator)}, applied to the content of {@code holder}.
   *
   * @param holder the holder whose content is converted
   * @param allocator the allocator used when a heap copy is needed
   * @return the resulting buffer
   */
  public static ByteBuf safeBuffer(ByteBufHolder holder, ByteBufAllocator allocator) {
    return safeBuffer(holder.content(), allocator);
  }

  /**
   * Returns {@code buf} itself unless it is a direct buffer or a {@link CompositeByteBuf}.
   *
   * <p>For direct or composite buffers (other than {@link Unpooled#EMPTY_BUFFER}) the readable
   * bytes are copied into a heap buffer taken from {@code allocator}, or
   * {@link Unpooled#EMPTY_BUFFER} is returned when nothing is readable. In both cases
   * {@code buf} is released before returning.
   *
   * @param buf the buffer to convert
   * @param allocator the allocator used to obtain the heap buffer
   * @return {@code buf}, a heap copy of its readable bytes, or the empty buffer
   */
  public static ByteBuf safeBuffer(ByteBuf buf, ByteBufAllocator allocator) {
    boolean mustCopy = buf != Unpooled.EMPTY_BUFFER && (buf.isDirect() || buf instanceof CompositeByteBuf);
    if (!mustCopy) {
      return buf;
    }
    try {
      if (!buf.isReadable()) {
        return Unpooled.EMPTY_BUFFER;
      }
      ByteBuf heapCopy = allocator.heapBuffer(buf.readableBytes());
      heapCopy.writeBytes(buf);
      return heapCopy;
    } finally {
      // The source buffer is released on every path through this branch, including failures.
      buf.release();
    }
  }

  /**
   * Registers the callback invoked with the connection once it has been set.
   *
   * @param handler the callback, may be {@code null}
   * @return this handler, for chaining
   */
  public VertxHandler<C> addHandler(Handler<C> handler) {
    addHandler = handler;
    return this;
  }

  /**
   * Registers the callback invoked with the connection when the channel becomes inactive.
   *
   * @param handler the callback, may be {@code null}
   * @return this handler, for chaining
   */
  public VertxHandler<C> removeHandler(Handler<C> handler) {
    removeHandler = handler;
    return this;
  }

  /**
   * @return the connection set when the handler was added, or {@code null} if not set yet
   */
  public C getConnection() {
    return conn;
  }

  /**
   * Stops delivering messages to the connection and fires {@code error} through the
   * pipeline of the connection's channel handler context.
   *
   * @param error the failure to propagate
   */
  void fail(Throwable error) {
    messageHandler = NULL_HANDLER;
    conn.chctx.pipeline().fireExceptionCaught(error);
  }

  /** Builds the connection from the factory and installs it. */
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    C created = connectionFactory.apply(ctx);
    setConnection(created);
  }

  /** Forwards the writability change to the connection captured at the time of the event. */
  @Override
  public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    final C current = getConnection();
    context.executeFromIO(v -> current.handleInterestedOpsChanged());
  }

  /**
   * Closes the channel and, when a connection is present, reports {@code t} to it.
   *
   * <p>Without a connection the channel is closed directly; with one, the close and the
   * notification are both dispatched through the context.
   */
  @Override
  public void exceptionCaught(ChannelHandlerContext chctx, final Throwable t) throws Exception {
    final Channel channel = chctx.channel();
    final C current = getConnection();
    if (current == null) {
      channel.close();
      return;
    }
    context.executeFromIO(v -> {
      closeIfOpenQuietly(channel);
      current.handleException(t);
    });
  }

  /** Invokes the remove callback, if any, then notifies the connection that it is closed. */
  @Override
  public void channelInactive(ChannelHandlerContext chctx) throws Exception {
    Handler<C> onRemove = removeHandler;
    if (onRemove != null) {
      onRemove.handle(conn);
    }
    context.executeFromIO(v -> conn.handleClosed());
  }

  /** Delegates to {@code endReadAndFlush()} on the connection. */
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    conn.endReadAndFlush();
  }

  /** Marks the connection as reading, then dispatches {@code msg} to the current message handler. */
  @Override
  public void channelRead(ChannelHandlerContext chctx, Object msg) throws Exception {
    conn.setRead();
    context.executeFromIO(msg, messageHandler);
  }

  /**
   * Routes {@link IdleState#ALL_IDLE} events to the connection; every other event is passed
   * on to the next handler in the pipeline.
   */
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    boolean allIdle = evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.ALL_IDLE;
    if (!allIdle) {
      ctx.fireUserEventTriggered(evt);
      return;
    }
    context.executeFromIO(v -> conn.handleIdle());
  }

  /** Stores the connection, points the message handler at it and invokes the add callback, if any. */
  private void setConnection(C connection) {
    conn = connection;
    ConnectionBase target = conn;
    messageHandler = target::handleMessage;
    Handler<C> onAdd = addHandler;
    if (onAdd != null) {
      onAdd.handle(connection);
    }
  }

  /** Closes {@code channel} if it is still open, deliberately ignoring anything thrown while doing so. */
  private static void closeIfOpenQuietly(Channel channel) {
    try {
      if (channel.isOpen()) {
        channel.close();
      }
    } catch (Throwable ignore) {
      // Best effort only: a failure to close must not stop the caller from continuing.
    }
  }
}