package io.vertx.config.zookeeper;
import io.vertx.config.spi.ConfigStore;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonObject;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.KeeperException;
import java.util.Objects;
public class ZookeeperConfigStore implements ConfigStore {
private final CuratorFramework client;
private final String path;
private final VertxInternal vertx;
public ZookeeperConfigStore(Vertx vertx, JsonObject configuration) {
String connection = Objects.requireNonNull(configuration.getString("connection"));
path = Objects.requireNonNull(configuration.getString("path"));
this.vertx = (VertxInternal) Objects.requireNonNull(vertx);
int maxRetries = configuration.getInteger("maxRetries", 3);
int baseGraceBetweenRetries = configuration.getInteger("baseSleepTimeBetweenRetries", 1000);
client = CuratorFrameworkFactory.newClient(connection,
new ExponentialBackoffRetry(baseGraceBetweenRetries, maxRetries));
client.start();
}
@Override
public Future<Buffer> get() {
return vertx.executeBlocking(promise -> {
try {
client.blockUntilConnected();
promise.complete();
} catch (InterruptedException e) {
promise.fail(e);
}
}).flatMap(v -> {
Promise<Buffer> promise = vertx.promise();
try {
client.getData()
.inBackground((client, event) -> retrieve(event, promise))
.withUnhandledErrorListener((message, e) -> promise.fail(new Exception(message, e)))
.forPath(path);
} catch (Exception e) {
promise.fail(e);
}
return promise.future();
});
}
private void retrieve(CuratorEvent event, Promise<Buffer> promise) {
KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
if (code == KeeperException.Code.OK) {
promise.complete(Buffer.buffer(event.getData()));
} else if (code == KeeperException.Code.NONODE) {
promise.complete(Buffer.buffer("{}"));
} else {
promise.fail(KeeperException.create(code, path));
}
}
@Override
public Future<Void> close() {
client.close();
return vertx.getOrCreateContext().succeededFuture();
}
}