From cdb91ad133bc0fe5bb4a67f7d01adac88f0c9f30 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sat, 28 Mar 2026 16:09:51 +0800 Subject: [PATCH 1/5] refactor: remove index segment from distributed indexing --- docs/src/guide/distributed_indexing.md | 42 ++- .../distributed_vector_segment_build.svg | 24 +- java/lance-jni/src/blocking_dataset.rs | 125 +-------- java/src/main/java/org/lance/Dataset.java | 16 +- .../java/org/lance/index/VectorIndexTest.java | 25 +- python/python/lance/__init__.py | 2 - python/python/lance/dataset.py | 27 +- python/python/lance/indices/__init__.py | 4 - python/python/lance/lance/__init__.pyi | 17 +- .../python/lance/lance/indices/__init__.pyi | 17 -- python/python/tests/test_vector_index.py | 38 +-- python/src/dataset.rs | 104 ++----- python/src/indices.rs | 112 ++------ python/src/lib.rs | 5 +- rust/lance/src/dataset.rs | 2 +- rust/lance/src/dataset/index.rs | 24 +- rust/lance/src/index.rs | 263 ++++++++++-------- rust/lance/src/index/api.rs | 167 +---------- rust/lance/src/index/create.rs | 202 ++------------ rust/lance/src/index/vector/ivf.rs | 252 +++-------------- rust/lance/src/index/vector/ivf/v2.rs | 99 +++---- 21 files changed, 397 insertions(+), 1170 deletions(-) diff --git a/docs/src/guide/distributed_indexing.md b/docs/src/guide/distributed_indexing.md index 9c75c5ceb88..83da11dbe01 100644 --- a/docs/src/guide/distributed_indexing.md +++ b/docs/src/guide/distributed_indexing.md @@ -93,37 +93,29 @@ First, multiple workers build segments in parallel: or Python `create_index_uncommitted(..., fragment_ids=...)` 2. each worker writes one segment under `indices//` -### Segment Build +### Segment Merge -Then the caller turns those existing segments into one or more physical -segments: +Then the caller decides whether those existing segments should be committed as-is +or merged into larger segments: -1. create a builder with `create_index_segment_builder()` -2. provide segment metadata with `with_segments(...)` -3. optionally choose a grouping policy with `with_target_segment_bytes(...)` -4. call `plan()` to get `Vec` - -At that point the caller has two execution choices: - -- call `build(plan)` for each plan and run those builds in parallel -- call `build_all()` to let Lance build every planned segment on the current node - -After the physical segments are built, publish them with -`commit_existing_index_segments(...)`. +1. keep the worker outputs as-is and commit them directly with + `commit_existing_index_segments(...)`, or +2. group one or more existing segments and call + `merge_existing_index_segments(...)` for each caller-defined group +3. commit the final segment list with `commit_existing_index_segments(...)` Within a single commit, built segments must have disjoint fragment coverage. -## Internal Segmented Finalize Model +## Internal Finalize Model Internally, Lance models distributed vector segment build as: -1. **plan** which input segments should become each physical segment -2. **build** each segment from its selected input segments -3. **commit** the resulting physical segments as one logical index +1. **build** one uncommitted segment per worker +2. **optionally merge** caller-defined groups of existing segments +3. **commit** the resulting segments as one logical index -The plan step is driven by the segment metadata returned from -`execute_uncommitted()` and any additional inputs requested by the segment -build APIs. +The merge step is driven directly by the `IndexMetadata` returned from +`execute_uncommitted()`. This is intentionally a storage-level model: @@ -133,10 +125,10 @@ This is intentionally a storage-level model: ## Segment Grouping -When Lance builds segments from existing inputs, it may either: +The caller chooses the final segment grouping: -- keep segment boundaries, so each input segment becomes one physical segment -- group multiple input segments into a larger physical segment +- keep segment boundaries, so each worker output is committed directly +- merge multiple existing segments into a larger segment before commit The grouping decision is separate from worker build. Workers only build segments; Lance applies the segment build policy when it plans diff --git a/docs/src/images/distributed_vector_segment_build.svg b/docs/src/images/distributed_vector_segment_build.svg index d36f2726010..4d52e2a0c82 100644 --- a/docs/src/images/distributed_vector_segment_build.svg +++ b/docs/src/images/distributed_vector_segment_build.svg @@ -65,40 +65,40 @@ - + - Segment planner - create_index_segment_builder → plan() + Caller-defined grouping + merge_existing_index_segments(...) -Vec<IndexSegmentPlan> +Vec<IndexMetadata> - + - Parallel segment build + Optional segment merge - build(plan[0]) - → IndexSegment 0 + merge(group[0]) + → IndexMetadata 0 - build(plan[1]) - → IndexSegment 1 + merge(group[1]) + → IndexMetadata 1 - build(plan[N]) - → IndexSegment N + merge(group[N]) + → IndexMetadata N diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index cc82c8acee5..43edf5515ac 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -38,7 +38,7 @@ use lance::dataset::{ ColumnAlteration, CommitBuilder, Dataset, NewColumnTransform, ProjectionRequest, ReadParams, Version, WriteParams, }; -use lance::index::{DatasetIndexExt, IndexSegment}; +use lance::index::DatasetIndexExt; use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore; use lance::io::{ObjectStore, ObjectStoreParams}; use lance::session::Session as LanceSession; @@ -1075,53 +1075,33 @@ fn inner_merge_index_metadata( } #[unsafe(no_mangle)] -pub extern "system" fn Java_org_lance_Dataset_nativeBuildIndexSegments<'local>( +pub extern "system" fn Java_org_lance_Dataset_nativeMergeExistingIndexSegments<'local>( mut env: JNIEnv<'local>, java_dataset: JObject, java_segments: JObject, - target_segment_bytes_jobj: JObject, ) -> JObject<'local> { ok_or_throw!( env, - inner_build_index_segments( - &mut env, - java_dataset, - java_segments, - target_segment_bytes_jobj - ) + inner_merge_existing_index_segments(&mut env, java_dataset, java_segments) ) } -fn inner_build_index_segments<'local>( +fn inner_merge_existing_index_segments<'local>( env: &mut JNIEnv<'local>, java_dataset: JObject, java_segments: JObject, - target_segment_bytes_jobj: JObject, ) -> Result> { let segments = import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?; - let target_segment_bytes = env - .get_long_opt(&target_segment_bytes_jobj)? - .map(|v| v as u64); - let template = segment_template(&segments)?; - - let built_segments = { + let merged_segment = { let dataset_guard = unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; - let mut builder = dataset_guard - .inner - .create_index_segment_builder() - .with_segments(segments); - if let Some(target_segment_bytes) = target_segment_bytes { - builder = builder.with_target_segment_bytes(target_segment_bytes); - } - RT.block_on(builder.build_all())? + RT.block_on( + dataset_guard + .inner + .merge_existing_index_segments(segments), + )? }; - - let built_metadata = built_segments - .into_iter() - .map(|segment| index_segment_to_metadata(&template, segment)) - .collect::>(); - export_vec(env, &built_metadata) + (&merged_segment).into_java(env) } #[unsafe(no_mangle)] @@ -1153,12 +1133,7 @@ fn inner_commit_existing_index_segments<'local>( ) -> Result> { let index_name = index_name.extract(env)?; let column = column.extract(env)?; - let segment_metadata = - import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?; - let segments = segment_metadata - .iter() - .map(index_metadata_to_segment) - .collect::>>()?; + let segments = import_vec_to_rust(env, &java_segments, |env, obj| obj.extract_object(env))?; let committed = { let mut dataset_guard = @@ -1174,82 +1149,6 @@ fn inner_commit_existing_index_segments<'local>( export_vec(env, &committed) } -struct SegmentTemplate { - name: String, - fields: Vec, - dataset_version: u64, -} - -fn segment_template(segments: &[IndexMetadata]) -> Result { - let first = segments - .first() - .ok_or_else(|| Error::input_error("segments cannot be empty".to_string()))?; - for segment in &segments[1..] { - if segment.name != first.name { - return Err(Error::input_error(format!( - "All segments must share the same index name, got '{}' and '{}'", - first.name, segment.name - ))); - } - if segment.fields != first.fields { - return Err(Error::input_error(format!( - "All segments must target the same field ids, got {:?} and {:?}", - first.fields, segment.fields - ))); - } - if segment.dataset_version != first.dataset_version { - return Err(Error::input_error(format!( - "All segments must share the same dataset version, got {} and {}", - first.dataset_version, segment.dataset_version - ))); - } - } - - Ok(SegmentTemplate { - name: first.name.clone(), - fields: first.fields.clone(), - dataset_version: first.dataset_version, - }) -} - -fn index_metadata_to_segment(metadata: &IndexMetadata) -> Result { - let fragment_bitmap = metadata.fragment_bitmap.clone().ok_or_else(|| { - Error::input_error(format!( - "Segment '{}' is missing fragment coverage metadata", - metadata.uuid - )) - })?; - let index_details = metadata.index_details.clone().ok_or_else(|| { - Error::input_error(format!( - "Segment '{}' is missing index details metadata", - metadata.uuid - )) - })?; - - Ok(IndexSegment::new( - metadata.uuid, - fragment_bitmap, - index_details, - metadata.index_version, - )) -} - -fn index_segment_to_metadata(template: &SegmentTemplate, segment: IndexSegment) -> IndexMetadata { - let (uuid, fragment_bitmap, index_details, index_version) = segment.into_parts(); - IndexMetadata { - uuid, - fields: template.fields.clone(), - name: template.name.clone(), - dataset_version: template.dataset_version, - fragment_bitmap: Some(fragment_bitmap), - index_details: Some(index_details), - index_version, - created_at: Some(Utc::now()), - base_id: None, - files: None, - } -} - #[unsafe(no_mangle)] pub extern "system" fn Java_org_lance_Dataset_nativeOptimizeIndices( mut env: JNIEnv, diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index d9c58e9b54a..44077bab7c8 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -1024,25 +1024,17 @@ public void mergeIndexMetadata( private native void innerMergeIndexMetadata( String indexUUID, int indexType, Optional batchReadHead); - /** - * Build physical vector index segments from previously-created fragment-level index outputs. - * - * @param segments segment metadata returned by {@link #createIndex(IndexOptions)} when - * fragmentIds are provided - * @param targetSegmentBytes optional size target for merged physical segments - * @return built physical segment metadata - */ - public List buildIndexSegments(List segments, Optional targetSegmentBytes) { + /** Merge one caller-defined group of existing uncommitted vector index segments. */ + public Index mergeExistingIndexSegments(List segments) { Preconditions.checkNotNull(segments, "segments cannot be null"); Preconditions.checkArgument(!segments.isEmpty(), "segments cannot be empty"); try (LockManager.WriteLock writeLock = lockManager.acquireWriteLock()) { Preconditions.checkArgument(nativeDatasetHandle != 0, "Dataset is closed"); - return nativeBuildIndexSegments(segments, targetSegmentBytes); + return nativeMergeExistingIndexSegments(segments); } } - private native List nativeBuildIndexSegments( - List segments, Optional targetSegmentBytes); + private native Index nativeMergeExistingIndexSegments(List segments); /** * Publish one or more existing physical index segments as a logical index. diff --git a/java/src/test/java/org/lance/index/VectorIndexTest.java b/java/src/test/java/org/lance/index/VectorIndexTest.java index a96b6593d30..50499197b34 100755 --- a/java/src/test/java/org/lance/index/VectorIndexTest.java +++ b/java/src/test/java/org/lance/index/VectorIndexTest.java @@ -29,7 +29,6 @@ import java.nio.file.Path; import java.util.Collections; import java.util.List; -import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -95,13 +94,11 @@ public void testCreateIvfFlatIndexDistributively(@TempDir Path tempDir) throws E dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_FLAT index should not present before commit"); - List builtSegments = - dataset.buildIndexSegments(List.of(firstSegment, secondSegment), Optional.empty()); - assertEquals(2, builtSegments.size()); - List committed = dataset.commitExistingIndexSegments( - TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); + TestVectorDataset.indexName, + TestVectorDataset.vectorColumnName, + List.of(firstSegment, secondSegment)); assertEquals(2, committed.size()); assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } @@ -187,13 +184,11 @@ public void testCreateIvfPqIndexDistributively(@TempDir Path tempDir) throws Exc dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_PQ index should not present before commit"); - List builtSegments = - dataset.buildIndexSegments(List.of(firstSegment, secondSegment), Optional.empty()); - assertEquals(2, builtSegments.size()); - List committed = dataset.commitExistingIndexSegments( - TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); + TestVectorDataset.indexName, + TestVectorDataset.vectorColumnName, + List.of(firstSegment, secondSegment)); assertEquals(2, committed.size()); assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } @@ -263,13 +258,11 @@ public void testCreateIvfSqIndexDistributively(@TempDir Path tempDir) throws Exc dataset.listIndexes().contains(TestVectorDataset.indexName), "Partially created IVF_SQ index should not present before commit"); - List builtSegments = - dataset.buildIndexSegments(List.of(firstSegment, secondSegment), Optional.empty()); - assertEquals(2, builtSegments.size()); - List committed = dataset.commitExistingIndexSegments( - TestVectorDataset.indexName, TestVectorDataset.vectorColumnName, builtSegments); + TestVectorDataset.indexName, + TestVectorDataset.vectorColumnName, + List.of(firstSegment, secondSegment)); assertEquals(2, committed.size()); assertTrue(dataset.listIndexes().contains(TestVectorDataset.indexName)); } diff --git a/python/python/lance/__init__.py b/python/python/lance/__init__.py index 95bacfc3091..0b905457a50 100644 --- a/python/python/lance/__init__.py +++ b/python/python/lance/__init__.py @@ -29,7 +29,6 @@ from .lance import ( DatasetBasePath, FFILanceTableProvider, - IndexSegmentBuilder, ScanStatistics, bytes_read_counter, iops_counter, @@ -65,7 +64,6 @@ "FragmentMetadata", "Index", "IndexFile", - "IndexSegmentBuilder", "LanceDataset", "LanceFragment", "LanceOperation", diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 7496746285a..c50d5d607f1 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -47,7 +47,7 @@ from .dependencies import numpy as np from .dependencies import pandas as pd from .fragment import DataFile, FragmentMetadata, LanceFragment -from .indices import IndexConfig, IndexSegment, SupportedDistributedIndices +from .indices import IndexConfig, SupportedDistributedIndices from .lance import ( CleanupStats, Compaction, @@ -3261,7 +3261,7 @@ def create_index( This enables distributed/fragment-level indexing. When provided, the method creates one segment but does not commit the index to the dataset. The returned metadata can be passed to - ``create_index_segment_builder().with_segments(...)`` + optionally merged with ``merge_existing_index_segments(...)`` and then committed with ``commit_existing_index_segments(...)``. index_uuid : str, optional A UUID to use for the segment written by this call. @@ -3440,8 +3440,9 @@ def create_index_uncommitted( 1. run :meth:`create_index_uncommitted` on each worker with that worker's assigned ``fragment_ids`` 2. collect the returned :class:`Index` objects - 3. pass them to :meth:`IndexSegmentBuilder.with_segments` - 4. build one or more physical segments and commit them with + 3. optionally merge one or more caller-defined groups with + :meth:`merge_existing_index_segments` + 4. commit the final segment list with :meth:`commit_existing_index_segments` Parameters are the same as :meth:`create_index`, with one additional @@ -3517,9 +3518,9 @@ def merge_index_metadata( Merge distributed scalar index metadata. Vector distributed indexing no longer uses this API. For vector indices, - build segments with :meth:`create_index_uncommitted`, plan or - merge them with :meth:`create_index_segment_builder`, and publish them - with :meth:`commit_existing_index_segments`. + build segments with :meth:`create_index_uncommitted`, optionally merge + caller-defined groups with :meth:`merge_existing_index_segments`, and + publish them with :meth:`commit_existing_index_segments`. This method does NOT commit changes. @@ -3553,18 +3554,14 @@ def merge_index_metadata( self._ds.merge_index_metadata(index_uuid, t, batch_readhead) return None - def create_index_segment_builder(self): + def merge_existing_index_segments(self, segments: List[Index]) -> Index: """ - Create a builder for turning existing segments into physical segments. - - Provide the segment metadata returned by - :meth:`create_index_uncommitted` through - :meth:`IndexSegmentBuilder.with_segments`. + Merge one caller-defined group of existing uncommitted segments. """ - return self._ds.create_index_segment_builder() + return self._ds.merge_existing_index_segments(segments) def commit_existing_index_segments( - self, index_name: str, column: str, segments: List[IndexSegment] + self, index_name: str, column: str, segments: List[Index] ) -> LanceDataset: """ Commit built index segments as one logical index. diff --git a/python/python/lance/indices/__init__.py b/python/python/lance/indices/__init__.py index b35e5d5b174..edf9e5091ff 100644 --- a/python/python/lance/indices/__init__.py +++ b/python/python/lance/indices/__init__.py @@ -8,9 +8,7 @@ from .ivf import IvfModel from .pq import PqModel -IndexSegment = _lance.indices.IndexSegment IndexSegmentDescription = _lance.indices.IndexSegmentDescription -IndexSegmentPlan = _lance.indices.IndexSegmentPlan __all__ = [ "IndicesBuilder", @@ -18,9 +16,7 @@ "PqModel", "IvfModel", "IndexFileVersion", - "IndexSegment", "IndexSegmentDescription", - "IndexSegmentPlan", ] diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index f0be29f39ca..6b9e6e2ea0c 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -61,9 +61,7 @@ from .fragment import ( RowIdMeta as RowIdMeta, ) from .indices import IndexDescription as IndexDescription -from .indices import IndexSegment as IndexSegment from .indices import IndexSegmentDescription as IndexSegmentDescription -from .indices import IndexSegmentPlan as IndexSegmentPlan from .lance import PySearchFilter from .optimize import ( Compaction as Compaction, @@ -188,15 +186,6 @@ class LanceColumnStatistics: class _Session: def size_bytes(self) -> int: ... -class IndexSegmentBuilder: - @property - def staging_index_uuid(self) -> str: ... - def with_partial_indices(self, partial_indices: List[Index]) -> Self: ... - def with_target_segment_bytes(self, bytes: int) -> Self: ... - def plan(self) -> List[IndexSegmentPlan]: ... - def build(self, plan: IndexSegmentPlan) -> IndexSegment: ... - def build_all(self) -> List[IndexSegment]: ... - class LanceBlobFile: def close(self): ... def is_closed(self) -> bool: ... @@ -372,11 +361,9 @@ class _Dataset: def merge_index_metadata( self, index_uuid: str, index_type: str, batch_readhead: Optional[int] = None ): ... - def create_index_segment_builder( - self, staging_index_uuid: str - ) -> IndexSegmentBuilder: ... + def merge_existing_index_segments(self, segments: List[Index]) -> Index: ... def commit_existing_index_segments( - self, index_name: str, column: str, segments: List[IndexSegment] + self, index_name: str, column: str, segments: List[Index] ) -> None: ... def count_fragments(self) -> int: ... def num_small_files(self, max_rows_per_group: int) -> int: ... diff --git a/python/python/lance/lance/indices/__init__.pyi b/python/python/lance/lance/indices/__init__.pyi index 3369b61c619..152ea1d10b2 100644 --- a/python/python/lance/lance/indices/__init__.pyi +++ b/python/python/lance/lance/indices/__init__.pyi @@ -17,27 +17,10 @@ from typing import Optional import pyarrow as pa -from ...dataset import Index - class IndexConfig: index_type: str config: str -class IndexSegment: - uuid: str - fragment_ids: set[int] - index_version: int - - def __repr__(self) -> str: ... - -class IndexSegmentPlan: - staging_index_uuid: str - segment: IndexSegment - partial_indices: list[Index] - estimated_bytes: int - - def __repr__(self) -> str: ... - def train_ivf_model( dataset, column: str, diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index 0c1d5ab9ed4..3da73269220 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -2148,9 +2148,6 @@ def build_distributed_vector_index( ) ) - segments = ( - dataset.create_index_segment_builder().with_segments(segments).build_all() - ) return dataset.commit_existing_index_segments(f"{column}_idx", column, segments) @@ -2522,7 +2519,6 @@ def test_metadata_merge_pq_success(tmp_path): ivf_centroids=pre["ivf_centroids"], pq_codebook=pre["pq_codebook"], ) - segments = ds.create_index_segment_builder().with_segments(segments).build_all() ds = _commit_segments_helper(ds, segments, "vector") q = np.random.rand(128).astype(np.float32) results = ds.to_table(nearest={"column": "vector", "q": q, "k": 10}) @@ -2561,7 +2557,6 @@ def test_distributed_workflow_merge_and_search(tmp_path): ivf_centroids=pre["ivf_centroids"], pq_codebook=pre["pq_codebook"], ) - segments = ds.create_index_segment_builder().with_segments(segments).build_all() ds = _commit_segments_helper(ds, segments, "vector") q = np.random.rand(128).astype(np.float32) results = ds.to_table(nearest={"column": "vector", "q": q, "k": 10}) @@ -2597,7 +2592,6 @@ def test_vector_merge_two_shards_success_flat(tmp_path): ivf_centroids=preprocessed["ivf_centroids"], pq_codebook=preprocessed["pq_codebook"], ) - segments = ds.create_index_segment_builder().with_segments(segments).build_all() ds = _commit_segments_helper(ds, segments, column="vector") q = np.random.rand(128).astype(np.float32) result = ds.to_table(nearest={"column": "vector", "q": q, "k": 5}) @@ -2650,7 +2644,6 @@ def test_distributed_ivf_parameterized(tmp_path, index_type, num_sub_vectors): ds.create_index_uncommitted(**kwargs1), ds.create_index_uncommitted(**kwargs2), ] - segments = ds.create_index_segment_builder().with_segments(segments).build_all() ds = _commit_segments_helper(ds, segments, "vector") q = np.random.rand(128).astype(np.float32) @@ -2711,20 +2704,16 @@ def test_merge_two_shards_parameterized(tmp_path, index_type, num_sub_vectors): kwargs2["pq_codebook"] = pre["pq_codebook"] segment2 = ds.create_index_uncommitted(**kwargs2) - segments = ( - ds.create_index_segment_builder() - .with_segments([segment1, segment2]) - .build_all() - ) - ds = _commit_segments_helper(ds, segments, column="vector") + merged_segment = ds.merge_existing_index_segments([segment1, segment2]) + ds = _commit_segments_helper(ds, [merged_segment], column="vector") q = np.random.rand(128).astype(np.float32) results = ds.to_table(nearest={"column": "vector", "q": q, "k": 5}) assert 0 < len(results) <= 5 -def test_index_segment_builder_builds_vector_segments(tmp_path): - ds = _make_sample_dataset_base(tmp_path, "segment_builder_ds", 2000, 128) +def test_merge_existing_index_segments_builds_vector_segment(tmp_path): + ds = _make_sample_dataset_base(tmp_path, "merge_existing_segments_ds", 2000, 128) frags = ds.get_fragments() assert len(frags) >= 2 builder = IndicesBuilder(ds, "vector") @@ -2751,14 +2740,12 @@ def test_index_segment_builder_builds_vector_segments(tmp_path): for fragment in frags[:2] ] - segment_builder = ds.create_index_segment_builder().with_segments(segments) - plans = segment_builder.plan() - assert len(plans) == 2 - assert all(len(plan.segments) == 1 for plan in plans) - - segments = segment_builder.build_all() - assert len(segments) == 2 - ds = ds.commit_existing_index_segments("vector_idx", "vector", segments) + merged_segment = ds.merge_existing_index_segments(segments) + assert merged_segment.fragment_bitmap is not None + assert sorted(merged_segment.fragment_bitmap) == sorted( + [fragment.fragment_id for fragment in frags[:2]] + ) + ds = ds.commit_existing_index_segments("vector_idx", "vector", [merged_segment]) q = np.random.rand(128).astype(np.float32) results = ds.to_table(nearest={"column": "vector", "q": q, "k": 5}) @@ -2819,11 +2806,6 @@ def build_distributed_ivf_pq(ds_copy, shard_order): ivf_centroids=pre["ivf_centroids"], pq_codebook=pre["pq_codebook"], ) - segments = ( - ds_copy.create_index_segment_builder() - .with_segments(segments) - .build_all() - ) return _commit_segments_helper(ds_copy, segments, column="vector") except ValueError as e: raise e diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 35306636c93..86b6cf74789 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -87,7 +87,7 @@ use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler; use crate::error::PythonErrorExt; use crate::file::object_store_from_uri_or_path; use crate::fragment::FileFragment; -use crate::indices::{PyIndexConfig, PyIndexDescription, PyIndexSegment, PyIndexSegmentPlan}; +use crate::indices::{PyIndexConfig, PyIndexDescription}; use crate::namespace::extract_namespace_arc; use crate::rt; use crate::scanner::ScanStatistics; @@ -323,78 +323,6 @@ impl MergeInsertBuilder { } } -#[pyclass(name = "IndexSegmentBuilder", module = "lance", subclass)] -#[derive(Clone)] -pub struct PyIndexSegmentBuilder { - dataset: Arc, - segments: Vec, - target_segment_bytes: Option, -} - -impl PyIndexSegmentBuilder { - fn builder(&self) -> ::IndexSegmentBuilder<'_> { - let mut builder = self - .dataset - .create_index_segment_builder() - .with_segments(self.segments.clone()); - if let Some(target_segment_bytes) = self.target_segment_bytes { - builder = builder.with_target_segment_bytes(target_segment_bytes); - } - builder - } -} - -#[pymethods] -impl PyIndexSegmentBuilder { - fn with_segments<'a>( - mut slf: PyRefMut<'a, Self>, - segments: &Bound<'_, PyAny>, - ) -> PyResult> { - let mut indices = Vec::new(); - for item in segments.try_iter()? { - indices.push(item?.extract::>()?.0); - } - slf.segments = indices; - Ok(slf) - } - - fn with_target_segment_bytes<'a>( - mut slf: PyRefMut<'a, Self>, - bytes: u64, - ) -> PyResult> { - slf.target_segment_bytes = Some(bytes); - Ok(slf) - } - - fn plan(&self, py: Python<'_>) -> PyResult>> { - let plans = rt() - .block_on(Some(py), self.builder().plan())? - .infer_error()?; - plans - .into_iter() - .map(|plan| Py::new(py, PyIndexSegmentPlan::from_inner(plan))) - .collect() - } - - fn build(&self, py: Python<'_>, plan: &Bound<'_, PyAny>) -> PyResult> { - let plan = plan.extract::>()?; - let segment = rt() - .block_on(Some(py), self.builder().build(&plan.inner))? - .infer_error()?; - Py::new(py, PyIndexSegment::from_inner(segment)) - } - - fn build_all(&self, py: Python<'_>) -> PyResult>> { - let segments = rt() - .block_on(Some(py), self.builder().build_all())? - .infer_error()?; - segments - .into_iter() - .map(|segment| Py::new(py, PyIndexSegment::from_inner(segment))) - .collect() - } -} - impl MergeInsertBuilder { fn build_stats<'a>(stats: &MergeStats, py: Python<'a>) -> PyResult> { let dict = PyDict::new(py); @@ -2091,28 +2019,34 @@ impl Dataset { Ok(PyLance(index_metadata)) } - fn create_index_segment_builder(&self) -> PyResult { - Ok(PyIndexSegmentBuilder { - dataset: self.ds.clone(), - segments: Vec::new(), - target_segment_bytes: None, - }) + fn merge_existing_index_segments( + &self, + segments: Vec>, + ) -> PyResult> { + let merged = rt() + .block_on( + None, + self.ds + .merge_existing_index_segments(segments.into_iter().map(|s| s.0).collect()), + )? + .infer_error()?; + Ok(PyLance(merged)) } fn commit_existing_index_segments( &mut self, index_name: &str, column: &str, - segments: Vec>, + segments: Vec>, ) -> PyResult<()> { let mut new_self = self.ds.as_ref().clone(); - let segments = segments - .into_iter() - .map(|segment| segment.inner.clone()) - .collect(); rt().block_on( None, - new_self.commit_existing_index_segments(index_name, column, segments), + new_self.commit_existing_index_segments( + index_name, + column, + segments.into_iter().map(|segment| segment.0).collect(), + ), )? .infer_error()?; self.ds = Arc::new(new_self); diff --git a/python/src/indices.rs b/python/src/indices.rs index cea7f2a968a..208a4b67f87 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -12,7 +12,7 @@ use chrono::{DateTime, Utc}; use lance::dataset::Dataset as LanceDataset; use lance::index::vector::ivf::builder::write_vector_storage; use lance::index::vector::pq::build_pq_model_in_fragments; -use lance::index::{DatasetIndexExt, IndexSegment, IndexSegmentPlan}; +use lance::index::DatasetIndexExt; use lance::io::ObjectStore; use lance_index::progress::NoopIndexBuildProgress; use lance_index::vector::ivf::shuffler::{IvfShuffler, shuffle_vectors}; @@ -21,6 +21,7 @@ use lance_index::vector::{ pq::{PQBuildParams, ProductQuantizer}, }; use lance_linalg::distance::DistanceType; +use lance_table::format::{IndexMetadata, list_index_files_with_sizes}; use pyo3::Bound; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; @@ -34,7 +35,7 @@ use pyo3::{ use lance::index::DatasetIndexInternalExt; use crate::fragment::FileFragment; -use crate::utils::{PyJson, PyLance}; +use crate::utils::PyJson; use crate::{ dataset::Dataset, error::PythonErrorExt, file::object_store_from_uri_or_path_no_options, rt, }; @@ -60,82 +61,6 @@ impl PyIndexConfig { } } -#[pyclass(name = "IndexSegment", module = "lance.indices")] -#[derive(Debug, Clone)] -pub struct PyIndexSegment { - pub(crate) inner: IndexSegment, -} - -impl PyIndexSegment { - pub(crate) fn from_inner(inner: IndexSegment) -> Self { - Self { inner } - } -} - -#[pymethods] -impl PyIndexSegment { - #[getter] - fn uuid(&self) -> String { - self.inner.uuid().to_string() - } - - #[getter] - fn fragment_ids(&self) -> HashSet { - self.inner.fragment_bitmap().iter().collect() - } - - #[getter] - fn index_version(&self) -> i32 { - self.inner.index_version() - } - - fn __repr__(&self) -> String { - format!( - "IndexSegment(uuid={}, fragment_ids={:?}, index_version={})", - self.uuid(), - self.fragment_ids(), - self.index_version() - ) - } -} - -#[pyclass(name = "IndexSegmentPlan", module = "lance.indices")] -#[derive(Debug, Clone)] -pub struct PyIndexSegmentPlan { - pub(crate) inner: IndexSegmentPlan, -} - -impl PyIndexSegmentPlan { - pub(crate) fn from_inner(inner: IndexSegmentPlan) -> Self { - Self { inner } - } -} - -#[pymethods] -impl PyIndexSegmentPlan { - #[getter] - fn segment(&self) -> PyIndexSegment { - PyIndexSegment::from_inner(self.inner.segment().clone()) - } - - #[getter] - fn segments(&self) -> Vec> { - self.inner.segments().iter().cloned().map(PyLance).collect() - } - - #[getter] - fn estimated_bytes(&self) -> u64 { - self.inner.estimated_bytes() - } - fn __repr__(&self) -> String { - format!( - "IndexSegmentPlan(segments={}, estimated_bytes={})", - self.inner.segments().len(), - self.estimated_bytes() - ) - } -} - #[pyclass(name = "IvfModel", module = "lance.indices")] #[derive(Debug, Clone)] pub struct PyIvfModel { @@ -505,18 +430,29 @@ async fn do_load_shuffled_vectors( .infer_error()?; let mut ds = dataset.ds.as_ref().clone(); + let index_dir = ds.indices_dir().child(index_id.to_string()); + let files = list_index_files_with_sizes(ds.object_store(), &index_dir) + .await + .infer_error()?; + let metadata = IndexMetadata { + uuid: index_id, + name: index_name.to_string(), + fields: vec![ds.schema().field(column).unwrap().id], + dataset_version: ds.manifest.version, + fragment_bitmap: Some(ds.fragments().iter().map(|f| f.id as u32).collect()), + index_details: Some(Arc::new( + prost_types::Any::from_msg(&lance_table::format::pb::VectorIndexDetails::default()) + .unwrap(), + )), + index_version: IndexType::IvfPq.version(), + created_at: Some(Utc::now()), + base_id: None, + files: Some(files), + }; ds.commit_existing_index_segments( index_name, column, - vec![IndexSegment::new( - index_id, - ds.fragments().iter().map(|f| f.id as u32), - Arc::new( - prost_types::Any::from_msg(&lance_table::format::pb::VectorIndexDetails::default()) - .unwrap(), - ), - IndexType::IvfPq.version(), - )], + vec![metadata], ) .await .infer_error()?; @@ -711,8 +647,6 @@ pub fn register_indices(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { indices.add_wrapped(wrap_pyfunction!(load_shuffled_vectors))?; indices.add_class::()?; indices.add_class::()?; - indices.add_class::()?; - indices.add_class::()?; indices.add_class::()?; indices.add_class::()?; indices.add_wrapped(wrap_pyfunction!(get_ivf_model))?; diff --git a/python/src/lib.rs b/python/src/lib.rs index 9730f2ba1c5..ee5eaf96a16 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -45,9 +45,7 @@ use dataset::io_stats::IoStats; use dataset::optimize::{ PyCompaction, PyCompactionMetrics, PyCompactionPlan, PyCompactionTask, PyRewriteResult, }; -use dataset::{ - DatasetBasePath, MergeInsertBuilder, PyFullTextQuery, PyIndexSegmentBuilder, PySearchFilter, -}; +use dataset::{DatasetBasePath, MergeInsertBuilder, PyFullTextQuery, PySearchFilter}; use env_logger::{Builder, Env}; use file::{ LanceBufferDescriptor, LanceColumnMetadata, LanceFileMetadata, LanceFileReader, @@ -254,7 +252,6 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 817954da710..7ba85a8e52a 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -2790,7 +2790,7 @@ impl Dataset { IndexType::IvfFlat | IndexType::IvfPq | IndexType::IvfSq | IndexType::Vector => { Err(Error::invalid_input( "Vector distributed indexing no longer supports merge_index_metadata; \ - build segments, use create_index_segment_builder(), \ + build segments, optionally merge groups with merge_existing_index_segments(...), \ and commit with commit_existing_index_segments(...)" .to_string(), )) diff --git a/rust/lance/src/dataset/index.rs b/rust/lance/src/dataset/index.rs index fded774b151..856f7361892 100644 --- a/rust/lance/src/dataset/index.rs +++ b/rust/lance/src/dataset/index.rs @@ -179,8 +179,8 @@ mod tests { use super::*; use crate::dataset::WriteParams; + use crate::index::DatasetIndexExt; use crate::index::vector::VectorIndexParams; - use crate::index::{DatasetIndexExt, IndexSegment}; use lance_datagen::{BatchCount, RowCount, array}; use lance_index::IndexType; use lance_linalg::distance::MetricType; @@ -242,18 +242,16 @@ mod tests { } let segments = vec![ - IndexSegment::new( - first_segment_uuid, - [target_fragments[0].id() as u32], - built_index.index_details.clone().unwrap(), - built_index.index_version, - ), - IndexSegment::new( - second_segment_uuid, - [target_fragments[1].id() as u32], - built_index.index_details.clone().unwrap(), - built_index.index_version, - ), + IndexMetadata { + uuid: first_segment_uuid, + fragment_bitmap: Some(std::iter::once(target_fragments[0].id() as u32).collect()), + ..built_index.clone() + }, + IndexMetadata { + uuid: second_segment_uuid, + fragment_bitmap: Some(std::iter::once(target_fragments[1].id() as u32).collect()), + ..built_index + }, ]; dataset diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 6a88441029e..5d8d42b3555 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -80,7 +80,7 @@ use crate::dataset::index::LanceIndexStoreExt; use crate::dataset::optimize::RemappedIndex; use crate::dataset::optimize::remapping::RemapResult; use crate::dataset::transaction::{Operation, Transaction, TransactionBuilder}; -pub use crate::index::api::{DatasetIndexExt, IndexSegment, IndexSegmentPlan}; +pub use crate::index::api::DatasetIndexExt; use crate::index::frag_reuse::{load_frag_reuse_index_details, open_frag_reuse_index}; use crate::index::mem_wal::open_mem_wal_index; pub use crate::index::prefilter::{FilterLoader, PreFilter}; @@ -90,7 +90,7 @@ use crate::{Error, Result, dataset::Dataset}; pub use create::CreateIndexBuilder; pub use lance_index::IndexDescription; -fn validate_index_segments(index_name: &str, segments: &[IndexSegment]) -> Result<()> { +fn validate_segment_metadata(index_name: &str, segments: &[IndexMetadata]) -> Result<()> { if segments.is_empty() { return Err(Error::invalid_input( "CreateIndex: at least one index segment is required".to_string(), @@ -100,55 +100,30 @@ fn validate_index_segments(index_name: &str, segments: &[IndexSegment]) -> Resul let mut seen_segment_ids = HashSet::with_capacity(segments.len()); let mut covered_fragments = RoaringBitmap::new(); for segment in segments { - if !seen_segment_ids.insert(segment.uuid()) { + if !seen_segment_ids.insert(segment.uuid) { return Err(Error::invalid_input(format!( "CreateIndex: duplicate segment uuid {} for index '{}'", - segment.uuid(), - index_name + segment.uuid, index_name ))); } - if !covered_fragments.is_disjoint(segment.fragment_bitmap()) { + let fragment_bitmap = segment.fragment_bitmap.as_ref().ok_or_else(|| { + Error::invalid_input(format!( + "CreateIndex: segment {} is missing fragment coverage", + segment.uuid + )) + })?; + if !covered_fragments.is_disjoint(fragment_bitmap) { return Err(Error::invalid_input(format!( "CreateIndex: overlapping fragment coverage in segment set for index '{}'", index_name ))); } - covered_fragments |= segment.fragment_bitmap().clone(); + covered_fragments |= fragment_bitmap.clone(); } Ok(()) } -pub(crate) async fn build_index_metadata_from_segments( - dataset: &Dataset, - index_name: &str, - field_id: i32, - segments: Vec, -) -> Result> { - validate_index_segments(index_name, &segments)?; - - let mut new_indices = Vec::with_capacity(segments.len()); - for segment in segments { - let (uuid, fragment_bitmap, index_details, index_version) = segment.into_parts(); - let index_dir = dataset.indices_dir().child(uuid.to_string()); - let files = list_index_files_with_sizes(&dataset.object_store, &index_dir).await?; - new_indices.push(IndexMetadata { - uuid, - name: index_name.to_string(), - fields: vec![field_id], - dataset_version: dataset.manifest.version, - fragment_bitmap: Some(fragment_bitmap), - index_details: Some(index_details), - index_version, - created_at: Some(chrono::Utc::now()), - base_id: None, - files: Some(files), - }); - } - - Ok(new_indices) -} - // Cache keys for different index types #[derive(Debug, Clone)] pub struct ScalarIndexCacheKey<'a> { @@ -644,7 +619,6 @@ impl IndexDescription for IndexDescriptionImpl { #[async_trait] impl DatasetIndexExt for Dataset { type IndexBuilder<'a> = CreateIndexBuilder<'a>; - type IndexSegmentBuilder<'a> = create::IndexSegmentBuilder<'a>; /// Create a builder for creating an index on columns. /// @@ -692,10 +666,6 @@ impl DatasetIndexExt for Dataset { CreateIndexBuilder::new(self, columns, index_type, params) } - fn create_index_segment_builder<'a>(&'a self) -> create::IndexSegmentBuilder<'a> { - create::IndexSegmentBuilder::new(self) - } - #[instrument(skip_all)] async fn create_index( &mut self, @@ -843,26 +813,73 @@ impl DatasetIndexExt for Dataset { } } + async fn merge_existing_index_segments( + &self, + source_segments: Vec, + ) -> Result { + validate_segment_metadata("uncommitted", &source_segments)?; + let field_id = *source_segments[0].fields.first().ok_or_else(|| { + Error::invalid_input(format!( + "CreateIndex: segment {} is missing field ids", + source_segments[0].uuid + )) + })?; + if source_segments + .iter() + .any(|segment| segment.fields != [field_id]) + { + return Err(Error::invalid_input( + "merge_existing_index_segments requires segments with identical fields".to_string(), + )); + } + if !source_segments.iter().all(|segment| { + segment + .index_details + .as_ref() + .is_some_and(|details| type_name_from_uri(&details.type_url) == "Vector") + }) { + return Err(Error::invalid_input( + "merge_existing_index_segments currently only supports vector segments".to_string(), + )); + } + + let mut merged_segment = crate::index::vector::ivf::merge_segments( + self.object_store(), + &self.indices_dir(), + source_segments, + ) + .await?; + merged_segment.dataset_version = self.manifest.version; + merged_segment.fields = vec![field_id]; + Ok(merged_segment) + } + async fn commit_existing_index_segments( &mut self, index_name: &str, column: &str, - segments: Vec, + segments: Vec, ) -> Result<()> { - if segments.is_empty() { - return Err(Error::invalid_input( - "CreateIndex: at least one index segment is required".to_string(), - )); - } - let Some(field) = self.schema().field(column) else { return Err(Error::index(format!( "CreateIndex: column '{column}' does not exist" ))); }; - let new_indices = - build_index_metadata_from_segments(self, index_name, field.id, segments).await?; + validate_segment_metadata(index_name, &segments)?; + + let mut new_indices = Vec::with_capacity(segments.len()); + for mut segment in segments { + if segment.fields != [field.id] { + return Err(Error::invalid_input(format!( + "CreateIndex: segment {} was built for fields {:?}, expected [{}]", + segment.uuid, segment.fields, field.id + ))); + } + segment.name = index_name.to_string(); + segment.dataset_version = self.manifest.version; + new_indices.push(segment); + } let transaction = Transaction::new( self.manifest.version, @@ -2080,6 +2097,40 @@ mod tests { use rstest::rstest; use std::collections::HashSet; + async fn write_vector_segment_metadata( + dataset: &Dataset, + index_name: &str, + field_id: i32, + uuid: Uuid, + fragment_bitmap: impl IntoIterator, + payload: &[u8], + ) -> IndexMetadata { + let index_path = dataset + .indices_dir() + .child(uuid.to_string()) + .child(INDEX_FILE_NAME); + dataset + .object_store() + .put(&index_path, payload) + .await + .unwrap(); + IndexMetadata { + uuid, + name: index_name.to_string(), + fields: vec![field_id], + dataset_version: dataset.manifest.version, + fragment_bitmap: Some(fragment_bitmap.into_iter().collect()), + index_details: Some(Arc::new(vector_index_details())), + index_version: IndexType::Vector.version(), + created_at: Some(chrono::Utc::now()), + base_id: None, + files: Some(vec![lance_table::format::IndexFile { + path: INDEX_FILE_NAME.to_string(), + size_bytes: payload.len() as u64, + }]), + } + } + #[tokio::test] async fn test_recreate_index() { const DIM: i32 = 8; @@ -5260,36 +5311,25 @@ mod tests { .await .unwrap(); - let seg0 = IndexSegment::new( + let field_id = dataset.schema().field("vector").unwrap().id; + let seg0 = write_vector_segment_metadata( + &dataset, + "vector_idx", + field_id, Uuid::new_v4(), - std::iter::once(0_u32), - Arc::new(vector_index_details()), - IndexType::Vector.version(), - ); - let seg1 = IndexSegment::new( + [0_u32], + b"seg0", + ) + .await; + let seg1 = write_vector_segment_metadata( + &dataset, + "vector_idx", + field_id, Uuid::new_v4(), - std::iter::once(1_u32), - Arc::new(vector_index_details()), - IndexType::Vector.version(), - ); - let seg0_path = dataset - .indices_dir() - .child(seg0.uuid().to_string()) - .child(INDEX_FILE_NAME); - let seg1_path = dataset - .indices_dir() - .child(seg1.uuid().to_string()) - .child(INDEX_FILE_NAME); - dataset - .object_store() - .put(&seg0_path, b"seg0") - .await - .unwrap(); - dataset - .object_store() - .put(&seg1_path, b"seg1") - .await - .unwrap(); + [1_u32], + b"seg1", + ) + .await; dataset .commit_existing_index_segments( @@ -5305,7 +5345,7 @@ mod tests { let committed_uuids = committed.iter().map(|idx| idx.uuid).collect::>(); assert_eq!( committed_uuids, - HashSet::from([seg0.uuid(), seg1.uuid()]), + HashSet::from([seg0.uuid, seg1.uuid]), "all committed segment uuids should be preserved" ); assert_eq!( @@ -5346,12 +5386,16 @@ mod tests { let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); - let base = IndexSegment::new( + let field_id = dataset.schema().field("vector").unwrap().id; + let base = write_vector_segment_metadata( + &dataset, + "vector_idx", + field_id, Uuid::new_v4(), - std::iter::once(0_u32), - Arc::new(vector_index_details()), - IndexType::Vector.version(), - ); + [0_u32], + b"base", + ) + .await; let err = dataset .commit_existing_index_segments( @@ -5359,12 +5403,10 @@ mod tests { "vector", vec![ base.clone(), - IndexSegment::new( - base.uuid(), - std::iter::once(1_u32), - Arc::new(vector_index_details()), - IndexType::Vector.version(), - ), + IndexMetadata { + fragment_bitmap: Some(std::iter::once(1_u32).collect()), + ..base + }, ], ) .await @@ -5413,25 +5455,28 @@ mod tests { let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + let field_id = dataset.schema().field("vector").unwrap().id; + let seg0 = write_vector_segment_metadata( + &dataset, + "vector_idx", + field_id, + Uuid::new_v4(), + [0_u32, 1_u32], + b"seg0", + ) + .await; + let seg1 = write_vector_segment_metadata( + &dataset, + "vector_idx", + field_id, + Uuid::new_v4(), + [1_u32], + b"seg1", + ) + .await; + let err = dataset - .commit_existing_index_segments( - "vector_idx", - "vector", - vec![ - IndexSegment::new( - Uuid::new_v4(), - [0_u32, 1_u32], - Arc::new(vector_index_details()), - IndexType::Vector.version(), - ), - IndexSegment::new( - Uuid::new_v4(), - [1_u32], - Arc::new(vector_index_details()), - IndexType::Vector.version(), - ), - ], - ) + .commit_existing_index_segments("vector_idx", "vector", vec![seg0, seg1]) .await .unwrap_err(); assert!(err.to_string().contains("overlapping fragment coverage")); diff --git a/rust/lance/src/index/api.rs b/rust/lance/src/index/api.rs index f8e7ee7d012..b42c10ff1dc 100644 --- a/rust/lance/src/index/api.rs +++ b/rust/lance/src/index/api.rs @@ -7,133 +7,13 @@ use async_trait::async_trait; use datafusion::execution::SendableRecordBatchStream; use lance_index::{IndexParams, IndexType, optimize::OptimizeOptions}; use lance_table::format::IndexMetadata; -use roaring::RoaringBitmap; -use uuid::Uuid; use crate::{Error, Result}; -/// A single physical segment of a logical index. -/// -/// Each segment is stored independently and will become one manifest entry when committed. -/// The logical index identity (name / target column / dataset version) is provided separately -/// by the commit API. -#[derive(Debug, Clone, PartialEq)] -pub struct IndexSegment { - /// Unique ID of the physical segment. - uuid: Uuid, - /// The fragments covered by this segment. - fragment_bitmap: RoaringBitmap, - /// Metadata specific to the index type. - index_details: Arc, - /// The on-disk index version for this segment. - index_version: i32, -} - -impl IndexSegment { - /// Create a fully described segment with the given UUID, fragment coverage, and index - /// metadata. - pub fn new( - uuid: Uuid, - fragment_bitmap: I, - index_details: Arc, - index_version: i32, - ) -> Self - where - I: IntoIterator, - { - Self { - uuid, - fragment_bitmap: fragment_bitmap.into_iter().collect(), - index_details, - index_version, - } - } - - /// Return the UUID of this segment. - pub fn uuid(&self) -> Uuid { - self.uuid - } - - /// Return the fragment coverage of this segment. - pub fn fragment_bitmap(&self) -> &RoaringBitmap { - &self.fragment_bitmap - } - - /// Return the serialized index details for this segment. - pub fn index_details(&self) -> &Arc { - &self.index_details - } - - /// Return the on-disk index version for this segment. - pub fn index_version(&self) -> i32 { - self.index_version - } - - /// Consume the segment and return its component parts. - pub fn into_parts(self) -> (Uuid, RoaringBitmap, Arc, i32) { - ( - self.uuid, - self.fragment_bitmap, - self.index_details, - self.index_version, - ) - } -} - -/// A plan for building one physical segment from one or more existing -/// vector index segments. -#[derive(Debug, Clone, PartialEq)] -pub struct IndexSegmentPlan { - segment: IndexSegment, - segments: Vec, - estimated_bytes: u64, - requested_index_type: Option, -} - -impl IndexSegmentPlan { - /// Create a plan for one built segment. - pub fn new( - segment: IndexSegment, - segments: Vec, - estimated_bytes: u64, - requested_index_type: Option, - ) -> Self { - Self { - segment, - segments, - estimated_bytes, - requested_index_type, - } - } - - /// Return the segment metadata that should be committed after this plan is built. - pub fn segment(&self) -> &IndexSegment { - &self.segment - } - - /// Return the input segment metadata that should be combined into the segment. - pub fn segments(&self) -> &[IndexMetadata] { - &self.segments - } - - /// Return the estimated number of bytes covered by this plan. - pub fn estimated_bytes(&self) -> u64 { - self.estimated_bytes - } - - /// Return the requested logical index type, if one was supplied to the planner. - pub fn requested_index_type(&self) -> Option { - self.requested_index_type - } -} - /// Extends [`crate::Dataset`] with secondary index APIs. #[async_trait] pub trait DatasetIndexExt { type IndexBuilder<'a> - where - Self: 'a; - type IndexSegmentBuilder<'a> where Self: 'a; @@ -148,18 +28,6 @@ pub trait DatasetIndexExt { params: &'a dyn IndexParams, ) -> Self::IndexBuilder<'a>; - /// Create a builder for building physical index segments from uncommitted - /// vector index outputs. - /// - /// The caller supplies the uncommitted index metadata returned by - /// `execute_uncommitted()` so the builder can plan segment grouping without - /// rediscovering fragment coverage. - /// - /// This is the canonical entry point for distributed vector segment build. - /// After building the physical segments, publish them as a - /// logical index with [`Self::commit_existing_index_segments`]. - fn create_index_segment_builder<'a>(&'a self) -> Self::IndexSegmentBuilder<'a>; - /// Create indices on columns. /// /// Upon finish, a new dataset version is generated. @@ -251,12 +119,19 @@ pub trait DatasetIndexExt { /// Find an index with the given name and return its serialized statistics. async fn index_statistics(&self, index_name: &str) -> Result; + /// Merge one caller-defined group of existing uncommitted index segments into a + /// single segment. + async fn merge_existing_index_segments( + &self, + segments: Vec, + ) -> Result; + /// Commit one or more existing physical index segments as a logical index. async fn commit_existing_index_segments( &mut self, index_name: &str, column: &str, - segments: Vec, + segments: Vec, ) -> Result<()>; async fn read_index_partition( @@ -266,29 +141,3 @@ pub trait DatasetIndexExt { with_vector: bool, ) -> Result; } - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use super::{IndexSegment, IndexSegmentPlan}; - use lance_index::IndexType; - use uuid::Uuid; - - #[test] - fn test_index_segment_plan_accessors() { - let uuid = Uuid::new_v4(); - let segment = IndexSegment::new(uuid, [1_u32, 3], Arc::new(prost_types::Any::default()), 7); - let plan = IndexSegmentPlan::new(segment.clone(), vec![], 128, Some(IndexType::BTree)); - - assert_eq!(segment.uuid(), uuid); - assert_eq!( - segment.fragment_bitmap().iter().collect::>(), - vec![1, 3] - ); - assert_eq!(segment.index_version(), 7); - assert_eq!(plan.segment().uuid(), uuid); - assert_eq!(plan.estimated_bytes(), 128); - assert_eq!(plan.requested_index_type(), Some(IndexType::BTree)); - } -} diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 7403e2b24c9..058f4e3a69c 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -8,7 +8,7 @@ use crate::{ transaction::{Operation, TransactionBuilder}, }, index::{ - DatasetIndexExt, DatasetIndexInternalExt, build_index_metadata_from_segments, + DatasetIndexExt, DatasetIndexInternalExt, scalar::build_scalar_index, vector::{ LANCE_VECTOR_INDEX, VectorIndexParams, build_distributed_vector_index, @@ -17,7 +17,7 @@ use crate::{ vector_index_details, }, }; -use futures::future::{BoxFuture, try_join_all}; +use futures::future::BoxFuture; use lance_core::datatypes::format_field_path; use lance_index::progress::{IndexBuildProgress, NoopIndexBuildProgress}; use lance_index::{IndexParams, IndexType, scalar::CreatedIndex}; @@ -32,8 +32,6 @@ use uuid::Uuid; use arrow_array::RecordBatchReader; -use super::{IndexSegment, IndexSegmentPlan}; - /// Generate default index name from field path. /// /// Joins field names with `.` to create the base index name. @@ -476,42 +474,15 @@ impl<'a> CreateIndexBuilder<'a> { } else { vec![] }; - let transaction = if uses_segment_commit_path(self.index_type) { - let field_id = *new_idx.fields.first().ok_or_else(|| { - Error::internal(format!( - "Index '{}' is missing field ids after build", - new_idx.name - )) - })?; - let segments = self - .dataset - .create_index_segment_builder() - .with_segments(vec![new_idx.clone()]) - .build_all() - .await?; - let new_indices = - build_index_metadata_from_segments(self.dataset, &new_idx.name, field_id, segments) - .await?; - TransactionBuilder::new( - new_idx.dataset_version, - Operation::CreateIndex { - new_indices, - removed_indices, - }, - ) - .transaction_properties(self.transaction_properties.clone()) - .build() - } else { - TransactionBuilder::new( - new_idx.dataset_version, - Operation::CreateIndex { - new_indices: vec![new_idx], - removed_indices, - }, - ) - .transaction_properties(self.transaction_properties.clone()) - .build() - }; + let transaction = TransactionBuilder::new( + new_idx.dataset_version, + Operation::CreateIndex { + new_indices: vec![new_idx], + removed_indices, + }, + ) + .transaction_properties(self.transaction_properties.clone()) + .build(); self.dataset .apply_commit(transaction, &Default::default(), &Default::default()) @@ -533,20 +504,6 @@ impl<'a> CreateIndexBuilder<'a> { } } -fn uses_segment_commit_path(index_type: IndexType) -> bool { - matches!( - index_type, - IndexType::Vector - | IndexType::IvfPq - | IndexType::IvfSq - | IndexType::IvfFlat - | IndexType::IvfRq - | IndexType::IvfHnswFlat - | IndexType::IvfHnswPq - | IndexType::IvfHnswSq - ) -} - impl<'a> IntoFuture for CreateIndexBuilder<'a> { type Output = Result; type IntoFuture = BoxFuture<'a, Result>; @@ -556,82 +513,6 @@ impl<'a> IntoFuture for CreateIndexBuilder<'a> { } } -/// Build physical index segments from previously-written vector segment outputs. -/// -/// Use [`DatasetIndexExt::create_index_segment_builder`] and then either: -/// -/// - call [`Self::plan`] and orchestrate individual segment builds externally, or -/// - call [`Self::build_all`] to build all segments on the current node. -/// -/// This builder only builds physical segments. Publishing those segments as -/// a logical index still requires [`DatasetIndexExt::commit_existing_index_segments`]. -/// Together these two APIs form the canonical distributed vector segment build workflow. -#[derive(Clone)] -pub struct IndexSegmentBuilder<'a> { - dataset: &'a Dataset, - segments: Vec, - target_segment_bytes: Option, -} - -impl<'a> IndexSegmentBuilder<'a> { - pub(crate) fn new(dataset: &'a Dataset) -> Self { - Self { - dataset, - segments: Vec::new(), - target_segment_bytes: None, - } - } - - /// Provide the segment metadata returned by `execute_uncommitted()`. - /// - /// These segments must already exist in storage and must not have been - /// published into a logical index yet. - pub fn with_segments(mut self, segments: Vec) -> Self { - self.segments = segments; - self - } - - /// Set the target size, in bytes, for merged physical segments. - /// - /// When set, input segments will be grouped into larger physical segments - /// up to approximately this size. When unset, each input segment becomes - /// one physical segment. - pub fn with_target_segment_bytes(mut self, bytes: u64) -> Self { - self.target_segment_bytes = Some(bytes); - self - } - - /// Plan how input segments should be grouped into physical segments. - pub async fn plan(&self) -> Result> { - if self.segments.is_empty() { - return Err(Error::invalid_input( - "IndexSegmentBuilder requires at least one segment; \ - call with_segments(...) with execute_uncommitted() outputs" - .to_string(), - )); - } - - crate::index::vector::ivf::plan_segments(&self.segments, None, self.target_segment_bytes) - .await - } - - /// Build one segment from a previously-generated plan. - pub async fn build(&self, plan: &IndexSegmentPlan) -> Result { - crate::index::vector::ivf::build_segment( - self.dataset.object_store(), - &self.dataset.indices_dir(), - plan, - ) - .await - } - - /// Plan and build all segments from the provided inputs. - pub async fn build_all(&self) -> Result> { - let plans = self.plan().await?; - try_join_all(plans.iter().map(|plan| self.build(plan))).await - } -} - #[cfg(test)] mod tests { use super::*; @@ -1152,27 +1033,8 @@ mod tests { input_segments.push(segment); } - let segments = dataset - .create_index_segment_builder() - .with_segments(input_segments.clone()) - .build_all() - .await - .unwrap(); - assert_eq!(segments.len(), fragments.len()); - let mut built_segment_ids = segments - .iter() - .map(|segment| segment.uuid()) - .collect::>(); - built_segment_ids.sort(); - let mut input_segment_ids = input_segments - .iter() - .map(|segment| segment.uuid) - .collect::>(); - input_segment_ids.sort(); - assert_eq!(built_segment_ids, input_segment_ids); - dataset - .commit_existing_index_segments("vector_idx", "vector", segments) + .commit_existing_index_segments("vector_idx", "vector", input_segments) .await .unwrap(); @@ -1202,7 +1064,7 @@ mod tests { } #[tokio::test] - async fn test_index_segment_builder_vector_commits_multi_segment_logical_index() { + async fn test_merge_existing_index_segments_vector_commits_single_logical_index() { let tmpdir = TempStrDir::default(); let dataset_uri = format!("file://{}", tmpdir.as_str()); @@ -1247,21 +1109,18 @@ mod tests { input_segments.push(segment); } - let segments = dataset - .create_index_segment_builder() - .with_segments(input_segments) - .build_all() + let segment = dataset + .merge_existing_index_segments(input_segments) .await .unwrap(); - assert_eq!(segments.len(), 2); dataset - .commit_existing_index_segments("vector_idx", "vector", segments) + .commit_existing_index_segments("vector_idx", "vector", vec![segment]) .await .unwrap(); let indices = dataset.load_indices_by_name("vector_idx").await.unwrap(); - assert_eq!(indices.len(), 2); + assert_eq!(indices.len(), 1); let mut committed_fragment_sets = indices .iter() .map(|metadata| { @@ -1274,7 +1133,7 @@ mod tests { }) .collect::>(); committed_fragment_sets.sort(); - assert_eq!(committed_fragment_sets, vec![vec![0], vec![1]]); + assert_eq!(committed_fragment_sets, vec![vec![0, 1]]); let query_batch = dataset .scan() @@ -1332,24 +1191,17 @@ mod tests { HnswBuildParams::default(), ); - CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) - .name("vector_idx".to_string()) - .index_uuid(uuid.to_string()) - .execute_uncommitted() - .await - .unwrap(); + let segment = + CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) + .name("vector_idx".to_string()) + .index_uuid(uuid.to_string()) + .execute_uncommitted() + .await + .unwrap(); + assert_eq!(segment.uuid, uuid); dataset - .commit_existing_index_segments( - "vector_idx", - "vector", - vec![IndexSegment::new( - uuid, - dataset.fragment_bitmap.as_ref().clone(), - Arc::new(vector_index_details()), - IndexType::IvfHnswFlat.version(), - )], - ) + .commit_existing_index_segments("vector_idx", "vector", vec![segment]) .await .unwrap(); diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index d58cbae2fcd..d6a4b7b554f 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -89,21 +89,18 @@ use lance_io::{ }; use lance_linalg::distance::{DistanceType, Dot, L2, MetricType}; use lance_linalg::{distance::Normalize, kernels::normalize_fsl_owned}; -use lance_table::format::IndexMetadata as TableIndexMetadata; +use lance_table::format::{IndexMetadata as TableIndexMetadata, list_index_files_with_sizes}; use log::{info, warn}; use object_store::path::Path; use prost::Message; use roaring::RoaringBitmap; use serde::Serialize; use serde_json::json; -use std::collections::HashSet; use std::{any::Any, collections::HashMap, sync::Arc}; use tokio::sync::mpsc; use tracing::instrument; use uuid::Uuid; -use crate::index::{IndexSegment, IndexSegmentPlan}; - pub mod builder; pub mod io; pub mod v2; @@ -1871,181 +1868,66 @@ async fn write_ivf_hnsw_file( Ok(()) } -/// Distributed vector segment build uses three storage-level concepts: -/// -/// - A **segment** is a worker output written by `execute_uncommitted()`. It -/// already lives at its final storage path under `indices//`, -/// but it is not yet published in the manifest. -/// - A **physical segment** is an `IndexSegment` that can be committed into the -/// manifest with `commit_existing_index_segments(...)`. -/// - A **logical index** is the user-visible index identified by name; it may -/// contain one or more physical segments. -/// -/// The segment-build path is therefore: -/// -/// 1. workers build segments -/// 2. the caller groups those segments into one or more physical segments -/// 3. each grouped segment is built from its selected inputs -/// 4. the resulting physical segments are committed as one logical index -/// -/// Each plan says: -/// - which source segments should be consumed together -/// - what the physical segment metadata should look like -/// -/// The planner returns a `Vec` so callers can decide whether -/// to execute the work serially or fan it out externally. -/// -/// This function does not touch storage. It only: -/// - validates that the caller-supplied segment contract is self-consistent -/// - enforces that source fragment coverage is disjoint -/// - groups source segments into physical segments according to -/// `target_segment_bytes` -/// -/// The grouping rule is intentionally simple: -/// - `target_segment_bytes = None`: keep the existing segment boundary, so each -/// input segment becomes one physical segment -/// - `target_segment_bytes = Some(limit)`: greedily pack consecutive source -/// segments until the next source would exceed `limit` -pub(crate) async fn plan_segments( - segments: &[TableIndexMetadata], - requested_index_type: Option, - target_segment_bytes: Option, -) -> Result> { - if let Some(index_type) = requested_index_type - && !matches!( - index_type, - IndexType::IvfFlat - | IndexType::IvfPq - | IndexType::IvfSq - | IndexType::IvfRq - | IndexType::IvfHnswFlat - | IndexType::IvfHnswPq - | IndexType::IvfHnswSq - | IndexType::Vector - ) - { - return Err(Error::invalid_input(format!( - "Unsupported distributed vector segment build type: {}", - index_type - ))); - } - - if let Some(0) = target_segment_bytes { - return Err(Error::invalid_input( - "target_segment_bytes must be greater than zero".to_string(), - )); - } - +/// Merge one caller-defined group of source segments into a single segment. +pub(crate) async fn merge_segments( + object_store: &ObjectStore, + indices_dir: &Path, + segments: Vec, +) -> Result { if segments.is_empty() { return Err(Error::index("No segment metadata was provided".to_string())); } - - let mut sorted_segments = segments.to_vec(); - sorted_segments.sort_by_key(|index| index.uuid); - let mut expected_segment_ids = HashSet::with_capacity(sorted_segments.len()); - for segment in &sorted_segments { - if !expected_segment_ids.insert(segment.uuid) { - return Err(Error::index(format!( - "Distributed vector segment '{}' was provided more than once", - segment.uuid - ))); - } + if segments.len() == 1 { + return Ok(segments.into_iter().next().unwrap()); } - let mut covered_fragments = RoaringBitmap::new(); - for segment in &sorted_segments { - let fragment_bitmap = segment.fragment_bitmap.as_ref().ok_or_else(|| { + let mut merged_segment = segments[0].clone(); + let mut fragment_bitmap = RoaringBitmap::new(); + for segment in &segments { + let source_fragment_bitmap = segment.fragment_bitmap.as_ref().ok_or_else(|| { Error::index(format!( "Segment '{}' is missing fragment coverage", segment.uuid )) })?; - if covered_fragments.intersection_len(fragment_bitmap) > 0 { - return Err(Error::index( - "Distributed vector shards have overlapping fragment coverage".to_string(), - )); - } - covered_fragments |= fragment_bitmap.clone(); - } - - if target_segment_bytes.is_none() { - return sorted_segments - .into_iter() - .map(|segment| build_segment_plan(vec![segment], requested_index_type)) - .collect(); - } - - let target_segment_bytes = target_segment_bytes.unwrap(); - let mut plans = Vec::new(); - let mut current_group = Vec::new(); - let mut current_bytes = 0_u64; - - for segment in sorted_segments { - let source_bytes = estimate_source_index_bytes(&segment); - if !current_group.is_empty() - && current_bytes.saturating_add(source_bytes) > target_segment_bytes - { - plans.push(build_segment_plan( - std::mem::take(&mut current_group), - requested_index_type, - )?); - current_bytes = 0; - } - current_bytes = current_bytes.saturating_add(source_bytes); - current_group.push(segment); - } - - if !current_group.is_empty() { - plans.push(build_segment_plan(current_group, requested_index_type)?); - } - - Ok(plans) -} - -/// Build one planned segment into its output directory. -/// -/// Single-source plans are already materialized and return immediately. For -/// multi-source plans, this function writes a new merged physical segment under -/// `indices//`. -pub(crate) async fn build_segment( - object_store: &ObjectStore, - indices_dir: &Path, - segment_plan: &IndexSegmentPlan, -) -> Result { - let built_segment = segment_plan.segment().clone(); - let segments = segment_plan.segments(); - debug_assert!( - !segments.is_empty(), - "segment plans must have at least one source segment" - ); - - if segments.len() == 1 && segments[0].uuid == built_segment.uuid() { - return Ok(built_segment); + fragment_bitmap |= source_fragment_bitmap.clone(); } - let final_dir = indices_dir.child(built_segment.uuid().to_string()); - merge_segments_to_dir(object_store, indices_dir, &final_dir, segment_plan).await?; + let index_version = infer_source_index_version(&segments)?; + let segment_uuid = Uuid::new_v4(); + let final_dir = indices_dir.child(segment_uuid.to_string()); + merge_segments_to_dir(object_store, indices_dir, &final_dir, &segments).await?; + let files = list_index_files_with_sizes(object_store, &final_dir).await?; - Ok(built_segment) + merged_segment = TableIndexMetadata { + uuid: segment_uuid, + fragment_bitmap: Some(fragment_bitmap), + index_details: Some(Arc::new(crate::index::vector_index_details())), + index_version, + created_at: Some(chrono::Utc::now()), + base_id: None, + files: Some(files), + ..merged_segment + }; + Ok(merged_segment) } /// Merge the selected input segments into `final_dir`. /// -/// Callers must only invoke this helper for multi-source plans. It reads the -/// selected input segments directly from `indices//` and writes -/// the merged auxiliary/index files into `final_dir`. +/// The caller defines the source segment group explicitly. This helper reads +/// those input segments directly from `indices//` and writes the +/// merged auxiliary/index files into `final_dir`. async fn merge_segments_to_dir( object_store: &ObjectStore, indices_dir: &Path, final_dir: &Path, - segment_plan: &IndexSegmentPlan, + segments: &[TableIndexMetadata], ) -> Result<()> { reset_final_segment_dir(object_store, final_dir).await?; - let segments = segment_plan.segments(); debug_assert!( segments.len() > 1, - "merge helper should only be used for multi-source plans" + "merge helper should only be used for multi-source groups" ); let aux_paths = segments @@ -2071,64 +1953,12 @@ async fn merge_segments_to_dir( final_dir, ) .await?; - write_root_vector_index_from_auxiliary( - object_store, - final_dir, - segment_plan.requested_index_type(), - &source_index_paths, - ) - .await?; + write_root_vector_index_from_auxiliary(object_store, final_dir, None, &source_index_paths) + .await?; Ok(()) } -/// Collapse one group of source segments into a single physical-segment plan. -fn build_segment_plan( - group: Vec, - requested_index_type: Option, -) -> Result { - debug_assert!(!group.is_empty()); - let first = &group[0]; - let mut fragment_bitmap = RoaringBitmap::new(); - let mut estimated_bytes = 0_u64; - let mut segments = Vec::with_capacity(group.len()); - - for segment in &group { - let source_fragment_bitmap = segment.fragment_bitmap.as_ref().ok_or_else(|| { - Error::index(format!( - "Segment '{}' is missing fragment coverage", - segment.uuid - )) - })?; - fragment_bitmap |= source_fragment_bitmap.clone(); - estimated_bytes = estimated_bytes.saturating_add(estimate_source_index_bytes(segment)); - segments.push(segment.clone()); - } - - let segment_uuid = if group.len() == 1 { - first.uuid - } else { - Uuid::new_v4() - }; - let index_version = match requested_index_type { - Some(index_type) => index_type.version(), - None => infer_source_index_version(&group)?, - }; - let segment = IndexSegment::new( - segment_uuid, - fragment_bitmap, - Arc::new(crate::index::vector_index_details()), - index_version, - ); - - Ok(IndexSegmentPlan::new( - segment, - segments, - estimated_bytes, - requested_index_type, - )) -} - fn infer_source_index_version(group: &[TableIndexMetadata]) -> Result { debug_assert!(!group.is_empty()); let first = group[0].index_version; @@ -2140,14 +1970,6 @@ fn infer_source_index_version(group: &[TableIndexMetadata]) -> Result { Ok(first) } -fn estimate_source_index_bytes(index_metadata: &TableIndexMetadata) -> u64 { - index_metadata - .files - .as_ref() - .map(|files| files.iter().map(|file| file.size_bytes).sum()) - .unwrap_or(0) -} - /// Best-effort reset of one target directory before rewriting it. async fn reset_final_segment_dir(object_store: &ObjectStore, final_dir: &Path) -> Result<()> { match object_store.remove_dir_all(final_dir.clone()).await { diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 01577b11936..59e98de75b1 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -628,9 +628,7 @@ mod tests { use crate::dataset::{InsertBuilder, UpdateBuilder, WriteMode, WriteParams}; use crate::index::DatasetIndexExt; use crate::index::DatasetIndexInternalExt; - use crate::index::IndexSegment; use crate::index::vector::ivf::v2::IvfPq; - use crate::index::vector::ivf::{build_segment, plan_segments}; use crate::utils::test::copy_test_data_to_tmp; use crate::{ Dataset, @@ -1508,8 +1506,8 @@ mod tests { let segments = build_segments_for_fragment_groups(dataset, fragment_groups, ¶ms, index_name).await; - let built_segments = build_distributed_segments(dataset, &segments, None, index_name).await; - assert!(!built_segments.is_empty()); + let committed_segments = build_distributed_segments(dataset, segments, index_name).await; + assert!(!committed_segments.is_empty()); } fn assert_centroids_equal(reference: &serde_json::Value, candidate: &serde_json::Value) { @@ -1585,32 +1583,17 @@ mod tests { assert_eq!(sizes_a, sizes_b, "aggregated partition sizes mismatch"); } - /// Execute the internal segment workflow used by the - /// regression tests: plan segment groups from caller-provided segment - /// metadata, build each segment, and publish them as one logical index. + /// Commit caller-defined segment groups as one logical index. async fn build_distributed_segments( dataset: &mut Dataset, - segments: &[IndexMetadata], - target_segment_bytes: Option, + segments: Vec, index_name: &str, - ) -> Vec { - let segment_plans = plan_segments(segments, None, target_segment_bytes) - .await - .unwrap(); - let mut built_segments = Vec::with_capacity(segment_plans.len()); - for plan in &segment_plans { - built_segments.push( - build_segment(dataset.object_store(), &dataset.indices_dir(), plan) - .await - .unwrap(), - ); - } + ) -> Vec { dataset - .commit_existing_index_segments(index_name, "vector", built_segments.clone()) + .commit_existing_index_segments(index_name, "vector", segments.clone()) .await .unwrap(); - - built_segments + segments } #[tokio::test] @@ -1812,12 +1795,12 @@ mod tests { INDEX_NAME, ) .await; - let segments = build_distributed_segments(&mut ds_split, &segments, None, INDEX_NAME).await; + let segments = build_distributed_segments(&mut ds_split, segments, INDEX_NAME).await; assert_eq!(segments.len(), expected_segment_count); for segment in &segments { let segment_index = ds_split .indices_dir() - .child(segment.uuid().to_string()) + .child(segment.uuid.to_string()) .child(crate::index::INDEX_FILE_NAME); assert!( ds_split @@ -1957,20 +1940,15 @@ mod tests { ) .await; - let shard_plan = plan_segments(&segments, None, None).await.unwrap(); - let shard_count = shard_plan.len(); - assert!(shard_count >= 4); - let target_segment_bytes = shard_plan[0].estimated_bytes().saturating_mul(2); - - let grouped_plan = plan_segments(&segments, None, Some(target_segment_bytes)) - .await - .unwrap(); - assert!(grouped_plan.len() < shard_count); - assert!(grouped_plan.iter().any(|plan| plan.segments().len() > 1)); - let mut expected_fragment_coverage = grouped_plan + assert!(segments.len() >= 4); + let grouped_inputs = segments + .chunks(2) + .map(|group| group.to_vec()) + .collect::>(); + let mut expected_fragment_coverage = grouped_inputs .iter() - .map(|plan| { - plan.segments() + .map(|group| { + group .iter() .flat_map(|partial| { partial @@ -1985,17 +1963,26 @@ mod tests { .collect::>(); expected_fragment_coverage.sort(); - let grouped_segments = build_distributed_segments( - &mut ds_split, - &segments, - Some(target_segment_bytes), - INDEX_NAME, + let grouped_segments = futures::future::try_join_all( + grouped_inputs + .into_iter() + .map(|group| ds_split.merge_existing_index_segments(group)), ) - .await; - assert_eq!(grouped_segments.len(), grouped_plan.len()); + .await + .unwrap(); + let grouped_segments = + build_distributed_segments(&mut ds_split, grouped_segments, INDEX_NAME).await; + assert_eq!(grouped_segments.len(), expected_fragment_coverage.len()); let mut actual_fragment_coverage = grouped_segments .iter() - .map(|segment| segment.fragment_bitmap().iter().collect::>()) + .map(|segment| { + segment + .fragment_bitmap + .as_ref() + .unwrap() + .iter() + .collect::>() + }) .collect::>(); actual_fragment_coverage.sort(); assert_eq!( @@ -2089,7 +2076,10 @@ mod tests { segments.push(segment); } - let err = plan_segments(&segments, None, None).await.unwrap_err(); + let err = dataset + .merge_existing_index_segments(segments) + .await + .unwrap_err(); assert!(err.to_string().contains("overlapping fragment coverage")); } @@ -2121,19 +2111,6 @@ mod tests { segments.push(segment); } - let plans = plan_segments(&segments, None, Some(1)).await.unwrap(); - assert_eq!(plans.len(), fragments.iter().take(2).count()); - - let mut segments = Vec::with_capacity(plans.len()); - for plan in &plans { - segments.push( - build_segment(dataset.object_store(), &dataset.indices_dir(), plan) - .await - .unwrap(), - ); - } - assert_eq!(segments.len(), plans.len()); - dataset .commit_existing_index_segments("vector_idx", "vector", segments) .await From 8d98aa1e779434b6f89f36c8c112d96ed30c22e0 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Sun, 29 Mar 2026 16:09:05 +0800 Subject: [PATCH 2/5] fix: accept python vector metadata in segment merge --- rust/lance/src/index.rs | 13 +++++--- rust/lance/src/index/create.rs | 59 ++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 5d8d42b3555..f31dca361d6 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -833,10 +833,15 @@ impl DatasetIndexExt for Dataset { )); } if !source_segments.iter().all(|segment| { - segment - .index_details - .as_ref() - .is_some_and(|details| type_name_from_uri(&details.type_url) == "Vector") + segment.index_details.as_ref().map_or_else( + || { + segment + .files + .as_ref() + .is_some_and(|files| files.iter().any(|file| file.path == INDEX_FILE_NAME)) + }, + |details| details.type_url.ends_with("VectorIndexDetails"), + ) }) { return Err(Error::invalid_input( "merge_existing_index_segments currently only supports vector segments".to_string(), diff --git a/rust/lance/src/index/create.rs b/rust/lance/src/index/create.rs index 058f4e3a69c..73407ef8d69 100644 --- a/rust/lance/src/index/create.rs +++ b/rust/lance/src/index/create.rs @@ -1157,6 +1157,65 @@ mod tests { assert!(result.num_rows() > 0); } + #[tokio::test] + async fn test_merge_existing_index_segments_accepts_python_round_tripped_metadata() { + let tmpdir = TempStrDir::default(); + let dataset_uri = format!("file://{}", tmpdir.as_str()); + + let reader = gen_batch() + .col("id", lance_datagen::array::step::()) + .col( + "vector", + lance_datagen::array::rand_vec::(lance_datagen::Dimension::from(16)), + ) + .into_reader_rows( + lance_datagen::RowCount::from(256), + lance_datagen::BatchCount::from(4), + ); + let mut dataset = Dataset::write( + reader, + &dataset_uri, + Some(WriteParams { + max_rows_per_file: 64, + mode: WriteMode::Overwrite, + ..Default::default() + }), + ) + .await + .unwrap(); + + let fragments = dataset.get_fragments(); + assert!(fragments.len() >= 2); + let params = VectorIndexParams::with_ivf_flat_params( + DistanceType::L2, + prepare_vector_ivf(&dataset, "vector").await, + ); + let mut input_segments = Vec::new(); + + for fragment in fragments.iter().take(2) { + let mut segment = + CreateIndexBuilder::new(&mut dataset, &["vector"], IndexType::Vector, ¶ms) + .name("vector_idx".to_string()) + .fragments(vec![fragment.id() as u32]) + .execute_uncommitted() + .await + .unwrap(); + segment.index_details = None; + input_segments.push(segment); + } + + let merged_segment = dataset + .merge_existing_index_segments(input_segments) + .await + .unwrap(); + assert!( + merged_segment + .fragment_bitmap + .as_ref() + .is_some_and(|bitmap| bitmap.iter().collect::>() == vec![0, 1]) + ); + } + #[tokio::test] async fn test_commit_existing_index_supports_local_hnsw_segments() { let tmpdir = TempStrDir::default(); From d8df38297b5c87ce873922986f155e3c810fa2f5 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 30 Mar 2026 15:33:00 +0800 Subject: [PATCH 3/5] test: fix python merged segment assertion --- python/python/tests/test_vector_index.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index 3da73269220..70d3c6e8b81 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -2741,8 +2741,8 @@ def test_merge_existing_index_segments_builds_vector_segment(tmp_path): ] merged_segment = ds.merge_existing_index_segments(segments) - assert merged_segment.fragment_bitmap is not None - assert sorted(merged_segment.fragment_bitmap) == sorted( + assert merged_segment.fragment_ids is not None + assert sorted(merged_segment.fragment_ids) == sorted( [fragment.fragment_id for fragment in frags[:2]] ) ds = ds.commit_existing_index_segments("vector_idx", "vector", [merged_segment]) From e099b24a1d56c41020daf5715c37119c653dc4cc Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 30 Mar 2026 15:42:31 +0800 Subject: [PATCH 4/5] style: apply CI rustfmt output --- java/lance-jni/src/blocking_dataset.rs | 6 +----- python/src/indices.rs | 12 ++++-------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index 43edf5515ac..eac5f195827 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -1095,11 +1095,7 @@ fn inner_merge_existing_index_segments<'local>( let merged_segment = { let dataset_guard = unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?; - RT.block_on( - dataset_guard - .inner - .merge_existing_index_segments(segments), - )? + RT.block_on(dataset_guard.inner.merge_existing_index_segments(segments))? }; (&merged_segment).into_java(env) } diff --git a/python/src/indices.rs b/python/src/indices.rs index 208a4b67f87..62f3c0c64ec 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -10,9 +10,9 @@ use arrow_array::{Array, FixedSizeListArray}; use arrow_data::ArrayData; use chrono::{DateTime, Utc}; use lance::dataset::Dataset as LanceDataset; +use lance::index::DatasetIndexExt; use lance::index::vector::ivf::builder::write_vector_storage; use lance::index::vector::pq::build_pq_model_in_fragments; -use lance::index::DatasetIndexExt; use lance::io::ObjectStore; use lance_index::progress::NoopIndexBuildProgress; use lance_index::vector::ivf::shuffler::{IvfShuffler, shuffle_vectors}; @@ -449,13 +449,9 @@ async fn do_load_shuffled_vectors( base_id: None, files: Some(files), }; - ds.commit_existing_index_segments( - index_name, - column, - vec![metadata], - ) - .await - .infer_error()?; + ds.commit_existing_index_segments(index_name, column, vec![metadata]) + .await + .infer_error()?; Ok(()) } From 3fe0f47641faf021770af1592654d2ad7b17dac9 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 31 Mar 2026 16:24:53 +0800 Subject: [PATCH 5/5] style: restore python test spacing --- python/python/tests/test_vector_index.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index 1f87fc89cfc..70d3c6e8b81 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -2751,6 +2751,7 @@ def test_merge_existing_index_segments_builds_vector_segment(tmp_path): results = ds.to_table(nearest={"column": "vector", "q": q, "k": 5}) assert 0 < len(results) <= 5 + def test_distributed_ivf_pq_order_invariance(tmp_path: Path): """Ensure distributed IVF_PQ build is invariant to shard build order.""" ds = _make_sample_dataset_base(tmp_path, "dist_ds", 2000, 128) @@ -2846,6 +2847,8 @@ def collect_ids_and_distances(ds_with_index): assert ids_12 == ids_21 for a, b in zip(dists_12, dists_21): assert np.allclose(a, b, atol=1e-6) + + def test_fts_filter_vector_search(tmp_path): # Create dataset with vector and text columns ids = list(range(1, 301))