/*
* Copyright (c) 2008, 2016, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package sun.nio.fs;
import java.nio.file.*;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.*;
import java.io.IOException;
import jdk.internal.misc.Unsafe;
import static sun.nio.fs.UnixNativeDispatcher.*;
import static sun.nio.fs.UnixConstants.*;
Linux implementation of WatchService based on inotify.
In summary a background thread polls inotify plus a socket used for the wakeup
mechanism. Requests to add or remove a watch, or close the watch service,
cause the thread to wakeup and process the request. Events are processed
by the thread which causes it to signal/queue the corresponding watch keys.
/**
* Linux implementation of WatchService based on inotify.
*
* In summary a background thread polls inotify plus a socket used for the wakeup
* mechanism. Requests to add or remove a watch, or close the watch service,
* cause the thread to wakeup and process the request. Events are processed
* by the thread which causes it to signal/queue the corresponding watch keys.
*/
class LinuxWatchService
extends AbstractWatchService
{
private static final Unsafe unsafe = Unsafe.getUnsafe();
// background thread to read change events
private final Poller poller;
LinuxWatchService(UnixFileSystem fs) throws IOException {
// initialize inotify
int ifd = - 1;
try {
ifd = inotifyInit();
} catch (UnixException x) {
String msg = (x.errno() == EMFILE) ?
"User limit of inotify instances reached or too many open files" :
x.errorString();
throw new IOException(msg);
}
// configure inotify to be non-blocking
// create socketpair used in the close mechanism
int sp[] = new int[2];
try {
configureBlocking(ifd, false);
socketpair(sp);
configureBlocking(sp[0], false);
} catch (UnixException x) {
UnixNativeDispatcher.close(ifd);
throw new IOException(x.errorString());
}
this.poller = new Poller(fs, this, ifd, sp);
this.poller.start();
}
@Override
WatchKey register(Path dir,
WatchEvent.Kind<?>[] events,
WatchEvent.Modifier... modifiers)
throws IOException
{
// delegate to poller
return poller.register(dir, events, modifiers);
}
@Override
void implClose() throws IOException {
// delegate to poller
poller.close();
}
WatchKey implementation
/**
* WatchKey implementation
*/
private static class LinuxWatchKey extends AbstractWatchKey {
// inotify descriptor
private final int ifd;
// watch descriptor
private volatile int wd;
LinuxWatchKey(UnixPath dir, LinuxWatchService watcher, int ifd, int wd) {
super(dir, watcher);
this.ifd = ifd;
this.wd = wd;
}
int descriptor() {
return wd;
}
void invalidate(boolean remove) {
if (remove) {
try {
inotifyRmWatch(ifd, wd);
} catch (UnixException x) {
// ignore
}
}
wd = -1;
}
@Override
public boolean isValid() {
return (wd != -1);
}
@Override
public void cancel() {
if (isValid()) {
// delegate to poller
((LinuxWatchService)watcher()).poller.cancel(this);
}
}
}
Background thread to read from inotify
/**
* Background thread to read from inotify
*/
private static class Poller extends AbstractPoller {
struct inotify_event {
int wd;
uint32_t mask;
uint32_t len;
char name __flexarr; // present if len > 0
} act_t;
/**
* struct inotify_event {
* int wd;
* uint32_t mask;
* uint32_t len;
* char name __flexarr; // present if len > 0
* } act_t;
*/
private static final int SIZEOF_INOTIFY_EVENT = eventSize();
private static final int[] offsets = eventOffsets();
private static final int OFFSETOF_WD = offsets[0];
private static final int OFFSETOF_MASK = offsets[1];
private static final int OFFSETOF_LEN = offsets[3];
private static final int OFFSETOF_NAME = offsets[4];
private static final int IN_MODIFY = 0x00000002;
private static final int IN_ATTRIB = 0x00000004;
private static final int IN_MOVED_FROM = 0x00000040;
private static final int IN_MOVED_TO = 0x00000080;
private static final int IN_CREATE = 0x00000100;
private static final int IN_DELETE = 0x00000200;
private static final int IN_UNMOUNT = 0x00002000;
private static final int IN_Q_OVERFLOW = 0x00004000;
private static final int IN_IGNORED = 0x00008000;
// sizeof buffer for when polling inotify
private static final int BUFFER_SIZE = 8192;
private final UnixFileSystem fs;
private final LinuxWatchService watcher;
// inotify file descriptor
private final int ifd;
// socketpair used to shutdown polling thread
private final int socketpair[];
// maps watch descriptor to Key
private final Map<Integer,LinuxWatchKey> wdToKey;
// address of read buffer
private final long address;
Poller(UnixFileSystem fs, LinuxWatchService watcher, int ifd, int[] sp) {
this.fs = fs;
this.watcher = watcher;
this.ifd = ifd;
this.socketpair = sp;
this.wdToKey = new HashMap<>();
this.address = unsafe.allocateMemory(BUFFER_SIZE);
}
@Override
void wakeup() throws IOException {
// write to socketpair to wakeup polling thread
try {
write(socketpair[1], address, 1);
} catch (UnixException x) {
throw new IOException(x.errorString());
}
}
@Override
Object implRegister(Path obj,
Set<? extends WatchEvent.Kind<?>> events,
WatchEvent.Modifier... modifiers)
{
UnixPath dir = (UnixPath)obj;
int mask = 0;
for (WatchEvent.Kind<?> event: events) {
if (event == StandardWatchEventKinds.ENTRY_CREATE) {
mask |= IN_CREATE | IN_MOVED_TO;
continue;
}
if (event == StandardWatchEventKinds.ENTRY_DELETE) {
mask |= IN_DELETE | IN_MOVED_FROM;
continue;
}
if (event == StandardWatchEventKinds.ENTRY_MODIFY) {
mask |= IN_MODIFY | IN_ATTRIB;
continue;
}
}
// no modifiers supported at this time
if (modifiers.length > 0) {
for (WatchEvent.Modifier modifier: modifiers) {
if (modifier == null)
return new NullPointerException();
if (!ExtendedOptions.SENSITIVITY_HIGH.matches(modifier) &&
!ExtendedOptions.SENSITIVITY_MEDIUM.matches(modifier) &&
!ExtendedOptions.SENSITIVITY_LOW.matches(modifier)) {
return new UnsupportedOperationException("Modifier not supported");
}
}
}
// check file is directory
UnixFileAttributes attrs = null;
try {
attrs = UnixFileAttributes.get(dir, true);
} catch (UnixException x) {
return x.asIOException(dir);
}
if (!attrs.isDirectory()) {
return new NotDirectoryException(dir.getPathForExceptionMessage());
}
// register with inotify (replaces existing mask if already registered)
int wd = -1;
try {
NativeBuffer buffer =
NativeBuffers.asNativeBuffer(dir.getByteArrayForSysCalls());
try {
wd = inotifyAddWatch(ifd, buffer.address(), mask);
} finally {
buffer.release();
}
} catch (UnixException x) {
if (x.errno() == ENOSPC) {
return new IOException("User limit of inotify watches reached");
}
return x.asIOException(dir);
}
// ensure watch descriptor is in map
LinuxWatchKey key = wdToKey.get(wd);
if (key == null) {
key = new LinuxWatchKey(dir, watcher, ifd, wd);
wdToKey.put(wd, key);
}
return key;
}
// cancel single key
@Override
void implCancelKey(WatchKey obj) {
LinuxWatchKey key = (LinuxWatchKey)obj;
if (key.isValid()) {
wdToKey.remove(key.descriptor());
key.invalidate(true);
}
}
// close watch service
@Override
void implCloseAll() {
// invalidate all keys
for (Map.Entry<Integer,LinuxWatchKey> entry: wdToKey.entrySet()) {
entry.getValue().invalidate(true);
}
wdToKey.clear();
// free resources
unsafe.freeMemory(address);
UnixNativeDispatcher.close(socketpair[0]);
UnixNativeDispatcher.close(socketpair[1]);
UnixNativeDispatcher.close(ifd);
}
Poller main loop
/**
* Poller main loop
*/
@Override
public void run() {
try {
for (;;) {
int nReady, bytesRead;
// wait for close or inotify event
nReady = poll(ifd, socketpair[0]);
// read from inotify
try {
bytesRead = read(ifd, address, BUFFER_SIZE);
} catch (UnixException x) {
if (x.errno() != EAGAIN)
throw x;
bytesRead = 0;
}
// iterate over buffer to decode events
int offset = 0;
while (offset < bytesRead) {
long event = address + offset;
int wd = unsafe.getInt(event + OFFSETOF_WD);
int mask = unsafe.getInt(event + OFFSETOF_MASK);
int len = unsafe.getInt(event + OFFSETOF_LEN);
// file name
UnixPath name = null;
if (len > 0) {
int actual = len;
// null-terminated and maybe additional null bytes to
// align the next event
while (actual > 0) {
long last = event + OFFSETOF_NAME + actual - 1;
if (unsafe.getByte(last) != 0)
break;
actual--;
}
if (actual > 0) {
byte[] buf = new byte[actual];
unsafe.copyMemory(null, event + OFFSETOF_NAME,
buf, Unsafe.ARRAY_BYTE_BASE_OFFSET, actual);
name = new UnixPath(fs, buf);
}
}
// process event
processEvent(wd, mask, name);
offset += (SIZEOF_INOTIFY_EVENT + len);
}
// process any pending requests
if ((nReady > 1) || (nReady == 1 && bytesRead == 0)) {
try {
read(socketpair[0], address, BUFFER_SIZE);
boolean shutdown = processRequests();
if (shutdown)
break;
} catch (UnixException x) {
if (x.errno() != UnixConstants.EAGAIN)
throw x;
}
}
}
} catch (UnixException x) {
x.printStackTrace();
}
}
map inotify event to WatchEvent.Kind
/**
* map inotify event to WatchEvent.Kind
*/
private WatchEvent.Kind<?> maskToEventKind(int mask) {
if ((mask & IN_MODIFY) > 0)
return StandardWatchEventKinds.ENTRY_MODIFY;
if ((mask & IN_ATTRIB) > 0)
return StandardWatchEventKinds.ENTRY_MODIFY;
if ((mask & IN_CREATE) > 0)
return StandardWatchEventKinds.ENTRY_CREATE;
if ((mask & IN_MOVED_TO) > 0)
return StandardWatchEventKinds.ENTRY_CREATE;
if ((mask & IN_DELETE) > 0)
return StandardWatchEventKinds.ENTRY_DELETE;
if ((mask & IN_MOVED_FROM) > 0)
return StandardWatchEventKinds.ENTRY_DELETE;
return null;
}
Process event from inotify
/**
* Process event from inotify
*/
private void processEvent(int wd, int mask, final UnixPath name) {
// overflow - signal all keys
if ((mask & IN_Q_OVERFLOW) > 0) {
for (Map.Entry<Integer,LinuxWatchKey> entry: wdToKey.entrySet()) {
entry.getValue()
.signalEvent(StandardWatchEventKinds.OVERFLOW, null);
}
return;
}
// lookup wd to get key
LinuxWatchKey key = wdToKey.get(wd);
if (key == null)
return; // should not happen
// file deleted
if ((mask & IN_IGNORED) > 0) {
wdToKey.remove(wd);
key.invalidate(false);
key.signal();
return;
}
// event for directory itself
if (name == null)
return;
// map to event and queue to key
WatchEvent.Kind<?> kind = maskToEventKind(mask);
if (kind != null) {
key.signalEvent(kind, name);
}
}
}
// -- native methods --
// sizeof inotify_event
private static native int eventSize();
// offsets of inotify_event
private static native int[] eventOffsets();
private static native int inotifyInit() throws UnixException;
private static native int inotifyAddWatch(int fd, long pathAddress, int mask)
throws UnixException;
private static native void inotifyRmWatch(int fd, int wd)
throws UnixException;
private static native void configureBlocking(int fd, boolean blocking)
throws UnixException;
private static native void socketpair(int[] sv) throws UnixException;
private static native int poll(int fd1, int fd2) throws UnixException;
static {
AccessController.doPrivileged(new PrivilegedAction<>() {
public Void run() {
System.loadLibrary("nio");
return null;
}});
}
}