diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
index f1aa70a97d953..7ec6477c6d060 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.common.functions;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
@@ -112,4 +113,30 @@ public interface RichFunction extends Function {
* @param t The runtime context.
*/
void setRuntimeContext(RuntimeContext t);
+
+ /**
+ * Can be overridden to enable splittable timers for this particular function even if config
+ * option is enabled. By default, splittable timers are disabled. NOTE: operator that executes
+ * the function is expected to:
+ *
+ *
+ * Inherit org.apache.flink.streaming.api.operators.AbstractStreamOperator or
+ * org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2 (or handle watermarks
+ * similarly)
+ * Pass {@link org.apache.flink.api.common.operators.MailboxExecutor MailboxExecutor} to
+ * it in one of the following ways:
+ *
+ * Pass org.apache.flink.streaming.api.operators.StreamOperatorParameters to super
+ * constructor
+ * Set via
+ * org.apache.flink.streaming.api.operators.YieldingOperator.setMailboxExecutor
+ * Set via org.apache.flink.streaming.api.operators.YieldingOperatorFactory
+ *
+ * Not override watermark handling methods or call super
+ *
+ */
+ @Internal
+ default boolean useInterruptibleTimers() {
+ return false;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a4f2a7563842b..176379ac794ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -73,7 +73,10 @@
import java.util.Collections;
import java.util.Locale;
import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
@@ -124,6 +127,8 @@ public abstract class AbstractStreamOperator
private transient @Nullable MailboxWatermarkProcessor watermarkProcessor;
+ private final WatermarkConsumerSupplier watermarkConsumerSupplier;
+
// ---------------- key/value state ------------------
/**
@@ -160,9 +165,17 @@ public abstract class AbstractStreamOperator
protected transient RecordAttributes lastRecordAttributes1;
protected transient RecordAttributes lastRecordAttributes2;
- public AbstractStreamOperator() {}
+ public AbstractStreamOperator() {
+ this(null);
+ }
public AbstractStreamOperator(StreamOperatorParameters parameters) {
+ this(parameters, WatermarkConsumerSupplier.defaultSupplier());
+ }
+
+ public AbstractStreamOperator(
+ StreamOperatorParameters parameters,
+ WatermarkConsumerSupplier watermarkConsumerSupplier) {
if (parameters != null) {
setup(
parameters.getContainingTask(),
@@ -170,7 +183,9 @@ public AbstractStreamOperator(StreamOperatorParameters parameters) {
parameters.getOutput());
this.processingTimeService =
Preconditions.checkNotNull(parameters.getProcessingTimeService());
+ this.mailboxExecutor = parameters.getMailboxExecutor();
}
+ this.watermarkConsumerSupplier = checkNotNull(watermarkConsumerSupplier);
}
// ------------------------------------------------------------------------
@@ -381,7 +396,9 @@ && areInterruptibleTimersConfigured()
&& getTimeServiceManager().isPresent()) {
this.watermarkProcessor =
new MailboxWatermarkProcessor(
- output, mailboxExecutor, getTimeServiceManager().get());
+ watermarkConsumerSupplier.apply(output),
+ mailboxExecutor,
+ getTimeServiceManager().get());
}
}
@@ -770,4 +787,43 @@ public void processWatermark1(WatermarkEvent watermark) throws Exception {
public void processWatermark2(WatermarkEvent watermark) throws Exception {
output.emitWatermark(watermark);
}
+
+ public interface WatermarkConsumerSupplier
+ extends Function>, Consumer>, Serializable {
+
+ static WatermarkConsumerSupplier defaultSupplier() {
+ return new DirectWatermarkConsumerSupplier<>();
+ }
+
+ static WatermarkConsumerSupplier delayedSupplier(long delay) {
+ return new DelayedWatermarkConsumerSupplier<>(delay);
+ }
+
+ class DirectWatermarkConsumerSupplier implements WatermarkConsumerSupplier {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Consumer apply(Output> output) {
+ return output::emitWatermark;
+ }
+ }
+
+ class DelayedWatermarkConsumerSupplier implements WatermarkConsumerSupplier {
+ private static final long serialVersionUID = 1L;
+
+ private final long watermarkDelay;
+
+ public DelayedWatermarkConsumerSupplier(long watermarkDelay) {
+ Preconditions.checkArgument(
+ watermarkDelay > 0, "The watermark delay should be positive.");
+ this.watermarkDelay = watermarkDelay;
+ }
+
+ @Override
+ public Consumer apply(Output> out) {
+ return mark ->
+ out.emitWatermark(new Watermark(mark.getTimestamp() - watermarkDelay));
+ }
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index d89eaf4654458..ad06a6e256bfd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -107,12 +107,23 @@ public abstract class AbstractStreamOperatorV2
protected final LatencyStats latencyStats;
protected final ProcessingTimeService processingTimeService;
protected final RecordAttributes[] lastRecordAttributes;
+ private final AbstractStreamOperator.WatermarkConsumerSupplier watermarkConsumerSupplier;
protected StreamOperatorStateHandler stateHandler;
protected InternalTimeServiceManager> timeServiceManager;
private @Nullable MailboxWatermarkProcessor watermarkProcessor;
public AbstractStreamOperatorV2(StreamOperatorParameters parameters, int numberOfInputs) {
+ this(
+ parameters,
+ numberOfInputs,
+ AbstractStreamOperator.WatermarkConsumerSupplier.defaultSupplier());
+ }
+
+ public AbstractStreamOperatorV2(
+ StreamOperatorParameters parameters,
+ int numberOfInputs,
+ AbstractStreamOperator.WatermarkConsumerSupplier watermarkConsumerSupplier) {
final Environment environment = parameters.getContainingTask().getEnvironment();
config = parameters.getStreamConfig();
output = parameters.getOutput();
@@ -148,6 +159,7 @@ public AbstractStreamOperatorV2(StreamOperatorParameters parameters, int nu
environment.getExternalResourceInfoProvider());
mailboxExecutor = parameters.getMailboxExecutor();
+ this.watermarkConsumerSupplier = watermarkConsumerSupplier;
}
private LatencyStats createLatencyStats(
@@ -241,7 +253,9 @@ && areInterruptibleTimersConfigured()
&& getTimeServiceManager().isPresent()) {
watermarkProcessor =
new MailboxWatermarkProcessor(
- output, mailboxExecutor, getTimeServiceManager().get());
+ watermarkConsumerSupplier.apply(output),
+ mailboxExecutor,
+ getTimeServiceManager().get());
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 656bc2ed9414f..38bc21b2db8cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -18,10 +18,12 @@
package org.apache.flink.streaming.api.operators;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -61,7 +63,14 @@ public AbstractUdfStreamOperator(F userFunction) {
}
protected AbstractUdfStreamOperator(StreamOperatorParameters parameters, F userFunction) {
- super(parameters);
+ this(parameters, userFunction, WatermarkConsumerSupplier.defaultSupplier());
+ }
+
+ protected AbstractUdfStreamOperator(
+ StreamOperatorParameters parameters,
+ F userFunction,
+ WatermarkConsumerSupplier watermarkConsumerSupplier) {
+ super(parameters, watermarkConsumerSupplier);
this.userFunction = requireNonNull(userFunction);
checkUdfCheckpointingPreconditions();
}
@@ -176,4 +185,10 @@ private void checkUdfCheckpointingPreconditions() {
+ "CheckpointedFunction AND ListCheckpointed.");
}
}
+
+ @Internal
+ public boolean useInterruptibleTimers() {
+ return userFunction instanceof RichFunction
+ && ((RichFunction) userFunction).useInterruptibleTimers();
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
index fb498f65f07ee..481ea5543982a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
@@ -26,6 +26,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.function.Consumer;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -41,7 +43,7 @@
public class MailboxWatermarkProcessor {
protected static final Logger LOG = LoggerFactory.getLogger(MailboxWatermarkProcessor.class);
- private final Output> output;
+ private final Consumer output;
private final MailboxExecutor mailboxExecutor;
private final InternalTimeServiceManager> internalTimeServiceManager;
@@ -57,6 +59,13 @@ public MailboxWatermarkProcessor(
Output> output,
MailboxExecutor mailboxExecutor,
InternalTimeServiceManager> internalTimeServiceManager) {
+ this(output::emitWatermark, mailboxExecutor, internalTimeServiceManager);
+ }
+
+ public MailboxWatermarkProcessor(
+ Consumer output,
+ MailboxExecutor mailboxExecutor,
+ InternalTimeServiceManager> internalTimeServiceManager) {
this.output = checkNotNull(output);
this.mailboxExecutor = checkNotNull(mailboxExecutor);
this.internalTimeServiceManager = checkNotNull(internalTimeServiceManager);
@@ -73,7 +82,7 @@ private void emitWatermarkInsideMailbox() throws Exception {
if (internalTimeServiceManager.tryAdvanceWatermark(
maxInputWatermark, mailboxExecutor::shouldInterrupt)) {
// In case output watermark has fully progressed emit it downstream.
- output.emitWatermark(maxInputWatermark);
+ output.accept(maxInputWatermark);
} else if (!progressWatermarkScheduled) {
progressWatermarkScheduled = true;
// We still have work to do, but we need to let other mails to be processed first.
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
index 39eadba89e8a5..9126c4bebdee6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
@@ -54,7 +54,13 @@ public class KeyedCoProcessOperator
private transient OnTimerContextImpl onTimerContext;
public KeyedCoProcessOperator(KeyedCoProcessFunction keyedCoProcessFunction) {
- super(keyedCoProcessFunction);
+ this(keyedCoProcessFunction, WatermarkConsumerSupplier.defaultSupplier());
+ }
+
+ public KeyedCoProcessOperator(
+ KeyedCoProcessFunction keyedCoProcessFunction,
+ WatermarkConsumerSupplier watermarkConsumerSupplier) {
+ super(null, keyedCoProcessFunction, watermarkConsumerSupplier);
}
@Override
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/KeyedCoProcessOperatorWithWatermarkDelay.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/KeyedCoProcessOperatorWithWatermarkDelay.java
index 7552a40e6624b..5b4cd9e6019e6 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/KeyedCoProcessOperatorWithWatermarkDelay.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/KeyedCoProcessOperatorWithWatermarkDelay.java
@@ -19,14 +19,7 @@
package org.apache.flink.table.runtime.operators.join;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
-import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-import java.util.Optional;
-import java.util.function.Consumer;
/** A {@link KeyedCoProcessOperator} that supports holding back watermarks with a static delay. */
public class KeyedCoProcessOperatorWithWatermarkDelay
@@ -34,34 +27,14 @@ public class KeyedCoProcessOperatorWithWatermarkDelay
private static final long serialVersionUID = -7435774708099223442L;
- private final Consumer emitter;
-
public KeyedCoProcessOperatorWithWatermarkDelay(
KeyedCoProcessFunction flatMapper, long watermarkDelay) {
- super(flatMapper);
- Preconditions.checkArgument(
- watermarkDelay >= 0, "The watermark delay should be non-negative.");
- if (watermarkDelay == 0) {
- // emits watermark without delay
- emitter =
- (Consumer & Serializable)
- (Watermark mark) -> output.emitWatermark(mark);
- } else {
- // emits watermark with delay
- emitter =
- (Consumer & Serializable)
- (Watermark mark) ->
- output.emitWatermark(
- new Watermark(mark.getTimestamp() - watermarkDelay));
- }
+ super(flatMapper, getWatermarkConsumer(watermarkDelay));
}
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- Optional> timeServiceManager = getTimeServiceManager();
- if (timeServiceManager.isPresent()) {
- timeServiceManager.get().advanceWatermark(mark);
- }
- emitter.accept(mark);
+ private static WatermarkConsumerSupplier getWatermarkConsumer(long watermarkDelay) {
+ return watermarkDelay == 0
+ ? WatermarkConsumerSupplier.defaultSupplier()
+ : WatermarkConsumerSupplier.delayedSupplier(watermarkDelay);
}
}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java
index ba0cee825f572..902a5673543a8 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java
@@ -484,4 +484,8 @@ private void removeExpiredRows(
* @param cleanupTime timestamp for the timer
*/
abstract void registerTimer(Context ctx, long cleanupTime);
+
+ public boolean useInterruptibleTimers() {
+ return true;
+ }
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/RowTimeIntervalJoinTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/RowTimeIntervalJoinTest.java
index 64e9f6b03b4ec..19a104a652f7e 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/RowTimeIntervalJoinTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/RowTimeIntervalJoinTest.java
@@ -19,8 +19,12 @@
package org.apache.flink.table.runtime.operators.join.interval;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
@@ -31,9 +35,14 @@
import org.junit.jupiter.api.Test;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE;
+import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.apache.flink.configuration.CheckpointingOptions.ENABLE_UNALIGNED;
+import static org.apache.flink.configuration.CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
import static org.assertj.core.api.Assertions.assertThat;
@@ -413,6 +422,93 @@ void testRowTimeFullOuterJoin() throws Exception {
testHarness.close();
}
+ @Test
+ public void testInterruptibleTimers() throws Exception {
+ final boolean[] isMailProcessed = new boolean[1]; // e.g. checkpoint mail
+
+ int allowedLateness = 1;
+ int leftUpperBound = 1;
+ RowTimeIntervalJoin joinProcessFunc =
+ new RowTimeIntervalJoin(
+ FlinkJoinType.FULL,
+ -1,
+ leftUpperBound,
+ allowedLateness,
+ 0,
+ rowType,
+ rowType,
+ joinFunction,
+ 0,
+ 0);
+
+ KeyedTwoInputStreamOperatorTestHarness testHarness =
+ createTestHarness(joinProcessFunc);
+ testHarness
+ .getEnvironment()
+ .getJobConfiguration()
+ .set(ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, true)
+ .set(CHECKPOINTING_INTERVAL, Duration.ofMillis(10)) // just enable checkpointing
+ .set(ENABLE_UNALIGNED, true)
+ .set(CHECKPOINTING_CONSISTENCY_MODE, CheckpointingMode.EXACTLY_ONCE);
+
+ final int operatorMailPriority = 123;
+ testHarness
+ .getOperator()
+ .setMailboxExecutor(
+ new MailboxExecutorImpl(
+ testHarness.getTaskMailbox(),
+ operatorMailPriority,
+ StreamTaskActionExecutor.IMMEDIATE));
+
+ testHarness.open();
+
+ testHarness.processElement1(insertRecord(5L, "k1"));
+ testHarness.processElement2(insertRecord(6L, "k2"));
+ testHarness.processElement1(insertRecord(7L, "k3"));
+ testHarness.processElement2(insertRecord(8L, "k4"));
+
+ testHarness
+ .getTaskMailbox()
+ .put(new Mail(() -> isMailProcessed[0] = true, operatorMailPriority, "%s", "test"));
+
+ final int timersBeforeWatermark = testHarness.numEventTimeTimers();
+ assertThat(timersBeforeWatermark).isPositive();
+
+ final int endTime = 99; // should trigger emission of all elements
+ testHarness.processWatermark1(new Watermark(endTime));
+ testHarness.processWatermark2(new Watermark(endTime));
+
+ final int timersAfterWatermark = testHarness.numEventTimeTimers();
+ assertThat(timersAfterWatermark)
+ .as("On watermark, some timers should be processed, some should be postponed")
+ .isLessThan(timersBeforeWatermark)
+ .isNotZero();
+
+ testHarness.getTaskMailbox().take(operatorMailPriority).run();
+ assertThat(isMailProcessed[0]).as("The mail should be processed").isTrue();
+ assertThat(testHarness.numEventTimeTimers())
+ .as("The number of timers shouldn't change after the 1st mail")
+ .isEqualTo(timersAfterWatermark);
+
+ // process the remaining timers
+ for (Mail mail : testHarness.getTaskMailbox().drain()) {
+ mail.run();
+ }
+ assertThat(testHarness.numEventTimeTimers())
+ .as("Eventually, all timers should be processed")
+ .isZero();
+
+ final List expectedOutput = new ArrayList<>();
+ expectedOutput.add(insertRecord(5L, "k1", null, null));
+ expectedOutput.add(insertRecord(null, null, 6L, "k2"));
+ expectedOutput.add(insertRecord(7L, "k3", null, null));
+ expectedOutput.add(insertRecord(null, null, 8L, "k4"));
+ expectedOutput.add(new Watermark(endTime - allowedLateness - leftUpperBound));
+ assertor.assertOutputEquals("Wrong output", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
private KeyedTwoInputStreamOperatorTestHarness
createTestHarness(RowTimeIntervalJoin intervalJoinFunc) throws Exception {
KeyedCoProcessOperator operator =