diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fb1662db8..811311fa46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ ### Internals * Update TestAppSession to allow scope-based usage for restarting the local app resources. ([PR #7672](https://github.com/realm/realm-core/pull/7672)) +* Added test to verify upload progress notification reporting during a client reset. [PR #7958](https://github.com/realm/realm-core/pull/7958) ---------------------------------------------- diff --git a/test/object-store/sync/session/progress_notifications.cpp b/test/object-store/sync/session/progress_notifications.cpp index 2d5cc2f8ce..bfba412c95 100644 --- a/test/object-store/sync/session/progress_notifications.cpp +++ b/test/object-store/sync/session/progress_notifications.cpp @@ -20,7 +20,7 @@ #include -#if REALM_ENABLE_AUTH_TESTS +#ifdef REALM_ENABLE_AUTH_TESTS #include "util/test_file.hpp" #include "util/sync/flx_sync_harness.hpp" #include "util/sync/sync_test_utils.hpp" @@ -28,6 +28,7 @@ #include #include #include +#include using namespace realm::app; #endif @@ -1051,7 +1052,7 @@ TEST_CASE("progress notification", "[sync][session][progress]") { } } -#if REALM_ENABLE_AUTH_TESTS +#ifdef REALM_ENABLE_AUTH_TESTS struct TestSetup { TableRef get_table(const SharedRealm& r) @@ -1059,20 +1060,27 @@ struct TestSetup { return r->read_group().get_table("class_" + table_name); } - size_t add_objects(SharedRealm& r, int num) + size_t add_objects(SharedRealm& r, int num, size_t data_size = 1024 * 1024) { CppContext ctx(r); for (int i = 0; i < num; ++i) { // use specifically separate transactions for a bit of history r->begin_transaction(); - Object::create(ctx, r, StringData(table_name), std::any(make_one(i))); + Object::create(ctx, r, StringData(table_name), std::any(make_one(i, data_size))); r->commit_transaction(); } return get_table(r)->size(); } + AutoVerifiedEmailCredentials create_user_and_log_in() + { + return ::create_user_and_log_in(app()); + } + virtual SyncTestFile make_config() = 0; - virtual AnyDict make_one(int64_t idx) = 0; + virtual AnyDict make_one(int64_t idx, size_t data_size) = 0; + virtual SharedApp app() const = 0; + virtual const AppSession& app_session() const = 0; std::string table_name; }; @@ -1088,17 +1096,32 @@ struct PBS : TestSetup { return SyncTestFile(session.app()->current_user(), partition, get_default_schema()); } - AnyDict make_one(int64_t /* idx */) override + AnyDict make_one(int64_t /* idx */, size_t data_size) override { return AnyDict{{"_id", std::any(ObjectId::gen())}, {"breed", std::string("bulldog")}, - {"name", random_string(1024 * 1024)}}; + {"name", random_string(data_size)}}; + } + + SharedApp app() const override + { + return session.app(); + } + + const AppSession& app_session() const override + { + return session.app_session(); } TestAppSession session; const std::string partition = random_string(100); }; +static std::ostream& operator<<(std::ostream& os, const PBS&) +{ + return os << "PBS"; +} + struct FLX : TestSetup { FLX(const std::string& app_id = "flx_sync_progress") : harness(app_id) @@ -1128,16 +1151,31 @@ struct FLX : TestSetup { sub.commit(); } - AnyDict make_one(int64_t idx) override + AnyDict make_one(int64_t idx, size_t data_size) override { return AnyDict{{"_id", ObjectId::gen()}, {"queryable_int_field", idx}, - {"queryable_str_field", random_string(1024 * 1024)}}; + {"queryable_str_field", random_string(data_size)}}; + } + + SharedApp app() const override + { + return harness.app(); + } + + const AppSession& app_session() const override + { + return harness.session().app_session(); } FLXSyncTestHarness harness; }; +static std::ostream& operator<<(std::ostream& os, const FLX&) +{ + return os << "FLX"; +} + struct ProgressIncreasesMatcher : Catch::Matchers::MatcherGenericBase { enum MatchMode { ByteCountOnly, All }; ProgressIncreasesMatcher() = default; @@ -1558,4 +1596,196 @@ TEST_CASE("sync progress: flx download progress", "[sync][baas][progress]") { } } +TEMPLATE_TEST_CASE("sync progress: upload progress during client reset", "[sync][baas][progress][client reset]", PBS, + FLX) +{ + std::mutex progress_mutex; + std::vector streaming_progress; + std::vector non_streaming_progress; + + enum TestMode { NO_CHANGES, LOCAL_CHANGES, REMOTE_CHANGES, BOTH_CHANGED, BOTH_CHANGED_W_DISCARD }; + auto xlate_test_mode = [](TestMode tm) -> std::string_view { + switch (tm) { + case NO_CHANGES: + return "no local or remote changes"; + case LOCAL_CHANGES: + return "local changes only"; + case REMOTE_CHANGES: + return "remote changes only"; + case BOTH_CHANGED: + return "both local and remote changes"; + case BOTH_CHANGED_W_DISCARD: + return "both local and remote changes"; + } + FAIL(util::format("Missing case for unhandled TestMode value: ", static_cast(tm))); + REALM_UNREACHABLE(); + }; + + auto logger = util::Logger::get_default_logger(); + TestType setup; + auto test_mode = GENERATE(TestMode::NO_CHANGES, TestMode::LOCAL_CHANGES, TestMode::REMOTE_CHANGES, + TestMode::BOTH_CHANGED, TestMode::BOTH_CHANGED_W_DISCARD); + + // Set up the main realm for the test + auto config = setup.make_config(); + auto&& [reset_future, reset_handler] = reset_utils::make_client_reset_handler(); + config.sync_config->notify_after_client_reset = reset_handler; + if (test_mode == TestMode::BOTH_CHANGED_W_DISCARD) { + config.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal; + } + else { + config.sync_config->client_resync_mode = ClientResyncMode::Recover; + } + + logger->debug("PROGRESS TEST: %1 upload progress notifications after %2 client reset with %3", setup, + config.sync_config->client_resync_mode, xlate_test_mode(test_mode)); + + // Functions to create the progress notification callbacks + auto make_streaming_cb = [&](std::string_view desc) { + return [&, desc](uint64_t transferred, uint64_t transferrable, double estimate) { + logger->debug("PROGRESS TEST: %1 Progress callback called xferred: %2, xferrable: %3, estimate: %4", desc, + transferred, transferrable, estimate_to_string(estimate)); + std::lock_guard lk(progress_mutex); + streaming_progress.push_back(ProgressEntry{transferred, transferrable, estimate}); + }; + }; + auto make_non_streaming_cb = [&](std::string_view desc) { + return [&, desc](uint64_t transferred, uint64_t transferrable, double estimate) { + logger->debug("PROGRESS TEST: %1 Progress callback called xferred: %2, xferrable: %3, estimate: %4", desc, + transferred, transferrable, estimate_to_string(estimate)); + std::lock_guard lk(progress_mutex); + non_streaming_progress.push_back(ProgressEntry{transferred, transferrable, estimate}); + }; + }; + + auto wait_for_sync = [](SharedRealm& r) { + // If a FLX session, also wait for the subscription to complete + if (r->config().sync_config->flx_sync_requested) { + auto sub = r->get_latest_subscription_set(); + REQUIRE(sub.state() != sync::SubscriptionSet::State::Error); + if (sub.state() != sync::SubscriptionSet::State::Complete) { + auto result = + sub.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw(); + REQUIRE(result.is_ok()); + REQUIRE(result.get_value() == sync::SubscriptionSet::State::Complete); + } + } + wait_for_download(*r); + wait_for_upload(*r); + }; + + { + // Setup the realm and add some data (FLX subscription is added during initialization) + auto realm = Realm::get_shared_realm(config); + + // For FLX sessions, don't create any more subscriptions for future realms + config.sync_config->rerun_init_subscription_on_open = false; + config.sync_config->subscription_initializer = nullptr; + + // Add some data and wait for upload + setup.add_objects(realm, 10, 100); + wait_for_sync(realm); // wait for sync/subs to complete + realm->sync_session()->shutdown_and_wait(); // Close the sync session + + // Set up some local changes if the test calls for it + if (test_mode == TestMode::LOCAL_CHANGES || test_mode == TestMode::BOTH_CHANGED || + test_mode == TestMode::BOTH_CHANGED_W_DISCARD) { + logger->trace("PROGRESS TEST: adding local objects"); + setup.add_objects(realm, 5, 100); // Add some local objects while offline + } + + // Set up some remote changes if the test calls for it + if (test_mode == TestMode::REMOTE_CHANGES || test_mode == TestMode::BOTH_CHANGED || + test_mode == TestMode::BOTH_CHANGED_W_DISCARD) { + logger->trace("PROGRESS TEST: adding remote objects"); + // Make a new config for a different user + setup.create_user_and_log_in(); + auto remote_config = setup.make_config(); // Includes the new user just created + auto remote_realm = Realm::get_shared_realm(remote_config); + setup.add_objects(remote_realm, 5, 100); // Add some objects remotely + wait_for_sync(remote_realm); // wait for sync/subs to complete + } + reset_utils::trigger_client_reset(setup.app_session(), realm); + } + auto realm = Realm::get_shared_realm(config); + // Register progress notifiers + realm->sync_session()->register_progress_notifier(make_non_streaming_cb("Non-Streaming Upload"), + NotifierType::upload, false); + realm->sync_session()->register_progress_notifier(make_streaming_cb("Streaming Upload"), NotifierType::upload, + true); + + // Wait for the client reset to complete + auto status = wait_for_future(std::move(reset_future)).get_no_throw(); + if (!status.is_ok()) { + FAIL(status.get_status()); + } + + // Progress notifications may not have been sent yet - wait for sync after client reset + wait_for_download(*realm); + wait_for_upload(*realm); + + { + std::lock_guard lk(progress_mutex); + logger->debug("PROGRESS TEST: retrieved progress calls: streaming - %1, non-streaming - %2", + streaming_progress.size(), non_streaming_progress.size()); + + auto print_progress = [&logger](const std::vector& entries) { + if (!logger->would_log(util::Logger::Level::trace)) + return; // don't print if wouldn't log + for (size_t i = 0; i < entries.size(); ++i) { + auto& entry = entries[i]; + logger->trace("PROGRESS TEST: entry[%1] - transferrable: %2 - transferred: %3 - estimate: %4", i, + entry.transferrable, entry.transferred, estimate_to_string(entry.estimate)); + } + }; + logger->trace("PROGRESS TEST: streaming progress size: %1", streaming_progress.size()); + print_progress(streaming_progress); + logger->trace("PROGRESS TEST: non-streaming progress size: %1", non_streaming_progress.size()); + print_progress(non_streaming_progress); + + // Validations for no changes, remote only changes, or both changes with discard local client reset + if (test_mode == TestMode::NO_CHANGES || test_mode == TestMode::REMOTE_CHANGES || + test_mode == TestMode::BOTH_CHANGED_W_DISCARD) { + // Sometimes a second upload would be sent, resulting in a size of 2 + REQUIRE(streaming_progress.size() > 0); + REQUIRE(streaming_progress[0] == ProgressEntry{0, 0, 1.0}); + REQUIRE(non_streaming_progress.size() > 0); + // Needs to be changed to 1.0 after PR #7957 is merged + REQUIRE(non_streaming_progress[0] == ProgressEntry{0, 0, 0.0}); + } + // Validations for local changes only or both local and remote changes + else if (test_mode == TestMode::LOCAL_CHANGES || test_mode == TestMode::BOTH_CHANGED) { + // Multiple notifications may sent for the changes to upload after client reset + if (config.sync_config->flx_sync_requested) { + // FLX sessions report upload progress as a single notification + REQUIRE(streaming_progress.size() > 0); + REQUIRE(non_streaming_progress.size() > 0); + } + else { + // PBS sessions report upload progress when changes are uploaded and when upload is acked + REQUIRE(streaming_progress.size() > 1); + REQUIRE(non_streaming_progress.size() > 1); + } + REQUIRE(streaming_progress.back().estimate == 1.0); // should end with progress of 1.0 + REQUIRE(non_streaming_progress.back().estimate == 1.0); // should end with progress of 1.0 + } + else { + // Unhandled TestMode case + FAIL(util::format("Unhandled TestMode case: ", static_cast(test_mode))); + } + } + + streaming_progress.clear(); + non_streaming_progress.clear(); + + // Verify the streaming notifications are still received and non-streaming notifications have expired + setup.add_objects(realm, 5, 100); + wait_for_upload(*realm); + + // More streaming upload notifications were received + REQUIRE(streaming_progress.size() > 0); + // Non-streaming upload notification callback was expired and no more were received + REQUIRE(non_streaming_progress.size() == 0); +} + #endif