package org.apache.cassandra.auth;
import java.util.*;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.cql3.statements.ModificationStatement;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.serializers.SetSerializer;
import org.apache.cassandra.serializers.UTF8Serializer;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
public class CassandraAuthorizer implements IAuthorizer
{
private static final Logger logger = LoggerFactory.getLogger(CassandraAuthorizer.class);
private static final String ROLE = "role";
private static final String RESOURCE = "resource";
private static final String PERMISSIONS = "permissions";
public static final String USERNAME = "username";
public static final String USER_PERMISSIONS = "permissions";
private SelectStatement authorizeRoleStatement;
private SelectStatement legacyAuthorizeRoleStatement;
public CassandraAuthorizer()
{
}
public Set<Permission> authorize(AuthenticatedUser user, IResource resource)
{
try
{
if (user.isSuper())
return resource.applicablePermissions();
Set<Permission> permissions = EnumSet.noneOf(Permission.class);
for (RoleResource role: user.getRoles())
addPermissionsForRole(permissions, resource, role);
return permissions;
}
catch (RequestExecutionException | RequestValidationException e)
{
logger.debug("Failed to authorize {} for {}", user, resource);
throw new UnauthorizedException("Unable to perform authorization of permissions: " + e.getMessage(), e);
}
}
public void grant(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource grantee)
throws RequestValidationException, RequestExecutionException
{
modifyRolePermissions(permissions, resource, grantee, "+");
addLookupEntry(resource, grantee);
}
public void revoke(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource revokee)
throws RequestValidationException, RequestExecutionException
{
modifyRolePermissions(permissions, resource, revokee, "-");
removeLookupEntry(resource, revokee);
}
public void revokeAllFrom(RoleResource revokee)
{
try
{
UntypedResultSet rows = process(String.format("SELECT resource FROM %s.%s WHERE role = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLE_PERMISSIONS,
escape(revokee.getRoleName())));
List<CQLStatement> statements = new ArrayList<>();
for (UntypedResultSet.Row row : rows)
{
statements.add(
QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE resource = '%s' AND role = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.RESOURCE_ROLE_INDEX,
escape(row.getString("resource")),
escape(revokee.getRoleName())),
ClientState.forInternalCalls()).statement);
}
statements.add(QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE role = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLE_PERMISSIONS,
escape(revokee.getRoleName())),
ClientState.forInternalCalls()).statement);
executeLoggedBatch(statements);
}
catch (RequestExecutionException | RequestValidationException e)
{
logger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", revokee.getRoleName(), e);
}
}
public void revokeAllOn(IResource droppedResource)
{
try
{
UntypedResultSet rows = process(String.format("SELECT role FROM %s.%s WHERE resource = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.RESOURCE_ROLE_INDEX,
escape(droppedResource.getName())));
List<CQLStatement> statements = new ArrayList<>();
for (UntypedResultSet.Row row : rows)
{
statements.add(QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE role = '%s' AND resource = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLE_PERMISSIONS,
escape(row.getString("role")),
escape(droppedResource.getName())),
ClientState.forInternalCalls()).statement);
}
statements.add(QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE resource = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.RESOURCE_ROLE_INDEX,
escape(droppedResource.getName())),
ClientState.forInternalCalls()).statement);
executeLoggedBatch(statements);
}
catch (RequestExecutionException | RequestValidationException e)
{
logger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", droppedResource, e);
return;
}
}
private void executeLoggedBatch(List<CQLStatement> statements)
throws RequestExecutionException, RequestValidationException
{
BatchStatement batch = new BatchStatement(0,
BatchStatement.Type.LOGGED,
Lists.newArrayList(Iterables.filter(statements, ModificationStatement.class)),
Attributes.none());
QueryProcessor.instance.processBatch(batch,
QueryState.forInternalCalls(),
BatchQueryOptions.withoutPerStatementVariables(QueryOptions.DEFAULT),
System.nanoTime());
}
private void addPermissionsForRole(Set<Permission> permissions, IResource resource, RoleResource role)
throws RequestExecutionException, RequestValidationException
{
QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.LOCAL_ONE,
Lists.newArrayList(ByteBufferUtil.bytes(role.getRoleName()),
ByteBufferUtil.bytes(resource.getName())));
SelectStatement statement;
if (Schema.instance.getCFMetaData(SchemaConstants.AUTH_KEYSPACE_NAME, USER_PERMISSIONS) == null)
statement = authorizeRoleStatement;
else
{
if (legacyAuthorizeRoleStatement == null)
legacyAuthorizeRoleStatement = prepare(USERNAME, USER_PERMISSIONS);
statement = legacyAuthorizeRoleStatement;
}
ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options, System.nanoTime());
UntypedResultSet result = UntypedResultSet.create(rows.result);
if (!result.isEmpty() && result.one().has(PERMISSIONS))
{
for (String perm : result.one().getSet(PERMISSIONS, UTF8Type.instance))
{
permissions.add(Permission.valueOf(perm));
}
}
}
private void modifyRolePermissions(Set<Permission> permissions, IResource resource, RoleResource role, String op)
throws RequestExecutionException
{
process(String.format("UPDATE %s.%s SET permissions = permissions %s {%s} WHERE role = '%s' AND resource = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLE_PERMISSIONS,
op,
"'" + StringUtils.join(permissions, "','") + "'",
escape(role.getRoleName()),
escape(resource.getName())));
}
private void removeLookupEntry(IResource resource, RoleResource role) throws RequestExecutionException
{
process(String.format("DELETE FROM %s.%s WHERE resource = '%s' and role = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.RESOURCE_ROLE_INDEX,
escape(resource.getName()),
escape(role.getRoleName())));
}
private void addLookupEntry(IResource resource, RoleResource role) throws RequestExecutionException
{
process(String.format("INSERT INTO %s.%s (resource, role) VALUES ('%s','%s')",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.RESOURCE_ROLE_INDEX,
escape(resource.getName()),
escape(role.getRoleName())));
}
public Set<PermissionDetails> list(AuthenticatedUser performer,
Set<Permission> permissions,
IResource resource,
RoleResource grantee)
throws RequestValidationException, RequestExecutionException
{
if (!(performer.isSuper() || performer.isSystem()) && !performer.getRoles().contains(grantee))
throw new UnauthorizedException(String.format("You are not authorized to view %s's permissions",
grantee == null ? "everyone" : grantee.getRoleName()));
if (null == grantee)
return listPermissionsForRole(permissions, resource, grantee);
Set<RoleResource> roles = DatabaseDescriptor.getRoleManager().getRoles(grantee, true);
Set<PermissionDetails> details = new HashSet<>();
for (RoleResource role : roles)
details.addAll(listPermissionsForRole(permissions, resource, role));
return details;
}
private Set<PermissionDetails> listPermissionsForRole(Set<Permission> permissions,
IResource resource,
RoleResource role)
throws RequestExecutionException
{
Set<PermissionDetails> details = new HashSet<>();
boolean useLegacyTable = Schema.instance.getCFMetaData(SchemaConstants.AUTH_KEYSPACE_NAME, USER_PERMISSIONS) != null;
String entityColumnName = useLegacyTable ? USERNAME : ROLE;
for (UntypedResultSet.Row row : process(buildListQuery(resource, role, useLegacyTable)))
{
if (row.has(PERMISSIONS))
{
for (String p : row.getSet(PERMISSIONS, UTF8Type.instance))
{
Permission permission = Permission.valueOf(p);
if (permissions.contains(permission))
details.add(new PermissionDetails(row.getString(entityColumnName),
Resources.fromName(row.getString(RESOURCE)),
permission));
}
}
}
return details;
}
private String buildListQuery(IResource resource, RoleResource grantee, boolean useLegacyTable)
{
String tableName = useLegacyTable ? USER_PERMISSIONS : AuthKeyspace.ROLE_PERMISSIONS;
String entityName = useLegacyTable ? USERNAME : ROLE;
List<String> vars = Lists.newArrayList(SchemaConstants.AUTH_KEYSPACE_NAME, tableName);
List<String> conditions = new ArrayList<>();
if (resource != null)
{
conditions.add("resource = '%s'");
vars.add(escape(resource.getName()));
}
if (grantee != null)
{
conditions.add(entityName + " = '%s'");
vars.add(escape(grantee.getRoleName()));
}
String query = "SELECT " + entityName + ", resource, permissions FROM %s.%s";
if (!conditions.isEmpty())
query += " WHERE " + StringUtils.join(conditions, " AND ");
if (resource != null && grantee == null)
query += " ALLOW FILTERING";
return String.format(query, vars.toArray());
}
public Set<DataResource> protectedResources()
{
return ImmutableSet.of(DataResource.table(SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLE_PERMISSIONS));
}
public void validateConfiguration() throws ConfigurationException
{
}
public void setup()
{
authorizeRoleStatement = prepare(ROLE, AuthKeyspace.ROLE_PERMISSIONS);
if (Schema.instance.getCFMetaData(SchemaConstants.AUTH_KEYSPACE_NAME, "permissions") != null)
{
legacyAuthorizeRoleStatement = prepare(USERNAME, USER_PERMISSIONS);
ScheduledExecutors.optionalTasks.schedule(new Runnable()
{
public void run()
{
convertLegacyData();
}
}, AuthKeyspace.SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS);
}
}
private SelectStatement prepare(String entityname, String permissionsTable)
{
String query = String.format("SELECT permissions FROM %s.%s WHERE %s = ? AND resource = ?",
SchemaConstants.AUTH_KEYSPACE_NAME,
permissionsTable,
entityname);
return (SelectStatement) QueryProcessor.getStatement(query, ClientState.forInternalCalls()).statement;
}
private void convertLegacyData()
{
try
{
if (Schema.instance.getCFMetaData("system_auth", "permissions") != null)
{
logger.info("Converting legacy permissions data");
CQLStatement insertStatement =
QueryProcessor.getStatement(String.format("INSERT INTO %s.%s (role, resource, permissions) " +
"VALUES (?, ?, ?)",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLE_PERMISSIONS),
ClientState.forInternalCalls()).statement;
CQLStatement indexStatement =
QueryProcessor.getStatement(String.format("INSERT INTO %s.%s (resource, role) VALUES (?,?)",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.RESOURCE_ROLE_INDEX),
ClientState.forInternalCalls()).statement;
UntypedResultSet permissions = process("SELECT * FROM system_auth.permissions");
for (UntypedResultSet.Row row : permissions)
{
final IResource resource = Resources.fromName(row.getString("resource"));
Predicate<String> isApplicable = new Predicate<String>()
{
public boolean apply(String s)
{
return resource.applicablePermissions().contains(Permission.valueOf(s));
}
};
SetSerializer<String> serializer = SetSerializer.getInstance(UTF8Serializer.instance, UTF8Type.instance);
Set<String> originalPerms = serializer.deserialize(row.getBytes("permissions"));
Set<String> filteredPerms = ImmutableSet.copyOf(Iterables.filter(originalPerms, isApplicable));
insertStatement.execute(QueryState.forInternalCalls(),
QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
Lists.newArrayList(row.getBytes("username"),
row.getBytes("resource"),
serializer.serialize(filteredPerms))),
System.nanoTime());
indexStatement.execute(QueryState.forInternalCalls(),
QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
Lists.newArrayList(row.getBytes("resource"),
row.getBytes("username"))),
System.nanoTime());
}
logger.info("Completed conversion of legacy permissions");
}
}
catch (Exception e)
{
logger.info("Unable to complete conversion of legacy permissions data (perhaps not enough nodes are upgraded yet). " +
"Conversion should not be considered complete");
logger.trace("Conversion error", e);
}
}
private String escape(String name)
{
return StringUtils.replace(name, "'", "''");
}
private UntypedResultSet process(String query) throws RequestExecutionException
{
return QueryProcessor.process(query, ConsistencyLevel.LOCAL_ONE);
}
}