diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 83bc11abe4451..96e3e49bab52d 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -1167,6 +1167,16 @@ variant:
parser will keep the last occurrence of all fields with the same key, otherwise when
`allowDuplicateKeys` is false it will throw an error. The default value of
`allowDuplicateKeys` is false.
+ - sql: variant '[' INT ']'
+ table: VARIANT.at(INT)
+ description: |
+ If the VARIANT is an ARRAY value, returns a VARIANT whose value is the element at
+ the specified index. Otherwise, NULL is returned.
+ - sql: variant '[' STRING ']'
+ table: VARIANT.at(STRING)
+ description: |
+ If the VARIANT is a MAP value that has an element with this key, a VARIANT holding
+ the associated value is returned. Otherwise, NULL is returned.
valueconstruction:
- sql: |
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 8a5c772d21505..821294824b13e 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -1241,7 +1241,6 @@ variant:
同键的字段,否则当 allowDuplicateKeys 为 false 时,它会抛出一个错误。默认情况下,
allowDuplicateKeys 的值为 false。
-
- sql: TRY_PARSE_JSON(json_string[, allow_duplicate_keys])
description: |
尽可能将 JSON 字符串解析为 Variant。如果 JSON 字符串无效,则返回 NULL。如果希望抛出错误而不是返回 NULL,
@@ -1251,6 +1250,16 @@ variant:
同键的字段,否则当 allowDuplicateKeys 为 false 时,它会抛出一个错误。默认情况下,
allowDuplicateKeys 的值为 false。
+ - sql: variant '[' INT ']'
+ table: VARIANT.at(INT)
+ description: |
+ 如果这是一个 ARRAY 类型的 VARIANT,则返回一个 VARIANT,其值为指定索引处的元素。否则返回 NULL。
+
+ - sql: variant '[' STRING ']'
+ table: VARIANT.at(STRING)
+ description: |
+ 如果这是一个 MAP 类型的 VARIANT,则返回一个 VARIANT,其值为与指定键关联的值。否则返回 NULL。
+
valueconstruction:
- sql: |
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/fun/SqlItemOperator.java b/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/fun/SqlItemOperator.java
new file mode 100644
index 0000000000000..7f30f97ac105f
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/fun/SqlItemOperator.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.fun;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlSingleOperandTypeChecker;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+
+import java.util.Arrays;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.calcite.sql.type.NonNullableAccessors.getComponentTypeOrThrow;
+import static org.apache.calcite.sql.validate.SqlNonNullableAccessors.getOperandLiteralValueOrThrow;
+
+/**
+ * The item operator {@code [ ... ]}, used to access a given element of an array, map or struct. For
+ * example, {@code myArray[3]}, {@code "myMap['foo']"}, {@code myStruct[2]} or {@code
+ * myStruct['fieldName']}.
+ *
+ *
This class was copied over from Calcite 1.39.0 version to support access variant
+ * (FLINK-37924).
+ *
+ *
Line 148, CALCITE-7325, should be removed after upgrading Calcite to 1.42.0.
+ */
+public class SqlItemOperator extends SqlSpecialOperator {
+ public final int offset;
+ public final boolean safe;
+
+ public SqlItemOperator(
+ String name, SqlSingleOperandTypeChecker operandTypeChecker, int offset, boolean safe) {
+ super(name, SqlKind.ITEM, 100, true, null, null, operandTypeChecker);
+ this.offset = offset;
+ this.safe = safe;
+ }
+
+ @Override
+ public ReduceResult reduceExpr(int ordinal, TokenSequence list) {
+ SqlNode left = list.node(ordinal - 1);
+ SqlNode right = list.node(ordinal + 1);
+ return new ReduceResult(
+ ordinal - 1,
+ ordinal + 2,
+ createCall(
+ SqlParserPos.sum(
+ Arrays.asList(
+ requireNonNull(left, "left").getParserPosition(),
+ requireNonNull(right, "right").getParserPosition(),
+ list.pos(ordinal))),
+ left,
+ right));
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+ call.operand(0).unparse(writer, leftPrec, 0);
+ final SqlWriter.Frame frame = writer.startList("[", "]");
+ if (!this.getName().equals("ITEM")) {
+ final SqlWriter.Frame offsetFrame = writer.startFunCall(this.getName());
+ call.operand(1).unparse(writer, 0, 0);
+ writer.endFunCall(offsetFrame);
+ } else {
+ call.operand(1).unparse(writer, 0, 0);
+ }
+ writer.endList(frame);
+ }
+
+ @Override
+ public SqlOperandCountRange getOperandCountRange() {
+ return SqlOperandCountRanges.of(2);
+ }
+
+ @Override
+ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+ final SqlNode left = callBinding.operand(0);
+ final SqlNode right = callBinding.operand(1);
+ if (!getOperandTypeChecker().checkSingleOperandType(callBinding, left, 0, throwOnFailure)) {
+ return false;
+ }
+ final SqlSingleOperandTypeChecker checker = getChecker(callBinding);
+ return checker.checkSingleOperandType(callBinding, right, 0, throwOnFailure);
+ }
+
+ @Override
+ public SqlSingleOperandTypeChecker getOperandTypeChecker() {
+ return (SqlSingleOperandTypeChecker)
+ requireNonNull(super.getOperandTypeChecker(), "operandTypeChecker");
+ }
+
+ private static SqlSingleOperandTypeChecker getChecker(SqlCallBinding callBinding) {
+ final RelDataType operandType = callBinding.getOperandType(0);
+ switch (operandType.getSqlTypeName()) {
+ case ARRAY:
+ return OperandTypes.family(SqlTypeFamily.INTEGER);
+ case MAP:
+ RelDataType keyType =
+ requireNonNull(operandType.getKeyType(), "operandType.getKeyType()");
+ SqlTypeName sqlTypeName = keyType.getSqlTypeName();
+ return OperandTypes.family(
+ requireNonNull(
+ sqlTypeName.getFamily(),
+ () ->
+ "keyType.getSqlTypeName().getFamily() null, type is "
+ + sqlTypeName));
+ case ROW:
+ case ANY:
+ case DYNAMIC_STAR:
+ case VARIANT:
+ return OperandTypes.family(SqlTypeFamily.INTEGER)
+ .or(OperandTypes.family(SqlTypeFamily.CHARACTER));
+ default:
+ throw callBinding.newValidationSignatureError();
+ }
+ }
+
+ @Override
+ public String getAllowedSignatures(String name) {
+ if (name.equals("ITEM")) {
+ return "[]\n"
+ + "