Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,16 @@ variant:
parser will keep the last occurrence of all fields with the same key, otherwise when
`allowDuplicateKeys` is false it will throw an error. The default value of
`allowDuplicateKeys` is false.
- sql: variant '[' INT ']'
table: VARIANT.at(INT)
description: |
If the VARIANT is an ARRAY value, returns a VARIANT whose value is the element at
the specified index. Otherwise, NULL is returned.
- sql: variant '[' STRING ']'
table: VARIANT.at(STRING)
description: |
If the VARIANT is a MAP value that has an element with this key, a VARIANT holding
the associated value is returned. Otherwise, NULL is returned.

valueconstruction:
- sql: |
Expand Down
11 changes: 10 additions & 1 deletion docs/data/sql_functions_zh.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,6 @@ variant:
同键的字段,否则当 allowDuplicateKeys 为 false 时,它会抛出一个错误。默认情况下,
allowDuplicateKeys 的值为 false。


- sql: TRY_PARSE_JSON(json_string[, allow_duplicate_keys])
description: |
尽可能将 JSON 字符串解析为 Variant。如果 JSON 字符串无效,则返回 NULL。如果希望抛出错误而不是返回 NULL,
Expand All @@ -1251,6 +1250,16 @@ variant:
同键的字段,否则当 allowDuplicateKeys 为 false 时,它会抛出一个错误。默认情况下,
allowDuplicateKeys 的值为 false。

- sql: variant '[' INT ']'
table: VARIANT.at(INT)
description: |
如果这是一个 ARRAY 类型的 VARIANT,则返回一个 VARIANT,其值为指定索引处的元素。否则返回 NULL。

- sql: variant '[' STRING ']'
table: VARIANT.at(STRING)
description: |
如果这是一个 MAP 类型的 VARIANT,则返回一个 VARIANT,其值为与指定键关联的值。否则返回 NULL。


valueconstruction:
- sql: |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.sql.fun;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperandCountRange;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
import org.apache.calcite.sql.type.SqlSingleOperandTypeChecker;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;

import java.util.Arrays;

import static java.util.Objects.requireNonNull;
import static org.apache.calcite.sql.type.NonNullableAccessors.getComponentTypeOrThrow;
import static org.apache.calcite.sql.validate.SqlNonNullableAccessors.getOperandLiteralValueOrThrow;

/**
* The item operator {@code [ ... ]}, used to access a given element of an array, map or struct. For
* example, {@code myArray[3]}, {@code "myMap['foo']"}, {@code myStruct[2]} or {@code
* myStruct['fieldName']}.
*
* <p>This class was copied over from Calcite 1.39.0 version to support access variant
* (FLINK-37924).
*
* <p>Line 148, CALCITE-7325, should be removed after upgrading Calcite to 1.42.0.
*/
public class SqlItemOperator extends SqlSpecialOperator {
public final int offset;
public final boolean safe;

public SqlItemOperator(
String name, SqlSingleOperandTypeChecker operandTypeChecker, int offset, boolean safe) {
super(name, SqlKind.ITEM, 100, true, null, null, operandTypeChecker);
this.offset = offset;
this.safe = safe;
}

@Override
public ReduceResult reduceExpr(int ordinal, TokenSequence list) {
SqlNode left = list.node(ordinal - 1);
SqlNode right = list.node(ordinal + 1);
return new ReduceResult(
ordinal - 1,
ordinal + 2,
createCall(
SqlParserPos.sum(
Arrays.asList(
requireNonNull(left, "left").getParserPosition(),
requireNonNull(right, "right").getParserPosition(),
list.pos(ordinal))),
left,
right));
}

@Override
public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
call.operand(0).unparse(writer, leftPrec, 0);
final SqlWriter.Frame frame = writer.startList("[", "]");
if (!this.getName().equals("ITEM")) {
final SqlWriter.Frame offsetFrame = writer.startFunCall(this.getName());
call.operand(1).unparse(writer, 0, 0);
writer.endFunCall(offsetFrame);
} else {
call.operand(1).unparse(writer, 0, 0);
}
writer.endList(frame);
}

@Override
public SqlOperandCountRange getOperandCountRange() {
return SqlOperandCountRanges.of(2);
}

@Override
public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
final SqlNode left = callBinding.operand(0);
final SqlNode right = callBinding.operand(1);
if (!getOperandTypeChecker().checkSingleOperandType(callBinding, left, 0, throwOnFailure)) {
return false;
}
final SqlSingleOperandTypeChecker checker = getChecker(callBinding);
return checker.checkSingleOperandType(callBinding, right, 0, throwOnFailure);
}

@Override
public SqlSingleOperandTypeChecker getOperandTypeChecker() {
return (SqlSingleOperandTypeChecker)
requireNonNull(super.getOperandTypeChecker(), "operandTypeChecker");
}

private static SqlSingleOperandTypeChecker getChecker(SqlCallBinding callBinding) {
final RelDataType operandType = callBinding.getOperandType(0);
switch (operandType.getSqlTypeName()) {
case ARRAY:
return OperandTypes.family(SqlTypeFamily.INTEGER);
case MAP:
RelDataType keyType =
requireNonNull(operandType.getKeyType(), "operandType.getKeyType()");
SqlTypeName sqlTypeName = keyType.getSqlTypeName();
return OperandTypes.family(
requireNonNull(
sqlTypeName.getFamily(),
() ->
"keyType.getSqlTypeName().getFamily() null, type is "
+ sqlTypeName));
case ROW:
case ANY:
case DYNAMIC_STAR:
case VARIANT:
return OperandTypes.family(SqlTypeFamily.INTEGER)
.or(OperandTypes.family(SqlTypeFamily.CHARACTER));
default:
throw callBinding.newValidationSignatureError();
}
}

@Override
public String getAllowedSignatures(String name) {
if (name.equals("ITEM")) {
return "<ARRAY>[<INTEGER>]\n"
+ "<MAP>[<ANY>]\n"
+ "<ROW>[<CHARACTER>|<INTEGER>]\n"
+ "<VARIANT>[<CHARACTER>|<INTEGER>]";
} else {
return "<ARRAY>[" + name + "(<INTEGER>)]";
}
}

@Override
public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
final RelDataType operandType = opBinding.getOperandType(0);
switch (operandType.getSqlTypeName()) {
case VARIANT:
// Return type is always nullable VARIANT
return typeFactory.createTypeWithNullability(operandType, true);
case ARRAY:
return typeFactory.createTypeWithNullability(
getComponentTypeOrThrow(operandType), true);
case MAP:
return typeFactory.createTypeWithNullability(
requireNonNull(
operandType.getValueType(),
() -> "operandType.getValueType() is null for " + operandType),
true);
case ROW:
RelDataType fieldType;
RelDataType indexType = opBinding.getOperandType(1);

if (SqlTypeUtil.isString(indexType)) {
final String fieldName =
getOperandLiteralValueOrThrow(opBinding, 1, String.class);
RelDataTypeField field = operandType.getField(fieldName, false, false);
if (field == null) {
throw new AssertionError(
"Cannot infer type of field '"
+ fieldName
+ "' within ROW type: "
+ operandType);
} else {
fieldType = field.getType();
}
} else if (SqlTypeUtil.isIntType(indexType)) {
Integer index = opBinding.getOperandLiteralValue(1, Integer.class);
if (index == null || index < 1 || index > operandType.getFieldCount()) {
throw new AssertionError(
"Cannot infer type of field at position "
+ index
+ " within ROW type: "
+ operandType);
} else {
fieldType =
operandType.getFieldList().get(index - 1).getType(); // 1 indexed
}
} else {
throw new AssertionError(
"Unsupported field identifier type: '" + indexType + "'");
}
if (operandType.isNullable()) {
fieldType = typeFactory.createTypeWithNullability(fieldType, true);
}
return fieldType;
case ANY:
case DYNAMIC_STAR:
return typeFactory.createTypeWithNullability(
typeFactory.createSqlType(SqlTypeName.ANY), true);
default:
throw new AssertionError();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2442,7 +2442,8 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
sequence(
or(
logical(LogicalTypeRoot.ARRAY),
logical(LogicalTypeRoot.MAP)),
logical(LogicalTypeRoot.MAP),
logical(LogicalTypeRoot.VARIANT)),
InputTypeStrategies.ITEM_AT_INDEX))
.outputTypeStrategy(SpecificTypeStrategies.ITEM_AT)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
* or {@link LogicalTypeRoot#MULTISET}
*
* <p>the type to be equal to the key type of {@link LogicalTypeRoot#MAP} if the first argument is a
* map.
* map
*
* <p>a {@link LogicalTypeFamily#NUMERIC} type or {LogicalTypeFamily.CHARACTER_STRING} type if the
* first argument is a {@link LogicalTypeRoot#VARIANT}.
*/
@Internal
public final class ItemAtIndexArgumentTypeStrategy implements ArgumentTypeStrategy {
Expand Down Expand Up @@ -86,12 +89,36 @@ public Optional<DataType> inferArgumentType(
}
}

if (collectionType.is(LogicalTypeRoot.VARIANT)) {
if (indexType.getLogicalType().is(LogicalTypeFamily.INTEGER_NUMERIC)) {

if (callContext.isArgumentLiteral(1)) {
Optional<Integer> literalVal = callContext.getArgumentValue(1, Integer.class);
if (literalVal.isPresent() && literalVal.get() <= 0) {
return callContext.fail(
throwOnFailure,
"The provided index must be a valid SQL index starting from 1, but was '%s'",
literalVal.get());
}
}

return Optional.of(indexType);
} else if (indexType.getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) {
return Optional.of(indexType);
} else {
return callContext.fail(
throwOnFailure,
"Incorrect type %s supplied for the variant value. Variant values can only be accessed with a CHARACTER STRING map key or an INTEGER NUMERIC array index.",
indexType.getLogicalType().toString());
}
}

return Optional.empty();
}

@Override
public Signature.Argument getExpectedArgument(
FunctionDefinition functionDefinition, int argumentPos) {
return Signature.Argument.of("[<INTEGER NUMERIC> | <MAP_KEY_TYPE>]");
return Signature.Argument.of("[<CHARACTER STRING> | <INTEGER NUMERIC> | <MAP_KEY_TYPE>]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.KeyValueDataType;
Expand All @@ -33,27 +34,29 @@
/**
* An output type strategy for {@link BuiltInFunctionDefinitions#AT}.
*
* <p>Returns either the element of an {@link LogicalTypeFamily#COLLECTION} type or the value of
* {@link LogicalTypeRoot#MAP}.
* <p>Returns either the element of an {@link LogicalTypeFamily#COLLECTION} type, the value of
* {@link LogicalTypeRoot#MAP}, or the variant itself for {@link LogicalTypeRoot#VARIANT}.
*/
@Internal
public final class ItemAtTypeStrategy implements TypeStrategy {
@Override
public Optional<DataType> inferType(CallContext callContext) {

DataType arrayOrMapType = callContext.getArgumentDataTypes().get(0);
DataType containerType = callContext.getArgumentDataTypes().get(0);
final Optional<DataType> legacyArrayElement =
StrategyUtils.extractLegacyArrayElement(arrayOrMapType);
StrategyUtils.extractLegacyArrayElement(containerType);

if (legacyArrayElement.isPresent()) {
return legacyArrayElement;
}

if (arrayOrMapType.getLogicalType().is(LogicalTypeRoot.ARRAY)) {
if (containerType.getLogicalType().is(LogicalTypeRoot.ARRAY)) {
return Optional.of(
((CollectionDataType) arrayOrMapType).getElementDataType().nullable());
} else if (arrayOrMapType instanceof KeyValueDataType) {
return Optional.of(((KeyValueDataType) arrayOrMapType).getValueDataType().nullable());
((CollectionDataType) containerType).getElementDataType().nullable());
} else if (containerType instanceof KeyValueDataType) {
return Optional.of(((KeyValueDataType) containerType).getValueDataType().nullable());
} else if (containerType.getLogicalType().is(LogicalTypeRoot.VARIANT)) {
return Optional.of(((AtomicDataType) containerType).nullable());
}

return Optional.empty();
Expand Down
Loading