package org.glassfish.grizzly.http.server.util;
import java.io.IOException;
import java.util.Arrays;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.Context;
import org.glassfish.grizzly.IOEventLifeCycleListener;
import org.glassfish.grizzly.ThreadCache;
import org.glassfish.grizzly.asyncqueue.MessageCloner;
import org.glassfish.grizzly.asyncqueue.WritableMessage;
import org.glassfish.grizzly.attributes.Attribute;
import org.glassfish.grizzly.attributes.AttributeBuilder;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.FilterChainBuilder;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.filterchain.TransportFilter;
import org.glassfish.grizzly.http.server.AddOn;
import org.glassfish.grizzly.http.server.NetworkListener;
import org.glassfish.grizzly.memory.CompositeBuffer;
public class HttpPipelineOptAddOn implements AddOn {
/** Buffering threshold applied when the no-arg constructor is used. */
private static final int DEFAULT_MAX_BUFFER_SIZE = 16384;

/**
 * Threshold handed to the {@link PlugFilter}; once the amount of buffered
 * output exceeds this value the filter flushes it.
 */
private final int maxBufferSize;

/**
 * Creates the add-on with the default buffering threshold
 * ({@code DEFAULT_MAX_BUFFER_SIZE}).
 */
public HttpPipelineOptAddOn() {
    this(DEFAULT_MAX_BUFFER_SIZE);
}

/**
 * Creates the add-on with a caller-supplied buffering threshold.
 *
 * @param maxBufferSize threshold compared against the size of the pending
 *                      output buffer; the value is stored as given, without validation
 */
public HttpPipelineOptAddOn(final int maxBufferSize) {
    this.maxBufferSize = maxBufferSize;
}
/**
 * Installs a {@link PlugFilter} into the listener's filter chain, at the
 * position directly following the {@link TransportFilter}.
 *
 * @param networkListener listener whose transport supplies the attribute builder
 * @param builder         filter chain builder the filter is inserted into
 */
@Override
public void setup(final NetworkListener networkListener, final FilterChainBuilder builder) {
    // Position right behind the transport filter (index lookup result + 1).
    final int insertPosition = builder.indexOfType(TransportFilter.class) + 1;
    final AttributeBuilder attributeBuilder = networkListener.getTransport().getAttributeBuilder();
    final PlugFilter plugFilter = new PlugFilter(maxBufferSize, attributeBuilder);
    builder.add(insertPosition, plugFilter);
}
private static class PlugFilter extends BaseFilter {
/** Attribute under which the {@link Plug} of a processing context is stored. */
private final Attribute<Plug> plugAttr;

/** Pending-output size above which {@code handleWrite} flushes the plug. */
private final int maxBufferSize;

/**
 * Creates the filter and registers the attribute used to associate a
 * {@link Plug} with a context.
 *
 * @param maxBufferSize flush threshold for the buffered output
 * @param builder       attribute builder used to create the plug attribute
 */
public PlugFilter(final int maxBufferSize, final AttributeBuilder builder) {
    this.maxBufferSize = maxBufferSize;
    final String attributeName = PlugFilter.class + ".plug";
    this.plugAttr = builder.createAttribute(attributeName);
}
/**
 * On a non-blocking read, creates a {@link Plug} for this context, registers
 * it as a life-cycle listener on the internal context and stores it in the
 * plug attribute so that {@code handleWrite} can find it. Blocking reads are
 * passed through untouched.
 *
 * @param ctx the filter chain context of the read event
 * @return always the invoke action, so processing continues down the chain
 * @throws IOException declared by the filter contract
 */
@Override
public NextAction handleRead(final FilterChainContext ctx) throws IOException {
    final boolean blocking = ctx.getTransportContext().isBlocking();
    if (blocking) {
        return ctx.getInvokeAction();
    }

    final Plug readPlug = Plug.create(ctx, this);
    ctx.getInternalContext().addLifeCycleListener(readPlug);
    plugAttr.set(ctx, readPlug);
    return ctx.getInvokeAction();
}
/**
 * Intercepts writes issued while a {@link Plug} is active for the context.
 * <ul>
 *   <li>No plug, or plug no longer plugged: the write is passed on unchanged.</li>
 *   <li>External message: pending buffered output is flushed first, then the
 *       message is passed on unchanged.</li>
 *   <li>Otherwise the message is treated as a {@link Buffer}, appended to the
 *       plug (after cloning, if the transport context carries a cloner) and
 *       the write is stopped here; the plug is flushed as soon as its size
 *       exceeds {@code maxBufferSize}.</li>
 * </ul>
 *
 * @param ctx the filter chain context of the write event
 * @return the stop action when the message was buffered, the invoke action otherwise
 * @throws IOException declared by the filter contract
 */
@Override
@SuppressWarnings("unchecked")
public NextAction handleWrite(final FilterChainContext ctx) throws IOException {
    final Plug activePlug = plugAttr.get(ctx);
    if (activePlug == null || !activePlug.isPlugged) {
        return ctx.getInvokeAction();
    }

    final WritableMessage message = ctx.getMessage();
    if (message.isExternal()) {
        // Keep ordering: whatever is buffered must go out before this message.
        activePlug.flush();
        return ctx.getInvokeAction();
    }

    final Buffer payload = (Buffer) message;
    final MessageCloner<Buffer> messageCloner = ctx.getTransportContext().getMessageCloner();
    final Buffer toAppend = messageCloner == null
            ? payload
            : messageCloner.clone(ctx.getConnection(), payload);
    activePlug.append(toAppend, ctx.getTransportContext().getCompletionHandler());

    if (activePlug.size() > maxBufferSize) {
        activePlug.flush();
    }
    return ctx.getStopAction();
}
/**
 * Per-context accumulator of outgoing {@link Buffer}s.
 * <p>
 * A Plug is created in {@code PlugFilter.handleRead}, registered there as a
 * life-cycle listener, and fed by {@code PlugFilter.handleWrite}. Appended
 * buffers are gathered in a {@link CompositeBuffer} and written in one call
 * when {@link #flush()} runs. Any of the life-cycle callbacks overridden below
 * (suspend, manual IO event control, complete) unplugs the instance: pending
 * data is flushed, the listener and the attribute are removed, and the
 * instance is returned to the {@link ThreadCache}.
 */
public static class Plug extends IOEventLifeCycleListener.Adapter {
// Thread-cache slot for recycled Plug instances.
// NOTE(review): the second argument (4) is presumably the per-thread cache capacity - confirm against ThreadCache.
private static final ThreadCache.CachedTypeIndex<Plug> CACHE_IDX = ThreadCache.obtainIndex(Plug.class, 4);
/**
 * Obtains a Plug (recycled from the thread cache when one is available,
 * freshly allocated otherwise) and initializes it for the given context.
 *
 * @param ctx        context of the read event the plug is bound to
 * @param plugFilter owning filter, used later to remove the plug attribute
 * @return the initialized, plugged instance
 */
public static Plug create(final FilterChainContext ctx, final PlugFilter plugFilter) {
Plug plug = ThreadCache.takeFromCache(CACHE_IDX);
if (plug == null) {
plug = new Plug();
}
return plug.init(ctx, plugFilter);
}
// Cloner passed to ctx.write() in flush(). It performs no copy: it returns the
// original message and only records, via isWrittenInThisThread, that it was invoked.
// NOTE(review): presumably the transport calls the cloner only when the message
// could not be written immediately and has to be queued - confirm.
private final MessageCloner<Buffer> cloner = new MessageCloner<Buffer>() {
@Override
public Buffer clone(final Connection connection, final Buffer originalMessage) {
isWrittenInThisThread = false;
return originalMessage;
}
};
// Copy of the originating context (see init()); used for writes and memory-manager access.
private FilterChainContext ctx;
// Owning filter; needed to remove the plug attribute on unplug.
private PlugFilter plugFilter;
// Lazily created accumulator for appended buffers; null when nothing is pending.
private CompositeBuffer buffer;
// True between init() and unplug(); append/flush/size are no-ops otherwise.
private boolean isPlugged;
// Lazily created aggregate of the completion handlers of the appended messages.
private AggrCompletionHandler aggrCompletionHandler;
// Set to true right before ctx.write() in flush(); reset to false by the cloner if it is invoked.
private boolean isWrittenInThisThread;
/**
 * Binds this instance to a copy of the given context and marks it plugged.
 *
 * @param ctx        context to copy
 * @param plugFilter owning filter
 * @return this instance
 */
Plug init(final FilterChainContext ctx, final PlugFilter plugFilter) {
this.ctx = ctx.copy();
this.plugFilter = plugFilter;
isPlugged = true;
return this;
}
/**
 * Adds a message to the pending output if this plug is still plugged.
 *
 * @param msg               buffer to append
 * @param completionHandler handler to notify about the eventual write; may be null
 * @return true if the message was buffered, false if the plug is no longer plugged
 */
private boolean append(final Buffer msg, final CompletionHandler completionHandler) {
if (isPlugged) {
obtainCompositeBuffer().append(msg);
if (completionHandler != null) {
obtainAggrCompletionHandler().add(completionHandler);
}
return true;
}
return false;
}
/**
 * Returns the pending composite buffer, creating it on first use with
 * disposal of the composite and of its internal buffers allowed, in
 * last-to-first order.
 */
private CompositeBuffer obtainCompositeBuffer() {
if (buffer == null) {
buffer = CompositeBuffer.newBuffer(ctx.getMemoryManager());
buffer.allowBufferDispose(true);
buffer.allowInternalBuffersDispose(true);
buffer.disposeOrder(CompositeBuffer.DisposeOrder.LAST_TO_FIRST);
}
return buffer;
}
/** Returns the aggregate completion handler, creating it on first use. */
private AggrCompletionHandler obtainAggrCompletionHandler() {
if (aggrCompletionHandler == null) {
aggrCompletionHandler = new AggrCompletionHandler();
}
return aggrCompletionHandler;
}
/** Unplugs when the context gets suspended. */
@Override
public void onContextSuspend(final Context context) throws IOException {
unplug(context);
}
/** Unplugs when the context switches to manual IO event control. */
@Override
public void onContextManualIOEventControl(final Context context) throws IOException {
unplug(context);
}
/** Unplugs when processing of the context completes. */
@Override
public void onComplete(final Context context, final Object data) throws IOException {
unplug(context);
}
/**
 * Flushes pending output and detaches this plug from the context; does
 * nothing if already unplugged.
 *
 * @param context context the plug was registered on
 */
private void unplug(final Context context) {
if (isPlugged) {
// flush() must run while isPlugged is still true, otherwise it is a no-op.
flush();
// Release the context copy made in init().
ctx.completeAndRecycle();
isPlugged = false;
context.removeLifeCycleListener(this);
plugFilter.plugAttr.remove(context);
// recycle() nulls ctx and plugFilter, so it has to come last.
recycle();
}
}
/**
 * Writes the accumulated buffer, if any, through the copied context and
 * resets the pending state.
 */
@SuppressWarnings("unchecked")
private void flush() {
if (isPlugged && buffer != null) {
// Assume a same-thread write; the cloner flips this flag if it gets called during write().
isWrittenInThisThread = true;
ctx.write(null, buffer, aggrCompletionHandler, cloner);
buffer = null;
if (isWrittenInThisThread && aggrCompletionHandler != null) {
// Cloner was not invoked: keep the handler object and just empty it for reuse.
aggrCompletionHandler.clear();
} else {
// Cloner was invoked (or there was no handler): drop the reference instead of clearing.
// NOTE(review): presumably the pending write still needs the handler's contents - confirm.
aggrCompletionHandler = null;
}
}
}
/** Clears per-use state and returns this instance to the thread cache. */
private void recycle() {
if (aggrCompletionHandler != null) {
aggrCompletionHandler.clear();
}
ctx = null;
plugFilter = null;
ThreadCache.putToCache(CACHE_IDX, this);
}
/**
 * @return the number of remaining bytes in the pending buffer, or 0 when
 *         unplugged or nothing is pending
 */
private int size() {
return isPlugged && buffer != null ? buffer.remaining() : 0;
}
}
/**
 * {@link CompletionHandler} that fans every notification out to a list of
 * registered handlers, in registration order. Handlers are kept in a
 * growable array so the instance can be emptied and reused.
 */
public static final class AggrCompletionHandler implements CompletionHandler {
    /** Backing storage; only the first {@code sz} slots are in use. */
    private CompletionHandler[] handlers = new CompletionHandler[16];

    /** Number of registered handlers. */
    private int sz;

    /**
     * Registers a handler, growing the backing array (to 1.5x + 1) when it is full.
     *
     * @param handler handler to notify on subsequent events
     */
    public void add(final CompletionHandler handler) {
        if (sz == handlers.length) {
            handlers = Arrays.copyOf(handlers, sz * 3 / 2 + 1);
        }
        handlers[sz] = handler;
        sz++;
    }

    /** Forwards the cancellation to every registered handler. */
    @Override
    public void cancelled() {
        int idx = 0;
        while (idx < sz) {
            handlers[idx].cancelled();
            idx++;
        }
    }

    /**
     * Forwards the failure to every registered handler.
     *
     * @param throwable failure cause passed on unchanged
     */
    @Override
    public void failed(final Throwable throwable) {
        int idx = 0;
        while (idx < sz) {
            handlers[idx].failed(throwable);
            idx++;
        }
    }

    /**
     * Forwards the completion to every registered handler.
     *
     * @param result result object passed on unchanged
     */
    @Override
    @SuppressWarnings("unchecked")
    public void completed(final Object result) {
        int idx = 0;
        while (idx < sz) {
            handlers[idx].completed(result);
            idx++;
        }
    }

    /**
     * Forwards the progress update to every registered handler.
     *
     * @param result result object passed on unchanged
     */
    @Override
    @SuppressWarnings("unchecked")
    public void updated(final Object result) {
        int idx = 0;
        while (idx < sz) {
            handlers[idx].updated(result);
            idx++;
        }
    }

    /** Drops all registered handlers while keeping the backing array. */
    public void clear() {
        Arrays.fill(handlers, 0, sz, null);
        sz = 0;
    }
}
}
}