package io.vertx.mqtt.impl;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetServer;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
/**
 * {@link MqttServer} implementation backed by a {@link NetServer}.
 * <p>
 * Every accepted socket gets the Netty MQTT encoder/decoder plus a read-idle timeout
 * installed in its pipeline, and is then wrapped in a {@code MqttServerConnection}
 * that receives the decoded messages.
 */
public class MqttServerImpl implements MqttServer {

  private static final Logger log = LoggerFactory.getLogger(MqttServerImpl.class);

  private final VertxInternal vertx;
  private final NetServer server;
  private final MqttServerOptions options;

  // Written by the synchronized setters below; always read under the same monitor.
  private Handler<MqttEndpoint> endpointHandler;
  private Handler<Throwable> exceptionHandler;

  /**
   * Creates the MQTT server and its underlying {@link NetServer}.
   *
   * @param vertx   Vert.x instance; it is cast to {@link VertxInternal}
   * @param options options used both to create the {@link NetServer} and to configure MQTT handling
   */
  public MqttServerImpl(Vertx vertx, MqttServerOptions options) {
    this.vertx = (VertxInternal) vertx;
    this.server = vertx.createNetServer(options);
    this.options = options;
  }

  /**
   * Starts listening on the port and host taken from the options.
   *
   * @return a future completed with this server once the underlying server is listening
   */
  @Override
  public Future<MqttServer> listen() {
    return listen(this.options.getPort());
  }

  /**
   * Starts listening on the given port and host.
   * <p>
   * The endpoint and exception handlers are captured when this method is called;
   * the captured values are the ones handed to each new connection.
   *
   * @param port port to bind
   * @param host host to bind
   * @return a future completed with this server once the underlying server is listening,
   *         or a future failed with {@link IllegalStateException} when no endpoint handler is set
   */
  @Override
  public Future<MqttServer> listen(int port, String host) {
    final Handler<MqttEndpoint> endpoint;
    final Handler<Throwable> failure;
    // The setters are synchronized on this instance; read under the same monitor so a
    // handler set from another thread is guaranteed to be visible here. The lock is
    // released before calling into the NetServer.
    synchronized (this) {
      endpoint = endpointHandler;
      failure = exceptionHandler;
    }
    if (endpoint == null) {
      return vertx.getOrCreateContext().failedFuture(new IllegalStateException("Please set handler before server is listening"));
    }
    server.connectHandler(so -> {
      NetSocketInternal soi = (NetSocketInternal) so;
      initChannel(soi.channelHandlerContext().pipeline());
      MqttServerConnection conn = new MqttServerConnection(soi, endpoint, failure, options);
      soi.messageHandler(msg -> {
        // Message handling is serialized on the connection's monitor.
        synchronized (conn) {
          conn.handleMessage(msg);
        }
      });
    });
    return server.listen(port, host).map(this);
  }

  /**
   * Starts listening on the given port and the host taken from the options.
   *
   * @param port port to bind
   * @return a future completed with this server once the underlying server is listening
   */
  @Override
  public Future<MqttServer> listen(int port) {
    return listen(port, this.options.getHost());
  }

  /**
   * Starts listening on the given port and the host taken from the options.
   *
   * @param port          port to bind
   * @param listenHandler notified with the outcome; may be {@code null}
   * @return this server
   */
  @Override
  public MqttServer listen(int port, Handler<AsyncResult<MqttServer>> listenHandler) {
    return listen(port, this.options.getHost(), listenHandler);
  }

  /**
   * Starts listening on the port and host taken from the options.
   *
   * @param listenHandler notified with the outcome; may be {@code null}
   * @return this server
   */
  @Override
  public MqttServer listen(Handler<AsyncResult<MqttServer>> listenHandler) {
    return listen(this.options.getPort(), listenHandler);
  }

  /**
   * Starts listening on the given port and host.
   *
   * @param port          port to bind
   * @param host          host to bind
   * @param listenHandler notified with the outcome; ignored when {@code null}
   * @return this server
   */
  @Override
  public MqttServer listen(int port, String host, Handler<AsyncResult<MqttServer>> listenHandler) {
    Future<MqttServer> fut = listen(port, host);
    if (listenHandler != null) {
      fut.onComplete(listenHandler);
    }
    return this;
  }

  /**
   * Sets the handler that receives each new {@link MqttEndpoint}.
   * It must be set before calling any {@code listen} method.
   *
   * @param handler the endpoint handler
   * @return this server
   */
  @Override
  public synchronized MqttServer endpointHandler(Handler<MqttEndpoint> handler) {
    endpointHandler = handler;
    return this;
  }

  /**
   * Sets the handler passed to each new connection for reporting failures.
   *
   * @param handler the exception handler
   * @return this server
   */
  @Override
  public synchronized MqttServer exceptionHandler(Handler<Throwable> handler) {
    exceptionHandler = handler;
    return this;
  }

  /**
   * @return the port reported by the underlying {@link NetServer}
   */
  @Override
  public int actualPort() {
    return server.actualPort();
  }

  /**
   * Closes the underlying {@link NetServer}.
   *
   * @return the future returned by the underlying server's close operation
   */
  @Override
  public Future<Void> close() {
    return server.close();
  }

  /**
   * Closes the underlying {@link NetServer}.
   *
   * @param completionHandler handler passed through to the underlying server's close operation
   */
  @Override
  public void close(Handler<AsyncResult<Void>> completionHandler) {
    server.close(completionHandler);
  }

  /**
   * Installs the MQTT codec and the read-idle timeout in front of the pipeline entry
   * named {@code "handler"}.
   * <p>
   * The handler names used here ({@code "mqttEncoder"}, {@code "mqttDecoder"},
   * {@code "idle"}, {@code "timeoutOnConnect"}) are kept unchanged.
   * NOTE(review): other classes may look these handlers up by name - confirm before renaming.
   *
   * @param pipeline pipeline of the newly accepted connection
   */
  private void initChannel(ChannelPipeline pipeline) {
    pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);

    int maxMessageSize = this.options.getMaxMessageSize();
    // A non-positive value means "not configured": use the decoder's no-arg constructor.
    MqttDecoder decoder = maxMessageSize > 0 ? new MqttDecoder(maxMessageSize) : new MqttDecoder();
    pipeline.addBefore("handler", "mqttDecoder", decoder);

    // Only the reader-idle time is configured; writer-idle and all-idle are disabled (0).
    pipeline.addBefore("handler", "idle", new IdleStateHandler(this.options.timeoutOnConnect(), 0, 0));
    pipeline.addBefore("handler", "timeoutOnConnect", new ChannelDuplexHandler() {
      @Override
      public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
          // Nothing was read within options.timeoutOnConnect(): drop the connection.
          ctx.channel().close();
        } else {
          // Not ours to handle: pass it on instead of silently swallowing it, so the
          // handlers further down the pipeline still see the event.
          ctx.fireUserEventTriggered(evt);
        }
      }
    });
  }
}