diff --git a/.github/workflows/promote-to-latest.yml b/.github/workflows/promote-to-latest.yml
index 8cb4d77..223e89b 100644
--- a/.github/workflows/promote-to-latest.yml
+++ b/.github/workflows/promote-to-latest.yml
@@ -133,60 +133,16 @@ jobs:
         with:
           persist-credentials: false
 
-      - name: Install Homebrew
-        uses: Homebrew/actions/setup-homebrew@master
-
-      - name: Download macOS universal tarball and compute SHA256
-        id: sha
-        env:
-          GH_TOKEN: ${{ github.token }}
-        run: |
-          TAG="v${{ needs.promote.outputs.base_version }}"
-          curl -fSL -o git-fs-macos-universal.tar.gz \
-            "https://github.com/${{ github.repository }}/releases/download/${TAG}/git-fs-macos-universal.tar.gz"
-          SHA=$(sha256sum git-fs-macos-universal.tar.gz | cut -d' ' -f1)
-          echo "sha256=${SHA}" >> "$GITHUB_OUTPUT"
-
-      - name: Clone Homebrew tap
-        uses: actions/checkout@v4
+      - uses: mesa-dot-dev/homebrew-tap-action@main
         with:
-          repository: mesa-dot-dev/homebrew-tap
-          token: ${{ secrets.HOMEBREW_TAP_TOKEN }}
-          path: tap
-          ref: staged
-
-      - name: Compute Homebrew class name
-        id: class
-        env:
-          VERSION: ${{ needs.promote.outputs.base_version }}
-        run: |
-          CLASS_NAME=$(brew ruby -e "puts Formulary.class_s('git-fs@${VERSION}')")
-          echo "class_name=${CLASS_NAME}" >> "$GITHUB_OUTPUT"
-
-      - name: Generate formulae
-        env:
-          VERSION: ${{ needs.promote.outputs.base_version }}
-          SHA256: ${{ steps.sha.outputs.sha256 }}
-          CLASS_NAME: ${{ steps.class.outputs.class_name }}
-        run: |
-          if [ ! -d tap ]; then
-            echo "Error: tap directory does not exist — clone may have failed"
-            exit 1
-          fi
-          mkdir -p tap/Formula
-
-          # Latest formula (class name is always GitFs)
-          CLASS_NAME=GitFs envsubst '${CLASS_NAME} ${VERSION} ${SHA256}' < formula.rb > tap/Formula/git-fs.rb
-
-          # Versioned formula (e.g., Formula/git-fs@0.1.1-alpha.1.rb)
-          envsubst '${CLASS_NAME} ${VERSION} ${SHA256}' < formula.rb > "tap/Formula/git-fs@${VERSION}.rb"
-
-      - name: Commit and push formulae
-        working-directory: tap
-        run: |
-          git config user.name "mesa-ci[bot]"
-          git config user.email "ci-bot@mesa.dev"
-          git add Formula/
-          git diff --cached --quiet && echo "No changes to commit" && exit 0
-          git commit -m "git-fs ${{ needs.promote.outputs.base_version }}"
-          git push origin staged
+          tap: mesa-dot-dev/homebrew-tap
+          tap-token: ${{ secrets.HOMEBREW_TAP_TOKEN }}
+          tap-branch: staged
+          template: formula.rb
+          latest-path: Formula/git-fs.rb
+          versioned-path: "Formula/git-fs@${VERSION}.rb"
+          version: ${{ needs.promote.outputs.base_version }}
+          url: "https://github.com/${{ github.repository }}/releases/download/v${{ needs.promote.outputs.base_version }}/git-fs-macos-universal.tar.gz"
+          commit-message: "git-fs ${VERSION}"
+          committer-name: "mesa-ci[bot]"
+          committer-email: ci-bot@mesa.dev
diff --git a/formula.rb b/formula.rb
index 6610200..e93bb79 100644
--- a/formula.rb
+++ b/formula.rb
@@ -1,4 +1,4 @@
-class ${CLASS_NAME} < Formula
+class ${FORMULA_CLASS_NAME} < Formula
   desc "Mount Mesa, GitHub and GitLab repositories as local filesystems via FUSE"
   homepage "https://github.com/mesa-dot-dev/git-fs"
   url "https://github.com/mesa-dot-dev/git-fs/releases/download/v${VERSION}/git-fs-macos-universal.tar.gz"
diff --git a/src/fs/cache_tracker.rs b/src/fs/cache_tracker.rs
new file mode 100644
index 0000000..d42381b
--- /dev/null
+++ b/src/fs/cache_tracker.rs
@@ -0,0 +1,242 @@
+//! Concurrent cache size and staleness tracker.
+//!
+//! [`CacheTracker`] records which paths are cached, their estimated sizes, and
+//! when they were inserted. It is safe to use from many async tasks
+//! simultaneously because every method takes `&self`.
+#![allow(dead_code, reason = "consumed by WriteThroughFs in Tasks 4-6")]
+
+use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, Instant};
+
+use scc::HashMap as ConcurrentHashMap;
+
+struct CacheEntry {
+    inserted_at: Instant,
+    estimated_size: u64,
+}
+
+/// Tracks cached file paths, their estimated sizes, and insertion times.
+///
+/// All methods take `&self`, making the tracker safe for concurrent use from
+/// multiple async tasks without external synchronisation.
+pub struct CacheTracker {
+    entries: ConcurrentHashMap<PathBuf, CacheEntry>,
+    total_bytes: AtomicU64,
+}
+
+impl CacheTracker {
+    /// Create a new, empty tracker.
+    #[must_use]
+    pub fn new() -> Self {
+        Self {
+            entries: ConcurrentHashMap::new(),
+            total_bytes: AtomicU64::new(0),
+        }
+    }
+
+    /// Record a cached path with its estimated size.
+    ///
+    /// If the path was already tracked, its entry is replaced and
+    /// `total_bytes` is adjusted to reflect the new size.
+    pub async fn track(&self, path: PathBuf, size: u64) {
+        use scc::hash_map::Entry;
+
+        match self.entries.entry_async(path).await {
+            Entry::Vacant(vac) => {
+                vac.insert_entry(CacheEntry {
+                    inserted_at: Instant::now(),
+                    estimated_size: size,
+                });
+                self.total_bytes.fetch_add(size, Ordering::Relaxed);
+            }
+            Entry::Occupied(mut occ) => {
+                let old_size = occ.get().estimated_size;
+                occ.get_mut().estimated_size = size;
+                occ.get_mut().inserted_at = Instant::now();
+
+                // Adjust total: subtract old, add new.
+                if old_size <= size {
+                    self.total_bytes
+                        .fetch_add(size - old_size, Ordering::Relaxed);
+                } else {
+                    self.total_bytes
+                        .fetch_sub(old_size - size, Ordering::Relaxed);
+                }
+            }
+        }
+    }
+
+    /// Remove a path from tracking, reducing `total_bytes` accordingly.
+    ///
+    /// If the path is not tracked this is a no-op.
+    pub async fn untrack(&self, path: &Path) {
+        if let Some((_, entry)) = self.entries.remove_async(path).await {
+            self.total_bytes
+                .fetch_sub(entry.estimated_size, Ordering::Relaxed);
+        }
+    }
+
+    /// Total estimated bytes across all tracked entries.
+    #[must_use]
+    pub fn estimated_size(&self) -> u64 {
+        self.total_bytes.load(Ordering::Relaxed)
+    }
+
+    /// Number of tracked entries.
+    #[must_use]
+    pub fn entry_count(&self) -> usize {
+        self.entries.len()
+    }
+
+    /// Collect paths whose cache entry is older than `max_age`.
+    pub async fn stale_entries(&self, max_age: Duration) -> Vec<PathBuf> {
+        let Some(cutoff) = Instant::now().checked_sub(max_age) else {
+            // max_age exceeds process uptime -- nothing can be that old.
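+            // (`Instant::checked_sub` returns `None` on underflow rather than
+            // panicking like the bare `-` operator would.)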
+            return Vec::new();
+        };
+        let mut stale = Vec::new();
+
+        self.entries
+            .iter_async(|key, value| {
+                if value.inserted_at < cutoff {
+                    stale.push(key.clone());
+                }
+                true
+            })
+            .await;
+
+        stale
+    }
+}
+
+impl Default for CacheTracker {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn new_tracker_has_zero_size() {
+        let tracker = CacheTracker::new();
+        assert_eq!(tracker.estimated_size(), 0, "fresh tracker should be empty");
+        assert_eq!(
+            tracker.entry_count(),
+            0,
+            "fresh tracker should have no entries"
+        );
+    }
+
+    #[tokio::test]
+    async fn track_increases_estimated_size() {
+        let tracker = CacheTracker::new();
+        tracker.track(PathBuf::from("/a"), 100).await;
+        tracker.track(PathBuf::from("/b"), 250).await;
+
+        assert_eq!(
+            tracker.estimated_size(),
+            350,
+            "total should be sum of tracked sizes"
+        );
+        assert_eq!(tracker.entry_count(), 2, "should have two entries");
+    }
+
+    #[tokio::test]
+    async fn track_same_path_replaces_size() {
+        let tracker = CacheTracker::new();
+        tracker.track(PathBuf::from("/a"), 100).await;
+        tracker.track(PathBuf::from("/a"), 60).await;
+
+        assert_eq!(
+            tracker.estimated_size(),
+            60,
+            "size should reflect replacement"
+        );
+        assert_eq!(
+            tracker.entry_count(),
+            1,
+            "duplicate path should not create second entry"
+        );
+    }
+
+    #[tokio::test]
+    async fn untrack_decreases_size() {
+        let tracker = CacheTracker::new();
+        tracker.track(PathBuf::from("/a"), 100).await;
+        tracker.track(PathBuf::from("/b"), 200).await;
+        tracker.untrack(Path::new("/a")).await;
+
+        assert_eq!(tracker.estimated_size(), 200, "only /b should remain");
+        assert_eq!(
+            tracker.entry_count(),
+            1,
+            "should have one entry after untrack"
+        );
+    }
+
+    #[tokio::test]
+    async fn untrack_missing_path_is_noop() {
+        let tracker = CacheTracker::new();
+        tracker.track(PathBuf::from("/a"), 100).await;
+        tracker.untrack(Path::new("/nonexistent")).await;
+
+        assert_eq!(tracker.estimated_size(), 100, "size should be unchanged");
+        assert_eq!(tracker.entry_count(), 1, "entry count should be unchanged");
+    }
+
+    #[tokio::test]
+    async fn stale_entries_returns_old_paths() {
+        let tracker = CacheTracker::new();
+        tracker.track(PathBuf::from("/old"), 50).await;
+
+        // Wait long enough for the entry to become stale.
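+        // (The 50 ms sleep comfortably exceeds the 25 ms staleness threshold
+        // used below, keeping the test stable under load.)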
+        tokio::time::sleep(Duration::from_millis(50)).await;
+
+        tracker.track(PathBuf::from("/fresh"), 50).await;
+
+        let stale = tracker.stale_entries(Duration::from_millis(25)).await;
+        assert!(
+            stale.contains(&PathBuf::from("/old")),
+            "/old should be stale"
+        );
+        assert!(
+            !stale.contains(&PathBuf::from("/fresh")),
+            "/fresh should not be stale"
+        );
+    }
+
+    #[tokio::test]
+    async fn stale_entries_returns_empty_when_all_fresh() {
+        let tracker = CacheTracker::new();
+        tracker.track(PathBuf::from("/a"), 10).await;
+        tracker.track(PathBuf::from("/b"), 20).await;
+
+        let stale = tracker.stale_entries(Duration::from_secs(60)).await;
+        assert!(
+            stale.is_empty(),
+            "no entries should be stale with a 60s threshold"
+        );
+    }
+
+    #[tokio::test]
+    async fn entry_count_tracks_insertions_and_removals() {
+        let tracker = CacheTracker::new();
+        assert_eq!(tracker.entry_count(), 0, "starts at zero");
+
+        tracker.track(PathBuf::from("/a"), 1).await;
+        assert_eq!(tracker.entry_count(), 1, "one after first insert");
+
+        tracker.track(PathBuf::from("/b"), 2).await;
+        assert_eq!(tracker.entry_count(), 2, "two after second insert");
+
+        tracker.untrack(Path::new("/a")).await;
+        assert_eq!(tracker.entry_count(), 1, "back to one after removal");
+
+        tracker.untrack(Path::new("/b")).await;
+        assert_eq!(tracker.entry_count(), 0, "zero after removing all");
+    }
+}
diff --git a/src/fs/local.rs b/src/fs/local.rs
index 73e41de..c621a0e 100644
--- a/src/fs/local.rs
+++ b/src/fs/local.rs
@@ -1,17 +1,22 @@
 //! An implementation of a filesystem that directly overlays the host filesystem.
+#![allow(dead_code, reason = "LocalFs consumed by WriteThroughFs in Tasks 4-6")]
+
+use std::collections::HashMap;
+use std::ffi::OsStr;
+use std::future::Future;
+use std::path::{Path, PathBuf};
+
+use async_trait::async_trait;
 use bytes::Bytes;
 use nix::sys::statvfs::statvfs;
-use std::{collections::HashMap, path::PathBuf};
 use thiserror::Error;
 use tokio::io::{AsyncReadExt as _, AsyncSeekExt as _};
-
-use std::ffi::OsStr;
 use tracing::warn;
 
-use crate::fs::icache::{ICache, IcbLike};
+use crate::fs::icache::{AsyncICache, FileTable, IcbLike, IcbResolver};
 use crate::fs::r#trait::{
-    DirEntry, FileAttr, FileHandle, FileOpenOptions, FilesystemStats, Fs, Inode, LockOwner,
-    OpenFile, OpenFlags,
+    DirEntry, FileAttr, FileHandle, FileOpenOptions, FilesystemStats, Fs, FsCacheProvider, Inode,
+    LockOwner, OpenFile, OpenFlags,
 };
 
 #[derive(Debug, Error)]
@@ -20,7 +25,6 @@ pub enum LookupError {
     InodeNotFound,
     #[error("io error: {0}")]
     Io(#[from] std::io::Error),
-
     #[error("invalid file type")]
     InvalidFileType,
 }
@@ -39,10 +43,8 @@ impl From<LookupError> for i32 {
 pub enum GetAttrError {
     #[error("inode not found")]
     InodeNotFound,
-
     #[error("io error: {0}")]
     Io(#[from] std::io::Error),
-
     #[error("invalid file type")]
     InvalidFileType,
 }
@@ -61,7 +63,6 @@ pub enum OpenError {
     InodeNotFound,
-
     #[error("io error: {0}")]
     Io(#[from] std::io::Error),
 }
@@ -79,10 +80,8 @@ pub enum ReadError {
     InodeNotFound,
-
     #[error("file not open")]
     FileNotOpen,
-
     #[error("io error: {0}")]
     Io(#[from] std::io::Error),
 }
@@ -115,10 +114,8 @@ pub enum ReadDirError {
     InodeNotFound,
-
     #[error("io error: {0}")]
     Io(#[from] std::io::Error),
-
     #[error("invalid file type")]
     InvalidFileType,
 }
@@ -133,10 +130,11 @@ impl From<ReadDirError> for i32 {
     }
 }
 
+#[derive(Clone)]
 struct InodeControlBlock {
-    pub rc: u64,
-    pub path: PathBuf,
-    pub children: Option<Vec<DirEntry>>,
+    rc: u64,
+    path: PathBuf,
+    children: Option<Vec<DirEntry>>,
 }
 
 impl IcbLike for InodeControlBlock {
@@ -155,34 +153,61 @@ impl IcbLike for InodeControlBlock {
     fn rc_mut(&mut self) -> &mut u64 {
         &mut self.rc
     }
+
+    fn needs_resolve(&self) -> bool {
+        false
+    }
+}
+
+struct LocalResolver;
+
+impl IcbResolver for LocalResolver {
+    type Icb = InodeControlBlock;
+    type Error = std::convert::Infallible;
+
+    #[expect(
+        clippy::manual_async_fn,
+        reason = "must match IcbResolver trait signature"
+    )]
+    fn resolve(
+        &self,
+        _ino: Inode,
+        _stub: Option<Self::Icb>,
+        _cache: &AsyncICache<Self>,
+    ) -> impl Future<Output = Result<Self::Icb, Self::Error>> + Send {
+        async { unreachable!("LocalResolver should never be called") }
+    }
+}
 
 pub struct LocalFs {
-    icache: ICache<InodeControlBlock>,
+    root_path: PathBuf,
+    icache: AsyncICache<LocalResolver>,
+    file_table: FileTable,
     open_files: HashMap<FileHandle, tokio::fs::File>,
+    readdir_buf: Vec<DirEntry>,
 }
 
 impl LocalFs {
-    #[expect(dead_code, reason = "alternative filesystem implementation")]
+    #[must_use]
     pub fn new(abs_path: impl Into<PathBuf>) -> Self {
+        let root_path: PathBuf = abs_path.into();
         Self {
-            icache: ICache::new(1, abs_path),
+            icache: AsyncICache::new(LocalResolver, 1, &root_path),
+            root_path,
+            file_table: FileTable::new(),
             open_files: HashMap::new(),
+            readdir_buf: Vec::new(),
         }
     }
 
-    fn abspath(&self) -> &PathBuf {
-        &self
-            .icache
-            .get_icb(1)
-            .unwrap_or_else(|| unreachable!("root inode 1 must always exist in inode_table"))
-            .path
+    pub fn root_path(&self) -> &Path {
+        &self.root_path
     }
 
     async fn parse_tokio_dirent(
         dir_entry: &tokio::fs::DirEntry,
     ) -> Result<DirEntry, std::io::Error> {
-        return Ok(DirEntry {
+        Ok(DirEntry {
             ino: dir_entry.ino(),
             name: dir_entry.file_name(),
             kind: dir_entry.file_type().await?.try_into().map_err(|()| {
@@ -191,11 +216,11 @@ impl LocalFs {
                     "invalid file type in directory entry",
                 )
             })?,
-        });
+        })
     }
 }
 
-#[async_trait::async_trait]
+#[async_trait]
 impl Fs for LocalFs {
     type LookupError = LookupError;
     type GetAttrError = GetAttrError;
@@ -209,7 +234,8 @@ impl Fs for LocalFs {
             self.icache.contains(parent),
             "parent inode {parent} not in inode_table"
         );
-        let parent_icb = self.icache.get_icb(parent).ok_or_else(|| {
+        let parent_path = self.icache.get_icb(parent, |icb| icb.path.clone()).await;
+        let parent_path = parent_path.ok_or_else(|| {
            warn!(
                "Lookup called on unknown parent inode {}. This is a programming bug",
                parent
@@ -217,7 +243,7 @@
            LookupError::InodeNotFound
        })?;
 
-        let child_path = parent_icb.path.join(name);
+        let child_path = parent_path.join(name);
         let meta = tokio::fs::metadata(&child_path)
             .await
             .map_err(LookupError::Io)?;
@@ -226,14 +252,19 @@
         debug_assert!(file_attr.is_ok(), "FileAttr conversion failed unexpectedly");
         let file_attr = file_attr?;
 
-        let icb = self
-            .icache
-            .entry_or_insert_icb(file_attr.common().ino, || InodeControlBlock {
-                rc: 0,
-                path: child_path,
-                children: None,
-            });
-        *icb.rc_mut() += 1;
+        self.icache
+            .entry_or_insert_icb(
+                file_attr.common().ino,
+                || InodeControlBlock {
+                    rc: 0,
+                    path: child_path,
+                    children: None,
+                },
+                |icb| {
+                    *icb.rc_mut() += 1;
+                },
+            )
+            .await;
 
         Ok(file_attr)
     }
@@ -244,7 +275,6 @@
         fh: Option<FileHandle>,
     ) -> Result<FileAttr, Self::GetAttrError> {
         if let Some(fh) = fh {
-            // The file was already opened, we can just call fstat.
             debug_assert!(
                 self.open_files.contains_key(&fh),
                 "file handle {fh} not in open_files"
@@ -263,9 +293,9 @@
 
             Ok(file_attr?)
         } else {
-            // No open path, so we have to do a painful stat on the path.
             debug_assert!(self.icache.contains(ino), "inode {ino} not in inode_table");
-            let icb = self.icache.get_icb(ino).ok_or_else(|| {
+            let path = self.icache.get_icb(ino, |icb| icb.path.clone()).await;
+            let path = path.ok_or_else(|| {
                 warn!(
                     "GetAttr called on unknown inode {}. This is a programming bug",
                     ino
@@ -273,9 +303,7 @@
                 GetAttrError::InodeNotFound
             })?;
 
-            let meta = tokio::fs::metadata(&icb.path)
-                .await
-                .map_err(GetAttrError::Io)?;
+            let meta = tokio::fs::metadata(&path).await.map_err(GetAttrError::Io)?;
 
             let file_attr = FileAttr::try_from(meta).map_err(|()| GetAttrError::InvalidFileType);
             debug_assert!(file_attr.is_ok(), "FileAttr conversion failed unexpectedly");
@@ -286,7 +314,8 @@ impl Fs for LocalFs {
     async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], ReadDirError> {
         debug_assert!(self.icache.contains(ino), "inode {ino} not in inode_table");
-        let inode_cb = self.icache.get_icb(ino).ok_or_else(|| {
+        let path = self.icache.get_icb(ino, |icb| icb.path.clone()).await;
+        let path = path.ok_or_else(|| {
             warn!(
                 parent = ino,
                 "Readdir of unknown parent inode. Programming bug"
@@ -294,35 +323,21 @@
             ReadDirError::InodeNotFound
         })?;
 
-        let mut read_dir = tokio::fs::read_dir(&inode_cb.path)
-            .await
-            .map_err(ReadDirError::Io)?;
-
-        // Note that we HAVE to re-read all entries here, since there's really no way for us to
-        // know whether another process has modified the underlying directory, without our consent.
-        //
-        // TODO(markovejnovic): If we can guarantee that only our process has access to the
-        // underlying directory, we can avoid re-loading the entries every time.
-        //
-        // Two mechanisms appear to exist: namespace mount and/or file permissions.
-        //
-        // However, both of these mechanisms take time to develop and we don't have time.
+        let mut read_dir = tokio::fs::read_dir(&path).await.map_err(ReadDirError::Io)?;
+
         let mut entries: Vec<DirEntry> = Vec::new();
         while let Some(dir_entry) = read_dir.next_entry().await.map_err(ReadDirError::Io)? {
             entries.push(Self::parse_tokio_dirent(&dir_entry).await?);
         }
 
-        let inode_cb = self.icache.get_icb_mut(ino).ok_or_else(|| {
-            warn!(parent = ino, "inode disappeared. TOCTOU programming bug");
-            ReadDirError::InodeNotFound
-        })?;
-
-        Ok(inode_cb.children.insert(entries))
+        self.readdir_buf = entries;
+        Ok(&self.readdir_buf)
     }
 
     async fn open(&mut self, ino: Inode, flags: OpenFlags) -> Result<OpenFile, OpenError> {
         debug_assert!(self.icache.contains(ino), "inode {ino} not in inode_table");
-        let icb = self.icache.get_icb(ino).ok_or_else(|| {
+        let path = self.icache.get_icb(ino, |icb| icb.path.clone()).await;
+        let path = path.ok_or_else(|| {
             warn!(
                 "Open called on unknown inode {}. This is a programming bug",
                 ino
@@ -330,24 +345,21 @@
             );
             OpenError::InodeNotFound
         })?;
 
-        // TODO(markovejnovic): Not all flags are supported here. We could do better.
         let file = tokio::fs::OpenOptions::new()
             .read(true)
             .write(flags.contains(OpenFlags::RDWR) || flags.contains(OpenFlags::WRONLY))
             .append(flags.contains(OpenFlags::APPEND))
             .truncate(flags.contains(OpenFlags::TRUNC))
             .create(flags.contains(OpenFlags::CREAT))
-            .open(&icb.path)
+            .open(&path)
             .await
             .map_err(OpenError::Io)?;
 
-        // Generate a new file handle.
-        let fh = self.icache.allocate_fh();
+        let fh = self.file_table.allocate();
         self.open_files.insert(fh, file);
 
         Ok(OpenFile {
             handle: fh,
-            // TODO(markovejnovic): Might be interesting to set some of these options.
             options: FileOpenOptions::empty(),
         })
     }
@@ -361,7 +373,6 @@ impl Fs for LocalFs {
         _flags: OpenFlags,
         _lock_owner: Option<LockOwner>,
     ) -> Result<Bytes, ReadError> {
-        // TODO(markovejnovic): Respect flags and lock_owner.
         debug_assert!(self.icache.contains(ino), "inode {ino} not in inode_table");
         debug_assert!(
             self.open_files.contains_key(&fh),
             "file handle {fh} not in open_files"
@@ -406,11 +417,11 @@ impl Fs for LocalFs {
 
     async fn forget(&mut self, ino: Inode, nlookups: u64) {
         debug_assert!(self.icache.contains(ino), "inode {ino} not in inode_table");
-        self.icache.forget(ino, nlookups);
+        self.icache.forget(ino, nlookups).await;
     }
 
     async fn statfs(&mut self) -> Result<FilesystemStats, std::io::Error> {
-        let stat = statvfs(self.abspath().as_path())?;
+        let stat = statvfs(self.root_path.as_path())?;
 
         Ok(FilesystemStats {
             block_size: stat.block_size().try_into().map_err(|_| {
@@ -420,20 +431,22 @@
             fragment_size: stat.fragment_size(),
-            #[allow(clippy::allow_attributes)]
+            #[expect(clippy::allow_attributes, reason = "platform-dependent lint")]
             #[allow(clippy::useless_conversion)]
             total_blocks: u64::from(stat.blocks()),
-            #[allow(clippy::allow_attributes)]
+            #[expect(clippy::allow_attributes, reason = "platform-dependent lint")]
             #[allow(clippy::useless_conversion)]
             free_blocks: u64::from(stat.blocks_free()),
-            #[allow(clippy::allow_attributes)]
+            #[expect(clippy::allow_attributes, reason = "platform-dependent lint")]
             #[allow(clippy::useless_conversion)]
             available_blocks: u64::from(stat.blocks_available()),
+            #[expect(clippy::allow_attributes, reason = "platform-dependent lint")]
+            #[allow(clippy::cast_possible_truncation)]
             total_inodes: self.icache.inode_count() as u64,
-            #[allow(clippy::allow_attributes)]
+            #[expect(clippy::allow_attributes, reason = "platform-dependent lint")]
             #[allow(clippy::useless_conversion)]
             free_inodes: u64::from(stat.files_free()),
-            #[allow(clippy::allow_attributes)]
+            #[expect(clippy::allow_attributes, reason = "platform-dependent lint")]
             #[allow(clippy::useless_conversion)]
             available_inodes: u64::from(stat.files_available()),
             filesystem_id: 0,
@@ -446,3 +459,156 @@ impl Fs for LocalFs {
         })
     }
 }
+
+#[async_trait]
+impl FsCacheProvider for LocalFs {
+    type CacheError = std::io::Error;
+
+    async fn populate_file(&self, path: &Path, content: &[u8]) -> Result<(), Self::CacheError> {
+        let full_path = self.root_path.join(path);
+        if let Some(parent) = full_path.parent() {
+            tokio::fs::create_dir_all(parent).await?;
+        }
+        tokio::fs::write(&full_path, content).await
+    }
+
+    async fn read_cached_file(
+        &self,
+        path: &Path,
+        offset: u64,
+        size: u32,
+    ) -> Result<Option<Bytes>, Self::CacheError> {
+        let full_path = self.root_path.join(path);
+        let mut file = match tokio::fs::File::open(&full_path).await {
+            Ok(f) => f,
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
+            Err(e) => return Err(e),
+        };
+        file.seek(std::io::SeekFrom::Start(offset)).await?;
+        let mut buf = vec![0u8; size as usize];
+        let n = file.read(&mut buf).await?;
+        buf.truncate(n);
+        Ok(Some(Bytes::from(buf)))
+    }
+
+    async fn is_cached(&self, path: &Path) -> bool {
+        let full_path = self.root_path.join(path);
+        tokio::fs::try_exists(&full_path).await.unwrap_or(false)
+    }
+
+    async fn invalidate(&self, path: &Path) -> Result<(), Self::CacheError> {
+        let full_path = self.root_path.join(path);
+        match tokio::fs::metadata(&full_path).await {
+            Ok(meta) => {
+                if meta.is_dir() {
+                    tokio::fs::remove_dir_all(&full_path).await
+                } else {
+                    tokio::fs::remove_file(&full_path).await
+                }
+            }
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
+            Err(e) => Err(e),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn temp_dir() -> PathBuf {
+        let dir = std::env::temp_dir().join(format!("localfs-test-{}", rand::random::<u64>()));
+        std::fs::create_dir_all(&dir).unwrap();
+        dir
+    }
+
+    #[tokio::test]
+    async fn populate_file_creates_file_with_content() {
+        let root = temp_dir();
+        let fs = LocalFs::new(&root);
+        let path = Path::new("subdir/hello.txt");
+
+        fs.populate_file(path, b"hello world").await.unwrap();
+
+        let full = root.join(path);
+        let content = tokio::fs::read_to_string(&full).await.unwrap();
+        assert_eq!(content, "hello world");
+    }
+
+    #[tokio::test]
+    async fn read_cached_file_returns_content() {
+        let root = temp_dir();
+        let fs = LocalFs::new(&root);
+        let path = Path::new("data.bin");
+
+        let data = b"abcdefghij";
+        fs.populate_file(path, data).await.unwrap();
+
+        let result = fs.read_cached_file(path, 2, 5).await.unwrap();
+        assert_eq!(result, Some(Bytes::from_static(b"cdefg")));
+    }
+
+    #[tokio::test]
+    async fn read_cached_file_returns_none_for_missing() {
+        let root = temp_dir();
+        let fs = LocalFs::new(&root);
+
+        let result = fs
+            .read_cached_file(Path::new("nope.txt"), 0, 100)
+            .await
+            .unwrap();
+        assert_eq!(result, None);
+    }
+
+    #[tokio::test]
+    async fn is_cached_returns_true_for_existing_file() {
+        let root = temp_dir();
+        let fs = LocalFs::new(&root);
+        let path = Path::new("exists.txt");
+
+        fs.populate_file(path, b"yes").await.unwrap();
+
+        assert!(fs.is_cached(path).await);
+        assert!(!fs.is_cached(Path::new("nope.txt")).await);
+    }
+
+    #[tokio::test]
+    async fn invalidate_removes_file() {
+        let root = temp_dir();
+        let fs = LocalFs::new(&root);
+        let path = Path::new("remove_me.txt");
+
+        fs.populate_file(path, b"gone").await.unwrap();
+        assert!(fs.is_cached(path).await);
+
+        fs.invalidate(path).await.unwrap();
+        assert!(!fs.is_cached(path).await);
+    }
+
+    #[tokio::test]
+    async fn invalidate_removes_directory_tree() {
+        let root = temp_dir();
+        let fs = LocalFs::new(&root);
+
+        fs.populate_file(Path::new("dir/a.txt"), b"a")
+            .await
+            .unwrap();
+        fs.populate_file(Path::new("dir/sub/b.txt"), b"b")
+            .await
+            .unwrap();
+
+        assert!(fs.is_cached(Path::new("dir")).await);
+
+        fs.invalidate(Path::new("dir")).await.unwrap();
+        assert!(!fs.is_cached(Path::new("dir")).await);
+    }
+
+    #[tokio::test]
+    async fn invalidate_missing_path_is_ok() {
+        let root = temp_dir();
+        let fs = LocalFs::new(&root);
+
+        let result = fs.invalidate(Path::new("nonexistent")).await;
+        assert!(result.is_ok());
+    }
+}
diff --git a/src/fs/mod.rs b/src/fs/mod.rs
index c68cdee..80018c6 100644
--- a/src/fs/mod.rs
+++ b/src/fs/mod.rs
@@ -1,6 +1,7 @@
+pub mod cache_tracker;
 pub mod fuser;
 pub mod icache;
-// TODO: re-enable after icache refactoring is complete
-// pub mod local;
+pub mod local;
 pub mod mescloud;
 pub mod r#trait;
+pub mod write_through;
diff --git a/src/fs/trait.rs b/src/fs/trait.rs
index f4d9852..20060b9 100644
--- a/src/fs/trait.rs
+++ b/src/fs/trait.rs
@@ -5,6 +5,7 @@
 use async_trait::async_trait;
 use std::{
     ffi::{OsStr, OsString},
+    path::Path,
     time::{Duration, SystemTime},
 };
 use tracing::error;
@@ -373,3 +374,36 @@
     /// Get filesystem statistics.
     async fn statfs(&mut self) -> Result<FilesystemStats, std::io::Error>;
 }
+
+/// Trait for filesystem implementations that can serve as a local mirror/cache.
+///
+/// All methods take `&self` to allow safe use from background `tokio::spawn` tasks.
+/// Implementations should use internal mutability (e.g., `tokio::fs` operations).
+/// Paths are relative to the cache root directory.
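+///
+/// A minimal usage sketch (hypothetical caller code, not part of this PR's API surface):
+///
+/// ```ignore
+/// let path = Path::new("src/main.rs");
+/// cache.populate_file(path, b"fn main() {}").await?;
+/// assert!(cache.is_cached(path).await);
+/// let head = cache.read_cached_file(path, 0, 4).await?; // Some of the first 4 bytes
+/// cache.invalidate(path).await?;
+/// ```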
+#[cfg_attr(
+    not(test),
+    expect(dead_code, reason = "consumed by WriteThroughFs in Tasks 4-6")
+)]
+#[async_trait]
+pub trait FsCacheProvider: Send + Sync {
+    /// The error type returned by cache operations.
+    type CacheError: std::error::Error + Send + 'static;
+
+    /// Write file content to the cache at the given relative path.
+    /// Creates parent directories as needed.
+    async fn populate_file(&self, path: &Path, content: &[u8]) -> Result<(), Self::CacheError>;
+
+    /// Read cached file content. Returns `None` if the file is not in the cache.
+    async fn read_cached_file(
+        &self,
+        path: &Path,
+        offset: u64,
+        size: u32,
+    ) -> Result<Option<Bytes>, Self::CacheError>;
+
+    /// Check whether a path has cached content.
+    async fn is_cached(&self, path: &Path) -> bool;
+
+    /// Remove cached content at a path (file or directory tree).
+    async fn invalidate(&self, path: &Path) -> Result<(), Self::CacheError>;
+}
diff --git a/src/fs/write_through.rs b/src/fs/write_through.rs
new file mode 100644
index 0000000..bf4db0e
--- /dev/null
+++ b/src/fs/write_through.rs
@@ -0,0 +1,744 @@
+//! Write-through caching filesystem layer.
+//!
+//! `WriteThroughFs` wraps a backend filesystem `B` and mirrors data
+//! to a local cache `F` via the `FsCacheProvider` trait. Reads check the
+//! cache first; on miss the backend is called and the result is cached in
+//! the background via `tokio::spawn`.
+#![allow(dead_code, reason = "consumed by WriteThroughFs integration")]
+
+use std::collections::HashMap;
+use std::ffi::OsStr;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use tracing::{instrument, trace, warn};
+
+use crate::fs::cache_tracker::CacheTracker;
+use crate::fs::r#trait::{
+    DirEntry, FileAttr, FileHandle, FilesystemStats, Fs, FsCacheProvider, Inode, LockOwner,
+    OpenFile, OpenFlags,
+};
+
+/// Write-through caching layer that mirrors backend data to a local cache.
+///
+/// Generic parameters:
+/// - `F`: Local cache filesystem implementing both `FsCacheProvider` and `Fs`
+/// - `B`: Backend (remote) filesystem
+pub struct WriteThroughFs<F, B>
+where
+    F: FsCacheProvider + Fs + Send + Sync + 'static,
+    B: Fs,
+{
+    front: Arc<F>,
+    back: B,
+    tracker: Arc<CacheTracker>,
+    /// Maps backend inodes to their cache-relative paths.
+    inode_paths: HashMap<Inode, PathBuf>,
+    readdir_buf: Vec<DirEntry>,
+    stale_after: Duration,
+    last_eviction: Instant,
+    eviction_interval: Duration,
+}
+
+impl<F, B> WriteThroughFs<F, B>
+where
+    F: FsCacheProvider + Fs + Send + Sync + 'static,
+    B: Fs,
+{
+    /// Create a new write-through caching layer.
+    ///
+    /// - `front`: local cache implementing `FsCacheProvider + Fs`
+    /// - `back`: backend filesystem (e.g., remote)
+    /// - `stale_after`: maximum age before a cached entry is considered stale
+    /// - `eviction_interval`: how often to run stale-entry eviction
+    #[must_use]
+    pub fn new(front: F, back: B, stale_after: Duration, eviction_interval: Duration) -> Self {
+        Self {
+            front: Arc::new(front),
+            back,
+            tracker: Arc::new(CacheTracker::new()),
+            inode_paths: HashMap::new(),
+            readdir_buf: Vec::new(),
+            stale_after,
+            last_eviction: Instant::now(),
+            eviction_interval,
+        }
+    }
+
+    /// Run stale-entry eviction if enough time has elapsed since the last run.
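+    ///
+    /// The sweep itself runs on a spawned task, so filesystem operations never
+    /// block on invalidation; at most one sweep is started per `eviction_interval`.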
+    fn maybe_evict(&mut self) {
+        if self.last_eviction.elapsed() < self.eviction_interval {
+            return;
+        }
+        self.last_eviction = Instant::now();
+
+        let front = Arc::clone(&self.front);
+        let tracker = Arc::clone(&self.tracker);
+        let stale_after = self.stale_after;
+
+        tokio::spawn(async move {
+            let stale = tracker.stale_entries(stale_after).await;
+            for path in stale {
+                if let Err(e) = front.invalidate(&path).await {
+                    warn!(?e, ?path, "failed to invalidate stale cache entry");
+                }
+                tracker.untrack(&path).await;
+            }
+        });
+    }
+}
+
+#[async_trait]
+impl<F, B> Fs for WriteThroughFs<F, B>
+where
+    F: FsCacheProvider + Fs + Send + Sync + 'static,
+    B: Fs + Send,
+    B::LookupError: Send,
+    B::GetAttrError: Send,
+    B::OpenError: Send,
+    B::ReadError: Send,
+    B::ReaddirError: Send,
+    B::ReleaseError: Send,
+{
+    type LookupError = B::LookupError;
+    type GetAttrError = B::GetAttrError;
+    type OpenError = B::OpenError;
+    type ReadError = B::ReadError;
+    type ReaddirError = B::ReaddirError;
+    type ReleaseError = B::ReleaseError;
+
+    #[instrument(name = "WriteThroughFs::lookup", skip(self, name))]
+    async fn lookup(
+        &mut self,
+        parent: Inode,
+        name: &OsStr,
+    ) -> Result<FileAttr, Self::LookupError> {
+        self.maybe_evict();
+
+        let attr = self.back.lookup(parent, name).await?;
+        let ino = attr.common().ino;
+
+        // Record path for cache operations
+        let parent_path = self.inode_paths.get(&parent).cloned().unwrap_or_default();
+        let child_path = parent_path.join(name);
+        self.inode_paths.insert(ino, child_path);
+
+        Ok(attr)
+    }
+
+    #[instrument(name = "WriteThroughFs::getattr", skip(self))]
+    async fn getattr(
+        &mut self,
+        ino: Inode,
+        fh: Option<FileHandle>,
+    ) -> Result<FileAttr, Self::GetAttrError> {
+        self.back.getattr(ino, fh).await
+    }
+
+    #[instrument(name = "WriteThroughFs::readdir", skip(self))]
+    async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], Self::ReaddirError> {
+        self.maybe_evict();
+
+        let entries = self.back.readdir(ino).await?.to_vec();
+
+        // Record paths for all children
+        let parent_path = self.inode_paths.get(&ino).cloned().unwrap_or_default();
+        for entry in &entries {
+            let child_path = parent_path.join(&entry.name);
+            self.inode_paths.insert(entry.ino, child_path);
+        }
+
+        self.readdir_buf = entries;
+        Ok(&self.readdir_buf)
+    }
+
+    #[instrument(name = "WriteThroughFs::open", skip(self))]
+    async fn open(&mut self, ino: Inode, flags: OpenFlags) -> Result<OpenFile, Self::OpenError> {
+        self.back.open(ino, flags).await
+    }
+
+    #[instrument(name = "WriteThroughFs::read", skip(self))]
+    async fn read(
+        &mut self,
+        ino: Inode,
+        fh: FileHandle,
+        offset: u64,
+        size: u32,
+        flags: OpenFlags,
+        lock_owner: Option<LockOwner>,
+    ) -> Result<Bytes, Self::ReadError> {
+        self.maybe_evict();
+
+        // Check cache first
+        if let Some(path) = self.inode_paths.get(&ino)
+            && let Ok(Some(cached)) = self.front.read_cached_file(path, offset, size).await
+        {
+            trace!(ino, ?path, "read: cache hit");
+            return Ok(cached);
+        }
+
+        // Cache miss — read from backend
+        let data = self
+            .back
+            .read(ino, fh, offset, size, flags, lock_owner)
+            .await?;
+
+        // Background: populate cache only for full-file reads (offset == 0) to avoid
+        // caching a partial slice under the file's path.
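+        // (Assumption: the first read at offset 0 covers the whole file. A
+        // shorter first read would cache a truncated copy until staleness
+        // eviction replaces it.)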
+        if offset == 0
+            && let Some(path) = self.inode_paths.get(&ino).cloned()
+        {
+            let front = Arc::clone(&self.front);
+            let tracker = Arc::clone(&self.tracker);
+            let content = data.clone();
+            tokio::spawn(async move {
+                let size = content.len() as u64;
+                if let Err(e) = front.populate_file(&path, &content).await {
+                    warn!(?e, ?path, "background cache populate failed");
+                    return;
+                }
+                tracker.track(path, size).await;
+                trace!("background cache populate complete");
+            });
+        }
+
+        Ok(data)
+    }
+
+    #[instrument(name = "WriteThroughFs::release", skip(self))]
+    async fn release(
+        &mut self,
+        ino: Inode,
+        fh: FileHandle,
+        flags: OpenFlags,
+        flush: bool,
+    ) -> Result<(), Self::ReleaseError> {
+        self.back.release(ino, fh, flags, flush).await
+    }
+
+    #[instrument(name = "WriteThroughFs::forget", skip(self))]
+    async fn forget(&mut self, ino: Inode, nlookups: u64) {
+        // Evict cached data for this inode
+        if let Some(path) = self.inode_paths.remove(&ino) {
+            let front = Arc::clone(&self.front);
+            let tracker = Arc::clone(&self.tracker);
+            tokio::spawn(async move {
+                if let Err(e) = front.invalidate(&path).await {
+                    warn!(?e, ?path, "failed to invalidate on forget");
+                }
+                tracker.untrack(&path).await;
+            });
+        }
+        self.back.forget(ino, nlookups).await;
+    }
+
+    async fn statfs(&mut self) -> Result<FilesystemStats, std::io::Error> {
+        self.back.statfs().await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::fs::r#trait::{CommonFileAttr, DirEntryType, Permissions};
+    use std::path::Path;
+    use std::sync::Mutex;
+    use std::sync::atomic::{AtomicU64, Ordering};
+
+    // -- Mock Backend Fs --
+
+    #[derive(Debug, thiserror::Error)]
+    #[error("mock error")]
+    pub struct MockError;
+
+    impl From<MockError> for i32 {
+        fn from(_: MockError) -> Self {
+            libc::EIO
+        }
+    }
+
+    struct MockBackend {
+        lookup_results: HashMap<(Inode, String), FileAttr>,
+        getattr_results: HashMap<Inode, FileAttr>,
+        readdir_results: HashMap<Inode, Vec<DirEntry>>,
+        read_results: HashMap<Inode, Bytes>,
+        lookup_count: AtomicU64,
+        getattr_count: AtomicU64,
+        _readdir_count: AtomicU64,
+        read_count: AtomicU64,
+        readdir_buf: Vec<DirEntry>,
+    }
+
+    impl MockBackend {
+        fn new() -> Self {
+            Self {
+                lookup_results: HashMap::new(),
+                getattr_results: HashMap::new(),
+                readdir_results: HashMap::new(),
+                read_results: HashMap::new(),
+                lookup_count: AtomicU64::new(0),
+                getattr_count: AtomicU64::new(0),
+                _readdir_count: AtomicU64::new(0),
+                read_count: AtomicU64::new(0),
+                readdir_buf: Vec::new(),
+            }
+        }
+    }
+
+    #[async_trait]
+    impl Fs for MockBackend {
+        type LookupError = MockError;
+        type GetAttrError = MockError;
+        type OpenError = MockError;
+        type ReadError = MockError;
+        type ReaddirError = MockError;
+        type ReleaseError = MockError;
+
+        async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result<FileAttr, MockError> {
+            self.lookup_count.fetch_add(1, Ordering::Relaxed);
+            let key = (parent, name.to_string_lossy().into_owned());
+            self.lookup_results.get(&key).copied().ok_or(MockError)
+        }
+
+        async fn getattr(
+            &mut self,
+            ino: Inode,
+            _fh: Option<FileHandle>,
+        ) -> Result<FileAttr, MockError> {
+            self.getattr_count.fetch_add(1, Ordering::Relaxed);
+            self.getattr_results.get(&ino).copied().ok_or(MockError)
+        }
+
+        async fn readdir(&mut self, ino: Inode) -> Result<&[DirEntry], MockError> {
+            self._readdir_count.fetch_add(1, Ordering::Relaxed);
+            let entries = self.readdir_results.get(&ino).ok_or(MockError)?;
+            self.readdir_buf = entries.clone();
+            Ok(&self.readdir_buf)
+        }
+
+        async fn open(&mut self, _ino: Inode, _flags: OpenFlags) -> Result<OpenFile, MockError> {
+            Err(MockError)
+        }
+
+        async fn read(
+            &mut self,
+            ino: Inode,
+            _fh: FileHandle,
+            offset: u64,
+            size: u32,
+            _flags: OpenFlags,
+            _lock_owner: Option<LockOwner>,
+        ) -> Result<Bytes, MockError> {
+            self.read_count.fetch_add(1, Ordering::Relaxed);
+            let data = self.read_results.get(&ino).ok_or(MockError)?;
+            let start = usize::try_from(offset)
+                .unwrap_or(usize::MAX)
+                .min(data.len());
+            let end = start
+                .saturating_add(usize::try_from(size).unwrap_or(usize::MAX))
+                .min(data.len());
+            Ok(data.slice(start..end))
+        }
+
+        async fn release(
+            &mut self,
+            _ino: Inode,
+            _fh: FileHandle,
+            _flags: OpenFlags,
+            _flush: bool,
+        ) -> Result<(), MockError> {
+            Ok(())
+        }
+
+        async fn forget(&mut self, _ino: Inode, _nlookups: u64) {}
+
+        async fn statfs(&mut self) -> Result<FilesystemStats, std::io::Error> {
+            Ok(FilesystemStats {
+                block_size: 4096,
+                fragment_size: 4096,
+                total_blocks: 0,
+                free_blocks: 0,
+                available_blocks: 0,
+                total_inodes: 0,
+                free_inodes: 0,
+                available_inodes: 0,
+                filesystem_id: 0,
+                mount_flags: 0,
+                max_filename_length: 255,
+            })
+        }
+    }
+
+    // -- Mock FsCacheProvider --
+
+    struct MockCache {
+        files: Mutex<HashMap<PathBuf, Vec<u8>>>,
+    }
+
+    impl MockCache {
+        fn new() -> Self {
+            Self {
+                files: Mutex::new(HashMap::new()),
+            }
+        }
+    }
+
+    #[derive(Debug, thiserror::Error)]
+    #[error("mock cache error")]
+    pub struct MockCacheError;
+
+    impl From<MockCacheError> for i32 {
+        fn from(_: MockCacheError) -> Self {
+            libc::EIO
+        }
+    }
+
+    // Minimal Fs impl on MockCache (required by the bound, but not used by WriteThroughFs)
+    #[async_trait]
+    impl Fs for MockCache {
+        type LookupError = MockCacheError;
+        type GetAttrError = MockCacheError;
+        type OpenError = MockCacheError;
+        type ReadError = MockCacheError;
+        type ReaddirError = MockCacheError;
+        type ReleaseError = MockCacheError;
+
+        async fn lookup(&mut self, _: Inode, _: &OsStr) -> Result<FileAttr, MockCacheError> {
+            Err(MockCacheError)
+        }
+        async fn getattr(
+            &mut self,
+            _: Inode,
+            _: Option<FileHandle>,
+        ) -> Result<FileAttr, MockCacheError> {
+            Err(MockCacheError)
+        }
+        async fn readdir(&mut self, _: Inode) -> Result<&[DirEntry], MockCacheError> {
+            Err(MockCacheError)
+        }
+        async fn open(&mut self, _: Inode, _: OpenFlags) -> Result<OpenFile, MockCacheError> {
+            Err(MockCacheError)
+        }
+        async fn read(
+            &mut self,
+            _: Inode,
+            _: FileHandle,
+            _: u64,
+            _: u32,
+            _: OpenFlags,
+            _: Option<LockOwner>,
+        ) -> Result<Bytes, MockCacheError> {
+            Err(MockCacheError)
+        }
+        async fn release(
+            &mut self,
+            _: Inode,
+            _: FileHandle,
+            _: OpenFlags,
+            _: bool,
+        ) -> Result<(), MockCacheError> {
+            Ok(())
+        }
+        async fn forget(&mut self, _: Inode, _: u64) {}
+        async fn statfs(&mut self) -> Result<FilesystemStats, std::io::Error> {
+            Err(std::io::Error::new(std::io::ErrorKind::Unsupported, "mock"))
+        }
+    }
+
+    #[async_trait]
+    impl FsCacheProvider for MockCache {
+        type CacheError = std::io::Error;
+
+        async fn populate_file(&self, path: &Path, content: &[u8]) -> Result<(), std::io::Error> {
+            self.files
+                .lock()
+                .expect("mock lock")
+                .insert(path.to_path_buf(), content.to_vec());
+            Ok(())
+        }
+
+        async fn read_cached_file(
+            &self,
+            path: &Path,
+            offset: u64,
+            size: u32,
+        ) -> Result<Option<Bytes>, std::io::Error> {
+            let files = self.files.lock().expect("mock lock");
+            match files.get(path) {
+                Some(data) => {
+                    let start = usize::try_from(offset)
+                        .unwrap_or(usize::MAX)
+                        .min(data.len());
+                    let end = start
+                        .saturating_add(usize::try_from(size).unwrap_or(usize::MAX))
+                        .min(data.len());
+                    Ok(Some(Bytes::copy_from_slice(&data[start..end])))
+                }
+                None => Ok(None),
+            }
+        }
+
+        async fn is_cached(&self, path: &Path) -> bool {
+            self.files.lock().expect("mock lock").contains_key(path)
+        }
+
+        async fn invalidate(&self, path: &Path) -> Result<(), std::io::Error> {
+            self.files.lock().expect("mock lock").remove(path);
+            Ok(())
+        }
+    }
+
+    // -- Helpers --
+
+    fn dummy_file_attr(ino: Inode) -> FileAttr {
+        FileAttr::RegularFile {
+            common: CommonFileAttr {
+                ino,
+                atime: std::time::SystemTime::UNIX_EPOCH,
+                mtime: std::time::SystemTime::UNIX_EPOCH,
+                ctime: std::time::SystemTime::UNIX_EPOCH,
+                crtime: std::time::SystemTime::UNIX_EPOCH,
+                perm: Permissions::from_bits_truncate(0o644),
+                nlink: 1,
+                uid: 0,
+                gid: 0,
+                blksize: 4096,
+            },
+            size: 100,
+            blocks: 1,
+        }
+    }
+
+    fn dummy_dir_attr(ino: Inode) -> FileAttr {
+        FileAttr::Directory {
+            common: CommonFileAttr {
+                ino,
+                atime: std::time::SystemTime::UNIX_EPOCH,
+                mtime: std::time::SystemTime::UNIX_EPOCH,
+                ctime: std::time::SystemTime::UNIX_EPOCH,
+                crtime: std::time::SystemTime::UNIX_EPOCH,
+                perm: Permissions::from_bits_truncate(0o755),
+                nlink: 2,
+                uid: 0,
+                gid: 0,
+                blksize: 4096,
+            },
+        }
+    }
+
+    fn make_wt(mock: MockBackend) -> WriteThroughFs<MockCache, MockBackend> {
+        WriteThroughFs::new(
+            MockCache::new(),
+            mock,
+            Duration::from_secs(60),
+            Duration::from_secs(30),
+        )
+    }
+
+    // -- Tests --
+
+    #[tokio::test]
+    async fn lookup_delegates_to_backend_on_miss() {
+        let mut mock = MockBackend::new();
+        mock.lookup_results
+            .insert((1, "file.txt".into()), dummy_file_attr(10));
+        let mut wt = make_wt(mock);
+
+        let attr = wt.lookup(1, OsStr::new("file.txt")).await.unwrap();
+        assert_eq!(attr, dummy_file_attr(10));
+        assert_eq!(wt.back.lookup_count.load(Ordering::Relaxed), 1);
+    }
+
+    #[tokio::test]
+    async fn lookup_records_inode_path() {
+        let mut mock = MockBackend::new();
+        mock.lookup_results
+            .insert((1, "dir".into()), dummy_dir_attr(2));
+        mock.lookup_results
+            .insert((2, "file.txt".into()), dummy_file_attr(10));
+        let mut wt = make_wt(mock);
+
+        // Root inode 1 maps to empty path
+        wt.inode_paths.insert(1, PathBuf::new());
+        wt.lookup(1, OsStr::new("dir")).await.unwrap();
+        wt.lookup(2, OsStr::new("file.txt")).await.unwrap();
+
+        assert_eq!(wt.inode_paths.get(&2), Some(&PathBuf::from("dir")));
+        assert_eq!(
+            wt.inode_paths.get(&10),
+            Some(&PathBuf::from("dir/file.txt"))
+        );
+    }
+
+    #[tokio::test]
+    async fn getattr_delegates_to_backend() {
+        let mut mock = MockBackend::new();
+        mock.getattr_results.insert(10, dummy_file_attr(10));
+        let mut wt = make_wt(mock);
+
+        let attr = wt.getattr(10, None).await.unwrap();
+        assert_eq!(attr, dummy_file_attr(10));
+        assert_eq!(wt.back.getattr_count.load(Ordering::Relaxed), 1);
+    }
+
+    #[tokio::test]
+    async fn forget_evicts_inode_path() {
+        let mut mock = MockBackend::new();
+        mock.lookup_results
+            .insert((1, "file.txt".into()), dummy_file_attr(10));
+        let mut wt = make_wt(mock);
+        wt.inode_paths.insert(1, PathBuf::new());
+
+        wt.lookup(1, OsStr::new("file.txt")).await.unwrap();
+        assert!(
+            wt.inode_paths.contains_key(&10),
+            "inode 10 should be tracked after lookup"
+        );
+
+        wt.forget(10, 1).await;
+        assert!(
+            !wt.inode_paths.contains_key(&10),
+            "inode 10 should be evicted after forget"
+        );
+    }
+
+    #[tokio::test]
+    async fn read_caches_to_front_in_background() {
+        let mut mock = MockBackend::new();
+        mock.read_results
+            .insert(10, Bytes::from_static(b"file content here"));
+        let mut wt = make_wt(mock);
+        wt.inode_paths.insert(10, PathBuf::from("test/file.txt"));
+
+        // First read: goes to backend, spawns background cache write
+        let data = wt
+            .read(10, 1, 0, 100, OpenFlags::RDONLY, None)
+            .await
+            .unwrap();
+        assert_eq!(data, Bytes::from_static(b"file content here"));
+        assert_eq!(wt.back.read_count.load(Ordering::Relaxed), 1);
+
+        // Give background task time to complete
+        tokio::task::yield_now().await;
+        tokio::time::sleep(Duration::from_millis(10)).await;
+
+        // Second read: should hit the cache, NOT the backend
+        let data = wt
+            .read(10, 1, 0, 100, OpenFlags::RDONLY, None)
+            .await
+            .unwrap();
+        assert_eq!(data, Bytes::from_static(b"file content here"));
+        assert_eq!(
+            wt.back.read_count.load(Ordering::Relaxed),
+            1,
+            "backend should only be called once"
+        );
+    }
+
+    #[tokio::test]
+    async fn read_without_path_still_works() {
+        let mut mock = MockBackend::new();
+        mock.read_results.insert(10, Bytes::from_static(b"data"));
+        let mut wt = make_wt(mock);
+        // No inode_paths entry for ino 10
+
+        let data = wt
+            .read(10, 1, 0, 100, OpenFlags::RDONLY, None)
+            .await
+            .unwrap();
+        assert_eq!(data, Bytes::from_static(b"data"));
+    }
+
+    #[tokio::test]
+    async fn read_cache_hit_returns_correct_slice() {
+        let mut mock = MockBackend::new();
+        mock.read_results
+            .insert(10, Bytes::from_static(b"0123456789"));
+        let mut wt = make_wt(mock);
+        wt.inode_paths.insert(10, PathBuf::from("f.txt"));
+
+        // Populate cache with full content
+        wt.read(10, 1, 0, 100, OpenFlags::RDONLY, None)
+            .await
+            .unwrap();
+        tokio::task::yield_now().await;
+        tokio::time::sleep(Duration::from_millis(10)).await;
+
+        // Read a slice from cache
+        let data = wt.read(10, 1, 3, 4, OpenFlags::RDONLY, None).await.unwrap();
+        assert_eq!(data, Bytes::from_static(b"3456"));
+    }
+
+    #[tokio::test]
+    async fn readdir_records_child_paths() {
+        let mut mock = MockBackend::new();
+        mock.readdir_results.insert(
+            1,
+            vec![
+                DirEntry {
+                    ino: 2,
+                    name: "src".into(),
+                    kind: DirEntryType::Directory,
+                },
+                DirEntry {
+                    ino: 3,
+                    name: "README.md".into(),
+                    kind: DirEntryType::RegularFile,
+                },
+            ],
+        );
+        let mut wt = make_wt(mock);
+        wt.inode_paths.insert(1, PathBuf::new());
+
+        wt.readdir(1).await.unwrap();
+
+        assert_eq!(wt.inode_paths.get(&2), Some(&PathBuf::from("src")));
+        assert_eq!(wt.inode_paths.get(&3), Some(&PathBuf::from("README.md")));
+    }
+
+    #[tokio::test]
+    async fn tracker_records_cached_file_size() {
+        let mut mock = MockBackend::new();
+        mock.read_results.insert(10, Bytes::from_static(b"hello"));
+        let mut wt = make_wt(mock);
+        wt.inode_paths.insert(10, PathBuf::from("f.txt"));
+
+        wt.read(10, 1, 0, 100, OpenFlags::RDONLY, None)
+            .await
+            .unwrap();
+
+        // Wait for background task
+        tokio::task::yield_now().await;
+        tokio::time::sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(wt.tracker.estimated_size(), 5);
+    }
+
+    #[tokio::test]
+    async fn eviction_removes_stale_cache_entries() {
+        let mut mock = MockBackend::new();
+        mock.read_results.insert(10, Bytes::from_static(b"content"));
+        let mut wt = WriteThroughFs::new(
+            MockCache::new(),
+            mock,
+            Duration::from_millis(50), // stale after 50ms
+            Duration::from_millis(10), // check every 10ms
+        );
+        wt.inode_paths.insert(10, PathBuf::from("f.txt"));
+
+        // Populate cache
+        wt.read(10, 1, 0, 100, OpenFlags::RDONLY, None)
+            .await
+            .unwrap();
+        tokio::task::yield_now().await;
+        tokio::time::sleep(Duration::from_millis(10)).await;
+        assert!(wt.front.is_cached(Path::new("f.txt")).await);
+
+        // Wait for staleness + trigger eviction
+        tokio::time::sleep(Duration::from_millis(100)).await;
+        wt.maybe_evict();
+        tokio::task::yield_now().await;
+        tokio::time::sleep(Duration::from_millis(10)).await;
+
+        assert!(!wt.front.is_cached(Path::new("f.txt")).await);
+    }
+}
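
Usage sketch (illustrative, not part of the diff): once the fuser integration lands in Tasks 5-6, the intended wiring is a `LocalFs` cache in front of a remote backend. The helper below is hypothetical, the durations are made-up example values, and it assumes `LocalFs` satisfies the `Send + Sync + 'static` bounds required of the front cache:

```rust
use std::time::Duration;

use crate::fs::local::LocalFs;
use crate::fs::r#trait::Fs;
use crate::fs::write_through::WriteThroughFs;

/// Hypothetical helper: put the local write-through cache in front of any backend.
fn with_local_cache<B: Fs>(backend: B, cache_dir: &str) -> WriteThroughFs<LocalFs, B> {
    WriteThroughFs::new(
        LocalFs::new(cache_dir),  // front: mirrors file content under cache_dir
        backend,                  // back: e.g. the mescloud filesystem
        Duration::from_secs(300), // stale_after: entries older than 5 min get evicted
        Duration::from_secs(60),  // eviction_interval: sweep at most once a minute
    )
}
```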