From 65e4f499c7b51aa74fda91b123ece21fdfefb8b1 Mon Sep 17 00:00:00 2001 From: Daniel Alley Date: Mon, 6 Apr 2026 19:10:07 -0400 Subject: [PATCH 1/2] Add a ResourceBudget mechanism which keeps disk usage in check Artifacts stay on disk between the ArtifactDownloader stage and the ArtifactSaver stage. If too many large files build up, it can exceed the allotted filesystem space of the working directory. Previously we used unnecessarily small batch sizes by default in order to ensure the worst case was avoided. This approach dynamically controls how much disk space is being used by the task and provides backpressure when the limit is exceeded, flushing batches and preventing new artifacts from being downloaded. closes #7559 Assisted-By: claude-opus-4.6 --- CHANGES/7559.feature | 1 + docs/admin/reference/settings.md | 34 ++- pulpcore/app/settings.py | 9 +- pulpcore/plugin/stages/__init__.py | 1 + pulpcore/plugin/stages/api.py | 23 +- pulpcore/plugin/stages/artifact_stages.py | 156 ++++++++++++- pulpcore/plugin/stages/declarative_version.py | 7 +- .../tests/unit/stages/test_resource_budget.py | 216 ++++++++++++++++++ pulpcore/tests/unit/stages/test_stages.py | 90 ++++++++ 9 files changed, 520 insertions(+), 17 deletions(-) create mode 100644 CHANGES/7559.feature create mode 100644 pulpcore/tests/unit/stages/test_resource_budget.py diff --git a/CHANGES/7559.feature b/CHANGES/7559.feature new file mode 100644 index 00000000000..5ddc9b47b2e --- /dev/null +++ b/CHANGES/7559.feature @@ -0,0 +1 @@ +Add a configurable ResourceBudget for preventing over-subscription of the disk "properly". Adds a backpressure mechanism + flushing mechanism in order to ensure that batches get fully processed even if minsize hasn't yet been reached. Allows previous performance-reducing mitigations to be removed. 
diff --git a/docs/admin/reference/settings.md b/docs/admin/reference/settings.md index 20cbf5b03fe..2f57fe5ac07 100644 --- a/docs/admin/reference/settings.md +++ b/docs/admin/reference/settings.md @@ -474,10 +474,38 @@ Defaults to `/var/lib/pulp/tmp/`. ### MAX\_CONCURRENT\_CONTENT -The size of the batch of content processed in one go when syncing content from -a remote. +The maximum number of concurrent downloads during sync. Controls how many HTTP +download tasks can run in parallel within the `ArtifactDownloader` pipeline stage. -Defaults to 25. +Defaults to 200. + +!!! warning "Deprecated" + This setting is deprecated and may be removed in a future release. + Use `SYNC_MAX_IN_FLIGHT_ITEMS` instead, which provides similar + functionality. If `MAX_CONCURRENT_CONTENT` is set by the user and + `SYNC_MAX_IN_FLIGHT_ITEMS` is not, its value will be used as + `SYNC_MAX_IN_FLIGHT_ITEMS` automatically. + +### SYNC\_MAX\_IN\_FLIGHT\_MB + +The maximum total size (in megabytes) of downloaded artifacts that are waiting to be +saved. This limits the temporary disk space consumed by artifacts that have been +downloaded by `ArtifactDownloader` but not yet persisted by `ArtifactSaver`. + +When set, small artifacts will download with high concurrency while large artifacts +will automatically throttle to avoid exhausting disk space. + +Defaults to `None` (no byte limit). + +### SYNC\_MAX\_IN\_FLIGHT\_ITEMS + +The maximum number of downloaded artifacts that are waiting to be saved. Like +`SYNC_MAX_IN_FLIGHT_MB`, this limits the buffer between `ArtifactDownloader` and +`ArtifactSaver`, but counts items rather than bytes. + +This is useful as a fallback when artifact sizes are not known ahead of time. + +Defaults to `None` (no limit). 
## Redis Settings diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py index a4411f50eac..48d334063a0 100644 --- a/pulpcore/app/settings.py +++ b/pulpcore/app/settings.py @@ -403,7 +403,14 @@ DOMAIN_ENABLED = False -MAX_CONCURRENT_CONTENT = 25 +MAX_CONCURRENT_CONTENT = 200 + +# Resource budget for sync pipeline: limits total in-flight artifact data between +# the ArtifactDownloader and ArtifactSaver stages. When set, these allow higher download +# concurrency for small artifacts while preventing disk exhaustion for large ones. +# None means no limit for that dimension. +SYNC_MAX_IN_FLIGHT_MB = None # Maximum megabytes of in-flight downloaded artifacts +SYNC_MAX_IN_FLIGHT_ITEMS = None SHELL_PLUS_IMPORTS = [ "from pulpcore.app.util import get_domain, get_domain_pk, set_domain, get_url, extract_pk", diff --git a/pulpcore/plugin/stages/__init__.py b/pulpcore/plugin/stages/__init__.py index e526b52d8f8..bba5fc41b51 100644 --- a/pulpcore/plugin/stages/__init__.py +++ b/pulpcore/plugin/stages/__init__.py @@ -4,6 +4,7 @@ from .artifact_stages import ( ACSArtifactHandler, ArtifactDownloader, + ArtifactResourceBudget, ArtifactSaver, GenericDownloader, QueryExistingArtifacts, diff --git a/pulpcore/plugin/stages/api.py b/pulpcore/plugin/stages/api.py index 1d8939fd66f..87694ad850c 100644 --- a/pulpcore/plugin/stages/api.py +++ b/pulpcore/plugin/stages/api.py @@ -77,7 +77,7 @@ async def run(self): break yield content - async def batches(self, minsize=500): + async def batches(self, minsize=500, flush_event=None): """ Asynchronous iterator yielding batches of [DeclarativeContent][] from `self._in_q`. @@ -87,6 +87,9 @@ async def batches(self, minsize=500): Args: minsize (int): The minimum batch size to yield (unless it is the final batch) + flush_event (asyncio.Event): Optional event that, when set, causes the current + batch to be yielded immediately regardless of `minsize`. This is used by + `ArtifactSaver` to flush when the resource budget is under pressure. 
Yields: A list of [DeclarativeContent][] instances @@ -124,13 +127,20 @@ def add_to_batch(content): get_listener = asyncio.ensure_future(self._in_q.get()) thaw_event_listener = asyncio.ensure_future(thaw_queue_event.wait()) + flush_event_listener = asyncio.ensure_future(flush_event.wait()) if flush_event else None while not shutdown: - done, pending = await asyncio.wait( - [thaw_event_listener, get_listener], return_when=asyncio.FIRST_COMPLETED - ) + waitables = [thaw_event_listener, get_listener] + if flush_event_listener: + waitables.append(flush_event_listener) + done, pending = await asyncio.wait(waitables, return_when=asyncio.FIRST_COMPLETED) if thaw_event_listener in done: thaw_event_listener = asyncio.ensure_future(thaw_queue_event.wait()) no_block = True + if flush_event_listener and flush_event_listener in done: + # Don't re-arm until after we yield a batch, to avoid a spin loop + # when the event stays set but the batch is empty. + flush_event_listener = None + no_block = True if get_listener in done: content = await get_listener add_to_batch(content) @@ -153,8 +163,13 @@ def add_to_batch(content): yield batch batch = [] no_block = False + # Re-arm the flush listener after yielding + if flush_event and flush_event_listener is None: + flush_event_listener = asyncio.ensure_future(flush_event.wait()) thaw_event_listener.cancel() get_listener.cancel() + if flush_event_listener: + flush_event_listener.cancel() async def put(self, item): """ diff --git a/pulpcore/plugin/stages/artifact_stages.py b/pulpcore/plugin/stages/artifact_stages.py index eeff04c2416..364de478612 100644 --- a/pulpcore/plugin/stages/artifact_stages.py +++ b/pulpcore/plugin/stages/artifact_stages.py @@ -23,6 +23,96 @@ log = logging.getLogger(__name__) +class ArtifactResourceBudget: + """Tracks aggregate resource consumption of in-flight artifacts. 
+ + Coordinates between `ArtifactDownloader` (acquires budget) and + `ArtifactSaver` (releases budget) to limit total temporary disk usage + from downloaded-but-not-yet-saved artifacts. + + This allows higher download concurrency for small artifacts while still + protecting against disk exhaustion when syncing large artifacts. + + Args: + max_bytes (int): Maximum total bytes of in-flight downloaded artifacts. + `None` means no byte limit (only item limit applies). + max_items (int): Maximum number of in-flight downloaded artifacts. + `None` means no item limit (only byte limit applies). + """ + + def __init__(self, max_bytes=None, max_items=None): + self.max_bytes = max_bytes + self.max_items = max_items + self._current_bytes = 0 + self._current_items = 0 + self._available = asyncio.Event() + self._available.set() + self._lock = asyncio.Lock() + self.pressure = asyncio.Event() + + @classmethod + def from_settings(cls): + """Create an `ArtifactResourceBudget` from Django settings, or return `None`. + + Reads `SYNC_MAX_IN_FLIGHT_MB` and `SYNC_MAX_IN_FLIGHT_ITEMS` from settings. + Returns `None` if neither setting is configured. + """ + max_mb = settings.SYNC_MAX_IN_FLIGHT_MB + max_items = settings.SYNC_MAX_IN_FLIGHT_ITEMS + if max_mb is None and max_items is None: + return None + return cls( + max_bytes=max_mb * 1024 * 1024 if max_mb is not None else None, + max_items=max_items, + ) + + async def acquire(self, estimated_bytes=0): + """Block until resource budget is available. + + Always allows at least one item through (even if over budget) when nothing + is currently in flight, to prevent deadlock. + + When the budget is exhausted and `acquire` must wait, the `pressure` event + is set to signal downstream stages (e.g. `ArtifactSaver`) to flush their + batches early and free up budget. + + Args: + estimated_bytes (int): Estimated size of the artifact(s) to be downloaded. 
+ """ + while True: + async with self._lock: + # Always allow if nothing is in flight (prevents deadlock) + if self._current_items == 0: + self._current_bytes += estimated_bytes + self._current_items += 1 + return + + bytes_ok = self.max_bytes is None or ( + self._current_bytes + estimated_bytes <= self.max_bytes + ) + items_ok = self.max_items is None or self._current_items < self.max_items + + if bytes_ok and items_ok: + self._current_bytes += estimated_bytes + self._current_items += 1 + return + + self._available.clear() + self.pressure.set() + await self._available.wait() + + def release(self, actual_bytes=0): + """Release resources after an artifact is saved and its temp file deleted. + + Args: + actual_bytes (int): Actual size of the artifact that was saved. + """ + self._current_bytes = max(0, self._current_bytes - actual_bytes) + self._current_items = max(0, self._current_items - 1) + self.pressure.clear() + self._available.set() + + def _check_for_forbidden_checksum_type(artifact): """Check if content doesn't have forbidden checksum type. @@ -220,28 +310,57 @@ class ArtifactDownloader(GenericDownloader): Each [pulpcore.plugin.stages.DeclarativeContent][] is sent to `self._out_q` after all of its [pulpcore.plugin.stages.DeclarativeArtifact][] objects have been handled. + + Args: + resource_budget (ArtifactResourceBudget): Optional shared resource budget that + limits total in-flight artifact bytes/items between download and save. + max_concurrent_content (int): The maximum number of content units to handle + simultaneously. Default is from settings.MAX_CONCURRENT_CONTENT. + args: unused positional arguments passed along to + [pulpcore.plugin.stages.GenericDownloader][]. + kwargs: unused keyword arguments passed along to + [pulpcore.plugin.stages.GenericDownloader][]. 
""" PROGRESS_REPORTING_MESSAGE = "Downloading Artifacts" PROGRESS_REPORTING_CODE = "sync.downloading.artifacts" + def __init__(self, resource_budget=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self.resource_budget = resource_budget + async def _handle_content_unit(self, d_content): """Handle one content unit. Returns: The number of downloads """ - downloaders_for_content = [ - d_artifact.download() + d_artifacts_to_download = [ + d_artifact for d_artifact in d_content.d_artifacts if d_artifact.artifact._state.adding and not d_artifact.deferred_download and not d_artifact.artifact.file ] - if downloaders_for_content: - await asyncio.gather(*downloaders_for_content) - await self.put(d_content) - return len(downloaders_for_content) + + budget_acquired = 0 + if d_artifacts_to_download and self.resource_budget: + budget_acquired = sum( + d_artifact.artifact.size or 0 for d_artifact in d_artifacts_to_download + ) + await self.resource_budget.acquire(budget_acquired) + + try: + if d_artifacts_to_download: + await asyncio.gather(*(da.download() for da in d_artifacts_to_download)) + + await self.put(d_content) + except BaseException: + if budget_acquired and self.resource_budget: + self.resource_budget.release(budget_acquired) + raise + + return len(d_artifacts_to_download) class ArtifactSaver(Stage): @@ -259,8 +378,18 @@ class ArtifactSaver(Stage): This stage drains all available items from `self._in_q` and batches everything into one large call to the db for efficiency. + + Args: + resource_budget (ArtifactResourceBudget): Optional shared resource budget. + When provided, budget is released after artifacts are saved and temp files deleted. + args: unused positional arguments passed along to [pulpcore.plugin.stages.Stage][]. + kwargs: unused keyword arguments passed along to [pulpcore.plugin.stages.Stage][]. 
""" + def __init__(self, resource_budget=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self.resource_budget = resource_budget + async def run(self): """ The coroutine for this stage. @@ -268,7 +397,10 @@ async def run(self): Returns: The coroutine for this stage. """ - async for batch in self.batches(minsize=settings.MAX_CONCURRENT_CONTENT): + flush_event = self.resource_budget.pressure if self.resource_budget else None + async for batch in self.batches( + minsize=settings.MAX_CONCURRENT_CONTENT, flush_event=flush_event + ): da_to_save = [] for d_content in batch: for d_artifact in d_content.d_artifacts: @@ -291,6 +423,16 @@ async def run(self): if await aos.path.exists(tmp_file_path): await aos.remove(tmp_file_path) + # Release budget after temp files are cleaned up so the downloader can proceed + if self.resource_budget: + for d_content in batch: + budget_bytes = sum( + d_artifact.artifact.size or 0 + for d_artifact in d_content.d_artifacts + if not d_artifact.deferred_download + ) + self.resource_budget.release(budget_bytes) + for d_content in batch: await self.put(d_content) diff --git a/pulpcore/plugin/stages/declarative_version.py b/pulpcore/plugin/stages/declarative_version.py index 18022e9ab38..22cd54173ca 100644 --- a/pulpcore/plugin/stages/declarative_version.py +++ b/pulpcore/plugin/stages/declarative_version.py @@ -6,6 +6,7 @@ from pulpcore.plugin.stages.artifact_stages import ( ACSArtifactHandler, ArtifactDownloader, + ArtifactResourceBudget, ArtifactSaver, QueryExistingArtifacts, RemoteArtifactSaver, @@ -129,6 +130,8 @@ def pipeline_stages(self, new_version): list: List of [pulpcore.plugin.stages.Stage][] instances """ + resource_budget = ArtifactResourceBudget.from_settings() + pipeline = [ self.first_stage, QueryExistingArtifacts(), @@ -137,8 +140,8 @@ def pipeline_stages(self, new_version): pipeline.append(ACSArtifactHandler()) pipeline.extend( [ - ArtifactDownloader(), - ArtifactSaver(), + 
ArtifactDownloader(resource_budget=resource_budget), + ArtifactSaver(resource_budget=resource_budget), QueryExistingContents(), ContentSaver(), RemoteArtifactSaver(), diff --git a/pulpcore/tests/unit/stages/test_resource_budget.py b/pulpcore/tests/unit/stages/test_resource_budget.py new file mode 100644 index 00000000000..f53caf12641 --- /dev/null +++ b/pulpcore/tests/unit/stages/test_resource_budget.py @@ -0,0 +1,216 @@ +import asyncio + +import pytest + +from pulpcore.plugin.stages import ArtifactResourceBudget + + +class TestAcquireRelease: + """Basic acquire/release semantics.""" + + @pytest.mark.asyncio + async def test_acquire_and_release_items(self): + budget = ArtifactResourceBudget(max_items=3) + await budget.acquire(0) + await budget.acquire(0) + await budget.acquire(0) + assert budget._current_items == 3 + budget.release(0) + budget.release(0) + budget.release(0) + assert budget._current_items == 0 + + @pytest.mark.asyncio + async def test_acquire_and_release_bytes(self): + budget = ArtifactResourceBudget(max_bytes=1000) + await budget.acquire(400) + await budget.acquire(400) + assert budget._current_bytes == 800 + budget.release(400) + assert budget._current_bytes == 400 + budget.release(400) + assert budget._current_bytes == 0 + + @pytest.mark.asyncio + async def test_release_does_not_go_negative(self): + budget = ArtifactResourceBudget(max_bytes=100, max_items=2) + budget.release(500) + assert budget._current_bytes == 0 + assert budget._current_items == 0 + + +class TestBlocking: + """Acquire blocks when budget is exhausted.""" + + @pytest.mark.asyncio + async def test_blocks_on_item_limit(self): + budget = ArtifactResourceBudget(max_items=1) + await budget.acquire(0) + + acquired = asyncio.Event() + + async def delayed_acquire(): + await budget.acquire(0) + acquired.set() + + task = asyncio.ensure_future(delayed_acquire()) + await asyncio.sleep(0.05) + assert not acquired.is_set(), "acquire should block when at item limit" + + budget.release(0) + 
await asyncio.sleep(0.05) + assert acquired.is_set(), "acquire should unblock after release" + task.cancel() + + @pytest.mark.asyncio + async def test_blocks_on_byte_limit(self): + budget = ArtifactResourceBudget(max_bytes=100) + await budget.acquire(80) + + acquired = asyncio.Event() + + async def delayed_acquire(): + await budget.acquire(50) + acquired.set() + + task = asyncio.ensure_future(delayed_acquire()) + await asyncio.sleep(0.05) + assert not acquired.is_set(), "acquire should block when bytes would exceed limit" + + budget.release(80) + await asyncio.sleep(0.05) + assert acquired.is_set(), "acquire should unblock after release" + task.cancel() + + @pytest.mark.asyncio + async def test_blocks_on_both_limits(self): + """When both limits are set, both must be satisfied.""" + budget = ArtifactResourceBudget(max_bytes=1000, max_items=1) + await budget.acquire(100) + + acquired = asyncio.Event() + + async def delayed_acquire(): + await budget.acquire(100) + acquired.set() + + task = asyncio.ensure_future(delayed_acquire()) + await asyncio.sleep(0.05) + assert not acquired.is_set() + + budget.release(100) + await asyncio.sleep(0.05) + assert acquired.is_set() + task.cancel() + + +class TestDeadlockPrevention: + """The _current_items == 0 guard prevents deadlock.""" + + @pytest.mark.asyncio + async def test_allows_oversized_item_when_empty(self): + """A single item exceeding max_bytes is allowed when nothing is in flight.""" + budget = ArtifactResourceBudget(max_bytes=100) + await budget.acquire(500) # Should not block + assert budget._current_bytes == 500 + assert budget._current_items == 1 + + @pytest.mark.asyncio + async def test_allows_item_over_item_limit_when_empty(self): + """Even max_items=0 (if someone set it) doesn't block when nothing is in flight.""" + budget = ArtifactResourceBudget(max_items=0) + # This would deadlock without the guard -- it should return immediately + await budget.acquire(0) + assert budget._current_items == 1 + + 
@pytest.mark.asyncio + async def test_second_oversized_item_blocks(self): + """After allowing one oversized item through, the next must wait.""" + budget = ArtifactResourceBudget(max_bytes=100) + await budget.acquire(500) + + acquired = asyncio.Event() + + async def delayed_acquire(): + await budget.acquire(50) + acquired.set() + + task = asyncio.ensure_future(delayed_acquire()) + await asyncio.sleep(0.05) + assert not acquired.is_set(), "second item should block while oversized item is in flight" + + budget.release(500) + await asyncio.sleep(0.05) + assert acquired.is_set() + task.cancel() + + +class TestPressureEvent: + """The pressure event signals downstream stages to flush.""" + + @pytest.mark.asyncio + async def test_pressure_set_when_blocked(self): + budget = ArtifactResourceBudget(max_items=1) + assert not budget.pressure.is_set() + await budget.acquire(0) + + async def try_acquire(): + await budget.acquire(0) + + task = asyncio.ensure_future(try_acquire()) + await asyncio.sleep(0.05) + assert budget.pressure.is_set(), "pressure should be set when acquire blocks" + + budget.release(0) + await asyncio.sleep(0.05) + assert not budget.pressure.is_set(), "pressure should clear after release" + task.cancel() + + @pytest.mark.asyncio + async def test_pressure_not_set_when_budget_available(self): + budget = ArtifactResourceBudget(max_items=5) + await budget.acquire(0) + assert not budget.pressure.is_set() + await budget.acquire(0) + assert not budget.pressure.is_set() + + +class TestNoLimits: + """When max_bytes and max_items are both None, acquire never blocks.""" + + @pytest.mark.asyncio + async def test_unlimited_acquires(self): + budget = ArtifactResourceBudget(max_bytes=None, max_items=None) + for i in range(100): + await budget.acquire(1_000_000) + assert budget._current_items == 100 + assert budget._current_bytes == 100_000_000 + + +class TestConcurrentAcquireRelease: + """Multiple concurrent acquires and releases behave correctly.""" + + 
@pytest.mark.asyncio + async def test_concurrent_producers_and_consumer(self): + """Simulate multiple downloaders acquiring and a saver releasing.""" + budget = ArtifactResourceBudget(max_bytes=500, max_items=5) + completed = [] + + async def producer(item_id, size): + await budget.acquire(size) + await asyncio.sleep(0.01) # simulate download + completed.append(item_id) + return size + + async def consumer(): + """Release budget periodically, simulating ArtifactSaver.""" + while len(completed) < 10: + await asyncio.sleep(0.02) + if budget._current_items > 0: + budget.release(100) + + consumer_task = asyncio.ensure_future(consumer()) + producer_tasks = [asyncio.ensure_future(producer(i, 100)) for i in range(10)] + + await asyncio.gather(*producer_tasks, consumer_task) + assert len(completed) == 10 diff --git a/pulpcore/tests/unit/stages/test_stages.py b/pulpcore/tests/unit/stages/test_stages.py index e96fbbbfb54..230972eee8a 100644 --- a/pulpcore/tests/unit/stages/test_stages.py +++ b/pulpcore/tests/unit/stages/test_stages.py @@ -154,3 +154,93 @@ async def test_batch_queue_and_min_sizes(): last_stage._connect(queues[1], queues[2]) end_stage._connect(queues[2], None) await asyncio.gather(last_stage(), middle_stage(), first_stage(), end_stage()) + + +@pytest.mark.asyncio +async def test_flush_event_yields_early(stage, in_q): + """A flush_event causes batches() to yield before minsize is reached.""" + flush = asyncio.Event() + c1 = mock.Mock() + in_q.put_nowait(c1) + + batch_it = stage.batches(minsize=100, flush_event=flush) + + # The single item is not enough to reach minsize=100, so the batch won't yield yet. + # Set the flush event to force an early yield. 
+ flush.set() + + batch = await batch_it.__anext__() + assert batch == [c1] + + in_q.put_nowait(None) + with pytest.raises(StopAsyncIteration): + await batch_it.__anext__() + + +@pytest.mark.asyncio +async def test_flush_event_no_spin_on_empty_batch(stage, in_q): + """When flush_event is set but the batch is empty, batches() must not spin.""" + flush = asyncio.Event() + flush.set() # Set before any items arrive + + batch_it = stage.batches(minsize=100, flush_event=flush) + + # Put an item after a short delay -- if there were a spin loop, + # the test would hang or burn CPU before we get here. + async def put_later(): + await asyncio.sleep(0.05) + in_q.put_nowait(mock.Mock()) + in_q.put_nowait(None) + + asyncio.ensure_future(put_later()) + + batch = await batch_it.__anext__() + assert len(batch) == 1 + + with pytest.raises(StopAsyncIteration): + await batch_it.__anext__() + + +@pytest.mark.asyncio +async def test_flush_event_rearms_after_yield(stage, in_q): + """After yielding a flush-triggered batch, the flush_event is re-armed.""" + flush = asyncio.Event() + c1 = mock.Mock() + c2 = mock.Mock() + + in_q.put_nowait(c1) + + batch_it = stage.batches(minsize=100, flush_event=flush) + + # First flush-triggered yield + flush.set() + batch = await batch_it.__anext__() + assert batch == [c1] + + # Clear and re-set to trigger another early yield + flush.clear() + in_q.put_nowait(c2) + flush.set() + batch = await batch_it.__anext__() + assert batch == [c2] + + in_q.put_nowait(None) + with pytest.raises(StopAsyncIteration): + await batch_it.__anext__() + + +@pytest.mark.asyncio +async def test_flush_event_not_needed_when_minsize_met(stage, in_q): + """Batches still yield normally when minsize is met, even without flush.""" + flush = asyncio.Event() # Never set + + for _ in range(5): + in_q.put_nowait(mock.Mock()) + in_q.put_nowait(None) + + batch_it = stage.batches(minsize=3, flush_event=flush) + batch = await batch_it.__anext__() + assert len(batch) >= 3 + + with 
pytest.raises(StopAsyncIteration): + await batch_it.__anext__() From 7ba48f8e4f23442e1d3084c6be3cd36d276c4128 Mon Sep 17 00:00:00 2001 From: Daniel Alley Date: Tue, 7 Apr 2026 09:55:43 -0400 Subject: [PATCH 2/2] Fall back to deprecated MAX_CONCURRENT_CONTENT for max_items --- pulpcore/plugin/stages/artifact_stages.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pulpcore/plugin/stages/artifact_stages.py b/pulpcore/plugin/stages/artifact_stages.py index 364de478612..06756c1de16 100644 --- a/pulpcore/plugin/stages/artifact_stages.py +++ b/pulpcore/plugin/stages/artifact_stages.py @@ -55,10 +55,17 @@ def from_settings(cls): """Create an `ArtifactResourceBudget` from Django settings, or return `None`. Reads `SYNC_MAX_IN_FLIGHT_MB` and `SYNC_MAX_IN_FLIGHT_ITEMS` from settings. - Returns `None` if neither setting is configured. + Falls back to the deprecated `MAX_CONCURRENT_CONTENT` for `max_items` whenever + `SYNC_MAX_IN_FLIGHT_ITEMS` is not configured (even at its default value). + Returns `None` only if neither limit resolves to a value. """ max_mb = settings.SYNC_MAX_IN_FLIGHT_MB max_items = settings.SYNC_MAX_IN_FLIGHT_ITEMS + + # Backward compatibility: honor deprecated MAX_CONCURRENT_CONTENT + if max_items is None: + max_items = settings.MAX_CONCURRENT_CONTENT + if max_mb is None and max_items is None: return None return cls(