package org.apache.cassandra.db.lifecycle;
import java.io.File;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.*;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.OpOrder;
import static com.google.common.base.Predicates.and;
import static com.google.common.collect.ImmutableSet.copyOf;
import static com.google.common.collect.Iterables.filter;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.cassandra.db.lifecycle.Helpers.*;
import static org.apache.cassandra.db.lifecycle.View.permitCompacting;
import static org.apache.cassandra.db.lifecycle.View.updateCompacting;
import static org.apache.cassandra.db.lifecycle.View.updateLiveSet;
import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.Throwables.merge;
import static org.apache.cassandra.utils.concurrent.Refs.release;
import static org.apache.cassandra.utils.concurrent.Refs.selfRefs;
/**
 * Holds the current {@link View} in an atomic reference, applies updates to it with
 * compare-and-set, and publishes notifications about changes to the registered
 * {@link INotificationConsumer}s.
 */
public class Tracker
{
private static final Logger logger = LoggerFactory.getLogger(Tracker.class);
// consumers registered through subscribe(); every notify* method iterates this collection
private final Collection<INotificationConsumer> subscribers = new CopyOnWriteArrayList<>();
// taken from the memtable given to the constructor; null when that memtable was null
public final ColumnFamilyStore cfstore;
// the current view; replaced atomically by apply() and unconditionally by reset()
final AtomicReference<View> view;
// value handed to the constructor; this class stores it but never reads it
public final boolean loadsstables;
/**
 * Creates a tracker whose initial view is built by {@link #reset(Memtable)}.
 *
 * @param memtable     the initial memtable; may be null, in which case {@link #cfstore} is null
 * @param loadsstables stored unchanged in {@link #loadsstables}
 */
public Tracker(Memtable memtable, boolean loadsstables)
{
    this.loadsstables = loadsstables;
    this.view = new AtomicReference<>();
    if (memtable == null)
        this.cfstore = null;
    else
        this.cfstore = memtable.cfs;
    // the reference is still empty at this point; reset() installs the first view
    reset(memtable);
}
/**
 * Single-sstable convenience overload of {@link #tryModify(Iterable, OperationType)}.
 *
 * @param sstable       the sstable to wrap in a singleton set
 * @param operationType passed through to the iterable overload
 * @return whatever the iterable overload returns (possibly null)
 */
public LifecycleTransaction tryModify(SSTableReader sstable, OperationType operationType)
{
return tryModify(singleton(sstable), operationType);
}
/**
 * Creates a {@link LifecycleTransaction} over the given sstables.
 * For a non-empty request the view is first updated with
 * {@code updateCompacting(emptySet(), sstables)}, guarded by {@code permitCompacting(sstables)}.
 *
 * @param sstables      the sstables the transaction should cover; may be empty
 * @param operationType the operation type given to the transaction
 * @return the new transaction, or null when the guard rejected the current view
 */
public LifecycleTransaction tryModify(Iterable<SSTableReader> sstables, OperationType operationType)
{
    // an empty request never touches the view; anything else must pass the permit check
    boolean permitted = Iterables.isEmpty(sstables)
                        || apply(permitCompacting(sstables), updateCompacting(emptySet(), sstables)) != null;
    return permitted ? new LifecycleTransaction(this, operationType, sstables) : null;
}
/**
 * Applies {@code function} to the view unconditionally, i.e. with an always-true permit.
 *
 * @param function computes the replacement view from the current one
 * @return the pair (previous view, installed view)
 */
Pair<View, View> apply(Function<View, View> function)
{
return apply(Predicates.<View>alwaysTrue(), function);
}
/**
 * Applies {@code function} to the view without ever throwing: anything thrown while applying
 * it is merged into {@code accumulate} instead.
 *
 * @param function   computes the replacement view from the current one
 * @param accumulate failure accumulated so far, may be null
 * @return {@code accumulate}, merged with the failure caught here if there was one
 */
Throwable apply(Function<View, View> function, Throwable accumulate)
{
    try
    {
        apply(function);
        return accumulate;
    }
    catch (Throwable failure)
    {
        return merge(accumulate, failure);
    }
}
/**
 * Atomically replaces the current view with {@code function.apply(current)}, retrying until
 * the compare-and-set succeeds. The permit is re-evaluated against the freshly read view on
 * every attempt.
 *
 * @param permit   must accept the current view for the update to proceed
 * @param function computes the replacement view from the current one
 * @return the pair (previous view, installed view), or null as soon as {@code permit} rejects
 */
Pair<View, View> apply(Predicate<View> permit, Function<View, View> function)
{
    View current;
    View replacement;
    do
    {
        current = view.get();
        if (!permit.apply(current))
            return null;
        replacement = function.apply(current);
    }
    while (!view.compareAndSet(current, replacement));
    return Pair.create(current, replacement);
}
/**
 * Adjusts the disk-usage metrics for a change in the set of tracked sstables.
 * Does nothing for a dummy tracker. A failure while reading an sstable's on-disk size is merged
 * into {@code accumulate} and that sstable contributes nothing to the totals.
 *
 * @param oldSSTables sstables whose size is subtracted from the load / live-space metrics
 * @param newSSTables sstables whose size is added to the load / live-space / total-space metrics
 * @param accumulate  failure accumulated so far, may be null
 * @return {@code accumulate} merged with any failure caught here
 */
Throwable updateSizeTracking(Iterable<SSTableReader> oldSSTables, Iterable<SSTableReader> newSSTables, Throwable accumulate)
{
    if (isDummy())
        return accumulate;

    long addedBytes = 0;
    for (SSTableReader reader : newSSTables)
    {
        if (logger.isTraceEnabled())
            logger.trace("adding {} to list of files tracked for {}.{}", reader.descriptor, cfstore.keyspace.getName(), cfstore.name);
        try
        {
            addedBytes += reader.bytesOnDisk();
        }
        catch (Throwable failure)
        {
            accumulate = merge(accumulate, failure);
        }
    }

    long removedBytes = 0;
    for (SSTableReader reader : oldSSTables)
    {
        if (logger.isTraceEnabled())
            logger.trace("removing {} from list of files tracked for {}.{}", reader.descriptor, cfstore.keyspace.getName(), cfstore.name);
        try
        {
            removedBytes += reader.bytesOnDisk();
        }
        catch (Throwable failure)
        {
            accumulate = merge(accumulate, failure);
        }
    }

    long delta = addedBytes - removedBytes;
    StorageMetrics.load.inc(delta);
    cfstore.metric.liveDiskSpaceUsed.inc(delta);
    // the total is only ever increased here; removed bytes are not subtracted from it
    cfstore.metric.totalDiskSpaceUsed.inc(addedBytes);
    return accumulate;
}
/**
 * Adds the sstables to the live set and updates size tracking, without performing any backup
 * or sending any notification.
 *
 * @param sstables the sstables to add
 */
public void addInitialSSTables(Iterable<SSTableReader> sstables)
{
    boolean online = !isDummy();
    if (online)
        setupOnline(sstables);
    apply(updateLiveSet(emptySet(), sstables));
    Throwable failure = updateSizeTracking(emptySet(), sstables, null);
    maybeFail(failure);
}
/**
 * Adds the sstables via {@link #addInitialSSTables(Iterable)}, then runs
 * {@link #maybeIncrementallyBackup(Iterable)} and notifies subscribers of the addition.
 *
 * @param sstables the sstables to add
 */
public void addSSTables(Iterable<SSTableReader> sstables)
{
addInitialSSTables(sstables);
maybeIncrementallyBackup(sstables);
notifyAdded(sstables);
}
/**
 * Unconditionally replaces the current view with a fresh one. The new view is constructed from
 * a singleton list holding {@code memtable} (an empty list when {@code memtable} is null),
 * an empty list, two empty maps and an empty interval tree.
 *
 * @param memtable the only memtable of the new view, or null for none
 */
@VisibleForTesting
public void reset(Memtable memtable)
{
view.set(new View(memtable != null ? singletonList(memtable) : Collections.emptyList(),
Collections.emptyList(),
Collections.emptyMap(),
Collections.emptyMap(),
SSTableIntervalTree.empty()));
}
/**
 * Drops all sstables when this is a real (non-dummy) tracker whose store is no longer valid;
 * otherwise does nothing.
 *
 * @param accumulate failure accumulated so far, may be null
 * @return {@code accumulate}, merged with any failure from dropping
 */
public Throwable dropSSTablesIfInvalid(Throwable accumulate)
{
    // isDummy() is checked first so cfstore is never dereferenced when it is null
    if (isDummy() || cfstore.isValid())
        return accumulate;
    return dropSSTables(accumulate);
}
/**
 * Drops all sstables via {@link #dropSSTables(Throwable)} and hands the accumulated failure,
 * if any, to {@code maybeFail}.
 */
public void dropSSTables()
{
maybeFail(dropSSTables(null));
}
/**
 * Drops sstables using an always-true predicate and {@link OperationType#UNKNOWN}.
 *
 * @param accumulate failure accumulated so far, may be null
 * @return {@code accumulate} merged with any failure from dropping
 */
public Throwable dropSSTables(Throwable accumulate)
{
return dropSSTables(Predicates.<SSTableReader>alwaysTrue(), OperationType.UNKNOWN, accumulate);
}
/**
 * Removes from the live set every sstable that matches {@code remove} and is not in the view's
 * compacting set, then processes the removed sstables under a new {@link LogTransaction}:
 * mark obsolete, update size tracking, release self references and notify subscribers.
 * Everything thrown inside is caught and merged into {@code accumulate}.
 *
 * @param remove        selects the sstables to drop
 * @param operationType operation type given to the log transaction
 * @param accumulate    failure accumulated so far, may be null
 * @return {@code accumulate} merged with any failure caught here
 */
public Throwable dropSSTables(final Predicate<SSTableReader> remove, OperationType operationType, Throwable accumulate)
{
try (LogTransaction txnLogs = new LogTransaction(operationType, this))
{
// the set to remove is recomputed from whichever view the function is handed on each attempt
Pair<View, View> result = apply(view -> {
Set<SSTableReader> toremove = copyOf(filter(view.sstables, and(remove, notIn(view.compacting))));
return updateLiveSet(toremove, emptySet()).apply(view);
});
// what actually left the live set: present in the previous view, absent from the new one
Set<SSTableReader> removed = Sets.difference(result.left.sstables, result.right.sstables);
assert Iterables.all(removed, remove);
List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
accumulate = prepareForObsoletion(removed, txnLogs, obsoletions, accumulate);
try
{
txnLogs.finish();
// with nothing removed there is nothing to obsolete, account for, release or announce
if (!removed.isEmpty())
{
accumulate = markObsolete(obsoletions, accumulate);
accumulate = updateSizeTracking(removed, emptySet(), accumulate);
accumulate = release(selfRefs(removed), accumulate);
accumulate = notifySSTablesChanged(removed, Collections.<SSTableReader>emptySet(), txnLogs.type(), accumulate);
}
}
catch (Throwable t)
{
// abort the prepared obsoletions first, then record the failure that triggered the abort
accumulate = abortObsoletion(obsoletions, accumulate);
accumulate = Throwables.merge(accumulate, t);
}
}
catch (Throwable t)
{
// covers everything in the outer try, including creating and closing the log transaction
accumulate = Throwables.merge(accumulate, t);
}
return accumulate;
}
/**
 * Drops every sstable whose descriptor directory equals {@code directory}, using
 * {@link OperationType#UNKNOWN}; an accumulated failure is handed to {@code maybeFail}.
 *
 * @param directory the directory whose sstables should be dropped
 */
public void removeUnreadableSSTables(final File directory)
{
    Predicate<SSTableReader> inDirectory = reader -> reader.descriptor.directory.equals(directory);
    maybeFail(dropSSTables(inDirectory, OperationType.UNKNOWN, null));
}
/**
 * Returns the first memtable, in iteration order of the current view's {@code liveMemtables},
 * that accepts the given operation group and commit log position.
 *
 * @param opGroup           the operation group passed to {@code Memtable.accepts}
 * @param commitLogPosition the commit log position passed to {@code Memtable.accepts}
 * @return the first accepting memtable
 * @throws AssertionError if no live memtable accepts; the message lists the live memtables of
 *                        the view as re-read at that moment
 */
public Memtable getMemtableFor(OpOrder.Group opGroup, CommitLogPosition commitLogPosition)
{
for (Memtable memtable : view.get().liveMemtables)
{
if (memtable.accepts(opGroup, commitLogPosition))
return memtable;
}
throw new AssertionError(view.get().liveMemtables.toString());
}
/**
 * Applies {@code View.switchMemtable(newMemtable)} to the view and notifies subscribers:
 * a "renewed" notification for the new memtable when truncating, otherwise a "switched"
 * notification for the memtable that was current before the update.
 *
 * @param truncating  selects which notification is sent
 * @param newMemtable the memtable to switch to
 * @return the memtable that was current in the view before the update
 */
public Memtable switchMemtable(boolean truncating, Memtable newMemtable)
{
    Pair<View, View> transition = apply(View.switchMemtable(newMemtable));
    Memtable previous = transition.left.getCurrentMemtable();
    if (!truncating)
        notifySwitched(previous);
    else
        notifyRenewed(newMemtable);
    return previous;
}
/**
 * Applies {@code View.markFlushing(memtable)} to the current view.
 *
 * @param memtable the memtable to mark
 */
public void markFlushing(Memtable memtable)
{
apply(View.markFlushing(memtable));
}
/**
 * Replaces a flushed memtable in the view with the sstables it was flushed to.
 * <p>
 * With no sstables (null or empty) only the view is updated. Otherwise the sstables are set up
 * online and incrementally backed up before they are published to the view; afterwards size
 * tracking is updated, subscribers are told the memtable was discarded and the sstables added,
 * and - if the store has been invalidated meanwhile - all sstables are dropped again.
 * Failures from size tracking, the "added" notification and the drop are merged and thrown
 * together at the end, so none of them hides another.
 *
 * @param memtable the memtable that was flushed
 * @param sstables the sstables produced by the flush; null or empty when nothing was written
 */
public void replaceFlushed(Memtable memtable, Iterable<SSTableReader> sstables)
{
    assert !isDummy();
    // null is treated like empty: the view itself is handed null for "no sstables" below
    if (sstables == null || Iterables.isEmpty(sstables))
    {
        apply(View.replaceFlushed(memtable, null));
        return;
    }

    sstables.forEach(SSTableReader::setupOnline);
    // links are created before the new sstables become visible in the view
    maybeIncrementallyBackup(sstables);

    apply(View.replaceFlushed(memtable, sstables));

    Throwable fail = updateSizeTracking(emptySet(), sstables, null);
    notifyDiscarded(memtable);
    fail = notifyAdded(sstables, fail);
    // same validity check as before, but a failure while dropping is merged into 'fail'
    // instead of being thrown on its own and discarding what was accumulated so far
    fail = dropSSTablesIfInvalid(fail);
    maybeFail(fail);
}
/**
 * @return the {@code compacting} set of the current view
 */
public Set<SSTableReader> getCompacting()
{
return view.get().compacting;
}
/**
 * @return the current view's selection for {@link SSTableSet#NONCOMPACTING}
 */
public Iterable<SSTableReader> getUncompacting()
{
return view.get().select(SSTableSet.NONCOMPACTING);
}
/**
 * Delegates to {@code View.getUncompacting(candidates)} on the current view.
 *
 * @param candidates the sstables to consider
 * @return the result of the view's {@code getUncompacting} for those candidates
 */
public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates)
{
return view.get().getUncompacting(candidates);
}
/**
 * When incremental backups are enabled, creates links for each sstable in the backups
 * directory derived from its descriptor; otherwise does nothing.
 *
 * @param sstables the sstables to back up
 */
public void maybeIncrementallyBackup(final Iterable<SSTableReader> sstables)
{
    if (DatabaseDescriptor.isIncrementalBackupsEnabled())
    {
        sstables.forEach(reader -> {
            File target = Directories.getBackupsDirectory(reader.descriptor);
            reader.createLinks(FileUtils.getCanonicalPath(target));
        });
    }
}
/**
 * Publishes an {@link SSTableListChangedNotification} to every subscriber. A subscriber that
 * throws does not prevent delivery to the remaining ones; its failure is merged into
 * {@code accumulate}.
 *
 * @param removed        the sstables that were removed
 * @param added          the sstables that were added
 * @param compactionType the operation type carried by the notification
 * @param accumulate     failure accumulated so far, may be null
 * @return {@code accumulate} merged with any subscriber failure
 */
Throwable notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType, Throwable accumulate)
{
    return notifyAccumulating(new SSTableListChangedNotification(added, removed, compactionType), accumulate);
}

/**
 * Publishes an {@link SSTableAddedNotification} to every subscriber. A subscriber that throws
 * does not prevent delivery to the remaining ones; its failure is merged into
 * {@code accumulate}.
 *
 * @param added      the sstables that were added
 * @param accumulate failure accumulated so far, may be null
 * @return {@code accumulate} merged with any subscriber failure
 */
Throwable notifyAdded(Iterable<SSTableReader> added, Throwable accumulate)
{
    return notifyAccumulating(new SSTableAddedNotification(added), accumulate);
}

// Delivers the notification to each subscriber in turn, merging (rather than propagating)
// whatever an individual subscriber throws so that all subscribers are always reached.
private Throwable notifyAccumulating(INotification notification, Throwable accumulate)
{
    for (INotificationConsumer subscriber : subscribers)
    {
        try
        {
            subscriber.handleNotification(notification, this);
        }
        catch (Throwable failure)
        {
            accumulate = merge(accumulate, failure);
        }
    }
    return accumulate;
}
/**
 * Notifies subscribers of added sstables via {@link #notifyAdded(Iterable, Throwable)} and
 * hands the accumulated failure, if any, to {@code maybeFail}.
 *
 * @param added the sstables that were added
 */
public void notifyAdded(Iterable<SSTableReader> added)
{
maybeFail(notifyAdded(added, null));
}
/**
 * Publishes an {@link SSTableRepairStatusChanged} notification to every subscriber.
 * A failure thrown by a subscriber propagates to the caller.
 *
 * @param repairStatusesChanged the sstables carried by the notification
 */
public void notifySSTableRepairedStatusChanged(Collection<SSTableReader> repairStatusesChanged)
{
    notify(new SSTableRepairStatusChanged(repairStatusesChanged));
}
/**
 * Publishes an {@link SSTableDeletingNotification} to every subscriber.
 * A failure thrown by a subscriber propagates to the caller.
 *
 * @param deleting the sstable carried by the notification
 */
public void notifyDeleting(SSTableReader deleting)
{
    notify(new SSTableDeletingNotification(deleting));
}
/**
 * Publishes a {@link TruncationNotification} to every subscriber.
 * A failure thrown by a subscriber propagates to the caller.
 *
 * @param truncatedAt the value carried by the notification
 */
public void notifyTruncated(long truncatedAt)
{
    notify(new TruncationNotification(truncatedAt));
}
/**
 * Publishes a {@link MemtableRenewedNotification} for the given memtable to every subscriber.
 *
 * @param renewed the memtable carried by the notification
 */
public void notifyRenewed(Memtable renewed)
{
notify(new MemtableRenewedNotification(renewed));
}
/**
 * Publishes a {@link MemtableSwitchedNotification} for the given memtable to every subscriber.
 *
 * @param previous the memtable carried by the notification
 */
public void notifySwitched(Memtable previous)
{
notify(new MemtableSwitchedNotification(previous));
}
/**
 * Publishes a {@link MemtableDiscardedNotification} for the given memtable to every subscriber.
 *
 * @param discarded the memtable carried by the notification
 */
public void notifyDiscarded(Memtable discarded)
{
notify(new MemtableDiscardedNotification(discarded));
}
// Hands the notification to every subscriber, with this tracker as the sender.
// Nothing is caught here: the first subscriber failure propagates to the caller.
private void notify(INotification notification)
{
    subscribers.forEach(subscriber -> subscriber.handleNotification(notification, this));
}
/**
 * @return true when this tracker has no store ({@link #cfstore} is null) or
 *         {@code DatabaseDescriptor.isDaemonInitialized()} is false
 */
public boolean isDummy()
{
return cfstore == null || !DatabaseDescriptor.isDaemonInitialized();
}
/**
 * Registers a consumer to receive the notifications published by this tracker.
 *
 * @param consumer the consumer to add
 */
public void subscribe(INotificationConsumer consumer)
{
subscribers.add(consumer);
}
/**
 * Removes a previously registered consumer from the subscriber collection.
 *
 * @param consumer the consumer to remove
 */
public void unsubscribe(INotificationConsumer consumer)
{
subscribers.remove(consumer);
}
// Shorthand for Collections.emptySet() with the element type fixed to SSTableReader.
private static Set<SSTableReader> emptySet()
{
return Collections.emptySet();
}
/**
 * @return the view currently held by this tracker
 */
public View getView()
{
return view.get();
}
/**
 * Removes the given sstables from the live set by updating the view only. Unlike
 * {@link #dropSSTables(Predicate, OperationType, Throwable)} this performs no obsoletion,
 * size tracking, reference release or subscriber notification.
 *
 * @param toRemove the sstables to remove from the live set
 */
@VisibleForTesting
public void removeUnsafe(Set<SSTableReader> toRemove)
{
    apply(updateLiveSet(toRemove, emptySet()));
}
}