
[FLINK-38928] Implement an operator for handling DO ERROR/NOTHING (#2…) #27602

Merged
dawidwys merged 1 commit into apache:master from dawidwys:flink38928-2 on Feb 26, 2026

Conversation

@dawidwys (Contributor):

second attempt at #27502

@flinkbot (Collaborator):

flinkbot commented Feb 13, 2026

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@dawidwys (Contributor, Author):

@twalthr @pnowojski Could you take a look at this PR? I tried addressing @pnowojski's concerns.

buffer.put(timestamp, records);

timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp);
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp + 1);
Contributor:

nit: could you explain in the comment how this +1 here and -1 in timer firing is supposed to work? I get it, but I think it would be good to explain it for the future.
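As the nit suggests, the +1/-1 arithmetic could be documented with a sketch like this (a plain-JDK toy model, not the actual Flink operator; the class and method names are made up): a record buffered at timestamp t registers a timer at t + 1, so the timer fires only once the watermark strictly passes t, at which point the buffered entry is found again at firedTimer - 1.

```java
import java.util.*;

/** Toy model of the timer arithmetic: buffer at t, fire at t + 1, drain at firedTimer - 1. */
class TimerArithmeticSketch {
    private final NavigableMap<Long, List<String>> buffer = new TreeMap<>();
    private final NavigableSet<Long> timers = new TreeSet<>();
    private final List<String> emitted = new ArrayList<>();

    void processElement(long timestamp, String record) {
        buffer.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(record);
        // Register the timer one past the record timestamp: a watermark w fires
        // timers with ts <= w, so the timer at t + 1 fires only once w >= t + 1,
        // guaranteeing no further records with timestamp t can arrive.
        timers.add(timestamp + 1);
    }

    void advanceWatermark(long watermark) {
        for (Iterator<Long> it = timers.headSet(watermark, true).iterator(); it.hasNext(); ) {
            long timerTs = it.next();
            it.remove();
            // Invert the +1 on firing: the records were buffered at timerTs - 1.
            List<String> records = buffer.remove(timerTs - 1);
            if (records != null) {
                emitted.addAll(records);
            }
        }
    }

    List<String> emitted() {
        return emitted;
    }
}
```

With this scheme a watermark merely equal to the record timestamp does not flush the record; the watermark must strictly pass it.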

Contributor:

I remember some code paths where we emit Long.MAX_VALUE on end_of_input; can we add a check that this increment doesn't overflow?

Contributor (Author):

I don't see a point in that. When we emit MAX_VALUE, there won't be any new records afterwards. So there will be no records with timestamp MAX_VALUE, and we don't care whether a timer fires or not.
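To illustrate the overflow in question (plain Java semantics, not code from the PR): registering a timer at Long.MAX_VALUE + 1 silently wraps.

```java
public class OverflowSketch {
    public static void main(String[] args) {
        long endOfInput = Long.MAX_VALUE; // emitted on end_of_input in some code paths
        long timerTimestamp = endOfInput + 1; // silently wraps to Long.MIN_VALUE
        System.out.println(timerTimestamp == Long.MIN_VALUE); // prints "true"
        // Per the argument above: no records can carry timestamp MAX_VALUE, so
        // whether the wrapped timer ever fires is irrelevant in practice.
    }
}
```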

@pnowojski (Contributor) left a comment:

LGTM % I would like @rkhachatryan to also take a look here before merging.

I also presume we don't need a feature toggle for this one, as users would have to manually change the conflict resolution strategy in their schemas/tables for this change to take effect. Right?

@dawidwys (Contributor, Author):

> I also presume we don't need a feature toggle for this one, as users would have to manually change the conflict resolution strategy in their schemas/tables for this change to take effect. Right?

Correct.

@rkhachatryan (Contributor) left a comment:

Thanks for the PR!
I've left some comments - PTAL (sorry if some questions were already asked on the PR).

Meta remark:
During our previous discussions around SinkUpsertMaterializer, my understanding was that we'll implement compaction on watermark on top of the existing implementation (be it sum v1 or v2).

I'm fine with adding a 3rd one, but I must say it complicates not only the code, but also the operations for the user.

On testing:
If the bugs I described are real, we should probably plug in the existing testing code for SUM V1/2; it was extended significantly for FLIP-544.

if (previousValue != null) {
records.add(previousValue);
}
Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
Contributor:

For every timer timestamp X, we should know exactly the time X-1 when the record was added, right?

Why do we need to iterate over the whole state here?
Can't we use a point lookup (which is MUCH less expensive than iteration)?

Contributor (Author):

Theoretically you're correct. Still, I'd say it's safer to iterate over the records. In a common scenario it should not matter much, as there should not be many parallel watermarks flowing through a channel.

Contributor:

> In a common scenario it should not matter much as there should not be many parallel watermarks flowing through a channel.

That's a happy path, but if one channel is idling for some reason, we might have a SUMv1-like performance problem.
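To make the trade-off concrete, the two access patterns could be contrasted like this (a plain-JDK toy where a TreeMap stands in for Flink's keyed MapState; all names are illustrative, not from the PR):

```java
import java.util.*;

/** Toy contrast of point lookup vs full iteration over buffered timestamps. */
class LookupVsScanSketch {
    private final NavigableMap<Long, List<String>> buffer = new TreeMap<>();

    void put(long ts, List<String> records) {
        buffer.put(ts, records);
    }

    /** Point lookup: invert the +1 and read exactly one key. */
    List<String> drainByPointLookup(long timerTimestamp) {
        List<String> records = buffer.remove(timerTimestamp - 1);
        return records == null ? List.of() : records;
    }

    /** Full scan: visits every buffered entry; cost grows with the number of
     *  watermarks (and hence buffered timestamps) in flight. */
    List<String> drainByScan(long timerTimestamp) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Long, List<String>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() < timerTimestamp) {
                out.addAll(entry.getValue());
                it.remove();
            }
        }
        return out;
    }
}
```

The scan is more forgiving in that it also drains any entries older than the firing timer (the "safer" behavior), while the point lookup assumes exactly one buffered entry per timer; the performance concern is that the scan touches every in-flight timestamp on every firing.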

switch (pendingRecord.getRowKind()) {
case INSERT:
case UPDATE_AFTER:
addRow(records, pendingRecord);
Contributor:

This call is O(N), so the innermost loop is O(N^2).
Why don't we use a hashmap instead of a linear findFirst?

Contributor (Author):

This bit is copied over from SUM v1.

Contributor:

Yes, but in SUMv1 this is scattered over time; here, it happens at once for all the buffered records.

Contributor (Author):

It is also scattered over time here: we eagerly try to apply it when processing a single record.
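The quadratic concern could be made concrete with a standalone sketch (the "key:value" record format and the keyOf helper are hypothetical, not the operator's actual types): a linear findFirst costs O(n) per pending record, while a map built once per batch makes each lookup O(1).

```java
import java.util.*;

/** Contrast of the two lookup strategies over a batch of buffered records. */
class FindFirstVsMapSketch {
    // Hypothetical upsert-key extractor standing in for the operator's key logic.
    static String keyOf(String record) {
        return record.substring(0, record.indexOf(':'));
    }

    /** Linear probe, as in the SUM v1-style code: O(n) per pending record. */
    static int indexOfLinear(List<String> records, String upsertKey) {
        for (int i = 0; i < records.size(); i++) {
            if (keyOf(records.get(i)).equals(upsertKey)) {
                return i;
            }
        }
        return -1;
    }

    /** Map-based variant: one pass to build the index, then O(1) per lookup. */
    static Map<String, Integer> indexByKey(List<String> records) {
        Map<String, Integer> index = new HashMap<>();
        for (int i = 0; i < records.size(); i++) {
            index.put(keyOf(records.get(i)), i); // last occurrence wins
        }
        return index;
    }
}
```

Both return the same position for a given key; the difference only matters when many pending records are resolved against the same buffered list in one timer firing.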

@dawidwys (Contributor, Author):

> Meta remark:
> During our previous discussions around SinkUpsertMaterializer, my understanding was that we'll implement compaction on watermark on top of the existing implementation (be it SUM v1 or v2).
> I'm fine with adding a 3rd one, but I must say it complicates not only the code, but also the operations for the user.

This is the first time I hear of that; I can't find any such comments in the FLIP discussion. Moreover, I can't see how that could be possible, since we're changing the semantics slightly. Lastly, adding watermark compaction to the existing SUM would not help with the state size: it would still need to keep the entire history.

@rkhachatryan (Contributor):

The latest test failure seems to be caused by FLINK-39103, which is now fixed in master.

@dawidwys force-pushed the flink38928-2 branch 5 times, most recently from 9a22e3c to 61146ea on February 24, 2026 14:31.
Comment on lines 169 to 175
if (ttlConfig.isEnabled()) {
bufferDescriptor.enableTimeToLive(
StateTtlConfig.newBuilder(ttlConfig)
.setTimeToLive(ttlConfig.getTimeToLive().plus(Duration.ofSeconds(1)))
.build());
currentValueDescriptor.enableTimeToLive(ttlConfig);
}
@rkhachatryan (Contributor) commented on Feb 24, 2026:

Should consolidatedCheckpointId also be covered?
Although the value is small, with unbounded key space growth it can be a problem as well (and I guess unbounded key space is a common reason to enable TTL).

NIT: enforce stateVisibility = NeverReturnExpired, or document the potential issues of ReturnExpiredIfNotCleanedUp?
NIT: document the motivation for the +1 second
NIT: increase this to 5-10 seconds

Contributor:

It would also be good to have test coverage for TTL, but I think that's a NIT.

import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link WatermarkCompactingSinkMaterializer}. */
class WatermarkCompactingSinkMaterializerTest {
@rkhachatryan (Contributor) commented on Feb 24, 2026:

It would be great to parameterize this test with the type of state backend (heap / RocksDB).
Or at least make it use RocksDB, as the most widely used state backend.

Comment on lines 115 to 121
void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception {
try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
createHarness(behavior)) {
harness.open();

// Insert and compact
harness.processElement(insertRecord(1L, 1, "a1"));
Contributor:

Many tests in this class cover the case before the 1st watermark.
For example, here both currentWatermark and the record timestamp are null.
There are tests that cover after-watermark processing as well, but the coverage seems lower.

Maybe we can parameterize the class with something like a nullable initialWatermark.
If it's set, send it to the harness before the test, and set it on created records unless a specific timestamp is provided.

WDYT?

Comment on lines +362 to +375
if (isErrorOrNothingConflictStrategy()) {
// Use input parallelism to preserve watermark semantics
transformForMaterializer =
ExecNodeUtil.createOneInputTransformation(
inputTransform,
createTransformationMeta(
WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
"WatermarkTimestampAssigner",
"WatermarkTimestampAssigner",
config),
new WatermarkTimestampAssigner(),
inputTransform.getOutputType(),
inputTransform.getParallelism(),
false);
Contributor:

Is adding WatermarkTimestampAssigner covered by any tests?

I think it would also be good to have an ITCase with more than one input channel.

@rkhachatryan (Contributor) left a comment:

LGTM

I still have some concerns (around validation, performance, test coverage), but I think we can address those in follow-up PRs.

@dawidwys merged commit 3ecbbd1 into apache:master on Feb 26, 2026.