diff --git a/docs/content.zh/docs/sql/reference/queries/joins.md b/docs/content.zh/docs/sql/reference/queries/joins.md
index 0cf8d669709c4..a1e1db2caab8b 100644
--- a/docs/content.zh/docs/sql/reference/queries/joins.md
+++ b/docs/content.zh/docs/sql/reference/queries/joins.md
@@ -181,6 +181,8 @@ o_002 12.51 EUR 1.10 12:06:00
这里的 `INTERVAL` 时间减法用于等待后续事件,以确保 join 满足预期。
请确保 join 两边设置了正确的 watermark 。
+**Note:** Probe-side (left) records that arrive late (their event time is less than or equal to the current watermark) are dropped on arrival and counted by the `numLateRecordsDropped` operator metric. They are not joined or emitted, not even as null-padded results for `LEFT JOIN`, because the matching build-side version may already have been cleaned up.
+
**注意:** 事件时间 temporal join 需要包含主键相等的条件,即:`currency_rates` 表的主键 `currency_rates.currency` 包含在条件 `orders.currency = currency_rates.currency` 中。
与 [regular joins](#regular-joins) 相比,就算 build side(例子中的 currency_rates 表)发生变更了,之前的 temporal table 的结果也不会被影响。
diff --git a/docs/content/docs/sql/reference/queries/joins.md b/docs/content/docs/sql/reference/queries/joins.md
index be99c737d3f58..d000a46b822ce 100644
--- a/docs/content/docs/sql/reference/queries/joins.md
+++ b/docs/content/docs/sql/reference/queries/joins.md
@@ -186,6 +186,8 @@ o_002 12.51 EUR 1.10 12:06:00
The `INTERVAL` time subtraction is used to wait for late events in order to make sure the join will meet the expectation.
Please ensure both sides of the join have set watermark correctly.
+**Note:** Probe-side (left) records that arrive late (their event time is less than or equal to the current watermark) are dropped on arrival and counted by the `numLateRecordsDropped` operator metric. They are not joined or emitted, not even as null-padded results for `LEFT JOIN`, because the matching build-side version may already have been cleaned up.
+
**Note:** The event-time temporal join requires the primary key contained in the equivalence condition of the temporal join condition, e.g., The primary key `currency_rates.currency` of table `currency_rates` to be constrained in the condition `orders.currency = currency_rates.currency`.
In contrast to [regular joins](#regular-joins), the previous temporal table results will not be affected despite the changes on the build side.
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
index ed83169c74a80..b8a03ffacf9f9 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinTestPrograms.java
@@ -45,7 +45,7 @@ public class TemporalJoinTestPrograms {
Row.of(3L, "Euro", "2020-10-10 00:00:45"))
.producedAfterRestore(
Row.of(1L, "Euro", "2020-10-10 00:00:58"),
- Row.of(1L, "USD", "2020-10-10 00:00:58"))
+ Row.of(1L, "USD", "2020-10-10 00:00:59"))
.build();
static final SourceTestStep ORDERS_WITH_NESTED_ID =
@@ -88,7 +88,7 @@ public class TemporalJoinTestPrograms {
1L,
Row.of("usd"),
mapOf("currency", "USD"),
- "2020-10-10 00:00:58"))
+ "2020-10-10 00:00:59"))
.build();
static final SourceTestStep RATES =
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
index 940e15c2b4d03..71ed75b5970df 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
@@ -25,6 +25,7 @@
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimer;
@@ -61,9 +62,14 @@
* idea is that between watermarks we are collecting those elements and once we are sure that there
* will be no updates we emit the correct result and clean up the expired data in state.
*
- *
Cleaning up the state drops all of the "old" values from the probe side, where "old" is
- * defined as older then the current watermark. Build side is also cleaned up in the similar
- * fashion, however we always keep at least one record - the latest one - even if it's past the last
+ *
Probe-side records that arrive late (their event time is less than or equal to the current
+ * watermark) are dropped on arrival and counted via the {@code numLateRecordsDropped} metric; they
+ * are not joined or emitted (not even as null-padded results for left outer joins), because the
+ * matching build-side version may already have been cleaned up.
+ *
+ *
Cleaning up the state drops all the "old" values from the probe side, where "old" is defined
+ * as older than the current watermark. Build side is also cleaned up in the similar fashion,
+ * however we always keep at least one record - the latest one - even if it's past the last
* watermark.
*
*
One more trick is how the emitting results and cleaning up is triggered. It is achieved by
@@ -84,6 +90,7 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS
private static final String RIGHT_STATE_NAME = "right";
private static final String REGISTERED_TIMER_STATE_NAME = "timer";
private static final String TIMERS_STATE_NAME = "timers";
+ private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
private final boolean isLeftOuterJoin;
private final InternalTypeInfo leftType;
@@ -123,6 +130,8 @@ public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithS
private transient JoinedRowData outRow;
private transient GenericRowData rightNullRow;
+ private transient Counter numLateRecordsDropped;
+
public TemporalRowTimeJoinOperator(
InternalTypeInfo leftType,
InternalTypeInfo rightType,
@@ -174,13 +183,23 @@ public void open() throws Exception {
outRow = new JoinedRowData();
rightNullRow = new GenericRowData(rightType.toRowType().getFieldCount());
collector = new TimestampedCollector<>(output);
+
+ numLateRecordsDropped =
+ getRuntimeContext().getMetricGroup().counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
}
@Override
public void processElement1(StreamRecord element) throws Exception {
RowData row = element.getValue();
+ long leftTime = getLeftTime(row);
+ if (leftTime <= timerService.currentWatermark()) {
+ // The probe-side record is late. Drop it, because the matching build-side version may
+ // already have been cleaned up.
+ numLateRecordsDropped.inc();
+ return;
+ }
leftState.put(getNextLeftIndex(), row);
- registerSmallestTimer(getLeftTime(row)); // Timer to emit and clean up the state
+ registerSmallestTimer(leftTime); // Timer to emit and clean up the state
registerProcessingCleanupTimer();
}
@@ -441,4 +460,9 @@ static String getNextLeftIndexStateName() {
static String getRegisteredTimerStateName() {
return REGISTERED_TIMER_STATE_NAME;
}
+
+ @VisibleForTesting
+ Counter getNumLateRecordsDropped() {
+ return numLateRecordsDropped;
+ }
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
index bff3b8ae73aaf..2a54ac02fdcfb 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.java
@@ -39,9 +39,9 @@
class TemporalRowTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase {
/** Test rowtime temporal join. */
@Test
- void testRowTimeTemporalJoin() throws Exception {
+ void testRowTimeInnerTemporalJoin() throws Exception {
List