Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions crates/cli/src/alignment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,8 +609,9 @@ pub(crate) fn gateway_management_policy(
/// Decide whether this agent kind should emit a long-lived session agent scope.
///
/// Claude Code and Codex can outlive a user-visible run, so the CLI represents
/// their work with bounded turn scopes instead of exporting a long-lived agent
/// scope that needs synthetic termination.
/// their work with bounded Custom turn scopes instead of exporting a long-lived
/// agent scope that needs synthetic termination. Every turn uses Custom scope
/// semantics, while Agent scopes remain reserved for agent and subagent lineage.
pub(crate) fn should_emit_session_agent_scope(agent_kind: AgentKind) -> bool {
!matches!(agent_kind, AgentKind::ClaudeCode | AgentKind::Codex)
}
Expand Down
15 changes: 8 additions & 7 deletions crates/cli/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ impl Session {
}

// Lazily opens the root agent scope for harnesses that have a meaningful session boundary.
// Harnesses without a reliable session end deliberately skip this and use bounded turn agent
// Harnesses without a reliable session end deliberately skip this and use bounded Custom turn
// scopes as the top-level observable unit.
fn ensure_agent_started(&mut self, event_metadata: Value) -> Result<(), CliError> {
if self.agent_scope.is_some()
Expand All @@ -1107,7 +1107,7 @@ impl Session {
Ok(())
}

// Opens a new turn agent scope for a user prompt. If the previous turn never received a
// Opens a new Custom turn scope for a user prompt. If the previous turn never received a
// terminal hook, close it first so each user input gets a bounded reviewable trace segment.
async fn start_turn(&mut self, event: SessionEvent) -> Result<(), CliError> {
if alignment::aliased_turn_subagent_id(&event).is_some() {
Expand Down Expand Up @@ -1181,7 +1181,7 @@ impl Session {
let scope = push_scope(
PushScopeParams::builder()
.name(turn_name.as_str())
.scope_type(ScopeType::Agent)
.scope_type(ScopeType::Custom)
Comment thread
willkill07 marked this conversation as resolved.
.parent_opt(self.agent_scope.as_ref())
.metadata(metadata)
.input(input)
Expand Down Expand Up @@ -1364,8 +1364,9 @@ impl Session {
self.turn_scope.clone().or_else(|| self.agent_scope.clone())
}

// Starts a subagent agent scope under the active turn. Duplicate subagent starts are ignored so
// integrations that retry or emit both "start" and "created" style hooks do not double-nest.
// Starts an Agent subagent scope under the active Custom turn scope. Duplicate subagent starts
// are ignored so integrations that retry or emit both "start" and "created" style hooks do
// not double-nest.
//
// Subagents get their own runtime stack seeded with the turn parent. That keeps Phoenix
// parentage sibling-shaped within a turn while still allowing parallel workers to end out of
Expand Down Expand Up @@ -2086,8 +2087,8 @@ impl Session {
}));
}

// Remembers the latest completed LLM response owned by the turn/root agent so the enclosing
// turn agent scope can export the final assistant output. Subagent-owned responses are
// Remembers the latest completed LLM response owned by the turn or root Agent scope so the
// enclosing Custom turn scope can export the final assistant output. Subagent-owned responses are
// deliberately excluded; otherwise a worker's last local answer can overwrite the parent
// agent's final synthesis.
fn record_completed_llm_response(
Expand Down
14 changes: 7 additions & 7 deletions crates/cli/tests/coverage/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,19 +748,19 @@ async fn serve_listener_observability_plugin_records_non_hermes_hooks() {
assert!(nemo_relay::plugin::active_plugin_report().is_none());

let events = std::fs::read_to_string(temp.path().join("atof/events.jsonl")).unwrap();
let agent_starts = events
let turn_starts = events
.lines()
.map(|line| serde_json::from_str::<Value>(line).unwrap())
.filter(|event| {
event["kind"] == "scope"
&& event["scope_category"] == "start"
&& event["category"] == "agent"
&& event["metadata"]["nemo_relay_scope_role"] == "turn"
})
.filter_map(|event| event["name"].as_str().map(ToOwned::to_owned))
.collect::<Vec<_>>();
assert!(agent_starts.contains(&"codex-turn".to_string()));
assert!(agent_starts.contains(&"claude-code-turn".to_string()));
assert!(!agent_starts.contains(&"claude-code".to_string()));
assert!(turn_starts.contains(&"codex-turn".to_string()));
assert!(turn_starts.contains(&"claude-code-turn".to_string()));
assert!(!turn_starts.contains(&"claude-code".to_string()));
}

#[tokio::test]
Expand Down Expand Up @@ -1615,8 +1615,8 @@ async fn serve_listener_records_codex_stop_atof_contract() {
&& event["name"] == "codex"
}));

let turn_start = find_scope_event(&events, "codex-turn", "agent", "start");
let turn_end = find_scope_event(&events, "codex-turn", "agent", "end");
let turn_start = find_scope_event(&events, "codex-turn", "custom", "start");
let turn_end = find_scope_event(&events, "codex-turn", "custom", "end");
assert_eq!(turn_start["uuid"], turn_end["uuid"]);
assert_eq!(
turn_start["data"],
Expand Down
18 changes: 9 additions & 9 deletions crates/cli/tests/coverage/session_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ async fn parallel_subagents_are_siblings_under_turn_scope() {
}

#[tokio::test]
async fn codex_turn_is_agent_scope_with_turn_role_metadata() {
async fn codex_turn_is_custom_scope_with_turn_role_metadata() {
let manager = SessionManager::new(session_test_config());
manager
.apply_events(
Expand Down Expand Up @@ -748,7 +748,7 @@ async fn codex_turn_is_agent_scope_with_turn_role_metadata() {
assert!(session.agent_scope.is_none());
let turn = active_turn_scope(session);
assert_eq!(turn.name, "codex-turn");
assert_eq!(turn.scope_type, ScopeType::Agent);
assert_eq!(turn.scope_type, ScopeType::Custom);
assert_eq!(
turn.metadata.as_ref().unwrap()["nemo_relay_scope_role"],
json!("turn")
Expand Down Expand Up @@ -1820,15 +1820,15 @@ async fn codex_stop_snapshots_atif_without_session_end() {
.iter()
.find(|event| {
event["name"] == "codex-turn"
&& event["category"] == "agent"
&& event["category"] == "custom"
&& event["scope_category"] == "start"
})
.expect("Codex turn start should be observed");
let turn_end = observed
.iter()
.find(|event| {
event["name"] == "codex-turn"
&& event["category"] == "agent"
&& event["category"] == "custom"
&& event["scope_category"] == "end"
})
.expect("Codex Stop should close the turn scope");
Expand Down Expand Up @@ -1917,7 +1917,7 @@ async fn codex_openinference_spans_match_shared_contract() {
turn_attributes
.get("openinference.span.kind")
.map(String::as_str),
Some("AGENT")
Some("CHAIN")
);
assert_eq!(
llm_attributes
Expand Down Expand Up @@ -2967,7 +2967,7 @@ async fn hermes_orphan_subagent_stop_links_atof_and_openinference_to_turn() {
let turn_start = atof_events
.iter()
.find(|event| {
event["category"] == "agent"
event["category"] == "custom"
&& event["scope_category"] == "start"
&& event["metadata"]["session_id"] == json!("hermes-orphan")
&& event["metadata"]["nemo_relay_scope_role"] == json!("turn")
Expand Down Expand Up @@ -2998,7 +2998,7 @@ async fn hermes_orphan_subagent_stop_links_atof_and_openinference_to_turn() {
attributes
.get("openinference.span.kind")
.map(String::as_str)
== Some("AGENT")
== Some("CHAIN")
&& attributes.get("metadata").is_some_and(|metadata| {
serde_json::from_str::<Value>(metadata)
.ok()
Expand Down Expand Up @@ -3110,7 +3110,7 @@ async fn hermes_subagent_child_session_preserves_atof_and_openinference_lineage(
let parent_turn = atof_events
.iter()
.find(|event| {
event["category"] == "agent"
event["category"] == "custom"
&& event["scope_category"] == "start"
&& event["metadata"]["session_id"] == json!("parent-session")
&& event["metadata"]["nemo_relay_scope_role"] == json!("turn")
Expand Down Expand Up @@ -3143,7 +3143,7 @@ async fn hermes_subagent_child_session_preserves_atof_and_openinference_lineage(
attributes
.get("openinference.span.kind")
.map(String::as_str)
== Some("AGENT")
== Some("CHAIN")
&& attributes.get("metadata").is_some_and(|metadata| {
serde_json::from_str::<Value>(metadata)
.ok()
Expand Down
10 changes: 6 additions & 4 deletions crates/core/src/api/llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use crate::api::runtime::{
use crate::api::scope::event;
use crate::api::scope::{EmitMarkEventParams, ScopeHandle};
use crate::api::shared::{
ensure_runtime_owner, metadata_with_otel_status, resolve_parent_uuid,
run_request_intercepts_with_codec, snapshot_event_subscribers,
ensure_runtime_owner, inject_dynamo_session_ids, metadata_with_otel_status,
resolve_parent_uuid, run_request_intercepts_with_codec, snapshot_event_subscribers,
};
use crate::codec::request::AnnotatedLlmRequest;
use crate::codec::response::{AnnotatedLlmResponse, attach_estimated_cost_for_provider};
Expand Down Expand Up @@ -919,9 +919,11 @@ pub fn llm_request_intercepts(
.map_err(|error| FlowError::Internal(error.to_string()))?;
state.llm_request_intercept_entries(&scope_locals)
};
NemoRelayContextState::llm_request_intercepts_snapshot_chain(
let mut outcome = NemoRelayContextState::llm_request_intercepts_snapshot_chain(
name, request, None, &entries, false,
)
)?;
inject_dynamo_session_ids(&mut outcome.request);
Ok(outcome)
}

/// Run only the LLM conditional-execution guardrail chain.
Expand Down
73 changes: 70 additions & 3 deletions crates/core/src/api/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@ use crate::api::runtime::EventSubscriberFn;
use crate::api::runtime::global_context;
use crate::api::runtime::{current_scope_stack, task_scope_top};
use crate::api::scope::ScopeHandle;
use crate::api::scope::ScopeType;
use crate::codec::request::AnnotatedLlmRequest;
use crate::codec::traits::LlmCodec;
use crate::error::{FlowError, Result};
use crate::json::{Json, merge_json};
use crate::shared_runtime::ensure_process_runtime_owner;

/// Header carrying the current Dynamo agent session ID.
pub const DYNAMO_SESSION_ID_HEADER_KEY: &str = "x-dynamo-session-id";
/// Header carrying the parent Dynamo agent session ID.
pub const DYNAMO_PARENT_SESSION_ID_HEADER_KEY: &str = "x-dynamo-parent-session-id";

pub(crate) fn resolve_parent_uuid(parent: Option<&ScopeHandle>) -> Option<Uuid> {
Some(
parent
Expand All @@ -38,6 +44,65 @@ pub(crate) fn ensure_runtime_owner() -> Result<()> {
ensure_process_runtime_owner()
}

/// Resolve the current and parent agent session IDs from the active scope stack.
///
/// The most recent two explicit Agent scopes are used.
/// Harness-specific session metadata takes precedence over the scope name, while
/// names keep application-created scopes useful when no metadata is attached.
pub(crate) fn resolve_agent_session_ids() -> Option<(String, Option<String>)> {
let stack = current_scope_stack();
let stack = stack.read().ok()?;
let mut agent_scopes = stack
.scopes()
.iter()
.skip(1)
.filter(|scope| matches!(scope.scope_type, ScopeType::Agent))
.rev();
let current = agent_scopes.next().map(agent_scope_id)?;
let parent = agent_scopes.next().map(agent_scope_id);
Some((current, parent))
}

fn agent_scope_id(scope: &ScopeHandle) -> String {
scope
.metadata
.as_ref()
.and_then(|metadata| {
[
"codex_subagent_session_id",
"subagent_session_id",
"subagent_id",
"session_id",
]
.into_iter()
.find_map(|key| metadata.get(key).and_then(|value| value.as_str()))
})
.unwrap_or(&scope.name)
.to_string()
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

pub(crate) fn inject_dynamo_session_ids(request: &mut LlmRequest) {
let Some((current, parent)) = resolve_agent_session_ids() else {
return;
};

request.headers.insert(
DYNAMO_SESSION_ID_HEADER_KEY.to_string(),
Json::String(current),
);
match parent {
Some(parent) => {
request.headers.insert(
DYNAMO_PARENT_SESSION_ID_HEADER_KEY.to_string(),
Json::String(parent),
);
}
None => {
request.headers.remove(DYNAMO_PARENT_SESSION_ID_HEADER_KEY);
}
}
}

pub(crate) fn metadata_with_otel_status(
metadata: Option<Json>,
status_code: &'static str,
Expand Down Expand Up @@ -105,15 +170,17 @@ pub(crate) fn run_request_intercepts_with_codec(
&entries,
codec.is_some(),
)?;
let mut request = outcome.request;
inject_dynamo_session_ids(&mut request);
let pending_marks = outcome.pending_marks;

match (codec, outcome.annotated_request) {
(Some(codec), Some(annotated)) => {
let mut encoded = codec.encode(&annotated, &outcome.request)?;
encoded.headers = outcome.request.headers;
let mut encoded = codec.encode(&annotated, &request)?;
encoded.headers.extend(request.headers);
Ok((encoded, Some(Arc::new(annotated)), pending_marks))
}
(_, annotated) => Ok((outcome.request, annotated.map(Arc::new), pending_marks)),
(_, annotated) => Ok((request, annotated.map(Arc::new), pending_marks)),
}
}

Expand Down
8 changes: 5 additions & 3 deletions crates/core/src/observability/atif.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3121,6 +3121,10 @@ fn agent_scope_role(event: &Event) -> Option<&str> {
.and_then(Json::as_str)
}

fn is_agent_scope_event(event: &Event) -> bool {
event.scope_type() == Some(crate::api::scope::ScopeType::Agent)
}

fn is_subagent_reference_event(event: &Event) -> bool {
agent_scope_role(event) == Some("subagent")
|| event.metadata().and_then(delegation_tool_call_id).is_some()
Expand Down Expand Up @@ -3191,9 +3195,7 @@ fn events_to_trajectory(

fn can_use_agent_scope_tree(tree: &AgentScopeTree, events: &[&Event]) -> bool {
events.iter().all(|event| {
event.scope_type() == Some(crate::api::scope::ScopeType::Agent)
|| !is_step_event(event)
|| tree.owner_agent(event).is_some()
is_agent_scope_event(event) || !is_step_event(event) || tree.owner_agent(event).is_some()
})
}

Expand Down
16 changes: 12 additions & 4 deletions crates/core/src/observability/plugin_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
//! so configuration remains portable across bindings. Agent Trajectory
//! Observability Format (ATOF), OpenTelemetry, and OpenInference each register
//! one global subscriber when enabled. Agent Trajectory Interchange Format
//! (ATIF) uses a global dispatcher that detects top-level agent scopes and
//! creates one scope-local exporter for each trajectory run. Coding-agent turns
//! that need bounded traces are represented as agent scopes with role metadata.
//! (ATIF) uses a global dispatcher that detects top-level agent or turn scopes
//! and creates one scope-local exporter for each trajectory run. Coding-agent
//! turns that need bounded traces carry role metadata; their declared scope
//! type is preserved in the exported event stream.

use std::collections::{HashMap, HashSet};
use std::future::Future;
Expand Down Expand Up @@ -1315,7 +1316,14 @@ fn is_top_level_trajectory_start(event: &Event) -> bool {
if event.scope_category() != Some(ScopeCategory::Start) {
return false;
}
if event.scope_type() != Some(ScopeType::Agent) {
let is_agent_scope = event.scope_type() == Some(ScopeType::Agent);
let is_turn_scope = event.scope_type() == Some(ScopeType::Custom)
&& event
.metadata()
.and_then(|metadata| metadata.get("nemo_relay_scope_role"))
.and_then(Json::as_str)
== Some("turn");
if !is_agent_scope && !is_turn_scope {
return false;
}
let Some(parent_uuid) = event.parent_uuid() else {
Expand Down
Loading
Loading