package io.vertx.core.net.impl;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedFile;
import io.vertx.core.*;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.metrics.NetworkMetrics;
import io.vertx.core.spi.metrics.TCPMetrics;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.security.cert.X509Certificate;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
public abstract class ConnectionBase {
public static final VertxException CLOSED_EXCEPTION = new VertxException("Connection was closed", true);
private static final Logger log = LoggerFactory.getLogger(ConnectionBase.class);
private static final int MAX_REGION_SIZE = 1024 * 1024;
public final VoidChannelPromise voidPromise;
protected final VertxInternal vertx;
protected final ChannelHandlerContext chctx;
protected final ContextInternal context;
private Handler<Throwable> exceptionHandler;
private Handler<Void> closeHandler;
private int writeInProgress;
private Object metric;
private boolean read;
private boolean needsFlush;
protected ConnectionBase(VertxInternal vertx, ChannelHandlerContext chctx, ContextInternal context) {
this.vertx = vertx;
this.chctx = chctx;
this.context = context;
this.voidPromise = new VoidChannelPromise(chctx.channel(), false);
}
public void fail(Throwable error) {
handler().fail(error);
}
public VertxHandler handler() {
return (VertxHandler) chctx.handler();
}
final void endReadAndFlush() {
if (read) {
read = false;
if (needsFlush) {
needsFlush = false;
chctx.flush();
}
}
}
final void setRead() {
read = true;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
needsFlush = !flush;
if (flush) {
chctx.writeAndFlush(msg, promise);
} else {
chctx.write(msg, promise);
}
}
public final ChannelPromise toPromise(Handler<AsyncResult<Void>> handler) {
return handler == null ? voidPromise : wrap(handler);
}
private ChannelPromise wrap(Handler<AsyncResult<Void>> handler) {
ChannelPromise promise = chctx.newPromise();
promise.addListener((fut) -> {
if (fut.isSuccess()) {
handler.handle(Future.succeededFuture());
} else {
handler.handle(Future.failedFuture(fut.cause()));
}
}
);
return promise;
}
public void writeToChannel(Object msg, ChannelPromise promise) {
synchronized (this) {
if (!chctx.executor().inEventLoop() || writeInProgress > 0) {
queueForWrite(msg, promise);
return;
}
}
write(msg, !read, promise);
}
private void queueForWrite(Object msg, ChannelPromise promise) {
writeInProgress++;
chctx.executor().execute(() -> {
boolean flush;
synchronized (this) {
flush = --writeInProgress == 0 && !read;
}
write(msg, flush, promise);
});
}
public void writeToChannel(Object obj) {
writeToChannel(obj, voidPromise);
}
public final void flush() {
flush(voidPromise);
}
public final void flush(ChannelPromise promise) {
if (chctx.executor().inEventLoop()) {
if (needsFlush) {
needsFlush = false;
chctx.writeAndFlush(Unpooled.EMPTY_BUFFER, promise);
} else {
promise.setSuccess();
}
} else {
chctx.executor().execute(() -> flush(promise));
}
}
public boolean isNotWritable() {
return !chctx.channel().isWritable();
}
public void close() {
close(null);
}
public void close(Handler<AsyncResult<Void>> handler) {
ChannelPromise promise = chctx
.newPromise()
.addListener((ChannelFutureListener) f -> {
ChannelFuture closeFut = chctx.channel().close();
if (handler != null) {
closeFut.addListener(new ChannelFutureListenerAdapter<>(context, null, handler));
}
});
flush(promise);
}
public synchronized ConnectionBase closeHandler(Handler<Void> handler) {
closeHandler = handler;
return this;
}
public synchronized ConnectionBase exceptionHandler(Handler<Throwable> handler) {
this.exceptionHandler = handler;
return this;
}
protected synchronized Handler<Throwable> exceptionHandler() {
return exceptionHandler;
}
public void doPause() {
chctx.channel().config().setAutoRead(false);
}
public void doResume() {
chctx.channel().config().setAutoRead(true);
}
public void doSetWriteQueueMaxSize(int size) {
ChannelConfig config = chctx.channel().config();
config.setWriteBufferWaterMark(new WriteBufferWaterMark(size / 2, size));
}
protected final void checkContext() {
if (context != vertx.getContext()) {
throw new IllegalStateException("Wrong context!");
}
}
public final Channel channel() {
return chctx.channel();
}
public final ChannelHandlerContext channelHandlerContext() {
return chctx;
}
public final ContextInternal getContext() {
return context;
}
public final synchronized void metric(Object metric) {
this.metric = metric;
}
public final synchronized Object metric() {
return metric;
}
public abstract NetworkMetrics metrics();
protected synchronized void handleException(Throwable t) {
NetworkMetrics metrics = metrics();
if (metrics != null) {
metrics.exceptionOccurred(metric, remoteAddress(), t);
}
if (exceptionHandler != null) {
exceptionHandler.handle(t);
} else {
if (log.isDebugEnabled()) {
log.error(t.getMessage(), t);
} else {
log.error(t.getMessage());
}
}
}
protected void handleClosed() {
Handler<Void> handler;
synchronized (this) {
NetworkMetrics metrics = metrics();
if (metrics != null && metrics instanceof TCPMetrics) {
((TCPMetrics) metrics).disconnected(metric(), remoteAddress());
}
handler = closeHandler;
}
if (handler != null) {
handler.handle(null);
}
}
protected void handleIdle() {
chctx.close();
}
protected abstract void handleInterestedOpsChanged();
protected void addFuture(final Handler<AsyncResult<Void>> completionHandler, final ChannelFuture future) {
if (future != null) {
future.addListener(channelFuture -> context.executeFromIO(v -> {
if (completionHandler != null) {
if (channelFuture.isSuccess()) {
completionHandler.handle(Future.succeededFuture());
} else {
completionHandler.handle(Future.failedFuture(channelFuture.cause()));
}
} else if (!channelFuture.isSuccess()) {
handleException(channelFuture.cause());
}
}));
}
}
protected boolean supportsFileRegion() {
return !isSsl();
}
public void reportBytesRead(long numberOfBytes) {
NetworkMetrics metrics = metrics();
if (metrics != null) {
metrics.bytesRead(metric(), remoteAddress(), numberOfBytes);
}
}
public void reportBytesWritten(long numberOfBytes) {
NetworkMetrics metrics = metrics();
if (metrics != null) {
metrics.bytesWritten(metric(), remoteAddress(), numberOfBytes);
}
}
private void sendFileRegion(RandomAccessFile file, long offset, long length, ChannelPromise writeFuture) {
if (length < MAX_REGION_SIZE) {
writeToChannel(new DefaultFileRegion(file.getChannel(), offset, length), writeFuture);
} else {
ChannelPromise promise = chctx.newPromise();
FileRegion region = new DefaultFileRegion(file.getChannel(), offset, MAX_REGION_SIZE);
region.retain();
writeToChannel(region, promise);
promise.addListener(future -> {
if (future.isSuccess()) {
sendFileRegion(file, offset + MAX_REGION_SIZE, length - MAX_REGION_SIZE, writeFuture);
} else {
log.error(future.cause().getMessage(), future.cause());
writeFuture.setFailure(future.cause());
}
});
}
}
protected ChannelFuture sendFile(RandomAccessFile raf, long offset, long length) throws IOException {
ChannelPromise writeFuture = chctx.newPromise();
if (!supportsFileRegion()) {
writeToChannel(new ChunkedFile(raf, offset, length, 8192), writeFuture);
} else {
sendFileRegion(raf, offset, length, writeFuture);
}
if (writeFuture != null) {
writeFuture.addListener(fut -> raf.close());
} else {
raf.close();
}
return writeFuture;
}
public boolean isSsl() {
return chctx.pipeline().get(SslHandler.class) != null;
}
public SSLSession sslSession() {
ChannelHandlerContext sslHandlerContext = chctx.pipeline().context(SslHandler.class);
if (sslHandlerContext != null) {
SslHandler sslHandler = (SslHandler) sslHandlerContext.handler();
return sslHandler.engine().getSession();
} else {
return null;
}
}
public X509Certificate[] peerCertificateChain() throws SSLPeerUnverifiedException {
SSLSession session = sslSession();
if (session != null) {
return session.getPeerCertificateChain();
} else {
return null;
}
}
public String indicatedServerName() {
if (chctx.channel().hasAttr(SslHandshakeCompletionHandler.SERVER_NAME_ATTR)) {
return chctx.channel().attr(SslHandshakeCompletionHandler.SERVER_NAME_ATTR).get();
} else {
return null;
}
}
public ChannelPromise channelFuture() {
return chctx.newPromise();
}
public String remoteName() {
java.net.SocketAddress addr = chctx.channel().remoteAddress();
if (addr instanceof InetSocketAddress) {
return ((InetSocketAddress)addr).getHostString();
}
return null;
}
public SocketAddress remoteAddress() {
java.net.SocketAddress addr = chctx.channel().remoteAddress();
if (addr != null) {
return vertx.transport().convert(addr);
}
return null;
}
public SocketAddress localAddress() {
java.net.SocketAddress addr = chctx.channel().localAddress();
if (addr != null) {
return vertx.transport().convert(addr);
}
return null;
}
public void handleMessage(Object msg) {
}
}