/*
 * Copyright (c) 2011-2017 Contributors to the Eclipse Foundation
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
 * which is available at https://www.apache.org/licenses/LICENSE-2.0.
 *
 * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
 */

package io.vertx.core.http.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.http.impl.headers.VertxHttpHeaders;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.SocketAddress;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import static io.vertx.core.http.HttpHeaders.*;

This class is optimised for performance when used on the same event loop that is was passed to the handler with. However it can be used safely from other threads. The internal state is protected using the synchronized keyword. If always used on the same event loop, then we benefit from biased locking which makes the overhead of synchronized near zero. This class uses this for synchronization purpose. The HttpClientRequestBase.client orstream instead are called must not be called under this lock to avoid deadlocks.
Author:Tim Fox
/** * This class is optimised for performance when used on the same event loop that is was passed to the handler with. * However it can be used safely from other threads. * * The internal state is protected using the synchronized keyword. If always used on the same event loop, then * we benefit from biased locking which makes the overhead of synchronized near zero. * * This class uses {@code this} for synchronization purpose. The {@link #client} or{@link #stream} instead are * called must not be called under this lock to avoid deadlocks. * * @author <a href="http://tfox.org">Tim Fox</a> */
public class HttpClientRequestImpl extends HttpClientRequestBase implements HttpClientRequest { static final Logger log = LoggerFactory.getLogger(HttpClientRequestImpl.class); private final VertxInternal vertx; private boolean chunked; private String hostHeader; private String rawMethod; private Handler<Void> continueHandler; private Handler<Void> drainHandler; private Handler<HttpClientRequest> pushHandler; private Handler<HttpConnection> connectionHandler; private Handler<Throwable> exceptionHandler; private Promise<Void> endPromise = Promise.promise(); private Future<Void> endFuture = endPromise.future(); private boolean ended; private Throwable reset; private ByteBuf pendingChunks; private List<Handler<AsyncResult<Void>>> pendingHandlers; private int pendingMaxSize = -1; private int followRedirects; private VertxHttpHeaders headers; private StreamPriority priority; private HttpClientStream stream; private boolean connecting; private Handler<HttpClientResponse> respHandler; private Handler<Void> endHandler; HttpClientRequestImpl(HttpClientImpl client, boolean ssl, HttpMethod method, SocketAddress server, String host, int port, String relativeURI, VertxInternal vertx) { super(client, ssl, method, server, host, port, relativeURI); this.chunked = false; this.vertx = vertx; this.priority = HttpUtils.DEFAULT_STREAM_PRIORITY; } @Override public synchronized int streamId() { return stream == null ? -1 : stream.id(); } @Override public synchronized HttpClientRequest handler(Handler<HttpClientResponse> handler) { if (handler != null) { checkEnded(); } respHandler = handler; return this; } @Override public HttpClientRequest setFollowRedirects(boolean followRedirects) { synchronized (this) { checkEnded(); if (followRedirects) { this.followRedirects = client.getOptions().getMaxRedirects() - 1; } else { this.followRedirects = 0; } return this; } } @Override public HttpClientRequest setMaxRedirects(int maxRedirects) { Arguments.require(maxRedirects >= 0, "Max redirects must be >= 0"); synchronized (this) { checkEnded(); followRedirects = maxRedirects; return this; } } @Override public HttpClientRequest endHandler(Handler<Void> handler) { synchronized (this) { if (handler != null) { checkEnded(); } endHandler = handler; return this; } } @Override public HttpClientRequestImpl setChunked(boolean chunked) { synchronized (this) { checkEnded(); if (stream != null) { throw new IllegalStateException("Cannot set chunked after data has been written on request"); } // HTTP 1.0 does not support chunking so we ignore this if HTTP 1.0 if (client.getOptions().getProtocolVersion() != io.vertx.core.http.HttpVersion.HTTP_1_0) { this.chunked = chunked; } return this; } } @Override public synchronized boolean isChunked() { return chunked; } @Override public synchronized String getRawMethod() { return rawMethod; } @Override public synchronized HttpClientRequest setRawMethod(String method) { this.rawMethod = method; return this; } @Override public synchronized HttpClientRequest setHost(String host) { this.hostHeader = host; return this; } @Override public synchronized String getHost() { return hostHeader; } @Override public synchronized MultiMap headers() { if (headers == null) { headers = new VertxHttpHeaders(); } return headers; } @Override public synchronized HttpClientRequest putHeader(String name, String value) { checkEnded(); headers().set(name, value); return this; } @Override public synchronized HttpClientRequest putHeader(String name, Iterable<String> values) { checkEnded(); headers().set(name, values); return this; } @Override public HttpClientRequest setWriteQueueMaxSize(int maxSize) { HttpClientStream s; synchronized (this) { checkEnded(); if ((s = stream) == null) { pendingMaxSize = maxSize; return this; } } s.doSetWriteQueueMaxSize(maxSize); return this; } @Override public boolean writeQueueFull() { HttpClientStream s; synchronized (this) { checkEnded(); if ((s = stream) == null) { // Should actually check with max queue size and not always blindly return false return false; } } return s.isNotWritable(); } @Override public HttpClientRequest drainHandler(Handler<Void> handler) { synchronized (this) { if (handler != null) { checkEnded(); drainHandler = handler; HttpClientStream s; if ((s = stream) == null) { return this; } s.getContext().runOnContext(v -> { synchronized (HttpClientRequestImpl.this) { if (!stream.isNotWritable()) { handleDrained(); } } }); } else { drainHandler = null; } return this; } } @Override public synchronized HttpClientRequest continueHandler(Handler<Void> handler) { if (handler != null) { checkEnded(); } this.continueHandler = handler; return this; } @Override public HttpClientRequest sendHead() { return sendHead(null); } @Override public synchronized HttpClientRequest sendHead(Handler<HttpVersion> headersHandler) { checkEnded(); checkResponseHandler(); if (stream != null) { throw new IllegalStateException("Head already written"); } else { connect(headersHandler); } return this; } @Override public synchronized HttpClientRequest putHeader(CharSequence name, CharSequence value) { checkEnded(); headers().set(name, value); return this; } @Override public synchronized HttpClientRequest putHeader(CharSequence name, Iterable<CharSequence> values) { checkEnded(); headers().set(name, values); return this; } @Override public synchronized HttpClientRequest pushHandler(Handler<HttpClientRequest> handler) { pushHandler = handler; return this; } @Override boolean reset(Throwable cause) { HttpClientStream s; synchronized (this) { if (reset != null) { return false; } reset = cause; s = stream; } if (s != null) { s.reset(cause); } else { handleException(cause); } return true; } private void tryComplete() { endPromise.tryComplete(); } @Override public HttpConnection connection() { HttpClientStream s; synchronized (this) { if ((s = stream) == null) { return null; } } return s.connection(); } @Override public synchronized HttpClientRequest connectionHandler(@Nullable Handler<HttpConnection> handler) { connectionHandler = handler; return this; } @Override public synchronized HttpClientRequest writeCustomFrame(int type, int flags, Buffer payload) { HttpClientStream s; synchronized (this) { checkEnded(); if ((s = stream) == null) { throw new IllegalStateException("Not yet connected"); } } s.writeFrame(type, flags, payload.getByteBuf()); return this; } void handleDrained() { Handler<Void> handler; synchronized (this) { if ((handler = drainHandler) == null || endFuture.isComplete()) { return; } } try { handler.handle(null); } catch (Throwable t) { handleException(t); } } private void handleNextRequest(HttpClientRequest next, long timeoutMs) { next.handler(respHandler); next.exceptionHandler(exceptionHandler()); exceptionHandler(null); next.endHandler(endHandler); next.pushHandler(pushHandler); next.setMaxRedirects(followRedirects - 1); if (next.getHost() == null) { next.setHost(hostHeader); } if (headers != null) { next.headers().addAll(headers); } endFuture.setHandler(ar -> { if (ar.succeeded()) { if (timeoutMs > 0) { next.setTimeout(timeoutMs); } next.end(); } else { next.reset(0); } }); } @Override public void handleException(Throwable t) { super.handleException(t); endPromise.tryFail(t); } void handleResponse(HttpClientResponse resp, long timeoutMs) { if (reset == null) { int statusCode = resp.statusCode(); if (followRedirects > 0 && statusCode >= 300 && statusCode < 400) { Future<HttpClientRequest> next = client.redirectHandler().apply(resp); if (next != null) { next.setHandler(ar -> { if (ar.succeeded()) { handleNextRequest(ar.result(), timeoutMs); } else { handleException(ar.cause()); } }); return; } } if (respHandler != null) { respHandler.handle(resp); } if (endHandler != null) { endHandler.handle(null); } } } @Override protected String hostHeader() { return hostHeader != null ? hostHeader : super.hostHeader(); } private synchronized void connect(Handler<HttpVersion> headersHandler) { if (!connecting) { if (method == HttpMethod.OTHER && rawMethod == null) { throw new IllegalStateException("You must provide a rawMethod when using an HttpMethod.OTHER method"); } SocketAddress peerAddress; if (hostHeader != null) { int idx = hostHeader.lastIndexOf(':'); if (idx != -1) { peerAddress = SocketAddress.inetSocketAddress(Integer.parseInt(hostHeader.substring(idx + 1)), hostHeader.substring(0, idx)); } else { peerAddress = SocketAddress.inetSocketAddress(80, hostHeader); } } else { peerAddress = SocketAddress.inetSocketAddress(port, host); } // Capture some stuff Handler<HttpConnection> h1 = connectionHandler; Handler<HttpConnection> h2 = client.connectionHandler(); Handler<HttpConnection> initializer; if (h1 != null) { if (h2 != null) { initializer = conn -> { h1.handle(conn); h2.handle(conn); }; } else { initializer = h1; } } else { initializer = h2; } ContextInternal connectCtx = vertx.getOrCreateContext(); // We defer actual connection until the first part of body is written or end is called // This gives the user an opportunity to set an exception handler before connecting so // they can capture any exceptions on connection connecting = true; client.getConnectionForRequest(connectCtx, peerAddress, ssl, server, ar1 -> { if (ar1.succeeded()) { HttpClientStream stream = ar1.result(); ContextInternal ctx = (ContextInternal) stream.getContext(); if (stream.id() == 1 && initializer != null) { ctx.executeFromIO(v -> { initializer.handle(stream.connection()); }); } // No need to synchronize as the thread is the same that set exceptionOccurred to true // exceptionOccurred=true getting the connection => it's a TimeoutException if (reset != null) { stream.reset(reset); } else { ctx.executeFromIO(v -> { connected(headersHandler, stream); }); } } else { connectCtx.executeFromIO(v -> { handleException(ar1.cause()); }); } }); } } private void connected(Handler<HttpVersion> headersHandler, HttpClientStream stream) { synchronized (this) { this.stream = stream; stream.beginRequest(this); // If anything was written or the request ended before we got the connection, then // we need to write it now if (pendingMaxSize != -1) { stream.doSetWriteQueueMaxSize(pendingMaxSize); } ByteBuf pending = null; Handler<AsyncResult<Void>> handler = null; if (pendingChunks != null) { List<Handler<AsyncResult<Void>>> handlers = pendingHandlers; pendingHandlers = null; pending = pendingChunks; pendingChunks = null; if (handlers != null) { handler = ar -> { handlers.forEach(h -> h.handle(ar)); }; } } stream.writeHead(method, rawMethod, uri, headers, hostHeader(), chunked, pending, ended, priority, continueHandler, handler); if (ended) { // we also need to write the head so optimize this and write all out in once stream.endRequest(); tryComplete(); } this.connecting = false; this.stream = stream; } if (headersHandler != null) { headersHandler.handle(stream.version()); } } @Override public void end(String chunk) { end(chunk, (Handler<AsyncResult<Void>>) null); } @Override public void end(String chunk, Handler<AsyncResult<Void>> handler) { end(Buffer.buffer(chunk), handler); } @Override public void end(String chunk, String enc) { end(chunk, enc, null); } @Override public void end(String chunk, String enc, Handler<AsyncResult<Void>> handler) { Objects.requireNonNull(enc, "no null encoding accepted"); end(Buffer.buffer(chunk, enc), handler); } @Override public void end(Buffer chunk) { write(chunk.getByteBuf(), true, null); } @Override public void end(Buffer chunk, Handler<AsyncResult<Void>> handler) { write(chunk.getByteBuf(), true, handler); } @Override public void end() { write(null, true, null); } @Override public void end(Handler<AsyncResult<Void>> handler) { write(null, true, handler); } @Override public HttpClientRequest write(Buffer chunk) { return write(chunk, null); } @Override public HttpClientRequest write(Buffer chunk, Handler<AsyncResult<Void>> handler) { ByteBuf buf = chunk.getByteBuf(); write(buf, false, handler); return this; } @Override public HttpClientRequest write(String chunk) { return write(chunk, (Handler<AsyncResult<Void>>) null); } @Override public HttpClientRequest write(String chunk, Handler<AsyncResult<Void>> handler) { write(Buffer.buffer(chunk).getByteBuf(), false, handler); return this; } @Override public HttpClientRequest write(String chunk, String enc) { return write(chunk, enc, null); } @Override public HttpClientRequest write(String chunk, String enc, Handler<AsyncResult<Void>> handler) { Objects.requireNonNull(enc, "no null encoding accepted"); write(Buffer.buffer(chunk, enc).getByteBuf(), false, handler); return this; } private boolean requiresContentLength() { return !chunked && (headers == null || !headers.contains(CONTENT_LENGTH)); } private void write(ByteBuf buff, boolean end, Handler<AsyncResult<Void>> completionHandler) { if (buff == null && !end) { return; } HttpClientStream s; synchronized (this) { checkEnded(); checkResponseHandler(); if (end) { if (buff != null && requiresContentLength()) { headers().set(CONTENT_LENGTH, String.valueOf(buff.readableBytes())); } } else if (requiresContentLength()) { throw new IllegalStateException("You must set the Content-Length header to be the total size of the message " + "body BEFORE sending any data if you are not using HTTP chunked encoding."); } ended |= end; if (stream == null) { if (buff != null) { if (pendingChunks == null) { pendingChunks = buff; } else { CompositeByteBuf pending; if (pendingChunks instanceof CompositeByteBuf) { pending = (CompositeByteBuf) pendingChunks; } else { pending = Unpooled.compositeBuffer(); pending.addComponent(true, pendingChunks); pendingChunks = pending; } pending.addComponent(true, buff); } if (completionHandler != null) { if (pendingHandlers == null) { pendingHandlers = new ArrayList<>(); } pendingHandlers.add(completionHandler); } } connect(null); return; } s = stream; } s.writeBuffer(buff, end, completionHandler); if (end) { s.endRequest(); tryComplete(); } } protected void checkEnded() { if (ended) { throw new IllegalStateException("Request already complete"); } } private void checkResponseHandler() { if (respHandler == null) { throw new IllegalStateException("You must set an handler for the HttpClientResponse before connecting"); } } synchronized Handler<HttpClientRequest> pushHandler() { return pushHandler; } @Override public synchronized HttpClientRequest setStreamPriority(StreamPriority priority) { synchronized (this) { if (stream != null) { stream.updatePriority(priority); } else { this.priority = priority; } } return this; } @Override public synchronized StreamPriority getStreamPriority() { HttpClientStream s = stream; return s != null ? s.priority() : priority; } }