package org.apache.cassandra.schema;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.FieldIdentifier;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.SuperColumnCompatibility;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.functions.FunctionName;
import org.apache.cassandra.cql3.functions.UDAggregate;
import org.apache.cassandra.cql3.functions.UDFunction;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.utils.FBUtilities;
import static java.lang.String.format;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
@SuppressWarnings("deprecation")
public final class LegacySchemaMigrator
{
private LegacySchemaMigrator()
{
}
private static final Logger logger = LoggerFactory.getLogger(LegacySchemaMigrator.class);
static final List<CFMetaData> LegacySchemaTables =
ImmutableList.of(SystemKeyspace.LegacyKeyspaces,
SystemKeyspace.LegacyColumnfamilies,
SystemKeyspace.LegacyColumns,
SystemKeyspace.LegacyTriggers,
SystemKeyspace.LegacyUsertypes,
SystemKeyspace.LegacyFunctions,
SystemKeyspace.LegacyAggregates);
public static void migrate()
{
Collection<Keyspace> keyspaces = readSchema();
if (keyspaces.isEmpty())
{
unloadLegacySchemaTables();
return;
}
logger.info("Moving {} keyspaces from legacy schema tables to the new schema keyspace ({})",
keyspaces.size(),
SchemaConstants.SCHEMA_KEYSPACE_NAME);
keyspaces.forEach(LegacySchemaMigrator::storeKeyspaceInNewSchemaTables);
keyspaces.forEach(LegacySchemaMigrator::migrateBuiltIndexesForKeyspace);
SchemaKeyspace.flush();
logger.info("Truncating legacy schema tables");
truncateLegacySchemaTables();
unloadLegacySchemaTables();
logger.info("Completed migration of legacy schema tables");
}
private static void migrateBuiltIndexesForKeyspace(Keyspace keyspace)
{
keyspace.tables.forEach(LegacySchemaMigrator::migrateBuiltIndexesForTable);
}
private static void migrateBuiltIndexesForTable(Table table)
{
table.metadata.getIndexes().forEach((index) -> migrateIndexBuildStatus(table.metadata.ksName,
table.metadata.cfName,
index));
}
private static void migrateIndexBuildStatus(String keyspace, String table, IndexMetadata index)
{
if (SystemKeyspace.isIndexBuilt(keyspace, table + '.' + index.name))
{
SystemKeyspace.setIndexBuilt(keyspace, index.name);
SystemKeyspace.setIndexRemoved(keyspace, table + '.' + index.name);
}
}
static void unloadLegacySchemaTables()
{
KeyspaceMetadata systemKeyspace = Schema.instance.getKSMetaData(SchemaConstants.SYSTEM_KEYSPACE_NAME);
Tables systemTables = systemKeyspace.tables;
for (CFMetaData table : LegacySchemaTables)
systemTables = systemTables.without(table.cfName);
LegacySchemaTables.forEach(Schema.instance::unload);
LegacySchemaTables.forEach((cfm) -> org.apache.cassandra.db.Keyspace.openAndGetStore(cfm).invalidate());
Schema.instance.setKeyspaceMetadata(systemKeyspace.withSwapped(systemTables));
}
private static void truncateLegacySchemaTables()
{
LegacySchemaTables.forEach(table -> Schema.instance.getColumnFamilyStoreInstance(table.cfId).truncateBlocking());
}
private static void storeKeyspaceInNewSchemaTables(Keyspace keyspace)
{
logger.info("Migrating keyspace {}", keyspace);
Mutation.SimpleBuilder builder = SchemaKeyspace.makeCreateKeyspaceMutation(keyspace.name, keyspace.params, keyspace.timestamp);
for (Table table : keyspace.tables)
SchemaKeyspace.addTableToSchemaMutation(table.metadata, true, builder.timestamp(table.timestamp));
for (Type type : keyspace.types)
SchemaKeyspace.addTypeToSchemaMutation(type.metadata, builder.timestamp(type.timestamp));
for (Function function : keyspace.functions)
SchemaKeyspace.addFunctionToSchemaMutation(function.metadata, builder.timestamp(function.timestamp));
for (Aggregate aggregate : keyspace.aggregates)
SchemaKeyspace.addAggregateToSchemaMutation(aggregate.metadata, builder.timestamp(aggregate.timestamp));
builder.build().apply();
}
private static Collection<Keyspace> readSchema()
{
String query = format("SELECT keyspace_name FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.LEGACY_KEYSPACES);
Collection<String> keyspaceNames = new ArrayList<>();
query(query).forEach(row -> keyspaceNames.add(row.getString("keyspace_name")));
keyspaceNames.removeAll(SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES);
Collection<Keyspace> keyspaces = new ArrayList<>();
keyspaceNames.forEach(name -> keyspaces.add(readKeyspace(name)));
return keyspaces;
}
private static Keyspace readKeyspace(String keyspaceName)
{
long timestamp = readKeyspaceTimestamp(keyspaceName);
KeyspaceParams params = readKeyspaceParams(keyspaceName);
Collection<Table> tables = readTables(keyspaceName);
Collection<Type> types = readTypes(keyspaceName);
Collection<Function> functions = readFunctions(keyspaceName);
Functions.Builder functionsBuilder = Functions.builder();
functions.forEach(udf -> functionsBuilder.add(udf.metadata));
Collection<Aggregate> aggregates = readAggregates(functionsBuilder.build(), keyspaceName);
return new Keyspace(timestamp, keyspaceName, params, tables, types, functions, aggregates);
}
private static long readKeyspaceTimestamp(String keyspaceName)
{
String query = format("SELECT writeTime(durable_writes) AS timestamp FROM %s.%s WHERE keyspace_name = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_KEYSPACES);
return query(query, keyspaceName).one().getLong("timestamp");
}
private static KeyspaceParams readKeyspaceParams(String keyspaceName)
{
String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_KEYSPACES);
UntypedResultSet.Row row = query(query, keyspaceName).one();
boolean durableWrites = row.getBoolean("durable_writes");
Map<String, String> replication = new HashMap<>();
replication.putAll(fromJsonMap(row.getString("strategy_options")));
replication.put(ReplicationParams.CLASS, row.getString("strategy_class"));
return KeyspaceParams.create(durableWrites, replication);
}
private static Collection<Table> readTables(String keyspaceName)
{
String query = format("SELECT columnfamily_name FROM %s.%s WHERE keyspace_name = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_COLUMNFAMILIES);
Collection<String> tableNames = new ArrayList<>();
query(query, keyspaceName).forEach(row -> tableNames.add(row.getString("columnfamily_name")));
Collection<Table> tables = new ArrayList<>();
tableNames.forEach(name -> tables.add(readTable(keyspaceName, name)));
return tables;
}
private static Table readTable(String keyspaceName, String tableName)
{
long timestamp = readTableTimestamp(keyspaceName, tableName);
CFMetaData metadata = readTableMetadata(keyspaceName, tableName);
return new Table(timestamp, metadata);
}
private static long readTableTimestamp(String keyspaceName, String tableName)
{
String query = format("SELECT writeTime(type) AS timestamp FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_COLUMNFAMILIES);
return query(query, keyspaceName, tableName).one().getLong("timestamp");
}
private static CFMetaData readTableMetadata(String keyspaceName, String tableName)
{
String tableQuery = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_COLUMNFAMILIES);
UntypedResultSet.Row tableRow = query(tableQuery, keyspaceName, tableName).one();
String columnsQuery = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_COLUMNS);
UntypedResultSet columnRows = query(columnsQuery, keyspaceName, tableName);
String triggersQuery = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND columnfamily_name = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_TRIGGERS);
UntypedResultSet triggerRows = query(triggersQuery, keyspaceName, tableName);
return decodeTableMetadata(tableName, tableRow, columnRows, triggerRows);
}
private static CFMetaData decodeTableMetadata(String tableName,
UntypedResultSet.Row tableRow,
UntypedResultSet columnRows,
UntypedResultSet triggerRows)
{
String ksName = tableRow.getString("keyspace_name");
String cfName = tableRow.getString("columnfamily_name");
AbstractType<?> rawComparator = TypeParser.parse(tableRow.getString("comparator"));
AbstractType<?> subComparator = tableRow.has("subcomparator") ? TypeParser.parse(tableRow.getString("subcomparator")) : null;
boolean isSuper = "super".equals(tableRow.getString("type").toLowerCase(Locale.ENGLISH));
boolean isCompound = rawComparator instanceof CompositeType || isSuper;
Boolean rawIsDense = tableRow.has("is_dense") ? tableRow.getBoolean("is_dense") : null;
boolean isDense;
if (rawIsDense != null && !rawIsDense)
isDense = false;
else
isDense = calculateIsDense(rawComparator, columnRows, isSuper);
Iterable<UntypedResultSet.Row> filteredColumnRows = !isDense && (rawIsDense == null || rawIsDense)
? filterOutRedundantRowsForSparse(columnRows, isSuper, isCompound)
: columnRows;
AbstractType<?> defaultValidator = TypeParser.parse(tableRow.getString("default_validator"));
boolean isCounter = defaultValidator instanceof CounterColumnType;
UUID cfId = tableRow.has("cf_id")
? tableRow.getUUID("cf_id")
: CFMetaData.generateLegacyCfId(ksName, cfName);
boolean isCQLTable = !isSuper && !isDense && isCompound;
boolean isStaticCompactTable = !isDense && !isCompound;
boolean needsUpgrade = !isCQLTable && checkNeedsUpgrade(filteredColumnRows, isSuper, isStaticCompactTable);
List<ColumnDefinition> columnDefs = createColumnsFromColumnRows(filteredColumnRows,
ksName,
cfName,
rawComparator,
subComparator,
isSuper,
isCQLTable,
isStaticCompactTable,
needsUpgrade);
if (needsUpgrade)
{
addDefinitionForUpgrade(columnDefs,
ksName,
cfName,
isStaticCompactTable,
isSuper,
rawComparator,
subComparator,
defaultValidator);
}
CFMetaData cfm = CFMetaData.create(ksName,
cfName,
cfId,
isDense,
isCompound,
isSuper,
isCounter,
false,
columnDefs,
DatabaseDescriptor.getPartitioner());
Indexes indexes = createIndexesFromColumnRows(cfm,
filteredColumnRows,
ksName,
cfName,
rawComparator,
subComparator,
isSuper,
isCQLTable,
isStaticCompactTable,
needsUpgrade);
cfm.indexes(indexes);
if (tableRow.has("dropped_columns"))
addDroppedColumns(cfm, rawComparator, tableRow.getMap("dropped_columns", UTF8Type.instance, LongType.instance));
return cfm.params(decodeTableParams(tableRow))
.triggers(createTriggersFromTriggerRows(triggerRows));
}
private static boolean calculateIsDense(AbstractType<?> comparator, UntypedResultSet columnRows, boolean isSuper)
{
for (UntypedResultSet.Row columnRow : columnRows)
{
if ("regular".equals(columnRow.getString("type")))
return false;
}
if (isSuper)
return true;
int maxClusteringIdx = -1;
for (UntypedResultSet.Row columnRow : columnRows)
if ("clustering_key".equals(columnRow.getString("type")))
maxClusteringIdx = Math.max(maxClusteringIdx, columnRow.has("component_index") ? columnRow.getInt("component_index") : 0);
return maxClusteringIdx >= 0
? maxClusteringIdx == comparator.componentsCount() - 1
: !isCQL3OnlyPKComparator(comparator);
}
private static Iterable<UntypedResultSet.Row> filterOutRedundantRowsForSparse(UntypedResultSet columnRows, boolean isSuper, boolean isCompound)
{
Collection<UntypedResultSet.Row> filteredRows = new ArrayList<>();
for (UntypedResultSet.Row columnRow : columnRows)
{
String kind = columnRow.getString("type");
if (!isSuper && "compact_value".equals(kind))
continue;
if ("clustering_key".equals(kind) && !isSuper && !isCompound)
continue;
filteredRows.add(columnRow);
}
return filteredRows;
}
private static boolean isCQL3OnlyPKComparator(AbstractType<?> comparator)
{
if (!(comparator instanceof CompositeType))
return false;
CompositeType ct = (CompositeType)comparator;
return ct.types.size() == 1 && ct.types.get(0) instanceof UTF8Type;
}
private static TableParams decodeTableParams(UntypedResultSet.Row row)
{
TableParams.Builder params = TableParams.builder();
params.readRepairChance(row.getDouble("read_repair_chance"))
.dcLocalReadRepairChance(row.getDouble("local_read_repair_chance"))
.gcGraceSeconds(row.getInt("gc_grace_seconds"));
if (row.has("comment"))
params.comment(row.getString("comment"));
if (row.has("memtable_flush_period_in_ms"))
params.memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"));
params.caching(cachingFromRow(row.getString("caching")));
if (row.has("default_time_to_live"))
params.defaultTimeToLive(row.getInt("default_time_to_live"));
if (row.has("speculative_retry"))
params.speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")));
Map<String, String> compressionParameters = fromJsonMap(row.getString("compression_parameters"));
String crcCheckChance = compressionParameters.remove("crc_check_chance");
if (crcCheckChance != null)
params.crcCheckChance(Double.parseDouble(crcCheckChance));
params.compression(CompressionParams.fromMap(compressionParameters));
params.compaction(compactionFromRow(row));
if (row.has("min_index_interval"))
params.minIndexInterval(row.getInt("min_index_interval"));
if (row.has("max_index_interval"))
params.maxIndexInterval(row.getInt("max_index_interval"));
if (row.has("bloom_filter_fp_chance"))
params.bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance"));
return params.build();
}
@VisibleForTesting
public static CachingParams cachingFromRow(String caching)
{
switch(caching)
{
case "NONE":
return CachingParams.CACHE_NOTHING;
case "KEYS_ONLY":
return CachingParams.CACHE_KEYS;
case "ROWS_ONLY":
return new CachingParams(false, Integer.MAX_VALUE);
case "ALL":
return CachingParams.CACHE_EVERYTHING;
default:
return CachingParams.fromMap(fromJsonMap(caching));
}
}
@SuppressWarnings("unchecked")
private static CompactionParams compactionFromRow(UntypedResultSet.Row row)
{
Class<? extends AbstractCompactionStrategy> klass =
CFMetaData.createCompactionStrategy(row.getString("compaction_strategy_class"));
Map<String, String> options = fromJsonMap(row.getString("compaction_strategy_options"));
int minThreshold = row.getInt("min_compaction_threshold");
int maxThreshold = row.getInt("max_compaction_threshold");
Map<String, String> optionsWithThresholds = new HashMap<>(options);
optionsWithThresholds.putIfAbsent(CompactionParams.Option.MIN_THRESHOLD.toString(), Integer.toString(minThreshold));
optionsWithThresholds.putIfAbsent(CompactionParams.Option.MAX_THRESHOLD.toString(), Integer.toString(maxThreshold));
try
{
Map<String, String> unrecognizedOptions =
(Map<String, String>) klass.getMethod("validateOptions", Map.class).invoke(null, optionsWithThresholds);
if (unrecognizedOptions.isEmpty())
options = optionsWithThresholds;
}
catch (Exception e)
{
throw new RuntimeException(e);
}
return CompactionParams.create(klass, options);
}
private static boolean checkNeedsUpgrade(Iterable<UntypedResultSet.Row> defs, boolean isSuper, boolean isStaticCompactTable)
{
if (isSuper)
return true;
if (isStaticCompactTable)
return !hasKind(defs, ColumnDefinition.Kind.STATIC);
return !hasRegularColumns(defs);
}
private static boolean hasRegularColumns(Iterable<UntypedResultSet.Row> columnRows)
{
for (UntypedResultSet.Row row : columnRows)
{
if (isEmptyCompactValueColumn(row))
return false;
if (deserializeKind(row.getString("type")) == ColumnDefinition.Kind.REGULAR)
return true;
}
return false;
}
private static boolean isEmptyCompactValueColumn(UntypedResultSet.Row row)
{
return "compact_value".equals(row.getString("type")) && row.getString("column_name").isEmpty();
}
private static void addDefinitionForUpgrade(List<ColumnDefinition> defs,
String ksName,
String cfName,
boolean isStaticCompactTable,
boolean isSuper,
AbstractType<?> rawComparator,
AbstractType<?> subComparator,
AbstractType<?> defaultValidator)
{
CompactTables.DefaultNames names = CompactTables.defaultNameGenerator(defs);
if (isSuper)
{
defs.add(ColumnDefinition.regularDef(ksName, cfName, SuperColumnCompatibility.SUPER_COLUMN_MAP_COLUMN_STR, MapType.getInstance(subComparator, defaultValidator, true)));
}
else if (isStaticCompactTable)
{
defs.add(ColumnDefinition.clusteringDef(ksName, cfName, names.defaultClusteringName(), rawComparator, 0));
defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), defaultValidator));
}
else
{
defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), EmptyType.instance));
}
}
private static boolean hasKind(Iterable<UntypedResultSet.Row> defs, ColumnDefinition.Kind kind)
{
for (UntypedResultSet.Row row : defs)
if (deserializeKind(row.getString("type")) == kind)
return true;
return false;
}
private static void addDroppedColumns(CFMetaData cfm, AbstractType<?> comparator, Map<String, Long> droppedTimes)
{
AbstractType<?> last = comparator.getComponents().get(comparator.componentsCount() - 1);
Map<ByteBuffer, CollectionType> collections = last instanceof ColumnToCollectionType
? ((ColumnToCollectionType) last).defined
: Collections.emptyMap();
for (Map.Entry<String, Long> entry : droppedTimes.entrySet())
{
String name = entry.getKey();
ByteBuffer nameBytes = UTF8Type.instance.decompose(name);
long time = entry.getValue();
AbstractType<?> type = collections.containsKey(nameBytes)
? collections.get(nameBytes)
: BytesType.instance;
cfm.getDroppedColumns().put(nameBytes, new CFMetaData.DroppedColumn(name, null, type, time));
}
}
private static List<ColumnDefinition> createColumnsFromColumnRows(Iterable<UntypedResultSet.Row> rows,
String keyspace,
String table,
AbstractType<?> rawComparator,
AbstractType<?> rawSubComparator,
boolean isSuper,
boolean isCQLTable,
boolean isStaticCompactTable,
boolean needsUpgrade)
{
List<ColumnDefinition> columns = new ArrayList<>();
for (UntypedResultSet.Row row : rows)
{
if (isEmptyCompactValueColumn(row))
continue;
columns.add(createColumnFromColumnRow(row,
keyspace,
table,
rawComparator,
rawSubComparator,
isSuper,
isCQLTable,
isStaticCompactTable,
needsUpgrade));
}
return columns;
}
private static ColumnDefinition createColumnFromColumnRow(UntypedResultSet.Row row,
String keyspace,
String table,
AbstractType<?> rawComparator,
AbstractType<?> rawSubComparator,
boolean isSuper,
boolean isCQLTable,
boolean isStaticCompactTable,
boolean needsUpgrade)
{
String rawKind = row.getString("type");
ColumnDefinition.Kind kind = deserializeKind(rawKind);
if (needsUpgrade && isStaticCompactTable && kind == ColumnDefinition.Kind.REGULAR)
kind = ColumnDefinition.Kind.STATIC;
int componentIndex = ColumnDefinition.NO_POSITION;
if (kind.isPrimaryKeyKind())
componentIndex = row.has("component_index") ? row.getInt("component_index") : 0;
AbstractType<?> comparator = isCQLTable
? UTF8Type.instance
: CompactTables.columnDefinitionComparator(rawKind, isSuper, rawComparator, rawSubComparator);
ColumnIdentifier name = ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")), comparator);
AbstractType<?> validator = parseType(row.getString("validator"));
if (validator.isUDT() && validator.isMultiCell())
validator = validator.freeze();
else
validator = validator.freezeNestedMulticellTypes();
return new ColumnDefinition(keyspace, table, name, validator, componentIndex, kind);
}
private static Indexes createIndexesFromColumnRows(CFMetaData cfm,
Iterable<UntypedResultSet.Row> rows,
String keyspace,
String table,
AbstractType<?> rawComparator,
AbstractType<?> rawSubComparator,
boolean isSuper,
boolean isCQLTable,
boolean isStaticCompactTable,
boolean needsUpgrade)
{
Indexes.Builder indexes = Indexes.builder();
for (UntypedResultSet.Row row : rows)
{
IndexMetadata.Kind kind = null;
if (row.has("index_type"))
kind = IndexMetadata.Kind.valueOf(row.getString("index_type"));
if (kind == null)
continue;
Map<String, String> indexOptions = null;
if (row.has("index_options"))
indexOptions = fromJsonMap(row.getString("index_options"));
if (row.has("index_name"))
{
String indexName = row.getString("index_name");
ColumnDefinition column = createColumnFromColumnRow(row,
keyspace,
table,
rawComparator,
rawSubComparator,
isSuper,
isCQLTable,
isStaticCompactTable,
needsUpgrade);
indexes.add(IndexMetadata.fromLegacyMetadata(cfm, column, indexName, kind, indexOptions));
}
else
{
logger.error("Failed to find index name for legacy migration of index on {}.{}", keyspace, table);
}
}
return indexes.build();
}
private static ColumnDefinition.Kind deserializeKind(String kind)
{
if ("clustering_key".equalsIgnoreCase(kind))
return ColumnDefinition.Kind.CLUSTERING;
if ("compact_value".equalsIgnoreCase(kind))
return ColumnDefinition.Kind.REGULAR;
return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase());
}
private static Triggers createTriggersFromTriggerRows(UntypedResultSet rows)
{
Triggers.Builder triggers = org.apache.cassandra.schema.Triggers.builder();
rows.forEach(row -> triggers.add(createTriggerFromTriggerRow(row)));
return triggers.build();
}
private static TriggerMetadata createTriggerFromTriggerRow(UntypedResultSet.Row row)
{
String name = row.getString("trigger_name");
String classOption = row.getTextMap("trigger_options").get("class");
return new TriggerMetadata(name, classOption);
}
private static Collection<Type> readTypes(String keyspaceName)
{
String query = format("SELECT type_name FROM %s.%s WHERE keyspace_name = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_USERTYPES);
Collection<String> typeNames = new ArrayList<>();
query(query, keyspaceName).forEach(row -> typeNames.add(row.getString("type_name")));
Collection<Type> types = new ArrayList<>();
typeNames.forEach(name -> types.add(readType(keyspaceName, name)));
return types;
}
private static Type readType(String keyspaceName, String typeName)
{
long timestamp = readTypeTimestamp(keyspaceName, typeName);
UserType metadata = readTypeMetadata(keyspaceName, typeName);
return new Type(timestamp, metadata);
}
private static long readTypeTimestamp(String keyspaceName, String typeName)
{
ColumnFamilyStore store = org.apache.cassandra.db.Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME)
.getColumnFamilyStore(SystemKeyspace.LEGACY_USERTYPES);
ClusteringComparator comparator = store.metadata.comparator;
Slices slices = Slices.with(comparator, Slice.make(comparator, typeName));
int nowInSec = FBUtilities.nowInSeconds();
DecoratedKey key = store.metadata.decorateKey(AsciiType.instance.fromString(keyspaceName));
SinglePartitionReadCommand command = SinglePartitionReadCommand.create(store.metadata, nowInSec, key, slices);
try (ReadExecutionController controller = command.executionController();
RowIterator partition = UnfilteredRowIterators.filter(command.queryMemtableAndDisk(store, controller), nowInSec))
{
return partition.next().primaryKeyLivenessInfo().timestamp();
}
}
private static UserType readTypeMetadata(String keyspaceName, String typeName)
{
String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND type_name = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_USERTYPES);
UntypedResultSet.Row row = query(query, keyspaceName, typeName).one();
List<FieldIdentifier> names =
row.getList("field_names", UTF8Type.instance)
.stream()
.map(t -> FieldIdentifier.forInternalString(t))
.collect(Collectors.toList());
List<AbstractType<?>> types =
row.getList("field_types", UTF8Type.instance)
.stream()
.map(LegacySchemaMigrator::parseType)
.collect(Collectors.toList());
return new UserType(keyspaceName, bytes(typeName), names, types, true);
}
private static Collection<Function> readFunctions(String keyspaceName)
{
String query = format("SELECT function_name, signature FROM %s.%s WHERE keyspace_name = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_FUNCTIONS);
HashMultimap<String, List<String>> functionSignatures = HashMultimap.create();
query(query, keyspaceName).forEach(row -> functionSignatures.put(row.getString("function_name"), row.getList("signature", UTF8Type.instance)));
Collection<Function> functions = new ArrayList<>();
functionSignatures.entries().forEach(pair -> functions.add(readFunction(keyspaceName, pair.getKey(), pair.getValue())));
return functions;
}
private static Function readFunction(String keyspaceName, String functionName, List<String> signature)
{
long timestamp = readFunctionTimestamp(keyspaceName, functionName, signature);
UDFunction metadata = readFunctionMetadata(keyspaceName, functionName, signature);
return new Function(timestamp, metadata);
}
private static long readFunctionTimestamp(String keyspaceName, String functionName, List<String> signature)
{
String query = format("SELECT writeTime(return_type) AS timestamp " +
"FROM %s.%s " +
"WHERE keyspace_name = ? AND function_name = ? AND signature = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_FUNCTIONS);
return query(query, keyspaceName, functionName, signature).one().getLong("timestamp");
}
private static UDFunction readFunctionMetadata(String keyspaceName, String functionName, List<String> signature)
{
String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND function_name = ? AND signature = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_FUNCTIONS);
UntypedResultSet.Row row = query(query, keyspaceName, functionName, signature).one();
FunctionName name = new FunctionName(keyspaceName, functionName);
List<ColumnIdentifier> argNames = new ArrayList<>();
if (row.has("argument_names"))
for (String arg : row.getList("argument_names", UTF8Type.instance))
argNames.add(new ColumnIdentifier(arg, true));
List<AbstractType<?>> argTypes = new ArrayList<>();
if (row.has("argument_types"))
for (String type : row.getList("argument_types", UTF8Type.instance))
argTypes.add(parseType(type));
AbstractType<?> returnType = parseType(row.getString("return_type"));
String language = row.getString("language");
String body = row.getString("body");
boolean calledOnNullInput = row.getBoolean("called_on_null_input");
try
{
return UDFunction.create(name, argNames, argTypes, returnType, calledOnNullInput, language, body);
}
catch (InvalidRequestException e)
{
return UDFunction.createBrokenFunction(name, argNames, argTypes, returnType, calledOnNullInput, language, body, e);
}
}
private static Collection<Aggregate> readAggregates(Functions functions, String keyspaceName)
{
String query = format("SELECT aggregate_name, signature FROM %s.%s WHERE keyspace_name = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_AGGREGATES);
HashMultimap<String, List<String>> aggregateSignatures = HashMultimap.create();
query(query, keyspaceName).forEach(row -> aggregateSignatures.put(row.getString("aggregate_name"), row.getList("signature", UTF8Type.instance)));
Collection<Aggregate> aggregates = new ArrayList<>();
aggregateSignatures.entries().forEach(pair -> aggregates.add(readAggregate(functions, keyspaceName, pair.getKey(), pair.getValue())));
return aggregates;
}
private static Aggregate readAggregate(Functions functions, String keyspaceName, String aggregateName, List<String> signature)
{
long timestamp = readAggregateTimestamp(keyspaceName, aggregateName, signature);
UDAggregate metadata = readAggregateMetadata(functions, keyspaceName, aggregateName, signature);
return new Aggregate(timestamp, metadata);
}
private static long readAggregateTimestamp(String keyspaceName, String aggregateName, List<String> signature)
{
String query = format("SELECT writeTime(return_type) AS timestamp " +
"FROM %s.%s " +
"WHERE keyspace_name = ? AND aggregate_name = ? AND signature = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_AGGREGATES);
return query(query, keyspaceName, aggregateName, signature).one().getLong("timestamp");
}
private static UDAggregate readAggregateMetadata(Functions functions, String keyspaceName, String functionName, List<String> signature)
{
String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND aggregate_name = ? AND signature = ?",
SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.LEGACY_AGGREGATES);
UntypedResultSet.Row row = query(query, keyspaceName, functionName, signature).one();
FunctionName name = new FunctionName(keyspaceName, functionName);
List<String> types = row.getList("argument_types", UTF8Type.instance);
List<AbstractType<?>> argTypes = new ArrayList<>();
if (types != null)
{
argTypes = new ArrayList<>(types.size());
for (String type : types)
argTypes.add(parseType(type));
}
AbstractType<?> returnType = parseType(row.getString("return_type"));
FunctionName stateFunc = new FunctionName(keyspaceName, row.getString("state_func"));
AbstractType<?> stateType = parseType(row.getString("state_type"));
FunctionName finalFunc = row.has("final_func") ? new FunctionName(keyspaceName, row.getString("final_func")) : null;
ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null;
try
{
return UDAggregate.create(functions, name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond);
}
catch (InvalidRequestException reason)
{
return UDAggregate.createBroken(name, argTypes, returnType, initcond, reason);
}
}
private static UntypedResultSet query(String query, Object... values)
{
return QueryProcessor.executeOnceInternal(query, values);
}
private static AbstractType<?> parseType(String str)
{
return TypeParser.parse(str);
}
private static final class Keyspace
{
final long timestamp;
final String name;
final KeyspaceParams params;
final Collection<Table> tables;
final Collection<Type> types;
final Collection<Function> functions;
final Collection<Aggregate> aggregates;
Keyspace(long timestamp,
String name,
KeyspaceParams params,
Collection<Table> tables,
Collection<Type> types,
Collection<Function> functions,
Collection<Aggregate> aggregates)
{
this.timestamp = timestamp;
this.name = name;
this.params = params;
this.tables = tables;
this.types = types;
this.functions = functions;
this.aggregates = aggregates;
}
}
private static final class Table
{
final long timestamp;
final CFMetaData metadata;
Table(long timestamp, CFMetaData metadata)
{
this.timestamp = timestamp;
this.metadata = metadata;
}
}
private static final class Type
{
final long timestamp;
final UserType metadata;
Type(long timestamp, UserType metadata)
{
this.timestamp = timestamp;
this.metadata = metadata;
}
}
private static final class Function
{
final long timestamp;
final UDFunction metadata;
Function(long timestamp, UDFunction metadata)
{
this.timestamp = timestamp;
this.metadata = metadata;
}
}
private static final class Aggregate
{
final long timestamp;
final UDAggregate metadata;
Aggregate(long timestamp, UDAggregate metadata)
{
this.timestamp = timestamp;
this.metadata = metadata;
}
}
}