/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.view;
import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
Creates the updates to apply to a view given the existing rows in the base table and the updates that we're applying to them (this handles updates on a single partition only). This class is used by passing the updates made to the base table to addBaseTableUpdate
and calling generateViewUpdates
once all updates have been handled to get the resulting view mutations. /**
* Creates the updates to apply to a view given the existing rows in the base
* table and the updates that we're applying to them (this handles updates
* on a single partition only).
*
* This class is used by passing the updates made to the base table to
* {@link #addBaseTableUpdate} and calling {@link #generateViewUpdates} once all updates have
* been handled to get the resulting view mutations.
*/
public class ViewUpdateGenerator
{
private final View view;
private final int nowInSec;
private final CFMetaData baseMetadata;
private final DecoratedKey baseDecoratedKey;
private final ByteBuffer[] basePartitionKey;
private final CFMetaData viewMetadata;
private final boolean baseEnforceStrictLiveness;
private final Map<DecoratedKey, PartitionUpdate> updates = new HashMap<>();
// Reused internally to build a new entry
private final ByteBuffer[] currentViewEntryPartitionKey;
private final Row.Builder currentViewEntryBuilder;
The type of type update action to perform to the view for a given base table
update.
/**
* The type of type update action to perform to the view for a given base table
* update.
*/
private enum UpdateAction
{
NONE, // There was no view entry and none should be added
NEW_ENTRY, // There was no entry but there is one post-update
DELETE_OLD, // There was an entry but there is nothing after update
UPDATE_EXISTING, // There was an entry and the update modifies it
SWITCH_ENTRY // There was an entry and there is still one after update,
// but they are not the same one.
}
Creates a new ViewUpdateBuilder
. Params: - view – the view for which this will be building updates for.
- basePartitionKey – the partition key for the base table partition for which
we'll handle updates for.
- nowInSec – the current time in seconds. Used to decide if data are live or not
and as base reference for new deletions.
/**
* Creates a new {@code ViewUpdateBuilder}.
*
* @param view the view for which this will be building updates for.
* @param basePartitionKey the partition key for the base table partition for which
* we'll handle updates for.
* @param nowInSec the current time in seconds. Used to decide if data are live or not
* and as base reference for new deletions.
*/
public ViewUpdateGenerator(View view, DecoratedKey basePartitionKey, int nowInSec)
{
this.view = view;
this.nowInSec = nowInSec;
this.baseMetadata = view.getDefinition().baseTableMetadata();
this.baseEnforceStrictLiveness = baseMetadata.enforceStrictLiveness();
this.baseDecoratedKey = basePartitionKey;
this.basePartitionKey = extractKeyComponents(basePartitionKey, baseMetadata.getKeyValidator());
this.viewMetadata = view.getDefinition().metadata;
this.currentViewEntryPartitionKey = new ByteBuffer[viewMetadata.partitionKeyColumns().size()];
this.currentViewEntryBuilder = BTreeRow.sortedBuilder();
}
private static ByteBuffer[] extractKeyComponents(DecoratedKey partitionKey, AbstractType<?> type)
{
return type instanceof CompositeType
? ((CompositeType)type).split(partitionKey.getKey())
: new ByteBuffer[]{ partitionKey.getKey() };
}
Adds to this generator the updates to be made to the view given a base table row
before and after an update.
Params: - existingBaseRow – the base table row as it is before an update.
- mergedBaseRow – the base table row after the update is applied (note that
this is not just the new update, but rather the resulting row).
/**
* Adds to this generator the updates to be made to the view given a base table row
* before and after an update.
*
* @param existingBaseRow the base table row as it is before an update.
* @param mergedBaseRow the base table row after the update is applied (note that
* this is not just the new update, but rather the resulting row).
*/
public void addBaseTableUpdate(Row existingBaseRow, Row mergedBaseRow)
{
switch (updateAction(existingBaseRow, mergedBaseRow))
{
case NONE:
return;
case NEW_ENTRY:
createEntry(mergedBaseRow);
return;
case DELETE_OLD:
deleteOldEntry(existingBaseRow, mergedBaseRow);
return;
case UPDATE_EXISTING:
updateEntry(existingBaseRow, mergedBaseRow);
return;
case SWITCH_ENTRY:
createEntry(mergedBaseRow);
deleteOldEntry(existingBaseRow, mergedBaseRow);
return;
}
}
Returns the updates that needs to be done to the view given the base table updates passed to addBaseTableUpdate
. Returns: the updates to do to the view.
/**
* Returns the updates that needs to be done to the view given the base table updates
* passed to {@link #addBaseTableUpdate}.
*
* @return the updates to do to the view.
*/
public Collection<PartitionUpdate> generateViewUpdates()
{
return updates.values();
}
Clears the current state so that the generator may be reused.
/**
* Clears the current state so that the generator may be reused.
*/
public void clear()
{
updates.clear();
}
Compute which type of action needs to be performed to the view for a base table row
before and after an update.
/**
* Compute which type of action needs to be performed to the view for a base table row
* before and after an update.
*/
private UpdateAction updateAction(Row existingBaseRow, Row mergedBaseRow)
{
// Having existing empty is useful, it just means we'll insert a brand new entry for mergedBaseRow,
// but if we have no update at all, we shouldn't get there.
assert !mergedBaseRow.isEmpty();
// Note that none of the base PK columns will differ since we're intrinsically dealing
// with the same base row. So we have to check 3 things:
// 1) that the clustering doesn't have a null, which can happen for compact tables. If that's the case,
// there is no corresponding entries.
// 2) if there is a column not part of the base PK in the view PK, whether it is changed by the update.
// 3) whether mergedBaseRow actually match the view SELECT filter
if (baseMetadata.isCompactTable())
{
Clustering clustering = mergedBaseRow.clustering();
for (int i = 0; i < clustering.size(); i++)
{
if (clustering.get(i) == null)
return UpdateAction.NONE;
}
}
assert view.baseNonPKColumnsInViewPK.size() <= 1 : "We currently only support one base non-PK column in the view PK";
if (view.baseNonPKColumnsInViewPK.isEmpty())
{
// The view entry is necessarily the same pre and post update.
// Note that we allow existingBaseRow to be null and treat it as empty (see MultiViewUpdateBuilder.generateViewsMutations).
boolean existingHasLiveData = existingBaseRow != null && existingBaseRow.hasLiveData(nowInSec, baseEnforceStrictLiveness);
boolean mergedHasLiveData = mergedBaseRow.hasLiveData(nowInSec, baseEnforceStrictLiveness);
return existingHasLiveData
? (mergedHasLiveData ? UpdateAction.UPDATE_EXISTING : UpdateAction.DELETE_OLD)
: (mergedHasLiveData ? UpdateAction.NEW_ENTRY : UpdateAction.NONE);
}
ColumnDefinition baseColumn = view.baseNonPKColumnsInViewPK.get(0);
assert !baseColumn.isComplex() : "A complex column couldn't be part of the view PK";
Cell before = existingBaseRow == null ? null : existingBaseRow.getCell(baseColumn);
Cell after = mergedBaseRow.getCell(baseColumn);
// If the update didn't modified this column, the cells will be the same object so it's worth checking
if (before == after)
return isLive(before) ? UpdateAction.UPDATE_EXISTING : UpdateAction.NONE;
if (!isLive(before))
return isLive(after) ? UpdateAction.NEW_ENTRY : UpdateAction.NONE;
if (!isLive(after))
{
return UpdateAction.DELETE_OLD;
}
return baseColumn.cellValueType().compare(before.value(), after.value()) == 0
? UpdateAction.UPDATE_EXISTING
: UpdateAction.SWITCH_ENTRY;
}
private boolean matchesViewFilter(Row baseRow)
{
return view.matchesViewFilter(baseDecoratedKey, baseRow, nowInSec);
}
private boolean isLive(Cell cell)
{
return cell != null && cell.isLive(nowInSec);
}
Creates a view entry corresponding to the provided base row.
This method checks that the base row does match the view filter before applying it.
/**
* Creates a view entry corresponding to the provided base row.
* <p>
* This method checks that the base row does match the view filter before applying it.
*/
private void createEntry(Row baseRow)
{
// Before create a new entry, make sure it matches the view filter
if (!matchesViewFilter(baseRow))
return;
startNewUpdate(baseRow);
currentViewEntryBuilder.addPrimaryKeyLivenessInfo(computeLivenessInfoForEntry(baseRow));
currentViewEntryBuilder.addRowDeletion(baseRow.deletion());
for (ColumnData data : baseRow)
{
ColumnDefinition viewColumn = view.getViewColumn(data.column());
// If that base table column is not denormalized in the view, we had nothing to do.
// Alose, if it's part of the view PK it's already been taken into account in the clustering.
if (viewColumn == null || viewColumn.isPrimaryKeyColumn())
continue;
addColumnData(viewColumn, data);
}
submitUpdate();
}
Creates the updates to apply to the existing view entry given the base table row before
and after the update, assuming that the update hasn't changed to which view entry the
row correspond (that is, we know the columns composing the view PK haven't changed).
This method checks that the base row (before and after) does match the view filter before
applying anything.
/**
* Creates the updates to apply to the existing view entry given the base table row before
* and after the update, assuming that the update hasn't changed to which view entry the
* row correspond (that is, we know the columns composing the view PK haven't changed).
* <p>
* This method checks that the base row (before and after) does match the view filter before
* applying anything.
*/
private void updateEntry(Row existingBaseRow, Row mergedBaseRow)
{
// While we know existingBaseRow and mergedBaseRow are corresponding to the same view entry,
// they may not match the view filter.
if (!matchesViewFilter(existingBaseRow))
{
createEntry(mergedBaseRow);
return;
}
if (!matchesViewFilter(mergedBaseRow))
{
deleteOldEntryInternal(existingBaseRow, mergedBaseRow);
return;
}
startNewUpdate(mergedBaseRow);
// In theory, it may be the PK liveness and row deletion hasn't been change by the update
// and we could condition the 2 additions below. In practice though, it's as fast (if not
// faster) to compute those info than to check if they have changed so we keep it simple.
currentViewEntryBuilder.addPrimaryKeyLivenessInfo(computeLivenessInfoForEntry(mergedBaseRow));
currentViewEntryBuilder.addRowDeletion(mergedBaseRow.deletion());
addDifferentCells(existingBaseRow, mergedBaseRow);
submitUpdate();
}
private void addDifferentCells(Row existingBaseRow, Row mergedBaseRow)
{
// We only add to the view update the cells from mergedBaseRow that differs from
// existingBaseRow. For that and for speed we can just cell pointer equality: if the update
// hasn't touched a cell, we know it will be the same object in existingBaseRow and
// mergedBaseRow (note that including more cells than we strictly should isn't a problem
// for correction, so even if the code change and pointer equality don't work anymore, it'll
// only a slightly inefficiency which we can fix then).
// Note: we could alternatively use Rows.diff() for this, but because it is a bit more generic
// than what we need here, it's also a bit less efficient (it allocates more in particular),
// and this might be called a lot of time for view updates. So, given that this is not a whole
// lot of code anyway, it's probably doing the diff manually.
PeekingIterator<ColumnData> existingIter = Iterators.peekingIterator(existingBaseRow.iterator());
for (ColumnData mergedData : mergedBaseRow)
{
ColumnDefinition baseColumn = mergedData.column();
ColumnDefinition viewColumn = view.getViewColumn(baseColumn);
// If that base table column is not denormalized in the view, we had nothing to do.
// Alose, if it's part of the view PK it's already been taken into account in the clustering.
if (viewColumn == null || viewColumn.isPrimaryKeyColumn())
continue;
ColumnData existingData = null;
// Find if there is data for that column in the existing row
while (existingIter.hasNext())
{
int cmp = baseColumn.compareTo(existingIter.peek().column());
if (cmp < 0)
break;
ColumnData next = existingIter.next();
if (cmp == 0)
{
existingData = next;
break;
}
}
if (existingData == null)
{
addColumnData(viewColumn, mergedData);
continue;
}
if (mergedData == existingData)
continue;
if (baseColumn.isComplex())
{
ComplexColumnData mergedComplexData = (ComplexColumnData)mergedData;
ComplexColumnData existingComplexData = (ComplexColumnData)existingData;
if (mergedComplexData.complexDeletion().supersedes(existingComplexData.complexDeletion()))
currentViewEntryBuilder.addComplexDeletion(viewColumn, mergedComplexData.complexDeletion());
PeekingIterator<Cell> existingCells = Iterators.peekingIterator(existingComplexData.iterator());
for (Cell mergedCell : mergedComplexData)
{
Cell existingCell = null;
// Find if there is corresponding cell in the existing row
while (existingCells.hasNext())
{
int cmp = baseColumn.cellPathComparator().compare(mergedCell.path(), existingCells.peek().path());
if (cmp > 0)
break;
Cell next = existingCells.next();
if (cmp == 0)
{
existingCell = next;
break;
}
}
if (mergedCell != existingCell)
addCell(viewColumn, mergedCell);
}
}
else
{
// Note that we've already eliminated the case where merged == existing
addCell(viewColumn, (Cell)mergedData);
}
}
}
Deletes the view entry corresponding to the provided base row.
This method checks that the base row does match the view filter before bothering.
/**
* Deletes the view entry corresponding to the provided base row.
* <p>
* This method checks that the base row does match the view filter before bothering.
*/
private void deleteOldEntry(Row existingBaseRow, Row mergedBaseRow)
{
// Before deleting an old entry, make sure it was matching the view filter (otherwise there is nothing to delete)
if (!matchesViewFilter(existingBaseRow))
return;
deleteOldEntryInternal(existingBaseRow, mergedBaseRow);
}
private void deleteOldEntryInternal(Row existingBaseRow, Row mergedBaseRow)
{
startNewUpdate(existingBaseRow);
long timestamp = computeTimestampForEntryDeletion(existingBaseRow, mergedBaseRow);
long rowDeletion = mergedBaseRow.deletion().time().markedForDeleteAt();
assert timestamp >= rowDeletion;
// If computed deletion timestamp greater than row deletion, it must be coming from
// 1. non-pk base column used in view pk, or
// 2. unselected base column
// any case, we need to use it as expired livenessInfo
// If computed deletion timestamp is from row deletion, we only need row deletion itself
if (timestamp > rowDeletion)
{
/**
* We use an expired liveness instead of a row tombstone to allow a shadowed MV
* entry to co-exist with a row tombstone, see ViewComplexTest#testCommutativeRowDeletion.
*
* TODO This is a dirty overload of LivenessInfo and we should modify
* the storage engine to properly support this on CASSANDRA-13826.
*/
LivenessInfo info = LivenessInfo.withExpirationTime(timestamp, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSec);
currentViewEntryBuilder.addPrimaryKeyLivenessInfo(info);
}
currentViewEntryBuilder.addRowDeletion(mergedBaseRow.deletion());
addDifferentCells(existingBaseRow, mergedBaseRow);
submitUpdate();
}
Computes the partition key and clustering for a new view entry, and setup the internal
row builder for the new row.
This assumes that there is corresponding entry, i.e. no values for the partition key and
clustering are null (since we have eliminated that case through updateAction).
/**
* Computes the partition key and clustering for a new view entry, and setup the internal
* row builder for the new row.
*
* This assumes that there is corresponding entry, i.e. no values for the partition key and
* clustering are null (since we have eliminated that case through updateAction).
*/
private void startNewUpdate(Row baseRow)
{
ByteBuffer[] clusteringValues = new ByteBuffer[viewMetadata.clusteringColumns().size()];
for (ColumnDefinition viewColumn : viewMetadata.primaryKeyColumns())
{
ColumnDefinition baseColumn = view.getBaseColumn(viewColumn);
ByteBuffer value = getValueForPK(baseColumn, baseRow);
if (viewColumn.isPartitionKey())
currentViewEntryPartitionKey[viewColumn.position()] = value;
else
clusteringValues[viewColumn.position()] = value;
}
currentViewEntryBuilder.newRow(Clustering.make(clusteringValues));
}
private LivenessInfo computeLivenessInfoForEntry(Row baseRow)
{
/**
* There 3 cases:
* 1. No extra primary key in view and all base columns are selected in MV. all base row's components(livenessInfo,
* deletion, cells) are same as view row. Simply map base components to view row.
* 2. There is a base non-key column used in view pk. This base non-key column determines the liveness of view row. view's row level
* info should based on this column.
* 3. Most tricky case is no extra primary key in view and some base columns are not selected in MV. We cannot use 1 livenessInfo or
* row deletion to represent the liveness of unselected column properly, see CASSANDRA-11500.
* We could make some simplification: the unselected columns will be used only when it affects view row liveness. eg. if view row
* already exists and not expiring, there is no need to use unselected columns.
* Note: if the view row is removed due to unselected column removal(ttl or cell tombstone), we will have problem keeping view
* row alive with a smaller or equal timestamp than the max unselected column timestamp.
*
*/
assert view.baseNonPKColumnsInViewPK.size() <= 1; // This may change, but is currently an enforced limitation
LivenessInfo baseLiveness = baseRow.primaryKeyLivenessInfo();
if (view.hasSamePrimaryKeyColumnsAsBaseTable())
{
if (view.getDefinition().includeAllColumns)
return baseLiveness;
long timestamp = baseLiveness.timestamp();
boolean hasNonExpiringLiveCell = false;
Cell biggestExpirationCell = null;
for (Cell cell : baseRow.cells())
{
if (view.getViewColumn(cell.column()) != null)
continue;
if (!isLive(cell))
continue;
timestamp = Math.max(timestamp, cell.maxTimestamp());
if (!cell.isExpiring())
hasNonExpiringLiveCell = true;
else
{
if (biggestExpirationCell == null)
biggestExpirationCell = cell;
else if (cell.localDeletionTime() > biggestExpirationCell.localDeletionTime())
biggestExpirationCell = cell;
}
}
if (baseLiveness.isLive(nowInSec) && !baseLiveness.isExpiring())
return LivenessInfo.create(timestamp, nowInSec);
if (hasNonExpiringLiveCell)
return LivenessInfo.create(timestamp, nowInSec);
if (biggestExpirationCell == null)
return baseLiveness;
if (biggestExpirationCell.localDeletionTime() > baseLiveness.localExpirationTime()
|| !baseLiveness.isLive(nowInSec))
return LivenessInfo.withExpirationTime(timestamp,
biggestExpirationCell.ttl(),
biggestExpirationCell.localDeletionTime());
return baseLiveness;
}
Cell cell = baseRow.getCell(view.baseNonPKColumnsInViewPK.get(0));
assert isLive(cell) : "We shouldn't have got there if the base row had no associated entry";
return LivenessInfo.withExpirationTime(cell.timestamp(), cell.ttl(), cell.localDeletionTime());
}
private long computeTimestampForEntryDeletion(Row existingBaseRow, Row mergedBaseRow)
{
DeletionTime deletion = mergedBaseRow.deletion().time();
if (view.hasSamePrimaryKeyColumnsAsBaseTable())
{
long timestamp = Math.max(deletion.markedForDeleteAt(), existingBaseRow.primaryKeyLivenessInfo().timestamp());
if (view.getDefinition().includeAllColumns)
return timestamp;
for (Cell cell : existingBaseRow.cells())
{
// selected column should not contribute to view deletion, itself is already included in view row
if (view.getViewColumn(cell.column()) != null)
continue;
// unselected column is used regardless live or dead, because we don't know if it was used for liveness.
timestamp = Math.max(timestamp, cell.maxTimestamp());
}
return timestamp;
}
// has base non-pk column in view pk
Cell before = existingBaseRow.getCell(view.baseNonPKColumnsInViewPK.get(0));
assert isLive(before) : "We shouldn't have got there if the base row had no associated entry";
return deletion.deletes(before) ? deletion.markedForDeleteAt() : before.timestamp();
}
private void addColumnData(ColumnDefinition viewColumn, ColumnData baseTableData)
{
assert viewColumn.isComplex() == baseTableData.column().isComplex();
if (!viewColumn.isComplex())
{
addCell(viewColumn, (Cell)baseTableData);
return;
}
ComplexColumnData complexData = (ComplexColumnData)baseTableData;
currentViewEntryBuilder.addComplexDeletion(viewColumn, complexData.complexDeletion());
for (Cell cell : complexData)
addCell(viewColumn, cell);
}
private void addCell(ColumnDefinition viewColumn, Cell baseTableCell)
{
assert !viewColumn.isPrimaryKeyColumn();
currentViewEntryBuilder.addCell(baseTableCell.withUpdatedColumn(viewColumn));
}
Finish building the currently updated view entry and add it to the other built
updates.
/**
* Finish building the currently updated view entry and add it to the other built
* updates.
*/
private void submitUpdate()
{
Row row = currentViewEntryBuilder.build();
// I'm not sure we can reach there is there is nothing is updated, but adding an empty row breaks things
// and it costs us nothing to be prudent here.
if (row.isEmpty())
return;
DecoratedKey partitionKey = makeCurrentPartitionKey();
PartitionUpdate update = updates.get(partitionKey);
if (update == null)
{
// We can't really know which columns of the view will be updated nor how many row will be updated for this key
// so we rely on hopefully sane defaults.
update = new PartitionUpdate(viewMetadata, partitionKey, viewMetadata.partitionColumns(), 4);
updates.put(partitionKey, update);
}
update.add(row);
}
private DecoratedKey makeCurrentPartitionKey()
{
ByteBuffer rawKey = viewMetadata.partitionKeyColumns().size() == 1
? currentViewEntryPartitionKey[0]
: CompositeType.build(currentViewEntryPartitionKey);
return viewMetadata.decorateKey(rawKey);
}
private ByteBuffer getValueForPK(ColumnDefinition column, Row row)
{
switch (column.kind)
{
case PARTITION_KEY:
return basePartitionKey[column.position()];
case CLUSTERING:
return row.clustering().get(column.position());
default:
// This shouldn't NPE as we shouldn't get there if the value can be null (or there is a bug in updateAction())
return row.getCell(column).value();
}
}
}