diff --git a/Cargo.lock b/Cargo.lock
index 6e9acfc39..ec2bf737d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1548,6 +1548,18 @@ dependencies = [
  "pin-project-lite",
 ]
 
+[[package]]
+name = "fallible-iterator"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
+
+[[package]]
+name = "fallible-streaming-iterator"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
+
 [[package]]
 name = "fastrand"
 version = "2.3.0"
@@ -1885,6 +1897,15 @@ dependencies = [
  "foldhash",
 ]
 
+[[package]]
+name = "hashlink"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
+dependencies = [
+ "hashbrown 0.15.2",
+]
+
 [[package]]
 name = "hdrhistogram"
 version = "7.5.4"
@@ -2433,6 +2454,17 @@ dependencies = [
  "redox_syscall",
 ]
 
+[[package]]
+name = "libsqlite3-sys"
+version = "0.32.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fbb8270bb4060bd76c6e96f20c52d80620f1d82a3470885694e41e0f81ef6fe7"
+dependencies = [
+ "cc",
+ "pkg-config",
+ "vcpkg",
+]
+
 [[package]]
 name = "libz-sys"
 version = "1.1.21"
@@ -2603,6 +2635,7 @@ dependencies = [
  "rand",
  "rand_chacha",
  "regex",
+ "rusqlite",
  "serde",
  "serde_json",
  "serial_test",
@@ -3365,6 +3398,20 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "rusqlite"
+version = "0.34.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "37e34486da88d8e051c7c0e23c3f15fd806ea8546260aa2fec247e97242ec143"
+dependencies = [
+ "bitflags",
+ "fallible-iterator",
+ "fallible-streaming-iterator",
+ "hashlink",
+ "libsqlite3-sys",
+ "smallvec",
+]
+
 [[package]]
 name = "rustc-demangle"
 version = "0.1.24"
diff --git a/mountpoint-s3-client/src/object_client.rs b/mountpoint-s3-client/src/object_client.rs
index 9108361ca..8ac78ea7f 100644
--- a/mountpoint-s3-client/src/object_client.rs
+++ b/mountpoint-s3-client/src/object_client.rs
@@ -739,7 +739,8 @@ pub enum RestoreStatus {
 /// See [Object](https://docs.aws.amazon.com/AmazonS3/latest/API/API_Object.html) in the *Amazon S3
 /// API Reference* for more details.
 #[derive(Debug, Clone)]
-#[non_exhaustive]
+// TODO: builder pattern? return different type from readdir iter?
+// #[non_exhaustive]
 pub struct ObjectInfo {
     /// Key for this object.
     pub key: String,
diff --git a/mountpoint-s3/Cargo.toml b/mountpoint-s3/Cargo.toml
index f2e95652d..2af4e36d2 100644
--- a/mountpoint-s3/Cargo.toml
+++ b/mountpoint-s3/Cargo.toml
@@ -46,6 +46,7 @@ time = { version = "0.3.37", features = ["macros", "formatting"] }
 tracing = { version = "0.1.41", features = ["log"] }
 tracing-log = "0.2.0"
 tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
+rusqlite = { version = "0.34.0", features = ["bundled"] }
 
 [target.'cfg(target_os = "linux")'.dependencies]
 procfs = { version = "0.17.0", default-features = false }
diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs
index eb7ee4da5..c648ab75f 100644
--- a/mountpoint-s3/src/fs.rs
+++ b/mountpoint-s3/src/fs.rs
@@ -163,6 +163,7 @@ where
         let superblock_config = SuperblockConfig {
             cache_config: config.cache_config.clone(),
             s3_personality: config.s3_personality,
+            use_manifest: true,
         };
         let superblock = Superblock::new(bucket, prefix, superblock_config);
         let mem_limiter = Arc::new(MemoryLimiter::new(client.clone(), config.mem_limit));
diff --git a/mountpoint-s3/src/lib.rs b/mountpoint-s3/src/lib.rs
index 1c6251fa2..9f3f7f0e1 100644
--- a/mountpoint-s3/src/lib.rs
+++ b/mountpoint-s3/src/lib.rs
@@ -7,6 +7,7 @@ pub mod data_cache;
 pub mod fs;
 pub mod fuse;
 pub mod logging;
+mod manifest;
 pub mod mem_limiter;
 pub mod metrics;
 pub mod object;
diff --git a/mountpoint-s3/src/manifest.rs b/mountpoint-s3/src/manifest.rs
new file mode 100644
index 000000000..6df5c19da
--- /dev/null
+++ b/mountpoint-s3/src/manifest.rs
@@ -0,0 +1,283 @@
+use crate::sync::{Arc, Mutex};
+use std::collections::VecDeque;
+use std::time::Instant;
+
+use rusqlite::Connection;
+use tracing::{error, trace};
+
+use crate::superblock::{Inode, InodeError, InodeKind};
+
+/// An entry returned by manifest_lookup() and ManifestIter::next()
+#[derive(Debug, Clone)]
+pub enum ManifestEntry {
+    File {
+        full_key: String,
+        etag: String,
+        size: usize,
+    },
+    Directory {
+        full_key: String, // let's assume it always ends with '/'
+    },
+}
+
+impl ManifestEntry {
+    fn file(db_entry: DbEntry) -> Self {
+        ManifestEntry::File {
+            full_key: db_entry.full_key,
+            etag: db_entry.etag,
+            size: db_entry.size,
+        }
+    }
+
+    fn directory(full_key: String) -> Self {
+        ManifestEntry::Directory { full_key }
+    }
+}
+
+/// Manifest of all available objects in the bucket
+#[derive(Debug)]
+pub struct Manifest {
+    db: Db,
+}
+
+impl Manifest {
+    pub fn new() -> Result<Self, rusqlite::Error> {
+        let db = Db::new()?;
+        Ok(Self { db })
+    }
+
+    /// Look up an entry in the manifest; the result may be a file or a directory
+    pub fn manifest_lookup(
+        &self,
+        parent: Inode,
+        parent_full_path: String,
+        name: &str,
+    ) -> Result<ManifestEntry, InodeError> {
+        trace!("using manifest to lookup {} in {}", name, parent_full_path);
+
+        if parent.kind() != InodeKind::Directory {
+            return Err(InodeError::NotADirectory(parent.err()));
+        }
+
+        let mut full_path = parent_full_path;
+        full_path.push_str(name);
+
+        // search for an entry
+        let start = Instant::now();
+        let manifest_entry = self
+            .search_manifest_entry(&full_path)
+            .inspect_err(|err| error!("failed to query the database: {}", err))
+            .map_err(|_| InodeError::InodeDoesNotExist(0))?; // TODO: ManifestError::DbError
+        trace!("lookup db search completed in {:?}", start.elapsed());
+
+        // return an inode or error
+        match manifest_entry {
+            Some(manifest_entry) => Ok(manifest_entry),
+            None => Err(InodeError::FileDoesNotExist(name.to_owned(), parent.err())),
+        }
+    }
+
+    /// Create an iterator over directory's direct children
+    pub fn iter(&self, bucket: &str, directory_full_path: &str) -> Result<ManifestIter, InodeError> {
+        ManifestIter::new(self.db.clone(), bucket, directory_full_path)
+    }
+
+    /// Search for an entry in the manifest that matches the path; a partial match indicates a directory
+    fn search_manifest_entry(&self, full_path: &str) -> Result<Option<ManifestEntry>, rusqlite::Error> {
+        let dir_search_start = format!("{full_path}/");
+        let dir_search_end = format!("{full_path}0"); // any child of [full_path] directory will have a key which is "less" than this
+        let file_search = full_path;
+
+        let db_entry = self
+            .db
+            .select_entry_or_child(&dir_search_start, &dir_search_end, file_search)?;
+        let Some(db_entry) = db_entry else {
+            return Ok(None);
+        };
+
+        trace!(
+            "found entry in the manifest: {}, searched for: {}",
+            db_entry.full_key,
+            full_path
+        );
+
+        let entry = if db_entry.full_key == full_path {
+            // exact match means this is a file
+            ManifestEntry::file(db_entry)
+        } else if db_entry.full_key.starts_with(full_path) {
+            // partial match means this is a directory
+            ManifestEntry::directory(full_path.to_owned())
+        } else {
+            panic!("got non-matching row: {}, searched: {}", db_entry.full_key, full_path);
+        };
+
+        Ok(Some(entry))
+    }
+}
+
+#[derive(Debug)]
+struct DbEntry {
+    full_key: String,
+    etag: String,
+    size: usize,
+}
+
+impl DbEntry {
+    fn from(row: &rusqlite::Row) -> Result<Self, rusqlite::Error> {
+        Ok(Self {
+            full_key: row.get(0)?,
+            etag: row.get(1)?,
+            size: row.get(2)?,
+        })
+    }
+}
+
+#[derive(Debug, Clone)]
+struct Db {
+    conn: Arc<Mutex<Connection>>,
+}
+
+impl Db {
+    fn new() -> Result<Self, rusqlite::Error> {
+        let db_path = "./s3_objects.db3";
+        let conn = Connection::open(db_path)?;
+        Ok(Self {
+            conn: Arc::new(Mutex::new(conn)), // TODO: no mutex? serialized mode of sqlite?
+        })
+    }
+
+    fn select_entry_or_child(
+        &self,
+        dir_search_start: &str,
+        dir_search_end: &str,
+        file_search: &str,
+    ) -> Result<Option<DbEntry>, rusqlite::Error> {
+        let query = "SELECT key, etag, size FROM s3_objects where (key > ?1 and key < ?2) or key = ?3 LIMIT 1";
+        let conn = self.conn.lock().expect("lock must succeed");
+        let mut stmt = conn.prepare(query)?;
+        let manifest_entry = stmt
+            .query_map((dir_search_start, dir_search_end, file_search), |row| {
+                DbEntry::from(row)
+            })?
+            .next();
+
+        manifest_entry.map_or(Ok(None), |v| v.map(Some))
+    }
+
+    fn select_children(
+        &self,
+        dir_search_start: &str,
+        dir_search_end: Option<&str>,
+        batch_size: usize,
+    ) -> Result<Vec<DbEntry>, rusqlite::Error> {
+        let conn = self.conn.lock().expect("lock must succeed");
+        if let Some(dir_search_end) = dir_search_end {
+            let query = "SELECT key, etag, size FROM s3_objects where key > ?1 and key < ?2 ORDER BY key LIMIT ?3";
+            let query_params = (dir_search_start, dir_search_end, batch_size);
+            let mut stmt = conn.prepare(query)?;
+            let result: Result<Vec<DbEntry>, _> = stmt.query_map(query_params, |row| DbEntry::from(row))?.collect();
+            result
+        } else {
+            let query = "SELECT key, etag, size FROM s3_objects where key > ?1 ORDER BY key LIMIT ?2";
+            let query_params = (dir_search_start, batch_size);
+            let mut stmt = conn.prepare(query)?;
+            let result: Result<Vec<DbEntry>, _> = stmt.query_map(query_params, |row| DbEntry::from(row))?.collect();
+            result
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct ManifestIter {
+    db: Db,
+    /// Prepared entries, in the order they will be returned by the iterator
+    entries: VecDeque<ManifestEntry>,
+    /// Key of the directory being listed by this iterator
+    parent_key: String,
+    /// Next key to search for in the database
+    search_from_key: String,
+    /// Name of the last subdirectory pushed to self.entries, used for deduplication
+    last_subdir_name: Option<String>,
+    /// Maximum number of entries to read from the database at once
+    batch_size: usize,
+    /// Database has no more entries
+    finished: bool,
+}
+
+impl ManifestIter {
+    fn new(db: Db, _bucket: &str, parent_key: &str) -> Result<Self, InodeError> {
+        let parent_key = parent_key.to_owned();
+
+        let batch_size = 1000;
+        let search_from_key = parent_key.clone();
+        Ok(Self {
+            db,
+            entries: Default::default(),
+            parent_key,
+            search_from_key,
+            last_subdir_name: None,
+            batch_size,
+            finished: false,
+        })
+    }
+
+    /// Next child of the directory
+    pub fn next(&mut self) -> Result<Option<ManifestEntry>, InodeError> {
+        if self.entries.is_empty() {
+            self.search_next_entries()
+                .inspect_err(|err| error!("failed to query the database: {}", err))
+                .map_err(|_| InodeError::InodeDoesNotExist(0))?; // TODO: ManifestError::DbError
+        }
+
+        Ok(self.entries.pop_front())
+    }
+
+    /// Load the next batch of entries from the database, inferring subdirectories and skipping entries under already-inferred subdirectories
+    fn search_next_entries(&mut self) -> Result<(), rusqlite::Error> {
+        let dir_search_end = if self.parent_key.is_empty() {
+            None
+        } else {
+            let mut dir_search_end = self.parent_key[..self.parent_key.len() - 1].to_owned();
+            dir_search_end.push('0'); // any child of [self.parent_key] directory will have a key which is "less" than this
+            Some(dir_search_end)
+        };
+
+        // Given that we filter loaded entries, we may need multiple requests to the db
+        while self.entries.is_empty() && !self.finished {
+            let start = Instant::now();
+            let db_entries =
+                self.db
+                    .select_children(&self.search_from_key, dir_search_end.as_deref(), self.batch_size)?;
+            trace!("list db search completed in {:?}", start.elapsed());
+
+            if db_entries.len() < self.batch_size {
+                self.finished = true;
+            }
+
+            if let Some(last_entry) = db_entries.last() {
+                self.search_from_key = last_entry.full_key.clone();
+            }
+
+            for db_entry in db_entries {
+                let relative_key = &db_entry.full_key[self.parent_key.len()..];
+                let components: Vec<&str> = relative_key.split('/').collect(); // todo: handle "//" and other weird names? empty?
+                let first_path_component = components[0];
+                let manifest_entry = if components.len() == 1 {
+                    // this file is a direct child of the listed directory
+                    ManifestEntry::file(db_entry)
+                } else if self.last_subdir_name.as_deref() != Some(first_path_component) {
+                    // infer a subdirectory, discarding the irrelevant part of the path
+                    self.last_subdir_name = Some(first_path_component.to_owned());
+                    let subdir_full_key = format!("{}{}/", self.parent_key, first_path_component);
+                    ManifestEntry::directory(subdir_full_key)
+                } else {
+                    // skip a subdirectory which was already pushed to self.entries
+                    continue;
+                };
+                self.entries.push_back(manifest_entry);
+            }
+        }
+
+        Ok(())
+    }
+}
diff --git a/mountpoint-s3/src/superblock.rs b/mountpoint-s3/src/superblock.rs
index c6422ae0e..35f928375 100644
--- a/mountpoint-s3/src/superblock.rs
+++ b/mountpoint-s3/src/superblock.rs
@@ -39,6 +39,7 @@ use tracing::{debug, error, trace, warn};
 use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT};
 use crate::fs::CacheConfig;
 use crate::logging;
+use crate::manifest::{Manifest, ManifestEntry};
 use crate::prefix::Prefix;
 use crate::s3::S3Personality;
 use crate::sync::atomic::{AtomicU64, Ordering};
@@ -73,6 +74,7 @@ struct SuperblockInner {
     next_ino: AtomicU64,
     mount_time: OffsetDateTime,
     config: SuperblockConfig,
+    manifest: Option<Manifest>,
 }
 
 /// Configuration for superblock operations
@@ -80,6 +82,7 @@ struct SuperblockInner {
 pub struct SuperblockConfig {
     pub cache_config: CacheConfig,
     pub s3_personality: S3Personality,
+    pub use_manifest: bool,
 }
 
 impl Superblock {
@@ -96,6 +99,12 @@ impl Superblock {
             config.cache_config.negative_cache_ttl,
         );
 
+        let manifest = if config.use_manifest {
+            Some(Manifest::new().expect("manifest must be created"))
+        } else {
+            None
+        };
+
         let inner = SuperblockInner {
             bucket: bucket.to_owned(),
             prefix: prefix.clone(),
@@ -104,6 +113,7 @@ impl Superblock {
             next_ino: AtomicU64::new(2),
             mount_time,
             config,
+            manifest,
         };
         Self { inner: Arc::new(inner) }
     }
@@ -602,7 +612,11 @@ impl SuperblockInner {
         let lookup = match lookup {
             Some(lookup) => lookup?,
             None => {
-                let remote = self.remote_lookup(client, parent_ino, name).await?;
+                let remote = if let Some(manifest) = &self.manifest {
+                    self.manifest_lookup(manifest, parent_ino, name)?
+                } else {
+                    self.remote_lookup(client, parent_ino, name).await?
+                };
                 self.update_from_remote(parent_ino, name, remote)?
             }
         };
@@ -658,6 +672,36 @@ impl SuperblockInner {
         lookup
     }
 
+    /// Look up in the [Manifest] and convert the entry to a [RemoteLookup]
+    fn manifest_lookup(
+        &self,
+        manifest: &Manifest,
+        parent_ino: InodeNo,
+        name: &str,
+    ) -> Result<Option<RemoteLookup>, InodeError> {
+        let parent = self.get(parent_ino)?;
+        let parent_full_path = self.full_key_for_inode(&parent);
+        let mount_time = OffsetDateTime::now_utc(); // todo: mount time
+        let remote_lookup = match manifest.manifest_lookup(parent, parent_full_path, name)? {
+            ManifestEntry::File { etag, size, .. } => RemoteLookup {
+                kind: InodeKind::File,
+                stat: InodeStat::for_file(
+                    size,
+                    mount_time,
+                    Some(etag),
+                    None,
+                    None,
+                    self.config.cache_config.file_ttl,
+                ),
+            },
+            ManifestEntry::Directory { .. } => RemoteLookup {
+                kind: InodeKind::Directory,
+                stat: InodeStat::for_directory(mount_time, self.config.cache_config.dir_ttl),
+            },
+        };
+        Ok(Some(remote_lookup))
+    }
+
     /// Lookup an inode in the parent directory with the given name
     /// on the remote client.
     async fn remote_lookup(
diff --git a/mountpoint-s3/src/superblock/readdir.rs b/mountpoint-s3/src/superblock/readdir.rs
index 0a7a4ca07..7208aaeae 100644
--- a/mountpoint-s3/src/superblock/readdir.rs
+++ b/mountpoint-s3/src/superblock/readdir.rs
@@ -44,6 +44,7 @@
 use std::cmp::Ordering;
 use std::collections::VecDeque;
 
+use crate::manifest::{Manifest, ManifestEntry};
 use mountpoint_s3_client::types::ObjectInfo;
 use mountpoint_s3_client::ObjectClient;
 use tracing::{error, trace, warn};
@@ -99,7 +100,14 @@ impl ReaddirHandle {
             }
         };
 
-        let iter = if inner.config.s3_personality.is_list_ordered() {
+        let iter = if inner.config.use_manifest {
+            trace!("using manifest readdir iter");
+            ReaddirIter::manifest(
+                inner.manifest.as_ref().expect("manifest should be set"),
+                &inner.bucket,
+                &full_path,
+            )?
+        } else if inner.config.s3_personality.is_list_ordered() {
             ReaddirIter::ordered(&inner.bucket, &full_path, page_size, local_entries.into())
         } else {
             ReaddirIter::unordered(&inner.bucket, &full_path, page_size, local_entries.into())
@@ -290,6 +298,7 @@ impl Ord for ReaddirEntry {
 enum ReaddirIter {
     Ordered(ordered::ReaddirIter),
     Unordered(unordered::ReaddirIter),
+    Manifest(manifest::ReaddirIter),
 }
 
 impl ReaddirIter {
@@ -301,10 +310,18 @@ impl ReaddirIter {
         Self::Unordered(unordered::ReaddirIter::new(bucket, full_path, page_size, local_entries))
     }
 
+    fn manifest(manifest: &Manifest, bucket: &str, full_path: &str) -> Result<Self, InodeError> {
+        Ok(Self::Manifest(manifest::ReaddirIter::new(
+            manifest.iter(bucket, full_path)?,
+            full_path.len(),
+        )))
+    }
+
     async fn next(&mut self, client: &impl ObjectClient) -> Result<Option<ReaddirEntry>, InodeError> {
         match self {
             Self::Ordered(iter) => iter.next(client).await,
             Self::Unordered(iter) => iter.next(client).await,
+            Self::Manifest(iter) => iter.next(),
         }
     }
 }
@@ -574,3 +591,53 @@ mod unordered {
         }
     }
 }
+
+mod manifest {
+    use time::OffsetDateTime;
+
+    use crate::manifest::ManifestIter;
+
+    use super::*;
+
+    /// Adaptor for [ManifestIter], converts [ManifestEntry] to [ReaddirEntry]
+    #[derive(Debug)]
+    pub struct ReaddirIter {
+        manifest_iter: ManifestIter,
+        full_path_len: usize,
+    }
+
+    impl ReaddirIter {
+        pub(super) fn new(manifest_iter: ManifestIter, full_path_len: usize) -> Self {
+            Self {
+                manifest_iter,
+                full_path_len,
+            }
+        }
+
+        /// Return the next [ReaddirEntry] for the directory stream. If the stream is finished, returns
+        /// `Ok(None)`.
+        pub(super) fn next(&mut self) -> Result<Option<ReaddirEntry>, InodeError> {
+            let readdir_entry = match self.manifest_iter.next()? {
+                Some(ManifestEntry::File { full_key, etag, size }) => {
+                    let name = full_key[self.full_path_len..].to_owned();
+                    let object_info = ObjectInfo {
+                        key: full_key,
+                        size: size as u64,
+                        last_modified: OffsetDateTime::now_utc(),
+                        storage_class: None,
+                        restore_status: None,
+                        etag,
+                        checksum_algorithms: Default::default(), // TODO: what are the implications?
+                    };
+                    Some(ReaddirEntry::RemoteObject { name, object_info })
+                }
+                Some(ManifestEntry::Directory { full_key, .. }) => {
+                    let name = full_key[self.full_path_len..full_key.len() - 1].to_owned();
+                    Some(ReaddirEntry::RemotePrefix { name })
+                }
+                None => None,
+            };
+            Ok(readdir_entry)
+        }
+    }
+}
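
---

Note on provisioning the manifest database: the patch only reads `./s3_objects.db3` and never creates or populates it, so the table has to be provisioned out of band. Below is a minimal sketch of a schema and loader that satisfies the three `SELECT key, etag, size FROM s3_objects` queries in manifest.rs. Only the table name and the key/etag/size columns are implied by the patch; the `PRIMARY KEY` on `key`, the `create_manifest_db` helper name, and the sample row are assumptions for illustration, not part of this change.

    use rusqlite::{params, Connection};

    fn create_manifest_db() -> Result<(), rusqlite::Error> {
        // Same path that Db::new() opens in manifest.rs.
        let conn = Connection::open("./s3_objects.db3")?;
        // Assumed schema: a PRIMARY KEY on `key` gives the lexicographic index
        // that the `key > ?1 AND key < ?2 ORDER BY key` range scans rely on.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS s3_objects (
                key  TEXT PRIMARY KEY,
                etag TEXT NOT NULL,
                size INTEGER NOT NULL
            )",
            [],
        )?;
        // Hypothetical sample row; in practice rows would come from an S3
        // Inventory report or a ListObjectsV2 crawl of the bucket.
        conn.execute(
            "INSERT OR REPLACE INTO s3_objects (key, etag, size) VALUES (?1, ?2, ?3)",
            params!["dir1/file1.txt", "\"0123456789abcdef0123456789abcdef\"", 1024],
        )?;
        Ok(())
    }

Keeping keys ordered in the index is what lets manifest_lookup() resolve both files (exact match) and directories (any key in the `{path}/` .. `{path}0` range) with a single `LIMIT 1` probe, and lets ManifestIter paginate with `key > last_seen` instead of OFFSET.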