diff --git a/.gitignore b/.gitignore
index 9bb8dcb54..9f5a6a269 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,4 +60,7 @@ common/src/main/resources/auron-build-info.properties
.flattened-pom.xml
-dependency-reduced-pom.xml
\ No newline at end of file
+dependency-reduced-pom.xml
+
+#lsp
+*.prefs
\ No newline at end of file
diff --git a/auron-flink-extension/auron-flink-runtime/pom.xml b/auron-flink-extension/auron-flink-runtime/pom.xml
index 3b5dfea21..4998e04c1 100644
--- a/auron-flink-extension/auron-flink-runtime/pom.xml
+++ b/auron-flink-extension/auron-flink-runtime/pom.xml
@@ -38,6 +38,41 @@
proto
${project.version}
+
+
+
+ org.apache.arrow
+ arrow-c-data
+
+
+ org.apache.arrow
+ arrow-memory-unsafe
+
+
+ org.apache.arrow
+ arrow-vector
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+ provided
+
+
+
+
+ org.apache.auron
+ auron-core
+ ${project.version}
+ test-jar
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${junit.jupiter.version}
+ test
+
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
new file mode 100644
index 000000000..0763a8470
--- /dev/null
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/**
+ * Utility class for converting Flink {@link LogicalType} instances to Arrow types, fields and schemas.
+ */
+public final class FlinkArrowUtils {
+
+ /**
+ * Root allocator for Arrow memory management.
+ */
+ public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE);
+
+ static {
+ Runtime.getRuntime().addShutdownHook(new Thread(ROOT_ALLOCATOR::close));
+ }
+
+ /**
+ * Creates a child allocator from the root allocator.
+ *
+ * @param name Name for the child allocator
+ * @return A new child allocator
+ */
+ public static BufferAllocator createChildAllocator(String name) {
+ return ROOT_ALLOCATOR.newChildAllocator(name, 0, Long.MAX_VALUE);
+ }
+
+ /**
+ * Converts a Flink LogicalType to Arrow ArrowType.
+ *
+ * @param logicalType The Flink logical type
+ * @return The corresponding Arrow type
+ * @throws UnsupportedOperationException if the type is not supported
+ */
+ public static ArrowType toArrowType(LogicalType logicalType) {
+ if (logicalType == null) {
+ throw new IllegalArgumentException("logicalType cannot be null");
+ }
+ if (logicalType instanceof NullType) {
+ return ArrowType.Null.INSTANCE;
+ } else if (logicalType instanceof BooleanType) {
+ return ArrowType.Bool.INSTANCE;
+ } else if (logicalType instanceof TinyIntType) {
+ return new ArrowType.Int(8, true);
+ } else if (logicalType instanceof SmallIntType) {
+ return new ArrowType.Int(16, true);
+ } else if (logicalType instanceof IntType) {
+ return new ArrowType.Int(32, true);
+ } else if (logicalType instanceof BigIntType) {
+ return new ArrowType.Int(64, true);
+ } else if (logicalType instanceof FloatType) {
+ return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+ } else if (logicalType instanceof DoubleType) {
+ return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+ } else if (logicalType instanceof VarCharType || logicalType instanceof CharType) {
+ return ArrowType.Utf8.INSTANCE;
+ } else if (logicalType instanceof VarBinaryType || logicalType instanceof BinaryType) {
+ return ArrowType.Binary.INSTANCE;
+ } else if (logicalType instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) logicalType;
+ // Note: Arrow Java only has DecimalVector (128-bit) and Decimal256Vector (256-bit).
+ // There's no Decimal64Vector, so we always use 128-bit to match the actual storage.
+ // Setting bitWidth=64 would cause FFI export issues since the actual data is 128-bit.
+ return new ArrowType.Decimal(decimalType.getPrecision(), decimalType.getScale(), 128);
+ } else if (logicalType instanceof DateType) {
+ return new ArrowType.Date(DateUnit.DAY);
+ } else if (logicalType instanceof TimeType) {
+ TimeType timeType = (TimeType) logicalType;
+ int precision = timeType.getPrecision();
+ if (precision == 0) {
+ return new ArrowType.Time(TimeUnit.SECOND, 32);
+ } else if (precision >= 1 && precision <= 3) {
+ return new ArrowType.Time(TimeUnit.MILLISECOND, 32);
+ } else if (precision >= 4 && precision <= 6) {
+ return new ArrowType.Time(TimeUnit.MICROSECOND, 64);
+ } else {
+ return new ArrowType.Time(TimeUnit.NANOSECOND, 64);
+ }
+ } else if (logicalType instanceof TimestampType) {
+ TimestampType timestampType = (TimestampType) logicalType;
+ int precision = timestampType.getPrecision();
+ if (precision == 0) {
+ return new ArrowType.Timestamp(TimeUnit.SECOND, null);
+ } else if (precision >= 1 && precision <= 3) {
+ return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
+ } else if (precision >= 4 && precision <= 6) {
+ return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
+ } else {
+ return new ArrowType.Timestamp(TimeUnit.NANOSECOND, null);
+ }
+ } else if (logicalType instanceof LocalZonedTimestampType) {
+ LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType;
+ int precision = localZonedTimestampType.getPrecision();
+ if (precision == 0) {
+ return new ArrowType.Timestamp(TimeUnit.SECOND, null);
+ } else if (precision >= 1 && precision <= 3) {
+ return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
+ } else if (precision >= 4 && precision <= 6) {
+ return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
+ } else {
+ return new ArrowType.Timestamp(TimeUnit.NANOSECOND, null);
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported Flink type: " + logicalType.asSummaryString());
+ }
+ }
+
+ /**
+ * Converts a Flink LogicalType to an Arrow Field.
+ *
+ * @param name The field name
+ * @param logicalType The Flink logical type
+ * @return The corresponding Arrow Field
+ */
+ public static Field toArrowField(String name, LogicalType logicalType) {
+ boolean nullable = logicalType.isNullable();
+ if (logicalType instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) logicalType;
+ LogicalType elementType = arrayType.getElementType();
+ FieldType fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null);
+ Field elementField = toArrowField("element", elementType);
+ List children = new ArrayList<>();
+ children.add(elementField);
+ return new Field(name, fieldType, children);
+ } else if (logicalType instanceof RowType) {
+ RowType rowType = (RowType) logicalType;
+ FieldType fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null);
+ List children = new ArrayList<>();
+ for (RowType.RowField field : rowType.getFields()) {
+ children.add(toArrowField(field.getName(), field.getType()));
+ }
+ return new Field(name, fieldType, children);
+ } else if (logicalType instanceof MapType) {
+ MapType mapType = (MapType) logicalType;
+ LogicalType keyType = mapType.getKeyType();
+ LogicalType valueType = mapType.getValueType();
+
+ // Create entries field (struct)
+ FieldType entriesFieldType = new FieldType(false, ArrowType.Struct.INSTANCE, null);
+ List entriesChildren = new ArrayList<>();
+ entriesChildren.add(toArrowField(MapVector.KEY_NAME, keyType.copy(false)));
+ entriesChildren.add(toArrowField(MapVector.VALUE_NAME, valueType));
+ Field entriesField = new Field(MapVector.DATA_VECTOR_NAME, entriesFieldType, entriesChildren);
+
+ // Create map field
+ FieldType mapFieldType = new FieldType(nullable, new ArrowType.Map(false), null);
+ List mapChildren = new ArrayList<>();
+ mapChildren.add(entriesField);
+ return new Field(name, mapFieldType, mapChildren);
+ } else {
+ ArrowType arrowType = toArrowType(logicalType);
+ FieldType fieldType = new FieldType(nullable, arrowType, null);
+ return new Field(name, fieldType, new ArrayList<>());
+ }
+ }
+
+ /**
+ * Converts a Flink RowType to an Arrow Schema.
+ *
+ * @param rowType The Flink row type
+ * @return The corresponding Arrow Schema
+ */
+ public static Schema toArrowSchema(RowType rowType) {
+ List fields = new ArrayList<>();
+ for (RowType.RowField field : rowType.getFields()) {
+ fields.add(toArrowField(field.getName(), field.getType()));
+ }
+ return new Schema(fields);
+ }
+
+ private FlinkArrowUtils() {
+ // Utility class
+ }
+}
diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java
new file mode 100644
index 000000000..14116fb5e
--- /dev/null
+++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.arrow;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for FlinkArrowUtils. */
+public class FlinkArrowUtilsTest {
+
+ @Test
+ public void testBasicTypeConversion() {
+ // Null
+ assertEquals(ArrowType.Null.INSTANCE, FlinkArrowUtils.toArrowType(new NullType()));
+
+ // Boolean
+ assertEquals(ArrowType.Bool.INSTANCE, FlinkArrowUtils.toArrowType(new BooleanType()));
+
+ // Integer types
+ assertEquals(new ArrowType.Int(8, true), FlinkArrowUtils.toArrowType(new TinyIntType()));
+ assertEquals(new ArrowType.Int(16, true), FlinkArrowUtils.toArrowType(new SmallIntType()));
+ assertEquals(new ArrowType.Int(32, true), FlinkArrowUtils.toArrowType(new IntType()));
+ assertEquals(new ArrowType.Int(64, true), FlinkArrowUtils.toArrowType(new BigIntType()));
+
+ // Floating point types
+ assertEquals(
+ new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE),
+ FlinkArrowUtils.toArrowType(new FloatType()));
+ assertEquals(
+ new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE),
+ FlinkArrowUtils.toArrowType(new DoubleType()));
+
+ // String and binary types
+ assertEquals(ArrowType.Utf8.INSTANCE, FlinkArrowUtils.toArrowType(new VarCharType(100)));
+ assertEquals(ArrowType.Utf8.INSTANCE, FlinkArrowUtils.toArrowType(new CharType(10)));
+ assertEquals(ArrowType.Binary.INSTANCE, FlinkArrowUtils.toArrowType(new VarBinaryType(100)));
+ assertEquals(ArrowType.Binary.INSTANCE, FlinkArrowUtils.toArrowType(new BinaryType(10)));
+
+ // Decimal type
+ DecimalType decimalType = new DecimalType(10, 2);
+ ArrowType arrowDecimal = FlinkArrowUtils.toArrowType(decimalType);
+ assertTrue(arrowDecimal instanceof ArrowType.Decimal);
+ assertEquals(10, ((ArrowType.Decimal) arrowDecimal).getPrecision());
+ assertEquals(2, ((ArrowType.Decimal) arrowDecimal).getScale());
+ assertEquals(128, ((ArrowType.Decimal) arrowDecimal).getBitWidth());
+
+ // Date type
+ assertEquals(new ArrowType.Date(DateUnit.DAY), FlinkArrowUtils.toArrowType(new DateType()));
+ }
+
+ @Test
+ public void testArrayTypeConversion() {
+ ArrayType arrayType = new ArrayType(new IntType());
+ Field field = FlinkArrowUtils.toArrowField("test_array", arrayType);
+
+ assertEquals("test_array", field.getName());
+ assertTrue(field.isNullable());
+ assertTrue(field.getType() instanceof ArrowType.List);
+ assertEquals(1, field.getChildren().size());
+
+ Field elementField = field.getChildren().get(0);
+ assertEquals("element", elementField.getName());
+ assertTrue(elementField.getType() instanceof ArrowType.Int);
+ }
+
+ @Test
+ public void testRowTypeConversion() {
+ RowType rowType =
+ RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"});
+ // Create a non-nullable version of the row type
+ RowType nonNullableRowType = (RowType) rowType.copy(false);
+
+ Field field = FlinkArrowUtils.toArrowField("test_row", nonNullableRowType);
+
+ assertEquals("test_row", field.getName());
+ assertFalse(field.isNullable());
+ assertTrue(field.getType() instanceof ArrowType.Struct);
+ assertEquals(2, field.getChildren().size());
+
+ Field idField = field.getChildren().get(0);
+ assertEquals("id", idField.getName());
+ assertTrue(idField.getType() instanceof ArrowType.Int);
+
+ Field nameField = field.getChildren().get(1);
+ assertEquals("name", nameField.getName());
+ assertEquals(ArrowType.Utf8.INSTANCE, nameField.getType());
+ }
+
+ @Test
+ public void testMapTypeConversion() {
+ MapType mapType = new MapType(new VarCharType(100), new IntType());
+ Field field = FlinkArrowUtils.toArrowField("test_map", mapType);
+
+ assertEquals("test_map", field.getName());
+ assertTrue(field.isNullable());
+ assertTrue(field.getType() instanceof ArrowType.Map);
+ assertEquals(1, field.getChildren().size());
+
+ Field entriesField = field.getChildren().get(0);
+ assertEquals("entries", entriesField.getName());
+ assertTrue(entriesField.getType() instanceof ArrowType.Struct);
+ assertEquals(2, entriesField.getChildren().size());
+
+ Field keyField = entriesField.getChildren().get(0);
+ assertEquals("key", keyField.getName());
+ assertEquals(ArrowType.Utf8.INSTANCE, keyField.getType());
+
+ Field valueField = entriesField.getChildren().get(1);
+ assertEquals("value", valueField.getName());
+ assertTrue(valueField.getType() instanceof ArrowType.Int);
+ }
+
+ @Test
+ public void testSchemaConversion() {
+ RowType rowType = RowType.of(
+ new LogicalType[] {new IntType(), new VarCharType(100), new DoubleType()},
+ new String[] {"id", "name", "score"});
+
+ Schema schema = FlinkArrowUtils.toArrowSchema(rowType);
+
+ assertEquals(3, schema.getFields().size());
+
+ Field idField = schema.getFields().get(0);
+ assertEquals("id", idField.getName());
+ assertTrue(idField.getType() instanceof ArrowType.Int);
+
+ Field nameField = schema.getFields().get(1);
+ assertEquals("name", nameField.getName());
+ assertEquals(ArrowType.Utf8.INSTANCE, nameField.getType());
+
+ Field scoreField = schema.getFields().get(2);
+ assertEquals("score", scoreField.getName());
+ assertTrue(scoreField.getType() instanceof ArrowType.FloatingPoint);
+ }
+
+ @Test
+ public void testTimeTypeConversion() {
+ // Precision 0 -> SECOND, 32-bit
+ TimeType timeType0 = new TimeType(0);
+ ArrowType.Time arrowTime0 = (ArrowType.Time) FlinkArrowUtils.toArrowType(timeType0);
+ assertEquals(TimeUnit.SECOND, arrowTime0.getUnit());
+ assertEquals(32, arrowTime0.getBitWidth());
+
+ // Precision 1-3 -> MILLISECOND, 32-bit
+ TimeType timeType3 = new TimeType(3);
+ ArrowType.Time arrowTime3 = (ArrowType.Time) FlinkArrowUtils.toArrowType(timeType3);
+ assertEquals(TimeUnit.MILLISECOND, arrowTime3.getUnit());
+ assertEquals(32, arrowTime3.getBitWidth());
+
+ // Precision 4-6 -> MICROSECOND, 64-bit
+ TimeType timeType6 = new TimeType(6);
+ ArrowType.Time arrowTime6 = (ArrowType.Time) FlinkArrowUtils.toArrowType(timeType6);
+ assertEquals(TimeUnit.MICROSECOND, arrowTime6.getUnit());
+ assertEquals(64, arrowTime6.getBitWidth());
+
+ // Precision 7+ -> NANOSECOND, 64-bit
+ TimeType timeType9 = new TimeType(9);
+ ArrowType.Time arrowTime9 = (ArrowType.Time) FlinkArrowUtils.toArrowType(timeType9);
+ assertEquals(TimeUnit.NANOSECOND, arrowTime9.getUnit());
+ assertEquals(64, arrowTime9.getBitWidth());
+ }
+
+ @Test
+ public void testTimestampTypeConversion() {
+ // Precision 0 -> SECOND
+ TimestampType ts0 = new TimestampType(0);
+ ArrowType.Timestamp arrowTs0 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(ts0);
+ assertEquals(TimeUnit.SECOND, arrowTs0.getUnit());
+ assertNull(arrowTs0.getTimezone());
+
+ // Precision 1-3 -> MILLISECOND
+ TimestampType ts3 = new TimestampType(3);
+ ArrowType.Timestamp arrowTs3 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(ts3);
+ assertEquals(TimeUnit.MILLISECOND, arrowTs3.getUnit());
+ assertNull(arrowTs3.getTimezone());
+
+ // Precision 4-6 -> MICROSECOND
+ TimestampType ts6 = new TimestampType(6);
+ ArrowType.Timestamp arrowTs6 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(ts6);
+ assertEquals(TimeUnit.MICROSECOND, arrowTs6.getUnit());
+ assertNull(arrowTs6.getTimezone());
+
+ // Precision 7+ -> NANOSECOND
+ TimestampType ts9 = new TimestampType(9);
+ ArrowType.Timestamp arrowTs9 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(ts9);
+ assertEquals(TimeUnit.NANOSECOND, arrowTs9.getUnit());
+ assertNull(arrowTs9.getTimezone());
+ }
+
+ @Test
+ public void testLocalZonedTimestampTypeConversion() {
+ // Precision 0 -> SECOND
+ LocalZonedTimestampType lzType0 = new LocalZonedTimestampType(0);
+ ArrowType.Timestamp arrowLz0 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType0);
+ assertEquals(TimeUnit.SECOND, arrowLz0.getUnit());
+ assertNull(arrowLz0.getTimezone());
+
+ // Precision 1-3 -> MILLISECOND
+ LocalZonedTimestampType lzType3 = new LocalZonedTimestampType(3);
+ ArrowType.Timestamp arrowLz3 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType3);
+ assertEquals(TimeUnit.MILLISECOND, arrowLz3.getUnit());
+ assertNull(arrowLz3.getTimezone());
+
+ // Precision 4-6 -> MICROSECOND
+ LocalZonedTimestampType lzType6 = new LocalZonedTimestampType(6);
+ ArrowType.Timestamp arrowLz6 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType6);
+ assertEquals(TimeUnit.MICROSECOND, arrowLz6.getUnit());
+ assertNull(arrowLz6.getTimezone());
+
+ // Precision 7+ -> NANOSECOND
+ LocalZonedTimestampType lzType9 = new LocalZonedTimestampType(9);
+ ArrowType.Timestamp arrowLz9 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType9);
+ assertEquals(TimeUnit.NANOSECOND, arrowLz9.getUnit());
+ assertNull(arrowLz9.getTimezone());
+ }
+
+ @Test
+ public void testUnsupportedTypeThrowsException() {
+ // RawType is not supported
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> FlinkArrowUtils.toArrowType(new RawType<>(String.class, StringSerializer.INSTANCE)));
+ }
+}