package io.vertx.ext.stomp.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.LocalMap;
import io.vertx.core.shareddata.Lock;
import io.vertx.ext.auth.AuthProvider;
import io.vertx.ext.auth.User;
import io.vertx.ext.stomp.Acknowledgement;
import io.vertx.ext.stomp.BridgeOptions;
import io.vertx.ext.stomp.DefaultAbortHandler;
import io.vertx.ext.stomp.DefaultAckHandler;
import io.vertx.ext.stomp.DefaultBeginHandler;
import io.vertx.ext.stomp.DefaultCommitHandler;
import io.vertx.ext.stomp.DefaultConnectHandler;
import io.vertx.ext.stomp.DefaultNackHandler;
import io.vertx.ext.stomp.DefaultSendHandler;
import io.vertx.ext.stomp.DefaultSubscribeHandler;
import io.vertx.ext.stomp.DefaultUnsubscribeHandler;
import io.vertx.ext.stomp.Destination;
import io.vertx.ext.stomp.DestinationFactory;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.Frames;
import io.vertx.ext.stomp.ServerFrame;
import io.vertx.ext.stomp.StompServer;
import io.vertx.ext.stomp.StompServerConnection;
import io.vertx.ext.stomp.StompServerHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
public class DefaultStompHandler implements StompServerHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultStompHandler.class);
private final Vertx vertx;
private final Context context;
private Handler<ServerFrame> connectHandler = new DefaultConnectHandler();
private Handler<ServerFrame> stompHandler;
private Handler<ServerFrame> sendHandler = new DefaultSendHandler();
private Handler<ServerFrame> subscribeHandler = new DefaultSubscribeHandler();
private Handler<ServerFrame> unsubscribeHandler = new DefaultUnsubscribeHandler();
private Handler<StompServerConnection> closeHandler;
private Handler<ServerFrame> commitHandler = new DefaultCommitHandler();
private Handler<ServerFrame> abortHandler = new DefaultAbortHandler();
private Handler<ServerFrame> beginHandler = new DefaultBeginHandler();
private Handler<ServerFrame> ackHandler = new DefaultAckHandler();
private Handler<ServerFrame> nackHandler = new DefaultNackHandler();
private Handler<ServerFrame> disconnectHandler = (sf -> {
StompServerConnection connection = sf.connection();
Frames.handleReceipt(sf.frame(), connection);
connection.close();
});
private AuthProvider authProvider;
private Handler<StompServerConnection> pingHandler = StompServerConnection::ping;
private Handler<Acknowledgement> onAckHandler = (acknowledgement) -> LOGGER.info("Acknowledge messages - " +
acknowledgement.frames());
private Handler<Acknowledgement> onNackHandler = (acknowledgement) ->
LOGGER.warn("Messages not acknowledge - " + acknowledgement.frames());
private final LocalMap<Destination, String> destinations;
private final ConcurrentHashMap<String, User> users;
private DestinationFactory factory = Destination::topic;
private Handler<ServerFrame> receivedFrameHandler;
public DefaultStompHandler(Vertx vertx) {
this.vertx = vertx;
this.context = Vertx.currentContext();
this.destinations = vertx.sharedData().getLocalMap("stomp.destinations");
this.users = new ConcurrentHashMap<>();
}
@Override
public synchronized void onClose(StompServerConnection connection) {
getDestinations().stream().forEach((d) -> d.unsubscribeConnection(connection));
Transactions.instance().unregisterTransactionsFromConnection(connection);
this.users.remove(connection.session());
if (closeHandler != null) {
closeHandler.handle(connection);
}
}
@Override
public synchronized StompServerHandler receivedFrameHandler(Handler<ServerFrame> handler) {
this.receivedFrameHandler = handler;
return this;
}
@Override
public synchronized StompServerHandler connectHandler(Handler<ServerFrame> handler) {
this.connectHandler = handler;
return this;
}
@Override
public synchronized StompServerHandler stompHandler(Handler<ServerFrame> handler) {
this.stompHandler = handler;
return this;
}
@Override
public synchronized StompServerHandler subscribeHandler(Handler<ServerFrame> handler) {
this.subscribeHandler = handler;
return this;
}
@Override
public synchronized StompServerHandler unsubscribeHandler(Handler<ServerFrame> handler) {
this.unsubscribeHandler = handler;
return this;
}
@Override
public synchronized StompServerHandler sendHandler(Handler<ServerFrame> handler) {
this.sendHandler = handler;
return this;
}
@Override
public synchronized StompServerHandler closeHandler(Handler<StompServerConnection> handler) {
this.closeHandler = handler;
return this;
}
@Override
public synchronized StompServerHandler commitHandler(Handler<ServerFrame> handler) {
this.commitHandler = handler;
return this;
}
@Override
public synchronized StompServerHandler abortHandler(Handler<ServerFrame> handler) {
this.abortHandler = handler;
return this;
}
@Override
public synchronized StompServerHandler beginHandler(Handler<ServerFrame> handler) {
this.beginHandler = handler;
return this;
}
@Override
public synchronized StompServerHandler disconnectHandler(Handler<ServerFrame> handler) {
this.disconnectHandler = handler;
return this;
}
@Override
public synchronized StompServerHandler ackHandler(Handler<ServerFrame> handler) {
this.ackHandler = handler;
return this;
}
@Override
public synchronized StompServerHandler nackHandler(Handler<ServerFrame> handler) {
this.nackHandler = handler;
return this;
}
@Override
public void handle(ServerFrame serverFrame) {
Frame frame = serverFrame.frame();
StompServerConnection connection = serverFrame.connection();
connection.onServerActivity();
synchronized (this) {
if (receivedFrameHandler != null) {
receivedFrameHandler.handle(serverFrame);
}
}
switch (frame.getCommand()) {
case CONNECT:
handleConnect(frame, connection);
break;
case STOMP:
handleStomp(frame, connection);
break;
case SEND:
handleSend(frame, connection);
break;
case SUBSCRIBE:
handleSubscribe(frame, connection);
break;
case UNSUBSCRIBE:
handleUnsubscribe(frame, connection);
break;
case BEGIN:
handleBegin(frame, connection);
break;
case ABORT:
handleAbort(frame, connection);
break;
case COMMIT:
handleCommit(frame, connection);
break;
case ACK:
handleAck(frame, connection);
break;
case NACK:
handleNack(frame, connection);
break;
case DISCONNECT:
handleDisconnect(frame, connection);
break;
case PING:
break;
default:
break;
}
}
private void handleAck(Frame frame, StompServerConnection connection) {
Handler<ServerFrame> handler;
synchronized (this) {
handler = ackHandler;
}
if (handler != null) {
handler.handle(new ServerFrameImpl(frame, connection));
}
}
private void handleNack(Frame frame, StompServerConnection connection) {
Handler<ServerFrame> handler;
synchronized (this) {
handler = nackHandler;
}
if (handler != null) {
handler.handle(new ServerFrameImpl(frame, connection));
}
}
private void handleBegin(Frame frame, StompServerConnection connection) {
Handler<ServerFrame> handler;
synchronized (this) {
handler = beginHandler;
}
if (handler != null) {
handler.handle(new ServerFrameImpl(frame, connection));
}
}
private void handleAbort(Frame frame, StompServerConnection connection) {
Handler<ServerFrame> handler;
synchronized (this) {
handler = abortHandler;
}
if (handler != null) {
handler.handle(new ServerFrameImpl(frame, connection));
}
}
private void handleCommit(Frame frame, StompServerConnection connection) {
Handler<ServerFrame> handler;
synchronized (this) {
handler = commitHandler;
}
if (handler != null) {
handler.handle(new ServerFrameImpl(frame, connection));
}
}
private void handleSubscribe(Frame frame, StompServerConnection connection) {
Handler<ServerFrame> handler;
synchronized (this) {
handler = subscribeHandler;
}
if (handler != null) {
handler.handle(new ServerFrameImpl(frame, connection));
}
}
private void handleUnsubscribe(Frame frame, StompServerConnection connection) {
Handler<ServerFrame> handler;
synchronized (this) {
handler = unsubscribeHandler;
}
if (handler != null) {
handler.handle(new ServerFrameImpl(frame, connection));
}
}
private void handleSend(Frame frame, StompServerConnection connection) {
Handler<ServerFrame> handler;
synchronized (this) {
handler = sendHandler;
}
if (handler != null) {
handler.handle(new ServerFrameImpl(frame, connection));
}
}
private void handleConnect(Frame frame, StompServerConnection connection) {
Handler<ServerFrame> handler;
Handler<StompServerConnection> pingH;
synchronized (this) {
handler = connectHandler;
pingH = pingHandler;
}
long ping = Frame.Heartbeat.computePingPeriod(
Frame.Heartbeat.create(connection.server().options().getHeartbeat()),
Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)));
long pong = Frame.Heartbeat.computePongPeriod(
Frame.Heartbeat.create(connection.server().options().getHeartbeat()),
Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)));
connection.configureHeartbeat(ping, pong, pingH);
if (handler != null) {
handler.handle(new ServerFrameImpl(frame, connection));
}
}
private void handleDisconnect(Frame frame, StompServerConnection connection) {
Handler<ServerFrame> handler;
synchronized (this) {
handler = disconnectHandler;
}
if (handler != null) {
handler.handle(new ServerFrameImpl(frame, connection));
}
}
private void handleStomp(Frame frame, StompServerConnection connection) {
Handler<ServerFrame> handler;
synchronized (this) {
handler = stompHandler;
}
if (handler == null) {
handleConnect(frame, connection);
return;
}
handler.handle(new ServerFrameImpl(frame, connection));
}
@Override
public synchronized StompServerHandler authProvider(AuthProvider handler) {
this.authProvider = handler;
return this;
}
@Override
public StompServerHandler onAuthenticationRequest(StompServerConnection connection,
String login, String passcode,
Handler<AsyncResult<Boolean>> handler) {
final AuthProvider auth;
synchronized (this) {
auth = authProvider;
}
final StompServer server = connection.server();
if (!server.options().isSecured()) {
if (auth != null) {
LOGGER.warn("Authentication handler set while the server is not secured");
}
context.runOnContext(v -> handler.handle(Future.succeededFuture(true)));
return this;
}
if (server.options().isSecured() && auth == null) {
LOGGER.error("Cannot authenticate connection - no authentication provider");
context.runOnContext(v -> handler.handle(Future.succeededFuture(false)));
return this;
}
context.runOnContext(v ->
auth.authenticate(new JsonObject().put("username", login).put("password", passcode), ar -> {
if (ar.succeeded()) {
users.put(connection.session(), ar.result());
context.runOnContext(v2 -> handler.handle(Future.succeededFuture(true)));
} else {
context.runOnContext(v2 -> handler.handle(Future.succeededFuture(false)));
}
}));
return this;
}
@Override
public User getUserBySession(String session) {
return this.users.get(session);
}
@Override
public List<Destination> getDestinations() {
return new ArrayList<>(destinations.keySet());
}
public Destination getDestination(String destination) {
for (Destination d : destinations.keySet()) {
if (d.matches(destination)) {
return d;
}
}
return null;
}
public Destination getOrCreateDestination(String destination) {
DestinationFactory destinationFactory;
synchronized (this) {
destinationFactory = this.factory;
}
synchronized (vertx) {
Destination d = getDestination(destination);
if (d == null) {
d = destinationFactory.create(vertx, destination);
if (d != null) {
destinations.put(d, "");
}
}
return d;
}
}
@Override
public synchronized StompServerHandler destinationFactory(DestinationFactory factory) {
this.factory = factory;
return this;
}
@Override
public synchronized StompServerHandler bridge(BridgeOptions options) {
destinations.put(Destination.bridge(vertx, options), "");
return this;
}
@Override
public StompServerHandler onAck(StompServerConnection connection, Frame subscription, List<Frame> messages) {
Handler<Acknowledgement> handler;
synchronized (this) {
handler = onAckHandler;
}
if (handler != null) {
handler.handle(new AcknowledgementImpl(subscription, messages));
}
return this;
}
@Override
public StompServerHandler onNack(StompServerConnection connection, Frame subscribe, List<Frame> messages) {
Handler<Acknowledgement> handler;
synchronized (this) {
handler = onNackHandler;
}
if (handler != null) {
handler.handle(new AcknowledgementImpl(subscribe, messages));
}
return this;
}
@Override
public synchronized StompServerHandler onAckHandler(Handler<Acknowledgement> handler) {
this.onAckHandler = handler;
return this;
}
@Override
public synchronized StompServerHandler onNackHandler(Handler<Acknowledgement> handler) {
this.onNackHandler = handler;
return this;
}
@Override
public synchronized StompServerHandler pingHandler(Handler<StompServerConnection> handler) {
this.pingHandler = handler;
return this;
}
}