/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.net.impl;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.EventExecutor;
import io.vertx.core.Handler;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
A channel server load balancer that distributes channel processing to a list of workers.
Author: Tim Fox
/**
* A channel server load balancer that distributes channel processing to a list of workers.
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
class ServerChannelLoadBalancer extends ChannelInitializer<Channel> {
private final VertxEventLoopGroup workers;
private final ConcurrentMap<EventLoop, WorkerList> workerMap = new ConcurrentHashMap<>();
private final ChannelGroup channelGroup;
// We maintain a separate hasHandlers variable so we can implement hasHandlers() efficiently
// As it is called for every HTTP message received
private volatile boolean hasHandlers;
ServerChannelLoadBalancer(EventExecutor executor) {
this.workers = new VertxEventLoopGroup();
this.channelGroup = new DefaultChannelGroup(executor);
}
public VertxEventLoopGroup workers() {
return workers;
}
public boolean hasHandlers() {
return hasHandlers;
}
@Override
protected void initChannel(Channel ch) {
Handler<Channel> handler = chooseInitializer(ch.eventLoop());
if (handler == null) {
ch.close();
} else {
channelGroup.add(ch);
handler.handle(ch);
}
}
private Handler<Channel> chooseInitializer(EventLoop worker) {
WorkerList handlers = workerMap.get(worker);
return handlers == null ? null : handlers.chooseHandler();
}
public synchronized void addWorker(EventLoop eventLoop, Handler<Channel> handler) {
workers.addWorker(eventLoop);
WorkerList handlers = new WorkerList();
WorkerList prev = workerMap.putIfAbsent(eventLoop, handlers);
if (prev != null) {
handlers = prev;
}
handlers.addWorker(handler);
hasHandlers = true;
}
public synchronized boolean removeWorker(EventLoop worker, Handler<Channel> handler) {
WorkerList handlers = workerMap.get(worker);
if (handlers == null || !handlers.removeWorker(handler)) {
return false;
}
if (handlers.isEmpty()) {
workerMap.remove(worker);
}
if (workerMap.isEmpty()) {
hasHandlers = false;
}
//Available workers does it's own reference counting -since workers can be shared across different Handlers
workers.removeWorker(worker);
return true;
}
private static final class WorkerList {
private int pos;
private final List<Handler<Channel>> list = new CopyOnWriteArrayList<>();
Handler<Channel> chooseHandler() {
Handler<Channel> handler = list.get(pos);
pos++;
checkPos();
return handler;
}
void addWorker(Handler<Channel> handler) {
list.add(handler);
}
boolean removeWorker(Handler<Channel> handler) {
if (list.remove(handler)) {
checkPos();
return true;
} else {
return false;
}
}
boolean isEmpty() {
return list.isEmpty();
}
void checkPos() {
if (pos == list.size()) {
pos = 0;
}
}
}
Close the load-balancer and all registered channels.
/**
* Close the load-balancer and all registered channels.
*/
public void close() {
channelGroup.close();
}
}