/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package io.netty.handler.codec.http2;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.CoalescingBufferQueue;
import io.netty.handler.codec.http.HttpStatusClass;
import io.netty.util.internal.UnstableApi;

import java.util.ArrayDeque;

import static io.netty.handler.codec.http.HttpStatusClass.INFORMATIONAL;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Integer.MAX_VALUE;
import static java.lang.Math.min;

// Default implementation of Http2ConnectionEncoder.
/** * Default implementation of {@link Http2ConnectionEncoder}. */
@UnstableApi public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { private final Http2FrameWriter frameWriter; private final Http2Connection connection; private Http2LifecycleManager lifecycleManager; // We prefer ArrayDeque to LinkedList because later will produce more GC. // This initial capacity is plenty for SETTINGS traffic. private final ArrayDeque<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4); public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter) { this.connection = checkNotNull(connection, "connection"); this.frameWriter = checkNotNull(frameWriter, "frameWriter"); if (connection.remote().flowController() == null) { connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection)); } } @Override public void lifecycleManager(Http2LifecycleManager lifecycleManager) { this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager"); } @Override public Http2FrameWriter frameWriter() { return frameWriter; } @Override public Http2Connection connection() { return connection; } @Override public final Http2RemoteFlowController flowController() { return connection().remote().flowController(); } @Override public void remoteSettings(Http2Settings settings) throws Http2Exception { Boolean pushEnabled = settings.pushEnabled(); Http2FrameWriter.Configuration config = configuration(); Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration(); Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy(); if (pushEnabled != null) { if (!connection.isServer() && pushEnabled) { throw connectionError(PROTOCOL_ERROR, "Client received a value of ENABLE_PUSH specified to other than 0"); } connection.remote().allowPushTo(pushEnabled); } Long maxConcurrentStreams = settings.maxConcurrentStreams(); if (maxConcurrentStreams != null) { connection.local().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE)); } 
Long headerTableSize = settings.headerTableSize(); if (headerTableSize != null) { outboundHeaderConfig.maxHeaderTableSize((int) min(headerTableSize, MAX_VALUE)); } Long maxHeaderListSize = settings.maxHeaderListSize(); if (maxHeaderListSize != null) { outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize); } Integer maxFrameSize = settings.maxFrameSize(); if (maxFrameSize != null) { outboundFrameSizePolicy.maxFrameSize(maxFrameSize); } Integer initialWindowSize = settings.initialWindowSize(); if (initialWindowSize != null) { flowController().initialWindowSize(initialWindowSize); } } @Override public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding, final boolean endOfStream, ChannelPromise promise) { final Http2Stream stream; try { stream = requireStream(streamId); // Verify that the stream is in the appropriate state for sending DATA frames. switch (stream.state()) { case OPEN: case HALF_CLOSED_REMOTE: // Allowed sending DATA frames in these states. break; default: throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " + stream.state()); } } catch (Throwable e) { data.release(); return promise.setFailure(e); } // Hand control of the frame to the flow controller. 
flowController().addFlowControlled(stream, new FlowControlledData(stream, data, padding, endOfStream, promise)); return promise; } @Override public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise) { return writeHeaders(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endStream, promise); } private static boolean validateHeadersSentState(Http2Stream stream, Http2Headers headers, boolean isServer, boolean endOfStream) { boolean isInformational = isServer && HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL; if ((isInformational || !endOfStream) && stream.isHeadersSent() || stream.isTrailersSent()) { throw new IllegalStateException("Stream " + stream.id() + " sent too many headers EOS: " + endOfStream); } return isInformational; } @Override public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers, final int streamDependency, final short weight, final boolean exclusive, final int padding, final boolean endOfStream, ChannelPromise promise) { try { Http2Stream stream = connection.stream(streamId); if (stream == null) { try { stream = connection.local().createStream(streamId, endOfStream); } catch (Http2Exception cause) { if (connection.remote().mayHaveCreatedStream(streamId)) { promise.tryFailure(new IllegalStateException("Stream no longer exists: " + streamId, cause)); return promise; } throw cause; } } else { switch (stream.state()) { case RESERVED_LOCAL: stream.open(endOfStream); break; case OPEN: case HALF_CLOSED_REMOTE: // Allowed sending headers in these states. break; default: throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " + stream.state()); } } // Trailing headers must go through flow control if there are other frames queued in flow control // for this stream. 
Http2RemoteFlowController flowController = flowController(); if (!endOfStream || !flowController.hasFlowControlled(stream)) { boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream); if (endOfStream) { final Http2Stream finalStream = stream; final ChannelFutureListener closeStreamLocalListener = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { lifecycleManager.closeStreamLocal(finalStream, future); } }; promise = promise.unvoid().addListener(closeStreamLocalListener); } ChannelFuture future = frameWriter.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, padding, endOfStream, promise); // Writing headers may fail during the encode state if they violate HPACK limits. Throwable failureCause = future.cause(); if (failureCause == null) { // Synchronously set the headersSent flag to ensure that we do not subsequently write // other headers containing pseudo-header fields. // // This just sets internal stream state which is used elsewhere in the codec and doesn't // necessarily mean the write will complete successfully. stream.headersSent(isInformational); if (!future.isSuccess()) { // Either the future is not done or failed in the meantime. notifyLifecycleManagerOnError(future, ctx); } } else { lifecycleManager.onError(ctx, true, failureCause); } return future; } else { // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames. 
flowController.addFlowControlled(stream, new FlowControlledHeaders(stream, headers, streamDependency, weight, exclusive, padding, true, promise)); return promise; } } catch (Throwable t) { lifecycleManager.onError(ctx, true, t); promise.tryFailure(t); return promise; } } @Override public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive, ChannelPromise promise) { return frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise); } @Override public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise) { // Delegate to the lifecycle manager for proper updating of connection state. return lifecycleManager.resetStream(ctx, streamId, errorCode, promise); } @Override public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings, ChannelPromise promise) { outstandingLocalSettingsQueue.add(settings); try { Boolean pushEnabled = settings.pushEnabled(); if (pushEnabled != null && connection.isServer()) { throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified"); } } catch (Throwable e) { return promise.setFailure(e); } return frameWriter.writeSettings(ctx, settings, promise); } @Override public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) { return frameWriter.writeSettingsAck(ctx, promise); } @Override public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) { return frameWriter.writePing(ctx, ack, data, promise); } @Override public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding, ChannelPromise promise) { try { if (connection.goAwayReceived()) { throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received."); } Http2Stream stream = requireStream(streamId); // Reserve the 
promised stream. connection.local().reservePushStream(promisedStreamId, stream); ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise); // Writing headers may fail during the encode state if they violate HPACK limits. Throwable failureCause = future.cause(); if (failureCause == null) { // This just sets internal stream state which is used elsewhere in the codec and doesn't // necessarily mean the write will complete successfully. stream.pushPromiseSent(); if (!future.isSuccess()) { // Either the future is not done or failed in the meantime. notifyLifecycleManagerOnError(future, ctx); } } else { lifecycleManager.onError(ctx, true, failureCause); } return future; } catch (Throwable t) { lifecycleManager.onError(ctx, true, t); promise.tryFailure(t); return promise; } } @Override public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData, ChannelPromise promise) { return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise); } @Override public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) { return promise.setFailure(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" + " objects to control window sizes")); } @Override public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload, ChannelPromise promise) { return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload, promise); } @Override public void close() { frameWriter.close(); } @Override public Http2Settings pollSentSettings() { return outstandingLocalSettingsQueue.poll(); } @Override public Configuration configuration() { return frameWriter.configuration(); } private Http2Stream requireStream(int streamId) { Http2Stream stream = connection.stream(streamId); if (stream == null) { final String message; if 
(connection.streamMayHaveExisted(streamId)) { message = "Stream no longer exists: " + streamId; } else { message = "Stream does not exist: " + streamId; } throw new IllegalArgumentException(message); } return stream; }
// Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it
// only writes padding once for the entire payload, as opposed to writing it once per frame. This makes the
// size calculation deterministic, thereby greatly simplifying the implementation.
//
// If frame-splitting is required to fit within max-frame-size and flow-control constraints, we ensure that
// the passed promise is not completed until the last frame has been written.

/** * Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it * only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the * {@link #size} calculation deterministic thereby greatly simplifying the implementation. * <p> * If frame-splitting is required to fit within max-frame-size and flow-control constraints we ensure that * the passed promise is not completed until last frame write. * </p> */
private final class FlowControlledData extends FlowControlledBase { private final CoalescingBufferQueue queue; private int dataSize; FlowControlledData(Http2Stream stream, ByteBuf buf, int padding, boolean endOfStream, ChannelPromise promise) { super(stream, padding, endOfStream, promise); queue = new CoalescingBufferQueue(promise.channel()); queue.add(buf, promise); dataSize = queue.readableBytes(); } @Override public int size() { return dataSize + padding; } @Override public void error(ChannelHandlerContext ctx, Throwable cause) { queue.releaseAndFailAll(cause); // Don't update dataSize because we need to ensure the size() method returns a consistent size even after // error so we don't invalidate flow control when returning bytes to flow control. lifecycleManager.onError(ctx, true, cause); } @Override public void write(ChannelHandlerContext ctx, int allowedBytes) { int queuedData = queue.readableBytes(); if (!endOfStream) { if (queuedData == 0) { // There's no need to write any data frames because there are only empty data frames in the queue // and it is not end of stream yet. Just complete their promises by getting the buffer corresponding // to 0 bytes and writing it to the channel (to preserve notification order). ChannelPromise writePromise = ctx.newPromise().addListener(this); ctx.write(queue.remove(0, writePromise), writePromise); return; } if (allowedBytes == 0) { return; } } // Determine how much data to write. int writableData = min(queuedData, allowedBytes); ChannelPromise writePromise = ctx.newPromise().addListener(this); ByteBuf toWrite = queue.remove(writableData, writePromise); dataSize = queue.readableBytes(); // Determine how much padding to write. int writablePadding = min(allowedBytes - writableData, padding); padding -= writablePadding; // Write the frame(s). 
frameWriter().writeData(ctx, stream.id(), toWrite, writablePadding, endOfStream && size() == 0, writePromise); } @Override public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) { FlowControlledData nextData; if (FlowControlledData.class != next.getClass() || MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) { return false; } nextData.queue.copyTo(queue); dataSize = queue.readableBytes(); // Given that we're merging data into a frame it doesn't really make sense to accumulate padding. padding = Math.max(padding, nextData.padding); endOfStream = nextData.endOfStream; return true; } } private void notifyLifecycleManagerOnError(ChannelFuture future, final ChannelHandlerContext ctx) { future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { lifecycleManager.onError(ctx, true, cause); } } }); }
// Wrap headers so they can be written subject to flow-control. While headers have no cost against the
// flow-control window, their order with respect to other frames must be maintained; hence if a DATA frame is
// blocked on flow-control, a HEADERS frame must wait until that frame has been written.
/** * Wrap headers so they can be written subject to flow-control. While headers do not have cost against the * flow-control window their order with respect to other frames must be maintained, hence if a DATA frame is * blocked on flow-control a HEADER frame must wait until this frame has been written. */
private final class FlowControlledHeaders extends FlowControlledBase { private final Http2Headers headers; private final int streamDependency; private final short weight; private final boolean exclusive; FlowControlledHeaders(Http2Stream stream, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream, ChannelPromise promise) { super(stream, padding, endOfStream, promise); this.headers = headers; this.streamDependency = streamDependency; this.weight = weight; this.exclusive = exclusive; } @Override public int size() { return 0; } @Override public void error(ChannelHandlerContext ctx, Throwable cause) { if (ctx != null) { lifecycleManager.onError(ctx, true, cause); } promise.tryFailure(cause); } @Override public void write(ChannelHandlerContext ctx, int allowedBytes) { boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream); if (promise.isVoid()) { promise = ctx.newPromise(); } promise.addListener(this); ChannelFuture f = frameWriter.writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive, padding, endOfStream, promise); // Writing headers may fail during the encode state if they violate HPACK limits. Throwable failureCause = f.cause(); if (failureCause == null) { // This just sets internal stream state which is used elsewhere in the codec and doesn't // necessarily mean the write will complete successfully. stream.headersSent(isInformational); } } @Override public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) { return false; } }
// Common base type for payloads to deliver via flow-control.
/**
 * Shared base class of the payloads that are delivered through flow-control. It holds the stream, promise,
 * end-of-stream flag and padding, and routes failed write futures to {@link #error}.
 */
public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled,
        ChannelFutureListener {
    protected final Http2Stream stream;
    protected ChannelPromise promise;
    protected boolean endOfStream;
    protected int padding;

    /**
     * @throws IllegalArgumentException if {@code padding} is negative
     */
    FlowControlledBase(final Http2Stream stream, int padding, boolean endOfStream,
            final ChannelPromise promise) {
        if (padding < 0) {
            throw new IllegalArgumentException("padding must be >= 0");
        }
        this.stream = stream;
        this.promise = promise;
        this.endOfStream = endOfStream;
        this.padding = padding;
    }

    @Override
    public void writeComplete() {
        if (!endOfStream) {
            return;
        }
        // The final frame is out: close the local side of the stream.
        lifecycleManager.closeStreamLocal(stream, promise);
    }

    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            return;
        }
        error(flowController().channelHandlerContext(), future.cause());
    }
}
}