package io.vertx.spi.cluster.zookeeper.impl;
import io.vertx.core.*;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ChoosableIterable;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type.INITIALIZED;
import static org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type.NODE_ADDED;
import static org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type.NODE_REMOVED;
public class ZKAsyncMultiMap<K, V> extends ZKMap<K, V> implements AsyncMultiMap<K, V> {
/** Curator cache over the subtree rooted at {@code mapPath}; created and started by the constructor. */
private final TreeCache treeCache;
/** Counted down by the listener on the INITIALIZED event; the constructor waits on it. */
private final CountDownLatch latch = new CountDownLatch(1);
/** Local view of the map, keyed by key path; filled by the tree cache listener and by {@code get}. */
private final ConcurrentMap<String, ChoosableSet<V>> cache = new ConcurrentHashMap<>();
/** Values recorded by {@code add} for paths containing {@code EVENTBUS_PATH}; replayed by {@code restoreSnapshotCache}. */
private final ConcurrentMap<String, ChoosableSet<V>> eventBusSnapshotCache = new ConcurrentHashMap<>();
private static final Logger logger = LoggerFactory.getLogger(ZKAsyncMultiMap.class);
/**
 * Creates the multi-map and starts a {@link TreeCache} rooted at {@code mapPath}.
 *
 * <p>The calling thread blocks for at most 10 seconds until the listener reports the
 * cache's INITIALIZED event. If that event does not arrive in time a warning is logged
 * and construction continues, as it did before, rather than failing.
 *
 * @param vertx   the Vert.x instance handed to the superclass
 * @param curator the Curator client used for all ZooKeeper access
 * @param mapName the name of this map, handed to the superclass
 * @throws VertxException if the tree cache cannot be started or the wait is interrupted
 */
public ZKAsyncMultiMap(Vertx vertx, CuratorFramework curator, String mapName) {
  super(curator, vertx, ZK_PATH_ASYNC_MULTI_MAP, mapName);
  treeCache = new TreeCache(curator, mapPath);
  treeCache.getListenable().addListener(new Listener());
  try {
    treeCache.start();
    // await() returns false on timeout; surface that instead of dropping it silently.
    if (!latch.await(10, TimeUnit.SECONDS)) {
      logger.warn("timed out waiting for the tree cache of " + mapPath + " to initialize.");
    }
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up the stack can still observe it.
    Thread.currentThread().interrupt();
    throw new VertxException(e);
  } catch (Exception e) {
    throw new VertxException(e);
  }
}
/**
 * Stores {@code v} under {@code k} and completes once the write can be read back.
 *
 * <p>Steps: validate the arguments, overwrite the value node when it already exists or
 * create it otherwise, record the value in the event-bus snapshot when the node path
 * contains {@code EVENTBUS_PATH}, then issue a {@code sync} followed by a {@code getData}
 * and compare the modification time read back with the stat produced by the write step.
 *
 * @param k                 the key; validated by {@code assertKeyAndValueAreNotNull}
 * @param v                 the value; validated by {@code assertKeyAndValueAreNotNull}
 * @param completionHandler notified with success, or with the failure of any step
 */
@Override
public void add(K k, V v, Handler<AsyncResult<Void>> completionHandler) {
  assertKeyAndValueAreNotNull(k, v)
    .compose(validated -> {
      // Build the path only after validation, as remove() does, so invalid arguments
      // reach the handler as a failed result.
      final String path = valuePath(k, v);
      return checkExists(path)
        .compose(checkResult -> checkResult ? setData(path, v) : create(path, v))
        .compose(stat -> {
          if (path.contains(EVENTBUS_PATH)) {
            // Atomic get-or-create, so two concurrent adds cannot overwrite each other's set.
            eventBusSnapshotCache.computeIfAbsent(path, key -> new ChoosableSet<>(1)).add(v);
          }
          Future<Void> future = Future.future();
          try {
            curator.sync().inBackground((syncClient, syncEvent) -> {
              if (syncEvent.getType() != CuratorEventType.SYNC) {
                // Without this branch the future would stay pending forever.
                vertx.runOnContext(ignored -> future.fail(
                  "unexpected curator event " + syncEvent.getType() + " while syncing " + path));
                return;
              }
              try {
                curator.getData().inBackground((getClient, getEvent) -> {
                  // getStat() is checked for null so a missing stat fails the future
                  // instead of throwing inside the background callback.
                  boolean upToDate = stat == null
                    || (getEvent.getStat() != null && stat.getMtime() <= getEvent.getStat().getMtime());
                  if (upToDate) {
                    vertx.runOnContext(ignored -> future.complete());
                  } else {
                    vertx.runOnContext(ignored -> future.fail("can not get correct zxid."));
                  }
                }).forPath(path);
              } catch (Exception readFailure) {
                vertx.runOnContext(ignored -> future.fail(readFailure));
              }
            }).forPath(path);
          } catch (Exception ex) {
            vertx.runOnContext(ignored -> future.fail(ex));
          }
          return future;
        });
    })
    .setHandler(completionHandler);
}
/**
 * Looks up the values stored under {@code k}.
 *
 * <p>A non-empty entry in the local {@code cache} is returned directly. Otherwise a
 * {@code sync} is issued for the key path and the children currently held by the tree
 * cache are deserialized; children with null or empty data are skipped. The resulting set
 * is stored with {@code putIfAbsent} when the key node is known to the tree cache.
 *
 * <p>The outcome is delivered on the context obtained from {@code vertx.getOrCreateContext()}.
 *
 * @param k                  the key; validated by {@code assertKeyIsNotNull}
 * @param asyncResultHandler receives the values (possibly an empty set) or the failure
 */
@Override
public void get(K k, Handler<AsyncResult<ChoosableIterable<V>>> asyncResultHandler) {
  Context ctx = vertx.getOrCreateContext();
  assertKeyIsNotNull(k)
    .compose(aVoid -> {
      final String keyPath = keyPath(k);
      ChoosableSet<V> entries = cache.get(keyPath);
      Future<ChoosableIterable<V>> future = Future.future();
      if (entries != null && !entries.isEmpty()) {
        future.complete(entries);
      } else {
        try {
          curator.sync().inBackground((clientSync, eventSync) -> {
            if (eventSync.getType() != CuratorEventType.SYNC) {
              // Without this branch the future would stay pending forever.
              ctx.runOnContext(v -> future.fail(
                "unexpected curator event " + eventSync.getType() + " while syncing " + keyPath));
              return;
            }
            Map<String, ChildData> maps = treeCache.getCurrentChildren(keyPath);
            ChoosableSet<V> newEntries = new ChoosableSet<>(maps != null ? maps.size() : 0);
            if (maps != null) {
              try {
                for (ChildData childData : maps.values()) {
                  if (childData != null && childData.getData() != null && childData.getData().length > 0) {
                    newEntries.add(asObject(childData.getData()));
                  }
                }
              } catch (Exception ex) {
                // Fail exactly once and stop: carrying on would complete the same future
                // a second time and cache a partially built set.
                ctx.runOnContext(v -> future.fail(ex));
                return;
              }
              cache.putIfAbsent(keyPath, newEntries);
            }
            ctx.runOnContext(v -> future.complete(newEntries));
          }).forPath(keyPath);
        } catch (Exception ex) {
          ctx.runOnContext(v -> future.fail(ex));
        }
      }
      return future;
    })
    .setHandler(ar -> ctx.runOnContext(v -> asyncResultHandler.handle(ar)));
}
/**
 * Removes the value {@code v} stored under key {@code k}.
 *
 * <p>Both arguments are validated first; the work is then delegated to the path-based
 * {@code remove(String, V, String)} overload.
 *
 * @param k                 the key
 * @param v                 the value to remove
 * @param completionHandler receives the boolean produced by the path-based overload, or the failure
 */
@Override
public void remove(K k, V v, Handler<AsyncResult<Boolean>> completionHandler) {
  Future<Boolean> outcome = assertKeyAndValueAreNotNull(k, v).compose(validated -> {
    final String valueNode = valuePath(k, v);
    final String keyNode = keyPath(k);
    return remove(keyNode, v, valueNode);
  });
  outcome.setHandler(completionHandler);
}
/**
 * Deletes the node at {@code fullPath} when {@code checkExists} reports it.
 *
 * <p>The returned future always completes: {@code false} when the node does not exist,
 * {@code true} after a successful delete, failed when the delete fails. Deletion no longer
 * depends on the node being present in the local tree cache; previously a node missing from
 * that cache left the future pending forever.
 *
 * <p>After a successful delete, and when {@code fullPath} contains {@code EVENTBUS_PATH},
 * the value is dropped from the event-bus snapshot. {@code add} stores snapshot entries
 * under the value path, so that is the key looked up; the key path is checked as well to
 * stay compatible with the previous lookup.
 *
 * @param keyPath  path of the key node
 * @param v        the value being removed
 * @param fullPath path of the value node
 * @return a future holding whether a node was deleted
 */
private Future<Boolean> remove(String keyPath, V v, String fullPath) {
  return checkExists(fullPath).compose(checkResult -> {
    Future<Boolean> future = Future.future();
    if (!checkResult) {
      future.complete(false);
      return future;
    }
    delete(fullPath, null).setHandler(deleteResult -> {
      if (deleteResult.failed()) {
        // Do not report a removal that did not happen, and keep the snapshot untouched.
        future.fail(deleteResult.cause());
        return;
      }
      if (fullPath.contains(EVENTBUS_PATH)) {
        for (String snapshotKey : Arrays.asList(fullPath, keyPath)) {
          ChoosableSet<V> snapshot = eventBusSnapshotCache.get(snapshotKey);
          if (snapshot != null) {
            snapshot.remove(v);
          }
        }
      }
      future.complete(true);
    });
    return future;
  });
}
/**
 * Removes every stored value whose {@code hashCode()} equals that of {@code v}.
 *
 * <p>Matching is by hash code only; the removal itself is done by {@code removeAllMatching}.
 *
 * @param v                 the reference value
 * @param completionHandler forwarded to {@code removeAllMatching}
 */
@Override
public void removeAllForValue(V v, Handler<AsyncResult<Void>> completionHandler) {
  // v.hashCode() is deliberately evaluated per candidate, inside the predicate.
  Predicate<V> sameHash = candidate -> candidate.hashCode() == v.hashCode();
  removeAllMatching(sameHash, completionHandler);
}
/**
 * Removes every value accepted by {@code p}.
 *
 * <p>Walks the key nodes and value nodes currently held by the tree cache under
 * {@code mapPath}, deserializes each value node that carries data and removes those the
 * predicate accepts. A deserialization or predicate exception is recorded as a failed
 * future, which fails the overall result.
 *
 * <p>The handler is always invoked, including when the tree cache holds nothing for
 * {@code mapPath}. Nodes that disappear between the two cache lookups are skipped.
 *
 * @param p                 selects the values to remove
 * @param completionHandler receives success once all removals succeed, or the first failure
 */
@Override
public void removeAllMatching(Predicate<V> p, Handler<AsyncResult<Void>> completionHandler) {
  List<Future> futures = new ArrayList<>();
  Map<String, ChildData> keyNodes = treeCache.getCurrentChildren(mapPath);
  if (keyNodes != null) {
    for (String partKeyPath : keyNodes.keySet()) {
      String keyPath = mapPath + "/" + partKeyPath;
      // The cache may have dropped the key node since the outer lookup.
      Map<String, ChildData> valueNodes = treeCache.getCurrentChildren(keyPath);
      if (valueNodes == null) {
        continue;
      }
      for (String valueNode : valueNodes.keySet()) {
        String fullPath = keyPath + "/" + valueNode;
        ChildData childData = treeCache.getCurrentData(fullPath);
        if (childData == null || childData.getData() == null) {
          continue;
        }
        try {
          V value = asObject(childData.getData());
          if (p.test(value)) {
            futures.add(remove(keyPath, value, fullPath));
          }
        } catch (Exception e) {
          futures.add(Future.failedFuture(e));
        }
      }
    }
  }
  // Kept outside the null check so the handler fires even when nothing is cached.
  CompositeFuture.all(futures)
    .compose(done -> Future.<Void>succeededFuture())
    .setHandler(completionHandler);
}
/**
 * Replays every value held in {@code eventBusSnapshotCache} through {@code add}.
 *
 * <p>For each snapshot entry the key is the first path segment after {@code mapPath},
 * passed to {@code add} through an unchecked cast to {@code K}.
 *
 * @return a future that succeeds when all re-adds succeed and fails with the cause otherwise
 */
private Future<Void> restoreSnapshotCache() {
  List<Future> pending = new ArrayList<>();
  for (Map.Entry<String, ChoosableSet<V>> snapshot : eventBusSnapshotCache.entrySet()) {
    String relative = snapshot.getKey().substring(mapPath.length() + 1);
    String key = relative.split("/", 2)[0];
    for (V value : snapshot.getValue().getIds()) {
      Future<Void> readded = Future.future();
      add((K) key, value, readded);
      pending.add(readded);
    }
  }
  Future<Void> restored = Future.future();
  CompositeFuture.all(pending).setHandler(outcome -> {
    if (outcome.succeeded()) {
      restored.complete();
    } else {
      restored.fail(outcome.cause());
    }
  });
  return restored;
}
private class Listener implements TreeCacheListener {
private AtomicBoolean reconnected = new AtomicBoolean(false);
private String cachePath(final String key) {
return mapPath + "/" + key;
}
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent treeCacheEvent) throws Exception {
if (treeCacheEvent.getType() == INITIALIZED) {
latch.countDown();
return;
}
final ChildData childData = treeCacheEvent.getData();
String[] key = null;
ChoosableSet<V> entries = null;
if (treeCacheEvent.getType() == NODE_ADDED || treeCacheEvent.getType() == NODE_REMOVED) {
if (childData == null || mapPath.length() == childData.getPath().length()) {
return;
}
key = childData.getPath().substring(mapPath.length() + 1).split("/", 2);
entries = cache.computeIfAbsent(cachePath(key[0]), k -> new ChoosableSet<>(1));
}
switch (treeCacheEvent.getType()) {
case NODE_ADDED:
if (key.length > 1) {
entries.add(asObject(childData.getData()));
}
break;
case NODE_REMOVED:
if (key.length == 1) {
cache.remove(cachePath(key[0]));
} else {
for (V entry : entries)
if (entry.toString().equals(key[1])) entries.remove(entry);
}
if (reconnected.get()) {
reconnected.set(false);
restoreSnapshotCache().setHandler(event -> {
if (event.failed()) {
logger.error("restore eventbus snapshot cache failed.", event.cause());
} else {
logger.info("restore eventbus snapshot cache success.");
}
});
}
break;
case CONNECTION_SUSPENDED:
logger.warn("connection to the zookeeper server have suspended.");
break;
case CONNECTION_RECONNECTED:
reconnected.set(true);
break;
case CONNECTION_LOST:
logger.error("connection to the zookeeper server have lost, all the temporary node will be remove.");
break;
}
}
}
}