package io.vertx.ext.web.handler.sockjs.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.shareddata.LocalMap;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSHandlerOptions;
import io.vertx.ext.web.handler.sockjs.SockJSSocket;
/**
 * SockJS transport that carries a session over a WebSocket connection.
 *
 * <p>The constructor registers routes on the supplied router for the path
 * {@code COMMON_PATH_ELEMENT_RE + "websocket"}: a GET route that upgrades the
 * request to a WebSocket (or answers 400 when the {@code Connection} header does
 * not mention "upgrade"), and a catch-all route that answers 405 with
 * {@code Allow: GET}.
 */
class WebSocketTransport extends BaseTransport {

  private static final Logger log = LoggerFactory.getLogger(WebSocketTransport.class);

  /**
   * Creates the transport and mounts its routes on {@code router}.
   *
   * @param vertx       passed to the base transport and to each new session
   * @param router      router on which the WebSocket routes are registered
   * @param sessions    session map passed to the base transport and to each new session
   * @param options     handler options; the heartbeat interval is read from it per connection
   * @param sockHandler handler handed to every {@link SockJSSession} created by this transport
   */
  WebSocketTransport(Vertx vertx,
                     Router router, LocalMap<String, SockJSSession> sessions,
                     SockJSHandlerOptions options,
                     Handler<SockJSSocket> sockHandler) {
    super(vertx, sessions, options);
    String wsRE = COMMON_PATH_ELEMENT_RE + "websocket";

    router.getWithRegex(wsRE).handler(rc -> {
      HttpServerRequest req = rc.request();
      String connectionHeader = req.headers().get(io.vertx.core.http.HttpHeaders.CONNECTION);
      // Header tokens are protocol text, so lower-case them independently of the JVM default locale.
      if (connectionHeader == null
          || !connectionHeader.toLowerCase(java.util.Locale.ROOT).contains("upgrade")) {
        rc.response().setStatusCode(400);
        rc.response().end("Can \"Upgrade\" only to \"WebSocket\".");
      } else {
        ServerWebSocket ws = rc.request().upgrade();
        if (log.isTraceEnabled()) {
          log.trace("WS, handler");
        }
        SockJSSession session =
            new SockJSSession(vertx, sessions, rc, options.getHeartbeatInterval(), sockHandler);
        session.register(req, new WebSocketListener(ws, session));
      }
    });

    // NOTE(review): the GET handler above ends the response or upgrades in every branch and
    // never calls rc.next(), so this route looks unreachable. It is kept so that route
    // registration is unchanged - confirm before removing.
    router.getWithRegex(wsRE).handler(rc -> {
      if (log.isTraceEnabled()) {
        log.trace("WS, get: " + rc.request().uri());
      }
      rc.response().setStatusCode(400);
      rc.response().end("Can \"Upgrade\" only to \"WebSocket\".");
    });

    // Registered for every HTTP method; answers 405 and advertises GET as the allowed method.
    router.routeWithRegex(wsRE).handler(rc -> {
      if (log.isTraceEnabled()) {
        log.trace("WS, all: " + rc.request().uri());
      }
      rc.response().putHeader("Allow", "GET").setStatusCode(405).end();
    });
  }

  /**
   * Bridges one {@link ServerWebSocket} to one {@link SockJSSession}: incoming text
   * messages are forwarded to the session and frames produced by the session are
   * written to the socket.
   */
  private static class WebSocketListener implements TransportListener {

    final ServerWebSocket ws;
    final SockJSSession session;
    // Set once this listener has started closing or the socket reported close/error.
    boolean closed;

    /**
     * Wires the text-message, close and exception handlers of {@code ws} to {@code session}.
     *
     * @param ws      the upgraded WebSocket
     * @param session the session served over {@code ws}
     */
    WebSocketListener(ServerWebSocket ws, SockJSSession session) {
      this.ws = ws;
      this.session = session;
      ws.textMessageHandler(this::handleMessages);
      ws.closeHandler(v -> {
        closed = true;
        session.shutdown();
      });
      ws.exceptionHandler(t -> {
        closed = true;
        session.shutdown();
        session.handleException(t);
      });
    }

    /**
     * Handles one incoming text message. Messages are dropped once the session is closed;
     * empty frames are ignored; text delimited like a JSON string or a JSON array of
     * strings is passed to the session; anything else closes the connection.
     *
     * @param msgs raw text received from the WebSocket
     */
    private void handleMessages(String msgs) {
      if (session.isClosed()) {
        return;
      }
      if (msgs.equals("") || msgs.equals("[]")) {
        // Empty frame: nothing to deliver.
        return;
      }
      boolean looksLikeStringArray = msgs.startsWith("[\"") && msgs.endsWith("\"]");
      boolean looksLikeString = msgs.startsWith("\"") && msgs.endsWith("\"");
      if (looksLikeStringArray || looksLikeString) {
        session.handleMessages(msgs);
      } else {
        // Payload is not shaped like the accepted message forms: drop the connection.
        close();
      }
    }

    /**
     * Writes {@code body} as a text message, or fails {@code handler} with
     * {@link ConnectionBase#CLOSED_EXCEPTION} when this listener is already closed.
     *
     * @param body    frame to send
     * @param handler completion handler; may be {@code null}, in which case a send on a
     *                closed listener is silently dropped
     */
    @Override
    public void sendFrame(String body, Handler<AsyncResult<Void>> handler) {
      if (log.isTraceEnabled()) {
        log.trace("WS, sending frame");
      }
      if (!closed) {
        ws.writeTextMessage(body, handler);
      } else if (handler != null) {
        // Guarded so that a caller passing no handler does not hit a NullPointerException.
        handler.handle(Future.failedFuture(ConnectionBase.CLOSED_EXCEPTION));
      }
    }

    /**
     * Closes the WebSocket and shuts the session down. Calling it again after the
     * listener is closed has no effect.
     */
    public void close() {
      if (!closed) {
        // Flip the flag first so that any re-entrant callback triggered by ws.close() or
        // session.shutdown() (close handler, sendFrame, close) already sees a closed listener.
        closed = true;
        ws.close();
        session.shutdown();
      }
    }

    /**
     * Called when the session has been closed: lets the session write its close frame
     * through this listener, marks the listener closed, then closes the WebSocket on the
     * session's context.
     */
    public void sessionClosed() {
      // The close frame must be written before the flag is set, otherwise sendFrame rejects it.
      session.writeClosed(this);
      closed = true;
      session.context().runOnContext(v -> ws.close());
    }
  }
}