package org.apache.cassandra.hints;
import java.io.File;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.metrics.HintsServiceMetrics;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
final class HintsDispatcher implements AutoCloseable
{
private static final Logger logger = LoggerFactory.getLogger(HintsDispatcher.class);
private enum Action { CONTINUE, ABORT }
private final HintsReader reader;
private final UUID hostId;
private final InetAddress address;
private final int messagingVersion;
private final BooleanSupplier abortRequested;
private InputPosition currentPagePosition;
private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress address, int messagingVersion, BooleanSupplier abortRequested)
{
currentPagePosition = null;
this.reader = reader;
this.hostId = hostId;
this.address = address;
this.messagingVersion = messagingVersion;
this.abortRequested = abortRequested;
}
static HintsDispatcher create(File file, RateLimiter rateLimiter, InetAddress address, UUID hostId, BooleanSupplier abortRequested)
{
int messagingVersion = MessagingService.instance().getVersion(address);
return new HintsDispatcher(HintsReader.open(file, rateLimiter), hostId, address, messagingVersion, abortRequested);
}
public void close()
{
reader.close();
}
void seek(InputPosition position)
{
reader.seek(position);
}
boolean dispatch()
{
for (HintsReader.Page page : reader)
{
currentPagePosition = page.position;
if (dispatch(page) != Action.CONTINUE)
return false;
}
return true;
}
InputPosition dispatchPosition()
{
return currentPagePosition;
}
private Action dispatch(HintsReader.Page page)
{
return sendHintsAndAwait(page);
}
private Action sendHintsAndAwait(HintsReader.Page page)
{
Collection<Callback> callbacks = new ArrayList<>();
Action action = reader.descriptor().messagingVersion() == messagingVersion
? sendHints(page.buffersIterator(), callbacks, this::sendEncodedHint)
: sendHints(page.hintsIterator(), callbacks, this::sendHint);
if (action == Action.ABORT)
return action;
boolean hadFailures = false;
for (Callback cb : callbacks)
{
Callback.Outcome outcome = cb.await();
updateMetrics(outcome);
if (outcome != Callback.Outcome.SUCCESS)
hadFailures = true;
}
return hadFailures ? Action.ABORT : Action.CONTINUE;
}
private void updateMetrics(Callback.Outcome outcome)
{
switch (outcome)
{
case SUCCESS:
HintsServiceMetrics.hintsSucceeded.mark();
break;
case FAILURE:
HintsServiceMetrics.hintsFailed.mark();
break;
case TIMEOUT:
HintsServiceMetrics.hintsTimedOut.mark();
break;
}
}
private <T> Action sendHints(Iterator<T> hints, Collection<Callback> callbacks, Function<T, Callback> sendFunction)
{
while (hints.hasNext())
{
if (abortRequested.getAsBoolean())
return Action.ABORT;
callbacks.add(sendFunction.apply(hints.next()));
}
return Action.CONTINUE;
}
private Callback sendHint(Hint hint)
{
Callback callback = new Callback();
HintMessage message = new HintMessage(hostId, hint);
MessagingService.instance().sendRRWithFailure(message.createMessageOut(), address, callback);
return callback;
}
private Callback sendEncodedHint(ByteBuffer hint)
{
Callback callback = new Callback();
EncodedHintMessage message = new EncodedHintMessage(hostId, hint, messagingVersion);
MessagingService.instance().sendRRWithFailure(message.createMessageOut(), address, callback);
return callback;
}
private static final class Callback implements IAsyncCallbackWithFailure
{
enum Outcome { SUCCESS, TIMEOUT, FAILURE, INTERRUPTED }
private final long start = System.nanoTime();
private final SimpleCondition condition = new SimpleCondition();
private volatile Outcome outcome;
Outcome await()
{
long timeout = TimeUnit.MILLISECONDS.toNanos(MessagingService.Verb.HINT.getTimeout()) - (System.nanoTime() - start);
boolean timedOut;
try
{
timedOut = !condition.await(timeout, TimeUnit.NANOSECONDS);
}
catch (InterruptedException e)
{
logger.warn("Hint dispatch was interrupted", e);
return Outcome.INTERRUPTED;
}
return timedOut ? Outcome.TIMEOUT : outcome;
}
public void onFailure(InetAddress from, RequestFailureReason failureReason)
{
outcome = Outcome.FAILURE;
condition.signalAll();
}
public void response(MessageIn msg)
{
outcome = Outcome.SUCCESS;
condition.signalAll();
}
public boolean isLatencyForSnitch()
{
return false;
}
@Override
public boolean supportsBackPressure()
{
return true;
}
}
}