package io.vertx.core.file.impl;
import io.netty.buffer.ByteBuf;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.FileSystemException;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.streams.impl.InboundBuffer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.HashSet;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * {@link AsyncFile} implementation backed by an {@link AsynchronousFileChannel}.
 * <p>
 * Reads and writes are submitted to the channel with completion handlers; results are handed
 * back to user handlers through {@code context.runOnContext}. Mutable state is guarded by the
 * monitor of this instance.
 */
public class AsyncFileImpl implements AsyncFile {

  private static final Logger log = LoggerFactory.getLogger(AsyncFile.class);

  /** Default size, in bytes, of the buffer allocated for each streaming read. */
  public static final int DEFAULT_READ_BUFFER_SIZE = 8192;

  private final VertxInternal vertx;
  private final AsynchronousFileChannel ch;
  private final ContextInternal context;
  // True once close has been requested; every check() fails afterwards.
  private boolean closed;
  // Close action postponed because writes were still outstanding when close was requested.
  private Runnable closedDeferred;
  // Number of bytes submitted to the channel whose write has not completed yet.
  private long writesOutstanding;
  private Handler<Throwable> exceptionHandler;
  private Handler<Void> drainHandler;
  // Position used by the position-less write methods.
  private long writePos;
  // writeQueueFull() reports true once writesOutstanding reaches this value.
  private int maxWrites = 128 * 1024;
  // Low water mark: the drain handler fires when writesOutstanding is at or below it.
  private int lwm = maxWrites / 2;
  private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
  // Delivers streamed read buffers to handleBuffer/handleEnd and requests more data when drained.
  private InboundBuffer<Buffer> queue;
  private Handler<Buffer> handler;
  private Handler<Void> endHandler;
  // Position of the next streaming read.
  private long readPos;

  /**
   * Opens {@code path} with the flags selected in {@code options}.
   *
   * @param vertx   the Vert.x instance whose worker pool runs the channel operations
   * @param path    the path of the file to open
   * @param options the open options; at least one of read or write must be set
   * @param context the context on which completion callbacks are dispatched
   * @throws FileSystemException if neither read nor write is requested, or if opening the
   *                             file (or reading its size in append mode) fails
   */
  AsyncFileImpl(VertxInternal vertx, String path, OpenOptions options, ContextInternal context) {
    if (!options.isRead() && !options.isWrite()) {
      throw new FileSystemException("Cannot open file for neither reading nor writing");
    }
    this.vertx = vertx;
    Path file = Paths.get(path);
    HashSet<OpenOption> opts = new HashSet<>();
    if (options.isRead()) {
      opts.add(StandardOpenOption.READ);
    }
    if (options.isWrite()) {
      opts.add(StandardOpenOption.WRITE);
    }
    if (options.isCreate()) {
      opts.add(StandardOpenOption.CREATE);
    }
    if (options.isCreateNew()) {
      opts.add(StandardOpenOption.CREATE_NEW);
    }
    if (options.isSync()) {
      opts.add(StandardOpenOption.SYNC);
    }
    if (options.isDsync()) {
      opts.add(StandardOpenOption.DSYNC);
    }
    if (options.isDeleteOnClose()) {
      opts.add(StandardOpenOption.DELETE_ON_CLOSE);
    }
    if (options.isSparse()) {
      opts.add(StandardOpenOption.SPARSE);
    }
    if (options.isTruncateExisting()) {
      opts.add(StandardOpenOption.TRUNCATE_EXISTING);
    }
    AsynchronousFileChannel channel;
    try {
      if (options.getPerms() != null) {
        FileAttribute<?> attrs = PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString(options.getPerms()));
        channel = AsynchronousFileChannel.open(file, opts, vertx.getWorkerPool(), attrs);
      } else {
        channel = AsynchronousFileChannel.open(file, opts, vertx.getWorkerPool());
      }
    } catch (IOException e) {
      throw new FileSystemException(e);
    }
    if (options.isAppend()) {
      // Append is emulated by starting the write position at the current end of the file.
      try {
        writePos = channel.size();
      } catch (IOException e) {
        // The channel is already open at this point: release it before reporting the failure.
        try {
          channel.close();
        } catch (IOException closeFailure) {
          e.addSuppressed(closeFailure);
        }
        throw new FileSystemException(e);
      }
    }
    this.ch = channel;
    this.context = context;
    this.queue = new InboundBuffer<>(context, 0);
    queue.handler(buff -> {
      // A zero-length buffer is the end-of-file marker produced by the streaming read loop.
      if (buff.length() > 0) {
        handleBuffer(buff);
      } else {
        handleEnd();
      }
    });
    queue.drainHandler(v -> {
      doRead();
    });
  }

  /** Closes the file without completion notification. */
  @Override
  public void close() {
    closeInternal(null);
  }

  /**
   * Closes the file and notifies {@code handler} with the outcome.
   *
   * @param handler the completion handler, may be {@code null}
   */
  @Override
  public void close(Handler<AsyncResult<Void>> handler) {
    closeInternal(handler);
  }

  /** Same as {@link #close()}. */
  @Override
  public void end() {
    close();
  }

  /** Same as {@link #close(Handler)}. */
  @Override
  public void end(Handler<AsyncResult<Void>> handler) {
    close(handler);
  }

  /**
   * Reads {@code length} bytes (fewer if end of file is reached) starting at {@code position}
   * and stores them into {@code buffer} at {@code offset}.
   *
   * @param buffer   the buffer receiving the data
   * @param offset   the index in {@code buffer} where the data is stored, must be &gt;= 0
   * @param position the file position to read from, must be &gt;= 0
   * @param length   the number of bytes to read, must be &gt;= 0
   * @param handler  notified with {@code buffer} on success or with the failure
   * @return this
   * @throws IllegalStateException if the file is closed
   */
  @Override
  public synchronized AsyncFile read(Buffer buffer, int offset, long position, int length, Handler<AsyncResult<Buffer>> handler) {
    Objects.requireNonNull(buffer, "buffer");
    Objects.requireNonNull(handler, "handler");
    Arguments.require(offset >= 0, "offset must be >= 0");
    Arguments.require(position >= 0, "position must be >= 0");
    Arguments.require(length >= 0, "length must be >= 0");
    check();
    ByteBuffer bb = ByteBuffer.allocate(length);
    doRead(buffer, offset, bb, position, handler);
    return this;
  }

  /** Forwards the demand to the inbound queue. */
  @Override
  public AsyncFile fetch(long amount) {
    queue.fetch(amount);
    return this;
  }

  /**
   * Writes {@code buffer} at {@code position}.
   *
   * @param buffer   the data to write
   * @param position the file position to write at, must be &gt;= 0
   * @param handler  the completion handler, must not be {@code null}
   * @return this
   * @throws IllegalStateException if the file is closed
   */
  @Override
  public AsyncFile write(Buffer buffer, long position, Handler<AsyncResult<Void>> handler) {
    Objects.requireNonNull(handler, "handler");
    return doWrite(buffer, position, handler);
  }

  /**
   * Submits {@code buffer} for writing at {@code position} and wraps {@code handler} so that
   * the drain / deferred-close bookkeeping runs when the write finishes.
   *
   * @param handler the completion handler, may be {@code null}; with no handler a failure is
   *                routed to {@link #handleException(Throwable)}
   */
  private synchronized AsyncFile doWrite(Buffer buffer, long position, Handler<AsyncResult<Void>> handler) {
    Objects.requireNonNull(buffer, "buffer");
    Arguments.require(position >= 0, "position must be >= 0");
    check();
    Handler<AsyncResult<Void>> wrapped = ar -> {
      if (ar.succeeded()) {
        checkContext();
      }
      // Run the bookkeeping whatever the outcome: a failed write also ends an outstanding
      // write, so a pending close or drain notification must not be skipped.
      Runnable action;
      synchronized (AsyncFileImpl.this) {
        if (writesOutstanding == 0 && closedDeferred != null) {
          action = closedDeferred;
        } else {
          action = this::checkDrained;
        }
      }
      action.run();
      if (handler != null) {
        handler.handle(ar);
      } else if (ar.failed()) {
        handleException(ar.cause());
      }
    };
    ByteBuf buf = buffer.getByteBuf();
    if (buf.nioBufferCount() > 1) {
      doWrite(buf.nioBuffers(), position, wrapped);
    } else {
      ByteBuffer bb = buf.nioBuffer();
      doWrite(bb, position, bb.limit(), wrapped);
    }
    return this;
  }

  /** Writes {@code buffer} at the current write position without completion notification. */
  @Override
  public AsyncFile write(Buffer buffer) {
    return write(buffer, null);
  }

  /**
   * Writes {@code buffer} at the current write position and advances that position by the
   * buffer length.
   *
   * @param buffer  the data to write
   * @param handler the completion handler, may be {@code null}
   * @return this
   * @throws IllegalStateException if the file is closed
   */
  @Override
  public synchronized AsyncFile write(Buffer buffer, Handler<AsyncResult<Void>> handler) {
    Objects.requireNonNull(buffer, "buffer");
    int length = buffer.length();
    doWrite(buffer, writePos, handler);
    writePos += length;
    return this;
  }

  /**
   * Sets the write queue limit; the low water mark becomes half of it.
   *
   * @param maxSize the limit in outstanding bytes, must be &gt;= 2
   * @throws IllegalStateException if the file is closed
   */
  @Override
  public synchronized AsyncFile setWriteQueueMaxSize(int maxSize) {
    Arguments.require(maxSize >= 2, "maxSize must be >= 2");
    check();
    this.maxWrites = maxSize;
    this.lwm = maxWrites / 2;
    return this;
  }

  /** Sets the size of the buffers allocated by subsequent streaming reads. */
  @Override
  public synchronized AsyncFile setReadBufferSize(int readBufferSize) {
    this.readBufferSize = readBufferSize;
    return this;
  }

  /**
   * @return {@code true} when the outstanding bytes have reached the write queue limit
   * @throws IllegalStateException if the file is closed
   */
  @Override
  public synchronized boolean writeQueueFull() {
    check();
    return writesOutstanding >= maxWrites;
  }

  /**
   * Registers a one-shot drain handler; it is invoked immediately when the outstanding bytes
   * are already at or below the low water mark.
   *
   * @throws IllegalStateException if the file is closed
   */
  @Override
  public synchronized AsyncFile drainHandler(Handler<Void> handler) {
    check();
    this.drainHandler = handler;
    checkDrained();
    return this;
  }

  /**
   * Sets the handler receiving failures that have no dedicated completion handler.
   *
   * @throws IllegalStateException if the file is closed
   */
  @Override
  public synchronized AsyncFile exceptionHandler(Handler<Throwable> handler) {
    check();
    this.exceptionHandler = handler;
    return this;
  }

  /**
   * Sets the handler receiving streamed buffers. A non-null handler starts reading; a
   * {@code null} handler clears the inbound queue.
   *
   * @throws IllegalStateException if the file is closed
   */
  @Override
  public synchronized AsyncFile handler(Handler<Buffer> handler) {
    // check() throws when the file is closed, so no further closed test is needed here.
    check();
    this.handler = handler;
    if (handler != null) {
      doRead();
    } else {
      queue.clear();
    }
    return this;
  }

  /**
   * Sets the handler notified when the streaming read reaches the end of the file.
   *
   * @throws IllegalStateException if the file is closed
   */
  @Override
  public synchronized AsyncFile endHandler(Handler<Void> handler) {
    check();
    this.endHandler = handler;
    return this;
  }

  /**
   * Pauses delivery of streamed buffers.
   *
   * @throws IllegalStateException if the file is closed
   */
  @Override
  public synchronized AsyncFile pause() {
    check();
    queue.pause();
    return this;
  }

  /**
   * Resumes delivery of streamed buffers.
   *
   * @throws IllegalStateException if the file is closed
   */
  @Override
  public synchronized AsyncFile resume() {
    // check() throws when the file is closed, so the queue is only resumed on an open file.
    check();
    queue.resume();
    return this;
  }

  /** Forces the channel to storage without completion notification. */
  @Override
  public AsyncFile flush() {
    doFlush(null);
    return this;
  }

  /**
   * Forces the channel to storage and notifies {@code handler} with the outcome.
   *
   * @throws IllegalStateException if the file is closed
   */
  @Override
  public AsyncFile flush(Handler<AsyncResult<Void>> handler) {
    doFlush(handler);
    return this;
  }

  /** Sets the position of the next streaming read. */
  @Override
  public synchronized AsyncFile setReadPos(long readPos) {
    this.readPos = readPos;
    return this;
  }

  /** Sets the position used by the next position-less write. */
  @Override
  public synchronized AsyncFile setWritePos(long writePos) {
    this.writePos = writePos;
    return this;
  }

  /** @return the position used by the next position-less write */
  @Override
  public synchronized long getWritePos() {
    return writePos;
  }

  /** Invokes and clears the drain handler when outstanding bytes are at or below the low water mark. */
  private synchronized void checkDrained() {
    if (drainHandler != null && writesOutstanding <= lwm) {
      Handler<Void> handler = drainHandler;
      // Cleared before the call so the handler is one-shot and may register a new one.
      drainHandler = null;
      handler.handle(null);
    }
  }

  /**
   * Passes {@code t} to the exception handler when one is set and {@code t} is an
   * {@link Exception}; anything else is logged.
   */
  private void handleException(Throwable t) {
    if (exceptionHandler != null && t instanceof Exception) {
      exceptionHandler.handle(t);
    } else {
      log.error("Unhandled exception", t);
    }
  }

  /**
   * Writes several NIO buffers back to back starting at {@code position}. {@code handler} is
   * called once: with success after every buffer has been written, or with the first failure.
   */
  private synchronized void doWrite(ByteBuffer[] buffers, long position, Handler<AsyncResult<Void>> handler) {
    AtomicInteger cnt = new AtomicInteger();
    AtomicBoolean sentFailure = new AtomicBoolean();
    for (ByteBuffer b: buffers) {
      int limit = b.limit();
      doWrite(b, position, limit, ar -> {
        if (ar.succeeded()) {
          if (cnt.incrementAndGet() == buffers.length) {
            handler.handle(ar);
          }
        } else {
          // Only the first failure is reported.
          if (sentFailure.compareAndSet(false, true)) {
            handler.handle(ar);
          }
        }
      });
      position += limit;
    }
  }

  /** Starts a streaming read with a freshly allocated NIO buffer of the configured size. */
  private void doRead() {
    doRead(ByteBuffer.allocate(readBufferSize));
  }

  /**
   * Reads the next chunk at the current read position, pushes it to the inbound queue and
   * keeps reading while the queue accepts more and the chunk was not empty.
   */
  private synchronized void doRead(ByteBuffer bb) {
    Buffer buff = Buffer.buffer(readBufferSize);
    doRead(buff, 0, bb, readPos, ar -> {
      if (ar.succeeded()) {
        Buffer buffer = ar.result();
        readPos += buffer.length();
        // An empty buffer marks the end of the file: it is queued but stops the loop.
        if (queue.write(buffer) && buffer.length() > 0) {
          doRead(bb);
        }
      } else {
        handleException(ar.cause());
      }
    });
  }

  /** Delivers a streamed buffer to the data handler, if one is set. */
  private synchronized void handleBuffer(Buffer buff) {
    if (handler != null) {
      checkContext();
      handler.handle(buff);
    }
  }

  /** Clears the data handler and notifies the end handler, if one is set. */
  private synchronized void handleEnd() {
    handler = null;
    if (endHandler != null) {
      checkContext();
      endHandler.handle(null);
    }
  }

  /**
   * Runs {@code ch.force(false)} as a blocking task on the context and reports the outcome
   * to {@code handler}.
   *
   * @throws IllegalStateException if the file is closed
   */
  private synchronized void doFlush(Handler<AsyncResult<Void>> handler) {
    checkClosed();
    context.executeBlockingInternal((Promise<Void> fut) -> {
      try {
        ch.force(false);
        fut.complete();
      } catch (IOException e) {
        throw new FileSystemException(e);
      }
    }, handler);
  }

  /**
   * Accounts {@code toWrite} bytes as outstanding and submits the buffer; an empty write
   * completes immediately with success.
   */
  private void doWrite(ByteBuffer buff, long position, long toWrite, Handler<AsyncResult<Void>> handler) {
    if (toWrite > 0) {
      synchronized (this) {
        writesOutstanding += toWrite;
      }
      writeInternal(buff, position, handler);
    } else {
      handler.handle(Future.succeededFuture());
    }
  }

  /**
   * Writes {@code buff} at {@code position}, resubmitting after partial writes until the
   * buffer is exhausted. When the write ends, successfully or with an {@link Exception},
   * the outstanding byte count is reduced by the buffer limit and {@code handler} is
   * notified on the context.
   */
  private void writeInternal(ByteBuffer buff, long position, Handler<AsyncResult<Void>> handler) {
    ch.write(buff, position, null, new java.nio.channels.CompletionHandler<Integer, Object>() {
      @Override
      public void completed(Integer bytesWritten, Object attachment) {
        long pos = position;
        if (buff.hasRemaining()) {
          // Partial write: resubmit the remainder right after the bytes already written.
          pos += bytesWritten;
          writeInternal(buff, pos, handler);
        } else {
          context.runOnContext((v) -> {
            synchronized (AsyncFileImpl.this) {
              writesOutstanding -= buff.limit();
            }
            handler.handle(Future.succeededFuture());
          });
        }
      }

      @Override
      public void failed(Throwable exc, Object attachment) {
        if (exc instanceof Exception) {
          context.runOnContext((v) -> {
            // The write is over even though it failed: release its share of the outstanding
            // bytes, otherwise the queue stays "full" and a deferred close can never run.
            synchronized (AsyncFileImpl.this) {
              writesOutstanding -= buff.limit();
            }
            handler.handle(Future.failedFuture(exc));
          });
        } else {
          // Non-Exception throwables are only logged; the handler is not notified.
          log.error("Error occurred", exc);
        }
      }
    });
  }

  /**
   * Reads into {@code buff} at {@code position}, resubmitting until the NIO buffer is full
   * or the channel reports end of file, then copies the bytes into {@code writeBuff} at
   * {@code offset} and notifies {@code handler} on the context.
   */
  private void doRead(Buffer writeBuff, int offset, ByteBuffer buff, long position, Handler<AsyncResult<Buffer>> handler) {
    ch.read(buff, position, null, new java.nio.channels.CompletionHandler<Integer, Object>() {
      long pos = position;

      private void done() {
        context.runOnContext((v) -> {
          buff.flip();
          writeBuff.setBytes(offset, buff);
          buff.compact();
          handler.handle(Future.succeededFuture(writeBuff));
        });
      }

      @Override
      public void completed(Integer bytesRead, Object attachment) {
        if (bytesRead == -1) {
          // End of file.
          done();
        } else if (buff.hasRemaining()) {
          // Partial read: continue after the bytes already read.
          pos += bytesRead;
          doRead(writeBuff, offset, buff, pos, handler);
        } else {
          done();
        }
      }

      @Override
      public void failed(Throwable t, Object attachment) {
        context.runOnContext((v) -> handler.handle(Future.failedFuture(t)));
      }
    });
  }

  /** Precondition shared by the public operations; currently only the closed check. */
  private void check() {
    checkClosed();
  }

  /** @throws IllegalStateException if close has already been requested */
  private void checkClosed() {
    if (closed) {
      throw new IllegalStateException("File handle is closed");
    }
  }

  /**
   * Verifies that the current context is the one this file was created with.
   *
   * @throws IllegalStateException if the current context differs or is absent
   */
  private void checkContext() {
    // Objects.equals keeps the original receiver of equals() while tolerating a null current
    // context, which previously surfaced as a NullPointerException.
    if (!Objects.equals(vertx.getContext(), context)) {
      throw new IllegalStateException("AsyncFile must only be used in the context that created it, expected: "
        + context + " actual " + vertx.getContext());
    }
  }

  /** Closes the channel as a blocking task and reports the outcome to {@code handler}. */
  private void doClose(Handler<AsyncResult<Void>> handler) {
    ContextInternal handlerContext = vertx.getOrCreateContext();
    handlerContext.executeBlockingInternal(res -> {
      try {
        ch.close();
        res.complete(null);
      } catch (IOException e) {
        res.fail(e);
      }
    }, handler);
  }

  /**
   * Marks the file closed and closes the channel now, or defers the close until the
   * outstanding writes have finished.
   *
   * @throws IllegalStateException if the file is already closed
   */
  private synchronized void closeInternal(Handler<AsyncResult<Void>> handler) {
    check();
    closed = true;
    if (writesOutstanding == 0) {
      doClose(handler);
    } else {
      closedDeferred = () -> doClose(handler);
    }
  }
}