/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.compaction;


import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;

import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.DiskBoundaries;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.service.ActiveRepairService;

/**
 * Manages the compaction strategies.
 *
 * Currently has two instances of actual compaction strategies per data directory - one for repaired data and one for
 * unrepaired data. This is done to be able to totally separate the different sets of sstables.
 *
 * Operations on this class are guarded by a {@link ReentrantReadWriteLock}. This lock performs mutual exclusion on
 * reads and writes to the following variables: {@link this#repaired}, {@link this#unrepaired}, {@link this#isActive},
 * {@link this#params}, {@link this#currentBoundaries}. Whenever performing reads on these variables,
 * the {@link this#readLock} should be acquired. Likewise, updates to these variables should be guarded by
 * {@link this#writeLock}.
 *
 * Whenever the {@link DiskBoundaries} change, the compaction strategies must be reloaded, so in order to ensure
 * the compaction strategy placement reflects the most up-to-date disk boundaries, call
 * {@link this#maybeReloadDiskBoundaries()} before acquiring the read lock to access the strategies.
 */
public class CompactionStrategyManager implements INotificationConsumer
{
    private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
    public final CompactionLogger compactionLogger;
    private final ColumnFamilyStore cfs;
    private final boolean partitionSSTablesByTokenRange;
    private final Supplier<DiskBoundaries> boundariesSupplier;
    /**
     * Performs mutual exclusion on the variables below
     */
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
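
    /*
     * Illustrative sketch (not part of the original source): the locking discipline described in the
     * class-level javadoc. Readers refresh the disk boundaries first, then take the read lock; updates go
     * through the write lock, and a thread holding the read lock must never try to reload, since a
     * ReentrantReadWriteLock cannot be upgraded from read to write.
     *
     *   maybeReloadDiskBoundaries();   // may briefly take the write lock to rebuild the strategies
     *   readLock.lock();
     *   try
     *   {
     *       // read repaired / unrepaired / params / currentBoundaries here
     *   }
     *   finally
     *   {
     *       readLock.unlock();
     *   }
     */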
    /**
     * Variables guarded by read and write lock above
     */
    //TODO check possibility of getting rid of these locks by encapsulating these in an immutable atomic object
    private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
    private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
    private volatile CompactionParams params;
    private DiskBoundaries currentBoundaries;
    private volatile boolean enabled = true;
    private volatile boolean isActive = true;
    /**
     * We keep a copy of the schema compaction parameters here to be able to decide if we should update the
     * compaction strategy in {@link this#maybeReload(CFMetaData)} due to an ALTER.
     *
     * If a user changes the local compaction strategy and then later ALTERs a compaction parameter, we will
     * use the new compaction parameters.
     */
    private volatile CompactionParams schemaCompactionParams;
    private boolean shouldDefragment;
    private boolean supportsEarlyOpen;
    private int fanout;

    public CompactionStrategyManager(ColumnFamilyStore cfs)
    {
        this(cfs, cfs::getDiskBoundaries, cfs.getPartitioner().splitter().isPresent());
    }

    @VisibleForTesting
    public CompactionStrategyManager(ColumnFamilyStore cfs, Supplier<DiskBoundaries> boundariesSupplier,
                                     boolean partitionSSTablesByTokenRange)
    {
        cfs.getTracker().subscribe(this);
        logger.trace("{} subscribed to the data tracker.", this);
        this.cfs = cfs;
        this.compactionLogger = new CompactionLogger(cfs, this);
        this.boundariesSupplier = boundariesSupplier;
        this.partitionSSTablesByTokenRange = partitionSSTablesByTokenRange;
        params = cfs.metadata.params.compaction;
        enabled = params.isEnabled();
        reload(cfs.metadata.params.compaction);
    }
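
    /*
     * Illustrative sketch (not part of the original source): the @VisibleForTesting constructor lets a test
     * supply fixed disk boundaries instead of the live ones from the ColumnFamilyStore. "someCfs" and
     * "fixedBoundaries" are hypothetical names for a store and a precomputed DiskBoundaries instance.
     *
     *   DiskBoundaries fixedBoundaries = someCfs.getDiskBoundaries();
     *   CompactionStrategyManager csm =
     *       new CompactionStrategyManager(someCfs, () -> fixedBoundaries, false);
     */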
    /**
     * Return the next background task
     *
     * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
     */
    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
    {
        maybeReloadDiskBoundaries();
        readLock.lock();
        try
        {
            if (!isEnabled())
                return null;

            List<AbstractCompactionStrategy> strategies = new ArrayList<>();
            strategies.addAll(repaired);
            strategies.addAll(unrepaired);
            Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
            for (AbstractCompactionStrategy strategy : strategies)
            {
                AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore);
                if (task != null)
                    return task;
            }
        }
        finally
        {
            readLock.unlock();
        }
        return null;
    }

    public boolean isEnabled()
    {
        return enabled && isActive;
    }

    public boolean isActive()
    {
        return isActive;
    }

    public void resume()
    {
        writeLock.lock();
        try
        {
            isActive = true;
        }
        finally
        {
            writeLock.unlock();
        }
    }
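
    /*
     * Illustrative sketch (not part of the original source): how a background compaction loop might drain
     * tasks from this manager. The strategies are sorted by estimated remaining work, so the busiest
     * strategy is asked first, and a null return means no strategy currently has work. "csm" is a
     * hypothetical reference to this manager and "gcBefore" is assumed to be computed by the caller
     * (e.g. from the table's gc_grace_seconds).
     *
     *   AbstractCompactionTask task;
     *   while ((task = csm.getNextBackgroundTask(gcBefore)) != null)
     *   {
     *       // hand the task to the compaction executor
     *   }
     */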
    /**
     * pause compaction while we cancel all ongoing compactions
     *
     * Separate call from enable/disable to not have to save the enabled-state externally
     */
    public void pause()
    {
        writeLock.lock();
        try
        {
            isActive = false;
        }
        finally
        {
            writeLock.unlock();
        }
    }

    private void startup()
    {
        writeLock.lock();
        try
        {
            for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
            {
                if (sstable.openReason != SSTableReader.OpenReason.EARLY)
                    compactionStrategyFor(sstable).addSSTable(sstable);
            }
            repaired.forEach(AbstractCompactionStrategy::startup);
            unrepaired.forEach(AbstractCompactionStrategy::startup);
            shouldDefragment = repaired.get(0).shouldDefragment();
            supportsEarlyOpen = repaired.get(0).supportsEarlyOpen();
            fanout = (repaired.get(0) instanceof LeveledCompactionStrategy)
                     ? ((LeveledCompactionStrategy) repaired.get(0)).getLevelFanoutSize()
                     : LeveledCompactionStrategy.DEFAULT_LEVEL_FANOUT_SIZE;
        }
        finally
        {
            writeLock.unlock();
        }
        repaired.forEach(AbstractCompactionStrategy::startup);
        unrepaired.forEach(AbstractCompactionStrategy::startup);
        if (Stream.concat(repaired.stream(), unrepaired.stream()).anyMatch(cs -> cs.logAll))
            compactionLogger.enable();
    }
    /**
     * return the compaction strategy for the given sstable
     *
     * returns differently based on the repaired status and which vnode the compaction strategy belongs to
     * @param sstable the sstable to find the strategy for
     * @return the compaction strategy instance currently responsible for the sstable
     */
    protected AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
    {
        maybeReloadDiskBoundaries();
        return compactionStrategyFor(sstable);
    }

    @VisibleForTesting
    protected AbstractCompactionStrategy compactionStrategyFor(SSTableReader sstable)
    {
        // should not call maybeReloadDiskBoundaries because it may be called from within lock
        readLock.lock();
        try
        {
            int index = compactionStrategyIndexFor(sstable);
            if (sstable.isRepaired())
                return repaired.get(index);
            else
                return unrepaired.get(index);
        }
        finally
        {
            readLock.unlock();
        }
    }
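
    /*
     * Illustrative example (not part of the original source): the routing performed above, written out. With
     * partitionSSTablesByTokenRange enabled and, say, three data directories, there are three strategy
     * instances per repair status and an sstable is routed by its (isRepaired, diskIndex) pair:
     *
     *   isRepaired() == true,  diskIndex == 1  ->  repaired.get(1)
     *   isRepaired() == false, diskIndex == 2  ->  unrepaired.get(2)
     *
     * Without token-range partitioning there is a single index, 0, in each list.
     */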
    /**
     * Get the correct compaction strategy for the given sstable. If the first token starts within a disk boundary, we
     * will add it to that compaction strategy.
     *
     * In the case we are upgrading, the first compaction strategy will get most files - we do not care about which disk
     * the sstable is on currently (unless we don't know the local tokens yet). Once we start compacting we will write out
     * sstables in the correct locations and give them to the correct compaction strategy instance.
     *
     * @param sstable the sstable to find the strategy index for
     * @return the index of the compaction strategy (and data directory) that should own the sstable
     */
    @VisibleForTesting
    protected int compactionStrategyIndexFor(SSTableReader sstable)
    {
        // should not call maybeReload because it may be called from within lock
        readLock.lock();
        try
        {
            // We only have a single compaction strategy when sstables are not
            // partitioned by token range
            if (!partitionSSTablesByTokenRange)
                return 0;
            return currentBoundaries.getDiskIndex(sstable);
        }
        finally
        {
            readLock.unlock();
        }
    }

    public void shutdown()
    {
        writeLock.lock();
        try
        {
            isActive = false;
            repaired.forEach(AbstractCompactionStrategy::shutdown);
            unrepaired.forEach(AbstractCompactionStrategy::shutdown);
            compactionLogger.disable();
        }
        finally
        {
            writeLock.unlock();
        }
    }

    public void maybeReload(CFMetaData metadata)
    {
        // compare the old schema configuration to the new one, ignore any locally set changes.
        if (metadata.params.compaction.equals(schemaCompactionParams))
            return;

        writeLock.lock();
        try
        {
            // compare the old schema configuration to the new one, ignore any locally set changes.
            if (metadata.params.compaction.equals(schemaCompactionParams))
                return;
            reload(metadata.params.compaction);
        }
        finally
        {
            writeLock.unlock();
        }
    }
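
    /*
     * Illustrative note (not part of the original source): maybeReload uses a cheap unsynchronized check
     * followed by the same check again under the write lock, so concurrent ALTERs only trigger one reload.
     * The shape of the idiom, with hypothetical helper names standing in for the equals() comparison above:
     *
     *   if (paramsUnchanged()) return;          // fast path, no lock taken
     *   writeLock.lock();
     *   try
     *   {
     *       if (paramsUnchanged()) return;      // another thread may have reloaded in the meantime
     *       reload(newParams);
     *   }
     *   finally
     *   {
     *       writeLock.unlock();
     *   }
     */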
    /**
     * Checks if the disk boundaries changed and reloads the compaction strategies
     * to reflect the most up-to-date disk boundaries.
     *
     * This is typically called before acquiring the {@link this#readLock} to ensure the most up-to-date
     * disk locations and boundaries are used.
     *
     * This should *never* be called by a thread holding the {@link this#readLock}, since it
     * will potentially acquire the {@link this#writeLock} to update the compaction strategies,
     * which can cause a deadlock.
     */
    //TODO improve this to reload after receiving a notification rather than trying to reload on every operation
    @VisibleForTesting
    protected boolean maybeReloadDiskBoundaries()
    {
        if (!currentBoundaries.isOutOfDate())
            return false;

        writeLock.lock();
        try
        {
            if (!currentBoundaries.isOutOfDate())
                return false;
            reload(params);
            return true;
        }
        finally
        {
            writeLock.unlock();
        }
    }
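
    /*
     * Illustrative sketch (not part of the original source): the notification handlers below use the boolean
     * result to skip redundant work. If the boundaries were stale, reload(params) has already re-bucketed
     * every sstable into the right strategy instance, so the individual notification can be dropped:
     *
     *   private void handleSomeNotification(Iterable<SSTableReader> sstables)   // hypothetical handler
     *   {
     *       if (maybeReloadDiskBoundaries())
     *           return;                      // strategies were rebuilt from scratch, nothing left to do
     *       readLock.lock();
     *       try
     *       {
     *           // apply the incremental change to the per-disk strategy instances
     *       }
     *       finally
     *       {
     *           readLock.unlock();
     *       }
     *   }
     */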
    /**
     * Reload the compaction strategies
     *
     * Called after changing configuration and at startup.
     * @param newCompactionParams the compaction parameters to use for the new strategy instances
     */
    private void reload(CompactionParams newCompactionParams)
    {
        boolean enabledWithJMX = enabled && !shouldBeEnabled();
        boolean disabledWithJMX = !enabled && shouldBeEnabled();

        if (currentBoundaries != null)
        {
            if (!newCompactionParams.equals(schemaCompactionParams))
                logger.debug("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
            else if (currentBoundaries.isOutOfDate())
                logger.debug("Recreating compaction strategy - disk boundaries are out of date for {}.{}.", cfs.keyspace.getName(), cfs.getTableName());
        }

        if (currentBoundaries == null || currentBoundaries.isOutOfDate())
            currentBoundaries = boundariesSupplier.get();

        setStrategy(newCompactionParams);
        schemaCompactionParams = cfs.metadata.params.compaction;

        if (disabledWithJMX || !shouldBeEnabled() && !enabledWithJMX)
            disable();
        else
            enable();
        startup();
    }

    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
    {
        cfs.getTracker().replaceFlushed(memtable, sstables);
        if (sstables != null && !sstables.isEmpty())
            CompactionManager.instance.submitBackground(cfs);
    }

    public int getUnleveledSSTables()
    {
        maybeReloadDiskBoundaries();
        readLock.lock();
        try
        {
            if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
            {
                int count = 0;
                for (AbstractCompactionStrategy strategy : repaired)
                    count += ((LeveledCompactionStrategy) strategy).getLevelSize(0);
                for (AbstractCompactionStrategy strategy : unrepaired)
                    count += ((LeveledCompactionStrategy) strategy).getLevelSize(0);
                return count;
            }
        }
        finally
        {
            readLock.unlock();
        }
        return 0;
    }

    public int getLevelFanoutSize()
    {
        return fanout;
    }

    public int[] getSSTableCountPerLevel()
    {
        maybeReloadDiskBoundaries();
        readLock.lock();
        try
        {
            if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
            {
                int[] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
                for (AbstractCompactionStrategy strategy : repaired)
                {
                    int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
                    res = sumArrays(res, repairedCountPerLevel);
                }
                for (AbstractCompactionStrategy strategy : unrepaired)
                {
                    int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
                    res = sumArrays(res, unrepairedCountPerLevel);
                }
                return res;
            }
        }
        finally
        {
            readLock.unlock();
        }
        return null;
    }

    private static int[] sumArrays(int[] a, int[] b)
    {
        int[] res = new int[Math.max(a.length, b.length)];
        for (int i = 0; i < res.length; i++)
        {
            if (i < a.length && i < b.length)
                res[i] = a[i] + b[i];
            else if (i < a.length)
                res[i] = a[i];
            else
                res[i] = b[i];
        }
        return res;
    }

    public boolean shouldDefragment()
    {
        return shouldDefragment;
    }

    public Directories getDirectories()
    {
        maybeReloadDiskBoundaries();
        readLock.lock();
        try
        {
            assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
            return repaired.get(0).getDirectories();
        }
        finally
        {
            readLock.unlock();
        }
    }

    private void handleFlushNotification(Iterable<SSTableReader> added)
    {
        // If reloaded, SSTables will be placed in their correct locations
        // so there is no need to process notification
        if (maybeReloadDiskBoundaries())
            return;

        readLock.lock();
        try
        {
            for (SSTableReader sstable : added)
                compactionStrategyFor(sstable).addSSTable(sstable);
        }
        finally
        {
            readLock.unlock();
        }
    }

    private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
    {
        // If reloaded, SSTables will be placed in their correct locations
        // so there is no need to process notification
        if (maybeReloadDiskBoundaries())
            return;

        readLock.lock();
        try
        {
            // a bit of gymnastics to be able to replace sstables in compaction strategies
            // we use this to know that a compaction finished and where to start the next compaction in LCS
            int locationSize = partitionSSTablesByTokenRange ? currentBoundaries.directories.size() : 1;

            List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
            List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
            List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
            List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);

            for (int i = 0; i < locationSize; i++)
            {
                repairedRemoved.add(new HashSet<>());
                repairedAdded.add(new HashSet<>());
                unrepairedRemoved.add(new HashSet<>());
                unrepairedAdded.add(new HashSet<>());
            }

            for (SSTableReader sstable : removed)
            {
                int i = compactionStrategyIndexFor(sstable);
                if (sstable.isRepaired())
                    repairedRemoved.get(i).add(sstable);
                else
                    unrepairedRemoved.get(i).add(sstable);
            }

            for (SSTableReader sstable : added)
            {
                int i = compactionStrategyIndexFor(sstable);
                if (sstable.isRepaired())
                    repairedAdded.get(i).add(sstable);
                else
                    unrepairedAdded.get(i).add(sstable);
            }

            for (int i = 0; i < locationSize; i++)
            {
                if (!repairedRemoved.get(i).isEmpty())
                    repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i));
                else
                    repaired.get(i).addSSTables(repairedAdded.get(i));

                if (!unrepairedRemoved.get(i).isEmpty())
                    unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i));
                else
                    unrepaired.get(i).addSSTables(unrepairedAdded.get(i));
            }
        }
        finally
        {
            readLock.unlock();
        }
    }

    private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables)
    {
        // If reloaded, SSTables will be placed in their correct locations
        // so there is no need to process notification
        if (maybeReloadDiskBoundaries())
            return;

        // we need a write lock here since we move sstables from one strategy instance to another
        readLock.lock();
        try
        {
            for (SSTableReader sstable : sstables)
            {
                int index = compactionStrategyIndexFor(sstable);
                if (sstable.isRepaired())
                {
                    unrepaired.get(index).removeSSTable(sstable);
                    repaired.get(index).addSSTable(sstable);
                }
                else
                {
                    repaired.get(index).removeSSTable(sstable);
                    unrepaired.get(index).addSSTable(sstable);
                }
            }
        }
        finally
        {
            readLock.unlock();
        }
    }

    private void handleDeletingNotification(SSTableReader deleted)
    {
        // If reloaded, SSTables will be placed in their correct locations
        // so there is no need to process notification
        if (maybeReloadDiskBoundaries())
            return;

        readLock.lock();
        try
        {
            compactionStrategyFor(deleted).removeSSTable(deleted);
        }
        finally
        {
            readLock.unlock();
        }
    }

    public void handleNotification(INotification notification, Object sender)
    {
        if (notification instanceof SSTableAddedNotification)
        {
            handleFlushNotification(((SSTableAddedNotification) notification).added);
        }
        else if (notification instanceof SSTableListChangedNotification)
        {
            SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
            handleListChangedNotification(listChangedNotification.added, listChangedNotification.removed);
        }
        else if (notification instanceof SSTableRepairStatusChanged)
        {
            handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) notification).sstables);
        }
        else if (notification instanceof SSTableDeletingNotification)
        {
            handleDeletingNotification(((SSTableDeletingNotification) notification).deleting);
        }
    }

    public void enable()
    {
        writeLock.lock();
        try
        {
            if (repaired != null)
                repaired.forEach(AbstractCompactionStrategy::enable);
            if (unrepaired != null)
                unrepaired.forEach(AbstractCompactionStrategy::enable);
            // enable this last to make sure the strategies are ready to get calls.
            enabled = true;
        }
        finally
        {
            writeLock.unlock();
        }
    }

    public void disable()
    {
        writeLock.lock();
        try
        {
            // disable this first to avoid asking disabled strategies for compaction tasks
            enabled = false;
            if (repaired != null)
                repaired.forEach(AbstractCompactionStrategy::disable);
            if (unrepaired != null)
                unrepaired.forEach(AbstractCompactionStrategy::disable);
        }
        finally
        {
            writeLock.unlock();
        }
    }
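
    /*
     * Illustrative sketch (not part of the original source): how notifications reach the handlers above. The
     * manager subscribes to the ColumnFamilyStore's tracker in its constructor, and the tracker later calls
     * handleNotification, which dispatches on the concrete notification type:
     *
     *   tracker (flush / compaction / repair status change / deletion)
     *       -> handleNotification(notification, sender)
     *           -> handleFlushNotification(added)                    // SSTableAddedNotification
     *           -> handleListChangedNotification(added, removed)     // SSTableListChangedNotification
     *           -> handleRepairStatusChangedNotification(sstables)   // SSTableRepairStatusChanged
     *           -> handleDeletingNotification(deleting)              // SSTableDeletingNotification
     */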
    /**
     * Create ISSTableScanners from the given sstables
     *
     * Delegates the call to the compaction strategies to allow LCS to create a scanner
     * @param sstables the sstables to open scanners for
     * @param ranges the token ranges to scan, or null to scan the sstables fully
     * @return a list of scanners over the given sstables
     */
@SuppressWarnings("resource") public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) { maybeReloadDiskBoundaries(); readLock.lock(); try { assert repaired.size() == unrepaired.size(); List<Set<SSTableReader>> repairedSSTables = new ArrayList<>(); List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>(); for (int i = 0; i < repaired.size(); i++) { repairedSSTables.add(new HashSet<>()); unrepairedSSTables.add(new HashSet<>()); } for (SSTableReader sstable : sstables) { if (sstable.isRepaired()) repairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable); else unrepairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable); } List<ISSTableScanner> scanners = new ArrayList<>(sstables.size()); for (int i = 0; i < repairedSSTables.size(); i++) { if (!repairedSSTables.get(i).isEmpty()) scanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), ranges).scanners); } for (int i = 0; i < unrepairedSSTables.size(); i++) { if (!unrepairedSSTables.get(i).isEmpty()) scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), ranges).scanners); } return new AbstractCompactionStrategy.ScannerList(scanners); } finally { readLock.unlock(); } } public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables) { return getScanners(sstables, null); } public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup) { maybeReloadDiskBoundaries(); readLock.lock(); try { Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s))); Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>(); for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet()) anticompactionGroups.addAll(unrepaired.get(group.getKey()).groupSSTablesForAntiCompaction(group.getValue())); return anticompactionGroups; } finally { readLock.unlock(); } } public long getMaxSSTableBytes() { readLock.lock(); try { return unrepaired.get(0).getMaxSSTableBytes(); } finally { readLock.unlock(); } } public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes) { maybeReloadDiskBoundaries(); readLock.lock(); try { validateForCompaction(txn.originals()); return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes); } finally { readLock.unlock(); } } private void validateForCompaction(Iterable<SSTableReader> input) { readLock.lock(); try { SSTableReader firstSSTable = Iterables.getFirst(input, null); assert firstSSTable != null; boolean repaired = firstSSTable.isRepaired(); int firstIndex = compactionStrategyIndexFor(firstSSTable); for (SSTableReader sstable : input) { if (sstable.isRepaired() != repaired) throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction"); if (firstIndex != compactionStrategyIndexFor(sstable)) throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction"); } } finally { readLock.unlock(); } } public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput) { maybeReloadDiskBoundaries(); // runWithCompactionsDisabled cancels active compactions and disables them, then we are able // to make the repaired/unrepaired strategies mark their own sstables as compacting. 
Once the // sstables are marked the compactions are re-enabled return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>() { @Override public Collection<AbstractCompactionTask> call() { List<AbstractCompactionTask> tasks = new ArrayList<>(); readLock.lock(); try { for (AbstractCompactionStrategy strategy : repaired) { Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput); if (task != null) tasks.addAll(task); } for (AbstractCompactionStrategy strategy : unrepaired) { Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput); if (task != null) tasks.addAll(task); } } finally { readLock.unlock(); } if (tasks.isEmpty()) return null; return tasks; } }, false, false); }
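
    /*
     * Illustrative sketch (not part of the original source): a caller iterating the scanners returned by
     * getScanners and releasing them when done. "csm", "sstablesToScan" and "tokenRanges" are hypothetical
     * names; the ScannerList exposes its scanners via the "scanners" field, as used above.
     *
     *   AbstractCompactionStrategy.ScannerList scannerList = csm.getScanners(sstablesToScan, tokenRanges);
     *   try
     *   {
     *       for (ISSTableScanner scanner : scannerList.scanners)
     *       {
     *           // consume partitions from the scanner
     *       }
     *   }
     *   finally
     *   {
     *       for (ISSTableScanner scanner : scannerList.scanners)
     *           scanner.close();
     *   }
     */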
    /**
     * Return a list of compaction tasks corresponding to the sstables requested. Split the sstables according
     * to whether they are repaired or not, and by disk location. Return a task per disk location and repair status
     * group.
     *
     * @param sstables the sstables to compact
     * @param gcBefore gc grace period, throw away tombstones older than this
     * @return a list of compaction tasks corresponding to the sstables requested
     */
    public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
    {
        return getUserDefinedTasks(sstables, gcBefore, false);
    }

    public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore, boolean validateForCompaction)
    {
        maybeReloadDiskBoundaries();
        List<AbstractCompactionTask> ret = new ArrayList<>();
        readLock.lock();
        try
        {
            if (validateForCompaction)
                validateForCompaction(sstables);

            Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
                                                                         .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
                                                                         .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
            Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
                                                                           .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
                                                                           .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));

            for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
                ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
            for (Map.Entry<Integer, List<SSTableReader>> group : unrepairedSSTables.entrySet())
                ret.add(unrepaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
            return ret;
        }
        finally
        {
            readLock.unlock();
        }
    }
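
    /*
     * Illustrative sketch (not part of the original source): because the sstables are grouped by repair status
     * and disk index, a request mixing repaired and unrepaired sstables spread over two disks yields up to
     * four tasks. "csm" and "requested" are hypothetical names for the manager and the operator-selected
     * sstables:
     *
     *   for (AbstractCompactionTask task : csm.getUserDefinedTasks(requested, gcBefore))
     *   {
     *       // each task covers a single (repair status, disk) group and can be run independently
     *   }
     */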
    /**
     * @deprecated use {@link #getUserDefinedTasks(Collection, int)} instead.
     */
    @Deprecated
    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
    {
        List<AbstractCompactionTask> tasks = getUserDefinedTasks(sstables, gcBefore, true);
        assert tasks.size() == 1;
        return tasks.get(0);
    }

    public int getEstimatedRemainingTasks()
    {
        maybeReloadDiskBoundaries();
        int tasks = 0;
        readLock.lock();
        try
        {
            for (AbstractCompactionStrategy strategy : repaired)
                tasks += strategy.getEstimatedRemainingTasks();
            for (AbstractCompactionStrategy strategy : unrepaired)
                tasks += strategy.getEstimatedRemainingTasks();
        }
        finally
        {
            readLock.unlock();
        }
        return tasks;
    }

    public boolean shouldBeEnabled()
    {
        return params.isEnabled();
    }

    public String getName()
    {
        maybeReloadDiskBoundaries();
        readLock.lock();
        try
        {
            return unrepaired.get(0).getName();
        }
        finally
        {
            readLock.unlock();
        }
    }

    public List<List<AbstractCompactionStrategy>> getStrategies()
    {
        maybeReloadDiskBoundaries();
        readLock.lock();
        try
        {
            return Arrays.asList(repaired, unrepaired);
        }
        finally
        {
            readLock.unlock();
        }
    }

    public void setNewLocalCompactionStrategy(CompactionParams params)
    {
        logger.info("Switching local compaction strategy from {} to {}", this.params, params);
        writeLock.lock();
        try
        {
            setStrategy(params);
            if (shouldBeEnabled())
                enable();
            else
                disable();
            startup();
        }
        finally
        {
            writeLock.unlock();
        }
    }

    private void setStrategy(CompactionParams params)
    {
        repaired.forEach(AbstractCompactionStrategy::shutdown);
        unrepaired.forEach(AbstractCompactionStrategy::shutdown);
        repaired.clear();
        unrepaired.clear();

        if (partitionSSTablesByTokenRange)
        {
            for (int i = 0; i < currentBoundaries.directories.size(); i++)
            {
                repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
                unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
            }
        }
        else
        {
            repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
            unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
        }
        this.params = params;
    }

    public CompactionParams getCompactionParams()
    {
        return params;
    }

    public boolean onlyPurgeRepairedTombstones()
    {
        return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
    }

    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
                                                       long keyCount,
                                                       long repairedAt,
                                                       MetadataCollector collector,
                                                       SerializationHeader header,
                                                       Collection<Index> indexes,
                                                       LifecycleNewTracker lifecycleNewTracker)
    {
        maybeReloadDiskBoundaries();
        readLock.lock();
        try
        {
            if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
            {
                return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, lifecycleNewTracker);
            }
            else
            {
                return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, lifecycleNewTracker);
            }
        }
        finally
        {
            readLock.unlock();
        }
    }

    public boolean isRepaired(AbstractCompactionStrategy strategy)
    {
        readLock.lock();
        try
        {
            return repaired.contains(strategy);
        }
        finally
        {
            readLock.unlock();
        }
    }

    public List<String> getStrategyFolders(AbstractCompactionStrategy strategy)
    {
        readLock.lock();
        try
        {
            List<Directories.DataDirectory> locations = currentBoundaries.directories;
            if (partitionSSTablesByTokenRange)
            {
                int unrepairedIndex = unrepaired.indexOf(strategy);
                if (unrepairedIndex > 0)
                {
                    return Collections.singletonList(locations.get(unrepairedIndex).location.getAbsolutePath());
                }
                int repairedIndex = repaired.indexOf(strategy);
                if (repairedIndex > 0)
                {
                    return Collections.singletonList(locations.get(repairedIndex).location.getAbsolutePath());
                }
            }
            List<String> folders = new ArrayList<>(locations.size());
            for (Directories.DataDirectory location : locations)
            {
                folders.add(location.location.getAbsolutePath());
            }
            return folders;
        }
        finally
        {
            readLock.unlock();
        }
    }

    public boolean supportsEarlyOpen()
    {
        return supportsEarlyOpen;
    }
}