package io.vertx.redis.client.impl;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.redis.client.*;
import io.vertx.redis.client.impl.types.ErrorType;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class RedisClient implements Redis, ParserHandler {
private static final Logger LOG = LoggerFactory.getLogger(RedisClient.class);
private static final ErrorType CONNECTION_CLOSED = ErrorType.create("CONNECTION_CLOSED");
public static Redis create(Vertx vertx, RedisOptions options) {
return create(vertx, options, options.getEndpoint());
}
static Redis create(Vertx vertx, RedisOptions options, SocketAddress address) {
return new RedisClient(vertx, options, address);
}
private static void authenticate(Redis client, RedisOptions options, Handler<AsyncResult<Void>> handler) {
if (options.getPassword() == null) {
handler.handle(Future.succeededFuture());
return;
}
client.send(Request.cmd(Command.AUTH).arg(options.getPassword()), auth -> {
if (auth.failed()) {
handler.handle(Future.failedFuture(auth.cause()));
} else {
handler.handle(Future.succeededFuture());
}
});
}
private static void select(Redis client, RedisOptions options, Handler<AsyncResult<Void>> handler) {
if (options.getSelect() == null) {
handler.handle(Future.succeededFuture());
return;
}
client.send(Request.cmd(Command.SELECT).arg(options.getSelect()), auth -> {
if (auth.failed()) {
handler.handle(Future.failedFuture(auth.cause()));
} else {
handler.handle(Future.succeededFuture());
}
});
}
private final Context context;
private final ArrayQueue waiting;
private final NetClient netClient;
private final SocketAddress socketAddress;
private final RedisOptions options;
private Handler<Throwable> onException = t -> LOG.error("Unhandled Error", t);
private Handler<Void> onEnd;
private Handler<Response> onMessage;
private NetSocket netSocket;
private boolean connected = false;
private RedisClient(Vertx vertx, RedisOptions options, SocketAddress endpoint) {
this.context = vertx.getOrCreateContext();
this.netClient = vertx.createNetClient(options.getNetClientOptions());
this.waiting = new ArrayQueue(options.getMaxWaitingHandlers());
this.socketAddress = endpoint;
this.options = options;
}
@Override
public Redis connect(Handler<AsyncResult<Redis>> onConnect) {
if (connected) {
onConnect.handle(Future.succeededFuture(this));
return this;
}
netClient.connect(socketAddress, clientConnect -> {
if (clientConnect.failed()) {
netClient.close();
onConnect.handle(Future.failedFuture(clientConnect.cause()));
return;
}
netSocket = clientConnect.result();
this.connected = true;
netSocket
.handler(new RESPParser(this, options.getMaxNestedArrays()))
.closeHandler(close -> {
netClient.close();
cleanupQueue(CONNECTION_CLOSED);
connected = false;
if (onEnd != null) {
onEnd.handle(close);
}
})
.exceptionHandler(exception -> {
netSocket.close();
netClient.close();
cleanupQueue(exception);
connected = false;
if (onException != null) {
onException.handle(exception);
}
});
authenticate(this, options, authenticate -> {
if (authenticate.failed()) {
onConnect.handle(Future.failedFuture(authenticate.cause()));
return;
}
select(this, options, select -> {
if (select.failed()) {
onConnect.handle(Future.failedFuture(select.cause()));
return;
}
onConnect.handle(Future.succeededFuture(this));
});
});
});
return this;
}
@Override
public void close() {
if (netSocket != null) {
netSocket.close();
}
connected = false;
}
@Override
public Redis exceptionHandler(Handler<Throwable> handler) {
this.onException = handler;
return this;
}
@Override
public Redis endHandler(Handler<Void> handler) {
this.onEnd = handler;
return this;
}
@Override
public Redis handler(Handler<Response> handler) {
this.onMessage = handler;
return this;
}
@Override
public Redis pause() {
netSocket.pause();
return this;
}
@Override
public Redis resume() {
netSocket.resume();
return this;
}
@Override
public Redis fetch(long size) {
return this;
}
private void cleanupQueue(Throwable t) {
context.runOnContext(v -> {
Handler<AsyncResult<Response>> req;
while ((req = waiting.poll()) != null) {
if (t != null) {
try {
req.handle(Future.failedFuture(t));
} catch (RuntimeException e) {
LOG.warn("Exception during cleanup", e);
}
}
}
});
}
@Override
public Redis send(final Request request, Handler<AsyncResult<Response>> handler) {
if (!connected) {
handler.handle(Future.failedFuture("Redis connection is broken."));
return this;
}
if (waiting.isFull()) {
handler.handle(Future.failedFuture("Redis waiting Queue is full"));
return this;
}
final Buffer message = ((RequestImpl) request).encode();
context.runOnContext(v -> {
waiting.offer(handler);
netSocket.write(message, write -> {
if (write.failed()) {
fatal(write.cause());
}
});
});
return this;
}
@Override
public Redis batch(List<Request> commands, Handler<AsyncResult<List<Response>>> handler) {
if (waiting.freeSlots() < commands.size()) {
handler.handle(Future.failedFuture("Redis waiting Queue is full"));
return this;
}
final List<Handler<AsyncResult<Response>>> callbacks = new ArrayList<>(commands.size());
final List<Response> replies = new ArrayList<>(commands.size());
final AtomicInteger count = new AtomicInteger(commands.size());
final AtomicBoolean failed = new AtomicBoolean(false);
final Buffer messages = Buffer.buffer();
for (int i = 0; i < commands.size(); i++) {
final int index = i;
final RequestImpl req = (RequestImpl) commands.get(index);
req.encode(messages);
callbacks.add(index, command -> {
if (!failed.get()) {
if (command.failed()) {
failed.set(true);
if (handler != null) {
handler.handle(Future.failedFuture(command.cause()));
}
return;
}
replies.add(index, command.result());
if (count.decrementAndGet() == 0) {
if (handler != null) {
handler.handle(Future.succeededFuture(replies));
}
}
}
});
}
context.runOnContext(v -> {
for (Handler<AsyncResult<Response>> callback : callbacks) {
waiting.offer(callback);
}
netSocket.write(messages, write -> {
if (write.failed()) {
fatal(write.cause());
}
});
});
return this;
}
@Override
public void handle(Response reply) {
if (waiting.isEmpty()) {
if (onMessage != null) {
onMessage.handle(reply);
} else {
LOG.warn("No handler waiting for message: " + reply);
}
return;
}
context.runOnContext(v -> {
final Handler<AsyncResult<Response>> req = waiting.poll();
if (req != null) {
if (reply == null) {
try {
req.handle(Future.succeededFuture());
} catch (RuntimeException e) {
fail(e);
}
return;
}
if (reply.type() == ResponseType.ERROR) {
try {
req.handle(Future.failedFuture((ErrorType) reply));
} catch (RuntimeException e) {
fail(e);
}
return;
}
try {
req.handle(Future.succeededFuture(reply));
} catch (RuntimeException e) {
fail(e);
}
} else {
LOG.error("No handler waiting for message: " + reply);
}
});
}
@Override
public SocketAddress socketAddress() {
return socketAddress;
}
@Override
public void fail(Throwable t) {
if (onException != null) {
onException.handle(t);
} else {
LOG.error("External failure", t);
}
}
@Override
public void fatal(Throwable t) {
if (onException != null) {
onException.handle(t);
} else {
LOG.error("External failure", t);
}
close();
cleanupQueue(t);
}
}