Skip to content

Commit 2571116

Browse files
authored
Send task state monitoring record as part of general state update (#4034)
Previous PR #4032 should make this PR a straightforward refactoring. After this PR, _send_task_info invocations and monitoring TASK_INFO message sends now happen in only one place. # Changed Behaviour none ## Type of change - Code maintenance/cleanup
1 parent 76cc604 commit 2571116

File tree

1 file changed

+6
-12
lines changed

1 file changed

+6
-12
lines changed

parsl/dataflow/dflow.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -359,14 +359,12 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
359359

360360
# record the final state for this try before we mutate for retries
361361
self._update_task_state(task_record, States.fail_retryable)
362-
self._send_task_info(task_record)
363362

364363
task_record['try_id'] += 1
365364
task_record['try_time_launched'] = None
366365
task_record['try_time_returned'] = None
367366
task_record['fail_history'] = []
368367
self._update_task_state(task_record, States.pending)
369-
self._send_task_info(task_record)
370368

371369
logger.info("Task {} marked for retry".format(task_id))
372370

@@ -395,19 +393,16 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
395393
task_record['joins'] = joinable
396394
task_record['join_lock'] = threading.Lock()
397395
self._update_task_state(task_record, States.joining)
398-
self._send_task_info(task_record)
399396
joinable.add_done_callback(partial(self.handle_join_update, task_record))
400397
elif joinable == []: # got a list, but it had no entries, and specifically, no Futures.
401398
task_record['joins'] = joinable
402399
task_record['join_lock'] = threading.Lock()
403400
self._update_task_state(task_record, States.joining)
404-
self._send_task_info(task_record)
405401
self.handle_join_update(task_record, None)
406402
elif isinstance(joinable, list) and [j for j in joinable if not isinstance(j, Future)] == []:
407403
task_record['joins'] = joinable
408404
task_record['join_lock'] = threading.Lock()
409405
self._update_task_state(task_record, States.joining)
410-
self._send_task_info(task_record)
411406
for inner_future in joinable:
412407
inner_future.add_done_callback(partial(self.handle_join_update, task_record))
413408
else:
@@ -515,7 +510,6 @@ def _complete_task_result(self, task_record: TaskRecord, new_state: States, resu
515510
self.memoizer.update_memo_result(task_record, result)
516511

517512
self._update_task_state(task_record, new_state)
518-
self._send_task_info(task_record)
519513

520514
self.wipe_task(task_record['id'])
521515

@@ -535,16 +529,17 @@ def _complete_task_exception(self, task_record: TaskRecord, new_state: States, e
535529
self.memoizer.update_memo_exception(task_record, exception)
536530

537531
self._update_task_state(task_record, new_state)
538-
self._send_task_info(task_record)
539532

540533
self.wipe_task(task_record['id'])
541534

542535
with task_record['app_fu']._update_lock:
543536
task_record['app_fu'].set_exception(exception)
544537

545538
def _update_task_state(self, task_record: TaskRecord, new_state: States) -> None:
546-
"""Updates a task record state, and recording an appropriate change
547-
to task state counters.
539+
"""Updates a task record state including accompanying consistency-keeping:
540+
541+
* record change in task state counters
542+
* record change in monitoring
548543
"""
549544

550545
with self.task_state_counts_lock:
@@ -553,6 +548,8 @@ def _update_task_state(self, task_record: TaskRecord, new_state: States) -> None
553548
self.task_state_counts[new_state] += 1
554549
task_record['status'] = new_state
555550

551+
self._send_task_info(task_record)
552+
556553
@staticmethod
557554
def _unwrap_remote_exception_wrapper(future: Future) -> Any:
558555
result = future.result()
@@ -705,7 +702,6 @@ def launch_task(self, task_record: TaskRecord) -> Future:
705702
exec_fu = executor.submit(function, task_record['resource_specification'], *args, **kwargs)
706703

707704
self._update_task_state(task_record, States.launched)
708-
self._send_task_info(task_record)
709705

710706
if hasattr(exec_fu, "parsl_executor_task_id"):
711707
logger.info(
@@ -1025,9 +1021,7 @@ def submit(self,
10251021
waiting_message))
10261022

10271023
logger.debug("Task {} set to pending state with AppFuture: {}".format(task_id, task_record['app_fu']))
1028-
10291024
self._update_task_state(task_record, States.pending)
1030-
self._send_task_info(task_record)
10311025

10321026
assert task_id not in self.tasks
10331027
self.tasks[task_id] = task_record

0 commit comments

Comments
 (0)