package io.vertx.spi.cluster.hazelcast.impl;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.MapEvent;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.TaskQueue;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ChoosableIterable;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
public class HazelcastAsyncMultiMap<K, V> implements AsyncMultiMap<K, V>, EntryListener<K, V> {
private final VertxInternal vertx;
private final com.hazelcast.core.MultiMap<K, V> map;
private final TaskQueue taskQueue = new TaskQueue();
private ConcurrentMap<K, ChoosableSet<V>> cache = new ConcurrentHashMap<>();
public HazelcastAsyncMultiMap(Vertx vertx, com.hazelcast.core.MultiMap<K, V> map) {
this.vertx = (VertxInternal) vertx;
this.map = map;
map.addEntryListener(this, true);
}
@Override
public void removeAllForValue(V val, Handler<AsyncResult<Void>> completionHandler) {
removeAllMatching(val::equals, completionHandler);
}
@Override
public void removeAllMatching(Predicate<V> p, Handler<AsyncResult<Void>> completionHandler) {
vertx.getOrCreateContext().executeBlocking(fut -> {
for (Map.Entry<K, V> entry : map.entrySet()) {
V v = entry.getValue();
if (p.test(v)) {
map.remove(entry.getKey(), v);
}
}
fut.complete();
}, taskQueue, completionHandler);
}
@Override
public void add(K k, V v, Handler<AsyncResult<Void>> completionHandler) {
vertx.getOrCreateContext().executeBlocking(fut -> {
map.put(k, HazelcastClusterNodeInfo.convertClusterNodeInfo(v));
fut.complete();
}, taskQueue, completionHandler);
}
@Override
public void get(K k, Handler<AsyncResult<ChoosableIterable<V>>> resultHandler) {
ContextInternal context = vertx.getOrCreateContext();
@SuppressWarnings("unchecked")
Queue<GetRequest<K, V>> getRequests = (Queue<GetRequest<K, V>>) context.contextData().computeIfAbsent(this, ctx -> new ArrayDeque<>());
synchronized (getRequests) {
ChoosableSet<V> entries = cache.get(k);
if (entries != null && entries.isInitialised() && getRequests.isEmpty()) {
context.runOnContext(v -> {
resultHandler.handle(Future.succeededFuture(entries));
});
} else {
getRequests.add(new GetRequest<>(k, resultHandler));
if (getRequests.size() == 1) {
dequeueGet(context, getRequests);
}
}
}
}
private void dequeueGet(ContextInternal context, Queue<GetRequest<K, V>> getRequests) {
GetRequest<K, V> getRequest;
for (; ; ) {
getRequest = getRequests.peek();
ChoosableSet<V> entries = cache.get(getRequest.key);
if (entries != null && entries.isInitialised()) {
Handler<AsyncResult<ChoosableIterable<V>>> handler = getRequest.handler;
context.runOnContext(v -> {
handler.handle(Future.succeededFuture(entries));
});
getRequests.remove();
if (getRequests.isEmpty()) {
return;
}
} else {
break;
}
}
K key = getRequest.key;
Handler<AsyncResult<ChoosableIterable<V>>> handler = getRequest.handler;
context.<ChoosableIterable<V>>executeBlocking(fut -> {
Collection<V> entries = map.get(key);
ChoosableSet<V> sids;
if (entries != null) {
sids = new ChoosableSet<>(entries.size());
for (V hid : entries) {
sids.add(hid);
}
} else {
sids = new ChoosableSet<>(0);
}
ChoosableSet<V> prev = (sids.isEmpty()) ? null : cache.putIfAbsent(key, sids);
if (prev != null) {
prev.merge(sids);
sids = prev;
}
sids.setInitialised();
fut.complete(sids);
}, taskQueue, res -> {
synchronized (getRequests) {
context.runOnContext(v -> {
handler.handle(res);
});
getRequests.remove();
if (!getRequests.isEmpty()) {
dequeueGet(context, getRequests);
}
}
});
}
@Override
public void remove(K k, V v, Handler<AsyncResult<Boolean>> completionHandler) {
vertx.getOrCreateContext().executeBlocking(fut -> {
fut.complete(map.remove(k, HazelcastClusterNodeInfo.convertClusterNodeInfo(v)));
}, taskQueue, completionHandler);
}
@Override
public void entryAdded(EntryEvent<K, V> entry) {
addEntry(entry.getKey(), entry.getValue());
}
private void addEntry(K k, V v) {
ChoosableSet<V> entries = cache.get(k);
if (entries == null) {
entries = new ChoosableSet<>(1);
ChoosableSet<V> prev = cache.putIfAbsent(k, entries);
if (prev != null) {
entries = prev;
}
}
entries.add(v);
}
@Override
public void entryRemoved(EntryEvent<K, V> entry) {
removeEntry(entry.getKey(), entry.getOldValue());
}
private void removeEntry(K k, V v) {
ChoosableSet<V> entries = cache.get(k);
if (entries != null) {
if (v != null) {
entries.remove(v);
if (entries.isEmpty()) {
cache.remove(k);
}
}
}
}
@Override
public void entryUpdated(EntryEvent<K, V> entry) {
K k = entry.getKey();
ChoosableSet<V> entries = cache.get(k);
if (entries != null) {
entries.add(entry.getValue());
}
}
@Override
public void entryEvicted(EntryEvent<K, V> entry) {
entryRemoved(entry);
}
@Override
public void mapEvicted(MapEvent mapEvent) {
clearCache();
}
@Override
public void mapCleared(MapEvent mapEvent) {
clearCache();
}
public void clearCache() {
cache.clear();
}
private static class GetRequest<K, V> {
final K key;
final Handler<AsyncResult<ChoosableIterable<V>>> handler;
GetRequest(K key, Handler<AsyncResult<ChoosableIterable<V>>> handler) {
this.key = key;
this.handler = handler;
}
}
}