# [FLINK-39261][table] Add FROM_CHANGELOG built-in process table function #27901
@@ -30,9 +30,116 @@ Flink SQL provides built-in process table functions (PTFs) for working with chan
| Function | Description |
|:---------|:------------|
| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with operation codes into a dynamic table |
| [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only table with explicit operation codes |

## FROM_CHANGELOG

The `FROM_CHANGELOG` PTF converts an append-only table with an explicit operation code column into a dynamic (i.e., updating) table. Each input row is expected to carry a string column that indicates the change operation. The op column is removed from the output, and the row is emitted with the corresponding change operation.

This is useful when consuming Change Data Capture (CDC) streams from systems such as Debezium, Maxwell, or Canal, where events arrive as flat append-only records with an explicit operation field. It is also useful in combination with the `TO_CHANGELOG` function, when the user wants to turn an append-only table back into an updating table after applying transformations to the individual events.

### Syntax
```sql
SELECT * FROM FROM_CHANGELOG(
  input => TABLE source_table PARTITION BY key_col,
  [op => DESCRIPTOR(op_column_name),]
  [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
)
```

> **Author:** Good point and good idea; let's keep this PR from growing too big.
### Parameters

| Parameter | Required | Description |
|:-------------|:---------|:------------|
| `input` | Yes | The input table. Must be append-only and include `PARTITION BY` for parallel execution. |
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type `STRING`. |
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined operation codes to change operation names. Keys are user codes (e.g., `'c'`, `'u'`, `'d'`); values are change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). A key can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded; unmapped codes are dropped. Each change operation may appear at most once across all entries. |
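The comma-separated keys described above need to be expanded before lookup. The following is a minimal, hypothetical sketch of that expansion (class and method names are illustrative, not Flink's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: expanding op_mapping keys such as "c, r" into a flat
// per-code lookup table, rejecting duplicate codes. Not the actual Flink code.
public class OpMappingSketch {

    public static Map<String, String> flatten(Map<String, String> opMapping) {
        Map<String, String> flat = new HashMap<>();
        for (Map.Entry<String, String> entry : opMapping.entrySet()) {
            for (String code : entry.getKey().split(",")) {
                String trimmed = code.trim();
                // Each code may map to at most one change operation.
                if (flat.put(trimmed, entry.getValue()) != null) {
                    throw new IllegalArgumentException("Duplicate op code: " + trimmed);
                }
            }
        }
        return flat;
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new HashMap<>();
        mapping.put("c, r", "INSERT");
        mapping.put("u", "UPDATE_AFTER");
        mapping.put("d", "DELETE");
        System.out.println(flatten(mapping));
    }
}
```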
#### Default op_mapping

When `op_mapping` is omitted, the following standard names are used:

| Input code | Change operation |
|:-------------------|:------------------|
| `'INSERT'` | INSERT |
| `'UPDATE_BEFORE'` | UPDATE_BEFORE |
| `'UPDATE_AFTER'` | UPDATE_AFTER |
| `'DELETE'` | DELETE |

> **Contributor:** We're missing the `UPDATE_BEFORE` -> `UPDATE_BEFORE` entry. We're actually not going to drop `UPDATE_BEFORE` in the default implementation; we want a simple flat mapping as the default behavior.
>
> **Author:** Added a dynamic way to detect the changelog mode based on the `op_mapping`.
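Because only mapped codes are forwarded, the set of change operations the PTF can emit follows directly from the values of `op_mapping`. A minimal sketch of that derivation, using a local enum as a stand-in for Flink's `RowKind`/`ChangelogMode` (this is not the actual FROM_CHANGELOG implementation):

```java
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch of deriving the emitted change operations from the
// values of op_mapping. The Op enum is a simplified stand-in for Flink types.
public class ChangelogModeSketch {

    public enum Op { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    public static Set<Op> deriveMode(Collection<String> mappedOperations) {
        Set<Op> mode = EnumSet.noneOf(Op.class);
        for (String name : mappedOperations) {
            mode.add(Op.valueOf(name));
        }
        return mode;
    }

    public static void main(String[] args) {
        // A Debezium-style mapping never emits UPDATE_BEFORE, so the derived
        // mode is upsert-like rather than full retract.
        Set<Op> mode = deriveMode(List.of("INSERT", "UPDATE_AFTER", "DELETE"));
        System.out.println(mode.contains(Op.UPDATE_BEFORE));
    }
}
```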
### Output Schema

The output columns are ordered as:

```
[partition_key_columns, remaining_columns_without_op]
```

The op column is removed from the output. Output rows carry the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).
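The column ordering above can be sketched as a simple derivation over the input column list. Names are illustrative; this is not Flink's actual schema-derivation code:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the documented output column order: partition key columns first,
// followed by the remaining columns with the op column removed.
public class OutputSchemaSketch {

    public static List<String> outputColumns(
            List<String> inputColumns, List<String> partitionKeys, String opColumn) {
        List<String> out = new ArrayList<>(partitionKeys);
        for (String column : inputColumns) {
            // Skip partition keys (already emitted first) and the op column.
            if (!partitionKeys.contains(column) && !column.equals(opColumn)) {
                out.add(column);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(outputColumns(List.of("id", "op", "name"), List.of("id"), "op"));
    }
}
```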
### Examples

#### Basic usage with standard op names

```sql
-- Input (append-only):
-- +I[id:1, op:'INSERT', name:'Alice']
-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
-- +I[id:2, op:'DELETE', name:'Bob']

SELECT * FROM FROM_CHANGELOG(
  input => TABLE cdc_stream PARTITION BY id
)

-- Output (updating table):
-- +I[id:1, name:'Alice']
-- -U[id:1, name:'Alice']
-- +U[id:1, name:'Alice2']
-- -D[id:2, name:'Bob']

-- Table state after all events:
-- | id | name   |
-- |----|--------|
-- | 1  | Alice2 |
```
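How the emitted change operations materialize into the final table state shown above can be sketched with a small keyed upsert loop. This is purely illustrative and not the Flink runtime; in particular, it treats `-U` as superseded by the following `+U` rather than retracting explicitly:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: replaying change operations into a keyed state map.
public class MaterializeSketch {

    // Each event is {kind, id, name}, e.g. {"+I", "1", "Alice"}.
    public static Map<Integer, String> apply(String[][] events) {
        Map<Integer, String> state = new LinkedHashMap<>();
        for (String[] event : events) {
            int id = Integer.parseInt(event[1]);
            switch (event[0]) {
                case "+I":
                case "+U":
                    state.put(id, event[2]); // insert / update-after upserts the row
                    break;
                case "-U":
                    break; // update-before: superseded by the following +U here
                case "-D":
                    state.remove(id); // delete removes the row
                    break;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Map<Integer, String> state = apply(new String[][] {
            {"+I", "1", "Alice"}, {"-U", "1", "Alice"},
            {"+U", "1", "Alice2"}, {"-D", "2", "Bob"}
        });
        System.out.println(state); // only id 1 with name 'Alice2' remains
    }
}
```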
#### Debezium-style CDC codes

```sql
SELECT * FROM FROM_CHANGELOG(
  input => TABLE cdc_stream PARTITION BY id,
  op => DESCRIPTOR(__op),
  op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']
)
-- 'c' (create) and 'r' (read/snapshot) both produce INSERT
-- 'u' produces UPDATE_AFTER
-- 'd' produces DELETE
```
#### Custom operation column name

```sql
SELECT * FROM FROM_CHANGELOG(
  input => TABLE cdc_stream PARTITION BY id,
  op => DESCRIPTOR(operation)
)
-- The operation column named 'operation' is used instead of 'op'
```
#### Table API

```java
// Default: reads the 'op' column with standard change operation names
Table result = cdcStream.partitionBy($("id")).fromChangelog();

// Debezium-style mapping
Table result = cdcStream.partitionBy($("id")).fromChangelog(
    descriptor("__op").asArgument("op"),
    map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
);
```
## TO_CHANGELOG

@@ -88,6 +195,11 @@ All output rows have `INSERT` - the table is always append-only.

```sql
-- +I[id:1, name:'Alice', cnt:1]
-- +U[id:1, name:'Alice', cnt:2]
-- -D[id:2, name:'Bob', cnt:1]
--
-- Table state after all events:
-- | id | name  | cnt |
-- |----|-------|-----|
-- | 1  | Alice | 2   |

SELECT * FROM TO_CHANGELOG(
  input => TABLE my_aggregation PARTITION BY id
```
@@ -21,6 +21,7 @@

```java
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.StaticArgument;
```
@@ -33,6 +34,7 @@

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;

import static org.apache.flink.util.Preconditions.checkArgument;
```
@@ -52,7 +54,7 @@

```java
 * <p>Equality is defined by reference equality.
 */
@Internal
public final class BuiltInFunctionDefinition implements SpecializedFunction, ChangelogFunction {

    private final String name;
```
@@ -72,6 +74,9 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction {

```java
    private final SqlCallSyntax sqlCallSyntax;

    private final @Nullable Function<ChangelogFunction.ChangelogContext, ChangelogMode>
            changelogModeResolver;

    private final String sqlName;

    private BuiltInFunctionDefinition(
```
@@ -84,7 +89,10 @@ private BuiltInFunctionDefinition(

```java
            boolean isDeterministic,
            boolean isRuntimeProvided,
            String runtimeClass,
            boolean isInternal,
            @Nullable
                    Function<ChangelogFunction.ChangelogContext, ChangelogMode>
                            changelogModeResolver) {
        this.name = checkNotNull(name, "Name must not be null.");
        this.sqlName = sqlName;
        this.version = isInternal ? null : version;
```
@@ -95,6 +103,7 @@ private BuiltInFunctionDefinition(

```java
        this.runtimeClass = runtimeClass;
        this.isInternal = isInternal;
        this.sqlCallSyntax = sqlCallSyntax;
        this.changelogModeResolver = changelogModeResolver;
        validateFunction(this.name, this.version, this.isInternal);
    }
```
@@ -131,6 +140,16 @@ public boolean isInternal() {

```java
        return isInternal;
    }

    /**
     * Returns the optional changelog mode resolver for declaring the output changelog mode of a
     * built-in process table function (e.g., FROM_CHANGELOG). Receives the {@link CallContext} so
     * it can inspect function arguments to dynamically determine the changelog mode.
     */
    public Optional<Function<ChangelogFunction.ChangelogContext, ChangelogMode>>
            getChangelogModeResolver() {
        return Optional.ofNullable(changelogModeResolver);
    }

    public String getQualifiedName() {
        if (isInternal) {
            return name;
```
@@ -225,6 +244,14 @@ public static String qualifyFunctionName(String name, int version) {

```java
        return String.format(INTERNAL_NAME_FORMAT, name.toUpperCase(Locale.ROOT), version);
    }

    @Override
    public ChangelogMode getChangelogMode(final ChangelogContext changelogContext) {
        if (changelogModeResolver != null) {
            return changelogModeResolver.apply(changelogContext);
        }
        return ChangelogMode.insertOnly();
    }

    // --------------------------------------------------------------------------------------------
    // Builder
    // --------------------------------------------------------------------------------------------
```
@@ -253,6 +280,9 @@ public static final class Builder {

```java
    private SqlCallSyntax sqlCallSyntax = SqlCallSyntax.FUNCTION;

    private @Nullable Function<ChangelogFunction.ChangelogContext, ChangelogMode>
            changelogModeResolver;

    public Builder() {
        // default constructor to allow a fluent definition
    }
```
@@ -399,6 +429,18 @@ public Builder sqlName(String name) {

```java
        return this;
    }

    /**
     * Sets a resolver that dynamically determines the output {@link ChangelogMode} for this
     * built-in PTF. The resolver receives the {@link CallContext} and can inspect function
     * arguments (e.g., op_mapping) to adapt the changelog mode. Only needed for PTFs that emit
     * updates (e.g., FROM_CHANGELOG).
     */
    public Builder changelogModeResolver(
            Function<ChangelogFunction.ChangelogContext, ChangelogMode> changelogModeResolver) {
        this.changelogModeResolver = changelogModeResolver;
        return this;
    }

    public BuiltInFunctionDefinition build() {
        return new BuiltInFunctionDefinition(
                name,
```
@@ -410,7 +452,8 @@ public BuiltInFunctionDefinition build() {

```java
                isDeterministic,
                isRuntimeProvided,
                runtimeClass,
                isInternal,
                changelogModeResolver);
        }
    }
}
```
> **Contributor:** We're moving to row semantics as the first default version (no PARTITION BY). I'm finishing a new PR that should help you see the changes that you'll have to make.
>
> **Contributor:** Here is the PR: #27911