diff --git a/data_dictionary/storage_options.hh b/data_dictionary/storage_options.hh index f25d6d13b17f..f35820520a07 100644 --- a/data_dictionary/storage_options.hh +++ b/data_dictionary/storage_options.hh @@ -15,6 +15,10 @@ #include "schema/schema_fwd.hh" #include "seastarx.hh" +namespace seastar { + class abort_source; +} + namespace data_dictionary { struct storage_options { @@ -30,6 +34,7 @@ struct storage_options { sstring bucket; sstring endpoint; std::variant location; + seastar::abort_source* abort_source = nullptr; static constexpr std::string_view name = "S3"; static s3 from_map(const std::map&); diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 431389d070a5..28bb4eb90d5e 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -234,12 +234,13 @@ distributed_loader::get_sstables_from_upload_dir(distributed& } future>>> -distributed_loader::get_sstables_from_object_store(distributed& db, sstring ks, sstring cf, std::vector sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg) { - return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, prefix, sstables=std::move(sstables)] (auto& global_table, auto& directory) { +distributed_loader::get_sstables_from_object_store(distributed& db, sstring ks, sstring cf, std::vector sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function get_abort_src) { + return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, prefix, sstables=std::move(sstables), &get_abort_src] (auto& global_table, auto& directory) { return directory.start(global_table.as_sharded_parameter(), - sharded_parameter([bucket, endpoint, prefix] { + sharded_parameter([bucket, endpoint, prefix, &get_abort_src] { data_dictionary::storage_options opts; - opts.value = data_dictionary::storage_options::s3{bucket, endpoint, prefix}; + seastar::abort_source* as = get_abort_src ? get_abort_src() : nullptr; + opts.value = data_dictionary::storage_options::s3{bucket, endpoint, prefix, as}; return make_lw_shared(std::move(opts)); }), sstables, diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index 7f102f2b2de1..ef2261558447 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -22,6 +22,10 @@ #include "db/system_keyspace.hh" #include "sstables/sstable_directory.hh" +namespace seastar { + class abort_source; +} + namespace replica { class database; class table; @@ -89,7 +93,7 @@ public: static future>>> get_sstables_from_upload_dir(distributed& db, sstring ks, sstring cf, sstables::sstable_open_config cfg); static future>>> - get_sstables_from_object_store(distributed& db, sstring ks, sstring cf, std::vector sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg); + get_sstables_from_object_store(distributed& db, sstring ks, sstring cf, std::vector sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function = {}); static future<> process_upload_dir(distributed& db, sharded& vb, sstring ks_name, sstring cf_name); }; diff --git a/sstables/storage.cc b/sstables/storage.cc index 41ca0a93e8b6..3eb754df3e15 100644 --- a/sstables/storage.cc +++ b/sstables/storage.cc @@ -507,6 +507,7 @@ class s3_storage : public sstables::storage { shared_ptr _client; sstring _bucket; std::variant _location; + seastar::abort_source* _as; static constexpr auto status_creating = "creating"; static constexpr auto status_sealed = "sealed"; @@ -522,10 +523,11 @@ class s3_storage : public sstables::storage { } public: - s3_storage(shared_ptr client, sstring bucket, std::variant loc) + s3_storage(shared_ptr client, sstring bucket, std::variant loc, seastar::abort_source* as) : _client(std::move(client)) , _bucket(std::move(bucket)) , _location(std::move(loc)) + , _as(as) { } @@ -585,17 +587,17 @@ void s3_storage::open(sstable& sst) { } future s3_storage::open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) { - co_return _client->make_readable_file(make_s3_object_name(sst, type)); + co_return _client->make_readable_file(make_s3_object_name(sst, type), _as); } future s3_storage::make_data_or_index_sink(sstable& sst, component_type type) { SCYLLA_ASSERT(type == component_type::Data || type == component_type::Index); // FIXME: if we have file size upper bound upfront, it's better to use make_upload_sink() instead - co_return _client->make_upload_jumbo_sink(make_s3_object_name(sst, type)); + co_return _client->make_upload_jumbo_sink(make_s3_object_name(sst, type), std::nullopt, _as); } future s3_storage::make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) { - co_return _client->make_upload_sink(make_s3_object_name(sst, type)); + co_return _client->make_upload_sink(make_s3_object_name(sst, type), _as); } future<> s3_storage::seal(const sstable& sst) { @@ -676,7 +678,7 @@ std::unique_ptr make_storage(sstables_manager& manager, const }, os.location)) { on_internal_error(sstlog, "S3 storage options is missing 'location'"); } - return std::make_unique(manager.get_endpoint_client(os.endpoint), os.bucket, os.location); + return std::make_unique(manager.get_endpoint_client(os.endpoint), os.bucket, os.location, os.abort_source); } }, s_opts.value); } diff --git a/sstables_loader.cc b/sstables_loader.cc index d06d7bc00e22..015fc0ea92a9 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -496,6 +496,11 @@ class sstables_loader::download_task_impl : public tasks::task_manager::task::im virtual tasks::is_user_task is_user_task() const noexcept override { return tasks::is_user_task::yes; } + + tasks::is_abortable is_abortable() const noexcept override { + return tasks::is_abortable::yes; + } + virtual future get_progress() const override { llog.debug("get_progress: {}", _num_sstables_processed); unsigned processed = co_await _loader.map_reduce(adder(), [this] (auto&) { @@ -515,14 +520,50 @@ future<> sstables_loader::download_task_impl::run() { .load_bloom_filter = false, }; llog.debug("Loading sstables from {}({}/{})", _endpoint, _bucket, _prefix); - auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, _bucket, _prefix, cfg); + + std::vector shard_aborts(smp::count); + auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, _bucket, _prefix, cfg, [&] { + return &shard_aborts[this_shard_id()]; + }); llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, _prefix); - co_await _loader.invoke_on_all([this, &sstables_on_shards, table_id] (sstables_loader& loader) mutable -> future<> { - co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), false, false, - [this] (unsigned num_streamed) { - _num_sstables_processed[this_shard_id()] += num_streamed; + std::exception_ptr ex; + gate g; + try { + _as.check(); + + auto s = _as.subscribe([&]() noexcept { + try { + auto h = g.hold(); + (void)smp::invoke_on_all([&shard_aborts, ex = _as.abort_requested_exception_ptr()] { + shard_aborts[this_shard_id()].request_abort_ex(ex); + }).finally([h = std::move(h)] {}); + } catch (...) { + } }); - }); + + co_await _loader.invoke_on_all([this, &sstables_on_shards, table_id] (sstables_loader& loader) mutable -> future<> { + co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), false, false, [this] (unsigned num_streamed) { + _num_sstables_processed[this_shard_id()] += num_streamed; + }); + }); + } catch (...) { + ex = std::current_exception(); + } + + co_await g.close(); + + if (_as.abort_requested()) { + if (!ex) { + ex = _as.abort_requested_exception_ptr(); + } + } + + if (ex) { + co_await _loader.invoke_on_all([&sstables_on_shards] (sstables_loader&) { + sstables_on_shards[this_shard_id()] = {}; // clear on correct shard + }); + co_await coroutine::return_exception_ptr(std::move(ex)); + } } sstables_loader::sstables_loader(sharded& db, @@ -552,6 +593,7 @@ future sstables_loader::download_new_sstables(sstring ks_name, s throw std::invalid_argument(format("endpoint {} not found", endpoint)); } llog.info("Restore sstables from {}({}) to {}", endpoint, prefix, ks_name); + auto task = co_await _task_manager_module->make_and_start_task({}, container(), std::move(endpoint), std::move(bucket), std::move(ks_name), std::move(cf_name), std::move(prefix), std::move(sstables)); co_return task->id(); } diff --git a/test/object_store/test_backup.py b/test/object_store/test_backup.py index 685ab86c6455..5e222a3cb929 100644 --- a/test/object_store/test_backup.py +++ b/test/object_store/test_backup.py @@ -203,8 +203,7 @@ async def test_backup_is_abortable_in_s3_client(manager: ManagerClient, s3_serve await do_test_backup_abort(manager, s3_server, breakpoint_name="backup_task_pre_upload", min_files=0, max_files=1) -@pytest.mark.asyncio -async def test_simple_backup_and_restore(manager: ManagerClient, s3_server): +async def do_test_simple_backup_and_restore(manager: ManagerClient, s3_server, do_abort = False): '''check that restoring from backed up snapshot for a keyspace:table works''' cfg = {'enable_user_defined_functions': False, @@ -252,7 +251,8 @@ def list_sstables(): # - {suffix}/2-TOC.txt # - ... suffix = 'suffix' - toc_names = [f'{suffix}/{entry.name}' for entry in list_sstables() if entry.name.endswith('TOC.txt')] + old_files = list_sstables(); + toc_names = [f'{suffix}/{entry.name}' for entry in old_files if entry.name.endswith('TOC.txt')] prefix = f'{cf}/{snap_name}' tid = await manager.api.backup(server.ip_addr, ks, cf, snap_name, s3_server.address, s3_server.bucket_name, f'{prefix}/{suffix}') @@ -270,21 +270,50 @@ def list_sstables(): print('Try to restore') tid = await manager.api.restore(server.ip_addr, ks, cf, s3_server.address, s3_server.bucket_name, prefix, toc_names) + + if do_abort: + await manager.api.abort_task(server.ip_addr, tid) + status = await manager.api.wait_task(server.ip_addr, tid) - assert status is not None - assert status['state'] == 'done' - assert status['progress_units'] == "sstables" - assert status['progress_completed'] == len(toc_names) - assert status['progress_total'] == len(toc_names) + if not do_abort: + assert status is not None + assert status['state'] == 'done' + assert status['progress_units'] == "sstables" + assert status['progress_completed'] == len(toc_names) + assert status['progress_total'] == len(toc_names) print('Check that sstables came back') files = list_sstables() - assert len(files) > 0 - print('Check that data came back too') - res = cql.execute(f"SELECT * FROM {ks}.{cf};") - rows = {x.name: x.value for x in res} - assert rows == orig_rows, "Unexpected table contents after restore" + + sstable_names = [f'{entry.name}' for entry in files if entry.name.endswith('.db')] + db_objects = [object for object in objects if object.endswith('.db')] + + if do_abort: + assert len(files) >= 0 + # These checks can be viewed as dubious. We restore (atm) on a mutation basis mostly. + # There is no guarantee we'll generate the same amount of sstables as was in the original + # backup (?). But, since we are not stressing the server here (not provoking memtable flushes), + # we should in principle never generate _more_ sstables than originated the backup. + assert len(old_files) >= len(files) + assert len(sstable_names) <= len(db_objects) + else: + assert len(files) > 0 + assert (status is not None) and (status['state'] == 'done') + print(f'Check that data came back too') + res = cql.execute(f"SELECT * FROM {ks}.{cf};") + rows = { x.name: x.value for x in res } + assert rows == orig_rows, "Unexpected table contents after restore" print('Check that backup files are still there') # regression test for #20938 post_objects = set(o.key for o in get_s3_resource(s3_server).Bucket(s3_server.bucket_name).objects.filter(Prefix=prefix)) assert objects == post_objects + +@pytest.mark.asyncio +async def test_simple_backup_and_restore(manager: ManagerClient, s3_server): + '''check that restoring from backed up snapshot for a keyspace:table works''' + await do_test_simple_backup_and_restore(manager, s3_server, False) + +@pytest.mark.asyncio +async def test_abort_simple_backup_and_restore(manager: ManagerClient, s3_server): + '''check that restoring from backed up snapshot for a keyspace:table works''' + await do_test_simple_backup_and_restore(manager, s3_server, True) diff --git a/utils/s3/client.cc b/utils/s3/client.cc index 477ecb353069..0789821107d4 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -1198,6 +1198,7 @@ class client::readable_file : public file_impl { shared_ptr _client; sstring _object_name; std::optional _stats; + seastar::abort_source* _as; [[noreturn]] void unsupported() { throw_with_backtrace("unsupported operation on s3 readable file"); @@ -1215,9 +1216,10 @@ class client::readable_file : public file_impl { } public: - readable_file(shared_ptr cln, sstring object_name) + readable_file(shared_ptr cln, sstring object_name, seastar::abort_source* as = nullptr) : _client(std::move(cln)) , _object_name(std::move(object_name)) + , _as(as) { } @@ -1245,7 +1247,8 @@ class client::readable_file : public file_impl { } virtual shared_ptr to_file() && override { - return make_shared(std::move(_h).to_client(), std::move(_object_name)); + // TODO: cannot traverse abort source across shards. + return make_shared(std::move(_h).to_client(), std::move(_object_name), nullptr); } }; @@ -1277,7 +1280,7 @@ class client::readable_file : public file_impl { co_return 0; } - auto buf = co_await _client->get_object_contiguous(_object_name, range{ pos, len }); + auto buf = co_await _client->get_object_contiguous(_object_name, range{ pos, len }, _as); std::copy_n(buf.get(), buf.size(), reinterpret_cast(buffer)); co_return buf.size(); } @@ -1288,7 +1291,7 @@ class client::readable_file : public file_impl { co_return 0; } - auto buf = co_await _client->get_object_contiguous(_object_name, range{ pos, utils::iovec_len(iov) }); + auto buf = co_await _client->get_object_contiguous(_object_name, range{ pos, utils::iovec_len(iov) }, _as); uint64_t off = 0; for (auto& v : iov) { auto sz = std::min(v.iov_len, buf.size() - off); @@ -1307,7 +1310,7 @@ class client::readable_file : public file_impl { co_return temporary_buffer(); } - auto buf = co_await _client->get_object_contiguous(_object_name, range{ offset, range_size }); + auto buf = co_await _client->get_object_contiguous(_object_name, range{ offset, range_size }, _as); co_return temporary_buffer(reinterpret_cast(buf.get_write()), buf.size(), buf.release()); } @@ -1316,8 +1319,8 @@ class client::readable_file : public file_impl { } }; -file client::make_readable_file(sstring object_name) { - return file(make_shared(shared_from_this(), std::move(object_name))); +file client::make_readable_file(sstring object_name, seastar::abort_source* as) { + return file(make_shared(shared_from_this(), std::move(object_name), as)); } future<> client::close() { diff --git a/utils/s3/client.hh b/utils/s3/client.hh index 40752fc8cfef..cf56852a4d69 100644 --- a/utils/s3/client.hh +++ b/utils/s3/client.hh @@ -106,7 +106,7 @@ public: future<> put_object(sstring object_name, ::memory_data_sink_buffers bufs, seastar::abort_source* = nullptr); future<> delete_object(sstring object_name, seastar::abort_source* = nullptr); - file make_readable_file(sstring object_name); + file make_readable_file(sstring object_name, seastar::abort_source* = nullptr); data_sink make_upload_sink(sstring object_name, seastar::abort_source* = nullptr); data_sink make_upload_jumbo_sink(sstring object_name, std::optional max_parts_per_piece = {}, seastar::abort_source* = nullptr); /// upload a file with specified path to s3