
Commit 264a955

[spark] Fix column projection on log/upsert read
1 parent 17f5400 commit 264a955

4 files changed: 108 additions & 7 deletions

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala

Lines changed: 10 additions & 0 deletions

@@ -20,6 +20,10 @@ package org.apache.fluss.spark.read
 import org.apache.fluss.client.table.scanner.ScanRecord
 import org.apache.fluss.config.Configuration
 import org.apache.fluss.metadata.{TableBucket, TablePath}
+import org.apache.fluss.row.{InternalRow => FlussInternalRow}
+import org.apache.fluss.spark.row.DataConverter
+
+import org.apache.spark.sql.catalyst.InternalRow

 /** Partition reader that reads log data from a single Fluss table bucket. */
 class FlussAppendPartitionReader(
@@ -29,6 +33,12 @@ class FlussAppendPartitionReader(
     flussConfig: Configuration)
   extends FlussPartitionReader(tablePath, flussConfig) {

+  private lazy val projectedRowType = rowType.project(projection)
+
+  override protected def convertToSparkRow(flussRow: FlussInternalRow): InternalRow = {
+    DataConverter.toSparkInternalRow(flussRow, projectedRowType)
+  }
+
   private val tableBucket: TableBucket = flussPartition.tableBucket
   private val partitionId = tableBucket.getPartitionId
   private val bucketId = tableBucket.getBucket
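
A note on the convertToSparkRow override: the log scanner already returns rows narrowed to the projected columns, but the conversion previously interpreted those rows against the table's full schema. Positional getters are type-dependent, so once a projection reorders columns such as timestamps, arrays, or structs, the wrong getter is applied at each ordinal. A minimal, self-contained sketch of the failure mode — the names (FieldType, RowType, convert) are hypothetical stand-ins, not Fluss API:

object ProjectionSketch {
  sealed trait FieldType
  case object IntType extends FieldType
  case object StringType extends FieldType
  case object TimestampType extends FieldType

  // Stand-in for a schema whose project() keeps the selected ordinals in order.
  final case class RowType(fields: Vector[FieldType]) {
    def project(ordinals: Array[Int]): RowType = RowType(ordinals.toVector.map(fields))
  }

  // Stand-in converter: picks a type-specific accessor per ordinal from the row type.
  def convert(row: Vector[Any], rowType: RowType): Vector[Any] =
    rowType.fields.zipWithIndex.map {
      case (IntType, i) => row(i).asInstanceOf[Int]
      case (StringType, i) => row(i).asInstanceOf[String]
      case (TimestampType, i) => row(i).asInstanceOf[java.sql.Timestamp]
    }

  def main(args: Array[String]): Unit = {
    val fullSchema = RowType(Vector(IntType, TimestampType, StringType))
    val projection = Array(2, 1) // SELECT name, ts
    // The scanner already returns the projected layout: (name, ts).
    val projectedRow: Vector[Any] =
      Vector("a", java.sql.Timestamp.valueOf("2026-01-01 12:00:00"))
    // Correct: getters follow the projected row type.
    println(convert(projectedRow, fullSchema.project(projection)))
    // Buggy variant: convert(projectedRow, fullSchema) reads ordinal 0 as
    // IntType and throws ClassCastException on the String "a".
  }
}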

fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala

Lines changed: 15 additions & 7 deletions

@@ -24,13 +24,15 @@ import org.apache.fluss.config.Configuration
 import org.apache.fluss.memory.MemorySegment
 import org.apache.fluss.metadata.{TableBucket, TablePath}
 import org.apache.fluss.record.LogRecord
-import org.apache.fluss.row.{encode, InternalRow, KeyValueRow}
+import org.apache.fluss.row.{encode, InternalRow => FlussInternalRow, KeyValueRow}
 import org.apache.fluss.spark.SparkFlussConf
+import org.apache.fluss.spark.row.DataConverter
 import org.apache.fluss.spark.utils.LogChangesIterator
 import org.apache.fluss.types.{DataField, RowType}
 import org.apache.fluss.utils.CloseableIterator

 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow

 import java.util.Comparator

@@ -51,6 +53,12 @@ class FlussUpsertPartitionReader(
   extends FlussPartitionReader(tablePath, flussConfig)
   with Logging {

+  private lazy val projectedRowType = rowType.project(projectionWithPks)
+
+  override protected def convertToSparkRow(flussRow: FlussInternalRow): InternalRow = {
+    DataConverter.toSparkInternalRow(flussRow, projectedRowType)
+  }
+
   private val readOptimized = flussConfig.get(SparkFlussConf.READ_OPTIMIZED_OPTION)
   private val tableBucket: TableBucket = flussPartition.tableBucket
   private val snapshotId: Long = flussPartition.snapshotId
@@ -79,7 +87,7 @@ class FlussUpsertPartitionReader(

   private var snapshotScanner: BatchScanner = _
   private var logScanner: LogScanner = _
-  private var mergedIterator: Iterator[InternalRow] = _
+  private var mergedIterator: Iterator[FlussInternalRow] = _

   // initialize scanners
   initialize()
@@ -111,8 +119,8 @@ class FlussUpsertPartitionReader(
       tableInfo.isDefaultBucketKey)

     // Create comparators based on primary key
-    val comparator = new Comparator[InternalRow] {
-      override def compare(o1: InternalRow, o2: InternalRow): Int = {
+    val comparator = new Comparator[FlussInternalRow] {
+      override def compare(o1: FlussInternalRow, o2: FlussInternalRow): Int = {
         val key1 = keyEncoder.encodeKey(o1)
         val key2 = keyEncoder.encodeKey(o2)
         MemorySegment.wrap(key1).compare(MemorySegment.wrap(key2), 0, 0, key1.length)
@@ -160,7 +168,7 @@ class FlussUpsertPartitionReader(

     // Convert snapshot iterator to LogRecord iterator for SortMergeReader
     new CloseableIterator[LogRecord] {
-      private var currentBatch: java.util.Iterator[InternalRow] = _
+      private var currentBatch: java.util.Iterator[FlussInternalRow] = _
       private var hasMoreBatches = true

       override def hasNext: Boolean = {
@@ -200,9 +208,9 @@ class FlussUpsertPartitionReader(
         createSnapshotIterator()
       }

-    // Create the SortMergeReader
+    // null: scanners already project rows; passing projectionWithPks here double-projects
     val sortMergeReader = new SortMergeReader(
-      projectionWithPks,
+      null,
       pkProjection,
       snapshotIterators,
       comparator,
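
A note on the final hunk: the null argument is the substantive fix on the upsert path. The snapshot and log scanners are constructed with projectionWithPks, so they already emit narrowed rows; handing the same ordinals to SortMergeReader re-projects rows that no longer have the original layout. A hypothetical sketch of the double-projection effect on plain Vectors (not the Fluss SortMergeReader):

object DoubleProjectionSketch {
  // Keep the selected ordinals of a row, in the requested order.
  def project(row: Vector[Any], ordinals: Array[Int]): Vector[Any] =
    ordinals.toVector.map(row)

  def main(args: Array[String]): Unit = {
    // Full layout: (pk, ts, name, arr); the query projects (arr, ts) plus the pk.
    val fullRow = Vector[Any](1, "2026-01-01 12:00:00", "a", Seq(1, 2))
    val projectionWithPks = Array(3, 1, 0)
    val once = project(fullRow, projectionWithPks)
    println(once) // Vector(List(1, 2), 2026-01-01 12:00:00, 1) -- correct
    // Applying the same ordinals again indexes a 3-column row at ordinal 3.
    try project(once, projectionWithPks)
    catch { case e: IndexOutOfBoundsException => println(s"double projection: $e") }
  }
}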

fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala

Lines changed: 36 additions & 0 deletions

@@ -211,6 +211,42 @@ class SparkLogTableReadTest extends FlussSparkTestBase {
     }
   }

+  test("Spark Read: log table projection with type-dependent columns") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (
+             |id INT,
+             |ts TIMESTAMP,
+             |name STRING,
+             |arr ARRAY<INT>,
+             |struct_col STRUCT<col1: INT, col2: STRING>,
+             |ts_ltz TIMESTAMP_LTZ
+             |)""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(1, TIMESTAMP "2026-01-01 12:00:00", "a", ARRAY(1, 2), STRUCT(10, 'x'),
+             | TIMESTAMP "2026-01-01 12:00:00"),
+             |(2, TIMESTAMP "2026-01-02 12:00:00", "b", ARRAY(3, 4), STRUCT(20, 'y'),
+             | TIMESTAMP "2026-01-02 12:00:00")
+             |""".stripMargin)
+
+      // Projection reorders type-dependent columns (array, timestamp, struct)
+      checkAnswer(
+        sql(s"SELECT arr, ts, struct_col FROM $DEFAULT_DATABASE.t ORDER BY ts"),
+        Row(Seq(1, 2), java.sql.Timestamp.valueOf("2026-01-01 12:00:00"), Row(10, "x")) ::
+          Row(Seq(3, 4), java.sql.Timestamp.valueOf("2026-01-02 12:00:00"), Row(20, "y")) :: Nil
+      )
+
+      // Projection with timestamp_ltz at shifted ordinal
+      checkAnswer(
+        sql(s"SELECT ts_ltz, name FROM $DEFAULT_DATABASE.t ORDER BY name"),
+        Row(java.sql.Timestamp.valueOf("2026-01-01 12:00:00"), "a") ::
+          Row(java.sql.Timestamp.valueOf("2026-01-02 12:00:00"), "b") :: Nil
+      )
+    }
+  }
+
   test("Spark Read: nested data types table") {
     withTable("t") {
       // TODO: support map type
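
Context for the column choice in these tests: Spark's InternalRow accessors are type-specific (getInt, getLong, getArray, getStruct, ...), so pairing an ordinal with the wrong schema breaks exactly for TIMESTAMP, ARRAY, and STRUCT columns, while a projection of same-typed columns can mask the bug. A small standalone sketch against Spark's catalyst classes — the row values and the microsecond literal are illustrative:

import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.GenericArrayData

object TypeDependentGetters {
  def main(args: Array[String]): Unit = {
    // Projected layout for SELECT arr, ts: ARRAY<INT> at ordinal 0, timestamp micros at 1.
    val row = new GenericInternalRow(Array[Any](
      new GenericArrayData(Array(1, 2)),
      1767268800000000L)) // 2026-01-01 12:00:00 UTC in microseconds

    // Getters matching the projected schema read cleanly.
    println(row.getArray(0).toIntArray().mkString(","))
    println(row.getLong(1))

    // Getters from the full-table schema (INT at ordinal 0) miscast the array.
    try row.getInt(0)
    catch { case e: ClassCastException => println(s"wrong schema: $e") }
  }
}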

fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala

Lines changed: 47 additions & 0 deletions

@@ -255,6 +255,53 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase {
     }
   }

+  test("Spark Read: primary key table projection with type-dependent columns") {
+    withTable("t") {
+      val tablePath = createTablePath("t")
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (
+             |pk INT,
+             |ts TIMESTAMP,
+             |name STRING,
+             |arr ARRAY<INT>,
+             |struct_col STRUCT<col1: INT, col2: STRING>,
+             |ts_ltz TIMESTAMP_LTZ
+             |) TBLPROPERTIES("primary.key" = "pk", "bucket.num" = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(1, TIMESTAMP "2026-01-01 12:00:00", "a", ARRAY(1, 2), STRUCT(10, 'x'),
+             | TIMESTAMP "2026-01-01 12:00:00"),
+             |(2, TIMESTAMP "2026-01-02 12:00:00", "b", ARRAY(3, 4), STRUCT(20, 'y'),
+             | TIMESTAMP "2026-01-02 12:00:00")
+             |""".stripMargin)
+
+      // Log-only: projection reorders type-dependent columns (PK not in projection)
+      checkAnswer(
+        sql(s"SELECT arr, ts, struct_col FROM $DEFAULT_DATABASE.t ORDER BY ts"),
+        Row(Seq(1, 2), java.sql.Timestamp.valueOf("2026-01-01 12:00:00"), Row(10, "x")) ::
+          Row(Seq(3, 4), java.sql.Timestamp.valueOf("2026-01-02 12:00:00"), Row(20, "y")) :: Nil
+      )
+
+      // Trigger snapshot, then test with snapshot + log merge
+      flussServer.triggerAndWaitSnapshot(tablePath)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(1, TIMESTAMP "2026-03-01 12:00:00", "a_updated", ARRAY(10, 20), STRUCT(100, 'xx'),
+             | TIMESTAMP "2026-03-01 12:00:00")
+             |""".stripMargin)
+
+      // Snapshot + log: projection with type-dependent columns at shifted ordinals
+      checkAnswer(
+        sql(s"SELECT ts_ltz, arr, name FROM $DEFAULT_DATABASE.t ORDER BY name"),
+        Row(java.sql.Timestamp.valueOf("2026-03-01 12:00:00"), Seq(10, 20), "a_updated") ::
+          Row(java.sql.Timestamp.valueOf("2026-01-02 12:00:00"), Seq(3, 4), "b") :: Nil
+      )
+    }
+  }
+
   private def genInputPartition(
       tablePath: TablePath,
       partitionName: String): Array[FlussUpsertInputPartition] = {
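
The second INSERT after the snapshot exercises the snapshot + log merge path: the reader combines a key-sorted snapshot with key-sorted log upserts, and on a key match the log row supersedes the snapshot row (hence "a_updated" in the expected answer). A hypothetical sketch of that sort-merge rule, not the Fluss SortMergeReader:

object SortMergeSketch {
  // Merge two key-sorted lists; on equal keys the log entry wins.
  def merge[K](snapshot: List[(K, String)], log: List[(K, String)])(
      implicit ord: Ordering[K]): List[(K, String)] =
    (snapshot, log) match {
      case (Nil, l) => l
      case (s, Nil) => s
      case ((sk, sv) :: st, (lk, lv) :: lt) =>
        val c = ord.compare(sk, lk)
        if (c < 0) (sk, sv) :: merge(st, log)
        else if (c > 0) (lk, lv) :: merge(snapshot, lt)
        else (lk, lv) :: merge(st, lt) // log overrides snapshot
    }

  def main(args: Array[String]): Unit = {
    val snapshot = List(1 -> "a", 2 -> "b")
    val log = List(1 -> "a_updated")
    println(merge(snapshot, log)) // List((1,a_updated), (2,b))
  }
}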
