114 changes: 113 additions & 1 deletion docs/content/docs/sql/reference/queries/changelog.md
@@ -30,9 +30,116 @@ Flink SQL provides built-in process table functions (PTFs) for working with changelogs

| Function | Description |
|:---------|:------------|
| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with operation codes into a dynamic table |
| [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only table with explicit operation codes |

<!-- Placeholder for future FROM_CHANGELOG function -->
## FROM_CHANGELOG

The `FROM_CHANGELOG` PTF converts an append-only table with an explicit operation code column into a dynamic table (i.e., an updating table). Each input row is expected to have a string column that indicates the change operation. The `op` column is removed from the output, and the row is emitted with the corresponding change operation.

This is useful when consuming Change Data Capture (CDC) streams from systems like Debezium, Maxwell, or Canal, where events arrive as flat append-only records with an explicit operation field. It can also be combined with the `TO_CHANGELOG` function to turn an append-only table back into an updating table after applying transformations to the events.

### Syntax

```sql
SELECT * FROM FROM_CHANGELOG(
  input => TABLE source_table PARTITION BY key_col,
  [op => DESCRIPTOR(op_column_name),]
  [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
)
```

Review comments on this hunk:

- **Contributor:** We're moving to row semantics as the first default version - no partition by. I'm finishing a new PR that should help you see the changes that you'll have to make.
- **Contributor (@gustavodemorais, Apr 10, 2026):** Here is the PR #27911
- **Contributor:** The invalid_op_handling param would already be relevant here, but we can add it in a follow-up PR.
- **Contributor (Author):** Good point and good idea, let's not let this PR grow too big.

### Parameters

| Parameter | Required | Description |
|:-------------|:---------|:------------|
| `input` | Yes | The input table. Must be append-only and include `PARTITION BY` for parallel execution. |
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. |
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined operation codes to change operation names. Keys are user codes (e.g., `'c'`, `'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded - unmapped codes are dropped. Each change operation may appear at most once across all entries. |
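The comma-separated keys described above can be thought of as expanding into one entry per code, with a check that each change operation appears at most once. A minimal self-contained sketch of that expansion (a hypothetical helper, not part of the PR's implementation):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class OpMappingExpander {

    /**
     * Expands keys like "c, r" into per-code entries and rejects mappings where
     * the same change operation appears as a value more than once.
     */
    public static Map<String, String> expand(Map<String, String> opMapping) {
        Map<String, String> expanded = new HashMap<>();
        Set<String> seenOps = new HashSet<>();
        for (Map.Entry<String, String> e : opMapping.entrySet()) {
            if (!seenOps.add(e.getValue())) {
                throw new IllegalArgumentException(
                        "Change operation mapped more than once: " + e.getValue());
            }
            // "c, r" -> two entries: "c" and "r", both pointing at the same operation
            for (String code : e.getKey().split(",")) {
                expanded.put(code.trim(), e.getValue());
            }
        }
        return expanded;
    }
}
```

For example, `MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER']` would expand to three code entries, while mapping two different keys to `INSERT` would be rejected.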

#### Default op_mapping

When `op_mapping` is omitted, the following standard names are used:

| Input code | Change operation |
|:-------------------|:------------------|
| `'INSERT'` | INSERT |
| `'UPDATE_BEFORE'` | UPDATE_BEFORE |
| `'UPDATE_AFTER'` | UPDATE_AFTER |
| `'DELETE'` | DELETE |

Review comments on this hunk:

- **Contributor:** We're missing the UPDATE_BEFORE -> UPDATE_BEFORE entry. We're actually not going to drop UPDATE_BEFORE in the default implementation; we want a simple flat mapping as the default behavior.
- **Contributor (Author):** Added a dynamic way to detect the changelog mode based on the op_mapping:
  - no op_mapping (default) -> retract
  - op_mapping not containing UPDATE_BEFORE -> upsert
  - op_mapping containing UPDATE_BEFORE -> retract
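The changelog-mode rule described in the review discussion above can be sketched as a small pure function. This is a self-contained illustration with stand-in types, not Flink's actual `ChangelogMode` API:

```java
import java.util.Map;
import java.util.Optional;

public class ChangelogModeRule {

    public enum Mode { RETRACT, UPSERT }

    /**
     * Rule from the review discussion:
     *   no op_mapping (default)               -> RETRACT
     *   op_mapping without UPDATE_BEFORE      -> UPSERT
     *   op_mapping containing UPDATE_BEFORE   -> RETRACT
     */
    public static Mode resolve(Optional<Map<String, String>> opMapping) {
        return opMapping
                .filter(m -> !m.containsValue("UPDATE_BEFORE"))
                .map(m -> Mode.UPSERT)
                .orElse(Mode.RETRACT);
    }
}
```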

### Output Schema

The output columns are ordered as:

```
[partition_key_columns, remaining_columns_without_op]
```

The op column is removed from the output. Output rows carry the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).
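Outside of Flink, this per-row behavior can be modeled as: look up the operation code, drop the op field, and attach the change operation. A simplified stand-in with hypothetical types, assuming the default mapping (with an `op_mapping`, unmapped codes would be dropped):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class FromChangelogModel {

    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Default flat mapping of standard operation names
    static final Map<String, RowKind> DEFAULT_MAPPING = Map.of(
            "INSERT", RowKind.INSERT,
            "UPDATE_BEFORE", RowKind.UPDATE_BEFORE,
            "UPDATE_AFTER", RowKind.UPDATE_AFTER,
            "DELETE", RowKind.DELETE);

    public record ChangeRow(RowKind kind, Map<String, Object> fields) {}

    /** Returns empty if the code is unmapped (the row is dropped). */
    public static Optional<ChangeRow> convert(Map<String, ?> row, String opColumn) {
        RowKind kind = DEFAULT_MAPPING.get((String) row.get(opColumn));
        if (kind == null) {
            return Optional.empty();
        }
        Map<String, Object> fields = new LinkedHashMap<>(row);
        fields.remove(opColumn); // the op column is removed from the output
        return Optional.of(new ChangeRow(kind, fields));
    }
}
```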

### Examples

#### Basic usage with standard op names

```sql
-- Input (append-only):
-- +I[id:1, op:'INSERT', name:'Alice']
-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
-- +I[id:2, op:'DELETE', name:'Bob']

SELECT * FROM FROM_CHANGELOG(
input => TABLE cdc_stream PARTITION BY id
)

-- Output (updating table):
-- +I[id:1, name:'Alice']
-- -U[id:1, name:'Alice']
-- +U[id:1, name:'Alice2']
-- -D[id:2, name:'Bob']

-- Table state after all events:
-- | id | name |
-- |----|--------|
-- | 1 | Alice2 |
```

#### Debezium-style CDC codes

```sql
SELECT * FROM FROM_CHANGELOG(
input => TABLE cdc_stream PARTITION BY id,
op => DESCRIPTOR(__op),
op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']
)
-- 'c' (create) and 'r' (read/snapshot) both produce INSERT
-- 'u' produces UPDATE_AFTER
-- 'd' produces DELETE
```

#### Custom operation column name

```sql
SELECT * FROM FROM_CHANGELOG(
input => TABLE cdc_stream PARTITION BY id,
op => DESCRIPTOR(operation)
)
-- The operation column named 'operation' is used instead of 'op'
```

#### Table API

```java
// Default: reads the 'op' column with standard change operation names
Table result = cdcStream.partitionBy($("id")).fromChangelog();

// Debezium-style mapping
Table mapped = cdcStream.partitionBy($("id")).fromChangelog(
        descriptor("__op").asArgument("op"),
        map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping"));
```

## TO_CHANGELOG

@@ -88,6 +195,11 @@ All output rows have `INSERT` - the table is always append-only.
-- +I[id:1, name:'Alice', cnt:1]
-- +U[id:1, name:'Alice', cnt:2]
-- -D[id:2, name:'Bob', cnt:1]
--
-- Table state after all events:
-- | id | name | cnt |
-- |----|-------|-----|
-- | 1 | Alice | 2 |

SELECT * FROM TO_CHANGELOG(
input => TABLE my_aggregation PARTITION BY id
@@ -172,4 +172,30 @@ public interface PartitionedTable {
* columns
*/
Table toChangelog(Expression... arguments);

/**
* Converts this append-only table with an explicit operation code column into a dynamic table
* using the built-in {@code FROM_CHANGELOG} process table function.
*
* <p>Each input row is expected to have a string operation code column (default: {@code "op"})
* that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The
* output table is a dynamic table backed by a changelog stream.
*
* <p>Optional arguments can be passed using named expressions:
*
* <pre>{@code
* // Default: reads 'op' column with standard change operation names
* table.partitionBy($("id")).fromChangelog();
*
* // Custom op column name and mapping (Debezium-style codes)
* table.partitionBy($("id")).fromChangelog(
* descriptor("__op").asArgument("op"),
* map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
* );
* }</pre>
*
* @param arguments optional named arguments for {@code op} and {@code op_mapping}
* @return a dynamic {@link Table} with the op column removed and proper RowKind semantics
*/
Table fromChangelog(Expression... arguments);
}
@@ -906,6 +906,12 @@ public Table toChangelog(Expression... arguments) {
return process(BuiltInFunctionDefinitions.TO_CHANGELOG.getName(), (Object[]) arguments);
}

@Override
public Table fromChangelog(Expression... arguments) {
return process(
BuiltInFunctionDefinitions.FROM_CHANGELOG.getName(), (Object[]) arguments);
}

private QueryOperation createPartitionQueryOperation() {
return table.operationTreeBuilder.partition(partitionKeys, table.operationTree);
}
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.StaticArgument;
@@ -33,6 +34,7 @@
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;

import static org.apache.flink.util.Preconditions.checkArgument;
@@ -52,7 +54,7 @@
* <p>Equality is defined by reference equality.
*/
@Internal
public final class BuiltInFunctionDefinition implements SpecializedFunction {
public final class BuiltInFunctionDefinition implements SpecializedFunction, ChangelogFunction {

private final String name;

@@ -72,6 +74,9 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction {

private final SqlCallSyntax sqlCallSyntax;

private final @Nullable Function<ChangelogFunction.ChangelogContext, ChangelogMode>
        changelogModeResolver;

// Review suggestion (Contributor): import the nested type directly and shorten to:
//     private final @Nullable Function<ChangelogContext, ChangelogMode> changelogModeResolver;

private final String sqlName;

private BuiltInFunctionDefinition(
@@ -84,7 +89,10 @@ private BuiltInFunctionDefinition(
boolean isDeterministic,
boolean isRuntimeProvided,
String runtimeClass,
boolean isInternal) {
boolean isInternal,
@Nullable
Function<ChangelogFunction.ChangelogContext, ChangelogMode>
changelogModeResolver) {
this.name = checkNotNull(name, "Name must not be null.");
this.sqlName = sqlName;
this.version = isInternal ? null : version;
@@ -95,6 +103,7 @@
this.runtimeClass = runtimeClass;
this.isInternal = isInternal;
this.sqlCallSyntax = sqlCallSyntax;
this.changelogModeResolver = changelogModeResolver;
validateFunction(this.name, this.version, this.isInternal);
}

@@ -131,6 +140,16 @@ public boolean isInternal() {
return isInternal;
}

/**
 * Returns the optional changelog mode resolver for declaring the output changelog mode of a
 * built-in process table function (e.g., FROM_CHANGELOG). The resolver receives the {@link
 * ChangelogFunction.ChangelogContext} so it can inspect function arguments and dynamically
 * determine the changelog mode.
 */
public Optional<Function<ChangelogFunction.ChangelogContext, ChangelogMode>>
getChangelogModeResolver() {
return Optional.ofNullable(changelogModeResolver);
}

public String getQualifiedName() {
if (isInternal) {
return name;
@@ -225,6 +244,14 @@ public static String qualifyFunctionName(String name, int version) {
return String.format(INTERNAL_NAME_FORMAT, name.toUpperCase(Locale.ROOT), version);
}

@Override
public ChangelogMode getChangelogMode(final ChangelogContext changelogContext) {
if (changelogModeResolver != null) {
return changelogModeResolver.apply(changelogContext);
}
return ChangelogMode.insertOnly();
}

// --------------------------------------------------------------------------------------------
// Builder
// --------------------------------------------------------------------------------------------
@@ -253,6 +280,9 @@ public static final class Builder {

private SqlCallSyntax sqlCallSyntax = SqlCallSyntax.FUNCTION;

private @Nullable Function<ChangelogFunction.ChangelogContext, ChangelogMode>
changelogModeResolver;

public Builder() {
// default constructor to allow a fluent definition
}
@@ -399,6 +429,18 @@ public Builder sqlName(String name) {
return this;
}

/**
 * Sets a resolver that dynamically determines the output {@link ChangelogMode} for this
 * built-in PTF. The resolver receives the {@link ChangelogFunction.ChangelogContext} and can
 * inspect function arguments (e.g., {@code op_mapping}) to adapt the changelog mode. Only
 * needed for PTFs that emit updates (e.g., FROM_CHANGELOG).
 */
public Builder changelogModeResolver(
Function<ChangelogFunction.ChangelogContext, ChangelogMode> changelogModeResolver) {
this.changelogModeResolver = changelogModeResolver;
return this;
}

public BuiltInFunctionDefinition build() {
return new BuiltInFunctionDefinition(
name,
@@ -410,7 +452,8 @@ public BuiltInFunctionDefinition build() {
isDeterministic,
isRuntimeProvided,
runtimeClass,
isInternal);
isInternal,
changelogModeResolver);
}
}
}
@@ -37,6 +37,7 @@
import org.apache.flink.table.types.inference.StaticArgumentTrait;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.inference.strategies.ArrayOfStringArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy;
import org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
import org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
@@ -106,6 +107,7 @@
import static org.apache.flink.table.types.inference.TypeStrategies.varyingString;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_ELEMENT_ARG;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.FROM_CHANGELOG_INPUT_TYPE_STRATEGY;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.INDEX;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ML_PREDICT_INPUT_TYPE_STRATEGY;
@@ -115,6 +117,7 @@
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentage;
import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentageArray;
import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ARRAY_APPEND_PREPEND;
import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY;
import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ML_PREDICT_OUTPUT_TYPE_STRATEGY;
import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.TO_CHANGELOG_OUTPUT_TYPE_STRATEGY;

@@ -802,6 +805,30 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"org.apache.flink.table.runtime.functions.ptf.ToChangelogFunction")
.build();

public static final BuiltInFunctionDefinition FROM_CHANGELOG =
BuiltInFunctionDefinition.newBuilder()
.name("FROM_CHANGELOG")
.kind(PROCESS_TABLE)
.staticArguments(
StaticArgument.table(
"input",
Row.class,
false,
EnumSet.of(
StaticArgumentTrait.TABLE,
StaticArgumentTrait.SET_SEMANTIC_TABLE)),
StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
StaticArgument.scalar(
"op_mapping",
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
true))
.inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
.outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY)
.changelogModeResolver(FromChangelogTypeStrategy::resolveChangelogMode)
.runtimeClass(
"org.apache.flink.table.runtime.functions.ptf.FromChangelogFunction")
.build();

public static final BuiltInFunctionDefinition GREATEST =
BuiltInFunctionDefinition.newBuilder()
.name("GREATEST")
@@ -23,6 +23,8 @@
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

import java.util.Optional;

/**
* An extension that allows a process table function (PTF) to emit results with changelog semantics.
*
@@ -105,5 +107,14 @@ interface ChangelogContext {
* are required and {@link ChangelogMode#keyOnlyDeletes()} are supported.
*/
ChangelogMode getRequiredChangelogMode();

/**
* Returns the value of a scalar argument at the given position.
*
* @param pos the argument position
* @param clazz the expected class of the argument value
* @return the argument value, or empty if the argument is null or not available
*/
<T> Optional<T> getArgumentValue(int pos, Class<T> clazz);
}
}
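The contract of the new `getArgumentValue` accessor (empty on a null value, a missing position, or a type mismatch) can be illustrated with a self-contained stand-in. The `Args` class below is hypothetical, not Flink's implementation:

```java
import java.util.Optional;

public class ArgumentAccess {

    /** Stand-in for ChangelogFunction.ChangelogContext#getArgumentValue. */
    public static class Args {
        private final Object[] values;

        public Args(Object... values) {
            this.values = values;
        }

        // Empty if the position is out of range, the value is null,
        // or the value is not an instance of the requested class.
        public <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
            if (pos < 0 || pos >= values.length) {
                return Optional.empty();
            }
            return Optional.ofNullable(values[pos])
                    .filter(clazz::isInstance)
                    .map(clazz::cast);
        }
    }
}
```

A resolver could use such an accessor to read the `op_mapping` argument and derive the changelog mode from its contents.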