diff --git a/shelfmark/download/orchestrator.py b/shelfmark/download/orchestrator.py index 903bb3e7..888befb2 100644 --- a/shelfmark/download/orchestrator.py +++ b/shelfmark/download/orchestrator.py @@ -56,9 +56,11 @@ # Stall detection - track last activity time per download _last_activity: Dict[str, float] = {} +_last_progress_value: Dict[str, float] = {} # De-duplicate status updates (keep-alive updates shouldn't spam clients) _last_status_event: Dict[str, Tuple[str, Optional[str]]] = {} STALL_TIMEOUT = 300 # 5 minutes without progress/status update = stalled +COORDINATOR_LOOP_ERROR_RETRY_DELAY = 1.0 def _is_plain_email_address(value: str) -> bool: parsed = parseaddr(value or "")[1] @@ -687,9 +689,13 @@ def update_download_progress(book_id: str, progress: float) -> None: """Update download progress with throttled WebSocket broadcasts.""" book_queue.update_progress(book_id, progress) - # Track activity for stall detection + # Only real progress changes should reset stall detection. Repeated keep-alive + # polls at the same percentage must not hide a stuck download forever. with _progress_lock: - _last_activity[book_id] = time.time() + last_progress = _last_progress_value.get(book_id) + if last_progress is None or progress != last_progress: + _last_activity[book_id] = time.time() + _last_progress_value[book_id] = progress # Broadcast progress via WebSocket with throttling if ws_manager: @@ -728,13 +734,11 @@ def update_download_status(book_id: str, status: str, message: Optional[str] = N except ValueError: return - # Always update activity timestamp (used by stall detection) even if the status - # event is a duplicate keep-alive update. with _progress_lock: - _last_activity[book_id] = time.time() status_event = (status_key, message) if _last_status_event.get(book_id) == status_event: return + _last_activity[book_id] = time.time() _last_status_event[book_id] = status_event # Update status message first so terminal snapshots capture the final message @@ -812,6 +816,7 @@ def _cleanup_progress_tracking(task_id: str) -> None: _progress_last_broadcast.pop(task_id, None) _progress_last_broadcast.pop(f"{task_id}_progress", None) _last_activity.pop(task_id, None) + _last_progress_value.pop(task_id, None) _last_status_event.pop(task_id, None) @@ -892,70 +897,77 @@ def concurrent_download_loop() -> None: stalled_tasks: set[str] = set() # Track tasks already cancelled due to stall while True: - # Clean up completed futures - completed_futures = [f for f in active_futures if f.done()] - for future in completed_futures: - task_id = active_futures.pop(future) - stalled_tasks.discard(task_id) - try: - future.result() # This will raise any exceptions from the worker - except Exception as e: - logger.error_trace(f"Future exception for {task_id}: {e}") - - # Check for stalled downloads (no activity in STALL_TIMEOUT seconds) - current_time = time.time() - with _progress_lock: - for future, task_id in list(active_futures.items()): - if task_id in stalled_tasks: - continue - last_active = _last_activity.get(task_id, current_time) - if current_time - last_active > STALL_TIMEOUT: - logger.warning(f"Download stalled for {task_id}, cancelling") - book_queue.cancel_download(task_id) - book_queue.update_status_message(task_id, f"Download stalled (no activity for {STALL_TIMEOUT}s)") - stalled_tasks.add(task_id) - - # Start new downloads if we have capacity - while len(active_futures) < max_workers: - next_download = book_queue.get_next() - if not next_download: - break - - # Stagger concurrent downloads to avoid rate limiting on shared download servers - # Only delay if other downloads are already active - if active_futures: - stagger_delay = random.uniform(2, 5) - logger.debug(f"Staggering download start by {stagger_delay:.1f}s") - time.sleep(stagger_delay) - - task_id, cancel_flag = next_download - - # Submit download job to thread pool - future = executor.submit(_process_single_download, task_id, cancel_flag) - active_futures[future] = task_id - - # Brief sleep to prevent busy waiting - time.sleep(config.MAIN_LOOP_SLEEP_TIME) + try: + # Clean up completed futures + completed_futures = [f for f in active_futures if f.done()] + for future in completed_futures: + task_id = active_futures.pop(future) + stalled_tasks.discard(task_id) + try: + future.result() # This will raise any exceptions from the worker + except Exception as e: + logger.error_trace(f"Future exception for {task_id}: {e}") + + # Check for stalled downloads (no activity in STALL_TIMEOUT seconds) + current_time = time.time() + with _progress_lock: + for future, task_id in list(active_futures.items()): + if task_id in stalled_tasks: + continue + last_active = _last_activity.get(task_id, current_time) + if current_time - last_active > STALL_TIMEOUT: + logger.warning(f"Download stalled for {task_id}, cancelling") + book_queue.cancel_download(task_id) + book_queue.update_status_message(task_id, f"Download stalled (no activity for {STALL_TIMEOUT}s)") + stalled_tasks.add(task_id) + + # Start new downloads if we have capacity + while len(active_futures) < max_workers: + next_download = book_queue.get_next() + if not next_download: + break + + # Stagger concurrent downloads to avoid rate limiting on shared download servers + # Only delay if other downloads are already active + if active_futures: + stagger_delay = random.uniform(2, 5) + logger.debug(f"Staggering download start by {stagger_delay:.1f}s") + time.sleep(stagger_delay) + + task_id, cancel_flag = next_download + + # Submit download job to thread pool + future = executor.submit(_process_single_download, task_id, cancel_flag) + active_futures[future] = task_id + + # Brief sleep to prevent busy waiting + time.sleep(config.MAIN_LOOP_SLEEP_TIME) + except Exception as e: + logger.error_trace("Download coordinator loop error: %s", e) + time.sleep(COORDINATOR_LOOP_ERROR_RETRY_DELAY) # Download coordinator thread (started explicitly via start()) _coordinator_thread: Optional[threading.Thread] = None -_started = False +_coordinator_lock = Lock() def start() -> None: """Start the download coordinator thread. Safe to call multiple times.""" - global _coordinator_thread, _started + global _coordinator_thread - if _started: - logger.debug("Download coordinator already started") - return + with _coordinator_lock: + if _coordinator_thread is not None and _coordinator_thread.is_alive(): + logger.debug("Download coordinator already started") + return - _coordinator_thread = threading.Thread( - target=concurrent_download_loop, - daemon=True, - name="DownloadCoordinator" - ) - _coordinator_thread.start() - _started = True + if _coordinator_thread is not None: + logger.warning("Download coordinator thread is not alive; starting a new one") + + _coordinator_thread = threading.Thread( + target=concurrent_download_loop, + daemon=True, + name="DownloadCoordinator" + ) + _coordinator_thread.start() logger.info(f"Download coordinator started with {config.MAX_CONCURRENT_DOWNLOADS} concurrent workers") diff --git a/tests/download/test_orchestrator_lifecycle.py b/tests/download/test_orchestrator_lifecycle.py new file mode 100644 index 00000000..6d4513c1 --- /dev/null +++ b/tests/download/test_orchestrator_lifecycle.py @@ -0,0 +1,151 @@ +from __future__ import annotations + +from unittest.mock import ANY, MagicMock + +import pytest + + +class _StopLoop(BaseException): + """Sentinel used to stop the infinite coordinator loop during tests.""" + + +class _FakeExecutor: + def __init__(self, *args, **kwargs) -> None: + self.args = args + self.kwargs = kwargs + + def __enter__(self) -> _FakeExecutor: + return self + + def __exit__(self, exc_type, exc, tb) -> bool: + return False + + def submit(self, *args, **kwargs): # pragma: no cover - not expected in these tests + raise AssertionError("submit() should not be called in this test") + + +class _StopCoordinator(BaseException): + """Sentinel used to stop a real coordinator thread cleanly in tests.""" + + +def test_concurrent_download_loop_logs_and_recovers_after_loop_error(monkeypatch): + import shelfmark.download.orchestrator as orchestrator + + call_count = 0 + + def fake_get_next(): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise RuntimeError("boom") + return None + + sleep_delays: list[float] = [] + + def fake_sleep(delay: float) -> None: + sleep_delays.append(delay) + if len(sleep_delays) >= 2: + raise _StopLoop() + + mock_queue = MagicMock() + mock_queue.get_next.side_effect = fake_get_next + + error_trace = MagicMock() + + monkeypatch.setattr(orchestrator, "book_queue", mock_queue) + monkeypatch.setattr(orchestrator, "ThreadPoolExecutor", _FakeExecutor) + monkeypatch.setattr(orchestrator.time, "sleep", fake_sleep) + monkeypatch.setattr(orchestrator.logger, "error_trace", error_trace) + + with pytest.raises(_StopLoop): + orchestrator.concurrent_download_loop() + + assert mock_queue.get_next.call_count == 2 + error_trace.assert_called_once_with("Download coordinator loop error: %s", ANY) + assert sleep_delays == [ + orchestrator.COORDINATOR_LOOP_ERROR_RETRY_DELAY, + orchestrator.config.MAIN_LOOP_SLEEP_TIME, + ] + + +def test_concurrent_download_loop_recovers_and_processes_task_after_transient_loop_error(monkeypatch): + import threading + + import shelfmark.download.orchestrator as orchestrator + + processed = threading.Event() + + class FlakyQueue: + def __init__(self) -> None: + self.calls = 0 + + def get_next(self): + self.calls += 1 + if self.calls == 1: + raise RuntimeError("boom") + if self.calls == 2: + return ("task-1", threading.Event()) + if processed.is_set(): + raise _StopCoordinator() + return None + + def cancel_download(self, task_id: str) -> None: # pragma: no cover - unused + raise AssertionError(f"cancel_download unexpectedly called for {task_id}") + + def update_status_message(self, task_id: str, message: str) -> None: # pragma: no cover - unused + raise AssertionError(f"update_status_message unexpectedly called for {task_id}: {message}") + + queue = FlakyQueue() + error_trace = MagicMock() + + monkeypatch.setattr(orchestrator, "book_queue", queue) + monkeypatch.setattr( + orchestrator, + "_process_single_download", + lambda task_id, cancel_flag: processed.set(), + ) + monkeypatch.setattr(orchestrator.logger, "error_trace", error_trace) + monkeypatch.setattr(orchestrator, "COORDINATOR_LOOP_ERROR_RETRY_DELAY", 0.01) + monkeypatch.setattr(orchestrator.config, "MAX_CONCURRENT_DOWNLOADS", 1, raising=False) + monkeypatch.setattr(orchestrator.config, "MAIN_LOOP_SLEEP_TIME", 0.01, raising=False) + + def run_loop() -> None: + try: + orchestrator.concurrent_download_loop() + except _StopCoordinator: + pass + + thread = threading.Thread(target=run_loop, daemon=True, name="TestDownloadCoordinator") + thread.start() + + assert processed.wait(timeout=1.0) is True + thread.join(timeout=1.0) + + assert thread.is_alive() is False + assert queue.calls >= 3 + error_trace.assert_called_once_with("Download coordinator loop error: %s", ANY) + + +def test_start_replaces_dead_coordinator_thread(monkeypatch): + import shelfmark.download.orchestrator as orchestrator + + dead_thread = MagicMock() + dead_thread.is_alive.return_value = False + + new_thread = MagicMock() + new_thread.is_alive.return_value = True + + thread_factory = MagicMock(return_value=new_thread) + + monkeypatch.setattr(orchestrator, "_coordinator_thread", dead_thread) + monkeypatch.setattr(orchestrator.threading, "Thread", thread_factory) + + orchestrator.start() + + thread_factory.assert_called_once_with( + target=orchestrator.concurrent_download_loop, + daemon=True, + name="DownloadCoordinator", + ) + new_thread.start.assert_called_once_with() + assert orchestrator._coordinator_thread is new_thread diff --git a/tests/download/test_orchestrator_status.py b/tests/download/test_orchestrator_status.py index 2856d9c0..96b47069 100644 --- a/tests/download/test_orchestrator_status.py +++ b/tests/download/test_orchestrator_status.py @@ -8,6 +8,7 @@ def test_update_download_status_dedupes_identical_events(monkeypatch): # Ensure clean module-level state orchestrator._last_activity.clear() + orchestrator._last_progress_value.clear() orchestrator._last_status_event.clear() mock_queue = MagicMock() @@ -28,6 +29,52 @@ def test_update_download_status_dedupes_identical_events(monkeypatch): assert mock_queue.update_status_message.call_count == 1 assert mock_ws.broadcast_status_update.call_count == 1 - # Activity timestamp should still be updated on the duplicate keep-alive call. - assert orchestrator._last_activity[book_id] == 2.0 + # Duplicate keep-alives should not refresh stall activity. + assert orchestrator._last_activity[book_id] == 1.0 + +def test_update_download_progress_dedupes_identical_progress_for_activity(monkeypatch): + import shelfmark.download.orchestrator as orchestrator + + book_id = "test-progress-book" + + orchestrator._last_activity.clear() + orchestrator._last_progress_value.clear() + orchestrator._last_status_event.clear() + + mock_queue = MagicMock() + monkeypatch.setattr(orchestrator, "book_queue", mock_queue) + monkeypatch.setattr(orchestrator, "ws_manager", None) + + times = iter([10.0, 20.0]) + monkeypatch.setattr(orchestrator.time, "time", lambda: next(times)) + + orchestrator.update_download_progress(book_id, 0.0) + orchestrator.update_download_progress(book_id, 0.0) + + assert mock_queue.update_progress.call_count == 2 + assert orchestrator._last_activity[book_id] == 10.0 + assert orchestrator._last_progress_value[book_id] == 0.0 + + +def test_update_download_progress_refreshes_activity_when_progress_changes(monkeypatch): + import shelfmark.download.orchestrator as orchestrator + + book_id = "test-progress-change" + + orchestrator._last_activity.clear() + orchestrator._last_progress_value.clear() + orchestrator._last_status_event.clear() + + mock_queue = MagicMock() + monkeypatch.setattr(orchestrator, "book_queue", mock_queue) + monkeypatch.setattr(orchestrator, "ws_manager", None) + + times = iter([30.0, 40.0]) + monkeypatch.setattr(orchestrator.time, "time", lambda: next(times)) + + orchestrator.update_download_progress(book_id, 0.0) + orchestrator.update_download_progress(book_id, 0.5) + + assert orchestrator._last_activity[book_id] == 40.0 + assert orchestrator._last_progress_value[book_id] == 0.5