/*
 * Copyright 2004-2019 H2 Group. Multiple-Licensed under the MPL 2.0,
 * and the EPL 1.0 (http://h2database.com/html/license.html).
 * Initial Developer: H2 Group
 */
package org.h2.mvstore;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

A facility to store streams in a map. Streams are split into blocks, which are stored in a map. Very small streams are inlined in the stream id.

The key of the map is a long (incremented for each stored block). The default initial value is 0. Before storing blocks into the map, the stream store checks if there is already a block with the next key, and if necessary searches the next free entry using a binary search (0 to Long.MAX_VALUE).

Before storing

The format of the binary id is: An empty id represents 0 bytes of data. In-place data is encoded as 0, the size (a variable size int), then the data. A stored block is encoded as 1, the length of the block (a variable size int), then the key (a variable size long). Multiple ids can be concatenated to concatenate the data. If the id is large, it is stored itself, which is encoded as 2, the total length (a variable size long), and the key of the block that contains the id (a variable size long).

/**
 * A facility to store streams in a map. Streams are split into blocks, which
 * are stored in a map. Very small streams are inlined in the stream id.
 * <p>
 * The key of the map is a long (incremented for each stored block). The default
 * initial value is 0. Before storing blocks into the map, the stream store
 * checks if there is already a block with the next key, and if necessary
 * searches the next free entry using a binary search (from that key to
 * Long.MAX_VALUE).
 * <p>
 * The format of the binary id is: An empty id represents 0 bytes of data.
 * In-place data is encoded as 0, the size (a variable size int), then the data.
 * A stored block is encoded as 1, the length of the block (a variable size
 * int), then the key (a variable size long). Multiple ids can be concatenated
 * to concatenate the data. If the id is large, it is stored itself, which is
 * encoded as 2, the total length (a variable size long), and the key of the
 * block that contains the id (a variable size long).
 */
public class StreamStore {

    // the map that holds the stored blocks, indexed by block key
    private final Map<Long, byte[]> map;

    // chunks shorter than this are inlined in the id instead of being
    // stored as a block (default: 256 bytes)
    private int minBlockSize = 256;

    // the size of the read buffer, and therefore the largest chunk that is
    // read from a stream at once (default: 256 KB)
    private int maxBlockSize = 256 * 1024;

    // the key that is tried first for the next stored block
    private final AtomicLong nextKey = new AtomicLong();

    // a spare read buffer; it is kept for re-use whenever a read returned
    // less data than the buffer can hold
    private final AtomicReference<byte[]> nextBuffer = new AtomicReference<>();
Create a stream store instance.
Params:
  • map – the map to store blocks of data
/**
 * Create a stream store instance.
 *
 * @param map the map to store blocks of data
 */
public StreamStore(Map<Long, byte[]> map) {
    this.map = map;
}

/**
 * Get the map that is used to store the blocks.
 *
 * @return the map that was passed to the constructor
 */
public Map<Long, byte[]> getMap() {
    return this.map;
}

/**
 * Set the key that is tried first when the next block is stored.
 *
 * @param nextKey the next key
 */
public void setNextKey(long nextKey) {
    this.nextKey.set(nextKey);
}

/**
 * Get the key that is tried first when the next block is stored.
 *
 * @return the next key
 */
public long getNextKey() {
    return this.nextKey.get();
}
Set the minimum block size. The default is 256 bytes.
Params:
  • minBlockSize – the new value
/**
 * Set the minimum block size. The default is 256 bytes.
 *
 * @param minBlockSize the new value
 */
public void setMinBlockSize(int minBlockSize) {
    this.minBlockSize = minBlockSize;
}

/**
 * Get the minimum block size.
 *
 * @return the current value
 */
public int getMinBlockSize() {
    return this.minBlockSize;
}
Set the maximum block size. The default is 256 KB.
Params:
  • maxBlockSize – the new value
/**
 * Set the maximum block size. The default is 256 KB.
 *
 * @param maxBlockSize the new value
 */
public void setMaxBlockSize(int maxBlockSize) {
    this.maxBlockSize = maxBlockSize;
}

/**
 * Get the maximum block size.
 *
 * @return the current value (an int value, widened to long)
 */
public long getMaxBlockSize() {
    return this.maxBlockSize;
}
Store the stream, and return the id. The stream is not closed.
Params:
  • in – the stream
Throws:
  • IOException – if an I/O error occurs
Returns: the id (potentially an empty array)
/** * Store the stream, and return the id. The stream is not closed. * * @param in the stream * @return the id (potentially an empty array) * @throws IOException If an I/O error occurs */
@SuppressWarnings("resource") public byte[] put(InputStream in) throws IOException { ByteArrayOutputStream id = new ByteArrayOutputStream(); int level = 0; try { while (!put(id, in, level)) { if (id.size() > maxBlockSize / 2) { id = putIndirectId(id); level++; } } } catch (IOException e) { remove(id.toByteArray()); throw e; } if (id.size() > minBlockSize * 2) { id = putIndirectId(id); } return id.toByteArray(); } private boolean put(ByteArrayOutputStream id, InputStream in, int level) throws IOException { if (level > 0) { ByteArrayOutputStream id2 = new ByteArrayOutputStream(); while (true) { boolean eof = put(id2, in, level - 1); if (id2.size() > maxBlockSize / 2) { id2 = putIndirectId(id2); id2.writeTo(id); return eof; } else if (eof) { id2.writeTo(id); return true; } } } byte[] readBuffer = nextBuffer.getAndSet(null); if (readBuffer == null) { readBuffer = new byte[maxBlockSize]; } byte[] buff = read(in, readBuffer); if (buff != readBuffer) { // re-use the buffer if the result was shorter nextBuffer.set(readBuffer); } int len = buff.length; if (len == 0) { return true; } boolean eof = len < maxBlockSize; if (len < minBlockSize) { // in-place: 0, len (int), data id.write(0); DataUtils.writeVarInt(id, len); id.write(buff); } else { // block: 1, len (int), blockId (long) id.write(1); DataUtils.writeVarInt(id, len); DataUtils.writeVarLong(id, writeBlock(buff)); } return eof; } private static byte[] read(InputStream in, byte[] target) throws IOException { int copied = 0; int remaining = target.length; while (remaining > 0) { try { int len = in.read(target, copied, remaining); if (len < 0) { return Arrays.copyOf(target, copied); } copied += len; remaining -= len; } catch (RuntimeException e) { throw new IOException(e); } } return target; } private ByteArrayOutputStream putIndirectId(ByteArrayOutputStream id) throws IOException { byte[] data = id.toByteArray(); id = new ByteArrayOutputStream(); // indirect: 2, total len (long), blockId (long) id.write(2); 
DataUtils.writeVarLong(id, length(data)); DataUtils.writeVarLong(id, writeBlock(data)); return id; } private long writeBlock(byte[] data) { long key = getAndIncrementNextKey(); map.put(key, data); onStore(data.length); return key; }
This method is called after a block of data is stored. Override this method to persist data if necessary.
Params:
  • len – the length of the stored block.
/** * This method is called after a block of data is stored. Override this * method to persist data if necessary. * * @param len the length of the stored block. */
@SuppressWarnings("unused") protected void onStore(int len) { // do nothing by default }
Generate a new key.
Returns:the new key
/** * Generate a new key. * * @return the new key */
private long getAndIncrementNextKey() { long key = nextKey.getAndIncrement(); if (!map.containsKey(key)) { return key; } // search the next free id using binary search synchronized (this) { long low = key, high = Long.MAX_VALUE; while (low < high) { long x = (low + high) >>> 1; if (map.containsKey(x)) { low = x + 1; } else { high = x; } } key = low; nextKey.set(key + 1); return key; } }
Get the key of the biggest block, or -1 for inline data. This method is used to garbage collect orphaned blocks.
Params:
  • id – the id
Returns:the key, or -1
/** * Get the key of the biggest block, of -1 for inline data. * This method is used to garbage collect orphaned blocks. * * @param id the id * @return the key, or -1 */
public long getMaxBlockKey(byte[] id) { long maxKey = -1; ByteBuffer idBuffer = ByteBuffer.wrap(id); while (idBuffer.hasRemaining()) { switch (idBuffer.get()) { case 0: // in-place: 0, len (int), data int len = DataUtils.readVarInt(idBuffer); idBuffer.position(idBuffer.position() + len); break; case 1: // block: 1, len (int), blockId (long) DataUtils.readVarInt(idBuffer); long k = DataUtils.readVarLong(idBuffer); maxKey = Math.max(maxKey, k); break; case 2: // indirect: 2, total len (long), blockId (long) DataUtils.readVarLong(idBuffer); long k2 = DataUtils.readVarLong(idBuffer); maxKey = k2; byte[] r = map.get(k2); // recurse long m = getMaxBlockKey(r); if (m >= 0) { maxKey = Math.max(maxKey, m); } break; default: throw DataUtils.newIllegalArgumentException( "Unsupported id {0}", Arrays.toString(id)); } } return maxKey; }
Remove all stored blocks for the given id.
Params:
  • id – the id
/** * Remove all stored blocks for the given id. * * @param id the id */
public void remove(byte[] id) { ByteBuffer idBuffer = ByteBuffer.wrap(id); while (idBuffer.hasRemaining()) { switch (idBuffer.get()) { case 0: // in-place: 0, len (int), data int len = DataUtils.readVarInt(idBuffer); idBuffer.position(idBuffer.position() + len); break; case 1: // block: 1, len (int), blockId (long) DataUtils.readVarInt(idBuffer); long k = DataUtils.readVarLong(idBuffer); map.remove(k); break; case 2: // indirect: 2, total len (long), blockId (long) DataUtils.readVarLong(idBuffer); long k2 = DataUtils.readVarLong(idBuffer); // recurse remove(map.get(k2)); map.remove(k2); break; default: throw DataUtils.newIllegalArgumentException( "Unsupported id {0}", Arrays.toString(id)); } } }
Convert the id to a human readable string.
Params:
  • id – the stream id
Returns:the string
/** * Convert the id to a human readable string. * * @param id the stream id * @return the string */
public static String toString(byte[] id) { StringBuilder buff = new StringBuilder(); ByteBuffer idBuffer = ByteBuffer.wrap(id); long length = 0; while (idBuffer.hasRemaining()) { long block; int len; switch (idBuffer.get()) { case 0: // in-place: 0, len (int), data len = DataUtils.readVarInt(idBuffer); idBuffer.position(idBuffer.position() + len); buff.append("data len=").append(len); length += len; break; case 1: // block: 1, len (int), blockId (long) len = DataUtils.readVarInt(idBuffer); length += len; block = DataUtils.readVarLong(idBuffer); buff.append("block ").append(block).append(" len=").append(len); break; case 2: // indirect: 2, total len (long), blockId (long) len = DataUtils.readVarInt(idBuffer); length += DataUtils.readVarLong(idBuffer); block = DataUtils.readVarLong(idBuffer); buff.append("indirect block ").append(block).append(" len=").append(len); break; default: buff.append("error"); } buff.append(", "); } buff.append("length=").append(length); return buff.toString(); }
Calculate the number of data bytes for the given id. As the length is encoded in the id, this operation does not cause any reads in the map.
Params:
  • id – the id
Returns:the length
/** * Calculate the number of data bytes for the given id. As the length is * encoded in the id, this operation does not cause any reads in the map. * * @param id the id * @return the length */
public long length(byte[] id) { ByteBuffer idBuffer = ByteBuffer.wrap(id); long length = 0; while (idBuffer.hasRemaining()) { switch (idBuffer.get()) { case 0: // in-place: 0, len (int), data int len = DataUtils.readVarInt(idBuffer); idBuffer.position(idBuffer.position() + len); length += len; break; case 1: // block: 1, len (int), blockId (long) length += DataUtils.readVarInt(idBuffer); DataUtils.readVarLong(idBuffer); break; case 2: // indirect: 2, total len (long), blockId (long) length += DataUtils.readVarLong(idBuffer); DataUtils.readVarLong(idBuffer); break; default: throw DataUtils.newIllegalArgumentException( "Unsupported id {0}", Arrays.toString(id)); } } return length; }
Check whether the id itself contains all the data. This operation does not cause any reads in the map.
Params:
  • id – the id
Returns:if the id contains the data
/**
 * Check whether the id itself contains all the data, that is, whether it
 * consists of in-place entries only. The map is not read.
 *
 * @param id the id
 * @return if the id contains the data
 */
public boolean isInPlace(byte[] id) {
    ByteBuffer buf = ByteBuffer.wrap(id);
    while (buf.hasRemaining()) {
        byte type = buf.get();
        if (type == 0) {
            // in-place entry: skip over the data and look at the next one
            int inlineLen = DataUtils.readVarInt(buf);
            buf.position(buf.position() + inlineLen);
        } else {
            return false;
        }
    }
    return true;
}
Open an input stream to read data.
Params:
  • id – the id
Returns:the stream
/**
 * Open an input stream that reads the data of the given id.
 *
 * @param id the id
 * @return the stream
 */
public InputStream get(byte[] id) {
    InputStream in = new Stream(this, id);
    return in;
}
Get the block.
Params:
  • key – the key
Returns:the block
/**
 * Get the block stored under the given key. An IllegalStateException with
 * the error code ERROR_BLOCK_NOT_FOUND is thrown if there is no such block.
 *
 * @param key the key
 * @return the block
 */
byte[] getBlock(long key) {
    byte[] block = map.get(key);
    if (block != null) {
        return block;
    }
    throw DataUtils.newIllegalStateException(
            DataUtils.ERROR_BLOCK_NOT_FOUND,
            "Block {0} not found", key);
}
A stream backed by a map.
/**
 * A stream backed by a map. The id is decoded entry by entry; the data of
 * each entry is served from a buffer that is opened when needed.
 */
static class Stream extends InputStream {

    private final StreamStore store;

    // lazily created buffer used by the single-byte read method
    private byte[] oneByteBuffer;

    // the not yet processed part of the id
    private ByteBuffer idBuffer;

    // the data of the current entry, or null if the next entry has to be
    // opened first
    private ByteArrayInputStream buffer;

    // number of bytes that still have to be skipped when the next entries
    // are opened
    private long skip;

    // total number of data bytes of the id
    private final long length;

    // number of bytes read or skipped so far
    private long pos;

    Stream(StreamStore store, byte[] id) {
        this.store = store;
        this.length = store.length(id);
        this.idBuffer = ByteBuffer.wrap(id);
    }

    @Override
    public int read() throws IOException {
        byte[] single = oneByteBuffer;
        if (single == null) {
            single = oneByteBuffer = new byte[1];
        }
        int len = read(single, 0, 1);
        return len == -1 ? -1 : (single[0] & 255);
    }

    /**
     * Skip over data. Within the current buffer the bytes are skipped
     * immediately; otherwise the request is only recorded and applied when
     * the following entries are opened, so skipped blocks are not read
     * from the map.
     *
     * @param n the number of bytes to skip
     * @return the number of bytes skipped (0 if n is not positive or the
     *         end of the stream is reached)
     */
    @Override
    public long skip(long n) {
        n = Math.min(length - pos, n);
        // a negative request must not skip anything; passing it on would
        // move the position backwards and corrupt the pending skip count
        if (n <= 0) {
            return 0;
        }
        if (buffer != null) {
            long s = buffer.skip(n);
            if (s > 0) {
                n = s;
            } else {
                // the current buffer is exhausted: skip in the next entries
                buffer = null;
                skip += n;
            }
        } else {
            skip += n;
        }
        pos += n;
        return n;
    }

    @Override
    public void close() {
        buffer = null;
        idBuffer.position(idBuffer.limit());
        pos = length;
    }

    /**
     * Read data from the current entry, opening the next entry as needed.
     *
     * @param b the target array
     * @param off the offset in the target array
     * @param len the maximum number of bytes to read
     * @return the number of bytes read, 0 if len is not positive, or -1 at
     *         the end of the stream
     * @throws IOException If a block is missing in the map
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (len <= 0) {
            return 0;
        }
        while (true) {
            if (buffer == null) {
                try {
                    buffer = nextBuffer();
                } catch (IllegalStateException e) {
                    String msg = DataUtils.formatMessage(
                            DataUtils.ERROR_BLOCK_NOT_FOUND,
                            "Block not found in id {0}",
                            Arrays.toString(idBuffer.array()));
                    throw new IOException(msg, e);
                }
                if (buffer == null) {
                    return -1;
                }
            }
            int result = buffer.read(b, off, len);
            if (result > 0) {
                pos += result;
                return result;
            }
            // the current buffer is exhausted: continue with the next entry
            buffer = null;
        }
    }

    /**
     * Open the data of the next entry of the id that is not skipped
     * completely, applying the pending skip count.
     *
     * @return the data, or null if the end of the id is reached
     */
    private ByteArrayInputStream nextBuffer() {
        while (idBuffer.hasRemaining()) {
            switch (idBuffer.get()) {
            case 0: {
                // in-place: 0, len (int), data
                int len = DataUtils.readVarInt(idBuffer);
                if (skip >= len) {
                    skip -= len;
                    idBuffer.position(idBuffer.position() + len);
                    continue;
                }
                int p = (int) (idBuffer.position() + skip);
                int l = (int) (len - skip);
                idBuffer.position(p + l);
                // the pending skip is consumed here; without the reset the
                // same bytes would be skipped again in the next entry
                skip = 0;
                return new ByteArrayInputStream(idBuffer.array(), p, l);
            }
            case 1: {
                // block: 1, len (int), blockId (long)
                int len = DataUtils.readVarInt(idBuffer);
                long key = DataUtils.readVarLong(idBuffer);
                if (skip >= len) {
                    // skipped completely: the block is not read at all
                    skip -= len;
                    continue;
                }
                byte[] data = store.getBlock(key);
                int s = (int) skip;
                skip = 0;
                return new ByteArrayInputStream(data, s, data.length - s);
            }
            case 2: {
                // indirect: 2, total len (long), blockId (long)
                long len = DataUtils.readVarLong(idBuffer);
                long key = DataUtils.readVarLong(idBuffer);
                if (skip >= len) {
                    skip -= len;
                    continue;
                }
                // replace the indirect entry by the id it points to,
                // followed by the rest of the current id
                byte[] k = store.getBlock(key);
                ByteBuffer newBuffer = ByteBuffer.allocate(k.length
                        + idBuffer.limit() - idBuffer.position());
                newBuffer.put(k);
                newBuffer.put(idBuffer);
                newBuffer.flip();
                idBuffer = newBuffer;
                return nextBuffer();
            }
            default:
                throw DataUtils.newIllegalArgumentException(
                        "Unsupported id {0}",
                        Arrays.toString(idBuffer.array()));
            }
        }
        return null;
    }

}

}