package io.vertx.ext.cluster.infinispan.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.TaskQueue;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ChoosableIterable;
import org.infinispan.commons.marshall.Externalizer;
import org.infinispan.commons.marshall.SerializeWith;
import org.infinispan.metadata.Metadata;
import org.infinispan.multimap.impl.EmbeddedMultimapCache;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryCreatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilter;
import org.infinispan.notifications.cachelistener.filter.EventType;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.lang.annotation.Annotation;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import static java.util.stream.Collectors.*;
import static org.infinispan.notifications.Listener.Observation.*;
public class InfinispanAsyncMultiMap<K, V> implements AsyncMultiMap<K, V> {
private final VertxInternal vertx;
private final EmbeddedMultimapCache<Object, Object> multimapCache;
private final ConcurrentMap<K, ChoosableSet<V>> nearCache;
private final TaskQueue taskQueue;
/**
 * Creates a multimap backed by the given Infinispan multimap cache.
 * <p>
 * An {@link EntryListener} is registered on the underlying cache for created, modified and removed
 * entry events; it keeps the local near cache up to date.
 *
 * @param vertx the Vert.x instance; it is cast to {@link VertxInternal}
 * @param multimapCache the Infinispan multimap cache that stores the entries
 */
public InfinispanAsyncMultiMap(Vertx vertx, EmbeddedMultimapCache<Object, Object> multimapCache) {
  this.vertx = (VertxInternal) vertx;
  this.multimapCache = multimapCache;
  this.nearCache = new ConcurrentHashMap<>();

  // Event annotations handed to the cache as the listener's filter annotations
  Set<Class<? extends Annotation>> listenedEvents = Stream.<Class<? extends Annotation>>of(
    CacheEntryCreated.class,
    CacheEntryModified.class,
    CacheEntryRemoved.class
  ).collect(toSet());

  multimapCache.getCache().addFilteredListener(
    new EntryListener(),
    new EventFilter(),
    new EventConverter(),
    listenedEvents
  );

  this.taskQueue = new TaskQueue();
}
/**
 * Blocks until the given {@link CompletableFuture} is done and transfers its outcome to the promise.
 * <p>
 * An {@link ExecutionException} fails the promise with its cause. An {@link InterruptedException}
 * restores the thread's interrupt flag and fails the promise with the exception itself.
 *
 * @param cf the future to wait for
 * @param future the promise to complete or fail
 */
private <T> void cfGet(CompletableFuture<T> cf, Promise<T> future) {
  final T value;
  try {
    value = cf.get();
  } catch (ExecutionException e) {
    // Report the underlying failure rather than the wrapper
    future.fail(e.getCause());
    return;
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    future.fail(e);
    return;
  }
  future.complete(value);
}
/**
 * Adds a value under the given key in the multimap cache.
 * <p>
 * Key and value are converted with {@code DataConverter.toCachedObject} on the calling thread; the
 * cache operation itself is awaited in a blocking task submitted with this map's task queue.
 *
 * @param k the key
 * @param v the value to add
 * @param completionHandler notified with the outcome of the cache operation
 */
@Override
public void add(K k, V v, Handler<AsyncResult<Void>> completionHandler) {
  Object cachedKey = DataConverter.toCachedObject(k);
  Object cachedValue = DataConverter.toCachedObject(v);
  ContextInternal context = vertx.getOrCreateContext();
  context.executeBlocking(fut -> cfGet(multimapCache.put(cachedKey, cachedValue), fut), taskQueue, completionHandler);
}
/**
 * Looks up the values stored under the given key.
 * <p>
 * Pending lookups are kept in a queue stored in the context data, keyed by this map instance. When the
 * near cache holds an initialised entry for the key and that queue is empty, the handler is answered
 * from the near cache (scheduled on the context). Otherwise the request is queued, and if it is the
 * only queued request, processing of the queue is started.
 *
 * @param k the key to look up
 * @param resultHandler notified with the values for the key
 */
@Override
public void get(K k, Handler<AsyncResult<ChoosableIterable<V>>> resultHandler) {
  ContextInternal context = vertx.getOrCreateContext();
  @SuppressWarnings("unchecked")
  Queue<GetRequest<K, V>> pending = (Queue<GetRequest<K, V>>) context.contextData().computeIfAbsent(this, ctx -> new ArrayDeque<>());
  synchronized (pending) {
    ChoosableSet<V> cached = nearCache.get(k);
    boolean answerFromNearCache = cached != null && cached.isInitialised() && pending.isEmpty();
    if (answerFromNearCache) {
      context.runOnContext(v -> resultHandler.handle(Future.succeededFuture(cached)));
      return;
    }
    pending.add(new GetRequest<>(k, resultHandler));
    // Only the request that finds the queue otherwise empty kicks off processing
    if (pending.size() == 1) {
      dequeueGet(context, pending);
    }
  }
}
/**
 * Processes queued get requests in order.
 * <p>
 * Requests at the head of the queue whose key has an initialised near cache entry are answered
 * immediately (the handler is scheduled on the context) and removed. The first request that cannot be
 * answered this way is resolved by a blocking read of the multimap cache; once that read finishes, the
 * request is removed and, if more requests remain, this method is invoked again.
 * <p>
 * Both call sites in this class invoke the method while holding the monitor of {@code getRequests}, and
 * the queue must not be empty on entry.
 *
 * @param context the context on which handlers are invoked and the blocking read is submitted
 * @param getRequests the non-empty queue of pending requests
 */
private void dequeueGet(ContextInternal context, Queue<GetRequest<K, V>> getRequests) {
  GetRequest<K, V> getRequest;
  for (; ; ) {
    getRequest = getRequests.peek();
    ChoosableSet<V> entries = nearCache.get(getRequest.key);
    if (entries != null && entries.isInitialised()) {
      Handler<AsyncResult<ChoosableIterable<V>>> handler = getRequest.handler;
      context.runOnContext(v -> {
        handler.handle(Future.succeededFuture(entries));
      });
      getRequests.remove();
      if (getRequests.isEmpty()) {
        return;
      }
    } else {
      break;
    }
  }
  // getRequest is reassigned in the loop, so copy what the lambdas below need into final locals
  K key = getRequest.key;
  Handler<AsyncResult<ChoosableIterable<V>>> handler = getRequest.handler;
  context.<ChoosableIterable<V>>executeBlocking(fut -> {
    Collection<Object> cachedValues;
    try {
      cachedValues = multimapCache.get(DataConverter.toCachedObject(key)).get();
    } catch (ExecutionException e) {
      fut.fail(e.getCause());
      return;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      fut.fail(e);
      return;
    }
    ChoosableSet<V> sids = new ChoosableSet<>(cachedValues.size());
    for (Object cachedValue : cachedValues) {
      sids.add(DataConverter.<V>fromCachedObject(cachedValue));
    }
    // Empty results are not stored in the near cache
    ChoosableSet<V> prev = sids.isEmpty() ? null : nearCache.putIfAbsent(key, sids);
    if (prev != null) {
      // An entry already exists for the key: fold the values just read into it and answer with it
      prev.merge(sids);
      sids = prev;
    }
    sids.setInitialised();
    fut.complete(sids);
  }, taskQueue, res -> {
    synchronized (getRequests) {
      context.runOnContext(v -> {
        handler.handle(res);
      });
      getRequests.remove();
      if (!getRequests.isEmpty()) {
        dequeueGet(context, getRequests);
      }
    }
  });
}
/**
 * Removes a single value stored under the given key.
 * <p>
 * Key and value are converted with {@code DataConverter.toCachedObject} on the calling thread; the
 * cache operation itself is awaited in a blocking task submitted with this map's task queue.
 *
 * @param k the key
 * @param v the value to remove
 * @param completionHandler notified with the result reported by the multimap cache
 */
@Override
public void remove(K k, V v, Handler<AsyncResult<Boolean>> completionHandler) {
  Object cachedKey = DataConverter.toCachedObject(k);
  Object cachedValue = DataConverter.toCachedObject(v);
  ContextInternal context = vertx.getOrCreateContext();
  context.executeBlocking(fut -> cfGet(multimapCache.remove(cachedKey, cachedValue), fut), taskQueue, completionHandler);
}
/**
 * Removes every occurrence of a value, whatever key it is stored under.
 * <p>
 * Delegates to {@link #removeAllMatching(Predicate, Handler)} with a predicate testing equality to
 * {@code v}.
 *
 * @param v the value to remove
 * @param completionHandler notified when the removal has completed or failed
 */
@Override
public void removeAllForValue(V v, Handler<AsyncResult<Void>> completionHandler) {
  Predicate<V> equalToValue = v::equals;
  removeAllMatching(equalToValue, completionHandler);
}
/**
 * Removes every value accepted by the given predicate.
 * <p>
 * Cached objects are converted back with {@code DataConverter.fromCachedObject} before being tested.
 * The cache operation is awaited in a blocking task submitted with this map's task queue.
 *
 * @param p the predicate selecting the values to remove
 * @param completionHandler notified when the removal has completed or failed
 */
@Override
public void removeAllMatching(Predicate<V> p, Handler<AsyncResult<Void>> completionHandler) {
  ContextInternal context = vertx.getOrCreateContext();
  context.executeBlocking(
    fut -> cfGet(multimapCache.remove(o -> p.test(DataConverter.fromCachedObject(o))), fut),
    taskQueue,
    completionHandler
  );
}
/**
 * Empties the local near cache. The underlying multimap cache is not touched by this method.
 */
public void clearCache() {
nearCache.clear();
}
/**
 * Cache listener that applies created, modified and removed entry events to the near cache.
 * <p>
 * Event values are expected in the shape produced by {@link EventConverter}: a collection of cached
 * values for created events and a {@link ModifiedCollection} for modified events.
 */
@Listener(clustered = true, observation = POST)
private class EntryListener {

  /**
   * Adds the values of a newly created entry to the near cache, creating the near cache entry for the
   * key if there is none yet.
   *
   * @param event the creation event; its value is cast to a collection of cached values
   */
  @CacheEntryCreated
  public void entryCreated(CacheEntryCreatedEvent<Object, Object> event) {
    K k = DataConverter.fromCachedObject(event.getKey());
    Collection<?> values = (Collection<?>) event.getValue();
    ChoosableSet<V> entries = nearCache.computeIfAbsent(k, key -> new ChoosableSet<>(values.size()));
    for (Object value : values) {
      entries.add(DataConverter.fromCachedObject(value));
    }
  }

  /**
   * Applies the additions and then the deletions of a modified entry to the matching near cache entry.
   * Nothing happens when the key has no near cache entry.
   *
   * @param event the modification event; its value is cast to a {@link ModifiedCollection}
   */
  @CacheEntryModified
  public void entryModified(CacheEntryModifiedEvent<Object, Object> event) {
    K k = DataConverter.fromCachedObject(event.getKey());
    ModifiedCollection modifiedCollection = (ModifiedCollection) event.getValue();
    ChoosableSet<V> entries = nearCache.get(k);
    if (entries != null) {
      forEachModified(modifiedCollection.toAdd, entries::add);
      forEachModified(modifiedCollection.toDelete, entries::remove);
    }
  }

  /**
   * Converts each cached object of the collection and passes it to the action.
   *
   * @param collection the cached objects, may be {@code null} in which case nothing is done
   * @param action the operation to run on each converted value
   */
  private void forEachModified(Collection<Object> collection, Consumer<V> action) {
    if (collection == null) {
      return;
    }
    for (Object cachedValue : collection) {
      action.accept(DataConverter.<V>fromCachedObject(cachedValue));
    }
  }

  /**
   * Drops the near cache entry of a removed key.
   *
   * @param event the removal event
   */
  @CacheEntryRemoved
  public void entryRemoved(CacheEntryRemovedEvent<Object, Object> event) {
    K k = DataConverter.fromCachedObject(event.getKey());
    nearCache.remove(k);
  }
}
/**
 * Set of values from which callers can pick one element at a time.
 * <p>
 * The elements live in a {@link ConcurrentHashSet}. The {@code initialised} flag is set explicitly
 * through {@link #setInitialised()}; the enclosing class sets it after reading the values from the
 * multimap cache.
 */
private static class ChoosableSet<T> implements ChoosableIterable<T> {

  private volatile boolean initialised;
  private final Set<T> ids;
  // Iterator kept between calls to choose() so that successive calls walk through the elements
  private volatile Iterator<T> iter;

  /**
   * @param initialSize the initial size passed to the backing set
   */
  public ChoosableSet(int initialSize) {
    this.ids = new ConcurrentHashSet<>(initialSize);
  }

  /**
   * @return {@code true} once {@link #setInitialised()} has been called
   */
  public boolean isInitialised() {
    return this.initialised;
  }

  /** Marks this set as initialised. */
  public void setInitialised() {
    initialised = true;
  }

  /** Adds an element to the set. */
  public void add(T elem) {
    this.ids.add(elem);
  }

  /** Removes an element from the set. */
  public void remove(T elem) {
    this.ids.remove(elem);
  }

  /** Adds all elements of the other set to this one. */
  public void merge(ChoosableSet<T> toMerge) {
    this.ids.addAll(toMerge.ids);
  }

  /**
   * @return {@code true} if the set holds no element
   */
  public boolean isEmpty() {
    return this.ids.isEmpty();
  }

  @Override
  public Iterator<T> iterator() {
    return this.ids.iterator();
  }

  /**
   * Picks the next element, starting over with a fresh iterator when the current one is exhausted.
   *
   * @return the chosen element, or {@code null} if the set is empty or the iterator has no element
   */
  public synchronized T choose() {
    if (ids.isEmpty()) {
      return null;
    }
    if (iter == null || !iter.hasNext()) {
      iter = ids.iterator();
    }
    try {
      return iter.next();
    } catch (NoSuchElementException e) {
      return null;
    }
  }
}
/**
 * A queued lookup: the requested key together with the handler to notify with the result.
 */
private static class GetRequest<K, V> {
// Key whose values were requested
final K key;
// Handler to invoke with the values (or the failure) for the key
final Handler<AsyncResult<ChoosableIterable<V>>> handler;
GetRequest(K key, Handler<AsyncResult<ChoosableIterable<V>>> handler) {
this.key = key;
this.handler = handler;
}
}
/**
 * Cache event filter that accepts every event. It has no state and is serialized with
 * {@link EventFilterExternalizer}.
 */
@SerializeWith(EventFilterExternalizer.class)
private static class EventFilter implements CacheEventFilter<Object, Collection<Object>> {
/**
 * @return always {@code true}, whatever the key, values, metadata or event type
 */
@Override
public boolean accept(Object key, Collection<Object> oldValue, Metadata oldMetadata, Collection<Object> newValue, Metadata newMetadata, EventType eventType) {
return true;
}
}
/**
 * Externalizer for {@link EventFilter}. The filter declares no fields, so nothing is written and
 * reading simply creates a new instance.
 */
public static class EventFilterExternalizer implements Externalizer<EventFilter> {
@Override
public void writeObject(ObjectOutput objectOutput, EventFilter eventFilter) {
// Nothing to write: EventFilter has no state
}
@Override
public EventFilter readObject(ObjectInput objectInput) {
return new EventFilter();
}
}
/**
 * Cache event converter producing the event value consumed by the entry listener.
 * <p>
 * It has no state and is serialized with {@link EventConverterExternalizer}.
 */
@SerializeWith(EventConverterExternalizer.class)
private static class EventConverter implements CacheEventConverter<Object, Collection<Object>, Object> {

  /**
   * Converts the event value.
   * <p>
   * For modification events the result is a {@link ModifiedCollection} holding the values present only
   * in the old collection (to delete) and the values present only in the new collection (to add). The
   * differences are computed on fresh lists, the collections passed in are left untouched. If either
   * collection is {@code null}, both are passed to the {@link ModifiedCollection} as they are.
   * <p>
   * For any other event type the new value is returned unchanged.
   *
   * @return a {@link ModifiedCollection} for modification events, {@code newValue} otherwise
   */
  @Override
  public Object convert(Object key, Collection<Object> oldValue, Metadata oldMetadata, Collection<Object> newValue, Metadata newMetadata, EventType eventType) {
    if (eventType.getType() != Event.Type.CACHE_ENTRY_MODIFIED) {
      return newValue;
    }
    if (oldValue == null || newValue == null) {
      return new ModifiedCollection(oldValue, newValue);
    }
    // Compute both differences from the unmodified inputs: removing in place from one collection
    // before diffing the other would leave the second difference equal to the whole new value.
    Collection<Object> toDelete = oldValue.stream()
      .filter(value -> !newValue.contains(value))
      .collect(toList());
    Collection<Object> toAdd = newValue.stream()
      .filter(value -> !oldValue.contains(value))
      .collect(toList());
    return new ModifiedCollection(toDelete, toAdd);
  }
}
/**
 * Externalizer for {@link EventConverter}. The converter declares no fields, so nothing is written and
 * reading simply creates a new instance.
 */
public static class EventConverterExternalizer implements Externalizer<EventConverter> {
@Override
public void writeObject(ObjectOutput objectOutput, EventConverter eventConverter) {
// Nothing to write: EventConverter has no state
}
@Override
public EventConverter readObject(ObjectInput objectInput) {
return new EventConverter();
}
}
/**
 * Value carried by modification events: the cached objects to remove from and to add to a near cache
 * entry. Serialized with {@link ModifiedCollectionExternalizer}.
 */
@SerializeWith(ModifiedCollectionExternalizer.class)
private static class ModifiedCollection {
// Cached objects the listener removes from the near cache entry; may be null
final Collection<Object> toDelete;
// Cached objects the listener adds to the near cache entry; may be null
final Collection<Object> toAdd;
private ModifiedCollection(Collection<Object> toDelete, Collection<Object> toAdd) {
this.toDelete = toDelete;
this.toAdd = toAdd;
}
}
/**
 * Externalizer for {@link ModifiedCollection}: writes the collection to delete followed by the
 * collection to add, and reads them back in the same order.
 */
public static class ModifiedCollectionExternalizer implements Externalizer<ModifiedCollection> {

  /**
   * Writes {@code toDelete} first, then {@code toAdd}.
   */
  @Override
  public void writeObject(ObjectOutput objectOutput, ModifiedCollection modifiedCollection) throws IOException {
    Collection<Object> toDelete = modifiedCollection.toDelete;
    objectOutput.writeObject(toDelete);
    Collection<Object> toAdd = modifiedCollection.toAdd;
    objectOutput.writeObject(toAdd);
  }

  /**
   * Reads the two collections in the order they were written.
   */
  @Override
  @SuppressWarnings("unchecked")
  public ModifiedCollection readObject(ObjectInput objectInput) throws IOException, ClassNotFoundException {
    Collection<Object> toDelete = (Collection<Object>) objectInput.readObject();
    Collection<Object> toAdd = (Collection<Object>) objectInput.readObject();
    return new ModifiedCollection(toDelete, toAdd);
  }
}
}