From f1e01235c23ed26ebdea175d1e7343fc11c13a12 Mon Sep 17 00:00:00 2001 From: "Eric W. Tramel" Date: Wed, 20 May 2026 21:15:04 -0400 Subject: [PATCH 1/7] enable bounded-borrow task admission Signed-off-by: Eric W. Tramel --- .../dataset_builders/async_scheduler.py | 2 + .../scheduling/task_policies.py | 136 +++++-- .../scheduling/test_task_admission.py | 49 +++ .../dataset_builders/test_async_scheduler.py | 18 + .../benchmark_bounded_borrow_admission.py | 377 ++++++++++++++++++ 5 files changed, 553 insertions(+), 29 deletions(-) create mode 100644 scripts/benchmarks/benchmark_bounded_borrow_admission.py diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py index 9109eafcc..adad6bc0c 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py @@ -44,6 +44,7 @@ TaskAdmissionLease, ) from data_designer.engine.dataset_builders.scheduling.task_model import SliceRef, Task, TaskTrace +from data_designer.engine.dataset_builders.scheduling.task_policies import BoundedBorrowTaskAdmissionPolicyConfig from data_designer.engine.dataset_builders.utils.async_progress_reporter import ( DEFAULT_REPORT_INTERVAL, AsyncProgressReporter, @@ -185,6 +186,7 @@ def __init__( admission_config = task_admission_config or TaskAdmissionConfig( submission_capacity=max_submitted_tasks, resource_limits={"llm_wait": max_model_task_admission, "local": max_submitted_tasks}, + bounded_borrow=BoundedBorrowTaskAdmissionPolicyConfig(), ) self._task_admission = TaskAdmissionController(admission_config) self._task_admission_config = admission_config diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py index 011e4e703..9950ffbbf 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py @@ -3,6 +3,7 @@ from __future__ import annotations +import math from collections.abc import Mapping from dataclasses import dataclass, field from typing import TYPE_CHECKING, Literal, Protocol @@ -12,6 +13,7 @@ SchedulableTask, SchedulerResourceKey, TaskGroupKey, + TaskGroupSpec, ) if TYPE_CHECKING: @@ -27,6 +29,7 @@ "shutdown", "policy_denial", ] +DEFAULT_BOUNDED_BORROW_CEILING = 1 @dataclass(frozen=True) @@ -41,10 +44,17 @@ class BoundedBorrowTaskAdmissionPolicyConfig: borrow_ceiling_by_group_resource: Mapping[tuple[TaskGroupKey, SchedulerResourceKey], int] = field( default_factory=dict ) - default_borrow_ceiling: int = 0 + default_borrow_ceiling: int = DEFAULT_BOUNDED_BORROW_CEILING strict_share_rounding: Literal["floor", "ceil"] = "floor" repay_on_withheld_peer_pressure: bool = True + def __post_init__(self) -> None: + if self.default_borrow_ceiling < 0: + raise ValueError("default_borrow_ceiling must be non-negative.") + for key, ceiling in self.borrow_ceiling_by_group_resource.items(): + if ceiling < 0: + raise ValueError(f"Borrow ceiling for {key!r} must be non-negative.") + @dataclass(frozen=True) class TaskAdmissionPolicyDecision: @@ -127,51 +137,73 @@ def evaluate( queue_view: QueueView, admission_view: TaskAdmissionView, ) -> TaskAdmissionPolicyDecision: - limit = item.group.admitted_limit - if limit is None: - return TaskAdmissionPolicyDecision(allowed=True) - - leased_count = admission_view.running_counts_by_group.get(item.group.key, 0) - if leased_count < limit: + if item.group.admitted_limit is None: return TaskAdmissionPolicyDecision(allowed=True) pressure_resources = _queued_peer_pressure_resources(item, queue_view, admission_view) - if pressure_resources: - for resource in pressure_resources: - debt_key = (item.group.key, resource) - debt = admission_view.policy_debt_by_group_resource.get(debt_key, 0) + borrow_resources: list[tuple[SchedulerResourceKey, int]] = [] + diagnostics_by_resource: dict[SchedulerResourceKey, dict[str, int]] = {} + for resource, amount in item.resource_request.amounts.items(): + admitted = admission_view.leased_resources_by_group.get(item.group.key, {}).get(resource, 0) + strict_share = _strict_share(item, resource, queue_view, admission_view, self._config.strict_share_rounding) + projected = admitted + amount + debt_key = (item.group.key, resource) + debt = admission_view.policy_debt_by_group_resource.get(debt_key, 0) + diagnostics_by_resource[resource] = { + "admitted": admitted, + "requested": amount, + "strict_share": strict_share, + "debt": debt, + } + + if resource in pressure_resources: if debt > 0: return TaskAdmissionPolicyDecision( allowed=False, reason="borrow_debt", - diagnostics={"resource": resource, "debt": debt}, + diagnostics={"resource": resource, "debt": debt, "strict_share": strict_share}, ) - return TaskAdmissionPolicyDecision( - allowed=False, - reason="group_cap", - diagnostics={ - "admitted_limit": limit, - "leased_count": leased_count, - "pressure_resources": pressure_resources, - }, - ) + if projected > strict_share: + return TaskAdmissionPolicyDecision( + allowed=False, + reason="group_cap", + diagnostics={ + "resource": resource, + "admitted": admitted, + "requested": amount, + "strict_share": strict_share, + "pressure_resources": pressure_resources, + }, + ) + continue - borrow_resources: list[tuple[SchedulerResourceKey, int]] = [] - for resource, amount in item.resource_request.amounts.items(): - debt_key = (item.group.key, resource) - debt = admission_view.policy_debt_by_group_resource.get(debt_key, 0) + if projected <= strict_share: + continue + + new_debt = projected - strict_share ceiling = self._config.borrow_ceiling_by_group_resource.get( debt_key, self._config.default_borrow_ceiling, ) - if debt + amount > ceiling: + diagnostics_by_resource[resource]["ceiling"] = ceiling + if debt + new_debt > ceiling: return TaskAdmissionPolicyDecision( allowed=False, reason="borrow_debt", - diagnostics={"resource": resource, "debt": debt, "requested": amount, "ceiling": ceiling}, + diagnostics={ + "resource": resource, + "debt": debt, + "requested": amount, + "new_debt": new_debt, + "ceiling": ceiling, + "strict_share": strict_share, + }, ) - borrow_resources.append((resource, amount)) - return TaskAdmissionPolicyDecision(allowed=True, diagnostics={"borrow_resources": tuple(borrow_resources)}) + borrow_resources.append((resource, new_debt)) + return TaskAdmissionPolicyDecision( + allowed=True, + diagnostics={"borrow_resources": tuple(borrow_resources), "strict_share": diagnostics_by_resource}, + ) def on_acquire( self, @@ -215,6 +247,52 @@ def _queued_peer_pressure_resources( return tuple(pressure_resources) +def _strict_share( + item: SchedulableTask, + resource: SchedulerResourceKey, + queue_view: QueueView, + admission_view: TaskAdmissionView, + rounding: Literal["floor", "ceil"], +) -> int: + resource_limit = admission_view.resource_limits.get(resource, 0) + if resource_limit <= 1: + return 1 + + candidate_groups = _competing_group_specs(item, resource, queue_view, admission_view) + group_weight = max(1.0, item.group.weight) + if len(candidate_groups) <= 1: + total_weight = group_weight * 2 + else: + total_weight = sum(max(1.0, group.weight) for group in candidate_groups.values()) + raw_share = resource_limit * group_weight / total_weight + if rounding == "ceil": + rounded_share = math.ceil(raw_share) + else: + rounded_share = math.floor(raw_share) + strict_share = max(1, rounded_share) + if item.group.admitted_limit is not None: + strict_share = min(strict_share, item.group.admitted_limit) + return min(resource_limit, strict_share) + + +def _competing_group_specs( + item: SchedulableTask, + resource: SchedulerResourceKey, + queue_view: QueueView, + admission_view: TaskAdmissionView, +) -> dict[TaskGroupKey, TaskGroupSpec]: + groups: dict[TaskGroupKey, TaskGroupSpec] = {item.group.key: item.group} + for group_key, peer_resources in queue_view.first_candidate_resources_by_group.items(): + if peer_resources.get(resource, 0) <= 0: + continue + if not _is_hard_resource_eligible(peer_resources, admission_view): + continue + group = queue_view.first_candidate_group_specs_by_group.get(group_key) + if group is not None: + groups[group_key] = group + return groups + + def _fair_pressure_resources( resources: Mapping[SchedulerResourceKey, int], ) -> tuple[SchedulerResourceKey, ...]: diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py index fbb2fd469..c85c78674 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py @@ -228,6 +228,31 @@ def test_bounded_borrow_limits_solo_group_borrow_debt() -> None: assert controller.view().policy_debt_by_group_resource[(group.key, "submission")] == 1 +def test_bounded_borrow_prevents_solo_heavy_group_from_consuming_all_typed_capacity() -> None: + group = TaskGroupSpec(TaskGroupKey(kind="model", identity=("provider", "hot")), weight=4.0, admitted_limit=4) + controller = TaskAdmissionController( + TaskAdmissionConfig( + submission_capacity=4, + resource_limits={"llm_wait": 4}, + bounded_borrow=BoundedBorrowTaskAdmissionPolicyConfig(default_borrow_ceiling=1), + ) + ) + items = [_item("hot", row, group=group, resources={"submission": 1, "llm_wait": 1}) for row in range(4)] + first = controller.try_acquire(items[0], _queue_view(*items)) + second = controller.try_acquire(items[1], _queue_view(*items[1:])) + third = controller.try_acquire(items[2], _queue_view(*items[2:])) + assert isinstance(first, TaskAdmissionLease) + assert isinstance(second, TaskAdmissionLease) + assert isinstance(third, TaskAdmissionLease) + + denied = controller.try_acquire(items[3], _queue_view(items[3])) + + assert isinstance(denied, TaskAdmissionDenied) + assert denied.reason == "borrow_debt" + assert controller.view().resources_available["llm_wait"] == 1 + assert controller.view().policy_debt_by_group_resource[(group.key, "llm_wait")] == 1 + + def test_bounded_borrow_debt_blocks_under_peer_pressure_and_releases() -> None: group = TaskGroupSpec(TaskGroupKey(kind="model", identity=("provider", "model")), admitted_limit=1) controller = TaskAdmissionController( @@ -253,6 +278,30 @@ def test_bounded_borrow_debt_blocks_under_peer_pressure_and_releases() -> None: assert (group.key, "submission") not in controller.view().policy_debt_by_group_resource +def test_bounded_borrow_debt_yields_queue_selection_to_new_peer() -> None: + group = TaskGroupSpec(TaskGroupKey(kind="model", identity=("provider", "hot")), weight=4.0, admitted_limit=4) + peer_group = TaskGroupSpec(TaskGroupKey(kind="model", identity=("provider", "peer")), weight=1.0, admitted_limit=1) + controller = TaskAdmissionController( + TaskAdmissionConfig( + submission_capacity=4, + resource_limits={"llm_wait": 4}, + bounded_borrow=BoundedBorrowTaskAdmissionPolicyConfig(default_borrow_ceiling=1), + ) + ) + hot_items = [_item("hot", row, group=group, resources={"submission": 1, "llm_wait": 1}) for row in range(4)] + for index in range(3): + lease = controller.try_acquire(hot_items[index], _queue_view(*hot_items[index:])) + assert isinstance(lease, TaskAdmissionLease) + peer = _item("peer", 0, group=peer_group, resources={"submission": 1, "llm_wait": 1}) + queue = FairTaskQueue() + queue.enqueue((hot_items[3], peer)) + + selection = queue.select_next(controller.is_eligible) + + assert selection is not None + assert selection.item.task_id == peer.task_id + + def test_bounded_borrow_release_repayment_is_group_level() -> None: group = TaskGroupSpec(TaskGroupKey(kind="model", identity=("provider", "model")), admitted_limit=1) controller = TaskAdmissionController( diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py index 41191c609..2a78bcc98 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py @@ -1840,6 +1840,24 @@ async def test_scheduler_llm_bound_one_way_handoff() -> None: assert snapshot.resources_available["llm_wait"] == max_llm_wait +def test_scheduler_default_task_admission_uses_bounded_borrow_policy() -> None: + provider = _mock_provider() + configs = [SamplerColumnConfig(name="seed", sampler_type=SamplerType.CATEGORY, params={"values": ["A"]})] + strategies = {"seed": GenerationStrategy.FULL_COLUMN} + generators = {"seed": MockSeedGenerator(config=_expr_config("seed"), resource_provider=provider)} + graph = ExecutionGraph.create(configs, strategies) + row_groups = [(0, 1)] + + scheduler = AsyncTaskScheduler( + generators=generators, + graph=graph, + tracker=CompletionTracker.with_graph(graph, row_groups), + row_groups=row_groups, + ) + + assert scheduler._task_admission_config.bounded_borrow is not None + + @pytest.mark.asyncio(loop_scope="session") async def test_scheduler_non_llm_holds_submission_slot() -> None: """Non-LLM generators hold the submission slot for the entire execution (no handoff).""" diff --git a/scripts/benchmarks/benchmark_bounded_borrow_admission.py b/scripts/benchmarks/benchmark_bounded_borrow_admission.py new file mode 100644 index 000000000..a6c8a1406 --- /dev/null +++ b/scripts/benchmarks/benchmark_bounded_borrow_admission.py @@ -0,0 +1,377 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import argparse +import heapq +import json +import platform +import subprocess +import time +from dataclasses import asdict, dataclass +from pathlib import Path +from typing import Literal + +from data_designer.engine.dataset_builders.scheduling.queue import FairTaskQueue +from data_designer.engine.dataset_builders.scheduling.resources import ( + SchedulableTask, + SchedulerResourceRequest, + TaskGroupKey, + TaskGroupSpec, + stable_task_id, +) +from data_designer.engine.dataset_builders.scheduling.task_admission import ( + TaskAdmissionConfig, + TaskAdmissionController, + TaskAdmissionLease, +) +from data_designer.engine.dataset_builders.scheduling.task_model import Task +from data_designer.engine.dataset_builders.scheduling.task_policies import BoundedBorrowTaskAdmissionPolicyConfig + +PolicyName = Literal["strict", "bounded"] +ScenarioName = Literal["heavy_root_peer_arrival", "neutral_ready_at_start"] + + +@dataclass(frozen=True) +class BenchmarkTask: + name: str + group_name: str + ready_at: float + duration: float + item: SchedulableTask + + +@dataclass(frozen=True) +class TaskRecord: + name: str + group_name: str + ready_at: float + dispatch_at: float + completed_at: float + + @property + def wait_seconds(self) -> float: + return self.dispatch_at - self.ready_at + + +@dataclass(frozen=True) +class ScenarioConfig: + name: ScenarioName + capacity: int + hot_task_count: int + peer_task_count: int + hot_ready_at: float + peer_ready_at: float + hot_duration: float + peer_duration: float + hot_weight: float + peer_weight: float + borrow_ceiling: int + + +@dataclass(frozen=True) +class ScenarioResult: + policy: PolicyName + scenario: ScenarioName + capacity: int + task_count: int + wall_time_seconds: float + utilization_ratio: float + hot_dispatch_count_before_peer_ready: int + peer_first_wait_seconds: float + peer_wait_mean_seconds: float + peer_wait_p50_seconds: float + peer_wait_p95_seconds: float + peer_wait_max_seconds: float + final_zero_task_leases: bool + + +@dataclass(frozen=True) +class BenchmarkReport: + created_at: str + git_sha: str + python: str + platform: str + scenarios: list[dict[str, object]] + comparisons: list[dict[str, object]] + + +def main() -> None: + parser = argparse.ArgumentParser(description="Benchmark strict fair vs bounded-borrow task admission.") + parser.add_argument( + "--output-dir", + type=Path, + default=Path(".scratch") / "bounded-borrow-admission", + help="Directory where JSON and Markdown reports are written.", + ) + args = parser.parse_args() + + output_dir = args.output_dir / time.strftime("%Y%m%d-%H%M%S") + output_dir.mkdir(parents=True, exist_ok=True) + + configs = ( + ScenarioConfig( + name="heavy_root_peer_arrival", + capacity=8, + hot_task_count=512, + peer_task_count=256, + hot_ready_at=0.0, + peer_ready_at=0.05, + hot_duration=0.5, + peer_duration=0.05, + hot_weight=4.0, + peer_weight=1.0, + borrow_ceiling=1, + ), + ScenarioConfig( + name="neutral_ready_at_start", + capacity=8, + hot_task_count=256, + peer_task_count=256, + hot_ready_at=0.0, + peer_ready_at=0.0, + hot_duration=0.1, + peer_duration=0.1, + hot_weight=1.0, + peer_weight=1.0, + borrow_ceiling=1, + ), + ) + + results = [run_scenario(config, policy) for config in configs for policy in ("strict", "bounded")] + comparisons = [_compare_results(config.name, results) for config in configs] + report = BenchmarkReport( + created_at=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + git_sha=_git_sha(), + python=platform.python_version(), + platform=platform.platform(), + scenarios=[asdict(result) for result in results], + comparisons=comparisons, + ) + + json_path = output_dir / "bounded_borrow_admission_benchmark.json" + markdown_path = output_dir / "bounded_borrow_admission_benchmark.md" + json_path.write_text(json.dumps(asdict(report), indent=2, sort_keys=True) + "\n", encoding="utf-8") + markdown_path.write_text(_markdown_report(report), encoding="utf-8") + print(f"Wrote {json_path}") + print(f"Wrote {markdown_path}") + + +def run_scenario(config: ScenarioConfig, policy: PolicyName) -> ScenarioResult: + tasks = _scenario_tasks(config) + pending = sorted(tasks, key=lambda task: (task.ready_at, task.name)) + queue = FairTaskQueue() + controller = _controller(config, policy) + now = 0.0 + records: list[TaskRecord] = [] + running: list[tuple[float, int, BenchmarkTask, TaskAdmissionLease]] = [] + sequence = 0 + + while pending or running or queue.has_queued_tasks: + while pending and pending[0].ready_at <= now: + ready = [] + while pending and pending[0].ready_at <= now: + ready.append(pending.pop(0)) + queue.enqueue(task.item for task in ready) + + dispatched = False + while queue.has_queued_tasks: + selection = queue.select_next(controller.is_eligible) + if selection is None: + break + decision = controller.try_acquire(selection.item, selection.queue_view) + if not isinstance(decision, TaskAdmissionLease): + break + committed = queue.commit(selection) + if committed is None: + controller.release(decision) + break + task = next(task for task in tasks if task.item.task_id == committed.task_id) + sequence += 1 + completed_at = now + task.duration + heapq.heappush(running, (completed_at, sequence, task, decision)) + records.append( + TaskRecord( + name=task.name, + group_name=task.group_name, + ready_at=task.ready_at, + dispatch_at=now, + completed_at=completed_at, + ) + ) + dispatched = True + + if dispatched: + continue + + next_ready_at = pending[0].ready_at if pending else None + next_completion_at = running[0][0] if running else None + next_times = [value for value in (next_ready_at, next_completion_at) if value is not None] + if not next_times: + break + now = min(next_times) + while running and running[0][0] <= now: + _completed_at, _sequence, _task, lease = heapq.heappop(running) + controller.release(lease) + + total_busy_seconds = sum(task.duration for task in tasks) + wall_time = max((record.completed_at for record in records), default=0.0) + peer_records = [record for record in records if record.group_name == "peer"] + peer_waits = [record.wait_seconds for record in peer_records] + hot_before_peer = sum( + 1 for record in records if record.group_name == "hot" and record.dispatch_at < config.peer_ready_at + ) + return ScenarioResult( + policy=policy, + scenario=config.name, + capacity=config.capacity, + task_count=len(tasks), + wall_time_seconds=wall_time, + utilization_ratio=total_busy_seconds / (wall_time * config.capacity) if wall_time else 0.0, + hot_dispatch_count_before_peer_ready=hot_before_peer, + peer_first_wait_seconds=peer_waits[0] if peer_waits else 0.0, + peer_wait_mean_seconds=sum(peer_waits) / len(peer_waits) if peer_waits else 0.0, + peer_wait_p50_seconds=_percentile(peer_waits, 0.50), + peer_wait_p95_seconds=_percentile(peer_waits, 0.95), + peer_wait_max_seconds=max(peer_waits, default=0.0), + final_zero_task_leases=not controller.view().leased_resources, + ) + + +def _scenario_tasks(config: ScenarioConfig) -> list[BenchmarkTask]: + hot_group = TaskGroupSpec( + TaskGroupKey(kind="model", identity=("benchmark", "hot")), + weight=config.hot_weight, + admitted_limit=config.capacity, + ) + peer_group = TaskGroupSpec( + TaskGroupKey(kind="model", identity=("benchmark", "peer")), + weight=config.peer_weight, + admitted_limit=config.capacity, + ) + return [ + *_tasks_for_group("hot", hot_group, config.hot_task_count, config.hot_ready_at, config.hot_duration), + *_tasks_for_group("peer", peer_group, config.peer_task_count, config.peer_ready_at, config.peer_duration), + ] + + +def _tasks_for_group( + group_name: str, + group: TaskGroupSpec, + count: int, + ready_at: float, + duration: float, +) -> list[BenchmarkTask]: + tasks = [] + for index in range(count): + task = Task(column=group_name, row_group=0, row_index=index, task_type="cell") + item = SchedulableTask( + task_id=stable_task_id(task), + payload=task, + group=group, + resource_request=SchedulerResourceRequest({"submission": 1, "llm_wait": 1}), + ) + tasks.append( + BenchmarkTask( + name=f"{group_name}-{index}", + group_name=group_name, + ready_at=ready_at, + duration=duration, + item=item, + ) + ) + return tasks + + +def _controller(config: ScenarioConfig, policy: PolicyName) -> TaskAdmissionController: + bounded_borrow = ( + BoundedBorrowTaskAdmissionPolicyConfig(default_borrow_ceiling=config.borrow_ceiling) + if policy == "bounded" + else None + ) + return TaskAdmissionController( + TaskAdmissionConfig( + submission_capacity=config.capacity, + resource_limits={"llm_wait": config.capacity}, + bounded_borrow=bounded_borrow, + ) + ) + + +def _compare_results(scenario: ScenarioName, results: list[ScenarioResult]) -> dict[str, object]: + by_policy = {result.policy: result for result in results if result.scenario == scenario} + strict = by_policy["strict"] + bounded = by_policy["bounded"] + return { + "scenario": scenario, + "peer_p95_wait_delta_seconds": bounded.peer_wait_p95_seconds - strict.peer_wait_p95_seconds, + "peer_p95_wait_reduction_ratio": _reduction_ratio(strict.peer_wait_p95_seconds, bounded.peer_wait_p95_seconds), + "peer_first_wait_delta_seconds": bounded.peer_first_wait_seconds - strict.peer_first_wait_seconds, + "wall_time_delta_seconds": bounded.wall_time_seconds - strict.wall_time_seconds, + "utilization_delta": bounded.utilization_ratio - strict.utilization_ratio, + "strict_hot_dispatch_before_peer_ready": strict.hot_dispatch_count_before_peer_ready, + "bounded_hot_dispatch_before_peer_ready": bounded.hot_dispatch_count_before_peer_ready, + } + + +def _percentile(values: list[float], quantile: float) -> float: + if not values: + return 0.0 + ordered = sorted(values) + index = min(len(ordered) - 1, max(0, int(round((len(ordered) - 1) * quantile)))) + return ordered[index] + + +def _reduction_ratio(strict_value: float, bounded_value: float) -> float: + if strict_value == 0.0: + return 0.0 + return (strict_value - bounded_value) / strict_value + + +def _git_sha() -> str: + try: + return subprocess.check_output(["git", "rev-parse", "HEAD"], text=True).strip() + except Exception: + return "unknown" + + +def _markdown_report(report: BenchmarkReport) -> str: + lines = [ + "# Bounded Borrow Admission Benchmark", + "", + f"- Git SHA: `{report.git_sha}`", + f"- Python: `{report.python}`", + f"- Platform: `{report.platform}`", + "", + "## Scenario Results", + "", + "| Scenario | Policy | Tasks | Wall time (s) | Utilization | Hot dispatches before peer ready | Peer wait p95 (s) | Peer first wait (s) |", + "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: |", + ] + for scenario in report.scenarios: + lines.append( + "| {scenario} | {policy} | {task_count} | {wall_time_seconds:.3f} | {utilization_ratio:.3f} | " + "{hot_dispatch_count_before_peer_ready} | {peer_wait_p95_seconds:.3f} | " + "{peer_first_wait_seconds:.3f} |".format(**scenario) + ) + lines.extend( + [ + "", + "## Comparisons", + "", + "| Scenario | Peer p95 wait reduction | Peer first wait delta (s) | Wall time delta (s) | Utilization delta |", + "| --- | ---: | ---: | ---: | ---: |", + ] + ) + for comparison in report.comparisons: + lines.append( + "| {scenario} | {peer_p95_wait_reduction_ratio:.1%} | {peer_first_wait_delta_seconds:.3f} | " + "{wall_time_delta_seconds:.3f} | {utilization_delta:.3f} |".format(**comparison) + ) + lines.append("") + return "\n".join(lines) + + +if __name__ == "__main__": + main() From 8cc515bbc4b3c0345b1d91f3938547fbc1522efb Mon Sep 17 00:00:00 2001 From: "Eric W. Tramel" Date: Wed, 20 May 2026 21:41:49 -0400 Subject: [PATCH 2/7] tune bounded-borrow strict-share rounding Signed-off-by: Eric W. Tramel --- .../dataset_builders/scheduling/task_policies.py | 2 +- .../scheduling/test_task_admission.py | 15 ++++++++++++--- .../scheduling/test_task_policies.py | 13 ++++++++++++- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py index 9950ffbbf..27aa88464 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py @@ -45,7 +45,7 @@ class BoundedBorrowTaskAdmissionPolicyConfig: default_factory=dict ) default_borrow_ceiling: int = DEFAULT_BOUNDED_BORROW_CEILING - strict_share_rounding: Literal["floor", "ceil"] = "floor" + strict_share_rounding: Literal["floor", "ceil"] = "ceil" repay_on_withheld_peer_pressure: bool = True def __post_init__(self) -> None: diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py index c85c78674..da756e6bf 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py @@ -210,7 +210,10 @@ def test_bounded_borrow_limits_solo_group_borrow_debt() -> None: controller = TaskAdmissionController( TaskAdmissionConfig( submission_capacity=3, - bounded_borrow=BoundedBorrowTaskAdmissionPolicyConfig(default_borrow_ceiling=1), + bounded_borrow=BoundedBorrowTaskAdmissionPolicyConfig( + default_borrow_ceiling=1, + strict_share_rounding="floor", + ), ) ) first = _item("a", 0, group=group) @@ -258,7 +261,10 @@ def test_bounded_borrow_debt_blocks_under_peer_pressure_and_releases() -> None: controller = TaskAdmissionController( TaskAdmissionConfig( submission_capacity=3, - bounded_borrow=BoundedBorrowTaskAdmissionPolicyConfig(default_borrow_ceiling=1), + bounded_borrow=BoundedBorrowTaskAdmissionPolicyConfig( + default_borrow_ceiling=1, + strict_share_rounding="floor", + ), ) ) first = _item("a", 0, group=group) @@ -307,7 +313,10 @@ def test_bounded_borrow_release_repayment_is_group_level() -> None: controller = TaskAdmissionController( TaskAdmissionConfig( submission_capacity=3, - bounded_borrow=BoundedBorrowTaskAdmissionPolicyConfig(default_borrow_ceiling=1), + bounded_borrow=BoundedBorrowTaskAdmissionPolicyConfig( + default_borrow_ceiling=1, + strict_share_rounding="floor", + ), ) ) first = _item("a", 0, group=group) diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_policies.py b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_policies.py index 286fdee96..b5e1f9eb5 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_policies.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_policies.py @@ -88,7 +88,12 @@ def test_strict_fair_policy_denies_capped_group_with_peer_pressure() -> None: def test_bounded_borrow_policy_records_borrow_without_peer_pressure() -> None: group = TaskGroupSpec(TaskGroupKey(kind="model", identity=("provider", "model")), admitted_limit=1) item = _item("a", group) - policy = BoundedBorrowTaskAdmissionPolicy(BoundedBorrowTaskAdmissionPolicyConfig(default_borrow_ceiling=1)) + policy = BoundedBorrowTaskAdmissionPolicy( + BoundedBorrowTaskAdmissionPolicyConfig( + default_borrow_ceiling=1, + strict_share_rounding="floor", + ) + ) decision = policy.evaluate(item, _queue_view(item), _admission_view(running_group=group.key)) delta = policy.on_acquire(_lease(item), decision) @@ -97,6 +102,12 @@ def test_bounded_borrow_policy_records_borrow_without_peer_pressure() -> None: assert delta.debt_changes == {(group.key, "submission"): 1} +def test_bounded_borrow_policy_defaults_to_ceil_strict_share_rounding() -> None: + config = BoundedBorrowTaskAdmissionPolicyConfig() + + assert config.strict_share_rounding == "ceil" + + def test_bounded_borrow_policy_denies_existing_debt_under_peer_pressure() -> None: group = TaskGroupSpec(TaskGroupKey(kind="model", identity=("provider", "model")), admitted_limit=1) peer_group = TaskGroupSpec(TaskGroupKey(kind="local", identity=("peer",))) From 7274b8ff01eee31316d2d4a1989aebbf100e1b31 Mon Sep 17 00:00:00 2001 From: "Eric W. Tramel" Date: Wed, 20 May 2026 21:48:42 -0400 Subject: [PATCH 3/7] derive bounded-borrow ceiling from capacity reserve Signed-off-by: Eric W. Tramel --- .../scheduling/task_policies.py | 65 ++++++++++++++++--- .../scheduling/test_task_admission.py | 24 +++++++ .../scheduling/test_task_policies.py | 3 + .../benchmark_bounded_borrow_admission.py | 18 ++--- 4 files changed, 94 insertions(+), 16 deletions(-) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py index 27aa88464..d1e8ad66e 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py @@ -29,7 +29,8 @@ "shutdown", "policy_denial", ] -DEFAULT_BOUNDED_BORROW_CEILING = 1 +DEFAULT_DYNAMIC_BORROW_RESERVE_FRACTION = 0.125 +DEFAULT_DYNAMIC_BORROW_MAX_RESERVED_SLOTS = 8 @dataclass(frozen=True) @@ -38,19 +39,28 @@ class BoundedBorrowTaskAdmissionPolicyConfig: Borrow debt is tracked by task group and scheduler resource. Any completed lease in the same group repays debt for the released resources; repayment is - not tied to the specific lease that originally borrowed. + not tied to the specific lease that originally borrowed. When no explicit + borrow ceiling is configured, the policy reserves one slot per eight + resource slots, capped at eight reserved slots, and lets solo groups borrow + up to the remaining capacity. """ borrow_ceiling_by_group_resource: Mapping[tuple[TaskGroupKey, SchedulerResourceKey], int] = field( default_factory=dict ) - default_borrow_ceiling: int = DEFAULT_BOUNDED_BORROW_CEILING + default_borrow_ceiling: int | None = None + dynamic_borrow_reserve_fraction: float = DEFAULT_DYNAMIC_BORROW_RESERVE_FRACTION + dynamic_borrow_max_reserved_slots: int = DEFAULT_DYNAMIC_BORROW_MAX_RESERVED_SLOTS strict_share_rounding: Literal["floor", "ceil"] = "ceil" repay_on_withheld_peer_pressure: bool = True def __post_init__(self) -> None: - if self.default_borrow_ceiling < 0: + if self.default_borrow_ceiling is not None and self.default_borrow_ceiling < 0: raise ValueError("default_borrow_ceiling must be non-negative.") + if not 0 <= self.dynamic_borrow_reserve_fraction <= 1: + raise ValueError("dynamic_borrow_reserve_fraction must be between 0 and 1.") + if self.dynamic_borrow_max_reserved_slots <= 0: + raise ValueError("dynamic_borrow_max_reserved_slots must be positive.") for key, ceiling in self.borrow_ceiling_by_group_resource.items(): if ceiling < 0: raise ValueError(f"Borrow ceiling for {key!r} must be non-negative.") @@ -142,7 +152,7 @@ def evaluate( pressure_resources = _queued_peer_pressure_resources(item, queue_view, admission_view) borrow_resources: list[tuple[SchedulerResourceKey, int]] = [] - diagnostics_by_resource: dict[SchedulerResourceKey, dict[str, int]] = {} + diagnostics_by_resource: dict[SchedulerResourceKey, dict[str, int | str]] = {} for resource, amount in item.resource_request.amounts.items(): admitted = admission_view.leased_resources_by_group.get(item.group.key, {}).get(resource, 0) strict_share = _strict_share(item, resource, queue_view, admission_view, self._config.strict_share_rounding) @@ -181,11 +191,12 @@ def evaluate( continue new_debt = projected - strict_share - ceiling = self._config.borrow_ceiling_by_group_resource.get( + ceiling, ceiling_diagnostics = self._borrow_ceiling( debt_key, - self._config.default_borrow_ceiling, + resource_limit=admission_view.resource_limits.get(resource, 0), + strict_share=strict_share, ) - diagnostics_by_resource[resource]["ceiling"] = ceiling + diagnostics_by_resource[resource].update(ceiling_diagnostics) if debt + new_debt > ceiling: return TaskAdmissionPolicyDecision( allowed=False, @@ -228,6 +239,36 @@ def on_release(self, lease: TaskAdmissionLease) -> PolicyStateDelta: debt_changes={(lease.item.group.key, resource): -amount for resource, amount in lease.resources.items()} ) + def _borrow_ceiling( + self, + debt_key: tuple[TaskGroupKey, SchedulerResourceKey], + *, + resource_limit: int, + strict_share: int, + ) -> tuple[int, dict[str, int | str]]: + explicit_ceiling = self._config.borrow_ceiling_by_group_resource.get(debt_key) + if explicit_ceiling is not None: + return explicit_ceiling, {"ceiling": explicit_ceiling, "ceiling_source": "group_resource"} + if self._config.default_borrow_ceiling is not None: + return self._config.default_borrow_ceiling, { + "ceiling": self._config.default_borrow_ceiling, + "ceiling_source": "default", + } + reserved_slots = _dynamic_reserved_slots( + resource_limit, + reserve_fraction=self._config.dynamic_borrow_reserve_fraction, + max_reserved_slots=self._config.dynamic_borrow_max_reserved_slots, + ) + target_solo_cap = max(0, resource_limit - reserved_slots) + borrow_slots = max(0, target_solo_cap - strict_share) + ceiling = _triangular_number(borrow_slots) + return ceiling, { + "ceiling": ceiling, + "ceiling_source": "dynamic", + "reserved_slots": reserved_slots, + "borrow_slots": borrow_slots, + } + def _queued_peer_pressure_resources( item: SchedulableTask, @@ -275,6 +316,14 @@ def _strict_share( return min(resource_limit, strict_share) +def _dynamic_reserved_slots(resource_limit: int, *, reserve_fraction: float, max_reserved_slots: int) -> int: + return min(max_reserved_slots, max(1, math.ceil(resource_limit * reserve_fraction))) + + +def _triangular_number(value: int) -> int: + return value * (value + 1) // 2 + + def _competing_group_specs( item: SchedulableTask, resource: SchedulerResourceKey, diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py index da756e6bf..41d18e596 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py @@ -256,6 +256,30 @@ def test_bounded_borrow_prevents_solo_heavy_group_from_consuming_all_typed_capac assert controller.view().policy_debt_by_group_resource[(group.key, "llm_wait")] == 1 +def test_bounded_borrow_dynamic_ceiling_reserves_capacity() -> None: + group = TaskGroupSpec(TaskGroupKey(kind="model", identity=("provider", "hot")), weight=4.0, admitted_limit=8) + controller = TaskAdmissionController( + TaskAdmissionConfig( + submission_capacity=8, + resource_limits={"llm_wait": 8}, + bounded_borrow=BoundedBorrowTaskAdmissionPolicyConfig(), + ) + ) + items = [_item("hot", row, group=group, resources={"submission": 1, "llm_wait": 1}) for row in range(8)] + for index in range(7): + decision = controller.try_acquire(items[index], _queue_view(*items[index:])) + assert isinstance(decision, TaskAdmissionLease) + + denied = controller.try_acquire(items[7], _queue_view(items[7])) + + assert isinstance(denied, TaskAdmissionDenied) + assert denied.reason == "borrow_debt" + assert denied.diagnostics["ceiling"] == 6 + assert denied.diagnostics["strict_share"] == 4 + assert controller.view().resources_available["llm_wait"] == 1 + assert controller.view().policy_debt_by_group_resource[(group.key, "llm_wait")] == 6 + + def test_bounded_borrow_debt_blocks_under_peer_pressure_and_releases() -> None: group = TaskGroupSpec(TaskGroupKey(kind="model", identity=("provider", "model")), admitted_limit=1) controller = TaskAdmissionController( diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_policies.py b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_policies.py index b5e1f9eb5..dab264ba0 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_policies.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_policies.py @@ -106,6 +106,9 @@ def test_bounded_borrow_policy_defaults_to_ceil_strict_share_rounding() -> None: config = BoundedBorrowTaskAdmissionPolicyConfig() assert config.strict_share_rounding == "ceil" + assert config.default_borrow_ceiling is None + assert config.dynamic_borrow_reserve_fraction == 0.125 + assert config.dynamic_borrow_max_reserved_slots == 8 def test_bounded_borrow_policy_denies_existing_debt_under_peer_pressure() -> None: diff --git a/scripts/benchmarks/benchmark_bounded_borrow_admission.py b/scripts/benchmarks/benchmark_bounded_borrow_admission.py index a6c8a1406..63bd5e303 100644 --- a/scripts/benchmarks/benchmark_bounded_borrow_admission.py +++ b/scripts/benchmarks/benchmark_bounded_borrow_admission.py @@ -67,7 +67,7 @@ class ScenarioConfig: peer_duration: float hot_weight: float peer_weight: float - borrow_ceiling: int + borrow_ceiling: int | None @dataclass(frozen=True) @@ -122,7 +122,7 @@ def main() -> None: peer_duration=0.05, hot_weight=4.0, peer_weight=1.0, - borrow_ceiling=1, + borrow_ceiling=None, ), ScenarioConfig( name="neutral_ready_at_start", @@ -135,7 +135,7 @@ def main() -> None: peer_duration=0.1, hot_weight=1.0, peer_weight=1.0, - borrow_ceiling=1, + borrow_ceiling=None, ), ) @@ -285,11 +285,13 @@ def _tasks_for_group( def _controller(config: ScenarioConfig, policy: PolicyName) -> TaskAdmissionController: - bounded_borrow = ( - BoundedBorrowTaskAdmissionPolicyConfig(default_borrow_ceiling=config.borrow_ceiling) - if policy == "bounded" - else None - ) + bounded_borrow = None + if policy == "bounded": + bounded_borrow = ( + BoundedBorrowTaskAdmissionPolicyConfig(default_borrow_ceiling=config.borrow_ceiling) + if config.borrow_ceiling is not None + else BoundedBorrowTaskAdmissionPolicyConfig() + ) return TaskAdmissionController( TaskAdmissionConfig( submission_capacity=config.capacity, From 5d12c6553f3af95da8a8cdec571c1ba19bd8554e Mon Sep 17 00:00:00 2001 From: "Eric W. Tramel" Date: Wed, 20 May 2026 21:57:16 -0400 Subject: [PATCH 4/7] fix bounded-borrow marginal debt accounting Signed-off-by: Eric W. Tramel --- .../scheduling/task_policies.py | 12 +++------ .../scheduling/test_task_admission.py | 26 +++++++++++++++++-- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py index d1e8ad66e..2c846c8da 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py @@ -190,7 +190,7 @@ def evaluate( if projected <= strict_share: continue - new_debt = projected - strict_share + new_debt = min(amount, projected - strict_share) ceiling, ceiling_diagnostics = self._borrow_ceiling( debt_key, resource_limit=admission_view.resource_limits.get(resource, 0), @@ -261,9 +261,8 @@ def _borrow_ceiling( ) target_solo_cap = max(0, resource_limit - reserved_slots) borrow_slots = max(0, target_solo_cap - strict_share) - ceiling = _triangular_number(borrow_slots) - return ceiling, { - "ceiling": ceiling, + return borrow_slots, { + "ceiling": borrow_slots, "ceiling_source": "dynamic", "reserved_slots": reserved_slots, "borrow_slots": borrow_slots, @@ -302,6 +301,7 @@ def _strict_share( candidate_groups = _competing_group_specs(item, resource, queue_view, admission_view) group_weight = max(1.0, item.group.weight) if len(candidate_groups) <= 1: + # Reserve headroom for a future peer; otherwise solo groups would never reach borrow accounting. total_weight = group_weight * 2 else: total_weight = sum(max(1.0, group.weight) for group in candidate_groups.values()) @@ -320,10 +320,6 @@ def _dynamic_reserved_slots(resource_limit: int, *, reserve_fraction: float, max return min(max_reserved_slots, max(1, math.ceil(resource_limit * reserve_fraction))) -def _triangular_number(value: int) -> int: - return value * (value + 1) // 2 - - def _competing_group_specs( item: SchedulableTask, resource: SchedulerResourceKey, diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py index 41d18e596..2d12be946 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_admission.py @@ -274,10 +274,32 @@ def test_bounded_borrow_dynamic_ceiling_reserves_capacity() -> None: assert isinstance(denied, TaskAdmissionDenied) assert denied.reason == "borrow_debt" - assert denied.diagnostics["ceiling"] == 6 + assert denied.diagnostics["ceiling"] == 3 assert denied.diagnostics["strict_share"] == 4 assert controller.view().resources_available["llm_wait"] == 1 - assert controller.view().policy_debt_by_group_resource[(group.key, "llm_wait")] == 6 + assert controller.view().policy_debt_by_group_resource[(group.key, "llm_wait")] == 3 + + +def test_bounded_borrow_explicit_ceiling_counts_marginal_borrowed_slots() -> None: + group = TaskGroupSpec(TaskGroupKey(kind="model", identity=("provider", "hot")), weight=4.0, admitted_limit=8) + controller = TaskAdmissionController( + TaskAdmissionConfig( + submission_capacity=8, + resource_limits={"llm_wait": 8}, + bounded_borrow=BoundedBorrowTaskAdmissionPolicyConfig(default_borrow_ceiling=3), + ) + ) + items = [_item("hot", row, group=group, resources={"submission": 1, "llm_wait": 1}) for row in range(8)] + for index in range(7): + decision = controller.try_acquire(items[index], _queue_view(*items[index:])) + assert isinstance(decision, TaskAdmissionLease) + + denied = controller.try_acquire(items[7], _queue_view(items[7])) + + assert isinstance(denied, TaskAdmissionDenied) + assert denied.reason == "borrow_debt" + assert denied.diagnostics["ceiling"] == 3 + assert controller.view().policy_debt_by_group_resource[(group.key, "llm_wait")] == 3 def test_bounded_borrow_debt_blocks_under_peer_pressure_and_releases() -> None: From 1ef28e475a7d9910d6243c5c433858163ef11448 Mon Sep 17 00:00:00 2001 From: "Eric W. Tramel" Date: Wed, 20 May 2026 22:02:51 -0400 Subject: [PATCH 5/7] clarify model wait utilization benchmark metrics Signed-off-by: Eric W. Tramel --- .../benchmark_bounded_borrow_admission.py | 59 ++++++++++++++++--- 1 file changed, 50 insertions(+), 9 deletions(-) diff --git a/scripts/benchmarks/benchmark_bounded_borrow_admission.py b/scripts/benchmarks/benchmark_bounded_borrow_admission.py index 63bd5e303..825b483f2 100644 --- a/scripts/benchmarks/benchmark_bounded_borrow_admission.py +++ b/scripts/benchmarks/benchmark_bounded_borrow_admission.py @@ -77,7 +77,9 @@ class ScenarioResult: capacity: int task_count: int wall_time_seconds: float - utilization_ratio: float + model_wait_slot_utilization_ratio: float + model_wait_idle_slot_seconds: float + model_resource_zero_inflight_idle_seconds: float hot_dispatch_count_before_peer_ready: int peer_first_wait_seconds: float peer_wait_mean_seconds: float @@ -217,6 +219,7 @@ def run_scenario(config: ScenarioConfig, policy: PolicyName) -> ScenarioResult: total_busy_seconds = sum(task.duration for task in tasks) wall_time = max((record.completed_at for record in records), default=0.0) + model_wait_idle_slot_seconds = max(0.0, wall_time * config.capacity - total_busy_seconds) peer_records = [record for record in records if record.group_name == "peer"] peer_waits = [record.wait_seconds for record in peer_records] hot_before_peer = sum( @@ -228,7 +231,9 @@ def run_scenario(config: ScenarioConfig, policy: PolicyName) -> ScenarioResult: capacity=config.capacity, task_count=len(tasks), wall_time_seconds=wall_time, - utilization_ratio=total_busy_seconds / (wall_time * config.capacity) if wall_time else 0.0, + model_wait_slot_utilization_ratio=total_busy_seconds / (wall_time * config.capacity) if wall_time else 0.0, + model_wait_idle_slot_seconds=model_wait_idle_slot_seconds, + model_resource_zero_inflight_idle_seconds=_zero_inflight_idle_seconds(records, wall_time), hot_dispatch_count_before_peer_ready=hot_before_peer, peer_first_wait_seconds=peer_waits[0] if peer_waits else 0.0, peer_wait_mean_seconds=sum(peer_waits) / len(peer_waits) if peer_waits else 0.0, @@ -311,7 +316,15 @@ def _compare_results(scenario: ScenarioName, results: list[ScenarioResult]) -> d "peer_p95_wait_reduction_ratio": _reduction_ratio(strict.peer_wait_p95_seconds, bounded.peer_wait_p95_seconds), "peer_first_wait_delta_seconds": bounded.peer_first_wait_seconds - strict.peer_first_wait_seconds, "wall_time_delta_seconds": bounded.wall_time_seconds - strict.wall_time_seconds, - "utilization_delta": bounded.utilization_ratio - strict.utilization_ratio, + "model_wait_slot_utilization_delta": ( + bounded.model_wait_slot_utilization_ratio - strict.model_wait_slot_utilization_ratio + ), + "model_wait_idle_slot_delta_seconds": ( + bounded.model_wait_idle_slot_seconds - strict.model_wait_idle_slot_seconds + ), + "model_resource_zero_inflight_idle_delta_seconds": ( + bounded.model_resource_zero_inflight_idle_seconds - strict.model_resource_zero_inflight_idle_seconds + ), "strict_hot_dispatch_before_peer_ready": strict.hot_dispatch_count_before_peer_ready, "bounded_hot_dispatch_before_peer_ready": bounded.hot_dispatch_count_before_peer_ready, } @@ -331,6 +344,25 @@ def _reduction_ratio(strict_value: float, bounded_value: float) -> float: return (strict_value - bounded_value) / strict_value +def _zero_inflight_idle_seconds(records: list[TaskRecord], wall_time: float) -> float: + events: dict[float, int] = {} + for record in records: + events[record.dispatch_at] = events.get(record.dispatch_at, 0) + 1 + events[record.completed_at] = events.get(record.completed_at, 0) - 1 + + idle_seconds = 0.0 + active = 0 + last_time = 0.0 + for event_time in sorted(events): + if active == 0: + idle_seconds += event_time - last_time + active += events[event_time] + last_time = event_time + if active == 0: + idle_seconds += wall_time - last_time + return idle_seconds + + def _git_sha() -> str: try: return subprocess.check_output(["git", "rev-parse", "HEAD"], text=True).strip() @@ -348,12 +380,19 @@ def _markdown_report(report: BenchmarkReport) -> str: "", "## Scenario Results", "", - "| Scenario | Policy | Tasks | Wall time (s) | Utilization | Hot dispatches before peer ready | Peer wait p95 (s) | Peer first wait (s) |", - "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: |", + "Utilization is model-wait slot utilization: time spent waiting on model responses divided by " + "`wall_time * model_resource_capacity`. Idle slot-seconds is the complementary model-wait capacity " + "with no in-flight request occupying that slot. Zero-inflight idle time is the interval where the " + "benchmark model resource had no in-flight request at all.", + "", + "| Scenario | Policy | Tasks | Wall time (s) | Model wait slot utilization | Model wait idle slot-s | Zero-inflight idle (s) | Hot dispatches before peer ready | Peer wait p95 (s) | Peer first wait (s) |", + "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |", ] for scenario in report.scenarios: lines.append( - "| {scenario} | {policy} | {task_count} | {wall_time_seconds:.3f} | {utilization_ratio:.3f} | " + "| {scenario} | {policy} | {task_count} | {wall_time_seconds:.3f} | " + "{model_wait_slot_utilization_ratio:.3f} | " + "{model_wait_idle_slot_seconds:.3f} | {model_resource_zero_inflight_idle_seconds:.3f} | " "{hot_dispatch_count_before_peer_ready} | {peer_wait_p95_seconds:.3f} | " "{peer_first_wait_seconds:.3f} |".format(**scenario) ) @@ -362,14 +401,16 @@ def _markdown_report(report: BenchmarkReport) -> str: "", "## Comparisons", "", - "| Scenario | Peer p95 wait reduction | Peer first wait delta (s) | Wall time delta (s) | Utilization delta |", - "| --- | ---: | ---: | ---: | ---: |", + "| Scenario | Peer p95 wait reduction | Peer first wait delta (s) | Wall time delta (s) | Model wait utilization delta | Model wait idle slot-s delta | Zero-inflight idle delta (s) |", + "| --- | ---: | ---: | ---: | ---: | ---: | ---: |", ] ) for comparison in report.comparisons: lines.append( "| {scenario} | {peer_p95_wait_reduction_ratio:.1%} | {peer_first_wait_delta_seconds:.3f} | " - "{wall_time_delta_seconds:.3f} | {utilization_delta:.3f} |".format(**comparison) + "{wall_time_delta_seconds:.3f} | {model_wait_slot_utilization_delta:.3f} | " + "{model_wait_idle_slot_delta_seconds:.3f} | " + "{model_resource_zero_inflight_idle_delta_seconds:.3f} |".format(**comparison) ) lines.append("") return "\n".join(lines) From 1f0422bf0273c865f698a532996c18c1fd0b748d Mon Sep 17 00:00:00 2001 From: "Eric W. Tramel" Date: Wed, 20 May 2026 22:09:39 -0400 Subject: [PATCH 6/7] report per-column generation idle in benchmark Signed-off-by: Eric W. Tramel --- .../benchmark_bounded_borrow_admission.py | 77 ++++++++++++++----- 1 file changed, 57 insertions(+), 20 deletions(-) diff --git a/scripts/benchmarks/benchmark_bounded_borrow_admission.py b/scripts/benchmarks/benchmark_bounded_borrow_admission.py index 825b483f2..af16f1d9a 100644 --- a/scripts/benchmarks/benchmark_bounded_borrow_admission.py +++ b/scripts/benchmarks/benchmark_bounded_borrow_admission.py @@ -79,7 +79,9 @@ class ScenarioResult: wall_time_seconds: float model_wait_slot_utilization_ratio: float model_wait_idle_slot_seconds: float - model_resource_zero_inflight_idle_seconds: float + generation_column_idle_seconds: dict[str, float] + total_generation_column_idle_seconds: float + max_generation_column_idle_seconds: float hot_dispatch_count_before_peer_ready: int peer_first_wait_seconds: float peer_wait_mean_seconds: float @@ -220,6 +222,7 @@ def run_scenario(config: ScenarioConfig, policy: PolicyName) -> ScenarioResult: total_busy_seconds = sum(task.duration for task in tasks) wall_time = max((record.completed_at for record in records), default=0.0) model_wait_idle_slot_seconds = max(0.0, wall_time * config.capacity - total_busy_seconds) + generation_column_idle_seconds = _generation_column_idle_seconds(records, wall_time) peer_records = [record for record in records if record.group_name == "peer"] peer_waits = [record.wait_seconds for record in peer_records] hot_before_peer = sum( @@ -233,7 +236,9 @@ def run_scenario(config: ScenarioConfig, policy: PolicyName) -> ScenarioResult: wall_time_seconds=wall_time, model_wait_slot_utilization_ratio=total_busy_seconds / (wall_time * config.capacity) if wall_time else 0.0, model_wait_idle_slot_seconds=model_wait_idle_slot_seconds, - model_resource_zero_inflight_idle_seconds=_zero_inflight_idle_seconds(records, wall_time), + generation_column_idle_seconds=generation_column_idle_seconds, + total_generation_column_idle_seconds=sum(generation_column_idle_seconds.values()), + max_generation_column_idle_seconds=max(generation_column_idle_seconds.values(), default=0.0), hot_dispatch_count_before_peer_ready=hot_before_peer, peer_first_wait_seconds=peer_waits[0] if peer_waits else 0.0, peer_wait_mean_seconds=sum(peer_waits) / len(peer_waits) if peer_waits else 0.0, @@ -322,8 +327,15 @@ def _compare_results(scenario: ScenarioName, results: list[ScenarioResult]) -> d "model_wait_idle_slot_delta_seconds": ( bounded.model_wait_idle_slot_seconds - strict.model_wait_idle_slot_seconds ), - "model_resource_zero_inflight_idle_delta_seconds": ( - bounded.model_resource_zero_inflight_idle_seconds - strict.model_resource_zero_inflight_idle_seconds + "total_generation_column_idle_delta_seconds": ( + bounded.total_generation_column_idle_seconds - strict.total_generation_column_idle_seconds + ), + "max_generation_column_idle_delta_seconds": ( + bounded.max_generation_column_idle_seconds - strict.max_generation_column_idle_seconds + ), + "generation_column_idle_delta_seconds": _idle_delta_seconds( + strict.generation_column_idle_seconds, + bounded.generation_column_idle_seconds, ), "strict_hot_dispatch_before_peer_ready": strict.hot_dispatch_count_before_peer_ready, "bounded_hot_dispatch_before_peer_ready": bounded.hot_dispatch_count_before_peer_ready, @@ -344,9 +356,16 @@ def _reduction_ratio(strict_value: float, bounded_value: float) -> float: return (strict_value - bounded_value) / strict_value -def _zero_inflight_idle_seconds(records: list[TaskRecord], wall_time: float) -> float: +def _generation_column_idle_seconds(records: list[TaskRecord], wall_time: float) -> dict[str, float]: + group_names = sorted({record.group_name for record in records}) + return {group_name: _zero_inflight_idle_seconds(records, wall_time, group_name) for group_name in group_names} + + +def _zero_inflight_idle_seconds(records: list[TaskRecord], wall_time: float, group_name: str) -> float: events: dict[float, int] = {} for record in records: + if record.group_name != group_name: + continue events[record.dispatch_at] = events.get(record.dispatch_at, 0) + 1 events[record.completed_at] = events.get(record.completed_at, 0) - 1 @@ -363,6 +382,21 @@ def _zero_inflight_idle_seconds(records: list[TaskRecord], wall_time: float) -> return idle_seconds +def _idle_delta_seconds(strict: dict[str, float], bounded: dict[str, float]) -> dict[str, float]: + return {key: bounded.get(key, 0.0) - strict.get(key, 0.0) for key in sorted(strict.keys() | bounded.keys())} + + +def _format_idle_seconds(value: object) -> str: + if not isinstance(value, dict): + return "" + items = [] + for key in sorted(value): + item = value[key] + if isinstance(item, int | float): + items.append(f"{key}={item:.3f}") + return ", ".join(items) + + def _git_sha() -> str: try: return subprocess.check_output(["git", "rev-parse", "HEAD"], text=True).strip() @@ -380,37 +414,40 @@ def _markdown_report(report: BenchmarkReport) -> str: "", "## Scenario Results", "", - "Utilization is model-wait slot utilization: time spent waiting on model responses divided by " - "`wall_time * model_resource_capacity`. Idle slot-seconds is the complementary model-wait capacity " - "with no in-flight request occupying that slot. Zero-inflight idle time is the interval where the " - "benchmark model resource had no in-flight request at all.", + "Generation-column idle is the amount of workflow wall time where that column has no in-flight " + "generation task. This is the benchmark proxy for endpoint/GPU time with no tokens being generated " + "for that model resource.", "", - "| Scenario | Policy | Tasks | Wall time (s) | Model wait slot utilization | Model wait idle slot-s | Zero-inflight idle (s) | Hot dispatches before peer ready | Peer wait p95 (s) | Peer first wait (s) |", - "| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |", + "| Scenario | Policy | Tasks | Wall time (s) | Generation-column idle (s) | Total column idle (s) | Max column idle (s) | Hot dispatches before peer ready | Peer wait p95 (s) | Peer first wait (s) |", + "| --- | --- | ---: | ---: | --- | ---: | ---: | ---: | ---: | ---: |", ] for scenario in report.scenarios: lines.append( - "| {scenario} | {policy} | {task_count} | {wall_time_seconds:.3f} | " - "{model_wait_slot_utilization_ratio:.3f} | " - "{model_wait_idle_slot_seconds:.3f} | {model_resource_zero_inflight_idle_seconds:.3f} | " + "| {scenario} | {policy} | {task_count} | {wall_time_seconds:.3f} | {idle_by_column} | " + "{total_generation_column_idle_seconds:.3f} | {max_generation_column_idle_seconds:.3f} | " "{hot_dispatch_count_before_peer_ready} | {peer_wait_p95_seconds:.3f} | " - "{peer_first_wait_seconds:.3f} |".format(**scenario) + "{peer_first_wait_seconds:.3f} |".format( + idle_by_column=_format_idle_seconds(scenario.get("generation_column_idle_seconds")), + **scenario, + ) ) lines.extend( [ "", "## Comparisons", "", - "| Scenario | Peer p95 wait reduction | Peer first wait delta (s) | Wall time delta (s) | Model wait utilization delta | Model wait idle slot-s delta | Zero-inflight idle delta (s) |", - "| --- | ---: | ---: | ---: | ---: | ---: | ---: |", + "| Scenario | Peer p95 wait reduction | Peer first wait delta (s) | Wall time delta (s) | Total column idle delta (s) | Max column idle delta (s) | Column idle delta (s) |", + "| --- | ---: | ---: | ---: | ---: | ---: | --- |", ] ) for comparison in report.comparisons: lines.append( "| {scenario} | {peer_p95_wait_reduction_ratio:.1%} | {peer_first_wait_delta_seconds:.3f} | " - "{wall_time_delta_seconds:.3f} | {model_wait_slot_utilization_delta:.3f} | " - "{model_wait_idle_slot_delta_seconds:.3f} | " - "{model_resource_zero_inflight_idle_delta_seconds:.3f} |".format(**comparison) + "{wall_time_delta_seconds:.3f} | {total_generation_column_idle_delta_seconds:.3f} | " + "{max_generation_column_idle_delta_seconds:.3f} | {idle_delta} |".format( + idle_delta=_format_idle_seconds(comparison.get("generation_column_idle_delta_seconds")), + **comparison, + ) ) lines.append("") return "\n".join(lines) From 96b1a4db58592189f633530601202f6e0a285c15 Mon Sep 17 00:00:00 2001 From: "Eric W. Tramel" Date: Thu, 21 May 2026 21:59:03 -0400 Subject: [PATCH 7/7] address bounded-borrow review feedback Signed-off-by: Eric W. Tramel --- architecture/dataset-builders.md | 2 +- .../dataset_builders/async_scheduler.py | 5 +++++ .../scheduling/task_policies.py | 7 +++++-- .../scheduling/test_task_policies.py | 20 +++++++++++++++++++ .../dataset_builders/test_async_scheduler.py | 3 ++- .../benchmark_bounded_borrow_admission.py | 2 +- 6 files changed, 34 insertions(+), 5 deletions(-) diff --git a/architecture/dataset-builders.md b/architecture/dataset-builders.md index fc3981543..9580defb1 100644 --- a/architecture/dataset-builders.md +++ b/architecture/dataset-builders.md @@ -138,7 +138,7 @@ When request admission is available, async scheduling may use request-pressure s - **Dual execution engines behind one API.** The sequential engine is simpler and easier to debug; the async engine adds row-group parallelism for throughput. Users switch via an environment variable without changing their code. - **DAG-driven ordering** ensures columns with dependencies (e.g., a judge column that depends on a text column) are generated in the correct order, regardless of the order they appear in the config. -- **Fair async admission** keeps the scheduler flowing across ready columns and model groups. `FairTaskQueue.select_next(...)` chooses eligible ready work, `TaskAdmissionController` leases scheduler resources before spawn, and `FairTaskQueue.commit(...)` removes the selected task only after admission succeeds. Per-group virtual-time ordering prevents a large ready frontier from degenerating into a column-by-column wave, and scheduler-resource accounting remains separate from provider/model request admission. +- **Fair async admission with bounded borrow by default** keeps the scheduler flowing across ready columns and model groups. `FairTaskQueue.select_next(...)` chooses eligible ready work, `TaskAdmissionController` leases scheduler resources before spawn, and `FairTaskQueue.commit(...)` removes the selected task only after admission succeeds. The default `BoundedBorrowTaskAdmissionPolicyConfig` computes a strict per-group share, lets solo groups borrow only up to a capacity-derived reserve, and makes borrowed groups yield when eligible peer pressure appears. Passing `bounded_borrow=None` selects strict-fair admission for tests and benchmark comparisons. Per-group virtual-time ordering prevents a large ready frontier from degenerating into a column-by-column wave, and scheduler-resource accounting remains separate from provider/model request admission. - **Salvage rounds in async mode** retry failed tasks after all other tasks in a round complete, improving resilience against transient LLM failures without blocking the entire generation. - **Unified DAG construction.** `topologically_sort_column_configs` (in `execution_graph.py`) determines column ordering using Kahn's algorithm; the runtime `ExecutionGraph` adds strategy-aware dependency tracking for the async scheduler. diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py index adad6bc0c..4c98de673 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py @@ -1844,6 +1844,11 @@ def task_admission_snapshot(self) -> object: """Return the current scheduler task-admission snapshot for diagnostics.""" return self._task_admission.view() + @property + def task_admission_config(self) -> TaskAdmissionConfig: + """Return the effective scheduler task-admission config.""" + return self._task_admission_config + def capacity_plan(self) -> AsyncCapacityPlan: """Return the scheduler-side async capacity explanation for this run.""" task_view = self._task_admission.view() diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py index 2c846c8da..227cf8658 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/task_policies.py @@ -295,13 +295,16 @@ def _strict_share( rounding: Literal["floor", "ceil"], ) -> int: resource_limit = admission_view.resource_limits.get(resource, 0) - if resource_limit <= 1: + if resource_limit <= 0: + return 0 + if resource_limit == 1: return 1 candidate_groups = _competing_group_specs(item, resource, queue_view, admission_view) group_weight = max(1.0, item.group.weight) if len(candidate_groups) <= 1: - # Reserve headroom for a future peer; otherwise solo groups would never reach borrow accounting. + # 2x synthesizes one equally weighted future peer, keeping solo strict share near 50%. + # Dynamic borrow then decides how much of the remaining capacity the group may use. total_weight = group_weight * 2 else: total_weight = sum(max(1.0, group.weight) for group in candidate_groups.values()) diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_policies.py b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_policies.py index dab264ba0..15c8deb28 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_policies.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/scheduling/test_task_policies.py @@ -111,6 +111,26 @@ def test_bounded_borrow_policy_defaults_to_ceil_strict_share_rounding() -> None: assert config.dynamic_borrow_max_reserved_slots == 8 +def test_bounded_borrow_policy_zero_resource_limit_has_no_strict_share() -> None: + group = TaskGroupSpec(TaskGroupKey(kind="model", identity=("provider", "model")), admitted_limit=1) + item = _item("a", group) + policy = BoundedBorrowTaskAdmissionPolicy(BoundedBorrowTaskAdmissionPolicyConfig()) + view = TaskAdmissionView( + resource_limits={"submission": 0}, + resources_available={"submission": 0}, + leased_resources={}, + leased_resources_by_group={}, + running_counts_by_group={}, + policy_debt_by_group_resource={}, + ) + + decision = policy.evaluate(item, _queue_view(item), view) + + assert decision.allowed is False + assert decision.reason == "borrow_debt" + assert decision.diagnostics["strict_share"] == 0 + + def test_bounded_borrow_policy_denies_existing_debt_under_peer_pressure() -> None: group = TaskGroupSpec(TaskGroupKey(kind="model", identity=("provider", "model")), admitted_limit=1) peer_group = TaskGroupSpec(TaskGroupKey(kind="local", identity=("peer",))) diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py index 2a78bcc98..3880c33f8 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py @@ -38,6 +38,7 @@ from data_designer.engine.dataset_builders.scheduling.completion import CompletionTracker, FrontierDelta from data_designer.engine.dataset_builders.scheduling.task_admission import TaskAdmissionConfig, TaskAdmissionLease from data_designer.engine.dataset_builders.scheduling.task_model import Task +from data_designer.engine.dataset_builders.scheduling.task_policies import BoundedBorrowTaskAdmissionPolicyConfig from data_designer.engine.dataset_builders.utils.execution_graph import ExecutionGraph from data_designer.engine.dataset_builders.utils.row_group_buffer import RowGroupBufferManager from data_designer.engine.models.errors import ( @@ -1855,7 +1856,7 @@ def test_scheduler_default_task_admission_uses_bounded_borrow_policy() -> None: row_groups=row_groups, ) - assert scheduler._task_admission_config.bounded_borrow is not None + assert isinstance(scheduler.task_admission_config.bounded_borrow, BoundedBorrowTaskAdmissionPolicyConfig) @pytest.mark.asyncio(loop_scope="session") diff --git a/scripts/benchmarks/benchmark_bounded_borrow_admission.py b/scripts/benchmarks/benchmark_bounded_borrow_admission.py index af16f1d9a..a4c102e9b 100644 --- a/scripts/benchmarks/benchmark_bounded_borrow_admission.py +++ b/scripts/benchmarks/benchmark_bounded_borrow_admission.py @@ -400,7 +400,7 @@ def _format_idle_seconds(value: object) -> str: def _git_sha() -> str: try: return subprocess.check_output(["git", "rev-parse", "HEAD"], text=True).strip() - except Exception: + except (subprocess.CalledProcessError, OSError): return "unknown"