chore: async engine readiness - blockers and polish before default #462

@andreatgretel

Description

Items needed before the async engine can become the default execution path. Split into blockers (must-fix for correctness/safety) and polish (nice-to-have).

Follows from #456 review (Nabin, Greptile, Codex) and subsequent investigation.

Relationship to #447

#447 proposes a larger structural refactor (AsyncRunController, explicit sinks) that solves several of the same problems by restructuring the code. This issue takes a targeted approach - minimal fixes to the existing code to unblock making async the default. The two are not mutually exclusive: land these fixes first, flip the default, then do #447's refactor for code quality.

Relationship to #552

#552 (workflow chaining) replaces allow_resize and pre-batch processor resize with between-stage chaining. The allow_resize deprecation path and pre-batch resize ban are migration concerns tracked there, not correctness blockers for this issue. This issue only adds a compatibility fallback so existing allow_resize users aren't broken when async becomes the default.

Blockers for default

Processor callback failures must raise and clean up

Post-batch processor exceptions currently drop the entire row group silently (log + drop, no exception to caller). Pre-batch processor exceptions do the same. Users lose data with only a log entry as evidence.

Both on_seeds_complete and on_before_checkpoint callbacks should propagate failures as DatasetGenerationError instead of silently dropping row groups. When a processor failure propagates, in-flight workers for the affected row group must be cancelled before the error is raised - otherwise orphaned coroutines continue running (making HTTP requests, retrying) after the scheduler has moved on.
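A minimal sketch of the cancel-then-raise pattern described above. `DatasetGenerationError` is the error type named in this issue; the helper name and its exact wiring into the scheduler are illustrative, not the actual implementation:

```python
import asyncio


class DatasetGenerationError(Exception):
    """Stand-in for the engine's real error type (name taken from this issue)."""


async def run_callback_or_fail(callback, in_flight_workers):
    # Run a processor callback; on failure, cancel the row group's
    # in-flight worker tasks *before* raising, so no orphaned coroutines
    # keep making HTTP requests after the scheduler has moved on.
    try:
        return callback()
    except Exception as exc:
        for task in in_flight_workers:
            task.cancel()
        # Await the cancelled tasks so cancellation actually completes;
        # return_exceptions=True absorbs the resulting CancelledErrors.
        await asyncio.gather(*in_flight_workers, return_exceptions=True)
        raise DatasetGenerationError("processor callback failed") from exc
```

The key ordering is cancel, await, then raise; raising before awaiting would leave the cancellation unfinished when the error reaches the caller.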

Related: #447.

Early shutdown must drain in-flight workers

When _early_shutdown is triggered, _main_dispatch_loop salvages deferred tasks and checkpoints, then breaks. But workers that were already in-flight before the flag was set are never cancelled or awaited - _cancel_workers() is only called in the CancelledError path, not the normal early-shutdown exit. Those orphaned worker coroutines continue running (finishing HTTP requests, retrying, etc.) and their log calls trickle in after log_final() / progress bar teardown.

Fix: after _main_dispatch_loop returns, call _cancel_workers() (or at minimum await remaining workers) before log_final() so no stragglers outlive the scheduler.
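A sketch of that drain step, assuming the scheduler keeps its worker tasks in a list (the helper name is illustrative):

```python
import asyncio


async def drain_workers(workers):
    # Called after _main_dispatch_loop returns (including the
    # early-shutdown break): cancel any still-running worker tasks and
    # await them, so no straggler logs after log_final() / progress bar
    # teardown.
    for task in workers:
        if not task.done():
            task.cancel()
    # Wait for cancellation to finish; absorb the CancelledErrors.
    await asyncio.gather(*workers, return_exceptions=True)
```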

Related: #447.

Partial completion must be surfaced

build() returns via the normal success path whether it generated 100% or 30% of the requested records. Callers must compare target_num_records against actual_num_records in the metadata to detect a partial run.

When early shutdown triggers and actual_num_records is significantly below target_num_records, _build_async should log a prominent warning or raise, rather than silently returning a partial dataset.
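A possible shape for that guard. The 0.9 threshold and the helper name are assumptions for illustration, not values from the codebase:

```python
import warnings


def warn_if_partial(actual_num_records, target_num_records, min_fraction=0.9):
    # Hypothetical guard: surface a loud warning when early shutdown left
    # the run well short of its target, instead of returning a
    # silently-partial dataset. Returns True when the run was partial.
    if target_num_records and actual_num_records < min_fraction * target_num_records:
        warnings.warn(
            f"Generated {actual_num_records}/{target_num_records} records; "
            "dataset is partial (early shutdown).",
            RuntimeWarning,
            stacklevel=2,
        )
        return True
    return False
```

Whether to warn or raise is the open design question in this item; a warning preserves the partial dataset for callers who want it.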

Related: #447.

allow_resize auto-fallback to sync

The async engine rejects columns with allow_resize=True via _validate_async_compatibility(). This is a documented feature used in custom columns, plugins, and agent rollout ingestion. Making async the default would break those users.

Quick fix: replace the hard rejection with an auto-fallback to the sync engine when allow_resize=True is detected. This requires a per-run execution-mode switch rather than just a top-level branch in build(), because _run_cell_by_cell_generator() also reads the module-global DATA_DESIGNER_ASYNC_ENGINE for fan-out strategy. The fallback must set a run-local flag that overrides the global for the duration of that build.

```python
# Sketch - the key point is a per-run flag, not a build()-level if/else:
use_async = self._should_use_async()  # checks global + allow_resize
if not use_async and DATA_DESIGNER_ASYNC_ENGINE:
    warnings.warn("allow_resize is deprecated ...", DeprecationWarning, stacklevel=2)
# Pass use_async down so _run_cell_by_cell_generator uses the right fan-out strategy
```

This keeps existing behavior for allow_resize users (sync path works as before) while making async the default for everyone else. Full deprecation and removal of allow_resize is tracked in #552.

Pre-batch processor resize must fail-fast (async only)

Async handles pre-batch processor shrink accidentally (via drop-marking in replace_dataframe) but silently ignores expansion. This must be explicit before async is the default.

Fix: add a row-count guard in ProcessorRunner.run_pre_batch_on_df() (the async path) that raises DatasetProcessingError if the returned DataFrame has a different row count.
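A sketch of that guard. `DatasetProcessingError` is the error type named above; the standalone helper is illustrative (in the real fix this check would live inside `ProcessorRunner.run_pre_batch_on_df()`):

```python
class DatasetProcessingError(Exception):
    """Stand-in for the engine's real error type (name taken from this issue)."""


def guard_pre_batch_row_count(df_in, df_out):
    # Fail fast if a pre-batch processor changed the row count on the
    # async path, instead of handling shrink accidentally and silently
    # ignoring expansion. Works on any sized container; a DataFrame's
    # len() is its row count.
    if len(df_out) != len(df_in):
        raise DatasetProcessingError(
            f"pre-batch processor resized the batch from {len(df_in)} to "
            f"{len(df_out)} rows; resizing is not supported on the async engine"
        )
    return df_out
```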

Note: sync currently allows pre-batch processors to resize freely via replace_buffer(allow_resize=True). That's documented behavior. Whether to also restrict it in sync is a migration concern tracked in #552, not a blocker here.

Async trace support for preview path

_build_async_preview doesn't pass trace=True to _prepare_async_run, even when DATA_DESIGNER_ASYNC_TRACE=1 is set. The full _build_async path reads the env var, but the preview path skips it.

This means TaskTrace data isn't collected during preview(), which makes it harder to profile short iterative runs where preview() is the primary execution path.

Fix: mirror the trace check from _build_async into _build_async_preview, and add a task_traces field to PreviewResults so the collected data is actually accessible.
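The env check itself is small; a sketch of the shared helper (the function name is illustrative, the `DATA_DESIGNER_ASYNC_TRACE=1` convention comes from this issue):

```python
import os


def async_trace_enabled() -> bool:
    # Mirror of the env check the full _build_async path performs, so
    # _build_async_preview can pass trace=async_trace_enabled() to
    # _prepare_async_run as well. Only the literal "1" is treated as on,
    # matching the DATA_DESIGNER_ASYNC_TRACE=1 usage in this issue.
    return os.environ.get("DATA_DESIGNER_ASYNC_TRACE", "") == "1"
```

The preview path would then also copy the collected TaskTrace data into the proposed `task_traces` field on PreviewResults.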

Polish

Progress bar / reporting

  • ProgressSnapshot dataclass - Replace the 8-tuple returned by ProgressTracker.get_snapshot() with a named frozen dataclass. Callers currently unpack with positional _-prefixed throwaways.
  • Gate StickyProgressBar on reporter existence - async_scheduler.py creates a StickyProgressBar even when there are no CELL_BY_CELL columns (no reporter). Skip creation when reporter is None.
  • Type _make_wrapper properly - _wrapped_handlers in StickyProgressBar is typed as list[tuple[StreamHandler, object]]. Use Callable[[logging.LogRecord], None] for the emit reference.
  • Cache terminal size in _redraw - shutil.get_terminal_size() is a syscall called on every bar update. Cache with a short TTL under high throughput.
  • _compute_stats_width rate overflow - The sample string uses 9999.9 rec/s which could be exceeded at very high throughput. Low priority.
  • Race in StickyProgressBar.__exit__ - After _clear_bars() releases the lock, _active is still True and handlers are still wrapped. A concurrent log emit can re-_redraw() bars that were just cleared, leaving ghost lines on the terminal. Fix: set _active = False inside the lock and add an _active guard in _redraw() so threads that sneak past the lock after teardown don't redraw.
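The `__exit__` race fix can be sketched as follows; this is a stripped-down stand-in for StickyProgressBar (a redraw counter replaces actual terminal painting), showing only the locking pattern:

```python
import threading


class StickyBarSketch:
    # Teardown-race fix: _active is flipped inside the lock during
    # __exit__, and _redraw() checks it under the same lock, so a log
    # emit that races past teardown cannot repaint bars that were just
    # cleared (no ghost lines).
    def __init__(self):
        self._lock = threading.Lock()
        self._active = True
        self.redraw_count = 0

    def _redraw(self):
        with self._lock:
            if not self._active:  # guard: skip redraws after teardown
                return
            self.redraw_count += 1  # real code repaints the bars here

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        with self._lock:
            self._active = False  # set before the lock is released
            # real code would _clear_bars() and unwrap handlers here
        return False
```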

Scheduler accounting

  • Double-counted skips on non-retryable seed failure (low likelihood - seed columns rarely hit LLM APIs) - When a seed task fails non-retryably, _drop_row_group records skips for all CELL_BY_CELL columns, then _run_seeds_complete_check fires and records skips again for the same rows. Fix: snapshot dropped rows before on_seeds_complete and only record skips for newly-dropped rows.
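The snapshot fix can be sketched like this; all names are illustrative stand-ins for the scheduler's bookkeeping:

```python
def seeds_complete_without_double_count(dropped_rows, on_seeds_complete, record_skip):
    # Snapshot which rows were already dropped (e.g. by _drop_row_group
    # on a non-retryable seed failure) before the callback runs, then
    # record skips only for rows the callback newly dropped.
    already_dropped = set(dropped_rows)
    on_seeds_complete()  # may add more rows to dropped_rows
    for row in set(dropped_rows) - already_dropped:
        record_skip(row)
```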

Metadata contract parity

  • Sync/async write_metadata() produce different fields - Sync and async paths write metadata with different keys and structures. Before async becomes the default, the metadata contract should be aligned so downstream consumers (notebooks, dashboards, CI checks) don't break when switching engines.

Slow request diagnostics

  • Warn on slow HTTP requests - Add periodic warnings in HttpModelClient._apost when a request has been pending longer than a threshold (e.g. 30s). Useful for diagnosing streaming responses that trickle data without timing out.
  • Scheduler stall warning - Add a timeout on _wake_event.wait() in the main dispatch loop that logs a warning with in-flight/active/deferred counts when no progress is made for 30s.
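The stall warning can be sketched as a bounded wait; the 30s default comes from this issue, while the `snapshot` callable (producing in-flight/active/deferred counts) and helper name are assumptions:

```python
import asyncio
import logging

logger = logging.getLogger("async_scheduler")


async def wait_or_warn(wake_event, stall_after=30.0, snapshot=lambda: {}):
    # Bound the dispatch loop's _wake_event.wait() and log current
    # counts when nothing has woken the scheduler within the window;
    # the loop then goes back to waiting as before.
    try:
        await asyncio.wait_for(wake_event.wait(), timeout=stall_after)
    except asyncio.TimeoutError:
        logger.warning(
            "scheduler made no progress for %.0fs: %s", stall_after, snapshot()
        )
```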
