package org.xnio.ssl;
import static java.lang.Math.max;
import static java.lang.Thread.currentThread;
import static org.xnio.Bits.allAreClear;
import static org.xnio.Bits.allAreSet;
import static org.xnio.Bits.anyAreClear;
import static org.xnio.Bits.anyAreSet;
import static org.xnio._private.Messages.msg;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
import org.xnio.Buffers;
import org.xnio.Pool;
import org.xnio.Pooled;
import org.xnio.XnioIoThread;
import org.xnio.XnioWorker;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.StreamSourceChannel;
import org.xnio.conduits.ConduitReadableByteChannel;
import org.xnio.conduits.ConduitWritableByteChannel;
import org.xnio.conduits.Conduits;
import org.xnio.conduits.ReadReadyHandler;
import org.xnio.conduits.StreamSinkConduit;
import org.xnio.conduits.StreamSourceConduit;
import org.xnio.conduits.WriteReadyHandler;
final class JsseStreamConduit implements StreamSourceConduit, StreamSinkConduit, Runnable {
/** Set from the {@code org.xnio.ssl.TRACE_SSL} system property; when {@code true} the I/O paths emit trace messages via {@code msg}. */
private static final boolean TRACE_SSL = Boolean.getBoolean("org.xnio.ssl.TRACE_SSL");
/** The owning connection; it supplies the I/O thread (and through it the worker) reported by this conduit. */
private final JsseSslConnection connection;
/** The SSL engine used to begin handshakes, wrap outbound data and close the outbound side. */
private final SSLEngine engine;
/** The underlying (wrapped) source conduit. */
private final StreamSourceConduit sourceConduit;
/** The underlying (wrapped) sink conduit. */
private final StreamSinkConduit sinkConduit;
/** Buffer taken from the socket buffer pool; the constructor leaves it empty (limit 0). */
private final Pooled<ByteBuffer> receiveBuffer;
/** Buffer taken from the socket buffer pool; the engine wraps into it and its content is written to the sink conduit. */
private final Pooled<ByteBuffer> sendBuffer;
/** Buffer taken from the application buffer pool; holds unwrapped data that is copied to the caller's buffers on read. */
private final Pooled<ByteBuffer> readBuffer;
/** Bit set built from the {@code FLAG_*}, {@code READ_FLAG_*} and {@code WRITE_FLAG_*} constants. */
private int state = FLAG_INLINE_TASKS;
/** Count that the await methods wait (on this object's monitor) to become zero; presumably the number of outstanding engine tasks — confirm where it is updated. */
private int tasks;
/** User-installed read-ready handler; may be {@code null}. */
private ReadReadyHandler readReadyHandler;
/** User-installed write-ready handler; may be {@code null}. */
private WriteReadyHandler writeReadyHandler;
/**
 * Construct a new instance, allocating the three working buffers and installing this conduit's
 * internal ready handlers on the underlying source and sink conduits.
 *
 * @param connection the owning connection
 * @param engine the SSL engine to use
 * @param sourceConduit the underlying source conduit
 * @param sinkConduit the underlying sink conduit
 * @param socketBufferPool the pool that supplies the receive and send buffers
 * @param applicationBufferPool the pool that supplies the unwrapped-data (read) buffer
 * @throws IllegalArgumentException if the source read thread and the sink write thread differ
 */
JsseStreamConduit(final JsseSslConnection connection, final SSLEngine engine, final StreamSourceConduit sourceConduit, final StreamSinkConduit sinkConduit, final Pool<ByteBuffer> socketBufferPool, final Pool<ByteBuffer> applicationBufferPool) {
    // Validate the arguments before any buffer is taken from a pool: once the allocation
    // section below has succeeded nothing frees the buffers again, so a failure after it
    // would leak all three of them.
    if (sourceConduit.getReadThread() != sinkConduit.getWriteThread()) {
        throw new IllegalArgumentException("Source and sink thread mismatch");
    }
    Pooled<ByteBuffer> receiveBuffer;
    Pooled<ByteBuffer> sendBuffer;
    Pooled<ByteBuffer> readBuffer;
    // stays false until every buffer has been allocated and size-checked
    boolean ok = false;
    final SSLSession session = engine.getSession();
    final int packetBufferSize = session.getPacketBufferSize();
    receiveBuffer = socketBufferPool.allocate();
    try {
        receiveBuffer.getResource().flip();
        sendBuffer = socketBufferPool.allocate();
        try {
            if (receiveBuffer.getResource().capacity() < packetBufferSize || sendBuffer.getResource().capacity() < packetBufferSize) {
                throw msg.socketBufferTooSmall();
            }
            final int applicationBufferSize = session.getApplicationBufferSize();
            readBuffer = applicationBufferPool.allocate();
            try {
                if (readBuffer.getResource().capacity() < applicationBufferSize) {
                    throw msg.appBufferTooSmall();
                }
                ok = true;
            } finally {
                if (! ok) readBuffer.free();
            }
        } finally {
            if (! ok) sendBuffer.free();
        }
    } finally {
        if (! ok) receiveBuffer.free();
    }
    this.receiveBuffer = receiveBuffer;
    this.sendBuffer = sendBuffer;
    this.readBuffer = readBuffer;
    // the receive buffer starts out empty
    receiveBuffer.getResource().clear().limit(0);
    this.connection = connection;
    this.engine = engine;
    this.sourceConduit = sourceConduit;
    this.sinkConduit = sinkConduit;
    sourceConduit.setReadReadyHandler(readReady);
    sinkConduit.setWriteReadyHandler(writeReady);
}
// ---- general flags (top five bits of the 22-bit layout) ----
// TLS has been started on this conduit (set by beginHandshake()); while clear, I/O is passed straight through.
private static final int FLAG_TLS = 0b00001_000000000_00000000;
// Initial value of the state field; presumably selects in-line execution of engine tasks — confirm.
private static final int FLAG_INLINE_TASKS = 0b00010_000000000_00000000;
// This conduit has been submitted to the I/O thread as a Runnable and has not run yet.
private static final int FLAG_TASK_QUEUED = 0b00100_000000000_00000000;
// Engine work is outstanding; the await methods wait for the task count to reach zero before clearing it.
private static final int FLAG_NEED_ENGINE_TASK = 0b01000_000000000_00000000;
// The sink conduit still has to be flushed.
private static final int FLAG_FLUSH_NEEDED = 0b10000_000000000_00000000;
// ---- read-side flags (low eight bits) ----
// Reads have been shut down; read() returns -1.
private static final int READ_FLAG_SHUTDOWN = 0b00000_000000000_00000001;
// End of input was reached; only data left in the read buffer can still be returned.
private static final int READ_FLAG_EOF = 0b00000_000000000_00000010;
// The user has resumed reads on this conduit.
private static final int READ_FLAG_RESUMED = 0b00000_000000000_00000100;
// Reads are resumed on the underlying source conduit.
private static final int READ_FLAG_UP_RESUMED = 0b00000_000000000_00001000;
// A read wakeup was requested; consumed by run().
private static final int READ_FLAG_WAKEUP = 0b00000_000000000_00010000;
// The read side is considered ready.
private static final int READ_FLAG_READY = 0b00000_000000000_00100000;
// Read progress depends on the sink becoming writable.
private static final int READ_FLAG_NEEDS_WRITE = 0b00000_000000000_01000000;
// ---- write-side flags (middle nine bits) ----
// Writes have been shut down; further write calls throw ClosedChannelException.
private static final int WRITE_FLAG_SHUTDOWN = 0b00000_000000001_00000000;
// Second shutdown stage; set during a flush after shutdown.
private static final int WRITE_FLAG_SHUTDOWN2 = 0b00000_000000010_00000000;
// Third shutdown stage; once set, flush() only needs to flush the sink conduit.
private static final int WRITE_FLAG_SHUTDOWN3 = 0b00000_000000100_00000000;
// The write side is completely finished; flush() returns true immediately.
private static final int WRITE_FLAG_FINISHED = 0b00000_000001000_00000000;
// The user has resumed writes on this conduit.
private static final int WRITE_FLAG_RESUMED = 0b00000_000010000_00000000;
// Writes are resumed on the underlying sink conduit.
private static final int WRITE_FLAG_UP_RESUMED = 0b00000_000100000_00000000;
// A write wakeup was requested; consumed by run().
private static final int WRITE_FLAG_WAKEUP = 0b00000_001000000_00000000;
// The write side is considered ready.
private static final int WRITE_FLAG_READY = 0b00000_010000000_00000000;
// Write progress depends on the source becoming readable.
private static final int WRITE_FLAG_NEEDS_READ = 0b00000_100000000_00000000;
/**
 * Render the flags currently set in the state word as three lines of text
 * (general, read and write flags), each terminated by a newline.
 *
 * @return the textual status
 */
public String getStatus() {
    final int snapshot = this.state;
    final int[] generalBits = { FLAG_TLS, FLAG_INLINE_TASKS, FLAG_TASK_QUEUED, FLAG_NEED_ENGINE_TASK, FLAG_FLUSH_NEEDED };
    final String[] generalNames = { " TLS", " INLINE_TASKS", " TASK_QUEUED", " NEED_ENGINE_TASK", " FLUSH_NEEDED" };
    final int[] readBits = { READ_FLAG_SHUTDOWN, READ_FLAG_EOF, READ_FLAG_RESUMED, READ_FLAG_UP_RESUMED, READ_FLAG_WAKEUP, READ_FLAG_READY, READ_FLAG_NEEDS_WRITE };
    final String[] readNames = { " SHUTDOWN", " EOF", " RESUMED", " UP_RESUMED", " WAKEUP", " READY", " NEEDS_WRITE" };
    final int[] writeBits = { WRITE_FLAG_SHUTDOWN, WRITE_FLAG_SHUTDOWN2, WRITE_FLAG_SHUTDOWN3, WRITE_FLAG_FINISHED, WRITE_FLAG_RESUMED, WRITE_FLAG_UP_RESUMED, WRITE_FLAG_WAKEUP, WRITE_FLAG_READY, WRITE_FLAG_NEEDS_READ };
    final String[] writeNames = { " SHUTDOWN", " SHUTDOWN2", " SHUTDOWN3", " FINISHED", " RESUMED", " UP_RESUMED", " WAKEUP", " READY", " NEEDS_READ" };

    final StringBuilder text = new StringBuilder();
    text.append("General flags:");
    for (int i = 0; i < generalBits.length; i++) {
        if (allAreSet(snapshot, generalBits[i])) {
            text.append(generalNames[i]);
        }
    }
    text.append("\nRead flags:");
    for (int i = 0; i < readBits.length; i++) {
        if (allAreSet(snapshot, readBits[i])) {
            text.append(readNames[i]);
        }
    }
    text.append("\nWrite flags:");
    for (int i = 0; i < writeBits.length; i++) {
        if (allAreSet(snapshot, writeBits[i])) {
            text.append(writeNames[i]);
        }
    }
    text.append('\n');
    return text.toString();
}
/**
 * Describe this conduit: the owning connection followed by the decoded status flags.
 *
 * @return the description
 */
public String toString() {
    final Object owner = connection;
    final String status = getStatus();
    return String.format("JSSE Stream Conduit for %s, status:%n%s", owner, status);
}
/**
 * Get the worker of the connection's I/O thread.
 *
 * @return the worker
 */
public XnioWorker getWorker() {
return connection.getIoThread().getWorker();
}
/**
 * Get the read thread, which is the connection's I/O thread.
 *
 * @return the I/O thread
 */
public XnioIoThread getReadThread() {
return connection.getIoThread();
}
/**
 * Get the write thread, which is the connection's I/O thread (the same thread as the read thread).
 *
 * @return the I/O thread
 */
public XnioIoThread getWriteThread() {
return connection.getIoThread();
}
/**
 * Handler installed on the underlying sink conduit. Termination events mark the write side
 * as fully shut down and are then forwarded to the user handler (if any); write readiness is
 * routed to {@link #writeReady()}.
 */
private final WriteReadyHandler writeReady = new WriteReadyHandler() {
    @Override
    public void forceTermination() {
        if (anyAreClear(state, WRITE_FLAG_FINISHED)) {
            final int finishedBits = WRITE_FLAG_SHUTDOWN | WRITE_FLAG_SHUTDOWN2 | WRITE_FLAG_SHUTDOWN3 | WRITE_FLAG_FINISHED;
            state |= finishedBits;
        }
        final WriteReadyHandler delegate = JsseStreamConduit.this.writeReadyHandler;
        if (delegate == null) {
            return;
        }
        try {
            delegate.forceTermination();
        } catch (Throwable ignored) {
            // a failing user handler must not disturb termination
        }
    }

    @Override
    public void terminated() {
        if (anyAreClear(state, WRITE_FLAG_FINISHED)) {
            final int finishedBits = WRITE_FLAG_SHUTDOWN | WRITE_FLAG_SHUTDOWN2 | WRITE_FLAG_SHUTDOWN3 | WRITE_FLAG_FINISHED;
            state |= finishedBits;
        }
        final WriteReadyHandler delegate = JsseStreamConduit.this.writeReadyHandler;
        if (delegate == null) {
            return;
        }
        try {
            delegate.terminated();
        } catch (Throwable ignored) {
            // a failing user handler must not disturb termination
        }
    }

    @Override
    public void writeReady() {
        JsseStreamConduit.this.writeReady();
    }
};
/**
 * Handler installed on the underlying source conduit. Termination events mark the read side
 * as shut down and are then forwarded to the user handler (if any); read readiness is routed
 * to {@link #readReady()}.
 */
private final ReadReadyHandler readReady = new ReadReadyHandler() {
    @Override
    public void forceTermination() {
        if (anyAreClear(state, READ_FLAG_SHUTDOWN)) {
            state |= READ_FLAG_SHUTDOWN;
        }
        final ReadReadyHandler delegate = JsseStreamConduit.this.readReadyHandler;
        if (delegate == null) {
            return;
        }
        try {
            delegate.forceTermination();
        } catch (Throwable ignored) {
            // a failing user handler must not disturb termination
        }
    }

    @Override
    public void terminated() {
        if (anyAreClear(state, READ_FLAG_SHUTDOWN)) {
            state |= READ_FLAG_SHUTDOWN;
        }
        final ReadReadyHandler delegate = JsseStreamConduit.this.readReadyHandler;
        if (delegate == null) {
            return;
        }
        try {
            delegate.terminated();
        } catch (Throwable ignored) {
            // a failing user handler must not disturb termination
        }
    }

    @Override
    public void readReady() {
        JsseStreamConduit.this.readReady();
    }
};
/**
 * Switch the conduit into TLS mode (if it is not already) and start a handshake on the engine.
 *
 * @throws ClosedChannelException if end of input was already seen or writes are shut down
 * @throws IOException if the engine fails to begin the handshake
 */
void beginHandshake() throws IOException {
    final int current = this.state;
    final boolean closed = anyAreSet(current, READ_FLAG_EOF | WRITE_FLAG_SHUTDOWN);
    if (closed) {
        throw new ClosedChannelException();
    }
    final boolean plain = allAreClear(current, FLAG_TLS);
    if (plain) {
        this.state = current | FLAG_TLS;
    }
    engine.beginHandshake();
}
/**
 * Get the engine's session.
 *
 * @return the session, or {@code null} while TLS has not been started on this conduit
 */
SSLSession getSslSession() {
    if (allAreSet(state, FLAG_TLS)) {
        return engine.getSession();
    }
    return null;
}
/**
 * Get the SSL engine used by this conduit.
 *
 * @return the engine
 */
SSLEngine getEngine() {
return engine;
}
/**
 * Determine whether TLS has been started on this conduit.
 *
 * @return {@code true} if the TLS flag is set
 */
boolean isTls() {
return allAreSet(state, FLAG_TLS);
}
/**
 * Release the three pooled buffers and mark both directions as terminated.
 *
 * @return {@code true} if at least one direction was not yet marked terminated before this call,
 *         {@code false} if reads were already shut down and writes already finished
 */
boolean markTerminated() {
    readBuffer.free();
    receiveBuffer.free();
    sendBuffer.free();
    final boolean wasOpen = anyAreClear(state, READ_FLAG_SHUTDOWN | WRITE_FLAG_FINISHED);
    if (wasOpen) {
        state |= READ_FLAG_SHUTDOWN | WRITE_FLAG_SHUTDOWN | WRITE_FLAG_SHUTDOWN2 | WRITE_FLAG_SHUTDOWN3 | WRITE_FLAG_FINISHED;
    }
    return wasOpen;
}
/**
 * Dispatch pending readiness notifications to the user handlers: first the write side, then
 * the read side. Invoked directly from {@link #writeReady()} / {@link #readReady()} or as a
 * task queued on the I/O thread. Works on a local copy of the state word; the copy is written
 * back before each handler call and in the final {@code finally} block.
 */
public void run() {
assert currentThread() == getWriteThread();
int state = JsseStreamConduit.this.state;
final boolean flagTaskQueued = allAreSet(state, FLAG_TASK_QUEUED);
// 'modify' records that the local state differs from the field and must be written back
boolean modify = flagTaskQueued;
boolean queueTask = false;
// this invocation consumes the queued task, if there was one
state &= ~FLAG_TASK_QUEUED;
try {
if (allAreSet(state, FLAG_NEED_ENGINE_TASK)) {
// pending engine tasks are not handled here
throw new UnsupportedOperationException();
}
// ---- write side: a wakeup was requested, or writes are resumed and ready ----
if (anyAreSet(state, WRITE_FLAG_WAKEUP) || allAreSet(state, WRITE_FLAG_RESUMED | WRITE_FLAG_READY)) {
final WriteReadyHandler writeReadyHandler = JsseStreamConduit.this.writeReadyHandler;
if (allAreSet(state, WRITE_FLAG_WAKEUP)) {
// a wakeup implies resumption
state = state & ~WRITE_FLAG_WAKEUP | WRITE_FLAG_RESUMED;
modify = true;
}
if (writeReadyHandler != null) {
if (allAreSet(state, WRITE_FLAG_RESUMED)) {
try {
if (modify) {
// publish the state before calling user code, which may inspect or change it
modify = false;
JsseStreamConduit.this.state = state;
}
writeReadyHandler.writeReady();
} catch (Throwable ignored) {
} finally {
// the handler may have changed the state; re-read it
state = JsseStreamConduit.this.state & ~FLAG_TASK_QUEUED;
modify = true;
}
if (allAreSet(state, WRITE_FLAG_RESUMED)) {
// still resumed: make sure the relevant underlying conduit will notify us
if (!allAreSet(state, WRITE_FLAG_READY) && allAreSet(state, WRITE_FLAG_NEEDS_READ) && allAreClear(state, READ_FLAG_UP_RESUMED)) {
state |= READ_FLAG_UP_RESUMED;
modify = true;
sourceConduit.resumeReads();
} else if (allAreClear(state, WRITE_FLAG_UP_RESUMED)) {
sinkConduit.resumeWrites();
}
}
} else {
// not resumed: drop upstream write interest unless the read side needs it
if (allAreClear(state, READ_FLAG_NEEDS_WRITE | READ_FLAG_RESUMED) && allAreSet(state, WRITE_FLAG_UP_RESUMED)) {
state &= ~WRITE_FLAG_UP_RESUMED;
modify = true;
// NOTE(review): this is the conduit's own suspendWrites(), which reads and writes the
// state field while a newer local copy is held here and written back in the finally block — confirm intended
suspendWrites();
}
}
} else {
// no handler is installed, so writes cannot stay resumed
state &= ~WRITE_FLAG_RESUMED;
modify = true;
if (allAreClear(state, READ_FLAG_NEEDS_WRITE | READ_FLAG_RESUMED) && allAreSet(state, WRITE_FLAG_UP_RESUMED)) {
state &= ~WRITE_FLAG_UP_RESUMED;
modify = true;
suspendWrites();
}
}
}
// ---- read side: a wakeup was requested, or reads are resumed and ready ----
if (anyAreSet(state, READ_FLAG_WAKEUP) || allAreSet(state, READ_FLAG_RESUMED | READ_FLAG_READY)) {
final ReadReadyHandler readReadyHandler = JsseStreamConduit.this.readReadyHandler;
if (allAreSet(state, READ_FLAG_WAKEUP)) {
// a wakeup implies resumption
state = state & ~READ_FLAG_WAKEUP | READ_FLAG_RESUMED;
modify = true;
}
if (readReadyHandler != null) {
if (allAreSet(state, READ_FLAG_RESUMED)) {
try {
if (modify) {
// publish the state before calling user code, which may inspect or change it
modify = false;
JsseStreamConduit.this.state = state;
}
readReadyHandler.readReady();
} catch (Throwable ignored) {
} finally {
// the handler may have changed the state; re-read it
state = JsseStreamConduit.this.state & ~FLAG_TASK_QUEUED;
modify = true;
}
if (allAreSet(state, READ_FLAG_RESUMED)) {
if (allAreSet(state, READ_FLAG_READY)) {
// still ready: schedule another pass, unless this pass was itself a queued task
if (!flagTaskQueued) {
state |= FLAG_TASK_QUEUED;
modify = queueTask = true;
}
} else if (allAreSet(state, READ_FLAG_NEEDS_WRITE) && allAreClear(state, WRITE_FLAG_UP_RESUMED)) {
state |= WRITE_FLAG_UP_RESUMED;
modify = true;
sinkConduit.resumeWrites();
} else if (allAreClear(state, READ_FLAG_UP_RESUMED)) {
sourceConduit.resumeReads();
}
}
} else {
// not resumed: drop upstream read interest unless the write side needs it
if (allAreClear(state, WRITE_FLAG_NEEDS_READ | WRITE_FLAG_RESUMED) && allAreSet(state, READ_FLAG_UP_RESUMED)) {
state &= ~READ_FLAG_UP_RESUMED;
modify = true;
suspendReads();
}
}
} else {
// no handler is installed, so reads cannot stay resumed
state &= ~READ_FLAG_RESUMED;
modify = true;
if (allAreClear(state, WRITE_FLAG_NEEDS_READ | WRITE_FLAG_RESUMED) && allAreSet(state, READ_FLAG_UP_RESUMED)) {
state &= ~READ_FLAG_UP_RESUMED;
suspendReads();
}
}
}
} finally {
if (modify) {
// write the local state back and, if requested, schedule the follow-up pass
JsseStreamConduit.this.state = state;
if (queueTask) getReadThread().execute(this);
}
}
}
/**
 * Install the user's write-ready handler, replacing any previous one.
 *
 * @param handler the handler; the dispatch code tolerates {@code null}
 */
public void setWriteReadyHandler(final WriteReadyHandler handler) {
this.writeReadyHandler = handler;
}
/**
 * Install the user's read-ready handler, replacing any previous one.
 *
 * @param handler the handler; the dispatch code tolerates {@code null}
 */
public void setReadReadyHandler(final ReadReadyHandler handler) {
this.readReadyHandler = handler;
}
/**
 * Called when the underlying sink conduit reports write readiness. Marks the write side ready
 * (and the read side too when it is waiting for a write), dispatches via {@link #run()} unless
 * a task is already queued, and finally suspends any underlying direction nobody is interested in.
 */
public void writeReady() {
    int marked = this.state | WRITE_FLAG_READY;
    if (allAreSet(marked, READ_FLAG_NEEDS_WRITE)) {
        marked |= READ_FLAG_READY;
    }
    this.state = marked;
    if (allAreClear(marked, FLAG_TASK_QUEUED)) {
        run();
    }
    // run() and the user handlers may have changed the state
    final int after = this.state;
    final boolean noWriteInterest = allAreClear(after, WRITE_FLAG_RESUMED | READ_FLAG_NEEDS_WRITE);
    final boolean noReadInterest = allAreClear(after, READ_FLAG_RESUMED | WRITE_FLAG_NEEDS_READ);
    if (sinkConduit.isWriteResumed() && noWriteInterest) {
        sinkConduit.suspendWrites();
    }
    if (sourceConduit.isReadResumed() && noReadInterest) {
        sourceConduit.suspendReads();
    }
}
/**
 * Called when the underlying source conduit reports read readiness. Marks the read side ready
 * (and the write side too when it is waiting for a read), dispatches via {@link #run()} unless
 * a task is already queued, and finally suspends any underlying direction nobody is interested in.
 */
public void readReady() {
    int marked = this.state | READ_FLAG_READY;
    if (allAreSet(marked, WRITE_FLAG_NEEDS_READ)) {
        marked |= WRITE_FLAG_READY;
    }
    this.state = marked;
    if (allAreClear(marked, FLAG_TASK_QUEUED)) {
        run();
    }
    // run() and the user handlers may have changed the state
    final int after = this.state;
    final boolean noReadInterest = allAreClear(after, READ_FLAG_RESUMED | WRITE_FLAG_NEEDS_READ);
    final boolean noWriteInterest = allAreClear(after, WRITE_FLAG_RESUMED | READ_FLAG_NEEDS_WRITE);
    if (sourceConduit.isReadResumed() && noReadInterest) {
        sourceConduit.suspendReads();
    }
    if (sinkConduit.isWriteResumed() && noWriteInterest) {
        sinkConduit.suspendWrites();
    }
}
/**
 * Suspend user-level write notifications. Underlying write interest is dropped unless the read
 * side still needs a write, and underlying read interest is dropped when reads are not resumed.
 * The updated state is stored even if an underlying conduit throws.
 */
public void suspendWrites() {
    int flags = this.state;
    try {
        if (allAreClear(flags, WRITE_FLAG_RESUMED)) {
            return;
        }
        flags &= ~WRITE_FLAG_RESUMED;
        final boolean dropUpstreamWrites = allAreSet(flags, WRITE_FLAG_UP_RESUMED) && allAreClear(flags, READ_FLAG_NEEDS_WRITE);
        if (dropUpstreamWrites) {
            flags &= ~WRITE_FLAG_UP_RESUMED;
            sinkConduit.suspendWrites();
        }
        final boolean dropUpstreamReads = allAreSet(flags, READ_FLAG_UP_RESUMED) && allAreClear(flags, READ_FLAG_RESUMED);
        if (dropUpstreamReads) {
            flags &= ~READ_FLAG_UP_RESUMED;
            sourceConduit.suspendReads();
        }
    } finally {
        this.state = flags;
    }
}
/**
 * Resume user-level write notifications. If the write side is already finished this degrades
 * to a wakeup. Otherwise, when already ready a dispatch task is queued (unless one is pending);
 * if not ready, the underlying conduit the write side is waiting on is resumed.
 */
public void resumeWrites() {
    int flags = this.state;
    if (! allAreClear(flags, WRITE_FLAG_RESUMED)) {
        // already resumed
        return;
    }
    if (allAreSet(flags, WRITE_FLAG_FINISHED)) {
        wakeupWrites();
        return;
    }
    boolean schedule = false;
    try {
        flags |= WRITE_FLAG_RESUMED;
        if (allAreSet(flags, WRITE_FLAG_READY)) {
            schedule = allAreClear(flags, FLAG_TASK_QUEUED);
            if (schedule) {
                flags |= FLAG_TASK_QUEUED;
            }
        } else if (allAreSet(flags, WRITE_FLAG_NEEDS_READ) && allAreClear(flags, READ_FLAG_UP_RESUMED)) {
            flags |= READ_FLAG_UP_RESUMED;
            sourceConduit.resumeReads();
        } else if (allAreClear(flags, WRITE_FLAG_UP_RESUMED)) {
            flags |= WRITE_FLAG_UP_RESUMED;
            sinkConduit.resumeWrites();
        }
    } finally {
        this.state = flags;
        if (schedule) {
            getReadThread().execute(this);
        }
    }
}
/**
 * Request a write wakeup. Sets the wakeup flag and, unless a dispatch task is already queued,
 * queues this conduit on the I/O thread. Does nothing if a wakeup is already pending.
 */
public void wakeupWrites() {
    final int flags = this.state;
    if (! allAreClear(flags, WRITE_FLAG_WAKEUP)) {
        return;
    }
    final boolean mustQueue = allAreClear(flags, FLAG_TASK_QUEUED);
    if (mustQueue) {
        this.state = flags | WRITE_FLAG_WAKEUP | FLAG_TASK_QUEUED;
        getReadThread().execute(this);
        return;
    }
    this.state = flags | WRITE_FLAG_WAKEUP;
}
/**
 * Begin an orderly shutdown of the write side. Without TLS the underlying sink is terminated
 * directly. With TLS the engine's outbound side is closed (when no handshake is in progress),
 * a flush is attempted, and the sink is terminated once the write side is marked finished. On
 * any failure the write side is marked finished, the sink is truncated and the failure rethrown.
 *
 * @throws IOException if the flush or the underlying termination fails
 */
public void terminateWrites() throws IOException {
    final int before = this.state;
    if (! allAreClear(before, WRITE_FLAG_FINISHED)) {
        return;
    }
    this.state = before | WRITE_FLAG_SHUTDOWN;
    if (allAreClear(before, FLAG_TLS)) {
        sinkConduit.terminateWrites();
        return;
    }
    try {
        if (engine.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
            engine.closeOutbound();
        }
        performIO(IO_GOAL_FLUSH, NO_BUFFERS, 0, 0, NO_BUFFERS, 0, 0);
        if (allAreSet(this.state, WRITE_FLAG_FINISHED)) {
            sinkConduit.terminateWrites();
        }
    } catch (Throwable failure) {
        this.state |= WRITE_FLAG_FINISHED;
        try {
            sinkConduit.truncateWrites();
        } catch (Throwable secondary) {
            failure.addSuppressed(secondary);
        }
        throw failure;
    }
}
/**
 * Abruptly shut down the write side. The write flags are set to shutdown/finished, the
 * engine's outbound side is closed when TLS is active, and the underlying sink is truncated.
 * Does nothing if writes were already shut down.
 *
 * @throws IOException if truncating the underlying sink fails
 */
public void truncateWrites() throws IOException {
    int flags = this.state;
    if (! allAreClear(flags, WRITE_FLAG_SHUTDOWN)) {
        return;
    }
    if (allAreClear(flags, FLAG_TLS)) {
        this.state = flags | WRITE_FLAG_SHUTDOWN | WRITE_FLAG_SHUTDOWN3 | WRITE_FLAG_FINISHED;
        sinkConduit.truncateWrites();
        return;
    }
    try {
        flags |= WRITE_FLAG_SHUTDOWN | WRITE_FLAG_SHUTDOWN3 | WRITE_FLAG_FINISHED;
        try {
            engine.closeOutbound();
        } catch (Throwable failure) {
            // still truncate the sink, then report the engine failure
            try {
                sinkConduit.truncateWrites();
            } catch (Throwable secondary) {
                failure.addSuppressed(secondary);
            }
            throw failure;
        }
        sinkConduit.truncateWrites();
    } finally {
        this.state = flags;
    }
}
/**
 * Determine whether writes are resumed or a write wakeup is pending.
 *
 * @return {@code true} if either flag is set
 */
public boolean isWriteResumed() {
    final int activeMask = WRITE_FLAG_RESUMED | WRITE_FLAG_WAKEUP;
    return anyAreSet(state, activeMask);
}
/**
 * Determine whether writes have been shut down.
 *
 * @return {@code true} if the write shutdown flag is set
 */
public boolean isWriteShutdown() {
return allAreSet(state, WRITE_FLAG_SHUTDOWN);
}
/**
 * Block until the conduit may be writable. First waits for outstanding engine tasks (if the
 * state says any are needed); then, unless the write side is already marked ready, blocks on
 * whichever underlying conduit the write side is waiting for.
 *
 * @throws InterruptedIOException if the thread is interrupted while waiting for engine tasks
 * @throws IOException if the underlying wait fails
 */
public void awaitWritable() throws IOException {
    int flags = this.state;
    while (allAreSet(flags, FLAG_NEED_ENGINE_TASK)) {
        synchronized (this) {
            try {
                while (tasks != 0) {
                    wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException();
            }
            flags &= ~FLAG_NEED_ENGINE_TASK;
            this.state = flags;
        }
    }
    if (! allAreClear(flags, WRITE_FLAG_READY)) {
        return;
    }
    if (allAreSet(flags, WRITE_FLAG_NEEDS_READ)) {
        sourceConduit.awaitReadable();
    } else {
        sinkConduit.awaitWritable();
    }
}
/**
 * Block until the conduit may be writable or the given time has elapsed. First waits for
 * outstanding engine tasks (if the state says any are needed); then, unless the write side is
 * already marked ready, blocks on whichever underlying conduit the write side is waiting for,
 * using the time that is left.
 *
 * @param time the maximum time to wait
 * @param timeUnit the unit of {@code time}
 * @throws InterruptedIOException if the thread is interrupted while waiting for engine tasks
 * @throws IOException if the underlying wait fails
 */
public void awaitWritable(final long time, final TimeUnit timeUnit) throws IOException {
    int state = this.state;
    long nanos = timeUnit.toNanos(time);
    while (allAreSet(state, FLAG_NEED_ENGINE_TASK)) {
        synchronized (this) {
            boolean waited = false;
            long start = System.nanoTime();
            while (tasks != 0) {
                try {
                    if (nanos <= 0) {
                        return;
                    }
                    wait(nanos / 1_000_000, (int) (nanos % 1_000_000));
                    waited = true;
                    // charge the elapsed time against the remaining budget
                    final long now = System.nanoTime();
                    nanos -= now - start;
                    start = now;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException();
                }
            }
            state &= ~FLAG_NEED_ENGINE_TASK;
            this.state = state;
            if (waited && nanos <= 0) {
                // The budget was used up while waiting for the engine tasks. Do not hand a
                // zero or negative timeout to the underlying conduit, whose treatment of such
                // values is not under this class's control.
                return;
            }
        }
    }
    if (allAreClear(state, WRITE_FLAG_READY)) {
        if (allAreSet(state, WRITE_FLAG_NEEDS_READ)) {
            sourceConduit.awaitReadable(nanos, TimeUnit.NANOSECONDS);
        } else {
            sinkConduit.awaitWritable(nanos, TimeUnit.NANOSECONDS);
        }
    }
}
/**
 * Suspend user-level read notifications. Underlying read interest is dropped unless the write
 * side still needs a read, and underlying write interest is dropped when writes are not resumed.
 * The updated state is stored even if an underlying conduit throws.
 */
public void suspendReads() {
    int flags = this.state;
    try {
        if (allAreClear(flags, READ_FLAG_RESUMED)) {
            return;
        }
        flags &= ~READ_FLAG_RESUMED;
        final boolean dropUpstreamReads = allAreSet(flags, READ_FLAG_UP_RESUMED) && allAreClear(flags, WRITE_FLAG_NEEDS_READ);
        if (dropUpstreamReads) {
            flags &= ~READ_FLAG_UP_RESUMED;
            sourceConduit.suspendReads();
        }
        final boolean dropUpstreamWrites = allAreSet(flags, WRITE_FLAG_UP_RESUMED) && allAreClear(flags, WRITE_FLAG_RESUMED);
        if (dropUpstreamWrites) {
            flags &= ~WRITE_FLAG_UP_RESUMED;
            sinkConduit.suspendWrites();
        }
    } finally {
        this.state = flags;
    }
}
/**
 * Resume user-level read notifications. When writes are not resumed the read side is marked
 * ready straight away. If ready, a dispatch task is queued (unless one is pending); otherwise
 * the underlying conduit the read side is waiting on is resumed.
 */
public void resumeReads() {
    int flags = this.state;
    if (! allAreClear(flags, READ_FLAG_RESUMED)) {
        // already resumed
        return;
    }
    boolean schedule = false;
    try {
        flags |= READ_FLAG_RESUMED;
        if (allAreClear(flags, WRITE_FLAG_RESUMED)) {
            flags |= READ_FLAG_READY;
        }
        if (allAreSet(flags, READ_FLAG_READY)) {
            schedule = allAreClear(flags, FLAG_TASK_QUEUED);
            if (schedule) {
                flags |= FLAG_TASK_QUEUED;
            }
        } else if (allAreSet(flags, READ_FLAG_NEEDS_WRITE) && allAreClear(flags, WRITE_FLAG_UP_RESUMED)) {
            flags |= WRITE_FLAG_UP_RESUMED;
            sinkConduit.resumeWrites();
        } else if (allAreClear(flags, READ_FLAG_UP_RESUMED)) {
            flags |= READ_FLAG_UP_RESUMED;
            sourceConduit.resumeReads();
        }
    } finally {
        this.state = flags;
        if (schedule) {
            getReadThread().execute(this);
        }
    }
}
/**
 * Request a read wakeup. Sets the wakeup flag and, unless a dispatch task is already queued,
 * queues this conduit on the I/O thread. Does nothing if a wakeup is already pending.
 */
public void wakeupReads() {
    final int flags = this.state;
    if (! allAreClear(flags, READ_FLAG_WAKEUP)) {
        return;
    }
    final boolean mustQueue = allAreClear(flags, FLAG_TASK_QUEUED);
    if (mustQueue) {
        this.state = flags | READ_FLAG_WAKEUP | FLAG_TASK_QUEUED;
        getReadThread().execute(this);
        return;
    }
    this.state = flags | READ_FLAG_WAKEUP;
}
/**
 * Shut down the read side. Without TLS the underlying source is terminated directly. With TLS
 * the read side is marked shut down, pending output is flushed, the engine's inbound side is
 * closed when possible and one more read is attempted to detect end of input. If end of input
 * was not reached, or undecoded bytes remain in the receive buffer, the underlying source is
 * still terminated but an {@code EOFException} is thrown.
 *
 * @throws EOFException if the connection is closed before the peer's data was fully consumed
 * @throws IOException if the flush, the read or the underlying termination fails
 */
public void terminateReads() throws IOException {
int state = this.state;
if (allAreClear(state, READ_FLAG_SHUTDOWN)) {
if (allAreClear(state, FLAG_TLS)) {
sourceConduit.terminateReads();
} else {
this.state = state | READ_FLAG_SHUTDOWN;
if (allAreClear(state, READ_FLAG_EOF)) {
performIO(IO_GOAL_FLUSH, NO_BUFFERS, 0, 0, NO_BUFFERS, 0, 0);
// NOTE(review): 'state' is the snapshot taken before performIO ran, so the inner EOF
// test below is always true inside this branch; the net effect is an early return
// whenever the write side was waiting for a read — confirm this is intended
if (allAreSet(state, WRITE_FLAG_NEEDS_READ)) {
if (allAreClear(state, READ_FLAG_EOF)) {
return;
}
}
if (!engine.isInboundDone() && engine.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
engine.closeInbound();
}
final long res = performIO(IO_GOAL_READ, NO_BUFFERS, 0, 0, NO_BUFFERS, 0, 0);
if (res == -1) {
this.state |= READ_FLAG_EOF;
}
}
// the current field value is consulted here, not the snapshot
if (allAreClear(this.state, READ_FLAG_EOF) || this.receiveBuffer.getResource().hasRemaining()) {
final EOFException exception = msg.connectionClosedEarly();
try {
sourceConduit.terminateReads();
} catch (IOException e) {
exception.addSuppressed(e);
}
throw exception;
} else {
sourceConduit.terminateReads();
}
}
}
}
/**
 * Determine whether reads are resumed or a read wakeup is pending.
 *
 * @return {@code true} if either flag is set
 */
public boolean isReadResumed() {
    final int activeMask = READ_FLAG_RESUMED | READ_FLAG_WAKEUP;
    return anyAreSet(state, activeMask);
}
/**
 * Determine whether reads have been shut down.
 *
 * @return {@code true} if the read shutdown flag is set
 */
public boolean isReadShutdown() {
return allAreSet(state, READ_FLAG_SHUTDOWN);
}
/**
 * Block until the conduit may be readable. First waits for outstanding engine tasks (if the
 * state says any are needed); then, unless the read side is already marked ready, blocks on
 * whichever underlying conduit the read side is waiting for.
 *
 * @throws InterruptedIOException if the thread is interrupted while waiting for engine tasks
 * @throws IOException if the underlying wait fails
 */
public void awaitReadable() throws IOException {
    int flags = this.state;
    while (allAreSet(flags, FLAG_NEED_ENGINE_TASK)) {
        synchronized (this) {
            try {
                while (tasks != 0) {
                    wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException();
            }
            flags &= ~FLAG_NEED_ENGINE_TASK;
            this.state = flags;
        }
    }
    if (! allAreClear(flags, READ_FLAG_READY)) {
        return;
    }
    if (allAreSet(flags, READ_FLAG_NEEDS_WRITE)) {
        sinkConduit.awaitWritable();
    } else {
        sourceConduit.awaitReadable();
    }
}
/**
 * Block until the conduit may be readable or the given time has elapsed. First waits for
 * outstanding engine tasks (if the state says any are needed); then, unless the read side is
 * already marked ready, blocks on whichever underlying conduit the read side is waiting for,
 * using the time that is left.
 *
 * @param time the maximum time to wait
 * @param timeUnit the unit of {@code time}
 * @throws InterruptedIOException if the thread is interrupted while waiting for engine tasks
 * @throws IOException if the underlying wait fails
 */
public void awaitReadable(final long time, final TimeUnit timeUnit) throws IOException {
    int state = this.state;
    long nanos = timeUnit.toNanos(time);
    while (allAreSet(state, FLAG_NEED_ENGINE_TASK)) {
        synchronized (this) {
            boolean waited = false;
            long start = System.nanoTime();
            while (tasks != 0) {
                try {
                    if (nanos <= 0) {
                        return;
                    }
                    wait(nanos / 1_000_000, (int) (nanos % 1_000_000));
                    waited = true;
                    // charge the elapsed time against the remaining budget
                    final long now = System.nanoTime();
                    nanos -= now - start;
                    start = now;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException();
                }
            }
            state &= ~FLAG_NEED_ENGINE_TASK;
            this.state = state;
            if (waited && nanos <= 0) {
                // The budget was used up while waiting for the engine tasks. Do not hand a
                // zero or negative timeout to the underlying conduit, whose treatment of such
                // values is not under this class's control.
                return;
            }
        }
    }
    if (allAreClear(state, READ_FLAG_READY)) {
        if (allAreSet(state, READ_FLAG_NEEDS_WRITE)) {
            sinkConduit.awaitWritable(nanos, TimeUnit.NANOSECONDS);
        } else {
            sourceConduit.awaitReadable(nanos, TimeUnit.NANOSECONDS);
        }
    }
}
/** Reusable one-element array used to pass a single destination buffer to {@code performIO}. */
private final ByteBuffer[] readBufferHolder = new ByteBuffer[1];
/**
 * Read into a single buffer.
 *
 * @param dst the destination buffer
 * @return the number of bytes read, 0 if nothing is available, or -1 at end of input or after
 *         reads were shut down
 * @throws IOException if the underlying read or the TLS processing fails
 */
public int read(final ByteBuffer dst) throws IOException {
    final int flags = this.state;
    if (anyAreSet(flags, READ_FLAG_SHUTDOWN)) {
        return -1;
    }
    if (anyAreSet(flags, READ_FLAG_EOF)) {
        // after EOF only already-unwrapped data can still be delivered
        if (readBuffer.getResource().position() <= 0) {
            return -1;
        }
        final ByteBuffer pending = readBuffer.getResource();
        pending.flip();
        try {
            if (TRACE_SSL) msg.tracef("TLS copy unwrapped data from %s to %s", Buffers.debugString(pending), Buffers.debugString(dst));
            return Buffers.copy(dst, pending);
        } finally {
            pending.compact();
        }
    }
    if (allAreClear(flags, FLAG_TLS)) {
        // plain pass-through; keep the READY/EOF flags in step with what the source reported
        final int count = sourceConduit.read(dst);
        if (count == -1) {
            this.state = (flags | READ_FLAG_EOF) & ~READ_FLAG_READY;
        } else if (count == 0 && allAreSet(flags, READ_FLAG_READY)) {
            this.state = flags & ~READ_FLAG_READY;
        }
        return count;
    }
    final ByteBuffer[] holder = this.readBufferHolder;
    holder[0] = dst;
    try {
        return (int) performIO(IO_GOAL_READ, NO_BUFFERS, 0, 0, holder, 0, 1);
    } finally {
        // do not retain the caller's buffer
        holder[0] = null;
    }
}
/**
 * Scattering read.
 *
 * @param dsts the destination buffers
 * @param offs the offset of the first buffer to use
 * @param len the number of buffers to use
 * @return the number of bytes read, 0 if nothing is available, or -1 at end of input or after
 *         reads were shut down
 * @throws IOException if the underlying read or the TLS processing fails
 */
public long read(final ByteBuffer[] dsts, final int offs, final int len) throws IOException {
    final int flags = this.state;
    if (anyAreSet(flags, READ_FLAG_SHUTDOWN)) {
        return -1;
    }
    if (anyAreSet(flags, READ_FLAG_EOF)) {
        // after EOF only already-unwrapped data can still be delivered
        if (readBuffer.getResource().position() <= 0) {
            return -1;
        }
        final ByteBuffer pending = readBuffer.getResource();
        pending.flip();
        try {
            if (TRACE_SSL) msg.tracef("TLS copy unwrapped data from %s to %s", Buffers.debugString(pending), Buffers.debugString(dsts, offs, len));
            return Buffers.copy(dsts, offs, len, pending);
        } finally {
            pending.compact();
        }
    }
    if (allAreClear(flags, FLAG_TLS)) {
        // plain pass-through; keep the READY/EOF flags in step with what the source reported
        final long count = sourceConduit.read(dsts, offs, len);
        if (count == -1) {
            this.state = (flags | READ_FLAG_EOF) & ~READ_FLAG_READY;
        } else if (count == 0 && allAreSet(flags, READ_FLAG_READY)) {
            this.state = flags & ~READ_FLAG_READY;
        }
        return count;
    }
    return performIO(IO_GOAL_READ, NO_BUFFERS, 0, 0, dsts, offs, len);
}
/**
 * Transfer bytes from this conduit into a file. Without TLS the underlying source does the
 * transfer; with TLS the file channel pulls from this conduit through a channel adapter.
 *
 * @param position the file position to write at
 * @param count the maximum number of bytes to transfer
 * @param target the file channel to write to
 * @return the number of bytes transferred
 * @throws IOException if the transfer fails
 */
public long transferTo(final long position, final long count, final FileChannel target) throws IOException {
    return allAreClear(state, FLAG_TLS)
        ? sourceConduit.transferTo(position, count, target)
        : target.transferFrom(new ConduitReadableByteChannel(this), position, count);
}
/**
 * Transfer bytes from this conduit into a stream sink channel. Without TLS the underlying
 * source does the transfer; with TLS the generic conduit transfer routine is used.
 *
 * @param count the maximum number of bytes to transfer
 * @param throughBuffer the intermediate buffer
 * @param target the channel to write to
 * @return the number of bytes transferred
 * @throws IOException if the transfer fails
 */
public long transferTo(final long count, final ByteBuffer throughBuffer, final StreamSinkChannel target) throws IOException {
    return allAreClear(state, FLAG_TLS)
        ? sourceConduit.transferTo(count, throughBuffer, target)
        : Conduits.transfer(this, count, throughBuffer, target);
}
/** Reusable one-element array used to route single-buffer writes through the gathering variants. */
private final ByteBuffer[] writeBufferHolder = new ByteBuffer[1];
/**
 * Write from a single buffer.
 *
 * @param src the source buffer
 * @return the number of bytes consumed from {@code src}
 * @throws ClosedChannelException if writes have been shut down
 * @throws IOException if the write fails
 */
public int write(final ByteBuffer src) throws IOException {
    if (allAreSet(state, WRITE_FLAG_SHUTDOWN)) {
        throw new ClosedChannelException();
    }
    if (allAreClear(state, FLAG_TLS)) {
        return sinkConduit.write(src);
    }
    final ByteBuffer[] holder = this.writeBufferHolder;
    holder[0] = src;
    try {
        return (int) write(holder, 0, 1);
    } finally {
        // do not retain the caller's buffer
        holder[0] = null;
    }
}
/**
 * Write from a single buffer as the final write.
 *
 * @param src the source buffer
 * @return the number of bytes consumed from {@code src}
 * @throws ClosedChannelException if writes have been shut down
 * @throws IOException if the write fails
 */
public int writeFinal(final ByteBuffer src) throws IOException {
    if (allAreSet(state, WRITE_FLAG_SHUTDOWN)) {
        throw new ClosedChannelException();
    }
    if (allAreClear(state, FLAG_TLS)) {
        return sinkConduit.writeFinal(src);
    }
    final ByteBuffer[] holder = this.writeBufferHolder;
    holder[0] = src;
    try {
        return (int) writeFinal(holder, 0, 1);
    } finally {
        // do not retain the caller's buffer
        holder[0] = null;
    }
}
/**
 * Gathering write. With TLS active the result is the number of source bytes consumed, computed
 * from the buffers' remaining counts before and after the operation.
 *
 * @param srcs the source buffers
 * @param offs the offset of the first buffer to use
 * @param len the number of buffers to use
 * @return the number of bytes consumed
 * @throws ClosedChannelException if writes have been shut down
 * @throws IOException if the write fails
 */
public long write(final ByteBuffer[] srcs, final int offs, final int len) throws IOException {
    if (allAreSet(state, WRITE_FLAG_SHUTDOWN)) {
        throw new ClosedChannelException();
    }
    if (allAreClear(state, FLAG_TLS)) {
        return sinkConduit.write(srcs, offs, len);
    }
    final long before = Buffers.remaining(srcs, offs, len);
    performIO(IO_GOAL_WRITE, srcs, offs, len, NO_BUFFERS, 0, 0);
    final long after = Buffers.remaining(srcs, offs, len);
    return before - after;
}
/**
 * Gathering final write. With TLS active the result is the number of source bytes consumed,
 * computed from the buffers' remaining counts before and after the operation.
 *
 * @param srcs the source buffers
 * @param offs the offset of the first buffer to use
 * @param len the number of buffers to use
 * @return the number of bytes consumed
 * @throws ClosedChannelException if writes have been shut down
 * @throws IOException if the write fails
 */
public long writeFinal(final ByteBuffer[] srcs, final int offs, final int len) throws IOException {
    if (allAreSet(state, WRITE_FLAG_SHUTDOWN)) {
        throw new ClosedChannelException();
    }
    if (allAreClear(state, FLAG_TLS)) {
        return sinkConduit.writeFinal(srcs, offs, len);
    }
    final long before = Buffers.remaining(srcs, offs, len);
    performIO(IO_GOAL_WRITE_FINAL, srcs, offs, len, NO_BUFFERS, 0, 0);
    final long after = Buffers.remaining(srcs, offs, len);
    return before - after;
}
/**
 * Flush pending output.
 *
 * @return {@code true} if everything was flushed (or the write side is already finished),
 *         {@code false} if more flushing is needed
 * @throws IOException if flushing fails
 */
public boolean flush() throws IOException {
    final int snapshot = this.state;
    if (allAreSet(snapshot, WRITE_FLAG_FINISHED)) {
        return true;
    }
    if (allAreSet(snapshot, WRITE_FLAG_SHUTDOWN3)) {
        // only the underlying sink is left to flush
        final boolean done = sinkConduit.flush();
        if (done) {
            this.state = snapshot | WRITE_FLAG_FINISHED;
        }
        return done;
    }
    if (allAreClear(snapshot, FLAG_TLS)) {
        final boolean flushed = sinkConduit.flush();
        if (allAreSet(snapshot, WRITE_FLAG_SHUTDOWN) && flushed) {
            // a shut-down plain connection is finished once the sink is flushed
            this.state = snapshot | WRITE_FLAG_SHUTDOWN2 | WRITE_FLAG_SHUTDOWN3 | WRITE_FLAG_FINISHED;
        }
        return flushed;
    }
    // TLS active: the engine-driven flush is the same whether or not writes are shut down
    return performIO(IO_GOAL_FLUSH, NO_BUFFERS, 0, 0, NO_BUFFERS, 0, 0) != 0L;
}
/**
 * Transfer bytes from a file into this conduit. Without TLS the underlying sink does the
 * transfer; with TLS the file channel pushes into this conduit through a channel adapter.
 *
 * @param src the file channel to read from
 * @param position the file position to read at
 * @param count the maximum number of bytes to transfer
 * @return the number of bytes transferred
 * @throws IOException if the transfer fails
 */
public long transferFrom(final FileChannel src, final long position, final long count) throws IOException {
    return allAreClear(state, FLAG_TLS)
        ? sinkConduit.transferFrom(src, position, count)
        : src.transferTo(position, count, new ConduitWritableByteChannel(this));
}
/**
 * Transfer bytes from a stream source channel into this conduit. Without TLS the underlying
 * sink does the transfer; with TLS the generic conduit transfer routine is used.
 *
 * @param source the channel to read from
 * @param count the maximum number of bytes to transfer
 * @param throughBuffer the intermediate buffer
 * @return the number of bytes transferred
 * @throws IOException if the transfer fails
 */
public long transferFrom(final StreamSourceChannel source, final long count, final ByteBuffer throughBuffer) throws IOException {
    return allAreClear(state, FLAG_TLS)
        ? sinkConduit.transferFrom(source, count, throughBuffer)
        : Conduits.transfer(source, count, throughBuffer, this);
}
/** Shared empty buffer array passed to {@code performIO} for the direction that carries no data. */
private static final ByteBuffer[] NO_BUFFERS = new ByteBuffer[0];
// Goals passed to performIO; decodeGoal() maps these same numeric values to names for tracing.
/** Goal: read unwrapped data into the destination buffers. */
private static final int IO_GOAL_READ = 0;
/** Goal: wrap and send data from the source buffers. */
private static final int IO_GOAL_WRITE = 1;
/** Goal: flush pending output. */
private static final int IO_GOAL_FLUSH = 2;
/** Goal: as {@link #IO_GOAL_WRITE}, issued by the {@code writeFinal} methods. */
private static final int IO_GOAL_WRITE_FINAL = 3;
/**
 * Translate the raw outcome of an I/O pass into the value reported to the caller.
 *
 * @param xfer the number of bytes transferred
 * @param goal the I/O goal of the operation
 * @param flushed whether the flush completed
 * @param eof whether end of input was reached
 * @return 1 for a completed flush goal; -1 for a read goal that hit end of input without
 *         transferring anything; otherwise {@code xfer}
 */
private static long actualIOResult(final long xfer, final int goal, final boolean flushed, final boolean eof) {
    final long result;
    if (goal == IO_GOAL_FLUSH && flushed) {
        result = 1L;
    } else if (goal == IO_GOAL_READ && eof && xfer == 0L) {
        result = -1L;
    } else {
        result = xfer;
    }
    if (TRACE_SSL) msg.tracef("returned TLS result %d", result);
    return result;
}
/**
 * Get a printable name for an I/O goal value, for trace output.
 *
 * @param goal the numeric goal
 * @return the goal's name, or {@code UNKNOWN(n)} for an unrecognized value
 */
private static String decodeGoal(int goal) {
    final String[] names = { "READ", "WRITE", "FLUSH", "WRITE_FINAL" };
    if (goal >= 0 && goal < names.length) {
        return names[goal];
    }
    return "UNKNOWN(" + goal + ")";
}
/**
 * Run the SSL engine's wrap/unwrap state machine on behalf of a read, write or flush request,
 * looping until the request can make no further progress.
 * <p>
 * At most one of {@code srcs} and {@code dsts} carries caller data; the other must be {@code NO_BUFFERS}
 * (asserted).  Decrypted bytes that do not fit into the caller's destination buffers spill into this
 * conduit's internal read buffer, and a later read-goal call hands those bytes out before unwrapping more.
 * <p>
 * The method works on a local copy of the state flags; the copy is written back to {@code this.state} on
 * every exit path (normal or exceptional).  After that, {@code wakeupReads()} is called if a non-read goal
 * unwrapped application data or observed the inbound side closing.
 *
 * @param goal one of the {@code IO_GOAL_*} codes
 * @param srcs the buffers handed to {@code SSLEngine.wrap}
 * @param srcOff offset of the first usable buffer in {@code srcs}
 * @param srcLen number of usable buffers in {@code srcs}
 * @param dsts the caller's destination buffers for unwrapped data
 * @param dstOff offset of the first usable buffer in {@code dsts}
 * @param dstLen number of usable buffers in {@code dsts}
 * @return {@code 0} immediately if {@code FLAG_NEED_ENGINE_TASK} is set; otherwise the value computed by
 *         {@code actualIOResult}: {@code 1} for a flush goal that flushed, {@code -1} for a read goal that
 *         reached EOF without transferring anything, else the number of bytes transferred
 * @throws IOException if an underlying conduit or the engine fails; in particular a
 *         {@link ClosedChannelException} when a non-flush wrap finds the engine closed although write
 *         shutdown was not requested, and an {@link SSLException} when an inline delegated task throws
 */
private long performIO(final int goal, final ByteBuffer[] srcs, final int srcOff, final int srcLen, final ByteBuffer[] dsts, final int dstOff, final int dstLen) throws IOException {
    if (TRACE_SSL) msg.tracef("performing TLS I/O operation, goal %s, src: %s, dst: %s", decodeGoal(goal), Buffers.debugString(srcs, srcOff, srcLen), Buffers.debugString(dsts, dstOff, dstLen));
    // one call carries either caller data to wrap or caller buffers to unwrap into, never both
    assert srcs == NO_BUFFERS || dsts == NO_BUFFERS;
    int state = this.state;
    assert ! allAreSet(state, READ_FLAG_NEEDS_WRITE | WRITE_FLAG_NEEDS_READ);
    if (allAreSet(state, FLAG_NEED_ENGINE_TASK)) {
        // this flag is set below when delegated tasks are handed to the worker; report no progress while it is set
        return 0L;
    }
    final SSLEngine engine = this.engine;
    final ByteBuffer sendBuffer = this.sendBuffer.getResource();
    final ByteBuffer receiveBuffer = this.receiveBuffer.getResource();
    final ByteBuffer readBuffer = this.readBuffer.getResource();
    // Caller destinations followed by one extra slot for the internal spill buffer.
    // The third argument of copyOfRange is an exclusive end INDEX, so it has to include dstOff;
    // passing a length here produced a too-short array whenever dstOff > 0.
    final ByteBuffer[] realDsts = Arrays.copyOfRange(dsts, dstOff, dstOff + dstLen + 1);
    realDsts[dstLen] = readBuffer;
    long remaining = max(Buffers.remaining(srcs, srcOff, srcLen), Buffers.remaining(dsts, dstOff, dstLen));
    // initial direction: a read wraps first only if a write/flush is pending; other goals wrap unless they are waiting on a read
    boolean wrap = goal == IO_GOAL_READ ? anyAreSet(state, READ_FLAG_NEEDS_WRITE | FLAG_FLUSH_NEEDED) : allAreSet(state, FLAG_FLUSH_NEEDED) || allAreClear(state, WRITE_FLAG_NEEDS_READ);
    boolean unwrap = !wrap;
    boolean flushed = false;
    boolean eof = false;
    boolean readBlocked = false;
    boolean writeBlocked = false;
    boolean copiedUnwrappedBytes = false;
    boolean wakeupReads = false;
    SSLEngineResult result;
    SSLEngineResult.HandshakeStatus handshakeStatus;
    int rv = 0;
    long xfer = 0L;
    if (TRACE_SSL) msg.trace("TLS perform IO");
    try {
        for (;;) {
            if (TRACE_SSL) msg.trace("TLS begin IO operation");
            // hand out previously unwrapped data held in the spill buffer before touching the engine
            if (goal == IO_GOAL_READ && remaining > 0 && readBuffer.position() > 0) {
                readBuffer.flip();
                try {
                    if (TRACE_SSL) msg.tracef("TLS copy unwrapped data from %s to %s", Buffers.debugString(readBuffer), Buffers.debugString(dsts, dstOff, dstLen));
                    rv = Buffers.copy(dsts, dstOff, dstLen, readBuffer);
                } finally {
                    readBuffer.compact();
                }
                if (rv > 0) {
                    copiedUnwrappedBytes = true;
                    xfer += rv;
                    if ((remaining -= rv) == 0L) {
                        return actualIOResult(xfer, goal, flushed, eof);
                    }
                }
            }
            assert ! (wrap && unwrap);
            if (wrap) {
                if (TRACE_SSL) msg.tracef("TLS wrap from %s to %s", Buffers.debugString(srcs, srcOff, srcLen), Buffers.debugString(sendBuffer));
                result = engine.wrap(srcs, srcOff, srcLen, sendBuffer);
                WRAP_RESULT: switch (result.getStatus()) {
                    case BUFFER_UNDERFLOW: {
                        assert result.bytesConsumed() == 0;
                        assert result.bytesProduced() == 0;
                        if (TRACE_SSL) msg.trace("TLS wrap operation UNDERFLOW");
                        break;
                    }
                    case BUFFER_OVERFLOW: {
                        assert result.bytesConsumed() == 0;
                        assert result.bytesProduced() == 0;
                        if (TRACE_SSL) msg.trace("TLS wrap operation OVERFLOW");
                        if (sendBuffer.position() == 0) {
                            // the send buffer is empty and the engine still cannot fit a record into it
                            throw msg.wrongBufferExpansion();
                        } else {
                            // drain the pending encrypted data to make room for the next wrap
                            sendBuffer.flip();
                            try {
                                while (sendBuffer.hasRemaining()) {
                                    if (TRACE_SSL) msg.tracef("TLS wrap operation send %s", Buffers.debugString(sendBuffer));
                                    final int res = sinkConduit.write(sendBuffer);
                                    if (res == 0) {
                                        writeBlocked = true;
                                        state &= ~WRITE_FLAG_READY;
                                        assert goal != IO_GOAL_FLUSH || xfer == 0L;
                                        flushed = false;
                                        wrap = false;
                                        break WRAP_RESULT;
                                    }
                                }
                            } finally {
                                sendBuffer.compact();
                            }
                            if (goal == IO_GOAL_FLUSH || allAreSet(state, FLAG_FLUSH_NEEDED)) {
                                if (flushed = sinkConduit.flush()) {
                                    state &= ~FLAG_FLUSH_NEEDED;
                                }
                            }
                            if (goal == IO_GOAL_FLUSH && allAreSet(state, WRITE_FLAG_SHUTDOWN)) {
                                state |= WRITE_FLAG_SHUTDOWN2;
                            }
                        }
                        break;
                    }
                    case CLOSED: {
                        if (TRACE_SSL) msg.trace("TLS wrap operation CLOSED");
                        if (allAreClear(state, WRITE_FLAG_SHUTDOWN) && result.bytesProduced() == 0) {
                            // the engine is closed although this side never requested write shutdown
                            if (goal == IO_GOAL_FLUSH) {
                                wrap = false;
                                // (the goal test below is always true here; kept as in the original)
                                if (goal == IO_GOAL_FLUSH || allAreSet(state, FLAG_FLUSH_NEEDED)) {
                                    if (flushed = sinkConduit.flush()) {
                                        state &= ~FLAG_FLUSH_NEEDED;
                                    }
                                }
                                break;
                            }
                            state &= ~(WRITE_FLAG_NEEDS_READ | READ_FLAG_NEEDS_WRITE);
                            state |= WRITE_FLAG_SHUTDOWN | WRITE_FLAG_SHUTDOWN2 | WRITE_FLAG_SHUTDOWN3 | WRITE_FLAG_FINISHED;
                            final ClosedChannelException exception = new ClosedChannelException();
                            try {
                                sinkConduit.truncateWrites();
                            } catch (IOException e) {
                                exception.addSuppressed(e);
                            }
                            throw exception;
                        }
                        if (allAreSet(state, WRITE_FLAG_SHUTDOWN2)) {
                            state |= WRITE_FLAG_SHUTDOWN3;
                        }
                        // intentional fall through: whatever the engine produced is sent by the OK branch
                    }
                    case OK: {
                        if (TRACE_SSL) msg.tracef("TLS wrap operation OK consumed: %d produced: %d", result.bytesConsumed(), result.bytesProduced());
                        state &= ~(WRITE_FLAG_NEEDS_READ | READ_FLAG_NEEDS_WRITE);
                        final int consumed = result.bytesConsumed();
                        if (goal == IO_GOAL_READ) {
                            // a read only wraps handshake data; go back to unwrapping afterwards
                            assert consumed == 0;
                            wrap = false;
                            unwrap = true;
                        } else {
                            if (consumed > 0 || remaining == 0) {
                                assert remaining != 0 || consumed == 0;
                                wrap = false;
                            }
                            xfer += consumed;
                            remaining -= consumed;
                        }
                        sendBuffer.flip();
                        try {
                            flushed = false;
                            while (sendBuffer.hasRemaining()) {
                                final int res = allAreSet(state, WRITE_FLAG_SHUTDOWN3) ? sinkConduit.writeFinal(sendBuffer) : sinkConduit.write(sendBuffer);
                                if (res == 0) {
                                    writeBlocked = true;
                                    wrap = false;
                                    break;
                                }
                            }
                        } finally {
                            sendBuffer.compact();
                        }
                        // only when everything was written out: flush and advance the write-shutdown flags
                        if (sendBuffer.position() == 0) {
                            if (goal == IO_GOAL_FLUSH || allAreSet(state, FLAG_FLUSH_NEEDED)) {
                                if (flushed = sinkConduit.flush()) {
                                    state &= ~FLAG_FLUSH_NEEDED;
                                }
                            }
                            if (allAreSet(state, WRITE_FLAG_SHUTDOWN)) {
                                if (allAreClear(state, WRITE_FLAG_SHUTDOWN2)) {
                                    assert sendBuffer.position() == 0;
                                    state |= WRITE_FLAG_SHUTDOWN2;
                                    if (result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
                                        state |= WRITE_FLAG_SHUTDOWN3;
                                    }
                                }
                                if (allAreSet(state, WRITE_FLAG_SHUTDOWN3)) {
                                    if (goal == IO_GOAL_FLUSH || sinkConduit.flush()) {
                                        state |= WRITE_FLAG_FINISHED;
                                    }
                                    sinkConduit.terminateWrites();
                                }
                            }
                        }
                        break;
                    }
                    default: {
                        throw msg.unexpectedWrapResult(result.getStatus());
                    }
                }
            } else if (unwrap) {
                if (TRACE_SSL) msg.tracef("TLS unwrap from %s to %s", Buffers.debugString(receiveBuffer), Buffers.debugString(realDsts, 0, dstLen + 1));
                assert realDsts.length == 1 || realDsts[0] == dsts[dstOff];
                assert realDsts[dstLen] == readBuffer;
                // measure how much landed in the caller's buffers (as opposed to the spill buffer)
                final long preRem = Buffers.remaining(dsts, dstOff, dstLen);
                result = engine.unwrap(receiveBuffer, realDsts, 0, dstLen + 1);
                final long userProduced = preRem - Buffers.remaining(dsts, dstOff, dstLen);
                switch (result.getStatus()) {
                    case BUFFER_OVERFLOW: {
                        assert result.bytesConsumed() == 0;
                        assert result.bytesProduced() == 0;
                        assert userProduced == 0;
                        if (TRACE_SSL) msg.trace("TLS unwrap operation OVERFLOW");
                        if (!copiedUnwrappedBytes) {
                            return actualIOResult(xfer, goal, flushed, eof);
                        }
                        unwrap = false;
                        break;
                    }
                    case BUFFER_UNDERFLOW: {
                        assert result.bytesConsumed() == 0;
                        assert result.bytesProduced() == 0;
                        assert userProduced == 0;
                        if (TRACE_SSL) msg.trace("TLS unwrap operation UNDERFLOW");
                        // the engine needs more ciphertext: refill the receive buffer from the source conduit
                        receiveBuffer.compact();
                        try {
                            int res;
                            res = sourceConduit.read(receiveBuffer);
                            if (TRACE_SSL) msg.tracef("TLS unwrap operation read %s", Buffers.debugString(receiveBuffer));
                            if (res == -1) {
                                state &= ~READ_FLAG_READY;
                                engine.closeInbound();
                            } else if (res == 0) {
                                readBlocked = true;
                                state &= ~READ_FLAG_READY;
                                unwrap = false;
                            } else if (receiveBuffer.hasRemaining()) {
                                // keep reading until the source runs dry or the buffer is full
                                do {
                                    res = sourceConduit.read(receiveBuffer);
                                } while (res > 0 && receiveBuffer.hasRemaining());
                                if (res == 0) {
                                    state &= ~READ_FLAG_READY;
                                }
                            }
                        } finally {
                            receiveBuffer.flip();
                        }
                        break;
                    }
                    case CLOSED: {
                        if (result.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) {
                            // closed, yet the engine still asks for input: try to read more before treating it as EOF
                            receiveBuffer.compact();
                            try {
                                int res;
                                res = sourceConduit.read(receiveBuffer);
                                if (TRACE_SSL) msg.tracef("TLS unwrap operation read %s", Buffers.debugString(receiveBuffer));
                                if (res == -1) {
                                    state &= ~READ_FLAG_READY;
                                    engine.closeInbound();
                                    return actualIOResult(xfer, goal, flushed, eof);
                                } else if (res == 0) {
                                    readBlocked = true;
                                    state &= ~READ_FLAG_READY;
                                    unwrap = false;
                                    return actualIOResult(xfer, goal, flushed, eof);
                                } else if (receiveBuffer.hasRemaining()) {
                                    do {
                                        res = sourceConduit.read(receiveBuffer);
                                    } while (res > 0 && receiveBuffer.hasRemaining());
                                    if (res == 0) {
                                        state &= ~READ_FLAG_READY;
                                    }
                                }
                            } finally {
                                receiveBuffer.flip();
                            }
                            break;
                        }
                        if (TRACE_SSL) msg.trace("TLS unwrap operation CLOSED");
                        state &= ~(WRITE_FLAG_NEEDS_READ | READ_FLAG_NEEDS_WRITE);
                        if (goal == IO_GOAL_READ) {
                            xfer += userProduced;
                            remaining -= userProduced;
                            state = (state & ~READ_FLAG_READY) | READ_FLAG_EOF;
                        } else {
                            // not a read call: request a wakeupReads() call once the state has been stored
                            wakeupReads = true;
                        }
                        eof = true;
                        unwrap = false;
                        if (goal == IO_GOAL_FLUSH) {
                            wrap = true;
                        }
                        break;
                    }
                    case OK: {
                        if (TRACE_SSL) msg.tracef("TLS unwrap operation OK consumed: %d produced: %d", result.bytesConsumed(), result.bytesProduced());
                        if (allAreClear(state, READ_FLAG_READY)) {
                            state |= READ_FLAG_READY;
                        }
                        state &= ~(WRITE_FLAG_NEEDS_READ | READ_FLAG_NEEDS_WRITE);
                        if (goal == IO_GOAL_READ) {
                            xfer += userProduced;
                            remaining -= userProduced;
                        } else {
                            // the unwrap was done on behalf of a write/flush: return to wrapping
                            wrap = true;
                            unwrap = false;
                            if (result.bytesProduced() > 0) {
                                // application data was unwrapped (into the spill buffer) during a non-read call
                                wakeupReads = true;
                            }
                        }
                        break;
                    }
                    default: {
                        throw msg.unexpectedUnwrapResult(result.getStatus());
                    }
                }
            } else {
                // neither direction can proceed
                return actualIOResult(xfer, goal, flushed, eof);
            }
            // decide the next direction from the engine's handshake status
            handshakeStatus = result.getHandshakeStatus();
            HS: for (;;) {
                switch (handshakeStatus) {
                    case FINISHED: {
                        if (TRACE_SSL) msg.trace("TLS handshake FINISHED");
                        connection.invokeHandshakeListener();
                        // intentional fall through
                    }
                    case NOT_HANDSHAKING: {
                        if (allAreSet(state, WRITE_FLAG_SHUTDOWN)) {
                            engine.closeOutbound();
                        }
                        break HS;
                    }
                    case NEED_TASK: {
                        if (TRACE_SSL) msg.trace("TLS handshake NEED_TASK");
                        if (xfer != 0L) {
                            // report the bytes already transferred before running any task
                            return actualIOResult(xfer, goal, flushed, eof);
                        }
                        if (allAreSet(state, FLAG_INLINE_TASKS)) {
                            // run every delegated task on the calling thread, then re-evaluate the status
                            Runnable task;
                            for (; ; ) {
                                task = engine.getDelegatedTask();
                                if (task == null) {
                                    break;
                                }
                                try {
                                    task.run();
                                } catch (Throwable cause) {
                                    throw new SSLException("Delegated task threw an exception", cause);
                                }
                            }
                            handshakeStatus = engine.getHandshakeStatus();
                            break;
                        } else {
                            // hand the tasks to the worker; the flag makes later calls return 0 at the top of this method
                            state |= FLAG_NEED_ENGINE_TASK;
                            final ArrayList<Runnable> tasks = new ArrayList<>(4);
                            Runnable task;
                            for (;;) {
                                task = engine.getDelegatedTask();
                                if (task != null) {
                                    tasks.add(task);
                                } else {
                                    break;
                                }
                            }
                            final int size = tasks.size();
                            // publish the outstanding count before any TaskWrapper can decrement it
                            synchronized (JsseStreamConduit.this) {
                                this.tasks = size;
                            }
                            for (int i = 0; i < size; i ++) {
                                getWorker().execute(new TaskWrapper(tasks.get(i)));
                            }
                            return actualIOResult(xfer, goal, flushed, eof);
                        }
                    }
                    case NEED_WRAP: {
                        if (TRACE_SSL) msg.trace("TLS handshake NEED_WRAP");
                        state |= READ_FLAG_NEEDS_WRITE | FLAG_FLUSH_NEEDED;
                        if (writeBlocked) {
                            return actualIOResult(xfer, goal, flushed, eof);
                        }
                        wrap = true;
                        unwrap = false;
                        break HS;
                    }
                    case NEED_UNWRAP: {
                        if (TRACE_SSL) msg.trace("TLS handshake NEED_UNWRAP");
                        // coming from a wrap that has not been flushed yet: try to flush now, else remember to
                        if (wrap && ! flushed && ! sinkConduit.flush()) {
                            state |= FLAG_FLUSH_NEEDED;
                        }
                        state |= WRITE_FLAG_NEEDS_READ;
                        if (readBlocked) {
                            return actualIOResult(xfer, goal, flushed, eof);
                        }
                        wrap = false;
                        unwrap = true;
                        break HS;
                    }
                    default: {
                        // report the status actually switched on; after inline tasks it may differ from result's
                        throw msg.unexpectedHandshakeStatus(handshakeStatus);
                    }
                }
            }
        }
    } finally {
        // store the working copy of the flags first, then deliver any pending read wakeup
        this.state = state;
        if (wakeupReads) {
            wakeupReads();
        }
    }
}
/**
 * Runs one delegated engine task and then records its completion on the enclosing conduit.
 * Whether the task returns normally or throws, the conduit's outstanding-task counter is decremented
 * while holding the conduit's monitor, and all threads waiting on that monitor are notified when the
 * counter was {@code 1} before the decrement.
 */
class TaskWrapper implements Runnable {
    private final Runnable delegate;

    TaskWrapper(final Runnable task) {
        delegate = task;
    }

    @Override
    public void run() {
        try {
            delegate.run();
        } finally {
            synchronized (JsseStreamConduit.this) {
                final int outstanding = tasks;
                tasks = outstanding - 1;
                if (outstanding == 1) {
                    JsseStreamConduit.this.notifyAll();
                }
            }
        }
    }
}
}