diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/NoMainThreadCheckComponentMainThreadExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/NoMainThreadCheckComponentMainThreadExecutor.java
new file mode 100644
index 0000000000000..894f6365206b8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/NoMainThreadCheckComponentMainThreadExecutor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A synchronous {@link ComponentMainThreadExecutor} that executes tasks directly on the calling
+ * thread without performing strict thread identity checks.
+ *
+ *
Unlike {@link ComponentMainThreadExecutorServiceAdapter#forMainThread()}, this executor does
+ * not assert that the current thread is the main thread, avoiding flaky test failures when {@link
+ * java.util.concurrent.CompletableFuture} callbacks are dispatched from background threads.
+ */
+public class NoMainThreadCheckComponentMainThreadExecutor implements ComponentMainThreadExecutor {
+
+ private final DirectScheduledExecutorService executor = new DirectScheduledExecutorService();
+
+ @Override
+ public void assertRunningInMainThread() {
+ // No-op: Skip thread assertion to avoid flaky test failures
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ executor.execute(command);
+ }
+
+ @Override
+ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) {
+ return executor.schedule(command, delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) {
+ return executor.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture> scheduleAtFixedRate(
+ Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return executor.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ScheduledFuture> scheduleWithFixedDelay(
+ Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
index aa27bb2f907e6..2ceddc7176b9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
@@ -18,18 +18,13 @@
package org.apache.flink.runtime.operators.coordination;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.NoMainThreadCheckComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
-import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
-import org.apache.flink.util.concurrent.ScheduledExecutor;
-import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
-
-import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.ArrayList;
@@ -39,8 +34,6 @@
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
@@ -242,48 +235,4 @@ public List getTaskFailoverReasons() {
return taskFailoverReasons;
}
}
-
- /**
- * An implementation of {@link ComponentMainThreadExecutor} that executes Runnables with a
- * wrapped {@link ScheduledExecutor} and disables {@link #assertRunningInMainThread()} checks.
- */
- private static class NoMainThreadCheckComponentMainThreadExecutor
- implements ComponentMainThreadExecutor {
- private final ScheduledExecutor scheduledExecutor;
-
- private NoMainThreadCheckComponentMainThreadExecutor() {
- this.scheduledExecutor =
- new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService());
- }
-
- @Override
- public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) {
- return scheduledExecutor.schedule(command, delay, unit);
- }
-
- @Override
- public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) {
- return scheduledExecutor.schedule(callable, delay, unit);
- }
-
- @Override
- public ScheduledFuture> scheduleAtFixedRate(
- Runnable command, long initialDelay, long period, TimeUnit unit) {
- return scheduledExecutor.scheduleAtFixedRate(command, initialDelay, period, unit);
- }
-
- @Override
- public ScheduledFuture> scheduleWithFixedDelay(
- Runnable command, long initialDelay, long delay, TimeUnit unit) {
- return scheduledExecutor.scheduleAtFixedRate(command, initialDelay, delay, unit);
- }
-
- @Override
- public void assertRunningInMainThread() {}
-
- @Override
- public void execute(@Nonnull Runnable command) {
- scheduledExecutor.execute(command);
- }
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
index baab5227f09d7..988f90b6d5786 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java
@@ -21,7 +21,7 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
-import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.NoMainThreadCheckComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.Execution;
@@ -111,7 +111,7 @@ public static DefaultScheduler createAndInitScheduler(
final JobGraph jobGraph = createJobGraph(jobVertices, jobConfiguration);
final ComponentMainThreadExecutor mainThreadExecutor =
- ComponentMainThreadExecutorServiceAdapter.forMainThread();
+ new NoMainThreadCheckComponentMainThreadExecutor();
DefaultSchedulerBuilder schedulerBuilder =
new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, scheduledExecutorService)