package io.vertx.pgclient.impl;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
import io.vertx.pgclient.PgNotification;
import io.vertx.sqlclient.impl.Connection;
import io.vertx.sqlclient.impl.SqlConnectionImpl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
/**
 * {@link PgConnection} implementation built on top of {@link SqlConnectionImpl}.
 *
 * <p>In addition to what the base class provides, this class dispatches notifications to a
 * user-supplied handler, exposes the process id and secret key reported by the underlying
 * connection, and can send a cancel request over a separate socket connection.
 */
public class PgConnectionImpl extends SqlConnectionImpl<PgConnectionImpl> implements PgConnection {

  /**
   * Connects using the given options and reports the outcome to {@code handler}.
   *
   * <p>If the calling thread is not associated with a Vert.x context, the call is rescheduled
   * with {@link Vertx#runOnContext} and retried from there. Once on a context, the connection
   * factory is created with that context.
   *
   * <p>When the options request a domain socket but native transport is not enabled on
   * {@code vertx}, the handler receives a failure with the message
   * {@code "Native transport is not available"}. This check is applied regardless of whether the
   * caller was already on a Vert.x context.
   *
   * @param vertx   the Vert.x instance used to hop onto a context and to query native transport
   * @param options the connect options
   * @param handler notified with the connected {@link PgConnection} or with the failure cause
   */
  public static void connect(Vertx vertx, PgConnectOptions options, Handler<AsyncResult<PgConnection>> handler) {
    Context ctx = Vertx.currentContext();
    if (ctx == null) {
      // Not on a Vert.x context yet: reschedule so that the checks and the connect below run on one.
      vertx.runOnContext(v -> connect(vertx, options, handler));
      return;
    }
    // Validate before creating the factory, whichever path led here.
    if (options.isUsingDomainSocket() && !vertx.isNativeTransportEnabled()) {
      handler.handle(Future.failedFuture("Native transport is not available"));
      return;
    }
    PgConnectionFactory client = new PgConnectionFactory(ctx, false, options);
    client.connectAndInit(ar -> {
      if (ar.succeeded()) {
        Connection conn = ar.result();
        PgConnectionImpl p = new PgConnectionImpl(client, ctx, conn);
        // Register the wrapper with the low-level connection before handing it to the caller.
        conn.init(p);
        handler.handle(Future.succeededFuture(p));
      } else {
        handler.handle(Future.failedFuture(ar.cause()));
      }
    });
  }

  /** Factory that created this connection; reused to open the extra socket for cancel requests. */
  private final PgConnectionFactory factory;

  /** Current notification handler, or {@code null} when none is set; volatile so updates are visible across threads. */
  private volatile Handler<PgNotification> notificationHandler;

  /**
   * Creates a connection wrapper.
   *
   * @param factory the factory later used by {@link #cancelRequest(Handler)}
   * @param context the context passed to the superclass
   * @param conn    the underlying connection passed to the superclass
   */
  public PgConnectionImpl(PgConnectionFactory factory, Context context, Connection conn) {
    super(context, conn);
    this.factory = factory;
  }

  /**
   * Sets the handler that receives notifications, replacing any previous one.
   *
   * @param handler the new handler; {@code null} makes {@link #handleNotification} a no-op
   * @return this connection
   */
  @Override
  public PgConnection notificationHandler(Handler<PgNotification> handler) {
    notificationHandler = handler;
    return this;
  }

  /**
   * Builds a {@link PgNotification} from the given values and passes it to the current
   * notification handler, if one is set.
   *
   * @param processId the process id to put in the notification
   * @param channel   the channel to put in the notification
   * @param payload   the payload to put in the notification
   */
  public void handleNotification(int processId, String channel, String payload) {
    // Read the volatile field once so the null check and the call use the same handler.
    Handler<PgNotification> handler = notificationHandler;
    if (handler != null) {
      handler.handle(new PgNotification().setProcessId(processId).setChannel(channel).setPayload(payload));
    }
  }

  /**
   * @return the process id reported by the underlying connection
   */
  @Override
  public int processId() {
    return conn.getProcessId();
  }

  /**
   * @return the secret key reported by the underlying connection
   */
  @Override
  public int secretKey() {
    return conn.getSecretKey();
  }

  /**
   * Sends a cancel request carrying this connection's process id and secret key.
   *
   * <p>The request is sent over a new socket connection obtained from the factory. If that
   * connection cannot be established, {@code handler} receives the failure cause. When invoked
   * from a context other than this connection's context, the call is rescheduled onto it.
   *
   * @param handler notified by the cancel-request send, or with the connect failure
   * @return this connection
   */
  @Override
  public PgConnection cancelRequest(Handler<AsyncResult<Void>> handler) {
    Context current = Vertx.currentContext();
    // NOTE(review): `context` is presumably the context given to the constructor (inherited field) - confirm.
    if (current == context) {
      factory.connect(ar -> {
        if (ar.succeeded()) {
          PgSocketConnection conn = ar.result();
          conn.sendCancelRequestMessage(this.processId(), this.secretKey(), handler);
        } else {
          handler.handle(Future.failedFuture(ar.cause()));
        }
      });
    } else {
      context.runOnContext(v -> cancelRequest(handler));
    }
    return this;
  }
}