Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 193 additions & 5 deletions apps/aether-gateway/src/orchestration/attempt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,44 @@ pub(crate) fn local_attempt_slot_count(transport: &GatewayProviderTransportSnaps
local_attempt_slots_from_transport(transport).unwrap_or(1)
}

/// Sentinel for the endpoint/provider table `max_retries` fields: `2` is the
/// legacy admin default and is treated as "not explicitly configured", so
/// existing local-execution behaviour (one attempt slot per candidate) stays
/// unchanged. Values `0`, `1`, and `>2` are treated as explicit.
///
/// The sentinel only filters the endpoint and provider table fields; a
/// `failover_rules.max_retries` of `2` in the provider JSON config is not
/// filtered by it.
const LEGACY_DEFAULT_MAX_RETRIES: u32 = 2;

/// Upper bound on local attempt slots. This is intentionally stricter than
/// admin max_retries validation to prevent unbounded pre-materialization from
/// arbitrarily large JSON config values. Used as the upper end of the final
/// `clamp(1, MAX_LOCAL_ATTEMPT_SLOTS)` applied to the resolved slot count.
const MAX_LOCAL_ATTEMPT_SLOTS: u32 = 99;

/// Resolves how many local attempt slots a candidate should get, or `None`
/// when no source supplies an explicit value.
///
/// Sources, highest priority first:
/// 1. `failover_rules.max_retries` in the provider's JSON config (any
///    unsigned integer counts as explicit);
/// 2. the endpoint's `max_retries`;
/// 3. the provider's `max_retries`.
///
/// For (2) and (3), a value equal to `LEGACY_DEFAULT_MAX_RETRIES`, or one that
/// does not convert to `u32`, is skipped and the next source is consulted.
/// Whatever value is chosen is clamped to `1..=MAX_LOCAL_ATTEMPT_SLOTS`, so
/// `0` yields a single slot and oversized values yield the cap.
fn local_attempt_slots_from_transport(transport: &GatewayProviderTransportSnapshot) -> Option<u32> {
    let rules = transport
        .provider
        .config
        .as_ref()
        .and_then(|config| config.get("failover_rules"))
        .and_then(Value::as_object);

    rules
        .and_then(|value| value.get("max_retries"))
        .and_then(Value::as_u64)
        // Saturate rather than discard: a JSON value above `u32::MAX` is still
        // an explicit (if absurd) setting and must hit the cap below instead
        // of silently falling through to the endpoint/provider fields.
        .map(|value| u32::try_from(value).unwrap_or(u32::MAX))
        .or_else(|| {
            transport
                .endpoint
                .max_retries
                .and_then(|value| u32::try_from(value).ok())
                .filter(|&value| value != LEGACY_DEFAULT_MAX_RETRIES)
        })
        .or_else(|| {
            transport
                .provider
                .max_retries
                .and_then(|value| u32::try_from(value).ok())
                .filter(|&value| value != LEGACY_DEFAULT_MAX_RETRIES)
        })
        .map(|value| value.clamp(1, MAX_LOCAL_ATTEMPT_SLOTS))
}

#[cfg(test)]
Expand Down Expand Up @@ -190,11 +217,172 @@ mod tests {
}

#[test]
fn build_local_attempt_identities_falls_back_to_endpoint_max_retries() {
    // No failover_rules in the provider config, so the endpoint value (3)
    // takes precedence over the provider value (5).
    let identities =
        build_local_attempt_identities(2, &sample_transport(Some(5), Some(3), None));

    assert_eq!(
        identities,
        vec![
            ExecutionAttemptIdentity::new(2, 0),
            ExecutionAttemptIdentity::new(2, 1),
            ExecutionAttemptIdentity::new(2, 2),
        ]
    );
}

#[test]
fn build_local_attempt_identities_falls_back_to_provider_max_retries() {
    // Only the provider-level value (4) is set; expect one identity per slot.
    let transport = sample_transport(Some(4), None, None);
    let expected: Vec<_> = (0..4)
        .map(|attempt| ExecutionAttemptIdentity::new(0, attempt))
        .collect();

    let identities = build_local_attempt_identities(0, &transport);

    assert_eq!(identities, expected);
}

#[test]
fn build_local_attempt_identities_endpoint_overrides_provider() {
    // Provider says 10, endpoint says 3: the endpoint value wins.
    let transport = sample_transport(Some(10), Some(3), None);
    let expected: Vec<_> = (0..3)
        .map(|attempt| ExecutionAttemptIdentity::new(7, attempt))
        .collect();

    let identities = build_local_attempt_identities(7, &transport);

    assert_eq!(identities, expected);
}

#[test]
fn build_local_attempt_identities_default_two_treated_as_unset() {
    // Both table fields carry the legacy default (2), so a single slot is expected.
    let transport = sample_transport(Some(2), Some(2), None);
    let expected = vec![ExecutionAttemptIdentity::new(5, 0)];

    let identities = build_local_attempt_identities(5, &transport);

    assert_eq!(identities, expected);
}

#[test]
fn build_local_attempt_identities_endpoint_two_falls_back_to_provider_ten() {
    // Endpoint holds the legacy default (2) and is skipped; provider's 10 applies.
    let transport = sample_transport(Some(10), Some(2), None);
    let expected: Vec<_> = (0..10)
        .map(|attempt| ExecutionAttemptIdentity::new(1, attempt))
        .collect();

    let identities = build_local_attempt_identities(1, &transport);

    assert_eq!(identities, expected);
}

#[test]
fn build_local_attempt_identities_failover_rules_zero_produces_one_slot() {
    // An explicit failover_rules.max_retries of 0 still yields one slot and
    // takes priority over the endpoint (4) and provider (5) values.
    let config = json!({
        "failover_rules": {
            "max_retries": 0
        }
    });
    let transport = sample_transport(Some(5), Some(4), Some(config));
    let expected = vec![ExecutionAttemptIdentity::new(1, 0)];

    let identities = build_local_attempt_identities(1, &transport);

    assert_eq!(identities, expected);
}

#[test]
fn build_local_attempt_identities_endpoint_zero_produces_one_slot() {
    // Endpoint 0 is explicit: it overrides the provider's 5 and yields one slot.
    let transport = sample_transport(Some(5), Some(0), None);
    let expected = vec![ExecutionAttemptIdentity::new(3, 0)];

    let identities = build_local_attempt_identities(3, &transport);

    assert_eq!(identities, expected);
}

#[test]
fn build_local_attempt_identities_provider_zero_produces_one_slot() {
    // Provider 0 with nothing else configured yields one slot.
    let transport = sample_transport(Some(0), None, None);
    let expected = vec![ExecutionAttemptIdentity::new(3, 0)];

    let identities = build_local_attempt_identities(3, &transport);

    assert_eq!(identities, expected);
}

#[test]
fn build_local_attempt_identities_provider_ten_creates_ten_slots() {
    // Provider value 10 with no endpoint override or provider config.
    let transport = sample_transport(Some(10), None, None);
    let first = ExecutionAttemptIdentity::new(2, 0);
    let last = ExecutionAttemptIdentity::new(2, 9);

    let identities = build_local_attempt_identities(2, &transport);

    // Check the count first so the index accesses below cannot go out of bounds.
    assert_eq!(identities.len(), 10);
    assert_eq!(identities[0], first);
    assert_eq!(identities[9], last);
}

#[test]
fn build_local_attempt_identities_failover_rules_over_limit_clamped_to_max() {
    // failover_rules.max_retries = 1000 exceeds the cap of 99 slots.
    let config = json!({
        "failover_rules": {
            "max_retries": 1000
        }
    });
    let transport = sample_transport(Some(3), Some(5), Some(config));

    let identities = build_local_attempt_identities(0, &transport);

    assert_eq!(identities.len(), 99);
}

#[test]
fn build_local_attempt_identities_failover_rules_u32_max_clamped_to_max() {
    // The largest value representable as u32 must still be capped at 99 slots.
    let config = json!({
        "failover_rules": {
            "max_retries": u32::MAX
        }
    });
    let transport = sample_transport(None, None, Some(config));

    let identities = build_local_attempt_identities(0, &transport);

    assert_eq!(identities.len(), 99);
}

#[test]
fn build_local_attempt_identities_endpoint_over_limit_clamped_to_max() {
    // Endpoint value 2000 exceeds the cap of 99 slots.
    let transport = sample_transport(None, Some(2000), None);

    let identities = build_local_attempt_identities(0, &transport);

    assert_eq!(identities.len(), 99);
}

#[test]
fn build_local_attempt_identities_provider_over_limit_clamped_to_max() {
    // Provider value 5000 exceeds the cap of 99 slots.
    let transport = sample_transport(Some(5000), None, None);

    let identities = build_local_attempt_identities(0, &transport);

    assert_eq!(identities.len(), 99);
}

#[test]
Expand Down
58 changes: 35 additions & 23 deletions crates/aether-scheduler-core/src/request_candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ pub struct ReportRequestCandidateStatusRecordInput {
pub now_unix_ms: u64,
}

/// Argument bundle for `build_report_candidate_extra_data`, replacing a long
/// positional parameter list with named fields.
///
/// Every field is optional.
// NOTE(review): presumably only the fields that are `Some` end up in the
// generated extra data — confirm against the full function body.
struct ReportCandidateExtraDataInput {
    client_api_format: Option<String>,
    provider_api_format: Option<String>,
    upstream_url: Option<String>,
    mapped_model: Option<String>,
    key_name: Option<String>,
    header_rules: Option<Value>,
    body_rules: Option<Value>,
    proxy: Option<Value>,
}

pub fn execution_error_details(
error: Option<&ExecutionError>,
body_json: Option<&Value>,
Expand Down Expand Up @@ -153,7 +164,7 @@ pub fn resolve_report_request_candidate_slot(
proxy,
} = metadata;
let request_id = request_id?;
let synthesized_extra_data = build_report_candidate_extra_data(
let synthesized_extra_data = build_report_candidate_extra_data(ReportCandidateExtraDataInput {
client_api_format,
provider_api_format,
upstream_url,
Expand All @@ -162,7 +173,7 @@ pub fn resolve_report_request_candidate_slot(
header_rules,
body_rules,
proxy,
);
});
let created_at_unix_ms = matched_candidate
.as_ref()
.map(|candidate| candidate.created_at_unix_ms)
Expand Down Expand Up @@ -316,16 +327,16 @@ pub fn build_local_request_candidate_status_record(
.filter(|value| !value.is_empty())?;
let metadata = parse_request_candidate_report_context(report_context)?;
let candidate_index = metadata.candidate_index?;
let extra_data = build_report_candidate_extra_data(
metadata.client_api_format.clone(),
metadata.provider_api_format.clone(),
metadata.upstream_url.clone(),
metadata.mapped_model.clone(),
metadata.key_name.clone(),
metadata.header_rules.clone(),
metadata.body_rules.clone(),
metadata.proxy.clone(),
);
let extra_data = build_report_candidate_extra_data(ReportCandidateExtraDataInput {
client_api_format: metadata.client_api_format.clone(),
provider_api_format: metadata.provider_api_format.clone(),
upstream_url: metadata.upstream_url.clone(),
mapped_model: metadata.mapped_model.clone(),
key_name: metadata.key_name.clone(),
header_rules: metadata.header_rules.clone(),
body_rules: metadata.body_rules.clone(),
proxy: metadata.proxy.clone(),
});
let created_at_unix_ms = started_at_unix_ms.or(finished_at_unix_ms);

Some(UpsertRequestCandidateRecord {
Expand Down Expand Up @@ -526,17 +537,18 @@ fn next_candidate_index(candidates: &[StoredRequestCandidate]) -> u32 {
.unwrap_or_default()
}

fn build_report_candidate_extra_data(
client_api_format: Option<String>,
provider_api_format: Option<String>,
upstream_url: Option<String>,
mapped_model: Option<String>,
key_name: Option<String>,
header_rules: Option<Value>,
body_rules: Option<Value>,
proxy: Option<Value>,
) -> Option<Value> {
let mut extra_data = Map::with_capacity(8);
fn build_report_candidate_extra_data(input: ReportCandidateExtraDataInput) -> Option<Value> {
let ReportCandidateExtraDataInput {
client_api_format,
provider_api_format,
upstream_url,
mapped_model,
key_name,
header_rules,
body_rules,
proxy,
} = input;
let mut extra_data = Map::with_capacity(10);
extra_data.insert("gateway_execution_runtime".to_string(), Value::Bool(true));
extra_data.insert("phase".to_string(), Value::String("3c_trial".to_string()));
if let Some(client_api_format) = client_api_format {
Expand Down
Loading