Skip to content

Commit

Permalink
return task states from run_blockwise instead of just a boolean for…
Browse files Browse the repository at this point in the history
… whether or not all blocks finished.

This allows finer grained views into how many succeeded vs failed vs skipped etc.
  • Loading branch information
pattonw committed Oct 24, 2024
1 parent cd6e671 commit 052cc50
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
3 changes: 2 additions & 1 deletion daisy/convenience.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ def run_blockwise(tasks, multiprocessing=True):
def _run_blockwise(tasks, stop_event):
server = Server(stop_event=stop_event)
cl_monitor = CLMonitor(server) # noqa
return server.run_blockwise(tasks)
task_states = server.run_blockwise(tasks)
return all(task_state.is_done() for task_state in task_states.values())
8 changes: 4 additions & 4 deletions daisy/serial_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .block import BlockStatus
from .scheduler import Scheduler
from .task_state import TaskState
from .server_observer import ServerObservee
import logging

Expand All @@ -10,7 +11,7 @@ class SerialServer(ServerObservee):
def __init__(self):
super().__init__()

def run_blockwise(self, tasks, scheduler=None):
def run_blockwise(self, tasks, scheduler=None) -> dict[str, TaskState]:
if scheduler is None:
scheduler = Scheduler(tasks)
else:
Expand Down Expand Up @@ -60,6 +61,5 @@ def run_blockwise(self, tasks, scheduler=None):
del process_funcs[block.task_id]

if len(process_funcs) == 0:
return True

self.notify_server_exit()
self.notify_server_exit()
return scheduler.task_states
5 changes: 3 additions & 2 deletions daisy/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
UnexpectedMessage,
)
from .scheduler import Scheduler
from .task_state import TaskState
from .server_observer import ServerObservee
from .task_worker_pools import TaskWorkerPools
from .tcp import TCPServer
Expand Down Expand Up @@ -40,7 +41,7 @@ def __init__(self, stop_event=None):

logger.debug("Started server listening at %s:%s", self.hostname, self.port)

def run_blockwise(self, tasks, scheduler=None):
def run_blockwise(self, tasks, scheduler=None) -> dict[str, TaskState]:

if scheduler is None:
self.scheduler = Scheduler(tasks)
Expand Down Expand Up @@ -68,7 +69,7 @@ def run_blockwise(self, tasks, scheduler=None):
logger.debug("TCP streams closed.")
self.notify_server_exit()

return True if self.all_done else False
return self.scheduler.task_states

def _event_loop(self):
last_time = time()
Expand Down

0 comments on commit 052cc50

Please sign in to comment.