package io.vertx.core.http.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.net.impl.clientconnection.ConnectResult;
import io.vertx.core.net.impl.clientconnection.ConnectionListener;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.HttpClientMetrics;
import java.util.ArrayDeque;
import java.util.Deque;
/**
 * Endpoint that obtains {@link HttpClientConnection}s from an {@link HttpChannelConnector}
 * while allowing at most {@code maxPoolSize} requests to hold a connection slot at once.
 *
 * <p>Requests arriving while every slot is taken are parked in a FIFO queue of
 * {@link Waiter}s and are served when a slot is released, either because a connection
 * was evicted or because a connect attempt failed.
 *
 * <p>{@code inflightConnections} and {@code waiters} are only read or written while
 * holding the monitor of this endpoint.
 */
class WebSocketEndpoint extends ClientHttpEndpointBase {

  /** A parked connection request: the handler to complete and the context it came with. */
  private static class Waiter {

    final Handler<AsyncResult<HttpClientConnection>> handler;
    final ContextInternal context;

    Waiter(Handler<AsyncResult<HttpClientConnection>> handler, ContextInternal context) {
      this.handler = handler;
      this.context = context;
    }
  }

  /** Upper bound on the number of slots that may be held at the same time. */
  private final int maxPoolSize;
  private final HttpChannelConnector connector;
  /** Requests waiting for a slot, oldest first. */
  private final Deque<Waiter> waiters;
  /** Number of slots currently held (connecting or connected). */
  private int inflightConnections;

  /**
   * @param metrics     forwarded to the super constructor
   * @param port        forwarded to the super constructor
   * @param host        forwarded to the super constructor
   * @param metric      forwarded to the super constructor
   * @param maxPoolSize maximum number of slots that may be held concurrently
   * @param connector   used to open the actual connections
   * @param dispose     forwarded to the super constructor
   */
  WebSocketEndpoint(ClientMetrics metrics, int port, String host, Object metric, int maxPoolSize, HttpChannelConnector connector, Runnable dispose) {
    super(metrics, port, host, metric, dispose);
    this.maxPoolSize = maxPoolSize;
    this.connector = connector;
    this.waiters = new ArrayDeque<>();
  }

  /**
   * Gives back the slot held by the caller.
   *
   * <p>When a request is waiting, the slot is handed over to it directly: the counter is
   * left untouched (one holder leaves, one enters) and a connect attempt is started for
   * the waiter. Otherwise the counter is decremented.
   */
  private void releaseSlot() {
    Waiter next;
    synchronized (this) {
      next = waiters.poll();
      if (next == null) {
        inflightConnections--;
        return;
      }
      // Slot ownership moves to `next`; inflightConnections intentionally stays as is.
    }
    // Connect outside the monitor so the connector is never invoked while holding the lock.
    tryConnect(next.context, next.handler);
  }

  /**
   * Starts a connect attempt for a request that already owns a slot and reports the
   * outcome to {@code handler}.
   *
   * @param ctx     context passed to the connector
   * @param handler completed with the connection, or with the connect failure
   */
  private void tryConnect(ContextInternal ctx, Handler<AsyncResult<HttpClientConnection>> handler) {

    class Listener implements ConnectionListener<HttpClientConnection>, Handler<AsyncResult<ConnectResult<HttpClientConnection>>> {

      /** The established connection; assigned on successful connect. */
      private HttpClientConnection conn;

      @Override
      public void onConcurrencyChange(long concurrency) {
        // Nothing to do.
      }

      @Override
      public void onRecycle() {
        // Nothing to do.
      }

      @Override
      public void onEvict() {
        connectionRemoved(conn);
        releaseSlot();
      }

      @Override
      public void handle(AsyncResult<ConnectResult<HttpClientConnection>> ar) {
        if (ar.succeeded()) {
          HttpClientConnection c = ar.result().connection();
          conn = c;
          connectionAdded(c);
          handler.handle(Future.succeededFuture(c));
        } else {
          // The attempt never produced a connection, so onEvict() will not run for it:
          // free the slot here, otherwise it would stay reserved forever.
          releaseSlot();
          handler.handle(Future.failedFuture(ar.cause()));
        }
      }
    }

    Listener listener = new Listener();
    connector.connect(listener, ctx, listener);
  }

  /**
   * Requests a connection. When a slot is free it is reserved and a connect attempt
   * starts immediately; otherwise the request is queued until a slot is released.
   *
   * @param ctx     context associated with the request
   * @param handler completed with the connection or with a failure
   */
  @Override
  public void requestConnection2(ContextInternal ctx, Handler<AsyncResult<HttpClientConnection>> handler) {
    synchronized (this) {
      if (inflightConnections >= maxPoolSize) {
        waiters.add(new Waiter(handler, ctx));
        return;
      }
      inflightConnections++;
    }
    tryConnect(ctx, handler);
  }

  /**
   * Closes the endpoint through {@code super.close()} and then fails every queued
   * request with {@code "Closed"}, each one dispatched on the context it was queued with.
   */
  @Override
  public void close() {
    super.close();
    Deque<Waiter> pending;
    synchronized (this) {
      // Snapshot and empty the queue under the lock; notify outside of it.
      pending = new ArrayDeque<>(waiters);
      waiters.clear();
    }
    for (Waiter waiter : pending) {
      waiter.context.runOnContext(v -> waiter.handler.handle(Future.failedFuture("Closed")));
    }
  }
}