package io.vertx.spi.cluster.zookeeper.impl;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.LocalMap;
import io.vertx.core.spi.cluster.ClusterManager;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

As Zookeeper do not support set TTL value to zkNode, we have to handle it by application self. 1. Publish a specific message to all consumers that listen ttl action. 2. All Consumer could receive this message in same time, so we have to make distribution lock as delete action can only be execute one time. 3. Check key is exist, and delete if exist, note: we just make a timer to delete if timeout, and we also save timer if we want to cancel this action in later. 4. Release lock.

Still if a put without ttl happens after the put with ttl but before the timeout expires, the most recent update will be removed. For this case, we should add a field to indicate that if we should stop timer in the cluster.

Created by stream.

/** * As Zookeeper do not support set TTL value to zkNode, we have to handle it by application self. * 1. Publish a specific message to all consumers that listen ttl action. * 2. All Consumer could receive this message in same time, so we have to make distribution lock as delete action can only * be execute one time. * 3. Check key is exist, and delete if exist, note: we just make a timer to delete if timeout, and we also save timer if we want to cancel * this action in later. * 4. Release lock. * <p> * Still if a put without ttl happens after the put with ttl but before the timeout expires, the most recent update will be removed. * For this case, we should add a field to indicate that if we should stop timer in the cluster. * <p> * Created by stream. */
public class AsyncMapTTLMonitor<K, V> { private final Vertx vertx; private final ClusterManager clusterManager; private final Map<String, ZKAsyncMap<K, V>> keyPathAndAsyncMap = new ConcurrentHashMap<>(); static final String TTL_KEY_HANDLER_ADDRESS = "__VERTX_ZK_TTL_HANDLER_ADDRESS"; static final String TTL_KEY_BODY_KEY_PATH = "keyPath"; static final String TTL_KEY_BODY_TIMEOUT = "timeout"; static final String TTL_KEY_IS_CANCEL = "isCancel"; private static final String TTL_KEY_LOCK = "__VERTX_ZK_TTL_LOCK"; private static final long TTL_KEY_GET_LOCK_TIMEOUT = 1500; private final LocalMap<String, Long> ttlTimer; private MessageConsumer<JsonObject> consumer; private volatile static AsyncMapTTLMonitor instance; private static final Logger logger = LoggerFactory.getLogger(AsyncMapTTLMonitor.class); @SuppressWarnings("unchecked") public static <K, V> AsyncMapTTLMonitor<K, V> getInstance(Vertx vertx, ClusterManager clusterManager) { if (instance == null) { synchronized (AsyncMapTTLMonitor.class) { if (instance == null) { instance = new AsyncMapTTLMonitor<K, V>(vertx, clusterManager); } } } return instance; } private AsyncMapTTLMonitor(Vertx vertx, ClusterManager clusterManager) { this.ttlTimer = vertx.sharedData().getLocalMap("__VERTX_ZK_TTL_TIMER"); this.vertx = vertx; this.clusterManager = clusterManager; initConsumer(); } private void initConsumer() { this.consumer = vertx.eventBus().consumer(TTL_KEY_HANDLER_ADDRESS, event -> { JsonObject body = event.body(); String keyPath = body.getString(TTL_KEY_BODY_KEY_PATH); if (keyPathAndAsyncMap.get(keyPath) == null) return; if (body.getBoolean(TTL_KEY_IS_CANCEL, false)) { long timerID = ttlTimer.remove(body.getString(keyPath)); if (timerID > 0) vertx.cancelTimer(timerID); } else { long timerID = vertx.setTimer(body.getLong(TTL_KEY_BODY_TIMEOUT), aLong -> { clusterManager.getLockWithTimeout(TTL_KEY_LOCK, TTL_KEY_GET_LOCK_TIMEOUT, lockAsyncResult -> { ZKAsyncMap<K, V> zkAsyncMap = keyPathAndAsyncMap.get(keyPath); if (lockAsyncResult.succeeded()) { zkAsyncMap.checkExists(keyPath) .compose(checkResult -> checkResult ? zkAsyncMap.delete(keyPath, null) : Future.succeededFuture()) .setHandler(deleteResult -> { if (deleteResult.succeeded()) { lockAsyncResult.result().release(); logger.debug(String.format("The key %s have arrived time, and have been deleted.", keyPath)); } else { logger.error(String.format("Delete expire key %s failed.", keyPath), deleteResult.cause()); } }); } else { logger.error("get TTL lock failed.", lockAsyncResult.cause()); } }); } ); ttlTimer.put(keyPath, timerID); } } ); } void addAsyncMapWithPath(String keyPath, ZKAsyncMap<K, V> asyncMap) { keyPathAndAsyncMap.putIfAbsent(keyPath, asyncMap); } public void stop() { consumer.unregister(); keyPathAndAsyncMap.clear(); instance = null; } }