package io.vertx.core.eventbus.impl.clustered;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.core.eventbus.impl.codecs.PingMessageCodec;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.net.impl.NetClientImpl;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.spi.metrics.EventBusMetrics;
import java.util.ArrayDeque;
import java.util.Queue;
class ConnectionHolder {
private static final Logger log = LoggerFactory.getLogger(ConnectionHolder.class);
private static final String PING_ADDRESS = "__vertx_ping";
private final ClusteredEventBus eventBus;
private final NetClient client;
private final ServerID serverID;
private final Vertx vertx;
private final EventBusMetrics metrics;
private Queue<ClusteredMessage> pending;
private NetSocket socket;
private boolean connected;
private long timeoutID = -1;
private long pingTimeoutID = -1;
ConnectionHolder(ClusteredEventBus eventBus, ServerID serverID, EventBusOptions options) {
this.eventBus = eventBus;
this.serverID = serverID;
this.vertx = eventBus.vertx();
this.metrics = eventBus.getMetrics();
NetClientOptions clientOptions = new NetClientOptions(options.toJson());
ClusteredEventBus.setCertOptions(clientOptions, options.getKeyCertOptions());
ClusteredEventBus.setTrustOptions(clientOptions, options.getTrustOptions());
client = new NetClientImpl(eventBus.vertx(), clientOptions, false);
}
synchronized void connect() {
if (connected) {
throw new IllegalStateException("Already connected");
}
client.connect(serverID.port, serverID.host, res -> {
if (res.succeeded()) {
connected(res.result());
} else {
log.warn("Connecting to server " + serverID + " failed", res.cause());
close(res.cause());
}
});
}
synchronized void writeMessage(ClusteredMessage message) {
if (connected) {
Buffer data = message.encodeToWire();
if (metrics != null) {
metrics.messageWritten(message.address(), data.length());
}
socket.write(data, message.writeHandler());
} else {
if (pending == null) {
if (log.isDebugEnabled()) {
log.debug("Not connected to server " + serverID + " - starting queuing");
}
pending = new ArrayDeque<>();
}
pending.add(message);
}
}
void close() {
close(ConnectionBase.CLOSED_EXCEPTION);
}
private void close(Throwable cause) {
if (timeoutID != -1) {
vertx.cancelTimer(timeoutID);
}
if (pingTimeoutID != -1) {
vertx.cancelTimer(pingTimeoutID);
}
synchronized (this) {
ClusteredMessage<?, ?> msg;
if (pending != null) {
Future<Void> failure = Future.failedFuture(cause);
while ((msg = pending.poll()) != null) {
Handler<AsyncResult<Void>> handler = msg.writeHandler();
if (handler != null) {
handler.handle(failure);
}
}
}
}
try {
client.close();
} catch (Exception ignore) {
}
if (eventBus.connections().remove(serverID, this)) {
if (log.isDebugEnabled()) {
log.debug("Cluster connection closed for server " + serverID);
}
}
}
private void schedulePing() {
EventBusOptions options = eventBus.options();
pingTimeoutID = vertx.setTimer(options.getClusterPingInterval(), id1 -> {
timeoutID = vertx.setTimer(options.getClusterPingReplyInterval(), id2 -> {
log.warn("No pong from server " + serverID + " - will consider it dead");
close();
});
ClusteredMessage pingMessage =
new ClusteredMessage<>(serverID, PING_ADDRESS, null, null, null, new PingMessageCodec(), true, eventBus, null);
Buffer data = pingMessage.encodeToWire();
socket.write(data);
});
}
private synchronized void connected(NetSocket socket) {
this.socket = socket;
connected = true;
socket.exceptionHandler(err -> {
close(err);
});
socket.closeHandler(v -> close());
socket.handler(data -> {
vertx.cancelTimer(timeoutID);
schedulePing();
});
schedulePing();
if (pending != null) {
if (log.isDebugEnabled()) {
log.debug("Draining the queue for server " + serverID);
}
for (ClusteredMessage message : pending) {
Buffer data = message.encodeToWire();
if (metrics != null) {
metrics.messageWritten(message.address(), data.length());
}
socket.write(data, message.writeHandler());
}
}
pending = null;
}
}