diff --git a/src/fs/fuser.rs b/src/fs/fuser.rs
index 86ddabb..d1c9369 100644
--- a/src/fs/fuser.rs
+++ b/src/fs/fuser.rs
@@ -1,4 +1,5 @@
 use std::ffi::OsStr;
+use std::sync::Arc;
 
 use crate::fs::r#trait::{CommonFileAttr, DirEntryType, FileAttr, Fs, LockOwner, OpenFlags};
 use tracing::{debug, error, instrument};
@@ -115,7 +116,7 @@ where
     F::ReaddirError: Into<libc::c_int>,
     F::ReleaseError: Into<libc::c_int>,
 {
-    fs: F,
+    fs: Arc<F>,
     runtime: tokio::runtime::Handle,
 }
 
@@ -141,7 +142,10 @@
     const SHAMEFUL_TTL: std::time::Duration = std::time::Duration::from_secs(1);
 
     pub fn new(fs: F, runtime: tokio::runtime::Handle) -> Self {
-        Self { fs, runtime }
+        Self {
+            fs: Arc::new(fs),
+            runtime,
+        }
     }
 }
 
@@ -154,6 +158,15 @@ where
     F::ReaddirError: Into<libc::c_int>,
     F::ReleaseError: Into<libc::c_int>,
 {
+    fn init(
+        &mut self,
+        _req: &fuser::Request<'_>,
+        _config: &mut fuser::KernelConfig,
+    ) -> Result<(), libc::c_int> {
+        self.runtime.block_on(self.fs.init());
+        Ok(())
+    }
+
     #[instrument(name = "FuserAdapter::lookup", skip(self, _req, reply))]
     fn lookup(
         &mut self,
@@ -162,20 +175,22 @@
         name: &OsStr,
         reply: fuser::ReplyEntry,
     ) {
-        match self.runtime.block_on(self.fs.lookup(parent, name)) {
-            Ok(attr) => {
-                // TODO(markovejnovic): Passing generation = 0 here is a recipe for disaster.
-                // Someone with A LOT of files will likely see inode reuse which will lead to a
-                // disaster.
-                let f_attr: fuser::FileAttr = attr.into();
-                debug!(?f_attr, "replying...");
-                reply.entry(&Self::SHAMEFUL_TTL, &f_attr, 0);
-            }
-            Err(e) => {
-                debug!(error = %e, "replying error");
-                reply.error(e.into());
+        let fs = Arc::clone(&self.fs);
+        let name = name.to_owned();
+        let ttl = Self::SHAMEFUL_TTL;
+        self.runtime.spawn(async move {
+            match fs.lookup(parent, &name).await {
+                Ok(attr) => {
+                    let f_attr: fuser::FileAttr = attr.into();
+                    debug!(?f_attr, "replying...");
+                    reply.entry(&ttl, &f_attr, 0);
+                }
+                Err(e) => {
+                    debug!(error = %e, "replying error");
+                    reply.error(e.into());
+                }
             }
-        }
+        });
     }
 
@@ -186,16 +201,20 @@
     #[instrument(name = "FuserAdapter::getattr", skip(self, _req, fh, reply))]
     fn getattr(
         &mut self,
         _req: &fuser::Request<'_>,
         ino: u64,
         fh: Option<u64>,
         reply: fuser::ReplyAttr,
     ) {
-        match self.runtime.block_on(self.fs.getattr(ino, fh)) {
-            Ok(attr) => {
-                debug!(?attr, "replying...");
-                reply.attr(&Self::SHAMEFUL_TTL, &attr.into());
-            }
-            Err(e) => {
-                debug!(error = %e, "replying error");
-                reply.error(e.into());
+        let fs = Arc::clone(&self.fs);
+        let ttl = Self::SHAMEFUL_TTL;
+        self.runtime.spawn(async move {
+            match fs.getattr(ino, fh).await {
+                Ok(attr) => {
+                    debug!(?attr, "replying...");
+                    reply.attr(&ttl, &attr.into());
+                }
+                Err(e) => {
+                    debug!(error = %e, "replying error");
+                    reply.error(e.into());
+                }
             }
-        }
+        });
     }
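// ─────────────────────────────────────────────────────────────────────────────
// [editor's note — illustrative sketch, not part of the patch] Every callback
// in this file follows the same migration: clone the `Arc`, move owned
// arguments into a tokio task, and send the reply from inside that task. This
// works because fuser's reply handles are owned, `Send` values. The shared
// shape, reduced to a hypothetical helper (`spawn_reply` does not exist in
// the codebase; `F: Send + Sync + 'static` is assumed):

use std::sync::Arc;

fn spawn_reply<F, Fut>(
    fs: &Arc<F>,
    runtime: &tokio::runtime::Handle,
    work: impl FnOnce(Arc<F>) -> Fut,
) where
    F: Send + Sync + 'static,
    Fut: std::future::Future<Output = ()> + Send + 'static,
{
    // Build the future while `fs` is still borrowed, then hand it to the
    // runtime; the FUSE callback returns to the kernel immediately.
    runtime.spawn(work(Arc::clone(fs)));
}
// ─────────────────────────────────────────────────────────────────────────────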
@@ -207,54 +226,61 @@
     #[instrument(name = "FuserAdapter::readdir", skip(self, _req, _fh, offset, reply))]
     fn readdir(
         &mut self,
         _req: &fuser::Request<'_>,
         ino: u64,
         _fh: u64,
         offset: i64,
         mut reply: fuser::ReplyDirectory,
     ) {
-        let entries = match self.runtime.block_on(self.fs.readdir(ino)) {
-            Ok(entries) => entries,
-            Err(e) => {
-                debug!(error = %e, "replying error");
-                reply.error(e.into());
-                return;
-            }
-        };
-
-        #[expect(
-            clippy::cast_possible_truncation,
-            reason = "fuser offset is i64 but always non-negative"
-        )]
-        for (i, entry) in entries
-            .iter()
-            .enumerate()
-            .skip(offset.cast_unsigned() as usize)
-        {
-            let kind: fuser::FileType = entry.kind.into();
-            let Ok(idx): Result<i64, _> = (i + 1).try_into() else {
-                error!("Directory entry index {} too large for fuser", i + 1);
-                reply.error(libc::EIO);
-                return;
+        let fs = Arc::clone(&self.fs);
+        self.runtime.spawn(async move {
+            let entries = match fs.readdir(ino).await {
+                Ok(entries) => entries,
+                Err(e) => {
+                    debug!(error = %e, "replying error");
+                    reply.error(e.into());
+                    return;
+                }
             };
 
-            debug!(?entry, "adding entry to reply...");
-            if reply.add(entry.ino, idx, kind, &entry.name) {
-                debug!("buffer full for now, stopping readdir");
-                break;
+            #[expect(
+                clippy::cast_possible_truncation,
+                reason = "fuser offset is i64 but always non-negative"
+            )]
+            for (i, entry) in entries
+                .iter()
+                .enumerate()
+                .skip(offset.cast_unsigned() as usize)
+            {
+                let kind: fuser::FileType = entry.kind.into();
+                let Ok(idx): Result<i64, _> = (i + 1).try_into() else {
+                    error!("Directory entry index {} too large for fuser", i + 1);
+                    reply.error(libc::EIO);
+                    return;
+                };
+
+                debug!(?entry, "adding entry to reply...");
+                if reply.add(entry.ino, idx, kind, &entry.name) {
+                    debug!("buffer full for now, stopping readdir");
+                    break;
+                }
             }
-        }
 
-        debug!("finalizing reply...");
-        reply.ok();
+            debug!("finalizing reply...");
+            reply.ok();
+        });
     }
 
     #[instrument(name = "FuserAdapter::open", skip(self, _req, flags, reply))]
     fn open(&mut self, _req: &fuser::Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) {
-        match self.runtime.block_on(self.fs.open(ino, flags.into())) {
-            Ok(open_file) => {
-                debug!(handle = open_file.handle, "replying...");
-                reply.opened(open_file.handle, 0);
-            }
-            Err(e) => {
-                debug!(error = %e, "replying error");
-                reply.error(e.into());
+        let fs = Arc::clone(&self.fs);
+        let flags: OpenFlags = flags.into();
+        self.runtime.spawn(async move {
+            match fs.open(ino, flags).await {
+                Ok(open_file) => {
+                    debug!(handle = open_file.handle, "replying...");
+                    reply.opened(open_file.handle, 0);
+                }
+                Err(e) => {
+                    debug!(error = %e, "replying error");
+                    reply.error(e.into());
+                }
             }
-        }
+        });
     }
 
     #[instrument(
@@ -272,25 +298,24 @@
         lock_owner: Option<u64>,
         reply: fuser::ReplyData,
     ) {
+        let fs = Arc::clone(&self.fs);
         let flags: OpenFlags = flags.into();
         let lock_owner = lock_owner.map(LockOwner);
-        match self.runtime.block_on(self.fs.read(
-            ino,
-            fh,
-            offset.cast_unsigned(),
-            size,
-            flags,
-            lock_owner,
-        )) {
-            Ok(data) => {
-                debug!(read_bytes = data.len(), "replying...");
-                reply.data(&data);
-            }
-            Err(e) => {
-                debug!(error = %e, "replying error");
-                reply.error(e.into());
+        self.runtime.spawn(async move {
+            match fs
+                .read(ino, fh, offset.cast_unsigned(), size, flags, lock_owner)
+                .await
+            {
+                Ok(data) => {
+                    debug!(read_bytes = data.len(), "replying...");
+                    reply.data(&data);
+                }
+                Err(e) => {
+                    debug!(error = %e, "replying error");
+                    reply.error(e.into());
+                }
             }
-        }
+        });
     }
 
     #[instrument(name = "FuserAdapter::release", skip(self, _req, _lock_owner, reply))]
@@ -304,30 +329,35 @@
         flush: bool,
         reply: fuser::ReplyEmpty,
     ) {
-        match self
-            .runtime
-            .block_on(self.fs.release(ino, fh, flags.into(), flush))
-        {
-            Ok(()) => {
-                debug!("replying ok");
-                reply.ok();
-            }
-            Err(e) => {
-                debug!(error = %e, "replying error");
-                reply.error(e.into());
+        let fs = Arc::clone(&self.fs);
+        let flags: OpenFlags = flags.into();
+        self.runtime.spawn(async move {
+            match fs.release(ino, fh, flags, flush).await {
+                Ok(()) => {
+                    debug!("replying ok");
+                    reply.ok();
+                }
+                Err(e) => {
+                    debug!(error = %e, "replying error");
+                    reply.error(e.into());
+                }
             }
-        }
+        });
     }
 
     #[instrument(name = "FuserAdapter::forget", skip(self, _req, nlookup))]
     fn forget(&mut self, _req: &fuser::Request<'_>, ino: u64, nlookup: u64) {
-        self.runtime.block_on(self.fs.forget(ino, nlookup));
+        let fs = Arc::clone(&self.fs);
+        self.runtime.spawn(async move {
+            fs.forget(ino, nlookup).await;
+        });
     }
 
     #[instrument(name = "FuserAdapter::statfs", skip(self, _req, _ino, reply))]
     fn statfs(&mut self, _req: &fuser::Request<'_>, _ino: u64, reply: fuser::ReplyStatfs) {
-        self.runtime.block_on(async {
-            match self.fs.statfs().await {
+        let fs = Arc::clone(&self.fs);
+        self.runtime.spawn(async move {
+            match fs.statfs().await {
                 Ok(statvfs) => {
                     debug!(?statvfs, "replying...");
                     reply.statfs(
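// ─────────────────────────────────────────────────────────────────────────────
// [editor's note — illustrative sketch, not part of the patch] With `init`
// now bridging into `Fs::init`, the adapter expects a handle to an
// already-running multi-threaded runtime at mount time. A usage sketch
// (`MyFs` and `mount_example` are hypothetical placeholders; `fuser::mount2`
// blocks the calling thread while the dispatch loop runs):

fn mount_example(fs: MyFs, mountpoint: &std::path::Path) -> std::io::Result<()> {
    let runtime = tokio::runtime::Runtime::new()?;
    let adapter = FuserAdapter::new(fs, runtime.handle().clone());
    // Async work spawned by the adapter runs on `runtime`'s worker threads
    // while fuser's dispatch loop owns this thread.
    fuser::mount2(adapter, mountpoint, &[])
}
// ─────────────────────────────────────────────────────────────────────────────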
"FuserAdapter::statfs", skip(self, _req, _ino, reply))] fn statfs(&mut self, _req: &fuser::Request<'_>, _ino: u64, reply: fuser::ReplyStatfs) { - self.runtime.block_on(async { - match self.fs.statfs().await { + let fs = Arc::clone(&self.fs); + self.runtime.spawn(async move { + match fs.statfs().await { Ok(statvfs) => { debug!(?statvfs, "replying..."); reply.statfs( diff --git a/src/fs/icache/async_cache.rs b/src/fs/icache/async_cache.rs index 84003da..488f4db 100644 --- a/src/fs/icache/async_cache.rs +++ b/src/fs/icache/async_cache.rs @@ -1,7 +1,9 @@ //! Async inode cache with InFlight/Available state machine. use std::future::Future; +use std::sync::Arc; +use futures::StreamExt as _; use scc::HashMap as ConcurrentHashMap; use tokio::sync::watch; @@ -60,13 +62,28 @@ pub trait IcbResolver: Send + Sync { Self: Sized; } +/// Shared interior of [`AsyncICache`], behind an `Arc` so the cache can be +/// cheaply cloned for fire-and-forget background work (e.g. `spawn_prefetch`). +struct AsyncICacheInner { + resolver: R, + inode_table: ConcurrentHashMap>, +} + /// Async, concurrency-safe inode cache. /// /// All methods take `&self` — internal synchronization is provided by -/// `scc::HashMap` (sharded lock-free map). +/// `scc::HashMap` (sharded lock-free map). The cache is cheaply cloneable +/// (via `Arc`) so that prefetch work can be spawned in the background. pub struct AsyncICache { - resolver: R, - inode_table: ConcurrentHashMap>, + inner: Arc>, +} + +impl Clone for AsyncICache { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } } impl AsyncICache { @@ -79,14 +96,16 @@ impl AsyncICache { IcbState::Available(R::Icb::new_root(root_path.into())), )); Self { - resolver, - inode_table: table, + inner: Arc::new(AsyncICacheInner { + resolver, + inode_table: table, + }), } } /// Number of entries (`InFlight` + `Available`) in the table. pub fn inode_count(&self) -> usize { - self.inode_table.len() + self.inner.inode_table.len() } /// Wait until `ino` is `Available`. @@ -96,6 +115,7 @@ impl AsyncICache { async fn wait_for_available(&self, ino: Inode) -> bool { loop { let rx = self + .inner .inode_table .read_async(&ino, |_, s| match s { IcbState::InFlight(rx) => Some(rx.clone()), @@ -123,7 +143,7 @@ impl AsyncICache { /// This is a non-blocking, synchronous check. It does **not** wait for /// `InFlight` entries to resolve. pub fn contains(&self, ino: Inode) -> bool { - self.inode_table.contains_sync(&ino) + self.inner.inode_table.contains_sync(&ino) } /// Read an ICB via closure. **Awaits** if `InFlight`. @@ -142,6 +162,7 @@ impl AsyncICache { return None; } let result = self + .inner .inode_table .read_async(&ino, |_, state| match state { IcbState::Available(icb) => Some(f(icb)), @@ -169,6 +190,7 @@ impl AsyncICache { return None; } let result = self + .inner .inode_table .update_async(&ino, |_, state| match state { IcbState::Available(icb) => Some(f(icb)), @@ -190,7 +212,7 @@ impl AsyncICache { use scc::hash_map::Entry; let mut icb = Some(icb); loop { - match self.inode_table.entry_async(ino).await { + match self.inner.inode_table.entry_async(ino).await { Entry::Vacant(vac) => { let val = icb .take() @@ -233,7 +255,7 @@ impl AsyncICache { let mut then_fn = Some(then); loop { - match self.inode_table.entry_async(ino).await { + match self.inner.inode_table.entry_async(ino).await { Entry::Occupied(mut occ) => match occ.get_mut() { IcbState::Available(icb) => { let t = then_fn @@ -270,7 +292,7 @@ impl AsyncICache { /// `forget` has already removed. 
@@ -270,7 +292,7 @@
     /// `forget` has already removed.
     async fn write_back_if_present(&self, ino: Inode, icb: R::Icb) {
         use scc::hash_map::Entry;
-        match self.inode_table.entry_async(ino).await {
+        match self.inner.inode_table.entry_async(ino).await {
             Entry::Occupied(mut occ) => {
                 *occ.get_mut() = IcbState::Available(icb);
             }
 
@@ -304,6 +326,7 @@
         // Fast path: Available and fully resolved
         {
             let hit = self
+                .inner
                 .inode_table
                 .read_async(&ino, |_, s| match s {
                     IcbState::Available(icb) if !icb.needs_resolve() => {
 
@@ -322,7 +345,7 @@
         // Slow path: missing, InFlight, or stub needing resolution
         loop {
-            match self.inode_table.entry_async(ino).await {
+            match self.inner.inode_table.entry_async(ino).await {
                 Entry::Occupied(mut occ) => match occ.get_mut() {
                     IcbState::Available(icb) if !icb.needs_resolve() => {
                         let t = then_fn
 
@@ -340,7 +363,7 @@
                         let fallback = stub.clone();
                         drop(occ); // release shard lock before awaiting
-                        match self.resolver.resolve(ino, Some(stub), self).await {
+                        match self.inner.resolver.resolve(ino, Some(stub), self).await {
                             Ok(icb) => {
                                 let t = then_fn.take().unwrap_or_else(|| {
                                     unreachable!("then_fn consumed more than once")
                                 })
 
@@ -354,7 +377,7 @@
                                 if fallback.rc() > 0 {
                                     self.write_back_if_present(ino, fallback).await;
                                 } else {
-                                    self.inode_table.remove_async(&ino).await;
+                                    self.inner.inode_table.remove_async(&ino).await;
                                 }
                                 drop(tx);
                                 return Err(e);
 
@@ -371,7 +394,7 @@
                     let (tx, rx) = watch::channel(());
                     vac.insert_entry(IcbState::InFlight(rx));
 
-                    match self.resolver.resolve(ino, None, self).await {
+                    match self.inner.resolver.resolve(ino, None, self).await {
                         Ok(icb) => {
                             let t = then_fn
                                 .take()
 
@@ -382,7 +405,7 @@
                             return Ok(result);
                         }
                         Err(e) => {
-                            self.inode_table.remove_async(&ino).await;
+                            self.inner.inode_table.remove_async(&ino).await;
                             drop(tx);
                             return Err(e);
 
@@ -409,6 +432,7 @@
             return None;
         }
         let result = self
+            .inner
             .inode_table
             .update_async(&ino, |_, state| match state {
                 IcbState::Available(icb) => {
 
@@ -441,7 +465,7 @@
         use scc::hash_map::Entry;
 
         loop {
-            match self.inode_table.entry_async(ino).await {
+            match self.inner.inode_table.entry_async(ino).await {
                 Entry::Occupied(mut occ) => match occ.get_mut() {
                     IcbState::Available(icb) => {
                         if icb.rc() <= nlookups {
 
@@ -470,7 +494,8 @@
     /// Synchronous mutable access to an `Available` entry.
     /// Does **not** wait for `InFlight`. Intended for initialization.
     pub fn get_icb_mut_sync<T>(&self, ino: Inode, f: impl FnOnce(&mut R::Icb) -> T) -> Option<T> {
-        self.inode_table
+        self.inner
+            .inode_table
             .update_sync(&ino, |_, state| match state {
                 IcbState::Available(icb) => Some(f(icb)),
                 IcbState::InFlight(_) => None,
             })
             .flatten()
     }
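// ─────────────────────────────────────────────────────────────────────────────
// [editor's note — illustrative sketch, not part of the patch] The two-state
// machine manipulated above, reduced to its essentials. The shape is inferred
// from this file's own usage: waiters clone the receiver and await a change,
// and the resolving task signals completion simply by dropping the sender.

enum IcbStateSketch<Icb> {
    // Resolution in progress; cloned receivers wake when the sender drops.
    InFlight(tokio::sync::watch::Receiver<()>),
    // Fully resolved and readable in place.
    Available(Icb),
}
// ─────────────────────────────────────────────────────────────────────────────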
@@ -478,10 +503,64 @@
+    /// Maximum number of concurrent resolver RPCs during prefetch.
+    /// Bounds the fan-out so a large directory doesn't overwhelm the backend.
+    const MAX_PREFETCH_CONCURRENCY: usize = 8;
+
+    /// Concurrently resolve multiple inodes, best-effort.
+    ///
+    /// Each inode is resolved via [`get_or_resolve`] which handles
+    /// `InFlight`-coalescing and stub-upgrade logic. Errors are silently
+    /// ignored because prefetch is a performance optimization: a failure
+    /// simply means the subsequent access will pay the full API latency.
+    ///
+    /// Concurrency is bounded to [`MAX_PREFETCH_CONCURRENCY`](Self::MAX_PREFETCH_CONCURRENCY)
+    /// to avoid overwhelming the backend with a burst of RPCs.
+    pub async fn prefetch(&self, inodes: impl IntoIterator<Item = Inode>) {
+        let inodes: Vec<_> = inodes.into_iter().collect();
+        trace!(count = inodes.len(), ?inodes, "prefetch: starting batch");
+        futures::stream::iter(inodes)
+            .for_each_concurrent(Self::MAX_PREFETCH_CONCURRENCY, |ino| async move {
+                if self.get_or_resolve(ino, |_| ()).await.is_err() {
+                    trace!(ino, "prefetch: resolution failed (will retry on access)");
+                } else {
+                    trace!(ino, "prefetch: resolved successfully");
+                }
+            })
+            .await;
+        trace!("prefetch: batch complete");
+    }
+
+    /// Fire-and-forget variant of [`prefetch`](Self::prefetch).
+    ///
+    /// Spawns the prefetch work on the tokio runtime and returns immediately,
+    /// so the caller is never blocked waiting for API responses.
+    pub fn spawn_prefetch(&self, inodes: impl IntoIterator<Item = Inode>)
+    where
+        R: 'static,
+        R::Icb: 'static,
+        R::Error: 'static,
+    {
+        let inodes: Vec<_> = inodes.into_iter().collect();
+        if inodes.is_empty() {
+            return;
+        }
+        trace!(
+            count = inodes.len(),
+            ?inodes,
+            "spawn_prefetch: dispatching background task"
+        );
+        let cache = self.clone();
+        tokio::spawn(async move {
+            cache.prefetch(inodes).await;
+        });
+    }
+
     /// Iterate over all `Available` entries (skips `InFlight`).
     /// Async-safe iteration using `iter_async` to avoid contention on single-threaded runtimes.
     pub async fn for_each(&self, mut f: impl FnMut(&Inode, &R::Icb)) {
-        self.inode_table
+        self.inner
+            .inode_table
             .iter_async(|ino, state| {
                 if let IcbState::Available(icb) = state {
                     f(ino, icb);
                 }
 
@@ -693,6 +772,7 @@ mod tests {
         let cache = Arc::new(test_cache());
         let (tx, rx) = watch::channel(());
         cache
+            .inner
             .inode_table
             .upsert_async(42, IcbState::InFlight(rx))
             .await;
 
@@ -717,6 +797,7 @@
         // Complete the InFlight from the resolver side (write directly)
         cache
+            .inner
             .inode_table
             .upsert_async(
                 42,
 
@@ -921,6 +1002,7 @@
         // Directly insert an InFlight entry for testing iteration
         let (_tx, rx) = watch::channel(());
         cache
+            .inner
             .inode_table
             .upsert_async(42, IcbState::InFlight(rx))
             .await;
 
@@ -941,12 +1023,14 @@
         // Insert InFlight manually, then immediately complete before anyone waits
         let (tx, rx) = watch::channel(());
         cache
+            .inner
             .inode_table
             .upsert_async(42, IcbState::InFlight(rx))
             .await;
 
         // Complete before any waiter (simulate resolver by writing directly)
         cache
+            .inner
             .inode_table
             .upsert_async(
                 42,
 
@@ -1129,6 +1213,7 @@
         let cache = Arc::new(test_cache());
         let (tx, rx) = watch::channel(());
         cache
+            .inner
             .inode_table
             .upsert_async(42, IcbState::InFlight(rx))
             .await;
 
@@ -1141,6 +1226,7 @@
         // Simulate resolver completing (write directly to inode_table)
         cache
+            .inner
             .inode_table
             .upsert_async(
                 42,
 
@@ -1200,6 +1286,7 @@
         let cache = Arc::new(test_cache());
         let (tx, rx) = watch::channel(());
         cache
+            .inner
             .inode_table
             .upsert_async(42, IcbState::InFlight(rx))
             .await;
 
@@ -1209,6 +1296,7 @@
         // Simulate resolver completing by writing directly to inode_table
         cache
+            .inner
             .inode_table
             .upsert_async(
                 42,
 
@@ -1236,6 +1324,7 @@
         let cache = Arc::new(test_cache());
         let (tx, rx) = watch::channel(());
         cache
+            .inner
             .inode_table
             .upsert_async(42, IcbState::InFlight(rx))
             .await;
 
@@ -1244,7 +1333,7 @@
         let handle = tokio::spawn(async move { cache2.inc_rc(42).await });
 
         // Evict instead of completing
-        cache.inode_table.remove_async(&42).await;
+        cache.inner.inode_table.remove_async(&42).await;
         drop(tx);
 
         let result = handle
 
@@ -1300,6 +1389,7 @@
         // Phase 1: insert an InFlight entry.
let (tx1, rx1) = watch::channel(()); cache + .inner .inode_table .upsert_async(ino, IcbState::InFlight(rx1)) .await; @@ -1317,6 +1407,7 @@ mod tests { // re-entering InFlight for a second resolution). let (tx2, rx2) = watch::channel(()); cache + .inner .inode_table .upsert_async(ino, IcbState::InFlight(rx2)) .await; @@ -1329,6 +1420,7 @@ mod tests { // Phase 2 complete: write the final resolved value. cache + .inner .inode_table .upsert_async( ino, @@ -1393,7 +1485,7 @@ mod tests { // forget to proceed. Instead, remove the InFlight entry directly to // simulate a concurrent eviction (e.g., by another path that already // removed the entry). - cache.inode_table.remove_async(&ino).await; + cache.inner.inode_table.remove_async(&ino).await; // Let the resolver finish. proceed.notify_one(); @@ -1407,4 +1499,128 @@ mod tests { "evicted entry must not be resurrected after resolution completes" ); } + + #[tokio::test] + async fn prefetch_resolves_multiple_inodes() { + let resolver = TestResolver::new(); + for ino in [10, 11, 12] { + resolver.add( + ino, + TestIcb { + rc: 0, + path: format!("/dir{ino}").into(), + resolved: true, + }, + ); + } + let cache = test_cache_with(resolver); + + for ino in [10, 11, 12] { + cache + .insert_icb( + ino, + TestIcb { + rc: 0, + path: format!("/dir{ino}").into(), + resolved: false, + }, + ) + .await; + } + + cache.prefetch([10, 11, 12]).await; + + for ino in [10, 11, 12] { + let is_resolved = cache.get_icb(ino, |icb| icb.resolved).await; + assert_eq!( + is_resolved, + Some(true), + "inode {ino} should be resolved after prefetch" + ); + } + } + + #[tokio::test] + async fn prefetch_skips_already_resolved() { + let cache = test_cache(); + // Root (ino=1) is already resolved -- prefetch should be a no-op + cache.prefetch([1]).await; + let path = cache.get_icb(1, |icb| icb.path.clone()).await; + assert_eq!(path, Some(PathBuf::from("/root"))); + } + + #[tokio::test] + async fn prefetch_ignores_errors() { + let resolver = TestResolver::new(); + resolver.add( + 10, + TestIcb { + rc: 0, + path: "/ok".into(), + resolved: true, + }, + ); + resolver.add_err(11, "network error"); + let cache = test_cache_with(resolver); + + for (ino, path) in [(10, "/stub10"), (11, "/stub11")] { + cache + .insert_icb( + ino, + TestIcb { + rc: 0, + path: path.into(), + resolved: false, + }, + ) + .await; + } + + cache.prefetch([10, 11]).await; + + let is_resolved = cache.get_icb(10, |icb| icb.resolved).await; + assert_eq!(is_resolved, Some(true), "ino 10 should be resolved"); + // ino 11 failed with rc=0 -> removed by get_or_resolve error path + assert!( + !cache.contains(11), + "ino 11 should be removed after error with rc=0" + ); + } + + #[tokio::test] + async fn prefetch_does_not_modify_rc() { + let resolver = TestResolver::new(); + resolver.add( + 10, + TestIcb { + rc: 0, + path: "/dir".into(), + resolved: true, + }, + ); + let cache = test_cache_with(resolver); + + cache + .insert_icb( + 10, + TestIcb { + rc: 0, + path: "/stub".into(), + resolved: false, + }, + ) + .await; + + cache.prefetch([10]).await; + + let rc = cache.get_icb(10, IcbLike::rc).await; + assert_eq!(rc, Some(0), "prefetch must not modify rc"); + } + + #[tokio::test] + async fn prefetch_empty_is_noop() { + let cache = test_cache(); + cache.prefetch(std::iter::empty()).await; + assert_eq!(cache.inode_count(), 1, "empty prefetch changes nothing"); + } } diff --git a/src/fs/icache/bridge.rs b/src/fs/icache/bridge.rs index e674a56..8de2deb 100644 --- a/src/fs/icache/bridge.rs +++ b/src/fs/icache/bridge.rs @@ -1,72 +1,100 @@ 
+use std::sync::RwLock;
+
 use crate::fs::r#trait::{FileAttr, FileHandle, Inode};
 
 /// Bidirectional bridge for both inodes and file handles between two Fs layers.
 ///
 /// Convention: **left = outer (caller), right = inner (callee)**.
 /// `forward(left)` → right, `backward(right)` → left.
+///
+/// All methods take `&self` — interior mutability is provided by `RwLock`.
 pub struct HashMapBridge {
-    inode_map: bimap::BiMap<Inode, Inode>,
-    fh_map: bimap::BiMap<FileHandle, FileHandle>,
+    inode_map: RwLock<bimap::BiMap<Inode, Inode>>,
+    fh_map: RwLock<bimap::BiMap<FileHandle, FileHandle>>,
 }
 
+#[expect(
+    clippy::expect_used,
+    reason = "RwLock poisoning is unrecoverable; panicking is the correct behavior"
+)]
 impl HashMapBridge {
     pub fn new() -> Self {
         Self {
-            inode_map: bimap::BiMap::new(),
-            fh_map: bimap::BiMap::new(),
+            inode_map: RwLock::new(bimap::BiMap::new()),
+            fh_map: RwLock::new(bimap::BiMap::new()),
         }
     }
 
-    // ── Inode methods ────────────────────────────────────────────────────
-
-    pub fn insert_inode(&mut self, left: Inode, right: Inode) {
-        self.inode_map.insert(left, right);
+    pub fn insert_inode(&self, left: Inode, right: Inode) {
+        self.inode_map
+            .write()
+            .expect("poisoned")
+            .insert(left, right);
     }
 
     /// Look up right→left, or allocate a new left inode if unmapped.
     pub fn backward_or_insert_inode(
-        &mut self,
+        &self,
         right: Inode,
         allocate: impl FnOnce() -> Inode,
     ) -> Inode {
-        if let Some(&left) = self.inode_map.get_by_right(&right) {
-            left
-        } else {
-            let left = allocate();
-            self.inode_map.insert(left, right);
-            left
+        // Fast path: read-only check.
+        if let Some(&left) = self
+            .inode_map
+            .read()
+            .expect("poisoned")
+            .get_by_right(&right)
+        {
+            return left;
+        }
+        // Slow path: acquire write lock and double-check.
+        let mut map = self.inode_map.write().expect("poisoned");
+        if let Some(&left) = map.get_by_right(&right) {
+            return left;
         }
+        let left = allocate();
+        map.insert(left, right);
+        left
     }
 
     /// Look up left→right, or allocate a new right inode if unmapped.
-    pub fn forward_or_insert_inode(
-        &mut self,
-        left: Inode,
-        allocate: impl FnOnce() -> Inode,
-    ) -> Inode {
-        if let Some(&right) = self.inode_map.get_by_left(&left) {
-            right
-        } else {
-            let right = allocate();
-            self.inode_map.insert(left, right);
-            right
+    pub fn forward_or_insert_inode(&self, left: Inode, allocate: impl FnOnce() -> Inode) -> Inode {
+        // Fast path: read-only check.
+        if let Some(&right) = self.inode_map.read().expect("poisoned").get_by_left(&left) {
+            return right;
         }
+        // Slow path: acquire write lock and double-check.
+        let mut map = self.inode_map.write().expect("poisoned");
+        if let Some(&right) = map.get_by_left(&left) {
+            return right;
+        }
+        let right = allocate();
+        map.insert(left, right);
+        right
     }
 
     /// Remove an inode mapping by its left (outer) key.
-    pub fn remove_inode_by_left(&mut self, left: Inode) {
-        self.inode_map.remove_by_left(&left);
+    pub fn remove_inode_by_left(&self, left: Inode) {
+        self.inode_map
+            .write()
+            .expect("poisoned")
+            .remove_by_left(&left);
     }
 
     /// Look up left→right directly.
-    pub fn inode_map_get_by_left(&self, left: Inode) -> Option<&Inode> {
-        self.inode_map.get_by_left(&left)
+    pub fn inode_map_get_by_left(&self, left: Inode) -> Option<Inode> {
+        self.inode_map
+            .read()
+            .expect("poisoned")
+            .get_by_left(&left)
+            .copied()
     }
 
     /// Rewrite the `ino` field in a [`FileAttr`] from right (inner) to left (outer) namespace.
     pub fn attr_backward(&self, attr: FileAttr) -> FileAttr {
+        let map = self.inode_map.read().expect("poisoned");
         let backward = |ino: Inode| -> Inode {
-            if let Some(&left) = self.inode_map.get_by_right(&ino) {
+            if let Some(&left) = map.get_by_right(&ino) {
                 left
             } else {
                 tracing::warn!(
 
@@ -79,19 +107,21 @@
         rewrite_attr_ino(attr, backward)
     }
 
-    // ── File handle methods ──────────────────────────────────────────────
-
-    pub fn insert_fh(&mut self, left: FileHandle, right: FileHandle) {
-        self.fh_map.insert(left, right);
+    pub fn insert_fh(&self, left: FileHandle, right: FileHandle) {
+        self.fh_map.write().expect("poisoned").insert(left, right);
     }
 
     pub fn fh_forward(&self, left: FileHandle) -> Option<FileHandle> {
-        self.fh_map.get_by_left(&left).copied()
+        self.fh_map
+            .read()
+            .expect("poisoned")
+            .get_by_left(&left)
+            .copied()
     }
 
     /// Remove a file handle mapping by its left (outer) key.
-    pub fn remove_fh_by_left(&mut self, left: FileHandle) {
-        self.fh_map.remove_by_left(&left);
+    pub fn remove_fh_by_left(&self, left: FileHandle) {
+        self.fh_map.write().expect("poisoned").remove_by_left(&left);
     }
 }
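// ─────────────────────────────────────────────────────────────────────────────
// [editor's note — illustrative sketch, not part of the patch] Both
// *_or_insert_inode methods above use a read-lock fast path plus a write-lock
// double check. The second lookup is load-bearing: between releasing the read
// lock and acquiring the write lock, a racing caller may insert the same key,
// and skipping the re-check would mint two outer inodes for one inner inode.
// The generic shape, over std types:

use std::collections::HashMap;
use std::sync::RwLock;

fn get_or_insert(
    map: &RwLock<HashMap<u64, u64>>,
    key: u64,
    allocate: impl FnOnce() -> u64,
) -> u64 {
    if let Some(&v) = map.read().expect("poisoned").get(&key) {
        return v; // fast path: shared lock only
    }
    let mut guard = map.write().expect("poisoned");
    if let Some(&v) = guard.get(&key) {
        return v; // double check: a racing writer got here first
    }
    let v = allocate(); // runs under the write lock, so at most once per key
    guard.insert(key, v);
    v
}
// ─────────────────────────────────────────────────────────────────────────────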
diff --git a/src/fs/mescloud/composite.rs b/src/fs/mescloud/composite.rs
index 6dbac25..5bd534b 100644
--- a/src/fs/mescloud/composite.rs
+++ b/src/fs/mescloud/composite.rs
@@ -1,7 +1,7 @@
-use std::collections::HashMap;
 use std::ffi::OsStr;
 
 use bytes::Bytes;
+use scc::HashMap as ConcurrentHashMap;
 use tracing::{instrument, trace, warn};
 
 use crate::fs::icache::bridge::HashMapBridge;
@@ -36,18 +36,21 @@ pub(super) struct ChildSlot<F> {
 /// maintains a bidirectional inode/file-handle bridge per child (see
 /// [`ChildSlot`]) to translate between the outer namespace visible to FUSE and
 /// each child's internal namespace.
+///
+/// All methods take `&self` — interior mutability is provided by
+/// `scc::HashMap` (for inode maps) and `tokio::sync::RwLock` (for the
+/// growable slot vector).
 pub(super) struct CompositeFs<F, R>
 where
     R: IcbResolver<Icb = InodeControlBlock>,
 {
     pub icache: MescloudICache<R>,
     pub file_table: FileTable,
-    pub readdir_buf: Vec<DirEntry>,
     /// Maps outer inode to index into `slots` for child-root inodes.
-    pub child_inodes: HashMap<Inode, usize>,
+    pub child_inodes: ConcurrentHashMap<Inode, usize>,
     /// Maps every translated outer inode to its owning slot index.
-    pub inode_to_slot: HashMap<Inode, usize>,
-    pub slots: Vec<ChildSlot<F>>,
+    pub inode_to_slot: ConcurrentHashMap<Inode, usize>,
+    pub slots: tokio::sync::RwLock<Vec<ChildSlot<F>>>,
 }
 
 impl<F, R> CompositeFs<F, R>
 where
 
@@ -67,31 +70,32 @@
     /// Look up which child slot owns an inode via direct map.
     #[instrument(name = "CompositeFs::slot_for_inode", skip(self))]
     pub fn slot_for_inode(&self, ino: Inode) -> Option<usize> {
-        self.inode_to_slot.get(&ino).copied()
+        self.inode_to_slot.read_sync(&ino, |_, &idx| idx)
     }
 
     /// Allocate an outer file handle and map it through the bridge.
     #[must_use]
-    pub fn alloc_fh(&mut self, slot_idx: usize, inner_fh: FileHandle) -> FileHandle {
+    pub fn alloc_fh(&self, slot: &ChildSlot<F>, inner_fh: FileHandle) -> FileHandle {
         let fh = self.file_table.allocate();
-        self.slots[slot_idx].bridge.insert_fh(fh, inner_fh);
+        slot.bridge.insert_fh(fh, inner_fh);
         fh
     }
 
     /// Translate an inner inode to an outer inode, allocating if needed.
     /// Also inserts a stub ICB into the outer icache when the inode is new.
- #[instrument(name = "CompositeFs::translate_inner_ino", skip(self, name))] + #[instrument(name = "CompositeFs::translate_inner_ino", skip(self, slot, name))] pub async fn translate_inner_ino( - &mut self, + &self, slot_idx: usize, + slot: &ChildSlot, inner_ino: Inode, parent_outer_ino: Inode, name: &OsStr, ) -> Inode { - let outer_ino = self.slots[slot_idx] + let outer_ino = slot .bridge .backward_or_insert_inode(inner_ino, || self.icache.allocate_inode()); - self.inode_to_slot.insert(outer_ino, slot_idx); + let _ = self.inode_to_slot.insert_async(outer_ino, slot_idx).await; self.icache .entry_or_insert_icb( outer_ino, @@ -120,7 +124,7 @@ where /// Find slot, forward inode, delegate to inner, allocate outer file handle. #[instrument(name = "CompositeFs::delegated_open", skip(self))] pub async fn delegated_open( - &mut self, + &self, ino: Inode, flags: OpenFlags, ) -> Result { @@ -128,11 +132,12 @@ where warn!(ino, "open on inode not belonging to any child"); OpenError::InodeNotFound })?; - let inner_ino = self.slots[idx] + let slots = self.slots.read().await; + let inner_ino = slots[idx] .bridge .forward_or_insert_inode(ino, || unreachable!("open: ino should be mapped")); - let inner_open = self.slots[idx].inner.open(inner_ino, flags).await?; - let outer_fh = self.alloc_fh(idx, inner_open.handle); + let inner_open = slots[idx].inner.open(inner_ino, flags).await?; + let outer_fh = self.alloc_fh(&slots[idx], inner_open.handle); trace!( ino, outer_fh, @@ -149,7 +154,7 @@ where #[expect(clippy::too_many_arguments, reason = "mirrors fuser read API")] #[instrument(name = "CompositeFs::delegated_read", skip(self))] pub async fn delegated_read( - &mut self, + &self, ino: Inode, fh: FileHandle, offset: u64, @@ -161,14 +166,15 @@ where warn!(ino, "read on inode not belonging to any child"); ReadError::InodeNotFound })?; - let inner_ino = self.slots[idx] + let slots = self.slots.read().await; + let inner_ino = slots[idx] .bridge .forward_or_insert_inode(ino, || unreachable!("read: ino should be mapped")); - let inner_fh = self.slots[idx].bridge.fh_forward(fh).ok_or_else(|| { + let inner_fh = slots[idx].bridge.fh_forward(fh).ok_or_else(|| { warn!(fh, "read: no fh mapping found"); ReadError::FileNotOpen })?; - self.slots[idx] + slots[idx] .inner .read(inner_ino, inner_fh, offset, size, flags, lock_owner) .await @@ -178,7 +184,7 @@ where /// then clean up the file handle mapping. #[instrument(name = "CompositeFs::delegated_release", skip(self))] pub async fn delegated_release( - &mut self, + &self, ino: Inode, fh: FileHandle, flags: OpenFlags, @@ -188,18 +194,19 @@ where warn!(ino, "release on inode not belonging to any child"); ReleaseError::FileNotOpen })?; - let inner_ino = self.slots[idx] + let slots = self.slots.read().await; + let inner_ino = slots[idx] .bridge .forward_or_insert_inode(ino, || unreachable!("release: ino should be mapped")); - let inner_fh = self.slots[idx].bridge.fh_forward(fh).ok_or_else(|| { + let inner_fh = slots[idx].bridge.fh_forward(fh).ok_or_else(|| { warn!(fh, "release: no fh mapping found"); ReleaseError::FileNotOpen })?; - let result = self.slots[idx] + let result = slots[idx] .inner .release(inner_ino, inner_fh, flags, flush) .await; - self.slots[idx].bridge.remove_fh_by_left(fh); + slots[idx].bridge.remove_fh_by_left(fh); trace!(ino, fh, "release: cleaned up fh mapping"); result } @@ -213,20 +220,21 @@ where /// evict the inner root, breaking all subsequent operations on that child. 
#[must_use] #[instrument(name = "CompositeFs::delegated_forget", skip(self))] - pub async fn delegated_forget(&mut self, ino: Inode, nlookups: u64) -> bool { + pub async fn delegated_forget(&self, ino: Inode, nlookups: u64) -> bool { let slot_idx = self.slot_for_inode(ino); - let is_child_root = self.child_inodes.contains_key(&ino); - if !is_child_root - && let Some(idx) = slot_idx - && let Some(&inner_ino) = self.slots[idx].bridge.inode_map_get_by_left(ino) - { - self.slots[idx].inner.forget(inner_ino, nlookups).await; + let is_child_root = self.child_inodes.read_sync(&ino, |_, _| ()).is_some(); + if !is_child_root && let Some(idx) = slot_idx { + let slots = self.slots.read().await; + if let Some(inner_ino) = slots[idx].bridge.inode_map_get_by_left(ino) { + slots[idx].inner.forget(inner_ino, nlookups).await; + } } if self.icache.forget(ino, nlookups).await.is_some() { - self.child_inodes.remove(&ino); - self.inode_to_slot.remove(&ino); + self.child_inodes.remove_async(&ino).await; + self.inode_to_slot.remove_async(&ino).await; if let Some(idx) = slot_idx { - self.slots[idx].bridge.remove_inode_by_left(ino); + let slots = self.slots.read().await; + slots[idx].bridge.remove_inode_by_left(ino); } true } else { @@ -243,23 +251,24 @@ where /// Delegation branch for lookup when the parent is owned by a child slot. #[instrument(name = "CompositeFs::delegated_lookup", skip(self, name))] pub async fn delegated_lookup( - &mut self, + &self, parent: Inode, name: &OsStr, ) -> Result { let idx = self .slot_for_inode(parent) .ok_or(LookupError::InodeNotFound)?; - let inner_parent = self.slots[idx] + let slots = self.slots.read().await; + let inner_parent = slots[idx] .bridge .forward_or_insert_inode(parent, || unreachable!("lookup: parent should be mapped")); - let inner_attr = self.slots[idx].inner.lookup(inner_parent, name).await?; + let inner_attr = slots[idx].inner.lookup(inner_parent, name).await?; let inner_ino = inner_attr.common().ino; - let outer_ino = self.translate_inner_ino(idx, inner_ino, parent, name).await; - let outer_attr = self.slots[idx].bridge.attr_backward(inner_attr); + let outer_ino = self + .translate_inner_ino(idx, &slots[idx], inner_ino, parent, name) + .await; + let outer_attr = slots[idx].bridge.attr_backward(inner_attr); self.icache.cache_attr(outer_ino, outer_attr).await; - // None means the entry was concurrently evicted; fail the lookup so - // the kernel doesn't hold a ref the cache no longer tracks. let rc = self .icache .inc_rc(outer_ino) @@ -271,29 +280,38 @@ where /// Delegation branch for readdir when the inode is owned by a child slot. 
#[instrument(name = "CompositeFs::delegated_readdir", skip(self))] - pub async fn delegated_readdir(&mut self, ino: Inode) -> Result<&[DirEntry], ReadDirError> { + pub async fn delegated_readdir(&self, ino: Inode) -> Result, ReadDirError> { let idx = self .slot_for_inode(ino) .ok_or(ReadDirError::InodeNotFound)?; - let inner_ino = self.slots[idx] + let slots = self.slots.read().await; + let inner_ino = slots[idx] .bridge .forward_or_insert_inode(ino, || unreachable!("readdir: ino should be mapped")); - let inner_entries = self.slots[idx].inner.readdir(inner_ino).await?; - let inner_entries: Vec = inner_entries.to_vec(); - let evicted = self.icache.evict_zero_rc_children(ino).await; + let inner_entries = slots[idx].inner.readdir(inner_ino).await?; + let current_names: std::collections::HashSet = + inner_entries.iter().map(|e| e.name.clone()).collect(); + let evicted = self.icache.evict_stale_children(ino, ¤t_names).await; + if !evicted.is_empty() { + trace!( + ino, + evicted_count = evicted.len(), + "delegated_readdir: evicted stale children" + ); + } for evicted_ino in evicted { - if let Some(slot) = self.inode_to_slot.remove(&evicted_ino) { - self.slots[slot].bridge.remove_inode_by_left(evicted_ino); + if let Some((_, slot)) = self.inode_to_slot.remove_async(&evicted_ino).await { + slots[slot].bridge.remove_inode_by_left(evicted_ino); } - self.child_inodes.remove(&evicted_ino); + self.child_inodes.remove_async(&evicted_ino).await; } let mut outer_entries = Vec::with_capacity(inner_entries.len()); for entry in &inner_entries { let outer_child_ino = self - .translate_inner_ino(idx, entry.ino, ino, &entry.name) + .translate_inner_ino(idx, &slots[idx], entry.ino, ino, &entry.name) .await; - if let Some(inner_attr) = self.slots[idx].inner.peek_attr(entry.ino).await { - let outer_attr = self.slots[idx].bridge.attr_backward(inner_attr); + if let Some(inner_attr) = slots[idx].inner.peek_attr(entry.ino).await { + let outer_attr = slots[idx].bridge.attr_backward(inner_attr); self.icache.cache_attr(outer_child_ino, outer_attr).await; } outer_entries.push(DirEntry { @@ -302,7 +320,6 @@ where kind: entry.kind, }); } - self.readdir_buf = outer_entries; - Ok(&self.readdir_buf) + Ok(outer_entries) } } diff --git a/src/fs/mescloud/icache.rs b/src/fs/mescloud/icache.rs index 15f1f5d..358ac44 100644 --- a/src/fs/mescloud/icache.rs +++ b/src/fs/mescloud/icache.rs @@ -1,8 +1,14 @@ //! Mescloud-specific inode control block, helpers, and directory cache wrapper. -use std::ffi::OsStr; +use std::collections::HashSet; +use std::ffi::{OsStr, OsString}; +use std::sync::Arc; use std::time::SystemTime; +use futures::StreamExt as _; +use scc::HashMap as ConcurrentHashMap; +use tracing::{trace, warn}; + use crate::fs::icache::{AsyncICache, IcbLike, IcbResolver, InodeFactory}; use crate::fs::r#trait::{ CommonFileAttr, DirEntryType, FileAttr, FilesystemStats, Inode, Permissions, @@ -76,22 +82,44 @@ pub fn make_common_file_attr( } } -/// Mescloud-specific directory cache wrapper over `AsyncICache`. -pub struct MescloudICache> { - inner: AsyncICache, +struct MescloudICacheInner> { + cache: AsyncICache, inode_factory: InodeFactory, + /// O(1) lookup from (parent inode, child name) to child inode. + /// Maintained alongside the inode table to avoid linear scans in + /// `ensure_child_ino` and to provide atomicity under concurrent access. + child_index: ConcurrentHashMap<(Inode, OsString), Inode>, fs_owner: (u32, u32), block_size: u32, } +/// Mescloud-specific directory cache wrapper over `AsyncICache`. 
+///
+/// Cheaply cloneable via `Arc` so background tasks (e.g. prefetch) can share
+/// access to the cache and its domain-specific helpers.
+pub struct MescloudICache<R: IcbResolver<Icb = InodeControlBlock>> {
+    inner: Arc<MescloudICacheInner<R>>,
+}
+
+impl<R: IcbResolver<Icb = InodeControlBlock>> Clone for MescloudICache<R> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: Arc::clone(&self.inner),
+        }
+    }
+}
+
 impl<R: IcbResolver<Icb = InodeControlBlock>> MescloudICache<R> {
     /// Create a new `MescloudICache`. Initializes root ICB (rc=1), caches root dir attr.
     pub fn new(resolver: R, root_ino: Inode, fs_owner: (u32, u32), block_size: u32) -> Self {
         let cache = Self {
-            inner: AsyncICache::new(resolver, root_ino, "/"),
-            inode_factory: InodeFactory::new(root_ino + 1),
-            fs_owner,
-            block_size,
+            inner: Arc::new(MescloudICacheInner {
+                cache: AsyncICache::new(resolver, root_ino, "/"),
+                inode_factory: InodeFactory::new(root_ino + 1),
+                child_index: ConcurrentHashMap::new(),
+                fs_owner,
+                block_size,
+            }),
         };
 
         // Set root directory attr synchronously during initialization
@@ -99,7 +127,7 @@
         let root_attr = FileAttr::Directory {
             common: make_common_file_attr(root_ino, 0o755, now, now, fs_owner, block_size),
         };
-        cache.inner.get_icb_mut_sync(root_ino, |icb| {
+        cache.inner.cache.get_icb_mut_sync(root_ino, |icb| {
             icb.attr = Some(root_attr);
         });
 
@@ -109,7 +137,7 @@
     // -- Delegated from AsyncICache (async) --
 
     pub fn contains(&self, ino: Inode) -> bool {
-        self.inner.contains(ino)
+        self.inner.cache.contains(ino)
     }
 
@@ -118,11 +146,11 @@
     pub async fn get_icb<T>(
         &self,
         ino: Inode,
         // `Sync` required: see comment on `AsyncICache::get_icb`.
         f: impl Fn(&InodeControlBlock) -> T + Send + Sync,
     ) -> Option<T> {
-        self.inner.get_icb(ino, f).await
+        self.inner.cache.get_icb(ino, f).await
     }
 
     pub async fn insert_icb(&self, ino: Inode, icb: InodeControlBlock) {
-        self.inner.insert_icb(ino, icb).await;
+        self.inner.cache.insert_icb(ino, icb).await;
     }
 
@@ -131,15 +159,28 @@
     pub async fn entry_or_insert_icb<T>(
         &self,
         ino: Inode,
         factory: impl FnOnce() -> InodeControlBlock,
         then: impl FnOnce(&mut InodeControlBlock) -> T,
     ) -> T {
-        self.inner.entry_or_insert_icb(ino, factory, then).await
+        self.inner
+            .cache
+            .entry_or_insert_icb(ino, factory, then)
+            .await
     }
 
     pub async fn inc_rc(&self, ino: Inode) -> Option<u64> {
-        self.inner.inc_rc(ino).await
+        self.inner.cache.inc_rc(ino).await
     }
 
     pub async fn forget(&self, ino: Inode, nlookups: u64) -> Option<InodeControlBlock> {
-        self.inner.forget(ino, nlookups).await
+        let evicted = self.inner.cache.forget(ino, nlookups).await;
+        if let Some(ref icb) = evicted
+            && let Some(parent) = icb.parent
+        {
+            trace!(ino, parent, path = %icb.path.display(), "forget: cleaning up child_index");
+            self.inner
+                .child_index
+                .remove_async(&(parent, icb.path.as_os_str().to_os_string()))
+                .await;
+        }
+        evicted
     }
 
@@ -147,22 +188,119 @@
     pub async fn get_or_resolve<T>(
         &self,
         ino: Inode,
         then: impl FnOnce(&InodeControlBlock) -> T,
     ) -> Result<T, R::Error> {
-        self.inner.get_or_resolve(ino, then).await
+        self.inner.cache.get_or_resolve(ino, then).await
     }
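// ─────────────────────────────────────────────────────────────────────────────
// [editor's note — illustrative sketch, not part of the patch] The
// `child_index` introduced above must stay consistent with the inode table:
// every (parent, name) -> inode mapping should point at a live ICB, which is
// why both `forget` and `evict_stale_children` remove index entries. The code
// also self-heals when the invariant is briefly violated (see the "stale
// child_index entry" branch in `ensure_child_ino` below). A checker of the
// steady-state invariant, for intuition only:

fn child_index_consistent(
    child_index: &std::collections::HashMap<(u64, std::ffi::OsString), u64>,
    live_inodes: &std::collections::HashSet<u64>,
) -> bool {
    // Every indexed child inode must still have a live control block.
    child_index.values().all(|ino| live_inodes.contains(ino))
}
// ─────────────────────────────────────────────────────────────────────────────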
+
+    /// Maximum concurrent readdir prefetches.
+    const MAX_READDIR_PREFETCH_CONCURRENCY: usize = 4;
+
+    /// Perform readdir-equivalent work for a directory: resolve its ICB,
+    /// allocate inodes for children, cache directory attrs, and trigger
+    /// deeper prefetch for subdirectory children.
+    ///
+    /// This is the "true readdir prefetch" -- when the user later calls
+    /// `readdir(ino)`, all the expensive work is already done.
+    pub async fn prefetch_readdir(&self, ino: Inode) -> Result<(), R::Error>
+    where
+        R: 'static,
+        R::Error: 'static,
+    {
+        let children = self
+            .inner
+            .cache
+            .get_or_resolve(ino, |icb| icb.children.clone())
+            .await?;
+
+        let Some(children) = children else {
+            return Ok(()); // Not a directory
+        };
+
+        let mut subdir_inodes = Vec::new();
+        let now = SystemTime::now();
+
+        for (name, kind) in &children {
+            let child_ino = self.ensure_child_ino(ino, OsStr::new(name)).await;
+            if *kind == DirEntryType::Directory {
+                let attr = FileAttr::Directory {
+                    common: make_common_file_attr(
+                        child_ino,
+                        0o755,
+                        now,
+                        now,
+                        self.inner.fs_owner,
+                        self.inner.block_size,
+                    ),
+                };
+                self.cache_attr(child_ino, attr).await;
+                subdir_inodes.push(child_ino);
+            }
+        }
+
+        let subdir_count = subdir_inodes.len();
+
+        // Trigger deeper prefetch: resolve subdirectory ICBs (one more level)
+        if !subdir_inodes.is_empty() {
+            self.inner.cache.spawn_prefetch(subdir_inodes);
+        }
+
+        trace!(
+            ino,
+            child_count = children.len(),
+            subdir_count,
+            "prefetch_readdir: completed"
+        );
+
+        Ok(())
+    }
+
+    /// Fire-and-forget: spawn background tasks that perform full readdir
+    /// prefetch for the given directory inodes.
+    pub fn spawn_prefetch_readdir(&self, inodes: impl IntoIterator<Item = Inode>)
+    where
+        R: 'static,
+        R::Error: std::fmt::Display + 'static,
+    {
+        let inodes: Vec<_> = inodes.into_iter().collect();
+        if inodes.is_empty() {
+            return;
+        }
+        trace!(
+            count = inodes.len(),
+            "spawn_prefetch_readdir: dispatching background task"
+        );
+        let cache = self.clone();
+        tokio::spawn(async move {
+            futures::stream::iter(inodes)
+                .for_each_concurrent(Self::MAX_READDIR_PREFETCH_CONCURRENCY, |ino| {
+                    let cache = cache.clone();
+                    async move {
+                        if let Err(e) = cache.prefetch_readdir(ino).await {
+                            warn!(ino, error = %e, "prefetch_readdir: failed (will retry on access)");
+                        }
+                    }
+                })
+                .await;
+        });
+    }
 
     // -- Domain-specific --
 
     /// Allocate a new inode number.
     pub fn allocate_inode(&self) -> Inode {
-        self.inode_factory.allocate()
+        self.inner.inode_factory.allocate()
     }
 
     pub async fn get_attr(&self, ino: Inode) -> Option<FileAttr> {
-        self.inner.get_icb(ino, |icb| icb.attr).await.flatten()
+        self.inner
+            .cache
+            .get_icb(ino, |icb| icb.attr)
+            .await
+            .flatten()
     }
 
     pub async fn cache_attr(&self, ino: Inode, attr: FileAttr) {
         self.inner
+            .cache
             .get_icb_mut(ino, |icb| {
                 icb.attr = Some(attr);
             })
 
@@ -170,21 +308,21 @@
     }
 
     pub fn fs_owner(&self) -> (u32, u32) {
-        self.fs_owner
+        self.inner.fs_owner
     }
 
     pub fn block_size(&self) -> u32 {
-        self.block_size
+        self.inner.block_size
     }
 
     pub fn statfs(&self) -> FilesystemStats {
         FilesystemStats {
-            block_size: self.block_size,
-            fragment_size: u64::from(self.block_size),
+            block_size: self.inner.block_size,
+            fragment_size: u64::from(self.inner.block_size),
             total_blocks: 0,
             free_blocks: 0,
             available_blocks: 0,
-            total_inodes: self.inner.inode_count() as u64,
+            total_inodes: self.inner.cache.inode_count() as u64,
             free_inodes: 0,
             available_inodes: 0,
             filesystem_id: 0,
 
@@ -193,24 +331,42 @@
         }
     }
 
-    /// Evict all `Available` children of `parent` that have `rc == 0`.
+    /// Evict rc=0 children of `parent` that are NOT in `current_names`.
+    /// Preserves prefetched children that are still part of the directory listing.
     /// Returns the list of evicted inode numbers so callers can clean up
     /// associated state (e.g., bridge mappings, slot tracking).
- pub async fn evict_zero_rc_children(&self, parent: Inode) -> Vec { + pub async fn evict_stale_children( + &self, + parent: Inode, + current_names: &HashSet, + ) -> Vec { let mut to_evict = Vec::new(); self.inner + .cache .for_each(|&ino, icb| { - if icb.rc == 0 && icb.parent == Some(parent) { - to_evict.push(ino); + if icb.rc == 0 + && icb.parent == Some(parent) + && !current_names.contains(icb.path.as_os_str()) + { + to_evict.push((ino, icb.path.as_os_str().to_os_string())); } }) .await; let mut evicted = Vec::new(); - for ino in to_evict { - if self.inner.forget(ino, 0).await.is_some() { + for (ino, name) in to_evict { + if self.inner.cache.forget(ino, 0).await.is_some() { + self.inner.child_index.remove_async(&(parent, name)).await; evicted.push(ino); } } + if !evicted.is_empty() { + trace!( + parent, + evicted_count = evicted.len(), + current_count = current_names.len(), + "evict_stale_children: removed stale entries" + ); + } evicted } @@ -218,42 +374,52 @@ impl> MescloudICache { /// If new, inserts a stub ICB (parent+path set, attr=None, children=None, rc=0). /// Does NOT bump rc. Returns the inode number. /// - /// # Safety invariant - /// - /// The `for_each` scan and `insert_icb` are **not** atomic. If two callers - /// race with the same `(parent, name)`, both may allocate distinct inodes - /// for the same logical child. This is currently safe because all callers - /// go through `&mut self` on the owning `Fs` implementation. + /// Thread-safe: uses `child_index` with `entry_async` for atomic + /// check-and-insert, so concurrent callers for the same (parent, name) + /// always receive the same inode. pub async fn ensure_child_ino(&self, parent: Inode, name: &OsStr) -> Inode { - // Search for existing child by parent + name - let mut existing_ino = None; - self.inner - .for_each(|&ino, icb| { - if icb.parent == Some(parent) && icb.path.as_os_str() == name { - existing_ino = Some(ino); + use scc::hash_map::Entry; + + loop { + let key = (parent, name.to_os_string()); + match self.inner.child_index.entry_async(key).await { + Entry::Occupied(occ) => { + let ino = *occ.get(); + if self.inner.cache.contains(ino) { + trace!(parent, ?name, ino, "ensure_child_ino: cache hit"); + return ino; + } + // Stale: inode was evicted (e.g. failed prefetch with rc=0). + // Remove the stale mapping and loop to reallocate. 
+ warn!( + parent, + ?name, + ino, + "ensure_child_ino: stale child_index entry, reallocating" + ); + drop(occ.remove_entry()); } - }) - .await; - - if let Some(ino) = existing_ino { - return ino; + Entry::Vacant(vac) => { + let ino = self.inner.inode_factory.allocate(); + vac.insert_entry(ino); + self.inner + .cache + .insert_icb( + ino, + InodeControlBlock { + rc: 0, + path: name.into(), + parent: Some(parent), + attr: None, + children: None, + }, + ) + .await; + trace!(parent, ?name, ino, "ensure_child_ino: allocated new inode"); + return ino; + } + } } - - // Allocate new inode and insert stub - let ino = self.inode_factory.allocate(); - self.inner - .insert_icb( - ino, - InodeControlBlock { - rc: 0, - path: name.into(), - parent: Some(parent), - attr: None, - children: None, - }, - ) - .await; - ino } } @@ -366,72 +532,191 @@ mod tests { } #[tokio::test] - async fn evict_zero_rc_children_removes_stubs() { + async fn ensure_child_ino_is_idempotent() { let cache = test_mescloud_cache(); + let ino1 = cache.ensure_child_ino(1, OsStr::new("foo")).await; + let ino2 = cache.ensure_child_ino(1, OsStr::new("foo")).await; + assert_eq!(ino1, ino2, "same (parent, name) must return same inode"); + } - // Insert stubs as children of root (ino=1) with rc=0 - cache - .insert_icb( - 10, - InodeControlBlock { - rc: 0, - path: "child_a".into(), - parent: Some(1), - attr: None, - children: None, - }, - ) - .await; - cache - .insert_icb( - 11, - InodeControlBlock { - rc: 0, - path: "child_b".into(), - parent: Some(1), - attr: None, - children: None, - }, - ) - .await; + #[tokio::test] + async fn ensure_child_ino_concurrent_same_child() { + let cache = Arc::new(test_mescloud_cache()); + let mut handles = Vec::new(); + for _ in 0..10 { + let c = Arc::clone(&cache); + handles.push(tokio::spawn(async move { + c.ensure_child_ino(1, OsStr::new("bar")).await + })); + } + let inodes: Vec = futures::future::join_all(handles) + .await + .into_iter() + .map(|r| r.unwrap_or_else(|e| panic!("task panicked: {e}"))) + .collect(); + assert!( + inodes.iter().all(|&i| i == inodes[0]), + "all concurrent calls must return the same inode: {inodes:?}" + ); + } - // Insert a child with rc > 0 — should survive - cache - .insert_icb( - 12, - InodeControlBlock { - rc: 1, - path: "active".into(), - parent: Some(1), - attr: None, - children: None, - }, - ) - .await; + #[tokio::test] + async fn evict_cleans_child_index() { + let cache = test_mescloud_cache(); + let ino1 = cache.ensure_child_ino(1, OsStr::new("temp")).await; + // Use evict_stale_children with empty set (evicts all rc=0 children) + let evicted = cache.evict_stale_children(1, &HashSet::new()).await; + assert!(evicted.contains(&ino1)); + let ino2 = cache.ensure_child_ino(1, OsStr::new("temp")).await; + assert_ne!( + ino1, ino2, + "after eviction, a new inode should be allocated" + ); + } - // Insert a stub under a different parent — should survive - cache - .insert_icb( - 20, - InodeControlBlock { - rc: 0, - path: "other".into(), - parent: Some(12), - attr: None, - children: None, - }, - ) - .await; + #[test] + #[expect( + clippy::redundant_clone, + reason = "test exists to verify Clone is implemented" + )] + fn mescloud_icache_is_clone() { + let cache = test_mescloud_cache(); + let _clone = cache.clone(); + } + + #[tokio::test] + async fn forget_cleans_child_index() { + let cache = test_mescloud_cache(); + let ino = cache.ensure_child_ino(1, OsStr::new("ephemeral")).await; + let evicted = cache.forget(ino, 0).await; + assert!(evicted.is_some()); + let ino2 = 
cache.ensure_child_ino(1, OsStr::new("ephemeral")).await; + assert_ne!(ino, ino2, "child_index should have been cleaned up"); + } + + #[tokio::test] + #[expect( + clippy::similar_names, + reason = "bar_ino/baz_ino distinction is intentional" + )] + async fn evict_stale_children_preserves_current() { + use std::collections::HashSet; + + let cache = test_mescloud_cache(); + + // Insert children of root via ensure_child_ino (populates child_index) + let foo_ino = cache.ensure_child_ino(1, OsStr::new("foo")).await; + let bar_ino = cache.ensure_child_ino(1, OsStr::new("bar")).await; + let baz_ino = cache.ensure_child_ino(1, OsStr::new("baz")).await; + + // "foo" and "baz" are in the current listing; "bar" is stale + let current: HashSet = + ["foo", "baz"].iter().copied().map(OsString::from).collect(); + let evicted = cache.evict_stale_children(1, ¤t).await; + + assert_eq!(evicted, vec![bar_ino], "only stale 'bar' should be evicted"); + assert!(cache.contains(foo_ino), "current child 'foo' preserved"); + assert!(!cache.contains(bar_ino), "stale child 'bar' evicted"); + assert!(cache.contains(baz_ino), "current child 'baz' preserved"); + } + + struct PrefetchTestResolver { + listings: std::sync::Mutex>>, + } - let evicted = cache.evict_zero_rc_children(1).await; - assert_eq!(evicted.len(), 2, "should evict 2 zero-rc children of root"); + impl PrefetchTestResolver { + fn new() -> Self { + Self { + listings: std::sync::Mutex::new(std::collections::HashMap::new()), + } + } + + fn add_dir(&self, ino: Inode, children: Vec<(String, DirEntryType)>) { + self.listings.lock().unwrap().insert(ino, children); + } + } + + impl IcbResolver for PrefetchTestResolver { + type Icb = InodeControlBlock; + type Error = super::super::common::LookupError; + + fn resolve( + &self, + ino: Inode, + stub: Option, + _cache: &AsyncICache, + ) -> impl Future> + Send + where + Self: Sized, + { + let listings = self.listings.lock().unwrap().clone(); + async move { + let stub = stub.ok_or(super::super::common::LookupError::InodeNotFound)?; + let children = listings.get(&ino).cloned(); + let now = SystemTime::now(); + let attr = if children.is_some() { + FileAttr::Directory { + common: make_common_file_attr(ino, 0o755, now, now, (0, 0), 4096), + } + } else { + FileAttr::RegularFile { + common: make_common_file_attr(ino, 0o644, now, now, (0, 0), 4096), + size: 42, + blocks: 1, + } + }; + Ok(InodeControlBlock { + parent: stub.parent, + path: stub.path, + rc: stub.rc, + attr: Some(attr), + children, + }) + } + } + } + + fn test_prefetch_cache(resolver: PrefetchTestResolver) -> MescloudICache { + MescloudICache::new(resolver, 1, (0, 0), 4096) + } + + #[tokio::test] + async fn prefetch_readdir_allocates_child_inodes_and_caches_dir_attrs() { + let resolver = PrefetchTestResolver::new(); + resolver.add_dir( + 1, + vec![ + ("src".to_owned(), DirEntryType::Directory), + ("README.md".to_owned(), DirEntryType::RegularFile), + ], + ); + + let cache = test_prefetch_cache(resolver); + + cache.prefetch_readdir(1).await.unwrap(); + + // Children should have inodes allocated + let src_ino = cache.ensure_child_ino(1, OsStr::new("src")).await; + let readme_ino = cache.ensure_child_ino(1, OsStr::new("README.md")).await; + + assert!(cache.contains(src_ino), "src should exist in cache"); + assert!( + cache.contains(readme_ino), + "README.md should exist in cache" + ); + + // Directory children should have cached attr + let src_attr = cache.get_attr(src_ino).await; + assert!( + matches!(src_attr, Some(FileAttr::Directory { .. 
})), + "src should have directory attr cached" + ); - assert!(!cache.contains(10), "child_a should be evicted"); - assert!(!cache.contains(11), "child_b should be evicted"); - assert!(cache.contains(12), "active child should survive"); + // File children should NOT have cached attr (avoids poisoning needs_resolve) + let readme_attr = cache.get_attr(readme_ino).await; assert!( - cache.contains(20), - "child of different parent should survive" + readme_attr.is_none(), + "file children should not have attr cached by prefetch" ); } } diff --git a/src/fs/mescloud/mod.rs b/src/fs/mescloud/mod.rs index 0e32933..06298af 100644 --- a/src/fs/mescloud/mod.rs +++ b/src/fs/mescloud/mod.rs @@ -1,10 +1,10 @@ -use std::collections::HashMap; use std::ffi::OsStr; use std::future::Future; use std::time::SystemTime; use bytes::Bytes; use mesa_dev::MesaClient; +use scc::HashMap as ConcurrentHashMap; use secrecy::ExposeSecret as _; use tracing::{Instrument as _, instrument, trace, warn}; @@ -117,11 +117,10 @@ impl MesaFS { Self::BLOCK_SIZE, ), file_table: FileTable::new(), - readdir_buf: Vec::new(), - child_inodes: HashMap::new(), - inode_to_slot: HashMap::new(), - slots: orgs - .map(|org_conf| { + child_inodes: ConcurrentHashMap::new(), + inode_to_slot: ConcurrentHashMap::new(), + slots: tokio::sync::RwLock::new( + orgs.map(|org_conf| { let client = MesaClient::builder() .with_api_key(org_conf.api_key.expose_secret()) .with_base_path(MESA_API_BASE_URL) @@ -133,6 +132,7 @@ impl MesaFS { } }) .collect(), + ), }, } } @@ -142,7 +142,12 @@ impl MesaFS { if ino == Self::ROOT_NODE_INO { return Some(InodeRole::Root); } - if self.composite.child_inodes.contains_key(&ino) { + if self + .composite + .child_inodes + .read_sync(&ino, |_, _| ()) + .is_some() + { return Some(InodeRole::OrgOwned); } if self.composite.slot_for_inode(ino).is_some() { @@ -154,39 +159,39 @@ impl MesaFS { /// Ensure a mesa-level inode exists for the org at `org_idx`. /// Seeds the bridge with (`mesa_org_ino`, `OrgFs::ROOT_INO`). /// Does NOT bump rc. - async fn ensure_org_inode(&mut self, org_idx: usize) -> (Inode, FileAttr) { + async fn ensure_org_inode(&self, org_idx: usize) -> (Inode, FileAttr) { // Check if an inode already exists. - let existing_ino = self - .composite + let mut existing_ino = None; + self.composite .child_inodes - .iter() - .find(|&(_, &idx)| idx == org_idx) - .map(|(&ino, _)| ino); + .any_async(|&ino, &idx| { + if idx == org_idx { + existing_ino = Some(ino); + true + } else { + false + } + }) + .await; - if let Some(existing_ino) = existing_ino { - if let Some(attr) = self.composite.icache.get_attr(existing_ino).await { + if let Some(ino) = existing_ino { + if let Some(attr) = self.composite.icache.get_attr(ino).await { let rc = self .composite .icache - .get_icb(existing_ino, |icb| icb.rc) + .get_icb(ino, |icb| icb.rc) .await .unwrap_or(0); - trace!( - ino = existing_ino, - org_idx, rc, "ensure_org_inode: reusing existing inode" - ); - return (existing_ino, attr); + trace!(ino, org_idx, rc, "ensure_org_inode: reusing existing inode"); + return (ino, attr); } - if self.composite.icache.contains(existing_ino) { + if self.composite.icache.contains(ino) { // ICB exists but attr missing — rebuild and cache. 
-                warn!(
-                    ino = existing_ino,
-                    org_idx, "ensure_org_inode: attr missing, rebuilding"
-                );
+                warn!(ino, org_idx, "ensure_org_inode: attr missing, rebuilding");
                 let now = SystemTime::now();
                 let attr = FileAttr::Directory {
                     common: mescloud_icache::make_common_file_attr(
-                        existing_ino,
+                        ino,
                         0o755,
                         now,
                         now,
@@ -194,20 +199,21 @@ impl MesaFS {
                         self.composite.icache.block_size(),
                     ),
                 };
-                self.composite.icache.cache_attr(existing_ino, attr).await;
-                return (existing_ino, attr);
+                self.composite.icache.cache_attr(ino, attr).await;
+                return (ino, attr);
             }
             // ICB was evicted — clean up stale tracking entries.
             warn!(
-                ino = existing_ino,
+                ino,
                 org_idx, "ensure_org_inode: ICB evicted, cleaning up stale entry"
             );
-            self.composite.child_inodes.remove(&existing_ino);
-            self.composite.inode_to_slot.remove(&existing_ino);
+            self.composite.child_inodes.remove_async(&ino).await;
+            self.composite.inode_to_slot.remove_async(&ino).await;
         }
 
         // Allocate new.
-        let org_name = self.composite.slots[org_idx].inner.name().to_owned();
+        let slots = self.composite.slots.read().await;
+        let org_name = slots[org_idx].inner.name().to_owned();
         let ino = self.composite.icache.allocate_inode();
         trace!(ino, org_idx, org = %org_name, "ensure_org_inode: allocated new inode");
@@ -226,15 +232,22 @@ impl MesaFS {
             )
             .await;
 
-        self.composite.child_inodes.insert(ino, org_idx);
-        self.composite.inode_to_slot.insert(ino, org_idx);
+        let _ = self.composite.child_inodes.insert_async(ino, org_idx).await;
+        let _ = self
+            .composite
+            .inode_to_slot
+            .insert_async(ino, org_idx)
+            .await;
 
         // Reset bridge (may have stale mappings from a previous eviction cycle)
         // and seed: mesa org-root <-> OrgFs::ROOT_INO.
-        self.composite.slots[org_idx].bridge = HashMapBridge::new();
-        self.composite.slots[org_idx]
-            .bridge
-            .insert_inode(ino, OrgFs::ROOT_INO);
+        // Need write lock to replace the bridge itself.
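+        // (The read guard is dropped first, so the handoff to the write lock
+        // below is not atomic; the bridge reset is assumed idempotent, which
+        // keeps any interleaved access to the slot benign.)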
+        drop(slots);
+        {
+            let mut slots = self.composite.slots.write().await;
+            slots[org_idx].bridge = HashMapBridge::new();
+            slots[org_idx].bridge.insert_inode(ino, OrgFs::ROOT_INO);
+        }
 
         let attr = FileAttr::Directory {
             common: mescloud_icache::make_common_file_attr(
@@ -260,18 +273,30 @@ impl Fs for MesaFS {
     type ReaddirError = ReadDirError;
     type ReleaseError = ReleaseError;
 
+    #[instrument(name = "MesaFS::init", skip(self))]
+    async fn init(&self) {
+        let slots = self.composite.slots.read().await;
+        for slot in slots.iter() {
+            if slot.inner.name() == "github" {
+                continue;
+            }
+            trace!(org = slot.inner.name(), "prefetching org repo listing");
+            drop(slot.inner.readdir(OrgFs::ROOT_INO).await);
+        }
+    }
+
     #[instrument(name = "MesaFS::lookup", skip(self))]
-    async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result<FileAttr, LookupError> {
+    async fn lookup(&self, parent: Inode, name: &OsStr) -> Result<FileAttr, LookupError> {
         let role = self.inode_role(parent).ok_or(LookupError::InodeNotFound)?;
 
         match role {
             InodeRole::Root => {
                 let org_name = name.to_str().ok_or(LookupError::InodeNotFound)?;
-                let org_idx = self
-                    .composite
-                    .slots
+                let slots = self.composite.slots.read().await;
+                let org_idx = slots
                     .iter()
                     .position(|s| s.inner.name() == org_name)
                     .ok_or(LookupError::InodeNotFound)?;
+                drop(slots);
                 trace!(org = org_name, "lookup: matched org");
 
                 let (ino, attr) = self.ensure_org_inode(org_idx).await;
@@ -289,26 +314,22 @@ impl Fs for MesaFS {
     }
 
     #[instrument(name = "MesaFS::getattr", skip(self))]
-    async fn getattr(
-        &mut self,
-        ino: Inode,
-        _fh: Option<FileHandle>,
-    ) -> Result<FileAttr, GetAttrError> {
+    async fn getattr(&self, ino: Inode, _fh: Option<FileHandle>) -> Result<FileAttr, GetAttrError> {
         self.composite.delegated_getattr(ino).await
     }
 
     #[instrument(name = "MesaFS::readdir", skip(self, offset))]
-    async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], ReadDirError> {
+    async fn readdir(&self, ino: Inode) -> Result<Vec<DirEntry>, ReadDirError> {
         let role = self.inode_role(ino).ok_or(ReadDirError::InodeNotFound)?;
 
         match role {
             InodeRole::Root => {
-                let org_info: Vec<(usize, String)> = self
-                    .composite
-                    .slots
+                let slots = self.composite.slots.read().await;
+                let org_info: Vec<(usize, String)> = slots
                     .iter()
                     .enumerate()
                     .map(|(idx, s)| (idx, s.inner.name().to_owned()))
                     .collect();
+                drop(slots);
 
                 let mut entries = Vec::with_capacity(org_info.len());
                 for (org_idx, name) in &org_info {
@@ -321,21 +342,20 @@ impl Fs for MesaFS {
                 }
 
                 trace!(entry_count = entries.len(), "readdir: listing orgs");
-                self.composite.readdir_buf = entries;
-                Ok(&self.composite.readdir_buf)
+                Ok(entries)
             }
             InodeRole::OrgOwned => self.composite.delegated_readdir(ino).await,
         }
     }
 
     #[instrument(name = "MesaFS::open", skip(self))]
-    async fn open(&mut self, ino: Inode, flags: OpenFlags) -> Result<OpenFile, OpenError> {
+    async fn open(&self, ino: Inode, flags: OpenFlags) -> Result<OpenFile, OpenError> {
         self.composite.delegated_open(ino, flags).await
     }
 
     #[instrument(name = "MesaFS::read", skip(self))]
     async fn read(
-        &mut self,
+        &self,
         ino: Inode,
         fh: FileHandle,
         offset: u64,
@@ -350,7 +370,7 @@ impl Fs for MesaFS {
 
     #[instrument(name = "MesaFS::release", skip(self))]
     async fn release(
-        &mut self,
+        &self,
         ino: Inode,
         fh: FileHandle,
         flags: OpenFlags,
@@ -362,12 +382,12 @@ impl Fs for MesaFS {
     }
 
     #[instrument(name = "MesaFS::forget", skip(self))]
-    async fn forget(&mut self, ino: Inode, nlookups: u64) {
+    async fn forget(&self, ino: Inode, nlookups: u64) {
        // MesaFS has no extra state to clean up on eviction (unlike OrgFs::owner_inodes).
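        // Stale child_inodes / inode_to_slot entries left behind by an evicted
        // org inode are reclaimed lazily the next time ensure_org_inode runs.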
        let _ = self.composite.delegated_forget(ino, nlookups).await;
     }
 
-    async fn statfs(&mut self) -> Result<FilesystemStats, libc::c_int> {
+    async fn statfs(&self) -> Result<FilesystemStats, libc::c_int> {
         Ok(self.composite.delegated_statfs())
     }
 }
diff --git a/src/fs/mescloud/org.rs b/src/fs/mescloud/org.rs
index 968c748..255c489 100644
--- a/src/fs/mescloud/org.rs
+++ b/src/fs/mescloud/org.rs
@@ -1,4 +1,3 @@
-use std::collections::HashMap;
 use std::ffi::OsStr;
 use std::future::Future;
 use std::time::SystemTime;
@@ -6,6 +5,7 @@
 use bytes::Bytes;
 use futures::TryStreamExt as _;
 use mesa_dev::MesaClient;
+use scc::HashMap as ConcurrentHashMap;
 use secrecy::SecretString;
 use tracing::{Instrument as _, instrument, trace, warn};
@@ -93,7 +93,7 @@ pub struct OrgFs {
     client: MesaClient,
     composite: CompositeFs<RepoFs>,
     /// Maps org-level owner-dir inodes to owner name (github only).
-    owner_inodes: HashMap<Inode, String>,
+    owner_inodes: ConcurrentHashMap<Inode, String>,
 }
 
 impl OrgFs {
@@ -121,37 +121,42 @@ impl OrgFs {
     /// Ensure an inode exists for a virtual owner directory (github only). Does NOT bump rc.
     /// TODO(MES-674): Cleanup "special" casing for github.
-    async fn ensure_owner_inode(&mut self, owner: &str) -> (Inode, FileAttr) {
+    async fn ensure_owner_inode(&self, owner: &str) -> (Inode, FileAttr) {
         // Check existing
-        let mut stale_ino = None;
-        for (&ino, existing_owner) in &self.owner_inodes {
-            if existing_owner == owner {
-                if let Some(attr) = self.composite.icache.get_attr(ino).await {
-                    return (ino, attr);
-                }
-                if self.composite.icache.contains(ino) {
-                    // ICB exists but attr missing — rebuild and cache
-                    let now = SystemTime::now();
-                    let attr = FileAttr::Directory {
-                        common: mescloud_icache::make_common_file_attr(
-                            ino,
-                            0o755,
-                            now,
-                            now,
-                            self.composite.icache.fs_owner(),
-                            self.composite.icache.block_size(),
-                        ),
-                    };
-                    self.composite.icache.cache_attr(ino, attr).await;
-                    return (ino, attr);
+        let mut existing_ino_found = None;
+        self.owner_inodes
+            .any_async(|&ino, existing_owner| {
+                if existing_owner == owner {
+                    existing_ino_found = Some(ino);
+                    true
+                } else {
+                    false
                 }
-                // ICB was evicted — mark for cleanup
-                stale_ino = Some(ino);
-                break;
+            })
+            .await;
+
+        if let Some(existing_ino) = existing_ino_found {
+            if let Some(attr) = self.composite.icache.get_attr(existing_ino).await {
+                return (existing_ino, attr);
             }
-        }
-        if let Some(ino) = stale_ino {
-            self.owner_inodes.remove(&ino);
+            if self.composite.icache.contains(existing_ino) {
+                // ICB exists but attr missing — rebuild and cache
+                let now = SystemTime::now();
+                let attr = FileAttr::Directory {
+                    common: mescloud_icache::make_common_file_attr(
+                        existing_ino,
+                        0o755,
+                        now,
+                        now,
+                        self.composite.icache.fs_owner(),
+                        self.composite.icache.block_size(),
+                    ),
+                };
+                self.composite.icache.cache_attr(existing_ino, attr).await;
+                return (existing_ino, attr);
+            }
+            // ICB was evicted — clean up the stale entry immediately
+            self.owner_inodes.remove_async(&existing_ino).await;
         }
 
         // Allocate new
@@ -170,7 +175,7 @@ impl OrgFs {
             },
         )
         .await;
-        self.owner_inodes.insert(ino, owner.to_owned());
+        drop(self.owner_inodes.insert_async(ino, owner.to_owned()).await);
         let attr = FileAttr::Directory {
             common: mescloud_icache::make_common_file_attr(
                 ino,
@@ -197,12 +202,11 @@ impl OrgFs {
             composite: CompositeFs {
                 icache: MescloudICache::new(resolver, Self::ROOT_INO, fs_owner, Self::BLOCK_SIZE),
                 file_table: FileTable::new(),
-                readdir_buf: Vec::new(),
-                child_inodes: HashMap::new(),
-                inode_to_slot: HashMap::new(),
-                slots: Vec::new(),
+                child_inodes: ConcurrentHashMap::new(),
+                inode_to_slot: ConcurrentHashMap::new(),
+                slots: tokio::sync::RwLock::new(Vec::new()),
             },
-            owner_inodes: HashMap::new(),
+            owner_inodes: ConcurrentHashMap::new(),
         }
     }
@@ -211,10 +215,15 @@ impl OrgFs {
         if ino == Self::ROOT_INO {
             return Some(InodeRole::OrgRoot);
         }
-        if self.owner_inodes.contains_key(&ino) {
+        if self.owner_inodes.read_sync(&ino, |_, _| ()).is_some() {
             return Some(InodeRole::OwnerDir);
         }
-        if self.composite.child_inodes.contains_key(&ino) {
+        if self
+            .composite
+            .child_inodes
+            .read_sync(&ino, |_, _| ())
+            .is_some()
+        {
             return Some(InodeRole::RepoOwned);
         }
         if self.composite.slot_for_inode(ino).is_some() {
@@ -230,15 +239,28 @@ impl OrgFs {
     /// - `display_name`: name shown in filesystem ("linux" for github, same as `repo_name` otherwise)
     /// - `parent_ino`: owner-dir inode for github, `ROOT_INO` otherwise
     async fn ensure_repo_inode(
-        &mut self,
+        &self,
         repo_name: &str,
         display_name: &str,
         default_branch: &str,
         parent_ino: Inode,
     ) -> (Inode, FileAttr) {
         // Check existing repos.
-        for (&ino, &idx) in &self.composite.child_inodes {
-            if self.composite.slots[idx].inner.repo_name() == repo_name {
+        {
+            let slots = self.composite.slots.read().await;
+            let mut found = None;
+            self.composite
+                .child_inodes
+                .any_async(|&ino, &idx| {
+                    if slots[idx].inner.repo_name() == repo_name {
+                        found = Some((ino, idx));
+                        true
+                    } else {
+                        false
+                    }
+                })
+                .await;
+            if let Some((ino, _)) = found {
                 if let Some(attr) = self.composite.icache.get_attr(ino).await {
                     let rc = self
                         .composite
@@ -256,19 +278,15 @@ impl OrgFs {
                     );
                     return self.make_repo_dir_attr(ino).await;
                 }
-            }
-            // Check for orphaned slot (slot exists but not in child_inodes).
-            if let Some(idx) = self
-                .composite
-                .slots
-                .iter()
-                .position(|s| s.inner.repo_name() == repo_name)
-            {
-                return self.register_repo_slot(idx, display_name, parent_ino).await;
+                // Check for orphaned slot (slot exists but not in child_inodes).
+                if let Some(idx) = slots.iter().position(|s| s.inner.repo_name() == repo_name) {
+                    drop(slots);
+                    return self.register_repo_slot(idx, display_name, parent_ino).await;
+                }
             }
         }
 
-        // Allocate truly new slot.
+        // Allocate truly new slot. Need write lock.
         let ino = self.composite.icache.allocate_inode();
         trace!(
             ino,
@@ -298,16 +316,19 @@ impl OrgFs {
             self.composite.icache.fs_owner(),
         );
 
-        let mut bridge = HashMapBridge::new();
+        let bridge = HashMapBridge::new();
         bridge.insert_inode(ino, RepoFs::ROOT_INO);
-        let idx = self.composite.slots.len();
-        self.composite.slots.push(ChildSlot {
+        let mut slots = self.composite.slots.write().await;
+        let idx = slots.len();
+        slots.push(ChildSlot {
             inner: repo,
             bridge,
         });
-        self.composite.child_inodes.insert(ino, idx);
-        self.composite.inode_to_slot.insert(ino, idx);
+        drop(slots);
+
+        let _ = self.composite.child_inodes.insert_async(ino, idx).await;
+        let _ = self.composite.inode_to_slot.insert_async(ino, idx).await;
 
         self.make_repo_dir_attr(ino).await
     }
 
     /// Allocate a new inode, register it in an existing (orphaned) slot, and
     /// return `(ino, attr)`.
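     /// Unlike the allocate-new path in `ensure_repo_inode`, this reuses the
     /// slot's existing inner filesystem and only re-seeds its inode bridge.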
    async fn register_repo_slot(
-        &mut self,
+        &self,
         idx: usize,
         display_name: &str,
         parent_ino: Inode,
@@ -343,12 +364,19 @@ impl OrgFs {
             "register_repo_slot: resetting bridge for orphaned slot; \
              inner filesystem will not receive forget for stale inode mappings"
         );
-        self.composite.slots[idx].bridge = HashMapBridge::new();
-        self.composite.slots[idx]
-            .bridge
-            .insert_inode(ino, RepoFs::ROOT_INO);
-        self.composite.child_inodes.insert(ino, idx);
-        self.composite.inode_to_slot.insert(ino, idx);
+        // Replace the orphaned slot's bridge wholesale. The bridge itself is
+        // swapped (not mutated through its interior mutability), so this
+        // requires the write lock.
+        {
+            let mut slots = self.composite.slots.write().await;
+            slots[idx].bridge = HashMapBridge::new();
+            slots[idx].bridge.insert_inode(ino, RepoFs::ROOT_INO);
+        }
+        let _ = self.composite.child_inodes.insert_async(ino, idx).await;
+        let _ = self.composite.inode_to_slot.insert_async(ino, idx).await;
 
         self.make_repo_dir_attr(ino).await
     }
@@ -402,7 +430,7 @@ impl Fs for OrgFs {
     type ReleaseError = ReleaseError;
 
     #[instrument(name = "OrgFs::lookup", skip(self), fields(org = %self.name))]
-    async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result<FileAttr, LookupError> {
+    async fn lookup(&self, parent: Inode, name: &OsStr) -> Result<FileAttr, LookupError> {
         let role = self.inode_role(parent).ok_or(LookupError::InodeNotFound)?;
         match role {
             InodeRole::OrgRoot => {
@@ -444,9 +472,9 @@ impl Fs for OrgFs {
                 // Parent is an owner dir, name is a repo like "linux".
                 let owner = self
                     .owner_inodes
-                    .get(&parent)
-                    .ok_or(LookupError::InodeNotFound)?
-                    .clone();
+                    .read_async(&parent, |_, v| v.clone())
+                    .await
+                    .ok_or(LookupError::InodeNotFound)?;
                 let repo_name_str = name.to_str().ok_or(LookupError::InodeNotFound)?;
                 let full_decoded = format!("{owner}/{repo_name_str}");
                 let encoded = Self::encode_github_repo_name(&full_decoded);
@@ -476,16 +504,12 @@ impl Fs for OrgFs {
     }
 
     #[instrument(name = "OrgFs::getattr", skip(self), fields(org = %self.name))]
-    async fn getattr(
-        &mut self,
-        ino: Inode,
-        _fh: Option<FileHandle>,
-    ) -> Result<FileAttr, GetAttrError> {
+    async fn getattr(&self, ino: Inode, _fh: Option<FileHandle>) -> Result<FileAttr, GetAttrError> {
         self.composite.delegated_getattr(ino).await
     }
 
     #[instrument(name = "OrgFs::readdir", skip(self), fields(org = %self.name))]
-    async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], ReadDirError> {
+    async fn readdir(&self, ino: Inode) -> Result<Vec<DirEntry>, ReadDirError> {
         let role = self.inode_role(ino).ok_or(ReadDirError::InodeNotFound)?;
         match role {
             InodeRole::OrgRoot => {
@@ -526,8 +550,24 @@ impl Fs for OrgFs {
                 });
             }
 
-            self.composite.readdir_buf = entries;
-            Ok(&self.composite.readdir_buf)
+            {
+                let slots = self.composite.slots.read().await;
+                let prefetch_count = entries
+                    .iter()
+                    .filter_map(|e| {
+                        self.composite.child_inodes.read_sync(&e.ino, |_, &idx| idx)
+                    })
+                    .inspect(|&idx| slots[idx].inner.prefetch_root())
+                    .count();
+                if prefetch_count > 0 {
+                    trace!(
+                        count = prefetch_count,
+                        "readdir: prefetching repo root directories"
+                    );
+                }
+            }
+
+            Ok(entries)
         }
         InodeRole::OwnerDir if self.is_github() => {
             // TODO(MES-674): Cleanup "special" casing for github.
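            // (Owner dirs exist only for the github org, where a repo's
            // "owner/repo" name is surfaced as nested directories.)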
@@ -539,13 +579,13 @@ impl Fs for OrgFs {
     }
 
     #[instrument(name = "OrgFs::open", skip(self), fields(org = %self.name))]
-    async fn open(&mut self, ino: Inode, flags: OpenFlags) -> Result<OpenFile, OpenError> {
+    async fn open(&self, ino: Inode, flags: OpenFlags) -> Result<OpenFile, OpenError> {
         self.composite.delegated_open(ino, flags).await
     }
 
     #[instrument(name = "OrgFs::read", skip(self), fields(org = %self.name))]
     async fn read(
-        &mut self,
+        &self,
         ino: Inode,
         fh: FileHandle,
         offset: u64,
@@ -560,7 +600,7 @@ impl Fs for OrgFs {
 
     #[instrument(name = "OrgFs::release", skip(self), fields(org = %self.name))]
     async fn release(
-        &mut self,
+        &self,
         ino: Inode,
         fh: FileHandle,
         flags: OpenFlags,
@@ -572,14 +612,14 @@ impl Fs for OrgFs {
     }
 
     #[instrument(name = "OrgFs::forget", skip(self), fields(org = %self.name))]
-    async fn forget(&mut self, ino: Inode, nlookups: u64) {
+    async fn forget(&self, ino: Inode, nlookups: u64) {
         let evicted = self.composite.delegated_forget(ino, nlookups).await;
         if evicted {
-            self.owner_inodes.remove(&ino);
+            self.owner_inodes.remove_async(&ino).await;
         }
     }
 
-    async fn statfs(&mut self) -> Result<FilesystemStats, libc::c_int> {
+    async fn statfs(&self) -> Result<FilesystemStats, libc::c_int> {
         Ok(self.composite.delegated_statfs())
     }
 }
diff --git a/src/fs/mescloud/repo.rs b/src/fs/mescloud/repo.rs
index 0d22196..0857d5a 100644
--- a/src/fs/mescloud/repo.rs
+++ b/src/fs/mescloud/repo.rs
@@ -2,14 +2,17 @@
 //!
 //! This module directly accesses the mesa repo through the Rust SDK, on a per-repo basis.
 
+use std::collections::HashSet;
+use std::ffi::OsString;
 use std::future::Future;
-use std::{collections::HashMap, ffi::OsStr, path::PathBuf, time::SystemTime};
+use std::{ffi::OsStr, path::PathBuf, time::SystemTime};
 
 use base64::Engine as _;
 use bytes::Bytes;
 use mesa_dev::MesaClient;
 use mesa_dev::low_level::content::{Content, DirEntry as MesaDirEntry};
 use num_traits::cast::ToPrimitive as _;
+use scc::HashMap as ConcurrentHashMap;
 use tracing::{Instrument as _, instrument, trace, warn};
 
 use crate::fs::icache::{AsyncICache, FileTable, IcbResolver};
@@ -58,6 +61,12 @@ impl IcbResolver for RepoResolver {
             let stub = stub.ok_or(LookupError::InodeNotFound)?;
             let file_path = build_repo_path(stub.parent, &stub.path, cache, RepoFs::ROOT_INO).await;
 
+            trace!(
+                ino,
+                path = file_path.as_deref().unwrap_or(""),
+                "resolver: fetching content"
+            );
+
             // Non-root inodes must have a resolvable path.
             if stub.parent.is_some() && file_path.is_none() {
                 return Err(LookupError::InodeNotFound);
@@ -181,8 +190,7 @@ pub struct RepoFs {
     icache: MescloudICache<RepoResolver>,
     file_table: FileTable,
-    readdir_buf: Vec<DirEntry>,
-    open_files: HashMap<FileHandle, Inode>,
+    open_files: ConcurrentHashMap<FileHandle, Inode>,
 }
 
 impl RepoFs {
@@ -212,8 +220,7 @@ impl RepoFs {
             ref_,
             icache: MescloudICache::new(resolver, Self::ROOT_INO, fs_owner, Self::BLOCK_SIZE),
             file_table: FileTable::new(),
-            readdir_buf: Vec::new(),
-            open_files: HashMap::new(),
+            open_files: ConcurrentHashMap::new(),
         }
     }
@@ -258,6 +265,13 @@ impl RepoFs {
         let joined: PathBuf = components.iter().collect();
         joined.to_str().map(String::from)
     }
+
+    /// Spawn a background task to prefetch the root directory listing so
+    /// subsequent readdirs hit the cache.
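+    /// Prefetching is best-effort: if the background fetch fails, the first
+    /// real readdir simply pays the fetch cost itself.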
+    pub(crate) fn prefetch_root(&self) {
+        trace!(repo = %self.repo_name, "prefetch_root: warming root directory cache");
+        self.icache.spawn_prefetch_readdir([Self::ROOT_INO]);
+    }
 }
 
 #[async_trait::async_trait]
@@ -277,7 +291,7 @@ impl Fs for RepoFs {
     type ReleaseError = ReleaseError;
 
     #[instrument(name = "RepoFs::lookup", skip(self), fields(repo = %self.repo_name))]
-    async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result<FileAttr, LookupError> {
+    async fn lookup(&self, parent: Inode, name: &OsStr) -> Result<FileAttr, LookupError> {
         debug_assert!(
             self.icache.contains(parent),
             "lookup: parent inode {parent} not in inode table"
         );
@@ -300,11 +314,7 @@ impl Fs for RepoFs {
     }
 
     #[instrument(name = "RepoFs::getattr", skip(self), fields(repo = %self.repo_name))]
-    async fn getattr(
-        &mut self,
-        ino: Inode,
-        _fh: Option<FileHandle>,
-    ) -> Result<FileAttr, GetAttrError> {
+    async fn getattr(&self, ino: Inode, _fh: Option<FileHandle>) -> Result<FileAttr, GetAttrError> {
         self.icache.get_attr(ino).await.ok_or_else(|| {
             warn!(ino, "getattr on unknown inode");
             GetAttrError::InodeNotFound
         })
     }
 
     #[instrument(name = "RepoFs::readdir", skip(self), fields(repo = %self.repo_name))]
-    async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], ReadDirError> {
+    async fn readdir(&self, ino: Inode) -> Result<Vec<DirEntry>, ReadDirError> {
         debug_assert!(
             self.icache.contains(ino),
             "readdir: inode {ino} not in inode table"
         );
@@ -337,7 +347,18 @@ impl Fs for RepoFs {
             "readdir: resolved directory listing from icache"
         );
 
-        self.icache.evict_zero_rc_children(ino).await;
+        let current_names: HashSet<OsString> = children
+            .iter()
+            .map(|(name, _)| OsString::from(name))
+            .collect();
+        let evicted = self.icache.evict_stale_children(ino, &current_names).await;
+        if !evicted.is_empty() {
+            trace!(
+                ino,
+                evicted_count = evicted.len(),
+                "readdir: evicted stale children"
+            );
+        }
 
         let mut entries = Vec::with_capacity(children.len());
         for (name, kind) in &children {
@@ -367,12 +388,25 @@ impl Fs for RepoFs {
             });
         }
 
-        self.readdir_buf = entries;
-        Ok(&self.readdir_buf)
+        let subdir_inodes: Vec<Inode> = entries
+            .iter()
+            .filter(|e| e.kind == DirEntryType::Directory)
+            .map(|e| e.ino)
+            .collect();
+        if !subdir_inodes.is_empty() {
+            trace!(
+                ino,
+                subdir_count = subdir_inodes.len(),
+                "readdir: prefetching subdirectory readdirs"
+            );
+            self.icache.spawn_prefetch_readdir(subdir_inodes);
+        }
+
+        Ok(entries)
     }
 
     #[instrument(name = "RepoFs::open", skip(self), fields(repo = %self.repo_name))]
-    async fn open(&mut self, ino: Inode, _flags: OpenFlags) -> Result<OpenFile, OpenError> {
+    async fn open(&self, ino: Inode, _flags: OpenFlags) -> Result<OpenFile, OpenError> {
         if !self.icache.contains(ino) {
             warn!(ino, "open on unknown inode");
             return Err(OpenError::InodeNotFound);
         }
@@ -385,7 +419,7 @@ impl Fs for RepoFs {
             "open: inode {ino} has non-file cached attr"
         );
         let fh = self.file_table.allocate();
-        self.open_files.insert(fh, ino);
+        let _ = self.open_files.insert_async(fh, ino).await;
         trace!(ino, fh, "assigned file handle");
         Ok(OpenFile {
             handle: fh,
@@ -395,7 +429,7 @@ impl Fs for RepoFs {
 
     #[instrument(name = "RepoFs::read", skip(self), fields(repo = %self.repo_name))]
     async fn read(
-        &mut self,
+        &self,
         ino: Inode,
         fh: FileHandle,
         offset: u64,
@@ -403,10 +437,14 @@ impl Fs for RepoFs {
         _flags: OpenFlags,
         _lock_owner: Option<LockOwner>,
     ) -> Result<Bytes, ReadError> {
-        let &file_ino = self.open_files.get(&fh).ok_or_else(|| {
-            warn!(fh, "read on unknown file handle");
-            ReadError::FileNotOpen
-        })?;
+        let file_ino = self
+            .open_files
+            .read_async(&fh, |_, &file_ino| file_ino)
+            .await
+            .ok_or_else(|| {
+                warn!(fh, "read on unknown file handle");
+                ReadError::FileNotOpen
+            })?;
         debug_assert!(
            file_ino == ino,
            "read: file handle {fh} maps to inode {file_ino}, but caller passed inode {ino}"
@@ -457,13 +495,13 @@ impl Fs for RepoFs {
 
     #[instrument(name = "RepoFs::release", skip(self), fields(repo = %self.repo_name))]
     async fn release(
-        &mut self,
+        &self,
         ino: Inode,
         fh: FileHandle,
         _flags: OpenFlags,
         _flush: bool,
     ) -> Result<(), ReleaseError> {
-        let released_ino = self.open_files.remove(&fh).ok_or_else(|| {
+        let (_, released_ino) = self.open_files.remove_async(&fh).await.ok_or_else(|| {
             warn!(fh, "release on unknown file handle");
             ReleaseError::FileNotOpen
         })?;
@@ -476,7 +514,7 @@ impl Fs for RepoFs {
     }
 
     #[instrument(name = "RepoFs::forget", skip(self), fields(repo = %self.repo_name))]
-    async fn forget(&mut self, ino: Inode, nlookups: u64) {
+    async fn forget(&self, ino: Inode, nlookups: u64) {
         debug_assert!(
             self.icache.contains(ino),
             "forget: inode {ino} not in inode table"
         );
@@ -485,7 +523,7 @@ impl Fs for RepoFs {
         self.icache.forget(ino, nlookups).await;
     }
 
-    async fn statfs(&mut self) -> Result<FilesystemStats, libc::c_int> {
+    async fn statfs(&self) -> Result<FilesystemStats, libc::c_int> {
         Ok(self.icache.statfs())
     }
 }
diff --git a/src/fs/trait.rs b/src/fs/trait.rs
index f4d9852..1758e53 100644
--- a/src/fs/trait.rs
+++ b/src/fs/trait.rs
@@ -320,36 +320,40 @@ pub struct FilesystemStats {
 }
 
 #[async_trait]
-pub trait Fs {
-    type LookupError: std::error::Error;
-    type GetAttrError: std::error::Error;
-    type OpenError: std::error::Error;
-    type ReadError: std::error::Error;
-    type ReaddirError: std::error::Error;
-    type ReleaseError: std::error::Error;
+pub trait Fs: Send + Sync + 'static {
+    type LookupError: std::error::Error + Send + 'static;
+    type GetAttrError: std::error::Error + Send + 'static;
+    type OpenError: std::error::Error + Send + 'static;
+    type ReadError: std::error::Error + Send + 'static;
+    type ReaddirError: std::error::Error + Send + 'static;
+    type ReleaseError: std::error::Error + Send + 'static;
+
+    /// Called once after mount, before any FUSE operations.
+    /// Override to perform startup work like prefetching.
+    async fn init(&self) {}
 
     /// For each lookup call made by the kernel, it expects the icache to be updated with the
     /// returned `FileAttr`.
-    async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result<FileAttr, Self::LookupError>;
+    async fn lookup(&self, parent: Inode, name: &OsStr) -> Result<FileAttr, Self::LookupError>;
 
     /// Can be called in two contexts -- the file is not open (in which case `fh` is `None`),
     /// or the file is open (in which case `fh` is `Some`).
     async fn getattr(
-        &mut self,
+        &self,
         ino: Inode,
         fh: Option<FileHandle>,
     ) -> Result<FileAttr, Self::GetAttrError>;
 
     /// Read the contents of a directory.
-    async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], Self::ReaddirError>;
+    async fn readdir(&self, ino: Inode) -> Result<Vec<DirEntry>, Self::ReaddirError>;
 
     /// Open a file for reading.
-    async fn open(&mut self, ino: Inode, flags: OpenFlags) -> Result<OpenFile, Self::OpenError>;
+    async fn open(&self, ino: Inode, flags: OpenFlags) -> Result<OpenFile, Self::OpenError>;
 
     /// Read data from an open file.
     #[expect(clippy::too_many_arguments, reason = "mirrors fuser read API")]
     async fn read(
-        &mut self,
+        &self,
         ino: Inode,
         fh: FileHandle,
         offset: u64,
@@ -360,7 +364,7 @@ pub trait Fs {
 
     /// Called when the kernel closes a file handle.
     async fn release(
-        &mut self,
+        &self,
         ino: Inode,
         fh: FileHandle,
         flags: OpenFlags,
@@ -368,8 +372,8 @@ pub trait Fs {
     ) -> Result<(), Self::ReleaseError>;
 
     /// Called when the kernel is done with an inode.
-    async fn forget(&mut self, ino: Inode, nlookups: u64);
+    async fn forget(&self, ino: Inode, nlookups: u64);
 
     /// Get filesystem statistics.
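     /// The implementations in this crate are currently infallible and always
     /// return `Ok`.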
-    async fn statfs(&mut self) -> Result<FilesystemStats, libc::c_int>;
+    async fn statfs(&self) -> Result<FilesystemStats, libc::c_int>;
 }
diff --git a/src/trc.rs b/src/trc.rs
index a504362..a8abbc5 100644
--- a/src/trc.rs
+++ b/src/trc.rs
@@ -139,7 +139,7 @@ impl Trc {
         match self.mode {
             TrcMode::丑 { .. } => {
-                let indicatif_layer = IndicatifLayer::new();
+                let indicatif_layer = IndicatifLayer::new().with_max_progress_bars(20, None);
                 let pretty_with_indicatif: BoxedFmtLayer = Box::new(
                     tracing_subscriber::fmt::layer()
                         .with_ansi(use_ansi)