/*
* JBoss, Home of Professional Open Source.
* Copyright 2014 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.undertow.server.handlers.proxy;
import io.undertow.UndertowLogger;
import io.undertow.client.ClientConnection;
import io.undertow.client.UndertowClient;
import io.undertow.server.HttpServerExchange;
import io.undertow.server.ServerConnection;
import io.undertow.server.handlers.Cookie;
import io.undertow.util.AttachmentKey;
import io.undertow.util.AttachmentList;
import io.undertow.util.CopyOnWriteMap;
import org.xnio.OptionMap;
import org.xnio.ssl.XnioSsl;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static io.undertow.server.handlers.proxy.ProxyConnectionPool.AvailabilityType.*;
import static io.undertow.server.handlers.proxy.RouteIteratorFactory.*;
import java.util.ArrayList;
import java.util.List;
import static org.xnio.IoUtils.safeClose;
/**
 * Initial implementation of a load balancing proxy client. This initial implementation is rather simplistic, and
 * will likely change.
 *
 * @author Stuart Douglas
 * @author <a href="mailto:ropalka@redhat.com">Richard Opalka</a>
 */
public class LoadBalancingProxyClient implements ProxyClient {
/**
 * The attachment key that is used to attach the proxy connection to the exchange.
 * <p>
 * This cannot be static, as otherwise a connection from a different client could be re-used.
 */
private final AttachmentKey<ExclusiveConnectionHolder> exclusiveConnectionKey = AttachmentKey.create(ExclusiveConnectionHolder.class);
/**
 * Key of the per-exchange list of hosts that {@code getConnection} has already handed out for that exchange.
 * {@code selectHost} consults this list so that a host is not picked twice for the same exchange.
 */
private static final AttachmentKey<AttachmentList<Host>> ATTEMPTED_HOSTS = AttachmentKey.createList(Host.class);
/**
 * Time in seconds between retries for problem servers.
 */
private volatile int problemServerRetry = 10;
/**
 * Names of the request cookies that {@code parseRoutes} inspects for routing information.
 * The constructor registers {@code JSESSIONID}; further names are added or removed through
 * {@code addSessionCookieName} / {@code removeSessionCookieName}.
 */
private final Set<String> sessionCookieNames = new CopyOnWriteArraySet<>();
/**
 * The number of connections to create per thread.
 */
private volatile int connectionsPerThread = 10;
/**
 * Value returned by {@code Host#getMaxQueueSize()} for every host of this client.
 */
private volatile int maxQueueSize = 0;
/**
 * Value returned by {@code Host#getSMaxConnections()} for every host of this client.
 */
private volatile int softMaxConnectionsPerThread = 5;
/**
 * Value returned by {@code Host#getTtl()} for every host of this client.
 * NOTE(review): the default of -1 presumably means "no time-to-live"; confirm against the connection pool.
 */
private volatile int ttl = -1;
/**
 * The hosts list.
 */
private volatile Host[] hosts = {};
/**
 * Strategy that picks the index of the first host to examine in {@code selectHost}.
 * Never {@code null}: the constructor falls back to a {@code RoundRobinHostSelector}.
 */
private final HostSelector hostSelector;
/**
 * Client instance passed to the connection pool of every host created by this proxy client.
 */
private final UndertowClient client;
/**
 * Hosts keyed by their jvmRoute. Only hosts that were added with a non-null route appear here.
 */
private final Map<String, Host> routes = new CopyOnWriteMap<>();
/**
 * Factory used by {@code parseRoutes} to turn a session cookie value into candidate routes.
 * <p>
 * Declared {@code volatile} like the other mutable settings of this class: it is replaced by
 * {@code setRouteParsingStrategy} / {@code setRankedRoutingDelimiter} and read without any lock
 * while requests are being routed, so a plain field would not guarantee that the new factory
 * becomes visible to the reading threads.
 */
private volatile RouteIteratorFactory routeIteratorFactory = new RouteIteratorFactory(RouteParsingStrategy.SINGLE, ParsingCompatibility.MOD_JK);
/**
 * Optional check deciding whether an exchange needs an exclusive backend connection; may be {@code null},
 * in which case {@code getConnection} never asks it.
 */
private final ExclusivityChecker exclusivityChecker;
/**
 * The single target instance returned by {@code findTarget} for every exchange.
 */
private static final ProxyTarget PROXY_TARGET = new ProxyTarget() {
};
/**
 * Builds one {@link HostProxyTarget} for every host currently configured on this client.
 * <p>
 * The string form of each returned target is the URI of the host it wraps.
 *
 * @return a new list with one target per host, in host order; empty when no host is configured
 */
@Override
public List<ProxyTarget> getAllTargets() {
    // Read the volatile field once so the size hint and the iteration see the same array.
    final Host[] snapshot = this.hosts;
    final List<ProxyTarget> targets = new ArrayList<>(snapshot.length);
    for (final Host current : snapshot) {
        final HostProxyTarget proxyTarget = new HostProxyTarget() {
            private Host host;

            @Override
            public void setHost(Host host) {
                this.host = host;
            }

            @Override
            public String toString() {
                return host.getUri().toString();
            }
        };
        proxyTarget.setHost(current);
        targets.add(proxyTarget);
    }
    return targets;
}
/**
 * Creates a client backed by {@link UndertowClient#getInstance()}, with no exclusivity checker and the
 * default host selector.
 */
public LoadBalancingProxyClient() {
    this(UndertowClient.getInstance());
}

/**
 * Creates a client with no exclusivity checker and the default host selector.
 *
 * @param client the Undertow client used for backend connections
 */
public LoadBalancingProxyClient(UndertowClient client) {
    this(client, null, null);
}

/**
 * Creates a client backed by {@link UndertowClient#getInstance()} with the default host selector.
 *
 * @param client the exclusivity checker (the parameter name is historical)
 */
public LoadBalancingProxyClient(ExclusivityChecker client) {
    this(UndertowClient.getInstance(), client, null);
}

/**
 * Creates a client with the default host selector.
 *
 * @param client             the Undertow client used for backend connections
 * @param exclusivityChecker decides whether an exchange needs an exclusive connection; may be {@code null}
 */
public LoadBalancingProxyClient(UndertowClient client, ExclusivityChecker exclusivityChecker) {
    this(client, exclusivityChecker, null);
}

/**
 * Creates a fully specified client. {@code JSESSIONID} is registered as a session cookie name.
 *
 * @param client             the Undertow client used for backend connections
 * @param exclusivityChecker decides whether an exchange needs an exclusive connection; may be {@code null}
 * @param hostSelector       picks the starting host; when {@code null} a round-robin selector is used
 */
public LoadBalancingProxyClient(UndertowClient client, ExclusivityChecker exclusivityChecker, HostSelector hostSelector) {
    this.client = client;
    this.exclusivityChecker = exclusivityChecker;
    sessionCookieNames.add("JSESSIONID");
    this.hostSelector = (hostSelector != null) ? hostSelector : new RoundRobinHostSelector();
}
/**
 * Registers an additional cookie name to be inspected when routes are parsed from a request.
 *
 * @param sessionCookieName the cookie name to add
 * @return this client, for call chaining
 */
public LoadBalancingProxyClient addSessionCookieName(final String sessionCookieName) {
    this.sessionCookieNames.add(sessionCookieName);
    return this;
}
/**
 * Stops inspecting the given cookie name when routes are parsed from a request.
 *
 * @param sessionCookieName the cookie name to remove
 * @return this client, for call chaining
 */
public LoadBalancingProxyClient removeSessionCookieName(final String sessionCookieName) {
    this.sessionCookieNames.remove(sessionCookieName);
    return this;
}
/**
 * Sets the time in seconds between retries for problem servers.
 *
 * @param problemServerRetry the retry interval in seconds
 * @return this client, for call chaining
 */
public LoadBalancingProxyClient setProblemServerRetry(int problemServerRetry) {
    final int retrySeconds = problemServerRetry;
    this.problemServerRetry = retrySeconds;
    return this;
}
/**
 * @return the time in seconds between retries for problem servers
 */
public int getProblemServerRetry() {
    return this.problemServerRetry;
}
/**
 * @return the configured number of connections per thread
 */
public int getConnectionsPerThread() {
    return this.connectionsPerThread;
}
/**
 * Sets the number of connections per thread. Every host reports this value as both its maximum
 * number of connections and its maximum number of cached connections.
 *
 * @param connectionsPerThread the new per-thread connection count
 * @return this client, for call chaining
 */
public LoadBalancingProxyClient setConnectionsPerThread(int connectionsPerThread) {
    final int perThread = connectionsPerThread;
    this.connectionsPerThread = perThread;
    return this;
}
/**
 * @return the configured maximum queue size
 */
public int getMaxQueueSize() {
    return this.maxQueueSize;
}
/**
 * Sets the maximum queue size that every host of this client reports.
 *
 * @param maxQueueSize the new maximum queue size
 * @return this client, for call chaining
 */
public LoadBalancingProxyClient setMaxQueueSize(int maxQueueSize) {
    final int queueLimit = maxQueueSize;
    this.maxQueueSize = queueLimit;
    return this;
}
/**
 * Sets the time-to-live value that every host of this client reports.
 *
 * @param ttl the new time-to-live value
 * @return this client, for call chaining
 */
public LoadBalancingProxyClient setTtl(int ttl) {
    final int timeToLive = ttl;
    this.ttl = timeToLive;
    return this;
}
/**
 * Sets the soft maximum number of connections per thread that every host of this client reports.
 *
 * @param softMaxConnectionsPerThread the new soft maximum
 * @return this client, for call chaining
 */
public LoadBalancingProxyClient setSoftMaxConnectionsPerThread(int softMaxConnectionsPerThread) {
    final int softLimit = softMaxConnectionsPerThread;
    this.softMaxConnectionsPerThread = softLimit;
    return this;
}
/**
 * Replaces the route iterator factory with one that uses the given parsing strategy, mod_jk parsing
 * compatibility and no ranked-route delimiter.
 *
 * @param routeParsingStrategy the strategy used to parse routes from the session cookie value
 * @return this client, for call chaining
 */
public LoadBalancingProxyClient setRouteParsingStrategy(RouteParsingStrategy routeParsingStrategy) {
    final RouteIteratorFactory replacement =
            new RouteIteratorFactory(routeParsingStrategy, ParsingCompatibility.MOD_JK, null);
    this.routeIteratorFactory = replacement;
    return this;
}
Configures ranked route delimiter, enabling ranked routing parsing strategy.
/**
* Configures ranked route delimiter, enabling ranked routing parsing strategy.
*/
public LoadBalancingProxyClient setRankedRoutingDelimiter(String rankedRoutingDelimiter) {
this.routeIteratorFactory = new RouteIteratorFactory(RouteParsingStrategy.RANKED, ParsingCompatibility.MOD_JK, rankedRoutingDelimiter);
return this;
}
/**
 * Adds a backend host with no jvmRoute and no SSL provider.
 *
 * @param host the URI of the backend
 * @return this client, for call chaining
 */
public synchronized LoadBalancingProxyClient addHost(final URI host) {
    return addHost(host, null, null);
}

/**
 * Adds a backend host with no jvmRoute.
 *
 * @param host the URI of the backend
 * @param ssl  the SSL provider passed to the host's connection pool; may be {@code null}
 * @return this client, for call chaining
 */
public synchronized LoadBalancingProxyClient addHost(final URI host, XnioSsl ssl) {
    return addHost(host, null, ssl);
}

/**
 * Adds a backend host with no SSL provider.
 *
 * @param host     the URI of the backend
 * @param jvmRoute the route under which the host is registered; may be {@code null}
 * @return this client, for call chaining
 */
public synchronized LoadBalancingProxyClient addHost(final URI host, String jvmRoute) {
    return addHost(host, jvmRoute, null);
}

/**
 * Adds a backend host with no bind address and {@link OptionMap#EMPTY} options.
 *
 * @param host     the URI of the backend
 * @param jvmRoute the route under which the host is registered; may be {@code null}
 * @param ssl      the SSL provider passed to the host's connection pool; may be {@code null}
 * @return this client, for call chaining
 */
public synchronized LoadBalancingProxyClient addHost(final URI host, String jvmRoute, XnioSsl ssl) {
    return registerHost(new Host(jvmRoute, null, host, ssl, OptionMap.EMPTY));
}

/**
 * Adds a backend host with no bind address.
 *
 * @param host     the URI of the backend
 * @param jvmRoute the route under which the host is registered; may be {@code null}
 * @param ssl      the SSL provider passed to the host's connection pool; may be {@code null}
 * @param options  the options passed to the host's connection pool
 * @return this client, for call chaining
 */
public synchronized LoadBalancingProxyClient addHost(final URI host, String jvmRoute, XnioSsl ssl, OptionMap options) {
    return addHost(null, host, jvmRoute, ssl, options);
}

/**
 * Adds a backend host.
 *
 * @param bindAddress the bind address passed to the host's connection pool; may be {@code null}
 * @param host        the URI of the backend
 * @param jvmRoute    the route under which the host is registered; may be {@code null}
 * @param ssl         the SSL provider passed to the host's connection pool; may be {@code null}
 * @param options     the options passed to the host's connection pool
 * @return this client, for call chaining
 */
public synchronized LoadBalancingProxyClient addHost(final InetSocketAddress bindAddress, final URI host, String jvmRoute, XnioSsl ssl, OptionMap options) {
    return registerHost(new Host(jvmRoute, bindAddress, host, ssl, options));
}

/**
 * Appends {@code added} to the host array and, when it has a jvmRoute, registers it in the route map.
 * <p>
 * Shared by the {@code addHost} overloads, which previously each carried their own copy of this logic.
 * Callers must hold this object's monitor (every {@code addHost} overload is {@code synchronized}).
 */
private LoadBalancingProxyClient registerHost(final Host added) {
    final Host[] existing = hosts;
    // Copy-on-write: readers keep iterating the old array while the new one is published below.
    final Host[] newHosts = new Host[existing.length + 1];
    System.arraycopy(existing, 0, newHosts, 0, existing.length);
    newHosts[existing.length] = added;
    this.hosts = newHosts;
    if (added.jvmRoute != null) {
        this.routes.put(added.jvmRoute, added);
    }
    return this;
}
/**
 * Removes the first host whose URI equals {@code uri}, closes its connection pool and, if the host
 * had a jvmRoute, drops that route from the route map. Does nothing when no host matches.
 *
 * @param uri the URI of the host to remove
 * @return this client, for call chaining
 */
public synchronized LoadBalancingProxyClient removeHost(final URI uri) {
    final Host[] current = hosts;

    // Locate the first matching host.
    int index = 0;
    while (index < current.length && !current[index].uri.equals(uri)) {
        index++;
    }
    if (index == current.length) {
        return this;
    }

    final Host victim = current[index];

    // Publish a new array that skips the removed slot.
    final Host[] remaining = new Host[current.length - 1];
    System.arraycopy(current, 0, remaining, 0, index);
    System.arraycopy(current, index + 1, remaining, index, remaining.length - index);
    this.hosts = remaining;

    victim.connectionPool.close();
    final String route = victim.jvmRoute;
    if (route != null) {
        routes.remove(route);
    }
    return this;
}
/**
 * Returns the same shared placeholder target for every exchange. The actual backend host is not
 * chosen here; {@code getConnection} picks it through {@code selectHost}.
 *
 * @param exchange the current exchange (not inspected)
 * @return the shared {@code PROXY_TARGET} instance
 */
@Override
public ProxyTarget findTarget(HttpServerExchange exchange) {
return PROXY_TARGET;
}
/**
 * Obtains a backend connection for the exchange and reports the outcome to {@code callback}.
 * <p>
 * When the server connection already carries an exclusive backend connection that is still open, that
 * connection is reused. Otherwise a host is chosen with {@link #selectHost(HttpServerExchange)}, recorded
 * on the exchange as attempted, and its connection pool is asked to connect. The request is exclusive when
 * an exclusive holder already exists or the exclusivity checker demands it.
 *
 * @param target   the proxy target, passed through to the connection pool
 * @param exchange the current exchange
 * @param callback receives the result of the connection attempt
 * @param timeout  the timeout, passed through to the connection pool
 * @param timeUnit the unit of {@code timeout}
 */
@Override
public void getConnection(ProxyTarget target, HttpServerExchange exchange, final ProxyCallback<ProxyConnection> callback, long timeout, TimeUnit timeUnit) {
    final ExclusiveConnectionHolder holder = exchange.getConnection().getAttachment(exclusiveConnectionKey);
    final boolean reusable = holder != null && holder.connection.getConnection().isOpen();
    if (reusable) {
        // An exclusive connection was allocated earlier and is still usable: keep using it.
        callback.completed(exchange, holder.connection);
        return;
    }

    final Host host = selectHost(exchange);
    if (host == null) {
        callback.couldNotResolveBackend(exchange);
        return;
    }
    exchange.addToAttachmentList(ATTEMPTED_HOSTS, host);

    // A holder means exclusivity was requested before; even if its connection has since closed,
    // the client may still assume it exists, so the replacement must be exclusive as well.
    final boolean exclusive = holder != null
            || (exclusivityChecker != null && exclusivityChecker.isExclusivityRequired(exchange));
    if (!exclusive) {
        host.connectionPool.connect(target, exchange, callback, timeout, timeUnit, false);
        return;
    }

    final ProxyCallback<ProxyConnection> exclusiveCallback = new ProxyCallback<ProxyConnection>() {

        @Override
        public void completed(HttpServerExchange exchange, ProxyConnection result) {
            if (holder == null) {
                // First exclusive connection for this server connection: attach a holder and make
                // sure the backend connection is closed together with the server connection.
                final ExclusiveConnectionHolder newHolder = new ExclusiveConnectionHolder();
                newHolder.connection = result;
                final ServerConnection serverConnection = exchange.getConnection();
                serverConnection.putAttachment(exclusiveConnectionKey, newHolder);
                serverConnection.addCloseListener(new ServerConnection.CloseListener() {
                    @Override
                    public void closed(ServerConnection connection) {
                        final ClientConnection backend = newHolder.connection.getConnection();
                        if (backend.isOpen()) {
                            safeClose(backend);
                        }
                    }
                });
            } else {
                holder.connection = result;
            }
            callback.completed(exchange, result);
        }

        @Override
        public void queuedRequestFailed(HttpServerExchange exchange) {
            callback.queuedRequestFailed(exchange);
        }

        @Override
        public void failed(HttpServerExchange exchange) {
            UndertowLogger.PROXY_REQUEST_LOGGER.proxyFailedToConnectToBackend(exchange.getRequestURI(), host.uri);
            callback.failed(exchange);
        }

        @Override
        public void couldNotResolveBackend(HttpServerExchange exchange) {
            callback.couldNotResolveBackend(exchange);
        }
    };
    host.connectionPool.connect(target, exchange, exclusiveCallback, timeout, timeUnit, true);
}
/**
 * Chooses the host that should serve the exchange.
 * <p>
 * Routes parsed from the request are tried first: the first route that maps to a known host which has not
 * yet been attempted for this exchange wins. Otherwise the hosts are scanned once, starting at the index
 * supplied by the host selector and wrapping around, skipping hosts already attempted. The first
 * {@code AVAILABLE} host is returned immediately; failing that, the first {@code FULL} host; failing that,
 * the first {@code PROBLEM} or {@code FULL_QUEUE} host.
 *
 * @param exchange the current exchange
 * @return the chosen host, or {@code null} when there are no hosts or none qualifies
 */
protected Host selectHost(HttpServerExchange exchange) {
    final AttachmentList<Host> tried = exchange.getAttachment(ATTEMPTED_HOSTS);
    final Host[] candidates = this.hosts;
    if (candidates.length == 0) {
        return null;
    }

    // Prefer the first routed host that exists and has not been attempted yet.
    for (Iterator<CharSequence> routeIt = parseRoutes(exchange); routeIt.hasNext(); ) {
        final Host routed = this.routes.get(routeIt.next().toString());
        if (routed != null && (tried == null || !tried.contains(routed))) {
            return routed;
        }
    }

    // If every host has problems the scan ends when it comes back to this index.
    final int first = hostSelector.selectHost(candidates);
    Host firstFull = null;
    Host firstProblem = null;
    int index = first;
    do {
        final Host candidate = candidates[index];
        final boolean alreadyTried = tried != null && tried.contains(candidate);
        if (!alreadyTried) {
            final ProxyConnectionPool.AvailabilityType state = candidate.connectionPool.available();
            if (state == AVAILABLE) {
                return candidate;
            }
            if (state == FULL) {
                if (firstFull == null) {
                    firstFull = candidate;
                }
            } else if (state == PROBLEM || state == FULL_QUEUE) {
                if (firstProblem == null) {
                    firstProblem = candidate;
                }
            }
        }
        index = (index + 1) % candidates.length;
    } while (index != first);

    // No available host: fall back to a full one, then to a problem one, else null.
    return firstFull != null ? firstFull : firstProblem;
}
/**
 * Extracts the candidate routes for the exchange from its session cookie.
 * <p>
 * The configured session cookie names are checked one after another against the request cookies; the
 * value of the first match is handed to the route iterator factory. When no cookie matches, the factory
 * is called with {@code null}.
 *
 * @param exchange the current exchange
 * @return an iterator over the routes parsed from the matching cookie value
 */
protected Iterator<CharSequence> parseRoutes(HttpServerExchange exchange) {
    Cookie sessionCookie = null;
    search:
    for (String candidateName : sessionCookieNames) {
        for (Cookie requestCookie : exchange.requestCookies()) {
            if (candidateName.equals(requestCookie.getName())) {
                sessionCookie = requestCookie;
                break search;
            }
        }
    }
    if (sessionCookie != null) {
        return routeIteratorFactory.iterator(sessionCookie.getValue());
    }
    return routeIteratorFactory.iterator(null);
}
Should only be used for tests
DO NOT CALL THIS METHOD WHEN REQUESTS ARE IN PROGRESS
It is not thread safe so internal state can get messed up.
/**
* Should only be used for tests
*
* DO NOT CALL THIS METHOD WHEN REQUESTS ARE IN PROGRESS
*
* It is not thread safe so internal state can get messed up.
*/
public void closeCurrentConnections() {
for(Host host : hosts) {
host.closeCurrentConnections();
}
}
/**
 * A single backend server together with its connection pool.
 * <p>
 * The pool-sizing getters hold no values of their own: each call returns the current setting of the
 * enclosing {@link LoadBalancingProxyClient}, so changes made through the client's setters apply to
 * every host.
 */
public final class Host extends ConnectionPoolErrorHandler.SimpleConnectionPoolErrorHandler implements ConnectionPoolManager {

    final ProxyConnectionPool connectionPool;
    final String jvmRoute;
    final URI uri;
    final XnioSsl ssl;

    /**
     * Creates the host and its connection pool; the pool is constructed with this host as its first argument.
     */
    private Host(String jvmRoute, InetSocketAddress bindAddress, URI uri, XnioSsl ssl, OptionMap options) {
        this.connectionPool = new ProxyConnectionPool(this, bindAddress, uri, ssl, LoadBalancingProxyClient.this.client, options);
        this.jvmRoute = jvmRoute;
        this.uri = uri;
        this.ssl = ssl;
    }

    /** @return the enclosing client's problem-server retry interval */
    @Override
    public int getProblemServerRetry() {
        return LoadBalancingProxyClient.this.problemServerRetry;
    }

    /** @return the enclosing client's connections-per-thread setting */
    @Override
    public int getMaxConnections() {
        return LoadBalancingProxyClient.this.connectionsPerThread;
    }

    /** @return the enclosing client's connections-per-thread setting (same value as the maximum) */
    @Override
    public int getMaxCachedConnections() {
        return LoadBalancingProxyClient.this.connectionsPerThread;
    }

    /** @return the enclosing client's soft maximum of connections per thread */
    @Override
    public int getSMaxConnections() {
        return LoadBalancingProxyClient.this.softMaxConnectionsPerThread;
    }

    /** @return the enclosing client's time-to-live setting, widened to {@code long} */
    @Override
    public long getTtl() {
        return LoadBalancingProxyClient.this.ttl;
    }

    /** @return the enclosing client's maximum queue size */
    @Override
    public int getMaxQueueSize() {
        return LoadBalancingProxyClient.this.maxQueueSize;
    }

    /** @return the URI of this backend */
    public URI getUri() {
        return this.uri;
    }

    /** Delegates to the connection pool of this host. */
    void closeCurrentConnections() {
        this.connectionPool.closeCurrentConnections();
    }
}
/**
 * Mutable holder for the exclusive backend connection that {@code getConnection} attaches to a server
 * connection. The held connection is replaced when a later exclusive connect completes for the same holder.
 */
private static class ExclusiveConnectionHolder {
// The current exclusive backend connection; assigned in getConnection's completion callback.
private ProxyConnection connection;
}
/**
 * Strategy that decides at which host {@code selectHost} starts looking for a usable backend.
 */
public interface HostSelector {
/**
 * Picks the starting host.
 *
 * @param availableHosts the current hosts; {@code selectHost} only calls this with a non-empty array
 * @return an index into {@code availableHosts}; {@code selectHost} uses it directly as an array index,
 * so it must lie within the bounds of the array
 */
int selectHost(Host[] availableHosts);
}
/**
 * Host selector that hands out starting indices in rotation, driven by a shared atomic counter.
 */
static class RoundRobinHostSelector implements HostSelector {

    private final AtomicInteger currentHost = new AtomicInteger(0);

    /**
     * Returns the next index in the rotation.
     *
     * @param availableHosts the current hosts; must not be empty
     * @return an index in the range {@code [0, availableHosts.length)}
     */
    @Override
    public int selectHost(Host[] availableHosts) {
        // The counter wraps to Integer.MIN_VALUE after Integer.MAX_VALUE increments, and Java's % keeps
        // the sign of the dividend, so the unmasked value would yield a negative (out-of-bounds) index.
        // Clearing the sign bit leaves non-negative counter values untouched.
        final int ticket = currentHost.incrementAndGet() & Integer.MAX_VALUE;
        return ticket % availableHosts.length;
    }
}
}