Skip to content

Commit 7fd9c1a

Browse files
Janosch MachowinskiJanosch Machowinski
Janosch Machowinski
authored and
Janosch Machowinski
committed
feat(MultiThreadedExecutor): Added ability to handle exceptions from threads
This commit adds external exception handling for the worker threads, allowing application code to implement custom exception handling. Signed-off-by: Janosch Machowinski <[email protected]>
1 parent c500695 commit 7fd9c1a

File tree

3 files changed

+95
-20
lines changed

3 files changed

+95
-20
lines changed

rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp

+13-1
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,26 @@ class MultiThreadedExecutor : public rclcpp::Executor
6969
void
7070
spin() override;
7171

72+
/**
73+
* \sa rclcpp::Executor:spin() for more details
74+
* \throws std::runtime_error when spin() called while already spinning
75+
* @param exception_handler will be called for every exception in the processing threads
76+
*
77+
* The exception_handler can be called from multiple threads at the same time.
78+
* The exception_handler shall rethrow the exception it if wants to terminate the program.
79+
*/
80+
RCLCPP_PUBLIC
81+
void
82+
spin(std::function<void(const std::exception & e)> exception_handler);
83+
7284
RCLCPP_PUBLIC
7385
size_t
7486
get_number_of_threads();
7587

7688
protected:
7789
RCLCPP_PUBLIC
7890
void
79-
run(size_t this_thread_number);
91+
run(size_t this_thread_number, std::function<void(const std::exception & e)> exception_handler);
8092

8193
private:
8294
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp

+38-19
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {}
5151

5252
void
5353
MultiThreadedExecutor::spin()
54+
{
55+
spin([](const std::exception & e) {throw e;});
56+
}
57+
58+
void
59+
60+
MultiThreadedExecutor::spin(std::function<void(const std::exception & e)> exception_handler)
5461
{
5562
if (spinning.exchange(true)) {
5663
throw std::runtime_error("spin() called while already spinning");
@@ -61,12 +68,12 @@ MultiThreadedExecutor::spin()
6168
{
6269
std::lock_guard wait_lock{wait_mutex_};
6370
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
64-
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
71+
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id, exception_handler);
6572
threads.emplace_back(func);
6673
}
6774
}
6875

69-
run(thread_id);
76+
run(thread_id, exception_handler);
7077
for (auto & thread : threads) {
7178
thread.join();
7279
}
@@ -79,28 +86,40 @@ MultiThreadedExecutor::get_number_of_threads()
7986
}
8087

8188
void
82-
MultiThreadedExecutor::run(size_t this_thread_number)
89+
MultiThreadedExecutor::run(
90+
size_t this_thread_number,
91+
std::function<void(const std::exception & e)> exception_handler)
8392
{
84-
(void)this_thread_number;
85-
while (rclcpp::ok(this->context_) && spinning.load()) {
86-
rclcpp::AnyExecutable any_exec;
87-
{
88-
std::lock_guard wait_lock{wait_mutex_};
89-
if (!rclcpp::ok(this->context_) || !spinning.load()) {
90-
return;
93+
try {
94+
95+
(void)this_thread_number;
96+
while (rclcpp::ok(this->context_) && spinning.load()) {
97+
rclcpp::AnyExecutable any_exec;
98+
{
99+
std::lock_guard wait_lock{wait_mutex_};
100+
if (!rclcpp::ok(this->context_) || !spinning.load()) {
101+
return;
102+
}
103+
if (!get_next_executable(any_exec, next_exec_timeout_)) {
104+
continue;
105+
}
91106
}
92-
if (!get_next_executable(any_exec, next_exec_timeout_)) {
93-
continue;
107+
if (yield_before_execute_) {
108+
std::this_thread::yield();
94109
}
95-
}
96-
if (yield_before_execute_) {
97-
std::this_thread::yield();
110+
111+
execute_any_executable(any_exec);
112+
113+
// Clear the callback_group to prevent the AnyExecutable destructor from
114+
// resetting the callback group `can_be_taken_from`
115+
any_exec.callback_group.reset();
98116
}
99117

100-
execute_any_executable(any_exec);
118+
} catch (const std::exception & e) {
119+
RCLCPP_ERROR_STREAM(
120+
rclcpp::get_logger("rclcpp"),
121+
"Exception while spinning : " << e.what());
101122

102-
// Clear the callback_group to prevent the AnyExecutable destructor from
103-
// resetting the callback group `can_be_taken_from`
104-
any_exec.callback_group.reset();
123+
exception_handler(e);
105124
}
106125
}

rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp

+44
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,48 @@ TEST_F(TestMultiThreadedExecutor, timer_over_take) {
9696
auto timer = node->create_wall_timer(PERIOD_MS, timer_callback, cbg);
9797
executor.add_node(node);
9898
executor.spin();
99+
100+
}
101+
102+
TEST_F(TestMultiThreadedExecutor, catch_exception) {
103+
104+
rclcpp::executors::MultiThreadedExecutor executor;
105+
106+
std::shared_ptr<rclcpp::Node> node =
107+
std::make_shared<rclcpp::Node>("test_multi_threaded_executor_catch_exception");
108+
109+
const std::string test_reason = "test exception";
110+
111+
std::atomic_bool timer_executed_after_exception = false;
112+
113+
auto timer = node->create_wall_timer(
114+
std::chrono::milliseconds(1), [test_reason, &timer_executed_after_exception, &executor]()
115+
{
116+
static size_t cnt = 0;
117+
if (cnt == 0) {
118+
cnt++;
119+
throw std::runtime_error(test_reason);
120+
}
121+
122+
timer_executed_after_exception = true;
123+
executor.cancel();
124+
});
125+
126+
std::atomic_bool caught_exception = false;
127+
128+
executor.add_node(node);
129+
executor.spin(
130+
[&caught_exception, &test_reason](const std::exception & e)
131+
{
132+
const std::runtime_error * runtime_error = dynamic_cast<const std::runtime_error *>(&e);
133+
ASSERT_NE(runtime_error, nullptr);
134+
135+
ASSERT_EQ(runtime_error->what(), test_reason);
136+
137+
caught_exception = true;
138+
}
139+
);
140+
141+
ASSERT_TRUE(caught_exception);
142+
ASSERT_TRUE(timer_executed_after_exception);
99143
}

0 commit comments

Comments
 (0)