diff --git a/score/mw/com/example/ipc_bridge/BUILD b/score/mw/com/example/ipc_bridge/BUILD index 3979affd..bbb5ccf4 100644 --- a/score/mw/com/example/ipc_bridge/BUILD +++ b/score/mw/com/example/ipc_bridge/BUILD @@ -32,6 +32,57 @@ cc_binary( ], ) +cc_binary( + name = "find_stop_find_test", + srcs = ["find_stop_find_test.cpp"], + data = ["etc/mw_com_config.json"], + features = COMPILER_WARNING_FEATURES, + deps = [ + ":datatype", + "//score/mw/com", + "@score_baselibs//score/concurrency", + "@score_baselibs//score/mw/log", + ], +) + +cc_binary( + name = "find_long_running_handler_test", + srcs = ["find_long_running_handler_test.cpp"], + data = ["etc/mw_com_config.json"], + features = COMPILER_WARNING_FEATURES, + deps = [ + ":datatype", + "//score/mw/com", + "@score_baselibs//score/concurrency", + "@score_baselibs//score/mw/log", + ], +) + +cc_binary( + name = "find_inter_stop_test", + srcs = ["find_inter_stop_test.cpp"], + data = ["etc/mw_com_config.json"], + features = COMPILER_WARNING_FEATURES, + deps = [ + ":datatype", + "//score/mw/com", + "@score_baselibs//score/concurrency", + "@score_baselibs//score/mw/log", + ], +) + +cc_binary( + name = "find_concurrent_stop_test", + srcs = ["find_concurrent_stop_test.cpp"], + data = ["etc/mw_com_config.json"], + features = COMPILER_WARNING_FEATURES, + deps = [ + ":datatype", + "//score/mw/com", + "@score_baselibs//score/concurrency", + "@score_baselibs//score/mw/log", + ], +) cc_library( name = "sample_sender_receiver", srcs = [ diff --git a/score/mw/com/example/ipc_bridge/etc/mw_com_config.json b/score/mw/com/example/ipc_bridge/etc/mw_com_config.json index 6e72ad2b..a554c05b 100644 --- a/score/mw/com/example/ipc_bridge/etc/mw_com_config.json +++ b/score/mw/com/example/ipc_bridge/etc/mw_com_config.json @@ -56,6 +56,40 @@ "maxSubscribers": 5 } ] + } + ] +}, +{ + "instanceSpecifier": "xpad/cp60/MapApiLanesStamped_B", + "serviceTypeName": "/bmw/adp/MapApiLanesStamped", + "version": { + "major": 1, + "minor": 0 + }, + "instances": [ + { + "instanceId": 2, + "allowedConsumer": { + "QM": [ + 4002, + 0 + ] + }, + "allowedProvider": { + "QM": [ + 4001, + 0 + ] + }, + "asil-level": "QM", + "binding": "SHM", + "events": [ + { + "eventName": "map_api_lanes_stamped", + "numberOfSampleSlots": 30, + "maxSubscribers": 5 + } + ] } ] } diff --git a/score/mw/com/example/ipc_bridge/find_concurrent_stop_test.cpp b/score/mw/com/example/ipc_bridge/find_concurrent_stop_test.cpp new file mode 100644 index 00000000..1db13b38 --- /dev/null +++ b/score/mw/com/example/ipc_bridge/find_concurrent_stop_test.cpp @@ -0,0 +1,136 @@ +/******************************************************************************** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + + +#include "datatype.h" +#include "score/mw/com/runtime.h" +#include "score/mw/com/types.h" + +#include + +#include +#include +#include +#include + +using namespace std::chrono_literals; +using IpcBridgeProxy = score::mw::com::AsProxy; + +/// \brief Runs the service provider (skeleton) logic. +void RunSkeleton(const score::mw::com::InstanceSpecifier& instance_specifier, std::future test_complete_future) +{ + auto create_result = score::mw::com::AsSkeleton::Create(instance_specifier); + if (!create_result.has_value()) + { + std::cerr << "SKELETON: Unable to construct skeleton: " << create_result.error() << ", bailing!\n"; + return; + } + auto& skeleton = create_result.value(); + + const auto offer_result = skeleton.OfferService(); + if (!offer_result.has_value()) + { + std::cerr << "SKELETON: Unable to offer service: " << offer_result.error() << ", bailing!\n"; + return; + } + std::cout << "SKELETON: Service offered successfully.\n"; + + // Wait until the main test logic is complete. + test_complete_future.wait(); + + std::cout << "SKELETON: Stopping offer service...\n"; + skeleton.StopOfferService(); + std::cout << "SKELETON: Terminating.\n"; +} + +/// \brief Main entry point for the concurrent stop test. +/// This test verifies that calling StopFindService for the same handle from two threads +/// concurrently is handled gracefully. +int main() +{ + // 1. Initialize the communication runtime. + score::mw::com::runtime::RuntimeConfiguration config{"/etc/mw_com_config.json"}; + score::mw::com::runtime::InitializeRuntime(config); + std::cout << "MAIN: Communication runtime initialized.\n"; + + // 2. Create the instance specifier for the service we want to test. + const auto instance_specifier_result = + score::mw::com::InstanceSpecifier::Create(std::string{"xpad/cp60/MapApiLanesStamped"}); + if (!instance_specifier_result.has_value()) + { + std::cerr << "MAIN: Invalid instance specifier: " << instance_specifier_result.error() << ", bailing!\n"; + return EXIT_FAILURE; + } + const auto& instance_specifier = instance_specifier_result.value(); + + // 3. Start the skeleton in a background thread. + std::promise test_complete_promise; + auto test_complete_future = test_complete_promise.get_future(); + std::thread skeleton_thread(RunSkeleton, std::cref(instance_specifier), std::move(test_complete_future)); + + // 4. Set up synchronization primitives for the concurrent stop calls. + std::promise handler_finished_promise; + auto handler_finished_future = handler_finished_promise.get_future(); + std::promise release_stopper_promise; + auto release_stopper_future = release_stopper_promise.get_future(); + + // 5. Define the service discovery handler. + auto find_service_handler = + [&release_stopper_promise, &handler_finished_promise]( + const score::mw::com::ServiceHandleContainer&, + score::mw::com::FindServiceHandle find_handle_in_handler) { + std::cout << "HANDLER: Service found. Releasing stopper thread and stopping find...\n"; + // Release the stopper thread to create a race. + release_stopper_promise.set_value(); + // The handler also tries to stop. + auto result = IpcBridgeProxy::StopFindService(find_handle_in_handler); + std::cout << "HANDLER: StopFindService called with result: " + << (result.has_value() ? "success" : result.error().Message()) << "\n"; + handler_finished_promise.set_value(); + }; + + // 6. Start asynchronous service discovery. + std::cout << "MAIN: Starting to find service asynchronously...\n"; + auto find_handle_result = IpcBridgeProxy::StartFindService(find_service_handler, instance_specifier); + if (!find_handle_result.has_value()) + { + std::cerr << "MAIN: Failed to start service discovery: " << find_handle_result.error() << ", bailing!\n"; + test_complete_promise.set_value(); + skeleton_thread.join(); + return EXIT_FAILURE; + } + auto find_handle = find_handle_result.value(); + + // 7. Create and start the "stopper" thread. + std::thread stopper_thread([&release_stopper_future, find_handle]() { + std::cout << "STOPPER: Waiting for release signal from handler...\n"; + release_stopper_future.wait(); + std::cout << "STOPPER: Released. Calling StopFindService...\n"; + // The stopper thread also tries to stop. + auto result = IpcBridgeProxy::StopFindService(find_handle); + std::cout << "STOPPER: StopFindService called with result: " + << (result.has_value() ? "success" : result.error().Message()) << "\n"; + }); + + // 8. Wait for the handler to complete its logic. + handler_finished_future.wait(); + std::cout << "MAIN: Test logic complete.\n"; + + // 9. Clean up all threads. + stopper_thread.join(); + test_complete_promise.set_value(); + skeleton_thread.join(); + + std::cout << "MAIN: Test finished successfully.\n"; + return EXIT_SUCCESS; +} diff --git a/score/mw/com/example/ipc_bridge/find_inter_stop_test.cpp b/score/mw/com/example/ipc_bridge/find_inter_stop_test.cpp new file mode 100644 index 00000000..dd0e713a --- /dev/null +++ b/score/mw/com/example/ipc_bridge/find_inter_stop_test.cpp @@ -0,0 +1,159 @@ +/******************************************************************************** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + + +#include "datatype.h" +#include "score/mw/com/runtime.h" +#include "score/mw/com/types.h" + +#include + +#include +#include +#include +#include +#include + +using namespace std::chrono_literals; +using IpcBridgeProxy = score::mw::com::AsProxy; + +/// \brief Runs a skeleton service provider. +void RunSkeleton(const score::mw::com::InstanceSpecifier& instance_specifier, std::atomic& shutdown_flag) +{ + auto create_result = score::mw::com::AsSkeleton::Create(instance_specifier); + if (!create_result.has_value()) + { + std::cerr << "SKELETON (" << instance_specifier.ToString() << "): Unable to construct skeleton: " << create_result.error() << ", bailing!\n"; + return; + } + auto& skeleton = create_result.value(); + + const auto offer_result = skeleton.OfferService(); + if (!offer_result.has_value()) + { + std::cerr << "SKELETON (" << instance_specifier.ToString() << "): Unable to offer service: " << offer_result.error() << ", bailing!\n"; + return; + } + std::cout << "SKELETON (" << instance_specifier.ToString() << "): Service offered.\n"; + + while (!shutdown_flag) + { + std::this_thread::sleep_for(100ms); + } + + skeleton.StopOfferService(); + std::cout << "SKELETON (" << instance_specifier.ToString() << "): Terminating.\n"; +} + +/// \brief Main entry point for the inter-handler stop test. +/// This test verifies that calling StopFindService for one discovery operation from +/// within the handler of another discovery operation works correctly. +int main() +{ + // 1. Initialize the communication runtime. + score::mw::com::runtime::RuntimeConfiguration config{"/etc/mw_com_config.json"}; + score::mw::com::runtime::InitializeRuntime(config); + std::cout << "MAIN: Communication runtime initialized.\n"; + + // 2. Create instance specifiers for two different services. + const auto spec_A_res = score::mw::com::InstanceSpecifier::Create(std::string{"xpad/cp60/MapApiLanesStamped"}); + if (!spec_A_res.has_value()) + { + std::cerr << "MAIN: Invalid instance specifier A: " << spec_A_res.error() << ", bailing!\n"; + return EXIT_FAILURE; + } + const auto spec_B_res = score::mw::com::InstanceSpecifier::Create(std::string{"xpad/cp60/MapApiLanesStamped_B"}); + if (!spec_B_res.has_value()) + { + std::cerr << "MAIN: Invalid instance specifier B: " << spec_B_res.error() << ", bailing!\n"; + return EXIT_FAILURE; + } + const auto& spec_A = spec_A_res.value(); + const auto& spec_B = spec_B_res.value(); + + // 3. Start two skeletons in background threads. + std::atomic shutdown_flag{false}; + std::thread skeleton_A_thread(RunSkeleton, std::cref(spec_A), std::ref(shutdown_flag)); + std::thread skeleton_B_thread(RunSkeleton, std::cref(spec_B), std::ref(shutdown_flag)); + + // 4. Set up synchronization primitives. + std::promise handler_A_finished_promise; + auto handler_A_finished_future = handler_A_finished_promise.get_future(); + + // We need to capture the handle for discovery B to stop it from handler A. + // Since the handle is only created after StartFindService, we use a shared_ptr to allow late assignment. + std::shared_ptr find_handle_B_ptr; + + // 5. Define the handlers for each discovery. + auto find_service_handler_A = + [&handler_A_finished_promise, find_handle_B_ptr]( + const score::mw::com::ServiceHandleContainer&, + score::mw::com::FindServiceHandle) { + std::cout << "HANDLER A: Service A found. Stopping discovery for service B...\n"; + // Stop the *other* discovery operation. + if (find_handle_B_ptr) // Check if the pointer is valid before dereferencing + { + auto result = IpcBridgeProxy::StopFindService(*find_handle_B_ptr); + if (!result.has_value()) + std::cerr << "HANDLER A: Failed to stop discovery for service B: " << result.error().Message() << "\n"; + } + handler_A_finished_promise.set_value(); + }; + + auto find_service_handler_B = [](const score::mw::com::ServiceHandleContainer&, + score::mw::com::FindServiceHandle) { + // This handler should ideally not be called if A stops it fast enough, + // but it's not an error if it is. + std::cout << "HANDLER B: Service B found.\n"; + }; + + // 6. Start both asynchronous service discoveries. + std::cout << "MAIN: Starting discovery for Service A and Service B...\n"; + auto find_handle_A_result = IpcBridgeProxy::StartFindService(find_service_handler_A, spec_A); + auto find_handle_B_result = IpcBridgeProxy::StartFindService(find_service_handler_B, spec_B); + + if (!find_handle_A_result.has_value()) + { + std::cerr << "MAIN: Failed to start discovery for A: " << find_handle_A_result.error() << ", bailing!\n"; + shutdown_flag = true; + skeleton_A_thread.join(); + skeleton_B_thread.join(); + return EXIT_FAILURE; + } + if (!find_handle_B_result.has_value()) + { + std::cerr << "MAIN: Failed to start discovery for B: " << find_handle_B_result.error() << ", bailing!\n"; + IpcBridgeProxy::StopFindService(find_handle_A_result.value()); + shutdown_flag = true; + skeleton_A_thread.join(); + skeleton_B_thread.join(); + return EXIT_FAILURE; + } + + find_handle_B_ptr = std::make_shared(find_handle_B_result.value()); + + // 7. Wait for handler A to complete its logic. + handler_A_finished_future.wait(); + std::cout << "MAIN: Test logic complete.\n"; + + // 8. Clean up all threads. + // Stop the remaining discovery if it's still active. + IpcBridgeProxy::StopFindService(find_handle_A_result.value()); + + shutdown_flag = true; + skeleton_A_thread.join(); + skeleton_B_thread.join(); + + std::cout << "MAIN: Test finished successfully.\n"; + return EXIT_SUCCESS; +} \ No newline at end of file diff --git a/score/mw/com/example/ipc_bridge/find_long_running_handler_test.cpp b/score/mw/com/example/ipc_bridge/find_long_running_handler_test.cpp new file mode 100644 index 00000000..7b29e899 --- /dev/null +++ b/score/mw/com/example/ipc_bridge/find_long_running_handler_test.cpp @@ -0,0 +1,120 @@ +/******************************************************************************** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + + + +#include "datatype.h" +#include "score/mw/com/runtime.h" +#include "score/mw/com/types.h" + +#include + + +#include +#include +#include +#include + +using namespace std::chrono_literals; +using IpcBridgeProxy = score::mw::com::AsProxy; + +/// \brief Runs the service provider (skeleton) logic. +void RunSkeleton(const score::mw::com::InstanceSpecifier& instance_specifier, std::future test_complete_future) +{ + auto create_result = score::mw::com::AsSkeleton::Create(instance_specifier); + if (!create_result.has_value()) + { + std::cerr << "SKELETON: Unable to construct skeleton: " << create_result.error() << ", bailing!\n"; + return; + } + auto& skeleton = create_result.value(); + + const auto offer_result = skeleton.OfferService(); + if (!offer_result.has_value()) + { + std::cerr << "SKELETON: Unable to offer service: " << offer_result.error() << ", bailing!\n"; + return; + } + std::cout << "SKELETON: Service offered successfully.\n"; + + // Wait until the main test logic is complete. + test_complete_future.wait(); + + std::cout << "SKELETON: Stopping offer service...\n"; + skeleton.StopOfferService(); + std::cout << "SKELETON: Terminating.\n"; +} + +/// \brief Main entry point for the long-running handler test. +/// This test verifies that a handler can continue to execute safely after calling +/// StopFindService on its own discovery operation. +int main() +{ + // 1. Initialize the communication runtime. + score::mw::com::runtime::RuntimeConfiguration config{"/etc/mw_com_config.json"}; + score::mw::com::runtime::InitializeRuntime(config); + std::cout << "MAIN: Communication runtime initialized.\n"; + + // 2. Create the instance specifier for the service. + const auto instance_specifier_result = + score::mw::com::InstanceSpecifier::Create(std::string{"xpad/cp60/MapApiLanesStamped"}); + if (!instance_specifier_result.has_value()) + { + std::cerr << "MAIN: Invalid instance specifier: " << instance_specifier_result.error() << ", bailing!\n"; + return EXIT_FAILURE; + } + const auto& instance_specifier = instance_specifier_result.value(); + + // 3. Start the skeleton in a background thread. + std::promise test_complete_promise; + auto test_complete_future = test_complete_promise.get_future(); + std::thread skeleton_thread(RunSkeleton, std::cref(instance_specifier), std::move(test_complete_future)); + + // 4. Define the service discovery handler. + std::promise handler_finished_promise; + auto find_service_handler = + [&handler_finished_promise](const score::mw::com::ServiceHandleContainer&, + score::mw::com::FindServiceHandle find_handle_in_handler) { + std::cout << "HANDLER: Service found. Stopping find...\n"; + IpcBridgeProxy::StopFindService(find_handle_in_handler); + + // Continue to do work after stopping. This stresses the "late removal" of the handler. + std::cout << "HANDLER: StopFindService called. Now sleeping for 1 second...\n"; + std::this_thread::sleep_for(1s); + + std::cout << "HANDLER: Sleep finished. Handler is completing.\n"; + handler_finished_promise.set_value(); + }; + + // 5. Start asynchronous service discovery. + std::cout << "MAIN: Starting to find service asynchronously...\n"; + auto find_handle_result = IpcBridgeProxy::StartFindService(find_service_handler, instance_specifier); + if (!find_handle_result.has_value()) + { + std::cerr << "MAIN: Failed to start service discovery: " << find_handle_result.error() << ", bailing!\n"; + test_complete_promise.set_value(); + skeleton_thread.join(); + return EXIT_FAILURE; + } + + // 6. Wait for the handler to complete its logic. + handler_finished_promise.get_future().wait(); + std::cout << "MAIN: Test logic complete.\n"; + + // 7. Clean up all threads. + test_complete_promise.set_value(); + skeleton_thread.join(); + + std::cout << "MAIN: Test finished successfully.\n"; + return EXIT_SUCCESS; +} \ No newline at end of file diff --git a/score/mw/com/example/ipc_bridge/find_stop_find_test.cpp b/score/mw/com/example/ipc_bridge/find_stop_find_test.cpp new file mode 100644 index 00000000..1b6fbe2c --- /dev/null +++ b/score/mw/com/example/ipc_bridge/find_stop_find_test.cpp @@ -0,0 +1,116 @@ +/******************************************************************************** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + + +#include "datatype.h" +#include "score/mw/com/runtime.h" +#include "score/mw/com/types.h" + +#include + + +#include +#include +#include +#include + +using namespace std::chrono_literals; +using IpcBridgeProxy = score::mw::com::AsProxy; + +/// \brief Runs the service provider (skeleton) logic in a loop. +/// +/// Creates a skeleton for the IpcBridge service, offers it, and then waits +/// until the test is complete before stopping the offer. +void RunSkeleton(const score::mw::com::InstanceSpecifier& instance_specifier, std::future test_complete_future) +{ + auto create_result = score::mw::com::AsSkeleton::Create(instance_specifier); + if (!create_result.has_value()) + { + std::cerr << "SKELETON: Unable to construct skeleton: " << create_result.error() << ", bailing!\n"; + return; + } + auto& skeleton = create_result.value(); + + const auto offer_result = skeleton.OfferService(); + if (!offer_result.has_value()) + { + std::cerr << "SKELETON: Unable to offer service: " << offer_result.error() << ", bailing!\n"; + return; + } + std::cout << "SKELETON: Service offered successfully.\n"; + + // Wait until the main test logic is complete. + test_complete_future.wait(); + + std::cout << "SKELETON: Stopping offer service...\n"; + skeleton.StopOfferService(); + std::cout << "SKELETON: Terminating.\n"; +} + +/// \brief Main entry point for the standalone test application. +int main() +{ + // 1. Initialize the communication runtime. + score::mw::com::runtime::RuntimeConfiguration config{"/etc/mw_com_config.json"}; + score::mw::com::runtime::InitializeRuntime(config); + std::cout << "MAIN: Communication runtime initialized.\n"; + + // 2. Create the instance specifier for the service we want to test. + const auto instance_specifier_result = + score::mw::com::InstanceSpecifier::Create(std::string{"xpad/cp60/MapApiLanesStamped"}); + if (!instance_specifier_result.has_value()) + { + std::cerr << "MAIN: Invalid instance specifier: " << instance_specifier_result.error() << ", bailing!\n"; + return EXIT_FAILURE; + } + const auto& instance_specifier = instance_specifier_result.value(); + + // 3. Start the skeleton in a background thread to provide the service. + std::promise test_complete_promise; + auto test_complete_future = test_complete_promise.get_future(); + std::thread skeleton_thread(RunSkeleton, std::cref(instance_specifier), std::move(test_complete_future)); + + // 4. Define the service discovery handler. This is the core of the test. + std::promise service_found_promise; + auto find_service_handler = + [&service_found_promise]( + const score::mw::com::ServiceHandleContainer& /*handles*/, // Not used in this test + score::mw::com::FindServiceHandle find_handle_in_handler) { // Use the handle passed to the handler + std::cout << "PROXY: Service found. Calling StopFindService() from within the handler.\n"; + // This is the action that tests the race condition. + IpcBridgeProxy::StopFindService(find_handle_in_handler); + service_found_promise.set_value(); + }; + // 5. Start asynchronous service discovery. + std::cout << "PROXY: Starting to find service asynchronously...\n"; + auto find_handle_result = IpcBridgeProxy::StartFindService(find_service_handler, instance_specifier); + if (!find_handle_result.has_value()) + { + std::cerr << "MAIN: Failed to start service discovery: " << find_handle_result.error() << std::endl; + // Perform cleanup and exit + test_complete_promise.set_value(); + skeleton_thread.join(); + return EXIT_FAILURE; + } + + // 6. Wait for the handler to be called and complete its logic. + auto service_found_future = service_found_promise.get_future(); + service_found_future.wait(); + std::cout << "MAIN: Test logic complete.\n"; + + // 7. Clean up: signal the skeleton thread to stop and then join it. + test_complete_promise.set_value(); + skeleton_thread.join(); + + std::cout << "MAIN: Test finished successfully.\n"; + return EXIT_SUCCESS; +} diff --git a/score/mw/com/impl/proxy_base_test.cpp b/score/mw/com/impl/proxy_base_test.cpp index 118d5491..ea707ee7 100644 --- a/score/mw/com/impl/proxy_base_test.cpp +++ b/score/mw/com/impl/proxy_base_test.cpp @@ -45,6 +45,10 @@ #include #include +#include +#include +#include +#include #include #include #include @@ -59,6 +63,9 @@ using ::testing::ByMove; using ::testing::MockFunction; using ::testing::Return; using ::testing::StrictMock; +using ::testing::DoAll; +using ::testing::Invoke; +using ::testing::SaveArg; const auto kInstanceSpecifier = InstanceSpecifier::Create(std::string{"abc/abc/TirePressurePort"}).value(); const auto kServiceIdentifier = make_ServiceIdentifierType("foo", 13, 37); @@ -439,6 +446,215 @@ TEST_F(ProxyBaseFindServiceInstanceIdentifierFixture, EXPECT_EQ(handles_result.error(), returned_error_code); } +class ProxyBaseStopFindServiceMultithreadedFixture : public ProxyBaseFixture +{ + public: + void SetUp() override + { + ProxyBaseFixture::SetUp(); + + // By default, StartFindService saves the handler and returns a valid handle. + ON_CALL(service_discovery_mock_, StartFindService(_, kInstanceSpecifier)) + .WillByDefault( + testing::Invoke([this](FindServiceHandler handler, const InstanceSpecifier&) -> Result { + std::lock_guard lock{handler_mutex_}; + saved_handler_ = std::move(handler); + handler_set_promise_.set_value(); + return kFindServiceHandle; + })); + } + + FindServiceHandler saved_handler_{}; + std::mutex handler_mutex_{}; + std::promise handler_set_promise_{}; + const FindServiceHandle kFindServiceHandle{make_FindServiceHandle(42U)}; + const FindServiceHandle kFindServiceHandle2{make_FindServiceHandle(43U)}; +}; + +TEST_F(ProxyBaseStopFindServiceMultithreadedFixture, find_stop_find_test) +{ + std::promise stop_called_promise; + auto stop_called_future = stop_called_promise.get_future(); + std::promise handler_can_finish_promise; + auto handler_can_finish_future = handler_can_finish_promise.get_future(); + std::atomic handler_finished{false}; + + // StopFindService will block until the handler finishes. + EXPECT_CALL(service_discovery_mock_, StopFindService(kFindServiceHandle)) + .WillOnce(testing::Invoke([&](const FindServiceHandle&) { + stop_called_promise.set_value(); + // Wait until the handler is allowed to finish. + handler_can_finish_future.wait(); + return ResultBlank{}; + })); + + // The handler will call StopFindService. + auto handler = [&](ServiceHandleContainer, FindServiceHandle) { + ProxyBase::StopFindService(kFindServiceHandle); + handler_finished = true; + }; + + // Start finding the service in a separate thread. + std::thread client_thread([&] { + auto find_service_handle_result = ProxyBase::StartFindService(handler, kInstanceSpecifier); + ASSERT_TRUE(find_service_handle_result.has_value()); + EXPECT_EQ(find_service_handle_result.value(), kFindServiceHandle); + }); + + // Wait for the handler to be registered. + handler_set_promise_.get_future().wait(); + + // Invoke the handler in another thread. + std::thread discovery_thread([&] { + std::lock_guard lock{handler_mutex_}; + saved_handler_({}, kFindServiceHandle); + }); + + // Wait for StopFindService to be called from within the handler. + stop_called_future.wait(); + + // At this point, the handler is blocked inside StopFindService. + // We check that the handler has not finished yet. + EXPECT_FALSE(handler_finished); + + // Allow the handler to finish. + handler_can_finish_promise.set_value(); + + client_thread.join(); + discovery_thread.join(); + + // The handler should have finished. + EXPECT_TRUE(handler_finished); +} + +TEST_F(ProxyBaseStopFindServiceMultithreadedFixture, find_concurrent_stop_test) +{ + std::atomic handler_running{false}; + std::condition_variable cv; + std::mutex m; + + auto handler = [&](ServiceHandleContainer, FindServiceHandle) { + handler_running = true; + cv.notify_one(); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); // Simulate work + }; + + auto find_service_handle_result = ProxyBase::StartFindService(handler, kInstanceSpecifier); + ASSERT_TRUE(find_service_handle_result.has_value()); + + std::thread handler_thread([&] { + std::lock_guard lock{handler_mutex_}; + if (saved_handler_ != nullptr) + { + saved_handler_({}, kFindServiceHandle); + } + }); + + // Wait until handler starts execution + { + std::unique_lock lock(m); + cv.wait(lock, [&] { return handler_running.load(); }); + } + + // Call StopFindService while handler is running + auto stop_result = ProxyBase::StopFindService(kFindServiceHandle); + EXPECT_TRUE(stop_result.has_value()); + + handler_thread.join(); +} + +TEST_F(ProxyBaseStopFindServiceMultithreadedFixture, find_long_running_handler_test) +{ + std::atomic handler_call_count{0}; + const int num_handler_invocations = 100; + + auto handler = [&](ServiceHandleContainer, FindServiceHandle) { + handler_call_count++; + // Short sleep to increase chance of concurrency issues + std::this_thread::sleep_for(std::chrono::microseconds(100)); + }; + + auto find_service_handle_result = ProxyBase::StartFindService(handler, kInstanceSpecifier); + ASSERT_TRUE(find_service_handle_result.has_value()); + const auto find_handle = find_service_handle_result.value(); + + // Wait for the handler to be registered before starting the storm + handler_set_promise_.get_future().wait(); + + std::thread storm_thread([&] { + for (int i = 0; i < num_handler_invocations; ++i) + { + std::lock_guard lock{handler_mutex_}; + if (saved_handler_ != nullptr) + { + saved_handler_({}, find_handle); + } + } + }); + + // Give the storm a moment to start, then stop it mid-way + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + auto stop_result = ProxyBase::StopFindService(find_handle); + EXPECT_TRUE(stop_result.has_value()); + + storm_thread.join(); + // We don't assert on the exact count, as it's non-deterministic. + // The main goal is to ensure the test completes without crashing or deadlocking. + SUCCEED() << "Test completed without crashing. Handler was called " << handler_call_count << " times."; +} + +TEST_F(ProxyBaseStopFindServiceMultithreadedFixture, find_inter_stop_test) +{ + // Two handlers for two separate find operations + FindServiceHandler saved_handler_1; + FindServiceHandler saved_handler_2; + std::atomic handler_1_calls{0}; + std::atomic handler_2_calls{0}; + + // Mock StartFindService to save both handlers and return distinct handles + EXPECT_CALL(service_discovery_mock_, StartFindService(_, kInstanceSpecifier)) + .WillOnce(Invoke([&](FindServiceHandler handler, const InstanceSpecifier&) { + saved_handler_1 = std::move(handler); + return Result{kFindServiceHandle}; + })) + .WillOnce(Invoke([&](FindServiceHandler handler, const InstanceSpecifier&) { + saved_handler_2 = std::move(handler); + return Result{kFindServiceHandle2}; + })); + + // Mock StopFindService for the first handle only + EXPECT_CALL(service_discovery_mock_, StopFindService(kFindServiceHandle)).WillOnce(Return(ResultBlank{})); + + // Define the handlers + auto handler_1 = [&](ServiceHandleContainer, FindServiceHandle) { handler_1_calls++; }; + auto handler_2 = [&](ServiceHandleContainer, FindServiceHandle) { handler_2_calls++; }; + + // Start both find operations + auto find_result_1 = ProxyBase::StartFindService(handler_1, kInstanceSpecifier); + ASSERT_TRUE(find_result_1.has_value()); + EXPECT_EQ(find_result_1.value(), kFindServiceHandle); + + auto find_result_2 = ProxyBase::StartFindService(handler_2, kInstanceSpecifier); + ASSERT_TRUE(find_result_2.has_value()); + EXPECT_EQ(find_result_2.value(), kFindServiceHandle2); + + // Stop the first find operation + auto stop_result = ProxyBase::StopFindService(kFindServiceHandle); + ASSERT_TRUE(stop_result.has_value()); + + // Now, invoke the handler for the second (still active) find operation + // This should succeed without issue. + ASSERT_TRUE(saved_handler_2 != nullptr); + saved_handler_2({}, kFindServiceHandle2); + + // The handler for the stopped service should have been removed and should not be callable. + // In the real implementation, the handler is removed. In this test, we can check if it was nulled out. + // The mock for StopFindService would need to be updated to simulate this removal for a more robust check. + // For now, we confirm the second handler was called and the first was not (after the stop). + EXPECT_EQ(handler_2_calls, 1); + EXPECT_EQ(handler_1_calls, 0); +} + /// \todo Enable test when support for multiple bindings is added (Ticket-149776) using ProxyBaseFindServiceMultipleBindingsFixture = ProxyBaseFixture; TEST_F(ProxyBaseFindServiceMultipleBindingsFixture,