package io.vertx.spi.cluster.hazelcast.impl;
import com.hazelcast.map.IMap;
import io.vertx.core.Future;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.shareddata.AsyncMap;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletionStage;
import static io.vertx.spi.cluster.hazelcast.impl.ConversionUtils.convertParam;
import static io.vertx.spi.cluster.hazelcast.impl.ConversionUtils.convertReturn;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class HazelcastAsyncMap<K, V> implements AsyncMap<K, V> {
private final VertxInternal vertx;
private final IMap<K, V> map;
public HazelcastAsyncMap(VertxInternal vertx, IMap<K, V> map) {
this.vertx = vertx;
this.map = map;
}
@Override
public Future<V> get(K k) {
K kk = convertParam(k);
ContextInternal context = vertx.getOrCreateContext();
return Future.fromCompletionStage(map.getAsync(kk), context).map(ConversionUtils::convertReturn);
}
@Override
public Future<Void> put(K k, V v) {
K kk = convertParam(k);
V vv = convertParam(v);
ContextInternal context = vertx.getOrCreateContext();
return Future.fromCompletionStage(map.setAsync(kk, HazelcastServerID.convertServerID(vv)), context);
}
@Override
public Future<V> putIfAbsent(K k, V v) {
K kk = convertParam(k);
V vv = convertParam(v);
return vertx.executeBlocking(fut -> {
fut.complete(convertReturn(map.putIfAbsent(kk, HazelcastServerID.convertServerID(vv))));
}, false);
}
@Override
public Future<Void> put(K k, V v, long ttl) {
K kk = convertParam(k);
V vv = convertParam(v);
ContextInternal context = vertx.getOrCreateContext();
CompletionStage<Void> completionStage = map.setAsync(kk, HazelcastServerID.convertServerID(vv), ttl, MILLISECONDS);
return Future.fromCompletionStage(completionStage, context);
}
@Override
public Future<V> putIfAbsent(K k, V v, long ttl) {
K kk = convertParam(k);
V vv = convertParam(v);
return vertx.executeBlocking(fut -> {
fut.complete(convertReturn(map.putIfAbsent(kk, HazelcastServerID.convertServerID(vv), ttl, MILLISECONDS)));
}, false);
}
@Override
public Future<V> remove(K k) {
K kk = convertParam(k);
ContextInternal context = vertx.getOrCreateContext();
CompletionStage<V> completionStage = map.removeAsync(kk);
return Future.fromCompletionStage(completionStage, context).map(ConversionUtils::convertReturn);
}
@Override
public Future<Boolean> removeIfPresent(K k, V v) {
K kk = convertParam(k);
V vv = convertParam(v);
return vertx.executeBlocking(fut -> fut.complete(map.remove(kk, vv)), false);
}
@Override
public Future<V> replace(K k, V v) {
K kk = convertParam(k);
V vv = convertParam(v);
return vertx.executeBlocking(fut -> fut.complete(convertReturn(map.replace(kk, vv))), false);
}
@Override
public Future<Boolean> replaceIfPresent(K k, V oldValue, V newValue) {
K kk = convertParam(k);
V vv = convertParam(oldValue);
V vvv = convertParam(newValue);
return vertx.executeBlocking(fut -> fut.complete(map.replace(kk, vv, vvv)), false);
}
@Override
public Future<Void> clear() {
return vertx.executeBlocking(fut -> {
map.clear();
fut.complete();
}, false);
}
@Override
public Future<Integer> size() {
return vertx.executeBlocking(fut -> fut.complete(map.size()), false);
}
@Override
public Future<Set<K>> keys() {
return vertx.executeBlocking(fut -> {
Set<K> set = new HashSet<>();
for (K kk : map.keySet()) {
K k = ConversionUtils.convertReturn(kk);
set.add(k);
}
fut.complete(set);
}, false);
}
@Override
public Future<List<V>> values() {
return vertx.executeBlocking(fut -> {
List<V> list = new ArrayList<>();
for (V vv : map.values()) {
V v = ConversionUtils.convertReturn(vv);
list.add(v);
}
fut.complete(list);
}, false);
}
@Override
public Future<Map<K, V>> entries() {
return vertx.executeBlocking(fut -> {
Map<K, V> result = new HashMap<>();
for (Entry<K, V> entry : map.entrySet()) {
K k = ConversionUtils.convertReturn(entry.getKey());
V v = ConversionUtils.convertReturn(entry.getValue());
result.put(k, v);
}
fut.complete(result);
}, false);
}
}