package io.vertx.redis.client.impl;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.clientconnection.ConnectionListener;
import io.vertx.redis.client.*;
import io.vertx.redis.client.impl.types.ErrorType;
import io.vertx.redis.client.impl.types.Multi;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class RedisStandaloneConnection implements RedisConnection, ParserHandler {
private static final String BASE_ADDRESS = "io.vertx.redis";
private static final Logger LOG = LoggerFactory.getLogger(RedisStandaloneConnection.class);
private static final ErrorType CONNECTION_CLOSED = ErrorType.create("CONNECTION_CLOSED");
private final ConnectionListener<RedisConnection> listener;
private final VertxInternal vertx;
private final ContextInternal context;
private final EventBus eventBus;
private final NetSocket netSocket;
private final ArrayQueue waiting;
private final int recycleTimeout;
private Handler<Throwable> onException;
private Handler<Void> onEnd;
private Handler<Response> onMessage;
private long expirationTimestamp;
public RedisStandaloneConnection(Vertx vertx, ContextInternal context, ConnectionListener<RedisConnection> connectionListener, NetSocket netSocket, RedisOptions options) {
this.listener = connectionListener;
this.vertx = (VertxInternal) vertx;
this.eventBus = vertx.eventBus();
this.context = context;
this.netSocket = netSocket;
this.waiting = new ArrayQueue(options.getMaxWaitingHandlers());
this.recycleTimeout = options.getPoolRecycleTimeout();
}
void forceClose() {
listener.onEvict();
netSocket.close();
}
public boolean isValid() {
return expirationTimestamp > 0 && System.currentTimeMillis() <= expirationTimestamp;
}
@Override
public void close() {
expirationTimestamp = recycleTimeout > 0 ? System.currentTimeMillis() + recycleTimeout : 0L;
listener.onRecycle();
}
@Override
public boolean pendingQueueFull() {
return waiting.isFull();
}
@Override
public RedisConnection exceptionHandler(Handler<Throwable> handler) {
this.onException = handler;
return this;
}
@Override
public RedisConnection endHandler(Handler<Void> handler) {
this.onEnd = handler;
return this;
}
@Override
public RedisConnection handler(Handler<Response> handler) {
this.onMessage = handler;
return this;
}
@Override
public RedisConnection pause() {
netSocket.pause();
return this;
}
@Override
public RedisConnection resume() {
netSocket.resume();
return this;
}
@Override
public RedisConnection fetch(long size) {
return this;
}
@Override
public Future<Response> send(final Request request) {
final Promise<Response> promise = vertx.promise();
final boolean voidCmd = request.command().isVoid();
if (!voidCmd && waiting.isFull()) {
promise.fail("Redis waiting Queue is full");
return promise.future();
}
final Buffer message = ((RequestImpl) request).encode();
context.execute(v -> {
if (!voidCmd) {
if (waiting.isFull()) {
promise.fail("Redis waiting Queue is full");
return;
}
waiting.offer(promise);
}
netSocket.write(message, write -> {
if (write.failed()) {
fatal(write.cause());
} else {
if (voidCmd) {
promise.complete();
}
}
});
});
return promise.future();
}
@Override
public Future<List<Response>> batch(List<Request> commands) {
final Promise<List<Response>> promise = vertx.promise();
if (waiting.freeSlots() < commands.size()) {
promise.fail("Redis waiting Queue is full");
return promise.future();
}
final List<Promise<Response>> callbacks = new ArrayList<>(commands.size());
final List<Response> replies = new ArrayList<>(commands.size());
final AtomicInteger count = new AtomicInteger(commands.size());
final AtomicBoolean failed = new AtomicBoolean(false);
final Buffer messages = Buffer.buffer();
for (int i = 0; i < commands.size(); i++) {
final int index = i;
final RequestImpl req = (RequestImpl) commands.get(index);
req.encode(messages);
callbacks.add(index, vertx.promise(command -> {
if (!failed.get()) {
if (command.failed()) {
failed.set(true);
promise.fail(command.cause());
return;
}
}
replies.add(index, command.result());
if (count.decrementAndGet() == 0) {
promise.complete(replies);
}
}));
}
context.execute(v -> {
if (waiting.freeSlots() < callbacks.size()) {
promise.fail("Redis waiting Queue is full");
return;
}
for (Promise<Response> callback : callbacks) {
waiting.offer(callback);
}
netSocket.write(messages, write -> {
if (write.failed()) {
fatal(write.cause());
}
});
});
return promise.future();
}
@Override
public void handle(Response reply) {
if ((reply != null && reply.type() == ResponseType.PUSH) || waiting.isEmpty()) {
if (onMessage != null) {
context.execute(reply, onMessage);
} else {
if (reply instanceof Multi) {
if (reply.size() == 3 && "message".equals(reply.get(0).toString())) {
eventBus.send(
BASE_ADDRESS + "." + reply.get(1).toString(),
new JsonObject()
.put("status", "OK")
.put("value", new JsonObject()
.put("channel", reply.get(1).toString())
.put("message", reply.get(2).toString())));
return;
}
if (reply.size() == 4 && "pmessage".equals(reply.get(0).toString())) {
eventBus.send(
BASE_ADDRESS + "." + reply.get(1).toString(),
new JsonObject()
.put("status", "OK")
.put("value", new JsonObject()
.put("pattern", reply.get(1).toString())
.put("channel", reply.get(2).toString())
.put("message", reply.get(3).toString())));
return;
}
}
LOG.warn("No handler waiting for message: " + reply);
}
return;
}
context.execute(v -> {
final Promise<Response> req = waiting.poll();
if (req != null) {
if (reply == null) {
try {
req.complete();
} catch (RuntimeException e) {
fail(e);
}
return;
}
if (reply.type() == ResponseType.ERROR) {
try {
req.fail((ErrorType) reply);
} catch (RuntimeException e) {
fail(e);
}
return;
}
try {
req.complete(reply);
} catch (RuntimeException e) {
fail(e);
}
} else {
LOG.error("No handler waiting for message: " + reply);
}
});
}
public void end(Void v) {
cleanupQueue(CONNECTION_CLOSED);
evict();
if (onEnd != null) {
context.execute(v, onEnd);
}
}
@Override
public void fail(Throwable t) {
evict();
if (onException != null) {
context.execute(t, onException);
}
}
@Override
public void fatal(Throwable t) {
cleanupQueue(t);
evict();
if (onException != null) {
context.execute(t, onException);
}
}
private void evict() {
try {
listener.onEvict();
} catch (RejectedExecutionException e) {
if (onException != null) {
context.execute(e, onException);
}
}
}
private void cleanupQueue(Throwable t) {
context.execute(v -> {
Promise<Response> req;
while ((req = waiting.poll()) != null) {
if (t != null) {
try {
req.fail(t);
} catch (RuntimeException e) {
LOG.warn("Exception during cleanup", e);
}
}
}
});
}
}