diff --git a/crates/cli/src/alignment/mod.rs b/crates/cli/src/alignment/mod.rs index 19b6c26d..bb2685db 100644 --- a/crates/cli/src/alignment/mod.rs +++ b/crates/cli/src/alignment/mod.rs @@ -609,8 +609,9 @@ pub(crate) fn gateway_management_policy( /// Decide whether this agent kind should emit a long-lived session agent scope. /// /// Claude Code and Codex can outlive a user-visible run, so the CLI represents -/// their work with bounded turn scopes instead of exporting a long-lived agent -/// scope that needs synthetic termination. +/// their work with bounded Custom turn scopes instead of exporting a long-lived +/// agent scope that needs synthetic termination. Every turn uses Custom scope +/// semantics, while Agent scopes remain reserved for agent and subagent lineage. pub(crate) fn should_emit_session_agent_scope(agent_kind: AgentKind) -> bool { !matches!(agent_kind, AgentKind::ClaudeCode | AgentKind::Codex) } diff --git a/crates/cli/src/session.rs b/crates/cli/src/session.rs index c108c0b5..839b470e 100644 --- a/crates/cli/src/session.rs +++ b/crates/cli/src/session.rs @@ -1083,7 +1083,7 @@ impl Session { } // Lazily opens the root agent scope for harnesses that have a meaningful session boundary. - // Harnesses without a reliable session end deliberately skip this and use bounded turn agent + // Harnesses without a reliable session end deliberately skip this and use bounded Custom turn // scopes as the top-level observable unit. fn ensure_agent_started(&mut self, event_metadata: Value) -> Result<(), CliError> { if self.agent_scope.is_some() @@ -1107,7 +1107,7 @@ impl Session { Ok(()) } - // Opens a new turn agent scope for a user prompt. If the previous turn never received a + // Opens a new Custom turn scope for a user prompt. If the previous turn never received a // terminal hook, close it first so each user input gets a bounded reviewable trace segment. async fn start_turn(&mut self, event: SessionEvent) -> Result<(), CliError> { if alignment::aliased_turn_subagent_id(&event).is_some() { @@ -1181,7 +1181,7 @@ impl Session { let scope = push_scope( PushScopeParams::builder() .name(turn_name.as_str()) - .scope_type(ScopeType::Agent) + .scope_type(ScopeType::Custom) .parent_opt(self.agent_scope.as_ref()) .metadata(metadata) .input(input) @@ -1364,8 +1364,9 @@ impl Session { self.turn_scope.clone().or_else(|| self.agent_scope.clone()) } - // Starts a subagent agent scope under the active turn. Duplicate subagent starts are ignored so - // integrations that retry or emit both "start" and "created" style hooks do not double-nest. + // Starts an Agent subagent scope under the active Custom turn scope. Duplicate subagent starts + // are ignored so integrations that retry or emit both "start" and "created" style hooks do + // not double-nest. // // Subagents get their own runtime stack seeded with the turn parent. That keeps Phoenix // parentage sibling-shaped within a turn while still allowing parallel workers to end out of @@ -2086,8 +2087,8 @@ impl Session { })); } - // Remembers the latest completed LLM response owned by the turn/root agent so the enclosing - // turn agent scope can export the final assistant output. Subagent-owned responses are + // Remembers the latest completed LLM response owned by the turn or root Agent scope so the + // enclosing Custom turn scope can export the final assistant output. Subagent-owned responses are // deliberately excluded; otherwise a worker's last local answer can overwrite the parent // agent's final synthesis. fn record_completed_llm_response( diff --git a/crates/cli/tests/coverage/server_tests.rs b/crates/cli/tests/coverage/server_tests.rs index a5b23472..25d7ac59 100644 --- a/crates/cli/tests/coverage/server_tests.rs +++ b/crates/cli/tests/coverage/server_tests.rs @@ -748,19 +748,19 @@ async fn serve_listener_observability_plugin_records_non_hermes_hooks() { assert!(nemo_relay::plugin::active_plugin_report().is_none()); let events = std::fs::read_to_string(temp.path().join("atof/events.jsonl")).unwrap(); - let agent_starts = events + let turn_starts = events .lines() .map(|line| serde_json::from_str::(line).unwrap()) .filter(|event| { event["kind"] == "scope" && event["scope_category"] == "start" - && event["category"] == "agent" + && event["metadata"]["nemo_relay_scope_role"] == "turn" }) .filter_map(|event| event["name"].as_str().map(ToOwned::to_owned)) .collect::>(); - assert!(agent_starts.contains(&"codex-turn".to_string())); - assert!(agent_starts.contains(&"claude-code-turn".to_string())); - assert!(!agent_starts.contains(&"claude-code".to_string())); + assert!(turn_starts.contains(&"codex-turn".to_string())); + assert!(turn_starts.contains(&"claude-code-turn".to_string())); + assert!(!turn_starts.contains(&"claude-code".to_string())); } #[tokio::test] @@ -1615,8 +1615,8 @@ async fn serve_listener_records_codex_stop_atof_contract() { && event["name"] == "codex" })); - let turn_start = find_scope_event(&events, "codex-turn", "agent", "start"); - let turn_end = find_scope_event(&events, "codex-turn", "agent", "end"); + let turn_start = find_scope_event(&events, "codex-turn", "custom", "start"); + let turn_end = find_scope_event(&events, "codex-turn", "custom", "end"); assert_eq!(turn_start["uuid"], turn_end["uuid"]); assert_eq!( turn_start["data"], diff --git a/crates/cli/tests/coverage/session_tests.rs b/crates/cli/tests/coverage/session_tests.rs index 74bdbe17..0a283b0b 100644 --- a/crates/cli/tests/coverage/session_tests.rs +++ b/crates/cli/tests/coverage/session_tests.rs @@ -720,7 +720,7 @@ async fn parallel_subagents_are_siblings_under_turn_scope() { } #[tokio::test] -async fn codex_turn_is_agent_scope_with_turn_role_metadata() { +async fn codex_turn_is_custom_scope_with_turn_role_metadata() { let manager = SessionManager::new(session_test_config()); manager .apply_events( @@ -748,7 +748,7 @@ async fn codex_turn_is_agent_scope_with_turn_role_metadata() { assert!(session.agent_scope.is_none()); let turn = active_turn_scope(session); assert_eq!(turn.name, "codex-turn"); - assert_eq!(turn.scope_type, ScopeType::Agent); + assert_eq!(turn.scope_type, ScopeType::Custom); assert_eq!( turn.metadata.as_ref().unwrap()["nemo_relay_scope_role"], json!("turn") @@ -1820,7 +1820,7 @@ async fn codex_stop_snapshots_atif_without_session_end() { .iter() .find(|event| { event["name"] == "codex-turn" - && event["category"] == "agent" + && event["category"] == "custom" && event["scope_category"] == "start" }) .expect("Codex turn start should be observed"); @@ -1828,7 +1828,7 @@ async fn codex_stop_snapshots_atif_without_session_end() { .iter() .find(|event| { event["name"] == "codex-turn" - && event["category"] == "agent" + && event["category"] == "custom" && event["scope_category"] == "end" }) .expect("Codex Stop should close the turn scope"); @@ -1917,7 +1917,7 @@ async fn codex_openinference_spans_match_shared_contract() { turn_attributes .get("openinference.span.kind") .map(String::as_str), - Some("AGENT") + Some("CHAIN") ); assert_eq!( llm_attributes @@ -2967,7 +2967,7 @@ async fn hermes_orphan_subagent_stop_links_atof_and_openinference_to_turn() { let turn_start = atof_events .iter() .find(|event| { - event["category"] == "agent" + event["category"] == "custom" && event["scope_category"] == "start" && event["metadata"]["session_id"] == json!("hermes-orphan") && event["metadata"]["nemo_relay_scope_role"] == json!("turn") @@ -2998,7 +2998,7 @@ async fn hermes_orphan_subagent_stop_links_atof_and_openinference_to_turn() { attributes .get("openinference.span.kind") .map(String::as_str) - == Some("AGENT") + == Some("CHAIN") && attributes.get("metadata").is_some_and(|metadata| { serde_json::from_str::(metadata) .ok() @@ -3110,7 +3110,7 @@ async fn hermes_subagent_child_session_preserves_atof_and_openinference_lineage( let parent_turn = atof_events .iter() .find(|event| { - event["category"] == "agent" + event["category"] == "custom" && event["scope_category"] == "start" && event["metadata"]["session_id"] == json!("parent-session") && event["metadata"]["nemo_relay_scope_role"] == json!("turn") @@ -3143,7 +3143,7 @@ async fn hermes_subagent_child_session_preserves_atof_and_openinference_lineage( attributes .get("openinference.span.kind") .map(String::as_str) - == Some("AGENT") + == Some("CHAIN") && attributes.get("metadata").is_some_and(|metadata| { serde_json::from_str::(metadata) .ok() diff --git a/crates/core/src/api/llm.rs b/crates/core/src/api/llm.rs index 6e6766fc..ab1a99e9 100644 --- a/crates/core/src/api/llm.rs +++ b/crates/core/src/api/llm.rs @@ -20,8 +20,8 @@ use crate::api::runtime::{ use crate::api::scope::event; use crate::api::scope::{EmitMarkEventParams, ScopeHandle}; use crate::api::shared::{ - ensure_runtime_owner, metadata_with_otel_status, resolve_parent_uuid, - run_request_intercepts_with_codec, snapshot_event_subscribers, + ensure_runtime_owner, inject_dynamo_session_ids, metadata_with_otel_status, + resolve_parent_uuid, run_request_intercepts_with_codec, snapshot_event_subscribers, }; use crate::codec::request::AnnotatedLlmRequest; use crate::codec::response::{AnnotatedLlmResponse, attach_estimated_cost_for_provider}; @@ -919,9 +919,11 @@ pub fn llm_request_intercepts( .map_err(|error| FlowError::Internal(error.to_string()))?; state.llm_request_intercept_entries(&scope_locals) }; - NemoRelayContextState::llm_request_intercepts_snapshot_chain( + let mut outcome = NemoRelayContextState::llm_request_intercepts_snapshot_chain( name, request, None, &entries, false, - ) + )?; + inject_dynamo_session_ids(&mut outcome.request); + Ok(outcome) } /// Run only the LLM conditional-execution guardrail chain. diff --git a/crates/core/src/api/shared.rs b/crates/core/src/api/shared.rs index 28965542..28767174 100644 --- a/crates/core/src/api/shared.rs +++ b/crates/core/src/api/shared.rs @@ -10,12 +10,18 @@ use crate::api::runtime::EventSubscriberFn; use crate::api::runtime::global_context; use crate::api::runtime::{current_scope_stack, task_scope_top}; use crate::api::scope::ScopeHandle; +use crate::api::scope::ScopeType; use crate::codec::request::AnnotatedLlmRequest; use crate::codec::traits::LlmCodec; use crate::error::{FlowError, Result}; use crate::json::{Json, merge_json}; use crate::shared_runtime::ensure_process_runtime_owner; +/// Header carrying the current Dynamo agent session ID. +pub const DYNAMO_SESSION_ID_HEADER_KEY: &str = "x-dynamo-session-id"; +/// Header carrying the parent Dynamo agent session ID. +pub const DYNAMO_PARENT_SESSION_ID_HEADER_KEY: &str = "x-dynamo-parent-session-id"; + pub(crate) fn resolve_parent_uuid(parent: Option<&ScopeHandle>) -> Option { Some( parent @@ -38,6 +44,65 @@ pub(crate) fn ensure_runtime_owner() -> Result<()> { ensure_process_runtime_owner() } +/// Resolve the current and parent agent session IDs from the active scope stack. +/// +/// The most recent two explicit Agent scopes are used. +/// Harness-specific session metadata takes precedence over the scope name, while +/// names keep application-created scopes useful when no metadata is attached. +pub(crate) fn resolve_agent_session_ids() -> Option<(String, Option)> { + let stack = current_scope_stack(); + let stack = stack.read().ok()?; + let mut agent_scopes = stack + .scopes() + .iter() + .skip(1) + .filter(|scope| matches!(scope.scope_type, ScopeType::Agent)) + .rev(); + let current = agent_scopes.next().map(agent_scope_id)?; + let parent = agent_scopes.next().map(agent_scope_id); + Some((current, parent)) +} + +fn agent_scope_id(scope: &ScopeHandle) -> String { + scope + .metadata + .as_ref() + .and_then(|metadata| { + [ + "codex_subagent_session_id", + "subagent_session_id", + "subagent_id", + "session_id", + ] + .into_iter() + .find_map(|key| metadata.get(key).and_then(|value| value.as_str())) + }) + .unwrap_or(&scope.name) + .to_string() +} + +pub(crate) fn inject_dynamo_session_ids(request: &mut LlmRequest) { + let Some((current, parent)) = resolve_agent_session_ids() else { + return; + }; + + request.headers.insert( + DYNAMO_SESSION_ID_HEADER_KEY.to_string(), + Json::String(current), + ); + match parent { + Some(parent) => { + request.headers.insert( + DYNAMO_PARENT_SESSION_ID_HEADER_KEY.to_string(), + Json::String(parent), + ); + } + None => { + request.headers.remove(DYNAMO_PARENT_SESSION_ID_HEADER_KEY); + } + } +} + pub(crate) fn metadata_with_otel_status( metadata: Option, status_code: &'static str, @@ -105,15 +170,17 @@ pub(crate) fn run_request_intercepts_with_codec( &entries, codec.is_some(), )?; + let mut request = outcome.request; + inject_dynamo_session_ids(&mut request); let pending_marks = outcome.pending_marks; match (codec, outcome.annotated_request) { (Some(codec), Some(annotated)) => { - let mut encoded = codec.encode(&annotated, &outcome.request)?; - encoded.headers = outcome.request.headers; + let mut encoded = codec.encode(&annotated, &request)?; + encoded.headers.extend(request.headers); Ok((encoded, Some(Arc::new(annotated)), pending_marks)) } - (_, annotated) => Ok((outcome.request, annotated.map(Arc::new), pending_marks)), + (_, annotated) => Ok((request, annotated.map(Arc::new), pending_marks)), } } diff --git a/crates/core/src/observability/atif.rs b/crates/core/src/observability/atif.rs index 67dedff5..ce96140e 100644 --- a/crates/core/src/observability/atif.rs +++ b/crates/core/src/observability/atif.rs @@ -3121,6 +3121,10 @@ fn agent_scope_role(event: &Event) -> Option<&str> { .and_then(Json::as_str) } +fn is_agent_scope_event(event: &Event) -> bool { + event.scope_type() == Some(crate::api::scope::ScopeType::Agent) +} + fn is_subagent_reference_event(event: &Event) -> bool { agent_scope_role(event) == Some("subagent") || event.metadata().and_then(delegation_tool_call_id).is_some() @@ -3191,9 +3195,7 @@ fn events_to_trajectory( fn can_use_agent_scope_tree(tree: &AgentScopeTree, events: &[&Event]) -> bool { events.iter().all(|event| { - event.scope_type() == Some(crate::api::scope::ScopeType::Agent) - || !is_step_event(event) - || tree.owner_agent(event).is_some() + is_agent_scope_event(event) || !is_step_event(event) || tree.owner_agent(event).is_some() }) } diff --git a/crates/core/src/observability/plugin_component.rs b/crates/core/src/observability/plugin_component.rs index 7e66ea0c..ff8e6219 100644 --- a/crates/core/src/observability/plugin_component.rs +++ b/crates/core/src/observability/plugin_component.rs @@ -12,9 +12,10 @@ //! so configuration remains portable across bindings. Agent Trajectory //! Observability Format (ATOF), OpenTelemetry, and OpenInference each register //! one global subscriber when enabled. Agent Trajectory Interchange Format -//! (ATIF) uses a global dispatcher that detects top-level agent scopes and -//! creates one scope-local exporter for each trajectory run. Coding-agent turns -//! that need bounded traces are represented as agent scopes with role metadata. +//! (ATIF) uses a global dispatcher that detects top-level agent or turn scopes +//! and creates one scope-local exporter for each trajectory run. Coding-agent +//! turns that need bounded traces carry role metadata; their declared scope +//! type is preserved in the exported event stream. use std::collections::{HashMap, HashSet}; use std::future::Future; @@ -1315,7 +1316,14 @@ fn is_top_level_trajectory_start(event: &Event) -> bool { if event.scope_category() != Some(ScopeCategory::Start) { return false; } - if event.scope_type() != Some(ScopeType::Agent) { + let is_agent_scope = event.scope_type() == Some(ScopeType::Agent); + let is_turn_scope = event.scope_type() == Some(ScopeType::Custom) + && event + .metadata() + .and_then(|metadata| metadata.get("nemo_relay_scope_role")) + .and_then(Json::as_str) + == Some("turn"); + if !is_agent_scope && !is_turn_scope { return false; } let Some(parent_uuid) = event.parent_uuid() else { diff --git a/crates/core/tests/unit/shared_tests.rs b/crates/core/tests/unit/shared_tests.rs index 851ea4ee..196efd03 100644 --- a/crates/core/tests/unit/shared_tests.rs +++ b/crates/core/tests/unit/shared_tests.rs @@ -57,10 +57,9 @@ impl LlmCodec for SharedTestCodec { fn encode(&self, annotated: &AnnotatedLlmRequest, original: &LlmRequest) -> Result { let mut content = original.content.clone(); content["encoded_model"] = json!(annotated.model.clone()); - Ok(LlmRequest { - headers: original.headers.clone(), - content, - }) + let mut headers = original.headers.clone(); + headers.insert("x-codec-encoded".into(), json!(true)); + Ok(LlmRequest { headers, content }) } } @@ -215,6 +214,10 @@ fn test_run_request_intercepts_with_codec_none_and_codec_paths() { request_with_codec.headers.get("x-codec"), Some(&json!(true)) ); + assert_eq!( + request_with_codec.headers.get("x-codec-encoded"), + Some(&json!(true)) + ); assert_eq!( request_with_codec.content["encoded_model"], json!("intercepted-model") @@ -230,3 +233,168 @@ fn test_run_request_intercepts_with_codec_none_and_codec_paths() { deregister_llm_request_intercept("shared-codec").unwrap(); reset_global(); } + +#[test] +fn test_run_request_intercepts_injects_dynamo_agent_lineage() { + let _guard = lock_runtime_owner(); + reset_global(); + + let parent = push_scope( + crate::api::scope::PushScopeParams::builder() + .name("parent-name") + .scope_type(ScopeType::Agent) + .metadata(json!({"session_id": "parent-session"})) + .build(), + ) + .unwrap(); + let turn = push_scope( + crate::api::scope::PushScopeParams::builder() + .name("codex-turn") + .scope_type(ScopeType::Custom) + .metadata(json!({ + "nemo_relay_scope_role": "turn", + "session_id": "parent-session" + })) + .parent(&parent) + .build(), + ) + .unwrap(); + let child = push_scope( + crate::api::scope::PushScopeParams::builder() + .name("child-name") + .scope_type(ScopeType::Agent) + .metadata(json!({"codex_subagent_session_id": "child-session"})) + .parent(&turn) + .build(), + ) + .unwrap(); + + let (request, _, _) = run_request_intercepts_with_codec( + "openai.responses", + LlmRequest { + headers: Map::new(), + content: json!({"prompt": "hello"}), + }, + None, + ) + .unwrap(); + assert_eq!( + request.headers.get(DYNAMO_SESSION_ID_HEADER_KEY), + Some(&json!("child-session")) + ); + assert_eq!( + request.headers.get(DYNAMO_PARENT_SESSION_ID_HEADER_KEY), + Some(&json!("parent-session")) + ); + + let duplicate_id_child = push_scope( + crate::api::scope::PushScopeParams::builder() + .name("duplicate-id-child") + .scope_type(ScopeType::Agent) + .metadata(json!({"session_id": "child-session"})) + .parent(&child) + .build(), + ) + .unwrap(); + let (request_with_codec, _, _) = run_request_intercepts_with_codec( + "openai.responses", + LlmRequest { + headers: Map::new(), + content: json!({"prompt": "hello"}), + }, + Some(Arc::new(SharedTestCodec)), + ) + .unwrap(); + assert_eq!( + request_with_codec.headers.get(DYNAMO_SESSION_ID_HEADER_KEY), + Some(&json!("child-session")) + ); + assert_eq!( + request_with_codec + .headers + .get(DYNAMO_PARENT_SESSION_ID_HEADER_KEY), + Some(&json!("child-session")) + ); + assert_eq!( + request_with_codec.headers.get("x-codec-encoded"), + Some(&json!(true)) + ); + pop_scope( + crate::api::scope::PopScopeParams::builder() + .handle_uuid(&duplicate_id_child.uuid) + .build(), + ) + .unwrap(); + + pop_scope( + crate::api::scope::PopScopeParams::builder() + .handle_uuid(&child.uuid) + .build(), + ) + .unwrap(); + + let (request, _, _) = run_request_intercepts_with_codec( + "openai.responses", + LlmRequest { + headers: Map::new(), + content: json!({"prompt": "hello"}), + }, + None, + ) + .unwrap(); + assert_eq!( + request.headers.get(DYNAMO_SESSION_ID_HEADER_KEY), + Some(&json!("parent-session")) + ); + assert!( + !request + .headers + .contains_key(DYNAMO_PARENT_SESSION_ID_HEADER_KEY) + ); + pop_scope( + crate::api::scope::PopScopeParams::builder() + .handle_uuid(&turn.uuid) + .build(), + ) + .unwrap(); + pop_scope( + crate::api::scope::PopScopeParams::builder() + .handle_uuid(&parent.uuid) + .build(), + ) + .unwrap(); + + let custom_turn = push_scope( + crate::api::scope::PushScopeParams::builder() + .name("custom-turn-only") + .scope_type(ScopeType::Custom) + .metadata(json!({ + "nemo_relay_scope_role": "turn", + "session_id": "custom-session" + })) + .build(), + ) + .unwrap(); + let (request, _, _) = run_request_intercepts_with_codec( + "openai.responses", + LlmRequest { + headers: Map::new(), + content: json!({"prompt": "hello"}), + }, + None, + ) + .unwrap(); + assert!(!request.headers.contains_key(DYNAMO_SESSION_ID_HEADER_KEY)); + assert!( + !request + .headers + .contains_key(DYNAMO_PARENT_SESSION_ID_HEADER_KEY) + ); + pop_scope( + crate::api::scope::PopScopeParams::builder() + .handle_uuid(&custom_turn.uuid) + .build(), + ) + .unwrap(); + reset_global(); +} diff --git a/crates/node/tests/scope_local_tests.mjs b/crates/node/tests/scope_local_tests.mjs index 6a5b94f5..14dc49dd 100644 --- a/crates/node/tests/scope_local_tests.mjs +++ b/crates/node/tests/scope_local_tests.mjs @@ -267,6 +267,7 @@ describe('Scope-local guardrail registration and execution', () => { assert.deepEqual(start.data, { headers: { 'X-Scope-Local': 'yes', + 'x-dynamo-session-id': 'sl_llm_guard_req_exec', }, content: { messages: [], diff --git a/docs/instrument-applications/instrument-llm-call.mdx b/docs/instrument-applications/instrument-llm-call.mdx index 8842a2f8..f8159c68 100644 --- a/docs/instrument-applications/instrument-llm-call.mdx +++ b/docs/instrument-applications/instrument-llm-call.mdx @@ -32,6 +32,13 @@ Create a scope for the active request or agent run before adding LLM instrumenta The request and response payloads must be JSON-compatible. If your provider SDK uses clients, streams, callbacks, or other opaque objects, keep those objects in the provider callback and pass only a serializable request projection into NeMo Relay. +For every managed LLM request, Relay automatically propagates agent lineage to +Dynamo using `x-dynamo-session-id` and, when a parent agent scope is active, +`x-dynamo-parent-session-id`. The current and parent IDs come from the most +recent explicit `Agent` scopes in the active scope stack and ignores the implicit root +scope. When present, Relay uses the harness session metadata, otherwise, it uses application +scope names. No plugin or configuration is required. + If you want Relay to add cost estimates, initialize the built-in `pricing` plugin before the LLM call and attach a response codec that decodes `model` and token `usage` from the provider response. Provider- or framework-reported cost