From cad15e953334fb19bbbb0fe4d2f1e25dfc25b470 Mon Sep 17 00:00:00 2001 From: PolarBearEs Date: Sat, 23 May 2026 21:27:50 +0200 Subject: [PATCH] Replace sync with async fs --- src/auth.rs | 20 +++++++-- src/export/oci_archive.rs | 3 +- src/pull/orchestrator.rs | 2 +- src/runtime.rs | 2 +- src/serve_registry/server.rs | 3 +- src/store/fs.rs | 20 +++++---- src/store/mod.rs | 79 +++++++++++++++++++++++++----------- 7 files changed, 88 insertions(+), 41 deletions(-) diff --git a/src/auth.rs b/src/auth.rs index 9fcbf5a..2ac06a1 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::io::ErrorKind; use std::path::PathBuf; use std::process::Stdio; use std::sync::Arc; @@ -60,6 +61,7 @@ struct HelperResponse { } impl AuthResolver { + #[cfg(test)] pub fn new(cli_credentials: Option) -> Result { Ok(Self { cli_credentials, @@ -68,6 +70,19 @@ impl AuthResolver { }) } + pub async fn new_async(cli_credentials: Option) -> Result { + let docker_config = tokio::task::spawn_blocking(load_docker_config) + .await + .map_err(|error| { + DockerPullError::InvalidInput(format!("docker config task panicked: {error}")) + })??; + Ok(Self { + cli_credentials, + docker_config: docker_config.map(Arc::new), + resolved: Mutex::new(HashMap::new()), + }) + } + pub async fn resolve(&self, registry: &str) -> Result> { if let Some(credentials) = &self.cli_credentials { return Ok(Some(credentials.clone())); @@ -142,12 +157,9 @@ fn decode_auth_entry(auth: &str) -> Result { fn load_docker_config() -> Result> { let path = docker_config_path(); - if !path.exists() { - return Ok(None); - } - let content = match std::fs::read_to_string(&path) { Ok(content) => content, + Err(error) if error.kind() == ErrorKind::NotFound => return Ok(None), Err(error) => { warn!( "failed to read docker config at `{}`: {}; continuing without docker auth", diff --git a/src/export/oci_archive.rs b/src/export/oci_archive.rs index f62d541..8b6e836 100644 --- a/src/export/oci_archive.rs +++ b/src/export/oci_archive.rs @@ -204,7 +204,8 @@ async fn load_archive_inputs(store: &Store, reference: &StoredReference) -> Resu let mut missing = Vec::new(); let mut resolved_diff_ids = Vec::with_capacity(diff_ids.len()); for (layer, diff_id) in manifest.layers.iter().zip(diff_ids) { - if !store.blob_path(&layer.digest)?.exists() { + let blob_path = store.blob_path(&layer.digest)?; + if !tokio_fs::try_exists(&blob_path).await? { missing.push(diff_id.clone()); } resolved_diff_ids.push(diff_id); diff --git a/src/pull/orchestrator.rs b/src/pull/orchestrator.rs index 41e38e9..a9202b3 100644 --- a/src/pull/orchestrator.rs +++ b/src/pull/orchestrator.rs @@ -122,7 +122,7 @@ pub(crate) async fn pull_references( .transpose()? .unwrap_or_else(Platform::host); let credentials = read_credentials(request.username, request.password_stdin)?; - let auth = Arc::new(AuthResolver::new(credentials)?); + let auth = Arc::new(AuthResolver::new_async(credentials).await?); let client = Arc::new(RegistryClient::new_with_cache_from( build_http_client( request.plain_http diff --git a/src/runtime.rs b/src/runtime.rs index 85c0a38..a3002b2 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -62,7 +62,7 @@ pub async fn run() -> Result<()> { Commands::Serve(args) => { let store = Arc::new(Store::open_active(cli.global.cache_dir.clone()).await?); let credentials = read_credentials(args.auth.username, args.auth.password_stdin)?; - let auth = Arc::new(AuthResolver::new(credentials)?); + let auth = Arc::new(AuthResolver::new_async(credentials).await?); let client = Arc::new(RegistryClient::new( build_http_client( args.registry.plain_http, diff --git a/src/serve_registry/server.rs b/src/serve_registry/server.rs index 6d44899..0b24da7 100644 --- a/src/serve_registry/server.rs +++ b/src/serve_registry/server.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; +use tokio::fs as tokio_fs; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{Semaphore, oneshot}; use tokio::task::JoinSet; @@ -249,7 +250,7 @@ async fn cached_manifest_response( let Ok(path) = state.store.blob_path(requested_reference) else { return Ok(None); }; - if path.exists() { + if tokio_fs::try_exists(&path).await? { let descriptor = manifest_descriptor_from_blob(&state.store, requested_reference).await?; return Ok(Some( serve_manifest_blob(state, &descriptor, headers_only).await, diff --git a/src/store/fs.rs b/src/store/fs.rs index 6b8bde3..0dd1014 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -1,5 +1,5 @@ use std::fs::{self, OpenOptions}; -use std::io::Write; +use std::io::{ErrorKind, Write}; use std::path::Path; use serde::Serialize; @@ -31,18 +31,20 @@ pub fn atomic_write_json(path: &Path, value: &impl Serialize) -> Result<()> { } pub fn read_json_if_exists(path: &Path) -> Result> { - if !path.exists() { - return Ok(None); - } - let bytes = fs::read(path)?; + let bytes = match fs::read(path) { + Ok(bytes) => bytes, + Err(error) if error.kind() == ErrorKind::NotFound => return Ok(None), + Err(error) => return Err(error.into()), + }; serde_json::from_slice(&bytes).map(Some).map_err(Into::into) } pub fn reconcile_partial_file(path: &Path, durable_offset: u64) -> Result { - if !path.exists() { - return Ok(0); - } - let file_len = fs::metadata(path)?.len(); + let file_len = match fs::metadata(path) { + Ok(metadata) => metadata.len(), + Err(error) if error.kind() == ErrorKind::NotFound => return Ok(0), + Err(error) => return Err(error.into()), + }; if file_len > durable_offset { let file = OpenOptions::new().write(true).open(path)?; file.set_len(durable_offset)?; diff --git a/src/store/mod.rs b/src/store/mod.rs index bf87a91..ff7137e 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -66,7 +66,7 @@ impl Store { } async fn open_with_lock(root: PathBuf, shared_lock: bool) -> Result { - ensure_directory(&root)?; + ensure_directory_blocking(root.clone()).await?; let lock = if shared_lock { Some(std::sync::Arc::new( acquire_shared_cache_lock(root.clone()).await?, @@ -74,12 +74,7 @@ impl Store { } else { None }; - for algorithm in DigestAlgorithm::SUPPORTED { - let algorithm = algorithm.to_string(); - ensure_directory(&root.join("blobs").join(&algorithm))?; - ensure_directory(&root.join("partials").join(&algorithm))?; - } - ensure_directory(&root.join("references"))?; + ensure_store_layout_blocking(root.clone()).await?; Ok(Self { root, shared_lock: lock, @@ -114,7 +109,7 @@ impl Store { pub async fn save_blob_bytes(&self, descriptor: &Descriptor, bytes: &[u8]) -> Result<()> { let expected_size = descriptor.expected_size()?; let path = self.blob_path(&descriptor.digest)?; - if path.exists() + if tokio_fs::try_exists(&path).await? && self .ensure_blob_complete(&descriptor.digest, expected_size) .await? @@ -136,7 +131,7 @@ impl Store { actual: actual_digest, }); } - if let Err(error) = atomic_write_bytes(&path, bytes) { + if let Err(error) = atomic_write_bytes_blocking(path.clone(), bytes.to_vec()).await { if is_concurrent_blob_save_race(&error) && self .ensure_blob_complete(&descriptor.digest, expected_size) @@ -185,9 +180,7 @@ impl Store { pub async fn reset_partial(&self, digest: &str, expected_size: u64) -> Result<()> { let partial = self.partial_path(digest)?; - if partial.exists() { - tokio_fs::remove_file(&partial).await?; - } + remove_file_if_exists(&partial).await?; self.checkpoint_download(digest, 0, expected_size).await?; Ok(()) } @@ -214,7 +207,7 @@ impl Store { }); } if let Some(parent) = final_path.parent() { - ensure_directory(parent)?; + tokio_fs::create_dir_all(parent).await?; } tokio_fs::rename(&partial, &final_path).await?; @@ -239,9 +232,10 @@ impl Store { .and_then(|value| value.as_str()) .ok_or_else(|| DockerPullError::BadResponse("layer digest missing".into()))?; let path = self.blob_path(digest)?; - if path.exists() { - tokio_fs::remove_file(path).await?; - removed += 1; + match tokio_fs::remove_file(path).await { + Ok(()) => removed += 1, + Err(error) if error.kind() == ErrorKind::NotFound => {} + Err(error) => return Err(error.into()), } } @@ -250,11 +244,21 @@ impl Store { pub async fn save_reference(&self, record: &StoredReference) -> Result<()> { let path = self.reference_path(&record.reference); - atomic_write_json(&path, record) + let record = record.clone(); + tokio::task::spawn_blocking(move || atomic_write_json(&path, &record)) + .await + .map_err(|e| { + DockerPullError::InvalidInput(format!("reference save task panicked: {e}")) + })? } pub async fn load_reference(&self, reference: &str) -> Result> { - read_json_if_exists(&self.reference_path(reference)) + let path = self.reference_path(reference); + tokio::task::spawn_blocking(move || read_json_if_exists(&path)) + .await + .map_err(|e| { + DockerPullError::InvalidInput(format!("reference load task panicked: {e}")) + })? } pub async fn clear(&self) -> Result { @@ -316,6 +320,26 @@ async fn acquire_shared_cache_lock(root: PathBuf) -> Result { .map_err(|e| DockerPullError::InvalidInput(format!("cache lock task panicked: {e}")))? } +async fn ensure_directory_blocking(path: PathBuf) -> Result<()> { + tokio::task::spawn_blocking(move || ensure_directory(&path)) + .await + .map_err(|e| DockerPullError::InvalidInput(format!("directory setup task panicked: {e}")))? +} + +async fn ensure_store_layout_blocking(root: PathBuf) -> Result<()> { + tokio::task::spawn_blocking(move || { + for algorithm in DigestAlgorithm::SUPPORTED { + let algorithm = algorithm.to_string(); + ensure_directory(&root.join("blobs").join(&algorithm))?; + ensure_directory(&root.join("partials").join(&algorithm))?; + } + ensure_directory(&root.join("references"))?; + Result::Ok(()) + }) + .await + .map_err(|e| DockerPullError::InvalidInput(format!("store layout task panicked: {e}")))? +} + fn open_lock_file(root: &Path) -> Result { let path = cache_lock_path(root); if let Some(parent) = path.parent() { @@ -335,7 +359,7 @@ fn cache_lock_path(root: &Path) -> PathBuf { } async fn remove_cache_contents(root: &Path) -> Result<()> { - if !root.exists() { + if !tokio_fs::try_exists(root).await? { return Ok(()); } @@ -445,6 +469,12 @@ async fn write_download_checkpoint_blocking( .map_err(|e| DockerPullError::InvalidInput(format!("checkpoint task panicked: {e}")))? } +async fn atomic_write_bytes_blocking(path: PathBuf, bytes: Vec) -> Result<()> { + tokio::task::spawn_blocking(move || atomic_write_bytes(&path, &bytes)) + .await + .map_err(|e| DockerPullError::InvalidInput(format!("blob write task panicked: {e}")))? +} + async fn digest_file_for_digest_blocking(digest: String, path: PathBuf) -> Result { tokio::task::spawn_blocking(move || digest_file_for_digest(&digest, &path)) .await @@ -496,11 +526,12 @@ fn collect_cache_files_recursive( current: &Path, files: &mut Vec, ) -> Result<()> { - if !current.exists() { - return Ok(()); - } - - for entry in std::fs::read_dir(current)? { + let entries = match std::fs::read_dir(current) { + Ok(entries) => entries, + Err(error) if error.kind() == ErrorKind::NotFound => return Ok(()), + Err(error) => return Err(error.into()), + }; + for entry in entries { let entry = entry?; let path = entry.path(); if path == cache_lock_path(root) {