diff --git a/CMakeLists.txt b/CMakeLists.txt index 6c56fc01a566..444cfdc346de 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -261,6 +261,7 @@ add_custom_target(compiler-training) add_subdirectory(api) add_subdirectory(alternator) +add_subdirectory(audit) add_subdirectory(db) add_subdirectory(auth) add_subdirectory(cdc) @@ -300,6 +301,7 @@ add_version_library(scylla_version add_executable(scylla main.cc) set(scylla_libs + audit scylla-main api auth diff --git a/audit/CMakeLists.txt b/audit/CMakeLists.txt new file mode 100644 index 000000000000..2d606754e6c6 --- /dev/null +++ b/audit/CMakeLists.txt @@ -0,0 +1,19 @@ +include(add_whole_archive) + +add_library(scylla_audit STATIC) +target_sources(scylla_audit + PRIVATE + audit.cc + audit_cf_storage_helper.cc + audit_syslog_storage_helper.cc) +target_include_directories(scylla_audit + PUBLIC + ${CMAKE_SOURCE_DIR}) +target_link_libraries(scylla_audit + PUBLIC + Seastar::seastar + xxHash::xxhash + PRIVATE + cql3) + +add_whole_archive(audit scylla_audit) diff --git a/audit/audit.cc b/audit/audit.cc new file mode 100644 index 000000000000..7fa206e46e67 --- /dev/null +++ b/audit/audit.cc @@ -0,0 +1,263 @@ +/* + * Copyright (C) 2017 ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include +#include "audit/audit.hh" +#include "db/config.hh" +#include "cql3/cql_statement.hh" +#include "cql3/statements/batch_statement.hh" +#include "cql3/statements/modification_statement.hh" +#include "storage_helper.hh" +#include "audit.hh" +#include "../db/config.hh" +#include "utils/class_registrator.hh" + +#include +#include +#include + + +namespace audit { + +logging::logger logger("audit"); + +sstring audit_info::category_string() const { + switch (_category) { + case statement_category::QUERY: return "QUERY"; + case statement_category::DML: return "DML"; + case statement_category::DDL: return "DDL"; + case statement_category::DCL: return "DCL"; + case statement_category::AUTH: return "AUTH"; + case statement_category::ADMIN: return "ADMIN"; + } + return ""; +} + +audit::audit(locator::shared_token_metadata& token_metadata, + sstring&& storage_helper_name, + std::set&& audited_keyspaces, + std::map>&& audited_tables, + category_set&& audited_categories) + : _token_metadata(token_metadata) + , _audited_keyspaces(std::move(audited_keyspaces)) + , _audited_tables(std::move(audited_tables)) + , _audited_categories(std::move(audited_categories)) + , _storage_helper_class_name(std::move(storage_helper_name)) +{ } + +audit::~audit() = default; + +static category_set parse_audit_categories(const sstring& data) { + category_set result; + if (!data.empty()) { + std::vector tokens; + boost::split(tokens, data, boost::is_any_of(",")); + for (sstring& category : tokens) { + boost::trim(category); + if (category == "QUERY") { + result.set(statement_category::QUERY); + } else if (category == "DML") { + result.set(statement_category::DML); + } else if (category == "DDL") { + result.set(statement_category::DDL); + } else if (category == "DCL") { + result.set(statement_category::DCL); + } else if (category == "AUTH") { + result.set(statement_category::AUTH); + } else if (category == "ADMIN") { + result.set(statement_category::ADMIN); + } else { + throw audit_exception(fmt::format("Bad configuration: invalid 'audit_categories': {}", data)); + } + } + } + return result; +} + +static std::map> parse_audit_tables(const sstring& data) { + std::map> result; + if (!data.empty()) { + std::vector tokens; + boost::split(tokens, data, boost::is_any_of(",")); + for (sstring& token : tokens) { + std::vector parts; + boost::split(parts, token, boost::is_any_of(".")); + if (parts.size() != 2) { + throw audit_exception(fmt::format("Bad configuration: invalid 'audit_tables': {}", data)); + } + boost::trim(parts[0]); + boost::trim(parts[1]); + result[parts[0]].insert(std::move(parts[1])); + } + } + return result; +} + +static std::set parse_audit_keyspaces(const sstring& data) { + std::set result; + if (!data.empty()) { + std::vector tokens; + boost::split(tokens, data, boost::is_any_of(",")); + for (sstring& token : tokens) { + boost::trim(token); + result.insert(std::move(token)); + } + } + return result; +} + +future<> audit::create_audit(const db::config& cfg, sharded& stm) { + sstring storage_helper_name; + if (cfg.audit() == "table") { + storage_helper_name = "audit_cf_storage_helper"; + } else if (cfg.audit() == "syslog") { + storage_helper_name = "audit_syslog_storage_helper"; + } else if (cfg.audit() == "none") { + // Audit is off + logger.info("Audit is disabled"); + + return make_ready_future<>(); + } else { + throw audit_exception(fmt::format("Bad configuration: invalid 'audit': {}", cfg.audit())); + } + category_set audited_categories = parse_audit_categories(cfg.audit_categories()); + if (!audited_categories) { + return make_ready_future<>(); + } + std::map> audited_tables = parse_audit_tables(cfg.audit_tables()); + std::set audited_keyspaces = parse_audit_keyspaces(cfg.audit_keyspaces()); + if (audited_tables.empty() + && audited_keyspaces.empty() + && !audited_categories.contains(statement_category::AUTH) + && !audited_categories.contains(statement_category::ADMIN) + && !audited_categories.contains(statement_category::DCL)) { + return make_ready_future<>(); + } + logger.info("Audit is enabled. Auditing to: \"{}\", with the following categories: \"{}\", keyspaces: \"{}\", and tables: \"{}\"", + cfg.audit(), cfg.audit_categories(), cfg.audit_keyspaces(), cfg.audit_tables()); + + return audit_instance().start(std::ref(stm), + std::move(storage_helper_name), + std::move(audited_keyspaces), + std::move(audited_tables), + std::move(audited_categories)); +} + +future<> audit::start_audit(const db::config& cfg, sharded& qp, sharded& mm) { + if (!audit_instance().local_is_initialized()) { + return make_ready_future<>(); + } + return audit_instance().invoke_on_all([&cfg, &qp, &mm] (audit& local_audit) { + return local_audit.start(cfg, qp.local(), mm.local()); + }); +} + +future<> audit::stop_audit() { + if (!audit_instance().local_is_initialized()) { + return make_ready_future<>(); + } + return audit::audit::audit_instance().invoke_on_all([] (auto& local_audit) { + return local_audit.shutdown(); + }).then([] { + return audit::audit::audit_instance().stop(); + }); +} + +audit_info_ptr audit::create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table) { + if (!audit_instance().local_is_initialized()) { + return nullptr; + } + return std::make_unique(cat, keyspace, table); +} + +audit_info_ptr audit::create_no_audit_info() { + return audit_info_ptr(); +} + +future<> audit::start(const db::config& cfg, cql3::query_processor& qp, service::migration_manager& mm) { + try { + _storage_helper_ptr = create_object(_storage_helper_class_name, qp, mm); + } catch (no_such_class& e) { + logger.error("Can't create audit storage helper {}: not supported", _storage_helper_class_name); + throw; + } catch (...) { + throw; + } + return _storage_helper_ptr->start(cfg); +} + +future<> audit::stop() { + return _storage_helper_ptr->stop(); +} + +future<> audit::shutdown() { + return make_ready_future<>(); +} + +future<> audit::log(const audit_info* audit_info, service::query_state& query_state, const cql3::query_options& options, bool error) { + const service::client_state& client_state = query_state.get_client_state(); + socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr(); + db::consistency_level cl = options.get_consistency(); + thread_local static sstring no_username("undefined"); + static const sstring anonymous_username("anonymous"); + const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username; + socket_address client_ip = client_state.get_client_address().addr(); + return futurize_invoke(std::mem_fn(&storage_helper::write), _storage_helper_ptr, audit_info, node_ip, client_ip, cl, username, error) + .handle_exception([audit_info, node_ip, client_ip, cl, username, error] (auto ep) { + logger.error("Unexpected exception when writing log with: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {} exception {}", + node_ip, audit_info->category_string(), cl, error, audit_info->keyspace(), + audit_info->query(), client_ip, audit_info->table(),username, ep); + }); +} + +future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept { + socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr(); + return futurize_invoke(std::mem_fn(&storage_helper::write_login), _storage_helper_ptr, username, node_ip, client_ip, error) + .handle_exception([username, node_ip, client_ip, error] (auto ep) { + logger.error("Unexpected exception when writing login log with: node_ip {} client_ip {} username {} error {} exception {}", + node_ip, client_ip, username, error, ep); + }); +} + +future<> inspect(shared_ptr statement, service::query_state& query_state, const cql3::query_options& options, bool error) { + cql3::statements::batch_statement* batch = dynamic_cast(statement.get()); + if (batch != nullptr) { + return do_for_each(batch->statements().begin(), batch->statements().end(), [&query_state, &options, error] (auto&& m) { + return inspect(m.statement, query_state, options, error); + }); + } else { + auto audit_info = statement->get_audit_info(); + if (bool(audit_info) && audit::local_audit_instance().should_log(audit_info)) { + return audit::local_audit_instance().log(audit_info, query_state, options, error); + } + } + return make_ready_future<>(); +} + +future<> inspect_login(const sstring& username, socket_address client_ip, bool error) { + if (!audit::audit_instance().local_is_initialized() || !audit::local_audit_instance().should_log_login()) { + return make_ready_future<>(); + } + return audit::local_audit_instance().log_login(username, client_ip, error); +} + +bool audit::should_log_table(const sstring& keyspace, const sstring& name) const { + auto keyspace_it = _audited_tables.find(keyspace); + return keyspace_it != _audited_tables.cend() && keyspace_it->second.find(name) != keyspace_it->second.cend(); +} + +bool audit::should_log(const audit_info* audit_info) const { + return _audited_categories.contains(audit_info->category()) + && (_audited_keyspaces.find(audit_info->keyspace()) != _audited_keyspaces.cend() + || should_log_table(audit_info->keyspace(), audit_info->table()) + || audit_info->category() == statement_category::AUTH + || audit_info->category() == statement_category::ADMIN + || audit_info->category() == statement_category::DCL); +} + +} diff --git a/audit/audit.hh b/audit/audit.hh new file mode 100644 index 000000000000..0e434fbeaa3e --- /dev/null +++ b/audit/audit.hh @@ -0,0 +1,141 @@ +/* + * Copyright (C) 2017 ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ +#pragma once + +#include "seastarx.hh" +#include "utils/log.hh" +#include "db/consistency_level.hh" +#include "locator/token_metadata_fwd.hh" +#include +#include + +#include "enum_set.hh" + +#include + +namespace db { + +class config; + +} + +namespace cql3 { + +class cql_statement; +class query_processor; +class query_options; + +} + +namespace service { + +class migration_manager; +class query_state; + +} + +namespace locator { + +class shared_token_metadata; + +} + +namespace audit { + +extern logging::logger logger; + +class audit_exception : public std::exception { + sstring _what; +public: + explicit audit_exception(sstring&& what) : _what(std::move(what)) { } + const char* what() const noexcept override { + return _what.c_str(); + } +}; + +enum class statement_category { + QUERY, DML, DDL, DCL, AUTH, ADMIN +}; + +using category_set = enum_set>; + +class audit_info final { + statement_category _category; + sstring _keyspace; + sstring _table; + sstring _query; +public: + audit_info(statement_category cat, sstring keyspace, sstring table) + : _category(cat) + , _keyspace(std::move(keyspace)) + , _table(std::move(table)) + { } + void set_query_string(const std::string_view& query_string) { + _query = sstring(query_string); + } + const sstring& keyspace() const { return _keyspace; } + const sstring& table() const { return _table; } + const sstring& query() const { return _query; } + sstring category_string() const; + statement_category category() const { return _category; } +}; + +using audit_info_ptr = std::unique_ptr; + +class storage_helper; + +class audit final : public seastar::async_sharded_service { + locator::shared_token_metadata& _token_metadata; + const std::set _audited_keyspaces; + // Maps keyspace name to set of table names in that keyspace + const std::map> _audited_tables; + const category_set _audited_categories; + sstring _storage_helper_class_name; + std::unique_ptr _storage_helper_ptr; + + bool should_log_table(const sstring& keyspace, const sstring& name) const; +public: + static seastar::sharded& audit_instance() { + // FIXME: leaked intentionally to avoid shutdown problems, see #293 + static seastar::sharded* audit_inst = new seastar::sharded(); + + return *audit_inst; + } + + static audit& local_audit_instance() { + return audit_instance().local(); + } + static future<> create_audit(const db::config& cfg, sharded& stm); + static future<> start_audit(const db::config& cfg, sharded& qp, sharded& mm); + static future<> stop_audit(); + static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table); + static audit_info_ptr create_no_audit_info(); + audit(locator::shared_token_metadata& stm, sstring&& storage_helper_name, + std::set&& audited_keyspaces, + std::map>&& audited_tables, + category_set&& audited_categories); + ~audit(); + future<> start(const db::config& cfg, cql3::query_processor& qp, service::migration_manager& mm); + future<> stop(); + future<> shutdown(); + bool should_log(const audit_info* audit_info) const; + bool should_log_login() const { return _audited_categories.contains(statement_category::AUTH); } + future<> log(const audit_info* audit_info, service::query_state& query_state, const cql3::query_options& options, bool error); + future<> log_login(const sstring& username, socket_address client_ip, bool error) noexcept; +}; + +future<> inspect(shared_ptr statement, service::query_state& query_state, const cql3::query_options& options, bool error); + +future<> inspect_login(const sstring& username, socket_address client_ip, bool error); + +} diff --git a/audit/audit_cf_storage_helper.cc b/audit/audit_cf_storage_helper.cc new file mode 100644 index 000000000000..b2657b46c0c2 --- /dev/null +++ b/audit/audit_cf_storage_helper.cc @@ -0,0 +1,202 @@ +/* + * Copyright (C) 2017 ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "audit/audit_cf_storage_helper.hh" + +#include "cql3/query_processor.hh" +#include "data_dictionary/keyspace_metadata.hh" +#include "utils/UUID_gen.hh" +#include "utils/class_registrator.hh" +#include "cql3/query_options.hh" +#include "cql3/statements/ks_prop_defs.hh" +#include "service/migration_manager.hh" +#include "service/storage_proxy.hh" + +namespace audit { + +const sstring audit_cf_storage_helper::KEYSPACE_NAME("audit"); +const sstring audit_cf_storage_helper::TABLE_NAME("audit_log"); + +audit_cf_storage_helper::audit_cf_storage_helper(cql3::query_processor& qp, service::migration_manager& mm) + : _qp(qp) + , _mm(mm) + , _table(KEYSPACE_NAME, TABLE_NAME, + fmt::format("CREATE TABLE IF NOT EXISTS {}.{} (" + "date timestamp, " + "node inet, " + "event_time timeuuid, " + "category text, " + "consistency text, " + "table_name text, " + "keyspace_name text, " + "operation text, " + "source inet, " + "username text, " + "error boolean, " + "PRIMARY KEY ((date, node), event_time))", + KEYSPACE_NAME, TABLE_NAME), + fmt::format("INSERT INTO {}.{} (" + "date," + "node," + "event_time," + "category," + "consistency," + "table_name," + "keyspace_name," + "operation," + "source," + "username," + "error) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + KEYSPACE_NAME, TABLE_NAME)) + , _dummy_query_state(service::client_state::for_internal_calls(), empty_service_permit()) +{ +} + +future<> audit_cf_storage_helper::migrate_audit_table(service::group0_guard group0_guard) { + while (true) { + auto const ks = _qp.db().try_find_keyspace(KEYSPACE_NAME); + if (ks && ks->metadata()->strategy_name() == "org.apache.cassandra.locator.SimpleStrategy") { + data_dictionary::database db = _qp.db(); + cql3::statements::ks_prop_defs old_ks_prop_defs; + auto old_ks_metadata = old_ks_prop_defs.as_ks_metadata_update( + ks->metadata(), *_qp.proxy().get_token_metadata_ptr(), db.features()); + std::map strategy_opts; + for (const auto &dc: _qp.proxy().get_token_metadata_ptr()->get_topology().get_datacenters()) + strategy_opts[dc] = "3"; + + auto new_ks_metadata = keyspace_metadata::new_keyspace(KEYSPACE_NAME, + "org.apache.cassandra.locator.NetworkTopologyStrategy", + strategy_opts, + std::nullopt, // initial_tablets + old_ks_metadata->durable_writes(), + old_ks_metadata->get_storage_options(), + old_ks_metadata->tables()); + auto ts = group0_guard.write_timestamp(); + try { + co_await _mm.announce( + service::prepare_keyspace_update_announcement(db.real_database(), new_ks_metadata, ts), + std::move(group0_guard), format("audit: Alter {} keyspace", KEYSPACE_NAME)); + break; + } catch (::service::group0_concurrent_modification &) { + logger.info("Concurrent operation is detected while altering {} keyspace, retrying.", KEYSPACE_NAME); + } + group0_guard = co_await _mm.start_group0_operation(); + } else { + co_return; + } + } +} + +future<> audit_cf_storage_helper::start(const db::config &cfg) { + if (this_shard_id() != 0) { + co_return; + } + + if (auto ks = _qp.db().try_find_keyspace(KEYSPACE_NAME); + !ks || + ks->metadata()->strategy_name() == "org.apache.cassandra.locator.SimpleStrategy") { + + auto group0_guard = co_await _mm.start_group0_operation(); + if (ks = _qp.db().try_find_keyspace(KEYSPACE_NAME); !ks) { + // releasing, because table_helper::setup_keyspace creates a raft guard of its own + service::release_guard(std::move(group0_guard)); + co_return co_await table_helper::setup_keyspace(_qp, _mm, KEYSPACE_NAME, + "org.apache.cassandra.locator.NetworkTopologyStrategy", + "3", _dummy_query_state, {&_table}); + } else if (ks->metadata()->strategy_name() == "org.apache.cassandra.locator.SimpleStrategy") { + // We want to migrate the old (pre-Scylla 6.0) SimpleStrategy to a newer one. + // The migrate_audit_table() function will do nothing if it races with another strategy change: + // - either by another node doing the same thing in parallel, + // - or a user manually changing the strategy of the same table. + // Note we only check the strategy, not the replication factor. + co_return co_await migrate_audit_table(std::move(group0_guard)); + } else { + co_return; + } + } +} + +future<> audit_cf_storage_helper::stop() { + return make_ready_future<>(); +} + +future<> audit_cf_storage_helper::write(const audit_info* audit_info, + socket_address node_ip, + socket_address client_ip, + db::consistency_level cl, + const sstring& username, + bool error) { + return _table.insert(_qp, _mm, _dummy_query_state, make_data, audit_info, node_ip, client_ip, cl, username, error); +} + +future<> audit_cf_storage_helper::write_login(const sstring& username, + socket_address node_ip, + socket_address client_ip, + bool error) { + return _table.insert(_qp, _mm, _dummy_query_state, make_login_data, node_ip, client_ip, username, error); +} + +cql3::query_options audit_cf_storage_helper::make_data(const audit_info* audit_info, + socket_address node_ip, + socket_address client_ip, + db::consistency_level cl, + const sstring& username, + bool error) { + auto time = std::chrono::system_clock::now(); + auto millis_since_epoch = std::chrono::duration_cast(time.time_since_epoch()).count(); + auto ticks_per_day = std::chrono::duration_cast(std::chrono::hours(24)).count(); + auto date = millis_since_epoch / ticks_per_day * ticks_per_day; + thread_local static int64_t last_nanos = 0; + auto time_id = utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(last_nanos, time)); + auto consistency_level = fmt::format("{}", cl); + std::vector values { + cql3::raw_value::make_value(timestamp_type->decompose(date)), + cql3::raw_value::make_value(inet_addr_type->decompose(node_ip.addr())), + cql3::raw_value::make_value(uuid_type->decompose(time_id)), + cql3::raw_value::make_value(utf8_type->decompose(audit_info->category_string())), + cql3::raw_value::make_value(utf8_type->decompose(sstring(consistency_level))), + cql3::raw_value::make_value(utf8_type->decompose(audit_info->table())), + cql3::raw_value::make_value(utf8_type->decompose(audit_info->keyspace())), + cql3::raw_value::make_value(utf8_type->decompose(audit_info->query())), + cql3::raw_value::make_value(inet_addr_type->decompose(client_ip.addr())), + cql3::raw_value::make_value(utf8_type->decompose(username)), + cql3::raw_value::make_value(boolean_type->decompose(error)), + }; + return cql3::query_options(cql3::default_cql_config, db::consistency_level::ONE, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT); +} + +cql3::query_options audit_cf_storage_helper::make_login_data(socket_address node_ip, + socket_address client_ip, + const sstring& username, + bool error) { + auto time = std::chrono::system_clock::now(); + auto millis_since_epoch = std::chrono::duration_cast(time.time_since_epoch()).count(); + auto ticks_per_day = std::chrono::duration_cast(std::chrono::hours(24)).count(); + auto date = millis_since_epoch / ticks_per_day * ticks_per_day; + thread_local static int64_t last_nanos = 0; + auto time_id = utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(last_nanos, time)); + std::vector values { + cql3::raw_value::make_value(timestamp_type->decompose(date)), + cql3::raw_value::make_value(inet_addr_type->decompose(node_ip.addr())), + cql3::raw_value::make_value(uuid_type->decompose(time_id)), + cql3::raw_value::make_value(utf8_type->decompose(sstring("AUTH"))), + cql3::raw_value::make_value(utf8_type->decompose(sstring(""))), + cql3::raw_value::make_value(utf8_type->decompose(sstring(""))), + cql3::raw_value::make_value(utf8_type->decompose(sstring(""))), + cql3::raw_value::make_value(utf8_type->decompose(sstring("LOGIN"))), + cql3::raw_value::make_value(inet_addr_type->decompose(client_ip.addr())), + cql3::raw_value::make_value(utf8_type->decompose(username)), + cql3::raw_value::make_value(boolean_type->decompose(error)), + }; + return cql3::query_options(cql3::default_cql_config, db::consistency_level::ONE, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT); +} + +using registry = class_registrator; +static registry registrator1("audit_cf_storage_helper"); + +} diff --git a/audit/audit_cf_storage_helper.hh b/audit/audit_cf_storage_helper.hh new file mode 100644 index 000000000000..376616786beb --- /dev/null +++ b/audit/audit_cf_storage_helper.hh @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2017 ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ +#pragma once + +#include "audit/audit.hh" +#include "table_helper.hh" +#include "storage_helper.hh" +#include "db/config.hh" +#include "service/raft/raft_group0_client.hh" + +namespace cql3 { + +class query_processor; + +} + +namespace service { + +class migration_manager; + +} + +namespace audit { + +class audit_cf_storage_helper : public storage_helper { + static const sstring KEYSPACE_NAME; + static const sstring TABLE_NAME; + cql3::query_processor& _qp; + service::migration_manager& _mm; + table_helper _table; + service::query_state _dummy_query_state; + static cql3::query_options make_data(const audit_info* audit_info, + socket_address node_ip, + socket_address client_ip, + db::consistency_level cl, + const sstring& username, + bool error); + static cql3::query_options make_login_data(socket_address node_ip, + socket_address client_ip, + const sstring& username, + bool error); + + future<> migrate_audit_table(service::group0_guard guard); + +public: + explicit audit_cf_storage_helper(cql3::query_processor& qp, service::migration_manager& mm); + virtual ~audit_cf_storage_helper() {} + virtual future<> start(const db::config& cfg) override; + virtual future<> stop() override; + virtual future<> write(const audit_info* audit_info, + socket_address node_ip, + socket_address client_ip, + db::consistency_level cl, + const sstring& username, + bool error) override; + virtual future<> write_login(const sstring& username, + socket_address node_ip, + socket_address client_ip, + bool error) override; +}; + +} diff --git a/audit/audit_syslog_storage_helper.cc b/audit/audit_syslog_storage_helper.cc new file mode 100644 index 000000000000..dd13a8be3938 --- /dev/null +++ b/audit/audit_syslog_storage_helper.cc @@ -0,0 +1,134 @@ +/* + * Copyright (C) 2017 ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "audit/audit_syslog_storage_helper.hh" + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include "cql3/query_processor.hh" +#include "utils/class_registrator.hh" + +namespace cql3 { + +class query_processor; + +} + +namespace audit { + +namespace { + +future<> syslog_send_helper(net::datagram_channel& sender, + const socket_address& address, + const sstring& msg) { + return sender.send(address, net::packet{msg.data(), msg.size()}).handle_exception([address](auto&& exception_ptr) { + auto error_msg = seastar::format( + "Syslog audit backend failed (sending a message to {} resulted in {}).", + address, + exception_ptr + ); + logger.error("{}", error_msg); + throw audit_exception(std::move(error_msg)); + }); +} + +static auto syslog_address_helper(const db::config& cfg) +{ + return cfg.audit_unix_socket_path.is_set() + ? unix_domain_addr(cfg.audit_unix_socket_path()) + : unix_domain_addr(_PATH_LOG); +} + +} + +audit_syslog_storage_helper::audit_syslog_storage_helper(cql3::query_processor& qp, service::migration_manager&) : + _syslog_address(syslog_address_helper(qp.db().get_config())), + _sender(make_unbound_datagram_channel(AF_UNIX)) { +} + +audit_syslog_storage_helper::~audit_syslog_storage_helper() { +} + +/* + * We don't use openlog and syslog directly because it's already used by logger. + * Audit needs to use different ident so than logger but syslog.h uses a global ident + * and it's not possible to use more than one in a program. + * + * To work around it we directly communicate with the socket. + */ +future<> audit_syslog_storage_helper::start(const db::config& cfg) { + if (this_shard_id() != 0) { + return make_ready_future(); + } + + return syslog_send_helper(_sender, _syslog_address, "Initializing syslog audit backend."); +} + +future<> audit_syslog_storage_helper::stop() { + _sender.shutdown_output(); + co_return; +} + +future<> audit_syslog_storage_helper::write(const audit_info* audit_info, + socket_address node_ip, + socket_address client_ip, + db::consistency_level cl, + const sstring& username, + bool error) { + auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + tm time; + localtime_r(&now, &time); + sstring msg = seastar::format("<{}>{:%h %e %T} scylla-audit: \"{}\", \"{}\", \"{}\", \"{}\", \"{}\", \"{}\", \"{}\", \"{}\", \"{}\"", + LOG_NOTICE | LOG_USER, + time, + node_ip, + audit_info->category_string(), + cl, + (error ? "true" : "false"), + audit_info->keyspace(), + audit_info->query(), + client_ip, + audit_info->table(), + username); + + return syslog_send_helper(_sender, _syslog_address, msg); +} + +future<> audit_syslog_storage_helper::write_login(const sstring& username, + socket_address node_ip, + socket_address client_ip, + bool error) { + + auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + tm time; + localtime_r(&now, &time); + sstring msg = seastar::format("<{}>{:%h %e %T} scylla-audit: \"{}\", \"AUTH\", \"\", \"\", \"\", \"\", \"{}\", \"{}\", \"{}\"", + LOG_NOTICE | LOG_USER, + time, + node_ip, + client_ip, + username, + (error ? "true" : "false")); + + co_await syslog_send_helper(_sender, _syslog_address, msg.c_str()); +} + +using registry = class_registrator; +static registry registrator1("audit_syslog_storage_helper"); + +} diff --git a/audit/audit_syslog_storage_helper.hh b/audit/audit_syslog_storage_helper.hh new file mode 100644 index 000000000000..11f14c1ae5c3 --- /dev/null +++ b/audit/audit_syslog_storage_helper.hh @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2017 ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ +#pragma once + +#include + +#include "audit/audit.hh" +#include "storage_helper.hh" +#include "db/config.hh" + +namespace service { + +class migration_manager; + +}; + +namespace audit { + +class audit_syslog_storage_helper : public storage_helper { + socket_address _syslog_address; + net::datagram_channel _sender; +public: + explicit audit_syslog_storage_helper(cql3::query_processor&, service::migration_manager&); + virtual ~audit_syslog_storage_helper(); + virtual future<> start(const db::config& cfg) override; + virtual future<> stop() override; + virtual future<> write(const audit_info* audit_info, + socket_address node_ip, + socket_address client_ip, + db::consistency_level cl, + const sstring& username, + bool error) override; + virtual future<> write_login(const sstring& username, + socket_address node_ip, + socket_address client_ip, + bool error) override; +}; + +} diff --git a/audit/storage_helper.hh b/audit/storage_helper.hh new file mode 100644 index 000000000000..be11a2b0b5da --- /dev/null +++ b/audit/storage_helper.hh @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2017 ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ +#pragma once + +#include "audit/audit.hh" +#include + +namespace audit { + +class storage_helper { +public: + using ptr_type = std::unique_ptr; + storage_helper() {} + virtual ~storage_helper() {} + virtual future<> start(const db::config& cfg) = 0; + virtual future<> stop() = 0; + virtual future<> write(const audit_info* audit_info, + socket_address node_ip, + socket_address client_ip, + db::consistency_level cl, + const sstring& username, + bool error) = 0; + virtual future<> write_login(const sstring& username, + socket_address node_ip, + socket_address client_ip, + bool error) = 0; +}; + +} diff --git a/conf/scylla.yaml b/conf/scylla.yaml index 898581778afd..58234e5ada4f 100644 --- a/conf/scylla.yaml +++ b/conf/scylla.yaml @@ -564,6 +564,28 @@ commitlog_total_space_in_mb: -1 # it to 0.0.0.0 to listen on all interfaces. # prometheus_address: 1.2.3.4 +# audit settings +# By default, Scylla does not audit anything. +# 'audit' config option controls if and where to output audited events: +# - "none": auditing is disabled (default) +# - "table": save audited events in audit.audit_log column family +# - "syslog": send audited events via syslog (depends on OS, but usually to /dev/log) +# audit: "none" +# +# List of statement categories that should be audited. +# audit_categories: "DCL,DDL,AUTH" +# +# List of tables that should be audited. +# audit_tables: ".,." +# +# List of keyspaces that should be fully audited. +# All tables in those keyspaces will be audited +# audit_keyspaces: "," +# +# Overrides the Unix socket path used to connect to syslog. If left unset, it'll +# use the default on the build system, which is usually "/dev/log" +# audit_unix_socket_path: "/dev/log" + # Distribution of data among cores (shards) within a node # # Scylla distributes data within a node among shards, using a round-robin diff --git a/configure.py b/configure.py index 46100309705d..f5d1cf1a85b9 100755 --- a/configure.py +++ b/configure.py @@ -1124,6 +1124,9 @@ def find_ninja(): 'tracing/trace_state.cc', 'tracing/traced_file.cc', 'table_helper.cc', + 'audit/audit.cc', + 'audit/audit_cf_storage_helper.cc', + 'audit/audit_syslog_storage_helper.cc', 'tombstone_gc_options.cc', 'tombstone_gc.cc', 'utils/disk-error-handler.cc', diff --git a/cql3/cql_statement.hh b/cql3/cql_statement.hh index 951eaa2dfd4b..b6531a728a0b 100644 --- a/cql3/cql_statement.hh +++ b/cql3/cql_statement.hh @@ -12,6 +12,7 @@ #include "timeout_config.hh" #include "service/raft/raft_group0_client.hh" +#include "audit/audit.hh" namespace service { @@ -45,6 +46,7 @@ using cql_warnings_vec = std::vector; class cql_statement { timeout_config_selector _timeout_config_selector; + audit::audit_info_ptr _audit_info; public: // CQL statement text seastar::sstring raw_cql_statement; @@ -55,7 +57,8 @@ public: } explicit cql_statement(timeout_config_selector timeout_selector) : _timeout_config_selector(timeout_selector) {} - + cql_statement(cql_statement&& o) = default; + cql_statement(const cql_statement& o) : _timeout_config_selector(o._timeout_config_selector), _audit_info(o._audit_info ? std::make_unique(*o._audit_info) : nullptr) { } virtual ~cql_statement() { } @@ -111,6 +114,11 @@ public: virtual bool is_conditional() const { return false; } + + audit::audit_info* get_audit_info() { return _audit_info.get(); } + void set_audit_info(audit::audit_info_ptr&& info) { _audit_info = std::move(info); } + + virtual void sanitize_audit_info() {} }; class cql_statement_no_metadata : public cql_statement { diff --git a/cql3/statements/batch_statement.hh b/cql3/statements/batch_statement.hh index 6b7aa6bb9f16..9feb8971ce61 100644 --- a/cql3/statements/batch_statement.hh +++ b/cql3/statements/batch_statement.hh @@ -87,6 +87,8 @@ public: std::unique_ptr attrs, cql_stats& stats); + const std::vector& statements() const { return _statements; } + virtual bool depends_on(std::string_view ks_name, std::optional cf_name) const override; virtual uint32_t get_bound_terms() const override; diff --git a/cql3/statements/raw/modification_statement.hh b/cql3/statements/raw/modification_statement.hh index f96b1db51b59..9719b40d6bf4 100644 --- a/cql3/statements/raw/modification_statement.hh +++ b/cql3/statements/raw/modification_statement.hh @@ -28,6 +28,7 @@ class modification_statement; namespace raw { class modification_statement : public cf_statement { + sstring _raw_cql; protected: const std::unique_ptr _attrs; const std::optional _conditions; @@ -41,6 +42,8 @@ public: virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; ::shared_ptr prepare_statement(data_dictionary::database db, prepare_context& ctx, cql_stats& stats); ::shared_ptr prepare(data_dictionary::database db, prepare_context& ctx, cql_stats& stats) const; + void add_raw(sstring&& raw) { _raw_cql = std::move(raw); } + const sstring& get_raw_cql() const { return _raw_cql; } protected: virtual ::shared_ptr prepare_internal(data_dictionary::database db, schema_ptr schema, prepare_context& ctx, std::unique_ptr attrs, cql_stats& stats) const = 0; diff --git a/data_dictionary/data_dictionary.cc b/data_dictionary/data_dictionary.cc index 00cf400ab264..1245a3857ab6 100644 --- a/data_dictionary/data_dictionary.cc +++ b/data_dictionary/data_dictionary.cc @@ -237,9 +237,10 @@ keyspace_metadata::new_keyspace(std::string_view name, locator::replication_strategy_config_options options, std::optional initial_tablets, bool durables_writes, - storage_options storage_opts) + storage_options storage_opts, + std::vector cf_defs) { - return ::make_lw_shared(name, strategy_name, options, initial_tablets, durables_writes, std::vector{}, user_types_metadata{}, storage_opts); + return ::make_lw_shared(name, strategy_name, options, initial_tablets, durables_writes, cf_defs, user_types_metadata{}, storage_opts); } lw_shared_ptr diff --git a/data_dictionary/keyspace_metadata.hh b/data_dictionary/keyspace_metadata.hh index d1a66193fdf5..9c75a10f2ab5 100644 --- a/data_dictionary/keyspace_metadata.hh +++ b/data_dictionary/keyspace_metadata.hh @@ -48,7 +48,8 @@ public: locator::replication_strategy_config_options options, std::optional initial_tablets, bool durables_writes = true, - storage_options storage_opts = {}); + storage_options storage_opts = {}, + std::vector cf_defs = {}); static lw_shared_ptr new_keyspace(const keyspace_metadata& ksm); void validate(const gms::feature_service&, const locator::topology&) const; diff --git a/db/config.cc b/db/config.cc index da76b4d79529..73c712dfba6e 100644 --- a/db/config.cc +++ b/db/config.cc @@ -1259,11 +1259,24 @@ db::config::config(std::shared_ptr exts) , replication_strategy_warn_list(this, "replication_strategy_warn_list", liveness::LiveUpdate, value_status::Used, {locator::replication_strategy_type::simple}, "Controls which replication strategies to warn about when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.") , replication_strategy_fail_list(this, "replication_strategy_fail_list", liveness::LiveUpdate, value_status::Used, {}, "Controls which replication strategies are disallowed to be used when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.") , service_levels_interval(this, "service_levels_interval_ms", liveness::LiveUpdate, value_status::Used, 10000, "Controls how often service levels module polls configuration table") + + , audit(this, "audit", value_status::Used, "none", + "Controls the audit feature:\n" + "\n" + "\tnone : No auditing enabled.\n" + "\tsyslog : Audit messages sent to Syslog.\n" + "\ttable : Audit messages written to column family named audit.audit_log.\n") + , audit_categories(this, "audit_categories", value_status::Used, "DCL,DDL,AUTH", "Comma separated list of operation categories that should be audited.") + , audit_tables(this, "audit_tables", value_status::Used, "", "Comma separated list of table names (.) that will be audited.") + , audit_keyspaces(this, "audit_keyspaces", value_status::Used, "", "Comma separated list of keyspaces that will be audited. All tables in those keyspaces will be audited") + , audit_unix_socket_path(this, "audit_unix_socket_path", value_status::Used, "/dev/log", "The path to the unix socket used for writting to syslog. Only applicable when audit is set to syslog.") + , audit_syslog_write_buffer_size(this, "audit_syslog_write_buffer_size", value_status::Used, 1048576, "The size (in bytes) of a write buffer used when writting to syslog socket.") , ldap_url_template(this, "ldap_url_template", value_status::Used, "", "LDAP URL template used by LDAPRoleManager for crafting queries.") , ldap_attr_role(this, "ldap_attr_role", value_status::Used, "", "LDAP attribute containing Scylla role.") , ldap_bind_dn(this, "ldap_bind_dn", value_status::Used, "", "Distinguished name used by LDAPRoleManager for binding to LDAP server.") , ldap_bind_passwd(this, "ldap_bind_passwd", value_status::Used, "", "Password used by LDAPRoleManager for binding to LDAP server.") , saslauthd_socket_path(this, "saslauthd_socket_path", value_status::Used, "", "UNIX domain socket on which saslauthd is listening.") + , error_injections_at_startup(this, "error_injections_at_startup", error_injection_value_status, {}, "List of error injections that should be enabled on startup.") , topology_barrier_stall_detector_threshold_seconds(this, "topology_barrier_stall_detector_threshold_seconds", value_status::Used, 2, "Report sites blocking topology barrier if it takes longer than this.") , enable_tablets(this, "enable_tablets", value_status::Used, false, "Enable tablets for newly created keyspaces.") diff --git a/db/config.hh b/db/config.hh index 630754d1b534..eb337126837a 100644 --- a/db/config.hh +++ b/db/config.hh @@ -504,6 +504,13 @@ public: named_value service_levels_interval; + named_value audit; + named_value audit_categories; + named_value audit_tables; + named_value audit_keyspaces; + named_value audit_unix_socket_path; + named_value audit_syslog_write_buffer_size; + named_value ldap_url_template; named_value ldap_attr_role; named_value ldap_bind_dn; diff --git a/table_helper.cc b/table_helper.cc index 39733e9db231..7a7bd3a3eba4 100644 --- a/table_helper.cc +++ b/table_helper.cc @@ -16,6 +16,7 @@ #include "cql3/statements/modification_statement.hh" #include "replica/database.hh" #include "service/migration_manager.hh" +#include "service/storage_proxy.hh" static logging::logger tlogger("table_helper"); @@ -139,7 +140,8 @@ future<> table_helper::insert(cql3::query_processor& qp, service::migration_mana co_await _insert_stmt->execute(qp, qs, opts, std::nullopt); } -future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migration_manager& mm, std::string_view keyspace_name, sstring replication_factor, service::query_state& qs, std::vector tables) { +future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migration_manager& mm, std::string_view keyspace_name, sstring replication_strategy_name, + sstring replication_factor, service::query_state& qs, std::vector tables) { if (this_shard_id() != 0) { co_return; } @@ -165,6 +167,15 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migrat auto ts = group0_guard.write_timestamp(); if (!db.has_keyspace(keyspace_name)) { + std::map opts; + if (replication_strategy_name == "org.apache.cassandra.locator.NetworkTopologyStrategy") { + for (const auto &dc: qp.proxy().get_token_metadata_ptr()->get_topology().get_datacenters()) + opts[dc] = replication_factor; + } + else { + opts["replication_factor"] = replication_factor; + } + auto ksm = keyspace_metadata::new_keyspace(keyspace_name, replication_strategy_name, std::move(opts), true); try { co_await mm.announce(service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts), std::move(group0_guard), seastar::format("table_helper: create {} keyspace", keyspace_name)); diff --git a/table_helper.hh b/table_helper.hh index e3c8ac932efc..d064066cdc5e 100644 --- a/table_helper.hh +++ b/table_helper.hh @@ -99,7 +99,8 @@ public: future<> insert(cql3::query_processor& qp, service::migration_manager& mm, service::query_state& qs, noncopyable_function opt_maker); - static future<> setup_keyspace(cql3::query_processor& qp, service::migration_manager& mm, std::string_view keyspace_name, sstring replication_factor, service::query_state& qs, std::vector tables); + static future<> setup_keyspace(cql3::query_processor& qp, service::migration_manager& mm, std::string_view keyspace_name, sstring replication_strategy_name, + sstring replication_factor, service::query_state& qs, std::vector tables); /** * Makes a monotonically increasing value in 100ns ("nanos") based on the given time diff --git a/test/lib/CMakeLists.txt b/test/lib/CMakeLists.txt index 0dd73c7ff2c1..cf74cbb0615d 100644 --- a/test/lib/CMakeLists.txt +++ b/test/lib/CMakeLists.txt @@ -28,6 +28,7 @@ target_link_libraries(test-lib Seastar::seastar xxHash::xxhash PRIVATE + audit auth cdc compaction diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index c6067e253f98..b93a7bc0c470 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -212,7 +212,7 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr) future<> trace_keyspace_helper::start(cql3::query_processor& qp, service::migration_manager& mm) { _qp_anchor = &qp; _mm_anchor = &mm; - return table_helper::setup_keyspace(qp, mm, KEYSPACE_NAME, "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx }); + return table_helper::setup_keyspace(qp, mm, KEYSPACE_NAME, "org.apache.cassandra.locator.SimpleStrategy", "2", _dummy_query_state, { &_sessions, &_sessions_time_idx, &_events, &_slow_query_log, &_slow_query_log_time_idx }); } gms::inet_address trace_keyspace_helper::my_address() const noexcept {