package io.vertx.mqtt.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.CloseFuture;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.MqttException;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import io.vertx.mqtt.messages.MqttMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubAckMessage;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.netty.handler.codec.mqtt.MqttQoS.*;
public class MqttClientImpl implements MqttClient {
private enum Status { CLOSED, CONNECTING, CONNECTED, CLOSING }
private static final Pattern validTopicNamePattern = Pattern.compile("^[^#+\\u0000]+$");
private static final Pattern validTopicFilterPattern = Pattern.compile("^(#|((\\+(?![^/]))?([^#+]*(/\\+(?![^/]))?)*(/#)?))$");
private static final Logger log = LoggerFactory.getLogger(MqttClientImpl.class);
private static final int MAX_MESSAGE_ID = 65535;
private static final int MAX_TOPIC_LEN = 65535;
private static final int MIN_TOPIC_LEN = 1;
private static final String PROTOCOL_NAME = "MQTT";
private static final int PROTOCOL_VERSION = 4;
private static final int DEFAULT_IDLE_TIMEOUT = 0;
private final VertxInternal vertx;
private final MqttClientOptions options;
private NetSocketInternal connection;
private ContextInternal ctx;
private Handler<Integer> publishCompletionHandler;
private Handler<Integer> publishCompletionExpirationHandler;
private Handler<Integer> publishCompletionPhantomHandler;
private Handler<Integer> unsubscribeCompletionHandler;
private Handler<MqttPublishMessage> publishHandler;
private Handler<MqttSubAckMessage> subscribeCompletionHandler;
private Promise<MqttConnAckMessage> connectPromise;
private Promise<Void> disconnectPromise;
private Handler<Void> pingrespHandler;
private Handler<Throwable> exceptionHandler;
private Handler<Void> closeHandler;
private HashMap<Integer, ExpiringPacket> qos1outbound = new HashMap<>();
private HashMap<Integer, ExpiringPacket> qos2outbound = new HashMap<>();
private HashMap<Integer, MqttMessage> qos2inbound = new HashMap<>();
private int messageIdCounter;
private final long keepAliveTimeout;
private Deque<Ping> pings = new ArrayDeque<>();
private int countInflightQueue;
private NetClient client;
private Status status = Status.CLOSED;
public MqttClientImpl(Vertx vertx, MqttClientOptions options) {
NetClientOptions netClientOptions = new NetClientOptions(options);
netClientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT);
this.vertx = (VertxInternal) vertx;
this.options = new MqttClientOptions(options);
this.keepAliveTimeout = ((options.getKeepAliveInterval() * 1000) * 3) / 2;
}
int getInFlightMessagesCount() {
synchronized (this) {
return countInflightQueue;
}
}
@Override
public Future<MqttConnAckMessage> connect(int port, String host) {
return this.doConnect(port, host, null);
}
@Override
public MqttClient connect(int port, String host, Handler<AsyncResult<MqttConnAckMessage>> connectHandler) {
Future<MqttConnAckMessage> fut = connect(port, host);
if (connectHandler != null) {
fut.onComplete(connectHandler);
}
return this;
}
@Override
public Future<MqttConnAckMessage> connect(int port, String host, String serverName) {
return connect(port, host, serverName);
}
@Override
public MqttClient connect(int port, String host, String serverName, Handler<AsyncResult<MqttConnAckMessage>> connectHandler) {
Future<MqttConnAckMessage> fut = this.doConnect(port, host, serverName);
if (connectHandler != null) {
fut.onComplete(connectHandler);
}
return this;
}
private Future<MqttConnAckMessage> doConnect(int port, String host, String serverName) {
ContextInternal ctx = vertx.getOrCreateContext();
NetClient client = vertx.createNetClient(options, new CloseFuture());
PromiseInternal<MqttConnAckMessage> connectPromise = ctx.promise();
PromiseInternal<Void> disconnectPromise = ctx.promise();
synchronized (this) {
if (this.status != Status.CLOSED) {
return ctx.failedFuture(new IllegalStateException("Client " + this.status.name().toLowerCase()));
}
this.status = Status.CONNECTING;
this.ctx = ctx;
this.connectPromise = connectPromise;
this.disconnectPromise = disconnectPromise;
this.client = client;
}
ctx.runOnContext(v -> {
log.debug(String.format("Trying to connect with %s:%d", host, port));
client.connect(port, host, serverName, done -> {
if (done.failed()) {
log.error(String.format("Can't connect to %s:%d", host, port), done.cause());
synchronized (this) {
this.connectPromise = null;
this.disconnectPromise = null;
this.ctx = null;
this.client = null;
}
client.close();
connectPromise.fail(done.cause());
disconnectPromise.complete();
} else {
log.info(String.format("Connection with %s:%d established successfully", host, port));
boolean closing;
synchronized (MqttClientImpl.this) {
if (closing = (status == Status.CLOSING)) {
this.status = Status.CLOSED;
this.client = null;
this.connectPromise = null;
this.disconnectPromise = null;
}
}
NetSocketInternal soi = (NetSocketInternal) done.result();
if (closing) {
soi.close();
connectPromise.fail("Disconnected");
disconnectPromise.complete();
return;
}
if (options.isAutoGeneratedClientId() && (options.getClientId() == null || options.getClientId().isEmpty())) {
options.setClientId(generateRandomClientId());
}
initChannel(soi);
synchronized (MqttClientImpl.this) {
this.connection = soi;
}
soi.messageHandler(msg -> this.handleMessage(soi.channelHandlerContext(), msg));
soi.closeHandler(v2 -> {
client.close();
synchronized (MqttClientImpl.this) {
this.connection = null;
this.status = Status.CLOSED;
this.connectPromise = null;
this.disconnectPromise = null;
}
connectPromise.fail("Closed");
disconnectPromise.complete();
});
soi.exceptionHandler(this::handleException);
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT,
false,
AT_MOST_ONCE,
false,
0);
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
PROTOCOL_NAME,
PROTOCOL_VERSION,
options.hasUsername(),
options.hasPassword(),
options.isWillRetain(),
options.getWillQoS(),
options.isWillFlag(),
options.isCleanSession(),
options.getKeepAliveInterval()
);
MqttConnectPayload payload = new MqttConnectPayload(
options.getClientId() == null ? "" : options.getClientId(),
options.getWillTopic(),
options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null,
options.hasUsername() ? options.getUsername() : null,
options.hasPassword() ? options.getPassword().getBytes() : null
);
io.netty.handler.codec.mqtt.MqttMessage connect = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
this.write(connect);
}
});
});
return connectPromise.future();
}
@Override
public Future<Void> disconnect() {
NetSocketInternal connection;
Status status;
Future<Void> fut;
synchronized (this) {
status = this.status;
switch (this.status) {
case CLOSED:
return vertx.getOrCreateContext().succeededFuture();
case CONNECTED:
this.status = Status.CLOSING;
connection = this.connection;
break;
case CONNECTING:
this.status = Status.CLOSING;
connection = this.connection;
break;
case CLOSING:
connection = null;
break;
default:
throw new AssertionError();
}
fut = this.disconnectPromise.future();
}
if (connection != null) {
if (status == Status.CONNECTED) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.DISCONNECT,
false,
AT_MOST_ONCE,
false,
0
);
io.netty.handler.codec.mqtt.MqttMessage disconnect = MqttMessageFactory.newMessage(fixedHeader, null, null);
connection.writeMessage(disconnect);
}
connection.close();
}
return fut;
}
@Override
public MqttClient disconnect(Handler<AsyncResult<Void>> disconnectHandler) {
Future<Void> fut = disconnect();
if (disconnectHandler != null) {
fut.onComplete(disconnectHandler);
}
return this;
}
@Override
public Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) {
if (MqttQoS.FAILURE == qosLevel) {
throw new IllegalArgumentException("QoS level must be one of AT_MOST_ONCE, AT_LEAST_ONCE or EXACTLY_ONCE");
}
io.netty.handler.codec.mqtt.MqttMessage publish;
MqttPublishVariableHeader variableHeader;
synchronized (this) {
if (countInflightQueue >= options.getMaxInflightQueue()) {
String msg = String.format("Attempt to exceed the limit of %d inflight messages", options.getMaxInflightQueue());
log.error(msg);
MqttException exception = new MqttException(MqttException.MQTT_INFLIGHT_QUEUE_FULL, msg);
return ctx.failedFuture(exception);
}
if (!isValidTopicName(topic)) {
String msg = String.format("Invalid Topic Name - %s. It mustn't contains wildcards: # and +. Also it can't contains U+0000(NULL) chars", topic);
log.error(msg);
MqttException exception = new MqttException(MqttException.MQTT_INVALID_TOPIC_NAME, msg);
return ctx.failedFuture(exception);
}
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBLISH,
isDup,
qosLevel,
isRetain,
0
);
ByteBuf buf = Unpooled.copiedBuffer(payload.getBytes());
variableHeader = new MqttPublishVariableHeader(topic, nextMessageId());
publish = MqttMessageFactory.newMessage(fixedHeader, variableHeader, buf);
switch (qosLevel) {
case AT_LEAST_ONCE:
qos1outbound.put(variableHeader.packetId(), new ExpiringPacket(this::handlePubackTimeout, variableHeader.packetId()));
countInflightQueue++;
break;
case EXACTLY_ONCE:
qos2outbound.put(variableHeader.packetId(), new ExpiringPacket(this::handlePubrecTimeout, variableHeader.packetId()));
countInflightQueue++;
break;
default:
break;
}
}
return this.write(publish).map(variableHeader.packetId());
}
@Override
public MqttClient publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain, Handler<AsyncResult<Integer>> publishSentHandler) {
Future<Integer> fut = publish(topic, payload, qosLevel, isDup, isRetain);
if (publishSentHandler != null) {
fut.onComplete(publishSentHandler);
}
return this;
}
@Override
public MqttClient publishCompletionHandler(Handler<Integer> publishCompletionHandler) {
this.publishCompletionHandler = publishCompletionHandler;
return this;
}
private synchronized Handler<Integer> publishCompletionHandler() {
return this.publishCompletionHandler;
}
@Override
public MqttClient publishCompletionExpirationHandler(Handler<Integer> publishCompletionExpirationHandler) {
this.publishCompletionExpirationHandler = publishCompletionExpirationHandler;
return this;
}
private synchronized Handler<Integer> publishCompletionExpirationHandler() {
return this.publishCompletionExpirationHandler;
}
@Override
public MqttClient publishCompletionUnknownPacketIdHandler(Handler<Integer> publishCompletionPhantomHandler) {
this.publishCompletionPhantomHandler = publishCompletionPhantomHandler;
return this;
}
private synchronized Handler<Integer> publishCompletionUnknownPacketIdHandler() {
return this.publishCompletionPhantomHandler;
}
@Override
public MqttClient publishHandler(Handler<MqttPublishMessage> publishHandler) {
this.publishHandler = publishHandler;
return this;
}
private synchronized Handler<MqttPublishMessage> publishHandler() {
return this.publishHandler;
}
@Override
public MqttClient subscribeCompletionHandler(Handler<MqttSubAckMessage> subscribeCompletionHandler) {
this.subscribeCompletionHandler = subscribeCompletionHandler;
return this;
}
private synchronized Handler<MqttSubAckMessage> subscribeCompletionHandler() {
return this.subscribeCompletionHandler;
}
@Override
public Future<Integer> subscribe(String topic, int qos) {
return subscribe(Collections.singletonMap(topic, qos));
}
@Override
public MqttClient subscribe(String topic, int qos, Handler<AsyncResult<Integer>> subscribeSentHandler) {
return subscribe(Collections.singletonMap(topic, qos), subscribeSentHandler);
}
@Override
public Future<Integer> subscribe(Map<String, Integer> topics) {
Map<String, Integer> invalidTopics = topics.entrySet()
.stream()
.filter(e -> !isValidTopicFilter(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (invalidTopics.size() > 0) {
String msg = String.format("Invalid Topic Filters: %s", invalidTopics);
log.error(msg);
MqttException exception = new MqttException(MqttException.MQTT_INVALID_TOPIC_FILTER, msg);
return ctx.failedFuture(exception);
}
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.SUBSCRIBE,
false,
AT_LEAST_ONCE,
false,
0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId());
List<MqttTopicSubscription> subscriptions = topics.entrySet()
.stream()
.map(e -> new MqttTopicSubscription(e.getKey(), valueOf(e.getValue())))
.collect(Collectors.toList());
MqttSubscribePayload payload = new MqttSubscribePayload(subscriptions);
io.netty.handler.codec.mqtt.MqttMessage subscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
return this.write(subscribe).map(variableHeader.messageId());
}
@Override
public MqttClient subscribe(Map<String, Integer> topics, Handler<AsyncResult<Integer>> subscribeSentHandler) {
Future<Integer> fut = subscribe(topics);
if (subscribeSentHandler != null) {
fut.onComplete(subscribeSentHandler);
}
return this;
}
@Override
public MqttClient unsubscribeCompletionHandler(Handler<Integer> unsubscribeCompletionHandler) {
this.unsubscribeCompletionHandler = unsubscribeCompletionHandler;
return this;
}
private synchronized Handler<Integer> unsubscribeCompletionHandler() {
return this.unsubscribeCompletionHandler;
}
@Override
public MqttClient unsubscribe(String topic, Handler<AsyncResult<Integer>> unsubscribeSentHandler) {
Future<Integer> fut = unsubscribe(topic);
if (unsubscribeSentHandler != null) {
fut.onComplete(unsubscribeSentHandler);
}
return this;
}
@Override
public Future<Integer> unsubscribe(String topic) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.UNSUBSCRIBE,
false,
AT_LEAST_ONCE,
false,
0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId());
MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Stream.of(topic).collect(Collectors.toList()));
io.netty.handler.codec.mqtt.MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
this.write(unsubscribe);
return ctx.succeededFuture(variableHeader.messageId());
}
@Override
public synchronized MqttClient pingResponseHandler(Handler<Void> pingResponseHandler) {
this.pingrespHandler = pingResponseHandler;
return this;
}
private synchronized Handler<Void> pingResponseHandler() {
return this.pingrespHandler;
}
@Override
public synchronized MqttClient exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
return this;
}
private synchronized Handler<Throwable> exceptionHandler() {
return this.exceptionHandler;
}
@Override
public synchronized MqttClient closeHandler(Handler<Void> closeHandler) {
this.closeHandler = closeHandler;
return this;
}
private synchronized Handler<Void> closeHandler() {
return this.closeHandler;
}
private class Ping {
final long id;
private Ping(long id) {
this.id = id;
}
void ack() {
vertx.cancelTimer(id);
}
void cancel() {
vertx.cancelTimer(id);
}
}
@Override
public MqttClient ping() {
if (Vertx.currentContext() == ctx) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
io.netty.handler.codec.mqtt.MqttMessage pingreq = MqttMessageFactory.newMessage(fixedHeader, null, null);
long id = vertx.setTimer(keepAliveTimeout, _id -> {
disconnect();
});
pings.add(new Ping(id));
this.write(pingreq);
} else {
ctx.runOnContext(v -> ping());
}
return this;
}
@Override
public synchronized String clientId() {
return this.options.getClientId();
}
@Override
public synchronized boolean isConnected() {
return this.status == Status.CONNECTED;
}
private void publishAcknowledge(int publishMessageId) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);
io.netty.handler.codec.mqtt.MqttMessage puback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
this.write(puback);
}
private void publishReceived(MqttPublishMessage publishMessage) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessage.messageId());
io.netty.handler.codec.mqtt.MqttMessage pubrec = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
synchronized (this) {
qos2inbound.put(publishMessage.messageId(), publishMessage);
}
this.write(pubrec);
}
private void publishComplete(int publishMessageId) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);
io.netty.handler.codec.mqtt.MqttMessage pubcomp = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
this.write(pubcomp);
}
private void publishRelease(int publishMessageId) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);
io.netty.handler.codec.mqtt.MqttMessage pubrel = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
synchronized (this) {
qos2outbound.put(publishMessageId, new ExpiringPacket(this::handlePubcompTimeout, publishMessageId));
}
this.write(pubrel);
}
private void initChannel(NetSocketInternal sock) {
ChannelPipeline pipeline = sock.channelHandlerContext().pipeline();
pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);
if (this.options.getMaxMessageSize() > 0) {
pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize()));
} else {
pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
}
if (this.options.isAutoKeepAlive() &&
this.options.getKeepAliveInterval() != 0) {
int keepAliveInterval = this.options.getKeepAliveInterval();
pipeline.addBefore("handler", "idle",
new IdleStateHandler(0, 0, keepAliveInterval) {
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
if (evt.state() == IdleState.ALL_IDLE) {
ping();
}
}
});
}
}
private synchronized int nextMessageId() {
this.messageIdCounter = ((this.messageIdCounter % MAX_MESSAGE_ID) != 0) ? this.messageIdCounter + 1 : 1;
return this.messageIdCounter;
}
private synchronized NetSocketInternal connection() {
return connection;
}
private Future<Void> write(io.netty.handler.codec.mqtt.MqttMessage mqttMessage) {
log.debug(String.format("Sending packet %s", mqttMessage));
return this.connection().writeMessage(mqttMessage);
}
private void handleClosed() {
Promise<MqttConnAckMessage> connectPromise;
Promise<Void> disconnectPromise;
NetClient client;
Deque<Ping> pings;
synchronized (this) {
client = this.client;
connectPromise = this.connectPromise;
disconnectPromise = this.disconnectPromise;
pings = this.pings;
this.disconnectPromise = null;
this.status = Status.CLOSED;
this.connection = null;
this.ctx = null;
this.client = null;
this.pings = new ArrayDeque<>();
}
pings.forEach(ping -> {
ping.cancel();
});
Handler<Void> handler = closeHandler();
if (handler != null) {
handler.handle(null);
}
disconnectPromise.complete();
if (connectPromise != null) {
connectPromise.fail("Closed");
}
client.close();
}
private void handleMessage(ChannelHandlerContext chctx, Object msg) {
if (msg instanceof io.netty.handler.codec.mqtt.MqttMessage) {
io.netty.handler.codec.mqtt.MqttMessage mqttMessage = (io.netty.handler.codec.mqtt.MqttMessage) msg;
DecoderResult result = mqttMessage.decoderResult();
if (result.isFailure()) {
chctx.pipeline().fireExceptionCaught(result.cause());
return;
}
if (!result.isFinished()) {
chctx.pipeline().fireExceptionCaught(new Exception("Unfinished message"));
return;
}
log.debug(String.format("Incoming packet %s", msg));
switch (mqttMessage.fixedHeader().messageType()) {
case CONNACK:
io.netty.handler.codec.mqtt.MqttConnAckMessage connack = (io.netty.handler.codec.mqtt.MqttConnAckMessage) mqttMessage;
MqttConnAckMessage mqttConnAckMessage = MqttConnAckMessage.create(
connack.variableHeader().connectReturnCode(),
connack.variableHeader().isSessionPresent());
handleConnack(mqttConnAckMessage);
break;
case PUBLISH:
io.netty.handler.codec.mqtt.MqttPublishMessage publish = (io.netty.handler.codec.mqtt.MqttPublishMessage) mqttMessage;
ByteBuf newBuf = VertxHandler.safeBuffer(publish.payload(), chctx.alloc());
MqttPublishMessage mqttPublishMessage = MqttPublishMessage.create(
publish.variableHeader().packetId(),
publish.fixedHeader().qosLevel(),
publish.fixedHeader().isDup(),
publish.fixedHeader().isRetain(),
publish.variableHeader().topicName(),
newBuf);
handlePublish(mqttPublishMessage);
break;
case PUBACK:
handlePuback(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
break;
case PUBREC:
handlePubrec(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
break;
case PUBREL:
handlePubrel(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
break;
case PUBCOMP:
handlePubcomp(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
break;
case SUBACK:
io.netty.handler.codec.mqtt.MqttSubAckMessage unsuback = (io.netty.handler.codec.mqtt.MqttSubAckMessage) mqttMessage;
MqttSubAckMessage mqttSubAckMessage = MqttSubAckMessage.create(
unsuback.variableHeader().messageId(),
unsuback.payload().grantedQoSLevels());
handleSuback(mqttSubAckMessage);
break;
case UNSUBACK:
handleUnsuback(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
break;
case PINGRESP:
handlePingresp();
break;
default:
chctx.pipeline().fireExceptionCaught(new Exception("Wrong message type " + msg.getClass().getName()));
break;
}
} else {
chctx.pipeline().fireExceptionCaught(new Exception("Wrong message type"));
}
}
private void handlePingresp() {
Ping ping = pings.poll();
if (ping != null) {
ping.ack();
}
Handler<Void> handler = pingResponseHandler();
if (handler != null) {
handler.handle(null);
}
}
private void handleUnsuback(int unsubackMessageId) {
Handler<Integer> handler = unsubscribeCompletionHandler();
if (handler != null) {
handler.handle(unsubackMessageId);
}
}
private void handlePuback(int pubackMessageId) {
synchronized (this) {
ExpiringPacket removedPacket = qos1outbound.remove(pubackMessageId);
if (removedPacket == null) {
log.debug("Received PUBACK packet without having related PUBLISH packet in storage");
Handler<Integer> handler = publishCompletionUnknownPacketIdHandler();
if (handler != null) {
handler.handle(pubackMessageId);
}
return;
}
removedPacket.cancelTimer();
countInflightQueue--;
}
Handler<Integer> handler = publishCompletionHandler();
if (handler != null) {
handler.handle(pubackMessageId);
}
}
private void handlePubackTimeout(int packetId) {
ExpiringPacket expiredMessage;
synchronized (this) {
expiredMessage = qos1outbound.remove(packetId);
if (expiredMessage == null) {
log.debug("PUBLISH expiration timer fired but QoS 1 message has already been PUBACKed by server");
return;
}
}
countInflightQueue--;
Handler<Integer> handler = publishCompletionExpirationHandler();
if (handler != null) {
handler.handle(expiredMessage.packetId);
}
}
private void handlePubcomp(int pubcompMessageId) {
synchronized (this) {
ExpiringPacket removedPacket = qos2outbound.remove(pubcompMessageId);
if (removedPacket == null) {
log.debug("Received PUBCOMP packet without having related PUBREL packet in storage");
Handler<Integer> handler = publishCompletionUnknownPacketIdHandler();
if (handler != null) {
handler.handle(pubcompMessageId);
}
return;
}
removedPacket.cancelTimer();
countInflightQueue--;
}
Handler<Integer> handler = publishCompletionHandler();
if (handler != null) {
handler.handle(pubcompMessageId);
}
}
private void handlePubcompTimeout(int packetId) {
ExpiringPacket expiredMessage;
synchronized (this) {
expiredMessage = qos2outbound.remove(packetId);
if (expiredMessage == null) {
log.debug("PUBCOMP expiration timer fired but QoS 2 message has already been PUBCOMPed by server");
return;
}
}
countInflightQueue--;
Handler<Integer> handler = publishCompletionExpirationHandler();
if (handler != null) {
handler.handle(expiredMessage.packetId);
}
}
private void handlePubrec(int pubrecMessageId) {
synchronized (this) {
ExpiringPacket removedPacket = qos2outbound.remove(pubrecMessageId);
if (removedPacket == null) {
log.debug("Received PUBREC packet without having related PUBLISH packet in storage");
Handler<Integer> handler = publishCompletionUnknownPacketIdHandler();
if (handler != null) {
handler.handle(pubrecMessageId);
}
return;
}
removedPacket.cancelTimer();
}
this.publishRelease(pubrecMessageId);
}
private void handlePubrecTimeout(int packetId) {
ExpiringPacket expiredMessage;
synchronized (this) {
expiredMessage = qos2outbound.remove(packetId);
if (expiredMessage == null) {
log.debug("PUBREC expiration timer fired but QoS 2 message has already been PUBRECed by server");
return;
}
}
countInflightQueue--;
Handler<Integer> handler = publishCompletionExpirationHandler();
if (handler != null) {
handler.handle(expiredMessage.packetId);
}
}
private void handleSuback(MqttSubAckMessage msg) {
Handler<MqttSubAckMessage> handler = subscribeCompletionHandler();
if (handler != null) {
handler.handle(msg);
}
}
private void handlePublish(MqttPublishMessage msg) {
Handler<MqttPublishMessage> handler;
switch (msg.qosLevel()) {
case AT_MOST_ONCE:
handler = this.publishHandler();
if (handler != null) {
handler.handle(msg);
}
break;
case AT_LEAST_ONCE:
this.publishAcknowledge(msg.messageId());
handler = this.publishHandler();
if (handler != null) {
handler.handle(msg);
}
break;
case EXACTLY_ONCE:
this.publishReceived(msg);
break;
}
}
private void handlePubrel(int pubrelMessageId) {
MqttMessage message;
synchronized (this) {
message = qos2inbound.remove(pubrelMessageId);
if (message == null) {
log.warn("Received PUBREL packet without having related PUBREC packet in storage");
return;
}
this.publishComplete(pubrelMessageId);
}
Handler<MqttPublishMessage> handler = this.publishHandler();
if (handler != null) {
handler.handle((MqttPublishMessage) message);
}
}
private void handleConnack(MqttConnAckMessage msg) {
Status status = msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED ? Status.CONNECTED : Status.CLOSING;
if (msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
NetSocketInternal connection;
Promise<MqttConnAckMessage> connectPromise;
synchronized (this) {
connection = this.connection;
connectPromise = this.connectPromise;
this.connectPromise = null;
this.status = Status.CONNECTED;
}
connection.closeHandler(v -> handleClosed());
connectPromise.complete(msg);
} else {
Promise<MqttConnAckMessage> connectPromise;
Promise<Void> disconnectPromise;
NetSocketInternal connection;
NetClient client;
synchronized (this) {
connectPromise = this.connectPromise;
disconnectPromise = this.disconnectPromise;
connection = this.connection;
client = this.client;
this.connectPromise = null;
this.disconnectPromise = null;
this.status = Status.CLOSED;
this.connection = null;
this.client = null;
}
connection.closeHandler(null);
MqttConnectionException exception = new MqttConnectionException(msg.code());
log.error(String.format("Connection refused by the server - code: %s", msg.code()));
connectPromise.fail(exception);
disconnectPromise.complete();
client.close();
}
}
private void handleException(Throwable t) {
Handler<Throwable> handler = exceptionHandler();
if (handler != null) {
handler.handle(t);
}
}
private String generateRandomClientId() {
return UUID.randomUUID().toString();
}
private boolean isValidTopicName(String topicName) {
if(!isValidStringSizeInUTF8(topicName)){
return false;
}
Matcher matcher = validTopicNamePattern.matcher(topicName);
return matcher.find();
}
private boolean isValidTopicFilter(String topicFilter) {
if(!isValidStringSizeInUTF8(topicFilter)){
return false;
}
Matcher matcher = validTopicFilterPattern.matcher(topicFilter);
return matcher.find();
}
private boolean isValidStringSizeInUTF8(String string) {
try {
int length = string.getBytes("UTF-8").length;
return length >= MIN_TOPIC_LEN && length <= MAX_TOPIC_LEN;
} catch (UnsupportedEncodingException e) {
log.error("UTF-8 charset is not supported", e);
}
return false;
}
private class ExpiringPacket {
private final int packetId;
private final long timerId;
ExpiringPacket(Handler<Integer> timeoutHandler, final int packetId) {
this.packetId = packetId;
if (options.getAckTimeout() > -1) {
this.timerId = vertx.setTimer(options.getAckTimeout() * 1000L, tid -> timeoutHandler.handle(packetId));
} else {
this.timerId = -1;
}
}
boolean cancelTimer() {
return vertx.cancelTimer(timerId);
}
}
}