Items needed before the async engine can become the default execution path. Split into blockers (must-fix for correctness/safety) and polish (nice-to-have).
Follows from #456 review (Nabin, Greptile, Codex) and subsequent investigation.
Relationship to #447
#447 proposes a larger structural refactor (AsyncRunController, explicit sinks) that solves several of the same problems by restructuring the code. This issue takes a targeted approach - minimal fixes to the existing code to unblock making async the default. The two are not mutually exclusive: land these fixes first, flip the default, then do #447's refactor for code quality.
Relationship to #552
#552 (workflow chaining) replaces allow_resize and pre-batch processor resize with between-stage chaining. The allow_resize deprecation path and pre-batch resize ban are migration concerns tracked there, not correctness blockers for this issue. This issue only adds a compatibility fallback so existing allow_resize users aren't broken when async becomes the default.
Blockers for default
Processor callback failures must raise and clean up
Post-batch processor exceptions currently drop the entire row group silently (log + drop, no exception to caller). Pre-batch processor exceptions do the same. Users lose data with only a log entry as evidence.
Both on_seeds_complete and on_before_checkpoint callbacks should propagate failures as DatasetGenerationError instead of silently dropping row groups. When a processor failure propagates, in-flight workers for the affected row group must be cancelled before the error is raised - otherwise orphaned coroutines continue running (making HTTP requests, retrying) after the scheduler has moved on.
Related: #447.
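A minimal sketch of the intended behavior, using stand-in names (run_post_batch, processor, row_group, workers, and the exception class are illustrative, not the real scheduler API):

```python
import asyncio


class DatasetGenerationError(Exception):
    """Stand-in for the library's error type."""


async def run_post_batch(processor, row_group, workers):
    """Sketch: propagate processor failures instead of log-and-drop,
    cancelling in-flight workers for the row group first."""
    try:
        return processor(row_group)
    except Exception as exc:
        # Cancel in-flight workers *before* raising, so orphaned
        # coroutines stop making HTTP requests and retrying.
        for task in workers:
            task.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
        raise DatasetGenerationError(f"post-batch processor failed: {exc}") from exc
```

The same shape would apply to both on_seeds_complete and on_before_checkpoint: the cancel-then-raise ordering is what prevents the scheduler from moving on while workers still run.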
Early shutdown must drain in-flight workers
When _early_shutdown is triggered, _main_dispatch_loop salvages deferred tasks and checkpoints, then breaks. But workers that were already in-flight before the flag was set are never cancelled or awaited - _cancel_workers() is only called in the CancelledError path, not the normal early-shutdown exit. Those orphaned worker coroutines continue running (finishing HTTP requests, retrying, etc.) and their log calls trickle in after log_final() / progress bar teardown.
Fix: after _main_dispatch_loop returns, call _cancel_workers() (or at minimum await remaining workers) before log_final() so no stragglers outlive the scheduler.
Related: #447.
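The proposed ordering can be sketched as follows; the three callables stand in for _main_dispatch_loop, _cancel_workers, and log_final, and their signatures here are assumed:

```python
import asyncio


async def run_scheduler(dispatch_loop, cancel_workers, log_final):
    """Sketch: drain workers on every exit path, not just CancelledError."""
    try:
        await dispatch_loop()
    finally:
        # Runs for the normal early-shutdown break as well as for
        # cancellation, so no worker coroutine outlives the scheduler.
        await cancel_workers()
    log_final()  # no worker log calls can trickle in after this point
```

Putting the drain in a finally block keeps the CancelledError path working unchanged while also covering the normal early-shutdown return.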
Partial completion must be surfaced
build() returns through the success path whether it generated 100% or 30% of the requested records. Callers have to compare target_num_records vs actual_num_records in metadata to detect partial failure.
When early shutdown triggers and actual_num_records is significantly below target_num_records, _build_async should log a prominent warning or raise, rather than silently returning a partial dataset.
Related: #447.
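A sketch of the check _build_async could run after early shutdown; the function name and the 0.9 threshold are illustrative choices, not from the codebase:

```python
import logging

logger = logging.getLogger("data_designer")


def check_completion(actual_num_records, target_num_records, threshold=0.9):
    """Sketch: surface partial completion instead of returning silently.

    Returns True when the run is considered partial (and logs a
    prominent warning); raising here instead is the stricter option.
    """
    if target_num_records <= 0:
        return False
    ratio = actual_num_records / target_num_records
    if ratio < threshold:
        logger.warning(
            "Generated only %d of %d requested records (%.0f%%); the run "
            "ended early and the dataset is incomplete.",
            actual_num_records, target_num_records, 100 * ratio,
        )
        return True
    return False
```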
allow_resize auto-fallback to sync
The async engine rejects columns with allow_resize=True via _validate_async_compatibility(). This is a documented feature used in custom columns, plugins, and agent rollout ingestion. Making async the default would break those users.
Quick fix: replace the hard rejection with an auto-fallback to the sync engine when allow_resize=True is detected. This requires a per-run execution-mode switch rather than just a top-level branch in build(), because _run_cell_by_cell_generator() also reads the module-global DATA_DESIGNER_ASYNC_ENGINE for fan-out strategy. The fallback must set a run-local flag that overrides the global for the duration of that build.
# Sketch - the key point is a per-run flag, not a build()-level if/else:
use_async = self._should_use_async()  # checks global + allow_resize
if not use_async and DATA_DESIGNER_ASYNC_ENGINE:
    warnings.warn("allow_resize is deprecated ...", DeprecationWarning, stacklevel=2)
# Pass use_async down so _run_cell_by_cell_generator uses the right fan-out strategy.
This keeps existing behavior for allow_resize users (sync path works as before) while making async the default for everyone else. Full deprecation and removal of allow_resize is tracked in #552.
Pre-batch processor resize must fail-fast (async only)
Async handles pre-batch processor shrink accidentally (via drop-marking in replace_dataframe) but silently ignores expansion. This must be explicit before async is the default.
Fix: add a row-count guard in ProcessorRunner.run_pre_batch_on_df() (the async path) that raises DatasetProcessingError if the returned DataFrame has a different row count.
Note: sync currently allows pre-batch processors to resize freely via replace_buffer(allow_resize=True). That's documented behavior. Whether to also restrict it in sync is a migration concern tracked in #552, not a blocker here.
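The guard itself is small; a sketch of the async-path version, where df is anything with len() (a pandas DataFrame in the real code) and the exception class is a stand-in:

```python
class DatasetProcessingError(Exception):
    """Stand-in for the library's error type."""


def run_pre_batch_on_df(processor, df):
    """Sketch of the row-count guard for the async pre-batch path."""
    out = processor(df)
    if len(out) != len(df):
        # Fail fast on both shrink and expansion, instead of silently
        # drop-marking one and ignoring the other.
        raise DatasetProcessingError(
            f"pre-batch processor changed row count: {len(df)} -> {len(out)}"
        )
    return out
```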
Async trace support for preview path
_build_async_preview doesn't pass trace=True to _prepare_async_run, even when DATA_DESIGNER_ASYNC_TRACE=1 is set. The full _build_async path reads the env var, but the preview path skips it.
This means TaskTrace data isn't collected during preview(), which makes it harder to profile short iterative runs where preview() is the primary execution path.
Fix: mirror the trace check from _build_async into _build_async_preview, and add a task_traces field to PreviewResults so the collected data is actually accessible.
Polish
Progress bar / reporting
- ProgressSnapshot dataclass - replace the 8-tuple returned by ProgressTracker.get_snapshot() with a named frozen dataclass. Callers currently unpack it with positional _-prefixed throwaways.
- Gate StickyProgressBar on reporter existence - async_scheduler.py creates a StickyProgressBar even when there are no CELL_BY_CELL columns (no reporter). Skip creation when the reporter is None.
- Type _make_wrapper properly - _wrapped_handlers in StickyProgressBar is typed as list[tuple[StreamHandler, object]]. Use Callable[[logging.LogRecord], None] for the emit reference.
- Cache terminal size in _redraw - shutil.get_terminal_size() is a syscall made on every bar update. Cache it with a short TTL under high throughput.
- _compute_stats_width rate overflow - the sample string uses 9999.9 rec/s, which could be exceeded at very high throughput. Low priority.
- StickyProgressBar.__exit__ race - after _clear_bars() releases the lock, _active is still True and handlers are still wrapped, so a concurrent log emit can re-_redraw() bars that were just cleared, leaving ghost lines on the terminal. Fix: set _active = False inside the lock and add an _active guard in _redraw() so threads that sneak past the lock after teardown don't redraw.
Scheduler accounting
- Double-counted skips - _drop_row_group records skips for all CELL_BY_CELL columns, then _run_seeds_complete_check fires and records skips again for the same rows. Fix: snapshot dropped rows before on_seeds_complete and only record skips for newly-dropped rows.
Metadata contract parity
- Sync and async write_metadata() produce different fields - the two paths write metadata with different keys and structures. Before async becomes the default, the metadata contract should be aligned so downstream consumers (notebooks, dashboards, CI checks) don't break when switching engines.
Slow request diagnostics
- A pending-request warning in HttpModelClient._apost when a request has been pending longer than a threshold (e.g. 30s). Useful for diagnosing streaming responses that trickle data without timing out.
- A stall watchdog around _wake_event.wait() in the main dispatch loop that logs a warning with in-flight/active/deferred counts when no progress is made for 30s.
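As a sketch, the ProgressSnapshot dataclass mentioned above might look like this (the field names are assumptions based on what the scheduler tracks, not the actual tuple layout):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ProgressSnapshot:
    """Sketch of the named, frozen snapshot that would replace the
    8-tuple returned by ProgressTracker.get_snapshot()."""
    completed: int
    in_flight: int
    deferred: int
    skipped: int
    target: int
    rate_per_s: float
    elapsed_s: float
    eta_s: Optional[float]
```

Callers then read snapshot.completed instead of unpacking eight positional values, and frozen=True keeps snapshots safe to hand across threads.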