package org.apache.cassandra.hints;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.SyncUtil;
final class HintsStore
{
private static final Logger logger = LoggerFactory.getLogger(HintsStore.class);
public final UUID hostId;
private final File hintsDirectory;
private final ImmutableMap<String, Object> writerParams;
private final Map<HintsDescriptor, InputPosition> dispatchPositions;
private final Deque<HintsDescriptor> dispatchDequeue;
private final Queue<HintsDescriptor> blacklistedFiles;
private volatile long lastUsedTimestamp;
private volatile HintsWriter hintsWriter;
private HintsStore(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
{
this.hostId = hostId;
this.hintsDirectory = hintsDirectory;
this.writerParams = writerParams;
dispatchPositions = new ConcurrentHashMap<>();
dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors);
blacklistedFiles = new ConcurrentLinkedQueue<>();
lastUsedTimestamp = descriptors.stream().mapToLong(d -> d.timestamp).max().orElse(0L);
}
static HintsStore create(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors)
{
descriptors.sort((d1, d2) -> Long.compare(d1.timestamp, d2.timestamp));
return new HintsStore(hostId, hintsDirectory, writerParams, descriptors);
}
@VisibleForTesting
int getDispatchQueueSize()
{
return dispatchDequeue.size();
}
InetAddress address()
{
return StorageService.instance.getEndpointForHostId(hostId);
}
boolean isLive()
{
InetAddress address = address();
return address != null && FailureDetector.instance.isAlive(address);
}
HintsDescriptor poll()
{
return dispatchDequeue.poll();
}
void offerFirst(HintsDescriptor descriptor)
{
dispatchDequeue.offerFirst(descriptor);
}
void offerLast(HintsDescriptor descriptor)
{
dispatchDequeue.offerLast(descriptor);
}
void deleteAllHints()
{
HintsDescriptor descriptor;
while ((descriptor = poll()) != null)
{
cleanUp(descriptor);
delete(descriptor);
}
while ((descriptor = blacklistedFiles.poll()) != null)
{
cleanUp(descriptor);
delete(descriptor);
}
}
void delete(HintsDescriptor descriptor)
{
File hintsFile = new File(hintsDirectory, descriptor.fileName());
if (hintsFile.delete())
logger.info("Deleted hint file {}", descriptor.fileName());
else
logger.error("Failed to delete hint file {}", descriptor.fileName());
new File(hintsDirectory, descriptor.checksumFileName()).delete();
}
boolean hasFiles()
{
return !dispatchDequeue.isEmpty();
}
InputPosition getDispatchOffset(HintsDescriptor descriptor)
{
return dispatchPositions.get(descriptor);
}
void markDispatchOffset(HintsDescriptor descriptor, InputPosition inputPosition)
{
dispatchPositions.put(descriptor, inputPosition);
}
void cleanUp(HintsDescriptor descriptor)
{
dispatchPositions.remove(descriptor);
}
void blacklist(HintsDescriptor descriptor)
{
blacklistedFiles.add(descriptor);
}
boolean isWriting()
{
return hintsWriter != null;
}
HintsWriter getOrOpenWriter()
{
if (hintsWriter == null)
hintsWriter = openWriter();
return hintsWriter;
}
HintsWriter getWriter()
{
return hintsWriter;
}
private HintsWriter openWriter()
{
lastUsedTimestamp = Math.max(System.currentTimeMillis(), lastUsedTimestamp + 1);
HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp, writerParams);
try
{
return HintsWriter.create(hintsDirectory, descriptor);
}
catch (IOException e)
{
throw new FSWriteError(e, descriptor.fileName());
}
}
void closeWriter()
{
if (hintsWriter != null)
{
hintsWriter.close();
offerLast(hintsWriter.descriptor());
hintsWriter = null;
SyncUtil.trySyncDir(hintsDirectory);
}
}
void fsyncWriter()
{
if (hintsWriter != null)
hintsWriter.fsync();
}
}