package io.vertx.redis.client.impl;
import io.vertx.core.*;
import io.vertx.redis.client.*;
import io.vertx.redis.client.impl.types.MultiType;
import io.vertx.redis.client.impl.types.NumberType;
import io.vertx.redis.client.impl.types.SimpleStringType;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import static io.vertx.redis.client.Command.*;
import static io.vertx.redis.client.Request.cmd;
/**
 * {@link Redis} client that connects to a Redis cluster.
 *
 * <p>On {@link #connect()} the configured endpoints are tried in order until one of them answers
 * {@code CLUSTER SLOTS}. One connection is then opened to every endpoint reported by the slot
 * table, and the set of connections is wrapped in a {@link RedisClusterConnection}.
 *
 * <p>The static initializer registers, on {@link RedisClusterConnection}, the reducers,
 * unsupported commands and master-only commands listed below.
 */
public class RedisClusterClient extends BaseRedisClient implements Redis {

  /**
   * Registers a reducer for a command by delegating to
   * {@link RedisClusterConnection#addReducer}.
   *
   * @param command the command the reducer applies to
   * @param fn      function turning a list of responses into a single response
   *                (presumably one response per contacted node; confirm in RedisClusterConnection)
   */
  public static void addReducer(Command command, Function<List<Response>, Response> fn) {
    RedisClusterConnection.addReducer(command, fn);
  }

  /**
   * Marks a command as unsupported by delegating to
   * {@link RedisClusterConnection#addUnSupportedCommand}.
   *
   * @param command the unsupported command
   * @param error   the error message to associate with the command; callers in this class pass
   *                {@code null} when no specific message is provided
   */
  public static void addUnSupportedCommand(Command command, String error) {
    RedisClusterConnection.addUnSupportedCommand(command, error);
  }

  /**
   * Marks a command as master-only by delegating to
   * {@link RedisClusterConnection#addMasterOnlyCommand}.
   *
   * @param command the command to register
   */
  public static void addMasterOnlyCommand(Command command) {
    RedisClusterConnection.addMasterOnlyCommand(command);
  }

  static {
    // Reducers: how a list of responses is collapsed into one reply.
    addReducer(MSET, list -> SimpleStringType.OK);
    addReducer(DEL, RedisClusterClient::sumAsNumber);
    addReducer(MGET, RedisClusterClient::concat);
    addReducer(KEYS, RedisClusterClient::concat);
    addReducer(FLUSHDB, list -> SimpleStringType.OK);
    addReducer(DBSIZE, RedisClusterClient::sumAsNumber);

    // Commands registered as unsupported without a specific error message.
    Arrays.asList(ASKING, AUTH, BGREWRITEAOF, BGSAVE, CLIENT, COMMAND, CONFIG,
      DEBUG, DISCARD, HOST, INFO, LASTSAVE, LATENCY, LOLWUT, MEMORY, MODULE, MONITOR, PFDEBUG, PFSELFTEST,
      PING, READONLY, READWRITE, REPLCONF, REPLICAOF, ROLE, SAVE, SCAN, SCRIPT, SELECT, SHUTDOWN, SLAVEOF, SLOWLOG, SWAPDB,
      SYNC, SENTINEL).forEach(command -> addUnSupportedCommand(command, null));

    addUnSupportedCommand(FLUSHALL, "RedisClusterClient does not handle command FLUSHALL, use FLUSHDB");

    addMasterOnlyCommand(WAIT);
  }

  /** Reducer: sums the numeric value of every response into a single number response. */
  private static Response sumAsNumber(List<Response> list) {
    return NumberType.create(list.stream().mapToLong(Response::toLong).sum());
  }

  /** Reducer: concatenates the children of every response, in order, into one multi response. */
  private static Response concat(List<Response> list) {
    // First pass computes the total so the multi can be created with its final size.
    int total = 0;
    for (Response resp : list) {
      total += resp.size();
    }

    MultiType multi = MultiType.create(total, false);
    for (Response resp : list) {
      for (Response child : resp) {
        multi.add(child);
      }
    }
    return multi;
  }

  private final RedisOptions options;

  /**
   * Creates a cluster client.
   *
   * @param vertx   the Vert.x instance, passed to the base client
   * @param options the client options, passed to the base client and kept for connecting
   * @throws IllegalStateException if {@code options.getMaxPoolWaiting()} is smaller than
   *                               {@code options.getMaxPoolSize()}
   */
  public RedisClusterClient(Vertx vertx, RedisOptions options) {
    super(vertx, options);
    this.options = options;
    if (options.getMaxPoolWaiting() < options.getMaxPoolSize()) {
      throw new IllegalStateException("Invalid options: maxPoolWaiting < maxPoolSize");
    }
  }

  /**
   * Connects to the cluster, trying the configured endpoints in order starting with the first.
   *
   * @return a future completed with the cluster connection, or failed if no endpoint could be
   *         used or not every cluster node could be connected
   */
  @Override
  public Future<RedisConnection> connect() {
    final Promise<RedisConnection> promise = vertx.promise();
    connect(options.getEndpoints(), 0, promise);
    return promise.future();
  }

  /**
   * Tries to bootstrap the cluster connection from {@code endpoints.get(index)}; on connection
   * or slot-discovery failure moves on to the next endpoint.
   *
   * @param endpoints the candidate endpoints
   * @param index     index of the endpoint to try now
   * @param onConnect handler notified with the final outcome
   */
  private void connect(List<String> endpoints, int index, Handler<AsyncResult<RedisConnection>> onConnect) {
    if (index >= endpoints.size()) {
      // every endpoint has been tried
      onConnect.handle(Future.failedFuture("Cannot connect to any of the provided endpoints"));
      return;
    }

    // When replicas may be used a READONLY request is handed to the connection manager
    // (presumably run as connection setup; confirm in the connection manager).
    connectionManager.getConnection(endpoints.get(index), RedisReplicas.NEVER != options.getUseReplicas() ? cmd(READONLY) : null)
      .onFailure(err -> connect(endpoints, index + 1, onConnect))
      .onSuccess(conn -> getSlots(endpoints.get(index), conn, slotsResult -> {
        // The discovery connection is only needed for CLUSTER SLOTS; release it either way.
        conn.close();

        if (slotsResult.failed()) {
          connect(endpoints, index + 1, onConnect);
          return;
        }

        connectToNodes(slotsResult.result(), onConnect);
      }));
  }

  /**
   * Opens one connection per endpoint of the slot table and reports the aggregated outcome.
   *
   * @param slots     the discovered slot table
   * @param onConnect handler notified once every connection attempt has finished, or
   *                  immediately when the pool is too small
   */
  private void connectToNodes(Slots slots, Handler<AsyncResult<RedisConnection>> onConnect) {
    // One connection is opened per endpoint below, so the pool has to hold that many
    // connections. The number of slot ranges (slots.size()) is not the relevant quantity.
    final int nodeCount = slots.endpoints().length;
    if (options.getMaxPoolSize() < nodeCount) {
      onConnect.handle(Future.failedFuture("RedisOptions maxPoolSize < Cluster size(" + nodeCount + "): The pool is not able to hold all required connections!"));
      return;
    }

    final AtomicBoolean failed = new AtomicBoolean(false);
    final AtomicInteger counter = new AtomicInteger();
    final Map<String, RedisConnection> connections = new HashMap<>();

    for (String endpoint : slots.endpoints()) {
      connectionManager.getConnection(endpoint, RedisReplicas.NEVER != options.getUseReplicas() ? cmd(READONLY) : null)
        .onFailure(err -> {
          failed.set(true);
          connectionComplete(counter, slots, connections, failed, onConnect);
        })
        .onSuccess(cconn -> {
          // the map is a plain HashMap, access is guarded by its own monitor
          synchronized (connections) {
            connections.put(endpoint, cconn);
          }
          connectionComplete(counter, slots, connections, failed, onConnect);
        });
    }
  }

  /**
   * Records that one connection attempt has finished. When the last attempt completes, either
   * closes the connections obtained so far and fails, or succeeds with a new
   * {@link RedisClusterConnection}.
   *
   * @param counter     number of attempts completed so far
   * @param slots       the discovered slot table
   * @param connections connections opened so far, keyed by endpoint
   * @param failed      whether any attempt has failed
   * @param onConnect   handler notified with the final outcome
   */
  private void connectionComplete(AtomicInteger counter, Slots slots, Map<String, RedisConnection> connections, AtomicBoolean failed, Handler<AsyncResult<RedisConnection>> onConnect) {
    if (counter.incrementAndGet() == slots.endpoints().length) {
      // this was the last pending attempt
      if (failed.get()) {
        // all-or-nothing: release whatever was opened before reporting the failure
        synchronized (connections) {
          for (RedisConnection value : connections.values()) {
            if (value != null) {
              value.close();
            }
          }
        }
        onConnect.handle(Future.failedFuture("Failed to connect to all nodes of the cluster"));
      } else {
        onConnect.handle(Future.succeededFuture(new RedisClusterConnection(vertx, options, slots, connections)));
      }
    }
  }

  /**
   * Sends {@code CLUSTER SLOTS} on the given connection and builds the slot table from the reply.
   *
   * @param endpoint   the endpoint the connection belongs to, passed to the {@link Slots}
   *                   constructor
   * @param conn       the connection to query
   * @param onGetSlots handler notified with the slot table, or with a failure when the command
   *                   fails, the reply is null or empty, or the reply cannot be turned into a
   *                   slot table
   */
  private void getSlots(String endpoint, RedisConnection conn, Handler<AsyncResult<Slots>> onGetSlots) {
    conn.send(cmd(CLUSTER).arg("SLOTS"), send -> {
      if (send.failed()) {
        onGetSlots.handle(Future.failedFuture(send.cause()));
        return;
      }

      final Response reply = send.result();

      if (reply == null || reply.size() == 0) {
        onGetSlots.handle(Future.failedFuture("SLOTS No slots available in the cluster."));
        return;
      }

      // Build the table before notifying: an exception escaping this callback would leave
      // the handler uncalled and the pending connect would never complete.
      final Slots slots;
      try {
        slots = new Slots(endpoint, reply);
      } catch (RuntimeException e) {
        onGetSlots.handle(Future.failedFuture(e));
        return;
      }

      onGetSlots.handle(Future.succeededFuture(slots));
    });
  }
}