package io.vertx.spi.cluster.hazelcast;
import com.hazelcast.cluster.Member;
import com.hazelcast.cluster.MembershipEvent;
import com.hazelcast.cluster.MembershipListener;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleListener;
import com.hazelcast.cp.ISemaphore;
import com.hazelcast.map.IMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.Counter;
import io.vertx.core.shareddata.Lock;
import io.vertx.core.spi.cluster.*;
import io.vertx.spi.cluster.hazelcast.impl.*;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class HazelcastClusterManager implements ClusterManager, MembershipListener, LifecycleListener {
private static final Logger log = LoggerFactory.getLogger(HazelcastClusterManager.class);
private static final String LOCK_SEMAPHORE_PREFIX = "__vertx.";
private VertxInternal vertx;
private NodeSelector nodeSelector;
private HazelcastInstance hazelcast;
private UUID nodeId;
private NodeInfo nodeInfo;
private SubsMapHelper subsMapHelper;
private IMap<String, HazelcastNodeInfo> nodeInfoMap;
private UUID membershipListenerId;
private UUID lifecycleListenerId;
private boolean customHazelcastCluster;
private Set<String> nodeIds = new HashSet<>();
private NodeListener nodeListener;
private volatile boolean active;
private Config conf;
private ExecutorService lockReleaseExec;
public HazelcastClusterManager() {
}
public HazelcastClusterManager(Config conf) {
Objects.requireNonNull(conf, "The Hazelcast config cannot be null.");
this.conf = conf;
}
public HazelcastClusterManager(HazelcastInstance instance) {
Objects.requireNonNull(instance, "The Hazelcast instance cannot be null.");
hazelcast = instance;
customHazelcastCluster = true;
}
@Override
public void init(Vertx vertx, NodeSelector nodeSelector) {
this.vertx = (VertxInternal) vertx;
this.nodeSelector = nodeSelector;
}
@Override
public void join(Promise<Void> promise) {
vertx.executeBlocking(prom -> {
if (!active) {
active = true;
lockReleaseExec = Executors.newCachedThreadPool(r -> new Thread(r, "vertx-hazelcast-service-release-lock-thread"));
if (!customHazelcastCluster) {
if (conf == null) {
conf = loadConfig();
if (conf == null) {
log.warn("Cannot find cluster configuration on 'vertx.hazelcast.config' system property, on the classpath, " +
"or specified programmatically. Using default hazelcast configuration");
conf = new Config();
}
}
conf.setProperty("hazelcast.shutdownhook.enabled", "false");
hazelcast = Hazelcast.newHazelcastInstance(conf);
}
Member localMember = hazelcast.getCluster().getLocalMember();
nodeId = localMember.getUuid();
membershipListenerId = hazelcast.getCluster().addMembershipListener(this);
lifecycleListenerId = hazelcast.getLifecycleService().addLifecycleListener(this);
subsMapHelper = new SubsMapHelper(vertx, hazelcast, nodeSelector);
nodeInfoMap = hazelcast.getMap("__vertx.nodeInfo");
prom.complete();
}
}, promise);
}
@Override
public String getNodeId() {
return nodeId.toString();
}
@Override
public List<String> getNodes() {
List<String> list = new ArrayList<>();
for (Member member : hazelcast.getCluster().getMembers()) {
list.add(member.getUuid().toString());
}
return list;
}
@Override
public void nodeListener(NodeListener listener) {
this.nodeListener = listener;
}
@Override
public void setNodeInfo(NodeInfo nodeInfo, Promise<Void> promise) {
synchronized (this) {
this.nodeInfo = nodeInfo;
}
HazelcastNodeInfo value = new HazelcastNodeInfo(nodeInfo);
vertx.executeBlocking(prom -> {
nodeInfoMap.put(nodeId.toString(), value);
prom.complete();
}, false, promise);
}
@Override
public synchronized NodeInfo getNodeInfo() {
return nodeInfo;
}
@Override
public void getNodeInfo(String nodeId, Promise<NodeInfo> promise) {
vertx.executeBlocking(prom -> {
HazelcastNodeInfo value = nodeInfoMap.get(nodeId);
if (value != null) {
prom.complete(value.unwrap());
} else {
promise.fail("Not a member of the cluster");
}
}, false, promise);
}
@Override
public <K, V> void getAsyncMap(String name, Promise<AsyncMap<K, V>> promise) {
promise.complete(new HazelcastAsyncMap<>(vertx, hazelcast.getMap(name)));
}
@Override
public <K, V> Map<K, V> getSyncMap(String name) {
return hazelcast.getMap(name);
}
@Override
public void getLockWithTimeout(String name, long timeout, Promise<Lock> promise) {
vertx.executeBlocking(prom -> {
ISemaphore iSemaphore = hazelcast.getCPSubsystem().getSemaphore(LOCK_SEMAPHORE_PREFIX + name);
boolean locked = false;
long remaining = timeout;
do {
long start = System.nanoTime();
try {
locked = iSemaphore.tryAcquire(remaining, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
remaining = remaining - MILLISECONDS.convert(System.nanoTime() - start, NANOSECONDS);
} while (!locked && remaining > 0);
if (locked) {
prom.complete(new HazelcastLock(iSemaphore, lockReleaseExec));
} else {
throw new VertxException("Timed out waiting to get lock " + name);
}
}, false, promise);
}
@Override
public void getCounter(String name, Promise<Counter> promise) {
promise.complete(new HazelcastCounter(vertx, hazelcast.getCPSubsystem().getAtomicLong(name)));
}
@Override
public void leave(Promise<Void> promise) {
vertx.executeBlocking(prom -> {
synchronized (HazelcastClusterManager.this) {
if (active) {
try {
active = false;
lockReleaseExec.shutdown();
subsMapHelper.close();
boolean left = hazelcast.getCluster().removeMembershipListener(membershipListenerId);
if (!left) {
log.warn("No membership listener");
}
hazelcast.getLifecycleService().removeLifecycleListener(lifecycleListenerId);
while (!customHazelcastCluster && hazelcast.getLifecycleService().isRunning()) {
try {
hazelcast.getLifecycleService().shutdown();
} catch (RejectedExecutionException ignore) {
log.debug("Rejected execution of the shutdown operation, retrying");
}
try {
Thread.sleep(1);
} catch (InterruptedException t) {
Thread.currentThread().interrupt();
}
}
} catch (Throwable t) {
prom.fail(t);
}
}
}
prom.complete();
}, promise);
}
@Override
public synchronized void memberAdded(MembershipEvent membershipEvent) {
if (!active) {
return;
}
Member member = membershipEvent.getMember();
String nid = member.getUuid().toString();
try {
if (nodeListener != null) {
nodeIds.add(nid);
nodeListener.nodeAdded(nid);
}
} catch (Throwable t) {
log.error("Failed to handle memberAdded", t);
}
}
@Override
public synchronized void memberRemoved(MembershipEvent membershipEvent) {
if (!active) {
return;
}
Member member = membershipEvent.getMember();
String nid = member.getUuid().toString();
try {
membersRemoved(Collections.singleton(nid));
} catch (Throwable t) {
log.error("Failed to handle memberRemoved", t);
}
}
private synchronized void membersRemoved(Set<String> ids) {
cleanSubs(ids);
cleanNodeInfos(ids);
nodeInfoMap.put(nodeId.toString(), new HazelcastNodeInfo(getNodeInfo()));
nodeSelector.registrationsLost();
republishOwnSubs();
if (nodeListener != null) {
nodeIds.removeAll(ids);
ids.forEach(nodeListener::nodeLeft);
}
}
private void cleanSubs(Set<String> ids) {
subsMapHelper.removeAllForNodes(ids);
}
private void cleanNodeInfos(Set<String> ids) {
ids.forEach(nodeInfoMap::remove);
}
private void republishOwnSubs() {
vertx.executeBlocking(prom -> {
subsMapHelper.republishOwnSubs();
prom.complete();
}, false);
}
@Override
public synchronized void stateChanged(LifecycleEvent lifecycleEvent) {
if (!active) {
return;
}
if (lifecycleEvent.getState() == LifecycleEvent.LifecycleState.MERGED) {
final List<String> currentNodes = getNodes();
Set<String> newNodes = new HashSet<>(currentNodes);
newNodes.removeAll(nodeIds);
Set<String> removedMembers = new HashSet<>(nodeIds);
removedMembers.removeAll(currentNodes);
for (String nodeId : newNodes) {
nodeListener.nodeAdded(nodeId);
}
membersRemoved(removedMembers);
nodeIds.retainAll(currentNodes);
}
}
@Override
public boolean isActive() {
return active;
}
@Override
public void addRegistration(String address, RegistrationInfo registrationInfo, Promise<Void> promise) {
SubsOpSerializer serializer = SubsOpSerializer.get(vertx.getOrCreateContext());
serializer.execute(subsMapHelper::put, address, registrationInfo, promise);
}
@Override
public void removeRegistration(String address, RegistrationInfo registrationInfo, Promise<Void> promise) {
SubsOpSerializer serializer = SubsOpSerializer.get(vertx.getOrCreateContext());
serializer.execute(subsMapHelper::remove, address, registrationInfo, promise);
}
@Override
public void getRegistrations(String address, Promise<List<RegistrationInfo>> promise) {
vertx.executeBlocking(prom -> {
prom.complete(subsMapHelper.get(address));
}, false, promise);
}
@Override
public String clusterHost() {
String host;
if (!customHazelcastCluster && (host = System.getProperty("hazelcast.local.localAddress")) != null) {
return host;
}
if (!customHazelcastCluster && conf.getNetworkConfig().getPublicAddress() == null) {
return hazelcast.getCluster().getLocalMember().getAddress().getHost();
}
return null;
}
@Override
public String clusterPublicHost() {
String host;
if (!customHazelcastCluster && (host = System.getProperty("hazelcast.local.publicAddress")) != null) {
return host;
}
if (!customHazelcastCluster && (host = conf.getNetworkConfig().getPublicAddress()) != null) {
return host;
}
return null;
}
public Config getConfig() {
return conf;
}
public void setConfig(Config config) {
this.conf = config;
}
public Config loadConfig() {
return ConfigUtil.loadConfig();
}
public HazelcastInstance getHazelcastInstance() {
return hazelcast;
}
}