package io.vertx.core.datagram.impl;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.MaxMessagesRecvByteBufAllocator;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.GenericFutureListener;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.impl.AddressResolver;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.net.impl.transport.Transport;
import io.vertx.core.spi.metrics.*;
import io.vertx.core.streams.WriteStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.Objects;
public class DatagramSocketImpl implements DatagramSocket, MetricsProvider {
public static DatagramSocketImpl create(VertxInternal vertx, DatagramSocketOptions options) {
DatagramSocketImpl socket = new DatagramSocketImpl(vertx, options);
socket.init();
return socket;
}
private final ContextInternal context;
private final DatagramSocketMetrics metrics;
private DatagramChannel channel;
private Handler<io.vertx.core.datagram.DatagramPacket> packetHandler;
private Handler<Void> endHandler;
private Handler<Throwable> exceptionHandler;
private long demand;
private DatagramSocketImpl(VertxInternal vertx, DatagramSocketOptions options) {
Transport transport = vertx.transport();
DatagramChannel channel = transport.datagramChannel(options.isIpV6() ? InternetProtocolFamily.IPv6 : InternetProtocolFamily.IPv4);
transport.configure(channel, new DatagramSocketOptions(options));
ContextInternal context = vertx.getOrCreateContext();
channel.config().setOption(ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, true);
MaxMessagesRecvByteBufAllocator bufAllocator = channel.config().getRecvByteBufAllocator();
bufAllocator.maxMessagesPerRead(1);
context.nettyEventLoop().register(channel);
if (options.getLogActivity()) {
channel.pipeline().addLast("logging", new LoggingHandler());
}
VertxMetrics metrics = vertx.metricsSPI();
this.metrics = metrics != null ? metrics.createDatagramSocketMetrics(options) : null;
this.channel = channel;
this.context = context;
this.demand = Long.MAX_VALUE;
}
private void init() {
channel.pipeline().addLast("handler", VertxHandler.create(this::createConnection));
}
@Override
public DatagramSocket listenMulticastGroup(String multicastAddress, Handler<AsyncResult<Void>> handler) {
Future<Void> fut = listenMulticastGroup(multicastAddress);
if (handler != null) {
fut.onComplete(handler);
}
return this;
}
@Override
public Future<Void> listenMulticastGroup(String multicastAddress) {
ChannelFuture fut;
try {
fut = channel.joinGroup(InetAddress.getByName(multicastAddress));
} catch (UnknownHostException e) {
return context.failedFuture(e);
}
PromiseInternal<Void> promise = context.promise();
fut.addListener(promise);
return promise.future();
}
@Override
public DatagramSocket listenMulticastGroup(String multicastAddress, String networkInterface, String source, Handler<AsyncResult<Void>> handler) {
Future<Void> fut = listenMulticastGroup(multicastAddress, networkInterface, source);
if (handler != null) {
fut.onComplete(handler);
}
return this;
}
@Override
public Future<Void> listenMulticastGroup(String multicastAddress, String networkInterface, @Nullable String source) {
ChannelFuture fut;
try {
InetAddress sourceAddress;
if (source == null) {
sourceAddress = null;
} else {
sourceAddress = InetAddress.getByName(source);
}
fut = channel.joinGroup(InetAddress.getByName(multicastAddress), NetworkInterface.getByName(networkInterface), sourceAddress);
} catch (Exception e) {
return context.failedFuture(e);
}
PromiseInternal<Void> promise = context.promise();
fut.addListener(promise);
return promise.future();
}
@Override
public DatagramSocket unlistenMulticastGroup(String multicastAddress, Handler<AsyncResult<Void>> handler) {
Future<Void> fut = unlistenMulticastGroup(multicastAddress);
if (handler != null) {
fut.onComplete(handler);
}
return this;
}
@Override
public Future<Void> unlistenMulticastGroup(String multicastAddress) {
ChannelFuture fut;
try {
fut = channel.leaveGroup(InetAddress.getByName(multicastAddress));
} catch (Exception e) {
return context.failedFuture(e);
}
PromiseInternal<Void> promise = context.promise();
fut.addListener(promise);
return promise.future();
}
@Override
public DatagramSocket unlistenMulticastGroup(String multicastAddress, String networkInterface, String source, Handler<AsyncResult<Void>> handler) {
Future<Void> fut = unlistenMulticastGroup(multicastAddress, networkInterface, source);
if (handler != null) {
fut.onComplete(handler);
}
return this;
}
@Override
public Future<Void> unlistenMulticastGroup(String multicastAddress, String networkInterface, @Nullable String source) {
ChannelFuture fut;
try {
InetAddress sourceAddress;
if (source == null) {
sourceAddress = null;
} else {
sourceAddress = InetAddress.getByName(source);
}
fut = channel.leaveGroup(InetAddress.getByName(multicastAddress), NetworkInterface.getByName(networkInterface), sourceAddress);
} catch (Exception e) {
return context.failedFuture(e);
}
PromiseInternal<Void> promise = context.promise();
fut.addListener(promise);
return promise.future();
}
@Override
public DatagramSocket blockMulticastGroup(String multicastAddress, String networkInterface, String sourceToBlock, Handler<AsyncResult<Void>> handler) {
Future<Void> fut = blockMulticastGroup(multicastAddress, networkInterface, sourceToBlock);
if (handler != null) {
fut.onComplete(handler);
}
return this;
}
@Override
public Future<Void> blockMulticastGroup(String multicastAddress, String networkInterface, String sourceToBlock) {
ChannelFuture fut;
try {
InetAddress sourceAddress;
if (sourceToBlock == null) {
sourceAddress = null;
} else {
sourceAddress = InetAddress.getByName(sourceToBlock);
}
fut = channel.block(InetAddress.getByName(multicastAddress), NetworkInterface.getByName(networkInterface), sourceAddress);
} catch (Exception e) {
return context.failedFuture(e);
}
PromiseInternal<Void> promise = context.promise();
fut.addListener(promise);
return promise.future();
}
@Override
public DatagramSocket blockMulticastGroup(String multicastAddress, String sourceToBlock, Handler<AsyncResult<Void>> handler) {
Future<Void> fut = blockMulticastGroup(multicastAddress, sourceToBlock);
if (handler != null) {
fut.onComplete(handler);
}
return this;
}
@Override
public Future<Void> blockMulticastGroup(String multicastAddress, String sourceToBlock) {
ChannelFuture fut;
try {
fut = channel.block(InetAddress.getByName(multicastAddress), InetAddress.getByName(sourceToBlock));
} catch (Exception e) {
return context.failedFuture(e);
}
PromiseInternal<Void> promise = context.promise();
fut.addListener(promise);
return promise.future();
}
@Override
public DatagramSocket listen(int port, String address, Handler<AsyncResult<DatagramSocket>> handler) {
Objects.requireNonNull(handler, "no null handler accepted");
listen(SocketAddress.inetSocketAddress(port, address)).onComplete(handler);
return this;
}
@Override
public Future<DatagramSocket> listen(int port, String address) {
return listen(SocketAddress.inetSocketAddress(port, address));
}
@Override
public synchronized DatagramSocket handler(Handler<io.vertx.core.datagram.DatagramPacket> handler) {
this.packetHandler = handler;
return this;
}
@Override
public DatagramSocketImpl endHandler(Handler<Void> handler) {
endHandler = handler;
return this;
}
@Override
public DatagramSocketImpl exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
return this;
}
private Future<DatagramSocket> listen(SocketAddress local) {
AddressResolver resolver = context.owner().addressResolver();
PromiseInternal<Void> promise = context.promise();
io.netty.util.concurrent.Future<InetSocketAddress> f1 = resolver.resolveHostname(context.nettyEventLoop(), local.host());
f1.addListener((GenericFutureListener<io.netty.util.concurrent.Future<InetSocketAddress>>) res1 -> {
if (res1.isSuccess()) {
ChannelFuture f2 = channel.bind(new InetSocketAddress(res1.getNow().getAddress(), local.port()));
if (metrics != null) {
f2.addListener((GenericFutureListener<io.netty.util.concurrent.Future<Void>>) res2 -> {
if (res2.isSuccess()) {
metrics.listening(local.host(), localAddress());
}
});
}
f2.addListener(promise);
} else {
promise.fail(res1.cause());
}
});
return promise.future().map(this);
}
public synchronized DatagramSocket pause() {
if (demand > 0L) {
demand = 0L;
channel.config().setAutoRead(false);
}
return this;
}
public synchronized DatagramSocket resume() {
if (demand == 0L) {
demand = Long.MAX_VALUE;
channel.config().setAutoRead(true);
}
return this;
}
@Override
public synchronized DatagramSocket fetch(long amount) {
if (amount < 0L) {
throw new IllegalArgumentException("Illegal fetch " + amount);
}
if (amount > 0L) {
if (demand == 0L) {
channel.config().setAutoRead(true);
}
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
}
return this;
}
@Override
public DatagramSocket send(Buffer packet, int port, String host, Handler<AsyncResult<Void>> handler) {
Future<Void> fut = send(packet, port, host);
if (handler != null) {
fut.onComplete(handler);
}
return this;
}
@Override
public Future<Void> send(Buffer packet, int port, String host) {
Objects.requireNonNull(packet, "no null packet accepted");
Objects.requireNonNull(host, "no null host accepted");
if (port < 0 || port > 65535) {
throw new IllegalArgumentException("port out of range:" + port);
}
AddressResolver resolver = context.owner().addressResolver();
PromiseInternal<Void> promise = context.promise();
io.netty.util.concurrent.Future<InetSocketAddress> f1 = resolver.resolveHostname(context.nettyEventLoop(), host);
f1.addListener((GenericFutureListener<io.netty.util.concurrent.Future<InetSocketAddress>>) res1 -> {
if (res1.isSuccess()) {
ChannelFuture f2 = channel.writeAndFlush(new DatagramPacket(packet.getByteBuf(), new InetSocketAddress(f1.getNow().getAddress(), port)));
if (metrics != null) {
f2.addListener(fut -> {
if (fut.isSuccess()) {
metrics.bytesWritten(null, SocketAddress.inetSocketAddress(port, host), packet.length());
}
});
}
f2.addListener(promise);
} else {
promise.fail(res1.cause());
}
});
return promise.future();
}
@Override
public WriteStream<Buffer> sender(int port, String host) {
Arguments.requireInRange(port, 0, 65535, "port p must be in range 0 <= p <= 65535");
Objects.requireNonNull(host, "no null host accepted");
return new PacketWriteStreamImpl(this, port, host);
}
@Override
public DatagramSocket send(String str, int port, String host, Handler<AsyncResult<Void>> handler) {
return send(Buffer.buffer(str), port, host, handler);
}
@Override
public Future<Void> send(String str, int port, String host) {
return send(Buffer.buffer(str), port, host);
}
@Override
public DatagramSocket send(String str, String enc, int port, String host, Handler<AsyncResult<Void>> handler) {
return send(Buffer.buffer(str, enc), port, host, handler);
}
@Override
public Future<Void> send(String str, String enc, int port, String host) {
return send(Buffer.buffer(str, enc), port, host);
}
@Override
public SocketAddress localAddress() {
return context.owner().transport().convert(channel.localAddress());
}
@Override
public void close(Handler<AsyncResult<Void>> handler) {
Future<Void> future = close();
if (handler != null) {
future.onComplete(handler);
}
}
@Override
public synchronized Future<Void> close() {
if (!channel.isOpen()) {
return context.succeededFuture();
}
channel.flush();
ChannelFuture future = channel.close();
PromiseInternal<Void> promise = context.promise();
future.addListener(promise);
return promise.future();
}
@Override
public boolean isMetricsEnabled() {
return metrics != null;
}
@Override
public Metrics getMetrics() {
return metrics;
}
@Override
protected void finalize() throws Throwable {
close();
super.finalize();
}
private Connection createConnection(ChannelHandlerContext chctx) {
return new Connection(context, chctx);
}
class Connection extends ConnectionBase {
public Connection(ContextInternal context, ChannelHandlerContext channel) {
super(context, channel);
}
@Override
public NetworkMetrics metrics() {
return metrics;
}
@Override
protected void handleInterestedOpsChanged() {
}
@Override
protected void handleException(Throwable t) {
super.handleException(t);
Handler<Throwable> handler;
synchronized (DatagramSocketImpl.this) {
handler = exceptionHandler;
}
if (handler != null) {
handler.handle(t);
}
}
@Override
protected void handleClosed() {
super.handleClosed();
Handler<Void> handler;
DatagramSocketMetrics metrics;
synchronized (DatagramSocketImpl.this) {
handler = endHandler;
metrics = DatagramSocketImpl.this.metrics;
}
if (metrics != null) {
metrics.close();
}
if (handler != null) {
context.emit(null, handler);
}
}
public void handleMessage(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (content.isDirect()) {
content = VertxHandler.safeBuffer(content, chctx.alloc());
}
handlePacket(new DatagramPacketImpl(packet.sender(), Buffer.buffer(content)));
}
}
void handlePacket(io.vertx.core.datagram.DatagramPacket packet) {
Handler<io.vertx.core.datagram.DatagramPacket> handler;
synchronized (DatagramSocketImpl.this) {
if (metrics != null) {
metrics.bytesRead(null, packet.sender(), packet.data().length());
}
if (demand > 0L) {
if (demand != Long.MAX_VALUE) {
demand--;
}
handler = packetHandler;
} else {
handler = null;
}
}
if (handler != null) {
context.emit(packet, handler);
}
}
}
}