package io.undertow.server.handlers.proxy.mod_cluster;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import io.undertow.UndertowLogger;
import io.undertow.client.ClientCallback;
import io.undertow.client.ClientConnection;
import io.undertow.client.ClientExchange;
import io.undertow.client.ClientRequest;
import io.undertow.client.UndertowClient;
import io.undertow.server.HttpServerExchange;
import io.undertow.server.handlers.proxy.ProxyCallback;
import io.undertow.server.handlers.proxy.ProxyConnection;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import io.undertow.util.SameThreadExecutor;
import org.xnio.ChannelExceptionHandler;
import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
import org.xnio.IoFuture;
import org.xnio.IoUtils;
import org.xnio.OptionMap;
import io.undertow.connector.ByteBufferPool;
import io.undertow.util.WorkerUtils;
import org.xnio.StreamConnection;
import org.xnio.XnioExecutor;
import org.xnio.XnioIoThread;
import org.xnio.XnioWorker;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.StreamSourceChannel;
import org.xnio.ssl.XnioSsl;
class NodePingUtil {
/**
 * Receives the outcome of a ping attempt started by one of the {@code ping*} methods of this class.
 */
interface PingCallback {
/**
 * Invoked when the ping finished successfully.
 */
void completed();
/**
 * Invoked when the ping did not succeed. Within this file that covers I/O errors,
 * a negative health check result and the timeout-driven cancellation.
 */
void failed();
}
/**
 * Checks whether a socket connection to the given address can be opened.
 * The attempt is cancelled (and reported as failed) after 5 seconds.
 *
 * @param address  the address to connect to
 * @param exchange the exchange whose I/O thread and worker are used for the attempt
 * @param callback notified with the outcome
 * @param options  the options passed when opening the connection
 */
static void pingHost(InetSocketAddress address, HttpServerExchange exchange, PingCallback callback, OptionMap options) {
    final XnioIoThread ioThread = exchange.getIoThread();
    final XnioWorker ioWorker = ioThread.getWorker();
    final HostPingTask pingTask = new HostPingTask(address, ioWorker, callback, options);

    // Arm the timeout first, then hand the task over for execution.
    scheduleCancelTask(exchange.getIoThread(), pingTask, 5, TimeUnit.SECONDS);

    if (exchange.isInIoThread()) {
        exchange.dispatch(SameThreadExecutor.INSTANCE, pingTask);
    } else {
        exchange.dispatch(ioThread, pingTask);
    }
}
/**
 * Pings a backend by opening a new client connection to the given URI and sending an
 * {@code OPTIONS *} request over it. No health check is applied to the response and the
 * connection is closed afterwards. The attempt is cancelled after 5 seconds.
 *
 * @param connection the URI to connect to
 * @param callback   notified with the outcome
 * @param exchange   the exchange providing the I/O thread and the buffer pool
 * @param client     the client used to open the connection
 * @param xnioSsl    the SSL context handed to the client
 * @param options    the options handed to the client
 */
static void pingHttpClient(URI connection, PingCallback callback, HttpServerExchange exchange, UndertowClient client, XnioSsl xnioSsl, OptionMap options) {
    final XnioIoThread ioThread = exchange.getIoThread();
    final RequestExchangeListener listener = new RequestExchangeListener(callback, NodeHealthChecker.NO_CHECK, true);
    final Runnable pingTask = new HttpClientPingTask(
            connection,
            listener,
            ioThread,
            client,
            xnioSsl,
            exchange.getConnection().getByteBufferPool(),
            options);

    if (exchange.isInIoThread()) {
        exchange.dispatch(SameThreadExecutor.INSTANCE, pingTask);
    } else {
        exchange.dispatch(ioThread, pingTask);
    }

    // Arm the timeout for the listener that will see the response.
    scheduleCancelTask(exchange.getIoThread(), listener, 5, TimeUnit.SECONDS);
}
/**
 * Pings a node using a connection taken from the node's connection pool.
 * A {@code null} node is reported as a failure right away. The node's configured ping
 * value is used, in seconds, both for obtaining the connection and for the ping itself.
 *
 * @param node     the node to ping, may be {@code null}
 * @param exchange the exchange used for dispatching and for obtaining the connection
 * @param callback notified with the outcome
 */
static void pingNode(final Node node, final HttpServerExchange exchange, final PingCallback callback) {
    if (node == null) {
        callback.failed();
        return;
    }

    final int pingTimeout = node.getNodeConfig().getPing();

    // Reacts to the outcome of asking the pool for a connection.
    final ProxyCallback<ProxyConnection> poolCallback = new ProxyCallback<ProxyConnection>() {
        @Override
        public void completed(final HttpServerExchange exchange, ProxyConnection result) {
            final RequestExchangeListener listener = new RequestExchangeListener(callback, NodeHealthChecker.NO_CHECK, false);
            final ConnectionPoolPingTask pingTask = new ConnectionPoolPingTask(result, listener, node.getNodeConfig().getConnectionURI());
            exchange.dispatch(SameThreadExecutor.INSTANCE, pingTask);
            // Arm the timeout for the request that was just dispatched.
            scheduleCancelTask(exchange.getIoThread(), listener, pingTimeout, TimeUnit.SECONDS);
        }

        @Override
        public void failed(HttpServerExchange exchange) {
            callback.failed();
        }

        @Override
        public void queuedRequestFailed(HttpServerExchange exchange) {
            callback.failed();
        }

        @Override
        public void couldNotResolveBackend(HttpServerExchange exchange) {
            callback.failed();
        }
    };

    final Runnable connectTask = new Runnable() {
        @Override
        public void run() {
            node.getConnectionPool().connect(null, exchange, poolCallback, pingTimeout, TimeUnit.SECONDS, false);
        }
    };

    if (exchange.isInIoThread()) {
        exchange.dispatch(SameThreadExecutor.INSTANCE, connectTask);
    } else {
        exchange.dispatch(exchange.getIoThread(), connectTask);
    }
}
/**
 * Pings a node without an active exchange: a new client connection to the node's connection URI
 * is opened on the given I/O thread, an {@code OPTIONS *} request is sent and the response is
 * passed to the health checker. The node's configured ping value is used as the timeout in seconds.
 *
 * @param node          the node to ping
 * @param callback      notified with the outcome
 * @param healthChecker decides whether the response counts as healthy
 * @param ioThread      the thread that runs the ping and its timeout
 * @param bufferPool    the buffer pool handed to the client
 * @param client        the client used to open the connection
 * @param xnioSsl       the SSL context handed to the client
 * @param options       the options handed to the client
 */
static void internalPingNode(Node node, PingCallback callback, NodeHealthChecker healthChecker, XnioIoThread ioThread, ByteBufferPool bufferPool, UndertowClient client, XnioSsl xnioSsl, OptionMap options) {
    final URI target = node.getNodeConfig().getConnectionURI();
    final long pingTimeout = node.getNodeConfig().getPing();

    final RequestExchangeListener listener = new RequestExchangeListener(callback, healthChecker, true);
    final HttpClientPingTask pingTask =
            new HttpClientPingTask(target, listener, ioThread, client, xnioSsl, bufferPool, options);

    // Timeout first, then queue the actual ping on the I/O thread.
    scheduleCancelTask(ioThread, listener, pingTimeout, TimeUnit.SECONDS);
    ioThread.execute(pingTask);
}
/**
 * Ping task that sends an {@code OPTIONS *} request over an already established
 * {@link ProxyConnection} and lets the given listener evaluate the response.
 */
static class ConnectionPoolPingTask implements Runnable {

    private final RequestExchangeListener exchangeListener;
    private final ProxyConnection proxyConnection;
    private final URI uri;

    /**
     * @param proxyConnection  the connection to send the ping over
     * @param exchangeListener receives the response and reports the outcome
     * @param uri              the URI whose host is sent as the {@code Host} header
     */
    ConnectionPoolPingTask(ProxyConnection proxyConnection, RequestExchangeListener exchangeListener, URI uri) {
        this.proxyConnection = proxyConnection;
        this.exchangeListener = exchangeListener;
        this.uri = uri;
    }

    @Override
    public void run() {
        final ClientRequest request = new ClientRequest();
        request.setMethod(Methods.OPTIONS);
        request.setPath("*");
        request.getRequestHeaders()
                .add(Headers.USER_AGENT, "mod_cluster ping")
                .add(Headers.HOST, uri.getHost());
        proxyConnection.getConnection().sendRequest(request, new ClientCallback<ClientExchange>() {
            @Override
            public void completed(final ClientExchange result) {
                if (exchangeListener.isDone()) {
                    // The ping was already finished (e.g. cancelled by the timeout); nobody will read the response.
                    IoUtils.safeClose(proxyConnection.getConnection());
                    return;
                }
                exchangeListener.exchange = result;
                result.setResponseListener(exchangeListener);
                try {
                    // The request has no body: terminate it and make sure it is flushed out.
                    result.getRequestChannel().shutdownWrites();
                    if (!result.getRequestChannel().flush()) {
                        result.getRequestChannel().getWriteSetter().set(ChannelListeners.flushingChannelListener(null, new ChannelExceptionHandler<StreamSinkChannel>() {
                            @Override
                            public void handleException(StreamSinkChannel channel, IOException exception) {
                                IoUtils.safeClose(proxyConnection.getConnection());
                                exchangeListener.taskFailed();
                            }
                        }));
                        result.getRequestChannel().resumeWrites();
                    }
                } catch (IOException e) {
                    IoUtils.safeClose(proxyConnection.getConnection());
                    exchangeListener.taskFailed();
                }
            }

            @Override
            public void failed(IOException e) {
                // Close the connection like every other failure path of this task does,
                // so a connection whose request could not be sent is not left open.
                IoUtils.safeClose(proxyConnection.getConnection());
                exchangeListener.taskFailed();
            }
        });
    }
}
/**
 * Ping task that merely tries to open a stream connection to an address.
 * A connection that opens is closed again immediately; the outcome of the connect
 * attempt is reported through the inherited {@link CancellableTask} methods.
 */
static class HostPingTask extends CancellableTask implements Runnable {

    private final InetSocketAddress address;
    private final XnioWorker worker;
    private final OptionMap options;

    /**
     * @param address  the address to connect to
     * @param worker   the worker used to open the connection
     * @param callback notified with the outcome
     * @param options  the options used when opening the connection
     */
    HostPingTask(InetSocketAddress address, XnioWorker worker, PingCallback callback, OptionMap options) {
        super(callback);
        this.address = address;
        this.worker = worker;
        this.options = options;
    }

    @Override
    public void run() {
        try {
            // The connection itself is of no interest, only whether it can be opened.
            final ChannelListener<StreamConnection> closeOnOpen = new ChannelListener<StreamConnection>() {
                @Override
                public void handleEvent(StreamConnection channel) {
                    IoUtils.safeClose(channel);
                }
            };

            // Maps the three possible results of the connect attempt onto this task.
            final IoFuture.HandlingNotifier<StreamConnection, Void> outcome = new IoFuture.HandlingNotifier<StreamConnection, Void>() {
                @Override
                public void handleCancelled(Void attachment) {
                    HostPingTask.this.cancel();
                }

                @Override
                public void handleFailed(IOException exception, Void attachment) {
                    HostPingTask.this.taskFailed();
                }

                @Override
                public void handleDone(StreamConnection data, Void attachment) {
                    HostPingTask.this.taskCompleted();
                }
            };

            worker.openStreamConnection(address, closeOnOpen, options).addNotifier(outcome, null);
        } catch (Exception e) {
            taskFailed();
        }
    }
}
/**
 * Ping task that opens a new client connection to a URI and sends an {@code OPTIONS *}
 * request over it. The response is handled by the supplied {@link RequestExchangeListener}.
 */
static class HttpClientPingTask implements Runnable {

    private final URI connection;
    private final XnioIoThread thread;
    private final UndertowClient client;
    private final XnioSsl xnioSsl;
    private final ByteBufferPool bufferPool;
    private final OptionMap options;
    private final RequestExchangeListener exchangeListener;

    /**
     * @param connection       the URI to connect to; its host is also sent as the {@code Host} header
     * @param exchangeListener receives the response and reports the outcome
     * @param thread           the I/O thread handed to the client
     * @param client           the client used to open the connection
     * @param xnioSsl          the SSL context handed to the client
     * @param bufferPool       the buffer pool handed to the client
     * @param options          the options handed to the client
     */
    HttpClientPingTask(URI connection, RequestExchangeListener exchangeListener, XnioIoThread thread, UndertowClient client, XnioSsl xnioSsl, ByteBufferPool bufferPool, OptionMap options) {
        this.connection = connection;
        this.thread = thread;
        this.client = client;
        this.xnioSsl = xnioSsl;
        this.bufferPool = bufferPool;
        this.options = options;
        this.exchangeListener = exchangeListener;
    }

    @Override
    public void run() {
        UndertowLogger.ROOT_LOGGER.httpClientPingTask(connection);
        client.connect(new ClientCallback<ClientConnection>() {
            @Override
            public void completed(final ClientConnection clientConnection) {
                if (exchangeListener.isDone()) {
                    // The ping was already finished (e.g. cancelled by the timeout) while connecting.
                    IoUtils.safeClose(clientConnection);
                    return;
                }
                final ClientRequest request = new ClientRequest();
                request.setMethod(Methods.OPTIONS);
                request.setPath("*");
                request.getRequestHeaders()
                        .add(Headers.USER_AGENT, "mod_cluster ping")
                        .add(Headers.HOST, connection.getHost());
                clientConnection.sendRequest(request, new ClientCallback<ClientExchange>() {
                    @Override
                    public void completed(ClientExchange result) {
                        exchangeListener.exchange = result;
                        if (exchangeListener.isDone()) {
                            // The ping was finished while the request was being sent. No response listener
                            // gets registered, so the connection opened for this ping must be closed here.
                            IoUtils.safeClose(clientConnection);
                            return;
                        }
                        result.setResponseListener(exchangeListener);
                        try {
                            // The request has no body: terminate it and make sure it is flushed out.
                            result.getRequestChannel().shutdownWrites();
                            if (!result.getRequestChannel().flush()) {
                                result.getRequestChannel().getWriteSetter().set(ChannelListeners.flushingChannelListener(null, new ChannelExceptionHandler<StreamSinkChannel>() {
                                    @Override
                                    public void handleException(StreamSinkChannel channel, IOException exception) {
                                        IoUtils.safeClose(clientConnection);
                                        exchangeListener.taskFailed();
                                    }
                                }));
                                result.getRequestChannel().resumeWrites();
                            }
                        } catch (IOException e) {
                            IoUtils.safeClose(clientConnection);
                            exchangeListener.taskFailed();
                        }
                    }

                    @Override
                    public void failed(IOException e) {
                        exchangeListener.taskFailed();
                        IoUtils.safeClose(clientConnection);
                    }
                });
            }

            @Override
            public void failed(IOException e) {
                // No connection was established, so there is nothing to close.
                exchangeListener.taskFailed();
            }
        }, connection, thread, xnioSsl, bufferPool, options);
    }
}
/**
 * Response listener for a ping request. Once the response arrives its body is drained,
 * the response is handed to the {@link NodeHealthChecker} and the outcome is reported
 * through the inherited {@link CancellableTask} methods.
 */
static class RequestExchangeListener extends CancellableTask implements ClientCallback<ClientExchange> {

    /** The exchange of the ping request; assigned by the ping tasks once the request was sent. */
    private ClientExchange exchange;
    /** Whether the connection is closed after the response was evaluated. */
    private final boolean closeConnection;
    private final NodeHealthChecker healthChecker;

    /**
     * @param callback        notified with the outcome
     * @param healthChecker   decides whether the response counts as healthy, must not be {@code null}
     * @param closeConnection whether to close the connection after the response was evaluated
     */
    RequestExchangeListener(PingCallback callback, NodeHealthChecker healthChecker, boolean closeConnection) {
        super(callback);
        assert healthChecker != null;
        this.closeConnection = closeConnection;
        this.healthChecker = healthChecker;
    }

    @Override
    public void completed(final ClientExchange result) {
        if (isDone()) {
            // The ping was already finished (e.g. cancelled by the timeout); drop the connection.
            IoUtils.safeClose(result.getConnection());
            return;
        }
        final ChannelListener<StreamSourceChannel> listener = ChannelListeners.drainListener(Long.MAX_VALUE, new ChannelListener<StreamSourceChannel>() {
            @Override
            public void handleEvent(StreamSourceChannel channel) {
                try {
                    if (healthChecker.checkResponse(result.getResponse())) {
                        taskCompleted();
                    } else {
                        taskFailed();
                    }
                } finally {
                    if (closeConnection && exchange != null) {
                        IoUtils.safeClose(exchange.getConnection());
                    }
                }
            }
        }, new ChannelExceptionHandler<StreamSourceChannel>() {
            @Override
            public void handleException(StreamSourceChannel channel, IOException exception) {
                taskFailed();
                if (exception != null) {
                    // Close via the exchange this response belongs to instead of the field,
                    // which is not guaranteed to be set and is null-checked everywhere else.
                    IoUtils.safeClose(result.getConnection());
                }
            }
        });
        StreamSourceChannel responseChannel = result.getResponseChannel();
        responseChannel.getReadSetter().set(listener);
        responseChannel.resumeReads();
        // Kick off draining right away in case data is already available.
        listener.handleEvent(responseChannel);
    }

    @Override
    public void failed(IOException e) {
        taskFailed();
        if (exchange != null) {
            IoUtils.safeClose(exchange.getConnection());
        }
    }
}
/**
 * Life cycle of a {@link CancellableTask}: {@code WAITING} is the initial state,
 * {@code DONE} is set when the task completed or failed, and {@code CANCELLED}
 * is set when the task was cancelled.
 */
enum State {
WAITING, DONE, CANCELLED;
}
/**
 * Base class for ping tasks that report their outcome exactly once to a {@link PingCallback}
 * and that can be cancelled by a scheduled timeout.
 * <p>
 * The transition out of {@link State#WAITING} is atomic, so when completion and cancellation
 * happen concurrently only the first one wins and the callback is invoked a single time.
 */
static class CancellableTask {

    private final PingCallback delegate;
    private volatile State state = State.WAITING;
    private volatile XnioExecutor.Key cancelKey;

    /**
     * @param callback notified once with the outcome of the task
     */
    CancellableTask(PingCallback callback) {
        this.delegate = callback;
    }

    /**
     * @return {@code true} once the task completed, failed or was cancelled
     */
    boolean isDone() {
        return state != State.WAITING;
    }

    /**
     * Registers the key of the scheduled timeout. If the task is already finished the
     * timeout is of no use any more and is removed immediately.
     *
     * @param cancelKey the key of the scheduled cancel task
     */
    void setCancelKey(XnioExecutor.Key cancelKey) {
        final boolean stillWaiting;
        synchronized (this) {
            stillWaiting = state == State.WAITING;
            if (stillWaiting) {
                this.cancelKey = cancelKey;
            }
        }
        if (!stillWaiting) {
            cancelKey.remove();
        }
    }

    /** Reports success to the callback, unless the task was already finished. */
    void taskCompleted() {
        if (finish(State.DONE)) {
            delegate.completed();
        }
    }

    /** Reports failure to the callback, unless the task was already finished. */
    void taskFailed() {
        if (finish(State.DONE)) {
            delegate.failed();
        }
    }

    /** Cancels the task and reports failure to the callback, unless the task was already finished. */
    void cancel() {
        if (finish(State.CANCELLED)) {
            delegate.failed();
        }
    }

    /**
     * Atomically moves the task from {@link State#WAITING} to the given terminal state and
     * removes the scheduled timeout, if any.
     *
     * @param terminal the state to move to
     * @return {@code true} if this call performed the transition, {@code false} if the task was already finished
     */
    private boolean finish(State terminal) {
        final XnioExecutor.Key key;
        synchronized (this) {
            if (state != State.WAITING) {
                return false;
            }
            state = terminal;
            key = cancelKey;
        }
        // Foreign code is called outside of the lock.
        if (key != null) {
            key.remove();
        }
        return true;
    }
}
/**
 * Schedules the cancellation of a task after the given timeout and hands the
 * resulting key to the task.
 *
 * @param ioThread    the thread on which the cancellation is scheduled
 * @param cancellable the task to cancel when the timeout elapses
 * @param timeout     the timeout
 * @param timeUnit    the unit of {@code timeout}
 */
static void scheduleCancelTask(final XnioIoThread ioThread, final CancellableTask cancellable, final long timeout, final TimeUnit timeUnit ) {
    final Runnable onTimeout = new Runnable() {
        @Override
        public void run() {
            cancellable.cancel();
        }
    };
    cancellable.setCancelKey(WorkerUtils.executeAfter(ioThread, onTimeout, timeout, timeUnit));
}
}