compute: serialize DataflowError on dataflow edges#35776
Draft
antiguru wants to merge 7 commits intoMaterializeInc:mainfrom
Draft
compute: serialize DataflowError on dataflow edges#35776antiguru wants to merge 7 commits intoMaterializeInc:mainfrom
antiguru wants to merge 7 commits intoMaterializeInc:mainfrom
Conversation
Introduce `DataflowErrorSer`, a newtype over `Vec<u8>` holding proto-encoded `ProtoDataflowError` bytes, for use on compute-internal dataflow edges. The canonical proto3+prost encoding (no map fields) ensures byte-equality implies semantic equality, enabling derived `Ord`/`Hash` on the raw bytes. The type includes `From<DataflowError>`, `From<EvalError>`, `Display`, `Debug`, and `Columnation` impls. Nothing uses it yet -- subsequent tasks will wire it in to replace `DataflowError` on internal edges. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add a generic type parameter E to persist_source, persist_source_core, decode_and_mfp, and PendingWork::do_work, bounded by ExchangeData + Ord + Clone + Debug + From<DataflowError> + From<EvalError>. This is a pure refactor — all existing callers continue to compile, inferring E = DataflowError. No behavior change. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… DataflowError
Replace DataflowError with DataflowErrorSer (proto-serialized bytes) on
all compute-internal dataflow edges. This avoids carrying the full enum
through the dataflow graph, reducing memory and comparison costs.
Changes:
- typedefs.rs: All Err{Spine,Batcher,Builder} and KeyErr/RowErr types
now use DataflowErrorSer.
- context.rs: CollectionBundle and ArrangementFlavor use DataflowErrorSer
in their error collection types.
- render.rs: persist_source call uses turbofish ::<_, DataflowErrorSer>,
Constant and LetRecLimitExceeded errors create DataflowErrorSer.
- All render modules (reduce, flat_map, top_k, join/delta_join,
join/linear_join, sinks): DataflowError::from(e) -> DataflowErrorSer::from(e).
- Sinks deserialize at persist boundaries: materialized_view batch writes
and continual_task appends call .deserialize() to recover DataflowError
for SourceData. copy_to_s3 deserializes before passing to
storage-operators which still expects DataflowError.
- subscribe sink works unchanged since DataflowErrorSer implements Display.
- logging/initialize.rs: empty error arrangement uses DataflowErrorSer.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add proptest-based round-trip test verifying that serialize -> deserialize -> serialize produces identical bytes, and a display equivalence test confirming DataflowError and DataflowErrorSer produce the same Display output. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
Thanks for opening this PR! Here are a few tips to help make the review process smooth for everyone. PR title guidelines
Pre-merge checklist
|
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Apply rustfmt-canonical import ordering. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
DataflowErrorSer, a newtype over proto-encoded bytes, to replaceDataflowErroron compute-internal dataflow edges (streams, arrangements).persist_sourcegeneric over the error typeE, so compute callers can chooseDataflowErrorSerwhile storage callers stay onDataflowError.ErrSpine,RowErrSpine, etc.),CollectionBundle, sinks, and logging to useDataflowErrorSer.Motivation
DataflowErroris a complex enum with boxed variants (EvalErroralone has 50+ sub-variants). It requires a 700-line hand-craftedColumnationimpl. In compute rendering, errors are never inspected — they flow opaquely through concat, consolidate, arrange, and are only displayed (.to_string()) at sinks.Replacing it with opaque serialized bytes:
Columnationimpl in compute (delegates to trivialVec<u8>columnation)Rowto represent serialized errorsDesign
Serialization boundary: Errors are serialized at the
persist_sourceboundary when entering compute, and deserialized only at exit points (MV sink and continual task for persist writes; subscribe and copy-to-s3 viaDisplay).Canonicality invariant: The MV sink's self-correction logic computes
desired - persistand relies on equality for cancellation. Proto3 encoding via prost is deterministic for messages withoutmapfields (verified: none in the error proto chain), so byte-equality implies semantic equality.Persist boundary unchanged:
SourceDataalready serializes errors as proto bytes in Arrow columnar encoding. This change does not affect the persist format.Testing
ser(deser(ser(e))) == ser(e)for allDataflowErrorvariants.Displayequivalence betweenDataflowErrorandDataflowErrorSer.Future work
EvalError → proto bytes(avoids aBox::newallocation in theFrom<EvalError>path)SourceData::append_err_bytesto skip deserialize/reserialize at MV sinkColumnarimpl forDataflowErrorSer(enables newer spine infrastructure)🤖 Generated with Claude Code