package io.vertx.core.impl.launcher.commands;
import io.vertx.core.*;
import io.vertx.core.cli.annotations.*;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.core.impl.launcher.VertxLifecycleHooks;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.metrics.MetricsOptions;
import io.vertx.core.spi.VertxMetricsFactory;
import io.vertx.core.spi.launcher.ExecutionContext;
import java.io.File;
import java.io.FileNotFoundException;
import java.lang.reflect.Method;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.Objects;
import java.util.Properties;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@Summary("Creates a bare instance of vert.x.")
@Description("This command launches a vert.x instance but do not deploy any verticles. It will " +
"receive a verticle if another node of the cluster dies.")
@Name("bare")
public class BareCommand extends ClasspathHandler {
public static final String VERTX_OPTIONS_PROP_PREFIX = "vertx.options.";
public static final String DEPLOYMENT_OPTIONS_PROP_PREFIX = "vertx.deployment.options.";
public static final String METRICS_OPTIONS_PROP_PREFIX = "vertx.metrics.options.";
protected Vertx vertx;
protected int clusterPort;
protected String clusterHost;
protected int clusterPublicPort;
protected String clusterPublicHost;
protected int quorum;
protected String haGroup;
protected String vertxOptions;
protected VertxOptions options;
protected Runnable finalAction;
@Option(longName = "quorum", argName = "q")
@Description("Used in conjunction with -ha this specifies the minimum number of nodes in the cluster for any HA " +
"deploymentIDs to be active. Defaults to 1.")
@DefaultValue("-1")
public void setQuorum(int quorum) {
this.quorum = quorum;
}
@Option(longName = "hagroup", argName = "group")
@Description("used in conjunction with -ha this specifies the HA group this node will join. There can be multiple " +
"HA groups in a cluster. Nodes will only failover to other nodes in the same group. Defaults to '__DEFAULT__'.")
@DefaultValue("__DEFAULT__")
public void setHAGroup(String group) {
this.haGroup = group;
}
@Option(longName = "cluster-port", argName = "port")
@Description("Port to use for cluster communication. Default is 0 which means choose a spare random port.")
@DefaultValue("0")
public void setClusterPort(int port) {
this.clusterPort = port;
}
@Option(longName = "cluster-host", argName = "host")
@Description("host to bind to for cluster communication. If this is not specified vert.x will attempt to choose one" +
" from the available interfaces.")
public void setClusterHost(String host) {
this.clusterHost = host;
}
@Option(longName = "cluster-public-port", argName = "public-port")
@Description("Public port to use for cluster communication. Default is -1 which means same as cluster port.")
@DefaultValue("-1")
public void setClusterPublicPort(int port) {
this.clusterPublicPort = port;
}
@Option(longName = "cluster-public-host", argName = "public-host")
@Description("Public host to bind to for cluster communication. If not specified, Vert.x will use the same as cluster host.")
public void setClusterPublicHost(String host) {
this.clusterPublicHost = host;
}
@Option(longName = "options", argName = "options")
@Description("Specifies the Vert.x options. It should reference either a JSON file which represents the options OR be a JSON string.")
public void setVertxOptions(String vertxOptions) {
if (vertxOptions != null) {
this.vertxOptions = vertxOptions.trim()
.replaceAll("^\"|\"$", "")
.replaceAll("^'|'$", "");
} else {
this.vertxOptions = null;
}
}
public boolean isClustered() {
return true;
}
public boolean getHA() {
return true;
}
@Override
public void run() {
this.run(null);
}
public void run(Runnable action) {
this.finalAction = action;
vertx = startVertx();
}
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
protected Vertx startVertx() {
JsonObject optionsJson = getJsonFromFileOrString(vertxOptions, "options");
if (optionsJson == null) {
MetricsOptions metricsOptions = getMetricsOptions();
options = new VertxOptions().setMetricsOptions(metricsOptions);
} else {
MetricsOptions metricsOptions = getMetricsOptions(optionsJson.getJsonObject("metricsOptions"));
options = new VertxOptions(optionsJson).setMetricsOptions(metricsOptions);
}
configureFromSystemProperties(options, VERTX_OPTIONS_PROP_PREFIX);
beforeStartingVertx(options);
Vertx instance;
if (isClustered()) {
log.info("Starting clustering...");
EventBusOptions eventBusOptions = options.getEventBusOptions();
if (!Objects.equals(eventBusOptions.getHost(), EventBusOptions.DEFAULT_CLUSTER_HOST)) {
clusterHost = eventBusOptions.getHost();
}
if (eventBusOptions.getPort() != EventBusOptions.DEFAULT_CLUSTER_PORT) {
clusterPort = eventBusOptions.getPort();
}
if (!Objects.equals(eventBusOptions.getClusterPublicHost(), EventBusOptions.DEFAULT_CLUSTER_PUBLIC_HOST)) {
clusterPublicHost = eventBusOptions.getClusterPublicHost();
}
if (eventBusOptions.getClusterPublicPort() != EventBusOptions.DEFAULT_CLUSTER_PUBLIC_PORT) {
clusterPublicPort = eventBusOptions.getClusterPublicPort();
}
if (clusterHost == null) {
clusterHost = getDefaultAddress();
if (clusterHost == null) {
log.error("Unable to find a default network interface for clustering. Please specify one using -cluster-host");
return null;
} else {
log.info("No cluster-host specified so using address " + clusterHost);
}
}
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<AsyncResult<Vertx>> result = new AtomicReference<>();
eventBusOptions.setClustered(true)
.setHost(clusterHost).setPort(clusterPort)
.setClusterPublicHost(clusterPublicHost);
if (clusterPublicPort != -1) {
eventBusOptions.setClusterPublicPort(clusterPublicPort);
}
if (getHA()) {
options.setHAEnabled(true);
if (haGroup != null) {
options.setHAGroup(haGroup);
}
if (quorum != -1) {
options.setQuorumSize(quorum);
}
}
create(options, ar -> {
result.set(ar);
latch.countDown();
});
try {
if (!latch.await(2, TimeUnit.MINUTES)) {
log.error("Timed out in starting clustered Vert.x");
return null;
}
} catch (InterruptedException e) {
log.error("Thread interrupted in startup");
Thread.currentThread().interrupt();
return null;
}
if (result.get().failed()) {
log.error("Failed to form cluster");
result.get().cause().printStackTrace();
return null;
}
instance = result.get().result();
} else {
instance = create(options);
}
addShutdownHook(instance, log, finalAction);
afterStartingVertx(instance);
return instance;
}
protected JsonObject getJsonFromFileOrString(String jsonFileOrString, String argName) {
JsonObject conf;
if (jsonFileOrString != null) {
try (Scanner scanner = new Scanner(new File(jsonFileOrString), "UTF-8").useDelimiter("\\A")) {
String sconf = scanner.next();
try {
conf = new JsonObject(sconf);
} catch (DecodeException e) {
log.error("Configuration file " + sconf + " does not contain a valid JSON object");
return null;
}
} catch (FileNotFoundException e) {
try {
conf = new JsonObject(jsonFileOrString);
} catch (DecodeException e2) {
log.error("The -" + argName + " argument does not point to an existing file or is not a valid JSON object");
e2.printStackTrace();
return null;
}
}
} else {
conf = null;
}
return conf;
}
protected void afterStartingVertx(Vertx instance) {
Object main = executionContext.main();
if (main instanceof VertxLifecycleHooks) {
((VertxLifecycleHooks) main).afterStartingVertx(instance);
}
}
protected void beforeStartingVertx(VertxOptions options) {
Object main = executionContext.main();
if (main instanceof VertxLifecycleHooks) {
((VertxLifecycleHooks) main).beforeStartingVertx(options);
}
}
protected MetricsOptions getMetricsOptions() {
return getMetricsOptions(null);
}
protected MetricsOptions getMetricsOptions(JsonObject jsonObject) {
MetricsOptions metricsOptions;
VertxMetricsFactory factory = ServiceHelper.loadFactoryOrNull(VertxMetricsFactory.class);
if (factory != null) {
metricsOptions = jsonObject == null ? factory.newOptions() : factory.newOptions(jsonObject);
} else {
metricsOptions = jsonObject == null ? new MetricsOptions() : new MetricsOptions(jsonObject);
}
configureFromSystemProperties(metricsOptions, METRICS_OPTIONS_PROP_PREFIX);
return metricsOptions;
}
protected void configureFromSystemProperties(Object options, String prefix) {
Properties props = System.getProperties();
Enumeration e = props.propertyNames();
while (e.hasMoreElements()) {
String propName = (String) e.nextElement();
String propVal = props.getProperty(propName);
if (propName.startsWith(prefix)) {
String fieldName = propName.substring(prefix.length());
Method setter = getSetter(fieldName, options.getClass());
if (setter == null) {
log.warn("No such property to configure on options: " + options.getClass().getName() + "." + fieldName);
continue;
}
Class<?> argType = setter.getParameterTypes()[0];
Object arg;
try {
if (argType.equals(String.class)) {
arg = propVal;
} else if (argType.equals(int.class)) {
arg = Integer.valueOf(propVal);
} else if (argType.equals(long.class)) {
arg = Long.valueOf(propVal);
} else if (argType.equals(boolean.class)) {
arg = Boolean.valueOf(propVal);
} else if (argType.isEnum()){
arg = Enum.valueOf((Class<? extends Enum>)argType, propVal);
} else {
log.warn("Invalid type for setter: " + argType);
continue;
}
} catch (IllegalArgumentException e2) {
log.warn("Invalid argtype:" + argType + " on options: " + options.getClass().getName() + "." + fieldName);
continue;
}
try {
setter.invoke(options, arg);
} catch (Exception ex) {
throw new VertxException("Failed to invoke setter: " + setter, ex);
}
}
}
}
private Method getSetter(String fieldName, Class<?> clazz) {
Method[] meths = clazz.getDeclaredMethods();
for (Method meth : meths) {
if (("set" + fieldName).toLowerCase().equals(meth.getName().toLowerCase())) {
return meth;
}
}
meths = clazz.getMethods();
for (Method meth : meths) {
if (("set" + fieldName).toLowerCase().equals(meth.getName().toLowerCase())) {
return meth;
}
}
return null;
}
protected static void addShutdownHook(Vertx vertx, Logger log, Runnable action) {
Runtime.getRuntime().addShutdownHook(new Thread(getTerminationRunnable(vertx, log, action)));
}
public static Runnable getTerminationRunnable(Vertx vertx, Logger log, Runnable action) {
return () -> {
CountDownLatch latch = new CountDownLatch(1);
if (vertx != null) {
vertx.close(ar -> {
if (!ar.succeeded()) {
log.error("Failure in stopping Vert.x", ar.cause());
}
latch.countDown();
});
try {
if (!latch.await(2, TimeUnit.MINUTES)) {
log.error("Timed out waiting to undeploy all");
}
if (action != null) {
action.run();
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
};
}
protected String getDefaultAddress() {
Enumeration<NetworkInterface> nets;
try {
nets = NetworkInterface.getNetworkInterfaces();
} catch (SocketException e) {
return null;
}
NetworkInterface netinf;
while (nets.hasMoreElements()) {
netinf = nets.nextElement();
Enumeration<InetAddress> addresses = netinf.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress address = addresses.nextElement();
if (!address.isAnyLocalAddress() && !address.isMulticastAddress()
&& !(address instanceof Inet6Address)) {
return address.getHostAddress();
}
}
}
return null;
}
public void setExecutionContext(ExecutionContext context) {
this.executionContext = context;
}
public synchronized Vertx vertx() {
return vertx;
}
}