Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions src/auth.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::io::ErrorKind;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
Expand Down Expand Up @@ -60,6 +61,7 @@ struct HelperResponse {
}

impl AuthResolver {
#[cfg(test)]
pub fn new(cli_credentials: Option<Credentials>) -> Result<Self> {
Ok(Self {
cli_credentials,
Expand All @@ -68,6 +70,19 @@ impl AuthResolver {
})
}

pub async fn new_async(cli_credentials: Option<Credentials>) -> Result<Self> {
let docker_config = tokio::task::spawn_blocking(load_docker_config)
.await
.map_err(|error| {
DockerPullError::InvalidInput(format!("docker config task panicked: {error}"))
})??;
Ok(Self {
cli_credentials,
docker_config: docker_config.map(Arc::new),
resolved: Mutex::new(HashMap::new()),
})
}

pub async fn resolve(&self, registry: &str) -> Result<Option<Credentials>> {
if let Some(credentials) = &self.cli_credentials {
return Ok(Some(credentials.clone()));
Expand Down Expand Up @@ -142,12 +157,9 @@ fn decode_auth_entry(auth: &str) -> Result<Credentials> {

fn load_docker_config() -> Result<Option<DockerConfig>> {
let path = docker_config_path();
if !path.exists() {
return Ok(None);
}

let content = match std::fs::read_to_string(&path) {
Ok(content) => content,
Err(error) if error.kind() == ErrorKind::NotFound => return Ok(None),
Err(error) => {
warn!(
"failed to read docker config at `{}`: {}; continuing without docker auth",
Expand Down
3 changes: 2 additions & 1 deletion src/export/oci_archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ async fn load_archive_inputs(store: &Store, reference: &StoredReference) -> Resu
let mut missing = Vec::new();
let mut resolved_diff_ids = Vec::with_capacity(diff_ids.len());
for (layer, diff_id) in manifest.layers.iter().zip(diff_ids) {
if !store.blob_path(&layer.digest)?.exists() {
let blob_path = store.blob_path(&layer.digest)?;
if !tokio_fs::try_exists(&blob_path).await? {
missing.push(diff_id.clone());
}
resolved_diff_ids.push(diff_id);
Expand Down
2 changes: 1 addition & 1 deletion src/pull/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub(crate) async fn pull_references(
.transpose()?
.unwrap_or_else(Platform::host);
let credentials = read_credentials(request.username, request.password_stdin)?;
let auth = Arc::new(AuthResolver::new(credentials)?);
let auth = Arc::new(AuthResolver::new_async(credentials).await?);
let client = Arc::new(RegistryClient::new_with_cache_from(
build_http_client(
request.plain_http
Expand Down
2 changes: 1 addition & 1 deletion src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn run() -> Result<()> {
Commands::Serve(args) => {
let store = Arc::new(Store::open_active(cli.global.cache_dir.clone()).await?);
let credentials = read_credentials(args.auth.username, args.auth.password_stdin)?;
let auth = Arc::new(AuthResolver::new(credentials)?);
let auth = Arc::new(AuthResolver::new_async(credentials).await?);
let client = Arc::new(RegistryClient::new(
build_http_client(
args.registry.plain_http,
Expand Down
3 changes: 2 additions & 1 deletion src/serve_registry/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;

use tokio::fs as tokio_fs;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{Semaphore, oneshot};
use tokio::task::JoinSet;
Expand Down Expand Up @@ -249,7 +250,7 @@ async fn cached_manifest_response(
let Ok(path) = state.store.blob_path(requested_reference) else {
return Ok(None);
};
if path.exists() {
if tokio_fs::try_exists(&path).await? {
let descriptor = manifest_descriptor_from_blob(&state.store, requested_reference).await?;
return Ok(Some(
serve_manifest_blob(state, &descriptor, headers_only).await,
Expand Down
20 changes: 11 additions & 9 deletions src/store/fs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::io::{ErrorKind, Write};
use std::path::Path;

use serde::Serialize;
Expand Down Expand Up @@ -31,18 +31,20 @@ pub fn atomic_write_json(path: &Path, value: &impl Serialize) -> Result<()> {
}

pub fn read_json_if_exists<T: DeserializeOwned>(path: &Path) -> Result<Option<T>> {
if !path.exists() {
return Ok(None);
}
let bytes = fs::read(path)?;
let bytes = match fs::read(path) {
Ok(bytes) => bytes,
Err(error) if error.kind() == ErrorKind::NotFound => return Ok(None),
Err(error) => return Err(error.into()),
};
serde_json::from_slice(&bytes).map(Some).map_err(Into::into)
}

pub fn reconcile_partial_file(path: &Path, durable_offset: u64) -> Result<u64> {
if !path.exists() {
return Ok(0);
}
let file_len = fs::metadata(path)?.len();
let file_len = match fs::metadata(path) {
Ok(metadata) => metadata.len(),
Err(error) if error.kind() == ErrorKind::NotFound => return Ok(0),
Err(error) => return Err(error.into()),
};
if file_len > durable_offset {
let file = OpenOptions::new().write(true).open(path)?;
file.set_len(durable_offset)?;
Expand Down
79 changes: 55 additions & 24 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,15 @@ impl Store {
}

async fn open_with_lock(root: PathBuf, shared_lock: bool) -> Result<Self> {
ensure_directory(&root)?;
ensure_directory_blocking(root.clone()).await?;
let lock = if shared_lock {
Some(std::sync::Arc::new(
acquire_shared_cache_lock(root.clone()).await?,
))
} else {
None
};
for algorithm in DigestAlgorithm::SUPPORTED {
let algorithm = algorithm.to_string();
ensure_directory(&root.join("blobs").join(&algorithm))?;
ensure_directory(&root.join("partials").join(&algorithm))?;
}
ensure_directory(&root.join("references"))?;
ensure_store_layout_blocking(root.clone()).await?;
Ok(Self {
root,
shared_lock: lock,
Expand Down Expand Up @@ -114,7 +109,7 @@ impl Store {
pub async fn save_blob_bytes(&self, descriptor: &Descriptor, bytes: &[u8]) -> Result<()> {
let expected_size = descriptor.expected_size()?;
let path = self.blob_path(&descriptor.digest)?;
if path.exists()
if tokio_fs::try_exists(&path).await?
&& self
.ensure_blob_complete(&descriptor.digest, expected_size)
.await?
Expand All @@ -136,7 +131,7 @@ impl Store {
actual: actual_digest,
});
}
if let Err(error) = atomic_write_bytes(&path, bytes) {
if let Err(error) = atomic_write_bytes_blocking(path.clone(), bytes.to_vec()).await {
if is_concurrent_blob_save_race(&error)
&& self
.ensure_blob_complete(&descriptor.digest, expected_size)
Expand Down Expand Up @@ -185,9 +180,7 @@ impl Store {

pub async fn reset_partial(&self, digest: &str, expected_size: u64) -> Result<()> {
let partial = self.partial_path(digest)?;
if partial.exists() {
tokio_fs::remove_file(&partial).await?;
}
remove_file_if_exists(&partial).await?;
self.checkpoint_download(digest, 0, expected_size).await?;
Ok(())
}
Expand All @@ -214,7 +207,7 @@ impl Store {
});
}
if let Some(parent) = final_path.parent() {
ensure_directory(parent)?;
tokio_fs::create_dir_all(parent).await?;
}
tokio_fs::rename(&partial, &final_path).await?;

Expand All @@ -239,9 +232,10 @@ impl Store {
.and_then(|value| value.as_str())
.ok_or_else(|| DockerPullError::BadResponse("layer digest missing".into()))?;
let path = self.blob_path(digest)?;
if path.exists() {
tokio_fs::remove_file(path).await?;
removed += 1;
match tokio_fs::remove_file(path).await {
Ok(()) => removed += 1,
Err(error) if error.kind() == ErrorKind::NotFound => {}
Err(error) => return Err(error.into()),
}
}

Expand All @@ -250,11 +244,21 @@ impl Store {

pub async fn save_reference(&self, record: &StoredReference) -> Result<()> {
let path = self.reference_path(&record.reference);
atomic_write_json(&path, record)
let record = record.clone();
tokio::task::spawn_blocking(move || atomic_write_json(&path, &record))
.await
.map_err(|e| {
DockerPullError::InvalidInput(format!("reference save task panicked: {e}"))
})?
}

pub async fn load_reference(&self, reference: &str) -> Result<Option<StoredReference>> {
read_json_if_exists(&self.reference_path(reference))
let path = self.reference_path(reference);
tokio::task::spawn_blocking(move || read_json_if_exists(&path))
.await
.map_err(|e| {
DockerPullError::InvalidInput(format!("reference load task panicked: {e}"))
})?
}

pub async fn clear(&self) -> Result<ClearedCache> {
Expand Down Expand Up @@ -316,6 +320,26 @@ async fn acquire_shared_cache_lock(root: PathBuf) -> Result<File> {
.map_err(|e| DockerPullError::InvalidInput(format!("cache lock task panicked: {e}")))?
}

async fn ensure_directory_blocking(path: PathBuf) -> Result<()> {
tokio::task::spawn_blocking(move || ensure_directory(&path))
.await
.map_err(|e| DockerPullError::InvalidInput(format!("directory setup task panicked: {e}")))?
}

async fn ensure_store_layout_blocking(root: PathBuf) -> Result<()> {
tokio::task::spawn_blocking(move || {
for algorithm in DigestAlgorithm::SUPPORTED {
let algorithm = algorithm.to_string();
ensure_directory(&root.join("blobs").join(&algorithm))?;
ensure_directory(&root.join("partials").join(&algorithm))?;
}
ensure_directory(&root.join("references"))?;
Result::Ok(())
})
.await
.map_err(|e| DockerPullError::InvalidInput(format!("store layout task panicked: {e}")))?
}

fn open_lock_file(root: &Path) -> Result<File> {
let path = cache_lock_path(root);
if let Some(parent) = path.parent() {
Expand All @@ -335,7 +359,7 @@ fn cache_lock_path(root: &Path) -> PathBuf {
}

async fn remove_cache_contents(root: &Path) -> Result<()> {
if !root.exists() {
if !tokio_fs::try_exists(root).await? {
return Ok(());
}

Expand Down Expand Up @@ -445,6 +469,12 @@ async fn write_download_checkpoint_blocking(
.map_err(|e| DockerPullError::InvalidInput(format!("checkpoint task panicked: {e}")))?
}

async fn atomic_write_bytes_blocking(path: PathBuf, bytes: Vec<u8>) -> Result<()> {
tokio::task::spawn_blocking(move || atomic_write_bytes(&path, &bytes))
.await
.map_err(|e| DockerPullError::InvalidInput(format!("blob write task panicked: {e}")))?
}

async fn digest_file_for_digest_blocking(digest: String, path: PathBuf) -> Result<String> {
tokio::task::spawn_blocking(move || digest_file_for_digest(&digest, &path))
.await
Expand Down Expand Up @@ -496,11 +526,12 @@ fn collect_cache_files_recursive(
current: &Path,
files: &mut Vec<ClearedCacheFile>,
) -> Result<()> {
if !current.exists() {
return Ok(());
}

for entry in std::fs::read_dir(current)? {
let entries = match std::fs::read_dir(current) {
Ok(entries) => entries,
Err(error) if error.kind() == ErrorKind::NotFound => return Ok(()),
Err(error) => return Err(error.into()),
};
for entry in entries {
let entry = entry?;
let path = entry.path();
if path == cache_lock_path(root) {
Expand Down
Loading