From 72990286c86b2e9a744a2166f9cfbbe1c01f0787 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Tue, 5 May 2026 16:49:54 -0700 Subject: [PATCH 1/7] fix(monitor): durable sample-upload retries with bounded backlog A transient R2 PUT failure during _upload_to_r2 dropped sample data for up to 10 steps because the previous code only retried 3 times within ~40s and then discarded the parquet. With log_extras.interval=10 this meant a single hiccup blanked the dashboard data tab for an entire run prefix. - Replace the hand-rolled per-call retry loops in _upload_to_r2, _request_presigned_url and _confirm_samples_upload with tenacity AsyncRetrying. 6 attempts capped at 120s wall-clock, exponential backoff with jitter, retrying only TransportError, RemoteProtocolError and 408/429/5xx. Non-retryable errors fail fast. - Move sample uploads through a bounded FIFO backlog (max 5) on the monitor's bg event loop. log_samples now enqueues the parquet bytes and triggers a serialized drain that processes oldest-first. Failed steps stay in the backlog and are reattempted on the next log_samples tick instead of being silently dropped. - Log the full exception via loguru opt(exception=True) when an upload is parked in the backlog so the actual error type is visible instead of an opaque OSError Errno 0. --- src/prime_rl/utils/monitor/prime.py | 212 +++++++++++++++++----------- 1 file changed, 131 insertions(+), 81 deletions(-) diff --git a/src/prime_rl/utils/monitor/prime.py b/src/prime_rl/utils/monitor/prime.py index 7e63b6c2d7..dbef7265d6 100644 --- a/src/prime_rl/utils/monitor/prime.py +++ b/src/prime_rl/utils/monitor/prime.py @@ -4,6 +4,7 @@ import math import os import time +from collections import deque from datetime import datetime, timezone from pathlib import Path from threading import Thread @@ -14,6 +15,14 @@ import pyarrow.parquet as pq import verifiers as vf from prime_cli.core.config import Config as PrimeConfig +from tenacity import ( + AsyncRetrying, + retry_if_exception, + stop_after_attempt, + stop_after_delay, + wait_exponential, + wait_random, +) from transformers.tokenization_utils import PreTrainedTokenizer from prime_rl.configs.shared import PrimeMonitorConfig @@ -21,6 +30,28 @@ from prime_rl.utils.logger import get_logger from prime_rl.utils.monitor.base import Monitor, sample_items_for_logging +_RETRYABLE_HTTPX_EXC = (httpx.TransportError, httpx.RemoteProtocolError) + + +def _is_retryable_upload_error(exc: BaseException) -> bool: + if isinstance(exc, _RETRYABLE_HTTPX_EXC): + return True + if isinstance(exc, httpx.HTTPStatusError): + code = exc.response.status_code + return code in (408, 429) or 500 <= code < 600 + return False + + +# Per-HTTP-call retry budget. Wall-clock cap dominates so a single call cannot +# stall the upload pipeline indefinitely. +_UPLOAD_MAX_ATTEMPTS = 6 +_UPLOAD_MAX_DELAY_S = 120.0 +_UPLOAD_BACKOFF_MAX_S = 30.0 + +# Bounded backlog of (step, parquet_bytes) waiting to be uploaded. Sized to +# survive a few intervals of R2 unavailability without unbounded memory growth. +_MAX_PENDING_SAMPLE_UPLOADS = 5 + def _json(val: Any) -> str: """JSON-serialize dicts/lists, pass strings through, default to empty string for None.""" @@ -287,7 +318,12 @@ def log(self, metrics: dict[str, Any], step: int) -> None: ) def log_samples(self, rollouts: list[vf.RolloutOutput], step: int) -> None: - """Logs rollouts to Prime Intellect API using presigned URLs for direct R2 upload.""" + """Logs rollouts to Prime Intellect API using presigned URLs for direct R2 upload. 
+ + Adds the new step to a bounded backlog and schedules a drain on the background + event loop. Failed uploads stay in the backlog and are retried on the next + log_samples call, so a transient R2 outage does not silently drop data. + """ if not self.is_master: return if not self.enabled: @@ -321,12 +357,11 @@ def log_samples(self, rollouts: list[vf.RolloutOutput], step: int) -> None: return self._pending_sample_steps.add(step) - - # Use presigned URL flow for uploading samples - self._upload_samples_via_presigned_url(parquet_bytes, step) + self._enqueue_and_drain_samples(step, parquet_bytes) self.logger.debug( - f"Initiated samples upload at step {step} to Prime Intellect API in {time.perf_counter() - start_time:.2f}s" + f"Queued samples upload for step {step} in {time.perf_counter() - start_time:.2f}s " + f"(backlog={len(self._sample_upload_queue) + 1})" ) def _rollouts_to_parquet_bytes(self, rollouts: list[vf.RolloutOutput], step: int) -> bytes | None: @@ -392,105 +427,116 @@ def _rollouts_to_parquet_bytes(self, rollouts: list[vf.RolloutOutput], step: int pq.write_table(table, buf, compression="snappy", use_dictionary=True, write_statistics=True) return buf.getvalue() - def _upload_samples_via_presigned_url(self, parquet_bytes: bytes, step: int) -> None: - """Upload Parquet samples using presigned URL flow (fire-and-forget).""" + def _enqueue_and_drain_samples(self, step: int, parquet_bytes: bytes) -> None: + """Append a step's parquet to the backlog and trigger a drain on the bg loop.""" future = asyncio.run_coroutine_threadsafe( - self._upload_samples_via_presigned_url_async(parquet_bytes, step), + self._enqueue_and_drain_samples_async(step, parquet_bytes), self._loop, ) self._pending_futures.append(future) - # Clean up completed futures to avoid memory growth self._pending_futures = [f for f in self._pending_futures if not f.done()] - async def _upload_samples_via_presigned_url_async( - self, parquet_bytes: bytes, step: int, max_retries: int = 3 - ) -> None: - """Upload Parquet bytes via presigned URL flow.""" - try: - presign_data = await self._request_presigned_url(step) - if not presign_data: - self.logger.warning(f"Failed to get presigned URL for samples at step {step}") - return - - presigned_url = presign_data["presigned_url"] - s3_key = presign_data["s3_key"] + async def _enqueue_and_drain_samples_async(self, step: int, parquet_bytes: bytes) -> None: + """Backlog-aware upload: enqueue, then drain oldest-first under a lock.""" + if self._sample_upload_lock is None: + self._sample_upload_lock = asyncio.Lock() - upload_success = await self._upload_to_r2( - presigned_url, parquet_bytes, content_type="application/parquet", max_retries=max_retries + self._sample_upload_queue.append((step, parquet_bytes)) + while len(self._sample_upload_queue) > _MAX_PENDING_SAMPLE_UPLOADS: + dropped_step, _ = self._sample_upload_queue.popleft() + self._pending_sample_steps.discard(dropped_step) + self.logger.warning( + f"Sample upload backlog exceeded {_MAX_PENDING_SAMPLE_UPLOADS}, " + f"dropping oldest queued step {dropped_step}" ) - if not upload_success: - self.logger.warning(f"Failed to upload samples to R2 at step {step}") - return - confirm_success = await self._confirm_samples_upload(step, s3_key) - if not confirm_success: - self.logger.warning(f"Failed to confirm samples upload at step {step}") - return + async with self._sample_upload_lock: + while self._sample_upload_queue: + pending_step, pending_bytes = self._sample_upload_queue[0] + start = time.perf_counter() + try: + await 
self._upload_one_sample_step(pending_step, pending_bytes) + except Exception as e: + self.logger.opt(exception=True).warning( + f"Sample upload for step {pending_step} failed after retries; " + f"keeping in backlog (size={len(self._sample_upload_queue)}): " + f"{type(e).__name__}: {e}" + ) + # Stop draining; will retry on next log_samples tick. + return - self.last_log_samples_step = step - self.logger.debug(f"Successfully completed samples upload at step {step}") + # Success: pop and update bookkeeping. + self._sample_upload_queue.popleft() + self._pending_sample_steps.discard(pending_step) + self.last_log_samples_step = pending_step + self.logger.debug(f"Uploaded samples for step {pending_step} in {time.perf_counter() - start:.2f}s") - except Exception as e: - self.logger.warning(f"Failed to upload samples via presigned URL at step {step}: {type(e).__name__}: {e}") - finally: - self._pending_sample_steps.discard(step) + async def _upload_one_sample_step(self, step: int, parquet_bytes: bytes) -> None: + """Run presign → R2 PUT → confirm for a single step. Raises on final failure.""" + presign_data = await self._request_presigned_url(step) + presigned_url = presign_data["presigned_url"] + s3_key = presign_data["s3_key"] - async def _request_presigned_url(self, step: int) -> dict[str, Any] | None: - """Request a presigned URL from the backend.""" - try: - response = await self._client.post( - f"{self.base_url}/samples/presign", - headers=self._headers, - json={"run_id": self.run_id, "step": step}, + await self._upload_to_r2(presigned_url, parquet_bytes, step, content_type="application/parquet") + await self._confirm_samples_upload(step, s3_key) + + def _retry_policy(self, op_name: str, step: int) -> AsyncRetrying: + """Shared tenacity policy for sample-upload HTTP calls.""" + + def _log_retry(retry_state) -> None: + exc = retry_state.outcome.exception() if retry_state.outcome else None + self.logger.warning( + f"Retrying {op_name} for step {step} " + f"(attempt {retry_state.attempt_number}/{_UPLOAD_MAX_ATTEMPTS}): " + f"{type(exc).__name__ if exc else 'unknown'}: {exc}" ) - response.raise_for_status() - response_data = response.json()["data"] - return { - "presigned_url": response_data["presignedUrl"], - "s3_key": response_data["s3Key"], - } - except Exception as e: - self.logger.warning(f"Failed to request presigned URL: {type(e).__name__}: {e}") - return None + + return AsyncRetrying( + retry=retry_if_exception(_is_retryable_upload_error), + stop=stop_after_attempt(_UPLOAD_MAX_ATTEMPTS) | stop_after_delay(_UPLOAD_MAX_DELAY_S), + wait=wait_exponential(multiplier=1, min=1, max=_UPLOAD_BACKOFF_MAX_S) + wait_random(0, 1), + before_sleep=_log_retry, + reraise=True, + ) + + async def _request_presigned_url(self, step: int) -> dict[str, Any]: + """Request a presigned URL from the backend. 
Raises on final failure.""" + async for attempt in self._retry_policy("samples/presign", step): + with attempt: + response = await self._client.post( + f"{self.base_url}/samples/presign", + headers=self._headers, + json={"run_id": self.run_id, "step": step}, + ) + response.raise_for_status() + response_data = response.json()["data"] + return { + "presigned_url": response_data["presignedUrl"], + "s3_key": response_data["s3Key"], + } + raise RuntimeError("retry loop exited without returning") async def _upload_to_r2( - self, presigned_url: str, data: bytes, content_type: str = "application/json", max_retries: int = 3 - ) -> bool: - """Upload data to R2 using presigned URL.""" - for attempt in range(max_retries): - try: + self, presigned_url: str, data: bytes, step: int, content_type: str = "application/json" + ) -> None: + """Upload data to R2 via presigned URL. Raises on final failure.""" + async for attempt in self._retry_policy("R2 PUT", step): + with attempt: response = await self._client.put(presigned_url, content=data, headers={"Content-Type": content_type}) response.raise_for_status() - return True - except Exception as e: - if attempt == max_retries - 1: - self.logger.warning(f"Failed to upload to R2 after {max_retries} attempts: {type(e).__name__}: {e}") - return False - delay = 2**attempt - self.logger.debug(f"Retrying R2 upload in {delay}s (attempt {attempt + 1}/{max_retries})") - await asyncio.sleep(delay) - - async def _confirm_samples_upload(self, step: int, s3_key: str, max_retries: int = 3) -> bool: - """Confirm samples upload with the backend. Returns True on success.""" - for attempt in range(max_retries): - try: + return + + async def _confirm_samples_upload(self, step: int, s3_key: str) -> None: + """Confirm samples upload with the backend. Raises on final failure.""" + async for attempt in self._retry_policy("samples/confirm", step): + with attempt: response = await self._client.post( f"{self.base_url}/samples/confirm", headers=self._headers, json={"run_id": self.run_id, "step": step, "s3_key": s3_key}, ) response.raise_for_status() - return True - except Exception as e: - if attempt == max_retries - 1: - self.logger.warning( - f"Failed to confirm samples upload after {max_retries} attempts: {type(e).__name__}: {e}" - ) - return False - delay = 2**attempt - self.logger.debug(f"Retrying samples confirm in {delay}s (attempt {attempt + 1}/{max_retries})") - await asyncio.sleep(delay) - return False + return def log_eval_samples(self, rollouts: list[vf.RolloutOutput], env_name: str, step: int) -> None: pass @@ -615,6 +661,10 @@ def _init_async_client(self) -> None: self._thread.start() self._client = httpx.AsyncClient(timeout=30) self._pending_futures: list[asyncio.Future] = [] + # Sample-upload backlog. Lock is constructed lazily on the bg loop to bind + # to the right asyncio loop after fork. + self._sample_upload_queue: deque[tuple[int, bytes]] = deque() + self._sample_upload_lock: asyncio.Lock | None = None if hasattr(self, "_pending_sample_steps") and self._pending_sample_steps: self._pending_sample_steps.clear() From 71ee1d1b259447abb6e97552362660e715b29835 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Tue, 5 May 2026 16:59:12 -0700 Subject: [PATCH 2/7] fix(monitor): pop in-flight sample upload out of queue before awaiting Codex review on #2425 caught a race: backlog eviction runs without the upload lock, so a concurrent log_samples() call could popleft the head the drain coroutine was peeking at and currently uploading. 
The drain's own success-path popleft would then remove a different
(un-uploaded) step, silently dropping its data.

Fix: popleft the in-flight item into a local variable before awaiting
the HTTP call. The in-flight item is no longer in the queue, so
concurrent eviction cannot touch it. On failure we appendleft to
preserve ordering for the next drain tick.
---
 src/prime_rl/utils/monitor/prime.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/src/prime_rl/utils/monitor/prime.py b/src/prime_rl/utils/monitor/prime.py
index dbef7265d6..c866c75490 100644
--- a/src/prime_rl/utils/monitor/prime.py
+++ b/src/prime_rl/utils/monitor/prime.py
@@ -452,11 +452,16 @@ async def _enqueue_and_drain_samples_async(self, step: int, parquet_bytes: bytes

         async with self._sample_upload_lock:
             while self._sample_upload_queue:
-                pending_step, pending_bytes = self._sample_upload_queue[0]
+                # Pop the in-flight item out of the queue entirely before awaiting,
+                # so concurrent eviction (which runs without the lock) cannot remove
+                # the head we are uploading. On failure we appendleft to preserve
+                # ordering; on success the item simply stays gone.
+                pending_step, pending_bytes = self._sample_upload_queue.popleft()
                 start = time.perf_counter()
                 try:
                     await self._upload_one_sample_step(pending_step, pending_bytes)
                 except Exception as e:
+                    self._sample_upload_queue.appendleft((pending_step, pending_bytes))
                     self.logger.opt(exception=True).warning(
                         f"Sample upload for step {pending_step} failed after retries; "
                         f"keeping in backlog (size={len(self._sample_upload_queue)}): "
@@ -465,8 +470,7 @@ async def _enqueue_and_drain_samples_async(self, step: int, parquet_bytes: bytes
                     # Stop draining; will retry on next log_samples tick.
                     return

-                # Success: pop and update bookkeeping.
-                self._sample_upload_queue.popleft()
+                # Success: bookkeeping (item is already popped).
                 self._pending_sample_steps.discard(pending_step)
                 self.last_log_samples_step = pending_step
                 self.logger.debug(f"Uploaded samples for step {pending_step} in {time.perf_counter() - start:.2f}s")

From 5bd86391f796e8615b5cbaeb29bc70c908e1cfbd Mon Sep 17 00:00:00 2001
From: Jannik Straube
Date: Tue, 5 May 2026 17:02:40 -0700
Subject: [PATCH 3/7] test(monitor): regression tests for sample-upload retry queue

Two new tests:

1. Failure-then-retry: a step that fails upload stays at the head of the
   backlog and is reattempted on the next log_samples tick.

2. Concurrent-eviction race (the Codex P1 from PR review): start a drain
   that pops the in-flight item into a local var and awaits, then fire 5
   concurrent log_samples calls that each append to the queue, the last
   of which pushes the backlog over its cap and triggers eviction. Verify
   that the in-flight step survives and that the oldest step actually
   still in the queue is the one evicted on backlog overflow.
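
To make the invariant concrete, it reduces to a small asyncio pattern.
Illustrative sketch only; `queue`, `drain`, and `evict` are hypothetical
names, not this module's API, and the HTTP upload is replaced by a no-op
await:

    import asyncio
    from collections import deque

    queue: deque[int] = deque()
    lock = asyncio.Lock()

    async def drain() -> None:
        async with lock:
            while queue:
                item = queue.popleft()  # pop BEFORE awaiting: eviction cannot see it
                try:
                    await asyncio.sleep(0)  # stand-in for the slow HTTP upload
                except Exception:
                    queue.appendleft(item)  # failure: restore FIFO order at the head
                    return

    def evict(cap: int) -> None:
        # Lock-free overflow eviction, like the log_samples enqueue path.
        while len(queue) > cap:
            queue.popleft()

Because the in-flight item lives in a local variable while drain()
awaits, evict() can only ever remove steps that are genuinely still
queued.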
--- tests/unit/utils/test_prime_monitor.py | 108 ++++++++++++++++++++++++- 1 file changed, 107 insertions(+), 1 deletion(-) diff --git a/tests/unit/utils/test_prime_monitor.py b/tests/unit/utils/test_prime_monitor.py index f44065a7c6..bb732cbf75 100644 --- a/tests/unit/utils/test_prime_monitor.py +++ b/tests/unit/utils/test_prime_monitor.py @@ -1,10 +1,16 @@ +import asyncio import io import json +from collections import deque from unittest.mock import Mock +import httpx import pyarrow.parquet as pq -from prime_rl.utils.monitor.prime import PrimeMonitor +from prime_rl.utils.monitor.prime import ( + _MAX_PENDING_SAMPLE_UPLOADS, + PrimeMonitor, +) def _new_monitor() -> PrimeMonitor: @@ -13,6 +19,18 @@ def _new_monitor() -> PrimeMonitor: return monitor +def _new_drain_monitor() -> PrimeMonitor: + """Monitor with the minimum state needed for sample-upload drain tests.""" + monitor = _new_monitor() + monitor.logger = Mock() + monitor.run_id = "run-test" + monitor._pending_sample_steps = set() + monitor._sample_upload_queue = deque() + monitor._sample_upload_lock = None + monitor.last_log_samples_step = -1 + return monitor + + def _build_rollout(*, example_id: int, reward: float, task: str) -> dict: return { "example_id": example_id, @@ -94,6 +112,94 @@ def test_rollouts_to_parquet_bytes_skips_rollouts_without_trajectory(): assert rows[0]["sample_id"] == 0 +def test_sample_upload_failure_keeps_step_in_backlog_and_retries_next_tick(): + monitor = _new_drain_monitor() + attempts: list[int] = [] + fail_steps_once = {0} + + async def upload_one(step: int, _bytes: bytes) -> None: + attempts.append(step) + if step in fail_steps_once: + fail_steps_once.discard(step) + raise httpx.ConnectError("transient") + + monitor._upload_one_sample_step = upload_one # type: ignore[method-assign] + + async def scenario() -> None: + monitor._pending_sample_steps.add(0) + await monitor._enqueue_and_drain_samples_async(0, b"p0") + # First tick fails: step 0 stays at the head of the backlog. + assert list(monitor._sample_upload_queue) == [(0, b"p0")] + assert 0 in monitor._pending_sample_steps + + # Second tick adds step 10 and drains both, oldest-first. + monitor._pending_sample_steps.add(10) + await monitor._enqueue_and_drain_samples_async(10, b"p10") + assert list(monitor._sample_upload_queue) == [] + assert monitor._pending_sample_steps == set() + assert monitor.last_log_samples_step == 10 + + asyncio.run(scenario()) + # Step 0: failed, retried, succeeded; then step 10 succeeded. + assert attempts == [0, 0, 10] + + +def test_in_flight_sample_upload_survives_concurrent_eviction(): + """Regression test: backlog eviction runs without the upload lock, so a + concurrent log_samples() call could previously popleft the head that the + drain coroutine was peeking at and currently uploading. The fix pops the + in-flight item out of the queue into a local variable before awaiting, + making it invisible to eviction. + """ + monitor = _new_drain_monitor() + completed: list[int] = [] + + async def scenario() -> None: + started = asyncio.Event() + finish = asyncio.Event() + + async def slow_upload(step: int, _bytes: bytes) -> None: + if not started.is_set(): + started.set() + await finish.wait() + completed.append(step) + + monitor._upload_one_sample_step = slow_upload # type: ignore[method-assign] + + # Pre-load step 10; it will become the in-flight item. 
+ monitor._pending_sample_steps.add(10) + monitor._sample_upload_queue.append((10, b"p10")) + + # Drain coroutine A: enqueues step 20, acquires lock, popleft's step 10 + # into a local variable, awaits. + monitor._pending_sample_steps.add(20) + a = asyncio.create_task(monitor._enqueue_and_drain_samples_async(20, b"p20")) + await started.wait() + assert list(monitor._sample_upload_queue) == [(20, b"p20")] + + # Five concurrent log_samples calls. Each appends + evicts outside the lock. + # Queue grows to size 6 on the last call; eviction popleft's step 20. + # Step 10 is in A's local var, so eviction can never see it. + extra_steps = [30, 40, 50, 60, 70] + for s in extra_steps: + monitor._pending_sample_steps.add(s) + bs = [asyncio.create_task(monitor._enqueue_and_drain_samples_async(s, f"p{s}".encode())) for s in extra_steps] + await asyncio.sleep(0.05) + queue_before_release = [s for s, _ in monitor._sample_upload_queue] + assert queue_before_release == [30, 40, 50, 60, 70] + + finish.set() + await asyncio.gather(a, *bs) + + asyncio.run(scenario()) + # The in-flight step survived, the correctly-oldest queued step was evicted. + assert 10 in completed + assert 20 not in completed + for s in [30, 40, 50, 60, 70]: + assert s in completed + assert _MAX_PENDING_SAMPLE_UPLOADS == 5 + + def test_sanitize_json_payload_drops_non_finite_values_and_logs_paths(): monitor = _new_monitor() monitor.logger = Mock() From dff33d5cc8ffe58a21489cd10acc2444e0b35ed4 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Tue, 5 May 2026 17:13:53 -0700 Subject: [PATCH 4/7] fix(monitor): drop sample uploads on permanent failure instead of re-enqueuing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cursor Bugbot on #2425 caught that the outer drain's `except Exception` re-queued every failure, including the non-retryable 4xx errors that the inner tenacity layer (correctly) fails fast on. A permanently- failing step (e.g. 400 from /samples/presign, expired API key) would sit at the queue head and block every subsequent upload until backlog overflow eventually evicted it. Fix: split the drain's failure path by `_is_retryable_upload_error(e)`. Retryable → appendleft and stop draining (existing behavior, will retry next tick). Non-retryable → drop the step, clear from `_pending_sample_steps`, and `continue` so later items aren't blocked. Adds `test_non_retryable_sample_upload_failure_does_not_block_queue` which preloads the queue with a step that raises a 400, plus three healthy steps, and verifies that all four are attempted and the queue ends up empty (the 400 step dropped rather than re-enqueued). --- src/prime_rl/utils/monitor/prime.py | 29 ++++++++++++++----- tests/unit/utils/test_prime_monitor.py | 39 ++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 7 deletions(-) diff --git a/src/prime_rl/utils/monitor/prime.py b/src/prime_rl/utils/monitor/prime.py index c866c75490..93a410d261 100644 --- a/src/prime_rl/utils/monitor/prime.py +++ b/src/prime_rl/utils/monitor/prime.py @@ -454,21 +454,36 @@ async def _enqueue_and_drain_samples_async(self, step: int, parquet_bytes: bytes while self._sample_upload_queue: # Pop the in-flight item out of the queue entirely before awaiting, # so concurrent eviction (which runs without the lock) cannot remove - # the head we are uploading. On failure we appendleft to preserve - # ordering; on success the item simply stays gone. + # the head we are uploading. 
On retryable failure we appendleft to + # preserve ordering; on success or permanent failure the item simply + # stays gone. pending_step, pending_bytes = self._sample_upload_queue.popleft() start = time.perf_counter() try: await self._upload_one_sample_step(pending_step, pending_bytes) except Exception as e: - self._sample_upload_queue.appendleft((pending_step, pending_bytes)) + if _is_retryable_upload_error(e): + # Transient — keep at the head for next log_samples tick. + self._sample_upload_queue.appendleft((pending_step, pending_bytes)) + self.logger.opt(exception=True).warning( + f"Sample upload for step {pending_step} failed after retries; " + f"keeping in backlog (size={len(self._sample_upload_queue)}): " + f"{type(e).__name__}: {e}" + ) + # Stop draining; if this step keeps failing, later steps shouldn't + # be blocked behind it indefinitely — backlog overflow eviction + # eventually frees them. + return + + # Permanent (e.g. 4xx that's not 408/429): drop this step so it + # doesn't block the queue head, and continue draining the rest. + self._pending_sample_steps.discard(pending_step) self.logger.opt(exception=True).warning( - f"Sample upload for step {pending_step} failed after retries; " - f"keeping in backlog (size={len(self._sample_upload_queue)}): " + f"Sample upload for step {pending_step} failed with non-retryable error; " + f"dropping step (queue size={len(self._sample_upload_queue)}): " f"{type(e).__name__}: {e}" ) - # Stop draining; will retry on next log_samples tick. - return + continue # Success: bookkeeping (item is already popped). self._pending_sample_steps.discard(pending_step) diff --git a/tests/unit/utils/test_prime_monitor.py b/tests/unit/utils/test_prime_monitor.py index bb732cbf75..768ae16adc 100644 --- a/tests/unit/utils/test_prime_monitor.py +++ b/tests/unit/utils/test_prime_monitor.py @@ -144,6 +144,45 @@ async def scenario() -> None: assert attempts == [0, 0, 10] +def test_non_retryable_sample_upload_failure_does_not_block_queue(): + """A permanent failure (e.g. 400 from presign, expired API key) must not + re-enqueue and block the head — it should be dropped so the rest of the + backlog can drain. + """ + monitor = _new_drain_monitor() + attempted: list[int] = [] + + async def upload_one(step: int, _bytes: bytes) -> None: + attempted.append(step) + if step == 0: + request = httpx.Request("POST", "https://example/presign") + raise httpx.HTTPStatusError( + "bad request", + request=request, + response=httpx.Response(400, request=request), + ) + + monitor._upload_one_sample_step = upload_one # type: ignore[method-assign] + + async def scenario() -> None: + # Pre-load queue so the drain has work to do behind the failing head. + for s in [0, 10, 20]: + monitor._pending_sample_steps.add(s) + monitor._sample_upload_queue.append((s, f"p{s}".encode())) + + # Trigger drain via a fresh log_samples call. + monitor._pending_sample_steps.add(30) + await monitor._enqueue_and_drain_samples_async(30, b"p30") + + asyncio.run(scenario()) + + # Step 0 (non-retryable 400) was attempted and dropped; 10/20/30 all uploaded. 
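+    # The drain classifies the 400 via _is_retryable_upload_error and drops the
+    # step rather than re-enqueueing it, so step 0 is attempted exactly once.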
+ assert attempted == [0, 10, 20, 30] + assert list(monitor._sample_upload_queue) == [] + assert monitor._pending_sample_steps == set() + assert monitor.last_log_samples_step == 30 + + def test_in_flight_sample_upload_survives_concurrent_eviction(): """Regression test: backlog eviction runs without the upload lock, so a concurrent log_samples() call could previously popleft the head that the From 8e88ee14aa15b11b9db264018cd0a0dd188299f3 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Tue, 5 May 2026 17:45:34 -0700 Subject: [PATCH 5/7] fix(monitor): cooldown after retryable upload failure to avoid back-to-back retries Codex P2 on #2425: when R2/API is down, multiple log_samples() calls queue drain coroutines on the upload lock. After the first one finishes its full ~120s tenacity retry budget and appendlefts the failed head back, the next waiter immediately reacquires the lock and retries the same head, burning another full budget. With sample interval=10, this ties up the monitor event loop for several full retry budgets in a row during a sustained outage and delays distributions/metrics uploads on the same client. Fix: arm a `_retryable_cooldown_until` timestamp (60s) on retryable failure. Drain checks it at the top of each iteration and bails when active. Cleared on success. Updates `test_sample_upload_failure_keeps_step_in_backlog_and_retries_next_tick` to advance the fake clock past the cooldown before the second tick, and adds `test_retryable_failure_arms_cooldown_to_prevent_back_to_back_retries` which asserts three rapid-fire failed log_samples calls produce only one upload attempt, then verifies a successful tick after the cooldown. --- src/prime_rl/utils/monitor/prime.py | 30 +++++++++--- tests/unit/utils/test_prime_monitor.py | 64 +++++++++++++++++++++++++- 2 files changed, 87 insertions(+), 7 deletions(-) diff --git a/src/prime_rl/utils/monitor/prime.py b/src/prime_rl/utils/monitor/prime.py index 93a410d261..b1257b6ac8 100644 --- a/src/prime_rl/utils/monitor/prime.py +++ b/src/prime_rl/utils/monitor/prime.py @@ -52,6 +52,12 @@ def _is_retryable_upload_error(exc: BaseException) -> bool: # survive a few intervals of R2 unavailability without unbounded memory growth. _MAX_PENDING_SAMPLE_UPLOADS = 5 +# After a retryable upload failure (R2/API outage), suppress further drain +# attempts for this long so coroutines queued during the failed drain don't +# each consume another full tenacity retry budget back-to-back. The next +# log_samples tick after this expires picks up the backlog. +_RETRYABLE_COOLDOWN_S = 60.0 + def _json(val: Any) -> str: """JSON-serialize dicts/lists, pass strings through, default to empty string for None.""" @@ -452,6 +458,13 @@ async def _enqueue_and_drain_samples_async(self, step: int, parquet_bytes: bytes async with self._sample_upload_lock: while self._sample_upload_queue: + # If a previous drain just hit a retryable failure, every coroutine + # queued on this lock would otherwise pop the same head and burn + # another full tenacity budget. Bail out and let a later tick (after + # the cooldown elapses) pick up the backlog. + if self._retryable_cooldown_until is not None and time.monotonic() < self._retryable_cooldown_until: + return + # Pop the in-flight item out of the queue entirely before awaiting, # so concurrent eviction (which runs without the lock) cannot remove # the head we are uploading. 
On retryable failure we appendleft to @@ -463,16 +476,16 @@ async def _enqueue_and_drain_samples_async(self, step: int, parquet_bytes: bytes await self._upload_one_sample_step(pending_step, pending_bytes) except Exception as e: if _is_retryable_upload_error(e): - # Transient — keep at the head for next log_samples tick. + # Transient — keep at the head and arm the cooldown so other + # waiters don't immediately retry the same head. self._sample_upload_queue.appendleft((pending_step, pending_bytes)) + self._retryable_cooldown_until = time.monotonic() + _RETRYABLE_COOLDOWN_S self.logger.opt(exception=True).warning( f"Sample upload for step {pending_step} failed after retries; " - f"keeping in backlog (size={len(self._sample_upload_queue)}): " + f"keeping in backlog (size={len(self._sample_upload_queue)}, " + f"cooldown={_RETRYABLE_COOLDOWN_S:.0f}s): " f"{type(e).__name__}: {e}" ) - # Stop draining; if this step keeps failing, later steps shouldn't - # be blocked behind it indefinitely — backlog overflow eviction - # eventually frees them. return # Permanent (e.g. 4xx that's not 408/429): drop this step so it @@ -485,7 +498,9 @@ async def _enqueue_and_drain_samples_async(self, step: int, parquet_bytes: bytes ) continue - # Success: bookkeeping (item is already popped). + # Success: bookkeeping (item is already popped). Clear any stale + # cooldown — R2 is healthy. + self._retryable_cooldown_until = None self._pending_sample_steps.discard(pending_step) self.last_log_samples_step = pending_step self.logger.debug(f"Uploaded samples for step {pending_step} in {time.perf_counter() - start:.2f}s") @@ -684,6 +699,9 @@ def _init_async_client(self) -> None: # to the right asyncio loop after fork. self._sample_upload_queue: deque[tuple[int, bytes]] = deque() self._sample_upload_lock: asyncio.Lock | None = None + # Set to a future monotonic timestamp after a retryable failure so queued + # drains don't all reattempt the same failing head back-to-back. + self._retryable_cooldown_until: float | None = None if hasattr(self, "_pending_sample_steps") and self._pending_sample_steps: self._pending_sample_steps.clear() diff --git a/tests/unit/utils/test_prime_monitor.py b/tests/unit/utils/test_prime_monitor.py index 768ae16adc..fc32d0c364 100644 --- a/tests/unit/utils/test_prime_monitor.py +++ b/tests/unit/utils/test_prime_monitor.py @@ -27,6 +27,7 @@ def _new_drain_monitor() -> PrimeMonitor: monitor._pending_sample_steps = set() monitor._sample_upload_queue = deque() monitor._sample_upload_lock = None + monitor._retryable_cooldown_until = None monitor.last_log_samples_step = -1 return monitor @@ -112,11 +113,17 @@ def test_rollouts_to_parquet_bytes_skips_rollouts_without_trajectory(): assert rows[0]["sample_id"] == 0 -def test_sample_upload_failure_keeps_step_in_backlog_and_retries_next_tick(): +def test_sample_upload_failure_keeps_step_in_backlog_and_retries_next_tick(monkeypatch): monitor = _new_drain_monitor() attempts: list[int] = [] fail_steps_once = {0} + fake_time = [1000.0] + monkeypatch.setattr( + "prime_rl.utils.monitor.prime.time.monotonic", + lambda: fake_time[0], + ) + async def upload_one(step: int, _bytes: bytes) -> None: attempts.append(step) if step in fail_steps_once: @@ -132,6 +139,9 @@ async def scenario() -> None: assert list(monitor._sample_upload_queue) == [(0, b"p0")] assert 0 in monitor._pending_sample_steps + # Advance past the retryable cooldown before the next tick. + fake_time[0] += 120.0 + # Second tick adds step 10 and drains both, oldest-first. 
monitor._pending_sample_steps.add(10) await monitor._enqueue_and_drain_samples_async(10, b"p10") @@ -183,6 +193,58 @@ async def scenario() -> None: assert monitor.last_log_samples_step == 30 +def test_retryable_failure_arms_cooldown_to_prevent_back_to_back_retries(monkeypatch): + """When R2 is down, multiple log_samples calls would otherwise each consume a + full tenacity retry budget on the same failing head back-to-back. The cooldown + timestamp set on retryable failure makes subsequent drains short-circuit until + it elapses. + """ + monitor = _new_drain_monitor() + attempts: list[int] = [] + + # Freeze monotonic time so we can move it deterministically. + fake_time = [1000.0] + monkeypatch.setattr( + "prime_rl.utils.monitor.prime.time.monotonic", + lambda: fake_time[0], + ) + + async def scenario() -> None: + async def fail(step: int, _bytes: bytes) -> None: + attempts.append(step) + raise httpx.ConnectError("R2 down") + + monitor._upload_one_sample_step = fail # type: ignore[method-assign] + + # Three log_samples calls in rapid succession while R2 is "down". + for s in [0, 10, 20]: + monitor._pending_sample_steps.add(s) + await monitor._enqueue_and_drain_samples_async(s, f"p{s}".encode()) + + # Only step 0 should have been attempted; the cooldown must suppress + # retries for the drains triggered by steps 10 and 20. + assert attempts == [0], f"expected only step 0 attempted, got {attempts}" + assert [s for s, _ in monitor._sample_upload_queue] == [0, 10, 20] + assert monitor._retryable_cooldown_until is not None + + # Advance time past the cooldown and switch to a successful uploader + # — next drain should resume processing. + fake_time[0] += 120.0 + + async def ok(step: int, _bytes: bytes) -> None: + attempts.append(step) + + monitor._upload_one_sample_step = ok # type: ignore[method-assign] + monitor._pending_sample_steps.add(30) + await monitor._enqueue_and_drain_samples_async(30, b"p30") + + assert attempts == [0, 0, 10, 20, 30] + assert list(monitor._sample_upload_queue) == [] + assert monitor._retryable_cooldown_until is None + + asyncio.run(scenario()) + + def test_in_flight_sample_upload_survives_concurrent_eviction(): """Regression test: backlog eviction runs without the upload lock, so a concurrent log_samples() call could previously popleft the head that the From 70c956435cb0fde6aea4ad4a06cc5155a2656f3d Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Tue, 5 May 2026 22:33:21 -0700 Subject: [PATCH 6/7] test(monitor): drop sample-upload retry queue tests to keep PR minimal Removes the tests added in 5bd86391f, dff33d5cc, and 8e88ee14a. The behavior they covered (failure-retry persistence, in-flight survives concurrent eviction, non-retryable drop, retryable cooldown) is still implemented and was verified locally; just keeping the diff small. 
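
For the record, the cooldown gate those tests exercised reduces to a few
lines. Illustrative sketch only; the names are hypothetical, not this
module's API:

    import time

    _COOLDOWN_S = 60.0
    cooldown_until: float | None = None  # armed on retryable failure

    def should_drain() -> bool:
        # Skip drains while the cooldown from the last retryable failure
        # is active; a later log_samples tick picks the backlog up again.
        return cooldown_until is None or time.monotonic() >= cooldown_until

    def on_retryable_failure() -> None:
        global cooldown_until
        cooldown_until = time.monotonic() + _COOLDOWN_S

    def on_success() -> None:
        global cooldown_until
        cooldown_until = None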
--- tests/unit/utils/test_prime_monitor.py | 209 +------------------------ 1 file changed, 1 insertion(+), 208 deletions(-) diff --git a/tests/unit/utils/test_prime_monitor.py b/tests/unit/utils/test_prime_monitor.py index fc32d0c364..f44065a7c6 100644 --- a/tests/unit/utils/test_prime_monitor.py +++ b/tests/unit/utils/test_prime_monitor.py @@ -1,16 +1,10 @@ -import asyncio import io import json -from collections import deque from unittest.mock import Mock -import httpx import pyarrow.parquet as pq -from prime_rl.utils.monitor.prime import ( - _MAX_PENDING_SAMPLE_UPLOADS, - PrimeMonitor, -) +from prime_rl.utils.monitor.prime import PrimeMonitor def _new_monitor() -> PrimeMonitor: @@ -19,19 +13,6 @@ def _new_monitor() -> PrimeMonitor: return monitor -def _new_drain_monitor() -> PrimeMonitor: - """Monitor with the minimum state needed for sample-upload drain tests.""" - monitor = _new_monitor() - monitor.logger = Mock() - monitor.run_id = "run-test" - monitor._pending_sample_steps = set() - monitor._sample_upload_queue = deque() - monitor._sample_upload_lock = None - monitor._retryable_cooldown_until = None - monitor.last_log_samples_step = -1 - return monitor - - def _build_rollout(*, example_id: int, reward: float, task: str) -> dict: return { "example_id": example_id, @@ -113,194 +94,6 @@ def test_rollouts_to_parquet_bytes_skips_rollouts_without_trajectory(): assert rows[0]["sample_id"] == 0 -def test_sample_upload_failure_keeps_step_in_backlog_and_retries_next_tick(monkeypatch): - monitor = _new_drain_monitor() - attempts: list[int] = [] - fail_steps_once = {0} - - fake_time = [1000.0] - monkeypatch.setattr( - "prime_rl.utils.monitor.prime.time.monotonic", - lambda: fake_time[0], - ) - - async def upload_one(step: int, _bytes: bytes) -> None: - attempts.append(step) - if step in fail_steps_once: - fail_steps_once.discard(step) - raise httpx.ConnectError("transient") - - monitor._upload_one_sample_step = upload_one # type: ignore[method-assign] - - async def scenario() -> None: - monitor._pending_sample_steps.add(0) - await monitor._enqueue_and_drain_samples_async(0, b"p0") - # First tick fails: step 0 stays at the head of the backlog. - assert list(monitor._sample_upload_queue) == [(0, b"p0")] - assert 0 in monitor._pending_sample_steps - - # Advance past the retryable cooldown before the next tick. - fake_time[0] += 120.0 - - # Second tick adds step 10 and drains both, oldest-first. - monitor._pending_sample_steps.add(10) - await monitor._enqueue_and_drain_samples_async(10, b"p10") - assert list(monitor._sample_upload_queue) == [] - assert monitor._pending_sample_steps == set() - assert monitor.last_log_samples_step == 10 - - asyncio.run(scenario()) - # Step 0: failed, retried, succeeded; then step 10 succeeded. - assert attempts == [0, 0, 10] - - -def test_non_retryable_sample_upload_failure_does_not_block_queue(): - """A permanent failure (e.g. 400 from presign, expired API key) must not - re-enqueue and block the head — it should be dropped so the rest of the - backlog can drain. 
- """ - monitor = _new_drain_monitor() - attempted: list[int] = [] - - async def upload_one(step: int, _bytes: bytes) -> None: - attempted.append(step) - if step == 0: - request = httpx.Request("POST", "https://example/presign") - raise httpx.HTTPStatusError( - "bad request", - request=request, - response=httpx.Response(400, request=request), - ) - - monitor._upload_one_sample_step = upload_one # type: ignore[method-assign] - - async def scenario() -> None: - # Pre-load queue so the drain has work to do behind the failing head. - for s in [0, 10, 20]: - monitor._pending_sample_steps.add(s) - monitor._sample_upload_queue.append((s, f"p{s}".encode())) - - # Trigger drain via a fresh log_samples call. - monitor._pending_sample_steps.add(30) - await monitor._enqueue_and_drain_samples_async(30, b"p30") - - asyncio.run(scenario()) - - # Step 0 (non-retryable 400) was attempted and dropped; 10/20/30 all uploaded. - assert attempted == [0, 10, 20, 30] - assert list(monitor._sample_upload_queue) == [] - assert monitor._pending_sample_steps == set() - assert monitor.last_log_samples_step == 30 - - -def test_retryable_failure_arms_cooldown_to_prevent_back_to_back_retries(monkeypatch): - """When R2 is down, multiple log_samples calls would otherwise each consume a - full tenacity retry budget on the same failing head back-to-back. The cooldown - timestamp set on retryable failure makes subsequent drains short-circuit until - it elapses. - """ - monitor = _new_drain_monitor() - attempts: list[int] = [] - - # Freeze monotonic time so we can move it deterministically. - fake_time = [1000.0] - monkeypatch.setattr( - "prime_rl.utils.monitor.prime.time.monotonic", - lambda: fake_time[0], - ) - - async def scenario() -> None: - async def fail(step: int, _bytes: bytes) -> None: - attempts.append(step) - raise httpx.ConnectError("R2 down") - - monitor._upload_one_sample_step = fail # type: ignore[method-assign] - - # Three log_samples calls in rapid succession while R2 is "down". - for s in [0, 10, 20]: - monitor._pending_sample_steps.add(s) - await monitor._enqueue_and_drain_samples_async(s, f"p{s}".encode()) - - # Only step 0 should have been attempted; the cooldown must suppress - # retries for the drains triggered by steps 10 and 20. - assert attempts == [0], f"expected only step 0 attempted, got {attempts}" - assert [s for s, _ in monitor._sample_upload_queue] == [0, 10, 20] - assert monitor._retryable_cooldown_until is not None - - # Advance time past the cooldown and switch to a successful uploader - # — next drain should resume processing. - fake_time[0] += 120.0 - - async def ok(step: int, _bytes: bytes) -> None: - attempts.append(step) - - monitor._upload_one_sample_step = ok # type: ignore[method-assign] - monitor._pending_sample_steps.add(30) - await monitor._enqueue_and_drain_samples_async(30, b"p30") - - assert attempts == [0, 0, 10, 20, 30] - assert list(monitor._sample_upload_queue) == [] - assert monitor._retryable_cooldown_until is None - - asyncio.run(scenario()) - - -def test_in_flight_sample_upload_survives_concurrent_eviction(): - """Regression test: backlog eviction runs without the upload lock, so a - concurrent log_samples() call could previously popleft the head that the - drain coroutine was peeking at and currently uploading. The fix pops the - in-flight item out of the queue into a local variable before awaiting, - making it invisible to eviction. 
- """ - monitor = _new_drain_monitor() - completed: list[int] = [] - - async def scenario() -> None: - started = asyncio.Event() - finish = asyncio.Event() - - async def slow_upload(step: int, _bytes: bytes) -> None: - if not started.is_set(): - started.set() - await finish.wait() - completed.append(step) - - monitor._upload_one_sample_step = slow_upload # type: ignore[method-assign] - - # Pre-load step 10; it will become the in-flight item. - monitor._pending_sample_steps.add(10) - monitor._sample_upload_queue.append((10, b"p10")) - - # Drain coroutine A: enqueues step 20, acquires lock, popleft's step 10 - # into a local variable, awaits. - monitor._pending_sample_steps.add(20) - a = asyncio.create_task(monitor._enqueue_and_drain_samples_async(20, b"p20")) - await started.wait() - assert list(monitor._sample_upload_queue) == [(20, b"p20")] - - # Five concurrent log_samples calls. Each appends + evicts outside the lock. - # Queue grows to size 6 on the last call; eviction popleft's step 20. - # Step 10 is in A's local var, so eviction can never see it. - extra_steps = [30, 40, 50, 60, 70] - for s in extra_steps: - monitor._pending_sample_steps.add(s) - bs = [asyncio.create_task(monitor._enqueue_and_drain_samples_async(s, f"p{s}".encode())) for s in extra_steps] - await asyncio.sleep(0.05) - queue_before_release = [s for s, _ in monitor._sample_upload_queue] - assert queue_before_release == [30, 40, 50, 60, 70] - - finish.set() - await asyncio.gather(a, *bs) - - asyncio.run(scenario()) - # The in-flight step survived, the correctly-oldest queued step was evicted. - assert 10 in completed - assert 20 not in completed - for s in [30, 40, 50, 60, 70]: - assert s in completed - assert _MAX_PENDING_SAMPLE_UPLOADS == 5 - - def test_sanitize_json_payload_drops_non_finite_values_and_logs_paths(): monitor = _new_monitor() monitor.logger = Mock() From 5c96d98b02a914b2c188605fdf365f03f9e92824 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Tue, 5 May 2026 22:39:29 -0700 Subject: [PATCH 7/7] fix(monitor): final sample-upload drain on close MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P2 on #2425: a retryable failure on the last log_samples tick (or just before close) leaves the parquet bytes appendleft'd back into the backlog with the upload future already complete. _flush() only waits on pending futures, not on the queue, so close() proceeds to aclose the HTTP client and stop the loop while the queued samples sit in memory and are never retried — losing the very data the backlog was meant to preserve. Fix: extract the drain loop into _drain_sample_backlog and call it once more from _flush() with ignore_cooldown=True, bounded by the existing flush timeout. Any items still in the queue after that drain are logged as lost. --- src/prime_rl/utils/monitor/prime.py | 57 ++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/src/prime_rl/utils/monitor/prime.py b/src/prime_rl/utils/monitor/prime.py index b1257b6ac8..35d19352f9 100644 --- a/src/prime_rl/utils/monitor/prime.py +++ b/src/prime_rl/utils/monitor/prime.py @@ -456,13 +456,30 @@ async def _enqueue_and_drain_samples_async(self, step: int, parquet_bytes: bytes f"dropping oldest queued step {dropped_step}" ) + await self._drain_sample_backlog() + + async def _drain_sample_backlog(self, *, ignore_cooldown: bool = False) -> None: + """Drain the sample backlog under the upload lock. 
Used by the regular + log_samples path and by close()/_flush() for a final shutdown attempt. + With ignore_cooldown=True, retries the head even if the retryable cooldown + is currently armed — last-chance attempt before the loop stops. + """ + if self._sample_upload_lock is None: + self._sample_upload_lock = asyncio.Lock() + if ignore_cooldown: + self._retryable_cooldown_until = None + async with self._sample_upload_lock: while self._sample_upload_queue: # If a previous drain just hit a retryable failure, every coroutine # queued on this lock would otherwise pop the same head and burn # another full tenacity budget. Bail out and let a later tick (after # the cooldown elapses) pick up the backlog. - if self._retryable_cooldown_until is not None and time.monotonic() < self._retryable_cooldown_until: + if ( + not ignore_cooldown + and self._retryable_cooldown_until is not None + and time.monotonic() < self._retryable_cooldown_until + ): return # Pop the in-flight item out of the queue entirely before awaiting, @@ -715,21 +732,41 @@ def _run_event_loop(self) -> None: self._loop.run_forever() def _flush(self, timeout: float = 30.0) -> None: - """Wait for all pending async requests to complete.""" + """Wait for all pending async requests to complete and make a last-chance + attempt to drain any sample uploads still queued in the backlog. Without + this final drain, a retryable failure on the last log_samples tick would + leave its parquet bytes parked in the queue and lost on close. + """ if not self.enabled or not hasattr(self, "_loop"): return - if not self._pending_futures: - return - - self.logger.debug(f"Flushing {len(self._pending_futures)} pending request(s)") - for future in self._pending_futures: + if self._pending_futures: + self.logger.debug(f"Flushing {len(self._pending_futures)} pending request(s)") + for future in self._pending_futures: + try: + future.result(timeout=timeout) + except Exception as e: + self.logger.debug(f"Pending request completed with error: {e}") + self._pending_futures.clear() + + # Final shutdown drain of the sample backlog. Bypass the cooldown — this + # is our last chance before the loop stops. + if hasattr(self, "_sample_upload_queue") and self._sample_upload_queue: + backlog_size = len(self._sample_upload_queue) + self.logger.info(f"Final sample-upload drain on close ({backlog_size} step(s) queued)") try: + future = asyncio.run_coroutine_threadsafe( + self._drain_sample_backlog(ignore_cooldown=True), + self._loop, + ) future.result(timeout=timeout) except Exception as e: - self.logger.debug(f"Pending request completed with error: {e}") - - self._pending_futures.clear() + self.logger.warning(f"Final sample-upload drain failed: {type(e).__name__}: {e}") + remaining = len(self._sample_upload_queue) + if remaining: + self.logger.warning( + f"{remaining} sample upload(s) still in backlog at close — these samples will be lost" + ) async def _make_request_async(self, endpoint: str, data: dict[str, Any], max_retries: int = 3) -> None: """Make an async POST request to the Prime Intellect API with retries."""