/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.cql3.statements;

import java.util.*;
import java.util.stream.Collectors;

import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.ViewDefinition;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
import org.apache.cassandra.cql3.selection.RawSelector;
import org.apache.cassandra.cql3.selection.Selectable;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.DurationType;
import org.apache.cassandra.db.marshal.ReversedType;
import org.apache.cassandra.db.view.View;
import org.apache.cassandra.exceptions.AlreadyExistsException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.exceptions.UnauthorizedException;
import org.apache.cassandra.schema.TableParams;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.thrift.ThriftValidation;
import org.apache.cassandra.transport.Event;

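/**
 * A parsed {@code CREATE MATERIALIZED VIEW} statement.
 * <p>
 * Validates the view definition against its base table and, on success, announces the new view
 * metadata through {@link MigrationManager}.
 */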
public class CreateViewStatement extends SchemaAlteringStatement
{
    private static final Logger logger = LoggerFactory.getLogger(CreateViewStatement.class);

    private final CFName baseName;
    private final List<RawSelector> selectClause;
    private final WhereClause whereClause;
    private final List<ColumnDefinition.Raw> partitionKeys;
    private final List<ColumnDefinition.Raw> clusteringKeys;
    public final CFProperties properties = new CFProperties();
    private final boolean ifNotExists;

    public CreateViewStatement(CFName viewName,
                               CFName baseName,
                               List<RawSelector> selectClause,
                               WhereClause whereClause,
                               List<ColumnDefinition.Raw> partitionKeys,
                               List<ColumnDefinition.Raw> clusteringKeys,
                               boolean ifNotExists)
    {
        super(viewName);
        this.baseName = baseName;
        this.selectClause = selectClause;
        this.whereClause = whereClause;
        this.partitionKeys = partitionKeys;
        this.clusteringKeys = clusteringKeys;
        this.ifNotExists = ifNotExists;
    }


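    // Creating a view requires ALTER permission on the base table, not on the view being created.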
    public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
    {
        if (!baseName.hasKeyspace())
            baseName.setKeyspace(keyspace(), true);
        state.hasColumnFamilyAccess(keyspace(), baseName.getColumnFamily(), Permission.ALTER);
    }

    public void validate(ClientState state) throws RequestValidationException
    {
        // We do validation in announceMigration to avoid doing the work twice
    }

    private interface AddColumn
    {
        void add(ColumnIdentifier identifier, AbstractType<?> type);
    }

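    /**
     * Copies the given base-table columns into the view via {@code adder}, applying any
     * CLUSTERING ORDER BY directive from the view's WITH clause by wrapping the column type in
     * (or unwrapping it from) {@link ReversedType}.
     */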
    private void add(CFMetaData baseCfm, Iterable<ColumnIdentifier> columns, AddColumn adder)
    {
        for (ColumnIdentifier column : columns)
        {
            AbstractType<?> type = baseCfm.getColumnDefinition(column).type;
            if (properties.definedOrdering.containsKey(column))
            {
                boolean desc = properties.definedOrdering.get(column);
                if (!desc && type.isReversed())
                {
                    type = ((ReversedType)type).baseType;
                }
                else if (desc && !type.isReversed())
                {
                    type = ReversedType.getInstance(type);
                }
            }
            adder.add(column, type);
        }
    }

    public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws RequestValidationException
    {
        if (!DatabaseDescriptor.getEnableMaterializedViews())
        {
            throw new InvalidRequestException("Materialized views are disabled. Enable in cassandra.yaml to use.");
        }

        // We need to make sure that:
        //  - the view's primary key includes all columns in the base table's primary key
        //  - the select statement does not contain anything other than columns,
        //    and their names match the base table's names
        //  - the view's primary key does not include any collections
        //  - the select statement's where clause does not restrict non-primary-key columns
        //  - a table or view with the same name does not already exist
        //  - the base table's gc_grace_seconds is greater than 0

        properties.validate();

        if (properties.useCompactStorage)
            throw new InvalidRequestException("Cannot use 'COMPACT STORAGE' when defining a materialized view");

        // We require the view to be in the base table's keyspace: if the replication factor
        // differed, the logic that waits for a specific replica would break
        if (!baseName.getKeyspace().equals(keyspace()))
            throw new InvalidRequestException("Cannot create a materialized view on a table in a separate keyspace");

        CFMetaData cfm = ThriftValidation.validateColumnFamily(baseName.getKeyspace(), baseName.getColumnFamily());

        if (cfm.isCounter())
            throw new InvalidRequestException("Materialized views are not supported on counter tables");
        if (cfm.isSuper())
            throw new InvalidRequestException("Materialized views are not supported on SuperColumn tables");
        if (cfm.isView())
            throw new InvalidRequestException("Materialized views cannot be created against other materialized views");

        if (cfm.params.gcGraceSeconds == 0)
        {
            throw new InvalidRequestException(String.format("Cannot create materialized view '%s' for base table " +
                                                            "'%s' with gc_grace_seconds of 0, since this value is " +
                                                            "used to TTL undelivered updates. Setting gc_grace_seconds" +
                                                            " too low might cause undelivered updates to expire " +
                                                            "before being replayed.", cfName.getColumnFamily(),
                                                            baseName.getColumnFamily()));
        }

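        // The view's select clause may only name plain columns of the base table: field
        // selections, functions, writetime/ttl, aliases and terms are all rejected.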
        Set<ColumnIdentifier> included = Sets.newHashSetWithExpectedSize(selectClause.size());
        for (RawSelector selector : selectClause)
        {
            Selectable.Raw selectable = selector.selectable;
            if (selectable instanceof Selectable.WithFieldSelection.Raw)
                throw new InvalidRequestException("Cannot select out a part of type when defining a materialized view");
            if (selectable instanceof Selectable.WithFunction.Raw)
                throw new InvalidRequestException("Cannot use function when defining a materialized view");
            if (selectable instanceof Selectable.WritetimeOrTTL.Raw)
                throw new InvalidRequestException("Cannot use function when defining a materialized view");
            if (selector.alias != null)
                throw new InvalidRequestException("Cannot use alias when defining a materialized view");

            Selectable s = selectable.prepare(cfm);
            if (s instanceof Term.Raw)
                throw new InvalidRequestException("Cannot use terms in selection when defining a materialized view");

            ColumnDefinition cdef = (ColumnDefinition)s;
            included.add(cdef.name);
        }

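        // Columns named in the view's PRIMARY KEY must be unique and must not be multi-cell
        // (non-frozen collections/UDTs), static, or of type duration.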
        Set<ColumnDefinition.Raw> targetPrimaryKeys = new HashSet<>();
        for (ColumnDefinition.Raw identifier : Iterables.concat(partitionKeys, clusteringKeys))
        {
            if (!targetPrimaryKeys.add(identifier))
                throw new InvalidRequestException("Duplicate entry found in PRIMARY KEY: "+identifier);

            ColumnDefinition cdef = identifier.prepare(cfm);

            if (cdef.type.isMultiCell())
                throw new InvalidRequestException(String.format("Cannot use MultiCell column '%s' in PRIMARY KEY of materialized view", identifier));

            if (cdef.isStatic())
                throw new InvalidRequestException(String.format("Cannot use Static column '%s' in PRIMARY KEY of materialized view", identifier));

            if (cdef.type instanceof DurationType)
                throw new InvalidRequestException(String.format("Cannot use Duration column '%s' in PRIMARY KEY of materialized view", identifier));
        }

        // build the select statement
        Map<ColumnDefinition.Raw, Boolean> orderings = Collections.emptyMap();
        List<ColumnDefinition.Raw> groups = Collections.emptyList();
        SelectStatement.Parameters parameters = new SelectStatement.Parameters(orderings, groups, false, true, false);

        SelectStatement.RawStatement rawSelect = new SelectStatement.RawStatement(baseName, parameters, selectClause, whereClause, null, null);

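        // Prepare the equivalent SELECT with an internal client state so that its restrictions
        // can be validated; the raw select is also stored as part of the view definition.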
        ClientState state = ClientState.forInternalCalls();
        state.setKeyspace(keyspace());

        rawSelect.prepareKeyspace(state);
        rawSelect.setBoundVariables(getBoundVariables());

        ParsedStatement.Prepared prepared = rawSelect.prepare(true, queryState.getClientState());
        SelectStatement select = (SelectStatement) prepared.statement;
        StatementRestrictions restrictions = select.getRestrictions();

        if (!prepared.boundNames.isEmpty())
            throw new InvalidRequestException("Cannot use query parameters in CREATE MATERIALIZED VIEW statements");

        // See CASSANDRA-13798: only enable this flag if the use case is append-only.
        final boolean allowFilteringNonKeyColumns = Boolean.parseBoolean(System.getProperty("cassandra.mv.allow_filtering_nonkey_columns_unsafe",
                                                                                            "false"));
        if (!restrictions.nonPKRestrictedColumns(false).isEmpty() && !allowFilteringNonKeyColumns)
        {
            throw new InvalidRequestException(String.format("Non-primary key columns cannot be restricted in the SELECT statement used " +
                                                            "for materialized view creation (got restrictions on: %s)",
                                                            restrictions.nonPKRestrictedColumns(false)
                                                                        .stream()
                                                                        .map(def -> def.name.toString())
                                                                        .collect(Collectors.joining(", "))));
        }

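        // Re-serialize the WHERE clause to text so it can be persisted with the view definition.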
        String whereClauseText = View.relationsToWhereClause(whereClause.relations);

        Set<ColumnIdentifier> basePrimaryKeyCols = new HashSet<>();
        for (ColumnDefinition definition : Iterables.concat(cfm.partitionKeyColumns(), cfm.clusteringColumns()))
            basePrimaryKeyCols.add(definition.name);

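        // Resolve the view's partition key and clustering columns against the base table,
        // preserving their declaration order.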
        List<ColumnIdentifier> targetClusteringColumns = new ArrayList<>();
        List<ColumnIdentifier> targetPartitionKeys = new ArrayList<>();

        // hasNonPKColumn is only intermediate state, used to catch attempts to place more than
        // one non-primary-key column in the view's primary key
        boolean hasNonPKColumn = false;
        for (ColumnDefinition.Raw raw : partitionKeys)
            hasNonPKColumn |= getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetPartitionKeys, restrictions);

        for (ColumnDefinition.Raw raw : clusteringKeys)
            hasNonPKColumn |= getColumnIdentifier(cfm, basePrimaryKeyCols, hasNonPKColumn, raw, targetClusteringColumns, restrictions);

        // We need to include all of the primary key columns from the base table in order to make sure that we do not
        // overwrite values in the view. We cannot support "collapsing" the base table into a smaller number of rows in
        // the view because if we need to generate a tombstone, we have no way of knowing which value is currently being
        // used in the view and whether or not to generate a tombstone. In order to not surprise our users, we require
        // that they include all of the columns. We provide them with a list of all of the columns left to include.
        boolean missingClusteringColumns = false;
        StringBuilder columnNames = new StringBuilder();
        List<ColumnIdentifier> includedColumns = new ArrayList<>();
        for (ColumnDefinition def : cfm.allColumns())
        {
            ColumnIdentifier identifier = def.name;
            boolean includeDef = included.isEmpty() || included.contains(identifier);

            if (includeDef && def.isStatic())
            {
                throw new InvalidRequestException(String.format("Unable to include static column '%s' which would be included by Materialized View SELECT * statement", identifier));
            }

            boolean defInTargetPrimaryKey = targetClusteringColumns.contains(identifier)
                                            || targetPartitionKeys.contains(identifier);

            if (includeDef && !defInTargetPrimaryKey)
            {
                includedColumns.add(identifier);
            }
            if (!def.isPrimaryKeyColumn()) continue;

            if (!defInTargetPrimaryKey)
            {
                if (missingClusteringColumns)
                    columnNames.append(',');
                else
                    missingClusteringColumns = true;
                columnNames.append(identifier);
            }
        }
        if (missingClusteringColumns)
            throw new InvalidRequestException(String.format("Cannot create Materialized View %s without primary key columns from base %s (%s)",
                                                            columnFamily(), baseName.getColumnFamily(), columnNames.toString()));

        if (targetPartitionKeys.isEmpty())
            throw new InvalidRequestException("Must select at least a column for a Materialized View");

        if (targetClusteringColumns.isEmpty())
            throw new InvalidRequestException("No columns are defined for Materialized View other than primary key");

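        // Table parameters for the view come from the statement's WITH clause.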
        TableParams params = properties.properties.asNewTableParams();

        if (params.defaultTimeToLive > 0)
        {
            throw new InvalidRequestException("Cannot set default_time_to_live for a materialized view. " +
                                              "Data in a materialized view always expire at the same time than " +
                                              "the corresponding data in the parent table.");
        }

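        // Build the view's metadata: primary key columns first, then every other included
        // column as a regular column, using the table parameters computed above.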
        CFMetaData.Builder cfmBuilder = CFMetaData.Builder.createView(keyspace(), columnFamily());
        add(cfm, targetPartitionKeys, cfmBuilder::addPartitionKey);
        add(cfm, targetClusteringColumns, cfmBuilder::addClusteringColumn);
        add(cfm, includedColumns, cfmBuilder::addRegularColumn);
        cfmBuilder.withId(properties.properties.getId());

        CFMetaData viewCfm = cfmBuilder.build().params(params);
        ViewDefinition definition = new ViewDefinition(keyspace(),
                                                       columnFamily(),
                                                       Schema.instance.getId(keyspace(), baseName.getColumnFamily()),
                                                       baseName.getColumnFamily(),
                                                       included.isEmpty(),
                                                       rawSelect,
                                                       whereClauseText,
                                                       viewCfm);

        logger.warn("Creating materialized view {} for {}.{}. {}",
                    definition.viewName, cfm.ksName, cfm.cfName, View.USAGE_WARNING);

        try
        {
            ClientWarn.instance.warn(View.USAGE_WARNING);
            MigrationManager.announceNewView(definition, isLocalOnly);
            return new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, keyspace(), columnFamily());
        }
        catch (AlreadyExistsException e)
        {
            if (ifNotExists)
                return null;
            throw e;
        }
    }

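    /**
     * Resolves a view primary key column against the base table and validates it: at most one
     * column outside the base table's primary key may be used, and every column (except a lone
     * partition key) must be restricted by an {@code IS NOT NULL} filter. The resolved
     * identifier is appended to {@code columns}.
     *
     * @return {@code true} if the column is not part of the base table's primary key
     */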
    private static boolean getColumnIdentifier(CFMetaData cfm,
                                               Set<ColumnIdentifier> basePK,
                                               boolean hasNonPKColumn,
                                               ColumnDefinition.Raw raw,
                                               List<ColumnIdentifier> columns,
                                               StatementRestrictions restrictions)
    {
        ColumnDefinition def = raw.prepare(cfm);

        boolean isPk = basePK.contains(def.name);
        if (!isPk && hasNonPKColumn)
            throw new InvalidRequestException(String.format("Cannot include more than one non-primary key column '%s' in materialized view primary key", def.name));

        // We don't need to include the "IS NOT NULL" filter on a non-composite partition key
        // because we will never allow a single partition key to be NULL
        boolean isSinglePartitionKey = def.isPartitionKey()
                                       && cfm.partitionKeyColumns().size() == 1;
        if (!isSinglePartitionKey && !restrictions.isRestricted(def))
            throw new InvalidRequestException(String.format("Primary key column '%s' is required to be filtered by 'IS NOT NULL'", def.name));

        columns.add(def.name);
        return !isPk;
    }
}