package org.apache.cassandra.service.paxos;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Striped;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.UUIDGen;
/**
 * Paxos state this node holds for a single partition. It has three slots:
 * <ul>
 *   <li>the commit carrying the ballot it most recently promised;</li>
 *   <li>the proposal it most recently accepted;</li>
 *   <li>the most recent commit it has recorded.</li>
 * </ul>
 *
 * <p>The static {@link #prepare}, {@link #propose} and {@link #commit} methods run the three phases of
 * a round on this node. State is loaded from and persisted through {@code SystemKeyspace}. Each phase
 * adds its elapsed time to the owning table's {@code casPrepare}/{@code casPropose}/{@code casCommit}
 * metric in a {@code finally} block, so the latency is recorded even when the phase throws.
 */
public class PaxosState
{
    /**
     * Striped locks looked up by partition key. {@code prepare} and {@code propose} hold the stripe for
     * their key across the whole load/compare/save sequence. Operations on the same partition key are
     * therefore serialized on this node; distinct keys may also share a stripe. The stripe count is
     * {@code concurrent_writers * 1024}. Locks are created lazily and referenced weakly
     * ({@code Striped.lazyWeakLock}).
     */
    private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentWriters() * 1024);

    // Commit whose ballot was most recently promised; new prepares/proposals are compared against it.
    private final Commit promised;
    // Proposal most recently accepted; returned to the coordinator when a prepare is promised.
    private final Commit accepted;
    // Most recent commit recorded for the partition; returned with every prepare response.
    private final Commit mostRecentCommit;

    /**
     * Creates a state whose three slots are all {@code Commit.emptyCommit(key, metadata)}.
     *
     * @param key      partition the state belongs to
     * @param metadata table the partition belongs to
     */
    public PaxosState(DecoratedKey key, CFMetaData metadata)
    {
        this(Commit.emptyCommit(key, metadata),
             Commit.emptyCommit(key, metadata),
             Commit.emptyCommit(key, metadata));
    }

    /**
     * Creates a state from explicit commits. When assertions are enabled, all three commits must:
     * <ul>
     *   <li>carry equal partition keys, and</li>
     *   <li>reference the identical (same instance) table metadata.</li>
     * </ul>
     *
     * @param promised         commit carrying the promised ballot
     * @param accepted         most recently accepted proposal
     * @param mostRecentCommit most recently recorded commit
     */
    public PaxosState(Commit promised, Commit accepted, Commit mostRecentCommit)
    {
        assert promised.update.partitionKey().equals(accepted.update.partitionKey());
        assert accepted.update.partitionKey().equals(mostRecentCommit.update.partitionKey());
        assert promised.update.metadata() == accepted.update.metadata();
        assert accepted.update.metadata() == mostRecentCommit.update.metadata();

        this.promised = promised;
        this.accepted = accepted;
        this.mostRecentCommit = mostRecentCommit;
    }

    /**
     * Handles a prepare request on this node, holding the partition's stripe lock.
     *
     * <p>If {@code toPrepare} is after the currently promised commit, the promise is persisted. The
     * response then carries {@code true}, the accepted proposal and the most recent commit. Otherwise
     * nothing is saved. The response then carries {@code false}, the currently promised commit (not
     * the accepted one) and the most recent commit.
     *
     * @param toPrepare commit carrying the ballot being prepared
     * @return the promise/rejection response for the coordinator
     */
    public static PrepareResponse prepare(Commit toPrepare)
    {
        final long startedAt = System.nanoTime();
        try
        {
            Lock partitionLock = LOCKS.get(toPrepare.update.partitionKey());
            partitionLock.lock();
            try
            {
                // The state is read with "now" derived from the ballot rather than the local clock;
                // presumably so every replica evaluating this ballot uses the same "now" — confirm.
                int ballotNowInSec = UUIDGen.unixTimestampInSec(toPrepare.ballot);
                PaxosState current = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(),
                                                                   toPrepare.update.metadata(),
                                                                   ballotNowInSec);

                if (!toPrepare.isAfter(current.promised))
                {
                    Tracing.trace("Promise rejected; {} is not sufficiently newer than {}", toPrepare, current.promised);
                    // Report the promise we hold so the coordinator can see which ballot beat it.
                    return new PrepareResponse(false, current.promised, current.mostRecentCommit);
                }

                Tracing.trace("Promising ballot {}", toPrepare.ballot);
                SystemKeyspace.savePaxosPromise(toPrepare);
                return new PrepareResponse(true, current.accepted, current.mostRecentCommit);
            }
            finally
            {
                partitionLock.unlock();
            }
        }
        finally
        {
            CFMetaData table = toPrepare.update.metadata();
            Keyspace.open(table.ksName).getColumnFamilyStore(table.cfId).metric.casPrepare.addNano(System.nanoTime() - startedAt);
        }
    }

    /**
     * Handles a propose request on this node, holding the partition's stripe lock.
     *
     * <p>The proposal is accepted, and persisted, if either of these holds:
     * <ul>
     *   <li>it carries exactly the promised ballot, or</li>
     *   <li>it is after the promised commit.</li>
     * </ul>
     * Otherwise it is rejected without writing anything.
     *
     * @param proposal the proposal to accept or reject
     * @return {@code true} if accepted, {@code false} if rejected
     */
    public static Boolean propose(Commit proposal)
    {
        final long startedAt = System.nanoTime();
        try
        {
            Lock partitionLock = LOCKS.get(proposal.update.partitionKey());
            partitionLock.lock();
            try
            {
                int ballotNowInSec = UUIDGen.unixTimestampInSec(proposal.ballot);
                PaxosState current = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(),
                                                                   proposal.update.metadata(),
                                                                   ballotNowInSec);

                boolean acceptable = proposal.hasBallot(current.promised.ballot) || proposal.isAfter(current.promised);
                if (!acceptable)
                {
                    Tracing.trace("Rejecting proposal for {} because inProgress is now {}", proposal, current.promised);
                    return false;
                }

                Tracing.trace("Accepting proposal {}", proposal);
                SystemKeyspace.savePaxosProposal(proposal);
                return true;
            }
            finally
            {
                partitionLock.unlock();
            }
        }
        finally
        {
            CFMetaData table = proposal.update.metadata();
            Keyspace.open(table.ksName).getColumnFamilyStore(table.cfId).metric.casPropose.addNano(System.nanoTime() - startedAt);
        }
    }

    /**
     * Handles a commit on this node. No partition lock is taken here.
     *
     * <p>The proposal's mutation is applied only when the ballot timestamp is not earlier than the
     * table's recorded truncation time. This presumably avoids reapplying data from a round that began
     * before a truncation. In every case the commit is then recorded via
     * {@code SystemKeyspace.savePaxosCommit}.
     *
     * @param proposal the decided proposal to commit
     */
    public static void commit(Commit proposal)
    {
        final long startedAt = System.nanoTime();
        try
        {
            boolean notBeforeTruncation =
                UUIDGen.unixTimestamp(proposal.ballot) >= SystemKeyspace.getTruncatedAt(proposal.update.metadata().cfId);

            if (!notBeforeTruncation)
            {
                Tracing.trace("Not committing proposal {} as ballot timestamp predates last truncation time", proposal);
            }
            else
            {
                Tracing.trace("Committing proposal {}", proposal);
                Mutation toApply = proposal.makeMutation();
                Keyspace.open(toApply.getKeyspaceName()).apply(toApply, true);
            }

            SystemKeyspace.savePaxosCommit(proposal);
        }
        finally
        {
            CFMetaData table = proposal.update.metadata();
            Keyspace.open(table.ksName).getColumnFamilyStore(table.cfId).metric.casCommit.addNano(System.nanoTime() - startedAt);
        }
    }
}