package org.apache.cassandra.dht.tokenallocator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
/**
 * Token allocator for rings with a single replica per range ({@link #getReplicas()} is 1).
 *
 * With one replica, each token owns exactly the span between its predecessor in the ring and
 * itself, so a unit's ownership is the sum of the spans of its tokens. New units receive tokens
 * by splitting spans that belong to existing units.
 *
 * The ordering of {@code Weighted} is defined outside this file; the comments below presume that
 * the priority queues yield the heaviest element first -- TODO confirm against TokenAllocatorBase.
 */
public class NoReplicationTokenAllocator<Unit> extends TokenAllocatorBase<Unit>
{
    /** Every known unit weighted by its ownership; rebuilt by {@link #createTokenInfos(Map)}. */
    PriorityQueue<Weighted<UnitInfo>> sortedUnits = Queues.newPriorityQueue();

    /** For each unit, its tokens weighted by the span each token owns. */
    Map<Unit, PriorityQueue<Weighted<TokenInfo>>> tokensInUnits = Maps.newHashMap();

    /** Upper bound on the fraction passed to {@code partitioner.split} when splitting a span. */
    private static final double MAX_TAKEOVER_RATIO = 0.90;

    /** Lower bound on the fraction passed to {@code partitioner.split} when splitting a span. */
    private static final double MIN_TAKEOVER_RATIO = 1.0 - MAX_TAKEOVER_RATIO;

    public NoReplicationTokenAllocator(NavigableMap<Token, Unit> sortedTokens,
                                       ReplicationStrategy<Unit> strategy,
                                       IPartitioner partitioner)
    {
        super(sortedTokens, strategy, partitioner);
    }

    /**
     * Builds the circular list of {@code TokenInfo} from {@code sortedTokens} and repopulates
     * {@link #tokensInUnits} and {@link #sortedUnits} from scratch.
     *
     * @param units unit infos keyed by unit; each unit's ownership is increased by the spans of its tokens
     * @return the first token of the ring, or {@code null} when {@code units} or the ring is empty
     */
    private TokenInfo<Unit> createTokenInfos(Map<Unit, UnitInfo<Unit>> units)
    {
        if (units.isEmpty())
            return null;

        // Link the tokens, in sorted order, into a circular list.
        TokenInfo<Unit> prev = null;
        TokenInfo<Unit> first = null;
        for (Map.Entry<Token, Unit> en : sortedTokens.entrySet())
        {
            Token t = en.getKey();
            UnitInfo<Unit> ni = units.get(en.getValue());
            TokenInfo<Unit> ti = new TokenInfo<>(t, ni);
            first = ti.insertAfter(first, prev);
            prev = ti;
        }

        tokensInUnits.clear();
        sortedUnits.clear();

        // The ring can be empty even though units is not (e.g. a unit registered with zero
        // tokens); there is then nothing to walk, and dereferencing first would fail.
        if (first != null)
        {
            TokenInfo<Unit> curr = first;
            do
            {
                populateTokenInfoAndAdjustUnit(curr);
                curr = curr.next;
            } while (curr != first);
        }

        for (UnitInfo<Unit> unitInfo : units.values())
        {
            sortedUnits.add(new Weighted<UnitInfo>(unitInfo.ownership, unitInfo));
        }

        return first;
    }

    /**
     * Rebuilds the allocator state from {@code sortedTokens}, using the unit infos returned by
     * {@code createUnitInfos} for a fresh group map.
     */
    protected void createTokenInfos()
    {
        createTokenInfos(createUnitInfos(Maps.newHashMap()));
    }

    /**
     * Sets the token's owned span to (previous token in ring, this token], adds that span to the
     * owning unit's ownership and records the token in the unit's queue in {@link #tokensInUnits}.
     *
     * The ownership is added, not recomputed: a caller re-populating a token must first subtract
     * the token's previous span from the unit and remove its old queue entry.
     */
    private void populateTokenInfoAndAdjustUnit(TokenInfo<Unit> token)
    {
        token.replicationStart = token.prevInRing().token;
        token.replicationThreshold = token.token;
        token.replicatedOwnership = token.replicationStart.size(token.token);
        token.owningUnit.ownership += token.replicatedOwnership;

        PriorityQueue<Weighted<TokenInfo>> unitTokens = tokensInUnits.get(token.owningUnit.unit);
        if (unitTokens == null)
        {
            unitTokens = Queues.newPriorityQueue();
            tokensInUnits.put(token.owningUnit.unit, unitTokens);
        }
        unitTokens.add(new Weighted<TokenInfo>(token.replicatedOwnership, token));
    }

    /**
     * Assigns {@code numTokens} random tokens to the new unit, skipping tokens already present in
     * the ring, then registers the unit in {@code unitInfos} and rebuilds the allocator state.
     *
     * @return the tokens assigned to the new unit
     */
    private Collection<Token> generateRandomTokens(UnitInfo<Unit> newUnit, int numTokens, Map<Unit, UnitInfo<Unit>> unitInfos)
    {
        Set<Token> tokens = new HashSet<>(numTokens);
        while (tokens.size() < numTokens)
        {
            Token token = partitioner.getRandomToken();
            if (!sortedTokens.containsKey(token))
            {
                tokens.add(token);
                sortedTokens.put(token, newUnit.unit);
            }
        }
        unitInfos.put(newUnit.unit, newUnit);
        createTokenInfos(unitInfos);
        return tokens;
    }

    /**
     * Adds a unit to the ring and chooses {@code numTokens} tokens for it.
     *
     * When there are no existing units, or the ring holds fewer tokens than requested, the tokens
     * are random. Otherwise units are taken from the head of {@link #sortedUnits} and a share of
     * their tokens' spans is split off for the new unit.
     *
     * @param newUnit   the unit to add; must not already be tracked by this allocator
     * @param numTokens the number of tokens to assign to the new unit
     * @return the tokens assigned to the new unit
     */
    public Collection<Token> addUnit(Unit newUnit, int numTokens)
    {
        assert !tokensInUnits.containsKey(newUnit);

        Map<Object, GroupInfo> groups = Maps.newHashMap();
        UnitInfo<Unit> newUnitInfo = new UnitInfo<>(newUnit, 0, groups, strategy);
        Map<Unit, UnitInfo<Unit>> unitInfos = createUnitInfos(groups);

        // Not enough existing spans to split (or none at all): fall back to random tokens.
        if (unitInfos.isEmpty() || numTokens > sortedTokens.size())
            return generateRandomTokens(newUnitInfo, numTokens, unitInfos);

        TokenInfo<Unit> head = createTokenInfos(unitInfos);

        // Select the units to take ownership from, removing them from sortedUnits, and compute
        // the ownership each selected unit and the new unit should end up with.
        double targetAverage = 0.0;
        double sum = 0.0;
        List<Weighted<UnitInfo>> unitsToChange = new ArrayList<>();

        for (int i = 0; i < numTokens; i++)
        {
            Weighted<UnitInfo> unit = sortedUnits.peek();

            if (unit == null)
                break;

            sum += unit.weight;
            // +2: the candidate itself and the new unit share the sum with the units selected so far.
            double average = sum / (unitsToChange.size() + 2);
            if (unit.weight <= average)
                // Taking from this candidate would not lower its ownership; presumably later
                // candidates are no heavier, so stop here -- TODO confirm queue order.
                break;

            sortedUnits.remove();
            unitsToChange.add(unit);
            targetAverage = average;
        }

        List<Token> newTokens = Lists.newArrayListWithCapacity(numTokens);

        int nr = 0;
        for (Weighted<UnitInfo> unit : unitsToChange)
        {
            // Spread numTokens evenly over the selected units; the first (numTokens % size) units get one extra.
            int tokensToChange = numTokens / unitsToChange.size() + (nr < numTokens % unitsToChange.size() ? 1 : 0);

            Queue<Weighted<TokenInfo>> unitTokens = tokensInUnits.get(unit.value.unit);
            List<Weighted<TokenInfo>> tokens = Lists.newArrayListWithCapacity(tokensToChange);

            double workWeight = 0;
            // Pull the tokens to split from the head of the unit's queue and total their spans.
            // NOTE(review): remove() throws NoSuchElementException if this unit holds fewer than
            // tokensToChange tokens; only the ring-wide token count is checked above.
            for (int i = 0; i < tokensToChange; i++)
            {
                Weighted<TokenInfo> wt = unitTokens.remove();
                tokens.add(wt);
                workWeight += wt.weight;
                // Subtracted here because populateTokenInfoAndAdjustUnit re-adds the shrunken span below.
                unit.value.ownership -= wt.weight;
            }

            double toTakeOver = unit.weight - targetAverage;
            // The ratio depends only on the unit-level totals, so it is the same for every token split here.
            double slice = takeoverRatio(toTakeOver, workWeight);

            for (Weighted<TokenInfo> wt : tokens)
            {
                Token token = partitioner.split(wt.value.prevInRing().token, wt.value.token, slice);

                sortedTokens.put(token, newUnit);

                // The new token goes immediately before the token whose span is being split.
                TokenInfo<Unit> ti = new TokenInfo<>(token, newUnitInfo);
                ti.insertAfter(head, wt.value.prevInRing());

                populateTokenInfoAndAdjustUnit(ti);
                populateTokenInfoAndAdjustUnit(wt.value);
                newTokens.add(token);
            }

            // Put the unit back with its updated ownership as the weight.
            sortedUnits.add(new Weighted<>(unit.value.ownership, unit.value));
            ++nr;
        }
        sortedUnits.add(new Weighted<>(newUnitInfo.ownership, newUnitInfo));

        return newTokens;
    }

    /**
     * Computes the ratio passed to {@code partitioner.split} for each span being split.
     *
     * @param toTakeOver the ownership the unit should give up
     * @param workWeight the total span of the tokens selected for splitting
     * @return {@code toTakeOver / workWeight} clamped to [MIN_TAKEOVER_RATIO, MAX_TAKEOVER_RATIO]
     *         when the selected spans exceed {@code toTakeOver}; MAX_TAKEOVER_RATIO otherwise
     */
    private static double takeoverRatio(double toTakeOver, double workWeight)
    {
        if (toTakeOver < workWeight)
        {
            double slice = toTakeOver / workWeight;

            if (slice < MIN_TAKEOVER_RATIO)
                slice = MIN_TAKEOVER_RATIO;
            if (slice > MAX_TAKEOVER_RATIO)
                slice = MAX_TAKEOVER_RATIO;
            return slice;
        }
        return MAX_TAKEOVER_RATIO;
    }

    /**
     * Removes the given unit from {@link #sortedUnits} and {@link #tokensInUnits}, and removes
     * its tracked tokens from {@code sortedTokens}. A unit with no entry in
     * {@link #tokensInUnits} has no tracked tokens, so nothing is removed from the ring for it.
     *
     * @param n the unit to remove
     */
    void removeUnit(Unit n)
    {
        Iterator<Weighted<UnitInfo>> it = sortedUnits.iterator();
        while (it.hasNext())
        {
            if (it.next().value.unit.equals(n))
            {
                it.remove();
                break;
            }
        }

        PriorityQueue<Weighted<TokenInfo>> tokenInfos = tokensInUnits.remove(n);
        if (tokenInfos == null)
            return;

        for (Weighted<TokenInfo> tokenInfo : tokenInfos)
        {
            sortedTokens.remove(tokenInfo.value.token);
        }
    }

    /**
     * @return the replication factor handled by this allocator, always 1
     */
    public int getReplicas()
    {
        return 1;
    }
}