/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.channel;

import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.PromiseCombiner;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

A queue of write operations which are pending for later execution. It also updates the writability of the associated Channel, so that the pending write operations are also considered to determine the writability.
/** * A queue of write operations which are pending for later execution. It also updates the * {@linkplain Channel#isWritable() writability} of the associated {@link Channel}, so that * the pending write operations are also considered to determine the writability. */
public final class PendingWriteQueue { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PendingWriteQueue.class); // Assuming a 64-bit JVM: // - 16 bytes object header // - 4 reference fields // - 1 long fields private static final int PENDING_WRITE_OVERHEAD = SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64); private final ChannelHandlerContext ctx; private final PendingBytesTracker tracker; // head and tail pointers for the linked-list structure. If empty head and tail are null. private PendingWrite head; private PendingWrite tail; private int size; private long bytes; public PendingWriteQueue(ChannelHandlerContext ctx) { tracker = PendingBytesTracker.newTracker(ctx.channel()); this.ctx = ctx; }
Returns true if there are no pending write operations left in this queue.
/** * Returns {@code true} if there are no pending write operations left in this queue. */
public boolean isEmpty() { assert ctx.executor().inEventLoop(); return head == null; }
Returns the number of pending write operations.
/** * Returns the number of pending write operations. */
public int size() { assert ctx.executor().inEventLoop(); return size; }
Returns the total number of bytes that are pending because of pending messages. This is only an estimate so it should only be treated as a hint.
/** * Returns the total number of bytes that are pending because of pending messages. This is only an estimate so * it should only be treated as a hint. */
public long bytes() { assert ctx.executor().inEventLoop(); return bytes; } private int size(Object msg) { // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering, // we should add them to the queue and let removeAndFailAll() fail them later. int messageSize = tracker.size(msg); if (messageSize < 0) { // Size may be unknown so just use 0 messageSize = 0; } return messageSize + PENDING_WRITE_OVERHEAD; }
Add the given msg and ChannelPromise.
/** * Add the given {@code msg} and {@link ChannelPromise}. */
public void add(Object msg, ChannelPromise promise) { assert ctx.executor().inEventLoop(); if (msg == null) { throw new NullPointerException("msg"); } if (promise == null) { throw new NullPointerException("promise"); } // It is possible for writes to be triggered from removeAndFailAll(). To preserve ordering, // we should add them to the queue and let removeAndFailAll() fail them later. int messageSize = size(msg); PendingWrite write = PendingWrite.newInstance(msg, messageSize, promise); PendingWrite currentTail = tail; if (currentTail == null) { tail = head = write; } else { currentTail.next = write; tail = write; } size ++; bytes += messageSize; tracker.incrementPendingOutboundBytes(write.size); }
Remove all pending write operation and performs them via ChannelOutboundInvoker.write(Object, ChannelPromise).
Returns: ChannelFuture if something was written and null if the PendingWriteQueue is empty.
/** * Remove all pending write operation and performs them via * {@link ChannelHandlerContext#write(Object, ChannelPromise)}. * * @return {@link ChannelFuture} if something was written and {@code null} * if the {@link PendingWriteQueue} is empty. */
public ChannelFuture removeAndWriteAll() { assert ctx.executor().inEventLoop(); if (isEmpty()) { return null; } ChannelPromise p = ctx.newPromise(); PromiseCombiner combiner = new PromiseCombiner(); try { // It is possible for some of the written promises to trigger more writes. The new writes // will "revive" the queue, so we need to write them up until the queue is empty. for (PendingWrite write = head; write != null; write = head) { head = tail = null; size = 0; bytes = 0; while (write != null) { PendingWrite next = write.next; Object msg = write.msg; ChannelPromise promise = write.promise; recycle(write, false); if (!(promise instanceof VoidChannelPromise)) { combiner.add(promise); } ctx.write(msg, promise); write = next; } } combiner.finish(p); } catch (Throwable cause) { p.setFailure(cause); } assertEmpty(); return p; }
Remove all pending write operation and fail them with the given Throwable. The message will be released via ReferenceCountUtil.safeRelease(Object).
/** * Remove all pending write operation and fail them with the given {@link Throwable}. The message will be released * via {@link ReferenceCountUtil#safeRelease(Object)}. */
public void removeAndFailAll(Throwable cause) { assert ctx.executor().inEventLoop(); if (cause == null) { throw new NullPointerException("cause"); } // It is possible for some of the failed promises to trigger more writes. The new writes // will "revive" the queue, so we need to clean them up until the queue is empty. for (PendingWrite write = head; write != null; write = head) { head = tail = null; size = 0; bytes = 0; while (write != null) { PendingWrite next = write.next; ReferenceCountUtil.safeRelease(write.msg); ChannelPromise promise = write.promise; recycle(write, false); safeFail(promise, cause); write = next; } } assertEmpty(); }
Remove a pending write operation and fail it with the given Throwable. The message will be released via ReferenceCountUtil.safeRelease(Object).
/** * Remove a pending write operation and fail it with the given {@link Throwable}. The message will be released via * {@link ReferenceCountUtil#safeRelease(Object)}. */
public void removeAndFail(Throwable cause) { assert ctx.executor().inEventLoop(); if (cause == null) { throw new NullPointerException("cause"); } PendingWrite write = head; if (write == null) { return; } ReferenceCountUtil.safeRelease(write.msg); ChannelPromise promise = write.promise; safeFail(promise, cause); recycle(write, true); } private void assertEmpty() { assert tail == null && head == null && size == 0; }
Removes a pending write operation and performs it via ChannelOutboundInvoker.write(Object, ChannelPromise).
Returns: ChannelFuture if something was written and null if the PendingWriteQueue is empty.
/** * Removes a pending write operation and performs it via * {@link ChannelHandlerContext#write(Object, ChannelPromise)}. * * @return {@link ChannelFuture} if something was written and {@code null} * if the {@link PendingWriteQueue} is empty. */
public ChannelFuture removeAndWrite() { assert ctx.executor().inEventLoop(); PendingWrite write = head; if (write == null) { return null; } Object msg = write.msg; ChannelPromise promise = write.promise; recycle(write, true); return ctx.write(msg, promise); }
Removes a pending write operation and release it's message via ReferenceCountUtil.safeRelease(Object).
Returns: ChannelPromise of the pending write or null if the queue is empty.
/** * Removes a pending write operation and release it's message via {@link ReferenceCountUtil#safeRelease(Object)}. * * @return {@link ChannelPromise} of the pending write or {@code null} if the queue is empty. * */
public ChannelPromise remove() { assert ctx.executor().inEventLoop(); PendingWrite write = head; if (write == null) { return null; } ChannelPromise promise = write.promise; ReferenceCountUtil.safeRelease(write.msg); recycle(write, true); return promise; }
Return the current message or null if empty.
/** * Return the current message or {@code null} if empty. */
public Object current() { assert ctx.executor().inEventLoop(); PendingWrite write = head; if (write == null) { return null; } return write.msg; } private void recycle(PendingWrite write, boolean update) { final PendingWrite next = write.next; final long writeSize = write.size; if (update) { if (next == null) { // Handled last PendingWrite so rest head and tail // Guard against re-entrance by directly reset head = tail = null; size = 0; bytes = 0; } else { head = next; size --; bytes -= writeSize; assert size > 0 && bytes >= 0; } } write.recycle(); tracker.decrementPendingOutboundBytes(writeSize); } private static void safeFail(ChannelPromise promise, Throwable cause) { if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); } }
Holds all meta-data and construct the linked-list structure.
/** * Holds all meta-data and construct the linked-list structure. */
static final class PendingWrite { private static final Recycler<PendingWrite> RECYCLER = new Recycler<PendingWrite>() { @Override protected PendingWrite newObject(Handle<PendingWrite> handle) { return new PendingWrite(handle); } }; private final Recycler.Handle<PendingWrite> handle; private PendingWrite next; private long size; private ChannelPromise promise; private Object msg; private PendingWrite(Recycler.Handle<PendingWrite> handle) { this.handle = handle; } static PendingWrite newInstance(Object msg, int size, ChannelPromise promise) { PendingWrite write = RECYCLER.get(); write.size = size; write.msg = msg; write.promise = promise; return write; } private void recycle() { size = 0; next = null; msg = null; promise = null; handle.recycle(this); } } }