Skip to content
20 changes: 20 additions & 0 deletions ci/smoke-docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ set -euo pipefail

BIN="${1:-target/debug/pocker}"
PUBLIC_REF="registry.k8s.io/pause:3.9"
SECOND_PUBLIC_REF="registry.k8s.io/pause:3.8"
RESUME_REGISTRY_PORT="${RESUME_REGISTRY_PORT:-5002}"
AUTH_REGISTRY_PORT="${AUTH_REGISTRY_PORT:-5003}"
RESUME_REF="localhost:${RESUME_REGISTRY_PORT}/pocker/resume:latest"
Expand Down Expand Up @@ -67,6 +68,25 @@ if docker images --format '{{.Repository}}:{{.Tag}}' | grep -q 'pocker-cache'; t
exit 1
fi

echo "smoke: parallel cached load into Docker"
docker image rm -f "${PUBLIC_REF}" "${SECOND_PUBLIC_REF}" >/dev/null 2>&1 || true
run_pocker \
--cache-dir "${WORKDIR}/cache-parallel-load" \
pull \
--no-load \
--max-parallel-images 2 \
"${PUBLIC_REF}" \
"${SECOND_PUBLIC_REF}"
docker image rm -f "${PUBLIC_REF}" "${SECOND_PUBLIC_REF}" >/dev/null 2>&1 || true
run_pocker \
--cache-dir "${WORKDIR}/cache-parallel-load" \
pull \
--max-parallel-images 2 \
"${PUBLIC_REF}" \
"${SECOND_PUBLIC_REF}"
docker image inspect "${PUBLIC_REF}" >/dev/null
docker image inspect "${SECOND_PUBLIC_REF}" >/dev/null

echo "smoke: compose config and service-filtered pull"
mkdir -p "${WORKDIR}/compose"
cat > "${WORKDIR}/compose/.env" <<EOF
Expand Down
62 changes: 60 additions & 2 deletions src/digest.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::fmt;
use std::io::Read;
use std::io::{Read, Write};
use std::path::Path;

use sha2::{Sha256, Sha384, Sha512};
Expand Down Expand Up @@ -60,6 +60,22 @@ impl DigestAlgorithm {
.map(|digest| format!("{}:{digest}", self)),
}
}

fn copy_reader_with_digest(
self,
mut reader: impl Read,
mut writer: impl Write,
) -> Result<String> {
let mut buffer = [0_u8; 64 * 1024];
match self {
Self::Sha256 => copy_and_hash_reader::<Sha256>(&mut reader, &mut writer, &mut buffer)
.map(|digest| format!("{}:{digest}", self)),
Self::Sha384 => copy_and_hash_reader::<Sha384>(&mut reader, &mut writer, &mut buffer)
.map(|digest| format!("{}:{digest}", self)),
Self::Sha512 => copy_and_hash_reader::<Sha512>(&mut reader, &mut writer, &mut buffer)
.map(|digest| format!("{}:{digest}", self)),
}
}
}

impl fmt::Display for DigestAlgorithm {
Expand Down Expand Up @@ -102,6 +118,16 @@ pub(crate) fn digest_file_for_digest(digest: &str, path: &Path) -> Result<String
parse_digest(digest)?.algorithm.digest_reader(file)
}

pub(crate) fn copy_reader_with_digest(
digest: &str,
reader: impl Read,
writer: impl Write,
) -> Result<String> {
parse_digest(digest)?
.algorithm
.copy_reader_with_digest(reader, writer)
}

pub(crate) fn canonical_digest_bytes(bytes: &[u8]) -> String {
format!("sha256:{}", sha256_hex(bytes))
}
Expand All @@ -128,9 +154,28 @@ fn hash_reader<D: sha2::Digest>(reader: &mut impl Read, buffer: &mut [u8]) -> Re
Ok(hex::encode(hasher.finalize()))
}

fn copy_and_hash_reader<D: sha2::Digest>(
reader: &mut impl Read,
writer: &mut impl Write,
buffer: &mut [u8],
) -> Result<String> {
let mut hasher = D::new();
loop {
let read = reader.read(buffer)?;
if read == 0 {
break;
}
writer.write_all(&buffer[..read])?;
hasher.update(&buffer[..read]);
}
Ok(hex::encode(hasher.finalize()))
}

#[cfg(test)]
mod tests {
use super::{DigestAlgorithm, digest_bytes_for_digest, digest_hex, parse_digest};
use super::{
DigestAlgorithm, copy_reader_with_digest, digest_bytes_for_digest, digest_hex, parse_digest,
};
use crate::error::DockerPullError;

#[test]
Expand Down Expand Up @@ -186,4 +231,17 @@ mod tests {
assert_eq!(actual, DigestAlgorithm::Sha384.digest_bytes(bytes));
assert!(actual.starts_with("sha384:"));
}

#[test]
fn copy_reader_with_digest_uses_requested_algorithm() {
let bytes = b"pocker";
let expected = DigestAlgorithm::Sha512.digest_bytes(bytes);
let mut output = Vec::new();

let actual = copy_reader_with_digest(&expected, &bytes[..], &mut output)
.expect("supported digest should hash while copying");

assert_eq!(actual, expected);
assert_eq!(output, bytes);
}
}
17 changes: 13 additions & 4 deletions src/docker/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serde::Deserialize;
use serde::de::DeserializeOwned;
use serde_json::Value;
use tokio::io::DuplexStream;
use tokio::sync::OnceCell;
use tokio_util::io::ReaderStream;

use crate::error::Result;
Expand All @@ -19,13 +20,21 @@ pub(super) struct DockerDaemon {
transport: DockerTransport,
}

static SHARED_DAEMON: OnceCell<DockerDaemon> = OnceCell::const_new();

impl DockerDaemon {
pub(super) fn connect() -> Result<Self> {
Ok(Self {
transport: DockerTransport::connect()?,
})
}

pub(super) async fn shared() -> Result<&'static Self> {
SHARED_DAEMON
.get_or_try_init(|| async { Self::connect() })
.await
}

pub(super) async fn load_archive(&self, path: &Path) -> Result<()> {
self.transport.load_archive(path).await
}
Expand Down Expand Up @@ -93,8 +102,8 @@ impl DockerDaemon {
"POST",
&format!(
"/images/create?fromImage={}&tag={}",
encode_query_value(repository),
encode_query_value(tag)
encode_query_value(&repository),
encode_query_value(&tag)
),
None,
)
Expand All @@ -115,8 +124,8 @@ impl DockerDaemon {
&format!(
"/images/{}/tag?repo={}&tag={}",
encode_path_segment(source),
encode_query_value(repository),
encode_query_value(tag)
encode_query_value(&repository),
encode_query_value(&tag)
),
None,
)
Expand Down
62 changes: 36 additions & 26 deletions src/docker/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ use std::io::{Read, Write};
use std::path::{Path, PathBuf};

use serde::Deserialize;
use sha2::{Digest as _, Sha256};
use tar::Archive;
use tempfile::{NamedTempFile, TempDir};
use tokio::sync::OnceCell;
use tokio::task::JoinSet;
use tokio::task::{self, JoinSet};

use crate::digest::parse_digest;
use crate::digest::{copy_reader_with_digest, parse_digest};
use crate::error::{DockerPullError, Result};
use crate::image::LayerSpec;
use crate::store::Store;
Expand Down Expand Up @@ -38,8 +37,8 @@ impl DaemonLayerCache {
let images = self
.images
.get_or_try_init(|| async {
let daemon = DockerDaemon::connect()?;
list_daemon_images(&daemon).await
let daemon = DockerDaemon::shared().await?;
list_daemon_images(daemon).await
})
.await?;
let chosen = choose_from_daemon_images(images.iter().cloned(), &wanted);
Expand Down Expand Up @@ -168,12 +167,12 @@ pub async fn materialize_daemon_layers(
});
}

let daemon = DockerDaemon::connect()?;
let chosen = choose_daemon_images(&daemon, &wanted).await?;
let daemon = DockerDaemon::shared().await?;
let chosen = choose_daemon_images(daemon, &wanted).await?;
let tempdir = tempfile::tempdir_in(store.root())?;
let mut paths = HashMap::new();
for chosen in &chosen {
materialize_layers_from_saved_image(store, &daemon, chosen, tempdir.path(), &mut paths)
materialize_layers_from_saved_image(store, daemon, chosen, tempdir.path(), &mut paths)
.await?;
}

Expand Down Expand Up @@ -251,12 +250,32 @@ async fn materialize_layers_from_saved_image(
return Ok(());
}

let entries = save_manifest_entries(temp.path())?;
let archive_path = temp.path().to_path_buf();
let output_root = output_root.to_path_buf();
let chosen = chosen.clone();
let extracted = task::spawn_blocking(move || {
materialize_layers_from_saved_archive(&archive_path, &chosen, &output_root)
})
.await
.map_err(|error| {
DockerPullError::CommandFailed(format!("docker layer materialization task failed: {error}"))
})??;

paths.extend(extracted);
Ok(())
}

fn materialize_layers_from_saved_archive(
archive_path: &Path,
chosen: &ChosenImageLayers,
output_root: &Path,
) -> Result<HashMap<String, PathBuf>> {
let entries = save_manifest_entries(archive_path)?;
let Some(entry) = entries.into_iter().next() else {
return Ok(());
return Ok(HashMap::new());
};
if entry.layers.len() != chosen.image.rootfs_layers().len() {
return Ok(());
return Ok(HashMap::new());
}

let targets = chosen
Expand All @@ -269,10 +288,11 @@ async fn materialize_layers_from_saved_image(
.map(|(diff_id, path)| (path, diff_id))
.collect::<HashMap<_, _>>();
if targets.is_empty() {
return Ok(());
return Ok(HashMap::new());
}

let file = File::open(temp.path())?;
let mut paths = HashMap::new();
let file = File::open(archive_path)?;
let mut archive = Archive::new(file);

for entry in archive.entries()? {
Expand All @@ -281,15 +301,15 @@ async fn materialize_layers_from_saved_image(
let Some(diff_id) = targets.get(&path) else {
continue;
};
if paths.contains_key(diff_id.as_str()) {
if paths.contains_key(diff_id) {
continue;
}
let destination = extracted_layer_path(output_root, diff_id)?;
copy_archive_entry_with_digest(&mut entry, &destination, diff_id)?;
paths.insert(diff_id.clone(), destination);
}

Ok(())
Ok(paths)
}

fn save_manifest_entries(path: &Path) -> Result<Vec<SaveManifestEntry>> {
Expand Down Expand Up @@ -320,19 +340,9 @@ fn copy_archive_entry_with_digest<R: Read>(
expected_digest: &str,
) -> Result<()> {
let mut file = File::create(path)?;
let mut hasher = Sha256::new();
let mut buffer = [0_u8; 64 * 1024];
loop {
let read = reader.read(&mut buffer)?;
if read == 0 {
break;
}
file.write_all(&buffer[..read])?;
hasher.update(&buffer[..read]);
}
let actual_digest = copy_reader_with_digest(expected_digest, reader, &mut file)?;
file.flush()?;
file.sync_data()?;
let actual_digest = format!("sha256:{}", hex::encode(hasher.finalize()));
if actual_digest != expected_digest {
return Err(DockerPullError::DigestMismatch {
digest: expected_digest.to_string(),
Expand Down
Loading
Loading