package org.apache.cassandra.db.lifecycle;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Interval;
import static com.google.common.base.Predicates.equalTo;
import static com.google.common.base.Predicates.not;
import static com.google.common.collect.ImmutableList.copyOf;
import static com.google.common.collect.ImmutableList.of;
import static com.google.common.collect.Iterables.all;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.filter;
import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
import static org.apache.cassandra.db.lifecycle.Helpers.filterOut;
import static org.apache.cassandra.db.lifecycle.Helpers.replace;
/**
 * A snapshot of the memtables and sstables known at one point in time, together with
 * the subset of sstables currently marked as compacting and an interval tree built over
 * the live sstables.
 *
 * <p>Every field is {@code final} and a View is never modified in place by the code in
 * this class: each static factory below ({@link #updateCompacting}, {@link #updateLiveSet},
 * {@link #switchMemtable}, {@link #markFlushing}, {@link #replaceFlushed}) returns a
 * {@code Function<View, View>} that derives a <em>new</em> View from an existing one.
 *
 * <p>NOTE(review): whether the collections handed to the constructor are themselves
 * immutable depends on the callers and on {@code Helpers.replace}, neither of which is
 * visible here.
 */
public class View
{
    /**
     * Memtables that have not been marked as flushing. {@link #getCurrentMemtable()} returns the
     * last element, and {@link #switchMemtable} appends new memtables at the end.
     */
    public final List<Memtable> liveMemtables;

    /**
     * Memtables that were moved out of {@link #liveMemtables} by {@link #markFlushing} and not yet
     * removed by {@link #replaceFlushed}.
     */
    public final List<Memtable> flushingMemtables;

    /** Key-set view of {@link #compactingMap}. */
    final Set<SSTableReader> compacting;

    /** Key-set view of {@link #sstablesMap}; this is what {@link #liveSSTables()} returns. */
    final Set<SSTableReader> sstables;

    // The maps are keyed and valued by reader; permitCompacting() compares the looked-up value to
    // the queried reader by identity, so the map lets us recover the exact instance held by this
    // view for a reader that is merely equals() to it.
    final Map<SSTableReader, SSTableReader> sstablesMap;
    final Map<SSTableReader, SSTableReader> compactingMap;

    /** Interval tree used for key and bounds lookups over the sstables. */
    final SSTableIntervalTree intervalTree;

    /**
     * Creates a view over the supplied state. All arguments must be non-null (checked with
     * {@code assert}, so only when assertions are enabled).
     *
     * @param liveMemtables     memtables not yet flushing
     * @param flushingMemtables memtables currently flushing
     * @param sstables          live sstables, as a reader-to-reader map
     * @param compacting        sstables marked compacting, as a reader-to-reader map
     * @param intervalTree      interval tree to use for lookups
     */
    View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Map<SSTableReader, SSTableReader> sstables, Map<SSTableReader, SSTableReader> compacting, SSTableIntervalTree intervalTree)
    {
        assert liveMemtables != null;
        assert flushingMemtables != null;
        assert sstables != null;
        assert compacting != null;
        assert intervalTree != null;

        this.liveMemtables = liveMemtables;
        this.flushingMemtables = flushingMemtables;

        this.sstablesMap = sstables;
        this.sstables = sstablesMap.keySet();
        this.compactingMap = compacting;
        this.compacting = compactingMap.keySet();
        this.intervalTree = intervalTree;
    }

    /**
     * @return the last memtable in {@link #liveMemtables}
     * @throws IndexOutOfBoundsException if there are no live memtables
     */
    public Memtable getCurrentMemtable()
    {
        return liveMemtables.get(liveMemtables.size() - 1);
    }

    /**
     * @return every memtable in this view: the flushing memtables first, followed by the live ones
     */
    public Iterable<Memtable> getAllMemtables()
    {
        return concat(flushingMemtables, liveMemtables);
    }

    /**
     * @return the set of live sstables (the key set of the underlying sstable map)
     */
    public Set<SSTableReader> liveSSTables()
    {
        return sstables;
    }

    /**
     * Selects the sstables of the requested set (see {@link #select(SSTableSet)}) and keeps only
     * those accepted by {@code filter}.
     *
     * @param sstableSet which set of sstables to start from
     * @param filter     predicate an sstable must satisfy to be returned
     * @return the matching sstables
     */
    public Iterable<SSTableReader> sstables(SSTableSet sstableSet, Predicate<SSTableReader> filter)
    {
        return filter(select(sstableSet), filter);
    }

    /**
     * @return the live sstables followed by the result of {@code Helpers.filterOut(compacting, sstables)}
     *         (presumably the compacting readers that are not also in the live set; the helper is
     *         not visible here)
     */
    @VisibleForTesting
    public Iterable<SSTableReader> allKnownSSTables()
    {
        return Iterables.concat(sstables, filterOut(compacting, sstables));
    }

    /**
     * Returns the sstables belonging to the requested set.
     * <ul>
     *   <li>{@code LIVE}: the live sstables as-is.</li>
     *   <li>{@code NONCOMPACTING}: the live sstables that are not marked compacting.</li>
     *   <li>{@code CANONICAL}: a freshly built set holding every compacting reader whose
     *       {@code openReason} is not {@code EARLY}, plus every live reader that is neither
     *       compacting nor opened {@code EARLY}.</li>
     * </ul>
     *
     * @param sstableSet which set to return
     * @return the selected sstables
     * @throws IllegalStateException if {@code sstableSet} is not one of the handled constants
     */
    public Iterable<SSTableReader> select(SSTableSet sstableSet)
    {
        switch (sstableSet)
        {
            case LIVE:
                return sstables;
            case NONCOMPACTING:
                return filter(sstables, reader -> !compacting.contains(reader));
            case CANONICAL:
                Set<SSTableReader> canonical = new HashSet<>();
                for (SSTableReader candidate : compacting)
                {
                    if (candidate.openReason != SSTableReader.OpenReason.EARLY)
                        canonical.add(candidate);
                }
                // Live readers that are also in the compacting set were already considered by the
                // loop above (via the instance held in the compacting set), so skip them here.
                for (SSTableReader candidate : sstables)
                {
                    if (!compacting.contains(candidate) && candidate.openReason != SSTableReader.OpenReason.EARLY)
                        canonical.add(candidate);
                }
                return canonical;
            default:
                throw new IllegalStateException("Unhandled SSTableSet: " + sstableSet);
        }
    }

    /**
     * @param candidates sstables to examine
     * @return the candidates that are not marked compacting in this view
     */
    public Iterable<SSTableReader> getUncompacting(Iterable<SSTableReader> candidates)
    {
        return filter(candidates, reader -> !compacting.contains(reader));
    }

    /**
     * @return true if there are no live sstables, no flushing memtables, and at most one live
     *         memtable which (if present) reports zero operations
     */
    public boolean isEmpty()
    {
        return sstables.isEmpty()
               && liveMemtables.size() <= 1
               && flushingMemtables.isEmpty()
               && (liveMemtables.isEmpty() || liveMemtables.get(0).getOperations() == 0);
    }

    @Override
    public String toString()
    {
        // pending_count is the total number of memtables minus one (live + flushing - 1)
        return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
    }

    /**
     * Returns the sstables of this view's interval tree that intersect {@code [left, right]}.
     * Delegates to {@link #sstablesInBounds(PartitionPosition, PartitionPosition, SSTableIntervalTree)}
     * so the bounds handling lives in a single place.
     *
     * @param left  start of the range
     * @param right end of the range; a minimum position means "up to the tree's maximum"
     * @return the intersecting sstables, or an empty list if the tree is empty
     */
    public Iterable<SSTableReader> liveSSTablesInBounds(PartitionPosition left, PartitionPosition right)
    {
        return sstablesInBounds(left, right, intervalTree);
    }

    /**
     * Searches {@code intervalTree} for the sstables intersecting {@code [left, right]}.
     * The range must not strictly wrap around (checked with {@code assert}).
     *
     * @param left         start of the range
     * @param right        end of the range; if it is a minimum position the search stops at
     *                     {@code intervalTree.max()} instead
     * @param intervalTree tree to search
     * @return the intersecting sstables, or an empty list if the tree is empty
     */
    public static List<SSTableReader> sstablesInBounds(PartitionPosition left, PartitionPosition right, SSTableIntervalTree intervalTree)
    {
        assert !AbstractBounds.strictlyWrapsAround(left, right);

        if (intervalTree.isEmpty())
            return Collections.emptyList();

        // A minimum right bound cannot be used as an interval end, so clamp to the tree's maximum.
        PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
        return intervalTree.search(Interval.create(left, stopInTree));
    }

    /**
     * @param sstableSet which set to select
     * @return a function applying {@link #select(SSTableSet)} to the view it is given
     */
    public static Function<View, Iterable<SSTableReader>> selectFunction(SSTableSet sstableSet)
    {
        return view -> view.select(sstableSet);
    }

    /**
     * @param sstableSet which set to select
     * @param filter     predicate an sstable must satisfy to be returned
     * @return a function applying {@link #sstables(SSTableSet, Predicate)} to the view it is given
     */
    public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, Predicate<SSTableReader> filter)
    {
        return view -> view.sstables(sstableSet, filter);
    }

    /**
     * Only {@code SSTableSet.LIVE} is supported; this is checked with {@code assert} when the
     * function is created, not when it is applied.
     *
     * @param sstableSet must be {@code LIVE}
     * @param key        key to look up
     * @return a function searching the given view's interval tree for {@code key}
     */
    public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, DecoratedKey key)
    {
        assert sstableSet == SSTableSet.LIVE;
        return view -> view.intervalTree.search(key);
    }

    /**
     * @param rowBounds bounds whose {@code left} and {@code right} are passed to
     *                  {@link #liveSSTablesInBounds(PartitionPosition, PartitionPosition)}
     * @return a function returning the sstables of the given view that intersect {@code rowBounds}
     */
    public static Function<View, Iterable<SSTableReader>> selectLive(AbstractBounds<PartitionPosition> rowBounds)
    {
        return view -> view.liveSSTablesInBounds(rowBounds.left, rowBounds.right);
    }

    /**
     * Builds a transformation that produces a view whose compacting map is
     * {@code Helpers.replace(compactingMap, unmark, mark)}; everything else is carried over.
     * When assertions are enabled, every reader in {@code mark} must satisfy
     * {@code Helpers.idIn(view.sstablesMap)} for the view being transformed.
     *
     * @param unmark readers to remove from the compacting set
     * @param mark   readers to add to the compacting set
     * @return the transformation, or the identity function if both arguments are empty
     */
    static Function<View, View> updateCompacting(final Set<SSTableReader> unmark, final Iterable<SSTableReader> mark)
    {
        if (unmark.isEmpty() && Iterables.isEmpty(mark))
            return Functions.identity();

        return view -> {
            assert all(mark, Helpers.idIn(view.sstablesMap));
            return new View(view.liveMemtables, view.flushingMemtables, view.sstablesMap,
                            replace(view.compactingMap, unmark, mark),
                            view.intervalTree);
        };
    }

    /**
     * Builds a predicate that accepts a view only if every one of {@code readers} may be marked
     * compacting in it: the reader must not already be compacting, must be the very instance the
     * view holds in its sstable map, and must not be marked compacted.
     *
     * @param readers readers that are candidates for compaction
     * @return the predicate described above
     */
    static Predicate<View> permitCompacting(final Iterable<SSTableReader> readers)
    {
        return view -> {
            for (SSTableReader reader : readers)
            {
                // identity comparison on purpose: an equal-but-different instance is rejected
                if (view.compacting.contains(reader) || view.sstablesMap.get(reader) != reader || reader.isMarkedCompacted())
                    return false;
            }
            return true;
        };
    }

    /**
     * Builds a transformation that produces a view whose sstable map is
     * {@code Helpers.replace(sstablesMap, remove, add)} and whose interval tree is rebuilt from the
     * resulting key set. Memtables and the compacting map are carried over.
     *
     * @param remove readers to drop from the live set
     * @param add    readers to add to the live set
     * @return the transformation, or the identity function if both arguments are empty
     */
    static Function<View, View> updateLiveSet(final Set<SSTableReader> remove, final Iterable<SSTableReader> add)
    {
        if (remove.isEmpty() && Iterables.isEmpty(add))
            return Functions.identity();

        return view -> {
            Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, remove, add);
            return new View(view.liveMemtables, view.flushingMemtables, sstableMap, view.compactingMap,
                            SSTableIntervalTree.build(sstableMap.keySet()));
        };
    }

    /**
     * Builds a transformation that appends {@code newMemtable} to the end of the live memtables,
     * making it the one returned by {@link #getCurrentMemtable()}.
     *
     * @param newMemtable memtable to append
     * @return the transformation
     */
    static Function<View, View> switchMemtable(final Memtable newMemtable)
    {
        return view -> {
            List<Memtable> newLive = ImmutableList.<Memtable>builder().addAll(view.liveMemtables).add(newMemtable).build();
            assert newLive.size() == view.liveMemtables.size() + 1;
            return new View(newLive, view.flushingMemtables, view.sstablesMap, view.compactingMap, view.intervalTree);
        };
    }

    /**
     * Builds a transformation that moves {@code toFlush} from the live memtables to the flushing
     * memtables. When assertions are enabled, exactly one live memtable must have been removed.
     *
     * @param toFlush memtable to mark as flushing
     * @return the transformation
     */
    static Function<View, View> markFlushing(final Memtable toFlush)
    {
        return view -> {
            List<Memtable> live = view.liveMemtables, flushing = view.flushingMemtables;
            List<Memtable> newLive = copyOf(filter(live, not(equalTo(toFlush))));
            // Insert toFlush after the flushing memtables that compare less than it and before the
            // others, so an already-ordered flushing list stays ordered by Memtable.compareTo.
            List<Memtable> newFlushing = copyOf(concat(filter(flushing, lessThan(toFlush)),
                                                       of(toFlush),
                                                       filter(flushing, not(lessThan(toFlush)))));
            assert newLive.size() == live.size() - 1;
            assert newFlushing.size() == flushing.size() + 1;
            return new View(newLive, newFlushing, view.sstablesMap, view.compactingMap, view.intervalTree);
        };
    }

    /**
     * Builds a transformation that removes {@code memtable} from the flushing memtables and, if
     * {@code flushed} is non-null and non-empty, adds those readers to the live sstables and
     * rebuilds the interval tree. When assertions are enabled, exactly one flushing memtable must
     * have been removed.
     *
     * @param memtable memtable whose flush has finished
     * @param flushed  readers produced by the flush; may be null or empty
     * @return the transformation
     */
    static Function<View, View> replaceFlushed(final Memtable memtable, final Iterable<SSTableReader> flushed)
    {
        return view -> {
            List<Memtable> remaining = copyOf(filter(view.flushingMemtables, not(equalTo(memtable))));
            assert remaining.size() == view.flushingMemtables.size() - 1;

            // Nothing was written: only the memtable list changes, so the existing tree is reused.
            if (flushed == null || Iterables.isEmpty(flushed))
                return new View(view.liveMemtables, remaining, view.sstablesMap,
                                view.compactingMap, view.intervalTree);

            Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
            return new View(view.liveMemtables, remaining, sstableMap, view.compactingMap,
                            SSTableIntervalTree.build(sstableMap.keySet()));
        };
    }

    /**
     * @param lessThan upper bound (exclusive)
     * @return a predicate accepting values that compare strictly less than {@code lessThan}
     */
    private static <T extends Comparable<T>> Predicate<T> lessThan(final T lessThan)
    {
        return t -> t.compareTo(lessThan) < 0;
    }
}