diff --git a/src/object_store/stats.rs b/src/object_store/stats.rs index 4e6f3e2..98838f1 100644 --- a/src/object_store/stats.rs +++ b/src/object_store/stats.rs @@ -12,6 +12,11 @@ const ALPHA: f64 = 0.015; const LATENCY_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(1); const CONSECUTIVE_FAILURE_THRESHOLD: u32 = 5; const RECOVERY_TIME: Duration = Duration::from_secs(30); +const POSITION_PENALTY: u64 = 2_000; +const UNKNOWN_BUCKET_PENALTY: u64 = 5000; +const ERROR_RATE_SCORE_MULTIPLIER: f64 = 100_000.0; +const ERROR_RATE_MAX: f64 = 1.0; +const CIRCUIT_OPEN_SCORE_PENALTY: u64 = 1_000_000; #[derive(Debug, Clone)] pub struct BucketMetrics { @@ -120,7 +125,7 @@ impl BucketedStats { let entry = self.by_bucket.entry(bucket).or_default(); let mut stats = entry.lock(); - stats.error_rate = stats.error_rate(now) * (1.0 - ALPHA); + stats.error_rate = stats.error_rate(now); if let Ok(lat) = outcome { // Success: update error rate and reset failures stats.consecutive_failures = 0; @@ -131,7 +136,7 @@ impl BucketedStats { // Failure: update error rate and increment failures stats.consecutive_failures += 1; stats.last_failure_time = now; - stats.error_rate += ALPHA; + stats.error_rate = (stats.error_rate + ALPHA).min(ERROR_RATE_MAX); } stats.last_update = now; } @@ -143,14 +148,14 @@ impl BucketedStats { let now = Instant::now(); buckets .enumerate() - .sorted_by_cached_key(|(i, bucket)| self.score(now, bucket, *i)) + .sorted_by_cached_key(|(i, bucket)| (self.score(now, bucket, *i), *i)) .map(|(i, _bucket)| i) } /// Calculate a score for bucket selection. Lower scores are preferred. /// /// The scoring formula balances three factors: - /// 1. **Position penalty** (idx * 200): Respects client's bucket ordering preference + /// 1. **Position penalty** (idx * 2000): Strongly respects client bucket ordering preference /// 2. **Latency penalty** (µs / 100): Based on observed performance /// 3. 
**Error penalty**: Based on error rate or circuit breaker state /// @@ -159,32 +164,34 @@ impl BucketedStats { /// - Client ordering is preserved even with typical S3 latencies (~200ms same-region) /// - The system still explores unknown buckets when known ones fail /// - /// When a circuit breaker is open (10000 points penalty), the bucket is strongly - /// deprioritized to ensure failed buckets are avoided in favor of any healthy bucket. + /// Error penalties are intentionally weighted orders of magnitude above latency so + /// healthy buckets are preferred over slightly faster but failing ones. fn score(&self, now: Instant, bucket: &BucketName, idx: usize) -> u64 { - let base = (idx as u64) * 200; - self.by_bucket.get(bucket).map_or(base + 5000, |s| { - let mut guard = s.lock(); - - // Calculate latency component: 1 point per 100 µs = 0.1 ms - // - S3 Express same-AZ: ~4ms → 40 points - // - S3 Express cross-AZ: ~8ms → 80 points - // - Standard S3 same-region: ~200ms → 2000 points - // - Standard S3 cross-region: 300-1000ms → 3000-10000 points - let lat = guard - .latency_micros_snapshot(now, self.hedge_latency_quantile) - .mean - / 100; - - // Calculate error component based on circuit breaker state - let err = if guard.is_circuit_open(now) { - 10000 - } else { - (guard.error_rate(now) * 100.0).round() as u64 - }; - - base + err + lat - }) + let base = (idx as u64) * POSITION_PENALTY; + self.by_bucket + .get(bucket) + .map_or(base + UNKNOWN_BUCKET_PENALTY, |s| { + let mut guard = s.lock(); + + // Calculate latency component: 1 point per 100 µs = 0.1 ms + // - S3 Express same-AZ: ~4ms → 40 points + // - S3 Express cross-AZ: ~8ms → 80 points + // - Standard S3 same-region: ~200ms → 2000 points + // - Standard S3 cross-region: 300-1000ms → 3000-10000 points + let lat = guard + .latency_micros_snapshot(now, self.hedge_latency_quantile) + .mean + / 100; + + // Calculate error component based on circuit breaker state + let err = if guard.is_circuit_open(now) { + 
CIRCUIT_OPEN_SCORE_PENALTY + } else { + (guard.error_rate(now) * ERROR_RATE_SCORE_MULTIPLIER).round() as u64 + }; + + base + err + lat + }) } /// Returns `Duration::ZERO` if hedging is disabled or no latency datapoints are available. @@ -240,9 +247,15 @@ mod tests { let now = Instant::now(); // Test base scoring without any data (includes 5000 point penalty for unknown) - assert_eq!(stats.score(now, &bucket1, 0), 5000); - assert_eq!(stats.score(now, &bucket2, 1), 5200); - assert_eq!(stats.score(now, &bucket3, 2), 5400); + assert_eq!(stats.score(now, &bucket1, 0), UNKNOWN_BUCKET_PENALTY); + assert_eq!( + stats.score(now, &bucket2, 1), + UNKNOWN_BUCKET_PENALTY + POSITION_PENALTY + ); + assert_eq!( + stats.score(now, &bucket3, 2), + UNKNOWN_BUCKET_PENALTY + 2 * POSITION_PENALTY + ); } #[test] @@ -276,9 +289,9 @@ mod tests { let now = Instant::now(); let score = stats.score(now, &bucket, 0); - // With 3 errors and ALPHA=0.015: error_rate ≈ 0.044 - // Score should be 0 (base) + ~4 (0.044 * 100) + 0 (no latency) - assert!(score <= 10, "Score was {score}"); + // With 3 errors and ALPHA=0.015: error_rate = 0.045 (minus tiny time decay) + // Score should be 0 (base) + ~4500 (0.045 * 100_000) + 0 (no latency) + assert!((4300..=4500).contains(&score), "Score was {score}"); } #[test] @@ -294,9 +307,9 @@ mod tests { let now = Instant::now(); let score = stats.score(now, &bucket, 0); - // Score should be 0 (base) + 10000 (circuit open) + 0 (no latency) + // Score should be 0 (base) + 1_000_000 (circuit open) + 0 (no latency) assert_eq!( - score, 10000, + score, CIRCUIT_OPEN_SCORE_PENALTY, "Circuit breaker didn't open, score was {score}", ); } @@ -313,20 +326,19 @@ mod tests { stats.observe(bucket.clone(), Err(())); stats.observe(bucket.clone(), Err(())); - // Check initial error penalty (error_rate ≈ 0.044 -> ~4 points) + // Check initial error penalty (error_rate ≈ 0.045 -> ~4500 points) let now = Instant::now(); let initial_score = stats.score(now, &bucket, 0); - 
assert!(initial_score <= 10, "Initial score was {initial_score}"); + assert!(initial_score >= 4300, "Initial score was {initial_score}"); - // Advance time by 10 seconds to allow decay + // Advance time by 46 seconds to allow decay // After ~46s (one half-life), error should decay by 50% tokio::time::advance(Duration::from_secs(46)).await; let now = Instant::now(); let decayed_score = stats.score(now, &bucket, 0); - // After one half-life (~46s), error rate should be ~50% of original - // Initial score ~4, decayed should be ~2 + // After one half-life (~46s), error rate should be ~50% of original. assert!( - decayed_score <= initial_score / 2 + 1, + decayed_score <= initial_score / 2 + 100, "Error rate didn't decay by half: {initial_score} -> {decayed_score}", ); @@ -337,11 +349,53 @@ mod tests { let now = Instant::now(); let final_score = stats.score(now, &bucket, 0); assert!( - final_score <= 6, + final_score < initial_score / 3, "Score didn't decay sufficiently: {final_score}" ); } + #[tokio::test] + async fn test_success_without_elapsed_time_preserves_error_penalty() { + tokio::time::pause(); + + let stats = make_test_stats(); + let bucket = BucketName::new("no-elapsed-success").unwrap(); + + for _ in 0..3 { + stats.observe(bucket.clone(), Err(())); + } + + let score_before = stats.score(Instant::now(), &bucket, 0); + stats.observe(bucket.clone(), Ok(Duration::ZERO)); + let score_after = stats.score(Instant::now(), &bucket, 0); + + assert_eq!( + score_after, score_before, + "Error penalty should not decay without elapsed time" + ); + } + + #[tokio::test] + async fn test_error_rate_is_capped_after_repeated_failures() { + tokio::time::pause(); + + let stats = make_test_stats(); + let bucket = BucketName::new("error-rate-cap").unwrap(); + + for _ in 0..200 { + stats.observe(bucket.clone(), Err(())); + } + + // Let circuit breaker recovery window expire so scoring uses error_rate again. 
+ tokio::time::advance(RECOVERY_TIME + Duration::from_secs(1)).await; + let score = stats.score(Instant::now(), &bucket, 0); + + assert!( + score <= ERROR_RATE_SCORE_MULTIPLIER as u64, + "Score exceeded max error-rate penalty: {score}" + ); + } + #[tokio::test] async fn test_latency_snapshot_caching() { tokio::time::pause(); @@ -391,25 +445,28 @@ mod tests { // Verify it's open let now = Instant::now(); - assert_eq!(stats.score(now, &bucket, 0), 10000); + assert_eq!(stats.score(now, &bucket, 0), CIRCUIT_OPEN_SCORE_PENALTY); // Still open before recovery time tokio::time::advance(RECOVERY_TIME.checked_sub(Duration::from_secs(1)).unwrap()).await; let now = Instant::now(); - assert_eq!(stats.score(now, &bucket, 0), 10000); + assert_eq!(stats.score(now, &bucket, 0), CIRCUIT_OPEN_SCORE_PENALTY); // After recovery time, should be closed (ready to try again) tokio::time::advance(Duration::from_secs(2)).await; let now = Instant::now(); let score = stats.score(now, &bucket, 0); - assert!(score < 100, "Circuit should be closed after recovery time"); + assert!( + score < CIRCUIT_OPEN_SCORE_PENALTY, + "Circuit should be closed after recovery time" + ); // If it fails again, back to open for _ in 0..5 { stats.observe(bucket.clone(), Err(())); } let now = Instant::now(); - assert_eq!(stats.score(now, &bucket, 0), 10000); + assert_eq!(stats.score(now, &bucket, 0), CIRCUIT_OPEN_SCORE_PENALTY); } #[test] @@ -428,9 +485,9 @@ mod tests { let score = stats.score(now, &bucket, 0); // Mixed traffic (3 success, 2 failures) gives moderate error rate - // With latency ~10ms = 100 points, error component should be small + // Error component dominates latency once failures are observed. 
assert!( - (100..=200).contains(&score), + (2900..=3200).contains(&score), "Mixed traffic score was {score}", ); } @@ -462,10 +519,31 @@ mod tests { let buckets = vec![bucket1.clone(), bucket2.clone(), bucket3.clone()]; let ordered = get_attempt_order(&stats, &buckets); - // Should be ordered: fast, error (has errors but still better than slow), slow + // Reliability now dominates latency for unhealthy buckets. assert_eq!(ordered[0], bucket1); - assert_eq!(ordered[1], bucket3); - assert_eq!(ordered[2], bucket2); + assert_eq!(ordered[1], bucket2); + assert_eq!(ordered[2], bucket3); + } + + #[test] + fn test_tie_break_prefers_client_order() { + let stats = make_test_stats(); + let first_bucket = BucketName::new("first-bucket").unwrap(); + let second_bucket = BucketName::new("second-bucket").unwrap(); + + for _ in 0..10 { + stats.observe(first_bucket.clone(), Ok(Duration::from_millis(250))); + stats.observe(second_bucket.clone(), Ok(Duration::from_millis(50))); + } + + // Tie by score: + // - first: base(0) + lat(2500) = 2500 + // - second: base(2000) + lat(500) = 2500 + // Client order should win ties. + let buckets = vec![first_bucket.clone(), second_bucket.clone()]; + let ordered = get_attempt_order(&stats, &buckets); + assert_eq!(ordered[0], first_bucket); + assert_eq!(ordered[1], second_bucket); } #[test] @@ -523,10 +601,10 @@ mod tests { let buckets = vec![bucket1.clone(), bucket2.clone(), bucket3.clone()]; let ordered = get_attempt_order(&stats, &buckets); - // Primary should still be first (low latency, few errors) - assert_eq!(ordered[0], bucket1); - // Secondary should be second (higher latency but no errors) - assert_eq!(ordered[1], bucket2); + // Secondary should win because it stayed healthy. + assert_eq!(ordered[0], bucket2); + // Primary remains ahead of tertiary because tertiary's circuit opens. 
+ assert_eq!(ordered[1], bucket1); // Tertiary should be last (circuit breaker triggered) assert_eq!(ordered[2], bucket3); } @@ -574,9 +652,9 @@ mod tests { "Local score was {local_score}", ); - // Remote bucket: base(200) + default_penalty(5000) = 5200 + // Remote bucket: base(POSITION_PENALTY) + default_penalty(UNKNOWN_BUCKET_PENALTY) let remote_score = stats.score(now, &remote_bucket, 1); - assert_eq!(remote_score, 5200); + assert_eq!(remote_score, UNKNOWN_BUCKET_PENALTY + POSITION_PENALTY); // Verify ordering - local should come first let buckets = vec![local_bucket.clone(), remote_bucket.clone()]; @@ -660,17 +738,17 @@ mod tests { assert_eq!(ordered[2], tertiary, "Unknown tertiary should stay third"); // Even with some errors on primary, it should still be preferred if error rate is low - stats.observe(primary.clone(), Err(())); // 1 error out of 6 attempts = ~16% error rate + stats.observe(primary.clone(), Err(())); // One failure adds ALPHA (0.015) to error_rate let buckets = vec![primary.clone(), secondary, tertiary]; let ordered = get_attempt_order(&stats, &buckets); - // Primary: base(0) + err(16) + lat(2000) = 2016 - // Secondary: base(200) + err(0) + lat(3950) = 4150 - // Tertiary: base(400) + unknown(5000) = 5400 + // Primary: base(0) + err(1500) + lat(2000) = 3500 + // Secondary: base(2000) + err(0) + lat(3950) = 5950 + // Tertiary: base(4000) + unknown(5000) = 9000 assert_eq!( ordered[0], primary, - "Primary with 16% errors still better than 400ms secondary" + "Primary with a single error should still beat the much slower secondary" ); } @@ -705,13 +783,35 @@ mod tests { // because its circuit is open // Scores: // - primary: base(0) + lat(2000) = 2000 - // - secondary: base(200) + err(10000) + lat(4000) = 14200 - // - tertiary: base(400) + unknown(5000) = 5400 + // - secondary: base(2000) + err(1_000_000) + lat(4000) = 1_006_000 + // - tertiary: base(4000) + unknown(5000) = 9000 assert_eq!(ordered[0], primary, "Healthy primary should be first"); 
assert_eq!(ordered[1], tertiary, "Unknown tertiary should be second"); assert_eq!(ordered[2], secondary, "Failed secondary should be last"); } + #[test] + fn test_failing_low_latency_bucket_is_deprioritized() { + let stats = make_test_stats(); + let low_latency_failing = BucketName::new("low-latency-failing").unwrap(); + let high_latency_healthy = BucketName::new("high-latency-healthy").unwrap(); + + stats.observe(low_latency_failing.clone(), Ok(Duration::from_millis(5))); + for _ in 0..5 { + stats.observe(low_latency_failing.clone(), Err(())); + } + + for _ in 0..5 { + stats.observe(high_latency_healthy.clone(), Ok(Duration::from_millis(50))); + } + + let buckets = vec![low_latency_failing.clone(), high_latency_healthy.clone()]; + let ordered = get_attempt_order(&stats, &buckets); + + assert_eq!(ordered[0], high_latency_healthy); + assert_eq!(ordered[1], low_latency_failing); + } + #[test] fn test_s3_express_buckets() { let stats = make_test_stats(); @@ -748,8 +848,8 @@ mod tests { // With small latency differences, position penalties dominate // Scores: // - nearby_az: base(0) + lat(70) = 70 - // - far_az: base(200) + lat(120) = 320 - // - same_az: base(400) + lat(40) = 440 + // - far_az: base(2000) + lat(120) = 2120 + // - same_az: base(4000) + lat(40) = 4040 assert_eq!( ordered[0], nearby_az, "Client preference preserved despite 3ms penalty" @@ -764,8 +864,8 @@ mod tests { // Even 8ms difference (12ms vs 4ms) isn't enough to override position 0 vs 1 // Scores: // - far_az: base(0) + lat(120) = 120 - // - same_az: base(200) + lat(40) = 240 - // - nearby_az: base(400) + lat(70) = 470 + // - same_az: base(2000) + lat(40) = 2040 + // - nearby_az: base(4000) + lat(70) = 4070 assert_eq!( ordered[0], far_az, "8ms latency difference doesn't override position preference" @@ -794,64 +894,64 @@ mod tests { let same_az = BucketName::new("s3express-use1-az1-same").unwrap(); let cross_az = BucketName::new("s3express-use1-az2-cross").unwrap(); - // Test the threshold where 
latency difference overcomes position penalty - // Position penalty is 200 points, so we need >20ms difference to override + // Test the threshold where latency difference overcomes position penalty. + // With 0.1ms per score point, POSITION_PENALTY=2000 requires >200ms to override. // same_az: consistent 4ms for _ in 0..10 { stats.observe(same_az.clone(), Ok(Duration::from_millis(4))); } - // cross_az: 25ms (21ms higher - just enough to override position penalty) + // cross_az: 250ms (246ms higher - enough to override position penalty) for _ in 0..10 { - stats.observe(cross_az.clone(), Ok(Duration::from_millis(25))); + stats.observe(cross_az.clone(), Ok(Duration::from_millis(250))); } // Test with cross_az in position 0 let buckets = vec![cross_az.clone(), same_az.clone()]; let ordered = get_attempt_order(&stats, &buckets); - // With 21ms difference (210 points), same_az at position 1 should win + // With 246ms difference (2460 points), same_az at position 1 should win. // Scores: - // - cross_az: base(0) + lat(250) = 250 - // - same_az: base(200) + lat(40) = 240 + // - cross_az: base(0) + lat(2500) = 2500 + // - same_az: base(2000) + lat(40) = 2040 assert_eq!( ordered[0], same_az, - "21ms latency difference overrides position preference" + "Large latency difference should override position preference" ); assert_eq!(ordered[1], cross_az); - // Now test with only 19ms difference (not enough to override) + // Now test with only 186ms difference (not enough to override) let nearby_az = BucketName::new("s3express-use1-az3-nearby").unwrap(); for _ in 0..10 { - stats.observe(nearby_az.clone(), Ok(Duration::from_millis(23))); + stats.observe(nearby_az.clone(), Ok(Duration::from_millis(190))); } let buckets = vec![nearby_az.clone(), same_az.clone()]; let ordered = get_attempt_order(&stats, &buckets); - // With 19ms difference (190 points), position penalty wins + // With 186ms difference (1860 points), position penalty wins. 
// Scores: - // - nearby_az: base(0) + lat(230) = 230 - // - same_az: base(200) + lat(40) = 240 + // - nearby_az: base(0) + lat(1900) = 1900 + // - same_az: base(2000) + lat(40) = 2040 assert_eq!( ordered[0], nearby_az, - "19ms difference isn't enough to override position" + "Sub-threshold latency difference should not override position" ); assert_eq!(ordered[1], same_az); // Show the exact threshold let now = Instant::now(); println!( - "Reordering threshold: >20ms latency difference needed to override position penalty of 200 points" + "Reordering threshold: >200ms latency difference needed to override position penalty of 2000 points" ); println!( - "25ms bucket at pos 0: {} points, 4ms bucket at pos 1: {} points (reorders)", + "250ms bucket at pos 0: {} points, 4ms bucket at pos 1: {} points (reorders)", stats.score(now, &cross_az, 0), stats.score(now, &same_az, 1) ); println!( - "23ms bucket at pos 0: {} points, 4ms bucket at pos 1: {} points (no reorder)", + "190ms bucket at pos 0: {} points, 4ms bucket at pos 1: {} points (no reorder)", stats.score(now, &nearby_az, 0), stats.score(now, &same_az, 1) );