From b33bea0982fefb5c47386fb04c1545aa778529da Mon Sep 17 00:00:00 2001 From: x-tong Date: Sun, 25 Jan 2026 19:07:08 +0800 Subject: [PATCH 1/8] [AURON #1850] Add FlinkArrowUtils for Flink-Arrow type conversion Part 1 of Flink RowData to Arrow conversion implementation. This PR adds the foundational type conversion utilities: - FlinkArrowUtils: Bidirectional conversion between Flink LogicalType and Arrow types - Support for all common Flink types including primitives, temporal, and complex types - Comprehensive unit tests for type conversion --- .gitignore | 5 +- .../auron-flink-runtime/pom.xml | 35 +++ .../auron/flink/arrow/FlinkArrowUtils.java | 193 +++++++++++++++++ .../flink/arrow/FlinkArrowUtilsTest.java | 202 ++++++++++++++++++ 4 files changed, 434 insertions(+), 1 deletion(-) create mode 100644 auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java create mode 100644 auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java diff --git a/.gitignore b/.gitignore index 9bb8dcb54..9f5a6a269 100644 --- a/.gitignore +++ b/.gitignore @@ -60,4 +60,7 @@ common/src/main/resources/auron-build-info.properties .flattened-pom.xml -dependency-reduced-pom.xml \ No newline at end of file +dependency-reduced-pom.xml + +#lsp +*.prefs \ No newline at end of file diff --git a/auron-flink-extension/auron-flink-runtime/pom.xml b/auron-flink-extension/auron-flink-runtime/pom.xml index 3b5dfea21..4998e04c1 100644 --- a/auron-flink-extension/auron-flink-runtime/pom.xml +++ b/auron-flink-extension/auron-flink-runtime/pom.xml @@ -38,6 +38,41 @@ proto ${project.version} + + + + org.apache.arrow + arrow-c-data + + + org.apache.arrow + arrow-memory-unsafe + + + org.apache.arrow + arrow-vector + + + org.apache.flink + flink-table-common + ${flink.version} + provided + + + + + org.apache.auron + auron-core + ${project.version} + test-jar + test + + + org.junit.jupiter + junit-jupiter-api + ${junit.jupiter.version} + test + diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java new file mode 100644 index 000000000..99b4b55e8 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.NullType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; + +/** + * Utility class for converting between Flink LogicalType and Arrow types. + */ +public class FlinkArrowUtils { + + /** + * Root allocator for Arrow memory management. + */ + public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE); + + static { + Runtime.getRuntime().addShutdownHook(new Thread(ROOT_ALLOCATOR::close)); + } + + /** + * Creates a child allocator from the root allocator. + * + * @param name Name for the child allocator + * @return A new child allocator + */ + public static BufferAllocator createChildAllocator(String name) { + return ROOT_ALLOCATOR.newChildAllocator(name, 0, Long.MAX_VALUE); + } + + /** + * Converts a Flink LogicalType to Arrow ArrowType. + * + * @param logicalType The Flink logical type + * @return The corresponding Arrow type + * @throws UnsupportedOperationException if the type is not supported + */ + public static ArrowType toArrowType(LogicalType logicalType) { + if (logicalType instanceof NullType) { + return ArrowType.Null.INSTANCE; + } else if (logicalType instanceof BooleanType) { + return ArrowType.Bool.INSTANCE; + } else if (logicalType instanceof TinyIntType) { + return new ArrowType.Int(8, true); + } else if (logicalType instanceof SmallIntType) { + return new ArrowType.Int(16, true); + } else if (logicalType instanceof IntType) { + return new ArrowType.Int(32, true); + } else if (logicalType instanceof BigIntType) { + return new ArrowType.Int(64, true); + } else if (logicalType instanceof FloatType) { + return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE); + } else if (logicalType instanceof DoubleType) { + return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE); + } else if (logicalType instanceof VarCharType || logicalType instanceof CharType) { + return ArrowType.Utf8.INSTANCE; + } else if (logicalType instanceof VarBinaryType || logicalType instanceof BinaryType) { + return ArrowType.Binary.INSTANCE; + } else if (logicalType instanceof DecimalType) { + DecimalType decimalType = (DecimalType) logicalType; + // Note: Arrow Java only has DecimalVector (128-bit) and Decimal256Vector (256-bit). + // There's no Decimal64Vector, so we always use 128-bit to match the actual storage. + // Setting bitWidth=64 would cause FFI export issues since the actual data is 128-bit. + return new ArrowType.Decimal(decimalType.getPrecision(), decimalType.getScale(), 128); + } else if (logicalType instanceof DateType) { + return new ArrowType.Date(DateUnit.DAY); + } else if (logicalType instanceof TimeType) { + // Flink TimeType stores time as milliseconds (int), convert to Arrow Time64 (microseconds) + return new ArrowType.Time(TimeUnit.MICROSECOND, 64); + } else if (logicalType instanceof TimestampType) { + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); + } else if (logicalType instanceof LocalZonedTimestampType) { + // LocalZonedTimestampType is similar to TimestampType but with UTC timezone + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"); + } else { + throw new UnsupportedOperationException("Unsupported Flink type: " + logicalType.asSummaryString()); + } + } + + /** + * Converts a Flink LogicalType to an Arrow Field. + * + * @param name The field name + * @param logicalType The Flink logical type + * @param nullable Whether the field is nullable + * @return The corresponding Arrow Field + */ + public static Field toArrowField(String name, LogicalType logicalType, boolean nullable) { + if (logicalType instanceof ArrayType) { + ArrayType arrayType = (ArrayType) logicalType; + LogicalType elementType = arrayType.getElementType(); + FieldType fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null); + Field elementField = toArrowField("element", elementType, elementType.isNullable()); + List children = new ArrayList<>(); + children.add(elementField); + return new Field(name, fieldType, children); + } else if (logicalType instanceof RowType) { + RowType rowType = (RowType) logicalType; + FieldType fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null); + List children = new ArrayList<>(); + for (RowType.RowField field : rowType.getFields()) { + children.add(toArrowField(field.getName(), field.getType(), field.getType().isNullable())); + } + return new Field(name, fieldType, children); + } else if (logicalType instanceof MapType) { + MapType mapType = (MapType) logicalType; + LogicalType keyType = mapType.getKeyType(); + LogicalType valueType = mapType.getValueType(); + + // Create entries field (struct) + FieldType entriesFieldType = new FieldType(false, ArrowType.Struct.INSTANCE, null); + List entriesChildren = new ArrayList<>(); + entriesChildren.add(toArrowField(MapVector.KEY_NAME, keyType, false)); + entriesChildren.add(toArrowField(MapVector.VALUE_NAME, valueType, valueType.isNullable())); + Field entriesField = new Field(MapVector.DATA_VECTOR_NAME, entriesFieldType, entriesChildren); + + // Create map field + FieldType mapFieldType = new FieldType(nullable, new ArrowType.Map(false), null); + List mapChildren = new ArrayList<>(); + mapChildren.add(entriesField); + return new Field(name, mapFieldType, mapChildren); + } else { + ArrowType arrowType = toArrowType(logicalType); + FieldType fieldType = new FieldType(nullable, arrowType, null); + return new Field(name, fieldType, new ArrayList<>()); + } + } + + /** + * Converts a Flink RowType to an Arrow Schema. + * + * @param rowType The Flink row type + * @return The corresponding Arrow Schema + */ + public static Schema toArrowSchema(RowType rowType) { + List fields = new ArrayList<>(); + for (RowType.RowField field : rowType.getFields()) { + fields.add(toArrowField(field.getName(), field.getType(), field.getType().isNullable())); + } + return new Schema(fields); + } + + private FlinkArrowUtils() { + // Utility class + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java new file mode 100644 index 000000000..17bce4dba --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RawType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.Test; + +/** Unit tests for FlinkArrowUtils. */ +public class FlinkArrowUtilsTest { + + @Test + public void testBasicTypeConversion() { + // Boolean + assertEquals(ArrowType.Bool.INSTANCE, FlinkArrowUtils.toArrowType(new BooleanType())); + + // Integer types + assertEquals(new ArrowType.Int(8, true), FlinkArrowUtils.toArrowType(new TinyIntType())); + assertEquals(new ArrowType.Int(16, true), FlinkArrowUtils.toArrowType(new SmallIntType())); + assertEquals(new ArrowType.Int(32, true), FlinkArrowUtils.toArrowType(new IntType())); + assertEquals(new ArrowType.Int(64, true), FlinkArrowUtils.toArrowType(new BigIntType())); + + // Floating point types + assertEquals( + new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE), + FlinkArrowUtils.toArrowType(new FloatType())); + assertEquals( + new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), + FlinkArrowUtils.toArrowType(new DoubleType())); + + // String and binary types + assertEquals(ArrowType.Utf8.INSTANCE, FlinkArrowUtils.toArrowType(new VarCharType(100))); + assertEquals(ArrowType.Utf8.INSTANCE, FlinkArrowUtils.toArrowType(new CharType(10))); + assertEquals(ArrowType.Binary.INSTANCE, FlinkArrowUtils.toArrowType(new VarBinaryType(100))); + assertEquals(ArrowType.Binary.INSTANCE, FlinkArrowUtils.toArrowType(new BinaryType(10))); + + // Decimal type + DecimalType decimalType = new DecimalType(10, 2); + ArrowType arrowDecimal = FlinkArrowUtils.toArrowType(decimalType); + assertTrue(arrowDecimal instanceof ArrowType.Decimal); + assertEquals(10, ((ArrowType.Decimal) arrowDecimal).getPrecision()); + assertEquals(2, ((ArrowType.Decimal) arrowDecimal).getScale()); + + // Date and timestamp types + assertEquals(new ArrowType.Date(DateUnit.DAY), FlinkArrowUtils.toArrowType(new DateType())); + assertEquals( + new ArrowType.Timestamp(TimeUnit.MICROSECOND, null), FlinkArrowUtils.toArrowType(new TimestampType(3))); + } + + @Test + public void testArrayTypeConversion() { + ArrayType arrayType = new ArrayType(new IntType()); + Field field = FlinkArrowUtils.toArrowField("test_array", arrayType, true); + + assertEquals("test_array", field.getName()); + assertTrue(field.isNullable()); + assertTrue(field.getType() instanceof ArrowType.List); + assertEquals(1, field.getChildren().size()); + + Field elementField = field.getChildren().get(0); + assertEquals("element", elementField.getName()); + assertTrue(elementField.getType() instanceof ArrowType.Int); + } + + @Test + public void testRowTypeConversion() { + RowType rowType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + + Field field = FlinkArrowUtils.toArrowField("test_row", rowType, false); + + assertEquals("test_row", field.getName()); + assertFalse(field.isNullable()); + assertTrue(field.getType() instanceof ArrowType.Struct); + assertEquals(2, field.getChildren().size()); + + Field idField = field.getChildren().get(0); + assertEquals("id", idField.getName()); + assertTrue(idField.getType() instanceof ArrowType.Int); + + Field nameField = field.getChildren().get(1); + assertEquals("name", nameField.getName()); + assertEquals(ArrowType.Utf8.INSTANCE, nameField.getType()); + } + + @Test + public void testMapTypeConversion() { + MapType mapType = new MapType(new VarCharType(100), new IntType()); + Field field = FlinkArrowUtils.toArrowField("test_map", mapType, true); + + assertEquals("test_map", field.getName()); + assertTrue(field.isNullable()); + assertTrue(field.getType() instanceof ArrowType.Map); + assertEquals(1, field.getChildren().size()); + + Field entriesField = field.getChildren().get(0); + assertEquals("entries", entriesField.getName()); + assertTrue(entriesField.getType() instanceof ArrowType.Struct); + assertEquals(2, entriesField.getChildren().size()); + + Field keyField = entriesField.getChildren().get(0); + assertEquals("key", keyField.getName()); + assertEquals(ArrowType.Utf8.INSTANCE, keyField.getType()); + + Field valueField = entriesField.getChildren().get(1); + assertEquals("value", valueField.getName()); + assertTrue(valueField.getType() instanceof ArrowType.Int); + } + + @Test + public void testSchemaConversion() { + RowType rowType = RowType.of( + new LogicalType[] {new IntType(), new VarCharType(100), new DoubleType()}, + new String[] {"id", "name", "score"}); + + Schema schema = FlinkArrowUtils.toArrowSchema(rowType); + + assertEquals(3, schema.getFields().size()); + + Field idField = schema.getFields().get(0); + assertEquals("id", idField.getName()); + assertTrue(idField.getType() instanceof ArrowType.Int); + + Field nameField = schema.getFields().get(1); + assertEquals("name", nameField.getName()); + assertEquals(ArrowType.Utf8.INSTANCE, nameField.getType()); + + Field scoreField = schema.getFields().get(2); + assertEquals("score", scoreField.getName()); + assertTrue(scoreField.getType() instanceof ArrowType.FloatingPoint); + } + + @Test + public void testTimeTypeConversion() { + TimeType timeType = new TimeType(3); + ArrowType arrowType = FlinkArrowUtils.toArrowType(timeType); + assertTrue(arrowType instanceof ArrowType.Time); + ArrowType.Time timeArrowType = (ArrowType.Time) arrowType; + assertEquals(TimeUnit.MICROSECOND, timeArrowType.getUnit()); + assertEquals(64, timeArrowType.getBitWidth()); + } + + @Test + public void testLocalZonedTimestampTypeConversion() { + LocalZonedTimestampType lzType = new LocalZonedTimestampType(6); + ArrowType arrowType = FlinkArrowUtils.toArrowType(lzType); + assertTrue(arrowType instanceof ArrowType.Timestamp); + ArrowType.Timestamp tsType = (ArrowType.Timestamp) arrowType; + assertEquals(TimeUnit.MICROSECOND, tsType.getUnit()); + assertEquals("UTC", tsType.getTimezone()); + } + + @Test + public void testUnsupportedTypeThrowsException() { + // RawType is not supported + assertThrows( + UnsupportedOperationException.class, + () -> FlinkArrowUtils.toArrowType(new RawType<>(String.class, StringSerializer.INSTANCE))); + } +} From 7de5c88208bd0e16a65d9345d055e5dd9da0d9ca Mon Sep 17 00:00:00 2001 From: xTong Date: Mon, 26 Jan 2026 22:11:24 +0800 Subject: [PATCH 2/8] Update auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java index 99b4b55e8..f463cffb6 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java @@ -52,7 +52,7 @@ import org.apache.flink.table.types.logical.VarCharType; /** - * Utility class for converting between Flink LogicalType and Arrow types. + * Utility class for converting Flink {@link LogicalType} instances to Arrow types, fields and schemas. */ public class FlinkArrowUtils { From 666fc8578a20d4c5b9f464a5869b953711033ca9 Mon Sep 17 00:00:00 2001 From: x-tong Date: Tue, 27 Jan 2026 21:03:45 +0800 Subject: [PATCH 3/8] Fix code formatting --- .../java/org/apache/auron/flink/arrow/FlinkArrowUtils.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java index f463cffb6..651f911b6 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java @@ -18,7 +18,6 @@ import java.util.ArrayList; import java.util.List; - import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.complex.MapVector; @@ -146,7 +145,8 @@ public static Field toArrowField(String name, LogicalType logicalType, boolean n FieldType fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null); List children = new ArrayList<>(); for (RowType.RowField field : rowType.getFields()) { - children.add(toArrowField(field.getName(), field.getType(), field.getType().isNullable())); + children.add(toArrowField( + field.getName(), field.getType(), field.getType().isNullable())); } return new Field(name, fieldType, children); } else if (logicalType instanceof MapType) { @@ -182,7 +182,8 @@ public static Field toArrowField(String name, LogicalType logicalType, boolean n public static Schema toArrowSchema(RowType rowType) { List fields = new ArrayList<>(); for (RowType.RowField field : rowType.getFields()) { - fields.add(toArrowField(field.getName(), field.getType(), field.getType().isNullable())); + fields.add(toArrowField( + field.getName(), field.getType(), field.getType().isNullable())); } return new Schema(fields); } From 9f6cfc6a97fb6c43b27aea2608c2fe273f3a3f79 Mon Sep 17 00:00:00 2001 From: x-tong Date: Wed, 28 Jan 2026 02:07:20 +0800 Subject: [PATCH 4/8] Address PR review comments for FlinkArrowUtils - Mark class as final to prevent subclassing - Add null check for logicalType parameter to throw IllegalArgumentException instead of NPE --- .../java/org/apache/auron/flink/arrow/FlinkArrowUtils.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java index 651f911b6..b20e5e626 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java @@ -53,7 +53,7 @@ /** * Utility class for converting Flink {@link LogicalType} instances to Arrow types, fields and schemas. */ -public class FlinkArrowUtils { +public final class FlinkArrowUtils { /** * Root allocator for Arrow memory management. @@ -82,6 +82,9 @@ public static BufferAllocator createChildAllocator(String name) { * @throws UnsupportedOperationException if the type is not supported */ public static ArrowType toArrowType(LogicalType logicalType) { + if (logicalType == null) { + throw new IllegalArgumentException("logicalType cannot be null"); + } if (logicalType instanceof NullType) { return ArrowType.Null.INSTANCE; } else if (logicalType instanceof BooleanType) { From 54875fe64fadc5cec01951ec48b9b469c43fabd5 Mon Sep 17 00:00:00 2001 From: x-tong Date: Wed, 28 Jan 2026 15:51:42 +0800 Subject: [PATCH 5/8] Add test coverage for NullType and Decimal bitWidth - Add test case for NullType to Arrow conversion - Add assertion to verify Decimal bitWidth is 128 --- .../org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java index 17bce4dba..ca47d7159 100644 --- a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java +++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java @@ -38,6 +38,7 @@ import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.NullType; import org.apache.flink.table.types.logical.RawType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.SmallIntType; @@ -53,6 +54,9 @@ public class FlinkArrowUtilsTest { @Test public void testBasicTypeConversion() { + // Null + assertEquals(ArrowType.Null.INSTANCE, FlinkArrowUtils.toArrowType(new NullType())); + // Boolean assertEquals(ArrowType.Bool.INSTANCE, FlinkArrowUtils.toArrowType(new BooleanType())); @@ -82,6 +86,7 @@ public void testBasicTypeConversion() { assertTrue(arrowDecimal instanceof ArrowType.Decimal); assertEquals(10, ((ArrowType.Decimal) arrowDecimal).getPrecision()); assertEquals(2, ((ArrowType.Decimal) arrowDecimal).getScale()); + assertEquals(128, ((ArrowType.Decimal) arrowDecimal).getBitWidth()); // Date and timestamp types assertEquals(new ArrowType.Date(DateUnit.DAY), FlinkArrowUtils.toArrowType(new DateType())); From 2bcd7d53226f4aaaa5e82102ed8ad7c4f2166acb Mon Sep 17 00:00:00 2001 From: x-tong Date: Fri, 30 Jan 2026 17:55:35 +0800 Subject: [PATCH 6/8] Refactor toArrowField to use logicalType.isNullable() directly - Remove redundant nullable parameter from toArrowField method - Use logicalType.isNullable() to determine field nullability - Improve TimeType conversion with precision-based Arrow type selection - Update tests to reflect API changes --- .../auron/flink/arrow/FlinkArrowUtils.java | 29 +++++++++------ .../flink/arrow/FlinkArrowUtilsTest.java | 37 ++++++++++++++----- 2 files changed, 46 insertions(+), 20 deletions(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java index b20e5e626..7423313ff 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java @@ -114,8 +114,17 @@ public static ArrowType toArrowType(LogicalType logicalType) { } else if (logicalType instanceof DateType) { return new ArrowType.Date(DateUnit.DAY); } else if (logicalType instanceof TimeType) { - // Flink TimeType stores time as milliseconds (int), convert to Arrow Time64 (microseconds) - return new ArrowType.Time(TimeUnit.MICROSECOND, 64); + TimeType timeType = (TimeType) logicalType; + int precision = timeType.getPrecision(); + if (precision == 0) { + return new ArrowType.Time(TimeUnit.SECOND, 32); + } else if (precision >= 1 && precision <= 3) { + return new ArrowType.Time(TimeUnit.MILLISECOND, 32); + } else if (precision >= 4 && precision <= 6) { + return new ArrowType.Time(TimeUnit.MICROSECOND, 64); + } else { + return new ArrowType.Time(TimeUnit.NANOSECOND, 64); + } } else if (logicalType instanceof TimestampType) { return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); } else if (logicalType instanceof LocalZonedTimestampType) { @@ -131,15 +140,15 @@ public static ArrowType toArrowType(LogicalType logicalType) { * * @param name The field name * @param logicalType The Flink logical type - * @param nullable Whether the field is nullable * @return The corresponding Arrow Field */ - public static Field toArrowField(String name, LogicalType logicalType, boolean nullable) { + public static Field toArrowField(String name, LogicalType logicalType) { + boolean nullable = logicalType.isNullable(); if (logicalType instanceof ArrayType) { ArrayType arrayType = (ArrayType) logicalType; LogicalType elementType = arrayType.getElementType(); FieldType fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null); - Field elementField = toArrowField("element", elementType, elementType.isNullable()); + Field elementField = toArrowField("element", elementType); List children = new ArrayList<>(); children.add(elementField); return new Field(name, fieldType, children); @@ -148,8 +157,7 @@ public static Field toArrowField(String name, LogicalType logicalType, boolean n FieldType fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null); List children = new ArrayList<>(); for (RowType.RowField field : rowType.getFields()) { - children.add(toArrowField( - field.getName(), field.getType(), field.getType().isNullable())); + children.add(toArrowField(field.getName(), field.getType())); } return new Field(name, fieldType, children); } else if (logicalType instanceof MapType) { @@ -160,8 +168,8 @@ public static Field toArrowField(String name, LogicalType logicalType, boolean n // Create entries field (struct) FieldType entriesFieldType = new FieldType(false, ArrowType.Struct.INSTANCE, null); List entriesChildren = new ArrayList<>(); - entriesChildren.add(toArrowField(MapVector.KEY_NAME, keyType, false)); - entriesChildren.add(toArrowField(MapVector.VALUE_NAME, valueType, valueType.isNullable())); + entriesChildren.add(toArrowField(MapVector.KEY_NAME, keyType.copy(false))); + entriesChildren.add(toArrowField(MapVector.VALUE_NAME, valueType)); Field entriesField = new Field(MapVector.DATA_VECTOR_NAME, entriesFieldType, entriesChildren); // Create map field @@ -185,8 +193,7 @@ public static Field toArrowField(String name, LogicalType logicalType, boolean n public static Schema toArrowSchema(RowType rowType) { List fields = new ArrayList<>(); for (RowType.RowField field : rowType.getFields()) { - fields.add(toArrowField( - field.getName(), field.getType(), field.getType().isNullable())); + fields.add(toArrowField(field.getName(), field.getType())); } return new Schema(fields); } diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java index ca47d7159..90de0b551 100644 --- a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java +++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java @@ -97,7 +97,7 @@ public void testBasicTypeConversion() { @Test public void testArrayTypeConversion() { ArrayType arrayType = new ArrayType(new IntType()); - Field field = FlinkArrowUtils.toArrowField("test_array", arrayType, true); + Field field = FlinkArrowUtils.toArrowField("test_array", arrayType); assertEquals("test_array", field.getName()); assertTrue(field.isNullable()); @@ -113,8 +113,10 @@ public void testArrayTypeConversion() { public void testRowTypeConversion() { RowType rowType = RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + // Create a non-nullable version of the row type + RowType nonNullableRowType = (RowType) rowType.copy(false); - Field field = FlinkArrowUtils.toArrowField("test_row", rowType, false); + Field field = FlinkArrowUtils.toArrowField("test_row", nonNullableRowType); assertEquals("test_row", field.getName()); assertFalse(field.isNullable()); @@ -133,7 +135,7 @@ public void testRowTypeConversion() { @Test public void testMapTypeConversion() { MapType mapType = new MapType(new VarCharType(100), new IntType()); - Field field = FlinkArrowUtils.toArrowField("test_map", mapType, true); + Field field = FlinkArrowUtils.toArrowField("test_map", mapType); assertEquals("test_map", field.getName()); assertTrue(field.isNullable()); @@ -179,12 +181,29 @@ public void testSchemaConversion() { @Test public void testTimeTypeConversion() { - TimeType timeType = new TimeType(3); - ArrowType arrowType = FlinkArrowUtils.toArrowType(timeType); - assertTrue(arrowType instanceof ArrowType.Time); - ArrowType.Time timeArrowType = (ArrowType.Time) arrowType; - assertEquals(TimeUnit.MICROSECOND, timeArrowType.getUnit()); - assertEquals(64, timeArrowType.getBitWidth()); + // Precision 0 -> SECOND, 32-bit + TimeType timeType0 = new TimeType(0); + ArrowType.Time arrowTime0 = (ArrowType.Time) FlinkArrowUtils.toArrowType(timeType0); + assertEquals(TimeUnit.SECOND, arrowTime0.getUnit()); + assertEquals(32, arrowTime0.getBitWidth()); + + // Precision 1-3 -> MILLISECOND, 32-bit + TimeType timeType3 = new TimeType(3); + ArrowType.Time arrowTime3 = (ArrowType.Time) FlinkArrowUtils.toArrowType(timeType3); + assertEquals(TimeUnit.MILLISECOND, arrowTime3.getUnit()); + assertEquals(32, arrowTime3.getBitWidth()); + + // Precision 4-6 -> MICROSECOND, 64-bit + TimeType timeType6 = new TimeType(6); + ArrowType.Time arrowTime6 = (ArrowType.Time) FlinkArrowUtils.toArrowType(timeType6); + assertEquals(TimeUnit.MICROSECOND, arrowTime6.getUnit()); + assertEquals(64, arrowTime6.getBitWidth()); + + // Precision 7+ -> NANOSECOND, 64-bit + TimeType timeType9 = new TimeType(9); + ArrowType.Time arrowTime9 = (ArrowType.Time) FlinkArrowUtils.toArrowType(timeType9); + assertEquals(TimeUnit.NANOSECOND, arrowTime9.getUnit()); + assertEquals(64, arrowTime9.getBitWidth()); } @Test From 3d866bad5af1e9914f4243aee4c2a5bc3e3c5462 Mon Sep 17 00:00:00 2001 From: x-tong Date: Sat, 31 Jan 2026 16:40:37 +0800 Subject: [PATCH 7/8] Support precision-based TimeUnit selection for TimestampType and LocalZonedTimestampType Reference Flink's ArrowUtils implementation to select appropriate Arrow TimeUnit based on timestamp precision: - precision 0: SECOND - precision 1-3: MILLISECOND - precision 4-6: MICROSECOND - precision 7+: NANOSECOND --- .../auron/flink/arrow/FlinkArrowUtils.java | 25 +++++++- .../flink/arrow/FlinkArrowUtilsTest.java | 60 ++++++++++++++++--- 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java index 7423313ff..a3cffbd8c 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java @@ -126,10 +126,29 @@ public static ArrowType toArrowType(LogicalType logicalType) { return new ArrowType.Time(TimeUnit.NANOSECOND, 64); } } else if (logicalType instanceof TimestampType) { - return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); + TimestampType timestampType = (TimestampType) logicalType; + int precision = timestampType.getPrecision(); + if (precision == 0) { + return new ArrowType.Timestamp(TimeUnit.SECOND, null); + } else if (precision >= 1 && precision <= 3) { + return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null); + } else if (precision >= 4 && precision <= 6) { + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); + } else { + return new ArrowType.Timestamp(TimeUnit.NANOSECOND, null); + } } else if (logicalType instanceof LocalZonedTimestampType) { - // LocalZonedTimestampType is similar to TimestampType but with UTC timezone - return new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"); + LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType; + int precision = localZonedTimestampType.getPrecision(); + if (precision == 0) { + return new ArrowType.Timestamp(TimeUnit.SECOND, "UTC"); + } else if (precision >= 1 && precision <= 3) { + return new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"); + } else if (precision >= 4 && precision <= 6) { + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"); + } else { + return new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"); + } } else { throw new UnsupportedOperationException("Unsupported Flink type: " + logicalType.asSummaryString()); } diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java index 90de0b551..ca0b58238 100644 --- a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java +++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java @@ -88,10 +88,8 @@ public void testBasicTypeConversion() { assertEquals(2, ((ArrowType.Decimal) arrowDecimal).getScale()); assertEquals(128, ((ArrowType.Decimal) arrowDecimal).getBitWidth()); - // Date and timestamp types + // Date type assertEquals(new ArrowType.Date(DateUnit.DAY), FlinkArrowUtils.toArrowType(new DateType())); - assertEquals( - new ArrowType.Timestamp(TimeUnit.MICROSECOND, null), FlinkArrowUtils.toArrowType(new TimestampType(3))); } @Test @@ -206,14 +204,58 @@ public void testTimeTypeConversion() { assertEquals(64, arrowTime9.getBitWidth()); } + @Test + public void testTimestampTypeConversion() { + // Precision 0 -> SECOND + TimestampType ts0 = new TimestampType(0); + ArrowType.Timestamp arrowTs0 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(ts0); + assertEquals(TimeUnit.SECOND, arrowTs0.getUnit()); + assertNull(arrowTs0.getTimezone()); + + // Precision 1-3 -> MILLISECOND + TimestampType ts3 = new TimestampType(3); + ArrowType.Timestamp arrowTs3 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(ts3); + assertEquals(TimeUnit.MILLISECOND, arrowTs3.getUnit()); + assertNull(arrowTs3.getTimezone()); + + // Precision 4-6 -> MICROSECOND + TimestampType ts6 = new TimestampType(6); + ArrowType.Timestamp arrowTs6 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(ts6); + assertEquals(TimeUnit.MICROSECOND, arrowTs6.getUnit()); + assertNull(arrowTs6.getTimezone()); + + // Precision 7+ -> NANOSECOND + TimestampType ts9 = new TimestampType(9); + ArrowType.Timestamp arrowTs9 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(ts9); + assertEquals(TimeUnit.NANOSECOND, arrowTs9.getUnit()); + assertNull(arrowTs9.getTimezone()); + } + @Test public void testLocalZonedTimestampTypeConversion() { - LocalZonedTimestampType lzType = new LocalZonedTimestampType(6); - ArrowType arrowType = FlinkArrowUtils.toArrowType(lzType); - assertTrue(arrowType instanceof ArrowType.Timestamp); - ArrowType.Timestamp tsType = (ArrowType.Timestamp) arrowType; - assertEquals(TimeUnit.MICROSECOND, tsType.getUnit()); - assertEquals("UTC", tsType.getTimezone()); + // Precision 0 -> SECOND + LocalZonedTimestampType lzType0 = new LocalZonedTimestampType(0); + ArrowType.Timestamp arrowLz0 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType0); + assertEquals(TimeUnit.SECOND, arrowLz0.getUnit()); + assertEquals("UTC", arrowLz0.getTimezone()); + + // Precision 1-3 -> MILLISECOND + LocalZonedTimestampType lzType3 = new LocalZonedTimestampType(3); + ArrowType.Timestamp arrowLz3 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType3); + assertEquals(TimeUnit.MILLISECOND, arrowLz3.getUnit()); + assertEquals("UTC", arrowLz3.getTimezone()); + + // Precision 4-6 -> MICROSECOND + LocalZonedTimestampType lzType6 = new LocalZonedTimestampType(6); + ArrowType.Timestamp arrowLz6 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType6); + assertEquals(TimeUnit.MICROSECOND, arrowLz6.getUnit()); + assertEquals("UTC", arrowLz6.getTimezone()); + + // Precision 7+ -> NANOSECOND + LocalZonedTimestampType lzType9 = new LocalZonedTimestampType(9); + ArrowType.Timestamp arrowLz9 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType9); + assertEquals(TimeUnit.NANOSECOND, arrowLz9.getUnit()); + assertEquals("UTC", arrowLz9.getTimezone()); } @Test From 1e4590b1d43957ad44d743494a9776440e07aa77 Mon Sep 17 00:00:00 2001 From: xTong Date: Tue, 3 Feb 2026 22:54:08 +0800 Subject: [PATCH 8/8] Use null timezone for LocalZonedTimestampType to align with Flink's implementation --- .../org/apache/auron/flink/arrow/FlinkArrowUtils.java | 8 ++++---- .../org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java index a3cffbd8c..0763a8470 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java @@ -141,13 +141,13 @@ public static ArrowType toArrowType(LogicalType logicalType) { LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType; int precision = localZonedTimestampType.getPrecision(); if (precision == 0) { - return new ArrowType.Timestamp(TimeUnit.SECOND, "UTC"); + return new ArrowType.Timestamp(TimeUnit.SECOND, null); } else if (precision >= 1 && precision <= 3) { - return new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"); + return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null); } else if (precision >= 4 && precision <= 6) { - return new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"); + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); } else { - return new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"); + return new ArrowType.Timestamp(TimeUnit.NANOSECOND, null); } } else { throw new UnsupportedOperationException("Unsupported Flink type: " + logicalType.asSummaryString()); diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java index ca0b58238..14116fb5e 100644 --- a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java +++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java @@ -237,25 +237,25 @@ public void testLocalZonedTimestampTypeConversion() { LocalZonedTimestampType lzType0 = new LocalZonedTimestampType(0); ArrowType.Timestamp arrowLz0 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType0); assertEquals(TimeUnit.SECOND, arrowLz0.getUnit()); - assertEquals("UTC", arrowLz0.getTimezone()); + assertNull(arrowLz0.getTimezone()); // Precision 1-3 -> MILLISECOND LocalZonedTimestampType lzType3 = new LocalZonedTimestampType(3); ArrowType.Timestamp arrowLz3 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType3); assertEquals(TimeUnit.MILLISECOND, arrowLz3.getUnit()); - assertEquals("UTC", arrowLz3.getTimezone()); + assertNull(arrowLz3.getTimezone()); // Precision 4-6 -> MICROSECOND LocalZonedTimestampType lzType6 = new LocalZonedTimestampType(6); ArrowType.Timestamp arrowLz6 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType6); assertEquals(TimeUnit.MICROSECOND, arrowLz6.getUnit()); - assertEquals("UTC", arrowLz6.getTimezone()); + assertNull(arrowLz6.getTimezone()); // Precision 7+ -> NANOSECOND LocalZonedTimestampType lzType9 = new LocalZonedTimestampType(9); ArrowType.Timestamp arrowLz9 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType9); assertEquals(TimeUnit.NANOSECOND, arrowLz9.getUnit()); - assertEquals("UTC", arrowLz9.getTimezone()); + assertNull(arrowLz9.getTimezone()); } @Test