Skip to content

Commit af4ca0c

Browse files
Address flakiness in rosbag2_play_end_to_end tests (ros2#1297)
* Make test_rosbag2_play_end_to_end more deterministic - Change QoS depth in test databases to correspond number of messages - Change QoS durability to transient local in test DB and mcap file - Explicitly specify QoS depth = 10 for subscribers - Explicitly specify QoS reliability to reliable for subscribers - Explicitly specify QoS durability to transient local for subscribers - Update metadata in test DB and mcap files to the latest version(7) - Remove xfail test_rosbag2_play_end_to_end Signed-off-by: Michael Orlov <[email protected]> * Add wait_for_matched for record_end_to_end_exits_gracefully_on_sigterm - Remove xfail for test_rosbag2_info_end_to_end Signed-off-by: Michael Orlov <[email protected]> * Fix for play_filters_by_topic test - Uncomment play_filters_by_topic test - Use proper qos settings for subscribers in `play_filters_by_topic` and fix expectations about number of published messages. - Log warning if SubscriptionManager::continue_spinning(..) finished by timeout. - Enable `play_end_to_end_test` for windows. Signed-off-by: Michael Orlov <[email protected]> * Make test_rosbag2_play_end_to_end deterministic - Start player in pause mode and wait on subscribers for matched publishers from player then send resume service call to unpause. - Add spin_and_wait_for_matched(topic_names) for SubscriptionManager Signed-off-by: Michael Orlov <[email protected]> * Remove redundant includes from test_rosbag2_play_end_to_end.cpp Signed-off-by: Michael Orlov <[email protected]> * Sleep for a few milliseconds in SubscriptionManager to avoid busy loop Signed-off-by: Michael Orlov <[email protected]> * Update rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp Co-authored-by: Chris Lalancette <[email protected]> Signed-off-by: Michael Orlov <[email protected]> Co-authored-by: Chris Lalancette <[email protected]> * Add missing include<thread> in process_execution_helpers_unix.hpp Signed-off-by: Michael Orlov <[email protected]> * Address race condition in process termination routines - Added wait_until_completion(process_id, timeout) helper function Signed-off-by: Michael Orlov <[email protected]> * Move wait_until_completion before stop_execution to fix compilation error Signed-off-by: Michael Orlov <[email protected]> * Fix for Windows build error. Rename process_id to handle. Signed-off-by: Michael Orlov <[email protected]> * Increase timeout for service call and wait_until_completion up to 10 sec Signed-off-by: Michael Orlov <[email protected]> --------- Signed-off-by: Michael Orlov <[email protected]> Co-authored-by: Chris Lalancette <[email protected]>
1 parent f3bc1af commit af4ca0c

File tree

11 files changed

+337
-143
lines changed

11 files changed

+337
-143
lines changed

rosbag2_test_common/include/rosbag2_test_common/process_execution_helpers_unix.hpp

+48-17
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include <signal.h>
2121
#include <stdlib.h>
22+
#include <thread>
2223

2324
#include <chrono>
2425
#include <cstdlib>
@@ -93,33 +94,63 @@ ProcessHandle start_execution(const std::string & command)
9394
return process_id;
9495
}
9596

96-
void stop_execution(
97+
/// @brief Wait for process to finish with timeout
98+
/// @param process_id Process ID
99+
/// @param timeout Timeout in fraction of seconds
100+
/// @return true if process has finished during timeout and false if timeout was reached and
101+
/// process is still running
102+
bool wait_until_completion(
97103
const ProcessHandle & process_id,
98-
int signum = SIGINT,
99-
std::chrono::seconds timeout = std::chrono::seconds(10))
104+
std::chrono::duration<double> timeout = std::chrono::seconds(10))
100105
{
101-
EXPECT_NE(kill(process_id, signum), -1) << "Failed to send signal " << signum <<
102-
" to process: " << process_id;
103-
int status = EXIT_FAILURE;
104106
pid_t wait_ret_code = 0;
107+
int status = EXIT_FAILURE;
105108
std::chrono::steady_clock::time_point const start = std::chrono::steady_clock::now();
106109
// Wait for process to finish with timeout
107110
while (wait_ret_code == 0 && std::chrono::steady_clock::now() - start < timeout) {
108-
std::this_thread::sleep_for(std::chrono::milliseconds(30));
111+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
109112
// WNOHANG - wait for processes without causing the caller to be blocked
110113
wait_ret_code = waitpid(process_id, &status, WNOHANG);
111114
}
112-
if (wait_ret_code == 0) {
113-
std::cerr << "Testing process " << process_id << " hangout. Killing it with SIGKILL \n";
114-
kill(process_id, SIGKILL);
115-
}
116-
// Make sure that the process does execute without issues before it is killed by
117-
// the user in the test or, in case it runs until completion, that it has correctly executed.
118115
EXPECT_NE(wait_ret_code, -1);
119-
EXPECT_EQ(wait_ret_code, process_id);
120-
EXPECT_EQ(WIFEXITED(status), true) << "status = " << status;
121-
EXPECT_EQ(WIFSIGNALED(status), false) << "Process terminated by signal: " << WTERMSIG(status);
122-
EXPECT_EQ(WEXITSTATUS(status), EXIT_SUCCESS) << "status = " << status;
116+
EXPECT_EQ(wait_ret_code, process_id) << "status = " << status;
117+
return wait_ret_code != 0;
118+
}
119+
120+
/// @brief Force to stop process with signal if it's currently running
121+
/// @param process_id Process ID
122+
/// @param signum Signal to use for stopping process. The default is SIGINT.
123+
/// @param timeout Timeout in fraction of seconds
124+
void stop_execution(
125+
const ProcessHandle & process_id,
126+
int signum = SIGINT,
127+
std::chrono::duration<double> timeout = std::chrono::seconds(10))
128+
{
129+
if (kill(process_id, 0) == 0) {
130+
// If process is still running, then send signal to stop it and check return code
131+
EXPECT_NE(kill(process_id, signum), -1) << "Failed to send signal " << signum <<
132+
" to process: " << process_id;
133+
int status = EXIT_FAILURE;
134+
pid_t wait_ret_code = 0;
135+
std::chrono::steady_clock::time_point const start = std::chrono::steady_clock::now();
136+
// Wait for process to finish with timeout
137+
while (wait_ret_code == 0 && std::chrono::steady_clock::now() - start < timeout) {
138+
std::this_thread::sleep_for(std::chrono::milliseconds(30));
139+
// WNOHANG - wait for processes without causing the caller to be blocked
140+
wait_ret_code = waitpid(process_id, &status, WNOHANG);
141+
}
142+
if (wait_ret_code == 0) {
143+
std::cerr << "Testing process " << process_id << " hangout. Killing it with SIGKILL \n";
144+
kill(process_id, SIGKILL);
145+
}
146+
// Make sure that the process does execute without issues before it is killed by
147+
// the user in the test or, in case it runs until completion, that it has correctly executed.
148+
EXPECT_NE(wait_ret_code, -1);
149+
EXPECT_EQ(wait_ret_code, process_id);
150+
EXPECT_EQ(WIFEXITED(status), true) << "status = " << status;
151+
EXPECT_EQ(WIFSIGNALED(status), false) << "Process terminated by signal: " << WTERMSIG(status);
152+
EXPECT_EQ(WEXITSTATUS(status), EXIT_SUCCESS) << "status = " << status;
153+
}
123154
}
124155

125156
#endif // ROSBAG2_TEST_COMMON__PROCESS_EXECUTION_HELPERS_UNIX_HPP_

rosbag2_test_common/include/rosbag2_test_common/process_execution_helpers_windows.hpp

+40-5
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ int execute_and_wait_until_completion(const std::string & command, const std::st
7575
auto process = create_process(command_char, path.c_str());
7676
DWORD exit_code = 259; // 259 is the code one gets if the process is still active.
7777
while (exit_code == 259) {
78-
GetExitCodeProcess(process.hProcess, &exit_code);
78+
EXPECT_TRUE(GetExitCodeProcess(process.hProcess, &exit_code));
7979
std::this_thread::sleep_for(std::chrono::milliseconds(10));
8080
}
8181
close_process_handles(process);
@@ -105,16 +105,51 @@ ProcessHandle start_execution(const std::string & command)
105105
return process;
106106
}
107107

108-
void stop_execution(const ProcessHandle & handle, int signum = SIGINT)
108+
/// @brief Wait for process to finish with timeout
109+
/// @param handle Process handle returned from start_execution(command)
110+
/// @param timeout Timeout in fraction of seconds
111+
/// @return true if process has finished during timeout and false if timeout was reached and
112+
/// process is still running
113+
bool wait_until_completion(
114+
const ProcessHandle & handle,
115+
std::chrono::duration<double> timeout = std::chrono::seconds(10))
116+
{
117+
DWORD exit_code = 0;
118+
std::chrono::steady_clock::time_point const start = std::chrono::steady_clock::now();
119+
EXPECT_TRUE(GetExitCodeProcess(handle.process_info.hProcess, &exit_code));
120+
// 259 indicates that the process is still active
121+
while (exit_code == 259 && std::chrono::steady_clock::now() - start < timeout) {
122+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
123+
EXPECT_TRUE(GetExitCodeProcess(handle.process_info.hProcess, &exit_code));
124+
}
125+
return exit_code != 259;
126+
}
127+
128+
/// @brief Force to stop process with signal if it's currently running
129+
/// @param handle Process handle returned from start_execution(command)
130+
/// @param signum Not uses for Windows version
131+
/// @param timeout Timeout in fraction of seconds
132+
void stop_execution(
133+
const ProcessHandle & handle,
134+
int signum = SIGINT,
135+
std::chrono::duration<double> timeout = std::chrono::seconds(10))
109136
{
110137
// Match the Unix version by allowing for int signum argument - however Windows does not have
111138
// Linux signals in the same way, so there isn't a 1:1 mapping to dispatch e.g. SIGTERM
112139
DWORD exit_code;
113-
GetExitCodeProcess(handle.process_info.hProcess, &exit_code);
140+
EXPECT_TRUE(GetExitCodeProcess(handle.process_info.hProcess, &exit_code));
114141
// 259 indicates that the process is still active: we want to make sure that the process is
115142
// still running properly before killing it.
116-
ASSERT_THAT(exit_code, Eq(259));
117-
GenerateConsoleCtrlEvent(CTRL_C_EVENT, handle.process_info.dwThreadId);
143+
if (exit_code == 259) {
144+
EXPECT_TRUE(GenerateConsoleCtrlEvent(CTRL_C_EVENT, handle.process_info.dwThreadId));
145+
bool process_finished = wait_until_completion(handle, timeout);
146+
if (!process_finished) {
147+
std::cerr << "Testing process " << handle.process_info.hProcess <<
148+
" hangout. Killing it with TerminateProcess(..) \n";
149+
EXPECT_TRUE(TerminateProcess(handle.process_info.hProcess, 2));
150+
EXPECT_TRUE(process_finished);
151+
}
152+
}
118153
close_process_handles(handle.process_info);
119154
CloseHandle(handle.job_handle);
120155
}

rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp

+66-14
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,14 @@ class SubscriptionManager
5959
auto options = rclcpp::SubscriptionOptions();
6060
options.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable;
6161

62-
subscriptions_.push_back(
62+
subscriptions_[topic_name] =
6363
subscriber_node_->create_subscription<MessageT>(
64-
topic_name,
65-
qos,
66-
[this, topic_name](std::shared_ptr<rclcpp::SerializedMessage> msg) {
67-
subscribed_messages_[topic_name].push_back(msg);
68-
},
69-
options));
64+
topic_name,
65+
qos,
66+
[this, topic_name](std::shared_ptr<rclcpp::SerializedMessage> msg) {
67+
subscribed_messages_[topic_name].push_back(msg);
68+
},
69+
options);
7070
}
7171

7272
template<typename MessageT>
@@ -113,7 +113,6 @@ class SubscriptionManager
113113
bool matched = false;
114114
while (!matched && ((clock::now() - start) < timeout)) {
115115
exec.spin_node_some(subscriber_node_);
116-
117116
matched = true;
118117
for (const auto publisher_ptr : publishers) {
119118
if (publisher_ptr->get_subscription_count() +
@@ -123,17 +122,63 @@ class SubscriptionManager
123122
break;
124123
}
125124
}
125+
// Sleep for a few milliseconds to avoid busy loop
126+
std::this_thread::sleep_for(sleep_per_loop_);
127+
}
128+
return matched;
129+
}
130+
131+
/// @brief Wait until publishers will be connected to the subscriptions or timeout occur.
132+
/// @param topic_names List of topic names
133+
/// @param timeout Maximum time duration during which discovery should happen.
134+
/// @param n_publishers_to_match Number of publishers each subscription should have for match.
135+
/// @return true if subscriptions have specified number of publishers, otherwise false.
136+
bool spin_and_wait_for_matched(
137+
const std::vector<std::string> & topic_names,
138+
std::chrono::duration<double> timeout = std::chrono::seconds(10),
139+
size_t n_publishers_to_match = 1)
140+
{
141+
// Sanity check that we have valid input
142+
if (topic_names.empty()) {
143+
throw std::invalid_argument("List of topic names is empty");
144+
}
145+
for (const auto & topic_name : topic_names) {
146+
if (subscriptions_.count(topic_name) == 0) {
147+
throw std::invalid_argument(
148+
"Publisher's topic name = `" + topic_name + "` not found in expected topics list");
149+
}
150+
}
151+
152+
using clock = std::chrono::steady_clock;
153+
auto start = clock::now();
154+
155+
rclcpp::executors::SingleThreadedExecutor exec;
156+
bool matched = false;
157+
while (!matched && ((clock::now() - start) < timeout)) {
158+
exec.spin_node_some(subscriber_node_);
159+
matched = true;
160+
for (const auto & topic_name : topic_names) {
161+
if (subscriptions_.find(topic_name) == subscriptions_.end() ||
162+
subscriptions_[topic_name]->get_publisher_count() < n_publishers_to_match)
163+
{
164+
matched = false;
165+
break;
166+
}
167+
}
168+
// Sleep for a few milliseconds to avoid busy loop
169+
std::this_thread::sleep_for(sleep_per_loop_);
126170
}
127171
return matched;
128172
}
129173

130-
std::future<void> spin_subscriptions()
174+
std::future<void> spin_subscriptions(
175+
std::chrono::duration<double> timeout = std::chrono::seconds(10))
131176
{
132177
return async(
133-
std::launch::async, [this]() {
178+
std::launch::async, [this, timeout]() {
134179
rclcpp::executors::SingleThreadedExecutor exec;
135180
auto start = std::chrono::high_resolution_clock::now();
136-
while (rclcpp::ok() && continue_spinning(expected_topics_with_size_, start)) {
181+
while (rclcpp::ok() && continue_spinning(expected_topics_with_size_, start, timeout)) {
137182
exec.spin_node_some(subscriber_node_);
138183
}
139184
});
@@ -151,26 +196,33 @@ class SubscriptionManager
151196
private:
152197
bool continue_spinning(
153198
const std::unordered_map<std::string, size_t> & expected_topics_with_sizes,
154-
std::chrono::time_point<std::chrono::high_resolution_clock> start_time)
199+
std::chrono::time_point<std::chrono::high_resolution_clock> start_time,
200+
std::chrono::duration<double> timeout = std::chrono::seconds(10))
155201
{
156202
auto current = std::chrono::high_resolution_clock::now();
157-
if (current - start_time > std::chrono::seconds(10)) {
203+
if (current - start_time > timeout) {
204+
RCLCPP_WARN(
205+
subscriber_node_->get_logger(),
206+
"SubscriptionManager::continue_spinning(..) finished by timeout");
158207
return false;
159208
}
160209

161210
for (const auto & topic_expected : expected_topics_with_sizes) {
162211
if (subscribed_messages_[topic_expected.first].size() < topic_expected.second) {
212+
// Sleep for a few milliseconds to avoid busy loop
213+
std::this_thread::sleep_for(sleep_per_loop_);
163214
return true;
164215
}
165216
}
166217
return false;
167218
}
168219

169-
std::vector<rclcpp::SubscriptionBase::SharedPtr> subscriptions_;
220+
std::unordered_map<std::string, rclcpp::SubscriptionBase::SharedPtr> subscriptions_;
170221
std::unordered_map<std::string,
171222
std::vector<std::shared_ptr<rclcpp::SerializedMessage>>> subscribed_messages_;
172223
std::unordered_map<std::string, size_t> expected_topics_with_size_;
173224
rclcpp::Node::SharedPtr subscriber_node_;
225+
const std::chrono::milliseconds sleep_per_loop_ = std::chrono::milliseconds(2);
174226
};
175227

176228
} // namespace rosbag2_test_common

rosbag2_tests/CMakeLists.txt

+2-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ if(BUILD_TESTING)
3535
find_package(rcpputils REQUIRED)
3636
find_package(rosbag2_compression REQUIRED)
3737
find_package(rosbag2_cpp REQUIRED)
38+
find_package(rosbag2_interfaces REQUIRED)
3839
find_package(rosbag2_storage REQUIRED)
3940
find_package(rosbag2_storage_default_plugins REQUIRED)
4041
find_package(rosbag2_test_common REQUIRED)
@@ -63,11 +64,11 @@ if(BUILD_TESTING)
6364
if(TARGET test_rosbag2_play_end_to_end)
6465
target_link_libraries(test_rosbag2_play_end_to_end
6566
rclcpp::rclcpp
67+
${rosbag2_interfaces_TARGETS}
6668
rosbag2_storage::rosbag2_storage
6769
rosbag2_test_common::rosbag2_test_common
6870
${test_msgs_TARGETS}
6971
)
70-
ament_add_test_label(test_rosbag2_play_end_to_end xfail)
7172
endif()
7273

7374
ament_add_gmock(test_rosbag2_info_end_to_end
@@ -78,7 +79,6 @@ if(BUILD_TESTING)
7879
rosbag2_storage::rosbag2_storage
7980
rosbag2_test_common::rosbag2_test_common
8081
)
81-
ament_add_test_label(test_rosbag2_info_end_to_end xfail)
8282
endif()
8383

8484
ament_add_gmock(test_converter

rosbag2_tests/package.xml

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
<test_depend>rosbag2_compression</test_depend>
2525
<test_depend>rosbag2_compression_zstd</test_depend>
2626
<test_depend>rosbag2_cpp</test_depend>
27+
<test_depend>rosbag2_interfaces</test_depend>
2728
<test_depend>rosbag2_storage_default_plugins</test_depend>
2829
<test_depend>rosbag2_storage</test_depend>
2930
<test_depend>rosbag2_test_common</test_depend>
Binary file not shown.

rosbag2_tests/resources/mcap/cdr_test/metadata.yaml

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
rosbag2_bagfile_information:
2-
version: 6
2+
version: 7
33
storage_identifier: mcap
44
duration:
55
nanoseconds: 151137181
@@ -11,13 +11,15 @@ rosbag2_bagfile_information:
1111
name: /array_topic
1212
type: test_msgs/msg/Arrays
1313
serialization_format: cdr
14-
offered_qos_profiles: "- history: 3\n depth: 0\n reliability: 1\n durability: 2\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
14+
offered_qos_profiles: "- history: 3\n depth: 4\n reliability: 1\n durability: 1\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
15+
type_description_hash: ""
1516
message_count: 4
1617
- topic_metadata:
1718
name: /test_topic
1819
type: test_msgs/msg/BasicTypes
1920
serialization_format: cdr
20-
offered_qos_profiles: "- history: 3\n depth: 0\n reliability: 1\n durability: 2\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
21+
offered_qos_profiles: "- history: 3\n depth: 3\n reliability: 1\n durability: 1\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
22+
type_description_hash: ""
2123
message_count: 3
2224
compression_format: ""
2325
compression_mode: ""
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,35 @@
11
rosbag2_bagfile_information:
2-
version: 4
2+
version: 7
33
storage_identifier: sqlite3
4-
relative_file_paths:
5-
- cdr_test_0.db3
64
duration:
75
nanoseconds: 151137181
86
starting_time:
97
nanoseconds_since_epoch: 1586406456763032325
108
message_count: 7
119
topics_with_message_count:
12-
- topic_metadata:
13-
name: /test_topic
14-
type: test_msgs/msg/BasicTypes
15-
serialization_format: cdr
16-
offered_qos_profiles: "- history: 3\n depth: 0\n reliability: 1\n durability: 2\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
17-
message_count: 3
1810
- topic_metadata:
1911
name: /array_topic
2012
type: test_msgs/msg/Arrays
2113
serialization_format: cdr
22-
offered_qos_profiles: "- history: 3\n depth: 0\n reliability: 1\n durability: 2\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
14+
offered_qos_profiles: "- history: 3\n depth: 4\n reliability: 1\n durability: 1\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
15+
type_description_hash: ""
2316
message_count: 4
17+
- topic_metadata:
18+
name: /test_topic
19+
type: test_msgs/msg/BasicTypes
20+
serialization_format: cdr
21+
offered_qos_profiles: "- history: 3\n depth: 3\n reliability: 1\n durability: 1\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
22+
type_description_hash: ""
23+
message_count: 3
2424
compression_format: ""
2525
compression_mode: ""
26+
relative_file_paths:
27+
- cdr_test_0.db3
28+
files:
29+
- path: cdr_test_0.db3
30+
starting_time:
31+
nanoseconds_since_epoch: 1586406456763032325
32+
duration:
33+
nanoseconds: 151137181
34+
message_count: 7
35+
custom_data: ~

0 commit comments

Comments
 (0)