package io.vertx.servicediscovery.zookeeper;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.json.JsonObject;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.servicediscovery.Record;
import io.vertx.servicediscovery.spi.ServiceImporter;
import io.vertx.servicediscovery.spi.ServicePublisher;
import io.vertx.servicediscovery.spi.ServiceType;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.zookeeper.KeeperException;
import java.util.*;
public class ZookeeperServiceImporter implements ServiceImporter, TreeCacheListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperServiceImporter.class);
private ServicePublisher publisher;
private CuratorFramework client;
private ServiceDiscovery<JsonObject> discovery;
private TreeCache cache;
private volatile boolean started;
private Set<RegistrationHolder<ServiceInstance<JsonObject>>> registrations = new ConcurrentHashSet<>();
@Override
public void start(Vertx vertx, ServicePublisher publisher, JsonObject configuration, Promise<Void> future) {
this.publisher = publisher;
String connection = Objects.requireNonNull(configuration.getString("connection"));
int maxRetries = configuration.getInteger("maxRetries", 3);
int baseGraceBetweenRetries = configuration.getInteger("baseSleepTimeBetweenRetries", 1000);
String basePath = configuration.getString("basePath", "/discovery");
boolean canBeReadOnly = configuration.getBoolean("canBeReadOnly", true);
int connectionTimeoutMs = configuration.getInteger("connectionTimeoutMs", 1000);
vertx.<Void>executeBlocking(
f -> {
try {
client = CuratorFrameworkFactory.builder()
.canBeReadOnly(canBeReadOnly)
.connectString(connection)
.connectionTimeoutMs(connectionTimeoutMs)
.retryPolicy(new ExponentialBackoffRetry(baseGraceBetweenRetries, maxRetries))
.build();
client.start();
discovery = ServiceDiscoveryBuilder.builder(JsonObject.class)
.client(client)
.basePath(basePath)
.serializer(new JsonObjectSerializer())
.watchInstances(true)
.build();
discovery.start();
cache = TreeCache.newBuilder(client, basePath).build();
cache.start();
cache.getListenable().addListener(this);
f.complete();
} catch (Exception e) {
future.fail(e);
}
},
ar -> {
if (ar.failed()) {
future.fail(ar.cause());
} else {
Promise<Void> p = Promise.promise();
p.future().onComplete(x -> {
if (x.failed()) {
future.fail(x.cause());
} else {
future.complete(null);
}
});
compute(p);
}
}
);
}
private synchronized void compute(Promise<Void> done) {
List<ServiceInstance<JsonObject>> instances = new ArrayList<>();
try {
Collection<String> names = discovery.queryForNames();
for (String name : names) {
instances.addAll(discovery.queryForInstances(name));
}
} catch (KeeperException.NoNodeException e) {
} catch (Exception e) {
if (done != null) {
done.fail(e);
} else {
LOGGER.error("Unable to retrieve service instances from Zookeeper", e);
return;
}
}
Set<RegistrationHolder<ServiceInstance<JsonObject>>> registered
= new HashSet<>(registrations);
Set<ServiceInstance<JsonObject>> remote = new HashSet<>(instances);
List<Future> actions = new ArrayList<>();
RegistrationHolder.filter(registered, instances)
.stream()
.map(reg -> {
Promise<Void> promise = Promise.promise();
publisher.unpublish(reg.record().getRegistration(), v -> {
registrations.remove(reg);
if (v.succeeded()) {
promise.complete(null);
} else {
promise.fail(v.cause());
}
});
return promise.future();
}).forEach(actions::add);
RegistrationHolder.filter(remote, registrations)
.stream()
.map(instance -> {
Promise<Void> promise = Promise.promise();
publisher.publish(createRecordForInstance(instance), v -> {
if (v.succeeded()) {
registrations.add(new RegistrationHolder<>(v.result(), instance));
promise.complete(null);
} else {
promise.fail(v.cause());
}
});
return promise.future();
}).forEach(actions::add);
if (done != null) {
CompositeFuture.all(actions).onComplete(ar -> {
if (ar.succeeded()) {
done.complete(null);
} else {
done.fail(ar.cause());
}
});
}
}
static Record createRecordForInstance(ServiceInstance<JsonObject> instance) {
Record record = new Record();
record.setName(instance.getName());
JsonObject payload = instance.getPayload();
record.setMetadata(payload);
record.getMetadata().put("zookeeper-service-type", instance.getServiceType().toString());
record.getMetadata().put("zookeeper-address", instance.getAddress());
record.getMetadata().put("zookeeper-registration-time",
instance.getRegistrationTimeUTC());
record.getMetadata().put("zookeeper-port", instance.getPort());
record.getMetadata().put("zookeeper-ssl-port", instance.getSslPort());
record.getMetadata().put("zookeeper-id", instance.getId());
record.setLocation(new JsonObject());
if (instance.getUriSpec() != null) {
String uri = instance.buildUriSpec();
record.getLocation().put("endpoint", uri);
} else {
String uri = "http";
if (instance.getSslPort() != null) {
uri += "s://" + instance.getAddress() + ":" + instance.getSslPort();
} else if (instance.getPort() != null) {
uri += "s://" + instance.getAddress() + ":" + instance.getPort();
} else {
uri += "://" + instance.getAddress();
}
record.getLocation().put("endpoint", uri);
}
if (instance.getPort() != null) {
record.getLocation().put("port", instance.getPort());
}
if (instance.getSslPort() != null) {
record.getLocation().put("ssl-port", instance.getSslPort());
}
if (instance.getAddress() != null) {
record.getLocation().put("address", instance.getAddress());
}
record.setType(payload.getString("service-type", ServiceType.UNKNOWN));
return record;
}
@Override
public void close(Handler<Void> closeHandler) {
Promise<Void> done = Promise.promise();
unregisterAllServices(done);
done.future().onComplete(v -> {
try {
cache.close();
discovery.close();
client.close();
} catch (Exception e) {
}
closeHandler.handle(null);
});
}
@Override
public void childEvent(CuratorFramework curatorFramework,
TreeCacheEvent treeCacheEvent) {
if (treeCacheEvent.getType() == TreeCacheEvent.Type.INITIALIZED) {
started = true;
} else if (started) {
compute(null);
}
}
private synchronized void unregisterAllServices(Promise<Void> done) {
List<Future> list = new ArrayList<>();
new HashSet<>(registrations).forEach(reg -> {
Promise<Void> unreg = Promise.promise();
publisher.unpublish(reg.record().getRegistration(), unreg);
});
registrations.clear();
CompositeFuture.all(list).onComplete(x -> {
if (x.failed()) {
done.fail(x.cause());
} else {
done.complete();
}
});
}
}