feat(java): support distributed splits planning #6328

summaryzb wants to merge 2 commits into lance-format:main
Conversation
I think this is an alternative proposal to #5863, right? While this approach is a little easier API-wise, it has a clear disadvantage: it does not support user-configurable split sizes. The prior work attempted to combine multiple fragments into a split, or split large fragments into multiple splits, so that distributed execution partitions would be similarly sized. As an aside from split definitions, we chose not to merge #5863 because in the Spark model this actually resulted in slower queries. For many workloads, isolating index lookups / planning to the Spark master resulted in memory and CPU contention. Without clearly benchmarked performance improvements, I don't think it makes sense to merge something like this.
Agreed, I'll work on a benchmark for comparison.
Summary
This PR adds distributed split planning for Lance's filtered read execution, enabling a plan/execute separation pattern in which a coordinator node plans a scan and worker nodes execute per-fragment portions of it. The implementation spans the Rust core (scanner, filtered read exec, protobuf serialization) and the Java bindings, with a new `FilteredRead` Java class that exposes the full workflow.

Problem
Lance's `FilteredReadExec` supports serializing an entire scan plan to protobuf for remote execution, but there was no mechanism to split a multi-fragment plan into per-fragment tasks that could be distributed to individual workers. Distributed engines like Spark need to: (1) plan a scan on the coordinator, (2) split the plan into independent per-fragment tasks, (3) serialize each task and ship it to a worker, and (4) execute each task independently. The existing code could serialize and deserialize a full plan, but lacked the splitting, metadata extraction, and end-to-end orchestration API needed for this workflow.

Approach
The implementation follows a three-layer design:
**Rust core -- Scanner entry point (`scanner.rs`):** A new `Scanner::plan_filtered_read()` method constructs a `FilteredReadExec` from the current scanner settings (filter, projection, fragments, batch size, etc.), triggers internal planning via `ensure_plan_initialized()` to compute the `RowAddrTreeMap` (which fragments/rows to read), and serializes the result to protobuf bytes. This mirrors the logic in `create_plan`/`new_filtered_read` but exposes the result as an opaque serializable blob rather than executing it.

**Rust core -- Proto splitting and execution (`filtered_read_proto.rs`):** Four new public functions:

- `split_plan_proto()` decodes a full `FilteredReadExecProto`, iterates over the `RowAddrTreeMap` to extract per-fragment entries, and re-serializes each as a standalone proto with the same table identifier and options but a single-fragment plan. The global `scan_range_after_filter` is intentionally dropped from per-fragment protos, since it can only be applied after aggregating results across all workers.
- `extract_plan_metadata()` provides a lightweight summary (fragment IDs and row counts per fragment, where -1 means "full fragment") without needing a dataset handle, enabling coordinators to estimate task sizes for load balancing.
- `split_and_inspect_plan_proto()` combines both operations in a single decode pass to avoid redundant deserialization.
- `execute_filtered_read_from_bytes()` is the worker-side counterpart: it decodes a proto, reconstructs a `FilteredReadExec` (optionally reusing an existing dataset handle), and returns a `SendableRecordBatchStream`.
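The splitting and metadata semantics above can be sketched with plain data structures. This is a schematic simulation, not the actual proto code: a plan is modeled as `{fragmentId -> rowCount}`, with -1 standing for "full fragment" as in `extract_plan_metadata()`, and `splitPlan()` emits one single-fragment map per entry. All names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SplitSketch {
    // One task per fragment; a global scan_range_after_filter (not modeled
    // here) would intentionally not be copied into these per-fragment tasks.
    public static List<Map<Integer, Long>> splitPlan(Map<Integer, Long> fragmentRows) {
        List<Map<Integer, Long>> tasks = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : fragmentRows.entrySet()) {
            tasks.add(Map.of(e.getKey(), e.getValue()));
        }
        return tasks;
    }

    // Lightweight metadata: per-fragment row counts, -1 meaning the whole fragment.
    public static long[] rowsPerFragment(Map<Integer, Long> fragmentRows) {
        return fragmentRows.values().stream().mapToLong(Long::longValue).toArray();
    }
}
```

A coordinator would use the metadata side to estimate task sizes before shipping the per-fragment blobs to workers.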
**Rust core -- `FilteredReadExec` (`filtered_read.rs`):** A new `ensure_plan_initialized()` method triggers internal plan computation without converting to the external `FilteredReadPlan` format, caching the result for subsequent serialization.
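The compute-once-then-cache behavior of `ensure_plan_initialized()` can be sketched in Java terms. This is a minimal illustration of the pattern, not the Rust implementation; class and method names are invented for the sketch.

```java
import java.util.function.Supplier;

public class LazyPlan {
    private final Supplier<byte[]> planner; // stands in for internal plan computation
    private byte[] cached;                  // null until first call

    public LazyPlan(Supplier<byte[]> planner) {
        this.planner = planner;
    }

    public synchronized byte[] ensureInitialized() {
        if (cached == null) {
            cached = planner.get();         // planning runs at most once
        }
        return cached;
    }
}
```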
**Java bindings:** A new `FilteredRead` class implements `Serializable` and provides the full distributed workflow API: `planFilteredRead(scanner)` calls through JNI to `Scanner::plan_filtered_read()` and `split_and_inspect_plan_proto()`, returning a `FilteredRead` object containing the full proto, per-fragment split protos, fragment IDs, and rows-per-fragment metadata. `getTasks()` returns the per-fragment task protos for distribution. `executeFilteredRead(dataset, taskProto, allocator)` executes a single task on a worker node. The derived fields (split protos and metadata) are marked `transient` -- they are not included in Java serialization; the receiver reconstructs them by calling `split_and_inspect_plan_proto` on the deserialized full proto.
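The transient-field pattern above can be demonstrated with the JDK alone: only the full plan bytes travel through Java serialization, and the derived splits are re-derived on the receiving side. `TaskBundle` and its trivial `getSplits()` re-derivation are illustrative stand-ins, not the real `FilteredRead` class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

public class TaskBundle implements Serializable {
    private static final long serialVersionUID = 1L;
    public final byte[] fullPlan;            // serialized normally
    public transient List<byte[]> splits;    // dropped by Java serialization

    public TaskBundle(byte[] fullPlan, List<byte[]> splits) {
        this.fullPlan = fullPlan;
        this.splits = splits;
    }

    // Stand-in for calling split_and_inspect_plan_proto on the full proto.
    public List<byte[]> getSplits() {
        if (splits == null) {
            splits = List.of(fullPlan);      // trivial re-derivation for the sketch
        }
        return splits;
    }

    public static TaskBundle roundtrip(TaskBundle in) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(in);
        oos.flush();
        ObjectInputStream ois =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (TaskBundle) ois.readObject();
    }
}
```

After deserialization the transient field is null, so the getter lazily rebuilds it from the full plan, mirroring the reconstruction described above.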
Per-fragment filter deduplication from the original plan is preserved: `split_plan_proto` looks up each fragment's filter expression ID in the shared `filter_expressions` array and copies only the relevant entry into the per-fragment proto.
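The shared-filter lookup can be sketched as follows: the full plan keeps one `filter_expressions` table and each fragment stores an index into it, so a per-fragment task copies exactly one entry. This is an illustrative model, not the proto code.

```java
import java.util.List;
import java.util.Map;

public class FilterDedup {
    // Returns the single filter expression a per-fragment proto would carry,
    // or null when the fragment has no filter.
    public static String filterForFragment(List<String> filterExpressions,
                                           Map<Integer, Integer> fragmentFilterId,
                                           int fragmentId) {
        Integer idx = fragmentFilterId.get(fragmentId);
        return idx == null ? null : filterExpressions.get(idx);
    }
}
```

Two fragments sharing the same filter reference the same table entry in the full plan, and each split ends up with just its own copy.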
Changes

- Rust -- `rust/lance/src/dataset/scanner.rs`: `Scanner::plan_filtered_read()` method (gated on the `substrait` feature) that builds a `FilteredReadExec`, triggers planning, and serializes to protobuf bytes.
- Rust -- `rust/lance/src/io/exec/filtered_read.rs`: `FilteredReadExec::ensure_plan_initialized()` public method to trigger and cache plan computation without external conversion.
- Rust -- `rust/lance/src/io/exec/filtered_read_proto.rs`: `FilteredReadPlanMetadata` struct and `extract_plan_metadata()` function for lightweight plan inspection; `execute_filtered_read_from_bytes()` for worker-side deserialization and execution; `split_plan_proto()` for per-fragment plan splitting; `SplitPlanResult` struct and `split_and_inspect_plan_proto()` for combined split + metadata extraction.
- Proto -- `protos/filtered_read.proto`: new `FilteredReadPlanProto.row_addr_tree_map` field.
- Java JNI -- `java/lance-jni/src/blocking_scanner.rs`: `nativeCreatePlan` and `nativeExecuteFilteredRead` JNI entry points.
- Java -- `java/src/main/java/org/lance/ipc/FilteredRead.java`: `FilteredRead` class implementing `Serializable` with the full distributed workflow API.
- Java -- `java/src/test/java/org/lance/FilteredReadTest.java`: new Java tests for the distributed workflow.
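The overall shape of the workflow these changes enable can be simulated with plain data structures rather than Lance calls: the coordinator plans one task per fragment, each "worker" executes its task independently, and the aggregated row count matches a direct scan. All names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DistributedSketch {
    // Coordinator side: one {fragmentId, rowCount} task per fragment.
    public static List<long[]> planTasks(Map<Integer, Long> fragmentRows) {
        List<long[]> tasks = new ArrayList<>();
        fragmentRows.forEach((frag, rows) -> tasks.add(new long[]{frag, rows}));
        return tasks;
    }

    // Worker side: execute a single task and report its row count.
    public static long execute(long[] task) {
        return task[1];
    }

    // Aggregate worker results, as a driver would after collecting partitions.
    public static long runDistributed(Map<Integer, Long> fragmentRows) {
        return planTasks(fragmentRows).stream()
                .mapToLong(DistributedSketch::execute)
                .sum();
    }
}
```

With three 20-row fragments this yields the same 60-row total a direct scan would, which is the invariant the distributed tests below check against real Lance datasets.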
Test Coverage

Rust:
- `test_extract_plan_metadata`: verifies fragment IDs and row counts are correctly extracted from a serialized plan (2 fragments, 50 rows each).
- `test_split_plan_proto`: splits a 2-fragment plan and verifies each split executes independently, with total rows matching direct execution.
- `test_split_plan_proto_with_filter`: same as above but with a filter expression (`x > 10`), verifying per-fragment filters are correctly preserved in splits.
- `test_plan_serialize_execute_roundtrip`: end-to-end: builds an exec with a filter, plans, serializes to bytes, deserializes via `execute_filtered_read_from_bytes`, and verifies identical output.

Java:
- `testBasicPlanAndExecute`: plans a 2-fragment dataset, splits it into tasks, executes each, and verifies the total row count (50 rows).
- `testPlanMetadata`: verifies fragment count, fragment IDs, and rows-per-fragment arrays for a 3-fragment dataset.
- `testDistributedSplitAndExecute`: simulates the coordinator/worker pattern with 3 fragments, comparing the distributed execution total with the direct scan total (60 rows).
- `testPlanWithFilter`: plans with an `id > 10` filter on a 40-row fragment and verifies 29 rows are returned after distributed execution.
- `testSerializableRoundtrip`: serializes a task proto via Java `ObjectOutputStream`, deserializes it, executes both, and verifies identical row counts.