package org.apache.cassandra.db.commitlog;
import java.io.DataInput;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.CRC32;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.security.EncryptionContext;
import org.json.simple.JSONValue;
import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
public class CommitLogDescriptor
{
private static final String SEPARATOR = "-";
private static final String FILENAME_PREFIX = "CommitLog" + SEPARATOR;
private static final String FILENAME_EXTENSION = ".log";
private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION);
static final String COMPRESSION_PARAMETERS_KEY = "compressionParameters";
static final String COMPRESSION_CLASS_KEY = "compressionClass";
public static final int VERSION_12 = 2;
public static final int VERSION_20 = 3;
public static final int VERSION_21 = 4;
public static final int VERSION_22 = 5;
public static final int VERSION_30 = 6;
@VisibleForTesting
public static final int current_version = VERSION_30;
final int version;
public final long id;
public final ParameterizedClass compression;
private final EncryptionContext encryptionContext;
public CommitLogDescriptor(int version, long id, ParameterizedClass compression, EncryptionContext encryptionContext)
{
this.version = version;
this.id = id;
this.compression = compression;
this.encryptionContext = encryptionContext;
}
public CommitLogDescriptor(long id, ParameterizedClass compression, EncryptionContext encryptionContext)
{
this(current_version, id, compression, encryptionContext);
}
public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
{
writeHeader(out, descriptor, Collections.<String, String>emptyMap());
}
public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor, Map<String, String> additionalHeaders)
{
CRC32 crc = new CRC32();
out.putInt(descriptor.version);
updateChecksumInt(crc, descriptor.version);
out.putLong(descriptor.id);
updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
updateChecksumInt(crc, (int) (descriptor.id >>> 32));
if (descriptor.version >= VERSION_22)
{
String parametersString = constructParametersString(descriptor.compression, descriptor.encryptionContext, additionalHeaders);
byte[] parametersBytes = parametersString.getBytes(StandardCharsets.UTF_8);
if (parametersBytes.length != (((short) parametersBytes.length) & 0xFFFF))
throw new ConfigurationException(String.format("Compression parameters too long, length %d cannot be above 65535.",
parametersBytes.length));
out.putShort((short) parametersBytes.length);
updateChecksumInt(crc, parametersBytes.length);
out.put(parametersBytes);
crc.update(parametersBytes, 0, parametersBytes.length);
}
else
assert descriptor.compression == null;
out.putInt((int) crc.getValue());
}
@VisibleForTesting
static String constructParametersString(ParameterizedClass compression, EncryptionContext encryptionContext, Map<String, String> additionalHeaders)
{
Map<String, Object> params = new TreeMap<>();
if (compression != null)
{
params.put(COMPRESSION_PARAMETERS_KEY, compression.parameters);
params.put(COMPRESSION_CLASS_KEY, compression.class_name);
}
if (encryptionContext != null)
params.putAll(encryptionContext.toHeaderParameters());
params.putAll(additionalHeaders);
return JSONValue.toJSONString(params);
}
public static CommitLogDescriptor fromHeader(File file, EncryptionContext encryptionContext)
{
try (RandomAccessFile raf = new RandomAccessFile(file, "r"))
{
assert raf.getFilePointer() == 0;
return readHeader(raf, encryptionContext);
}
catch (EOFException e)
{
throw new RuntimeException(e);
}
catch (IOException e)
{
throw new FSReadError(e, file);
}
}
public static CommitLogDescriptor readHeader(DataInput input, EncryptionContext encryptionContext) throws IOException
{
CRC32 checkcrc = new CRC32();
int version = input.readInt();
updateChecksumInt(checkcrc, version);
long id = input.readLong();
updateChecksumInt(checkcrc, (int) (id & 0xFFFFFFFFL));
updateChecksumInt(checkcrc, (int) (id >>> 32));
int parametersLength = 0;
if (version >= VERSION_22)
{
parametersLength = input.readShort() & 0xFFFF;
updateChecksumInt(checkcrc, parametersLength);
}
byte[] parametersBytes = new byte[parametersLength];
input.readFully(parametersBytes);
checkcrc.update(parametersBytes, 0, parametersBytes.length);
int crc = input.readInt();
if (crc == (int) checkcrc.getValue())
{
Map<?, ?> map = (Map<?, ?>) JSONValue.parse(new String(parametersBytes, StandardCharsets.UTF_8));
return new CommitLogDescriptor(version, id, parseCompression(map), EncryptionContext.createFromMap(map, encryptionContext));
}
return null;
}
@SuppressWarnings("unchecked")
@VisibleForTesting
static ParameterizedClass parseCompression(Map<?, ?> params)
{
if (params == null || params.isEmpty())
return null;
String className = (String) params.get(COMPRESSION_CLASS_KEY);
if (className == null)
return null;
Map<String, String> cparams = (Map<String, String>) params.get(COMPRESSION_PARAMETERS_KEY);
return new ParameterizedClass(className, cparams);
}
public static CommitLogDescriptor fromFileName(String name)
{
Matcher matcher;
if (!(matcher = COMMIT_LOG_FILE_PATTERN.matcher(name)).matches())
throw new RuntimeException("Cannot parse the version of the file: " + name);
if (matcher.group(3) == null)
throw new UnsupportedOperationException("Commitlog segment is too old to open; upgrade to 1.2.5+ first");
long id = Long.parseLong(matcher.group(3).split(SEPARATOR)[1]);
return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id, null, new EncryptionContext());
}
public int getMessagingVersion()
{
switch (version)
{
case VERSION_12:
return MessagingService.VERSION_12;
case VERSION_20:
return MessagingService.VERSION_20;
case VERSION_21:
return MessagingService.VERSION_21;
case VERSION_22:
return MessagingService.VERSION_22;
case VERSION_30:
return MessagingService.FORCE_3_0_PROTOCOL_VERSION ? MessagingService.VERSION_30 : MessagingService.VERSION_3014;
default:
throw new IllegalStateException("Unknown commitlog version " + version);
}
}
public String fileName()
{
return FILENAME_PREFIX + version + SEPARATOR + id + FILENAME_EXTENSION;
}
public static boolean isValid(String filename)
{
return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
}
public EncryptionContext getEncryptionContext()
{
return encryptionContext;
}
public String toString()
{
return "(" + version + "," + id + (compression != null ? "," + compression : "") + ")";
}
public boolean equals(Object that)
{
return that instanceof CommitLogDescriptor && equals((CommitLogDescriptor) that);
}
public boolean equalsIgnoringCompression(CommitLogDescriptor that)
{
return this.version == that.version && this.id == that.id;
}
public boolean equals(CommitLogDescriptor that)
{
return equalsIgnoringCompression(that) && Objects.equal(this.compression, that.compression)
&& Objects.equal(encryptionContext, that.encryptionContext);
}
}