/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.lifecycle;

import java.io.File;
import java.nio.file.Path;
import java.util.*;
import java.util.function.BiFunction;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReader.UniqueIdentifier;
import org.apache.cassandra.utils.concurrent.Transactional;

import static com.google.common.base.Functions.compose;
import static com.google.common.base.Predicates.*;
import static com.google.common.collect.ImmutableSet.copyOf;
import static com.google.common.collect.Iterables.*;
import static java.util.Collections.singleton;
import static org.apache.cassandra.db.lifecycle.Helpers.*;
import static org.apache.cassandra.db.lifecycle.View.updateCompacting;
import static org.apache.cassandra.db.lifecycle.View.updateLiveSet;
import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.concurrent.Refs.release;
import static org.apache.cassandra.utils.concurrent.Refs.selfRefs;

/**
 * IMPORTANT: When this object is involved in a transactional graph, for correct behaviour its commit MUST occur
 * before any others, since it may legitimately fail. This is consistent with the Transactional API, which permits
 * one failing action to occur at the beginning of the commit phase, but also *requires* that the prepareToCommit()
 * phase only take actions that can be rolled back.
 */
public class LifecycleTransaction extends Transactional.AbstractTransactional implements ILifecycleTransaction
{
    private static final Logger logger = LoggerFactory.getLogger(LifecycleTransaction.class);
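
    /*
     * Illustrative usage sketch (the tryModify() acquisition below lives on Tracker, and the reader
     * variables are schematic, not taken from this file). A typical online lifecycle stages changes,
     * publishes them with checkpoint(), then runs the two-phase commit inherited from
     * Transactional.AbstractTransactional:
     *
     *   // tryModify() may return null if the readers are already marked compacting
     *   try (LifecycleTransaction txn = cfs.getTracker().tryModify(toCompact, OperationType.COMPACTION))
     *   {
     *       txn.update(newReader, false);  // introduce the replacement sstable
     *       txn.obsolete(oldReader);       // schedule an original for removal at the next checkpoint
     *       txn.checkpoint();              // atomically publish the staged changes to the Tracker
     *       txn.prepareToCommit();
     *       txn.commit();                  // point of no return
     *   }                                  // close() rolls the transaction back if commit() was never reached
     */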

    /**
     * A class that represents accumulated modifications to the Tracker.
     * Has two instances: one containing modifications that are "staged" (i.e. invisible),
     * and one containing those "logged" that have been made visible through a call to checkpoint()
     */
    private static class State
    {
        // readers that are either brand new, update a previous new reader, or update one of the original readers
        final Set<SSTableReader> update = new HashSet<>();
        // disjoint from update, represents a subset of originals that is no longer needed
        final Set<SSTableReader> obsolete = new HashSet<>();

        void log(State staged)
        {
            update.removeAll(staged.obsolete);
            update.removeAll(staged.update);
            update.addAll(staged.update);
            obsolete.addAll(staged.obsolete);
        }

        boolean contains(SSTableReader reader)
        {
            return update.contains(reader) || obsolete.contains(reader);
        }

        boolean isEmpty()
        {
            return update.isEmpty() && obsolete.isEmpty();
        }

        void clear()
        {
            update.clear();
            obsolete.clear();
        }

        @Override
        public String toString()
        {
            return String.format("[obsolete: %s, update: %s]", obsolete, update);
        }
    }

    public final Tracker tracker;

    // The transaction logs keep track of new and old sstable files
    private final LogTransaction log;

    // the original readers this transaction was opened over, and that it guards
    // (no other transactions may operate over these readers concurrently)
    private final Set<SSTableReader> originals = new HashSet<>();
    // the set of readers we've marked as compacting (only updated on creation and in checkpoint())
    private final Set<SSTableReader> marked = new HashSet<>();
    // the identity set of readers we've ever encountered; used to ensure we don't accidentally revisit the
    // same version of a reader. potentially a dangerous property if there are reference counting bugs
    // as they won't be caught until the transaction's lifespan is over.
    private final Set<UniqueIdentifier> identities = Collections.newSetFromMap(new IdentityHashMap<>());

    // changes that have been made visible
    private final State logged = new State();
    // changes that are pending
    private final State staged = new State();

    // the tidier and their readers, to be used for marking readers obsoleted during a commit
    private List<LogTransaction.Obsoletion> obsoletions;

    /**
     * construct a Transaction for use in an offline operation
     */
    public static LifecycleTransaction offline(OperationType operationType, SSTableReader reader)
    {
        return offline(operationType, singleton(reader));
    }

    /**
     * construct a Transaction for use in an offline operation
     */
    public static LifecycleTransaction offline(OperationType operationType, Iterable<SSTableReader> readers)
    {
        // if offline, for simplicity we just use a dummy tracker
        Tracker dummy = new Tracker(null, false);
        dummy.addInitialSSTables(readers);
        dummy.apply(updateCompacting(emptySet(), readers));
        return new LifecycleTransaction(dummy, operationType, readers);
    }
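
    // Illustrative sketch (the reader acquisition is schematic): standalone tools such as scrub can wrap
    // readers they loaded themselves in an offline transaction, since no live Tracker exists:
    //
    //   try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable))
    //   {
    //       // ... rewrite the sstable, registering outputs via txn ...
    //       txn.finish(); // prepareToCommit() + commit(); close() aborts instead if we never get here
    //   }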

    /**
     * construct an empty Transaction with no existing readers
     */
    @SuppressWarnings("resource") // log closed during postCleanup
    public static LifecycleTransaction offline(OperationType operationType)
    {
        Tracker dummy = new Tracker(null, false);
        return new LifecycleTransaction(dummy, new LogTransaction(operationType, dummy), Collections.emptyList());
    }

    @SuppressWarnings("resource") // log closed during postCleanup
    LifecycleTransaction(Tracker tracker, OperationType operationType, Iterable<SSTableReader> readers)
    {
        this(tracker, new LogTransaction(operationType, tracker), readers);
    }

    LifecycleTransaction(Tracker tracker, LogTransaction log, Iterable<SSTableReader> readers)
    {
        this.tracker = tracker;
        this.log = log;
        for (SSTableReader reader : readers)
        {
            originals.add(reader);
            marked.add(reader);
            identities.add(reader.instanceId);
        }
    }

    public LogTransaction log()
    {
        return log;
    }

    @Override // LifecycleNewTracker
    public OperationType opType()
    {
        return log.type();
    }

    public UUID opId()
    {
        return log.id();
    }

    public void doPrepare()
    {
        // note for future: in anticompaction two different operations use the same Transaction, and both prepareToCommit()
        // separately: the second prepareToCommit is ignored as a "redundant" transition. since it is only a checkpoint
        // (and these happen anyway) this is fine but if more logic gets inserted here than is performed in a checkpoint,
        // it may break this use case, and care is needed
        checkpoint();

        // prepare for compaction obsolete readers as long as they were part of the original set
        // since those that are not original are early readers that share the same desc with the finals
        maybeFail(prepareForObsoletion(filterIn(logged.obsolete, originals), log, obsoletions = new ArrayList<>(), null));

        log.prepareToCommit();
    }

    /**
     * point of no return: commit all changes, but leave all readers marked as compacting
     */
    public Throwable doCommit(Throwable accumulate)
    {
        assert staged.isEmpty() : "must be no actions introduced between prepareToCommit and a commit";

        if (logger.isTraceEnabled())
            logger.trace("Committing transaction over {} staged: {}, logged: {}", originals, staged, logged);

        // accumulate must be null if we have been used correctly, so fail immediately if it is not
        maybeFail(accumulate);

        // transaction log commit failure means we must abort; safe commit is not possible
        maybeFail(log.commit(null));

        // this is now the point of no return; we cannot safely rollback, so we ignore exceptions until we're done
        // we restore state by obsoleting our obsolete files, releasing our references to them, and updating our size
        // and notification status for the obsolete and new files
        accumulate = markObsolete(obsoletions, accumulate);
        accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
        accumulate = release(selfRefs(logged.obsolete), accumulate);
        accumulate = tracker.notifySSTablesChanged(originals, logged.update, log.type(), accumulate);

        return accumulate;
    }

    /**
     * undo all of the changes made by this transaction, resetting the state to its original form
     */
    public Throwable doAbort(Throwable accumulate)
    {
        if (logger.isTraceEnabled())
            logger.trace("Aborting transaction over {} staged: {}, logged: {}", originals, staged, logged);

        accumulate = abortObsoletion(obsoletions, accumulate);

        if (logged.isEmpty() && staged.isEmpty())
            return log.abort(accumulate);

        // mark obsolete all readers that are not versions of those present in the original set
        Iterable<SSTableReader> obsolete = filterOut(concatUniq(staged.update, logged.update), originals);
        logger.trace("Obsoleting {}", obsolete);

        accumulate = prepareForObsoletion(obsolete, log, obsoletions = new ArrayList<>(), accumulate);
        // it's safe to abort even if committed, see maybeFail in doCommit() above, in this case it will just report
        // a failure to abort, which is useful information to have for debug
        accumulate = log.abort(accumulate);
        accumulate = markObsolete(obsoletions, accumulate);

        // replace all updated readers with a version restored to its original state
        List<SSTableReader> restored = restoreUpdatedOriginals();
        List<SSTableReader> invalid = Lists.newArrayList(Iterables.concat(logged.update, logged.obsolete));
        accumulate = tracker.apply(updateLiveSet(logged.update, restored), accumulate);
        accumulate = tracker.notifySSTablesChanged(invalid, restored, OperationType.COMPACTION, accumulate);
        // setReplaced immediately preceding versions that have not been obsoleted
        accumulate = setReplaced(logged.update, accumulate);

        // we have replaced all of logged.update and never made visible staged.update,
        // and the files we have logged as obsolete we clone fresh versions of, so they are no longer needed either
        // any _staged_ obsoletes should either be in staged.update already, and dealt with there,
        // or is still in its original form (so left as is); in either case no extra action is needed
        accumulate = release(selfRefs(concat(staged.update, logged.update, logged.obsolete)), accumulate);

        logged.clear();
        staged.clear();
        return accumulate;
    }

    @Override
    protected Throwable doPostCleanup(Throwable accumulate)
    {
        log.close();
        return unmarkCompacting(marked, accumulate);
    }

    public boolean isOffline()
    {
        return tracker.isDummy();
    }

    /**
     * call when a consistent batch of changes is ready to be made atomically visible
     * these will be exposed in the Tracker atomically, or an exception will be thrown; in this case
     * the transaction should be rolled back
     */
    public void checkpoint()
    {
        maybeFail(checkpoint(null));
    }

    private Throwable checkpoint(Throwable accumulate)
    {
        if (logger.isTraceEnabled())
            logger.trace("Checkpointing staged {}", staged);

        if (staged.isEmpty())
            return accumulate;

        Set<SSTableReader> toUpdate = toUpdate();
        Set<SSTableReader> fresh = copyOf(fresh());

        // check the current versions of the readers we're replacing haven't somehow been replaced by someone else
        checkNotReplaced(filterIn(toUpdate, staged.update));

        // ensure any new readers are in the compacting set, since we aren't done with them yet
        // and don't want anyone else messing with them
        // apply atomically along with updating the live set of readers
        tracker.apply(compose(updateCompacting(emptySet(), fresh),
                              updateLiveSet(toUpdate, staged.update)));

        // log the staged changes and our newly marked readers
        marked.addAll(fresh);
        logged.log(staged);

        // setup our tracker, and mark our prior versions replaced, also releasing our references to them
        // we do not replace/release obsoleted readers, since we may need to restore them on rollback
        accumulate = setReplaced(filterOut(toUpdate, staged.obsolete), accumulate);
        accumulate = release(selfRefs(filterOut(toUpdate, staged.obsolete)), accumulate);

        staged.clear();
        return accumulate;
    }
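
    // Illustrative sketch of the incremental replacement pattern checkpoint() enables (schematic reader
    // variables; in-tree, SSTableRewriter drives this sequence): an early-opened version of a new reader
    // can be published mid-operation and later superseded by its finished version, since early readers
    // share a descriptor with the finals, each checkpoint() swaps versions atomically while everything
    // stays marked compacting.
    //
    //   txn.update(earlyOpenedReader, false); // a preliminary version of the new sstable
    //   txn.checkpoint();                     // earlyOpenedReader becomes visible, stays compacting
    //   txn.update(finalReader, false);       // the finished version of the same sstable
    //   txn.obsolete(originalReader);
    //   txn.checkpoint();                     // finalReader atomically replaces earlyOpenedReader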

    /**
     * update a reader: if !original, this is a reader that is being introduced by this transaction;
     * otherwise it must be in the originals() set, i.e. a reader guarded by this transaction
     */
    public void update(SSTableReader reader, boolean original)
    {
        assert !staged.update.contains(reader) : "each reader may only be updated once per checkpoint: " + reader;
        assert !identities.contains(reader.instanceId) : "each reader instance may only be provided as an update once: " + reader;

        // check it isn't obsolete, and that it matches the original flag
        assert !(logged.obsolete.contains(reader) || staged.obsolete.contains(reader)) : "may not update a reader that has been obsoleted";
        assert original == originals.contains(reader) : String.format("the 'original' indicator was incorrect (%s provided): %s", original, reader);

        staged.update.add(reader);
        identities.add(reader.instanceId);
        if (!isOffline())
            reader.setupOnline();
    }

    public void update(Collection<SSTableReader> readers, boolean original)
    {
        for (SSTableReader reader : readers)
            update(reader, original);
    }

    /**
     * mark this reader for obsoletion: on checkpoint() the reader will be removed from the live set
     */
    public void obsolete(SSTableReader reader)
    {
        logger.trace("Staging for obsolescence {}", reader);

        // check this is: a reader guarded by the transaction, an instance we have already worked with
        // and that we haven't already obsoleted it, nor do we have other changes staged for it
        assert identities.contains(reader.instanceId) : "only reader instances that have previously been provided may be obsoleted: " + reader;
        assert originals.contains(reader) : "only readers in the 'original' set may be obsoleted: " + reader + " vs " + originals;
        assert !(logged.obsolete.contains(reader) || staged.obsolete.contains(reader)) : "may not obsolete a reader that has already been obsoleted: " + reader;
        assert !staged.update.contains(reader) : "may not obsolete a reader that has a staged update (must checkpoint first): " + reader;
        assert current(reader) == reader : "may only obsolete the latest version of the reader: " + reader;

        staged.obsolete.add(reader);
    }

    /**
     * obsolete every file in the original transaction
     */
    public void obsoleteOriginals()
    {
        logger.trace("Staging for obsolescence {}", originals);
        // if we're obsoleting, we should have no staged updates for the original files
        assert Iterables.isEmpty(filterIn(staged.update, originals)) : staged.update;

        // stage obsoletes for any currently visible versions of any original readers
        Iterables.addAll(staged.obsolete, filterIn(current(), originals));
    }

    /**
     * return the readers we're replacing in checkpoint(), i.e. the currently visible version of those in staged
     */
    private Set<SSTableReader> toUpdate()
    {
        return copyOf(filterIn(current(), staged.obsolete, staged.update));
    }

    /**
     * new readers that haven't appeared previously (either in the original set or the logged updates)
     */
    private Iterable<SSTableReader> fresh()
    {
        return filterOut(staged.update, originals, logged.update);
    }

    /**
     * returns the currently visible readers managed by this transaction
     */
    public Iterable<SSTableReader> current()
    {
        // i.e., those that are updates that have been logged (made visible),
        // and any original readers that have neither been obsoleted nor updated
        return concat(logged.update, filterOut(originals, logged.update, logged.obsolete));
    }

    /**
     * update the current replacement of any original reader back to its original start
     */
    private List<SSTableReader> restoreUpdatedOriginals()
    {
        Iterable<SSTableReader> torestore = filterIn(originals, logged.update, logged.obsolete);
        return ImmutableList.copyOf(transform(torestore, (reader) -> current(reader).cloneWithRestoredStart(reader.first)));
    }

    /**
     * the set of readers guarded by this transaction _in their original instance/state_
     * call current(SSTableReader) on any reader in this set to get the latest instance
     */
    public Set<SSTableReader> originals()
    {
        return Collections.unmodifiableSet(originals);
    }

    /**
     * indicates if the reader has been marked for obsoletion
     */
    public boolean isObsolete(SSTableReader reader)
    {
        return logged.obsolete.contains(reader) || staged.obsolete.contains(reader);
    }

    /**
     * return the current version of the provided reader, whether or not it is visible or staged;
     * i.e. returns the first version present by testing staged, logged and originals in order.
     */
    public SSTableReader current(SSTableReader reader)
    {
        Set<SSTableReader> container;
        if (staged.contains(reader))
            container = staged.update.contains(reader) ? staged.update : staged.obsolete;
        else if (logged.contains(reader))
            container = logged.update.contains(reader) ? logged.update : logged.obsolete;
        else if (originals.contains(reader))
            container = originals;
        else
            throw new AssertionError();
        return select(reader, container);
    }

    /**
     * remove the reader from the set we're modifying
     */
    public void cancel(SSTableReader cancel)
    {
        logger.trace("Cancelling {} from transaction", cancel);
        assert originals.contains(cancel) : "may only cancel a reader in the 'original' set: " + cancel + " vs " + originals;
        assert !(staged.contains(cancel) || logged.contains(cancel)) : "may only cancel a reader that has not been updated or obsoleted in this transaction: " + cancel;
        originals.remove(cancel);
        marked.remove(cancel);
        identities.remove(cancel.instanceId);
        maybeFail(unmarkCompacting(singleton(cancel), null));
    }

    /**
     * remove the readers from the set we're modifying
     */
    public void cancel(Iterable<SSTableReader> cancels)
    {
        for (SSTableReader cancel : cancels)
            cancel(cancel);
    }

    /**
     * remove the provided readers from this Transaction, and return a new Transaction to manage them
     * only permitted to be called if the current Transaction has never been used
     */
    public LifecycleTransaction split(Collection<SSTableReader> readers)
    {
        logger.trace("Splitting {} into new transaction", readers);
        checkUnused();
        for (SSTableReader reader : readers)
            assert identities.contains(reader.instanceId) : "may only split the same reader instance the transaction was opened with: " + reader;

        for (SSTableReader reader : readers)
        {
            identities.remove(reader.instanceId);
            originals.remove(reader);
            marked.remove(reader);
        }
        return new LifecycleTransaction(tracker, log.type(), readers);
    }
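
    // Illustrative sketch (readerA is schematic): carve a reader out of a not-yet-used transaction so it
    // can proceed under its own lifecycle, e.g. to operate on disjoint subsets of readers independently:
    //
    //   LifecycleTransaction child = txn.split(Collections.singletonList(readerA));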

    /**
     * check this transaction has never been used
     */
    private void checkUnused()
    {
        assert logged.isEmpty();
        assert staged.isEmpty();
        assert identities.size() == originals.size();
        assert originals.size() == marked.size();
    }

    private Throwable unmarkCompacting(Set<SSTableReader> unmark, Throwable accumulate)
    {
        accumulate = tracker.apply(updateCompacting(unmark, emptySet()), accumulate);
        // when the CFS is invalidated, it will call unreferenceSSTables(). However, unreferenceSSTables only deals
        // with sstables that aren't currently being compacted. If there are ongoing compactions that finish or are
        // interrupted after the CFS is invalidated, those sstables need to be unreferenced as well, so we do that here.
        accumulate = tracker.dropSSTablesIfInvalid(accumulate);
        return accumulate;
    }

    // convenience method for callers that know only one sstable is involved in the transaction
    public SSTableReader onlyOne()
    {
        assert originals.size() == 1;
        return getFirst(originals, null);
    }

    // LifecycleNewTracker

    @Override
    public void trackNew(SSTable table)
    {
        log.trackNew(table);
    }

    @Override
    public void untrackNew(SSTable table)
    {
        log.untrackNew(table);
    }

    public static boolean removeUnfinishedLeftovers(ColumnFamilyStore cfs)
    {
        return LogTransaction.removeUnfinishedLeftovers(cfs.getDirectories().getCFDirectories());
    }

    public static boolean removeUnfinishedLeftovers(CFMetaData cfMetaData)
    {
        return LogTransaction.removeUnfinishedLeftovers(cfMetaData);
    }

    /**
     * Get the files in the folder specified, provided that the filter returns true.
     * A filter is given each file and its type, and decides which files should be returned
     * and which should be discarded. To classify files into their type, we read transaction
     * log files. Should we fail to read these log files after a few times, we look at onTxnErr
     * to determine what to do.
     *
     * @param folder - the folder to scan
     * @param onTxnErr - how to handle a failure to read a txn log file
     * @param filter - A function that receives each file and its type, it should return true to have the file returned
     * @return - the list of files that were scanned and for which the filter returned true
     */
    public static List<File> getFiles(Path folder, BiFunction<File, Directories.FileType, Boolean> filter, Directories.OnTxnErr onTxnErr)
    {
        return new LogAwareFileLister(folder, filter, onTxnErr).list();
    }
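
    // Illustrative call (folder is schematic): keep only files the transaction logs classify as FINAL,
    // failing fast if a txn log file cannot be read:
    //
    //   List<File> finals = LifecycleTransaction.getFiles(folder.toPath(),
    //                                                     (file, type) -> type == Directories.FileType.FINAL,
    //                                                     Directories.OnTxnErr.THROW);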

    /**
     * Retry all deletions that failed the first time around (presumably b/c the sstable was still mmap'd.)
     * Useful because there are times when we know GC has been invoked; also exposed as an mbean.
     */
    public static void rescheduleFailedDeletions()
    {
        LogTransaction.rescheduleFailedDeletions();
    }

    /**
     * Deletions run on the nonPeriodicTasks executor (both failedDeletions and global tidiers in SSTableReader),
     * so by scheduling a new empty task and waiting for it we ensure any prior deletion has completed.
     */
    public static void waitForDeletions()
    {
        LogTransaction.waitForDeletions();
    }

    // a class representing the current state of the reader within this transaction, encoding the actions both logged
    // and pending, and the reader instances that are visible now, and will be after the next checkpoint (with null
    // indicating either obsolescence, or that the reader does not occur in the transaction; which is defined
    // by the corresponding Action)
    @VisibleForTesting
    public static class ReaderState
    {
        public enum Action
        {
            UPDATED, OBSOLETED, NONE;

            public static Action get(boolean updated, boolean obsoleted)
            {
                assert !(updated && obsoleted);
                return updated ? UPDATED : obsoleted ? OBSOLETED : NONE;
            }
        }

        final Action staged;
        final Action logged;
        final SSTableReader nextVisible;
        final SSTableReader currentlyVisible;
        final boolean original;

        public ReaderState(Action logged, Action staged, SSTableReader currentlyVisible, SSTableReader nextVisible, boolean original)
        {
            this.staged = staged;
            this.logged = logged;
            this.currentlyVisible = currentlyVisible;
            this.nextVisible = nextVisible;
            this.original = original;
        }

        public boolean equals(Object that)
        {
            return that instanceof ReaderState && equals((ReaderState) that);
        }

        public boolean equals(ReaderState that)
        {
            return this.staged == that.staged && this.logged == that.logged && this.original == that.original
                && this.currentlyVisible == that.currentlyVisible && this.nextVisible == that.nextVisible;
        }

        public String toString()
        {
            return String.format("[logged=%s staged=%s original=%s]", logged, staged, original);
        }

        public static SSTableReader visible(SSTableReader reader, Predicate<SSTableReader> obsolete, Collection<SSTableReader>... selectFrom)
        {
            return obsolete.apply(reader) ? null : selectFirst(reader, selectFrom);
        }
    }

    @VisibleForTesting
    public ReaderState state(SSTableReader reader)
    {
        SSTableReader currentlyVisible = ReaderState.visible(reader, in(logged.obsolete), logged.update, originals);
        SSTableReader nextVisible = ReaderState.visible(reader, orIn(staged.obsolete, logged.obsolete), staged.update, logged.update, originals);
        return new ReaderState(ReaderState.Action.get(logged.update.contains(reader), logged.obsolete.contains(reader)),
                               ReaderState.Action.get(staged.update.contains(reader), staged.obsolete.contains(reader)),
                               currentlyVisible,
                               nextVisible,
                               originals.contains(reader));
    }

    public String toString()
    {
        return originals.toString();
    }
}