package io.vertx.core.impl;
import io.vertx.core.*;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeListener;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class HAManager {
private static final Logger log = LoggerFactory.getLogger(HAManager.class);
private static final long QUORUM_CHECK_PERIOD = 1000;
private final VertxInternal vertx;
private final DeploymentManager deploymentManager;
private final VerticleManager verticleFactoryManager;
private final ClusterManager clusterManager;
private final int quorumSize;
private final String group;
private final JsonObject haInfo;
private final Map<String, String> clusterMap;
private final String nodeID;
private final Queue<Runnable> toDeployOnQuorum = new ConcurrentLinkedQueue<>();
private final boolean enabled;
private long quorumTimerID;
private long checkQuorumTimerID = -1L;
private volatile boolean attainedQuorum;
private volatile FailoverCompleteHandler failoverCompleteHandler;
private volatile boolean failDuringFailover;
private volatile boolean stopped;
private volatile boolean killed;
private Consumer<Set<String>> clusterViewChangedHandler;
public HAManager(VertxInternal vertx, DeploymentManager deploymentManager, VerticleManager verticleFactoryManager, ClusterManager clusterManager,
Map<String, String> clusterMap, int quorumSize, String group, boolean enabled) {
this.vertx = vertx;
this.deploymentManager = deploymentManager;
this.verticleFactoryManager = verticleFactoryManager;
this.clusterManager = clusterManager;
this.clusterMap = clusterMap;
this.quorumSize = enabled ? quorumSize : 0;
this.group = enabled ? group : "__DISABLED__";
this.enabled = enabled;
this.haInfo = new JsonObject().put("verticles", new JsonArray()).put("group", this.group);
this.nodeID = clusterManager.getNodeId();
}
void init() {
synchronized (haInfo) {
clusterMap.put(nodeID, haInfo.encode());
}
clusterManager.nodeListener(new NodeListener() {
@Override
public void nodeAdded(String nodeID) {
HAManager.this.nodeAdded(nodeID);
}
@Override
public void nodeLeft(String leftNodeID) {
HAManager.this.nodeLeft(leftNodeID);
}
});
quorumTimerID = vertx.setPeriodic(QUORUM_CHECK_PERIOD, tid -> checkHADeployments());
synchronized (this) {
checkQuorum();
}
}
public void removeFromHA(String depID) {
Deployment dep = deploymentManager.getDeployment(depID);
if (dep == null || !dep.deploymentOptions().isHa()) {
return;
}
synchronized (haInfo) {
JsonArray haMods = haInfo.getJsonArray("verticles");
Iterator<Object> iter = haMods.iterator();
while (iter.hasNext()) {
Object obj = iter.next();
JsonObject mod = (JsonObject) obj;
if (mod.getString("dep_id").equals(depID)) {
iter.remove();
}
}
clusterMap.put(nodeID, haInfo.encode());
}
}
public void addDataToAHAInfo(String key, JsonObject value) {
synchronized (haInfo) {
haInfo.put(key, value);
clusterMap.put(nodeID, haInfo.encode());
}
}
public void deployVerticle(final String verticleName, DeploymentOptions deploymentOptions,
final Handler<AsyncResult<String>> doneHandler) {
if (attainedQuorum) {
doDeployVerticle(verticleName, deploymentOptions, doneHandler);
} else {
log.info("Quorum not attained. Deployment of verticle will be delayed until there's a quorum.");
addToHADeployList(verticleName, deploymentOptions, doneHandler);
}
}
public void stop() {
if (!stopped) {
if (clusterManager.isActive()) {
clusterMap.remove(nodeID);
}
long timerID = checkQuorumTimerID;
if (timerID >= 0L) {
checkQuorumTimerID = -1L;
vertx.cancelTimer(timerID);
}
vertx.cancelTimer(quorumTimerID);
stopped = true;
}
}
public void simulateKill() {
if (!stopped) {
killed = true;
CountDownLatch latch = new CountDownLatch(1);
Promise<Void> promise = Promise.promise();
clusterManager.leave(promise);
promise.future()
.onFailure(t -> log.error("Failed to leave cluster", t))
.onComplete(ar -> latch.countDown());
long timerID = checkQuorumTimerID;
if (timerID >= 0L) {
checkQuorumTimerID = -1L;
vertx.cancelTimer(timerID);
}
vertx.cancelTimer(quorumTimerID);
boolean interrupted = false;
try {
long remainingNanos = MINUTES.toNanos(1);
long end = System.nanoTime() + remainingNanos;
while (true) {
try {
latch.await(remainingNanos, NANOSECONDS);
break;
} catch (InterruptedException e) {
interrupted = true;
remainingNanos = end - System.nanoTime();
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
stopped = true;
}
}
public void setFailoverCompleteHandler(FailoverCompleteHandler failoverCompleteHandler) {
this.failoverCompleteHandler = failoverCompleteHandler;
}
public boolean isKilled() {
return killed;
}
public boolean isEnabled() {
return enabled;
}
public void failDuringFailover(boolean fail) {
failDuringFailover = fail;
}
private void doDeployVerticle(final String verticleName, DeploymentOptions deploymentOptions,
final Handler<AsyncResult<String>> doneHandler) {
final Handler<AsyncResult<String>> wrappedHandler = ar1 -> {
vertx.<String>executeBlocking(fut -> {
if (ar1.succeeded()) {
String deploymentID = ar1.result();
addToHA(deploymentID, verticleName, deploymentOptions);
fut.complete(deploymentID);
} else {
fut.fail(ar1.cause());
}
}, false, ar2 -> {
if (doneHandler != null) {
doneHandler.handle(ar2);
} else if (ar2.failed()) {
log.error("Failed to deploy verticle", ar2.cause());
}
});
};
verticleFactoryManager.deployVerticle(verticleName, deploymentOptions).map(Deployment::deploymentID).onComplete(wrappedHandler);
}
private synchronized void nodeAdded(final String nodeID) {
addHaInfoIfLost();
checkQuorumWhenAdded(nodeID, System.currentTimeMillis());
}
private synchronized void nodeLeft(String leftNodeID) {
addHaInfoIfLost();
checkQuorum();
if (attainedQuorum) {
String sclusterInfo = clusterMap.get(leftNodeID);
if (sclusterInfo == null) {
} else {
JsonObject clusterInfo = new JsonObject(sclusterInfo);
checkFailover(leftNodeID, clusterInfo);
}
List<String> nodes = clusterManager.getNodes();
for (Map.Entry<String, String> entry: clusterMap.entrySet()) {
if (!leftNodeID.equals(entry.getKey()) && !nodes.contains(entry.getKey())) {
JsonObject haInfo = new JsonObject(entry.getValue());
checkFailover(entry.getKey(), haInfo);
}
}
}
}
private void addHaInfoIfLost() {
if (clusterManager.getNodes().contains(nodeID) && !clusterMap.containsKey(nodeID)) {
synchronized (haInfo) {
clusterMap.put(nodeID, haInfo.encode());
}
}
}
private synchronized void checkQuorumWhenAdded(final String nodeID, final long start) {
if (!stopped) {
if (clusterMap.containsKey(nodeID)) {
checkQuorum();
} else {
checkQuorumTimerID = vertx.setTimer(200, tid -> {
checkQuorumTimerID = -1L;
if (!stopped) {
vertx.executeBlockingInternal(fut -> {
if (System.currentTimeMillis() - start > 10000) {
log.warn("Timed out waiting for group information to appear");
} else {
ContextImpl.executeIsolated(v -> {
checkQuorumWhenAdded(nodeID, start);
});
}
fut.complete();
}, null);
}
});
}
}
}
private void checkQuorum() {
if (quorumSize == 0) {
this.attainedQuorum = true;
} else {
List<String> nodes = clusterManager.getNodes();
int count = 0;
for (String node : nodes) {
String json = clusterMap.get(node);
if (json != null) {
JsonObject clusterInfo = new JsonObject(json);
String group = clusterInfo.getString("group");
if (group.equals(this.group)) {
count++;
}
}
}
boolean attained = count >= quorumSize;
if (!attainedQuorum && attained) {
log.info("A quorum has been obtained. Any deploymentIDs waiting on a quorum will now be deployed");
this.attainedQuorum = true;
} else if (attainedQuorum && !attained) {
log.info("There is no longer a quorum. Any HA deploymentIDs will be undeployed until a quorum is re-attained");
this.attainedQuorum = false;
}
}
}
private void addToHA(String deploymentID, String verticleName, DeploymentOptions deploymentOptions) {
String encoded;
synchronized (haInfo) {
JsonObject verticleConf = new JsonObject().put("dep_id", deploymentID);
verticleConf.put("verticle_name", verticleName);
verticleConf.put("options", deploymentOptions.toJson());
JsonArray haMods = haInfo.getJsonArray("verticles");
haMods.add(verticleConf);
encoded = haInfo.encode();
clusterMap.put(nodeID, encoded);
}
}
private void addToHADeployList(final String verticleName, final DeploymentOptions deploymentOptions,
final Handler<AsyncResult<String>> doneHandler) {
toDeployOnQuorum.add(() -> {
ContextImpl.executeIsolated(v -> {
deployVerticle(verticleName, deploymentOptions, doneHandler);
});
});
}
private void checkHADeployments() {
try {
if (attainedQuorum) {
deployHADeployments();
} else {
undeployHADeployments();
}
} catch (Throwable t) {
log.error("Failed when checking HA deploymentIDs", t);
}
}
private void undeployHADeployments() {
for (String deploymentID: deploymentManager.deployments()) {
Deployment dep = deploymentManager.getDeployment(deploymentID);
if (dep != null) {
if (dep.deploymentOptions().isHa()) {
ContextImpl.executeIsolated(v -> {
deploymentManager.undeployVerticle(deploymentID).onComplete(result -> {
if (result.succeeded()) {
log.info("Successfully undeployed HA deployment " + deploymentID + "-" + dep.verticleIdentifier() + " as there is no quorum");
addToHADeployList(dep.verticleIdentifier(), dep.deploymentOptions(), result1 -> {
if (result1.succeeded()) {
log.info("Successfully redeployed verticle " + dep.verticleIdentifier() + " after quorum was re-attained");
} else {
log.error("Failed to redeploy verticle " + dep.verticleIdentifier() + " after quorum was re-attained", result1.cause());
}
});
} else {
log.error("Failed to undeploy deployment on lost quorum", result.cause());
}
});
});
}
}
}
}
private void deployHADeployments() {
int size = toDeployOnQuorum.size();
if (size != 0) {
log.info("There are " + size + " HA deploymentIDs waiting on a quorum. These will now be deployed");
Runnable task;
while ((task = toDeployOnQuorum.poll()) != null) {
try {
task.run();
} catch (Throwable t) {
log.error("Failed to run redeployment task", t);
}
}
}
}
private void checkFailover(String failedNodeID, JsonObject theHAInfo) {
try {
JsonArray deployments = theHAInfo.getJsonArray("verticles");
String group = theHAInfo.getString("group");
String chosen = chooseHashedNode(group, failedNodeID.hashCode());
if (chosen != null && chosen.equals(this.nodeID)) {
if (deployments != null && deployments.size() != 0) {
log.info("node" + nodeID + " says: Node " + failedNodeID + " has failed. This node will deploy " + deployments.size() + " deploymentIDs from that node.");
for (Object obj: deployments) {
JsonObject app = (JsonObject)obj;
processFailover(app);
}
}
clusterMap.remove(failedNodeID);
runOnContextAndWait(() -> {
if (failoverCompleteHandler != null) {
failoverCompleteHandler.handle(failedNodeID, theHAInfo, true);
}
});
}
} catch (Throwable t) {
log.error("Failed to handle failover", t);
runOnContextAndWait(() -> {
if (failoverCompleteHandler != null) {
failoverCompleteHandler.handle(failedNodeID, theHAInfo, false);
}
});
}
}
private void runOnContextAndWait(Runnable runnable) {
CountDownLatch latch = new CountDownLatch(1);
vertx.runOnContext(v -> {
try {
runnable.run();
} finally {
latch.countDown();
}
});
try {
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException ignore) {
}
}
private void processFailover(JsonObject failedVerticle) {
if (failDuringFailover) {
throw new VertxException("Oops!");
}
final String verticleName = failedVerticle.getString("verticle_name");
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> err = new AtomicReference<>();
ContextImpl.executeIsolated(v -> {
JsonObject options = failedVerticle.getJsonObject("options");
doDeployVerticle(verticleName, new DeploymentOptions(options), result -> {
if (result.succeeded()) {
log.info("Successfully redeployed verticle " + verticleName + " after failover");
} else {
log.error("Failed to redeploy verticle after failover", result.cause());
err.set(result.cause());
}
latch.countDown();
Throwable t = err.get();
if (t != null) {
throw new VertxException(t);
}
});
});
try {
if (!latch.await(120, TimeUnit.SECONDS)) {
throw new VertxException("Timed out waiting for redeploy on failover");
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
private String chooseHashedNode(String group, int hashCode) {
List<String> nodes = clusterManager.getNodes();
ArrayList<String> matchingMembers = new ArrayList<>();
for (String node: nodes) {
String sclusterInfo = clusterMap.get(node);
if (sclusterInfo != null) {
JsonObject clusterInfo = new JsonObject(sclusterInfo);
String memberGroup = clusterInfo.getString("group");
if (group == null || group.equals(memberGroup)) {
matchingMembers.add(node);
}
}
}
if (!matchingMembers.isEmpty()) {
long absHash = (long)hashCode + Integer.MAX_VALUE;
long lpos = absHash % matchingMembers.size();
return matchingMembers.get((int)lpos);
} else {
return null;
}
}
}