/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.db.view;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.CompactionInfo.Unit;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.repair.SystemDistributedKeyspace;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Refs;

/**
 * Builds a materialized view from the data its base table already holds.
 * <p>
 * The builder walks the keys of the base table's sstables and, for every key whose token falls inside one of the
 * ranges returned by {@code StorageService.getLocalRanges}, reads the partition locally and applies the view
 * mutations generated from it. The last processed token is persisted through {@link SystemKeyspace} so that a
 * later run can resume from it, and completion is recorded both locally and in the distributed system keyspace.
 * <p>
 * As a {@link CompactionInfo.Holder} it reports its progress through {@link #getCompactionInfo()} and can be asked
 * to stop through {@link #stop()}.
 */
public class ViewBuilder extends CompactionInfo.Holder
{
    private static final Logger logger = LoggerFactory.getLogger(ViewBuilder.class);

    private final ColumnFamilyStore baseCfs;
    private final View view;
    private final UUID compactionId;

    // Token of the last key whose build status was persisted. Written by run() and read by getCompactionInfo();
    // volatile presumably because progress is polled from another thread (TODO confirm against callers).
    private volatile Token prevToken = null;

    // Set by stop(); checked by run() before each key is processed.
    private volatile boolean isStopped = false;

    /**
     * @param baseCfs the base table whose data is used to populate the view
     * @param view the view to build
     */
    public ViewBuilder(ColumnFamilyStore baseCfs, View view)
    {
        this.baseCfs = baseCfs;
        this.view = view;
        compactionId = UUIDGen.getTimeUUID();
    }

    /**
     * Generates and applies the view updates for a single base table partition.
     * <p>
     * Nothing is done when the view's read query does not select the key.
     *
     * @param key the base table partition key to build the view for
     */
    private void buildKey(DecoratedKey key)
    {
        ReadQuery selectQuery = view.getReadQuery();

        if (!selectQuery.selectsKey(key))
        {
            logger.trace("Skipping {}, view query filters", key);
            return;
        }

        int nowInSec = FBUtilities.nowInSeconds();
        SinglePartitionReadCommand command = view.getSelectStatement().internalReadForView(key, nowInSec);

        // We're rebuilding everything from what's on disk, so we read everything, consider that as new updates
        // and pretend that there is nothing pre-existing.
        UnfilteredRowIterator empty = UnfilteredRowIterators.noRowsIterator(baseCfs.metadata, key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, false);

        try (ReadExecutionController orderGroup = command.executionController();
             UnfilteredRowIterator data = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command))
        {
            Iterator<Collection<Mutation>> mutations = baseCfs.keyspace.viewManager
                                                      .forTable(baseCfs.metadata)
                                                      .generateViewUpdates(Collections.singleton(view), data, empty, nowInSec, true);

            AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
            mutations.forEachRemaining(m -> StorageProxy.mutateMV(key.getKey(), m, true, noBase, System.nanoTime()));
        }
    }

    /**
     * Tells whether the given token falls inside at least one of the given ranges.
     * Stops at the first matching range, so a token is reported once however many ranges contain it.
     */
    private static boolean ownsToken(Iterable<Range<Token>> ranges, Token token)
    {
        for (Range<Token> range : ranges)
        {
            if (range.contains(token))
                return true;
        }
        return false;
    }

    /**
     * Runs the view build.
     * <p>
     * If the view is already marked as built, this only makes sure the distributed status has been updated.
     * Otherwise the base table is flushed and the keys of its sstables are iterated, skipping every key whose
     * token is not greater than the token recorded by a previous run (if any) and every key that is not in a
     * local range. The build status is updated each time a key with a new token has been built.
     * <p>
     * If an exception is thrown while iterating, the builder is resubmitted to the {@link CompactionManager}
     * after 5 minutes.
     */
    public void run()
    {
        logger.debug("Starting view builder for {}.{}", baseCfs.metadata.ksName, view.name);
        logger.trace("Running view builder for {}.{}", baseCfs.metadata.ksName, view.name);
        UUID localHostId = SystemKeyspace.getLocalHostId();
        String ksname = baseCfs.metadata.ksName, viewName = view.name;

        if (SystemKeyspace.isViewBuilt(ksname, viewName))
        {
            logger.debug("View already marked built for {}.{}", baseCfs.metadata.ksName, view.name);
            if (!SystemKeyspace.isViewStatusReplicated(ksname, viewName))
                updateDistributed(ksname, viewName, localHostId);
            return;
        }

        Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);

        final Pair<Integer, Token> buildStatus = SystemKeyspace.getViewBuildStatus(ksname, viewName);
        Token lastToken;
        if (buildStatus == null)
        {
            logger.debug("Starting new view build. flushing base table {}.{}", baseCfs.metadata.ksName, baseCfs.name);
            lastToken = null;

            //We don't track the generation number anymore since if a rebuild is stopped and
            //restarted the max generation filter may yield no sstables due to compactions.
            //We only care about max generation *during* a build, not across builds.
            //see CASSANDRA-13405
            SystemKeyspace.beginViewBuild(ksname, viewName, 0);
        }
        else
        {
            lastToken = buildStatus.right;
            logger.debug("Resuming view build from token {}. flushing base table {}.{}", lastToken, baseCfs.metadata.ksName, baseCfs.name);
        }

        baseCfs.forceBlockingFlush();
        Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function =
            org.apache.cassandra.db.lifecycle.View.selectFunction(SSTableSet.CANONICAL);

        prevToken = lastToken;
        long keysBuilt = 0;
        try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(function).refs;
             ReducingKeyIterator iter = new ReducingKeyIterator(sstables))
        {
            SystemDistributedKeyspace.startViewBuild(ksname, viewName, localHostId);
            while (!isStopped && iter.hasNext())
            {
                DecoratedKey key = iter.next();
                Token token = key.getToken();
                if (lastToken == null || lastToken.compareTo(token) < 0)
                {
                    // Build each key at most once, even if more than one local range were to contain its token.
                    if (ownsToken(ranges, token))
                    {
                        buildKey(key);
                        ++keysBuilt;

                        // Only persist the status when the token actually changed.
                        if (prevToken == null || prevToken.compareTo(token) != 0)
                        {
                            SystemKeyspace.updateViewBuildStatus(ksname, viewName, token);
                            prevToken = token;
                        }
                    }

                    // Once we are past the resume point there is no need to compare against it anymore.
                    lastToken = null;
                }
            }

            if (!isStopped)
            {
                logger.debug("Marking view({}.{}) as built covered {} keys ", ksname, viewName, keysBuilt);
                SystemKeyspace.finishViewBuildStatus(ksname, viewName);
                updateDistributed(ksname, viewName, localHostId);
            }
            else
            {
                logger.debug("Stopped build for view({}.{}) after covering {} keys", ksname, viewName, keysBuilt);
            }
        }
        catch (Exception e)
        {
            ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitViewBuilder(this),
                                                         5,
                                                         TimeUnit.MINUTES);
            logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e);
        }
    }

    /**
     * Records the successful build in the distributed system keyspace and then marks the local status as
     * replicated. If either step throws, the builder is resubmitted to the {@link CompactionManager} after
     * 5 minutes.
     *
     * @param ksname the keyspace of the view
     * @param viewName the name of the view
     * @param localHostId the host id of this node
     */
    private void updateDistributed(String ksname, String viewName, UUID localHostId)
    {
        try
        {
            SystemDistributedKeyspace.successfulViewBuild(ksname, viewName, localHostId);
            SystemKeyspace.setViewBuiltReplicated(ksname, viewName);
        }
        catch (Exception e)
        {
            ScheduledExecutors.nonPeriodicTasks.schedule(() -> CompactionManager.instance.submitViewBuilder(this),
                                                         5,
                                                         TimeUnit.MINUTES);
            logger.warn("Failed to updated the distributed status of view, sleeping 5 minutes before retrying", e);
        }
    }

    /**
     * Reports the progress of the build as a number of completed local ranges out of the total number of
     * local ranges.
     *
     * @return a {@link CompactionInfo} of type {@code VIEW_BUILD} expressed in {@code Unit.RANGES}
     */
    public CompactionInfo getCompactionInfo()
    {
        long rangesCompleted = 0, rangesTotal = 0;
        Token lastToken = prevToken;

        // This approximation is not very accurate, but since we do not have a method which allows us to calculate the
        // percentage of a range covered by a second range, this is the best approximation that we can calculate.
        // Instead, we count as completed every local range whose right bound is strictly before the last token that
        // has been processed (relying on the order of the tokens), and report that against the total number of
        // ranges that the node has.
        for (Range<Token> range : StorageService.instance.getLocalRanges(baseCfs.keyspace.getName()))
        {
            rangesTotal++;
            if ((lastToken != null) && lastToken.compareTo(range.right) > 0)
                rangesCompleted++;
        }
        return new CompactionInfo(baseCfs.metadata, OperationType.VIEW_BUILD, rangesCompleted, rangesTotal, Unit.RANGES, compactionId);
    }

    /**
     * Requests that the build stops. The request is honoured by {@link #run()} before it processes the next key.
     */
    public void stop()
    {
        isStopped = true;
    }
}