From 94f28ba40e6f28a95a3e377992c7cfa9771f4cc6 Mon Sep 17 00:00:00 2001 From: swapna marru Date: Thu, 12 Feb 2026 19:19:26 -0800 Subject: [PATCH] variant accessor methods for handling map or array types --- .../flink/types/variant/BinaryVariant.java | 23 +++++++++++++++++++ .../apache/flink/types/variant/Variant.java | 18 +++++++++++++++ .../types/variant/BinaryVariantTest.java | 5 ++++ 3 files changed, 46 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariant.java b/flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariant.java index 2e8f80f0c2c86..ce5664d34e3f2 100644 --- a/flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariant.java +++ b/flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariant.java @@ -32,8 +32,10 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; +import java.util.List; import java.util.Objects; import static org.apache.flink.types.variant.BinaryVariantUtil.BINARY_SEARCH_THRESHOLD; @@ -242,11 +244,32 @@ public Variant getElement(int index) throws VariantTypeException { return getElementAtIndex(index); } + @Override + public int getArraySize() throws VariantTypeException { + return handleArray(value, pos, (size, offsetSize, offsetStart, dataStart) -> size); + } + @Override public Variant getField(String fieldName) throws VariantTypeException { return getFieldByKey(fieldName); } + @Override + public List getFieldNames() throws VariantTypeException { + return handleObject( + value, + pos, + (size, idSize, offsetSize, idStart, offsetStart, dataStart) -> { + List fieldNames = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + int id = readUnsigned(value, idStart + idSize * i, idSize); + String fieldName = getMetadataKey(metadata, id); + fieldNames.add(fieldName); + } + return fieldNames; + }); + } + @Override public String toJson() { StringBuilder sb = new StringBuilder(); diff --git a/flink-core/src/main/java/org/apache/flink/types/variant/Variant.java b/flink-core/src/main/java/org/apache/flink/types/variant/Variant.java index 6d6753c0406f1..c0f15788f3f75 100644 --- a/flink-core/src/main/java/org/apache/flink/types/variant/Variant.java +++ b/flink-core/src/main/java/org/apache/flink/types/variant/Variant.java @@ -24,6 +24,7 @@ import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; +import java.util.List; /** Variant represent a semi-structured data. */ @PublicEvolving @@ -175,6 +176,14 @@ public interface Variant { */ Variant getElement(int index) throws VariantTypeException; + /** + * Get the size of an array variant. + * + * @return Number of elements if this variant is an array + * @throws VariantTypeException If this variant is not an array. + */ + int getArraySize() throws VariantTypeException; + /** * Access value of the specified field of an object variant. If there is no field with the * specified name, null is returned. @@ -186,6 +195,15 @@ public interface Variant { */ Variant getField(String fieldName) throws VariantTypeException; + /** + * Get the field names of an object variant only at top level. Doesn't include the nested + * fields. + * + * @return List of field names if this variant is an object + * @throws VariantTypeException If this variant is not an object. + */ + List getFieldNames() throws VariantTypeException; + /** Parses the variant to json. */ String toJson(); diff --git a/flink-core/src/test/java/org/apache/flink/types/variant/BinaryVariantTest.java b/flink-core/src/test/java/org/apache/flink/types/variant/BinaryVariantTest.java index 9b52bb328c18e..83896ec53e141 100644 --- a/flink-core/src/test/java/org/apache/flink/types/variant/BinaryVariantTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/variant/BinaryVariantTest.java @@ -113,6 +113,7 @@ void testArrayVariant() { assertThat(variant.isPrimitive()).isFalse(); assertThat(variant.isObject()).isFalse(); assertThat(variant.getType()).isEqualTo(Variant.Type.ARRAY); + assertThat(variant.getArraySize()).isEqualTo(5); assertThat(variant.getElement(-1)).isNull(); assertThat(variant.getElement(0).getInt()).isEqualTo(1); @@ -144,12 +145,16 @@ void testObjectVariant() { assertThat(variant.isPrimitive()).isFalse(); assertThat(variant.isObject()).isTrue(); assertThat(variant.getType()).isEqualTo(Variant.Type.OBJECT); + assertThat(variant.getFieldNames()).containsExactlyInAnyOrder("list", "object", "bb"); assertThat(variant.getField("list").isArray()).isTrue(); + assertThat(variant.getField("list").getArraySize()).isEqualTo(2); assertThat(variant.getField("list").getElement(0).getString()).isEqualTo("hello"); assertThat(variant.getField("list").getElement(1).getInt()).isEqualTo(1); assertThat(variant.getField("object").isObject()).isTrue(); + assertThat(variant.getField("object").getFieldNames()) + .containsExactlyInAnyOrder("ss", "ff"); assertThat(variant.getField("object").getField("ss").getShort()).isEqualTo((short) 1); assertThat(variant.getField("object").getField("ff").getFloat()).isEqualTo((10.0f));