package org.apache.cassandra.locator;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BiMultiValMap;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.SortedBiMultiValMap;
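/**
 * Tracks the token ring: which normal tokens each endpoint owns, plus the transient
 * bootstrap, leaving, moving and replacement state used to compute pending ranges.
 * Internal collections are guarded by a fair ReentrantReadWriteLock; read-style accessors
 * take the read lock and hand back defensive copies.
 *
 * A rough, hypothetical usage sketch (hostId, endpoint and tokens below are placeholders,
 * not values defined in this class):
 * <pre>{@code
 * TokenMetadata tm = new TokenMetadata();
 * tm.updateHostId(hostId, endpoint);            // associate the node's host ID
 * tm.updateNormalTokens(tokens, endpoint);      // endpoint now owns these tokens as a normal member
 * Collection<Range<Token>> primary = tm.getPrimaryRangesFor(tm.getTokens(endpoint));
 * }</pre>
 */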
public class TokenMetadata
{
private static final Logger logger = LoggerFactory.getLogger(TokenMetadata.class);
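/** Maps each normal (fully joined) token to the endpoint that owns it; the inverse view yields an endpoint's tokens. */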
private final BiMultiValMap<Token, InetAddress> tokenToEndpointMap;
private final BiMap<InetAddress, UUID> endpointToHostIdMap;
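/** Tokens claimed by bootstrapping (joining) nodes; these are not yet present in tokenToEndpointMap. */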
private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
private final BiMap<InetAddress, InetAddress> replacementToOriginal = HashBiMap.create();
private final Set<InetAddress> leavingEndpoints = new HashSet<>();
private final ConcurrentMap<String, PendingRangeMaps> pendingRanges = new ConcurrentHashMap<>();
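/** Pairs of (new token, endpoint) for nodes that are moving to a new position in the ring. */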
private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
private volatile ArrayList<Token> sortedTokens;
private volatile Topology topology;
public final IPartitioner partitioner;
private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
{
public int compare(InetAddress o1, InetAddress o2)
{
return ByteBuffer.wrap(o1.getAddress()).compareTo(ByteBuffer.wrap(o2.getAddress()));
}
};
private volatile long ringVersion = 0;
public TokenMetadata()
{
this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
HashBiMap.<InetAddress, UUID>create(),
Topology.empty(),
DatabaseDescriptor.getPartitioner());
}
private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner)
{
this.tokenToEndpointMap = tokenToEndpointMap;
this.topology = topology;
this.partitioner = partitioner;
endpointToHostIdMap = endpointsMap;
sortedTokens = sortTokens();
}
@VisibleForTesting
public TokenMetadata cloneWithNewPartitioner(IPartitioner newPartitioner)
{
return new TokenMetadata(tokenToEndpointMap, endpointToHostIdMap, topology, newPartitioner);
}
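/** The token-to-endpoint map is constructed as a SortedBiMultiValMap, so its key set is already in token order; copying it yields the sorted ring. */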
private ArrayList<Token> sortTokens()
{
return new ArrayList<>(tokenToEndpointMap.keySet());
}
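/** @return the number of bootstrap tokens that fall within the primary ranges owned by {@code source}. */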
public int pendingRangeChanges(InetAddress source)
{
int n = 0;
Collection<Range<Token>> sourceRanges = getPrimaryRangesFor(getTokens(source));
lock.readLock().lock();
try
{
for (Token token : bootstrapTokens.keySet())
for (Range<Token> range : sourceRanges)
if (range.contains(token))
n++;
}
finally
{
lock.readLock().unlock();
}
return n;
}
public void updateNormalToken(Token token, InetAddress endpoint)
{
updateNormalTokens(Collections.singleton(token), endpoint);
}
public void updateNormalTokens(Collection<Token> tokens, InetAddress endpoint)
{
Multimap<InetAddress, Token> endpointTokens = HashMultimap.create();
for (Token token : tokens)
endpointTokens.put(endpoint, token);
updateNormalTokens(endpointTokens);
}
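/**
 * Makes each endpoint the owner of the given tokens as a normal ring member. Any prior
 * bootstrap, leaving, moving or replacement state for the endpoint is cleared and its
 * previous tokens are dropped; a warning is logged when a token changes ownership.
 */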
public void updateNormalTokens(Multimap<InetAddress, Token> endpointTokens)
{
if (endpointTokens.isEmpty())
return;
lock.writeLock().lock();
try
{
boolean shouldSortTokens = false;
Topology.Builder topologyBuilder = topology.unbuild();
for (InetAddress endpoint : endpointTokens.keySet())
{
Collection<Token> tokens = endpointTokens.get(endpoint);
assert tokens != null && !tokens.isEmpty();
bootstrapTokens.removeValue(endpoint);
tokenToEndpointMap.removeValue(endpoint);
topologyBuilder.addEndpoint(endpoint);
leavingEndpoints.remove(endpoint);
replacementToOriginal.remove(endpoint);
removeFromMoving(endpoint);
for (Token token : tokens)
{
InetAddress prev = tokenToEndpointMap.put(token, endpoint);
if (!endpoint.equals(prev))
{
if (prev != null)
logger.warn("Token {} changing ownership from {} to {}", token, prev, endpoint);
shouldSortTokens = true;
}
}
}
topology = topologyBuilder.build();
if (shouldSortTokens)
sortedTokens = sortTokens();
}
finally
{
lock.writeLock().unlock();
}
}
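/**
 * Stores the endpoint-to-host-ID mapping. Throws if the host ID is already bound to a
 * different endpoint that the failure detector still considers alive; otherwise the mapping
 * is force-put, logging a warning when the endpoint's host ID changes.
 */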
public void updateHostId(UUID hostId, InetAddress endpoint)
{
assert hostId != null;
assert endpoint != null;
lock.writeLock().lock();
try
{
InetAddress storedEp = endpointToHostIdMap.inverse().get(hostId);
if (storedEp != null)
{
if (!storedEp.equals(endpoint) && FailureDetector.instance.isAlive(storedEp))
{
throw new RuntimeException(String.format("Host ID collision between active endpoint %s and %s (id=%s)",
storedEp,
endpoint,
hostId));
}
}
UUID storedId = endpointToHostIdMap.get(endpoint);
if ((storedId != null) && (!storedId.equals(hostId)))
logger.warn("Changing {}'s host ID from {} to {}", endpoint, storedId, hostId);
endpointToHostIdMap.forcePut(endpoint, hostId);
}
finally
{
lock.writeLock().unlock();
}
}
public UUID getHostId(InetAddress endpoint)
{
lock.readLock().lock();
try
{
return endpointToHostIdMap.get(endpoint);
}
finally
{
lock.readLock().unlock();
}
}
public InetAddress getEndpointForHostId(UUID hostId)
{
lock.readLock().lock();
try
{
return endpointToHostIdMap.inverse().get(hostId);
}
finally
{
lock.readLock().unlock();
}
}
public Map<InetAddress, UUID> getEndpointToHostIdMapForReading()
{
lock.readLock().lock();
try
{
Map<InetAddress, UUID> readMap = new HashMap<>();
readMap.putAll(endpointToHostIdMap);
return readMap;
}
finally
{
lock.readLock().unlock();
}
}
@Deprecated
public void addBootstrapToken(Token token, InetAddress endpoint)
{
addBootstrapTokens(Collections.singleton(token), endpoint);
}
public void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint)
{
addBootstrapTokens(tokens, endpoint, null);
}
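/**
 * Registers bootstrap tokens for {@code endpoint}. {@code original} is the node being replaced
 * (or null for a plain bootstrap); collisions with tokens still owned by that node in the normal
 * ring are tolerated, while any other collision raises a RuntimeException.
 */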
private void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint, InetAddress original)
{
assert tokens != null && !tokens.isEmpty();
assert endpoint != null;
lock.writeLock().lock();
try
{
InetAddress oldEndpoint;
for (Token token : tokens)
{
oldEndpoint = bootstrapTokens.get(token);
if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
oldEndpoint = tokenToEndpointMap.get(token);
if (oldEndpoint != null && !oldEndpoint.equals(endpoint) && !oldEndpoint.equals(original))
throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
}
bootstrapTokens.removeValue(endpoint);
for (Token token : tokens)
bootstrapTokens.put(token, endpoint);
}
finally
{
lock.writeLock().unlock();
}
}
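/**
 * Marks {@code newNode} as the replacement for {@code oldNode}. The replacing node must claim
 * exactly the token set currently owned by the node it replaces; those tokens are then
 * registered as bootstrap tokens for the new node.
 */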
public void addReplaceTokens(Collection<Token> replacingTokens, InetAddress newNode, InetAddress oldNode)
{
assert replacingTokens != null && !replacingTokens.isEmpty();
assert newNode != null && oldNode != null;
lock.writeLock().lock();
try
{
Collection<Token> oldNodeTokens = tokenToEndpointMap.inverse().get(oldNode);
if (!replacingTokens.containsAll(oldNodeTokens) || !oldNodeTokens.containsAll(replacingTokens))
{
throw new RuntimeException(String.format("Node %s is trying to replace node %s with tokens %s with a " +
"different set of tokens %s.", newNode, oldNode, oldNodeTokens,
replacingTokens));
}
logger.debug("Replacing {} with {}", newNode, oldNode);
replacementToOriginal.put(newNode, oldNode);
addBootstrapTokens(replacingTokens, newNode, oldNode);
}
finally
{
lock.writeLock().unlock();
}
}
public Optional<InetAddress> getReplacementNode(InetAddress endpoint)
{
lock.readLock().lock();
try
{
return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint));
}
finally
{
lock.readLock().unlock();
}
}
public Optional<InetAddress> getReplacingNode(InetAddress endpoint)
{
lock.readLock().lock();
try
{
return Optional.ofNullable((replacementToOriginal.get(endpoint)));
}
finally
{
lock.readLock().unlock();
}
}
public void removeBootstrapTokens(Collection<Token> tokens)
{
assert tokens != null && !tokens.isEmpty();
lock.writeLock().lock();
try
{
for (Token token : tokens)
bootstrapTokens.remove(token);
}
finally
{
lock.writeLock().unlock();
}
}
public void addLeavingEndpoint(InetAddress endpoint)
{
assert endpoint != null;
lock.writeLock().lock();
try
{
leavingEndpoints.add(endpoint);
}
finally
{
lock.writeLock().unlock();
}
}
public void addMovingEndpoint(Token token, InetAddress endpoint)
{
assert endpoint != null;
lock.writeLock().lock();
try
{
movingEndpoints.add(Pair.create(token, endpoint));
}
finally
{
lock.writeLock().unlock();
}
}
public void removeEndpoint(InetAddress endpoint)
{
assert endpoint != null;
lock.writeLock().lock();
try
{
bootstrapTokens.removeValue(endpoint);
tokenToEndpointMap.removeValue(endpoint);
topology = topology.unbuild().removeEndpoint(endpoint).build();
leavingEndpoints.remove(endpoint);
if (replacementToOriginal.remove(endpoint) != null)
{
logger.debug("Node {} failed during replace.", endpoint);
}
endpointToHostIdMap.remove(endpoint);
sortedTokens = sortTokens();
invalidateCachedRings();
}
finally
{
lock.writeLock().unlock();
}
}
public Topology updateTopology(InetAddress endpoint)
{
assert endpoint != null;
lock.writeLock().lock();
try
{
logger.info("Updating topology for {}", endpoint);
topology = topology.unbuild().updateEndpoint(endpoint).build();
invalidateCachedRings();
return topology;
}
finally
{
lock.writeLock().unlock();
}
}
public Topology updateTopology()
{
lock.writeLock().lock();
try
{
logger.info("Updating topology for all endpoints that have changed");
topology = topology.unbuild().updateEndpoints().build();
invalidateCachedRings();
return topology;
}
finally
{
lock.writeLock().unlock();
}
}
public void removeFromMoving(InetAddress endpoint)
{
assert endpoint != null;
lock.writeLock().lock();
try
{
for (Pair<Token, InetAddress> pair : movingEndpoints)
{
if (pair.right.equals(endpoint))
{
movingEndpoints.remove(pair);
break;
}
}
invalidateCachedRings();
}
finally
{
lock.writeLock().unlock();
}
}
public Collection<Token> getTokens(InetAddress endpoint)
{
assert endpoint != null;
assert isMember(endpoint);
lock.readLock().lock();
try
{
return new ArrayList<>(tokenToEndpointMap.inverse().get(endpoint));
}
finally
{
lock.readLock().unlock();
}
}
@Deprecated
public Token getToken(InetAddress endpoint)
{
return getTokens(endpoint).iterator().next();
}
public boolean isMember(InetAddress endpoint)
{
assert endpoint != null;
lock.readLock().lock();
try
{
return tokenToEndpointMap.inverse().containsKey(endpoint);
}
finally
{
lock.readLock().unlock();
}
}
public boolean isLeaving(InetAddress endpoint)
{
assert endpoint != null;
lock.readLock().lock();
try
{
return leavingEndpoints.contains(endpoint);
}
finally
{
lock.readLock().unlock();
}
}
public boolean isMoving(InetAddress endpoint)
{
assert endpoint != null;
lock.readLock().lock();
try
{
for (Pair<Token, InetAddress> pair : movingEndpoints)
{
if (pair.right.equals(endpoint))
return true;
}
return false;
}
finally
{
lock.readLock().unlock();
}
}
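/** Cached clone handed out by cachedOnlyTokenMap(); cleared by invalidateCachedRings(). */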
private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<>();
public TokenMetadata cloneOnlyTokenMap()
{
lock.readLock().lock();
try
{
return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
HashBiMap.create(endpointToHostIdMap),
topology,
partitioner);
}
finally
{
lock.readLock().unlock();
}
}
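/**
 * Returns a cached, read-only snapshot of the token-to-endpoint state. The fast path is a
 * lock-free read of the cache; on a miss the clone is built under {@code synchronized (this)}
 * so only one thread pays the cloning cost. The cache is dropped via invalidateCachedRings().
 */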
public TokenMetadata cachedOnlyTokenMap()
{
TokenMetadata tm = cachedTokenMap.get();
if (tm != null)
return tm;
synchronized (this)
{
if ((tm = cachedTokenMap.get()) != null)
return tm;
tm = cloneOnlyTokenMap();
cachedTokenMap.set(tm);
return tm;
}
}
public TokenMetadata cloneAfterAllLeft()
{
lock.readLock().lock();
try
{
return removeEndpoints(cloneOnlyTokenMap(), leavingEndpoints);
}
finally
{
lock.readLock().unlock();
}
}
private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddress> leavingEndpoints)
{
for (InetAddress endpoint : leavingEndpoints)
allLeftMetadata.removeEndpoint(endpoint);
return allLeftMetadata;
}
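/**
 * @return a clone of the current state as it will look once all leaving endpoints have left and
 * all moving endpoints have arrived at their new tokens; bootstrapping nodes are not included.
 */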
public TokenMetadata cloneAfterAllSettled()
{
lock.readLock().lock();
try
{
TokenMetadata metadata = cloneOnlyTokenMap();
for (InetAddress endpoint : leavingEndpoints)
metadata.removeEndpoint(endpoint);
for (Pair<Token, InetAddress> pair : movingEndpoints)
metadata.updateNormalToken(pair.left, pair.right);
return metadata;
}
finally
{
lock.readLock().unlock();
}
}
public InetAddress getEndpoint(Token token)
{
lock.readLock().lock();
try
{
return tokenToEndpointMap.get(token);
}
finally
{
lock.readLock().unlock();
}
}
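/** @return the primary range (predecessor, token] for each of the given tokens. */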
public Collection<Range<Token>> getPrimaryRangesFor(Collection<Token> tokens)
{
Collection<Range<Token>> ranges = new ArrayList<>(tokens.size());
for (Token right : tokens)
ranges.add(new Range<>(getPredecessor(right), right));
return ranges;
}
@Deprecated
public Range<Token> getPrimaryRangeFor(Token right)
{
return getPrimaryRangesFor(Arrays.asList(right)).iterator().next();
}
public ArrayList<Token> sortedTokens()
{
return sortedTokens;
}
public Multimap<Range<Token>, InetAddress> getPendingRangesMM(String keyspaceName)
{
Multimap<Range<Token>, InetAddress> map = HashMultimap.create();
PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName);
if (pendingRangeMaps != null)
{
for (Map.Entry<Range<Token>, List<InetAddress>> entry : pendingRangeMaps)
{
Range<Token> range = entry.getKey();
for (InetAddress address : entry.getValue())
{
map.put(range, address);
}
}
}
return map;
}
public PendingRangeMaps getPendingRanges(String keyspaceName)
{
return this.pendingRanges.get(keyspaceName);
}
public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddress endpoint)
{
List<Range<Token>> ranges = new ArrayList<>();
for (Map.Entry<Range<Token>, InetAddress> entry : getPendingRangesMM(keyspaceName).entries())
{
if (entry.getValue().equals(endpoint))
{
ranges.add(entry.getKey());
}
}
return ranges;
}
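/**
 * Recomputes the pending ranges for {@code keyspaceName} under the given replication strategy.
 * Calculations are serialised by synchronizing on the pendingRanges map; only the snapshot of
 * bootstrap, leaving and moving state is taken under the read lock, so the potentially expensive
 * calculation itself runs without blocking readers of this TokenMetadata.
 */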
public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
{
long startedAt = System.currentTimeMillis();
synchronized (pendingRanges)
{
BiMultiValMap<Token, InetAddress> bootstrapTokensClone;
Set<InetAddress> leavingEndpointsClone;
Set<Pair<Token, InetAddress>> movingEndpointsClone;
TokenMetadata metadata;
lock.readLock().lock();
try
{
if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
{
if (logger.isTraceEnabled())
logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
pendingRanges.put(keyspaceName, new PendingRangeMaps());
return;
}
if (logger.isDebugEnabled())
logger.debug("Starting pending range calculation for {}", keyspaceName);
bootstrapTokensClone = new BiMultiValMap<>(this.bootstrapTokens);
leavingEndpointsClone = new HashSet<>(this.leavingEndpoints);
movingEndpointsClone = new HashSet<>(this.movingEndpoints);
metadata = this.cloneOnlyTokenMap();
}
finally
{
lock.readLock().unlock();
}
pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokensClone,
leavingEndpointsClone, movingEndpointsClone));
long took = System.currentTimeMillis() - startedAt;
if (logger.isDebugEnabled())
logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took);
if (logger.isTraceEnabled())
logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
}
}
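/**
 * Builds the pending ranges from the three sources of ring change:
 * 1) leaving nodes: ranges whose replica set gains endpoints once the leavers are removed;
 * 2) bootstrapping nodes: each joining node is pending for the ranges it would own once joined;
 * 3) moving nodes: ranges around both the old and the new token whose replica set changes,
 *    minus any ranges the gaining endpoint already owns.
 */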
private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrategy strategy,
TokenMetadata metadata,
BiMultiValMap<Token, InetAddress> bootstrapTokens,
Set<InetAddress> leavingEndpoints,
Set<Pair<Token, InetAddress>> movingEndpoints)
{
PendingRangeMaps newPendingRanges = new PendingRangeMaps();
Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(metadata);
TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints);
Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
for (InetAddress endpoint : leavingEndpoints)
affectedRanges.addAll(addressRanges.get(endpoint));
for (Range<Token> range : affectedRanges)
{
Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
{
newPendingRanges.addPendingRange(range, address);
}
}
Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
for (InetAddress endpoint : bootstrapAddresses.keySet())
{
Collection<Token> tokens = bootstrapAddresses.get(endpoint);
allLeftMetadata.updateNormalTokens(tokens, endpoint);
for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
{
newPendingRanges.addPendingRange(range, endpoint);
}
allLeftMetadata.removeEndpoint(endpoint);
}
for (Pair<Token, InetAddress> moving : movingEndpoints)
{
Set<Range<Token>> moveAffectedRanges = new HashSet<>();
InetAddress endpoint = moving.right;
for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
{
moveAffectedRanges.add(range);
}
allLeftMetadata.updateNormalToken(moving.left, endpoint);
for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
{
moveAffectedRanges.add(range);
}
for(Range<Token> range : moveAffectedRanges)
{
Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
for (final InetAddress address : difference)
{
Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
newRanges.removeAll(oldRanges);
for(Range<Token> newRange : newRanges)
{
for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
{
newPendingRanges.addPendingRange(pendingRange, address);
}
}
}
}
allLeftMetadata.removeEndpoint(endpoint);
}
return newPendingRanges;
}
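/** @return the token immediately before {@code token} in the ring, wrapping to the last token when given the first. */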
public Token getPredecessor(Token token)
{
List<Token> tokens = sortedTokens();
int index = Collections.binarySearch(tokens, token);
assert index >= 0 : token + " not found in " + tokenToEndpointMapKeysAsStrings();
return index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1);
}
public Token getSuccessor(Token token)
{
List<Token> tokens = sortedTokens();
int index = Collections.binarySearch(tokens, token);
assert index >= 0 : token + " not found in " + tokenToEndpointMapKeysAsStrings();
return (index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1);
}
private String tokenToEndpointMapKeysAsStrings()
{
lock.readLock().lock();
try
{
return StringUtils.join(tokenToEndpointMap.keySet(), ", ");
}
finally
{
lock.readLock().unlock();
}
}
public BiMultiValMap<Token, InetAddress> getBootstrapTokens()
{
lock.readLock().lock();
try
{
return new BiMultiValMap<>(bootstrapTokens);
}
finally
{
lock.readLock().unlock();
}
}
public Set<InetAddress> getAllEndpoints()
{
lock.readLock().lock();
try
{
return ImmutableSet.copyOf(endpointToHostIdMap.keySet());
}
finally
{
lock.readLock().unlock();
}
}
public Set<InetAddress> getLeavingEndpoints()
{
lock.readLock().lock();
try
{
return ImmutableSet.copyOf(leavingEndpoints);
}
finally
{
lock.readLock().unlock();
}
}
public Set<Pair<Token, InetAddress>> getMovingEndpoints()
{
lock.readLock().lock();
try
{
return ImmutableSet.copyOf(movingEndpoints);
}
finally
{
lock.readLock().unlock();
}
}
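/**
 * Index of the first ring token greater than or equal to {@code start}; past the end of the ring it
 * wraps to 0, or to -1 when {@code insertMin} asks for a synthetic minimum token. For example
 * (hypothetical tokens), with ring [10, 20, 30]: start 15 gives 1, start 35 gives 0 (or -1 with insertMin).
 */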
public static int firstTokenIndex(final ArrayList<Token> ring, Token start, boolean insertMin)
{
assert ring.size() > 0;
int i = Collections.binarySearch(ring, start);
if (i < 0)
{
i = (i + 1) * (-1);
if (i >= ring.size())
i = insertMin ? -1 : 0;
}
return i;
}
public static Token firstToken(final ArrayList<Token> ring, Token start)
{
return ring.get(firstTokenIndex(ring, start, false));
}
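/**
 * Iterates the ring starting at the first token at or after {@code start}, wrapping around until every
 * token has been returned once. With {@code includeMin}, a synthetic minimum token is yielded in the
 * wrap-around position when the ring does not already start at the minimum.
 */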
public static Iterator<Token> ringIterator(final ArrayList<Token> ring, Token start, boolean includeMin)
{
if (ring.isEmpty())
return includeMin ? Iterators.singletonIterator(start.getPartitioner().getMinimumToken())
: Collections.emptyIterator();
final boolean insertMin = includeMin && !ring.get(0).isMinimum();
final int startIndex = firstTokenIndex(ring, start, insertMin);
return new AbstractIterator<Token>()
{
int j = startIndex;
protected Token computeNext()
{
if (j < -1)
return endOfData();
try
{
if (j == -1)
return start.getPartitioner().getMinimumToken();
return ring.get(j);
}
finally
{
j++;
if (j == ring.size())
j = insertMin ? -1 : 0;
if (j == startIndex)
j = -2;
}
}
};
}
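/** Wipes all state (tokens, host IDs, pending ranges, topology); as the name suggests, only safe for tests. */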
public void clearUnsafe()
{
lock.writeLock().lock();
try
{
tokenToEndpointMap.clear();
endpointToHostIdMap.clear();
bootstrapTokens.clear();
leavingEndpoints.clear();
pendingRanges.clear();
movingEndpoints.clear();
sortedTokens.clear();
topology = Topology.empty();
invalidateCachedRings();
}
finally
{
lock.writeLock().unlock();
}
}
public String toString()
{
StringBuilder sb = new StringBuilder();
lock.readLock().lock();
try
{
Multimap<InetAddress, Token> endpointToTokenMap = tokenToEndpointMap.inverse();
Set<InetAddress> eps = endpointToTokenMap.keySet();
if (!eps.isEmpty())
{
sb.append("Normal Tokens:");
sb.append(System.getProperty("line.separator"));
for (InetAddress ep : eps)
{
sb.append(ep);
sb.append(':');
sb.append(endpointToTokenMap.get(ep));
sb.append(System.getProperty("line.separator"));
}
}
if (!bootstrapTokens.isEmpty())
{
sb.append("Bootstrapping Tokens:" );
sb.append(System.getProperty("line.separator"));
for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
{
sb.append(entry.getValue()).append(':').append(entry.getKey());
sb.append(System.getProperty("line.separator"));
}
}
if (!leavingEndpoints.isEmpty())
{
sb.append("Leaving Endpoints:");
sb.append(System.getProperty("line.separator"));
for (InetAddress ep : leavingEndpoints)
{
sb.append(ep);
sb.append(System.getProperty("line.separator"));
}
}
if (!pendingRanges.isEmpty())
{
sb.append("Pending Ranges:");
sb.append(System.getProperty("line.separator"));
sb.append(printPendingRanges());
}
}
finally
{
lock.readLock().unlock();
}
return sb.toString();
}
private String printPendingRanges()
{
StringBuilder sb = new StringBuilder();
for (PendingRangeMaps pendingRangeMaps : pendingRanges.values())
{
sb.append(pendingRangeMaps.printPendingRanges());
}
return sb.toString();
}
public Collection<InetAddress> pendingEndpointsFor(Token token, String keyspaceName)
{
PendingRangeMaps pendingRangeMaps = this.pendingRanges.get(keyspaceName);
if (pendingRangeMaps == null)
return Collections.emptyList();
return pendingRangeMaps.pendingEndpointsFor(token);
}
public Collection<InetAddress> getWriteEndpoints(Token token, String keyspaceName, Collection<InetAddress> naturalEndpoints)
{
return ImmutableList.copyOf(Iterables.concat(naturalEndpoints, pendingEndpointsFor(token, keyspaceName)));
}
public Multimap<InetAddress, Token> getEndpointToTokenMapForReading()
{
lock.readLock().lock();
try
{
Multimap<InetAddress, Token> cloned = HashMultimap.create();
for (Map.Entry<Token, InetAddress> entry : tokenToEndpointMap.entrySet())
cloned.put(entry.getValue(), entry.getKey());
return cloned;
}
finally
{
lock.readLock().unlock();
}
}
public Map<Token, InetAddress> getNormalAndBootstrappingTokenToEndpointMap()
{
lock.readLock().lock();
try
{
Map<Token, InetAddress> map = new HashMap<>(tokenToEndpointMap.size() + bootstrapTokens.size());
map.putAll(tokenToEndpointMap);
map.putAll(bootstrapTokens);
return map;
}
finally
{
lock.readLock().unlock();
}
}
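/**
 * @return the DC/rack topology of this metadata. The assert guards against calling this on the global
 * instance held by StorageService; it is meant to be used on a cloned snapshot.
 */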
public Topology getTopology()
{
assert this != StorageService.instance.getTokenMetadata();
return topology;
}
public long getRingVersion()
{
return ringVersion;
}
public void invalidateCachedRings()
{
ringVersion++;
cachedTokenMap.set(null);
}
public DecoratedKey decorateKey(ByteBuffer key)
{
return partitioner.decorateKey(key);
}
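/**
 * Immutable view of which datacenter and rack each endpoint belongs to, as reported by the configured
 * snitch. Rebuilt through the Builder whenever endpoints are added, removed or relocated.
 */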
public static class Topology
{
private final ImmutableMultimap<String, InetAddress> dcEndpoints;
private final ImmutableMap<String, ImmutableMultimap<String, InetAddress>> dcRacks;
private final ImmutableMap<InetAddress, Pair<String, String>> currentLocations;
private Topology(Builder builder)
{
this.dcEndpoints = ImmutableMultimap.copyOf(builder.dcEndpoints);
ImmutableMap.Builder<String, ImmutableMultimap<String, InetAddress>> dcRackBuilder = ImmutableMap.builder();
for (Map.Entry<String, Multimap<String, InetAddress>> entry : builder.dcRacks.entrySet())
dcRackBuilder.put(entry.getKey(), ImmutableMultimap.copyOf(entry.getValue()));
this.dcRacks = dcRackBuilder.build();
this.currentLocations = ImmutableMap.copyOf(builder.currentLocations);
}
public Multimap<String, InetAddress> getDatacenterEndpoints()
{
return dcEndpoints;
}
public ImmutableMap<String, ImmutableMultimap<String, InetAddress>> getDatacenterRacks()
{
return dcRacks;
}
public Pair<String, String> getLocation(InetAddress addr)
{
return currentLocations.get(addr);
}
Builder unbuild()
{
return new Builder(this);
}
static Builder builder()
{
return new Builder();
}
static Topology empty()
{
return builder().build();
}
private static class Builder
{
private final Multimap<String, InetAddress> dcEndpoints;
private final Map<String, Multimap<String, InetAddress>> dcRacks;
private final Map<InetAddress, Pair<String, String>> currentLocations;
Builder()
{
this.dcEndpoints = HashMultimap.create();
this.dcRacks = new HashMap<>();
this.currentLocations = new HashMap<>();
}
Builder(Topology from)
{
this.dcEndpoints = HashMultimap.create(from.dcEndpoints);
this.dcRacks = Maps.newHashMapWithExpectedSize(from.dcRacks.size());
for (Map.Entry<String, ImmutableMultimap<String, InetAddress>> entry : from.dcRacks.entrySet())
dcRacks.put(entry.getKey(), HashMultimap.create(entry.getValue()));
this.currentLocations = new HashMap<>(from.currentLocations);
}
Builder addEndpoint(InetAddress ep)
{
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
String dc = snitch.getDatacenter(ep);
String rack = snitch.getRack(ep);
Pair<String, String> current = currentLocations.get(ep);
if (current != null)
{
if (current.left.equals(dc) && current.right.equals(rack))
return this;
doRemoveEndpoint(ep, current);
}
doAddEndpoint(ep, dc, rack);
return this;
}
private void doAddEndpoint(InetAddress ep, String dc, String rack)
{
dcEndpoints.put(dc, ep);
if (!dcRacks.containsKey(dc))
dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
dcRacks.get(dc).put(rack, ep);
currentLocations.put(ep, Pair.create(dc, rack));
}
Builder removeEndpoint(InetAddress ep)
{
if (!currentLocations.containsKey(ep))
return this;
doRemoveEndpoint(ep, currentLocations.remove(ep));
return this;
}
private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
{
dcRacks.get(current.left).remove(current.right, ep);
dcEndpoints.remove(current.left, ep);
}
Builder updateEndpoint(InetAddress ep)
{
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
if (snitch == null || !currentLocations.containsKey(ep))
return this;
updateEndpoint(ep, snitch);
return this;
}
Builder updateEndpoints()
{
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
if (snitch == null)
return this;
for (InetAddress ep : currentLocations.keySet())
updateEndpoint(ep, snitch);
return this;
}
private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
{
Pair<String, String> current = currentLocations.get(ep);
String dc = snitch.getDatacenter(ep);
String rack = snitch.getRack(ep);
if (dc.equals(current.left) && rack.equals(current.right))
return;
doRemoveEndpoint(ep, current);
doAddEndpoint(ep, dc, rack);
}
Topology build()
{
return new Topology(this);
}
}
}
}