/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.index.sasi.disk;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.index.sasi.plan.Expression.Op;
import org.apache.cassandra.index.sasi.sa.IndexedTerm;
import org.apache.cassandra.index.sasi.sa.IntegralSA;
import org.apache.cassandra.index.sasi.sa.SA;
import org.apache.cassandra.index.sasi.sa.TermIterator;
import org.apache.cassandra.index.sasi.sa.SuffixSA;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;

import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.LongSet;
import com.carrotsearch.hppc.ShortArrayList;
import com.google.common.annotations.VisibleForTesting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

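/**
 * Builds a single SASI on-disk index file: 4K-aligned data blocks holding terms
 * and their token trees, topped by zero or more levels of pointer blocks, followed
 * by per-level metadata and (in SPARSE mode) super block token trees.
 *
 * A minimal usage sketch ({@code term}, {@code key}, {@code keyPosition} and
 * {@code indexFile} are hypothetical caller-supplied values; UTF8-typed keys
 * and terms are assumed):
 *
 * <pre>{@code
 * OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, UTF8Type.instance, Mode.PREFIX);
 * builder.add(term, key, keyPosition);         // once per indexed value
 * boolean written = builder.finish(indexFile); // false when no terms were added
 * }</pre>
 */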
public class OnDiskIndexBuilder
{
    private static final Logger logger = LoggerFactory.getLogger(OnDiskIndexBuilder.class);

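    /**
     * Determines which query operations an index built in this mode can serve.
     * In CONTAINS mode, text terms are additionally split into suffixes at build
     * time; in SPARSE mode, every term is expected to reference only a handful of
     * keys (at most {@code MutableDataBlock.MAX_KEYS_SPARSE}), allowing tokens to
     * be inlined into the data blocks.
     */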
    public enum Mode
    {
        PREFIX(EnumSet.of(Op.EQ, Op.MATCH, Op.PREFIX, Op.NOT_EQ, Op.RANGE)),
        CONTAINS(EnumSet.of(Op.EQ, Op.MATCH, Op.CONTAINS, Op.PREFIX, Op.SUFFIX, Op.NOT_EQ)),
        SPARSE(EnumSet.of(Op.EQ, Op.NOT_EQ, Op.RANGE));

        Set<Op> supportedOps;

        Mode(Set<Op> ops)
        {
            supportedOps = ops;
        }

        public static Mode mode(String mode)
        {
            return Mode.valueOf(mode.toUpperCase());
        }

        public boolean supports(Op op)
        {
            return supportedOps.contains(op);
        }
    }

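    /**
     * On-disk width of a serialized term: a fixed 4, 8, or 16 bytes for numeric,
     * timestamp, and UUID comparators, or VARIABLE, in which case each term is
     * written with a 2-byte length prefix (see {@code InMemoryTerm#serialize}).
     */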
    public enum TermSize
    {
        INT(4), LONG(8), UUID(16), VARIABLE(-1);

        public final int size;

        TermSize(int size)
        {
            this.size = size;
        }

        public boolean isConstant()
        {
            return this != VARIABLE;
        }

        public static TermSize of(int size)
        {
            switch (size)
            {
                case -1:
                    return VARIABLE;

                case 4:
                    return INT;

                case 8:
                    return LONG;

                case 16:
                    return UUID;

                default:
                    throw new IllegalStateException("unknown term size: " + size);
            }
        }

        public static TermSize sizeOf(AbstractType<?> comparator)
        {
            if (comparator instanceof Int32Type || comparator instanceof FloatType)
                return INT;

            if (comparator instanceof LongType || comparator instanceof DoubleType
                    || comparator instanceof TimestampType || comparator instanceof DateType)
                return LONG;

            if (comparator instanceof TimeUUIDType || comparator instanceof UUIDType)
                return UUID;

            return VARIABLE;
        }
    }

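    // all blocks in the index file are sized and aligned to 4K; also used as the writer's buffer size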
    public static final int BLOCK_SIZE = 4096;
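    // values of this size or larger are rejected by add() instead of being indexed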
    public static final int MAX_TERM_SIZE = 1024;
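    // number of data blocks rolled up into each super block (SPARSE mode only)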
    public static final int SUPER_BLOCK_SIZE = 64;
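    // bit of the 2-byte length prefix that marks a variable-size term as partial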
    public static final int IS_PARTIAL_BIT = 15;

    private static final SequentialWriterOption WRITER_OPTION = SequentialWriterOption.newBuilder()
                                                                                      .bufferSize(BLOCK_SIZE)
                                                                                      .build();

    private final List<MutableLevel<InMemoryPointerTerm>> levels = new ArrayList<>();
    private MutableLevel<InMemoryDataTerm> dataLevel;

    private final TermSize termSize;

    private final AbstractType<?> keyComparator, termComparator;

    private final Map<ByteBuffer, TokenTreeBuilder> terms;
    private final Mode mode;
    private final boolean marksPartials;

    private ByteBuffer minKey, maxKey;
    private long estimatedBytes;

    public OnDiskIndexBuilder(AbstractType<?> keyComparator, AbstractType<?> comparator, Mode mode)
    {
        this(keyComparator, comparator, mode, true);
    }

    public OnDiskIndexBuilder(AbstractType<?> keyComparator, AbstractType<?> comparator, Mode mode, boolean marksPartials)
    {
        this.keyComparator = keyComparator;
        this.termComparator = comparator;
        this.terms = new HashMap<>();
        this.termSize = TermSize.sizeOf(comparator);
        this.mode = mode;
        this.marksPartials = marksPartials;
    }

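    /**
     * Adds a single term -> (token, key position) association to the in-memory index,
     * updating the min/max indexed keys and the on-heap memory estimate as it goes.
     * Values of MAX_TERM_SIZE bytes or more are logged and skipped rather than indexed.
     *
     * @param term The indexed value in its serialized form.
     * @param key The decorated partition key the value belongs to.
     * @param keyPosition The key's on-disk position, stored alongside its token.
     *
     * @return this builder, for chaining.
     */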
    public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long keyPosition)
    {
        if (term.remaining() >= MAX_TERM_SIZE)
        {
            logger.error("Rejecting value (value size {}, maximum size {}).",
                         FBUtilities.prettyPrintMemory(term.remaining()),
                         FBUtilities.prettyPrintMemory(Short.MAX_VALUE));
            return this;
        }

        TokenTreeBuilder tokens = terms.get(term);
        if (tokens == null)
        {
            terms.put(term, (tokens = new DynamicTokenTreeBuilder()));

            // on-heap size estimates from jol
            // 64 bytes for TTB + 48 bytes for TreeMap in TTB + size bytes for the term (map key)
            estimatedBytes += 64 + 48 + term.remaining();
        }

        tokens.add((Long) key.getToken().getTokenValue(), keyPosition);

        // calculate key range (based on actual key values) for current index
        minKey = (minKey == null || keyComparator.compare(minKey, key.getKey()) > 0) ? key.getKey() : minKey;
        maxKey = (maxKey == null || keyComparator.compare(maxKey, key.getKey()) < 0) ? key.getKey() : maxKey;

        // 60 ((boolean(1)*4) + (long(8)*4) + 24) bytes for the LongOpenHashSet created when the keyPosition was added
        // + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key).
        // in the case of hash collision for the token we may overestimate but this is extremely rare
        estimatedBytes += 60 + 40 + 8;

        return this;
    }

    public long estimatedMemoryUse()
    {
        return estimatedBytes;
    }

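    /**
     * Adds the term to the data level; if that add flushes a data block, the flushed
     * block's last term is promoted into the pointer level above it, cascading upwards
     * until a level accepts the pointer without flushing.
     */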
    private void addTerm(InMemoryDataTerm term, SequentialWriter out) throws IOException
    {
        InMemoryPointerTerm ptr = dataLevel.add(term);
        if (ptr == null)
            return;

        int levelIdx = 0;
        for (;;)
        {
            MutableLevel<InMemoryPointerTerm> level = getIndexLevel(levelIdx++, out);
            if ((ptr = level.add(ptr)) == null)
                break;
        }
    }

    public boolean isEmpty()
    {
        return terms.isEmpty();
    }

    public void finish(Pair<ByteBuffer, ByteBuffer> range, File file, TermIterator terms)
    {
        finish(Descriptor.CURRENT, range, file, terms);
    }

    /**
     * Finishes up index building process by creating/populating index file.
     *
     * @param indexFile The file to write index contents to.
     *
     * @return true if index was written successfully, false otherwise (e.g. if index was empty).
     *
     * @throws FSWriteError on I/O error.
     */
    public boolean finish(File indexFile) throws FSWriteError
    {
        return finish(Descriptor.CURRENT, indexFile);
    }

    @VisibleForTesting
    protected boolean finish(Descriptor descriptor, File file) throws FSWriteError
    {
        // no terms means there is nothing to build
        if (terms.isEmpty())
        {
            try
            {
                file.createNewFile();
            }
            catch (IOException e)
            {
                throw new FSWriteError(e, file);
            }

            return false;
        }

        // split terms into suffixes only if it's text, otherwise (even if CONTAINS is set) use terms in original form
        SA sa = ((termComparator instanceof UTF8Type || termComparator instanceof AsciiType) && mode == Mode.CONTAINS)
                    ? new SuffixSA(termComparator, mode)
                    : new IntegralSA(termComparator, mode);

        for (Map.Entry<ByteBuffer, TokenTreeBuilder> term : terms.entrySet())
            sa.add(term.getKey(), term.getValue());

        finish(descriptor, Pair.create(minKey, maxKey), file, sa.finish());
        return true;
    }

    @SuppressWarnings("resource")
    protected void finish(Descriptor descriptor, Pair<ByteBuffer, ByteBuffer> range, File file, TermIterator terms)
    {
        SequentialWriter out = null;

        try
        {
            out = new SequentialWriter(file, WRITER_OPTION);

            out.writeUTF(descriptor.version.toString());

            out.writeShort(termSize.size);

            // min, max term (useful to find initial scan range from search expressions)
            ByteBufferUtil.writeWithShortLength(terms.minTerm(), out);
            ByteBufferUtil.writeWithShortLength(terms.maxTerm(), out);

            // min, max keys covered by index (useful when searching across multiple indexes)
            ByteBufferUtil.writeWithShortLength(range.left, out);
            ByteBufferUtil.writeWithShortLength(range.right, out);

            out.writeUTF(mode.toString());
            out.writeBoolean(marksPartials);

            out.skipBytes((int) (BLOCK_SIZE - out.position()));

            dataLevel = mode == Mode.SPARSE ? new DataBuilderLevel(out, new MutableDataBlock(termComparator, mode))
                                            : new MutableLevel<>(out, new MutableDataBlock(termComparator, mode));
            while (terms.hasNext())
            {
                Pair<IndexedTerm, TokenTreeBuilder> term = terms.next();
                addTerm(new InMemoryDataTerm(term.left, term.right), out);
            }

            dataLevel.finalFlush();
            for (MutableLevel l : levels)
                l.flush(); // flush all of the buffers

            // and finally write levels index
            final long levelIndexPosition = out.position();

            out.writeInt(levels.size());
            for (int i = levels.size() - 1; i >= 0; i--)
                levels.get(i).flushMetadata();

            dataLevel.flushMetadata();

            out.writeLong(levelIndexPosition);

            // sync contents of the output and disk,
            // since it's not done implicitly on close
            out.sync();
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, file);
        }
        finally
        {
            FileUtils.closeQuietly(out);
        }
    }

    private MutableLevel<InMemoryPointerTerm> getIndexLevel(int idx, SequentialWriter out)
    {
        if (levels.size() == 0)
            levels.add(new MutableLevel<>(out, new MutableBlock<>()));

        if (levels.size() - 1 < idx)
        {
            int toAdd = idx - (levels.size() - 1);
            for (int i = 0; i < toAdd; i++)
                levels.add(new MutableLevel<>(out, new MutableBlock<>()));
        }

        return levels.get(idx);
    }

    protected static void alignToBlock(SequentialWriter out) throws IOException
    {
        long endOfBlock = out.position();
        if ((endOfBlock & (BLOCK_SIZE - 1)) != 0) // align on the block boundary if needed
            out.skipBytes((int) (FBUtilities.align(endOfBlock, BLOCK_SIZE) - endOfBlock));
    }

    private class InMemoryTerm
    {
        protected final IndexedTerm term;

        public InMemoryTerm(IndexedTerm term)
        {
            this.term = term;
        }

        public int serializedSize()
        {
            return (termSize.isConstant() ? 0 : 2) + term.getBytes().remaining();
        }

        public void serialize(DataOutputPlus out) throws IOException
        {
            if (termSize.isConstant())
            {
                out.write(term.getBytes());
            }
            else
            {
                out.writeShort(term.getBytes().remaining() | ((marksPartials && term.isPartial() ? 1 : 0) << IS_PARTIAL_BIT));
                out.write(term.getBytes());
            }
        }
    }

    private class InMemoryPointerTerm extends InMemoryTerm
    {
        protected final int blockCnt;

        public InMemoryPointerTerm(IndexedTerm term, int blockCnt)
        {
            super(term);
            this.blockCnt = blockCnt;
        }

        public int serializedSize()
        {
            return super.serializedSize() + 4;
        }

        public void serialize(DataOutputPlus out) throws IOException
        {
            super.serialize(out);
            out.writeInt(blockCnt);
        }
    }

    private class InMemoryDataTerm extends InMemoryTerm
    {
        private final TokenTreeBuilder keys;

        public InMemoryDataTerm(IndexedTerm term, TokenTreeBuilder keys)
        {
            super(term);
            this.keys = keys;
        }
    }

    private class MutableLevel<T extends InMemoryTerm>
    {
        private final LongArrayList blockOffsets = new LongArrayList();

        protected final SequentialWriter out;

        private final MutableBlock<T> inProcessBlock;
        private InMemoryPointerTerm lastTerm;

        public MutableLevel(SequentialWriter out, MutableBlock<T> block)
        {
            this.out = out;
            this.inProcessBlock = block;
        }
        /**
         * @return If we flushed a block, return the last term of that block; else, null.
         */
        public InMemoryPointerTerm add(T term) throws IOException
        {
            InMemoryPointerTerm toPromote = null;

            if (!inProcessBlock.hasSpaceFor(term))
            {
                flush();
                toPromote = lastTerm;
            }

            inProcessBlock.add(term);

            lastTerm = new InMemoryPointerTerm(term.term, blockOffsets.size());
            return toPromote;
        }

        public void flush() throws IOException
        {
            blockOffsets.add(out.position());
            inProcessBlock.flushAndClear(out);
        }

        public void finalFlush() throws IOException
        {
            flush();
        }

        public void flushMetadata() throws IOException
        {
            flushMetadata(blockOffsets);
        }

        protected void flushMetadata(LongArrayList longArrayList) throws IOException
        {
            out.writeInt(longArrayList.size());
            for (int i = 0; i < longArrayList.size(); i++)
                out.writeLong(longArrayList.get(i));
        }
    }
    /** Builds standard data blocks as well as super blocks. */
    private class DataBuilderLevel extends MutableLevel<InMemoryDataTerm>
    {
        private final LongArrayList superBlockOffsets = new LongArrayList();
        /** count of regular data blocks written since current super block was init'd */
        private int dataBlocksCnt;
        private TokenTreeBuilder superBlockTree;

        public DataBuilderLevel(SequentialWriter out, MutableBlock<InMemoryDataTerm> block)
        {
            super(out, block);
            superBlockTree = new DynamicTokenTreeBuilder();
        }

        public InMemoryPointerTerm add(InMemoryDataTerm term) throws IOException
        {
            InMemoryPointerTerm ptr = super.add(term);
            if (ptr != null)
            {
                dataBlocksCnt++;
                flushSuperBlock(false);
            }
            superBlockTree.add(term.keys);
            return ptr;
        }

        public void flushSuperBlock(boolean force) throws IOException
        {
            if (dataBlocksCnt == SUPER_BLOCK_SIZE || (force && !superBlockTree.isEmpty()))
            {
                superBlockOffsets.add(out.position());
                superBlockTree.finish().write(out);
                alignToBlock(out);

                dataBlocksCnt = 0;
                superBlockTree = new DynamicTokenTreeBuilder();
            }
        }

        public void finalFlush() throws IOException
        {
            super.flush();
            flushSuperBlock(true);
        }

        public void flushMetadata() throws IOException
        {
            super.flushMetadata();
            flushMetadata(superBlockOffsets);
        }
    }

    private static class MutableBlock<T extends InMemoryTerm>
    {
        protected final DataOutputBufferFixed buffer;
        protected final ShortArrayList offsets;

        public MutableBlock()
        {
            buffer = new DataOutputBufferFixed(BLOCK_SIZE);
            offsets = new ShortArrayList();
        }

        public final void add(T term) throws IOException
        {
            offsets.add((short) buffer.position());
            addInternal(term);
        }

        protected void addInternal(T term) throws IOException
        {
            term.serialize(buffer);
        }

        public boolean hasSpaceFor(T element)
        {
            return sizeAfter(element) < BLOCK_SIZE;
        }

        protected int sizeAfter(T element)
        {
            return getWatermark() + 4 + element.serializedSize();
        }

        protected int getWatermark()
        {
            return 4 + offsets.size() * 2 + (int) buffer.position();
        }

        public void flushAndClear(SequentialWriter out) throws IOException
        {
            out.writeInt(offsets.size());
            for (int i = 0; i < offsets.size(); i++)
                out.writeShort(offsets.get(i));

            out.write(buffer.buffer());

            alignToBlock(out);

            offsets.clear();
            buffer.clear();
        }
    }

    private static class MutableDataBlock extends MutableBlock<InMemoryDataTerm>
    {
        private static final int MAX_KEYS_SPARSE = 5;

        private final AbstractType<?> comparator;
        private final Mode mode;

        private int offset = 0;

        private final List<TokenTreeBuilder> containers = new ArrayList<>();
        private TokenTreeBuilder combinedIndex;

        public MutableDataBlock(AbstractType<?> comparator, Mode mode)
        {
            this.comparator = comparator;
            this.mode = mode;
            this.combinedIndex = initCombinedIndex();
        }

        protected void addInternal(InMemoryDataTerm term) throws IOException
        {
            TokenTreeBuilder keys = term.keys;

            if (mode == Mode.SPARSE)
            {
                if (keys.getTokenCount() > MAX_KEYS_SPARSE)
                    throw new IOException(String.format("Term - '%s' belongs to more than %d keys in %s mode, which is not allowed.",
                                                        comparator.getString(term.term.getBytes()), MAX_KEYS_SPARSE, mode.name()));

                writeTerm(term, keys);
            }
            else
            {
                writeTerm(term, offset);

                offset += keys.serializedSize();
                containers.add(keys);
            }

            if (mode == Mode.SPARSE)
                combinedIndex.add(keys);
        }

        protected int sizeAfter(InMemoryDataTerm element)
        {
            return super.sizeAfter(element) + ptrLength(element);
        }

        public void flushAndClear(SequentialWriter out) throws IOException
        {
            super.flushAndClear(out);

            out.writeInt(mode == Mode.SPARSE ? offset : -1);

            if (containers.size() > 0)
            {
                for (TokenTreeBuilder tokens : containers)
                    tokens.write(out);
            }

            if (mode == Mode.SPARSE && combinedIndex != null)
                combinedIndex.finish().write(out);

            alignToBlock(out);

            containers.clear();
            combinedIndex = initCombinedIndex();

            offset = 0;
        }

        private int ptrLength(InMemoryDataTerm term)
        {
            return (term.keys.getTokenCount() > 5)
                    ? 5 // 1 byte type + 4 byte offset to the tree
                    : 1 + (8 * (int) term.keys.getTokenCount()); // 1 byte size + n 8 byte tokens
        }

        private void writeTerm(InMemoryTerm term, TokenTreeBuilder keys) throws IOException
        {
            term.serialize(buffer);
            buffer.writeByte((byte) keys.getTokenCount());

            for (Pair<Long, LongSet> key : keys)
                buffer.writeLong(key.left);
        }

        private void writeTerm(InMemoryTerm term, int offset) throws IOException
        {
            term.serialize(buffer);
            buffer.writeByte(0x0);
            buffer.writeInt(offset);
        }

        private TokenTreeBuilder initCombinedIndex()
        {
            return mode == Mode.SPARSE ? new DynamicTokenTreeBuilder() : null;
        }
    }
}