package io.vertx.mqtt.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.NetSocketInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.MqttException;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import io.vertx.mqtt.messages.MqttMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubAckMessage;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.netty.handler.codec.mqtt.MqttQoS.*;
public class MqttClientImpl implements MqttClient {
// Topic names accepted by publish(): one or more characters, none of them '#', '+' or U+0000
private static final Pattern validTopicNamePattern = Pattern.compile("^[^#+\\u0000]+$");
// Topic filters accepted by subscribe(): the '+' and '#' wildcards are allowed only where this expression permits
private static final Pattern validTopicFilterPattern = Pattern.compile("^(#|((\\+(?![^/]))?([^#+]*(/\\+(?![^/]))?)*(/#)?))$");
private static final Logger log = LoggerFactory.getLogger(MqttClientImpl.class);
// Highest packet identifier handed out by nextMessageId() before it wraps back to 1
private static final int MAX_MESSAGE_ID = 65535;
// Bounds (in bytes of the UTF-8 encoding) checked by isValidStringSizeInUTF8()
private static final int MAX_TOPIC_LEN = 65535;
private static final int MIN_TOPIC_LEN = 1;
// Protocol name and version placed in the CONNECT variable header
private static final String PROTOCOL_NAME = "MQTT";
private static final int PROTOCOL_VERSION = 4;
// Idle timeout forced onto the underlying NetClient options in the constructor
private static final int DEFAULT_IDLE_TIMEOUT = 0;
// Client options as supplied by the caller (the client id may be filled in by doConnect())
private final MqttClientOptions options;
// Underlying Vert.x TCP client used to open the connection
private final NetClient client;
// Socket of the established connection; assigned in doConnect()
private NetSocketInternal connection;
// Vert.x context captured in doConnect() once the TCP connection is established
private Context ctx;
// User supplied callbacks; each one may be null
private Handler<Integer> publishCompletionHandler;
private Handler<Integer> unsubscribeCompletionHandler;
private Handler<MqttPublishMessage> publishHandler;
private Handler<MqttSubAckMessage> subscribeCompletionHandler;
private Handler<AsyncResult<MqttConnAckMessage>> connectHandler;
private Handler<Void> pingrespHandler;
private Handler<Throwable> exceptionHandler;
private Handler<Void> closeHandler;
// Outgoing QoS 1 PUBLISH packets waiting for their PUBACK, keyed by packet id
private HashMap<Integer, io.netty.handler.codec.mqtt.MqttMessage> qos1outbound = new HashMap<>();
// Outgoing QoS 2 PUBLISH packets (replaced by the PUBREL in publishRelease()) waiting for PUBCOMP, keyed by packet id
private HashMap<Integer, io.netty.handler.codec.mqtt.MqttMessage> qos2outbound = new HashMap<>();
// Incoming QoS 2 PUBLISH messages held back until the matching PUBREL arrives, keyed by packet id
private HashMap<Integer, MqttMessage> qos2inbound = new HashMap<>();
// Last packet id returned by nextMessageId()
private int messageIdCounter;
// Number of outgoing QoS 1 / QoS 2 messages that have not been completed yet
private int countInflightQueue;
// Set from the CONNACK return code in handleConnack(), cleared in handleClosed()
private boolean isConnected;
public MqttClientImpl(Vertx vertx, MqttClientOptions options) {
NetClientOptions netClientOptions = new NetClientOptions(options);
netClientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT);
this.client = vertx.createNetClient(netClientOptions);
this.options = options;
}
/**
 * Connects to the server at the given address without passing a server name.
 *
 * @param port           server port
 * @param host           server host
 * @param connectHandler notified with the CONNACK outcome or the connection failure; may be null
 * @return this client
 */
@Override
public MqttClient connect(int port, String host, Handler<AsyncResult<MqttConnAckMessage>> connectHandler) {
  // no server name for this overload
  doConnect(port, host, null, connectHandler);
  return this;
}
/**
 * Connects to the server at the given address, forwarding {@code serverName} to the TCP client.
 *
 * @param port           server port
 * @param host           server host
 * @param serverName     server name handed to {@link NetClient#connect}
 * @param connectHandler notified with the CONNACK outcome or the connection failure; may be null
 * @return this client
 */
@Override
public MqttClient connect(int port, String host, String serverName, Handler<AsyncResult<MqttConnAckMessage>> connectHandler) {
  doConnect(port, host, serverName, connectHandler);
  return this;
}
/**
 * Opens the TCP connection and, once it is established, installs the MQTT codec on the
 * pipeline, registers the socket handlers and sends the CONNECT packet.
 * <p>
 * When the TCP connection fails, the failure is logged and passed to {@code connectHandler}.
 * When it succeeds, {@code connectHandler} is stored and completed later by the CONNACK handling.
 *
 * @param port           server port
 * @param host           server host
 * @param serverName     server name handed to the TCP client; may be null
 * @param connectHandler handler for the connection outcome; may be null
 */
private void doConnect(int port, String host, String serverName, Handler<AsyncResult<MqttConnAckMessage>> connectHandler) {
  log.debug(String.format("Trying to connect with %s:%d", host, port));
  this.client.connect(port, host, serverName, done -> {
    if (done.failed()) {
      // TCP level failure: nothing was set up, just report it
      log.error(String.format("Can't connect to %s:%d", host, port), done.cause());
      if (connectHandler != null) {
        connectHandler.handle(Future.failedFuture(done.cause()));
      }
    } else {
      log.info(String.format("Connection with %s:%d established successfully", host, port));
      NetSocketInternal soi = (NetSocketInternal) done.result();
      ChannelPipeline pipeline = soi.channelHandlerContext().pipeline();
      this.connectHandler = connectHandler;
      // generate a client id only when requested and none was configured
      if (options.isAutoGeneratedClientId() && (options.getClientId() == null || options.getClientId().isEmpty())) {
        options.setClientId(generateRandomClientId());
      }
      initChannel(pipeline);
      this.connection = soi;
      this.ctx = Vertx.currentContext();
      soi.messageHandler(msg -> this.handleMessage(soi.channelHandlerContext(), msg));
      soi.closeHandler(v -> handleClosed());
      soi.exceptionHandler(this::handleException);
      MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT,
        false,
        AT_MOST_ONCE,
        false,
        0);
      MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
        PROTOCOL_NAME,
        PROTOCOL_VERSION,
        options.hasUsername(),
        options.hasPassword(),
        options.isWillRetain(),
        options.getWillQoS(),
        options.isWillFlag(),
        options.isCleanSession(),
        options.getKeepAliveTimeSeconds()
      );
      // Both binary fields are encoded explicitly as UTF-8 so the bytes on the wire
      // do not depend on the platform default charset.
      MqttConnectPayload payload = new MqttConnectPayload(
        options.getClientId() == null ? "" : options.getClientId(),
        options.getWillTopic(),
        options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null,
        options.hasUsername() ? options.getUsername() : null,
        options.hasPassword() ? options.getPassword().getBytes(StandardCharsets.UTF_8) : null
      );
      io.netty.handler.codec.mqtt.MqttMessage connect = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
      this.write(connect);
    }
  });
}
/**
 * Disconnects from the server without a completion callback.
 *
 * @return this client
 */
@Override
public MqttClient disconnect() {
  Handler<AsyncResult<Void>> noHandler = null;
  return disconnect(noHandler);
}
/**
 * Sends a DISCONNECT packet, reports success to the handler and then closes the connection.
 * The handler is invoked right after the write call and before the socket is closed.
 *
 * @param disconnectHandler notified with a succeeded result; may be null
 * @return this client
 */
@Override
public MqttClient disconnect(Handler<AsyncResult<Void>> disconnectHandler) {
  MqttFixedHeader header =
    new MqttFixedHeader(MqttMessageType.DISCONNECT, false, AT_MOST_ONCE, false, 0);

  // DISCONNECT carries neither a variable header nor a payload
  this.write(MqttMessageFactory.newMessage(header, null, null));

  if (disconnectHandler != null) {
    disconnectHandler.handle(Future.succeededFuture());
  }

  connection().close();
  return this;
}
/**
 * Publishes a message without a "sent" callback.
 *
 * @param topic    topic name
 * @param payload  message payload
 * @param qosLevel quality of service level
 * @param isDup    DUP flag of the PUBLISH packet
 * @param isRetain RETAIN flag of the PUBLISH packet
 * @return this client
 */
@Override
public MqttClient publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) {
  Handler<AsyncResult<Integer>> noHandler = null;
  return publish(topic, payload, qosLevel, isDup, isRetain, noHandler);
}
/**
 * Builds and sends a PUBLISH packet.
 * <p>
 * The call is rejected, with the failure dispatched to {@code publishSentHandler} on the client
 * context, when the inflight limit from the options is already reached or when the topic name
 * is not valid. QoS 1 and QoS 2 packets are remembered by packet id until they are completed
 * and count towards the inflight limit.
 *
 * @param topic              topic name; must not contain wildcards or U+0000
 * @param payload            message payload
 * @param qosLevel           quality of service level
 * @param isDup              DUP flag of the PUBLISH packet
 * @param isRetain           RETAIN flag of the PUBLISH packet
 * @param publishSentHandler receives the packet id after the packet was written, or the failure; may be null
 * @return this client
 */
@Override
public MqttClient publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain, Handler<AsyncResult<Integer>> publishSentHandler) {
  io.netty.handler.codec.mqtt.MqttMessage publish;
  MqttPublishVariableHeader variableHeader;
  synchronized (this) {
    if (countInflightQueue >= options.getMaxInflightQueue()) {
      String msg = String.format("Attempt to exceed the limit of %d inflight messages", options.getMaxInflightQueue());
      log.error(msg);
      MqttException exception = new MqttException(MqttException.MQTT_INFLIGHT_QUEUE_FULL, msg);
      if (publishSentHandler != null) {
        ctx.runOnContext(v -> publishSentHandler.handle(Future.failedFuture(exception)));
      }
      return this;
    }
    if (!isValidTopicName(topic)) {
      String msg = String.format("Invalid Topic Name - %s. It mustn't contains wildcards: # and +. Also it can't contains U+0000(NULL) chars", topic);
      log.error(msg);
      MqttException exception = new MqttException(MqttException.MQTT_INVALID_TOPIC_NAME, msg);
      if (publishSentHandler != null) {
        ctx.runOnContext(v -> publishSentHandler.handle(Future.failedFuture(exception)));
      }
      return this;
    }
    MqttFixedHeader fixedHeader = new MqttFixedHeader(
      MqttMessageType.PUBLISH,
      isDup,
      qosLevel,
      isRetain,
      0
    );
    // Buffer.getBytes() is documented to return a copy of the buffer content, so the array
    // is private to this call and can be wrapped directly instead of being copied again.
    ByteBuf buf = Unpooled.wrappedBuffer(payload.getBytes());
    variableHeader = new MqttPublishVariableHeader(topic, nextMessageId());
    publish = MqttMessageFactory.newMessage(fixedHeader, variableHeader, buf);
    // keep track of packets that need an acknowledgement from the server
    switch (qosLevel) {
      case AT_LEAST_ONCE:
        qos1outbound.put(variableHeader.packetId(), publish);
        countInflightQueue++;
        break;
      case EXACTLY_ONCE:
        qos2outbound.put(variableHeader.packetId(), publish);
        countInflightQueue++;
        break;
    }
  }
  // the write is done outside of the monitor
  this.write(publish);
  if (publishSentHandler != null) {
    publishSentHandler.handle(Future.succeededFuture(variableHeader.packetId()));
  }
  return this;
}
/**
 * Sets the handler called with the packet id when a QoS 1 or QoS 2 publish has been completed
 * (PUBACK or PUBCOMP received).
 * <p>
 * Synchronized on the client, like the private accessor that reads the field, so that the
 * assignment is visible to the thread that dispatches incoming packets.
 *
 * @param publishCompletionHandler handler to call; may be null
 * @return this client
 */
@Override
public synchronized MqttClient publishCompletionHandler(Handler<Integer> publishCompletionHandler) {
  this.publishCompletionHandler = publishCompletionHandler;
  return this;
}
/**
 * @return the currently registered publish completion handler, or null when none is set
 */
private synchronized Handler<Integer> publishCompletionHandler() {
  return publishCompletionHandler;
}
/**
 * Sets the handler called for every PUBLISH message delivered to this client.
 * <p>
 * Synchronized on the client, like the private accessor that reads the field, so that the
 * assignment is visible to the thread that dispatches incoming packets.
 *
 * @param publishHandler handler to call; may be null
 * @return this client
 */
@Override
public synchronized MqttClient publishHandler(Handler<MqttPublishMessage> publishHandler) {
  this.publishHandler = publishHandler;
  return this;
}
/**
 * @return the currently registered handler for incoming PUBLISH messages, or null when none is set
 */
private synchronized Handler<MqttPublishMessage> publishHandler() {
  return publishHandler;
}
/**
 * Sets the handler called when a SUBACK packet is received.
 * <p>
 * Synchronized on the client, like the private accessor that reads the field, so that the
 * assignment is visible to the thread that dispatches incoming packets.
 *
 * @param subscribeCompletionHandler handler to call; may be null
 * @return this client
 */
@Override
public synchronized MqttClient subscribeCompletionHandler(Handler<MqttSubAckMessage> subscribeCompletionHandler) {
  this.subscribeCompletionHandler = subscribeCompletionHandler;
  return this;
}
/**
 * @return the currently registered SUBACK handler, or null when none is set
 */
private synchronized Handler<MqttSubAckMessage> subscribeCompletionHandler() {
  return subscribeCompletionHandler;
}
/**
 * Subscribes to a single topic filter without a "sent" callback.
 *
 * @param topic topic filter
 * @param qos   requested quality of service as an integer
 * @return this client
 */
@Override
public MqttClient subscribe(String topic, int qos) {
  Handler<AsyncResult<Integer>> noHandler = null;
  return subscribe(topic, qos, noHandler);
}
/**
 * Subscribes to a single topic filter.
 *
 * @param topic                topic filter
 * @param qos                  requested quality of service as an integer
 * @param subscribeSentHandler receives the packet id of the SUBSCRIBE packet, or the failure; may be null
 * @return this client
 */
@Override
public MqttClient subscribe(String topic, int qos, Handler<AsyncResult<Integer>> subscribeSentHandler) {
  // delegate to the multi-topic variant with a one entry map
  Map<String, Integer> singleTopic = Collections.singletonMap(topic, qos);
  return subscribe(singleTopic, subscribeSentHandler);
}
/**
 * Subscribes to several topic filters without a "sent" callback.
 *
 * @param topics topic filters mapped to the requested quality of service
 * @return this client
 */
@Override
public MqttClient subscribe(Map<String, Integer> topics) {
  Handler<AsyncResult<Integer>> noHandler = null;
  return subscribe(topics, noHandler);
}
/**
 * Sends one SUBSCRIBE packet covering all given topic filters.
 * <p>
 * If at least one filter is not valid nothing is sent: the problem is logged and reported to
 * {@code subscribeSentHandler} as a failure.
 *
 * @param topics               topic filters mapped to the requested quality of service
 * @param subscribeSentHandler receives the packet id of the SUBSCRIBE packet, or the failure; may be null
 * @return this client
 */
@Override
public MqttClient subscribe(Map<String, Integer> topics, Handler<AsyncResult<Integer>> subscribeSentHandler) {

  // collect every entry whose filter does not pass validation
  Map<String, Integer> rejected = topics.entrySet()
    .stream()
    .filter(entry -> !isValidTopicFilter(entry.getKey()))
    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

  if (!rejected.isEmpty()) {
    String msg = String.format("Invalid Topic Filters: %s", rejected);
    log.error(msg);
    MqttException exception = new MqttException(MqttException.MQTT_INVALID_TOPIC_FILTER, msg);
    if (subscribeSentHandler != null) {
      subscribeSentHandler.handle(Future.failedFuture(exception));
    }
    return this;
  }

  // the packet id is taken before the subscriptions are converted
  MqttMessageIdVariableHeader idHeader = MqttMessageIdVariableHeader.from(nextMessageId());

  List<MqttTopicSubscription> requested = topics.entrySet()
    .stream()
    .map(entry -> new MqttTopicSubscription(entry.getKey(), valueOf(entry.getValue())))
    .collect(Collectors.toList());

  MqttFixedHeader header =
    new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, AT_LEAST_ONCE, false, 0);

  this.write(MqttMessageFactory.newMessage(header, idHeader, new MqttSubscribePayload(requested)));

  if (subscribeSentHandler != null) {
    subscribeSentHandler.handle(Future.succeededFuture(idHeader.messageId()));
  }
  return this;
}
/**
 * Sets the handler called with the packet id when an UNSUBACK packet is received.
 * <p>
 * Synchronized on the client, like the private accessor that reads the field, so that the
 * assignment is visible to the thread that dispatches incoming packets.
 *
 * @param unsubscribeCompletionHandler handler to call; may be null
 * @return this client
 */
@Override
public synchronized MqttClient unsubscribeCompletionHandler(Handler<Integer> unsubscribeCompletionHandler) {
  this.unsubscribeCompletionHandler = unsubscribeCompletionHandler;
  return this;
}
/**
 * @return the currently registered UNSUBACK handler, or null when none is set
 */
private synchronized Handler<Integer> unsubscribeCompletionHandler() {
  return unsubscribeCompletionHandler;
}
/**
 * Sends an UNSUBSCRIBE packet for a single topic.
 *
 * @param topic                  topic to unsubscribe from
 * @param unsubscribeSentHandler receives the packet id of the UNSUBSCRIBE packet; may be null
 * @return this client
 */
@Override
public MqttClient unsubscribe(String topic, Handler<AsyncResult<Integer>> unsubscribeSentHandler) {
  MqttFixedHeader header =
    new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, AT_LEAST_ONCE, false, 0);
  MqttMessageIdVariableHeader idHeader = MqttMessageIdVariableHeader.from(nextMessageId());

  // the payload carries exactly one topic
  List<String> topicList = Stream.of(topic).collect(Collectors.toList());
  MqttUnsubscribePayload body = new MqttUnsubscribePayload(topicList);

  this.write(MqttMessageFactory.newMessage(header, idHeader, body));

  if (unsubscribeSentHandler != null) {
    unsubscribeSentHandler.handle(Future.succeededFuture(idHeader.messageId()));
  }
  return this;
}
/**
 * @return the handler stored by the last successful TCP connect, or null when none is set
 */
private synchronized Handler<AsyncResult<MqttConnAckMessage>> connectHandler() {
  return connectHandler;
}
/**
 * Unsubscribes from a single topic without a "sent" callback.
 *
 * @param topic topic to unsubscribe from
 * @return this client
 */
@Override
public MqttClient unsubscribe(String topic) {
  Handler<AsyncResult<Integer>> noHandler = null;
  return unsubscribe(topic, noHandler);
}
/**
 * Sets the handler called when a PINGRESP packet is received.
 *
 * @param pingResponseHandler handler to call; may be null
 * @return this client
 */
@Override
public synchronized MqttClient pingResponseHandler(Handler<Void> pingResponseHandler) {
  pingrespHandler = pingResponseHandler;
  return this;
}
/**
 * @return the currently registered PINGRESP handler, or null when none is set
 */
private synchronized Handler<Void> pingResponseHandler() {
  return pingrespHandler;
}
/**
 * Sets the handler called with exceptions reported by the connection.
 *
 * @param handler handler to call; may be null
 * @return this client
 */
@Override
public synchronized MqttClient exceptionHandler(Handler<Throwable> handler) {
  this.exceptionHandler = handler;
  return this;
}
/**
 * @return the currently registered exception handler, or null when none is set
 */
private synchronized Handler<Throwable> exceptionHandler() {
  return exceptionHandler;
}
/**
 * Sets the handler called when the connection of a connected client is closed.
 *
 * @param closeHandler handler to call; may be null
 * @return this client
 */
@Override
public synchronized MqttClient closeHandler(Handler<Void> closeHandler) {
  Handler<Void> replacement = closeHandler;
  this.closeHandler = replacement;
  return this;
}
/**
 * @return the currently registered close handler, or null when none is set
 */
private synchronized Handler<Void> closeHandler() {
  return closeHandler;
}
/**
 * Sends a PINGREQ packet to the server.
 *
 * @return this client
 */
@Override
public MqttClient ping() {
  MqttFixedHeader header = new MqttFixedHeader(
    MqttMessageType.PINGREQ,
    false,
    MqttQoS.AT_MOST_ONCE,
    false,
    0);

  // PINGREQ has neither a variable header nor a payload
  this.write(MqttMessageFactory.newMessage(header, null, null));
  return this;
}
/**
 * @return the client identifier currently held by the options
 */
@Override
public synchronized String clientId() {
  return options.getClientId();
}
/**
 * @return true once the server accepted the connection and until the connection is closed
 */
@Override
public synchronized boolean isConnected() {
  return isConnected;
}
/**
 * Sends a PUBACK packet for a received QoS 1 PUBLISH.
 *
 * @param publishMessageId packet id of the PUBLISH being acknowledged
 */
private void publishAcknowledge(int publishMessageId) {
  MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE, false, 0);
  MqttMessageIdVariableHeader idHeader = MqttMessageIdVariableHeader.from(publishMessageId);
  this.write(MqttMessageFactory.newMessage(header, idHeader, null));
}
/**
 * Stores a received QoS 2 PUBLISH until its PUBREL arrives and answers it with a PUBREC packet.
 *
 * @param publishMessage the received PUBLISH message
 */
private void publishReceived(MqttPublishMessage publishMessage) {
  MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0);
  MqttMessageIdVariableHeader idHeader = MqttMessageIdVariableHeader.from(publishMessage.messageId());
  io.netty.handler.codec.mqtt.MqttMessage reply = MqttMessageFactory.newMessage(header, idHeader, null);

  // remember the message first; it is handed to the application on PUBREL
  synchronized (this) {
    qos2inbound.put(publishMessage.messageId(), publishMessage);
  }

  this.write(reply);
}
/**
 * Sends a PUBCOMP packet in response to a PUBREL.
 *
 * @param publishMessageId packet id taken from the PUBREL
 */
private void publishComplete(int publishMessageId) {
  MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false, 0);
  MqttMessageIdVariableHeader idHeader = MqttMessageIdVariableHeader.from(publishMessageId);
  this.write(MqttMessageFactory.newMessage(header, idHeader, null));
}
/**
 * Sends a PUBREL packet and stores it in the outgoing QoS 2 map under the given packet id,
 * replacing whatever was stored under that id before.
 *
 * @param publishMessageId packet id taken from the PUBREC
 */
private void publishRelease(int publishMessageId) {
  MqttFixedHeader header =
    new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
  MqttMessageIdVariableHeader idHeader = MqttMessageIdVariableHeader.from(publishMessageId);
  io.netty.handler.codec.mqtt.MqttMessage release = MqttMessageFactory.newMessage(header, idHeader, null);

  synchronized (this) {
    qos2outbound.put(publishMessageId, release);
  }

  this.write(release);
}
/**
 * Adds the MQTT encoder and decoder in front of the "handler" entry of the pipeline and,
 * when automatic keep alive is enabled with a non-zero interval, an idle state handler plus
 * a handler that sends a PINGREQ whenever the writer side has been idle.
 *
 * @param pipeline pipeline of the freshly established connection
 */
private void initChannel(ChannelPipeline pipeline) {

  pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);

  // a positive max message size is passed to the decoder, otherwise its default is used
  int maxMessageSize = this.options.getMaxMessageSize();
  MqttDecoder decoder = maxMessageSize > 0 ? new MqttDecoder(maxMessageSize) : new MqttDecoder();
  pipeline.addBefore("handler", "mqttDecoder", decoder);

  if (!this.options.isAutoKeepAlive()) {
    return;
  }
  int keepAliveSeconds = this.options.getKeepAliveTimeSeconds();
  if (keepAliveSeconds == 0) {
    return;
  }

  // only the writer idle timeout is armed
  pipeline.addBefore("handler", "idle", new IdleStateHandler(0, keepAliveSeconds, 0));
  pipeline.addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() {

    @Override
    public void userEventTriggered(ChannelHandlerContext chctx, Object evt) throws Exception {
      boolean writerIdle = evt instanceof IdleStateEvent
        && ((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE;
      if (writerIdle) {
        ping();
      }
    }
  });
}
/**
 * Advances the packet id counter and returns the new value.
 * The counter runs from 1 to {@code MAX_MESSAGE_ID} and then starts again at 1.
 *
 * @return the next packet id
 */
private synchronized int nextMessageId() {
  if (this.messageIdCounter % MAX_MESSAGE_ID == 0) {
    // initial value (0) or upper bound reached: restart at 1
    this.messageIdCounter = 1;
  } else {
    this.messageIdCounter++;
  }
  return this.messageIdCounter;
}
/**
 * @return the socket of the current connection, or null when no connection was established yet
 */
private synchronized NetSocketInternal connection() {
  return this.connection;
}
/**
 * Logs the packet at debug level and writes it to the current connection.
 *
 * @param mqttMessage packet to send
 */
void write(io.netty.handler.codec.mqtt.MqttMessage mqttMessage) {
  String trace = String.format("Sending packet %s", mqttMessage);
  log.debug(trace);

  NetSocketInternal socket = this.connection();
  socket.writeMessage(mqttMessage);
}
/**
 * Reacts to the socket being closed: clears the connected flag and, only if the client was
 * connected before, calls the close handler.
 */
private void handleClosed() {
  boolean wasConnected;
  synchronized (this) {
    wasConnected = this.isConnected;
    this.isConnected = false;
  }

  // nothing to report when the connection was never accepted
  if (!wasConnected) {
    return;
  }

  Handler<Void> onClose = closeHandler();
  if (onClose != null) {
    onClose.handle(null);
  }
}
/**
 * Entry point for everything read from the socket.
 * <p>
 * Objects that are not MQTT messages, messages whose decoding failed or did not finish, and
 * message types without a case below are reported through the pipeline as exceptions. Every
 * other message is converted where needed and passed to the matching {@code handleXxx} method.
 *
 * @param chctx channel handler context of the connection
 * @param msg   object produced by the pipeline
 */
private void handleMessage(ChannelHandlerContext chctx, Object msg) {

  if (!(msg instanceof io.netty.handler.codec.mqtt.MqttMessage)) {
    chctx.pipeline().fireExceptionCaught(new Exception("Wrong message type"));
    return;
  }

  io.netty.handler.codec.mqtt.MqttMessage incoming = (io.netty.handler.codec.mqtt.MqttMessage) msg;

  // reject anything the decoder could not fully decode
  DecoderResult decoding = incoming.decoderResult();
  if (decoding.isFailure()) {
    chctx.pipeline().fireExceptionCaught(decoding.cause());
    return;
  }
  if (!decoding.isFinished()) {
    chctx.pipeline().fireExceptionCaught(new Exception("Unfinished message"));
    return;
  }

  log.debug(String.format("Incoming packet %s", msg));

  switch (incoming.fixedHeader().messageType()) {

    case CONNACK: {
      io.netty.handler.codec.mqtt.MqttConnAckMessage nettyConnack =
        (io.netty.handler.codec.mqtt.MqttConnAckMessage) incoming;
      MqttConnAckMessage connack = MqttConnAckMessage.create(
        nettyConnack.variableHeader().connectReturnCode(),
        nettyConnack.variableHeader().isSessionPresent());
      handleConnack(connack);
      break;
    }

    case PUBLISH: {
      io.netty.handler.codec.mqtt.MqttPublishMessage nettyPublish =
        (io.netty.handler.codec.mqtt.MqttPublishMessage) incoming;
      ByteBuf payload = VertxHandler.safeBuffer(nettyPublish.payload(), chctx.alloc());
      MqttPublishMessage publish = MqttPublishMessage.create(
        nettyPublish.variableHeader().packetId(),
        nettyPublish.fixedHeader().qosLevel(),
        nettyPublish.fixedHeader().isDup(),
        nettyPublish.fixedHeader().isRetain(),
        nettyPublish.variableHeader().topicName(),
        payload);
      handlePublish(publish);
      break;
    }

    case PUBACK: {
      MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) incoming.variableHeader();
      handlePuback(idHeader.messageId());
      break;
    }

    case PUBREC: {
      MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) incoming.variableHeader();
      handlePubrec(idHeader.messageId());
      break;
    }

    case PUBREL: {
      MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) incoming.variableHeader();
      handlePubrel(idHeader.messageId());
      break;
    }

    case PUBCOMP: {
      MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) incoming.variableHeader();
      handlePubcomp(idHeader.messageId());
      break;
    }

    case SUBACK: {
      io.netty.handler.codec.mqtt.MqttSubAckMessage nettySuback =
        (io.netty.handler.codec.mqtt.MqttSubAckMessage) incoming;
      MqttSubAckMessage suback = MqttSubAckMessage.create(
        nettySuback.variableHeader().messageId(),
        nettySuback.payload().grantedQoSLevels());
      handleSuback(suback);
      break;
    }

    case UNSUBACK: {
      MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) incoming.variableHeader();
      handleUnsuback(idHeader.messageId());
      break;
    }

    case PINGRESP:
      handlePingresp();
      break;

    default:
      chctx.pipeline().fireExceptionCaught(new Exception("Wrong message type " + msg.getClass().getName()));
      break;
  }
}
/**
 * Notifies the ping response handler, if one is registered, that a PINGRESP arrived.
 */
private void handlePingresp() {
  Handler<Void> onPingResponse = pingResponseHandler();
  if (onPingResponse == null) {
    return;
  }
  onPingResponse.handle(null);
}
/**
 * Notifies the unsubscribe completion handler, if one is registered, about an UNSUBACK.
 *
 * @param unsubackMessageId packet id carried by the UNSUBACK
 */
private void handleUnsuback(int unsubackMessageId) {
  Handler<Integer> onUnsubscribed = unsubscribeCompletionHandler();
  if (onUnsubscribed == null) {
    return;
  }
  onUnsubscribed.handle(unsubackMessageId);
}
/**
 * Completes an outgoing QoS 1 publish: drops the stored packet, frees its inflight slot and
 * notifies the publish completion handler. A PUBACK for an unknown packet id is only logged.
 *
 * @param pubackMessageId packet id carried by the PUBACK
 */
private void handlePuback(int pubackMessageId) {

  synchronized (this) {
    if (qos1outbound.remove(pubackMessageId) == null) {
      log.warn("Received PUBACK packet without having related PUBLISH packet in storage");
      return;
    }
    countInflightQueue--;
  }

  Handler<Integer> onCompleted = publishCompletionHandler();
  if (onCompleted != null) {
    onCompleted.handle(pubackMessageId);
  }
}
/**
 * Completes an outgoing QoS 2 publish: drops the stored packet, frees its inflight slot and
 * notifies the publish completion handler. A PUBCOMP for an unknown packet id is only logged.
 *
 * @param pubcompMessageId packet id carried by the PUBCOMP
 */
private void handlePubcomp(int pubcompMessageId) {

  synchronized (this) {
    if (qos2outbound.remove(pubcompMessageId) == null) {
      log.warn("Received PUBCOMP packet without having related PUBREL packet in storage");
      return;
    }
    countInflightQueue--;
  }

  Handler<Integer> onCompleted = publishCompletionHandler();
  if (onCompleted != null) {
    onCompleted.handle(pubcompMessageId);
  }
}
/**
 * Answers a PUBREC by sending the PUBREL for the same packet id.
 *
 * @param pubrecMessageId packet id carried by the PUBREC
 */
private void handlePubrec(int pubrecMessageId) {
  publishRelease(pubrecMessageId);
}
/**
 * Notifies the subscribe completion handler, if one is registered, about a SUBACK.
 *
 * @param msg the received SUBACK
 */
private void handleSuback(MqttSubAckMessage msg) {
  Handler<MqttSubAckMessage> onSubscribed = subscribeCompletionHandler();
  if (onSubscribed == null) {
    return;
  }
  onSubscribed.handle(msg);
}
/**
 * Processes an incoming PUBLISH according to its quality of service:
 * QoS 0 is delivered to the publish handler, QoS 1 is acknowledged with a PUBACK and then
 * delivered, QoS 2 is answered with a PUBREC and delivered later when the PUBREL arrives.
 *
 * @param msg the received PUBLISH
 */
private void handlePublish(MqttPublishMessage msg) {
  switch (msg.qosLevel()) {

    case EXACTLY_ONCE:
      this.publishReceived(msg);
      break;

    case AT_LEAST_ONCE:
      this.publishAcknowledge(msg.messageId());
      // falls through: after the acknowledgement the delivery is the same as for QoS 0

    case AT_MOST_ONCE:
      Handler<MqttPublishMessage> onPublish = this.publishHandler();
      if (onPublish != null) {
        onPublish.handle(msg);
      }
      break;
  }
}
/**
 * Finishes the reception of a QoS 2 message: removes the stored PUBLISH, answers with a
 * PUBCOMP and delivers the message to the publish handler. A PUBREL for an unknown packet id
 * is only logged.
 *
 * @param pubrelMessageId packet id carried by the PUBREL
 */
private void handlePubrel(int pubrelMessageId) {
  MqttMessage message;
  synchronized (this) {
    message = qos2inbound.remove(pubrelMessageId);
    if (message == null) {
      log.warn("Received PUBREL packet without having related PUBREC packet in storage");
      return;
    }
  }
  // The PUBCOMP is written after the monitor is released, as every other handler does:
  // only the map access needs the lock, the socket write does not.
  this.publishComplete(pubrelMessageId);
  Handler<MqttPublishMessage> handler = this.publishHandler();
  if (handler != null) {
    // only publish messages are ever stored in qos2inbound
    handler.handle((MqttPublishMessage) message);
  }
}
/**
 * Records whether the server accepted the connection and completes the connect handler,
 * successfully with the CONNACK or with an {@link MqttConnectionException} carrying the
 * return code.
 *
 * @param msg the received CONNACK
 */
private void handleConnack(MqttConnAckMessage msg) {
  boolean accepted = msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED;
  synchronized (this) {
    this.isConnected = accepted;
  }

  Handler<AsyncResult<MqttConnAckMessage>> onConnect = connectHandler();
  if (onConnect == null) {
    return;
  }

  if (accepted) {
    onConnect.handle(Future.succeededFuture(msg));
    return;
  }

  MqttConnectionException refused = new MqttConnectionException(msg.code());
  log.error(String.format("Connection refused by the server - code: %s", msg.code()));
  onConnect.handle(Future.failedFuture(refused));
}
/**
 * Forwards an exception raised on the connection to the exception handler, if one is registered.
 *
 * @param t the reported exception
 */
private void handleException(Throwable t) {
  Handler<Throwable> onError = exceptionHandler();
  if (onError == null) {
    return;
  }
  onError.handle(t);
}
/**
 * @return a client identifier made of the string form of a random {@link UUID}
 */
private String generateRandomClientId() {
  UUID randomId = UUID.randomUUID();
  return randomId.toString();
}
/**
 * Checks a topic name used for publishing: its UTF-8 size must be within the allowed bounds
 * and it must match {@code validTopicNamePattern}.
 *
 * @param topicName topic name to check
 * @return true when the topic name is acceptable
 */
private boolean isValidTopicName(String topicName) {
  // the size check runs first; the pattern is only evaluated for strings of valid size
  return isValidStringSizeInUTF8(topicName)
    && validTopicNamePattern.matcher(topicName).find();
}
/**
 * Checks a topic filter used for subscribing: its UTF-8 size must be within the allowed bounds
 * and it must match {@code validTopicFilterPattern}.
 *
 * @param topicFilter topic filter to check
 * @return true when the topic filter is acceptable
 */
private boolean isValidTopicFilter(String topicFilter) {
  // the size check runs first; the pattern is only evaluated for strings of valid size
  return isValidStringSizeInUTF8(topicFilter)
    && validTopicFilterPattern.matcher(topicFilter).find();
}
/**
 * Checks that the UTF-8 encoding of the string is between {@code MIN_TOPIC_LEN} and
 * {@code MAX_TOPIC_LEN} bytes long (both inclusive).
 *
 * @param string string to measure; null is treated as invalid
 * @return true when the encoded size is within the bounds
 */
private boolean isValidStringSizeInUTF8(String string) {
  if (string == null) {
    // a missing topic is reported as invalid instead of failing with a NullPointerException
    return false;
  }
  // The Charset overload cannot throw, unlike the charset-name overload, so no catch is needed.
  int length = string.getBytes(StandardCharsets.UTF_8).length;
  return length >= MIN_TOPIC_LEN && length <= MAX_TOPIC_LEN;
}
}