Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions score/mw/com/example/ipc_bridge/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,57 @@ cc_binary(
],
)

cc_binary(
name = "find_stop_find_test",
srcs = ["find_stop_find_test.cpp"],
data = ["etc/mw_com_config.json"],
features = COMPILER_WARNING_FEATURES,
deps = [
":datatype",
"//score/mw/com",
"@score_baselibs//score/concurrency",
"@score_baselibs//score/mw/log",
],
)

cc_binary(
name = "find_long_running_handler_test",
srcs = ["find_long_running_handler_test.cpp"],
data = ["etc/mw_com_config.json"],
features = COMPILER_WARNING_FEATURES,
deps = [
":datatype",
"//score/mw/com",
"@score_baselibs//score/concurrency",
"@score_baselibs//score/mw/log",
],
)

cc_binary(
name = "find_inter_stop_test",
srcs = ["find_inter_stop_test.cpp"],
data = ["etc/mw_com_config.json"],
features = COMPILER_WARNING_FEATURES,
deps = [
":datatype",
"//score/mw/com",
"@score_baselibs//score/concurrency",
"@score_baselibs//score/mw/log",
],
)

cc_binary(
name = "find_concurrent_stop_test",
srcs = ["find_concurrent_stop_test.cpp"],
data = ["etc/mw_com_config.json"],
features = COMPILER_WARNING_FEATURES,
deps = [
":datatype",
"//score/mw/com",
"@score_baselibs//score/concurrency",
"@score_baselibs//score/mw/log",
],
)
cc_library(
name = "sample_sender_receiver",
srcs = [
Expand Down
34 changes: 34 additions & 0 deletions score/mw/com/example/ipc_bridge/etc/mw_com_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,40 @@
"maxSubscribers": 5
}
]
}
]
},
{
"instanceSpecifier": "xpad/cp60/MapApiLanesStamped_B",
"serviceTypeName": "/bmw/adp/MapApiLanesStamped",
"version": {
"major": 1,
"minor": 0
},
"instances": [
{
"instanceId": 2,
"allowedConsumer": {
"QM": [
4002,
0
]
},
"allowedProvider": {
"QM": [
4001,
0
]
},
"asil-level": "QM",
"binding": "SHM",
"events": [
{
"eventName": "map_api_lanes_stamped",
"numberOfSampleSlots": 30,
"maxSubscribers": 5
}
]
}
]
}
Expand Down
136 changes: 136 additions & 0 deletions score/mw/com/example/ipc_bridge/find_concurrent_stop_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/********************************************************************************
* Copyright (c) 2025 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/


#include "datatype.h"
#include "score/mw/com/runtime.h"
#include "score/mw/com/types.h"

#include <score/assert.hpp>

#include <chrono>
#include <future>
#include <iostream>
#include <thread>

using namespace std::chrono_literals;
using IpcBridgeProxy = score::mw::com::AsProxy<score::mw::com::IpcBridgeInterface>;

/// \brief Runs the service provider (skeleton) logic.
void RunSkeleton(const score::mw::com::InstanceSpecifier& instance_specifier, std::future<void> test_complete_future)
{
auto create_result = score::mw::com::AsSkeleton<score::mw::com::IpcBridgeInterface>::Create(instance_specifier);
if (!create_result.has_value())
{
std::cerr << "SKELETON: Unable to construct skeleton: " << create_result.error() << ", bailing!\n";
return;
}
auto& skeleton = create_result.value();

const auto offer_result = skeleton.OfferService();
if (!offer_result.has_value())
{
std::cerr << "SKELETON: Unable to offer service: " << offer_result.error() << ", bailing!\n";
return;
}
std::cout << "SKELETON: Service offered successfully.\n";

// Wait until the main test logic is complete.
test_complete_future.wait();

std::cout << "SKELETON: Stopping offer service...\n";
skeleton.StopOfferService();
std::cout << "SKELETON: Terminating.\n";
}

/// \brief Main entry point for the concurrent stop test.
/// This test verifies that calling StopFindService for the same handle from two threads
/// concurrently is handled gracefully.
int main()
{
// 1. Initialize the communication runtime.
score::mw::com::runtime::RuntimeConfiguration config{"/etc/mw_com_config.json"};
score::mw::com::runtime::InitializeRuntime(config);
std::cout << "MAIN: Communication runtime initialized.\n";

// 2. Create the instance specifier for the service we want to test.
const auto instance_specifier_result =
score::mw::com::InstanceSpecifier::Create(std::string{"xpad/cp60/MapApiLanesStamped"});
if (!instance_specifier_result.has_value())
{
std::cerr << "MAIN: Invalid instance specifier: " << instance_specifier_result.error() << ", bailing!\n";
return EXIT_FAILURE;
}
const auto& instance_specifier = instance_specifier_result.value();

// 3. Start the skeleton in a background thread.
std::promise<void> test_complete_promise;
auto test_complete_future = test_complete_promise.get_future();
std::thread skeleton_thread(RunSkeleton, std::cref(instance_specifier), std::move(test_complete_future));

// 4. Set up synchronization primitives for the concurrent stop calls.
std::promise<void> handler_finished_promise;
auto handler_finished_future = handler_finished_promise.get_future();
std::promise<void> release_stopper_promise;
auto release_stopper_future = release_stopper_promise.get_future();

// 5. Define the service discovery handler.
auto find_service_handler =
[&release_stopper_promise, &handler_finished_promise](
const score::mw::com::ServiceHandleContainer<IpcBridgeProxy::HandleType>&,
score::mw::com::FindServiceHandle find_handle_in_handler) {
std::cout << "HANDLER: Service found. Releasing stopper thread and stopping find...\n";
// Release the stopper thread to create a race.
release_stopper_promise.set_value();
// The handler also tries to stop.
auto result = IpcBridgeProxy::StopFindService(find_handle_in_handler);
std::cout << "HANDLER: StopFindService called with result: "
<< (result.has_value() ? "success" : result.error().Message()) << "\n";
handler_finished_promise.set_value();
};

// 6. Start asynchronous service discovery.
std::cout << "MAIN: Starting to find service asynchronously...\n";
auto find_handle_result = IpcBridgeProxy::StartFindService(find_service_handler, instance_specifier);
if (!find_handle_result.has_value())
{
std::cerr << "MAIN: Failed to start service discovery: " << find_handle_result.error() << ", bailing!\n";
test_complete_promise.set_value();
skeleton_thread.join();
return EXIT_FAILURE;
}
auto find_handle = find_handle_result.value();

// 7. Create and start the "stopper" thread.
std::thread stopper_thread([&release_stopper_future, find_handle]() {
std::cout << "STOPPER: Waiting for release signal from handler...\n";
release_stopper_future.wait();
std::cout << "STOPPER: Released. Calling StopFindService...\n";
// The stopper thread also tries to stop.
auto result = IpcBridgeProxy::StopFindService(find_handle);
std::cout << "STOPPER: StopFindService called with result: "
<< (result.has_value() ? "success" : result.error().Message()) << "\n";
});

// 8. Wait for the handler to complete its logic.
handler_finished_future.wait();
std::cout << "MAIN: Test logic complete.\n";

// 9. Clean up all threads.
stopper_thread.join();
test_complete_promise.set_value();
skeleton_thread.join();

std::cout << "MAIN: Test finished successfully.\n";
return EXIT_SUCCESS;
}
159 changes: 159 additions & 0 deletions score/mw/com/example/ipc_bridge/find_inter_stop_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/********************************************************************************
* Copyright (c) 2025 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/


#include "datatype.h"
#include "score/mw/com/runtime.h"
#include "score/mw/com/types.h"

#include <score/assert.hpp>

#include <atomic>
#include <chrono>
#include <future>
#include <iostream>
#include <thread>

using namespace std::chrono_literals;
using IpcBridgeProxy = score::mw::com::AsProxy<score::mw::com::IpcBridgeInterface>;

/// \brief Runs a skeleton service provider.
void RunSkeleton(const score::mw::com::InstanceSpecifier& instance_specifier, std::atomic<bool>& shutdown_flag)
{
auto create_result = score::mw::com::AsSkeleton<score::mw::com::IpcBridgeInterface>::Create(instance_specifier);
if (!create_result.has_value())
{
std::cerr << "SKELETON (" << instance_specifier.ToString() << "): Unable to construct skeleton: " << create_result.error() << ", bailing!\n";
return;
}
auto& skeleton = create_result.value();

const auto offer_result = skeleton.OfferService();
if (!offer_result.has_value())
{
std::cerr << "SKELETON (" << instance_specifier.ToString() << "): Unable to offer service: " << offer_result.error() << ", bailing!\n";
return;
}
std::cout << "SKELETON (" << instance_specifier.ToString() << "): Service offered.\n";

while (!shutdown_flag)
{
std::this_thread::sleep_for(100ms);
}

skeleton.StopOfferService();
std::cout << "SKELETON (" << instance_specifier.ToString() << "): Terminating.\n";
}

/// \brief Main entry point for the inter-handler stop test.
/// This test verifies that calling StopFindService for one discovery operation from
/// within the handler of another discovery operation works correctly.
int main()
{
// 1. Initialize the communication runtime.
score::mw::com::runtime::RuntimeConfiguration config{"/etc/mw_com_config.json"};
score::mw::com::runtime::InitializeRuntime(config);
std::cout << "MAIN: Communication runtime initialized.\n";

// 2. Create instance specifiers for two different services.
const auto spec_A_res = score::mw::com::InstanceSpecifier::Create(std::string{"xpad/cp60/MapApiLanesStamped"});
if (!spec_A_res.has_value())
{
std::cerr << "MAIN: Invalid instance specifier A: " << spec_A_res.error() << ", bailing!\n";
return EXIT_FAILURE;
}
const auto spec_B_res = score::mw::com::InstanceSpecifier::Create(std::string{"xpad/cp60/MapApiLanesStamped_B"});
if (!spec_B_res.has_value())
{
std::cerr << "MAIN: Invalid instance specifier B: " << spec_B_res.error() << ", bailing!\n";
return EXIT_FAILURE;
}
const auto& spec_A = spec_A_res.value();
const auto& spec_B = spec_B_res.value();

// 3. Start two skeletons in background threads.
std::atomic<bool> shutdown_flag{false};
std::thread skeleton_A_thread(RunSkeleton, std::cref(spec_A), std::ref(shutdown_flag));
std::thread skeleton_B_thread(RunSkeleton, std::cref(spec_B), std::ref(shutdown_flag));

// 4. Set up synchronization primitives.
std::promise<void> handler_A_finished_promise;
auto handler_A_finished_future = handler_A_finished_promise.get_future();

// We need to capture the handle for discovery B to stop it from handler A.
// Since the handle is only created after StartFindService, we use a shared_ptr to allow late assignment.
std::shared_ptr<score::mw::com::FindServiceHandle> find_handle_B_ptr;

// 5. Define the handlers for each discovery.
auto find_service_handler_A =
[&handler_A_finished_promise, find_handle_B_ptr](
const score::mw::com::ServiceHandleContainer<IpcBridgeProxy::HandleType>&,
score::mw::com::FindServiceHandle) {
std::cout << "HANDLER A: Service A found. Stopping discovery for service B...\n";
// Stop the *other* discovery operation.
if (find_handle_B_ptr) // Check if the pointer is valid before dereferencing
{
auto result = IpcBridgeProxy::StopFindService(*find_handle_B_ptr);
if (!result.has_value())
std::cerr << "HANDLER A: Failed to stop discovery for service B: " << result.error().Message() << "\n";
}
handler_A_finished_promise.set_value();
};

auto find_service_handler_B = [](const score::mw::com::ServiceHandleContainer<IpcBridgeProxy::HandleType>&,
score::mw::com::FindServiceHandle) {
// This handler should ideally not be called if A stops it fast enough,
// but it's not an error if it is.
std::cout << "HANDLER B: Service B found.\n";
};

// 6. Start both asynchronous service discoveries.
std::cout << "MAIN: Starting discovery for Service A and Service B...\n";
auto find_handle_A_result = IpcBridgeProxy::StartFindService(find_service_handler_A, spec_A);
auto find_handle_B_result = IpcBridgeProxy::StartFindService(find_service_handler_B, spec_B);

if (!find_handle_A_result.has_value())
{
std::cerr << "MAIN: Failed to start discovery for A: " << find_handle_A_result.error() << ", bailing!\n";
shutdown_flag = true;
skeleton_A_thread.join();
skeleton_B_thread.join();
return EXIT_FAILURE;
}
if (!find_handle_B_result.has_value())
{
std::cerr << "MAIN: Failed to start discovery for B: " << find_handle_B_result.error() << ", bailing!\n";
IpcBridgeProxy::StopFindService(find_handle_A_result.value());
shutdown_flag = true;
skeleton_A_thread.join();
skeleton_B_thread.join();
return EXIT_FAILURE;
}

find_handle_B_ptr = std::make_shared<score::mw::com::FindServiceHandle>(find_handle_B_result.value());

// 7. Wait for handler A to complete its logic.
handler_A_finished_future.wait();
std::cout << "MAIN: Test logic complete.\n";

// 8. Clean up all threads.
// Stop the remaining discovery if it's still active.
IpcBridgeProxy::StopFindService(find_handle_A_result.value());

shutdown_flag = true;
skeleton_A_thread.join();
skeleton_B_thread.join();

std::cout << "MAIN: Test finished successfully.\n";
return EXIT_SUCCESS;
}
Loading
Loading