Skip to content

Commit 5ededeb

Browse files
committed
gh-146392: Delay Executor.map shutdown error until buffer is drained
1 parent 6009309 commit 5ededeb

File tree

3 files changed

+50
-7
lines changed

3 files changed

+50
-7
lines changed

Lib/concurrent/futures/_base.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -651,21 +651,35 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
651651
# Yield must be hidden in closure so that the futures are submitted
652652
# before the first iterator value is required.
653653
def result_iterator():
654+
submit_exc = None
655+
exhausted = object()
656+
654657
try:
655658
# reverse to keep finishing order
656659
fs.reverse()
657660
while fs:
658-
if (
659-
buffersize
660-
and (executor := executor_weakref())
661-
and (args := next(zipped_iterables, None))
662-
):
663-
fs.appendleft(executor.submit(fn, *args))
664-
# Careful not to keep a reference to the popped future
661+
if buffersize and submit_exc is None:
662+
args = next(zipped_iterables, exhausted)
663+
if args is not exhausted:
664+
executor = executor_weakref()
665+
if executor is None:
666+
submit_exc = RuntimeError(
667+
"cannot schedule new futures after shutdown"
668+
)
669+
else:
670+
try:
671+
fs.appendleft(executor.submit(fn, *args))
672+
except RuntimeError as exc:
673+
submit_exc = exc
674+
675+
# buffered result should still be yielded
665676
if timeout is None:
666677
yield _result_or_cancel(fs.pop())
667678
else:
668679
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
680+
681+
if submit_exc is not None:
682+
raise submit_exc
669683
finally:
670684
for future in fs:
671685
future.cancel()

Lib/test/test_concurrent_futures/executor.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,33 @@ def test_map_buffersize_when_buffer_is_full(self):
168168
msg="should have fetched only `buffersize` elements from `ints`.",
169169
)
170170

171+
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
172+
def test_map_buffersize_shutdown_drains_buffer_then_raises(self):
173+
with self.executor_type(max_workers=1) as executor:
174+
it = executor.map(str, range(8), buffersize=2)
175+
176+
self.assertEqual(next(it), "0")
177+
self.assertEqual(next(it), "1")
178+
with self.assertRaisesRegex(
179+
RuntimeError,
180+
"cannot schedule new futures after shutdown",
181+
):
182+
next(it)
183+
184+
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
185+
def test_map_buffersize_shutdown_after_iteration_started_drains_buffer_then_raises(self):
186+
with self.executor_type(max_workers=1) as executor:
187+
it = executor.map(str, range(8), buffersize=2)
188+
self.assertEqual(next(it), "0")
189+
190+
self.assertEqual(next(it), "1")
191+
self.assertEqual(next(it), "2")
192+
with self.assertRaisesRegex(
193+
RuntimeError,
194+
"cannot schedule new futures after shutdown",
195+
):
196+
next(it)
197+
171198
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
172199
def test_shutdown_race_issue12456(self):
173200
# Issue #12456: race condition at shutdown where trying to post a
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Fix Executor.map(buffersize=...) to drain buffered results before raising
2+
after shutdown. Patch by Shamil Abdulaev.

0 commit comments

Comments
 (0)