refactor: remove Java-side dataset cache, rely on Rust-side Session#353
Conversation
hamersaw
left a comment
There was a problem hiding this comment.
Do you think it would make sense to just remove the LanceDatasetCache and house fragment caching in the LanceRuntime? It feels like there's just multiple layers of cache here.
625df7c followed this approach |
|
I think overall this looks reasonable, but we need some relatively comprehensive benchmarks to support any performance implications. |
…ches-335 # Conflicts: # lance-spark-base_2.12/src/main/java/org/lance/spark/internal/LanceDatasetCache.java # lance-spark-base_2.12/src/main/java/org/lance/spark/read/LanceCountStarPartitionReader.java
Replace non-existent LanceNamespaceStorageOptionsProvider with LanceNamespace from getOrCreateNamespace(), matching the namespace API used by OpenDatasetBuilder. Add missing java.util.List import.
Adds FragmentLoadingBenchmarkTest to quantify the performance difference between getFragments() (old eager approach, O(N)) and getFragment(id) (new lazy approach, O(1)). Results on datasets with 10-1000 fragments show 10x-609x speedup for the lazy approach, confirming the motivation for PR lance-format#353. Tagged with @tag("benchmark") to exclude from normal test runs.
Add test.excludedGroups property (default: benchmark) to surefire config so @tag("benchmark") tests are excluded from normal mvn test runs. Override with -Dtest.excludedGroups= to include them. Run benchmarks with: mvn test -Dtest=FragmentLoadingBenchmarkTest \ -Dtest.excludedGroups= -Dgroups=benchmark
| * <p>Tagged with "benchmark" so it is excluded from normal test runs. | ||
| */ | ||
| @Tag("benchmark") | ||
| public class FragmentLoadingBenchmarkTest { |
There was a problem hiding this comment.
Is this microbenchmark sufficient to demonstrate the effect? We can run
mvn test -pl lance-spark-base_2.12 \
-Dtest=FragmentLoadingBenchmarkTest \
-Dtest.excludedGroups= \
-Dgroups=benchmark
to verify:
=== Fragment Loading Benchmark ===
Fragments | getFragments() (ms) | getFragment(id) (ms) | Speedup
----------------------------------------------------------------------
10 | 0.082 ms | 0.007 ms | 11.0x
50 | 0.329 ms | 0.008 ms | 43.2x
100 | 0.630 ms | 0.010 ms | 65.9x
500 | 2.929 ms | 0.008 ms | 380.9x
1000 | 6.064 ms | 0.008 ms | 760.4x
Notes:
- getFragments(): loads ALL fragment metadata (old eager approach)
- getFragment(id): loads ONE fragment by ID (new lazy approach)
- Each worker partition only needs one fragment, so the lazy approach avoids
loading metadata for all other fragments in the dataset.
The project does not integrate JMH yet, so this benchmark is relatively simple.
There was a problem hiding this comment.
If end-to-end impact is needed, we are running tests on a 1TB TPC-DS dataset, but this improvement likely won’t show a noticeable difference there.
There was a problem hiding this comment.
@summaryzb Do you have any test results about this ?
BatchScanExec.equals() compares batch objects via equals(), which delegates to LanceScan since it implements Batch. Without overriding equals/hashCode, Object identity is used, so two scans of the same table are never equal and Spark cannot reuse exchanges. Compare schema, readOptions, filters, limit, offset, topN, and aggregation. Exclude scanId (per-instance UUID for tracing only).
This reverts commit a34190c.
|
Ok, so diving a little deeper. The rust-side implementation has it's own caches; for metadata (LANCE_METADATA_CACHE_SIZE) and indexes (LANCE_INDEX_CACHE_SIZE). By caching the actual dataset handle, we're really only saving time in deserializing the dataset manifest bytes, which should be measureable in microseconds (or milliseconds at worst). I'm wondering if we should just remove the notion of Spark-side caches for everything (dataset + fragment) and rely on the rust-side Lance cache. This is basically what the |
|
That makes sense. Since the Session already caches metadata and index blocks on the Rust side, the Java-side
I'll also enhance Do you think this is reasonable? |
I'm not following exactly where these are going to be inserted. This gets a little bit tricky because these a global session configuration options and may not be reasonably applied to a single dataset? Maybe it's worth hacking together a proposal and we can iterate? |
|
@hamersaw I have refactored the code, removed |
| readOptions, | ||
| fragmentId, | ||
| dataset = | ||
| LanceRuntime.openDataset( |
There was a problem hiding this comment.
Let me check if we can use the utility methods from Utils here.
| inputPartition.getInitialStorageOptions(), | ||
| inputPartition.getNamespaceImpl(), | ||
| inputPartition.getNamespaceProperties()); | ||
| dataset = |
There was a problem hiding this comment.
@hamersaw On your earlier concern about blockSize/indexCacheSize/metadataCacheSize: this pr doesn't add new API for them now, but switching to Utils.openDatasetBuilder does mean the fragment-scan path now passes these through (previously they were dropped by LanceDatasetCache). This matches what LanceCountStarPartitionReader and ~27 other call sites already do. WDYT?
The refactor to remove LanceDatasetCache dropped namespace reconstruction on executors. Vended credentials (STS tokens) need the namespace client to refresh during long-running scans. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
hamersaw
left a comment
There was a problem hiding this comment.
Thanks, I think this looks great! I added the namespace build back in so that we can ensure credential refresh on long-running operations.
|
Thank you @hamersaw |
Summary
LanceDatasetCache; the Rust-sideSessionalready caches metadata and index blocks, so a Java-sideDatasetcache only saves manifest deserialization (microseconds).dataset.getFragment(id)per partition, instead of eagerly buildingMap<Integer, Fragment>fromdataset.getFragments()on every cache miss — O(N) in total fragment count, even though each partition reads one fragment.LanceFragmentScanneropens, owns, and closes its ownDatasetthrough a newLanceRuntime.openDataset()helper.Changes
LanceDatasetCache.javaLanceRuntime.javaopenDataset()— wires the catalogSession, reconstructs the namespace from the(namespaceImpl, namespaceProperties)strings carried inLanceInputPartition, merges storage options, and pins the version.LanceFragmentScanner.javacreate()opens its ownDatasetand loads only the target fragment;close()closes both scanner and dataset, usingThrowable.addSuppressedso the first failure isn't masked by cleanup.Before / after
The per-catalog Rust
Session(configured viaLANCE_INDEX_CACHE_SIZE/LANCE_METADATA_CACHE_SIZE) and the global ArrowBufferAllocatorare unchanged.