package io.vertx.redis.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SocketAddress;
import io.vertx.redis.RedisOptions;
import io.vertx.redis.client.*;
import io.vertx.redis.sentinel.RedisSentinel;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static io.vertx.redis.client.Command.*;
public class RedisSentinelClientImpl implements RedisSentinel {
private final Vertx vertx;
private final io.vertx.redis.RedisOptions options;
private final AtomicReference<CompletableFuture<Redis>> redis = new AtomicReference<>();
public RedisSentinelClientImpl(Vertx vertx, RedisOptions options) {
this.vertx = vertx;
this.options = options;
}
@SuppressWarnings("unchecked")
private static List<?> toPayload(Object... parameters) {
List<Object> result = new ArrayList<>(parameters.length);
for (Object param : parameters) {
if (param instanceof JsonArray) {
param = ((JsonArray) param).getList();
}
if (param instanceof JsonObject) {
param = ((JsonObject) param).getMap();
}
if (param instanceof Collection) {
((Collection) param).stream().filter(Objects::nonNull).forEach(result::add);
} else if (param instanceof Map) {
for (Map.Entry<?, ?> pair : ((Map<?, ?>) param).entrySet()) {
result.add(pair.getKey());
result.add(pair.getValue());
}
} else if (param instanceof Stream) {
((Stream) param).forEach(e -> {
if (e instanceof Object[]) {
Collections.addAll(result, (Object[]) e);
} else {
result.add(e);
}
});
} else if (param instanceof Buffer) {
result.add(((Buffer) param).getBytes());
} else if (param != null) {
result.add(param);
}
}
return result;
}
private void send(io.vertx.redis.client.Command command, List arguments, Handler<AsyncResult<Response>> handler) {
final Request req = Request.cmd(command);
if (arguments != null) {
for (Object o : arguments) {
if (o == null) {
req.nullArg();
} else {
req.arg(o.toString());
}
}
}
CompletableFuture<Redis> fut = redis.get();
if (fut == null) {
CompletableFuture f = new CompletableFuture<>();
if (redis.compareAndSet(null, f)) {
fut = f;
Redis.createClient(vertx, new io.vertx.redis.client.RedisOptions()
.setNetClientOptions(options)
.setEndpoint(options.isDomainSocket() ? SocketAddress.domainSocketAddress(options.getDomainSocketAddress()) : SocketAddress.inetSocketAddress(options.getPort(), options.getHost()))
.setPassword(options.getAuth())
.setSelect(options.getSelect())).connect(onReady -> {
if (onReady.succeeded()) {
f.complete(onReady.result());
} else {
f.completeExceptionally(onReady.cause());
}
});
} else {
fut = redis.get();
}
}
fut.whenComplete((client, err) -> {
if (err == null) {
client.send(req, handler);
} else {
handler.handle(Future.failedFuture(err));
}
});
}
private void sendString(io.vertx.redis.client.Command command, List arguments, Handler<AsyncResult<String>> handler) {
send(command, arguments, ar -> {
if (ar.failed()) {
handler.handle(Future.failedFuture(ar.cause()));
} else {
handler.handle(Future.succeededFuture(ar.result().toString()));
}
});
}
private static JsonArray toJsonArray(Response response) {
final JsonArray json = new JsonArray();
for (Response r : response) {
switch (r.type()) {
case INTEGER:
json.add(r.toLong());
break;
case SIMPLE:
case BULK:
json.add(r.toString());
break;
case MULTI:
json.add(toJsonArray(r));
break;
}
}
return json;
}
private void sendJsonArray(io.vertx.redis.client.Command command, List arguments, Handler<AsyncResult<JsonArray>> handler) {
send(command, arguments, ar -> {
if (ar.failed()) {
handler.handle(Future.failedFuture(ar.cause()));
} else {
handler.handle(Future.succeededFuture(toJsonArray(ar.result())));
}
});
}
private void sendVoid(io.vertx.redis.client.Command command, List arguments, Handler<AsyncResult<Void>> handler) {
send(command, arguments, ar -> {
if (ar.failed()) {
handler.handle(Future.failedFuture(ar.cause()));
} else {
handler.handle(Future.succeededFuture());
}
});
}
@Override
public void close(Handler<AsyncResult<Void>> handler) {
CompletableFuture<Redis> prev = redis.getAndSet(null);
if (prev != null) {
prev.whenComplete((client, err) -> {
if (err == null) {
client.close();
if (handler != null) {
handler.handle(Future.succeededFuture());
}
} else {
if (handler != null) {
handler.handle(Future.failedFuture(err.getCause()));
}
}
});
}
}
@Override
public RedisSentinel masters(Handler<AsyncResult<JsonArray>> handler) {
sendJsonArray(SENTINEL, toPayload("MASTERS"), handler);
return this;
}
@Override
public RedisSentinel master(String name, Handler<AsyncResult<JsonArray>> handler) {
sendJsonArray(SENTINEL, toPayload("MASTER", name), handler);
return this;
}
@Override
public RedisSentinel slaves(String name, Handler<AsyncResult<JsonArray>> handler) {
sendJsonArray(SENTINEL, toPayload("SLAVES", name), handler);
return this;
}
@Override
public RedisSentinel sentinels(String name, Handler<AsyncResult<JsonArray>> handler) {
sendJsonArray(SENTINEL, toPayload("SENTINELS", name), handler);
return this;
}
@Override
public RedisSentinel getMasterAddrByName(String name, Handler<AsyncResult<JsonArray>> handler) {
sendJsonArray(SENTINEL, toPayload("GET-MASTER-ADDR-BY-NAME", name), handler);
return this;
}
@Override
public RedisSentinel reset(String pattern, Handler<AsyncResult<Void>> handler) {
sendVoid(SENTINEL, toPayload("RESET", pattern), handler);
return this;
}
@Override
public RedisSentinel failover(String name, Handler<AsyncResult<String>> handler) {
sendString(SENTINEL, toPayload("FAILOVER", name), handler);
return this;
}
@Override
public RedisSentinel ckquorum(String name, Handler<AsyncResult<String>> handler) {
sendString(SENTINEL, toPayload("CKQUORUM", name), handler);
return this;
}
@Override
public RedisSentinel flushConfig(Handler<AsyncResult<Void>> handler) {
sendVoid(SENTINEL, toPayload("FLUSHCONFIG"), handler);
return this;
}
}