package io.vertx.servicediscovery.impl;
import io.vertx.core.*;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.servicediscovery.*;
import io.vertx.servicediscovery.spi.ServiceDiscoveryBackend;
import io.vertx.servicediscovery.spi.ServiceExporter;
import io.vertx.servicediscovery.spi.ServiceImporter;
import io.vertx.servicediscovery.spi.ServicePublisher;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Implementation of {@link ServiceDiscovery} and {@link ServicePublisher} that delegates record storage to a
 * {@link ServiceDiscoveryBackend}.
 * <p>
 * Record changes are forwarded to the registered {@link ServiceExporter exporters} and published on the
 * event bus at the announce address. Service reference bind and release events are published at the usage
 * address when one is configured.
 */
public class DiscoveryImpl implements ServiceDiscovery, ServicePublisher {

  private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryImpl.class.getName());

  private final Vertx vertx;
  /** Event bus address on which record announcements are published. */
  private final String announce;
  /** Event bus address on which bind / release events are published; no usage event is sent when {@code null}. */
  private final String usage;
  private final ServiceDiscoveryBackend backend;
  /** Importers that have been started successfully. */
  private final Set<ServiceImporter> importers = new CopyOnWriteArraySet<>();
  /** Exporters that have been initialized successfully. */
  private final Set<ServiceExporter> exporters = new CopyOnWriteArraySet<>();
  /** Service references handed out by this instance and not yet released. */
  private final Set<ServiceReference> bindings = new CopyOnWriteArraySet<>();
  /** Identifier sent in usage events: the configured name, or the node id when no name is set. */
  private final String id;
  private final ServiceDiscoveryOptions options;

  /**
   * Creates a discovery instance whose backend is selected by the {@code backend-name} entry of the
   * backend configuration (see {@link #getBackend(String)} for the selection rules).
   *
   * @param vertx   the Vert.x instance
   * @param options the discovery options
   * @throws IllegalStateException if a backend name is configured but no matching backend is found
   */
  public DiscoveryImpl(Vertx vertx, ServiceDiscoveryOptions options) {
    this(vertx, options, getBackend(options.getBackendConfiguration().getString("backend-name", null)));
  }

  /**
   * Creates a discovery instance using the given backend. The backend is initialized here with the
   * backend configuration taken from the options.
   *
   * @param vertx   the Vert.x instance
   * @param options the discovery options
   * @param backend the backend used to store the records
   */
  DiscoveryImpl(Vertx vertx, ServiceDiscoveryOptions options, ServiceDiscoveryBackend backend) {
    this.vertx = vertx;
    this.announce = options.getAnnounceAddress();
    this.usage = options.getUsageAddress();
    this.backend = backend;
    this.backend.init(vertx, options.getBackendConfiguration());
    this.id = options.getName() != null ? options.getName() : getNodeId(vertx);
    this.options = options;
  }

  /**
   * Starts every {@link ServiceImporter} found through the {@link ServiceLoader} with an empty
   * configuration. Importers that start successfully are registered; the others are only logged.
   * The completion handler is called once all the importers have completed, whatever their outcome.
   *
   * @param completionHandler called with this instance once all the start attempts have completed
   */
  public void initialize(Handler<ServiceDiscovery> completionHandler) {
    Collection<ServiceImporter> spi = getServiceImporterFromSPI();
    // Remembers which importer each future belongs to, so the outcome can be reported per importer.
    Map<Future, ServiceImporter> map = new HashMap<>();
    List<Future> futures = spi.stream().map(imp -> {
      Promise<Void> promise = Promise.promise();
      imp.start(vertx, this, new JsonObject(), promise);
      map.put(promise.future(), imp);
      return promise.future();
    }).collect(Collectors.toList());

    CompositeFuture.join(futures)
      .setHandler(ar -> {
        for (Future f : futures) {
          ServiceImporter serviceImporter = map.get(f);
          if (f.succeeded()) {
            LOGGER.info("Auto-registration of importer " + serviceImporter);
            importers.add(serviceImporter);
          } else {
            // Include the cause so a failing importer can be diagnosed from the log.
            LOGGER.warn("Failed to register importer " + serviceImporter, f.cause());
          }
        }
        completionHandler.handle(this);
      });
  }

  /**
   * Computes the default identifier of this instance.
   *
   * @param vertx the Vert.x instance
   * @return the cluster node id when Vert.x is clustered, {@code "localhost"} otherwise
   */
  private String getNodeId(Vertx vertx) {
    if (vertx.isClustered()) {
      return ((VertxInternal) vertx).getNodeID();
    } else {
      return "localhost";
    }
  }

  /**
   * Selects the backend implementation.
   * <ul>
   * <li>Without a name: the first backend found by the {@link ServiceLoader}, or a
   * {@link DefaultServiceDiscoveryBackend} if none is found.</li>
   * <li>With the class name of {@link DefaultServiceDiscoveryBackend}: a new default backend.</li>
   * <li>Otherwise: the first loaded backend whose {@code name()} equals the given name.</li>
   * </ul>
   *
   * @param maybeName the requested backend name, may be {@code null}
   * @return the selected backend
   * @throws IllegalStateException if a name is given and no backend matches it
   */
  private static ServiceDiscoveryBackend getBackend(String maybeName) {
    ServiceLoader<ServiceDiscoveryBackend> backends = ServiceLoader.load(ServiceDiscoveryBackend.class);
    Iterator<ServiceDiscoveryBackend> iterator = backends.iterator();

    if (maybeName == null) {
      if (!iterator.hasNext()) {
        return new DefaultServiceDiscoveryBackend();
      } else {
        return iterator.next();
      }
    }

    if (maybeName.equals(DefaultServiceDiscoveryBackend.class.getName())) {
      return new DefaultServiceDiscoveryBackend();
    }

    while (iterator.hasNext()) {
      ServiceDiscoveryBackend backend = iterator.next();
      if (backend.name().equals(maybeName)) {
        return backend;
      }
    }

    throw new IllegalStateException("Cannot find the discovery backend implementation with name " + maybeName + " in " +
      "the classpath");
  }

  /**
   * @return the importers found through the {@link ServiceLoader}, in iteration order
   */
  private Collection<ServiceImporter> getServiceImporterFromSPI() {
    List<ServiceImporter> list = new ArrayList<>();
    for (ServiceImporter importer : ServiceLoader.load(ServiceImporter.class)) {
      list.add(importer);
    }
    return list;
  }

  /**
   * Gets a service reference for the given record, using an empty configuration.
   *
   * @param record the record
   * @return the reference, tracked in the bindings of this instance
   */
  @Override
  public ServiceReference getReference(Record record) {
    return getReferenceWithConfiguration(record, new JsonObject());
  }

  /**
   * Gets a service reference for the given record, tracks it in the bindings and publishes a
   * {@code bind} usage event.
   *
   * @param record        the record
   * @param configuration the configuration passed to the service type when creating the reference
   * @return the reference
   */
  @Override
  public ServiceReference getReferenceWithConfiguration(Record record, JsonObject configuration) {
    ServiceReference reference = ServiceTypes.get(record).get(vertx, this, record, configuration);
    bindings.add(reference);
    sendBindEvent(reference);
    return reference;
  }

  /**
   * Publishes a {@code bind} event for the reference on the usage address; does nothing when no
   * usage address is configured.
   */
  private void sendBindEvent(ServiceReference reference) {
    if (usage == null) {
      return;
    }
    vertx.eventBus().publish(usage, new JsonObject()
      .put("type", "bind")
      .put("record", reference.record().toJson())
      .put("id", id));
  }

  /**
   * Removes the reference from the bindings, releases it and publishes a {@code release} usage event.
   *
   * @param reference the reference to release
   * @return {@code true} if the reference was in the bindings of this instance
   */
  @Override
  public boolean release(ServiceReference reference) {
    boolean removed = bindings.remove(reference);
    reference.release();
    sendUnbindEvent(reference);
    return removed;
  }

  /**
   * Publishes a {@code release} event for the reference on the usage address; does nothing when no
   * usage address is configured.
   */
  private void sendUnbindEvent(ServiceReference reference) {
    if (usage == null) {
      return;
    }
    vertx.eventBus().publish(usage, new JsonObject()
      .put("type", "release")
      .put("record", reference.record().toJson())
      .put("id", id));
  }

  /**
   * Starts the given importer and registers it once it has started successfully.
   *
   * @param importer          the importer to start
   * @param configuration     the importer configuration; an empty object is used when {@code null}
   * @param completionHandler notified of the start outcome, may be {@code null}
   * @return this instance
   */
  @Override
  public ServiceDiscovery registerServiceImporter(ServiceImporter importer, JsonObject configuration,
                                                  Handler<AsyncResult<Void>> completionHandler) {
    JsonObject conf = configuration == null ? new JsonObject() : configuration;

    Promise<Void> completed = Promise.promise();
    completed.future().setHandler(
      ar -> {
        if (ar.failed()) {
          LOGGER.error("Cannot start the service importer " + importer, ar.cause());
          if (completionHandler != null) {
            completionHandler.handle(Future.failedFuture(ar.cause()));
          }
        } else {
          importers.add(importer);
          LOGGER.info("Service importer " + importer + " started");
          if (completionHandler != null) {
            completionHandler.handle(Future.succeededFuture(null));
          }
        }
      }
    );

    importer.start(vertx, this, conf, completed);
    return this;
  }

  /**
   * Same as {@link #registerServiceImporter(ServiceImporter, JsonObject, Handler)} without a
   * completion handler.
   */
  @Override
  public ServiceDiscovery registerServiceImporter(ServiceImporter importer, JsonObject configuration) {
    return registerServiceImporter(importer, configuration, null);
  }

  /**
   * Same as {@link #registerServiceExporter(ServiceExporter, JsonObject, Handler)} without a
   * completion handler.
   */
  @Override
  public ServiceDiscovery registerServiceExporter(ServiceExporter exporter, JsonObject configuration) {
    return registerServiceExporter(exporter, configuration, null);
  }

  /**
   * Initializes the given exporter and registers it once it has been initialized successfully.
   *
   * @param exporter          the exporter to initialize
   * @param configuration     the exporter configuration; an empty object is used when {@code null}
   * @param completionHandler notified of the initialization outcome, may be {@code null}
   * @return this instance
   */
  @Override
  public ServiceDiscovery registerServiceExporter(ServiceExporter exporter, JsonObject configuration,
                                                  Handler<AsyncResult<Void>> completionHandler) {
    JsonObject conf = configuration == null ? new JsonObject() : configuration;

    Promise<Void> completed = Promise.promise();
    completed.future().setHandler(
      ar -> {
        if (ar.failed()) {
          // The message names the exporter (it previously said "importer" by mistake).
          LOGGER.error("Cannot start the service exporter " + exporter, ar.cause());
          if (completionHandler != null) {
            completionHandler.handle(Future.failedFuture(ar.cause()));
          }
        } else {
          exporters.add(exporter);
          LOGGER.info("Service exporter " + exporter + " started");
          if (completionHandler != null) {
            completionHandler.handle(Future.succeededFuture(null));
          }
        }
      }
    );

    exporter.init(vertx, this, conf, completed);
    return this;
  }

  /**
   * Asks every registered importer and exporter to close, releases and forgets all the bindings,
   * then logs the outcome once the importers and exporters have reported their closure. The method
   * itself returns without waiting for them.
   */
  @Override
  public void close() {
    LOGGER.info("Stopping service discovery");
    List<Future> futures = new ArrayList<>();
    for (ServiceImporter importer : importers) {
      Promise<Void> promise = Promise.promise();
      importer.close(v -> promise.complete());
      futures.add(promise.future());
    }

    for (ServiceExporter exporter : exporters) {
      Promise<Void> promise = Promise.promise();
      exporter.close(promise::complete);
      futures.add(promise.future());
    }

    bindings.forEach(ServiceReference::release);
    bindings.clear();

    CompositeFuture.all(futures).setHandler(ar -> {
      if (ar.succeeded()) {
        LOGGER.info("Discovery bridges stopped");
      } else {
        LOGGER.warn("Some discovery bridges did not stopped smoothly", ar.cause());
      }
    });
  }

  /**
   * Stores the record in the backend. A record whose status is {@code null} or {@code UNKNOWN} is
   * stored as {@code UP}. On success the exporters are notified, the record (without its
   * registration) is announced on the event bus and the handler receives the stored record.
   *
   * @param record        the record to publish; its status is updated by this method
   * @param resultHandler notified with the stored record or with the backend failure
   */
  @Override
  public void publish(Record record, Handler<AsyncResult<Record>> resultHandler) {
    Status status = record.getStatus() == null || record.getStatus() == Status.UNKNOWN
      ? Status.UP : record.getStatus();

    backend.store(record.setStatus(status), ar -> {
      if (ar.failed()) {
        resultHandler.handle(Future.failedFuture(ar.cause()));
        return;
      }

      // Each exporter receives its own copy of the stored record.
      for (ServiceExporter exporter : exporters) {
        exporter.onPublish(new Record(ar.result()));
      }

      Record announcedRecord = new Record(ar.result());
      announcedRecord
        .setRegistration(null)
        .setStatus(status);
      vertx.eventBus().publish(announce, announcedRecord.toJson());
      resultHandler.handle(Future.succeededFuture(ar.result()));
    });
  }

  /**
   * Removes the record with the given registration id from the backend. On success the exporters
   * are notified and the removed record is announced on the event bus with the {@code DOWN} status
   * and without its registration.
   *
   * @param id            the registration id of the record to remove
   * @param resultHandler notified of the outcome
   */
  @Override
  public void unpublish(String id, Handler<AsyncResult<Void>> resultHandler) {
    backend.remove(id, record -> {
      if (record.failed()) {
        resultHandler.handle(Future.failedFuture(record.cause()));
        return;
      }

      for (ServiceExporter exporter : exporters) {
        exporter.onUnpublish(id);
      }

      Record announcedRecord = new Record(record.result());
      announcedRecord
        .setRegistration(null)
        .setStatus(Status.DOWN);
      vertx.eventBus().publish(announce, announcedRecord.toJson());
      resultHandler.handle(Future.succeededFuture());
    });
  }

  /**
   * Looks up one record matching the given JSON filter. A {@code null} filter accepts every record.
   * Records that are not {@code UP} are only considered when the filter contains a string
   * {@code status} entry.
   *
   * @param filter        the filter, may be {@code null}
   * @param resultHandler notified with a matching record, {@code null} if none matches, or the failure
   */
  @Override
  public void getRecord(JsonObject filter,
                        Handler<AsyncResult<Record>> resultHandler) {
    boolean includeOutOfService = false;
    Function<Record, Boolean> accept;
    if (filter == null) {
      accept = r -> true;
    } else {
      includeOutOfService = filter.getString("status") != null;
      accept = r -> r.match(filter);
    }

    getRecord(accept, includeOutOfService, resultHandler);
  }

  /**
   * Looks up one {@code UP} record accepted by the given function.
   *
   * @param filter        the accepting function, must not be {@code null}
   * @param resultHandler notified with a matching record, {@code null} if none matches, or the failure
   */
  @Override
  public void getRecord(Function<Record, Boolean> filter, Handler<AsyncResult<Record>> resultHandler) {
    getRecord(filter, false, resultHandler);
  }

  /**
   * Looks up one record accepted by the given function.
   *
   * @param filter              the accepting function, must not be {@code null}
   * @param includeOutOfService whether records that are not {@code UP} may be returned
   * @param resultHandler       notified with a matching record, {@code null} if none matches, or the failure
   * @throws NullPointerException if {@code filter} is {@code null}
   */
  @Override
  public void getRecord(Function<Record, Boolean> filter, boolean includeOutOfService,
                        Handler<AsyncResult<Record>> resultHandler) {
    Objects.requireNonNull(filter);
    backend.getRecords(list -> {
      if (list.failed()) {
        resultHandler.handle(Future.failedFuture(list.cause()));
      } else {
        Optional<Record> any = list.result().stream()
          .filter(filter::apply)
          .filter(record -> includeOutOfService || record.getStatus() == Status.UP)
          .findAny();
        // A missing match is reported as a successful lookup with a null result.
        resultHandler.handle(Future.succeededFuture(any.orElse(null)));
      }
    });
  }

  /**
   * Looks up all the records matching the given JSON filter. A {@code null} filter accepts every
   * record. Records that are not {@code UP} are only included when the filter contains a string
   * {@code status} entry.
   *
   * @param filter        the filter, may be {@code null}
   * @param resultHandler notified with the matching records or the failure
   */
  @Override
  public void getRecords(JsonObject filter, Handler<AsyncResult<List<Record>>> resultHandler) {
    boolean includeOutOfService = false;
    Function<Record, Boolean> accept;
    if (filter == null) {
      accept = r -> true;
    } else {
      includeOutOfService = filter.getString("status") != null;
      accept = r -> r.match(filter);
    }

    getRecords(accept, includeOutOfService, resultHandler);
  }

  /**
   * Looks up all the {@code UP} records accepted by the given function.
   *
   * @param filter        the accepting function, must not be {@code null}
   * @param resultHandler notified with the matching records or the failure
   */
  @Override
  public void getRecords(Function<Record, Boolean> filter, Handler<AsyncResult<List<Record>>> resultHandler) {
    getRecords(filter, false, resultHandler);
  }

  /**
   * Looks up all the records accepted by the given function.
   *
   * @param filter              the accepting function, must not be {@code null}
   * @param includeOutOfService whether records that are not {@code UP} are included
   * @param resultHandler       notified with the matching records or the failure
   * @throws NullPointerException if {@code filter} is {@code null}
   */
  @Override
  public void getRecords(Function<Record, Boolean> filter, boolean includeOutOfService,
                         Handler<AsyncResult<List<Record>>> resultHandler) {
    Objects.requireNonNull(filter);
    backend.getRecords(list -> {
      if (list.failed()) {
        resultHandler.handle(Future.failedFuture(list.cause()));
      } else {
        resultHandler.handle(Future.succeededFuture(
          list.result().stream()
            .filter(filter::apply)
            .filter(record -> includeOutOfService || record.getStatus() == Status.UP)
            .collect(Collectors.toList())
        ));
      }
    });
  }

  /**
   * Updates the record in the backend. The exporters are notified and the record is announced on
   * the event bus only once the backend has accepted the update, so a failed update is never
   * advertised.
   *
   * @param record        the updated record
   * @param resultHandler notified with the record or with the backend failure
   */
  @Override
  public void update(Record record, Handler<AsyncResult<Record>> resultHandler) {
    backend.update(record, ar -> {
      if (ar.failed()) {
        resultHandler.handle(Future.failedFuture(ar.cause()));
        return;
      }

      for (ServiceExporter exporter : exporters) {
        exporter.onUpdate(record);
      }

      Record announcedRecord = new Record(record);
      vertx.eventBus().publish(announce, announcedRecord.toJson());
      resultHandler.handle(Future.succeededFuture(record));
    });
  }

  /**
   * @return a copy of the current set of bindings
   */
  @Override
  public Set<ServiceReference> bindings() {
    return new HashSet<>(bindings);
  }

  /**
   * @return the options this instance was created with
   */
  @Override
  public ServiceDiscoveryOptions options() {
    return options;
  }

  /**
   * Removes the reference from the bindings and, only if it was present, publishes a
   * {@code release} usage event. The reference itself is not released by this method.
   *
   * @param reference the reference to unbind
   */
  public void unbind(ServiceReference reference) {
    if (bindings.remove(reference)) {
      sendUnbindEvent(reference);
    }
  }
}