package io.vertx.spi.cluster.ignite.impl;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.VertxException;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.spi.cluster.NodeSelector;
import io.vertx.core.spi.cluster.RegistrationInfo;
import io.vertx.core.spi.cluster.RegistrationUpdateEvent;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.lang.IgnitePredicate;
import javax.cache.Cache;
import javax.cache.CacheException;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.events.EventType.*;
public class SubsMapHelper {
private static final int[] CACHE_EVENTS = new int[]{EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED};
private final IgniteCache<IgniteRegistrationInfo, Boolean> map;
private final IgnitePredicate<Event> eventListener;
public SubsMapHelper(Ignite ignite, NodeSelector nodeSelector, VertxInternal vertxInternal) {
map = ignite.getOrCreateCache("__vertx.subs");
eventListener = event -> listen(event, nodeSelector, vertxInternal);
ignite.events().localListen(eventListener, CACHE_EVENTS);
}
public void get(String address, Promise<List<RegistrationInfo>> promise) {
try {
List<RegistrationInfo> infos = map.query(new ScanQuery<IgniteRegistrationInfo, Boolean>((k, v) -> k.address().equals(address)))
.getAll().stream()
.map(Cache.Entry::getKey)
.map(IgniteRegistrationInfo::registrationInfo)
.collect(toList());
promise.complete(infos);
} catch (IllegalStateException | CacheException e) {
promise.fail(new VertxException(e));
}
}
public Future<Void> put(String address, RegistrationInfo registrationInfo) {
try {
map.put(new IgniteRegistrationInfo(address, registrationInfo), Boolean.TRUE);
} catch (IllegalStateException | CacheException e) {
return Future.failedFuture(new VertxException(e));
}
return Future.succeededFuture();
}
public void remove(String address, RegistrationInfo registrationInfo, Promise<Void> promise) {
try {
map.remove(new IgniteRegistrationInfo(address, registrationInfo));
promise.complete();
} catch (IllegalStateException | CacheException e) {
promise.fail(new VertxException(e));
}
}
public void removeAllForNode(String nodeId) {
Set<IgniteRegistrationInfo> toRemove = map.query(new ScanQuery<IgniteRegistrationInfo, Boolean>((k, v) -> k.registrationInfo().nodeId().equals(nodeId)))
.getAll().stream()
.map(Cache.Entry::getKey)
.collect(Collectors.toSet());
try {
map.removeAll(toRemove);
} catch (IllegalStateException | CacheException t) {
}
}
boolean listen(Event event, final NodeSelector nodeSelector, final VertxInternal vertxInternal) {
if (!(event instanceof CacheEvent)) {
return true;
}
CacheEvent cacheEvent = (CacheEvent) event;
if (!Objects.equals(cacheEvent.cacheName(), map.getName())) {
return true;
}
vertxInternal.<List<RegistrationInfo>>executeBlocking(promise -> {
String address = cacheEvent.<IgniteRegistrationInfo>key().address();
promise.future().onSuccess(registrationInfos -> {
nodeSelector.registrationsUpdated(new RegistrationUpdateEvent(address, registrationInfos));
});
get(address, promise);
});
return true;
}
public void leave(Ignite ignite) {
if (eventListener != null) {
ignite.events().stopLocalListen(eventListener, CACHE_EVENTS);
}
}
}