From a7b91dab7e6753cef69ad8ef7dcee57aabc3517c Mon Sep 17 00:00:00 2001 From: shikhar Date: Fri, 20 Feb 2026 08:52:59 -0500 Subject: [PATCH 01/11] fix(service): classify coalesced cache misses via foyer source --- src/service/mod.rs | 60 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 50 insertions(+), 10 deletions(-) diff --git a/src/service/mod.rs b/src/service/mod.rs index 074be11..4f3d85a 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -2,13 +2,14 @@ use std::{ net::SocketAddr, num::NonZeroU32, ops::{Range, RangeInclusive}, - sync::{Arc, atomic::AtomicBool}, + sync::Arc, time::Duration, }; use bytes::Bytes; use crossbeam::atomic::AtomicCell; use eyre::Result; +use foyer::Source; use futures::{Stream, StreamExt}; use parking_lot::Mutex; @@ -238,6 +239,16 @@ fn now() -> u32 { .as_secs() as u32 } +fn cache_hit_for_source(source: Source, value: &mut CacheValue) -> bool { + match source { + Source::Outer => { + value.cached_at = 0; + false + } + Source::Memory | Source::Disk => true, + } +} + #[derive(Debug, Clone)] struct PageGetExecutor { cache: foyer::HybridCache, @@ -258,13 +269,10 @@ impl PageGetExecutor { object: self.object.clone(), page_id, }; - let hit_state = Arc::new(AtomicBool::new(true)); match self .cache .get_or_fetch(&cache_key, { - let hit_state = hit_state.clone(); move || async move { - hit_state.store(false, std::sync::atomic::Ordering::Relaxed); metrics::page_request_count(&self.kind, "download"); let start = u64::from(page_id) * PAGE_SIZE; @@ -316,13 +324,8 @@ impl PageGetExecutor { } Ok(Some(_)) | Err(None) => unreachable!("CAS"), } - // It gets initialized as now() for insertion via fetch() in case of a miss, - // but we need this to signal whether it was a hit or miss. - // 0 => miss, non-zero => timestamp. - if hit_state.load(std::sync::atomic::Ordering::Relaxed) { + if cache_hit_for_source(entry.source(), &mut value) { metrics::page_request_count(&key.kind, "cache_hit"); - } else { - value.cached_at = 0; } Ok((page_id, value)) } @@ -390,4 +393,41 @@ mod tests { other => panic!("unexpected error: {other:?}"), } } + + #[test] + fn cache_hit_for_source_marks_outer_as_miss() { + let mut value = CacheValue { + bucket: BucketName::new("test-bucket").expect("bucket name"), + mtime: 0, + data: Bytes::from_static(b"hello"), + object_size: 5, + cached_at: 123, + }; + + assert!(!cache_hit_for_source(Source::Outer, &mut value)); + assert_eq!(value.cached_at, 0); + } + + #[test] + fn cache_hit_for_source_keeps_cached_at_for_hits() { + let mut memory_value = CacheValue { + bucket: BucketName::new("test-bucket").expect("bucket name"), + mtime: 0, + data: Bytes::from_static(b"hello"), + object_size: 5, + cached_at: 123, + }; + assert!(cache_hit_for_source(Source::Memory, &mut memory_value)); + assert_eq!(memory_value.cached_at, 123); + + let mut disk_value = CacheValue { + bucket: BucketName::new("test-bucket").expect("bucket name"), + mtime: 0, + data: Bytes::from_static(b"hello"), + object_size: 5, + cached_at: 456, + }; + assert!(cache_hit_for_source(Source::Disk, &mut disk_value)); + assert_eq!(disk_value.cached_at, 456); + } } From a58016f141a6040c3933bdbb2447d0ca2f9401e7 Mon Sep 17 00:00:00 2001 From: shikhar Date: Fri, 20 Feb 2026 10:04:08 -0500 Subject: [PATCH 02/11] feat(metrics): count coalesced page requests --- src/service/mod.rs | 66 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 55 insertions(+), 11 deletions(-) diff --git a/src/service/mod.rs b/src/service/mod.rs index 4f3d85a..c9ab45f 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -2,7 +2,10 @@ use std::{ net::SocketAddr, num::NonZeroU32, ops::{Range, RangeInclusive}, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, time::Duration, }; @@ -239,13 +242,21 @@ fn now() -> u32 { .as_secs() as u32 } -fn cache_hit_for_source(source: Source, value: &mut CacheValue) -> bool { +fn page_request_type_for_source( + source: Source, + fetched_by_current_request: bool, + value: &mut CacheValue, +) -> Option<&'static str> { match source { Source::Outer => { value.cached_at = 0; - false + if fetched_by_current_request { + None + } else { + Some("coalesced") + } } - Source::Memory | Source::Disk => true, + Source::Memory | Source::Disk => Some("cache_hit"), } } @@ -269,10 +280,13 @@ impl PageGetExecutor { object: self.object.clone(), page_id, }; + let fetched_by_current_request = Arc::new(AtomicBool::new(false)); match self .cache .get_or_fetch(&cache_key, { + let fetched_by_current_request = Arc::clone(&fetched_by_current_request); move || async move { + fetched_by_current_request.store(true, Ordering::Relaxed); metrics::page_request_count(&self.kind, "download"); let start = u64::from(page_id) * PAGE_SIZE; @@ -324,8 +338,12 @@ impl PageGetExecutor { } Ok(Some(_)) | Err(None) => unreachable!("CAS"), } - if cache_hit_for_source(entry.source(), &mut value) { - metrics::page_request_count(&key.kind, "cache_hit"); + if let Some(request_type) = page_request_type_for_source( + entry.source(), + fetched_by_current_request.load(Ordering::Relaxed), + &mut value, + ) { + metrics::page_request_count(&key.kind, request_type); } Ok((page_id, value)) } @@ -395,7 +413,7 @@ mod tests { } #[test] - fn cache_hit_for_source_marks_outer_as_miss() { + fn page_request_type_for_source_marks_leader_outer_as_miss() { let mut value = CacheValue { bucket: BucketName::new("test-bucket").expect("bucket name"), mtime: 0, @@ -404,12 +422,32 @@ mod tests { cached_at: 123, }; - assert!(!cache_hit_for_source(Source::Outer, &mut value)); + assert_eq!( + page_request_type_for_source(Source::Outer, true, &mut value), + None + ); assert_eq!(value.cached_at, 0); } #[test] - fn cache_hit_for_source_keeps_cached_at_for_hits() { + fn page_request_type_for_source_marks_coalesced_as_miss() { + let mut value = CacheValue { + bucket: BucketName::new("test-bucket").expect("bucket name"), + mtime: 0, + data: Bytes::from_static(b"hello"), + object_size: 5, + cached_at: 123, + }; + + assert_eq!( + page_request_type_for_source(Source::Outer, false, &mut value), + Some("coalesced") + ); + assert_eq!(value.cached_at, 0); + } + + #[test] + fn page_request_type_for_source_keeps_cached_at_for_hits() { let mut memory_value = CacheValue { bucket: BucketName::new("test-bucket").expect("bucket name"), mtime: 0, @@ -417,7 +455,10 @@ mod tests { object_size: 5, cached_at: 123, }; - assert!(cache_hit_for_source(Source::Memory, &mut memory_value)); + assert_eq!( + page_request_type_for_source(Source::Memory, false, &mut memory_value), + Some("cache_hit") + ); assert_eq!(memory_value.cached_at, 123); let mut disk_value = CacheValue { @@ -427,7 +468,10 @@ mod tests { object_size: 5, cached_at: 456, }; - assert!(cache_hit_for_source(Source::Disk, &mut disk_value)); + assert_eq!( + page_request_type_for_source(Source::Disk, false, &mut disk_value), + Some("cache_hit") + ); assert_eq!(disk_value.cached_at, 456); } } From 027859e8a2095b2bc434344eb7a6446c7b5104b8 Mon Sep 17 00:00:00 2001 From: shikhar Date: Fri, 20 Feb 2026 10:11:13 -0500 Subject: [PATCH 03/11] refactor(service): type page request metrics and miss outcomes --- src/service/metrics.rs | 31 +++++++++++++++++-- src/service/mod.rs | 69 +++++++++++++++++++++++++++--------------- 2 files changed, 74 insertions(+), 26 deletions(-) diff --git a/src/service/metrics.rs b/src/service/metrics.rs index afd7755..4d46339 100644 --- a/src/service/metrics.rs +++ b/src/service/metrics.rs @@ -129,7 +129,34 @@ pub fn fetch_request_pages(kind: &ObjectKind, pages: u16) { .observe(f64::from(pages)); } -pub fn page_request_count(kind: &ObjectKind, typ: &str) { +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum PageRequestType { + Access, + Download, + Hedged, + ClientPref, + Fallback, + Success, + CacheHit, + Coalesced, +} + +impl PageRequestType { + const fn as_label(self) -> &'static str { + match self { + Self::Access => "access", + Self::Download => "download", + Self::Hedged => "hedged", + Self::ClientPref => "client_pref", + Self::Fallback => "fallback", + Self::Success => "success", + Self::CacheHit => "cache_hit", + Self::Coalesced => "coalesced", + } + } +} + +pub fn page_request_count(kind: &ObjectKind, typ: PageRequestType) { static COUNTER: LazyLock = LazyLock::new(|| { register_int_counter_vec!( "cachey_page_request_total", @@ -139,7 +166,7 @@ pub fn page_request_count(kind: &ObjectKind, typ: &str) { .unwrap() }); - COUNTER.with_label_values(&[&**kind, typ]).inc(); + COUNTER.with_label_values(&[&**kind, typ.as_label()]).inc(); } pub fn page_download_latency(kind: &ObjectKind, latency: std::time::Duration) { diff --git a/src/service/mod.rs b/src/service/mod.rs index c9ab45f..af2300e 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -242,21 +242,33 @@ fn now() -> u32 { .as_secs() as u32 } -fn page_request_type_for_source( +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum CacheLookupOutcome { + Hit, + Miss(MissKind), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum MissKind { + Leader, + Coalesced, +} + +fn cache_lookup_outcome_for_source( source: Source, fetched_by_current_request: bool, value: &mut CacheValue, -) -> Option<&'static str> { +) -> CacheLookupOutcome { match source { Source::Outer => { value.cached_at = 0; if fetched_by_current_request { - None + CacheLookupOutcome::Miss(MissKind::Leader) } else { - Some("coalesced") + CacheLookupOutcome::Miss(MissKind::Coalesced) } } - Source::Memory | Source::Disk => Some("cache_hit"), + Source::Memory | Source::Disk => CacheLookupOutcome::Hit, } } @@ -273,7 +285,7 @@ struct PageGetExecutor { impl PageGetExecutor { async fn execute(self, page_id: PageId) -> Result<(PageId, CacheValue), ServiceError> { - metrics::page_request_count(&self.kind, "access"); + metrics::page_request_count(&self.kind, metrics::PageRequestType::Access); let cache_key = CacheKey { kind: self.kind.clone(), @@ -287,7 +299,7 @@ impl PageGetExecutor { let fetched_by_current_request = Arc::clone(&fetched_by_current_request); move || async move { fetched_by_current_request.store(true, Ordering::Relaxed); - metrics::page_request_count(&self.kind, "download"); + metrics::page_request_count(&self.kind, metrics::PageRequestType::Download); let start = u64::from(page_id) * PAGE_SIZE; let end = start + PAGE_SIZE; @@ -297,13 +309,16 @@ impl PageGetExecutor { .await?; metrics::page_download_latency(&self.kind, out.piece.latency); if out.piece.hedged.is_some() { - metrics::page_request_count(&self.kind, "hedged"); + metrics::page_request_count(&self.kind, metrics::PageRequestType::Hedged); } if self.buckets.first() == Some(&self.buckets[out.primary_bucket_idx]) { - metrics::page_request_count(&self.kind, "client_pref"); + metrics::page_request_count( + &self.kind, + metrics::PageRequestType::ClientPref, + ); } if out.used_bucket_idx != out.primary_bucket_idx { - metrics::page_request_count(&self.kind, "fallback"); + metrics::page_request_count(&self.kind, metrics::PageRequestType::Fallback); } Ok::<_, DownloadError>(CacheValue { bucket: self.buckets[out.used_bucket_idx].clone(), @@ -318,7 +333,7 @@ impl PageGetExecutor { { Ok(entry) => { let key = entry.key(); - metrics::page_request_count(&key.kind, "success"); + metrics::page_request_count(&key.kind, metrics::PageRequestType::Success); let mut value = entry.value().clone(); match self @@ -338,12 +353,18 @@ impl PageGetExecutor { } Ok(Some(_)) | Err(None) => unreachable!("CAS"), } - if let Some(request_type) = page_request_type_for_source( + match cache_lookup_outcome_for_source( entry.source(), fetched_by_current_request.load(Ordering::Relaxed), &mut value, ) { - metrics::page_request_count(&key.kind, request_type); + CacheLookupOutcome::Hit => { + metrics::page_request_count(&key.kind, metrics::PageRequestType::CacheHit); + } + CacheLookupOutcome::Miss(MissKind::Coalesced) => { + metrics::page_request_count(&key.kind, metrics::PageRequestType::Coalesced); + } + CacheLookupOutcome::Miss(MissKind::Leader) => {} } Ok((page_id, value)) } @@ -413,7 +434,7 @@ mod tests { } #[test] - fn page_request_type_for_source_marks_leader_outer_as_miss() { + fn cache_lookup_outcome_for_source_marks_leader_outer_as_miss() { let mut value = CacheValue { bucket: BucketName::new("test-bucket").expect("bucket name"), mtime: 0, @@ -423,14 +444,14 @@ mod tests { }; assert_eq!( - page_request_type_for_source(Source::Outer, true, &mut value), - None + cache_lookup_outcome_for_source(Source::Outer, true, &mut value), + CacheLookupOutcome::Miss(MissKind::Leader) ); assert_eq!(value.cached_at, 0); } #[test] - fn page_request_type_for_source_marks_coalesced_as_miss() { + fn cache_lookup_outcome_for_source_marks_coalesced_as_miss() { let mut value = CacheValue { bucket: BucketName::new("test-bucket").expect("bucket name"), mtime: 0, @@ -440,14 +461,14 @@ mod tests { }; assert_eq!( - page_request_type_for_source(Source::Outer, false, &mut value), - Some("coalesced") + cache_lookup_outcome_for_source(Source::Outer, false, &mut value), + CacheLookupOutcome::Miss(MissKind::Coalesced) ); assert_eq!(value.cached_at, 0); } #[test] - fn page_request_type_for_source_keeps_cached_at_for_hits() { + fn cache_lookup_outcome_for_source_keeps_cached_at_for_hits() { let mut memory_value = CacheValue { bucket: BucketName::new("test-bucket").expect("bucket name"), mtime: 0, @@ -456,8 +477,8 @@ mod tests { cached_at: 123, }; assert_eq!( - page_request_type_for_source(Source::Memory, false, &mut memory_value), - Some("cache_hit") + cache_lookup_outcome_for_source(Source::Memory, false, &mut memory_value), + CacheLookupOutcome::Hit ); assert_eq!(memory_value.cached_at, 123); @@ -469,8 +490,8 @@ mod tests { cached_at: 456, }; assert_eq!( - page_request_type_for_source(Source::Disk, false, &mut disk_value), - Some("cache_hit") + cache_lookup_outcome_for_source(Source::Disk, false, &mut disk_value), + CacheLookupOutcome::Hit ); assert_eq!(disk_value.cached_at, 456); } From 27fa513c67f29a092df3dbbee734366b03c5f71c Mon Sep 17 00:00:00 2001 From: shikhar Date: Fri, 20 Feb 2026 10:17:05 -0500 Subject: [PATCH 04/11] feat(metrics): split cache hits into memory and disk --- src/service/metrics.rs | 4 ++++ src/service/mod.rs | 28 +++++++++++++++++++++++----- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/src/service/metrics.rs b/src/service/metrics.rs index 4d46339..eec65d6 100644 --- a/src/service/metrics.rs +++ b/src/service/metrics.rs @@ -138,6 +138,8 @@ pub(super) enum PageRequestType { Fallback, Success, CacheHit, + CacheHitMemory, + CacheHitDisk, Coalesced, } @@ -151,6 +153,8 @@ impl PageRequestType { Self::Fallback => "fallback", Self::Success => "success", Self::CacheHit => "cache_hit", + Self::CacheHitMemory => "cache_hit_memory", + Self::CacheHitDisk => "cache_hit_disk", Self::Coalesced => "coalesced", } } diff --git a/src/service/mod.rs b/src/service/mod.rs index af2300e..361b775 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -244,10 +244,16 @@ fn now() -> u32 { #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum CacheLookupOutcome { - Hit, + Hit(HitKind), Miss(MissKind), } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum HitKind { + Memory, + Disk, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum MissKind { Leader, @@ -268,7 +274,8 @@ fn cache_lookup_outcome_for_source( CacheLookupOutcome::Miss(MissKind::Coalesced) } } - Source::Memory | Source::Disk => CacheLookupOutcome::Hit, + Source::Memory => CacheLookupOutcome::Hit(HitKind::Memory), + Source::Disk => CacheLookupOutcome::Hit(HitKind::Disk), } } @@ -358,8 +365,19 @@ impl PageGetExecutor { fetched_by_current_request.load(Ordering::Relaxed), &mut value, ) { - CacheLookupOutcome::Hit => { + CacheLookupOutcome::Hit(HitKind::Memory) => { metrics::page_request_count(&key.kind, metrics::PageRequestType::CacheHit); + metrics::page_request_count( + &key.kind, + metrics::PageRequestType::CacheHitMemory, + ); + } + CacheLookupOutcome::Hit(HitKind::Disk) => { + metrics::page_request_count(&key.kind, metrics::PageRequestType::CacheHit); + metrics::page_request_count( + &key.kind, + metrics::PageRequestType::CacheHitDisk, + ); } CacheLookupOutcome::Miss(MissKind::Coalesced) => { metrics::page_request_count(&key.kind, metrics::PageRequestType::Coalesced); @@ -478,7 +496,7 @@ mod tests { }; assert_eq!( cache_lookup_outcome_for_source(Source::Memory, false, &mut memory_value), - CacheLookupOutcome::Hit + CacheLookupOutcome::Hit(HitKind::Memory) ); assert_eq!(memory_value.cached_at, 123); @@ -491,7 +509,7 @@ mod tests { }; assert_eq!( cache_lookup_outcome_for_source(Source::Disk, false, &mut disk_value), - CacheLookupOutcome::Hit + CacheLookupOutcome::Hit(HitKind::Disk) ); assert_eq!(disk_value.cached_at, 456); } From 17d26be313fab1888097e9c0e67d76d19e488025 Mon Sep 17 00:00:00 2001 From: shikhar Date: Fri, 20 Feb 2026 10:22:26 -0500 Subject: [PATCH 05/11] test(service): replace miss classification units with e2e coalescing test --- src/service/mod.rs | 258 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 207 insertions(+), 51 deletions(-) diff --git a/src/service/mod.rs b/src/service/mod.rs index 361b775..f922d97 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -396,8 +396,160 @@ impl PageGetExecutor { #[cfg(test)] mod tests { + use std::{ + sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering}, + time::Duration, + }; + + use aws_config::BehaviorVersion; + use aws_sdk_s3::config::{Credentials, Region}; + use axum::{ + Router, + extract::{Path, State}, + http::{HeaderMap, StatusCode}, + routing::get, + }; + use bytesize::ByteSize; + use super::*; + #[derive(Debug, Clone)] + struct MockS3State { + expected_bucket: String, + expected_key: String, + object: Bytes, + request_count: Arc, + response_delay: Duration, + } + + fn parse_range_header(range_header: &str) -> Option<(u64, u64)> { + let range = range_header.strip_prefix("bytes=")?; + let (start, end) = range.split_once('-')?; + Some((start.parse().ok()?, end.parse().ok()?)) + } + + async fn mock_get_object( + State(state): State>, + Path((bucket, key)): Path<(String, String)>, + headers: HeaderMap, + ) -> (StatusCode, HeaderMap, Bytes) { + if bucket != state.expected_bucket || key != state.expected_key { + return (StatusCode::NOT_FOUND, HeaderMap::new(), Bytes::new()); + } + + let Some(range_header) = headers + .get(http::header::RANGE) + .and_then(|v| v.to_str().ok()) + else { + return (StatusCode::BAD_REQUEST, HeaderMap::new(), Bytes::new()); + }; + let Some((requested_start, requested_end)) = parse_range_header(range_header) else { + return (StatusCode::BAD_REQUEST, HeaderMap::new(), Bytes::new()); + }; + + state.request_count.fetch_add(1, AtomicOrdering::Relaxed); + tokio::time::sleep(state.response_delay).await; + + let object_size = state.object.len() as u64; + if requested_start >= object_size { + let mut headers = HeaderMap::new(); + headers.insert( + http::header::CONTENT_RANGE, + format!("bytes */{object_size}") + .parse() + .expect("content-range"), + ); + return (StatusCode::RANGE_NOT_SATISFIABLE, headers, Bytes::new()); + } + + let response_end = requested_end.min(object_size.saturating_sub(1)); + let mut headers = HeaderMap::new(); + headers.insert( + http::header::CONTENT_RANGE, + format!("bytes {requested_start}-{response_end}/{object_size}") + .parse() + .expect("content-range"), + ); + headers.insert( + http::header::LAST_MODIFIED, + "Tue, 15 Nov 1994 08:12:31 GMT" + .parse() + .expect("last-modified"), + ); + let data = state + .object + .slice((requested_start as usize)..=(response_end as usize)); + (StatusCode::PARTIAL_CONTENT, headers, data) + } + + async fn spawn_mock_s3_server( + bucket: &BucketName, + key: &ObjectKey, + object: Bytes, + response_delay: Duration, + ) -> (String, Arc, tokio::task::JoinHandle<()>) { + let request_count = Arc::new(AtomicUsize::new(0)); + let state = Arc::new(MockS3State { + expected_bucket: bucket.to_string(), + expected_key: key.to_string(), + object, + request_count: Arc::clone(&request_count), + response_delay, + }); + let app = Router::new() + .route("/{bucket}/{*key}", get(mock_get_object)) + .with_state(state); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("bind mock server"); + let endpoint = format!("http://{}", listener.local_addr().expect("local addr")); + let handle = tokio::spawn(async move { + axum::serve(listener, app).await.expect("serve mock s3"); + }); + (endpoint, request_count, handle) + } + + fn mock_s3_client(endpoint: &str) -> aws_sdk_s3::Client { + let config = aws_sdk_s3::Config::builder() + .behavior_version(BehaviorVersion::latest()) + .credentials_provider(Credentials::new("test", "test", None, None, "test")) + .endpoint_url(endpoint) + .force_path_style(true) + .region(Region::new("us-east-1")) + .build(); + aws_sdk_s3::Client::from_conf(config) + } + + fn metric_page_request_total(kind: &ObjectKind, typ: &str) -> u64 { + prometheus::gather() + .into_iter() + .find(|family| family.name() == "cachey_page_request_total") + .and_then(|family| { + family.metric.iter().find_map(|metric| { + let mut metric_kind = None; + let mut metric_type = None; + for label in &metric.label { + match label.name() { + "kind" => metric_kind = Some(label.value()), + "type" => metric_type = Some(label.value()), + _ => {} + } + } + if metric_kind == Some(&**kind) && metric_type == Some(typ) { + Some(metric.counter.value().round() as u64) + } else { + None + } + }) + }) + .unwrap_or(0) + } + + fn unique_name(prefix: &str) -> String { + static NEXT_ID: AtomicU64 = AtomicU64::new(0); + format!("{prefix}-{}", NEXT_ID.fetch_add(1, AtomicOrdering::Relaxed)) + } + #[test] fn page_id_for_byte_offset_matches_page_boundaries() { assert_eq!(page_id_for_byte_offset(0), 0); @@ -451,66 +603,70 @@ mod tests { } } - #[test] - fn cache_lookup_outcome_for_source_marks_leader_outer_as_miss() { - let mut value = CacheValue { - bucket: BucketName::new("test-bucket").expect("bucket name"), - mtime: 0, - data: Bytes::from_static(b"hello"), - object_size: 5, - cached_at: 123, - }; + #[tokio::test] + async fn page_get_executor_coalesced_miss_is_counted_end_to_end() { + let kind = ObjectKind::new(unique_name("kind")).expect("kind"); + let object = ObjectKey::new(unique_name("object")).expect("object"); + let bucket = BucketName::new(unique_name("bucket")).expect("bucket"); + let buckets = BucketNameSet::new(std::iter::once(bucket.clone())).expect("buckets"); + let object_data = Bytes::from(vec![7_u8; 4096]); + + let (endpoint, request_count, server_handle) = + spawn_mock_s3_server(&bucket, &object, object_data, Duration::from_millis(50)).await; + let s3 = mock_s3_client(&endpoint); + let downloader = + Downloader::new(s3, 0.9, Arc::new(Mutex::new(SlidingThroughput::default()))); + let cache = build_cache(CacheConfig { + memory_size: ByteSize::mib(16), + disk_cache: None, + metrics_registry: None, + }) + .await + .expect("cache"); - assert_eq!( - cache_lookup_outcome_for_source(Source::Outer, true, &mut value), - CacheLookupOutcome::Miss(MissKind::Leader) - ); - assert_eq!(value.cached_at, 0); - } + let before_access = metric_page_request_total(&kind, "access"); + let before_success = metric_page_request_total(&kind, "success"); + let before_download = metric_page_request_total(&kind, "download"); + let before_coalesced = metric_page_request_total(&kind, "coalesced"); + let before_cache_hit = metric_page_request_total(&kind, "cache_hit"); - #[test] - fn cache_lookup_outcome_for_source_marks_coalesced_as_miss() { - let mut value = CacheValue { - bucket: BucketName::new("test-bucket").expect("bucket name"), - mtime: 0, - data: Bytes::from_static(b"hello"), - object_size: 5, - cached_at: 123, + let executor = PageGetExecutor { + cache, + downloader, + kind: kind.clone(), + object: object.clone(), + buckets, + object_size: Arc::default(), + req_config: RequestConfig::default(), }; + let (left, right) = tokio::join!(executor.clone().execute(0), executor.execute(0)); + let left_value = left.expect("left request").1; + let right_value = right.expect("right request").1; + assert_eq!(request_count.load(AtomicOrdering::Relaxed), 1); + assert_eq!(left_value.cached_at, 0); + assert_eq!(right_value.cached_at, 0); assert_eq!( - cache_lookup_outcome_for_source(Source::Outer, false, &mut value), - CacheLookupOutcome::Miss(MissKind::Coalesced) + metric_page_request_total(&kind, "access") - before_access, + 2 ); - assert_eq!(value.cached_at, 0); - } - - #[test] - fn cache_lookup_outcome_for_source_keeps_cached_at_for_hits() { - let mut memory_value = CacheValue { - bucket: BucketName::new("test-bucket").expect("bucket name"), - mtime: 0, - data: Bytes::from_static(b"hello"), - object_size: 5, - cached_at: 123, - }; assert_eq!( - cache_lookup_outcome_for_source(Source::Memory, false, &mut memory_value), - CacheLookupOutcome::Hit(HitKind::Memory) + metric_page_request_total(&kind, "success") - before_success, + 2 + ); + assert_eq!( + metric_page_request_total(&kind, "download") - before_download, + 1 ); - assert_eq!(memory_value.cached_at, 123); - - let mut disk_value = CacheValue { - bucket: BucketName::new("test-bucket").expect("bucket name"), - mtime: 0, - data: Bytes::from_static(b"hello"), - object_size: 5, - cached_at: 456, - }; assert_eq!( - cache_lookup_outcome_for_source(Source::Disk, false, &mut disk_value), - CacheLookupOutcome::Hit(HitKind::Disk) + metric_page_request_total(&kind, "coalesced") - before_coalesced, + 1 ); - assert_eq!(disk_value.cached_at, 456); + assert_eq!( + metric_page_request_total(&kind, "cache_hit") - before_cache_hit, + 0 + ); + + server_handle.abort(); } } From 19807e07892f933a87d875acd7bd5a1b83cc2811 Mon Sep 17 00:00:00 2001 From: shikhar Date: Fri, 20 Feb 2026 10:24:57 -0500 Subject: [PATCH 06/11] docs(metrics): document page request metric types --- src/service/metrics.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/service/metrics.rs b/src/service/metrics.rs index eec65d6..571741a 100644 --- a/src/service/metrics.rs +++ b/src/service/metrics.rs @@ -131,15 +131,25 @@ pub fn fetch_request_pages(kind: &ObjectKind, pages: u16) { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(super) enum PageRequestType { + /// Any page request received by `PageGetExecutor`. Access, + /// A request that performed the upstream object-store download. Download, + /// A request where hedging was used during object-store download. Hedged, + /// A download that used the client's preferred bucket as the primary choice. ClientPref, + /// A download that succeeded from a fallback bucket instead of the primary bucket. Fallback, + /// A page request that completed successfully (hit or miss path). Success, + /// Aggregate cache hit count (includes both memory and disk hits). CacheHit, + /// Cache hit served directly from the in-memory cache. CacheHitMemory, + /// Cache hit served from disk cache. CacheHitDisk, + /// Coalesced miss waiter that received an outer-fetched entry without running fetch itself. Coalesced, } From b00994b08251c35e138c8bd22716f5e5b5139df3 Mon Sep 17 00:00:00 2001 From: shikhar Date: Fri, 20 Feb 2026 10:25:38 -0500 Subject: [PATCH 07/11] refactor(service): export PageRequestType publicly --- src/service/metrics.rs | 2 +- src/service/mod.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/service/metrics.rs b/src/service/metrics.rs index 571741a..f4843b6 100644 --- a/src/service/metrics.rs +++ b/src/service/metrics.rs @@ -130,7 +130,7 @@ pub fn fetch_request_pages(kind: &ObjectKind, pages: u16) { } #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(super) enum PageRequestType { +pub enum PageRequestType { /// Any page request received by `PageGetExecutor`. Access, /// A request that performed the upstream object-store download. diff --git a/src/service/mod.rs b/src/service/mod.rs index f922d97..321410a 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -18,6 +18,7 @@ use parking_lot::Mutex; mod metrics; mod throughput; +pub use metrics::PageRequestType; pub use throughput::SlidingThroughput; mod routes; From 4ef682aec4761e4fde498c7ad360ccd2d6516b83 Mon Sep 17 00:00:00 2001 From: shikhar Date: Fri, 20 Feb 2026 10:26:08 -0500 Subject: [PATCH 08/11] docs(metrics): describe page request counters by behavior --- src/service/metrics.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/service/metrics.rs b/src/service/metrics.rs index f4843b6..7d97537 100644 --- a/src/service/metrics.rs +++ b/src/service/metrics.rs @@ -131,25 +131,25 @@ pub fn fetch_request_pages(kind: &ObjectKind, pages: u16) { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PageRequestType { - /// Any page request received by `PageGetExecutor`. + /// Total page requests observed by the service. Access, - /// A request that performed the upstream object-store download. + /// Page requests that fetched bytes from object storage. Download, - /// A request where hedging was used during object-store download. + /// Object-storage fetches where a hedged request was issued. Hedged, - /// A download that used the client's preferred bucket as the primary choice. + /// Fetches whose primary attempt used the client-preferred bucket. ClientPref, - /// A download that succeeded from a fallback bucket instead of the primary bucket. + /// Fetches that succeeded via a fallback bucket after the primary path failed. Fallback, - /// A page request that completed successfully (hit or miss path). + /// Page requests that completed successfully, regardless of hit/miss path. Success, - /// Aggregate cache hit count (includes both memory and disk hits). + /// Aggregate cache-hit count (includes both memory and disk hits). CacheHit, - /// Cache hit served directly from the in-memory cache. + /// Cache hits served from in-memory cache. CacheHitMemory, - /// Cache hit served from disk cache. + /// Cache hits served from disk cache. CacheHitDisk, - /// Coalesced miss waiter that received an outer-fetched entry without running fetch itself. + /// Requests that waited on another in-flight miss for the same page instead of downloading. Coalesced, } From 8501ed98ac8252bb1ccc98b7aada7b9e5e399c0c Mon Sep 17 00:00:00 2001 From: shikhar Date: Fri, 20 Feb 2026 10:27:27 -0500 Subject: [PATCH 09/11] docs(metrics): adjust object storage wording --- src/service/metrics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/service/metrics.rs b/src/service/metrics.rs index 7d97537..9310022 100644 --- a/src/service/metrics.rs +++ b/src/service/metrics.rs @@ -135,7 +135,7 @@ pub enum PageRequestType { Access, /// Page requests that fetched bytes from object storage. Download, - /// Object-storage fetches where a hedged request was issued. + /// Object storage fetches where a hedged request was issued. Hedged, /// Fetches whose primary attempt used the client-preferred bucket. ClientPref, From f42ea2a0542a1e6ac1c09e7d1e1a44025878e19e Mon Sep 17 00:00:00 2001 From: shikhar Date: Fri, 20 Feb 2026 10:30:21 -0500 Subject: [PATCH 10/11] refactor(service): make cache lookup classifier side-effect free --- src/service/mod.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/service/mod.rs b/src/service/mod.rs index 321410a..0ade6fe 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -264,11 +264,9 @@ enum MissKind { fn cache_lookup_outcome_for_source( source: Source, fetched_by_current_request: bool, - value: &mut CacheValue, ) -> CacheLookupOutcome { match source { Source::Outer => { - value.cached_at = 0; if fetched_by_current_request { CacheLookupOutcome::Miss(MissKind::Leader) } else { @@ -361,11 +359,14 @@ impl PageGetExecutor { } Ok(Some(_)) | Err(None) => unreachable!("CAS"), } - match cache_lookup_outcome_for_source( + let cache_lookup_outcome = cache_lookup_outcome_for_source( entry.source(), fetched_by_current_request.load(Ordering::Relaxed), - &mut value, - ) { + ); + if matches!(cache_lookup_outcome, CacheLookupOutcome::Miss(_)) { + value.cached_at = 0; + } + match cache_lookup_outcome { CacheLookupOutcome::Hit(HitKind::Memory) => { metrics::page_request_count(&key.kind, metrics::PageRequestType::CacheHit); metrics::page_request_count( From 198cc5363bf1ed6b951167faee8d364a1b4f8901 Mon Sep 17 00:00:00 2001 From: shikhar Date: Fri, 20 Feb 2026 10:31:30 -0500 Subject: [PATCH 11/11] refactor(service): inline cache source classification --- src/service/mod.rs | 59 +++++++++------------------------------------- 1 file changed, 11 insertions(+), 48 deletions(-) diff --git a/src/service/mod.rs b/src/service/mod.rs index 0ade6fe..45f8363 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -243,41 +243,6 @@ fn now() -> u32 { .as_secs() as u32 } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum CacheLookupOutcome { - Hit(HitKind), - Miss(MissKind), -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum HitKind { - Memory, - Disk, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum MissKind { - Leader, - Coalesced, -} - -fn cache_lookup_outcome_for_source( - source: Source, - fetched_by_current_request: bool, -) -> CacheLookupOutcome { - match source { - Source::Outer => { - if fetched_by_current_request { - CacheLookupOutcome::Miss(MissKind::Leader) - } else { - CacheLookupOutcome::Miss(MissKind::Coalesced) - } - } - Source::Memory => CacheLookupOutcome::Hit(HitKind::Memory), - Source::Disk => CacheLookupOutcome::Hit(HitKind::Disk), - } -} - #[derive(Debug, Clone)] struct PageGetExecutor { cache: foyer::HybridCache, @@ -359,32 +324,30 @@ impl PageGetExecutor { } Ok(Some(_)) | Err(None) => unreachable!("CAS"), } - let cache_lookup_outcome = cache_lookup_outcome_for_source( - entry.source(), - fetched_by_current_request.load(Ordering::Relaxed), - ); - if matches!(cache_lookup_outcome, CacheLookupOutcome::Miss(_)) { - value.cached_at = 0; - } - match cache_lookup_outcome { - CacheLookupOutcome::Hit(HitKind::Memory) => { + match entry.source() { + Source::Memory => { metrics::page_request_count(&key.kind, metrics::PageRequestType::CacheHit); metrics::page_request_count( &key.kind, metrics::PageRequestType::CacheHitMemory, ); } - CacheLookupOutcome::Hit(HitKind::Disk) => { + Source::Disk => { metrics::page_request_count(&key.kind, metrics::PageRequestType::CacheHit); metrics::page_request_count( &key.kind, metrics::PageRequestType::CacheHitDisk, ); } - CacheLookupOutcome::Miss(MissKind::Coalesced) => { - metrics::page_request_count(&key.kind, metrics::PageRequestType::Coalesced); + Source::Outer => { + value.cached_at = 0; + if !fetched_by_current_request.load(Ordering::Relaxed) { + metrics::page_request_count( + &key.kind, + metrics::PageRequestType::Coalesced, + ); + } } - CacheLookupOutcome::Miss(MissKind::Leader) => {} } Ok((page_id, value)) }