/*
* Copyright (c) 2008, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package sun.nio.ch;
import java.nio.channels.spi.AsynchronousChannelProvider;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import static sun.nio.ch.EPoll.EPOLLIN;
import static sun.nio.ch.EPoll.EPOLLONESHOT;
import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;
import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;
AsynchronousChannelGroup implementation based on the Linux epoll facility.
/**
* AsynchronousChannelGroup implementation based on the Linux epoll facility.
*/
final class EPollPort
extends Port
{
// maximum number of events to poll at a time
private static final int MAX_EPOLL_EVENTS = 512;
// errors
private static final int ENOENT = 2;
// epoll file descriptor
private final int epfd;
// address of the poll array passed to epoll_wait
private final long address;
// true if epoll closed
private boolean closed;
// socket pair used for wakeup
private final int sp[];
// number of wakeups pending
private final AtomicInteger wakeupCount = new AtomicInteger();
// encapsulates an event for a channel
static class Event {
final PollableChannel channel;
final int events;
Event(PollableChannel channel, int events) {
this.channel = channel;
this.events = events;
}
PollableChannel channel() { return channel; }
int events() { return events; }
}
// queue of events for cases that a polling thread dequeues more than one
// event
private final ArrayBlockingQueue<Event> queue;
private final Event NEED_TO_POLL = new Event(null, 0);
private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
throws IOException
{
super(provider, pool);
this.epfd = EPoll.create();
this.address = EPoll.allocatePollArray(MAX_EPOLL_EVENTS);
// create socket pair for wakeup mechanism
try {
long fds = IOUtil.makePipe(true);
this.sp = new int[]{(int) (fds >>> 32), (int) fds};
} catch (IOException ioe) {
EPoll.freePollArray(address);
FileDispatcherImpl.closeIntFD(epfd);
throw ioe;
}
// register one end with epoll
EPoll.ctl(epfd, EPOLL_CTL_ADD, sp[0], EPOLLIN);
// create the queue and offer the special event to ensure that the first
// threads polls
this.queue = new ArrayBlockingQueue<>(MAX_EPOLL_EVENTS);
this.queue.offer(NEED_TO_POLL);
}
EPollPort start() {
startThreads(new EventHandlerTask());
return this;
}
Release all resources
/**
* Release all resources
*/
private void implClose() {
synchronized (this) {
if (closed)
return;
closed = true;
}
try { FileDispatcherImpl.closeIntFD(epfd); } catch (IOException ioe) { }
try { FileDispatcherImpl.closeIntFD(sp[0]); } catch (IOException ioe) { }
try { FileDispatcherImpl.closeIntFD(sp[1]); } catch (IOException ioe) { }
EPoll.freePollArray(address);
}
private void wakeup() {
if (wakeupCount.incrementAndGet() == 1) {
// write byte to socketpair to force wakeup
try {
IOUtil.write1(sp[1], (byte)0);
} catch (IOException x) {
throw new AssertionError(x);
}
}
}
@Override
void executeOnHandlerTask(Runnable task) {
synchronized (this) {
if (closed)
throw new RejectedExecutionException();
offerTask(task);
wakeup();
}
}
@Override
void shutdownHandlerTasks() {
/*
* If no tasks are running then just release resources; otherwise
* write to the one end of the socketpair to wakeup any polling threads.
*/
int nThreads = threadCount();
if (nThreads == 0) {
implClose();
} else {
// send wakeup to each thread
while (nThreads-- > 0) {
wakeup();
}
}
}
// invoke by clients to register a file descriptor
@Override
void startPoll(int fd, int events) {
// update events (or add to epoll on first usage)
int err = EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
if (err == ENOENT)
err = EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
if (err != 0)
throw new AssertionError(); // should not happen
}
Task to process events from epoll and dispatch to the channel's
onEvent handler.
Events are retrieved from epoll in batch and offered to a BlockingQueue
where they are consumed by handler threads. A special "NEED_TO_POLL"
event is used to signal one consumer to re-poll when all events have
been consumed.
/**
* Task to process events from epoll and dispatch to the channel's
* onEvent handler.
*
* Events are retrieved from epoll in batch and offered to a BlockingQueue
* where they are consumed by handler threads. A special "NEED_TO_POLL"
* event is used to signal one consumer to re-poll when all events have
* been consumed.
*/
private class EventHandlerTask implements Runnable {
private Event poll() throws IOException {
try {
for (;;) {
int n;
do {
n = EPoll.wait(epfd, address, MAX_EPOLL_EVENTS, -1);
} while (n == IOStatus.INTERRUPTED);
/**
* 'n' events have been read. Here we map them to their
* corresponding channel in batch and queue n-1 so that
* they can be handled by other handler threads. The last
* event is handled by this thread (and so is not queued).
*/
fdToChannelLock.readLock().lock();
try {
while (n-- > 0) {
long eventAddress = EPoll.getEvent(address, n);
int fd = EPoll.getDescriptor(eventAddress);
// wakeup
if (fd == sp[0]) {
if (wakeupCount.decrementAndGet() == 0) {
// consume one wakeup byte, never more as this
// would interfere with shutdown when there is
// a wakeup byte queued to wake each thread
int nread;
do {
nread = IOUtil.drain1(sp[0]);
} while (nread == IOStatus.INTERRUPTED);
}
// queue special event if there are more events
// to handle.
if (n > 0) {
queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
continue;
}
return EXECUTE_TASK_OR_SHUTDOWN;
}
PollableChannel channel = fdToChannel.get(fd);
if (channel != null) {
int events = EPoll.getEvents(eventAddress);
Event ev = new Event(channel, events);
// n-1 events are queued; This thread handles
// the last one except for the wakeup
if (n > 0) {
queue.offer(ev);
} else {
return ev;
}
}
}
} finally {
fdToChannelLock.readLock().unlock();
}
}
} finally {
// to ensure that some thread will poll when all events have
// been consumed
queue.offer(NEED_TO_POLL);
}
}
public void run() {
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
Invoker.getGroupAndInvokeCount();
final boolean isPooledThread = (myGroupAndInvokeCount != null);
boolean replaceMe = false;
Event ev;
try {
for (;;) {
// reset invoke count
if (isPooledThread)
myGroupAndInvokeCount.resetInvokeCount();
try {
replaceMe = false;
ev = queue.take();
// no events and this thread has been "selected" to
// poll for more.
if (ev == NEED_TO_POLL) {
try {
ev = poll();
} catch (IOException x) {
x.printStackTrace();
return;
}
}
} catch (InterruptedException x) {
continue;
}
// handle wakeup to execute task or shutdown
if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
Runnable task = pollTask();
if (task == null) {
// shutdown request
return;
}
// run task (may throw error/exception)
replaceMe = true;
task.run();
continue;
}
// process event
try {
ev.channel().onEvent(ev.events(), isPooledThread);
} catch (Error x) {
replaceMe = true; throw x;
} catch (RuntimeException x) {
replaceMe = true; throw x;
}
}
} finally {
// last handler to exit when shutdown releases resources
int remaining = threadExit(this, replaceMe);
if (remaining == 0 && isShutdown()) {
implClose();
}
}
}
}
}