Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion lib/cache/async_backed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,14 @@ where
///
/// Suitable for seeding the cache before async operations begin.
pub fn insert_sync(&self, key: K, value: V) {
    // Go through the entry API so both cases are handled explicitly: an
    // entry already stored under `key` is overwritten in place, and a
    // missing one is created. Either way the slot ends up `Ready(value)`.
    match self.map.entry_sync(key) {
        scc::hash_map::Entry::Occupied(mut occupied) => {
            *occupied.get_mut() = Slot::Ready(value);
        }
        scc::hash_map::Entry::Vacant(vacant) => {
            vacant.insert_entry(Slot::Ready(value));
        }
    }
}

/// Synchronously remove the entry for `key`, returning `true` if it was present.
Expand All @@ -419,6 +426,14 @@ where
})
.is_some()
}

/// Report, without awaiting, whether the map holds any slot for `key`.
///
/// The slot's state is not inspected, so `Ready` and `InFlight` entries
/// both yield `true`.
#[must_use]
pub fn contains_sync(&self, key: &K) -> bool {
    // The visitor discards the entry; only the presence of a result matters.
    let probe = self.map.read_sync(key, |_key, _slot| ());
    probe.is_some()
}
}

/// Drop guard that synchronously promotes an `InFlight` entry to `Ready` if the caller
Expand Down
145 changes: 141 additions & 4 deletions lib/fs/async_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ pub trait FsDataProvider: Clone + Send + Sync + 'static {
/// Never called directly -- [`InodeForget::delete`] invokes it
/// automatically when the refcount drops to zero.
fn forget(&self, _addr: InodeAddr) {
    // Default: no provider-side state to clean up, so this is a no-op.
}

/// Write data to a file at the given offset.
///
/// The default implementation returns `EROFS` (read-only filesystem).
/// Providers that support writes should override this.
fn write(
&self,
_inode: INode,
_offset: u64,
_data: Bytes,
) -> impl Future<Output = Result<u32, std::io::Error>> + Send {
async { Err(std::io::Error::from_raw_os_error(libc::EROFS)) }
}
}

/// Zero-sized cleanup tag for inode eviction.
Expand Down Expand Up @@ -114,17 +127,25 @@ pub struct ForgetContext<DP: FsDataProvider> {
pub lookup_cache: Arc<IndexedLookupCache>,
/// The data provider for provider-specific cleanup.
pub provider: DP,
/// Write overlay — inodes present here must not be evicted.
pub write_overlay: Arc<FutureBackedCache<InodeAddr, Bytes>>,
}

/// Evicts the inode from the table, directory cache, and lookup cache, then
/// delegates to [`FsDataProvider::forget`] so the provider can clean up its
/// own auxiliary state.
///
/// Inodes that have locally-written data (present in `write_overlay`) are
/// skipped — they must persist for the lifetime of the mount.
///
/// The lookup cache cleanup removes all entries referencing the forgotten
/// inode (as parent or child) via the [`IndexedLookupCache`]'s reverse
/// index, ensuring O(k) eviction instead of O(N) full-cache scan.
impl<DP: FsDataProvider> StatelessDrop<ForgetContext<DP>, InodeAddr> for InodeForget {
fn delete(ctx: &ForgetContext<DP>, key: &InodeAddr) {
if ctx.write_overlay.contains_sync(key) {
return;
}
let addr = *key;
ctx.inode_table.remove_sync(key);
ctx.dcache.evict(LoadedAddr::new_unchecked(addr));
Expand All @@ -144,6 +165,42 @@ pub struct ResolvedINode {
pub inode: INode,
}

/// A file reader that checks the write overlay first, falling back to the
/// underlying provider reader.
pub struct OverlayReader<R: FileReader> {
    /// The inode address this reader is for.
    pub addr: InodeAddr,
    /// Shared handle to the write overlay; queried with `addr` on every read.
    write_overlay: Arc<FutureBackedCache<InodeAddr, Bytes>>,
    /// Wrapped reader that serves reads when the overlay has no entry for
    /// `addr`, and that receives every `close` call.
    inner: R,
}

/// Debug output shows only `addr`; the overlay handle and the wrapped reader
/// are elided (rendered as `..`), so `R` does not need to implement `Debug`.
impl<R: FileReader> std::fmt::Debug for OverlayReader<R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("OverlayReader");
        builder.field("addr", &self.addr);
        builder.finish_non_exhaustive()
    }
}

impl<R: FileReader> FileReader for OverlayReader<R> {
    /// Read up to `size` bytes starting at `offset`.
    ///
    /// When the write overlay holds content for this reader's inode, the
    /// read is served entirely from that content: the requested range is
    /// clamped to the overlay's length, so a read at or past its end yields
    /// an empty buffer. Only when the overlay has no entry is the request
    /// passed to the wrapped reader.
    #[expect(
        clippy::cast_possible_truncation,
        reason = "offset and size fit in usize on supported 64-bit platforms"
    )]
    async fn read(&self, offset: u64, size: u32) -> Result<Bytes, std::io::Error> {
        let Some(overlay) = self.write_overlay.get(&self.addr).await else {
            return self.inner.read(offset, size).await;
        };
        let len = overlay.len();
        let start = len.min(offset as usize);
        let end = len.min(start + size as usize);
        Ok(overlay.slice(start..end))
    }

    /// Close the wrapped reader; the overlay itself holds nothing to release.
    async fn close(&self) -> Result<(), std::io::Error> {
        let closed = self.inner.close().await;
        closed
    }
}

/// An open file that provides read access.
///
/// Returned by [`AsyncFs::open`]. The caller owns this handle and uses
Expand Down Expand Up @@ -337,6 +394,13 @@ pub struct AsyncFs<DP: FsDataProvider> {

/// Bounds the number of concurrent background prefetch tasks.
prefetch_semaphore: Arc<Semaphore>,

/// Overlay cache for locally-written file data.
///
/// Keyed by inode address, valued by the full file content. Written
/// entries are never evicted by `InodeForget` — they persist for the
/// lifetime of the mount.
write_overlay: Arc<FutureBackedCache<InodeAddr, Bytes>>,
}

impl<DP: FsDataProvider> AsyncFs<DP> {
Expand All @@ -357,6 +421,7 @@ impl<DP: FsDataProvider> AsyncFs<DP> {
data_provider,
next_fh: AtomicU64::new(1),
prefetch_semaphore: Arc::new(Semaphore::new(MAX_PREFETCH_CONCURRENCY)),
write_overlay: Arc::new(FutureBackedCache::default()),
}
}

Expand All @@ -376,6 +441,7 @@ impl<DP: FsDataProvider> AsyncFs<DP> {
data_provider,
next_fh: AtomicU64::new(1),
prefetch_semaphore: Arc::new(Semaphore::new(MAX_PREFETCH_CONCURRENCY)),
write_overlay: Arc::new(FutureBackedCache::default()),
}
}

Expand Down Expand Up @@ -528,25 +594,96 @@ impl<DP: FsDataProvider> AsyncFs<DP> {
/// Open a file for reading.
///
/// Validates the inode is not a directory, delegates to the data provider
/// to create a [`FileReader`], and returns an [`OpenFile`] that the caller
/// owns. Reads go through [`OpenFile::read`].
/// to create a [`FileReader`], and returns an [`OpenFile`] wrapping an
/// [`OverlayReader`] that checks the write overlay first, falling back
/// to the provider reader.
pub async fn open(
&self,
addr: LoadedAddr,
flags: OpenFlags,
) -> Result<OpenFile<DP::Reader>, std::io::Error> {
) -> Result<OpenFile<OverlayReader<DP::Reader>>, std::io::Error> {
let inode = self.loaded_inode(addr).await?;
if inode.itype == INodeType::Directory {
return Err(std::io::Error::from_raw_os_error(libc::EISDIR));
}
let reader = self.data_provider.open(inode, flags).await?;
let overlay_reader = OverlayReader {
addr: addr.addr(),
write_overlay: Arc::clone(&self.write_overlay),
inner: reader,
};
let fh = self.next_fh.fetch_add(1, Ordering::Relaxed);
Ok(OpenFile {
fh,
reader: Arc::new(reader),
reader: Arc::new(overlay_reader),
})
}

/// Write data to a file at the given offset.
///
/// Stores the result in the write overlay cache. The overlay takes
/// precedence over the data provider on subsequent reads. Also calls
/// [`FsDataProvider::write`] so the provider can log or forward as needed.
pub async fn write(
&self,
addr: LoadedAddr,
offset: u64,
data: Bytes,
) -> Result<u32, std::io::Error> {
let inode = self.loaded_inode(addr).await?;
if inode.itype == INodeType::Directory {
return Err(std::io::Error::from_raw_os_error(libc::EISDIR));
}

#[expect(
clippy::cast_possible_truncation,
reason = "data.len() fits in u32 (FUSE writes are at most 128 KiB)"
)]
let bytes_written = data.len() as u32;

// Merge with existing overlay content, if any.
let existing = self.write_overlay.get(&addr.addr()).await;
let mut buf = existing.map_or_else(Vec::new, |b| b.to_vec());

#[expect(
clippy::cast_possible_truncation,
reason = "offset fits in usize on supported 64-bit platforms"
)]
let offset_usize = offset as usize;
if offset_usize > buf.len() {
buf.resize(offset_usize, 0);
}
let end = offset_usize + data.len();
if end > buf.len() {
buf.resize(end, 0);
}
buf[offset_usize..end].copy_from_slice(&data);

let new_content = Bytes::from(buf);
let new_size = new_content.len() as u64;
self.write_overlay.insert_sync(addr.addr(), new_content);

// Update inode size in the table.
let mut updated = inode;
updated.size = new_size;
updated.last_modified_at = std::time::SystemTime::now();
self.inode_table.insert_sync(addr.addr(), updated);

// Notify the data provider (for logging / future forwarding).
drop(self.data_provider.write(inode, offset, data).await);

Ok(bytes_written)
}

/// Hand out another shared handle to the write overlay.
///
/// Used by the FUSE adapter to pass into `ForgetContext` so that
/// `InodeForget` can check whether an inode has locally-written data.
#[must_use]
pub fn write_overlay(&self) -> Arc<FutureBackedCache<InodeAddr, Bytes>> {
    let shared = &self.write_overlay;
    Arc::clone(shared)
}

/// Evict an inode from the inode table and notify the data provider.
///
/// Called by the composite layer when propagating `forget` to a child
Expand Down
37 changes: 34 additions & 3 deletions lib/fs/composite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use bytes::Bytes;

use crate::cache::async_backed::FutureBackedCache;
use crate::fs::async_fs::{AsyncFs, FileReader, FsDataProvider, OpenFile};
use crate::fs::async_fs::{AsyncFs, FileReader, FsDataProvider, OpenFile, OverlayReader};
use crate::fs::bridge::ConcurrentBridge;
use crate::fs::{INode, INodeType, InodeAddr, InodePerms, LoadedAddr, OpenFlags, ROOT_INO};

Expand Down Expand Up @@ -283,7 +283,8 @@ where
R::ChildDP: Clone,
<<R as CompositeRoot>::ChildDP as FsDataProvider>::Reader: 'static,
{
// Child readers come from the child's `open`, which wraps the provider
// reader in an `OverlayReader`, so the composite reader wraps that in turn.
type Reader =
    CompositeReader<OverlayReader<<<R as CompositeRoot>::ChildDP as FsDataProvider>::Reader>>;

async fn lookup(&self, parent: INode, name: &OsStr) -> Result<INode, std::io::Error> {
if parent.addr == ROOT_INO {
Expand Down Expand Up @@ -416,7 +417,9 @@ where

let inner_ino = inner_ino.ok_or_else(|| std::io::Error::from_raw_os_error(libc::ENOENT))?;

let open_file: OpenFile<<<R as CompositeRoot>::ChildDP as FsDataProvider>::Reader> = child
let open_file: OpenFile<
OverlayReader<<<R as CompositeRoot>::ChildDP as FsDataProvider>::Reader>,
> = child
.get_fs()
.open(LoadedAddr::new_unchecked(inner_ino), flags)
.await?;
Expand All @@ -426,6 +429,34 @@ where
})
}

async fn write(&self, inode: INode, offset: u64, data: Bytes) -> Result<u32, std::io::Error> {
if inode.addr == ROOT_INO {
return Err(std::io::Error::from_raw_os_error(libc::EROFS));
}

let slot_idx = self
.inner
.addr_to_slot
.read_sync(&inode.addr, |_, &v| v)
.ok_or_else(|| std::io::Error::from_raw_os_error(libc::ENOENT))?;

let (child, inner_addr) = self
.inner
.slots
.read_sync(&slot_idx, |_, slot| {
(Arc::clone(&slot.inner), slot.bridge.forward(inode.addr))
})
.ok_or_else(|| std::io::Error::from_raw_os_error(libc::ENOENT))?;

let inner_addr =
inner_addr.ok_or_else(|| std::io::Error::from_raw_os_error(libc::ENOENT))?;

child
.get_fs()
.write(LoadedAddr::new_unchecked(inner_addr), offset, data)
.await
}

/// Removes the composite-level address from the child's bridge map and
/// then from `addr_to_slot`. When the bridge becomes empty, the slot
/// and its `name_to_slot` entry are garbage-collected.
Expand Down
Loading
Loading