Skip to content

Commit d557d7b

Browse files
Janosch MachowinskiJanosch Machowinski
Janosch Machowinski
authored and
Janosch Machowinski
committed
feat(Executor): Added spin API with exception handler.
Signed-off-by: Janosch Machowinski <[email protected]>
1 parent ec1aa16 commit d557d7b

16 files changed

+315
-82
lines changed

rclcpp/include/rclcpp/executor.hpp

+24
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,18 @@ class Executor
8484
virtual void
8585
spin() = 0;
8686

87+
/**
88+
* \sa rclcpp::Executor:spin() for more details
89+
* \throws std::runtime_error when spin() called while already spinning
90+
* @param exception_handler will be called for every exception in the processing threads
91+
*
92+
* The exception_handler can be called from multiple threads at the same time.
93+
* The exception_handler shall rethrow the exception it if wants to terminate the program.
94+
*/
95+
RCLCPP_PUBLIC
96+
virtual void
97+
spin(const std::function<void(const std::exception & e)> & exception_handler) = 0;
98+
8799
/// Add a callback group to an executor.
88100
/**
89101
* An executor can have zero or more callback groups which provide work during `spin` functions.
@@ -476,6 +488,18 @@ class Executor
476488
void
477489
execute_any_executable(AnyExecutable & any_exec);
478490

491+
/// Find the next available executable and do the work associated with it.
492+
/**
493+
* \param[in] any_exec Union structure that can hold any executable type (timer, subscription,
494+
* service, client).
495+
* \throws std::runtime_error if there is an issue triggering the guard condition
496+
*/
497+
RCLCPP_PUBLIC
498+
void
499+
execute_any_executable(
500+
AnyExecutable & any_exec,
501+
const std::function<void(const std::exception & e)> & exception_handler);
502+
479503
/// Run subscription executable.
480504
/**
481505
* Do necessary setup and tear-down as well as executing the subscription.

rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ class MultiThreadedExecutor : public rclcpp::Executor
7979
*/
8080
RCLCPP_PUBLIC
8181
void
82-
spin(std::function<void(const std::exception & e)> exception_handler);
82+
spin(const std::function<void(const std::exception & e)> & exception_handler) override;
8383

8484
RCLCPP_PUBLIC
8585
size_t
@@ -88,7 +88,9 @@ class MultiThreadedExecutor : public rclcpp::Executor
8888
protected:
8989
RCLCPP_PUBLIC
9090
void
91-
run(size_t this_thread_number, std::function<void(const std::exception & e)> exception_handler);
91+
run(
92+
size_t this_thread_number,
93+
const std::function<void(const std::exception & e)> & exception_handler);
9294

9395
private:
9496
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

rclcpp/include/rclcpp/executors/single_threaded_executor.hpp

+10
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,16 @@ class SingleThreadedExecutor : public rclcpp::Executor
6565
void
6666
spin() override;
6767

68+
/**
69+
* \sa rclcpp::SingleThreadedExecutor:spin() for more details
70+
* \throws std::runtime_error when spin() called while already spinning
71+
* @param exception_handler will be called for every exception in the processing threads
72+
*
73+
* The exception_handler shall rethrow the exception it if wants to terminate the program.
74+
*/RCLCPP_PUBLIC
75+
virtual void
76+
spin(const std::function<void(const std::exception & e)> & exception_handler) override;
77+
6878
private:
6979
RCLCPP_DISABLE_COPY(SingleThreadedExecutor)
7080
};

rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp

+10
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,16 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor
7979
void
8080
spin() override;
8181

82+
/**
83+
* \sa rclcpp::SingleThreadedExecutor:spin() for more details
84+
* \throws std::runtime_error when spin() called while already spinning
85+
* @param exception_handler will be called for every exception in the processing threads
86+
*
87+
* The exception_handler shall rethrow the exception it if wants to terminate the program.
88+
*/RCLCPP_PUBLIC
89+
virtual void
90+
spin(const std::function<void(const std::exception & e)> & exception_handler) override;
91+
8292
/// Static executor implementation of spin some
8393
/**
8494
* This non-blocking function will execute entities that

rclcpp/include/rclcpp/experimental/executors/events_executor/events_executor.hpp

+12
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,18 @@ class EventsExecutor : public rclcpp::Executor
9292
void
9393
spin() override;
9494

95+
/**
96+
* \sa rclcpp::Executor:spin() for more details
97+
* \throws std::runtime_error when spin() called while already spinning
98+
* @param exception_handler will be called for every exception in the processing threads
99+
*
100+
* The exception_handler can be called from multiple threads at the same time.
101+
* The exception_handler shall rethrow the exception it if wants to terminate the program.
102+
*/
103+
RCLCPP_PUBLIC
104+
void
105+
spin(const std::function<void(const std::exception & e)> & exception_handler) override;
106+
95107
/// Events executor implementation of spin some
96108
/**
97109
* This non-blocking function will execute the timers and events

rclcpp/include/rclcpp/experimental/timers_manager.hpp

+11-2
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,13 @@ class TimersManager
124124
/**
125125
* @brief Starts a thread that takes care of executing the timers stored in this object.
126126
* Function will throw an error if the timers thread was already running.
127+
*
128+
* @param exception_handler if valid, the execution of the timer will be done in a try catch block,
129+
* and any occurring exception will be passed to the given handler
127130
*/
128131
RCLCPP_PUBLIC
129-
void start();
132+
void start(
133+
const std::function<void(const std::exception & e)> & exception_handler = std::function<void(const std::exception & e)>());
130134

131135
/**
132136
* @brief Stops the timers thread.
@@ -509,6 +513,11 @@ class TimersManager
509513
*/
510514
void run_timers();
511515

516+
/**
517+
* @brief calls run_timers with a try catch block.
518+
*/
519+
void run_timers(const std::function<void(const std::exception & e)> & exception_handler);
520+
512521
/**
513522
* @brief Get the amount of time before the next timer triggers.
514523
* This function is not thread safe, acquire a mutex before calling it.
@@ -526,7 +535,7 @@ class TimersManager
526535
* while keeping the heap correctly sorted.
527536
* This function is not thread safe, acquire the timers_mutex_ before calling it.
528537
*/
529-
void execute_ready_timers_unsafe();
538+
void execute_ready_timers_unsafe(std::function<void(const std::exception & e)> exception_handler);
530539

531540
// Callback to be called when timer is ready
532541
std::function<void(const rclcpp::TimerBase *)> on_ready_callback_;

rclcpp/src/rclcpp/executor.cpp

+66
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,72 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
575575
}
576576
}
577577

578+
template<typename Function>
579+
void execute_guarded(
580+
const Function & fun,
581+
const std::function<void(const std::exception & e)> & exception_handler)
582+
{
583+
try {
584+
fun();
585+
} catch (const std::exception & e) {
586+
RCLCPP_ERROR_STREAM(
587+
rclcpp::get_logger("rclcpp"),
588+
"Exception while spinning : " << e.what());
589+
590+
exception_handler(e);
591+
}
592+
}
593+
594+
void
595+
Executor::execute_any_executable(
596+
AnyExecutable & any_exec,
597+
const std::function<void(const std::exception & e)> & exception_handler)
598+
{
599+
if (!spinning.load()) {
600+
return;
601+
}
602+
603+
if (any_exec.timer) {
604+
TRACETOOLS_TRACEPOINT(
605+
rclcpp_executor_execute,
606+
static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
607+
execute_guarded([&any_exec]() {execute_timer(any_exec.timer);}, exception_handler);
608+
}
609+
if (any_exec.subscription) {
610+
TRACETOOLS_TRACEPOINT(
611+
rclcpp_executor_execute,
612+
static_cast<const void *>(any_exec.subscription->get_subscription_handle().get()));
613+
execute_guarded(
614+
[&any_exec]() {
615+
execute_subscription(any_exec.subscription);
616+
}, exception_handler);
617+
618+
}
619+
if (any_exec.service) {
620+
execute_guarded([&any_exec]() {execute_service(any_exec.service);}, exception_handler);
621+
622+
}
623+
if (any_exec.client) {
624+
execute_guarded([&any_exec]() {execute_client(any_exec.client);}, exception_handler);
625+
626+
}
627+
if (any_exec.waitable) {
628+
execute_guarded([&any_exec]() {any_exec.waitable->execute(any_exec.data);}, exception_handler);
629+
}
630+
// Reset the callback_group, regardless of type
631+
any_exec.callback_group->can_be_taken_from().store(true);
632+
// Wake the wait, because it may need to be recalculated or work that
633+
// was previously blocked is now available.
634+
try {
635+
interrupt_guard_condition_->trigger();
636+
} catch (const rclcpp::exceptions::RCLError & ex) {
637+
throw std::runtime_error(
638+
std::string(
639+
"Failed to trigger guard condition from execute_any_executable: ") + ex.what());
640+
}
641+
}
642+
643+
578644
template<typename Taker, typename Handler>
579645
static
580646
void

rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp

+18-28
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ MultiThreadedExecutor::spin()
5757

5858
void
5959

60-
MultiThreadedExecutor::spin(std::function<void(const std::exception & e)> exception_handler)
60+
MultiThreadedExecutor::spin(const std::function<void(const std::exception & e)> & exception_handler)
6161
{
6262
if (spinning.exchange(true)) {
6363
throw std::runtime_error("spin() called while already spinning");
@@ -88,38 +88,28 @@ MultiThreadedExecutor::get_number_of_threads()
8888
void
8989
MultiThreadedExecutor::run(
9090
size_t this_thread_number,
91-
std::function<void(const std::exception & e)> exception_handler)
91+
const std::function<void(const std::exception & e)> & exception_handler)
9292
{
93-
try {
94-
95-
(void)this_thread_number;
96-
while (rclcpp::ok(this->context_) && spinning.load()) {
97-
rclcpp::AnyExecutable any_exec;
98-
{
99-
std::lock_guard wait_lock{wait_mutex_};
100-
if (!rclcpp::ok(this->context_) || !spinning.load()) {
101-
return;
102-
}
103-
if (!get_next_executable(any_exec, next_exec_timeout_)) {
104-
continue;
105-
}
93+
(void)this_thread_number;
94+
while (rclcpp::ok(this->context_) && spinning.load()) {
95+
rclcpp::AnyExecutable any_exec;
96+
{
97+
std::lock_guard wait_lock{wait_mutex_};
98+
if (!rclcpp::ok(this->context_) || !spinning.load()) {
99+
return;
106100
}
107-
if (yield_before_execute_) {
108-
std::this_thread::yield();
101+
if (!get_next_executable(any_exec, next_exec_timeout_)) {
102+
continue;
109103
}
110-
111-
execute_any_executable(any_exec);
112-
113-
// Clear the callback_group to prevent the AnyExecutable destructor from
114-
// resetting the callback group `can_be_taken_from`
115-
any_exec.callback_group.reset();
104+
}
105+
if (yield_before_execute_) {
106+
std::this_thread::yield();
116107
}
117108

118-
} catch (const std::exception & e) {
119-
RCLCPP_ERROR_STREAM(
120-
rclcpp::get_logger("rclcpp"),
121-
"Exception while spinning : " << e.what());
109+
execute_any_executable(any_exec, exception_handler);
122110

123-
exception_handler(e);
111+
// Clear the callback_group to prevent the AnyExecutable destructor from
112+
// resetting the callback group `can_be_taken_from`
113+
any_exec.callback_group.reset();
124114
}
125115
}

rclcpp/src/rclcpp/executors/single_threaded_executor.cpp

+17
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,20 @@ SingleThreadedExecutor::spin()
3838
}
3939
}
4040
}
41+
42+
43+
void
44+
SingleThreadedExecutor::spin(
45+
const std::function<void(const std::exception & e)> & exception_handler)
46+
{
47+
if (spinning.exchange(true)) {
48+
throw std::runtime_error("spin() called while already spinning");
49+
}
50+
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
51+
while (rclcpp::ok(this->context_) && spinning.load()) {
52+
rclcpp::AnyExecutable any_executable;
53+
if (get_next_executable(any_executable)) {
54+
execute_any_executable(any_executable, exception_handler);
55+
}
56+
}
57+
}

rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp

+35
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,41 @@ StaticSingleThreadedExecutor::spin()
5757
}
5858
}
5959

60+
61+
void
62+
StaticSingleThreadedExecutor::spin(
63+
const std::function<void(const std::exception & e)> & exception_handler)
64+
{
65+
if (spinning.exchange(true)) {
66+
throw std::runtime_error("spin() called while already spinning");
67+
}
68+
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
69+
70+
// Set memory_strategy_ and exec_list_ based on weak_nodes_
71+
// Prepare wait_set_ based on memory_strategy_
72+
entities_collector_->init(&wait_set_, memory_strategy_);
73+
74+
while (rclcpp::ok(this->context_) && spinning.load()) {
75+
// Refresh wait set and wait for work
76+
entities_collector_->refresh_wait_set();
77+
78+
RCLCPP_ERROR_STREAM(
79+
rclcpp::get_logger("rclcpp"),
80+
"Waitset Refresh");
81+
82+
try {
83+
execute_ready_executables();
84+
85+
} catch (const std::exception & e) {
86+
RCLCPP_ERROR_STREAM(
87+
rclcpp::get_logger("rclcpp"),
88+
"Exception while spinning : " << e.what());
89+
90+
exception_handler(e);
91+
}
92+
}
93+
}
94+
6095
void
6196
StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration)
6297
{

rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp

+29
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,35 @@ EventsExecutor::spin()
124124
}
125125
}
126126

127+
void
128+
EventsExecutor::spin(const std::function<void(const std::exception & e)> & exception_handler)
129+
{
130+
if (spinning.exchange(true)) {
131+
throw std::runtime_error("spin() called while already spinning");
132+
}
133+
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
134+
135+
timers_manager_->start(exception_handler);
136+
RCPPUTILS_SCOPE_EXIT(timers_manager_->stop(); );
137+
138+
while (rclcpp::ok(context_) && spinning.load()) {
139+
// Wait until we get an event
140+
ExecutorEvent event;
141+
bool has_event = events_queue_->dequeue(event);
142+
if (has_event) {
143+
try {
144+
this->execute_event(event);
145+
} catch (const std::exception & e) {
146+
RCLCPP_ERROR_STREAM(
147+
rclcpp::get_logger("rclcpp"),
148+
"Exception while spinning : " << e.what());
149+
150+
exception_handler(e);
151+
}
152+
}
153+
}
154+
}
155+
127156
void
128157
EventsExecutor::spin_some(std::chrono::nanoseconds max_duration)
129158
{

0 commit comments

Comments
 (0)