diff --git a/ci/smoke-docker.sh b/ci/smoke-docker.sh index ec9354c..fcc14cd 100755 --- a/ci/smoke-docker.sh +++ b/ci/smoke-docker.sh @@ -3,6 +3,7 @@ set -euo pipefail BIN="${1:-target/debug/pocker}" PUBLIC_REF="registry.k8s.io/pause:3.9" +SECOND_PUBLIC_REF="registry.k8s.io/pause:3.8" RESUME_REGISTRY_PORT="${RESUME_REGISTRY_PORT:-5002}" AUTH_REGISTRY_PORT="${AUTH_REGISTRY_PORT:-5003}" RESUME_REF="localhost:${RESUME_REGISTRY_PORT}/pocker/resume:latest" @@ -67,6 +68,25 @@ if docker images --format '{{.Repository}}:{{.Tag}}' | grep -q 'pocker-cache'; t exit 1 fi +echo "smoke: parallel cached load into Docker" +docker image rm -f "${PUBLIC_REF}" "${SECOND_PUBLIC_REF}" >/dev/null 2>&1 || true +run_pocker \ + --cache-dir "${WORKDIR}/cache-parallel-load" \ + pull \ + --no-load \ + --max-parallel-images 2 \ + "${PUBLIC_REF}" \ + "${SECOND_PUBLIC_REF}" +docker image rm -f "${PUBLIC_REF}" "${SECOND_PUBLIC_REF}" >/dev/null 2>&1 || true +run_pocker \ + --cache-dir "${WORKDIR}/cache-parallel-load" \ + pull \ + --max-parallel-images 2 \ + "${PUBLIC_REF}" \ + "${SECOND_PUBLIC_REF}" +docker image inspect "${PUBLIC_REF}" >/dev/null +docker image inspect "${SECOND_PUBLIC_REF}" >/dev/null + echo "smoke: compose config and service-filtered pull" mkdir -p "${WORKDIR}/compose" cat > "${WORKDIR}/compose/.env" < Result { + let mut buffer = [0_u8; 64 * 1024]; + match self { + Self::Sha256 => copy_and_hash_reader::(&mut reader, &mut writer, &mut buffer) + .map(|digest| format!("{}:{digest}", self)), + Self::Sha384 => copy_and_hash_reader::(&mut reader, &mut writer, &mut buffer) + .map(|digest| format!("{}:{digest}", self)), + Self::Sha512 => copy_and_hash_reader::(&mut reader, &mut writer, &mut buffer) + .map(|digest| format!("{}:{digest}", self)), + } + } } impl fmt::Display for DigestAlgorithm { @@ -102,6 +118,16 @@ pub(crate) fn digest_file_for_digest(digest: &str, path: &Path) -> Result Result { + parse_digest(digest)? + .algorithm + .copy_reader_with_digest(reader, writer) +} + pub(crate) fn canonical_digest_bytes(bytes: &[u8]) -> String { format!("sha256:{}", sha256_hex(bytes)) } @@ -128,9 +154,28 @@ fn hash_reader(reader: &mut impl Read, buffer: &mut [u8]) -> Re Ok(hex::encode(hasher.finalize())) } +fn copy_and_hash_reader( + reader: &mut impl Read, + writer: &mut impl Write, + buffer: &mut [u8], +) -> Result { + let mut hasher = D::new(); + loop { + let read = reader.read(buffer)?; + if read == 0 { + break; + } + writer.write_all(&buffer[..read])?; + hasher.update(&buffer[..read]); + } + Ok(hex::encode(hasher.finalize())) +} + #[cfg(test)] mod tests { - use super::{DigestAlgorithm, digest_bytes_for_digest, digest_hex, parse_digest}; + use super::{ + DigestAlgorithm, copy_reader_with_digest, digest_bytes_for_digest, digest_hex, parse_digest, + }; use crate::error::DockerPullError; #[test] @@ -186,4 +231,17 @@ mod tests { assert_eq!(actual, DigestAlgorithm::Sha384.digest_bytes(bytes)); assert!(actual.starts_with("sha384:")); } + + #[test] + fn copy_reader_with_digest_uses_requested_algorithm() { + let bytes = b"pocker"; + let expected = DigestAlgorithm::Sha512.digest_bytes(bytes); + let mut output = Vec::new(); + + let actual = copy_reader_with_digest(&expected, &bytes[..], &mut output) + .expect("supported digest should hash while copying"); + + assert_eq!(actual, expected); + assert_eq!(output, bytes); + } } diff --git a/src/docker/daemon.rs b/src/docker/daemon.rs index d7ad6e2..788ec86 100644 --- a/src/docker/daemon.rs +++ b/src/docker/daemon.rs @@ -5,6 +5,7 @@ use serde::Deserialize; use serde::de::DeserializeOwned; use serde_json::Value; use tokio::io::DuplexStream; +use tokio::sync::OnceCell; use tokio_util::io::ReaderStream; use crate::error::Result; @@ -19,6 +20,8 @@ pub(super) struct DockerDaemon { transport: DockerTransport, } +static SHARED_DAEMON: OnceCell = OnceCell::const_new(); + impl DockerDaemon { pub(super) fn connect() -> Result { Ok(Self { @@ -26,6 +29,12 @@ impl DockerDaemon { }) } + pub(super) async fn shared() -> Result<&'static Self> { + SHARED_DAEMON + .get_or_try_init(|| async { Self::connect() }) + .await + } + pub(super) async fn load_archive(&self, path: &Path) -> Result<()> { self.transport.load_archive(path).await } @@ -93,8 +102,8 @@ impl DockerDaemon { "POST", &format!( "/images/create?fromImage={}&tag={}", - encode_query_value(repository), - encode_query_value(tag) + encode_query_value(&repository), + encode_query_value(&tag) ), None, ) @@ -115,8 +124,8 @@ impl DockerDaemon { &format!( "/images/{}/tag?repo={}&tag={}", encode_path_segment(source), - encode_query_value(repository), - encode_query_value(tag) + encode_query_value(&repository), + encode_query_value(&tag) ), None, ) diff --git a/src/docker/layers.rs b/src/docker/layers.rs index 249950c..7f870cb 100644 --- a/src/docker/layers.rs +++ b/src/docker/layers.rs @@ -4,13 +4,12 @@ use std::io::{Read, Write}; use std::path::{Path, PathBuf}; use serde::Deserialize; -use sha2::{Digest as _, Sha256}; use tar::Archive; use tempfile::{NamedTempFile, TempDir}; use tokio::sync::OnceCell; -use tokio::task::JoinSet; +use tokio::task::{self, JoinSet}; -use crate::digest::parse_digest; +use crate::digest::{copy_reader_with_digest, parse_digest}; use crate::error::{DockerPullError, Result}; use crate::image::LayerSpec; use crate::store::Store; @@ -38,8 +37,8 @@ impl DaemonLayerCache { let images = self .images .get_or_try_init(|| async { - let daemon = DockerDaemon::connect()?; - list_daemon_images(&daemon).await + let daemon = DockerDaemon::shared().await?; + list_daemon_images(daemon).await }) .await?; let chosen = choose_from_daemon_images(images.iter().cloned(), &wanted); @@ -168,12 +167,12 @@ pub async fn materialize_daemon_layers( }); } - let daemon = DockerDaemon::connect()?; - let chosen = choose_daemon_images(&daemon, &wanted).await?; + let daemon = DockerDaemon::shared().await?; + let chosen = choose_daemon_images(daemon, &wanted).await?; let tempdir = tempfile::tempdir_in(store.root())?; let mut paths = HashMap::new(); for chosen in &chosen { - materialize_layers_from_saved_image(store, &daemon, chosen, tempdir.path(), &mut paths) + materialize_layers_from_saved_image(store, daemon, chosen, tempdir.path(), &mut paths) .await?; } @@ -251,12 +250,32 @@ async fn materialize_layers_from_saved_image( return Ok(()); } - let entries = save_manifest_entries(temp.path())?; + let archive_path = temp.path().to_path_buf(); + let output_root = output_root.to_path_buf(); + let chosen = chosen.clone(); + let extracted = task::spawn_blocking(move || { + materialize_layers_from_saved_archive(&archive_path, &chosen, &output_root) + }) + .await + .map_err(|error| { + DockerPullError::CommandFailed(format!("docker layer materialization task failed: {error}")) + })??; + + paths.extend(extracted); + Ok(()) +} + +fn materialize_layers_from_saved_archive( + archive_path: &Path, + chosen: &ChosenImageLayers, + output_root: &Path, +) -> Result> { + let entries = save_manifest_entries(archive_path)?; let Some(entry) = entries.into_iter().next() else { - return Ok(()); + return Ok(HashMap::new()); }; if entry.layers.len() != chosen.image.rootfs_layers().len() { - return Ok(()); + return Ok(HashMap::new()); } let targets = chosen @@ -269,10 +288,11 @@ async fn materialize_layers_from_saved_image( .map(|(diff_id, path)| (path, diff_id)) .collect::>(); if targets.is_empty() { - return Ok(()); + return Ok(HashMap::new()); } - let file = File::open(temp.path())?; + let mut paths = HashMap::new(); + let file = File::open(archive_path)?; let mut archive = Archive::new(file); for entry in archive.entries()? { @@ -281,7 +301,7 @@ async fn materialize_layers_from_saved_image( let Some(diff_id) = targets.get(&path) else { continue; }; - if paths.contains_key(diff_id.as_str()) { + if paths.contains_key(diff_id) { continue; } let destination = extracted_layer_path(output_root, diff_id)?; @@ -289,7 +309,7 @@ async fn materialize_layers_from_saved_image( paths.insert(diff_id.clone(), destination); } - Ok(()) + Ok(paths) } fn save_manifest_entries(path: &Path) -> Result> { @@ -320,19 +340,9 @@ fn copy_archive_entry_with_digest( expected_digest: &str, ) -> Result<()> { let mut file = File::create(path)?; - let mut hasher = Sha256::new(); - let mut buffer = [0_u8; 64 * 1024]; - loop { - let read = reader.read(&mut buffer)?; - if read == 0 { - break; - } - file.write_all(&buffer[..read])?; - hasher.update(&buffer[..read]); - } + let actual_digest = copy_reader_with_digest(expected_digest, reader, &mut file)?; file.flush()?; file.sync_data()?; - let actual_digest = format!("sha256:{}", hex::encode(hasher.finalize())); if actual_digest != expected_digest { return Err(DockerPullError::DigestMismatch { digest: expected_digest.to_string(), diff --git a/src/docker/mod.rs b/src/docker/mod.rs index a7933a3..78bc88f 100644 --- a/src/docker/mod.rs +++ b/src/docker/mod.rs @@ -9,7 +9,7 @@ use crate::error::{DockerPullError, Result}; use crate::export::oci_archive::{ PreparedOciArchive, prepare_oci_archive, write_prepared_oci_archive_to_writer, }; -use crate::reference::{ImageReference, ReferenceTarget}; +use crate::reference::{ImageReference, ReferenceTarget, is_docker_hub}; use crate::store::{Store, StoredReference}; mod daemon; @@ -56,7 +56,7 @@ pub async fn load_reference_archive_stream( store: &Store, reference: &StoredReference, ) -> Result<()> { - let daemon = DockerDaemon::connect()?; + let daemon = DockerDaemon::shared().await?; let prepared = prepare_oci_archive(store, reference).await?; let (reader, writer) = tokio::io::duplex(LOAD_ARCHIVE_STREAM_BUFFER_BYTES); let writer = SyncIoBridge::new(writer); @@ -98,7 +98,7 @@ fn write_archive_to_stream( pub async fn daemon_has_reference(reference: &ImageReference, config_digest: &str) -> Result { let inspect_target = daemon_inspect_target(reference, config_digest); - let daemon = DockerDaemon::connect()?; + let daemon = DockerDaemon::shared().await?; let Some(image) = daemon.inspect_daemon_image(&inspect_target).await? else { return Ok(false); }; @@ -118,27 +118,36 @@ fn normalize_image_id(image_id: &str) -> &str { } pub async fn load_archive(path: &Path) -> Result<()> { - DockerDaemon::connect()?.load_archive(path).await + DockerDaemon::shared().await?.load_archive(path).await } pub async fn pull_image(reference: &str) -> Result<()> { - DockerDaemon::connect()?.pull_image(reference).await + DockerDaemon::shared().await?.pull_image(reference).await } pub async fn tag_image(source: &str, target: &str) -> Result<()> { - DockerDaemon::connect()?.tag_image(source, target).await + DockerDaemon::shared() + .await? + .tag_image(source, target) + .await } pub async fn remove_image_tag(reference: &str) -> Result<()> { - DockerDaemon::connect()?.remove_image_tag(reference).await + DockerDaemon::shared() + .await? + .remove_image_tag(reference) + .await } pub async fn inspect_image(reference: &str) -> Result> { - DockerDaemon::connect()?.inspect_image_json(reference).await + DockerDaemon::shared() + .await? + .inspect_image_json(reference) + .await } pub async fn list_images() -> Result> { - let daemon = DockerDaemon::connect()?; + let daemon = DockerDaemon::shared().await?; let images = daemon.list_image_summaries().await?; Ok(images .into_iter() @@ -152,7 +161,10 @@ pub async fn list_images() -> Result> { } pub async fn save_image(reference: &str, path: &Path) -> Result<()> { - DockerDaemon::connect()?.save_image(reference, path).await + DockerDaemon::shared() + .await? + .save_image(reference, path) + .await } fn encode_path_segment(value: &str) -> String { @@ -163,22 +175,28 @@ fn encode_query_value(value: &str) -> String { utf8_percent_encode(value, QUERY_VALUE_ENCODE_SET).to_string() } -fn split_tagged_reference(reference: &str) -> Result<(&str, &str)> { - if reference.contains('@') { +fn split_tagged_reference(reference: &str) -> Result<(String, String)> { + let reference = ImageReference::parse(reference)?; + let ReferenceTarget::Tag(tag) = &reference.target else { return Err(DockerPullError::InvalidInput(format!( - "image reference `{reference}` is not a tagged reference" - ))); - } - let slash = reference.rfind('/'); - let colon = reference.rfind(':').ok_or_else(|| { - DockerPullError::InvalidInput(format!("image reference `{reference}` is missing a tag")) - })?; - if slash.is_some_and(|slash| colon < slash) { - return Err(DockerPullError::InvalidInput(format!( - "image reference `{reference}` is missing a tag" + "image reference `{}` is not a tagged reference", + reference.display_name() ))); + }; + + Ok((tagged_repository_name(&reference), tag.clone())) +} + +fn tagged_repository_name(reference: &ImageReference) -> String { + if is_docker_hub(&reference.registry) { + return reference + .repository + .strip_prefix("library/") + .unwrap_or(&reference.repository) + .to_string(); } - Ok((&reference[..colon], &reference[colon + 1..])) + + format!("{}/{}", reference.registry, reference.repository) } fn ensure_json_stream_success(body: String, action: &str) -> Result<()> { @@ -213,10 +231,12 @@ mod tests { #[cfg(unix)] use std::path::Path; + use super::layers::ordered_unique_image_ids; use super::transport::windows::{decode_chunked_body, header_value, parse_response_head}; use super::transport::{DEFAULT_DOCKER_HOST, DockerEndpoint, docker_endpoint_from_host}; - use super::{daemon_inspect_target, encode_path_segment, encode_query_value}; - use super::{layers::ordered_unique_image_ids, split_tagged_reference}; + use super::{ + daemon_inspect_target, encode_path_segment, encode_query_value, split_tagged_reference, + }; use crate::reference::ImageReference; #[test] @@ -271,7 +291,10 @@ mod tests { assert_eq!( split_tagged_reference("127.0.0.1:5000/pocker/image:latest") .expect("reference should split"), - ("127.0.0.1:5000/pocker/image", "latest") + ( + "127.0.0.1:5000/pocker/image".to_string(), + "latest".to_string() + ) ); } @@ -363,4 +386,21 @@ mod tests { "example.com%2Facme%2Fapp%26name%3Dvalue%2Btag%3Alatest" ); } + + #[test] + fn tagged_reference_parts_use_parsed_reference() { + let (repository, tag) = super::split_tagged_reference("docker.io/library/alpine:3.20") + .expect("tagged reference should parse"); + + assert_eq!(repository, "alpine"); + assert_eq!(tag, "3.20"); + } + + #[test] + fn tagged_reference_parts_reject_digest_references() { + super::split_tagged_reference( + "ghcr.io/acme/app@sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + ) + .expect_err("digest reference is not a tagged target"); + } } diff --git a/src/docker/transport.rs b/src/docker/transport.rs index 70e92a4..26aca27 100644 --- a/src/docker/transport.rs +++ b/src/docker/transport.rs @@ -201,14 +201,20 @@ pub(super) fn ensure_success_status(status: StatusCode, body: Vec, action: & return Ok(()); } - let body = String::from_utf8_lossy(&body); + Err(build_failure_error(status, &body, action)) +} + +pub(super) fn build_failure_error( + status: StatusCode, + body: &[u8], + action: &str, +) -> DockerPullError { + let body = String::from_utf8_lossy(body); let body = body.trim(); let detail = if body.is_empty() { format!("status {status}") } else { format!("status {status}: {body}") }; - Err(DockerPullError::CommandFailed(format!( - "{action} failed: {detail}" - ))) + DockerPullError::CommandFailed(format!("{action} failed: {detail}")) } diff --git a/src/docker/transport_reqwest.rs b/src/docker/transport_reqwest.rs index 1961416..ba43113 100644 --- a/src/docker/transport_reqwest.rs +++ b/src/docker/transport_reqwest.rs @@ -11,7 +11,7 @@ use tokio_util::io::ReaderStream; use crate::error::{DockerPullError, Result}; use crate::http::{CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT, USER_AGENT}; -use super::DockerResponse; +use super::{DockerResponse, build_failure_error}; #[derive(Debug, Clone)] pub(in crate::docker) struct ReqwestTransport { @@ -122,13 +122,5 @@ async fn ensure_success(response: Response, action: &str) -> Result { let status = response.status(); let body = response.text().await.unwrap_or_default(); - let body = body.trim(); - let detail = if body.is_empty() { - format!("status {status}") - } else { - format!("status {status}: {body}") - }; - Err(DockerPullError::CommandFailed(format!( - "{action} failed: {detail}" - ))) + Err(build_failure_error(status, body.as_bytes(), action)) } diff --git a/src/docker/transport_windows.rs b/src/docker/transport_windows.rs index 7e74681..bed22ba 100644 --- a/src/docker/transport_windows.rs +++ b/src/docker/transport_windows.rs @@ -57,9 +57,8 @@ pub(super) async fn request_bytes( ) -> Result { let mut pipe = open_named_pipe(pipe_path).await?; let len = body.len(); - let headers = format!( - "{method} {path} HTTP/1.1\r\nHost: docker\r\nUser-Agent: {USER_AGENT}\r\nConnection: close\r\nContent-Length: {len}\r\n\r\n" - ); + let len = len.to_string(); + let headers = build_request_head(method, path, &[("Content-Length", &len)]); pipe.write_all(headers.as_bytes()).await?; pipe.write_all(&body).await?; pipe.flush().await?; @@ -76,8 +75,11 @@ pub(super) async fn request_file( len: u64, ) -> Result { let mut pipe = open_named_pipe(pipe_path).await?; - let headers = format!( - "{method} {path} HTTP/1.1\r\nHost: docker\r\nUser-Agent: {USER_AGENT}\r\nConnection: close\r\nContent-Type: {content_type}\r\nContent-Length: {len}\r\n\r\n" + let len = len.to_string(); + let headers = build_request_head( + method, + path, + &[("Content-Type", content_type), ("Content-Length", &len)], ); pipe.write_all(headers.as_bytes()).await?; tokio::io::copy(&mut file, &mut pipe).await?; @@ -94,8 +96,13 @@ pub(super) async fn request_chunked_stream( mut stream: ReaderStream, ) -> Result { let mut pipe = open_named_pipe(pipe_path).await?; - let headers = format!( - "{method} {path} HTTP/1.1\r\nHost: docker\r\nUser-Agent: {USER_AGENT}\r\nConnection: close\r\nContent-Type: {content_type}\r\nTransfer-Encoding: chunked\r\n\r\n" + let headers = build_request_head( + method, + path, + &[ + ("Content-Type", content_type), + ("Transfer-Encoding", "chunked"), + ], ); pipe.write_all(headers.as_bytes()).await?; while let Some(chunk) = stream.next().await { @@ -119,9 +126,7 @@ pub(super) async fn request_to_file( action: &str, ) -> Result<()> { let mut pipe = open_named_pipe(pipe_path).await?; - let headers = format!( - "{method} {path} HTTP/1.1\r\nHost: docker\r\nUser-Agent: {USER_AGENT}\r\nConnection: close\r\nContent-Length: 0\r\n\r\n" - ); + let headers = build_request_head(method, path, &[("Content-Length", "0")]); pipe.write_all(headers.as_bytes()).await?; pipe.flush().await?; @@ -137,6 +142,21 @@ pub(super) async fn request_to_file( Ok(()) } +#[cfg(windows)] +fn build_request_head(method: &str, path: &str, headers: &[(&str, &str)]) -> String { + let mut head = format!( + "{method} {path} HTTP/1.1\r\nHost: docker\r\nUser-Agent: {USER_AGENT}\r\nConnection: close\r\n" + ); + for (name, value) in headers { + head.push_str(name); + head.push_str(": "); + head.push_str(value); + head.push_str("\r\n"); + } + head.push_str("\r\n"); + head +} + #[cfg(windows)] async fn open_named_pipe( pipe_path: &Path, @@ -290,12 +310,13 @@ where #[cfg(any(test, windows))] async fn write_chunked_body_to_file( reader: &mut R, - mut buffer: Vec, + buffer: Vec, file: &mut tokio::fs::File, ) -> Result<()> where R: tokio::io::AsyncRead + Unpin, { + let mut buffer = ChunkReadBuffer::new(buffer); loop { let size = read_chunk_size(reader, &mut buffer).await?; if size == 0 { @@ -308,23 +329,19 @@ where } #[cfg(any(test, windows))] -async fn read_chunk_size(reader: &mut R, buffer: &mut Vec) -> Result +async fn read_chunk_size(reader: &mut R, buffer: &mut ChunkReadBuffer) -> Result where R: tokio::io::AsyncRead + Unpin, { loop { - if let Some(line_end) = find_bytes(buffer, b"\r\n") { - let line = buffer.drain(..line_end + 2).collect::>(); - let size_line = std::str::from_utf8(&line[..line_end]).map_err(|error| { - DockerPullError::BadResponse(format!("invalid chunk size: {error}")) - })?; - let size_hex = size_line - .split_once(';') - .map(|(size, _)| size) - .unwrap_or(size_line); - return usize::from_str_radix(size_hex.trim(), 16).map_err(|error| { - DockerPullError::BadResponse(format!("invalid chunk size: {error}")) - }); + if let Some(line_end) = find_crlf(buffer.available(), 0) { + let size_line = + std::str::from_utf8(&buffer.available()[..line_end]).map_err(|error| { + DockerPullError::BadResponse(format!("invalid chunk size: {error}")) + })?; + let size = parse_chunk_size(size_line)?; + buffer.consume(line_end + 2); + return Ok(size); } read_more(reader, buffer).await?; } @@ -333,7 +350,7 @@ where #[cfg(any(test, windows))] async fn write_chunk_data( reader: &mut R, - buffer: &mut Vec, + buffer: &mut ChunkReadBuffer, file: &mut tokio::fs::File, mut remaining: usize, ) -> Result<()> @@ -341,37 +358,37 @@ where R: tokio::io::AsyncRead + Unpin, { while remaining > 0 { - if buffer.is_empty() { + if buffer.available().is_empty() { read_more(reader, buffer).await?; } - let count = remaining.min(buffer.len()); - file.write_all(&buffer[..count]).await?; - buffer.drain(..count); + let count = remaining.min(buffer.available().len()); + file.write_all(&buffer.available()[..count]).await?; + buffer.consume(count); remaining -= count; } Ok(()) } #[cfg(any(test, windows))] -async fn consume_chunk_crlf(reader: &mut R, buffer: &mut Vec) -> Result<()> +async fn consume_chunk_crlf(reader: &mut R, buffer: &mut ChunkReadBuffer) -> Result<()> where R: tokio::io::AsyncRead + Unpin, { - while buffer.len() < 2 { + while buffer.available().len() < 2 { read_more(reader, buffer).await?; } - if &buffer[..2] != b"\r\n" { + if &buffer.available()[..2] != b"\r\n" { return Err(DockerPullError::BadResponse( "chunked docker API response is missing chunk terminator".into(), )); } - buffer.drain(..2); + buffer.consume(2); Ok(()) } #[cfg(any(test, windows))] -async fn read_more(reader: &mut R, buffer: &mut Vec) -> Result<()> +async fn read_more(reader: &mut R, buffer: &mut ChunkReadBuffer) -> Result<()> where R: tokio::io::AsyncRead + Unpin, { @@ -386,6 +403,42 @@ where Ok(()) } +#[cfg(any(test, windows))] +struct ChunkReadBuffer { + bytes: Vec, + position: usize, +} + +#[cfg(any(test, windows))] +impl ChunkReadBuffer { + fn new(bytes: Vec) -> Self { + Self { bytes, position: 0 } + } + + fn available(&self) -> &[u8] { + &self.bytes[self.position..] + } + + fn consume(&mut self, count: usize) { + debug_assert!( + count <= self.available().len(), + "consume count exceeds available bytes" + ); + self.position += count; + if self.position == self.bytes.len() { + self.bytes.clear(); + self.position = 0; + } else if self.position > 8192 && self.position * 2 > self.bytes.len() { + self.bytes.drain(..self.position); + self.position = 0; + } + } + + fn extend_from_slice(&mut self, bytes: &[u8]) { + self.bytes.extend_from_slice(bytes); + } +} + #[cfg(any(test, windows))] pub(crate) fn parse_response_head(bytes: &[u8]) -> Result<(StatusCode, Vec<(String, String)>)> { let mut headers = [httparse::EMPTY_HEADER; 128]; @@ -427,7 +480,7 @@ pub(crate) fn header_value<'a>(headers: &'a [(String, String)], name: &str) -> O .map(|(_, value)| value.as_str()) } -#[cfg(any(test, windows))] +#[cfg(windows)] fn find_bytes(haystack: &[u8], needle: &[u8]) -> Option { haystack .windows(needle.len()) @@ -439,26 +492,22 @@ pub(crate) fn decode_chunked_body(bytes: &[u8]) -> Result> { let mut decoded = Vec::new(); let mut index = 0; loop { - let Some(line_end) = find_bytes(&bytes[index..], b"\r\n") else { + let Some(line_end) = find_crlf(bytes, index) else { return Err(DockerPullError::BadResponse( "chunked docker API response is missing chunk size".into(), )); }; - let size_line = std::str::from_utf8(&bytes[index..index + line_end]).map_err(|error| { - DockerPullError::BadResponse(format!("invalid chunk size: {error}")) - })?; - let size_hex = size_line - .split_once(';') - .map(|(size, _)| size) - .unwrap_or(size_line); - let size = usize::from_str_radix(size_hex.trim(), 16).map_err(|error| { + let size_line = std::str::from_utf8(&bytes[index..line_end]).map_err(|error| { DockerPullError::BadResponse(format!("invalid chunk size: {error}")) })?; - index += line_end + 2; + let size = parse_chunk_size(size_line)?; + index = line_end + 2; if size == 0 { return Ok(decoded); } - let chunk_end = index + size; + let chunk_end = index.checked_add(size).ok_or_else(|| { + DockerPullError::BadResponse("chunked docker API response is too large".into()) + })?; if bytes.len() < chunk_end + 2 || &bytes[chunk_end..chunk_end + 2] != b"\r\n" { return Err(DockerPullError::BadResponse( "chunked docker API response is truncated".into(), @@ -469,6 +518,25 @@ pub(crate) fn decode_chunked_body(bytes: &[u8]) -> Result> { } } +#[cfg(any(test, windows))] +fn find_crlf(bytes: &[u8], start: usize) -> Option { + let mut index = start; + while index + 1 < bytes.len() { + if bytes[index] == b'\r' && bytes[index + 1] == b'\n' { + return Some(index); + } + index += 1; + } + None +} + +#[cfg(any(test, windows))] +fn parse_chunk_size(line: &str) -> Result { + let size_hex = line.split_once(';').map(|(size, _)| size).unwrap_or(line); + usize::from_str_radix(size_hex.trim(), 16) + .map_err(|error| DockerPullError::BadResponse(format!("invalid chunk size: {error}"))) +} + #[cfg(test)] mod tests { use tempfile::tempdir;