feat: add distributed build support for ZoneMap index#6840
Conversation
Jay-ju
commented
May 19, 2026
- Add distributed build path: each worker writes a per-fragment part__zonemap.lance file, later merged into zonemap.lance
- Add merge_index_files() for concatenating part files with progress tracking and cleanup
- Wire ZoneMap into Dataset::merge_index_metadata dispatch
- Register ZONEMAP in Python SupportedDistributedIndices enum
- Add write_index_to() to ZoneMapIndexBuilder for custom file names
- Add integration test test_distributed_build_zonemap
- Fix part file sorting to use numeric worker_id order instead of lexicographic order (prevents wrong merge order with 10+ fragments)
- Add distributed build path: each worker writes a per-fragment part_<id>_zonemap.lance file, later merged into zonemap.lance - Add merge_index_files() for concatenating part files with progress tracking and cleanup - Wire ZoneMap into Dataset::merge_index_metadata dispatch - Register ZONEMAP in Python SupportedDistributedIndices enum - Add write_index_to() to ZoneMapIndexBuilder for custom file names - Add integration test test_distributed_build_zonemap - Fix part file sorting to use numeric worker_id order instead of lexicographic order (prevents wrong merge order with 10+ fragments)
When using commitExistingIndexSegments (non-merge path), each segment writes a part_<id>_zonemap.lance file instead of zonemap.lance. The load method now falls back to looking for a single part file when zonemap.lance is not found, matching the pattern used by BTree index.
The Java JNI nativeGetZonemapStats directly opens zonemap.lance, which doesn't exist when using commitExistingIndexSegments path. Added the same fallback pattern: when zonemap.lance is not found, look for a single part_<id>_zonemap.lance file instead.
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
|
@claude review |
| let first_reader = store.open_index_file(&part_files[0]).await?; | ||
| let first_num_rows = first_reader.num_rows(); | ||
| let first_batch = first_reader.read_range(0..first_num_rows, None).await?; | ||
| let merged_schema = first_batch.schema(); | ||
| let mut writer = store | ||
| .new_index_file(ZONEMAP_FILENAME, merged_schema) | ||
| .await?; | ||
| if first_num_rows > 0 { | ||
| writer.write_record_batch(first_batch).await?; |
There was a problem hiding this comment.
🔴 The merge_index_files writes the combined zonemap.lance using first_batch.schema() (the RecordBatch schema), which is built from the StructArray's fields and carries no schema-level metadata. ZONEMAP_SIZE_META_KEY (rows_per_zone) — present on the per-worker part files' schema but not on the batches — is silently dropped, so on subsequent load() the unwrap_or(ROWS_PER_ZONE_DEFAULT) substitutes 8192 even when the user configured a non-default value, causing wrong zone reconstruction. Fix by deriving merged_schema from first_reader.schema() (a lance Schema, whose ArrowSchema conversion at lance-core/src/datatypes/schema.rs:830 preserves metadata) or by explicitly copying ZONEMAP_SIZE_META_KEY into the arrow schema metadata before calling new_index_file.
Extended reasoning...
What is broken
merge_index_files (zonemap.rs:1117-1125) builds the merged zonemap.lance like this:
let first_reader = store.open_index_file(&part_files[0]).await?;
let first_num_rows = first_reader.num_rows();
let first_batch = first_reader.read_range(0..first_num_rows, None).await?;
let merged_schema = first_batch.schema(); // <-- RecordBatch schema, no file metadata
let mut writer = store
.new_index_file(ZONEMAP_FILENAME, merged_schema)
.await?;The comment on line 1114 claims this "recover[s] the arrow schema (including the ZONEMAP_SIZE_META_KEY metadata)". That comment is wrong: first_batch.schema() is an arrow Schema derived from the decoded StructArray, not the file's schema-level metadata.
Why the metadata is missing from the batch
In lance-encoding/src/decoder.rs:2700-2705, the batch is constructed as:
fn into_batch(self, ...) -> Result<(RecordBatch, u64)> {
let (struct_arr, data_size) = self.task.decode()...;
let batch = RecordBatch::from(struct_arr.as_struct());
...
}RecordBatch::from(struct_arr.as_struct()) derives the batch schema purely from the StructArray's fields/DataType, with empty metadata. The file's schema-level metadata lives only on the lance Schema exposed by IndexReader::schema(), not on the batch.
The authors knew this: in ZoneMapIndex::load (zonemap.rs:413-419) the existing code reads rows_per_zone from index_file.schema() — not from zone_maps.schema() (the batch) — precisely because the batch schema lacks the metadata. That distinction would be pointless if the batch carried it.
Likewise, ZoneMapIndexBuilder::write_index_to (zonemap.rs:763-774) clones the batch schema, inserts ZONEMAP_SIZE_META_KEY into the clone, and passes that as the file schema to new_index_file — the batch itself is never modified.
Step-by-step proof
- User configures
rows_per_zone = 4096and runs a distributed build over fragments F0, F1. - Worker for F0 writes
part_0_zonemap.lanceviawrite_index_to. Its lanceSchema.metadatacontains{"rows_per_zone": "4096"}. Same forpart_1_zonemap.lance. merge_index_filesopenspart_0_zonemap.lanceand callsread_range. The returned RecordBatch hasschema().metadata = {}(empty — see decoder.rs:2705).merged_schema = first_batch.schema()→ an arrow Schema with empty metadata.new_index_file(ZONEMAP_FILENAME, merged_schema)writes the merged zonemap.lance with norows_per_zonein its schema metadata.- On reopen,
ZoneMapIndex::loadcallsfile_schema.metadata.get(ZONEMAP_SIZE_META_KEY)→None→.unwrap_or(ROWS_PER_ZONE_DEFAULT)substitutes 8192 instead of the user's 4096. try_from_serializedreconstructs zone offsets/ranges using rows_per_zone=8192, producing zone row-address windows that are 2x too large. Updates, rebuilds, and statistics that consultself.rows_per_zoneall observe the wrong value.
Impact
Silent data corruption for distributed builds with non-default rows_per_zone. The single-machine path is unaffected because it never merges. The new PR test test_distributed_build_zonemap uses the default 8192, so the regression slips past CI. Search itself uses stored ZoneBound (fragment_id/start/length), so pruning of existing zones still works, but the configured zone size is silently lost — meaning index stats, the update path (which reads self.rows_per_zone), and any tooling assuming the configured zone size now observe stale values.
Fix
Either:
- Derive
merged_schemafromfirst_reader.schema()(a lanceSchema) and convert to arrow viaArrowSchema::from(&schema)(lance-core/src/datatypes/schema.rs:830 — that impl copiesmetadata), or - Clone the batch schema and explicitly insert
ZONEMAP_SIZE_META_KEYinto the arrowmetadatafromfirst_reader.schema().metadatabefore callingnew_index_file.
Adding a regression test that builds the distributed index with a non-default rows_per_zone and asserts the loaded value would catch this.
| let file_name = match fragment_ids.as_deref() { | ||
| // Distributed build: each worker writes a per-worker file. We use the | ||
| // smallest fragment id in the worker's assignment to name the file. | ||
| // ZoneTrainer already partitions zones strictly by fragment_id, so | ||
| // worker files never share zones across the same fragment_id and the | ||
| // final merge step is a pure concatenation. | ||
| Some(ids) if !ids.is_empty() => { | ||
| let worker_id = ids.iter().copied().min().expect("non-empty fragment_ids"); | ||
| zonemap_part_file_name(worker_id) | ||
| } | ||
| _ => ZONEMAP_FILENAME.to_string(), | ||
| }; | ||
| Self::train_zonemap_index_to(data, index_store, Some(request.params), &file_name).await?; |
There was a problem hiding this comment.
🟡 Nit / hardening suggestion. In train_index, the per-worker output file name is derived from worker_id = ids.iter().min() and used directly as part_<min>_zonemap.lance. If two workers are ever launched with assignments whose minimum fragment_id collides (overlapping ranges, retries with stale assignments, an orchestration bug), both write to the same file and the second silently overwrites the first; the merge step then concatenates whatever survived with no error and produces an under-indexed zonemap. Worth tightening: either require an explicit shard_id like bitmap_shard_partition_id already does for multi-fragment workers (see rust/lance-index/src/scalar/bitmap.rs:745-770), include all fragment ids in the file name, or fail fast if the target part file already exists.
Extended reasoning...
What the bug is
In rust/lance-index/src/scalar/zonemap.rs:976-988 (the distributed build branch of train_index), the file name for a worker is derived from the smallest fragment_id in the worker's assignment:
let worker_id = ids.iter().copied().min().expect("non-empty fragment_ids");
zonemap_part_file_name(worker_id)The accompanying comment claims ZoneTrainer already partitions zones strictly by fragment_id, so worker files never share zones across the same fragment_id. That guarantee is about zone content uniqueness within one builder, not about file-name uniqueness across builders. Two builders running on assignments that happen to share their minimum fragment_id will compute the same file name.
Step-by-step proof
- Caller dispatches two distributed workers with overlapping fragment lists:
- Worker A:
fragments = [3, 7] - Worker B:
fragments = [3, 9](e.g. a retry of a previously-assigned fragment, or a planner bug producing overlapping shards)
- Worker A:
train_indexruns for Worker A:worker_id = min(3,7) = 3→ openspart_3_zonemap.lancefor write viaindex_store.new_index_file(...), writes zones for fragments 3 and 7, finishes.train_indexruns for Worker B:worker_id = min(3,9) = 3→ openspart_3_zonemap.lanceagain.IndexStore::new_index_filedoes not error on overwrite (it just creates a fresh writer), so Worker A's file is silently replaced with Worker B's output (zones for fragments 3 and 9).merge_index_fileslater globspart_*_zonemap.lance, finds a singlepart_3_zonemap.lanceplus the other workers' files, and concatenates them. The zones originally written by Worker A for fragment 7 are gone; no error is reported.- Subsequent
SargableQuery::Rangecalls on values that previously lived in fragment 7's zones will miss the zonemap entry and either prune incorrectly or, more likely, give anAtMostresult that's wider than necessary — silent under-indexing rather than a crash.
Why existing code doesn't prevent it
- There is no precondition or assertion in
ZoneMapIndexPlugin::train_indexthat the worker'sfragment_idsare disjoint from any other worker's. new_index_file(...)on the IndexStore happily creates/overwrites files.- The merge step (
list_zonemap_part_files+merge_index_files) only requires non-empty results and a stable sort — it has no way to detect that a part file was overwritten between writes. - The new test (
test_distributed_build_zonemap) drives one fragment per worker, so this corner case is uncovered.
Why the refutation is partially right but not decisive
A refutation noted that btree.rs derives partition_id from fragment_ids.first() in a similar way, treating disjoint assignments as a documented orchestrator contract. That's true, but the bitmap pattern is the more recent and more defensive one: bitmap_shard_partition_id (rust/lance-index/src/scalar/bitmap.rs:745-770) explicitly rejects multi-fragment workers without an explicit shard_id with the error "Bitmap distributed build over multiple fragments requires an explicit shard_id ... Please assign mutually exclusive shard_id values to disjoint fragment groups." The codebase is clearly evolving toward explicit shard ids as the safe pattern; zonemap landing without that guard is inconsistent with the most recent precedent.
The refutation's point about retries being idempotent only holds when the retry assigns exactly the same fragment set — overlap of one fragment with a different other fragment (the example above) is not idempotent and will lose data.
Impact
Silent under-indexing of the zonemap. Pruning still returns correct answers (the zonemap can only prune; rows are then re-checked), so the bug does not produce wrong query results — it produces lower selectivity and slower scans. That said, "silent corruption of the index for a misconfigured orchestrator" is the exact failure mode the bitmap path now defends against, and zonemap should match.
Suggested fix
Three reasonable options, roughly in order of preference:
- Follow bitmap: take an explicit
shard_idfrom params for multi-fragment workers, and only fall back tofragment_ids[0]when there is exactly one fragment. - Use a hash or sorted concatenation of all
fragment_idsin the file name so duplicate writes are impossible by construction. - Detect existing file via
IndexStorelisting before write and fail fast with a clear error.
Severity tagged as nit because triggering requires an orchestrator error (callers are supposed to pass disjoint fragments) and the happy path is fine, but the inconsistency with the bitmap defense pattern makes this worth flagging now while distributed zonemap is still new and untested at scale.
| .await | ||
| .unwrap(); | ||
| match result { | ||
| SearchResult::AtMost(mask) | SearchResult::Exact(mask) => { |
There was a problem hiding this comment.
🟡 Test comment in test_distributed_build_zonemap is incorrect: it claims range [150, 250] overlaps both fragment 1 (100..115) and fragment 2 (200..215), but fragment 1's values are entirely below 150 — only fragment 2 overlaps the range. The test still passes because the assertions (kept < total and kept > 0) hold even when only fragment 2's zone is kept, but the comment is misleading for future maintainers. Suggested fix: drop the fragment 1 reference, or pick a range like [110, 210] that genuinely overlaps two fragments.
Extended reasoning...
What's wrong
In rust/lance/src/index/create.rs at lines 1665-1668, the new test_distributed_build_zonemap test exercises zone pruning with a range query and contains this comment:
// Open the merged zonemap and exercise pruning. Range [150, 250] only
// overlaps fragment 1 (100..115) and fragment 2 (200..215), so the
// returned mask should be a strict subset of the dataset.
This statement is factually wrong about fragment 1.
Step-by-step proof
The test constructs four fragments via:
let base = fragment_id * 100;
let values: Vec<i32> = (0..16).map(|i| base + i).collect();So each fragment has values:
- fragment 0:
base=0, values0..=15(max = 15) - fragment 1:
base=100, values100..=115(max = 115) - fragment 2:
base=200, values200..=215(min = 200, max = 215) - fragment 3:
base=300, values300..=315(min = 300)
The query is:
SargableQuery::Range(
Bound::Included(ScalarValue::Int32(Some(150))),
Bound::Included(ScalarValue::Int32(Some(250))),
)That is the closed range [150, 250]. Checking overlap against each fragment's value range:
- fragment 1:
100..=115. Max value 115 < 150 — does not overlap. - fragment 2:
200..=215. Lies entirely within[150, 250]— overlaps. - fragment 0 and fragment 3 obviously don't overlap.
So only fragment 2 overlaps [150, 250]. The comment's claim that fragment 1 (100..115) also overlaps is incorrect.
Why the test still passes
The assertions are:
assert!(kept < total_rows, ...);
assert!(kept > 0, ...);With total_rows = 64 and only fragment 2's 16 rows surviving, kept = 16 satisfies both 16 < 64 and 16 > 0. So the test still validates that some pruning happens and some zones survive — but the comment misrepresents which zones survive.
Why this matters
A future maintainer debugging zone-pruning behavior who reads this comment and runs the test under a debugger will see only fragment 2's zones in the surviving mask and assume a regression has occurred — when in fact the comment was wrong all along. This is the classic "misleading test comment causes false-positive bug investigation" failure mode.
Suggested fix
Either:
- Update the comment to say "Range [150, 250] only overlaps fragment 2 (200..215)" — minimal change, matches current behavior.
- Change the query range to
[110, 210](or[115, 215]) so the comment becomes accurate and the test exercises pruning that keeps two zones instead of one. This is a slightly stronger test of the zone-pruning math.
Either fix is purely a test-quality / documentation correctness improvement; no production code is affected.