diff --git a/src/service/metrics.rs b/src/service/metrics.rs
index afd7755..9310022 100644
--- a/src/service/metrics.rs
+++ b/src/service/metrics.rs
@@ -129,7 +129,48 @@ pub fn fetch_request_pages(kind: &ObjectKind, pages: u16) {
         .observe(f64::from(pages));
 }
 
-pub fn page_request_count(kind: &ObjectKind, typ: &str) {
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum PageRequestType {
+    /// Total page requests observed by the service.
+    Access,
+    /// Page requests that fetched bytes from object storage.
+    Download,
+    /// Object storage fetches where a hedged request was issued.
+    Hedged,
+    /// Fetches whose primary attempt used the client-preferred bucket.
+    ClientPref,
+    /// Fetches that succeeded via a fallback bucket after the primary path failed.
+    Fallback,
+    /// Page requests that completed successfully, regardless of hit/miss path.
+    Success,
+    /// Aggregate cache-hit count (includes both memory and disk hits).
+    CacheHit,
+    /// Cache hits served from in-memory cache.
+    CacheHitMemory,
+    /// Cache hits served from disk cache.
+    CacheHitDisk,
+    /// Requests that waited on another in-flight miss for the same page instead of downloading.
+    Coalesced,
+}
+
+impl PageRequestType {
+    const fn as_label(self) -> &'static str {
+        match self {
+            Self::Access => "access",
+            Self::Download => "download",
+            Self::Hedged => "hedged",
+            Self::ClientPref => "client_pref",
+            Self::Fallback => "fallback",
+            Self::Success => "success",
+            Self::CacheHit => "cache_hit",
+            Self::CacheHitMemory => "cache_hit_memory",
+            Self::CacheHitDisk => "cache_hit_disk",
+            Self::Coalesced => "coalesced",
+        }
+    }
+}
+
+pub fn page_request_count(kind: &ObjectKind, typ: PageRequestType) {
     static COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
         register_int_counter_vec!(
             "cachey_page_request_total",
@@ -139,7 +180,7 @@ pub fn page_request_count(kind: &ObjectKind, typ: &str) {
         .unwrap()
     });
 
-    COUNTER.with_label_values(&[&**kind, typ]).inc();
+    COUNTER.with_label_values(&[&**kind, typ.as_label()]).inc();
 }
 
 pub fn page_download_latency(kind: &ObjectKind, latency: std::time::Duration) {
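The enum replaces the old stringly-typed `typ: &str` parameter: a mistyped label can no longer mint a fresh `cachey_page_request_total` series at a call site, and `as_label()` keeps the exported label values identical to the previous strings. A minimal before/after sketch of a call site (the `kind` binding is illustrative):

    // Before: any &str compiled, so a typo like "cache_hit_mem" would
    // silently create a brand-new Prometheus series.
    metrics::page_request_count(&kind, "cache_hit_mem");

    // After: only the ten variants are accepted; this increments
    // cachey_page_request_total{kind="...", type="cache_hit_memory"}.
    metrics::page_request_count(&kind, metrics::PageRequestType::CacheHitMemory);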
diff --git a/src/service/mod.rs b/src/service/mod.rs
index 074be11..45f8363 100644
--- a/src/service/mod.rs
+++ b/src/service/mod.rs
@@ -2,18 +2,23 @@ use std::{
     net::SocketAddr,
     num::NonZeroU32,
     ops::{Range, RangeInclusive},
-    sync::{Arc, atomic::AtomicBool},
+    sync::{
+        Arc,
+        atomic::{AtomicBool, Ordering},
+    },
     time::Duration,
 };
 
 use bytes::Bytes;
 use crossbeam::atomic::AtomicCell;
 use eyre::Result;
+use foyer::Source;
 use futures::{Stream, StreamExt};
 use parking_lot::Mutex;
 
 mod metrics;
 mod throughput;
+pub use metrics::PageRequestType;
 pub use throughput::SlidingThroughput;
 
 mod routes;
@@ -251,21 +256,21 @@ struct PageGetExecutor {
 
 impl PageGetExecutor {
     async fn execute(self, page_id: PageId) -> Result<(PageId, CacheValue), ServiceError> {
-        metrics::page_request_count(&self.kind, "access");
+        metrics::page_request_count(&self.kind, metrics::PageRequestType::Access);
 
         let cache_key = CacheKey {
             kind: self.kind.clone(),
             object: self.object.clone(),
            page_id,
         };
 
-        let hit_state = Arc::new(AtomicBool::new(true));
+        let fetched_by_current_request = Arc::new(AtomicBool::new(false));
 
         match self
            .cache
            .get_or_fetch(&cache_key, {
-                let hit_state = hit_state.clone();
+                let fetched_by_current_request = Arc::clone(&fetched_by_current_request);
                move || async move {
-                    hit_state.store(false, std::sync::atomic::Ordering::Relaxed);
-                    metrics::page_request_count(&self.kind, "download");
+                    fetched_by_current_request.store(true, Ordering::Relaxed);
+                    metrics::page_request_count(&self.kind, metrics::PageRequestType::Download);
 
                     let start = u64::from(page_id) * PAGE_SIZE;
                     let end = start + PAGE_SIZE;
@@ -275,13 +280,16 @@
                        .await?;
                     metrics::page_download_latency(&self.kind, out.piece.latency);
                     if out.piece.hedged.is_some() {
-                        metrics::page_request_count(&self.kind, "hedged");
+                        metrics::page_request_count(&self.kind, metrics::PageRequestType::Hedged);
                     }
                     if self.buckets.first() == Some(&self.buckets[out.primary_bucket_idx]) {
-                        metrics::page_request_count(&self.kind, "client_pref");
+                        metrics::page_request_count(
+                            &self.kind,
+                            metrics::PageRequestType::ClientPref,
+                        );
                     }
                     if out.used_bucket_idx != out.primary_bucket_idx {
-                        metrics::page_request_count(&self.kind, "fallback");
+                        metrics::page_request_count(&self.kind, metrics::PageRequestType::Fallback);
                     }
                     Ok::<_, DownloadError>(CacheValue {
                         bucket: self.buckets[out.used_bucket_idx].clone(),
@@ -296,7 +304,7 @@
        {
            Ok(entry) => {
                let key = entry.key();
-                metrics::page_request_count(&key.kind, "success");
+                metrics::page_request_count(&key.kind, metrics::PageRequestType::Success);
 
                let mut value = entry.value().clone();
                match self
@@ -316,13 +324,30 @@
                    }
                    Ok(Some(_)) | Err(None) => unreachable!("CAS"),
                }
-                // It gets initialized as now() for insertion via fetch() in case of a miss,
-                // but we need this to signal whether it was a hit or miss.
-                // 0 => miss, non-zero => timestamp.
-                if hit_state.load(std::sync::atomic::Ordering::Relaxed) {
-                    metrics::page_request_count(&key.kind, "cache_hit");
-                } else {
-                    value.cached_at = 0;
+                match entry.source() {
+                    Source::Memory => {
+                        metrics::page_request_count(&key.kind, metrics::PageRequestType::CacheHit);
+                        metrics::page_request_count(
+                            &key.kind,
+                            metrics::PageRequestType::CacheHitMemory,
+                        );
+                    }
+                    Source::Disk => {
+                        metrics::page_request_count(&key.kind, metrics::PageRequestType::CacheHit);
+                        metrics::page_request_count(
+                            &key.kind,
+                            metrics::PageRequestType::CacheHitDisk,
+                        );
+                    }
+                    Source::Outer => {
+                        value.cached_at = 0;
+                        if !fetched_by_current_request.load(Ordering::Relaxed) {
+                            metrics::page_request_count(
+                                &key.kind,
+                                metrics::PageRequestType::Coalesced,
+                            );
+                        }
+                    }
                }
                Ok((page_id, value))
            }
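Classifying on `entry.source()` rather than the old `hit_state` boolean is what makes the coalesced case observable: the boolean only recorded whether this call ran the fetch closure, so a request that waited on another request's in-flight download was indistinguishable from a cache hit. A condensed sketch of the resulting decision table, assuming foyer's `Source` has exactly the three variants matched above (`classify` and `fetched_here` are illustrative names; the patch inlines this logic in `execute`):

    // source mirrors entry.source(); fetched_here mirrors the
    // fetched_by_current_request flag set inside the fetch closure.
    fn classify(source: Source, fetched_here: bool) -> &'static [PageRequestType] {
        match (source, fetched_here) {
            // Served from cache: the aggregate hit plus the tier-specific counter.
            (Source::Memory, _) => &[PageRequestType::CacheHit, PageRequestType::CacheHitMemory],
            (Source::Disk, _) => &[PageRequestType::CacheHit, PageRequestType::CacheHitDisk],
            // This request ran the download; "download" was already counted in the closure.
            (Source::Outer, true) => &[],
            // Another in-flight request downloaded the page for us.
            (Source::Outer, false) => &[PageRequestType::Coalesced],
        }
    }

Either `Outer` outcome also zeroes `cached_at`, preserving the existing "0 => miss, non-zero => timestamp" contract with callers.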
@@ -336,8 +361,160 @@
 
 #[cfg(test)]
 mod tests {
+    use std::{
+        sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering},
+        time::Duration,
+    };
+
+    use aws_config::BehaviorVersion;
+    use aws_sdk_s3::config::{Credentials, Region};
+    use axum::{
+        Router,
+        extract::{Path, State},
+        http::{HeaderMap, StatusCode},
+        routing::get,
+    };
+    use bytesize::ByteSize;
+
     use super::*;
 
+    #[derive(Debug, Clone)]
+    struct MockS3State {
+        expected_bucket: String,
+        expected_key: String,
+        object: Bytes,
+        request_count: Arc<AtomicUsize>,
+        response_delay: Duration,
+    }
+
+    fn parse_range_header(range_header: &str) -> Option<(u64, u64)> {
+        let range = range_header.strip_prefix("bytes=")?;
+        let (start, end) = range.split_once('-')?;
+        Some((start.parse().ok()?, end.parse().ok()?))
+    }
+
+    async fn mock_get_object(
+        State(state): State<Arc<MockS3State>>,
+        Path((bucket, key)): Path<(String, String)>,
+        headers: HeaderMap,
+    ) -> (StatusCode, HeaderMap, Bytes) {
+        if bucket != state.expected_bucket || key != state.expected_key {
+            return (StatusCode::NOT_FOUND, HeaderMap::new(), Bytes::new());
+        }
+
+        let Some(range_header) = headers
+            .get(http::header::RANGE)
+            .and_then(|v| v.to_str().ok())
+        else {
+            return (StatusCode::BAD_REQUEST, HeaderMap::new(), Bytes::new());
+        };
+        let Some((requested_start, requested_end)) = parse_range_header(range_header) else {
+            return (StatusCode::BAD_REQUEST, HeaderMap::new(), Bytes::new());
+        };
+
+        state.request_count.fetch_add(1, AtomicOrdering::Relaxed);
+        tokio::time::sleep(state.response_delay).await;
+
+        let object_size = state.object.len() as u64;
+        if requested_start >= object_size {
+            let mut headers = HeaderMap::new();
+            headers.insert(
+                http::header::CONTENT_RANGE,
+                format!("bytes */{object_size}")
+                    .parse()
+                    .expect("content-range"),
+            );
+            return (StatusCode::RANGE_NOT_SATISFIABLE, headers, Bytes::new());
+        }
+
+        let response_end = requested_end.min(object_size.saturating_sub(1));
+        let mut headers = HeaderMap::new();
+        headers.insert(
+            http::header::CONTENT_RANGE,
+            format!("bytes {requested_start}-{response_end}/{object_size}")
+                .parse()
+                .expect("content-range"),
+        );
+        headers.insert(
+            http::header::LAST_MODIFIED,
+            "Tue, 15 Nov 1994 08:12:31 GMT"
+                .parse()
+                .expect("last-modified"),
+        );
+        let data = state
+            .object
+            .slice((requested_start as usize)..=(response_end as usize));
+        (StatusCode::PARTIAL_CONTENT, headers, data)
+    }
+
+    async fn spawn_mock_s3_server(
+        bucket: &BucketName,
+        key: &ObjectKey,
+        object: Bytes,
+        response_delay: Duration,
+    ) -> (String, Arc<AtomicUsize>, tokio::task::JoinHandle<()>) {
+        let request_count = Arc::new(AtomicUsize::new(0));
+        let state = Arc::new(MockS3State {
+            expected_bucket: bucket.to_string(),
+            expected_key: key.to_string(),
+            object,
+            request_count: Arc::clone(&request_count),
+            response_delay,
+        });
+        let app = Router::new()
+            .route("/{bucket}/{*key}", get(mock_get_object))
+            .with_state(state);
+        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
+            .await
+            .expect("bind mock server");
+        let endpoint = format!("http://{}", listener.local_addr().expect("local addr"));
+        let handle = tokio::spawn(async move {
+            axum::serve(listener, app).await.expect("serve mock s3");
+        });
+        (endpoint, request_count, handle)
+    }
+
+    fn mock_s3_client(endpoint: &str) -> aws_sdk_s3::Client {
+        let config = aws_sdk_s3::Config::builder()
+            .behavior_version(BehaviorVersion::latest())
+            .credentials_provider(Credentials::new("test", "test", None, None, "test"))
+            .endpoint_url(endpoint)
+            .force_path_style(true)
+            .region(Region::new("us-east-1"))
+            .build();
+        aws_sdk_s3::Client::from_conf(config)
+    }
+
+    fn metric_page_request_total(kind: &ObjectKind, typ: &str) -> u64 {
+        prometheus::gather()
+            .into_iter()
+            .find(|family| family.name() == "cachey_page_request_total")
+            .and_then(|family| {
+                family.metric.iter().find_map(|metric| {
+                    let mut metric_kind = None;
+                    let mut metric_type = None;
+                    for label in &metric.label {
+                        match label.name() {
+                            "kind" => metric_kind = Some(label.value()),
+                            "type" => metric_type = Some(label.value()),
+                            _ => {}
+                        }
+                    }
+                    if metric_kind == Some(&**kind) && metric_type == Some(typ) {
+                        Some(metric.counter.value().round() as u64)
+                    } else {
+                        None
+                    }
+                })
+            })
+            .unwrap_or(0)
+    }
+
+    fn unique_name(prefix: &str) -> String {
+        static NEXT_ID: AtomicU64 = AtomicU64::new(0);
+        format!("{prefix}-{}", NEXT_ID.fetch_add(1, AtomicOrdering::Relaxed))
+    }
+
     #[test]
     fn page_id_for_byte_offset_matches_page_boundaries() {
         assert_eq!(page_id_for_byte_offset(0), 0);
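The mock handler speaks just enough RFC 7233 for the AWS SDK: it accepts only the `bytes=start-end` form, clamps the response end to the object size, and answers 206 with a matching `Content-Range` (or 416 past EOF). The clamping matters because the executor requests a full page while the test object is only 4 KiB. A small sketch of that contract (the 4096-byte figure comes from the test below):

    // parse_range_header accepts only the "bytes=start-end" form.
    assert_eq!(parse_range_header("bytes=0-8191"), Some((0, 8191)));
    assert_eq!(parse_range_header("0-8191"), None); // missing "bytes=" prefix
    // mock_get_object then clamps 0-8191 against a 4096-byte object and
    // replies 206 Partial Content with "Content-Range: bytes 0-4095/4096".

The per-request `response_delay` is what forces the two `execute` calls in the test below to overlap on the same in-flight download.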
@@ -390,4 +567,71 @@ mod tests {
             other => panic!("unexpected error: {other:?}"),
         }
     }
+
+    #[tokio::test]
+    async fn page_get_executor_coalesced_miss_is_counted_end_to_end() {
+        let kind = ObjectKind::new(unique_name("kind")).expect("kind");
+        let object = ObjectKey::new(unique_name("object")).expect("object");
+        let bucket = BucketName::new(unique_name("bucket")).expect("bucket");
+        let buckets = BucketNameSet::new(std::iter::once(bucket.clone())).expect("buckets");
+        let object_data = Bytes::from(vec![7_u8; 4096]);
+
+        let (endpoint, request_count, server_handle) =
+            spawn_mock_s3_server(&bucket, &object, object_data, Duration::from_millis(50)).await;
+        let s3 = mock_s3_client(&endpoint);
+        let downloader =
+            Downloader::new(s3, 0.9, Arc::new(Mutex::new(SlidingThroughput::default())));
+        let cache = build_cache(CacheConfig {
+            memory_size: ByteSize::mib(16),
+            disk_cache: None,
+            metrics_registry: None,
+        })
+        .await
+        .expect("cache");
+
+        let before_access = metric_page_request_total(&kind, "access");
+        let before_success = metric_page_request_total(&kind, "success");
+        let before_download = metric_page_request_total(&kind, "download");
+        let before_coalesced = metric_page_request_total(&kind, "coalesced");
+        let before_cache_hit = metric_page_request_total(&kind, "cache_hit");
+
+        let executor = PageGetExecutor {
+            cache,
+            downloader,
+            kind: kind.clone(),
+            object: object.clone(),
+            buckets,
+            object_size: Arc::default(),
+            req_config: RequestConfig::default(),
+        };
+        let (left, right) = tokio::join!(executor.clone().execute(0), executor.execute(0));
+        let left_value = left.expect("left request").1;
+        let right_value = right.expect("right request").1;
+
+        assert_eq!(request_count.load(AtomicOrdering::Relaxed), 1);
+        assert_eq!(left_value.cached_at, 0);
+        assert_eq!(right_value.cached_at, 0);
+        assert_eq!(
+            metric_page_request_total(&kind, "access") - before_access,
+            2
+        );
+        assert_eq!(
+            metric_page_request_total(&kind, "success") - before_success,
+            2
+        );
+        assert_eq!(
+            metric_page_request_total(&kind, "download") - before_download,
+            1
+        );
+        assert_eq!(
+            metric_page_request_total(&kind, "coalesced") - before_coalesced,
+            1
+        );
+        assert_eq!(
+            metric_page_request_total(&kind, "cache_hit") - before_cache_hit,
+            0
+        );
+
+        server_handle.abort();
+    }
 }
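The test pins down the coalesced path: one S3 request, one `download`, one `coalesced`, no `cache_hit`, and `cached_at` zeroed for both callers because both entries came from `Source::Outer`. A hypothetical companion assertion (not part of this patch; `executor2` stands for a clone taken before the joined calls and `before_hit_memory` for an earlier counter snapshot) could cover the memory-hit path by re-reading the page sequentially:

    // A third, sequential read of page 0 should now be served from the
    // in-memory cache: no extra S3 request, and cached_at stays non-zero.
    let (_, value) = executor2.execute(0).await.expect("memory hit");
    assert_ne!(value.cached_at, 0);
    assert_eq!(request_count.load(AtomicOrdering::Relaxed), 1);
    assert_eq!(
        metric_page_request_total(&kind, "cache_hit_memory") - before_hit_memory,
        1
    );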