Exhibit B: perf with tokio tasks #187

Draft · wants to merge 7 commits into main
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated lockfile; diff not rendered.)

55 changes: 28 additions & 27 deletions crates/package-manager/src/install_without_lockfile.rs
@@ -74,36 +74,37 @@ impl<'a, DependencyGroupList> InstallWithoutLockfile<'a, DependencyGroupList> {
 impl<'a> InstallWithoutLockfile<'a, ()> {
     /// Install dependencies of a dependency.
     #[async_recursion]
-    async fn install_dependencies_from_registry(&self, package: &PackageVersion) {
-        let InstallWithoutLockfile { tarball_mem_cache, http_client, config, .. } = self;
+    async fn install_dependencies_from_registry(&self, _: &PackageVersion) {
+        todo!("fix this later");
+        // let InstallWithoutLockfile { tarball_mem_cache, http_client, config, .. } = self;
 
-        let node_modules_path = self
-            .config
-            .virtual_store_dir
-            .join(package.to_virtual_store_name())
-            .join("node_modules");
+        // let node_modules_path = self
+        //     .config
+        //     .virtual_store_dir
+        //     .join(package.to_virtual_store_name())
+        //     .join("node_modules");
 
-        tracing::info!(target: "pacquet::install", node_modules = ?node_modules_path, "Start subset");
+        // tracing::info!(target: "pacquet::install", node_modules = ?node_modules_path, "Start subset");
 
-        package
-            .dependencies(self.config.auto_install_peers)
-            .map(|(name, version_range)| async {
-                let dependency = InstallPackageFromRegistry {
-                    tarball_mem_cache,
-                    http_client,
-                    config,
-                    node_modules_dir: &node_modules_path,
-                    name,
-                    version_range,
-                }
-                .run::<Version>()
-                .await
-                .unwrap(); // TODO: proper error propagation
-                self.install_dependencies_from_registry(&dependency).await;
-            })
-            .pipe(future::join_all)
-            .await;
+        // package
+        //     .dependencies(self.config.auto_install_peers)
+        //     .map(|(name, version_range)| async {
+        //         let dependency = InstallPackageFromRegistry {
+        //             tarball_mem_cache,
+        //             http_client,
+        //             config,
+        //             node_modules_dir: &node_modules_path,
+        //             name,
+        //             version_range,
+        //         }
+        //         .run::<Version>()
+        //         .await
+        //         .unwrap(); // TODO: proper error propagation
+        //         self.install_dependencies_from_registry(&dependency).await;
+        //     })
+        //     .pipe(future::join_all)
+        //     .await;
 
-        tracing::info!(target: "pacquet::install", node_modules = ?node_modules_path, "Complete subset");
+        // tracing::info!(target: "pacquet::install", node_modules = ?node_modules_path, "Complete subset");
     }
 }
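Note on the `todo!` above: `tokio::task::spawn` requires a `'static + Send` future, so the recursive `install_dependencies_from_registry`, which borrows `&self` and a local `node_modules_path`, cannot be moved onto a task as written. A minimal sketch of one workaround, assuming shared state behind an `Arc` (the `Installer` and `Package` types below are hypothetical stand-ins, not part of this PR):

```rust
use std::sync::Arc;
use tokio::task::JoinHandle;

struct Installer; // hypothetical stand-in for InstallWithoutLockfile's shared state

struct Package {
    dependencies: Vec<Package>, // hypothetical stand-in for PackageVersion
}

impl Installer {
    // Returning a JoinHandle from a plain fn sidesteps #[async_recursion]:
    // spawn boxes each level's future, and Arc<Self> makes it 'static.
    fn install_recursive(self: Arc<Self>, package: Package) -> JoinHandle<()> {
        tokio::task::spawn(async move {
            // fetch/extract `package` here, then recurse into its dependencies
            let children: Vec<JoinHandle<()>> = package
                .dependencies
                .into_iter()
                .map(|dep| Arc::clone(&self).install_recursive(dep))
                .collect();
            for child in children {
                child.await.expect("no join error");
            }
        })
    }
}
```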
1 change: 1 addition & 0 deletions crates/store-dir/Cargo.toml
@@ -19,6 +19,7 @@ serde = { workspace = true }
 serde_json = { workspace = true }
 sha2 = { workspace = true }
 ssri = { workspace = true }
+tokio = { workspace = true }
 
 [dev-dependencies]
 pretty_assertions = { workspace = true }
19 changes: 13 additions & 6 deletions crates/store-dir/src/cas_file.rs
@@ -22,16 +22,23 @@ pub enum WriteCasFileError {
 
 impl StoreDir {
     /// Write a file from an npm package to the store directory.
-    pub fn write_cas_file(
+    pub async fn write_cas_file<Buffer>(
         &self,
-        buffer: &[u8],
+        buffer: Buffer,
         executable: bool,
-    ) -> Result<(PathBuf, FileHash), WriteCasFileError> {
-        let file_hash = Sha512::digest(buffer);
+    ) -> Result<(PathBuf, FileHash), WriteCasFileError>
+    where
+        Buffer: AsRef<[u8]> + Send + 'static,
+    {
+        let file_hash = Sha512::digest(buffer.as_ref());
         let file_path = self.cas_file_path(file_hash, executable);
         let mode = executable.then_some(EXEC_MODE);
-        ensure_file(&file_path, buffer, mode).map_err(WriteCasFileError::WriteFile)?;
-        Ok((file_path, file_hash))
+        tokio::task::spawn_blocking(move || {
+            ensure_file(&file_path, buffer.as_ref(), mode).map_err(WriteCasFileError::WriteFile)?;
+            Ok((file_path, file_hash))
+        })
+        .await
+        .expect("no join error")
     }
 }

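The new `Buffer: AsRef<[u8]> + Send + 'static` bound exists because `spawn_blocking` moves its closure onto a dedicated thread pool, so the buffer must be owned rather than borrowed. The pattern in isolation, as a minimal sketch (`write_off_thread` and the plain `std::fs::write` call are illustrative stand-ins, not this crate's API):

```rust
use std::{io, path::PathBuf};

// Run a blocking filesystem write off the async worker threads.
async fn write_off_thread(path: PathBuf, buffer: Vec<u8>) -> io::Result<()> {
    tokio::task::spawn_blocking(move || {
        // std::fs::write blocks the current thread, so it runs on
        // tokio's blocking pool instead of an async worker thread.
        std::fs::write(&path, &buffer)
    })
    .await
    .expect("no join error") // JoinError only arises if the task panics or is cancelled
}
```

Callers consequently hand over ownership: `store_dir.write_cas_file(buffer, executable).await` instead of passing `&buffer`.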
10 changes: 7 additions & 3 deletions crates/store-dir/src/index_file.rs
@@ -45,16 +45,20 @@ pub enum WriteTarballIndexFileError {
 
 impl StoreDir {
     /// Write a JSON file that indexes files in a tarball to the store directory.
-    pub fn write_index_file(
+    pub async fn write_index_file(
         &self,
         tarball_integrity: &Integrity,
         index_content: &PackageFilesIndex,
     ) -> Result<(), WriteTarballIndexFileError> {
         let file_path = self.index_file_path(tarball_integrity);
         let index_content =
             serde_json::to_string(&index_content).expect("convert a TarballIndex to JSON");
-        ensure_file(&file_path, index_content.as_bytes(), Some(0o666))
-            .map_err(WriteTarballIndexFileError::WriteFile)
+        tokio::task::spawn_blocking(move || {
+            ensure_file(&file_path, index_content.as_bytes(), Some(0o666))
+                .map_err(WriteTarballIndexFileError::WriteFile)
+        })
+        .await
+        .expect("no join error")
     }
 }

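Note the ordering here: serialization happens before `spawn_blocking`, so only the owned `String` and `PathBuf` are moved into the blocking closure, and the `&Integrity` and `&PackageFilesIndex` borrows never have to satisfy `'static`. The same ordering as a standalone sketch (`write_json_off_thread` is a hypothetical helper, not part of this PR):

```rust
use std::{io, path::PathBuf};

// Serialize on the async side, then move only owned data into the blocking task.
async fn write_json_off_thread<T: serde::Serialize>(path: PathBuf, value: &T) -> io::Result<()> {
    let json = serde_json::to_string(value).expect("serialize to JSON");
    tokio::task::spawn_blocking(move || std::fs::write(&path, json.as_bytes()))
        .await
        .expect("no join error")
}
```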
1 change: 1 addition & 0 deletions crates/tarball/Cargo.toml
@@ -18,6 +18,7 @@ pacquet-store-dir = { workspace = true }
 base64 = { workspace = true }
 dashmap = { workspace = true }
 derive_more = { workspace = true }
+futures-util = { workspace = true }
 miette = { workspace = true }
 pipe-trait = { workspace = true }
 reqwest = { workspace = true }
140 changes: 74 additions & 66 deletions crates/tarball/src/lib.rs
Expand Up @@ -17,7 +17,7 @@ use pacquet_store_dir::{
};
use pipe_trait::Pipe;
use reqwest::Client;
use ssri::{Integrity, IntegrityChecker};
use ssri::Integrity;
use tar::Archive;
use tokio::sync::{Notify, RwLock};
use tracing::instrument;
@@ -103,11 +103,6 @@ fn decompress_gzip(gz_data: &[u8], unpacked_size: Option<usize>) -> Result<Vec<u
         .map_err(TarballError::DecodeGzip)
 }
 
-#[instrument(skip(data), fields(data_len = data.len()))]
-fn verify_checksum(data: &[u8], integrity: Integrity) -> Result<ssri::Algorithm, ssri::Error> {
-    integrity.pipe(IntegrityChecker::new).chain(data).result()
-}
-
 /// This subroutine downloads and extracts a tarball to the store directory.
 ///
 /// It returns a CAS map of files in the tarball.
@@ -186,97 +181,110 @@ impl<'a> DownloadTarballToStore<'a> {
             .map_err(network_error)?
             .bytes()
             .await
-            .map_err(network_error)?;
+            .map_err(network_error)?
+            .pipe(Arc::new);
 
         tracing::info!(target: "pacquet::download", ?package_url, "Download completed");
 
-        // TODO: Cloning here is less than desirable, there are 2 possible solutions for this problem:
-        // 1. Use an Arc and convert this line to Arc::clone.
-        // 2. Replace ssri with base64 and serde magic (which supports Copy).
-        let package_integrity = package_integrity.clone();
-
-        #[derive(Debug, From)]
-        enum TaskError {
-            Checksum(ssri::Error),
-            Other(TarballError),
-        }
-        let cas_paths = tokio::task::spawn(async move {
-            verify_checksum(&response, package_integrity.clone()).map_err(TaskError::Checksum)?;
-
-            // TODO: move tarball extraction to its own function
-            // TODO: test it
-            // TODO: test the duplication of entries
-
-            let mut archive = decompress_gzip(&response, package_unpacked_size)
-                .map_err(TaskError::Other)?
-                .pipe(Cursor::new)
-                .pipe(Archive::new);
-
-            let entries = archive
-                .entries()
-                .map_err(TarballError::ReadTarballEntries)
-                .map_err(TaskError::Other)?
-                .filter(|entry| !entry.as_ref().unwrap().header().entry_type().is_dir());
-
-            let ((_, Some(capacity)) | (capacity, None)) = entries.size_hint();
-            let mut cas_paths = HashMap::<OsString, PathBuf>::with_capacity(capacity);
-            let mut pkg_files_idx = PackageFilesIndex { files: HashMap::with_capacity(capacity) };
-
-            for entry in entries {
-                let mut entry = entry.unwrap();
-
-                // Read the contents of the entry
-                let mut buffer = Vec::with_capacity(entry.size() as usize);
-                entry.read_to_end(&mut buffer).unwrap();
-
-                let entry_path = entry.path().unwrap();
-                let cleaned_entry_path =
-                    entry_path.components().skip(1).collect::<PathBuf>().into_os_string();
-                let file_mode = entry.header().mode().unwrap();
-                let file_is_executable = file_mode::is_all_exec(file_mode);
-                let (file_path, file_hash) = store_dir
-                    .write_cas_file(&buffer, file_is_executable)
-                    .map_err(TarballError::WriteCasFile)?;
-
-                let tarball_index_key = cleaned_entry_path
-                    .to_str()
-                    .expect("entry path must be valid UTF-8") // TODO: propagate this error, provide more information
-                    .to_string(); // TODO: convert cleaned_entry_path to String too.
-
-                if let Some(previous) = cas_paths.insert(cleaned_entry_path, file_path) {
-                    tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
-                }
-
-                let checked_at = UNIX_EPOCH.elapsed().ok().map(|x| x.as_millis());
-                let file_size = entry.header().size().ok();
-                let file_integrity = format!("sha512-{}", BASE64_STD.encode(file_hash));
-                let file_attrs = PackageFileInfo {
-                    checked_at,
-                    integrity: file_integrity,
-                    mode: file_mode,
-                    size: file_size,
-                };
-
-                if let Some(previous) = pkg_files_idx.files.insert(tarball_index_key, file_attrs) {
-                    tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
-                }
-            }
-
-            store_dir
-                .write_index_file(&package_integrity, &pkg_files_idx)
-                .map_err(TarballError::WriteTarballIndexFile)?;
-
-            Ok(cas_paths)
-        })
-        .await
-        .expect("no join error")
-        .map_err(|error| match error {
-            TaskError::Checksum(error) => {
-                TarballError::Checksum(VerifyChecksumError { url: package_url.to_string(), error })
-            }
-            TaskError::Other(error) => error,
-        })?;
+        let package_integrity = package_integrity.clone().pipe(Arc::new);
+
+        {
+            let response = Arc::clone(&response);
+            let package_integrity = Arc::clone(&package_integrity);
+            tokio::task::spawn(async move { package_integrity.check(&*response) })
+                .await
+                .expect("no join error")
+                .map_err(|error| {
+                    TarballError::Checksum(VerifyChecksumError {
+                        url: package_url.to_string(),
+                        error,
+                    })
+                })?;
+        }
+
+        // TODO: move tarball extraction to its own function
+        // TODO: test it
+        // TODO: test the duplication of entries
+
+        let mut archive =
+            decompress_gzip(&response, package_unpacked_size)?.pipe(Cursor::new).pipe(Archive::new);
+
+        struct FileInfo {
+            pub cleaned_entry_path: OsString,
+            pub file_mode: u32,
+            pub file_size: Option<u64>,
+            pub buffer: Vec<u8>,
+        }
+
+        let entries = archive
+            .entries()
+            .map_err(TarballError::ReadTarballEntries)?
+            .map(|entry| entry.expect("access entry"))
+            .filter(|entry| !entry.header().entry_type().is_dir())
+            .map(|mut entry| {
+                let cleaned_entry_path = entry
+                    .path()
+                    .expect("get path")
+                    .components()
+                    .skip(1)
+                    .collect::<PathBuf>()
+                    .into_os_string();
+                let file_mode = entry.header().mode().expect("get mode"); // TODO: properly propagate this error
+                let file_size = entry.header().size().ok();
+                let mut buffer = Vec::with_capacity(entry.size() as usize);
+                entry.read_to_end(&mut buffer).expect("read entry"); // TODO: properly propagate this error
+                FileInfo { cleaned_entry_path, file_mode, file_size, buffer }
+            })
+            .map(|info| async move {
+                let FileInfo { cleaned_entry_path, file_mode, file_size, buffer } = info;
+
+                let file_is_executable = file_mode::is_all_exec(file_mode);
+                let (file_path, file_hash) = store_dir
+                    .write_cas_file(buffer, file_is_executable)
+                    .await
+                    .map_err(TarballError::WriteCasFile)?;
+
+                let index_key = cleaned_entry_path
+                    .to_str()
+                    .expect("entry path must be valid UTF-8") // TODO: propagate this error, provide more information
+                    .to_string(); // TODO: convert cleaned_entry_path to String too.
+
+                let checked_at = UNIX_EPOCH.elapsed().ok().map(|x| x.as_millis());
+                let file_integrity = format!("sha512-{}", BASE64_STD.encode(file_hash));
+                let index_value = PackageFileInfo {
+                    checked_at,
+                    integrity: file_integrity,
+                    mode: file_mode,
+                    size: file_size,
+                };
+
+                Ok::<_, TarballError>(((cleaned_entry_path, file_path), (index_key, index_value)))
+            })
+            .map(tokio::task::spawn)
+            .pipe(futures_util::future::join_all)
+            .await;
+
+        let mut cas_paths = HashMap::<OsString, PathBuf>::with_capacity(entries.len());
+        let mut pkg_files_idx = PackageFilesIndex { files: HashMap::with_capacity(entries.len()) };
+        for entry in entries {
+            let ((cleaned_entry_path, file_path), (index_key, index_value)) =
+                entry.expect("no join error")?;
+
+            if let Some(previous) = cas_paths.insert(cleaned_entry_path, file_path) {
+                tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
+            }
+
+            if let Some(previous) = pkg_files_idx.files.insert(index_key, index_value) {
+                tracing::warn!(?previous, "Duplication detected. Old entry has been ejected");
+            }
+        }
+
+        store_dir
+            .write_index_file(&package_integrity, &pkg_files_idx)
+            .await
+            .map_err(TarballError::WriteTarballIndexFile)?;
 
         tracing::info!(target: "pacquet::download", ?package_url, "Checksum verified");

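The reworked extraction is a fan-out/fan-in: one `tokio::task::spawn` per tarball entry, joined with `futures_util::future::join_all`, which is also why `response` and `package_integrity` are wrapped in `Arc` (spawned tasks must own `'static` data). A reduced sketch of the pattern, with the real per-entry work replaced by a hypothetical placeholder:

```rust
use futures_util::future;

// Spawn one task per entry, then join them all; join_all returns
// results in the same order the handles were created.
async fn process_entries(entries: Vec<Vec<u8>>) -> Result<Vec<usize>, std::io::Error> {
    let handles: Vec<_> = entries
        .into_iter()
        .map(|buffer| {
            tokio::task::spawn(async move {
                // placeholder for the real work: hash, CAS write, index record
                Ok::<usize, std::io::Error>(buffer.len())
            })
        })
        .collect();

    let mut out = Vec::with_capacity(handles.len());
    for result in future::join_all(handles).await {
        out.push(result.expect("no join error")?);
    }
    Ok(out)
}
```

Whether this wins anything over a single task depends on how much of the per-entry work is actually parallel; the `write_cas_file` calls already hop to the blocking pool on their own.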