package io.vertx.redis.client.impl;
import io.vertx.core.*;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.redis.client.*;
import io.vertx.redis.client.impl.types.ErrorType;
import java.util.List;
import java.util.SplittableRandom;
import static io.vertx.redis.client.Command.*;
import static io.vertx.redis.client.Request.cmd;
public class RedisSentinelClient extends BaseRedisClient implements Redis {
private static final SplittableRandom RANDOM = new SplittableRandom();
private static class Pair<L, R> {
final L left;
final R right;
Pair(L left, R right) {
this.left = left;
this.right = right;
}
}
private static final Logger LOG = LoggerFactory.getLogger(RedisSentinelClient.class);
private final RedisOptions options;
private RedisConnection sentinel;
public RedisSentinelClient(Vertx vertx, RedisOptions options) {
super(vertx, options);
this.options = options;
if (options.getMaxPoolSize() < 2) {
throw new IllegalStateException("Invalid options: maxPoolSize must be at least 2");
}
if (options.getMaxPoolWaiting() < options.getMaxPoolSize()) {
throw new IllegalStateException("Invalid options: maxPoolWaiting < maxPoolSize");
}
}
@Override
public Future<RedisConnection> connect() {
final Promise<RedisConnection> promise = vertx.promise();
createConnectionInternal(options, options.getRole(), createConnection -> {
if (createConnection.failed()) {
promise.fail(createConnection.cause());
return;
}
final RedisConnection conn = createConnection.result();
createConnectionInternal(options, RedisRole.SENTINEL, create -> {
if (create.failed()) {
LOG.error("Redis PUB/SUB wrap failed.", create.cause());
return;
}
sentinel = create.result();
sentinel
.handler(msg -> {
if (msg.type() == ResponseType.MULTI) {
if ("MESSAGE".equalsIgnoreCase(msg.get(0).toString())) {
if (conn != null) {
((RedisStandaloneConnection) conn).fail(ErrorType.create("SWITCH-MASTER Received +switch-master message from Redis Sentinel."));
} else {
LOG.warn("Received +switch-master message from Redis Sentinel.");
}
}
}
});
sentinel.send(cmd(SUBSCRIBE).arg("+switch-master"), send -> {
if (send.failed()) {
promise.fail(send.cause());
} else {
promise.complete(new RedisSentinelConnection(conn, sentinel));
}
});
sentinel.exceptionHandler(t -> {
if (conn != null) {
((RedisStandaloneConnection) conn).fail(t);
} else {
LOG.error("Unhandled exception in Sentinel PUBSUB", t);
}
});
});
});
return promise.future();
}
private void createConnectionInternal(RedisOptions options, RedisRole role, Handler<AsyncResult<RedisConnection>> onCreate) {
final Handler<AsyncResult<String>> createAndConnect = resolve -> {
if (resolve.failed()) {
onCreate.handle(Future.failedFuture(resolve.cause()));
return;
}
if (role == RedisRole.SENTINEL) {
final RedisURI uri = new RedisURI(resolve.result());
connectionManager.getConnection(getSentinelEndpoint(uri), null).onComplete(onCreate);
} else {
connectionManager.getConnection(resolve.result(), null).onComplete(onCreate);
}
};
switch (role) {
case SENTINEL:
resolveClient(this::isSentinelOk, options, createAndConnect);
break;
case MASTER:
resolveClient(this::getMasterFromEndpoint, options, createAndConnect);
break;
case REPLICA:
resolveClient(this::getReplicaFromEndpoint, options, createAndConnect);
break;
}
}
private static void resolveClient(final Resolver checkEndpointFn, final RedisOptions options, final Handler<AsyncResult<String>> callback) {
iterate(0, checkEndpointFn, options, iterate -> {
if (iterate.failed()) {
callback.handle(Future.failedFuture(iterate.cause()));
} else {
final Pair<Integer, String> found = iterate.result();
final List<String> endpoints = options.getEndpoints();
String endpoint = endpoints.get(found.left);
endpoints.set(found.left, endpoints.get(0));
endpoints.set(0, endpoint);
callback.handle(Future.succeededFuture(found.right));
}
});
}
private static void iterate(final int idx, final Resolver checkEndpointFn, final RedisOptions argument, final Handler<AsyncResult<Pair<Integer, String>>> resultHandler) {
final List<String> endpoints = argument.getEndpoints();
if (idx >= endpoints.size()) {
resultHandler.handle(Future.failedFuture("No more endpoints in chain."));
return;
}
checkEndpointFn.resolve(endpoints.get(idx), argument, res -> {
if (res.succeeded()) {
resultHandler.handle(Future.succeededFuture(new Pair<>(idx, res.result())));
} else {
iterate(idx + 1, checkEndpointFn, argument, resultHandler);
}
});
}
private void isSentinelOk(String endpoint, RedisOptions argument, Handler<AsyncResult<String>> handler) {
final RedisURI uri = new RedisURI(endpoint);
connectionManager.getConnection(getSentinelEndpoint(uri), null)
.onFailure(err -> handler.handle(Future.failedFuture(err)))
.onSuccess(conn -> {
conn.send(cmd(PING), ping -> {
if (ping.failed()) {
handler.handle(Future.failedFuture(ping.cause()));
} else {
handler.handle(Future.succeededFuture(endpoint));
}
conn.close();
});
});
}
private void getMasterFromEndpoint(String endpoint, RedisOptions options, Handler<AsyncResult<String>> handler) {
final RedisURI uri = new RedisURI(endpoint);
connectionManager.getConnection(getSentinelEndpoint(uri), null)
.onFailure(err -> handler.handle(Future.failedFuture(err)))
.onSuccess(conn -> {
final String masterName = options.getMasterName();
conn.send(cmd(SENTINEL).arg("GET-MASTER-ADDR-BY-NAME").arg(masterName), getMasterAddrByName -> {
if (getMasterAddrByName.failed()) {
handler.handle(Future.failedFuture(getMasterAddrByName.cause()));
} else {
final Response response = getMasterAddrByName.result();
final String host = response.get(0).toString().contains(":") ? "[" + response.get(0).toString() + "]" : response.get(0).toString();
handler.handle(
Future.succeededFuture(uri.protocol() + "://" + uri.userinfo() + host + ":" + response.get(1).toInteger()));
}
conn.close();
});
});
}
private void getReplicaFromEndpoint(String endpoint, RedisOptions options, Handler<AsyncResult<String>> handler) {
final RedisURI uri = new RedisURI(endpoint);
connectionManager.getConnection(getSentinelEndpoint(uri), null)
.onFailure(err -> handler.handle(Future.failedFuture(err)))
.onSuccess(conn -> {
final String masterName = options.getMasterName();
conn.send(cmd(SENTINEL).arg("SLAVES").arg(masterName), sentinelReplicas -> {
if (sentinelReplicas.failed()) {
handler.handle(Future.failedFuture(sentinelReplicas.cause()));
} else {
final Response response = sentinelReplicas.result();
if (response.size() == 0) {
handler.handle(Future.failedFuture("No replicas linked to the master: " + masterName));
} else {
Response replicaInfoArr = response.get(RANDOM.nextInt(response.size()));
if ((replicaInfoArr.size() % 2) > 0) {
handler.handle(Future.failedFuture("Corrupted response from the sentinel"));
} else {
int port = 6379;
String ip = null;
if (replicaInfoArr.containsKey("port")) {
port = replicaInfoArr.get("port").toInteger();
}
if (replicaInfoArr.containsKey("ip")) {
ip = replicaInfoArr.get("ip").toString();
}
if (ip == null) {
handler.handle(Future.failedFuture("No IP found for a REPLICA node!"));
} else {
final String host = ip.contains(":") ? "[" + ip + "]" : ip;
handler.handle(Future.succeededFuture(uri.protocol() + "://" + uri.userinfo() + host + ":" + port));
}
}
}
}
conn.close();
});
});
}
private String getSentinelEndpoint(RedisURI uri) {
StringBuilder sb = new StringBuilder();
if (uri.unix()) {
sb.append("unix://");
sb.append(uri.socketAddress().path());
} else {
sb.append("redis");
if (uri.ssl()) {
sb.append('s');
}
sb.append("://");
sb.append(uri.userinfo());
sb.append(uri.socketAddress().host());
sb.append(':');
sb.append(uri.socketAddress().port());
}
return sb.toString();
}
}