diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java index f1aa70a97d953..7ec6477c6d060 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichFunction.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.functions; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; @@ -112,4 +113,30 @@ public interface RichFunction extends Function { * @param t The runtime context. */ void setRuntimeContext(RuntimeContext t); + + /** + * Can be overridden to enable splittable timers for this particular function even if config + * option is enabled. By default, splittable timers are disabled. NOTE: operator that executes + * the function is expected to: + * + *
    + *
  1. Inherit org.apache.flink.streaming.api.operators.AbstractStreamOperator or + * org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2 (or handle watermarks + * similarly) + *
  2. Pass {@link org.apache.flink.api.common.operators.MailboxExecutor MailboxExecutor} to + * it in one of the following ways: + * + *
  3. Not override watermark handling methods or call super + *
+ */ + @Internal + default boolean useInterruptibleTimers() { + return false; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a4f2a7563842b..176379ac794ab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -73,7 +73,10 @@ import java.util.Collections; import java.util.Locale; import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; +import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; /** @@ -124,6 +127,8 @@ public abstract class AbstractStreamOperator private transient @Nullable MailboxWatermarkProcessor watermarkProcessor; + private final WatermarkConsumerSupplier watermarkConsumerSupplier; + // ---------------- key/value state ------------------ /** @@ -160,9 +165,17 @@ public abstract class AbstractStreamOperator protected transient RecordAttributes lastRecordAttributes1; protected transient RecordAttributes lastRecordAttributes2; - public AbstractStreamOperator() {} + public AbstractStreamOperator() { + this(null); + } public AbstractStreamOperator(StreamOperatorParameters parameters) { + this(parameters, WatermarkConsumerSupplier.defaultSupplier()); + } + + public AbstractStreamOperator( + StreamOperatorParameters parameters, + WatermarkConsumerSupplier watermarkConsumerSupplier) { if (parameters != null) { setup( parameters.getContainingTask(), @@ -170,7 +183,9 @@ public AbstractStreamOperator(StreamOperatorParameters parameters) { parameters.getOutput()); this.processingTimeService = Preconditions.checkNotNull(parameters.getProcessingTimeService()); + this.mailboxExecutor = parameters.getMailboxExecutor(); } + this.watermarkConsumerSupplier = checkNotNull(watermarkConsumerSupplier); } // ------------------------------------------------------------------------ @@ -381,7 +396,9 @@ && areInterruptibleTimersConfigured() && getTimeServiceManager().isPresent()) { this.watermarkProcessor = new MailboxWatermarkProcessor( - output, mailboxExecutor, getTimeServiceManager().get()); + watermarkConsumerSupplier.apply(output), + mailboxExecutor, + getTimeServiceManager().get()); } } @@ -770,4 +787,43 @@ public void processWatermark1(WatermarkEvent watermark) throws Exception { public void processWatermark2(WatermarkEvent watermark) throws Exception { output.emitWatermark(watermark); } + + public interface WatermarkConsumerSupplier + extends Function>, Consumer>, Serializable { + + static WatermarkConsumerSupplier defaultSupplier() { + return new DirectWatermarkConsumerSupplier<>(); + } + + static WatermarkConsumerSupplier delayedSupplier(long delay) { + return new DelayedWatermarkConsumerSupplier<>(delay); + } + + class DirectWatermarkConsumerSupplier implements WatermarkConsumerSupplier { + private static final long serialVersionUID = 1L; + + @Override + public Consumer apply(Output> output) { + return output::emitWatermark; + } + } + + class DelayedWatermarkConsumerSupplier implements WatermarkConsumerSupplier { + private static final long serialVersionUID = 1L; + + private final long watermarkDelay; + + public DelayedWatermarkConsumerSupplier(long watermarkDelay) { + Preconditions.checkArgument( + watermarkDelay > 0, "The watermark delay should be positive."); + this.watermarkDelay = watermarkDelay; + } + + @Override + public Consumer apply(Output> out) { + return mark -> + out.emitWatermark(new Watermark(mark.getTimestamp() - watermarkDelay)); + } + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index d89eaf4654458..ad06a6e256bfd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -107,12 +107,23 @@ public abstract class AbstractStreamOperatorV2 protected final LatencyStats latencyStats; protected final ProcessingTimeService processingTimeService; protected final RecordAttributes[] lastRecordAttributes; + private final AbstractStreamOperator.WatermarkConsumerSupplier watermarkConsumerSupplier; protected StreamOperatorStateHandler stateHandler; protected InternalTimeServiceManager timeServiceManager; private @Nullable MailboxWatermarkProcessor watermarkProcessor; public AbstractStreamOperatorV2(StreamOperatorParameters parameters, int numberOfInputs) { + this( + parameters, + numberOfInputs, + AbstractStreamOperator.WatermarkConsumerSupplier.defaultSupplier()); + } + + public AbstractStreamOperatorV2( + StreamOperatorParameters parameters, + int numberOfInputs, + AbstractStreamOperator.WatermarkConsumerSupplier watermarkConsumerSupplier) { final Environment environment = parameters.getContainingTask().getEnvironment(); config = parameters.getStreamConfig(); output = parameters.getOutput(); @@ -148,6 +159,7 @@ public AbstractStreamOperatorV2(StreamOperatorParameters parameters, int nu environment.getExternalResourceInfoProvider()); mailboxExecutor = parameters.getMailboxExecutor(); + this.watermarkConsumerSupplier = watermarkConsumerSupplier; } private LatencyStats createLatencyStats( @@ -241,7 +253,9 @@ && areInterruptibleTimersConfigured() && getTimeServiceManager().isPresent()) { watermarkProcessor = new MailboxWatermarkProcessor( - output, mailboxExecutor, getTimeServiceManager().get()); + watermarkConsumerSupplier.apply(output), + mailboxExecutor, + getTimeServiceManager().get()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 656bc2ed9414f..38bc21b2db8cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -18,10 +18,12 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.DefaultOpenContext; import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -61,7 +63,14 @@ public AbstractUdfStreamOperator(F userFunction) { } protected AbstractUdfStreamOperator(StreamOperatorParameters parameters, F userFunction) { - super(parameters); + this(parameters, userFunction, WatermarkConsumerSupplier.defaultSupplier()); + } + + protected AbstractUdfStreamOperator( + StreamOperatorParameters parameters, + F userFunction, + WatermarkConsumerSupplier watermarkConsumerSupplier) { + super(parameters, watermarkConsumerSupplier); this.userFunction = requireNonNull(userFunction); checkUdfCheckpointingPreconditions(); } @@ -176,4 +185,10 @@ private void checkUdfCheckpointingPreconditions() { + "CheckpointedFunction AND ListCheckpointed."); } } + + @Internal + public boolean useInterruptibleTimers() { + return userFunction instanceof RichFunction + && ((RichFunction) userFunction).useInterruptibleTimers(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java index fb498f65f07ee..481ea5543982a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java @@ -26,6 +26,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.function.Consumer; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -41,7 +43,7 @@ public class MailboxWatermarkProcessor { protected static final Logger LOG = LoggerFactory.getLogger(MailboxWatermarkProcessor.class); - private final Output> output; + private final Consumer output; private final MailboxExecutor mailboxExecutor; private final InternalTimeServiceManager internalTimeServiceManager; @@ -57,6 +59,13 @@ public MailboxWatermarkProcessor( Output> output, MailboxExecutor mailboxExecutor, InternalTimeServiceManager internalTimeServiceManager) { + this(output::emitWatermark, mailboxExecutor, internalTimeServiceManager); + } + + public MailboxWatermarkProcessor( + Consumer output, + MailboxExecutor mailboxExecutor, + InternalTimeServiceManager internalTimeServiceManager) { this.output = checkNotNull(output); this.mailboxExecutor = checkNotNull(mailboxExecutor); this.internalTimeServiceManager = checkNotNull(internalTimeServiceManager); @@ -73,7 +82,7 @@ private void emitWatermarkInsideMailbox() throws Exception { if (internalTimeServiceManager.tryAdvanceWatermark( maxInputWatermark, mailboxExecutor::shouldInterrupt)) { // In case output watermark has fully progressed emit it downstream. - output.emitWatermark(maxInputWatermark); + output.accept(maxInputWatermark); } else if (!progressWatermarkScheduled) { progressWatermarkScheduled = true; // We still have work to do, but we need to let other mails to be processed first. diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java index 39eadba89e8a5..9126c4bebdee6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java @@ -54,7 +54,13 @@ public class KeyedCoProcessOperator private transient OnTimerContextImpl onTimerContext; public KeyedCoProcessOperator(KeyedCoProcessFunction keyedCoProcessFunction) { - super(keyedCoProcessFunction); + this(keyedCoProcessFunction, WatermarkConsumerSupplier.defaultSupplier()); + } + + public KeyedCoProcessOperator( + KeyedCoProcessFunction keyedCoProcessFunction, + WatermarkConsumerSupplier watermarkConsumerSupplier) { + super(null, keyedCoProcessFunction, watermarkConsumerSupplier); } @Override diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/KeyedCoProcessOperatorWithWatermarkDelay.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/KeyedCoProcessOperatorWithWatermarkDelay.java index 7552a40e6624b..5b4cd9e6019e6 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/KeyedCoProcessOperatorWithWatermarkDelay.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/KeyedCoProcessOperatorWithWatermarkDelay.java @@ -19,14 +19,7 @@ package org.apache.flink.table.runtime.operators.join; import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; -import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.util.Preconditions; - -import java.io.Serializable; -import java.util.Optional; -import java.util.function.Consumer; /** A {@link KeyedCoProcessOperator} that supports holding back watermarks with a static delay. */ public class KeyedCoProcessOperatorWithWatermarkDelay @@ -34,34 +27,14 @@ public class KeyedCoProcessOperatorWithWatermarkDelay private static final long serialVersionUID = -7435774708099223442L; - private final Consumer emitter; - public KeyedCoProcessOperatorWithWatermarkDelay( KeyedCoProcessFunction flatMapper, long watermarkDelay) { - super(flatMapper); - Preconditions.checkArgument( - watermarkDelay >= 0, "The watermark delay should be non-negative."); - if (watermarkDelay == 0) { - // emits watermark without delay - emitter = - (Consumer & Serializable) - (Watermark mark) -> output.emitWatermark(mark); - } else { - // emits watermark with delay - emitter = - (Consumer & Serializable) - (Watermark mark) -> - output.emitWatermark( - new Watermark(mark.getTimestamp() - watermarkDelay)); - } + super(flatMapper, getWatermarkConsumer(watermarkDelay)); } - @Override - public void processWatermark(Watermark mark) throws Exception { - Optional> timeServiceManager = getTimeServiceManager(); - if (timeServiceManager.isPresent()) { - timeServiceManager.get().advanceWatermark(mark); - } - emitter.accept(mark); + private static WatermarkConsumerSupplier getWatermarkConsumer(long watermarkDelay) { + return watermarkDelay == 0 + ? WatermarkConsumerSupplier.defaultSupplier() + : WatermarkConsumerSupplier.delayedSupplier(watermarkDelay); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java index ba0cee825f572..902a5673543a8 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java @@ -484,4 +484,8 @@ private void removeExpiredRows( * @param cleanupTime timestamp for the timer */ abstract void registerTimer(Context ctx, long cleanupTime); + + public boolean useInterruptibleTimers() { + return true; + } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/RowTimeIntervalJoinTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/RowTimeIntervalJoinTest.java index 64e9f6b03b4ec..19a104a652f7e 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/RowTimeIntervalJoinTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/RowTimeIntervalJoinTest.java @@ -19,8 +19,12 @@ package org.apache.flink.table.runtime.operators.join.interval; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; +import org.apache.flink.streaming.runtime.tasks.mailbox.Mail; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl; import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; @@ -31,9 +35,14 @@ import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.List; +import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE; +import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTING_INTERVAL; +import static org.apache.flink.configuration.CheckpointingOptions.ENABLE_UNALIGNED; +import static org.apache.flink.configuration.CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS; import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; import static org.assertj.core.api.Assertions.assertThat; @@ -413,6 +422,93 @@ void testRowTimeFullOuterJoin() throws Exception { testHarness.close(); } + @Test + public void testInterruptibleTimers() throws Exception { + final boolean[] isMailProcessed = new boolean[1]; // e.g. checkpoint mail + + int allowedLateness = 1; + int leftUpperBound = 1; + RowTimeIntervalJoin joinProcessFunc = + new RowTimeIntervalJoin( + FlinkJoinType.FULL, + -1, + leftUpperBound, + allowedLateness, + 0, + rowType, + rowType, + joinFunction, + 0, + 0); + + KeyedTwoInputStreamOperatorTestHarness testHarness = + createTestHarness(joinProcessFunc); + testHarness + .getEnvironment() + .getJobConfiguration() + .set(ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, true) + .set(CHECKPOINTING_INTERVAL, Duration.ofMillis(10)) // just enable checkpointing + .set(ENABLE_UNALIGNED, true) + .set(CHECKPOINTING_CONSISTENCY_MODE, CheckpointingMode.EXACTLY_ONCE); + + final int operatorMailPriority = 123; + testHarness + .getOperator() + .setMailboxExecutor( + new MailboxExecutorImpl( + testHarness.getTaskMailbox(), + operatorMailPriority, + StreamTaskActionExecutor.IMMEDIATE)); + + testHarness.open(); + + testHarness.processElement1(insertRecord(5L, "k1")); + testHarness.processElement2(insertRecord(6L, "k2")); + testHarness.processElement1(insertRecord(7L, "k3")); + testHarness.processElement2(insertRecord(8L, "k4")); + + testHarness + .getTaskMailbox() + .put(new Mail(() -> isMailProcessed[0] = true, operatorMailPriority, "%s", "test")); + + final int timersBeforeWatermark = testHarness.numEventTimeTimers(); + assertThat(timersBeforeWatermark).isPositive(); + + final int endTime = 99; // should trigger emission of all elements + testHarness.processWatermark1(new Watermark(endTime)); + testHarness.processWatermark2(new Watermark(endTime)); + + final int timersAfterWatermark = testHarness.numEventTimeTimers(); + assertThat(timersAfterWatermark) + .as("On watermark, some timers should be processed, some should be postponed") + .isLessThan(timersBeforeWatermark) + .isNotZero(); + + testHarness.getTaskMailbox().take(operatorMailPriority).run(); + assertThat(isMailProcessed[0]).as("The mail should be processed").isTrue(); + assertThat(testHarness.numEventTimeTimers()) + .as("The number of timers shouldn't change after the 1st mail") + .isEqualTo(timersAfterWatermark); + + // process the remaining timers + for (Mail mail : testHarness.getTaskMailbox().drain()) { + mail.run(); + } + assertThat(testHarness.numEventTimeTimers()) + .as("Eventually, all timers should be processed") + .isZero(); + + final List expectedOutput = new ArrayList<>(); + expectedOutput.add(insertRecord(5L, "k1", null, null)); + expectedOutput.add(insertRecord(null, null, 6L, "k2")); + expectedOutput.add(insertRecord(7L, "k3", null, null)); + expectedOutput.add(insertRecord(null, null, 8L, "k4")); + expectedOutput.add(new Watermark(endTime - allowedLateness - leftUpperBound)); + assertor.assertOutputEquals("Wrong output", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + private KeyedTwoInputStreamOperatorTestHarness createTestHarness(RowTimeIntervalJoin intervalJoinFunc) throws Exception { KeyedCoProcessOperator operator =