Conversation
| def toSparkMap(flussMap: FlussInternalMap, mapType: FlussMapType): SparkMapData = { | ||
| // TODO: support map type in fluss-spark | ||
| throw new UnsupportedOperationException() | ||
| new FlussAsSparkMap(mapType).replace(flussMap) |
There was a problem hiding this comment.
do we need to call InternalRowUtils.copyMap ?
There was a problem hiding this comment.
No. What is needed here is an implementation of SparkMapData.
|
cc @beryllw could you help to review this? |
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds Spark ↔ Fluss support for MAP types and updates tests to validate map read/write behavior (closes #2673).
Changes:
- Implement map wrappers/converters:
SparkAsFlussMap,FlussAsSparkMap, and enablegetMapin row/array wrappers. - Extend Spark test schemas and end-to-end read/write tests to include map columns.
- Add/unit-update tests covering map handling in rows, arrays, and the converter layer.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussRow.scala | Implements getMap for Spark→Fluss row wrapper |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussMap.scala | New Spark→Fluss map wrapper |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussArray.scala | Enables map elements inside arrays (Spark→Fluss) |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala | Implements getMap for Fluss→Spark row wrapper |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkMap.scala | New Fluss→Spark map wrapper |
| fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/DataConverter.scala | Implements toSparkMap |
| fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java | Exposes copyMap for reuse by Spark adapter |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/util/TestUtils.scala | Adds a map field to the shared test schema |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussRowTest.scala | Extends row test with map assertions |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussArrayTest.scala | Adds array-of-map test coverage |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussMapTest.scala | New unit/integration tests for Spark→Fluss map wrapper |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala | Updates row tests to validate getMap |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala | Updates array tests to validate getMap |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkMapTest.scala | New unit/integration tests for Fluss→Spark map wrapper |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala | Adds toSparkMap test, removes “unsupported” test |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkWriteTest.scala | Adds map literal to write test and asserts map contents |
| fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala | Adds map column to nested-types read test |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkMapTest.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussMapTest.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussMapTest.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussMapTest.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussMapTest.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussMapTest.scala
Outdated
Show resolved
Hide resolved
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussRowTest.scala
Outdated
Show resolved
Hide resolved
|
The failed CI is caused by a flaky UT that is fixed in #3045. |
| def toSparkArray(flussArray: FlussInternalArray, arrayType: FlussArrayType): SparkArrayData = { | ||
| val elementType = arrayType.getElementType | ||
| new FlussAsSparkArray(elementType) | ||
| .replace(InternalRowUtils.copyArray(flussArray, elementType)) |
There was a problem hiding this comment.
fluss/fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java
Lines 178 to 190 in b306ed0
May be we don't need to call InternalRowUtils.copyArray?
There was a problem hiding this comment.
@beryllw Thank you for your comments to make me think further. The main problem here is that toSparkArray and toSparkMap behave inconsistently on whether or not to copy. If we are more rigorous, we should all copy to avoid the 'use-after-free' problem. However, since the data objects converted by the 'DataConverter' will only be used in fluss-spark, we can control it and consider not copying. If we want to remote the copy in toSparkArray method, I think it will be a separate pr.
|
LGTM |
Purpose
Linked issue: close #2673
Brief change log
Tests
API and Format
Documentation