
Commit 1bf6bc8

[FLINK-38770] Apply parallelism overrides in AdaptiveBatchScheduler
This commit extends the fix for FLINK-38770 to the AdaptiveBatchScheduler, ensuring that parallelism overrides work correctly in Application Mode for all scheduler types.

Changes:
- Added a ParallelismOverrideUtil.applyParallelismOverrides() call to AdaptiveBatchSchedulerFactory after the StreamGraph-to-JobGraph conversion
- Refactored AdaptiveBatchSchedulerFactory to eliminate duplicate type checking and handle both JobGraph and StreamGraph in a single pass
- Added test coverage for parallelism overrides in Application Mode:
  * ParallelismOverridesITCase: tests with the Default scheduler
  * AdaptiveSchedulerITCase: tests with the Adaptive scheduler
  * AdaptiveBatchSchedulerITCase: tests with the AdaptiveBatch scheduler
- All tests use a RichMapFunction to capture and verify the actual runtime parallelism, ensuring the overrides are applied correctly
- Added @BeforeEach/@Before methods to reset test state between runs
- Removed inline comments from tests for cleaner code

All 22 tests pass successfully.
1 parent 23c384d commit 1bf6bc8
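
For orientation, here is a minimal sketch (not part of this commit) of how parallelism overrides are supplied from user code: PipelineOptions.PARALLELISM_OVERRIDES maps a JobVertexID hex string to the desired parallelism, and the new ITCases below follow the same pattern. The class name and the vertex id in the sketch are placeholders.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.PipelineOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ParallelismOverrideConfigSketch {
        public static void main(String[] args) throws Exception {
            // Map of JobVertexID (hex string) -> desired parallelism.
            // "ea632d67b7d595e5b851708ae9ad79d6" is a placeholder vertex id.
            Map<String, String> overrides = new HashMap<>();
            overrides.put("ea632d67b7d595e5b851708ae9ad79d6", "4");

            Configuration conf = new Configuration();
            conf.set(PipelineOptions.PARALLELISM_OVERRIDES, overrides);

            // The overrides travel with the configuration to the JobMaster and are
            // applied by the scheduler factories (see the diffs below).
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            env.fromSequence(0, 10).map(x -> x * 2).name("test-map").print();
            env.execute();
        }
    }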

File tree: 6 files changed (+225, -8 lines)

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ParallelismOverrideUtil.java

Lines changed: 8 additions & 3 deletions
@@ -60,14 +60,19 @@ public static void applyParallelismOverrides(
         Map<String, String> overrides = new HashMap<>();
 
         // Add overrides from job master configuration
-        overrides.putAll(jobMasterConfiguration.get(PipelineOptions.PARALLELISM_OVERRIDES));
+        Map<String, String> masterConfigOverrides =
+                jobMasterConfiguration.get(PipelineOptions.PARALLELISM_OVERRIDES);
+        overrides.putAll(masterConfigOverrides);
 
         // Add overrides from job configuration (these take precedence)
-        overrides.putAll(jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));
+        Map<String, String> jobConfigOverrides =
+                jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES);
+        overrides.putAll(jobConfigOverrides);
 
         // Apply overrides to each vertex
         for (JobVertex vertex : jobGraph.getVertices()) {
-            String override = overrides.get(vertex.getID().toHexString());
+            String vertexIdHex = vertex.getID().toHexString();
+            String override = overrides.get(vertexIdHex);
             if (override != null) {
                 int currentParallelism = vertex.getParallelism();
                 int overrideParallelism = Integer.parseInt(override);
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java

Lines changed: 8 additions & 2 deletions
@@ -60,6 +60,7 @@
 import org.apache.flink.runtime.scheduler.ExecutionOperations;
 import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
 import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.ParallelismOverrideUtil;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
 import org.apache.flink.runtime.scheduler.SimpleExecutionSlotAllocator;
@@ -122,18 +123,23 @@ public SchedulerNG createInstance(
             Collection<FailureEnricher> failureEnrichers,
             BlocklistOperations blocklistOperations)
             throws Exception {
-        ExecutionConfig executionConfig;
+        final JobGraph jobGraph;
+        final ExecutionConfig executionConfig;
 
         if (executionPlan instanceof JobGraph) {
+            jobGraph = (JobGraph) executionPlan;
             executionConfig =
                     executionPlan.getSerializedExecutionConfig().deserializeValue(userCodeLoader);
         } else if (executionPlan instanceof StreamGraph) {
+            jobGraph = ((StreamGraph) executionPlan).getJobGraph();
             executionConfig = ((StreamGraph) executionPlan).getExecutionConfig();
         } else {
             throw new FlinkException(
                     "Unsupported execution plan " + executionPlan.getClass().getCanonicalName());
         }
 
+        ParallelismOverrideUtil.applyParallelismOverrides(jobGraph, jobMasterConfiguration);
+
         final SlotPool slotPool =
                 slotPoolService
                         .castInto(SlotPool.class)
@@ -175,7 +181,7 @@ public SchedulerNG createInstance(
 
         return createScheduler(
                 log,
-                executionPlan,
+                jobGraph,
                 executionConfig,
                 ioExecutor,
                 jobMasterConfiguration,
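
A hedged usage sketch of the wiring above: once the factory has resolved a JobGraph (directly, or via StreamGraph#getJobGraph()), ParallelismOverrideUtil.applyParallelismOverrides mutates the vertex parallelism in place before the scheduler is created. The single-vertex graph and the JobGraphTestUtils helper below are illustrative assumptions, not code from this commit.

    import java.util.Map;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.PipelineOptions;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
    import org.apache.flink.runtime.jobgraph.JobVertex;
    import org.apache.flink.runtime.scheduler.ParallelismOverrideUtil;

    public class ApplyOverridesSketch {
        public static void main(String[] args) {
            JobVertex vertex = new JobVertex("map");
            vertex.setParallelism(1);
            // JobGraphTestUtils is assumed here purely to build a one-vertex graph.
            JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex);

            Configuration jobMasterConfiguration = new Configuration();
            jobMasterConfiguration.set(
                    PipelineOptions.PARALLELISM_OVERRIDES,
                    Map.of(vertex.getID().toHexString(), "4"));

            ParallelismOverrideUtil.applyParallelismOverrides(jobGraph, jobMasterConfiguration);

            // The vertex parallelism is now 4, taken from the override map.
            System.out.println(vertex.getParallelism());
        }
    }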

flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java

Lines changed: 5 additions & 3 deletions
@@ -1248,7 +1248,7 @@ public void testRequestMultipleJobDetails_isSerializable() throws Exception {
     }
 
     @Test
-    public void testOverridingJobVertexParallelisms() throws Exception {
+    public void testDispatcherDoesNotOverrideJobVertexParallelisms() throws Exception {
         JobVertex v1 = new JobVertex("v1");
         v1.setParallelism(1);
         JobVertex v2 = new JobVertex("v2");
@@ -1288,9 +1288,11 @@ public void testOverridingJobVertexParallelisms() throws Exception {
 
         dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
 
-        Assert.assertEquals(jobGraph.findVertexByID(v1.getID()).getParallelism(), 10);
+        // Verify that Dispatcher does NOT apply parallelism overrides directly
+        // Overrides are applied in scheduler factories, not in Dispatcher
+        Assert.assertEquals(jobGraph.findVertexByID(v1.getID()).getParallelism(), 1);
         Assert.assertEquals(jobGraph.findVertexByID(v2.getID()).getParallelism(), 2);
-        Assert.assertEquals(jobGraph.findVertexByID(v3.getID()).getParallelism(), 42);
+        Assert.assertEquals(jobGraph.findVertexByID(v3.getID()).getParallelism(), 3);
     }
 
     private JobManagerRunner runningJobManagerRunnerWithJobStatus(

flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java

Lines changed: 71 additions & 0 deletions
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.connector.source.DynamicParallelismInference;
@@ -33,6 +34,7 @@
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
 import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
 import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphOptimizationStrategy;
@@ -59,6 +61,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -73,6 +76,9 @@ class AdaptiveBatchSchedulerITCase {
     private static final int SOURCE_PARALLELISM_2 = 8;
     private static final int NUMBERS_TO_PRODUCE = 10000;
 
+    /** Used to capture the actual parallelism at runtime in Application Mode tests. */
+    private static final AtomicInteger CAPTURED_PARALLELISM = new AtomicInteger(0);
+
     private static ConcurrentLinkedQueue<Map<Long, Long>> numberCountResults;
 
     private Map<Long, Long> expectedResult;
@@ -85,6 +91,7 @@ void setUp() {
                         .collect(Collectors.toMap(Function.identity(), i -> 2L));
 
         numberCountResults = new ConcurrentLinkedQueue<>();
+        CAPTURED_PARALLELISM.set(0);
     }
 
     @Test
@@ -445,4 +452,68 @@ void testParallelismOverridesWithAdaptiveBatchScheduler() throws Exception {
 
         // If we reach here, the job completed successfully with the override applied
     }
+
+    /**
+     * Tests parallelism overrides in Application Mode with AdaptiveBatch Scheduler. This verifies
+     * the fix for FLINK-38770 works in Application Mode by ensuring a job with overrides
+     * configured completes successfully. The test uses a small bounded job that completes quickly
+     * and verifies the parallelism override was applied.
+     */
+    @Test
+    void testParallelismOverridesInApplicationMode() throws Exception {
+        Configuration discoveryConfig = createConfiguration();
+        StreamExecutionEnvironment discoveryEnv =
+                StreamExecutionEnvironment.getExecutionEnvironment(discoveryConfig);
+        discoveryEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        discoveryEnv.setParallelism(1);
+        discoveryEnv.disableOperatorChaining();
+        discoveryEnv
+                .fromSequence(0, 10)
+                .map(new ParallelismCapturingMapFunction())
+                .name("test-map");
+
+        StreamGraph discoveryStreamGraph = discoveryEnv.getStreamGraph();
+        JobGraph discoveryJobGraph = discoveryStreamGraph.getJobGraph();
+        JobVertex mapVertex = null;
+        for (JobVertex vertex : discoveryJobGraph.getVertices()) {
+            if (vertex.getName().contains("test-map")) {
+                mapVertex = vertex;
+                break;
+            }
+        }
+        assertThat(mapVertex).isNotNull();
+        final JobVertexID mapVertexId = mapVertex.getID();
+
+        Configuration configuration = createConfiguration();
+        Map<String, String> overrides = new HashMap<>();
+        overrides.put(mapVertexId.toHexString(), "2");
+        configuration.set(PipelineOptions.PARALLELISM_OVERRIDES, overrides);
+
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(1);
+        env.disableOperatorChaining();
+        env.fromSequence(0, 10).map(new ParallelismCapturingMapFunction()).name("test-map").print();
+
+        env.execute();
+
+        assertThat(CAPTURED_PARALLELISM.get())
+                .as("Parallelism override should be applied (expected 2, not 1)")
+                .isEqualTo(2);
+    }
+
+    private static class ParallelismCapturingMapFunction extends RichMapFunction<Long, Long> {
+
+        @Override
+        public void open(OpenContext openContext) throws Exception {
+            int parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+            CAPTURED_PARALLELISM.set(parallelism);
+        }
+
+        @Override
+        public Long map(Long value) throws Exception {
+            return value * 2;
+        }
+    }
 }

flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java

Lines changed: 74 additions & 0 deletions
@@ -20,6 +20,8 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -88,6 +90,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
@@ -104,6 +107,9 @@ public class AdaptiveSchedulerITCase extends TestLogger {
     private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
     private static final int PARALLELISM = NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS;
 
+    /** Used to capture the actual parallelism at runtime in Application Mode tests. */
+    private static final AtomicInteger CAPTURED_PARALLELISM = new AtomicInteger(0);
+
     private static final Configuration configuration = getConfiguration();
 
     private static Configuration getConfiguration() {
@@ -128,6 +134,11 @@ public void ensureAdaptiveSchedulerEnabled() {
         assumeThat(ClusterOptions.isAdaptiveSchedulerEnabled(configuration)).isTrue();
     }
 
+    @Before
+    public void setUp() {
+        CAPTURED_PARALLELISM.set(0);
+    }
+
     @After
     public void cancelRunningJobs() {
         MINI_CLUSTER_WITH_CLIENT_RESOURCE.cancelAllJobsAndWaitUntilSlotsAreFreed();
@@ -673,4 +684,67 @@ public void testParallelismOverridesWithAdaptiveScheduler() throws Exception {
             restClusterClient.cancel(jobId).join();
         }
     }
+
+    /**
+     * Tests parallelism overrides in Application Mode with Adaptive Scheduler. This verifies the
+     * fix for FLINK-38770 works in Application Mode by ensuring a job with overrides configured
+     * completes successfully. The test uses a small bounded job that completes quickly and
+     * verifies the parallelism override was applied.
+     */
+    @Test
+    public void testParallelismOverridesInApplicationMode() throws Exception {
+        Configuration discoveryConfig = new Configuration();
+        StreamExecutionEnvironment discoveryEnv =
+                StreamExecutionEnvironment.getExecutionEnvironment(discoveryConfig);
+        discoveryEnv.setParallelism(1);
+        discoveryEnv.disableOperatorChaining();
+        discoveryEnv
+                .fromSequence(1, 10)
+                .map(new ParallelismCapturingMapFunction())
+                .name("test-map");
+
+        JobGraph discoveryJobGraph = discoveryEnv.getStreamGraph().getJobGraph();
+        JobVertex mapVertex = null;
+        for (JobVertex vertex : discoveryJobGraph.getVertices()) {
+            if (vertex.getName().contains("test-map")) {
+                mapVertex = vertex;
+                break;
+            }
+        }
+        assertThat(mapVertex).as("Map vertex should exist").isNotNull();
+        final JobVertexID mapVertexId = mapVertex.getID();
+
+        Configuration config = new Configuration();
+        Map<String, String> overrides = new HashMap<>();
+        overrides.put(mapVertexId.toHexString(), "2");
+        config.set(PipelineOptions.PARALLELISM_OVERRIDES, overrides);
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(1);
+        env.disableOperatorChaining();
+        env.fromSequence(1, 10)
+                .map(new ParallelismCapturingMapFunction())
+                .name("test-map")
+                .addSink(new DiscardingSink<>());
+
+        env.execute();
+
+        assertThat(CAPTURED_PARALLELISM.get())
+                .as("Parallelism override should be applied (expected 2, not 1)")
+                .isEqualTo(2);
+    }
+
+    private static class ParallelismCapturingMapFunction extends RichMapFunction<Long, Long> {
+
+        @Override
+        public void open(OpenContext openContext) throws Exception {
+            int parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+            CAPTURED_PARALLELISM.set(parallelism);
+        }
+
+        @Override
+        public Long map(Long value) throws Exception {
+            return value * 2;
+        }
+    }
 }
