Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 73 additions & 61 deletions shelfmark/download/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@

# Stall detection - track last activity time per download
_last_activity: Dict[str, float] = {}
_last_progress_value: Dict[str, float] = {}
# De-duplicate status updates (keep-alive updates shouldn't spam clients)
_last_status_event: Dict[str, Tuple[str, Optional[str]]] = {}
STALL_TIMEOUT = 300 # 5 minutes without progress/status update = stalled
COORDINATOR_LOOP_ERROR_RETRY_DELAY = 1.0

def _is_plain_email_address(value: str) -> bool:
parsed = parseaddr(value or "")[1]
Expand Down Expand Up @@ -687,9 +689,13 @@ def update_download_progress(book_id: str, progress: float) -> None:
"""Update download progress with throttled WebSocket broadcasts."""
book_queue.update_progress(book_id, progress)

# Track activity for stall detection
# Only real progress changes should reset stall detection. Repeated keep-alive
# polls at the same percentage must not hide a stuck download forever.
with _progress_lock:
_last_activity[book_id] = time.time()
last_progress = _last_progress_value.get(book_id)
if last_progress is None or progress != last_progress:
_last_activity[book_id] = time.time()
_last_progress_value[book_id] = progress

# Broadcast progress via WebSocket with throttling
if ws_manager:
Expand Down Expand Up @@ -728,13 +734,11 @@ def update_download_status(book_id: str, status: str, message: Optional[str] = N
except ValueError:
return

# Always update activity timestamp (used by stall detection) even if the status
# event is a duplicate keep-alive update.
with _progress_lock:
_last_activity[book_id] = time.time()
status_event = (status_key, message)
if _last_status_event.get(book_id) == status_event:
return
_last_activity[book_id] = time.time()
_last_status_event[book_id] = status_event

# Update status message first so terminal snapshots capture the final message
Expand Down Expand Up @@ -812,6 +816,7 @@ def _cleanup_progress_tracking(task_id: str) -> None:
_progress_last_broadcast.pop(task_id, None)
_progress_last_broadcast.pop(f"{task_id}_progress", None)
_last_activity.pop(task_id, None)
_last_progress_value.pop(task_id, None)
_last_status_event.pop(task_id, None)


Expand Down Expand Up @@ -892,70 +897,77 @@ def concurrent_download_loop() -> None:
stalled_tasks: set[str] = set() # Track tasks already cancelled due to stall

while True:
# Clean up completed futures
completed_futures = [f for f in active_futures if f.done()]
for future in completed_futures:
task_id = active_futures.pop(future)
stalled_tasks.discard(task_id)
try:
future.result() # This will raise any exceptions from the worker
except Exception as e:
logger.error_trace(f"Future exception for {task_id}: {e}")

# Check for stalled downloads (no activity in STALL_TIMEOUT seconds)
current_time = time.time()
with _progress_lock:
for future, task_id in list(active_futures.items()):
if task_id in stalled_tasks:
continue
last_active = _last_activity.get(task_id, current_time)
if current_time - last_active > STALL_TIMEOUT:
logger.warning(f"Download stalled for {task_id}, cancelling")
book_queue.cancel_download(task_id)
book_queue.update_status_message(task_id, f"Download stalled (no activity for {STALL_TIMEOUT}s)")
stalled_tasks.add(task_id)

# Start new downloads if we have capacity
while len(active_futures) < max_workers:
next_download = book_queue.get_next()
if not next_download:
break

# Stagger concurrent downloads to avoid rate limiting on shared download servers
# Only delay if other downloads are already active
if active_futures:
stagger_delay = random.uniform(2, 5)
logger.debug(f"Staggering download start by {stagger_delay:.1f}s")
time.sleep(stagger_delay)

task_id, cancel_flag = next_download

# Submit download job to thread pool
future = executor.submit(_process_single_download, task_id, cancel_flag)
active_futures[future] = task_id

# Brief sleep to prevent busy waiting
time.sleep(config.MAIN_LOOP_SLEEP_TIME)
try:
# Clean up completed futures
completed_futures = [f for f in active_futures if f.done()]
for future in completed_futures:
task_id = active_futures.pop(future)
stalled_tasks.discard(task_id)
try:
future.result() # This will raise any exceptions from the worker
except Exception as e:
logger.error_trace(f"Future exception for {task_id}: {e}")

# Check for stalled downloads (no activity in STALL_TIMEOUT seconds)
current_time = time.time()
with _progress_lock:
for future, task_id in list(active_futures.items()):
if task_id in stalled_tasks:
continue
last_active = _last_activity.get(task_id, current_time)
if current_time - last_active > STALL_TIMEOUT:
logger.warning(f"Download stalled for {task_id}, cancelling")
book_queue.cancel_download(task_id)
book_queue.update_status_message(task_id, f"Download stalled (no activity for {STALL_TIMEOUT}s)")
stalled_tasks.add(task_id)

# Start new downloads if we have capacity
while len(active_futures) < max_workers:
next_download = book_queue.get_next()
if not next_download:
break

# Stagger concurrent downloads to avoid rate limiting on shared download servers
# Only delay if other downloads are already active
if active_futures:
stagger_delay = random.uniform(2, 5)
logger.debug(f"Staggering download start by {stagger_delay:.1f}s")
time.sleep(stagger_delay)

task_id, cancel_flag = next_download

# Submit download job to thread pool
future = executor.submit(_process_single_download, task_id, cancel_flag)
active_futures[future] = task_id

# Brief sleep to prevent busy waiting
time.sleep(config.MAIN_LOOP_SLEEP_TIME)
except Exception as e:
logger.error_trace("Download coordinator loop error: %s", e)
time.sleep(COORDINATOR_LOOP_ERROR_RETRY_DELAY)

# Download coordinator thread (started explicitly via start())
_coordinator_thread: Optional[threading.Thread] = None
_started = False
_coordinator_lock = Lock()


def start() -> None:
"""Start the download coordinator thread. Safe to call multiple times."""
global _coordinator_thread, _started
global _coordinator_thread

if _started:
logger.debug("Download coordinator already started")
return
with _coordinator_lock:
if _coordinator_thread is not None and _coordinator_thread.is_alive():
logger.debug("Download coordinator already started")
return

_coordinator_thread = threading.Thread(
target=concurrent_download_loop,
daemon=True,
name="DownloadCoordinator"
)
_coordinator_thread.start()
_started = True
if _coordinator_thread is not None:
logger.warning("Download coordinator thread is not alive; starting a new one")

_coordinator_thread = threading.Thread(
target=concurrent_download_loop,
daemon=True,
name="DownloadCoordinator"
)
_coordinator_thread.start()

logger.info(f"Download coordinator started with {config.MAX_CONCURRENT_DOWNLOADS} concurrent workers")
151 changes: 151 additions & 0 deletions tests/download/test_orchestrator_lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
from __future__ import annotations

from unittest.mock import ANY, MagicMock

import pytest


class _StopLoop(BaseException):
"""Sentinel used to stop the infinite coordinator loop during tests."""


class _FakeExecutor:
def __init__(self, *args, **kwargs) -> None:
self.args = args
self.kwargs = kwargs

def __enter__(self) -> _FakeExecutor:
return self

def __exit__(self, exc_type, exc, tb) -> bool:
return False

def submit(self, *args, **kwargs): # pragma: no cover - not expected in these tests
raise AssertionError("submit() should not be called in this test")


class _StopCoordinator(BaseException):
"""Sentinel used to stop a real coordinator thread cleanly in tests."""


def test_concurrent_download_loop_logs_and_recovers_after_loop_error(monkeypatch):
    """A transient queue failure is logged and retried without killing the loop."""
    import shelfmark.download.orchestrator as orchestrator

    attempts = {"count": 0}

    def flaky_get_next():
        # Fail exactly once, then report an empty queue.
        attempts["count"] += 1
        if attempts["count"] == 1:
            raise RuntimeError("boom")
        return None

    observed_delays: list[float] = []

    def recording_sleep(seconds: float) -> None:
        observed_delays.append(seconds)
        if len(observed_delays) >= 2:
            raise _StopLoop()

    queue_stub = MagicMock()
    queue_stub.get_next.side_effect = flaky_get_next

    error_trace = MagicMock()

    monkeypatch.setattr(orchestrator, "book_queue", queue_stub)
    monkeypatch.setattr(orchestrator, "ThreadPoolExecutor", _FakeExecutor)
    monkeypatch.setattr(orchestrator.time, "sleep", recording_sleep)
    monkeypatch.setattr(orchestrator.logger, "error_trace", error_trace)

    with pytest.raises(_StopLoop):
        orchestrator.concurrent_download_loop()

    # Iteration 1 raises and sleeps the error-retry delay; iteration 2 drains
    # the queue and sleeps the normal interval, where _StopLoop breaks out.
    assert queue_stub.get_next.call_count == 2
    error_trace.assert_called_once_with("Download coordinator loop error: %s", ANY)
    assert observed_delays == [
        orchestrator.COORDINATOR_LOOP_ERROR_RETRY_DELAY,
        orchestrator.config.MAIN_LOOP_SLEEP_TIME,
    ]


def test_concurrent_download_loop_recovers_and_processes_task_after_transient_loop_error(monkeypatch):
    """End-to-end check that the coordinator survives a transient loop error.

    A real coordinator thread runs against a queue whose first ``get_next()``
    raises. The loop must log the error, keep running, process the next task,
    and exit cleanly when the queue raises the ``_StopCoordinator`` sentinel.
    """
    import threading

    import shelfmark.download.orchestrator as orchestrator

    # Set by the patched _process_single_download once the task is handled.
    processed = threading.Event()

    class FlakyQueue:
        # Stub queue: raises once, yields one task, then signals shutdown.
        def __init__(self) -> None:
            self.calls = 0

        def get_next(self):
            self.calls += 1
            if self.calls == 1:
                # Transient failure the loop must recover from.
                raise RuntimeError("boom")
            if self.calls == 2:
                return ("task-1", threading.Event())
            if processed.is_set():
                # Task was processed; stop the coordinator thread cleanly.
                raise _StopCoordinator()
            return None

        def cancel_download(self, task_id: str) -> None:  # pragma: no cover - unused
            raise AssertionError(f"cancel_download unexpectedly called for {task_id}")

        def update_status_message(self, task_id: str, message: str) -> None:  # pragma: no cover - unused
            raise AssertionError(f"update_status_message unexpectedly called for {task_id}: {message}")

    queue = FlakyQueue()
    error_trace = MagicMock()

    monkeypatch.setattr(orchestrator, "book_queue", queue)
    monkeypatch.setattr(
        orchestrator,
        "_process_single_download",
        lambda task_id, cancel_flag: processed.set(),
    )
    monkeypatch.setattr(orchestrator.logger, "error_trace", error_trace)
    # Shrink retry/sleep delays so the test completes in milliseconds.
    monkeypatch.setattr(orchestrator, "COORDINATOR_LOOP_ERROR_RETRY_DELAY", 0.01)
    monkeypatch.setattr(orchestrator.config, "MAX_CONCURRENT_DOWNLOADS", 1, raising=False)
    monkeypatch.setattr(orchestrator.config, "MAIN_LOOP_SLEEP_TIME", 0.01, raising=False)

    def run_loop() -> None:
        # The sentinel is the expected, clean way out of the infinite loop.
        try:
            orchestrator.concurrent_download_loop()
        except _StopCoordinator:
            pass

    thread = threading.Thread(target=run_loop, daemon=True, name="TestDownloadCoordinator")
    thread.start()

    assert processed.wait(timeout=1.0) is True
    thread.join(timeout=1.0)

    assert thread.is_alive() is False
    # get_next: error (1), task (2), then at least one post-processing poll.
    assert queue.calls >= 3
    error_trace.assert_called_once_with("Download coordinator loop error: %s", ANY)


def test_start_replaces_dead_coordinator_thread(monkeypatch):
    """start() must discard a dead coordinator thread and spawn a fresh one."""
    import shelfmark.download.orchestrator as orchestrator

    stale_thread = MagicMock()
    stale_thread.is_alive.return_value = False

    replacement_thread = MagicMock()
    replacement_thread.is_alive.return_value = True

    thread_factory = MagicMock(return_value=replacement_thread)

    monkeypatch.setattr(orchestrator, "_coordinator_thread", stale_thread)
    monkeypatch.setattr(orchestrator.threading, "Thread", thread_factory)

    orchestrator.start()

    # A new thread must be created with the canonical coordinator settings,
    # started exactly once, and installed as the module-level coordinator.
    thread_factory.assert_called_once_with(
        target=orchestrator.concurrent_download_loop,
        daemon=True,
        name="DownloadCoordinator",
    )
    replacement_thread.start.assert_called_once_with()
    assert orchestrator._coordinator_thread is replacement_thread
51 changes: 49 additions & 2 deletions tests/download/test_orchestrator_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ def test_update_download_status_dedupes_identical_events(monkeypatch):

# Ensure clean module-level state
orchestrator._last_activity.clear()
orchestrator._last_progress_value.clear()
orchestrator._last_status_event.clear()

mock_queue = MagicMock()
Expand All @@ -28,6 +29,52 @@ def test_update_download_status_dedupes_identical_events(monkeypatch):
assert mock_queue.update_status_message.call_count == 1
assert mock_ws.broadcast_status_update.call_count == 1

# Activity timestamp should still be updated on the duplicate keep-alive call.
assert orchestrator._last_activity[book_id] == 2.0
# Duplicate keep-alives should not refresh stall activity.
assert orchestrator._last_activity[book_id] == 1.0


def test_update_download_progress_dedupes_identical_progress_for_activity(monkeypatch):
    """Repeating the same progress value must not refresh the stall clock."""
    import shelfmark.download.orchestrator as orchestrator

    book_id = "test-progress-book"

    # Reset module-level tracking state so earlier tests cannot leak in.
    for tracker in (
        orchestrator._last_activity,
        orchestrator._last_progress_value,
        orchestrator._last_status_event,
    ):
        tracker.clear()

    queue_stub = MagicMock()
    monkeypatch.setattr(orchestrator, "book_queue", queue_stub)
    monkeypatch.setattr(orchestrator, "ws_manager", None)

    clock = iter([10.0, 20.0])
    monkeypatch.setattr(orchestrator.time, "time", lambda: next(clock))

    orchestrator.update_download_progress(book_id, 0.0)
    orchestrator.update_download_progress(book_id, 0.0)

    # Both calls reach the queue, but only the first one counts as activity.
    assert queue_stub.update_progress.call_count == 2
    assert orchestrator._last_activity[book_id] == 10.0
    assert orchestrator._last_progress_value[book_id] == 0.0


def test_update_download_progress_refreshes_activity_when_progress_changes(monkeypatch):
    """A genuine progress change must refresh the stall-detection timestamp."""
    import shelfmark.download.orchestrator as orchestrator

    book_id = "test-progress-change"

    # Reset module-level tracking state so earlier tests cannot leak in.
    for tracker in (
        orchestrator._last_activity,
        orchestrator._last_progress_value,
        orchestrator._last_status_event,
    ):
        tracker.clear()

    queue_stub = MagicMock()
    monkeypatch.setattr(orchestrator, "book_queue", queue_stub)
    monkeypatch.setattr(orchestrator, "ws_manager", None)

    clock = iter([30.0, 40.0])
    monkeypatch.setattr(orchestrator.time, "time", lambda: next(clock))

    orchestrator.update_download_progress(book_id, 0.0)
    orchestrator.update_download_progress(book_id, 0.5)

    # Progress moved 0.0 -> 0.5, so the second call's timestamp is recorded.
    assert orchestrator._last_activity[book_id] == 40.0
    assert orchestrator._last_progress_value[book_id] == 0.5
Loading