package io.vertx.pgclient.impl.codec;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.vertx.sqlclient.impl.command.CommandBase;
import io.vertx.sqlclient.impl.command.CommandResponse;
import io.vertx.core.VertxException;
import java.util.ArrayDeque;
import java.util.Iterator;
/**
 * Channel handler that pairs a {@link PgDecoder} with a {@link PgEncoder}.
 *
 * <p>Both halves are constructed around the same {@code inflight} deque of
 * {@link PgCommandCodec} instances. When the channel reports an exception or becomes
 * inactive, every codec still in that deque is removed and a failed
 * {@link CommandResponse} for its command is fired up the pipeline.
 */
public class PgCodec extends CombinedChannelDuplexHandler<PgDecoder, PgEncoder> {

  /** Command codecs shared by the encoder and the decoder created in the constructor. */
  private final ArrayDeque<PgCommandCodec<?, ?>> inflight = new ArrayDeque<>();

  /** Creates the decoder/encoder pair over the shared {@code inflight} deque and installs them. */
  public PgCodec() {
    PgDecoder decoder = new PgDecoder(inflight);
    PgEncoder encoder = new PgEncoder(decoder, inflight);
    init(decoder, encoder);
  }

  /**
   * Fails every pending command with {@code cause}, then lets the superclass handle the
   * exception.
   *
   * @param ctx   the handler context used to fire the failure responses
   * @param cause the error reported on the channel
   * @throws Exception if the superclass handling throws
   */
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    fail(ctx, cause);
    super.exceptionCaught(ctx, cause);
  }

  /**
   * Removes all codecs currently held in {@code inflight} and fires one failed
   * {@link CommandResponse} per codec, in queue order.
   *
   * <p>The deque is drained into a snapshot before anything is dispatched.
   * {@code ctx.fireChannelRead} invokes downstream handlers synchronously; should one of
   * them touch {@code inflight} again (for instance by submitting another command, or by
   * triggering a nested call to this method), iterating the live deque would hit its
   * fail-fast iterator. Working from a snapshot keeps the loop bounded and guarantees each
   * pending command is failed exactly once.
   *
   * @param ctx   the handler context used to fire the failure responses
   * @param cause the failure to attach to every pending command
   */
  private void fail(ChannelHandlerContext ctx, Throwable cause) {
    if (inflight.isEmpty()) {
      return;
    }
    ArrayDeque<PgCommandCodec<?, ?>> pending = new ArrayDeque<>(inflight);
    inflight.clear();
    for (PgCommandCodec<?, ?> codec : pending) {
      CommandResponse<Object> failure = CommandResponse.failure(cause);
      // Raw cast kept from the original: the generic types of both fields are declared elsewhere.
      failure.cmd = (CommandBase) codec.cmd;
      ctx.fireChannelRead(failure);
    }
  }

  /**
   * Fails every pending command with a {@link VertxException} carrying the message
   * {@code "closed"}, then lets the superclass propagate the inactive event.
   *
   * @param ctx the handler context used to fire the failure responses
   * @throws Exception if the superclass handling throws
   */
  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    fail(ctx, new VertxException("closed"));
    super.channelInactive(ctx);
  }
}