Commit ee98751

[index] Add global index fields to the table_indexes system table (#7623)
1 parent 369b8d9 commit ee98751

2 files changed

Lines changed: 78 additions & 6 deletions
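This commit adds four nullable columns to the table_indexes system table (row_range_start, row_range_end, index_field_id, index_field_name), populated from the GlobalIndexMeta attached to each global index file. A minimal Spark sketch of querying the new columns, assuming a table T on which a global index has already been created, as in the test further down:

    // Sketch only: assumes a Paimon table `T` with a global index already built.
    // The selected column names match the new system table schema.
    spark.sql(
      """SELECT index_type, index_field_id, index_field_name,
        |       row_range_start, row_range_end
        |FROM `T$table_indexes`
        |""".stripMargin).show()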

paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java

Lines changed: 28 additions & 6 deletions
@@ -27,6 +27,7 @@
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.GlobalIndexMeta;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMetaSerializer;
 import org.apache.paimon.manifest.IndexManifestEntry;
@@ -84,9 +85,11 @@ public class TableIndexesTable implements ReadonlyTable {
                             new DataField(4, "file_size", new BigIntType(false)),
                             new DataField(5, "row_count", new BigIntType(false)),
                             new DataField(
-                                    6,
-                                    "dv_ranges",
-                                    new ArrayType(true, DeletionVectorMeta.SCHEMA))));
+                                    6, "dv_ranges", new ArrayType(true, DeletionVectorMeta.SCHEMA)),
+                            new DataField(7, "row_range_start", new BigIntType(true)),
+                            new DataField(8, "row_range_end", new BigIntType(true)),
+                            new DataField(9, "index_field_id", new IntType(true)),
+                            new DataField(10, "index_field_name", newStringType(true))));

     private final FileStoreTable dataTable;

@@ -201,10 +204,16 @@ public RecordReader<InternalRow> createReader(Split split) {
                             CastExecutors.resolveToString(
                                     dataTable.schema().logicalPartitionType());

+        RowType logicalRowType = dataTable.schema().logicalRowType();
+
         Iterator<InternalRow> rows =
                 Iterators.transform(
                         manifestFileMetas.iterator(),
-                        indexManifestEntry -> toRow(indexManifestEntry, partitionCastExecutor));
+                        indexManifestEntry ->
+                                toRow(
+                                        indexManifestEntry,
+                                        partitionCastExecutor,
+                                        logicalRowType));
         if (readType != null) {
             rows =
                     Iterators.transform(
@@ -218,9 +227,18 @@ public RecordReader<InternalRow> createReader(Split split) {

     private InternalRow toRow(
             IndexManifestEntry indexManifestEntry,
-            CastExecutor<InternalRow, BinaryString> partitionCastExecutor) {
+            CastExecutor<InternalRow, BinaryString> partitionCastExecutor,
+            RowType logicalRowType) {
         LinkedHashMap<String, DeletionVectorMeta> dvMetas =
                 indexManifestEntry.indexFile().dvRanges();
+        GlobalIndexMeta globalMeta = indexManifestEntry.indexFile().globalIndexMeta();
+        String indexFieldName = null;
+        if (globalMeta != null) {
+            try {
+                indexFieldName = logicalRowType.getField(globalMeta.indexFieldId()).name();
+            } catch (RuntimeException ignored) {
+            }
+        }
         return GenericRow.of(
                 partitionCastExecutor.cast(indexManifestEntry.partition()),
                 indexManifestEntry.bucket(),
@@ -230,7 +248,11 @@ private InternalRow toRow(
                 indexManifestEntry.indexFile().rowCount(),
                 dvMetas == null
                         ? null
-                        : IndexFileMetaSerializer.dvMetasToRowArrayData(dvMetas.values()));
+                        : IndexFileMetaSerializer.dvMetasToRowArrayData(dvMetas.values()),
+                globalMeta != null ? globalMeta.rowRangeStart() : null,
+                globalMeta != null ? globalMeta.rowRangeEnd() : null,
+                globalMeta != null ? globalMeta.indexFieldId() : null,
+                indexFieldName != null ? BinaryString.fromString(indexFieldName) : null);
     }
 }
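The four new fields are nullable by design: only global index files carry a GlobalIndexMeta, so deletion-vector and other local index entries report null for all four columns, and the field-name lookup is guarded so that a field id no longer present in the schema degrades to a null index_field_name rather than failing the scan. A hedged sketch of what this means for a reader of the system table (table name T is illustrative):

    // Sketch: deletion-vector entries populate dv_ranges and leave the
    // global-index columns null; global-index entries do the opposite.
    spark.sql(
      """SELECT index_type,
        |       dv_ranges IS NOT NULL AS has_dv_ranges,
        |       index_field_name IS NOT NULL AS has_global_meta
        |FROM `T$table_indexes`
        |""".stripMargin).show()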

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala

Lines changed: 50 additions & 0 deletions
@@ -68,6 +68,56 @@ class LuminaVectorIndexTest extends PaimonSparkTestBase {
     }
   }

+  test("table_indexes system table - global index metadata") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      val values = (0 until 100)
+        .map(
+          i => s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float)))")
+        .mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      spark
+        .sql(
+          s"CALL sys.create_global_index(table => 'test.T', index_column => 'v', index_type => '$indexType', options => '$defaultOptions')")
+        .collect()
+
+      // Query table_indexes system table
+      val indexRows = spark
+        .sql("""
+               |SELECT index_type, row_count, row_range_start, row_range_end,
+               |       index_field_id, index_field_name
+               |FROM `T$table_indexes`
+               |WHERE index_type = 'lumina-vector-ann'
+               |""".stripMargin)
+        .collect()
+
+      assert(indexRows.nonEmpty)
+      val row = indexRows.head
+      assert(row.getAs[String]("index_type") == "lumina-vector-ann")
+      assert(row.getAs[Long]("row_count") == 100L)
+      assert(row.getAs[Long]("row_range_start") == 0L)
+      assert(row.getAs[Long]("row_range_end") == 99L)
+      assert(row.getAs[String]("index_field_name") == "v")
+
+      // Verify max row id matches snapshot next_row_id - 1
+      val nextRowId = spark
+        .sql("SELECT next_row_id FROM `T$snapshots` ORDER BY snapshot_id DESC LIMIT 1")
+        .collect()
+        .head
+        .getAs[Long]("next_row_id")
+      assert(row.getAs[Long]("row_range_end") == nextRowId - 1)
+    }
+  }
+
   test("create lumina vector index - with partitioned table") {
     withTable("T") {
       spark.sql("""
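The last assertion in the new test leans on the row-tracking invariant: row ids are assigned densely starting from 0, so after inserting 100 rows the snapshot's next_row_id is 100 and a full-table global index spans rows 0 through 99, i.e. row_range_end == next_row_id - 1. A sketch of checking that invariant outside the test, assuming row tracking is enabled on table T as above:

    // Sketch: the widest global-index row range should end exactly one
    // before the latest snapshot's next_row_id.
    val maxRangeEnd = spark
      .sql("SELECT max(row_range_end) AS max_end FROM `T$table_indexes`")
      .collect().head.getAs[Long]("max_end")
    val nextRowId = spark
      .sql("SELECT next_row_id FROM `T$snapshots` ORDER BY snapshot_id DESC LIMIT 1")
      .collect().head.getAs[Long]("next_row_id")
    assert(maxRangeEnd == nextRowId - 1)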
