/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.auth;

import java.util.*;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.cql3.statements.ModificationStatement;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.serializers.SetSerializer;
import org.apache.cassandra.serializers.UTF8Serializer;
import org.apache.cassandra.service.ClientState;

import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;

CassandraAuthorizer is an IAuthorizer implementation that keeps user permissions internally in C* using the system_auth.role_permissions table.
/** * CassandraAuthorizer is an IAuthorizer implementation that keeps * user permissions internally in C* using the system_auth.role_permissions * table. */
public class CassandraAuthorizer implements IAuthorizer { private static final Logger logger = LoggerFactory.getLogger(CassandraAuthorizer.class); private static final String ROLE = "role"; private static final String RESOURCE = "resource"; private static final String PERMISSIONS = "permissions"; // used during upgrades to perform authz on mixed clusters public static final String USERNAME = "username"; public static final String USER_PERMISSIONS = "permissions"; private SelectStatement authorizeRoleStatement; private SelectStatement legacyAuthorizeRoleStatement; public CassandraAuthorizer() { } // Returns every permission on the resource granted to the user either directly // or indirectly via roles granted to the user. public Set<Permission> authorize(AuthenticatedUser user, IResource resource) { try { if (user.isSuper()) return resource.applicablePermissions(); Set<Permission> permissions = EnumSet.noneOf(Permission.class); for (RoleResource role: user.getRoles()) addPermissionsForRole(permissions, resource, role); return permissions; } catch (RequestExecutionException | RequestValidationException e) { logger.debug("Failed to authorize {} for {}", user, resource); throw new UnauthorizedException("Unable to perform authorization of permissions: " + e.getMessage(), e); } } public void grant(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource grantee) throws RequestValidationException, RequestExecutionException { modifyRolePermissions(permissions, resource, grantee, "+"); addLookupEntry(resource, grantee); } public void revoke(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource revokee) throws RequestValidationException, RequestExecutionException { modifyRolePermissions(permissions, resource, revokee, "-"); removeLookupEntry(resource, revokee); } // Called when deleting a role with DROP ROLE query. // Internal hook, so no permission checks are needed here. // Executes a logged batch removing the granted premissions // for the role as well as the entries from the reverse index // table public void revokeAllFrom(RoleResource revokee) { try { UntypedResultSet rows = process(String.format("SELECT resource FROM %s.%s WHERE role = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLE_PERMISSIONS, escape(revokee.getRoleName()))); List<CQLStatement> statements = new ArrayList<>(); for (UntypedResultSet.Row row : rows) { statements.add( QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE resource = '%s' AND role = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.RESOURCE_ROLE_INDEX, escape(row.getString("resource")), escape(revokee.getRoleName())), ClientState.forInternalCalls()).statement); } statements.add(QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE role = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLE_PERMISSIONS, escape(revokee.getRoleName())), ClientState.forInternalCalls()).statement); executeLoggedBatch(statements); } catch (RequestExecutionException | RequestValidationException e) { logger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", revokee.getRoleName(), e); } } // Called after a resource is removed (DROP KEYSPACE, DROP TABLE, etc.). // Execute a logged batch removing all the permissions for the resource // as well as the index table entry public void revokeAllOn(IResource droppedResource) { try { UntypedResultSet rows = process(String.format("SELECT role FROM %s.%s WHERE resource = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.RESOURCE_ROLE_INDEX, escape(droppedResource.getName()))); List<CQLStatement> statements = new ArrayList<>(); for (UntypedResultSet.Row row : rows) { statements.add(QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE role = '%s' AND resource = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLE_PERMISSIONS, escape(row.getString("role")), escape(droppedResource.getName())), ClientState.forInternalCalls()).statement); } statements.add(QueryProcessor.getStatement(String.format("DELETE FROM %s.%s WHERE resource = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.RESOURCE_ROLE_INDEX, escape(droppedResource.getName())), ClientState.forInternalCalls()).statement); executeLoggedBatch(statements); } catch (RequestExecutionException | RequestValidationException e) { logger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", droppedResource, e); return; } } private void executeLoggedBatch(List<CQLStatement> statements) throws RequestExecutionException, RequestValidationException { BatchStatement batch = new BatchStatement(0, BatchStatement.Type.LOGGED, Lists.newArrayList(Iterables.filter(statements, ModificationStatement.class)), Attributes.none()); QueryProcessor.instance.processBatch(batch, QueryState.forInternalCalls(), BatchQueryOptions.withoutPerStatementVariables(QueryOptions.DEFAULT), System.nanoTime()); } // Add every permission on the resource granted to the role private void addPermissionsForRole(Set<Permission> permissions, IResource resource, RoleResource role) throws RequestExecutionException, RequestValidationException { QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.LOCAL_ONE, Lists.newArrayList(ByteBufferUtil.bytes(role.getRoleName()), ByteBufferUtil.bytes(resource.getName()))); SelectStatement statement; // If it exists, read from the legacy user permissions table to handle the case where the cluster // is being upgraded and so is running with mixed versions of the authz schema if (Schema.instance.getCFMetaData(SchemaConstants.AUTH_KEYSPACE_NAME, USER_PERMISSIONS) == null) statement = authorizeRoleStatement; else { // If the permissions table was initialised only after the statement got prepared, re-prepare (CASSANDRA-12813) if (legacyAuthorizeRoleStatement == null) legacyAuthorizeRoleStatement = prepare(USERNAME, USER_PERMISSIONS); statement = legacyAuthorizeRoleStatement; } ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options, System.nanoTime()); UntypedResultSet result = UntypedResultSet.create(rows.result); if (!result.isEmpty() && result.one().has(PERMISSIONS)) { for (String perm : result.one().getSet(PERMISSIONS, UTF8Type.instance)) { permissions.add(Permission.valueOf(perm)); } } } // Adds or removes permissions from a role_permissions table (adds if op is "+", removes if op is "-") private void modifyRolePermissions(Set<Permission> permissions, IResource resource, RoleResource role, String op) throws RequestExecutionException { process(String.format("UPDATE %s.%s SET permissions = permissions %s {%s} WHERE role = '%s' AND resource = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLE_PERMISSIONS, op, "'" + StringUtils.join(permissions, "','") + "'", escape(role.getRoleName()), escape(resource.getName()))); } // Removes an entry from the inverted index table (from resource -> role with defined permissions) private void removeLookupEntry(IResource resource, RoleResource role) throws RequestExecutionException { process(String.format("DELETE FROM %s.%s WHERE resource = '%s' and role = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.RESOURCE_ROLE_INDEX, escape(resource.getName()), escape(role.getRoleName()))); } // Adds an entry to the inverted index table (from resource -> role with defined permissions) private void addLookupEntry(IResource resource, RoleResource role) throws RequestExecutionException { process(String.format("INSERT INTO %s.%s (resource, role) VALUES ('%s','%s')", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.RESOURCE_ROLE_INDEX, escape(resource.getName()), escape(role.getRoleName()))); } // 'of' can be null - in that case everyone's permissions have been requested. Otherwise only single user's. // If the user requesting 'LIST PERMISSIONS' is not a superuser OR their username doesn't match 'of', we // throw UnauthorizedException. So only a superuser can view everybody's permissions. Regular users are only // allowed to see their own permissions. public Set<PermissionDetails> list(AuthenticatedUser performer, Set<Permission> permissions, IResource resource, RoleResource grantee) throws RequestValidationException, RequestExecutionException { if (!(performer.isSuper() || performer.isSystem()) && !performer.getRoles().contains(grantee)) throw new UnauthorizedException(String.format("You are not authorized to view %s's permissions", grantee == null ? "everyone" : grantee.getRoleName())); if (null == grantee) return listPermissionsForRole(permissions, resource, grantee); Set<RoleResource> roles = DatabaseDescriptor.getRoleManager().getRoles(grantee, true); Set<PermissionDetails> details = new HashSet<>(); for (RoleResource role : roles) details.addAll(listPermissionsForRole(permissions, resource, role)); return details; } private Set<PermissionDetails> listPermissionsForRole(Set<Permission> permissions, IResource resource, RoleResource role) throws RequestExecutionException { Set<PermissionDetails> details = new HashSet<>(); // If it exists, try the legacy user permissions table first. This is to handle the case // where the cluster is being upgraded and so is running with mixed versions of the perms table boolean useLegacyTable = Schema.instance.getCFMetaData(SchemaConstants.AUTH_KEYSPACE_NAME, USER_PERMISSIONS) != null; String entityColumnName = useLegacyTable ? USERNAME : ROLE; for (UntypedResultSet.Row row : process(buildListQuery(resource, role, useLegacyTable))) { if (row.has(PERMISSIONS)) { for (String p : row.getSet(PERMISSIONS, UTF8Type.instance)) { Permission permission = Permission.valueOf(p); if (permissions.contains(permission)) details.add(new PermissionDetails(row.getString(entityColumnName), Resources.fromName(row.getString(RESOURCE)), permission)); } } } return details; } private String buildListQuery(IResource resource, RoleResource grantee, boolean useLegacyTable) { String tableName = useLegacyTable ? USER_PERMISSIONS : AuthKeyspace.ROLE_PERMISSIONS; String entityName = useLegacyTable ? USERNAME : ROLE; List<String> vars = Lists.newArrayList(SchemaConstants.AUTH_KEYSPACE_NAME, tableName); List<String> conditions = new ArrayList<>(); if (resource != null) { conditions.add("resource = '%s'"); vars.add(escape(resource.getName())); } if (grantee != null) { conditions.add(entityName + " = '%s'"); vars.add(escape(grantee.getRoleName())); } String query = "SELECT " + entityName + ", resource, permissions FROM %s.%s"; if (!conditions.isEmpty()) query += " WHERE " + StringUtils.join(conditions, " AND "); if (resource != null && grantee == null) query += " ALLOW FILTERING"; return String.format(query, vars.toArray()); } public Set<DataResource> protectedResources() { return ImmutableSet.of(DataResource.table(SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLE_PERMISSIONS)); } public void validateConfiguration() throws ConfigurationException { } public void setup() { authorizeRoleStatement = prepare(ROLE, AuthKeyspace.ROLE_PERMISSIONS); // If old user permissions table exists, migrate the legacy authz data to the new table // The delay is to give the node a chance to see its peers before attempting the conversion if (Schema.instance.getCFMetaData(SchemaConstants.AUTH_KEYSPACE_NAME, "permissions") != null) { legacyAuthorizeRoleStatement = prepare(USERNAME, USER_PERMISSIONS); ScheduledExecutors.optionalTasks.schedule(new Runnable() { public void run() { convertLegacyData(); } }, AuthKeyspace.SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS); } } private SelectStatement prepare(String entityname, String permissionsTable) { String query = String.format("SELECT permissions FROM %s.%s WHERE %s = ? AND resource = ?", SchemaConstants.AUTH_KEYSPACE_NAME, permissionsTable, entityname); return (SelectStatement) QueryProcessor.getStatement(query, ClientState.forInternalCalls()).statement; }
Copy legacy authz data from the system_auth.permissions table to the new system_auth.role_permissions table and also insert entries into the reverse lookup table. In theory, we could simply rename the existing table as the schema is structurally the same, but this would break mixed clusters during a rolling upgrade. This setup is not performed if AllowAllAuthenticator is configured (see Auth#setup).
/** * Copy legacy authz data from the system_auth.permissions table to the new system_auth.role_permissions table and * also insert entries into the reverse lookup table. * In theory, we could simply rename the existing table as the schema is structurally the same, but this would * break mixed clusters during a rolling upgrade. * This setup is not performed if AllowAllAuthenticator is configured (see Auth#setup). */
private void convertLegacyData() { try { if (Schema.instance.getCFMetaData("system_auth", "permissions") != null) { logger.info("Converting legacy permissions data"); CQLStatement insertStatement = QueryProcessor.getStatement(String.format("INSERT INTO %s.%s (role, resource, permissions) " + "VALUES (?, ?, ?)", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLE_PERMISSIONS), ClientState.forInternalCalls()).statement; CQLStatement indexStatement = QueryProcessor.getStatement(String.format("INSERT INTO %s.%s (resource, role) VALUES (?,?)", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.RESOURCE_ROLE_INDEX), ClientState.forInternalCalls()).statement; UntypedResultSet permissions = process("SELECT * FROM system_auth.permissions"); for (UntypedResultSet.Row row : permissions) { final IResource resource = Resources.fromName(row.getString("resource")); Predicate<String> isApplicable = new Predicate<String>() { public boolean apply(String s) { return resource.applicablePermissions().contains(Permission.valueOf(s)); } }; SetSerializer<String> serializer = SetSerializer.getInstance(UTF8Serializer.instance, UTF8Type.instance); Set<String> originalPerms = serializer.deserialize(row.getBytes("permissions")); Set<String> filteredPerms = ImmutableSet.copyOf(Iterables.filter(originalPerms, isApplicable)); insertStatement.execute(QueryState.forInternalCalls(), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(row.getBytes("username"), row.getBytes("resource"), serializer.serialize(filteredPerms))), System.nanoTime()); indexStatement.execute(QueryState.forInternalCalls(), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(row.getBytes("resource"), row.getBytes("username"))), System.nanoTime()); } logger.info("Completed conversion of legacy permissions"); } } catch (Exception e) { logger.info("Unable to complete conversion of legacy permissions data (perhaps not enough nodes are upgraded yet). " + "Conversion should not be considered complete"); logger.trace("Conversion error", e); } } // We only worry about one character ('). Make sure it's properly escaped. private String escape(String name) { return StringUtils.replace(name, "'", "''"); } private UntypedResultSet process(String query) throws RequestExecutionException { return QueryProcessor.process(query, ConsistencyLevel.LOCAL_ONE); } }