 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
package org.apache.cassandra.auth;

import java.util.*;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.cql3.statements.ModificationStatement;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.serializers.SetSerializer;
import org.apache.cassandra.serializers.UTF8Serializer;
import org.apache.cassandra.service.ClientState;

import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;

CassandraAuthorizer is an IAuthorizer implementation that keeps user permissions internally in C* using the system_auth.role_permissions table.
public class CassandraAuthorizer implements IAuthorizer { private static final Logger logger = LoggerFactory.getLogger(CassandraAuthorizer.class); private static final String ROLE = "role"; private static final String RESOURCE = "resource"; private static final String PERMISSIONS = "permissions"; // used during upgrades to perform authz on mixed clusters public static final String USERNAME = "username"; public static final String USER_PERMISSIONS = "permissions"; private SelectStatement authorizeRoleStatement; private SelectStatement legacyAuthorizeRoleStatement; public CassandraAuthorizer() { } // Returns every permission on the resource granted to the user either directly // or indirectly via roles granted to the user. public Set<Permission> authorize(AuthenticatedUser user, IResource resource) { try { if (user.isSuper()) return resource.applicablePermissions(); Set<Permission> permissions = EnumSet.noneOf(Permission.class); for (RoleResource role: user.getRoles()) addPermissionsForRole(permissions, resource, role); return permissions; } catch (RequestExecutionException | RequestValidationException e) { logger.debug("Failed to authorize {} for {}", user, resource); throw new UnauthorizedException("Unable to perform authorization of permissions: " + e.getMessage(), e); } } public void grant(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource grantee) throws RequestValidationException, RequestExecutionException { modifyRolePermissions(permissions, resource, grantee, "+"); addLookupEntry(resource, grantee); } public void revoke(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource revokee) throws RequestValidationException, RequestExecutionException { modifyRolePermissions(permissions, resource, revokee, "-"); removeLookupEntry(resource, revokee); } // Called when deleting a role with DROP ROLE query. // Internal hook, so no permission checks are needed here. // Executes a logged batch removing the granted premissions // for the role as well as the entries from the reverse index // table public void revokeAllFrom(RoleResource revokee) { try { UntypedResultSet rows = process(String.format("SELECT resource FROM %s.%s WHERE role = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLE_PERMISSIONS, escape(revokee.getRoleName()))); List<CQLStatement> statements = new ArrayList<>(); for (UntypedResultSet.Row row : rows) { statements.add( QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE resource = '%s' AND role = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.RESOURCE_ROLE_INDEX, escape(row.getString("resource")), escape(revokee.getRoleName())), ClientState.forInternalCalls()).statement); } statements.add(QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE role = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLE_PERMISSIONS, escape(revokee.getRoleName())), ClientState.forInternalCalls()).statement); executeLoggedBatch(statements); } catch (RequestExecutionException | RequestValidationException e) { logger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", revokee.getRoleName(), e); } } // Called after a resource is removed (DROP KEYSPACE, DROP TABLE, etc.). // Execute a logged batch removing all the permissions for the resource // as well as the index table entry public void revokeAllOn(IResource droppedResource) { try { UntypedResultSet rows = process(String.format("SELECT role FROM %s.%s WHERE resource = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.RESOURCE_ROLE_INDEX, escape(droppedResource.getName()))); List<CQLStatement> statements = new ArrayList<>(); for (UntypedResultSet.Row row : rows) { statements.add(QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE role = '%s' AND resource = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLE_PERMISSIONS, escape(row.getString("role")), escape(droppedResource.getName())), ClientState.forInternalCalls()).statement); } statements.add(QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE resource = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.RESOURCE_ROLE_INDEX, escape(droppedResource.getName())), ClientState.forInternalCalls()).statement); executeLoggedBatch(statements); } catch (RequestExecutionException | RequestValidationException e) { logger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", droppedResource, e); return; } } private void executeLoggedBatch(List<CQLStatement> statements) throws RequestExecutionException, RequestValidationException { BatchStatement batch = new BatchStatement(0, BatchStatement.Type.LOGGED, Lists.newArrayList(Iterables.filter(statements, ModificationStatement.class)), Attributes.none()); QueryProcessor.instance.processBatch(batch, QueryState.forInternalCalls(), BatchQueryOptions.withoutPerStatementVariables(QueryOptions.DEFAULT), System.nanoTime()); } // Add every permission on the resource granted to the role private void addPermissionsForRole(Set<Permission> permissions, IResource resource, RoleResource role) throws RequestExecutionException, RequestValidationException { QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.LOCAL_ONE, Lists.newArrayList(ByteBufferUtil.bytes(role.getRoleName()), ByteBufferUtil.bytes(resource.getName()))); SelectStatement statement; // If it exists, read from the legacy user permissions table to handle the case where the cluster // is being upgraded and so is running with mixed versions of the authz schema if (Schema.instance.getCFMetaData(SchemaConstants.AUTH_KEYSPACE_NAME, USER_PERMISSIONS) == null) statement = authorizeRoleStatement; else { // If the permissions table was initialised only after the statement got prepared, re-prepare (CASSANDRA-12813) if (legacyAuthorizeRoleStatement == null) legacyAuthorizeRoleStatement = prepare(USERNAME, USER_PERMISSIONS); statement = legacyAuthorizeRoleStatement; } ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options, System.nanoTime()); UntypedResultSet result = UntypedResultSet.create(rows.result); if (!result.isEmpty() && result.one().has(PERMISSIONS)) { for (String perm : result.one().getSet(PERMISSIONS, UTF8Type.instance)) { permissions.add(Permission.valueOf(perm)); } } } // Adds or removes permissions from a role_permissions table (adds if op is "+", removes if op is "-") private void modifyRolePermissions(Set<Permission> permissions, IResource resource, RoleResource role, String op) throws RequestExecutionException { process(String.format("UPDATE %s.%s SET permissions = permissions %s {%s} WHERE role = '%s' AND resource = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLE_PERMISSIONS, op, "'" + StringUtils.join(permissions, "','") + "'", escape(role.getRoleName()), escape(resource.getName()))); } // Removes an entry from the inverted index table (from resource -> role with defined permissions) private void removeLookupEntry(IResource resource, RoleResource role) throws RequestExecutionException { process(String.format("DELETE FROM %s.%s WHERE resource = '%s' and role = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.RESOURCE_ROLE_INDEX, escape(resource.getName()), escape(role.getRoleName()))); } // Adds an entry to the inverted index table (from resource -> role with defined permissions) private void addLookupEntry(IResource resource, RoleResource role) throws RequestExecutionException { process(String.format("INSERT INTO %s.%s (resource, role) VALUES ('%s','%s')", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.RESOURCE_ROLE_INDEX, escape(resource.getName()), escape(role.getRoleName()))); } // 'of' can be null - in that case everyone's permissions have been requested. Otherwise only single user's. // If the user requesting 'LIST PERMISSIONS' is not a superuser OR their username doesn't match 'of', we // throw UnauthorizedException. So only a superuser can view everybody's permissions. Regular users are only // allowed to see their own permissions. public Set<PermissionDetails> list(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource grantee) throws RequestValidationException, RequestExecutionException { if (!(performer.isSuper() || performer.isSystem()) && !performer.getRoles().contains(grantee)) throw new UnauthorizedException(String.format("You are not authorized to view %s's permissions", grantee == null ? "everyone" : grantee.getRoleName())); if (null == grantee) return listPermissionsForRole(permissions, resource, grantee); Set<RoleResource> roles = DatabaseDescriptor.getRoleManager().getRoles(grantee, true); Set<PermissionDetails> details = new HashSet<>(); for (RoleResource role : roles) details.addAll(listPermissionsForRole(permissions, resource, role)); return details; } private Set<PermissionDetails> listPermissionsForRole(Set<Permission> permissions, IResource resource, RoleResource role) throws RequestExecutionException { Set<PermissionDetails> details = new HashSet<>(); // If it exists, try the legacy user permissions table first. This is to handle the case // where the cluster is being upgraded and so is running with mixed versions of the perms table boolean useLegacyTable = Schema.instance.getCFMetaData(SchemaConstants.AUTH_KEYSPACE_NAME, USER_PERMISSIONS) != null; String entityColumnName = useLegacyTable ? USERNAME : ROLE; for (UntypedResultSet.Row row : process(buildListQuery(resource, role, useLegacyTable))) { if (row.has(PERMISSIONS)) { for (String p : row.getSet(PERMISSIONS, UTF8Type.instance)) { Permission permission = Permission.valueOf(p); if (permissions.contains(permission)) details.add(new PermissionDetails(row.getString(entityColumnName), Resources.fromName(row.getString(RESOURCE)), permission)); } } } return details; } private String buildListQuery(IResource resource, RoleResource grantee, boolean useLegacyTable) { String tableName = useLegacyTable ? USER_PERMISSIONS : AuthKeyspace.ROLE_PERMISSIONS; String entityName = useLegacyTable ? USERNAME : ROLE; List<String> vars = Lists.newArrayList(SchemaConstants.AUTH_KEYSPACE_NAME, tableName); List<String> conditions = new ArrayList<>(); if (resource != null) { conditions.add("resource = '%s'"); vars.add(escape(resource.getName())); } if (grantee != null) { conditions.add(entityName + " = '%s'"); vars.add(escape(grantee.getRoleName())); } String query = "SELECT " + entityName + ", resource, permissions FROM %s.%s"; if (!conditions.isEmpty()) query += " WHERE " + StringUtils.join(conditions, " AND "); if (resource != null && grantee == null) query += " ALLOW FILTERING"; return String.format(query, vars.toArray()); } public Set<DataResource> protectedResources() { return ImmutableSet.of(DataResource.table(SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLE_PERMISSIONS)); } public void validateConfiguration() throws ConfigurationException { } public void setup() { authorizeRoleStatement = prepare(ROLE, AuthKeyspace.ROLE_PERMISSIONS); // If old user permissions table exists, migrate the legacy authz data to the new table // The delay is to give the node a chance to see its peers before attempting the conversion if (Schema.instance.getCFMetaData(SchemaConstants.AUTH_KEYSPACE_NAME, "permissions") != null) { legacyAuthorizeRoleStatement = prepare(USERNAME, USER_PERMISSIONS); ScheduledExecutors.optionalTasks.schedule(new Runnable() { public void run() { convertLegacyData(); } }, AuthKeyspace.SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS); } } private SelectStatement prepare(String entityname, String permissionsTable) { String query = String.format("SELECT permissions FROM %s.%s WHERE %s = ? AND resource = ?", SchemaConstants.AUTH_KEYSPACE_NAME, permissionsTable, entityname); return (SelectStatement) QueryProcessor.getStatement(query, ClientState.forInternalCalls()).statement; }
/** * Copy legacy authz data from the system_auth.permissions table to the new system_auth.role_permissions table and * also insert entries into the reverse lookup table. * In theory, we could simply rename the existing table as the schema is structurally the same, but this would * break mixed clusters during a rolling upgrade. * This setup is not performed if AllowAllAuthenticator is configured (see Auth#setup). */
private void convertLegacyData() { try { if (Schema.instance.getCFMetaData("system_auth", "permissions") != null) { logger.info("Converting legacy permissions data"); CQLStatement insertStatement = QueryProcessor.getStatement(String.format("INSERT INTO %s.%s (role, resource, permissions) " + "VALUES (?, ?, ?)", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLE_PERMISSIONS), ClientState.forInternalCalls()).statement; CQLStatement indexStatement = QueryProcessor.getStatement(String.format("INSERT INTO %s.%s (resource, role) VALUES (?,?)", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.RESOURCE_ROLE_INDEX), ClientState.forInternalCalls()).statement; UntypedResultSet permissions = process("SELECT * FROM system_auth.permissions"); for (UntypedResultSet.Row row : permissions) { final IResource resource = Resources.fromName(row.getString("resource")); Predicate<String> isApplicable = new Predicate<String>() { public boolean apply(String s) { return resource.applicablePermissions().contains(Permission.valueOf(s)); } }; SetSerializer<String> serializer = SetSerializer.getInstance(UTF8Serializer.instance, UTF8Type.instance); Set<String> originalPerms = serializer.deserialize(row.getBytes("permissions")); Set<String> filteredPerms = ImmutableSet.copyOf(Iterables.filter(originalPerms, isApplicable)); insertStatement.execute(QueryState.forInternalCalls(), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(row.getBytes("username"), row.getBytes("resource"), serializer.serialize(filteredPerms))), System.nanoTime()); indexStatement.execute(QueryState.forInternalCalls(), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(row.getBytes("resource"), row.getBytes("username"))), System.nanoTime()); } logger.info("Completed conversion of legacy permissions"); } } catch (Exception e) { logger.info("Unable to complete conversion of legacy permissions data (perhaps not enough nodes are upgraded yet). " + "Conversion should not be considered complete"); logger.trace("Conversion error", e); } } // We only worry about one character ('). Make sure it's properly escaped. private String escape(String name) { return StringUtils.replace(name, "'", "''"); } private UntypedResultSet process(String query) throws RequestExecutionException { return QueryProcessor.process(query, ConsistencyLevel.LOCAL_ONE); } }