From abff99cbec637c3324301a89eb3fa451d180a5f9 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Fri, 13 Feb 2026 10:51:19 -0800 Subject: [PATCH 01/15] Set up local caching --- Cargo.lock | 21 +++- Cargo.toml | 4 + lib/cache/eviction/lru.rs | 210 ++++++++++++++++++++++++++++++++++++++ lib/cache/eviction/mod.rs | 2 + lib/cache/fcache.rs | 106 +++++++++++++++++++ lib/cache/mod.rs | 3 + lib/cache/traits.rs | 86 ++++++++++++++++ lib/io.rs | 17 +++ lib/lib.rs | 9 ++ 9 files changed, 457 insertions(+), 1 deletion(-) create mode 100644 lib/cache/eviction/lru.rs create mode 100644 lib/cache/eviction/mod.rs create mode 100644 lib/cache/fcache.rs create mode 100644 lib/cache/mod.rs create mode 100644 lib/cache/traits.rs create mode 100644 lib/io.rs create mode 100644 lib/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 577f7b3..ebbe571 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -574,6 +574,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -767,6 +773,7 @@ dependencies = [ "dirs", "fuser", "futures", + "hashlink", "http", "inquire", "libc", @@ -835,7 +842,7 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ - "foldhash", + "foldhash 0.1.5", ] [[package]] @@ -843,6 +850,18 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "foldhash 0.2.0", +] + +[[package]] +name = "hashlink" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea0b22561a9c04a7cb1a302c013e0259cd3b4bb619f145b32f72b8b4bcbed230" +dependencies = [ + "hashbrown 0.16.1", +] [[package]] name = "heck" diff --git a/Cargo.toml b/Cargo.toml index 9256d6e..b6fe250 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,9 @@ license = "MIT" repository = "https://github.com/mesa-dot-dev/git-fs" authors = ["Marko Vejnovic "] +[lib] +path = "lib/lib.rs" + [dependencies] clap = { version = "4.5.54", features = ["derive", "env"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } @@ -47,6 +50,7 @@ opentelemetry = { version = "0.29" } opentelemetry_sdk = { version = "0.29", features = ["rt-tokio"] } opentelemetry-otlp = { version = "0.29", default-features = false, features = ["http-proto", "trace", "reqwest-blocking-client"] } tracing-opentelemetry = { version = "0.30" } +hashlink = "0.11.0" [features] default = [] diff --git a/lib/cache/eviction/lru.rs b/lib/cache/eviction/lru.rs new file mode 100644 index 0000000..7e21cda --- /dev/null +++ b/lib/cache/eviction/lru.rs @@ -0,0 +1,210 @@ +//! Implements the LRU eviction policy. + +use std::{collections::HashSet, future::Future, hash::Hash, pin::Pin, sync::Arc}; + +use hashlink::LinkedHashSet; +use tokio::{sync::mpsc::Receiver, task::JoinHandle}; + +/// A trait for deleting keys from the cache. This is used by the LRU eviction tracker to delete +/// keys when they are evicted. +pub trait Deleter: Send + 'static { + /// Delete the given keys from the cache. 
The keys are guaranteed to be in the order of + /// eviction. You absolutely MUST delete the keys in the order they are given, otherwise the + /// LRU eviction tracker will get very confused and break. + fn delete<'a>(&mut self, keys: impl Iterator) + where + K: 'a; +} + +/// Messages sent to the LRU eviction tracker worker. +#[derive(Debug, Clone)] +enum Message { + /// Notify the LRU eviction tracker that the given key was accessed. + Accessed(K), + /// Request an eviction set of the given size. + Evict(usize), + /// Notify the LRU eviction tracker that a given key was inserted. + Inserted(K), +} + +#[derive(Debug)] +struct LruProcessingTask> { + receiver: Receiver>, + + /// The ordered set of keys, ordered according to the last-used policy. + ordered_key_set: LinkedHashSet, + + /// Tracks which keys are currently considered evicted. Read the long explanation in + /// service_message for why this is necessary. + evicted_keys: HashSet, + + /// The deleter to call when we need to evict keys. + deleter: D, +} + +impl> LruProcessingTask { + fn new(deleter: D, receiver: Receiver>) -> Self { + Self { + receiver, + ordered_key_set: LinkedHashSet::new(), + evicted_keys: HashSet::new(), + deleter, + } + } + + fn spawn_task(deleter: D, receiver: Receiver>) -> JoinHandle<()> { + // TODO(markovejnovic): This should have a best-effort drop. + tokio::spawn(async move { + let mut task = Self::new(deleter, receiver); + task.work().await; + }) + } + + async fn work(&mut self) { + while let Some(msg) = self.receiver.recv().await + && self.service_message(msg).await + {} + } + + /// Returns true if the task should continue working. + #[must_use] + fn service_message( + &mut self, + message: Message, + ) -> Pin + Send + '_>> { + Box::pin(async move { + match message { + Message::Accessed(k) => { + if self.evicted_keys.contains(&k) { + // This is a ghost access, happens when a client accesses a key that may not + // have been cleaned up yet. Just ignore it. + return true; + } + + self.reposition_existing_key(k); + } + Message::Evict(max_count) => { + // Before we send off the eviction set, we actually need to remove the evicted + // keys from the pending queue. Read the following for an explanation. + // + // The client may access a key AFTER an eviction notice. Until the eviction + // notice is processed by the client, the client will have the key available to + // it. + // + // Consequently, the client may very well access the key after it sent out an + // eviction notice, but before the eviction notice is processed. This will + // result in the key being added as an access message after the eviction + // notice, but before the eviction set is sent to the client. + // + // The problem is, the LruProcessingTask has no way of knowing whether the + // message the client sent is a stale message, or a legit message. + // + // Consider the following queue state in between two loop iterations of the + // LruProcessingTask: + // A -- access + // B -- insert + // E -- eviction request + // C -- the client thread which made the request + // + // A1 A2 A3 E1 A1 B1 -- after this point, no clients can access A1 + // C1 C1 C1 C1 C2 C1 ---------------------------------------------> time + // + // The scenario here is: + // - A1, A2, A3 are used by the clients. + // - C1 wants to add B1, but it doesn't have enough space, so it sends an + // eviction request. + // - In parallel, C2 accesses A1, which is completely legal. 
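+                    //     (In this example A1 must already be treated as evicted, even though
+                    //     an Accessed(A1) message is still sitting in the queue behind E1.)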
+ // + // The result is that our queue has A1, even after we sent out the eviction + // request, and we have no way of knowing whether A1 is a stale message or not. + // + // To prevent this whole "is this thing stale or not" problem, we need to + // "erase" keys evicted in the future. Unfortunately, you can't mutate queues, + // so the best we can do is simply mark the stupid keys as "evicted" and ignore + // any accesses from these keys until they're marked as "inserted" again. + // + // We cannot mutate the queue to mark messages as stale, so the best we can do + // is mark the keys as "evicted" and ignore any accesses from these keys until + // they're marked as "inserted" again. + { + let take_count = self.ordered_key_set.len().min(max_count); + let eviction_set_it = self.ordered_key_set.iter().take(take_count); + self.evicted_keys.extend(eviction_set_it.clone()); + self.deleter.delete(eviction_set_it); + for _ in 0..take_count { + if let None = self.ordered_key_set.pop_front() { + break; + } + } + } + } + Message::Inserted(k) => { + debug_assert!(!self.ordered_key_set.contains(&k)); + + // If the key has been evicted, but is now inserted, that means the key is no + // longer stale. + if self.evicted_keys.contains(&k) { + self.evicted_keys.remove(&k); + } + + self.ordered_key_set.insert(k); + } + } + + true + }) + } + + fn reposition_existing_key(&mut self, key: K) { + debug_assert!(self.ordered_key_set.contains(&key)); + + self.ordered_key_set.remove(&key); + self.ordered_key_set.insert(key); + } +} + +/// An LRU eviction tracker. This is used to track the least recently used keys in the cache, and +/// to evict keys when necessary. +#[derive(Debug, Clone)] +pub struct LruEvictionTracker { + worker_message_sender: tokio::sync::mpsc::Sender>, + _worker: Arc>, +} + +impl LruEvictionTracker { + /// Spawn a new LRU eviction tracker with the given deleter and channel size. + pub fn spawn>(deleter: D, channel_size: usize) -> Self { + let (tx, rx) = tokio::sync::mpsc::channel(channel_size); + + Self { + worker_message_sender: tx, + _worker: Arc::new(LruProcessingTask::spawn_task(deleter, rx)), + } + } + + /// Notify the LRU eviction tracker that the given key was inserted. + /// + /// You MUST call this method for every new key that is inserted into the cache. + pub async fn insert(&mut self, key: K) { + self.send_msg(Message::Inserted(key)).await; + } + + /// Notify the LRU eviction tracker that the given key was accessed. + /// + /// You MUST call this method for every read or update to a key. + pub async fn access(&mut self, key: K) { + self.send_msg(Message::Accessed(key)).await; + } + + /// Cull the least recently used keys with the given deletion method. + pub async fn cull(&mut self, max_count: usize) { + self.send_msg(Message::Evict(max_count)).await; + } + + /// Send a message to the worker. This is a helper method to reduce code duplication. + async fn send_msg(&mut self, message: Message) { + if let Err(_) = self.worker_message_sender.send(message.clone()).await { + unreachable!(); + } + } +} diff --git a/lib/cache/eviction/mod.rs b/lib/cache/eviction/mod.rs new file mode 100644 index 0000000..94ca450 --- /dev/null +++ b/lib/cache/eviction/mod.rs @@ -0,0 +1,2 @@ +/// Different types of eviction policies for any cache. +pub mod lru; diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs new file mode 100644 index 0000000..388d591 --- /dev/null +++ b/lib/cache/fcache.rs @@ -0,0 +1,106 @@ +//! File-based cache. 
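+//!
+//! A minimal usage sketch; the key type, path, and crate path below are illustrative only:
+//!
+//! ```ignore
+//! use std::path::Path;
+//! use git_fs::cache::{fcache::FileCache, traits::AsyncReadableCache};
+//!
+//! # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
+//! // Reads go through the AsyncReadableCache impl below; a miss simply returns None.
+//! let cache: FileCache<u64> = FileCache::new(Path::new("/tmp/gitfs-cache")).await?;
+//! assert_eq!(cache.get(&42).await, None);
+//! # Ok(())
+//! # }
+//! ```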
+ +use std::{ + hash::Hash, + path::{Path, PathBuf}, + sync::atomic::AtomicUsize, +}; + +use crate::{cache::traits::AsyncReadableCache, io}; +use thiserror::Error; + +use tokio::io::AsyncReadExt; + +#[derive(Debug, Error)] +pub enum InvalidRootPathError { + #[error("Root path is not a directory: {0}")] + NotADirectory(PathBuf), + + #[error("Root path appears to contain data stemming from sources different to this app: {0}")] + RootPathUnsafeCache(PathBuf), + + #[error("IO error while accessing root path: {0}")] + Io(#[from] std::io::Error), +} + +pub struct FileCache { + root_path: PathBuf, + map: scc::HashMap, + file_generator: AtomicUsize, +} + +impl FileCache { + // Dangerous: Changing this constant may cause the program to treat existing cache directories + // as invalid, and thus provide a worse user experience. Do not change this unless you have a + // very good reason to do so. Changing this will break backwards compatibility with existing + // cache directories. + const GITFS_MARKER_FILE: &'static str = ".gitfs_cache"; + + /// Try to create a new file cache at the given path. + /// + /// If the path exists, it must either be an empty directory, or a directory which was + /// previously used as a cache for this program. + pub async fn new(file_path: &Path) -> Result { + let mut pbuf = match tokio::fs::canonicalize(file_path).await { + Ok(mut p) => { + if !tokio::fs::metadata(&p).await?.is_dir() { + return Err(InvalidRootPathError::NotADirectory(p)); + } + + let mut entries = tokio::fs::read_dir(&p).await?; + let is_empty = entries.next_entry().await?.is_none(); + + p.push(Self::GITFS_MARKER_FILE); + let marker_exists = tokio::fs::try_exists(&p).await?; + p.pop(); + + if !(is_empty || marker_exists) { + return Err(InvalidRootPathError::RootPathUnsafeCache(p)); + } + + io::remove_dir_contents(&p).await?; + + Ok(p) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + tokio::fs::create_dir_all(file_path).await?; + tokio::fs::canonicalize(file_path).await + } + Err(e) => return Err(e.into()), + }?; + + // Create marker file so that subsequent restarts of this application gracefully handle the + // existing cache directory. + pbuf.push(Self::GITFS_MARKER_FILE); + tokio::fs::OpenOptions::new() + .create(true) + .write(true) + .open(&pbuf) + .await?; + pbuf.pop(); + + Ok(Self { + root_path: pbuf, + map: scc::HashMap::new(), + file_generator: AtomicUsize::new(0), + }) + } +} + +impl AsyncReadableCache> for FileCache { + async fn get(&self, key: &K) -> Option> { + let mut file = { + let entry = self.map.get_async(key).await?; + let path = self.root_path.join(entry.get().to_string()); + tokio::fs::File::open(&path).await.ok()? 
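+            // (This block scopes the map entry guard so it is dropped before the potentially
+            // slow read below.)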
+ }; + + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await.ok()?; + Some(buf) + } + + async fn contains(&self, key: &K) -> bool { + self.map.contains_async(key).await + } +} diff --git a/lib/cache/mod.rs b/lib/cache/mod.rs new file mode 100644 index 0000000..1bb3741 --- /dev/null +++ b/lib/cache/mod.rs @@ -0,0 +1,3 @@ +pub mod eviction; +pub mod fcache; +pub mod traits; diff --git a/lib/cache/traits.rs b/lib/cache/traits.rs new file mode 100644 index 0000000..8e167be --- /dev/null +++ b/lib/cache/traits.rs @@ -0,0 +1,86 @@ +use std::{future::Future, hash::Hash, pin::Pin}; + +pub trait ReadableCache +where + K: Eq + Hash, + V: Clone, +{ + type Fut<'a, T: 'a>: Future + 'a + where + Self: 'a, + K: 'a, + V: 'a; + + fn get<'a>(&'a self, key: &'a K) -> Self::Fut<'a, Option> + where + V: 'a; + fn contains<'a>(&'a self, key: &'a K) -> Self::Fut<'a, bool> + where + V: 'a; +} + +pub trait SyncReadableCache +where + K: Eq + Hash, + V: Clone, +{ + fn get(&self, key: &K) -> Option; + fn contains(&self, key: &K) -> bool; +} + +pub trait AsyncReadableCache +where + K: Eq + Hash, + V: Clone, +{ + async fn get(&self, key: &K) -> Option; + async fn contains(&self, key: &K) -> bool; +} + +impl> !SyncReadableCache for T {} + +impl> ReadableCache for T { + type Fut<'a, O: 'a> + = Pin + 'a>> + where + Self: 'a, + K: 'a, + V: 'a; + + fn get<'a>(&'a self, key: &'a K) -> Self::Fut<'a, Option> + where + V: 'a, + { + Box::pin(AsyncReadableCache::get(self, key)) + } + + fn contains<'a>(&'a self, key: &'a K) -> Self::Fut<'a, bool> + where + V: 'a, + { + Box::pin(AsyncReadableCache::contains(self, key)) + } +} + +impl> ReadableCache for T { + type Fut<'a, O: 'a> + = std::future::Ready + where + Self: 'a, + K: 'a, + V: 'a; + + fn get<'a>(&'a self, key: &'a K) -> Self::Fut<'a, Option> + where + V: 'a, + { + std::future::ready(SyncReadableCache::get(self, key)) + } + + fn contains<'a>(&'a self, key: &'a K) -> Self::Fut<'a, bool> + where + V: 'a, + { + std::future::ready(SyncReadableCache::contains(self, key)) + } +} diff --git a/lib/io.rs b/lib/io.rs new file mode 100644 index 0000000..95a5d97 --- /dev/null +++ b/lib/io.rs @@ -0,0 +1,17 @@ +//! Random IO utilities + +use std::path::Path; + +/// Remove all files and directories in the given directory, but not the directory itself. +pub async fn remove_dir_contents(path: &Path) -> std::io::Result<()> { + let mut entries = tokio::fs::read_dir(path).await?; + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if tokio::fs::metadata(&path).await?.is_dir() { + tokio::fs::remove_dir_all(path).await?; + } else { + tokio::fs::remove_file(path).await?; + } + } + Ok(()) +} diff --git a/lib/lib.rs b/lib/lib.rs new file mode 100644 index 0000000..058aae4 --- /dev/null +++ b/lib/lib.rs @@ -0,0 +1,9 @@ +//! git-fs shared library. + +#![feature(negative_impls)] +#![feature(with_negative_coherence)] +#![feature(fn_traits)] + +/// Caching primitives for git-fs. 
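+/// See `cache::traits` for the cache access traits and `cache::fcache` for the file-backed
+/// implementation built on top of them.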
+pub mod cache; +pub mod io; From b5594f97b47cdb8b444fb754d251a70be2e0a371 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Fri, 13 Feb 2026 11:01:17 -0800 Subject: [PATCH 02/15] hookup LRU cache to fcache.rs --- lib/cache/eviction/lru.rs | 8 ++++---- lib/cache/fcache.rs | 41 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/lib/cache/eviction/lru.rs b/lib/cache/eviction/lru.rs index 7e21cda..0faa7fc 100644 --- a/lib/cache/eviction/lru.rs +++ b/lib/cache/eviction/lru.rs @@ -185,24 +185,24 @@ impl LruEvictionTracker { /// Notify the LRU eviction tracker that the given key was inserted. /// /// You MUST call this method for every new key that is inserted into the cache. - pub async fn insert(&mut self, key: K) { + pub async fn insert(&self, key: K) { self.send_msg(Message::Inserted(key)).await; } /// Notify the LRU eviction tracker that the given key was accessed. /// /// You MUST call this method for every read or update to a key. - pub async fn access(&mut self, key: K) { + pub async fn access(&self, key: K) { self.send_msg(Message::Accessed(key)).await; } /// Cull the least recently used keys with the given deletion method. - pub async fn cull(&mut self, max_count: usize) { + pub async fn cull(&self, max_count: usize) { self.send_msg(Message::Evict(max_count)).await; } /// Send a message to the worker. This is a helper method to reduce code duplication. - async fn send_msg(&mut self, message: Message) { + async fn send_msg(&self, message: Message) { if let Err(_) = self.worker_message_sender.send(message.clone()).await { unreachable!(); } diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs index 388d591..6c04d5d 100644 --- a/lib/cache/fcache.rs +++ b/lib/cache/fcache.rs @@ -6,11 +6,35 @@ use std::{ sync::atomic::AtomicUsize, }; -use crate::{cache::traits::AsyncReadableCache, io}; +use crate::{ + cache::{ + eviction::lru::{Deleter, LruEvictionTracker}, + traits::AsyncReadableCache, + }, + io, +}; use thiserror::Error; use tokio::io::AsyncReadExt; +struct LruEntryDeleter; + +impl LruEntryDeleter { + fn new() -> Self { + Self + } +} + +impl Deleter for LruEntryDeleter { + fn delete<'a>(&mut self, keys: impl Iterator) + where + K: 'a, + { + // TODO(markovejnovic): Implement this. + unimplemented!() + } +} + #[derive(Debug, Error)] pub enum InvalidRootPathError { #[error("Root path is not a directory: {0}")] @@ -27,9 +51,15 @@ pub struct FileCache { root_path: PathBuf, map: scc::HashMap, file_generator: AtomicUsize, + lru_tracker: LruEvictionTracker, } -impl FileCache { +impl FileCache { + // How many cache entries to evict at most in a single batch. + // + // Not really sure how to determine this number. + const LRU_EVICTION_MAX_BATCH_SIZE: usize = 32; + // Dangerous: Changing this constant may cause the program to treat existing cache directories // as invalid, and thus provide a worse user experience. Do not change this unless you have a // very good reason to do so. 
Changing this will break backwards compatibility with existing @@ -83,17 +113,22 @@ impl FileCache { root_path: pbuf, map: scc::HashMap::new(), file_generator: AtomicUsize::new(0), + lru_tracker: LruEvictionTracker::spawn( + LruEntryDeleter::new(), + Self::LRU_EVICTION_MAX_BATCH_SIZE, + ), }) } } -impl AsyncReadableCache> for FileCache { +impl AsyncReadableCache> for FileCache { async fn get(&self, key: &K) -> Option> { let mut file = { let entry = self.map.get_async(key).await?; let path = self.root_path.join(entry.get().to_string()); tokio::fs::File::open(&path).await.ok()? }; + self.lru_tracker.access(*key).await; let mut buf = Vec::new(); file.read_to_end(&mut buf).await.ok()?; From 26ecc8006789ae143421aa5d8dc17d10619c2226 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Fri, 13 Feb 2026 14:15:38 -0800 Subject: [PATCH 03/15] before claude --- Cargo.lock | 1 + Cargo.toml | 1 + lib/cache/fcache.rs | 284 ++++++++++++++++++++++++++++++++++++++++---- lib/cache/traits.rs | 141 ++++++++++++++++++++++ 4 files changed, 402 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ebbe571..93ac821 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -788,6 +788,7 @@ dependencies = [ "reqwest-middleware", "rustc-hash", "scc", + "scopeguard", "secrecy", "self_update", "semver", diff --git a/Cargo.toml b/Cargo.toml index b6fe250..5ea9bb9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ opentelemetry_sdk = { version = "0.29", features = ["rt-tokio"] } opentelemetry-otlp = { version = "0.29", default-features = false, features = ["http-proto", "trace", "reqwest-blocking-client"] } tracing-opentelemetry = { version = "0.30" } hashlink = "0.11.0" +scopeguard = "1.2.0" [features] default = [] diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs index 6c04d5d..214cad5 100644 --- a/lib/cache/fcache.rs +++ b/lib/cache/fcache.rs @@ -1,56 +1,121 @@ -//! File-based cache. +//! File-based MT-safe, async-safe data cache. use std::{ + fmt::Debug, hash::Hash, path::{Path, PathBuf}, - sync::atomic::AtomicUsize, + sync::{Arc, atomic::AtomicUsize}, }; +use tracing::error; + use crate::{ cache::{ eviction::lru::{Deleter, LruEvictionTracker}, - traits::AsyncReadableCache, + traits::{AsyncReadableCache, AsyncWritableCache}, }, io, }; use thiserror::Error; -use tokio::io::AsyncReadExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; -struct LruEntryDeleter; +struct LruEntryDeleter { + file_cache: Arc>, +} -impl LruEntryDeleter { - fn new() -> Self { - Self +impl LruEntryDeleter { + fn new(file_cache: Arc>) -> Self { + Self { file_cache } } } -impl Deleter for LruEntryDeleter { +impl Deleter for LruEntryDeleter { + /// The LRU cache will call this method when it wants to evict keys. fn delete<'a>(&mut self, keys: impl Iterator) where K: 'a, { - // TODO(markovejnovic): Implement this. - unimplemented!() + for key in keys { + // Deleting keys should be atomic from scc::HashMap, which means that all subsequent + // lookups on the same key will fail, and thus we'll be a-ok. + if let Some(entry) = self.file_cache.map.remove_sync(key) { + // On the other hand, deleting the file may fail for a variety of reasons, but + // that's all mostly managed by the delete_entry_from_disk_sync method, which + // gracefully handles the case where the file is already deleted. + self.file_cache.delete_entry_from_disk_sync(&entry.1); + } + } } } +/// Error thrown during construction of a FileCache which describes why the given root path is +/// invalid. 
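+///
+/// Every variant here is produced by [`FileCache::new`] while it validates and prepares the
+/// cache root directory.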
#[derive(Debug, Error)] pub enum InvalidRootPathError { + /// The root path exists but isn't a directory. #[error("Root path is not a directory: {0}")] NotADirectory(PathBuf), + /// The root path exists and is a directory, but it isn't empty and the data in it appears to + /// come from a different source than this application. #[error("Root path appears to contain data stemming from sources different to this app: {0}")] RootPathUnsafeCache(PathBuf), + /// An IO error occurred while trying to access the root path. #[error("IO error while accessing root path: {0}")] Io(#[from] std::io::Error), } +/// Error thrown during insertion into the cache. +#[derive(Debug, Error)] +pub enum CacheWriteError { + /// An IO error occurred while trying to write the new value to disk. + #[error("IO error while inserting into cache: {0}")] + Io(#[from] std::io::Error), +} + +#[derive(Debug, Clone, Copy)] +struct CacheMapEntry { + /// A unique ID representing the name of the file on disk where the value for this cache entry + /// is stored. This is just an integer that is used to generate the file path for the cache + /// entry. + fid: usize, + + /// The size of the value for this cache entry in bytes. This is used to keep track of the + /// total size of the cache, and to evict entries when necessary. + size_bytes: usize, +} + +/// A general-purpose file-backed cache. This cache is designed to store arbitrary byte values on +/// disk, and to retrieve them later using a key. +/// +/// This cache is considered thread-safe and async-safe. pub struct FileCache { + /// The root path for the cache. This is the directory where all cache files will be stored. root_path: PathBuf, - map: scc::HashMap, + + /// The main map of the cache. This maps keys to file IDs, which are just integers that + /// correspond to file paths on disk. The file ID is used to generate the file path for the + /// cache entry. + map: scc::HashMap, + + /// Generates unique file paths for new cache entries. This is just a counter that increments + /// for each new file, and the file ID is the value of the counter at the time of creation. file_generator: AtomicUsize, + + /// Rough estimate of the current size of the cache in bytes. Not exact, but should be close + /// enough for eviction purposes. + size_bytes: AtomicUsize, + + /// The maximum size of the cache in bytes. This is used to determine when to evict entries + /// from the cache. This is a soft hint, rather than a hard limit, and the cache may + /// temporarily exceed this size during insertions, but it will try to evict entries as soon as + /// possible to get back under this limit. + max_size_bytes: usize, + + /// LRU eviction tracker. This is used to track the least recently used keys in the cache, and + /// to evict keys when necessary. lru_tracker: LruEvictionTracker, } @@ -66,11 +131,28 @@ impl FileCache { // cache directories. const GITFS_MARKER_FILE: &'static str = ".gitfs_cache"; + // The total maximum number of times we will try to read a "deleted" file before giving up and + // treating it as a hard error. + // + // See usage for reasoning on why this is necessary. + const MAX_READ_RETRY_COUNT: usize = 8; + /// Try to create a new file cache at the given path. /// - /// If the path exists, it must either be an empty directory, or a directory which was - /// previously used as a cache for this program. 
- pub async fn new(file_path: &Path) -> Result { + /// + /// # Args + /// + /// * `file_path` - If the path exists, it must either be an empty directory, or a directory + /// which was previously used as a cache for this program. + /// * `max_size_bytes` - The maximum size of the cache in bytes. This is used to determine when + /// to evict entries from the cache. This is a soft hint, rather than a + /// hard limit, and the cache may temporarily exceed this size during + /// insertions, but it will try to evict entries as soon as possible to + /// get back under this limit. + pub async fn new( + file_path: &Path, + max_size_bytes: usize, + ) -> Result { let mut pbuf = match tokio::fs::canonicalize(file_path).await { Ok(mut p) => { if !tokio::fs::metadata(&p).await?.is_dir() { @@ -113,29 +195,181 @@ impl FileCache { root_path: pbuf, map: scc::HashMap::new(), file_generator: AtomicUsize::new(0), + size_bytes: AtomicUsize::new(0), + max_size_bytes, lru_tracker: LruEvictionTracker::spawn( LruEntryDeleter::new(), Self::LRU_EVICTION_MAX_BATCH_SIZE, ), }) } + + /// Retrieve the correct path for the given file ID. + fn path_for(&self, fid: usize) -> PathBuf { + self.root_path.join(fid.to_string()) + } + + async fn create_file(&self, path: &Path) -> Result { + tokio::fs::OpenOptions::new() + .create(true) + .write(true) + .open(path) + .await + } + + /// Given an entry, remove it from the disk and update the size of the cache accordingly. + /// + /// Gracefully handles the case where the file doesn't exist, which can happen if the file was + /// deleted after we read the file ID from the map, but before we tried to delete + /// the file. + async fn delete_entry_from_disk_async(&self, entry: &CacheMapEntry) -> std::io::Result<()> { + match tokio::fs::remove_file(self.path_for(entry.fid)).await { + Ok(()) => { + self.size_bytes + .fetch_sub(entry.size_bytes, std::sync::atomic::Ordering::Relaxed); + Ok(()) + } + // Note that there's a race condition between the deleter and the mutator in + // AsyncWritableCache. Either one of these operations atomically grabs the file ID (so + // accessing the file ID is safe), but then they race to delete the file. + // + // Consequently, it's quite possible that the file is already deleted by the time we + // try to delete it, and that's already fine with us, so we just treat that case as a + // successful deletion. + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(e), + } + } + + /// Given an entry, remove it from the disk and update the size of the cache accordingly. + /// + /// Gracefully handles the case where the file doesn't exist, which can happen if the file was + /// deleted after we read the file ID from the map, but before we tried to delete + /// the file. + fn delete_entry_from_disk_sync(&self, entry: &CacheMapEntry) -> std::io::Result<()> { + match std::fs::remove_file(self.path_for(entry.fid)) { + Ok(()) => { + self.size_bytes + .fetch_sub(entry.size_bytes, std::sync::atomic::Ordering::Relaxed); + Ok(()) + } + // Note that there's a race condition between the deleter and the mutator in + // AsyncWritableCache. Either one of these operations atomically grabs the file ID (so + // accessing the file ID is safe), but then they race to delete the file. + // + // Consequently, it's quite possible that the file is already deleted by the time we + // try to delete it, and that's already fine with us, so we just treat that case as a + // successful deletion. 
+ Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(e), + } + } } -impl AsyncReadableCache> for FileCache { +impl AsyncReadableCache> for FileCache { async fn get(&self, key: &K) -> Option> { - let mut file = { - let entry = self.map.get_async(key).await?; - let path = self.root_path.join(entry.get().to_string()); - tokio::fs::File::open(&path).await.ok()? - }; - self.lru_tracker.access(*key).await; + for _ in 0..Self::MAX_READ_RETRY_COUNT { + // Grab the best-guess file ID for the given key. If this fails, then the key is not in + // the cache, and we can return `None`. + let entry = *(self.map.get_async(key).await?); + + // The next thing we need to do is try to open the file. This may fail for a plethora + // of reasons. + // + // TODO(markovejnovic): path_for allocates on heap, could be on stack. + let mut file = match tokio::fs::File::open(self.path_for(entry.fid)).await { + Ok(file) => file, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + // The file was not found. + // This may happen for two reasons: + // + // There is a bug in the code. + // + // The file was id was deleted after we read the file ID from the map on line: + // + // let fid: usize = *(self.map.get_async(key).await?); + // + // The file can get deleted as part of the insert procedure, or the deletion + // procedure, so we need to try to re-read the file. + continue; + } + Err(e) => { + error!(error = ?e, key = ?key, "IO error while opening file for cache key"); + + // This MIGHT be recoverable. The IO error may very well be transient, so we'll + // just say we don't have this value and then pray that the user will perform + // an .insert + return None; + } + }; - let mut buf = Vec::new(); - file.read_to_end(&mut buf).await.ok()?; - Some(buf) + let mut buf = Vec::new(); + file.read_to_end(&mut buf) + .await + .inspect_err(|e| { + error!(error = ?e, key = ?key, "IO error while reading file for cache key"); + }) + .ok()?; + self.lru_tracker.access(*key).await; + return Some(buf); + } + + error!(key = ?key, attempt_count = Self::MAX_READ_RETRY_COUNT, "could not find file."); + + // Again, might be recoverable if the user calls an .insert later on. + None } async fn contains(&self, key: &K) -> bool { self.map.contains_async(key).await } } + +impl AsyncWritableCache, CacheWriteError> + for FileCache +{ + async fn insert(&self, key: &K, value: Vec) -> Result<(), CacheWriteError> { + // Inserts are tricky. The first thing we'll do is find a new file handle for the new + // value. + let new_entry = CacheMapEntry { + fid: self + .file_generator + .fetch_add(1, std::sync::atomic::Ordering::Relaxed), + size_bytes: value.len(), + }; + + if self.size_bytes.load(std::sync::atomic::Ordering::Relaxed) + new_entry.size_bytes + > self.max_size_bytes + { + // We need to evict some entries before we can insert this new entry. Let's just evict + // a batch of entries. + self.lru_tracker + .cull(Self::LRU_EVICTION_MAX_BATCH_SIZE) + .await; + } + + let mut new_file = self.create_file(&self.path_for(new_entry.fid)).await?; + new_file.write_all(&value).await?; + + self.size_bytes + .fetch_add(new_entry.size_bytes, std::sync::atomic::Ordering::Relaxed); + + // Now we insert the new file ID into the map, and get the old file ID if it exists. + if let Some(old_entry) = self.map.upsert_async(*key, new_entry).await { + // If there was an old file ID, we need to delete the old file and notify the LRU + // tracker that the key was accessed. 
+ self.lru_tracker.access(*key).await; + + // TODO(markovejnovic): Could stack allocate the path. + // Note that the file already may be deleted at this point -- the LRU tracker deleter + // may have already deleted this file, so if the file doesn't exist, that's already + // fine with us. + self.delete_entry_from_disk_async(&old_entry).await?; + } else { + self.lru_tracker.insert(*key).await; + } + + // Epic, we did it. + Ok(()) + } +} diff --git a/lib/cache/traits.rs b/lib/cache/traits.rs index 8e167be..0cb01db 100644 --- a/lib/cache/traits.rs +++ b/lib/cache/traits.rs @@ -1,39 +1,69 @@ use std::{future::Future, hash::Hash, pin::Pin}; +/// A readable cache is a cache that can be read from, but not necessarily written to. +/// +/// This trait is designed to be a generic trait for any cache. When using a cache as a generic +/// parameter, this is the correct trait to use. +/// +/// You should avoid manually implementing this trait, and prefer implementing the +/// `SyncReadableCache` or `AsyncReadableCache` traits instead, which will automatically implement +/// this trait for you. pub trait ReadableCache where K: Eq + Hash, V: Clone, { + /// The future type returned by the `get` and `contains` methods. This is used to allow + /// zero-cost synchronous and asynchronous implementations of the `ReadableCache` trait. type Fut<'a, T: 'a>: Future + 'a where Self: 'a, K: 'a, V: 'a; + /// Fetch the value associated with the given key from the cache. If the key is not present in + /// the cache, return `None`. fn get<'a>(&'a self, key: &'a K) -> Self::Fut<'a, Option> where V: 'a; + + /// Check if the cache contains the given key. fn contains<'a>(&'a self, key: &'a K) -> Self::Fut<'a, bool> where V: 'a; } +/// Convenience trait for implementing `ReadableCache` with synchronous cache accesses. +/// +/// Avoid using this cache as a generic parameter. `ReadableCache` is the correct trait to use for +/// generic parameters, and will support both synchronous and asynchronous implementations of the +/// cache. pub trait SyncReadableCache where K: Eq + Hash, V: Clone, { + /// See `ReadableCache::get`. fn get(&self, key: &K) -> Option; + + /// See `ReadableCache::contains`. fn contains(&self, key: &K) -> bool; } +/// Convenience trait for implementing `ReadableCache` with asynchronous cache accesses. +/// +/// Avoid using this cache as a generic parameter. `ReadableCache` is the correct trait to use for +/// generic parameters, and will support both synchronous and asynchronous implementations of the +/// cache. pub trait AsyncReadableCache where K: Eq + Hash, V: Clone, { + /// See `ReadableCache::get`. async fn get(&self, key: &K) -> Option; + + /// See `ReadableCache::contains`. async fn contains(&self, key: &K) -> bool; } @@ -84,3 +114,114 @@ impl> ReadableCache for std::future::ready(SyncReadableCache::contains(self, key)) } } + +/// A writable cache is a cache that can be written to, but not necessarily read from. +/// +/// Note that this trait also does not require the cache support manual eviction of entries. Many +/// caches only support automatic eviction (such as LRU eviction), and this trait is designed to +/// support those caches as well. +/// +/// This trait is designed to be a generic trait for any cache. When using a cache as a generic +/// parameter, this is the correct trait to use. 
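+///
+/// For example, a cache-warming helper can be written against `C: WritableCache<K, V, E>` and
+/// simply `.await` the future returned by `insert`, regardless of whether the underlying
+/// implementation is synchronous or asynchronous.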
+/// +/// You should avoid manually implementing this trait, and prefer implementing the +/// `SyncWritableCache` or `AsyncWritableCache` traits instead, which will automatically implement +/// this trait for you. +pub trait WritableCache +where + K: Eq + Hash, + V: Clone, + E: std::error::Error, +{ + /// The future type returned by the `insert` method. This is used to allow zero-cost + /// synchronous and asynchronous implementations of the `WritableCache` trait. + type Fut<'a, T: 'a>: Future + 'a + where + Self: 'a, + K: 'a, + V: 'a, + E: 'a; + + /// Insert the given value into the cache with the given key. If the key was already present in + /// the cache, return the old value. Otherwise, return `None`. + fn insert<'a>(&'a self, key: &'a K, value: V) -> Self::Fut<'a, Result<(), E>> + where + V: 'a; +} + +/// Convenience trait for implementing `WritableCache` with synchronous cache accesses. +/// +/// Avoid using this cache as a generic parameter. `WritableCache` is the correct trait to use for +/// generic parameters, and will support both synchronous and asynchronous implementations of the +/// cache. +pub trait SyncWritableCache +where + K: Eq + Hash, + V: Clone, + E: std::error::Error, +{ + /// Insert the given value into the cache with the given key. If the key was already present in + /// the cache, return the old value. Otherwise, return `None`. + fn insert(&self, key: &K, value: V) -> Result<(), E>; +} + +/// Convenience trait for implementing `WritableCache` with asynchronous cache accesses. +/// +/// Avoid using this cache as a generic parameter. `WritableCache` is the correct trait to use for +/// generic parameters, and will support both synchronous and asynchronous implementations of the +/// cache. +pub trait AsyncWritableCache +where + K: Eq + Hash, + V: Clone, + E: std::error::Error, +{ + /// Insert the given value into the cache with the given key. If the key was already present in + /// the cache, return the old value. Otherwise, return `None`. + async fn insert(&self, key: &K, value: V) -> Result<(), E>; +} + +impl> + !SyncWritableCache for T +{ +} + +impl> + WritableCache for T +{ + type Fut<'a, O: 'a> + = Pin + 'a>> + where + Self: 'a, + K: 'a, + V: 'a, + E: 'a; + + fn insert<'a>(&'a self, key: &'a K, value: V) -> Self::Fut<'a, Result<(), E>> + where + V: 'a, + E: 'a, + { + Box::pin(AsyncWritableCache::insert(self, key, value)) + } +} + +impl> + WritableCache for T +{ + type Fut<'a, O: 'a> + = std::future::Ready + where + Self: 'a, + K: 'a, + V: 'a, + E: 'a; + + fn insert<'a>(&'a self, key: &'a K, value: V) -> Self::Fut<'a, Result<(), E>> + where + V: 'a, + E: 'a, + { + std::future::ready(SyncWritableCache::insert(self, key, value)) + } +} From a4bc26b3c4863a3b62aa4f21a2595d3a1dd6b785 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Fri, 13 Feb 2026 14:33:37 -0800 Subject: [PATCH 04/15] random lint --- lib/cache/eviction/lru.rs | 21 ++++++++++++++++----- lib/cache/mod.rs | 3 +++ lib/cache/traits.rs | 2 ++ 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/lib/cache/eviction/lru.rs b/lib/cache/eviction/lru.rs index 0faa7fc..287038d 100644 --- a/lib/cache/eviction/lru.rs +++ b/lib/cache/eviction/lru.rs @@ -35,7 +35,7 @@ struct LruProcessingTask> { ordered_key_set: LinkedHashSet, /// Tracks which keys are currently considered evicted. Read the long explanation in - /// service_message for why this is necessary. + /// `service_message` for why this is necessary. evicted_keys: HashSet, /// The deleter to call when we need to evict keys. 
@@ -132,14 +132,17 @@ impl> LruProcessingTask { - debug_assert!(!self.ordered_key_set.contains(&k)); + debug_assert!( + !self.ordered_key_set.contains(&k), + "key must not already exist in the ordered set when inserting" + ); // If the key has been evicted, but is now inserted, that means the key is no // longer stale. @@ -156,7 +159,10 @@ impl> LruProcessingTask LruEvictionTracker { /// Send a message to the worker. This is a helper method to reduce code duplication. async fn send_msg(&self, message: Message) { - if let Err(_) = self.worker_message_sender.send(message.clone()).await { + if self + .worker_message_sender + .send(message.clone()) + .await + .is_err() + { unreachable!(); } } diff --git a/lib/cache/mod.rs b/lib/cache/mod.rs index 1bb3741..e0c1c97 100644 --- a/lib/cache/mod.rs +++ b/lib/cache/mod.rs @@ -1,3 +1,6 @@ +/// Cache eviction policies. pub mod eviction; +/// File-backed cache implementation. pub mod fcache; +/// Cache traits for read and write operations. pub mod traits; diff --git a/lib/cache/traits.rs b/lib/cache/traits.rs index 0cb01db..9a3d7ac 100644 --- a/lib/cache/traits.rs +++ b/lib/cache/traits.rs @@ -55,6 +55,7 @@ where /// Avoid using this cache as a generic parameter. `ReadableCache` is the correct trait to use for /// generic parameters, and will support both synchronous and asynchronous implementations of the /// cache. +#[expect(async_fn_in_trait)] pub trait AsyncReadableCache where K: Eq + Hash, @@ -170,6 +171,7 @@ where /// Avoid using this cache as a generic parameter. `WritableCache` is the correct trait to use for /// generic parameters, and will support both synchronous and asynchronous implementations of the /// cache. +#[expect(async_fn_in_trait)] pub trait AsyncWritableCache where K: Eq + Hash, From f8c4952a5a6f82873c52099c2f58d1baff63eebc Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Fri, 13 Feb 2026 14:39:57 -0800 Subject: [PATCH 05/15] Get the fcache.rs compiling --- lib/cache/fcache.rs | 181 ++++++++++++++++++++------------------------ 1 file changed, 84 insertions(+), 97 deletions(-) diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs index 214cad5..521401b 100644 --- a/lib/cache/fcache.rs +++ b/lib/cache/fcache.rs @@ -18,15 +18,60 @@ use crate::{ }; use thiserror::Error; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; + +struct FileCacheShared { + root_path: PathBuf, + map: scc::HashMap, + size_bytes: AtomicUsize, +} + +impl FileCacheShared { + fn size(&self) -> usize { + self.size_bytes.load(std::sync::atomic::Ordering::Relaxed) + } + + fn path_for(&self, fid: usize) -> PathBuf { + self.root_path.join(fid.to_string()) + } + + /// Given an entry, remove it from the disk and update the size of the cache accordingly. + /// + /// Gracefully handles the case where the file doesn't exist, which can happen if the file was + /// deleted after we read the file ID from the map, but before we tried to delete + /// the file. 
+ async fn delete_entry_from_disk_async(&self, entry: &CacheMapEntry) -> std::io::Result<()> { + match tokio::fs::remove_file(self.path_for(entry.fid)).await { + Ok(()) => { + self.size_bytes + .fetch_sub(entry.size_bytes, std::sync::atomic::Ordering::Relaxed); + Ok(()) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(e), + } + } + + fn delete_entry_from_disk_sync(&self, entry: &CacheMapEntry) -> std::io::Result<()> { + match std::fs::remove_file(self.path_for(entry.fid)) { + Ok(()) => { + self.size_bytes + .fetch_sub(entry.size_bytes, std::sync::atomic::Ordering::Relaxed); + Ok(()) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(e), + } + } +} struct LruEntryDeleter { - file_cache: Arc>, + shared: Arc>, } impl LruEntryDeleter { - fn new(file_cache: Arc>) -> Self { - Self { file_cache } + fn new(shared: Arc>) -> Self { + Self { shared } } } @@ -37,19 +82,16 @@ impl Deleter for LruEntryDeleter K: 'a, { for key in keys { - // Deleting keys should be atomic from scc::HashMap, which means that all subsequent - // lookups on the same key will fail, and thus we'll be a-ok. - if let Some(entry) = self.file_cache.map.remove_sync(key) { - // On the other hand, deleting the file may fail for a variety of reasons, but - // that's all mostly managed by the delete_entry_from_disk_sync method, which - // gracefully handles the case where the file is already deleted. - self.file_cache.delete_entry_from_disk_sync(&entry.1); + if let Some(entry) = self.shared.map.remove_sync(key) + && let Err(e) = self.shared.delete_entry_from_disk_sync(&entry.1) + { + error!(error = ?e, "failed to delete evicted cache entry from disk"); } } } } -/// Error thrown during construction of a FileCache which describes why the given root path is +/// Error thrown during construction of a `FileCache` which describes why the given root path is /// invalid. #[derive(Debug, Error)] pub enum InvalidRootPathError { @@ -92,22 +134,12 @@ struct CacheMapEntry { /// /// This cache is considered thread-safe and async-safe. pub struct FileCache { - /// The root path for the cache. This is the directory where all cache files will be stored. - root_path: PathBuf, - - /// The main map of the cache. This maps keys to file IDs, which are just integers that - /// correspond to file paths on disk. The file ID is used to generate the file path for the - /// cache entry. - map: scc::HashMap, + shared: Arc>, /// Generates unique file paths for new cache entries. This is just a counter that increments /// for each new file, and the file ID is the value of the counter at the time of creation. file_generator: AtomicUsize, - /// Rough estimate of the current size of the cache in bytes. Not exact, but should be close - /// enough for eviction purposes. - size_bytes: AtomicUsize, - /// The maximum size of the cache in bytes. This is used to determine when to evict entries /// from the cache. This is a soft hint, rather than a hard limit, and the cache may /// temporarily exceed this size during insertions, but it will try to evict entries as soon as @@ -119,7 +151,7 @@ pub struct FileCache { lru_tracker: LruEvictionTracker, } -impl FileCache { +impl FileCache { // How many cache entries to evict at most in a single batch. // // Not really sure how to determine this number. @@ -143,12 +175,11 @@ impl FileCache { /// # Args /// /// * `file_path` - If the path exists, it must either be an empty directory, or a directory - /// which was previously used as a cache for this program. 
+ /// which was previously used as a cache for this program. /// * `max_size_bytes` - The maximum size of the cache in bytes. This is used to determine when - /// to evict entries from the cache. This is a soft hint, rather than a - /// hard limit, and the cache may temporarily exceed this size during - /// insertions, but it will try to evict entries as soon as possible to - /// get back under this limit. + /// to evict entries from the cache. This is a soft hint, rather than a hard limit, and the + /// cache may temporarily exceed this size during insertions, but it will try to evict entries + /// as soon as possible to get back under this limit. pub async fn new( file_path: &Path, max_size_bytes: usize, @@ -186,98 +217,53 @@ impl FileCache { pbuf.push(Self::GITFS_MARKER_FILE); tokio::fs::OpenOptions::new() .create(true) + .truncate(true) .write(true) .open(&pbuf) .await?; pbuf.pop(); - Ok(Self { + let shared = Arc::new(FileCacheShared { root_path: pbuf, map: scc::HashMap::new(), - file_generator: AtomicUsize::new(0), size_bytes: AtomicUsize::new(0), + }); + + Ok(Self { + shared: Arc::clone(&shared), + file_generator: AtomicUsize::new(0), max_size_bytes, lru_tracker: LruEvictionTracker::spawn( - LruEntryDeleter::new(), + LruEntryDeleter::new(shared), Self::LRU_EVICTION_MAX_BATCH_SIZE, ), }) } - /// Retrieve the correct path for the given file ID. - fn path_for(&self, fid: usize) -> PathBuf { - self.root_path.join(fid.to_string()) - } - async fn create_file(&self, path: &Path) -> Result { tokio::fs::OpenOptions::new() .create(true) + .truncate(true) .write(true) .open(path) .await } - - /// Given an entry, remove it from the disk and update the size of the cache accordingly. - /// - /// Gracefully handles the case where the file doesn't exist, which can happen if the file was - /// deleted after we read the file ID from the map, but before we tried to delete - /// the file. - async fn delete_entry_from_disk_async(&self, entry: &CacheMapEntry) -> std::io::Result<()> { - match tokio::fs::remove_file(self.path_for(entry.fid)).await { - Ok(()) => { - self.size_bytes - .fetch_sub(entry.size_bytes, std::sync::atomic::Ordering::Relaxed); - Ok(()) - } - // Note that there's a race condition between the deleter and the mutator in - // AsyncWritableCache. Either one of these operations atomically grabs the file ID (so - // accessing the file ID is safe), but then they race to delete the file. - // - // Consequently, it's quite possible that the file is already deleted by the time we - // try to delete it, and that's already fine with us, so we just treat that case as a - // successful deletion. - Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), - Err(e) => Err(e), - } - } - - /// Given an entry, remove it from the disk and update the size of the cache accordingly. - /// - /// Gracefully handles the case where the file doesn't exist, which can happen if the file was - /// deleted after we read the file ID from the map, but before we tried to delete - /// the file. - fn delete_entry_from_disk_sync(&self, entry: &CacheMapEntry) -> std::io::Result<()> { - match std::fs::remove_file(self.path_for(entry.fid)) { - Ok(()) => { - self.size_bytes - .fetch_sub(entry.size_bytes, std::sync::atomic::Ordering::Relaxed); - Ok(()) - } - // Note that there's a race condition between the deleter and the mutator in - // AsyncWritableCache. Either one of these operations atomically grabs the file ID (so - // accessing the file ID is safe), but then they race to delete the file. 
- // - // Consequently, it's quite possible that the file is already deleted by the time we - // try to delete it, and that's already fine with us, so we just treat that case as a - // successful deletion. - Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), - Err(e) => Err(e), - } - } } -impl AsyncReadableCache> for FileCache { +impl AsyncReadableCache> + for FileCache +{ async fn get(&self, key: &K) -> Option> { for _ in 0..Self::MAX_READ_RETRY_COUNT { // Grab the best-guess file ID for the given key. If this fails, then the key is not in // the cache, and we can return `None`. - let entry = *(self.map.get_async(key).await?); + let entry = *(self.shared.map.get_async(key).await?); // The next thing we need to do is try to open the file. This may fail for a plethora // of reasons. // // TODO(markovejnovic): path_for allocates on heap, could be on stack. - let mut file = match tokio::fs::File::open(self.path_for(entry.fid)).await { + let mut file = match tokio::fs::File::open(self.shared.path_for(entry.fid)).await { Ok(file) => file, Err(e) if e.kind() == std::io::ErrorKind::NotFound => { // The file was not found. @@ -321,11 +307,11 @@ impl AsyncReadableCache } async fn contains(&self, key: &K) -> bool { - self.map.contains_async(key).await + self.shared.map.contains_async(key).await } } -impl AsyncWritableCache, CacheWriteError> +impl AsyncWritableCache, CacheWriteError> for FileCache { async fn insert(&self, key: &K, value: Vec) -> Result<(), CacheWriteError> { @@ -338,9 +324,7 @@ impl AsyncWritableCache, CacheW size_bytes: value.len(), }; - if self.size_bytes.load(std::sync::atomic::Ordering::Relaxed) + new_entry.size_bytes - > self.max_size_bytes - { + if self.shared.size() + new_entry.size_bytes > self.max_size_bytes { // We need to evict some entries before we can insert this new entry. Let's just evict // a batch of entries. self.lru_tracker @@ -348,14 +332,17 @@ impl AsyncWritableCache, CacheW .await; } - let mut new_file = self.create_file(&self.path_for(new_entry.fid)).await?; + let mut new_file = self + .create_file(&self.shared.path_for(new_entry.fid)) + .await?; new_file.write_all(&value).await?; - self.size_bytes + self.shared + .size_bytes .fetch_add(new_entry.size_bytes, std::sync::atomic::Ordering::Relaxed); // Now we insert the new file ID into the map, and get the old file ID if it exists. - if let Some(old_entry) = self.map.upsert_async(*key, new_entry).await { + if let Some(old_entry) = self.shared.map.upsert_async(*key, new_entry).await { // If there was an old file ID, we need to delete the old file and notify the LRU // tracker that the key was accessed. self.lru_tracker.access(*key).await; @@ -364,7 +351,7 @@ impl AsyncWritableCache, CacheW // Note that the file already may be deleted at this point -- the LRU tracker deleter // may have already deleted this file, so if the file doesn't exist, that's already // fine with us. 
- self.delete_entry_from_disk_async(&old_entry).await?; + self.shared.delete_entry_from_disk_async(&old_entry).await?; } else { self.lru_tracker.insert(*key).await; } From fc8bb4757581708480e98c535acc60ed981c3379 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Fri, 13 Feb 2026 14:40:30 -0800 Subject: [PATCH 06/15] remove dumb stuff --- Cargo.lock | 1 - Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 93ac821..ebbe571 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -788,7 +788,6 @@ dependencies = [ "reqwest-middleware", "rustc-hash", "scc", - "scopeguard", "secrecy", "self_update", "semver", diff --git a/Cargo.toml b/Cargo.toml index 5ea9bb9..b6fe250 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,6 @@ opentelemetry_sdk = { version = "0.29", features = ["rt-tokio"] } opentelemetry-otlp = { version = "0.29", default-features = false, features = ["http-proto", "trace", "reqwest-blocking-client"] } tracing-opentelemetry = { version = "0.30" } hashlink = "0.11.0" -scopeguard = "1.2.0" [features] default = [] From 97c647bf6e312fb38701a0ae43b2683372cb7bf9 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Fri, 13 Feb 2026 17:03:44 -0800 Subject: [PATCH 07/15] address clanker comments --- lib/cache/eviction/lru.rs | 300 +++++++++++++++++++++++++------------- lib/cache/fcache.rs | 109 ++++++++------ 2 files changed, 265 insertions(+), 144 deletions(-) diff --git a/lib/cache/eviction/lru.rs b/lib/cache/eviction/lru.rs index 287038d..f821bc3 100644 --- a/lib/cache/eviction/lru.rs +++ b/lib/cache/eviction/lru.rs @@ -1,19 +1,68 @@ //! Implements the LRU eviction policy. -use std::{collections::HashSet, future::Future, hash::Hash, pin::Pin, sync::Arc}; +use std::{ + collections::HashSet, + future::Future, + hash::Hash, + pin::Pin, + sync::{Arc, OnceLock, atomic::AtomicU64}, +}; use hashlink::LinkedHashSet; use tokio::{sync::mpsc::Receiver, task::JoinHandle}; +#[derive(Debug)] +struct DeletionIndicator { + underlying: AtomicU64, +} + +impl DeletionIndicator { + /// Call to mark the start of a batch of deletions. + fn submit_batch(&self) { + self.underlying + .fetch_add(1 << 32, std::sync::atomic::Ordering::Relaxed); + } + + /// Call to mark a batch as being processed right now. + fn process_batch(&self, count: u32) { + self.underlying.fetch_add( + count as u64 - (1 << 32), + std::sync::atomic::Ordering::Relaxed, + ); + } + + /// Call to mark a deletion as being completed. + fn observe_deletion(&self) { + self.underlying + .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + } + + /// Check if there are any scheduled or active deletions. + fn have_pending_deletions(&self) -> bool { + self.underlying.load(std::sync::atomic::Ordering::Relaxed) == 0 + } + + /// Check if there are any scheduled batches of deletions. + fn have_pending_batches(&self) -> bool { + self.underlying.load(std::sync::atomic::Ordering::Relaxed) >= 1 << 32 + } +} + +impl Default for DeletionIndicator { + fn default() -> Self { + Self { + underlying: AtomicU64::new(0), + } + } +} + /// A trait for deleting keys from the cache. This is used by the LRU eviction tracker to delete /// keys when they are evicted. -pub trait Deleter: Send + 'static { +pub trait Deleter: Send + Clone + 'static { /// Delete the given keys from the cache. The keys are guaranteed to be in the order of /// eviction. You absolutely MUST delete the keys in the order they are given, otherwise the /// LRU eviction tracker will get very confused and break. 
- fn delete<'a>(&mut self, keys: impl Iterator) - where - K: 'a; + fn delete(&mut self, key: K) -> impl Future + Send; } /// Messages sent to the LRU eviction tracker worker. @@ -22,7 +71,7 @@ enum Message { /// Notify the LRU eviction tracker that the given key was accessed. Accessed(K), /// Request an eviction set of the given size. - Evict(usize), + Evict(u32), /// Notify the LRU eviction tracker that a given key was inserted. Inserted(K), } @@ -40,122 +89,132 @@ struct LruProcessingTask> { /// The deleter to call when we need to evict keys. deleter: D, + + /// Pointer into the shared deletion tracker. + shared: Arc, } impl> LruProcessingTask { - fn new(deleter: D, receiver: Receiver>) -> Self { + fn new(deleter: D, receiver: Receiver>, shared: Arc) -> Self { Self { receiver, ordered_key_set: LinkedHashSet::new(), evicted_keys: HashSet::new(), deleter, + shared, } } - fn spawn_task(deleter: D, receiver: Receiver>) -> JoinHandle<()> { + fn spawn_task( + deleter: D, + receiver: Receiver>, + shared: Arc, + ) -> JoinHandle<()> { // TODO(markovejnovic): This should have a best-effort drop. tokio::spawn(async move { - let mut task = Self::new(deleter, receiver); + let mut task = Self::new(deleter, receiver, shared); task.work().await; }) } async fn work(&mut self) { while let Some(msg) = self.receiver.recv().await - && self.service_message(msg).await + && self.service_message(msg) {} } /// Returns true if the task should continue working. #[must_use] - fn service_message( - &mut self, - message: Message, - ) -> Pin + Send + '_>> { - Box::pin(async move { - match message { - Message::Accessed(k) => { - if self.evicted_keys.contains(&k) { - // This is a ghost access, happens when a client accesses a key that may not - // have been cleaned up yet. Just ignore it. - return true; - } - - self.reposition_existing_key(k); + fn service_message(&mut self, message: Message) -> bool { + match message { + Message::Accessed(k) => { + if self.evicted_keys.contains(&k) { + // This is a ghost access, happens when a client accesses a key that may not + // have been cleaned up yet. Just ignore it. + return true; } - Message::Evict(max_count) => { - // Before we send off the eviction set, we actually need to remove the evicted - // keys from the pending queue. Read the following for an explanation. - // - // The client may access a key AFTER an eviction notice. Until the eviction - // notice is processed by the client, the client will have the key available to - // it. - // - // Consequently, the client may very well access the key after it sent out an - // eviction notice, but before the eviction notice is processed. This will - // result in the key being added as an access message after the eviction - // notice, but before the eviction set is sent to the client. - // - // The problem is, the LruProcessingTask has no way of knowing whether the - // message the client sent is a stale message, or a legit message. - // - // Consider the following queue state in between two loop iterations of the - // LruProcessingTask: - // A -- access - // B -- insert - // E -- eviction request - // C -- the client thread which made the request - // - // A1 A2 A3 E1 A1 B1 -- after this point, no clients can access A1 - // C1 C1 C1 C1 C2 C1 ---------------------------------------------> time - // - // The scenario here is: - // - A1, A2, A3 are used by the clients. - // - C1 wants to add B1, but it doesn't have enough space, so it sends an - // eviction request. - // - In parallel, C2 accesses A1, which is completely legal. 
- // - // The result is that our queue has A1, even after we sent out the eviction - // request, and we have no way of knowing whether A1 is a stale message or not. - // - // To prevent this whole "is this thing stale or not" problem, we need to - // "erase" keys evicted in the future. Unfortunately, you can't mutate queues, - // so the best we can do is simply mark the stupid keys as "evicted" and ignore - // any accesses from these keys until they're marked as "inserted" again. - // - // We cannot mutate the queue to mark messages as stale, so the best we can do - // is mark the keys as "evicted" and ignore any accesses from these keys until - // they're marked as "inserted" again. - { - let take_count = self.ordered_key_set.len().min(max_count); - let eviction_set_it = self.ordered_key_set.iter().take(take_count); - self.evicted_keys.extend(eviction_set_it.clone()); - self.deleter.delete(eviction_set_it); - for _ in 0..take_count { - if self.ordered_key_set.pop_front().is_none() { - break; - } - } + + self.reposition_existing_key(k); + } + Message::Evict(max_count) => { + // Before we send off the eviction set, we actually need to remove the evicted + // keys from the pending queue. Read the following for an explanation. + // + // The client may access a key AFTER an eviction notice. Until the eviction + // notice is processed by the client, the client will have the key available to + // it. + // + // Consequently, the client may very well access the key after it sent out an + // eviction notice, but before the eviction notice is processed. This will + // result in the key being added as an access message after the eviction + // notice, but before the eviction set is sent to the client. + // + // The problem is, the LruProcessingTask has no way of knowing whether the + // message the client sent is a stale message, or a legit message. + // + // Consider the following queue state in between two loop iterations of the + // LruProcessingTask: + // A -- access + // B -- insert + // E -- eviction request + // C -- the client thread which made the request + // + // A1 A2 A3 E1 A1 B1 -- after this point, no clients can access A1 + // C1 C1 C1 C1 C2 C1 ---------------------------------------------> time + // + // The scenario here is: + // - A1, A2, A3 are used by the clients. + // - C1 wants to add B1, but it doesn't have enough space, so it sends an + // eviction request. + // - In parallel, C2 accesses A1, which is completely legal. + // + // The result is that our queue has A1, even after we sent out the eviction + // request, and we have no way of knowing whether A1 is a stale message or not. + // + // To prevent this whole "is this thing stale or not" problem, we need to + // "erase" keys evicted in the future. Unfortunately, you can't mutate queues, + // so the best we can do is simply mark the stupid keys as "evicted" and ignore + // any accesses from these keys until they're marked as "inserted" again. + // + // We cannot mutate the queue to mark messages as stale, so the best we can do + // is mark the keys as "evicted" and ignore any accesses from these keys until + // they're marked as "inserted" again. + { + // These integer casts are safe, since max_count is guaranteed to fit + // within a u32, min(MAX_U32, MAX_USIZE) == MAX_U32. 
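                    // For instance, on a 64-bit target with ordered_key_set.len() == 5_000_000_000
                    // and max_count == u32::MAX, min() yields u32::MAX, which round-trips through
                    // the `as u32` cast below without truncation (illustrative numbers only).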
+ let take_count = self.ordered_key_set.len().min(max_count as usize) as u32; + self.shared.active_deletions.process_batch(take_count); + for _ in 0..take_count { + let Some(k) = self.ordered_key_set.pop_front() else { + break; + }; + self.evicted_keys.insert(k); + let mut deleter = self.deleter.clone(); + let shared_clone = self.shared.clone(); + tokio::spawn(async move { + deleter.delete(k).await; + shared_clone.active_deletions.observe_deletion(); + }); } } - Message::Inserted(k) => { - debug_assert!( - !self.ordered_key_set.contains(&k), - "key must not already exist in the ordered set when inserting" - ); - - // If the key has been evicted, but is now inserted, that means the key is no - // longer stale. - if self.evicted_keys.contains(&k) { - self.evicted_keys.remove(&k); - } + } + Message::Inserted(k) => { + debug_assert!( + !self.ordered_key_set.contains(&k), + "key must not already exist in the ordered set when inserting" + ); - self.ordered_key_set.insert(k); + // If the key has been evicted, but is now inserted, that means the key is no + // longer stale. + if self.evicted_keys.contains(&k) { + self.evicted_keys.remove(&k); } + + self.ordered_key_set.insert(k); } + } - true - }) + true } fn reposition_existing_key(&mut self, key: K) { @@ -169,12 +228,18 @@ impl> LruProcessingTask>, +} + /// An LRU eviction tracker. This is used to track the least recently used keys in the cache, and /// to evict keys when necessary. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct LruEvictionTracker { worker_message_sender: tokio::sync::mpsc::Sender>, - _worker: Arc>, + worker_state: Arc, } impl LruEvictionTracker { @@ -182,40 +247,69 @@ impl LruEvictionTracker { pub fn spawn>(deleter: D, channel_size: usize) -> Self { let (tx, rx) = tokio::sync::mpsc::channel(channel_size); + let worker_state = Arc::new(WorkerState { + active_deletions: DeletionIndicator::default(), + _worker: OnceLock::new(), + }); + let worker = LruProcessingTask::spawn_task(deleter, rx, worker_state.clone()); + worker_state._worker.set(worker).unwrap(); + Self { worker_message_sender: tx, - _worker: Arc::new(LruProcessingTask::spawn_task(deleter, rx)), + worker_state, } } /// Notify the LRU eviction tracker that the given key was inserted. /// /// You MUST call this method for every new key that is inserted into the cache. - pub async fn insert(&self, key: K) { - self.send_msg(Message::Inserted(key)).await; + pub fn insert(&self, key: K) { + self.send_msg(Message::Inserted(key)); } /// Notify the LRU eviction tracker that the given key was accessed. /// /// You MUST call this method for every read or update to a key. - pub async fn access(&self, key: K) { - self.send_msg(Message::Accessed(key)).await; + pub fn access(&self, key: K) { + self.send_msg(Message::Accessed(key)); } /// Cull the least recently used keys with the given deletion method. - pub async fn cull(&self, max_count: usize) { - self.send_msg(Message::Evict(max_count)).await; + pub fn cull(&self, max_count: u32) { + // Culling sets the top-most bit of the active deletion count to indicate that there are + // queued deletions. + self.worker_state.active_deletions.submit_batch(); + self.send_msg(Message::Evict(max_count)); + } + + /// Check whether there are culls that are already scheduled. + pub fn have_pending_culls(&self) -> bool { + self.worker_state.active_deletions.have_pending_batches() } /// Send a message to the worker. This is a helper method to reduce code duplication. 
- async fn send_msg(&self, message: Message) { + fn send_msg(&self, message: Message) { if self .worker_message_sender - .send(message.clone()) - .await + .blocking_send(message.clone()) .is_err() { unreachable!(); } } + + /// Get the total number of currently active deletions. + /// + /// Useful if you need to spinlock on this, for whatever reason. + /// + /// + /// # Note + /// + /// Does not guarantee ordering in respect to the deletion. The only guarantee this method + /// provides is whether any deletions are pending or not, but it is not safe to use this to + /// synchronize upon the results of your deletions, since the underlying atomic is used in a + /// relaxed manner. + pub fn have_active_deletions(&self) -> bool { + self.worker_state.active_deletions.have_pending_deletions() + } } diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs index 521401b..ea1b8ee 100644 --- a/lib/cache/fcache.rs +++ b/lib/cache/fcache.rs @@ -35,36 +35,21 @@ impl FileCacheShared { self.root_path.join(fid.to_string()) } - /// Given an entry, remove it from the disk and update the size of the cache accordingly. + /// Given an entry, remove it from the disk. Note this does not update the cache size tracker. /// /// Gracefully handles the case where the file doesn't exist, which can happen if the file was /// deleted after we read the file ID from the map, but before we tried to delete /// the file. async fn delete_entry_from_disk_async(&self, entry: &CacheMapEntry) -> std::io::Result<()> { match tokio::fs::remove_file(self.path_for(entry.fid)).await { - Ok(()) => { - self.size_bytes - .fetch_sub(entry.size_bytes, std::sync::atomic::Ordering::Relaxed); - Ok(()) - } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), - Err(e) => Err(e), - } - } - - fn delete_entry_from_disk_sync(&self, entry: &CacheMapEntry) -> std::io::Result<()> { - match std::fs::remove_file(self.path_for(entry.fid)) { - Ok(()) => { - self.size_bytes - .fetch_sub(entry.size_bytes, std::sync::atomic::Ordering::Relaxed); - Ok(()) - } + Ok(()) => Ok(()), Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), Err(e) => Err(e), } } } +#[derive(Clone)] struct LruEntryDeleter { shared: Arc>, } @@ -77,16 +62,15 @@ impl LruEntryDeleter { impl Deleter for LruEntryDeleter { /// The LRU cache will call this method when it wants to evict keys. - fn delete<'a>(&mut self, keys: impl Iterator) - where - K: 'a, - { - for key in keys { - if let Some(entry) = self.shared.map.remove_sync(key) - && let Err(e) = self.shared.delete_entry_from_disk_sync(&entry.1) - { + async fn delete(&mut self, key: K) { + if let Some(entry) = self.shared.map.remove_sync(&key) { + if let Err(e) = self.shared.delete_entry_from_disk_async(&entry.1).await { error!(error = ?e, "failed to delete evicted cache entry from disk"); } + + self.shared + .size_bytes + .fetch_sub(entry.1.size_bytes, std::sync::atomic::Ordering::Relaxed); } } } @@ -155,7 +139,12 @@ impl FileCache { // How many cache entries to evict at most in a single batch. // // Not really sure how to determine this number. - const LRU_EVICTION_MAX_BATCH_SIZE: usize = 32; + const LRU_EVICTION_MAX_BATCH_SIZE: u32 = 8; + + // The maximum number of messages that can be buffered between this file cache and the LRU + // eviction tracker. If this is too small, then the file cache will be blocked waiting for the + // eviction tracker to catch up. 
+ const MAX_LRU_TRACKER_CHANNEL_SIZE: usize = 256; // Dangerous: Changing this constant may cause the program to treat existing cache directories // as invalid, and thus provide a worse user experience. Do not change this unless you have a @@ -235,7 +224,7 @@ impl FileCache { max_size_bytes, lru_tracker: LruEvictionTracker::spawn( LruEntryDeleter::new(shared), - Self::LRU_EVICTION_MAX_BATCH_SIZE, + Self::MAX_LRU_TRACKER_CHANNEL_SIZE, ), }) } @@ -250,6 +239,21 @@ impl FileCache { } } +impl Drop for FileCache { + fn drop(&mut self) { + // Surprisingly, it's safe to delete files here. + // + // Note that the LRU tracker is running at the time of Drop. It, consequently, may end up + // calling the deleter, but that's fine, since the deleter ignores this problem gracefully. + // + // It is 100% safe to do this here, since it's guaranteed no concurrent task is borrowing + // the FileCache at this point. + if let Err(e) = std::fs::remove_dir_all(&self.shared.root_path) { + error!(error = ?e, "failed to delete cache directory on drop"); + } + } +} + impl AsyncReadableCache> for FileCache { @@ -296,7 +300,7 @@ impl AsyncReadableCache AsyncWritableCache, size_bytes: value.len(), }; - if self.shared.size() + new_entry.size_bytes > self.max_size_bytes { - // We need to evict some entries before we can insert this new entry. Let's just evict - // a batch of entries. - self.lru_tracker - .cull(Self::LRU_EVICTION_MAX_BATCH_SIZE) - .await; + while self.shared.size() + new_entry.size_bytes > self.max_size_bytes { + // TODO(markovejnovic): This whole spinlock situation sounded better in my head, but + // realistically, I think I could've gotten away with just a plain Notify. It is what + // it is now. + + // The cache doesn't have any space for this new entry, so we need to evict some + // entries before we can insert this new entry. + // Before we can do that, we need to make sure there are no other pending culls. If we + // let multiple culls happen at the same time, we risk draining a lot more entries from + // the cache than necessary, which results in a regression. + if self.lru_tracker.have_pending_culls() { + // If there are any pending culls, then we need to just wait. + tokio::task::yield_now().await; + continue; + } + + // There are no culls in progress, but the cache is still too full for this new entry, + // which means we need to evict some entries. + self.lru_tracker.cull(Self::LRU_EVICTION_MAX_BATCH_SIZE); } let mut new_file = self @@ -337,15 +354,14 @@ impl AsyncWritableCache, .await?; new_file.write_all(&value).await?; - self.shared - .size_bytes - .fetch_add(new_entry.size_bytes, std::sync::atomic::Ordering::Relaxed); + let mut size_delta: isize = new_entry.size_bytes as isize; // Now we insert the new file ID into the map, and get the old file ID if it exists. - if let Some(old_entry) = self.shared.map.upsert_async(*key, new_entry).await { + if let Some(old_entry) = self.shared.map.upsert_sync(*key, new_entry) { // If there was an old file ID, we need to delete the old file and notify the LRU // tracker that the key was accessed. - self.lru_tracker.access(*key).await; + self.lru_tracker.access(*key); + size_delta -= old_entry.size_bytes as isize; // TODO(markovejnovic): Could stack allocate the path. // Note that the file already may be deleted at this point -- the LRU tracker deleter @@ -353,7 +369,18 @@ impl AsyncWritableCache, // fine with us. 
self.shared.delete_entry_from_disk_async(&old_entry).await?; } else { - self.lru_tracker.insert(*key).await; + self.lru_tracker.insert(*key); + } + + if size_delta > 0 { + self.shared + .size_bytes + .fetch_add(size_delta as usize, std::sync::atomic::Ordering::Relaxed); + } else if size_delta < 0 { + self.shared.size_bytes.fetch_sub( + size_delta.abs() as usize, + std::sync::atomic::Ordering::Relaxed, + ); } // Epic, we did it. From e07d5be8080ba5f14ff7fdb31ca2f395360214ef Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Fri, 13 Feb 2026 17:12:15 -0800 Subject: [PATCH 08/15] deterministic clanker feedback --- lib/cache/eviction/lru.rs | 26 +++++++++++++------------- lib/cache/fcache.rs | 13 +++++++------ 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/lib/cache/eviction/lru.rs b/lib/cache/eviction/lru.rs index f821bc3..4890d82 100644 --- a/lib/cache/eviction/lru.rs +++ b/lib/cache/eviction/lru.rs @@ -4,7 +4,6 @@ use std::{ collections::HashSet, future::Future, hash::Hash, - pin::Pin, sync::{Arc, OnceLock, atomic::AtomicU64}, }; @@ -26,7 +25,7 @@ impl DeletionIndicator { /// Call to mark a batch as being processed right now. fn process_batch(&self, count: u32) { self.underlying.fetch_add( - count as u64 - (1 << 32), + u64::from(count) - (1 << 32), std::sync::atomic::Ordering::Relaxed, ); } @@ -66,7 +65,7 @@ pub trait Deleter: Send + Clone + 'static { } /// Messages sent to the LRU eviction tracker worker. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] enum Message { /// Notify the LRU eviction tracker that the given key was accessed. Accessed(K), @@ -182,6 +181,7 @@ impl> LruProcessingTask> LruProcessingTask> LruProcessingTask>, + worker: OnceLock>, } /// An LRU eviction tracker. This is used to track the least recently used keys in the cache, and @@ -249,10 +249,12 @@ impl LruEvictionTracker { let worker_state = Arc::new(WorkerState { active_deletions: DeletionIndicator::default(), - _worker: OnceLock::new(), + worker: OnceLock::new(), }); - let worker = LruProcessingTask::spawn_task(deleter, rx, worker_state.clone()); - worker_state._worker.set(worker).unwrap(); + let worker = LruProcessingTask::spawn_task(deleter, rx, Arc::clone(&worker_state)); + if worker_state.worker.set(worker).is_err() { + unreachable!("worker should only be set once, and we just set it"); + } Self { worker_message_sender: tx, @@ -283,17 +285,14 @@ impl LruEvictionTracker { } /// Check whether there are culls that are already scheduled. + #[must_use] pub fn have_pending_culls(&self) -> bool { self.worker_state.active_deletions.have_pending_batches() } /// Send a message to the worker. This is a helper method to reduce code duplication. fn send_msg(&self, message: Message) { - if self - .worker_message_sender - .blocking_send(message.clone()) - .is_err() - { + if self.worker_message_sender.blocking_send(message).is_err() { unreachable!(); } } @@ -309,6 +308,7 @@ impl LruEvictionTracker { /// provides is whether any deletions are pending or not, but it is not safe to use this to /// synchronize upon the results of your deletions, since the underlying atomic is used in a /// relaxed manner. 
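    // For example (illustrative; `tracker` and `evicted_path` are hypothetical), this is
    // NOT a valid way to wait for the evicted files themselves to disappear:
    //
    //     while tracker.have_active_deletions() {
    //         tokio::task::yield_now().await;
    //     }
    //     assert!(!evicted_path.exists()); // may still fail under Relaxed ordering
    //
    // Treat the flag purely as a coarse back-pressure signal.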
+ #[must_use] pub fn have_active_deletions(&self) -> bool { self.worker_state.active_deletions.have_pending_deletions() } diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs index ea1b8ee..281d7d1 100644 --- a/lib/cache/fcache.rs +++ b/lib/cache/fcache.rs @@ -354,14 +354,14 @@ impl AsyncWritableCache, .await?; new_file.write_all(&value).await?; - let mut size_delta: isize = new_entry.size_bytes as isize; + let mut size_delta: isize = new_entry.size_bytes.cast_signed(); // Now we insert the new file ID into the map, and get the old file ID if it exists. if let Some(old_entry) = self.shared.map.upsert_sync(*key, new_entry) { // If there was an old file ID, we need to delete the old file and notify the LRU // tracker that the key was accessed. self.lru_tracker.access(*key); - size_delta -= old_entry.size_bytes as isize; + size_delta -= old_entry.size_bytes.cast_signed(); // TODO(markovejnovic): Could stack allocate the path. // Note that the file already may be deleted at this point -- the LRU tracker deleter @@ -373,12 +373,13 @@ impl AsyncWritableCache, } if size_delta > 0 { - self.shared - .size_bytes - .fetch_add(size_delta as usize, std::sync::atomic::Ordering::Relaxed); + self.shared.size_bytes.fetch_add( + size_delta.cast_unsigned(), + std::sync::atomic::Ordering::Relaxed, + ); } else if size_delta < 0 { self.shared.size_bytes.fetch_sub( - size_delta.abs() as usize, + size_delta.unsigned_abs(), std::sync::atomic::Ordering::Relaxed, ); } From b2c454a904a76a74ed12dfacf41dad0ca868331c Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Fri, 13 Feb 2026 18:22:34 -0800 Subject: [PATCH 09/15] more clanker comments --- lib/cache/eviction/lru.rs | 129 ++++++++++++++++++++++---------------- lib/cache/fcache.rs | 41 +++++++++--- 2 files changed, 108 insertions(+), 62 deletions(-) diff --git a/lib/cache/eviction/lru.rs b/lib/cache/eviction/lru.rs index 4890d82..fa0e269 100644 --- a/lib/cache/eviction/lru.rs +++ b/lib/cache/eviction/lru.rs @@ -1,18 +1,20 @@ //! Implements the LRU eviction policy. use std::{ - collections::HashSet, future::Future, hash::Hash, - sync::{Arc, OnceLock, atomic::AtomicU64}, + sync::{ + Arc, OnceLock, + atomic::{AtomicI64, AtomicUsize}, + }, }; -use hashlink::LinkedHashSet; +use hashlink::LinkedHashMap; use tokio::{sync::mpsc::Receiver, task::JoinHandle}; #[derive(Debug)] struct DeletionIndicator { - underlying: AtomicU64, + underlying: AtomicI64, } impl DeletionIndicator { @@ -25,7 +27,7 @@ impl DeletionIndicator { /// Call to mark a batch as being processed right now. fn process_batch(&self, count: u32) { self.underlying.fetch_add( - u64::from(count) - (1 << 32), + i64::from(count) - (1 << 32), std::sync::atomic::Ordering::Relaxed, ); } @@ -38,7 +40,7 @@ impl DeletionIndicator { /// Check if there are any scheduled or active deletions. fn have_pending_deletions(&self) -> bool { - self.underlying.load(std::sync::atomic::Ordering::Relaxed) == 0 + self.underlying.load(std::sync::atomic::Ordering::Relaxed) != 0 } /// Check if there are any scheduled batches of deletions. @@ -50,41 +52,37 @@ impl DeletionIndicator { impl Default for DeletionIndicator { fn default() -> Self { Self { - underlying: AtomicU64::new(0), + underlying: AtomicI64::new(0), } } } /// A trait for deleting keys from the cache. This is used by the LRU eviction tracker to delete /// keys when they are evicted. -pub trait Deleter: Send + Clone + 'static { +pub trait Deleter: Send + Clone + 'static { /// Delete the given keys from the cache. The keys are guaranteed to be in the order of /// eviction. 
You absolutely MUST delete the keys in the order they are given, otherwise the /// LRU eviction tracker will get very confused and break. - fn delete(&mut self, key: K) -> impl Future + Send; + fn delete(&mut self, key: K, ctx: Ctx) -> impl Future + Send; } /// Messages sent to the LRU eviction tracker worker. #[derive(Debug, Clone, Copy)] -enum Message { +enum Message { /// Notify the LRU eviction tracker that the given key was accessed. - Accessed(K), + Accessed(K, usize), /// Request an eviction set of the given size. Evict(u32), /// Notify the LRU eviction tracker that a given key was inserted. - Inserted(K), + Inserted(K, C), } #[derive(Debug)] -struct LruProcessingTask> { - receiver: Receiver>, +struct LruProcessingTask> { + receiver: Receiver>, /// The ordered set of keys, ordered according to the last-used policy. - ordered_key_set: LinkedHashSet, - - /// Tracks which keys are currently considered evicted. Read the long explanation in - /// `service_message` for why this is necessary. - evicted_keys: HashSet, + ordered_key_map: LinkedHashMap, /// The deleter to call when we need to evict keys. deleter: D, @@ -93,12 +91,13 @@ struct LruProcessingTask> { shared: Arc, } -impl> LruProcessingTask { - fn new(deleter: D, receiver: Receiver>, shared: Arc) -> Self { +impl> + LruProcessingTask +{ + fn new(deleter: D, receiver: Receiver>, shared: Arc) -> Self { Self { receiver, - ordered_key_set: LinkedHashSet::new(), - evicted_keys: HashSet::new(), + ordered_key_map: LinkedHashMap::new(), deleter, shared, } @@ -106,7 +105,7 @@ impl> LruProcessingTask>, + receiver: Receiver>, shared: Arc, ) -> JoinHandle<()> { // TODO(markovejnovic): This should have a best-effort drop. @@ -124,18 +123,32 @@ impl> LruProcessingTask) -> bool { + fn service_message(&mut self, message: Message) -> bool { match message { - Message::Accessed(k) => { - if self.evicted_keys.contains(&k) { - // This is a ghost access, happens when a client accesses a key that may not - // have been cleaned up yet. Just ignore it. + Message::Accessed(k, ev_gen) => { + if ev_gen + < self + .shared + .eviction_generation + .load(std::sync::atomic::Ordering::Relaxed) + { + // This is a ghost access. Happens when one client sends an eviction, but other + // clients add accesses to the same keys, before our worker thread had a chance + // to clean up. + // + // We're looking at an access in the "future". return true; } self.reposition_existing_key(k); } Message::Evict(max_count) => { + // We got an eviction notice. Bump the eviction generation to indicate that all + // accesses after this point are considered in the "future". Read the subsequent + // comment for an explanation of why we need to do this. + self.shared + .eviction_generation + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); // Before we send off the eviction set, we actually need to remove the evicted // keys from the pending queue. Read the following for an explanation. // @@ -175,6 +188,15 @@ impl> LruProcessingTask> LruProcessingTask { + Message::Inserted(k, ctx) => { debug_assert!( - !self.ordered_key_set.contains(&k), + !self.ordered_key_map.contains_key(&k), "key must not already exist in the ordered set when inserting" ); - // If the key has been evicted, but is now inserted, that means the key is no - // longer stale. - if self.evicted_keys.contains(&k) { - self.evicted_keys.remove(&k); - } - - self.ordered_key_set.insert(k); + self.ordered_key_map.insert(k, ctx); } } @@ -219,36 +234,39 @@ impl> LruProcessingTask>, } /// An LRU eviction tracker. 
This is used to track the least recently used keys in the cache, and /// to evict keys when necessary. #[derive(Debug)] -pub struct LruEvictionTracker { - worker_message_sender: tokio::sync::mpsc::Sender>, +pub struct LruEvictionTracker { + worker_message_sender: tokio::sync::mpsc::Sender>, worker_state: Arc, } -impl LruEvictionTracker { +impl LruEvictionTracker { /// Spawn a new LRU eviction tracker with the given deleter and channel size. - pub fn spawn>(deleter: D, channel_size: usize) -> Self { + pub fn spawn>(deleter: D, channel_size: usize) -> Self { let (tx, rx) = tokio::sync::mpsc::channel(channel_size); let worker_state = Arc::new(WorkerState { active_deletions: DeletionIndicator::default(), + eviction_generation: AtomicUsize::new(0), worker: OnceLock::new(), }); let worker = LruProcessingTask::spawn_task(deleter, rx, Arc::clone(&worker_state)); @@ -265,15 +283,20 @@ impl LruEvictionTracker { /// Notify the LRU eviction tracker that the given key was inserted. /// /// You MUST call this method for every new key that is inserted into the cache. - pub fn insert(&self, key: K) { - self.send_msg(Message::Inserted(key)); + pub fn insert(&self, key: K, ctx: C) { + self.send_msg(Message::Inserted(key, ctx)); } /// Notify the LRU eviction tracker that the given key was accessed. /// /// You MUST call this method for every read or update to a key. pub fn access(&self, key: K) { - self.send_msg(Message::Accessed(key)); + self.send_msg(Message::Accessed( + key, + self.worker_state + .eviction_generation + .load(std::sync::atomic::Ordering::Relaxed), + )); } /// Cull the least recently used keys with the given deletion method. @@ -291,7 +314,7 @@ impl LruEvictionTracker { } /// Send a message to the worker. This is a helper method to reduce code duplication. - fn send_msg(&self, message: Message) { + fn send_msg(&self, message: Message) { if self.worker_message_sender.blocking_send(message).is_err() { unreachable!(); } diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs index 281d7d1..b002ec1 100644 --- a/lib/cache/fcache.rs +++ b/lib/cache/fcache.rs @@ -49,6 +49,11 @@ impl FileCacheShared { } } +#[derive(Debug, Clone)] +struct DeleterCtx { + fid: usize, +} + #[derive(Clone)] struct LruEntryDeleter { shared: Arc>, @@ -60,10 +65,15 @@ impl LruEntryDeleter { } } -impl Deleter for LruEntryDeleter { +impl Deleter for LruEntryDeleter { /// The LRU cache will call this method when it wants to evict keys. - async fn delete(&mut self, key: K) { - if let Some(entry) = self.shared.map.remove_sync(&key) { + async fn delete(&mut self, key: K, ctx: DeleterCtx) { + if let Some(entry) = self + .shared + .map + .remove_if_async(&key, |m| m.fid == ctx.fid) + .await + { if let Err(e) = self.shared.delete_entry_from_disk_async(&entry.1).await { error!(error = ?e, "failed to delete evicted cache entry from disk"); } @@ -132,7 +142,7 @@ pub struct FileCache { /// LRU eviction tracker. This is used to track the least recently used keys in the cache, and /// to evict keys when necessary. - lru_tracker: LruEvictionTracker, + lru_tracker: LruEvictionTracker, } impl FileCache { @@ -315,8 +325,8 @@ impl AsyncReadableCache AsyncWritableCache, CacheWriteError> - for FileCache +impl + AsyncWritableCache, CacheWriteError> for FileCache { async fn insert(&self, key: &K, value: Vec) -> Result<(), CacheWriteError> { // Inserts are tricky. 
The first thing we'll do is find a new file handle for the new @@ -329,6 +339,14 @@ impl AsyncWritableCache, }; while self.shared.size() + new_entry.size_bytes > self.max_size_bytes { + if self.shared.size() == 0 { + // This means that the new entry is larger than the entire cache size limit. In + // this case, we have no choice but to just insert the entry and let the cache + // temporarily exceed the size limit. We will try to evict entries as soon as + // possible, but in the meantime, we need to at least insert this entry. + break; + } + // TODO(markovejnovic): This whole spinlock situation sounded better in my head, but // realistically, I think I could've gotten away with just a plain Notify. It is what // it is now. @@ -340,6 +358,8 @@ impl AsyncWritableCache, // the cache than necessary, which results in a regression. if self.lru_tracker.have_pending_culls() { // If there are any pending culls, then we need to just wait. + // TODO(markovejnovic): This could be nicer and maybe starve the CPU less. Chances + // are we actually need to yield to the LRU tracker task. tokio::task::yield_now().await; continue; } @@ -357,7 +377,7 @@ impl AsyncWritableCache, let mut size_delta: isize = new_entry.size_bytes.cast_signed(); // Now we insert the new file ID into the map, and get the old file ID if it exists. - if let Some(old_entry) = self.shared.map.upsert_sync(*key, new_entry) { + if let Some(old_entry) = self.shared.map.upsert_async(*key, new_entry).await { // If there was an old file ID, we need to delete the old file and notify the LRU // tracker that the key was accessed. self.lru_tracker.access(*key); @@ -367,9 +387,12 @@ impl AsyncWritableCache, // Note that the file already may be deleted at this point -- the LRU tracker deleter // may have already deleted this file, so if the file doesn't exist, that's already // fine with us. - self.shared.delete_entry_from_disk_async(&old_entry).await?; + if let Err(e) = self.shared.delete_entry_from_disk_async(&old_entry).await { + error!(error = ?e, key = ?key, "failed to delete old cache entry from disk"); + } } else { - self.lru_tracker.insert(*key); + self.lru_tracker + .insert(*key, DeleterCtx { fid: new_entry.fid }); } if size_delta > 0 { From 8379ebb867e45e97e058e56780b913888e730841 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Sun, 15 Feb 2026 19:10:21 -0800 Subject: [PATCH 10/15] Fix edge-case bugs --- lib/cache/eviction/lru.rs | 157 ++++++++++++++++++++++---------------- lib/cache/fcache.rs | 42 +++++++--- 2 files changed, 124 insertions(+), 75 deletions(-) diff --git a/lib/cache/eviction/lru.rs b/lib/cache/eviction/lru.rs index fa0e269..e8f3917 100644 --- a/lib/cache/eviction/lru.rs +++ b/lib/cache/eviction/lru.rs @@ -12,19 +12,67 @@ use std::{ use hashlink::LinkedHashMap; use tokio::{sync::mpsc::Receiver, task::JoinHandle}; +/// A trait for deleting keys from the cache. This is used by the LRU eviction tracker to delete +/// keys when they are evicted. +pub trait Deleter: Send + Clone + 'static { + /// Delete the given keys from the cache. The keys are guaranteed to be in the order of + /// eviction. You absolutely MUST delete the keys in the order they are given, otherwise the + /// LRU eviction tracker will get very confused and break. + fn delete(&mut self, key: K, ctx: Ctx) -> impl Future + Send; +} + +/// Messages sent to the LRU eviction tracker worker. +#[derive(Debug, Clone, Copy)] +enum Message { + /// Notify the LRU eviction tracker that the given key was accessed. 
+ Accessed(K, usize), + /// Request an eviction set of the given size. + Evict(u32), + /// Notify the LRU eviction tracker that a given key was inserted. + Inserted(K, C), +} + +/// Tracks in-flight eviction batches and individual deletions using a single packed `AtomicI64`. +/// +/// Layout: `[upper 32 bits: pending batches | lower 32 bits: active deletions]` +/// +/// The lifecycle of an eviction is: +/// 1. Producer calls `submit_batch()` — increments upper half by 1. +/// 2. Producer enqueues the `Evict` message into the channel. +/// 3. Worker receives the message and calls `process_batch(n)` — decrements upper half by 1 and +/// increments lower half by `n` (the actual number of keys to delete). +/// 4. Each spawned deletion task calls `observe_deletion()` when done — decrements lower half by 1. +/// +/// The indicator reads zero only when no batches are pending and no deletions are in flight. #[derive(Debug)] struct DeletionIndicator { underlying: AtomicI64, } impl DeletionIndicator { - /// Call to mark the start of a batch of deletions. + fn new() -> Self { + Self { + underlying: AtomicI64::new(0), + } + } + + /// Mark that a new eviction batch has been submitted by a producer. Called on the producer + /// side *before* the `Evict` message is sent into the channel. fn submit_batch(&self) { self.underlying .fetch_add(1 << 32, std::sync::atomic::Ordering::Relaxed); } - /// Call to mark a batch as being processed right now. + /// Undo a `submit_batch` call. Used when the channel `try_send` fails after we already + /// incremented the pending-batch counter. + fn undo_submit_batch(&self) { + self.underlying + .fetch_sub(1 << 32, std::sync::atomic::Ordering::Relaxed); + } + + /// Called by the worker when it begins processing an eviction batch. Atomically decrements + /// the pending-batch counter (upper half) by 1 and increments the active-deletion counter + /// (lower half) by `count`. fn process_batch(&self, count: u32) { self.underlying.fetch_add( i64::from(count) - (1 << 32), @@ -32,51 +80,19 @@ impl DeletionIndicator { ); } - /// Call to mark a deletion as being completed. + /// Called by a spawned deletion task when it finishes deleting one key. fn observe_deletion(&self) { self.underlying .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); } - /// Check if there are any scheduled or active deletions. - fn have_pending_deletions(&self) -> bool { - self.underlying.load(std::sync::atomic::Ordering::Relaxed) != 0 - } - - /// Check if there are any scheduled batches of deletions. + /// Returns `true` if there are any pending batches (submitted but not yet processed by the + /// worker). Used by producers to avoid enqueuing redundant eviction requests. fn have_pending_batches(&self) -> bool { self.underlying.load(std::sync::atomic::Ordering::Relaxed) >= 1 << 32 } } -impl Default for DeletionIndicator { - fn default() -> Self { - Self { - underlying: AtomicI64::new(0), - } - } -} - -/// A trait for deleting keys from the cache. This is used by the LRU eviction tracker to delete -/// keys when they are evicted. -pub trait Deleter: Send + Clone + 'static { - /// Delete the given keys from the cache. The keys are guaranteed to be in the order of - /// eviction. You absolutely MUST delete the keys in the order they are given, otherwise the - /// LRU eviction tracker will get very confused and break. - fn delete(&mut self, key: K, ctx: Ctx) -> impl Future + Send; -} - -/// Messages sent to the LRU eviction tracker worker. 
-#[derive(Debug, Clone, Copy)] -enum Message { - /// Notify the LRU eviction tracker that the given key was accessed. - Accessed(K, usize), - /// Request an eviction set of the given size. - Evict(u32), - /// Notify the LRU eviction tracker that a given key was inserted. - Inserted(K, C), -} - #[derive(Debug)] struct LruProcessingTask> { receiver: Receiver>, @@ -205,7 +221,13 @@ impl> // within a u32, min(MAX_U32, MAX_USIZE) == MAX_U32. #[expect(clippy::cast_possible_truncation)] let take_count = self.ordered_key_map.len().min(max_count as usize) as u32; + + // Atomically transition this batch from "pending" to "active deletions". + // This decrements the upper half (pending batches) by 1 and increments the + // lower half (active deletions) by take_count. If take_count is 0, this + // still correctly clears the pending batch. self.shared.active_deletions.process_batch(take_count); + for _ in 0..take_count { let Some(e) = self.ordered_key_map.pop_front() else { break; @@ -246,6 +268,8 @@ impl> #[derive(Debug)] struct WorkerState { + /// Packed atomic tracking pending eviction batches (upper 32 bits) and active deletions + /// (lower 32 bits). See `DeletionIndicator` for the full protocol. active_deletions: DeletionIndicator, eviction_generation: AtomicUsize, worker: OnceLock>, @@ -265,7 +289,7 @@ impl LruEvictionTracker let (tx, rx) = tokio::sync::mpsc::channel(channel_size); let worker_state = Arc::new(WorkerState { - active_deletions: DeletionIndicator::default(), + active_deletions: DeletionIndicator::new(), eviction_generation: AtomicUsize::new(0), worker: OnceLock::new(), }); @@ -283,56 +307,57 @@ impl LruEvictionTracker /// Notify the LRU eviction tracker that the given key was inserted. /// /// You MUST call this method for every new key that is inserted into the cache. - pub fn insert(&self, key: K, ctx: C) { - self.send_msg(Message::Inserted(key, ctx)); + pub async fn insert(&self, key: K, ctx: C) { + self.send_msg(Message::Inserted(key, ctx)).await; } /// Notify the LRU eviction tracker that the given key was accessed. /// /// You MUST call this method for every read or update to a key. - pub fn access(&self, key: K) { + pub async fn access(&self, key: K) { self.send_msg(Message::Accessed( key, self.worker_state .eviction_generation .load(std::sync::atomic::Ordering::Relaxed), - )); + )) + .await; } - /// Cull the least recently used keys with the given deletion method. - pub fn cull(&self, max_count: u32) { - // Culling sets the top-most bit of the active deletion count to indicate that there are - // queued deletions. + /// Try to cull the least recently used keys with the given deletion method. + /// + /// Returns `true` if the eviction message was successfully enqueued, or `false` if the + /// channel is full. In the latter case, the caller should yield and retry. + /// + /// The ordering here is critical for correctness: we `submit_batch()` *before* `try_send()` + /// so that the `DeletionIndicator` is always in a state where `have_pending_batches()` returns + /// `true` by the time anyone observes the enqueued message. If the send fails, we roll back + /// with `undo_submit_batch()`, which is a net-zero delta on the packed atomic. 
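    // A worked example of the packed-counter protocol described above (hex values are
    // illustrative):
    //
    //     let ind = DeletionIndicator::new();   // 0x0000_0000_0000_0000
    //     ind.submit_batch();                   // 0x0000_0001_0000_0000 (1 pending batch)
    //     assert!(ind.have_pending_batches());
    //     ind.process_batch(3);                 // 0x0000_0000_0000_0003 (0 batches, 3 deletions)
    //     assert!(!ind.have_pending_batches());
    //     ind.observe_deletion();               // 0x0000_0000_0000_0002
    //     ind.observe_deletion();               // 0x0000_0000_0000_0001
    //     ind.observe_deletion();               // 0x0000_0000_0000_0000 (fully drained)
    //
    // A failed `try_send` pairs `submit_batch` with `undo_submit_batch`, netting out to
    // zero on the packed atomic.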
+ #[must_use] + pub fn try_cull(&self, max_count: u32) -> bool { self.worker_state.active_deletions.submit_batch(); - self.send_msg(Message::Evict(max_count)); + if self + .worker_message_sender + .try_send(Message::Evict(max_count)) + .is_ok() + { + true + } else { + self.worker_state.active_deletions.undo_submit_batch(); + false + } } - /// Check whether there are culls that are already scheduled. + /// Check whether there are culls that are already scheduled or actively in progress. #[must_use] pub fn have_pending_culls(&self) -> bool { self.worker_state.active_deletions.have_pending_batches() } /// Send a message to the worker. This is a helper method to reduce code duplication. - fn send_msg(&self, message: Message) { - if self.worker_message_sender.blocking_send(message).is_err() { + async fn send_msg(&self, message: Message) { + if self.worker_message_sender.send(message).await.is_err() { unreachable!(); } } - - /// Get the total number of currently active deletions. - /// - /// Useful if you need to spinlock on this, for whatever reason. - /// - /// - /// # Note - /// - /// Does not guarantee ordering in respect to the deletion. The only guarantee this method - /// provides is whether any deletions are pending or not, but it is not safe to use this to - /// synchronize upon the results of your deletions, since the underlying atomic is used in a - /// relaxed manner. - #[must_use] - pub fn have_active_deletions(&self) -> bool { - self.worker_state.active_deletions.have_pending_deletions() - } } diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs index b002ec1..13b35fa 100644 --- a/lib/cache/fcache.rs +++ b/lib/cache/fcache.rs @@ -20,6 +20,23 @@ use thiserror::Error; use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; +/// Drop guard that deletes an unregistered cache file if the insert task is cancelled or fails +/// between file creation and map registration. Defused once the file is registered in the map. +struct FileGuard { + path: Option, +} + +impl Drop for FileGuard { + fn drop(&mut self) { + if let Some(path) = self.path.take() + && let Err(e) = std::fs::remove_file(&path) + && e.kind() != std::io::ErrorKind::NotFound + { + error!(error = ?e, path = ?path, "failed to clean up orphaned cache file"); + } + } +} + struct FileCacheShared { root_path: PathBuf, map: scc::HashMap, @@ -310,7 +327,7 @@ impl AsyncReadableCache // There are no culls in progress, but the cache is still too full for this new entry, // which means we need to evict some entries. - self.lru_tracker.cull(Self::LRU_EVICTION_MAX_BATCH_SIZE); + if !self.lru_tracker.try_cull(Self::LRU_EVICTION_MAX_BATCH_SIZE) { + tokio::task::yield_now().await; + } } - let mut new_file = self - .create_file(&self.shared.path_for(new_entry.fid)) - .await?; + let path = self.shared.path_for(new_entry.fid); + let mut new_file = self.create_file(&path).await?; + let mut guard = FileGuard { path: Some(path) }; new_file.write_all(&value).await?; let mut size_delta: isize = new_entry.size_bytes.cast_signed(); - // Now we insert the new file ID into the map, and get the old file ID if it exists. - if let Some(old_entry) = self.shared.map.upsert_async(*key, new_entry).await { + // Register the file in the map. After this point, the map owns the file and the guard + // must not delete it. 
+ let old_entry = self.shared.map.upsert_async(*key, new_entry).await; + guard.path = None; + + if let Some(old_entry) = old_entry { // If there was an old file ID, we need to delete the old file and notify the LRU // tracker that the key was accessed. - self.lru_tracker.access(*key); + self.lru_tracker.access(*key).await; size_delta -= old_entry.size_bytes.cast_signed(); // TODO(markovejnovic): Could stack allocate the path. @@ -392,7 +415,8 @@ impl } } else { self.lru_tracker - .insert(*key, DeleterCtx { fid: new_entry.fid }); + .insert(*key, DeleterCtx { fid: new_entry.fid }) + .await; } if size_delta > 0 { From c741f6c4cca11ad05b89d1d1091844b6bb8f04de Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Sun, 15 Feb 2026 20:05:45 -0800 Subject: [PATCH 11/15] Fix upsert-to-LRU non-atomicity causing permanently un-evictable entries The upsert_async + LRU notification sequence was not atomic: concurrent inserts or evictions could interleave between the HashMap mutation and the LRU tracker message, causing stale DeleterCtx{fid} values that make remove_if_async silently fail and leave entries permanently un-evictable. Fix by switching to entry_async (holds the bucket lock while allocating a global monotonic version), then using a sync try_send-based upsert() for the LRU notification so it cannot be lost to task cancellation. Key changes: - Add Versioned trait to lru.rs for version-based message deduplication - Replace Message::Inserted with Message::Upserted; worker deduplicates by comparing versions and gracefully handles missing keys on Accessed - Replace async insert() with sync upsert() using try_send + spawn fallback, making the LRU notification non-cancellable - FileCache::insert uses entry_async to atomically read old entry and allocate version under the bucket lock; all post-lock operations (guard defuse, LRU upsert, size accounting) are synchronous - Add check-cfg for loom and wire up the sync shim module to enable future loom-based concurrency testing --- Cargo.lock | 35 ++++++++++ Cargo.toml | 4 ++ lib/cache/eviction/lru.rs | 127 +++++++++++++++++++----------------- lib/cache/fcache.rs | 132 +++++++++++++++++++++----------------- lib/lib.rs | 1 + lib/sync/atomic.rs | 13 ++++ lib/sync/mod.rs | 67 +++++++++++++++++++ 7 files changed, 259 insertions(+), 120 deletions(-) create mode 100644 lib/sync/atomic.rs create mode 100644 lib/sync/mod.rs diff --git a/Cargo.lock b/Cargo.lock index ebbe571..7cbaf5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -718,6 +718,21 @@ dependencies = [ "thread_local", ] +[[package]] +name = "generator" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52f04ae4152da20c76fe800fa48659201d5cf627c5149ca0b707b69d7eef6cf9" +dependencies = [ + "cc", + "cfg-if", + "libc", + "log", + "rustversion", + "windows-link", + "windows-result", +] + [[package]] name = "getrandom" version = "0.2.17" @@ -777,6 +792,7 @@ dependencies = [ "http", "inquire", "libc", + "loom", "mesa-dev", "nix", "num-traits", @@ -1304,6 +1320,19 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ 
-2119,6 +2148,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index b6fe250..6f777d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,9 @@ opentelemetry-otlp = { version = "0.29", default-features = false, features = [" tracing-opentelemetry = { version = "0.30" } hashlink = "0.11.0" +[target.'cfg(loom)'.dependencies] +loom = "0.7" + [features] default = [] staging = [] @@ -60,6 +63,7 @@ staging = [] vergen-gitcl = { version = "1", features = [] } [lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] } unsafe_code = "allow" missing_docs = "warn" unreachable_pub = "allow" diff --git a/lib/cache/eviction/lru.rs b/lib/cache/eviction/lru.rs index e8f3917..7a33cde 100644 --- a/lib/cache/eviction/lru.rs +++ b/lib/cache/eviction/lru.rs @@ -1,16 +1,20 @@ //! Implements the LRU eviction policy. -use std::{ - future::Future, - hash::Hash, - sync::{ - Arc, OnceLock, - atomic::{AtomicI64, AtomicUsize}, - }, -}; +use std::{future::Future, hash::Hash}; + +use crate::sync::{Arc, OnceLock, atomic, atomic::AtomicI64, atomic::AtomicUsize}; use hashlink::LinkedHashMap; -use tokio::{sync::mpsc::Receiver, task::JoinHandle}; +use tokio::{ + sync::mpsc::{Receiver, error::TrySendError}, + task::JoinHandle, +}; + +/// Types that carry a monotonic version for deduplication of out-of-order messages. +pub trait Versioned { + /// Returns the monotonic version of this value. + fn version(&self) -> u64; +} /// A trait for deleting keys from the cache. This is used by the LRU eviction tracker to delete /// keys when they are evicted. @@ -28,8 +32,8 @@ enum Message { Accessed(K, usize), /// Request an eviction set of the given size. Evict(u32), - /// Notify the LRU eviction tracker that a given key was inserted. - Inserted(K, C), + /// Notify the LRU eviction tracker that a key was inserted or overwritten. + Upserted(K, C), } /// Tracks in-flight eviction batches and individual deletions using a single packed `AtomicI64`. @@ -60,36 +64,33 @@ impl DeletionIndicator { /// side *before* the `Evict` message is sent into the channel. fn submit_batch(&self) { self.underlying - .fetch_add(1 << 32, std::sync::atomic::Ordering::Relaxed); + .fetch_add(1 << 32, atomic::Ordering::Relaxed); } /// Undo a `submit_batch` call. Used when the channel `try_send` fails after we already /// incremented the pending-batch counter. fn undo_submit_batch(&self) { self.underlying - .fetch_sub(1 << 32, std::sync::atomic::Ordering::Relaxed); + .fetch_sub(1 << 32, atomic::Ordering::Relaxed); } /// Called by the worker when it begins processing an eviction batch. Atomically decrements /// the pending-batch counter (upper half) by 1 and increments the active-deletion counter /// (lower half) by `count`. fn process_batch(&self, count: u32) { - self.underlying.fetch_add( - i64::from(count) - (1 << 32), - std::sync::atomic::Ordering::Relaxed, - ); + self.underlying + .fetch_add(i64::from(count) - (1 << 32), atomic::Ordering::Relaxed); } /// Called by a spawned deletion task when it finishes deleting one key. 
fn observe_deletion(&self) { - self.underlying - .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + self.underlying.fetch_sub(1, atomic::Ordering::Relaxed); } /// Returns `true` if there are any pending batches (submitted but not yet processed by the /// worker). Used by producers to avoid enqueuing redundant eviction requests. fn have_pending_batches(&self) -> bool { - self.underlying.load(std::sync::atomic::Ordering::Relaxed) >= 1 << 32 + self.underlying.load(atomic::Ordering::Relaxed) >= 1 << 32 } } @@ -107,7 +108,7 @@ struct LruProcessingTask> { shared: Arc, } -impl> +impl> LruProcessingTask { fn new(deleter: D, receiver: Receiver>, shared: Arc) -> Self { @@ -146,17 +147,18 @@ impl> < self .shared .eviction_generation - .load(std::sync::atomic::Ordering::Relaxed) + .load(atomic::Ordering::Relaxed) { - // This is a ghost access. Happens when one client sends an eviction, but other - // clients add accesses to the same keys, before our worker thread had a chance - // to clean up. - // - // We're looking at an access in the "future". + // This is a ghost access. Happens when one client sends an eviction, but + // other clients add accesses to the same keys, before our worker thread had + // a chance to clean up. return true; } - self.reposition_existing_key(k); + // The key may have been evicted between the access and this message arriving. + if let Some(entry) = self.ordered_key_map.remove(&k) { + self.ordered_key_map.insert(k, entry); + } } Message::Evict(max_count) => { // We got an eviction notice. Bump the eviction generation to indicate that all @@ -164,7 +166,7 @@ impl> // comment for an explanation of why we need to do this. self.shared .eviction_generation - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + .fetch_add(1, atomic::Ordering::Relaxed); // Before we send off the eviction set, we actually need to remove the evicted // keys from the pending queue. Read the following for an explanation. // @@ -204,12 +206,12 @@ impl> // so the best we can do is simply mark the stupid keys as "evicted" and ignore // any accesses from these keys until they're marked as "inserted" again. // - // This is what the generation counter does -- it allows us to track which messages - // are considered "stale". There are some problems with the generation counter, - // however. We drop all messages that are "generationally old". Under high - // contention, this results in the eviction policy acting like a fuzzy LRU policy, - // rather than a strict LRU policy, but for the purposes of this cache, this is an - // acceptable tradeoff. + // This is what the generation counter does -- it allows us to track which + // messages are considered "stale". There are some problems with the generation + // counter, however. We drop all messages that are "generationally old". Under + // high contention, this results in the eviction policy acting like a fuzzy LRU + // policy, rather than a strict LRU policy, but for the purposes of this cache, + // this is an acceptable tradeoff. // // TODO(markovejnovic): Make this a strict LRU. 
// @@ -229,41 +231,32 @@ impl> self.shared.active_deletions.process_batch(take_count); for _ in 0..take_count { - let Some(e) = self.ordered_key_map.pop_front() else { + let Some((key, ctx)) = self.ordered_key_map.pop_front() else { break; }; let mut deleter = self.deleter.clone(); let shared_clone = Arc::clone(&self.shared); tokio::spawn(async move { - deleter.delete(e.0, e.1).await; + deleter.delete(key, ctx).await; shared_clone.active_deletions.observe_deletion(); }); } } } - Message::Inserted(k, ctx) => { - debug_assert!( - !self.ordered_key_map.contains_key(&k), - "key must not already exist in the ordered set when inserting" - ); - + Message::Upserted(k, ctx) => { + if let Some(existing) = self.ordered_key_map.get(&k) + && ctx.version() < existing.version() + { + // Stale message from a previous incarnation; drop it. + return true; + } + self.ordered_key_map.remove(&k); self.ordered_key_map.insert(k, ctx); } } true } - - fn reposition_existing_key(&mut self, key: K) { - debug_assert!( - self.ordered_key_map.contains_key(&key), - "key must exist in the ordered set before repositioning" - ); - - if let Some(c) = self.ordered_key_map.remove(&key) { - self.ordered_key_map.insert(key, c); - } - } } #[derive(Debug)] @@ -283,7 +276,9 @@ pub struct LruEvictionTracker { worker_state: Arc, } -impl LruEvictionTracker { +impl + LruEvictionTracker +{ /// Spawn a new LRU eviction tracker with the given deleter and channel size. pub fn spawn>(deleter: D, channel_size: usize) -> Self { let (tx, rx) = tokio::sync::mpsc::channel(channel_size); @@ -304,11 +299,23 @@ impl LruEvictionTracker } } - /// Notify the LRU eviction tracker that the given key was inserted. + /// Notify the LRU tracker that a key was inserted or overwritten. /// - /// You MUST call this method for every new key that is inserted into the cache. - pub async fn insert(&self, key: K, ctx: C) { - self.send_msg(Message::Inserted(key, ctx)).await; + /// Non-cancellable: uses `try_send` (sync) with a `tokio::spawn` fallback so that the + /// notification cannot be lost to task cancellation. + pub fn upsert(&self, key: K, ctx: C) { + match self + .worker_message_sender + .try_send(Message::Upserted(key, ctx)) + { + Ok(()) | Err(TrySendError::Closed(_)) => {} + Err(TrySendError::Full(msg)) => { + let sender = self.worker_message_sender.clone(); + tokio::spawn(async move { + let _ = sender.send(msg).await; + }); + } + } } /// Notify the LRU eviction tracker that the given key was accessed. 
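    // A minimal sketch of the cancellation hazard that the try_send-based upsert above
    // sidesteps (the channel, payload, and timeout are made up for illustration):
    //
    //     async fn lossy_notify(tx: tokio::sync::mpsc::Sender<u32>) {
    //         tokio::select! {
    //             _ = tx.send(42) => {}
    //             _ = tokio::time::sleep(std::time::Duration::from_millis(1)) => {
    //                 // The send future is dropped at its .await point and the worker
    //                 // never observes the message.
    //             }
    //         }
    //     }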
@@ -319,7 +326,7 @@ impl LruEvictionTracker key, self.worker_state .eviction_generation - .load(std::sync::atomic::Ordering::Relaxed), + .load(atomic::Ordering::Relaxed), )) .await; } diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs index 13b35fa..2cfb58a 100644 --- a/lib/cache/fcache.rs +++ b/lib/cache/fcache.rs @@ -4,14 +4,15 @@ use std::{ fmt::Debug, hash::Hash, path::{Path, PathBuf}, - sync::{Arc, atomic::AtomicUsize}, }; +use crate::sync::{Arc, atomic, atomic::AtomicU64, atomic::AtomicUsize}; + use tracing::error; use crate::{ cache::{ - eviction::lru::{Deleter, LruEvictionTracker}, + eviction::lru::{Deleter, LruEvictionTracker, Versioned}, traits::{AsyncReadableCache, AsyncWritableCache}, }, io, @@ -45,7 +46,7 @@ struct FileCacheShared { impl FileCacheShared { fn size(&self) -> usize { - self.size_bytes.load(std::sync::atomic::Ordering::Relaxed) + self.size_bytes.load(atomic::Ordering::Relaxed) } fn path_for(&self, fid: usize) -> PathBuf { @@ -66,9 +67,16 @@ impl FileCacheShared { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] struct DeleterCtx { fid: usize, + version: u64, +} + +impl Versioned for DeleterCtx { + fn version(&self) -> u64 { + self.version + } } #[derive(Clone)] @@ -97,7 +105,7 @@ impl Deleter for Lru self.shared .size_bytes - .fetch_sub(entry.1.size_bytes, std::sync::atomic::Ordering::Relaxed); + .fetch_sub(entry.1.size_bytes, atomic::Ordering::Relaxed); } } } @@ -160,6 +168,10 @@ pub struct FileCache { /// LRU eviction tracker. This is used to track the least recently used keys in the cache, and /// to evict keys when necessary. lru_tracker: LruEvictionTracker, + + /// Global monotonic counter for per-key versioning. Incremented under the scc bucket lock + /// to guarantee that versions are strictly monotonically increasing per key. + version_counter: AtomicU64, } impl FileCache { @@ -253,6 +265,7 @@ impl FileCache { LruEntryDeleter::new(shared), Self::MAX_LRU_TRACKER_CHANNEL_SIZE, ), + version_counter: AtomicU64::new(0), }) } @@ -346,21 +359,13 @@ impl AsyncWritableCache, CacheWriteError> for FileCache { async fn insert(&self, key: &K, value: Vec) -> Result<(), CacheWriteError> { - // Inserts are tricky. The first thing we'll do is find a new file handle for the new - // value. - let new_entry = CacheMapEntry { - fid: self - .file_generator - .fetch_add(1, std::sync::atomic::Ordering::Relaxed), - size_bytes: value.len(), - }; + let new_fid = self.file_generator.fetch_add(1, atomic::Ordering::Relaxed); + let new_size = value.len(); - while self.shared.size() + new_entry.size_bytes > self.max_size_bytes { + while self.shared.size() + new_size > self.max_size_bytes { if self.shared.size() == 0 { - // This means that the new entry is larger than the entire cache size limit. In - // this case, we have no choice but to just insert the entry and let the cache - // temporarily exceed the size limit. We will try to evict entries as soon as - // possible, but in the meantime, we need to at least insert this entry. + // The new entry is larger than the entire cache size limit. Insert it anyway + // and let the cache temporarily exceed the limit. break; } @@ -368,70 +373,77 @@ impl // realistically, I think I could've gotten away with just a plain Notify. It is what // it is now. - // The cache doesn't have any space for this new entry, so we need to evict some - // entries before we can insert this new entry. - // Before we can do that, we need to make sure there are no other pending culls. 
If we - // let multiple culls happen at the same time, we risk draining a lot more entries from - // the cache than necessary, which results in a regression. if self.lru_tracker.have_pending_culls() { - // If there are any pending culls, then we need to just wait. // TODO(markovejnovic): This could be nicer and maybe starve the CPU less. Chances // are we actually need to yield to the LRU tracker task. tokio::task::yield_now().await; continue; } - // There are no culls in progress, but the cache is still too full for this new entry, - // which means we need to evict some entries. if !self.lru_tracker.try_cull(Self::LRU_EVICTION_MAX_BATCH_SIZE) { tokio::task::yield_now().await; } } - let path = self.shared.path_for(new_entry.fid); + // Write file to disk. + let path = self.shared.path_for(new_fid); let mut new_file = self.create_file(&path).await?; let mut guard = FileGuard { path: Some(path) }; new_file.write_all(&value).await?; - let mut size_delta: isize = new_entry.size_bytes.cast_signed(); - - // Register the file in the map. After this point, the map owns the file and the guard - // must not delete it. - let old_entry = self.shared.map.upsert_async(*key, new_entry).await; - guard.path = None; - - if let Some(old_entry) = old_entry { - // If there was an old file ID, we need to delete the old file and notify the LRU - // tracker that the key was accessed. - self.lru_tracker.access(*key).await; - size_delta -= old_entry.size_bytes.cast_signed(); - - // TODO(markovejnovic): Could stack allocate the path. - // Note that the file already may be deleted at this point -- the LRU tracker deleter - // may have already deleted this file, so if the file doesn't exist, that's already - // fine with us. - if let Err(e) = self.shared.delete_entry_from_disk_async(&old_entry).await { - error!(error = ?e, key = ?key, "failed to delete old cache entry from disk"); + // Use entry_async to lock the bucket, then allocate version under the lock. + // This ensures version monotonicity per key matches the actual mutation order. + let (old_entry, new_version) = match self.shared.map.entry_async(*key).await { + scc::hash_map::Entry::Occupied(mut o) => { + let old = *o.get(); + let v = self.version_counter.fetch_add(1, atomic::Ordering::Relaxed); + *o.get_mut() = CacheMapEntry { + fid: new_fid, + size_bytes: new_size, + }; + (Some(old), v) } - } else { - self.lru_tracker - .insert(*key, DeleterCtx { fid: new_entry.fid }) - .await; - } + scc::hash_map::Entry::Vacant(vacant) => { + let v = self.version_counter.fetch_add(1, atomic::Ordering::Relaxed); + vacant.insert_entry(CacheMapEntry { + fid: new_fid, + size_bytes: new_size, + }); + (None, v) + } + }; + // Bucket lock released here. + guard.path = None; + // LRU notification (sync via try_send, non-cancellable). + self.lru_tracker.upsert( + *key, + DeleterCtx { + fid: new_fid, + version: new_version, + }, + ); + + // Size accounting (sync atomic, non-cancellable). 
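        // Worked example of the delta accounting below (illustrative numbers): a fresh
        // 4096-byte insert has no old entry, so size_delta = +4096 and we fetch_add(4096);
        // overwriting a 10_000-byte entry with a 4096-byte one gives
        // size_delta = 4096 - 10_000 = -5904, so we fetch_sub(5904).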
+ let size_delta: isize = new_size.cast_signed() + - old_entry + .as_ref() + .map_or(0isize, |e| e.size_bytes.cast_signed()); if size_delta > 0 { - self.shared.size_bytes.fetch_add( - size_delta.cast_unsigned(), - std::sync::atomic::Ordering::Relaxed, - ); + self.shared + .size_bytes + .fetch_add(size_delta.cast_unsigned(), atomic::Ordering::Relaxed); } else if size_delta < 0 { - self.shared.size_bytes.fetch_sub( - size_delta.unsigned_abs(), - std::sync::atomic::Ordering::Relaxed, - ); + self.shared + .size_bytes + .fetch_sub(size_delta.unsigned_abs(), atomic::Ordering::Relaxed); + } + + // Deferrable: delete old file (safe to cancel — file is orphaned). + if let Some(old_entry) = old_entry { + drop(self.shared.delete_entry_from_disk_async(&old_entry).await); } - // Epic, we did it. Ok(()) } } diff --git a/lib/lib.rs b/lib/lib.rs index 058aae4..390f786 100644 --- a/lib/lib.rs +++ b/lib/lib.rs @@ -7,3 +7,4 @@ /// Caching primitives for git-fs. pub mod cache; pub mod io; +pub mod sync; diff --git a/lib/sync/atomic.rs b/lib/sync/atomic.rs new file mode 100644 index 0000000..2c77deb --- /dev/null +++ b/lib/sync/atomic.rs @@ -0,0 +1,13 @@ +//! Re-export of atomics, so that we can use loom's atomics when testing with loom. + +#[cfg(loom)] +pub use loom::sync::atomic::{ + AtomicBool, AtomicI8, AtomicI16, AtomicI32, AtomicI64, AtomicIsize, AtomicU8, AtomicU16, + AtomicU32, AtomicU64, AtomicUsize, Ordering, +}; + +#[cfg(not(loom))] +pub use std::sync::atomic::{ + AtomicBool, AtomicI8, AtomicI16, AtomicI32, AtomicI64, AtomicIsize, AtomicU8, AtomicU16, + AtomicU32, AtomicU64, AtomicUsize, Ordering, +}; diff --git a/lib/sync/mod.rs b/lib/sync/mod.rs new file mode 100644 index 0000000..32ce948 --- /dev/null +++ b/lib/sync/mod.rs @@ -0,0 +1,67 @@ +//! Synchronization primitives. +//! +//! Shims between loom and std synchronization primitives. 
+pub mod atomic; + +#[cfg(loom)] +pub use loom::sync::{Arc, Condvar, Mutex, Once, RwLock, RwLockReadGuard, RwLockWriteGuard}; + +#[cfg(loom)] +pub struct OnceLock { + once: Once, +} + +#[cfg(loom)] +unsafe impl Send for OnceLock {} + +#[cfg(loom)] +unsafe impl Sync for OnceLock {} + +#[cfg(loom)] +impl OnceLock { + pub fn new() -> Self { + Self { + once: Once::new(), + value: UnsafeCell::new(None), + } + } + + pub fn get(&self) -> Option<&T> { + if self.once.is_completed() { + // Safety: once is completed, so value is initialized + // and will never be written to again + unsafe { (*self.value.with(|ptr| ptr)).as_ref() } + } else { + None + } + } + + pub fn get_or_init(&self, f: impl FnOnce() -> T) -> &T { + self.once.call_once(|| unsafe { + self.value.with_mut(|ptr| { + *ptr = Some(f()); + }); + }); + // Safety: call_once guarantees initialization is complete + unsafe { (*self.value.with(|ptr| ptr)).as_ref().unwrap() } + } + + pub fn set(&self, value: T) -> Result<(), T> { + let mut value = Some(value); + self.once.call_once(|| { + let val = value.take().unwrap(); + unsafe { + self.value.with_mut(|ptr| { + *ptr = Some(val); + }); + } + }); + match value { + None => Ok(()), // we consumed it — success + Some(v) => Err(v), // already initialized + } + } +} + +#[cfg(not(loom))] +pub use std::sync::{Arc, Condvar, Mutex, OnceLock, RwLock, RwLockReadGuard, RwLockWriteGuard}; From abfdc5eb8b79d8ff0c69e70df5a0b53419736d06 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Sun, 15 Feb 2026 20:43:23 -0800 Subject: [PATCH 12/15] Fix three concurrency bugs in file cache - DeletionIndicator::have_pending_work now checks != 0 (was >= 1<<32) so the eviction loop waits for in-flight deletions, not just pending batches, preventing cascading over-eviction. A DeletionGuard drop guard on spawned deletion tasks ensures observe_deletion() fires even on cancellation or panic. - Size accounting moved inside the scc bucket lock in insert() to prevent transient AtomicUsize underflow when concurrent inserts to the same key interleave their deltas. - access() changed from async send().await (cancellable, panics on channel close) to sync try_send + tokio::spawn fallback, matching the existing upsert() pattern. Removes the unreachable!() panic path during runtime shutdown. --- lib/cache/eviction/lru.rs | 46 ++++++++++++++++++++++++--------------- lib/cache/fcache.rs | 36 ++++++++++++++++-------------- 2 files changed, 48 insertions(+), 34 deletions(-) diff --git a/lib/cache/eviction/lru.rs b/lib/cache/eviction/lru.rs index 7a33cde..ef052d6 100644 --- a/lib/cache/eviction/lru.rs +++ b/lib/cache/eviction/lru.rs @@ -87,10 +87,10 @@ impl DeletionIndicator { self.underlying.fetch_sub(1, atomic::Ordering::Relaxed); } - /// Returns `true` if there are any pending batches (submitted but not yet processed by the - /// worker). Used by producers to avoid enqueuing redundant eviction requests. - fn have_pending_batches(&self) -> bool { - self.underlying.load(atomic::Ordering::Relaxed) >= 1 << 32 + /// Returns `true` if there is any eviction work in progress — either batches waiting to be + /// processed by the worker, or individual deletions still in flight. 
+ fn have_pending_work(&self) -> bool { + self.underlying.load(atomic::Ordering::Relaxed) != 0 } } @@ -237,8 +237,16 @@ impl); + impl Drop for DeletionGuard { + fn drop(&mut self) { + self.0.active_deletions.observe_deletion(); + } + } + let _guard = DeletionGuard(shared_clone); deleter.delete(key, ctx).await; - shared_clone.active_deletions.observe_deletion(); }); } } @@ -320,15 +328,24 @@ impl /// Notify the LRU eviction tracker that the given key was accessed. /// - /// You MUST call this method for every read or update to a key. - pub async fn access(&self, key: K) { - self.send_msg(Message::Accessed( + /// Non-cancellable: uses `try_send` (sync) with a `tokio::spawn` fallback so that the + /// notification cannot be lost to task cancellation. + pub fn access(&self, key: K) { + let msg = Message::Accessed( key, self.worker_state .eviction_generation .load(atomic::Ordering::Relaxed), - )) - .await; + ); + match self.worker_message_sender.try_send(msg) { + Ok(()) | Err(TrySendError::Closed(_)) => {} + Err(TrySendError::Full(msg)) => { + let sender = self.worker_message_sender.clone(); + tokio::spawn(async move { + let _ = sender.send(msg).await; + }); + } + } } /// Try to cull the least recently used keys with the given deletion method. @@ -358,13 +375,6 @@ impl /// Check whether there are culls that are already scheduled or actively in progress. #[must_use] pub fn have_pending_culls(&self) -> bool { - self.worker_state.active_deletions.have_pending_batches() - } - - /// Send a message to the worker. This is a helper method to reduce code duplication. - async fn send_msg(&self, message: Message) { - if self.worker_message_sender.send(message).await.is_err() { - unreachable!(); - } + self.worker_state.active_deletions.have_pending_work() } } diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs index 2cfb58a..8974783 100644 --- a/lib/cache/fcache.rs +++ b/lib/cache/fcache.rs @@ -340,7 +340,7 @@ impl AsyncReadableCache // Use entry_async to lock the bucket, then allocate version under the lock. // This ensures version monotonicity per key matches the actual mutation order. + // Size accounting is also done under the lock to prevent transient underflow + // when concurrent inserts to the same key interleave their deltas. let (old_entry, new_version) = match self.shared.map.entry_async(*key).await { scc::hash_map::Entry::Occupied(mut o) => { let old = *o.get(); @@ -401,6 +403,18 @@ impl fid: new_fid, size_bytes: new_size, }; + + let size_delta: isize = new_size.cast_signed() - old.size_bytes.cast_signed(); + if size_delta > 0 { + self.shared + .size_bytes + .fetch_add(size_delta.cast_unsigned(), atomic::Ordering::Relaxed); + } else if size_delta < 0 { + self.shared + .size_bytes + .fetch_sub(size_delta.unsigned_abs(), atomic::Ordering::Relaxed); + } + (Some(old), v) } scc::hash_map::Entry::Vacant(vacant) => { @@ -409,6 +423,11 @@ impl fid: new_fid, size_bytes: new_size, }); + + self.shared + .size_bytes + .fetch_add(new_size, atomic::Ordering::Relaxed); + (None, v) } }; @@ -424,21 +443,6 @@ impl }, ); - // Size accounting (sync atomic, non-cancellable). - let size_delta: isize = new_size.cast_signed() - - old_entry - .as_ref() - .map_or(0isize, |e| e.size_bytes.cast_signed()); - if size_delta > 0 { - self.shared - .size_bytes - .fetch_add(size_delta.cast_unsigned(), atomic::Ordering::Relaxed); - } else if size_delta < 0 { - self.shared - .size_bytes - .fetch_sub(size_delta.unsigned_abs(), atomic::Ordering::Relaxed); - } - // Deferrable: delete old file (safe to cancel — file is orphaned). 
if let Some(old_entry) = old_entry { drop(self.shared.delete_entry_from_disk_async(&old_entry).await); From 82f353a70fcc728b5c44dcca208f7896f8ecdb1b Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Sun, 15 Feb 2026 21:05:25 -0800 Subject: [PATCH 13/15] Avoid an infinite loop --- lib/cache/fcache.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs index 8974783..2140ce6 100644 --- a/lib/cache/fcache.rs +++ b/lib/cache/fcache.rs @@ -197,6 +197,11 @@ impl FileCache { // See usage for reasoning on why this is necessary. const MAX_READ_RETRY_COUNT: usize = 8; + // The maximum number of eviction loop iterations before giving up and proceeding with the + // insert. Prevents livelock when the LRU worker's ordered map is empty (e.g., Upserted + // messages haven't been delivered yet) and try_cull succeeds but evicts nothing. + const MAX_EVICTION_ATTEMPTS: u32 = 4; + /// Try to create a new file cache at the given path. /// /// @@ -362,6 +367,7 @@ impl let new_fid = self.file_generator.fetch_add(1, atomic::Ordering::Relaxed); let new_size = value.len(); + let mut eviction_attempts = 0u32; while self.shared.size() + new_size > self.max_size_bytes { if self.shared.size() == 0 { // The new entry is larger than the entire cache size limit. Insert it anyway @@ -369,13 +375,15 @@ impl break; } - // TODO(markovejnovic): This whole spinlock situation sounded better in my head, but - // realistically, I think I could've gotten away with just a plain Notify. It is what - // it is now. + if eviction_attempts >= Self::MAX_EVICTION_ATTEMPTS { + // We've tried enough times. The cache is over budget but max_size_bytes is a + // soft hint -- proceed with the insert and let future inserts drive further + // eviction. + break; + } + eviction_attempts += 1; if self.lru_tracker.have_pending_culls() { - // TODO(markovejnovic): This could be nicer and maybe starve the CPU less. Chances - // are we actually need to yield to the LRU tracker task. 
tokio::task::yield_now().await; continue; } From 5dab7ead00ee41943b420b70dbbe7432850118aa Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Sun, 15 Feb 2026 22:31:28 -0800 Subject: [PATCH 14/15] remove broken code --- Cargo.lock | 35 --------------- Cargo.toml | 5 +-- lib/cache/eviction/lru.rs | 89 ++++----------------------------------- lib/cache/fcache.rs | 5 ++- lib/lib.rs | 1 - lib/sync/atomic.rs | 13 ------ lib/sync/mod.rs | 67 ----------------------------- 7 files changed, 13 insertions(+), 202 deletions(-) delete mode 100644 lib/sync/atomic.rs delete mode 100644 lib/sync/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 7cbaf5b..ebbe571 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -718,21 +718,6 @@ dependencies = [ "thread_local", ] -[[package]] -name = "generator" -version = "0.8.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52f04ae4152da20c76fe800fa48659201d5cf627c5149ca0b707b69d7eef6cf9" -dependencies = [ - "cc", - "cfg-if", - "libc", - "log", - "rustversion", - "windows-link", - "windows-result", -] - [[package]] name = "getrandom" version = "0.2.17" @@ -792,7 +777,6 @@ dependencies = [ "http", "inquire", "libc", - "loom", "mesa-dev", "nix", "num-traits", @@ -1320,19 +1304,6 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" -[[package]] -name = "loom" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" -dependencies = [ - "cfg-if", - "generator", - "scoped-tls", - "tracing", - "tracing-subscriber", -] - [[package]] name = "lru-slab" version = "0.1.2" @@ -2148,12 +2119,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "scoped-tls" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" - [[package]] name = "scopeguard" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index 6f777d9..399c11b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,9 +52,6 @@ opentelemetry-otlp = { version = "0.29", default-features = false, features = [" tracing-opentelemetry = { version = "0.30" } hashlink = "0.11.0" -[target.'cfg(loom)'.dependencies] -loom = "0.7" - [features] default = [] staging = [] @@ -63,7 +60,7 @@ staging = [] vergen-gitcl = { version = "1", features = [] } [lints.rust] -unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] } +unexpected_cfgs = "warn" unsafe_code = "allow" missing_docs = "warn" unreachable_pub = "allow" diff --git a/lib/cache/eviction/lru.rs b/lib/cache/eviction/lru.rs index ef052d6..b48f666 100644 --- a/lib/cache/eviction/lru.rs +++ b/lib/cache/eviction/lru.rs @@ -2,7 +2,10 @@ use std::{future::Future, hash::Hash}; -use crate::sync::{Arc, OnceLock, atomic, atomic::AtomicI64, atomic::AtomicUsize}; +use std::sync::{ + Arc, OnceLock, + atomic::{self, AtomicI64}, +}; use hashlink::LinkedHashMap; use tokio::{ @@ -29,7 +32,7 @@ pub trait Deleter: Send + Clone + 'static { #[derive(Debug, Clone, Copy)] enum Message { /// Notify the LRU eviction tracker that the given key was accessed. - Accessed(K, usize), + Accessed(K), /// Request an eviction set of the given size. Evict(u32), /// Notify the LRU eviction tracker that a key was inserted or overwritten. 
@@ -142,82 +145,14 @@ impl) -> bool { match message { - Message::Accessed(k, ev_gen) => { - if ev_gen - < self - .shared - .eviction_generation - .load(atomic::Ordering::Relaxed) - { - // This is a ghost access. Happens when one client sends an eviction, but - // other clients add accesses to the same keys, before our worker thread had - // a chance to clean up. - return true; - } - + Message::Accessed(k) => { // The key may have been evicted between the access and this message arriving. + // If it was, the remove returns None and this is a no-op. if let Some(entry) = self.ordered_key_map.remove(&k) { self.ordered_key_map.insert(k, entry); } } Message::Evict(max_count) => { - // We got an eviction notice. Bump the eviction generation to indicate that all - // accesses after this point are considered in the "future". Read the subsequent - // comment for an explanation of why we need to do this. - self.shared - .eviction_generation - .fetch_add(1, atomic::Ordering::Relaxed); - // Before we send off the eviction set, we actually need to remove the evicted - // keys from the pending queue. Read the following for an explanation. - // - // The client may access a key AFTER an eviction notice. Until the eviction - // notice is processed by the client, the client will have the key available to - // it. - // - // Consequently, the client may very well access the key after it sent out an - // eviction notice, but before the eviction notice is processed. This will - // result in the key being added as an access message after the eviction - // notice, but before the eviction set is sent to the client. - // - // The problem is, the LruProcessingTask has no way of knowing whether the - // message the client sent is a stale message, or a legit message. - // - // Consider the following queue state in between two loop iterations of the - // LruProcessingTask: - // A -- access - // B -- insert - // E -- eviction request - // C -- the client thread which made the request - // - // A1 A2 A3 E1 A1 B1 -- after this point, no clients can access A1 - // C1 C1 C1 C1 C2 C1 ---------------------------------------------> time - // - // The scenario here is: - // - A1, A2, A3 are used by the clients. - // - C1 wants to add B1, but it doesn't have enough space, so it sends an - // eviction request. - // - In parallel, C2 accesses A1, which is completely legal. - // - // The result is that our queue has A1, even after we sent out the eviction - // request, and we have no way of knowing whether A1 is a stale message or not. - // - // To prevent this whole "is this thing stale or not" problem, we need to - // "erase" keys evicted in the future. Unfortunately, you can't mutate queues, - // so the best we can do is simply mark the stupid keys as "evicted" and ignore - // any accesses from these keys until they're marked as "inserted" again. - // - // This is what the generation counter does -- it allows us to track which - // messages are considered "stale". There are some problems with the generation - // counter, however. We drop all messages that are "generationally old". Under - // high contention, this results in the eviction policy acting like a fuzzy LRU - // policy, rather than a strict LRU policy, but for the purposes of this cache, - // this is an acceptable tradeoff. - // - // TODO(markovejnovic): Make this a strict LRU. 
- // - // We cannot mutate the queue to mark messages as stale, so the best we can do - // is mark the keys as "evicted" and ignore any accesses from these keys until - // they're marked as "inserted" again. { // These integer casts are safe, since max_count is guaranteed to fit // within a u32, min(MAX_U32, MAX_USIZE) == MAX_U32. @@ -272,7 +207,6 @@ struct WorkerState { /// Packed atomic tracking pending eviction batches (upper 32 bits) and active deletions /// (lower 32 bits). See `DeletionIndicator` for the full protocol. active_deletions: DeletionIndicator, - eviction_generation: AtomicUsize, worker: OnceLock>, } @@ -293,7 +227,6 @@ impl let worker_state = Arc::new(WorkerState { active_deletions: DeletionIndicator::new(), - eviction_generation: AtomicUsize::new(0), worker: OnceLock::new(), }); let worker = LruProcessingTask::spawn_task(deleter, rx, Arc::clone(&worker_state)); @@ -331,13 +264,7 @@ impl /// Non-cancellable: uses `try_send` (sync) with a `tokio::spawn` fallback so that the /// notification cannot be lost to task cancellation. pub fn access(&self, key: K) { - let msg = Message::Accessed( - key, - self.worker_state - .eviction_generation - .load(atomic::Ordering::Relaxed), - ); - match self.worker_message_sender.try_send(msg) { + match self.worker_message_sender.try_send(Message::Accessed(key)) { Ok(()) | Err(TrySendError::Closed(_)) => {} Err(TrySendError::Full(msg)) => { let sender = self.worker_message_sender.clone(); diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs index 2140ce6..a68cce2 100644 --- a/lib/cache/fcache.rs +++ b/lib/cache/fcache.rs @@ -6,7 +6,10 @@ use std::{ path::{Path, PathBuf}, }; -use crate::sync::{Arc, atomic, atomic::AtomicU64, atomic::AtomicUsize}; +use std::sync::{ + Arc, + atomic::{self, AtomicU64, AtomicUsize}, +}; use tracing::error; diff --git a/lib/lib.rs b/lib/lib.rs index 390f786..058aae4 100644 --- a/lib/lib.rs +++ b/lib/lib.rs @@ -7,4 +7,3 @@ /// Caching primitives for git-fs. pub mod cache; pub mod io; -pub mod sync; diff --git a/lib/sync/atomic.rs b/lib/sync/atomic.rs deleted file mode 100644 index 2c77deb..0000000 --- a/lib/sync/atomic.rs +++ /dev/null @@ -1,13 +0,0 @@ -//! Re-export of atomics, so that we can use loom's atomics when testing with loom. - -#[cfg(loom)] -pub use loom::sync::atomic::{ - AtomicBool, AtomicI8, AtomicI16, AtomicI32, AtomicI64, AtomicIsize, AtomicU8, AtomicU16, - AtomicU32, AtomicU64, AtomicUsize, Ordering, -}; - -#[cfg(not(loom))] -pub use std::sync::atomic::{ - AtomicBool, AtomicI8, AtomicI16, AtomicI32, AtomicI64, AtomicIsize, AtomicU8, AtomicU16, - AtomicU32, AtomicU64, AtomicUsize, Ordering, -}; diff --git a/lib/sync/mod.rs b/lib/sync/mod.rs deleted file mode 100644 index 32ce948..0000000 --- a/lib/sync/mod.rs +++ /dev/null @@ -1,67 +0,0 @@ -//! Synchronization primitives. -//! -//! Shims between loom and std synchronization primitives. 
-pub mod atomic; - -#[cfg(loom)] -pub use loom::sync::{Arc, Condvar, Mutex, Once, RwLock, RwLockReadGuard, RwLockWriteGuard}; - -#[cfg(loom)] -pub struct OnceLock { - once: Once, -} - -#[cfg(loom)] -unsafe impl Send for OnceLock {} - -#[cfg(loom)] -unsafe impl Sync for OnceLock {} - -#[cfg(loom)] -impl OnceLock { - pub fn new() -> Self { - Self { - once: Once::new(), - value: UnsafeCell::new(None), - } - } - - pub fn get(&self) -> Option<&T> { - if self.once.is_completed() { - // Safety: once is completed, so value is initialized - // and will never be written to again - unsafe { (*self.value.with(|ptr| ptr)).as_ref() } - } else { - None - } - } - - pub fn get_or_init(&self, f: impl FnOnce() -> T) -> &T { - self.once.call_once(|| unsafe { - self.value.with_mut(|ptr| { - *ptr = Some(f()); - }); - }); - // Safety: call_once guarantees initialization is complete - unsafe { (*self.value.with(|ptr| ptr)).as_ref().unwrap() } - } - - pub fn set(&self, value: T) -> Result<(), T> { - let mut value = Some(value); - self.once.call_once(|| { - let val = value.take().unwrap(); - unsafe { - self.value.with_mut(|ptr| { - *ptr = Some(val); - }); - } - }); - match value { - None => Ok(()), // we consumed it — success - Some(v) => Err(v), // already initialized - } - } -} - -#[cfg(not(loom))] -pub use std::sync::{Arc, Condvar, Mutex, OnceLock, RwLock, RwLockReadGuard, RwLockWriteGuard}; From b282b3bc83d727f818551197b120fcf58fbac0e7 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Sun, 15 Feb 2026 23:11:11 -0800 Subject: [PATCH 15/15] more cleanup --- lib/cache/eviction/lru.rs | 30 ++++++++++-------------------- lib/cache/fcache.rs | 8 ++++++-- lib/lib.rs | 1 - 3 files changed, 16 insertions(+), 23 deletions(-) diff --git a/lib/cache/eviction/lru.rs b/lib/cache/eviction/lru.rs index b48f666..ff56b2c 100644 --- a/lib/cache/eviction/lru.rs +++ b/lib/cache/eviction/lru.rs @@ -3,15 +3,12 @@ use std::{future::Future, hash::Hash}; use std::sync::{ - Arc, OnceLock, + Arc, atomic::{self, AtomicI64}, }; use hashlink::LinkedHashMap; -use tokio::{ - sync::mpsc::{Receiver, error::TrySendError}, - task::JoinHandle, -}; +use tokio::sync::mpsc::{Receiver, error::TrySendError}; /// Types that carry a monotonic version for deduplication of out-of-order messages. pub trait Versioned { @@ -22,9 +19,9 @@ pub trait Versioned { /// A trait for deleting keys from the cache. This is used by the LRU eviction tracker to delete /// keys when they are evicted. pub trait Deleter: Send + Clone + 'static { - /// Delete the given keys from the cache. The keys are guaranteed to be in the order of - /// eviction. You absolutely MUST delete the keys in the order they are given, otherwise the - /// LRU eviction tracker will get very confused and break. + /// Delete the given key from the cache. Deletions for different keys may be invoked + /// concurrently and in arbitrary order. Correctness is ensured by the caller through + /// a context-based guard (e.g. matching on a unique file ID), not by invocation order. fn delete(&mut self, key: K, ctx: Ctx) -> impl Future + Send; } @@ -123,16 +120,12 @@ impl>, - shared: Arc, - ) -> JoinHandle<()> { + fn spawn_task(deleter: D, receiver: Receiver>, shared: Arc) { // TODO(markovejnovic): This should have a best-effort drop. 
tokio::spawn(async move { let mut task = Self::new(deleter, receiver, shared); task.work().await; - }) + }); } async fn work(&mut self) { @@ -207,7 +200,6 @@ struct WorkerState { /// Packed atomic tracking pending eviction batches (upper 32 bits) and active deletions /// (lower 32 bits). See `DeletionIndicator` for the full protocol. active_deletions: DeletionIndicator, - worker: OnceLock>, } /// An LRU eviction tracker. This is used to track the least recently used keys in the cache, and @@ -227,12 +219,10 @@ impl let worker_state = Arc::new(WorkerState { active_deletions: DeletionIndicator::new(), - worker: OnceLock::new(), }); - let worker = LruProcessingTask::spawn_task(deleter, rx, Arc::clone(&worker_state)); - if worker_state.worker.set(worker).is_err() { - unreachable!("worker should only be set once, and we just set it"); - } + // The worker task is intentionally detached: it exits when the channel closes + // (i.e. when all senders, including the one held by LruEvictionTracker, are dropped). + LruProcessingTask::spawn_task(deleter, rx, Arc::clone(&worker_state)); Self { worker_message_sender: tx, diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs index a68cce2..8d69f1a 100644 --- a/lib/cache/fcache.rs +++ b/lib/cache/fcache.rs @@ -396,10 +396,14 @@ impl } } - // Write file to disk. + // Write file to disk. The guard is constructed *before* the `.await` so that if + // this future is cancelled while `create_file` is in flight, the drop guard still + // cleans up the (potentially created) file. let path = self.shared.path_for(new_fid); + let mut guard = FileGuard { + path: Some(path.clone()), + }; let mut new_file = self.create_file(&path).await?; - let mut guard = FileGuard { path: Some(path) }; new_file.write_all(&value).await?; // Use entry_async to lock the bucket, then allocate version under the lock. diff --git a/lib/lib.rs b/lib/lib.rs index 058aae4..34ca807 100644 --- a/lib/lib.rs +++ b/lib/lib.rs @@ -2,7 +2,6 @@ #![feature(negative_impls)] #![feature(with_negative_coherence)] -#![feature(fn_traits)] /// Caching primitives for git-fs. pub mod cache;
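
Note on the non-cancellable notification path added in patch 12 and kept after the generation counter was removed in patch 14: the rule it encodes is that the caller never awaits the channel. The following is a minimal, self-contained sketch of that pattern. The `Tracker` and `Message` types here are illustrative stand-ins, not the crate's real `LruEvictionTracker`; only the `try_send`-then-`tokio::spawn` fallback mirrors the series.

use tokio::sync::mpsc::{self, error::TrySendError};

#[derive(Debug, Clone, Copy)]
enum Message {
    Accessed(u64),
}

#[derive(Clone)]
struct Tracker {
    sender: mpsc::Sender<Message>,
}

impl Tracker {
    /// Record an access without ever awaiting on the caller's task.
    fn access(&self, key: u64) {
        match self.sender.try_send(Message::Accessed(key)) {
            // Fast path, or the runtime is shutting down -- nothing to do.
            Ok(()) | Err(TrySendError::Closed(_)) => {}
            // Channel full: hand the message to a detached task so it cannot be
            // lost when the caller's future is cancelled mid-await.
            Err(TrySendError::Full(msg)) => {
                let sender = self.sender.clone();
                tokio::spawn(async move {
                    let _ = sender.send(msg).await;
                });
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    let tracker = Tracker { sender: tx };
    tracker.access(42);
    assert!(matches!(rx.recv().await, Some(Message::Accessed(42))));
}

The trade-off is that a full channel weakens ordering slightly (the spawned send may land after later try_sends), which the series tolerates because a stale access to an already-evicted key is a no-op in the worker.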
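
The same patch fixes deletion accounting with a drop guard so that the decrement runs even when the spawned deletion task is cancelled or panics. The sketch below keeps only the guard mechanics; the plain `AtomicI64` counter is a simplification of the packed `DeletionIndicator`, and `delete_from_disk` is a hypothetical stand-in for the real deleter.

use std::sync::{
    Arc,
    atomic::{AtomicI64, Ordering},
};

struct Shared {
    active_deletions: AtomicI64,
}

struct DeletionGuard(Arc<Shared>);

impl Drop for DeletionGuard {
    fn drop(&mut self) {
        // Runs no matter how the owning task ends.
        self.0.active_deletions.fetch_sub(1, Ordering::Relaxed);
    }
}

async fn delete_from_disk(_key: u64) {
    // Stand-in for the real work, e.g. tokio::fs::remove_file(...).
}

fn spawn_deletion(shared: Arc<Shared>, key: u64) -> tokio::task::JoinHandle<()> {
    // Count the deletion as in flight before handing it to a task.
    shared.active_deletions.fetch_add(1, Ordering::Relaxed);
    tokio::spawn(async move {
        // The guard owns the Arc; its Drop fires on completion, cancellation, or panic.
        let _guard = DeletionGuard(shared);
        delete_from_disk(key).await;
    })
}

#[tokio::main]
async fn main() {
    let shared = Arc::new(Shared {
        active_deletions: AtomicI64::new(0),
    });
    spawn_deletion(Arc::clone(&shared), 1).await.unwrap();
    assert_eq!(shared.active_deletions.load(Ordering::Relaxed), 0);
}

Incrementing before the spawn and decrementing only in Drop is what lets have_pending_work() treat a zero counter as "no eviction work in flight", which the insert loop relies on to avoid cascading over-eviction.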