diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index 15ad336f4d0d4..a1cfa699bad8c 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -30,9 +30,116 @@ Flink SQL provides built-in process table functions (PTFs) for working with chan | Function | Description | |:---------|:------------| +| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with operation codes into a dynamic table | | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only table with explicit operation codes | - +## FROM_CHANGELOG + +The `FROM_CHANGELOG` PTF converts an append-only table with an explicit operation code column into a dynamic table (i.e. an updating table). Each input row is expected to have a string column that indicates the change operation. The op column is removed from the output and the row is emitted with the corresponding change operation. + +This is useful when consuming Change Data Capture (CDC) streams from systems like Debezium, Maxwell, or Canal, where events arrive as flat append-only records with an explicit operation field. It's also useful to be used in combination with the TO_CHANGELOG function, when the user wants to turn the append-only table back into an updating table after doing some specific transformation to the events. + +### Syntax + +```sql +SELECT * FROM FROM_CHANGELOG( + input => TABLE source_table PARTITION BY key_col, + [op => DESCRIPTOR(op_column_name),] + [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']] +) +``` + +### Parameters + +| Parameter | Required | Description | +|:-------------|:---------|:------------| +| `input` | Yes | The input table. Must be append-only and include `PARTITION BY` for parallel execution. | +| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. 
The column must exist in the input table and be of type STRING. | +| `op_mapping` | No | A `MAP` mapping user-defined operation codes to change operation names. Keys are user codes (e.g., `'c'`, `'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded - unmapped codes are dropped. Each change operation may appear at most once across all entries. | + +#### Default op_mapping + +When `op_mapping` is omitted, the following standard names are used: + +| Input code | Change operation | +|:-------------------|:------------------| +| `'INSERT'` | INSERT | +| `'UPDATE_BEFORE'` | UPDATE_BEFORE | +| `'UPDATE_AFTER'` | UPDATE_AFTER | +| `'DELETE'` | DELETE | + +### Output Schema + +The output columns are ordered as: + +``` +[partition_key_columns, remaining_columns_without_op] +``` + +The op column is removed from the output. Output rows carry the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE). 
+ +### Examples + +#### Basic usage with standard op names + +```sql +-- Input (append-only): +-- +I[id:1, op:'INSERT', name:'Alice'] +-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice'] +-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2'] +-- +I[id:2, op:'DELETE', name:'Bob'] + +SELECT * FROM FROM_CHANGELOG( + input => TABLE cdc_stream PARTITION BY id +) + +-- Output (updating table): +-- +I[id:1, name:'Alice'] +-- -U[id:1, name:'Alice'] +-- +U[id:1, name:'Alice2'] +-- -D[id:2, name:'Bob'] + +-- Table state after all events: +-- | id | name | +-- |----|--------| +-- | 1 | Alice2 | +``` + +#### Debezium-style CDC codes + +```sql +SELECT * FROM FROM_CHANGELOG( + input => TABLE cdc_stream PARTITION BY id, + op => DESCRIPTOR(__op), + op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE'] +) +-- 'c' (create) and 'r' (read/snapshot) both produce INSERT +-- 'u' produces UPDATE_AFTER +-- 'd' produces DELETE +``` + +#### Custom operation column name + +```sql +SELECT * FROM FROM_CHANGELOG( + input => TABLE cdc_stream PARTITION BY id, + op => DESCRIPTOR(operation) +) +-- The operation column named 'operation' is used instead of 'op' +``` + +#### Table API + +```java +// Default: reads 'op' column with standard change operation names +Table result = cdcStream.partitionBy($("id")).fromChangelog(); + +// Debezium-style mapping +Table result = cdcStream.partitionBy($("id")).fromChangelog( + descriptor("__op").asArgument("op"), + map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping") +); +``` ## TO_CHANGELOG @@ -88,6 +195,11 @@ All output rows have `INSERT` - the table is always append-only. 
-- +I[id:1, name:'Alice', cnt:1] -- +U[id:1, name:'Alice', cnt:2] -- -D[id:2, name:'Bob', cnt:1] +-- +-- Table state after all events: +-- | id | name | cnt | +-- |----|-------|-----| +-- | 1 | Alice | 2 | SELECT * FROM TO_CHANGELOG( input => TABLE my_aggregation PARTITION BY id diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java index 7db750a296d35..d9943c56a44a1 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java @@ -172,4 +172,30 @@ public interface PartitionedTable { * columns */ Table toChangelog(Expression... arguments); + + /** + * Converts this append-only table with an explicit operation code column into a dynamic table + * using the built-in {@code FROM_CHANGELOG} process table function. + * + *

Each input row is expected to have a string operation code column (default: {@code "op"}) + * that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The + * output table is a dynamic table backed by a changelog stream. + * + *

Optional arguments can be passed using named expressions: + * + *

{@code
+     * // Default: reads 'op' column with standard change operation names
+     * table.partitionBy($("id")).fromChangelog();
+     *
+     * // Custom op column name and mapping (Debezium-style codes)
+     * table.partitionBy($("id")).fromChangelog(
+     *     descriptor("__op").asArgument("op"),
+     *     map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
+     * );
+     * }
+ * + * @param arguments optional named arguments for {@code op} and {@code op_mapping} + * @return a dynamic {@link Table} with the op column removed and proper RowKind semantics + */ + Table fromChangelog(Expression... arguments); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java index 335f28f2f8e85..c47b757644718 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java @@ -906,6 +906,12 @@ public Table toChangelog(Expression... arguments) { return process(BuiltInFunctionDefinitions.TO_CHANGELOG.getName(), (Object[]) arguments); } + @Override + public Table fromChangelog(Expression... arguments) { + return process( + BuiltInFunctionDefinitions.FROM_CHANGELOG.getName(), (Object[]) arguments); + } + private QueryOperation createPartitionQueryOperation() { return table.operationTreeBuilder.partition(partitionKeys, table.operationTree); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java index 77cfaf7166771..1ceed993c5cd0 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.types.DataType; import 
org.apache.flink.table.types.inference.InputTypeStrategy; import org.apache.flink.table.types.inference.StaticArgument; @@ -33,6 +34,7 @@ import java.util.Arrays; import java.util.Locale; import java.util.Optional; +import java.util.function.Function; import java.util.regex.Pattern; import static org.apache.flink.util.Preconditions.checkArgument; @@ -52,7 +54,7 @@ *

Equality is defined by reference equality. */ @Internal -public final class BuiltInFunctionDefinition implements SpecializedFunction { +public final class BuiltInFunctionDefinition implements SpecializedFunction, ChangelogFunction { private final String name; @@ -72,6 +74,9 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction { private final SqlCallSyntax sqlCallSyntax; + private final @Nullable Function + changelogModeResolver; + private final String sqlName; private BuiltInFunctionDefinition( @@ -84,7 +89,10 @@ private BuiltInFunctionDefinition( boolean isDeterministic, boolean isRuntimeProvided, String runtimeClass, - boolean isInternal) { + boolean isInternal, + @Nullable + Function + changelogModeResolver) { this.name = checkNotNull(name, "Name must not be null."); this.sqlName = sqlName; this.version = isInternal ? null : version; @@ -95,6 +103,7 @@ private BuiltInFunctionDefinition( this.runtimeClass = runtimeClass; this.isInternal = isInternal; this.sqlCallSyntax = sqlCallSyntax; + this.changelogModeResolver = changelogModeResolver; validateFunction(this.name, this.version, this.isInternal); } @@ -131,6 +140,16 @@ public boolean isInternal() { return isInternal; } + /** + * Returns the optional changelog mode resolver for declaring the output changelog mode of a + * built-in process table function (e.g., FROM_CHANGELOG). Receives the {@link CallContext} so + * it can inspect function arguments to dynamically determine the changelog mode. 
+ */ + public Optional> + getChangelogModeResolver() { + return Optional.ofNullable(changelogModeResolver); + } + public String getQualifiedName() { if (isInternal) { return name; @@ -225,6 +244,14 @@ public static String qualifyFunctionName(String name, int version) { return String.format(INTERNAL_NAME_FORMAT, name.toUpperCase(Locale.ROOT), version); } + @Override + public ChangelogMode getChangelogMode(final ChangelogContext changelogContext) { + if (changelogModeResolver != null) { + return changelogModeResolver.apply(changelogContext); + } + return ChangelogMode.insertOnly(); + } + // -------------------------------------------------------------------------------------------- // Builder // -------------------------------------------------------------------------------------------- @@ -253,6 +280,9 @@ public static final class Builder { private SqlCallSyntax sqlCallSyntax = SqlCallSyntax.FUNCTION; + private @Nullable Function + changelogModeResolver; + public Builder() { // default constructor to allow a fluent definition } @@ -399,6 +429,18 @@ public Builder sqlName(String name) { return this; } + /** + * Sets a resolver that dynamically determines the output {@link ChangelogMode} for this + * built-in PTF. The resolver receives the {@link CallContext} and can inspect function + * arguments (e.g., op_mapping) to adapt the changelog mode. Only needed for PTFs that emit + * updates (e.g., FROM_CHANGELOG). 
+ */ + public Builder changelogModeResolver( + Function changelogModeResolver) { + this.changelogModeResolver = changelogModeResolver; + return this; + } + public BuiltInFunctionDefinition build() { return new BuiltInFunctionDefinition( name, @@ -410,7 +452,8 @@ public BuiltInFunctionDefinition build() { isDeterministic, isRuntimeProvided, runtimeClass, - isInternal); + isInternal, + changelogModeResolver); } } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index e8783ba51c859..3fb526d6f43ba 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -37,6 +37,7 @@ import org.apache.flink.table.types.inference.StaticArgumentTrait; import org.apache.flink.table.types.inference.TypeStrategies; import org.apache.flink.table.types.inference.strategies.ArrayOfStringArgumentTypeStrategy; +import org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy; import org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies; import org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies; import org.apache.flink.table.types.logical.LocalZonedTimestampType; @@ -106,6 +107,7 @@ import static org.apache.flink.table.types.inference.TypeStrategies.varyingString; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_ELEMENT_ARG; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE; +import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.FROM_CHANGELOG_INPUT_TYPE_STRATEGY; import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.INDEX; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ML_PREDICT_INPUT_TYPE_STRATEGY; @@ -115,6 +117,7 @@ import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentage; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentageArray; import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ARRAY_APPEND_PREPEND; +import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY; import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ML_PREDICT_OUTPUT_TYPE_STRATEGY; import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.TO_CHANGELOG_OUTPUT_TYPE_STRATEGY; @@ -802,6 +805,30 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) "org.apache.flink.table.runtime.functions.ptf.ToChangelogFunction") .build(); + public static final BuiltInFunctionDefinition FROM_CHANGELOG = + BuiltInFunctionDefinition.newBuilder() + .name("FROM_CHANGELOG") + .kind(PROCESS_TABLE) + .staticArguments( + StaticArgument.table( + "input", + Row.class, + false, + EnumSet.of( + StaticArgumentTrait.TABLE, + StaticArgumentTrait.SET_SEMANTIC_TABLE)), + StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true), + StaticArgument.scalar( + "op_mapping", + DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), + true)) + .inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY) + .changelogModeResolver(FromChangelogTypeStrategy::resolveChangelogMode) + .runtimeClass( + "org.apache.flink.table.runtime.functions.ptf.FromChangelogFunction") + .build(); + public static final 
BuiltInFunctionDefinition GREATEST = BuiltInFunctionDefinition.newBuilder() .name("GREATEST") diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java index ad6e56d09d924..7c5763548ee5c 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java @@ -23,6 +23,8 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.types.RowKind; +import java.util.Optional; + /** * An extension that allows a process table function (PTF) to emit results with changelog semantics. * @@ -105,5 +107,14 @@ interface ChangelogContext { * are required and {@link ChangelogMode#keyOnlyDeletes()} are supported. */ ChangelogMode getRequiredChangelogMode(); + + /** + * Returns the value of a scalar argument at the given position. + * + * @param pos the argument position + * @param clazz the expected class of the argument value + * @return the argument value, or empty if the argument is null or not available + */ + Optional getArgumentValue(int pos, Class clazz); } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java new file mode 100644 index 0000000000000..ace2f5a5dab54 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.functions.ChangelogFunction; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.functions.TableSemantics; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.ConstantArgumentCount; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; +import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.types.ColumnList; +import org.apache.flink.types.RowKind; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; 
+import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** Type strategies for the {@code FROM_CHANGELOG} process table function. */ +@Internal +public final class FromChangelogTypeStrategy { + + private static final String DEFAULT_OP_COLUMN_NAME = "op"; + + private static final Set VALID_ROW_KIND_NAMES = + Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE"); + + // -------------------------------------------------------------------------------------------- + // Input validation + // -------------------------------------------------------------------------------------------- + + public static final InputTypeStrategy INPUT_TYPE_STRATEGY = + new InputTypeStrategy() { + @Override + public ArgumentCount getArgumentCount() { + return ConstantArgumentCount.between(1, 3); + } + + @Override + public Optional> inferInputTypes( + final CallContext callContext, final boolean throwOnFailure) { + return validateInputs(callContext, throwOnFailure); + } + + @Override + public List getExpectedSignatures(final FunctionDefinition definition) { + return List.of( + Signature.of(Argument.of("input", "TABLE")), + Signature.of( + Argument.of("input", "TABLE"), Argument.of("op", "DESCRIPTOR")), + Signature.of( + Argument.of("input", "TABLE"), + Argument.of("op", "DESCRIPTOR"), + Argument.of("op_mapping", "MAP"))); + } + }; + + // -------------------------------------------------------------------------------------------- + // Output type inference + // -------------------------------------------------------------------------------------------- + + public static final TypeStrategy OUTPUT_TYPE_STRATEGY = + callContext -> { + final TableSemantics tableSemantics = + callContext + .getTableSemantics(0) + .orElseThrow( + () -> + new ValidationException( + "First argument must be a table for FROM_CHANGELOG.")); + + final String opColumnName = resolveOpColumnName(callContext); + + final List outputFields = buildOutputFields(tableSemantics, opColumnName); + + return 
Optional.of(DataTypes.ROW(outputFields).notNull()); + }; + + // -------------------------------------------------------------------------------------------- + // Helpers + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("rawtypes") + private static Optional> validateInputs( + final CallContext callContext, final boolean throwOnFailure) { + final boolean isMissingTableArg = callContext.getTableSemantics(0).isEmpty(); + if (isMissingTableArg) { + return callContext.fail( + throwOnFailure, "First argument must be a table for FROM_CHANGELOG."); + } + + final Optional opDescriptor = callContext.getArgumentValue(1, ColumnList.class); + final boolean hasInvalidOpDescriptor = + opDescriptor.isPresent() && opDescriptor.get().getNames().size() != 1; + if (hasInvalidOpDescriptor) { + return callContext.fail( + throwOnFailure, + "The descriptor for argument 'op' must contain exactly one column name."); + } + + // Validate that the op column exists in the input schema and is of STRING type + final TableSemantics tableSemantics = callContext.getTableSemantics(0).get(); + final String opColumnName = resolveOpColumnName(callContext); + final List inputFields = DataType.getFields(tableSemantics.dataType()); + final Optional opField = + inputFields.stream().filter(f -> f.getName().equals(opColumnName)).findFirst(); + if (opField.isEmpty()) { + return callContext.fail( + throwOnFailure, + String.format( + "The op column '%s' does not exist in the input schema.", + opColumnName)); + } + if (!opField.get().getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) { + return callContext.fail( + throwOnFailure, + String.format( + "The op column '%s' must be of STRING type, but was '%s'.", + opColumnName, opField.get().getDataType().getLogicalType())); + } + + final boolean hasMappingArgProvided = !callContext.isArgumentNull(2); + final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2); + if 
(hasMappingArgProvided && !isMappingArgLiteral) { + return callContext.fail( + throwOnFailure, "The 'op_mapping' argument must be a constant MAP literal."); + } + + final Optional opMapping = callContext.getArgumentValue(2, Map.class); + if (opMapping.isPresent()) { + final Optional> validationError = + validateOpMappingValues(callContext, opMapping.get(), throwOnFailure); + if (validationError.isPresent()) { + return validationError; + } + } + + return Optional.of(callContext.getArgumentDataTypes()); + } + + /** + * Validates op_mapping values. Values must be valid RowKind names from {INSERT, UPDATE_BEFORE, + * UPDATE_AFTER, DELETE}. Keys are arbitrary user strings (e.g., 'c', 'u', 'd') and may be comma-separated to + * map multiple user codes to the same RowKind. Each RowKind name must appear at most once + * across all entries. + */ + private static Optional> validateOpMappingValues( + final CallContext callContext, + final Map opMapping, + final boolean throwOnFailure) { + final Set allRowKindsSeen = new HashSet<>(); + + for (final Entry entry : opMapping.entrySet()) { + if (!(entry.getKey() instanceof String)) { + return callContext.fail( + throwOnFailure, "Invalid target mapping for argument 'op_mapping'."); + } + final Object value = entry.getValue(); + if (!(value instanceof String)) { + return callContext.fail( + throwOnFailure, "Invalid target mapping for argument 'op_mapping'."); + } + final String rowKindName = ((String) value).trim(); + if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) { + return callContext.fail( + throwOnFailure, + String.format( + "Invalid target mapping for argument 'op_mapping'. " + + "Unknown change operation: '%s'. Valid values are: %s.", + rowKindName, VALID_ROW_KIND_NAMES)); + } + final boolean isDuplicate = !allRowKindsSeen.add(rowKindName); + if (isDuplicate) { + return callContext.fail( + throwOnFailure, + String.format( + "Invalid target mapping for argument 'op_mapping'. 
" + + "Duplicate change operation: '%s'.", + rowKindName)); + } + } + return Optional.empty(); + } + + private static String resolveOpColumnName(final CallContext callContext) { + return callContext + .getArgumentValue(1, ColumnList.class) + .filter(cl -> !cl.getNames().isEmpty()) + .map(cl -> cl.getNames().get(0)) + .orElse(DEFAULT_OP_COLUMN_NAME); + } + + private static List buildOutputFields( + final TableSemantics tableSemantics, final String opColumnName) { + final Set partitionKeys = + IntStream.of(tableSemantics.partitionByColumns()) + .boxed() + .collect(Collectors.toSet()); + final List inputFields = DataType.getFields(tableSemantics.dataType()); + + // Exclude partition keys (prepended by framework) and the op column (becomes RowKind) + return IntStream.range(0, inputFields.size()) + .filter( + i -> + !partitionKeys.contains(i) + && !inputFields.get(i).getName().equals(opColumnName)) + .mapToObj(inputFields::get) + .collect(Collectors.toList()); + } + + /** + * Resolves the output changelog mode based on the op_mapping argument. If op_mapping is absent + * (default) or includes UPDATE_BEFORE, returns retract mode (all). Otherwise, returns upsert + * mode (no UPDATE_BEFORE). + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public static ChangelogMode resolveChangelogMode( + final ChangelogFunction.ChangelogContext changelogContext) { + final Optional opMapping = changelogContext.getArgumentValue(2, Map.class); + if (opMapping.isEmpty()) { + // Default mapping includes UPDATE_BEFORE -> retract mode + return ChangelogMode.all(); + } + final boolean hasUpdateBefore = + ((Map) opMapping.get()) + .values().stream() + .anyMatch(v -> RowKind.UPDATE_BEFORE.name().equals(v.trim())); + return hasUpdateBefore ? 
ChangelogMode.all() : ChangelogMode.upsert(false); + } + + private FromChangelogTypeStrategy() {} +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java index 4455b083ce2e9..b4cd96e0349e2 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java @@ -126,6 +126,10 @@ public static InputTypeStrategy windowTimeIndicator() { public static final InputTypeStrategy TO_CHANGELOG_INPUT_TYPE_STRATEGY = ToChangelogTypeStrategy.INPUT_TYPE_STRATEGY; + /** Input strategy for {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}. */ + public static final InputTypeStrategy FROM_CHANGELOG_INPUT_TYPE_STRATEGY = + FromChangelogTypeStrategy.INPUT_TYPE_STRATEGY; + /** See {@link ExtractInputTypeStrategy}. 
*/ public static final InputTypeStrategy EXTRACT = new ExtractInputTypeStrategy(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java index c1b854cad7c3b..06b6cbb2d3bf3 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java @@ -202,6 +202,10 @@ public final class SpecificTypeStrategies { public static final TypeStrategy TO_CHANGELOG_OUTPUT_TYPE_STRATEGY = ToChangelogTypeStrategy.OUTPUT_TYPE_STRATEGY; + /** Type strategy specific for {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}. */ + public static final TypeStrategy FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY = + FromChangelogTypeStrategy.OUTPUT_TYPE_STRATEGY; + private SpecificTypeStrategies() { // no instantiation } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java new file mode 100644 index 0000000000000..0de6bc6bd090c --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase; +import org.apache.flink.table.types.inference.utils.TableSemanticsMock; +import org.apache.flink.types.ColumnList; + +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.FROM_CHANGELOG_INPUT_TYPE_STRATEGY; + +/** Tests for {@link FromChangelogTypeStrategy#INPUT_TYPE_STRATEGY}. 
*/ +class FromChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { + + private static final DataType TABLE_TYPE = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("op", DataTypes.STRING()), + DataTypes.FIELD("name", DataTypes.STRING())); + + private static final DataType DESCRIPTOR_TYPE = DataTypes.DESCRIPTOR(); + + private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()); + + @Override + protected Stream testData() { + return Stream.of( + // Valid: all three arguments with Debezium-style mapping + TestSpec.forStrategy("Valid with all arguments", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt( + 2, + Map.of( + "c", "INSERT", + "u", "UPDATE_AFTER", + "d", "DELETE")) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE), + + // Valid: retract-style mapping with UPDATE_BEFORE + TestSpec.forStrategy("Valid with UPDATE_BEFORE", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt( + 2, + Map.of( + "c", "INSERT", + "ub", "UPDATE_BEFORE", + "ua", "UPDATE_AFTER", + "d", "DELETE")) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE), + + // Error: op column not found + TestSpec.forStrategy( + "Op column not found in schema", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("nonexistent"))) + .expectErrorMessage("The op column 'nonexistent' does not exist"), + + // Error: op column is not STRING + TestSpec.forStrategy("Op 
column wrong type", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("op", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING())), + DESCRIPTOR_TYPE) + .calledWithTableSemanticsAt( + 0, + new TableSemanticsMock( + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("op", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING())))) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .expectErrorMessage("must be of STRING type"), + + // Error: multi-column descriptor + TestSpec.forStrategy( + "Descriptor with multiple columns", + FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("a", "b"))) + .expectErrorMessage("must contain exactly one column name"), + + // Error: invalid RowKind in op_mapping value + TestSpec.forStrategy( + "Invalid RowKind in mapping value", + FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt(2, Map.of("c", "INVALID_KIND")) + .expectErrorMessage("Unknown change operation: 'INVALID_KIND'"), + + // Error: duplicate RowKind across entries + TestSpec.forStrategy( + "Duplicate RowKind in mapping values", + FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt(2, Map.of("c", "INSERT", "r", "INSERT")) + .expectErrorMessage("Duplicate change operation: 'INSERT'")); + } +} diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index 56c9cb1262a48..0c6b7e8185103 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -40,6 +40,7 @@ import org.apache.flink.table.planner.utils.{JavaScalaConversionUtil, ShortcutUt import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig import org.apache.flink.table.runtime.operators.join.FlinkJoinType import org.apache.flink.table.types.inference.{StaticArgument, StaticArgumentTrait} +import org.apache.flink.table.types.inference.CallContext import org.apache.flink.types.RowKind import org.apache.calcite.linq4j.Ord @@ -1680,8 +1681,6 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti inputChangelogModes, outputChangelogMode) - // Expose a simplified context to let users focus on important characteristics. - // If necessary, we can expose the full CallContext in the future. 
new ChangelogContext { override def getTableChangelogMode(pos: Int): ChangelogMode = { val tableSemantics = callContext.getTableSemantics(pos).orElse(null) @@ -1694,6 +1693,10 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti override def getRequiredChangelogMode: ChangelogMode = { callContext.getOutputChangelogMode.orElse(null) } + + override def getArgumentValue[T](pos: Int, clazz: Class[T]): java.util.Optional[T] = { + callContext.getArgumentValue(pos, clazz) + } } } @@ -1705,17 +1708,21 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti defaultTraitSet: T): T = { val call = process.getCall val definition = ShortcutUtils.unwrapFunctionDefinition(call) - definition match { - case changelogFunction: ChangelogFunction => - val inputChangelogModes = children.map(toChangelogMode(_, None, None)) + val inputChangelogModes = children.map(toChangelogMode(_, None, None)) + val changelogModeOpt: Option[ChangelogMode] = definition match { + case cf: ChangelogFunction => val changelogContext = toPtfChangelogContext(process, inputChangelogModes, requiredChangelogMode) - val changelogMode = changelogFunction.getChangelogMode(changelogContext) + Some(cf.getChangelogMode(changelogContext)) + case _ => None + } + changelogModeOpt match { + case Some(changelogMode) => if (!changelogMode.containsOnly(RowKind.INSERT)) { verifyPtfTableArgsForUpdates(call) } toTraitSet(changelogMode) - case _ => + case None => defaultTraitSet } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java new file mode 100644 index 0000000000000..d4846230532c6 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java @@ -0,0 
+1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.List; + +/** Semantic tests for the built-in FROM_CHANGELOG process table function. 
*/ +public class FromChangelogSemanticTests extends SemanticTestBase { + + @Override + protected void applyDefaultEnvironmentOptions(TableConfig config) { + super.applyDefaultEnvironmentOptions(config); + config.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY, + OptimizerConfigOptions.NonDeterministicUpdateStrategy.IGNORE); + } + + @Override + public List programs() { + return List.of( + FromChangelogTestPrograms.DEFAULT_OP_MAPPING, + FromChangelogTestPrograms.DEBEZIUM_MAPPING, + FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED, + FromChangelogTestPrograms.TABLE_API_DEFAULT, + FromChangelogTestPrograms.MISSING_PARTITION_BY); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java new file mode 100644 index 0000000000000..4f12d4e174368 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import static org.apache.flink.table.api.Expressions.$; + +/** {@link TableTestProgram} definitions for testing the built-in FROM_CHANGELOG PTF. */ +public class FromChangelogTestPrograms { + + private static final SourceTestStep SIMPLE_CDC_SOURCE = + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues(Row.of(1, "INSERT", "Alice")) + .build(); + + // -------------------------------------------------------------------------------------------- + // SQL tests + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram DEFAULT_OP_MAPPING = + TableTestProgram.of( + "from-changelog-default-op-mapping", + "default mapping with standard op names") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob"), + Row.of(1, "UPDATE_BEFORE", "Alice"), + Row.of(1, "UPDATE_AFTER", "Alice2"), + Row.of(2, "DELETE", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream PARTITION BY id)") + 
.build(); + + public static final TableTestProgram DEBEZIUM_MAPPING = + TableTestProgram.of( + "from-changelog-debezium-mapping", + "Debezium-style c/r/u/d codes via custom op_mapping") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "__op STRING", "name STRING") + .producedValues( + Row.of(1, "c", "Alice"), + Row.of(2, "r", "Bob"), + Row.of(1, "u", "Alice2"), + Row.of(2, "d", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream PARTITION BY id, " + + "op => DESCRIPTOR(__op), " + + "op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE'])") + .build(); + + public static final TableTestProgram UNMAPPED_CODES_DROPPED = + TableTestProgram.of( + "from-changelog-unmapped-codes-dropped", + "unmapped op codes are silently dropped") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob"), + Row.of(1, "UNKNOWN", "Alice2"), + Row.of(2, "DELETE", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream PARTITION BY id)") + .build(); + + // -------------------------------------------------------------------------------------------- + // Table API test 
+ // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram TABLE_API_DEFAULT = + TableTestProgram.of( + "from-changelog-table-api-default", + "PartitionedTable.fromChangelog() convenience method") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob")) + .build()) + .runTableApi( + env -> env.from("cdc_stream").partitionBy($("id")).fromChangelog(), + "sink") + .build(); + + // -------------------------------------------------------------------------------------------- + // Error validation tests + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram MISSING_PARTITION_BY = + TableTestProgram.of( + "from-changelog-missing-partition-by", + "fails when PARTITION BY is missing") + .setupTableSource(SIMPLE_CDC_SOURCE) + .runFailingSql( + "SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream)", + ValidationException.class, + "Table argument 'input' requires a PARTITION BY clause for parallel processing.") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java new file mode 100644 index 0000000000000..e15da07400d0d --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.stream.sql; + +import org.apache.flink.table.api.ExplainDetail; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.utils.TableTestBase; +import org.apache.flink.table.planner.utils.TableTestUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +/** + * Plan tests for the FROM_CHANGELOG built-in process table function. Uses {@link + * ExplainDetail#CHANGELOG_MODE} to verify changelog mode propagation through the plan. 
+ */ +public class FromChangelogTest extends TableTestBase { + + private static final java.util.List CHANGELOG_MODE = + Collections.singletonList(ExplainDetail.CHANGELOG_MODE); + + private TableTestUtil util; + + @BeforeEach + void setup() { + util = streamTestUtil(TableConfig.getDefault()); + } + + @Test + void testInsertOnlySource() { + util.tableEnv() + .executeSql( + "CREATE TABLE cdc_stream (" + + " id INT," + + " op STRING," + + " name STRING" + + ") WITH ('connector' = 'values')"); + util.verifyRelPlan( + "SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream PARTITION BY id)", + CHANGELOG_MODE); + } + + @Test + void testDebeziumMapping() { + util.tableEnv() + .executeSql( + "CREATE TABLE cdc_stream (" + + " id INT," + + " __op STRING," + + " name STRING" + + ") WITH ('connector' = 'values')"); + util.verifyRelPlan( + "SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream PARTITION BY id, " + + "op => DESCRIPTOR(__op), " + + "op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE'])", + CHANGELOG_MODE); + } +} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml new file mode 100644 index 0000000000000..3925730071f43 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml @@ -0,0 +1,59 @@ + + + + + + TABLE cdc_stream PARTITION BY id, op => DESCRIPTOR(__op), op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE'])]]> + + + + + + + + + + + TABLE cdc_stream PARTITION BY id)]]> + + + + + + + + + diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java 
new file mode 100644 index 0000000000000..8fbbe84a933d7 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.functions.ptf; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.utils.ProjectedRowData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.table.functions.TableSemantics; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.ColumnList; +import org.apache.flink.types.RowKind; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Runtime implementation of {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}. + * + *

Converts each append-only input row (which contains an operation code column) back into a + * changelog stream with proper {@link RowKind} annotations. The output schema excludes the + * operation code column and partition key columns (which are prepended by the framework + * automatically). + * + *

This is the reverse operation of {@link ToChangelogFunction}. + */ +@Internal +public class FromChangelogFunction extends BuiltInProcessTableFunction { + + private static final long serialVersionUID = 1L; + + private static final Map DEFAULT_OP_MAPPING = + Map.of( + "INSERT", RowKind.INSERT, + "UPDATE_BEFORE", RowKind.UPDATE_BEFORE, + "UPDATE_AFTER", RowKind.UPDATE_AFTER, + "DELETE", RowKind.DELETE); + + private final Map rawOpMap; + private final int opColumnIndex; + private final int[] outputIndices; + + private transient HashMap opMap; + private transient ProjectedRowData projectedOutput; + + public FromChangelogFunction(final SpecializedContext context) { + super(BuiltInFunctionDefinitions.FROM_CHANGELOG, context); + final CallContext callContext = context.getCallContext(); + + final TableSemantics tableSemantics = + callContext + .getTableSemantics(0) + .orElseThrow(() -> new IllegalStateException("Table argument expected.")); + final int[] partitionKeys = tableSemantics.partitionByColumns(); + final Set partitionKeySet = + IntStream.of(partitionKeys).boxed().collect(Collectors.toSet()); + + final RowType inputType = (RowType) tableSemantics.dataType().getLogicalType(); + final String opColumnName = resolveOpColumnName(callContext); + final List fieldNames = inputType.getFieldNames(); + this.opColumnIndex = fieldNames.indexOf(opColumnName); + + this.outputIndices = + IntStream.range(0, inputType.getFieldCount()) + .filter(i -> !partitionKeySet.contains(i) && i != opColumnIndex) + .toArray(); + + this.rawOpMap = buildOpMap(callContext); + } + + @Override + public void open(final FunctionContext context) throws Exception { + super.open(context); + opMap = new HashMap<>(); + rawOpMap.forEach((code, kind) -> opMap.put(StringData.fromString(code), kind)); + projectedOutput = ProjectedRowData.from(outputIndices); + } + + private static String resolveOpColumnName(final CallContext callContext) { + return callContext + .getArgumentValue(1, ColumnList.class) + .map(cl 
-> cl.getNames().get(0)) + .orElse("op"); + } + + /** + * Builds a String-to-RowKind map. Keys in the provided mapping may be comma-separated (e.g., + * "INSERT, UPDATE_AFTER") to map multiple input codes to the same RowKind. + */ + private static Map buildOpMap(CallContext callContext) { + return callContext + .getArgumentValue(2, Map.class) + .map(FromChangelogFunction::parseOpMapping) + .orElse(DEFAULT_OP_MAPPING); + } + + private static Map parseOpMapping(Map opMapping) { + return opMapping.entrySet().stream() + .flatMap( + e -> { + final RowKind kind = RowKind.valueOf(e.getValue().trim()); + return Arrays.stream(e.getKey().split(",")) + .map(code -> Map.entry(code.trim(), kind)); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + public void eval( + final Context ctx, + final RowData input, + @Nullable final ColumnList op, + @Nullable final MapData opMapping) { + final StringData opCode = input.getString(opColumnIndex); + final RowKind rowKind = opMap.get(opCode); + if (rowKind == null) { + return; + } + + projectedOutput.replaceRow(input); + projectedOutput.setRowKind(rowKind); + collect(projectedOutput); + } +}