package io.vertx.sqlclient.impl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.impl.NetSocketInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.sqlclient.impl.command.*;
import java.util.ArrayDeque;
import java.util.Deque;
public abstract class SocketConnectionBase implements Connection {
private static final Logger logger = LoggerFactory.getLogger(SocketConnectionBase.class);
public enum Status {
CLOSED, CONNECTED, CLOSING
}
protected final PreparedStatementCache psCache;
private final int preparedStatementCacheSqlLimit;
private final StringLongSequence psSeq = new StringLongSequence();
private final ArrayDeque<CommandBase<?>> pending = new ArrayDeque<>();
private final Context context;
private int inflight;
private Holder holder;
private final int pipeliningLimit;
protected final NetSocketInternal socket;
protected Status status = Status.CONNECTED;
public SocketConnectionBase(NetSocketInternal socket,
boolean cachePreparedStatements,
int preparedStatementCacheSize,
int preparedStatementCacheSqlLimit,
int pipeliningLimit,
Context context) {
this.socket = socket;
this.context = context;
this.pipeliningLimit = pipeliningLimit;
this.psCache = cachePreparedStatements ? new PreparedStatementCache(preparedStatementCacheSize, this) : null;
this.preparedStatementCacheSqlLimit = preparedStatementCacheSqlLimit;
}
public Context context() {
return context;
}
public void init() {
socket.closeHandler(this::handleClosed);
socket.exceptionHandler(this::handleException);
socket.messageHandler(msg -> {
try {
handleMessage(msg);
} catch (Exception e) {
handleException(e);
}
});
}
public NetSocketInternal socket() {
return socket;
}
public boolean isSsl() {
return socket.isSsl();
}
@Override
public void init(Holder holder) {
this.holder = holder;
}
@Override
public int getProcessId() {
throw new UnsupportedOperationException();
}
@Override
public int getSecretKey() {
throw new UnsupportedOperationException();
}
@Override
public void close(Holder holder) {
if (Vertx.currentContext() == context) {
if (status == Status.CONNECTED) {
status = Status.CLOSING;
pending.add(CloseConnectionCommand.INSTANCE);
checkPending();
}
} else {
context.runOnContext(v -> close(holder));
}
}
public void schedule(CommandBase<?> cmd) {
if (cmd.handler == null) {
throw new IllegalArgumentException();
}
if (Vertx.currentContext() != context) {
throw new IllegalStateException();
}
PreparedStatementCache psCache = this.psCache;
if (psCache != null && cmd instanceof PrepareStatementCommand) {
PrepareStatementCommand psCmd = (PrepareStatementCommand) cmd;
if (psCmd.sql().length() > preparedStatementCacheSqlLimit) {
return;
}
CachedPreparedStatement cached = psCache.get(psCmd.sql());
if (cached != null) {
psCmd.cached = cached;
Handler<? super CommandResponse<PreparedStatement>> handler = psCmd.handler;
cached.get(handler);
return;
} else {
if (psCache.size() >= psCache.getCapacity() && !psCache.isReady()) {
} else {
psCmd.statement = psSeq.next();
psCmd.cached = cached = new CachedPreparedStatement();
psCache.put(psCmd.sql(), cached);
Handler<? super CommandResponse<PreparedStatement>> a = psCmd.handler;
((CachedPreparedStatement) psCmd.cached).get(a);
psCmd.handler = (Handler<? super CommandResponse<PreparedStatement>>) psCmd.cached;
}
}
}
if (status == Status.CONNECTED) {
pending.add(cmd);
checkPending();
} else {
cmd.fail(new VertxException("Connection not open " + status));
}
}
static class CachedPreparedStatement implements Handler<CommandResponse<PreparedStatement>> {
private final Deque<Handler<? super CommandResponse<PreparedStatement>>> waiters = new ArrayDeque<>();
CommandResponse<PreparedStatement> resp;
void get(Handler<? super CommandResponse<PreparedStatement>> handler) {
if (resp != null) {
handler.handle(resp);
} else {
waiters.add(handler);
}
}
@Override
public void handle(CommandResponse<PreparedStatement> event) {
resp = event;
Handler<? super CommandResponse<PreparedStatement>> waiter;
while ((waiter = waiters.poll()) != null) {
waiter.handle(resp);
}
}
}
private void checkPending() {
ChannelHandlerContext ctx = socket.channelHandlerContext();
if (inflight < pipeliningLimit) {
CommandBase<?> cmd;
while (inflight < pipeliningLimit && (cmd = pending.poll()) != null) {
inflight++;
ctx.write(cmd);
}
ctx.flush();
}
}
private void handleMessage(Object msg) {
if (msg instanceof CommandResponse) {
inflight--;
checkPending();
CommandResponse resp =(CommandResponse) msg;
resp.cmd.handler.handle(msg);
} else if (msg instanceof Notification) {
handleNotification((Notification) msg);
} else if (msg instanceof Notice) {
handleNotice((Notice) msg);
}
}
private void handleNotification(Notification response) {
if (holder != null) {
holder.handleNotification(response.getProcessId(), response.getChannel(), response.getPayload());
}
}
private void handleNotice(Notice notice) {
notice.log(logger);
}
private void handleClosed(Void v) {
handleClose(null);
}
private synchronized void handleException(Throwable t) {
if (t instanceof DecoderException) {
DecoderException err = (DecoderException) t;
t = err.getCause();
}
handleClose(t);
}
private void handleClose(Throwable t) {
if (status != Status.CLOSED) {
status = Status.CLOSED;
if (t != null) {
synchronized (this) {
if (holder != null) {
holder.handleException(t);
}
}
}
Throwable cause = t == null ? new VertxException("closed") : t;
CommandBase<?> cmd;
while ((cmd = pending.poll()) != null) {
CommandBase<?> c = cmd;
context.runOnContext(v -> c.fail(cause));
}
if (holder != null) {
holder.handleClosed();
}
}
}
}