[FLINK-39636][table] Add produces_full_deletes parameter in TO_CHANGELOG#28235
raminqaf wants to merge 4 commits into
Adds an optional `produces_full_deletes` boolean parameter to the TO_CHANGELOG built-in PTF. When the caller asserts the input emits full DELETE rows, the function passes them through unchanged and the planner can skip ChangelogNormalize via the REQUIRE_FULL_DELETE conditional trait. Otherwise the function emits partial DELETE rows that preserve identifying columns (partition keys via the framework, upsert keys via the function) and null the rest. Rejects produces_full_deletes=true when the input changelog never emits DELETE rows (insert-only mode or op_mapping strips DELETE).
…angelogTypeStrategy
Aligns the structure of ToChangelogTypeStrategy with its sibling FromChangelogTypeStrategy: introduces positional argument-index constants, switches the input type strategy to ValidationOnlyInputTypeStrategy so signature/argument-count come from the StaticArguments in the function definition, splits the monolithic validateInputs into per-argument helpers, and reuses ChangelogTypeStrategyUtils.resolveOpColumnName instead of the local duplicate.
Adds upsertKeyColumns() to TableSemantics, populated by the planner via FlinkRelMetadataQuery.getUpsertKeys. ProcessTableFunctions can now read the upsert key of each input table at specialization time without having to re-derive it from a RelNode or require the caller to repeat the key via PARTITION BY. Plumbs the value end-to-end: StreamPhysicalProcessTableFunction queries the metadata, StreamExecProcessTableFunction persists it as @JsonProperty inputUpsertKeys (one entry per table input) so compiled plans round-trip, and OperatorBindingCallContext / BridgingSqlFunction / ProcessTableRunnerGenerator thread it to the specialized function constructor.
…ert key
Consumes the new TableSemantics.upsertKeyColumns() in ToChangelogFunction to emit partial DELETE rows in row semantics when the input declares an upsert key. Identifying columns are preserved on the DELETE row, non-key columns are emitted as null. Without an upsert key the function still passes the input through unchanged.
[hotfix][table] Use generic Map<String, String> for ToChangelogTypeStrategy op_mapping
Drops raw Map types and the 'rawtypes' suppressions in validateOpMappingKeys and mapsDelete by passing the resolved op_mapping argument as Map<String, String>. Also extracts the literal 'DELETE' string into a constant derived from RowKind.DELETE.name().
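For readers following the partial-DELETE behavior described in the commit messages above, here is a minimal sketch of the projection, assuming rows are plain `Object[]` arrays and the upsert key is given as column indices. `PartialDeleteSketch` and its methods are hypothetical names for illustration only; they are not the PR's `ToChangelogFunction` code.

```java
/** Hypothetical illustration only; not part of Flink or of this PR. */
final class PartialDeleteSketch {

    /** Builds a mask that is true for every column index listed in upsertKeyColumns. */
    static boolean[] keyMask(int arity, int[] upsertKeyColumns) {
        boolean[] mask = new boolean[arity];
        for (int idx : upsertKeyColumns) {
            mask[idx] = true;
        }
        return mask;
    }

    /** Returns a copy of the row in which every non-key field is set to null. */
    static Object[] toPartialDelete(Object[] row, boolean[] keyMask) {
        Object[] out = new Object[row.length];
        for (int i = 0; i < row.length; i++) {
            out[i] = keyMask[i] ? row[i] : null;
        }
        return out;
    }

    /** Assumption: with no upsert key nothing can be nulled safely, so the row passes through. */
    static Object[] onDelete(Object[] row, int[] upsertKeyColumns) {
        if (upsertKeyColumns.length == 0) {
            return row;
        }
        return toPartialDelete(row, keyMask(row.length, upsertKeyColumns));
    }
}
```

Calling `PartialDeleteSketch.onDelete(new Object[] {5, "Alice", 42}, new int[] {0})` keeps column 0 and nulls the other two fields, while an empty key array returns the input row unchanged.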
| * we cannot null anything safely and fall back to passing the input through unchanged, which is | ||
| * equivalent to a full delete. | ||
| */ | ||
| private static boolean resolveProducesFullDelete( |
Does this resolve to producing partial deletes by default?
- I think we should probably go with full deletes as the default.
- If the user specifies partial deletes, we try to accommodate that. If we can't, we throw an error.
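The proposal in this comment could be sketched roughly as follows. This is only an illustration of the suggested semantics, assuming the argument arrives as a nullable `Boolean` and the upsert key as an index array; `DeleteModeSketch` and its method signature are hypothetical and do not mirror the PR's `resolveProducesFullDelete`.

```java
/** Hypothetical illustration of the reviewer's proposed default; not the PR's code. */
final class DeleteModeSketch {

    /**
     * Resolves whether full DELETE rows are produced.
     *
     * @param producesFullDeletesArg the user-supplied argument, or null when omitted
     * @param upsertKeyColumns indices of the upsert key columns, empty when none is derivable
     */
    static boolean resolveProducesFullDeletes(
            Boolean producesFullDeletesArg, int[] upsertKeyColumns) {
        // Omitted or true: full deletes, which is the proposed default.
        if (producesFullDeletesArg == null || producesFullDeletesArg) {
            return true;
        }
        // Explicitly false: partial deletes need an upsert key to preserve.
        if (upsertKeyColumns.length == 0) {
            throw new IllegalArgumentException(
                    "Partial deletes were requested, but the input has no upsert key.");
        }
        return false;
    }
}
```

Under these assumptions, an explicit request for partial deletes fails fast instead of silently falling back to pass-through.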
| private final int[] outputIndices; | ||
| private final RowType inputRowType; | ||
| private final boolean producesFullDelete; | ||
| private final boolean[] preserveOnDelete; |
Something like this probably makes more sense
| private final boolean[] preserveOnDelete; | |
| private final boolean[] upsertKeyIndex; |
Or `upsertKeysAtIndex`? Let me know if you have a better idea.
| validateOpMap(this.rawOpMap, tableSemantics); | ||
| } | ||
| final boolean producesFullDeletesArg = | ||
| callContext.getArgumentValue(3, Boolean.class).orElse(false); |
Didn't we add constants for these indexes?
| When the input has no derivable upsert key (e.g. a pure append-only source, or an upstream operator that erased the key), there is no identifying column to preserve. The function then passes the input through unchanged. | ||
|
|
||
| ```sql | ||
| -- Source emits -D[id:5] (key-only, no declared key). | ||
| -- Output: +I[op:'DELETE', id:5, name:null] | ||
| SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source) | ||
| ``` |
The example doesn't match what you want to show. You say the input passes through unchanged, but the example emits `name:null`.
|
|
||
| @Override | ||
| public int[] upsertKeyColumns() { | ||
| return new int[0]; |
Can't we run into an index-out-of-bounds exception here? Can we guarantee that int[0] is the shortest key?
What happens if the user has, for example, two primary keys defined for a table and both are of length 1? Do we pick one at random? I first thought we would expose all upsert keys. If we expose only one, we have to make that clear in the documentation and explain which one we expose to the user.
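One way to make the choice among several candidate keys deterministic is sketched below: pick the shortest key and break ties by comparing column indices lexicographically. `SmallestKeySketch` is a hypothetical helper written for this discussion; it is not a description of how `UpsertKeyUtil` behaves.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration of a deterministic tie-break; not Flink's UpsertKeyUtil. */
final class SmallestKeySketch {

    /** Orders keys by length first, then lexicographically by column index. */
    static final Comparator<int[]> BY_SIZE_THEN_INDICES =
            (a, b) -> {
                if (a.length != b.length) {
                    return Integer.compare(a.length, b.length);
                }
                return Arrays.compare(a, b);
            };

    /** Returns a copy of the smallest candidate, or an empty array when there is none. */
    static int[] smallestKey(List<int[]> candidates) {
        int[] best = null;
        for (int[] candidate : candidates) {
            if (best == null || BY_SIZE_THEN_INDICES.compare(candidate, best) < 0) {
                best = candidate;
            }
        }
        return best == null ? new int[0] : best.clone();
    }
}
```

With this ordering, two single-column keys on columns 2 and 1 always resolve to column 1 regardless of the order in which the candidates are supplied, which is the kind of rule the documentation would need to state.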
| // sources that emit key-only deletes. | ||
| TraitCondition.and( | ||
| TraitCondition.argIsEqualTo( | ||
| "produces_full_deletes", Boolean.TRUE), |
Let's make `produces_full_deletes` = true the default (an adjustment from the FLIP). This aligns with what we added in the first versions, and I think it's a better default than the opposite.
gustavodemorais left a comment
Thanks for working on this, @raminqaf! Added the first set of comments, take a look
What is the purpose of the change

Adds the built-in `TO_CHANGELOG` process table function (PTF) introduced in FLIP-564. The function converts a table (insert-only, retract, or upsert) into a changelog view that surfaces the row-kind (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`) as an explicit `op` column.

A new `produces_full_deletes` boolean parameter controls how DELETE rows are emitted. When `false` (default), the function emits partial DELETEs: the upsert-key columns of the input table are preserved and the remaining columns are nulled. When `true`, full DELETE rows are passed through unchanged. Partial deletes match the contract of most upsert sinks and avoid forcing users to retain full pre-image state.

Brief change log

- `BuiltInFunctionDefinitions.TO_CHANGELOG` and the corresponding `ToChangelogFunction` runtime in `flink-table-api-java`.
- `StreamPhysicalToChangelog` / `StreamExecToChangelog` exec nodes wired through `FlinkRelMdChangelogMode` and `FlinkRelMdUpsertKeys` so the planner picks the correct downstream changelog mode.
- `FlinkRelMetadataQuery.getUpsertKeys(input)` collapsed to one candidate via `UpsertKeyUtil.smallestKey(...)`, and threaded it through the codegen path so the runtime knows which columns to keep on DELETE.
- `PARTITION BY` over a row-semantics input forwards the partitioning column as the upsert key.
- `StreamExecProcessTableFunction` (new `inputUpsertKeys` field with a per-input empty-array default for back-compat with older plans).

Verifying this change

This change added tests and can be verified as follows:

- `ToChangelogTest.ToChangelogSemanticTests` covering: insert-only inputs, retract inputs with a derivable upsert key (partial DELETE), retract inputs with `produces_full_deletes=true` (full DELETE pass-through), `PARTITION BY` over a non-leading column, single-column upsert key from the input PK, and row semantics where the upsert key comes from the PK constraint rather than `PARTITION BY`.
- `ToChangelogRestoreTest` validates the compiled-plan + savepoint round trip for a retract source.
- `input` argument, wrong argument type, conflicting `PARTITION BY` and upsert key.
- `mvn -pl flink-table/flink-table-planner test -Dtest='ToChangelogTest,ToChangelogSemanticTests,ToChangelogRestoreTest'`. Result: 50 pass, 1 skip (existing savepoint stub).

Does this pull request potentially affect one of the following parts:

- `@Public(Evolving)`: yes (new `BuiltInFunctionDefinitions.TO_CHANGELOG`; new compiled-plan JSON field on `StreamExecProcessTableFunction` with a back-compat default)
- `ToChangelogFunction` operator runs per record, but it only writes the row kind plus a projected key on DELETE; no extra state)

Documentation

- `BuiltInFunctionDefinitions.TO_CHANGELOG` JavaDoc)

Was generative AI tooling used to co-author this PR?

Generated-by: Claude Opus 4.7