diff --git a/java/README.md b/java/README.md index b49a489252..5f4c7f70ef 100644 --- a/java/README.md +++ b/java/README.md @@ -212,6 +212,16 @@ JVM engine connectors can be built using the Lance Java SDK. Here are some conne * [Flink Lance connector](https://github.com/lancedb/lance-flink) * [Trino Lance connector](https://github.com/lancedb/lance-trino) +## Configuration + +Environment variables read by the JNI layer at runtime: + +| Variable | Default | Effect | +|---|---|---| +| `LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING` | unset (sharing on) | Set to `1`/`true`/`yes` to give every default `Dataset.open` (no explicit `Session`) a fresh `ObjectStoreRegistry`. Disables the process-wide registry that coalesces concurrent cold builds for the same URI. | + +Default-open paths share an `ObjectStoreRegistry` so concurrent opens against the same URI reuse one `ObjectStore` instead of each rebuilding the credential chain and HTTP client. Bare-URI opens (empty storage options, no provider, no namespace commit handler) collapse onto a single cache entry per URI: the first caller's resolved default credentials become the credentials used by every subsequent caller for the lifetime of that `ObjectStore`. Callers that need cross-tenant isolation under bare URIs should either pass an explicit `Session`, supply tenant-distinguishing storage options, or set the variable above to opt out entirely. + ## Contributing From the codebase dimension, the lance project is a multiple-lang project. All Java-related code is located in the `java` directory. diff --git a/java/lance-jni/Cargo.lock b/java/lance-jni/Cargo.lock index 81afc1dae0..fe2918d3cb 100644 --- a/java/lance-jni/Cargo.lock +++ b/java/lance-jni/Cargo.lock @@ -4019,6 +4019,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "url", "uuid", ] diff --git a/java/lance-jni/Cargo.toml b/java/lance-jni/Cargo.toml index e391bc7b30..51b63d58c2 100644 --- a/java/lance-jni/Cargo.toml +++ b/java/lance-jni/Cargo.toml @@ -52,6 +52,11 @@ roaring = "0.11" prost-types = "0.14.1" chrono = "0.4.41" +[dev-dependencies] +# Test-only: builds Url values for the JNI default-open coalescing test in +# src/blocking_dataset.rs. Workspace pins this same version transitively. +url = "2.5.7" + [profile.dev] debug = "line-tables-only" incremental = false diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index d338ff0fc4..4af052971b 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -65,6 +65,155 @@ use std::time::{Duration, UNIX_EPOCH}; pub const NATIVE_DATASET: &str = "nativeDatasetHandle"; +/// Reads `LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING` once at first access and +/// caches the result. +/// +/// Only `"1" | "true" | "yes"` (case-insensitive) disable sharing. Empty / unset +/// keeps sharing enabled — `export VAR=` does NOT activate the escape hatch. +/// Logs a warning the first time the value is read when sharing is disabled, +/// and a separate warning when the variable is set to a value the parser does +/// not recognize. +/// +/// Backed by `AtomicU8` rather than `OnceLock` so unit tests can override +/// the resolved value via `DisableSharingTestGuard` without spawning a fresh +/// process per case. +/// +/// State encoding: +/// - 0 = uninitialized (read env on next access) +/// - 1 = sharing enabled +/// - 2 = sharing disabled +const SHARING_UNINIT: u8 = 0; +const SHARING_ENABLED: u8 = 1; +const SHARING_DISABLED: u8 = 2; + +static DISABLE_DEFAULT_REGISTRY_SHARING: std::sync::atomic::AtomicU8 = + std::sync::atomic::AtomicU8::new(SHARING_UNINIT); + +fn disable_default_registry_sharing() -> bool { + use std::sync::atomic::Ordering; + match DISABLE_DEFAULT_REGISTRY_SHARING.load(Ordering::Relaxed) { + SHARING_ENABLED => false, + SHARING_DISABLED => true, + _ => { + // First read: resolve from env, warn if appropriate, then publish. + // Under a race multiple threads may each observe UNINIT and emit + // duplicate `log::warn!` lines — the env read is idempotent so the + // resolved bool is the same. Cheap duplication is the price of an + // `AtomicU8` that tests can override. + let raw = std::env::var("LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING").ok(); + let disabled = match raw.as_deref().map(parse_disable_value) { + Some(Some(parsed)) => parsed, + Some(None) => { + // Set but not recognized — warn so a misspelled escape hatch + // (e.g. `=on`, `=y`) surfaces instead of silently keeping + // sharing enabled and looking like the env var "did nothing". + log::warn!( + "LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING={:?} is unrecognized; \ + keeping sharing enabled. Use 1/true/yes to disable.", + raw.as_deref().unwrap_or(""), + ); + false + } + None => false, + }; + if disabled { + log::warn!( + "LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING is set; JNI default-open will \ + use a fresh ObjectStoreRegistry per call (single-flight coalescing \ + disabled)." + ); + } + DISABLE_DEFAULT_REGISTRY_SHARING.store( + if disabled { + SHARING_DISABLED + } else { + SHARING_ENABLED + }, + Ordering::Relaxed, + ); + disabled + } + } +} + +/// Parse a `LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING` env value. +/// +/// Returns (after trim + ASCII lowercase): +/// - `Some(true)` for `1`/`true`/`yes` — operator opted out of sharing. +/// - `Some(false)` for empty, whitespace-only, `0`/`false`/`no` — explicit keep-default. +/// - `None` for anything else (typos, `on`, `y`, numeric noise) — caller +/// should warn and fall back to default to keep misconfigurations visible. +/// +/// Pure helper extracted from [`disable_default_registry_sharing`] so the +/// truthiness rules can be unit-tested without manipulating process env vars. +fn parse_disable_value(raw: &str) -> Option { + // `trim()` canonicalizes whitespace-only input to `""`, so the empty-string + // arm catches both literal `""` and ` `/`\n` etc. — keeping the doc + // contract honest with a single match arm. + match raw.trim().to_ascii_lowercase().as_str() { + "1" | "true" | "yes" => Some(true), + "" | "0" | "false" | "no" => Some(false), + _ => None, + } +} + +/// Pick the `ObjectStoreRegistry` for the JNI default-open path. +/// +/// When sharing is enabled (the default), every default-open call reuses the +/// process-wide [`crate::GLOBAL_OBJECT_STORE_REGISTRY`] so concurrent cold +/// builds for the same URI coalesce on its single-flight. When the +/// `LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING` escape hatch is set, each call +/// gets a fresh registry — pre-PR isolation at the cost of single-flight. +/// +/// Extracted into a pure helper so the selection logic is unit-testable +/// without spinning up a JVM. +fn select_default_open_registry( + disable_sharing: bool, +) -> Arc { + if disable_sharing { + Arc::new(lance_io::object_store::ObjectStoreRegistry::default()) + } else { + crate::GLOBAL_OBJECT_STORE_REGISTRY.clone() + } +} + +#[cfg(test)] +/// RAII guard that restores `DISABLE_DEFAULT_REGISTRY_SHARING` to its prior +/// value on drop. Tests should always go through this guard so they cannot +/// leak state into sibling tests run in the same process. +/// +/// Each guard captures and restores its own snapshot, so nested guards compose +/// correctly: LIFO drop order means the outer guard's snapshot always wins, +/// returning the flag to the value seen before the outermost `set` call. +struct DisableSharingTestGuard { + prior: u8, +} + +#[cfg(test)] +impl DisableSharingTestGuard { + fn set(disabled: bool) -> Self { + use std::sync::atomic::Ordering; + let prior = DISABLE_DEFAULT_REGISTRY_SHARING.load(Ordering::Relaxed); + DISABLE_DEFAULT_REGISTRY_SHARING.store( + if disabled { + SHARING_DISABLED + } else { + SHARING_ENABLED + }, + Ordering::Relaxed, + ); + Self { prior } + } +} + +#[cfg(test)] +impl Drop for DisableSharingTestGuard { + fn drop(&mut self) { + use std::sync::atomic::Ordering; + DISABLE_DEFAULT_REGISTRY_SHARING.store(self.prior, Ordering::Relaxed); + } +} + impl FromJObjectWithEnv for JObject<'_> { fn extract_object(&self, env: &mut JNIEnv<'_>) -> Result { let id = env.get_u32_from_method(self, "getId")?; @@ -175,6 +324,21 @@ impl BlockingDataset { storage_options_accessor: accessor, ..Default::default() }; + + // Default-open path: share the process-wide registry so concurrent + // opens for the same URI coalesce on single-flight; each call still + // gets its own `Session`. Tenant-isolation contract is documented on + // [`crate::GLOBAL_OBJECT_STORE_REGISTRY`]; opt out via + // `LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING=1`. + let session = session.or_else(|| { + let registry = select_default_open_registry(disable_default_registry_sharing()); + Some(Arc::new(LanceSession::new( + index_cache_size_bytes as usize, + metadata_cache_size_bytes as usize, + registry, + ))) + }); + let params = ReadParams { index_cache_size_bytes: index_cache_size_bytes as usize, metadata_cache_size_bytes: metadata_cache_size_bytes as usize, @@ -3646,3 +3810,184 @@ fn inner_get_zonemap_stats<'local>( Ok(array_list) } + +#[cfg(test)] +mod default_open_registry_tests { + use super::*; + + /// Default-open path must reuse the process-wide + /// `GLOBAL_OBJECT_STORE_REGISTRY` so concurrent opens can coalesce on its + /// single-flight. Compare by `Arc::ptr_eq` — a fresh registry would have + /// a distinct allocation. + #[test] + fn select_default_open_registry_reuses_global_when_sharing_enabled() { + let registry = select_default_open_registry(false); + assert!( + Arc::ptr_eq(®istry, &crate::GLOBAL_OBJECT_STORE_REGISTRY), + "sharing-enabled path must hand back the GLOBAL_OBJECT_STORE_REGISTRY Arc" + ); + } + + /// Escape-hatch path must hand back a *fresh* registry per call so the + /// pre-PR isolation behavior is preserved when an operator opts out. + #[test] + fn select_default_open_registry_returns_fresh_when_disabled() { + let r1 = select_default_open_registry(true); + let r2 = select_default_open_registry(true); + assert!( + !Arc::ptr_eq(&r1, &crate::GLOBAL_OBJECT_STORE_REGISTRY), + "disabled path must NOT alias the global registry" + ); + assert!( + !Arc::ptr_eq(&r1, &r2), + "disabled path must allocate a new registry per call" + ); + } + + /// `disable_default_registry_sharing()` must honor the in-process override + /// hook. This guards against future refactors that re-introduce a once-cell + /// that bypasses the AtomicU8 state. + #[test] + fn disable_flag_honors_test_override() { + // RAII guard restores prior state at end-of-scope so this test cannot + // leak SHARING_ENABLED/DISABLED state into sibling tests that rely on + // env-driven first-read resolution. + let _g = DisableSharingTestGuard::set(true); + assert!(disable_default_registry_sharing()); + + let _g2 = DisableSharingTestGuard::set(false); + assert!(!disable_default_registry_sharing()); + } + + /// Truthy spellings (case + whitespace insensitive) must parse to `Some(true)`. + /// These are the values that opt the operator OUT of registry sharing — + /// getting any of them wrong silently re-enables coalescing in a config + /// that asked for isolation, so accept-list correctness is load-bearing. + #[test] + fn parse_disable_value_accepts_truthy() { + for raw in [ + "1", "true", "yes", "TRUE", "Yes", "True", " 1 ", "\ttrue\n", + ] { + assert_eq!( + parse_disable_value(raw), + Some(true), + "expected {raw:?} to parse as Some(true)", + ); + } + } + + /// Documented falsy spellings — empty, whitespace-only, `0`/`false`/`no` — + /// must parse to `Some(false)`. These are explicit keep-default opts; + /// distinguishing them from unrecognized noise lets the caller skip the + /// warning log. + #[test] + fn parse_disable_value_accepts_falsy() { + for raw in ["", " ", "\t\n", "0", "false", "no", "FALSE", " No "] { + assert_eq!( + parse_disable_value(raw), + Some(false), + "expected {raw:?} to parse as Some(false)", + ); + } + } + + /// Anything outside the accept-list — typos, `on`, `y`, numeric noise — + /// must parse to `None` so [`disable_default_registry_sharing`] can warn + /// the operator that their escape hatch is being silently ignored. + #[test] + fn parse_disable_value_returns_none_for_unrecognized() { + for raw in ["off", "disable", "2", "1.0", "true!", "y", "on", "enable"] { + assert_eq!( + parse_disable_value(raw), + None, + "expected {raw:?} to parse as None (unrecognized)", + ); + } + } + + /// End-to-end pin for PR#1's two-piece contract at the JNI boundary: + /// `select_default_open_registry` must hand back a registry whose + /// `get_store` coalesces concurrent cold builds for the same URI. + /// A regression in single-flight (dropped from `get_store`) would + /// surface here as `provider.builds > 1` or callers observing + /// different `Arc` instances. + /// + /// The complementary "sharing-on returns the process-wide registry" + /// invariant is covered by + /// `select_default_open_registry_reuses_global_when_sharing_enabled`, + /// so this test deliberately uses the sharing-disabled path to take a + /// fresh registry — registering a one-off provider on + /// `GLOBAL_OBJECT_STORE_REGISTRY` would leak that scheme's `Arc` for + /// the rest of the test binary's lifetime. + /// + /// Provider counts invocations and sleeps before returning so concurrent + /// callers reliably queue on the build lock before the first build + /// completes — without the delay the winner returns before any contention + /// develops and the test would pass even if single-flight were broken. + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn jni_default_open_coalesces_concurrent_cold_builds() { + use std::sync::atomic::{AtomicU64, Ordering}; + use std::time::Duration; + + use lance_core::Result as LanceResult; + use lance_io::object_store::ObjectStore as LanceObjectStore; + use lance_io::object_store::providers::memory::MemoryStoreProvider; + use lance_io::object_store::{ObjectStoreParams, ObjectStoreProvider}; + use url::Url; + + #[derive(Debug, Default)] + struct CountingProvider { + builds: AtomicU64, + } + + #[async_trait::async_trait] + impl ObjectStoreProvider for CountingProvider { + async fn new_store( + &self, + base_path: Url, + params: &ObjectStoreParams, + ) -> LanceResult { + self.builds.fetch_add(1, Ordering::Relaxed); + tokio::time::sleep(Duration::from_millis(20)).await; + MemoryStoreProvider.new_store(base_path, params).await + } + } + + let provider = Arc::new(CountingProvider::default()); + // Take a fresh registry via the sharing-disabled path so the + // test-only `pr1-jni-coalesce` provider never touches + // `GLOBAL_OBJECT_STORE_REGISTRY`. + let registry = select_default_open_registry(true); + registry.insert("pr1-jni-coalesce", provider.clone()); + + let url = Url::parse("pr1-jni-coalesce://x").unwrap(); + let params = ObjectStoreParams::default(); + + let n = 8; + let mut handles = Vec::with_capacity(n); + for _ in 0..n { + let registry = registry.clone(); + let url = url.clone(); + let params = params.clone(); + handles.push(tokio::spawn(async move { + registry.get_store(url, ¶ms).await.unwrap() + })); + } + let mut stores = Vec::with_capacity(n); + for h in handles { + stores.push(h.await.expect("task must not panic")); + } + + assert_eq!( + provider.builds.load(Ordering::Relaxed), + 1, + "single-flight must collapse N concurrent cold misses into one provider call", + ); + for store in &stores[1..] { + assert!( + Arc::ptr_eq(&stores[0], store), + "every coalesced waiter must observe the same Arc", + ); + } + } +} diff --git a/java/lance-jni/src/lib.rs b/java/lance-jni/src/lib.rs index 622ce5e878..06cb5e61bd 100644 --- a/java/lance-jni/src/lib.rs +++ b/java/lance-jni/src/lib.rs @@ -84,6 +84,75 @@ pub static RT: LazyLock = LazyLock::new(|| { .expect("Failed to create tokio runtime") }); +/// Process-wide [`lance_io::object_store::ObjectStoreRegistry`] used for JNI +/// default-open paths. +/// +/// When the Java caller does not supply an explicit session, the JNI open +/// path constructs a per-call session that shares this registry. Sharing the +/// registry across calls allows the registry's per-key single-flight to +/// coalesce concurrent cold builds for the same URI, and lets long-lived +/// `ObjectStore` strong references be reused across opens — both of which +/// turn what would otherwise be a thundering herd into a cheap weak-Arc +/// upgrade. +/// +/// # Why the registry is shared but the `Session` is not +/// +/// The JNI default-open path is intentionally asymmetric: the +/// `ObjectStoreRegistry` is process-global, but each open builds a fresh +/// `Session` (which owns the metadata/index caches). This shape is chosen +/// because the two layers cache fundamentally different things: +/// +/// - **Registry → `Arc`**: an HTTP/S3 client, credential chain, +/// and connection pool. Building one is the *expensive* operation +/// (credential probe, IMDS round-trip, TLS handshake) — so this is what +/// the 144-concurrent-open regression was made of, and what the global +/// registry exists to coalesce. +/// +/// The cache key is derived from the provider-specific store prefix +/// (typically scheme + authority — e.g. `s3://bucket` — but providers +/// such as Hugging Face fold `repo_id` in instead) plus the relevant +/// fields of `ObjectStoreParams` (block size, dynamic +/// `storage_options_accessor`'s `provider_id()`, etc.). It does **not** +/// incorporate auth headers, STS tokens, namespace identity, or any +/// bearer credentials. Tenant isolation under sharing therefore relies +/// entirely on callers providing a key-distinguishing input — typically +/// non-empty `storage_options`, a `storage_options_provider` whose +/// `provider_id()` carries tenant identity, or an explicit `session`. +/// +/// Bare-URI opens (empty `storage_options`, no provider, no namespace +/// commit-handler) collapse onto a single cache entry per URI: the first +/// caller's resolved default-credential chain becomes the credentials +/// used by every subsequent caller for the lifetime of that +/// `Arc`. Callers who need cross-tenant isolation under +/// bare URIs MUST opt out via +/// `LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING=1`; the resolved bool is +/// consulted on every default-open path. +/// +/// - **Session → metadata/index caches**: query-shaped, sized by +/// `index_cache_size_bytes` and `metadata_cache_size_bytes` from each +/// open's `ReadParams`. Sharing a Session across opens would force every +/// caller to pick the same cache size, would make eviction policy a +/// cross-tenant policy decision, and would let one tenant's hot dataset +/// evict another's. None of those are problems we want to take on inside +/// the JNI bridge — Java callers that want metadata-cache reuse can build +/// their own [`lance::session::Session`] and pass it in explicitly via +/// `BlockingDataset::open` with `session: Some(...)`. +/// +/// # Lifetime +/// +/// This static lives for the lifetime of the process. JVM unload (e.g. via +/// `System.exit`) on most platforms exits the host process, so the +/// registry is dropped along with it; the JNI library is not designed to +/// be unloaded and re-loaded within a single process. Embedders that +/// genuinely need per-JVM isolation — multiple JVMs in one address space +/// or hot-reload of the Lance native library — should construct their own +/// `Session` per JVM and pass it explicitly via +/// `BlockingDataset::open(..., session: Some(...))`, bypassing this +/// static entirely. +pub(crate) static GLOBAL_OBJECT_STORE_REGISTRY: LazyLock< + Arc, +> = LazyLock::new(|| Arc::new(lance_io::object_store::ObjectStoreRegistry::default())); + fn set_timestamp_precision(builder: &mut env_logger::Builder) { if let Ok(timestamp_precision) = env::var("LANCE_LOG_TS_PRECISION") { match timestamp_precision.as_str() { diff --git a/rust/lance-io/src/object_store/providers.rs b/rust/lance-io/src/object_store/providers.rs index 20fa251a0c..177c5ad642 100644 --- a/rust/lance-io/src/object_store/providers.rs +++ b/rust/lance-io/src/object_store/providers.rs @@ -10,13 +10,14 @@ use std::{ }; use object_store::path::Path; -use url::Url; +use tokio::sync::Mutex as AsyncMutex; +use url::{Host, Url}; use crate::object_store::WrappingObjectStore; use crate::object_store::uri_to_url; use super::{ObjectStore, ObjectStoreParams, tracing::ObjectStoreTracingExt}; -use lance_core::error::{Error, LanceOptionExt, Result}; +use lance_core::error::{Error, Result}; #[cfg(feature = "aws")] pub mod aws; @@ -36,6 +37,15 @@ pub mod tencent; #[async_trait::async_trait] pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send { + /// Construct a new object store for the given base path and params. + /// + /// **Reentry warning**: implementations MUST NOT recursively call + /// [`ObjectStoreRegistry::get_store`] for the same `(base_path, params)` + /// key. The registry coalesces concurrent cold builds via a per-key + /// async lock held across the call to `new_store`; a re-entrant call + /// would deadlock waiting on the lock the current task already holds. + /// Calling `get_store` for a *different* key (e.g. an underlying delegate + /// store) is safe. async fn new_store(&self, base_path: Url, params: &ObjectStoreParams) -> Result; /// Extract the path relative to the base of the store. @@ -62,16 +72,60 @@ pub trait ObjectStoreProvider: std::fmt::Debug + Sync + Send { /// this will be something like 'az$account_name@container' /// /// Providers should override this if they have special requirements like Azure's. + /// + /// # Userinfo / IPv6 handling + /// + /// Overrides that build the prefix from URL authority MUST use + /// [`sanitized_authority`] (or equivalent) instead of `Url::authority()` — + /// the latter includes any embedded `userinfo@`, which would leak + /// credentials into the cache key. The default impl below already does + /// this. Azure is the only intentional exception: it parses `userinfo` + /// position as the container name, not credentials. fn calculate_object_store_prefix( &self, url: &Url, _storage_options: Option<&HashMap>, ) -> Result { - Ok(format!("{}${}", url.scheme(), url.authority())) + Ok(format!("{}${}", url.scheme(), sanitized_authority(url))) + } +} + +/// Build a cache-key authority component from a URL with two safety properties: +/// +/// 1. Strips any `userinfo@` segment so URL-embedded credentials never land in +/// the cache key (which is logged via `Debug` and observed by anything that +/// calls `calculate_object_store_prefix`). +/// 2. Wraps IPv6 hosts in brackets so `[::1]:9000` is unambiguous — +/// `Url::host_str` returns `"::1"` without brackets, which combined with a +/// port would produce the ambiguous string `"::1:9000"`. +/// +/// Returns just `host[:port]`. Use this in any +/// [`ObjectStoreProvider::calculate_object_store_prefix`] override that would +/// otherwise reach for `Url::authority()`. +pub fn sanitized_authority(url: &Url) -> String { + let host = match url.host() { + Some(Host::Ipv6(addr)) => format!("[{}]", addr), + Some(Host::Ipv4(addr)) => addr.to_string(), + Some(Host::Domain(d)) => d.to_string(), + None => String::new(), + }; + match url.port() { + Some(p) => format!("{}:{}", host, p), + None => host, } } +type CacheKey = (String, ObjectStoreParams); +type BuildLockMap = HashMap>>; + /// Statistics for the object store registry cache. +/// +/// Exposed for in-tree diagnostic consumers (e.g. the +/// `concurrent_open_bench` example, which prints hit/miss counts to prove +/// registry sharing across opens). `#[doc(hidden)]` because this is an +/// observability surface, not a stable API — field names and types may +/// change without a semver bump. +#[doc(hidden)] #[derive(Debug, Clone, Default)] pub struct ObjectStoreRegistryStats { /// Number of cache hits (store was already cached and reused). @@ -80,6 +134,20 @@ pub struct ObjectStoreRegistryStats { pub misses: u64, /// Number of currently active object stores in the cache. pub active_stores: usize, + /// Number of cache keys with an in-flight or recently-finished cold + /// build whose RAII cleanup has not yet acquired the build-locks mutex. + /// + /// Steady-state should be 0 — non-zero indicates either an in-progress + /// thundering herd (expected during cold start) or a stuck builder. + pub build_locks: usize, + /// Number of cold builds that returned an error from the underlying + /// provider (`ObjectStoreProvider::new_store`). + /// + /// Per-call, not per-key: single-flight serializes failures via the + /// per-key lock but does not cache the `Err` result. N waiters racing + /// a failing upstream each retry in turn and bump this counter, so a + /// stuck failure grows roughly linearly with attempts. + pub build_failures: u64, } /// A registry of object store providers. @@ -99,19 +167,63 @@ pub struct ObjectStoreRegistryStats { /// Use [`Self::empty()`] to create an empty registry, with no providers registered. /// /// The registry also caches object stores that are currently in use. It holds -/// weak references to the object stores, so they are not held onto. If an object -/// store is no longer in use, it will be removed from the cache on the next -/// call to either [`Self::active_stores()`] or [`Self::get_store()`]. +/// weak references to the object stores, so they are not held onto. Stale +/// entries are reclaimed lazily: +/// - [`Self::active_stores()`] sweeps the whole map. +/// - [`Self::get_store()`] evicts the queried key on miss, and opportunistically +/// prunes other dead entries on every cold-build insert (bounded work, keyed +/// off in-use opens). #[derive(Debug)] pub struct ObjectStoreRegistry { providers: RwLock>>, // Cache of object stores currently in use. We use a weak reference so the // cache itself doesn't keep them alive if no object store is actually using // it. - active_stores: RwLock>>, + active_stores: RwLock>>, + // Per-key async build locks coalesce concurrent cold builds for the same + // key into one `provider.new_store` call (avoids the thundering herd on + // S3-style providers whose client construction is expensive). + build_locks: std::sync::Mutex, // Cache statistics hits: AtomicU64, misses: AtomicU64, + build_failures: AtomicU64, +} + +/// RAII session for the cold-build path: holds the per-key async lock guard +/// and reclaims the `build_locks` HashMap entry on drop, in that order. +/// +/// `Drop::drop` explicitly releases `guard` *before* running +/// `cleanup_build_lock_if_idle`, so cleanup observes `strong_count == 1` +/// (only the HashMap entry) and can remove the lock entry safely. Encoding +/// the ordering in this Drop body — rather than relying on the declaration +/// order of two separate `let` bindings at the call site — keeps the +/// invariant locally checkable: a future refactor of the cold path cannot +/// accidentally invert it. +/// +/// Drop runs on every exit (success, error, panic, cancellation), so the +/// `build_locks` entry can never leak if the cold-build path is interrupted +/// between acquiring the lock and finishing the build. +/// +/// Owns the `CacheKey` (rather than borrowing) so the session's drop is not +/// tied to the lifetime of a local variable. +struct BuildLockSession<'a> { + registry: &'a ObjectStoreRegistry, + cache_key: CacheKey, + guard: Option>, +} + +impl Drop for BuildLockSession<'_> { + fn drop(&mut self) { + // 1. Release the per-key async lock first by dropping the + // `OwnedMutexGuard`. This decrements the per-key + // `Arc>` strong count. + self.guard.take(); + // 2. Reclaim the `build_locks` entry. With the guard gone, only the + // HashMap entry holds an Arc, so cleanup observes count == 1 and + // safely removes it. + self.registry.cleanup_build_lock_if_idle(&self.cache_key); + } } impl ObjectStoreRegistry { @@ -123,8 +235,10 @@ impl ObjectStoreRegistry { Self { providers: RwLock::new(HashMap::new()), active_stores: RwLock::new(HashMap::new()), + build_locks: std::sync::Mutex::new(HashMap::new()), hits: AtomicU64::new(0), misses: AtomicU64::new(0), + build_failures: AtomicU64::new(0), } } @@ -132,7 +246,7 @@ impl ObjectStoreRegistry { pub fn get_provider(&self, scheme: &str) -> Option> { self.providers .read() - .expect("ObjectStoreRegistry lock poisoned") + .unwrap_or_else(|p| p.into_inner()) .get(scheme) .cloned() } @@ -141,12 +255,18 @@ impl ObjectStoreRegistry { /// /// Calling this will also clean up any weak references to object stores that /// are no longer valid. + /// + /// **Caution**: the returned `Arc` instances may carry + /// per-tenant credential state (S3 access keys, Azure SAS tokens, etc.). + /// Callers that hand these stores out across trust boundaries are + /// responsible for filtering — the registry itself does not know which + /// store belongs to which tenant. pub fn active_stores(&self) -> Vec> { let mut found_inactive = false; let output = self .active_stores .read() - .expect("ObjectStoreRegistry lock poisoned") + .unwrap_or_else(|p| p.into_inner()) .values() .filter_map(|weak| match weak.upgrade() { Some(store) => Some(store), @@ -162,7 +282,7 @@ impl ObjectStoreRegistry { let mut cache_lock = self .active_stores .write() - .expect("ObjectStoreRegistry lock poisoned"); + .unwrap_or_else(|p| p.into_inner()); cache_lock.retain(|_, weak| weak.upgrade().is_some()); } output @@ -170,19 +290,35 @@ impl ObjectStoreRegistry { /// Get cache statistics for monitoring and debugging. /// - /// Returns the number of cache hits, misses, and currently active stores. - /// This is useful for detecting configuration issues that cause excessive - /// cache misses (e.g., storage options that vary per-request). + /// Returns counters for hits, misses, currently active stores, + /// in-flight build locks, and accumulated build failures — see + /// [`ObjectStoreRegistryStats`]. Useful for detecting configuration + /// issues that cause excessive cache misses (e.g., storage options + /// that vary per-request). + /// + /// `#[doc(hidden)]`: paired with [`ObjectStoreRegistryStats`] — this is + /// an unstable observability surface kept `pub` only to support the + /// in-tree `concurrent_open_bench` example. + #[doc(hidden)] pub fn stats(&self) -> ObjectStoreRegistryStats { let active_stores = self .active_stores .read() - .map(|s| s.values().filter(|w| w.strong_count() > 0).count()) - .unwrap_or(0); + .unwrap_or_else(|p| p.into_inner()) + .values() + .filter(|w| w.strong_count() > 0) + .count(); + let build_locks = self + .build_locks + .lock() + .unwrap_or_else(|p| p.into_inner()) + .len(); ObjectStoreRegistryStats { hits: self.hits.load(Ordering::Relaxed), misses: self.misses.load(Ordering::Relaxed), active_stores, + build_locks, + build_failures: self.build_failures.load(Ordering::Relaxed), } } @@ -200,6 +336,15 @@ impl ObjectStoreRegistry { /// If the object store is already in use, it will return a strong reference /// to the object store. If the object store is not in use, it will create a /// new object store and return a strong reference to it. + /// + /// Concurrent cold builds for the same key are coalesced via a per-key + /// async lock: the first task builds the store, all others wait then + /// re-check the cache and observe the freshly populated entry. + /// + /// On build *failure*, the cache is not populated, so each waiter retries + /// in turn (serialized — not parallel-amplified, but not deduplicated + /// either). Transient errors thus surface to operators rather than being + /// masked by stale-error reuse. pub async fn get_store( &self, base_path: Url, @@ -214,57 +359,184 @@ impl ObjectStoreRegistry { provider.calculate_object_store_prefix(&base_path, params.storage_options())?; let cache_key = (cache_path.clone(), params.clone()); - // Check if we have a cached store for this base path and params - { - let maybe_store = self - .active_stores - .read() - .ok() - .expect_ok()? - .get(&cache_key) - .cloned(); - if let Some(store) = maybe_store { - if let Some(store) = store.upgrade() { - self.hits.fetch_add(1, Ordering::Relaxed); - return Ok(store); - } else { - // Remove the weak reference if it is no longer valid - let mut cache_lock = self - .active_stores - .write() - .expect("ObjectStoreRegistry lock poisoned"); - if let Some(store) = cache_lock.get(&cache_key) - && store.upgrade().is_none() - { - // Remove the weak reference if it is no longer valid - cache_lock.remove(&cache_key); - } - } - } + // Fast path: cache hit avoids both the std mutex and the async lock. + if let Some(store) = self.lookup_cached(&cache_key) { + self.hits.fetch_add(1, Ordering::Relaxed); + return Ok(store); } - self.misses.fetch_add(1, Ordering::Relaxed); + // Cold path: per-key single-flight. The RAII session guarantees the + // build_locks entry is GC'd on every exit (success, error, panic, + // cancellation), preventing unbounded HashMap growth. Drop ordering + // (guard released *before* cleanup) is encoded in + // `BuildLockSession::drop`, so the cold path can't accidentally + // invert it via `let` reordering. + let lock = self.acquire_build_lock(&cache_key); + let guard = lock.lock_owned().await; + let _session = BuildLockSession { + registry: self, + cache_key: cache_key.clone(), + guard: Some(guard), + }; - let mut store = provider.new_store(base_path, params).await?; + // Re-check after acquiring the lock — coalesced waiters become hits. + if let Some(store) = self.lookup_cached(&cache_key) { + self.hits.fetch_add(1, Ordering::Relaxed); + log::debug!( + "ObjectStoreRegistry: coalesced wait hit for cache_key path={:?}", + cache_key.0, + ); + return Ok(store); + } - store.inner = store.inner.traced(); + // `fetch_add` returns the prior value; `+ 1` is this attempt's + // 1-indexed sequence number. Use it (not a separate counter) to + // gate the opportunistic sweep below: cadence is approximate + // because cold attempts that fail still bump `misses` but skip + // the sweep block, so a failure at a multiple-of-N sequence + // skips that sweep cycle entirely; the next candidate is the + // next multiple-of-N cold attempt that succeeds. That's fine — + // the sweep is a footprint trim, not a correctness invariant. + // `wrapping_add` only silences debug-build overflow checks; at + // 10M cold opens/sec the wrap is >58 millennia away. + let cold_attempt_nth = self.misses.fetch_add(1, Ordering::Relaxed).wrapping_add(1); + log::debug!( + "ObjectStoreRegistry: cold build starting for cache_key path={:?}", + cache_key.0, + ); + let mut store = match provider.new_store(base_path, params).await { + Ok(s) => s, + Err(e) => { + self.build_failures.fetch_add(1, Ordering::Relaxed); + // Intentionally no log here: provider error Display impls can + // surface raw URLs whose username or query-string component + // may carry credentials (SAS tokens, access keys), and `Url`'s + // own `Display` only masks the password. The full error goes + // to the caller via `Err(e)`; operators wanting an aggregate + // signal can scrape `stats().build_failures`. + return Err(e); + } + }; + store.inner = store.inner.traced(); if let Some(wrapper) = ¶ms.object_store_wrapper { store.inner = wrapper.wrap(&cache_path, store.inner); } - - // Always wrap with IO tracking store.inner = store.io_tracker.wrap("", store.inner); + let cached = Arc::new(store); + // Amortized opportunistic sweep: gate the O(n) `retain` behind a + // mod-N counter so a burst of cold builds with distinct URIs costs + // O(n) work in aggregate, not O(n²). N=64 is small enough that the + // map's footprint stays roughly proportional to live opens, large + // enough that bursty cold-open paths don't pay sweep cost on every + // insert. Per-key `lookup_cached` and full sweep via `active_stores()` + // remain unchanged; this is purely a per-insert cost lever. + // + // Cadence is "every 64th cold attempt" (1-indexed via the pre-bump + // above), so the first sweep fires at the 64th, not the 0th, and + // a near-empty map never pays the retain cost on its very first + // cold build. + const SWEEP_INTERVAL: u64 = 64; + let should_sweep = cold_attempt_nth.is_multiple_of(SWEEP_INTERVAL); + { + let mut cache_lock = self + .active_stores + .write() + .unwrap_or_else(|p| p.into_inner()); + // Safe under the write lock: no other writer can resurrect a weak + // ref between `retain` and `insert`, and `insert` for `cache_key` + // overwrites whatever `retain` left (live or dead) for that key. + if should_sweep { + cache_lock.retain(|_, weak| weak.upgrade().is_some()); + } + cache_lock.insert(cache_key, Arc::downgrade(&cached)); + } + Ok(cached) + } - let store = Arc::new(store); + /// Acquire (or create) the per-key async build lock for `cache_key`. + /// + /// On poison: recovers via `into_inner()` rather than bricking the + /// registry. The protected data is just a HashMap of build-lock Arcs; + /// any stale entry left by a panic is reclaimed by `BuildLockSession`. + fn acquire_build_lock(&self, cache_key: &CacheKey) -> Arc> { + let mut locks = self + .build_locks + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + locks + .entry(cache_key.clone()) + .or_insert_with(|| Arc::new(AsyncMutex::new(()))) + .clone() + } + /// Drop the build-lock entry for `cache_key` if no waiters remain. + /// + /// Strong-count == 1 means only the HashMap entry references the Arc, so + /// no concurrent task is using or queued on the lock. The HashMap mutex + /// makes this atomic: a new caller can't clone the Arc between the count + /// read and the removal because `acquire_build_lock` takes the same mutex. + /// A waiter parked inside `lock_owned()` holds its own Arc clone in the + /// future, so it counts toward `strong_count` and keeps the entry alive + /// until it wakes. + /// + /// Best-effort: if a task drops without calling this (e.g. between + /// dropping the build lock and entering this function), the entry is + /// reclaimed by the next caller for the same key, who finds count == 1 + /// and removes it. + /// + /// Bound: per-key cleanup is sufficient — no periodic full-map sweep is + /// needed. The map's worst-case size is the number of distinct cache keys + /// with an in-flight builder *plus* keys whose builder has finished but + /// whose `BuildLockSession::drop` has not yet acquired the std mutex. + /// Both terms are bounded by concurrently outstanding cold opens, which + /// the underlying I/O concurrency limits already cap. RAII guarantees + /// every successful `acquire_build_lock` is paired with exactly one + /// `cleanup_build_lock_if_idle`, on every exit path. + fn cleanup_build_lock_if_idle(&self, cache_key: &CacheKey) { + let mut locks = self + .build_locks + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + if let Some(entry) = locks.get(cache_key) + && Arc::strong_count(entry) == 1 { - // Insert the store into the cache - let mut cache_lock = self.active_stores.write().ok().expect_ok()?; - cache_lock.insert(cache_key, Arc::downgrade(&store)); + locks.remove(cache_key); } + } - Ok(store) + #[cfg(test)] + fn build_locks_len(&self) -> usize { + self.build_locks + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + .len() + } + + /// Look up a cached store by key, evicting the entry if its weak ref is + /// no longer upgradable. + fn lookup_cached(&self, cache_key: &CacheKey) -> Option> { + let maybe_weak = self + .active_stores + .read() + .unwrap_or_else(|p| p.into_inner()) + .get(cache_key) + .cloned(); + let weak = maybe_weak?; + if let Some(store) = weak.upgrade() { + return Some(store); + } + // Stale weak; evict under the write lock. + let mut cache_lock = self + .active_stores + .write() + .unwrap_or_else(|p| p.into_inner()); + if let Some(weak) = cache_lock.get(cache_key) + && weak.upgrade().is_none() + { + cache_lock.remove(cache_key); + } + None } /// Calculate the datastore prefix based on the URI and the storage options. @@ -333,8 +605,10 @@ impl Default for ObjectStoreRegistry { Self { providers: RwLock::new(providers), active_stores: RwLock::new(HashMap::new()), + build_locks: std::sync::Mutex::new(HashMap::new()), hits: AtomicU64::new(0), misses: AtomicU64::new(0), + build_failures: AtomicU64::new(0), } } } @@ -345,7 +619,7 @@ impl ObjectStoreRegistry { pub fn insert(&self, scheme: &str, provider: Arc) { self.providers .write() - .expect("ObjectStoreRegistry lock poisoned") + .unwrap_or_else(|p| p.into_inner()) .insert(scheme.into(), provider); } } @@ -380,6 +654,54 @@ mod tests { ); } + /// `userinfo@host` (URL-embedded credentials) MUST be stripped from the + /// cache-key prefix so the secret never lands in HashMap Debug logs or + /// downstream state. Two URLs that differ only in `userinfo` collapse to + /// the same prefix; multi-tenant isolation has to come from + /// `ObjectStoreParams` (storage_options / accessor), not URL embedding. + #[test] + fn test_calculate_object_store_prefix_strips_userinfo() { + let provider = DummyProvider; + let with_creds = Url::parse("dummy://user:s3cret@blah/path").unwrap(); + let without = Url::parse("dummy://blah/path").unwrap(); + let with_prefix = provider + .calculate_object_store_prefix(&with_creds, None) + .unwrap(); + assert_eq!("dummy$blah", with_prefix); + assert_eq!( + with_prefix, + provider + .calculate_object_store_prefix(&without, None) + .unwrap(), + ); + // Defense in depth: assert the secret literally cannot appear in the + // key, regardless of formatting changes. + assert!(!with_prefix.contains('@')); + assert!(!with_prefix.contains("s3cret")); + assert!(!with_prefix.contains("user")); + } + + #[test] + fn test_calculate_object_store_prefix_keeps_port() { + let provider = DummyProvider; + let url = Url::parse("dummy://host:9000/path").unwrap(); + assert_eq!( + "dummy$host:9000", + provider.calculate_object_store_prefix(&url, None).unwrap() + ); + } + + #[test] + fn test_sanitized_authority_brackets_ipv6() { + // Url::host_str returns "::1" without brackets; combined with a port, + // the naive `format!("{}:{}", host, port)` would produce the + // ambiguous string "::1:9000". sanitized_authority restores brackets. + let url = Url::parse("dummy://[::1]:9000/path").unwrap(); + assert_eq!("[::1]:9000", sanitized_authority(&url)); + let url_no_port = Url::parse("dummy://[2001:db8::1]/path").unwrap(); + assert_eq!("[2001:db8::1]", sanitized_authority(&url_no_port)); + } + #[test] fn test_calculate_object_store_scheme_not_found() { let registry = ObjectStoreRegistry::empty(); @@ -460,7 +782,253 @@ mod tests { ); } - // Same params returns same instance + // The cache reused the entry for the second call (hits == 1 above); + // when the same params hit the same cache slot, the returned Arcs + // point to the same underlying ObjectStore. assert!(Arc::ptr_eq(&stores[0], &stores[1])); } + + /// A provider that counts `new_store` invocations and yields explicitly, + /// so concurrent callers reliably contend on the registry's build lock. + #[derive(Debug, Default)] + struct CountingProvider { + builds: AtomicU64, + } + + #[async_trait::async_trait] + impl ObjectStoreProvider for CountingProvider { + async fn new_store( + &self, + base_path: Url, + params: &ObjectStoreParams, + ) -> Result { + self.builds.fetch_add(1, Ordering::Relaxed); + // Force a yield + delay so other tasks reach `get_store` and queue + // on the build lock before this build completes. + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + memory::MemoryStoreProvider + .new_store(base_path, params) + .await + } + } + + /// Concurrent cold-misses for the same key must coalesce into a single + /// `provider.new_store` invocation, with all callers receiving the same Arc. + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_get_store_coalesces_concurrent_misses() { + let provider = Arc::new(CountingProvider::default()); + let registry = Arc::new(ObjectStoreRegistry::empty()); + registry.insert("counting", provider.clone()); + + let url = Url::parse("counting://test").unwrap(); + let params = ObjectStoreParams::default(); + + let n = 16; + let mut handles = Vec::with_capacity(n); + for _ in 0..n { + let registry = registry.clone(); + let url = url.clone(); + let params = params.clone(); + handles.push(tokio::spawn(async move { + registry.get_store(url, ¶ms).await.unwrap() + })); + } + let mut stores = Vec::with_capacity(n); + for h in handles { + stores.push(h.await.unwrap()); + } + + assert_eq!( + provider.builds.load(Ordering::Relaxed), + 1, + "single-flight must collapse N concurrent misses into 1 build" + ); + // All concurrent callers share the same cached Arc — single-flight + // produced exactly one ObjectStore and the registry handed it out + // to every waiter. + for s in &stores[1..] { + assert!(Arc::ptr_eq(&stores[0], s)); + } + let s = registry.stats(); + assert_eq!((s.misses, s.hits as usize, s.active_stores), (1, n - 1, 1)); + } + + /// After a successful build, the per-key build lock must be removed so + /// `build_locks` does not grow unbounded across many distinct keys. + #[tokio::test] + async fn test_get_store_cleans_build_locks_after_use() { + let registry = ObjectStoreRegistry::default(); + let params = ObjectStoreParams::default(); + + for i in 0..5 { + let url = Url::parse(&format!("memory://cleanup-{}", i)).unwrap(); + let _store = registry.get_store(url, ¶ms).await.unwrap(); + } + assert_eq!( + registry.build_locks_len(), + 0, + "build_locks must be empty once all builds settle" + ); + } + + /// A provider whose `new_store` panics, used to verify the RAII cleanup + /// guard reclaims build_locks entries on panic. + #[derive(Debug, Default)] + struct PanickingProvider; + + #[async_trait::async_trait] + impl ObjectStoreProvider for PanickingProvider { + async fn new_store( + &self, + _base_path: Url, + _params: &ObjectStoreParams, + ) -> Result { + panic!("boom from provider") + } + } + + /// A provider that returns an `Err` after a yield, simulating a normal + /// (non-panic) build failure such as bad credentials or a network error. + /// Counts invocations so the test can assert the single-flight collapsed + /// N concurrent failures into 1 underlying call. + #[derive(Debug, Default)] + struct FailingProvider { + builds: AtomicU64, + } + + #[async_trait::async_trait] + impl ObjectStoreProvider for FailingProvider { + async fn new_store( + &self, + _base_path: Url, + _params: &ObjectStoreParams, + ) -> Result { + self.builds.fetch_add(1, Ordering::Relaxed); + // Yield + delay so concurrent callers reach the build lock and + // queue behind us — this is the configuration that exercises + // the coalesced-failure path in `get_store`. + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + Err(Error::invalid_input("simulated cold-build failure")) + } + } + + /// Build-Err (non-panic) path under coalesced contention asserts the + /// shipped behavior of PR#1's single-flight: failures **serialize via + /// the per-key lock but are not cached**. Each of N waiters acquires + /// the lock in turn, re-checks the (still-empty) cache, and retries + /// the build. The lock keeps failures from *fanning out in parallel* + /// (so an upstream that costs T ms sees ~N·T total, not T concurrent), + /// but it does not collapse them into one call. + /// + /// Concretely this test pins: + /// 1. `provider.new_store` fires once per caller (no Err coalescing), + /// 2. every concurrent caller surfaces an `Err`, + /// 3. `build_failures` increments per failed call (not per key), + /// 4. `active_stores` stays 0 — failed builds must never be cached, + /// 5. `build_locks` drains back to 0 via the RAII cleanup, even on + /// the Err return path. + /// + /// Caching errors (with TTL or until-eviction) is a separate design + /// point. This test exists to lock in the current contract so a future + /// "let's cache Err for 100ms" change is forced to update this guard + /// deliberately rather than land silently. + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_get_store_serializes_build_failures_without_caching() { + let provider = Arc::new(FailingProvider::default()); + let registry = Arc::new(ObjectStoreRegistry::empty()); + registry.insert("failing", provider.clone()); + + let url = Url::parse("failing://x").unwrap(); + let params = ObjectStoreParams::default(); + + let n = 8; + let mut handles = Vec::with_capacity(n); + for _ in 0..n { + let registry = registry.clone(); + let url = url.clone(); + let params = params.clone(); + handles.push(tokio::spawn(async move { + registry.get_store(url, ¶ms).await + })); + } + + let mut err_count = 0; + for h in handles { + let result = h.await.expect("task must not panic"); + assert!(result.is_err(), "every waiter must surface Err"); + err_count += 1; + } + assert_eq!(err_count, n, "all N callers must observe an error"); + + assert_eq!( + provider.builds.load(Ordering::Relaxed) as usize, + n, + "shipped behavior: failures are not cached, so each waiter retries the build", + ); + + let s = registry.stats(); + assert_eq!( + s.build_failures as usize, n, + "build_failures counts every failed call (per-call, not per-key)", + ); + assert_eq!( + s.active_stores, 0, + "failed builds must never populate active_stores", + ); + assert_eq!( + s.build_locks, 0, + "RAII cleanup must drain build_locks even on Err path", + ); + } + + /// If `new_store` panics, the RAII guard must still remove the build_locks + /// entry — otherwise a single bad URI would leak a lock per failed open. + /// + /// Relies on the default `panic = "unwind"` profile: the panic unwinds + /// the spawned task and `Drop` runs on the way out, which is what triggers + /// `BuildLockSession`. With `panic = "abort"`, the process dies before + /// cleanup can run — that's a stricter regime where this test (and the + /// leak it guards against) becomes moot. + #[tokio::test] + async fn test_get_store_cleans_build_locks_after_panic() { + let registry = Arc::new(ObjectStoreRegistry::empty()); + registry.insert("boom", Arc::new(PanickingProvider)); + let url = Url::parse("boom://x").unwrap(); + let params = ObjectStoreParams::default(); + + let registry_for_task = registry.clone(); + let result = + tokio::task::spawn(async move { registry_for_task.get_store(url, ¶ms).await }) + .await; + assert!( + result.as_ref().err().is_some_and(|e| e.is_panic()), + "task must propagate the panic" + ); + assert_eq!( + registry.build_locks_len(), + 0, + "RAII guard must clean up the build_locks entry on panic" + ); + } + + /// Regression: two callers resolving the same cache key must receive + /// the same cached `Arc`. Single-flight correctness depends + /// on this — see `test_get_store_coalesces_concurrent_misses` for the + /// concurrent variant. + #[tokio::test] + async fn test_get_store_returns_shared_arc_on_hit() { + let registry = ObjectStoreRegistry::default(); + let params = ObjectStoreParams::default(); + let url = Url::parse("memory://shared-arc").unwrap(); + + let a = registry.get_store(url.clone(), ¶ms).await.unwrap(); + let b = registry.get_store(url, ¶ms).await.unwrap(); + + let s = registry.stats(); + assert_eq!((s.misses, s.hits, s.active_stores), (1, 1, 1)); + assert!( + Arc::ptr_eq(&a, &b), + "cache hit must return the same Arc as the cold-build caller" + ); + } }