package io.vertx.spi.cluster.hazelcast.impl;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IMap;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.shareddata.AsyncMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static io.vertx.spi.cluster.hazelcast.impl.ConversionUtils.*;
/**
 * {@link AsyncMap} implementation that delegates to a Hazelcast {@link IMap}.
 * <p>
 * Operations for which this class uses a Hazelcast async call ({@code getAsync},
 * {@code setAsync}, {@code removeAsync}) are bridged to the Vert.x handler through
 * {@code ICompletableFuture.andThen}. All other operations call the synchronous
 * {@link IMap} API inside {@link Vertx#executeBlocking}.
 * <p>
 * Keys are passed through {@code ConversionUtils.convertParam} before use. Values
 * are passed through {@code convertParam} and then
 * {@code HazelcastServerID.convertServerID}, uniformly for every operation, so a
 * value is always handed to Hazelcast in the same form no matter which method
 * wrote it or compares against it.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class HazelcastInternalAsyncMap<K, V> implements AsyncMap<K, V> {

  private final Vertx vertx;
  private final IMap<K, V> map;

  /**
   * @param vertx the Vert.x instance used for blocking execution and context lookup
   * @param map   the Hazelcast map that backs this async map
   */
  public HazelcastInternalAsyncMap(Vertx vertx, IMap<K, V> map) {
    this.vertx = vertx;
    this.map = map;
  }

  /**
   * Looks up the value for {@code k} using {@code IMap.getAsync}.
   *
   * @param k                  the key
   * @param asyncResultHandler notified with the outcome of the lookup
   */
  @Override
  public void get(K k, Handler<AsyncResult<V>> asyncResultHandler) {
    executeAsync(
      map.getAsync(convertParam(k)),
      asyncResultHandler
    );
  }

  /**
   * Stores {@code v} under {@code k} using {@code IMap.setAsync}.
   *
   * @param k                 the key
   * @param v                 the value
   * @param completionHandler notified when the write has completed or failed
   */
  @Override
  public void put(K k, V v, Handler<AsyncResult<Void>> completionHandler) {
    K kk = convertParam(k);
    V vv = toStoredValue(v);
    executeAsyncVoid(
      map.setAsync(kk, vv),
      completionHandler
    );
  }

  /**
   * Stores {@code v} under {@code k} only if the key has no mapping yet.
   *
   * @param k             the key
   * @param v             the value
   * @param resultHandler notified with whatever {@code IMap.putIfAbsent} returned,
   *                      passed through {@code convertReturn}
   */
  @Override
  public void putIfAbsent(K k, V v, Handler<AsyncResult<V>> resultHandler) {
    K kk = convertParam(k);
    V vv = toStoredValue(v);
    vertx.executeBlocking(fut -> fut.complete(convertReturn(map.putIfAbsent(kk, vv))),
      resultHandler);
  }

  /**
   * Stores {@code v} under {@code k} with a time-to-live.
   * <p>
   * Uses {@code setAsync} like the non-TTL {@link #put(Object, Object, Handler)};
   * the previous value is not needed, and this avoids an unchecked cast of a
   * value-typed future to a {@code Void}-typed one.
   *
   * @param k                 the key
   * @param v                 the value
   * @param ttl               time-to-live in milliseconds
   * @param completionHandler notified when the write has completed or failed
   */
  @Override
  public void put(K k, V v, long ttl, Handler<AsyncResult<Void>> completionHandler) {
    K kk = convertParam(k);
    V vv = toStoredValue(v);
    executeAsyncVoid(
      map.setAsync(kk, vv, ttl, TimeUnit.MILLISECONDS),
      completionHandler
    );
  }

  /**
   * Stores {@code v} under {@code k} with a time-to-live, only if the key has no
   * mapping yet.
   *
   * @param k             the key
   * @param v             the value
   * @param ttl           time-to-live in milliseconds
   * @param resultHandler notified with whatever {@code IMap.putIfAbsent} returned,
   *                      passed through {@code convertReturn}
   */
  @Override
  public void putIfAbsent(K k, V v, long ttl, Handler<AsyncResult<V>> resultHandler) {
    K kk = convertParam(k);
    V vv = toStoredValue(v);
    vertx.executeBlocking(
      fut -> fut.complete(convertReturn(map.putIfAbsent(kk, vv, ttl, TimeUnit.MILLISECONDS))),
      resultHandler);
  }

  /**
   * Removes the mapping for {@code k} using {@code IMap.removeAsync}.
   *
   * @param k             the key
   * @param resultHandler notified with the outcome of the removal
   */
  @Override
  public void remove(K k, Handler<AsyncResult<V>> resultHandler) {
    K kk = convertParam(k);
    executeAsync(
      (ICompletableFuture<V>) map.removeAsync(kk),
      resultHandler
    );
  }

  /**
   * Removes the mapping for {@code k} only if it is currently mapped to {@code v}.
   *
   * @param k             the key
   * @param v             the value the key is expected to map to
   * @param resultHandler notified with the boolean returned by {@code IMap.remove(key, value)}
   */
  @Override
  public void removeIfPresent(K k, V v, Handler<AsyncResult<Boolean>> resultHandler) {
    K kk = convertParam(k);
    // Compare using the same converted form that the write operations store.
    V vv = toStoredValue(v);
    vertx.executeBlocking(fut -> fut.complete(map.remove(kk, vv)), resultHandler);
  }

  /**
   * Replaces the value for {@code k} via {@code IMap.replace(key, value)}.
   *
   * @param k             the key
   * @param v             the new value
   * @param resultHandler notified with whatever {@code IMap.replace} returned,
   *                      passed through {@code convertReturn}
   */
  @Override
  public void replace(K k, V v, Handler<AsyncResult<V>> resultHandler) {
    K kk = convertParam(k);
    V vv = toStoredValue(v);
    vertx.executeBlocking(fut -> fut.complete(convertReturn(map.replace(kk, vv))), resultHandler);
  }

  /**
   * Replaces the value for {@code k} with {@code newValue} only if it is currently
   * mapped to {@code oldValue}.
   *
   * @param k             the key
   * @param oldValue      the value the key is expected to map to
   * @param newValue      the replacement value
   * @param resultHandler notified with the boolean returned by
   *                      {@code IMap.replace(key, oldValue, newValue)}
   */
  @Override
  public void replaceIfPresent(K k, V oldValue, V newValue, Handler<AsyncResult<Boolean>> resultHandler) {
    K kk = convertParam(k);
    V expected = toStoredValue(oldValue);
    V replacement = toStoredValue(newValue);
    vertx.executeBlocking(fut -> fut.complete(map.replace(kk, expected, replacement)), resultHandler);
  }

  /**
   * Clears the backing map.
   *
   * @param resultHandler notified when the clear has completed or failed
   */
  @Override
  public void clear(Handler<AsyncResult<Void>> resultHandler) {
    vertx.executeBlocking(fut -> {
      map.clear();
      fut.complete();
    }, resultHandler);
  }

  /**
   * Reports the size of the backing map.
   *
   * @param resultHandler notified with the value of {@code IMap.size()}
   */
  @Override
  public void size(Handler<AsyncResult<Integer>> resultHandler) {
    vertx.executeBlocking(fut -> fut.complete(map.size()), resultHandler);
  }

  /**
   * Collects the keys of the backing map into a new {@link HashSet}, passing each
   * through {@code convertReturn}.
   *
   * @param resultHandler notified with the collected key set
   */
  @Override
  public void keys(Handler<AsyncResult<Set<K>>> resultHandler) {
    vertx.executeBlocking(fut -> {
      Set<K> set = new HashSet<>();
      for (K kk : map.keySet()) {
        K k = ConversionUtils.convertReturn(kk);
        set.add(k);
      }
      fut.complete(set);
    }, resultHandler);
  }

  /**
   * Collects the values of the backing map into a new {@link ArrayList}, passing
   * each through {@code convertReturn}.
   *
   * @param resultHandler notified with the collected value list
   */
  @Override
  public void values(Handler<AsyncResult<List<V>>> resultHandler) {
    vertx.executeBlocking(fut -> {
      List<V> list = new ArrayList<>();
      for (V vv : map.values()) {
        V v = ConversionUtils.convertReturn(vv);
        list.add(v);
      }
      fut.complete(list);
    }, resultHandler);
  }

  /**
   * Copies the entries of the backing map into a new {@link HashMap}, passing each
   * key and value through {@code convertReturn}.
   *
   * @param resultHandler notified with the copied map
   */
  @Override
  public void entries(Handler<AsyncResult<Map<K, V>>> resultHandler) {
    vertx.executeBlocking(fut -> {
      Map<K, V> result = new HashMap<>();
      for (Map.Entry<K, V> entry : map.entrySet()) {
        K k = ConversionUtils.convertReturn(entry.getKey());
        V v = ConversionUtils.convertReturn(entry.getValue());
        result.put(k, v);
      }
      fut.complete(result);
    }, resultHandler);
  }

  /**
   * Applies the parameter conversion followed by the server-ID conversion to a value.
   * Centralised so that every write and every value comparison hands Hazelcast the
   * same representation.
   */
  private V toStoredValue(V v) {
    V converted = convertParam(v);
    return HazelcastServerID.convertServerID(converted);
  }

  /**
   * Registers a {@code HandlerCallBackAdapter} wrapping {@code resultHandler} on
   * {@code future}, using an executor obtained from the current (or a newly
   * created) Vert.x context.
   */
  private <T> void executeAsync(ICompletableFuture<T> future,
                                Handler<AsyncResult<T>> resultHandler) {
    future.andThen(
      new HandlerCallBackAdapter(resultHandler),
      VertxExecutorAdapter.getOrCreate(vertx.getOrCreateContext())
    );
  }

  /**
   * Same as {@link #executeAsync} but for futures without a result value; uses a
   * {@code VoidHandlerCallBackAdapter}.
   */
  private void executeAsyncVoid(ICompletableFuture<Void> future,
                                Handler<AsyncResult<Void>> resultHandler) {
    future.andThen(
      new VoidHandlerCallBackAdapter(resultHandler),
      VertxExecutorAdapter.getOrCreate(vertx.getOrCreateContext())
    );
  }
}