/*
 *  Copyright (c) 2011-2015 The original author or authors
 *
 *  All rights reserved. This program and the accompanying materials
 *  are made available under the terms of the Eclipse Public License v1.0
 *  and Apache License v2.0 which accompanies this distribution.
 *
 *       The Eclipse Public License is available at
 *       http://www.eclipse.org/legal/epl-v10.html
 *
 *       The Apache License v2.0 is available at
 *       http://www.opensource.org/licenses/apache2.0.php
 *
 *  You may elect to redistribute this code under either of these licenses.
 */

package io.vertx.ext.mail.impl;

import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;
import io.vertx.ext.mail.MailConfig;
import io.vertx.ext.mail.MailMessage;
import io.vertx.ext.mail.MailResult;
import io.vertx.ext.mail.mailencoder.EmailAddress;
import io.vertx.ext.mail.mailencoder.EncodedPart;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class SMTPSendMail {

  private static final Logger log = LoggerFactory.getLogger(SMTPSendMail.class);
  private static final Pattern linePattern = Pattern.compile("\r\n");

  private final SMTPConnection connection;
  private final MailMessage email;
  private final MailConfig config;
  private final MailResult mailResult;
  private final EncodedPart encodedPart;
  private final AtomicLong written = new AtomicLong();

  SMTPSendMail(SMTPConnection connection, MailMessage email, MailConfig config,
               EncodedPart encodedPart, String messageId) {
    this.connection = connection;
    this.email = email;
    this.config = config;
    this.mailResult = new MailResult();
    this.encodedPart = encodedPart;
    this.mailResult.setMessageID(messageId);
  }

  
Starts a mail transaction.
/** * Starts a mail transaction. */
void startMailTransaction(final Handler<AsyncResult<MailResult>> resultHandler) { sendMailEvenlope() .flatMap(this::sendMailData) .onComplete(resultHandler); }
Check if message size is allowed if size is supported.

returns true if the message is allowed.

/** * Check if message size is allowed if size is supported. * <p> * returns true if the message is allowed. */
private boolean checkSize() { final int size = connection.getCapa().getSize(); return size == 0 || size >= encodedPart.size(); } private String mailFromAddress() { final String fromAddr; String bounceAddr = email.getBounceAddress(); if (bounceAddr != null && !bounceAddr.isEmpty()) { fromAddr = bounceAddr; } else { fromAddr = email.getFrom(); } EmailAddress from = new EmailAddress(fromAddr); return from.getEmail(); } private String sizeParameter() { final String sizeParameter; if (connection.getCapa().getSize() > 0) { sizeParameter = " SIZE=" + encodedPart.size(); } else { sizeParameter = ""; } return sizeParameter; } private List<String> allRecipients() { List<String> recipientAddrs = new ArrayList<>(); if (email.getTo() != null) { recipientAddrs.addAll(email.getTo()); } if (email.getCc() != null) { recipientAddrs.addAll(email.getCc()); } if (email.getBcc() != null) { recipientAddrs.addAll(email.getBcc()); } return recipientAddrs.stream().map(r -> { final String email; if (EmailAddress.POSTMASTER.equalsIgnoreCase(r)) { email = r; } else { email = new EmailAddress(r).getEmail(); } return email; }).collect(Collectors.toList()); } private Future<Boolean> sendMailEvenlope() { Promise<Boolean> evenlopePromise = Promise.promise(); try { if (checkSize()) { final String mailFromLine = "MAIL FROM:<" + mailFromAddress() + ">" + sizeParameter(); final List<String> allRecipients = allRecipients(); if (config.isPipelining() && connection.getCapa().isCapaPipelining()) { final List<String> groupCommands = new ArrayList<>(); groupCommands.add(mailFromLine); groupCommands.addAll(allRecipients.stream().map(r -> "RCPT TO:<" + r + ">").collect(Collectors.toList())); groupCommands.add("DATA"); connection.writeCommands(groupCommands, evenlopeResultStr -> { String[] evenlopeResult = linePattern.split(evenlopeResultStr); if (groupCommands.size() != evenlopeResult.length) { evenlopePromise.fail("Sent " + groupCommands.size() + " commands, but got " + evenlopeResult.length + " responses."); } else { // result follows the same order in the commands list for (int i = 0; i < evenlopeResult.length; i ++) { String message = evenlopeResult[i]; if (i == 0) { if (!StatusCode.isStatusOk(message)) { evenlopePromise.fail("sender address not accepted: " + message); return; } } else if (i < evenlopeResult.length - 1) { if (StatusCode.isStatusOk(message)) { mailResult.getRecipients().add(allRecipients.get(i - 1)); } else { if (!config.isAllowRcptErrors()) { evenlopePromise.fail("recipient address not accepted: " + message); return; } } } else { // DATA result if (StatusCode.isStatusOk(message)) { if (mailResult.getRecipients().size() == 0) { // send dot only evenlopePromise.complete(false); return; } } else { evenlopePromise.fail("DATA command not accepted: " + message); return; } } } evenlopePromise.complete(true); } }); } else { // sent line by line because PIPELINING is not supported Future<Void> future = sendMailFrom(mailFromLine); for (String email: allRecipients) { future = future.flatMap(v -> sendRcptTo(email)); } return future.flatMap(v -> sendDataCmd()); } } else { evenlopePromise.fail("message exceeds allowed size limit"); } } catch (Exception e) { evenlopePromise.fail(e); } return evenlopePromise.future(); } private Future<Void> sendMailFrom(String mailFromLine) { Promise<Void> promise = Promise.promise(); connection.write(mailFromLine, message -> { if (log.isDebugEnabled()) { written.getAndAdd(mailFromLine.length()); } if (StatusCode.isStatusOk(message)) { promise.complete(); } else { promise.fail("sender address not accepted: " + message); } }); return promise.future(); } private Future<Void> sendRcptTo(String email) { Promise<Void> promise = Promise.promise(); try { final String line = "RCPT TO:<" + email + ">"; connection.write(line, message -> { if (log.isDebugEnabled()) { written.getAndAdd(line.length()); } try { if (StatusCode.isStatusOk(message)) { mailResult.getRecipients().add(email); promise.complete(); } else { if (config.isAllowRcptErrors()) { promise.complete(); } else { promise.fail("recipient address not accepted: " + message); } } } catch (Exception e) { promise.fail(e); } }); } catch (Exception e) { promise.fail(e); } return promise.future(); } private Future<Boolean> sendDataCmd() { Promise<Boolean> promise = Promise.promise(); try { if (mailResult.getRecipients().size() > 0) { connection.write("DATA", message -> { if (log.isDebugEnabled()) { written.getAndAdd(4); } if (StatusCode.isStatusOk(message)) { promise.complete(true); } else { promise.fail("DATA command not accepted: " + message); } }); } else { promise.fail("no recipient addresses were accepted, not sending mail"); } } catch (Exception e) { promise.fail(e); } return promise.future(); } private Future<MailResult> sendMailData(boolean includeData) { if (!includeData) { return sendEndDot(); } return sendMailHeaders(this.encodedPart.headers()) .flatMap(v -> sendMailBody()) .flatMap(v -> sendEndDot()); } private Future<Void> sendMailHeaders(MultiMap headers) { Promise<Void> promise = Promise.promise(); try { StringBuilder sb = new StringBuilder(); headers.forEach(header -> sb.append(header.getKey()).append(": ").append(header.getValue()).append("\r\n")); final String headerLines = sb.toString(); connection.writeLineWithDrainPromise(headerLines, written.getAndAdd(headerLines.length()) < 1000, promise); } catch (Exception e) { promise.fail(e); } return promise.future(); } private Future<MailResult> sendEndDot() { Promise<MailResult> promise = Promise.promise(); try { connection.getContext().runOnContext(v -> connection.write(".", msg -> { if (StatusCode.isStatusOk(msg)) { promise.complete(mailResult); } else { promise.fail("sending data failed: " + msg); } })); } catch (Exception e) { promise.fail(e); } return promise.future(); } private Future<Void> sendMailBody() { Promise<Void> promise = Promise.promise(); final EncodedPart part = this.encodedPart; try { if (isMultiPart(part)) { sendMultiPart(part, 0, promise); } else { sendRegularPartBody(part, promise); } } catch (Exception e) { promise.fail(e); } return promise.future(); } private void sendMultiPart(EncodedPart multiPart, final int i, Promise<Void> promise) { try { final String boundaryStart = "--" + multiPart.boundary(); final EncodedPart thePart = multiPart.parts().get(i); Promise<Void> boundaryStartPromise = Promise.promise(); boundaryStartPromise.future() .compose(v -> sendMailHeaders(thePart.headers())).onComplete(v -> { if (v.succeeded()) { Promise<Void> nextPromise = Promise.promise(); nextPromise.future().onComplete(vv -> { if (vv.succeeded()) { if (i == multiPart.parts().size() - 1) { String boundaryEnd = boundaryStart + "--"; connection.writeLineWithDrainPromise(boundaryEnd, written.getAndAdd(boundaryEnd.length()) < 1000, promise); } else { sendMultiPart(multiPart, i + 1, promise); } } else { promise.fail(vv.cause()); } }); if (isMultiPart(thePart)) { sendMultiPart(thePart, 0, nextPromise); } else { sendRegularPartBody(thePart, nextPromise); } } else { promise.fail(v.cause()); } }); connection.writeLineWithDrainPromise(boundaryStart, written.getAndAdd(boundaryStart.length()) < 1000, boundaryStartPromise); } catch (Exception e) { promise.fail(e); } } private boolean isMultiPart(EncodedPart part) { return part.parts() != null && part.parts().size() > 0; } private void sendBodyLineByLine(String[] lines, int i, Promise<Void> promise) { if (i < lines.length) { String line = lines[i]; if (line.startsWith(".")) { line = "." + line; } Promise<Void> writeLinePromise = Promise.promise(); connection.writeLineWithDrainPromise(line, written.getAndAdd(line.length()) < 1000, writeLinePromise); writeLinePromise.future().onComplete(v -> { if (v.succeeded()) { sendBodyLineByLine(lines, i + 1, promise); } else { promise.fail(v.cause()); } }); } else { promise.complete(); } } private void sendRegularPartBody(EncodedPart part, Promise<Void> promise) { if (part.body() != null) { // send body string line by line sendBodyLineByLine(part.body().split("\n"), 0, promise); } else { ReadStream<Buffer> attachBodyStream = part.bodyStream(connection.getContext()); if (attachBodyStream != null) { attachBodyStream.pipe().endOnComplete(false).to(connection.getSocket(), promise); } else { promise.fail(new IllegalStateException("No mail body and stream found")); } } } }