Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions src/ansys/hps/data_transfer/client/api/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ def __init__(self):
"""Initializes the WaitHandler class object."""
self.start = time.time()
self.report_threshold = 2.0 # seconds
self.min_progress_interval = 3.0 # seconds
self.last_progress = self.start
self.min_progress_interval = 15.0 # seconds
self.last_progress = {}
self.completed_ids = []

def __call__(self, ops: list[Operation]):
"""Handle operations after they are fetched."""
Expand All @@ -60,9 +61,15 @@ def _log_ops(self, ops: list[Operation]) -> str:
for op in ops:
if op.children_detail is not None and self.Meta.expand_group:
for ch in op.children_detail or []:
# For ops with lots of children, avoid logging completed children on every loop iteration.
# If there are 29 small files and 1 large file in the group, we don't want to log that the
# 29 small files are done 1000 times while the large file is still copying.
if ch.id not in self.completed_ids:
self._log_op(logging.DEBUG, ch)
if ch.state not in self.final:
num_running += 1
self._log_op(logging.DEBUG, ch)
elif ch.id not in self.completed_ids:
self.completed_ids.append(ch.id)

if op.state not in self.final:
num_running += 1
Expand All @@ -73,7 +80,7 @@ def _log_op(self, lvl: int, op: Operation):
op_type = "operation" if op.children is None or len(op.children) == 0 else "operation group"
op_done = op.state in self.final

msg = f"Data transfer {op_type} '{op.description}'({op.id}) done? {op_done}"
msg = f"Data transfer {op_type} '{op.description}'({op.id}) {'finished. ' if op_done else 'is in progress. '}"

try:
start = op.started_at
Expand All @@ -88,17 +95,22 @@ def _log_op(self, lvl: int, op: Operation):
duration = 0
duration_str = "unknown"

# Initialize last progress time if not set, set it back in time
# so it logs right away (report threshold) the first time.
if self.last_progress.get(op.id, None) is None:
self.last_progress[op.id] = time.time() - self.min_progress_interval

state = op.state.value
if op_done:
msg += f" has {state} after {duration_str}"
msg += f"{state.title()} after {duration_str}"
msg += self._info_str(op)
# if op.messages:
# msg += f', messages="{"; ".join(op.messages)}"'
log.log(lvl, msg)
elif duration > self.report_threshold and time.time() - self.last_progress > self.min_progress_interval:
self.last_progress = time.time()
msg += f" is {state}, {duration_str} so far"
if op.progress_current > 0:
elif duration > self.report_threshold and time.time() - self.last_progress[op.id] > self.min_progress_interval:
self.last_progress[op.id] = time.time()
msg += f"{state.title()} for {duration_str}"
if op.progress and op.progress > 0.0:
msg += f", progress {op.progress * 100.0:.1f}%"
msg += self._info_str(op)
log.log(lvl, msg)
Expand Down