compute: serialize DataflowError on dataflow edges#35776

Draft
antiguru wants to merge 7 commits into MaterializeInc:main from antiguru:dfe-ser-impl

@antiguru
Member

Summary

  • Introduce DataflowErrorSer, a newtype over proto-encoded bytes, to replace DataflowError on compute-internal dataflow edges (streams, arrangements).
  • Make persist_source generic over the error type E, so compute callers can choose DataflowErrorSer while storage callers stay on DataflowError.
  • Switch all compute rendering, arrangement types (ErrSpine, RowErrSpine, etc.), CollectionBundle, sinks, and logging to use DataflowErrorSer.
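A minimal sketch of how a source can be made generic over the error representation, so one caller keeps the full enum while another opts into serialized bytes. Everything here is a simplified stand-in: the two-variant `DataflowError`, the tag-plus-UTF-8 encoding inside `DataflowErrorSer`, and the `read_errors` function are assumptions for illustration, not the real `persist_source` signature or the real proto encoding.

```rust
use std::fmt::Debug;

// Stand-in for the real `DataflowError` enum (assumption: heavily simplified).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum DataflowError {
    Eval(String),
    Decode(String),
}

// Stand-in for `DataflowErrorSer`: opaque bytes. The layout used here
// (one tag byte followed by a UTF-8 message) is made up for this sketch.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DataflowErrorSer(Vec<u8>);

impl From<DataflowError> for DataflowErrorSer {
    fn from(e: DataflowError) -> Self {
        let (tag, msg) = match e {
            DataflowError::Eval(m) => (0u8, m),
            DataflowError::Decode(m) => (1u8, m),
        };
        let mut bytes = vec![tag];
        bytes.extend_from_slice(msg.as_bytes());
        DataflowErrorSer(bytes)
    }
}

// Hypothetical source function, generic over the error type `E`.
// `From<T> for T` is reflexive, so `E = DataflowError` satisfies the bound
// without any extra impl.
pub fn read_errors<E>(raw: Vec<DataflowError>) -> Vec<E>
where
    E: Ord + Clone + Debug + From<DataflowError>,
{
    let mut out: Vec<E> = raw.into_iter().map(E::from).collect();
    out.sort();
    out
}
```

A storage-style caller can let inference pick `E = DataflowError` from the binding's type, while a compute-style caller selects the serialized form with a turbofish, e.g. `read_errors::<DataflowErrorSer>(raw)`.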

Motivation

DataflowError is a complex enum with boxed variants (EvalError alone has 50+ sub-variants). It requires a 700-line hand-crafted Columnation impl. In compute rendering, errors are never inspected — they flow opaquely through concat, consolidate, arrange, and are only displayed (.to_string()) at sinks.

Replacing it with opaque serialized bytes:

  • Eliminates the need for the complex Columnation impl in compute (delegates to trivial Vec<u8> columnation)
  • Removes complex structures from dataflow edges
  • Paves the way for eventually using Row to represent serialized errors
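To illustrate why an opaque byte newtype is enough for the operations named above, here is a rough sketch in which ordering, hashing, and deduplication all run on the raw bytes and decoding happens only in `Display`. The type name `ErrBytes`, its `encode`/`decode` helpers, and the one-byte-code layout are hypothetical; the PR's type wraps prost-encoded `ProtoDataflowError` bytes instead.

```rust
use std::collections::HashSet;
use std::fmt;

// Opaque error bytes. Equality, ordering, and hashing are derived on the
// raw bytes, so no decoding is needed to sort, deduplicate, or arrange.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ErrBytes(Vec<u8>);

impl ErrBytes {
    // Hypothetical encoder: one code byte followed by a UTF-8 message.
    pub fn encode(code: u8, message: &str) -> Self {
        let mut bytes = vec![code];
        bytes.extend_from_slice(message.as_bytes());
        ErrBytes(bytes)
    }

    // Decoding is only needed where the error is shown to a user.
    pub fn decode(&self) -> Option<(u8, &str)> {
        let (code, rest) = self.0.split_first()?;
        let msg = std::str::from_utf8(rest).ok()?;
        Some((*code, msg))
    }
}

impl fmt::Display for ErrBytes {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.decode() {
            Some((code, msg)) => write!(f, "error {}: {}", code, msg),
            None => write!(f, "<undecodable error: {} bytes>", self.0.len()),
        }
    }
}

// Counts distinct errors purely by byte equality.
pub fn distinct_errors(errs: Vec<ErrBytes>) -> usize {
    errs.into_iter().collect::<HashSet<_>>().len()
}
```

For example, two `ErrBytes::encode(1, "division by zero")` values compare equal and collapse to one entry in `distinct_errors`, and only `to_string()` ever looks inside the bytes.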

Design

Serialization boundary: Errors are serialized at the persist_source boundary when entering compute, and deserialized only at exit points (MV sink and continual task for persist writes; subscribe and copy-to-s3 via Display).

Canonicality invariant: The MV sink's self-correction logic computes desired - persist and relies on equality for cancellation. Proto3 encoding via prost is deterministic for messages without map fields (verified: none in the error proto chain), so semantically equal errors always encode to identical bytes and byte-equality coincides with semantic equality.
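The reliance on equality can be sketched as follows: updates are `(bytes, diff)` pairs, and `desired - persist` only cancels when both sides carry identical byte strings. The `Update` alias and the `consolidate` and `correction` functions are illustrative assumptions, not the sink's actual code.

```rust
use std::collections::BTreeMap;

// An update is an encoded error plus a signed multiplicity.
type Update = (Vec<u8>, i64);

// Sums diffs per distinct byte string and drops entries that net to zero.
pub fn consolidate(updates: Vec<Update>) -> Vec<Update> {
    let mut acc: BTreeMap<Vec<u8>, i64> = BTreeMap::new();
    for (bytes, diff) in updates {
        *acc.entry(bytes).or_insert(0) += diff;
    }
    acc.into_iter().filter(|(_, diff)| *diff != 0).collect()
}

// Computes `desired - persist`: negate what is already written, then
// consolidate. Cancellation happens only on exact byte equality.
pub fn correction(desired: Vec<Update>, persist: Vec<Update>) -> Vec<Update> {
    let negated = persist.into_iter().map(|(bytes, diff)| (bytes, -diff));
    consolidate(desired.into_iter().chain(negated).collect())
}
```

If the same logical error could encode to two different byte strings, the correction would never reach empty: the sink would retract one encoding and insert the other, which is why a deterministic encoding matters here.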

Persist boundary unchanged: SourceData already serializes errors as proto bytes in Arrow columnar encoding. This change does not affect the persist format.

Testing

  • Added proptest verifying round-trip canonicality: ser(deser(ser(e))) == ser(e) for all DataflowError variants.
  • Added test verifying Display equivalence between DataflowError and DataflowErrorSer.
  • All existing mz-compute and mz-storage-operators tests pass.
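The round-trip property `ser(deser(ser(e))) == ser(e)` can be sketched on a toy error type with a hand-rolled encoding. `ToyError`, `ser`, `deser`, and `round_trips_canonically` are hypothetical names; the PR's test uses proptest over `DataflowError` and the prost encoding, neither of which appears here.

```rust
// Toy error type standing in for `DataflowError`.
#[derive(Clone, Debug, PartialEq)]
pub enum ToyError {
    DivisionByZero,
    Overflow { column: u32 },
    Message(String),
}

// Toy encoding: one tag byte, then a variant-specific payload.
pub fn ser(e: &ToyError) -> Vec<u8> {
    match e {
        ToyError::DivisionByZero => vec![0],
        ToyError::Overflow { column } => {
            let mut out = vec![1];
            out.extend_from_slice(&column.to_le_bytes());
            out
        }
        ToyError::Message(m) => {
            let mut out = vec![2];
            out.extend_from_slice(m.as_bytes());
            out
        }
    }
}

// Inverse of `ser`; returns `None` for bytes that `ser` cannot produce.
pub fn deser(bytes: &[u8]) -> Option<ToyError> {
    let (tag, rest) = bytes.split_first()?;
    match *tag {
        0 if rest.is_empty() => Some(ToyError::DivisionByZero),
        1 => {
            if rest.len() != 4 {
                return None;
            }
            let column = u32::from_le_bytes([rest[0], rest[1], rest[2], rest[3]]);
            Some(ToyError::Overflow { column })
        }
        2 => Some(ToyError::Message(String::from_utf8(rest.to_vec()).ok()?)),
        _ => None,
    }
}

// The canonicality check: re-encoding a decoded value reproduces the bytes.
pub fn round_trips_canonically(e: &ToyError) -> bool {
    let once = ser(e);
    match deser(&once) {
        Some(decoded) => ser(&decoded) == once,
        None => false,
    }
}
```

A property-based test would generate arbitrary values of the error type and assert `round_trips_canonically` for each; the sketch leaves the generator out and checks hand-picked values only.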

Future work

  • Direct EvalError → proto bytes (avoids a Box::new allocation in the From<EvalError> path)
  • SourceData::append_err_bytes to skip deserialize/reserialize at MV sink
  • Error code + optional description format for compute-created errors
  • Columnar impl for DataflowErrorSer (enables newer spine infrastructure)

🤖 Generated with Claude Code

antiguru and others added 5 commits March 28, 2026 17:35
Introduce `DataflowErrorSer`, a newtype over `Vec<u8>` holding
proto-encoded `ProtoDataflowError` bytes, for use on compute-internal
dataflow edges. The canonical proto3+prost encoding (no map fields)
ensures byte-equality implies semantic equality, enabling derived
`Ord`/`Hash` on the raw bytes.

The type includes `From<DataflowError>`, `From<EvalError>`, `Display`,
`Debug`, and `Columnation` impls. Nothing uses it yet -- subsequent
tasks will wire it in to replace `DataflowError` on internal edges.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add a generic type parameter E to persist_source, persist_source_core,
decode_and_mfp, and PendingWork::do_work, bounded by
ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>.

This is a pure refactor — all existing callers continue to compile,
inferring E = DataflowError. No behavior change.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… DataflowError

Replace DataflowError with DataflowErrorSer (proto-serialized bytes) on
all compute-internal dataflow edges. This avoids carrying the full enum
through the dataflow graph, reducing memory and comparison costs.

Changes:
- typedefs.rs: All Err{Spine,Batcher,Builder} and KeyErr/RowErr types
  now use DataflowErrorSer.
- context.rs: CollectionBundle and ArrangementFlavor use DataflowErrorSer
  in their error collection types.
- render.rs: persist_source call uses turbofish ::<_, DataflowErrorSer>,
  Constant and LetRecLimitExceeded errors create DataflowErrorSer.
- All render modules (reduce, flat_map, top_k, join/delta_join,
  join/linear_join, sinks): DataflowError::from(e) -> DataflowErrorSer::from(e).
- Sinks deserialize at persist boundaries: materialized_view batch writes
  and continual_task appends call .deserialize() to recover DataflowError
  for SourceData. copy_to_s3 deserializes before passing to
  storage-operators which still expects DataflowError.
- subscribe sink works unchanged since DataflowErrorSer implements Display.
- logging/initialize.rs: empty error arrangement uses DataflowErrorSer.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add proptest-based round-trip test verifying that serialize -> deserialize
-> serialize produces identical bytes, and a display equivalence test
confirming DataflowError and DataflowErrorSer produce the same Display
output.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@github-actions

Thanks for opening this PR! Here are a few tips to help make the review process smooth for everyone.

PR title guidelines

  • Use imperative mood: "Fix X" not "Fixed X" or "Fixes X"
  • Be specific: "Fix panic in catalog sync when controller restarts" not "Fix bug" or "Update catalog code"
  • Prefix with area if helpful: compute: , storage: , adapter: , sql:

Pre-merge checklist

  • The PR title is descriptive and will make sense in the git log.
  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).

antiguru and others added 2 commits March 28, 2026 23:50
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Apply rustfmt-canonical import ordering.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@antiguru antiguru requested a review from DAlperin March 29, 2026 10:23