package io.vertx.redis.client.impl;
import io.vertx.core.*;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.SocketAddress;
import io.vertx.redis.client.*;
import io.vertx.redis.client.impl.types.ErrorType;
import io.vertx.redis.client.impl.types.IntegerType;
import io.vertx.redis.client.impl.types.MultiType;
import io.vertx.redis.client.impl.types.SimpleStringType;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import static io.vertx.redis.client.Command.*;
import static io.vertx.redis.client.Request.cmd;
public class RedisClusterClient implements Redis {
private static final int RETRIES = 16;
private static final Logger LOG = LoggerFactory.getLogger(RedisClusterClient.class);
private static final Random RANDOM = new Random();
private static final Map<Command, Function<List<Response>, Response>> REDUCERS = new HashMap<>();
private static final Map<Command, String> UNSUPPORTEDCOMMANDS = new HashMap<>();
public static void addReducer(Command command, Function<List<Response>, Response> fn) {
REDUCERS.put(command, fn);
}
public static void addUnSupportedCommand(Command command, String error) {
if(error == null || error.isEmpty()) {
UNSUPPORTEDCOMMANDS.put(command, "RedisClusterClient does not handle command " +
new String(command.getBytes(), StandardCharsets.ISO_8859_1).split("\r\n")[1] + ", use non cluster client on the right node.");
} else {
UNSUPPORTEDCOMMANDS.put(command, error);
}
}
public static Redis create(Vertx vertx, RedisOptions options) {
return new RedisClusterClient(vertx, options);
}
static {
addReducer(MSET, list -> {
return SimpleStringType.OK;
});
addReducer(DEL, list -> {
return IntegerType.create(list.stream().mapToLong(Response::toLong).sum());
});
addReducer(MGET, list -> {
int total = 0;
for (Response resp : list) {
total += resp.size();
}
MultiType multi = MultiType.create(total);
for (Response resp : list) {
for (Response child : resp) {
multi.add(child);
}
}
return multi;
});
addReducer(KEYS, list -> {
int total = 0;
for (Response resp : list) {
total += resp.size();
}
MultiType multi = MultiType.create(total);
for (Response resp : list) {
for (Response child : resp) {
multi.add(child);
}
}
return multi;
});
addReducer(FLUSHDB, list -> SimpleStringType.OK);
addReducer(DBSIZE, list -> {
return IntegerType.create(list.stream().mapToLong(Response::toLong).sum());
});
Arrays.asList(ASKING, AUTH, BGREWRITEAOF, BGSAVE, CLIENT, CLUSTER, COMMAND, CONFIG,
DEBUG, DISCARD, HOST, INFO, LASTSAVE, LATENCY, LOLWUT, MEMORY, MODULE, MONITOR, PFDEBUG, PFSELFTEST,
PING, READONLY, READWRITE, REPLCONF, REPLICAOF, ROLE, SAVE, SCAN, SCRIPT, SELECT, SHUTDOWN, SLAVEOF, SLOWLOG, SWAPDB,
SYNC, SENTINEL).forEach(command -> addUnSupportedCommand(command, null));
addUnSupportedCommand(FLUSHALL, "RedisClusterClient does not handle command FLUSHALL, use FLUSHDB");
}
private final Vertx vertx;
private final RedisSlaves slaves;
private final RedisOptions options;
private final Map<SocketAddress, Redis> connections = new HashMap<>();
private final Redis[][] slots = new Redis[16384][];
private Handler<Throwable> onException = t -> LOG.error("Unhandled Error", t);
private Handler<Void> onEnd;
private Handler<Response> onMessage;
private int slotNumber;
private RedisClusterClient(Vertx vertx, RedisOptions options) {
this.vertx = vertx;
this.slaves = options.getUseSlave();
this.options = options;
}
@Override
public Redis connect(Handler<AsyncResult<Redis>> onCreate) {
final List<SocketAddress> endpoints = options.getEndpoints();
final AtomicInteger counter = new AtomicInteger(endpoints.size());
for (SocketAddress endpoint : endpoints) {
getClient(endpoint, options, ar -> {
final int total = counter.decrementAndGet();
if (ar.failed()) {
LOG.warn("Failed to connect to: " + endpoint);
}
if (total == 0) {
getSlots(options, getSlots -> {
if (getSlots.failed()) {
onCreate.handle(Future.failedFuture(getSlots.cause()));
} else {
onCreate.handle(Future.succeededFuture(this));
}
});
}
});
}
return this;
}
@Override
public void close() {
connections.entrySet().removeIf(kv -> {
if (kv.getValue() != null) {
kv.getValue().close();
}
return true;
});
}
@Override
public Redis exceptionHandler(Handler<Throwable> handler) {
this.onException = handler;
return this;
}
@Override
public Redis endHandler(Handler<Void> handler) {
this.onEnd = handler;
return this;
}
@Override
public Redis handler(Handler<Response> handler) {
this.onMessage = handler;
return this;
}
@Override
public Redis pause() {
this.connections.values().forEach(conn -> {
if (conn != null) {
conn.pause();
}
});
return this;
}
@Override
public Redis resume() {
this.connections.values().forEach(conn -> {
if (conn != null) {
conn.resume();
}
});
return null;
}
@Override
public Redis send(Request request, Handler<AsyncResult<Response>> handler) {
final RequestImpl req = (RequestImpl) request;
final Command cmd = req.command();
if(UNSUPPORTEDCOMMANDS.containsKey(cmd)) {
try {
handler.handle(Future.failedFuture(UNSUPPORTEDCOMMANDS.get(cmd)));
} catch (RuntimeException e) {
onException.handle(e);
}
return this;
}
if (cmd.isMovable()) {
try {
handler.handle(Future.failedFuture("RedisClusterClient does not handle movable keys commands, use non cluster client on the right node."));
} catch (RuntimeException e) {
onException.handle(e);
}
return this;
}
if (cmd.isKeyless() && REDUCERS.containsKey(cmd)) {
final List<Future> responses = new ArrayList<>(slotNumber);
for (int i = 1; i <= slotNumber; i++) {
Redis[] clients = slots[(slots.length / slotNumber - 1) * i];
final Future<Response> f = Future.future();
send(selectMasterOrSlave(req.command().isReadOnly(), clients), options, RETRIES, req, f);
responses.add(f);
}
CompositeFuture.all(responses).setHandler(composite -> {
if (composite.failed()) {
try {
handler.handle(Future.failedFuture(composite.cause()));
} catch (RuntimeException e) {
onException.handle(e);
}
} else {
try {
handler.handle(Future.succeededFuture(REDUCERS.get(cmd).apply(composite.result().list())));
} catch (RuntimeException e) {
onException.handle(e);
}
}
});
return this;
}
if (cmd.isKeyless()) {
send(selectClient(-1, cmd.isReadOnly()), options, RETRIES, req, handler);
return this;
}
final List<byte[]> args = req.getArgs();
if (cmd.isMultiKey()) {
int currentSlot = -1;
int start = cmd.getFirstKey() - 1;
int end = cmd.getLastKey();
if (end > 0) {
end--;
}
if (end < 0) {
end = args.size() + (end + 1);
}
int step = cmd.getInterval();
for (int i = start; i < end; i += step) {
int slot = ZModem.generate(args.get(i));
if (currentSlot == -1) {
currentSlot = slot;
continue;
}
if (currentSlot != slot) {
if (!REDUCERS.containsKey(cmd)) {
try {
handler.handle(Future.failedFuture("No Reducer available for: " + cmd));
} catch (RuntimeException e) {
onException.handle(e);
}
return this;
}
final Map<Integer, Request> requests = splitRequest(cmd, args, start, end, step);
final List<Future> responses = new ArrayList<>(requests.size());
for (Map.Entry<Integer, Request> kv : requests.entrySet()) {
final Promise<Response> p = Promise.promise();
send(selectClient(kv.getKey(), cmd.isReadOnly()), options, RETRIES, kv.getValue(), p);
responses.add(p.future());
}
CompositeFuture.all(responses).setHandler(composite -> {
if (composite.failed()) {
try {
handler.handle(Future.failedFuture(composite.cause()));
} catch (RuntimeException e) {
onException.handle(e);
}
} else {
try {
handler.handle(Future.succeededFuture(REDUCERS.get(cmd).apply(composite.result().list())));
} catch (RuntimeException e) {
onException.handle(e);
}
}
});
return this;
}
}
send(selectClient(currentSlot, cmd.isReadOnly()), options, RETRIES, req, handler);
return this;
}
int start = cmd.getFirstKey() - 1;
send(selectClient(ZModem.generate(args.get(start)), cmd.isReadOnly()), options, RETRIES, req, handler);
return this;
}
private Map<Integer, Request> splitRequest(Command cmd, List<byte[]> args, int start, int end, int step) {
final Map<Integer, Request> map = new IdentityHashMap<>();
for (int i = start; i < end; i += step) {
int slot = ZModem.generate(args.get(i));
Request request = map.get(slot);
if (request == null) {
request = Request.cmd(cmd);
for (int j = 0; j < start; j++) {
request.arg(args.get(j));
}
map.put(slot, request);
}
request.arg(args.get(i));
for (int j = i + 1; j < i + step; j++) {
request.arg(args.get(j));
}
}
final Collection<Request> col = map.values();
col.forEach(req -> {
for (int j = end; j < args.size(); j++) {
req.arg(args.get(j));
}
});
return map;
}
@Override
public Redis batch(List<Request> requests, Handler<AsyncResult<List<Response>>> handler) {
int currentSlot = -1;
boolean readOnly = false;
for (int i = 0; i < requests.size(); i++) {
final RequestImpl req = (RequestImpl) requests.get(i);
final Command cmd = req.command();
if(UNSUPPORTEDCOMMANDS.containsKey(cmd)) {
try {
handler.handle(Future.failedFuture(UNSUPPORTEDCOMMANDS.get(cmd)));
} catch (RuntimeException e) {
onException.handle(e);
}
return this;
}
readOnly |= cmd.isReadOnly();
if (cmd.isKeyless()) {
continue;
}
if (cmd.isMovable()) {
handler.handle(Future.failedFuture("RedisClusterClient does not handle movable keys commands, use non cluster client on the right node."));
return this;
}
final List<byte[]> args = req.getArgs();
if (cmd.isMultiKey()) {
int start = cmd.getFirstKey() - 1;
int end = cmd.getLastKey();
if (end > 0) {
end--;
}
if (end < 0) {
end = args.size() + (end + 1);
}
int step = cmd.getInterval();
for (int j = start; j < end; j += step) {
int slot = ZModem.generate(args.get(j));
if (currentSlot == -1) {
currentSlot = slot;
continue;
}
if (currentSlot != slot) {
handler.handle(Future.failedFuture("RedisClusterClient does not handle batching commands with keys across different slots. TODO: Split the command into slots and then batch."));
return this;
}
}
continue;
}
int start = cmd.getFirstKey() - 1;
final int slot = ZModem.generate(args.get(start));
if (currentSlot == -1) {
currentSlot = slot;
continue;
}
if (currentSlot != slot) {
handler.handle(Future.failedFuture("RedisClusterClient does not handle batching commands with keys across different slots. TODO: Split the command into slots and then batch."));
return this;
}
}
batch(selectClient(currentSlot, readOnly), options, RETRIES, requests, handler);
return this;
}
@Override
public SocketAddress socketAddress() {
throw new UnsupportedOperationException("Cluster Connection is not bound to a socket");
}
@Override
public Redis fetch(long amount) {
this.connections.values().forEach(conn -> {
if (conn != null) {
conn.fetch(amount);
}
});
return this;
}
private void getClient(SocketAddress address, RedisOptions options, Handler<AsyncResult<Redis>> onClient) {
Redis cli = connections.get(address);
if (cli != null) {
onClient.handle(Future.succeededFuture(cli));
return;
}
RedisClient.create(vertx, options, address).connect(create -> {
if (create.failed()) {
onClient.handle(create);
return;
}
final Redis conn = create.result();
conn.exceptionHandler(t -> {
connections.remove(address);
if (onException != null) {
onException.handle(t);
}
getSlots(options, ar -> {
if (ar.failed()) {
if (onException != null) {
onException.handle(ar.cause());
}
}
});
});
conn.endHandler(v -> {
connections.remove(address);
if (connections.size() == 0) {
if (onEnd != null) {
onEnd.handle(null);
}
}
});
conn.handler(r -> {
if (onMessage != null) {
onMessage.handle(r);
}
});
connections.put(address, conn);
onClient.handle(Future.succeededFuture(conn));
});
}
private void getSlots(RedisOptions options, Handler<AsyncResult<Void>> handler) {
final Set<SocketAddress> exclude = new HashSet<>();
final AtomicReference<Throwable> cause = new AtomicReference<>();
final Runnable tryClient = new Runnable() {
@Override
public void run() {
final Redis conn = getRandomConnection(exclude);
if (conn == null) {
handler.handle(Future.failedFuture(ErrorType.create("SLOTS No client's available.")));
return;
}
conn.send(cmd(CLUSTER).arg("SLOTS"), send -> {
if (send.failed()) {
exclude.add(conn.socketAddress());
cause.set(send.cause());
run();
return;
}
final Response reply = send.result();
if (reply.size() == 0) {
exclude.add(conn.socketAddress());
cause.set(ErrorType.create("SLOTS No slots available in the cluster."));
run();
return;
}
final Set<SocketAddress> seenClients = new HashSet<>();
final AtomicInteger slotCounter = new AtomicInteger(reply.size());
slotNumber = reply.size();
for (int i = 0; i < reply.size(); i++) {
Response s = reply.get(i);
int start = s.get(0).toInteger();
int end = s.get(1).toInteger();
List<SocketAddress> addresses = new ArrayList<>();
for (int index = 2; index < s.size(); index++) {
Response c = s.get(index);
SocketAddress address = SocketAddress.inetSocketAddress(c.get(1).toInteger(), c.get(0).toString());
addresses.add(address);
seenClients.add(address);
}
loadSlot(start, end, addresses, options, onLoad -> {
if (slotCounter.decrementAndGet() == 0) {
connections.entrySet().removeIf(kv -> {
if (kv.getValue() == null) {
return true;
}
if (!seenClients.contains(kv.getKey())) {
kv.getValue().close();
kv.setValue(null);
return true;
}
return false;
});
handler.handle(Future.succeededFuture());
}
});
}
});
}
};
tryClient.run();
}
private Redis getRandomConnection(Set<SocketAddress> exclude) {
List<Redis> available = connections.entrySet().stream()
.filter(kv -> !exclude.contains(kv.getKey()) && kv.getValue() != null)
.map(Map.Entry::getValue)
.collect(Collectors.toList());
if (available.size() == 0) {
return null;
}
return available.get(RANDOM.nextInt(available.size()));
}
private void loadSlot(int start, int end, List<SocketAddress> addresses, RedisOptions options, Handler<Void> onLoad) {
final Redis[] connections = new Redis[addresses.size()];
final AtomicInteger counter = new AtomicInteger(addresses.size());
for (int i = 0; i < addresses.size(); i++) {
final int idx = i;
final SocketAddress address = addresses.get(idx);
final Future<Redis> getClientFuture = Future.future();
getClient(address, options, getClientFuture);
getClientFuture.compose(getClient -> {
if (RedisSlaves.NEVER != options.getUseSlave()) {
final Future<Response> readOnlyFuture = Future.future();
getClient.send(cmd(READONLY), readOnlyFuture);
return readOnlyFuture.map(getClient);
} else {
return Future.succeededFuture(getClient);
}
}).setHandler(getClient -> {
if (getClient.failed()) {
LOG.warn("Could not get a connection to node [" + address + "]");
} else {
connections[idx] = getClient.result();
}
if (counter.decrementAndGet() == 0) {
for (int j = start; j <= end; j++) {
slots[j] = connections;
}
onLoad.handle(null);
}
});
}
}
private void send(final Redis client, final RedisOptions options, final int retries, Request command, Handler<AsyncResult<Response>> handler) {
if (client == null) {
try {
handler.handle(Future.failedFuture("No connection available."));
} catch (RuntimeException e) {
onException.handle(e);
}
return;
}
client.send(command, send -> {
if (send.failed() && send.cause() instanceof ErrorType && retries >= 0) {
final ErrorType cause = (ErrorType) send.cause();
boolean ask = cause.is("ASK");
boolean moved = !ask && cause.is("MOVED");
if (moved || ask) {
final Runnable andThen = () -> {
String addr = cause.slice(' ', 2);
if (addr == null) {
try {
handler.handle(Future.failedFuture(cause));
} catch (RuntimeException e) {
onException.handle(e);
}
return;
}
int sep = addr.lastIndexOf(':');
SocketAddress socketAddress;
if (sep != -1) {
socketAddress = SocketAddress.inetSocketAddress(
Integer.parseInt(addr.substring(sep + 1)),
addr.substring(0, sep));
} else {
socketAddress = SocketAddress.domainSocketAddress(addr);
}
getClient(socketAddress, options, getClient -> {
if (getClient.failed()) {
try {
handler.handle(Future.failedFuture(getClient.cause()));
} catch (RuntimeException e) {
onException.handle(e);
}
return;
}
send(getClient.result(), options, retries - 1, command, handler);
});
};
if (moved) {
getSlots(options, getSlots -> andThen.run());
} else {
andThen.run();
}
return;
}
if (cause.is("TRYAGAIN") || cause.is("CLUSTERDOWN")) {
long backoff = (long) (Math.pow(2, 16 - Math.max(retries, 9)) * 10);
vertx.setTimer(backoff, t -> send(client, options, retries - 1, command, handler));
return;
}
}
try {
handler.handle(send);
} catch (RuntimeException e) {
onException.handle(e);
}
});
}
private void batch(final Redis client, final RedisOptions options, final int retries, List<Request> commands, Handler<AsyncResult<List<Response>>> handler) {
if (client == null) {
try {
handler.handle(Future.failedFuture("No connection available."));
} catch (RuntimeException e) {
onException.handle(e);
}
return;
}
client.batch(commands, send -> {
if (send.failed() && send.cause() instanceof ErrorType && retries >= 0) {
final ErrorType cause = (ErrorType) send.cause();
boolean ask = cause.is("ASK");
boolean moved = !ask && cause.is("MOVED");
if (moved || ask) {
final Runnable andThen = () -> {
String addr = cause.slice(' ', 2);
if (addr == null) {
try {
handler.handle(Future.failedFuture(cause));
} catch (RuntimeException e) {
onException.handle(e);
}
return;
}
int sep = addr.lastIndexOf(':');
SocketAddress socketAddress;
if (sep != -1) {
socketAddress = SocketAddress.inetSocketAddress(
Integer.parseInt(addr.substring(sep + 1)),
addr.substring(0, sep));
} else {
socketAddress = SocketAddress.domainSocketAddress(addr);
}
getClient(socketAddress, options, getClient -> {
if (getClient.failed()) {
try {
handler.handle(Future.failedFuture(getClient.cause()));
} catch (RuntimeException e) {
onException.handle(e);
}
return;
}
batch(getClient.result(), options, retries - 1, commands, handler);
});
};
if (moved) {
getSlots(options, getSlots -> andThen.run());
} else {
andThen.run();
}
return;
}
if (cause.is("TRYAGAIN") || cause.is("CLUSTERDOWN")) {
long backoff = (long) (Math.pow(2, 16 - Math.max(retries, 9)) * 10);
vertx.setTimer(backoff, t -> batch(client, options, retries - 1, commands, handler));
return;
}
}
try {
handler.handle(send);
} catch (RuntimeException e) {
onException.handle(e);
}
});
}
private Redis selectClient(int keySlot, boolean readOnly) {
if (keySlot == -1) {
return getRandomConnection(Collections.emptySet());
}
Redis[] clients = slots[keySlot];
if (clients == null || clients.length == 0) {
return getRandomConnection(Collections.emptySet());
}
return selectMasterOrSlave(readOnly, clients);
}
private Redis selectMasterOrSlave(boolean readOnly, Redis[] clients) {
int index = 0;
if (readOnly && slaves != RedisSlaves.NEVER && clients.length > 1) {
if (slaves == RedisSlaves.ALWAYS) {
index = RANDOM.nextInt(clients.length - 1) + 1;
}
if (slaves == RedisSlaves.SHARE) {
index = RANDOM.nextInt(clients.length);
}
}
return clients[index];
}
}