Skip to content

Commit cc6c0bc

Browse files
author
Janosch Machowinski
committed
feat(Executor): Added spin API with exception handler.
Signed-off-by: Janosch Machowinski <[email protected]> # Conflicts: # rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp # rclcpp/test/rclcpp/executors/test_multi_threaded_executor.cpp
1 parent 3edeb90 commit cc6c0bc

15 files changed

+328
-51
lines changed

rclcpp/include/rclcpp/executor.hpp

+24
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,18 @@ class Executor
8282
virtual void
8383
spin() = 0;
8484

85+
/**
86+
* \sa rclcpp::Executor:spin() for more details
87+
* \throws std::runtime_error when spin() called while already spinning
88+
* @param exception_handler will be called for every exception in the processing threads
89+
*
90+
* The exception_handler can be called from multiple threads at the same time.
91+
* The exception_handler shall rethrow the exception it if wants to terminate the program.
92+
*/
93+
RCLCPP_PUBLIC
94+
virtual void
95+
spin(const std::function<void(const std::exception & e)> & exception_handler) = 0;
96+
8597
/// Add a callback group to an executor.
8698
/**
8799
* An executor can have zero or more callback groups which provide work during `spin` functions.
@@ -470,6 +482,18 @@ class Executor
470482
void
471483
execute_any_executable(AnyExecutable & any_exec);
472484

485+
/// Find the next available executable and do the work associated with it.
486+
/**
487+
* \param[in] any_exec Union structure that can hold any executable type (timer, subscription,
488+
* service, client).
489+
* \throws std::runtime_error if there is an issue triggering the guard condition
490+
*/
491+
RCLCPP_PUBLIC
492+
void
493+
execute_any_executable(
494+
AnyExecutable & any_exec,
495+
const std::function<void(const std::exception & e)> & exception_handler);
496+
473497
/// Run subscription executable.
474498
/**
475499
* Do necessary setup and tear-down as well as executing the subscription.

rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ class MultiThreadedExecutor : public rclcpp::Executor
7979
*/
8080
RCLCPP_PUBLIC
8181
void
82-
spin(std::function<void(const std::exception & e)> exception_handler);
82+
spin(const std::function<void(const std::exception & e)> & exception_handler) override;
8383

8484
RCLCPP_PUBLIC
8585
size_t
@@ -88,7 +88,9 @@ class MultiThreadedExecutor : public rclcpp::Executor
8888
protected:
8989
RCLCPP_PUBLIC
9090
void
91-
run(size_t this_thread_number, std::function<void(const std::exception & e)> exception_handler);
91+
run(
92+
size_t this_thread_number,
93+
const std::function<void(const std::exception & e)> & exception_handler);
9294

9395
private:
9496
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

rclcpp/include/rclcpp/executors/single_threaded_executor.hpp

+11
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,17 @@ class SingleThreadedExecutor : public rclcpp::Executor
6565
void
6666
spin() override;
6767

68+
/**
69+
* \sa rclcpp::SingleThreadedExecutor:spin() for more details
70+
* \throws std::runtime_error when spin() called while already spinning
71+
* @param exception_handler will be called for every exception in the processing threads
72+
*
73+
* The exception_handler shall rethrow the exception it if wants to terminate the program.
74+
*/
75+
RCLCPP_PUBLIC
76+
void
77+
spin(const std::function<void(const std::exception & e)> & exception_handler) override;
78+
6879
private:
6980
RCLCPP_DISABLE_COPY(SingleThreadedExecutor)
7081
};

rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp

+12
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,18 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor
6868
void
6969
spin() override;
7070

71+
/// Static executor implementation of spin.
72+
/**
73+
* \sa rclcpp::StaticSingleThreadedExecutor:spin() for more details
74+
* \throws std::runtime_error when spin() called while already spinning
75+
* @param exception_handler will be called for every exception in the processing threads
76+
*
77+
* The exception_handler shall rethrow the exception it if wants to terminate the program.
78+
*/
79+
RCLCPP_PUBLIC
80+
void
81+
spin(const std::function<void(const std::exception & e)> & exception_handler) override;
82+
7183
/// Static executor implementation of spin some
7284
/**
7385
* This non-blocking function will execute entities that

rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp

+12
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,18 @@ class EventsExecutor : public rclcpp::Executor
9292
void
9393
spin() override;
9494

95+
/**
96+
* \sa rclcpp::Executor:spin() for more details
97+
* \throws std::runtime_error when spin() called while already spinning
98+
* @param exception_handler will be called for every exception in the processing threads
99+
*
100+
* The exception_handler can be called from multiple threads at the same time.
101+
* The exception_handler shall rethrow the exception it if wants to terminate the program.
102+
*/
103+
RCLCPP_PUBLIC
104+
void
105+
spin(const std::function<void(const std::exception & e)> & exception_handler) override;
106+
95107
/// Events executor implementation of spin some
96108
/**
97109
* This non-blocking function will execute the timers and events

rclcpp/include/rclcpp/experimental/timers_manager.hpp

+12-2
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,14 @@ class TimersManager
124124
/**
125125
* @brief Starts a thread that takes care of executing the timers stored in this object.
126126
* Function will throw an error if the timers thread was already running.
127+
*
128+
* @param exception_handler if valid, the execution of the timer will be done in a try catch block,
129+
* and any occurring exception will be passed to the given handler
127130
*/
128131
RCLCPP_PUBLIC
129-
void start();
132+
void start(
133+
const std::function<void(const std::exception & e)> & exception_handler =
134+
std::function<void(const std::exception & e)>());
130135

131136
/**
132137
* @brief Stops the timers thread.
@@ -509,6 +514,11 @@ class TimersManager
509514
*/
510515
void run_timers();
511516

517+
/**
518+
* @brief calls run_timers with a try catch block.
519+
*/
520+
void run_timers(const std::function<void(const std::exception & e)> & exception_handler);
521+
512522
/**
513523
* @brief Get the amount of time before the next timer triggers.
514524
* This function is not thread safe, acquire a mutex before calling it.
@@ -526,7 +536,7 @@ class TimersManager
526536
* while keeping the heap correctly sorted.
527537
* This function is not thread safe, acquire the timers_mutex_ before calling it.
528538
*/
529-
void execute_ready_timers_unsafe();
539+
void execute_ready_timers_unsafe(std::function<void(const std::exception & e)> exception_handler);
530540

531541
// Callback to be called when timer is ready
532542
std::function<void(const rclcpp::TimerBase *)> on_ready_callback_;

rclcpp/src/rclcpp/executor.cpp

+63-2
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,8 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
379379
return;
380380
}
381381

382+
assert(any_exec.callback_group);
383+
382384
if (any_exec.timer) {
383385
TRACETOOLS_TRACEPOINT(
384386
rclcpp_executor_execute,
@@ -403,11 +405,70 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
403405
}
404406

405407
// Reset the callback_group, regardless of type
406-
if (any_exec.callback_group) {
407-
any_exec.callback_group->can_be_taken_from().store(true);
408+
any_exec.callback_group->can_be_taken_from().store(true);
409+
}
410+
411+
template<typename Function>
412+
void execute_guarded(
413+
const Function & fun,
414+
const std::function<void(const std::exception & e)> & exception_handler)
415+
{
416+
try {
417+
fun();
418+
} catch (const std::exception & e) {
419+
RCLCPP_ERROR_STREAM(
420+
rclcpp::get_logger("rclcpp"),
421+
"Exception while spinning : " << e.what());
422+
423+
exception_handler(e);
408424
}
409425
}
410426

427+
void
428+
Executor::execute_any_executable(
429+
AnyExecutable & any_exec,
430+
const std::function<void(const std::exception & e)> & exception_handler)
431+
{
432+
if (!spinning.load()) {
433+
return;
434+
}
435+
436+
assert(any_exec.callback_group);
437+
438+
if (any_exec.timer) {
439+
TRACETOOLS_TRACEPOINT(
440+
rclcpp_executor_execute,
441+
static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
442+
execute_guarded([&any_exec]() {execute_timer(any_exec.timer);}, exception_handler);
443+
}
444+
if (any_exec.subscription) {
445+
TRACETOOLS_TRACEPOINT(
446+
rclcpp_executor_execute,
447+
static_cast<const void *>(any_exec.subscription->get_subscription_handle().get()));
448+
execute_guarded(
449+
[&any_exec]() {
450+
execute_subscription(any_exec.subscription);
451+
}, exception_handler);
452+
}
453+
if (any_exec.service) {
454+
execute_guarded([&any_exec]() {execute_service(any_exec.service);}, exception_handler);
455+
}
456+
if (any_exec.client) {
457+
execute_guarded([&any_exec]() {execute_client(any_exec.client);}, exception_handler);
458+
}
459+
if (any_exec.waitable) {
460+
execute_guarded(
461+
[&any_exec]() {
462+
const std::shared_ptr<void> & const_data = any_exec.data;
463+
any_exec.waitable->execute(const_data);
464+
}, exception_handler);
465+
}
466+
467+
// Reset the callback_group, regardless of type
468+
any_exec.callback_group->can_be_taken_from().store(true);
469+
}
470+
471+
411472
template<typename Taker, typename Handler>
412473
static
413474
void

rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp

+28-38
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ MultiThreadedExecutor::spin()
5757

5858
void
5959

60-
MultiThreadedExecutor::spin(std::function<void(const std::exception & e)> exception_handler)
60+
MultiThreadedExecutor::spin(const std::function<void(const std::exception & e)> & exception_handler)
6161
{
6262
if (spinning.exchange(true)) {
6363
throw std::runtime_error("spin() called while already spinning");
@@ -88,48 +88,38 @@ MultiThreadedExecutor::get_number_of_threads()
8888
void
8989
MultiThreadedExecutor::run(
9090
size_t this_thread_number,
91-
std::function<void(const std::exception & e)> exception_handler)
91+
const std::function<void(const std::exception & e)> & exception_handler)
9292
{
93-
try {
94-
(void)this_thread_number;
95-
while (rclcpp::ok(this->context_) && spinning.load()) {
96-
rclcpp::AnyExecutable any_exec;
97-
{
98-
std::lock_guard wait_lock{wait_mutex_};
99-
if (!rclcpp::ok(this->context_) || !spinning.load()) {
100-
return;
101-
}
102-
if (!get_next_executable(any_exec, next_exec_timeout_)) {
103-
continue;
104-
}
93+
(void)this_thread_number;
94+
while (rclcpp::ok(this->context_) && spinning.load()) {
95+
rclcpp::AnyExecutable any_exec;
96+
{
97+
std::lock_guard wait_lock{wait_mutex_};
98+
if (!rclcpp::ok(this->context_) || !spinning.load()) {
99+
return;
105100
}
106-
if (yield_before_execute_) {
107-
std::this_thread::yield();
101+
if (!get_next_executable(any_exec, next_exec_timeout_)) {
102+
continue;
108103
}
104+
}
105+
if (yield_before_execute_) {
106+
std::this_thread::yield();
107+
}
109108

110-
execute_any_executable(any_exec);
111-
112-
if (any_exec.callback_group &&
113-
any_exec.callback_group->type() == CallbackGroupType::MutuallyExclusive)
114-
{
115-
try {
116-
interrupt_guard_condition_->trigger();
117-
} catch (const rclcpp::exceptions::RCLError & ex) {
118-
throw std::runtime_error(
119-
std::string(
120-
"Failed to trigger guard condition on callback group change: ") + ex.what());
121-
}
122-
}
109+
execute_any_executable(any_exec, exception_handler);
123110

124-
// Clear the callback_group to prevent the AnyExecutable destructor from
125-
// resetting the callback group `can_be_taken_from`
126-
any_exec.callback_group.reset();
127-
}
128-
} catch (const std::exception & e) {
129-
RCLCPP_ERROR_STREAM(
130-
rclcpp::get_logger("rclcpp"),
131-
"Exception while spinning : " << e.what());
111+
// Clear the callback_group to prevent the AnyExecutable destructor from
112+
// resetting the callback group `can_be_taken_from`
113+
any_exec.callback_group.reset();
132114

133-
exception_handler(e);
115+
// Wake the wait, because it may need to be recalculated or work that
116+
// was previously blocked is now available.
117+
try {
118+
interrupt_guard_condition_->trigger();
119+
} catch (const rclcpp::exceptions::RCLError & ex) {
120+
throw std::runtime_error(
121+
std::string(
122+
"Failed to trigger guard condition from execute_any_executable: ") + ex.what());
123+
}
134124
}
135125
}

rclcpp/src/rclcpp/executors/single_threaded_executor.cpp

+22
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,25 @@ SingleThreadedExecutor::spin()
4343
}
4444
}
4545
}
46+
47+
48+
void
49+
SingleThreadedExecutor::spin(
50+
const std::function<void(const std::exception & e)> & exception_handler)
51+
{
52+
if (spinning.exchange(true)) {
53+
throw std::runtime_error("spin() called while already spinning");
54+
}
55+
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
56+
57+
// Clear any previous result and rebuild the waitset
58+
this->wait_result_.reset();
59+
this->entities_need_rebuild_ = true;
60+
61+
while (rclcpp::ok(this->context_) && spinning.load()) {
62+
rclcpp::AnyExecutable any_executable;
63+
if (get_next_executable(any_executable)) {
64+
execute_any_executable(any_executable, exception_handler);
65+
}
66+
}
67+
}

rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp

+32
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,38 @@ StaticSingleThreadedExecutor::spin()
4343
}
4444
}
4545

46+
void
47+
StaticSingleThreadedExecutor::spin(
48+
const std::function<void(const std::exception & e)> & exception_handler)
49+
{
50+
if (spinning.exchange(true)) {
51+
throw std::runtime_error("spin() called while already spinning");
52+
}
53+
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
54+
55+
while (rclcpp::ok(context_) && spinning.load()) {
56+
// Get executables that are ready now
57+
std::lock_guard<std::mutex> guard(mutex_);
58+
59+
auto wait_result = this->collect_and_wait(std::chrono::nanoseconds(-1));
60+
if (wait_result.has_value()) {
61+
try {
62+
// Execute ready executables
63+
this->execute_ready_executables(
64+
current_collection_,
65+
wait_result.value(),
66+
false);
67+
} catch (const std::exception & e) {
68+
RCLCPP_ERROR_STREAM(
69+
rclcpp::get_logger("rclcpp"),
70+
"Exception while spinning : " << e.what());
71+
72+
exception_handler(e);
73+
}
74+
}
75+
}
76+
}
77+
4678
void
4779
StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration)
4880
{

0 commit comments

Comments
 (0)