Skip to content

Commit 6b093f4

Browse files
authored
Fix race condition at exit of monitoring filesystem radio (#4041)
This was uncovered by removal of an unrelated sleep in PR #4037, but I think it will present itself (as missing monitoring data, rather than as an error/exception) in user-facing code. ### Prior to this PR A monitoring message could be written to the new_dir and then shortly after the exit event could be set, sufficiently close in time that the monitoring radio receiver loop exited without seeing those new message files. This PR modifies the exit behaviour of that loop to have one final iteration with the following ordering of events: 1. monitoring messages are written 2. task completes 3. parsl begins to shut down 4. monitoring radio exit event is set by DFK 5. monitoring radio loop observes exit event ### As of this PR 6. monitoring radio loop performs one final processing of directory The new behaviour here is step 6, that a final directory processing will always happen strictly after the exit event is set, which is strictly after the monitoring messages are written in step 1, assuming directories are consistently observable from different places in the filesystem. The misbehaviour can be observed by increasing the delay time of the loop before this PR (for example to 10 seconds) and running the test suite. With this race condition addressed, the loop poll period can be made longer and this PR arbitrarily increases it from 1 second to 10 seconds - although it could also be made configurable. # Changed Behaviour I expect some situations where end of task monitoring data may have been missing to now not be missing that data. ## Type of change - Bug fix
1 parent 2571116 commit 6b093f4

File tree

1 file changed

+14
-3
lines changed

1 file changed

+14
-3
lines changed

parsl/monitoring/radios/filesystem_router.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import logging
44
import os
55
import pickle
6-
import time
76
from multiprocessing.queues import Queue
87
from multiprocessing.synchronize import Event
98
from typing import cast
@@ -18,6 +17,9 @@
1817

1918
logger = logging.getLogger(__name__)
2019

20+
# how often the router will scan the new message directory
21+
POLL_PERIOD_S = 10
22+
2123

2224
@wrap_with_logs
2325
def filesystem_router_starter(*, q: Queue[TaggedMonitoringMessage], run_dir: str, exit_event: Event) -> None:
@@ -36,9 +38,19 @@ def filesystem_router_starter(*, q: Queue[TaggedMonitoringMessage], run_dir: str
3638
os.makedirs(tmp_dir, exist_ok=True)
3739
os.makedirs(new_dir, exist_ok=True)
3840

39-
while not exit_event.is_set():
41+
loop = True
42+
43+
while loop:
4044
logger.debug("Start filesystem radio receiver loop")
4145

46+
# this happens before the final poll of the directory so that
47+
# one complete pass over the new_dir will happen strictly after
48+
# the exit_event is set. Without forcing that final pass, there
49+
# can be a race between exiting on exit_event after the files
50+
# are added.
51+
if exit_event.wait(POLL_PERIOD_S):
52+
loop = False
53+
4254
# iterate over files in new_dir
4355
for filename in os.listdir(new_dir):
4456
try:
@@ -53,7 +65,6 @@ def filesystem_router_starter(*, q: Queue[TaggedMonitoringMessage], run_dir: str
5365
except Exception:
5466
logger.exception("Exception processing %s - probably will be retried next iteration", filename)
5567

56-
time.sleep(1) # whats a good time for this poll?
5768
logger.info("Ending filesystem radio receiver")
5869

5970

0 commit comments

Comments
 (0)