package io.vertx.spi.cluster.zookeeper.impl;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.shareddata.impl.ClusterSerializable;
import org.apache.curator.RetryLoop;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import java.io.*;
import java.lang.reflect.Constructor;
import java.time.Instant;
import java.util.stream.Stream;
abstract class ZKMap<K, V> {
final CuratorFramework curator;
protected final Vertx vertx;
final String mapPath;
protected final String mapName;
static final String ZK_PATH_ASYNC_MAP = "asyncMap";
static final String ZK_PATH_ASYNC_MULTI_MAP = "asyncMultiMap";
static final String EVENTBUS_PATH = "/" + ZK_PATH_ASYNC_MULTI_MAP + "/__vertx.subs/";
static final String ZK_PATH_SYNC_MAP = "syncMap";
private RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 5);
ZKMap(CuratorFramework curator, Vertx vertx, String mapType, String mapName) {
this.curator = curator;
this.vertx = vertx;
this.mapName = mapName;
this.mapPath = "/" + mapType + "/" + mapName;
}
String keyPath(K k) {
return mapPath + "/" + k.toString();
}
String valuePath(K k, Object v) {
return keyPath(k) + "/" + v.toString();
}
Future<Void> assertKeyIsNotNull(Object key) {
boolean result = key == null;
if (result) return Future.failedFuture("key can not be null.");
else return Future.succeededFuture();
}
Future<Void> assertValueIsNotNull(Object value) {
boolean result = value == null;
if (result) return Future.failedFuture("value can not be null.");
else return Future.succeededFuture();
}
Future<Void> assertKeyAndValueAreNotNull(Object key, Object value) {
return assertKeyIsNotNull(key).compose(aVoid -> assertValueIsNotNull(value));
}
byte[] asByte(Object object) throws IOException {
ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
DataOutput dataOutput = new DataOutputStream(byteOut);
if (object instanceof ClusterSerializable) {
ClusterSerializable clusterSerializable = (ClusterSerializable) object;
dataOutput.writeBoolean(true);
dataOutput.writeUTF(object.getClass().getName());
Buffer buffer = Buffer.buffer();
clusterSerializable.writeToBuffer(buffer);
byte[] bytes = buffer.getBytes();
dataOutput.writeInt(bytes.length);
dataOutput.write(bytes);
} else {
dataOutput.writeBoolean(false);
ByteArrayOutputStream javaByteOut = new ByteArrayOutputStream();
ObjectOutput objectOutput = new ObjectOutputStream(javaByteOut);
objectOutput.writeObject(object);
dataOutput.write(javaByteOut.toByteArray());
}
return byteOut.toByteArray();
}
<T> T asObject(byte[] bytes) throws Exception {
ByteArrayInputStream byteIn = new ByteArrayInputStream(bytes);
DataInputStream in = new DataInputStream(byteIn);
boolean isClusterSerializable = in.readBoolean();
if (isClusterSerializable) {
String className = in.readUTF();
Class<?> clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
int length = in.readInt();
byte[] body = new byte[length];
in.readFully(body);
try {
ClusterSerializable clusterSerializable;
if (clazz.getConstructors().length == 0) {
Constructor<T> constructor = (Constructor<T>) clazz.getDeclaredConstructor();
constructor.setAccessible(true);
clusterSerializable = (ClusterSerializable) constructor.newInstance();
} else {
clusterSerializable = (ClusterSerializable) clazz.newInstance();
}
clusterSerializable.readFromBuffer(0, Buffer.buffer(body));
return (T) clusterSerializable;
} catch (Exception e) {
throw new IllegalStateException("Failed to load class " + e.getMessage(), e);
}
} else {
byte[] body = new byte[in.available()];
in.readFully(body);
ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(body));
return (T) objectIn.readObject();
}
}
<T> T getData(Stat stat, String path) throws Exception {
T result = null;
if (null != curator.checkExists().forPath(path)) {
result = asObject(curator.getData().storingStatIn(stat).forPath(path));
} else {
curator.create().creatingParentsIfNeeded().forPath(path, asByte(null));
}
return result;
}
boolean compareAndSet(long startTime, int retries, Stat stat, String path, V expect, V update) throws Exception {
V currentValue = getData(stat, path);
if (currentValue == expect || currentValue.equals(expect)) {
try {
curator.setData().withVersion(stat.getVersion()).forPath(path, asByte(update));
} catch (KeeperException.BadVersionException | KeeperException.NoNodeException e) {
if (!retryPolicy.allowRetry(retries, Instant.now().toEpochMilli() - startTime, RetryLoop.getDefaultRetrySleeper())) {
throw new VertxException("failed to acquire optimistic lock");
}
}
return true;
} else {
return false;
}
}
Future<Boolean> checkExists(K k) {
return checkExists(keyPath(k));
}
Future<Boolean> checkExists(String path) {
Future<Boolean> future = Future.future();
try {
curator.sync().inBackground((clientSync, eventSync) -> {
try {
if (eventSync.getType() == CuratorEventType.SYNC) {
curator.checkExists().inBackground((clientCheck, eventCheck) -> {
if (eventCheck.getType() == CuratorEventType.EXISTS) {
if (eventCheck.getStat() == null) {
vertx.runOnContext(aVoid -> future.complete(false));
} else {
vertx.runOnContext(aVoid -> future.complete(true));
}
}
}).forPath(path);
}
} catch (Exception ex) {
vertx.runOnContext(aVoid -> future.fail(ex));
}
}).forPath(path);
} catch (Exception ex) {
vertx.runOnContext(aVoid -> future.fail(ex));
}
return future;
}
Future<Stat> create(K k, V v) {
return create(keyPath(k), v);
}
Future<Stat> create(String path, V v) {
Future<Stat> future = Future.future();
try {
CreateMode nodeMode = path.contains(EVENTBUS_PATH) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
curator.create().creatingParentsIfNeeded().withMode(nodeMode).inBackground((cl, el) -> {
if (el.getType() == CuratorEventType.CREATE) {
vertx.runOnContext(event -> future.complete(el.getStat()));
}
}).forPath(path, asByte(v));
} catch (Exception ex) {
vertx.runOnContext(event -> future.fail(ex));
}
return future;
}
Future<Stat> setData(K k, V v) {
return setData(keyPath(k), v);
}
Future<Stat> setData(String path, V v) {
Future<Stat> future = Future.future();
try {
curator.setData().inBackground((client, event) -> {
if (event.getType() == CuratorEventType.SET_DATA) {
vertx.runOnContext(e -> future.complete(event.getStat()));
}
}).forPath(path, asByte(v));
} catch (Exception ex) {
vertx.runOnContext(event -> future.fail(ex));
}
return future;
}
Future<V> delete(K k, V v) {
return delete(keyPath(k), v);
}
Future<V> delete(String path, V v) {
Future<V> future = Future.future();
try {
curator.delete().deletingChildrenIfNeeded().inBackground((client, event) -> {
if (event.getType() == CuratorEventType.DELETE) {
String[] paths = path.split("/");
String parentNodePath = Stream.of(paths).limit(paths.length - 1).reduce((previous, current) -> previous + "/" + current).get();
curator.getChildren().inBackground((childClient, childEvent) -> {
if (childEvent.getChildren().size() == 0) {
curator.delete().inBackground((deleteClient, deleteEvent) -> {
if (deleteEvent.getType() == CuratorEventType.DELETE)
vertx.runOnContext(ea -> future.complete(v));
}).forPath(parentNodePath);
} else {
vertx.runOnContext(ea -> future.complete(v));
}
}).forPath(parentNodePath);
}
}).forPath(path);
} catch (Exception ex) {
vertx.runOnContext(aVoid -> future.fail(ex));
}
return future;
}
}