diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
index f63566befce7d..7382a9a9b848a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
@@ -86,6 +86,27 @@ public interface TableSemantics {
*/
int[] partitionByColumns();
+ /**
+ * Returns the upsert key of the passed table as derived by the planner from primary key
+ * constraints and the rewritten relational plan. The upsert key uniquely identifies a row
+ * within the input changelog and survives planner transformations that preserve key semantics
+ * (e.g. filters, projections that retain key columns). Applies to both table arguments with row
+ * and set semantics.
+ *
+ *
This complements {@link #partitionByColumns()}: a caller is not required to repeat the
+ * primary key via {@code PARTITION BY} just so a PTF can identify rows - the planner already
+ * knows the key from the input table's declaration.
+ *
+ * @return An array of indexes (0-based) that specify the upsert key columns. Returns an empty
+ * array if the planner could not derive an upsert key for the input (e.g., append-only
+ * sources without a declared primary key, or operations that destroyed the key). Returns an
+ * empty array during the type inference phase as the upsert key is still unknown at that
+ * point.
+ */
+ default int[] upsertKeyColumns() {
+ return new int[0];
+ }
+
/**
* Returns information about how the passed table is ordered. Applies only to table arguments
* with set semantics.
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java
index fe881f8fd1fc2..ad8cad8379ab9 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java
@@ -35,9 +35,10 @@ public class TableSemanticsMock implements TableSemantics {
private final SortDirection[] orderByDirections;
private final int timeColumn;
private final ChangelogMode changelogMode;
+ private final int[] upsertKeyColumns;
public TableSemanticsMock(DataType dataType) {
- this(dataType, new int[0], new int[0], -1, null);
+ this(dataType, new int[0], new int[0], -1, null, new int[0]);
}
public TableSemanticsMock(
@@ -46,6 +47,16 @@ public TableSemanticsMock(
int[] orderByColumns,
int timeColumn,
@Nullable ChangelogMode changelogMode) {
+ this(dataType, partitionByColumns, orderByColumns, timeColumn, changelogMode, new int[0]);
+ }
+
+ public TableSemanticsMock(
+ DataType dataType,
+ int[] partitionByColumns,
+ int[] orderByColumns,
+ int timeColumn,
+ @Nullable ChangelogMode changelogMode,
+ int[] upsertKeyColumns) {
this.dataType = dataType;
this.partitionByColumns = partitionByColumns;
this.orderByColumns = orderByColumns;
@@ -55,6 +66,7 @@ public TableSemanticsMock(
}
this.timeColumn = timeColumn;
this.changelogMode = changelogMode;
+ this.upsertKeyColumns = upsertKeyColumns;
}
@Override
@@ -82,6 +94,11 @@ public int timeColumn() {
return timeColumn;
}
+ @Override
+ public int[] upsertKeyColumns() {
+ return upsertKeyColumns;
+ }
+
@Override
public Optional changelogMode() {
return Optional.ofNullable(changelogMode);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
index 2c393adf875ec..59e67eaea94f8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
@@ -335,19 +335,20 @@ public boolean hasScalarArgument(String name) {
* scalar arguments through the same coercion path as validation.
*/
public CallContext toCallContext(RexCall call) {
- return toCallContext(call, null, null, null);
+ return toCallContext(call, null, null, null, null);
}
/**
* Variant of {@link #toCallContext(RexCall)} that additionally exposes the call's input time
- * columns and changelog modes - needed by the streaming codegen path so PTFs can specialize
- * themselves to the exact call.
+ * columns, changelog modes, and per-input upsert keys - needed by the streaming codegen path so
+ * PTFs can specialize themselves to the exact call.
*/
public CallContext toCallContext(
RexCall call,
@Nullable List inputTimeColumns,
@Nullable List inputChangelogModes,
- @Nullable ChangelogMode outputChangelogMode) {
+ @Nullable ChangelogMode outputChangelogMode,
+ @Nullable List inputUpsertKeys) {
return new OperatorBindingCallContext(
dataTypeFactory,
getDefinition(),
@@ -355,7 +356,8 @@ public CallContext toCallContext(
call.getType(),
inputTimeColumns,
inputChangelogModes,
- outputChangelogMode);
+ outputChangelogMode,
+ inputUpsertKeys);
}
/**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
index f31406dad196d..47ae23ed482c8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
@@ -64,13 +64,14 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext {
private final @Nullable List inputTimeColumns;
private final @Nullable List inputChangelogModes;
private final @Nullable ChangelogMode outputChangelogMode;
+ private final @Nullable List inputUpsertKeys;
public OperatorBindingCallContext(
DataTypeFactory dataTypeFactory,
FunctionDefinition definition,
SqlOperatorBinding binding,
RelDataType returnRelDataType) {
- this(dataTypeFactory, definition, binding, returnRelDataType, null, null, null);
+ this(dataTypeFactory, definition, binding, returnRelDataType, null, null, null, null);
}
public OperatorBindingCallContext(
@@ -80,7 +81,8 @@ public OperatorBindingCallContext(
RelDataType returnRelDataType,
@Nullable List inputTimeColumns,
@Nullable List inputChangelogModes,
- @Nullable ChangelogMode outputChangelogMode) {
+ @Nullable ChangelogMode outputChangelogMode,
+ @Nullable List inputUpsertKeys) {
super(
dataTypeFactory,
definition,
@@ -109,6 +111,7 @@ public int size() {
this.inputTimeColumns = inputTimeColumns;
this.inputChangelogModes = inputChangelogModes;
this.outputChangelogMode = outputChangelogMode;
+ this.inputUpsertKeys = inputUpsertKeys;
}
@Override
@@ -173,13 +176,18 @@ public Optional getTableSemantics(int pos) {
Optional.ofNullable(inputChangelogModes)
.map(m -> m.get(tableArgCall.getInputIndex()))
.orElse(null);
+ final int[] upsertKeyColumns =
+ Optional.ofNullable(inputUpsertKeys)
+ .map(m -> m.get(tableArgCall.getInputIndex()))
+ .orElse(new int[0]);
return Optional.of(
OperatorBindingTableSemantics.create(
argumentDataTypes.get(pos),
staticArg,
tableArgCall,
timeColumn,
- changelogMode));
+ changelogMode,
+ upsertKeyColumns));
}
@Override
@@ -283,20 +291,23 @@ private static class OperatorBindingTableSemantics implements TableSemantics {
private final SortDirection[] orderByDirections;
private final int timeColumn;
private final @Nullable ChangelogMode changelogMode;
+ private final int[] upsertKeyColumns;
public static OperatorBindingTableSemantics create(
DataType tableDataType,
StaticArgument staticArg,
RexTableArgCall tableArgCall,
int timeColumn,
- @Nullable ChangelogMode changelogMode) {
+ @Nullable ChangelogMode changelogMode,
+ int[] upsertKeyColumns) {
return new OperatorBindingTableSemantics(
createDataType(tableDataType, staticArg),
tableArgCall.getPartitionKeys(),
tableArgCall.getOrderKeys(),
RexTableArgCall.toSortDirections(tableArgCall.getSortOrder()),
timeColumn,
- changelogMode);
+ changelogMode,
+ upsertKeyColumns);
}
private OperatorBindingTableSemantics(
@@ -305,13 +316,15 @@ private OperatorBindingTableSemantics(
int[] orderByColumns,
SortDirection[] orderByDirections,
int timeColumn,
- @Nullable ChangelogMode changelogMode) {
+ @Nullable ChangelogMode changelogMode,
+ int[] upsertKeyColumns) {
this.dataType = dataType;
this.partitionByColumns = partitionByColumns;
this.orderByColumns = orderByColumns;
this.orderByDirections = orderByDirections;
this.timeColumn = timeColumn;
this.changelogMode = changelogMode;
+ this.upsertKeyColumns = upsertKeyColumns;
}
private static DataType createDataType(DataType tableDataType, StaticArgument staticArg) {
@@ -349,6 +362,11 @@ public int timeColumn() {
return timeColumn;
}
+ @Override
+ public int[] upsertKeyColumns() {
+ return upsertKeyColumns;
+ }
+
@Override
public Optional changelogMode() {
return Optional.ofNullable(changelogMode);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
index 3973329af7484..33de7a7bc8464 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
@@ -108,6 +108,7 @@ public class StreamExecProcessTableFunction extends ExecNodeBase
public static final String FIELD_NAME_FUNCTION_CALL = "functionCall";
public static final String FIELD_NAME_INPUT_CHANGELOG_MODES = "inputChangelogModes";
public static final String FIELD_NAME_OUTPUT_CHANGELOG_MODE = "outputChangelogMode";
+ public static final String FIELD_NAME_INPUT_UPSERT_KEYS = "inputUpsertKeys";
@JsonProperty(FIELD_NAME_UID)
private final @Nullable String uid;
@@ -121,6 +122,9 @@ public class StreamExecProcessTableFunction extends ExecNodeBase
@JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE)
private final ChangelogMode outputChangelogMode;
+ @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS)
+ private final List inputUpsertKeys;
+
public StreamExecProcessTableFunction(
ReadableConfig tableConfig,
List inputProperties,
@@ -129,7 +133,8 @@ public StreamExecProcessTableFunction(
@Nullable String uid,
RexCall invocation,
List inputChangelogModes,
- ChangelogMode outputChangelogMode) {
+ ChangelogMode outputChangelogMode,
+ List inputUpsertKeys) {
this(
ExecNodeContext.newNodeId(),
ExecNodeContext.newContext(StreamExecProcessTableFunction.class),
@@ -141,7 +146,8 @@ public StreamExecProcessTableFunction(
uid,
invocation,
inputChangelogModes,
- outputChangelogMode);
+ outputChangelogMode,
+ inputUpsertKeys);
}
@JsonCreator
@@ -155,7 +161,8 @@ public StreamExecProcessTableFunction(
@JsonProperty(FIELD_NAME_UID) @Nullable String uid,
@JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation,
@JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES) List inputChangelogModes,
- @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode outputChangelogMode) {
+ @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode outputChangelogMode,
+ @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS) @Nullable List inputUpsertKeys) {
super(id, context, persistedConfig, inputProperties, outputType, description);
this.uid = uid;
// Mirror the FlinkLogicalTableFunctionScan converter for the compiled-plan restore path:
@@ -164,6 +171,14 @@ public StreamExecProcessTableFunction(
this.invocation = BridgingSqlFunction.resolveCallTraits((RexCall) invocation);
this.inputChangelogModes = inputChangelogModes;
this.outputChangelogMode = outputChangelogMode;
+ // Older compiled plans (pre-FLINK-39735) did not persist this field. Default to per-input
+ // empty arrays so the runtime sees the same behavior as before (no derivable upsert key).
+ this.inputUpsertKeys =
+ inputUpsertKeys != null
+ ? inputUpsertKeys
+ : IntStream.range(0, inputChangelogModes.size())
+ .mapToObj(i -> new int[0])
+ .collect(Collectors.toList());
}
public @Nullable String getUid() {
@@ -202,7 +217,12 @@ protected Transformation translateToPlanInternal(
final RexCall udfCall = StreamPhysicalProcessTableFunction.toUdfCall(invocation);
final GeneratedRunnerResult generated =
ProcessTableRunnerGenerator.generate(
- ctx, udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode);
+ ctx,
+ udfCall,
+ inputTimeColumns,
+ inputChangelogModes,
+ outputChangelogMode,
+ inputUpsertKeys);
final GeneratedProcessTableRunner generatedRunner = generated.runner();
final LinkedHashMap stateInfos = generated.stateInfos();
@@ -309,6 +329,7 @@ private RuntimeTableSemantics createRuntimeTableSemantics(
}
final int timeColumn = inputTimeColumns.get(tableArgCall.getInputIndex());
+ final int[] upsertKeyColumns = inputUpsertKeys.get(tableArgCall.getInputIndex());
return new RuntimeTableSemantics(
tableArg.getName(),
@@ -320,7 +341,8 @@ private RuntimeTableSemantics createRuntimeTableSemantics(
consumedChangelogMode,
tableArg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH),
tableArg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE),
- timeColumn);
+ timeColumn,
+ upsertKeyColumns);
}
private Transformation createKeyedTransformation(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
index 5ccecf18e71be..6f90b38e431d1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java
@@ -27,11 +27,13 @@
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.RexTableArgCall;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils;
+import org.apache.flink.table.planner.plan.utils.UpsertKeyUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.types.inference.StaticArgument;
@@ -165,6 +167,7 @@ public ExecNode> translateToExecNode() {
verifyTimeAttributes(getInputs(), call, inputChangelogModes, outputChangelogMode);
final List> providedInputArgs = getProvidedInputArgs(call);
verifyPassThroughColumnsForUpdates(providedInputArgs, outputChangelogMode);
+ final List inputUpsertKeys = deriveInputUpsertKeys(getInputs());
return new StreamExecProcessTableFunction(
unwrapTableConfig(this),
getInputs().stream().map(i -> InputProperty.DEFAULT).collect(Collectors.toList()),
@@ -173,7 +176,26 @@ public ExecNode> translateToExecNode() {
uid,
call,
inputChangelogModes,
- outputChangelogMode);
+ outputChangelogMode,
+ inputUpsertKeys);
+ }
+
+ /**
+ * Derives an upsert key (collapsed to one candidate via {@link UpsertKeyUtil#smallestKey}) for
+ * each input. Returns an empty array entry for inputs without a derivable upsert key
+ * (append-only sources without a declared primary key, or operations that destroyed the key).
+ * Surfaces as {@link org.apache.flink.table.functions.TableSemantics#upsertKeyColumns()} so
+ * PTFs can identify rows without requiring callers to repeat the key via PARTITION BY.
+ */
+ private static List deriveInputUpsertKeys(List inputs) {
+ final List perInput = new ArrayList<>(inputs.size());
+ for (RelNode input : inputs) {
+ final FlinkRelMetadataQuery fmq =
+ FlinkRelMetadataQuery.reuseOrCreate(input.getCluster().getMetadataQuery());
+ final Set upsertKeys = fmq.getUpsertKeys(input);
+ perInput.add(UpsertKeyUtil.smallestKey(upsertKeys).orElse(new int[0]));
+ }
+ return perInput;
}
@Override
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
index 52df803d5c8f8..402157aa0fdb6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala
@@ -65,7 +65,8 @@ object ProcessTableRunnerGenerator {
udfCall: RexCall,
inputTimeColumns: java.util.List[Integer],
inputChangelogModes: java.util.List[ChangelogMode],
- outputChangelogMode: ChangelogMode): GeneratedRunnerResult = {
+ outputChangelogMode: ChangelogMode,
+ inputUpsertKeys: java.util.List[Array[Int]]): GeneratedRunnerResult = {
val function: BridgingSqlFunction = udfCall.getOperator.asInstanceOf[BridgingSqlFunction]
val definition: FunctionDefinition = function.getDefinition
val dataTypeFactory = function.getDataTypeFactory
@@ -77,7 +78,12 @@ object ProcessTableRunnerGenerator {
// Thus, functions can reconfigure themselves for the exact use case.
// Including updating their state layout.
val callContext =
- function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode)
+ function.toCallContext(
+ udfCall,
+ inputTimeColumns,
+ inputChangelogModes,
+ outputChangelogMode,
+ inputUpsertKeys)
// Create the final UDF for runtime
val udf = UserDefinedFunctionHelper.createSpecializedFunction(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index fd646a34148b4..47939aa8dffe4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -1676,7 +1676,12 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val inputTimeColumns = StreamPhysicalProcessTableFunction.toInputTimeColumns(process.getCall)
val function = udfCall.getOperator.asInstanceOf[BridgingSqlFunction]
val callContext =
- function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode)
+ function.toCallContext(
+ udfCall,
+ inputTimeColumns,
+ inputChangelogModes,
+ outputChangelogMode,
+ null)
// Expose a simplified context focused on changelog-relevant inputs: changelog modes,
// resolved literal arguments, and table semantics (e.g., partition-by columns).
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
index 8d83e51770d59..76187ed679e0b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
@@ -456,6 +456,18 @@ public void eval(Context ctx, @ArgumentHint(SET_SEMANTIC_TABLE) Row r, String s)
}
}
+ /**
+ * Testing function for FLINK-39735: surfaces the planner-derived upsert key on {@link
+ * TableSemantics}. Used by tests to assert that {@code upsertKeyColumns()} reports the
+ * primary-key columns of the input even when the caller did not write {@code PARTITION BY}.
+ */
+ public static class UpsertKeyContextFunction extends AppendProcessTableFunctionBase {
+ public void eval(Context ctx, @ArgumentHint(ROW_SEMANTIC_TABLE) Row r) {
+ final TableSemantics semantics = ctx.tableSemanticsFor("r");
+ collectObjects(r, semantics.upsertKeyColumns());
+ }
+ }
+
/** Testing function. */
public static class PojoStateFunction extends AppendProcessTableFunctionBase {
public void eval(@StateHint Score s, @ArgumentHint(SET_SEMANTIC_TABLE) Row r) {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java
index cabab4c613143..6719adda32034 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java
@@ -44,6 +44,7 @@ public class RuntimeTableSemantics implements TableSemantics, Serializable {
private final boolean passColumnsThrough;
private final boolean hasSetSemantics;
private final int timeColumn;
+ private final int[] upsertKeyColumns;
private transient ChangelogMode changelogMode;
@@ -57,7 +58,8 @@ public RuntimeTableSemantics(
RuntimeChangelogMode consumedChangelogMode,
boolean passColumnsThrough,
boolean hasSetSemantics,
- int timeColumn) {
+ int timeColumn,
+ int[] upsertKeyColumns) {
this.argName = argName;
this.inputIndex = inputIndex;
this.dataType = dataType;
@@ -68,6 +70,7 @@ public RuntimeTableSemantics(
this.passColumnsThrough = passColumnsThrough;
this.hasSetSemantics = hasSetSemantics;
this.timeColumn = timeColumn;
+ this.upsertKeyColumns = upsertKeyColumns;
}
public String getArgName() {
@@ -118,6 +121,11 @@ public int timeColumn() {
return timeColumn;
}
+ @Override
+ public int[] upsertKeyColumns() {
+ return upsertKeyColumns;
+ }
+
@Override
public Optional changelogMode() {
return Optional.of(getChangelogMode());
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java
index be390ab5f5557..38a0df4811168 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java
@@ -246,7 +246,8 @@ private static RuntimeTableSemantics tableSemantics() {
RuntimeChangelogMode.serialize(ChangelogMode.insertOnly()),
/* passColumnsThrough */ false,
/* hasSetSemantics */ true,
- /* timeColumn */ 1);
+ /* timeColumn */ 1,
+ /* upsertKeyColumns */ new int[0]);
}
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java
index fadf21d7dd942..1f81a36ed767a 100644
--- a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java
+++ b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java
@@ -30,10 +30,16 @@
class TestHarnessTableSemantics implements TableSemantics {
private final DataType dataType;
private final int[] partitionByColumns;
+ private final int[] upsertKeyColumns;
TestHarnessTableSemantics(DataType dataType, int[] partitionByColumns) {
+ this(dataType, partitionByColumns, new int[0]);
+ }
+
+ TestHarnessTableSemantics(DataType dataType, int[] partitionByColumns, int[] upsertKeyColumns) {
this.dataType = dataType;
this.partitionByColumns = partitionByColumns;
+ this.upsertKeyColumns = upsertKeyColumns;
}
@Override
@@ -61,6 +67,11 @@ public int timeColumn() {
return -1;
}
+ @Override
+ public int[] upsertKeyColumns() {
+ return upsertKeyColumns;
+ }
+
@Override
public Optional changelogMode() {
return Optional.empty();