diff --git a/Cargo.lock b/Cargo.lock index 577f7b3..ebbe571 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -574,6 +574,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -767,6 +773,7 @@ dependencies = [ "dirs", "fuser", "futures", + "hashlink", "http", "inquire", "libc", @@ -835,7 +842,7 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ - "foldhash", + "foldhash 0.1.5", ] [[package]] @@ -843,6 +850,18 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "foldhash 0.2.0", +] + +[[package]] +name = "hashlink" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea0b22561a9c04a7cb1a302c013e0259cd3b4bb619f145b32f72b8b4bcbed230" +dependencies = [ + "hashbrown 0.16.1", +] [[package]] name = "heck" diff --git a/Cargo.toml b/Cargo.toml index 9256d6e..399c11b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,9 @@ license = "MIT" repository = "https://github.com/mesa-dot-dev/git-fs" authors = ["Marko Vejnovic "] +[lib] +path = "lib/lib.rs" + [dependencies] clap = { version = "4.5.54", features = ["derive", "env"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } @@ -47,6 +50,7 @@ opentelemetry = { version = "0.29" } opentelemetry_sdk = { version = "0.29", features = ["rt-tokio"] } opentelemetry-otlp = { version = "0.29", default-features = false, features = ["http-proto", "trace", "reqwest-blocking-client"] } tracing-opentelemetry = { version = "0.30" } +hashlink = "0.11.0" [features] default = [] @@ -56,6 +60,7 @@ staging = [] vergen-gitcl = { version = "1", features = [] } [lints.rust] +unexpected_cfgs = "warn" unsafe_code = "allow" missing_docs = "warn" unreachable_pub = "allow" diff --git a/lib/cache/eviction/lru.rs b/lib/cache/eviction/lru.rs new file mode 100644 index 0000000..ff56b2c --- /dev/null +++ b/lib/cache/eviction/lru.rs @@ -0,0 +1,297 @@ +//! Implements the LRU eviction policy. + +use std::{future::Future, hash::Hash}; + +use std::sync::{ + Arc, + atomic::{self, AtomicI64}, +}; + +use hashlink::LinkedHashMap; +use tokio::sync::mpsc::{Receiver, error::TrySendError}; + +/// Types that carry a monotonic version for deduplication of out-of-order messages. +pub trait Versioned { + /// Returns the monotonic version of this value. + fn version(&self) -> u64; +} + +/// A trait for deleting keys from the cache. This is used by the LRU eviction tracker to delete +/// keys when they are evicted. +pub trait Deleter: Send + Clone + 'static { + /// Delete the given key from the cache. Deletions for different keys may be invoked + /// concurrently and in arbitrary order. Correctness is ensured by the caller through + /// a context-based guard (e.g. matching on a unique file ID), not by invocation order. + fn delete(&mut self, key: K, ctx: Ctx) -> impl Future + Send; +} + +/// Messages sent to the LRU eviction tracker worker. 
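To make the `Deleter` contract concrete, here is a minimal sketch of a context type implementing `Versioned` and a deleter that only logs. The `Deleter<K, Ctx>` parameterization and the key/context types are assumptions inferred from how the tracker uses the trait further down; a real deleter removes the backing data, as `LruEntryDeleter` does later in this diff.

// Hypothetical illustration: the key and context types are stand-ins.
#[derive(Debug, Clone, Copy)]
struct NoopCtx {
    version: u64,
}

impl Versioned for NoopCtx {
    fn version(&self) -> u64 {
        self.version
    }
}

#[derive(Clone)]
struct LoggingDeleter;

impl Deleter<String, NoopCtx> for LoggingDeleter {
    async fn delete(&mut self, key: String, ctx: NoopCtx) {
        println!("evicting {key} at version {}", ctx.version());
    }
}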
+#[derive(Debug, Clone, Copy)] +enum Message { + /// Notify the LRU eviction tracker that the given key was accessed. + Accessed(K), + /// Request an eviction set of the given size. + Evict(u32), + /// Notify the LRU eviction tracker that a key was inserted or overwritten. + Upserted(K, C), +} + +/// Tracks in-flight eviction batches and individual deletions using a single packed `AtomicI64`. +/// +/// Layout: `[upper 32 bits: pending batches | lower 32 bits: active deletions]` +/// +/// The lifecycle of an eviction is: +/// 1. Producer calls `submit_batch()` — increments upper half by 1. +/// 2. Producer enqueues the `Evict` message into the channel. +/// 3. Worker receives the message and calls `process_batch(n)` — decrements upper half by 1 and +/// increments lower half by `n` (the actual number of keys to delete). +/// 4. Each spawned deletion task calls `observe_deletion()` when done — decrements lower half by 1. +/// +/// The indicator reads zero only when no batches are pending and no deletions are in flight. +#[derive(Debug)] +struct DeletionIndicator { + underlying: AtomicI64, +} + +impl DeletionIndicator { + fn new() -> Self { + Self { + underlying: AtomicI64::new(0), + } + } + + /// Mark that a new eviction batch has been submitted by a producer. Called on the producer + /// side *before* the `Evict` message is sent into the channel. + fn submit_batch(&self) { + self.underlying + .fetch_add(1 << 32, atomic::Ordering::Relaxed); + } + + /// Undo a `submit_batch` call. Used when the channel `try_send` fails after we already + /// incremented the pending-batch counter. + fn undo_submit_batch(&self) { + self.underlying + .fetch_sub(1 << 32, atomic::Ordering::Relaxed); + } + + /// Called by the worker when it begins processing an eviction batch. Atomically decrements + /// the pending-batch counter (upper half) by 1 and increments the active-deletion counter + /// (lower half) by `count`. + fn process_batch(&self, count: u32) { + self.underlying + .fetch_add(i64::from(count) - (1 << 32), atomic::Ordering::Relaxed); + } + + /// Called by a spawned deletion task when it finishes deleting one key. + fn observe_deletion(&self) { + self.underlying.fetch_sub(1, atomic::Ordering::Relaxed); + } + + /// Returns `true` if there is any eviction work in progress — either batches waiting to be + /// processed by the worker, or individual deletions still in flight. + fn have_pending_work(&self) -> bool { + self.underlying.load(atomic::Ordering::Relaxed) != 0 + } +} + +#[derive(Debug)] +struct LruProcessingTask> { + receiver: Receiver>, + + /// The ordered set of keys, ordered according to the last-used policy. + ordered_key_map: LinkedHashMap, + + /// The deleter to call when we need to evict keys. + deleter: D, + + /// Pointer into the shared deletion tracker. + shared: Arc, +} + +impl> + LruProcessingTask +{ + fn new(deleter: D, receiver: Receiver>, shared: Arc) -> Self { + Self { + receiver, + ordered_key_map: LinkedHashMap::new(), + deleter, + shared, + } + } + + fn spawn_task(deleter: D, receiver: Receiver>, shared: Arc) { + // TODO(markovejnovic): This should have a best-effort drop. + tokio::spawn(async move { + let mut task = Self::new(deleter, receiver, shared); + task.work().await; + }); + } + + async fn work(&mut self) { + while let Some(msg) = self.receiver.recv().await + && self.service_message(msg) + {} + } + + /// Returns true if the task should continue working. 
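A quick test-style sketch of the packed-counter lifecycle described above; it uses only the `DeletionIndicator` methods already defined and checks that one batch of three deletions brings the indicator back to zero.

#[cfg(test)]
mod deletion_indicator_sketch {
    use super::*;

    #[test]
    fn one_batch_of_three_deletions_drains_to_zero() {
        let indicator = DeletionIndicator::new();

        // Producer: a batch is announced before the Evict message is enqueued.
        indicator.submit_batch();
        assert!(indicator.have_pending_work()); // upper half = 1, lower half = 0

        // Worker: the pending batch becomes three active deletions.
        indicator.process_batch(3);
        assert!(indicator.have_pending_work()); // upper half = 0, lower half = 3

        // Each spawned deletion task reports completion exactly once.
        for _ in 0..3 {
            indicator.observe_deletion();
        }
        assert!(!indicator.have_pending_work()); // packed value is zero again
    }
}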
+ #[must_use] + fn service_message(&mut self, message: Message) -> bool { + match message { + Message::Accessed(k) => { + // The key may have been evicted between the access and this message arriving. + // If it was, the remove returns None and this is a no-op. + if let Some(entry) = self.ordered_key_map.remove(&k) { + self.ordered_key_map.insert(k, entry); + } + } + Message::Evict(max_count) => { + { + // These integer casts are safe, since max_count is guaranteed to fit + // within a u32, min(MAX_U32, MAX_USIZE) == MAX_U32. + #[expect(clippy::cast_possible_truncation)] + let take_count = self.ordered_key_map.len().min(max_count as usize) as u32; + + // Atomically transition this batch from "pending" to "active deletions". + // This decrements the upper half (pending batches) by 1 and increments the + // lower half (active deletions) by take_count. If take_count is 0, this + // still correctly clears the pending batch. + self.shared.active_deletions.process_batch(take_count); + + for _ in 0..take_count { + let Some((key, ctx)) = self.ordered_key_map.pop_front() else { + break; + }; + let mut deleter = self.deleter.clone(); + let shared_clone = Arc::clone(&self.shared); + tokio::spawn(async move { + // Drop guard ensures observe_deletion() runs even if the + // deletion task is cancelled or panics. + struct DeletionGuard(Arc); + impl Drop for DeletionGuard { + fn drop(&mut self) { + self.0.active_deletions.observe_deletion(); + } + } + let _guard = DeletionGuard(shared_clone); + deleter.delete(key, ctx).await; + }); + } + } + } + Message::Upserted(k, ctx) => { + if let Some(existing) = self.ordered_key_map.get(&k) + && ctx.version() < existing.version() + { + // Stale message from a previous incarnation; drop it. + return true; + } + self.ordered_key_map.remove(&k); + self.ordered_key_map.insert(k, ctx); + } + } + + true + } +} + +#[derive(Debug)] +struct WorkerState { + /// Packed atomic tracking pending eviction batches (upper 32 bits) and active deletions + /// (lower 32 bits). See `DeletionIndicator` for the full protocol. + active_deletions: DeletionIndicator, +} + +/// An LRU eviction tracker. This is used to track the least recently used keys in the cache, and +/// to evict keys when necessary. +#[derive(Debug)] +pub struct LruEvictionTracker { + worker_message_sender: tokio::sync::mpsc::Sender>, + worker_state: Arc, +} + +impl + LruEvictionTracker +{ + /// Spawn a new LRU eviction tracker with the given deleter and channel size. + pub fn spawn>(deleter: D, channel_size: usize) -> Self { + let (tx, rx) = tokio::sync::mpsc::channel(channel_size); + + let worker_state = Arc::new(WorkerState { + active_deletions: DeletionIndicator::new(), + }); + // The worker task is intentionally detached: it exits when the channel closes + // (i.e. when all senders, including the one held by LruEvictionTracker, are dropped). + LruProcessingTask::spawn_task(deleter, rx, Arc::clone(&worker_state)); + + Self { + worker_message_sender: tx, + worker_state, + } + } + + /// Notify the LRU tracker that a key was inserted or overwritten. + /// + /// Non-cancellable: uses `try_send` (sync) with a `tokio::spawn` fallback so that the + /// notification cannot be lost to task cancellation. 
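The `Accessed` branch above relies on `LinkedHashMap` preserving insertion order: remove-then-insert moves a key to the back (most recently used), so `pop_front` always yields the least recently used entry. A small test-style sketch of that invariant, independent of the tracker itself:

#[cfg(test)]
mod lru_order_sketch {
    use hashlink::LinkedHashMap;

    #[test]
    fn remove_then_insert_refreshes_recency() {
        let mut map: LinkedHashMap<&str, u32> = LinkedHashMap::new();
        map.insert("a", 1);
        map.insert("b", 2);
        map.insert("c", 3);

        // Simulate an Accessed("a") message: remove + insert moves "a" to the
        // back, making it the most recently used entry.
        if let Some(v) = map.remove("a") {
            map.insert("a", v);
        }

        // Eviction pops from the front, i.e. the least recently used key first.
        assert_eq!(map.pop_front(), Some(("b", 2)));
        assert_eq!(map.pop_front(), Some(("c", 3)));
        assert_eq!(map.pop_front(), Some(("a", 1)));
    }
}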
+ pub fn upsert(&self, key: K, ctx: C) { + match self + .worker_message_sender + .try_send(Message::Upserted(key, ctx)) + { + Ok(()) | Err(TrySendError::Closed(_)) => {} + Err(TrySendError::Full(msg)) => { + let sender = self.worker_message_sender.clone(); + tokio::spawn(async move { + let _ = sender.send(msg).await; + }); + } + } + } + + /// Notify the LRU eviction tracker that the given key was accessed. + /// + /// Non-cancellable: uses `try_send` (sync) with a `tokio::spawn` fallback so that the + /// notification cannot be lost to task cancellation. + pub fn access(&self, key: K) { + match self.worker_message_sender.try_send(Message::Accessed(key)) { + Ok(()) | Err(TrySendError::Closed(_)) => {} + Err(TrySendError::Full(msg)) => { + let sender = self.worker_message_sender.clone(); + tokio::spawn(async move { + let _ = sender.send(msg).await; + }); + } + } + } + + /// Try to cull the least recently used keys with the given deletion method. + /// + /// Returns `true` if the eviction message was successfully enqueued, or `false` if the + /// channel is full. In the latter case, the caller should yield and retry. + /// + /// The ordering here is critical for correctness: we `submit_batch()` *before* `try_send()` + /// so that the `DeletionIndicator` is always in a state where `have_pending_batches()` returns + /// `true` by the time anyone observes the enqueued message. If the send fails, we roll back + /// with `undo_submit_batch()`, which is a net-zero delta on the packed atomic. + #[must_use] + pub fn try_cull(&self, max_count: u32) -> bool { + self.worker_state.active_deletions.submit_batch(); + if self + .worker_message_sender + .try_send(Message::Evict(max_count)) + .is_ok() + { + true + } else { + self.worker_state.active_deletions.undo_submit_batch(); + false + } + } + + /// Check whether there are culls that are already scheduled or actively in progress. + #[must_use] + pub fn have_pending_culls(&self) -> bool { + self.worker_state.active_deletions.have_pending_work() + } +} diff --git a/lib/cache/eviction/mod.rs b/lib/cache/eviction/mod.rs new file mode 100644 index 0000000..94ca450 --- /dev/null +++ b/lib/cache/eviction/mod.rs @@ -0,0 +1,2 @@ +/// Different types of eviction policies for any cache. +pub mod lru; diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs new file mode 100644 index 0000000..8d69f1a --- /dev/null +++ b/lib/cache/fcache.rs @@ -0,0 +1,468 @@ +//! File-based MT-safe, async-safe data cache. + +use std::{ + fmt::Debug, + hash::Hash, + path::{Path, PathBuf}, +}; + +use std::sync::{ + Arc, + atomic::{self, AtomicU64, AtomicUsize}, +}; + +use tracing::error; + +use crate::{ + cache::{ + eviction::lru::{Deleter, LruEvictionTracker, Versioned}, + traits::{AsyncReadableCache, AsyncWritableCache}, + }, + io, +}; +use thiserror::Error; + +use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; + +/// Drop guard that deletes an unregistered cache file if the insert task is cancelled or fails +/// between file creation and map registration. Defused once the file is registered in the map. 
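From the caller's side the tracker is fire-and-forget for notifications and retry-based for culls. A usage sketch, reusing the hypothetical `NoopCtx` from the earlier example and assuming the tracker is parameterized over a key type and a context type:

// Illustrative consumer; the u64 key and the batch size of 8 are arbitrary.
async fn trim_sketch(tracker: &LruEvictionTracker<u64, NoopCtx>) {
    // Bookkeeping notifications never block and never fail.
    tracker.upsert(42, NoopCtx { version: 0 });
    tracker.access(42);

    // Request a batch eviction; retry only if the worker channel is momentarily full.
    while !tracker.try_cull(8) {
        tokio::task::yield_now().await;
    }

    // Optionally wait for scheduled and in-flight deletions to drain.
    while tracker.have_pending_culls() {
        tokio::task::yield_now().await;
    }
}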
+struct FileGuard { + path: Option, +} + +impl Drop for FileGuard { + fn drop(&mut self) { + if let Some(path) = self.path.take() + && let Err(e) = std::fs::remove_file(&path) + && e.kind() != std::io::ErrorKind::NotFound + { + error!(error = ?e, path = ?path, "failed to clean up orphaned cache file"); + } + } +} + +struct FileCacheShared { + root_path: PathBuf, + map: scc::HashMap, + size_bytes: AtomicUsize, +} + +impl FileCacheShared { + fn size(&self) -> usize { + self.size_bytes.load(atomic::Ordering::Relaxed) + } + + fn path_for(&self, fid: usize) -> PathBuf { + self.root_path.join(fid.to_string()) + } + + /// Given an entry, remove it from the disk. Note this does not update the cache size tracker. + /// + /// Gracefully handles the case where the file doesn't exist, which can happen if the file was + /// deleted after we read the file ID from the map, but before we tried to delete + /// the file. + async fn delete_entry_from_disk_async(&self, entry: &CacheMapEntry) -> std::io::Result<()> { + match tokio::fs::remove_file(self.path_for(entry.fid)).await { + Ok(()) => Ok(()), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(e), + } + } +} + +#[derive(Debug, Clone, Copy)] +struct DeleterCtx { + fid: usize, + version: u64, +} + +impl Versioned for DeleterCtx { + fn version(&self) -> u64 { + self.version + } +} + +#[derive(Clone)] +struct LruEntryDeleter { + shared: Arc>, +} + +impl LruEntryDeleter { + fn new(shared: Arc>) -> Self { + Self { shared } + } +} + +impl Deleter for LruEntryDeleter { + /// The LRU cache will call this method when it wants to evict keys. + async fn delete(&mut self, key: K, ctx: DeleterCtx) { + if let Some(entry) = self + .shared + .map + .remove_if_async(&key, |m| m.fid == ctx.fid) + .await + { + if let Err(e) = self.shared.delete_entry_from_disk_async(&entry.1).await { + error!(error = ?e, "failed to delete evicted cache entry from disk"); + } + + self.shared + .size_bytes + .fetch_sub(entry.1.size_bytes, atomic::Ordering::Relaxed); + } + } +} + +/// Error thrown during construction of a `FileCache` which describes why the given root path is +/// invalid. +#[derive(Debug, Error)] +pub enum InvalidRootPathError { + /// The root path exists but isn't a directory. + #[error("Root path is not a directory: {0}")] + NotADirectory(PathBuf), + + /// The root path exists and is a directory, but it isn't empty and the data in it appears to + /// come from a different source than this application. + #[error("Root path appears to contain data stemming from sources different to this app: {0}")] + RootPathUnsafeCache(PathBuf), + + /// An IO error occurred while trying to access the root path. + #[error("IO error while accessing root path: {0}")] + Io(#[from] std::io::Error), +} + +/// Error thrown during insertion into the cache. +#[derive(Debug, Error)] +pub enum CacheWriteError { + /// An IO error occurred while trying to write the new value to disk. + #[error("IO error while inserting into cache: {0}")] + Io(#[from] std::io::Error), +} + +#[derive(Debug, Clone, Copy)] +struct CacheMapEntry { + /// A unique ID representing the name of the file on disk where the value for this cache entry + /// is stored. This is just an integer that is used to generate the file path for the cache + /// entry. + fid: usize, + + /// The size of the value for this cache entry in bytes. This is used to keep track of the + /// total size of the cache, and to evict entries when necessary. + size_bytes: usize, +} + +/// A general-purpose file-backed cache. 
This cache is designed to store arbitrary byte values on +/// disk, and to retrieve them later using a key. +/// +/// This cache is considered thread-safe and async-safe. +pub struct FileCache { + shared: Arc>, + + /// Generates unique file paths for new cache entries. This is just a counter that increments + /// for each new file, and the file ID is the value of the counter at the time of creation. + file_generator: AtomicUsize, + + /// The maximum size of the cache in bytes. This is used to determine when to evict entries + /// from the cache. This is a soft hint, rather than a hard limit, and the cache may + /// temporarily exceed this size during insertions, but it will try to evict entries as soon as + /// possible to get back under this limit. + max_size_bytes: usize, + + /// LRU eviction tracker. This is used to track the least recently used keys in the cache, and + /// to evict keys when necessary. + lru_tracker: LruEvictionTracker, + + /// Global monotonic counter for per-key versioning. Incremented under the scc bucket lock + /// to guarantee that versions are strictly monotonically increasing per key. + version_counter: AtomicU64, +} + +impl FileCache { + // How many cache entries to evict at most in a single batch. + // + // Not really sure how to determine this number. + const LRU_EVICTION_MAX_BATCH_SIZE: u32 = 8; + + // The maximum number of messages that can be buffered between this file cache and the LRU + // eviction tracker. If this is too small, then the file cache will be blocked waiting for the + // eviction tracker to catch up. + const MAX_LRU_TRACKER_CHANNEL_SIZE: usize = 256; + + // Dangerous: Changing this constant may cause the program to treat existing cache directories + // as invalid, and thus provide a worse user experience. Do not change this unless you have a + // very good reason to do so. Changing this will break backwards compatibility with existing + // cache directories. + const GITFS_MARKER_FILE: &'static str = ".gitfs_cache"; + + // The total maximum number of times we will try to read a "deleted" file before giving up and + // treating it as a hard error. + // + // See usage for reasoning on why this is necessary. + const MAX_READ_RETRY_COUNT: usize = 8; + + // The maximum number of eviction loop iterations before giving up and proceeding with the + // insert. Prevents livelock when the LRU worker's ordered map is empty (e.g., Upserted + // messages haven't been delivered yet) and try_cull succeeds but evicts nothing. + const MAX_EVICTION_ATTEMPTS: u32 = 4; + + /// Try to create a new file cache at the given path. + /// + /// + /// # Args + /// + /// * `file_path` - If the path exists, it must either be an empty directory, or a directory + /// which was previously used as a cache for this program. + /// * `max_size_bytes` - The maximum size of the cache in bytes. This is used to determine when + /// to evict entries from the cache. This is a soft hint, rather than a hard limit, and the + /// cache may temporarily exceed this size during insertions, but it will try to evict entries + /// as soon as possible to get back under this limit. 
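Before the constructor, a minimal sketch of how the `FileGuard` defined earlier is meant to be used: construct it before the first `.await` that can create the file, and defuse it only after the entry is registered. The `write_and_register_sketch` helper and its arguments are hypothetical.

// Hypothetical helper showing the guard's construct -> work -> defuse sequence.
async fn write_and_register_sketch(path: std::path::PathBuf, bytes: &[u8]) -> std::io::Result<()> {
    let mut guard = FileGuard { path: Some(path.clone()) };

    // If this write fails, or the future is dropped mid-await, the guard's Drop
    // impl removes the partially written file.
    tokio::fs::write(&path, bytes).await?;

    // ... register the entry in the shared map here ...

    // Registration done: defuse the guard so the file is kept on disk.
    guard.path = None;
    Ok(())
}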
+ pub async fn new( + file_path: &Path, + max_size_bytes: usize, + ) -> Result { + let mut pbuf = match tokio::fs::canonicalize(file_path).await { + Ok(mut p) => { + if !tokio::fs::metadata(&p).await?.is_dir() { + return Err(InvalidRootPathError::NotADirectory(p)); + } + + let mut entries = tokio::fs::read_dir(&p).await?; + let is_empty = entries.next_entry().await?.is_none(); + + p.push(Self::GITFS_MARKER_FILE); + let marker_exists = tokio::fs::try_exists(&p).await?; + p.pop(); + + if !(is_empty || marker_exists) { + return Err(InvalidRootPathError::RootPathUnsafeCache(p)); + } + + io::remove_dir_contents(&p).await?; + + Ok(p) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + tokio::fs::create_dir_all(file_path).await?; + tokio::fs::canonicalize(file_path).await + } + Err(e) => return Err(e.into()), + }?; + + // Create marker file so that subsequent restarts of this application gracefully handle the + // existing cache directory. + pbuf.push(Self::GITFS_MARKER_FILE); + tokio::fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(&pbuf) + .await?; + pbuf.pop(); + + let shared = Arc::new(FileCacheShared { + root_path: pbuf, + map: scc::HashMap::new(), + size_bytes: AtomicUsize::new(0), + }); + + Ok(Self { + shared: Arc::clone(&shared), + file_generator: AtomicUsize::new(0), + max_size_bytes, + lru_tracker: LruEvictionTracker::spawn( + LruEntryDeleter::new(shared), + Self::MAX_LRU_TRACKER_CHANNEL_SIZE, + ), + version_counter: AtomicU64::new(0), + }) + } + + async fn create_file(&self, path: &Path) -> Result { + tokio::fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(path) + .await + } +} + +impl Drop for FileCache { + fn drop(&mut self) { + // Surprisingly, it's safe to delete files here. + // + // Note that the LRU tracker is running at the time of Drop. It, consequently, may end up + // calling the deleter, but that's fine, since the deleter ignores this problem gracefully. + // + // It is 100% safe to do this here, since it's guaranteed no concurrent task is borrowing + // the FileCache at this point. + if let Err(e) = std::fs::remove_dir_all(&self.shared.root_path) { + error!(error = ?e, "failed to delete cache directory on drop"); + } + } +} + +impl AsyncReadableCache> + for FileCache +{ + async fn get(&self, key: &K) -> Option> { + for _ in 0..Self::MAX_READ_RETRY_COUNT { + // Grab the best-guess file ID for the given key. If this fails, then the key is not in + // the cache, and we can return `None`. + let entry = *(self.shared.map.get_async(key).await?); + + // The next thing we need to do is try to open the file. This may fail for a plethora + // of reasons. + // + // TODO(markovejnovic): path_for allocates on heap, could be on stack. + let mut file = match tokio::fs::File::open(self.shared.path_for(entry.fid)).await { + Ok(file) => file, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + // The file was not found. + // This may happen for two reasons: + // + // There is a bug in the code. + // + // The file was id was deleted after we read the file ID from the map on line: + // + // let fid: usize = *(self.map.get_async(key).await?); + // + // The file can get deleted as part of the insert procedure, or the deletion + // procedure, so we need to try to re-read the file. + continue; + } + Err(e) => { + error!(error = ?e, key = ?key, "IO error while opening file for cache key"); + + // This MIGHT be recoverable. 
The IO error may very well be transient, so we'll + // just say we don't have this value and then pray that the user will perform + // an .insert + return None; + } + }; + + let mut buf = Vec::new(); + file.read_to_end(&mut buf) + .await + .inspect_err(|e| { + error!(error = ?e, key = ?key, "IO error while reading file for cache key"); + }) + .ok()?; + self.lru_tracker.access(*key); + return Some(buf); + } + + error!(key = ?key, attempt_count = Self::MAX_READ_RETRY_COUNT, "could not find file."); + + // Again, might be recoverable if the user calls an .insert later on. + None + } + + async fn contains(&self, key: &K) -> bool { + self.shared.map.contains_async(key).await + } +} + +impl + AsyncWritableCache, CacheWriteError> for FileCache +{ + async fn insert(&self, key: &K, value: Vec) -> Result<(), CacheWriteError> { + let new_fid = self.file_generator.fetch_add(1, atomic::Ordering::Relaxed); + let new_size = value.len(); + + let mut eviction_attempts = 0u32; + while self.shared.size() + new_size > self.max_size_bytes { + if self.shared.size() == 0 { + // The new entry is larger than the entire cache size limit. Insert it anyway + // and let the cache temporarily exceed the limit. + break; + } + + if eviction_attempts >= Self::MAX_EVICTION_ATTEMPTS { + // We've tried enough times. The cache is over budget but max_size_bytes is a + // soft hint -- proceed with the insert and let future inserts drive further + // eviction. + break; + } + eviction_attempts += 1; + + if self.lru_tracker.have_pending_culls() { + tokio::task::yield_now().await; + continue; + } + + if !self.lru_tracker.try_cull(Self::LRU_EVICTION_MAX_BATCH_SIZE) { + tokio::task::yield_now().await; + } + } + + // Write file to disk. The guard is constructed *before* the `.await` so that if + // this future is cancelled while `create_file` is in flight, the drop guard still + // cleans up the (potentially created) file. + let path = self.shared.path_for(new_fid); + let mut guard = FileGuard { + path: Some(path.clone()), + }; + let mut new_file = self.create_file(&path).await?; + new_file.write_all(&value).await?; + + // Use entry_async to lock the bucket, then allocate version under the lock. + // This ensures version monotonicity per key matches the actual mutation order. + // Size accounting is also done under the lock to prevent transient underflow + // when concurrent inserts to the same key interleave their deltas. + let (old_entry, new_version) = match self.shared.map.entry_async(*key).await { + scc::hash_map::Entry::Occupied(mut o) => { + let old = *o.get(); + let v = self.version_counter.fetch_add(1, atomic::Ordering::Relaxed); + *o.get_mut() = CacheMapEntry { + fid: new_fid, + size_bytes: new_size, + }; + + let size_delta: isize = new_size.cast_signed() - old.size_bytes.cast_signed(); + if size_delta > 0 { + self.shared + .size_bytes + .fetch_add(size_delta.cast_unsigned(), atomic::Ordering::Relaxed); + } else if size_delta < 0 { + self.shared + .size_bytes + .fetch_sub(size_delta.unsigned_abs(), atomic::Ordering::Relaxed); + } + + (Some(old), v) + } + scc::hash_map::Entry::Vacant(vacant) => { + let v = self.version_counter.fetch_add(1, atomic::Ordering::Relaxed); + vacant.insert_entry(CacheMapEntry { + fid: new_fid, + size_bytes: new_size, + }); + + self.shared + .size_bytes + .fetch_add(new_size, atomic::Ordering::Relaxed); + + (None, v) + } + }; + // Bucket lock released here. + guard.path = None; + + // LRU notification (sync via try_send, non-cancellable). 
+ self.lru_tracker.upsert( + *key, + DeleterCtx { + fid: new_fid, + version: new_version, + }, + ); + + // Deferrable: delete old file (safe to cancel — file is orphaned). + if let Some(old_entry) = old_entry { + drop(self.shared.delete_entry_from_disk_async(&old_entry).await); + } + + Ok(()) + } +} diff --git a/lib/cache/mod.rs b/lib/cache/mod.rs new file mode 100644 index 0000000..e0c1c97 --- /dev/null +++ b/lib/cache/mod.rs @@ -0,0 +1,6 @@ +/// Cache eviction policies. +pub mod eviction; +/// File-backed cache implementation. +pub mod fcache; +/// Cache traits for read and write operations. +pub mod traits; diff --git a/lib/cache/traits.rs b/lib/cache/traits.rs new file mode 100644 index 0000000..5a38464 --- /dev/null +++ b/lib/cache/traits.rs @@ -0,0 +1,32 @@ +use std::hash::Hash; + +/// A readable cache that can be read from asynchronously. +#[expect(async_fn_in_trait)] +pub trait AsyncReadableCache +where + K: Eq + Hash, + V: Clone, +{ + /// Fetch the value associated with the given key from the cache. If the key is not present in + /// the cache, return `None`. + async fn get(&self, key: &K) -> Option; + + /// Check if the cache contains the given key. + async fn contains(&self, key: &K) -> bool; +} + +/// A writable cache that can be written to asynchronously. +/// +/// Note that this trait does not require the cache to support manual eviction of entries. Many +/// caches only support automatic eviction (such as LRU eviction), and this trait is designed to +/// support those caches as well. +#[expect(async_fn_in_trait)] +pub trait AsyncWritableCache +where + K: Eq + Hash, + V: Clone, + E: std::error::Error, +{ + /// Insert the given value into the cache with the given key. + async fn insert(&self, key: &K, value: V) -> Result<(), E>; +} diff --git a/lib/io.rs b/lib/io.rs new file mode 100644 index 0000000..95a5d97 --- /dev/null +++ b/lib/io.rs @@ -0,0 +1,17 @@ +//! Random IO utilities + +use std::path::Path; + +/// Remove all files and directories in the given directory, but not the directory itself. +pub async fn remove_dir_contents(path: &Path) -> std::io::Result<()> { + let mut entries = tokio::fs::read_dir(path).await?; + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if tokio::fs::metadata(&path).await?.is_dir() { + tokio::fs::remove_dir_all(path).await?; + } else { + tokio::fs::remove_file(path).await?; + } + } + Ok(()) +} diff --git a/lib/lib.rs b/lib/lib.rs new file mode 100644 index 0000000..f7388bd --- /dev/null +++ b/lib/lib.rs @@ -0,0 +1,5 @@ +//! git-fs shared library. + +/// Caching primitives for git-fs. +pub mod cache; +pub mod io;
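Putting the pieces together, an end-to-end usage sketch of the cache through the two traits. The crate path `git_fs`, the `u64` key type, the temporary directory, and the 64 MiB budget are all assumptions for illustration; the single type parameter on `FileCache` is inferred from the impl blocks above.

use git_fs::cache::{
    fcache::FileCache,
    traits::{AsyncReadableCache as _, AsyncWritableCache as _},
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 64 MiB soft budget; the directory is created and marked if it does not exist.
    let cache: FileCache<u64> = FileCache::new(
        std::path::Path::new("/tmp/gitfs-cache-demo"),
        64 * 1024 * 1024,
    )
    .await?;

    cache.insert(&1, b"hello".to_vec()).await?;
    assert!(cache.contains(&1).await);
    assert_eq!(cache.get(&1).await.as_deref(), Some(&b"hello"[..]));

    // Dropping the cache removes its on-disk directory.
    Ok(())
}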