/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.http.netty.stream;
import io.micronaut.core.annotation.Internal;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.netty.channel.ChannelPipelineCustomizer;
import io.micronaut.http.netty.reactive.CancelledSubscriber;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.*;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Handler that converts written StreamedHttpRequest
messages into HttpRequest
messages followed by HttpContent
messages and reads HttpResponse
messages followed by HttpContent
messages and produces StreamedHttpResponse
messages.
This allows request and response bodies to be handled using reactive streams.
There are two types of messages that this handler accepts for writing, StreamedHttpRequest
and FullHttpRequest
. Writing any other messages may potentially lead to HTTP message mangling.
There are two types of messages that this handler will send down the chain, StreamedHttpResponse
, and FullHttpResponse
. If ChannelOption.AUTO_READ
is false for the channel, then any StreamedHttpResponse
messages must be subscribed to consume the body, otherwise
it's possible that no read will be done of the messages.
As long as messages are returned in the order that they arrive, this handler implicitly supports HTTP
pipelining.
Author: jroper, Graeme Rocher Since: 1.0
/**
* Handler that converts written {@link StreamedHttpRequest} messages into {@link HttpRequest} messages
* followed by {@link HttpContent} messages and reads {@link HttpResponse} messages followed by
* {@link HttpContent} messages and produces {@link StreamedHttpResponse} messages.
* <p>
* This allows request and response bodies to be handled using reactive streams.
* <p>
* There are two types of messages that this handler accepts for writing, {@link StreamedHttpRequest} and
* {@link io.netty.handler.codec.http.FullHttpRequest}. Writing any other messages may potentially lead to HTTP message mangling.
* <p>
* There are two types of messages that this handler will send down the chain, {@link StreamedHttpResponse},
* and {@link FullHttpResponse}. If {@link io.netty.channel.ChannelOption#AUTO_READ} is false for the channel,
* then any {@link StreamedHttpResponse} messages <em>must</em> be subscribed to consume the body, otherwise
* it's possible that no read will be done of the messages.
* <p>
* As long as messages are returned in the order that they arrive, this handler implicitly supports HTTP
* pipelining.
*
* @author jroper
* @author Graeme Rocher
* @since 1.0
*/
@Internal
public class HttpStreamsClientHandler extends HttpStreamsHandler<HttpResponse, HttpRequest> {
private int inFlight = 0;
private int withServer = 0;
private ChannelPromise closeOnZeroInFlight = null;
private Subscriber<HttpContent> awaiting100Continue;
private StreamedHttpMessage awaiting100ContinueMessage;
private boolean ignoreResponseBody = false;
Default constructor.
/**
* Default constructor.
*/
public HttpStreamsClientHandler() {
super(HttpResponse.class, HttpRequest.class);
}
@Override
protected boolean hasBody(HttpResponse response) {
if (response.status().code() >= HttpStatus.CONTINUE.getCode() && response.status().code() < HttpStatus.OK.getCode()) {
return false;
}
if (response.status().equals(HttpResponseStatus.NO_CONTENT) ||
response.status().equals(HttpResponseStatus.NOT_MODIFIED)) {
return false;
}
if (HttpUtil.isTransferEncodingChunked(response)) {
return true;
}
if (HttpUtil.isContentLengthSet(response)) {
return HttpUtil.getContentLength(response) > 0;
}
return true;
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
if (inFlight == 0) {
ctx.close(future);
} else {
closeOnZeroInFlight = future;
}
}
@Override
protected void consumedInMessage(ChannelHandlerContext ctx) {
inFlight--;
withServer--;
if (inFlight == 0 && closeOnZeroInFlight != null) {
ctx.close(closeOnZeroInFlight);
}
}
@Override
protected void receivedOutMessage(ChannelHandlerContext ctx) {
inFlight++;
}
@Override
protected void sentOutMessage(ChannelHandlerContext ctx) {
withServer++;
}
@Override
protected HttpResponse createEmptyMessage(HttpResponse response) {
return new EmptyHttpResponse(response);
}
@Override
protected HttpResponse createStreamedMessage(HttpResponse response, Publisher<? extends HttpContent> stream) {
return new DelegateStreamedHttpResponse(response, stream);
}
@Override
protected void subscribeSubscriberToStream(StreamedHttpMessage msg, Subscriber<HttpContent> subscriber) {
if (HttpUtil.is100ContinueExpected(msg)) {
awaiting100Continue = subscriber;
awaiting100ContinueMessage = msg;
} else {
super.subscribeSubscriberToStream(msg, subscriber);
}
}
@Override
protected final boolean isClient() {
return true;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse && awaiting100Continue != null && withServer == 0) {
HttpResponse response = (HttpResponse) msg;
if (response.status().equals(HttpResponseStatus.CONTINUE)) {
super.subscribeSubscriberToStream(awaiting100ContinueMessage, awaiting100Continue);
awaiting100Continue = null;
awaiting100ContinueMessage = null;
if (msg instanceof FullHttpResponse) {
ReferenceCountUtil.release(msg);
} else {
ignoreResponseBody = true;
}
} else {
awaiting100ContinueMessage.subscribe(new CancelledSubscriber<>());
awaiting100ContinueMessage = null;
awaiting100Continue.onSubscribe(new Subscription() {
@Override
public void request(long n) {
}
@Override
public void cancel() {
}
});
awaiting100Continue.onComplete();
awaiting100Continue = null;
super.channelRead(ctx, msg);
}
} else if (ignoreResponseBody && msg instanceof HttpContent) {
ReferenceCountUtil.release(msg);
if (msg instanceof LastHttpContent) {
ignoreResponseBody = false;
}
} else {
super.channelRead(ctx, msg);
}
}
@Override
public void write(final ChannelHandlerContext ctx, Object msg, final ChannelPromise promise) throws Exception {
if (ctx.channel().attr(AttributeKey.valueOf(ChannelPipelineCustomizer.HANDLER_HTTP_CHUNK)).get() == Boolean.TRUE) {
ctx.write(msg, promise);
} else {
super.write(ctx, msg, promise);
}
}
}