package io.vertx.redis.client.impl;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxInternal;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import java.util.Collections;
import java.util.List;
/**
 * Common base for {@link Redis} client implementations.
 *
 * <p>Owns a {@link RedisConnectionManager} and implements the connection-less
 * operations ({@link #send(Request)} and {@link #batch(List)}) on top of
 * {@code connect()}: each call obtains a connection, runs the request(s) on it,
 * and closes that connection once the outcome is known. {@code connect()} itself
 * is left to subclasses.
 */
public abstract class BaseRedisClient implements Redis {

  /** The Vert.x instance this client was created with, viewed through its internal API. */
  protected final VertxInternal vertx;

  /** Connection manager created and started by the constructor; closed by {@link #close()}. */
  protected final RedisConnectionManager connectionManager;

  /**
   * Creates the client and starts its connection manager.
   *
   * @param vertx   the Vert.x instance; it is cast to {@link VertxInternal}, so an
   *                instance of any other type fails with a {@link ClassCastException}
   * @param options the options handed to the {@link RedisConnectionManager}
   */
  public BaseRedisClient(Vertx vertx, RedisOptions options) {
    this.vertx = (VertxInternal) vertx;
    this.connectionManager = new RedisConnectionManager(this.vertx, options);
    this.connectionManager.start();
  }

  /**
   * Closes the underlying connection manager.
   */
  @Override
  public void close() {
    this.connectionManager.close();
  }

  /**
   * Sends a single command on a connection obtained just for this call.
   *
   * @param command the request to send
   * @return a future completed with the outcome of the command; it fails if the
   *         command is a pub/sub command, if no connection could be obtained, or
   *         if sending the command fails
   */
  @Override
  public Future<@Nullable Response> send(Request command) {
    final Promise<Response> promise = vertx.promise();

    // Rejected before any connection is requested.
    if (command.command().isPubSub()) {
      promise.fail("PubSub command in connection-less mode not allowed");
      return promise.future();
    }

    connect()
      .onFailure(promise::fail)
      .onSuccess(conn -> conn.send(command)
        .onComplete(send -> {
          try {
            promise.handle(send);
          } finally {
            // Close the connection whatever the outcome was.
            conn.close();
          }
        }));

    return promise.future();
  }

  /**
   * Sends a list of commands as one batch on a connection obtained just for this call.
   *
   * <p>An empty list completes immediately with an empty result and does not
   * request a connection.
   *
   * @param commands the requests to send
   * @return a future completed with the outcome of the batch; it fails if any
   *         command is a pub/sub command, if no connection could be obtained, or
   *         if the batch fails
   */
  @Override
  public Future<List<@Nullable Response>> batch(List<Request> commands) {
    final Promise<List<Response>> promise = vertx.promise();

    // Nothing to send: answer right away instead of acquiring a connection for no work.
    if (commands.isEmpty()) {
      promise.complete(Collections.emptyList());
      return promise.future();
    }

    // Validate the whole list up front so that no connection is requested for a
    // batch that is going to be rejected.
    for (Request req : commands) {
      if (req.command().isPubSub()) {
        promise.fail("PubSub command in connection-less batch not allowed");
        return promise.future();
      }
    }

    connect()
      .onFailure(promise::fail)
      .onSuccess(conn -> conn.batch(commands)
        .onComplete(batch -> {
          try {
            promise.handle(batch);
          } finally {
            // Close the connection whatever the outcome was.
            conn.close();
          }
        }));

    return promise.future();
  }
}