package io.undertow.util;
import io.undertow.UndertowLogger;
import org.xnio.ChannelExceptionHandler;
import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
import org.xnio.IoUtils;
import org.xnio.StreamConnection;
import org.xnio.XnioExecutor;
import org.xnio.conduits.ConduitStreamSinkChannel;
import org.xnio.conduits.ConduitStreamSourceChannel;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
/**
 * Static helpers for shutting a {@link StreamConnection} down in an orderly fashion:
 * writes are shut down and flushed first, then the read side is probed before the
 * connection and any accompanying resources are closed.
 */
public class ConnectionUtils {

    /**
     * Delay, in milliseconds, after which a connection that is still waiting on its
     * source channel is closed regardless. Read from the
     * {@code io.undertow.max-drain-time} system property; defaults to 10000.
     */
    private static final long MAX_DRAIN_TIME = Long.getLong("io.undertow.max-drain-time", 10000);

    private ConnectionUtils() {
    }

    /**
     * Shuts down writes on the connection's sink channel, flushes it, and then hands
     * over to the drain step, which eventually closes the connection together with
     * the supplied extra resources.
     * <p>
     * When the flush does not complete immediately, a flushing write listener is
     * installed and writes are resumed; the drain step then runs from that listener.
     * Any failure along the way is logged as an I/O exception and everything is closed.
     *
     * @param connection the connection to close
     * @param additional further resources to close alongside the connection
     */
    public static void cleanClose(StreamConnection connection, Closeable... additional) {
        try {
            final ConduitStreamSinkChannel sink = connection.getSinkChannel();
            sink.shutdownWrites();
            if (sink.flush()) {
                // Everything is already written out; go straight to the read side.
                doDrain(connection, additional);
                return;
            }

            // Flush is still pending: finish it asynchronously, then drain.
            final ChannelListener<ConduitStreamSinkChannel> onFlushed = new ChannelListener<ConduitStreamSinkChannel>() {
                @Override
                public void handleEvent(ConduitStreamSinkChannel channel) {
                    doDrain(connection, additional);
                }
            };
            final ChannelExceptionHandler<ConduitStreamSinkChannel> onFlushFailure = new ChannelExceptionHandler<ConduitStreamSinkChannel>() {
                @Override
                public void handleException(ConduitStreamSinkChannel channel, IOException exception) {
                    UndertowLogger.REQUEST_IO_LOGGER.ioException(exception);
                    closeAll(connection, additional);
                }
            };
            sink.setWriteListener(ChannelListeners.flushingChannelListener(onFlushed, onFlushFailure));
            sink.resumeWrites();
        } catch (Throwable e) {
            logAndClose(e, connection, additional);
        }
    }

    /**
     * Probes the connection's source channel with a one-byte read and closes the
     * connection (plus the extra resources) as soon as a read returns anything
     * other than zero.
     * <p>
     * If the source channel is not open, or the first read returns non-zero, the close
     * happens immediately. If the first read returns zero, a close task is scheduled on
     * the connection's I/O thread after {@link #MAX_DRAIN_TIME} milliseconds and a read
     * listener is installed; once that listener sees a non-zero read (or an exception)
     * it closes everything and removes the scheduled task via its key.
     *
     * @param connection the connection being closed
     * @param additional further resources to close alongside the connection
     */
    private static void doDrain(final StreamConnection connection, final Closeable... additional) {
        final ConduitStreamSourceChannel source = connection.getSourceChannel();
        if (!source.isOpen()) {
            closeAll(connection, additional);
            return;
        }

        final ByteBuffer probe = ByteBuffer.allocate(1);
        try {
            final int firstRead = source.read(probe);
            probe.clear();
            if (firstRead != 0) {
                closeAll(connection, additional);
                return;
            }

            // Nothing readable yet: arm a timer so the connection cannot linger forever.
            final XnioExecutor.Key timeoutKey = WorkerUtils.executeAfter(connection.getIoThread(), new Runnable() {
                @Override
                public void run() {
                    closeAll(connection, additional);
                }
            }, MAX_DRAIN_TIME, TimeUnit.MILLISECONDS);

            source.setReadListener(new ChannelListener<ConduitStreamSourceChannel>() {
                @Override
                public void handleEvent(ConduitStreamSourceChannel channel) {
                    try {
                        if (channel.read(probe) == 0) {
                            // Still nothing; keep waiting for the next read event or the timer.
                            return;
                        }
                        closeAll(connection, additional);
                        timeoutKey.remove();
                    } catch (Exception e) {
                        logAndClose(e, connection, additional);
                        timeoutKey.remove();
                    }
                }
            });
            source.resumeReads();
        } catch (Throwable e) {
            logAndClose(e, connection, additional);
        }
    }

    /**
     * Logs the failure through the request I/O logger and then closes the connection
     * and the extra resources. Non-{@link IOException} failures are wrapped in a new
     * {@link IOException} before being logged.
     */
    private static void logAndClose(Throwable failure, StreamConnection connection, Closeable... additional) {
        final IOException reported = (failure instanceof IOException)
                ? (IOException) failure
                : new IOException(failure);
        UndertowLogger.REQUEST_IO_LOGGER.ioException(reported);
        closeAll(connection, additional);
    }

    /**
     * Closes the connection first and then each of the extra resources, using
     * {@link IoUtils#safeClose}.
     */
    private static void closeAll(StreamConnection connection, Closeable... additional) {
        IoUtils.safeClose(connection);
        IoUtils.safeClose(additional);
    }
}