package io.vertx.ext.web.handler.sockjs.impl;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.ext.auth.User;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.Session;
import io.vertx.ext.web.handler.sockjs.SockJSHandlerOptions;
import io.vertx.ext.web.handler.sockjs.SockJSSocket;
class RawWebSocketTransport {
private static class RawWSSockJSSocket extends SockJSSocketBase {
final ServerWebSocket ws;
MultiMap headers;
boolean closed;
RawWSSockJSSocket(Vertx vertx, Session webSession, User webUser, SockJSHandlerOptions options, ServerWebSocket ws) {
super(vertx, webSession, webUser, options);
this.ws = ws;
ws.closeHandler(v -> {
synchronized (RawWSSockJSSocket.this) {
closed = true;
}
RawWSSockJSSocket.super.close();
});
}
public SockJSSocket handler(Handler<Buffer> handler) {
ws.binaryMessageHandler(handler);
ws.textMessageHandler(textMessage -> handler.handle(Buffer.buffer(textMessage)));
return this;
}
public SockJSSocket pause() {
ws.pause();
return this;
}
public SockJSSocket resume() {
ws.resume();
return this;
}
@Override
public SockJSSocket fetch(long amount) {
ws.fetch(amount);
return this;
}
private synchronized boolean canWrite(Handler<AsyncResult<Void>> handler) {
if (closed) {
if (handler != null) {
vertx.runOnContext(v -> handler.handle(Future.failedFuture(ConnectionBase.CLOSED_EXCEPTION)));
}
return false;
}
return true;
}
@Override
public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
if (canWrite(handler)) {
ws.writeBinaryMessage(data, handler);
}
}
@Override
public void write(String data, Handler<AsyncResult<Void>> handler) {
if (canWrite(handler)) {
ws.writeTextMessage(data, handler);
}
}
public SockJSSocket setWriteQueueMaxSize(int maxQueueSize) {
ws.setWriteQueueMaxSize(maxQueueSize);
return this;
}
public boolean writeQueueFull() {
return ws.writeQueueFull();
}
public SockJSSocket drainHandler(Handler<Void> handler) {
ws.drainHandler(handler);
return this;
}
public SockJSSocket exceptionHandler(Handler<Throwable> handler) {
ws.exceptionHandler(handler);
return this;
}
public SockJSSocket endHandler(Handler<Void> endHandler) {
ws.endHandler(endHandler);
return this;
}
public void close() {
synchronized (this) {
if (closed) {
return;
}
closed = true;
}
super.close();
ws.close();
}
public void closeAfterSessionExpired() {
this.close((short) 1001, "Session expired");
}
public void close(int statusCode, String reason) {
synchronized (this) {
if (closed) {
return;
}
closed = true;
}
super.close();
ws.close((short) statusCode, reason);
}
@Override
public SocketAddress remoteAddress() {
return ws.remoteAddress();
}
@Override
public SocketAddress localAddress() {
return ws.localAddress();
}
@Override
public MultiMap headers() {
if (headers == null) {
headers = BaseTransport.removeCookieHeaders(ws.headers());
}
return headers;
}
@Override
public String uri() {
return ws.uri();
}
}
RawWebSocketTransport(Vertx vertx, Router router, SockJSHandlerOptions options, Handler<SockJSSocket> sockHandler) {
String wsRE = "/websocket";
router.get(wsRE).handler(rc -> {
rc.request().pause();
rc.request().toWebSocket(toWebSocket -> {
if (toWebSocket.succeeded()) {
rc.request().resume();
SockJSSocket sock = new RawWSSockJSSocket(vertx, rc.session(), rc.user(), options, toWebSocket.result());
sockHandler.handle(sock);
} else {
rc.fail(toWebSocket.cause());
}
});
});
router.get(wsRE).handler(rc -> rc.response().setStatusCode(400).end("Can \"Upgrade\" only to \"WebSocket\"."));
router.get(wsRE).handler(rc -> rc.response().putHeader(HttpHeaders.ALLOW, "GET").setStatusCode(405).end());
}
}