/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.index.internal.composites;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.index.internal.CassandraIndex;
import org.apache.cassandra.index.internal.CassandraIndexSearcher;
import org.apache.cassandra.index.internal.IndexEntry;
import org.apache.cassandra.utils.btree.BTreeSet;
import org.apache.cassandra.utils.concurrent.OpOrder;


/**
 * Index searcher that turns the hits read from an internal index into reads against the
 * base table ({@code index.baseCfs}), and purges index entries that turn out to be stale.
 * <p>
 * Two kinds of indexed column are handled differently:
 * <ul>
 *   <li>static column: each hit triggers a read of the whole base partition;</li>
 *   <li>any other column: all hits of a base partition are gathered and fetched with a single
 *       by-name read on their clusterings.</li>
 * </ul>
 */
public class CompositesSearcher extends CassandraIndexSearcher
{
    /**
     * @param command    the read command being served through the index
     * @param expression the row filter expression handled by {@code index}
     * @param index      the index to search
     */
    public CompositesSearcher(ReadCommand command,
                              RowFilter.Expression expression,
                              CassandraIndex index)
    {
        super(command, expression, index);
    }

    /**
     * Returns whether {@code command} selects both the given base partition key and the base
     * clustering referenced by the index entry.
     */
    private boolean isMatchingEntry(DecoratedKey partitionKey, IndexEntry entry, ReadCommand command)
    {
        return command.selectsKey(partitionKey) && command.selectsClustering(partitionKey, entry.indexedEntryClustering);
    }

    /** Returns whether the indexed column is a static column. */
    private boolean isStaticColumn()
    {
        return index.getIndexedColumn().isStatic();
    }

    /**
     * Builds a partition iterator over the base table data referenced by the given index hits.
     * Base partitions for which every hit is filtered out or stale are skipped.
     *
     * @param indexKey            key of the index partition the hits come from; it is used to decode the
     *                            hits and its raw value is the indexed value used for staleness checks
     * @param indexHits           the rows read from the index; closed when the returned iterator is closed
     * @param command             the user read command, used to filter hits and to build the base table reads
     * @param executionController controller under which the base table reads are executed
     * @return an iterator yielding one non-empty {@link UnfilteredRowIterator} per matching base partition
     */
    protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey,
                                                             final RowIterator indexHits,
                                                             final ReadCommand command,
                                                             final ReadExecutionController executionController)
    {
        assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;

        return new UnfilteredPartitionIterator()
        {
            // Look-ahead: the next index hit that has been decoded but not yet consumed, or null.
            private IndexEntry nextEntry;

            // The partition prepared by prepareNext() and not yet handed out by next(), or null.
            private UnfilteredRowIterator next;

            public boolean isForThrift()
            {
                return command.isForThrift();
            }

            public CFMetaData metadata()
            {
                return command.metadata();
            }

            public boolean hasNext()
            {
                return prepareNext();
            }

            /**
             * Returns the next partition. Note that this returns {@code null} (rather than throwing)
             * when the iterator is exhausted, so callers are expected to check {@link #hasNext()} first.
             */
            public UnfilteredRowIterator next()
            {
                if (next == null)
                    prepareNext();

                UnfilteredRowIterator toReturn = next;
                next = null;
                return toReturn;
            }

            /** Decodes and returns the next index hit, or {@code null} if the hits are exhausted. */
            private IndexEntry readNextEntry()
            {
                return indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
            }

            /**
             * Ensures {@code next} holds the next non-empty partition, if any.
             *
             * @return {@code true} if {@code next} is set, {@code false} if there is nothing left to return
             */
            private boolean prepareNext()
            {
                while (true)
                {
                    if (next != null)
                        return true;

                    if (nextEntry == null)
                    {
                        if (!indexHits.hasNext())
                            return false;

                        nextEntry = index.decodeEntry(indexKey, indexHits.next());
                    }

                    SinglePartitionReadCommand dataCmd;
                    DecoratedKey partitionKey = index.baseCfs.decorateKey(nextEntry.indexedKey);
                    List<IndexEntry> entries = new ArrayList<>();
                    if (isStaticColumn())
                    {
                        // The index hit may not match the command's key constraints
                        if (!isMatchingEntry(partitionKey, nextEntry, command))
                        {
                            nextEntry = readNextEntry();
                            continue;
                        }

                        // If the index is on a static column, we just need to do a full read on the partition.
                        // Note that we want to re-use the command.columnFilter() in case of future change.
                        dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata,
                                                                    command.nowInSec(),
                                                                    command.columnFilter(),
                                                                    RowFilter.NONE,
                                                                    DataLimits.NONE,
                                                                    partitionKey,
                                                                    new ClusteringIndexSliceFilter(Slices.ALL, false));
                        entries.add(nextEntry);
                        nextEntry = readNextEntry();
                    }
                    else
                    {
                        // Gather all index hits belonging to the same partition and query the data for those hits.
                        // TODO: it's much more efficient to do 1 read for all hits to the same partition than doing
                        // 1 read per index hit. However, this basically mean materializing all hits for a partition
                        // in memory so we should consider adding some paging mechanism. However, index hits should
                        // be relatively small so it's much better than the previous code that was materializing all
                        // *data* for a given partition.
                        BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.baseCfs.getComparator());
                        while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
                        {
                            // We've queried a slice of the index, but some hits may not match some of the
                            // clustering column constraints
                            if (isMatchingEntry(partitionKey, nextEntry, command))
                            {
                                clusterings.add(nextEntry.indexedEntryClustering);
                                entries.add(nextEntry);
                            }

                            nextEntry = readNextEntry();
                        }

                        // Because we've eliminated entries that don't match the clustering columns, it's possible we added nothing
                        if (clusterings.isEmpty())
                            continue;

                        // Query the gathered index hits. We still need to filter stale hits from the resulting query.
                        ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
                        dataCmd = SinglePartitionReadCommand.create(isForThrift(),
                                                                    index.baseCfs.metadata,
                                                                    command.nowInSec(),
                                                                    command.columnFilter(),
                                                                    command.rowFilter(),
                                                                    DataLimits.NONE,
                                                                    partitionKey,
                                                                    filter,
                                                                    null);
                    }

                    // We close right away if empty; if it is assigned to 'next' it will be closed either by the
                    // consumer that receives it from next(), or by close() on this iterator if that comes first.
                    @SuppressWarnings("resource")
                    UnfilteredRowIterator dataIter =
                        filterStaleEntries(dataCmd.queryMemtableAndDisk(index.baseCfs, executionController),
                                           indexKey.getKey(),
                                           entries,
                                           executionController.writeOpOrderGroup(),
                                           command.nowInSec());

                    if (dataIter.isEmpty())
                    {
                        dataIter.close();
                        continue;
                    }

                    next = dataIter;
                    return true;
                }
            }

            public void remove()
            {
                throw new UnsupportedOperationException();
            }

            public void close()
            {
                // Make sure a prepared-but-unconsumed partition is released even if closing the hits fails.
                try
                {
                    indexHits.close();
                }
                finally
                {
                    if (next != null)
                        next.close();
                }
            }
        };
    }

    /**
     * Deletes every given entry from the index, using the entry's own timestamp and
     * {@code nowInSec} as the deletion time.
     */
    private void deleteAllEntries(final List<IndexEntry> entries, final OpOrder.Group writeOp, final int nowInSec)
    {
        entries.forEach(entry ->
            index.deleteStaleEntry(entry.indexValue,
                                   entry.indexClustering,
                                   new DeletionTime(entry.timestamp, nowInSec),
                                   writeOp));
    }

    /**
     * Wraps the base table data so that rows whose index entry is stale are filtered out, and
     * deletes the stale index entries that were detected.
     * <p>
     * We assume all rows in {@code dataIter} belong to the same partition.
     *
     * @param dataIter   the base table data read for {@code entries}
     * @param indexValue the indexed value the entries were found under
     * @param entries    the index entries that led to {@code dataIter}; for a static column index there
     *                   must be exactly one
     * @param writeOp    the write op order group under which stale entries are deleted
     * @param nowInSec   the query time in seconds
     * @return the iterator to hand to the caller; the caller owns it and must close it
     */
    private UnfilteredRowIterator filterStaleEntries(UnfilteredRowIterator dataIter,
                                                     final ByteBuffer indexValue,
                                                     final List<IndexEntry> entries,
                                                     final OpOrder.Group writeOp,
                                                     final int nowInSec)
    {
        // collect stale index entries and delete them when we close this iterator
        final List<IndexEntry> staleEntries = new ArrayList<>();

        // if there is a partition level delete in the base table, we need to filter
        // any index entries which would be shadowed by it
        if (!dataIter.partitionLevelDeletion().isLive())
        {
            DeletionTime deletion = dataIter.partitionLevelDeletion();
            entries.forEach(e -> {
                if (deletion.deletes(e.timestamp))
                    staleEntries.add(e);
            });
        }

        if (isStaticColumn())
        {
            if (entries.size() != 1)
                throw new AssertionError("A partition should have at most one index within a static column index");

            UnfilteredRowIterator iteratorToReturn = dataIter;
            if (index.isStale(dataIter.staticRow(), indexValue, nowInSec))
            {
                // The entry is stale, we return no rows in this partition.
                staleEntries.addAll(entries);
                iteratorToReturn = UnfilteredRowIterators.noRowsIterator(dataIter.metadata(),
                                                                         dataIter.partitionKey(),
                                                                         Rows.EMPTY_STATIC_ROW,
                                                                         dataIter.partitionLevelDeletion(),
                                                                         dataIter.isReverseOrder());
                // The caller only ever sees (and closes) the replacement iterator, so the underlying
                // read must be released here or it would never be closed.
                dataIter.close();
            }
            // Nothing wraps the iterator in this branch, so stale entries are deleted eagerly.
            deleteAllEntries(staleEntries, writeOp, nowInSec);
            return iteratorToReturn;
        }

        final ClusteringComparator comparator = dataIter.metadata().comparator;

        class Transform extends Transformation
        {
            // Position in 'entries' of the next entry not yet matched against a row.
            private int entriesIdx;

            @Override
            public Row applyToRow(Row row)
            {
                // Always locate the entry first: this also advances past (and records) entries that
                // have no corresponding row.
                IndexEntry entry = findEntry(row.clustering());
                if (!index.isStale(row, indexValue, nowInSec))
                    return row;

                staleEntries.add(entry);
                return null;
            }

            private IndexEntry findEntry(Clustering clustering)
            {
                assert entriesIdx < entries.size();
                while (entriesIdx < entries.size())
                {
                    IndexEntry entry = entries.get(entriesIdx++);
                    // The entries are in clustering order. So that the requested entry should be the
                    // next entry, the one at 'entriesIdx'. However, we can have stale entries, entries
                    // that have no corresponding row in the base table typically because of a range
                    // tombstone or partition level deletion. Delete such stale entries.
                    int cmp = comparator.compare(entry.indexedEntryClustering, clustering);
                    assert cmp <= 0; // otherwise entries would not be in clustering order, which shouldn't happen
                    if (cmp == 0)
                        return entry;
                    else
                        staleEntries.add(entry);
                }
                // entries correspond to the rows we've queried, so we shouldn't have a row that has no corresponding entry.
                throw new AssertionError();
            }

            @Override
            public void onPartitionClose()
            {
                deleteAllEntries(staleEntries, writeOp, nowInSec);
            }
        }

        return Transformation.apply(dataIter, new Transform());
    }
}