package io.vertx.redis.client.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.SocketAddress;
import io.vertx.redis.client.*;
import io.vertx.redis.client.impl.types.ErrorType;
import java.util.List;
import java.util.Random;
import static io.vertx.redis.client.Command.*;
import static io.vertx.redis.client.Request.cmd;
public class RedisSentinelClient implements Redis {
private static final Random RANDOM = new Random();
private static class Pair<L, R> {
final L left;
final R right;
Pair(L left, R right) {
this.left = left;
this.right = right;
}
}
private static final Logger LOG = LoggerFactory.getLogger(RedisSentinelClient.class);
private final Vertx vertx;
private final RedisOptions options;
private Redis sentinel;
private RedisClient redis;
private RedisSentinelClient(Vertx vertx, RedisOptions options) {
this.vertx = vertx;
this.options = options;
}
@Override
public Redis connect(Handler<AsyncResult<Redis>> onCreate) {
createClientInternal(vertx, options, RedisRole.SENTINEL, create -> {
if (create.failed()) {
LOG.error("Redis PUB/SUB wrap failed.", create.cause());
return;
}
sentinel = create.result();
sentinel
.handler(msg -> {
if (msg.type() == ResponseType.MULTI) {
if ("MESSAGE".equalsIgnoreCase(msg.get(0).toString())) {
if (redis != null) {
redis.fail(ErrorType.create("SWITCH-MASTER Received +switch-master message from Redis Sentinel."));
} else {
LOG.warn("Received +switch-master message from Redis Sentinel.");
}
}
}
});
sentinel.send(cmd(SUBSCRIBE).arg("+switch-master"), send -> {
if (send.failed()) {
LOG.error("Unable to subscribe to Sentinel PUBSUB", send.cause());
sentinel.close();
}
});
sentinel.exceptionHandler(t -> {
LOG.error("Unhandled exception in Sentinel PUBSUB", t);
sentinel.close();
});
});
createClientInternal(vertx, options, options.getRole(), create -> {
if (create.failed()) {
onCreate.handle(create);
return;
}
redis = (RedisClient) create.result();
onCreate.handle(Future.succeededFuture(this));
});
return this;
}
@Override
public void close() {
sentinel.close();
redis.close();
}
@Override
public Redis exceptionHandler(Handler<Throwable> handler) {
redis.exceptionHandler(handler);
return this;
}
@Override
public Redis endHandler(Handler<Void> handler) {
redis.endHandler(handler);
return this;
}
@Override
public Redis handler(Handler<Response> handler) {
redis.handler(handler);
return this;
}
@Override
public Redis pause() {
redis.pause();
return this;
}
@Override
public Redis resume() {
redis.resume();
return null;
}
@Override
public Redis send(Request command, Handler<AsyncResult<Response>> handler) {
redis.send(command, handler);
return this;
}
@Override
public Redis batch(List<Request> commands, Handler<AsyncResult<List<Response>>> handler) {
redis.batch(commands, handler);
return this;
}
@Override
public SocketAddress socketAddress() {
return redis.socketAddress();
}
@Override
public Redis fetch(long amount) {
redis.fetch(amount);
return this;
}
public static Redis create(Vertx vertx, RedisOptions options) {
return new RedisSentinelClient(vertx, options);
}
private static void createClientInternal(Vertx vertx, RedisOptions options, RedisRole role, Handler<AsyncResult<Redis>> onCreate) {
final Handler<AsyncResult<SocketAddress>> createAndConnect = resolve -> {
if (resolve.failed()) {
onCreate.handle(Future.failedFuture(resolve.cause()));
return;
}
RedisClient.create(vertx, options, resolve.result()).connect(onCreate);
};
switch (role) {
case SENTINEL:
resolveClient(vertx, RedisSentinelClient::isSentinelOk, options, createAndConnect);
break;
case MASTER:
resolveClient(vertx, RedisSentinelClient::getMasterFromEndpoint, options, createAndConnect);
break;
case SLAVE:
resolveClient(vertx, RedisSentinelClient::getSlaveFromEndpoint, options, createAndConnect);
}
}
private static void resolveClient(final Vertx vertx, final Resolver checkEndpointFn, final RedisOptions options, final Handler<AsyncResult<SocketAddress>> callback) {
iterate(0, vertx, checkEndpointFn, options, iterate -> {
if (iterate.failed()) {
callback.handle(Future.failedFuture(iterate.cause()));
} else {
final Pair<Integer, SocketAddress> found = iterate.result();
final List<SocketAddress> endpoints = options.getEndpoints();
SocketAddress endpoint = endpoints.get(found.left);
endpoints.set(found.left, endpoints.get(0));
endpoints.set(0, endpoint);
callback.handle(Future.succeededFuture(found.right));
}
});
}
private static void iterate(final int idx, final Vertx vertx, final Resolver checkEndpointFn, final RedisOptions argument, final Handler<AsyncResult<Pair<Integer, SocketAddress>>> resultHandler) {
final List<SocketAddress> endpoints = argument.getEndpoints();
if (idx >= endpoints.size()) {
resultHandler.handle(Future.failedFuture("No more endpoints in chain."));
return;
}
checkEndpointFn.resolve(vertx, endpoints.get(idx), argument, res -> {
if (res.succeeded()) {
resultHandler.handle(Future.succeededFuture(new Pair<>(idx, res.result())));
} else {
iterate(idx + 1, vertx, checkEndpointFn, argument, resultHandler);
}
});
}
private static void isSentinelOk(Vertx vertx, SocketAddress endpoint, RedisOptions argument, Handler<AsyncResult<SocketAddress>> handler) {
RedisClient.create(vertx, argument, endpoint).connect(onCreate -> {
if (onCreate.failed()) {
handler.handle(Future.failedFuture(onCreate.cause()));
return;
}
final Redis conn = onCreate.result();
conn.send(cmd(PING), info -> {
if (info.failed()) {
handler.handle(Future.failedFuture(info.cause()));
return;
}
handler.handle(Future.succeededFuture(endpoint));
conn.close();
});
});
}
private static void getMasterFromEndpoint(Vertx vertx, SocketAddress endpoint, RedisOptions options, Handler<AsyncResult<SocketAddress>> handler) {
RedisClient.create(vertx, options, endpoint).connect(onCreate -> {
if (onCreate.failed()) {
handler.handle(Future.failedFuture(onCreate.cause()));
return;
}
final Redis conn = onCreate.result();
final String masterName = options.getMasterName();
conn.send(cmd(SENTINEL).arg("GET-MASTER-ADDR-BY-NAME").arg(masterName), getMasterAddrByName -> {
if (getMasterAddrByName.failed()) {
handler.handle(Future.failedFuture(getMasterAddrByName.cause()));
return;
}
final Response response = getMasterAddrByName.result();
handler.handle(
Future.succeededFuture(SocketAddress.inetSocketAddress(response.get(1).toInteger(), response.get(0).toString())));
conn.close();
});
});
}
private static void getSlaveFromEndpoint(Vertx vertx, SocketAddress endpoint, RedisOptions options, Handler<AsyncResult<SocketAddress>> handler) {
RedisClient.create(vertx, options, endpoint).connect(onCreate -> {
if (onCreate.failed()) {
handler.handle(Future.failedFuture(onCreate.cause()));
return;
}
final Redis conn = onCreate.result();
final String masterName = options.getMasterName();
conn.send(cmd(SENTINEL).arg("SLAVES").arg(masterName), sentinelSlaves -> {
if (sentinelSlaves.failed()) {
handler.handle(Future.failedFuture(sentinelSlaves.cause()));
return;
}
final Response response = sentinelSlaves.result();
if (response.size() == 0) {
handler.handle(Future.failedFuture("No slaves linked to the master: " + masterName));
} else {
Response slaveInfoArr = response.get(RANDOM.nextInt(response.size()));
if ((slaveInfoArr.size() % 2) > 0) {
handler.handle(Future.failedFuture("Corrupted response from the sentinel"));
} else {
int port = 6379;
String ip = null;
for (int i = 0; i < slaveInfoArr.size(); i += 2) {
if ("port".equals(slaveInfoArr.get(i).toString())) {
port = slaveInfoArr.get(i + 1).toInteger();
}
if ("ip".equals(slaveInfoArr.get(i).toString())) {
ip = slaveInfoArr.get(i + 1).toString();
}
}
if (ip == null) {
handler.handle(Future.failedFuture("No IP found for a SLAVE node!"));
} else {
handler.handle(Future.succeededFuture(SocketAddress.inetSocketAddress(port, ip)));
}
}
}
conn.close();
});
});
}
}