package io.vertx.ext.mail.impl;
import io.vertx.core.*;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.ext.mail.MailConfig;
import java.util.List;
class SMTPConnection {
private static final Logger log = LoggerFactory.getLogger(SMTPConnection.class);
private NetSocket ns;
private boolean socketClosed;
private boolean socketShutDown;
private Handler<String> commandReplyHandler;
private Handler<Throwable> errorHandler;
private boolean broken;
private boolean idle;
private boolean doShutdown;
private final NetClient client;
private Capabilities capa = new Capabilities();
private final ConnectionLifeCycleListener listener;
private Context context;
private final MultilineParser nsHandler;
SMTPConnection(NetClient client, ConnectionLifeCycleListener listener) {
broken = true;
idle = false;
doShutdown = false;
socketClosed = false;
socketShutDown = false;
this.client = client;
this.listener = listener;
this.nsHandler = new MultilineParser(buffer -> {
if (commandReplyHandler == null) {
log.debug("dropping reply arriving after we stopped processing the buffer.");
} else {
Handler<String> currentHandler = commandReplyHandler;
commandReplyHandler = null;
currentHandler.handle(buffer.toString());
}
});
}
Capabilities getCapa() {
return capa;
}
void parseCapabilities(String message) {
capa = new Capabilities();
capa.parseCapabilities(message);
}
void shutdown() {
broken = true;
commandReplyHandler = null;
socketShutDown = true;
if (ns != null) {
ns.close();
ns = null;
}
}
void writeCommands(List<String> commands, Handler<String> resultHandler) {
String cmds = String.join("\r\n", commands);
this.nsHandler.setExpected(commands.size());
this.write(cmds, r -> {
try {
resultHandler.handle(r);
} finally {
this.nsHandler.setExpected(1);
}
});
}
void write(String str, Handler<String> commandResultHandler) {
write(str, -1, commandResultHandler);
}
void write(String str, int blank, Handler<String> commandResultHandler) {
this.commandReplyHandler = commandResultHandler;
if (socketClosed) {
log.debug("connection was closed by server");
handleError("connection was closed by server");
} else {
if (ns != null) {
if (log.isDebugEnabled()) {
String logStr;
if (blank >= 0) {
StringBuilder sb = new StringBuilder();
for (int i = blank; i < str.length(); i++) {
sb.append('*');
}
logStr = str.substring(0, blank) + sb;
} else {
logStr = str;
}
if (logStr.length() < 1000) {
log.debug("command: " + logStr);
} else {
log.debug("command: " + logStr.substring(0, 1000) + "...");
}
}
ns.write(str + "\r\n");
} else {
log.debug("not sending command " + str + " since the netsocket is null");
}
}
}
void writeLine(String str, boolean mayLog) {
if (mayLog) {
log.debug(str);
}
ns.write(str + "\r\n");
}
void writeLineWithDrainPromise(String str, boolean mayLog, Promise<Void> promise) {
if (mayLog) {
log.debug(str);
}
if (ns.writeQueueFull()) {
ns.drainHandler(v -> {
ns.drainHandler(null);
ns.write(str + "\r\n").onComplete(promise);
});
} else {
ns.write(str + "\r\n").onComplete(promise);
}
}
private void handleError(String message) {
handleError(new NoStackTraceThrowable(message));
}
private void handleError(Throwable throwable) {
errorHandler.handle(throwable);
}
public void openConnection(MailConfig config, Handler<String> initialReplyHandler, Handler<Throwable> errorHandler) {
this.errorHandler = errorHandler;
broken = false;
idle = false;
client.connect(config.getPort(), config.getHostname(), asyncResult -> {
if (asyncResult.succeeded()) {
context = Vertx.currentContext();
ns = asyncResult.result();
socketClosed = false;
ns.exceptionHandler(e -> {
log.debug("exceptionHandler called");
if (!socketClosed && !socketShutDown && !idle && !broken) {
setBroken();
log.debug("got an exception on the netsocket", e);
handleError(e);
} else {
log.debug("not returning follow-up exception", e);
}
});
ns.closeHandler(v -> {
log.debug("socket has been closed");
listener.connectionClosed(this);
socketClosed = true;
if (!socketShutDown && !idle && !broken) {
setBroken();
log.debug("throwing: connection has been closed by the server");
handleError("connection has been closed by the server");
} else {
if (socketShutDown || broken) {
log.debug("close has been expected");
} else {
log.debug("closed while connection has been idle (timeout on server?)");
}
if (!broken) {
setBroken();
}
if (!socketShutDown) {
shutdown();
listener.dataEnded(this);
}
}
});
commandReplyHandler = initialReplyHandler;
ns.handler(this.nsHandler);
} else {
log.error("exception on connect", asyncResult.cause());
listener.connectionClosed(null);
handleError(asyncResult.cause());
}
});
}
boolean isSsl() {
return ns.isSsl();
}
void upgradeToSsl(Handler<AsyncResult<Void>> handler) {
ns.upgradeToSsl(handler);
}
public boolean isBroken() {
return broken;
}
public boolean isIdle() {
return idle;
}
public void returnToPool() {
if (isIdle()) {
log.info("state error: idle connection returned to pool");
handleError("state error: idle connection returned to pool");
} else {
if (doShutdown) {
log.debug("shutting connection down");
quitCloseConnection();
} else {
log.debug("returning connection to pool");
commandReplyHandler = null;
listener.dataEnded(this);
log.debug("setting error handler to null");
errorHandler = null;
}
}
}
private void quitCloseConnection() {
if (!socketShutDown) {
context.runOnContext(v1 -> {
log.debug("shutting down connection");
if (socketClosed) {
log.debug("connection is already closed, only doing shutdown()");
shutdown();
} else {
useConnection();
new SMTPQuit(this, v -> {
shutdown();
log.debug("connection is shut down");
}).start();
}
});
}
}
void useConnection() {
idle = false;
}
void setIdle() {
idle = true;
}
private Handler<Throwable> prevErrorHandler = null;
public void setErrorHandler(Handler<Throwable> newHandler) {
if (prevErrorHandler == null) {
prevErrorHandler = errorHandler;
}
errorHandler = newHandler;
}
public void resetErrorHandler() {
errorHandler = prevErrorHandler;
}
public void setBroken() {
if (!broken) {
log.debug("setting connection to broken");
broken = true;
commandReplyHandler = null;
log.debug("closing connection");
shutdown();
listener.dataEnded(this);
} else {
log.debug("connection is already set to broken");
}
}
public void setDoShutdown() {
log.debug("will shut down connection after send operation finishes");
doShutdown = true;
}
public void close() {
quitCloseConnection();
}
boolean isClosed() {
return socketClosed;
}
Context getContext() {
return context;
}
NetSocket getSocket() {
return ns;
}
}