package org.apache.cassandra.dht;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.db.CachedHashDecoratedKey;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.IntegerType;
import org.apache.cassandra.db.marshal.PartitionerDefinedOrder;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.GuidGenerator;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.Pair;
/**
 * Partitioner that places keys on the ring according to the MD5 digest of the key bytes.
 *
 * Tokens are {@link BigInteger} values. Hashed tokens are the absolute value of the digest
 * (see {@link #hashToBigInteger(ByteBuffer)}); {@link #isValidToken(BigInteger)} accepts the
 * inclusive range {@code [0, 2**127]}. The symbolic {@link #MINIMUM} token ({@code -1}) is
 * returned for empty keys and is treated as zero by {@link #midpoint} and {@link #split}.
 */
public class RandomPartitioner implements IPartitioner
{
    public static final BigInteger ZERO = BigInteger.ZERO;
    /** Symbolic minimum token ({@code -1}); sorts before every hashed token. */
    public static final BigIntegerToken MINIMUM = new BigIntegerToken("-1");
    /** Upper bound of the token range, {@code 2**127}. */
    public static final BigInteger MAXIMUM = BigInteger.valueOf(2).pow(127);

    /**
     * Per-thread MD5 digest. {@code get()} is overridden so that every caller receives
     * a digest that has just been reset, whatever state the previous user left it in.
     */
    private static final ThreadLocal<MessageDigest> localMD5Digest = new ThreadLocal<MessageDigest>()
    {
        @Override
        protected MessageDigest initialValue()
        {
            return FBUtilities.newMessageDigest("MD5");
        }

        @Override
        public MessageDigest get()
        {
            MessageDigest digest = super.get();
            digest.reset();
            return digest;
        }
    };

    /** Heap size reported by every token: measured once on a sample token built from a one-byte key. */
    private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(new BigIntegerToken(hashToBigInteger(ByteBuffer.allocate(1))));

    public static final RandomPartitioner instance = new RandomPartitioner();
    // Must be initialised after 'instance', which it wraps.
    public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);

    /** Splitter that converts between raw {@link BigInteger} values and {@link BigIntegerToken}s. */
    private final Splitter splitter = new Splitter(this)
    {
        public Token tokenForValue(BigInteger value)
        {
            return new BigIntegerToken(value);
        }

        public BigInteger valueForToken(Token token)
        {
            return ((BigIntegerToken) token).getTokenValue();
        }
    };

    /**
     * Pairs the given key with its token.
     *
     * @param key the raw partition key
     * @return a {@link CachedHashDecoratedKey} holding {@code getToken(key)} and {@code key}
     */
    public DecoratedKey decorateKey(ByteBuffer key)
    {
        return new CachedHashDecoratedKey(getToken(key), key);
    }

    /**
     * Computes the token halfway between two tokens using {@code FBUtilities.midpoint}
     * with 127 significant bits.
     *
     * @param ltoken left token; {@link #MINIMUM} is treated as zero
     * @param rtoken right token; {@link #MINIMUM} is treated as zero
     * @return a token wrapping the first component of the midpoint pair
     */
    public Token midpoint(Token ltoken, Token rtoken)
    {
        // The symbolic MINIMUM token stands in for zero in the arithmetic.
        BigInteger left = ltoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken) ltoken).token;
        BigInteger right = rtoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken) rtoken).token;
        Pair<BigInteger, Boolean> midpair = FBUtilities.midpoint(left, right, 127);
        // Only the BigInteger component is used; the Boolean component is ignored.
        return new BigIntegerToken(midpair.left);
    }

    /**
     * Returns the token located at {@code ratioToLeft} of the way from {@code ltoken}
     * to {@code rtoken}.
     *
     * @param ltoken      left token; {@link #MINIMUM} is treated as zero
     * @param rtoken      right token; {@link #MINIMUM} is treated as zero
     * @param ratioToLeft fraction of the range that lies between {@code ltoken} and the result
     * @return the split token
     */
    public Token split(Token ltoken, Token rtoken, double ratioToLeft)
    {
        BigDecimal left = ltoken.equals(MINIMUM) ? BigDecimal.ZERO : new BigDecimal(((BigIntegerToken) ltoken).token);
        BigDecimal right = rtoken.equals(MINIMUM) ? BigDecimal.ZERO : new BigDecimal(((BigIntegerToken) rtoken).token);
        BigDecimal ratio = BigDecimal.valueOf(ratioToLeft);

        BigInteger newToken;
        if (left.compareTo(right) < 0)
        {
            // Plain range: left + (right - left) * ratio
            newToken = right.subtract(left).multiply(ratio).add(left).toBigInteger();
        }
        else
        {
            // Wrapping range: left + (MAXIMUM + right - left) * ratio, folded back below MAXIMUM.
            BigDecimal max = new BigDecimal(MAXIMUM);
            newToken = max.add(right).subtract(left).multiply(ratio).add(left).toBigInteger().mod(MAXIMUM);
        }

        assert isValidToken(newToken) : "Invalid tokens from split";

        return new BigIntegerToken(newToken);
    }

    /** @return the symbolic {@link #MINIMUM} token */
    public BigIntegerToken getMinimumToken()
    {
        return MINIMUM;
    }

    /**
     * Builds a token by hashing the bytes returned by {@code GuidGenerator.guidAsBytes()}.
     *
     * @return a non-negative token
     */
    public BigIntegerToken getRandomToken()
    {
        // hashToBigInteger already returns an absolute value, so no sign normalisation is needed.
        return new BigIntegerToken(hashToBigInteger(GuidGenerator.guidAsBytes()));
    }

    /**
     * Builds a token by hashing GUID bytes derived from the supplied random source.
     *
     * @param random random source handed to {@code GuidGenerator.guidAsBytes}
     * @return a non-negative token
     */
    public BigIntegerToken getRandomToken(Random random)
    {
        // hashToBigInteger already returns an absolute value, so no sign normalisation is needed.
        return new BigIntegerToken(hashToBigInteger(GuidGenerator.guidAsBytes(random, "host/127.0.0.1", 0)));
    }

    /** @return true when {@code 0 <= token <= MAXIMUM} */
    private boolean isValidToken(BigInteger token)
    {
        return token.compareTo(ZERO) >= 0 && token.compareTo(MAXIMUM) <= 0;
    }

    /** Converts tokens to and from their byte and string representations. */
    private final Token.TokenFactory tokenFactory = new Token.TokenFactory()
    {
        public ByteBuffer toByteArray(Token token)
        {
            BigIntegerToken bigIntegerToken = (BigIntegerToken) token;
            return ByteBuffer.wrap(bigIntegerToken.token.toByteArray());
        }

        public Token fromByteArray(ByteBuffer bytes)
        {
            return new BigIntegerToken(new BigInteger(ByteBufferUtil.getArray(bytes)));
        }

        public String toString(Token token)
        {
            BigIntegerToken bigIntegerToken = (BigIntegerToken) token;
            return bigIntegerToken.token.toString();
        }

        /**
         * Checks that the string parses as an integer within {@code [0, 2**127]}.
         *
         * @throws ConfigurationException if the string is not a number or is out of range
         */
        public void validate(String token) throws ConfigurationException
        {
            try
            {
                if (!isValidToken(new BigInteger(token)))
                    throw new ConfigurationException("Token must be >= 0 and <= 2**127");
            }
            catch (NumberFormatException e)
            {
                // NOTE(review): only the message is propagated; consider chaining 'e' as the cause
                // if ConfigurationException offers a (message, cause) constructor.
                throw new ConfigurationException(e.getMessage());
            }
        }

        // Parses without range checking; callers wanting validation use validate(String) first.
        public Token fromString(String string)
        {
            return new BigIntegerToken(new BigInteger(string));
        }
    };

    public Token.TokenFactory getTokenFactory()
    {
        return tokenFactory;
    }

    /** @return false; token order is the order of the hashes, not of the keys */
    public boolean preservesOrder()
    {
        return false;
    }

    /** Token backed by a {@link BigInteger} value. */
    public static class BigIntegerToken extends ComparableObjectToken<BigInteger>
    {
        static final long serialVersionUID = -5833589141319293006L;

        public BigIntegerToken(BigInteger token)
        {
            super(token);
        }

        // convenience method for testing
        @VisibleForTesting
        public BigIntegerToken(String token)
        {
            this(new BigInteger(token));
        }

        @Override
        public IPartitioner getPartitioner()
        {
            return instance;
        }

        @Override
        public long getHeapSize()
        {
            return HEAP_SIZE;
        }

        /** @return a new token whose value is this token's value plus one */
        public Token increaseSlightly()
        {
            return new BigIntegerToken(token.add(BigInteger.ONE));
        }

        /**
         * Returns the fraction of the ring covered by the range from this token to {@code next}.
         *
         * @param next the token ending the range; must be a {@link BigIntegerToken}
         * @return the size of the range as a fraction of {@code 2**127}
         */
        public double size(Token next)
        {
            BigIntegerToken n = (BigIntegerToken) next;
            // May be negative when the range wraps; corrected below.
            BigInteger v = n.token.subtract(token);
            // Scale by 2**-127 so that the full range has size 1.
            double d = Math.scalb(v.doubleValue(), -127);
            // A non-positive difference means the range wraps around the ring.
            return d > 0.0 ? d : (d + 1.0);
        }
    }

    /**
     * Maps a partition key to its token.
     *
     * @param key the raw partition key
     * @return {@link #MINIMUM} for an empty key, otherwise the token for the key's MD5 hash
     */
    public BigIntegerToken getToken(ByteBuffer key)
    {
        if (key.remaining() == 0)
            return MINIMUM;

        return new BigIntegerToken(hashToBigInteger(key));
    }

    /**
     * Computes, for each token, the fraction of the ring between the preceding token and itself.
     * The first token's predecessor is the last token in the list.
     *
     * @param sortedTokens the ring's tokens, in order; every element must be a {@link BigIntegerToken}
     * @return a map from each token to its share of the ring ({@code 1.0} when there is a single token)
     * @throws RuntimeException if {@code sortedTokens} is empty
     */
    public Map<Token, Float> describeOwnership(List<Token> sortedTokens)
    {
        Map<Token, Float> ownerships = new HashMap<>();
        Iterator<Token> i = sortedTokens.iterator();

        // 0-case
        if (!i.hasNext())
            throw new RuntimeException("No nodes present in the cluster. Has this node finished starting up?");

        // 1-case: a single token owns the whole ring
        if (sortedTokens.size() == 1)
        {
            ownerships.put(i.next(), 1.0f);
            return ownerships;
        }

        // n-case
        final BigDecimal ringSize = new BigDecimal(MAXIMUM);
        Token start = i.next();
        BigInteger startValue = ((BigIntegerToken) start).token;
        BigInteger previous = startValue;
        while (i.hasNext())
        {
            Token t = i.next();
            BigInteger current = ((BigIntegerToken) t).token;
            ownerships.put(t, ownedFraction(previous, current, ringSize));
            previous = current;
        }

        // The first token owns the wrapping range that starts at the last token.
        ownerships.put(start, ownedFraction(previous, startValue, ringSize));
        return ownerships;
    }

    /**
     * Fraction of the ring lying between {@code from} and {@code to}, wrapping modulo {@link #MAXIMUM}.
     *
     * @param from     value of the preceding token
     * @param to       value of the owning token
     * @param ringSize {@link #MAXIMUM} as a {@link BigDecimal}
     */
    private static float ownedFraction(BigInteger from, BigInteger to, BigDecimal ringSize)
    {
        BigInteger width = to.subtract(from).add(MAXIMUM).mod(MAXIMUM);
        return new BigDecimal(width).divide(ringSize).floatValue();
    }

    /** @return a new token wrapping {@link #MAXIMUM} */
    public Token getMaximumToken()
    {
        return new BigIntegerToken(MAXIMUM);
    }

    public AbstractType<?> getTokenValidator()
    {
        return IntegerType.instance;
    }

    public AbstractType<?> partitionOrdering()
    {
        return partitionOrdering;
    }

    public Optional<Splitter> splitter()
    {
        return Optional.of(splitter);
    }

    /**
     * Hashes the remaining bytes of {@code data} with MD5 and returns the absolute value of the
     * digest interpreted as a {@link BigInteger}. The buffer's position is left untouched.
     *
     * @param data the bytes to hash
     * @return a non-negative hash value
     */
    private static BigInteger hashToBigInteger(ByteBuffer data)
    {
        MessageDigest messageDigest = localMD5Digest.get();
        if (data.hasArray())
            messageDigest.update(data.array(), data.arrayOffset() + data.position(), data.remaining());
        else
            // update(ByteBuffer) consumes its argument, so hash a duplicate instead.
            messageDigest.update(data.duplicate());

        return new BigInteger(messageDigest.digest()).abs();
    }
}