Fix KubernetesExecutor scheduler crash when pod_override is queued in-cluster#68819
Fix KubernetesExecutor scheduler crash when pod_override is queued in-cluster#68819vatsrahul1001 wants to merge 1 commit into
Conversation
fb24244 to
42916df
Compare
|
Thanks @amoghrajesh — agreed on the "wrong layer" point, switched away from serialize/deserialize. I went with your in-place approach but with one correction. Nulling The model setters hit during Fix: reset it to a fresh Validated on the deployment (no crash, task runs/finishes) and added a regression test that asserts both picklability and that Root cause is the v36 model-constructor change ( Drafted-by: Claude Code (Opus 4.8); reviewed by @vatsrahul1001 before posting |
amoghrajesh
left a comment
There was a problem hiding this comment.
Some nits and comments, otherwise LGTM.
…-cluster When the scheduler runs in-cluster, kubernetes-python-client v36 changed model constructors to use Configuration.get_default_copy() instead of Configuration() (kubernetes-client/python#2532, OpenAPI Generator v6.6.0). So every V1Pod built after load_incluster_config() captures the global in-cluster Configuration, whose refresh_api_key_hook is a local closure (InClusterConfigLoader._set_config.<locals>._refresh_api_key). pickle cannot serialize a local closure, so putting a task's pod_override V1Pod on the executor's multiprocessing queue raises PicklingError and crashes the scheduler in a loop. This affects any in-cluster KubernetesExecutor deployment where a task sets a V1Pod pod_override, independent of the Airflow version; pinning the client below 36 is not viable because 35.x has a separate no_proxy regression. Reset local_vars_configuration to a fresh Configuration() on the pod_override (recursively) before queuing -- exactly what v35 model constructors produced. It carries no in-cluster auth hook so the pod is picklable, while keeping client_side_validation so the worker-side reconcile_pods setters still work. (Setting it to None instead breaks reconcile: model setters dereference self.local_vars_configuration.client_side_validation.) The pod keeps its V1Pod type through the queue, so run_next is unchanged.
42916df to
7cf53dc
Compare
|
Thanks for the review @amoghrajesh! Addressed the nits:
Drafted-by: Claude Code (Opus 4.8); reviewed by @vatsrahul1001 before posting |
| ) | ||
|
|
||
|
|
||
| def _reset_local_vars_configuration(obj: Any) -> None: |
There was a problem hiding this comment.
| def _reset_local_vars_configuration(obj: Any) -> None: | |
| def _reset_local_vars_configuration(node: Any) -> None: |
IMO, this is more readable. I don't quite understand what obj is in the original context.
Also, should we type it? looks like we know the potential type.
| _reset_local_vars_configuration(v) | ||
| elif isinstance(obj, (list, tuple)): | ||
| for item in obj: | ||
| _reset_local_vars_configuration(item) |
There was a problem hiding this comment.
What should happen if we go into the else block? Is it expected to do nothing? or not expcted to happen
| ), | ||
| ) | ||
| self._install_unpicklable_incluster_hook(pod) | ||
| with pytest.raises((pickle.PicklingError, AttributeError, TypeError)): |
There was a problem hiding this comment.
Would like to check when will AttributeError, TypeError happen in different versions?
|
Closing in favour of #68831 |
ephraimbuddy
left a comment
There was a problem hiding this comment.
I've opened #68831, which fixes the same crash one layer up — at the airflow-core deserialize path where that Configuration actually gets attached. The pod_override the scheduler pickles onto the queue is produced by BaseSerialization.deserialize / ensure_pod_is_valid_after_unpickling (via ExecutorConfigType), not by the executor itself; deserializing there through a fresh Configuration() keeps the pod (and every nested model) picklable for every consumer of a deserialized V1Pod, not just the KubernetesExecutor queue. It also gives PodGenerator.deserialize_model_dict the same treatment, so the provider's worker-side reconstruction is covered too.
Since that resolves the crash at the source and makes the executor-level reset redundant, I think we can consolidate on #68831 and close this one. Let me know if that's reasonable?
|
Ah, thanks for closing |
Problem
When the scheduler runs in-cluster, the kubernetes Python client attaches an in-cluster
Configurationto everyV1Pod, and thatConfiguration.refresh_api_key_hookis a local closure (InClusterConfigLoader._set_config.<locals>._refresh_api_key).picklecannot serialize a local closure.KubernetesExecutorputs each task'sKubernetesJob(which embeds the task'spod_overrideV1Pod) onto amultiprocessing.Managerqueue, whose.put()pickles synchronously in the scheduler. So any in-cluster deployment where a task sets aV1Podpod_overridecrashes the scheduler in a loop:Fix
Serialize the
pod_overrideto a plain dict (dropping theConfiguration) before it is queued, using the existingPodGenerator.serialize_pod, and rebuild theV1Podworker-side inrun_nextviaPodGenerator.deserialize_model_dict. The worker already reconstructs its own kube client, so nothing is lost. The worker useskube_executor_configfor the pod override; the sameV1Podis also referenced by the workload'sexecutor_config, so that copy is sanitized too (it is otherwise pickled as part of the workload).This keeps
pod_overrideworking regardless of the kubernetes client version, rather than depending on a client version whoseConfigurationhappens to be picklable. The worker side already sanitizes pods withsanitize_for_serialization(kubernetes_executor_utils.py:492) — this closes the same gap on the scheduler→queue boundary.Tests
test_execute_async_queues_picklable_pod_override: builds apod_overridewhoseConfigurationcarries an unpicklable local-closure hook, asserts the raw pod is unpicklable (reproduces the crash), then asserts the queuedKubernetesJobpickles cleanly and the serialized dict round-trips back to an equivalentV1Pod.test_pod_template_file_override_in_executor_configto reflect the queuedkube_executor_confignow being a dict; its existingrun_nextround-trip assertion (the reconstructed pod) is unchanged and still passes.test_kubernetes_executor.pysuite passes (111 passed, 1 skipped AF<3.2-only).Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4.8) following the guidelines