package org.apache.cassandra.db;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Splitter;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
/**
 * Holds the most recently computed {@link DiskBoundaries} for a table and recomputes them on demand once the
 * held value is missing or reports itself as out of date.
 */
public class DiskBoundaryManager
{
    private static final Logger logger = LoggerFactory.getLogger(DiskBoundaryManager.class);

    // Last computed boundaries; stays null until the first computation. Only assigned while holding this
    // object's monitor, read without it on the fast path.
    private volatile DiskBoundaries diskBoundaries;

    /**
     * Returns the disk boundaries to use for the given table.
     * <p>
     * When the table's partitioner has no splitter, a new {@link DiskBoundaries} built from the writeable
     * locations and the current directories version is returned on every call and nothing is cached.
     * Otherwise the cached value is returned, after being recomputed if it was absent or out of date.
     *
     * @param cfs the table whose boundaries are requested
     * @return the boundaries for {@code cfs}
     */
    public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs)
    {
        boolean canSplit = cfs.getPartitioner().splitter().isPresent();
        if (!canSplit)
            return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), BlacklistedDirectories.getDirectoriesVersion());

        // Unlocked check first so that the common, up-to-date case never touches the monitor.
        if (isStale())
            refreshIfStale(cfs);

        return diskBoundaries;
    }

    /**
     * Invalidates the cached boundaries, if any have been computed yet.
     */
    public void invalidate()
    {
        if (diskBoundaries == null)
            return;

        diskBoundaries.invalidate();
    }

    /**
     * @return true when no boundaries are cached or the cached ones report being out of date
     */
    private boolean isStale()
    {
        return diskBoundaries == null || diskBoundaries.isOutOfDate();
    }

    /**
     * Recomputes the cached boundaries while holding this object's monitor. Staleness is checked again once the
     * monitor is held, so a caller that waited behind another refresh does not recompute a fresh value.
     */
    private synchronized void refreshIfStale(ColumnFamilyStore cfs)
    {
        if (!isStale())
            return;

        logger.debug("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
        DiskBoundaries previous = diskBoundaries;
        diskBoundaries = getDiskBoundaryValue(cfs);
        logger.debug("Updating boundaries from {} to {} for {}.{}", previous, diskBoundaries, cfs.keyspace.getName(), cfs.getTableName());
    }

    /**
     * Computes a new {@link DiskBoundaries} for the table from the node's local ranges and the table's
     * writeable data directories, tagging it with the ring version and directories version it was built from.
     */
    private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
    {
        Collection<Range<Token>> ownedRanges;
        long ringVersion;
        TokenMetadata tokenMetadata;
        // Repeat until the ring version read before computing the ranges is still the current one afterwards.
        do
        {
            tokenMetadata = StorageService.instance.getTokenMetadata();
            ringVersion = tokenMetadata.getRingVersion();

            boolean usePendingRanges = StorageService.instance.isBootstrapMode()
                                       && !StorageService.isReplacingSameAddress();
            if (usePendingRanges)
            {
                // Wait for the pending range calculation to complete before reading its result.
                PendingRangeCalculatorService.instance.blockUntilFinished();
                ownedRanges = tokenMetadata.getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress());
            }
            else
            {
                // Ranges are derived from a clone of the token metadata taken "after all settled".
                ownedRanges = cfs.keyspace.getReplicationStrategy()
                                          .getAddressRanges(tokenMetadata.cloneAfterAllSettled())
                                          .get(FBUtilities.getBroadcastAddress());
            }
            logger.debug("Got local ranges {} (ringVersion = {})", ownedRanges, ringVersion);
        }
        while (ringVersion != tokenMetadata.getRingVersion());

        int directoriesVersion;
        Directories.DataDirectory[] writeableDirs;
        // Same retry idea for the directories: the version must be unchanged across the read of the locations.
        do
        {
            directoriesVersion = BlacklistedDirectories.getDirectoriesVersion();
            writeableDirs = cfs.getDirectories().getWriteableLocations();
        }
        while (directoriesVersion != BlacklistedDirectories.getDirectoriesVersion());

        boolean hasLocalRanges = ownedRanges != null && !ownedRanges.isEmpty();
        if (hasLocalRanges)
        {
            List<PartitionPosition> positions = getDiskBoundaries(Range.sort(ownedRanges), cfs.getPartitioner(), writeableDirs);
            return new DiskBoundaries(writeableDirs, positions, ringVersion, directoriesVersion);
        }

        // No local ranges: the boundaries carry null positions.
        return new DiskBoundaries(writeableDirs, null, ringVersion, directoriesVersion);
    }

    /**
     * Turns the sorted local ranges into a list of upper-bound positions by asking the partitioner's splitter
     * for one part per data directory.
     * <p>
     * With more than one token configured, the first attempt asks the splitter not to split ranges; if that
     * produces fewer boundaries than there are directories, the split is redone with range splitting allowed.
     * Every boundary except the last is converted with {@code maxKeyBound()}; the final position is always the
     * max key bound of the partitioner's maximum token.
     *
     * @param sortedLocalRanges the local ranges, already sorted
     * @param partitioner       partitioner whose splitter must be present
     * @param dataDirectories   directories to distribute the ranges over
     * @return the upper-bound positions
     */
    private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> sortedLocalRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
    {
        assert partitioner.splitter().isPresent();
        Splitter rangeSplitter = partitioner.splitter().get();
        boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1;
        int diskCount = dataDirectories.length;

        List<Token> splitPoints = rangeSplitter.splitOwnedRanges(diskCount, sortedLocalRanges, dontSplitRanges);
        boolean tooFewSplitPoints = dontSplitRanges && splitPoints.size() < diskCount;
        if (tooFewSplitPoints)
            splitPoints = rangeSplitter.splitOwnedRanges(diskCount, sortedLocalRanges, false);

        List<PartitionPosition> positions = new ArrayList<>();
        int lastIndex = splitPoints.size() - 1;
        for (int idx = 0; idx < lastIndex; idx++)
        {
            Token splitPoint = splitPoints.get(idx);
            positions.add(splitPoint.maxKeyBound());
        }
        // The last split point is replaced by the partitioner's maximum token.
        positions.add(partitioner.getMaximumToken().maxKeyBound());
        return positions;
    }
}