package io.vertx.ext.web.client.impl;
import io.netty.handler.codec.http.QueryStringEncoder;
import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.http.impl.HttpClientImpl;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.codec.spi.BodyStream;
import io.vertx.ext.web.multipart.MultipartForm;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
public class HttpContext<T> {
/** Callback that receives the final outcome (response or failure) of the request. */
private final Handler<AsyncResult<HttpResponse<T>>> handler;
/** Underlying HTTP client used to create requests and to look up redirect settings. */
private final HttpClientImpl client;
/** Interceptors invoked in list order by {@link #next()} before the current phase is executed. */
private final List<Handler<HttpContext<?>>> interceptors;
/** Vert.x context obtained in the prepare phase; handed to the multipart upload when one is built. */
private Context context;
/** Request being processed, set by {@code prepareRequest}. */
private HttpRequestImpl<T> request;
/** Body to send, set by {@code prepareRequest}; may be replaced while the request is being sent. */
private Object body;
/** Content type to send, set by {@code prepareRequest}; may be replaced by an existing header value. */
private String contentType;
/** Attributes attached to this context; created lazily by {@code set}. */
private Map<String, Object> attrs;
/** Index of the next interceptor to invoke. */
private int interceptorIdx;
/** {@code true} while an interceptor's {@code handle} call is running. */
private boolean invoking;
/** Set when {@code next()} or {@code fire()} is called during an interceptor invocation, so the loop continues. */
private boolean invokeNext;
/** Phase most recently fired; {@code null} until the first phase is fired. */
private ClientPhase phase;
/** Options used to create the underlying client request. */
private RequestOptions requestOptions;
/** Underlying client request; set when the request is created and cleared by the send continuations. */
private HttpClientRequest clientRequest;
/** Underlying client response most recently passed to {@code receiveResponse}. */
private HttpClientResponse clientResponse;
/** Response that is (or will be) handed to the result handler. */
private HttpResponse<T> response;
/** Failure that is (or will be) handed to the result handler. */
private Throwable failure;
/** Number of redirects counted so far; reset to zero when the request is prepared. */
private int redirects;
/** {@code Location} header values of the redirects followed; starts as a shared immutable empty list. */
private List<String> redirectedLocations = Collections.emptyList();
/**
 * Creates a context bound to a client, an interceptor chain and a result handler.
 *
 * @param client the HTTP client used to send requests
 * @param interceptors the interceptors to run for each phase
 * @param handler the handler notified with the final response or failure
 */
HttpContext(HttpClientImpl client, List<Handler<HttpContext<?>>> interceptors, Handler<AsyncResult<HttpResponse<T>>> handler) {
  this.client = client;
  this.interceptors = interceptors;
  this.handler = handler;
}
/**
 * @return the underlying client request currently held by this context, possibly {@code null}
 */
public HttpClientRequest clientRequest() {
  return this.clientRequest;
}
/**
 * @return the underlying client response currently held by this context, possibly {@code null}
 */
public HttpClientResponse clientResponse() {
  return this.clientResponse;
}
/**
 * @return the phase most recently fired on this context, or {@code null} if none was fired yet
 */
public ClientPhase phase() {
  return this.phase;
}
/**
 * @return the request given to {@code prepareRequest}, or {@code null} before that call
 */
public HttpRequest<T> request() {
  return this.request;
}
/**
 * @return the options used to create the underlying client request, possibly {@code null}
 */
public RequestOptions requestOptions() {
  return this.requestOptions;
}
/**
 * Replaces the options used to create the underlying client request.
 *
 * @param requestOptions the new request options
 */
public void setRequestOptions(RequestOptions requestOptions) {
  this.requestOptions = requestOptions;
}
/**
 * @return the response currently held by this context, possibly {@code null}
 */
public HttpResponse<T> response() {
  return this.response;
}
/**
 * Replaces the response held by this context without firing any phase.
 *
 * @param response the new response
 * @return this context, for chaining
 */
public HttpContext<T> response(HttpResponse<T> response) {
  this.response = response;
  return this;
}
/**
 * @return the number of redirects counted so far
 */
public int redirects() {
  return this.redirects;
}
/**
 * Overrides the redirect counter.
 *
 * @param redirects the new redirect count
 * @return this context, for chaining
 */
public HttpContext<T> redirects(int redirects) {
  this.redirects = redirects;
  return this;
}
/**
 * @return the content type associated with the request body, possibly {@code null}
 */
public String contentType() {
  return this.contentType;
}
/**
 * @return the request body held by this context, possibly {@code null}
 */
public Object body() {
  return this.body;
}
/**
 * @return the failure recorded on this context, or {@code null} if none was recorded
 */
public Throwable failure() {
  return this.failure;
}
/**
 * @return the {@code Location} header values recorded for the redirects followed so far;
 *         an empty list when no redirect was followed
 */
public List<String> getRedirectedLocations() {
  return this.redirectedLocations;
}
/**
 * Stores the request, content type and body, then fires the {@code PREPARE_REQUEST} phase.
 *
 * @param request the request to send; must be an {@code HttpRequestImpl}
 * @param contentType the content type of the body, may be {@code null}
 * @param body the body to send, may be {@code null}
 */
public void prepareRequest(HttpRequest<T> request, String contentType, Object body) {
  HttpRequestImpl<T> impl = (HttpRequestImpl<T>) request;
  this.request = impl;
  this.contentType = contentType;
  this.body = body;
  fire(ClientPhase.PREPARE_REQUEST);
}
/**
 * Stores the given options and fires the {@code SEND_REQUEST} phase.
 *
 * @param requestOptions the options used to create the underlying client request
 */
public void sendRequest(RequestOptions requestOptions) {
  setRequestOptions(requestOptions);
  fire(ClientPhase.SEND_REQUEST);
}
/**
 * Fires the {@code SEND_REQUEST} phase again, so a request is sent with the
 * request options currently held by this context.
 */
public void followRedirect() {
  final ClientPhase resend = ClientPhase.SEND_REQUEST;
  fire(resend);
}
/**
 * Records the client response and either starts following a redirect or fires
 * the {@code RECEIVE_RESPONSE} phase.
 *
 * <p>A redirect is considered when the status code is in the 3xx range and the
 * redirect counter is below the limit (the client's max redirects when the request
 * follows redirects, otherwise zero). The counter is incremented before the client's
 * redirect handler is consulted; if the handler returns {@code null} the response is
 * processed as a regular response.
 *
 * @param clientResponse the response received from the underlying client
 */
public void receiveResponse(HttpClientResponse clientResponse) {
  final int status = clientResponse.statusCode();
  final int limit;
  if (request.followRedirects) {
    limit = client.getOptions().getMaxRedirects();
  } else {
    limit = 0;
  }
  this.clientResponse = clientResponse;
  final boolean redirectStatus = status >= 300 && status < 400;
  if (redirects < limit && redirectStatus) {
    redirects++;
    Future<RequestOptions> redirection = client.redirectHandler().apply(clientResponse);
    if (redirection != null) {
      // Swap the shared immutable empty list for a mutable one on first use.
      if (redirectedLocations.isEmpty()) {
        redirectedLocations = new ArrayList<>();
      }
      redirectedLocations.add(clientResponse.getHeader(HttpHeaders.LOCATION));
      redirection.onComplete(outcome -> {
        if (!outcome.succeeded()) {
          fail(outcome.cause());
          return;
        }
        requestOptions = outcome.result();
        fire(ClientPhase.FOLLOW_REDIRECT);
      });
      return;
    }
  }
  fire(ClientPhase.RECEIVE_RESPONSE);
}
/**
 * Stores the response and fires the {@code DISPATCH_RESPONSE} phase.
 *
 * @param response the response to hand to the result handler
 */
public void dispatchResponse(HttpResponse<T> response) {
  response(response);
  fire(ClientPhase.DISPATCH_RESPONSE);
}
/**
 * Records a failure and fires the {@code FAILURE} phase, unless this context is
 * already in that phase.
 *
 * @param cause the failure to report
 * @return {@code true} if the failure was recorded and fired, {@code false} if the
 *         context was already in the {@code FAILURE} phase
 */
public boolean fail(Throwable cause) {
  // Captured up front: fire() below changes the phase.
  final boolean alreadyFailed = phase == ClientPhase.FAILURE;
  if (!alreadyFailed) {
    failure = cause;
    fire(ClientPhase.FAILURE);
  }
  return !alreadyFailed;
}
/**
 * Switches to the given phase and runs the interceptor chain for it.
 *
 * <p>When called from inside an interceptor, the chain is not re-entered; instead the
 * interceptor index is rewound and the running loop is told to continue, so it
 * restarts from the first interceptor with the new phase.
 *
 * @param phase the phase to enter, must not be {@code null}
 */
private void fire(ClientPhase phase) {
  this.phase = Objects.requireNonNull(phase);
  if (!invoking) {
    next();
    return;
  }
  interceptorIdx = 0;
  invokeNext = true;
}
/**
 * Continues the interceptor chain for the current phase and, once every interceptor
 * has been invoked, executes the phase itself.
 *
 * <p>When called while an interceptor is running, this only flags that the chain
 * should continue; the loop already on the stack picks the flag up after the
 * interceptor returns. An interceptor that returns without calling {@code next()}
 * (or firing a phase) suspends the chain until {@code next()} is called later.
 */
public void next() {
if (invoking) {
// Re-entrant call from an interceptor: let the running loop continue instead of recursing.
invokeNext = true;
} else {
while (interceptorIdx < interceptors.size()) {
Handler<HttpContext<?>> interceptor = interceptors.get(interceptorIdx);
invoking = true;
// Advance before invoking: a fire() made by the interceptor rewinds the index to 0.
interceptorIdx++;
try {
interceptor.handle(this);
} catch (Exception e ) {
// An interceptor threw: record the failure and go straight to executing the
// FAILURE phase below, skipping the remaining interceptors.
failure = e;
invokeNext = false;
phase = ClientPhase.FAILURE;
break;
} finally {
invoking = false;
}
if (!invokeNext) {
// The interceptor did not ask to continue; stop here until next() is called again.
return;
}
invokeNext = false;
}
// Chain finished (or aborted by an exception): rewind and run the phase.
interceptorIdx = 0;
execute();
}
}
/**
 * Runs the action associated with the current phase.
 */
private void execute() {
  switch (phase) {
    case PREPARE_REQUEST:
      handlePrepareRequest();
      return;
    case SEND_REQUEST:
      handleSendRequest();
      return;
    case FOLLOW_REDIRECT:
      followRedirect();
      return;
    case RECEIVE_RESPONSE:
      handleReceiveResponse();
      return;
    case DISPATCH_RESPONSE:
      handleDispatchResponse();
      return;
    case FAILURE:
      handleFailure();
      return;
  }
}
/**
 * Notifies the result handler with the recorded failure.
 */
private void handleFailure() {
  Future<HttpResponse<T>> outcome = Future.failedFuture(failure);
  handler.handle(outcome);
}
/**
 * Notifies the result handler with the stored response.
 */
private void handleDispatchResponse() {
  Future<HttpResponse<T>> outcome = Future.succeededFuture(response);
  handler.handle(outcome);
}
/**
 * Executes the {@code PREPARE_REQUEST} phase: builds the {@link RequestOptions} from
 * the request (URI with query parameters, server address, method, host/port or an
 * absolute URI for non-HTTP protocols, optional SSL override and virtual host),
 * resets the redirect counter and hands the options to {@code sendRequest}.
 *
 * <p>If an absolute URI cannot be built, the context is failed and nothing is sent.
 */
private void handlePrepareRequest() {
  context = client.getVertx().getOrCreateContext();

  // Append the query parameters, if any, to the request URI.
  final String requestURI;
  final boolean hasParams = request.params != null && request.params.size() > 0;
  if (hasParams) {
    QueryStringEncoder encoder = new QueryStringEncoder(request.uri);
    for (Map.Entry<String, String> param : request.params) {
      encoder.addParam(param.getKey(), param.getValue());
    }
    requestURI = encoder.toString();
  } else {
    requestURI = request.uri;
  }

  final int port = request.port();
  final String host = request.host();
  final RequestOptions options = new RequestOptions();

  // Server address and method are set the same way whichever addressing form is used.
  options.setServer(request.serverAddress).setMethod(request.method);

  final boolean sslOverride = request.ssl != null && request.ssl != request.options.isSsl();
  final boolean customProtocol = !sslOverride
    && request.protocol != null
    && !"http".equals(request.protocol)
    && !"https".equals(request.protocol);

  if (customProtocol) {
    try {
      URI absolute = new URI(request.protocol, null, host, port, requestURI, null, null);
      options.setAbsoluteURI(absolute.toString());
    } catch (URISyntaxException ex) {
      fail(ex);
      return;
    }
  } else {
    if (sslOverride) {
      options.setSsl(request.ssl);
    }
    options.setHost(host).setPort(port).setURI(requestURI);
  }

  redirects = 0;

  if (request.virtualHost != null) {
    // Pin the connection target first, since the host is replaced by the virtual host below.
    if (options.getServer() == null) {
      options.setServer(SocketAddress.inetSocketAddress(options.getPort(), options.getHost()));
    }
    options.setHost(request.virtualHost);
  }

  sendRequest(options);
}
/**
 * Executes the {@code RECEIVE_RESPONSE} phase: pipes the client response into a body
 * stream created by the request's codec and, once the body has been decoded,
 * dispatches an {@code HttpResponseImpl} built from the response metadata, the
 * decoded body and the redirected locations.
 *
 * <p>The outcome is delivered on the context that was current when this method was
 * called. A codec creation failure closes the pipe and fails this context directly.
 */
private void handleReceiveResponse() {
  final HttpClientResponse resp = clientResponse;
  final Context callerContext = Vertx.currentContext();
  final Promise<HttpResponse<T>> result = Promise.promise();

  result.future().onComplete(outcome -> callerContext.runOnContext(ignored -> {
    if (outcome.succeeded()) {
      dispatchResponse(outcome.result());
    } else {
      fail(outcome.cause());
    }
  }));

  resp.exceptionHandler(err -> {
    if (!result.future().isComplete()) {
      result.fail(err);
    }
  });

  final Pipe<Buffer> pipe = resp.pipe();
  request.codec.create(created -> {
    if (!created.succeeded()) {
      pipe.close();
      fail(created.cause());
      return;
    }
    final BodyStream<T> stream = created.result();
    pipe.to(stream, piped -> {
      if (!piped.succeeded()) {
        result.fail(piped.cause());
        return;
      }
      stream.result().onComplete(decoded -> {
        if (!decoded.succeeded()) {
          result.fail(decoded.cause());
          return;
        }
        result.complete(new HttpResponseImpl<>(
          resp.version(),
          resp.statusCode(),
          resp.statusMessage(),
          resp.headers(),
          resp.trailers(),
          resp.cookies(),
          stream.result().result(),
          redirectedLocations
        ));
      });
    });
  });
}
/**
 * Executes the {@code SEND_REQUEST} phase.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>merges the request's headers into the request options;</li>
 *   <li>resolves the content type: a {@code Content-Type} header already present on the
 *       options wins, otherwise the context's content type is added as a header;</li>
 *   <li>applies the request timeout;</li>
 *   <li>chooses how the body is written once the client request exists: a
 *       {@code MultiMap} or {@code MultipartForm} becomes a multipart upload stream,
 *       a {@code ReadStream} is piped (chunked unless the request declares
 *       {@code Content-Length}), a {@code Buffer} is sent as is, anything else is
 *       JSON-encoded, and no body simply ends the request;</li>
 *   <li>creates the client request, registers the response handler (which pauses the
 *       response and calls {@code receiveResponse}) and runs the chosen body writer.</li>
 * </ol>
 *
 * <p>Failures to build the multipart upload, to create the request or to receive the
 * response fail this context.
 */
private void handleSendRequest() {
  // Copy the request's own headers onto the outgoing options, creating the header map on demand.
  if (request.headers != null) {
    MultiMap headers = requestOptions.getHeaders();
    if (headers == null) {
      headers = MultiMap.caseInsensitiveMultiMap();
      requestOptions.setHeaders(headers);
    }
    headers.addAll(request.headers);
  }
  if (contentType != null) {
    // The options may carry no header map at all (nothing was merged above), so the
    // current content type is looked up null-safely rather than dereferencing getHeaders().
    MultiMap currentHeaders = requestOptions.getHeaders();
    String prev = currentHeaders != null ? currentHeaders.get(HttpHeaders.CONTENT_TYPE) : null;
    if (prev == null) {
      requestOptions.addHeader(HttpHeaders.CONTENT_TYPE, contentType);
    } else {
      // An explicitly configured Content-Type header takes precedence.
      contentType = prev;
    }
  }
  requestOptions.setTimeout(this.request.timeout);
  Handler<AsyncResult<HttpClientRequest>> continuation;
  if (body != null || "application/json".equals(contentType)) {
    if (body instanceof MultiMap) {
      // Convert form attributes to a MultipartForm so they follow the form path below.
      MultipartForm parts = MultipartForm.create();
      MultiMap attributes = (MultiMap) body;
      for (Map.Entry<String, String> attribute : attributes) {
        parts.attribute(attribute.getKey(), attribute.getValue());
      }
      body = parts;
    }
    if (body instanceof MultipartForm) {
      MultipartFormUpload multipartForm;
      try {
        boolean multipart = "multipart/form-data".equals(contentType);
        HttpPostRequestEncoder.EncoderMode encoderMode = this.request.multipartMixed ? HttpPostRequestEncoder.EncoderMode.RFC1738 : HttpPostRequestEncoder.EncoderMode.HTML5;
        multipartForm = new MultipartFormUpload(context, (MultipartForm) this.body, multipart, encoderMode);
        // The upload replaces the body and is handled by the stream branch below.
        this.body = multipartForm;
      } catch (Exception e) {
        fail(e);
        return;
      }
      for (String headerName : this.request.headers().names()) {
        requestOptions.putHeader(headerName, this.request.headers().get(headerName));
      }
      // Headers produced by the upload are put last, so they override same-named request headers.
      multipartForm.headers().forEach(header -> {
        requestOptions.putHeader(header.getKey(), header.getValue());
      });
      multipartForm.run();
    }
    if (body instanceof ReadStream<?>) {
      @SuppressWarnings("unchecked")
      ReadStream<Buffer> stream = (ReadStream<Buffer>) body;
      Pipe<Buffer> pipe = stream.pipe();
      continuation = ar -> {
        if (ar.succeeded()) {
          HttpClientRequest req = ar.result();
          // Without a declared Content-Length the body is sent chunked.
          if (this.request.headers == null || !this.request.headers.contains(HttpHeaders.CONTENT_LENGTH)) {
            req.setChunked(true);
          }
          pipe.endOnFailure(false);
          pipe.to(req, ar2 -> {
            clientRequest = null;
            if (ar2.failed()) {
              // The body could not be fully written: reset the request with the cause.
              req.reset(0L, ar2.cause());
            }
          });
        } else {
          // No request was created: release the source stream.
          clientRequest = null;
          pipe.close();
        }
      };
    } else {
      Buffer buffer;
      if (body instanceof Buffer) {
        buffer = (Buffer) body;
      } else if (body instanceof JsonObject) {
        buffer = Buffer.buffer(((JsonObject)body).encode());
      } else {
        // Any other body (including null with a JSON content type) is JSON-encoded.
        buffer = Buffer.buffer(Json.encode(body));
      }
      continuation = ar -> {
        if (ar.succeeded()) {
          clientRequest = null;
          HttpClientRequest req = ar.result();
          req.putHeader(HttpHeaders.CONTENT_LENGTH, "" + buffer.length());
          req.end(buffer);
        }
      };
    }
  } else {
    continuation = ar -> {
      if (ar.succeeded()) {
        clientRequest = null;
        HttpClientRequest req = ar.result();
        req.end();
      }
    };
  }
  Future<HttpClientRequest> f = client.request(requestOptions);
  f.onComplete(ar1 -> {
    if (ar1.succeeded()) {
      clientRequest = ar1.result();
      HttpClientRequest req = ar1.result();
      req.response(ar2 -> {
        if (ar2.succeeded()) {
          HttpClientResponse resp = ar2.result();
          // Hold the body until the receive phase attaches a consumer.
          resp.pause();
          receiveResponse(resp);
        } else {
          fail(ar2.cause());
        }
      });
    } else {
      fail(ar1.cause());
    }
    // Always run the body writer: on failure the stream variant closes its pipe.
    continuation.handle(ar1);
  });
}
/**
 * Returns the attribute stored under the given key, cast to the type expected by the caller.
 *
 * <p>The cast is unchecked: a mismatch between the stored value and the expected
 * type surfaces as a {@code ClassCastException} at the call site.
 *
 * @param key the attribute key
 * @param <R> the expected attribute type (named {@code R} so it does not hide the
 *            class-level type parameter {@code T})
 * @return the stored value, or {@code null} when no attribute was ever set or the key is absent
 */
@SuppressWarnings("unchecked")
public <R> R get(String key) {
  if (attrs == null) {
    return null;
  }
  return (R) attrs.get(key);
}
/**
 * Stores, replaces or removes an attribute on this context.
 *
 * @param key the attribute key
 * @param value the value to store; {@code null} removes the attribute
 * @return this context, for chaining
 */
public HttpContext<T> set(String key, Object value) {
  if (value != null) {
    // The attribute map is only allocated once something is stored.
    if (attrs == null) {
      attrs = new HashMap<>();
    }
    attrs.put(key, value);
    return this;
  }
  if (attrs != null) {
    attrs.remove(key);
  }
  return this;
}
}