package io.undertow.client.ajp;
import static io.undertow.util.Headers.CLOSE;
import static io.undertow.util.Headers.CONNECTION;
import static io.undertow.util.Headers.CONTENT_LENGTH;
import static io.undertow.util.Headers.TRANSFER_ENCODING;
import static io.undertow.util.Headers.UPGRADE;
import static org.xnio.Bits.anyAreSet;
import static org.xnio.IoUtils.safeClose;
import java.io.Closeable;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import io.undertow.client.ClientStatistics;
import org.jboss.logging.Logger;
import org.xnio.ChannelExceptionHandler;
import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
import org.xnio.Option;
import org.xnio.OptionMap;
import io.undertow.connector.ByteBufferPool;
import org.xnio.StreamConnection;
import org.xnio.XnioIoThread;
import org.xnio.XnioWorker;
import org.xnio.channels.Channels;
import org.xnio.channels.StreamSinkChannel;
import io.undertow.UndertowLogger;
import io.undertow.UndertowMessages;
import io.undertow.client.ClientCallback;
import io.undertow.client.ClientConnection;
import io.undertow.client.ClientExchange;
import io.undertow.client.ClientRequest;
import io.undertow.client.ClientResponse;
import io.undertow.client.UndertowClientMessages;
import io.undertow.protocols.ajp.AbstractAjpClientStreamSourceChannel;
import io.undertow.protocols.ajp.AjpClientChannel;
import io.undertow.protocols.ajp.AjpClientRequestClientStreamSinkChannel;
import io.undertow.protocols.ajp.AjpClientResponseStreamSourceChannel;
import io.undertow.util.AbstractAttachable;
import io.undertow.util.Protocols;
class AjpClientConnection extends AbstractAttachable implements Closeable, ClientConnection {
public final ChannelListener<AjpClientRequestClientStreamSinkChannel> requestFinishListener = new ChannelListener<AjpClientRequestClientStreamSinkChannel>() {
@Override
public void handleEvent(AjpClientRequestClientStreamSinkChannel channel) {
if(currentRequest != null) {
currentRequest.terminateRequest();
}
}
};
public final ChannelListener<AjpClientResponseStreamSourceChannel> responseFinishedListener = new ChannelListener<AjpClientResponseStreamSourceChannel>() {
@Override
public void handleEvent(AjpClientResponseStreamSourceChannel channel) {
if(currentRequest != null) {
currentRequest.terminateResponse();
}
}
};
private static final Logger log = Logger.getLogger(AjpClientConnection.class);
private final Deque<AjpClientExchange> pendingQueue = new ArrayDeque<>();
private AjpClientExchange currentRequest;
private final OptionMap options;
private final AjpClientChannel connection;
private final ByteBufferPool bufferPool;
private static final int UPGRADED = 1 << 28;
private static final int UPGRADE_REQUESTED = 1 << 29;
private static final int CLOSE_REQ = 1 << 30;
private static final int CLOSED = 1 << 31;
private int state;
private final ChannelListener.SimpleSetter<AjpClientConnection> closeSetter = new ChannelListener.SimpleSetter<>();
private final ClientStatistics clientStatistics;
private final List<ChannelListener<ClientConnection>> closeListeners = new CopyOnWriteArrayList<>();
AjpClientConnection(final AjpClientChannel connection, final OptionMap options, final ByteBufferPool bufferPool, ClientStatistics clientStatistics) {
this.clientStatistics = clientStatistics;
this.options = options;
this.connection = connection;
this.bufferPool = bufferPool;
connection.addCloseTask(new ChannelListener<AjpClientChannel>() {
@Override
public void handleEvent(AjpClientChannel channel) {
log.debugf("connection to %s closed", getPeerAddress());
AjpClientConnection.this.state |= CLOSED;
ChannelListeners.invokeChannelListener(AjpClientConnection.this, closeSetter.get());
for(ChannelListener<ClientConnection> listener : closeListeners) {
listener.handleEvent(AjpClientConnection.this);
}
AjpClientExchange pending = pendingQueue.poll();
while (pending != null) {
pending.setFailed(new ClosedChannelException());
pending = pendingQueue.poll();
}
if(currentRequest != null) {
currentRequest.setFailed(new ClosedChannelException());
currentRequest = null;
}
}
});
connection.getReceiveSetter().set(new ClientReceiveListener());
connection.resumeReceives();
}
@Override
public ByteBufferPool getBufferPool() {
return bufferPool;
}
@Override
public SocketAddress getPeerAddress() {
return connection.getPeerAddress();
}
@Override
public <A extends SocketAddress> A getPeerAddress(Class<A> type) {
return connection.getPeerAddress(type);
}
@Override
public ChannelListener.Setter<? extends AjpClientConnection> getCloseSetter() {
return closeSetter;
}
@Override
public SocketAddress getLocalAddress() {
return connection.getLocalAddress();
}
@Override
public <A extends SocketAddress> A getLocalAddress(Class<A> type) {
return connection.getLocalAddress(type);
}
@Override
public XnioWorker getWorker() {
return connection.getWorker();
}
@Override
public XnioIoThread getIoThread() {
return connection.getIoThread();
}
@Override
public boolean isOpen() {
return connection.isOpen();
}
@Override
public boolean supportsOption(Option<?> option) {
return connection.supportsOption(option);
}
@Override
public <T> T getOption(Option<T> option) throws IOException {
return connection.getOption(option);
}
@Override
public <T> T setOption(Option<T> option, T value) throws IllegalArgumentException, IOException {
return connection.setOption(option, value);
}
@Override
public boolean isUpgraded() {
return anyAreSet(state, UPGRADE_REQUESTED | UPGRADED);
}
@Override
public boolean isPushSupported() {
return false;
}
@Override
public boolean isMultiplexingSupported() {
return false;
}
@Override
public ClientStatistics getStatistics() {
return clientStatistics;
}
@Override
public boolean isUpgradeSupported() {
return false;
}
@Override
public void addCloseListener(ChannelListener<ClientConnection> listener) {
closeListeners.add(listener);
}
@Override
public void sendRequest(final ClientRequest request, final ClientCallback<ClientExchange> clientCallback) {
if (anyAreSet(state, UPGRADE_REQUESTED | UPGRADED | CLOSE_REQ | CLOSED)) {
clientCallback.failed(UndertowClientMessages.MESSAGES.invalidConnectionState());
return;
}
final AjpClientExchange AjpClientExchange = new AjpClientExchange(clientCallback, request, this);
if (currentRequest == null) {
initiateRequest(AjpClientExchange);
} else {
pendingQueue.add(AjpClientExchange);
}
}
@Override
public boolean isPingSupported() {
return true;
}
@Override
public void sendPing(PingListener listener, long timeout, TimeUnit timeUnit) {
connection.sendPing(listener, timeout, timeUnit);
}
private void initiateRequest(AjpClientExchange AjpClientExchange) {
currentRequest = AjpClientExchange;
ClientRequest request = AjpClientExchange.getRequest();
String connectionString = request.getRequestHeaders().getFirst(CONNECTION);
if (connectionString != null) {
if (CLOSE.equalToString(connectionString)) {
state |= CLOSE_REQ;
}
} else if (request.getProtocol() != Protocols.HTTP_1_1) {
state |= CLOSE_REQ;
}
if (request.getRequestHeaders().contains(UPGRADE)) {
state |= UPGRADE_REQUESTED;
}
long length = 0;
String fixedLengthString = request.getRequestHeaders().getFirst(CONTENT_LENGTH);
String transferEncodingString = request.getRequestHeaders().getLast(TRANSFER_ENCODING);
if (fixedLengthString != null) {
length = Long.parseLong(fixedLengthString);
} else if (transferEncodingString != null) {
length = -1;
}
AjpClientRequestClientStreamSinkChannel sinkChannel = connection.sendRequest(request.getMethod(), request.getPath(), request.getProtocol(), request.getRequestHeaders(), request, requestFinishListener);
currentRequest.setRequestChannel(sinkChannel);
AjpClientExchange.invokeReadReadyCallback(AjpClientExchange);
if (length == 0) {
try {
sinkChannel.shutdownWrites();
if (!sinkChannel.flush()) {
handleFailedFlush(sinkChannel);
}
} catch (Throwable t) {
handleError((t instanceof IOException) ? (IOException) t : new IOException(t));
}
}
}
private void handleFailedFlush(AjpClientRequestClientStreamSinkChannel sinkChannel) {
sinkChannel.getWriteSetter().set(ChannelListeners.flushingChannelListener(null, new ChannelExceptionHandler<StreamSinkChannel>() {
@Override
public void handleException(StreamSinkChannel channel, IOException exception) {
handleError(exception);
}
}));
sinkChannel.resumeWrites();
}
private void handleError(IOException exception) {
currentRequest.setFailed(exception);
safeClose(connection);
}
public StreamConnection performUpgrade() throws IOException {
throw UndertowMessages.MESSAGES.upgradeNotSupported();
}
public void close() throws IOException {
log.debugf("close called on connection to %s", getPeerAddress());
if (anyAreSet(state, CLOSED)) {
return;
}
state |= CLOSED | CLOSE_REQ;
connection.close();
}
public void requestDone() {
currentRequest = null;
if (anyAreSet(state, CLOSE_REQ)) {
safeClose(connection);
} else if (anyAreSet(state, UPGRADE_REQUESTED)) {
safeClose(connection);
return;
}
AjpClientExchange next = pendingQueue.poll();
if (next != null) {
initiateRequest(next);
}
}
public void requestClose() {
state |= CLOSE_REQ;
}
class ClientReceiveListener implements ChannelListener<AjpClientChannel> {
public void handleEvent(AjpClientChannel channel) {
try {
AbstractAjpClientStreamSourceChannel result = channel.receive();
if(result == null) {
if(!channel.isOpen()) {
getIoThread().execute(new Runnable() {
@Override
public void run() {
if(currentRequest != null) {
currentRequest.setFailed(new ClosedChannelException());
}
}
});
}
return;
}
if(result instanceof AjpClientResponseStreamSourceChannel) {
AjpClientResponseStreamSourceChannel response = (AjpClientResponseStreamSourceChannel) result;
response.setFinishListener(responseFinishedListener);
ClientResponse cr = new ClientResponse(response.getStatusCode(), response.getReasonPhrase(), currentRequest.getRequest().getProtocol(), response.getHeaders());
if (response.getStatusCode() == 100) {
currentRequest.setContinueResponse(cr);
} else {
currentRequest.setResponseChannel(response);
currentRequest.setResponse(cr);
}
} else {
Channels.drain(result, Long.MAX_VALUE);
}
} catch (Throwable e) {
UndertowLogger.CLIENT_LOGGER.exceptionProcessingRequest(e);
safeClose(connection);
if(currentRequest != null) {
currentRequest.setFailed(e instanceof IOException ? (IOException) e : new IOException(e));
}
}
}
}
}