package io.undertow.server.protocol.framed;
import static org.xnio.IoUtils.safeClose;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.xnio.Buffers;
import org.xnio.ChannelExceptionHandler;
import org.xnio.ChannelListener;
import org.xnio.ChannelListener.Setter;
import org.xnio.ChannelListeners;
import org.xnio.IoUtils;
import org.xnio.Option;
import org.xnio.OptionMap;
import org.xnio.StreamConnection;
import org.xnio.XnioIoThread;
import org.xnio.XnioWorker;
import org.xnio.channels.CloseableChannel;
import org.xnio.channels.ConnectedChannel;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.StreamSourceChannel;
import org.xnio.channels.SuspendableWriteChannel;
import io.undertow.UndertowLogger;
import io.undertow.UndertowMessages;
import io.undertow.UndertowOptions;
import io.undertow.conduits.IdleTimeoutConduit;
import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.util.ReferenceCountedPooled;
public abstract class AbstractFramedChannel<C extends AbstractFramedChannel<C, R, S>, R extends AbstractFramedStreamSourceChannel<C, R, S>, S extends AbstractFramedStreamSinkChannel<C, R, S>> implements ConnectedChannel {
private final int maxQueuedBuffers;
private final StreamConnection channel;
private final IdleTimeoutConduit idleTimeoutConduit;
private final ChannelListener.SimpleSetter<C> closeSetter;
private final ChannelListener.SimpleSetter<C> receiveSetter;
private final ByteBufferPool bufferPool;
private final FramePriority<C, R, S> framePriority;
private final List<S> pendingFrames = new LinkedList<>();
private final Deque<S> heldFrames = new ArrayDeque<>();
private final Deque<S> newFrames = new LinkedBlockingDeque<>();
private volatile long frameDataRemaining;
private volatile R receiver;
private volatile boolean receivesSuspendedByUser = true;
private volatile boolean receivesSuspendedTooManyQueuedMessages = false;
private volatile boolean receivesSuspendedTooManyBuffers = false;
@SuppressWarnings("unused")
private volatile int readsBroken = 0;
@SuppressWarnings("unused")
private volatile int writesBroken = 0;
private static final AtomicIntegerFieldUpdater<AbstractFramedChannel> readsBrokenUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractFramedChannel.class, "readsBroken");
private static final AtomicIntegerFieldUpdater<AbstractFramedChannel> writesBrokenUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractFramedChannel.class, "writesBroken");
private volatile ReferenceCountedPooled readData = null;
private final List<ChannelListener<C>> closeTasks = new CopyOnWriteArrayList<>();
private volatile boolean flushingSenders = false;
private boolean partialRead = false;
@SuppressWarnings("unused")
private volatile int outstandingBuffers;
private static final AtomicIntegerFieldUpdater<AbstractFramedChannel> outstandingBuffersUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractFramedChannel.class, "outstandingBuffers");
private final LinkedBlockingDeque<Runnable> taskRunQueue = new LinkedBlockingDeque<>();
private final Runnable taskRunQueueRunnable = new Runnable() {
@Override
public void run() {
Runnable runnable;
while ((runnable = taskRunQueue.poll()) != null) {
runnable.run();
}
}
};
private final OptionMap settings;
private volatile boolean requireExplicitFlush = false;
private volatile boolean readChannelDone = false;
private final int queuedFrameHighWaterMark;
private final int queuedFrameLowWaterMark;
private final ReferenceCountedPooled.FreeNotifier freeNotifier = new ReferenceCountedPooled.FreeNotifier() {
@Override
public void freed() {
int res = outstandingBuffersUpdater.decrementAndGet(AbstractFramedChannel.this);
if (!receivesSuspendedByUser && res == maxQueuedBuffers - 1) {
getIoThread().execute(new Runnable() {
@Override
public void run() {
synchronized (AbstractFramedChannel.this) {
if (outstandingBuffersUpdater.get(AbstractFramedChannel.this) < maxQueuedBuffers) {
if (UndertowLogger.REQUEST_IO_LOGGER.isTraceEnabled()) {
UndertowLogger.REQUEST_IO_LOGGER.tracef("Resuming reads on %s as buffers have been consumed", AbstractFramedChannel.this);
}
new UpdateResumeState(null, false, null).run();
}
}
}
});
}
}
};
private static final ChannelListener<AbstractFramedChannel> DRAIN_LISTENER = new ChannelListener<AbstractFramedChannel>() {
@Override
public void handleEvent(AbstractFramedChannel channel) {
try {
AbstractFramedStreamSourceChannel stream = channel.receive();
if(stream != null) {
UndertowLogger.REQUEST_IO_LOGGER.debugf("Draining channel %s as no receive listener has been set", stream);
stream.getReadSetter().set(ChannelListeners.drainListener(Long.MAX_VALUE, null, null));
stream.wakeupReads();
}
} catch (IOException | RuntimeException | Error e) {
IoUtils.safeClose(channel);
}
}
};
protected AbstractFramedChannel(final StreamConnection connectedStreamChannel, ByteBufferPool bufferPool, FramePriority<C, R, S> framePriority, final PooledByteBuffer readData, OptionMap settings) {
this.framePriority = framePriority;
this.maxQueuedBuffers = settings.get(UndertowOptions.MAX_QUEUED_READ_BUFFERS, 10);
this.settings = settings;
if (readData != null) {
if(readData.getBuffer().hasRemaining()) {
this.readData = new ReferenceCountedPooled(readData, 1);
} else {
readData.close();
}
}
if(bufferPool == null) {
throw UndertowMessages.MESSAGES.argumentCannotBeNull("bufferPool");
}
if(connectedStreamChannel == null) {
throw UndertowMessages.MESSAGES.argumentCannotBeNull("connectedStreamChannel");
}
IdleTimeoutConduit idle = createIdleTimeoutChannel(connectedStreamChannel);
connectedStreamChannel.getSourceChannel().setConduit(idle);
connectedStreamChannel.getSinkChannel().setConduit(idle);
this.idleTimeoutConduit = idle;
this.channel = connectedStreamChannel;
this.bufferPool = bufferPool;
closeSetter = new ChannelListener.SimpleSetter<>();
receiveSetter = new ChannelListener.SimpleSetter<>();
channel.getSourceChannel().getReadSetter().set(null);
channel.getSourceChannel().suspendReads();
channel.getSourceChannel().getReadSetter().set(new FrameReadListener());
connectedStreamChannel.getSinkChannel().getWriteSetter().set(new FrameWriteListener());
FrameCloseListener closeListener = new FrameCloseListener();
connectedStreamChannel.getSinkChannel().getCloseSetter().set(closeListener);
connectedStreamChannel.getSourceChannel().getCloseSetter().set(closeListener);
this.queuedFrameHighWaterMark = settings.get(UndertowOptions.QUEUED_FRAMES_HIGH_WATER_MARK, 50);
this.queuedFrameLowWaterMark = settings.get(UndertowOptions.QUEUED_FRAMES_LOW_WATER_MARK, 10);
}
protected IdleTimeoutConduit createIdleTimeoutChannel(StreamConnection connectedStreamChannel) {
return new IdleTimeoutConduit(connectedStreamChannel);
}
void runInIoThread(Runnable task) {
this.taskRunQueue.add(task);
try {
getIoThread().execute(taskRunQueueRunnable);
} catch (RejectedExecutionException e) {
ShutdownFallbackExecutor.execute(taskRunQueueRunnable);
}
}
public ByteBufferPool getBufferPool() {
return bufferPool;
}
@Override
public SocketAddress getLocalAddress() {
return channel.getLocalAddress();
}
@Override
public <A extends SocketAddress> A getLocalAddress(Class<A> type) {
return channel.getLocalAddress(type);
}
@Override
public XnioWorker getWorker() {
return channel.getWorker();
}
@Override
public XnioIoThread getIoThread() {
return channel.getIoThread();
}
@Override
public boolean supportsOption(Option<?> option) {
return channel.supportsOption(option);
}
@Override
public <T> T getOption(Option<T> option) throws IOException {
return channel.getOption(option);
}
@Override
public <T> T setOption(Option<T> option, T value) throws IOException {
return channel.setOption(option, value);
}
@Override
public boolean isOpen() {
return channel.isOpen();
}
@Override
public SocketAddress getPeerAddress() {
return channel.getPeerAddress();
}
@Override
public <A extends SocketAddress> A getPeerAddress(Class<A> type) {
return channel.getPeerAddress(type);
}
public InetSocketAddress getSourceAddress() {
return getPeerAddress(InetSocketAddress.class);
}
public InetSocketAddress getDestinationAddress() {
return getLocalAddress(InetSocketAddress.class);
}
public synchronized R receive() throws IOException {
if (readChannelDone && receiver == null) {
if(readData != null) {
readData.close();
readData = null;
}
channel.getSourceChannel().suspendReads();
channel.getSourceChannel().shutdownReads();
return null;
}
partialRead = false;
boolean requiresReinvoke = false;
int reinvokeDataRemaining = 0;
ReferenceCountedPooled pooled = this.readData;
boolean hasData = false;
if (pooled == null) {
pooled = allocateReferenceCountedBuffer();
if (pooled == null) {
return null;
}
} else if(pooled.isFreed()) {
if(!pooled.tryUnfree()) {
pooled = allocateReferenceCountedBuffer();
if (pooled == null) {
return null;
}
}
pooled.getBuffer().clear();
} else {
hasData = pooled.getBuffer().hasRemaining();
pooled.getBuffer().compact();
}
boolean forceFree = false;
int read = 0;
try {
read = channel.getSourceChannel().read(pooled.getBuffer());
if (read == 0 && !hasData) {
forceFree = true;
return null;
} else if (read == -1 && !hasData) {
forceFree = true;
readChannelDone = true;
lastDataRead();
return null;
} else if(isLastFrameReceived() && frameDataRemaining == 0) {
forceFree = true;
markReadsBroken(new ClosedChannelException());
}
pooled.getBuffer().flip();
if(read == -1) {
requiresReinvoke = true;
reinvokeDataRemaining = pooled.getBuffer().remaining();
}
if (frameDataRemaining > 0) {
if (frameDataRemaining >= pooled.getBuffer().remaining()) {
frameDataRemaining -= pooled.getBuffer().remaining();
if(receiver != null) {
PooledByteBuffer frameData = pooled.createView();
receiver.dataReady(null, frameData);
} else {
pooled.close();
readData = null;
}
if(frameDataRemaining == 0) {
receiver = null;
}
return null;
} else {
PooledByteBuffer frameData = pooled.createView((int) frameDataRemaining);
frameDataRemaining = 0;
if(receiver != null) {
receiver.dataReady(null, frameData);
} else{
frameData.close();
}
receiver = null;
}
return null;
}
FrameHeaderData data = parseFrame(pooled.getBuffer());
if (data != null) {
PooledByteBuffer frameData;
if (data.getFrameLength() >= pooled.getBuffer().remaining()) {
frameDataRemaining = data.getFrameLength() - pooled.getBuffer().remaining();
frameData = pooled.createView();
pooled.getBuffer().position(pooled.getBuffer().limit());
} else {
frameData = pooled.createView((int) data.getFrameLength());
}
AbstractFramedStreamSourceChannel<?, ?, ?> existing = data.getExistingChannel();
if (existing != null) {
if (data.getFrameLength() > frameData.getBuffer().remaining()) {
receiver = (R) existing;
}
existing.dataReady(data, frameData);
if(isLastFrameReceived()) {
handleLastFrame(existing);
}
return null;
} else {
boolean moreData = data.getFrameLength() > frameData.getBuffer().remaining();
R newChannel = createChannel(data, frameData);
if (newChannel != null) {
if (moreData) {
receiver = newChannel;
}
if(isLastFrameReceived()) {
handleLastFrame(newChannel);
}
} else {
frameData.close();
}
return newChannel;
}
} else {
partialRead = true;
}
return null;
} catch (IOException|RuntimeException|Error e) {
markReadsBroken(e);
forceFree = true;
throw e;
}finally {
if (readData != null) {
if (!pooled.getBuffer().hasRemaining() || forceFree) {
if(pooled.getBuffer().capacity() < 1024 || forceFree) {
readData = null;
}
pooled.close();
}
}
if(requiresReinvoke) {
if(readData != null && !readData.isFreed()) {
if(readData.getBuffer().remaining() == reinvokeDataRemaining) {
readData.close();
readData = null;
UndertowLogger.REQUEST_IO_LOGGER.debugf("Partial message read before connection close %s", this);
}
}
channel.getSourceChannel().wakeupReads();
}
}
}
private void handleLastFrame(AbstractFramedStreamSourceChannel newChannel) {
Set<AbstractFramedStreamSourceChannel<C, R, S>> receivers = new HashSet<>(getReceivers());
for(AbstractFramedStreamSourceChannel<C, R, S> r : receivers) {
if(r != newChannel) {
r.markStreamBroken();
}
}
}
private ReferenceCountedPooled allocateReferenceCountedBuffer() {
if(maxQueuedBuffers > 0) {
int expect;
do {
expect = outstandingBuffersUpdater.get(this);
if (expect == maxQueuedBuffers) {
synchronized (this) {
expect = outstandingBuffersUpdater.get(this);
if (expect == maxQueuedBuffers) {
if (UndertowLogger.REQUEST_IO_LOGGER.isTraceEnabled()) {
UndertowLogger.REQUEST_IO_LOGGER.tracef("Suspending reads on %s due to too many outstanding buffers", this);
}
getIoThread().execute(new UpdateResumeState(null, true, null));
return null;
}
}
}
} while (!outstandingBuffersUpdater.compareAndSet(this, expect, expect + 1));
}
PooledByteBuffer buf = bufferPool.allocate();
return this.readData = new ReferenceCountedPooled(buf, 1, maxQueuedBuffers > 0 ? freeNotifier : null);
}
protected void lastDataRead() {
}
protected abstract R (FrameHeaderData frameHeaderData, PooledByteBuffer frameData) throws IOException;
protected abstract FrameHeaderData parseFrame(ByteBuffer data) throws IOException;
protected synchronized void recalculateHeldFrames() throws IOException {
if (!heldFrames.isEmpty()) {
framePriority.frameAdded(null, pendingFrames, heldFrames);
flushSenders();
}
}
protected synchronized void flushSenders() {
if(flushingSenders) {
throw UndertowMessages.MESSAGES.recursiveCallToFlushingSenders();
}
flushingSenders = true;
try {
int toSend = 0;
S frame;
while ((frame = newFrames.poll()) != null) {
frame.preWrite();
if (framePriority.insertFrame(frame, pendingFrames)) {
if (!heldFrames.isEmpty()) {
framePriority.frameAdded(frame, pendingFrames, heldFrames);
}
} else {
heldFrames.add(frame);
}
}
boolean finalFrame = false;
ListIterator<S> it = pendingFrames.listIterator();
while (it.hasNext()) {
S sender = it.next();
if (sender.isReadyForFlush()) {
++toSend;
} else {
break;
}
if (sender.isLastFrame()) {
finalFrame = true;
}
}
if (toSend == 0) {
try {
if(channel.getSinkChannel().flush()) {
channel.getSinkChannel().suspendWrites();
}
} catch (Throwable e) {
safeClose(channel);
markWritesBroken(e);
}
return;
}
ByteBuffer[] data = new ByteBuffer[toSend * 3];
int j = 0;
it = pendingFrames.listIterator();
try {
while (j < toSend) {
S next = it.next();
SendFrameHeader frameHeader = next.getFrameHeader();
PooledByteBuffer frameHeaderByteBuffer = frameHeader.getByteBuffer();
ByteBuffer frameTrailerBuffer = frameHeader.getTrailer();
data[j * 3] = frameHeaderByteBuffer != null
? frameHeaderByteBuffer.getBuffer()
: Buffers.EMPTY_BYTE_BUFFER;
data[(j * 3) + 1] = next.getBuffer() == null ? Buffers.EMPTY_BYTE_BUFFER : next.getBuffer();
data[(j * 3) + 2] = frameTrailerBuffer != null ? frameTrailerBuffer : Buffers.EMPTY_BYTE_BUFFER;
++j;
}
long toWrite = Buffers.remaining(data);
long res;
do {
res = channel.getSinkChannel().write(data);
toWrite -= res;
} while (res > 0 && toWrite > 0);
int max = toSend;
while (max > 0) {
S sinkChannel = pendingFrames.get(0);
PooledByteBuffer frameHeaderByteBuffer = sinkChannel.getFrameHeader().getByteBuffer();
ByteBuffer frameTrailerBuffer = sinkChannel.getFrameHeader().getTrailer();
if (frameHeaderByteBuffer != null && frameHeaderByteBuffer.getBuffer().hasRemaining()
|| sinkChannel.getBuffer() != null && sinkChannel.getBuffer().hasRemaining()
|| frameTrailerBuffer != null && frameTrailerBuffer.hasRemaining()) {
break;
}
sinkChannel.flushComplete();
pendingFrames.remove(sinkChannel);
max--;
}
if (!pendingFrames.isEmpty() || !channel.getSinkChannel().flush()) {
channel.getSinkChannel().resumeWrites();
} else {
channel.getSinkChannel().suspendWrites();
}
if (pendingFrames.isEmpty() && finalFrame) {
channel.getSinkChannel().shutdownWrites();
if (!channel.getSinkChannel().flush()) {
channel.getSinkChannel().setWriteListener(ChannelListeners.flushingChannelListener(null, null));
channel.getSinkChannel().resumeWrites();
}
} else if (pendingFrames.size() > queuedFrameHighWaterMark) {
new UpdateResumeState(null, null, true).run();
} else if (receivesSuspendedTooManyQueuedMessages && pendingFrames.size() < queuedFrameLowWaterMark) {
new UpdateResumeState(null, null, false).run();
}
} catch (IOException|RuntimeException|Error e) {
safeClose(channel);
markWritesBroken(e);
}
} finally {
flushingSenders = false;
if(!newFrames.isEmpty()) {
runInIoThread(new Runnable() {
@Override
public void run() {
flushSenders();
}
});
}
}
}
void awaitWritable() throws IOException {
this.channel.getSinkChannel().awaitWritable();
}
void awaitWritable(long time, TimeUnit unit) throws IOException {
this.channel.getSinkChannel().awaitWritable(time, unit);
}
protected void queueFrame(final S channel) throws IOException {
assert !newFrames.contains(channel);
if (isWritesBroken() || !this.channel.getSinkChannel().isOpen() || channel.isBroken() || !channel.isOpen()) {
IoUtils.safeClose(channel);
throw UndertowMessages.MESSAGES.channelIsClosed();
}
newFrames.add(channel);
if (!requireExplicitFlush || channel.isBufferFull()) {
flush();
}
}
public void flush() {
if (!flushingSenders) {
if(channel.getIoThread() == Thread.currentThread()) {
flushSenders();
} else {
runInIoThread(new Runnable() {
@Override
public void run() {
flushSenders();
}
});
}
}
}
protected abstract boolean isLastFrameReceived();
protected abstract boolean isLastFrameSent();
protected abstract void handleBrokenSourceChannel(Throwable e);
protected abstract void handleBrokenSinkChannel(Throwable e);
public Setter<C> getReceiveSetter() {
return receiveSetter;
}
public synchronized void suspendReceives() {
receivesSuspendedByUser = true;
getIoThread().execute(new UpdateResumeState(true, null, null));
}
public synchronized void resumeReceives() {
receivesSuspendedByUser = false;
getIoThread().execute(new UpdateResumeState(false, null, null));
}
private void doResume() {
if (readData != null && !readData.isFreed()) {
channel.getSourceChannel().wakeupReads();
} else {
channel.getSourceChannel().resumeReads();
}
}
public boolean isReceivesResumed() {
return !receivesSuspendedByUser;
}
@Override
public void close() throws IOException {
if (UndertowLogger.REQUEST_IO_LOGGER.isTraceEnabled()) {
UndertowLogger.REQUEST_IO_LOGGER.tracef(new ClosedChannelException(), "Channel %s is being closed", this);
}
safeClose(channel);
if (readData != null) {
readData.close();
readData = null;
}
closeSubChannels();
}
@Override
public Setter<? extends AbstractFramedChannel> getCloseSetter() {
return closeSetter;
}
@SuppressWarnings({"unchecked", "rawtypes"})
protected void markReadsBroken(Throwable cause) {
if (readsBrokenUpdater.compareAndSet(this, 0, 1)) {
if(UndertowLogger.REQUEST_IO_LOGGER.isDebugEnabled()) {
UndertowLogger.REQUEST_IO_LOGGER.debugf(new ClosedChannelException(), "Marking reads broken on channel %s", this);
}
if(receiver != null) {
receiver.markStreamBroken();
}
for(AbstractFramedStreamSourceChannel<C, R, S> r : new ArrayList<>(getReceivers())) {
r.markStreamBroken();
}
handleBrokenSourceChannel(cause);
safeClose(channel.getSourceChannel());
closeSubChannels();
}
}
protected abstract void closeSubChannels();
@SuppressWarnings({"unchecked", "rawtypes"})
protected void markWritesBroken(Throwable cause) {
if (writesBrokenUpdater.compareAndSet(this, 0, 1)) {
if(UndertowLogger.REQUEST_IO_LOGGER.isDebugEnabled()) {
UndertowLogger.REQUEST_IO_LOGGER.debugf(new ClosedChannelException(), "Marking writes broken on channel %s", this);
}
handleBrokenSinkChannel(cause);
safeClose(channel.getSinkChannel());
synchronized (this) {
for (final S channel : pendingFrames) {
channel.markBroken();
}
pendingFrames.clear();
for (final S channel : newFrames) {
channel.markBroken();
}
newFrames.clear();
for (final S channel : heldFrames) {
channel.markBroken();
}
heldFrames.clear();
}
}
}
protected boolean isWritesBroken() {
return writesBrokenUpdater.get(this) != 0;
}
protected boolean isReadsBroken() {
return readsBrokenUpdater.get(this) != 0;
}
void resumeWrites() {
channel.getSinkChannel().resumeWrites();
}
void suspendWrites() {
channel.getSinkChannel().suspendWrites();
}
void wakeupWrites() {
channel.getSinkChannel().wakeupWrites();
}
StreamSourceChannel getSourceChannel() {
return channel.getSourceChannel();
}
void notifyFrameReadComplete(AbstractFramedStreamSourceChannel<C, R, S> channel) {
}
private final class FrameReadListener implements ChannelListener<StreamSourceChannel> {
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public void handleEvent(final StreamSourceChannel channel) {
Runnable runnable;
while ((runnable = taskRunQueue.poll()) != null) {
runnable.run();
}
final R receiver = AbstractFramedChannel.this.receiver;
if ((readChannelDone || isReadsSuspended()) && receiver == null) {
channel.suspendReads();
return;
} else {
ChannelListener listener = receiveSetter.get();
if (listener == null) {
listener = DRAIN_LISTENER;
}
UndertowLogger.REQUEST_IO_LOGGER.tracef("Invoking receive listener", receiver);
ChannelListeners.invokeChannelListener(AbstractFramedChannel.this, listener);
}
final boolean partialRead;
synchronized (AbstractFramedChannel.this) {
partialRead = AbstractFramedChannel.this.partialRead;
}
if (readData != null && !readData.isFreed() && channel.isOpen() && !partialRead) {
try {
runInIoThread(new Runnable() {
@Override
public void run() {
ChannelListeners.invokeChannelListener(channel, FrameReadListener.this);
}
});
} catch (RejectedExecutionException e) {
IoUtils.safeClose(AbstractFramedChannel.this);
}
}
synchronized (AbstractFramedChannel.this) {
AbstractFramedChannel.this.partialRead = false;
}
}
}
private boolean isReadsSuspended() {
return receivesSuspendedByUser || receivesSuspendedTooManyBuffers || receivesSuspendedTooManyQueuedMessages;
}
private class FrameWriteListener implements ChannelListener<StreamSinkChannel> {
@Override
public void handleEvent(final StreamSinkChannel channel) {
flushSenders();
}
}
private class FrameCloseListener implements ChannelListener<CloseableChannel> {
private boolean sinkClosed;
private boolean sourceClosed;
@Override
public void handleEvent(final CloseableChannel c) {
if (Thread.currentThread() != c.getIoThread() && !c.getWorker().isShutdown()) {
runInIoThread(new Runnable() {
@Override
public void run() {
ChannelListeners.invokeChannelListener(c, FrameCloseListener.this);
}
});
return;
}
if(c instanceof StreamSinkChannel) {
sinkClosed = true;
} else if(c instanceof StreamSourceChannel) {
sourceClosed = true;
}
if(!sourceClosed || !sinkClosed) {
return;
} else if(readData != null && !readData.isFreed()) {
runInIoThread(new Runnable() {
@Override
public void run() {
while (readData != null && !readData.isFreed()) {
int rem = readData.getBuffer().remaining();
ChannelListener listener = receiveSetter.get();
if(listener == null) {
listener = DRAIN_LISTENER;
}
ChannelListeners.invokeChannelListener(AbstractFramedChannel.this, listener);
if(!AbstractFramedChannel.this.isOpen()) {
break;
}
if (readData != null && rem == readData.getBuffer().remaining()) {
break;
}
}
handleEvent(c);
}
});
return;
}
R receiver = AbstractFramedChannel.this.receiver;
try {
if (receiver != null && receiver.isOpen() && receiver.isReadResumed()) {
ChannelListeners.invokeChannelListener(receiver, ((SimpleSetter) receiver.getReadSetter()).get());
}
final List<S> pendingFrames;
final List<S> newFrames;
final List<S> heldFrames;
final List<AbstractFramedStreamSourceChannel<C, R, S>> receivers;
synchronized (AbstractFramedChannel.this) {
pendingFrames = new ArrayList<>(AbstractFramedChannel.this.pendingFrames);
newFrames = new ArrayList<>(AbstractFramedChannel.this.newFrames);
heldFrames = new ArrayList<>(AbstractFramedChannel.this.heldFrames);
receivers = new ArrayList<>(getReceivers());
}
for (final S channel : pendingFrames) {
channel.markBroken();
}
for (final S channel : newFrames) {
channel.markBroken();
}
for (final S channel : heldFrames) {
channel.markBroken();
}
for (AbstractFramedStreamSourceChannel<C, R, S> r : receivers) {
IoUtils.safeClose(r);
}
} finally {
try {
for (ChannelListener<C> task : closeTasks) {
ChannelListeners.invokeChannelListener((C) AbstractFramedChannel.this, task);
}
} finally {
synchronized (AbstractFramedChannel.this) {
closeSubChannels();
if (readData != null) {
readData.close();
readData = null;
}
}
ChannelListeners.invokeChannelListener((C) AbstractFramedChannel.this, closeSetter.get());
}
}
}
}
protected abstract Collection<AbstractFramedStreamSourceChannel<C, R, S>> getReceivers();
public void setIdleTimeout(long timeout) {
idleTimeoutConduit.setIdleTimeout(timeout);
}
public long getIdleTimeout() {
return idleTimeoutConduit.getIdleTimeout();
}
protected FramePriority<C, R, S> getFramePriority() {
return framePriority;
}
public void addCloseTask(final ChannelListener<C> task) {
closeTasks.add(task);
}
@Override
public String toString() {
return getClass().getSimpleName() + " peer " + channel.getPeerAddress() + " local " + channel.getLocalAddress() + "[ " + (receiver == null ? "No Receiver" : receiver.toString()) + " " + pendingFrames.toString() + " -- " + heldFrames.toString() + " -- " + newFrames.toString() + "]";
}
protected StreamConnection getUnderlyingConnection() {
return channel;
}
protected ChannelExceptionHandler<SuspendableWriteChannel> writeExceptionHandler() {
return new ChannelExceptionHandler<SuspendableWriteChannel>() {
@Override
public void handleException(SuspendableWriteChannel channel, IOException exception) {
markWritesBroken(exception);
}
};
}
public boolean isRequireExplicitFlush() {
return requireExplicitFlush;
}
public void setRequireExplicitFlush(boolean requireExplicitFlush) {
this.requireExplicitFlush = requireExplicitFlush;
}
protected OptionMap getSettings() {
return settings;
}
private class UpdateResumeState implements Runnable {
private final Boolean user;
private final Boolean buffers;
private final Boolean frames;
private UpdateResumeState(Boolean user, Boolean buffers, Boolean frames) {
this.user = user;
this.buffers = buffers;
this.frames = frames;
}
@Override
public void run() {
if (user != null) {
receivesSuspendedByUser = user;
}
if (buffers != null) {
receivesSuspendedTooManyBuffers = buffers;
}
if (frames != null) {
receivesSuspendedTooManyQueuedMessages = frames;
}
if (receivesSuspendedByUser || receivesSuspendedTooManyQueuedMessages || receivesSuspendedTooManyBuffers) {
channel.getSourceChannel().suspendReads();
} else {
doResume();
}
}
}
}