/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.net.impl.clientconnection;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.ContextInternal;
import java.util.Set;
An endpoint, i.e a set of connection to the same address.
Author: Julien Viet
/**
* An endpoint, i.e a set of connection to the same address.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public abstract class Endpoint<C> {
private final Set<C> connectionMap = new ConcurrentHashSet<>();
private final Runnable dispose;
private boolean closed;
private boolean disposed;
private long pendingRequestCount;
private long openConnectionCount;
public Endpoint(Runnable dispose) {
this.dispose = dispose;
}
public boolean getConnection(ContextInternal ctx, Handler<AsyncResult<C>> handler) {
synchronized (this) {
if (disposed) {
return false;
}
pendingRequestCount++;
}
requestConnection(ctx, ar -> {
boolean dispose;
synchronized (Endpoint.this) {
pendingRequestCount--;
dispose = checkDispose();
}
// Dispose before callback otherwise we can have the callback handler retrying the same
// endpoint and never get the callback it expects to creating an infinite loop
if (dispose) {
disposeInternal();
}
handler.handle(ar);
});
return true;
}
public abstract void requestConnection(ContextInternal ctx, Handler<AsyncResult<C>> handler);
protected void connectionAdded(C conn) {
synchronized (this) {
if (connectionMap.add(conn)) {
openConnectionCount++;
} else {
System.out.println("BUG!!!");
}
if (!closed) {
return;
}
}
close(conn);
}
protected void connectionRemoved(C conn) {
// CHECK SHOULD CLOSE
synchronized (this) {
if (connectionMap.remove(conn)) {
openConnectionCount--;
} else {
System.out.println("BUG!!!!");
}
if (!checkDispose()) {
return;
}
}
disposeInternal();
}
private void disposeInternal() {
dispose.run();
dispose();
}
private boolean checkDispose() {
if (!disposed && openConnectionCount == 0 && pendingRequestCount == 0) {
disposed = true;
return true;
}
return false;
}
Hook to cleanup when all metrics have been processed, e.g unregistering metrics, this method is called when
the endpoint will not accept anymore requests.
/**
* Hook to cleanup when all metrics have been processed, e.g unregistering metrics, this method is called when
* the endpoint will not accept anymore requests.
*/
protected void dispose() {
}
Close the connection
. /**
* Close the {@code connection}.
*/
protected void close(C connection) {
}
Close the endpoint, this will close all connections, this method is called by the ConnectionManager
when it is closed. /**
* Close the endpoint, this will close all connections, this method is called by the {@link ConnectionManager} when
* it is closed.
*/
protected void close() {
synchronized (this) {
if (closed) {
throw new IllegalStateException();
}
closed = true;
}
for (C conn : connectionMap) {
close(conn);
}
}
}