[FLINK-38928] Implement an operator for handling DO ERROR/NOTHING (#27602)
dawidwys merged 1 commit into apache:master
Conversation
@twalthr @pnowojski Could you take a look at this PR? I tried addressing @pnowojski's concerns.
Force-pushed 4645e69 to e3acc7e
  buffer.put(timestamp, records);

- timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp);
+ timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp + 1);
nit: could you explain in a comment how this +1 here and the -1 at timer firing are supposed to work? I get it, but I think it would be good to explain it for the future.
I remember some code paths where we emit Long.MAX_VALUE on end_of_input; can we add a check that this increment doesn't overflow?
I don't see a point in that. When MAX_VALUE is emitted there won't be any new records afterwards, so there will be no records with timestamp MAX_VALUE and we don't care whether a timer fires or not.
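For future readers, the +1/-1 convention discussed above can be sketched in plain Java (no Flink classes; all names here are hypothetical, using a TreeMap in place of keyed state): the timer is registered at timestamp + 1 so that when it fires, every record buffered at or below timestamp (i.e., at or below timerTimestamp - 1) is strictly below the firing time and safe to emit.

```java
import java.util.*;

// Hypothetical sketch of the +1 (registration) / -1 (firing) timer convention.
class TimerOffsetSketch {
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
    private final TreeSet<Long> timers = new TreeSet<>();

    void add(long timestamp, String record) {
        buffer.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(record);
        // Register the timer one tick later; firing at timestamp + 1
        // guarantees the watermark has passed timestamp itself.
        timers.add(timestamp + 1);
    }

    // Called when a timer fires at timerTimestamp; the -1 undoes the +1.
    List<String> onTimer(long timerTimestamp) {
        List<String> fired = new ArrayList<>();
        // All buffered entries at or below timerTimestamp - 1 are now final.
        SortedMap<Long, List<String>> ready = buffer.headMap(timerTimestamp - 1, true);
        ready.values().forEach(fired::addAll);
        ready.clear();
        return fired;
    }
}
```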
Force-pushed d12d81c to b784674
pnowojski left a comment:
LGTM, modulo: I would like @rkhachatryan to also take a look here before merging.
I also presume we don't need a feature toggle for this one, as users would have to manually change the conflict resolution strategy in their schemas/tables for this change to take effect. Right?
Correct
Force-pushed b784674 to 51392e5
Thanks for the PR!
I've left some comments - PTAL (sorry if some questions were already asked on the PR).
Meta remark:
During our previous discussions around SinkUpsertMaterializer, my understanding was that we'd implement compaction on watermark on top of the existing implementation (be it SUM v1 or v2).
I'm fine with adding a 3rd one, but I must say it complicates not only the code, but also the operations for the user.
On testing: if the bugs I described are real, we should probably plug in the existing testing code for SUM v1/v2; it was extended significantly for FLIP-544.
if (previousValue != null) {
    records.add(previousValue);
}
Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
For every timer timestamp X, we should know exactly the time X-1 when the record was added, right? Why do we need to iterate over the whole state here? Can't we use a point lookup (which is MUCH less expensive than iteration)?
Theoretically you're correct. Still, I'd say it's safer to iterate over the records. In a common scenario it should not matter much as there should not be many parallel watermarks flowing through a channel.
> In a common scenario it should not matter much as there should not be many parallel watermarks flowing through a channel.

That's the happy path, but if one channel is idling for some reason, we might have a SUM v1-like performance problem.
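To make the trade-off concrete, here is a plain-Java sketch (no Flink state; all names hypothetical) contrasting the two access patterns debated above. Both return the same records when exactly one timestamp is ready at the timer, but the point lookup touches a single key instead of walking the whole buffer:

```java
import java.util.*;

// Hypothetical sketch: full iteration vs. a point lookup at timer - 1.
class LookupSketch {
    // Walks every entry and drains those at or below timer - 1.
    static List<String> byIteration(NavigableMap<Long, List<String>> buffer, long timer) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Long, List<String>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> e = it.next();
            if (e.getKey() <= timer - 1) {  // ready: watermark has passed it
                out.addAll(e.getValue());
                it.remove();
            }
        }
        return out;
    }

    // Assumes exactly one buffered entry per timer, at timer - 1.
    static List<String> byPointLookup(Map<Long, List<String>> buffer, long timer) {
        List<String> out = buffer.remove(timer - 1);
        return out == null ? Collections.emptyList() : out;
    }
}
```

The iteration variant is more defensive (it also drains any older, leftover entries), which matches the safety argument above; the point lookup matches the performance argument.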
switch (pendingRecord.getRowKind()) {
    case INSERT:
    case UPDATE_AFTER:
        addRow(records, pendingRecord);
This call is O(N), so the innermost loop makes this O(N²) overall. Why don't we use a hashmap instead of a linear findFirst?
This bit is copied over from SUM v1.
Yes, but in SUM v1 this is scattered over time; here, it happens at once for all the buffered records.
It is also scattered over time here: we eagerly try to apply it when processing each single record.
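The hashmap alternative suggested above can be sketched in plain Java (names and the string-keyed rows are hypothetical stand-ins for RowData and its upsert key): a LinkedHashMap keeps the arrival order a List would give, while making each upsert or retraction O(1) instead of a linear findFirst.

```java
import java.util.*;

// Hypothetical sketch of hash-based compaction instead of a linear scan.
class CompactionSketch {
    // Keyed by the (hypothetical) upsert key; insertion order is preserved,
    // so emission order matches arrival order, as with a List.
    private final LinkedHashMap<String, String> rows = new LinkedHashMap<>();

    void apply(String kind, String key, String value) {
        switch (kind) {
            case "INSERT":
            case "UPDATE_AFTER":
                rows.put(key, value);   // O(1) upsert instead of a linear scan
                break;
            case "DELETE":
            case "UPDATE_BEFORE":
                rows.remove(key);       // O(1) retraction
                break;
        }
    }

    List<String> snapshot() {
        return new ArrayList<>(rows.values());
    }
}
```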
First time that I hear that. I can't find any such comments in the FLIP discussion. Moreover, I can't see how that would be possible, since we're changing the semantics slightly. Lastly, adding watermark compaction to the existing SUM would not help with the state size; it would still need to keep the entire history.
The latest test failure seems to be caused by FLINK-39103, which is now fixed in master.
Force-pushed 9a22e3c to 61146ea
if (ttlConfig.isEnabled()) {
    bufferDescriptor.enableTimeToLive(
            StateTtlConfig.newBuilder(ttlConfig)
                    .setTimeToLive(ttlConfig.getTimeToLive().plus(Duration.ofSeconds(1)))
                    .build());
    currentValueDescriptor.enableTimeToLive(ttlConfig);
}
Should consolidatedCheckpointId also be covered? Although the value is small, with unbounded key space growth it can be a problem as well (and I guess an unbounded key space is a common reason to enable TTL).
NIT: enforce stateVisibility = NeverReturnExpired, or document the potential issues of ReturnExpiredIfNotCleanedUp?
NIT: document the motivation for the +1 second
NIT: increase this to 5-10 seconds
It would also be good to have test coverage for TTL; but I think that's a NIT.
Force-pushed a7b5181 to 0c79f05
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link WatermarkCompactingSinkMaterializer}. */
class WatermarkCompactingSinkMaterializerTest {
It would be great to parameterize this test with the type of state backend (heap / RocksDB). Or at least make it use RocksDB, as the most widely used state backend.
void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception {
    try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
            createHarness(behavior)) {
        harness.open();

        // Insert and compact
        harness.processElement(insertRecord(1L, 1, "a1"));
Many tests in this class cover the case before the 1st watermark. For example, here both currentWatermark and the record timestamp are null. There are tests that cover after-watermark processing as well, but the coverage seems to be lower. Maybe we can parameterize the class with something like a nullable initialWatermark: if it's set, send it to the harness before the test, and set it on created records unless a specific timestamp is provided. WDYT?
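The proposed parameterization could look roughly like this plain-Java sketch (no Flink test harness; all names hypothetical): a nullable initialWatermark that, when present, is sent before the test body and used as the default timestamp for records created without an explicit one.

```java
import java.util.*;

// Hypothetical sketch of a test case parameterized by a nullable initial watermark.
class InitialWatermarkCase {
    final Long initialWatermark;
    final List<String> sent = new ArrayList<>();

    InitialWatermarkCase(Long initialWatermark) {
        this.initialWatermark = initialWatermark;
        if (initialWatermark != null) {
            // Send the watermark to the "harness" before the test body runs.
            sent.add("watermark(" + initialWatermark + ")");
        }
    }

    // Creates a record; falls back to the initial watermark when no explicit
    // timestamp is given.
    void processRecord(String payload, Long explicitTimestamp) {
        Long ts = explicitTimestamp != null ? explicitTimestamp : initialWatermark;
        sent.add("record(" + payload + ", ts=" + ts + ")");
    }
}
```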
if (isErrorOrNothingConflictStrategy()) {
    // Use input parallelism to preserve watermark semantics
    transformForMaterializer =
            ExecNodeUtil.createOneInputTransformation(
                    inputTransform,
                    createTransformationMeta(
                            WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
                            "WatermarkTimestampAssigner",
                            "WatermarkTimestampAssigner",
                            config),
                    new WatermarkTimestampAssigner(),
                    inputTransform.getOutputType(),
                    inputTransform.getParallelism(),
                    false);
Is adding WatermarkTimestampAssigner covered by any tests? I think it would also be good to have an ITCase with more than one input channel.
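Multi-channel coverage matters because an operator's effective watermark is the minimum across its input channels, so a single idle channel holds back timer firing for everyone. A minimal plain-Java sketch of that combination rule (names hypothetical, not Flink's internal classes):

```java
import java.util.*;

// Hypothetical sketch: the combined watermark is the minimum over all channels.
class WatermarkCombiner {
    private final long[] channelWatermarks;

    WatermarkCombiner(int channels) {
        channelWatermarks = new long[channels];
        // Channels start "idle" at the lowest possible watermark.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Updates one channel's watermark and returns the combined (minimum) one.
    long advance(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }
}
```

With two channels, advancing only channel 0 leaves the combined watermark at its initial low value until channel 1 also advances, which is exactly the idle-channel scenario mentioned earlier in the thread.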
Force-pushed c3dbb16 to 2dd2f0d
rkhachatryan left a comment:
LGTM
I still have some concerns (around validation, performance, test coverage) but I think we can address those in follow-up PRs.
Second attempt at #27502