package io.vertx.ext.stomp;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.ext.stomp.impl.FrameParser;
import io.vertx.ext.stomp.utils.Headers;
import io.vertx.ext.stomp.utils.Server;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Handler invoked with a {@link ServerFrame} to open a STOMP session.
 * <p>
 * The handler performs, in order:
 * <ol>
 * <li>protocol version negotiation, based on the {@code accept-version} header of the frame
 * and the versions listed in the server options;</li>
 * <li>authentication, only when the server options report the server as secured;</li>
 * <li>emission of the {@code CONNECTED} frame.</li>
 * </ol>
 * When negotiation or authentication fails, an {@code ERROR} frame is written and the
 * connection is closed.
 */
public class DefaultConnectHandler implements Handler<ServerFrame> {

  /**
   * Processes the received frame: negotiates the version, authenticates if required and
   * replies with either a {@code CONNECTED} or an {@code ERROR} frame.
   *
   * @param sf the server frame, giving access to the received frame and to the connection
   */
  @Override
  public void handle(ServerFrame sf) {
    // A frame without the accept-version header is treated as a request for version 1.0.
    String accept = sf.frame().getHeader(Frame.ACCEPT_VERSION);
    List<String> accepted = accept == null
        ? new ArrayList<>(Arrays.asList("1.0"))
        : new ArrayList<>(Arrays.asList(accept.split(FrameParser.COMMA)));

    String version = negotiate(accepted, sf.connection());
    if (version == null) {
      // No common version: tell the client which versions are available, then hang up.
      String supportedVersions = getSupportedVersionsHeaderLine(sf.connection());
      // NOTE(review): "mach" below is a typo for "match"; the text is left untouched because
      // it is sent to clients as-is.
      sf.connection().write(Frames.createErrorFrame(
          "Incompatible versions",
          Headers.create(
              Frame.VERSION, supportedVersions,
              Frame.CONTENT_TYPE, "text/plain"),
          "Client protocol requirement does not mach versions supported by the server. " +
              "Supported protocol versions are " + supportedVersions)
      );
      sf.connection().close();
      return;
    }

    // The callback only runs when authentication succeeded (or was not required), so its
    // argument does not need to be inspected here.
    authenticate(sf.frame(), sf.connection(), ar -> {
      sf.connection().write(new Frame(Frame.Command.CONNECTED, Headers.create(
          Frame.VERSION, version,
          Frame.SERVER, Server.SERVER_NAME,
          Frame.SESSION, sf.connection().session(),
          Frame.HEARTBEAT, Frame.Heartbeat.create(sf.connection().server().options().getHeartbeat()).toString()), null));
    });
  }

  /**
   * Authenticates the client when the server is secured, then runs the remaining actions.
   * <p>
   * When the server is not secured, {@code remainingActions} is invoked immediately. When it
   * is secured, the {@code login} and {@code passcode} headers of the frame are passed to the
   * authentication handler of the connection; {@code remainingActions} is invoked only if the
   * authentication result is {@code true}. In every other case (result {@code false}, result
   * {@code null}, or failed asynchronous result) an {@code ERROR} frame is written and the
   * connection is closed.
   *
   * @param frame            the received frame carrying the credentials
   * @param connection       the connection to authenticate
   * @param remainingActions the actions to run once the client is allowed to connect
   */
  private void authenticate(Frame frame, StompServerConnection connection,
                            Handler<AsyncResult<Void>> remainingActions) {
    if (!connection.server().options().isSecured()) {
      remainingActions.handle(Future.succeededFuture());
      return;
    }

    String login = frame.getHeader(Frame.LOGIN);
    String passcode = frame.getHeader(Frame.PASSCODE);
    connection.handler().onAuthenticationRequest(connection, login, passcode, ar -> {
      // A failed async result carries a null result: unboxing it directly would throw a
      // NullPointerException and leave the client without any answer. Treat anything that
      // is not an explicit "true" as an authentication failure.
      if (ar.succeeded() && Boolean.TRUE.equals(ar.result())) {
        remainingActions.handle(Future.succeededFuture());
      } else {
        connection.write(Frames.createErrorFrame(
            "Authentication failed",
            Headers.create(
                Frame.VERSION, getSupportedVersionsHeaderLine(connection),
                Frame.CONTENT_TYPE, "text/plain"),
            "The connection frame does not contain valid credentials.")
        );
        connection.close();
      }
    });
  }

  /**
   * Builds the value of the {@code version} header listing the versions supported by the
   * server, separated by {@link FrameParser#COMMA}.
   *
   * @param connection the connection giving access to the server options
   * @return the supported versions joined in the order given by the server options, or an
   * empty string when the list is empty
   */
  private String getSupportedVersionsHeaderLine(StompServerConnection connection) {
    return String.join(FrameParser.COMMA, connection.server().options().getSupportedVersions());
  }

  /**
   * Selects the protocol version to use for the session.
   * <p>
   * The supported versions are scanned in the order given by the server options, and the
   * first one also accepted by the client is selected.
   *
   * @param accepted   the versions accepted by the client
   * @param connection the connection giving access to the server options
   * @return the selected version, or {@code null} when client and server share no version
   */
  private String negotiate(List<String> accepted, StompServerConnection connection) {
    List<String> supported = connection.server().options().getSupportedVersions();
    for (String candidate : supported) {
      if (accepted.contains(candidate)) {
        return candidate;
      }
    }
    return null;
  }
}