Skip to content

Commit dc8cd2d

Browse files
committed
[FLINK-38770] Fix parallelism overrides ignored in Application Mode
This commit fixes a bug where pipeline.jobvertex-parallelism-overrides configuration was completely ignored when running jobs in Application Mode with Flink 2.0+. This broke the Flink Kubernetes Operator's autoscaler functionality. Root Cause: - FLINK-36446 introduced direct StreamGraph submission in Application Mode - Parallelism overrides were only applied in Dispatcher before JobGraph creation, but in Application Mode, StreamGraph→JobGraph conversion happens in scheduler factories, bypassing the Dispatcher's override logic Solution: - Created ParallelismOverrideUtil to centralize override application logic - Modified all scheduler factories (DefaultScheduler, AdaptiveScheduler, AdaptiveBatchScheduler) to apply overrides after StreamGraph→JobGraph conversion - Removed duplicate override logic from Dispatcher.java - Ensured correct configuration precedence: job config > job master config Testing: - Added 7 unit tests for ParallelismOverrideUtil covering all scenarios - Added 4 integration tests using MiniCluster to verify end-to-end behavior across different scheduler types - All tests pass successfully
1 parent a8bbcdc commit dc8cd2d

File tree

11 files changed

+663
-29
lines changed

11 files changed

+663
-29
lines changed

.claude/settings.local.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"permissions": {
3+
"allow": [
4+
"Bash(find:*)",
5+
"WebFetch(domain:nightlies.apache.org)"
6+
],
7+
"deny": [],
8+
"ask": []
9+
}
10+
}

flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.flink.configuration.ConfigOptions;
3131
import org.apache.flink.configuration.Configuration;
3232
import org.apache.flink.configuration.HighAvailabilityOptions;
33-
import org.apache.flink.configuration.PipelineOptions;
3433
import org.apache.flink.configuration.WebOptions;
3534
import org.apache.flink.core.execution.CheckpointType;
3635
import org.apache.flink.core.execution.SavepointFormatType;
@@ -57,9 +56,7 @@
5756
import org.apache.flink.runtime.highavailability.JobResultEntry;
5857
import org.apache.flink.runtime.highavailability.JobResultStore;
5958
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
60-
import org.apache.flink.runtime.jobgraph.JobGraph;
6159
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
62-
import org.apache.flink.runtime.jobgraph.JobVertex;
6360
import org.apache.flink.runtime.jobgraph.JobVertexID;
6461
import org.apache.flink.runtime.jobmanager.ExecutionPlanWriter;
6562
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
@@ -591,10 +588,6 @@ private CompletableFuture<Boolean> isInGloballyTerminalState(JobID jobId) {
591588
}
592589

593590
private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan executionPlan) {
594-
if (executionPlan instanceof JobGraph) {
595-
applyParallelismOverrides((JobGraph) executionPlan);
596-
}
597-
598591
log.info("Submitting job '{}' ({}).", executionPlan.getName(), executionPlan.getJobID());
599592

600593
// track as an outstanding job
@@ -1628,25 +1621,6 @@ public CompletableFuture<Void> onRemovedExecutionPlan(JobID jobId) {
16281621
return CompletableFuture.runAsync(() -> terminateJob(jobId), getMainThreadExecutor(jobId));
16291622
}
16301623

1631-
private void applyParallelismOverrides(JobGraph jobGraph) {
1632-
Map<String, String> overrides = new HashMap<>();
1633-
overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
1634-
overrides.putAll(jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));
1635-
for (JobVertex vertex : jobGraph.getVertices()) {
1636-
String override = overrides.get(vertex.getID().toHexString());
1637-
if (override != null) {
1638-
int currentParallelism = vertex.getParallelism();
1639-
int overrideParallelism = Integer.parseInt(override);
1640-
log.info(
1641-
"Changing job vertex {} parallelism from {} to {}",
1642-
vertex.getID(),
1643-
currentParallelism,
1644-
overrideParallelism);
1645-
vertex.setParallelism(overrideParallelism);
1646-
}
1647-
}
1648-
}
1649-
16501624
private Executor getIoExecutor(JobID jobID) {
16511625
// todo: consider caching
16521626
return MdcUtils.scopeToJob(jobID, ioExecutor);

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ public SchedulerNG createInstance(
9595
"Unsupported execution plan " + executionPlan.getClass().getCanonicalName());
9696
}
9797

98+
// Apply parallelism overrides after StreamGraph -> JobGraph conversion
99+
ParallelismOverrideUtil.applyParallelismOverrides(jobGraph, jobMasterConfiguration);
100+
98101
final SlotPool slotPool =
99102
slotPoolService
100103
.castInto(SlotPool.class)
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.scheduler;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.configuration.PipelineOptions;
24+
import org.apache.flink.runtime.jobgraph.JobGraph;
25+
import org.apache.flink.runtime.jobgraph.JobVertex;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.util.HashMap;
31+
import java.util.Map;
32+
33+
/**
34+
* Utility class for applying parallelism overrides from configuration to JobGraph vertices.
35+
*
36+
* <p>This utility must be called after converting StreamGraph to JobGraph in all SchedulerNGFactory
37+
* implementations to ensure parallelism overrides are respected in Application Mode (where
38+
* StreamGraph is submitted directly).
39+
*/
40+
@Internal
41+
public class ParallelismOverrideUtil {
42+
43+
private static final Logger LOG = LoggerFactory.getLogger(ParallelismOverrideUtil.class);
44+
45+
/**
46+
* Applies parallelism overrides from configuration to the JobGraph.
47+
*
48+
* <p>Overrides are taken from two sources (in order of precedence):
49+
*
50+
* <ol>
51+
* <li>JobGraph configuration (higher precedence)
52+
* <li>Job master configuration (lower precedence)
53+
* </ol>
54+
*
55+
* @param jobGraph the JobGraph to modify
56+
* @param jobMasterConfiguration the job master configuration containing potential overrides
57+
*/
58+
public static void applyParallelismOverrides(
59+
JobGraph jobGraph, Configuration jobMasterConfiguration) {
60+
Map<String, String> overrides = new HashMap<>();
61+
62+
// Add overrides from job master configuration
63+
overrides.putAll(jobMasterConfiguration.get(PipelineOptions.PARALLELISM_OVERRIDES));
64+
65+
// Add overrides from job configuration (these take precedence)
66+
overrides.putAll(jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));
67+
68+
// Apply overrides to each vertex
69+
for (JobVertex vertex : jobGraph.getVertices()) {
70+
String override = overrides.get(vertex.getID().toHexString());
71+
if (override != null) {
72+
int currentParallelism = vertex.getParallelism();
73+
int overrideParallelism = Integer.parseInt(override);
74+
LOG.info(
75+
"Applying parallelism override for job vertex {} ({}): {} -> {}",
76+
vertex.getName(),
77+
vertex.getID(),
78+
currentParallelism,
79+
overrideParallelism);
80+
vertex.setParallelism(overrideParallelism);
81+
}
82+
}
83+
}
84+
85+
private ParallelismOverrideUtil() {
86+
// Utility class, not meant to be instantiated
87+
}
88+
}

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.flink.runtime.rpc.FatalErrorHandler;
4343
import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
4444
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
45+
import org.apache.flink.runtime.scheduler.ParallelismOverrideUtil;
4546
import org.apache.flink.runtime.scheduler.SchedulerNG;
4647
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
4748
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
@@ -99,6 +100,9 @@ public SchedulerNG createInstance(
99100
"Unsupported execution plan " + executionPlan.getClass().getCanonicalName());
100101
}
101102

103+
// Apply parallelism overrides after StreamGraph -> JobGraph conversion
104+
ParallelismOverrideUtil.applyParallelismOverrides(jobGraph, jobMasterConfiguration);
105+
102106
final DeclarativeSlotPool declarativeSlotPool =
103107
slotPoolService
104108
.castInto(DeclarativeSlotPool.class)

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,8 @@ public static AdaptiveBatchScheduler createScheduler(
285285
executionPlan,
286286
jobRecoveryHandler instanceof DefaultBatchJobRecoveryHandler,
287287
userCodeLoader,
288-
futureExecutor);
288+
futureExecutor,
289+
jobMasterConfiguration);
289290

290291
return new AdaptiveBatchScheduler(
291292
log,

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
package org.apache.flink.runtime.scheduler.adaptivebatch;
2020

21+
import org.apache.flink.configuration.Configuration;
2122
import org.apache.flink.runtime.jobgraph.JobGraph;
23+
import org.apache.flink.runtime.scheduler.ParallelismOverrideUtil;
2224
import org.apache.flink.streaming.api.graph.ExecutionPlan;
2325
import org.apache.flink.streaming.api.graph.StreamGraph;
2426
import org.apache.flink.util.DynamicCodeLoadingException;
@@ -46,6 +48,8 @@ public class AdaptiveExecutionHandlerFactory {
4648
* @param enableBatchJobRecovery Whether to enable batch job recovery.
4749
* @param userClassLoader The class loader for the user code.
4850
* @param serializationExecutor The executor used for serialization tasks.
51+
* @param jobMasterConfiguration The job master configuration containing potential parallelism
52+
* overrides.
4953
* @return An instance of {@link AdaptiveExecutionHandler}.
5054
* @throws IllegalArgumentException if the execution plan is neither a {@link JobGraph} nor a
5155
* {@link StreamGraph}.
@@ -54,15 +58,19 @@ public static AdaptiveExecutionHandler create(
5458
ExecutionPlan executionPlan,
5559
boolean enableBatchJobRecovery,
5660
ClassLoader userClassLoader,
57-
Executor serializationExecutor)
61+
Executor serializationExecutor,
62+
Configuration jobMasterConfiguration)
5863
throws DynamicCodeLoadingException {
5964
if (executionPlan instanceof JobGraph) {
6065
return new NonAdaptiveExecutionHandler((JobGraph) executionPlan);
6166
} else {
6267
checkState(executionPlan instanceof StreamGraph, "Unsupported execution plan.");
6368
if (enableBatchJobRecovery) {
6469
StreamGraph streamGraph = (StreamGraph) executionPlan;
65-
return new NonAdaptiveExecutionHandler(streamGraph.getJobGraph(userClassLoader));
70+
JobGraph jobGraph = streamGraph.getJobGraph(userClassLoader);
71+
// Apply parallelism overrides after StreamGraph -> JobGraph conversion
72+
ParallelismOverrideUtil.applyParallelismOverrides(jobGraph, jobMasterConfiguration);
73+
return new NonAdaptiveExecutionHandler(jobGraph);
6674
} else {
6775
return new DefaultAdaptiveExecutionHandler(
6876
userClassLoader, (StreamGraph) executionPlan, serializationExecutor);

0 commit comments

Comments
 (0)