Skip to content

feat(jni,io): default-open Session reuse + ObjectStoreRegistry single-flight#6839

Open
LuciferYang wants to merge 3 commits into
lance-format:mainfrom
LuciferYang:feat/session-reuse-singleflight
Open

feat(jni,io): default-open Session reuse + ObjectStoreRegistry single-flight#6839
LuciferYang wants to merge 3 commits into
lance-format:mainfrom
LuciferYang:feat/session-reuse-singleflight

Conversation

@LuciferYang
Copy link
Copy Markdown
Contributor

@LuciferYang LuciferYang commented May 19, 2026

Closes #6838

Coalesces concurrent cold Dataset::open against the same URI so N parallel opens reuse one ObjectStore instead of each rebuilding the credential chain and HTTP client.

Two changes, both required:

lance-io — registry single-flight. ObjectStoreRegistry::get_store gains a per-CacheKey tokio::sync::Mutex so concurrent cold builds serialize behind one in-flight build(). The lock map entry is freed via a BuildLockSession RAII guard that drops the inner mutex first, then removes the HashMap entry if it observes strong_count == 1, making panics / errors / cancellations all reclaim the slot. Failures are not cached (one-Err-per-N-waiters retry); an opportunistic sweep gated on cold_attempt_nth % 64 == 0 amortizes HashMap shrinkage.

lance-jni — default-open registry sharing. BlockingDataset::open without an explicit Session now uses a process-wide LazyLock<Arc<ObjectStoreRegistry>> (the Session itself is still per-call so metadata/index caches stay isolated). This is what makes the lance-io fix actually coalesce across JNI default opens — without it each open had its own registry and had nothing to coalesce against.

Bare-URI opens (empty storage options, no provider, no namespace handler) collapse onto one cache entry per URI: the first caller's resolved default-credential chain becomes the credentials used by every subsequent caller for the lifetime of that Arc<ObjectStore>. This is documented in java/README.md along with three opt-outs:

  • env LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING={1,true,yes}
  • pass tenant-distinguishing storage options
  • supply an explicit Session

Benchmark

144 concurrent opens against the same URI, semaphore-gated to 12, S3 → MinIO :9000, BENCH_SHARED_SESSION=1, run via the standalone reproducer in the Tests section below:

wall p50 p99 hits / misses
baseline ad9f38227 9.50 s 784 ms 828 ms 0 / 144
this PR 0.18 s 2.3 ms 144 ms 143 / 1

no-shared mode (each open creates its own Session) is unchanged between branches, as expected.

Tests

  • lance-io: hit/miss, single-flight coalescing, panic + error RAII cleanup, sanitized_authority (IPv6 / userinfo / non-default port), opportunistic sweep counter.
  • lance-jni: env-var truthy whitelist + warn path, DisableSharingTestGuard ordering, end-to-end test driving 144 concurrent default-opens through the JNI layer and asserting a single ObjectStore build.
  • Standalone reproducer for the benchmark above, drop into rust/lance/examples/concurrent_open_bench.rs:
concurrent_open_bench.rs
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Concurrent `Dataset::open` reproducer
//!
//! Reproduces the JNI default-open path bottleneck: when many threads call
//! `Dataset::open(uri)` concurrently with no shared `Session`, every call
//! constructs a fresh `ObjectStoreRegistry` and a cold object-store client,
//! which dominates wall time even when most opens hit the same URI.
//!
//! Usage:
//! ```bash
//! BENCH_URI=s3://bucket/foo.lance \
//!   BENCH_CONCURRENCY=12 BENCH_TOTAL=144 \
//!   BENCH_STORAGE_OPTS='aws_endpoint=http://localhost:9000,aws_access_key_id=...' \
//!   cargo run --release --example concurrent_open_bench
//! ```
//!
//! Set `BENCH_SHARED_SESSION=1` to thread one `Arc<Session>` through every
//! `DatasetBuilder` — i.e. the candidate fix — and compare wall time + the
//! registry's hit/miss counters against the baseline.

#![allow(clippy::print_stdout)]

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use lance::dataset::builder::DatasetBuilder;
use lance::session::Session;
use tokio::sync::Semaphore;

fn parse_storage_opts(s: &str) -> HashMap<String, String> {
    s.split(',')
        .filter_map(|kv| {
            let kv = kv.trim();
            if kv.is_empty() {
                return None;
            }
            let mut it = kv.splitn(2, '=');
            Some((it.next()?.trim().to_string(), it.next()?.trim().to_string()))
        })
        .collect()
}

fn percentile(latencies: &[Duration], p: f64) -> Duration {
    if latencies.is_empty() {
        return Duration::ZERO;
    }
    let mut sorted: Vec<_> = latencies.to_vec();
    sorted.sort();
    let idx = ((p / 100.0) * (sorted.len() - 1) as f64).round() as usize;
    sorted[idx]
}

fn env_usize(key: &str, default: usize) -> usize {
    std::env::var(key)
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(default)
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let uri = std::env::var("BENCH_URI").map_err(|_| "BENCH_URI must be set")?;
    let concurrency = env_usize("BENCH_CONCURRENCY", 12);
    let total = env_usize("BENCH_TOTAL", 144);
    let storage_opts = std::env::var("BENCH_STORAGE_OPTS")
        .map(|s| parse_storage_opts(&s))
        .unwrap_or_default();
    let shared_session = std::env::var("BENCH_SHARED_SESSION").is_ok();
    let warm = std::env::var("BENCH_WARM").is_ok();

    println!("=== concurrent_open_bench ===");
    println!("URI               = {}", uri);
    println!("concurrency       = {}", concurrency);
    println!("total opens       = {}", total);
    println!(
        "storage_opts keys = {:?}",
        storage_opts.keys().collect::<Vec<_>>()
    );
    println!("shared_session    = {}", shared_session);
    println!("warm (1 prep open)= {}", warm);
    println!();

    let shared = if shared_session {
        Some(Arc::new(Session::default()))
    } else {
        None
    };

    // Hold the warm dataset alive so its Arc<ObjectStore> keeps the registry's
    // weak entry upgradeable for the entire concurrent run — otherwise the first
    // open's strong Arc dies before the next acquires the permit.
    let _warm_dataset_keepalive = if warm {
        let mut b = DatasetBuilder::from_uri(&uri).with_storage_options(storage_opts.clone());
        if let Some(s) = shared.as_ref() {
            b = b.with_session(s.clone());
        }
        let t0 = Instant::now();
        let ds = b.load().await?;
        println!("warmup open = {:?}", t0.elapsed());
        println!();
        Some(ds)
    } else {
        None
    };

    let semaphore = Arc::new(Semaphore::new(concurrency));
    let mut handles = Vec::with_capacity(total);
    let wall_start = Instant::now();

    for i in 0..total {
        let uri = uri.clone();
        let storage_opts = storage_opts.clone();
        let semaphore = semaphore.clone();
        let shared = shared.clone();
        handles.push(tokio::spawn(async move {
            let _permit = semaphore.acquire_owned().await.unwrap();
            let t0 = Instant::now();
            let mut builder = DatasetBuilder::from_uri(&uri).with_storage_options(storage_opts);
            if let Some(s) = shared {
                builder = builder.with_session(s);
            }
            let res = builder.load().await;
            let elapsed = t0.elapsed();
            (i, res.is_ok(), elapsed, res.err().map(|e| e.to_string()))
        }));
    }

    let mut latencies = Vec::with_capacity(total);
    let mut errors: Vec<(usize, String)> = Vec::new();
    for h in handles {
        let (i, ok, elapsed, err) = h.await?;
        latencies.push(elapsed);
        if !ok {
            errors.push((i, err.unwrap_or_default()));
        }
    }
    let wall = wall_start.elapsed();

    println!("--- results ---");
    println!("wall            = {:?}", wall);
    println!("succeeded       = {} / {}", total - errors.len(), total);
    println!("p50 per-open    = {:?}", percentile(&latencies, 50.0));
    println!("p95 per-open    = {:?}", percentile(&latencies, 95.0));
    println!("p99 per-open    = {:?}", percentile(&latencies, 99.0));
    println!(
        "max per-open    = {:?}",
        latencies.iter().max().copied().unwrap_or_default()
    );

    if let Some(s) = shared {
        let stats = s.store_registry().stats();
        println!("registry hits   = {}", stats.hits);
        println!("registry misses = {}", stats.misses);
        println!("registry active = {}", stats.active_stores);
    } else {
        println!("(no shared session — registry stats unavailable)");
    }

    if !errors.is_empty() {
        println!();
        println!("--- first 5 errors ---");
        for (i, e) in errors.iter().take(5) {
            println!("  task {}: {}", i, e);
        }
    }
    Ok(())
}

Copy link
Copy Markdown

@claude claude Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

…-flight

Coalesce concurrent cold `Dataset.open` calls in the JNI layer so 144
parallel opens against the same URI reuse one `ObjectStore` instead of
each rebuilding the credential chain and HTTP client.

# JNI default-open path (lance-jni)

`BlockingDataset::open` without an explicit `Session` now constructs a
fresh per-call `Session` whose `ObjectStoreRegistry` is the
process-wide `GLOBAL_OBJECT_STORE_REGISTRY` (a `LazyLock<Arc<...>>`).
Sharing only the registry — not the `Session` — keeps the asymmetric
caching shape: the registry coalesces expensive `ObjectStore` builds
(credential probe, IMDS, TLS handshake), while metadata/index caches
remain per-call so callers do not share eviction policy or cache size
with other tenants.

Bare-URI opens (empty storage options, no provider, no namespace
commit handler) collapse onto a single cache entry per URI: the first
caller's resolved default-credential chain becomes the credentials
used by every subsequent caller for the lifetime of that
`Arc<ObjectStore>`. This cross-tenant credential bleed is intentional
under the bare-URI invariant; callers that need per-tenant isolation
can opt out via `LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING={1,true,yes}`,
pass tenant-distinguishing storage options, or supply an explicit
`Session`.

The opt-out env var is parsed once into an `AtomicU8` tri-state
(`UNINIT/ENABLED/DISABLED`) on first read, with a truthy whitelist
(`1|true|yes` — anything else falls back to enabled and warns). A
`DisableSharingTestGuard` RAII helper resets the atomic for unit
tests so the env-var probe can be exercised deterministically.

# Registry single-flight (lance-io)

`ObjectStoreRegistry::get_store` adds a per-key build-lock so concurrent
cold builds for the same `CacheKey` serialize behind one in-flight
`build()` instead of all racing the credential chain. The lock map is
`Mutex<HashMap<CacheKey, Arc<tokio::sync::Mutex<()>>>>`; each caller
acquires its slot via a `BuildLockSession` RAII guard whose `Drop`
releases the inner mutex first, then opportunistically removes the
HashMap entry if it observes `strong_count == 1`. This ordering
guarantees that panics, errors, and cancellations all reclaim the
build-lock entry without leaking a permanently-locked key.

Failures are not cached — when a cold build returns `Err`, the next
waiter retries from scratch (one-Err-per-N-waiters retry contract).
An opportunistic sweep gated by `cold_attempt_nth.is_multiple_of(64)`
amortizes HashMap shrinkage across cold-path attempts so a long-lived
process does not accumulate stale entries from short-lived URIs.

The `CacheKey` uses `sanitized_authority` derived from `Url::host()`
(IPv6 brackets preserved, port preserved, userinfo stripped) so URIs
that differ only in embedded credentials still collapse onto the same
cache slot.

# Tests

- `lance-io`: hit/miss, single-flight coalescing, panic-path RAII
  cleanup, error-path RAII cleanup, `sanitized_authority` over IPv6
  / userinfo / non-default port, opportunistic sweep counter.
- `lance-jni`: `parse_disable_value` truthy whitelist + warn path,
  `DisableSharingTestGuard` ordering, end-to-end coalescing test
  that drives 144 concurrent default-opens through the JNI layer
  and asserts a single `ObjectStore` is built.

# Configuration

`java/README.md` documents the `LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING`
env var, the bare-URI credential-bleed invariant, and the three
opt-out paths (env var, tenant-distinguishing storage options,
explicit `Session`).
@LuciferYang LuciferYang force-pushed the feat/session-reuse-singleflight branch from d26a288 to 9e5b879 Compare May 19, 2026 11:58
@LuciferYang LuciferYang marked this pull request as ready for review May 19, 2026 12:11
@codecov
Copy link
Copy Markdown

codecov Bot commented May 19, 2026

Codecov Report

❌ Patch coverage is 93.65079% with 16 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
rust/lance-io/src/object_store/providers.rs 93.65% 3 Missing and 13 partials ⚠️

📢 Thoughts on this report? Let us know!

@LuciferYang LuciferYang marked this pull request as draft May 20, 2026 12:56
@LuciferYang LuciferYang marked this pull request as ready for review May 20, 2026 12:56
Copy link
Copy Markdown

@claude claude Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request java

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Concurrent Dataset::open against the same URI rebuilds ObjectStore N times under a shared Session

1 participant