/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.service;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.AnticompactionTask;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.RepairSession;
import org.apache.cassandra.repair.messages.*;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.Refs;

/**
 * ActiveRepairService is the starting point for manual "active" repairs.
 *
 * Each user-triggered repair will correspond to one or multiple repair sessions,
 * one for each token range to repair. One repair session might repair multiple
 * column families. For each of those column families, the repair session will
 * request merkle trees for each replica of the range being repaired, diff those
 * trees upon receiving them, schedule the streaming of the parts to repair (based on
 * the tree diffs) and wait for all those operations. See RepairSession for more
 * details.
 *
 * The creation of a repair session is done through submitRepairSession, which
 * returns a future on the completion of that session.
 */
public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
{
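    /*
     * A rough coordinator-side sketch of how the entry points below fit together
     * (hypothetical variable names and argument values, not a verbatim caller):
     *
     *   UUID parent = UUIDGen.getTimeUUID();
     *   ActiveRepairService.instance.prepareForRepair(parent, FBUtilities.getBroadcastAddress(),
     *                                                 endpoints, options, stores);
     *   RepairSession session = ActiveRepairService.instance.submitRepairSession(
     *       parent, ranges, "my_keyspace", RepairParallelism.PARALLEL, endpoints,
     *       repairedAt, false, executor, "my_table");   // false = no pull repair
     *   // block (or add a listener) on 'session', gather the ranges that succeeded,
     *   // then release resources and trigger anticompaction:
     *   ActiveRepairService.instance.finishParentSession(parent, endpoints, successfulRanges);
     */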
    /**
     * @deprecated these statuses are from the previous JMX notification service,
     * which will be deprecated in 4.0. For statuses of the new notification
     * service, see {@link org.apache.cassandra.streaming.StreamEvent.ProgressEvent}
     */
    @Deprecated
    public static enum Status
    {
        STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED
    }

    private boolean registeredForEndpointChanges = false;

    public static CassandraVersion SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION = new CassandraVersion("2.2.1");

    private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);

    // singleton enforcement
    public static final ActiveRepairService instance = new ActiveRepairService(FailureDetector.instance, Gossiper.instance);

    public static final long UNREPAIRED_SSTABLE = 0;
    /**
     * A map of active coordinator sessions.
     */
    private final ConcurrentMap<UUID, RepairSession> sessions = new ConcurrentHashMap<>();

    private final ConcurrentMap<UUID, ParentRepairSession> parentRepairSessions = new ConcurrentHashMap<>();

    private final IFailureDetector failureDetector;
    private final Gossiper gossiper;

    public ActiveRepairService(IFailureDetector failureDetector, Gossiper gossiper)
    {
        this.failureDetector = failureDetector;
        this.gossiper = gossiper;
    }
    /**
     * Requests repairs for the given keyspace and column families.
     *
     * @return Future for asynchronous call or null if there is no need to repair
     */
    public RepairSession submitRepairSession(UUID parentRepairSession,
                                             Collection<Range<Token>> range,
                                             String keyspace,
                                             RepairParallelism parallelismDegree,
                                             Set<InetAddress> endpoints,
                                             long repairedAt,
                                             boolean pullRepair,
                                             ListeningExecutorService executor,
                                             String... cfnames)
    {
        if (endpoints.isEmpty())
            return null;

        if (cfnames.length == 0)
            return null;

        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace,
                                                        parallelismDegree, endpoints, repairedAt, pullRepair, cfnames);

        sessions.put(session.getId(), session);
        // register listeners
        registerOnFdAndGossip(session);

        // remove session at completion
        session.addListener(new Runnable()
        {
            /**
             * When the repair finishes, do clean up.
             */
            public void run()
            {
                sessions.remove(session.getId());
            }
        }, MoreExecutors.directExecutor());
        session.start(executor);
        return session;
    }

    private <T extends AbstractFuture & IEndpointStateChangeSubscriber & IFailureDetectionEventListener> void registerOnFdAndGossip(final T task)
    {
        gossiper.register(task);
        failureDetector.registerFailureDetectionEventListener(task);

        // unregister listeners at completion
        task.addListener(new Runnable()
        {
            /**
             * When the repair finishes, do clean up.
             */
            public void run()
            {
                failureDetector.unregisterFailureDetectionEventListener(task);
                gossiper.unregister(task);
            }
        }, MoreExecutors.sameThreadExecutor());
    }

    public synchronized void terminateSessions()
    {
        Throwable cause = new IOException("Terminate session is called");
        for (RepairSession session : sessions.values())
        {
            session.forceShutdown(cause);
        }
        parentRepairSessions.clear();
    }
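    /*
     * Note on registerOnFdAndGossip above: the intersection type bound
     * (T extends AbstractFuture & IEndpointStateChangeSubscriber & IFailureDetectionEventListener)
     * lets the same helper accept both RepairSession and AnticompactionTask without
     * introducing a dedicated interface - the argument has to be a future (so we can
     * hook the unregistration on completion) and also a gossip subscriber and a
     * failure-detection listener (so it can be registered with both services).
     */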
    /**
     * Return all of the neighbors with whom we share the provided range.
     *
     * @param keyspaceName keyspace to repair
     * @param keyspaceLocalRanges local-range for given keyspaceName
     * @param toRepair token to repair
     * @param dataCenters the data centers to involve in the repair
     * @param hosts the specific hosts to involve in the repair
     *
     * @return neighbors with whom we share the provided range
     */
    public static Set<InetAddress> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges,
                                                Range<Token> toRepair, Collection<String> dataCenters,
                                                Collection<String> hosts)
    {
        StorageService ss = StorageService.instance;
        Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
        Range<Token> rangeSuperSet = null;
        for (Range<Token> range : keyspaceLocalRanges)
        {
            if (range.contains(toRepair))
            {
                rangeSuperSet = range;
                break;
            }
            else if (range.intersects(toRepair))
            {
                throw new IllegalArgumentException(String.format("Requested range %s intersects a local range (%s) " +
                                                                 "but is not fully contained in one; this would lead to " +
                                                                 "imprecise repair. keyspace: %s", toRepair.toString(), range.toString(), keyspaceName));
            }
        }
        if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))
            return Collections.emptySet();

        Set<InetAddress> neighbors = new HashSet<>(replicaSets.get(rangeSuperSet));
        neighbors.remove(FBUtilities.getBroadcastAddress());

        if (dataCenters != null && !dataCenters.isEmpty())
        {
            TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
            Set<InetAddress> dcEndpoints = Sets.newHashSet();
            Multimap<String, InetAddress> dcEndpointsMap = topology.getDatacenterEndpoints();
            for (String dc : dataCenters)
            {
                Collection<InetAddress> c = dcEndpointsMap.get(dc);
                if (c != null)
                    dcEndpoints.addAll(c);
            }
            return Sets.intersection(neighbors, dcEndpoints);
        }
        else if (hosts != null && !hosts.isEmpty())
        {
            Set<InetAddress> specifiedHost = new HashSet<>();
            for (final String host : hosts)
            {
                try
                {
                    final InetAddress endpoint = InetAddress.getByName(host.trim());
                    if (endpoint.equals(FBUtilities.getBroadcastAddress()) || neighbors.contains(endpoint))
                        specifiedHost.add(endpoint);
                }
                catch (UnknownHostException e)
                {
                    throw new IllegalArgumentException("Unknown host specified " + host, e);
                }
            }

            if (!specifiedHost.contains(FBUtilities.getBroadcastAddress()))
                throw new IllegalArgumentException("The current host must be part of the repair");

            if (specifiedHost.size() <= 1)
            {
                String msg = "Specified hosts %s do not share range %s needed for repair. Either restrict repair ranges " +
                             "with -st/-et options, or specify one of the neighbors that share this range with " +
                             "this node: %s.";
                throw new IllegalArgumentException(String.format(msg, hosts, toRepair, neighbors));
            }

            specifiedHost.remove(FBUtilities.getBroadcastAddress());
            return specifiedHost;
        }

        return neighbors;
    }
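    /*
     * Example of the filtering above (hypothetical three-replica topology): if this
     * node shares the requested range with neighbors A (dc1), B (dc2) and C (dc2):
     *
     *   getNeighbors(ks, localRanges, range, null, null)                -> {A, B, C}
     *   getNeighbors(ks, localRanges, range, asList("dc2"), null)       -> {B, C}
     *   getNeighbors(ks, localRanges, range, null, asList("self", "B")) -> {B}
     *
     * A non-empty hosts list must contain this node, and data center filtering
     * takes precedence over host filtering when both are supplied.
     */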
    public UUID prepareForRepair(UUID parentRepairSession, InetAddress coordinator, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores)
    {
        long timestamp = Clock.instance.currentTimeMillis();
        registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal());
        final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
        final AtomicBoolean status = new AtomicBoolean(true);
        final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>());
        IAsyncCallbackWithFailure callback = new IAsyncCallbackWithFailure()
        {
            public void response(MessageIn msg)
            {
                prepareLatch.countDown();
            }

            public boolean isLatencyForSnitch()
            {
                return false;
            }

            public void onFailure(InetAddress from, RequestFailureReason failureReason)
            {
                status.set(false);
                failedNodes.add(from.getHostAddress());
                prepareLatch.countDown();
            }
        };

        List<UUID> cfIds = new ArrayList<>(columnFamilyStores.size());
        for (ColumnFamilyStore cfs : columnFamilyStores)
            cfIds.add(cfs.metadata.cfId);

        for (InetAddress neighbour : endpoints)
        {
            if (FailureDetector.instance.isAlive(neighbour))
            {
                PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal());
                MessageOut<RepairMessage> msg = message.createMessage();
                MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true);
            }
            else
            {
                // bailout early to avoid potentially waiting for a long time.
                failRepair(parentRepairSession, "Endpoint not alive: " + neighbour);
            }
        }
        try
        {
            // A failed repair is expensive, so we wait longer than usual for the replies.
            if (!prepareLatch.await(1, TimeUnit.HOURS))
                failRepair(parentRepairSession, "Did not get replies from all endpoints.");
        }
        catch (InterruptedException e)
        {
            failRepair(parentRepairSession, "Interrupted while waiting for prepare repair response.");
        }

        if (!status.get())
        {
            failRepair(parentRepairSession, "Got negative replies from endpoints " + failedNodes);
        }

        return parentRepairSession;
    }

    private void failRepair(UUID parentRepairSession, String errorMsg)
    {
        removeParentRepairSession(parentRepairSession);
        throw new RuntimeException(errorMsg);
    }

    public synchronized void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long timestamp, boolean isGlobal)
    {
        if (!registeredForEndpointChanges)
        {
            Gossiper.instance.register(this);
            FailureDetector.instance.registerFailureDetectionEventListener(this);
            registeredForEndpointChanges = true;
        }

        if (!parentRepairSessions.containsKey(parentRepairSession))
        {
            parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, timestamp, isGlobal));
        }
    }

    public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID parentRepairSession)
    {
        Set<SSTableReader> repairing = new HashSet<>();
        for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet())
        {
            Collection<SSTableReader> sstables = entry.getValue().getActiveSSTables(cfId);
            if (sstables != null && !entry.getKey().equals(parentRepairSession))
                repairing.addAll(sstables);
        }
        return repairing;
    }
    /**
     * Run the final process of a repair.
     * This removes all resources held by the parent repair session, after performing anticompaction if necessary.
     *
     * @param parentSession Parent session ID
     * @param neighbors Repair participants (not including self)
     * @param successfulRanges Ranges that repaired successfully
     */
    public synchronized ListenableFuture finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges)
    {
        List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1);
        for (InetAddress neighbor : neighbors)
        {
            AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
            registerOnFdAndGossip(task);
            tasks.add(task);
            task.run(); // 'run' is just sending message
        }
        tasks.add(doAntiCompaction(parentSession, successfulRanges));
        return Futures.successfulAsList(tasks);
    }

    public ParentRepairSession getParentRepairSession(UUID parentSessionId)
    {
        ParentRepairSession session = parentRepairSessions.get(parentSessionId);
        // this can happen if a node thinks that the coordinator was down, but that coordinator got back before noticing
        // that it was down itself.
        if (session == null)
            throw new RuntimeException("Parent repair session with id = " + parentSessionId + " has failed.");

        return session;
    }
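    /*
     * Note on Futures.successfulAsList as used in finishParentSession above: unlike
     * allAsList, it produces a future that waits for every task and substitutes null
     * for any task that failed, rather than failing the combined future. A minimal
     * sketch of that behaviour (illustrative, not part of this class):
     *
     *   List<ListenableFuture<String>> parts = Arrays.asList(
     *       Futures.immediateFuture("ok"),
     *       Futures.immediateFailedFuture(new IOException("node down")));
     *   Futures.successfulAsList(parts).get();   // -> ["ok", null], no exception
     *
     * This is why a single failed AnticompactionTask does not by itself sink the
     * whole finishParentSession future.
     */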
    /**
     * Called when the repair session is done - either it failed or anticompaction has completed.
     *
     * Clears out any snapshots created by this repair.
     *
     * @param parentSessionId the parent repair session to remove
     * @return the removed ParentRepairSession, or null if it was not registered
     */
    public synchronized ParentRepairSession removeParentRepairSession(UUID parentSessionId)
    {
        String snapshotName = parentSessionId.toString();
        for (ColumnFamilyStore cfs : getParentRepairSession(parentSessionId).columnFamilyStores.values())
        {
            if (cfs.snapshotExists(snapshotName))
                cfs.clearSnapshot(snapshotName);
        }
        return parentRepairSessions.remove(parentSessionId);
    }
    /**
     * Submit anti-compaction jobs to CompactionManager.
     * When all jobs are done, the parent repair session is removed whether they succeeded or not.
     *
     * @param parentRepairSession parent repair session ID
     * @return Future result of all anti-compaction jobs.
     */
@SuppressWarnings("resource") public ListenableFuture<List<Object>> doAntiCompaction(final UUID parentRepairSession, Collection<Range<Token>> successfulRanges) { assert parentRepairSession != null; ParentRepairSession prs = getParentRepairSession(parentRepairSession); //A repair will be marked as not global if it is a subrange repair to avoid many small anti-compactions //in addition to other scenarios such as repairs not involving all DCs or hosts if (!prs.isGlobal) { logger.info("[repair #{}] Not a global repair, will not do anticompaction", parentRepairSession); removeParentRepairSession(parentRepairSession); return Futures.immediateFuture(Collections.emptyList()); } assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges"; List<ListenableFuture<?>> futures = new ArrayList<>(); // if we don't have successful repair ranges, then just skip anticompaction if (!successfulRanges.isEmpty()) { for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet()) { Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefsForAntiCompaction(columnFamilyStoreEntry.getKey(), parentRepairSession); ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue(); futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt, parentRepairSession)); } } ListenableFuture<List<Object>> allAntiCompactionResults = Futures.successfulAsList(futures); allAntiCompactionResults.addListener(new Runnable() { @Override public void run() { removeParentRepairSession(parentRepairSession); } }, MoreExecutors.directExecutor()); return allAntiCompactionResults; } public void handleMessage(InetAddress endpoint, RepairMessage message) { RepairJobDesc desc = message.desc; RepairSession session = sessions.get(desc.sessionId); if (session == null) return; switch (message.messageType) { case VALIDATION_COMPLETE: ValidationComplete validation = (ValidationComplete) message; session.validationComplete(desc, endpoint, validation.trees); break; case SYNC_COMPLETE: // one of replica is synced. SyncComplete sync = (SyncComplete) message; session.syncComplete(desc, sync.nodes, sync.success); break; default: break; } }
    /**
     * We keep a ParentRepairSession around for the duration of the entire repair, for example, on a 256 token vnode rf=3 cluster
     * we would have 768 RepairSessions but only one ParentRepairSession. We use the PRS to avoid anticompacting the sstables
     * 768 times, instead we take all repaired ranges at the end of the repair and anticompact once.
     *
     * We do an optimistic marking of sstables - when we start an incremental repair we mark all unrepaired sstables as
     * repairing (@see markSSTablesRepairing), then while the repair is ongoing compactions might remove those sstables,
     * and when it is time for anticompaction we will only anticompact the sstables that are still on disk.
     *
     * Note that validation and streaming do not care about which sstables we have marked as repairing - they operate on
     * all unrepaired sstables (if the repair is incremental), otherwise we would not get a correct repair.
     */
    public static class ParentRepairSession
    {
        private final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
        private final Collection<Range<Token>> ranges;
        public final Map<UUID, Set<String>> sstableMap = new HashMap<>();
        public final boolean isIncremental;
        public final boolean isGlobal;
        public final long repairedAt;
        public final InetAddress coordinator;
        /**
         * Indicates whether we have marked sstables as repairing. Can only be done once per table per ParentRepairSession
         */
        private final Set<UUID> marked = new HashSet<>();

        public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal)
        {
            this.coordinator = coordinator;
            for (ColumnFamilyStore cfs : columnFamilyStores)
            {
                this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
                sstableMap.put(cfs.metadata.cfId, new HashSet<String>());
            }
            this.ranges = ranges;
            this.repairedAt = repairedAt;
            this.isIncremental = isIncremental;
            this.isGlobal = isGlobal;
        }
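        /*
         * Worked example of the session count from the class-level comment (numbers
         * illustrative): with num_tokens = 256 and RF = 3, a node is a replica for
         * roughly 256 * 3 = 768 token ranges, so a full repair of that node creates
         * ~768 RepairSessions, all sharing this single ParentRepairSession, and
         * anticompaction runs once over the union of the successfully repaired
         * ranges instead of once per range.
         */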
        /**
         * Mark sstables repairing - either all sstables or only the unrepaired ones depending on
         * whether this is an incremental or full repair
         *
         * @param cfId the column family
         * @param parentSessionId the parent repair session id, used to make sure we don't start multiple repairs over the same sstables
         */
        public synchronized void markSSTablesRepairing(UUID cfId, UUID parentSessionId)
        {
            if (!marked.contains(cfId))
            {
                List<SSTableReader> sstables = columnFamilyStores.get(cfId).select(View.select(SSTableSet.CANONICAL, (s) -> !isIncremental || !s.isRepaired())).sstables;
                Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId);
                if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(sstables)).isEmpty())
                {
                    logger.error("Cannot start multiple repair sessions over the same sstables");
                    throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
                }
                addSSTables(cfId, sstables);
                marked.add(cfId);
            }
        }
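        /*
         * The selection predicate above, spelled out (the two cases, no additional
         * behaviour implied):
         *
         *   full repair (isIncremental == false): '!isIncremental' is true, so every
         *       canonical sstable is marked;
         *   incremental repair (isIncremental == true): only sstables for which
         *       '!s.isRepaired()' holds, i.e. the not-yet-repaired ones, are marked.
         */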
        /**
         * Get the still active sstables we should run anticompaction on
         *
         * note that validation and streaming do not call this method - they have to work on the actual active sstables on the node, we only call this
         * to know which sstables are still there that were there when we started the repair
         *
         * @param cfId the column family
         * @param parentSessionId used for checking if there exists a snapshot for this repair
         * @return referenced sstables that are still present and eligible for anticompaction
         */
@SuppressWarnings("resource") public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(UUID cfId, UUID parentSessionId) { assert marked.contains(cfId); if (!columnFamilyStores.containsKey(cfId)) throw new RuntimeException("Not possible to get sstables for anticompaction for " + cfId); boolean isSnapshotRepair = columnFamilyStores.get(cfId).snapshotExists(parentSessionId.toString()); ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder(); Iterable<SSTableReader> sstables = isSnapshotRepair ? getSSTablesForSnapshotRepair(cfId, parentSessionId) : getActiveSSTables(cfId); // we check this above - if columnFamilyStores contains the cfId sstables will not be null assert sstables != null; for (SSTableReader sstable : sstables) { Ref<SSTableReader> ref = sstable.tryRef(); if (ref == null) sstableMap.get(cfId).remove(sstable.getFilename()); else references.put(sstable, ref); } return new Refs<>(references.build()); }
        /**
         * If we are running a snapshot repair we need to find the 'real' sstables when we start anticompaction
         *
         * We use the generation of the sstables as identifiers instead of the file name to avoid having to parse out the
         * actual filename.
         *
         * @param cfId the column family
         * @param parentSessionId the parent repair session id, which names the snapshot
         * @return the canonical sstables whose generations appear in the snapshot, or null if the column family is unknown
         */
        private Set<SSTableReader> getSSTablesForSnapshotRepair(UUID cfId, UUID parentSessionId)
        {
            Set<SSTableReader> activeSSTables = new HashSet<>();
            ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
            if (cfs == null)
                return null;

            Set<Integer> snapshotGenerations = new HashSet<>();
            try (Refs<SSTableReader> snapshottedSSTables = cfs.getSnapshotSSTableReader(parentSessionId.toString()))
            {
                for (SSTableReader sstable : snapshottedSSTables)
                {
                    snapshotGenerations.add(sstable.descriptor.generation);
                }
            }
            catch (IOException e)
            {
                throw new RuntimeException(e);
            }
            for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
                if (snapshotGenerations.contains(sstable.descriptor.generation))
                    activeSSTables.add(sstable);
            return activeSSTables;
        }

        public synchronized void maybeSnapshot(UUID cfId, UUID parentSessionId)
        {
            String snapshotName = parentSessionId.toString();
            if (!columnFamilyStores.get(cfId).snapshotExists(snapshotName))
            {
                Set<SSTableReader> snapshottedSSTables = columnFamilyStores.get(cfId).snapshot(snapshotName, new Predicate<SSTableReader>()
                {
                    public boolean apply(SSTableReader sstable)
                    {
                        return sstable != null &&
                               (!isIncremental || !sstable.isRepaired()) &&
                               !(sstable.metadata.isIndex()) && // exclude SSTables from 2i
                               new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges);
                    }
                }, true, false);

                if (isAlreadyRepairing(cfId, parentSessionId, snapshottedSSTables))
                {
                    columnFamilyStores.get(cfId).clearSnapshot(parentSessionId.toString());
                    logger.error("Cannot start multiple repair sessions over the same sstables");
                    throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
                }
                addSSTables(cfId, snapshottedSSTables);
                marked.add(cfId);
            }
        }
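        /*
         * Why generations rather than filenames are compared here and in
         * isAlreadyRepairing below: a snapshotted sstable lives under the table's
         * snapshots directory, so the same physical data has two different paths
         * (layout illustrative, names assumed):
         *
         *   live:     .../ks/table-<id>/mc-42-big-Data.db
         *   snapshot: .../ks/table-<id>/snapshots/<parentSessionId>/mc-42-big-Data.db
         *
         * getFilename() would never match across the two, while the generation
         * (42 here) identifies the sstable in both places.
         */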
        /**
         * Compares other repairing sstables *generation* to the ones we just snapshotted
         *
         * we compare generations since the sstables have different paths due to snapshot names
         *
         * @param cfId id of the column family store
         * @param parentSessionId parent repair session
         * @param sstables the newly snapshotted sstables
         * @return true if any of the given sstables is already being repaired by another session
         */
        private boolean isAlreadyRepairing(UUID cfId, UUID parentSessionId, Collection<SSTableReader> sstables)
        {
            Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId);
            Set<Integer> currentlyRepairingGenerations = new HashSet<>();
            Set<Integer> newRepairingGenerations = new HashSet<>();
            for (SSTableReader sstable : currentlyRepairing)
                currentlyRepairingGenerations.add(sstable.descriptor.generation);
            for (SSTableReader sstable : sstables)
                newRepairingGenerations.add(sstable.descriptor.generation);

            return !Sets.intersection(currentlyRepairingGenerations, newRepairingGenerations).isEmpty();
        }

        private Set<SSTableReader> getActiveSSTables(UUID cfId)
        {
            if (!columnFamilyStores.containsKey(cfId))
                return null;

            Set<String> repairedSSTables = sstableMap.get(cfId);
            Set<SSTableReader> activeSSTables = new HashSet<>();
            Set<String> activeSSTableNames = new HashSet<>();
            ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
            for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
            {
                if (repairedSSTables.contains(sstable.getFilename()))
                {
                    activeSSTables.add(sstable);
                    activeSSTableNames.add(sstable.getFilename());
                }
            }
            sstableMap.put(cfId, activeSSTableNames);
            return activeSSTables;
        }

        private void addSSTables(UUID cfId, Collection<SSTableReader> sstables)
        {
            for (SSTableReader sstable : sstables)
                sstableMap.get(cfId).add(sstable.getFilename());
        }

        public long getRepairedAt()
        {
            if (isGlobal)
                return repairedAt;

            return ActiveRepairService.UNREPAIRED_SSTABLE;
        }

        @Override
        public String toString()
        {
            return "ParentRepairSession{" +
                   "columnFamilyStores=" + columnFamilyStores +
                   ", ranges=" + ranges +
                   ", sstableMap=" + sstableMap +
                   ", repairedAt=" + repairedAt +
                   '}';
        }
    }

    /*
     * If the coordinator node dies we should remove the parent repair session from the other nodes.
     * This uses the same notifications as we get in RepairSession
     */
    public void onJoin(InetAddress endpoint, EndpointState epState) {}
    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
    public void onAlive(InetAddress endpoint, EndpointState state) {}
    public void onDead(InetAddress endpoint, EndpointState state) {}

    public void onRemove(InetAddress endpoint)
    {
        convict(endpoint, Double.MAX_VALUE);
    }

    public void onRestart(InetAddress endpoint, EndpointState state)
    {
        convict(endpoint, Double.MAX_VALUE);
    }
    /**
     * Something has happened to a remote node - if that node is a coordinator, we mark the parent repair session id as failed.
     *
     * The fail marker is kept in the map for 24h to make sure that, even if the coordinator does not agree
     * that the repair failed, we fail the entire repair session.
     *
     * @param ep endpoint to be convicted
     * @param phi the value of phi with which ep was convicted
     */
    public void convict(InetAddress ep, double phi)
    {
        // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold() || parentRepairSessions.isEmpty())
            return;

        Set<UUID> toRemove = new HashSet<>();

        for (Map.Entry<UUID, ParentRepairSession> repairSessionEntry : parentRepairSessions.entrySet())
        {
            if (repairSessionEntry.getValue().coordinator.equals(ep))
            {
                toRemove.add(repairSessionEntry.getKey());
            }
        }

        if (!toRemove.isEmpty())
        {
            logger.debug("Removing {} in parent repair sessions", toRemove);
            for (UUID id : toRemove)
                removeParentRepairSession(id);
        }
    }
}