/*
* Copyright (c) 2014, 2017 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.grizzly.http.server.util;
import java.io.IOException;
import java.util.Arrays;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.Context;
import org.glassfish.grizzly.IOEventLifeCycleListener;
import org.glassfish.grizzly.ThreadCache;
import org.glassfish.grizzly.asyncqueue.MessageCloner;
import org.glassfish.grizzly.asyncqueue.WritableMessage;
import org.glassfish.grizzly.attributes.Attribute;
import org.glassfish.grizzly.attributes.AttributeBuilder;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.FilterChainBuilder;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.filterchain.TransportFilter;
import org.glassfish.grizzly.http.server.AddOn;
import org.glassfish.grizzly.http.server.NetworkListener;
import org.glassfish.grizzly.memory.CompositeBuffer;
The plugin, that optimizes processing of pipelined HTTP requests by
buffering HTTP responses and then writing them as one operation.
Please note, this addon is not thread-safe, so it can't be used with HTTP
requests, that require asynchronous processing.
Author: Alexey Stashok
/**
* The plugin, that optimizes processing of pipelined HTTP requests by
* buffering HTTP responses and then writing them as one operation.
*
* Please note, this addon is not thread-safe, so it can't be used with HTTP
* requests, that require asynchronous processing.
*
* @author Alexey Stashok
*/
public class HttpPipelineOptAddOn implements AddOn {
private static final int DEFAULT_MAX_BUFFER_SIZE = 16384;
max number of response bytes to buffer before flush
/**
* max number of response bytes to buffer before flush
*/
private final int maxBufferSize;
Constructs HttpPipelineOptAddOn.
/**
* Constructs <tt>HttpPipelineOptAddOn</tt>.
*/
public HttpPipelineOptAddOn() {
this(DEFAULT_MAX_BUFFER_SIZE);
}
Constructs HttpPipelineOptAddOn.
Params: - maxBufferSize – the max number of response bytes to buffer before flush
/**
* Constructs <tt>HttpPipelineOptAddOn</tt>.
*
* @param maxBufferSize the max number of response bytes to buffer before flush
*/
public HttpPipelineOptAddOn(final int maxBufferSize) {
this.maxBufferSize = maxBufferSize;
}
@Override
public void setup(final NetworkListener networkListener,
final FilterChainBuilder builder) {
final int tfIdx = builder.indexOfType(TransportFilter.class);
builder.add(tfIdx + 1,
new PlugFilter(maxBufferSize,
networkListener.getTransport().getAttributeBuilder()));
}
The filter, that works as a plug in the FilterChain, and buffers output
data before passing it down to a transport filter, which will write
the data to network.
/**
* The filter, that works as a plug in the FilterChain, and buffers output
* data before passing it down to a transport filter, which will write
* the data to network.
*/
private static class PlugFilter extends BaseFilter {
private final Attribute<Plug> plugAttr;
private final int maxBufferSize;
public PlugFilter(final int maxBufferSize, final AttributeBuilder builder) {
this.maxBufferSize = maxBufferSize;
plugAttr = builder.createAttribute(PlugFilter.class + ".plug");
}
@Override
public NextAction handleRead(final FilterChainContext ctx) throws IOException {
// blocking mode means this read is initiated from HttpHandler
if (!ctx.getTransportContext().isBlocking()) {
// create a plug for this FilterChainContext
final Plug plug = Plug.create(ctx, this);
ctx.getInternalContext().addLifeCycleListener(plug);
plugAttr.set(ctx, plug);
}
return ctx.getInvokeAction();
}
@Override
@SuppressWarnings("unchecked")
public NextAction handleWrite(final FilterChainContext ctx) throws IOException {
final Plug plug = plugAttr.get(ctx);
// check if the output plug is installed
if (plug != null && plug.isPlugged) {
final WritableMessage msg = ctx.getMessage();
// check if the message could be appended
if (!msg.isExternal()) {
final Buffer buf = (Buffer) msg;
// if there's MessageCloner - call it,
// because the caller is not aware of buffering and will expect
// some result (either buffer is written or queued),
// so we notify the caller, that the buffer is queued
final MessageCloner<Buffer> cloner = ctx.getTransportContext().getMessageCloner();
plug.append(cloner == null
? buf
: cloner.clone(ctx.getConnection(), buf),
ctx.getTransportContext().getCompletionHandler());
// if we buffered more than max - flush
if (plug.size() > maxBufferSize) {
plug.flush();
}
return ctx.getStopAction();
} else {
plug.flush();
}
}
return ctx.getInvokeAction();
}
public static class Plug extends IOEventLifeCycleListener.Adapter {
private static final ThreadCache.CachedTypeIndex<Plug> CACHE_IDX
= ThreadCache.obtainIndex(Plug.class, 4);
public static Plug create(final FilterChainContext ctx,
final PlugFilter plugFilter) {
Plug plug = ThreadCache.takeFromCache(CACHE_IDX);
if (plug == null) {
plug = new Plug();
}
return plug.init(ctx, plugFilter);
}
// if this cloner is not called - it means the message has reached the network
// in the current thread
private final MessageCloner<Buffer> cloner = new MessageCloner<Buffer>() {
@Override
public Buffer clone(final Connection connection,
final Buffer originalMessage) {
isWrittenInThisThread = false;
return originalMessage;
}
};
// the copy of the original FilterChainContext to be used to flush data
private FilterChainContext ctx;
private PlugFilter plugFilter;
private CompositeBuffer buffer;
private boolean isPlugged;
private AggrCompletionHandler aggrCompletionHandler;
// optimization flag used to cache (or not) AggrCompletionHandler
private boolean isWrittenInThisThread;
Plug init(final FilterChainContext ctx, final PlugFilter plugFilter) {
this.ctx = ctx.copy();
this.plugFilter = plugFilter;
isPlugged = true;
return this;
}
Appends buffer to the queue.
Params: - msg –
- completionHandler –
Returns:
/**
* Appends buffer to the queue.
*
* @param msg
* @param completionHandler
* @return
*/
private boolean append(final Buffer msg,
final CompletionHandler completionHandler) {
if (isPlugged) {
obtainCompositeBuffer().append(msg);
if (completionHandler != null) {
obtainAggrCompletionHandler().add(completionHandler);
}
return true;
}
return false;
}
private CompositeBuffer obtainCompositeBuffer() {
if (buffer == null) {
buffer = CompositeBuffer.newBuffer(ctx.getMemoryManager());
buffer.allowBufferDispose(true);
buffer.allowInternalBuffersDispose(true);
buffer.disposeOrder(CompositeBuffer.DisposeOrder.LAST_TO_FIRST);
}
return buffer;
}
private AggrCompletionHandler obtainAggrCompletionHandler() {
if (aggrCompletionHandler == null) {
aggrCompletionHandler = new AggrCompletionHandler();
}
return aggrCompletionHandler;
}
@Override
public void onContextSuspend(final Context context) throws IOException {
unplug(context);
}
@Override
public void onContextManualIOEventControl(final Context context) throws IOException {
unplug(context);
}
@Override
public void onComplete(final Context context, final Object data) throws IOException {
unplug(context);
}
Releases the plug, which means we have to flush all the data and
remove the PlugFilter attr from the context.
Params: - context –
/**
* Releases the plug, which means we have to flush all the data and
* remove the PlugFilter attr from the context.
*
* @param context
*/
private void unplug(final Context context) {
if (isPlugged) {
flush();
ctx.completeAndRecycle();
isPlugged = false;
context.removeLifeCycleListener(this);
plugFilter.plugAttr.remove(context);
recycle();
}
}
flushes buffered data
/**
* flushes buffered data
*/
@SuppressWarnings("unchecked")
private void flush() {
if (isPlugged && buffer != null) {
isWrittenInThisThread = true;
ctx.write(null, buffer, aggrCompletionHandler, cloner);
buffer = null;
if (isWrittenInThisThread && aggrCompletionHandler != null) {
aggrCompletionHandler.clear();
} else {
aggrCompletionHandler = null;
}
}
}
private void recycle() {
if (aggrCompletionHandler != null) {
aggrCompletionHandler.clear();
}
ctx = null;
plugFilter = null;
ThreadCache.putToCache(CACHE_IDX, this);
}
private int size() {
return (isPlugged && buffer != null) ? buffer.remaining() : 0;
}
}
public static final class AggrCompletionHandler implements CompletionHandler {
private CompletionHandler[] handlers = new CompletionHandler[16];
private int sz;
public void add(final CompletionHandler handler) {
ensureSize();
handlers[sz++] = handler;
}
@Override
public void cancelled() {
for (int i = 0; i < sz; i++) {
handlers[i].cancelled();
}
}
@Override
public void failed(final Throwable throwable) {
for (int i = 0; i < sz; i++) {
handlers[i].failed(throwable);
}
}
@Override
@SuppressWarnings("unchecked")
public void completed(final Object result) {
for (int i = 0; i < sz; i++) {
handlers[i].completed(result);
}
}
@Override
@SuppressWarnings("unchecked")
public void updated(final Object result) {
for (int i = 0; i < sz; i++) {
handlers[i].updated(result);
}
}
public void clear() {
for (int i = 0; i < sz; i++) {
handlers[i] = null;
}
sz = 0;
}
private void ensureSize() {
if (handlers.length == sz) {
handlers = Arrays.copyOf(handlers, sz * 3 / 2 + 1);
}
}
}
}
}