From 637f4a5ed7901f8f320f5486004b595c4d357fed Mon Sep 17 00:00:00 2001 From: prince8273 Date: Thu, 14 May 2026 19:18:08 +0530 Subject: [PATCH 01/10] [Observability] Log interrupted processing tasks on unexpected worker death When a worker drops off the cluster unexpectedly (e.g., due to an OOM kill), the scheduler tracks the processing_keys but previously did not log them to the console. This change surfaces exactly which tasks were interrupted, significantly improving debugging provenance for cluster hangs and memory crashes. --- distributed/scheduler.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 445ffd96e8..663e3829e2 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5685,6 +5685,12 @@ async def remove_worker( f"Removing worker {ws.address!r} caused the cluster to lose scattered " f"data, which can't be recovered: {lost_keys} ({stimulus_id=})" ) + if not expected and processing_keys: + logger.warning( + f"Worker {ws.address!r} dropped unexpectedly. " + f"Interrupting {len(processing_keys)} processing tasks: " + f"{processing_keys} ({stimulus_id=})" + ) event_msg = { "action": "remove-worker", From 1f19b6f686d16f40654413cfa55c211bf4b31c12 Mon Sep 17 00:00:00 2001 From: prince8273 Date: Thu, 14 May 2026 19:18:08 +0530 Subject: [PATCH 02/10] Fix test_log_remove_worker to expect new observability warning --- distributed/tests/test_worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 5dbf62a430..39bf286d84 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -2995,6 +2995,8 @@ async def test_log_remove_worker(c, s, a, b): "(stimulus_id='ungraceful')", f"Removing worker '{b.address}' caused the cluster to lose scattered " "data, which can't be recovered: {'z'} (stimulus_id='ungraceful')", + f"Worker {b.address!r} dropped unexpectedly. Interrupting 1 " + "processing tasks: {'y'} (stimulus_id='ungraceful')", "Lost all workers", ] From d9cdd23e2abf503e38e83206254aa241dee8d93e Mon Sep 17 00:00:00 2001 From: prince8273 Date: Thu, 14 May 2026 19:18:09 +0530 Subject: [PATCH 03/10] [Heuristics] Introduce margin-of-improvement bound to prevent work-stealing thrashing --- distributed/stealing.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/distributed/stealing.py b/distributed/stealing.py index cc6f1e6d65..d6036cd33e 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -486,8 +486,11 @@ def balance(self) -> None: comm_cost_thief = self.scheduler.get_comm_cost(ts, thief) comm_cost_victim = self.scheduler.get_comm_cost(ts, victim) compute = self.scheduler._get_prefix_duration(ts.prefix) + + # Require at least 50% ROI on the network transfer cost to prevent thrashing + margin = comm_cost_thief * 0.5 if ( - occ_thief + comm_cost_thief + compute + occ_thief + comm_cost_thief + compute + margin <= occ_victim - (comm_cost_victim + compute) / 2 ): self.move_task_request(ts, victim, thief) From 9f8516383935fe4e4ba579b5efa4d1f90e410a74 Mon Sep 17 00:00:00 2001 From: prince8273 Date: Thu, 14 May 2026 19:18:09 +0530 Subject: [PATCH 04/10] [Heuristics] Add steal rejection tracking metric and unit test --- distributed/stealing.py | 19 +++++++++++++++++-- distributed/tests/test_steal.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/distributed/stealing.py b/distributed/stealing.py index d6036cd33e..fec0a0090a 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -118,6 +118,7 @@ def __init__(self, scheduler: Scheduler): self.metrics = { "request_count_total": defaultdict(int), "request_cost_total": defaultdict(int), + "reject_count_margin_total": defaultdict(int), } self._request_counter = 0 self.scheduler.stream_handlers["steal-response"] = self.move_task_confirm @@ -489,10 +490,17 @@ def balance(self) -> None: # Require at least 50% ROI on the network transfer cost to prevent thrashing margin = comm_cost_thief * 0.5 - if ( + + would_steal_without_margin = ( + occ_thief + comm_cost_thief + compute + <= occ_victim - (comm_cost_victim + compute) / 2 + ) + would_steal_with_margin = ( occ_thief + comm_cost_thief + compute + margin <= occ_victim - (comm_cost_victim + compute) / 2 - ): + ) + + if would_steal_with_margin: self.move_task_request(ts, victim, thief) cost = compute + comm_cost_victim log.append( @@ -523,6 +531,13 @@ def balance(self) -> None: # for removing ts from stealable. If we made sure to # properly clean up, we would not need this stealable.discard(ts) + elif would_steal_without_margin: + self.metrics["reject_count_margin_total"][level] += 1 + logger.debug( + "Work-stealing margin heuristic rejected steal of task %s " + "(thief=%s, victim=%s, level=%d, margin=%.4f)", + ts.key, thief.address, victim.address, level, margin, + ) self.scheduler.check_idle_saturated( victim, occ=combined_occupancy(victim) ) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 758a6e03c7..d14f73e6dd 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -2010,6 +2010,36 @@ def block(i: int, in_event: Event, block_event: Event) -> int: await block_event.set() +@gen_cluster( + client=True, + nthreads=[("127.0.0.1", 1)] * 2, + config={"distributed.scheduler.work-stealing-interval": "100ms", **NO_AMM}, +) +async def test_reject_count_margin_metric(c, s, a, b): + """ + Verify that the margin heuristic increments reject_count_margin_total + when a steal is suppressed that old logic would have permitted. + """ + steal = s.extensions["stealing"] + await steal.stop() + + # Generate large data on worker A to ensure high network transfer cost + [x] = await c.scatter([b"0" * 50_000_000], workers=a.address) + + # Create tasks on A to saturate it and trigger stealing evaluation + futures = [ + c.submit(slowidentity, x, pure=False, delay=0.01, workers=a.address, allow_other_workers=True) + for _ in range(10) + ] + + while len(a.state.tasks) < 10: + await asyncio.sleep(0.01) + + # Balance will evaluate the cost. High comm_cost, low compute. + # Without margin, it would steal. With 50% ROI margin, it should reject. + steal.balance() + + assert sum(steal.metrics["reject_count_margin_total"].values()) >= 1 @gen_cluster( nthreads=[("", 1)], client=True, From ba80c235977e32527f31f9c8a1046a769515b911 Mon Sep 17 00:00:00 2001 From: prince8273 Date: Thu, 14 May 2026 19:18:09 +0530 Subject: [PATCH 05/10] [Heuristics] Fix failing unit tests due to margin constraint --- distributed/tests/test_steal.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index d14f73e6dd..b07984405f 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1448,8 +1448,8 @@ def func(*args): "cost, ntasks, expect_steal", [ pytest.param(10, 10, False, id="not enough work to steal"), - pytest.param(10, 12, True, id="enough work to steal"), - pytest.param(20, 12, False, id="not enough work for increased cost"), + pytest.param(10, 17, True, id="enough work to steal"), + pytest.param(20, 17, False, id="not enough work for increased cost"), ], ) def test_balance_expensive_tasks(cost, ntasks, expect_steal): @@ -2013,7 +2013,11 @@ def block(i: int, in_event: Event, block_event: Event) -> int: @gen_cluster( client=True, nthreads=[("127.0.0.1", 1)] * 2, - config={"distributed.scheduler.work-stealing-interval": "100ms", **NO_AMM}, + config={ + "distributed.scheduler.work-stealing-interval": "100ms", + "distributed.scheduler.default-task-durations": {"slowidentity": 0.01}, + **NO_AMM + }, ) async def test_reject_count_margin_metric(c, s, a, b): """ @@ -2023,16 +2027,16 @@ async def test_reject_count_margin_metric(c, s, a, b): steal = s.extensions["stealing"] await steal.stop() - # Generate large data on worker A to ensure high network transfer cost - [x] = await c.scatter([b"0" * 50_000_000], workers=a.address) + # Generate large data on worker A to ensure high network transfer cost (~0.1s) + [x] = await c.scatter([b"0" * 10_000_000], workers=a.address) - # Create tasks on A to saturate it and trigger stealing evaluation + # Create 14 tasks on A to saturate it (0.01s each). occ_victim will be ~0.14s. futures = [ c.submit(slowidentity, x, pure=False, delay=0.01, workers=a.address, allow_other_workers=True) - for _ in range(10) + for _ in range(14) ] - while len(a.state.tasks) < 10: + while len(a.state.tasks) < 14: await asyncio.sleep(0.01) # Balance will evaluate the cost. High comm_cost, low compute. From 0f1af10c33bcf454a6e1da84c1d32a347b62364b Mon Sep 17 00:00:00 2001 From: prince8273 Date: Thu, 14 May 2026 19:18:09 +0530 Subject: [PATCH 06/10] fix: apply pre-commit formatting fixes --- distributed/stealing.py | 8 ++++++-- distributed/tests/test_steal.py | 17 +++++++++++++---- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/distributed/stealing.py b/distributed/stealing.py index fec0a0090a..a0aef63a28 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -490,7 +490,7 @@ def balance(self) -> None: # Require at least 50% ROI on the network transfer cost to prevent thrashing margin = comm_cost_thief * 0.5 - + would_steal_without_margin = ( occ_thief + comm_cost_thief + compute <= occ_victim - (comm_cost_victim + compute) / 2 @@ -536,7 +536,11 @@ def balance(self) -> None: logger.debug( "Work-stealing margin heuristic rejected steal of task %s " "(thief=%s, victim=%s, level=%d, margin=%.4f)", - ts.key, thief.address, victim.address, level, margin, + ts.key, + thief.address, + victim.address, + level, + margin, ) self.scheduler.check_idle_saturated( victim, occ=combined_occupancy(victim) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index b07984405f..f52c2c8f4f 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -2016,7 +2016,7 @@ def block(i: int, in_event: Event, block_event: Event) -> int: config={ "distributed.scheduler.work-stealing-interval": "100ms", "distributed.scheduler.default-task-durations": {"slowidentity": 0.01}, - **NO_AMM + **NO_AMM, }, ) async def test_reject_count_margin_metric(c, s, a, b): @@ -2029,13 +2029,20 @@ async def test_reject_count_margin_metric(c, s, a, b): # Generate large data on worker A to ensure high network transfer cost (~0.1s) [x] = await c.scatter([b"0" * 10_000_000], workers=a.address) - + # Create 14 tasks on A to saturate it (0.01s each). occ_victim will be ~0.14s. futures = [ - c.submit(slowidentity, x, pure=False, delay=0.01, workers=a.address, allow_other_workers=True) + c.submit( + slowidentity, + x, + pure=False, + delay=0.01, + workers=a.address, + allow_other_workers=True, + ) for _ in range(14) ] - + while len(a.state.tasks) < 14: await asyncio.sleep(0.01) @@ -2044,6 +2051,8 @@ async def test_reject_count_margin_metric(c, s, a, b): steal.balance() assert sum(steal.metrics["reject_count_margin_total"].values()) >= 1 + + @gen_cluster( nthreads=[("", 1)], client=True, From f1fcc4979f40e94bcf33116ff6b95b80fb8b8bcc Mon Sep 17 00:00:00 2001 From: prince8273 Date: Sat, 16 May 2026 12:08:35 +0530 Subject: [PATCH 07/10] Add margin-of-improvement bound to prevent work-stealing thrashing - Add reject_count_margin_total metric to WorkStealing.metrics - Add observability logging for interrupted tasks in scheduler.py - Add test_reject_count_margin_metric to test_steal.py - Revert accidental range() changes in test_steal.py Signed-off-by: prince8273 --- distributed/tests/test_steal.py | 50 ++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index f52c2c8f4f..ec254d37b7 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -11,7 +11,7 @@ from collections.abc import Callable, Coroutine, Iterable, Mapping, Sequence from operator import mul from time import sleep - +from unittest.mock import patch import pytest from tlz import merge, sliding_window @@ -2015,7 +2015,7 @@ def block(i: int, in_event: Event, block_event: Event) -> int: nthreads=[("127.0.0.1", 1)] * 2, config={ "distributed.scheduler.work-stealing-interval": "100ms", - "distributed.scheduler.default-task-durations": {"slowidentity": 0.01}, + "distributed.scheduler.default-task-durations": {"slowidentity": 0.021}, **NO_AMM, }, ) @@ -2027,31 +2027,37 @@ async def test_reject_count_margin_metric(c, s, a, b): steal = s.extensions["stealing"] await steal.stop() - # Generate large data on worker A to ensure high network transfer cost (~0.1s) - [x] = await c.scatter([b"0" * 10_000_000], workers=a.address) - - # Create 14 tasks on A to saturate it (0.01s each). occ_victim will be ~0.14s. - futures = [ - c.submit( - slowidentity, - x, - pure=False, - delay=0.01, - workers=a.address, - allow_other_workers=True, - ) - for _ in range(14) - ] + futures = c.map( + slowidentity, + range(21), + workers=a.address, + allow_other_workers=True, + delay=0.021, + ) + + while len(s.tasks) < 21: + await asyncio.sleep(0.01) - while len(a.state.tasks) < 14: + while len(a.state.tasks) < 21: await asyncio.sleep(0.01) - # Balance will evaluate the cost. High comm_cost, low compute. - # Without margin, it would steal. With 50% ROI margin, it should reject. - steal.balance() + for ws in s.workers.values(): + s.check_idle_saturated(ws) - assert sum(steal.metrics["reject_count_margin_total"].values()) >= 1 + a_ws = s.workers[a.address] + b_ws = s.workers[b.address] + assert a_ws in s.saturated, ( + f"Worker A not saturated: occupancy={a_ws.occupancy:.3f}, " + f"nthreads={a_ws.nthreads}, processing={len(a_ws.processing)}" + ) + assert b_ws in s.idle.values(), f"Worker B not idle: processing={len(b_ws.processing)}" + with patch.object( + s, "get_comm_cost", side_effect=lambda ts, ws: 0.3 if ws == b_ws else 0.0 + ): + steal.balance() + + assert sum(steal.metrics["reject_count_margin_total"].values()) >= 1 @gen_cluster( nthreads=[("", 1)], From 62efb1532a40767709a99b0f00884759146ac0de Mon Sep 17 00:00:00 2001 From: prince8273 Date: Sat, 16 May 2026 12:15:12 +0530 Subject: [PATCH 08/10] Fix linting Signed-off-by: prince8273 --- distributed/tests/test_steal.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index ec254d37b7..8d881b0cde 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -2050,7 +2050,9 @@ async def test_reject_count_margin_metric(c, s, a, b): f"Worker A not saturated: occupancy={a_ws.occupancy:.3f}, " f"nthreads={a_ws.nthreads}, processing={len(a_ws.processing)}" ) - assert b_ws in s.idle.values(), f"Worker B not idle: processing={len(b_ws.processing)}" + assert ( + b_ws in s.idle.values() + ), f"Worker B not idle: processing={len(b_ws.processing)}" with patch.object( s, "get_comm_cost", side_effect=lambda ts, ws: 0.3 if ws == b_ws else 0.0 @@ -2059,6 +2061,7 @@ async def test_reject_count_margin_metric(c, s, a, b): assert sum(steal.metrics["reject_count_margin_total"].values()) >= 1 + @gen_cluster( nthreads=[("", 1)], client=True, From 354fce88358f02b1ad31153109a98eba4ea2bfb4 Mon Sep 17 00:00:00 2001 From: prince8273 Date: Sat, 16 May 2026 12:24:31 +0530 Subject: [PATCH 09/10] Fix import sorting in test_steal.py Signed-off-by: prince8273 --- distributed/tests/test_steal.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 8d881b0cde..29ec4abc3a 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -12,6 +12,7 @@ from operator import mul from time import sleep from unittest.mock import patch + import pytest from tlz import merge, sliding_window From 617590ff8f12ef087a09d66b82703d5d21425079 Mon Sep 17 00:00:00 2001 From: prince8273 Date: Sun, 17 May 2026 09:04:18 +0530 Subject: [PATCH 10/10] docs: justify 0.5 margin factor and clarify threshold changes in tests --- distributed/stealing.py | 4 +++- distributed/tests/test_steal.py | 5 +++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/distributed/stealing.py b/distributed/stealing.py index a0aef63a28..5aee84a017 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -488,7 +488,9 @@ def balance(self) -> None: comm_cost_victim = self.scheduler.get_comm_cost(ts, victim) compute = self.scheduler._get_prefix_duration(ts.prefix) - # Require at least 50% ROI on the network transfer cost to prevent thrashing + # Be conservative about marginal steals: require headroom equal + # to 50% of the thief's transfer cost to absorb estimation noise + # and routine network jitter. margin = comm_cost_thief * 0.5 would_steal_without_margin = ( diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 29ec4abc3a..a09305fdd9 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1449,6 +1449,8 @@ def func(*args): "cost, ntasks, expect_steal", [ pytest.param(10, 10, False, id="not enough work to steal"), + # The 50% margin heuristic raises the minimum backlog needed to justify + # stealing these expensive tasks; 12 was enough before, 17 is enough now. pytest.param(10, 17, True, id="enough work to steal"), pytest.param(20, 17, False, id="not enough work for increased cost"), ], @@ -2028,6 +2030,9 @@ async def test_reject_count_margin_metric(c, s, a, b): steal = s.extensions["stealing"] await steal.stop() + # Use enough short tasks to satisfy Scheduler.check_idle_saturated() for a + # single busy worker while still keeping the steal in the margin-rejection + # window once get_comm_cost() is patched below. futures = c.map( slowidentity, range(21),