package io.vertx.spi.cluster.hazelcast.impl;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.MapEvent;
import com.hazelcast.multimap.MultiMap;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.spi.cluster.NodeSelector;
import io.vertx.core.spi.cluster.RegistrationInfo;
import io.vertx.core.spi.cluster.RegistrationUpdateEvent;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SubsMapHelper implements EntryListener<String, HazelcastRegistrationInfo> {
private static final Logger log = LoggerFactory.getLogger(SubsMapHelper.class);
private final VertxInternal vertx;
private final MultiMap<String, HazelcastRegistrationInfo> map;
private final NodeSelector nodeSelector;
private final UUID listenerId;
private final ConcurrentMap<String, Set<RegistrationInfo>> ownSubs = new ConcurrentHashMap<>();
private final ReadWriteLock republishLock = new ReentrantReadWriteLock();
public SubsMapHelper(VertxInternal vertx, HazelcastInstance hazelcast, NodeSelector nodeSelector) {
this.vertx = vertx;
map = hazelcast.getMultiMap("__vertx.subs");
this.nodeSelector = nodeSelector;
listenerId = map.addEntryListener(this, false);
}
public List<RegistrationInfo> get(String address) {
Lock readLock = republishLock.readLock();
readLock.lock();
try {
List<RegistrationInfo> list = new ArrayList<>();
for (HazelcastRegistrationInfo registrationInfo : map.get(address)) {
list.add(registrationInfo.unwrap());
}
return list;
} finally {
readLock.unlock();
}
}
public void put(String address, RegistrationInfo registrationInfo) {
Lock readLock = republishLock.readLock();
readLock.lock();
try {
ownSubs.compute(address, (add, curr) -> {
Set<RegistrationInfo> res = curr != null ? curr : new CopyOnWriteArraySet<>();
res.add(registrationInfo);
return res;
});
map.put(address, new HazelcastRegistrationInfo(registrationInfo));
} finally {
readLock.unlock();
}
}
public void remove(String address, RegistrationInfo registrationInfo) {
Lock readLock = republishLock.readLock();
readLock.lock();
try {
ownSubs.computeIfPresent(address, (add, curr) -> {
curr.remove(registrationInfo);
return curr.isEmpty() ? null : curr;
});
map.remove(address, new HazelcastRegistrationInfo(registrationInfo));
} finally {
readLock.unlock();
}
}
public void removeAllForNodes(Set<String> nodeIds) {
for (Map.Entry<String, HazelcastRegistrationInfo> entry : map.entrySet()) {
HazelcastRegistrationInfo registrationInfo = entry.getValue();
if (nodeIds.contains(registrationInfo.unwrap().nodeId())) {
map.remove(entry.getKey(), registrationInfo);
}
}
}
public void republishOwnSubs() {
Lock writeLock = republishLock.writeLock();
writeLock.lock();
try {
for (Map.Entry<String, Set<RegistrationInfo>> entry : ownSubs.entrySet()) {
String address = entry.getKey();
for (RegistrationInfo registrationInfo : entry.getValue()) {
map.put(address, new HazelcastRegistrationInfo(registrationInfo));
}
}
} finally {
writeLock.unlock();
}
}
@Override
public void entryAdded(EntryEvent<String, HazelcastRegistrationInfo> event) {
fireRegistrationUpdateEvent(event);
}
private void fireRegistrationUpdateEvent(EntryEvent<String, HazelcastRegistrationInfo> event) {
String address = event.getKey();
vertx.<List<RegistrationInfo>>executeBlocking(prom -> {
prom.complete(get(address));
}, false, ar -> {
if (ar.succeeded()) {
nodeSelector.registrationsUpdated(new RegistrationUpdateEvent(address, ar.result()));
} else {
log.trace("A failure occured while retrieving the updated registrations", ar.cause());
nodeSelector.registrationsUpdated(new RegistrationUpdateEvent(address, Collections.emptyList()));
}
});
}
@Override
public void entryEvicted(EntryEvent<String, HazelcastRegistrationInfo> event) {
}
@Override
public void entryRemoved(EntryEvent<String, HazelcastRegistrationInfo> event) {
fireRegistrationUpdateEvent(event);
}
@Override
public void entryUpdated(EntryEvent<String, HazelcastRegistrationInfo> event) {
fireRegistrationUpdateEvent(event);
}
@Override
public void mapCleared(MapEvent event) {
}
@Override
public void mapEvicted(MapEvent event) {
}
@Override
public void entryExpired(EntryEvent<String, HazelcastRegistrationInfo> event) {
}
public void close() {
map.removeEntryListener(listenerId);
}
}