From d89374154817ea74a703f12d6daa14538069666c Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Tue, 10 Feb 2026 23:30:43 -0800 Subject: [PATCH 01/11] MES-696: Enable prefetching --- src/fs/icache/async_cache.rs | 239 ++++++++++++++++++++++++++++++++--- src/fs/mescloud/icache.rs | 8 ++ src/fs/mescloud/org.rs | 12 ++ src/fs/mescloud/repo.rs | 20 +++ 4 files changed, 259 insertions(+), 20 deletions(-) diff --git a/src/fs/icache/async_cache.rs b/src/fs/icache/async_cache.rs index 84003da..0969d3b 100644 --- a/src/fs/icache/async_cache.rs +++ b/src/fs/icache/async_cache.rs @@ -1,7 +1,9 @@ //! Async inode cache with InFlight/Available state machine. use std::future::Future; +use std::sync::Arc; +use futures::future::join_all; use scc::HashMap as ConcurrentHashMap; use tokio::sync::watch; @@ -60,13 +62,28 @@ pub trait IcbResolver: Send + Sync { Self: Sized; } +/// Shared interior of [`AsyncICache`], behind an `Arc` so the cache can be +/// cheaply cloned for fire-and-forget background work (e.g. `spawn_prefetch`). +struct AsyncICacheInner { + resolver: R, + inode_table: ConcurrentHashMap>, +} + /// Async, concurrency-safe inode cache. /// /// All methods take `&self` — internal synchronization is provided by -/// `scc::HashMap` (sharded lock-free map). +/// `scc::HashMap` (sharded lock-free map). The cache is cheaply cloneable +/// (via `Arc`) so that prefetch work can be spawned in the background. pub struct AsyncICache { - resolver: R, - inode_table: ConcurrentHashMap>, + inner: Arc>, +} + +impl Clone for AsyncICache { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } } impl AsyncICache { @@ -79,14 +96,16 @@ impl AsyncICache { IcbState::Available(R::Icb::new_root(root_path.into())), )); Self { - resolver, - inode_table: table, + inner: Arc::new(AsyncICacheInner { + resolver, + inode_table: table, + }), } } /// Number of entries (`InFlight` + `Available`) in the table. pub fn inode_count(&self) -> usize { - self.inode_table.len() + self.inner.inode_table.len() } /// Wait until `ino` is `Available`. @@ -96,6 +115,7 @@ impl AsyncICache { async fn wait_for_available(&self, ino: Inode) -> bool { loop { let rx = self + .inner .inode_table .read_async(&ino, |_, s| match s { IcbState::InFlight(rx) => Some(rx.clone()), @@ -123,7 +143,7 @@ impl AsyncICache { /// This is a non-blocking, synchronous check. It does **not** wait for /// `InFlight` entries to resolve. pub fn contains(&self, ino: Inode) -> bool { - self.inode_table.contains_sync(&ino) + self.inner.inode_table.contains_sync(&ino) } /// Read an ICB via closure. **Awaits** if `InFlight`. 
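    /// Usage sketch (mirrors the test code later in this patch; returns
    /// `None` if the inode is absent):
    ///
    /// ```ignore
    /// let rc = cache.get_icb(ino, IcbLike::rc).await;
    /// ```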
@@ -142,6 +162,7 @@ impl AsyncICache { return None; } let result = self + .inner .inode_table .read_async(&ino, |_, state| match state { IcbState::Available(icb) => Some(f(icb)), @@ -169,6 +190,7 @@ impl AsyncICache { return None; } let result = self + .inner .inode_table .update_async(&ino, |_, state| match state { IcbState::Available(icb) => Some(f(icb)), @@ -190,7 +212,7 @@ impl AsyncICache { use scc::hash_map::Entry; let mut icb = Some(icb); loop { - match self.inode_table.entry_async(ino).await { + match self.inner.inode_table.entry_async(ino).await { Entry::Vacant(vac) => { let val = icb .take() @@ -233,7 +255,7 @@ impl AsyncICache { let mut then_fn = Some(then); loop { - match self.inode_table.entry_async(ino).await { + match self.inner.inode_table.entry_async(ino).await { Entry::Occupied(mut occ) => match occ.get_mut() { IcbState::Available(icb) => { let t = then_fn @@ -270,7 +292,7 @@ impl AsyncICache { /// `forget` has already removed. async fn write_back_if_present(&self, ino: Inode, icb: R::Icb) { use scc::hash_map::Entry; - match self.inode_table.entry_async(ino).await { + match self.inner.inode_table.entry_async(ino).await { Entry::Occupied(mut occ) => { *occ.get_mut() = IcbState::Available(icb); } @@ -304,6 +326,7 @@ impl AsyncICache { // Fast path: Available and fully resolved { let hit = self + .inner .inode_table .read_async(&ino, |_, s| match s { IcbState::Available(icb) if !icb.needs_resolve() => { @@ -322,7 +345,7 @@ impl AsyncICache { // Slow path: missing, InFlight, or stub needing resolution loop { - match self.inode_table.entry_async(ino).await { + match self.inner.inode_table.entry_async(ino).await { Entry::Occupied(mut occ) => match occ.get_mut() { IcbState::Available(icb) if !icb.needs_resolve() => { let t = then_fn @@ -340,7 +363,7 @@ impl AsyncICache { let fallback = stub.clone(); drop(occ); // release shard lock before awaiting - match self.resolver.resolve(ino, Some(stub), self).await { + match self.inner.resolver.resolve(ino, Some(stub), self).await { Ok(icb) => { let t = then_fn.take().unwrap_or_else(|| { unreachable!("then_fn consumed more than once") @@ -354,7 +377,7 @@ impl AsyncICache { if fallback.rc() > 0 { self.write_back_if_present(ino, fallback).await; } else { - self.inode_table.remove_async(&ino).await; + self.inner.inode_table.remove_async(&ino).await; } drop(tx); return Err(e); @@ -371,7 +394,7 @@ impl AsyncICache { let (tx, rx) = watch::channel(()); vac.insert_entry(IcbState::InFlight(rx)); - match self.resolver.resolve(ino, None, self).await { + match self.inner.resolver.resolve(ino, None, self).await { Ok(icb) => { let t = then_fn .take() @@ -382,7 +405,7 @@ impl AsyncICache { return Ok(result); } Err(e) => { - self.inode_table.remove_async(&ino).await; + self.inner.inode_table.remove_async(&ino).await; drop(tx); return Err(e); } @@ -409,6 +432,7 @@ impl AsyncICache { return None; } let result = self + .inner .inode_table .update_async(&ino, |_, state| match state { IcbState::Available(icb) => { @@ -441,7 +465,7 @@ impl AsyncICache { use scc::hash_map::Entry; loop { - match self.inode_table.entry_async(ino).await { + match self.inner.inode_table.entry_async(ino).await { Entry::Occupied(mut occ) => match occ.get_mut() { IcbState::Available(icb) => { if icb.rc() <= nlookups { @@ -470,7 +494,8 @@ impl AsyncICache { /// Synchronous mutable access to an `Available` entry. /// Does **not** wait for `InFlight`. Intended for initialization. 
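    /// Sketch of the intended call pattern (mirrors the root-attr setup in
    /// `MescloudICache::new`; `root_attr` is a stand-in value):
    ///
    /// ```ignore
    /// cache.get_icb_mut_sync(root_ino, |icb| {
    ///     icb.attr = Some(root_attr);
    /// });
    /// ```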
pub fn get_icb_mut_sync(&self, ino: Inode, f: impl FnOnce(&mut R::Icb) -> T) -> Option { - self.inode_table + self.inner + .inode_table .update_sync(&ino, |_, state| match state { IcbState::Available(icb) => Some(f(icb)), IcbState::InFlight(_) => None, @@ -478,10 +503,47 @@ impl AsyncICache { .flatten() } + /// Concurrently resolve multiple inodes, best-effort. + /// + /// Each inode is resolved via [`get_or_resolve`] which handles + /// `InFlight`-coalescing and stub-upgrade logic. Errors are silently + /// ignored because prefetch is a performance optimization: a failure + /// simply means the subsequent access will pay the full API latency. + pub async fn prefetch(&self, inodes: impl IntoIterator) { + let futs: Vec<_> = inodes + .into_iter() + .map(|ino| async move { + drop(self.get_or_resolve(ino, |_| ()).await); + }) + .collect(); + join_all(futs).await; + } + + /// Fire-and-forget variant of [`prefetch`](Self::prefetch). + /// + /// Spawns the prefetch work on the tokio runtime and returns immediately, + /// so the caller is never blocked waiting for API responses. + pub fn spawn_prefetch(&self, inodes: impl IntoIterator) + where + R: 'static, + R::Icb: 'static, + R::Error: 'static, + { + let inodes: Vec<_> = inodes.into_iter().collect(); + if inodes.is_empty() { + return; + } + let cache = self.clone(); + tokio::spawn(async move { + cache.prefetch(inodes).await; + }); + } + /// Iterate over all `Available` entries (skips `InFlight`). /// Async-safe iteration using `iter_async` to avoid contention on single-threaded runtimes. pub async fn for_each(&self, mut f: impl FnMut(&Inode, &R::Icb)) { - self.inode_table + self.inner + .inode_table .iter_async(|ino, state| { if let IcbState::Available(icb) = state { f(ino, icb); @@ -693,6 +755,7 @@ mod tests { let cache = Arc::new(test_cache()); let (tx, rx) = watch::channel(()); cache + .inner .inode_table .upsert_async(42, IcbState::InFlight(rx)) .await; @@ -717,6 +780,7 @@ mod tests { // Complete the InFlight from the resolver side (write directly) cache + .inner .inode_table .upsert_async( 42, @@ -921,6 +985,7 @@ mod tests { // Directly insert an InFlight entry for testing iteration let (_tx, rx) = watch::channel(()); cache + .inner .inode_table .upsert_async(42, IcbState::InFlight(rx)) .await; @@ -941,12 +1006,14 @@ mod tests { // Insert InFlight manually, then immediately complete before anyone waits let (tx, rx) = watch::channel(()); cache + .inner .inode_table .upsert_async(42, IcbState::InFlight(rx)) .await; // Complete before any waiter (simulate resolver by writing directly) cache + .inner .inode_table .upsert_async( 42, @@ -1129,6 +1196,7 @@ mod tests { let cache = Arc::new(test_cache()); let (tx, rx) = watch::channel(()); cache + .inner .inode_table .upsert_async(42, IcbState::InFlight(rx)) .await; @@ -1141,6 +1209,7 @@ mod tests { // Simulate resolver completing (write directly to inode_table) cache + .inner .inode_table .upsert_async( 42, @@ -1200,6 +1269,7 @@ mod tests { let cache = Arc::new(test_cache()); let (tx, rx) = watch::channel(()); cache + .inner .inode_table .upsert_async(42, IcbState::InFlight(rx)) .await; @@ -1209,6 +1279,7 @@ mod tests { // Simulate resolver completing by writing directly to inode_table cache + .inner .inode_table .upsert_async( 42, @@ -1236,6 +1307,7 @@ mod tests { let cache = Arc::new(test_cache()); let (tx, rx) = watch::channel(()); cache + .inner .inode_table .upsert_async(42, IcbState::InFlight(rx)) .await; @@ -1244,7 +1316,7 @@ mod tests { let handle = tokio::spawn(async move { 
cache2.inc_rc(42).await }); // Evict instead of completing - cache.inode_table.remove_async(&42).await; + cache.inner.inode_table.remove_async(&42).await; drop(tx); let result = handle @@ -1300,6 +1372,7 @@ mod tests { // Phase 1: insert an InFlight entry. let (tx1, rx1) = watch::channel(()); cache + .inner .inode_table .upsert_async(ino, IcbState::InFlight(rx1)) .await; @@ -1317,6 +1390,7 @@ mod tests { // re-entering InFlight for a second resolution). let (tx2, rx2) = watch::channel(()); cache + .inner .inode_table .upsert_async(ino, IcbState::InFlight(rx2)) .await; @@ -1329,6 +1403,7 @@ mod tests { // Phase 2 complete: write the final resolved value. cache + .inner .inode_table .upsert_async( ino, @@ -1393,7 +1468,7 @@ mod tests { // forget to proceed. Instead, remove the InFlight entry directly to // simulate a concurrent eviction (e.g., by another path that already // removed the entry). - cache.inode_table.remove_async(&ino).await; + cache.inner.inode_table.remove_async(&ino).await; // Let the resolver finish. proceed.notify_one(); @@ -1407,4 +1482,128 @@ mod tests { "evicted entry must not be resurrected after resolution completes" ); } + + #[tokio::test] + async fn prefetch_resolves_multiple_inodes() { + let resolver = TestResolver::new(); + for ino in [10, 11, 12] { + resolver.add( + ino, + TestIcb { + rc: 0, + path: format!("/dir{ino}").into(), + resolved: true, + }, + ); + } + let cache = test_cache_with(resolver); + + for ino in [10, 11, 12] { + cache + .insert_icb( + ino, + TestIcb { + rc: 0, + path: format!("/dir{ino}").into(), + resolved: false, + }, + ) + .await; + } + + cache.prefetch([10, 11, 12]).await; + + for ino in [10, 11, 12] { + let is_resolved = cache.get_icb(ino, |icb| icb.resolved).await; + assert_eq!( + is_resolved, + Some(true), + "inode {ino} should be resolved after prefetch" + ); + } + } + + #[tokio::test] + async fn prefetch_skips_already_resolved() { + let cache = test_cache(); + // Root (ino=1) is already resolved -- prefetch should be a no-op + cache.prefetch([1]).await; + let path = cache.get_icb(1, |icb| icb.path.clone()).await; + assert_eq!(path, Some(PathBuf::from("/root"))); + } + + #[tokio::test] + async fn prefetch_ignores_errors() { + let resolver = TestResolver::new(); + resolver.add( + 10, + TestIcb { + rc: 0, + path: "/ok".into(), + resolved: true, + }, + ); + resolver.add_err(11, "network error"); + let cache = test_cache_with(resolver); + + for (ino, path) in [(10, "/stub10"), (11, "/stub11")] { + cache + .insert_icb( + ino, + TestIcb { + rc: 0, + path: path.into(), + resolved: false, + }, + ) + .await; + } + + cache.prefetch([10, 11]).await; + + let is_resolved = cache.get_icb(10, |icb| icb.resolved).await; + assert_eq!(is_resolved, Some(true), "ino 10 should be resolved"); + // ino 11 failed with rc=0 -> removed by get_or_resolve error path + assert!( + !cache.contains(11), + "ino 11 should be removed after error with rc=0" + ); + } + + #[tokio::test] + async fn prefetch_does_not_modify_rc() { + let resolver = TestResolver::new(); + resolver.add( + 10, + TestIcb { + rc: 0, + path: "/dir".into(), + resolved: true, + }, + ); + let cache = test_cache_with(resolver); + + cache + .insert_icb( + 10, + TestIcb { + rc: 0, + path: "/stub".into(), + resolved: false, + }, + ) + .await; + + cache.prefetch([10]).await; + + let rc = cache.get_icb(10, IcbLike::rc).await; + assert_eq!(rc, Some(0), "prefetch must not modify rc"); + } + + #[tokio::test] + async fn prefetch_empty_is_noop() { + let cache = test_cache(); + 
cache.prefetch(std::iter::empty()).await; + assert_eq!(cache.inode_count(), 1, "empty prefetch changes nothing"); + } } diff --git a/src/fs/mescloud/icache.rs b/src/fs/mescloud/icache.rs index 15f1f5d..4694f7d 100644 --- a/src/fs/mescloud/icache.rs +++ b/src/fs/mescloud/icache.rs @@ -150,6 +150,14 @@ impl> MescloudICache { self.inner.get_or_resolve(ino, then).await } + pub fn spawn_prefetch(&self, inodes: impl IntoIterator) + where + R: 'static, + R::Error: 'static, + { + self.inner.spawn_prefetch(inodes); + } + // -- Domain-specific -- /// Allocate a new inode number. diff --git a/src/fs/mescloud/org.rs b/src/fs/mescloud/org.rs index 968c748..2e4e976 100644 --- a/src/fs/mescloud/org.rs +++ b/src/fs/mescloud/org.rs @@ -526,6 +526,18 @@ impl Fs for OrgFs { }); } + let prefetch_count = entries + .iter() + .filter_map(|e| self.composite.child_inodes.get(&e.ino).copied()) + .inspect(|&idx| self.composite.slots[idx].inner.prefetch_root()) + .count(); + if prefetch_count > 0 { + trace!( + count = prefetch_count, + "readdir: prefetching repo root directories" + ); + } + self.composite.readdir_buf = entries; Ok(&self.composite.readdir_buf) } diff --git a/src/fs/mescloud/repo.rs b/src/fs/mescloud/repo.rs index 0d22196..69a2e51 100644 --- a/src/fs/mescloud/repo.rs +++ b/src/fs/mescloud/repo.rs @@ -258,6 +258,12 @@ impl RepoFs { let joined: PathBuf = components.iter().collect(); joined.to_str().map(String::from) } + + /// Spawn a background task to prefetch the root directory listing so + /// subsequent readdir hits cache. + pub(crate) fn prefetch_root(&self) { + self.icache.spawn_prefetch([Self::ROOT_INO]); + } } #[async_trait::async_trait] @@ -367,6 +373,20 @@ impl Fs for RepoFs { }); } + let subdir_inodes: Vec = entries + .iter() + .filter(|e| e.kind == DirEntryType::Directory) + .map(|e| e.ino) + .collect(); + if !subdir_inodes.is_empty() { + trace!( + ino, + subdir_count = subdir_inodes.len(), + "readdir: prefetching subdirectory children" + ); + self.icache.spawn_prefetch(subdir_inodes); + } + self.readdir_buf = entries; Ok(&self.readdir_buf) } From 3d4b62f0e3f2ea8390abd924d4c69e2cec446b1d Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Tue, 10 Feb 2026 23:47:40 -0800 Subject: [PATCH 02/11] bounded concurrency --- src/fs/fuser.rs | 11 ++++++++++- src/fs/icache/async_cache.rs | 17 +++++++++++------ src/fs/mescloud/mod.rs | 11 +++++++++++ src/fs/trait.rs | 4 ++++ src/trc.rs | 2 +- 5 files changed, 37 insertions(+), 8 deletions(-) diff --git a/src/fs/fuser.rs b/src/fs/fuser.rs index 86ddabb..5ffa43c 100644 --- a/src/fs/fuser.rs +++ b/src/fs/fuser.rs @@ -145,7 +145,7 @@ where } } -impl fuser::Filesystem for FuserAdapter +impl fuser::Filesystem for FuserAdapter where F::LookupError: Into, F::GetAttrError: Into, @@ -154,6 +154,15 @@ where F::ReaddirError: Into, F::ReleaseError: Into, { + fn init( + &mut self, + _req: &fuser::Request<'_>, + _config: &mut fuser::KernelConfig, + ) -> Result<(), libc::c_int> { + self.runtime.block_on(self.fs.init()); + Ok(()) + } + #[instrument(name = "FuserAdapter::lookup", skip(self, _req, reply))] fn lookup( &mut self, diff --git a/src/fs/icache/async_cache.rs b/src/fs/icache/async_cache.rs index 0969d3b..e1cc0e0 100644 --- a/src/fs/icache/async_cache.rs +++ b/src/fs/icache/async_cache.rs @@ -3,7 +3,7 @@ use std::future::Future; use std::sync::Arc; -use futures::future::join_all; +use futures::StreamExt as _; use scc::HashMap as ConcurrentHashMap; use tokio::sync::watch; @@ -503,20 +503,25 @@ impl AsyncICache { .flatten() } + /// Maximum number of concurrent 
resolver RPCs during prefetch. + /// Bounds the fan-out so a large directory doesn't overwhelm the backend. + const MAX_PREFETCH_CONCURRENCY: usize = 8; + /// Concurrently resolve multiple inodes, best-effort. /// /// Each inode is resolved via [`get_or_resolve`] which handles /// `InFlight`-coalescing and stub-upgrade logic. Errors are silently /// ignored because prefetch is a performance optimization: a failure /// simply means the subsequent access will pay the full API latency. + /// + /// Concurrency is bounded to [`MAX_PREFETCH_CONCURRENCY`](Self::MAX_PREFETCH_CONCURRENCY) + /// to avoid overwhelming the backend with a burst of RPCs. pub async fn prefetch(&self, inodes: impl IntoIterator) { - let futs: Vec<_> = inodes - .into_iter() - .map(|ino| async move { + futures::stream::iter(inodes) + .for_each_concurrent(Self::MAX_PREFETCH_CONCURRENCY, |ino| async move { drop(self.get_or_resolve(ino, |_| ()).await); }) - .collect(); - join_all(futs).await; + .await; } /// Fire-and-forget variant of [`prefetch`](Self::prefetch). diff --git a/src/fs/mescloud/mod.rs b/src/fs/mescloud/mod.rs index 0e32933..ef0a66a 100644 --- a/src/fs/mescloud/mod.rs +++ b/src/fs/mescloud/mod.rs @@ -260,6 +260,17 @@ impl Fs for MesaFS { type ReaddirError = ReadDirError; type ReleaseError = ReleaseError; + #[instrument(name = "MesaFS::init", skip(self))] + async fn init(&mut self) { + for slot in &mut self.composite.slots { + if slot.inner.name() == "github" { + continue; + } + trace!(org = slot.inner.name(), "prefetching org repo listing"); + drop(slot.inner.readdir(OrgFs::ROOT_INO).await); + } + } + #[instrument(name = "MesaFS::lookup", skip(self))] async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result { let role = self.inode_role(parent).ok_or(LookupError::InodeNotFound)?; diff --git a/src/fs/trait.rs b/src/fs/trait.rs index f4d9852..7a56e57 100644 --- a/src/fs/trait.rs +++ b/src/fs/trait.rs @@ -328,6 +328,10 @@ pub trait Fs { type ReaddirError: std::error::Error; type ReleaseError: std::error::Error; + /// Called once after mount, before any FUSE operations. + /// Override to perform startup work like prefetching. + async fn init(&mut self) {} + /// For each lookup call made by the kernel, it expects the icache to be updated with the /// returned `FileAttr`. async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result; diff --git a/src/trc.rs b/src/trc.rs index a504362..a8abbc5 100644 --- a/src/trc.rs +++ b/src/trc.rs @@ -139,7 +139,7 @@ impl Trc { match self.mode { TrcMode::丑 { .. } => { - let indicatif_layer = IndicatifLayer::new(); + let indicatif_layer = IndicatifLayer::new().with_max_progress_bars(20, None); let pretty_with_indicatif: BoxedFmtLayer = Box::new( tracing_subscriber::fmt::layer() .with_ansi(use_ansi) From 692b7058817df4e91d872496d8ffa86b5982c0d7 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Wed, 11 Feb 2026 00:32:19 -0800 Subject: [PATCH 03/11] feat: add concurrent child-by-name index for thread-safe ensure_child_ino --- src/fs/mescloud/icache.rs | 136 +++++++++++++++++++++++++++----------- 1 file changed, 98 insertions(+), 38 deletions(-) diff --git a/src/fs/mescloud/icache.rs b/src/fs/mescloud/icache.rs index 4694f7d..85edec9 100644 --- a/src/fs/mescloud/icache.rs +++ b/src/fs/mescloud/icache.rs @@ -1,8 +1,10 @@ //! Mescloud-specific inode control block, helpers, and directory cache wrapper. 
-use std::ffi::OsStr; +use std::ffi::{OsStr, OsString}; use std::time::SystemTime; +use scc::HashMap as ConcurrentHashMap; + use crate::fs::icache::{AsyncICache, IcbLike, IcbResolver, InodeFactory}; use crate::fs::r#trait::{ CommonFileAttr, DirEntryType, FileAttr, FilesystemStats, Inode, Permissions, @@ -80,6 +82,10 @@ pub fn make_common_file_attr( pub struct MescloudICache> { inner: AsyncICache, inode_factory: InodeFactory, + /// O(1) lookup from (parent inode, child name) to child inode. + /// Maintained alongside the inode table to avoid linear scans in + /// `ensure_child_ino` and to provide atomicity under concurrent access. + child_index: ConcurrentHashMap<(Inode, OsString), Inode>, fs_owner: (u32, u32), block_size: u32, } @@ -90,6 +96,7 @@ impl> MescloudICache { let cache = Self { inner: AsyncICache::new(resolver, root_ino, "/"), inode_factory: InodeFactory::new(root_ino + 1), + child_index: ConcurrentHashMap::new(), fs_owner, block_size, }; @@ -139,7 +146,15 @@ impl> MescloudICache { } pub async fn forget(&self, ino: Inode, nlookups: u64) -> Option { - self.inner.forget(ino, nlookups).await + let evicted = self.inner.forget(ino, nlookups).await; + if let Some(ref icb) = evicted + && let Some(parent) = icb.parent + { + self.child_index + .remove_async(&(parent, icb.path.as_os_str().to_os_string())) + .await; + } + evicted } pub async fn get_or_resolve( @@ -209,13 +224,14 @@ impl> MescloudICache { self.inner .for_each(|&ino, icb| { if icb.rc == 0 && icb.parent == Some(parent) { - to_evict.push(ino); + to_evict.push((ino, icb.path.as_os_str().to_os_string())); } }) .await; let mut evicted = Vec::new(); - for ino in to_evict { + for (ino, name) in to_evict { if self.inner.forget(ino, 0).await.is_some() { + self.child_index.remove_async(&(parent, name)).await; evicted.push(ino); } } @@ -226,48 +242,40 @@ impl> MescloudICache { /// If new, inserts a stub ICB (parent+path set, attr=None, children=None, rc=0). /// Does NOT bump rc. Returns the inode number. /// - /// # Safety invariant - /// - /// The `for_each` scan and `insert_icb` are **not** atomic. If two callers - /// race with the same `(parent, name)`, both may allocate distinct inodes - /// for the same logical child. This is currently safe because all callers - /// go through `&mut self` on the owning `Fs` implementation. + /// Thread-safe: uses `child_index` with `entry_async` for atomic + /// check-and-insert, so concurrent callers for the same (parent, name) + /// always receive the same inode. 
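+    ///
+    /// Guarantee in sketch form (exercised by the idempotence tests below):
+    ///
+    /// ```ignore
+    /// let a = cache.ensure_child_ino(parent, OsStr::new("foo")).await;
+    /// let b = cache.ensure_child_ino(parent, OsStr::new("foo")).await;
+    /// assert_eq!(a, b); // same (parent, name) always maps to one inode
+    /// ```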
pub async fn ensure_child_ino(&self, parent: Inode, name: &OsStr) -> Inode { - // Search for existing child by parent + name - let mut existing_ino = None; - self.inner - .for_each(|&ino, icb| { - if icb.parent == Some(parent) && icb.path.as_os_str() == name { - existing_ino = Some(ino); - } - }) - .await; - - if let Some(ino) = existing_ino { - return ino; + use scc::hash_map::Entry; + + let key = (parent, name.to_os_string()); + match self.child_index.entry_async(key).await { + Entry::Occupied(occ) => *occ.get(), + Entry::Vacant(vac) => { + let ino = self.inode_factory.allocate(); + vac.insert_entry(ino); + self.inner + .insert_icb( + ino, + InodeControlBlock { + rc: 0, + path: name.into(), + parent: Some(parent), + attr: None, + children: None, + }, + ) + .await; + ino + } } - - // Allocate new inode and insert stub - let ino = self.inode_factory.allocate(); - self.inner - .insert_icb( - ino, - InodeControlBlock { - rc: 0, - path: name.into(), - parent: Some(parent), - attr: None, - children: None, - }, - ) - .await; - ino } } #[cfg(test)] mod tests { use std::future::Future; + use std::sync::Arc; use super::*; use crate::fs::icache::async_cache::AsyncICache; @@ -373,6 +381,35 @@ mod tests { MescloudICache::new(NoOpResolver, 1, (0, 0), 4096) } + #[tokio::test] + async fn ensure_child_ino_is_idempotent() { + let cache = test_mescloud_cache(); + let ino1 = cache.ensure_child_ino(1, OsStr::new("foo")).await; + let ino2 = cache.ensure_child_ino(1, OsStr::new("foo")).await; + assert_eq!(ino1, ino2, "same (parent, name) must return same inode"); + } + + #[tokio::test] + async fn ensure_child_ino_concurrent_same_child() { + let cache = Arc::new(test_mescloud_cache()); + let mut handles = Vec::new(); + for _ in 0..10 { + let c = Arc::clone(&cache); + handles.push(tokio::spawn(async move { + c.ensure_child_ino(1, OsStr::new("bar")).await + })); + } + let inodes: Vec = futures::future::join_all(handles) + .await + .into_iter() + .map(|r| r.unwrap_or_else(|e| panic!("task panicked: {e}"))) + .collect(); + assert!( + inodes.iter().all(|&i| i == inodes[0]), + "all concurrent calls must return the same inode: {inodes:?}" + ); + } + #[tokio::test] async fn evict_zero_rc_children_removes_stubs() { let cache = test_mescloud_cache(); @@ -442,4 +479,27 @@ mod tests { "child of different parent should survive" ); } + + #[tokio::test] + async fn evict_cleans_child_index() { + let cache = test_mescloud_cache(); + let ino1 = cache.ensure_child_ino(1, OsStr::new("temp")).await; + let evicted = cache.evict_zero_rc_children(1).await; + assert!(evicted.contains(&ino1)); + let ino2 = cache.ensure_child_ino(1, OsStr::new("temp")).await; + assert_ne!( + ino1, ino2, + "after eviction, a new inode should be allocated" + ); + } + + #[tokio::test] + async fn forget_cleans_child_index() { + let cache = test_mescloud_cache(); + let ino = cache.ensure_child_ino(1, OsStr::new("ephemeral")).await; + let evicted = cache.forget(ino, 0).await; + assert!(evicted.is_some()); + let ino2 = cache.ensure_child_ino(1, OsStr::new("ephemeral")).await; + assert_ne!(ino, ino2, "child_index should have been cleaned up"); + } } From 4b8c33d295ed5d18c10110aa51844d9984f1061c Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Wed, 11 Feb 2026 00:41:14 -0800 Subject: [PATCH 04/11] refactor: make MescloudICache cloneable via Arc for background task access --- src/fs/mescloud/icache.rs | 98 +++++++++++++++++++++++++++------------ 1 file changed, 68 insertions(+), 30 deletions(-) diff --git a/src/fs/mescloud/icache.rs b/src/fs/mescloud/icache.rs 
index 85edec9..a99d58c 100644 --- a/src/fs/mescloud/icache.rs +++ b/src/fs/mescloud/icache.rs @@ -1,6 +1,7 @@ //! Mescloud-specific inode control block, helpers, and directory cache wrapper. use std::ffi::{OsStr, OsString}; +use std::sync::Arc; use std::time::SystemTime; use scc::HashMap as ConcurrentHashMap; @@ -78,9 +79,8 @@ pub fn make_common_file_attr( } } -/// Mescloud-specific directory cache wrapper over `AsyncICache`. -pub struct MescloudICache> { - inner: AsyncICache, +struct MescloudICacheInner> { + cache: AsyncICache, inode_factory: InodeFactory, /// O(1) lookup from (parent inode, child name) to child inode. /// Maintained alongside the inode table to avoid linear scans in @@ -90,15 +90,33 @@ pub struct MescloudICache> { block_size: u32, } +/// Mescloud-specific directory cache wrapper over `AsyncICache`. +/// +/// Cheaply cloneable via `Arc` so background tasks (e.g. prefetch) can share +/// access to the cache and its domain-specific helpers. +pub struct MescloudICache> { + inner: Arc>, +} + +impl> Clone for MescloudICache { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} + impl> MescloudICache { /// Create a new `MescloudICache`. Initializes root ICB (rc=1), caches root dir attr. pub fn new(resolver: R, root_ino: Inode, fs_owner: (u32, u32), block_size: u32) -> Self { let cache = Self { - inner: AsyncICache::new(resolver, root_ino, "/"), - inode_factory: InodeFactory::new(root_ino + 1), - child_index: ConcurrentHashMap::new(), - fs_owner, - block_size, + inner: Arc::new(MescloudICacheInner { + cache: AsyncICache::new(resolver, root_ino, "/"), + inode_factory: InodeFactory::new(root_ino + 1), + child_index: ConcurrentHashMap::new(), + fs_owner, + block_size, + }), }; // Set root directory attr synchronously during initialization @@ -106,7 +124,7 @@ impl> MescloudICache { let root_attr = FileAttr::Directory { common: make_common_file_attr(root_ino, 0o755, now, now, fs_owner, block_size), }; - cache.inner.get_icb_mut_sync(root_ino, |icb| { + cache.inner.cache.get_icb_mut_sync(root_ino, |icb| { icb.attr = Some(root_attr); }); @@ -116,7 +134,7 @@ impl> MescloudICache { // -- Delegated from AsyncICache (async) -- pub fn contains(&self, ino: Inode) -> bool { - self.inner.contains(ino) + self.inner.cache.contains(ino) } pub async fn get_icb( @@ -125,11 +143,11 @@ impl> MescloudICache { // `Sync` required: see comment on `AsyncICache::get_icb`. 
f: impl Fn(&InodeControlBlock) -> T + Send + Sync, ) -> Option { - self.inner.get_icb(ino, f).await + self.inner.cache.get_icb(ino, f).await } pub async fn insert_icb(&self, ino: Inode, icb: InodeControlBlock) { - self.inner.insert_icb(ino, icb).await; + self.inner.cache.insert_icb(ino, icb).await; } pub async fn entry_or_insert_icb( @@ -138,19 +156,23 @@ impl> MescloudICache { factory: impl FnOnce() -> InodeControlBlock, then: impl FnOnce(&mut InodeControlBlock) -> T, ) -> T { - self.inner.entry_or_insert_icb(ino, factory, then).await + self.inner + .cache + .entry_or_insert_icb(ino, factory, then) + .await } pub async fn inc_rc(&self, ino: Inode) -> Option { - self.inner.inc_rc(ino).await + self.inner.cache.inc_rc(ino).await } pub async fn forget(&self, ino: Inode, nlookups: u64) -> Option { - let evicted = self.inner.forget(ino, nlookups).await; + let evicted = self.inner.cache.forget(ino, nlookups).await; if let Some(ref icb) = evicted && let Some(parent) = icb.parent { - self.child_index + self.inner + .child_index .remove_async(&(parent, icb.path.as_os_str().to_os_string())) .await; } @@ -162,7 +184,7 @@ impl> MescloudICache { ino: Inode, then: impl FnOnce(&InodeControlBlock) -> T, ) -> Result { - self.inner.get_or_resolve(ino, then).await + self.inner.cache.get_or_resolve(ino, then).await } pub fn spawn_prefetch(&self, inodes: impl IntoIterator) @@ -170,22 +192,27 @@ impl> MescloudICache { R: 'static, R::Error: 'static, { - self.inner.spawn_prefetch(inodes); + self.inner.cache.spawn_prefetch(inodes); } // -- Domain-specific -- /// Allocate a new inode number. pub fn allocate_inode(&self) -> Inode { - self.inode_factory.allocate() + self.inner.inode_factory.allocate() } pub async fn get_attr(&self, ino: Inode) -> Option { - self.inner.get_icb(ino, |icb| icb.attr).await.flatten() + self.inner + .cache + .get_icb(ino, |icb| icb.attr) + .await + .flatten() } pub async fn cache_attr(&self, ino: Inode, attr: FileAttr) { self.inner + .cache .get_icb_mut(ino, |icb| { icb.attr = Some(attr); }) @@ -193,21 +220,21 @@ impl> MescloudICache { } pub fn fs_owner(&self) -> (u32, u32) { - self.fs_owner + self.inner.fs_owner } pub fn block_size(&self) -> u32 { - self.block_size + self.inner.block_size } pub fn statfs(&self) -> FilesystemStats { FilesystemStats { - block_size: self.block_size, - fragment_size: u64::from(self.block_size), + block_size: self.inner.block_size, + fragment_size: u64::from(self.inner.block_size), total_blocks: 0, free_blocks: 0, available_blocks: 0, - total_inodes: self.inner.inode_count() as u64, + total_inodes: self.inner.cache.inode_count() as u64, free_inodes: 0, available_inodes: 0, filesystem_id: 0, @@ -222,6 +249,7 @@ impl> MescloudICache { pub async fn evict_zero_rc_children(&self, parent: Inode) -> Vec { let mut to_evict = Vec::new(); self.inner + .cache .for_each(|&ino, icb| { if icb.rc == 0 && icb.parent == Some(parent) { to_evict.push((ino, icb.path.as_os_str().to_os_string())); @@ -230,8 +258,8 @@ impl> MescloudICache { .await; let mut evicted = Vec::new(); for (ino, name) in to_evict { - if self.inner.forget(ino, 0).await.is_some() { - self.child_index.remove_async(&(parent, name)).await; + if self.inner.cache.forget(ino, 0).await.is_some() { + self.inner.child_index.remove_async(&(parent, name)).await; evicted.push(ino); } } @@ -249,12 +277,13 @@ impl> MescloudICache { use scc::hash_map::Entry; let key = (parent, name.to_os_string()); - match self.child_index.entry_async(key).await { + match self.inner.child_index.entry_async(key).await { Entry::Occupied(occ) 
=> *occ.get(), Entry::Vacant(vac) => { - let ino = self.inode_factory.allocate(); + let ino = self.inner.inode_factory.allocate(); vac.insert_entry(ino); self.inner + .cache .insert_icb( ino, InodeControlBlock { @@ -275,7 +304,6 @@ impl> MescloudICache { #[cfg(test)] mod tests { use std::future::Future; - use std::sync::Arc; use super::*; use crate::fs::icache::async_cache::AsyncICache; @@ -493,6 +521,16 @@ mod tests { ); } + #[test] + #[expect( + clippy::redundant_clone, + reason = "test exists to verify Clone is implemented" + )] + fn mescloud_icache_is_clone() { + let cache = test_mescloud_cache(); + let _clone = cache.clone(); + } + #[tokio::test] async fn forget_cleans_child_index() { let cache = test_mescloud_cache(); From 2ad6eeb96522d492becc4ae4b504d9d56371500b Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Wed, 11 Feb 2026 00:47:52 -0800 Subject: [PATCH 05/11] feat: selective eviction that preserves prefetched children Replace evict_zero_rc_children calls with evict_stale_children in readdir paths. The new method only evicts rc=0 children whose names are NOT in the current directory listing, preserving prefetched grandchild inodes. --- src/fs/mescloud/composite.rs | 8 +++-- src/fs/mescloud/icache.rs | 59 ++++++++++++++++++++++++++++++++++++ src/fs/mescloud/repo.rs | 26 +++++++++++++--- 3 files changed, 85 insertions(+), 8 deletions(-) diff --git a/src/fs/mescloud/composite.rs b/src/fs/mescloud/composite.rs index 6dbac25..56bdef3 100644 --- a/src/fs/mescloud/composite.rs +++ b/src/fs/mescloud/composite.rs @@ -1,5 +1,5 @@ -use std::collections::HashMap; -use std::ffi::OsStr; +use std::collections::{HashMap, HashSet}; +use std::ffi::{OsStr, OsString}; use bytes::Bytes; use tracing::{instrument, trace, warn}; @@ -280,7 +280,9 @@ where .forward_or_insert_inode(ino, || unreachable!("readdir: ino should be mapped")); let inner_entries = self.slots[idx].inner.readdir(inner_ino).await?; let inner_entries: Vec = inner_entries.to_vec(); - let evicted = self.icache.evict_zero_rc_children(ino).await; + let current_names: HashSet = + inner_entries.iter().map(|e| e.name.clone()).collect(); + let evicted = self.icache.evict_stale_children(ino, ¤t_names).await; for evicted_ino in evicted { if let Some(slot) = self.inode_to_slot.remove(&evicted_ino) { self.slots[slot].bridge.remove_inode_by_left(evicted_ino); diff --git a/src/fs/mescloud/icache.rs b/src/fs/mescloud/icache.rs index a99d58c..7b2aba1 100644 --- a/src/fs/mescloud/icache.rs +++ b/src/fs/mescloud/icache.rs @@ -1,5 +1,6 @@ //! Mescloud-specific inode control block, helpers, and directory cache wrapper. +use std::collections::HashSet; use std::ffi::{OsStr, OsString}; use std::sync::Arc; use std::time::SystemTime; @@ -246,6 +247,7 @@ impl> MescloudICache { /// Evict all `Available` children of `parent` that have `rc == 0`. /// Returns the list of evicted inode numbers so callers can clean up /// associated state (e.g., bridge mappings, slot tracking). + #[cfg(test)] pub async fn evict_zero_rc_children(&self, parent: Inode) -> Vec { let mut to_evict = Vec::new(); self.inner @@ -266,6 +268,37 @@ impl> MescloudICache { evicted } + /// Evict rc=0 children of `parent` that are NOT in `current_names`. + /// Preserves prefetched children that are still part of the directory listing. + /// Returns the list of evicted inode numbers so callers can clean up + /// associated state (e.g., bridge mappings, slot tracking). 
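+    ///
+    /// Semantics in sketch form (mirrors the
+    /// `evict_stale_children_preserves_current` test below):
+    ///
+    /// ```ignore
+    /// // "foo", "bar", "baz" are rc == 0 children; the fresh listing only
+    /// // names "foo" and "baz", so only "bar" is evicted.
+    /// let current: HashSet<OsString> =
+    ///     ["foo", "baz"].iter().copied().map(OsString::from).collect();
+    /// let evicted = cache.evict_stale_children(parent, &current).await;
+    /// ```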
+ pub async fn evict_stale_children( + &self, + parent: Inode, + current_names: &HashSet, + ) -> Vec { + let mut to_evict = Vec::new(); + self.inner + .cache + .for_each(|&ino, icb| { + if icb.rc == 0 + && icb.parent == Some(parent) + && !current_names.contains(icb.path.as_os_str()) + { + to_evict.push((ino, icb.path.as_os_str().to_os_string())); + } + }) + .await; + let mut evicted = Vec::new(); + for (ino, name) in to_evict { + if self.inner.cache.forget(ino, 0).await.is_some() { + self.inner.child_index.remove_async(&(parent, name)).await; + evicted.push(ino); + } + } + evicted + } + /// Find an existing child by (parent, name) or allocate a new inode. /// If new, inserts a stub ICB (parent+path set, attr=None, children=None, rc=0). /// Does NOT bump rc. Returns the inode number. @@ -540,4 +573,30 @@ mod tests { let ino2 = cache.ensure_child_ino(1, OsStr::new("ephemeral")).await; assert_ne!(ino, ino2, "child_index should have been cleaned up"); } + + #[tokio::test] + #[expect( + clippy::similar_names, + reason = "bar_ino/baz_ino distinction is intentional" + )] + async fn evict_stale_children_preserves_current() { + use std::collections::HashSet; + + let cache = test_mescloud_cache(); + + // Insert children of root via ensure_child_ino (populates child_index) + let foo_ino = cache.ensure_child_ino(1, OsStr::new("foo")).await; + let bar_ino = cache.ensure_child_ino(1, OsStr::new("bar")).await; + let baz_ino = cache.ensure_child_ino(1, OsStr::new("baz")).await; + + // "foo" and "baz" are in the current listing; "bar" is stale + let current: HashSet = + ["foo", "baz"].iter().copied().map(OsString::from).collect(); + let evicted = cache.evict_stale_children(1, ¤t).await; + + assert_eq!(evicted, vec![bar_ino], "only stale 'bar' should be evicted"); + assert!(cache.contains(foo_ino), "current child 'foo' preserved"); + assert!(!cache.contains(bar_ino), "stale child 'bar' evicted"); + assert!(cache.contains(baz_ino), "current child 'baz' preserved"); + } } diff --git a/src/fs/mescloud/repo.rs b/src/fs/mescloud/repo.rs index 69a2e51..1faeaec 100644 --- a/src/fs/mescloud/repo.rs +++ b/src/fs/mescloud/repo.rs @@ -2,6 +2,8 @@ //! //! This module directly accesses the mesa repo through the Rust SDK, on a per-repo basis. +use std::collections::HashSet; +use std::ffi::OsString; use std::future::Future; use std::{collections::HashMap, ffi::OsStr, path::PathBuf, time::SystemTime}; @@ -58,6 +60,12 @@ impl IcbResolver for RepoResolver { let stub = stub.ok_or(LookupError::InodeNotFound)?; let file_path = build_repo_path(stub.parent, &stub.path, cache, RepoFs::ROOT_INO).await; + trace!( + ino, + path = file_path.as_deref().unwrap_or(""), + "resolver: fetching content" + ); + // Non-root inodes must have a resolvable path. if stub.parent.is_some() && file_path.is_none() { return Err(LookupError::InodeNotFound); @@ -262,6 +270,7 @@ impl RepoFs { /// Spawn a background task to prefetch the root directory listing so /// subsequent readdir hits cache. 
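+    ///
+    /// Triggered from `OrgFs::readdir` for each repo slot (see the org.rs
+    /// hunk in PATCH 01), so listing an org warms every repo root in the
+    /// background.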
pub(crate) fn prefetch_root(&self) { + trace!(repo = %self.repo_name, "prefetch_root: warming root directory cache"); self.icache.spawn_prefetch([Self::ROOT_INO]); } } @@ -343,7 +352,11 @@ impl Fs for RepoFs { "readdir: resolved directory listing from icache" ); - self.icache.evict_zero_rc_children(ino).await; + let current_names: HashSet = children + .iter() + .map(|(name, _)| OsString::from(name)) + .collect(); + self.icache.evict_stale_children(ino, ¤t_names).await; let mut entries = Vec::with_capacity(children.len()); for (name, kind) in &children { @@ -373,17 +386,20 @@ impl Fs for RepoFs { }); } - let subdir_inodes: Vec = entries + let subdir_entries: Vec<(Inode, &str)> = entries .iter() .filter(|e| e.kind == DirEntryType::Directory) - .map(|e| e.ino) + .map(|e| (e.ino, e.name.to_str().unwrap_or(""))) .collect(); - if !subdir_inodes.is_empty() { + if !subdir_entries.is_empty() { + let names: Vec<&str> = subdir_entries.iter().map(|(_, n)| *n).collect(); trace!( ino, - subdir_count = subdir_inodes.len(), + subdir_count = subdir_entries.len(), + ?names, "readdir: prefetching subdirectory children" ); + let subdir_inodes: Vec = subdir_entries.iter().map(|(ino, _)| *ino).collect(); self.icache.spawn_prefetch(subdir_inodes); } From 5ce94f1d0caa4e74437beefa84df969e90ec46d4 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Wed, 11 Feb 2026 00:55:46 -0800 Subject: [PATCH 06/11] feat: add prefetch_readdir for full readdir-equivalent background prefetch --- src/fs/mescloud/icache.rs | 195 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 195 insertions(+) diff --git a/src/fs/mescloud/icache.rs b/src/fs/mescloud/icache.rs index 7b2aba1..eef9496 100644 --- a/src/fs/mescloud/icache.rs +++ b/src/fs/mescloud/icache.rs @@ -5,7 +5,9 @@ use std::ffi::{OsStr, OsString}; use std::sync::Arc; use std::time::SystemTime; +use futures::StreamExt as _; use scc::HashMap as ConcurrentHashMap; +use tracing::trace; use crate::fs::icache::{AsyncICache, IcbLike, IcbResolver, InodeFactory}; use crate::fs::r#trait::{ @@ -196,6 +198,99 @@ impl> MescloudICache { self.inner.cache.spawn_prefetch(inodes); } + /// Maximum concurrent readdir prefetches. + const MAX_READDIR_PREFETCH_CONCURRENCY: usize = 4; + + /// Perform readdir-equivalent work for a directory: resolve its ICB, + /// allocate inodes for children, cache directory attrs, and trigger + /// deeper prefetch for subdirectory children. + /// + /// This is the "true readdir prefetch" -- when the user later calls + /// `readdir(ino)`, all the expensive work is already done. 
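+    ///
+    /// Call sites are expected to look like this sketch (inode values are
+    /// illustrative):
+    ///
+    /// ```ignore
+    /// // Inline, best-effort; a failure only costs the later readdir latency:
+    /// let _ = icache.prefetch_readdir(dir_ino).await;
+    /// // Or fire-and-forget for a batch (see spawn_prefetch_readdir below):
+    /// icache.spawn_prefetch_readdir(subdir_inodes);
+    /// ```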
+ pub async fn prefetch_readdir(&self, ino: Inode) -> Result<(), R::Error> + where + R: 'static, + R::Error: 'static, + { + let children = self + .inner + .cache + .get_or_resolve(ino, |icb| icb.children.clone()) + .await?; + + let Some(children) = children else { + return Ok(()); // Not a directory + }; + + let mut subdir_inodes = Vec::new(); + let now = SystemTime::now(); + + for (name, kind) in &children { + let child_ino = self.ensure_child_ino(ino, OsStr::new(name)).await; + if *kind == DirEntryType::Directory { + let attr = FileAttr::Directory { + common: make_common_file_attr( + child_ino, + 0o755, + now, + now, + self.inner.fs_owner, + self.inner.block_size, + ), + }; + self.cache_attr(child_ino, attr).await; + subdir_inodes.push(child_ino); + } + } + + let subdir_count = subdir_inodes.len(); + + // Trigger deeper prefetch: resolve subdirectory ICBs (one more level) + if !subdir_inodes.is_empty() { + self.inner.cache.spawn_prefetch(subdir_inodes); + } + + trace!( + ino, + child_count = children.len(), + subdir_count, + "prefetch_readdir: completed" + ); + + Ok(()) + } + + /// Fire-and-forget: spawn background tasks that perform full readdir + /// prefetch for the given directory inodes. + #[expect(dead_code, reason = "wired up in Task 5")] + pub fn spawn_prefetch_readdir(&self, inodes: impl IntoIterator) + where + R: 'static, + R::Error: 'static, + { + let inodes: Vec<_> = inodes.into_iter().collect(); + if inodes.is_empty() { + return; + } + trace!( + count = inodes.len(), + "spawn_prefetch_readdir: dispatching background task" + ); + let cache = self.clone(); + tokio::spawn(async move { + futures::stream::iter(inodes) + .for_each_concurrent(Self::MAX_READDIR_PREFETCH_CONCURRENCY, |ino| { + let cache = cache.clone(); + async move { + if cache.prefetch_readdir(ino).await.is_err() { + trace!(ino, "prefetch_readdir: failed (will retry on access)"); + } + } + }) + .await; + }); + } + // -- Domain-specific -- /// Allocate a new inode number. 
@@ -599,4 +694,104 @@ mod tests { assert!(!cache.contains(bar_ino), "stale child 'bar' evicted"); assert!(cache.contains(baz_ino), "current child 'baz' preserved"); } + + struct PrefetchTestResolver { + listings: std::sync::Mutex>>, + } + + impl PrefetchTestResolver { + fn new() -> Self { + Self { + listings: std::sync::Mutex::new(std::collections::HashMap::new()), + } + } + + fn add_dir(&self, ino: Inode, children: Vec<(String, DirEntryType)>) { + self.listings.lock().unwrap().insert(ino, children); + } + } + + impl IcbResolver for PrefetchTestResolver { + type Icb = InodeControlBlock; + type Error = super::super::common::LookupError; + + fn resolve( + &self, + ino: Inode, + stub: Option, + _cache: &AsyncICache, + ) -> impl Future> + Send + where + Self: Sized, + { + let listings = self.listings.lock().unwrap().clone(); + async move { + let stub = stub.ok_or(super::super::common::LookupError::InodeNotFound)?; + let children = listings.get(&ino).cloned(); + let now = SystemTime::now(); + let attr = if children.is_some() { + FileAttr::Directory { + common: make_common_file_attr(ino, 0o755, now, now, (0, 0), 4096), + } + } else { + FileAttr::RegularFile { + common: make_common_file_attr(ino, 0o644, now, now, (0, 0), 4096), + size: 42, + blocks: 1, + } + }; + Ok(InodeControlBlock { + parent: stub.parent, + path: stub.path, + rc: stub.rc, + attr: Some(attr), + children, + }) + } + } + } + + fn test_prefetch_cache(resolver: PrefetchTestResolver) -> MescloudICache { + MescloudICache::new(resolver, 1, (0, 0), 4096) + } + + #[tokio::test] + async fn prefetch_readdir_allocates_child_inodes_and_caches_dir_attrs() { + let resolver = PrefetchTestResolver::new(); + resolver.add_dir( + 1, + vec![ + ("src".to_owned(), DirEntryType::Directory), + ("README.md".to_owned(), DirEntryType::RegularFile), + ], + ); + + let cache = test_prefetch_cache(resolver); + + cache.prefetch_readdir(1).await.unwrap(); + + // Children should have inodes allocated + let src_ino = cache.ensure_child_ino(1, OsStr::new("src")).await; + let readme_ino = cache.ensure_child_ino(1, OsStr::new("README.md")).await; + + assert!(cache.contains(src_ino), "src should exist in cache"); + assert!( + cache.contains(readme_ino), + "README.md should exist in cache" + ); + + // Directory children should have cached attr + let src_attr = cache.get_attr(src_ino).await; + assert!( + matches!(src_attr, Some(FileAttr::Directory { .. })), + "src should have directory attr cached" + ); + + // File children should NOT have cached attr (avoids poisoning needs_resolve) + let readme_attr = cache.get_attr(readme_ino).await; + assert!( + readme_attr.is_none(), + "file children should not have attr cached by prefetch" + ); + } } From 2cf8569c02bf798f0b8fcc43006afd05930418e8 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Wed, 11 Feb 2026 01:02:44 -0800 Subject: [PATCH 07/11] feat: wire up prefetch_readdir in RepoFs and OrgFs --- src/fs/mescloud/icache.rs | 9 --------- src/fs/mescloud/repo.rs | 17 +++++++---------- 2 files changed, 7 insertions(+), 19 deletions(-) diff --git a/src/fs/mescloud/icache.rs b/src/fs/mescloud/icache.rs index eef9496..d1d7acc 100644 --- a/src/fs/mescloud/icache.rs +++ b/src/fs/mescloud/icache.rs @@ -190,14 +190,6 @@ impl> MescloudICache { self.inner.cache.get_or_resolve(ino, then).await } - pub fn spawn_prefetch(&self, inodes: impl IntoIterator) - where - R: 'static, - R::Error: 'static, - { - self.inner.cache.spawn_prefetch(inodes); - } - /// Maximum concurrent readdir prefetches. 
const MAX_READDIR_PREFETCH_CONCURRENCY: usize = 4; @@ -262,7 +254,6 @@ impl> MescloudICache { /// Fire-and-forget: spawn background tasks that perform full readdir /// prefetch for the given directory inodes. - #[expect(dead_code, reason = "wired up in Task 5")] pub fn spawn_prefetch_readdir(&self, inodes: impl IntoIterator) where R: 'static, diff --git a/src/fs/mescloud/repo.rs b/src/fs/mescloud/repo.rs index 1faeaec..254fddc 100644 --- a/src/fs/mescloud/repo.rs +++ b/src/fs/mescloud/repo.rs @@ -271,7 +271,7 @@ impl RepoFs { /// subsequent readdir hits cache. pub(crate) fn prefetch_root(&self) { trace!(repo = %self.repo_name, "prefetch_root: warming root directory cache"); - self.icache.spawn_prefetch([Self::ROOT_INO]); + self.icache.spawn_prefetch_readdir([Self::ROOT_INO]); } } @@ -386,21 +386,18 @@ impl Fs for RepoFs { }); } - let subdir_entries: Vec<(Inode, &str)> = entries + let subdir_inodes: Vec = entries .iter() .filter(|e| e.kind == DirEntryType::Directory) - .map(|e| (e.ino, e.name.to_str().unwrap_or(""))) + .map(|e| e.ino) .collect(); - if !subdir_entries.is_empty() { - let names: Vec<&str> = subdir_entries.iter().map(|(_, n)| *n).collect(); + if !subdir_inodes.is_empty() { trace!( ino, - subdir_count = subdir_entries.len(), - ?names, - "readdir: prefetching subdirectory children" + subdir_count = subdir_inodes.len(), + "readdir: prefetching subdirectory readdirs" ); - let subdir_inodes: Vec = subdir_entries.iter().map(|(ino, _)| *ino).collect(); - self.icache.spawn_prefetch(subdir_inodes); + self.icache.spawn_prefetch_readdir(subdir_inodes); } self.readdir_buf = entries; From 85764a162d04d8800504520b40eaa8c7f2d12081 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Wed, 11 Feb 2026 01:07:41 -0800 Subject: [PATCH 08/11] chore: remove unused evict_zero_rc_children All production callers now use evict_stale_children which preserves prefetched children. Update evict_cleans_child_index test to use evict_stale_children with an empty set instead. --- src/fs/mescloud/icache.rs | 97 +-------------------------------------- 1 file changed, 2 insertions(+), 95 deletions(-) diff --git a/src/fs/mescloud/icache.rs b/src/fs/mescloud/icache.rs index d1d7acc..b3728b9 100644 --- a/src/fs/mescloud/icache.rs +++ b/src/fs/mescloud/icache.rs @@ -330,30 +330,6 @@ impl> MescloudICache { } } - /// Evict all `Available` children of `parent` that have `rc == 0`. - /// Returns the list of evicted inode numbers so callers can clean up - /// associated state (e.g., bridge mappings, slot tracking). - #[cfg(test)] - pub async fn evict_zero_rc_children(&self, parent: Inode) -> Vec { - let mut to_evict = Vec::new(); - self.inner - .cache - .for_each(|&ino, icb| { - if icb.rc == 0 && icb.parent == Some(parent) { - to_evict.push((ino, icb.path.as_os_str().to_os_string())); - } - }) - .await; - let mut evicted = Vec::new(); - for (ino, name) in to_evict { - if self.inner.cache.forget(ino, 0).await.is_some() { - self.inner.child_index.remove_async(&(parent, name)).await; - evicted.push(ino); - } - } - evicted - } - /// Evict rc=0 children of `parent` that are NOT in `current_names`. /// Preserves prefetched children that are still part of the directory listing. 
/// Returns the list of evicted inode numbers so callers can clean up @@ -557,81 +533,12 @@ mod tests { ); } - #[tokio::test] - async fn evict_zero_rc_children_removes_stubs() { - let cache = test_mescloud_cache(); - - // Insert stubs as children of root (ino=1) with rc=0 - cache - .insert_icb( - 10, - InodeControlBlock { - rc: 0, - path: "child_a".into(), - parent: Some(1), - attr: None, - children: None, - }, - ) - .await; - cache - .insert_icb( - 11, - InodeControlBlock { - rc: 0, - path: "child_b".into(), - parent: Some(1), - attr: None, - children: None, - }, - ) - .await; - - // Insert a child with rc > 0 — should survive - cache - .insert_icb( - 12, - InodeControlBlock { - rc: 1, - path: "active".into(), - parent: Some(1), - attr: None, - children: None, - }, - ) - .await; - - // Insert a stub under a different parent — should survive - cache - .insert_icb( - 20, - InodeControlBlock { - rc: 0, - path: "other".into(), - parent: Some(12), - attr: None, - children: None, - }, - ) - .await; - - let evicted = cache.evict_zero_rc_children(1).await; - assert_eq!(evicted.len(), 2, "should evict 2 zero-rc children of root"); - - assert!(!cache.contains(10), "child_a should be evicted"); - assert!(!cache.contains(11), "child_b should be evicted"); - assert!(cache.contains(12), "active child should survive"); - assert!( - cache.contains(20), - "child of different parent should survive" - ); - } - #[tokio::test] async fn evict_cleans_child_index() { let cache = test_mescloud_cache(); let ino1 = cache.ensure_child_ino(1, OsStr::new("temp")).await; - let evicted = cache.evict_zero_rc_children(1).await; + // Use evict_stale_children with empty set (evicts all rc=0 children) + let evicted = cache.evict_stale_children(1, &HashSet::new()).await; assert!(evicted.contains(&ino1)); let ino2 = cache.ensure_child_ino(1, OsStr::new("temp")).await; assert_ne!( From cfd3fec31fa89f0b4e39670c9436d56415c420c7 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Wed, 11 Feb 2026 01:15:09 -0800 Subject: [PATCH 09/11] feat: add tracing instrumentation to prefetch and eviction paths Add trace logging to ensure_child_ino (cache hit vs allocation), evict_stale_children (eviction counts), forget (child_index cleanup), and readdir eviction results in both RepoFs and CompositeFs. 
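All sites follow the same structured-field pattern, e.g. (copied from the
ensure_child_ino hunk below):

    trace!(parent, ?name, ino, "ensure_child_ino: cache hit");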
--- src/fs/mescloud/composite.rs | 7 +++++++ src/fs/mescloud/icache.rs | 16 +++++++++++++++- src/fs/mescloud/repo.rs | 9 ++++++++- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/fs/mescloud/composite.rs b/src/fs/mescloud/composite.rs index 56bdef3..0bd684f 100644 --- a/src/fs/mescloud/composite.rs +++ b/src/fs/mescloud/composite.rs @@ -283,6 +283,13 @@ where let current_names: HashSet = inner_entries.iter().map(|e| e.name.clone()).collect(); let evicted = self.icache.evict_stale_children(ino, ¤t_names).await; + if !evicted.is_empty() { + trace!( + ino, + evicted_count = evicted.len(), + "delegated_readdir: evicted stale children" + ); + } for evicted_ino in evicted { if let Some(slot) = self.inode_to_slot.remove(&evicted_ino) { self.slots[slot].bridge.remove_inode_by_left(evicted_ino); diff --git a/src/fs/mescloud/icache.rs b/src/fs/mescloud/icache.rs index b3728b9..8d77bad 100644 --- a/src/fs/mescloud/icache.rs +++ b/src/fs/mescloud/icache.rs @@ -174,6 +174,7 @@ impl> MescloudICache { if let Some(ref icb) = evicted && let Some(parent) = icb.parent { + trace!(ino, parent, path = %icb.path.display(), "forget: cleaning up child_index"); self.inner .child_index .remove_async(&(parent, icb.path.as_os_str().to_os_string())) @@ -358,6 +359,14 @@ impl> MescloudICache { evicted.push(ino); } } + if !evicted.is_empty() { + trace!( + parent, + evicted_count = evicted.len(), + current_count = current_names.len(), + "evict_stale_children: removed stale entries" + ); + } evicted } @@ -373,7 +382,11 @@ impl> MescloudICache { let key = (parent, name.to_os_string()); match self.inner.child_index.entry_async(key).await { - Entry::Occupied(occ) => *occ.get(), + Entry::Occupied(occ) => { + let ino = *occ.get(); + trace!(parent, ?name, ino, "ensure_child_ino: cache hit"); + ino + } Entry::Vacant(vac) => { let ino = self.inner.inode_factory.allocate(); vac.insert_entry(ino); @@ -390,6 +403,7 @@ impl> MescloudICache { }, ) .await; + trace!(parent, ?name, ino, "ensure_child_ino: allocated new inode"); ino } } diff --git a/src/fs/mescloud/repo.rs b/src/fs/mescloud/repo.rs index 254fddc..2196528 100644 --- a/src/fs/mescloud/repo.rs +++ b/src/fs/mescloud/repo.rs @@ -356,7 +356,14 @@ impl Fs for RepoFs { .iter() .map(|(name, _)| OsString::from(name)) .collect(); - self.icache.evict_stale_children(ino, ¤t_names).await; + let evicted = self.icache.evict_stale_children(ino, ¤t_names).await; + if !evicted.is_empty() { + trace!( + ino, + evicted_count = evicted.len(), + "readdir: evicted stale children" + ); + } let mut entries = Vec::with_capacity(children.len()); for (name, kind) in &children { From 942623f8251a3d74a88b02a6c4d37999274c2fbe Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Wed, 11 Feb 2026 01:26:44 -0800 Subject: [PATCH 10/11] fix: handle stale child_index entries and upgrade prefetch error logging Two fixes for prefetch reliability: 1. ensure_child_ino now validates that the cached inode still exists in the inode_table before returning it. If a failed prefetch (rc=0) caused get_or_resolve to evict the entry from inode_table without cleaning up child_index, the stale mapping is removed and a fresh inode is allocated. This prevents cascading lookup failures. 2. Prefetch failures are now logged at warn! level (was trace!) so they're visible with normal log settings. Includes the error message for easier debugging. 
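The validate-or-reallocate loop, in sketch form (the full version with the
stub ICB fields is in the diff below):

    loop {
        match child_index.entry_async((parent, name)).await {
            Entry::Occupied(occ) => {
                let ino = *occ.get();
                if cache.contains(ino) { return ino; }  // still live
                drop(occ.remove_entry());               // stale mapping; retry
            }
            Entry::Vacant(vac) => { /* allocate inode, insert stub, return */ }
        }
    }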
--- src/fs/mescloud/icache.rs | 69 +++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/src/fs/mescloud/icache.rs b/src/fs/mescloud/icache.rs index 8d77bad..358ac44 100644 --- a/src/fs/mescloud/icache.rs +++ b/src/fs/mescloud/icache.rs @@ -7,7 +7,7 @@ use std::time::SystemTime; use futures::StreamExt as _; use scc::HashMap as ConcurrentHashMap; -use tracing::trace; +use tracing::{trace, warn}; use crate::fs::icache::{AsyncICache, IcbLike, IcbResolver, InodeFactory}; use crate::fs::r#trait::{ @@ -258,7 +258,7 @@ impl> MescloudICache { pub fn spawn_prefetch_readdir(&self, inodes: impl IntoIterator) where R: 'static, - R::Error: 'static, + R::Error: std::fmt::Display + 'static, { let inodes: Vec<_> = inodes.into_iter().collect(); if inodes.is_empty() { @@ -274,8 +274,8 @@ impl> MescloudICache { .for_each_concurrent(Self::MAX_READDIR_PREFETCH_CONCURRENCY, |ino| { let cache = cache.clone(); async move { - if cache.prefetch_readdir(ino).await.is_err() { - trace!(ino, "prefetch_readdir: failed (will retry on access)"); + if let Err(e) = cache.prefetch_readdir(ino).await { + warn!(ino, error = %e, "prefetch_readdir: failed (will retry on access)"); } } }) @@ -380,31 +380,44 @@ impl> MescloudICache { pub async fn ensure_child_ino(&self, parent: Inode, name: &OsStr) -> Inode { use scc::hash_map::Entry; - let key = (parent, name.to_os_string()); - match self.inner.child_index.entry_async(key).await { - Entry::Occupied(occ) => { - let ino = *occ.get(); - trace!(parent, ?name, ino, "ensure_child_ino: cache hit"); - ino - } - Entry::Vacant(vac) => { - let ino = self.inner.inode_factory.allocate(); - vac.insert_entry(ino); - self.inner - .cache - .insert_icb( + loop { + let key = (parent, name.to_os_string()); + match self.inner.child_index.entry_async(key).await { + Entry::Occupied(occ) => { + let ino = *occ.get(); + if self.inner.cache.contains(ino) { + trace!(parent, ?name, ino, "ensure_child_ino: cache hit"); + return ino; + } + // Stale: inode was evicted (e.g. failed prefetch with rc=0). + // Remove the stale mapping and loop to reallocate. + warn!( + parent, + ?name, ino, - InodeControlBlock { - rc: 0, - path: name.into(), - parent: Some(parent), - attr: None, - children: None, - }, - ) - .await; - trace!(parent, ?name, ino, "ensure_child_ino: allocated new inode"); - ino + "ensure_child_ino: stale child_index entry, reallocating" + ); + drop(occ.remove_entry()); + } + Entry::Vacant(vac) => { + let ino = self.inner.inode_factory.allocate(); + vac.insert_entry(ino); + self.inner + .cache + .insert_icb( + ino, + InodeControlBlock { + rc: 0, + path: name.into(), + parent: Some(parent), + attr: None, + children: None, + }, + ) + .await; + trace!(parent, ?name, ino, "ensure_child_ino: allocated new inode"); + return ino; + } } } } From c3ab193ce07093673231b9523a6a87669491d033 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Wed, 11 Feb 2026 11:22:12 -0800 Subject: [PATCH 11/11] feat: enable parallel FUSE request handling via interior mutability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Switch the entire Fs stack from &mut self to &self so FuserAdapter can spawn a tokio task per FUSE request instead of blocking the session loop with block_on. This unblocks concurrent request processing — a slow API call no longer stalls all other FUSE operations. 
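For example, lookup now dispatches onto the runtime instead of blocking the
session loop (condensed from the fuser.rs hunk below; `ttl` is captured from
Self::SHAMEFUL_TTL):

    let fs = Arc::clone(&self.fs);
    let name = name.to_owned();
    self.runtime.spawn(async move {
        match fs.lookup(parent, &name).await {
            Ok(attr) => reply.entry(&ttl, &attr.into(), 0),
            Err(e) => reply.error(e.into()),
        }
    });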
Key changes: - Fs trait: &self, Send+Sync+'static bounds, readdir returns Vec<DirEntry> - HashMapBridge: wrap BiMaps in std::sync::RwLock for interior mutability - CompositeFs: slots in tokio::sync::RwLock, inode maps in scc::HashMap - RepoFs/OrgFs/MesaFS: all mutable state behind concurrent containers - FuserAdapter: fs wrapped in Arc, each method spawns instead of block_on --- src/fs/fuser.rs | 211 +++++++++++++++++++---------- src/fs/icache/async_cache.rs | 14 ++- src/fs/icache/bridge.rs | 106 +++++++++++------- src/fs/mescloud/composite.rs | 120 ++++++++++---------- src/fs/mescloud/mod.rs | 131 ++++++++++++---------- src/fs/mescloud/org.rs | 210 +++++++++++++++++++--------------- src/fs/mescloud/repo.rs | 48 ++++---- src/fs/trait.rs | 32 +++--- 8 files changed, 489 insertions(+), 383 deletions(-) diff --git a/src/fs/fuser.rs b/src/fs/fuser.rs index 5ffa43c..d1c9369 100644 --- a/src/fs/fuser.rs +++ b/src/fs/fuser.rs @@ -1,4 +1,5 @@ use std::ffi::OsStr; +use std::sync::Arc; use crate::fs::r#trait::{CommonFileAttr, DirEntryType, FileAttr, Fs, LockOwner, OpenFlags}; use tracing::{debug, error, instrument}; @@ -115,7 +116,7 @@ where F::ReaddirError: Into<libc::c_int>, F::ReleaseError: Into<libc::c_int>, { - fs: F, + fs: Arc<F>, runtime: tokio::runtime::Handle, } @@ -141,11 +142,14 @@ where const SHAMEFUL_TTL: std::time::Duration = std::time::Duration::from_secs(1); pub fn new(fs: F, runtime: tokio::runtime::Handle) -> Self { - Self { fs, runtime } + Self { + fs: Arc::new(fs), + runtime, + } } } -impl<F: Fs + 'static> fuser::Filesystem for FuserAdapter<F> +impl<F: Fs> fuser::Filesystem for FuserAdapter<F> where F::LookupError: Into<libc::c_int>, F::GetAttrError: Into<libc::c_int>, @@ -171,20 +175,22 @@ where name: &OsStr, reply: fuser::ReplyEntry, ) { - match self.runtime.block_on(self.fs.lookup(parent, name)) { - Ok(attr) => { - // TODO(markovejnovic): Passing generation = 0 here is a recipe for disaster. - // Someone with A LOT of files will likely see inode reuse which will lead to a - // disaster.
- let f_attr: fuser::FileAttr = attr.into(); - debug!(?f_attr, "replying..."); - reply.entry(&Self::SHAMEFUL_TTL, &f_attr, 0); - } - Err(e) => { - debug!(error = %e, "replying error"); - reply.error(e.into()); + let fs = Arc::clone(&self.fs); + let name = name.to_owned(); + let ttl = Self::SHAMEFUL_TTL; + self.runtime.spawn(async move { + match fs.lookup(parent, &name).await { + Ok(attr) => { + let f_attr: fuser::FileAttr = attr.into(); + debug!(?f_attr, "replying..."); + reply.entry(&ttl, &f_attr, 0); + } + Err(e) => { + debug!(error = %e, "replying error"); + reply.error(e.into()); + } } - } + }); } #[instrument(name = "FuserAdapter::getattr", skip(self, _req, fh, reply))] @@ -195,16 +201,20 @@ where fh: Option<u64>, reply: fuser::ReplyAttr, ) { - match self.runtime.block_on(self.fs.getattr(ino, fh)) { - Ok(attr) => { - debug!(?attr, "replying..."); - reply.attr(&Self::SHAMEFUL_TTL, &attr.into()); - } - Err(e) => { - debug!(error = %e, "replying error"); + let fs = Arc::clone(&self.fs); + let ttl = Self::SHAMEFUL_TTL; + self.runtime.spawn(async move { + match fs.getattr(ino, fh).await { + Ok(attr) => { + debug!(?attr, "replying..."); + reply.attr(&ttl, &attr.into()); + } + Err(e) => { + debug!(error = %e, "replying error"); + reply.error(e.into()); + } } - } + }); } #[instrument(name = "FuserAdapter::readdir", skip(self, _req, _fh, offset, reply))] @@ -216,54 +226,61 @@ where offset: i64, mut reply: fuser::ReplyDirectory, ) { - let entries = match self.runtime.block_on(self.fs.readdir(ino)) { - Ok(entries) => entries, - Err(e) => { - debug!(error = %e, "replying error"); - reply.error(e.into()); - return; - } - }; - - #[expect( - clippy::cast_possible_truncation, - reason = "fuser offset is i64 but always non-negative" - )] - for (i, entry) in entries - .iter() - .enumerate() - .skip(offset.cast_unsigned() as usize) - { - let kind: fuser::FileType = entry.kind.into(); - let Ok(idx): Result<i64, _> = (i + 1).try_into() else { - error!("Directory entry index {} too large for fuser", i + 1); - reply.error(libc::EIO); - return; + let fs = Arc::clone(&self.fs); + self.runtime.spawn(async move { + let entries = match fs.readdir(ino).await { + Ok(entries) => entries, + Err(e) => { + debug!(error = %e, "replying error"); + reply.error(e.into()); + return; + } }; - debug!(?entry, "adding entry to reply..."); - if reply.add(entry.ino, idx, kind, &entry.name) { - debug!("buffer full for now, stopping readdir"); - break; + #[expect( + clippy::cast_possible_truncation, + reason = "fuser offset is i64 but always non-negative" + )] + for (i, entry) in entries + .iter() + .enumerate() + .skip(offset.cast_unsigned() as usize) + { + let kind: fuser::FileType = entry.kind.into(); + let Ok(idx): Result<i64, _> = (i + 1).try_into() else { + error!("Directory entry index {} too large for fuser", i + 1); + reply.error(libc::EIO); + return; + }; + + debug!(?entry, "adding entry to reply..."); + if reply.add(entry.ino, idx, kind, &entry.name) { + debug!("buffer full for now, stopping readdir"); + break; + } } - } - debug!("finalizing reply..."); - reply.ok(); + debug!("finalizing reply..."); + reply.ok(); + }); } #[instrument(name = "FuserAdapter::open", skip(self, _req, flags, reply))] fn open(&mut self, _req: &fuser::Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) { - match self.runtime.block_on(self.fs.open(ino, flags.into())) { - Ok(open_file) => { - debug!(handle = open_file.handle, "replying..."); - reply.opened(open_file.handle, 0); - } - Err(e) => { - debug!(error = %e, "replying error");
- reply.error(e.into()); + let fs = Arc::clone(&self.fs); + let flags: OpenFlags = flags.into(); + self.runtime.spawn(async move { + match fs.open(ino, flags).await { + Ok(open_file) => { + debug!(handle = open_file.handle, "replying..."); + reply.opened(open_file.handle, 0); + } + Err(e) => { + debug!(error = %e, "replying error"); + reply.error(e.into()); + } } - } + }); } #[instrument( @@ -281,25 +298,24 @@ where lock_owner: Option<u64>, reply: fuser::ReplyData, ) { + let fs = Arc::clone(&self.fs); let flags: OpenFlags = flags.into(); let lock_owner = lock_owner.map(LockOwner); - match self.runtime.block_on(self.fs.read( - ino, - fh, - offset.cast_unsigned(), - size, - flags, - lock_owner, - )) { - Ok(data) => { - debug!(read_bytes = data.len(), "replying..."); - reply.data(&data); - } - Err(e) => { - debug!(error = %e, "replying error"); - reply.error(e.into()); + self.runtime.spawn(async move { + match fs + .read(ino, fh, offset.cast_unsigned(), size, flags, lock_owner) + .await + { + Ok(data) => { + debug!(read_bytes = data.len(), "replying..."); + reply.data(&data); + } + Err(e) => { + debug!(error = %e, "replying error"); + reply.error(e.into()); + } } - } + }); } #[instrument(name = "FuserAdapter::release", skip(self, _req, _lock_owner, reply))] @@ -313,30 +329,35 @@ where flush: bool, reply: fuser::ReplyEmpty, ) { - match self - .runtime - .block_on(self.fs.release(ino, fh, flags.into(), flush)) - { - Ok(()) => { - debug!("replying ok"); - reply.ok(); - } - Err(e) => { - debug!(error = %e, "replying error"); - reply.error(e.into()); + let fs = Arc::clone(&self.fs); + let flags: OpenFlags = flags.into(); + self.runtime.spawn(async move { + match fs.release(ino, fh, flags, flush).await { + Ok(()) => { + debug!("replying ok"); + reply.ok(); + } + Err(e) => { + debug!(error = %e, "replying error"); + reply.error(e.into()); + } } - } + }); } #[instrument(name = "FuserAdapter::forget", skip(self, _req, nlookup))] fn forget(&mut self, _req: &fuser::Request<'_>, ino: u64, nlookup: u64) { - self.runtime.block_on(self.fs.forget(ino, nlookup)); + let fs = Arc::clone(&self.fs); + self.runtime.spawn(async move { + fs.forget(ino, nlookup).await; + }); } #[instrument(name = "FuserAdapter::statfs", skip(self, _req, _ino, reply))] fn statfs(&mut self, _req: &fuser::Request<'_>, _ino: u64, reply: fuser::ReplyStatfs) { - self.runtime.block_on(async { - match self.fs.statfs().await { + let fs = Arc::clone(&self.fs); + self.runtime.spawn(async move { + match fs.statfs().await { Ok(statvfs) => { debug!(?statvfs, "replying..."); reply.statfs( diff --git a/src/fs/icache/async_cache.rs b/src/fs/icache/async_cache.rs index e1cc0e0..488f4db 100644 --- a/src/fs/icache/async_cache.rs +++ b/src/fs/icache/async_cache.rs @@ -517,11 +517,18 @@ impl<R: IcbResolver> AsyncICache<R> { /// Concurrency is bounded to [`MAX_PREFETCH_CONCURRENCY`](Self::MAX_PREFETCH_CONCURRENCY) /// to avoid overwhelming the backend with a burst of RPCs.
pub async fn prefetch(&self, inodes: impl IntoIterator<Item = Inode>) { + let inodes: Vec<_> = inodes.into_iter().collect(); + trace!(count = inodes.len(), ?inodes, "prefetch: starting batch"); futures::stream::iter(inodes) .for_each_concurrent(Self::MAX_PREFETCH_CONCURRENCY, |ino| async move { - drop(self.get_or_resolve(ino, |_| ()).await); + if self.get_or_resolve(ino, |_| ()).await.is_err() { + trace!(ino, "prefetch: resolution failed (will retry on access)"); + } else { + trace!(ino, "prefetch: resolved successfully"); + } }) .await; + trace!("prefetch: batch complete"); } /// Fire-and-forget variant of [`prefetch`](Self::prefetch). @@ -538,6 +545,11 @@ impl<R: IcbResolver> AsyncICache<R> { if inodes.is_empty() { return; } + trace!( + count = inodes.len(), + ?inodes, + "spawn_prefetch: dispatching background task" + ); let cache = self.clone(); tokio::spawn(async move { cache.prefetch(inodes).await; diff --git a/src/fs/icache/bridge.rs b/src/fs/icache/bridge.rs index e674a56..8de2deb 100644 --- a/src/fs/icache/bridge.rs +++ b/src/fs/icache/bridge.rs @@ -1,72 +1,100 @@ +use std::sync::RwLock; + use crate::fs::r#trait::{FileAttr, FileHandle, Inode}; /// Bidirectional bridge for both inodes and file handles between two Fs layers. /// /// Convention: **left = outer (caller), right = inner (callee)**. /// `forward(left)` → right, `backward(right)` → left. +/// +/// All methods take `&self` — interior mutability is provided by `RwLock`. pub struct HashMapBridge { - inode_map: bimap::BiMap<Inode, Inode>, - fh_map: bimap::BiMap<FileHandle, FileHandle>, + inode_map: RwLock<bimap::BiMap<Inode, Inode>>, + fh_map: RwLock<bimap::BiMap<FileHandle, FileHandle>>, } +#[expect( + clippy::expect_used, + reason = "RwLock poisoning is unrecoverable; panicking is the correct behavior" +)] impl HashMapBridge { pub fn new() -> Self { Self { - inode_map: bimap::BiMap::new(), - fh_map: bimap::BiMap::new(), + inode_map: RwLock::new(bimap::BiMap::new()), + fh_map: RwLock::new(bimap::BiMap::new()), } } - // ── Inode methods ──────────────────────────────────────────────────── - - pub fn insert_inode(&mut self, left: Inode, right: Inode) { - self.inode_map.insert(left, right); + pub fn insert_inode(&self, left: Inode, right: Inode) { + self.inode_map + .write() + .expect("poisoned") + .insert(left, right); } /// Look up right→left, or allocate a new left inode if unmapped. pub fn backward_or_insert_inode( - &mut self, + &self, right: Inode, allocate: impl FnOnce() -> Inode, ) -> Inode { - if let Some(&left) = self.inode_map.get_by_right(&right) { - left - } else { - let left = allocate(); - self.inode_map.insert(left, right); - left + // Fast path: read-only check. + if let Some(&left) = self + .inode_map + .read() + .expect("poisoned") + .get_by_right(&right) + { + return left; + } + // Slow path: acquire write lock and double-check. + let mut map = self.inode_map.write().expect("poisoned"); + if let Some(&left) = map.get_by_right(&right) { + return left; } + let left = allocate(); + map.insert(left, right); + left } /// Look up left→right, or allocate a new right inode if unmapped. - pub fn forward_or_insert_inode( - &mut self, - left: Inode, - allocate: impl FnOnce() -> Inode, - ) -> Inode { - if let Some(&right) = self.inode_map.get_by_left(&left) { - right - } else { - let right = allocate(); - self.inode_map.insert(left, right); - right + pub fn forward_or_insert_inode(&self, left: Inode, allocate: impl FnOnce() -> Inode) -> Inode { + // Fast path: read-only check. + if let Some(&right) = self.inode_map.read().expect("poisoned").get_by_left(&left) { + return right; + } + // Slow path: acquire write lock and double-check.
+ let mut map = self.inode_map.write().expect("poisoned"); + if let Some(&right) = map.get_by_left(&left) { + return right; + } + let right = allocate(); + map.insert(left, right); + right } /// Remove an inode mapping by its left (outer) key. - pub fn remove_inode_by_left(&mut self, left: Inode) { - self.inode_map.remove_by_left(&left); + pub fn remove_inode_by_left(&self, left: Inode) { + self.inode_map + .write() + .expect("poisoned") + .remove_by_left(&left); } /// Look up left→right directly. - pub fn inode_map_get_by_left(&self, left: Inode) -> Option<&Inode> { - self.inode_map.get_by_left(&left) + pub fn inode_map_get_by_left(&self, left: Inode) -> Option { + self.inode_map + .read() + .expect("poisoned") + .get_by_left(&left) + .copied() } /// Rewrite the `ino` field in a [`FileAttr`] from right (inner) to left (outer) namespace. pub fn attr_backward(&self, attr: FileAttr) -> FileAttr { + let map = self.inode_map.read().expect("poisoned"); let backward = |ino: Inode| -> Inode { - if let Some(&left) = self.inode_map.get_by_right(&ino) { + if let Some(&left) = map.get_by_right(&ino) { left } else { tracing::warn!( @@ -79,19 +107,21 @@ impl HashMapBridge { rewrite_attr_ino(attr, backward) } - // ── File handle methods ────────────────────────────────────────────── - - pub fn insert_fh(&mut self, left: FileHandle, right: FileHandle) { - self.fh_map.insert(left, right); + pub fn insert_fh(&self, left: FileHandle, right: FileHandle) { + self.fh_map.write().expect("poisoned").insert(left, right); } pub fn fh_forward(&self, left: FileHandle) -> Option { - self.fh_map.get_by_left(&left).copied() + self.fh_map + .read() + .expect("poisoned") + .get_by_left(&left) + .copied() } /// Remove a file handle mapping by its left (outer) key. - pub fn remove_fh_by_left(&mut self, left: FileHandle) { - self.fh_map.remove_by_left(&left); + pub fn remove_fh_by_left(&self, left: FileHandle) { + self.fh_map.write().expect("poisoned").remove_by_left(&left); } } diff --git a/src/fs/mescloud/composite.rs b/src/fs/mescloud/composite.rs index 0bd684f..5bd534b 100644 --- a/src/fs/mescloud/composite.rs +++ b/src/fs/mescloud/composite.rs @@ -1,7 +1,7 @@ -use std::collections::{HashMap, HashSet}; -use std::ffi::{OsStr, OsString}; +use std::ffi::OsStr; use bytes::Bytes; +use scc::HashMap as ConcurrentHashMap; use tracing::{instrument, trace, warn}; use crate::fs::icache::bridge::HashMapBridge; @@ -36,18 +36,21 @@ pub(super) struct ChildSlot { /// maintains a bidirectional inode/file-handle bridge per child (see /// [`ChildSlot`]) to translate between the outer namespace visible to FUSE and /// each child's internal namespace. +/// +/// All methods take `&self` — interior mutability is provided by +/// `scc::HashMap` (for inode maps) and `tokio::sync::RwLock` (for the +/// growable slot vector). pub(super) struct CompositeFs where R: IcbResolver, { pub icache: MescloudICache, pub file_table: FileTable, - pub readdir_buf: Vec, /// Maps outer inode to index into `slots` for child-root inodes. - pub child_inodes: HashMap, + pub child_inodes: ConcurrentHashMap, /// Maps every translated outer inode to its owning slot index. - pub inode_to_slot: HashMap, - pub slots: Vec>, + pub inode_to_slot: ConcurrentHashMap, + pub slots: tokio::sync::RwLock>>, } impl CompositeFs @@ -67,31 +70,32 @@ where /// Look up which child slot owns an inode via direct map. 
#[instrument(name = "CompositeFs::slot_for_inode", skip(self))] pub fn slot_for_inode(&self, ino: Inode) -> Option { - self.inode_to_slot.get(&ino).copied() + self.inode_to_slot.read_sync(&ino, |_, &idx| idx) } /// Allocate an outer file handle and map it through the bridge. #[must_use] - pub fn alloc_fh(&mut self, slot_idx: usize, inner_fh: FileHandle) -> FileHandle { + pub fn alloc_fh(&self, slot: &ChildSlot, inner_fh: FileHandle) -> FileHandle { let fh = self.file_table.allocate(); - self.slots[slot_idx].bridge.insert_fh(fh, inner_fh); + slot.bridge.insert_fh(fh, inner_fh); fh } /// Translate an inner inode to an outer inode, allocating if needed. /// Also inserts a stub ICB into the outer icache when the inode is new. - #[instrument(name = "CompositeFs::translate_inner_ino", skip(self, name))] + #[instrument(name = "CompositeFs::translate_inner_ino", skip(self, slot, name))] pub async fn translate_inner_ino( - &mut self, + &self, slot_idx: usize, + slot: &ChildSlot, inner_ino: Inode, parent_outer_ino: Inode, name: &OsStr, ) -> Inode { - let outer_ino = self.slots[slot_idx] + let outer_ino = slot .bridge .backward_or_insert_inode(inner_ino, || self.icache.allocate_inode()); - self.inode_to_slot.insert(outer_ino, slot_idx); + let _ = self.inode_to_slot.insert_async(outer_ino, slot_idx).await; self.icache .entry_or_insert_icb( outer_ino, @@ -120,7 +124,7 @@ where /// Find slot, forward inode, delegate to inner, allocate outer file handle. #[instrument(name = "CompositeFs::delegated_open", skip(self))] pub async fn delegated_open( - &mut self, + &self, ino: Inode, flags: OpenFlags, ) -> Result { @@ -128,11 +132,12 @@ where warn!(ino, "open on inode not belonging to any child"); OpenError::InodeNotFound })?; - let inner_ino = self.slots[idx] + let slots = self.slots.read().await; + let inner_ino = slots[idx] .bridge .forward_or_insert_inode(ino, || unreachable!("open: ino should be mapped")); - let inner_open = self.slots[idx].inner.open(inner_ino, flags).await?; - let outer_fh = self.alloc_fh(idx, inner_open.handle); + let inner_open = slots[idx].inner.open(inner_ino, flags).await?; + let outer_fh = self.alloc_fh(&slots[idx], inner_open.handle); trace!( ino, outer_fh, @@ -149,7 +154,7 @@ where #[expect(clippy::too_many_arguments, reason = "mirrors fuser read API")] #[instrument(name = "CompositeFs::delegated_read", skip(self))] pub async fn delegated_read( - &mut self, + &self, ino: Inode, fh: FileHandle, offset: u64, @@ -161,14 +166,15 @@ where warn!(ino, "read on inode not belonging to any child"); ReadError::InodeNotFound })?; - let inner_ino = self.slots[idx] + let slots = self.slots.read().await; + let inner_ino = slots[idx] .bridge .forward_or_insert_inode(ino, || unreachable!("read: ino should be mapped")); - let inner_fh = self.slots[idx].bridge.fh_forward(fh).ok_or_else(|| { + let inner_fh = slots[idx].bridge.fh_forward(fh).ok_or_else(|| { warn!(fh, "read: no fh mapping found"); ReadError::FileNotOpen })?; - self.slots[idx] + slots[idx] .inner .read(inner_ino, inner_fh, offset, size, flags, lock_owner) .await @@ -178,7 +184,7 @@ where /// then clean up the file handle mapping. 
#[instrument(name = "CompositeFs::delegated_release", skip(self))] pub async fn delegated_release( - &mut self, + &self, ino: Inode, fh: FileHandle, flags: OpenFlags, @@ -188,18 +194,19 @@ where warn!(ino, "release on inode not belonging to any child"); ReleaseError::FileNotOpen })?; - let inner_ino = self.slots[idx] + let slots = self.slots.read().await; + let inner_ino = slots[idx] .bridge .forward_or_insert_inode(ino, || unreachable!("release: ino should be mapped")); - let inner_fh = self.slots[idx].bridge.fh_forward(fh).ok_or_else(|| { + let inner_fh = slots[idx].bridge.fh_forward(fh).ok_or_else(|| { warn!(fh, "release: no fh mapping found"); ReleaseError::FileNotOpen })?; - let result = self.slots[idx] + let result = slots[idx] .inner .release(inner_ino, inner_fh, flags, flush) .await; - self.slots[idx].bridge.remove_fh_by_left(fh); + slots[idx].bridge.remove_fh_by_left(fh); trace!(ino, fh, "release: cleaned up fh mapping"); result } @@ -213,20 +220,21 @@ where /// evict the inner root, breaking all subsequent operations on that child. #[must_use] #[instrument(name = "CompositeFs::delegated_forget", skip(self))] - pub async fn delegated_forget(&mut self, ino: Inode, nlookups: u64) -> bool { + pub async fn delegated_forget(&self, ino: Inode, nlookups: u64) -> bool { let slot_idx = self.slot_for_inode(ino); - let is_child_root = self.child_inodes.contains_key(&ino); - if !is_child_root - && let Some(idx) = slot_idx - && let Some(&inner_ino) = self.slots[idx].bridge.inode_map_get_by_left(ino) - { - self.slots[idx].inner.forget(inner_ino, nlookups).await; + let is_child_root = self.child_inodes.read_sync(&ino, |_, _| ()).is_some(); + if !is_child_root && let Some(idx) = slot_idx { + let slots = self.slots.read().await; + if let Some(inner_ino) = slots[idx].bridge.inode_map_get_by_left(ino) { + slots[idx].inner.forget(inner_ino, nlookups).await; + } } if self.icache.forget(ino, nlookups).await.is_some() { - self.child_inodes.remove(&ino); - self.inode_to_slot.remove(&ino); + self.child_inodes.remove_async(&ino).await; + self.inode_to_slot.remove_async(&ino).await; if let Some(idx) = slot_idx { - self.slots[idx].bridge.remove_inode_by_left(ino); + let slots = self.slots.read().await; + slots[idx].bridge.remove_inode_by_left(ino); } true } else { @@ -243,23 +251,24 @@ where /// Delegation branch for lookup when the parent is owned by a child slot. #[instrument(name = "CompositeFs::delegated_lookup", skip(self, name))] pub async fn delegated_lookup( - &mut self, + &self, parent: Inode, name: &OsStr, ) -> Result { let idx = self .slot_for_inode(parent) .ok_or(LookupError::InodeNotFound)?; - let inner_parent = self.slots[idx] + let slots = self.slots.read().await; + let inner_parent = slots[idx] .bridge .forward_or_insert_inode(parent, || unreachable!("lookup: parent should be mapped")); - let inner_attr = self.slots[idx].inner.lookup(inner_parent, name).await?; + let inner_attr = slots[idx].inner.lookup(inner_parent, name).await?; let inner_ino = inner_attr.common().ino; - let outer_ino = self.translate_inner_ino(idx, inner_ino, parent, name).await; - let outer_attr = self.slots[idx].bridge.attr_backward(inner_attr); + let outer_ino = self + .translate_inner_ino(idx, &slots[idx], inner_ino, parent, name) + .await; + let outer_attr = slots[idx].bridge.attr_backward(inner_attr); self.icache.cache_attr(outer_ino, outer_attr).await; - // None means the entry was concurrently evicted; fail the lookup so - // the kernel doesn't hold a ref the cache no longer tracks. 
let rc = self .icache .inc_rc(outer_ino) @@ -271,16 +280,16 @@ where /// Delegation branch for readdir when the inode is owned by a child slot. #[instrument(name = "CompositeFs::delegated_readdir", skip(self))] - pub async fn delegated_readdir(&mut self, ino: Inode) -> Result<&[DirEntry], ReadDirError> { + pub async fn delegated_readdir(&self, ino: Inode) -> Result, ReadDirError> { let idx = self .slot_for_inode(ino) .ok_or(ReadDirError::InodeNotFound)?; - let inner_ino = self.slots[idx] + let slots = self.slots.read().await; + let inner_ino = slots[idx] .bridge .forward_or_insert_inode(ino, || unreachable!("readdir: ino should be mapped")); - let inner_entries = self.slots[idx].inner.readdir(inner_ino).await?; - let inner_entries: Vec = inner_entries.to_vec(); - let current_names: HashSet = + let inner_entries = slots[idx].inner.readdir(inner_ino).await?; + let current_names: std::collections::HashSet = inner_entries.iter().map(|e| e.name.clone()).collect(); let evicted = self.icache.evict_stale_children(ino, ¤t_names).await; if !evicted.is_empty() { @@ -291,18 +300,18 @@ where ); } for evicted_ino in evicted { - if let Some(slot) = self.inode_to_slot.remove(&evicted_ino) { - self.slots[slot].bridge.remove_inode_by_left(evicted_ino); + if let Some((_, slot)) = self.inode_to_slot.remove_async(&evicted_ino).await { + slots[slot].bridge.remove_inode_by_left(evicted_ino); } - self.child_inodes.remove(&evicted_ino); + self.child_inodes.remove_async(&evicted_ino).await; } let mut outer_entries = Vec::with_capacity(inner_entries.len()); for entry in &inner_entries { let outer_child_ino = self - .translate_inner_ino(idx, entry.ino, ino, &entry.name) + .translate_inner_ino(idx, &slots[idx], entry.ino, ino, &entry.name) .await; - if let Some(inner_attr) = self.slots[idx].inner.peek_attr(entry.ino).await { - let outer_attr = self.slots[idx].bridge.attr_backward(inner_attr); + if let Some(inner_attr) = slots[idx].inner.peek_attr(entry.ino).await { + let outer_attr = slots[idx].bridge.attr_backward(inner_attr); self.icache.cache_attr(outer_child_ino, outer_attr).await; } outer_entries.push(DirEntry { @@ -311,7 +320,6 @@ where kind: entry.kind, }); } - self.readdir_buf = outer_entries; - Ok(&self.readdir_buf) + Ok(outer_entries) } } diff --git a/src/fs/mescloud/mod.rs b/src/fs/mescloud/mod.rs index ef0a66a..06298af 100644 --- a/src/fs/mescloud/mod.rs +++ b/src/fs/mescloud/mod.rs @@ -1,10 +1,10 @@ -use std::collections::HashMap; use std::ffi::OsStr; use std::future::Future; use std::time::SystemTime; use bytes::Bytes; use mesa_dev::MesaClient; +use scc::HashMap as ConcurrentHashMap; use secrecy::ExposeSecret as _; use tracing::{Instrument as _, instrument, trace, warn}; @@ -117,11 +117,10 @@ impl MesaFS { Self::BLOCK_SIZE, ), file_table: FileTable::new(), - readdir_buf: Vec::new(), - child_inodes: HashMap::new(), - inode_to_slot: HashMap::new(), - slots: orgs - .map(|org_conf| { + child_inodes: ConcurrentHashMap::new(), + inode_to_slot: ConcurrentHashMap::new(), + slots: tokio::sync::RwLock::new( + orgs.map(|org_conf| { let client = MesaClient::builder() .with_api_key(org_conf.api_key.expose_secret()) .with_base_path(MESA_API_BASE_URL) @@ -133,6 +132,7 @@ impl MesaFS { } }) .collect(), + ), }, } } @@ -142,7 +142,12 @@ impl MesaFS { if ino == Self::ROOT_NODE_INO { return Some(InodeRole::Root); } - if self.composite.child_inodes.contains_key(&ino) { + if self + .composite + .child_inodes + .read_sync(&ino, |_, _| ()) + .is_some() + { return Some(InodeRole::OrgOwned); } if 
self.composite.slot_for_inode(ino).is_some() { @@ -154,39 +159,39 @@ impl MesaFS { /// Ensure a mesa-level inode exists for the org at `org_idx`. /// Seeds the bridge with (`mesa_org_ino`, `OrgFs::ROOT_INO`). /// Does NOT bump rc. - async fn ensure_org_inode(&mut self, org_idx: usize) -> (Inode, FileAttr) { + async fn ensure_org_inode(&self, org_idx: usize) -> (Inode, FileAttr) { // Check if an inode already exists. - let existing_ino = self - .composite + let mut existing_ino = None; + self.composite .child_inodes - .iter() - .find(|&(_, &idx)| idx == org_idx) - .map(|(&ino, _)| ino); + .any_async(|&ino, &idx| { + if idx == org_idx { + existing_ino = Some(ino); + true + } else { + false + } + }) + .await; - if let Some(existing_ino) = existing_ino { - if let Some(attr) = self.composite.icache.get_attr(existing_ino).await { + if let Some(ino) = existing_ino { + if let Some(attr) = self.composite.icache.get_attr(ino).await { let rc = self .composite .icache - .get_icb(existing_ino, |icb| icb.rc) + .get_icb(ino, |icb| icb.rc) .await .unwrap_or(0); - trace!( - ino = existing_ino, - org_idx, rc, "ensure_org_inode: reusing existing inode" - ); - return (existing_ino, attr); + trace!(ino, org_idx, rc, "ensure_org_inode: reusing existing inode"); + return (ino, attr); } - if self.composite.icache.contains(existing_ino) { + if self.composite.icache.contains(ino) { // ICB exists but attr missing — rebuild and cache. - warn!( - ino = existing_ino, - org_idx, "ensure_org_inode: attr missing, rebuilding" - ); + warn!(ino, org_idx, "ensure_org_inode: attr missing, rebuilding"); let now = SystemTime::now(); let attr = FileAttr::Directory { common: mescloud_icache::make_common_file_attr( - existing_ino, + ino, 0o755, now, now, @@ -194,20 +199,21 @@ impl MesaFS { self.composite.icache.block_size(), ), }; - self.composite.icache.cache_attr(existing_ino, attr).await; - return (existing_ino, attr); + self.composite.icache.cache_attr(ino, attr).await; + return (ino, attr); } // ICB was evicted — clean up stale tracking entries. warn!( - ino = existing_ino, + ino, org_idx, "ensure_org_inode: ICB evicted, cleaning up stale entry" ); - self.composite.child_inodes.remove(&existing_ino); - self.composite.inode_to_slot.remove(&existing_ino); + self.composite.child_inodes.remove_async(&ino).await; + self.composite.inode_to_slot.remove_async(&ino).await; } // Allocate new. - let org_name = self.composite.slots[org_idx].inner.name().to_owned(); + let slots = self.composite.slots.read().await; + let org_name = slots[org_idx].inner.name().to_owned(); let ino = self.composite.icache.allocate_inode(); trace!(ino, org_idx, org = %org_name, "ensure_org_inode: allocated new inode"); @@ -226,15 +232,22 @@ impl MesaFS { ) .await; - self.composite.child_inodes.insert(ino, org_idx); - self.composite.inode_to_slot.insert(ino, org_idx); + let _ = self.composite.child_inodes.insert_async(ino, org_idx).await; + let _ = self + .composite + .inode_to_slot + .insert_async(ino, org_idx) + .await; // Reset bridge (may have stale mappings from a previous eviction cycle) // and seed: mesa org-root <-> OrgFs::ROOT_INO. - self.composite.slots[org_idx].bridge = HashMapBridge::new(); - self.composite.slots[org_idx] - .bridge - .insert_inode(ino, OrgFs::ROOT_INO); + // Need write lock to replace the bridge itself. 
+ drop(slots); + { + let mut slots = self.composite.slots.write().await; + slots[org_idx].bridge = HashMapBridge::new(); + slots[org_idx].bridge.insert_inode(ino, OrgFs::ROOT_INO); + } let attr = FileAttr::Directory { common: mescloud_icache::make_common_file_attr( @@ -261,8 +274,9 @@ impl Fs for MesaFS { type ReleaseError = ReleaseError; #[instrument(name = "MesaFS::init", skip(self))] - async fn init(&mut self) { - for slot in &mut self.composite.slots { + async fn init(&self) { + let slots = self.composite.slots.read().await; + for slot in slots.iter() { if slot.inner.name() == "github" { continue; } @@ -272,17 +286,17 @@ impl Fs for MesaFS { } #[instrument(name = "MesaFS::lookup", skip(self))] - async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result { + async fn lookup(&self, parent: Inode, name: &OsStr) -> Result { let role = self.inode_role(parent).ok_or(LookupError::InodeNotFound)?; match role { InodeRole::Root => { let org_name = name.to_str().ok_or(LookupError::InodeNotFound)?; - let org_idx = self - .composite - .slots + let slots = self.composite.slots.read().await; + let org_idx = slots .iter() .position(|s| s.inner.name() == org_name) .ok_or(LookupError::InodeNotFound)?; + drop(slots); trace!(org = org_name, "lookup: matched org"); let (ino, attr) = self.ensure_org_inode(org_idx).await; @@ -300,26 +314,22 @@ impl Fs for MesaFS { } #[instrument(name = "MesaFS::getattr", skip(self))] - async fn getattr( - &mut self, - ino: Inode, - _fh: Option, - ) -> Result { + async fn getattr(&self, ino: Inode, _fh: Option) -> Result { self.composite.delegated_getattr(ino).await } #[instrument(name = "MesaFS::readdir", skip(self))] - async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], ReadDirError> { + async fn readdir(&self, ino: Inode) -> Result, ReadDirError> { let role = self.inode_role(ino).ok_or(ReadDirError::InodeNotFound)?; match role { InodeRole::Root => { - let org_info: Vec<(usize, String)> = self - .composite - .slots + let slots = self.composite.slots.read().await; + let org_info: Vec<(usize, String)> = slots .iter() .enumerate() .map(|(idx, s)| (idx, s.inner.name().to_owned())) .collect(); + drop(slots); let mut entries = Vec::with_capacity(org_info.len()); for (org_idx, name) in &org_info { @@ -332,21 +342,20 @@ impl Fs for MesaFS { } trace!(entry_count = entries.len(), "readdir: listing orgs"); - self.composite.readdir_buf = entries; - Ok(&self.composite.readdir_buf) + Ok(entries) } InodeRole::OrgOwned => self.composite.delegated_readdir(ino).await, } } #[instrument(name = "MesaFS::open", skip(self))] - async fn open(&mut self, ino: Inode, flags: OpenFlags) -> Result { + async fn open(&self, ino: Inode, flags: OpenFlags) -> Result { self.composite.delegated_open(ino, flags).await } #[instrument(name = "MesaFS::read", skip(self))] async fn read( - &mut self, + &self, ino: Inode, fh: FileHandle, offset: u64, @@ -361,7 +370,7 @@ impl Fs for MesaFS { #[instrument(name = "MesaFS::release", skip(self))] async fn release( - &mut self, + &self, ino: Inode, fh: FileHandle, flags: OpenFlags, @@ -373,12 +382,12 @@ impl Fs for MesaFS { } #[instrument(name = "MesaFS::forget", skip(self))] - async fn forget(&mut self, ino: Inode, nlookups: u64) { + async fn forget(&self, ino: Inode, nlookups: u64) { // MesaFS has no extra state to clean up on eviction (unlike OrgFs::owner_inodes). 
let _ = self.composite.delegated_forget(ino, nlookups).await; } - async fn statfs(&mut self) -> Result { + async fn statfs(&self) -> Result { Ok(self.composite.delegated_statfs()) } } diff --git a/src/fs/mescloud/org.rs b/src/fs/mescloud/org.rs index 2e4e976..255c489 100644 --- a/src/fs/mescloud/org.rs +++ b/src/fs/mescloud/org.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::ffi::OsStr; use std::future::Future; use std::time::SystemTime; @@ -6,6 +5,7 @@ use std::time::SystemTime; use bytes::Bytes; use futures::TryStreamExt as _; use mesa_dev::MesaClient; +use scc::HashMap as ConcurrentHashMap; use secrecy::SecretString; use tracing::{Instrument as _, instrument, trace, warn}; @@ -93,7 +93,7 @@ pub struct OrgFs { client: MesaClient, composite: CompositeFs, /// Maps org-level owner-dir inodes to owner name (github only). - owner_inodes: HashMap, + owner_inodes: ConcurrentHashMap, } impl OrgFs { @@ -121,37 +121,42 @@ impl OrgFs { /// Ensure an inode exists for a virtual owner directory (github only). Does NOT bump rc. /// TODO(MES-674): Cleanup "special" casing for github. - async fn ensure_owner_inode(&mut self, owner: &str) -> (Inode, FileAttr) { + async fn ensure_owner_inode(&self, owner: &str) -> (Inode, FileAttr) { // Check existing - let mut stale_ino = None; - for (&ino, existing_owner) in &self.owner_inodes { - if existing_owner == owner { - if let Some(attr) = self.composite.icache.get_attr(ino).await { - return (ino, attr); - } - if self.composite.icache.contains(ino) { - // ICB exists but attr missing — rebuild and cache - let now = SystemTime::now(); - let attr = FileAttr::Directory { - common: mescloud_icache::make_common_file_attr( - ino, - 0o755, - now, - now, - self.composite.icache.fs_owner(), - self.composite.icache.block_size(), - ), - }; - self.composite.icache.cache_attr(ino, attr).await; - return (ino, attr); + let mut existing_ino_found = None; + self.owner_inodes + .any_async(|&ino, existing_owner| { + if existing_owner == owner { + existing_ino_found = Some(ino); + true + } else { + false } - // ICB was evicted — mark for cleanup - stale_ino = Some(ino); - break; + }) + .await; + + if let Some(existing_ino) = existing_ino_found { + if let Some(attr) = self.composite.icache.get_attr(existing_ino).await { + return (existing_ino, attr); } - } - if let Some(ino) = stale_ino { - self.owner_inodes.remove(&ino); + if self.composite.icache.contains(existing_ino) { + // ICB exists but attr missing — rebuild and cache + let now = SystemTime::now(); + let attr = FileAttr::Directory { + common: mescloud_icache::make_common_file_attr( + existing_ino, + 0o755, + now, + now, + self.composite.icache.fs_owner(), + self.composite.icache.block_size(), + ), + }; + self.composite.icache.cache_attr(existing_ino, attr).await; + return (existing_ino, attr); + } + // ICB was evicted — mark for cleanup + self.owner_inodes.remove_async(&existing_ino).await; } // Allocate new @@ -170,7 +175,7 @@ impl OrgFs { }, ) .await; - self.owner_inodes.insert(ino, owner.to_owned()); + drop(self.owner_inodes.insert_async(ino, owner.to_owned()).await); let attr = FileAttr::Directory { common: mescloud_icache::make_common_file_attr( ino, @@ -197,12 +202,11 @@ impl OrgFs { composite: CompositeFs { icache: MescloudICache::new(resolver, Self::ROOT_INO, fs_owner, Self::BLOCK_SIZE), file_table: FileTable::new(), - readdir_buf: Vec::new(), - child_inodes: HashMap::new(), - inode_to_slot: HashMap::new(), - slots: Vec::new(), + child_inodes: ConcurrentHashMap::new(), + inode_to_slot: 
ConcurrentHashMap::new(), + slots: tokio::sync::RwLock::new(Vec::new()), }, - owner_inodes: HashMap::new(), + owner_inodes: ConcurrentHashMap::new(), } } @@ -211,10 +215,15 @@ impl OrgFs { if ino == Self::ROOT_INO { return Some(InodeRole::OrgRoot); } - if self.owner_inodes.contains_key(&ino) { + if self.owner_inodes.read_sync(&ino, |_, _| ()).is_some() { return Some(InodeRole::OwnerDir); } - if self.composite.child_inodes.contains_key(&ino) { + if self + .composite + .child_inodes + .read_sync(&ino, |_, _| ()) + .is_some() + { return Some(InodeRole::RepoOwned); } if self.composite.slot_for_inode(ino).is_some() { @@ -230,15 +239,28 @@ impl OrgFs { /// - `display_name`: name shown in filesystem ("linux" for github, same as `repo_name` otherwise) /// - `parent_ino`: owner-dir inode for github, `ROOT_INO` otherwise async fn ensure_repo_inode( - &mut self, + &self, repo_name: &str, display_name: &str, default_branch: &str, parent_ino: Inode, ) -> (Inode, FileAttr) { // Check existing repos. - for (&ino, &idx) in &self.composite.child_inodes { - if self.composite.slots[idx].inner.repo_name() == repo_name { + { + let slots = self.composite.slots.read().await; + let mut found = None; + self.composite + .child_inodes + .any_async(|&ino, &idx| { + if slots[idx].inner.repo_name() == repo_name { + found = Some((ino, idx)); + true + } else { + false + } + }) + .await; + if let Some((ino, _)) = found { if let Some(attr) = self.composite.icache.get_attr(ino).await { let rc = self .composite @@ -256,19 +278,15 @@ impl OrgFs { ); return self.make_repo_dir_attr(ino).await; } - } - // Check for orphaned slot (slot exists but not in child_inodes). - if let Some(idx) = self - .composite - .slots - .iter() - .position(|s| s.inner.repo_name() == repo_name) - { - return self.register_repo_slot(idx, display_name, parent_ino).await; + // Check for orphaned slot (slot exists but not in child_inodes). + if let Some(idx) = slots.iter().position(|s| s.inner.repo_name() == repo_name) { + drop(slots); + return self.register_repo_slot(idx, display_name, parent_ino).await; + } } - // Allocate truly new slot. + // Allocate truly new slot. Need write lock. let ino = self.composite.icache.allocate_inode(); trace!( ino, @@ -298,16 +316,19 @@ impl OrgFs { self.composite.icache.fs_owner(), ); - let mut bridge = HashMapBridge::new(); + let bridge = HashMapBridge::new(); bridge.insert_inode(ino, RepoFs::ROOT_INO); - let idx = self.composite.slots.len(); - self.composite.slots.push(ChildSlot { + let mut slots = self.composite.slots.write().await; + let idx = slots.len(); + slots.push(ChildSlot { inner: repo, bridge, }); - self.composite.child_inodes.insert(ino, idx); - self.composite.inode_to_slot.insert(ino, idx); + drop(slots); + + let _ = self.composite.child_inodes.insert_async(ino, idx).await; + let _ = self.composite.inode_to_slot.insert_async(ino, idx).await; self.make_repo_dir_attr(ino).await } @@ -315,7 +336,7 @@ impl OrgFs { /// Allocate a new inode, register it in an existing (orphaned) slot, and /// return `(ino, attr)`. 
async fn register_repo_slot( - &mut self, + &self, idx: usize, display_name: &str, parent_ino: Inode, @@ -343,12 +364,19 @@ impl OrgFs { "register_repo_slot: resetting bridge for orphaned slot; \ inner filesystem will not receive forget for stale inode mappings" ); - self.composite.slots[idx].bridge = HashMapBridge::new(); - self.composite.slots[idx] - .bridge - .insert_inode(ino, RepoFs::ROOT_INO); - self.composite.child_inodes.insert(ino, idx); - self.composite.inode_to_slot.insert(ino, idx); + let slots = self.composite.slots.read().await; + // Reset bridge by replacing its internal state via insert_inode + // (HashMapBridge doesn't expose a reset method, so we create a new one + // and write through the RwLock. Since ChildSlot's bridge uses interior + // mutability, we need write lock to replace the bridge itself.) + drop(slots); + { + let mut slots = self.composite.slots.write().await; + slots[idx].bridge = HashMapBridge::new(); + slots[idx].bridge.insert_inode(ino, RepoFs::ROOT_INO); + } + let _ = self.composite.child_inodes.insert_async(ino, idx).await; + let _ = self.composite.inode_to_slot.insert_async(ino, idx).await; self.make_repo_dir_attr(ino).await } @@ -402,7 +430,7 @@ impl Fs for OrgFs { type ReleaseError = ReleaseError; #[instrument(name = "OrgFs::lookup", skip(self), fields(org = %self.name))] - async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result { + async fn lookup(&self, parent: Inode, name: &OsStr) -> Result { let role = self.inode_role(parent).ok_or(LookupError::InodeNotFound)?; match role { InodeRole::OrgRoot => { @@ -444,9 +472,9 @@ impl Fs for OrgFs { // Parent is an owner dir, name is a repo like "linux". let owner = self .owner_inodes - .get(&parent) - .ok_or(LookupError::InodeNotFound)? - .clone(); + .read_async(&parent, |_, v| v.clone()) + .await + .ok_or(LookupError::InodeNotFound)?; let repo_name_str = name.to_str().ok_or(LookupError::InodeNotFound)?; let full_decoded = format!("{owner}/{repo_name_str}"); let encoded = Self::encode_github_repo_name(&full_decoded); @@ -476,16 +504,12 @@ impl Fs for OrgFs { } #[instrument(name = "OrgFs::getattr", skip(self), fields(org = %self.name))] - async fn getattr( - &mut self, - ino: Inode, - _fh: Option, - ) -> Result { + async fn getattr(&self, ino: Inode, _fh: Option) -> Result { self.composite.delegated_getattr(ino).await } #[instrument(name = "OrgFs::readdir", skip(self), fields(org = %self.name))] - async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], ReadDirError> { + async fn readdir(&self, ino: Inode) -> Result, ReadDirError> { let role = self.inode_role(ino).ok_or(ReadDirError::InodeNotFound)?; match role { InodeRole::OrgRoot => { @@ -526,20 +550,24 @@ impl Fs for OrgFs { }); } - let prefetch_count = entries - .iter() - .filter_map(|e| self.composite.child_inodes.get(&e.ino).copied()) - .inspect(|&idx| self.composite.slots[idx].inner.prefetch_root()) - .count(); - if prefetch_count > 0 { - trace!( - count = prefetch_count, - "readdir: prefetching repo root directories" - ); + { + let slots = self.composite.slots.read().await; + let prefetch_count = entries + .iter() + .filter_map(|e| { + self.composite.child_inodes.read_sync(&e.ino, |_, &idx| idx) + }) + .inspect(|&idx| slots[idx].inner.prefetch_root()) + .count(); + if prefetch_count > 0 { + trace!( + count = prefetch_count, + "readdir: prefetching repo root directories" + ); + } } - self.composite.readdir_buf = entries; - Ok(&self.composite.readdir_buf) + Ok(entries) } InodeRole::OwnerDir if self.is_github() => { // TODO(MES-674): 
Cleanup "special" casing for github. @@ -551,13 +579,13 @@ impl Fs for OrgFs { } #[instrument(name = "OrgFs::open", skip(self), fields(org = %self.name))] - async fn open(&mut self, ino: Inode, flags: OpenFlags) -> Result { + async fn open(&self, ino: Inode, flags: OpenFlags) -> Result { self.composite.delegated_open(ino, flags).await } #[instrument(name = "OrgFs::read", skip(self), fields(org = %self.name))] async fn read( - &mut self, + &self, ino: Inode, fh: FileHandle, offset: u64, @@ -572,7 +600,7 @@ impl Fs for OrgFs { #[instrument(name = "OrgFs::release", skip(self), fields(org = %self.name))] async fn release( - &mut self, + &self, ino: Inode, fh: FileHandle, flags: OpenFlags, @@ -584,14 +612,14 @@ impl Fs for OrgFs { } #[instrument(name = "OrgFs::forget", skip(self), fields(org = %self.name))] - async fn forget(&mut self, ino: Inode, nlookups: u64) { + async fn forget(&self, ino: Inode, nlookups: u64) { let evicted = self.composite.delegated_forget(ino, nlookups).await; if evicted { - self.owner_inodes.remove(&ino); + self.owner_inodes.remove_async(&ino).await; } } - async fn statfs(&mut self) -> Result { + async fn statfs(&self) -> Result { Ok(self.composite.delegated_statfs()) } } diff --git a/src/fs/mescloud/repo.rs b/src/fs/mescloud/repo.rs index 2196528..0857d5a 100644 --- a/src/fs/mescloud/repo.rs +++ b/src/fs/mescloud/repo.rs @@ -5,13 +5,14 @@ use std::collections::HashSet; use std::ffi::OsString; use std::future::Future; -use std::{collections::HashMap, ffi::OsStr, path::PathBuf, time::SystemTime}; +use std::{ffi::OsStr, path::PathBuf, time::SystemTime}; use base64::Engine as _; use bytes::Bytes; use mesa_dev::MesaClient; use mesa_dev::low_level::content::{Content, DirEntry as MesaDirEntry}; use num_traits::cast::ToPrimitive as _; +use scc::HashMap as ConcurrentHashMap; use tracing::{Instrument as _, instrument, trace, warn}; use crate::fs::icache::{AsyncICache, FileTable, IcbResolver}; @@ -189,8 +190,7 @@ pub struct RepoFs { icache: MescloudICache, file_table: FileTable, - readdir_buf: Vec, - open_files: HashMap, + open_files: ConcurrentHashMap, } impl RepoFs { @@ -220,8 +220,7 @@ impl RepoFs { ref_, icache: MescloudICache::new(resolver, Self::ROOT_INO, fs_owner, Self::BLOCK_SIZE), file_table: FileTable::new(), - readdir_buf: Vec::new(), - open_files: HashMap::new(), + open_files: ConcurrentHashMap::new(), } } @@ -292,7 +291,7 @@ impl Fs for RepoFs { type ReleaseError = ReleaseError; #[instrument(name = "RepoFs::lookup", skip(self), fields(repo = %self.repo_name))] - async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result { + async fn lookup(&self, parent: Inode, name: &OsStr) -> Result { debug_assert!( self.icache.contains(parent), "lookup: parent inode {parent} not in inode table" @@ -315,11 +314,7 @@ impl Fs for RepoFs { } #[instrument(name = "RepoFs::getattr", skip(self), fields(repo = %self.repo_name))] - async fn getattr( - &mut self, - ino: Inode, - _fh: Option, - ) -> Result { + async fn getattr(&self, ino: Inode, _fh: Option) -> Result { self.icache.get_attr(ino).await.ok_or_else(|| { warn!(ino, "getattr on unknown inode"); GetAttrError::InodeNotFound @@ -327,7 +322,7 @@ impl Fs for RepoFs { } #[instrument(name = "RepoFs::readdir", skip(self), fields(repo = %self.repo_name))] - async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], ReadDirError> { + async fn readdir(&self, ino: Inode) -> Result, ReadDirError> { debug_assert!( self.icache.contains(ino), "readdir: inode {ino} not in inode table" @@ -407,12 +402,11 @@ impl Fs for RepoFs { 
self.icache.spawn_prefetch_readdir(subdir_inodes); } - self.readdir_buf = entries; - Ok(&self.readdir_buf) + Ok(entries) } #[instrument(name = "RepoFs::open", skip(self), fields(repo = %self.repo_name))] - async fn open(&mut self, ino: Inode, _flags: OpenFlags) -> Result<OpenFile, OpenError> { + async fn open(&self, ino: Inode, _flags: OpenFlags) -> Result<OpenFile, OpenError> { if !self.icache.contains(ino) { warn!(ino, "open on unknown inode"); return Err(OpenError::InodeNotFound); } @@ -425,7 +419,7 @@ impl Fs for RepoFs { "open: inode {ino} has non-file cached attr" ); let fh = self.file_table.allocate(); - self.open_files.insert(fh, ino); + let _ = self.open_files.insert_async(fh, ino).await; trace!(ino, fh, "assigned file handle"); Ok(OpenFile { handle: fh, @@ -435,7 +429,7 @@ impl Fs for RepoFs { #[instrument(name = "RepoFs::read", skip(self), fields(repo = %self.repo_name))] async fn read( - &mut self, + &self, ino: Inode, fh: FileHandle, offset: u64, @@ -443,10 +437,14 @@ impl Fs for RepoFs { _flags: OpenFlags, _lock_owner: Option<LockOwner>, ) -> Result<Bytes, ReadError> { - let &file_ino = self.open_files.get(&fh).ok_or_else(|| { - warn!(fh, "read on unknown file handle"); - ReadError::FileNotOpen - })?; + let file_ino = self + .open_files + .read_async(&fh, |_, &file_ino| file_ino) + .await + .ok_or_else(|| { + warn!(fh, "read on unknown file handle"); + ReadError::FileNotOpen + })?; debug_assert!( file_ino == ino, "read: file handle {fh} maps to inode {file_ino}, but caller passed inode {ino}" @@ -497,13 +495,13 @@ impl Fs for RepoFs { #[instrument(name = "RepoFs::release", skip(self), fields(repo = %self.repo_name))] async fn release( - &mut self, + &self, ino: Inode, fh: FileHandle, _flags: OpenFlags, _flush: bool, ) -> Result<(), ReleaseError> { - let released_ino = self.open_files.remove(&fh).ok_or_else(|| { + let (_, released_ino) = self.open_files.remove_async(&fh).await.ok_or_else(|| { warn!(fh, "release on unknown file handle"); ReleaseError::FileNotOpen })?; @@ -516,7 +514,7 @@ impl Fs for RepoFs { } #[instrument(name = "RepoFs::forget", skip(self), fields(repo = %self.repo_name))] - async fn forget(&mut self, ino: Inode, nlookups: u64) { + async fn forget(&self, ino: Inode, nlookups: u64) { debug_assert!( self.icache.contains(ino), "forget: inode {ino} not in inode table" @@ -525,7 +523,7 @@ impl Fs for RepoFs { self.icache.forget(ino, nlookups).await; } - async fn statfs(&mut self) -> Result { + async fn statfs(&self) -> Result { Ok(self.icache.statfs()) } } diff --git a/src/fs/trait.rs b/src/fs/trait.rs index 7a56e57..1758e53 100644 --- a/src/fs/trait.rs +++ b/src/fs/trait.rs @@ -320,40 +320,40 @@ pub struct FilesystemStats { } #[async_trait] -pub trait Fs { - type LookupError: std::error::Error; - type GetAttrError: std::error::Error; - type OpenError: std::error::Error; - type ReadError: std::error::Error; - type ReaddirError: std::error::Error; - type ReleaseError: std::error::Error; +pub trait Fs: Send + Sync + 'static { + type LookupError: std::error::Error + Send + 'static; + type GetAttrError: std::error::Error + Send + 'static; + type OpenError: std::error::Error + Send + 'static; + type ReadError: std::error::Error + Send + 'static; + type ReaddirError: std::error::Error + Send + 'static; + type ReleaseError: std::error::Error + Send + 'static; /// Called once after mount, before any FUSE operations. /// Override to perform startup work like prefetching. - async fn init(&mut self) {} + async fn init(&self) {} /// For each lookup call made by the kernel, it expects the icache to be updated with the /// returned `FileAttr`.
- async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result<FileAttr, Self::LookupError>; + async fn lookup(&self, parent: Inode, name: &OsStr) -> Result<FileAttr, Self::LookupError>; /// Can be called in two contexts -- the file is not open (in which case `fh` is `None`), /// or the file is open (in which case `fh` is `Some`). async fn getattr( - &mut self, + &self, ino: Inode, fh: Option<FileHandle>, ) -> Result<FileAttr, Self::GetAttrError>; /// Read the contents of a directory. - async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], Self::ReaddirError>; + async fn readdir(&self, ino: Inode) -> Result<Vec<DirEntry>, Self::ReaddirError>; /// Open a file for reading. - async fn open(&mut self, ino: Inode, flags: OpenFlags) -> Result<OpenFile, Self::OpenError>; + async fn open(&self, ino: Inode, flags: OpenFlags) -> Result<OpenFile, Self::OpenError>; /// Read data from an open file. #[expect(clippy::too_many_arguments, reason = "mirrors fuser read API")] async fn read( - &mut self, + &self, ino: Inode, fh: FileHandle, offset: u64, @@ -364,7 +364,7 @@ pub trait Fs { size: u32, flags: OpenFlags, lock_owner: Option<LockOwner>, ) -> Result<Bytes, Self::ReadError>; /// Called when the kernel closes a file handle. async fn release( - &mut self, + &self, ino: Inode, fh: FileHandle, flags: OpenFlags, @@ -372,8 +372,8 @@ pub trait Fs { flush: bool, ) -> Result<(), Self::ReleaseError>; /// Called when the kernel is done with an inode. - async fn forget(&mut self, ino: Inode, nlookups: u64); + async fn forget(&self, ino: Inode, nlookups: u64); /// Get filesystem statistics. - async fn statfs(&mut self) -> Result; + async fn statfs(&self) -> Result; }