package io.vertx.ext.mail.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.ext.auth.PRNG;
import io.vertx.ext.mail.MailConfig;
import io.vertx.ext.mail.StartTLSOptions;
import io.vertx.ext.mail.impl.sasl.AuthOperationFactory;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
/**
 * Pool of {@link SMTPConnection}s that share a single {@link NetClient}.
 *
 * <p>The pool hands out idle connections when available, opens new ones while fewer than
 * {@code maxSockets} connections are counted, and otherwise queues the request until a
 * connection is released or closed. It is notified about connection state changes through
 * the {@link ConnectionLifeCycleListener} callbacks.
 *
 * <p>All mutable pool state ({@code connCount}, {@code allConnections}, {@code waiters},
 * {@code closed}, {@code hostname}, {@code closeFinishedHandler}) is guarded by the monitor
 * of this instance.
 */
class SMTPConnectionPool implements ConnectionLifeCycleListener {

  private static final Logger log = LoggerFactory.getLogger(SMTPConnectionPool.class);

  /** Upper bound for {@code connCount} before new requests are queued. */
  private final int maxSockets;
  /** When false, connections are closed after each use instead of being kept idle. */
  private final boolean keepAlive;
  /** Requests waiting for a connection, served in FIFO order. */
  private final Queue<Waiter> waiters = new ArrayDeque<>();
  /** Successfully opened connections currently known to the pool. */
  private final Set<SMTPConnection> allConnections = new HashSet<>();
  private final NetClient netClient;
  private final MailConfig config;
  private final PRNG prng;
  private final AuthOperationFactory authOperationFactory;
  /** Hostname passed to the most recent {@link #getConnection} call; used for new connections. */
  private String hostname;
  private boolean closed = false;
  /** Number of connections that are open or currently being opened. */
  private int connCount;
  /** Optional callback invoked once the pool has been closed and no connection is left. */
  private Handler<Void> closeFinishedHandler;

  /**
   * Creates the pool and its {@link NetClient}.
   *
   * <p>If no hostname verification algorithm is configured, trust-all is off, and either SSL
   * or STARTTLS (anything but {@code DISABLED}) is in use, the algorithm is set to
   * {@code "HTTPS"} on the passed config before the client is created. Note that this
   * modifies the caller's {@code config} object.
   *
   * @param vertx  the Vert.x instance used to create the PRNG and the NetClient
   * @param config the mail configuration (pool size, keep-alive, TLS settings)
   */
  SMTPConnectionPool(Vertx vertx, MailConfig config) {
    this.config = config;
    maxSockets = config.getMaxPoolSize();
    keepAlive = config.isKeepAlive();
    this.prng = new PRNG(vertx);
    this.authOperationFactory = new AuthOperationFactory(prng);
    String verification = config.getHostnameVerificationAlgorithm();
    if ((verification == null || verification.isEmpty()) && !config.isTrustAll() &&
      (config.isSsl() || config.getStarttls() != StartTLSOptions.DISABLED)) {
      config.setHostnameVerificationAlgorithm("HTTPS");
    }
    netClient = vertx.createNetClient(config);
  }

  /**
   * @return the factory for auth operations backed by this pool's PRNG
   */
  AuthOperationFactory getAuthOperationFactory() {
    return authOperationFactory;
  }

  /**
   * Requests a connection from the pool.
   *
   * <p>Fails the handler immediately with "connection pool is closed" if the pool has been
   * closed; otherwise the handler is completed with an idle connection, a new connection, or
   * is queued until one becomes available.
   *
   * @param hostname      hostname handed to the SMTP starter when a new connection is created
   * @param resultHandler receives the connection or the failure
   */
  void getConnection(String hostname, Handler<AsyncResult<SMTPConnection>> resultHandler) {
    log.debug("getConnection()");
    final boolean poolClosed;
    // hostname and closed are shared with callbacks running on other threads, so they are
    // accessed under the same monitor as the rest of the pool state.
    synchronized (this) {
      this.hostname = hostname;
      poolClosed = closed;
    }
    if (poolClosed) {
      resultHandler.handle(Future.failedFuture("connection pool is closed"));
    } else {
      getConnection0(resultHandler);
    }
  }

  /**
   * Closes the pool without a completion callback.
   *
   * @throws IllegalStateException if the pool is already closed
   */
  void close() {
    close(null);
  }

  /**
   * Closes the pool and the PRNG.
   *
   * <p>Idle and broken connections are closed right away; connections that are in use are
   * marked for shutdown. The handler is invoked once no connection is counted any more.
   *
   * @param finishedHandler called when closing has finished; may be {@code null}
   * @throws IllegalStateException if the pool is already closed
   */
  synchronized void close(Handler<Void> finishedHandler) {
    if (closed) {
      throw new IllegalStateException("pool is already closed");
    } else {
      closed = true;
      closeFinishedHandler = finishedHandler;
      closeAllConnections();
      this.prng.close();
    }
  }

  /**
   * @return the number of connections currently counted by the pool
   */
  synchronized int connCount() {
    return connCount;
  }

  /**
   * Listener callback: a send operation on {@code conn} has finished, so the connection is
   * either closed, handed to a waiter, or parked as idle.
   *
   * @param conn the connection whose operation ended
   */
  public synchronized void dataEnded(SMTPConnection conn) {
    checkReuseConnection(conn);
  }

  /**
   * Listener callback: a connection has been closed.
   *
   * <p>Decrements the connection count, forgets the connection, opens a new connection for the
   * next waiter (if any), and finishes a pending pool shutdown once the count reaches zero.
   *
   * @param conn the closed connection; may be {@code null}, in which case only the count
   *             is adjusted
   */
  public synchronized void connectionClosed(SMTPConnection conn) {
    log.debug("connection closed, removing from pool");
    connCount--;
    if (conn != null) {
      allConnections.remove(conn);
    }
    Waiter waiter = waiters.poll();
    if (waiter != null) {
      log.debug("creating new connection for waiter");
      createNewConnection(waiter.handler);
    }
    if (closed && connCount == 0) {
      log.debug("all connections closed, closing NetClient");
      netClient.close();
      if (closeFinishedHandler != null) {
        closeFinishedHandler.handle(null);
      }
    }
  }

  /**
   * @return the NetClient shared by all connections of this pool
   */
  NetClient getNetClient() {
    return this.netClient;
  }

  /**
   * Picks an idle, non-broken connection if there is one, otherwise creates a new connection
   * or queues the request when {@code maxSockets} is reached.
   *
   * <p>An idle connection is checked with an {@link SMTPReset} on the connection's context
   * before it is handed out; if that check fails the connection is marked broken and a new
   * connection is created instead.
   */
  private synchronized void getConnection0(Handler<AsyncResult<SMTPConnection>> handler) {
    SMTPConnection idleConn = null;
    for (SMTPConnection conn : allConnections) {
      if (!conn.isBroken() && conn.isIdle()) {
        idleConn = conn;
        break;
      }
    }
    if (idleConn == null && connCount >= maxSockets) {
      log.debug("waiting for a free socket");
      waiters.add(new Waiter(handler));
    } else {
      if (idleConn == null) {
        log.debug("create a new connection");
        createNewConnection(handler);
      } else {
        if (idleConn.isClosed()) {
          log.warn("idle connection is closed already, this may cause a problem");
        }
        log.debug("found idle connection, checking");
        final SMTPConnection conn = idleConn;
        // Mark the connection as used while still holding the lock so no other caller picks it.
        conn.useConnection();
        conn.getContext().runOnContext(v -> new SMTPReset(conn, result -> {
          if (result.succeeded()) {
            handler.handle(Future.succeededFuture(conn));
          } else {
            conn.setBroken();
            log.debug("using idle connection failed, create a new connection");
            // Runs on the connection's context, not under the caller's lock;
            // createNewConnection acquires the pool monitor itself.
            // NOTE(review): the broken connection is not closed here and stays counted until
            // connectionClosed() is reported for it - confirm that SMTPConnection does so.
            createNewConnection(handler);
          }
        }).start());
      }
    }
  }

  /**
   * Decides what happens to a connection after its operation ended: broken connections are
   * closed, as are all connections when keep-alive is off or the pool is closed; otherwise
   * the connection is passed to the next waiter or set idle.
   */
  private synchronized void checkReuseConnection(SMTPConnection conn) {
    if (conn.isBroken()) {
      log.debug("connection is broken, closing");
      conn.close();
    } else {
      if (!keepAlive || closed) {
        log.debug("connection pool is disabled or pool is already closed, immediately doing QUIT");
        conn.close();
      } else {
        log.debug("checking for waiting operations");
        Waiter waiter = waiters.poll();
        if (waiter != null) {
          log.debug("running one waiting operation");
          conn.useConnection();
          waiter.handler.handle(Future.succeededFuture(conn));
        } else {
          log.debug("keeping connection idle");
          conn.setIdle();
        }
      }
    }
  }

  /**
   * Shuts down all known connections, or finishes closing immediately when none is counted.
   *
   * <p>Works on a snapshot of {@code allConnections} (which is cleared) so that closing a
   * connection cannot interfere with the iteration. Called from {@link #close(Handler)}.
   */
  private void closeAllConnections() {
    Set<SMTPConnection> copy;
    if (connCount > 0) {
      synchronized (this) {
        copy = new HashSet<>(allConnections);
        allConnections.clear();
      }
      for (SMTPConnection conn : copy) {
        if (conn.isIdle() || conn.isBroken()) {
          conn.close();
        } else {
          log.debug("closing connection after current send operation finishes");
          conn.setDoShutdown();
        }
      }
    } else {
      this.netClient.close();
      if (closeFinishedHandler != null) {
        closeFinishedHandler.handle(null);
      }
    }
  }

  /**
   * Counts and opens a new connection, registering it in the pool on success before the
   * result is passed on to {@code handler}.
   *
   * <p>Synchronized because it is also reached from an asynchronous callback (failed check of
   * an idle connection) that does not hold the pool monitor; the monitor is reentrant, so
   * callers that already hold it are unaffected.
   */
  private synchronized void createNewConnection(Handler<AsyncResult<SMTPConnection>> handler) {
    connCount++;
    log.debug("Connection count is " + connCount);
    createConnection(result -> {
      if (result.succeeded()) {
        // The completion callback runs later, outside the lock held above; HashSet is not
        // thread-safe, so guard the mutation with the pool monitor.
        synchronized (this) {
          allConnections.add(result.result());
        }
      }
      // Invoke the caller's handler outside the lock acquired in this callback.
      handler.handle(result);
    });
  }

  /**
   * Creates an {@link SMTPConnection} and starts it via {@link SMTPStarter}, completing
   * {@code handler} with the connection on success or with the failure cause otherwise.
   */
  private void createConnection(Handler<AsyncResult<SMTPConnection>> handler) {
    SMTPConnection conn = new SMTPConnection(netClient, this);
    new SMTPStarter(conn, config, hostname, this.authOperationFactory, result -> {
      if (result.succeeded()) {
        handler.handle(Future.succeededFuture(conn));
      } else {
        handler.handle(Future.failedFuture(result.cause()));
      }
    }).start();
  }

  /** A queued connection request; holds the handler to complete once a connection is available. */
  private static class Waiter {
    private final Handler<AsyncResult<SMTPConnection>> handler;

    private Waiter(Handler<AsyncResult<SMTPConnection>> handler) {
      this.handler = handler;
    }
  }
}