package io.vertx.core.http.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.http.impl.headers.VertxHttpHeaders;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import static io.vertx.core.http.HttpHeaders.*;
public class HttpClientRequestImpl extends HttpClientRequestBase implements HttpClientRequest {
static final Logger log = LoggerFactory.getLogger(HttpClientRequestImpl.class);
private final VertxInternal vertx;
private boolean chunked;
private String ;
private String rawMethod;
private Handler<Void> continueHandler;
private Handler<Void> drainHandler;
private Handler<HttpClientRequest> pushHandler;
private Handler<HttpConnection> connectionHandler;
private Handler<Throwable> exceptionHandler;
private Promise<Void> endPromise = Promise.promise();
private Future<Void> endFuture = endPromise.future();
private boolean ended;
private Throwable reset;
private ByteBuf pendingChunks;
private List<Handler<AsyncResult<Void>>> pendingHandlers;
private int pendingMaxSize = -1;
private int followRedirects;
private VertxHttpHeaders ;
private StreamPriority priority;
private HttpClientStream stream;
private boolean connecting;
private Handler<HttpClientResponse> respHandler;
private Handler<Void> endHandler;
/**
 * Creates a request that is not yet connected. Chunked encoding starts disabled and the
 * stream priority starts at {@code HttpUtils.DEFAULT_STREAM_PRIORITY}.
 */
HttpClientRequestImpl(HttpClientImpl client, boolean ssl, HttpMethod method, SocketAddress server,
                      String host, int port,
                      String relativeURI, VertxInternal vertx) {
  super(client, ssl, method, server, host, port, relativeURI);
  this.vertx = vertx;
  this.priority = HttpUtils.DEFAULT_STREAM_PRIORITY;
  this.chunked = false;
}
/**
 * @return the id of the underlying stream, or {@code -1} when no stream is attached yet
 */
@Override
public synchronized int streamId() {
  if (stream == null) {
    return -1;
  }
  return stream.id();
}
/**
 * Sets the response handler.
 *
 * @param handler the handler, or {@code null} to clear it
 * @return this request
 * @throws IllegalStateException if a non-null handler is set after the request has ended
 */
@Override
public synchronized HttpClientRequest handler(Handler<HttpClientResponse> handler) {
  boolean installing = handler != null;
  if (installing) {
    checkEnded();
  }
  this.respHandler = handler;
  return this;
}
/**
 * Enables or disables redirect following. When enabled, the redirect budget becomes the
 * client's configured max redirects minus one; when disabled it becomes {@code 0}.
 *
 * @return this request
 * @throws IllegalStateException if the request has already ended
 */
@Override
public HttpClientRequest setFollowRedirects(boolean followRedirects) {
  synchronized (this) {
    checkEnded();
    this.followRedirects = followRedirects ? client.getOptions().getMaxRedirects() - 1 : 0;
  }
  return this;
}
/**
 * Sets the number of redirects this request may follow.
 *
 * @param maxRedirects the budget; must be {@code >= 0} (checked before taking the lock)
 * @return this request
 * @throws IllegalStateException if the request has already ended
 */
@Override
public HttpClientRequest setMaxRedirects(int maxRedirects) {
  Arguments.require(maxRedirects >= 0, "Max redirects must be >= 0");
  synchronized (this) {
    checkEnded();
    this.followRedirects = maxRedirects;
  }
  return this;
}
/**
 * Sets the end handler.
 *
 * @param handler the handler, or {@code null} to clear it
 * @return this request
 * @throws IllegalStateException if a non-null handler is set after the request has ended
 */
@Override
public HttpClientRequest endHandler(Handler<Void> handler) {
  synchronized (this) {
    boolean installing = handler != null;
    if (installing) {
      checkEnded();
    }
    this.endHandler = handler;
  }
  return this;
}
/**
 * Requests chunked transfer encoding. The flag is left untouched when the client's
 * configured protocol version is HTTP/1.0.
 *
 * @return this request
 * @throws IllegalStateException if the request has ended or a stream is already attached
 */
@Override
public HttpClientRequestImpl setChunked(boolean chunked) {
  synchronized (this) {
    checkEnded();
    if (stream != null) {
      throw new IllegalStateException("Cannot set chunked after data has been written on request");
    }
    boolean http10 = client.getOptions().getProtocolVersion() == HttpVersion.HTTP_1_0;
    if (!http10) {
      this.chunked = chunked;
    }
  }
  return this;
}
/**
 * @return whether chunked transfer encoding is currently enabled for this request
 */
@Override
public synchronized boolean isChunked() {
  return this.chunked;
}
/**
 * @return the raw method name previously set, or {@code null} if none was set
 */
@Override
public synchronized String getRawMethod() {
  return this.rawMethod;
}
/**
 * Stores the raw method name; it is required when the method is {@code HttpMethod.OTHER}.
 *
 * @return this request
 */
@Override
public synchronized HttpClientRequest setRawMethod(String method) {
  rawMethod = method;
  return this;
}
/**
 * Overrides the host header used by this request.
 *
 * @return this request
 */
@Override
public synchronized HttpClientRequest setHost(String host) {
  hostHeader = host;
  return this;
}
/**
 * @return the host header set through {@link #setHost(String)}, or {@code null} if none
 */
@Override
public synchronized String getHost() {
  return this.hostHeader;
}
@Override
public synchronized MultiMap () {
if (headers == null) {
headers = new VertxHttpHeaders();
}
return headers;
}
@Override
public synchronized HttpClientRequest (String name, String value) {
checkEnded();
headers().set(name, value);
return this;
}
@Override
public synchronized HttpClientRequest (String name, Iterable<String> values) {
checkEnded();
headers().set(name, values);
return this;
}
/**
 * Sets the write queue max size. Without a stream the value is remembered and applied
 * once connected; with a stream it is forwarded to the stream outside the request lock.
 *
 * @return this request
 * @throws IllegalStateException if the request has already ended
 */
@Override
public HttpClientRequest setWriteQueueMaxSize(int maxSize) {
  HttpClientStream current;
  synchronized (this) {
    checkEnded();
    current = stream;
    if (current == null) {
      pendingMaxSize = maxSize;
      return this;
    }
  }
  current.doSetWriteQueueMaxSize(maxSize);
  return this;
}
/**
 * @return {@code false} when no stream is attached yet, otherwise whether the stream
 *         reports itself as not writable (queried outside the request lock)
 * @throws IllegalStateException if the request has already ended
 */
@Override
public boolean writeQueueFull() {
  HttpClientStream current;
  synchronized (this) {
    checkEnded();
    current = stream;
  }
  return current != null && current.isNotWritable();
}
/**
 * Sets or clears the drain handler. When a non-null handler is set and a stream exists,
 * a task is scheduled on the stream's context that invokes {@code handleDrained()} if the
 * stream is writable at that time.
 *
 * @param handler the handler, or {@code null} to clear it
 * @return this request
 * @throws IllegalStateException if a non-null handler is set after the request has ended
 */
@Override
public HttpClientRequest drainHandler(Handler<Void> handler) {
  synchronized (this) {
    if (handler == null) {
      drainHandler = null;
      return this;
    }
    checkEnded();
    drainHandler = handler;
    HttpClientStream current = stream;
    if (current == null) {
      return this;
    }
    current.getContext().runOnContext(v -> {
      synchronized (HttpClientRequestImpl.this) {
        // Re-read the field when the task runs rather than capturing the stream seen above.
        if (!stream.isNotWritable()) {
          handleDrained();
        }
      }
    });
    return this;
  }
}
/**
 * Sets the continue handler; it is later passed to the stream when the head is written.
 *
 * @param handler the handler, or {@code null} to clear it
 * @return this request
 * @throws IllegalStateException if a non-null handler is set after the request has ended
 */
@Override
public synchronized HttpClientRequest continueHandler(Handler<Void> handler) {
  boolean installing = handler != null;
  if (installing) {
    checkEnded();
  }
  continueHandler = handler;
  return this;
}
/**
 * Sends the request head without a headers callback.
 *
 * @return this request
 */
@Override
public HttpClientRequest sendHead() {
  return sendHead((Handler<HttpVersion>) null);
}
/**
 * Starts connecting so that the request head gets written.
 *
 * @param headersHandler passed on to {@code connect}; may be {@code null}
 * @return this request
 * @throws IllegalStateException if the request has ended, no response handler is set,
 *         or a stream is already attached
 */
@Override
public synchronized HttpClientRequest sendHead(Handler<HttpVersion> headersHandler) {
  checkEnded();
  checkResponseHandler();
  if (stream != null) {
    throw new IllegalStateException("Head already written");
  }
  connect(headersHandler);
  return this;
}
@Override
public synchronized HttpClientRequest (CharSequence name, CharSequence value) {
checkEnded();
headers().set(name, value);
return this;
}
@Override
public synchronized HttpClientRequest (CharSequence name, Iterable<CharSequence> values) {
checkEnded();
headers().set(name, values);
return this;
}
/**
 * Sets the push handler. Unlike most setters in this class, no ended-check is performed.
 *
 * @return this request
 */
@Override
public synchronized HttpClientRequest pushHandler(Handler<HttpClientRequest> handler) {
  this.pushHandler = handler;
  return this;
}
/**
 * Records the reset cause once. With a stream attached the stream is reset; otherwise the
 * cause is reported through {@code handleException}. Both happen outside the request lock.
 *
 * @return {@code false} if the request had already been reset, {@code true} otherwise
 */
@Override
boolean reset(Throwable cause) {
  HttpClientStream current;
  synchronized (this) {
    if (reset != null) {
      return false;
    }
    reset = cause;
    current = stream;
  }
  if (current == null) {
    handleException(cause);
  } else {
    current.reset(cause);
  }
  return true;
}
/** Completes the end promise if it has not been completed or failed already. */
private void tryComplete() {
  this.endPromise.tryComplete();
}
/**
 * @return the connection of the attached stream, or {@code null} when no stream is attached
 */
@Override
public HttpConnection connection() {
  HttpClientStream current;
  synchronized (this) {
    current = stream;
  }
  return current == null ? null : current.connection();
}
/**
 * Sets the per-request connection handler consulted when connecting.
 *
 * @param handler the handler, or {@code null} for none
 * @return this request
 */
@Override
public synchronized HttpClientRequest connectionHandler(@Nullable Handler<HttpConnection> handler) {
  this.connectionHandler = handler;
  return this;
}
/**
 * Writes a custom frame on the attached stream.
 *
 * <p>The request lock is held only while checking state and reading the stream; the
 * frame is written after the lock is released, matching the other methods of this class
 * that call into the stream (e.g. {@code setWriteQueueMaxSize}, {@code writeQueueFull}).
 *
 * @param type the frame type
 * @param flags the frame flags
 * @param payload the frame payload
 * @return this request
 * @throws IllegalStateException if the request has ended or no stream is attached yet
 */
@Override
public HttpClientRequest writeCustomFrame(int type, int flags, Buffer payload) {
  HttpClientStream s;
  synchronized (this) {
    checkEnded();
    if ((s = stream) == null) {
      throw new IllegalStateException("Not yet connected");
    }
  }
  s.writeFrame(type, flags, payload.getByteBuf());
  return this;
}
/**
 * Invokes the drain handler, if one is set and the end future is not complete. The
 * handler runs outside the request lock; anything it throws goes to {@code handleException}.
 */
void handleDrained() {
  Handler<Void> callback;
  synchronized (this) {
    callback = drainHandler;
    if (callback == null || endFuture.isComplete()) {
      return;
    }
  }
  try {
    callback.handle(null);
  } catch (Throwable failure) {
    handleException(failure);
  }
}
/**
 * Prepares the follow-up request of a redirect: hands over the response, exception, end
 * and push handlers, gives it one redirect less than this request, supplies the host
 * header when it has none, and copies this request's headers. When this request's end
 * future succeeds the follow-up is ended (after applying {@code timeoutMs} when positive);
 * when it fails the follow-up is reset with code {@code 0}.
 */
private void handleNextRequest(HttpClientRequest next, long timeoutMs) {
  next.handler(respHandler);
  // Hand the exception handler over first, then detach it from this request.
  next.exceptionHandler(exceptionHandler());
  exceptionHandler(null);
  next.endHandler(endHandler);
  next.pushHandler(pushHandler);
  next.setMaxRedirects(followRedirects - 1);
  boolean nextHasHost = next.getHost() != null;
  if (!nextHasHost) {
    next.setHost(hostHeader);
  }
  if (headers != null) {
    next.headers().addAll(headers);
  }
  endFuture.setHandler(ar -> {
    if (!ar.succeeded()) {
      next.reset(0);
      return;
    }
    if (timeoutMs > 0) {
      next.setTimeout(timeoutMs);
    }
    next.end();
  });
}
/**
 * Delegates to the superclass and then fails the end promise with the same cause, unless
 * the promise was already completed.
 */
@Override
public void handleException(Throwable t) {
  super.handleException(t);
  this.endPromise.tryFail(t);
}
/**
 * Handles a response. Nothing happens if the request was reset. For a 3xx status with
 * redirect budget left, the client's redirect handler is asked for a follow-up request;
 * if it returns a future, the response is consumed by the redirect (or its failure is
 * reported through {@code handleException}). Otherwise the response handler and then the
 * end handler are invoked, each only if set.
 */
void handleResponse(HttpClientResponse resp, long timeoutMs) {
  if (reset != null) {
    return;
  }
  int statusCode = resp.statusCode();
  boolean redirect = followRedirects > 0 && statusCode >= 300 && statusCode < 400;
  if (redirect) {
    Future<HttpClientRequest> next = client.redirectHandler().apply(resp);
    if (next != null) {
      next.setHandler(ar -> {
        if (!ar.succeeded()) {
          handleException(ar.cause());
          return;
        }
        handleNextRequest(ar.result(), timeoutMs);
      });
      return;
    }
  }
  if (respHandler != null) {
    respHandler.handle(resp);
  }
  if (endHandler != null) {
    endHandler.handle(null);
  }
}
@Override
protected String () {
return hostHeader != null ? hostHeader : super.hostHeader();
}
/**
 * Requests a connection/stream for this request unless a connection attempt is already
 * in flight. On success {@code connected} is invoked on the stream's context (or the
 * stream is reset if this request was reset meanwhile); on failure the cause is passed to
 * {@code handleException} on the context obtained here.
 *
 * @param headersHandler forwarded to {@code connected}; may be {@code null}
 * @throws IllegalStateException if the method is {@code HttpMethod.OTHER} and no raw method was set
 */
private synchronized void connect(Handler<HttpVersion> headersHandler) {
if (!connecting) {
if (method == HttpMethod.OTHER && rawMethod == null) {
throw new IllegalStateException("You must provide a rawMethod when using an HttpMethod.OTHER method");
}
// Peer address: derived from the explicit host header when set, else from host/port.
SocketAddress peerAddress;
if (hostHeader != null) {
// Split "host:port" at the last ':'.
// NOTE(review): a bracketed IPv6 literal without a port (e.g. "[::1]") also contains ':'
// and would reach Integer.parseInt with a non-numeric suffix — confirm such values cannot occur.
int idx = hostHeader.lastIndexOf(':');
if (idx != -1) {
peerAddress = SocketAddress.inetSocketAddress(Integer.parseInt(hostHeader.substring(idx + 1)), hostHeader.substring(0, idx));
} else {
// NOTE(review): the fallback port is 80 regardless of the ssl flag — confirm this is intended.
peerAddress = SocketAddress.inetSocketAddress(80, hostHeader);
}
} else {
peerAddress = SocketAddress.inetSocketAddress(port, host);
}
// Combine the per-request and the client-level connection handlers (request one runs first).
Handler<HttpConnection> h1 = connectionHandler;
Handler<HttpConnection> h2 = client.connectionHandler();
Handler<HttpConnection> initializer;
if (h1 != null) {
if (h2 != null) {
initializer = conn -> {
h1.handle(conn);
h2.handle(conn);
};
} else {
initializer = h1;
}
} else {
initializer = h2;
}
ContextInternal connectCtx = vertx.getOrCreateContext();
// Mark as connecting before issuing the request so re-entrant calls are no-ops.
connecting = true;
client.getConnectionForRequest(connectCtx, peerAddress, ssl, server, ar1 -> {
if (ar1.succeeded()) {
HttpClientStream stream = ar1.result();
ContextInternal ctx = (ContextInternal) stream.getContext();
// Run the connection initializer only for stream id 1
// (presumably the first stream of a new connection — TODO confirm).
if (stream.id() == 1 && initializer != null) {
ctx.executeFromIO(v -> {
initializer.handle(stream.connection());
});
}
// The request may have been reset while waiting for the stream.
if (reset != null) {
stream.reset(reset);
} else {
ctx.executeFromIO(v -> {
connected(headersHandler, stream);
});
}
} else {
connectCtx.executeFromIO(v -> {
handleException(ar1.cause());
});
}
});
}
}
/**
 * Attaches the obtained stream to this request and writes the request head, together with
 * any data and completion handlers that were buffered while no stream was available.
 * If the request was already ended, the stream request is ended and the end promise
 * completed. The headers handler, if any, is notified outside the request lock.
 *
 * @param headersHandler receives {@code stream.version()}; may be {@code null}
 * @param stream the stream that will carry this request
 */
private void connected(Handler<HttpVersion> headersHandler, HttpClientStream stream) {
synchronized (this) {
this.stream = stream;
stream.beginRequest(this);
// Apply a write queue size requested before the stream existed (-1 means none was requested).
if (pendingMaxSize != -1) {
stream.doSetWriteQueueMaxSize(pendingMaxSize);
}
ByteBuf pending = null;
Handler<AsyncResult<Void>> handler = null;
if (pendingChunks != null) {
// Take ownership of the buffered data and handlers and clear the fields.
List<Handler<AsyncResult<Void>>> handlers = pendingHandlers;
pendingHandlers = null;
pending = pendingChunks;
pendingChunks = null;
if (handlers != null) {
// Fan the single write result out to every buffered completion handler.
handler = ar -> {
handlers.forEach(h -> h.handle(ar));
};
}
}
stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, ended, priority, continueHandler, handler);
if (ended) {
stream.endRequest();
tryComplete();
}
this.connecting = false;
// Same value as assigned at the top of this block.
this.stream = stream;
}
if (headersHandler != null) {
headersHandler.handle(stream.version());
}
}
/** Ends the request with the given string as body and no completion handler. */
@Override
public void end(String chunk) {
  Handler<AsyncResult<Void>> noHandler = null;
  end(chunk, noHandler);
}
/** Ends the request with the given string as body, wrapped via {@code Buffer.buffer(chunk)}. */
@Override
public void end(String chunk, Handler<AsyncResult<Void>> handler) {
  Buffer body = Buffer.buffer(chunk);
  end(body, handler);
}
/** Ends the request with the given string in the given encoding and no completion handler. */
@Override
public void end(String chunk, String enc) {
  end(chunk, enc, (Handler<AsyncResult<Void>>) null);
}
/**
 * Ends the request with the given string in the given encoding.
 *
 * @throws NullPointerException if {@code enc} is {@code null}
 */
@Override
public void end(String chunk, String enc, Handler<AsyncResult<Void>> handler) {
  Objects.requireNonNull(enc, "no null encoding accepted");
  Buffer body = Buffer.buffer(chunk, enc);
  end(body, handler);
}
/** Ends the request with the given buffer as final body chunk and no completion handler. */
@Override
public void end(Buffer chunk) {
  ByteBuf payload = chunk.getByteBuf();
  write(payload, true, null);
}
/** Ends the request with the given buffer as final body chunk. */
@Override
public void end(Buffer chunk, Handler<AsyncResult<Void>> handler) {
  ByteBuf payload = chunk.getByteBuf();
  write(payload, true, handler);
}
/** Ends the request without a body chunk and without a completion handler. */
@Override
public void end() {
  write((ByteBuf) null, true, null);
}
/** Ends the request without a body chunk. */
@Override
public void end(Handler<AsyncResult<Void>> handler) {
  write((ByteBuf) null, true, handler);
}
/**
 * Writes a body chunk without a completion handler.
 *
 * @return this request
 */
@Override
public HttpClientRequest write(Buffer chunk) {
  Handler<AsyncResult<Void>> noHandler = null;
  return write(chunk, noHandler);
}
/**
 * Writes a body chunk that does not end the request.
 *
 * @return this request
 */
@Override
public HttpClientRequest write(Buffer chunk, Handler<AsyncResult<Void>> handler) {
  write(chunk.getByteBuf(), false, handler);
  return this;
}
/**
 * Writes a string chunk without a completion handler.
 *
 * @return this request
 */
@Override
public HttpClientRequest write(String chunk) {
  Handler<AsyncResult<Void>> noHandler = null;
  return write(chunk, noHandler);
}
/**
 * Writes a string chunk, wrapped via {@code Buffer.buffer(chunk)}, without ending the request.
 *
 * @return this request
 */
@Override
public HttpClientRequest write(String chunk, Handler<AsyncResult<Void>> handler) {
  ByteBuf payload = Buffer.buffer(chunk).getByteBuf();
  write(payload, false, handler);
  return this;
}
/**
 * Writes a string chunk in the given encoding without a completion handler.
 *
 * @return this request
 */
@Override
public HttpClientRequest write(String chunk, String enc) {
  return write(chunk, enc, (Handler<AsyncResult<Void>>) null);
}
/**
 * Writes a string chunk in the given encoding without ending the request.
 *
 * @return this request
 * @throws NullPointerException if {@code enc} is {@code null}
 */
@Override
public HttpClientRequest write(String chunk, String enc, Handler<AsyncResult<Void>> handler) {
  Objects.requireNonNull(enc, "no null encoding accepted");
  ByteBuf payload = Buffer.buffer(chunk, enc).getByteBuf();
  write(payload, false, handler);
  return this;
}
/**
 * @return {@code true} when the request is not chunked and no {@code Content-Length}
 *         header is present (including when no headers exist at all)
 */
private boolean requiresContentLength() {
  if (chunked) {
    return false;
  }
  return headers == null || !headers.contains(CONTENT_LENGTH);
}
/**
 * Core write path shared by all public {@code write}/{@code end} overloads.
 *
 * <p>With no stream attached, the data and completion handler are buffered and a
 * connection is requested; otherwise the data is written to the stream outside the
 * request lock, and for an ending write the stream request is ended and the end promise completed.
 *
 * @param buff the data to write; may be {@code null} only when {@code end} is {@code true}
 *        (a {@code null} non-ending write is silently ignored)
 * @param end whether this write ends the request
 * @param completionHandler notified of the write result; may be {@code null}
 * @throws IllegalStateException if the request has already ended, no response handler is
 *         set, or a non-ending write is attempted without chunked encoding or a
 *         {@code Content-Length} header
 */
private void write(ByteBuf buff, boolean end, Handler<AsyncResult<Void>> completionHandler) {
if (buff == null && !end) {
return;
}
HttpClientStream s;
synchronized (this) {
checkEnded();
checkResponseHandler();
if (end) {
// A single ending write knows the full body size, so Content-Length can be derived.
if (buff != null && requiresContentLength()) {
headers().set(CONTENT_LENGTH, String.valueOf(buff.readableBytes()));
}
} else if (requiresContentLength()) {
throw new IllegalStateException("You must set the Content-Length header to be the total size of the message "
+ "body BEFORE sending any data if you are not using HTTP chunked encoding.");
}
ended |= end;
if (stream == null) {
// Not connected yet: accumulate data (promoting to a composite buffer on the second
// chunk) and remember the completion handler until connected() flushes them.
if (buff != null) {
if (pendingChunks == null) {
pendingChunks = buff;
} else {
CompositeByteBuf pending;
if (pendingChunks instanceof CompositeByteBuf) {
pending = (CompositeByteBuf) pendingChunks;
} else {
pending = Unpooled.compositeBuffer();
pending.addComponent(true, pendingChunks);
pendingChunks = pending;
}
pending.addComponent(true, buff);
}
// Note: the completion handler is only recorded when there is data to buffer.
if (completionHandler != null) {
if (pendingHandlers == null) {
pendingHandlers = new ArrayList<>();
}
pendingHandlers.add(completionHandler);
}
}
connect(null);
return;
}
s = stream;
}
// Stream calls are made after releasing the request lock.
s.writeBuffer(buff, end, completionHandler);
if (end) {
s.endRequest();
tryComplete();
}
}
/**
 * Guards operations that are illegal once the request has ended.
 *
 * @throws IllegalStateException if the request has already ended
 */
protected void checkEnded() {
  if (!ended) {
    return;
  }
  throw new IllegalStateException("Request already complete");
}
/**
 * Guards operations that require a response handler to be installed.
 *
 * @throws IllegalStateException if no response handler has been set
 */
private void checkResponseHandler() {
  if (respHandler != null) {
    return;
  }
  throw new IllegalStateException("You must set an handler for the HttpClientResponse before connecting");
}
/**
 * @return the currently installed push handler, or {@code null} if none
 */
synchronized Handler<HttpClientRequest> pushHandler() {
  return this.pushHandler;
}
/**
 * Sets the stream priority. With a stream attached the priority is pushed to the stream;
 * otherwise it is stored for later use.
 *
 * @return this request
 */
@Override
public synchronized HttpClientRequest setStreamPriority(StreamPriority priority) {
  // The method-level lock already covers this section; intrinsic locks are reentrant.
  if (stream == null) {
    this.priority = priority;
  } else {
    stream.updatePriority(priority);
  }
  return this;
}
/**
 * @return the priority reported by the attached stream, or the locally stored priority
 *         when no stream is attached
 */
@Override
public synchronized StreamPriority getStreamPriority() {
  HttpClientStream current = stream;
  if (current == null) {
    return priority;
  }
  return current.priority();
}
}