/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.coyote;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistration;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.servlet.http.HttpUpgradeHandler;
import javax.servlet.http.WebConnection;
import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
import org.apache.juli.logging.Log;
import org.apache.tomcat.InstanceManager;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.collections.SynchronizedStack;
import org.apache.tomcat.util.modeler.Registry;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler;
import org.apache.tomcat.util.net.SocketEvent;
import org.apache.tomcat.util.net.SocketWrapperBase;
import org.apache.tomcat.util.res.StringManager;
public abstract class AbstractProtocol<S> implements ProtocolHandler,
MBeanRegistration {
The string manager for this package.
/**
* The string manager for this package.
*/
private static final StringManager sm = StringManager.getManager(AbstractProtocol.class);
Counter used to generate unique JMX names for connectors using automatic
port binding.
/**
* Counter used to generate unique JMX names for connectors using automatic
* port binding.
*/
private static final AtomicInteger nameCounter = new AtomicInteger(0);
Name of MBean for the Global Request Processor.
/**
* Name of MBean for the Global Request Processor.
*/
protected ObjectName rgOname = null;
Unique ID for this connector. Only used if the connector is configured
to use a random port as the port will change if stop(), start() is
called.
/**
* Unique ID for this connector. Only used if the connector is configured
* to use a random port as the port will change if stop(), start() is
* called.
*/
private int nameIndex = 0;
Endpoint that provides low-level network I/O - must be matched to the
ProtocolHandler implementation (ProtocolHandler using NIO, requires NIO
Endpoint etc.).
/**
* Endpoint that provides low-level network I/O - must be matched to the
* ProtocolHandler implementation (ProtocolHandler using NIO, requires NIO
* Endpoint etc.).
*/
private final AbstractEndpoint<S,?> endpoint;
private Handler<S> handler;
private final Set<Processor> waitingProcessors =
Collections.newSetFromMap(new ConcurrentHashMap<Processor, Boolean>());
Controller for the timeout scheduling.
/**
* Controller for the timeout scheduling.
*/
private ScheduledFuture<?> timeoutFuture = null;
private ScheduledFuture<?> monitorFuture;
public AbstractProtocol(AbstractEndpoint<S,?> endpoint) {
this.endpoint = endpoint;
setConnectionLinger(Constants.DEFAULT_CONNECTION_LINGER);
setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}
// ----------------------------------------------- Generic property handling
Generic property setter used by the digester. Other code should not need
to use this. The digester will only use this method if it can't find a
more specific setter. That means the property belongs to the Endpoint,
the ServerSocketFactory or some other lower level component. This method
ensures that it is visible to both.
Params: - name – The name of the property to set
- value – The value, in string form, to set for the property
Returns: true
if the property was set successfully, otherwise
false
/**
* Generic property setter used by the digester. Other code should not need
* to use this. The digester will only use this method if it can't find a
* more specific setter. That means the property belongs to the Endpoint,
* the ServerSocketFactory or some other lower level component. This method
* ensures that it is visible to both.
*
* @param name The name of the property to set
* @param value The value, in string form, to set for the property
*
* @return <code>true</code> if the property was set successfully, otherwise
* <code>false</code>
*/
public boolean setProperty(String name, String value) {
return endpoint.setProperty(name, value);
}
Generic property getter used by the digester. Other code should not need
to use this.
Params: - name – The name of the property to get
Returns: The value of the property converted to a string
/**
* Generic property getter used by the digester. Other code should not need
* to use this.
*
* @param name The name of the property to get
*
* @return The value of the property converted to a string
*/
public String getProperty(String name) {
return endpoint.getProperty(name);
}
// ------------------------------- Properties managed by the ProtocolHandler
The adapter provides the link between the ProtocolHandler and the
connector.
/**
* The adapter provides the link between the ProtocolHandler and the
* connector.
*/
protected Adapter adapter;
@Override
public void setAdapter(Adapter adapter) { this.adapter = adapter; }
@Override
public Adapter getAdapter() { return adapter; }
The maximum number of idle processors that will be retained in the cache and re-used with a subsequent request. The default is 200. A value of -1 means unlimited. In the unlimited case, the theoretical maximum number of cached Processor objects is getMaxConnections()
although it will usually be closer to getMaxThreads()
. /**
* The maximum number of idle processors that will be retained in the cache
* and re-used with a subsequent request. The default is 200. A value of -1
* means unlimited. In the unlimited case, the theoretical maximum number of
* cached Processor objects is {@link #getMaxConnections()} although it will
* usually be closer to {@link #getMaxThreads()}.
*/
protected int processorCache = 200;
public int getProcessorCache() { return this.processorCache; }
public void setProcessorCache(int processorCache) {
this.processorCache = processorCache;
}
private String clientCertProvider = null;
When client certificate information is presented in a form other than instances of X509Certificate
it needs to be converted before it can be used and this property controls which JSSE provider is used to perform the conversion. For example it is used with the AJP connectors, the HTTP APR connector and with the SSLValve
. If not specified, the default provider will be used. Returns: The name of the JSSE provider to use
/**
* When client certificate information is presented in a form other than
* instances of {@link java.security.cert.X509Certificate} it needs to be
* converted before it can be used and this property controls which JSSE
* provider is used to perform the conversion. For example it is used with
* the AJP connectors, the HTTP APR connector and with the
* {@link org.apache.catalina.valves.SSLValve}. If not specified, the
* default provider will be used.
*
* @return The name of the JSSE provider to use
*/
public String getClientCertProvider() { return clientCertProvider; }
public void setClientCertProvider(String s) { this.clientCertProvider = s; }
private int maxHeaderCount = 100;
public int getMaxHeaderCount() {
return maxHeaderCount;
}
public void setMaxHeaderCount(int maxHeaderCount) {
this.maxHeaderCount = maxHeaderCount;
}
@Override
public boolean isAprRequired() {
return false;
}
@Override
public boolean isSendfileSupported() {
return endpoint.getUseSendfile();
}
// ---------------------- Properties that are passed through to the EndPoint
@Override
public Executor getExecutor() { return endpoint.getExecutor(); }
@Override
public void setExecutor(Executor executor) {
endpoint.setExecutor(executor);
}
@Override
public ScheduledExecutorService getUtilityExecutor() { return endpoint.getUtilityExecutor(); }
@Override
public void setUtilityExecutor(ScheduledExecutorService utilityExecutor) {
endpoint.setUtilityExecutor(utilityExecutor);
}
public int getMaxThreads() { return endpoint.getMaxThreads(); }
public void setMaxThreads(int maxThreads) {
endpoint.setMaxThreads(maxThreads);
}
public int getMaxConnections() { return endpoint.getMaxConnections(); }
public void setMaxConnections(int maxConnections) {
endpoint.setMaxConnections(maxConnections);
}
public int getMinSpareThreads() { return endpoint.getMinSpareThreads(); }
public void setMinSpareThreads(int minSpareThreads) {
endpoint.setMinSpareThreads(minSpareThreads);
}
public int getThreadPriority() { return endpoint.getThreadPriority(); }
public void setThreadPriority(int threadPriority) {
endpoint.setThreadPriority(threadPriority);
}
public int getAcceptCount() { return endpoint.getAcceptCount(); }
public void setAcceptCount(int acceptCount) { endpoint.setAcceptCount(acceptCount); }
public boolean getTcpNoDelay() { return endpoint.getTcpNoDelay(); }
public void setTcpNoDelay(boolean tcpNoDelay) {
endpoint.setTcpNoDelay(tcpNoDelay);
}
public int getConnectionLinger() { return endpoint.getConnectionLinger(); }
public void setConnectionLinger(int connectionLinger) {
endpoint.setConnectionLinger(connectionLinger);
}
The time Tomcat will wait for a subsequent request before closing the connection. The default is getConnectionTimeout()
. Returns: The timeout in milliseconds
/**
* The time Tomcat will wait for a subsequent request before closing the
* connection. The default is {@link #getConnectionTimeout()}.
*
* @return The timeout in milliseconds
*/
public int getKeepAliveTimeout() { return endpoint.getKeepAliveTimeout(); }
public void setKeepAliveTimeout(int keepAliveTimeout) {
endpoint.setKeepAliveTimeout(keepAliveTimeout);
}
public InetAddress getAddress() { return endpoint.getAddress(); }
public void setAddress(InetAddress ia) {
endpoint.setAddress(ia);
}
public int getPort() { return endpoint.getPort(); }
public void setPort(int port) {
endpoint.setPort(port);
}
public int getPortOffset() { return endpoint.getPortOffset(); }
public void setPortOffset(int portOffset) {
endpoint.setPortOffset(portOffset);
}
public int getPortWithOffset() { return endpoint.getPortWithOffset(); }
public int getLocalPort() { return endpoint.getLocalPort(); }
/*
* When Tomcat expects data from the client, this is the time Tomcat will
* wait for that data to arrive before closing the connection.
*/
public int getConnectionTimeout() {
return endpoint.getConnectionTimeout();
}
public void setConnectionTimeout(int timeout) {
endpoint.setConnectionTimeout(timeout);
}
public long getConnectionCount() {
return endpoint.getConnectionCount();
}
NO-OP.
Params: - threadCount – Unused
Deprecated: Will be removed in Tomcat 10.
/**
* NO-OP.
*
* @param threadCount Unused
*
* @deprecated Will be removed in Tomcat 10.
*/
@Deprecated
public void setAcceptorThreadCount(int threadCount) {
}
Always returns 1.
Returns: Always 1. Deprecated: Will be removed in Tomcat 10.
/**
* Always returns 1.
*
* @return Always 1.
*
* @deprecated Will be removed in Tomcat 10.
*/
@Deprecated
public int getAcceptorThreadCount() {
return 1;
}
public void setAcceptorThreadPriority(int threadPriority) {
endpoint.setAcceptorThreadPriority(threadPriority);
}
public int getAcceptorThreadPriority() {
return endpoint.getAcceptorThreadPriority();
}
// ---------------------------------------------------------- Public methods
public synchronized int getNameIndex() {
if (nameIndex == 0) {
nameIndex = nameCounter.incrementAndGet();
}
return nameIndex;
}
The name will be prefix-address-port if address is non-null and
prefix-port if the address is null.
Returns: A name for this protocol instance that is appropriately quoted
for use in an ObjectName.
/**
* The name will be prefix-address-port if address is non-null and
* prefix-port if the address is null.
*
* @return A name for this protocol instance that is appropriately quoted
* for use in an ObjectName.
*/
public String getName() {
return ObjectName.quote(getNameInternal());
}
private String getNameInternal() {
StringBuilder name = new StringBuilder(getNamePrefix());
name.append('-');
if (getAddress() != null) {
name.append(getAddress().getHostAddress());
name.append('-');
}
int port = getPortWithOffset();
if (port == 0) {
// Auto binding is in use. Check if port is known
name.append("auto-");
name.append(getNameIndex());
port = getLocalPort();
if (port != -1) {
name.append('-');
name.append(port);
}
} else {
name.append(port);
}
return name.toString();
}
public void addWaitingProcessor(Processor processor) {
waitingProcessors.add(processor);
}
public void removeWaitingProcessor(Processor processor) {
waitingProcessors.remove(processor);
}
// ----------------------------------------------- Accessors for sub-classes
protected AbstractEndpoint<S,?> getEndpoint() {
return endpoint;
}
protected Handler<S> getHandler() {
return handler;
}
protected void setHandler(Handler<S> handler) {
this.handler = handler;
}
// -------------------------------------------------------- Abstract methods
Concrete implementations need to provide access to their logger to be
used by the abstract classes.
Returns: the logger
/**
* Concrete implementations need to provide access to their logger to be
* used by the abstract classes.
* @return the logger
*/
protected abstract Log getLog();
Obtain the prefix to be used when construction a name for this protocol
handler. The name will be prefix-address-port.
Returns: the prefix
/**
* Obtain the prefix to be used when construction a name for this protocol
* handler. The name will be prefix-address-port.
* @return the prefix
*/
protected abstract String getNamePrefix();
Obtain the name of the protocol, (Http, Ajp, etc.). Used with JMX.
Returns: the protocol name
/**
* Obtain the name of the protocol, (Http, Ajp, etc.). Used with JMX.
* @return the protocol name
*/
protected abstract String getProtocolName();
Find a suitable handler for the protocol negotiated
at the network layer.
Params: - name – The name of the requested negotiated protocol.
Returns: The instance where UpgradeProtocol.getAlpnName()
matches the requested protocol
/**
* Find a suitable handler for the protocol negotiated
* at the network layer.
* @param name The name of the requested negotiated protocol.
* @return The instance where {@link UpgradeProtocol#getAlpnName()} matches
* the requested protocol
*/
protected abstract UpgradeProtocol getNegotiatedProtocol(String name);
Find a suitable handler for the protocol upgraded name specified. This
is used for direct connection protocol selection.
Params: - name – The name of the requested negotiated protocol.
Returns: The instance where UpgradeProtocol.getAlpnName()
matches the requested protocol
/**
* Find a suitable handler for the protocol upgraded name specified. This
* is used for direct connection protocol selection.
* @param name The name of the requested negotiated protocol.
* @return The instance where {@link UpgradeProtocol#getAlpnName()} matches
* the requested protocol
*/
protected abstract UpgradeProtocol getUpgradeProtocol(String name);
Create and configure a new Processor instance for the current protocol
implementation.
Returns: A fully configured Processor instance that is ready to use
/**
* Create and configure a new Processor instance for the current protocol
* implementation.
*
* @return A fully configured Processor instance that is ready to use
*/
protected abstract Processor createProcessor();
protected abstract Processor createUpgradeProcessor(
SocketWrapperBase<?> socket,
UpgradeToken upgradeToken);
// ----------------------------------------------------- JMX related methods
protected String domain;
protected ObjectName oname;
protected MBeanServer mserver;
public ObjectName getObjectName() {
return oname;
}
public String getDomain() {
return domain;
}
@Override
public ObjectName preRegister(MBeanServer server, ObjectName name)
throws Exception {
oname = name;
mserver = server;
domain = name.getDomain();
return name;
}
@Override
public void postRegister(Boolean registrationDone) {
// NOOP
}
@Override
public void preDeregister() throws Exception {
// NOOP
}
@Override
public void postDeregister() {
// NOOP
}
private ObjectName createObjectName() throws MalformedObjectNameException {
// Use the same domain as the connector
domain = getAdapter().getDomain();
if (domain == null) {
return null;
}
StringBuilder name = new StringBuilder(getDomain());
name.append(":type=ProtocolHandler,port=");
int port = getPortWithOffset();
if (port > 0) {
name.append(port);
} else {
name.append("auto-");
name.append(getNameIndex());
}
InetAddress address = getAddress();
if (address != null) {
name.append(",address=");
name.append(ObjectName.quote(address.getHostAddress()));
}
return new ObjectName(name.toString());
}
// ------------------------------------------------------- Lifecycle methods
/*
* NOTE: There is no maintenance of state or checking for valid transitions
* within this class. It is expected that the connector will maintain state
* and prevent invalid state transitions.
*/
@Override
public void init() throws Exception {
if (getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
logPortOffset();
}
if (oname == null) {
// Component not pre-registered so register it
oname = createObjectName();
if (oname != null) {
Registry.getRegistry(null, null).registerComponent(this, oname, null);
}
}
if (this.domain != null) {
rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
Registry.getRegistry(null, null).registerComponent(
getHandler().getGlobal(), rgOname, null);
}
String endpointName = getName();
endpoint.setName(endpointName.substring(1, endpointName.length()-1));
endpoint.setDomain(domain);
endpoint.init();
}
@Override
public void start() throws Exception {
if (getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
logPortOffset();
}
endpoint.start();
monitorFuture = getUtilityExecutor().scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
if (!isPaused()) {
startAsyncTimeout();
}
}
}, 0, 60, TimeUnit.SECONDS);
}
Note: The name of this method originated with the Servlet 3.0
asynchronous processing but evolved over time to represent a timeout that
is triggered independently of the socket read/write timeouts.
/**
* Note: The name of this method originated with the Servlet 3.0
* asynchronous processing but evolved over time to represent a timeout that
* is triggered independently of the socket read/write timeouts.
*/
protected void startAsyncTimeout() {
if (timeoutFuture == null || (timeoutFuture != null && timeoutFuture.isDone())) {
if (timeoutFuture != null && timeoutFuture.isDone()) {
// There was an error executing the scheduled task, get it and log it
try {
timeoutFuture.get();
} catch (InterruptedException | ExecutionException e) {
getLog().error(sm.getString("abstractProtocolHandler.asyncTimeoutError"), e);
}
}
timeoutFuture = getUtilityExecutor().scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
long now = System.currentTimeMillis();
for (Processor processor : waitingProcessors) {
processor.timeoutAsync(now);
}
}
}, 1, 1, TimeUnit.SECONDS);
}
}
protected void stopAsyncTimeout() {
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
timeoutFuture = null;
}
}
@Override
public void pause() throws Exception {
if (getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.pause", getName()));
}
stopAsyncTimeout();
endpoint.pause();
}
public boolean isPaused() {
return endpoint.isPaused();
}
@Override
public void resume() throws Exception {
if(getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.resume", getName()));
}
endpoint.resume();
startAsyncTimeout();
}
@Override
public void stop() throws Exception {
if(getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.stop", getName()));
logPortOffset();
}
if (monitorFuture != null) {
monitorFuture.cancel(true);
monitorFuture = null;
}
stopAsyncTimeout();
// Timeout any waiting processor
for (Processor processor : waitingProcessors) {
processor.timeoutAsync(-1);
}
endpoint.stop();
}
@Override
public void destroy() throws Exception {
if(getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.destroy", getName()));
logPortOffset();
}
try {
endpoint.destroy();
} finally {
if (oname != null) {
if (mserver == null) {
Registry.getRegistry(null, null).unregisterComponent(oname);
} else {
// Possibly registered with a different MBeanServer
try {
mserver.unregisterMBean(oname);
} catch (MBeanRegistrationException | InstanceNotFoundException e) {
getLog().info(sm.getString("abstractProtocol.mbeanDeregistrationFailed",
oname, mserver));
}
}
}
if (rgOname != null) {
Registry.getRegistry(null, null).unregisterComponent(rgOname);
}
}
}
@Override
public void closeServerSocketGraceful() {
endpoint.closeServerSocketGraceful();
}
private void logPortOffset() {
if (getPort() != getPortWithOffset()) {
getLog().info(sm.getString("abstractProtocolHandler.portOffset", getName(),
String.valueOf(getPort()), String.valueOf(getPortOffset())));
}
}
// ------------------------------------------- Connection handler base class
protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S> {
private final AbstractProtocol<S> proto;
private final RequestGroupInfo global = new RequestGroupInfo();
private final AtomicLong registerCount = new AtomicLong(0);
private final Map<S,Processor> connections = new ConcurrentHashMap<>();
private final RecycledProcessors recycledProcessors = new RecycledProcessors(this);
public ConnectionHandler(AbstractProtocol<S> proto) {
this.proto = proto;
}
protected AbstractProtocol<S> getProtocol() {
return proto;
}
protected Log getLog() {
return getProtocol().getLog();
}
@Override
public Object getGlobal() {
return global;
}
@Override
public void recycle() {
recycledProcessors.clear();
}
@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.process",
wrapper.getSocket(), status));
}
if (wrapper == null) {
// Nothing to do. Socket has been closed.
return SocketState.CLOSED;
}
S socket = wrapper.getSocket();
Processor processor = connections.get(socket);
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
processor, socket));
}
// Timeouts are calculated on a dedicated thread and then
// dispatched. Because of delays in the dispatch process, the
// timeout may no longer be required. Check here and avoid
// unnecessary processing.
if (SocketEvent.TIMEOUT == status &&
(processor == null ||
!processor.isAsync() && !processor.isUpgrade() ||
processor.isAsync() && !processor.checkAsyncTimeoutGeneration())) {
// This is effectively a NO-OP
return SocketState.OPEN;
}
if (processor != null) {
// Make sure an async timeout doesn't fire
getProtocol().removeWaitingProcessor(processor);
} else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
// Nothing to do. Endpoint requested a close and there is no
// longer a processor associated with this socket.
return SocketState.CLOSED;
}
ContainerThreadMarker.set();
try {
if (processor == null) {
String negotiatedProtocol = wrapper.getNegotiatedProtocol();
// OpenSSL typically returns null whereas JSSE typically
// returns "" when no protocol is negotiated
if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {
UpgradeProtocol upgradeProtocol =
getProtocol().getNegotiatedProtocol(negotiatedProtocol);
if (upgradeProtocol != null) {
processor = upgradeProtocol.getProcessor(
wrapper, getProtocol().getAdapter());
} else if (negotiatedProtocol.equals("http/1.1")) {
// Explicitly negotiated the default protocol.
// Obtain a processor below.
} else {
// TODO:
// OpenSSL 1.0.2's ALPN callback doesn't support
// failing the handshake with an error if no
// protocol can be negotiated. Therefore, we need to
// fail the connection here. Once this is fixed,
// replace the code below with the commented out
// block.
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString(
"abstractConnectionHandler.negotiatedProcessor.fail",
negotiatedProtocol));
}
return SocketState.CLOSED;
/*
* To replace the code above once OpenSSL 1.1.0 is
* used.
// Failed to create processor. This is a bug.
throw new IllegalStateException(sm.getString(
"abstractConnectionHandler.negotiatedProcessor.fail",
negotiatedProtocol));
*/
}
}
}
if (processor == null) {
processor = recycledProcessors.pop();
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.processorPop",
processor));
}
}
if (processor == null) {
processor = getProtocol().createProcessor();
register(processor);
}
processor.setSslSupport(
wrapper.getSslSupport(getProtocol().getClientCertProvider()));
// Associate the processor with the connection
connections.put(socket, processor);
SocketState state = SocketState.CLOSED;
do {
state = processor.process(wrapper, status);
if (state == SocketState.UPGRADING) {
// Get the HTTP upgrade handler
UpgradeToken upgradeToken = processor.getUpgradeToken();
// Retrieve leftover input
ByteBuffer leftOverInput = processor.getLeftoverInput();
if (upgradeToken == null) {
// Assume direct HTTP/2 connection
UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
if (upgradeProtocol != null) {
processor = upgradeProtocol.getProcessor(
wrapper, getProtocol().getAdapter());
wrapper.unRead(leftOverInput);
// Associate with the processor with the connection
connections.put(socket, processor);
} else {
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString(
"abstractConnectionHandler.negotiatedProcessor.fail",
"h2c"));
}
return SocketState.CLOSED;
}
} else {
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
// Release the Http11 processor to be re-used
release(processor);
// Create the upgrade processor
processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
processor, wrapper));
}
wrapper.unRead(leftOverInput);
// Mark the connection as upgraded
wrapper.setUpgraded(true);
// Associate with the processor with the connection
connections.put(socket, processor);
// Initialise the upgrade handler (which may trigger
// some IO using the new protocol which is why the lines
// above are necessary)
// This cast should be safe. If it fails the error
// handling for the surrounding try/catch will deal with
// it.
if (upgradeToken.getInstanceManager() == null) {
httpUpgradeHandler.init((WebConnection) processor);
} else {
ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
try {
httpUpgradeHandler.init((WebConnection) processor);
} finally {
upgradeToken.getContextBind().unbind(false, oldCL);
}
}
if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) {
if (((InternalHttpUpgradeHandler) httpUpgradeHandler).hasAsyncIO()) {
// The handler will initiate all further I/O
state = SocketState.LONG;
}
}
}
}
} while ( state == SocketState.UPGRADING);
if (state == SocketState.LONG) {
// In the middle of processing a request/response. Keep the
// socket associated with the processor. Exact requirements
// depend on type of long poll
longPoll(wrapper, processor);
if (processor.isAsync()) {
getProtocol().addWaitingProcessor(processor);
}
} else if (state == SocketState.OPEN) {
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
connections.remove(socket);
release(processor);
wrapper.registerReadInterest();
} else if (state == SocketState.SENDFILE) {
// Sendfile in progress. If it fails, the socket will be
// closed. If it works, the socket either be added to the
// poller (or equivalent) to await more data or processed
// if there are any pipe-lined requests remaining.
} else if (state == SocketState.UPGRADED) {
// Don't add sockets back to the poller if this was a
// non-blocking write otherwise the poller may trigger
// multiple read events which may lead to thread starvation
// in the connector. The write() method will add this socket
// to the poller if necessary.
if (status != SocketEvent.OPEN_WRITE) {
longPoll(wrapper, processor);
getProtocol().addWaitingProcessor(processor);
}
} else if (state == SocketState.SUSPENDED) {
// Don't add sockets back to the poller.
// The resumeProcessing() method will add this socket
// to the poller.
} else {
// Connection closed. OK to recycle the processor. Upgrade
// processors are not recycled.
connections.remove(socket);
if (processor.isUpgrade()) {
UpgradeToken upgradeToken = processor.getUpgradeToken();
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
InstanceManager instanceManager = upgradeToken.getInstanceManager();
if (instanceManager == null) {
httpUpgradeHandler.destroy();
} else {
ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
try {
httpUpgradeHandler.destroy();
} finally {
try {
instanceManager.destroyInstance(httpUpgradeHandler);
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
getLog().error(sm.getString("abstractConnectionHandler.error"), e);
}
upgradeToken.getContextBind().unbind(false, oldCL);
}
}
} else {
release(processor);
}
}
return state;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
getLog().debug(sm.getString(
"abstractConnectionHandler.socketexception.debug"), e);
} catch (java.io.IOException e) {
// IOExceptions are normal
getLog().debug(sm.getString(
"abstractConnectionHandler.ioexception.debug"), e);
} catch (ProtocolException e) {
// Protocol exceptions normally mean the client sent invalid or
// incomplete data.
getLog().debug(sm.getString(
"abstractConnectionHandler.protocolexception.debug"), e);
}
// Future developers: if you discover any other
// rare-but-nonfatal exceptions, catch them here, and log as
// above.
catch (OutOfMemoryError oome) {
// Try and handle this here to give Tomcat a chance to close the
// connection and prevent clients waiting until they time out.
// Worst case, it isn't recoverable and the attempt at logging
// will trigger another OOME.
getLog().error(sm.getString("abstractConnectionHandler.oome"), oome);
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
// any other exception or error is odd. Here we log it
// with "ERROR" level, so it will show up even on
// less-than-verbose logs.
getLog().error(sm.getString("abstractConnectionHandler.error"), e);
} finally {
ContainerThreadMarker.clear();
}
// Make sure socket/processor is removed from the list of current
// connections
connections.remove(socket);
release(processor);
return SocketState.CLOSED;
}
protected void longPoll(SocketWrapperBase<?> socket, Processor processor) {
if (!processor.isAsync()) {
// This is currently only used with HTTP
// Either:
// - this is an upgraded connection
// - the request line/headers have not been completely
// read
socket.registerReadInterest();
}
}
@Override
public Set<S> getOpenSockets() {
return connections.keySet();
}
Expected to be used by the handler once the processor is no longer
required.
Params: - processor – Processor being released (that was associated with
the socket)
/**
* Expected to be used by the handler once the processor is no longer
* required.
*
* @param processor Processor being released (that was associated with
* the socket)
*/
private void release(Processor processor) {
if (processor != null) {
processor.recycle();
// After recycling, only instances of UpgradeProcessorBase will
// return true for isUpgrade().
// Instances of UpgradeProcessorBase should not be added to
// recycledProcessors since that pool is only for AJP or HTTP
// processors
if (!processor.isUpgrade()) {
recycledProcessors.push(processor);
getLog().debug("Pushed Processor [" + processor + "]");
}
}
}
Expected to be used by the Endpoint to release resources on socket
close, errors etc.
/**
* Expected to be used by the Endpoint to release resources on socket
* close, errors etc.
*/
@Override
public void release(SocketWrapperBase<S> socketWrapper) {
S socket = socketWrapper.getSocket();
Processor processor = connections.remove(socket);
release(processor);
}
protected void register(Processor processor) {
if (getProtocol().getDomain() != null) {
synchronized (this) {
try {
long count = registerCount.incrementAndGet();
RequestInfo rp =
processor.getRequest().getRequestProcessor();
rp.setGlobalProcessor(global);
ObjectName rpName = new ObjectName(
getProtocol().getDomain() +
":type=RequestProcessor,worker="
+ getProtocol().getName() +
",name=" + getProtocol().getProtocolName() +
"Request" + count);
if (getLog().isDebugEnabled()) {
getLog().debug("Register " + rpName);
}
Registry.getRegistry(null, null).registerComponent(rp,
rpName, null);
rp.setRpName(rpName);
} catch (Exception e) {
getLog().warn("Error registering request");
}
}
}
}
protected void unregister(Processor processor) {
if (getProtocol().getDomain() != null) {
synchronized (this) {
try {
Request r = processor.getRequest();
if (r == null) {
// Probably an UpgradeProcessor
return;
}
RequestInfo rp = r.getRequestProcessor();
rp.setGlobalProcessor(null);
ObjectName rpName = rp.getRpName();
if (getLog().isDebugEnabled()) {
getLog().debug("Unregister " + rpName);
}
Registry.getRegistry(null, null).unregisterComponent(
rpName);
rp.setRpName(null);
} catch (Exception e) {
getLog().warn("Error unregistering request", e);
}
}
}
}
@Override
public final void pause() {
/*
* Inform all the processors associated with current connections
* that the endpoint is being paused. Most won't care. Those
* processing multiplexed streams may wish to take action. For
* example, HTTP/2 may wish to stop accepting new streams.
*
* Note that even if the endpoint is resumed, there is (currently)
* no API to inform the Processors of this.
*/
for (Processor processor : connections.values()) {
processor.pause();
}
}
}
protected static class RecycledProcessors extends SynchronizedStack<Processor> {
private final transient ConnectionHandler<?> handler;
protected final AtomicInteger size = new AtomicInteger(0);
public RecycledProcessors(ConnectionHandler<?> handler) {
this.handler = handler;
}
@SuppressWarnings("sync-override") // Size may exceed cache size a bit
@Override
public boolean push(Processor processor) {
int cacheSize = handler.getProtocol().getProcessorCache();
boolean offer = cacheSize == -1 ? true : size.get() < cacheSize;
//avoid over growing our cache or add after we have stopped
boolean result = false;
if (offer) {
result = super.push(processor);
if (result) {
size.incrementAndGet();
}
}
if (!result) handler.unregister(processor);
return result;
}
@SuppressWarnings("sync-override") // OK if size is too big briefly
@Override
public Processor pop() {
Processor result = super.pop();
if (result != null) {
size.decrementAndGet();
}
return result;
}
@Override
public synchronized void clear() {
Processor next = pop();
while (next != null) {
handler.unregister(next);
next = pop();
}
super.clear();
size.set(0);
}
}
}