diff --git a/apps/aether-gateway/src/orchestration/attempt.rs b/apps/aether-gateway/src/orchestration/attempt.rs index 0d3213488..53f081a06 100644 --- a/apps/aether-gateway/src/orchestration/attempt.rs +++ b/apps/aether-gateway/src/orchestration/attempt.rs @@ -74,17 +74,44 @@ pub(crate) fn local_attempt_slot_count(transport: &GatewayProviderTransportSnaps local_attempt_slots_from_transport(transport).unwrap_or(1) } +/// For endpoint/provider table fields, `2` is the legacy admin default and is +/// treated as "not explicitly configured" so existing local-execution behaviour +/// (one attempt slot per candidate) stays unchanged. Values `0`, `1`, and `>2` +/// are treated as explicit. +const LEGACY_DEFAULT_MAX_RETRIES: u32 = 2; + +/// Upper bound on local attempt slots. This is intentionally stricter than +/// admin max_retries validation to prevent unbounded pre-materialization from +/// arbitrarily large JSON config values. +const MAX_LOCAL_ATTEMPT_SLOTS: u32 = 99; + fn local_attempt_slots_from_transport(transport: &GatewayProviderTransportSnapshot) -> Option { - transport + let rules = transport .provider .config .as_ref() .and_then(|config| config.get("failover_rules")) - .and_then(Value::as_object) + .and_then(Value::as_object); + + rules .and_then(|value| value.get("max_retries")) .and_then(Value::as_u64) .and_then(|value| u32::try_from(value).ok()) - .map(|value| value.max(1)) + .or_else(|| { + transport + .endpoint + .max_retries + .and_then(|value| u32::try_from(value).ok()) + .filter(|&value| value != LEGACY_DEFAULT_MAX_RETRIES) + }) + .or_else(|| { + transport + .provider + .max_retries + .and_then(|value| u32::try_from(value).ok()) + .filter(|&value| value != LEGACY_DEFAULT_MAX_RETRIES) + }) + .map(|value| value.clamp(1, MAX_LOCAL_ATTEMPT_SLOTS)) } #[cfg(test)] @@ -190,11 +217,172 @@ mod tests { } #[test] - fn build_local_attempt_identities_require_explicit_failover_rule_for_expansion() { + fn build_local_attempt_identities_falls_back_to_endpoint_max_retries() { let identities = build_local_attempt_identities(2, &sample_transport(Some(5), Some(3), None)); - assert_eq!(identities, vec![ExecutionAttemptIdentity::new(2, 0)]); + assert_eq!( + identities, + vec![ + ExecutionAttemptIdentity::new(2, 0), + ExecutionAttemptIdentity::new(2, 1), + ExecutionAttemptIdentity::new(2, 2), + ] + ); + } + + #[test] + fn build_local_attempt_identities_falls_back_to_provider_max_retries() { + let identities = build_local_attempt_identities(0, &sample_transport(Some(4), None, None)); + + assert_eq!( + identities, + vec![ + ExecutionAttemptIdentity::new(0, 0), + ExecutionAttemptIdentity::new(0, 1), + ExecutionAttemptIdentity::new(0, 2), + ExecutionAttemptIdentity::new(0, 3), + ] + ); + } + + #[test] + fn build_local_attempt_identities_endpoint_overrides_provider() { + let identities = + build_local_attempt_identities(7, &sample_transport(Some(10), Some(3), None)); + + assert_eq!( + identities, + vec![ + ExecutionAttemptIdentity::new(7, 0), + ExecutionAttemptIdentity::new(7, 1), + ExecutionAttemptIdentity::new(7, 2), + ] + ); + } + + #[test] + fn build_local_attempt_identities_default_two_treated_as_unset() { + let identities = + build_local_attempt_identities(5, &sample_transport(Some(2), Some(2), None)); + + assert_eq!(identities, vec![ExecutionAttemptIdentity::new(5, 0)]); + } + + #[test] + fn build_local_attempt_identities_endpoint_two_falls_back_to_provider_ten() { + let identities = + build_local_attempt_identities(1, &sample_transport(Some(10), Some(2), None)); + + assert_eq!( + identities, + vec![ + ExecutionAttemptIdentity::new(1, 0), + ExecutionAttemptIdentity::new(1, 1), + ExecutionAttemptIdentity::new(1, 2), + ExecutionAttemptIdentity::new(1, 3), + ExecutionAttemptIdentity::new(1, 4), + ExecutionAttemptIdentity::new(1, 5), + ExecutionAttemptIdentity::new(1, 6), + ExecutionAttemptIdentity::new(1, 7), + ExecutionAttemptIdentity::new(1, 8), + ExecutionAttemptIdentity::new(1, 9), + ] + ); + } + + #[test] + fn build_local_attempt_identities_failover_rules_zero_produces_one_slot() { + let identities = build_local_attempt_identities( + 1, + &sample_transport( + Some(5), + Some(4), + Some(json!({ + "failover_rules": { + "max_retries": 0 + } + })), + ), + ); + + assert_eq!(identities, vec![ExecutionAttemptIdentity::new(1, 0)]); + } + + #[test] + fn build_local_attempt_identities_endpoint_zero_produces_one_slot() { + let identities = + build_local_attempt_identities(3, &sample_transport(Some(5), Some(0), None)); + + assert_eq!(identities, vec![ExecutionAttemptIdentity::new(3, 0)]); + } + + #[test] + fn build_local_attempt_identities_provider_zero_produces_one_slot() { + let identities = build_local_attempt_identities(3, &sample_transport(Some(0), None, None)); + + assert_eq!(identities, vec![ExecutionAttemptIdentity::new(3, 0)]); + } + + #[test] + fn build_local_attempt_identities_provider_ten_creates_ten_slots() { + let identities = build_local_attempt_identities(2, &sample_transport(Some(10), None, None)); + + assert_eq!(identities.len(), 10); + assert_eq!(identities[0], ExecutionAttemptIdentity::new(2, 0)); + assert_eq!(identities[9], ExecutionAttemptIdentity::new(2, 9)); + } + + #[test] + fn build_local_attempt_identities_failover_rules_over_limit_clamped_to_max() { + let identities = build_local_attempt_identities( + 0, + &sample_transport( + Some(3), + Some(5), + Some(json!({ + "failover_rules": { + "max_retries": 1000 + } + })), + ), + ); + + assert_eq!(identities.len(), 99); + } + + #[test] + fn build_local_attempt_identities_failover_rules_u32_max_clamped_to_max() { + let identities = build_local_attempt_identities( + 0, + &sample_transport( + None, + None, + Some(json!({ + "failover_rules": { + "max_retries": u32::MAX + } + })), + ), + ); + + assert_eq!(identities.len(), 99); + } + + #[test] + fn build_local_attempt_identities_endpoint_over_limit_clamped_to_max() { + let identities = + build_local_attempt_identities(0, &sample_transport(None, Some(2000), None)); + + assert_eq!(identities.len(), 99); + } + + #[test] + fn build_local_attempt_identities_provider_over_limit_clamped_to_max() { + let identities = + build_local_attempt_identities(0, &sample_transport(Some(5000), None, None)); + + assert_eq!(identities.len(), 99); } #[test] diff --git a/crates/aether-scheduler-core/src/request_candidate.rs b/crates/aether-scheduler-core/src/request_candidate.rs index e97d9d777..1d3c6fc17 100644 --- a/crates/aether-scheduler-core/src/request_candidate.rs +++ b/crates/aether-scheduler-core/src/request_candidate.rs @@ -72,6 +72,17 @@ pub struct ReportRequestCandidateStatusRecordInput { pub now_unix_ms: u64, } +struct ReportCandidateExtraDataInput { + client_api_format: Option, + provider_api_format: Option, + upstream_url: Option, + mapped_model: Option, + key_name: Option, + header_rules: Option, + body_rules: Option, + proxy: Option, +} + pub fn execution_error_details( error: Option<&ExecutionError>, body_json: Option<&Value>, @@ -153,7 +164,7 @@ pub fn resolve_report_request_candidate_slot( proxy, } = metadata; let request_id = request_id?; - let synthesized_extra_data = build_report_candidate_extra_data( + let synthesized_extra_data = build_report_candidate_extra_data(ReportCandidateExtraDataInput { client_api_format, provider_api_format, upstream_url, @@ -162,7 +173,7 @@ pub fn resolve_report_request_candidate_slot( header_rules, body_rules, proxy, - ); + }); let created_at_unix_ms = matched_candidate .as_ref() .map(|candidate| candidate.created_at_unix_ms) @@ -316,16 +327,16 @@ pub fn build_local_request_candidate_status_record( .filter(|value| !value.is_empty())?; let metadata = parse_request_candidate_report_context(report_context)?; let candidate_index = metadata.candidate_index?; - let extra_data = build_report_candidate_extra_data( - metadata.client_api_format.clone(), - metadata.provider_api_format.clone(), - metadata.upstream_url.clone(), - metadata.mapped_model.clone(), - metadata.key_name.clone(), - metadata.header_rules.clone(), - metadata.body_rules.clone(), - metadata.proxy.clone(), - ); + let extra_data = build_report_candidate_extra_data(ReportCandidateExtraDataInput { + client_api_format: metadata.client_api_format.clone(), + provider_api_format: metadata.provider_api_format.clone(), + upstream_url: metadata.upstream_url.clone(), + mapped_model: metadata.mapped_model.clone(), + key_name: metadata.key_name.clone(), + header_rules: metadata.header_rules.clone(), + body_rules: metadata.body_rules.clone(), + proxy: metadata.proxy.clone(), + }); let created_at_unix_ms = started_at_unix_ms.or(finished_at_unix_ms); Some(UpsertRequestCandidateRecord { @@ -526,17 +537,18 @@ fn next_candidate_index(candidates: &[StoredRequestCandidate]) -> u32 { .unwrap_or_default() } -fn build_report_candidate_extra_data( - client_api_format: Option, - provider_api_format: Option, - upstream_url: Option, - mapped_model: Option, - key_name: Option, - header_rules: Option, - body_rules: Option, - proxy: Option, -) -> Option { - let mut extra_data = Map::with_capacity(8); +fn build_report_candidate_extra_data(input: ReportCandidateExtraDataInput) -> Option { + let ReportCandidateExtraDataInput { + client_api_format, + provider_api_format, + upstream_url, + mapped_model, + key_name, + header_rules, + body_rules, + proxy, + } = input; + let mut extra_data = Map::with_capacity(10); extra_data.insert("gateway_execution_runtime".to_string(), Value::Bool(true)); extra_data.insert("phase".to_string(), Value::String("3c_trial".to_string())); if let Some(client_api_format) = client_api_format {