/*
 * Copyright 2016 Red Hat Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.vertx.mqtt.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.CloseFuture;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.MqttException;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import io.vertx.mqtt.messages.MqttMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubAckMessage;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.netty.handler.codec.mqtt.MqttQoS.*;

MQTT client implementation
/** * MQTT client implementation */
public class MqttClientImpl implements MqttClient { private enum Status { CLOSED, CONNECTING, CONNECTED, CLOSING } // patterns for topics validation private static final Pattern validTopicNamePattern = Pattern.compile("^[^#+\\u0000]+$"); private static final Pattern validTopicFilterPattern = Pattern.compile("^(#|((\\+(?![^/]))?([^#+]*(/\\+(?![^/]))?)*(/#)?))$"); private static final Logger log = LoggerFactory.getLogger(MqttClientImpl.class); private static final int MAX_MESSAGE_ID = 65535; private static final int MAX_TOPIC_LEN = 65535; private static final int MIN_TOPIC_LEN = 1; private static final String PROTOCOL_NAME = "MQTT"; private static final int PROTOCOL_VERSION = 4; private static final int DEFAULT_IDLE_TIMEOUT = 0; private final VertxInternal vertx; private final MqttClientOptions options; private NetSocketInternal connection; private ContextInternal ctx; // handler to call when a publish is complete private Handler<Integer> publishCompletionHandler; // handler to call when a publish has expired private Handler<Integer> publishCompletionExpirationHandler; // handler to call when a PUBACK is received for an unknown packetId private Handler<Integer> publishCompletionPhantomHandler; // handler to call when a unsubscribe request is completed private Handler<Integer> unsubscribeCompletionHandler; // handler to call when a publish message comes in private Handler<MqttPublishMessage> publishHandler; // handler to call when a subscribe request is completed private Handler<MqttSubAckMessage> subscribeCompletionHandler; // handler to call when a connection request is completed private Promise<MqttConnAckMessage> connectPromise; // handler to call when a connection disconnects private Promise<Void> disconnectPromise; // handler to call when a pingresp is received private Handler<Void> pingrespHandler; // handler to call when a problem at protocol level happens private Handler<Throwable> exceptionHandler; //handler to call when the remote MQTT server closes the connection private Handler<Void> closeHandler; // storage of PUBLISH QoS=1 messages which was not responded with PUBACK private HashMap<Integer, ExpiringPacket> qos1outbound = new HashMap<>(); // storage of PUBLISH QoS=2 messages which was not responded with PUBREC // and PUBREL messages which was not responded with PUBCOMP private HashMap<Integer, ExpiringPacket> qos2outbound = new HashMap<>(); // storage of PUBLISH messages which was responded with PUBREC private HashMap<Integer, MqttMessage> qos2inbound = new HashMap<>(); // counter for the message identifier private int messageIdCounter; // Keep alive management private final long keepAliveTimeout; private Deque<Ping> pings = new ArrayDeque<>(); // total number of unacknowledged packets private int countInflightQueue; private NetClient client; private Status status = Status.CLOSED;
Constructor
Params:
  • vertx – Vert.x instance
  • options – MQTT client options
/** * Constructor * * @param vertx Vert.x instance * @param options MQTT client options */
public MqttClientImpl(Vertx vertx, MqttClientOptions options) { // copy given options NetClientOptions netClientOptions = new NetClientOptions(options); netClientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT); this.vertx = (VertxInternal) vertx; this.options = new MqttClientOptions(options); this.keepAliveTimeout = ((options.getKeepAliveInterval() * 1000) * 3) / 2; } int getInFlightMessagesCount() { synchronized (this) { return countInflightQueue; } } @Override public Future<MqttConnAckMessage> connect(int port, String host) { return this.doConnect(port, host, null); } /** * See {@link MqttClient#connect(int, String, Handler)} for more details */ @Override public MqttClient connect(int port, String host, Handler<AsyncResult<MqttConnAckMessage>> connectHandler) { Future<MqttConnAckMessage> fut = connect(port, host); if (connectHandler != null) { fut.onComplete(connectHandler); } return this; } @Override public Future<MqttConnAckMessage> connect(int port, String host, String serverName) { return connect(port, host, serverName); } /** * See {@link MqttClient#connect(int, String, String, Handler)} for more details */ @Override public MqttClient connect(int port, String host, String serverName, Handler<AsyncResult<MqttConnAckMessage>> connectHandler) { Future<MqttConnAckMessage> fut = this.doConnect(port, host, serverName); if (connectHandler != null) { fut.onComplete(connectHandler); } return this; } private Future<MqttConnAckMessage> doConnect(int port, String host, String serverName) { ContextInternal ctx = vertx.getOrCreateContext(); NetClient client = vertx.createNetClient(options, new CloseFuture()); PromiseInternal<MqttConnAckMessage> connectPromise = ctx.promise(); PromiseInternal<Void> disconnectPromise = ctx.promise(); synchronized (this) { if (this.status != Status.CLOSED) { return ctx.failedFuture(new IllegalStateException("Client " + this.status.name().toLowerCase())); } this.status = Status.CONNECTING; this.ctx = ctx; this.connectPromise = connectPromise; this.disconnectPromise = disconnectPromise; this.client = client; } ctx.runOnContext(v -> { log.debug(String.format("Trying to connect with %s:%d", host, port)); client.connect(port, host, serverName, done -> { // the TCP connection fails if (done.failed()) { log.error(String.format("Can't connect to %s:%d", host, port), done.cause()); synchronized (this) { this.connectPromise = null; this.disconnectPromise = null; this.ctx = null; this.client = null; } client.close(); connectPromise.fail(done.cause()); disconnectPromise.complete(); } else { log.info(String.format("Connection with %s:%d established successfully", host, port)); boolean closing; synchronized (MqttClientImpl.this) { if (closing = (status == Status.CLOSING)) { this.status = Status.CLOSED; this.client = null; this.connectPromise = null; this.disconnectPromise = null; } } NetSocketInternal soi = (NetSocketInternal) done.result(); if (closing) { soi.close(); connectPromise.fail("Disconnected"); disconnectPromise.complete(); return; } if (options.isAutoGeneratedClientId() && (options.getClientId() == null || options.getClientId().isEmpty())) { options.setClientId(generateRandomClientId()); } initChannel(soi); synchronized (MqttClientImpl.this) { this.connection = soi; } soi.messageHandler(msg -> this.handleMessage(soi.channelHandlerContext(), msg)); soi.closeHandler(v2 -> { client.close(); synchronized (MqttClientImpl.this) { this.connection = null; this.status = Status.CLOSED; this.connectPromise = null; this.disconnectPromise = null; } connectPromise.fail("Closed"); disconnectPromise.complete(); }); // an exception at connection level soi.exceptionHandler(this::handleException); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, AT_MOST_ONCE, false, 0); MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader( PROTOCOL_NAME, PROTOCOL_VERSION, options.hasUsername(), options.hasPassword(), options.isWillRetain(), options.getWillQoS(), options.isWillFlag(), options.isCleanSession(), options.getKeepAliveInterval() ); MqttConnectPayload payload = new MqttConnectPayload( options.getClientId() == null ? "" : options.getClientId(), options.getWillTopic(), options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null, options.hasUsername() ? options.getUsername() : null, options.hasPassword() ? options.getPassword().getBytes() : null ); io.netty.handler.codec.mqtt.MqttMessage connect = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); this.write(connect); } }); }); return connectPromise.future(); }
See MqttClient.disconnect() for more details
/** * See {@link MqttClient#disconnect()} for more details */
@Override public Future<Void> disconnect() { NetSocketInternal connection; Status status; Future<Void> fut; synchronized (this) { status = this.status; switch (this.status) { case CLOSED: return vertx.getOrCreateContext().succeededFuture(); case CONNECTED: this.status = Status.CLOSING; connection = this.connection; break; case CONNECTING: this.status = Status.CLOSING; connection = this.connection; break; case CLOSING: connection = null; break; default: throw new AssertionError(); } fut = this.disconnectPromise.future(); } if (connection != null) { if (status == Status.CONNECTED) { MqttFixedHeader fixedHeader = new MqttFixedHeader( MqttMessageType.DISCONNECT, false, AT_MOST_ONCE, false, 0 ); io.netty.handler.codec.mqtt.MqttMessage disconnect = MqttMessageFactory.newMessage(fixedHeader, null, null); connection.writeMessage(disconnect); } connection.close(); } return fut; } /** * See {@link MqttClient#disconnect(Handler)} for more details */ @Override public MqttClient disconnect(Handler<AsyncResult<Void>> disconnectHandler) { Future<Void> fut = disconnect(); if (disconnectHandler != null) { fut.onComplete(disconnectHandler); } return this; } /** * See {@link MqttClient#publish(String, Buffer, MqttQoS, boolean, boolean)} for more details */ @Override public Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) { if (MqttQoS.FAILURE == qosLevel) { throw new IllegalArgumentException("QoS level must be one of AT_MOST_ONCE, AT_LEAST_ONCE or EXACTLY_ONCE"); } io.netty.handler.codec.mqtt.MqttMessage publish; MqttPublishVariableHeader variableHeader; synchronized (this) { if (countInflightQueue >= options.getMaxInflightQueue()) { String msg = String.format("Attempt to exceed the limit of %d inflight messages", options.getMaxInflightQueue()); log.error(msg); MqttException exception = new MqttException(MqttException.MQTT_INFLIGHT_QUEUE_FULL, msg); return ctx.failedFuture(exception); } if (!isValidTopicName(topic)) { String msg = String.format("Invalid Topic Name - %s. It mustn't contains wildcards: # and +. Also it can't contains U+0000(NULL) chars", topic); log.error(msg); MqttException exception = new MqttException(MqttException.MQTT_INVALID_TOPIC_NAME, msg); return ctx.failedFuture(exception); } MqttFixedHeader fixedHeader = new MqttFixedHeader( MqttMessageType.PUBLISH, isDup, qosLevel, isRetain, 0 ); ByteBuf buf = Unpooled.copiedBuffer(payload.getBytes()); variableHeader = new MqttPublishVariableHeader(topic, nextMessageId()); publish = MqttMessageFactory.newMessage(fixedHeader, variableHeader, buf); switch (qosLevel) { case AT_LEAST_ONCE: qos1outbound.put(variableHeader.packetId(), new ExpiringPacket(this::handlePubackTimeout, variableHeader.packetId())); countInflightQueue++; break; case EXACTLY_ONCE: qos2outbound.put(variableHeader.packetId(), new ExpiringPacket(this::handlePubrecTimeout, variableHeader.packetId())); countInflightQueue++; break; default: // nothing to do for AT_MOST_ONCE break; } } return this.write(publish).map(variableHeader.packetId()); } /** * See {@link MqttClient#publish(String, Buffer, MqttQoS, boolean, boolean, Handler)} for more details */ @Override public MqttClient publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain, Handler<AsyncResult<Integer>> publishSentHandler) { Future<Integer> fut = publish(topic, payload, qosLevel, isDup, isRetain); if (publishSentHandler != null) { fut.onComplete(publishSentHandler); } return this; } /** * See {@link MqttClient#publishCompletionHandler(Handler)} for more details */ @Override public MqttClient publishCompletionHandler(Handler<Integer> publishCompletionHandler) { this.publishCompletionHandler = publishCompletionHandler; return this; } private synchronized Handler<Integer> publishCompletionHandler() { return this.publishCompletionHandler; }
{@inheritDoc}
/** * {@inheritDoc} */
@Override public MqttClient publishCompletionExpirationHandler(Handler<Integer> publishCompletionExpirationHandler) { this.publishCompletionExpirationHandler = publishCompletionExpirationHandler; return this; } private synchronized Handler<Integer> publishCompletionExpirationHandler() { return this.publishCompletionExpirationHandler; }
{@inheritDoc}
/** * {@inheritDoc} */
@Override public MqttClient publishCompletionUnknownPacketIdHandler(Handler<Integer> publishCompletionPhantomHandler) { this.publishCompletionPhantomHandler = publishCompletionPhantomHandler; return this; } private synchronized Handler<Integer> publishCompletionUnknownPacketIdHandler() { return this.publishCompletionPhantomHandler; } /** * See {@link MqttClient#publishHandler(Handler)} for more details */ @Override public MqttClient publishHandler(Handler<MqttPublishMessage> publishHandler) { this.publishHandler = publishHandler; return this; } private synchronized Handler<MqttPublishMessage> publishHandler() { return this.publishHandler; } /** * See {@link MqttClient#subscribeCompletionHandler(Handler)} for more details */ @Override public MqttClient subscribeCompletionHandler(Handler<MqttSubAckMessage> subscribeCompletionHandler) { this.subscribeCompletionHandler = subscribeCompletionHandler; return this; } private synchronized Handler<MqttSubAckMessage> subscribeCompletionHandler() { return this.subscribeCompletionHandler; }
See MqttClient.subscribe(String, int) for more details
/** * See {@link MqttClient#subscribe(String, int)} for more details */
@Override public Future<Integer> subscribe(String topic, int qos) { return subscribe(Collections.singletonMap(topic, qos)); } /** * See {@link MqttClient#subscribe(String, int, Handler)} for more details */ @Override public MqttClient subscribe(String topic, int qos, Handler<AsyncResult<Integer>> subscribeSentHandler) { return subscribe(Collections.singletonMap(topic, qos), subscribeSentHandler); } /** * See {@link MqttClient#subscribe(Map)} for more details */ @Override public Future<Integer> subscribe(Map<String, Integer> topics) { Map<String, Integer> invalidTopics = topics.entrySet() .stream() .filter(e -> !isValidTopicFilter(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); if (invalidTopics.size() > 0) { String msg = String.format("Invalid Topic Filters: %s", invalidTopics); log.error(msg); MqttException exception = new MqttException(MqttException.MQTT_INVALID_TOPIC_FILTER, msg); return ctx.failedFuture(exception); } MqttFixedHeader fixedHeader = new MqttFixedHeader( MqttMessageType.SUBSCRIBE, false, AT_LEAST_ONCE, false, 0); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId()); List<MqttTopicSubscription> subscriptions = topics.entrySet() .stream() .map(e -> new MqttTopicSubscription(e.getKey(), valueOf(e.getValue()))) .collect(Collectors.toList()); MqttSubscribePayload payload = new MqttSubscribePayload(subscriptions); io.netty.handler.codec.mqtt.MqttMessage subscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); return this.write(subscribe).map(variableHeader.messageId()); } /** * See {@link MqttClient#subscribe(Map, Handler)} for more details */ @Override public MqttClient subscribe(Map<String, Integer> topics, Handler<AsyncResult<Integer>> subscribeSentHandler) { Future<Integer> fut = subscribe(topics); if (subscribeSentHandler != null) { fut.onComplete(subscribeSentHandler); } return this; } /** * See {@link MqttClient#unsubscribeCompletionHandler(Handler)} for more details */ @Override public MqttClient unsubscribeCompletionHandler(Handler<Integer> unsubscribeCompletionHandler) { this.unsubscribeCompletionHandler = unsubscribeCompletionHandler; return this; } private synchronized Handler<Integer> unsubscribeCompletionHandler() { return this.unsubscribeCompletionHandler; } /** * See {@link MqttClient#unsubscribe(String, Handler)} )} for more details */ @Override public MqttClient unsubscribe(String topic, Handler<AsyncResult<Integer>> unsubscribeSentHandler) { Future<Integer> fut = unsubscribe(topic); if (unsubscribeSentHandler != null) { fut.onComplete(unsubscribeSentHandler); } return this; }
See MqttClient.unsubscribe(String) )} for more details
/** * See {@link MqttClient#unsubscribe(String)} )} for more details */
@Override public Future<Integer> unsubscribe(String topic) { MqttFixedHeader fixedHeader = new MqttFixedHeader( MqttMessageType.UNSUBSCRIBE, false, AT_LEAST_ONCE, false, 0); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId()); MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Stream.of(topic).collect(Collectors.toList())); io.netty.handler.codec.mqtt.MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); this.write(unsubscribe); return ctx.succeededFuture(variableHeader.messageId()); } /** * See {@link MqttClient#pingResponseHandler(Handler)} for more details */ @Override public synchronized MqttClient pingResponseHandler(Handler<Void> pingResponseHandler) { this.pingrespHandler = pingResponseHandler; return this; } private synchronized Handler<Void> pingResponseHandler() { return this.pingrespHandler; } /** * See {@link MqttClient#exceptionHandler(Handler)} for more details */ @Override public synchronized MqttClient exceptionHandler(Handler<Throwable> handler) { exceptionHandler = handler; return this; } private synchronized Handler<Throwable> exceptionHandler() { return this.exceptionHandler; } /** * See {@link MqttClient#closeHandler(Handler)} for more details */ @Override public synchronized MqttClient closeHandler(Handler<Void> closeHandler) { this.closeHandler = closeHandler; return this; } private synchronized Handler<Void> closeHandler() { return this.closeHandler; } private class Ping { final long id; private Ping(long id) { this.id = id; } void ack() { vertx.cancelTimer(id); } void cancel() { vertx.cancelTimer(id); } }
See MqttClient.ping() for more details
/** * See {@link MqttClient#ping()} for more details */
@Override public MqttClient ping() { if (Vertx.currentContext() == ctx) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0); io.netty.handler.codec.mqtt.MqttMessage pingreq = MqttMessageFactory.newMessage(fixedHeader, null, null); long id = vertx.setTimer(keepAliveTimeout, _id -> { disconnect(); }); pings.add(new Ping(id)); this.write(pingreq); } else { ctx.runOnContext(v -> ping()); } return this; } @Override public synchronized String clientId() { return this.options.getClientId(); } @Override public synchronized boolean isConnected() { return this.status == Status.CONNECTED; }
Sends PUBACK packet to server
Params:
  • publishMessageId – identifier of the PUBLISH message to acknowledge
/** * Sends PUBACK packet to server * * @param publishMessageId identifier of the PUBLISH message to acknowledge */
private void publishAcknowledge(int publishMessageId) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE, false, 0); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(publishMessageId); io.netty.handler.codec.mqtt.MqttMessage puback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null); this.write(puback); }
Sends PUBREC packet to server
Params:
  • publishMessage – a PUBLISH message to acknowledge
/** * Sends PUBREC packet to server * * @param publishMessage a PUBLISH message to acknowledge */
private void publishReceived(MqttPublishMessage publishMessage) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(publishMessage.messageId()); io.netty.handler.codec.mqtt.MqttMessage pubrec = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null); synchronized (this) { qos2inbound.put(publishMessage.messageId(), publishMessage); } this.write(pubrec); }
Sends PUBCOMP packet to server
Params:
  • publishMessageId – identifier of the PUBLISH message to acknowledge
/** * Sends PUBCOMP packet to server * * @param publishMessageId identifier of the PUBLISH message to acknowledge */
private void publishComplete(int publishMessageId) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false, 0); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(publishMessageId); io.netty.handler.codec.mqtt.MqttMessage pubcomp = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null); this.write(pubcomp); }
Sends the PUBREL message to server
Params:
  • publishMessageId – identifier of the PUBLISH message to acknowledge
/** * Sends the PUBREL message to server * * @param publishMessageId identifier of the PUBLISH message to acknowledge */
private void publishRelease(int publishMessageId) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(publishMessageId); io.netty.handler.codec.mqtt.MqttMessage pubrel = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null); synchronized (this) { qos2outbound.put(publishMessageId, new ExpiringPacket(this::handlePubcompTimeout, publishMessageId)); } this.write(pubrel); } private void initChannel(NetSocketInternal sock) { ChannelPipeline pipeline = sock.channelHandlerContext().pipeline(); // add into pipeline netty's (en/de)coder pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE); if (this.options.getMaxMessageSize() > 0) { pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize())); } else { // max message size not set, so the default from Netty MQTT codec is used pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder()); } if (this.options.isAutoKeepAlive() && this.options.getKeepAliveInterval() != 0) { int keepAliveInterval = this.options.getKeepAliveInterval(); // handler for sending PINGREQ (keepAlive) if reader- or writer-channel become idle pipeline.addBefore("handler", "idle", new IdleStateHandler(0, 0, keepAliveInterval) { @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) { if (evt.state() == IdleState.ALL_IDLE) { // verify that server is still connected (e.g. when using QoS-0) ping(); } } }); } }
Update and return the next message identifier
Returns:message identifier
/** * Update and return the next message identifier * * @return message identifier */
private synchronized int nextMessageId() { // if 0 or MAX_MESSAGE_ID, it becomes 1 (first valid messageId) this.messageIdCounter = ((this.messageIdCounter % MAX_MESSAGE_ID) != 0) ? this.messageIdCounter + 1 : 1; return this.messageIdCounter; } private synchronized NetSocketInternal connection() { return connection; } private Future<Void> write(io.netty.handler.codec.mqtt.MqttMessage mqttMessage) { log.debug(String.format("Sending packet %s", mqttMessage)); return this.connection().writeMessage(mqttMessage); }
Used for calling the close handler when the remote MQTT server closes the connection
/** * Used for calling the close handler when the remote MQTT server closes the connection */
private void handleClosed() { Promise<MqttConnAckMessage> connectPromise; Promise<Void> disconnectPromise; NetClient client; Deque<Ping> pings; synchronized (this) { client = this.client; connectPromise = this.connectPromise; disconnectPromise = this.disconnectPromise; pings = this.pings; this.disconnectPromise = null; this.status = Status.CLOSED; this.connection = null; this.ctx = null; this.client = null; this.pings = new ArrayDeque<>(); } // Cleanup pending pings pings.forEach(ping -> { ping.cancel(); }); Handler<Void> handler = closeHandler(); if (handler != null) { handler.handle(null); } disconnectPromise.complete(); if (connectPromise != null) { connectPromise.fail("Closed"); } client.close(); }
Handle the MQTT message received from the remote MQTT server
Params:
  • msg – Incoming Packet
/** * Handle the MQTT message received from the remote MQTT server * * @param msg Incoming Packet */
private void handleMessage(ChannelHandlerContext chctx, Object msg) { // handling directly native Netty MQTT messages, some of them are translated // to the related Vert.x ones for polyglotization if (msg instanceof io.netty.handler.codec.mqtt.MqttMessage) { io.netty.handler.codec.mqtt.MqttMessage mqttMessage = (io.netty.handler.codec.mqtt.MqttMessage) msg; DecoderResult result = mqttMessage.decoderResult(); if (result.isFailure()) { chctx.pipeline().fireExceptionCaught(result.cause()); return; } if (!result.isFinished()) { chctx.pipeline().fireExceptionCaught(new Exception("Unfinished message")); return; } log.debug(String.format("Incoming packet %s", msg)); switch (mqttMessage.fixedHeader().messageType()) { case CONNACK: io.netty.handler.codec.mqtt.MqttConnAckMessage connack = (io.netty.handler.codec.mqtt.MqttConnAckMessage) mqttMessage; MqttConnAckMessage mqttConnAckMessage = MqttConnAckMessage.create( connack.variableHeader().connectReturnCode(), connack.variableHeader().isSessionPresent()); handleConnack(mqttConnAckMessage); break; case PUBLISH: io.netty.handler.codec.mqtt.MqttPublishMessage publish = (io.netty.handler.codec.mqtt.MqttPublishMessage) mqttMessage; ByteBuf newBuf = VertxHandler.safeBuffer(publish.payload(), chctx.alloc()); MqttPublishMessage mqttPublishMessage = MqttPublishMessage.create( publish.variableHeader().packetId(), publish.fixedHeader().qosLevel(), publish.fixedHeader().isDup(), publish.fixedHeader().isRetain(), publish.variableHeader().topicName(), newBuf); handlePublish(mqttPublishMessage); break; case PUBACK: handlePuback(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId()); break; case PUBREC: handlePubrec(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId()); break; case PUBREL: handlePubrel(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId()); break; case PUBCOMP: handlePubcomp(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId()); break; case SUBACK: io.netty.handler.codec.mqtt.MqttSubAckMessage unsuback = (io.netty.handler.codec.mqtt.MqttSubAckMessage) mqttMessage; MqttSubAckMessage mqttSubAckMessage = MqttSubAckMessage.create( unsuback.variableHeader().messageId(), unsuback.payload().grantedQoSLevels()); handleSuback(mqttSubAckMessage); break; case UNSUBACK: handleUnsuback(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId()); break; case PINGRESP: handlePingresp(); break; default: chctx.pipeline().fireExceptionCaught(new Exception("Wrong message type " + msg.getClass().getName())); break; } } else { chctx.pipeline().fireExceptionCaught(new Exception("Wrong message type")); } }
Used for calling the pingresp handler when the server replies to the ping
/** * Used for calling the pingresp handler when the server replies to the ping */
private void handlePingresp() { Ping ping = pings.poll(); if (ping != null) { ping.ack(); } Handler<Void> handler = pingResponseHandler(); if (handler != null) { handler.handle(null); } }
Used for calling the unsuback handler when the server acks an unsubscribe
Params:
  • unsubackMessageId – identifier of the subscribe acknowledged by the server
/** * Used for calling the unsuback handler when the server acks an unsubscribe * * @param unsubackMessageId identifier of the subscribe acknowledged by the server */
private void handleUnsuback(int unsubackMessageId) { Handler<Integer> handler = unsubscribeCompletionHandler(); if (handler != null) { handler.handle(unsubackMessageId); } }
Used for calling the puback handler when the server acknowledge a QoS 1 message with puback
Params:
  • pubackMessageId – identifier of the message acknowledged by the server
/** * Used for calling the puback handler when the server acknowledge a QoS 1 message with puback * * @param pubackMessageId identifier of the message acknowledged by the server */
private void handlePuback(int pubackMessageId) { synchronized (this) { ExpiringPacket removedPacket = qos1outbound.remove(pubackMessageId); if (removedPacket == null) { log.debug("Received PUBACK packet without having related PUBLISH packet in storage"); // PUBACK has been received after timer has already fired Handler<Integer> handler = publishCompletionUnknownPacketIdHandler(); if (handler != null) { handler.handle(pubackMessageId); } return; } removedPacket.cancelTimer(); countInflightQueue--; } Handler<Integer> handler = publishCompletionHandler(); if (handler != null) { handler.handle(pubackMessageId); } } private void handlePubackTimeout(int packetId) { ExpiringPacket expiredMessage; synchronized (this) { expiredMessage = qos1outbound.remove(packetId); if (expiredMessage == null) { // the message has already been ACKed log.debug("PUBLISH expiration timer fired but QoS 1 message has already been PUBACKed by server"); return; } } countInflightQueue--; Handler<Integer> handler = publishCompletionExpirationHandler(); if (handler != null) { handler.handle(expiredMessage.packetId); } }
Used for calling the pubcomp handler when the server client acknowledge a QoS 2 message with pubcomp
Params:
  • pubcompMessageId – identifier of the message acknowledged by the server
/** * Used for calling the pubcomp handler when the server client acknowledge a QoS 2 message with pubcomp * * @param pubcompMessageId identifier of the message acknowledged by the server */
private void handlePubcomp(int pubcompMessageId) { synchronized (this) { ExpiringPacket removedPacket = qos2outbound.remove(pubcompMessageId); if (removedPacket == null) { log.debug("Received PUBCOMP packet without having related PUBREL packet in storage"); Handler<Integer> handler = publishCompletionUnknownPacketIdHandler(); if (handler != null) { handler.handle(pubcompMessageId); } return; } removedPacket.cancelTimer(); countInflightQueue--; } Handler<Integer> handler = publishCompletionHandler(); if (handler != null) { handler.handle(pubcompMessageId); } } private void handlePubcompTimeout(int packetId) { ExpiringPacket expiredMessage; synchronized (this) { expiredMessage = qos2outbound.remove(packetId); if (expiredMessage == null) { log.debug("PUBCOMP expiration timer fired but QoS 2 message has already been PUBCOMPed by server"); return; } } countInflightQueue--; Handler<Integer> handler = publishCompletionExpirationHandler(); if (handler != null) { handler.handle(expiredMessage.packetId); } }
Used for sending the pubrel when a pubrec is received from the server
Params:
  • pubrecMessageId – identifier of the message acknowledged by server
/** * Used for sending the pubrel when a pubrec is received from the server * * @param pubrecMessageId identifier of the message acknowledged by server */
private void handlePubrec(int pubrecMessageId) { synchronized (this) { ExpiringPacket removedPacket = qos2outbound.remove(pubrecMessageId); if (removedPacket == null) { log.debug("Received PUBREC packet without having related PUBLISH packet in storage"); Handler<Integer> handler = publishCompletionUnknownPacketIdHandler(); if (handler != null) { handler.handle(pubrecMessageId); } return; } removedPacket.cancelTimer(); } this.publishRelease(pubrecMessageId); } private void handlePubrecTimeout(int packetId) { ExpiringPacket expiredMessage; synchronized (this) { expiredMessage = qos2outbound.remove(packetId); if (expiredMessage == null) { log.debug("PUBREC expiration timer fired but QoS 2 message has already been PUBRECed by server"); return; } } countInflightQueue--; Handler<Integer> handler = publishCompletionExpirationHandler(); if (handler != null) { handler.handle(expiredMessage.packetId); } }
Used for calling the suback handler when the server acknowledges subscribe to topics
Params:
  • msg – message with suback information
/** * Used for calling the suback handler when the server acknowledges subscribe to topics * * @param msg message with suback information */
private void handleSuback(MqttSubAckMessage msg) { Handler<MqttSubAckMessage> handler = subscribeCompletionHandler(); if (handler != null) { handler.handle(msg); } }
Used for calling the publish handler when the server publishes a message
Params:
  • msg – published message
/** * Used for calling the publish handler when the server publishes a message * * @param msg published message */
private void handlePublish(MqttPublishMessage msg) { Handler<MqttPublishMessage> handler; switch (msg.qosLevel()) { case AT_MOST_ONCE: handler = this.publishHandler(); if (handler != null) { handler.handle(msg); } break; case AT_LEAST_ONCE: this.publishAcknowledge(msg.messageId()); handler = this.publishHandler(); if (handler != null) { handler.handle(msg); } break; case EXACTLY_ONCE: this.publishReceived(msg); // we will handle the PUBLISH when a PUBREL comes break; } }
Used for calling the pubrel handler when the server acknowledge a QoS 2 message with pubrel
Params:
  • pubrelMessageId – identifier of the message acknowledged by the server
/** * Used for calling the pubrel handler when the server acknowledge a QoS 2 message with pubrel * * @param pubrelMessageId identifier of the message acknowledged by the server */
private void handlePubrel(int pubrelMessageId) { MqttMessage message; synchronized (this) { message = qos2inbound.remove(pubrelMessageId); if (message == null) { log.warn("Received PUBREL packet without having related PUBREC packet in storage"); return; } this.publishComplete(pubrelMessageId); } Handler<MqttPublishMessage> handler = this.publishHandler(); if (handler != null) { handler.handle((MqttPublishMessage) message); } }
Used for calling the connect handler when the server replies to the request
Params:
  • msg – connection response message
/** * Used for calling the connect handler when the server replies to the request * * @param msg connection response message */
private void handleConnack(MqttConnAckMessage msg) { Status status = msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED ? Status.CONNECTED : Status.CLOSING; if (msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED) { NetSocketInternal connection; Promise<MqttConnAckMessage> connectPromise; synchronized (this) { connection = this.connection; connectPromise = this.connectPromise; this.connectPromise = null; this.status = Status.CONNECTED; } connection.closeHandler(v -> handleClosed()); connectPromise.complete(msg); } else { Promise<MqttConnAckMessage> connectPromise; Promise<Void> disconnectPromise; NetSocketInternal connection; NetClient client; synchronized (this) { connectPromise = this.connectPromise; disconnectPromise = this.disconnectPromise; connection = this.connection; client = this.client; this.connectPromise = null; this.disconnectPromise = null; this.status = Status.CLOSED; this.connection = null; this.client = null; } connection.closeHandler(null); MqttConnectionException exception = new MqttConnectionException(msg.code()); log.error(String.format("Connection refused by the server - code: %s", msg.code())); connectPromise.fail(exception); disconnectPromise.complete(); client.close(); } }
Used for calling the exception handler when an error at connection level
Params:
  • t – exception raised
/** * Used for calling the exception handler when an error at connection level * * @param t exception raised */
private void handleException(Throwable t) { Handler<Throwable> handler = exceptionHandler(); if (handler != null) { handler.handle(t); } }
Returns:Randomly-generated ClientId
/** * @return Randomly-generated ClientId */
private String generateRandomClientId() { return UUID.randomUUID().toString(); }
Check either given Topic Name valid of not
Params:
  • topicName – given Topic Name
Returns:true - valid, otherwise - false
/** * Check either given Topic Name valid of not * * @param topicName given Topic Name * @return true - valid, otherwise - false */
private boolean isValidTopicName(String topicName) { if(!isValidStringSizeInUTF8(topicName)){ return false; } Matcher matcher = validTopicNamePattern.matcher(topicName); return matcher.find(); }
Check either given Topic Filter valid of not
Params:
  • topicFilter – given Topic Filter
Returns:true - valid, otherwise - false
/** * Check either given Topic Filter valid of not * * @param topicFilter given Topic Filter * @return true - valid, otherwise - false */
private boolean isValidTopicFilter(String topicFilter) { if(!isValidStringSizeInUTF8(topicFilter)){ return false; } Matcher matcher = validTopicFilterPattern.matcher(topicFilter); return matcher.find(); }
Check either given string has size more then 65535 bytes in UTF-8 Encoding
Params:
  • string – given string
Returns:true - size is lower or equal than 65535, otherwise - false
/** * Check either given string has size more then 65535 bytes in UTF-8 Encoding * * @param string given string * @return true - size is lower or equal than 65535, otherwise - false */
private boolean isValidStringSizeInUTF8(String string) { try { int length = string.getBytes("UTF-8").length; return length >= MIN_TOPIC_LEN && length <= MAX_TOPIC_LEN; } catch (UnsupportedEncodingException e) { log.error("UTF-8 charset is not supported", e); } return false; }
A wrapper around a packet ID for which the client will wait a limited time for the server's ACK to arrive.
/** * A wrapper around a packet ID for which the client will wait a limited time * for the server's ACK to arrive. */
private class ExpiringPacket { private final int packetId; private final long timerId;
Creates a new expiring packet.
Params:
  • timeoutHandler – The handler to invoke once the client stops waiting for the server's ACK.
  • packetId – The packet ID.
/** * Creates a new expiring packet. * * @param timeoutHandler The handler to invoke once the client stops waiting for the server's ACK. * @param packetId The packet ID. */
ExpiringPacket(Handler<Integer> timeoutHandler, final int packetId) { this.packetId = packetId; if (options.getAckTimeout() > -1) { this.timerId = vertx.setTimer(options.getAckTimeout() * 1000L, tid -> timeoutHandler.handle(packetId)); } else { // default MQTT client behavior, // don't start a timer for expiring the publish this.timerId = -1; } }
Cancels the timer created for expiring the ACK.

This method should be invoked once the server's ACK for the packet ID has arrived in order to prevent the client from timing out while waiting for an ACK.

Returns:true if the timer has been canceled.
/** * Cancels the timer created for expiring the ACK. * <p> * This method should be invoked once the server's ACK for the packet ID has arrived * in order to prevent the client from timing out while waiting for an ACK. * * @return {@code true} if the timer has been canceled. */
boolean cancelTimer() { return vertx.cancelTimer(timerId); } } }