From a6b94d7f55d1a157f12deeafdaa12168974e1789 Mon Sep 17 00:00:00 2001 From: Vlad Volodkin Date: Wed, 5 Mar 2025 15:08:21 +0000 Subject: [PATCH 1/5] Create inodes from manifest on lookup and readdir Signed-off-by: Vlad Volodkin --- mountpoint-s3-client/src/object_client.rs | 3 +- mountpoint-s3/src/fs.rs | 1 + mountpoint-s3/src/lib.rs | 1 + mountpoint-s3/src/manifest.rs | 65 +++++++++++++ mountpoint-s3/src/superblock.rs | 65 ++++++++++++- mountpoint-s3/src/superblock/readdir.rs | 109 +++++++++++++++++++++- 6 files changed, 241 insertions(+), 3 deletions(-) create mode 100644 mountpoint-s3/src/manifest.rs diff --git a/mountpoint-s3-client/src/object_client.rs b/mountpoint-s3-client/src/object_client.rs index 9108361ca..8ac78ea7f 100644 --- a/mountpoint-s3-client/src/object_client.rs +++ b/mountpoint-s3-client/src/object_client.rs @@ -739,7 +739,8 @@ pub enum RestoreStatus { /// See [Object](https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html) in the *Amazon S3 /// API Reference* for more details. #[derive(Debug, Clone)] -#[non_exhaustive] +// TODO: builder pattern? return different type from readdir iter? +// #[non_exhaustive] pub struct ObjectInfo { /// Key for this object. pub key: String, diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index eb7ee4da5..c648ab75f 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -163,6 +163,7 @@ where let superblock_config = SuperblockConfig { cache_config: config.cache_config.clone(), s3_personality: config.s3_personality, + use_manifest: true, }; let superblock = Superblock::new(bucket, prefix, superblock_config); let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), config.mem_limit)); diff --git a/mountpoint-s3/src/lib.rs b/mountpoint-s3/src/lib.rs index 1c6251fa2..9f3f7f0e1 100644 --- a/mountpoint-s3/src/lib.rs +++ b/mountpoint-s3/src/lib.rs @@ -7,6 +7,7 @@ pub mod data_cache; pub mod fs; pub mod fuse; pub mod logging; +mod manifest; pub mod mem_limiter; pub mod metrics; pub mod object; diff --git a/mountpoint-s3/src/manifest.rs b/mountpoint-s3/src/manifest.rs new file mode 100644 index 000000000..68dadaf80 --- /dev/null +++ b/mountpoint-s3/src/manifest.rs @@ -0,0 +1,65 @@ +use std::sync::Arc; + +#[derive(Debug)] +pub enum ManifestEntry { + File { + full_key: String, + etag: String, + size: usize, + }, + Directory { + full_key: String, // let's assume it always ends with '/' + total_entries: usize, + }, +} + +impl ManifestEntry { + pub fn key(&self) -> &str { + match self { + ManifestEntry::Directory { full_key, .. } => full_key.as_str(), + ManifestEntry::File { full_key, .. } => full_key.as_str(), + } + } +} + +pub fn dummy_manifest() -> Arc> { + // contract: + // - all entries should start with '/' (may be simplified? but keeping for now) + // - all directory entries [apart from the root] should also end with '/' + // - file entries never end with '/' + // - directories are followed by files and subdirectories contained in them (matches lexicographical order?) + // - directory entries have a counter of total (calculated recursively) number of entries contained in them + Arc::new(vec![ + ManifestEntry::Directory { + full_key: "/".to_owned(), + total_entries: 7, + }, + ManifestEntry::Directory { + full_key: "/1.9.1/".to_owned(), + total_entries: 6, + }, + ManifestEntry::Directory { + full_key: "/1.9.1/arm64/".to_owned(), + total_entries: 2, + }, + ManifestEntry::File { + full_key: "/1.9.1/arm64/mount-s3-1.9.1-arm64.rpm".to_owned(), + etag: "\"f7183d9e02960286692ea6521665aa89\"".to_owned(), + size: 11844684, + }, + ManifestEntry::File { + full_key: "/1.9.1/checksum-1.9.1".to_owned(), + etag: "\"f423fc83d1fda1fdd2887c7e2122ad05\"".to_owned(), + size: 10, + }, + ManifestEntry::Directory { + full_key: "/1.9.1/x86_64/".to_owned(), + total_entries: 2, + }, + ManifestEntry::File { + full_key: "/1.9.1/x86_64/mount-s3-1.9.1-x86_64.deb".to_owned(), + etag: "\"e4930b1bfe7e10de29c863d1b69f444e\"".to_owned(), + size: 10944866, + }, + ]) +} diff --git a/mountpoint-s3/src/superblock.rs b/mountpoint-s3/src/superblock.rs index c6422ae0e..e2acb602d 100644 --- a/mountpoint-s3/src/superblock.rs +++ b/mountpoint-s3/src/superblock.rs @@ -39,6 +39,7 @@ use tracing::{debug, error, trace, warn}; use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT}; use crate::fs::CacheConfig; use crate::logging; +use crate::manifest::{dummy_manifest, ManifestEntry}; use crate::prefix::Prefix; use crate::s3::S3Personality; use crate::sync::atomic::{AtomicU64, Ordering}; @@ -73,6 +74,7 @@ struct SuperblockInner { next_ino: AtomicU64, mount_time: OffsetDateTime, config: SuperblockConfig, + manifest: Option>>, } /// Configuration for superblock operations @@ -80,6 +82,7 @@ struct SuperblockInner { pub struct SuperblockConfig { pub cache_config: CacheConfig, pub s3_personality: S3Personality, + pub use_manifest: bool, } impl Superblock { @@ -96,6 +99,12 @@ impl Superblock { config.cache_config.negative_cache_ttl, ); + let manifest = if config.use_manifest { + Some(dummy_manifest()) + } else { + None + }; + let inner = SuperblockInner { bucket: bucket.to_owned(), prefix: prefix.clone(), @@ -104,6 +113,7 @@ impl Superblock { next_ino: AtomicU64::new(2), mount_time, config, + manifest, }; Self { inner: Arc::new(inner) } } @@ -602,7 +612,11 @@ impl SuperblockInner { let lookup = match lookup { Some(lookup) => lookup?, None => { - let remote = self.remote_lookup(client, parent_ino, name).await?; + let remote = if self.config.use_manifest { + self.manifest_lookup(parent_ino, name)? + } else { + self.remote_lookup(client, parent_ino, name).await? + }; self.update_from_remote(parent_ino, name, remote)? } }; @@ -658,6 +672,55 @@ impl SuperblockInner { lookup } + fn manifest_lookup(&self, parent_ino: InodeNo, name: &str) -> Result, InodeError> { + let parent = self.get(parent_ino)?; + if parent.kind() != InodeKind::Directory { + return Err(InodeError::NotADirectory(parent.err())); + } + + let mut full_path = parent.full_key().to_owned(); + assert!(full_path.is_empty() || full_path.ends_with('/')); + full_path.push_str(name); + + let mut full_path_suffixed = full_path.clone(); + full_path_suffixed.push('/'); + + // this should be a bin search through a file stored on disk + fn search_manifest_entry<'a>(manifest: &'a [ManifestEntry], full_path: &str) -> Option<&'a ManifestEntry> { + manifest + .binary_search_by(|manifest_entry| manifest_entry.key().cmp(full_path)) + .map_or(None, |index| Some(&manifest[index])) + } + + // search for file entry + let mut manifest_entry = search_manifest_entry(self.manifest.as_ref().unwrap(), &full_path); + + // search for dir entry + if manifest_entry.is_none() { + manifest_entry = search_manifest_entry(self.manifest.as_ref().unwrap(), &full_path_suffixed); + } + + // return an inode or error + match manifest_entry { + Some(ManifestEntry::File { etag, size, .. }) => Ok(Some(RemoteLookup { + kind: InodeKind::File, + stat: InodeStat::for_file( + *size, + OffsetDateTime::now_utc(), + Some(etag.clone()), + None, + None, + self.config.cache_config.file_ttl, + ), + })), + Some(ManifestEntry::Directory { .. }) => Ok(Some(RemoteLookup { + kind: InodeKind::Directory, + stat: InodeStat::for_directory(OffsetDateTime::now_utc(), self.config.cache_config.dir_ttl), + })), + None => Err(InodeError::FileDoesNotExist(name.to_owned(), parent.err())), + } + } + /// Lookup an inode in the parent directory with the given name /// on the remote client. async fn remote_lookup( diff --git a/mountpoint-s3/src/superblock/readdir.rs b/mountpoint-s3/src/superblock/readdir.rs index 0a7a4ca07..609d37c88 100644 --- a/mountpoint-s3/src/superblock/readdir.rs +++ b/mountpoint-s3/src/superblock/readdir.rs @@ -44,6 +44,7 @@ use std::cmp::Ordering; use std::collections::VecDeque; +use crate::manifest::ManifestEntry; use mountpoint_s3_client::types::ObjectInfo; use mountpoint_s3_client::ObjectClient; use tracing::{error, trace, warn}; @@ -99,7 +100,14 @@ impl ReaddirHandle { } }; - let iter = if inner.config.s3_personality.is_list_ordered() { + let iter = if inner.config.use_manifest { + trace!("using manifest readdir iter"); + ReaddirIter::manifest( + inner.manifest.clone().expect("manifest should be set"), + &inner.bucket, + &full_path, + )? + } else if inner.config.s3_personality.is_list_ordered() { ReaddirIter::ordered(&inner.bucket, &full_path, page_size, local_entries.into()) } else { ReaddirIter::unordered(&inner.bucket, &full_path, page_size, local_entries.into()) @@ -290,6 +298,7 @@ impl Ord for ReaddirEntry { enum ReaddirIter { Ordered(ordered::ReaddirIter), Unordered(unordered::ReaddirIter), + Manifest(manifest::ReaddirIter), } impl ReaddirIter { @@ -301,10 +310,15 @@ impl ReaddirIter { Self::Unordered(unordered::ReaddirIter::new(bucket, full_path, page_size, local_entries)) } + fn manifest(manifest: Arc>, bucket: &str, full_path: &str) -> Result { + Ok(Self::Manifest(manifest::ReaddirIter::new(manifest, bucket, full_path)?)) + } + async fn next(&mut self, client: &impl ObjectClient) -> Result, InodeError> { match self { Self::Ordered(iter) => iter.next(client).await, Self::Unordered(iter) => iter.next(client).await, + Self::Manifest(iter) => iter.next(), } } } @@ -574,3 +588,96 @@ mod unordered { } } } + +mod manifest { + use time::OffsetDateTime; + + use super::*; + + #[derive(Debug)] + pub struct ReaddirIter { + manifest: Arc>, + bucket: String, + full_path: String, + idx: usize, + end_idx: usize, + } + + impl ReaddirIter { + /// Locate the index of the directory in the manifest and create an iterator + pub(super) fn new( + manifest: Arc>, + bucket: &str, + full_path: &str, + ) -> Result { + let full_path = if full_path.starts_with("/") { + full_path.to_owned() + } else { + format!("/{full_path}") + }; + trace!("searching for an entry in manifest: {}", full_path); + let idx = manifest + .binary_search_by(|manifest_entry| manifest_entry.key().cmp(&full_path)) + .inspect_err(|_| error!("entry not found in the manifest: {}", full_path)) + .map_err(|_| InodeError::InodeDoesNotExist(0))?; // TODO: add InodeError::BadManifest + trace!("found an entry in manifest: {}", full_path); + let end_idx = match manifest[idx] { + ManifestEntry::Directory { total_entries, .. } => Ok(idx + total_entries), + _ => { + error!("manifest entry is not a directory: {}", full_path); + Err(InodeError::InodeDoesNotExist(0)) // TODO: add InodeError::BadManifest + } + }?; + trace!("initializing readdir iter with indices: {}, {}", idx + 1, end_idx); + + Ok(Self { + manifest, + bucket: bucket.to_owned(), + full_path, + idx: idx + 1, // skip the directory entry itself + end_idx, + }) + } + + /// Iterate over the manifest entries, skipping subdirectories + pub(super) fn next(&mut self) -> Result, InodeError> { + if self.idx >= self.end_idx { + trace!("readdir iter exhausted: {}, {}", self.idx, self.end_idx); + return Ok(None); + } + + let readdir_entry = match &self.manifest[self.idx] { + ManifestEntry::File { full_key, etag, size } => { + trace!("found a file entry in the manifest: {}", full_key); + self.idx = self.idx + 1; + let name = full_key[self.full_path.len()..full_key.len()].to_owned(); + let object_info = ObjectInfo { + key: full_key.clone(), + size: *size as u64, + last_modified: OffsetDateTime::now_utc(), + storage_class: None, + restore_status: None, + etag: etag.clone(), + checksum_algorithms: Default::default(), // TODO: what are the implications? + }; + ReaddirEntry::RemoteObject { name, object_info } + } + ManifestEntry::Directory { + full_key, + total_entries, + } => { + trace!( + "found a directory entry in the manifest: {}, {}", + full_key, + total_entries + ); + self.idx = self.idx + total_entries; + let name = full_key[self.full_path.len()..full_key.len() - 1].to_owned(); + ReaddirEntry::RemotePrefix { name } + } + }; + + Ok(Some(readdir_entry)) + } + } +} From 9532a1ab07f8c984944ba284629c39c2166a1a63 Mon Sep 17 00:00:00 2001 From: Vlad Volodkin Date: Tue, 11 Mar 2025 16:17:14 +0000 Subject: [PATCH 2/5] Rebase on main Signed-off-by: Vlad Volodkin --- mountpoint-s3/src/superblock.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/mountpoint-s3/src/superblock.rs b/mountpoint-s3/src/superblock.rs index e2acb602d..d17f69761 100644 --- a/mountpoint-s3/src/superblock.rs +++ b/mountpoint-s3/src/superblock.rs @@ -613,7 +613,9 @@ impl SuperblockInner { Some(lookup) => lookup?, None => { let remote = if self.config.use_manifest { - self.manifest_lookup(parent_ino, name)? + let parent = self.get(parent_ino)?; + let parent_full_path = self.full_key_for_inode(&parent); + self.manifest_lookup(parent, parent_full_path, name)? } else { self.remote_lookup(client, parent_ino, name).await? }; @@ -672,13 +674,17 @@ impl SuperblockInner { lookup } - fn manifest_lookup(&self, parent_ino: InodeNo, name: &str) -> Result, InodeError> { - let parent = self.get(parent_ino)?; + fn manifest_lookup( + &self, + parent: Inode, + parent_full_path: String, + name: &str, + ) -> Result, InodeError> { if parent.kind() != InodeKind::Directory { return Err(InodeError::NotADirectory(parent.err())); } - let mut full_path = parent.full_key().to_owned(); + let mut full_path = parent_full_path; assert!(full_path.is_empty() || full_path.ends_with('/')); full_path.push_str(name); From 21471d9a76edd24745cff0554ba5da2e8febd1b4 Mon Sep 17 00:00:00 2001 From: Vlad Volodkin Date: Tue, 11 Mar 2025 18:26:01 +0000 Subject: [PATCH 3/5] Move manifest implementation out of superblock Signed-off-by: Vlad Volodkin --- mountpoint-s3/src/manifest.rs | 150 ++++++++++++++++++++++-- mountpoint-s3/src/superblock.rs | 71 ++++------- mountpoint-s3/src/superblock/readdir.rs | 96 +++++---------- 3 files changed, 191 insertions(+), 126 deletions(-) diff --git a/mountpoint-s3/src/manifest.rs b/mountpoint-s3/src/manifest.rs index 68dadaf80..ed0e0ce5a 100644 --- a/mountpoint-s3/src/manifest.rs +++ b/mountpoint-s3/src/manifest.rs @@ -1,6 +1,11 @@ use std::sync::Arc; -#[derive(Debug)] +use tracing::{error, trace}; + +use crate::superblock::{Inode, InodeError, InodeKind}; + +/// An entry returned by manifest_lookup() and ManifestIter::next() +#[derive(Debug, Clone)] pub enum ManifestEntry { File { full_key: String, @@ -22,42 +27,167 @@ impl ManifestEntry { } } +/// Manifest of all available objects in the bucket +#[derive(Debug, Clone)] +pub struct Manifest { + inner: Arc>, +} + +impl Manifest { + pub fn new() -> Self { + Self { + inner: dummy_manifest(), + } + } + + /// Lookup an entry in the manifest, the result may be a file or a directory + pub fn manifest_lookup( + &self, + parent: Inode, + parent_full_path: String, + name: &str, + ) -> Result { + trace!("using manifest to lookup {} in {}", name, parent_full_path); + + if parent.kind() != InodeKind::Directory { + return Err(InodeError::NotADirectory(parent.err())); + } + + let mut full_path = parent_full_path; + full_path.push_str(name); + + let mut full_path_suffixed = full_path.clone(); + full_path_suffixed.push('/'); + + // this should be a bin search through a file stored on disk + fn search_manifest_entry<'a>(manifest: &'a [ManifestEntry], full_path: &str) -> Option<&'a ManifestEntry> { + trace!("searching for {}", full_path); + manifest + .binary_search_by(|manifest_entry| manifest_entry.key().cmp(full_path)) + .map_or(None, |index| Some(&manifest[index])) + } + + // search for file entry + let mut manifest_entry = search_manifest_entry(&self.inner, &full_path); + + // search for dir entry + if manifest_entry.is_none() { + manifest_entry = search_manifest_entry(&self.inner, &full_path_suffixed); + } + + // return an inode or error + match manifest_entry { + Some(manifest_entry) => Ok(manifest_entry.clone()), + None => Err(InodeError::FileDoesNotExist(name.to_owned(), parent.err())), + } + } + + /// Create an iterator over directory's direct children + pub fn iter(&self, bucket: &str, directory_full_path: &str) -> Result { + ManifestIter::new(self.clone(), bucket, directory_full_path) + } +} + +#[derive(Debug)] +pub struct ManifestIter { + manifest: Manifest, + bucket: String, + idx: usize, + end_idx: usize, +} + +impl ManifestIter { + /// Locate the index of the directory in the manifest and create an iterator + fn new(manifest: Manifest, bucket: &str, full_path: &str) -> Result { + let full_path = full_path.to_owned(); + trace!("searching for an entry in manifest: {}", full_path); + let idx = manifest + .inner + .binary_search_by(|manifest_entry| manifest_entry.key().cmp(&full_path)) + .inspect_err(|_| error!("entry not found in the manifest: {}", full_path)) + .map_err(|_| InodeError::InodeDoesNotExist(0))?; // TODO: add InodeError::BadManifest + trace!("found an entry in manifest: {}", full_path); + let end_idx = match manifest.inner[idx] { + ManifestEntry::Directory { total_entries, .. } => Ok(idx + total_entries), + _ => { + error!("manifest entry is not a directory: {}", full_path); + Err(InodeError::InodeDoesNotExist(0)) // TODO: add InodeError::BadManifest + } + }?; + trace!("initializing readdir iter with indices: {}, {}", idx + 1, end_idx); + + Ok(Self { + manifest, + bucket: bucket.to_owned(), + idx: idx + 1, // skip the directory entry itself + end_idx, + }) + } + + /// Iterate over the manifest entries, skipping subdirectories + pub fn next(&mut self) -> Result, InodeError> { + if self.idx >= self.end_idx { + trace!("readdir iter exhausted: {}, {}", self.idx, self.end_idx); + return Ok(None); + } + + match &self.manifest.inner[self.idx] { + entry @ ManifestEntry::File { full_key, .. } => { + trace!("found a file entry in the manifest: {}", full_key); + self.idx = self.idx + 1; + Ok(Some(entry.clone())) + } + entry @ ManifestEntry::Directory { + full_key, + total_entries, + } => { + trace!( + "found a directory entry in the manifest: {}, {}", + full_key, + total_entries + ); + self.idx = self.idx + total_entries; + Ok(Some(entry.clone())) + } + } + } +} + pub fn dummy_manifest() -> Arc> { // contract: - // - all entries should start with '/' (may be simplified? but keeping for now) - // - all directory entries [apart from the root] should also end with '/' + // - all directory entries [apart from the root] should end with '/' // - file entries never end with '/' // - directories are followed by files and subdirectories contained in them (matches lexicographical order?) // - directory entries have a counter of total (calculated recursively) number of entries contained in them Arc::new(vec![ ManifestEntry::Directory { - full_key: "/".to_owned(), + full_key: "".to_owned(), total_entries: 7, }, ManifestEntry::Directory { - full_key: "/1.9.1/".to_owned(), + full_key: "1.9.1/".to_owned(), total_entries: 6, }, ManifestEntry::Directory { - full_key: "/1.9.1/arm64/".to_owned(), + full_key: "1.9.1/arm64/".to_owned(), total_entries: 2, }, ManifestEntry::File { - full_key: "/1.9.1/arm64/mount-s3-1.9.1-arm64.rpm".to_owned(), + full_key: "1.9.1/arm64/mount-s3-1.9.1-arm64.rpm".to_owned(), etag: "\"f7183d9e02960286692ea6521665aa89\"".to_owned(), size: 11844684, }, ManifestEntry::File { - full_key: "/1.9.1/checksum-1.9.1".to_owned(), + full_key: "1.9.1/checksum-1.9.1".to_owned(), etag: "\"f423fc83d1fda1fdd2887c7e2122ad05\"".to_owned(), size: 10, }, ManifestEntry::Directory { - full_key: "/1.9.1/x86_64/".to_owned(), + full_key: "1.9.1/x86_64/".to_owned(), total_entries: 2, }, ManifestEntry::File { - full_key: "/1.9.1/x86_64/mount-s3-1.9.1-x86_64.deb".to_owned(), + full_key: "1.9.1/x86_64/mount-s3-1.9.1-x86_64.deb".to_owned(), etag: "\"e4930b1bfe7e10de29c863d1b69f444e\"".to_owned(), size: 10944866, }, diff --git a/mountpoint-s3/src/superblock.rs b/mountpoint-s3/src/superblock.rs index d17f69761..658cbdb1f 100644 --- a/mountpoint-s3/src/superblock.rs +++ b/mountpoint-s3/src/superblock.rs @@ -38,12 +38,12 @@ use tracing::{debug, error, trace, warn}; use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT}; use crate::fs::CacheConfig; -use crate::logging; -use crate::manifest::{dummy_manifest, ManifestEntry}; +use crate::manifest::{Manifest, ManifestEntry}; use crate::prefix::Prefix; use crate::s3::S3Personality; use crate::sync::atomic::{AtomicU64, Ordering}; use crate::sync::{Arc, RwLock}; +use crate::{logging, manifest}; mod expiry; use expiry::Expiry; @@ -74,7 +74,7 @@ struct SuperblockInner { next_ino: AtomicU64, mount_time: OffsetDateTime, config: SuperblockConfig, - manifest: Option>>, + manifest: Option, } /// Configuration for superblock operations @@ -100,7 +100,7 @@ impl Superblock { ); let manifest = if config.use_manifest { - Some(dummy_manifest()) + Some(Manifest::new()) } else { None }; @@ -612,10 +612,8 @@ impl SuperblockInner { let lookup = match lookup { Some(lookup) => lookup?, None => { - let remote = if self.config.use_manifest { - let parent = self.get(parent_ino)?; - let parent_full_path = self.full_key_for_inode(&parent); - self.manifest_lookup(parent, parent_full_path, name)? + let remote = if let Some(manifest) = &self.manifest { + self.manifest_lookup(manifest, parent_ino, name)? } else { self.remote_lookup(client, parent_ino, name).await? }; @@ -674,57 +672,34 @@ impl SuperblockInner { lookup } + /// Lookup in the [Manifest] and convert the entry to [RemoteLookup] fn manifest_lookup( &self, - parent: Inode, - parent_full_path: String, + manifest: &Manifest, + parent_ino: InodeNo, name: &str, ) -> Result, InodeError> { - if parent.kind() != InodeKind::Directory { - return Err(InodeError::NotADirectory(parent.err())); - } - - let mut full_path = parent_full_path; - assert!(full_path.is_empty() || full_path.ends_with('/')); - full_path.push_str(name); - - let mut full_path_suffixed = full_path.clone(); - full_path_suffixed.push('/'); - - // this should be a bin search through a file stored on disk - fn search_manifest_entry<'a>(manifest: &'a [ManifestEntry], full_path: &str) -> Option<&'a ManifestEntry> { - manifest - .binary_search_by(|manifest_entry| manifest_entry.key().cmp(full_path)) - .map_or(None, |index| Some(&manifest[index])) - } - - // search for file entry - let mut manifest_entry = search_manifest_entry(self.manifest.as_ref().unwrap(), &full_path); - - // search for dir entry - if manifest_entry.is_none() { - manifest_entry = search_manifest_entry(self.manifest.as_ref().unwrap(), &full_path_suffixed); - } - - // return an inode or error - match manifest_entry { - Some(ManifestEntry::File { etag, size, .. }) => Ok(Some(RemoteLookup { + let parent = self.get(parent_ino)?; + let parent_full_path = self.full_key_for_inode(&parent); + let mount_time = OffsetDateTime::now_utc(); // todo: mount time + let remote_lookup = match manifest.manifest_lookup(parent, parent_full_path, name)? { + ManifestEntry::File { etag, size, .. } => RemoteLookup { kind: InodeKind::File, stat: InodeStat::for_file( - *size, - OffsetDateTime::now_utc(), - Some(etag.clone()), + size, + mount_time, + Some(etag), None, None, self.config.cache_config.file_ttl, ), - })), - Some(ManifestEntry::Directory { .. }) => Ok(Some(RemoteLookup { + }, + ManifestEntry::Directory { .. } => RemoteLookup { kind: InodeKind::Directory, - stat: InodeStat::for_directory(OffsetDateTime::now_utc(), self.config.cache_config.dir_ttl), - })), - None => Err(InodeError::FileDoesNotExist(name.to_owned(), parent.err())), - } + stat: InodeStat::for_directory(mount_time, self.config.cache_config.dir_ttl), + }, + }; + Ok(Some(remote_lookup)) } /// Lookup an inode in the parent directory with the given name diff --git a/mountpoint-s3/src/superblock/readdir.rs b/mountpoint-s3/src/superblock/readdir.rs index 609d37c88..892ac40aa 100644 --- a/mountpoint-s3/src/superblock/readdir.rs +++ b/mountpoint-s3/src/superblock/readdir.rs @@ -44,7 +44,7 @@ use std::cmp::Ordering; use std::collections::VecDeque; -use crate::manifest::ManifestEntry; +use crate::manifest::{Manifest, ManifestEntry}; use mountpoint_s3_client::types::ObjectInfo; use mountpoint_s3_client::ObjectClient; use tracing::{error, trace, warn}; @@ -310,8 +310,11 @@ impl ReaddirIter { Self::Unordered(unordered::ReaddirIter::new(bucket, full_path, page_size, local_entries)) } - fn manifest(manifest: Arc>, bucket: &str, full_path: &str) -> Result { - Ok(Self::Manifest(manifest::ReaddirIter::new(manifest, bucket, full_path)?)) + fn manifest(manifest: Manifest, bucket: &str, full_path: &str) -> Result { + Ok(Self::Manifest(manifest::ReaddirIter::new( + manifest.iter(bucket, full_path)?, + full_path.len(), + ))) } async fn next(&mut self, client: &impl ObjectClient) -> Result, InodeError> { @@ -592,92 +595,49 @@ mod unordered { mod manifest { use time::OffsetDateTime; + use crate::manifest::ManifestIter; + use super::*; + /// Adaptor for [ManifestIter], converts [ManifestEntry] to [ReaddirEntry] #[derive(Debug)] pub struct ReaddirIter { - manifest: Arc>, - bucket: String, - full_path: String, - idx: usize, - end_idx: usize, + manifest_iter: ManifestIter, + full_path_len: usize, } impl ReaddirIter { - /// Locate the index of the directory in the manifest and create an iterator - pub(super) fn new( - manifest: Arc>, - bucket: &str, - full_path: &str, - ) -> Result { - let full_path = if full_path.starts_with("/") { - full_path.to_owned() - } else { - format!("/{full_path}") - }; - trace!("searching for an entry in manifest: {}", full_path); - let idx = manifest - .binary_search_by(|manifest_entry| manifest_entry.key().cmp(&full_path)) - .inspect_err(|_| error!("entry not found in the manifest: {}", full_path)) - .map_err(|_| InodeError::InodeDoesNotExist(0))?; // TODO: add InodeError::BadManifest - trace!("found an entry in manifest: {}", full_path); - let end_idx = match manifest[idx] { - ManifestEntry::Directory { total_entries, .. } => Ok(idx + total_entries), - _ => { - error!("manifest entry is not a directory: {}", full_path); - Err(InodeError::InodeDoesNotExist(0)) // TODO: add InodeError::BadManifest - } - }?; - trace!("initializing readdir iter with indices: {}, {}", idx + 1, end_idx); - - Ok(Self { - manifest, - bucket: bucket.to_owned(), - full_path, - idx: idx + 1, // skip the directory entry itself - end_idx, - }) + pub(super) fn new(manifest_iter: ManifestIter, full_path_len: usize) -> Self { + Self { + manifest_iter, + full_path_len, + } } - /// Iterate over the manifest entries, skipping subdirectories + /// Return the next [ReaddirEntry] for the directory stream. If the stream is finished, returns + /// `Ok(None)`. pub(super) fn next(&mut self) -> Result, InodeError> { - if self.idx >= self.end_idx { - trace!("readdir iter exhausted: {}, {}", self.idx, self.end_idx); - return Ok(None); - } - - let readdir_entry = match &self.manifest[self.idx] { - ManifestEntry::File { full_key, etag, size } => { - trace!("found a file entry in the manifest: {}", full_key); - self.idx = self.idx + 1; - let name = full_key[self.full_path.len()..full_key.len()].to_owned(); + let readdir_entry = match self.manifest_iter.next()? { + Some(ManifestEntry::File { full_key, etag, size }) => { + let name = full_key[self.full_path_len..].to_owned(); let object_info = ObjectInfo { key: full_key.clone(), - size: *size as u64, + size: size as u64, last_modified: OffsetDateTime::now_utc(), storage_class: None, restore_status: None, etag: etag.clone(), checksum_algorithms: Default::default(), // TODO: what are the implications? }; - ReaddirEntry::RemoteObject { name, object_info } + Some(ReaddirEntry::RemoteObject { name, object_info }) } - ManifestEntry::Directory { - full_key, - total_entries, - } => { - trace!( - "found a directory entry in the manifest: {}, {}", - full_key, - total_entries - ); - self.idx = self.idx + total_entries; - let name = full_key[self.full_path.len()..full_key.len() - 1].to_owned(); - ReaddirEntry::RemotePrefix { name } + Some(ManifestEntry::Directory { full_key, .. }) => { + let name = full_key[self.full_path_len..full_key.len() - 1].to_owned(); + Some(ReaddirEntry::RemotePrefix { name }) } + None => None, }; - - Ok(Some(readdir_entry)) + Ok(readdir_entry) } } } From 57a1ec5de10bd9aaca23222c5f01bd48a0aef550 Mon Sep 17 00:00:00 2001 From: Vlad Volodkin Date: Wed, 12 Mar 2025 16:40:51 +0000 Subject: [PATCH 4/5] Lookup in sqllite database Signed-off-by: Vlad Volodkin --- Cargo.lock | 47 +++++++++ mountpoint-s3/Cargo.toml | 1 + mountpoint-s3/src/manifest.rs | 174 +++++++++++--------------------- mountpoint-s3/src/superblock.rs | 2 +- 4 files changed, 110 insertions(+), 114 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6e9acfc39..ec2bf737d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1548,6 +1548,18 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "2.3.0" @@ -1885,6 +1897,15 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hashlink" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" +dependencies = [ + "hashbrown 0.15.2", +] + [[package]] name = "hdrhistogram" version = "7.5.4" @@ -2433,6 +2454,17 @@ dependencies = [ "redox_syscall", ] +[[package]] +name = "libsqlite3-sys" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbb8270bb4060bd76c6e96f20c52d80620f1d82a3470885694e41e0f81ef6fe7" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "libz-sys" version = "1.1.21" @@ -2603,6 +2635,7 @@ dependencies = [ "rand", "rand_chacha", "regex", + "rusqlite", "serde", "serde_json", "serial_test", @@ -3365,6 +3398,20 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rusqlite" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e34486da88d8e051c7c0e23c3f15fd806ea8546260aa2fec247e97242ec143" +dependencies = [ + "bitflags", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustc-demangle" version = "0.1.24" diff --git a/mountpoint-s3/Cargo.toml b/mountpoint-s3/Cargo.toml index f2e95652d..2af4e36d2 100644 --- a/mountpoint-s3/Cargo.toml +++ b/mountpoint-s3/Cargo.toml @@ -46,6 +46,7 @@ time = { version = "0.3.37", features = ["macros", "formatting"] } tracing = { version = "0.1.41", features = ["log"] } tracing-log = "0.2.0" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } +rusqlite = { version = "0.34.0", features = ["bundled"] } [target.'cfg(target_os = "linux")'.dependencies] procfs = { version = "0.17.0", default-features = false } diff --git a/mountpoint-s3/src/manifest.rs b/mountpoint-s3/src/manifest.rs index ed0e0ce5a..48a2cde63 100644 --- a/mountpoint-s3/src/manifest.rs +++ b/mountpoint-s3/src/manifest.rs @@ -1,5 +1,7 @@ -use std::sync::Arc; +use crate::sync::{Arc, Mutex}; +use std::time::Instant; +use rusqlite::Connection; use tracing::{error, trace}; use crate::superblock::{Inode, InodeError, InodeKind}; @@ -14,7 +16,6 @@ pub enum ManifestEntry { }, Directory { full_key: String, // let's assume it always ends with '/' - total_entries: usize, }, } @@ -30,14 +31,16 @@ impl ManifestEntry { /// Manifest of all available objects in the bucket #[derive(Debug, Clone)] pub struct Manifest { - inner: Arc>, + conn: Arc>, } impl Manifest { - pub fn new() -> Self { - Self { - inner: dummy_manifest(), - } + pub fn new() -> Result { + let db_path = "./s3_objects.db3"; + let conn = Connection::open(db_path)?; // TODO: ManifestError::DbError + Ok(Self { + conn: Arc::new(Mutex::new(conn)), // TODO: no mutex? serialized mode of sqlite? + }) } /// Lookup an entry in the manifest, the result may be a file or a directory @@ -56,24 +59,13 @@ impl Manifest { let mut full_path = parent_full_path; full_path.push_str(name); - let mut full_path_suffixed = full_path.clone(); - full_path_suffixed.push('/'); - - // this should be a bin search through a file stored on disk - fn search_manifest_entry<'a>(manifest: &'a [ManifestEntry], full_path: &str) -> Option<&'a ManifestEntry> { - trace!("searching for {}", full_path); - manifest - .binary_search_by(|manifest_entry| manifest_entry.key().cmp(full_path)) - .map_or(None, |index| Some(&manifest[index])) - } - - // search for file entry - let mut manifest_entry = search_manifest_entry(&self.inner, &full_path); - - // search for dir entry - if manifest_entry.is_none() { - manifest_entry = search_manifest_entry(&self.inner, &full_path_suffixed); - } + // search for an entry + let start = Instant::now(); + let manifest_entry = self + .search_manifest_entry(&full_path) + .inspect_err(|err| error!("failed to query the database: {}", err)) + .map_err(|_| InodeError::InodeDoesNotExist(0))?; // TODO: ManifestError::DbError + trace!("db search completed in {:?}", start.elapsed()); // return an inode or error match manifest_entry { @@ -86,110 +78,66 @@ impl Manifest { pub fn iter(&self, bucket: &str, directory_full_path: &str) -> Result { ManifestIter::new(self.clone(), bucket, directory_full_path) } + + fn search_manifest_entry(&self, full_path: &str) -> Result, rusqlite::Error> { + let dir_search_start = format!("{full_path}/"); + let dir_search_end = format!("{full_path}0"); // any child of [full_path] directory will have a key which is "less" than this + let file_search = full_path; + + let query = "SELECT key, etag, size FROM s3_objects where (key >= ?1 and key < ?2) or key = ?3 LIMIT 1"; + let conn = self.conn.lock().expect("lock must succeed"); + let mut stmt = conn.prepare(query)?; + let manifest_entry = stmt + .query_map((dir_search_start, dir_search_end, file_search), |row| { + let found_key: String = row.get(0)?; + trace!( + "found entry in the manifest: {}, searched for: {}", + found_key, + full_path + ); + + let entry = if found_key == full_path { + // exact match means this is a file + ManifestEntry::File { + full_key: found_key, + etag: row.get(1)?, + size: row.get(2)?, + } + } else if found_key.starts_with(full_path) { + // partial match means this is a directory + ManifestEntry::Directory { full_key: found_key } + } else { + panic!("got non-matching row: {}, searched: {}", found_key, full_path); + }; + + Ok(entry) + })? + .next(); + + manifest_entry.map_or(Ok(None), |v| v.map(Some)) + } } #[derive(Debug)] pub struct ManifestIter { manifest: Manifest, - bucket: String, - idx: usize, - end_idx: usize, + search_from_key: String, } impl ManifestIter { /// Locate the index of the directory in the manifest and create an iterator - fn new(manifest: Manifest, bucket: &str, full_path: &str) -> Result { - let full_path = full_path.to_owned(); - trace!("searching for an entry in manifest: {}", full_path); - let idx = manifest - .inner - .binary_search_by(|manifest_entry| manifest_entry.key().cmp(&full_path)) - .inspect_err(|_| error!("entry not found in the manifest: {}", full_path)) - .map_err(|_| InodeError::InodeDoesNotExist(0))?; // TODO: add InodeError::BadManifest - trace!("found an entry in manifest: {}", full_path); - let end_idx = match manifest.inner[idx] { - ManifestEntry::Directory { total_entries, .. } => Ok(idx + total_entries), - _ => { - error!("manifest entry is not a directory: {}", full_path); - Err(InodeError::InodeDoesNotExist(0)) // TODO: add InodeError::BadManifest - } - }?; - trace!("initializing readdir iter with indices: {}, {}", idx + 1, end_idx); + fn new(manifest: Manifest, bucket: &str, full_key: &str) -> Result { + let full_key = full_key.to_owned(); + trace!("searching for an entry in the manifest starting with: {}", full_key); Ok(Self { manifest, - bucket: bucket.to_owned(), - idx: idx + 1, // skip the directory entry itself - end_idx, + search_from_key: full_key, }) } /// Iterate over the manifest entries, skipping subdirectories pub fn next(&mut self) -> Result, InodeError> { - if self.idx >= self.end_idx { - trace!("readdir iter exhausted: {}, {}", self.idx, self.end_idx); - return Ok(None); - } - - match &self.manifest.inner[self.idx] { - entry @ ManifestEntry::File { full_key, .. } => { - trace!("found a file entry in the manifest: {}", full_key); - self.idx = self.idx + 1; - Ok(Some(entry.clone())) - } - entry @ ManifestEntry::Directory { - full_key, - total_entries, - } => { - trace!( - "found a directory entry in the manifest: {}, {}", - full_key, - total_entries - ); - self.idx = self.idx + total_entries; - Ok(Some(entry.clone())) - } - } + Ok(None) } } - -pub fn dummy_manifest() -> Arc> { - // contract: - // - all directory entries [apart from the root] should end with '/' - // - file entries never end with '/' - // - directories are followed by files and subdirectories contained in them (matches lexicographical order?) - // - directory entries have a counter of total (calculated recursively) number of entries contained in them - Arc::new(vec![ - ManifestEntry::Directory { - full_key: "".to_owned(), - total_entries: 7, - }, - ManifestEntry::Directory { - full_key: "1.9.1/".to_owned(), - total_entries: 6, - }, - ManifestEntry::Directory { - full_key: "1.9.1/arm64/".to_owned(), - total_entries: 2, - }, - ManifestEntry::File { - full_key: "1.9.1/arm64/mount-s3-1.9.1-arm64.rpm".to_owned(), - etag: "\"f7183d9e02960286692ea6521665aa89\"".to_owned(), - size: 11844684, - }, - ManifestEntry::File { - full_key: "1.9.1/checksum-1.9.1".to_owned(), - etag: "\"f423fc83d1fda1fdd2887c7e2122ad05\"".to_owned(), - size: 10, - }, - ManifestEntry::Directory { - full_key: "1.9.1/x86_64/".to_owned(), - total_entries: 2, - }, - ManifestEntry::File { - full_key: "1.9.1/x86_64/mount-s3-1.9.1-x86_64.deb".to_owned(), - etag: "\"e4930b1bfe7e10de29c863d1b69f444e\"".to_owned(), - size: 10944866, - }, - ]) -} diff --git a/mountpoint-s3/src/superblock.rs b/mountpoint-s3/src/superblock.rs index 658cbdb1f..393f8792e 100644 --- a/mountpoint-s3/src/superblock.rs +++ b/mountpoint-s3/src/superblock.rs @@ -100,7 +100,7 @@ impl Superblock { ); let manifest = if config.use_manifest { - Some(Manifest::new()) + Some(Manifest::new().expect("manifest must be created")) } else { None }; From 81b55caf4279cc0b12c55ec1a8b5ed253819992c Mon Sep 17 00:00:00 2001 From: Vlad Volodkin Date: Thu, 13 Mar 2025 16:23:58 +0000 Subject: [PATCH 5/5] Readdir via sqlite database Signed-off-by: Vlad Volodkin --- mountpoint-s3/src/manifest.rs | 230 +++++++++++++++++++----- mountpoint-s3/src/superblock.rs | 2 +- mountpoint-s3/src/superblock/readdir.rs | 4 +- 3 files changed, 188 insertions(+), 48 deletions(-) diff --git a/mountpoint-s3/src/manifest.rs b/mountpoint-s3/src/manifest.rs index 48a2cde63..6df5c19da 100644 --- a/mountpoint-s3/src/manifest.rs +++ b/mountpoint-s3/src/manifest.rs @@ -1,4 +1,5 @@ use crate::sync::{Arc, Mutex}; +use std::collections::VecDeque; use std::time::Instant; use rusqlite::Connection; @@ -20,27 +21,29 @@ pub enum ManifestEntry { } impl ManifestEntry { - pub fn key(&self) -> &str { - match self { - ManifestEntry::Directory { full_key, .. } => full_key.as_str(), - ManifestEntry::File { full_key, .. } => full_key.as_str(), + fn file(db_entry: DbEntry) -> Self { + ManifestEntry::File { + full_key: db_entry.full_key, + etag: db_entry.etag, + size: db_entry.size, } } + + fn directory(full_key: String) -> Self { + ManifestEntry::Directory { full_key } + } } /// Manifest of all available objects in the bucket -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Manifest { - conn: Arc>, + db: Db, } impl Manifest { pub fn new() -> Result { - let db_path = "./s3_objects.db3"; - let conn = Connection::open(db_path)?; // TODO: ManifestError::DbError - Ok(Self { - conn: Arc::new(Mutex::new(conn)), // TODO: no mutex? serialized mode of sqlite? - }) + let db = Db::new()?; + Ok(Self { db }) } /// Lookup an entry in the manifest, the result may be a file or a directory @@ -65,7 +68,7 @@ impl Manifest { .search_manifest_entry(&full_path) .inspect_err(|err| error!("failed to query the database: {}", err)) .map_err(|_| InodeError::InodeDoesNotExist(0))?; // TODO: ManifestError::DbError - trace!("db search completed in {:?}", start.elapsed()); + trace!("lookup db search completed in {:?}", start.elapsed()); // return an inode or error match manifest_entry { @@ -76,68 +79,205 @@ impl Manifest { /// Create an iterator over directory's direct children pub fn iter(&self, bucket: &str, directory_full_path: &str) -> Result { - ManifestIter::new(self.clone(), bucket, directory_full_path) + ManifestIter::new(self.db.clone(), bucket, directory_full_path) } + /// Search an entry in the manifest that matches the path, a partial match is expected for a directory fn search_manifest_entry(&self, full_path: &str) -> Result, rusqlite::Error> { let dir_search_start = format!("{full_path}/"); let dir_search_end = format!("{full_path}0"); // any child of [full_path] directory will have a key which is "less" than this let file_search = full_path; - let query = "SELECT key, etag, size FROM s3_objects where (key >= ?1 and key < ?2) or key = ?3 LIMIT 1"; + let db_entry = self + .db + .select_entry_or_child(&dir_search_start, &dir_search_end, file_search)?; + let Some(db_entry) = db_entry else { + return Ok(None); + }; + + trace!( + "found entry in the manifest: {}, searched for: {}", + db_entry.full_key, + full_path + ); + + let entry = if db_entry.full_key == full_path { + // exact match means this is a file + ManifestEntry::file(db_entry) + } else if db_entry.full_key.starts_with(full_path) { + // partial match means this is a directory + ManifestEntry::directory(full_path.to_owned()) + } else { + panic!("got non-matching row: {}, searched: {}", db_entry.full_key, full_path); + }; + + Ok(Some(entry)) + } +} + +#[derive(Debug)] +struct DbEntry { + full_key: String, + etag: String, + size: usize, +} + +impl DbEntry { + fn from(row: &rusqlite::Row) -> Result { + Ok(Self { + full_key: row.get(0)?, + etag: row.get(1)?, + size: row.get(2)?, + }) + } +} + +#[derive(Debug, Clone)] +struct Db { + conn: Arc>, +} + +impl Db { + fn new() -> Result { + let db_path = "./s3_objects.db3"; + let conn = Connection::open(db_path)?; + Ok(Self { + conn: Arc::new(Mutex::new(conn)), // TODO: no mutex? serialized mode of sqlite? + }) + } + + fn select_entry_or_child( + &self, + dir_search_start: &str, + dir_search_end: &str, + file_search: &str, + ) -> Result, rusqlite::Error> { + let query = "SELECT key, etag, size FROM s3_objects where (key > ?1 and key < ?2) or key = ?3 LIMIT 1"; let conn = self.conn.lock().expect("lock must succeed"); let mut stmt = conn.prepare(query)?; let manifest_entry = stmt .query_map((dir_search_start, dir_search_end, file_search), |row| { - let found_key: String = row.get(0)?; - trace!( - "found entry in the manifest: {}, searched for: {}", - found_key, - full_path - ); - - let entry = if found_key == full_path { - // exact match means this is a file - ManifestEntry::File { - full_key: found_key, - etag: row.get(1)?, - size: row.get(2)?, - } - } else if found_key.starts_with(full_path) { - // partial match means this is a directory - ManifestEntry::Directory { full_key: found_key } - } else { - panic!("got non-matching row: {}, searched: {}", found_key, full_path); - }; - - Ok(entry) + DbEntry::from(row) })? .next(); manifest_entry.map_or(Ok(None), |v| v.map(Some)) } + + fn select_children( + &self, + dir_search_start: &str, + dir_search_end: Option<&str>, + batch_size: usize, + ) -> Result, rusqlite::Error> { + let conn = self.conn.lock().expect("lock must succeed"); + if let Some(dir_search_end) = dir_search_end { + let query = "SELECT key, etag, size FROM s3_objects where key > ?1 and key < ?2 ORDER BY key LIMIT ?3"; + let query_params = (dir_search_start, dir_search_end, batch_size); + let mut stmt = conn.prepare(query)?; + let result: Result, _> = stmt.query_map(query_params, |row| DbEntry::from(row))?.collect(); + result + } else { + let query = "SELECT key, etag, size FROM s3_objects where key > ?1 ORDER BY key LIMIT ?2"; + let query_params = (dir_search_start, batch_size); + let mut stmt = conn.prepare(query)?; + let result: Result, _> = stmt.query_map(query_params, |row| DbEntry::from(row))?.collect(); + result + } + } } #[derive(Debug)] pub struct ManifestIter { - manifest: Manifest, + db: Db, + /// Prepared entries in order to be returned by the iterator. + entries: VecDeque, + /// Key of the directory being listed by this iterator + parent_key: String, + /// Next key to search for in the database search_from_key: String, + /// Name of the last subdirectory pushed to self.entries, used for deduplication + last_subdir_name: Option, + /// Max amount of entries to read from the database at once + batch_size: usize, + /// Database has no more entries + finished: bool, } impl ManifestIter { - /// Locate the index of the directory in the manifest and create an iterator - fn new(manifest: Manifest, bucket: &str, full_key: &str) -> Result { - let full_key = full_key.to_owned(); - trace!("searching for an entry in the manifest starting with: {}", full_key); + fn new(db: Db, _bucket: &str, parent_key: &str) -> Result { + let parent_key = parent_key.to_owned(); + let batch_size = 1000; + let search_from_key = parent_key.clone(); Ok(Self { - manifest, - search_from_key: full_key, + db, + entries: Default::default(), + parent_key, + search_from_key, + last_subdir_name: None, + batch_size, + finished: false, }) } - /// Iterate over the manifest entries, skipping subdirectories + /// Next child of the directory pub fn next(&mut self) -> Result, InodeError> { - Ok(None) + if self.entries.is_empty() { + self.search_next_entries() + .inspect_err(|err| error!("failed to query the database: {}", err)) + .map_err(|_| InodeError::InodeDoesNotExist(0))?; // TODO: ManifestError::DbError + } + + Ok(self.entries.pop_front()) + } + + /// Load next batch of entries from the database, inferring subdirectories and filtering out ancestors of those + fn search_next_entries(&mut self) -> Result<(), rusqlite::Error> { + let dir_search_end = if self.parent_key.is_empty() { + None + } else { + let mut dir_search_end = self.parent_key[..self.parent_key.len() - 1].to_owned(); + dir_search_end.push('0'); // any child of [self.parent_key] directory will have a key which is "less" than this + Some(dir_search_end) + }; + + // Given that we filter loaded entries, we may need multiple requests to the db + while self.entries.is_empty() && !self.finished { + let start = Instant::now(); + let db_entries = + self.db + .select_children(&self.search_from_key, dir_search_end.as_deref(), self.batch_size)?; + trace!("list db search completed in {:?}", start.elapsed()); + + if db_entries.len() < self.batch_size { + self.finished = true; + } + + if let Some(last_entry) = db_entries.last() { + self.search_from_key = last_entry.full_key.clone(); + } + + for db_entry in db_entries { + let relative_key = &db_entry.full_key[self.parent_key.len()..]; + let components: Vec<&str> = relative_key.split('/').collect(); // todo: handle "//" and other weird names? empty? + let first_path_component = components[0]; + let manifest_entry = if components.len() == 1 { + // this file is a direct child of the listed directory + ManifestEntry::file(db_entry) + } else if self.last_subdir_name.as_deref() != Some(first_path_component) { + // infer a subdirectory, discarding the irrelevant part of the path + self.last_subdir_name = Some(first_path_component.to_owned()); + let subdir_full_key = format!("{}{}/", self.parent_key, first_path_component); + ManifestEntry::directory(subdir_full_key) + } else { + // skipping subdirectory which was already pushed to self.entries + continue; + }; + self.entries.push_back(manifest_entry); + } + } + + Ok(()) } } diff --git a/mountpoint-s3/src/superblock.rs b/mountpoint-s3/src/superblock.rs index 393f8792e..35f928375 100644 --- a/mountpoint-s3/src/superblock.rs +++ b/mountpoint-s3/src/superblock.rs @@ -38,12 +38,12 @@ use tracing::{debug, error, trace, warn}; use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT}; use crate::fs::CacheConfig; +use crate::logging; use crate::manifest::{Manifest, ManifestEntry}; use crate::prefix::Prefix; use crate::s3::S3Personality; use crate::sync::atomic::{AtomicU64, Ordering}; use crate::sync::{Arc, RwLock}; -use crate::{logging, manifest}; mod expiry; use expiry::Expiry; diff --git a/mountpoint-s3/src/superblock/readdir.rs b/mountpoint-s3/src/superblock/readdir.rs index 892ac40aa..7208aaeae 100644 --- a/mountpoint-s3/src/superblock/readdir.rs +++ b/mountpoint-s3/src/superblock/readdir.rs @@ -103,7 +103,7 @@ impl ReaddirHandle { let iter = if inner.config.use_manifest { trace!("using manifest readdir iter"); ReaddirIter::manifest( - inner.manifest.clone().expect("manifest should be set"), + inner.manifest.as_ref().expect("manifest should be set"), &inner.bucket, &full_path, )? @@ -310,7 +310,7 @@ impl ReaddirIter { Self::Unordered(unordered::ReaddirIter::new(bucket, full_path, page_size, local_entries)) } - fn manifest(manifest: Manifest, bucket: &str, full_path: &str) -> Result { + fn manifest(manifest: &Manifest, bucket: &str, full_path: &str) -> Result { Ok(Self::Manifest(manifest::ReaddirIter::new( manifest.iter(bucket, full_path)?, full_path.len(),