package org.terracotta.offheapstore.disk.paging;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.offheapstore.paging.OffHeapStorageArea;
import org.terracotta.offheapstore.paging.Page;
import org.terracotta.offheapstore.paging.PageSource;
import org.terracotta.offheapstore.util.DebuggingUtils;
import org.terracotta.offheapstore.util.Retryer;
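/**
 * A {@link PageSource} that hands out pages backed by memory-mapped regions of a single
 * backing file. File regions are managed by a {@link PowerOfTwoFileAllocator}; freed pages
 * are flushed to disk asynchronously before their regions are returned to the allocator.
 */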
public class MappedPageSource implements PageSource {
private static final Logger LOGGER = LoggerFactory.getLogger(MappedPageSource.class);
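/*
 * Shared daemon thread used to complete (and, if necessary, retry) asynchronous page flushes.
 */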
private static final Retryer ASYNC_FLUSH_EXECUTOR = new Retryer(10, 600, TimeUnit.SECONDS, r -> {
Thread t = new Thread(r, "MappedByteBufferSource Async Flush Thread");
t.setDaemon(true);
return t;
});
private final File file;
private final RandomAccessFile raf;
private final FileChannel channel;
private final PowerOfTwoFileAllocator allocator;
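// live mapped pages, tracked by identity, mapped to the file offset each occupies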
private final IdentityHashMap<MappedPage, Long> pages = new IdentityHashMap<>();
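// currently allocated file regions, keyed by their starting offset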
private final Map<Long, AllocatedRegion> allocated = new HashMap<>();
public MappedPageSource(File file) throws IOException {
this(file, true);
}
public MappedPageSource(File file, long size) throws IOException {
this(file, true, size);
}
public MappedPageSource(File file, boolean truncate) throws IOException {
this(file, truncate, Long.MAX_VALUE);
}
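/**
 * Creates a page source backed by {@code file}, optionally truncating any existing
 * content, with {@code size} as the limit on how large the backing file may grow.
 */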
public MappedPageSource(File file, boolean truncate, long size) throws IOException {
if (!file.createNewFile() && file.isDirectory()) {
throw new IOException("File already exists and is a directory");
}
this.file = file;
this.raf = new RandomAccessFile(file, "rw");
this.channel = raf.getChannel();
if (truncate) {
try {
channel.truncate(0);
} catch (IOException e) {
LOGGER.info("Exception prevented truncation of disk store file", e);
}
} else if (channel.size() > size) {
throw new IllegalStateException("Existing file is larger than source limit");
}
this.allocator = new PowerOfTwoFileAllocator(size);
}
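/**
 * Allocates a {@code size} byte region of the backing file, growing the file if needed.
 *
 * @return the region's offset within the file, or {@code null} if the allocator is exhausted
 */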
public synchronized Long allocateRegion(long size) {
Long address = allocator.allocate(size);
if (address == null) {
return null;
}
allocated.put(address, new AllocatedRegion(address, size));
long max = address + size;
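// grow the backing file, via a one byte write at the new end, if the region extends past its current size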
try {
if (max > channel.size()) {
ByteBuffer one = ByteBuffer.allocate(1);
while (one.hasRemaining()) {
channel.write(one, max - 1);
}
}
} catch (IOException e) {
LOGGER.warn("IOException while attempting to extend file {}", file.getAbsolutePath(), e);
}
return address;
}
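/**
 * Returns a previously allocated region to the file allocator.
 */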
public synchronized void freeRegion(long address) {
AllocatedRegion r = allocated.remove(address);
if (r == null) {
throw new AssertionError("Address " + address + " is not allocated");
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Freeing a {}B region from {} at {}", DebuggingUtils.toBase2SuffixedString(r.size), file.getName(), r.address);
}
allocator.free(r.address, r.size);
}
}
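/**
 * Marks an existing region of the file as allocated so that later allocations cannot overlap it.
 */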
public synchronized long claimRegion(long address, long size) throws IOException {
allocator.mark(address, size);
allocated.put(address, new AllocatedRegion(address, size));
return address;
}
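/**
 * Opens a new read-only channel on the backing file.
 */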
public FileChannel getReadableChannel() {
try {
return new RandomAccessFile(file, "r").getChannel();
} catch (FileNotFoundException e) {
throw new AssertionError(e);
}
}
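/**
 * Opens a new read-write channel on the backing file.
 */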
public FileChannel getWritableChannel() {
try {
return new RandomAccessFile(file, "rw").getChannel();
} catch (FileNotFoundException e) {
throw new AssertionError(e);
}
}
public File getFile() {
return file;
}
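/**
 * Allocates a page by mapping a read-write region of the backing file.
 * <p>
 * The {@code thief}, {@code victim} and {@code owner} parameters are ignored by this source.
 * Returns {@code null} if no region can be allocated or if the mapping fails.
 */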
@Override
public synchronized MappedPage allocate(int size, boolean thief, boolean victim, OffHeapStorageArea owner) {
Long address = allocateRegion(size);
if (address == null) {
return null;
}
try {
MappedByteBuffer buffer = channel.map(MapMode.READ_WRITE, address, size);
MappedPage page = new MappedPage(buffer);
pages.put(page, address);
return page;
} catch (IOException e) {
freeRegion(address);
LOGGER.warn("Mapping a new file section failed", e);
return null;
}
}
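/**
 * Frees the given page.  The mapped buffer is forced to disk on the shared flush thread
 * before the underlying file region is returned to the allocator.
 */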
@Override
public synchronized void free(final Page page) {
final Long a = pages.remove(page);
if (a == null) {
throw new AssertionError("Page was not allocated by this source");
} else {
ASYNC_FLUSH_EXECUTOR.completeAsynchronously(new Runnable() {
@Override
public void run() {
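// force the mapped buffer to disk before handing its file region back to the allocator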
((MappedByteBuffer) page.asByteBuffer()).force();
freeRegion(a);
}
@Override
public String toString() {
return "Asynchronous flush of Page[" + System.identityHashCode(page) + "] (size=" + page.size() + ")";
}
});
}
}
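/**
 * Claims the given region of the backing file and maps a read-write page over it.
 */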
public synchronized MappedPage claimPage(long address, long size) throws IOException {
claimRegion(address, size);
MappedByteBuffer buffer = channel.map(MapMode.READ_WRITE, address, size);
MappedPage page = new MappedPage(buffer);
pages.put(page, address);
return page;
}
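/**
 * Returns the file offset backing the given page, which must have been allocated or
 * claimed from this source.
 */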
public synchronized long getAddress(Page underlying) {
return pages.get(underlying);
}
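/**
 * Forces any channel updates (content and metadata) to disk, if the channel is still open.
 */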
public synchronized void flush() throws IOException {
if (channel.isOpen()) {
channel.force(true);
}
}
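/**
 * Closes the file channel and the underlying {@link RandomAccessFile}.
 */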
public synchronized void close() throws IOException {
try {
channel.close();
} finally {
raf.close();
}
}
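/**
 * Simple record of an allocated region's offset and size within the backing file.
 */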
static class AllocatedRegion {
final long address;
final long size;
public AllocatedRegion(long address, long size) {
this.address = address;
this.size = size;
}
}
}