Merge 'Make restore task abortable' from Calle Wilund
Fixes scylladb#20717

Enables the abortable interface and propagates an abort_source to all S3 objects used for reading the restore data.

Note: because the restore runs on every shard, we have to maintain a per-shard abort-source proxy and, when an abort is requested, trigger each shard's abort in the background. These background aborts are synced at the end of run().

The abort source is added as an optional parameter to the S3 storage layer and to the S3 path in the distributed loader.

There is no attempt to "clean up" after an aborted restore. Since we read remote sstables at the mutation level, an abort should not leave behind incomplete sstables as such, though it can of course leave the data only partially restored.
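
The fan-out is the pattern visible in sstables_loader.cc below. As a minimal standalone sketch of the idea, assuming only stock Seastar primitives (the function name is illustrative, not the actual ScyllaDB code):

#include <seastar/core/abort_source.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/smp.hh>
#include <vector>

using namespace seastar;

// Sketch only: fan a task-level abort out to one abort_source per shard.
future<> run_with_shard_aborts(abort_source& task_as) {
    std::vector<abort_source> shard_aborts(smp::count);
    gate g;
    auto sub = task_as.subscribe([&]() noexcept { // keep `sub` alive for the duration
        try {
            auto h = g.hold();
            // Propagate the abort to every shard in the background; the gate
            // lets the coroutine wait for these aborts before returning.
            (void)smp::invoke_on_all([&shard_aborts, ex = task_as.abort_requested_exception_ptr()] {
                shard_aborts[this_shard_id()].request_abort_ex(ex);
            }).finally([h = std::move(h)] {});
        } catch (...) {
        }
    });
    // ... per-shard restore work runs here, each shard checking
    // &shard_aborts[this_shard_id()] ...
    co_await g.close(); // sync the background per-shard aborts
}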

Closes scylladb#21567

* github.com:scylladb/scylladb:
  test_backup: Add restore abort test case
  sstables_loader: Make restore task abortable
  distributed_loader: Add optional abort_source to get_sstables_from_object_store
  s3_storage: Add optional abort_source to params/object
  s3::client: Make "readable_file" abortable
xemul committed Dec 19, 2024
2 parents 2a31a82 + 91d7798 commit bb094cc
Showing 8 changed files with 123 additions and 37 deletions.
5 changes: 5 additions & 0 deletions data_dictionary/storage_options.hh
@@ -15,6 +15,10 @@
#include "schema/schema_fwd.hh"
#include "seastarx.hh"

namespace seastar {
class abort_source;
}

namespace data_dictionary {

struct storage_options {
@@ -30,6 +34,7 @@ struct storage_options {
sstring bucket;
sstring endpoint;
std::variant<sstring, table_id> location;
seastar::abort_source* abort_source = nullptr;
static constexpr std::string_view name = "S3";

static s3 from_map(const std::map<sstring, sstring>&);
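Existing callers are unaffected, since the new pointer defaults to nullptr. A hypothetical caller opting in might populate the struct like this (the bucket, endpoint, and prefix values are made up):

seastar::abort_source as;
data_dictionary::storage_options opts;
opts.value = data_dictionary::storage_options::s3{
    "restore-bucket",         // bucket (made-up value)
    "http://127.0.0.1:9000",  // endpoint (made-up value)
    sstring("table/prefix"),  // location: holds an sstring or a table_id
    &as                       // the new optional abort source (defaults to nullptr)
};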
9 changes: 5 additions & 4 deletions replica/distributed_loader.cc
@@ -234,12 +234,13 @@ distributed_loader::get_sstables_from_upload_dir(distributed<replica::database>&
}

future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
distributed_loader::get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg) {
return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, prefix, sstables=std::move(sstables)] (auto& global_table, auto& directory) {
distributed_loader::get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> get_abort_src) {
return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, prefix, sstables=std::move(sstables), &get_abort_src] (auto& global_table, auto& directory) {
return directory.start(global_table.as_sharded_parameter(),
sharded_parameter([bucket, endpoint, prefix] {
sharded_parameter([bucket, endpoint, prefix, &get_abort_src] {
data_dictionary::storage_options opts;
opts.value = data_dictionary::storage_options::s3{bucket, endpoint, prefix};
seastar::abort_source* as = get_abort_src ? get_abort_src() : nullptr;
opts.value = data_dictionary::storage_options::s3{bucket, endpoint, prefix, as};
return make_lw_shared<const data_dictionary::storage_options>(std::move(opts));
}),
sstables,
6 changes: 5 additions & 1 deletion replica/distributed_loader.hh
@@ -22,6 +22,10 @@
#include "db/system_keyspace.hh"
#include "sstables/sstable_directory.hh"

namespace seastar {
class abort_source;
}

namespace replica {
class database;
class table;
@@ -89,7 +93,7 @@ public:
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
get_sstables_from_upload_dir(distributed<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg);
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg);
get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> = {});
static future<> process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks_name, sstring cf_name);
};

12 changes: 7 additions & 5 deletions sstables/storage.cc
@@ -507,6 +507,7 @@ class s3_storage : public sstables::storage {
shared_ptr<s3::client> _client;
sstring _bucket;
std::variant<sstring, table_id> _location;
seastar::abort_source* _as;

static constexpr auto status_creating = "creating";
static constexpr auto status_sealed = "sealed";
@@ -522,10 +523,11 @@ class s3_storage : public sstables::storage {
}

public:
s3_storage(shared_ptr<s3::client> client, sstring bucket, std::variant<sstring, table_id> loc)
s3_storage(shared_ptr<s3::client> client, sstring bucket, std::variant<sstring, table_id> loc, seastar::abort_source* as)
: _client(std::move(client))
, _bucket(std::move(bucket))
, _location(std::move(loc))
, _as(as)
{
}

@@ -585,17 +587,17 @@ void s3_storage::open(sstable& sst) {
}

future<file> s3_storage::open_component(const sstable& sst, component_type type, open_flags flags, file_open_options options, bool check_integrity) {
co_return _client->make_readable_file(make_s3_object_name(sst, type));
co_return _client->make_readable_file(make_s3_object_name(sst, type), _as);
}

future<data_sink> s3_storage::make_data_or_index_sink(sstable& sst, component_type type) {
SCYLLA_ASSERT(type == component_type::Data || type == component_type::Index);
// FIXME: if we have file size upper bound upfront, it's better to use make_upload_sink() instead
co_return _client->make_upload_jumbo_sink(make_s3_object_name(sst, type));
co_return _client->make_upload_jumbo_sink(make_s3_object_name(sst, type), std::nullopt, _as);
}

future<data_sink> s3_storage::make_component_sink(sstable& sst, component_type type, open_flags oflags, file_output_stream_options options) {
co_return _client->make_upload_sink(make_s3_object_name(sst, type));
co_return _client->make_upload_sink(make_s3_object_name(sst, type), _as);
}

future<> s3_storage::seal(const sstable& sst) {
@@ -676,7 +678,7 @@ std::unique_ptr<sstables::storage> make_storage(sstables_manager& manager, const
}, os.location)) {
on_internal_error(sstlog, "S3 storage options is missing 'location'");
}
return std::make_unique<sstables::s3_storage>(manager.get_endpoint_client(os.endpoint), os.bucket, os.location);
return std::make_unique<sstables::s3_storage>(manager.get_endpoint_client(os.endpoint), os.bucket, os.location, os.abort_source);
}
}, s_opts.value);
}
54 changes: 48 additions & 6 deletions sstables_loader.cc
@@ -496,6 +496,11 @@ class sstables_loader::download_task_impl : public tasks::task_manager::task::im
virtual tasks::is_user_task is_user_task() const noexcept override {
return tasks::is_user_task::yes;
}

tasks::is_abortable is_abortable() const noexcept override {
return tasks::is_abortable::yes;
}

virtual future<tasks::task_manager::task::progress> get_progress() const override {
llog.debug("get_progress: {}", _num_sstables_processed);
unsigned processed = co_await _loader.map_reduce(adder<unsigned>(), [this] (auto&) {
@@ -515,14 +520,50 @@ future<> sstables_loader::download_task_impl::run() {
.load_bloom_filter = false,
};
llog.debug("Loading sstables from {}({}/{})", _endpoint, _bucket, _prefix);
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, _bucket, _prefix, cfg);

std::vector<seastar::abort_source> shard_aborts(smp::count);
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, _bucket, _prefix, cfg, [&] {
return &shard_aborts[this_shard_id()];
});
llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, _prefix);
co_await _loader.invoke_on_all([this, &sstables_on_shards, table_id] (sstables_loader& loader) mutable -> future<> {
co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), false, false,
[this] (unsigned num_streamed) {
_num_sstables_processed[this_shard_id()] += num_streamed;
std::exception_ptr ex;
gate g;
try {
_as.check();

auto s = _as.subscribe([&]() noexcept {
try {
auto h = g.hold();
(void)smp::invoke_on_all([&shard_aborts, ex = _as.abort_requested_exception_ptr()] {
shard_aborts[this_shard_id()].request_abort_ex(ex);
}).finally([h = std::move(h)] {});
} catch (...) {
}
});
});

co_await _loader.invoke_on_all([this, &sstables_on_shards, table_id] (sstables_loader& loader) mutable -> future<> {
co_await loader.load_and_stream(_ks, _cf, table_id, std::move(sstables_on_shards[this_shard_id()]), false, false, [this] (unsigned num_streamed) {
_num_sstables_processed[this_shard_id()] += num_streamed;
});
});
} catch (...) {
ex = std::current_exception();
}

co_await g.close();

if (_as.abort_requested()) {
if (!ex) {
ex = _as.abort_requested_exception_ptr();
}
}

if (ex) {
co_await _loader.invoke_on_all([&sstables_on_shards] (sstables_loader&) {
sstables_on_shards[this_shard_id()] = {}; // clear on correct shard
});
co_await coroutine::return_exception_ptr(std::move(ex));
}
}

sstables_loader::sstables_loader(sharded<replica::database>& db,
@@ -552,6 +593,7 @@ future<tasks::task_id> sstables_loader::download_new_sstables(sstring ks_name, s
throw std::invalid_argument(format("endpoint {} not found", endpoint));
}
llog.info("Restore sstables from {}({}) to {}", endpoint, prefix, ks_name);

auto task = co_await _task_manager_module->make_and_start_task<download_task_impl>({}, container(), std::move(endpoint), std::move(bucket), std::move(ks_name), std::move(cf_name), std::move(prefix), std::move(sstables));
co_return task->id();
}
55 changes: 42 additions & 13 deletions test/object_store/test_backup.py
@@ -203,8 +203,7 @@ async def test_backup_is_abortable_in_s3_client(manager: ManagerClient, s3_serve
await do_test_backup_abort(manager, s3_server, breakpoint_name="backup_task_pre_upload", min_files=0, max_files=1)


@pytest.mark.asyncio
async def test_simple_backup_and_restore(manager: ManagerClient, s3_server):
async def do_test_simple_backup_and_restore(manager: ManagerClient, s3_server, do_abort = False):
'''check that restoring from backed up snapshot for a keyspace:table works'''

cfg = {'enable_user_defined_functions': False,
@@ -252,7 +251,8 @@ def list_sstables():
# - {suffix}/2-TOC.txt
# - ...
suffix = 'suffix'
toc_names = [f'{suffix}/{entry.name}' for entry in list_sstables() if entry.name.endswith('TOC.txt')]
old_files = list_sstables()
toc_names = [f'{suffix}/{entry.name}' for entry in old_files if entry.name.endswith('TOC.txt')]

prefix = f'{cf}/{snap_name}'
tid = await manager.api.backup(server.ip_addr, ks, cf, snap_name, s3_server.address, s3_server.bucket_name, f'{prefix}/{suffix}')
@@ -270,21 +270,50 @@ def list_sstables():

print('Try to restore')
tid = await manager.api.restore(server.ip_addr, ks, cf, s3_server.address, s3_server.bucket_name, prefix, toc_names)

if do_abort:
await manager.api.abort_task(server.ip_addr, tid)

status = await manager.api.wait_task(server.ip_addr, tid)
assert status is not None
assert status['state'] == 'done'
assert status['progress_units'] == "sstables"
assert status['progress_completed'] == len(toc_names)
assert status['progress_total'] == len(toc_names)
if not do_abort:
assert status is not None
assert status['state'] == 'done'
assert status['progress_units'] == "sstables"
assert status['progress_completed'] == len(toc_names)
assert status['progress_total'] == len(toc_names)

print('Check that sstables came back')
files = list_sstables()
assert len(files) > 0
print('Check that data came back too')
res = cql.execute(f"SELECT * FROM {ks}.{cf};")
rows = {x.name: x.value for x in res}
assert rows == orig_rows, "Unexpected table contents after restore"

sstable_names = [f'{entry.name}' for entry in files if entry.name.endswith('.db')]
db_objects = [object for object in objects if object.endswith('.db')]

if do_abort:
assert len(files) >= 0
# These checks can be viewed as dubious: we currently restore mostly on a mutation
# basis, so there is no guarantee we'll generate the same number of sstables as the
# original backup contained. But since we are not stressing the server here (not
# provoking memtable flushes), we should in principle never generate _more_ sstables
# than the backup originally had.
assert len(old_files) >= len(files)
assert len(sstable_names) <= len(db_objects)
else:
assert len(files) > 0
assert (status is not None) and (status['state'] == 'done')
print(f'Check that data came back too')
res = cql.execute(f"SELECT * FROM {ks}.{cf};")
rows = { x.name: x.value for x in res }
assert rows == orig_rows, "Unexpected table contents after restore"

print('Check that backup files are still there') # regression test for #20938
post_objects = set(o.key for o in get_s3_resource(s3_server).Bucket(s3_server.bucket_name).objects.filter(Prefix=prefix))
assert objects == post_objects

@pytest.mark.asyncio
async def test_simple_backup_and_restore(manager: ManagerClient, s3_server):
'''check that restoring from backed up snapshot for a keyspace:table works'''
await do_test_simple_backup_and_restore(manager, s3_server, False)

@pytest.mark.asyncio
async def test_abort_simple_backup_and_restore(manager: ManagerClient, s3_server):
'''check that aborting an in-flight restore from a backed up snapshot is handled cleanly'''
await do_test_simple_backup_and_restore(manager, s3_server, True)
17 changes: 10 additions & 7 deletions utils/s3/client.cc
@@ -1198,6 +1198,7 @@ class client::readable_file : public file_impl {
shared_ptr<client> _client;
sstring _object_name;
std::optional<stats> _stats;
seastar::abort_source* _as;

[[noreturn]] void unsupported() {
throw_with_backtrace<std::logic_error>("unsupported operation on s3 readable file");
@@ -1215,9 +1216,10 @@ class client::readable_file : public file_impl {
}

public:
readable_file(shared_ptr<client> cln, sstring object_name)
readable_file(shared_ptr<client> cln, sstring object_name, seastar::abort_source* as = nullptr)
: _client(std::move(cln))
, _object_name(std::move(object_name))
, _as(as)
{
}

@@ -1245,7 +1247,8 @@ class client::readable_file : public file_impl {
}

virtual shared_ptr<file_impl> to_file() && override {
return make_shared<readable_file>(std::move(_h).to_client(), std::move(_object_name));
// TODO: cannot traverse abort source across shards.
return make_shared<readable_file>(std::move(_h).to_client(), std::move(_object_name), nullptr);
}
};

Expand Down Expand Up @@ -1277,7 +1280,7 @@ class client::readable_file : public file_impl {
co_return 0;
}

auto buf = co_await _client->get_object_contiguous(_object_name, range{ pos, len });
auto buf = co_await _client->get_object_contiguous(_object_name, range{ pos, len }, _as);
std::copy_n(buf.get(), buf.size(), reinterpret_cast<uint8_t*>(buffer));
co_return buf.size();
}
@@ -1288,7 +1291,7 @@ class client::readable_file : public file_impl {
co_return 0;
}

auto buf = co_await _client->get_object_contiguous(_object_name, range{ pos, utils::iovec_len(iov) });
auto buf = co_await _client->get_object_contiguous(_object_name, range{ pos, utils::iovec_len(iov) }, _as);
uint64_t off = 0;
for (auto& v : iov) {
auto sz = std::min(v.iov_len, buf.size() - off);
@@ -1307,7 +1310,7 @@ class client::readable_file : public file_impl {
co_return temporary_buffer<uint8_t>();
}

auto buf = co_await _client->get_object_contiguous(_object_name, range{ offset, range_size });
auto buf = co_await _client->get_object_contiguous(_object_name, range{ offset, range_size }, _as);
co_return temporary_buffer<uint8_t>(reinterpret_cast<uint8_t*>(buf.get_write()), buf.size(), buf.release());
}

@@ -1316,8 +1319,8 @@
}
};

file client::make_readable_file(sstring object_name) {
return file(make_shared<readable_file>(shared_from_this(), std::move(object_name)));
file client::make_readable_file(sstring object_name, seastar::abort_source* as) {
return file(make_shared<readable_file>(shared_from_this(), std::move(object_name), as));
}

future<> client::close() {
2 changes: 1 addition & 1 deletion utils/s3/client.hh
@@ -106,7 +106,7 @@ public:
future<> put_object(sstring object_name, ::memory_data_sink_buffers bufs, seastar::abort_source* = nullptr);
future<> delete_object(sstring object_name, seastar::abort_source* = nullptr);

file make_readable_file(sstring object_name);
file make_readable_file(sstring object_name, seastar::abort_source* = nullptr);
data_sink make_upload_sink(sstring object_name, seastar::abort_source* = nullptr);
data_sink make_upload_jumbo_sink(sstring object_name, std::optional<unsigned> max_parts_per_piece = {}, seastar::abort_source* = nullptr);
/// upload a file with specified path to s3
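A short usage sketch for the extended reader API above (client construction is elided and the object name is made up; error handling omitted):

// Sketch: abortable ranged reads of an S3 object. Assumes an existing
// seastar::shared_ptr<s3::client> cln.
seastar::future<> read_some(seastar::shared_ptr<s3::client> cln, seastar::abort_source& as) {
    seastar::file f = cln->make_readable_file("/restore-bucket/me-1-big-Data.db", &as);
    // Each read translates into a ranged GET; if `as` fires, the GET
    // completes exceptionally instead of waiting out the download.
    auto buf = co_await f.dma_read_bulk<char>(0, 4096);
    co_await f.close();
}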
