/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.http.netty.stream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http2.*;
import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
Implementation of Http2EventAdapter
that allows streaming requests for servers and responses for clients by establishing a processor that emits chunks as HttpContent
. This implementation does not buffer the data. If you need data buffering a FlowControlHandler
can be placed after this implementation so that downstream handlers can control flow. Based on code in InboundHttp2ToHttpAdapter
. Author: graemerocher Since: 2.0
/**
* Implementation of {@link Http2EventAdapter} that allows streaming requests for servers and responses for clients by
* establishing a processor that emits chunks as {@link HttpContent}.
*
* This implementation does not buffer the data. If you need data buffering a {@link io.netty.handler.flow.FlowControlHandler}
* can be placed after this implementation so that downstream handlers can control flow.
*
* Based on code in {@link InboundHttp2ToHttpAdapter}.
*
* @author graemerocher
* @since 2.0
*/
public class StreamingInboundHttp2ToHttpAdapter extends Http2EventAdapter {
protected final Http2Connection connection;
protected final boolean validateHttpHeaders;
private final int maxContentLength;
private final Http2Connection.PropertyKey messageKey;
private final boolean propagateSettings;
private final Http2Connection.PropertyKey dataReadKey;
Default constructor.
Params: - connection – The connection
- maxContentLength – The max content length
- validateHttpHeaders – Whether to validate headers
- propagateSettings – Whether to propagate settings
/**
* Default constructor.
* @param connection The connection
* @param maxContentLength The max content length
* @param validateHttpHeaders Whether to validate headers
* @param propagateSettings Whether to propagate settings
*/
public StreamingInboundHttp2ToHttpAdapter(Http2Connection connection, int maxContentLength,
boolean validateHttpHeaders, boolean propagateSettings) {
if (maxContentLength <= 0) {
throw new IllegalArgumentException("maxContentLength: " + maxContentLength + " (expected: > 0)");
}
this.connection = checkNotNull(connection, "connection");
this.maxContentLength = maxContentLength;
this.validateHttpHeaders = validateHttpHeaders;
this.propagateSettings = propagateSettings;
messageKey = connection.newKey();
dataReadKey = connection.newKey();
}
Default constructor.
Params: - connection – The connection
- maxContentLength – The max content length
/**
* Default constructor.
* @param connection The connection
* @param maxContentLength The max content length
*/
public StreamingInboundHttp2ToHttpAdapter(Http2Connection connection, int maxContentLength) {
this(connection, maxContentLength, true, true);
}
The stream is out of scope for the HTTP message flow and will no longer be tracked.
Params: - stream – The stream to remove associated state with
/**
* The stream is out of scope for the HTTP message flow and will no longer be tracked.
*
* @param stream The stream to remove associated state with
*/
protected final void removeMessage(Http2Stream stream) {
stream.removeProperty(messageKey);
}
Get the FullHttpMessage
associated with stream
. Params: - stream – The stream to get the associated state from
Returns: The FullHttpMessage
associated with stream
.
/**
* Get the {@link FullHttpMessage} associated with {@code stream}.
* @param stream The stream to get the associated state from
* @return The {@link FullHttpMessage} associated with {@code stream}.
*/
protected final HttpMessage getMessage(Http2Stream stream) {
return (HttpMessage) stream.getProperty(messageKey);
}
Make message
be the state associated with stream
. Params: - stream – The stream which
message
is associated with. - message – The message which contains the HTTP semantics.
/**
* Make {@code message} be the state associated with {@code stream}.
* @param stream The stream which {@code message} is associated with.
* @param message The message which contains the HTTP semantics.
*/
protected final void putMessage(Http2Stream stream, HttpMessage message) {
// reset the data read key
stream.setProperty(dataReadKey, new AtomicInteger(0));
stream.setProperty(messageKey, message);
}
@Override
public void onStreamRemoved(Http2Stream stream) {
removeMessage(stream);
}
fire a channel read event.
Params: - ctx – The context to fire the event on
- msg – The message to send
- stream – the stream of the message which is being fired
/**
* fire a channel read event.
*
* @param ctx The context to fire the event on
* @param msg The message to send
* @param stream the stream of the message which is being fired
*/
protected void fireChannelRead(ChannelHandlerContext ctx, HttpContent msg,
Http2Stream stream) {
ctx.fireChannelRead(msg);
}
fire a channel read event.
Params: - ctx – The context to fire the event on
- msg – The message to send
- stream – the stream of the message which is being fired
/**
* fire a channel read event.
*
* @param ctx The context to fire the event on
* @param msg The message to send
* @param stream the stream of the message which is being fired
*/
protected void fireChannelRead(ChannelHandlerContext ctx, HttpMessage msg,
Http2Stream stream) {
if (connection.isServer()) {
// the event has to come after the flow control handler to avoid buffering
// there may be a better way to do this.
final ChannelHandlerContext context = ctx.pipeline().context("flow-control-handler");
if (context != null) {
context.fireChannelRead(msg);
} else {
ctx.fireChannelRead(msg);
}
} else {
ctx.fireChannelRead(msg);
}
}
Create a new FullHttpMessage
based upon the current connection parameters. Params: - ctx – The channel context
- stream – The stream to create a message for
- headers – The headers associated with
stream
- validateHttpHeaders –
true
to validate HTTP headers in the http-codec
false
not to validate HTTP headers in the http-codec
Throws: - Http2Exception – thrown if an error occurs creating the request
Returns: A new StreamedHttpMessage
/**
* Create a new {@link FullHttpMessage} based upon the current connection parameters.
*
*
* @param ctx The channel context
* @param stream The stream to create a message for
* @param headers The headers associated with {@code stream}
* @param validateHttpHeaders
* <ul>
* <li>{@code true} to validate HTTP headers in the http-codec</li>
* <li>{@code false} not to validate HTTP headers in the http-codec</li>
* </ul>
* @throws Http2Exception thrown if an error occurs creating the request
* @return A new {@link StreamedHttpMessage}
*/
protected HttpMessage newMessage(
ChannelHandlerContext ctx,
Http2Stream stream,
Http2Headers headers,
boolean validateHttpHeaders)
throws Http2Exception {
return connection.isServer() ? HttpConversionUtil.toHttpRequest(stream.id(), headers, validateHttpHeaders) :
HttpConversionUtil.toHttpResponse(stream.id(), headers, validateHttpHeaders);
}
Provides translation between HTTP/2 and HTTP header objects while ensuring the stream
is in a valid state for additional headers.
Params: - ctx – The context for which this message has been received.
Used to send informational header if detected.
- stream – The stream the
headers
apply to - headers – The headers to process
- allowAppend –
true
if headers will be appended if the stream already exists.
- if
false
and the stream already exists this method returns null
.
- appendToTrailer –
true
if a message stream
already exists then the headers should be added to the trailing headers.
false
then appends will be done to the initial headers.
Throws: - Http2Exception – If the stream id is not in the correct state to process the headers request
Returns: The object used to track the stream corresponding to stream
. null
if allowAppend
is false
and the stream already exists.
/**
* Provides translation between HTTP/2 and HTTP header objects while ensuring the stream
* is in a valid state for additional headers.
*
* @param ctx The context for which this message has been received.
* Used to send informational header if detected.
* @param stream The stream the {@code headers} apply to
* @param headers The headers to process
* @param allowAppend
* <ul>
* <li>{@code true} if headers will be appended if the stream already exists.</li>
* <li>if {@code false} and the stream already exists this method returns {@code null}.</li>
* </ul>
* @param appendToTrailer
* <ul>
* <li>{@code true} if a message {@code stream} already exists then the headers
* should be added to the trailing headers.</li>
* <li>{@code false} then appends will be done to the initial headers.</li>
* </ul>
* @return The object used to track the stream corresponding to {@code stream}. {@code null} if
* {@code allowAppend} is {@code false} and the stream already exists.
* @throws Http2Exception If the stream id is not in the correct state to process the headers request
*/
protected HttpMessage processHeadersBegin(ChannelHandlerContext ctx, Http2Stream stream, Http2Headers headers,
boolean allowAppend, boolean appendToTrailer) throws Http2Exception {
HttpMessage msg = getMessage(stream);
if (msg == null) {
msg = newMessage(ctx, stream, headers, validateHttpHeaders);
putMessage(stream, msg);
} else if (allowAppend) {
HttpConversionUtil.addHttp2ToHttpHeaders(
stream.id(),
headers,
msg.headers(),
HttpVersion.HTTP_1_1,
appendToTrailer,
msg instanceof HttpRequest
);
} else {
msg = null;
}
return msg;
}
After HTTP/2 headers have been processed by processHeadersBegin
this method either sends the result up the pipeline or retains the message for future processing. Params: - ctx – The context for which this message has been received
- stream – The stream the
objAccumulator
corresponds to - msg – The object which represents all headers/data for corresponding to
stream
- endOfStream –
true
if this is the last event for the stream
/**
* After HTTP/2 headers have been processed by {@link #processHeadersBegin} this method either
* sends the result up the pipeline or retains the message for future processing.
*
* @param ctx The context for which this message has been received
* @param stream The stream the {@code objAccumulator} corresponds to
* @param msg The object which represents all headers/data for corresponding to {@code stream}
* @param endOfStream {@code true} if this is the last event for the stream
*/
private void processHeadersEnd(
ChannelHandlerContext ctx,
Http2Stream stream,
HttpMessage msg,
boolean endOfStream) {
if (endOfStream) {
if (connection.isServer()) {
HttpRequest existing = (HttpRequest) msg;
msg = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
existing.method(),
existing.uri(),
Unpooled.EMPTY_BUFFER,
existing.headers(),
EmptyHttpHeaders.INSTANCE);
} else {
HttpResponse existing = (HttpResponse) msg;
msg = new DefaultFullHttpResponse(
existing.protocolVersion(),
existing.status(),
Unpooled.EMPTY_BUFFER,
existing.headers(),
EmptyHttpHeaders.INSTANCE
);
}
// no more data after headers to just fire as a regular http request
HttpUtil.setContentLength(msg, 0);
fireChannelRead(ctx, msg, stream);
} else {
if (!msg.headers().contains(HttpHeaderNames.CONTENT_LENGTH)) {
HttpUtil.setTransferEncodingChunked(msg, true);
}
fireChannelRead(ctx, msg, stream);
}
}
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception {
Http2Stream stream = connection.stream(streamId);
HttpMessage msg = getMessage(stream);
if (msg == null) {
throw connectionError(PROTOCOL_ERROR, "Data Frame received for unknown stream id %d", streamId);
}
AtomicInteger dataRead = getDataRead(stream);
final int dataReadableBytes = data.readableBytes();
final int readSoFar = dataRead.getAndAdd(dataReadableBytes);
if (readSoFar > maxContentLength - dataReadableBytes) {
throw connectionError(INTERNAL_ERROR,
"Content length exceeded max of %d for stream id %d", maxContentLength, streamId);
}
if (endOfStream) {
// end of stream, emits a LastHttpContent
// will be released by HttpStreamsHandler
if (dataReadableBytes > 0) {
final DefaultLastHttpContent content = new DefaultLastHttp2Content(data.retain(), stream);
fireChannelRead(ctx, content, stream);
} else {
fireChannelRead(ctx, new DefaultLastHttp2Content(Unpooled.EMPTY_BUFFER, stream), stream);
}
} else {
// will be released by HttpStreamsHandler
final DefaultHttp2Content content = new DefaultHttp2Content(data.retain(), stream);
fireChannelRead(ctx, content, stream);
}
// All bytes have been processed.
return dataReadableBytes + padding;
}
private AtomicInteger getDataRead(Http2Stream stream) {
final Object demand = stream.getProperty(dataReadKey);
if (demand instanceof AtomicInteger) {
return ((AtomicInteger) demand);
} else {
final AtomicInteger newValue = new AtomicInteger(0);
stream.setProperty(dataReadKey, newValue);
return newValue;
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endOfStream) throws Http2Exception {
Http2Stream stream = connection.stream(streamId);
HttpMessage msg =
processHeadersBegin(ctx, stream, headers, true, true);
if (msg != null) {
processHeadersEnd(ctx, stream, msg, endOfStream);
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
Http2Stream stream = connection.stream(streamId);
HttpMessage msg = processHeadersBegin(ctx, stream, headers, true, true);
if (msg != null) {
// Add headers for dependency and weight.
// See https://github.com/netty/netty/issues/5866
if (streamDependency != Http2CodecUtil.CONNECTION_STREAM_ID) {
msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_DEPENDENCY_ID.text(),
streamDependency);
}
msg.headers().setShort(HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(), weight);
processHeadersEnd(ctx, stream, msg, endOfStream);
}
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
Http2Stream stream = connection.stream(streamId);
HttpMessage msg = getMessage(stream);
if (msg != null) {
onRstStreamRead(stream, msg);
}
// discard stream since it has been reset
stream.close();
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) throws Http2Exception {
// A push promise should not be allowed to add headers to an existing stream
Http2Stream promisedStream = connection.stream(promisedStreamId);
if (headers.status() == null) {
// A PUSH_PROMISE frame has no Http response status.
// https://tools.ietf.org/html/rfc7540#section-8.2.1
// Server push is semantically equivalent to a server responding to a
// request; however, in this case, that request is also sent by the
// server, as a PUSH_PROMISE frame.
headers.status(OK.codeAsText());
}
HttpMessage msg = processHeadersBegin(ctx, promisedStream, headers, false, false);
if (msg == null) {
throw connectionError(PROTOCOL_ERROR, "Push Promise Frame received for pre-existing stream id %d",
promisedStreamId);
}
msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_PROMISE_ID.text(), streamId);
msg.headers().setShort(HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(),
Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT);
processHeadersEnd(ctx, promisedStream, msg, false);
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
if (propagateSettings) {
// Provide an interface for non-listeners to capture settings
ctx.fireChannelRead(settings);
}
}
Called if a RST_STREAM
is received but we have some data for that stream. Params: - stream – The stream
- msg – The message
/**
* Called if a {@code RST_STREAM} is received but we have some data for that stream.
*
* @param stream The stream
* @param msg The message
*/
protected void onRstStreamRead(Http2Stream stream, HttpMessage msg) {
removeMessage(stream);
}
}