diff --git a/tests/test_cli_agent_env.py b/tests/test_cli_agent_env.py index 041ec3505..f90bbc74f 100644 --- a/tests/test_cli_agent_env.py +++ b/tests/test_cli_agent_env.py @@ -3,12 +3,16 @@ import asyncio import tempfile from pathlib import Path +from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch import pytest from datasets import Dataset import verifiers as vf +from verifiers.envs.experimental.cli_agent_env import CliAgentMonitorRubric +from verifiers.types import RolloutTiming +from verifiers.utils.save_utils import state_to_output from verifiers.utils.interception_utils import serialize_intercept_response @@ -224,6 +228,163 @@ async def test_non_streaming_intercept_tools_use_oai_schema( assert kwargs["tools"][0].name == "echo" +@pytest.mark.asyncio +async def test_cli_agent_final_log_formats_missing_first_model_call(): + timing = RolloutTiming() + timing.generation.start = 100.0 + timing.generation.end = 115.0 + timing.setup.start = 100.0 + timing.setup.end = 105.0 + timing.scoring.start = 115.0 + timing.scoring.end = 118.0 + state = { + "rollout_id": "rollout_test", + "example_id": 2332, + "info": {"instance_id": "brazilian-utils__brutils-python-126"}, + "sandbox_id": "sbx_test", + "trajectory": [], + "stop_condition": "agent_completed", + "agent_exit_code": 0, + "agent_start_time": 105.0, + "agent_end_time": 112.0, + "timing": timing, + } + + rubric = CliAgentMonitorRubric() + rubric.logger = MagicMock() + + await rubric.cleanup(state) + + message = rubric.logger.info.call_args.args[0] + assert "setup_s=5.000" in message + assert "first_call_latency_s=n/a" in message + assert "agent_s=7.000" in message + assert "scoring_s=3.000" in message + assert "duration_s=18.000" in message + + +@pytest.mark.asyncio +async def test_cli_agent_failure_metrics_use_structured_failure(): + rubric = CliAgentMonitorRubric() + + assert ( + await rubric.agent_error( + { + "failure": { + "reason": "agent_poll_failed", + "origin": "agent", + "error_type": "AgentPollError", + "root_error_type": "AgentPollError", + "message": "poll failed", + "logs": {}, + } + } + ) + == 1.0 + ) + assert ( + await rubric.agent_error( + { + "failure": { + "reason": "sandbox_timeout", + "origin": "sandbox", + "error_type": "SandboxError", + "root_error_type": "SandboxError", + "message": "timeout", + "logs": {}, + } + } + ) + == 0.0 + ) + assert await rubric.agent_nonzero_exit({"agent_exit_code": 7}) == 1.0 + assert await rubric.agent_nonzero_exit({"agent_exit_code": 0}) == 0.0 + assert ( + await rubric.agent_poll_failed( + { + "failure": { + "reason": "agent_poll_failed", + "origin": "agent", + "error_type": "AgentPollError", + "root_error_type": "AgentPollError", + "message": "poll failed", + "logs": {}, + } + } + ) + == 1.0 + ) + + +def test_state_to_output_serializes_failure_and_preserves_error_fields(): + state = vf.State( + input={"prompt": [], "example_id": 1, "task": "default", "info": {}} + ) + state.update( + { + "example_id": 1, + "task": "default", + "prompt": [], + "completion": [], + "reward": 0.0, + "timing": RolloutTiming(), + "is_completed": True, + "is_truncated": False, + "stop_condition": "has_error", + "metrics": {}, + "tool_defs": [], + "trajectory": [], + "error": vf.ModelError("model failed"), + } + ) + + output = state_to_output(state, state_columns=[]) + + assert output["error"]["error"] == "ModelError" + assert output["error_chain"] == "ModelError('model failed')" + assert output["long_error_chain"] == "ModelError" + assert output["failure"]["reason"] == "model_error" + assert output["failure"]["origin"] == "model" + assert output["failure"]["error_type"] == "ModelError" + + +@pytest.mark.asyncio +async def test_failure_log_collection_returns_bounded_tails(sample_dataset): + env = vf.CliAgentEnv( + run_command="python agent.py", + dataset=sample_dataset, + rubric=vf.Rubric(), + ) + env.sandbox_client.get_background_job = AsyncMock( + return_value=SimpleNamespace(stdout="a" * 13000, stderr="err") + ) + state = {"sandbox_id": "sbx", "background_job": object()} + + logs = await env._collect_background_job_log_tails(state) + + assert logs["agent_stdout"].endswith("a" * env.FAILURE_LOG_TAIL_CHARS) + assert logs["agent_stdout"].startswith("... /tmp/install_progress.log" in script + assert "tee -a /tmp/install_progress.log" in script + assert 'echo "[setup] start $name"' in script + assert 'echo "[setup] end $name exit=$exit_code elapsed_s=$elapsed_s"' in script + assert 'run_setup_step "apt_dependencies"' in script + assert 'run_setup_step "ripgrep_install"' in script + assert 'run_setup_step "download_opencode"' in script + assert 'run_setup_step "verify_opencode_sha256"' in script + assert 'run_setup_step "install_opencode"' in script + + +def test_observed_setup_wrapper_writes_trace_and_preserves_exit(): + wrapped = ComposableEnv._wrap_observed_setup_command( + "echo hi && false", "agent_install" + ) + + assert "/tmp/vf_observed_command.log" in wrapped + assert "event=observed_command_started" in wrapped + assert "event=setup_failed" in wrapped + assert 'exit "${PIPESTATUS[0]}"' in wrapped diff --git a/verifiers/envs/environment.py b/verifiers/envs/environment.py index 36d5c9743..a12d5dfb9 100644 --- a/verifiers/envs/environment.py +++ b/verifiers/envs/environment.py @@ -608,6 +608,7 @@ async def init_state( state["reward"] = None state["metrics"] = None state["error"] = None + state["failure"] = None state["final_env_response"] = None state["timing"] = RolloutTiming() return state diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 87e8d849e..6dd4b8b22 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -1,6 +1,7 @@ import asyncio import logging import os +import shlex import time import uuid from collections import Counter @@ -20,6 +21,8 @@ SandboxMixin, SandboxMonitorRubric, SandboxTimeouts, + format_rollout_log_event, + log_rollout_event, ) from verifiers.types import ( AssistantMessage, @@ -36,7 +39,15 @@ deliver_response, synthesize_stream, ) -from verifiers.utils.logging_utils import print_time, truncate +from verifiers.utils.failure_utils import ( + FAILURE_ORIGIN_AGENT, + FAILURE_REASON_AGENT_NONZERO_EXIT, + FAILURE_REASON_AGENT_POLL_FAILED, + add_failure_logs, + ensure_rollout_failure, + tail_text, +) +from verifiers.utils.logging_utils import truncate from verifiers.utils.message_utils import normalize_messages logger = logging.getLogger(__name__) @@ -46,7 +57,13 @@ class AgentError(vf.InfraError): """Raised when the agent process fails or exits unexpectedly.""" -def make_agent_error(state: State, message: str) -> AgentError: +class AgentPollError(AgentError): + """Raised when polling the sandbox background job fails.""" + + +def make_agent_error( + state: State, message: str, error_cls: type[AgentError] = AgentError +) -> AgentError: """Create an AgentError with rollout-specific sandbox context when available.""" context_parts = [ f"sandbox_id={state['sandbox_id']}", @@ -57,7 +74,107 @@ def make_agent_error(state: State, message: str) -> AgentError: instance_id = state_info.get("instance_id") if instance_id: context_parts.append(f"instance_id={instance_id}") - return AgentError(f"{message} ({', '.join(context_parts)})") + return error_cls(f"{message} ({', '.join(context_parts)})") + + +def _collect_tool_counts(state: State) -> Counter[str]: + tool_counts: Counter[str] = Counter() + for step in state.get("trajectory", []): + for msg in step.get("completion", []): + if isinstance(msg, AssistantMessage) and isinstance(msg.tool_calls, list): + for tc in msg.tool_calls: + if isinstance(tc, ToolCall): + tool_counts[tc.name] += 1 + return tool_counts + + +def _phase(timing: Any, name: str) -> Any: + if timing is None: + return None + if isinstance(timing, dict): + return timing.get(name) + return getattr(timing, name, None) + + +def _span_start(span: Any) -> float: + if span is None: + return 0.0 + if isinstance(span, dict): + return float(span.get("start", 0.0) or 0.0) + return float(getattr(span, "start", 0.0) or 0.0) + + +def _span_end(span: Any) -> float: + if span is None: + return 0.0 + if isinstance(span, dict): + return float(span.get("end", 0.0) or 0.0) + return float(getattr(span, "end", 0.0) or 0.0) + + +def _span_duration(span: Any) -> float: + if span is None: + return 0.0 + duration = ( + span.get("duration") + if isinstance(span, dict) + else getattr(span, "duration", None) + ) + if duration is not None: + return float(duration or 0.0) + end = _span_end(span) + start = _span_start(span) + return end - start if end > 0 and start > 0 else 0.0 + + +def _time_spans(timing: Any, name: str) -> list[Any]: + phase = _phase(timing, name) + if phase is None: + return [] + if isinstance(phase, dict): + spans = phase.get("spans", []) + else: + spans = getattr(phase, "spans", []) + return list(spans or []) + + +def _first_call_latency_s(state: State) -> float | None: + timing = state.get("timing") + spans = _time_spans(timing, "model") + if not spans: + return None + first_model_start = _span_start(spans[0]) + agent_start = float(state.get("agent_start_time") or 0.0) + if agent_start <= 0: + agent_start = _span_end(_phase(timing, "setup")) + if first_model_start <= 0 or agent_start <= 0: + return None + return max(0.0, first_model_start - agent_start) + + +def _agent_duration_s(state: State) -> float: + agent_start = float(state.get("agent_start_time") or 0.0) + if agent_start <= 0: + return 0.0 + agent_end = float(state.get("agent_end_time") or 0.0) + if agent_end <= 0: + agent_end = _span_end(_phase(state.get("timing"), "generation")) + return max(0.0, agent_end - agent_start) if agent_end > 0 else 0.0 + + +def _rollout_total_s(state: State) -> float: + timing = state.get("timing") + scoring = _phase(timing, "scoring") + generation = _phase(timing, "generation") + scoring_end = _span_end(scoring) + generation_start = _span_start(generation) + if scoring_end > 0 and generation_start > 0: + return max(0.0, scoring_end - generation_start) + return _span_duration(generation) + + +def _format_optional_s(value: float | None) -> str: + return "n/a" if value is None else f"{value:.3f}" class CliAgentMonitorRubric(vf.Rubric): @@ -67,18 +184,100 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self.add_metric(self.agent_timeout) self.add_metric(self.agent_error) + self.add_metric(self.agent_nonzero_exit) + self.add_metric(self.agent_poll_failed) async def agent_timeout(self, state: vf.State) -> float: """Whether the agent timed out.""" return float(bool(state.get("timed_out"))) async def agent_error(self, state: vf.State) -> float: - """Whether the agent errored (non-zero exit_code).""" + """Whether the rollout failure originated in the CLI agent.""" + failure = ensure_rollout_failure(state) + if failure is None: + return 0.0 + return float(failure.get("origin") == FAILURE_ORIGIN_AGENT) + + async def agent_nonzero_exit(self, state: vf.State) -> float: + """Whether the agent process completed with a non-zero exit code.""" agent_exit_code = state.get("agent_exit_code") if agent_exit_code is None: return 0.0 return float(agent_exit_code != 0) + async def agent_poll_failed(self, state: vf.State) -> float: + """Whether polling/reading the background agent job failed.""" + failure = ensure_rollout_failure(state) + if failure is None: + return 0.0 + return float(failure.get("reason") == FAILURE_REASON_AGENT_POLL_FAILED) + + @vf.cleanup + async def log_rollout_finished(self, state: vf.State) -> None: + """Log final rollout lifecycle details after scoring has completed.""" + if state.get("_cli_agent_rollout_finished_logged"): + return + state["_cli_agent_rollout_finished_logged"] = True + + tool_counts = Counter(state.get("_cli_agent_tool_counts") or {}) + if not tool_counts: + tool_counts = _collect_tool_counts(state) + tools_str = ",".join(f"{k}:{v}" for k, v in tool_counts.most_common()) + error = state.get("error") + error_info = ( + f"{type(error).__name__}: {truncate(str(error), 80)}" if error else None + ) + timed_out = state.get("timed_out", False) + stop_condition = state.get("stop_condition", "unknown") + exit_code = state.get("agent_exit_code") + turns = len(state.get("trajectory", [])) + failure = ensure_rollout_failure(state) + + if error_info or timed_out: + self.logger.error( + format_rollout_log_event( + "rollout_aborted", + state, + stop=stop_condition, + exit_code=exit_code, + timed_out=timed_out if timed_out else None, + reason=failure.get("reason") if failure else None, + origin=failure.get("origin") if failure else None, + error=error_info, + ) + ) + + timing = state.get("timing") + self.logger.info( + format_rollout_log_event( + "rollout_finished", + state, + turns=len(state.get("trajectory", [])), + tools=f"[{tools_str}]", + stop=stop_condition, + exit_code=exit_code, + setup_s=_span_duration(_phase(timing, "setup")), + first_call_latency_s=_format_optional_s(_first_call_latency_s(state)), + agent_s=_agent_duration_s(state), + scoring_s=_span_duration(_phase(timing, "scoring")), + duration_s=_rollout_total_s(state), + ) + ) + + if turns == 0 and not error_info and not timed_out: + self.logger.error( + format_rollout_log_event( + "rollout_empty_trajectory", + state, + stop=stop_condition, + exit_code=exit_code, + reason=failure.get("reason") if failure else None, + origin=failure.get("origin") if failure else None, + agent_s=_agent_duration_s(state), + duration_s=_rollout_total_s(state), + ) + ) + class CliAgentEnv(SandboxMixin, vf.MultiTurnEnv): """ @@ -233,6 +432,24 @@ async def setup_state(self, state: State) -> State: rollout_id = f"rollout_{uuid.uuid4().hex[:8]}" state["rollout_id"] = rollout_id + setup_start = time.perf_counter() + setup_succeeded = False + log_rollout_event(self.logger, "setup_started", state) + try: + await self._setup_sandbox_and_agent(state, rollout_id) + setup_succeeded = True + finally: + log_rollout_event( + self.logger, + "setup_finished", + state, + status="ok" if setup_succeeded else "error", + elapsed_s=time.perf_counter() - setup_start, + ) + return state + + async def _setup_sandbox_and_agent(self, state: State, rollout_id: str) -> None: + """Create sandbox resources and launch the CLI agent.""" interception_server = self._require_interception_server() await interception_server.start() @@ -287,7 +504,6 @@ async def setup_state(self, state: State) -> State: f"example_id={state['example_id']}", ] self.logger.info(" | ".join(parts)) - return state async def get_docker_image(self, state: State) -> str: """Get the Docker image for the sandbox. Override for per-task images.""" @@ -345,6 +561,8 @@ async def start_agent(self, state: State) -> None: sandbox_id = state["sandbox_id"] self.logger.debug(f"Starting agent in sandbox {sandbox_id}") + launch_start = time.perf_counter() + log_rollout_event(self.logger, "agent_launch_started", state) try: background_job: BackgroundJob = ( await self.sandbox_client.start_background_job( @@ -360,6 +578,12 @@ async def start_agent(self, state: State) -> None: state["completion_wait_task"] = asyncio.create_task( self.wait_for_completion(state) ) + log_rollout_event( + self.logger, + "agent_launched", + state, + elapsed_s=time.perf_counter() - launch_start, + ) async def wait_for_completion(self, state: State) -> None: """Poll for agent completion using background job API.""" @@ -376,9 +600,24 @@ async def wait_for_completion(self, state: State) -> None: self.logger.debug("Completion wait task cancelled") raise except Exception as e: - error = make_agent_error(state, f"Agent polling failed: {e}") + state["agent_poll_failed"] = True + error = make_agent_error( + state, f"Agent polling failed: {e}", error_cls=AgentPollError + ) state["error"] = error - self.logger.error(str(error)) + ensure_rollout_failure( + state, + reason=FAILURE_REASON_AGENT_POLL_FAILED, + origin=FAILURE_ORIGIN_AGENT, + error=error, + ) + self.logger.error( + format_rollout_log_event( + "agent_poll_failed", + state, + error=truncate(str(error), 500), + ) + ) finally: state["agent_completed"] = True @@ -391,6 +630,7 @@ async def poll_job_completion( sandbox_id, background_job, timeout=self.timeouts.poll ) if status.completed: + state["agent_end_time"] = time.time() state["agent_exit_code"] = status.exit_code state["agent_stdout"] = status.stdout state["agent_stderr"] = status.stderr @@ -414,7 +654,21 @@ async def poll_job_completion( f"(exit_code={status.exit_code}): {stderr_full}", ) state["error"] = error - self.logger.error(str(error)) + ensure_rollout_failure( + state, + reason=FAILURE_REASON_AGENT_NONZERO_EXIT, + origin=FAILURE_ORIGIN_AGENT, + error=error, + ) + self.logger.error( + format_rollout_log_event( + "rollout_aborted", + state, + stop="agent_nonzero_exit", + exit_code=status.exit_code, + error=truncate(str(error), 500), + ) + ) return await asyncio.sleep(self.poll_interval) @@ -635,6 +889,91 @@ async def add_model_response( state, prompt_messages, await self.normalize_response(response) ) + FAILURE_LOG_TAIL_CHARS = 12000 + SETUP_DIAGNOSTIC_LOG_PATHS = { + "observed_command_log": "/tmp/vf_observed_command.log", + "install_progress_log": "/tmp/install_progress.log", + } + + def get_failure_log_paths(self, state: State) -> dict[str, str]: + """Return sandbox paths whose bounded tails should be attached to failures.""" + return dict(self.SETUP_DIAGNOSTIC_LOG_PATHS) + + async def _read_sandbox_log_tail( + self, sandbox_id: str, path: str, *, max_chars: int | None = None + ) -> str: + max_chars = max_chars or self.FAILURE_LOG_TAIL_CHARS + quoted_path = shlex.quote(path) + result = await self.sandbox_client.execute_command( + sandbox_id, + f"tail -c {int(max_chars)} {quoted_path} 2>/dev/null || true", + working_dir=None, + timeout=self.timeouts.read_file, + ) + return (result.stdout or "").strip() + + async def _collect_background_job_log_tails(self, state: State) -> dict[str, str]: + logs: dict[str, str] = {} + for key in ("agent_stdout", "agent_stderr"): + if state.get(key): + logs[key] = tail_text(state.get(key), self.FAILURE_LOG_TAIL_CHARS) + + sandbox_id = state.get("sandbox_id") + background_job = state.get("background_job") + if ( + sandbox_id + and background_job + and not {"agent_stdout", "agent_stderr"} <= logs.keys() + ): + try: + status = await self.sandbox_client.get_background_job( + sandbox_id, background_job, timeout=self.timeouts.poll + ) + if status.stdout and "agent_stdout" not in logs: + logs["agent_stdout"] = tail_text( + status.stdout, self.FAILURE_LOG_TAIL_CHARS + ) + if status.stderr and "agent_stderr" not in logs: + logs["agent_stderr"] = tail_text( + status.stderr, self.FAILURE_LOG_TAIL_CHARS + ) + except Exception as e: + self.logger.warning( + format_rollout_log_event( + "diagnostic_collection_failed", + state, + source="background_job", + error=f"{type(e).__name__}: {truncate(str(e), 200)}", + ) + ) + return logs + + async def collect_failure_diagnostics(self, state: State) -> dict[str, str]: + """Collect bounded diagnostic log tails without masking rollout failures.""" + logs = await self._collect_background_job_log_tails(state) + sandbox_id = state.get("sandbox_id") + if sandbox_id: + for name, path in self.get_failure_log_paths(state).items(): + try: + value = await self._read_sandbox_log_tail(sandbox_id, path) + except Exception as e: + self.logger.warning( + format_rollout_log_event( + "diagnostic_collection_failed", + state, + source=name, + error=f"{type(e).__name__}: {truncate(str(e), 200)}", + ) + ) + continue + if value: + logs[name] = tail_text(value, self.FAILURE_LOG_TAIL_CHARS) + for name, value in logs.items(): + self.logger.debug( + f"{format_rollout_log_event('failure_log_collected', state, source=name, chars=len(value))}\n{value}" + ) + return logs + @vf.teardown async def teardown_resources(self): """Stop Prime Tunnel and HTTP interception server.""" @@ -662,8 +1001,6 @@ async def cleanup_interception_context(self, state: State): except asyncio.CancelledError: pass - state.pop("background_job", None) - rollout_id = state.get("rollout_id") if rollout_id and self._interception_server is not None: self._interception_server.unregister_rollout(rollout_id) @@ -678,47 +1015,11 @@ async def post_rollout(self, state: State): Override for custom post-rollout logic. For example, if sandbox state is needed for reward functions, run computation here and cache the result in state before sandbox is destroyed. """ - tool_counts: Counter[str] = Counter() - for step in state.get("trajectory", []): - for msg in step.get("completion", []): - if isinstance(msg, AssistantMessage) and isinstance( - msg.tool_calls, list - ): - for tc in msg.tool_calls: - if isinstance(tc, ToolCall): - tool_counts[tc.name] += 1 - - example_id = state.get("example_id") - num_turns = len(state.get("trajectory", [])) - stop_condition = state.get("stop_condition", "unknown") - error = state.get("error") - error_info = ( - f"{type(error).__name__}: {truncate(str(error), 80)}" if error else None - ) - exit_code = state.get("agent_exit_code") - timed_out = state.get("timed_out", False) - # post_rollout runs during finalization, before Environment.run_rollout - # stamps scoring.end. timing.total is therefore still 0 here, so derive - # the live duration from the generation-phase start. - timing = state.get("timing") - gen = getattr(timing, "generation", None) if timing is not None else None - gen_start = getattr(gen, "start", 0.0) if gen is not None else 0.0 - duration_s = time.time() - gen_start if gen_start > 0 else 0.0 - tools_str = ",".join(f"{k}:{v}" for k, v in tool_counts.most_common()) - parts = [ - f"Finished rollout_id={state.get('rollout_id')}", - f"example_id={example_id}", - f"turns={num_turns}", - f"tools=[{tools_str}]", - f"stop={stop_condition}", - f"exit_code={exit_code}", - f"duration={print_time(duration_s)}", - ] - if timed_out: - parts.append("timed_out=True") - if error_info: - parts.append(f"error={error_info}") - self.logger.info(" | ".join(parts)) + state["_cli_agent_tool_counts"] = dict(_collect_tool_counts(state)) + failure = ensure_rollout_failure(state) + if failure is not None: + logs = await self.collect_failure_diagnostics(state) + add_failure_logs(state, logs) @vf.cleanup async def destroy_sandbox(self, state: State): @@ -741,6 +1042,7 @@ async def destroy_sandbox(self, state: State): self.deregister_sandbox(sandbox_id) else: await self.delete_sandbox(sandbox_id) + state.pop("background_job", None) async def env_response( self, messages: Messages, state: State, **kwargs diff --git a/verifiers/envs/experimental/composable/composable_env.py b/verifiers/envs/experimental/composable/composable_env.py index 1d1a28afe..ed5307269 100644 --- a/verifiers/envs/experimental/composable/composable_env.py +++ b/verifiers/envs/experimental/composable/composable_env.py @@ -52,9 +52,18 @@ from verifiers.envs.experimental.cli_agent_env import CliAgentEnv from verifiers.envs.experimental.composable.harness import Harness from verifiers.envs.experimental.composable.task import TaskSet +from verifiers.envs.experimental.sandbox_mixin import ( + format_rollout_log_event, + log_rollout_event, +) from verifiers.envs.experimental.utils.file_locks import shared_path_lock from verifiers.envs.tool_env import ToolMonitorRubric from verifiers.types import State, TrajectoryStep +from verifiers.utils.failure_utils import ( + FAILURE_ORIGIN_SANDBOX, + FAILURE_REASON_SANDBOX_SETUP_FAILED, + ensure_rollout_failure, +) from verifiers.utils.logging_utils import print_size, print_time logger = logging.getLogger(__name__) @@ -261,13 +270,83 @@ async def post_sandbox_setup(self, state: State) -> None: onto the installed agent.""" sandbox_id = state["sandbox_id"] - await self._populate_sandbox_context(state) - await self.taskset.setup(state) - await self._create_harness_input_dirs(sandbox_id) - await self._upload_harness_inputs(sandbox_id, state) - await self._after_harness_inputs_uploaded(state) - await self._install_agent(sandbox_id) - await self._run_post_install(sandbox_id) + await self._run_setup_step( + state, "populate_sandbox_context", self._populate_sandbox_context, state + ) + await self._run_setup_step(state, "task_setup", self.taskset.setup, state) + await self._run_setup_step( + state, + "create_harness_input_dirs", + self._create_harness_input_dirs, + sandbox_id, + ) + await self._run_setup_step( + state, + "upload_harness_inputs", + self._upload_harness_inputs, + sandbox_id, + state, + ) + await self._run_setup_step( + state, + "after_harness_inputs_uploaded", + self._after_harness_inputs_uploaded, + state, + ) + if self.harness.install_script: + await self._run_setup_step( + state, "agent_install", self._install_agent, sandbox_id + ) + if self.harness.post_install_uploads or self.harness.post_install_script: + await self._run_setup_step( + state, "post_install", self._run_post_install, sandbox_id + ) + + async def _run_setup_step( + self, + state: State, + step: str, + func: Any, + *args: Any, + ) -> Any: + start = time.perf_counter() + log_rollout_event(self.logger, "setup_step_started", state, step=step) + try: + result = await func(*args) + except BaseException as e: + if not isinstance(e, asyncio.CancelledError): + ensure_rollout_failure( + state, + reason=FAILURE_REASON_SANDBOX_SETUP_FAILED, + origin=FAILURE_ORIGIN_SANDBOX, + error=e, + ) + self.logger.error( + format_rollout_log_event( + "setup_failed", + state, + step=step, + error=f"{type(e).__name__}: {str(e)[:500]}", + ) + ) + log_rollout_event( + self.logger, + "setup_step_finished", + state, + step=step, + status="error", + elapsed_s=time.perf_counter() - start, + ) + raise + log_rollout_event( + self.logger, + "setup_step_finished", + state, + step=step, + status="ok", + elapsed_s=time.perf_counter() - start, + ) + return result async def post_rollout(self, state: State) -> None: """Collect agent logs and harness metrics after the agent finishes. @@ -371,6 +450,34 @@ def _get_install_execute_kwargs(self) -> dict[str, Any]: kwargs["env"] = self.install_env return kwargs + def get_failure_log_paths(self, state: State) -> dict[str, str]: + paths = super().get_failure_log_paths(state) + if self.harness.log_path: + paths["agent_log"] = self.harness.log_path + return paths + + @staticmethod + def _wrap_observed_setup_command(command: str, step: str) -> str: + """Wrap setup commands with a grep-friendly trace log in the sandbox.""" + one_line = " ".join(command.split()) + if len(one_line) > 1000: + one_line = one_line[:1000] + "..." + script = f"""\ +set +e +{{ + echo "event=observed_command_started step={shlex.quote(step)} command={shlex.quote(one_line)}" + bash -lc {shlex.quote(command)} + exit_code="$?" + if [ "$exit_code" -ne 0 ]; then + echo "event=setup_failed step={shlex.quote(step)} exit_code=$exit_code" + fi + echo "event=observed_command_finished step={shlex.quote(step)} exit_code=$exit_code" + exit "$exit_code" +}} 2>&1 | tee -a /tmp/vf_observed_command.log +exit "${{PIPESTATUS[0]}}" +""" + return f"bash -lc {shlex.quote(script)}" + async def _install_agent(self, sandbox_id: str) -> None: """Install the agent inside the sandbox when an install script is present.""" if self.harness.install_script: @@ -378,7 +485,9 @@ async def _install_agent(self, sandbox_id: str) -> None: install_start = time.perf_counter() result = await self.sandbox_client.execute_command( sandbox_id, - self.harness.install_script, + self._wrap_observed_setup_command( + self.harness.install_script, "agent_install" + ), **self._get_install_execute_kwargs(), ) elapsed = time.perf_counter() - install_start @@ -409,7 +518,9 @@ async def _run_post_install(self, sandbox_id: str) -> None: self.logger.debug(f"Running post-install script in sandbox {sandbox_id}") result = await self.sandbox_client.execute_command( sandbox_id, - self.harness.post_install_script, + self._wrap_observed_setup_command( + self.harness.post_install_script, "post_install" + ), **self._get_install_execute_kwargs(), ) if result.exit_code != 0: diff --git a/verifiers/envs/experimental/composable/harnesses/opencode.py b/verifiers/envs/experimental/composable/harnesses/opencode.py index a320c61db..276ce9d0d 100644 --- a/verifiers/envs/experimental/composable/harnesses/opencode.py +++ b/verifiers/envs/experimental/composable/harnesses/opencode.py @@ -58,7 +58,8 @@ def build_install_script( ) -> str: """Build the shell script that installs OpenCode in a sandbox.""" rg_install = ( - "apt-get -o Acquire::Retries=3 install -y -qq ripgrep > /dev/null 2>&1 || true" + 'run_setup_step "ripgrep_install" ' + '"apt-get -o Acquire::Retries=3 install -y -qq ripgrep > /dev/null 2>&1 || true"' if install_ripgrep else "" ) @@ -68,17 +69,33 @@ def build_install_script( # bug #1876035. apt's default retries is 0, so one bad fetch fails the rollout. return f"""\ set -e -apt-get -o Acquire::Retries=3 update -qq && apt-get -o Acquire::Retries=3 install -y -qq curl tar > /dev/null 2>&1 +: > /tmp/install_progress.log +run_setup_step() {{ + name="$1" + shift + start="$(date +%s)" + echo "[setup] start $name" | tee -a /tmp/install_progress.log + set +e + eval "$*" + exit_code="$?" + set -e + end="$(date +%s)" + elapsed_s="$((end - start))" + echo "[setup] end $name exit=$exit_code elapsed_s=$elapsed_s" | tee -a /tmp/install_progress.log + return "$exit_code" +}} + +run_setup_step "apt_dependencies" "apt-get -o Acquire::Retries=3 update -qq && apt-get -o Acquire::Retries=3 install -y -qq curl tar > /dev/null 2>&1" {rg_install} OPENCODE_RELEASE_REPO="{release_repo}" OPENCODE_RELEASE_VERSION="{release_version}" -case "$(uname -m)" in +run_setup_step "detect_arch" 'case "$(uname -m)" in x86_64) OPENCODE_ARCH=x64 ;; aarch64|arm64) OPENCODE_ARCH=arm64 ;; *) echo "Unsupported architecture: $(uname -m)"; exit 1 ;; -esac +esac' OPENCODE_ASSET="opencode-linux-$OPENCODE_ARCH.tar.gz" OPENCODE_RELEASE_TAG="${{OPENCODE_RELEASE_VERSION#v}}" @@ -86,14 +103,14 @@ def build_install_script( mkdir -p "$HOME/.opencode/bin" if [ -x "$HOME/.opencode/bin/opencode" ]; then - echo "OpenCode already installed, skipping download" + echo "OpenCode already installed, skipping download" | tee -a /tmp/install_progress.log else - curl -fsSL "$OPENCODE_RELEASE_URL" -o /tmp/opencode.tar.gz - {sha256_check} - tar -xzf /tmp/opencode.tar.gz -C /tmp - install -m 755 /tmp/opencode "$HOME/.opencode/bin/opencode" + run_setup_step "download_opencode" 'curl -fsSL "$OPENCODE_RELEASE_URL" -o /tmp/opencode.tar.gz' + run_setup_step "verify_opencode_sha256" '{sha256_check}' + run_setup_step "extract_opencode" "tar -xzf /tmp/opencode.tar.gz -C /tmp" + run_setup_step "install_opencode" 'install -m 755 /tmp/opencode "$HOME/.opencode/bin/opencode"' rm -f /tmp/opencode.tar.gz /tmp/opencode - echo "OpenCode installed successfully" + echo "OpenCode installed successfully" | tee -a /tmp/install_progress.log fi """ diff --git a/verifiers/envs/experimental/composable/tasksets/swe/swe_rebench_v2.py b/verifiers/envs/experimental/composable/tasksets/swe/swe_rebench_v2.py index a88df4a6a..0d63aa4b2 100644 --- a/verifiers/envs/experimental/composable/tasksets/swe/swe_rebench_v2.py +++ b/verifiers/envs/experimental/composable/tasksets/swe/swe_rebench_v2.py @@ -26,16 +26,19 @@ from __future__ import annotations +import hashlib import json import logging import re import tempfile +import time from pathlib import Path from typing import Any import verifiers as vf from datasets import load_dataset from verifiers.envs.experimental.composable import SandboxSpec, SandboxTaskSet +from verifiers.envs.experimental.sandbox_mixin import log_rollout_event from verifiers.envs.experimental.composable.tasksets.swe import ( swe_rebench_v2_log_parsers as _lp, @@ -48,6 +51,8 @@ DATASET_NAME = "nebius/SWE-rebench-V2" +PATCH_OUTPUT_LIMIT = 800 +PATCH_PREVIEW_LIMIT = 500 # Broad PATH covering all known SWE-rebench-V2 language toolchains. @@ -132,6 +137,43 @@ def _normalize_test_cmds(test_cmd: Any) -> list[str]: return cmds +def _truncate_context(value: str | None, limit: int = PATCH_OUTPUT_LIMIT) -> str: + if not value: + return "" + if len(value) <= limit: + return value + return value[:limit] + f"... " + + +def _patch_preview(patch: str) -> str: + return _truncate_context(patch, PATCH_PREVIEW_LIMIT) + + +def _patch_failure_message( + label: str, + patch: str, + attempts: list[tuple[str, str, str, Any]], +) -> str: + patch_bytes = patch.encode("utf-8") + lines = [ + f"{label} apply failed", + f"patch_sha256={hashlib.sha256(patch_bytes).hexdigest()}", + f"patch_size={len(patch_bytes)}", + f"patch_preview={_patch_preview(patch)!r}", + ] + for name, command, workdir, result in attempts: + lines.extend( + [ + f"{name}_command={command!r}", + f"{name}_working_dir={workdir!r}", + f"{name}_exit_code={getattr(result, 'exit_code', None)}", + f"{name}_stdout={_truncate_context(getattr(result, 'stdout', None))!r}", + f"{name}_stderr={_truncate_context(getattr(result, 'stderr', None))!r}", + ] + ) + return "\n".join(lines) + + def _build_eval_script(test_cmds: list[str], workdir: str) -> str: """Run the upstream ``test_cmd`` sequence and emit a SENTINEL bookend. @@ -323,9 +365,20 @@ async def setup(self, state) -> None: "&& install -m 755 /tmp/ripgrep-${V}-x86_64-unknown-linux-musl/rg /usr/local/bin/rg; " "}" ) + rg_start = time.perf_counter() + log_rollout_event(logger, "setup_step_started", state, step="ripgrep_install") result = await sandbox_client.execute_command( sandbox_id, rg_install, timeout=120 ) + log_rollout_event( + logger, + "setup_step_finished", + state, + step="ripgrep_install", + status="ok" if result.exit_code == 0 else "error", + exit_code=result.exit_code, + elapsed_s=time.perf_counter() - rg_start, + ) if result.exit_code != 0: logger.warning( f"[{sandbox_id}] ripgrep install failed (exit={result.exit_code}); " @@ -335,7 +388,12 @@ async def setup(self, state) -> None: test_patch = (info or {}).get("test_patch") or "" if test_patch.strip(): await self._apply_patch_file( - sandbox_client, sandbox_id, workdir, test_patch, "test_patch" + sandbox_client, + sandbox_id, + workdir, + test_patch, + "test_patch", + state=state, ) async def _run_tests( @@ -424,7 +482,12 @@ async def _apply_patch_file( workdir: str, patch: str, label: str, + state: dict | None = None, ) -> None: + context_state = state or {"sandbox_id": sandbox_id} + patch_start = time.perf_counter() + patch_succeeded = False + log_rollout_event(logger, "setup_step_started", context_state, step=label) with tempfile.NamedTemporaryFile(suffix=".patch", mode="w", delete=False) as f: f.write(patch) f.flush() @@ -432,32 +495,89 @@ async def _apply_patch_file( remote_path = f"/tmp/{label}.patch" try: - await sandbox_client.upload_file(sandbox_id, remote_path, local_path) - finally: - Path(local_path).unlink(missing_ok=True) - - # Upstream eval.py uses ``git apply -v --3way --recount - # --ignore-space-change --whitespace=nowarn``. We follow the same - # flags; fall back to ``patch --fuzz=5`` if git apply rejects. - git_cmd = ( - "git apply -v --3way --recount --ignore-space-change " - f"--whitespace=nowarn {remote_path}" - ) - result = await sandbox_client.execute_command( - sandbox_id, git_cmd, working_dir=workdir, timeout=60 - ) - if result.exit_code != 0: - result = await sandbox_client.execute_command( + try: + await sandbox_client.upload_file(sandbox_id, remote_path, local_path) + finally: + Path(local_path).unlink(missing_ok=True) + + # Upstream eval.py uses ``git apply -v --3way --recount + # --ignore-space-change --whitespace=nowarn``. We follow the same + # flags; fall back to ``patch --fuzz=5`` if git apply rejects. + git_cmd = ( + "git apply -v --3way --recount --ignore-space-change " + f"--whitespace=nowarn {remote_path}" + ) + git_start = time.perf_counter() + log_rollout_event( + logger, + "setup_step_started", + context_state, + step=f"{label}_git_apply", + command=git_cmd, + ) + git_result = await sandbox_client.execute_command( + sandbox_id, git_cmd, working_dir=workdir, timeout=60 + ) + log_rollout_event( + logger, + "setup_step_finished", + context_state, + step=f"{label}_git_apply", + status="ok" if git_result.exit_code == 0 else "error", + exit_code=git_result.exit_code, + elapsed_s=time.perf_counter() - git_start, + ) + if git_result.exit_code == 0: + patch_succeeded = True + return + + fallback_cmd = f"patch --fuzz=5 -p1 -i {remote_path}" + fallback_start = time.perf_counter() + log_rollout_event( + logger, + "setup_step_started", + context_state, + step=f"{label}_fallback_patch", + command=fallback_cmd, + ) + fallback_result = await sandbox_client.execute_command( sandbox_id, - f"patch --fuzz=5 -p1 -i {remote_path}", + fallback_cmd, working_dir=workdir, timeout=60, ) - if result.exit_code != 0: - stderr = (result.stderr or "")[:500] - raise RuntimeError( - f"{label} apply failed: exit_code={result.exit_code} stderr={stderr}" + log_rollout_event( + logger, + "setup_step_finished", + context_state, + step=f"{label}_fallback_patch", + status="ok" if fallback_result.exit_code == 0 else "error", + exit_code=fallback_result.exit_code, + elapsed_s=time.perf_counter() - fallback_start, + ) + if fallback_result.exit_code == 0: + patch_succeeded = True + return + + raise RuntimeError( + _patch_failure_message( + label, + patch, + [ + ("git_apply", git_cmd, workdir, git_result), + ("fallback_patch", fallback_cmd, workdir, fallback_result), + ], ) + ) + finally: + log_rollout_event( + logger, + "setup_step_finished", + context_state, + step=label, + status="ok" if patch_succeeded else "error", + elapsed_s=time.perf_counter() - patch_start, + ) async def _apply_gold_patch( self, sandbox_client: Any, sandbox_id: str, state: dict @@ -467,7 +587,9 @@ async def _apply_gold_patch( if not patch or not patch.strip(): raise RuntimeError("No gold patch in info['patch']") workdir = _repo_workdir(info["repo"]) - await self._apply_patch_file(sandbox_client, sandbox_id, workdir, patch, "gold") + await self._apply_patch_file( + sandbox_client, sandbox_id, workdir, patch, "gold", state=state + ) def get_rubric(self): return SWERebenchV2Rubric(self) diff --git a/verifiers/envs/experimental/opencode_env.py b/verifiers/envs/experimental/opencode_env.py index 6ff9bcdb7..b74cd89b5 100644 --- a/verifiers/envs/experimental/opencode_env.py +++ b/verifiers/envs/experimental/opencode_env.py @@ -222,6 +222,11 @@ def remote_prompt_path(self) -> str: def remote_logs_path(self) -> str: return f"{self.asset_dir}/logs.txt" + def get_failure_log_paths(self, state: vf.State) -> dict[str, str]: + paths = super().get_failure_log_paths(state) + paths["agent_log"] = self.remote_logs_path + return paths + async def post_sandbox_setup(self, state: vf.State) -> None: """Upload prompt and optional system prompt after sandbox creation.""" sandbox_id = state.get("sandbox_id") @@ -341,7 +346,7 @@ async def post_rollout(self, state: vf.State) -> None: num_turns = len(state.get("trajectory", [])) agent_error = state.get("agent_exit_code", 0) != 0 if (agent_error or num_turns == 0) and agent_logs: - logger.warning( + logger.debug( f"Agent logs (example_id={state.get('example_id')}, " f"exit_code={state.get('agent_exit_code')}, turns={num_turns}):\n{agent_logs}" ) diff --git a/verifiers/envs/experimental/sandbox_mixin.py b/verifiers/envs/experimental/sandbox_mixin.py index 0e3d4ec2f..3135be95b 100644 --- a/verifiers/envs/experimental/sandbox_mixin.py +++ b/verifiers/envs/experimental/sandbox_mixin.py @@ -49,6 +49,40 @@ class SandboxNotReadyError(vf.SandboxError): ... class SandboxSetupError(vf.SandboxError): ... +def format_rollout_log_event( + event: str, + state: dict[str, Any] | None = None, + **fields: Any, +) -> str: + """Format rollout lifecycle logs with stable context fields.""" + state = state or {} + info = state.get("info") or {} + instance_id = info.get("instance_id") if isinstance(info, dict) else None + parts: list[str] = [f"event={event}"] + context = { + "rollout_id": state.get("rollout_id"), + "example_id": state.get("example_id"), + "instance_id": instance_id, + "sandbox_id": state.get("sandbox_id"), + } + for key, value in {**context, **fields}.items(): + if value is None: + continue + if isinstance(value, float): + value = f"{value:.3f}" + parts.append(f"{key}={value}") + return " | ".join(parts) + + +def log_rollout_event( + logger: logging.Logger, + event: str, + state: dict[str, Any] | None = None, + **fields: Any, +) -> None: + logger.info(format_rollout_log_event(event, state, **fields)) + + @dataclass(frozen=True) class SandboxTimeouts: """Per-operation HTTP timeouts (seconds) for sandbox client calls. @@ -228,6 +262,8 @@ async def create_sandbox(self, state, request: CreateSandboxRequest) -> str: SandboxNotReadyError: If sandbox fails to become ready. SandboxSetupError: If post_sandbox_setup hook fails. """ + create_start = time.perf_counter() + log_rollout_event(self.logger, "sandbox_create_started", state) if self.sandbox_creation_rate_limiter is not None: await self.sandbox_creation_rate_limiter.acquire() @@ -270,6 +306,12 @@ def cleanup_created_sandbox(task: asyncio.Task): raise SandboxNotReadyError( f"Sandbox {sandbox.id} failed to become ready: {e}" ) from e + log_rollout_event( + self.logger, + "sandbox_created", + state, + elapsed_s=time.perf_counter() - create_start, + ) try: self.logger.debug(f"Running post-sandbox setup in sandbox {sandbox.id}") diff --git a/verifiers/types.py b/verifiers/types.py index f0dc4ac55..8a5d3a002 100644 --- a/verifiers/types.py +++ b/verifiers/types.py @@ -328,6 +328,15 @@ class ErrorInfo(TypedDict): error_chain_str: str +class FailureInfo(TypedDict): + reason: str + origin: str + error_type: str | None + root_error_type: str | None + message: str + logs: dict[str, str] + + class RolloutOutput(dict): """Serialized output from a rollout (mirrors RolloutInput). @@ -337,8 +346,8 @@ class RolloutOutput(dict): Required fields: example_id, prompt, completion, reward, timing, is_completed, is_truncated, metrics - Optional fields: answer, info, error, stop_condition, trajectory, tool_defs, - token_usage + Optional fields: answer, info, error, failure, stop_condition, trajectory, + tool_defs, token_usage Additional fields: arbitrary serializable state_columns """ @@ -355,6 +364,7 @@ class RolloutOutput(dict): answer: str info: Info error: ErrorInfo | None + failure: FailureInfo | None stop_condition: str | None trajectory: list["TrajectoryStep"] tool_defs: list[Tool] diff --git a/verifiers/utils/failure_utils.py b/verifiers/utils/failure_utils.py new file mode 100644 index 000000000..31c0f7b9d --- /dev/null +++ b/verifiers/utils/failure_utils.py @@ -0,0 +1,236 @@ +from __future__ import annotations + +from typing import Any, TypedDict + +from verifiers.errors import ModelError, SandboxError, TunnelError + + +class RolloutFailure(TypedDict): + reason: str + origin: str + error_type: str | None + root_error_type: str | None + message: str + logs: dict[str, str] + + +FAILURE_REASON_AGENT_NONZERO_EXIT = "agent_nonzero_exit" +FAILURE_REASON_AGENT_POLL_FAILED = "agent_poll_failed" +FAILURE_REASON_AGENT_EMPTY_TRAJECTORY = "agent_empty_trajectory" +FAILURE_REASON_ROLLOUT_TIMEOUT = "rollout_timeout" +FAILURE_REASON_SANDBOX_OOM = "sandbox_oom" +FAILURE_REASON_SANDBOX_TIMEOUT = "sandbox_timeout" +FAILURE_REASON_SANDBOX_COMMAND_FAILED = "sandbox_command_failed" +FAILURE_REASON_SANDBOX_SETUP_FAILED = "sandbox_setup_failed" +FAILURE_REASON_TUNNEL_ERROR = "tunnel_error" +FAILURE_REASON_STREAM_INTERRUPTED = "stream_interrupted" +FAILURE_REASON_MODEL_ERROR = "model_error" +FAILURE_REASON_ENV_SERVER_ERROR = "env_server_error" +FAILURE_REASON_UNKNOWN = "unknown" + +FAILURE_ORIGIN_AGENT = "agent" +FAILURE_ORIGIN_SANDBOX = "sandbox" +FAILURE_ORIGIN_TUNNEL = "tunnel" +FAILURE_ORIGIN_MODEL = "model" +FAILURE_ORIGIN_ENV_SERVER = "env_server" +FAILURE_ORIGIN_ROLLOUT = "rollout" +FAILURE_ORIGIN_UNKNOWN = "unknown" + +DEFAULT_FAILURE_MESSAGE_CHARS = 2000 +DEFAULT_FAILURE_LOG_CHARS = 12000 + + +def tail_text(value: Any, max_chars: int = DEFAULT_FAILURE_LOG_CHARS) -> str: + """Return a bounded string tail for logs and diagnostic messages.""" + text = "" if value is None else str(value) + if len(text) <= max_chars: + return text + return f"...\n{text[-max_chars:]}" + + +def _error_chain(error: BaseException | None) -> list[BaseException]: + chain: list[BaseException] = [] + seen: set[int] = set() + while error is not None and id(error) not in seen: + seen.add(id(error)) + chain.append(error) + error = error.__cause__ + return chain + + +def _chain_type_names(error: BaseException | None) -> list[str]: + return [type(item).__name__ for item in _error_chain(error)] + + +def _has_chain_type(error: BaseException | None, type_name: str) -> bool: + return type_name in _chain_type_names(error) + + +def classify_rollout_failure( + state: dict[str, Any], + error: BaseException | None = None, + *, + detect_empty_trajectory: bool = True, +) -> tuple[str, str]: + """Classify a rollout failure into a dashboard-oriented reason/origin pair.""" + error = state.get("error") if error is None else error + + if state.get("timed_out"): + return FAILURE_REASON_ROLLOUT_TIMEOUT, FAILURE_ORIGIN_ROLLOUT + if state.get("sandbox_oom"): + return FAILURE_REASON_SANDBOX_OOM, FAILURE_ORIGIN_SANDBOX + if state.get("sandbox_timeout"): + return FAILURE_REASON_SANDBOX_TIMEOUT, FAILURE_ORIGIN_SANDBOX + + if error is not None: + if _has_chain_type(error, "StreamInterrupted"): + return FAILURE_REASON_STREAM_INTERRUPTED, FAILURE_ORIGIN_TUNNEL + if isinstance(error, TunnelError) or _has_chain_type(error, "TunnelError"): + return FAILURE_REASON_TUNNEL_ERROR, FAILURE_ORIGIN_TUNNEL + if isinstance(error, ModelError) or _has_chain_type(error, "ModelError"): + return FAILURE_REASON_MODEL_ERROR, FAILURE_ORIGIN_MODEL + if state.get("agent_poll_failed") or _has_chain_type(error, "AgentPollError"): + return FAILURE_REASON_AGENT_POLL_FAILED, FAILURE_ORIGIN_AGENT + if _has_chain_type(error, "AgentError"): + agent_exit_code = state.get("agent_exit_code") + if agent_exit_code is not None and agent_exit_code != 0: + return FAILURE_REASON_AGENT_NONZERO_EXIT, FAILURE_ORIGIN_AGENT + return FAILURE_REASON_AGENT_POLL_FAILED, FAILURE_ORIGIN_AGENT + if _has_chain_type(error, "SandboxSetupError"): + return FAILURE_REASON_SANDBOX_SETUP_FAILED, FAILURE_ORIGIN_SANDBOX + if isinstance(error, SandboxError) or _has_chain_type(error, "SandboxError"): + return FAILURE_REASON_SANDBOX_COMMAND_FAILED, FAILURE_ORIGIN_SANDBOX + + agent_exit_code = state.get("agent_exit_code") + if agent_exit_code is not None and agent_exit_code != 0: + return FAILURE_REASON_AGENT_NONZERO_EXIT, FAILURE_ORIGIN_AGENT + + trajectory = state.get("trajectory") + if ( + detect_empty_trajectory + and isinstance(trajectory, list) + and len(trajectory) == 0 + ): + return FAILURE_REASON_AGENT_EMPTY_TRAJECTORY, FAILURE_ORIGIN_AGENT + + return FAILURE_REASON_UNKNOWN, FAILURE_ORIGIN_UNKNOWN + + +def make_rollout_failure( + state: dict[str, Any], + *, + reason: str | None = None, + origin: str | None = None, + error: BaseException | None = None, + message: str | None = None, + logs: dict[str, str] | None = None, + detect_empty_trajectory: bool = True, +) -> RolloutFailure: + """Build a JSON-serializable rollout failure payload.""" + error = state.get("error") if error is None else error + classified_reason, classified_origin = classify_rollout_failure( + state, error, detect_empty_trajectory=detect_empty_trajectory + ) + reason = reason or classified_reason + origin = origin or classified_origin + chain = _error_chain(error) + error_type = type(error).__name__ if error is not None else None + root_error_type = type(chain[-1]).__name__ if chain else None + if message is None: + message = str(error) if error is not None else reason + return { + "reason": reason, + "origin": origin, + "error_type": error_type, + "root_error_type": root_error_type, + "message": tail_text(message, DEFAULT_FAILURE_MESSAGE_CHARS), + "logs": {str(k): tail_text(v) for k, v in (logs or {}).items()}, + } + + +def normalize_rollout_failure(value: Any) -> RolloutFailure | None: + """Normalize an existing failure-like mapping into the public payload shape.""" + if value is None: + return None + if not isinstance(value, dict): + return None + logs = value.get("logs") + return { + "reason": str(value.get("reason") or FAILURE_REASON_UNKNOWN), + "origin": str(value.get("origin") or FAILURE_ORIGIN_UNKNOWN), + "error_type": value.get("error_type"), + "root_error_type": value.get("root_error_type"), + "message": str(value.get("message") or ""), + "logs": { + str(k): tail_text(v) + for k, v in (logs if isinstance(logs, dict) else {}).items() + }, + } + + +def ensure_rollout_failure( + state: dict[str, Any], + *, + reason: str | None = None, + origin: str | None = None, + error: BaseException | None = None, + message: str | None = None, + logs: dict[str, str] | None = None, + detect_empty_trajectory: bool = True, + overwrite: bool = False, +) -> RolloutFailure | None: + """Set and return state['failure'] when the state has a classified failure.""" + existing = normalize_rollout_failure(state.get("failure")) + if existing is not None and not overwrite: + if logs: + existing["logs"].update({str(k): tail_text(v) for k, v in logs.items()}) + state["failure"] = existing + return existing + + error = state.get("error") if error is None else error + should_create = ( + error is not None + or state.get("timed_out") + or state.get("sandbox_oom") + or state.get("sandbox_timeout") + ) + trajectory = state.get("trajectory") + should_create = should_create or ( + detect_empty_trajectory + and isinstance(trajectory, list) + and len(trajectory) == 0 + ) + should_create = should_create or reason is not None or origin is not None + if not should_create: + return None + + failure = make_rollout_failure( + state, + reason=reason, + origin=origin, + error=error, + message=message, + logs=logs, + detect_empty_trajectory=detect_empty_trajectory, + ) + state["failure"] = failure + return failure + + +def add_failure_logs( + state: dict[str, Any], + logs: dict[str, str], + *, + detect_empty_trajectory: bool = True, +) -> RolloutFailure | None: + """Merge bounded diagnostic logs into state['failure'] if a failure exists.""" + if not logs: + return normalize_rollout_failure(state.get("failure")) + failure = ensure_rollout_failure( + state, logs=logs, detect_empty_trajectory=detect_empty_trajectory + ) + if failure is None: + return None + failure["logs"].update({str(k): tail_text(v) for k, v in logs.items()}) + state["failure"] = failure + return failure diff --git a/verifiers/utils/save_utils.py b/verifiers/utils/save_utils.py index d9aa889e7..16d390103 100644 --- a/verifiers/utils/save_utils.py +++ b/verifiers/utils/save_utils.py @@ -22,6 +22,7 @@ Tool, ) from verifiers.utils.error_utils import ErrorChain +from verifiers.utils.failure_utils import ensure_rollout_failure from verifiers.utils.message_utils import ( sanitize_tool_calls, serialize_messages_for_output, @@ -251,6 +252,9 @@ def state_to_output( ) output["error_chain"] = output["error"]["error_chain_repr"] output["long_error_chain"] = output["error"]["error_chain_str"] + failure = ensure_rollout_failure(state, detect_empty_trajectory=False) + if failure is not None: + output["failure"] = failure # only include optional fields if non-empty if "answer" in output and not output["answer"]: output.pop("answer")