Merged
20 commits
364a47e
[FLINK-39379][table] Add state management to PTF test harness
autophagy May 19, 2026
cd965dc
Support row state
autophagy May 21, 2026
0282cc4
Remove FQDNs
autophagy May 21, 2026
01b0ec1
Make better use of stream over for loop
autophagy May 21, 2026
1e04b25
Improve docs
autophagy May 22, 2026
faf08df
Rename withInitialStateArgument to be clearer
autophagy May 22, 2026
5b749de
Give a clearer name to the state manager's state setting method
autophagy May 22, 2026
b18ef3b
Fix toList
autophagy May 22, 2026
cd57502
More consistent naming for state access with keys
autophagy May 22, 2026
47a27bd
Mention other public state interaction/mutation methods in docs
autophagy May 22, 2026
f86fd4f
Add validation for if initial state configuration does not match the …
autophagy May 22, 2026
4b29928
Improve tests
autophagy May 22, 2026
80e5f96
Validate keys against the partition key info when doing state mutatio…
autophagy May 22, 2026
c0f3e89
Use value state consistently
autophagy May 22, 2026
f63e165
Rename state mutation functions
autophagy May 29, 2026
6cf9da6
Add test for OPTIONAL_PARTITION_BY with initial state, add clarifying…
autophagy May 29, 2026
876a2c7
Improve error messages involving keyed state
autophagy May 29, 2026
bcbcff0
Improve comments on partition key consistency and validation
autophagy May 29, 2026
8a64b84
Improve docs around row/pojo state, add example and docs for optional…
autophagy May 29, 2026
32450aa
Add tests for partition mutation with stateful PTFS with optional par…
autophagy May 29, 2026
238 changes: 236 additions & 2 deletions docs/content.zh/docs/dev/table/functions/ptfs.md
@@ -2275,6 +2275,240 @@ void testScalarOnly() throws Exception {
{{< /tab >}}
{{< /tabs >}}

#### Testing with State

The harness supports all PTF state types: value state (Pojo and `Row`), list state (`ListView`),
and map state (`MapView`).

{{< tabs "state-testing" >}}
{{< tab "Java" >}}
```java
// A PTF that uses all four state types: Pojo value state, Row value state, ListView state, and MapView state.
@DataTypeHint("ROW<count BIGINT>")
public class StatefulPTF extends ProcessTableFunction<Row> {
    public static class ValueState {
        public long count = 0L;
    }

    public void eval(
            @StateHint ValueState valueState,
            @StateHint(type = @DataTypeHint("ROW<lastValue INT>")) Row rowState,
            @StateHint(type = @DataTypeHint("ARRAY<INT>")) ListView<Integer> listState,
            @StateHint MapView<String, Integer> mapState,
            @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) throws Exception {
        // Value state: increment counter
        valueState.count++;

        // Row state: track the last value seen
        int value = input.getFieldAs("value");
        rowState.setField("lastValue", value);

        // ListView state: accumulate values
        listState.add(value);

        // MapView state: count occurrences by name
        String name = input.getFieldAs("name");
        Integer tagCount = mapState.get(name);
        mapState.put(name, tagCount == null ? 1 : tagCount + 1);

        collect(Row.of(valueState.count));
    }
}

@Test
void testWithState() throws Exception {
    try (ProcessTableFunctionTestHarness<Row> harness =
            ProcessTableFunctionTestHarness.ofClass(StatefulPTF.class)
                    .withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
                    .withPartitionBy("input", "name")
                    .build()) {

        harness.processElement(Row.of("Alice", 10));
        harness.processElement(Row.of("Alice", 20));

        List<Row> output = harness.getOutput();
        assertThat(output.get(0)).isEqualTo(Row.of("Alice", 1L));
        assertThat(output.get(1)).isEqualTo(Row.of("Alice", 2L));
    }
}
```
{{< /tab >}}
{{< /tabs >}}

**Initial State Setup**: Use `.withInitialStateForKey()` to pre-populate state before processing.
State initialization is scoped per partition key:

{{< tabs "initial-state" >}}
{{< tab "Java" >}}
```java
@Test
void testWithInitialState() throws Exception {
    // Value state
    StatefulPTF.ValueState initialValue = new StatefulPTF.ValueState();
    initialValue.count = 100L;

    // Row state
    Row initialRow = Row.withNames();
    initialRow.setField("lastValue", 42);

    // ListView state
    ListView<Integer> initialList = new ListView<>();
    initialList.add(10);
    initialList.add(20);

    // MapView state
    MapView<String, Integer> initialMap = new MapView<>();
    initialMap.put("Alice", 5);

    try (ProcessTableFunctionTestHarness<Row> harness =
            ProcessTableFunctionTestHarness.ofClass(StatefulPTF.class)
                    .withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
                    .withPartitionBy("input", "name")
                    // Initial state is set per partition key
                    .withInitialStateForKey("valueState", Row.of("Alice"), initialValue)
                    .withInitialStateForKey("rowState", Row.of("Alice"), initialRow)
                    .withInitialStateForKey("listState", Row.of("Alice"), initialList)
                    .withInitialStateForKey("mapState", Row.of("Alice"), initialMap)
                    .build()) {

        harness.processElement(Row.of("Alice", 10));

        List<Row> output = harness.getOutput();
        assertThat(output).containsExactly(Row.of("Alice", 101L));
    }
}
```
{{< /tab >}}
{{< /tabs >}}
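
The per-key scoping described above can be illustrated with a minimal stand-alone sketch. It does not use Flink or the test harness: `InitialStateModel`, `withInitialCount`, and `process` are hypothetical names chosen for illustration, and a plain `Map` stands in for keyed state. The assumption being modeled is that only the seeded key continues from its initial value, while any other key starts from the default.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone model (not Flink code) of per-key initial state.
public class InitialStateModel {
    // One counter per partition key, standing in for keyed value state.
    private final Map<String, Long> countByKey = new HashMap<>();

    // Seeds the counter for a single partition key before any processing happens.
    public InitialStateModel withInitialCount(String key, long count) {
        countByKey.put(key, count);
        return this;
    }

    // Increments the counter of the given key; unseeded keys start from 0.
    public long process(String key) {
        return countByKey.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        InitialStateModel model = new InitialStateModel().withInitialCount("Alice", 100L);
        System.out.println(model.process("Alice")); // prints 101: seeded key continues from 100
        System.out.println(model.process("Bob"));   // prints 1: unseeded key starts from the default
    }
}
```

In this model, seeding `"Alice"` has no effect on `"Bob"`, which is the behavior the harness test above relies on when it expects `101L` for the seeded key.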

**State Introspection**: Use `getStateForKey()`, `getKeysForState()`, and `getStateForAllKeys()` to
inspect state during tests:

{{< tabs "state-introspection" >}}
{{< tab "Java" >}}
```java
@Test
void testStateIntrospection() throws Exception {
    try (ProcessTableFunctionTestHarness<Row> harness =
            ProcessTableFunctionTestHarness.ofClass(StatefulPTF.class)
                    .withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
                    .withPartitionBy("input", "name")
                    .build()) {

        harness.processElement(Row.of("Alice", 10));
        harness.processElement(Row.of("Bob", 20));

        // Check value state
        StatefulPTF.ValueState aliceState =
                harness.getStateForKey("valueState", Row.of("Alice"));
        assertThat(aliceState.count).isEqualTo(1L);

        // Check Row state
        Row aliceRowState = harness.getStateForKey("rowState", Row.of("Alice"));
        assertThat(aliceRowState.getField("lastValue")).isEqualTo(10);

        // Check ListView state
        ListView<Integer> aliceList = harness.getStateForKey("listState", Row.of("Alice"));
        assertThat(aliceList.getList()).containsExactly(10);

        // Check MapView state
        MapView<String, Integer> aliceMap = harness.getStateForKey("mapState", Row.of("Alice"));
        assertThat(aliceMap.get("Alice")).isEqualTo(1);

        // Get all partition keys with state
        Set<Row> keys = harness.getKeysForState("valueState");
        assertThat(keys).containsExactlyInAnyOrder(Row.of("Alice"), Row.of("Bob"));

        // Get all state across partition keys
        Map<Row, StatefulPTF.ValueState> allState =
                harness.getStateForAllKeys("valueState");
        assertThat(allState.get(Row.of("Bob")).count).isEqualTo(1L);
    }
}
```
{{< /tab >}}
{{< /tabs >}}

**State Mutation**: Use `setStateForKey()`, `clearAllStatesForKey()`, and `clearStateForKey()` to
modify state during tests:

{{< tabs "state-mutation" >}}
{{< tab "Java" >}}
```java
@Test
void testStateMutation() throws Exception {
    try (ProcessTableFunctionTestHarness<Row> harness =
            ProcessTableFunctionTestHarness.ofClass(StatefulPTF.class)
                    .withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
                    .withPartitionBy("input", "name")
                    .build()) {

        harness.processElement(Row.of("Alice", 10));

        // Overwrite a specific state entry for a partition key
        StatefulPTF.ValueState newState = new StatefulPTF.ValueState();
        newState.count = 100L;
        harness.setStateForKey("valueState", Row.of("Alice"), newState);

        // Verify the state was updated
        StatefulPTF.ValueState state = harness.getStateForKey("valueState", Row.of("Alice"));
        assertThat(state.count).isEqualTo(100L);

        // Clear a specific state entry (resets to default)
        harness.clearStateForKey("listState", Row.of("Alice"));

        // Clear all state for a partition key
        harness.clearAllStatesForKey(Row.of("Alice"));
    }
}
```
{{< /tab >}}
{{< /tabs >}}

#### Optional Partitioning

For PTFs with `OPTIONAL_PARTITION_BY`, you can omit `withPartitionBy()` during harness setup. The
harness executes the function as if it had a parallelism of 1, with the default `Row.of()` key,
so all data is routed through the same function instance. Use `Row.of()` to access state:

{{< tabs "optional-partition" >}}
{{< tab "Java" >}}
```java
// A PTF with optional partitioning that counts all rows.
@DataTypeHint("ROW<count BIGINT>")
public class GlobalCountPTF extends ProcessTableFunction<Row> {
    public static class CountState {
        public long count = 0L;
    }

    public void eval(
            @StateHint CountState state,
            @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, ArgumentTrait.OPTIONAL_PARTITION_BY})
                    Row input) {
        state.count++;
        collect(Row.of(state.count));
    }
}

@Test
void testOptionalPartitionWithoutPartitionBy() throws Exception {
    try (ProcessTableFunctionTestHarness<Row> harness =
            ProcessTableFunctionTestHarness.ofClass(GlobalCountPTF.class)
                    .withTableArgument("input", DataTypes.of("ROW<key STRING, value INT>"))
                    .build()) {

        harness.processElement(Row.of("A", 10));
        harness.processElement(Row.of("B", 20));

        // All data shares a single Row.of() partition key
        GlobalCountPTF.CountState state = harness.getStateForKey("state", Row.of());
        assertThat(state.count).isEqualTo(2L);
    }
}
```
{{< /tab >}}
{{< /tabs >}}
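
To make the role of the empty key concrete, here is a minimal stand-alone sketch that models keyed state as a map from partition key to counter. It is not Flink code and does not reflect how the harness is implemented: `KeyedCounterModel` and its methods are hypothetical, and a `List<Object>` stands in for the `Row` key. With no key columns configured, every row maps to the same empty key, which plays the part of `Row.of()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone model (not Flink code): keyed state as a map from partition key to counter.
public class KeyedCounterModel {
    // Indexes of the columns that form the partition key; empty means no PARTITION BY.
    private final int[] keyColumns;
    private final Map<List<Object>, Long> counts = new HashMap<>();

    public KeyedCounterModel(int... keyColumns) {
        this.keyColumns = keyColumns;
    }

    // Extracts the partition key from the row and increments that key's counter.
    public long process(Object... row) {
        List<Object> key = new ArrayList<>();
        for (int column : keyColumns) {
            key.add(row[column]);
        }
        return counts.merge(key, 1L, Long::sum);
    }

    public Map<List<Object>, Long> counts() {
        return counts;
    }

    public static void main(String[] args) {
        // Partitioned by the first column: "A" and "B" get separate counters.
        KeyedCounterModel partitioned = new KeyedCounterModel(0);
        partitioned.process("A", 10);
        partitioned.process("B", 20);
        System.out.println(partitioned.counts().size()); // prints 2

        // No partition columns: both rows share the single empty key.
        KeyedCounterModel global = new KeyedCounterModel();
        global.process("A", 10);
        global.process("B", 20);
        System.out.println(global.counts().get(List.of())); // prints 2
    }
}
```

The same lookup pattern appears in the harness test above, where `Row.of()` is the key used to read the shared state.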

#### Configuring Table Argument Types

In contexts where the harness can't infer the table argument types for table arguments (when using unannotated `Row` inputs,
@@ -2348,8 +2582,8 @@ void testPOJO() throws Exception {

### PTF Features Unsupported by the TestHarness

- `Context` parameter
- Timers (`onTimer`)
- `on_time` / `rowtime`
- Update traits (`SUPPORTS_UPDATES`, `REQUIRE_UPDATE_BEFORE`)
- State TTL (state is supported but TTL expiration is not yet implemented)