diff --git a/docs/content.zh/docs/sql/reference/queries/joins.md b/docs/content.zh/docs/sql/reference/queries/joins.md index 0cf8d669709c4..a1e1db2caab8b 100644 --- a/docs/content.zh/docs/sql/reference/queries/joins.md +++ b/docs/content.zh/docs/sql/reference/queries/joins.md @@ -181,6 +181,8 @@ o_002 12.51 EUR 1.10 12:06:00 这里的 `INTERVAL` 时间减法用于等待后续事件,以确保 join 满足预期。 请确保 join 两边设置了正确的 watermark 。 +**Note:** Probe-side (left) records that arrive late (their event time is less than or equal to the current watermark) are dropped on arrival and counted by the `numLateRecordsDropped` operator metric. They are not joined or emitted, not even as null-padded results for `LEFT JOIN`, because the matching build-side version may already have been cleaned up. + **注意:** 事件时间 temporal join 需要包含主键相等的条件,即:`currency_rates` 表的主键 `currency_rates.currency` 包含在条件 `orders.currency = currency_rates.currency` 中。 与 [regular joins](#regular-joins) 相比,就算 build side(例子中的 currency_rates 表)发生变更了,之前的 temporal table 的结果也不会被影响。 diff --git a/docs/content/docs/sql/reference/queries/joins.md b/docs/content/docs/sql/reference/queries/joins.md index be99c737d3f58..d000a46b822ce 100644 --- a/docs/content/docs/sql/reference/queries/joins.md +++ b/docs/content/docs/sql/reference/queries/joins.md @@ -186,6 +186,8 @@ o_002 12.51 EUR 1.10 12:06:00 The `INTERVAL` time subtraction is used to wait for late events in order to make sure the join will meet the expectation. Please ensure both sides of the join have set watermark correctly. +**Note:** Probe-side (left) records that arrive late (their event time is less than or equal to the current watermark) are dropped on arrival and counted by the `numLateRecordsDropped` operator metric. They are not joined or emitted, not even as null-padded results for `LEFT JOIN`, because the matching build-side version may already have been cleaned up. + **Note:** The event-time temporal join requires the primary key contained in the equivalence condition of the temporal join condition, e.g., The primary key `currency_rates.currency` of table `currency_rates` to be constrained in the condition `orders.currency = currency_rates.currency`. In contrast to [regular joins](#regular-joins), the previous temporal table results will not be affected despite the changes on the build side. diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java index ed83169c74a80..b8a03ffacf9f9 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java @@ -45,7 +45,7 @@ public class TemporalJoinTestPrograms { Row.of(3L, "Euro", "2020-10-10 00:00:45")) .producedAfterRestore( Row.of(1L, "Euro", "2020-10-10 00:00:58"), - Row.of(1L, "USD", "2020-10-10 00:00:58")) + Row.of(1L, "USD", "2020-10-10 00:00:59")) .build(); static final SourceTestStep ORDERS_WITH_NESTED_ID = @@ -88,7 +88,7 @@ public class TemporalJoinTestPrograms { 1L, Row.of("usd"), mapOf("currency", "USD"), - "2020-10-10 00:00:58")) + "2020-10-10 00:00:59")) .build(); static final SourceTestStep RATES = diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java index 940e15c2b4d03..71ed75b5970df 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.operators.InternalTimer; @@ -61,9 +62,14 @@ * idea is that between watermarks we are collecting those elements and once we are sure that there * will be no updates we emit the correct result and clean up the expired data in state. * - *

Cleaning up the state drops all of the "old" values from the probe side, where "old" is - * defined as older then the current watermark. Build side is also cleaned up in the similar - * fashion, however we always keep at least one record - the latest one - even if it's past the last + *

Probe-side records that arrive late (their event time is less than or equal to the current + * watermark) are dropped on arrival and counted via the {@code numLateRecordsDropped} metric; they + * are not joined or emitted (not even as null-padded results for left outer joins), because the + * matching build-side version may already have been cleaned up. + * + *

Cleaning up the state drops all the "old" values from the probe side, where "old" is defined + * as older than the current watermark. Build side is also cleaned up in the similar fashion, + * however we always keep at least one record - the latest one - even if it's past the last * watermark. * *

One more trick is how the emitting results and cleaning up is triggered. It is achieved by @@ -84,6 +90,7 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS private static final String RIGHT_STATE_NAME = "right"; private static final String REGISTERED_TIMER_STATE_NAME = "timer"; private static final String TIMERS_STATE_NAME = "timers"; + private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped"; private final boolean isLeftOuterJoin; private final InternalTypeInfo leftType; @@ -123,6 +130,8 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS private transient JoinedRowData outRow; private transient GenericRowData rightNullRow; + private transient Counter numLateRecordsDropped; + public TemporalRowTimeJoinOperator( InternalTypeInfo leftType, InternalTypeInfo rightType, @@ -174,13 +183,23 @@ public void open() throws Exception { outRow = new JoinedRowData(); rightNullRow = new GenericRowData(rightType.toRowType().getFieldCount()); collector = new TimestampedCollector<>(output); + + numLateRecordsDropped = + getRuntimeContext().getMetricGroup().counter(LATE_ELEMENTS_DROPPED_METRIC_NAME); } @Override public void processElement1(StreamRecord element) throws Exception { RowData row = element.getValue(); + long leftTime = getLeftTime(row); + if (leftTime <= timerService.currentWatermark()) { + // The probe-side record is late. Drop it, because the matching build-side version may + // already have been cleaned up. + numLateRecordsDropped.inc(); + return; + } leftState.put(getNextLeftIndex(), row); - registerSmallestTimer(getLeftTime(row)); // Timer to emit and clean up the state + registerSmallestTimer(leftTime); // Timer to emit and clean up the state registerProcessingCleanupTimer(); } @@ -441,4 +460,9 @@ static String getNextLeftIndexStateName() { static String getRegisteredTimerStateName() { return REGISTERED_TIMER_STATE_NAME; } + + @VisibleForTesting + Counter getNumLateRecordsDropped() { + return numLateRecordsDropped; + } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java index bff3b8ae73aaf..2a54ac02fdcfb 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java @@ -39,9 +39,9 @@ class TemporalRowTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase { /** Test rowtime temporal join. */ @Test - void testRowTimeTemporalJoin() throws Exception { + void testRowTimeInnerTemporalJoin() throws Exception { List expectedOutput = new ArrayList<>(); - expectedOutput.add(new Watermark(1)); + expectedOutput.add(new Watermark(0)); expectedOutput.add(new Watermark(2)); expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "1a2")); expectedOutput.add(new Watermark(5)); @@ -54,14 +54,12 @@ void testRowTimeTemporalJoin() throws Exception { testRowTimeTemporalJoin(false, expectedOutput); } - /** Test rowtime left temporal join. */ @Test void testRowTimeLeftTemporalJoin() throws Exception { List expectedOutput = new ArrayList<>(); - expectedOutput.add(new Watermark(1)); + expectedOutput.add(new Watermark(0)); expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, null)); expectedOutput.add(new Watermark(2)); - expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, null)); expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "1a2")); expectedOutput.add(new Watermark(5)); expectedOutput.add(insertRecord(6L, "k2", "2a3", 4L, "k2", "2a4")); @@ -84,8 +82,8 @@ private void testRowTimeTemporalJoin(boolean isLeftOuterJoin, List expec testHarness.open(); - testHarness.processWatermark1(new Watermark(1)); - testHarness.processWatermark2(new Watermark(1)); + testHarness.processWatermark1(new Watermark(0)); + testHarness.processWatermark2(new Watermark(0)); testHarness.processElement1(insertRecord(1L, "k1", "1a1")); testHarness.processElement2(insertRecord(2L, "k1", "1a2")); @@ -93,7 +91,6 @@ private void testRowTimeTemporalJoin(boolean isLeftOuterJoin, List expec testHarness.processWatermark1(new Watermark(2)); testHarness.processWatermark2(new Watermark(2)); - testHarness.processElement1(insertRecord(1L, "k1", "1a1")); testHarness.processElement1(insertRecord(3L, "k1", "1a3")); testHarness.processElement2(insertRecord(4L, "k2", "2a4")); @@ -194,9 +191,9 @@ void testRowTimeTemporalJoinWithStateRetention() throws Exception { } @Test - void testRowTimeTemporalJoinOnUpsertSource() throws Exception { + void testRowTimeInnerTemporalJoinOnUpsertSource() throws Exception { List expectedOutput = new ArrayList<>(); - expectedOutput.add(new Watermark(1)); + expectedOutput.add(new Watermark(0)); expectedOutput.add(new Watermark(2)); expectedOutput.add(updateAfterRecord(3L, "k1", "1a3", 2L, "k1", "1a2")); expectedOutput.add(new Watermark(5)); @@ -212,7 +209,7 @@ void testRowTimeTemporalJoinOnUpsertSource() throws Exception { @Test void testRowTimeLeftTemporalJoinOnUpsertSource() throws Exception { List expectedOutput = new ArrayList<>(); - expectedOutput.add(new Watermark(1)); + expectedOutput.add(new Watermark(0)); expectedOutput.add(insertRecord(1L, "k1", "1a1", null, null, null)); expectedOutput.add(new Watermark(2)); expectedOutput.add(updateAfterRecord(3L, "k1", "1a3", 2L, "k1", "1a2")); @@ -237,8 +234,8 @@ private void testRowTimeTemporalJoinOnUpsertSource( testHarness.open(); - testHarness.processWatermark1(new Watermark(1)); - testHarness.processWatermark2(new Watermark(1)); + testHarness.processWatermark1(new Watermark(0)); + testHarness.processWatermark2(new Watermark(0)); testHarness.processElement1(insertRecord(1L, "k1", "1a1")); testHarness.processElement2(insertRecord(2L, "k1", "1a2")); @@ -270,6 +267,96 @@ private void testRowTimeTemporalJoinOnUpsertSource( testHarness.close(); } + @Test + void testRowTimeInnerTemporalJoinLateRecords() throws Exception { + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "2a2")); + expectedOutput.add(new Watermark(5)); + expectedOutput.add(insertRecord(7L, "k1", "1a7", 2L, "k1", "2a2")); + expectedOutput.add(new Watermark(8)); + expectedOutput.add(new Watermark(11)); + expectedOutput.add(insertRecord(13L, "k2", "1a13", 9L, "k2", "2a9")); + expectedOutput.add(new Watermark(13)); + expectedOutput.add(new Watermark(15)); + + testRowTimeTemporalJoinLateRecords(false, expectedOutput); + } + + @Test + void testRowTimeLeftTemporalJoinLateRecords() throws Exception { + List expectedOutput = new ArrayList<>(); + expectedOutput.add(new Watermark(1)); + expectedOutput.add(insertRecord(3L, "k1", "1a3", 2L, "k1", "2a2")); + expectedOutput.add(new Watermark(5)); + expectedOutput.add(insertRecord(7L, "k1", "1a7", 2L, "k1", "2a2")); + expectedOutput.add(new Watermark(8)); + expectedOutput.add(insertRecord(10L, "k2", "1a10", null, null, null)); + expectedOutput.add(new Watermark(11)); + expectedOutput.add(insertRecord(13L, "k2", "1a13", 9L, "k2", "2a9")); + expectedOutput.add(new Watermark(13)); + expectedOutput.add(new Watermark(15)); + + testRowTimeTemporalJoinLateRecords(true, expectedOutput); + } + + /** + * Verifies that probe-side records whose event time is less than or equal to the current + * watermark are dropped on arrival: they are not joined, not emitted (even with a left outer + * join), and are counted in the {@code numLateRecordsDropped} metric. + */ + private void testRowTimeTemporalJoinLateRecords( + boolean isLeftOuter, List expectedOutput) throws Exception { + TemporalRowTimeJoinOperator joinOperator = + new TemporalRowTimeJoinOperator( + rowType, rowType, joinCondition, 0, 0, 0, 0, isLeftOuter); + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(joinOperator); + + testHarness.open(); + + // initialize watermark to 1 + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + + // Establish a build-side version at time 2 and a non-late probe record at time 3. + testHarness.processElement2(insertRecord(2L, "k1", "2a2")); + testHarness.processElement1(insertRecord(3L, "k1", "1a3")); + testHarness.processWatermark1(new Watermark(5)); + testHarness.processWatermark2(new Watermark(5)); + + // After Watermark(5), any probe record with leftTime <= 5 is late and must be dropped. + testHarness.processElement1(insertRecord(5L, "k1", "1a5")); // leftTime == watermark + testHarness.processElement1(insertRecord(4L, "k1", "1a4")); // leftTime < watermark + testHarness.processElement1(insertRecord(1L, "k1", "1a1")); // leftTime << watermark + // A non-late probe record should still be processed. + testHarness.processElement1(insertRecord(7L, "k1", "1a7")); + testHarness.processWatermark1(new Watermark(8)); + testHarness.processWatermark2(new Watermark(8)); + + // A record for late retraction + testHarness.processElement1(insertRecord(10L, "k2", "1a10")); + testHarness.processWatermark1(new Watermark(11)); + testHarness.processWatermark2(new Watermark(11)); + + // Add a late retraction and a late build-side record + testHarness.processElement1(insertRecord(13L, "k2", "1a13")); + testHarness.processElement2(insertRecord(9L, "k2", "2a9")); + testHarness.processElement1(deleteRecord(10L, "k2", "1a10")); // late -> dropped + testHarness.processWatermark1(new Watermark(13)); + testHarness.processWatermark2(new Watermark(13)); + + // Another late retraction + testHarness.processElement1(deleteRecord(13L, "k2", "1a13")); + testHarness.processWatermark1(new Watermark(15)); + testHarness.processWatermark2(new Watermark(15)); + + assertor.assertOutputEquals("output wrong.", expectedOutput, testHarness.getOutput()); + assertThat(joinOperator.getNumLateRecordsDropped().getCount()).isEqualTo(5L); + + testHarness.close(); + } + private KeyedTwoInputStreamOperatorTestHarness createTestHarness(TemporalRowTimeJoinOperator temporalJoinOperator) throws Exception {