package org.apache.cassandra.auth;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import com.google.common.base.*;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.mindrot.jbcrypt.BCrypt;
public class CassandraRoleManager implements IRoleManager
{
private static final Logger logger = LoggerFactory.getLogger(CassandraRoleManager.class);
static final String DEFAULT_SUPERUSER_NAME = "cassandra";
static final String DEFAULT_SUPERUSER_PASSWORD = "cassandra";
private static final Function<UntypedResultSet.Row, Role> ROW_TO_ROLE = new Function<UntypedResultSet.Row, Role>()
{
public Role apply(UntypedResultSet.Row row)
{
try
{
return new Role(row.getString("role"),
row.getBoolean("is_superuser"),
row.getBoolean("can_login"),
row.has("member_of") ? row.getSet("member_of", UTF8Type.instance)
: Collections.<String>emptySet());
}
catch (NullPointerException e)
{
logger.warn("An invalid value has been detected in the {} table for role {}. If you are " +
"unable to login, you may need to disable authentication and confirm " +
"that values in that table are accurate", AuthKeyspace.ROLES, row.getString("role"));
throw new RuntimeException(String.format("Invalid metadata has been detected for role %s", row.getString("role")), e);
}
}
};
public static final String LEGACY_USERS_TABLE = "users";
private static final Function<UntypedResultSet.Row, Role> LEGACY_ROW_TO_ROLE = new Function<UntypedResultSet.Row, Role>()
{
public Role apply(UntypedResultSet.Row row)
{
return new Role(row.getString("name"),
row.getBoolean("super"),
true,
Collections.<String>emptySet());
}
};
private static final String GENSALT_LOG2_ROUNDS_PROPERTY = Config.PROPERTY_PREFIX + "auth_bcrypt_gensalt_log2_rounds";
private static final int GENSALT_LOG2_ROUNDS = getGensaltLogRounds();
static int getGensaltLogRounds()
{
int rounds = Integer.getInteger(GENSALT_LOG2_ROUNDS_PROPERTY, 10);
if (rounds < 4 || rounds > 31)
throw new ConfigurationException(String.format("Bad value for system property -D%s." +
"Please use a value between 4 and 31 inclusively",
GENSALT_LOG2_ROUNDS_PROPERTY));
return rounds;
}
private static final Role NULL_ROLE = new Role(null, false, false, Collections.<String>emptySet());
private SelectStatement loadRoleStatement;
private SelectStatement legacySelectUserStatement;
private final Set<Option> supportedOptions;
private final Set<Option> alterableOptions;
private volatile boolean isClusterReady = false;
public CassandraRoleManager()
{
supportedOptions = DatabaseDescriptor.getAuthenticator().getClass() == PasswordAuthenticator.class
? ImmutableSet.of(Option.LOGIN, Option.SUPERUSER, Option.PASSWORD)
: ImmutableSet.of(Option.LOGIN, Option.SUPERUSER);
alterableOptions = DatabaseDescriptor.getAuthenticator().getClass().equals(PasswordAuthenticator.class)
? ImmutableSet.of(Option.PASSWORD)
: ImmutableSet.<Option>of();
}
public void setup()
{
loadRoleStatement = (SelectStatement) prepare("SELECT * from %s.%s WHERE role = ?",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLES);
if (Schema.instance.getCFMetaData(SchemaConstants.AUTH_KEYSPACE_NAME, "users") != null)
{
legacySelectUserStatement = prepareLegacySelectUserStatement();
scheduleSetupTask(() -> {
convertLegacyData();
return null;
});
}
else
{
scheduleSetupTask(() -> {
setupDefaultRole();
return null;
});
}
}
public Set<Option> supportedOptions()
{
return supportedOptions;
}
public Set<Option> alterableOptions()
{
return alterableOptions;
}
public void createRole(AuthenticatedUser performer, RoleResource role, RoleOptions options)
throws RequestValidationException, RequestExecutionException
{
String insertCql = options.getPassword().isPresent()
? String.format("INSERT INTO %s.%s (role, is_superuser, can_login, salted_hash) VALUES ('%s', %s, %s, '%s')",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLES,
escape(role.getRoleName()),
options.getSuperuser().or(false),
options.getLogin().or(false),
escape(hashpw(options.getPassword().get())))
: String.format("INSERT INTO %s.%s (role, is_superuser, can_login) VALUES ('%s', %s, %s)",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLES,
escape(role.getRoleName()),
options.getSuperuser().or(false),
options.getLogin().or(false));
process(insertCql, consistencyForRole(role.getRoleName()));
}
public void dropRole(AuthenticatedUser performer, RoleResource role) throws RequestValidationException, RequestExecutionException
{
process(String.format("DELETE FROM %s.%s WHERE role = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLES,
escape(role.getRoleName())),
consistencyForRole(role.getRoleName()));
removeAllMembers(role.getRoleName());
}
public void alterRole(AuthenticatedUser performer, RoleResource role, RoleOptions options)
{
String assignments = Joiner.on(',').join(Iterables.filter(optionsToAssignments(options.getOptions()),
Predicates.notNull()));
if (!Strings.isNullOrEmpty(assignments))
{
process(String.format("UPDATE %s.%s SET %s WHERE role = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLES,
assignments,
escape(role.getRoleName())),
consistencyForRole(role.getRoleName()));
}
}
public void grantRole(AuthenticatedUser performer, RoleResource role, RoleResource grantee)
throws RequestValidationException, RequestExecutionException
{
if (getRoles(grantee, true).contains(role))
throw new InvalidRequestException(String.format("%s is a member of %s",
grantee.getRoleName(),
role.getRoleName()));
if (getRoles(role, true).contains(grantee))
throw new InvalidRequestException(String.format("%s is a member of %s",
role.getRoleName(),
grantee.getRoleName()));
modifyRoleMembership(grantee.getRoleName(), role.getRoleName(), "+");
process(String.format("INSERT INTO %s.%s (role, member) values ('%s', '%s')",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLE_MEMBERS,
escape(role.getRoleName()),
escape(grantee.getRoleName())),
consistencyForRole(role.getRoleName()));
}
public void revokeRole(AuthenticatedUser performer, RoleResource role, RoleResource revokee)
throws RequestValidationException, RequestExecutionException
{
if (!getRoles(revokee, false).contains(role))
throw new InvalidRequestException(String.format("%s is not a member of %s",
revokee.getRoleName(),
role.getRoleName()));
modifyRoleMembership(revokee.getRoleName(), role.getRoleName(), "-");
process(String.format("DELETE FROM %s.%s WHERE role = '%s' and member = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLE_MEMBERS,
escape(role.getRoleName()),
escape(revokee.getRoleName())),
consistencyForRole(role.getRoleName()));
}
public Set<RoleResource> getRoles(RoleResource grantee, boolean includeInherited) throws RequestValidationException, RequestExecutionException
{
Set<RoleResource> roles = new HashSet<>();
Role role = getRole(grantee.getRoleName());
if (!role.equals(NULL_ROLE))
{
roles.add(RoleResource.role(role.name));
collectRoles(role, roles, includeInherited);
}
return roles;
}
public Set<RoleResource> getAllRoles() throws RequestValidationException, RequestExecutionException
{
UntypedResultSet rows = process(String.format("SELECT role from %s.%s", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLES), ConsistencyLevel.QUORUM);
Iterable<RoleResource> roles = Iterables.transform(rows, new Function<UntypedResultSet.Row, RoleResource>()
{
public RoleResource apply(UntypedResultSet.Row row)
{
return RoleResource.role(row.getString("role"));
}
});
return ImmutableSet.<RoleResource>builder().addAll(roles).build();
}
public boolean isSuper(RoleResource role)
{
try
{
return getRole(role.getRoleName()).isSuper;
}
catch (RequestExecutionException e)
{
logger.debug("Failed to authorize {} for super-user permission", role.getRoleName());
throw new UnauthorizedException("Unable to perform authorization of super-user permission: " + e.getMessage(), e);
}
}
public boolean canLogin(RoleResource role)
{
try
{
return getRole(role.getRoleName()).canLogin;
}
catch (RequestExecutionException e)
{
logger.debug("Failed to authorize {} for login permission", role.getRoleName());
throw new UnauthorizedException("Unable to perform authorization of login permission: " + e.getMessage(), e);
}
}
public Map<String, String> getCustomOptions(RoleResource role)
{
return Collections.emptyMap();
}
public boolean isExistingRole(RoleResource role)
{
return getRole(role.getRoleName()) != NULL_ROLE;
}
public Set<? extends IResource> protectedResources()
{
return ImmutableSet.of(DataResource.table(SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLES),
DataResource.table(SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLE_MEMBERS));
}
public void validateConfiguration() throws ConfigurationException
{
}
private static void setupDefaultRole()
{
if (StorageService.instance.getTokenMetadata().sortedTokens().isEmpty())
throw new IllegalStateException("CassandraRoleManager skipped default role setup: no known tokens in ring");
try
{
if (!hasExistingRoles())
{
QueryProcessor.process(String.format("INSERT INTO %s.%s (role, is_superuser, can_login, salted_hash) " +
"VALUES ('%s', true, true, '%s')",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLES,
DEFAULT_SUPERUSER_NAME,
escape(hashpw(DEFAULT_SUPERUSER_PASSWORD))),
consistencyForRole(DEFAULT_SUPERUSER_NAME));
logger.info("Created default superuser role '{}'", DEFAULT_SUPERUSER_NAME);
}
}
catch (RequestExecutionException e)
{
logger.warn("CassandraRoleManager skipped default role setup: some nodes were not ready");
throw e;
}
}
private static boolean hasExistingRoles() throws RequestExecutionException
{
String defaultSUQuery = String.format("SELECT * FROM %s.%s WHERE role = '%s'", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLES, DEFAULT_SUPERUSER_NAME);
String allUsersQuery = String.format("SELECT * FROM %s.%s LIMIT 1", SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.ROLES);
return !QueryProcessor.process(defaultSUQuery, ConsistencyLevel.ONE).isEmpty()
|| !QueryProcessor.process(defaultSUQuery, ConsistencyLevel.QUORUM).isEmpty()
|| !QueryProcessor.process(allUsersQuery, ConsistencyLevel.QUORUM).isEmpty();
}
private void scheduleSetupTask(final Callable<Void> setupTask)
{
ScheduledExecutors.optionalTasks.schedule(new Runnable()
{
public void run()
{
if (!MessagingService.instance().areAllNodesAtLeast22())
{
logger.trace("Not all nodes are upgraded to a version that supports Roles yet, rescheduling setup task");
scheduleSetupTask(setupTask);
return;
}
isClusterReady = true;
try
{
setupTask.call();
}
catch (Exception e)
{
logger.info("Setup task failed with error, rescheduling");
scheduleSetupTask(setupTask);
}
}
}, AuthKeyspace.SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS);
}
private void convertLegacyData() throws Exception
{
try
{
if (Schema.instance.getCFMetaData("system_auth", "users") != null)
{
logger.info("Converting legacy users");
UntypedResultSet users = QueryProcessor.process("SELECT * FROM system_auth.users",
ConsistencyLevel.QUORUM);
for (UntypedResultSet.Row row : users)
{
RoleOptions options = new RoleOptions();
options.setOption(Option.SUPERUSER, row.getBoolean("super"));
options.setOption(Option.LOGIN, true);
createRole(null, RoleResource.role(row.getString("name")), options);
}
logger.info("Completed conversion of legacy users");
}
if (Schema.instance.getCFMetaData("system_auth", "credentials") != null)
{
logger.info("Migrating legacy credentials data to new system table");
UntypedResultSet credentials = QueryProcessor.process("SELECT * FROM system_auth.credentials",
ConsistencyLevel.QUORUM);
for (UntypedResultSet.Row row : credentials)
{
QueryProcessor.process(String.format("UPDATE %s.%s SET salted_hash = '%s' WHERE role = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLES,
row.getString("salted_hash"),
row.getString("username")),
consistencyForRole(row.getString("username")));
}
logger.info("Completed conversion of legacy credentials");
}
}
catch (Exception e)
{
logger.info("Unable to complete conversion of legacy auth data (perhaps not enough nodes are upgraded yet). " +
"Conversion should not be considered complete");
logger.trace("Conversion error", e);
throw e;
}
}
private SelectStatement prepareLegacySelectUserStatement()
{
return (SelectStatement) prepare("SELECT * FROM %s.%s WHERE name = ?",
SchemaConstants.AUTH_KEYSPACE_NAME,
LEGACY_USERS_TABLE);
}
private CQLStatement prepare(String template, String keyspace, String table)
{
try
{
return QueryProcessor.parseStatement(String.format(template, keyspace, table)).prepare(ClientState.forInternalCalls()).statement;
}
catch (RequestValidationException e)
{
throw new AssertionError(e);
}
}
private void collectRoles(Role role, Set<RoleResource> collected, boolean includeInherited) throws RequestValidationException, RequestExecutionException
{
for (String memberOf : role.memberOf)
{
Role granted = getRole(memberOf);
if (granted.equals(NULL_ROLE))
continue;
collected.add(RoleResource.role(granted.name));
if (includeInherited)
collectRoles(granted, collected, true);
}
}
private Role getRole(String name)
{
if (Schema.instance.getCFMetaData(SchemaConstants.AUTH_KEYSPACE_NAME, "users") == null)
return getRoleFromTable(name, loadRoleStatement, ROW_TO_ROLE);
else
{
if (legacySelectUserStatement == null)
legacySelectUserStatement = prepareLegacySelectUserStatement();
return getRoleFromTable(name, legacySelectUserStatement, LEGACY_ROW_TO_ROLE);
}
}
private Role getRoleFromTable(String name, SelectStatement statement, Function<UntypedResultSet.Row, Role> function)
throws RequestExecutionException, RequestValidationException
{
ResultMessage.Rows rows =
statement.execute(QueryState.forInternalCalls(),
QueryOptions.forInternalCalls(consistencyForRole(name),
Collections.singletonList(ByteBufferUtil.bytes(name))),
System.nanoTime());
if (rows.result.isEmpty())
return NULL_ROLE;
return function.apply(UntypedResultSet.create(rows.result).one());
}
private void modifyRoleMembership(String grantee, String role, String op)
throws RequestExecutionException
{
process(String.format("UPDATE %s.%s SET member_of = member_of %s {'%s'} WHERE role = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLES,
op,
escape(role),
escape(grantee)),
consistencyForRole(grantee));
}
private void removeAllMembers(String role) throws RequestValidationException, RequestExecutionException
{
UntypedResultSet rows = process(String.format("SELECT member FROM %s.%s WHERE role = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLE_MEMBERS,
escape(role)),
consistencyForRole(role));
if (rows.isEmpty())
return;
for (UntypedResultSet.Row row : rows)
modifyRoleMembership(row.getString("member"), role, "-");
process(String.format("DELETE FROM %s.%s WHERE role = '%s'",
SchemaConstants.AUTH_KEYSPACE_NAME,
AuthKeyspace.ROLE_MEMBERS,
escape(role)),
consistencyForRole(role));
}
private Iterable<String> optionsToAssignments(Map<Option, Object> options)
{
return Iterables.transform(
options.entrySet(),
new Function<Map.Entry<Option, Object>, String>()
{
public String apply(Map.Entry<Option, Object> entry)
{
switch (entry.getKey())
{
case LOGIN:
return String.format("can_login = %s", entry.getValue());
case SUPERUSER:
return String.format("is_superuser = %s", entry.getValue());
case PASSWORD:
return String.format("salted_hash = '%s'", escape(hashpw((String) entry.getValue())));
default:
return null;
}
}
});
}
protected static ConsistencyLevel consistencyForRole(String role)
{
if (role.equals(DEFAULT_SUPERUSER_NAME))
return ConsistencyLevel.QUORUM;
else
return ConsistencyLevel.LOCAL_ONE;
}
private static String hashpw(String password)
{
return BCrypt.hashpw(password, BCrypt.gensalt(GENSALT_LOG2_ROUNDS));
}
private static String escape(String name)
{
return StringUtils.replace(name, "'", "''");
}
private UntypedResultSet process(String query, ConsistencyLevel consistencyLevel) throws RequestValidationException, RequestExecutionException
{
if (!isClusterReady)
throw new InvalidRequestException("Cannot process role related query as the role manager isn't yet setup. "
+ "This is likely because some of nodes in the cluster are on version 2.1 or earlier. "
+ "You need to upgrade all nodes to Cassandra 2.2 or more to use roles.");
return QueryProcessor.process(query, consistencyLevel);
}
private static final class Role
{
private String name;
private final boolean isSuper;
private final boolean canLogin;
private Set<String> memberOf;
private Role(String name, boolean isSuper, boolean canLogin, Set<String> memberOf)
{
this.name = name;
this.isSuper = isSuper;
this.canLogin = canLogin;
this.memberOf = memberOf;
}
public boolean equals(Object o)
{
if (this == o)
return true;
if (!(o instanceof Role))
return false;
Role r = (Role) o;
return Objects.equal(name, r.name);
}
public int hashCode()
{
return Objects.hashCode(name);
}
}
}