package org.apache.cassandra.repair;
import java.net.InetAddress;
import java.util.List;
import com.google.common.util.concurrent.AbstractFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.tracing.Tracing;
/**
 * Base class for a repair task that handles the out-of-sync ranges between two endpoints.
 *
 * <p>The task is both a {@link Runnable} and a future of {@link SyncStat}. When {@link #run()}
 * executes it records a {@link SyncStat} for the endpoint pair, logs and traces the outcome, and
 * then either completes the future immediately (no ranges to sync) or hands the ranges to
 * {@link #startSync(List)}.
 *
 * <p>NOTE(review): when there are ranges to sync this class never completes the future itself;
 * presumably subclasses do so from {@code startSync} - verify against the implementations.
 */
public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runnable
{
    private static final Logger logger = LoggerFactory.getLogger(SyncTask.class);

    /** Descriptor of the repair job; its {@code sessionId} and {@code columnFamily} are used in log output. */
    protected final RepairJobDesc desc;
    /** First endpoint of the pair being compared. */
    protected final InetAddress firstEndpoint;
    /** Second endpoint of the pair being compared. */
    protected final InetAddress secondEndpoint;

    /** Ranges passed in at construction; an empty list means there is nothing to sync. */
    private final List<Range<Token>> rangesToSync;

    /** Assigned at the start of {@link #run()}; {@code null} until then. */
    protected volatile SyncStat stat;

    /**
     * @param desc           descriptor of the repair job this task belongs to
     * @param firstEndpoint  first endpoint of the pair
     * @param secondEndpoint second endpoint of the pair
     * @param rangesToSync   ranges that differ between the two endpoints; stored as-is (not copied)
     */
    public SyncTask(RepairJobDesc desc, InetAddress firstEndpoint, InetAddress secondEndpoint, List<Range<Token>> rangesToSync)
    {
        this.desc = desc;
        this.firstEndpoint = firstEndpoint;
        this.secondEndpoint = secondEndpoint;
        this.rangesToSync = rangesToSync;
    }

    /**
     * Records the {@link SyncStat} for this endpoint pair, then either completes the future
     * (when there are no ranges to sync) or delegates to {@link #startSync(List)}.
     */
    public void run()
    {
        int rangeCount = rangesToSync.size();
        stat = new SyncStat(new NodePair(firstEndpoint, secondEndpoint), rangeCount);

        // Log through SLF4J placeholders rather than a two-stage String.format: the endpoint text
        // may contain '%' (scoped IPv6 addresses render as "addr%scope"), which would make a
        // second String.format pass throw and abort the task before the future is completed.
        if (rangesToSync.isEmpty())
        {
            logger.info("[repair #{}] Endpoints {} and {} are consistent for {}",
                        desc.sessionId, firstEndpoint, secondEndpoint, desc.columnFamily);
            Tracing.traceRepair("Endpoint {} is consistent with {} for {}", firstEndpoint, secondEndpoint, desc.columnFamily);
            // Nothing to sync: complete the future right away with the recorded stat.
            set(stat);
            return;
        }

        logger.info("[repair #{}] Endpoints {} and {} have {} range(s) out of sync for {}",
                    desc.sessionId, firstEndpoint, secondEndpoint, rangeCount, desc.columnFamily);
        Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", firstEndpoint, rangeCount, secondEndpoint, desc.columnFamily);
        startSync(rangesToSync);
    }

    /**
     * @return the stat recorded by {@link #run()}, or {@code null} if {@code run()} has not
     *         assigned it yet
     */
    public SyncStat getCurrentStat()
    {
        return stat;
    }

    /**
     * Starts the sync for the given ranges. Called from {@link #run()} only when the list of
     * ranges is non-empty.
     *
     * @param differences the ranges that were passed to the constructor
     */
    protected abstract void startSync(List<Range<Token>> differences);
}