package org.apache.cassandra.db.lifecycle;
import java.io.File;
import java.io.FilenameFilter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.FBUtilities;
/**
 * A single record (one line) of a log, together with the helpers needed to build it, parse it,
 * checksum it and find the files on disk that it refers to.
 * <p>
 * The textual form of a record is {@code type:[path,updateTime,numFiles][checksum]}, where
 * {@code type} is one of {@code add}, {@code remove}, {@code commit} or {@code abort}
 * (matched case-insensitively when parsing). {@link Type#ADD} and {@link Type#REMOVE} records
 * carry a file path; {@link Type#COMMIT} and {@link Type#ABORT} records do not.
 * <p>
 * The record's own fields are final; the attached {@link Status} is mutable.
 */
final class LogRecord
{
    /** The kind of a record. */
    public enum Type
    {
        /** Used by {@link LogRecord#make(String)} for lines that cannot be parsed. */
        UNKNOWN,
        ADD,
        REMOVE,
        COMMIT,
        ABORT;

        /**
         * Maps the textual prefix of a record line to its type, ignoring case.
         *
         * @param prefix the prefix of the line, e.g. {@code "add"} or {@code "COMMIT"}
         * @return the matching type
         * @throws IllegalArgumentException if the prefix does not name a type
         */
        public static Type fromPrefix(String prefix)
        {
            // Locale.ROOT keeps the case mapping independent of the default locale. With a Turkish
            // or Azeri default locale the letter 'i' in "commit" upper-cases to a dotted capital I,
            // which valueOf() would reject even though the line is perfectly valid.
            return valueOf(prefix.toUpperCase(Locale.ROOT));
        }

        /** @return true for the types that carry a file path, i.e. ADD and REMOVE */
        public boolean hasFile()
        {
            return this == Type.ADD || this == Type.REMOVE;
        }

        /** @return true if the given record is of this type */
        public boolean matches(LogRecord record)
        {
            return this == record.type;
        }

        /** @return true for the types that terminate a log, i.e. COMMIT and ABORT */
        public boolean isFinal() { return this == Type.COMMIT || this == Type.ABORT; }
    }

    /** Mutable state attached to a record: an optional error, a "partial" flag and an on-disk counterpart. */
    public final static class Status
    {
        // The first error reported for the record, if any
        Optional<String> error = Optional.empty();

        // Set through LogRecord.setPartial()
        boolean partial = false;

        // Not assigned anywhere in this class; presumably set by the code verifying the record
        // against the files on disk - TODO confirm against callers
        LogRecord onDiskRecord;

        /** Records the error, unless one has been recorded already: only the first error is kept. */
        void setError(String error)
        {
            if (!this.error.isPresent())
                this.error = Optional.of(error);
        }

        boolean hasError()
        {
            return error.isPresent();
        }
    }

    public final Type type;

    // Present only for records whose type has a file (ADD, REMOVE); ends with Component.separator
    public final Optional<String> absolutePath;

    // Only retained for REMOVE records, zero for every other type
    public final long updateTime;

    // Only retained for records whose type has a file, zero otherwise
    public final int numFiles;

    // The textual form of the record: either the line it was parsed from or the output of format()
    public final String raw;

    // Either the checksum read from the parsed line or the one computed by computeChecksum()
    public final long checksum;

    public final Status status;

    // type:[path,updateTime,numFiles][checksum]
    // The three bracketed fields may be empty as far as the pattern is concerned; empty numeric
    // fields are rejected later, when they are converted to numbers in make(String).
    static final Pattern REGEX = Pattern.compile("^(add|remove|commit|abort):\\[([^,]*),?([^,]*),?([^,]*)\\]\\[(\\d*)\\]$", Pattern.CASE_INSENSITIVE);

    /**
     * Parses a record from its textual form.
     * <p>
     * This method does not throw on malformed input: a line that does not match the expected
     * format, or whose numeric fields cannot be converted, yields a record of type
     * {@link Type#UNKNOWN} with an error set and the offending line kept as {@link #raw}.
     *
     * @param line the line to parse
     * @return the parsed record, or an UNKNOWN record carrying an error
     */
    public static LogRecord make(String line)
    {
        try
        {
            Matcher matcher = REGEX.matcher(line);
            if (!matcher.matches())
                return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line)
                       .setError(String.format("Failed to parse [%s]", line));

            Type type = Type.fromPrefix(matcher.group(1));
            return new LogRecord(type,
                                 // the serialized path has no trailing separator (see absolutePath()), add it back
                                 matcher.group(2) + Component.separator,
                                 Long.parseLong(matcher.group(3)),
                                 Integer.parseInt(matcher.group(4)),
                                 Long.parseLong(matcher.group(5)),
                                 line);
        }
        catch (IllegalArgumentException e)
        {
            // also covers NumberFormatException, which is a subclass of IllegalArgumentException
            return new LogRecord(Type.UNKNOWN, null, 0, 0, 0, line)
                   .setError(String.format("Failed to parse line: %s", e.getMessage()));
        }
    }

    /**
     * Creates a COMMIT record. Note that the constructor only retains the update time of REMOVE
     * records, so the resulting record has an update time of zero.
     */
    public static LogRecord makeCommit(long updateTime)
    {
        return new LogRecord(Type.COMMIT, updateTime);
    }

    /**
     * Creates an ABORT record. Note that the constructor only retains the update time of REMOVE
     * records, so the resulting record has an update time of zero.
     */
    public static LogRecord makeAbort(long updateTime)
    {
        return new LogRecord(Type.ABORT, updateTime);
    }

    /**
     * Creates a record of the given type for a single sstable, based on the files currently on disk
     * whose name starts with the sstable base file name followed by the component separator.
     *
     * @param type  the type of the record
     * @param table the sstable the record refers to
     * @return the new record; its number of files is at least the number of file paths reported by the sstable
     */
    public static LogRecord make(Type type, SSTable table)
    {
        // The separator is appended to the base file name before matching by prefix; presumably so
        // that files of another sstable whose name merely starts with the same characters are not
        // picked up - TODO confirm
        String absoluteTablePath = absolutePath(table.descriptor.baseFilename());
        return make(type, getExistingFiles(absoluteTablePath), table.getAllFilePaths().size(), absoluteTablePath);
    }

    /**
     * Creates records of the given type for several sstables at once, listing each distinct parent
     * directory only once.
     *
     * @param type   the type of the records
     * @param tables the sstables the records refer to
     * @return a map from sstable to its record; only sstables for which at least one file was found
     *         on disk have an entry
     */
    public static Map<SSTable, LogRecord> make(Type type, Iterable<SSTableReader> tables)
    {
        // contains a mapping from sstable absolute path (everything up to and including the separator) to SSTable
        Map<String, SSTable> absolutePaths = new HashMap<>();
        for (SSTableReader table : tables)
            absolutePaths.put(absolutePath(table.descriptor.baseFilename()), table);

        // maps sstable base file name to the actual files on disk
        Map<String, List<File>> existingFiles = getExistingFiles(absolutePaths.keySet());
        Map<SSTable, LogRecord> records = new HashMap<>(existingFiles.size());
        for (Map.Entry<String, List<File>> entry : existingFiles.entrySet())
        {
            List<File> filesOnDisk = entry.getValue();
            String baseFileName = entry.getKey();
            SSTable sstable = absolutePaths.get(baseFileName);
            records.put(sstable, make(type, filesOnDisk, sstable.getAllFilePaths().size(), baseFileName));
        }
        return records;
    }

    /** @return the canonical path of the given base file name with the component separator appended */
    private static String absolutePath(String baseFilename)
    {
        return FileUtils.getCanonicalPath(baseFilename + Component.separator);
    }

    /**
     * Creates a record of the same type and path as this one, but whose update time and number of
     * files reflect the files currently on disk. No minimum number of files is imposed.
     * <p>
     * Must only be called on records that have a path.
     */
    public LogRecord withExistingFiles()
    {
        return make(type, getExistingFiles(), 0, absolutePath.get());
    }

    /**
     * Creates a record of the given type from a list of files.
     *
     * @param type         the type of the record
     * @param files        the candidate files for the record
     * @param minFiles     a lower bound for the number of files of the record
     * @param absolutePath the path of the record, ending with the component separator
     * @return a record whose update time is the most recent modification time among the files
     *         (zero if none qualifies) and whose number of files is the larger of {@code minFiles}
     *         and the number of files with a positive modification time
     */
    public static LogRecord make(Type type, List<File> files, int minFiles, String absolutePath)
    {
        // File.lastModified() returns 0 when the file does not exist (or on I/O error), so only
        // positive values are considered: files that disappeared after the directory was listed
        // are neither counted nor allowed to affect the update time
        List<Long> positiveModifiedTimes = files.stream().map(File::lastModified).filter(lm -> lm > 0).collect(Collectors.toList());
        long lastModified = positiveModifiedTimes.stream().reduce(0L, Long::max);
        return new LogRecord(type, absolutePath, lastModified, Math.max(minFiles, positiveModifiedTimes.size()));
    }

    // Record without a path; checksum and raw text are computed
    private LogRecord(Type type, long updateTime)
    {
        this(type, null, updateTime, 0, 0, null);
    }

    // Record with a path; checksum and raw text are computed
    private LogRecord(Type type,
                      String absolutePath,
                      long updateTime,
                      int numFiles)
    {
        this(type, absolutePath, updateTime, numFiles, 0, null);
    }

    /**
     * @param type         the type of the record
     * @param absolutePath the path, required (non null) when the type has a file, ignored otherwise
     * @param updateTime   the update time, only retained for REMOVE records
     * @param numFiles     the number of files, only retained when the type has a file
     * @param checksum     the checksum read from {@code raw}; must be zero when {@code raw} is null
     * @param raw          the line the record was parsed from, or null if the record is being created,
     *                     in which case both the checksum and the raw text are computed
     */
    private LogRecord(Type type,
                      String absolutePath,
                      long updateTime,
                      int numFiles,
                      long checksum,
                      String raw)
    {
        assert !type.hasFile() || absolutePath != null : "Expected file path for file records";

        this.type = type;
        this.absolutePath = type.hasFile() ? Optional.of(absolutePath) : Optional.empty();
        this.updateTime = type == Type.REMOVE ? updateTime : 0;
        this.numFiles = type.hasFile() ? numFiles : 0;
        this.status = new Status();
        if (raw == null)
        {
            assert checksum == 0;
            // order matters: format() embeds the checksum, so it must be computed first
            this.checksum = computeChecksum();
            this.raw = format();
        }
        else
        {
            this.checksum = checksum;
            this.raw = raw;
        }
    }

    /**
     * Attaches an error to this record, unless it already has one.
     *
     * @return this record, to allow chaining
     */
    LogRecord setError(String error)
    {
        status.setError(error);
        return this;
    }

    /** @return the error attached to this record, or an empty string if there is none */
    String error()
    {
        return status.error.orElse("");
    }

    void setPartial()
    {
        status.partial = true;
    }

    boolean partial()
    {
        return status.partial;
    }

    /** @return true if the record has no error and its type is known */
    boolean isValid()
    {
        return !status.hasError() && type != Type.UNKNOWN;
    }

    boolean isInvalid()
    {
        return !isValid();
    }

    boolean isInvalidOrPartial()
    {
        return isInvalid() || partial();
    }

    /** @return the textual form of this record, the counterpart of what {@link #make(String)} parses */
    private String format()
    {
        // Locale.ROOT guarantees plain ASCII digits for %d whatever the default locale is; REGEX
        // only accepts ASCII digits, so localized digits would make the record unreadable
        return String.format(Locale.ROOT,
                             "%s:[%s,%d,%d][%d]",
                             type.toString(),
                             absolutePath(),
                             updateTime,
                             numFiles,
                             checksum);
    }

    /**
     * Lists the files on disk that belong to this record. Must only be called on records that have a path.
     *
     * @return the files in the parent directory of the record path whose name starts with the last
     *         element of that path
     */
    public List<File> getExistingFiles()
    {
        assert absolutePath.isPresent() : "Expected a path in order to get existing files";
        return getExistingFiles(absolutePath.get());
    }

    /**
     * Lists the files located in the parent directory of the given path whose name starts with the
     * last element of the path.
     *
     * @param absoluteFilePath the path to use as a prefix; it is expected to have a parent directory
     * @return the matching files, or an empty list if the parent directory cannot be listed
     */
    public static List<File> getExistingFiles(String absoluteFilePath)
    {
        Path path = Paths.get(absoluteFilePath);
        File directory = path.getParent().toFile();
        // computed once rather than for every entry of the directory
        String prefix = path.getFileName().toString();
        File[] files = directory.listFiles((dir, name) -> name.startsWith(prefix));
        // listFiles returns null if the directory does not exist or cannot be read
        return files == null ? Collections.emptyList() : Arrays.asList(files);
    }

    /**
     * Lists the files on disk for several paths at once, so that each distinct parent directory is
     * listed only once.
     *
     * @param absoluteFilePaths the paths to look for, in the form returned by {@code absolutePath(String)}
     * @return a map from each path that has at least one file on disk to its files; paths without
     *         any file on disk have no entry
     */
    public static Map<String, List<File>> getExistingFiles(Set<String> absoluteFilePaths)
    {
        Set<File> uniqueDirectories = absoluteFilePaths.stream().map(path -> Paths.get(path).getParent().toFile()).collect(Collectors.toSet());
        Map<String, List<File>> fileMap = new HashMap<>();
        // The filter is only used to visit the entries of each directory: it collects the matching
        // files into fileMap as a side effect and always rejects the entry, so that the array
        // returned by listFiles() stays empty.
        FilenameFilter ff = (dir, name) -> {
            Descriptor descriptor = null;
            try
            {
                descriptor = Descriptor.fromFilename(dir, name).left;
            }
            catch (Throwable ignored)
            {
                // deliberately ignored: a file that cannot be parsed into a descriptor is presumably
                // not an sstable file, and is skipped by the null check below
            }

            String absolutePath = descriptor != null ? absolutePath(descriptor.baseFilename()) : null;
            if (absolutePath != null && absoluteFilePaths.contains(absolutePath))
                fileMap.computeIfAbsent(absolutePath, k -> new ArrayList<>()).add(new File(dir, name));

            return false;
        };

        // populate the file map, the result of listFiles() is irrelevant
        for (File f : uniqueDirectories)
            f.listFiles(ff);

        return fileMap;
    }

    public boolean isFinal()
    {
        return type.isFinal();
    }

    /** @return the last element of the record path, or an empty string if the record has no path */
    String fileName()
    {
        return absolutePath.isPresent() ? Paths.get(absolutePath.get()).getFileName().toString() : "";
    }

    /** @return true if the record has a path and {@code FileUtils.isContained} reports it as contained in the folder */
    boolean isInFolder(Path folder)
    {
        return absolutePath.isPresent()
               && FileUtils.isContained(folder.toFile(), Paths.get(absolutePath.get()).toFile());
    }

    /**
     * @return the record path without its trailing component separator, which is the form used
     *         when serializing and checksumming the record, or an empty string if there is no path
     */
    private String absolutePath()
    {
        if (!absolutePath.isPresent())
            return "";

        String ret = absolutePath.get();
        assert ret.charAt(ret.length() -1) == Component.separator : "Invalid absolute path, should end with '-'";
        return ret.substring(0, ret.length() - 1);
    }

    @Override
    public int hashCode()
    {
        // consistent with equals(): checksum, raw text and status are not taken into account
        return Objects.hash(type, absolutePath, numFiles, updateTime);
    }

    @Override
    public boolean equals(Object obj)
    {
        if (!(obj instanceof LogRecord))
            return false;

        final LogRecord other = (LogRecord)obj;

        // checksum, raw text and status are not part of the identity of a record
        return type == other.type &&
               absolutePath.equals(other.absolutePath) &&
               numFiles == other.numFiles &&
               updateTime == other.updateTime;
    }

    @Override
    public String toString()
    {
        return raw;
    }

    /**
     * Computes the checksum of this record over, in order: the path without its trailing separator,
     * the type name, the low then the high 32 bits of the update time, and the number of files.
     *
     * @return the CRC32 value, masked so that it is never negative
     */
    long computeChecksum()
    {
        CRC32 crc32 = new CRC32();
        crc32.update((absolutePath()).getBytes(FileUtils.CHARSET));
        crc32.update(type.toString().getBytes(FileUtils.CHARSET));
        FBUtilities.updateChecksumInt(crc32, (int) updateTime);
        FBUtilities.updateChecksumInt(crc32, (int) (updateTime >>> 32));
        FBUtilities.updateChecksumInt(crc32, numFiles);
        return crc32.getValue() & (Long.MAX_VALUE);
    }

    /**
     * Creates a copy of this record with a different type. The checksum and the raw text are
     * recomputed, and the status is not carried over; the path, update time and number of files
     * are retained only insofar as the constructor keeps them for the new type.
     */
    LogRecord asType(Type type)
    {
        return new LogRecord(type, absolutePath.orElse(null), updateTime, numFiles);
    }
}