diff --git a/pyproject.toml b/pyproject.toml index b31a3e7..0e26d63 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,11 +14,11 @@ authors = [ ] dependencies = [ - "pydantic>=2.0.0,<3.0.0", + "pydantic>=2.4.0,<3.0.0", "rich>=14.0.0,<15.0.0", "strands-agents>=1.0.0", "strands-agents-tools>=0.1.0,<1.0.0", - "typing-extensions>=4.0", + "typing-extensions>=4.13.2,<5.0.0", "opentelemetry-api>=1.20.0", "opentelemetry-sdk>=1.20.0", "opentelemetry-instrumentation-threading>=0.51b0,<1.00b0", @@ -44,7 +44,7 @@ dev = [ "ruff>=0.13.0,<0.15.0", ] -langfuse = ["langfuse>=2.0.0,<3"] +langfuse = ["langfuse>=2.0.0,<5"] otel = ["opentelemetry-exporter-otlp-proto-http>=1.30.0,<2.0.0"] langchain = [ "langchain>=0.3.0", diff --git a/src/strands_evals/providers/langfuse_provider.py b/src/strands_evals/providers/langfuse_provider.py index 73a2c45..c1e8d74 100644 --- a/src/strands_evals/providers/langfuse_provider.py +++ b/src/strands_evals/providers/langfuse_provider.py @@ -128,15 +128,33 @@ def get_evaluation_data(self, session_id: str) -> TaskOutput: # --- Internal: fetching --- def _fetch_all_pages(self, fetch_fn: Callable[..., Any], **kwargs: Any) -> list[Any]: - """Fetch all pages from a paginated Langfuse API endpoint.""" + """Fetch all items from a paginated Langfuse API endpoint. + + Supports both page-based (v2/v3: ``meta.total_pages``) and cursor-based + (v4+: ``meta.cursor``) pagination styles. + """ all_items: list = [] - page = 1 - while True: - response = self._call_with_retry(fetch_fn, page=page, limit=_PAGE_SIZE, **kwargs) - all_items.extend(response.data) - if page >= response.meta.total_pages: - break - page += 1 + first_response = self._call_with_retry(fetch_fn, limit=_PAGE_SIZE, **kwargs) + all_items.extend(first_response.data) + meta = first_response.meta + + if hasattr(meta, "total_pages"): + # Page-based pagination (Langfuse v2/v3) + page = 1 + while page < meta.total_pages: + page += 1 + response = self._call_with_retry(fetch_fn, page=page, limit=_PAGE_SIZE, **kwargs) + all_items.extend(response.data) + elif hasattr(meta, "cursor"): + # Cursor-based pagination (Langfuse v4+) + cursor = meta.cursor + while cursor and first_response.data: + response = self._call_with_retry(fetch_fn, limit=_PAGE_SIZE, cursor=cursor, **kwargs) + if not response.data: + break + all_items.extend(response.data) + cursor = getattr(response.meta, "cursor", None) + return all_items def _call_with_retry(self, fn: Callable[..., Any], **kwargs: Any) -> Any: @@ -156,7 +174,16 @@ def _fetch_traces_for_session(self, session_id: str) -> list: return self._fetch_all_pages(self._client.api.trace.list, session_id=session_id) def _fetch_observations(self, trace_id: str) -> list[Any]: - """Fetch all observations for a trace, handling pagination.""" + """Fetch all observations for a trace, handling pagination. + + Uses ``legacy.observations_v1.get_many`` when available (Langfuse v4+) + because the v2 observations endpoint returns lightweight objects without + ``input``/``output``/``name`` fields. Falls back to ``observations.get_many`` + for older SDK versions. + """ + legacy = getattr(self._client.api, "legacy", None) + if legacy and hasattr(legacy, "observations_v1"): + return self._fetch_all_pages(legacy.observations_v1.get_many, trace_id=trace_id) return self._fetch_all_pages(self._client.api.observations.get_many, trace_id=trace_id) # --- Internal: building Session --- @@ -195,12 +222,19 @@ def _convert_observations(self, observations: list[Any], session_id: str) -> lis def _convert_observation(self, obs: Any, session_id: str) -> Any: """Route a single Langfuse observation to the appropriate span converter. - Langfuse observation fields used for routing: - obs.type: str — "GENERATION" | "SPAN" | "EVENT" | ... - obs.name: str — e.g. "execute_tool calc", "invoke_agent my_agent", "chat" + Langfuse normalizes traces from ALL frameworks into its own Observation + format. The ``obs.type`` field is universal across frameworks: + + - ``GENERATION`` — LLM call (LangChain, Strands, LlamaIndex, etc.) + - ``TOOL`` — Tool invocation (LangChain sends these) + - ``CHAIN`` — Orchestration/agent (root chain = agent invocation) + - ``SPAN`` — Strands-specific spans (fallback by ``obs.name``) Routing: obs.type == "GENERATION" → InferenceSpan + obs.type == "TOOL" → ToolExecutionSpan + obs.type == "CHAIN" and no parent → AgentInvocationSpan + obs.type == "AGENT" and no parent → AgentInvocationSpan (Langfuse v4+) obs.type == "SPAN", name starts "execute_tool" → ToolExecutionSpan obs.type == "SPAN", name starts "invoke_agent" → AgentInvocationSpan Otherwise → None (skipped) @@ -210,17 +244,21 @@ def _convert_observation(self, obs: Any, session_id: str) -> Any: if obs_type == "GENERATION": return self._convert_generation(obs, session_id) - if obs_type != "SPAN": - logger.debug("Skipping observation with type: %s", obs_type) - return None - - obs_name = obs.name or "" - if obs_name.startswith("execute_tool"): + if obs_type == "TOOL": return self._convert_tool_execution(obs, session_id) - if obs_name.startswith("invoke_agent"): + + if obs_type in ("CHAIN", "AGENT") and obs.parent_observation_id is None: return self._convert_agent_invocation(obs, session_id) - logger.debug("Skipping SPAN with unrecognized name: %s", obs_name) + # Strands-specific fallback for SPAN type + if obs_type == "SPAN": + obs_name = obs.name or "" + if obs_name.startswith("execute_tool"): + return self._convert_tool_execution(obs, session_id) + if obs_name.startswith("invoke_agent"): + return self._convert_agent_invocation(obs, session_id) + + logger.debug("Skipping observation: type=%s, name=%s", obs_type, obs.name) return None def _create_span_info(self, obs: Any, session_id: str) -> SpanInfo: @@ -314,6 +352,18 @@ def _convert_message(self, msg: dict) -> UserMessage | AssistantMessage | None: if role == "assistant": assistant_content = self._parse_assistant_content(content_data) + # LangChain format: tool_calls as a separate field + tool_calls = msg.get("tool_calls") + if isinstance(tool_calls, list): + for tc in tool_calls: + if isinstance(tc, dict) and "name" in tc: + assistant_content.append( + ToolCallContent( + name=tc["name"], + arguments=tc.get("args") or tc.get("input") or {}, + tool_call_id=tc.get("id"), + ) + ) return AssistantMessage(content=assistant_content) if assistant_content else None elif role == "user": user_content = self._parse_user_content(content_data) @@ -412,26 +462,44 @@ def _parse_tool_result_content(self, content_data: list) -> list[TextContent | T return result def _convert_tool_execution(self, obs: Any, session_id: str) -> ToolExecutionSpan: - """Convert an execute_tool SPAN observation to a ToolExecutionSpan. - - Langfuse observation (obs.type == "SPAN", obs.name starts with "execute_tool"): - obs.input: dict — tool call details - {"name": "calc", "arguments": {"x": "2+2"}, "toolUseId": "tooluse_abc123"} - obs.output: str | dict — tool execution result - str: "42" - dict: {"result": "4", "status": "success"} - obs.metadata: dict | None + """Convert a tool observation to a ToolExecutionSpan. - Returns: - ToolExecutionSpan with tool_call and tool_result populated from the above. + Handles two formats: + + **Strands** (obs.type == "SPAN", name starts with "execute_tool"): + obs.input: ``{"name": "calc", "arguments": {"x": "2+2"}, "toolUseId": "..."}`` + obs.output: ``"42"`` or ``{"result": "4", "status": "success"}`` + + **LangChain / universal** (obs.type == "TOOL"): + obs.name: tool name (e.g. ``"add_numbers"``) + obs.input: tool arguments (dict or other) + obs.output: tool result """ span_info = self._create_span_info(obs, session_id) obs_input = obs.input or {} - if isinstance(obs_input, dict): + if isinstance(obs_input, dict) and "name" in obs_input: + # Strands format: input carries name/arguments/toolUseId tool_name = obs_input.get("name", "") tool_arguments = obs_input.get("arguments", {}) tool_call_id = obs_input.get("toolUseId") + elif obs.type == "TOOL": + # LangChain/universal: obs.name is the tool, obs.input is arguments + tool_name = obs.name or "" + if isinstance(obs_input, dict): + tool_arguments = obs_input + elif isinstance(obs_input, str): + # Try parsing as JSON; LangChain may send stringified dicts + try: + parsed = json.loads(obs_input) + tool_arguments = parsed if isinstance(parsed, dict) else {"input": obs_input} + except (json.JSONDecodeError, ValueError): + tool_arguments = {"input": obs_input} + elif obs_input: + tool_arguments = {"input": str(obs_input)} + else: + tool_arguments = {} + tool_call_id = None else: tool_name = "" tool_arguments = {} @@ -449,42 +517,46 @@ def _parse_tool_result(self, obs_output: Any) -> tuple[str, str | None]: """Parse tool execution output into (content, error). Input formats: - str: "42" → ("42", None) - dict: {"result": "4", "status": "success"} → ("4", None) - dict: {"result": "...", "status": "error"} → ("...", "error") - dict: {"result": "...", "status": ""} → ("...", None) - None: → ("", None) + str: ``"42"`` → ``("42", None)`` + dict: ``{"result": "4", "status": "success"}`` → ``("4", None)`` + dict: ``{"result": "...", "status": "error"}`` → ``("...", "error")`` + dict: ``{"content": "Weather...", "type": "tool", ...}`` → ``("Weather...", None)`` + (LangChain ToolMessage format via Langfuse) + None: → ``("", None)`` """ if isinstance(obs_output, str): return obs_output, None if isinstance(obs_output, dict): - content = obs_output.get("result", str(obs_output)) - status = obs_output.get("status", "") - error = None if status == "success" else (str(status) if status else None) - return content, error + # Strands format: {"result": "...", "status": "success"|"error"} + if "result" in obs_output: + content = obs_output["result"] + status = obs_output.get("status", "") + error = None if status == "success" else (str(status) if status else None) + return content, error + # LangChain ToolMessage format: {"content": "...", "type": "tool", ...} + if "content" in obs_output: + content = obs_output["content"] + if isinstance(content, str): + return content, None + return str(content), None + return str(obs_output), None content = str(obs_output) if obs_output is not None else "" return content, None def _convert_agent_invocation(self, obs: Any, session_id: str) -> AgentInvocationSpan: - """Convert an invoke_agent SPAN observation to an AgentInvocationSpan. - - Langfuse observation (obs.type == "SPAN", obs.name starts with "invoke_agent"): - obs.input: str | list[dict] | dict — user prompt - str: "Hello" - list[dict]: [{"text": "Hello"}] - dict: {"text": "Hello"} - obs.output: str | dict — agent response - str: "Hi there!" - dict: {"message": "Hi there!", "finish_reason": "end_turn"} - dict: {"text": "Hi there!"} - dict: {"content": [{"text": "Hi there!"}]} - obs.metadata: dict | None — may contain "tools" key with available tool names - {"tools": ["shell", "get_pull_request", ...]} + """Convert an agent observation to an AgentInvocationSpan. - Returns: - AgentInvocationSpan with user_prompt, agent_response, and available_tools extracted. + Handles two formats: + + **Strands** (obs.type == "SPAN", name starts with "invoke_agent"): + obs.input: ``"Hello"`` | ``[{"text": "Hello"}]`` | ``{"text": "Hello"}`` + obs.output: ``"Hi!"`` | ``{"message": "Hi!", "finish_reason": "end_turn"}`` + + **LangChain / universal** (obs.type == "CHAIN", root observation): + obs.input: ``{"input": "question"}`` or ``{"messages": [...]}`` + obs.output: ``{"output": "answer"}`` or ``{"content": "answer"}`` """ span_info = self._create_span_info(obs, session_id) obs_input = obs.input @@ -507,43 +579,101 @@ def _convert_agent_invocation(self, obs: Any, session_id: str) -> AgentInvocatio metadata=obs.metadata or {}, ) + def _extract_text_from_content(self, content: Any) -> str: + """Extract plain text from a message content field. + + Handles formats seen in Strands observations via OTEL→Langfuse: + str: ``"Hello"`` → ``"Hello"`` + str (JSON): ``'[{"text": "Hello"}]'`` → ``"Hello"`` + list[dict]: ``[{"text": "Hello"}]`` → ``"Hello"`` + None: → ``""`` + """ + if isinstance(content, str): + try: + parsed = json.loads(content) + except (json.JSONDecodeError, ValueError): + return content + if isinstance(parsed, list): + for item in parsed: + if isinstance(item, dict) and "text" in item: + return item["text"] + return content + if isinstance(content, list): + for item in content: + if isinstance(item, dict) and "text" in item: + return item["text"] + return str(content) if content else "" + def _extract_user_prompt(self, obs_input: Any) -> str: """Extract user prompt string from observation input. Input formats: - str: "Hello" → "Hello" - list[dict]: [{"text": "Hello"}] → "Hello" - dict: {"text": "Hello"} → "Hello" - None: → "" + str: ``"Hello"`` → ``"Hello"`` + list[dict]: ``[{"text": "Hello"}]`` → ``"Hello"`` + list[dict]: ``[{"role": "user", "content": ...}]`` + → ``"Hello"`` (Strands via OTEL) + dict: ``{"text": "Hello"}`` → ``"Hello"`` + dict: ``{"input": "Hello"}`` → ``"Hello"`` (LangChain CHAIN) + dict: ``{"messages": [{"type": "human", "content": "Hello"}]}`` + → ``"Hello"`` (LangChain messages) + None: → ``""`` """ if isinstance(obs_input, str): return obs_input if isinstance(obs_input, list): for item in obs_input: - if isinstance(item, dict) and "text" in item: + if not isinstance(item, dict): + continue + if "text" in item: return item["text"] - if isinstance(obs_input, dict) and "text" in obs_input: - return obs_input["text"] + # Strands message-list format via OTEL→Langfuse + if item.get("role") in ("user", "human"): + return self._extract_text_from_content(item.get("content")) + if isinstance(obs_input, dict): + if "text" in obs_input: + return obs_input["text"] + if "input" in obs_input: + return str(obs_input["input"]) + if "messages" in obs_input: + messages = obs_input["messages"] + if isinstance(messages, list): + for msg in messages: + if isinstance(msg, dict) and msg.get("type") in ("human", "user"): + return str(msg.get("content", "")) + # Fallback: last message + if messages and isinstance(messages[-1], dict): + return str(messages[-1].get("content", "")) return str(obs_input) if obs_input else "" def _extract_agent_response(self, obs_output: Any) -> str: """Extract agent response string from observation output. Input formats: - str: "Hi there!" → "Hi there!" - dict: {"text": "Hi there!"} → "Hi there!" - dict: {"message": "Hi there!", "finish_reason": "..."} → "Hi there!" - dict: {"content": [{"text": "Hi there!"}]} → "Hi there!" - dict: {"content": "Hi there!"} → "Hi there!" - None: → "" + str: ``"Hi there!"`` → ``"Hi there!"`` + list[dict]: ``[{"role": "assistant", "content": ...}]`` → ``"Hi!"`` (Strands via OTEL) + dict: ``{"text": "Hi there!"}`` → ``"Hi there!"`` + dict: ``{"message": "Hi!", "finish_reason": "..."}`` → ``"Hi!"`` + dict: ``{"output": "Hi!"}`` → ``"Hi!"`` (LangChain CHAIN) + dict: ``{"content": [{"text": "Hi!"}]}`` → ``"Hi!"`` + dict: ``{"content": "Hi!"}`` → ``"Hi!"`` + dict: ``{"messages": [{"type": "ai", "content": "Hi!"}]}`` + → ``"Hi!"`` (LangGraph output) + None: → ``""`` """ if isinstance(obs_output, str): return obs_output + if isinstance(obs_output, list): + # Strands message-list format via OTEL→Langfuse + for item in reversed(obs_output): + if isinstance(item, dict) and item.get("role") in ("assistant", "ai"): + return self._extract_text_from_content(item.get("content")) if isinstance(obs_output, dict): if "text" in obs_output: return obs_output["text"] if "message" in obs_output: return obs_output["message"] + if "output" in obs_output: + return str(obs_output["output"]) if "content" in obs_output: content = obs_output["content"] if isinstance(content, list): @@ -552,6 +682,16 @@ def _extract_agent_response(self, obs_output: Any) -> str: return item["text"] elif isinstance(content, str): return content + if "messages" in obs_output: + messages = obs_output["messages"] + if isinstance(messages, list): + # Find last ai/assistant message + for msg in reversed(messages): + if isinstance(msg, dict) and msg.get("type") in ("ai", "assistant"): + return str(msg.get("content", "")) + # Fallback: last message content + if messages and isinstance(messages[-1], dict): + return str(messages[-1].get("content", "")) return str(obs_output) if obs_output else "" def _extract_available_tools(self, metadata: Any) -> list[ToolConfig]: diff --git a/tests/strands_evals/providers/test_langfuse_provider.py b/tests/strands_evals/providers/test_langfuse_provider.py index cdad090..7f7a0d9 100644 --- a/tests/strands_evals/providers/test_langfuse_provider.py +++ b/tests/strands_evals/providers/test_langfuse_provider.py @@ -21,7 +21,7 @@ def _meta(page=1, total_pages=1, total_items=10, limit=100): - m = MagicMock() + m = MagicMock(spec=["page", "limit", "total_items", "total_pages"]) m.page, m.limit, m.total_items, m.total_pages = page, limit, total_items, total_pages return m @@ -64,9 +64,21 @@ def _paginated(data, page=1, total_pages=1): return r +def _cursor_paginated(data, cursor=None): + """Build a cursor-based paginated response (Langfuse v4+ observations API).""" + r = MagicMock(spec=["data", "meta"]) + m = MagicMock(spec=["cursor"]) + m.cursor = cursor + r.data, r.meta = data, m + return r + + @pytest.fixture def mock_client(): - return MagicMock() + m = MagicMock() + # Simulate Langfuse v2/v3 SDK (no legacy API) so tests use observations.get_many + m.api.legacy = None + return m @pytest.fixture @@ -241,6 +253,47 @@ def test_paginates_observations(self, provider, mock_client): ] assert len(provider.get_evaluation_data("s1")["trajectory"].traces[0].spans) == 2 + def test_cursor_pagination_observations(self, provider, mock_client): + """Langfuse v4+ uses cursor-based pagination via legacy.observations_v1.""" + mock_client.api.trace.list.return_value = _paginated([_trace("t1", "s1")]) + # Simulate v4 SDK: legacy.observations_v1 exists + legacy = MagicMock() + mock_client.api.legacy = legacy + legacy.observations_v1.get_many.side_effect = [ + _cursor_paginated( + [_obs("o1", "t1", "SPAN", name="invoke_agent a", obs_input=[{"text": "q"}], obs_output="a")], + cursor="cursor-page2", + ), + _cursor_paginated( + [ + _obs( + "o2", + "t1", + "GENERATION", + name="chat", + obs_input=[{"role": "user", "content": [{"text": "q"}]}], + obs_output={"role": "assistant", "content": [{"text": "a"}]}, + ) + ], + cursor=None, + ), + ] + spans = provider.get_evaluation_data("s1")["trajectory"].traces[0].spans + assert len(spans) == 2 + + def test_cursor_pagination_single_page(self, provider, mock_client): + """Cursor-based pagination with a single page (cursor=None from the start).""" + mock_client.api.trace.list.return_value = _paginated([_trace("t1", "s1")]) + # Simulate v4 SDK: legacy.observations_v1 exists + legacy = MagicMock() + mock_client.api.legacy = legacy + legacy.observations_v1.get_many.return_value = _cursor_paginated( + [_obs("o1", "t1", "SPAN", name="invoke_agent a", obs_input=[{"text": "q"}], obs_output="a")], + cursor=None, + ) + spans = provider.get_evaluation_data("s1")["trajectory"].traces[0].spans + assert len(spans) == 1 + def test_wraps_api_error(self, provider, mock_client): mock_client.api.trace.list.side_effect = Exception("Connection refused") with pytest.raises(ProviderError, match="Connection refused"): @@ -584,3 +637,298 @@ def test_nonempty_agent_response_used(self, provider, mock_client): ], ) assert result["output"] == "agent says this" + + +# --- LangChain framework support via obs.type routing --- + + +class TestLangChainToolType: + """TOOL-type observations from LangChain via Langfuse.""" + + def _get_spans(self, provider, mock_client, observations): + mock_client.api.trace.list.return_value = _paginated([_trace("t1", "s1")]) + mock_client.api.observations.get_many.return_value = _paginated(observations) + return provider.get_evaluation_data("s1")["trajectory"].traces[0].spans + + def test_tool_type_produces_tool_execution_span(self, provider, mock_client): + """obs.type == 'TOOL' is routed to ToolExecutionSpan.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs("o-tool", "t1", "TOOL", name="add_numbers", obs_input={"a": 2, "b": 3}, obs_output="5"), + _obs("o-agent", "t1", "SPAN", name="invoke_agent a", obs_input=[{"text": "q"}], obs_output="a"), + ], + ) + tools = [s for s in spans if isinstance(s, ToolExecutionSpan)] + assert len(tools) == 1 + assert tools[0].tool_call.name == "add_numbers" + assert tools[0].tool_call.arguments == {"a": 2, "b": 3} + assert tools[0].tool_result.content == "5" + + def test_tool_type_with_dict_output(self, provider, mock_client): + """TOOL with dict output parses result/status correctly.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o-tool", + "t1", + "TOOL", + name="search", + obs_input={"query": "weather"}, + obs_output={"result": "Sunny", "status": "success"}, + ), + _obs("o-agent", "t1", "SPAN", name="invoke_agent a", obs_input=[{"text": "q"}], obs_output="a"), + ], + ) + tools = [s for s in spans if isinstance(s, ToolExecutionSpan)] + assert tools[0].tool_result.content == "Sunny" + assert tools[0].tool_result.error is None + + def test_tool_type_with_string_input(self, provider, mock_client): + """TOOL with string input wraps it in a dict.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs("o-tool", "t1", "TOOL", name="echo", obs_input="hello", obs_output="hello"), + _obs("o-agent", "t1", "SPAN", name="invoke_agent a", obs_input=[{"text": "q"}], obs_output="a"), + ], + ) + tools = [s for s in spans if isinstance(s, ToolExecutionSpan)] + assert tools[0].tool_call.name == "echo" + assert tools[0].tool_call.arguments == {"input": "hello"} + + +class TestLangChainChainType: + """CHAIN-type observations from LangChain via Langfuse.""" + + def _get_spans(self, provider, mock_client, observations): + mock_client.api.trace.list.return_value = _paginated([_trace("t1", "s1")]) + mock_client.api.observations.get_many.return_value = _paginated(observations) + return provider.get_evaluation_data("s1")["trajectory"].traces[0].spans + + def test_root_chain_produces_agent_invocation(self, provider, mock_client): + """Root CHAIN (parent_observation_id=None) → AgentInvocationSpan.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o-chain", + "t1", + "CHAIN", + name="AgentExecutor", + obs_input={"input": "What is 2+2?"}, + obs_output={"output": "4"}, + parent_observation_id=None, + ), + ], + ) + agents = [s for s in spans if isinstance(s, AgentInvocationSpan)] + assert len(agents) == 1 + assert agents[0].user_prompt == "What is 2+2?" + assert agents[0].agent_response == "4" + + def test_child_chain_is_skipped(self, provider, mock_client): + """Non-root CHAIN (has parent) is skipped.""" + mock_client.api.trace.list.return_value = _paginated([_trace("t1", "s1")]) + mock_client.api.observations.get_many.return_value = _paginated( + [ + _obs( + "o-child", + "t1", + "CHAIN", + name="SubChain", + obs_input={"input": "sub"}, + obs_output={"output": "sub-out"}, + parent_observation_id="o-parent", + ), + _obs("o-agent", "t1", "SPAN", name="invoke_agent a", obs_input=[{"text": "q"}], obs_output="a"), + ], + ) + spans = provider.get_evaluation_data("s1")["trajectory"].traces[0].spans + assert len(spans) == 1 + assert isinstance(spans[0], AgentInvocationSpan) + + def test_chain_with_messages_input(self, provider, mock_client): + """CHAIN with LangChain messages-style input extracts human message.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o-chain", + "t1", + "CHAIN", + name="LangGraph", + obs_input={"messages": [{"type": "human", "content": "Tell me a joke"}]}, + obs_output={"output": "Why did the chicken cross the road?"}, + parent_observation_id=None, + ), + ], + ) + agents = [s for s in spans if isinstance(s, AgentInvocationSpan)] + assert agents[0].user_prompt == "Tell me a joke" + + def test_chain_with_content_output(self, provider, mock_client): + """CHAIN with content-style output.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o-chain", + "t1", + "CHAIN", + name="LangGraph", + obs_input={"input": "Hi"}, + obs_output={"content": "Hello!"}, + parent_observation_id=None, + ), + ], + ) + agents = [s for s in spans if isinstance(s, AgentInvocationSpan)] + assert agents[0].agent_response == "Hello!" + + +class TestLangChainEndToEnd: + """Full LangChain agent trace: CHAIN + GENERATION + TOOL.""" + + def test_full_langchain_trace(self, provider, mock_client): + mock_client.api.trace.list.return_value = _paginated([_trace("t1", "s1")]) + mock_client.api.observations.get_many.return_value = _paginated( + [ + _obs( + "o-chain", + "t1", + "CHAIN", + name="AgentExecutor", + obs_input={"input": "What is the weather?"}, + obs_output={"output": "Sunny, 72F."}, + parent_observation_id=None, + ), + _obs( + "o-gen", + "t1", + "GENERATION", + name="ChatOpenAI", + obs_input=[{"role": "user", "content": [{"text": "What is the weather?"}]}], + obs_output={"role": "assistant", "content": [{"text": "Let me check."}]}, + parent_observation_id="o-chain", + ), + _obs( + "o-tool", + "t1", + "TOOL", + name="get_weather", + obs_input={"location": "SF"}, + obs_output="Sunny, 72F", + parent_observation_id="o-chain", + ), + ], + ) + result = provider.get_evaluation_data("s1") + spans = result["trajectory"].traces[0].spans + + agent_spans = [s for s in spans if isinstance(s, AgentInvocationSpan)] + inference_spans = [s for s in spans if isinstance(s, InferenceSpan)] + tool_spans = [s for s in spans if isinstance(s, ToolExecutionSpan)] + + assert len(agent_spans) == 1 + assert len(inference_spans) == 1 + assert len(tool_spans) == 1 + + assert agent_spans[0].user_prompt == "What is the weather?" + assert agent_spans[0].agent_response == "Sunny, 72F." + assert tool_spans[0].tool_call.name == "get_weather" + assert tool_spans[0].tool_result.content == "Sunny, 72F" + assert result["output"] == "Sunny, 72F." + + +class TestStrandsOtelViaLangfuse: + """Strands message-list format arriving via OTEL→Langfuse OTLP endpoint.""" + + def _get_spans(self, provider, mock_client, observations): + mock_client.api.trace.list.return_value = _paginated([_trace("t1", "s1")]) + mock_client.api.observations.get_many.return_value = _paginated(observations) + return provider.get_evaluation_data("s1")["trajectory"].traces[0].spans + + def test_strands_message_list_input(self, provider, mock_client): + """Strands invoke_agent with message-list input extracts user text.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o1", + "t1", + "SPAN", + name="invoke_agent my_agent", + obs_input=[{"role": "user", "content": '[{"text": "What\'s the weather?"}]'}], + obs_output="Sunny and warm!", + ), + ], + ) + agents = [s for s in spans if isinstance(s, AgentInvocationSpan)] + assert len(agents) == 1 + assert agents[0].user_prompt == "What's the weather?" + + def test_strands_message_list_input_plain_string_content(self, provider, mock_client): + """Strands message-list where content is a plain string (not JSON).""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o1", + "t1", + "SPAN", + name="invoke_agent my_agent", + obs_input=[{"role": "user", "content": "Hello there"}], + obs_output="Hi!", + ), + ], + ) + agents = [s for s in spans if isinstance(s, AgentInvocationSpan)] + assert agents[0].user_prompt == "Hello there" + + def test_strands_message_list_output(self, provider, mock_client): + """Strands invoke_agent with message-list output extracts assistant text.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o1", + "t1", + "SPAN", + name="invoke_agent my_agent", + obs_input=[{"text": "Hello"}], + obs_output=[{"role": "assistant", "content": '[{"text": "Hi there!"}]'}], + ), + ], + ) + agents = [s for s in spans if isinstance(s, AgentInvocationSpan)] + assert agents[0].agent_response == "Hi there!" + + def test_strands_message_list_content_as_list(self, provider, mock_client): + """Strands message-list where content is already a parsed list.""" + spans = self._get_spans( + provider, + mock_client, + [ + _obs( + "o1", + "t1", + "SPAN", + name="invoke_agent my_agent", + obs_input=[{"role": "user", "content": [{"text": "How are you?"}]}], + obs_output="I'm fine!", + ), + ], + ) + agents = [s for s in spans if isinstance(s, AgentInvocationSpan)] + assert agents[0].user_prompt == "How are you?"