package io.vertx.core.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.ServiceHelper;
import io.vertx.core.Verticle;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.VerticleFactory;
import io.vertx.core.spi.metrics.VertxMetrics;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
/**
 * Deploys and undeploys verticles for a single Vert.x instance.
 * <p>
 * Keeps the registry of live deployments, the verticle factories used to
 * instantiate verticles from an identifier, and the classloaders created for
 * isolation groups.
 */
public class DeploymentManager {
private static final Logger log = LoggerFactory.getLogger(DeploymentManager.class);
// The Vert.x instance this manager deploys into.
private final VertxInternal vertx;
// Live deployments, keyed by deployment ID.
private final Map<String, Deployment> deployments = new ConcurrentHashMap<>();
// Isolation-group classloaders, keyed by group name. Plain HashMap: every access in this
// file is made while holding the DeploymentManager monitor.
private final Map<String, IsolatingClassLoader> classloaders = new HashMap<>();
// Registered factories, keyed by factory prefix; each list is kept sorted by order().
private final Map<String, List<VerticleFactory>> verticleFactories = new ConcurrentHashMap<>();
// Factories used when an identifier matches no registered prefix.
private final List<VerticleFactory> defaultFactories = new ArrayList<>();
/**
 * Creates a manager bound to the given Vert.x instance and loads the
 * available verticle factories.
 *
 * @param vertx the owning Vert.x instance
 */
public DeploymentManager(VertxInternal vertx) {
this.vertx = vertx;
// Must run after vertx is assigned: factories are initialised with it.
loadVerticleFactories();
}
/**
 * Registers every {@link VerticleFactory} found through {@link ServiceHelper}
 * and installs a {@link JavaVerticleFactory} as the default factory.
 */
private void loadVerticleFactories() {
  for (VerticleFactory discovered : ServiceHelper.loadFactories(VerticleFactory.class)) {
    registerVerticleFactory(discovered);
  }
  VerticleFactory javaFactory = new JavaVerticleFactory();
  javaFactory.init(vertx);
  defaultFactories.add(javaFactory);
}
/**
 * @return a new random deployment ID (the string form of a random UUID)
 */
private String generateDeploymentID() {
  UUID id = UUID.randomUUID();
  return id.toString();
}
/**
 * Deploys verticle instances obtained from a supplier.
 * <p>
 * The supplier is called once per requested instance and must return a distinct,
 * non-null verticle each time; otherwise the completion handler (if any) is
 * failed and nothing is deployed.
 *
 * @param verticleSupplier  source of verticle instances
 * @param options           deployment options; classloading-related options are rejected
 * @param completionHandler notified with the deployment ID or the failure, may be null
 * @throws IllegalArgumentException if the options are invalid for supplied verticles
 */
public void deployVerticle(Supplier<Verticle> verticleSupplier, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
  final int instanceCount = options.getInstances();
  if (instanceCount < 1) {
    throw new IllegalArgumentException("Can't specify < 1 instances to deploy");
  }
  if (options.isMultiThreaded() && !options.isWorker()) {
    throw new IllegalArgumentException("If multi-threaded then must be worker too");
  }
  if (options.getExtraClasspath() != null) {
    throw new IllegalArgumentException("Can't specify extraClasspath for already created verticle");
  }
  if (options.getIsolationGroup() != null) {
    throw new IllegalArgumentException("Can't specify isolationGroup for already created verticle");
  }
  if (options.getIsolatedClasses() != null) {
    throw new IllegalArgumentException("Can't specify isolatedClasses for already created verticle");
  }

  ContextInternal caller = vertx.getOrCreateContext();
  ClassLoader loader = getClassLoader(options);

  // Identity-based set: detects a supplier handing back the same instance twice.
  Set<Verticle> supplied = Collections.newSetFromMap(new IdentityHashMap<>());
  for (int created = 0; created < instanceCount; created++) {
    Verticle candidate;
    try {
      candidate = verticleSupplier.get();
    } catch (Exception e) {
      if (completionHandler != null) {
        completionHandler.handle(Future.failedFuture(e));
      }
      return;
    }
    if (candidate == null) {
      if (completionHandler != null) {
        completionHandler.handle(Future.failedFuture("Supplied verticle is null"));
      }
      return;
    }
    supplied.add(candidate);
  }

  if (supplied.size() != instanceCount) {
    if (completionHandler != null) {
      completionHandler.handle(Future.failedFuture("Same verticle supplied more than once"));
    }
    return;
  }

  Verticle[] instances = supplied.toArray(new Verticle[0]);
  String identifier = "java:" + instances[0].getClass().getName();
  doDeploy(identifier, options, caller, caller, completionHandler, loader, instances);
}
/**
 * Deploys the verticle(s) named by {@code identifier} using the matching verticle factories.
 *
 * @param identifier        verticle identifier, optionally prefixed or suffixed to select a factory
 * @param options           deployment options
 * @param completionHandler notified with the deployment ID or the failure, may be null
 * @throws IllegalArgumentException if multi-threaded is requested without worker
 */
public void deployVerticle(String identifier,
                           DeploymentOptions options,
                           Handler<AsyncResult<String>> completionHandler) {
  if (options.isMultiThreaded() && !options.isWorker()) {
    throw new IllegalArgumentException("If multi-threaded then must be worker too");
  }
  // The caller's context acts both as parent context and as the context used for reporting.
  ContextInternal caller = vertx.getOrCreateContext();
  ClassLoader loader = getClassLoader(options);
  doDeployVerticle(identifier, options, caller, caller, loader, completionHandler);
}
/**
 * Resolves the candidate factories for {@code identifier} and starts trying them in order.
 */
private void doDeployVerticle(String identifier,
                              DeploymentOptions options,
                              ContextInternal parentContext,
                              ContextInternal callingContext,
                              ClassLoader cl,
                              Handler<AsyncResult<String>> completionHandler) {
  Iterator<VerticleFactory> candidates = resolveFactories(identifier).iterator();
  // No previous error yet: this is the first attempt.
  doDeployVerticle(candidates, null, identifier, options, parentContext, callingContext, cl, completionHandler);
}
/**
 * Tries the next factory from {@code iter}; on failure recurses with the remaining factories.
 * <p>
 * For each factory: optionally resolve the identifier, then create the verticle
 * instances (on a blocking thread if the factory asks for it) and hand them to
 * {@code doDeploy}. When the iterator is exhausted the last error is reported;
 * if no error was ever recorded (no factory was available at all) an
 * {@link IllegalStateException} is reported so the caller is never left waiting.
 *
 * @param iter              remaining candidate factories
 * @param prevErr           failure from the previously tried factory, or null
 * @param identifier        verticle identifier being deployed
 * @param options           deployment options
 * @param parentContext     context whose deployment becomes the parent
 * @param callingContext    context on which the result is reported
 * @param cl                classloader used to create the verticles
 * @param completionHandler notified with the deployment ID or the failure, may be null
 */
private void doDeployVerticle(Iterator<VerticleFactory> iter,
                              Throwable prevErr,
                              String identifier,
                              DeploymentOptions options,
                              ContextInternal parentContext,
                              ContextInternal callingContext,
                              ClassLoader cl,
                              Handler<AsyncResult<String>> completionHandler) {
  if (!iter.hasNext()) {
    // Out of factories: always complete the caller, even when nothing was ever tried.
    Throwable failure = prevErr != null
      ? prevErr
      : new IllegalStateException("No verticle factory available to deploy " + identifier);
    reportFailure(failure, callingContext, completionHandler);
    return;
  }
  VerticleFactory verticleFactory = iter.next();
  Promise<String> promise = Promise.promise();
  if (verticleFactory.requiresResolve()) {
    try {
      verticleFactory.resolve(identifier, options, cl, promise);
    } catch (Exception e) {
      try {
        promise.fail(e);
      } catch (Exception ignored) {
        // The factory completed the promise before throwing; the first outcome wins.
      }
    }
  } else {
    promise.complete(identifier);
  }
  promise.future().setHandler(ar -> {
    Throwable err;
    if (ar.succeeded()) {
      String resolvedName = ar.result();
      if (!resolvedName.equals(identifier)) {
        // The factory mapped the identifier to a different one: restart with the new name.
        try {
          deployVerticle(resolvedName, options, completionHandler);
        } catch (Exception e) {
          if (completionHandler != null) {
            completionHandler.handle(Future.failedFuture(e));
          }
        }
        return;
      } else {
        if (verticleFactory.blockingCreate()) {
          vertx.<Verticle[]>executeBlocking(createFut -> {
            try {
              Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
              createFut.complete(verticles);
            } catch (Exception e) {
              createFut.fail(e);
            }
          }, res -> {
            if (res.succeeded()) {
              doDeploy(identifier, options, parentContext, callingContext, completionHandler, cl, res.result());
            } else {
              // Creation failed: fall through to the next factory.
              doDeployVerticle(iter, res.cause(), identifier, options, parentContext, callingContext, cl, completionHandler);
            }
          });
          return;
        } else {
          try {
            Verticle[] verticles = createVerticles(verticleFactory, identifier, options.getInstances(), cl);
            doDeploy(identifier, options, parentContext, callingContext, completionHandler, cl, verticles);
            return;
          } catch (Exception e) {
            err = e;
          }
        }
      }
    } else {
      err = ar.cause();
    }
    // This factory could not handle the identifier: try the next one.
    doDeployVerticle(iter, err, identifier, options, parentContext, callingContext, cl, completionHandler);
  });
}
/**
 * Asks the factory for {@code instances} verticles.
 *
 * @return the created verticles, never containing null
 * @throws NullPointerException if the factory returns null
 * @throws Exception            whatever the factory throws
 */
private Verticle[] createVerticles(VerticleFactory verticleFactory, String identifier, int instances, ClassLoader cl) throws Exception {
  Verticle[] created = new Verticle[instances];
  int index = 0;
  while (index < instances) {
    Verticle verticle = verticleFactory.createVerticle(identifier, cl);
    created[index] = verticle;
    if (verticle == null) {
      throw new NullPointerException("VerticleFactory::createVerticle returned null");
    }
    index++;
  }
  return created;
}
/**
 * Returns the part of {@code str} that follows the character at {@code pos}.
 *
 * @throws IllegalArgumentException if nothing follows {@code pos}
 */
private String getSuffix(int pos, String str) {
  int start = pos + 1;
  if (start < str.length()) {
    return str.substring(start);
  }
  throw new IllegalArgumentException("Invalid name: " + str);
}
/**
 * Undeploys the deployment with the given ID.
 *
 * @param deploymentID      ID of the deployment to remove
 * @param completionHandler notified when done; failed with {@link IllegalStateException}
 *                          if the ID is unknown
 */
public void undeployVerticle(String deploymentID, Handler<AsyncResult<Void>> completionHandler) {
  Deployment target = deployments.get(deploymentID);
  Context caller = vertx.getOrCreateContext();
  if (target != null) {
    target.undeploy(completionHandler);
    return;
  }
  reportFailure(new IllegalStateException("Unknown deployment"), caller, completionHandler);
}
/**
 * @return a read-only view of the IDs of all registered deployments
 */
public Set<String> deployments() {
  Set<String> ids = deployments.keySet();
  return Collections.unmodifiableSet(ids);
}
/**
 * @param deploymentID the deployment ID to look up
 * @return the registered deployment, or null if the ID is unknown
 */
public Deployment getDeployment(String deploymentID) {
return deployments.get(deploymentID);
}
/**
 * Undeploys every deployment that is not a child of another deployment.
 * <p>
 * Individual failures are logged; the handler is always completed successfully
 * once every undeploy has reported back.
 *
 * @param completionHandler notified when all undeploys have finished
 */
public void undeployAll(Handler<AsyncResult<Void>> completionHandler) {
  Set<String> rootIds = new HashSet<>();
  deployments.forEach((id, deployment) -> {
    if (!deployment.isChild()) {
      rootIds.add(id);
    }
  });

  if (rootIds.isEmpty()) {
    // Nothing to do, but still complete asynchronously.
    Context context = vertx.getOrCreateContext();
    context.runOnContext(v -> completionHandler.handle(Future.succeededFuture()));
    return;
  }

  AtomicInteger finished = new AtomicInteger(0);
  Handler<AsyncResult<Void>> onUndeployed = ar -> {
    if (ar.failed()) {
      log.error("Undeploy failed", ar.cause());
    }
    if (finished.incrementAndGet() == rootIds.size()) {
      completionHandler.handle(Future.succeededFuture());
    }
  };
  for (String id : rootIds) {
    undeployVerticle(id, onUndeployed);
  }
}
/**
 * Registers a verticle factory under its prefix and initialises it.
 * <p>
 * Factories sharing a prefix are kept sorted by ascending {@code order()}.
 *
 * @param factory the factory to register
 * @throws IllegalArgumentException if the factory has no prefix or is already registered
 */
public void registerVerticleFactory(VerticleFactory factory) {
  String prefix = factory.prefix();
  if (prefix == null) {
    throw new IllegalArgumentException("factory.prefix() cannot be null");
  }
  // Atomic create-if-missing, so two first registrations for a prefix cannot replace each other's list.
  List<VerticleFactory> facts = verticleFactories.computeIfAbsent(prefix, p -> new ArrayList<>());
  if (facts.contains(factory)) {
    throw new IllegalArgumentException("Factory already registered");
  }
  facts.add(factory);
  // Integer.compare instead of subtraction: the difference of two ints can overflow and flip the sign.
  facts.sort((fact1, fact2) -> Integer.compare(fact1.order(), fact2.order()));
  factory.init(vertx);
}
/**
 * Removes a previously registered verticle factory.
 *
 * @param factory the factory to remove
 * @throws IllegalArgumentException if the factory has no prefix or is not registered
 */
public void unregisterVerticleFactory(VerticleFactory factory) {
  String prefix = factory.prefix();
  if (prefix == null) {
    throw new IllegalArgumentException("factory.prefix() cannot be null");
  }
  List<VerticleFactory> registered = verticleFactories.get(prefix);
  boolean removed = registered != null && registered.remove(factory);
  // Drop the prefix entry once its last factory is gone.
  if (registered != null && registered.isEmpty()) {
    verticleFactories.remove(prefix);
  }
  if (!removed) {
    throw new IllegalArgumentException("factory isn't registered");
  }
}
/**
 * @return a new set containing every registered verticle factory, across all prefixes
 */
public Set<VerticleFactory> verticleFactories() {
  Set<VerticleFactory> all = new HashSet<>();
  verticleFactories.values().forEach(all::addAll);
  return all;
}
/**
 * Picks the factories able to handle {@code identifier}.
 * <p>
 * The lookup key is the text before the first ':' if there is one, otherwise the
 * text after the last '.'. Identifiers with neither, or with a key that has no
 * registered factories, fall back to the default factories.
 *
 * @throws IllegalArgumentException if the identifier has no ':' and ends with '.'
 */
private List<VerticleFactory> resolveFactories(String identifier) {
  String lookup;
  int colon = identifier.indexOf(':');
  if (colon != -1) {
    lookup = identifier.substring(0, colon);
  } else {
    int dot = identifier.lastIndexOf('.');
    if (dot == -1) {
      return defaultFactories;
    }
    lookup = getSuffix(dot, identifier);
  }
  List<VerticleFactory> registered = verticleFactories.get(lookup);
  return registered != null ? registered : defaultFactories;
}
/**
 * Converts a classpath element to a URL: first parsed as a URL, then, if that
 * fails, treated as a file system path.
 *
 * @throws IllegalArgumentException if the path cannot be turned into a URL either way
 */
private static URL mapToURL(String path) {
  URL result;
  try {
    result = new URL(path);
  } catch (MalformedURLException notAUrl) {
    // Not a URL: interpret it as a file path instead.
    try {
      result = new File(path).toURI().toURL();
    } catch (MalformedURLException e1) {
      throw new IllegalArgumentException(e1);
    }
  }
  return result;
}
private static List<URL> () {
List<URL> classpathURLs = new ArrayList<>();
String classpath = System.getProperty("java.class.path");
if (Objects.nonNull(classpath)) {
for (String path : classpath.split(File.pathSeparator)) {
classpathURLs.add(mapToURL(path));
}
}
return classpathURLs;
}
static List<URL> (ClassLoader current) {
List<URL> classpathURLs = new ArrayList<>();
String searchFile = "META-INF/MANIFEST.MF";
Enumeration<URL> urls;
try {
urls = current.getResources(searchFile);
} catch (IOException e) {
throw new IllegalStateException(e);
}
for (URL url : Collections.list(urls)) {
String urlString = url.toExternalForm();
if ("jar".equals(url.getProtocol().toLowerCase())) {
String prefix = "jar:";
String suffix = "!/" + searchFile;
urlString = urlString.replace(prefix, "").replace(suffix, "").trim();
}
urlString = urlString.replace(searchFile, "").trim();
try {
classpathURLs.add(new URL(urlString));
} catch (MalformedURLException e) {
throw new IllegalStateException(e);
}
}
return classpathURLs;
}
static List<URL> (ClassLoader current) {
if (current instanceof URLClassLoader) {
URLClassLoader urlc = (URLClassLoader)current;
return Arrays.asList(urlc.getURLs());
} else {
List<URL> classpathURLs = extractCPFromProperty();
for (URL url : extractCPByManifest(current)) {
if (!classpathURLs.contains(url)) {
classpathURLs.add(url);
}
}
return classpathURLs;
}
}
/**
 * Returns the classloader a deployment should use.
 * <p>
 * Without an isolation group this is the current classloader. With one, the
 * group's {@link IsolatingClassLoader} is created on first use (extra classpath
 * entries first, then the current classpath) and afterwards shared, with its
 * reference count incremented on each reuse.
 */
private ClassLoader getClassLoader(DeploymentOptions options) {
  String group = options.getIsolationGroup();
  if (group == null) {
    return getCurrentClassLoader();
  }
  synchronized (this) {
    IsolatingClassLoader shared = classloaders.get(group);
    if (shared != null) {
      shared.refCount++;
      return shared;
    }
    ClassLoader current = getCurrentClassLoader();
    List<URL> urls = new ArrayList<>();
    List<String> extraClasspath = options.getExtraClasspath();
    if (extraClasspath != null) {
      for (String pathElement : extraClasspath) {
        urls.add(mapToURL(pathElement));
      }
    }
    urls.addAll(extractClasspath(current));
    URL[] urlArray = urls.toArray(new URL[0]);
    IsolatingClassLoader created =
      new IsolatingClassLoader(urlArray, getCurrentClassLoader(), options.getIsolatedClasses());
    classloaders.put(group, created);
    return created;
  }
}
/**
 * @return the thread context classloader, or this class's own loader if none is set
 */
private ClassLoader getCurrentClassLoader() {
  ClassLoader tccl = Thread.currentThread().getContextClassLoader();
  return tccl != null ? tccl : getClass().getClassLoader();
}
/**
 * Delivers a failure to the handler on {@code context}; logs it when there is no handler.
 */
private <T> void reportFailure(Throwable t, Context context, Handler<AsyncResult<T>> completionHandler) {
  if (completionHandler == null) {
    log.error(t.getMessage(), t);
    return;
  }
  AsyncResult<T> failure = Future.failedFuture(t);
  reportResult(context, completionHandler, failure);
}
/**
 * Delivers a successful result to the handler on {@code context}; does nothing without a handler.
 */
private <T> void reportSuccess(T result, Context context, Handler<AsyncResult<T>> completionHandler) {
  if (completionHandler == null) {
    return;
  }
  AsyncResult<T> success = Future.succeededFuture(result);
  reportResult(context, completionHandler, success);
}
/**
 * Schedules {@code completionHandler} to be called with {@code result} on {@code context}.
 * Anything the handler throws is logged and then rethrown.
 */
private <T> void reportResult(Context context, Handler<AsyncResult<T>> completionHandler, AsyncResult<T> result) {
  Handler<Void> delivery = v -> {
    try {
      completionHandler.handle(result);
    } catch (Throwable t) {
      log.error("Failure in calling handler", t);
      throw t;
    }
  };
  context.runOnContext(delivery);
}
/**
 * Creates a deployment for the given verticle instances and starts each of them
 * on its own context.
 * <p>
 * Success is reported on {@code callingContext} once every verticle has started.
 * The first failure (start failure or exception from init/start) triggers a
 * rollback of the deployment; later failures are ignored.
 *
 * @param identifier        verticle identifier recorded on the deployment
 * @param options           deployment options
 * @param parentContext     context whose deployment (if any) becomes the parent
 * @param callingContext    context on which the result is reported
 * @param completionHandler notified with the deployment ID or the failure, may be null
 * @param tccl              classloader passed to the created contexts
 * @param verticles         the verticle instances to start
 */
private void doDeploy(String identifier,
DeploymentOptions options,
ContextInternal parentContext,
ContextInternal callingContext,
Handler<AsyncResult<String>> completionHandler,
ClassLoader tccl, Verticle... verticles) {
// Work on a copy of the config so the caller's JsonObject is not shared with the contexts.
JsonObject conf = options.getConfig() == null ? new JsonObject() : options.getConfig().copy();
String poolName = options.getWorkerPoolName();
Deployment parent = parentContext.getDeployment();
String deploymentID = generateDeploymentID();
DeploymentImpl deployment = new DeploymentImpl(parent, deploymentID, identifier, options);
// Number of verticles that started successfully.
AtomicInteger deployCount = new AtomicInteger();
// Ensures rollback is triggered at most once for this deployment.
AtomicBoolean failureReported = new AtomicBoolean();
for (Verticle verticle: verticles) {
// A named worker pool is obtained per verticle and closed through the context's close hooks.
WorkerExecutorInternal workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize(), options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()) : null;
WorkerPool pool = workerExec != null ? workerExec.getPool() : null;
ContextImpl context = (ContextImpl) (options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
vertx.createEventLoopContext(deploymentID, pool, conf, tccl));
if (workerExec != null) {
context.addCloseHook(workerExec);
}
context.setDeployment(deployment);
deployment.addVerticle(new VerticleHolder(verticle, context));
// init/start run on the verticle's own context.
context.runOnContext(v -> {
try {
verticle.init(vertx, context);
Promise<Void> startPromise = Promise.promise();
Future<Void> startFuture = startPromise.future();
verticle.start(startPromise);
startFuture.setHandler(ar -> {
if (ar.succeeded()) {
if (parent != null) {
if (parent.addChild(deployment)) {
deployment.child = true;
} else {
// The parent refused the child (it is no longer in the deployed state): undo this deployment.
// NOTE(review): the completion handler is not invoked on this path - confirm this is intended.
deployment.undeploy(null);
return;
}
}
VertxMetrics metrics = vertx.metricsSPI();
if (metrics != null) {
metrics.verticleDeployed(verticle);
}
// Registered on every successful start; the same ID/deployment pair is put each time.
deployments.put(deploymentID, deployment);
if (deployCount.incrementAndGet() == verticles.length) {
reportSuccess(deploymentID, callingContext, completionHandler);
}
} else if (failureReported.compareAndSet(false, true)) {
deployment.rollback(callingContext, completionHandler, context, ar.cause());
}
});
} catch (Throwable t) {
// init() or start() threw synchronously.
if (failureReported.compareAndSet(false, true))
deployment.rollback(callingContext, completionHandler, context, t);
}
});
}
}
/**
 * Pairs a verticle instance with the context it runs on.
 */
static class VerticleHolder {
  final Verticle verticle;
  final ContextImpl context;

  VerticleHolder(Verticle verticle, ContextImpl context) {
    this.context = context;
    this.verticle = verticle;
  }
}
/**
 * A single deployment: one or more verticle instances created from the same
 * identifier, plus any child deployments.
 */
private class DeploymentImpl implements Deployment {
// Lifecycle states held in the status field.
private static final int ST_DEPLOYED = 0, ST_UNDEPLOYING = 1, ST_UNDEPLOYED = 2;
// Parent deployment, or null.
private final Deployment parent;
private final String deploymentID;
private final String verticleIdentifier;
// The verticle instances of this deployment, each with its context.
private final List<VerticleHolder> verticles = new CopyOnWriteArrayList<>();
private final Set<Deployment> children = new ConcurrentHashSet<>();
private final DeploymentOptions options;
// Current lifecycle state; read and written while holding this object's monitor.
private int status = ST_DEPLOYED;
// Set once this deployment has been accepted as a child by its parent.
private volatile boolean child;
/**
 * @param parent             parent deployment, or null
 * @param deploymentID       ID of this deployment
 * @param verticleIdentifier identifier the verticles were created from
 * @param options            options used for the deployment
 */
private DeploymentImpl(Deployment parent, String deploymentID, String verticleIdentifier, DeploymentOptions options) {
  this.options = options;
  this.verticleIdentifier = verticleIdentifier;
  this.deploymentID = deploymentID;
  this.parent = parent;
}
/**
 * Adds a verticle instance (with its context) to this deployment.
 */
public void addVerticle(VerticleHolder holder) {
verticles.add(holder);
}
/**
 * Undoes a deployment that failed to start: undeploys the children, runs the
 * failing context's close hooks and then reports {@code cause} to the handler.
 * Does nothing unless the deployment is still in the deployed state.
 *
 * @param callingContext    context on which the failure is reported
 * @param completionHandler handler of the original deploy call, may be null
 * @param context           context of the verticle that failed
 * @param cause             the start failure to report
 */
private synchronized void rollback(ContextInternal callingContext, Handler<AsyncResult<String>> completionHandler, ContextImpl context, Throwable cause) {
  if (status != ST_DEPLOYED) {
    return;
  }
  status = ST_UNDEPLOYING;
  doUndeployChildren(callingContext, childrenResult -> {
    synchronized (DeploymentImpl.this) {
      status = ST_UNDEPLOYED;
    }
    if (!childrenResult.failed()) {
      context.runCloseHooks(hooksDone -> reportFailure(cause, callingContext, completionHandler));
    } else {
      // Children could not be undeployed: still report the original start failure.
      reportFailure(cause, callingContext, completionHandler);
    }
  });
}
/**
 * Undeploys this deployment, reporting the outcome on the caller's context.
 */
@Override
public void undeploy(Handler<AsyncResult<Void>> completionHandler) {
  doUndeploy(vertx.getOrCreateContext(), completionHandler);
}
/**
 * Undeploys all current child deployments.
 * <p>
 * The handler is completed successfully once every child has been undeployed
 * (immediately if there are none), or failed once with the first child failure.
 *
 * @param undeployingContext context on which failures are reported
 * @param completionHandler  notified with the overall outcome
 */
private synchronized void doUndeployChildren(ContextInternal undeployingContext, Handler<AsyncResult<Void>> completionHandler) {
  // Take one snapshot and count against it: removeChild() can shrink the live set at any
  // time, and a mismatch between expected and iterated children would stall completion.
  Set<Deployment> snapshot = new HashSet<>(children);
  if (snapshot.isEmpty()) {
    completionHandler.handle(Future.succeededFuture());
    return;
  }
  final int size = snapshot.size();
  AtomicInteger childCount = new AtomicInteger();
  // Several children may fail; the handler must only hear about the first one.
  AtomicBoolean failureReported = new AtomicBoolean();
  for (Deployment childDeployment : snapshot) {
    childDeployment.doUndeploy(undeployingContext, ar -> {
      children.remove(childDeployment);
      if (ar.failed()) {
        if (failureReported.compareAndSet(false, true)) {
          reportFailure(ar.cause(), undeployingContext, completionHandler);
        }
      } else if (childCount.incrementAndGet() == size) {
        completionHandler.handle(Future.succeededFuture());
      }
    });
  }
}
/**
 * Undeploys this deployment, children first.
 * <p>
 * While children exist they are undeployed and this method is re-entered
 * afterwards. Then each verticle is stopped on its own context, its close hooks
 * are run, and the isolation-group classloader reference is released. Success is
 * reported after all verticles stopped; a stop failure is reported at most once
 * for the whole deployment.
 *
 * @param undeployingContext context on which the result is reported
 * @param completionHandler  notified with the outcome, may be null
 */
public synchronized void doUndeploy(ContextInternal undeployingContext, Handler<AsyncResult<Void>> completionHandler) {
  if (status == ST_UNDEPLOYED) {
    reportFailure(new IllegalStateException("Already undeployed"), undeployingContext, completionHandler);
    return;
  }
  if (!children.isEmpty()) {
    status = ST_UNDEPLOYING;
    doUndeployChildren(undeployingContext, ar -> {
      if (ar.failed()) {
        reportFailure(ar.cause(), undeployingContext, completionHandler);
      } else {
        // Children are gone: come back and undeploy this deployment's own verticles.
        doUndeploy(undeployingContext, completionHandler);
      }
    });
  } else {
    status = ST_UNDEPLOYED;
    AtomicInteger undeployCount = new AtomicInteger();
    // Shared by all verticles so that only the first stop failure reaches the handler.
    AtomicBoolean failureReported = new AtomicBoolean();
    int numToUndeploy = verticles.size();
    if (parent != null) {
      parent.removeChild(this);
    }
    for (VerticleHolder verticleHolder : verticles) {
      ContextImpl context = verticleHolder.context;
      context.runOnContext(v -> {
        Promise<Void> stopPromise = Promise.promise();
        Future<Void> stopFuture = stopPromise.future();
        stopFuture.setHandler(ar -> {
          deployments.remove(deploymentID);
          VertxMetrics metrics = vertx.metricsSPI();
          if (metrics != null) {
            metrics.verticleUndeployed(verticleHolder.verticle);
          }
          context.runCloseHooks(ar2 -> {
            if (ar2.failed()) {
              // Logged only: a failing close hook does not change the reported outcome.
              log.error("Failed to run close hook", ar2.cause());
            }
            String group = options.getIsolationGroup();
            if (group != null) {
              synchronized (DeploymentManager.this) {
                IsolatingClassLoader icl = classloaders.get(group);
                // The loader may already have been released by another verticle of this group.
                if (icl != null && --icl.refCount == 0) {
                  classloaders.remove(group);
                  try {
                    icl.close();
                  } catch (IOException e) {
                    log.debug("Issue when closing isolation group loader", e);
                  }
                }
              }
            }
            if (ar.succeeded()) {
              if (undeployCount.incrementAndGet() == numToUndeploy) {
                reportSuccess(null, undeployingContext, completionHandler);
              }
            } else if (failureReported.compareAndSet(false, true)) {
              reportFailure(ar.cause(), undeployingContext, completionHandler);
            }
          });
        });
        try {
          verticleHolder.verticle.stop(stopPromise);
        } catch (Throwable t) {
          stopPromise.fail(t);
        }
      });
    }
  }
}
/**
 * @return the identifier the verticles of this deployment were created from
 */
@Override
public String verticleIdentifier() {
return verticleIdentifier;
}
/**
 * @return the options this deployment was created with
 */
@Override
public DeploymentOptions deploymentOptions() {
return options;
}
/**
 * Registers a child deployment, but only while this deployment is in the deployed state.
 *
 * @return true if the child was accepted, false otherwise
 */
@Override
public synchronized boolean addChild(Deployment deployment) {
  if (status != ST_DEPLOYED) {
    return false;
  }
  children.add(deployment);
  return true;
}
/**
 * Removes a child deployment from this deployment's set of children.
 */
@Override
public void removeChild(Deployment deployment) {
children.remove(deployment);
}
/**
 * @return a new set holding the verticle instances of this deployment
 */
@Override
public Set<Verticle> getVerticles() {
  Set<Verticle> instances = new HashSet<>();
  verticles.forEach(holder -> instances.add(holder.verticle));
  return instances;
}
/**
 * @return true once this deployment has been added as a child of its parent
 */
@Override
public boolean isChild() {
return child;
}
/**
 * @return the ID of this deployment
 */
@Override
public String deploymentID() {
return deploymentID;
}
}
}