Skip to content

Commit 80ecbd8

Browse files
Janosch MachowinskiJanosch Machowinski
Janosch Machowinski
authored and
Janosch Machowinski
committed
feat(MultiThreadedExecutor): Added ability to handle exceptions from threads
This commit adds external exception handling for the worker threads, allowing application code to implement custom exception handling. Signed-off-by: Janosch Machowinski <[email protected]>
1 parent 265f5ec commit 80ecbd8

File tree

2 files changed

+58
-20
lines changed

2 files changed

+58
-20
lines changed

rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp

+13-1
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,26 @@ class MultiThreadedExecutor : public rclcpp::Executor
6969
void
7070
spin() override;
7171

72+
/**
73+
* \sa rclcpp::Executor:spin() for more details
74+
* \throws std::runtime_error when spin() called while already spinning
75+
* @param exception_handler will be called for every exception in the processing threads
76+
*
77+
* The exception_handler can be called from multiple threads at the same time.
78+
* The exception_handler shall rethrow the exception it if wants to terminate the program.
79+
*/
80+
RCLCPP_PUBLIC
81+
void
82+
spin(std::function<void(const std::exception & e)> exception_handler);
83+
7284
RCLCPP_PUBLIC
7385
size_t
7486
get_number_of_threads();
7587

7688
protected:
7789
RCLCPP_PUBLIC
7890
void
79-
run(size_t this_thread_number);
91+
run(size_t this_thread_number, std::function<void(const std::exception & e)> exception_handler);
8092

8193
private:
8294
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp

+45-19
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {}
5151

5252
void
5353
MultiThreadedExecutor::spin()
54+
{
55+
spin(std::function<void(const std::exception &)>());
56+
}
57+
58+
void
59+
60+
MultiThreadedExecutor::spin(std::function<void(const std::exception & e)> exception_handler)
5461
{
5562
if (spinning.exchange(true)) {
5663
throw std::runtime_error("spin() called while already spinning");
@@ -61,12 +68,12 @@ MultiThreadedExecutor::spin()
6168
{
6269
std::lock_guard wait_lock{wait_mutex_};
6370
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
64-
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
71+
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id, exception_handler);
6572
threads.emplace_back(func);
6673
}
6774
}
6875

69-
run(thread_id);
76+
run(thread_id, exception_handler);
7077
for (auto & thread : threads) {
7178
thread.join();
7279
}
@@ -79,28 +86,47 @@ MultiThreadedExecutor::get_number_of_threads()
7986
}
8087

8188
void
82-
MultiThreadedExecutor::run(size_t this_thread_number)
89+
MultiThreadedExecutor::run(
90+
size_t this_thread_number,
91+
std::function<void(const std::exception & e)> exception_handler)
8392
{
8493
(void)this_thread_number;
85-
while (rclcpp::ok(this->context_) && spinning.load()) {
86-
rclcpp::AnyExecutable any_exec;
94+
const auto run_inner = [this]()
8795
{
88-
std::lock_guard wait_lock{wait_mutex_};
89-
if (!rclcpp::ok(this->context_) || !spinning.load()) {
90-
return;
91-
}
92-
if (!get_next_executable(any_exec, next_exec_timeout_)) {
93-
continue;
96+
while (rclcpp::ok(this->context_) && spinning.load()) {
97+
rclcpp::AnyExecutable any_exec;
98+
{
99+
std::lock_guard wait_lock{wait_mutex_};
100+
if (!rclcpp::ok(this->context_) || !spinning.load()) {
101+
return;
102+
}
103+
if (!get_next_executable(any_exec, next_exec_timeout_)) {
104+
continue;
105+
}
106+
}
107+
if (yield_before_execute_) {
108+
std::this_thread::yield();
109+
}
110+
111+
execute_any_executable(any_exec);
112+
113+
// Clear the callback_group to prevent the AnyExecutable destructor from
114+
// resetting the callback group `can_be_taken_from`
115+
any_exec.callback_group.reset();
94116
}
95-
}
96-
if (yield_before_execute_) {
97-
std::this_thread::yield();
98-
}
117+
};
99118

100-
execute_any_executable(any_exec);
119+
if (exception_handler) {
120+
try {
121+
run_inner();
122+
} catch (const std::exception & e) {
123+
RCLCPP_ERROR_STREAM(
124+
rclcpp::get_logger("rclcpp"),
125+
"Exception while spinning : " << e.what());
101126

102-
// Clear the callback_group to prevent the AnyExecutable destructor from
103-
// resetting the callback group `can_be_taken_from`
104-
any_exec.callback_group.reset();
127+
exception_handler(e);
128+
}
129+
} else {
130+
run_inner();
105131
}
106132
}

0 commit comments

Comments
 (0)