feat: add distributed build support for ZoneMap index #6840

Open
Jay-ju wants to merge 5 commits into lance-format:main from Jay-ju:feat/zonemap-distributed-build

@Jay-ju
Contributor

@Jay-ju Jay-ju commented May 19, 2026

  • Add distributed build path: each worker writes a per-fragment part_<id>_zonemap.lance file, later merged into zonemap.lance
  • Add merge_index_files() for concatenating part files with progress tracking and cleanup
  • Wire ZoneMap into Dataset::merge_index_metadata dispatch
  • Register ZONEMAP in Python SupportedDistributedIndices enum
  • Add write_index_to() to ZoneMapIndexBuilder for custom file names
  • Add integration test test_distributed_build_zonemap
  • Fix part file sorting to use numeric worker_id order instead of lexicographic order (prevents wrong merge order with 10+ fragments)
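The difference between lexicographic and numeric ordering of part files can be sketched as follows. This is a minimal illustration, not the PR's implementation: the helper names `part_id` and `sort_part_files` are hypothetical, and only the `part_<id>_zonemap.lance` naming comes from the description above.

```rust
/// Extracts the numeric id from a name shaped like `part_<id>_zonemap.lance`.
/// Returns `None` for any other file name. (Hypothetical helper.)
fn part_id(file_name: &str) -> Option<u32> {
    file_name
        .strip_prefix("part_")?
        .strip_suffix("_zonemap.lance")?
        .parse()
        .ok()
}

/// Sorts part files by numeric id. Names that do not match the
/// part-file pattern are dropped. (Hypothetical helper.)
fn sort_part_files(names: &[&str]) -> Vec<String> {
    let mut parts: Vec<(u32, String)> = names
        .iter()
        .filter_map(|n| part_id(n).map(|id| (id, n.to_string())))
        .collect();
    parts.sort_by_key(|(id, _)| *id);
    parts.into_iter().map(|(_, n)| n).collect()
}
```

With a plain string sort, `part_10_zonemap.lance` would come before `part_2_zonemap.lance`; sorting on the parsed id keeps the parts in worker order.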


@claude claude Bot left a comment

Claude Code Review

This pull request is from a fork — automated review is disabled. A repository maintainer can comment @claude review to run a one-time review.

@github-actions github-actions Bot added the enhancement and python labels May 19, 2026
Jay-ju added 3 commits May 19, 2026 20:36
When using commitExistingIndexSegments (non-merge path), each segment
writes a part_<id>_zonemap.lance file instead of zonemap.lance. The
load method now falls back to looking for a single part file when
zonemap.lance is not found, matching the pattern used by BTree index.

The Java JNI nativeGetZonemapStats directly opens zonemap.lance,
which doesn't exist when using commitExistingIndexSegments path.
Added the same fallback pattern: when zonemap.lance is not found,
look for a single part_<id>_zonemap.lance file instead.
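The fallback described in these commit messages might look roughly like the sketch below. It works on a plain list of file names rather than the real index store, `resolve_zonemap_file` is a hypothetical name, and returning `None` when several part files exist is an assumption (the commits only mention a single part file).

```rust
const ZONEMAP_FILENAME: &str = "zonemap.lance";

/// Chooses which file to open: the merged file if it exists, otherwise
/// the single part file. Returns `None` when neither is present or when
/// more than one part file is found (assumed to be an unmerged build).
fn resolve_zonemap_file(files: &[&str]) -> Option<String> {
    if files.contains(&ZONEMAP_FILENAME) {
        return Some(ZONEMAP_FILENAME.to_string());
    }
    let mut parts = files
        .iter()
        .filter(|f| f.starts_with("part_") && f.ends_with("_zonemap.lance"));
    match (parts.next(), parts.next()) {
        // Exactly one part file: safe to treat it as the whole index.
        (Some(only), None) => Some(only.to_string()),
        _ => None,
    }
}
```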
@github-actions github-actions Bot added the java label May 19, 2026
@codecov

codecov Bot commented May 19, 2026

Codecov Report

❌ Patch coverage is 84.47653% with 43 lines in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
rust/lance-index/src/scalar/zonemap.rs | 71.32% | 16 Missing and 25 partials ⚠️
rust/lance/src/index/create.rs | 98.42% | 0 Missing and 2 partials ⚠️

@yanghua
Collaborator

yanghua commented May 20, 2026

@claude review

Comment on lines +1117 to +1125
let first_reader = store.open_index_file(&part_files[0]).await?;
let first_num_rows = first_reader.num_rows();
let first_batch = first_reader.read_range(0..first_num_rows, None).await?;
let merged_schema = first_batch.schema();
let mut writer = store
    .new_index_file(ZONEMAP_FILENAME, merged_schema)
    .await?;
if first_num_rows > 0 {
    writer.write_record_batch(first_batch).await?;
🔴 merge_index_files writes the combined zonemap.lance using first_batch.schema() (the RecordBatch schema), which is built from the StructArray's fields and carries no schema-level metadata. ZONEMAP_SIZE_META_KEY (rows_per_zone), which is present on the schema of each per-worker part file but not on the batches, is silently dropped. On a subsequent load(), unwrap_or(ROWS_PER_ZONE_DEFAULT) therefore substitutes 8192 even when the user configured a non-default value, causing wrong zone reconstruction. Fix by deriving merged_schema from first_reader.schema() (a lance Schema, whose ArrowSchema conversion at lance-core/src/datatypes/schema.rs:830 preserves metadata) or by explicitly copying ZONEMAP_SIZE_META_KEY into the arrow schema metadata before calling new_index_file.

Extended reasoning...

What is broken

merge_index_files (zonemap.rs:1117-1125) builds the merged zonemap.lance like this:

let first_reader = store.open_index_file(&part_files[0]).await?;
let first_num_rows = first_reader.num_rows();
let first_batch = first_reader.read_range(0..first_num_rows, None).await?;
let merged_schema = first_batch.schema();          // <-- RecordBatch schema, no file metadata
let mut writer = store
    .new_index_file(ZONEMAP_FILENAME, merged_schema)
    .await?;

The comment on line 1114 claims this "recover[s] the arrow schema (including the ZONEMAP_SIZE_META_KEY metadata)". That comment is wrong: first_batch.schema() is an arrow Schema derived from the decoded StructArray, not the file's schema-level metadata.

Why the metadata is missing from the batch

In lance-encoding/src/decoder.rs:2700-2705, the batch is constructed as:

fn into_batch(self, ...) -> Result<(RecordBatch, u64)> {
    let (struct_arr, data_size) = self.task.decode()...;
    let batch = RecordBatch::from(struct_arr.as_struct());
    ...
}

RecordBatch::from(struct_arr.as_struct()) derives the batch schema purely from the StructArray's fields/DataType, with empty metadata. The file's schema-level metadata lives only on the lance Schema exposed by IndexReader::schema(), not on the batch.

The authors knew this: in ZoneMapIndex::load (zonemap.rs:413-419) the existing code reads rows_per_zone from index_file.schema(), not from zone_maps.schema() (the batch), precisely because the batch schema lacks the metadata. That distinction would be pointless if the batch carried it.

Likewise, ZoneMapIndexBuilder::write_index_to (zonemap.rs:763-774) clones the batch schema, inserts ZONEMAP_SIZE_META_KEY into the clone, and passes that as the file schema to new_index_file — the batch itself is never modified.

Step-by-step proof

  1. User configures rows_per_zone = 4096 and runs a distributed build over fragments F0, F1.
  2. Worker for F0 writes part_0_zonemap.lance via write_index_to. Its lance Schema.metadata contains {"rows_per_zone": "4096"}. Same for part_1_zonemap.lance.
  3. merge_index_files opens part_0_zonemap.lance and calls read_range. The returned RecordBatch has schema().metadata = {} (empty — see decoder.rs:2705).
  4. merged_schema = first_batch.schema() → an arrow Schema with empty metadata.
  5. new_index_file(ZONEMAP_FILENAME, merged_schema) writes the merged zonemap.lance with no rows_per_zone in its schema metadata.
  6. On reopen, ZoneMapIndex::load calls file_schema.metadata.get(ZONEMAP_SIZE_META_KEY), which returns None, so .unwrap_or(ROWS_PER_ZONE_DEFAULT) substitutes 8192 instead of the user's 4096.
  7. try_from_serialized reconstructs zone offsets/ranges using rows_per_zone=8192, producing zone row-address windows that are 2x too large. Updates, rebuilds, and statistics that consult self.rows_per_zone all observe the wrong value.
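The lookup in step 6 can be modelled with a plain map standing in for schema metadata. This is a simplified sketch, not the Lance code: the function name `rows_per_zone_on_load` is hypothetical, and the `"rows_per_zone"` key string follows the example in step 2.

```rust
use std::collections::HashMap;

const ZONEMAP_SIZE_META_KEY: &str = "rows_per_zone";
const ROWS_PER_ZONE_DEFAULT: u64 = 8192;

/// Stand-in for the load path: read the zone size from schema-level
/// metadata and fall back to the default when the key is absent.
fn rows_per_zone_on_load(metadata: &HashMap<String, String>) -> u64 {
    metadata
        .get(ZONEMAP_SIZE_META_KEY)
        .and_then(|v| v.parse().ok())
        .unwrap_or(ROWS_PER_ZONE_DEFAULT)
}
```

Passing the part file's metadata (`{"rows_per_zone": "4096"}`) yields 4096, while passing the empty metadata of a decoded batch yields the default 8192, which is the silent substitution described above.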

Impact

Silent data corruption for distributed builds with non-default rows_per_zone. The single-machine path is unaffected because it never merges. The new PR test test_distributed_build_zonemap uses the default 8192, so the regression slips past CI. Search itself uses stored ZoneBound (fragment_id/start/length), so pruning of existing zones still works, but the configured zone size is silently lost — meaning index stats, the update path (which reads self.rows_per_zone), and any tooling assuming the configured zone size now observe stale values.

Fix

Either:

  • Derive merged_schema from first_reader.schema() (a lance Schema) and convert to arrow via ArrowSchema::from(&schema) (lance-core/src/datatypes/schema.rs:830 — that impl copies metadata), or
  • Clone the batch schema and explicitly insert ZONEMAP_SIZE_META_KEY into the arrow metadata from first_reader.schema().metadata before calling new_index_file.

Adding a regression test that builds the distributed index with a non-default rows_per_zone and asserts the loaded value would catch this.
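The second option, copying the key explicitly, could be sketched like this. Plain maps again stand in for the arrow and lance schema metadata; `merged_metadata` is a hypothetical helper, not an existing function in the codebase.

```rust
use std::collections::HashMap;

const ZONEMAP_SIZE_META_KEY: &str = "rows_per_zone";

/// Starts from the batch schema's metadata and copies the zone-size key
/// over from the part file's schema metadata when it is present.
fn merged_metadata(
    batch_meta: &HashMap<String, String>,
    file_meta: &HashMap<String, String>,
) -> HashMap<String, String> {
    let mut merged = batch_meta.clone();
    if let Some(value) = file_meta.get(ZONEMAP_SIZE_META_KEY) {
        merged.insert(ZONEMAP_SIZE_META_KEY.to_string(), value.clone());
    }
    merged
}
```

A regression test along the lines suggested would then assert that the merged metadata still carries the non-default value.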

Comment on lines +976 to +988
let file_name = match fragment_ids.as_deref() {
    // Distributed build: each worker writes a per-worker file. We use the
    // smallest fragment id in the worker's assignment to name the file.
    // ZoneTrainer already partitions zones strictly by fragment_id, so
    // worker files never share zones across the same fragment_id and the
    // final merge step is a pure concatenation.
    Some(ids) if !ids.is_empty() => {
        let worker_id = ids.iter().copied().min().expect("non-empty fragment_ids");
        zonemap_part_file_name(worker_id)
    }
    _ => ZONEMAP_FILENAME.to_string(),
};
Self::train_zonemap_index_to(data, index_store, Some(request.params), &file_name).await?;
🟡 Nit / hardening suggestion. In train_index, the per-worker output file name is derived from worker_id = ids.iter().min() and used directly as part_<min>_zonemap.lance. If two workers are ever launched with assignments whose minimum fragment_id collides (overlapping ranges, retries with stale assignments, an orchestration bug), both write to the same file and the second silently overwrites the first; the merge step then concatenates whatever survived with no error and produces an under-indexed zonemap. Worth tightening: either require an explicit shard_id like bitmap_shard_partition_id already does for multi-fragment workers (see rust/lance-index/src/scalar/bitmap.rs:745-770), include all fragment ids in the file name, or fail fast if the target part file already exists.

Extended reasoning...

What the bug is

In rust/lance-index/src/scalar/zonemap.rs:976-988 (the distributed build branch of train_index), the file name for a worker is derived from the smallest fragment_id in the worker's assignment:

let worker_id = ids.iter().copied().min().expect("non-empty fragment_ids");
zonemap_part_file_name(worker_id)

The accompanying comment claims ZoneTrainer already partitions zones strictly by fragment_id, so worker files never share zones across the same fragment_id. That guarantee is about zone content uniqueness within one builder, not about file-name uniqueness across builders. Two builders running on assignments that happen to share their minimum fragment_id will compute the same file name.

Step-by-step proof

  1. Caller dispatches two distributed workers with overlapping fragment lists:
    • Worker A: fragments = [3, 7]
    • Worker B: fragments = [3, 9] (e.g. a retry of a previously-assigned fragment, or a planner bug producing overlapping shards)
  2. train_index runs for Worker A: worker_id = min(3,7) = 3 → opens part_3_zonemap.lance for write via index_store.new_index_file(...), writes zones for fragments 3 and 7, finishes.
  3. train_index runs for Worker B: worker_id = min(3,9) = 3 → opens part_3_zonemap.lance again. IndexStore::new_index_file does not error on overwrite (it just creates a fresh writer), so Worker A's file is silently replaced with Worker B's output (zones for fragments 3 and 9).
  4. merge_index_files later globs part_*_zonemap.lance, finds a single part_3_zonemap.lance plus the other workers' files, and concatenates them. The zones originally written by Worker A for fragment 7 are gone; no error is reported.
  5. Subsequent SargableQuery::Range calls on values that previously lived in fragment 7's zones will miss the zonemap entry and either prune incorrectly or, more likely, give an AtMost result that's wider than necessary — silent under-indexing rather than a crash.
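The name collision in steps 2 and 3 can be reproduced with a small stand-alone model of the naming rule. The function `part_file_name_from_min` is a hypothetical stand-in for the real `zonemap_part_file_name` path and only mirrors the min-based derivation quoted above.

```rust
/// Mirrors the naming rule under review: the part file is named after
/// the smallest fragment id in the worker's assignment.
/// Returns `None` for an empty assignment.
fn part_file_name_from_min(fragment_ids: &[u32]) -> Option<String> {
    let worker_id = fragment_ids.iter().copied().min()?;
    Some(format!("part_{}_zonemap.lance", worker_id))
}
```

Both `[3, 7]` and `[3, 9]` map to `part_3_zonemap.lance`, so the second writer replaces the first writer's file.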

Why existing code doesn't prevent it

  • There is no precondition or assertion in ZoneMapIndexPlugin::train_index that the worker's fragment_ids are disjoint from any other worker's.
  • new_index_file(...) on the IndexStore happily creates/overwrites files.
  • The merge step (list_zonemap_part_files + merge_index_files) only requires non-empty results and a stable sort — it has no way to detect that a part file was overwritten between writes.
  • The new test (test_distributed_build_zonemap) drives one fragment per worker, so this corner case is uncovered.

Why the refutation is partially right but not decisive

A refutation noted that btree.rs derives partition_id from fragment_ids.first() in a similar way, treating disjoint assignments as a documented orchestrator contract. That's true, but the bitmap pattern is the more recent and more defensive one: bitmap_shard_partition_id (rust/lance-index/src/scalar/bitmap.rs:745-770) explicitly rejects multi-fragment workers without an explicit shard_id with the error "Bitmap distributed build over multiple fragments requires an explicit shard_id ... Please assign mutually exclusive shard_id values to disjoint fragment groups." The codebase is clearly evolving toward explicit shard ids as the safe pattern; zonemap landing without that guard is inconsistent with the most recent precedent.

The refutation's point about retries being idempotent only holds when the retry assigns exactly the same fragment set — overlap of one fragment with a different other fragment (the example above) is not idempotent and will lose data.

Impact

Silent under-indexing of the zonemap. Pruning still returns correct answers (the zonemap can only prune; rows are then re-checked), so the bug does not produce wrong query results — it produces lower selectivity and slower scans. That said, "silent corruption of the index for a misconfigured orchestrator" is the exact failure mode the bitmap path now defends against, and zonemap should match.

Suggested fix

Three reasonable options, roughly in order of preference:

  1. Follow bitmap: take an explicit shard_id from params for multi-fragment workers, and only fall back to fragment_ids[0] when there is exactly one fragment.
  2. Use a hash or sorted concatenation of all fragment_ids in the file name so duplicate writes are impossible by construction.
  3. Detect existing file via IndexStore listing before write and fail fast with a clear error.
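Option 3 might be sketched as below, with an in-memory set standing in for an IndexStore listing. `claim_part_file` is a hypothetical helper; a real store would also need to consider races between the listing and the write, which this model ignores.

```rust
use std::collections::HashSet;

/// Records `name` as written, or returns an error if a part file with
/// the same name has already been claimed.
fn claim_part_file(existing: &mut HashSet<String>, name: &str) -> Result<(), String> {
    // `insert` returns false when the value was already present.
    if !existing.insert(name.to_string()) {
        return Err(format!("part file {} already exists", name));
    }
    Ok(())
}
```

Under this check, the second worker in the example above would fail with a clear error instead of overwriting `part_3_zonemap.lance`.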

Severity tagged as nit because triggering requires an orchestrator error (callers are supposed to pass disjoint fragments) and the happy path is fine, but the inconsistency with the bitmap defense pattern makes this worth flagging now while distributed zonemap is still new and untested at scale.

Comment on lines +1665 to +1668
    .await
    .unwrap();
match result {
    SearchResult::AtMost(mask) | SearchResult::Exact(mask) => {
🟡 Test comment in test_distributed_build_zonemap is incorrect: it claims range [150, 250] overlaps both fragment 1 (100..115) and fragment 2 (200..215), but fragment 1's values are entirely below 150 — only fragment 2 overlaps the range. The test still passes because the assertions (kept < total and kept > 0) hold even when only fragment 2's zone is kept, but the comment is misleading for future maintainers. Suggested fix: drop the fragment 1 reference, or pick a range like [110, 210] that genuinely overlaps two fragments.

Extended reasoning...

What's wrong

In rust/lance/src/index/create.rs at lines 1665-1668, the new test_distributed_build_zonemap test exercises zone pruning with a range query and contains this comment:

// Open the merged zonemap and exercise pruning. Range [150, 250] only
// overlaps fragment 1 (100..115) and fragment 2 (200..215), so the
// returned mask should be a strict subset of the dataset.

This statement is factually wrong about fragment 1.

Step-by-step proof

The test constructs four fragments via:

let base = fragment_id * 100;
let values: Vec<i32> = (0..16).map(|i| base + i).collect();

So each fragment has values:

  • fragment 0: base=0, values 0..=15 (max = 15)
  • fragment 1: base=100, values 100..=115 (max = 115)
  • fragment 2: base=200, values 200..=215 (min = 200, max = 215)
  • fragment 3: base=300, values 300..=315 (min = 300)

The query is:

SargableQuery::Range(
    Bound::Included(ScalarValue::Int32(Some(150))),
    Bound::Included(ScalarValue::Int32(Some(250))),
)

That is the closed range [150, 250]. Checking overlap against each fragment's value range:

  • fragment 1: 100..=115. Max value 115 < 150 — does not overlap.
  • fragment 2: 200..=215. Lies entirely within [150, 250] — overlaps.
  • fragment 0 and fragment 3 obviously don't overlap.

So only fragment 2 overlaps [150, 250]. The comment's claim that fragment 1 (100..115) also overlaps is incorrect.
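The overlap check can be replayed mechanically. The sketch below models only the value ranges of the four test fragments (base = fragment_id * 100, 16 consecutive values); the function names are hypothetical and it does not touch the real index.

```rust
/// Inclusive (min, max) value range of a test fragment.
fn fragment_range(fragment_id: i32) -> (i32, i32) {
    let base = fragment_id * 100;
    (base, base + 15)
}

/// Two closed intervals overlap when each starts no later than the
/// other one ends.
fn overlaps(a: (i32, i32), b: (i32, i32)) -> bool {
    a.0 <= b.1 && b.0 <= a.1
}

/// Ids of the test fragments (0 through 3) whose values overlap `query`.
fn overlapping_fragments(query: (i32, i32)) -> Vec<i32> {
    (0..4)
        .filter(|&id| overlaps(fragment_range(id), query))
        .collect()
}
```

For the query (150, 250) this yields only fragment 2, whereas (110, 210) yields fragments 1 and 2.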

Why the test still passes

The assertions are:

assert!(kept < total_rows, ...);
assert!(kept > 0, ...);

With total_rows = 64 and only fragment 2's 16 rows surviving, kept = 16 satisfies both 16 < 64 and 16 > 0. So the test still validates that some pruning happens and some zones survive — but the comment misrepresents which zones survive.

Why this matters

A future maintainer debugging zone-pruning behavior who reads this comment and runs the test under a debugger will see only fragment 2's zones in the surviving mask and assume a regression has occurred — when in fact the comment was wrong all along. This is the classic "misleading test comment causes false-positive bug investigation" failure mode.

Suggested fix

Either:

  1. Update the comment to say "Range [150, 250] only overlaps fragment 2 (200..215)" — minimal change, matches current behavior.
  2. Change the query range to [110, 210] (or [115, 215]) so the comment becomes accurate and the test exercises pruning that keeps two zones instead of one. This is a slightly stronger test of the zone-pruning math.

Either fix is purely a test-quality / documentation correctness improvement; no production code is affected.
