/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.repair;

import java.net.InetAddress;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import com.google.common.annotations.VisibleForTesting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.ValidationComplete;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.MerkleTree.RowHash;
import org.apache.cassandra.utils.MerkleTrees;

Handles the building of a merkle tree for a column family. Lifecycle: 1. prepare() - Initialize tree with samples. 2. add() - 0 or more times, to add hashes to the tree. 3. complete() - Enqueues any operations that were blocked waiting for a valid tree.
/** * Handles the building of a merkle tree for a column family. * * Lifecycle: * 1. prepare() - Initialize tree with samples. * 2. add() - 0 or more times, to add hashes to the tree. * 3. complete() - Enqueues any operations that were blocked waiting for a valid tree. */
public class Validator implements Runnable { private static final Logger logger = LoggerFactory.getLogger(Validator.class); public final RepairJobDesc desc; public final InetAddress initiator; public final int gcBefore; private final boolean evenTreeDistribution; // null when all rows with the min token have been consumed private long validated; private MerkleTrees trees; // current range being updated private MerkleTree.TreeRange range; // iterator for iterating sub ranges (MT's leaves) private MerkleTrees.TreeRangeIterator ranges; // last key seen private DecoratedKey lastKey; public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore) { this(desc, initiator, gcBefore, false); } public Validator(RepairJobDesc desc, InetAddress initiator, int gcBefore, boolean evenTreeDistribution) { this.desc = desc; this.initiator = initiator; this.gcBefore = gcBefore; validated = 0; range = null; ranges = null; this.evenTreeDistribution = evenTreeDistribution; } public void prepare(ColumnFamilyStore cfs, MerkleTrees tree) { this.trees = tree; if (!tree.partitioner().preservesOrder() || evenTreeDistribution) { // You can't beat an even tree distribution for md5 tree.init(); } else { List<DecoratedKey> keys = new ArrayList<>(); Random random = new Random(); for (Range<Token> range : tree.ranges()) { for (DecoratedKey sample : cfs.keySamples(range)) { assert range.contains(sample.getToken()) : "Token " + sample.getToken() + " is not within range " + desc.ranges; keys.add(sample); } if (keys.isEmpty()) { // use an even tree distribution tree.init(range); } else { int numKeys = keys.size(); // sample the column family using random keys from the index while (true) { DecoratedKey dk = keys.get(random.nextInt(numKeys)); if (!tree.split(dk.getToken())) break; } keys.clear(); } } } logger.debug("Prepared AEService trees of size {} for {}", trees.size(), desc); ranges = tree.invalids(); }
Called (in order) for every row present in the CF. Hashes the row, and adds it to the tree being built.
Params:
  • partition – Partition to add hash
/** * Called (in order) for every row present in the CF. * Hashes the row, and adds it to the tree being built. * * @param partition Partition to add hash */
public void add(UnfilteredRowIterator partition) { assert Range.isInRanges(partition.partitionKey().getToken(), desc.ranges) : partition.partitionKey().getToken() + " is not contained in " + desc.ranges; assert lastKey == null || lastKey.compareTo(partition.partitionKey()) < 0 : "partition " + partition.partitionKey() + " received out of order wrt " + lastKey; lastKey = partition.partitionKey(); if (range == null) range = ranges.next(); // generate new ranges as long as case 1 is true if (!findCorrectRange(lastKey.getToken())) { // add the empty hash, and move to the next range ranges = trees.invalids(); findCorrectRange(lastKey.getToken()); } assert range.contains(lastKey.getToken()) : "Token not in MerkleTree: " + lastKey.getToken(); // case 3 must be true: mix in the hashed row RowHash rowHash = rowHash(partition); if (rowHash != null) { range.addHash(rowHash); } } public boolean findCorrectRange(Token t) { while (!range.contains(t) && ranges.hasNext()) { range = ranges.next(); } return range.contains(t); } static class CountingDigest extends MessageDigest { private long count; private MessageDigest underlying; public CountingDigest(MessageDigest underlying) { super(underlying.getAlgorithm()); this.underlying = underlying; } @Override protected void engineUpdate(byte input) { underlying.update(input); count += 1; } @Override protected void engineUpdate(byte[] input, int offset, int len) { underlying.update(input, offset, len); count += len; } @Override protected byte[] engineDigest() { return underlying.digest(); } @Override protected void engineReset() { underlying.reset(); } } private MerkleTree.RowHash rowHash(UnfilteredRowIterator partition) { validated++; // MerkleTree uses XOR internally, so we want lots of output bits here CountingDigest digest = new CountingDigest(FBUtilities.newMessageDigest("SHA-256")); UnfilteredRowIterators.digest(null, partition, digest, MessagingService.current_version); // only return new hash for merkle tree in case digest was updated - see CASSANDRA-8979 return digest.count > 0 ? new MerkleTree.RowHash(partition.partitionKey().getToken(), digest.digest(), digest.count) : null; }
Registers the newly created tree for rendezvous in Stage.ANTIENTROPY.
/** * Registers the newly created tree for rendezvous in Stage.ANTIENTROPY. */
public void complete() { completeTree(); StageManager.getStage(Stage.ANTI_ENTROPY).execute(this); if (logger.isDebugEnabled()) { // log distribution of rows in tree logger.debug("Validated {} partitions for {}. Partitions per leaf are:", validated, desc.sessionId); trees.logRowCountPerLeaf(logger); logger.debug("Validated {} partitions for {}. Partition sizes are:", validated, desc.sessionId); trees.logRowSizePerLeaf(logger); } } @VisibleForTesting public void completeTree() { assert ranges != null : "Validator was not prepared()"; ranges = trees.invalids(); while (ranges.hasNext()) { range = ranges.next(); range.ensureHashInitialised(); } }
Called when some error during the validation happened. This sends RepairStatus to inform the initiator that the validation has failed. The actual reason for failure should be looked up in the log of the host calling this function.
/** * Called when some error during the validation happened. * This sends RepairStatus to inform the initiator that the validation has failed. * The actual reason for failure should be looked up in the log of the host calling this function. */
public void fail() { logger.error("Failed creating a merkle tree for {}, {} (see log for details)", desc, initiator); // send fail message only to nodes >= version 2.0 MessagingService.instance().sendOneWay(new ValidationComplete(desc).createMessage(), initiator); }
Called after the validation lifecycle to respond with the now valid tree. Runs in Stage.ANTIENTROPY.
/** * Called after the validation lifecycle to respond with the now valid tree. Runs in Stage.ANTIENTROPY. */
public void run() { // respond to the request that triggered this validation if (!initiator.equals(FBUtilities.getBroadcastAddress())) { logger.info("[repair #{}] Sending completed merkle tree to {} for {}.{}", desc.sessionId, initiator, desc.keyspace, desc.columnFamily); Tracing.traceRepair("Sending completed merkle tree to {} for {}.{}", initiator, desc.keyspace, desc.columnFamily); } MessagingService.instance().sendOneWay(new ValidationComplete(desc, trees).createMessage(), initiator); } }