package io.vertx.spi.cluster.hazelcast;
import com.hazelcast.config.Config;
import com.hazelcast.core.*;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.Counter;
import io.vertx.core.shareddata.Lock;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeListener;
import io.vertx.spi.cluster.hazelcast.impl.HazelcastAsyncMap;
import io.vertx.spi.cluster.hazelcast.impl.HazelcastAsyncMultiMap;
import io.vertx.spi.cluster.hazelcast.impl.HazelcastInternalAsyncCounter;
import io.vertx.spi.cluster.hazelcast.impl.HazelcastInternalAsyncMap;
import java.util.*;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class HazelcastClusterManager implements ClusterManager, MembershipListener, LifecycleListener {
private static final Logger log = LoggerFactory.getLogger(HazelcastClusterManager.class);
private static final String LOCK_SEMAPHORE_PREFIX = "__vertx.";
private static final String NODE_ID_ATTRIBUTE = "__vertx.nodeId";
private static final String OPTION_USE_HZ_ASYNC_API = "vertx.hazelcast.async-api";
private static final boolean USE_HZ_ASYNC_API = Boolean.getBoolean(OPTION_USE_HZ_ASYNC_API);
private Vertx vertx;
private HazelcastInstance hazelcast;
private String nodeID;
private String membershipListenerId;
private String lifecycleListenerId;
private boolean customHazelcastCluster;
private Set<String> nodeIds = new HashSet<>();
private Set<HazelcastAsyncMultiMap> multimaps = Collections.newSetFromMap(new WeakHashMap<>(1));
private NodeListener nodeListener;
private volatile boolean active;
private Config conf;
public HazelcastClusterManager() {
}
public HazelcastClusterManager(Config conf) {
Objects.requireNonNull(conf, "The Hazelcast config cannot be null.");
this.conf = conf;
}
public HazelcastClusterManager(HazelcastInstance instance) {
Objects.requireNonNull(instance, "The Hazelcast instance cannot be null.");
hazelcast = instance;
customHazelcastCluster = true;
}
public void setVertx(Vertx vertx) {
this.vertx = vertx;
}
public synchronized void join(Handler<AsyncResult<Void>> resultHandler) {
vertx.executeBlocking(fut -> {
if (!active) {
active = true;
if (!customHazelcastCluster) {
if (conf == null) {
conf = loadConfig();
if (conf == null) {
log.warn("Cannot find cluster configuration on 'vertx.hazelcast.config' system property, on the classpath, " +
"or specified programmatically. Using default hazelcast configuration");
conf = new Config();
}
}
conf.setProperty("hazelcast.shutdownhook.enabled", "false");
hazelcast = Hazelcast.newHazelcastInstance(conf);
}
Member localMember = hazelcast.getCluster().getLocalMember();
nodeID = localMember.getUuid();
localMember.setStringAttribute(NODE_ID_ATTRIBUTE, nodeID);
membershipListenerId = hazelcast.getCluster().addMembershipListener(this);
lifecycleListenerId = hazelcast.getLifecycleService().addLifecycleListener(this);
fut.complete();
}
}, resultHandler);
}
@Override
public <K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> resultHandler) {
vertx.executeBlocking(fut -> {
com.hazelcast.core.MultiMap<K, V> multiMap = hazelcast.getMultiMap(name);
HazelcastAsyncMultiMap<K, V> asyncMultiMap = new HazelcastAsyncMultiMap<>(vertx, multiMap);
synchronized (this) {
multimaps.add(asyncMultiMap);
}
fut.complete(asyncMultiMap);
}, resultHandler);
}
@Override
public String getNodeID() {
return nodeID;
}
@Override
public List<String> getNodes() {
List<String> list = new ArrayList<>();
for (Member member : hazelcast.getCluster().getMembers()) {
String nodeIdAttribute = member.getStringAttribute(NODE_ID_ATTRIBUTE);
list.add(nodeIdAttribute != null ? nodeIdAttribute : member.getUuid());
}
return list;
}
@Override
public void nodeListener(NodeListener listener) {
this.nodeListener = listener;
}
@Override
public <K, V> void getAsyncMap(String name, Handler<AsyncResult<AsyncMap<K, V>>> resultHandler) {
vertx.executeBlocking(fut -> {
IMap<K, V> map = hazelcast.getMap(name);
fut.complete(USE_HZ_ASYNC_API ? new HazelcastInternalAsyncMap<>(vertx, map) : new HazelcastAsyncMap<>(vertx, map));
}, resultHandler);
}
@Override
public <K, V> Map<K, V> getSyncMap(String name) {
IMap<K, V> map = hazelcast.getMap(name);
return map;
}
@Override
public void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler) {
vertx.executeBlocking(fut -> {
ISemaphore iSemaphore = hazelcast.getSemaphore(LOCK_SEMAPHORE_PREFIX + name);
boolean locked = false;
long remaining = timeout;
do {
long start = System.nanoTime();
try {
locked = iSemaphore.tryAcquire(remaining, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
remaining = remaining - MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS);
} while (!locked && remaining > 0);
if (locked) {
fut.complete(new HazelcastLock(iSemaphore));
} else {
throw new VertxException("Timed out waiting to get lock " + name);
}
}, false, resultHandler);
}
@Override
public void getCounter(String name, Handler<AsyncResult<Counter>> resultHandler) {
vertx.executeBlocking(fut ->
fut.complete(
USE_HZ_ASYNC_API ?
new HazelcastInternalAsyncCounter(vertx, hazelcast.getAtomicLong(name)) :
new HazelcastCounter(hazelcast.getAtomicLong(name))
)
, resultHandler);
}
public void leave(Handler<AsyncResult<Void>> resultHandler) {
vertx.executeBlocking(fut -> {
synchronized (HazelcastClusterManager.this) {
if (active) {
try {
active = false;
boolean left = hazelcast.getCluster().removeMembershipListener(membershipListenerId);
if (!left) {
log.warn("No membership listener");
}
hazelcast.getLifecycleService().removeLifecycleListener(lifecycleListenerId);
while (!customHazelcastCluster && hazelcast.getLifecycleService().isRunning()) {
try {
hazelcast.getLifecycleService().shutdown();
} catch (RejectedExecutionException ignore) {
log.debug("Rejected execution of the shutdown operation, retrying");
}
try {
Thread.sleep(1);
} catch (InterruptedException t) {
Thread.currentThread().interrupt();
}
}
if (customHazelcastCluster) {
hazelcast.getCluster().getLocalMember().removeAttribute(NODE_ID_ATTRIBUTE);
}
} catch (Throwable t) {
fut.fail(t);
}
}
}
fut.complete();
}, resultHandler);
}
@Override
public synchronized void memberAdded(MembershipEvent membershipEvent) {
if (!active) {
return;
}
Member member = membershipEvent.getMember();
String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE);
if (memberNodeId == null) {
memberNodeId = member.getUuid();
}
try {
multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
if (nodeListener != null) {
nodeIds.add(memberNodeId);
nodeListener.nodeAdded(memberNodeId);
}
} catch (Throwable t) {
log.error("Failed to handle memberAdded", t);
}
}
@Override
public synchronized void memberRemoved(MembershipEvent membershipEvent) {
if (!active) {
return;
}
Member member = membershipEvent.getMember();
String memberNodeId = member.getStringAttribute(NODE_ID_ATTRIBUTE);
if (memberNodeId == null) {
memberNodeId = member.getUuid();
}
try {
multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
if (nodeListener != null) {
nodeIds.remove(memberNodeId);
nodeListener.nodeLeft(memberNodeId);
}
} catch (Throwable t) {
log.error("Failed to handle memberRemoved", t);
}
}
@Override
public synchronized void stateChanged(LifecycleEvent lifecycleEvent) {
if (!active) {
return;
}
multimaps.forEach(HazelcastAsyncMultiMap::clearCache);
if(lifecycleEvent.getState() == LifecycleEvent.LifecycleState.MERGED) {
final List<String> currentNodes = getNodes();
Set<String> newNodes = new HashSet<>(currentNodes);
newNodes.removeAll(nodeIds);
Set<String> removedMembers = new HashSet<>(nodeIds);
removedMembers.removeAll(currentNodes);
for (String nodeId : newNodes) {
nodeListener.nodeAdded(nodeId);
}
for (String nodeId : removedMembers) {
nodeListener.nodeLeft(nodeId);
}
nodeIds.retainAll(currentNodes);
}
}
@Override
public boolean isActive() {
return active;
}
@Override
public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
}
public Config getConfig() {
return conf;
}
public void setConfig(Config config) {
this.conf = config;
}
public Config loadConfig() {
return ConfigUtil.loadConfig();
}
public HazelcastInstance getHazelcastInstance() {
return hazelcast;
}
private class HazelcastCounter implements Counter {
private IAtomicLong atomicLong;
private HazelcastCounter(IAtomicLong atomicLong) {
this.atomicLong = atomicLong;
}
@Override
public void get(Handler<AsyncResult<Long>> resultHandler) {
Objects.requireNonNull(resultHandler, "resultHandler");
vertx.executeBlocking(fut -> fut.complete(atomicLong.get()), resultHandler);
}
@Override
public void incrementAndGet(Handler<AsyncResult<Long>> resultHandler) {
Objects.requireNonNull(resultHandler, "resultHandler");
vertx.executeBlocking(fut -> fut.complete(atomicLong.incrementAndGet()), resultHandler);
}
@Override
public void getAndIncrement(Handler<AsyncResult<Long>> resultHandler) {
Objects.requireNonNull(resultHandler, "resultHandler");
vertx.executeBlocking(fut -> fut.complete(atomicLong.getAndIncrement()), resultHandler);
}
@Override
public void decrementAndGet(Handler<AsyncResult<Long>> resultHandler) {
Objects.requireNonNull(resultHandler, "resultHandler");
vertx.executeBlocking(fut -> fut.complete(atomicLong.decrementAndGet()), resultHandler);
}
@Override
public void addAndGet(long value, Handler<AsyncResult<Long>> resultHandler) {
Objects.requireNonNull(resultHandler, "resultHandler");
vertx.executeBlocking(fut -> fut.complete(atomicLong.addAndGet(value)), resultHandler);
}
@Override
public void getAndAdd(long value, Handler<AsyncResult<Long>> resultHandler) {
Objects.requireNonNull(resultHandler, "resultHandler");
vertx.executeBlocking(fut -> fut.complete(atomicLong.getAndAdd(value)), resultHandler);
}
@Override
public void compareAndSet(long expected, long value, Handler<AsyncResult<Boolean>> resultHandler) {
Objects.requireNonNull(resultHandler, "resultHandler");
vertx.executeBlocking(fut -> fut.complete(atomicLong.compareAndSet(expected, value)), resultHandler);
}
}
private class HazelcastLock implements Lock {
private final ISemaphore semaphore;
private final AtomicBoolean released = new AtomicBoolean();
private HazelcastLock(ISemaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void release() {
if (released.compareAndSet(false, true)) {
vertx.executeBlocking(future -> {
semaphore.release();
future.complete();
}, false, null);
}
}
}
}