Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions java/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ JVM engine connectors can be built using the Lance Java SDK. Here are some conne
* [Flink Lance connector](https://github.com/lancedb/lance-flink)
* [Trino Lance connector](https://github.com/lancedb/lance-trino)

## Configuration

Environment variables read by the JNI layer at runtime:

| Variable | Default | Effect |
|---|---|---|
| `LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING` | unset (sharing on) | Set to `1`/`true`/`yes` to give every default `Dataset.open` (no explicit `Session`) a fresh `ObjectStoreRegistry`. Disables the process-wide registry that coalesces concurrent cold builds for the same URI. |

Default-open paths share an `ObjectStoreRegistry` so concurrent opens against the same URI reuse one `ObjectStore` instead of each rebuilding the credential chain and HTTP client. Bare-URI opens (empty storage options, no provider, no namespace commit handler) collapse onto a single cache entry per URI: the first caller's resolved default credentials become the credentials used by every subsequent caller for the lifetime of that `ObjectStore`. Callers that need cross-tenant isolation under bare URIs should either pass an explicit `Session`, supply tenant-distinguishing storage options, or set the variable above to opt out entirely.

## Contributing

From the codebase dimension, the lance project is a multiple-lang project. All Java-related code is located in the `java` directory.
Expand Down
1 change: 1 addition & 0 deletions java/lance-jni/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions java/lance-jni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ roaring = "0.11"
prost-types = "0.14.1"
chrono = "0.4.41"

[dev-dependencies]
# Test-only: builds Url values for the JNI default-open coalescing test in
# src/blocking_dataset.rs. Workspace pins this same version transitively.
url = "2.5.7"

[profile.dev]
debug = "line-tables-only"
incremental = false
Expand Down
345 changes: 345 additions & 0 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,155 @@ use std::time::{Duration, UNIX_EPOCH};

pub const NATIVE_DATASET: &str = "nativeDatasetHandle";

/// Reads `LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING` once at first access and
/// caches the result.
///
/// Only `"1" | "true" | "yes"` (case-insensitive) disable sharing. Empty / unset
/// keeps sharing enabled — `export VAR=` does NOT activate the escape hatch.
/// Logs a warning the first time the value is read when sharing is disabled,
/// and a separate warning when the variable is set to a value the parser does
/// not recognize.
///
/// Backed by `AtomicU8` rather than `OnceLock<bool>` so unit tests can override
/// the resolved value via `DisableSharingTestGuard` without spawning a fresh
/// process per case.
///
/// State encoding:
/// - 0 = uninitialized (read env on next access)
/// - 1 = sharing enabled
/// - 2 = sharing disabled
const SHARING_UNINIT: u8 = 0;
const SHARING_ENABLED: u8 = 1;
const SHARING_DISABLED: u8 = 2;

static DISABLE_DEFAULT_REGISTRY_SHARING: std::sync::atomic::AtomicU8 =
std::sync::atomic::AtomicU8::new(SHARING_UNINIT);

fn disable_default_registry_sharing() -> bool {
use std::sync::atomic::Ordering;
match DISABLE_DEFAULT_REGISTRY_SHARING.load(Ordering::Relaxed) {
SHARING_ENABLED => false,
SHARING_DISABLED => true,
_ => {
// First read: resolve from env, warn if appropriate, then publish.
// Under a race multiple threads may each observe UNINIT and emit
// duplicate `log::warn!` lines — the env read is idempotent so the
// resolved bool is the same. Cheap duplication is the price of an
// `AtomicU8` that tests can override.
let raw = std::env::var("LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING").ok();
let disabled = match raw.as_deref().map(parse_disable_value) {
Some(Some(parsed)) => parsed,
Some(None) => {
// Set but not recognized — warn so a misspelled escape hatch
// (e.g. `=on`, `=y`) surfaces instead of silently keeping
// sharing enabled and looking like the env var "did nothing".
log::warn!(
"LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING={:?} is unrecognized; \
keeping sharing enabled. Use 1/true/yes to disable.",
raw.as_deref().unwrap_or(""),
);
false
}
None => false,
};
if disabled {
log::warn!(
"LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING is set; JNI default-open will \
use a fresh ObjectStoreRegistry per call (single-flight coalescing \
disabled)."
);
}
DISABLE_DEFAULT_REGISTRY_SHARING.store(
if disabled {
SHARING_DISABLED
} else {
SHARING_ENABLED
},
Ordering::Relaxed,
);
disabled
}
}
}

/// Parse a `LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING` env value.
///
/// Returns (after trim + ASCII lowercase):
/// - `Some(true)` for `1`/`true`/`yes` — operator opted out of sharing.
/// - `Some(false)` for empty, whitespace-only, `0`/`false`/`no` — explicit keep-default.
/// - `None` for anything else (typos, `on`, `y`, numeric noise) — caller
/// should warn and fall back to default to keep misconfigurations visible.
///
/// Pure helper extracted from [`disable_default_registry_sharing`] so the
/// truthiness rules can be unit-tested without manipulating process env vars.
fn parse_disable_value(raw: &str) -> Option<bool> {
// `trim()` canonicalizes whitespace-only input to `""`, so the empty-string
// arm catches both literal `""` and ` `/`\n` etc. — keeping the doc
// contract honest with a single match arm.
match raw.trim().to_ascii_lowercase().as_str() {
"1" | "true" | "yes" => Some(true),
"" | "0" | "false" | "no" => Some(false),
_ => None,
}
}

/// Pick the `ObjectStoreRegistry` for the JNI default-open path.
///
/// When sharing is enabled (the default), every default-open call reuses the
/// process-wide [`crate::GLOBAL_OBJECT_STORE_REGISTRY`] so concurrent cold
/// builds for the same URI coalesce on its single-flight. When the
/// `LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING` escape hatch is set, each call
/// gets a fresh registry — pre-PR isolation at the cost of single-flight.
///
/// Extracted into a pure helper so the selection logic is unit-testable
/// without spinning up a JVM.
fn select_default_open_registry(
disable_sharing: bool,
) -> Arc<lance_io::object_store::ObjectStoreRegistry> {
if disable_sharing {
Arc::new(lance_io::object_store::ObjectStoreRegistry::default())
} else {
crate::GLOBAL_OBJECT_STORE_REGISTRY.clone()
}
}

#[cfg(test)]
/// RAII guard that restores `DISABLE_DEFAULT_REGISTRY_SHARING` to its prior
/// value on drop. Tests should always go through this guard so they cannot
/// leak state into sibling tests run in the same process.
///
/// Each guard captures and restores its own snapshot, so nested guards compose
/// correctly: LIFO drop order means the outer guard's snapshot always wins,
/// returning the flag to the value seen before the outermost `set` call.
struct DisableSharingTestGuard {
prior: u8,
}

#[cfg(test)]
impl DisableSharingTestGuard {
fn set(disabled: bool) -> Self {
use std::sync::atomic::Ordering;
let prior = DISABLE_DEFAULT_REGISTRY_SHARING.load(Ordering::Relaxed);
DISABLE_DEFAULT_REGISTRY_SHARING.store(
if disabled {
SHARING_DISABLED
} else {
SHARING_ENABLED
},
Ordering::Relaxed,
);
Self { prior }
}
}

#[cfg(test)]
impl Drop for DisableSharingTestGuard {
fn drop(&mut self) {
use std::sync::atomic::Ordering;
DISABLE_DEFAULT_REGISTRY_SHARING.store(self.prior, Ordering::Relaxed);
}
}

impl FromJObjectWithEnv<BasePath> for JObject<'_> {
fn extract_object(&self, env: &mut JNIEnv<'_>) -> Result<BasePath> {
let id = env.get_u32_from_method(self, "getId")?;
Expand Down Expand Up @@ -175,6 +324,21 @@ impl BlockingDataset {
storage_options_accessor: accessor,
..Default::default()
};

// Default-open path: share the process-wide registry so concurrent
// opens for the same URI coalesce on single-flight; each call still
// gets its own `Session`. Tenant-isolation contract is documented on
// [`crate::GLOBAL_OBJECT_STORE_REGISTRY`]; opt out via
// `LANCE_JNI_DISABLE_DEFAULT_REGISTRY_SHARING=1`.
let session = session.or_else(|| {
let registry = select_default_open_registry(disable_default_registry_sharing());
Some(Arc::new(LanceSession::new(
index_cache_size_bytes as usize,
metadata_cache_size_bytes as usize,
registry,
)))
});

let params = ReadParams {
index_cache_size_bytes: index_cache_size_bytes as usize,
metadata_cache_size_bytes: metadata_cache_size_bytes as usize,
Expand Down Expand Up @@ -3646,3 +3810,184 @@ fn inner_get_zonemap_stats<'local>(

Ok(array_list)
}

#[cfg(test)]
mod default_open_registry_tests {
use super::*;

/// Default-open path must reuse the process-wide
/// `GLOBAL_OBJECT_STORE_REGISTRY` so concurrent opens can coalesce on its
/// single-flight. Compare by `Arc::ptr_eq` — a fresh registry would have
/// a distinct allocation.
#[test]
fn select_default_open_registry_reuses_global_when_sharing_enabled() {
let registry = select_default_open_registry(false);
assert!(
Arc::ptr_eq(&registry, &crate::GLOBAL_OBJECT_STORE_REGISTRY),
"sharing-enabled path must hand back the GLOBAL_OBJECT_STORE_REGISTRY Arc"
);
}

/// Escape-hatch path must hand back a *fresh* registry per call so the
/// pre-PR isolation behavior is preserved when an operator opts out.
#[test]
fn select_default_open_registry_returns_fresh_when_disabled() {
let r1 = select_default_open_registry(true);
let r2 = select_default_open_registry(true);
assert!(
!Arc::ptr_eq(&r1, &crate::GLOBAL_OBJECT_STORE_REGISTRY),
"disabled path must NOT alias the global registry"
);
assert!(
!Arc::ptr_eq(&r1, &r2),
"disabled path must allocate a new registry per call"
);
}

/// `disable_default_registry_sharing()` must honor the in-process override
/// hook. This guards against future refactors that re-introduce a once-cell
/// that bypasses the AtomicU8 state.
#[test]
fn disable_flag_honors_test_override() {
// RAII guard restores prior state at end-of-scope so this test cannot
// leak SHARING_ENABLED/DISABLED state into sibling tests that rely on
// env-driven first-read resolution.
let _g = DisableSharingTestGuard::set(true);
assert!(disable_default_registry_sharing());

let _g2 = DisableSharingTestGuard::set(false);
assert!(!disable_default_registry_sharing());
}

/// Truthy spellings (case + whitespace insensitive) must parse to `Some(true)`.
/// These are the values that opt the operator OUT of registry sharing —
/// getting any of them wrong silently re-enables coalescing in a config
/// that asked for isolation, so accept-list correctness is load-bearing.
#[test]
fn parse_disable_value_accepts_truthy() {
for raw in [
"1", "true", "yes", "TRUE", "Yes", "True", " 1 ", "\ttrue\n",
] {
assert_eq!(
parse_disable_value(raw),
Some(true),
"expected {raw:?} to parse as Some(true)",
);
}
}

/// Documented falsy spellings — empty, whitespace-only, `0`/`false`/`no` —
/// must parse to `Some(false)`. These are explicit keep-default opts;
/// distinguishing them from unrecognized noise lets the caller skip the
/// warning log.
#[test]
fn parse_disable_value_accepts_falsy() {
for raw in ["", " ", "\t\n", "0", "false", "no", "FALSE", " No "] {
assert_eq!(
parse_disable_value(raw),
Some(false),
"expected {raw:?} to parse as Some(false)",
);
}
}

/// Anything outside the accept-list — typos, `on`, `y`, numeric noise —
/// must parse to `None` so [`disable_default_registry_sharing`] can warn
/// the operator that their escape hatch is being silently ignored.
#[test]
fn parse_disable_value_returns_none_for_unrecognized() {
for raw in ["off", "disable", "2", "1.0", "true!", "y", "on", "enable"] {
assert_eq!(
parse_disable_value(raw),
None,
"expected {raw:?} to parse as None (unrecognized)",
);
}
}

/// End-to-end pin for PR#1's two-piece contract at the JNI boundary:
/// `select_default_open_registry` must hand back a registry whose
/// `get_store` coalesces concurrent cold builds for the same URI.
/// A regression in single-flight (dropped from `get_store`) would
/// surface here as `provider.builds > 1` or callers observing
/// different `Arc<ObjectStore>` instances.
///
/// The complementary "sharing-on returns the process-wide registry"
/// invariant is covered by
/// `select_default_open_registry_reuses_global_when_sharing_enabled`,
/// so this test deliberately uses the sharing-disabled path to take a
/// fresh registry — registering a one-off provider on
/// `GLOBAL_OBJECT_STORE_REGISTRY` would leak that scheme's `Arc` for
/// the rest of the test binary's lifetime.
///
/// Provider counts invocations and sleeps before returning so concurrent
/// callers reliably queue on the build lock before the first build
/// completes — without the delay the winner returns before any contention
/// develops and the test would pass even if single-flight were broken.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn jni_default_open_coalesces_concurrent_cold_builds() {
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

use lance_core::Result as LanceResult;
use lance_io::object_store::ObjectStore as LanceObjectStore;
use lance_io::object_store::providers::memory::MemoryStoreProvider;
use lance_io::object_store::{ObjectStoreParams, ObjectStoreProvider};
use url::Url;

#[derive(Debug, Default)]
struct CountingProvider {
builds: AtomicU64,
}

#[async_trait::async_trait]
impl ObjectStoreProvider for CountingProvider {
async fn new_store(
&self,
base_path: Url,
params: &ObjectStoreParams,
) -> LanceResult<LanceObjectStore> {
self.builds.fetch_add(1, Ordering::Relaxed);
tokio::time::sleep(Duration::from_millis(20)).await;
MemoryStoreProvider.new_store(base_path, params).await
}
}

let provider = Arc::new(CountingProvider::default());
// Take a fresh registry via the sharing-disabled path so the
// test-only `pr1-jni-coalesce` provider never touches
// `GLOBAL_OBJECT_STORE_REGISTRY`.
let registry = select_default_open_registry(true);
registry.insert("pr1-jni-coalesce", provider.clone());

let url = Url::parse("pr1-jni-coalesce://x").unwrap();
let params = ObjectStoreParams::default();

let n = 8;
let mut handles = Vec::with_capacity(n);
for _ in 0..n {
let registry = registry.clone();
let url = url.clone();
let params = params.clone();
handles.push(tokio::spawn(async move {
registry.get_store(url, &params).await.unwrap()
}));
}
let mut stores = Vec::with_capacity(n);
for h in handles {
stores.push(h.await.expect("task must not panic"));
}

assert_eq!(
provider.builds.load(Ordering::Relaxed),
1,
"single-flight must collapse N concurrent cold misses into one provider call",
);
for store in &stores[1..] {
assert!(
Arc::ptr_eq(&stores[0], store),
"every coalesced waiter must observe the same Arc<ObjectStore>",
);
}
}
}
Loading
Loading