/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.view;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.btree.BTreeSet;


/**
 * Groups all the views for a given table.
 */
public class TableViews extends AbstractCollection<View>
{
    private final CFMetaData baseTableMetadata;

    // We need this to be thread-safe, but the number of times this is changed (when a view is created in the keyspace)
    // is massively exceeded by the number of times it's read (for every mutation on the keyspace), so a copy-on-write
    // list is the best option.
    private final List<View> views = new CopyOnWriteArrayList<>();

    public TableViews(CFMetaData baseTableMetadata)
    {
        this.baseTableMetadata = baseTableMetadata;
    }

    public int size()
    {
        return views.size();
    }

    public Iterator<View> iterator()
    {
        return views.iterator();
    }

    public boolean contains(String viewName)
    {
        return Iterables.any(views, view -> view.name.equals(viewName));
    }

    public boolean add(View view)
    {
        // We should have validated that there is no existing view with this name at this point
        assert !contains(view.name);
        return views.add(view);
    }

    public Iterable<ColumnFamilyStore> allViewsCfs()
    {
        Keyspace keyspace = Keyspace.open(baseTableMetadata.ksName);
        return Iterables.transform(views, view -> keyspace.getColumnFamilyStore(view.getDefinition().viewName));
    }

    public void forceBlockingFlush()
    {
        for (ColumnFamilyStore viewCfs : allViewsCfs())
            viewCfs.forceBlockingFlush();
    }

    public void dumpMemtables()
    {
        for (ColumnFamilyStore viewCfs : allViewsCfs())
            viewCfs.dumpMemtable();
    }

    public void truncateBlocking(CommitLogPosition replayAfter, long truncatedAt)
    {
        for (ColumnFamilyStore viewCfs : allViewsCfs())
        {
            viewCfs.discardSSTables(truncatedAt);
            SystemKeyspace.saveTruncationRecord(viewCfs, truncatedAt, replayAfter);
        }
    }

    public void removeByName(String viewName)
    {
        views.removeIf(v -> v.name.equals(viewName));
    }
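
    // A minimal usage sketch, not part of the production code path: it assumes a CFMetaData "baseMetadata" and a
    // View "someView" obtained elsewhere (e.g. from the schema); both names are illustrative only.
    //
    //     TableViews tableViews = new TableViews(baseMetadata);
    //     tableViews.add(someView);                      // registered once, when the view is created
    //     assert tableViews.contains(someView.name);
    //     for (ColumnFamilyStore viewCfs : tableViews.allViewsCfs())
    //         viewCfs.forceBlockingFlush();              // e.g. flush the backing table of every view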
    /**
     * Calculates and pushes updates to the view replicas. The replicas are determined by
     * {@link ViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
     *
     * @param update an update on the base table represented by this object.
     * @param writeCommitLog whether we should write the commit log for the view updates.
     * @param baseComplete time from epoch in ms that the local base mutation was (or will be) completed
     */
    public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog, AtomicLong baseComplete)
    {
        assert update.metadata().cfId.equals(baseTableMetadata.cfId);

        Collection<View> views = updatedViews(update);
        if (views.isEmpty())
            return;

        // Read modified rows
        int nowInSec = FBUtilities.nowInSeconds();
        long queryStartNanoTime = System.nanoTime();
        SinglePartitionReadCommand command = readExistingRowsCommand(update, views, nowInSec);
        if (command == null)
            return;

        ColumnFamilyStore cfs = Keyspace.openAndGetStore(update.metadata());
        long start = System.nanoTime();
        Collection<Mutation> mutations;
        try (ReadExecutionController orderGroup = command.executionController();
             UnfilteredRowIterator existings = UnfilteredPartitionIterators.getOnlyElement(command.executeLocally(orderGroup), command);
             UnfilteredRowIterator updates = update.unfilteredIterator())
        {
            mutations = Iterators.getOnlyElement(generateViewUpdates(views, updates, existings, nowInSec, false));
        }
        cfs.metric.viewReadTime.update(System.nanoTime() - start, TimeUnit.NANOSECONDS);

        if (!mutations.isEmpty())
            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog, baseComplete, queryStartNanoTime);
    }
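
    // Sketch of how a write-path caller might drive pushViewReplicaUpdates (illustrative, not the authoritative call
    // site): "tableViews" and "baseUpdate" (a PartitionUpdate on this base table) are assumed to exist.
    //
    //     AtomicLong baseComplete = new AtomicLong(Long.MAX_VALUE);
    //     tableViews.pushViewReplicaUpdates(baseUpdate, true /* writeCommitLog */, baseComplete);
    //     // ... and once the local base mutation has been applied, the caller records its completion time:
    //     baseComplete.set(System.currentTimeMillis());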
    /**
     * Given some updates on the base table of this object and the existing values for the rows affected by that update,
     * generates the mutations to be applied to the provided views.
     *
     * @param views the views potentially affected by {@code updates}.
     * @param updates the base table updates being applied.
     * @param existings the existing values for the rows affected by {@code updates}. This is used to decide if a view is
     * obsoleted by the update and should be removed, gather the values for columns that may not be part of the update if
     * a new view entry needs to be created, and compute the minimal updates to be applied if the view entry isn't changed
     * but has simply some updated values. This will be empty for view building as we want to assume anything we'll pass
     * to {@code updates} is new.
     * @param nowInSec the current time in seconds.
     * @param separateUpdates if false, a single collection of mutations is generated for the whole partition; if true,
     * mutations are returned separately for each affected base row update.
     * @return the mutations to apply to the {@code views}. This can be empty.
     */
    public Iterator<Collection<Mutation>> generateViewUpdates(Collection<View> views,
                                                              UnfilteredRowIterator updates,
                                                              UnfilteredRowIterator existings,
                                                              int nowInSec,
                                                              boolean separateUpdates)
    {
        assert updates.metadata().cfId.equals(baseTableMetadata.cfId);

        List<ViewUpdateGenerator> generators = new ArrayList<>(views.size());
        for (View view : views)
            generators.add(new ViewUpdateGenerator(view, updates.partitionKey(), nowInSec));

        DeletionTracker existingsDeletion = new DeletionTracker(existings.partitionLevelDeletion());
        DeletionTracker updatesDeletion = new DeletionTracker(updates.partitionLevelDeletion());

        /*
         * We iterate through the updates and the existing rows in parallel. This allows us to know the consequence
         * on the view of each update.
         */
        PeekingIterator<Unfiltered> existingsIter = Iterators.peekingIterator(existings);
        PeekingIterator<Unfiltered> updatesIter = Iterators.peekingIterator(updates);

        while (existingsIter.hasNext() && updatesIter.hasNext())
        {
            Unfiltered existing = existingsIter.peek();
            Unfiltered update = updatesIter.peek();

            Row existingRow;
            Row updateRow;
            int cmp = baseTableMetadata.comparator.compare(update, existing);
            if (cmp < 0)
            {
                // We have an update where there was nothing before
                if (update.isRangeTombstoneMarker())
                {
                    updatesDeletion.update(updatesIter.next());
                    continue;
                }

                updateRow = ((Row)updatesIter.next()).withRowDeletion(updatesDeletion.currentDeletion());
                existingRow = emptyRow(updateRow.clustering(), existingsDeletion.currentDeletion());
            }
            else if (cmp > 0)
            {
                // We have something existing but no update (which will happen either because it's a range tombstone marker in
                // existing, or because we've fetched the existing row due to some partition/range deletion in the updates)
                if (existing.isRangeTombstoneMarker())
                {
                    existingsDeletion.update(existingsIter.next());
                    continue;
                }

                existingRow = ((Row)existingsIter.next()).withRowDeletion(existingsDeletion.currentDeletion());
                updateRow = emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion());

                // The way we build the read command used for existing rows, we should always have updatesDeletion.currentDeletion()
                // that is not live, since we wouldn't have read the existing row otherwise. We could assert that, but if we ever
                // change the read method so that it can slightly over-read in some cases, that would be an easy bug to miss,
                // so we just handle the case.
                if (updateRow == null)
                    continue;
            }
            else
            {
                // We're updating a row that had pre-existing data
                if (update.isRangeTombstoneMarker())
                {
                    assert existing.isRangeTombstoneMarker();
                    updatesDeletion.update(updatesIter.next());
                    existingsDeletion.update(existingsIter.next());
                    continue;
                }

                assert !existing.isRangeTombstoneMarker();
                existingRow = ((Row)existingsIter.next()).withRowDeletion(existingsDeletion.currentDeletion());
                updateRow = ((Row)updatesIter.next()).withRowDeletion(updatesDeletion.currentDeletion());
            }

            addToViewUpdateGenerators(existingRow, updateRow, generators, nowInSec);
        }

        // We only care about more existing rows if the update deletion isn't live, i.e. if we had a partition deletion
        if (!updatesDeletion.currentDeletion().isLive())
        {
            while (existingsIter.hasNext())
            {
                Unfiltered existing = existingsIter.next();
                // If it's a range tombstone, we don't care, we're only looking for existing entries that get deleted by
                // the new partition deletion
                if (existing.isRangeTombstoneMarker())
                    continue;

                Row existingRow = (Row)existing;
                addToViewUpdateGenerators(existingRow, emptyRow(existingRow.clustering(), updatesDeletion.currentDeletion()), generators, nowInSec);
            }
        }

        if (separateUpdates)
        {
            final Collection<Mutation> firstBuild = buildMutations(baseTableMetadata, generators);

            return new Iterator<Collection<Mutation>>()
            {
                // If the previous values are already empty, this update must be either empty or exclusively appending.
                // In the case we are exclusively appending, we need to drop the build that was passed in and try to build a
                // new first update instead.
                // If there are no other updates, next will be null and the iterator will be empty.
                Collection<Mutation> next = firstBuild.isEmpty() ? buildNext() : firstBuild;

                private Collection<Mutation> buildNext()
                {
                    while (updatesIter.hasNext())
                    {
                        Unfiltered update = updatesIter.next();
                        // If it's a range tombstone, it removes nothing pre-existing, so we can ignore it for view updates
                        if (update.isRangeTombstoneMarker())
                            continue;

                        Row updateRow = (Row)update;
                        addToViewUpdateGenerators(emptyRow(updateRow.clustering(), existingsDeletion.currentDeletion()), updateRow, generators, nowInSec);

                        // If the updates have been filtered, then we won't have any mutations; we need to make sure that we
                        // only return if the mutations are not empty. Otherwise, we continue to search for an update which is
                        // not filtered.
                        Collection<Mutation> mutations = buildMutations(baseTableMetadata, generators);
                        if (!mutations.isEmpty())
                            return mutations;
                    }

                    return null;
                }

                public boolean hasNext()
                {
                    return next != null;
                }

                public Collection<Mutation> next()
                {
                    Collection<Mutation> mutations = next;

                    next = buildNext();

                    assert !mutations.isEmpty() : "Expected mutations to be non-empty";
                    return mutations;
                }
            };
        }
        else
        {
            while (updatesIter.hasNext())
            {
                Unfiltered update = updatesIter.next();
                // If it's a range tombstone, it removes nothing pre-existing, so we can ignore it for view updates
                if (update.isRangeTombstoneMarker())
                    continue;

                Row updateRow = (Row)update;
                addToViewUpdateGenerators(emptyRow(updateRow.clustering(), existingsDeletion.currentDeletion()), updateRow, generators, nowInSec);
            }

            return Iterators.singletonIterator(buildMutations(baseTableMetadata, generators));
        }
    }
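
    // Illustrative consumption of generateViewUpdates, e.g. by a view building process: "view", "baseUpdates" and
    // "existingRows" (UnfilteredRowIterators over the same base partition) are assumptions, as is applying the
    // mutations locally.
    //
    //     Iterator<Collection<Mutation>> viewUpdates =
    //         generateViewUpdates(Collections.singleton(view), baseUpdates, existingRows,
    //                             FBUtilities.nowInSeconds(), true /* separateUpdates */);
    //     while (viewUpdates.hasNext())
    //         for (Mutation mutation : viewUpdates.next())
    //             mutation.apply();                      // with separateUpdates, one collection per base row update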
    /**
     * Return the views that are potentially updated by the provided updates.
     *
     * @param updates the updates applied to the base table.
     * @return the views affected by {@code updates}.
     */
    public Collection<View> updatedViews(PartitionUpdate updates)
    {
        List<View> matchingViews = new ArrayList<>(views.size());

        for (View view : views)
        {
            ReadQuery selectQuery = view.getReadQuery();
            if (!selectQuery.selectsKey(updates.partitionKey()))
                continue;

            matchingViews.add(view);
        }
        return matchingViews;
    }
    /**
     * Returns the command to use to read the existing rows required to generate view updates for the provided base
     * table updates.
     *
     * @param updates the base table updates being applied.
     * @param views the views potentially affected by {@code updates}.
     * @param nowInSec the current time in seconds.
     * @return the command to use to read the base table rows required to generate view updates for {@code updates}.
     */
    private SinglePartitionReadCommand readExistingRowsCommand(PartitionUpdate updates, Collection<View> views, int nowInSec)
    {
        Slices.Builder sliceBuilder = null;
        DeletionInfo deletionInfo = updates.deletionInfo();
        CFMetaData metadata = updates.metadata();
        DecoratedKey key = updates.partitionKey();
        // TODO: This is subtle: we need to gather all the slices that we have to fetch between partition deletion, range tombstones and rows.
        if (!deletionInfo.isLive())
        {
            sliceBuilder = new Slices.Builder(metadata.comparator);
            // Everything covered by a deletion might invalidate an existing view entry, which means we must read it to know. In practice
            // though, the views involved might filter some base table clustering columns, in which case we can restrict what we read
            // using those restrictions.
            // If there is a partition deletion, then we can simply take the slices from each view select filter. They may overlap but
            // the Slices.Builder handles that for us. Note that in many cases this will just involve reading everything (as soon as any
            // view involved has no clustering restrictions for instance).
            // For range tombstones, we should theoretically take the difference between the tombstoned range and the slices selected
            // by every view, but as we don't have an easy way to compute that right now, we keep it simple and just use the tombstoned
            // range.
            // TODO: we should improve that latter part.
            if (!deletionInfo.getPartitionDeletion().isLive())
            {
                for (View view : views)
                    sliceBuilder.addAll(view.getSelectStatement().clusteringIndexFilterAsSlices());
            }
            else
            {
                assert deletionInfo.hasRanges();
                Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false);
                while (iter.hasNext())
                    sliceBuilder.add(iter.next().deletedSlice());
            }
        }

        // We need to read every row that is updated, unless we can prove that it has no impact on any view entries.
        // If we had some slices from the deletions above, we'll continue using those. Otherwise, it's more efficient to build
        // a names query.
        BTreeSet.Builder<Clustering> namesBuilder = sliceBuilder == null ? BTreeSet.builder(metadata.comparator) : null;
        for (Row row : updates)
        {
            // Don't read the existing state if we can prove the update won't affect any views
            if (!affectsAnyViews(key, row, views))
                continue;

            if (namesBuilder == null)
                sliceBuilder.add(Slice.make(row.clustering()));
            else
                namesBuilder.add(row.clustering());
        }

        NavigableSet<Clustering> names = namesBuilder == null ? null : namesBuilder.build();
        // If we have a slice builder, it means we had some deletions and we have to read. But if we had
        // only row updates, it's possible none of them affected the views, in which case we have nothing
        // to do.
        if (names != null && names.isEmpty())
            return null;

        ClusteringIndexFilter clusteringFilter = names == null
                                               ? new ClusteringIndexSliceFilter(sliceBuilder.build(), false)
                                               : new ClusteringIndexNamesFilter(names, false);
        // Since unselected columns also affect view liveness, we need to query all base columns if base and view have the same key columns.
        // If we have more than one view, we should merge the queried columns of each view, but to keep it simple we just
        // include everything. We could change that in the future.
        ColumnFilter queriedColumns = views.size() == 1 && metadata.enforceStrictLiveness()
                                    ? Iterables.getOnlyElement(views).getSelectStatement().queriedColumns()
                                    : ColumnFilter.all(metadata);
        // Note that the views could have restrictions on regular columns, but even if that's the case we shouldn't apply those
        // when we read, because even if an existing row doesn't match the view filter, the update can change that, in which
        // case we'll need to know the existing content. There is also no easy way to merge those RowFilters when we have multiple views.
        // TODO: it could still make sense to special-case when there is a single view and a small number of updates (and
        // no deletions). Indeed, in that case we could check whether any of the updates modify any of the restricted regular
        // columns, and if that's not the case we could use the view filter. We keep it simple for now though.
        RowFilter rowFilter = RowFilter.NONE;
        return SinglePartitionReadCommand.create(metadata, nowInSec, queriedColumns, rowFilter, DataLimits.NONE, key, clusteringFilter);
    }

    private boolean affectsAnyViews(DecoratedKey partitionKey, Row update, Collection<View> views)
    {
        for (View view : views)
        {
            if (view.mayBeAffectedBy(partitionKey, update))
                return true;
        }
        return false;
    }
    /**
     * Given an existing base row and the update that we're going to apply to this row, generate the modifications
     * to apply to MVs using the provided {@code ViewUpdateGenerator}s.
     *
     * @param existingBaseRow the base table row as it is before an update.
     * @param updateBaseRow the new updates made to {@code existingBaseRow}.
     * @param generators the view update generators to add the new changes to.
     * @param nowInSec the current time in seconds. Used to decide if data is live or not.
     */
    private static void addToViewUpdateGenerators(Row existingBaseRow, Row updateBaseRow, Collection<ViewUpdateGenerator> generators, int nowInSec)
    {
        // Having existing empty is useful, it just means we'll insert a brand new entry for updateBaseRow,
        // but if we have no update at all, we shouldn't get there.
        assert !updateBaseRow.isEmpty();

        // We allow existingBaseRow to be null, which we treat the same as being empty, as a small optimization
        // to avoid allocating empty row objects when we know there was nothing existing.
        Row mergedBaseRow = existingBaseRow == null ? updateBaseRow : Rows.merge(existingBaseRow, updateBaseRow, nowInSec);
        for (ViewUpdateGenerator generator : generators)
            generator.addBaseTableUpdate(existingBaseRow, mergedBaseRow);
    }

    private static Row emptyRow(Clustering clustering, DeletionTime deletion)
    {
        // Returning null for an empty row is slightly ugly, but the case where there is no pre-existing row is fairly common
        // (especially when building the view), so we want to avoid a dummy allocation of an empty row every time.
        // And MultiViewUpdateBuilder knows how to deal with that.
        return deletion.isLive() ? null : BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(deletion));
    }
    /**
     * Extracts (and potentially groups) the mutations generated by the provided view update generators.
     * Returns the mutations that need to be applied to the views given the base table updates
     * passed to {@link ViewUpdateGenerator#addBaseTableUpdate}.
     *
     * @param baseTableMetadata the metadata for the base table being updated.
     * @param generators the generators from which to extract the view mutations.
     * @return the mutations created by all the generators in {@code generators}.
     */
    private Collection<Mutation> buildMutations(CFMetaData baseTableMetadata, List<ViewUpdateGenerator> generators)
    {
        // One view is probably common enough and we can optimize a bit easily
        if (generators.size() == 1)
        {
            ViewUpdateGenerator generator = generators.get(0);
            Collection<PartitionUpdate> updates = generator.generateViewUpdates();
            List<Mutation> mutations = new ArrayList<>(updates.size());
            for (PartitionUpdate update : updates)
                mutations.add(new Mutation(update));

            generator.clear();
            return mutations;
        }

        Map<DecoratedKey, Mutation> mutations = new HashMap<>();
        for (ViewUpdateGenerator generator : generators)
        {
            for (PartitionUpdate update : generator.generateViewUpdates())
            {
                DecoratedKey key = update.partitionKey();
                Mutation mutation = mutations.get(key);
                if (mutation == null)
                {
                    mutation = new Mutation(baseTableMetadata.ksName, key);
                    mutations.put(key, mutation);
                }
                mutation.add(update);
            }
            generator.clear();
        }
        return mutations.values();
    }
    /**
     * A simple helper that tracks for a given {@code UnfilteredRowIterator} what is the current deletion at any time of the
     * iteration. It will be the currently open range tombstone deletion if there is one and the partition deletion otherwise.
     */
    private static class DeletionTracker
    {
        private final DeletionTime partitionDeletion;
        private DeletionTime deletion;

        public DeletionTracker(DeletionTime partitionDeletion)
        {
            this.partitionDeletion = partitionDeletion;
        }

        public void update(Unfiltered marker)
        {
            assert marker instanceof RangeTombstoneMarker;
            RangeTombstoneMarker rtm = (RangeTombstoneMarker)marker;
            this.deletion = rtm.isOpen(false) ? rtm.openDeletionTime(false) : null;
        }

        public DeletionTime currentDeletion()
        {
            return deletion == null ? partitionDeletion : deletion;
        }
    }
}