Skip to content
2 changes: 1 addition & 1 deletion architecture/dataset-builders.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ When request admission is available, async scheduling may use request-pressure s

- **Dual execution engines behind one API.** The sequential engine is simpler and easier to debug; the async engine adds row-group parallelism for throughput. Users switch via an environment variable without changing their code.
- **DAG-driven ordering** ensures columns with dependencies (e.g., a judge column that depends on a text column) are generated in the correct order, regardless of the order they appear in the config.
- **Fair async admission** keeps the scheduler flowing across ready columns and model groups. `FairTaskQueue.select_next(...)` chooses eligible ready work, `TaskAdmissionController` leases scheduler resources before spawn, and `FairTaskQueue.commit(...)` removes the selected task only after admission succeeds. Per-group virtual-time ordering prevents a large ready frontier from degenerating into a column-by-column wave, and scheduler-resource accounting remains separate from provider/model request admission.
- **Fair async admission with bounded borrow by default** keeps the scheduler flowing across ready columns and model groups. `FairTaskQueue.select_next(...)` chooses eligible ready work, `TaskAdmissionController` leases scheduler resources before spawn, and `FairTaskQueue.commit(...)` removes the selected task only after admission succeeds. The default `BoundedBorrowTaskAdmissionPolicyConfig` computes a strict per-group share, lets solo groups borrow only up to a capacity-derived reserve, and makes borrowed groups yield when eligible peer pressure appears. Passing `bounded_borrow=None` selects strict-fair admission for tests and benchmark comparisons. Per-group virtual-time ordering prevents a large ready frontier from degenerating into a column-by-column wave, and scheduler-resource accounting remains separate from provider/model request admission.
- **Salvage rounds in async mode** retry failed tasks after all other tasks in a round complete, improving resilience against transient LLM failures without blocking the entire generation.
- **Unified DAG construction.** `topologically_sort_column_configs` (in `execution_graph.py`) determines column ordering using Kahn's algorithm; the runtime `ExecutionGraph` adds strategy-aware dependency tracking for the async scheduler.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
TaskAdmissionLease,
)
from data_designer.engine.dataset_builders.scheduling.task_model import SliceRef, Task, TaskTrace
from data_designer.engine.dataset_builders.scheduling.task_policies import BoundedBorrowTaskAdmissionPolicyConfig
from data_designer.engine.dataset_builders.utils.async_progress_reporter import (
DEFAULT_REPORT_INTERVAL,
AsyncProgressReporter,
Expand Down Expand Up @@ -185,6 +186,7 @@ def __init__(
admission_config = task_admission_config or TaskAdmissionConfig(
submission_capacity=max_submitted_tasks,
resource_limits={"llm_wait": max_model_task_admission, "local": max_submitted_tasks},
bounded_borrow=BoundedBorrowTaskAdmissionPolicyConfig(),
)
self._task_admission = TaskAdmissionController(admission_config)
self._task_admission_config = admission_config
Expand Down Expand Up @@ -1842,6 +1844,11 @@ def task_admission_snapshot(self) -> object:
"""Return the current scheduler task-admission snapshot for diagnostics."""
return self._task_admission.view()

@property
def task_admission_config(self) -> TaskAdmissionConfig:
"""Return the effective scheduler task-admission config."""
return self._task_admission_config

def capacity_plan(self) -> AsyncCapacityPlan:
"""Return the scheduler-side async capacity explanation for this run."""
task_view = self._task_admission.view()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

import math
from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal, Protocol
Expand All @@ -12,6 +13,7 @@
SchedulableTask,
SchedulerResourceKey,
TaskGroupKey,
TaskGroupSpec,
)

if TYPE_CHECKING:
Expand All @@ -27,6 +29,8 @@
"shutdown",
"policy_denial",
]
DEFAULT_DYNAMIC_BORROW_RESERVE_FRACTION = 0.125
DEFAULT_DYNAMIC_BORROW_MAX_RESERVED_SLOTS = 8


@dataclass(frozen=True)
Expand All @@ -35,16 +39,32 @@ class BoundedBorrowTaskAdmissionPolicyConfig:

Borrow debt is tracked by task group and scheduler resource. Any completed
lease in the same group repays debt for the released resources; repayment is
not tied to the specific lease that originally borrowed.
not tied to the specific lease that originally borrowed. When no explicit
borrow ceiling is configured, the policy reserves one slot per eight
resource slots, capped at eight reserved slots, and lets solo groups borrow
up to the remaining capacity.
"""

borrow_ceiling_by_group_resource: Mapping[tuple[TaskGroupKey, SchedulerResourceKey], int] = field(
default_factory=dict
)
default_borrow_ceiling: int = 0
strict_share_rounding: Literal["floor", "ceil"] = "floor"
default_borrow_ceiling: int | None = None
dynamic_borrow_reserve_fraction: float = DEFAULT_DYNAMIC_BORROW_RESERVE_FRACTION
dynamic_borrow_max_reserved_slots: int = DEFAULT_DYNAMIC_BORROW_MAX_RESERVED_SLOTS
strict_share_rounding: Literal["floor", "ceil"] = "ceil"
repay_on_withheld_peer_pressure: bool = True

def __post_init__(self) -> None:
if self.default_borrow_ceiling is not None and self.default_borrow_ceiling < 0:
raise ValueError("default_borrow_ceiling must be non-negative.")
if not 0 <= self.dynamic_borrow_reserve_fraction <= 1:
raise ValueError("dynamic_borrow_reserve_fraction must be between 0 and 1.")
if self.dynamic_borrow_max_reserved_slots <= 0:
raise ValueError("dynamic_borrow_max_reserved_slots must be positive.")
for key, ceiling in self.borrow_ceiling_by_group_resource.items():
if ceiling < 0:
raise ValueError(f"Borrow ceiling for {key!r} must be non-negative.")


@dataclass(frozen=True)
class TaskAdmissionPolicyDecision:
Expand Down Expand Up @@ -127,51 +147,74 @@ def evaluate(
queue_view: QueueView,
admission_view: TaskAdmissionView,
) -> TaskAdmissionPolicyDecision:
limit = item.group.admitted_limit
if limit is None:
return TaskAdmissionPolicyDecision(allowed=True)

leased_count = admission_view.running_counts_by_group.get(item.group.key, 0)
if leased_count < limit:
if item.group.admitted_limit is None:
return TaskAdmissionPolicyDecision(allowed=True)

pressure_resources = _queued_peer_pressure_resources(item, queue_view, admission_view)
if pressure_resources:
for resource in pressure_resources:
debt_key = (item.group.key, resource)
debt = admission_view.policy_debt_by_group_resource.get(debt_key, 0)
borrow_resources: list[tuple[SchedulerResourceKey, int]] = []
diagnostics_by_resource: dict[SchedulerResourceKey, dict[str, int | str]] = {}
for resource, amount in item.resource_request.amounts.items():
admitted = admission_view.leased_resources_by_group.get(item.group.key, {}).get(resource, 0)
strict_share = _strict_share(item, resource, queue_view, admission_view, self._config.strict_share_rounding)
projected = admitted + amount
debt_key = (item.group.key, resource)
debt = admission_view.policy_debt_by_group_resource.get(debt_key, 0)
diagnostics_by_resource[resource] = {
"admitted": admitted,
"requested": amount,
"strict_share": strict_share,
"debt": debt,
}

if resource in pressure_resources:
if debt > 0:
return TaskAdmissionPolicyDecision(
allowed=False,
reason="borrow_debt",
diagnostics={"resource": resource, "debt": debt},
diagnostics={"resource": resource, "debt": debt, "strict_share": strict_share},
)
return TaskAdmissionPolicyDecision(
allowed=False,
reason="group_cap",
diagnostics={
"admitted_limit": limit,
"leased_count": leased_count,
"pressure_resources": pressure_resources,
},
)
if projected > strict_share:
return TaskAdmissionPolicyDecision(
allowed=False,
reason="group_cap",
diagnostics={
"resource": resource,
"admitted": admitted,
"requested": amount,
"strict_share": strict_share,
"pressure_resources": pressure_resources,
},
)
continue

borrow_resources: list[tuple[SchedulerResourceKey, int]] = []
for resource, amount in item.resource_request.amounts.items():
debt_key = (item.group.key, resource)
debt = admission_view.policy_debt_by_group_resource.get(debt_key, 0)
ceiling = self._config.borrow_ceiling_by_group_resource.get(
if projected <= strict_share:
continue

new_debt = min(amount, projected - strict_share)
ceiling, ceiling_diagnostics = self._borrow_ceiling(
debt_key,
self._config.default_borrow_ceiling,
resource_limit=admission_view.resource_limits.get(resource, 0),
strict_share=strict_share,
)
if debt + amount > ceiling:
diagnostics_by_resource[resource].update(ceiling_diagnostics)
if debt + new_debt > ceiling:
return TaskAdmissionPolicyDecision(
allowed=False,
reason="borrow_debt",
diagnostics={"resource": resource, "debt": debt, "requested": amount, "ceiling": ceiling},
diagnostics={
"resource": resource,
"debt": debt,
"requested": amount,
"new_debt": new_debt,
"ceiling": ceiling,
"strict_share": strict_share,
},
)
borrow_resources.append((resource, amount))
return TaskAdmissionPolicyDecision(allowed=True, diagnostics={"borrow_resources": tuple(borrow_resources)})
borrow_resources.append((resource, new_debt))
return TaskAdmissionPolicyDecision(
allowed=True,
diagnostics={"borrow_resources": tuple(borrow_resources), "strict_share": diagnostics_by_resource},
)

def on_acquire(
self,
Expand All @@ -196,6 +239,35 @@ def on_release(self, lease: TaskAdmissionLease) -> PolicyStateDelta:
debt_changes={(lease.item.group.key, resource): -amount for resource, amount in lease.resources.items()}
)

def _borrow_ceiling(
self,
debt_key: tuple[TaskGroupKey, SchedulerResourceKey],
*,
resource_limit: int,
strict_share: int,
) -> tuple[int, dict[str, int | str]]:
explicit_ceiling = self._config.borrow_ceiling_by_group_resource.get(debt_key)
if explicit_ceiling is not None:
return explicit_ceiling, {"ceiling": explicit_ceiling, "ceiling_source": "group_resource"}
if self._config.default_borrow_ceiling is not None:
return self._config.default_borrow_ceiling, {
"ceiling": self._config.default_borrow_ceiling,
"ceiling_source": "default",
}
reserved_slots = _dynamic_reserved_slots(
resource_limit,
reserve_fraction=self._config.dynamic_borrow_reserve_fraction,
max_reserved_slots=self._config.dynamic_borrow_max_reserved_slots,
)
target_solo_cap = max(0, resource_limit - reserved_slots)
borrow_slots = max(0, target_solo_cap - strict_share)
return borrow_slots, {
"ceiling": borrow_slots,
"ceiling_source": "dynamic",
"reserved_slots": reserved_slots,
"borrow_slots": borrow_slots,
}


def _queued_peer_pressure_resources(
item: SchedulableTask,
Expand All @@ -215,6 +287,60 @@ def _queued_peer_pressure_resources(
return tuple(pressure_resources)


def _strict_share(
item: SchedulableTask,
resource: SchedulerResourceKey,
queue_view: QueueView,
admission_view: TaskAdmissionView,
rounding: Literal["floor", "ceil"],
) -> int:
resource_limit = admission_view.resource_limits.get(resource, 0)
if resource_limit <= 0:
return 0
if resource_limit == 1:
return 1

candidate_groups = _competing_group_specs(item, resource, queue_view, admission_view)
group_weight = max(1.0, item.group.weight)
if len(candidate_groups) <= 1:
# 2x synthesizes one equally weighted future peer, keeping solo strict share near 50%.
# Dynamic borrow then decides how much of the remaining capacity the group may use.
total_weight = group_weight * 2
else:
total_weight = sum(max(1.0, group.weight) for group in candidate_groups.values())
raw_share = resource_limit * group_weight / total_weight
if rounding == "ceil":
rounded_share = math.ceil(raw_share)
else:
rounded_share = math.floor(raw_share)
strict_share = max(1, rounded_share)
if item.group.admitted_limit is not None:
strict_share = min(strict_share, item.group.admitted_limit)
return min(resource_limit, strict_share)


def _dynamic_reserved_slots(resource_limit: int, *, reserve_fraction: float, max_reserved_slots: int) -> int:
return min(max_reserved_slots, max(1, math.ceil(resource_limit * reserve_fraction)))


def _competing_group_specs(
item: SchedulableTask,
resource: SchedulerResourceKey,
queue_view: QueueView,
admission_view: TaskAdmissionView,
) -> dict[TaskGroupKey, TaskGroupSpec]:
groups: dict[TaskGroupKey, TaskGroupSpec] = {item.group.key: item.group}
for group_key, peer_resources in queue_view.first_candidate_resources_by_group.items():
if peer_resources.get(resource, 0) <= 0:
continue
if not _is_hard_resource_eligible(peer_resources, admission_view):
continue
group = queue_view.first_candidate_group_specs_by_group.get(group_key)
if group is not None:
groups[group_key] = group
return groups


def _fair_pressure_resources(
resources: Mapping[SchedulerResourceKey, int],
) -> tuple[SchedulerResourceKey, ...]:
Expand Down
Loading
Loading