From 5ededeb807ed9f666cb61c718a3fcd1e0b1bde29 Mon Sep 17 00:00:00 2001 From: Shamil Abdulaev Date: Wed, 25 Mar 2026 01:03:42 +0300 Subject: [PATCH] gh-146392: Delay Executor.map shutdown error until buffer is drained --- Lib/concurrent/futures/_base.py | 28 ++++++++++++++----- Lib/test/test_concurrent_futures/executor.py | 27 ++++++++++++++++++ ...-03-25-01-03-05.gh-issue-146392.zb-lzE.rst | 2 ++ 3 files changed, 50 insertions(+), 7 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2026-03-25-01-03-05.gh-issue-146392.zb-lzE.rst diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index f506ce68aea5b2..87ebbbb0025b8f 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -651,21 +651,35 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None): # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. def result_iterator(): + submit_exc = None + exhausted = object() + try: # reverse to keep finishing order fs.reverse() while fs: - if ( - buffersize - and (executor := executor_weakref()) - and (args := next(zipped_iterables, None)) - ): - fs.appendleft(executor.submit(fn, *args)) - # Careful not to keep a reference to the popped future + if buffersize and submit_exc is None: + args = next(zipped_iterables, exhausted) + if args is not exhausted: + executor = executor_weakref() + if executor is None: + submit_exc = RuntimeError( + "cannot schedule new futures after shutdown" + ) + else: + try: + fs.appendleft(executor.submit(fn, *args)) + except RuntimeError as exc: + submit_exc = exc + + # buffered result should still be yielded if timeout is None: yield _result_or_cancel(fs.pop()) else: yield _result_or_cancel(fs.pop(), end_time - time.monotonic()) + + if submit_exc is not None: + raise submit_exc finally: for future in fs: future.cancel() diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py index a37c4d45f07b17..c3893036a7563d 100644 --- a/Lib/test/test_concurrent_futures/executor.py +++ b/Lib/test/test_concurrent_futures/executor.py @@ -168,6 +168,33 @@ def test_map_buffersize_when_buffer_is_full(self): msg="should have fetched only `buffersize` elements from `ints`.", ) + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_map_buffersize_shutdown_drains_buffer_then_raises(self): + with self.executor_type(max_workers=1) as executor: + it = executor.map(str, range(8), buffersize=2) + + self.assertEqual(next(it), "0") + self.assertEqual(next(it), "1") + with self.assertRaisesRegex( + RuntimeError, + "cannot schedule new futures after shutdown", + ): + next(it) + + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() + def test_map_buffersize_shutdown_after_iteration_started_drains_buffer_then_raises(self): + with self.executor_type(max_workers=1) as executor: + it = executor.map(str, range(8), buffersize=2) + self.assertEqual(next(it), "0") + + self.assertEqual(next(it), "1") + self.assertEqual(next(it), "2") + with self.assertRaisesRegex( + RuntimeError, + "cannot schedule new futures after shutdown", + ): + next(it) + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() def test_shutdown_race_issue12456(self): # Issue #12456: race condition at shutdown where trying to post a diff --git a/Misc/NEWS.d/next/Library/2026-03-25-01-03-05.gh-issue-146392.zb-lzE.rst b/Misc/NEWS.d/next/Library/2026-03-25-01-03-05.gh-issue-146392.zb-lzE.rst new file mode 100644 index 00000000000000..819e1b9632a3c7 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-03-25-01-03-05.gh-issue-146392.zb-lzE.rst @@ -0,0 +1,2 @@ +Fix Executor.map(buffersize=...) to drain buffered results before raising +after shutdown. Patch by Shamil Abdulaev.