diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/NoMainThreadCheckComponentMainThreadExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/NoMainThreadCheckComponentMainThreadExecutor.java new file mode 100644 index 0000000000000..894f6365206b8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/NoMainThreadCheckComponentMainThreadExecutor.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * A synchronous {@link ComponentMainThreadExecutor} that executes tasks directly on the calling + * thread without performing strict thread identity checks. + * + *

Unlike {@link ComponentMainThreadExecutorServiceAdapter#forMainThread()}, this executor does + * not assert that the current thread is the main thread, avoiding flaky test failures when {@link + * java.util.concurrent.CompletableFuture} callbacks are dispatched from background threads. + */ +public class NoMainThreadCheckComponentMainThreadExecutor implements ComponentMainThreadExecutor { + + private final DirectScheduledExecutorService executor = new DirectScheduledExecutorService(); + + @Override + public void assertRunningInMainThread() { + // No-op: Skip thread assertion to avoid flaky test failures + } + + @Override + public void execute(Runnable command) { + executor.execute(command); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return executor.schedule(command, delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return executor.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( + Runnable command, long initialDelay, long period, TimeUnit unit) { + return executor.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( + Runnable command, long initialDelay, long delay, TimeUnit unit) { + return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java index aa27bb2f907e6..2ceddc7176b9d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java @@ -18,18 +18,13 @@ package org.apache.flink.runtime.operators.coordination; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.NoMainThreadCheckComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker; -import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.concurrent.FutureUtils; -import org.apache.flink.util.concurrent.ScheduledExecutor; -import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; - -import javax.annotation.Nonnull; import java.io.IOException; import java.util.ArrayList; @@ -39,8 +34,6 @@ import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; @@ -242,48 +235,4 @@ public List getTaskFailoverReasons() { return taskFailoverReasons; } } - - /** - * An implementation of {@link ComponentMainThreadExecutor} that executes Runnables with a - * wrapped {@link ScheduledExecutor} and disables {@link #assertRunningInMainThread()} checks. - */ - private static class NoMainThreadCheckComponentMainThreadExecutor - implements ComponentMainThreadExecutor { - private final ScheduledExecutor scheduledExecutor; - - private NoMainThreadCheckComponentMainThreadExecutor() { - this.scheduledExecutor = - new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return scheduledExecutor.schedule(command, delay, unit); - } - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return scheduledExecutor.schedule(callable, delay, unit); - } - - @Override - public ScheduledFuture scheduleAtFixedRate( - Runnable command, long initialDelay, long period, TimeUnit unit) { - return scheduledExecutor.scheduleAtFixedRate(command, initialDelay, period, unit); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { - return scheduledExecutor.scheduleAtFixedRate(command, initialDelay, delay, unit); - } - - @Override - public void assertRunningInMainThread() {} - - @Override - public void execute(@Nonnull Runnable command) { - scheduledExecutor.execute(command); - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java index baab5227f09d7..988f90b6d5786 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.concurrent.NoMainThreadCheckComponentMainThreadExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.Execution; @@ -111,7 +111,7 @@ public static DefaultScheduler createAndInitScheduler( final JobGraph jobGraph = createJobGraph(jobVertices, jobConfiguration); final ComponentMainThreadExecutor mainThreadExecutor = - ComponentMainThreadExecutorServiceAdapter.forMainThread(); + new NoMainThreadCheckComponentMainThreadExecutor(); DefaultSchedulerBuilder schedulerBuilder = new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, scheduledExecutorService)