package io.vertx.spi.cluster.zookeeper.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.AsyncMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static io.vertx.spi.cluster.zookeeper.impl.AsyncMapTTLMonitor.*;
public class ZKAsyncMap<K, V> extends ZKMap<K, V> implements AsyncMap<K, V> {
private final PathChildrenCache curatorCache;
private AsyncMapTTLMonitor<K, V> asyncMapTTLMonitor;
public ZKAsyncMap(Vertx vertx, CuratorFramework curator, AsyncMapTTLMonitor<K,V> asyncMapTTLMonitor, String mapName) {
super(curator, vertx, ZK_PATH_ASYNC_MAP, mapName);
this.curatorCache = new PathChildrenCache(curator, mapPath, true);
try {
this.asyncMapTTLMonitor = asyncMapTTLMonitor;
curatorCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
} catch (Exception e) {
throw new VertxException(e);
}
}
@Override
public void get(K k, Handler<AsyncResult<V>> asyncResultHandler) {
assertKeyIsNotNull(k)
.compose(aVoid -> checkExists(k))
.compose(checkResult -> {
Future<V> future = Future.future();
if (checkResult) {
ChildData childData = curatorCache.getCurrentData(keyPath(k));
if (childData != null && childData.getData() != null) {
try {
V value = asObject(childData.getData());
future.complete(value);
} catch (Exception e) {
future.fail(e);
}
} else {
future.complete();
}
} else {
future.complete();
}
return future;
})
.setHandler(asyncResultHandler);
}
@Override
public void put(K k, V v, Handler<AsyncResult<Void>> completionHandler) {
put(k, v, Optional.empty(), completionHandler);
}
@Override
public void put(K k, V v, long timeout, Handler<AsyncResult<Void>> completionHandler) {
put(k, v, Optional.of(timeout), completionHandler);
}
private void put(K k, V v, Optional<Long> timeoutOptional, Handler<AsyncResult<Void>> completionHandler) {
assertKeyAndValueAreNotNull(k, v)
.compose(aVoid -> checkExists(k))
.compose(checkResult -> checkResult ? setData(k, v) : create(k, v))
.compose(aVoid -> {
JsonObject body = new JsonObject().put(TTL_KEY_BODY_KEY_PATH, keyPath(k));
if (timeoutOptional.isPresent()) {
asyncMapTTLMonitor.addAsyncMapWithPath(keyPath(k), this);
body.put(TTL_KEY_BODY_TIMEOUT, timeoutOptional.get());
} else body.put(TTL_KEY_IS_CANCEL, true);
vertx.eventBus().publish(TTL_KEY_HANDLER_ADDRESS, body);
Future<Void> future = Future.future();
future.complete();
return future;
})
.setHandler(completionHandler);
}
@Override
public void putIfAbsent(K k, V v, Handler<AsyncResult<V>> completionHandler) {
putIfAbsent(k, v, Optional.empty(), completionHandler);
}
@Override
public void putIfAbsent(K k, V v, long timeout, Handler<AsyncResult<V>> completionHandler) {
putIfAbsent(k, v, Optional.of(timeout), completionHandler);
}
private void putIfAbsent(K k, V v, Optional<Long> timeoutOptional, Handler<AsyncResult<V>> completionHandler) {
assertKeyAndValueAreNotNull(k, v)
.compose(aVoid -> {
Future<V> innerFuture = Future.future();
vertx.executeBlocking(future -> {
long startTime = Instant.now().toEpochMilli();
int retries = 0;
for (; ; ) {
try {
Stat stat = new Stat();
String path = keyPath(k);
V currentValue = getData(stat, path);
if (compareAndSet(startTime, retries++, stat, path, currentValue, v)) {
future.complete(currentValue);
return;
}
} catch (Exception e) {
future.fail(e);
return;
}
}
}, false, innerFuture);
return innerFuture;
})
.compose(value -> {
JsonObject body = new JsonObject().put(TTL_KEY_BODY_KEY_PATH, keyPath(k));
if (timeoutOptional.isPresent()) {
asyncMapTTLMonitor.addAsyncMapWithPath(keyPath(k), this);
body.put(TTL_KEY_BODY_TIMEOUT, timeoutOptional.get());
} else body.put(TTL_KEY_IS_CANCEL, true);
vertx.eventBus().publish(TTL_KEY_HANDLER_ADDRESS, body);
return Future.succeededFuture(value);
})
.setHandler(completionHandler);
}
@Override
public void remove(K k, Handler<AsyncResult<V>> asyncResultHandler) {
assertKeyIsNotNull(k).compose(aVoid -> {
Future<V> future = Future.future();
get(k, future);
return future;
}).compose(value -> {
Future<V> future = Future.future();
if (value != null) {
return delete(k, value);
} else {
future.complete();
}
return future;
}).setHandler(asyncResultHandler);
}
@Override
public void removeIfPresent(K k, V v, Handler<AsyncResult<Boolean>> resultHandler) {
assertKeyAndValueAreNotNull(k, v)
.compose(aVoid -> {
Future<V> future = Future.future();
get(k, future);
return future;
})
.compose(value -> {
Future<Boolean> future = Future.future();
if (value.equals(v)) {
delete(k, v).setHandler(deleteResult -> {
if (deleteResult.succeeded()) future.complete(true);
else future.fail(deleteResult.cause());
});
} else {
future.complete(false);
}
return future;
}).setHandler(resultHandler);
}
@Override
public void replace(K k, V v, Handler<AsyncResult<V>> asyncResultHandler) {
assertKeyAndValueAreNotNull(k, v)
.compose(aVoid -> {
Future<V> innerFuture = Future.future();
vertx.executeBlocking(future -> {
long startTime = Instant.now().toEpochMilli();
int retries = 0;
for (; ; ) {
try {
Stat stat = new Stat();
String path = keyPath(k);
V currentValue = getData(stat, path);
if (currentValue == null) {
future.complete(null);
return;
}
if (compareAndSet(startTime, retries++, stat, path, currentValue, v)) {
future.complete(currentValue);
return;
}
} catch (Exception e) {
future.fail(e);
return;
}
}
}, false, innerFuture);
return innerFuture;
})
.setHandler(asyncResultHandler);
}
@Override
public void replaceIfPresent(K k, V oldValue, V newValue, Handler<AsyncResult<Boolean>> resultHandler) {
assertKeyIsNotNull(k)
.compose(aVoid -> assertValueIsNotNull(oldValue))
.compose(aVoid -> assertValueIsNotNull(newValue))
.compose(aVoid -> {
Future<Boolean> innerFuture = Future.future();
vertx.executeBlocking(future -> {
long startTime = Instant.now().toEpochMilli();
int retries = 0;
for (; ; ) {
try {
Stat stat = new Stat();
String path = keyPath(k);
V currentValue = getData(stat, path);
if (!currentValue.equals(oldValue)) {
future.complete(false);
return;
}
if (compareAndSet(startTime, retries++, stat, path, oldValue, newValue)) {
future.complete(true);
return;
}
} catch (Exception e) {
future.fail(e);
return;
}
}
}, false, innerFuture);
return innerFuture;
})
.setHandler(resultHandler);
}
@Override
public void clear(Handler<AsyncResult<Void>> resultHandler) {
delete(mapPath, null).setHandler(result -> {
if (result.succeeded()) {
resultHandler.handle(Future.succeededFuture());
} else {
resultHandler.handle(Future.failedFuture(result.cause()));
}
});
}
@Override
public void size(Handler<AsyncResult<Integer>> resultHandler) {
try {
curator.getChildren().inBackground((client, event) ->
vertx.runOnContext(aVoid -> resultHandler.handle(Future.succeededFuture(event.getChildren().size()))))
.forPath(mapPath);
} catch (Exception e) {
resultHandler.handle(Future.failedFuture(e));
}
}
public void keys(Handler<AsyncResult<Set<K>>> resultHandler) {
Context context = vertx.getOrCreateContext();
try {
curator.getChildren().inBackground((client, event) -> {
Set<K> keys = new HashSet<>();
for (String base64Key : event.getChildren()) {
byte[] binaryKey = Base64.getUrlDecoder().decode(base64Key);
K key;
try {
key = asObject(binaryKey);
} catch (Exception e) {
context.runOnContext(v -> resultHandler.handle(Future.failedFuture(e)));
return;
}
keys.add(key);
}
context.runOnContext(v -> resultHandler.handle(Future.succeededFuture(keys)));
}).forPath(mapPath);
} catch (Exception e) {
resultHandler.handle(Future.failedFuture(e));
}
}
@Override
public void values(Handler<AsyncResult<List<V>>> resultHandler) {
Future<Set<K>> keysFuture = Future.future();
keys(keysFuture);
keysFuture.compose(keys -> {
List<Future> futures = new ArrayList<>(keys.size());
for (K k : keys) {
Future valueFuture = Future.future();
get(k, valueFuture);
futures.add(valueFuture);
}
return CompositeFuture.all(futures).map(compositeFuture -> {
List<V> values = new ArrayList<>(compositeFuture.size());
for (int i = 0; i < compositeFuture.size(); i++) {
values.add(compositeFuture.resultAt(i));
}
return values;
});
}).setHandler(resultHandler);
}
@Override
public void entries(Handler<AsyncResult<Map<K, V>>> resultHandler) {
Future<Set<K>> keysFuture = Future.future();
keys(keysFuture);
keysFuture.map(ArrayList::new).compose(keys -> {
List<Future> futures = new ArrayList<>(keys.size());
for (K k : keys) {
Future valueFuture = Future.future();
get(k, valueFuture);
futures.add(valueFuture);
}
return CompositeFuture.all(futures).map(compositeFuture -> {
Map<K, V> map = new HashMap<>();
for (int i = 0; i < compositeFuture.size(); i++) {
map.put(keys.get(i), compositeFuture.resultAt(i));
}
return map;
});
}).setHandler(resultHandler);
}
@Override
String keyPath(K k) {
try {
return keyPathPrefix() + Base64.getUrlEncoder().encodeToString(asByte(k));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private String keyPathPrefix() {
return mapPath + "/";
}
}