package io.vertx.ext.consul.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.ext.consul.*;
public abstract class WatchImpl<T> implements Watch<T> {
/** Wait value handed to {@code BlockingQueryOptions.setWait} by every nested watch when it builds a query. */
private static final String BLOCKING_WAIT = "10m";
/** Upper bound, in seconds, on the delay before a failed query is retried. */
private static final int DELAY_LIMIT_SECONDS = 180;
/**
 * Watch over a single key. Every query fetches the {@link KeyValue} for the key and
 * reports it together with its modify index.
 */
public static class Key extends WatchImpl<KeyValue> {

  private final String key;

  /**
   * Creates the watch along with a {@link ConsulClient} built from the given options.
   *
   * @param key     key to query
   * @param vertx   Vert.x instance passed to the base watch and to the client factory
   * @param options options used to create the client
   */
  public Key(String key, Vertx vertx, ConsulClientOptions options) {
    super(vertx, ConsulClient.create(vertx, options));
    this.key = key;
  }

  /** Pairs a fetched value with its modify index. */
  private static State<KeyValue> toState(KeyValue fetched) {
    return new State<>(fetched, fetched.getModifyIndex());
  }

  /**
   * Queries the key with blocking options carrying {@code index} and the shared wait value,
   * then forwards the mapped result (or the failure) to {@code handler}.
   */
  @Override
  void wait(long index, Handler<AsyncResult<State<KeyValue>>> handler) {
    BlockingQueryOptions query = new BlockingQueryOptions()
      .setWait(BLOCKING_WAIT)
      .setIndex(index);
    consulClient.getValueWithOptions(key, query, reply -> handler.handle(reply.map(Key::toState)));
  }
}
/**
 * Watch over all keys sharing a prefix. Every query fetches a {@link KeyValueList} and
 * reports it together with the index carried by that list.
 */
public static class KeyPrefix extends WatchImpl<KeyValueList> {

  private final String keyPrefix;

  /**
   * Creates the watch along with a {@link ConsulClient} built from the given options.
   *
   * @param keyPrefix prefix to query
   * @param vertx     Vert.x instance passed to the base watch and to the client factory
   * @param options   options used to create the client
   */
  public KeyPrefix(String keyPrefix, Vertx vertx, ConsulClientOptions options) {
    super(vertx, ConsulClient.create(vertx, options));
    this.keyPrefix = keyPrefix;
  }

  /** Pairs a fetched list with the index it reports. */
  private static State<KeyValueList> toState(KeyValueList fetched) {
    return new State<>(fetched, fetched.getIndex());
  }

  /**
   * Queries the prefix with blocking options carrying {@code index} and the shared wait value,
   * then forwards the mapped result (or the failure) to {@code handler}.
   */
  @Override
  void wait(long index, Handler<AsyncResult<State<KeyValueList>>> handler) {
    BlockingQueryOptions query = new BlockingQueryOptions()
      .setWait(BLOCKING_WAIT)
      .setIndex(index);
    consulClient.getValuesWithOptions(keyPrefix, query, reply -> handler.handle(reply.map(KeyPrefix::toState)));
  }
}
/**
 * Watch over the catalog's service list. Every query fetches a {@link ServiceList} and
 * reports it together with the index carried by that list.
 */
public static class Services extends WatchImpl<ServiceList> {

  /**
   * Creates the watch along with a {@link ConsulClient} built from the given options.
   *
   * @param vertx   Vert.x instance passed to the base watch and to the client factory
   * @param options options used to create the client
   */
  public Services(Vertx vertx, ConsulClientOptions options) {
    super(vertx, ConsulClient.create(vertx, options));
  }

  /** Pairs a fetched list with the index it reports. */
  private static State<ServiceList> toState(ServiceList fetched) {
    return new State<>(fetched, fetched.getIndex());
  }

  /**
   * Queries the catalog services with blocking options carrying {@code index} and the shared
   * wait value, then forwards the mapped result (or the failure) to {@code handler}.
   */
  @Override
  void wait(long index, Handler<AsyncResult<State<ServiceList>>> handler) {
    BlockingQueryOptions query = new BlockingQueryOptions()
      .setWait(BLOCKING_WAIT)
      .setIndex(index);
    consulClient.catalogServicesWithOptions(query, reply -> handler.handle(reply.map(Services::toState)));
  }
}
/**
 * Watch over the health entries of one named service. Every query fetches a
 * {@link ServiceEntryList} and reports it together with the index carried by that list.
 */
public static class Service extends WatchImpl<ServiceEntryList> {

  private final String service;

  /**
   * Creates the watch along with a {@link ConsulClient} built from the given options.
   *
   * @param service name of the service to query
   * @param vertx   Vert.x instance passed to the base watch and to the client factory
   * @param options options used to create the client
   */
  public Service(String service, Vertx vertx, ConsulClientOptions options) {
    super(vertx, ConsulClient.create(vertx, options));
    this.service = service;
  }

  /** Pairs a fetched list with the index it reports. */
  private static State<ServiceEntryList> toState(ServiceEntryList fetched) {
    return new State<>(fetched, fetched.getIndex());
  }

  /**
   * Queries the service's health nodes with blocking options carrying {@code index} and the
   * shared wait value, then forwards the mapped result (or the failure) to {@code handler}.
   */
  @Override
  void wait(long index, Handler<AsyncResult<State<ServiceEntryList>>> handler) {
    BlockingQueryOptions blocking = new BlockingQueryOptions()
      .setWait(BLOCKING_WAIT)
      .setIndex(index);
    ServiceQueryOptions query = new ServiceQueryOptions().setBlockingOptions(blocking);
    // NOTE(review): the literal false is presumably the "passing only" flag - confirm against ConsulClient.
    consulClient.healthServiceNodesWithOptions(service, false, query,
      reply -> handler.handle(reply.map(Service::toState)));
  }
}
/**
 * Watch over events with a given name. Every query fetches an {@link EventList} and
 * reports it together with the index carried by that list.
 */
public static class Events extends WatchImpl<EventList> {

  private final String event;

  /**
   * Creates the watch along with a {@link ConsulClient} built from the given options.
   *
   * @param event   event name set on the list options of every query
   * @param vertx   Vert.x instance passed to the base watch and to the client factory
   * @param options options used to create the client
   */
  public Events(String event, Vertx vertx, ConsulClientOptions options) {
    super(vertx, ConsulClient.create(vertx, options));
    this.event = event;
  }

  /** Pairs a fetched list with the index it reports. */
  private static State<EventList> toState(EventList fetched) {
    return new State<>(fetched, fetched.getIndex());
  }

  /**
   * Lists events with blocking options carrying {@code index} and the shared wait value,
   * then forwards the mapped result (or the failure) to {@code handler}.
   */
  @Override
  void wait(long index, Handler<AsyncResult<State<EventList>>> handler) {
    BlockingQueryOptions blocking = new BlockingQueryOptions()
      .setWait(BLOCKING_WAIT)
      .setIndex(index);
    EventListOptions query = new EventListOptions()
      .setBlockingOptions(blocking)
      .setName(event);
    consulClient.listEventsWithOptions(query, reply -> handler.handle(reply.map(Events::toState)));
  }
}
/**
 * Watch over the catalog's node list. Every query fetches a {@link NodeList} and
 * reports it together with the index carried by that list.
 */
public static class Nodes extends WatchImpl<NodeList> {

  /**
   * Creates the watch along with a {@link ConsulClient} built from the given options.
   *
   * @param vertx   Vert.x instance passed to the base watch and to the client factory
   * @param options options used to create the client
   */
  public Nodes(Vertx vertx, ConsulClientOptions options) {
    super(vertx, ConsulClient.create(vertx, options));
  }

  /** Pairs a fetched list with the index it reports. */
  private static State<NodeList> toState(NodeList fetched) {
    return new State<>(fetched, fetched.getIndex());
  }

  /**
   * Queries the catalog nodes with blocking options carrying {@code index} and the shared
   * wait value, then forwards the mapped result (or the failure) to {@code handler}.
   */
  @Override
  void wait(long index, Handler<AsyncResult<State<NodeList>>> handler) {
    BlockingQueryOptions blocking = new BlockingQueryOptions()
      .setWait(BLOCKING_WAIT)
      .setIndex(index);
    NodeQueryOptions query = new NodeQueryOptions().setBlockingOptions(blocking);
    consulClient.catalogNodesWithOptions(query, reply -> handler.handle(reply.map(Nodes::toState)));
  }
}
/** Set to true by {@link #start()}; nothing in this class resets it. */
private volatile boolean started = false;
/** Set to true by {@link #stop()}; checked before each query, in each query callback and before each handler notification. */
private volatile boolean stopped = false;
/** Listener installed through {@link #setHandler}; while it is null, results are not delivered. */
private Handler<WatchResult<T>> handler;
/** Most recently accepted state; starts as a null value at index 0 and is replaced whenever a query yields a different state. */
private State<T> current = new State<>(null, 0);
/** Vert.x instance used to schedule the watch loop and its timers. */
protected final Vertx vertx;
/** Client used by subclasses to issue queries; closed by {@link #stop()}. */
protected final ConsulClient consulClient;
/**
 * Stores the collaborators shared by every watch. Private, so only the nested
 * watch classes in this file can extend this class.
 *
 * @param vertx        Vert.x instance used for scheduling
 * @param consulClient client used for queries and closed on stop
 */
private WatchImpl(Vertx vertx, ConsulClient consulClient) {
  this.consulClient = consulClient;
  this.vertx = vertx;
}
/**
 * Issues one query for the watched resource and reports the outcome.
 *
 * @param index   index to send with the query; this class always passes the index of its current state
 * @param handler receives the fetched value paired with its index on success, or the failure
 */
abstract void wait(long index, Handler<AsyncResult<State<T>>> handler);
/**
 * Installs the listener that receives watch results, replacing any previous one.
 *
 * @param handler listener to notify; while it is null, results are not delivered
 * @return this watch, for chaining
 */
@Override
public Watch<T> setHandler(Handler<WatchResult<T>> handler) {
  this.handler = handler;
  return this;
}
/**
 * Marks the watch as started and schedules the first watch cycle on a Vert.x context.
 *
 * @return this watch, for chaining
 * @throws IllegalStateException if the watch has been started before
 */
@Override
public synchronized Watch<T> start() {
  if (started) {
    throw new IllegalStateException("Watch already started");
  }
  started = true;
  vertx.runOnContext(ignored -> go());
  return this;
}
/**
 * Marks the watch as stopped and closes its client.
 *
 * @throws IllegalStateException if the watch was never started or has already been stopped
 */
@Override
public synchronized void stop() {
  if (!started) {
    throw new IllegalStateException("An unstarted watch");
  } else if (stopped) {
    throw new IllegalStateException("Watch already stopped");
  }
  // Raise the flag before closing so callbacks triggered by the close see it.
  stopped = true;
  consulClient.close();
}
/**
 * Runs one watch cycle: fetches a state, and either notifies the handler and loops
 * immediately (state changed) or waits one second before looping (state unchanged).
 */
private void go() {
  fetch(0, fetched -> {
    if (!fetched.equals(current)) {
      T previousValue = current.value;
      current = fetched;
      sendSuccess(previousValue, fetched.value);
      vertx.runOnContext(ignored -> go());
      return;
    }
    // Nothing changed: pause before asking again.
    vertx.setTimer(1000, timerId -> go());
  });
}
/**
 * Queries for a state at the current index, retrying on failure until a query succeeds
 * or the watch is stopped.
 *
 * <p>Each failure is reported through {@code sendFail}. The retry delay in seconds is the
 * square of the number of failures so far, capped at {@code DELAY_LIMIT_SECONDS}.
 *
 * @param cnt    number of failed attempts that preceded this call
 * @param result receives the state of the first successful query
 */
private void fetch(long cnt, Handler<State<T>> result) {
  if (stopped) {
    return;
  }
  wait(current.index, reply -> {
    if (stopped) {
      return;
    }
    if (!reply.succeeded()) {
      sendFail(current.value, reply.cause());
      long attempts = cnt + 1;
      long delaySeconds = Math.min(attempts * attempts, DELAY_LIMIT_SECONDS);
      vertx.setTimer(delaySeconds * 1000, timerId -> fetch(attempts, result));
      return;
    }
    result.handle(reply.result());
  });
}
/**
 * Delivers a successful result to the handler, unless the watch is stopped or no handler is set.
 *
 * @param prevValue value exposed as the previous result
 * @param nextValue value exposed as the next result
 */
private void sendSuccess(T prevValue, T nextValue) {
  if (stopped || handler == null) {
    return;
  }
  WatchResult<T> outcome = new WatchResult<T>() {
    @Override
    public boolean succeeded() {
      return true;
    }

    @Override
    public boolean failed() {
      return false;
    }

    @Override
    public T prevResult() {
      return prevValue;
    }

    @Override
    public T nextResult() {
      return nextValue;
    }

    @Override
    public Throwable cause() {
      return null;
    }
  };
  handler.handle(outcome);
}
/**
 * Delivers a failed result to the handler, unless the watch is stopped or no handler is set.
 *
 * @param prevValue value exposed as the previous result; the next result is always null
 * @param cause     failure exposed through {@code cause()}
 */
private void sendFail(T prevValue, Throwable cause) {
  if (stopped || handler == null) {
    return;
  }
  WatchResult<T> outcome = new WatchResult<T>() {
    @Override
    public boolean succeeded() {
      return false;
    }

    @Override
    public boolean failed() {
      return true;
    }

    @Override
    public T prevResult() {
      return prevValue;
    }

    @Override
    public T nextResult() {
      return null;
    }

    @Override
    public Throwable cause() {
      return cause;
    }
  };
  handler.handle(outcome);
}
/**
 * Immutable pair of a watched value and the index it was observed at.
 * Two states are equal when both the index and the value (null-safe) are equal.
 *
 * @param <T> type of the watched value
 */
static class State<T> {

  final T value;
  final long index;

  /**
   * @param value observed value, may be null
   * @param index index associated with the value
   */
  State(T value, long index) {
    this.value = value;
    this.index = index;
  }

  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    if (o == null || o.getClass() != getClass()) {
      return false;
    }
    State<?> other = (State<?>) o;
    // Compare the cheap primitive first; values are only compared when the indexes match.
    if (index != other.index) {
      return false;
    }
    return value == null ? other.value == null : value.equals(other.value);
  }

  @Override
  public int hashCode() {
    int valueHash = value == null ? 0 : value.hashCode();
    return 31 * valueHash + Long.hashCode(index);
  }
}
}