package org.apache.http.impl.nio;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import org.apache.http.ConnectionClosedException;
import org.apache.http.Consts;
import org.apache.http.Header;
import org.apache.http.HttpConnectionMetrics;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpInetConnection;
import org.apache.http.HttpMessage;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.config.MessageConstraints;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.entity.ContentLengthStrategy;
import org.apache.http.impl.HttpConnectionMetricsImpl;
import org.apache.http.impl.entity.LaxContentLengthStrategy;
import org.apache.http.impl.entity.StrictContentLengthStrategy;
import org.apache.http.impl.io.HttpTransportMetricsImpl;
import org.apache.http.impl.nio.codecs.ChunkDecoder;
import org.apache.http.impl.nio.codecs.ChunkEncoder;
import org.apache.http.impl.nio.codecs.IdentityDecoder;
import org.apache.http.impl.nio.codecs.IdentityEncoder;
import org.apache.http.impl.nio.codecs.LengthDelimitedDecoder;
import org.apache.http.impl.nio.codecs.LengthDelimitedEncoder;
import org.apache.http.impl.nio.reactor.SessionInputBufferImpl;
import org.apache.http.impl.nio.reactor.SessionOutputBufferImpl;
import org.apache.http.io.HttpTransportMetrics;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.reactor.EventMask;
import org.apache.http.nio.reactor.IOSession;
import org.apache.http.nio.reactor.SessionBufferStatus;
import org.apache.http.nio.reactor.SessionInputBuffer;
import org.apache.http.nio.reactor.SessionOutputBuffer;
import org.apache.http.nio.reactor.SocketAccessor;
import org.apache.http.nio.util.ByteBufferAllocator;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.Args;
import org.apache.http.util.CharsetUtils;
import org.apache.http.util.NetUtils;
@SuppressWarnings("deprecation")
public class NHttpConnectionBase
implements NHttpConnection, HttpInetConnection, SessionBufferStatus, SocketAccessor {
protected final ContentLengthStrategy incomingContentStrategy;
protected final ContentLengthStrategy outgoingContentStrategy;
protected final SessionInputBufferImpl inbuf;
protected final SessionOutputBufferImpl outbuf;
private final int fragmentSizeHint;
private final MessageConstraints constraints;
protected final HttpTransportMetricsImpl inTransportMetrics;
protected final HttpTransportMetricsImpl outTransportMetrics;
protected final HttpConnectionMetricsImpl connMetrics;
protected HttpContext context;
protected IOSession session;
protected SocketAddress remote;
protected volatile ContentDecoder contentDecoder;
protected volatile boolean hasBufferedInput;
protected volatile ContentEncoder contentEncoder;
protected volatile boolean hasBufferedOutput;
protected volatile HttpRequest request;
protected volatile HttpResponse response;
protected volatile int status;
@Deprecated
public NHttpConnectionBase(
final IOSession session,
final ByteBufferAllocator allocator,
final HttpParams params) {
super();
Args.notNull(session, "I/O session");
Args.notNull(params, "HTTP params");
int bufferSize = params.getIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, -1);
if (bufferSize <= 0) {
bufferSize = 4096;
}
int lineBufferSize = bufferSize;
if (lineBufferSize > 512) {
lineBufferSize = 512;
}
CharsetDecoder decoder = null;
CharsetEncoder encoder = null;
Charset charset = CharsetUtils.lookup(
(String) params.getParameter(CoreProtocolPNames.HTTP_ELEMENT_CHARSET));
if (charset != null) {
charset = Consts.ASCII;
decoder = charset.newDecoder();
encoder = charset.newEncoder();
final CodingErrorAction malformedCharAction = (CodingErrorAction) params.getParameter(
CoreProtocolPNames.HTTP_MALFORMED_INPUT_ACTION);
final CodingErrorAction unmappableCharAction = (CodingErrorAction) params.getParameter(
CoreProtocolPNames.HTTP_UNMAPPABLE_INPUT_ACTION);
decoder.onMalformedInput(malformedCharAction).onUnmappableCharacter(unmappableCharAction);
encoder.onMalformedInput(malformedCharAction).onUnmappableCharacter(unmappableCharAction);
}
this.inbuf = new SessionInputBufferImpl(bufferSize, lineBufferSize, decoder, allocator);
this.outbuf = new SessionOutputBufferImpl(bufferSize, lineBufferSize, encoder, allocator);
this.fragmentSizeHint = bufferSize;
this.constraints = MessageConstraints.DEFAULT;
this.incomingContentStrategy = createIncomingContentStrategy();
this.outgoingContentStrategy = createOutgoingContentStrategy();
this.inTransportMetrics = createTransportMetrics();
this.outTransportMetrics = createTransportMetrics();
this.connMetrics = createConnectionMetrics(
this.inTransportMetrics,
this.outTransportMetrics);
setSession(session);
this.status = ACTIVE;
}
protected NHttpConnectionBase(
final IOSession session,
final int bufferSize,
final int fragmentSizeHint,
final ByteBufferAllocator allocator,
final CharsetDecoder charDecoder,
final CharsetEncoder charEncoder,
final MessageConstraints constraints,
final ContentLengthStrategy incomingContentStrategy,
final ContentLengthStrategy outgoingContentStrategy) {
Args.notNull(session, "I/O session");
Args.positive(bufferSize, "Buffer size");
int lineBufferSize = bufferSize;
if (lineBufferSize > 512) {
lineBufferSize = 512;
}
this.inbuf = new SessionInputBufferImpl(bufferSize, lineBufferSize, charDecoder, allocator);
this.outbuf = new SessionOutputBufferImpl(bufferSize, lineBufferSize, charEncoder, allocator);
this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : bufferSize;
this.inTransportMetrics = new HttpTransportMetricsImpl();
this.outTransportMetrics = new HttpTransportMetricsImpl();
this.connMetrics = new HttpConnectionMetricsImpl(this.inTransportMetrics, this.outTransportMetrics);
this.constraints = constraints != null ? constraints : MessageConstraints.DEFAULT;
this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
LaxContentLengthStrategy.INSTANCE;
this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
StrictContentLengthStrategy.INSTANCE;
setSession(session);
this.status = ACTIVE;
}
protected NHttpConnectionBase(
final IOSession session,
final int bufferSize,
final int fragmentSizeHint,
final ByteBufferAllocator allocator,
final CharsetDecoder charDecoder,
final CharsetEncoder charEncoder,
final ContentLengthStrategy incomingContentStrategy,
final ContentLengthStrategy outgoingContentStrategy) {
this(session, bufferSize, fragmentSizeHint, allocator, charDecoder, charEncoder,
null, incomingContentStrategy, outgoingContentStrategy);
}
private void setSession(final IOSession session) {
this.session = session;
this.context = new SessionHttpContext(this.session);
this.session.setBufferStatus(this);
this.remote = this.session.getRemoteAddress();
}
protected void bind(final IOSession session) {
Args.notNull(session, "I/O session");
setSession(session);
}
@Deprecated
protected ContentLengthStrategy createIncomingContentStrategy() {
return new LaxContentLengthStrategy();
}
@Deprecated
protected ContentLengthStrategy createOutgoingContentStrategy() {
return new StrictContentLengthStrategy();
}
@Deprecated
protected HttpTransportMetricsImpl createTransportMetrics() {
return new HttpTransportMetricsImpl();
}
@Deprecated
protected HttpConnectionMetricsImpl createConnectionMetrics(
final HttpTransportMetrics inTransportMetric,
final HttpTransportMetrics outTransportMetric) {
return new HttpConnectionMetricsImpl(inTransportMetric, outTransportMetric);
}
@Override
public int getStatus() {
return this.status;
}
@Override
public HttpContext getContext() {
return this.context;
}
@Override
public HttpRequest getHttpRequest() {
return this.request;
}
@Override
public HttpResponse getHttpResponse() {
return this.response;
}
@Override
public void requestInput() {
this.session.setEvent(EventMask.READ);
}
@Override
public void requestOutput() {
this.session.setEvent(EventMask.WRITE);
}
@Override
public void suspendInput() {
this.session.clearEvent(EventMask.READ);
}
@Override
public void suspendOutput() {
synchronized (this.session) {
if (!this.outbuf.hasData()) {
this.session.clearEvent(EventMask.WRITE);
}
}
}
protected HttpEntity prepareDecoder(final HttpMessage message) throws HttpException {
final BasicHttpEntity entity = new BasicHttpEntity();
final long len = this.incomingContentStrategy.determineLength(message);
this.contentDecoder = createContentDecoder(
len,
this.session.channel(),
this.inbuf,
this.inTransportMetrics);
if (len == ContentLengthStrategy.CHUNKED) {
entity.setChunked(true);
entity.setContentLength(-1);
} else if (len == ContentLengthStrategy.IDENTITY) {
entity.setChunked(false);
entity.setContentLength(-1);
} else {
entity.setChunked(false);
entity.setContentLength(len);
}
final Header contentTypeHeader = message.getFirstHeader(HTTP.CONTENT_TYPE);
if (contentTypeHeader != null) {
entity.setContentType(contentTypeHeader);
}
final Header contentEncodingHeader = message.getFirstHeader(HTTP.CONTENT_ENCODING);
if (contentEncodingHeader != null) {
entity.setContentEncoding(contentEncodingHeader);
}
return entity;
}
protected ContentDecoder createContentDecoder(
final long len,
final ReadableByteChannel channel,
final SessionInputBuffer buffer,
final HttpTransportMetricsImpl metrics) {
if (len == ContentLengthStrategy.CHUNKED) {
return new ChunkDecoder(channel, buffer, this.constraints, metrics);
} else if (len == ContentLengthStrategy.IDENTITY) {
return new IdentityDecoder(channel, buffer, metrics);
} else {
return new LengthDelimitedDecoder(channel, buffer, metrics, len);
}
}
protected void prepareEncoder(final HttpMessage message) throws HttpException {
final long len = this.outgoingContentStrategy.determineLength(message);
this.contentEncoder = createContentEncoder(
len,
this.session.channel(),
this.outbuf,
this.outTransportMetrics);
}
protected ContentEncoder createContentEncoder(
final long len,
final WritableByteChannel channel,
final SessionOutputBuffer buffer,
final HttpTransportMetricsImpl metrics) {
if (len == ContentLengthStrategy.CHUNKED) {
return new ChunkEncoder(channel, buffer, metrics, this.fragmentSizeHint);
} else if (len == ContentLengthStrategy.IDENTITY) {
return new IdentityEncoder(channel, buffer, metrics, this.fragmentSizeHint);
} else {
return new LengthDelimitedEncoder(channel, buffer, metrics, len, this.fragmentSizeHint);
}
}
@Override
public boolean hasBufferedInput() {
return this.hasBufferedInput;
}
@Override
public boolean hasBufferedOutput() {
return this.hasBufferedOutput;
}
protected void assertNotClosed() throws ConnectionClosedException {
if (this.status != ACTIVE) {
throw new ConnectionClosedException();
}
}
@Override
public void close() throws IOException {
if (this.status != ACTIVE) {
return;
}
this.status = CLOSING;
if (this.outbuf.hasData()) {
this.session.setEvent(EventMask.WRITE);
} else {
this.session.close();
this.status = CLOSED;
}
}
@Override
public boolean isOpen() {
return this.status == ACTIVE && !this.session.isClosed();
}
@Override
public boolean isStale() {
return this.session.isClosed();
}
@Override
public InetAddress getLocalAddress() {
final SocketAddress address = this.session.getLocalAddress();
return address instanceof InetSocketAddress ? ((InetSocketAddress) address).getAddress() : null;
}
@Override
public int getLocalPort() {
final SocketAddress address = this.session.getLocalAddress();
return address instanceof InetSocketAddress ? ((InetSocketAddress) address).getPort() : -1;
}
@Override
public InetAddress getRemoteAddress() {
final SocketAddress address = this.session.getRemoteAddress();
return address instanceof InetSocketAddress ? ((InetSocketAddress) address).getAddress() : null;
}
@Override
public int getRemotePort() {
final SocketAddress address = this.session.getRemoteAddress();
return address instanceof InetSocketAddress ? ((InetSocketAddress) address).getPort() : -1;
}
@Override
public void setSocketTimeout(final int timeout) {
this.session.setSocketTimeout(timeout);
}
@Override
public int getSocketTimeout() {
return this.session.getSocketTimeout();
}
@Override
public void shutdown() throws IOException {
this.status = CLOSED;
this.session.shutdown();
}
@Override
public HttpConnectionMetrics getMetrics() {
return this.connMetrics;
}
@Override
public String toString() {
final SocketAddress remoteAddress = this.session.getRemoteAddress();
final SocketAddress localAddress = this.session.getLocalAddress();
if (remoteAddress != null && localAddress != null) {
final StringBuilder buffer = new StringBuilder();
NetUtils.formatAddress(buffer, localAddress);
buffer.append("<->");
NetUtils.formatAddress(buffer, remoteAddress);
return buffer.toString();
}
return "[Not bound]";
}
@Override
public Socket getSocket() {
return this.session instanceof SocketAccessor ? ((SocketAccessor) this.session).getSocket() : null;
}
}