package io.vertx.spi.cluster.ignite.impl;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.shareddata.AsyncMap;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteFuture;
import javax.cache.Cache;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static io.vertx.spi.cluster.ignite.impl.ClusterSerializationUtils.marshal;
import static io.vertx.spi.cluster.ignite.impl.ClusterSerializationUtils.unmarshal;
public class AsyncMapImpl<K, V> implements AsyncMap<K, V> {
private final VertxInternal vertx;
private final IgniteCache<K, V> cache;
public AsyncMapImpl(IgniteCache<K, V> cache, VertxInternal vertx) {
this.cache = cache;
this.vertx = vertx;
}
@Override
public Future<V> get(K k) {
return execute(cache -> cache.getAsync(marshal(k)));
}
@Override
public Future<Void> put(K k, V v) {
return execute(cache -> cache.putAsync(marshal(k), marshal(v)));
}
@Override
public Future<Void> put(K k, V v, long ttl) {
return executeWithTtl(cache -> cache.putAsync(marshal(k), marshal(v)), ttl);
}
@Override
public Future<V> putIfAbsent(K k, V v) {
return execute(cache -> cache.getAndPutIfAbsentAsync(marshal(k), marshal(v)));
}
@Override
public Future<V> putIfAbsent(K k, V v, long ttl) {
return executeWithTtl(cache -> cache.getAndPutIfAbsentAsync(marshal(k), marshal(v)), ttl);
}
@Override
public Future<V> remove(K k) {
return execute(cache -> cache.getAndRemoveAsync(marshal(k)));
}
@Override
public Future<Boolean> removeIfPresent(K k, V v) {
return execute(cache -> cache.removeAsync(marshal(k), marshal(v)));
}
@Override
public Future<V> replace(K k, V v) {
return execute(cache -> cache.getAndReplaceAsync(marshal(k), marshal(v)));
}
@Override
public Future<Boolean> replaceIfPresent(K k, V oldValue, V newValue) {
return execute(cache -> cache.replaceAsync(marshal(k), marshal(oldValue), marshal(newValue)));
}
@Override
public Future<Void> clear() {
return execute(IgniteCache::clearAsync);
}
@Override
public Future<Integer> size() {
return execute(IgniteCache::sizeAsync);
}
@Override
public Future<Set<K>> keys() {
return entries().map(Map::keySet);
}
@Override
public Future<List<V>> values() {
return entries().map(map -> new ArrayList<>(map.values()));
}
@Override
public Future<Map<K, V>> entries() {
return vertx.executeBlocking(fut -> {
List<Cache.Entry<K, V>> all = cache.query(new ScanQuery<K, V>()).getAll();
Map<K, V> map = new HashMap<>(all.size());
for (Cache.Entry<K, V> entry : all) {
map.put(unmarshal(entry.getKey()), unmarshal(entry.getValue()));
}
fut.complete(map);
});
}
private <T> Future<T> execute(Function<IgniteCache<K, V>, IgniteFuture<T>> cacheOp) {
return executeWithTtl(cacheOp, -1);
}
private <T> Future<T> executeWithTtl(Function<IgniteCache<K, V>, IgniteFuture<T>> cacheOp, long ttl) {
ContextInternal ctx = vertx.getOrCreateContext();
Promise<T> promise = ctx.promise();
IgniteCache<K, V> cache0 = ttl > 0 ?
cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, ttl))) : cache;
IgniteFuture<T> future = cacheOp.apply(cache0);
future.listen(fut -> {
try {
promise.complete(unmarshal(future.get()));
} catch (IgniteException e) {
promise.fail(new VertxException(e));
}
});
return promise.future();
}
}