diff --git a/.gitignore b/.gitignore index 9bb8dcb54..9f5a6a269 100644 --- a/.gitignore +++ b/.gitignore @@ -60,4 +60,7 @@ common/src/main/resources/auron-build-info.properties .flattened-pom.xml -dependency-reduced-pom.xml \ No newline at end of file +dependency-reduced-pom.xml + +#lsp +*.prefs \ No newline at end of file diff --git a/auron-flink-extension/auron-flink-runtime/pom.xml b/auron-flink-extension/auron-flink-runtime/pom.xml index 3b5dfea21..4998e04c1 100644 --- a/auron-flink-extension/auron-flink-runtime/pom.xml +++ b/auron-flink-extension/auron-flink-runtime/pom.xml @@ -38,6 +38,41 @@ proto ${project.version} + + + + org.apache.arrow + arrow-c-data + + + org.apache.arrow + arrow-memory-unsafe + + + org.apache.arrow + arrow-vector + + + org.apache.flink + flink-table-common + ${flink.version} + provided + + + + + org.apache.auron + auron-core + ${project.version} + test-jar + test + + + org.junit.jupiter + junit-jupiter-api + ${junit.jupiter.version} + test + diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java new file mode 100644 index 000000000..0763a8470 --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import java.util.ArrayList; +import java.util.List; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.NullType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; + +/** + * Utility class for converting Flink {@link LogicalType} instances to Arrow types, fields and schemas. + */ +public final class FlinkArrowUtils { + + /** + * Root allocator for Arrow memory management. + */ + public static final RootAllocator ROOT_ALLOCATOR = new RootAllocator(Long.MAX_VALUE); + + static { + Runtime.getRuntime().addShutdownHook(new Thread(ROOT_ALLOCATOR::close)); + } + + /** + * Creates a child allocator from the root allocator. + * + * @param name Name for the child allocator + * @return A new child allocator + */ + public static BufferAllocator createChildAllocator(String name) { + return ROOT_ALLOCATOR.newChildAllocator(name, 0, Long.MAX_VALUE); + } + + /** + * Converts a Flink LogicalType to Arrow ArrowType. + * + * @param logicalType The Flink logical type + * @return The corresponding Arrow type + * @throws UnsupportedOperationException if the type is not supported + */ + public static ArrowType toArrowType(LogicalType logicalType) { + if (logicalType == null) { + throw new IllegalArgumentException("logicalType cannot be null"); + } + if (logicalType instanceof NullType) { + return ArrowType.Null.INSTANCE; + } else if (logicalType instanceof BooleanType) { + return ArrowType.Bool.INSTANCE; + } else if (logicalType instanceof TinyIntType) { + return new ArrowType.Int(8, true); + } else if (logicalType instanceof SmallIntType) { + return new ArrowType.Int(16, true); + } else if (logicalType instanceof IntType) { + return new ArrowType.Int(32, true); + } else if (logicalType instanceof BigIntType) { + return new ArrowType.Int(64, true); + } else if (logicalType instanceof FloatType) { + return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE); + } else if (logicalType instanceof DoubleType) { + return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE); + } else if (logicalType instanceof VarCharType || logicalType instanceof CharType) { + return ArrowType.Utf8.INSTANCE; + } else if (logicalType instanceof VarBinaryType || logicalType instanceof BinaryType) { + return ArrowType.Binary.INSTANCE; + } else if (logicalType instanceof DecimalType) { + DecimalType decimalType = (DecimalType) logicalType; + // Note: Arrow Java only has DecimalVector (128-bit) and Decimal256Vector (256-bit). + // There's no Decimal64Vector, so we always use 128-bit to match the actual storage. + // Setting bitWidth=64 would cause FFI export issues since the actual data is 128-bit. + return new ArrowType.Decimal(decimalType.getPrecision(), decimalType.getScale(), 128); + } else if (logicalType instanceof DateType) { + return new ArrowType.Date(DateUnit.DAY); + } else if (logicalType instanceof TimeType) { + TimeType timeType = (TimeType) logicalType; + int precision = timeType.getPrecision(); + if (precision == 0) { + return new ArrowType.Time(TimeUnit.SECOND, 32); + } else if (precision >= 1 && precision <= 3) { + return new ArrowType.Time(TimeUnit.MILLISECOND, 32); + } else if (precision >= 4 && precision <= 6) { + return new ArrowType.Time(TimeUnit.MICROSECOND, 64); + } else { + return new ArrowType.Time(TimeUnit.NANOSECOND, 64); + } + } else if (logicalType instanceof TimestampType) { + TimestampType timestampType = (TimestampType) logicalType; + int precision = timestampType.getPrecision(); + if (precision == 0) { + return new ArrowType.Timestamp(TimeUnit.SECOND, null); + } else if (precision >= 1 && precision <= 3) { + return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null); + } else if (precision >= 4 && precision <= 6) { + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); + } else { + return new ArrowType.Timestamp(TimeUnit.NANOSECOND, null); + } + } else if (logicalType instanceof LocalZonedTimestampType) { + LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType; + int precision = localZonedTimestampType.getPrecision(); + if (precision == 0) { + return new ArrowType.Timestamp(TimeUnit.SECOND, null); + } else if (precision >= 1 && precision <= 3) { + return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null); + } else if (precision >= 4 && precision <= 6) { + return new ArrowType.Timestamp(TimeUnit.MICROSECOND, null); + } else { + return new ArrowType.Timestamp(TimeUnit.NANOSECOND, null); + } + } else { + throw new UnsupportedOperationException("Unsupported Flink type: " + logicalType.asSummaryString()); + } + } + + /** + * Converts a Flink LogicalType to an Arrow Field. + * + * @param name The field name + * @param logicalType The Flink logical type + * @return The corresponding Arrow Field + */ + public static Field toArrowField(String name, LogicalType logicalType) { + boolean nullable = logicalType.isNullable(); + if (logicalType instanceof ArrayType) { + ArrayType arrayType = (ArrayType) logicalType; + LogicalType elementType = arrayType.getElementType(); + FieldType fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null); + Field elementField = toArrowField("element", elementType); + List children = new ArrayList<>(); + children.add(elementField); + return new Field(name, fieldType, children); + } else if (logicalType instanceof RowType) { + RowType rowType = (RowType) logicalType; + FieldType fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null); + List children = new ArrayList<>(); + for (RowType.RowField field : rowType.getFields()) { + children.add(toArrowField(field.getName(), field.getType())); + } + return new Field(name, fieldType, children); + } else if (logicalType instanceof MapType) { + MapType mapType = (MapType) logicalType; + LogicalType keyType = mapType.getKeyType(); + LogicalType valueType = mapType.getValueType(); + + // Create entries field (struct) + FieldType entriesFieldType = new FieldType(false, ArrowType.Struct.INSTANCE, null); + List entriesChildren = new ArrayList<>(); + entriesChildren.add(toArrowField(MapVector.KEY_NAME, keyType.copy(false))); + entriesChildren.add(toArrowField(MapVector.VALUE_NAME, valueType)); + Field entriesField = new Field(MapVector.DATA_VECTOR_NAME, entriesFieldType, entriesChildren); + + // Create map field + FieldType mapFieldType = new FieldType(nullable, new ArrowType.Map(false), null); + List mapChildren = new ArrayList<>(); + mapChildren.add(entriesField); + return new Field(name, mapFieldType, mapChildren); + } else { + ArrowType arrowType = toArrowType(logicalType); + FieldType fieldType = new FieldType(nullable, arrowType, null); + return new Field(name, fieldType, new ArrayList<>()); + } + } + + /** + * Converts a Flink RowType to an Arrow Schema. + * + * @param rowType The Flink row type + * @return The corresponding Arrow Schema + */ + public static Schema toArrowSchema(RowType rowType) { + List fields = new ArrayList<>(); + for (RowType.RowField field : rowType.getFields()) { + fields.add(toArrowField(field.getName(), field.getType())); + } + return new Schema(fields); + } + + private FlinkArrowUtils() { + // Utility class + } +} diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java new file mode 100644 index 000000000..14116fb5e --- /dev/null +++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.auron.flink.arrow; + +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.NullType; +import org.apache.flink.table.types.logical.RawType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.Test; + +/** Unit tests for FlinkArrowUtils. */ +public class FlinkArrowUtilsTest { + + @Test + public void testBasicTypeConversion() { + // Null + assertEquals(ArrowType.Null.INSTANCE, FlinkArrowUtils.toArrowType(new NullType())); + + // Boolean + assertEquals(ArrowType.Bool.INSTANCE, FlinkArrowUtils.toArrowType(new BooleanType())); + + // Integer types + assertEquals(new ArrowType.Int(8, true), FlinkArrowUtils.toArrowType(new TinyIntType())); + assertEquals(new ArrowType.Int(16, true), FlinkArrowUtils.toArrowType(new SmallIntType())); + assertEquals(new ArrowType.Int(32, true), FlinkArrowUtils.toArrowType(new IntType())); + assertEquals(new ArrowType.Int(64, true), FlinkArrowUtils.toArrowType(new BigIntType())); + + // Floating point types + assertEquals( + new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE), + FlinkArrowUtils.toArrowType(new FloatType())); + assertEquals( + new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), + FlinkArrowUtils.toArrowType(new DoubleType())); + + // String and binary types + assertEquals(ArrowType.Utf8.INSTANCE, FlinkArrowUtils.toArrowType(new VarCharType(100))); + assertEquals(ArrowType.Utf8.INSTANCE, FlinkArrowUtils.toArrowType(new CharType(10))); + assertEquals(ArrowType.Binary.INSTANCE, FlinkArrowUtils.toArrowType(new VarBinaryType(100))); + assertEquals(ArrowType.Binary.INSTANCE, FlinkArrowUtils.toArrowType(new BinaryType(10))); + + // Decimal type + DecimalType decimalType = new DecimalType(10, 2); + ArrowType arrowDecimal = FlinkArrowUtils.toArrowType(decimalType); + assertTrue(arrowDecimal instanceof ArrowType.Decimal); + assertEquals(10, ((ArrowType.Decimal) arrowDecimal).getPrecision()); + assertEquals(2, ((ArrowType.Decimal) arrowDecimal).getScale()); + assertEquals(128, ((ArrowType.Decimal) arrowDecimal).getBitWidth()); + + // Date type + assertEquals(new ArrowType.Date(DateUnit.DAY), FlinkArrowUtils.toArrowType(new DateType())); + } + + @Test + public void testArrayTypeConversion() { + ArrayType arrayType = new ArrayType(new IntType()); + Field field = FlinkArrowUtils.toArrowField("test_array", arrayType); + + assertEquals("test_array", field.getName()); + assertTrue(field.isNullable()); + assertTrue(field.getType() instanceof ArrowType.List); + assertEquals(1, field.getChildren().size()); + + Field elementField = field.getChildren().get(0); + assertEquals("element", elementField.getName()); + assertTrue(elementField.getType() instanceof ArrowType.Int); + } + + @Test + public void testRowTypeConversion() { + RowType rowType = + RowType.of(new LogicalType[] {new IntType(), new VarCharType(100)}, new String[] {"id", "name"}); + // Create a non-nullable version of the row type + RowType nonNullableRowType = (RowType) rowType.copy(false); + + Field field = FlinkArrowUtils.toArrowField("test_row", nonNullableRowType); + + assertEquals("test_row", field.getName()); + assertFalse(field.isNullable()); + assertTrue(field.getType() instanceof ArrowType.Struct); + assertEquals(2, field.getChildren().size()); + + Field idField = field.getChildren().get(0); + assertEquals("id", idField.getName()); + assertTrue(idField.getType() instanceof ArrowType.Int); + + Field nameField = field.getChildren().get(1); + assertEquals("name", nameField.getName()); + assertEquals(ArrowType.Utf8.INSTANCE, nameField.getType()); + } + + @Test + public void testMapTypeConversion() { + MapType mapType = new MapType(new VarCharType(100), new IntType()); + Field field = FlinkArrowUtils.toArrowField("test_map", mapType); + + assertEquals("test_map", field.getName()); + assertTrue(field.isNullable()); + assertTrue(field.getType() instanceof ArrowType.Map); + assertEquals(1, field.getChildren().size()); + + Field entriesField = field.getChildren().get(0); + assertEquals("entries", entriesField.getName()); + assertTrue(entriesField.getType() instanceof ArrowType.Struct); + assertEquals(2, entriesField.getChildren().size()); + + Field keyField = entriesField.getChildren().get(0); + assertEquals("key", keyField.getName()); + assertEquals(ArrowType.Utf8.INSTANCE, keyField.getType()); + + Field valueField = entriesField.getChildren().get(1); + assertEquals("value", valueField.getName()); + assertTrue(valueField.getType() instanceof ArrowType.Int); + } + + @Test + public void testSchemaConversion() { + RowType rowType = RowType.of( + new LogicalType[] {new IntType(), new VarCharType(100), new DoubleType()}, + new String[] {"id", "name", "score"}); + + Schema schema = FlinkArrowUtils.toArrowSchema(rowType); + + assertEquals(3, schema.getFields().size()); + + Field idField = schema.getFields().get(0); + assertEquals("id", idField.getName()); + assertTrue(idField.getType() instanceof ArrowType.Int); + + Field nameField = schema.getFields().get(1); + assertEquals("name", nameField.getName()); + assertEquals(ArrowType.Utf8.INSTANCE, nameField.getType()); + + Field scoreField = schema.getFields().get(2); + assertEquals("score", scoreField.getName()); + assertTrue(scoreField.getType() instanceof ArrowType.FloatingPoint); + } + + @Test + public void testTimeTypeConversion() { + // Precision 0 -> SECOND, 32-bit + TimeType timeType0 = new TimeType(0); + ArrowType.Time arrowTime0 = (ArrowType.Time) FlinkArrowUtils.toArrowType(timeType0); + assertEquals(TimeUnit.SECOND, arrowTime0.getUnit()); + assertEquals(32, arrowTime0.getBitWidth()); + + // Precision 1-3 -> MILLISECOND, 32-bit + TimeType timeType3 = new TimeType(3); + ArrowType.Time arrowTime3 = (ArrowType.Time) FlinkArrowUtils.toArrowType(timeType3); + assertEquals(TimeUnit.MILLISECOND, arrowTime3.getUnit()); + assertEquals(32, arrowTime3.getBitWidth()); + + // Precision 4-6 -> MICROSECOND, 64-bit + TimeType timeType6 = new TimeType(6); + ArrowType.Time arrowTime6 = (ArrowType.Time) FlinkArrowUtils.toArrowType(timeType6); + assertEquals(TimeUnit.MICROSECOND, arrowTime6.getUnit()); + assertEquals(64, arrowTime6.getBitWidth()); + + // Precision 7+ -> NANOSECOND, 64-bit + TimeType timeType9 = new TimeType(9); + ArrowType.Time arrowTime9 = (ArrowType.Time) FlinkArrowUtils.toArrowType(timeType9); + assertEquals(TimeUnit.NANOSECOND, arrowTime9.getUnit()); + assertEquals(64, arrowTime9.getBitWidth()); + } + + @Test + public void testTimestampTypeConversion() { + // Precision 0 -> SECOND + TimestampType ts0 = new TimestampType(0); + ArrowType.Timestamp arrowTs0 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(ts0); + assertEquals(TimeUnit.SECOND, arrowTs0.getUnit()); + assertNull(arrowTs0.getTimezone()); + + // Precision 1-3 -> MILLISECOND + TimestampType ts3 = new TimestampType(3); + ArrowType.Timestamp arrowTs3 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(ts3); + assertEquals(TimeUnit.MILLISECOND, arrowTs3.getUnit()); + assertNull(arrowTs3.getTimezone()); + + // Precision 4-6 -> MICROSECOND + TimestampType ts6 = new TimestampType(6); + ArrowType.Timestamp arrowTs6 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(ts6); + assertEquals(TimeUnit.MICROSECOND, arrowTs6.getUnit()); + assertNull(arrowTs6.getTimezone()); + + // Precision 7+ -> NANOSECOND + TimestampType ts9 = new TimestampType(9); + ArrowType.Timestamp arrowTs9 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(ts9); + assertEquals(TimeUnit.NANOSECOND, arrowTs9.getUnit()); + assertNull(arrowTs9.getTimezone()); + } + + @Test + public void testLocalZonedTimestampTypeConversion() { + // Precision 0 -> SECOND + LocalZonedTimestampType lzType0 = new LocalZonedTimestampType(0); + ArrowType.Timestamp arrowLz0 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType0); + assertEquals(TimeUnit.SECOND, arrowLz0.getUnit()); + assertNull(arrowLz0.getTimezone()); + + // Precision 1-3 -> MILLISECOND + LocalZonedTimestampType lzType3 = new LocalZonedTimestampType(3); + ArrowType.Timestamp arrowLz3 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType3); + assertEquals(TimeUnit.MILLISECOND, arrowLz3.getUnit()); + assertNull(arrowLz3.getTimezone()); + + // Precision 4-6 -> MICROSECOND + LocalZonedTimestampType lzType6 = new LocalZonedTimestampType(6); + ArrowType.Timestamp arrowLz6 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType6); + assertEquals(TimeUnit.MICROSECOND, arrowLz6.getUnit()); + assertNull(arrowLz6.getTimezone()); + + // Precision 7+ -> NANOSECOND + LocalZonedTimestampType lzType9 = new LocalZonedTimestampType(9); + ArrowType.Timestamp arrowLz9 = (ArrowType.Timestamp) FlinkArrowUtils.toArrowType(lzType9); + assertEquals(TimeUnit.NANOSECOND, arrowLz9.getUnit()); + assertNull(arrowLz9.getTimezone()); + } + + @Test + public void testUnsupportedTypeThrowsException() { + // RawType is not supported + assertThrows( + UnsupportedOperationException.class, + () -> FlinkArrowUtils.toArrowType(new RawType<>(String.class, StringSerializer.INSTANCE))); + } +}