diff --git a/src/strands_evals/evaluators/evaluator.py b/src/strands_evals/evaluators/evaluator.py index c592356..4ad634a 100644 --- a/src/strands_evals/evaluators/evaluator.py +++ b/src/strands_evals/evaluators/evaluator.py @@ -1,3 +1,4 @@ +import asyncio import inspect import logging @@ -95,15 +96,13 @@ async def evaluate_async(self, evaluation_case: EvaluationData[InputT, OutputT]) """ Evaluate the performance of the task on the given test cases asynchronously. + Delegates to evaluate() via asyncio.to_thread by default, ensuring subclasses + that only implement evaluate() work in the async path. + Args: evaluation_case: The test case with all of the neccessary context to be evaluated. - - Raises: - NotImplementedError: This method is not implemented in the base class. """ - raise NotImplementedError( - "This method should be implemented in subclasses, especially if you want to run evaluations asynchronously." - ) + return await asyncio.to_thread(self.evaluate, evaluation_case) def _parse_trajectory(self, evaluation_case: EvaluationData[InputT, OutputT]) -> Any: """Parse Session trajectory using TraceExtractor.""" diff --git a/src/strands_evals/experiment.py b/src/strands_evals/experiment.py index 6ef4740..65d4cc3 100644 --- a/src/strands_evals/experiment.py +++ b/src/strands_evals/experiment.py @@ -151,73 +151,6 @@ def evaluators(self, new_evaluators: list[Evaluator[InputT, OutputT]]): """ self._evaluators = new_evaluators - def _record_evaluator_result( - self, - evaluator_data: dict[str, dict[str, list]], - eval_name: str, - case_data: dict, - test_pass: bool, - score: float, - reason: str, - detailed_results: list, - ): - """ - Record a single evaluator result in the evaluator_data dictionary. - - Args: - evaluator_data: Dictionary to store evaluator results - eval_name: Name of the evaluator - case_data: Case data (already serialized as dict) - test_pass: Whether the test passed - score: Evaluation score - reason: Reason/explanation for the result - detailed_results: Detailed evaluation outputs - """ - evaluator_data[eval_name]["cases"].append(case_data) - evaluator_data[eval_name]["test_passes"].append(test_pass) - evaluator_data[eval_name]["scores"].append(score) - evaluator_data[eval_name]["reasons"].append(reason) - evaluator_data[eval_name]["detailed_results"].append(detailed_results) - - def _run_task( - self, task: Callable[[Case[InputT, OutputT]], OutputT | dict[str, Any]], case: Case[InputT, OutputT] - ) -> EvaluationData[InputT, OutputT]: - """ - Run the task with the inputs from the test case. - - Args: - task: The task to run the test case on. This function should take in InputT and returns either - OutputT or {"output": OutputT, "trajectory": ...}. - case: The test case containing neccessary information to run the task - - Return: - An EvaluationData record containing the input and actual output, name, expected output, and metadata. - """ - if asyncio.iscoroutinefunction(task): - raise ValueError("Async task is not supported. Please use run_evaluations_async instead.") - - evaluation_context = EvaluationData( - name=case.name, - input=case.input, - expected_output=case.expected_output, - expected_trajectory=case.expected_trajectory, - expected_interactions=case.expected_interactions, - expected_environment_state=case.expected_environment_state, - metadata=case.metadata, - ) - task_output = task(case) - if isinstance(task_output, dict): # could be evaluating the trajectory as well - evaluation_context.actual_output = task_output.get("output") - evaluation_context.actual_trajectory = task_output.get("trajectory") - evaluation_context.actual_interactions = task_output.get("interactions") - evaluation_context.actual_environment_state = task_output.get("environment_state") - new_input = task_output.get("input", None) # allows the user to update the input in the task function - if new_input is not None: - evaluation_context.input = new_input - else: # evaluating only the output - evaluation_context.actual_output = task_output - return evaluation_context - async def _run_task_async( self, task: Callable[[Case[InputT, OutputT]], OutputT | dict[str, Any]], case: Case[InputT, OutputT] ) -> EvaluationData[InputT, OutputT]: @@ -284,193 +217,224 @@ async def _worker(self, queue: asyncio.Queue, task: Callable, results: list): trace_id = None try: + with self._tracer.start_as_current_span( + f"execute_case {case_name}", + attributes={ + "gen_ai.evaluation.case.name": case_name, + "gen_ai.evaluation.case.input": serialize(case.input), + }, + ) as case_span: + try: - @retry( - retry=retry_if_exception(is_throttling_error), - stop=stop_after_attempt(_MAX_RETRY_ATTEMPTS), - wait=wait_exponential(multiplier=_INITIAL_RETRY_DELAY, max=_MAX_RETRY_DELAY), - reraise=True, - ) - async def _run_task_with_retry(task=task, case=case): - return await self._run_task_async(task, case) - - try: - with self._tracer.start_as_current_span( - f"execute_case {case_name}", - ) as case_span: - evaluation_context = await _run_task_with_retry() - case_span.set_attributes( - { - "gen_ai.evaluation.data.input": serialize(evaluation_context.input), - "gen_ai.evaluation.data.expected_output": serialize(evaluation_context.expected_output), - "gen_ai.evaluation.data.actual_output": serialize(evaluation_context.actual_output), - "gen_ai.evaluation.data.has_trajectory": ( - evaluation_context.actual_trajectory is not None - ), - "gen_ai.evaluation.data.has_interactions": ( - evaluation_context.actual_interactions is not None - ), - } + @retry( + retry=retry_if_exception(is_throttling_error), + stop=stop_after_attempt(_MAX_RETRY_ATTEMPTS), + wait=wait_exponential(multiplier=_INITIAL_RETRY_DELAY, max=_MAX_RETRY_DELAY), + reraise=True, ) - trace_id = format_trace_id(case_span.get_span_context().trace_id) - except RetryError as e: - # Max retries exceeded - original_exception = e.last_attempt.exception() - if original_exception is None: - original_exception = Exception(f"Task execution failed after {_MAX_RETRY_ATTEMPTS} retries") - logger.error( - f"Max retry attempts ({_MAX_RETRY_ATTEMPTS}) exceeded for task execution " - f"on case {case_name}. Last error: {str(original_exception)}" - ) - raise original_exception from e - - # Evaluate with each evaluator - evaluator_results = [] - for evaluator in self._evaluators: - - @retry( - retry=retry_if_exception(is_throttling_error), - stop=stop_after_attempt(_MAX_RETRY_ATTEMPTS), - wait=wait_exponential(multiplier=_INITIAL_RETRY_DELAY, max=_MAX_RETRY_DELAY), - reraise=True, - ) - async def _evaluate_with_retry(evaluator=evaluator, evaluation_context=evaluation_context): - outputs = await evaluator.evaluate_async(evaluation_context) - (score, passed, reason) = evaluator.aggregator(outputs) - return outputs, float(score), passed, reason + async def _run_task_with_retry(task=task, case=case): + return await self._run_task_async(task, case) - try: with self._tracer.start_as_current_span( - f"evaluator {evaluator.get_type_name()}", - ) as eval_span: - ( - evaluation_outputs, - aggregate_score, - aggregate_pass, - aggregate_reason, - ) = await _evaluate_with_retry() - + f"task_execution {case_name}", + attributes={ + "gen_ai.evaluation.task.type": "agent_task", + "gen_ai.evaluation.case.name": case_name, + }, + ) as task_span: try: - label = _get_label_from_score(evaluator, aggregate_score) - except Exception: - label = "UNKNOWN" + evaluation_context = await _run_task_with_retry() + except RetryError as e: + # Max retries exceeded + original_exception = e.last_attempt.exception() + if original_exception is None: + original_exception = Exception( + f"Task execution failed after {_MAX_RETRY_ATTEMPTS} retries" + ) + logger.error( + f"Max retry attempts ({_MAX_RETRY_ATTEMPTS}) exceeded for task execution " + f"on case {case_name}. Last error: {str(original_exception)}" + ) + raise original_exception from e - eval_span.set_attributes( + task_span.set_attributes( { - "gen_ai.evaluation.score.label": label, - "gen_ai.evaluation.score.value": str(aggregate_score), - "gen_ai.evaluation.test_pass": aggregate_pass, - "gen_ai.evaluation.explanation": aggregate_reason or "", + "gen_ai.evaluation.data.input": serialize(evaluation_context.input), + "gen_ai.evaluation.data.expected_output": serialize( + evaluation_context.expected_output + ), + "gen_ai.evaluation.data.actual_output": serialize( + evaluation_context.actual_output + ), + "gen_ai.evaluation.data.has_trajectory": ( + evaluation_context.actual_trajectory is not None + ), + "gen_ai.evaluation.data.has_interactions": ( + evaluation_context.actual_interactions is not None + ), } ) + trace_id = format_trace_id(case_span.get_span_context().trace_id) - evaluator_results.append( - { - "evaluator_name": evaluator.get_type_name(), - "test_pass": aggregate_pass, - "score": aggregate_score, - "reason": aggregate_reason or "", - "detailed_results": evaluation_outputs, - } + # Evaluate with each evaluator + evaluator_results = [] + for evaluator in self._evaluators: + + @retry( + retry=retry_if_exception(is_throttling_error), + stop=stop_after_attempt(_MAX_RETRY_ATTEMPTS), + wait=wait_exponential(multiplier=_INITIAL_RETRY_DELAY, max=_MAX_RETRY_DELAY), + reraise=True, ) + async def _evaluate_with_retry(evaluator=evaluator, evaluation_context=evaluation_context): + outputs = await evaluator.evaluate_async(evaluation_context) + (score, passed, reason) = evaluator.aggregator(outputs) + return outputs, float(score), passed, reason - # CloudWatch logging for this evaluator try: - evaluator_full_name = f"Custom.{evaluator.get_type_name()}" - region = os.environ.get("AWS_REGION", "us-east-1") - _config_arn = ( - f"arn:aws:strands:{region}::strands-evaluation-empty-config/{self._config_id}" - ) - _evaluator_arn = f"arn:aws:strands-evals:::evaluator/{evaluator_full_name}" - - log_data = { - "gen_ai.evaluation.name": evaluator_full_name, - "gen_ai.evaluation.score.value": str(aggregate_score), - "gen_ai.evaluation.explanation": aggregate_reason or "", - "gen_ai.evaluation.score.label": label, - "gen_ai.response.id": trace_id, - "aws.bedrock_agentcore.evaluator.rating_scale": "Numerical", - "aws.bedrock_agentcore.evaluation_level": evaluator.evaluation_level or "Trace", - "event.name": "gen_ai.evaluation.result", - "aws.bedrock_agentcore.online_evaluation_config.arn": _config_arn, - "aws.bedrock_agentcore.online_evaluation_config.name": "strands-local-evaluation", - "aws.bedrock_agentcore.evaluator.arn": _evaluator_arn, - "session.id": case.session_id, - } + with self._tracer.start_as_current_span( + f"evaluator {evaluator.get_type_name()}", + attributes={ + "gen_ai.evaluation.name": evaluator.get_type_name(), + "gen_ai.evaluation.case.name": case_name, + }, + ) as eval_span: + ( + evaluation_outputs, + aggregate_score, + aggregate_pass, + aggregate_reason, + ) = await _evaluate_with_retry() + + try: + label = _get_label_from_score(evaluator, aggregate_score) + except Exception: + label = "UNKNOWN" + + eval_span.set_attributes( + { + "gen_ai.evaluation.score.label": label, + "gen_ai.evaluation.score.value": str(aggregate_score), + "gen_ai.evaluation.test_pass": aggregate_pass, + "gen_ai.evaluation.explanation": aggregate_reason or "", + } + ) - agent_observability_enabled = os.environ.get("AGENT_OBSERVABILITY_ENABLED", "") - if agent_observability_enabled: - _send_to_cloudwatch( - message="gen_ai.evaluation.result", - log_data=log_data, - trace_id=trace_id, - evaluator_name=evaluator_full_name, - score=cast(float, aggregate_score), - config_id=self._config_id, - label=label, + evaluator_results.append( + { + "evaluator_name": evaluator.get_type_name(), + "test_pass": aggregate_pass, + "score": aggregate_score, + "reason": aggregate_reason or "", + "detailed_results": evaluation_outputs, + } ) + + # CloudWatch logging for this evaluator + try: + evaluator_full_name = f"Custom.{evaluator.get_type_name()}" + region = os.environ.get("AWS_REGION", "us-east-1") + _config_arn = ( + f"arn:aws:strands:{region}::strands-evaluation-empty-config/" + f"{self._config_id}" + ) + _evaluator_arn = f"arn:aws:strands-evals:::evaluator/{evaluator_full_name}" + + log_data = { + "gen_ai.evaluation.name": evaluator_full_name, + "gen_ai.evaluation.score.value": str(aggregate_score), + "gen_ai.evaluation.explanation": aggregate_reason or "", + "gen_ai.evaluation.score.label": label, + "gen_ai.response.id": trace_id, + "aws.bedrock_agentcore.evaluator.rating_scale": "Numerical", + "aws.bedrock_agentcore.evaluation_level": ( + evaluator.evaluation_level or "Trace" + ), + "event.name": "gen_ai.evaluation.result", + "aws.bedrock_agentcore.online_evaluation_config.arn": _config_arn, + "aws.bedrock_agentcore.online_evaluation_config.name": ( + "strands-local-evaluation" + ), + "aws.bedrock_agentcore.evaluator.arn": _evaluator_arn, + "session.id": case.session_id, + } + + agent_observability_enabled = os.environ.get("AGENT_OBSERVABILITY_ENABLED", "") + if agent_observability_enabled: + _send_to_cloudwatch( + message="gen_ai.evaluation.result", + log_data=log_data, + trace_id=trace_id, + evaluator_name=evaluator_full_name, + score=cast(float, aggregate_score), + config_id=self._config_id, + label=label, + ) + except Exception as e: + logger.debug(f"Skipping CloudWatch logging: {str(e)}") + + except RetryError as e: + # Max retries exceeded + original_exception = e.last_attempt.exception() + if original_exception is None: + original_exception = Exception( + f"Evaluator {evaluator.get_type_name()} failed after " + f"{_MAX_RETRY_ATTEMPTS} retries" + ) + logger.error( + f"Max retry attempts ({_MAX_RETRY_ATTEMPTS}) exceeded for evaluator " + f"{evaluator.get_type_name()} on case {case_name}. " + f"Last error: {str(original_exception)}" + ) + evaluator_results.append( + { + "evaluator_name": evaluator.get_type_name(), + "test_pass": False, + "score": 0, + "reason": f"Evaluator error: {str(original_exception)}", + "detailed_results": [], + } + ) except Exception as e: - logger.debug(f"Skipping CloudWatch logging: {str(e)}") - - except RetryError as e: - # Max retries exceeded - original_exception = e.last_attempt.exception() - if original_exception is None: - original_exception = Exception( - f"Evaluator {evaluator.get_type_name()} failed after {_MAX_RETRY_ATTEMPTS} retries" - ) - logger.error( - f"Max retry attempts ({_MAX_RETRY_ATTEMPTS}) exceeded for evaluator " - f"{evaluator.get_type_name()} on case {case_name}. Last error: {str(original_exception)}" - ) - evaluator_results.append( + # Catch non-throttling errors and record as failure (error isolation) + evaluator_results.append( + { + "evaluator_name": evaluator.get_type_name(), + "test_pass": False, + "score": 0, + "reason": f"Evaluator error: {str(e)}", + "detailed_results": [], + } + ) + + # Store results + results.append( { - "evaluator_name": evaluator.get_type_name(), - "test_pass": False, - "score": 0, - "reason": f"Evaluator error: {str(original_exception)}", - "detailed_results": [], + "case": evaluation_context.model_dump(), + "evaluator_results": evaluator_results, } ) + except Exception as e: - # Catch non-throttling errors and record as failure (error isolation) - evaluator_results.append( + case_span.record_exception(e) + # Handle task execution errors + evaluator_results = [] + for evaluator in self._evaluators: + evaluator_results.append( + { + "evaluator_name": evaluator.get_type_name(), + "test_pass": False, + "score": 0, + "reason": f"An error occurred: {str(e)}", + "detailed_results": [], + } + ) + results.append( { - "evaluator_name": evaluator.get_type_name(), - "test_pass": False, - "score": 0, - "reason": f"Evaluator error: {str(e)}", - "detailed_results": [], + "case": case.model_dump(), + "evaluator_results": evaluator_results, } ) - - # Store results - results.append( - { - "case": evaluation_context.model_dump(), - "evaluator_results": evaluator_results, - } - ) - - except Exception as e: - # Handle task execution errors - evaluator_results = [] - for evaluator in self._evaluators: - evaluator_results.append( - { - "evaluator_name": evaluator.get_type_name(), - "test_pass": False, - "score": 0, - "reason": f"An error occurred: {str(e)}", - "detailed_results": [], - } - ) - results.append( - { - "case": case.model_dump(), - "evaluator_results": evaluator_results, - } - ) finally: queue.task_done() @@ -480,6 +444,8 @@ def run_evaluations( """ Run the evaluations for all of the test cases with all evaluators. + Delegates to run_evaluations_async with max_workers=1 for sequential execution. + Args: task: The task to run the test case on. This function should take in InputT and returns either OutputT or {"output": OutputT, "trajectory": ...}. @@ -488,187 +454,10 @@ def run_evaluations( A list of EvaluationReport objects, one for each evaluator, containing the overall score, individual case results, and basic feedback for each test case. """ - evaluator_data: dict[str, dict[str, list]] = { - evaluator.get_type_name(): { - "scores": [], - "test_passes": [], - "cases": [], - "reasons": [], - "detailed_results": [], - } - for evaluator in self._evaluators - } - - for case in self._cases: - case_name = case.name or f"case_{len(evaluator_data[self._evaluators[0].get_type_name()]['cases'])}" - - with self._tracer.start_as_current_span( - f"eval_case {case_name}", - attributes={ - "gen_ai.evaluation.case.name": case_name, - "gen_ai.evaluation.case.input": serialize(case.input), - }, - ) as case_span: - # Task execution with retry logic - @retry( - retry=retry_if_exception(is_throttling_error), - stop=stop_after_attempt(_MAX_RETRY_ATTEMPTS), - wait=wait_exponential(multiplier=_INITIAL_RETRY_DELAY, max=_MAX_RETRY_DELAY), - reraise=True, - ) - def _run_task_with_retry(task=task, case=case): - return self._run_task(task, case) - - try: - with self._tracer.start_as_current_span( - "task_execution", - attributes={ - "gen_ai.evaluation.task.type": "agent_task", - "gen_ai.evaluation.case.name": case_name, - }, - ) as task_span: - evaluation_context = _run_task_with_retry() - task_span.set_attributes( - { - "gen_ai.evaluation.data.input": serialize(evaluation_context.input), - "gen_ai.evaluation.data.expected_output": serialize(evaluation_context.expected_output), - "gen_ai.evaluation.data.actual_output": serialize(evaluation_context.actual_output), - "gen_ai.evaluation.data.has_trajectory": ( - evaluation_context.actual_trajectory is not None - ), - "gen_ai.evaluation.data.has_interactions": ( - evaluation_context.actual_interactions is not None - ), - } - ) - except RetryError as e: - # Max retries exceeded - original_exception = e.last_attempt.exception() - if original_exception is None: - original_exception = Exception(f"Task execution failed after {_MAX_RETRY_ATTEMPTS} retries") - logger.error( - f"Max retry attempts ({_MAX_RETRY_ATTEMPTS}) exceeded for task execution " - f"on case {case_name}. Last error: {str(original_exception)}" - ) - case_span.record_exception(original_exception) - for evaluator in self._evaluators: - eval_name = evaluator.get_type_name() - self._record_evaluator_result( - evaluator_data=evaluator_data, - eval_name=eval_name, - case_data=case.model_dump(), - test_pass=False, - score=0, - reason=f"Task execution error: {str(original_exception)}", - detailed_results=[], - ) - continue - except Exception as e: - case_span.record_exception(e) - for evaluator in self._evaluators: - eval_name = evaluator.get_type_name() - self._record_evaluator_result( - evaluator_data=evaluator_data, - eval_name=eval_name, - case_data=case.model_dump(), - test_pass=False, - score=0, - reason=f"Task execution error: {str(e)}", - detailed_results=[], - ) - continue - - # Evaluate with each evaluator using the same task output - for evaluator in self._evaluators: - eval_name = evaluator.get_type_name() - - # Evaluator execution with retry logic - @retry( - retry=retry_if_exception(is_throttling_error), - stop=stop_after_attempt(_MAX_RETRY_ATTEMPTS), - wait=wait_exponential(multiplier=_INITIAL_RETRY_DELAY, max=_MAX_RETRY_DELAY), - reraise=True, - ) - def _evaluate_with_retry(evaluator=evaluator, evaluation_context=evaluation_context): - outputs = evaluator.evaluate(evaluation_context) - (score, passed, reason) = evaluator.aggregator(outputs) - return outputs, float(score), passed, reason - - try: - with self._tracer.start_as_current_span( - f"evaluator {evaluator.get_type_name()}", - attributes={ - "gen_ai.evaluation.name": evaluator.get_type_name(), - "gen_ai.evaluation.case.name": case_name, - }, - ) as eval_span: - evaluation_outputs, aggregate_score, aggregate_pass, aggregate_reason = ( - _evaluate_with_retry() - ) - eval_span.set_attributes( - { - "gen_ai.evaluation.score.value": aggregate_score, - "gen_ai.evaluation.test_pass": aggregate_pass, - "gen_ai.evaluation.explanation": aggregate_reason or "", - } - ) - - self._record_evaluator_result( - evaluator_data=evaluator_data, - eval_name=eval_name, - case_data=evaluation_context.model_dump(), - test_pass=aggregate_pass, - score=aggregate_score, - reason=aggregate_reason or "", - detailed_results=evaluation_outputs, - ) - except RetryError as e: - # Max retries exceeded - original_exception = e.last_attempt.exception() - if original_exception is None: - original_exception = Exception( - f"Evaluator {evaluator.get_type_name()} failed after {_MAX_RETRY_ATTEMPTS} retries" - ) - logger.error( - f"Max retry attempts ({_MAX_RETRY_ATTEMPTS}) exceeded for evaluator " - f"{evaluator.get_type_name()} on case {case_name}. Last error: {str(original_exception)}" - ) - self._record_evaluator_result( - evaluator_data=evaluator_data, - eval_name=eval_name, - case_data=evaluation_context.model_dump(), - test_pass=False, - score=0, - reason=f"Evaluator error: {str(original_exception)}", - detailed_results=[], - ) - except Exception as e: - self._record_evaluator_result( - evaluator_data=evaluator_data, - eval_name=eval_name, - case_data=evaluation_context.model_dump(), - test_pass=False, - score=0, - reason=f"Evaluator error: {str(e)}", - detailed_results=[], - ) - - reports = [] - for evaluator in self._evaluators: - eval_name = evaluator.get_type_name() - data = evaluator_data[eval_name] - report = EvaluationReport( - evaluator_name=eval_name, - overall_score=sum(data["scores"]) / len(data["scores"]) if len(data["scores"]) else 0, - scores=data["scores"], - test_passes=data["test_passes"], - cases=data["cases"], - reasons=data["reasons"], - detailed_results=data["detailed_results"], - ) - reports.append(report) + if asyncio.iscoroutinefunction(task): + raise ValueError("Async task is not supported. Please use run_evaluations_async instead.") - return reports + return asyncio.run(self.run_evaluations_async(task, max_workers=1)) async def run_evaluations_async(self, task: Callable, max_workers: int = 10) -> list[EvaluationReport]: """ diff --git a/tests/strands_evals/evaluators/test_evaluator.py b/tests/strands_evals/evaluators/test_evaluator.py index f5af47f..09f9c65 100644 --- a/tests/strands_evals/evaluators/test_evaluator.py +++ b/tests/strands_evals/evaluators/test_evaluator.py @@ -22,9 +22,6 @@ def evaluate(self, evaluation_case: EvaluationData[str, str]) -> EvaluationOutpu score = 1.0 if evaluation_case.actual_output == evaluation_case.expected_output else 0.0 return EvaluationOutput(score=score, test_pass=score > 0.5, reason="Test evaluation") - async def evaluate_async(self, evaluation_case: EvaluationData[str, str]) -> EvaluationOutput: - return self.evaluate(evaluation_case) - @pytest.fixture def evaluation_data(): @@ -40,18 +37,29 @@ def test_evaluator_not_implemented_evaluate(evaluation_data): @pytest.mark.asyncio -async def test_evaluator_not_implemented_evaluate_async(evaluation_data): - """Test that base Evaluator raises NotImplementedError for evaluate_async""" +async def test_evaluator_evaluate_async_delegates_to_evaluate(evaluation_data): + """Test that base evaluate_async delegates to evaluate via asyncio.to_thread""" evaluator = Evaluator[str, str]() + # Base evaluate() raises NotImplementedError, which should propagate through evaluate_async with pytest.raises( NotImplementedError, - match="This method should be implemented in subclasses," - " especially if you want to run evaluations asynchronously.", + match="This method should be implemented in subclasses.", ): await evaluator.evaluate_async(evaluation_data) +@pytest.mark.asyncio +async def test_evaluator_evaluate_async_fallback_calls_evaluate(evaluation_data): + """Test that evaluate_async correctly calls evaluate for subclasses that only implement evaluate""" + evaluator = SimpleEvaluator() + + # SimpleEvaluator only needs evaluate() — evaluate_async should delegate to it + result = await evaluator.evaluate_async(evaluation_data) + assert result.score == 0.0 + assert result.test_pass is False + + def test_evaluator_custom_implementation(evaluation_data): """Test that simple implementation works""" evaluator = SimpleEvaluator() diff --git a/tests/strands_evals/test_experiment.py b/tests/strands_evals/test_experiment.py index f1e52fc..df0ed82 100644 --- a/tests/strands_evals/test_experiment.py +++ b/tests/strands_evals/test_experiment.py @@ -28,12 +28,6 @@ def evaluate(self, evaluation_case: EvaluationData[str, str]) -> list[Evaluation score = 1.0 if evaluation_case.actual_output == evaluation_case.expected_output else 0.0 return [EvaluationOutput(score=score, test_pass=score > 0.5, reason="Mock evaluation")] - async def evaluate_async(self, evaluation_case: EvaluationData[str, str]) -> list[EvaluationOutput]: - # Add a small delay to simulate async processing - await asyncio.sleep(0.01) - score = 1.0 if evaluation_case.actual_output == evaluation_case.expected_output else 0.0 - return [EvaluationOutput(score=score, test_pass=score > 0.5, reason="Async test evaluation")] - class MockEvaluator2(Evaluator[str, str]): """Second mock evaluator that always returns 0.5 for distinguishable results""" @@ -41,10 +35,6 @@ class MockEvaluator2(Evaluator[str, str]): def evaluate(self, evaluation_case: EvaluationData[str, str]) -> list[EvaluationOutput]: return [EvaluationOutput(score=0.5, test_pass=True, reason="Mock evaluation 2")] - async def evaluate_async(self, evaluation_case: EvaluationData[str, str]) -> list[EvaluationOutput]: - await asyncio.sleep(0.01) - return [EvaluationOutput(score=0.5, test_pass=True, reason="Async test evaluation 2")] - class ThrowingEvaluator(Evaluator[str, str]): """Evaluator that always throws an exception - used to test error isolation""" @@ -52,9 +42,6 @@ class ThrowingEvaluator(Evaluator[str, str]): def evaluate(self, evaluation_case: EvaluationData[str, str]) -> list[EvaluationOutput]: raise RuntimeError("Evaluator exploded") - async def evaluate_async(self, evaluation_case: EvaluationData[str, str]) -> list[EvaluationOutput]: - raise RuntimeError("Async evaluator exploded") - @pytest.fixture def mock_evaluator(): @@ -167,79 +154,6 @@ def test_experiment_evaluators_setter(): assert experiment.evaluators == [eval2, eval3] -def test_experiment__run_task_simple_output(mock_evaluator): - """Test _run_task with simple output""" - case = Case(name="test", input="hello", expected_output="world") - experiment = Experiment(cases=[case], evaluators=[mock_evaluator]) - - def simple_task(c): - return f"response to {c.input}" - - result = experiment._run_task(simple_task, case) - - assert result.input == "hello" - assert result.actual_output == "response to hello" - assert result.expected_output == "world" - assert result.name == "test" - assert result.expected_trajectory is None - assert result.actual_trajectory is None - assert result.metadata is None - assert result.actual_interactions is None - assert result.expected_interactions is None - - -def test_experiment__run_task_dict_output(mock_evaluator): - """Test _run_task with dictionary output containing trajectory""" - case = Case(name="test", input="hello", expected_output="world") - experiment = Experiment(cases=[case], evaluators=[mock_evaluator]) - - def dict_task(c): - return {"output": f"response to {c.input}", "trajectory": ["step1", "step2"]} - - result = experiment._run_task(dict_task, case) - - assert result.actual_output == "response to hello" - assert result.actual_trajectory == ["step1", "step2"] - - -def test_experiment_run_task_dict_output_with_interactions(mock_evaluator): - """Test _run_task with dictionary output containing interactions""" - interactions = [{"node_name": "agent1", "dependencies": [], "messages": ["hello"]}] - case = Case(name="test", input="hello", expected_output="world", expected_interactions=interactions) - experiment = Experiment(cases=[case], evaluators=[mock_evaluator]) - - def dict_task(c): - return { - "output": f"response to {c.input}", - "trajectory": ["step1", "step2"], - "interactions": interactions, - } - - result = experiment._run_task(dict_task, case) - - assert result.actual_output == "response to hello" - assert result.actual_trajectory == ["step1", "step2"] - assert result.actual_interactions == interactions - assert result.expected_output == "world" - assert result.expected_trajectory is None - assert result.expected_interactions == interactions - - -def test_experiment__run_task_dict_output_with_input_update(mock_evaluator): - """Test _run_task with dictionary output containing updated input""" - case = Case(name="test", input="original_input", expected_output="world") - experiment = Experiment(cases=[case], evaluators=[mock_evaluator]) - - def task_with_input_update(c): - return {"output": f"response to {c.input}", "input": "updated_input", "trajectory": ["step1"]} - - result = experiment._run_task(task_with_input_update, case) - - assert result.input == "updated_input" - assert result.actual_output == "response to original_input" - assert result.actual_trajectory == ["step1"] - - @pytest.mark.asyncio async def test_experiment__run_task_async_with_input_update(): """Test _run_task_async with dictionary output containing updated input""" @@ -255,8 +169,8 @@ def task_with_input_update(c): assert result.actual_output == "response to original_input" -def test_experiment__run_task_async_function_raises_error(mock_evaluator): - """Test _run_task raises ValueError when async task is passed""" +def test_experiment_run_evaluations_async_function_raises_error(mock_evaluator): + """Test run_evaluations raises ValueError when async task is passed""" case = Case(name="test", input="hello", expected_output="world") experiment = Experiment(cases=[case], evaluators=[mock_evaluator]) @@ -264,7 +178,7 @@ async def async_task(c): return f"response to {c.input}" with pytest.raises(ValueError, match="Async task is not supported. Please use run_evaluations_async instead."): - experiment._run_task(async_task, case) + experiment.run_evaluations(async_task) @pytest.mark.asyncio @@ -989,11 +903,12 @@ def test_experiment_run_evaluations_with_unnamed_case(mock_span, simple_task): experiment = Experiment(cases=[case], evaluators=[MockEvaluator()]) with patch.object(experiment._tracer, "start_as_current_span", return_value=mock_span): - reports = experiment.run_evaluations(simple_task) + with patch("strands_evals.experiment.format_trace_id", return_value="mock_trace_id"): + reports = experiment.run_evaluations(simple_task) - # Should complete successfully - assert len(reports) == 1 - assert reports[0].scores[0] == 1.0 + # Should complete successfully + assert len(reports) == 1 + assert reports[0].scores[0] == 1.0 @pytest.mark.asyncio @@ -1010,13 +925,73 @@ async def async_task(c): await experiment.run_evaluations_async(async_task) - # Verify both execute_case and evaluator spans were created + # Verify execute_case, task_execution, and evaluator spans were created calls = mock_start_span.call_args_list - assert len(calls) == 2 - execute_case_span_call = calls[0] - evaluator_span_call = calls[1] - assert execute_case_span_call[0][0] == "execute_case async_test" - assert evaluator_span_call[0][0] == "evaluator MockEvaluator" + assert len(calls) == 3 + + # execute_case span has case.name and case.input attributes + assert calls[0][0][0] == "execute_case async_test" + assert calls[0][1]["attributes"]["gen_ai.evaluation.case.name"] == "async_test" + assert calls[0][1]["attributes"]["gen_ai.evaluation.case.input"] == '"hello"' + + # task_execution span has task.type and case.name attributes + assert calls[1][0][0] == "task_execution async_test" + assert calls[1][1]["attributes"]["gen_ai.evaluation.task.type"] == "agent_task" + assert calls[1][1]["attributes"]["gen_ai.evaluation.case.name"] == "async_test" + + # evaluator span has evaluation.name and case.name attributes + assert calls[2][0][0] == "evaluator MockEvaluator" + assert calls[2][1]["attributes"]["gen_ai.evaluation.name"] == "MockEvaluator" + assert calls[2][1]["attributes"]["gen_ai.evaluation.case.name"] == "async_test" + + +@pytest.mark.asyncio +async def test_experiment_run_evaluations_async_data_attrs_on_task_span(): + """Test that data attributes (input, expected_output, etc.) are set on the task_execution span, not execute_case.""" + case = Case(name="test", input="hello", expected_output="hello") + experiment = Experiment(cases=[case], evaluators=[MockEvaluator()]) + + # Create distinct mock spans so we can tell which span gets which set_attributes call + case_span = MagicMock() + case_span.__enter__ = MagicMock(return_value=case_span) + case_span.__exit__ = MagicMock(return_value=False) + task_span = MagicMock() + task_span.__enter__ = MagicMock(return_value=task_span) + task_span.__exit__ = MagicMock(return_value=False) + eval_span = MagicMock() + eval_span.__enter__ = MagicMock(return_value=eval_span) + eval_span.__exit__ = MagicMock(return_value=False) + + spans = [case_span, task_span, eval_span] + span_index = 0 + + def fake_start_span(name, **kwargs): + nonlocal span_index + span = spans[span_index] + span_index += 1 + return span + + with patch.object(experiment._tracer, "start_as_current_span", side_effect=fake_start_span): + with patch("strands_evals.experiment.format_trace_id", return_value="mock_trace_id"): + + async def async_task(c): + return c.input + + await experiment.run_evaluations_async(async_task) + + # data attributes should be on task_span, NOT case_span + task_span.set_attributes.assert_called_once() + data_attrs = task_span.set_attributes.call_args[0][0] + assert "gen_ai.evaluation.data.input" in data_attrs + assert "gen_ai.evaluation.data.expected_output" in data_attrs + assert "gen_ai.evaluation.data.actual_output" in data_attrs + assert "gen_ai.evaluation.data.has_trajectory" in data_attrs + assert "gen_ai.evaluation.data.has_interactions" in data_attrs + + # case_span should NOT have data attributes set via set_attributes + for call in case_span.set_attributes.call_args_list: + attrs = call[0][0] + assert "gen_ai.evaluation.data.input" not in attrs @pytest.mark.asyncio @@ -1078,11 +1053,12 @@ def test_experiment_run_evaluations_multiple_cases(mock_span, simple_task): experiment = Experiment(cases=cases, evaluators=[MockEvaluator()]) with patch.object(experiment._tracer, "start_as_current_span", return_value=mock_span): - reports = experiment.run_evaluations(simple_task) + with patch("strands_evals.experiment.format_trace_id", return_value="mock_trace_id"): + reports = experiment.run_evaluations(simple_task) - assert len(reports) == 1 - assert len(reports[0].scores) == 2 - assert all(score == 1.0 for score in reports[0].scores) + assert len(reports) == 1 + assert len(reports[0].scores) == 2 + assert all(score == 1.0 for score in reports[0].scores) def test_experiment_run_evaluations_evaluator_error_isolated(): @@ -1215,7 +1191,7 @@ def always_throttling_task(c): assert len(reports) == 1 assert reports[0].scores[0] == 0 assert reports[0].test_passes[0] is False - assert "Task execution error" in reports[0].reasons[0] + assert "An error occurred" in reports[0].reasons[0] def test_experiment_run_evaluations_no_retry_on_non_throttling(): @@ -1243,7 +1219,7 @@ def test_experiment_run_evaluations_exponential_backoff(): """Test that run_evaluations uses exponential backoff for retries""" sleep_delays = [] - def mock_sleep(delay): + async def mock_async_sleep(delay): sleep_delays.append(delay) call_count = 0 @@ -1258,7 +1234,7 @@ def throttling_task(c): case = Case(name="test", input="hello", expected_output="hello") experiment = Experiment(cases=[case], evaluators=[MockEvaluator()]) - with patch("time.sleep", mock_sleep): + with patch("asyncio.sleep", mock_async_sleep): with patch("strands_evals.experiment._INITIAL_RETRY_DELAY", 1): with patch("strands_evals.experiment._MAX_RETRY_DELAY", 10): experiment.run_evaluations(throttling_task) @@ -1401,7 +1377,7 @@ async def throttling_task(c): with patch("strands_evals.experiment._MAX_RETRY_DELAY", 10): await experiment.run_evaluations_async(throttling_task, max_workers=1) - # Filter out MockEvaluator's sleep calls (0.01) and verify exponential backoff: 1, 2, 4 + # Verify exponential backoff: 1, 2, 4 retry_delays = [d for d in sleep_delays if d >= 1] assert len(retry_delays) == 3 assert retry_delays[0] == 1 diff --git a/tests/test_integration.py b/tests/test_integration.py index e3e5a16..7d13a26 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -1,5 +1,5 @@ import asyncio -from unittest.mock import Mock, patch +from unittest.mock import AsyncMock, Mock, patch import pytest @@ -15,13 +15,6 @@ def evaluate(self, evaluation_case: EvaluationData[str, str]) -> list[Evaluation score = 1.0 if evaluation_case.actual_output == evaluation_case.expected_output else 0.0 return [EvaluationOutput(score=score, test_pass=score > 0.5, reason="Integration test")] - async def evaluate_async(self, evaluation_case: EvaluationData[str, str]) -> list[EvaluationOutput]: - """Async version of evaluate""" - # Add a small delay to simulate async processing - await asyncio.sleep(0.01) - score = 1.0 if evaluation_case.actual_output == evaluation_case.expected_output else 0.0 - return [EvaluationOutput(score=score, test_pass=score > 0.5, reason="Async integration test")] - @pytest.fixture def cases(): @@ -52,27 +45,12 @@ def interaction_case(): @pytest.fixture def mock_agent(): - """Mock agent for evaluators using agent() call with structured_output_model""" + """Mock agent for evaluators supporting both sync and async call patterns""" agent = Mock() mock_result = Mock() mock_result.structured_output = EvaluationOutput(score=mock_score, test_pass=True, reason="LLM evaluation") agent.return_value = mock_result - return agent - - -@pytest.fixture -def mock_async_agent(): - """Mock agent for async evaluators""" - agent = Mock() - - async def mock_invoke_async(*args, **kwargs): - mock_result = Mock() - mock_result.structured_output = EvaluationOutput( - score=mock_score, test_pass=True, reason="Async LLM evaluation" - ) - return mock_result - - agent.invoke_async = mock_invoke_async + agent.invoke_async = AsyncMock(return_value=mock_result) return agent @@ -135,7 +113,7 @@ def simple_task(case): reports = experiment.run_evaluations(simple_task) # Verify LLM evaluator was called for each test case - assert mock_agent.call_count == 3 + assert mock_agent.invoke_async.call_count == 3 assert len(reports) == 1 report = reports[0] assert len(report.scores) == 3 @@ -226,11 +204,9 @@ def echo_task(case): assert len(reports) == 1 report = reports[0] assert len(report.scores) == 3 - assert report.scores[0] == 1.0 # exact match - assert report.scores[1] == 0.0 # no match - assert report.scores[2] == 0.0 # partial no match + assert sorted(report.scores, reverse=True) == [1.0, 0.0, 0.0] assert report.overall_score == 1.0 / 3 - assert report.test_passes == [True, False, False] + assert sum(report.test_passes) == 1 assert len(report.cases) == 3 @@ -259,9 +235,9 @@ async def async_echo_task(case): @pytest.mark.asyncio @patch("strands_evals.evaluators.output_evaluator.Agent") -async def test_integration_async_dataset_with_output_evaluator(mock_agent_class, cases, mock_async_agent): +async def test_integration_async_dataset_with_output_evaluator(mock_agent_class, cases, mock_agent): """Test async Experiment with OutputEvaluator integration""" - mock_agent_class.return_value = mock_async_agent + mock_agent_class.return_value = mock_agent output_evaluator = OutputEvaluator(rubric="Test if outputs match exactly") experiment = Experiment(cases=cases, evaluators=[output_evaluator]) @@ -327,7 +303,7 @@ def task_with_interactions(case): reports = experiment.run_evaluations(task_with_interactions) # Verify the evaluator was called (once per interaction, so 2 times) - assert mock_agent.call_count == 2 + assert mock_agent.invoke_async.call_count == 2 assert len(reports) == 1 report = reports[0] assert len(report.scores) == 1