diff --git a/src/object_store/downloader.rs b/src/object_store/downloader.rs index f2fa76b..b91d2ef 100644 --- a/src/object_store/downloader.rs +++ b/src/object_store/downloader.rs @@ -207,81 +207,84 @@ impl Downloader { latency: Duration, hedged: Option, ) -> Result { - match result { + let final_result = match result { Ok(output) => { - self.bucketed_stats.observe(bucket.clone(), Ok(latency)); - let content_range = match output.content_range().and_then(ContentRange::parse) { - Some(ContentRange::Bytes(rsp_range)) => { - if rsp_range.first_byte != req_range.start { + async { + let content_range = match output.content_range().and_then(ContentRange::parse) { + Some(ContentRange::Bytes(rsp_range)) => { + if rsp_range.first_byte != req_range.start { + return Err(DownloadError::RangeNotSatisfied { + requested: req_range.clone(), + object_size: Some(rsp_range.complete_length), + }); + } + if rsp_range.last_byte != (req_range.end - 1) + && rsp_range.last_byte != (rsp_range.complete_length - 1) + { + return Err(DownloadError::RangeNotSatisfied { + requested: req_range.clone(), + object_size: Some(rsp_range.complete_length), + }); + } + rsp_range + } + Some(ContentRange::Unsatisfied(r)) => { return Err(DownloadError::RangeNotSatisfied { requested: req_range.clone(), - object_size: Some(rsp_range.complete_length), + object_size: Some(r.complete_length), }); } - if rsp_range.last_byte != (req_range.end - 1) - && rsp_range.last_byte != (rsp_range.complete_length - 1) - { + Some(ContentRange::UnboundBytes(_)) | None => { return Err(DownloadError::RangeNotSatisfied { requested: req_range.clone(), - object_size: Some(rsp_range.complete_length), + object_size: None, }); } - rsp_range - } - Some(ContentRange::Unsatisfied(r)) => { - return Err(DownloadError::RangeNotSatisfied { - requested: req_range.clone(), - object_size: Some(r.complete_length), - }); + }; + let expected_data_len = content_range.last_byte - content_range.first_byte + 1; + let object_size = content_range.complete_length; + let mtime = output + .last_modified() + .and_then(|dt| dt.secs().try_into().ok()) + .unwrap_or(0); + let data = output + .body + .collect() + .await + .map_err(|e| DownloadError::BodyStreaming(e.to_string()))? + .into_bytes(); + self.throughput.lock().record(data.len()); + if data.len() as u64 != expected_data_len { + return Err(DownloadError::BodyStreaming(format!( + "Expected {} bytes, got {}", + expected_data_len, + data.len() + ))); } - Some(ContentRange::UnboundBytes(_)) | None => { - return Err(DownloadError::RangeNotSatisfied { - requested: req_range.clone(), - object_size: None, - }); - } - }; - let expected_data_len = content_range.last_byte - content_range.first_byte + 1; - let object_size = content_range.complete_length; - let mtime = output - .last_modified() - .and_then(|dt| dt.secs().try_into().ok()) - .unwrap_or(0); - let data = output - .body - .collect() - .await - .map_err(|e| DownloadError::BodyStreaming(e.to_string()))? - .into_bytes(); - self.throughput.lock().record(data.len()); - if data.len() as u64 != expected_data_len { - return Err(DownloadError::BodyStreaming(format!( - "Expected {} bytes, got {}", - expected_data_len, - data.len() - ))); + Ok(ObjectPiece { + mtime, + data, + object_size, + latency, + hedged, + }) } - Ok(ObjectPiece { - mtime, - data, - object_size, - latency, - hedged, - }) - } - Err(e) => { - self.bucketed_stats.observe(bucket, Err(())); - Err(match e.into_service_error() { - aws_sdk_s3::operation::get_object::GetObjectError::InvalidObjectState(ios) => { - DownloadError::InvalidObjectState(ios.message.unwrap_or_default()) - } - aws_sdk_s3::operation::get_object::GetObjectError::NoSuchKey(_) => { - DownloadError::NoSuchKey - } - err => DownloadError::Unknown(format!("{err:?}")), - }) + .await } - } + Err(e) => Err(match e.into_service_error() { + aws_sdk_s3::operation::get_object::GetObjectError::InvalidObjectState(ios) => { + DownloadError::InvalidObjectState(ios.message.unwrap_or_default()) + } + aws_sdk_s3::operation::get_object::GetObjectError::NoSuchKey(_) => { + DownloadError::NoSuchKey + } + err => DownloadError::Unknown(format!("{err:?}")), + }), + }; + + let observed_outcome = final_result.as_ref().map(|_| latency).map_err(|_| ()); + self.bucketed_stats.observe(bucket, observed_outcome); + final_result } async fn hedge_trigger(&self, bucket: &BucketName, start_time: Instant) -> Option { @@ -365,7 +368,7 @@ mod tests { let result = downloader .handle_result( - bucket, + bucket.clone(), &req_range, Ok(output), Duration::from_millis(100), @@ -383,6 +386,14 @@ mod tests { } _ => panic!("Expected RangeNotSatisfied error"), } + let mut metrics_checked = false; + downloader.observe_bucket_metrics(|name, metrics| { + if name == &bucket { + metrics_checked = true; + assert_eq!(metrics.consecutive_failures, 1); + } + }); + assert!(metrics_checked); } #[tokio::test]