Skip to content

Commit 4f32af5

Browse files
authored
Refactor launch service run_async loop to wait on futures and queued events (ros2#449)
Fixes ros2/launch_ros#169 Otherwise, it's possible to get into a hung state where we wait for an event, even though there are no more events. This is because the check for an "idle" state evaluates to "True" as we wait for some futures to complete. By waiting for futures and events concurrently, we can avoid this problem. Further, we don't have to wait for an event if there's nothing in the queue. Signed-off-by: Jacob Perron <[email protected]> * Wait until one event is processed or nothing done (timeout) Signed-off-by: Jacob Perron <[email protected]> * Fix bugs Always create a task to wait on and cancel it if there is a timeout. Signed-off-by: Jacob Perron <[email protected]> * Avoid canceling an event mid-processing Signed-off-by: Jacob Perron <[email protected]> * minor refactor Signed-off-by: Jacob Perron <[email protected]> * Guard against leaving a task pending Signed-off-by: Jacob Perron <[email protected]> * Further refactoring We don't need to have an inner loop or timeout when waiting on futures. Signed-off-by: Jacob Perron <[email protected]> * Only wait on futures if there are no events in the queue This prevents spurious wake-ups when we're waiting for an event to finish processing. Signed-off-by: Jacob Perron <[email protected]>
1 parent f3f0d69 commit 4f32af5

File tree

1 file changed

+29
-19
lines changed

1 file changed

+29
-19
lines changed

launch/launch/launch_service.py

+29-19
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ def _on_exception(loop, context):
329329
return loop.default_exception_handler(context)
330330
this_loop.set_exception_handler(_on_exception)
331331

332+
process_one_event_task = None
332333
while True:
333334
try:
334335
# Check if we're idle, i.e. no on-going entities (actions) or events in
@@ -338,28 +339,37 @@ def _on_exception(loop, context):
338339
ret = await self._shutdown(reason='idle', due_to_sigint=False)
339340
assert ret is None, ret
340341
continue
341-
process_one_event_task = this_loop.create_task(self._process_one_event())
342-
if self.__shutting_down:
343-
# If shutting down and idle then we're done.
344-
if is_idle:
342+
343+
# Stop running if we're shutting down and there's no more work
344+
if self.__shutting_down and is_idle:
345+
if (
346+
process_one_event_task is not None and
347+
not process_one_event_task.done()
348+
):
345349
process_one_event_task.cancel()
346-
break
350+
break
351+
352+
# Collect futures to wait on
353+
# We only need to wait on futures if there are no events to wait on
354+
entity_futures = []
355+
if self.__context._event_queue.empty():
347356
entity_futures = [pair[1] for pair in self._entity_future_pairs]
348-
entity_futures.append(process_one_event_task)
349357
entity_futures.extend(self.__context._completion_futures)
350-
done = set() # type: Set[asyncio.Future]
351-
while not done:
352-
done, pending = await asyncio.wait(
353-
entity_futures,
354-
timeout=1.0,
355-
return_when=asyncio.FIRST_COMPLETED
356-
)
357-
if not done:
358-
self.__logger.debug(
359-
'still waiting on futures: {}'.format(entity_futures)
360-
)
361-
else:
362-
await process_one_event_task
358+
359+
# If the current task is done, create a new task to process any events
360+
# in the queue
361+
if process_one_event_task is None or process_one_event_task.done():
362+
process_one_event_task = this_loop.create_task(self._process_one_event())
363+
364+
# Add the process event task to the list of awaitables
365+
entity_futures.append(process_one_event_task)
366+
367+
# Wait on events and futures
368+
await asyncio.wait(
369+
entity_futures,
370+
return_when=asyncio.FIRST_COMPLETED
371+
)
372+
363373
except KeyboardInterrupt:
364374
continue
365375
except asyncio.CancelledError:

0 commit comments

Comments
 (0)