package io.vertx.spi.cluster.ignite;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.Counter;
import io.vertx.core.shareddata.Lock;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeListener;
import io.vertx.spi.cluster.ignite.impl.AsyncMapImpl;
import io.vertx.spi.cluster.ignite.impl.AsyncMultiMapImpl;
import io.vertx.spi.cluster.ignite.impl.MapImpl;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;

import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URL;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static javax.cache.expiry.Duration.*;
import static org.apache.ignite.events.EventType.*;

/** * Apache Ignite based cluster manager. * * @author Andrey Gura */
public class IgniteClusterManager implements ClusterManager { private static final Logger log = LoggerFactory.getLogger(IgniteClusterManager.class); // Default Ignite configuration file private static final String DEFAULT_CONFIG_FILE = "default-ignite.xml"; // User defined Ignite configuration file private static final String CONFIG_FILE = "ignite.xml"; public static final String VERTX_CACHE_TEMPLATE_NAME = "*"; private static final String VERTX_NODE_PREFIX = "vertx.ignite.node."; // Workaround for https://github.com/vert-x3/vertx-ignite/issues/63 private static final ExpiryPolicy DEFAULT_EXPIRY_POLICY = new ClearExpiryPolicy(); private final Queue<String> pendingLocks = new ConcurrentLinkedQueue<>(); private Vertx vertx; private IgniteConfiguration cfg; private Ignite ignite; private boolean customIgnite; private String nodeID = UUID.randomUUID().toString(); private NodeListener nodeListener; private IgnitePredicate<Event> eventListener; private volatile boolean active; private final Object monitor = new Object(); private CollectionConfiguration collectionCfg;
@SuppressWarnings("unused") public IgniteClusterManager() { System.setProperty("IGNITE_NO_SHUTDOWN_HOOK", "true"); }
@SuppressWarnings("unused") public IgniteClusterManager(IgniteConfiguration cfg) { this.cfg = cfg; setNodeID(cfg); }
@SuppressWarnings("unused") public IgniteClusterManager(URL configFile) { this.cfg = loadConfiguration(configFile); }
public IgniteClusterManager(Ignite ignite) { Objects.requireNonNull(ignite, "Ignite instance can't be null."); this.ignite = ignite; this.customIgnite = true; }
public Ignite getIgniteInstance() { return ignite; } @Override public void setVertx(Vertx vertx) { this.vertx = vertx; } @Override public void nodeListener(NodeListener nodeListener) { this.nodeListener = nodeListener; } @Override public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> handler) { vertx.executeBlocking( fut -> fut.complete(new AsyncMultiMapImpl<>(this.<K, Set<V>>getCache(name), vertx)), handler ); } @Override public <K, V> void getAsyncMap(String name, Handler<AsyncResult<AsyncMap<K, V>>> handler) { vertx.executeBlocking( fut -> fut.complete(new AsyncMapImpl<>(getCache(name), vertx)), handler ); } @Override public <K, V> Map<K, V> getSyncMap(String name) { return new MapImpl<>(getCache(name)); } @Override public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> handler) { vertx.executeBlocking(fut -> { boolean locked; try { IgniteQueue<String> queue = getQueue(name, true); pendingLocks.offer(name); locked = queue.offer(getNodeID(), timeout, TimeUnit.MILLISECONDS); if (!locked) { // EVT_NODE_LEFT/EVT_NODE_FAILED event might be already handled, so trying get lock again if // node left topology. // Use IgniteSempahore when it will be fixed. String ownerId = queue.peek(); ClusterNode ownerNode = ignite.cluster().forNodeId(UUID.fromString(ownerId)).node(); if (ownerNode == null) { queue.remove(ownerId); locked = queue.offer(getNodeID(), timeout, TimeUnit.MILLISECONDS); } } } catch (Exception e) { throw new VertxException("Error during getting lock " + name, e); } finally { pendingLocks.remove(name); } if (locked) { fut.complete(new LockImpl(name)); } else { throw new VertxException("Timed out waiting to get lock " + name); } }, false, handler); } @Override public void getCounter(String name, Handler<AsyncResult<Counter>> handler) { vertx.executeBlocking(fut -> fut.complete(new CounterImpl(ignite.atomicLong(name, 0, true))), handler); } @Override public String getNodeID() { return nodeID; } @Override public List<String> getNodes() { return ignite.cluster().nodes().stream() .map(IgniteClusterManager::nodeId).collect(Collectors.toList()); } @Override public void join(Handler<AsyncResult<Void>> handler) { synchronized (monitor) { vertx.executeBlocking(fut -> { if (!active) { active = true; if (!customIgnite) { ignite = cfg == null ? Ignition.start(loadConfiguration()) : Ignition.start(cfg); } nodeID = nodeId(ignite.cluster().localNode()); for (CacheConfiguration cacheCfg : ignite.configuration().getCacheConfiguration()) { if (cacheCfg.getName().equals(VERTX_CACHE_TEMPLATE_NAME)) { collectionCfg = new CollectionConfiguration(); collectionCfg.setAtomicityMode(cacheCfg.getAtomicityMode()); collectionCfg.setBackups(cacheCfg.getBackups()); break; } } if (collectionCfg == null) { collectionCfg = new CollectionConfiguration(); } eventListener = event -> { if (!active) { return false; } if (nodeListener != null) { vertx.executeBlocking(f -> { if (isActive()) { switch (event.type()) { case EVT_NODE_JOINED: nodeListener.nodeAdded(nodeId(((DiscoveryEvent)event).eventNode())); break; case EVT_NODE_LEFT: case EVT_NODE_FAILED: String nodeId = nodeId(((DiscoveryEvent)event).eventNode()); nodeListener.nodeLeft(nodeId); releasePendingLocksForFailedNode(nodeId); break; } } fut.complete(); }, null); } return true; }; ignite.events().localListen(eventListener, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); fut.complete(); } }, handler); } }
private void releasePendingLocksForFailedNode(final String nodeId) { Set<String> processed = new HashSet<>(); pendingLocks.forEach(name -> { if (processed.add(name)) { IgniteQueue<String> queue = getQueue(name, false); if (queue != null && nodeId.equals(queue.peek())) { queue.remove(nodeId); } } }); } @Override public void leave(Handler<AsyncResult<Void>> handler) { synchronized (monitor) { vertx.executeBlocking(fut -> { if (active) { active = false; try { if (!customIgnite) ignite.close(); else if (eventListener != null) ignite.events().stopLocalListen(eventListener, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); } catch (Exception e) { log.error(e); } } fut.complete(); }, handler); } } @Override public boolean isActive() { return active; } private IgniteConfiguration loadConfiguration(URL config) { try { IgniteConfiguration cfg = F.first(IgnitionEx.loadConfigurations(config).get1()); setNodeID(cfg); return cfg; } catch (IgniteCheckedException e) { log.error("Configuration loading error:", e); throw new RuntimeException(e); } } private IgniteConfiguration loadConfiguration() { ClassLoader ctxClsLoader = Thread.currentThread().getContextClassLoader(); InputStream is = null; if (ctxClsLoader != null) { is = ctxClsLoader.getResourceAsStream(CONFIG_FILE); } if (is == null) { is = getClass().getClassLoader().getResourceAsStream(CONFIG_FILE); if (is == null) { is = getClass().getClassLoader().getResourceAsStream(DEFAULT_CONFIG_FILE); log.info("Using default configuration."); } } try { IgniteConfiguration cfg = F.first(IgnitionEx.loadConfigurations(is).get1()); setNodeID(cfg); return cfg; } catch (IgniteCheckedException e) { log.error("Configuration loading error:", e); throw new RuntimeException(e); } } private void setNodeID(IgniteConfiguration cfg) { UUID uuid = UUID.fromString(nodeID); cfg.setNodeId(uuid); cfg.setIgniteInstanceName(VERTX_NODE_PREFIX + uuid); } private <K, V> IgniteCache<K, V> getCache(String name) { IgniteCache<K, V> cache = ignite.getOrCreateCache(name); return cache.withExpiryPolicy(DEFAULT_EXPIRY_POLICY); } private <T> IgniteQueue<T> getQueue(String name, boolean create) { return ignite.queue(name, 1, create ? collectionCfg : null); } private static String nodeId(ClusterNode node) { return node.id().toString(); } private class LockImpl implements Lock { private final String name; private LockImpl(String name) { this.name = name; } @Override public void release() { vertx.executeBlocking(future -> { IgniteQueue<String> queue = getQueue(name, true); String ownerId = queue.poll(); if (ownerId == null) { throw new VertxException("Inconsistent lock state " + name); } future.complete(); }, false, null); } } private class CounterImpl implements Counter { private final IgniteAtomicLong cnt; private CounterImpl(IgniteAtomicLong cnt) { this.cnt = cnt; } @Override public void get(Handler<AsyncResult<Long>> handler) { Objects.requireNonNull(handler, "handler"); vertx.executeBlocking(fut -> fut.complete(cnt.get()), handler); } @Override public void incrementAndGet(Handler<AsyncResult<Long>> handler) { Objects.requireNonNull(handler, "handler"); vertx.executeBlocking(fut -> fut.complete(cnt.incrementAndGet()), handler); } @Override public void getAndIncrement(Handler<AsyncResult<Long>> handler) { Objects.requireNonNull(handler, "handler"); vertx.executeBlocking(fut -> fut.complete(cnt.getAndIncrement()), handler); } @Override public void decrementAndGet(Handler<AsyncResult<Long>> handler) { Objects.requireNonNull(handler, "handler"); vertx.executeBlocking(fut -> fut.complete(cnt.decrementAndGet()), handler); } @Override public void addAndGet(long value, Handler<AsyncResult<Long>> handler) { Objects.requireNonNull(handler, "handler"); vertx.executeBlocking(fut -> fut.complete(cnt.addAndGet(value)), handler); } @Override public void getAndAdd(long value, Handler<AsyncResult<Long>> handler) { Objects.requireNonNull(handler, "handler"); vertx.executeBlocking(fut -> fut.complete(cnt.getAndAdd(value)), handler); } @Override public void compareAndSet(long expected, long value, Handler<AsyncResult<Boolean>> handler) { Objects.requireNonNull(handler, "handler"); vertx.executeBlocking(fut -> fut.complete(cnt.compareAndSet(expected, value)), handler); } } private static class ClearExpiryPolicy implements ExpiryPolicy, Serializable { @Override public Duration getExpiryForCreation() { return ETERNAL; } @Override public Duration getExpiryForAccess() { return ETERNAL; } @Override public Duration getExpiryForUpdate() { return ETERNAL; } } }