/*
* Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.incubator.http.internal.common;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
public class AsyncWriteQueue implements Closeable {
private static final int IDLE = 0; // nobody is flushing from the queue
private static final int FLUSHING = 1; // there is the only thread flushing from the queue
private static final int REFLUSHING = 2; // while one thread was flushing from the queue
// the other thread put data into the queue.
// flushing thread should recheck queue before switching to idle state.
private static final int DELAYED = 3; // flushing is delayed
// either by PlainHttpConnection.WriteEvent registration, or
// SSL handshaking
private static final int CLOSED = 4; // queue is closed
private final AtomicInteger state = new AtomicInteger(IDLE);
private final Deque<ByteBufferReference[]> queue = new ConcurrentLinkedDeque<>();
private final BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction;
// Queue may be processed in two modes:
// 1. if(!doFullDrain) - invoke callback on each chunk
// 2. if(doFullDrain) - drain the whole queue, merge all chunks into the single array and invoke callback
private final boolean doFullDrain;
private ByteBufferReference[] delayedElement = null;
public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction) {
this(consumeAction, true);
}
public AsyncWriteQueue(BiConsumer<ByteBufferReference[], AsyncWriteQueue> consumeAction, boolean doFullDrain) {
this.consumeAction = consumeAction;
this.doFullDrain = doFullDrain;
}
public void put(ByteBufferReference[] e) throws IOException {
ensureOpen();
queue.addLast(e);
}
public void putFirst(ByteBufferReference[] e) throws IOException {
ensureOpen();
queue.addFirst(e);
}
retruns true if flushing was performed
Throws: Returns:
/**
* retruns true if flushing was performed
* @return
* @throws IOException
*/
public boolean flush() throws IOException {
while(true) {
switch (state.get()) {
case IDLE:
if(state.compareAndSet(IDLE, FLUSHING)) {
flushLoop();
return true;
}
break;
case FLUSHING:
if(state.compareAndSet(FLUSHING, REFLUSHING)) {
return false;
}
break;
case REFLUSHING:
case DELAYED:
return false;
case CLOSED:
throw new IOException("Queue closed");
}
}
}
/*
* race invocations of flushDelayed are not allowed.
* flushDelayed should be invoked only from:
* - SelectorManager thread
* - Handshaking thread
*/
public void flushDelayed() throws IOException {
ensureOpen();
if(!state.compareAndSet(DELAYED, FLUSHING)) {
ensureOpen(); // if CAS failed when close was set - throw proper exception
throw new RuntimeException("Shouldn't happen");
}
flushLoop();
}
private ByteBufferReference[] drain(ByteBufferReference[] prev) {
assert prev != null;
if(doFullDrain) {
ByteBufferReference[] next = queue.poll();
if(next == null) {
return prev;
}
List<ByteBufferReference> drained = new ArrayList<>();
drained.addAll(Arrays.asList(prev));
drained.addAll(Arrays.asList(next));
while ((next = queue.poll()) != null) {
drained.addAll(Arrays.asList(next));
}
return drained.toArray(new ByteBufferReference[0]);
} else {
return prev;
}
}
private ByteBufferReference[] drain() {
ByteBufferReference[] next = queue.poll();
return next == null ? null : drain(next);
}
private void flushLoop() throws IOException {
ByteBufferReference[] element;
if (delayedElement != null) {
element = drain(delayedElement);
delayedElement = null;
} else {
element = drain();
}
while(true) {
while (element != null) {
consumeAction.accept(element, this);
if (state.get() == DELAYED) {
return;
}
element = drain();
}
switch (state.get()) {
case IDLE:
case DELAYED:
throw new RuntimeException("Shouldn't happen");
case FLUSHING:
if(state.compareAndSet(FLUSHING, IDLE)) {
return;
}
break;
case REFLUSHING:
// We need to check if new elements were put after last poll() and do graceful exit
state.compareAndSet(REFLUSHING, FLUSHING);
break;
case CLOSED:
throw new IOException("Queue closed");
}
element = drain();
}
}
/*
* The methods returns unprocessed chunk of buffers into beginning of the queue.
* Invocation of the method allowed only inside consume callback,
* and consume callback is invoked only when the queue in FLUSHING or REFLUSHING state.
*/
public void setDelayed(ByteBufferReference[] delayedElement) throws IOException {
while(true) {
int state = this.state.get();
switch (state) {
case IDLE:
case DELAYED:
throw new RuntimeException("Shouldn't happen");
case FLUSHING:
case REFLUSHING:
if(this.state.compareAndSet(state, DELAYED)) {
this.delayedElement = delayedElement;
return;
}
break;
case CLOSED:
throw new IOException("Queue closed");
}
}
}
private void ensureOpen() throws IOException {
if (state.get() == CLOSED) {
throw new IOException("Queue closed");
}
}
@Override
public void close() throws IOException {
state.getAndSet(CLOSED);
}
}