package io.vertx.redis.client.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.SocketAddress;
import io.vertx.redis.client.*;
import io.vertx.redis.client.impl.types.ErrorType;

import java.util.List;
import java.util.Random;

import static io.vertx.redis.client.Command.*;
import static io.vertx.redis.client.Request.cmd;

/**
 * {@link Redis} implementation that locates its target node through Redis Sentinel.
 *
 * <p>{@code connect} opens two connections: one to a sentinel, subscribed to the
 * {@code +switch-master} channel, and one to the node matching the role configured
 * in the options. Commands are forwarded to the latter.
 */
public class RedisSentinelClient implements Redis {

  // This does not need to be cryptographically secure: we only want some simple
  // randomization to avoid picking the same slave all the time.
  private static final Random RANDOM = new Random();

  private static class Pair<L, R> {
    final L left;
    final R right;

    Pair(L left, R right) {
      this.left = left;
      this.right = right;

  private static final Logger LOG = LoggerFactory.getLogger(RedisSentinelClient.class);

  // Vert.x instance and client options, captured at construction time and
  // passed to every client created by this class.
  private final Vertx vertx;
  private final RedisOptions options;

  // Connection to a sentinel, subscribed to "+switch-master"; assigned in connect().
  private Redis sentinel;
  // Connection to the node matching options.getRole(); assigned in connect().
  // send(), batch() and socketAddress() forward to it.
  private RedisClient redis;

  /**
   * Stores the Vert.x instance and the options. No connection is opened here;
   * that happens in {@link #connect(Handler)}.
   *
   * @param vertx   the Vert.x instance used to create the underlying clients
   * @param options the client options (endpoints, role, master name)
   */
  private RedisSentinelClient(Vertx vertx, RedisOptions options) {
    this.vertx = vertx;
    this.options = options;
  }

  public Redis connect(Handler<AsyncResult<Redis>> onCreate) {
    // sentinel (HA) requires 2 connections, one to watch for sentinel events and the connection itself
    createClientInternal(vertx, options, RedisRole.SENTINEL, create -> {
      if (create.failed()) {
        LOG.error("Redis PUB/SUB wrap failed.", create.cause());

      sentinel = create.result();

        .handler(msg -> {
          if (msg.type() == ResponseType.MULTI) {
            if ("MESSAGE".equalsIgnoreCase(msg.get(0).toString())) {
              // we don't care about the payload
              if (redis != null) {
                redis.fail(ErrorType.create("SWITCH-MASTER Received +switch-master message from Redis Sentinel."));
              } else {
                LOG.warn("Received +switch-master message from Redis Sentinel.");

      sentinel.send(cmd(SUBSCRIBE).arg("+switch-master"), send -> {
        if (send.failed()) {
          LOG.error("Unable to subscribe to Sentinel PUBSUB", send.cause());

      sentinel.exceptionHandler(t -> {
        LOG.error("Unhandled exception in Sentinel PUBSUB", t);

    createClientInternal(vertx, options, options.getRole(), create -> {
      if (create.failed()) {

      redis = (RedisClient) create.result();


    return this;

  public void close() {

  public Redis exceptionHandler(Handler<Throwable> handler) {
    return this;

  public Redis endHandler(Handler<Void> handler) {
    return this;

  public Redis handler(Handler<Response> handler) {
    return this;

  public Redis pause() {
    return this;

  public Redis resume() {
    return null;

  public Redis send(Request command, Handler<AsyncResult<Response>> handler) {
    redis.send(command, handler);
    return this;

  public Redis batch(List<Request> commands, Handler<AsyncResult<List<Response>>> handler) {
    redis.batch(commands, handler);
    return this;

  public SocketAddress socketAddress() {
    return redis.socketAddress();

  public Redis fetch(long amount) {
    return this;

  public static Redis create(Vertx vertx, RedisOptions options) {
    return new RedisSentinelClient(vertx, options);

  private static void createClientInternal(Vertx vertx, RedisOptions options, RedisRole role, Handler<AsyncResult<Redis>> onCreate) {

    final Handler<AsyncResult<SocketAddress>> createAndConnect = resolve -> {
      if (resolve.failed()) {
      // wrap a new client
      RedisClient.create(vertx, options, resolve.result()).connect(onCreate);

    switch (role) {
      case SENTINEL:
        resolveClient(vertx, RedisSentinelClient::isSentinelOk, options, createAndConnect);

      case MASTER:
        resolveClient(vertx, RedisSentinelClient::getMasterFromEndpoint, options, createAndConnect);

      case SLAVE:
        resolveClient(vertx, RedisSentinelClient::getSlaveFromEndpoint, options, createAndConnect);

/* Endpoint resolution helpers. */

  /**
   * We use the algorithm from http://redis.io/topics/sentinel-clients
   * to get a sentinel client and then do 'stuff' with it.
   *
   * <p>The configured endpoints are tried in order until {@code checkEndpointFn}
   * succeeds for one of them. That endpoint is then swapped to the front of the
   * options' endpoint list (the list is modified in place) and the address produced
   * by the check is handed to {@code callback}.
   *
   * @param vertx           the Vert.x instance
   * @param checkEndpointFn the check/resolution to run against each endpoint
   * @param options         the client options holding the endpoint list
   * @param callback        receives the resolved address, or the failure when no
   *                        endpoint passes the check
   */
  private static void resolveClient(final Vertx vertx, final Resolver checkEndpointFn, final RedisOptions options, final Handler<AsyncResult<SocketAddress>> callback) {
    // Because finding the master is going to be an async list we will terminate
    // when we find one then use promises...
    iterate(0, vertx, checkEndpointFn, options, search -> {
      if (search.failed()) {
        callback.handle(Future.failedFuture(search.cause()));
      } else {
        final Pair<Integer, SocketAddress> found = search.result();
        // This is the endpoint that has responded so stick it on the top of
        // the list
        final List<SocketAddress> endpoints = options.getEndpoints();
        SocketAddress endpoint = endpoints.get(found.left);
        endpoints.set(found.left, endpoints.get(0));
        endpoints.set(0, endpoint);
        // now return the right address
        callback.handle(Future.succeededFuture(found.right));
      }
    });
  }

  /**
   * Runs {@code checkEndpointFn} against the endpoint at {@code idx}. On failure it
   * moves on to the next endpoint.
   *
   * @param idx             index of the endpoint to try
   * @param vertx           the Vert.x instance
   * @param checkEndpointFn the check/resolution to run
   * @param argument        the client options holding the endpoint list
   * @param resultHandler   receives the index of the endpoint that answered together
   *                        with the resolved address, or a failure once the list is
   *                        exhausted
   */
  private static void iterate(final int idx, final Vertx vertx, final Resolver checkEndpointFn, final RedisOptions argument, final Handler<AsyncResult<Pair<Integer, SocketAddress>>> resultHandler) {
    // stop condition
    final List<SocketAddress> endpoints = argument.getEndpoints();

    if (idx >= endpoints.size()) {
      resultHandler.handle(Future.failedFuture("No more endpoints in chain."));
      return;
    }

    // attempt to perform operation
    checkEndpointFn.resolve(vertx, endpoints.get(idx), argument, res -> {
      if (res.succeeded()) {
        resultHandler.handle(Future.succeededFuture(new Pair<>(idx, res.result())));
      } else {
        // try again with next endpoint
        iterate(idx + 1, vertx, checkEndpointFn, argument, resultHandler);
      }
    });
  }

  // begin endpoint check methods

  /**
   * Checks that the endpoint accepts a connection and answers {@code PING}.
   * On success the endpoint itself is the resolved address.
   *
   * @param vertx    the Vert.x instance
   * @param endpoint the endpoint to probe
   * @param argument the client options
   * @param handler  receives {@code endpoint} on success, the failure otherwise
   */
  private static void isSentinelOk(Vertx vertx, SocketAddress endpoint, RedisOptions argument, Handler<AsyncResult<SocketAddress>> handler) {
    RedisClient.create(vertx, argument, endpoint).connect(onCreate -> {
      if (onCreate.failed()) {
        handler.handle(Future.failedFuture(onCreate.cause()));
        return;
      }

      final Redis conn = onCreate.result();
      // Send a command just to check we have a working node
      conn.send(cmd(PING), info -> {
        try {
          if (info.failed()) {
            handler.handle(Future.failedFuture(info.cause()));
            return;
          }

          handler.handle(Future.succeededFuture(endpoint));
        } finally {
          // the probe connection is never reused: release it on every path
          conn.close();
        }
      });
    });
  }

  /**
   * Asks the sentinel at {@code endpoint} for the address of the master named in
   * the options ({@code SENTINEL GET-MASTER-ADDR-BY-NAME}).
   *
   * @param vertx    the Vert.x instance
   * @param endpoint the sentinel endpoint to query
   * @param options  the client options providing the master name
   * @param handler  receives the master address on success, the failure otherwise
   */
  private static void getMasterFromEndpoint(Vertx vertx, SocketAddress endpoint, RedisOptions options, Handler<AsyncResult<SocketAddress>> handler) {
    RedisClient.create(vertx, options, endpoint).connect(onCreate -> {
      if (onCreate.failed()) {
        handler.handle(Future.failedFuture(onCreate.cause()));
        return;
      }

      final Redis conn = onCreate.result();
      final String masterName = options.getMasterName();

      // Ask the sentinel which node is currently the master
      conn.send(cmd(SENTINEL).arg("GET-MASTER-ADDR-BY-NAME").arg(masterName), getMasterAddrByName -> {
        try {
          if (getMasterAddrByName.failed()) {
            handler.handle(Future.failedFuture(getMasterAddrByName.cause()));
            return;
          }

          // Test the response
          final Response response = getMasterAddrByName.result();

          // NOTE(review): a sentinel that does not know the master replies with nil,
          // which presumably surfaces here as a null response - confirm against the parser.
          if (response == null) {
            handler.handle(Future.failedFuture("No master address known for: " + masterName));
            return;
          }

          // the reply is [host, port]
          handler.handle(
            Future.succeededFuture(SocketAddress.inetSocketAddress(response.get(1).toInteger(), response.get(0).toString())));
        } finally {
          // we don't need this connection anymore
          conn.close();
        }
      });
    });
  }

  /**
   * Asks the sentinel at {@code endpoint} for the slaves of the master named in the
   * options ({@code SENTINEL SLAVES}) and picks one of them at random.
   *
   * <p>Each slave entry is read as a flat list of alternating keys and values. The
   * {@code ip} and {@code port} entries form the address, and the port falls back
   * to 6379 when it is absent.
   *
   * @param vertx    the Vert.x instance
   * @param endpoint the sentinel endpoint to query
   * @param options  the client options providing the master name
   * @param handler  receives the chosen slave address on success, the failure otherwise
   */
  private static void getSlaveFromEndpoint(Vertx vertx, SocketAddress endpoint, RedisOptions options, Handler<AsyncResult<SocketAddress>> handler) {
    RedisClient.create(vertx, options, endpoint).connect(onCreate -> {
      if (onCreate.failed()) {
        handler.handle(Future.failedFuture(onCreate.cause()));
        return;
      }

      final Redis conn = onCreate.result();
      final String masterName = options.getMasterName();

      // Ask the sentinel for the slaves attached to the master
      conn.send(cmd(SENTINEL).arg("SLAVES").arg(masterName), sentinelSlaves -> {
        try {
          if (sentinelSlaves.failed()) {
            handler.handle(Future.failedFuture(sentinelSlaves.cause()));
            return;
          }

          final Response response = sentinelSlaves.result();

          // Test the response
          if (response == null || response.size() == 0) {
            handler.handle(Future.failedFuture("No slaves linked to the master: " + masterName));
            return;
          }

          final Response slaveInfoArr = response.get(RANDOM.nextInt(response.size()));

          // key/value pairs: an odd length cannot be valid
          if ((slaveInfoArr.size() % 2) > 0) {
            handler.handle(Future.failedFuture("Corrupted response from the sentinel"));
            return;
          }

          int port = 6379;
          String ip = null;

          for (int i = 0; i < slaveInfoArr.size(); i += 2) {
            final String key = slaveInfoArr.get(i).toString();
            if ("port".equals(key)) {
              port = slaveInfoArr.get(i + 1).toInteger();
            }
            if ("ip".equals(key)) {
              ip = slaveInfoArr.get(i + 1).toString();
            }
          }

          if (ip == null) {
            handler.handle(Future.failedFuture("No IP found for a SLAVE node!"));
          } else {
            handler.handle(Future.succeededFuture(SocketAddress.inetSocketAddress(port, ip)));
          }
        } finally {
          // the probe connection is never reused: release it on every path
          conn.close();
        }
      });
    });
  }
}