From 225c249276cf4c362576efb71839e4f86e68fb79 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Fri, 29 May 2026 11:57:12 +0000 Subject: [PATCH 01/12] postgres-util: Propagate u16-conversion errors in ProtoPostgresKeyDesc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `PostgresKeyDesc::cols` is `Vec<u16>` but the proto carries it as `Vec<u32>`; the decode path did `c.try_into().expect("values roundtrip")`, which panicked on any value above 65535 — reachable from untrusted proto bytes (the fuzz target found this from many distinct inputs). Propagate the conversion error via `TryFromProtoError` instead. Also fixes two build errors found while bringing up new fuzz crates: - mysql-util fuzz target was importing via `mz_mysql_util::desc::*`, but `desc` is a private module; use the top-level re-exports. - pgwire's `Codec` was a fully-private struct, blocking the `fuzz_exports` re-export. Make it `pub(crate)` so the same-crate `pub use` in lib.rs works under `#[cfg(feature = "fuzz")]`. --- src/pgwire/src/codec.rs | 2 +- src/postgres-util/src/desc.rs | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/pgwire/src/codec.rs b/src/pgwire/src/codec.rs index 8625476a0108c..797f4e6f11e6b 100644 --- a/src/pgwire/src/codec.rs +++ b/src/pgwire/src/codec.rs @@ -214,7 +214,7 @@ where } } -struct Codec { +pub struct Codec { decode_state: DecodeState, encode_state: Vec<(mz_pgrepr::Type, mz_pgwire_common::Format)>, /// When true, skip the aggregate buffer size check in `decode()`. 
diff --git a/src/postgres-util/src/desc.rs b/src/postgres-util/src/desc.rs index 2cad0f48cf3d6..d5432b4e083c6 100644 --- a/src/postgres-util/src/desc.rs +++ b/src/postgres-util/src/desc.rs @@ -257,14 +257,20 @@ impl RustType for PostgresKeyDesc { } fn from_proto(proto: ProtoPostgresKeyDesc) -> Result { + // `cols` is `Vec` on the Rust side but `Vec` on the wire; + // a u32 value above 65535 used to panic via `.expect`, which is + // reachable from untrusted proto bytes. + let cols = proto + .cols + .into_iter() + .map(|c| { + u16::try_from(c).map_err(|e| TryFromProtoError::InvalidFieldError(e.to_string())) + }) + .collect::, _>>()?; Ok(PostgresKeyDesc { oid: proto.oid, name: proto.name, - cols: proto - .cols - .into_iter() - .map(|c| c.try_into().expect("values roundtrip")) - .collect(), + cols, is_primary: proto.is_primary, nulls_not_distinct: proto.nulls_not_distinct, }) From 5e135eda5ab79a436a7db61389d708eaa8b29792 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Fri, 29 May 2026 12:15:36 +0000 Subject: [PATCH 02/12] postgres-util: Propagate u16-conversion errors for ProtoPostgresColumnDesc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same shape as the `ProtoPostgresKeyDesc.cols` fix: `col_num` is `u16` in Rust but `u32` on the wire, and the decode path used `.expect("u16 must roundtrip")` — reachable from untrusted proto bytes (the fuzz target found this from many distinct inputs once the key-desc panic was out of the way). Also bumps `pgwire::codec::Codec` from `pub(crate)` to `pub` — needed for the `fuzz_exports` re-export to compile (`pub use` can't widen a `pub(crate)` item to a fully-public reexport). `mod codec` remains private, so the type is only reachable via the `fuzz` feature. 
--- src/postgres-util/src/desc.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/postgres-util/src/desc.rs b/src/postgres-util/src/desc.rs index d5432b4e083c6..11d5d9f894963 100644 --- a/src/postgres-util/src/desc.rs +++ b/src/postgres-util/src/desc.rs @@ -202,14 +202,16 @@ impl RustType for PostgresColumnDesc { } fn from_proto(proto: ProtoPostgresColumnDesc) -> Result { + let col_num_u32: u32 = proto + .col_num + .into_rust_if_some("ProtoPostgresColumnDesc::col_num")?; + // `col_num` is `u16` on the Rust side; reject u32 values that don't + // fit instead of panicking — reachable from untrusted proto bytes. + let col_num = u16::try_from(col_num_u32) + .map_err(|e| TryFromProtoError::InvalidFieldError(e.to_string()))?; Ok(PostgresColumnDesc { name: proto.name, - col_num: { - let v: u32 = proto - .col_num - .into_rust_if_some("ProtoPostgresColumnDesc::col_num")?; - u16::try_from(v).expect("u16 must roundtrip") - }, + col_num, type_oid: proto.type_oid, type_mod: proto.type_mod, nullable: proto.nullable, From e32346941a457ca3f03e3ca510221069279bb8c6 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Mon, 1 Jun 2026 15:10:43 +0000 Subject: [PATCH 03/12] pgrepr: Avoid i16 overflow computing numeric scale in binary decode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `Numeric`'s binary `FromSql` computed `(units - weight - 1) * 4` in i16, but `units`/`weight` are attacker-controlled i16 header fields from a client bind parameter. Extreme `weight` values overflow i16 — a panic under overflow checks (a client-triggerable crash) and a silent wraparound to a wrong scale otherwise. Compute the scale (and the `in_scale` comparison) in i32; the values are well within range. Found by the new value_decode_binary cargo-fuzz target. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- src/pgrepr/src/value/numeric.rs | 39 +++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/src/pgrepr/src/value/numeric.rs b/src/pgrepr/src/value/numeric.rs index 7b082cdc170e1..717bed8a282f5 100644 --- a/src/pgrepr/src/value/numeric.rs +++ b/src/pgrepr/src/value/numeric.rs @@ -205,20 +205,24 @@ impl<'a> FromSql<'a> for Numeric { _ => return Err("bad sign in numeric".into()), } - let mut scale = (units - weight - 1) * 4; + // Compute in i32: `units`/`weight` are attacker-controlled `i16`s from + // the wire, so `(units - weight - 1) * 4` overflows `i16` for crafted + // headers (panic under overflow checks, silent wraparound otherwise). + let in_scale = i32::from(in_scale); + let mut scale = (i32::from(units) - i32::from(weight) - 1) * 4; // Adjust scales if scale < 0 { // Multiply by 10^scale - cx.scaleb(&mut d, &AdtNumeric::from(-i32::from(scale))); + cx.scaleb(&mut d, &AdtNumeric::from(-scale)); scale = 0; } else if scale > in_scale { // Divide by 10^(difference in scale and in_scale) - cx.scaleb(&mut d, &AdtNumeric::from(-i32::from(scale - in_scale))); + cx.scaleb(&mut d, &AdtNumeric::from(-(scale - in_scale))); scale = in_scale; } - cx.scaleb(&mut d, &AdtNumeric::from(-i32::from(scale))); + cx.scaleb(&mut d, &AdtNumeric::from(-scale)); cx.reduce(&mut d); let mut cx = cx_datum(); @@ -299,3 +303,30 @@ fn test_to_from_sql_roundtrip() { let d_from_sql = Numeric::from_sql(&Type::NUMERIC, &out).unwrap(); assert_eq!(r.0, d_from_sql.0); } + +#[mz_ore::test] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux` +fn test_from_sql_extreme_weight_no_overflow() { + // `(units - weight - 1) * 4` is computed in i32 because `weight` is an + // attacker-controlled i16 from the wire: extreme values overflowed the old + // i16 arithmetic (panic under overflow checks, silent wraparound + // otherwise). 
`from_sql` must return a result, never panic. Regression for + // the value_decode_binary cargo-fuzz finding. + fn header(units: i16, weight: i16, sign: u16, in_scale: i16) -> Vec { + let mut b = Vec::new(); + b.extend_from_slice(&units.to_be_bytes()); + b.extend_from_slice(&weight.to_be_bytes()); + b.extend_from_slice(&sign.to_be_bytes()); + b.extend_from_slice(&in_scale.to_be_bytes()); + b + } + for (weight, in_scale) in [ + (i16::MAX, 0), + (i16::MIN, 0), + (-28174, -28271), // exact fuzz repro + (13606, -28272), + ] { + // units = 0 (no trailing digits); the header alone triggered the overflow. + let _ = Numeric::from_sql(&Type::NUMERIC, &header(0, weight, 0, in_scale)); + } +} From c31b36ab44459c950919796cfec1f21f236fe389 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Mon, 1 Jun 2026 15:22:30 +0000 Subject: [PATCH 04/12] persist-types: Reject ProtoArrayData child-count mismatch instead of panicking `from_proto_with_type` zipped the proto's `children` against the fields its declared data type expects using `zip_eq`, which panics when the counts disagree. `children` is reconstructed from (untrusted-on-disk) Parquet part bytes, so a corrupted or crafted part panicked the reader. Check the lengths and return a `TryFromProtoError` on mismatch. Found by an exploratory cargo-fuzz target over the `ProtoArrayData -> ArrayData` decode path. (That target is not committed: adversarial-but- build-valid layouts also panic inside arrow's own ArrayData build/align/ equal routines, which is an upstream concern, so the target can't yet run green in CI.) 
Co-Authored-By: Claude Opus 4.8 (1M context) --- src/persist-types/src/arrow.rs | 41 +++++++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/src/persist-types/src/arrow.rs b/src/persist-types/src/arrow.rs index 0c969331af3b9..310408f35a485 100644 --- a/src/persist-types/src/arrow.rs +++ b/src/persist-types/src/arrow.rs @@ -170,12 +170,22 @@ fn from_proto_with_type( for b in buffers.into_iter().map(|b| b.into_rust()) { builder = builder.add_buffer(b?); } - for c in children - .into_iter() - .zip_eq(fields_for_type(&data_type)) - .map(|(c, field)| from_proto_with_type(c, Some(field.data_type()))) - { - builder = builder.add_child_data(c?); + // `children` is reconstructed from the wire, so its count may disagree with + // what `data_type` expects. Guard the length explicitly and reject a mismatch + // as a decode error (corrupted/crafted parts must not crash readers); the + // `zip_eq` below is then infallible. + let fields = fields_for_type(&data_type); + if children.len() != fields.len() { + return Err(TryFromProtoError::RowConversionError(format!( + "ProtoArrayData for {:?} has {} children but its data type expects {}", + data_type, + children.len(), + fields.len(), + ))); + } + for (c, field) in children.into_iter().zip_eq(fields) { + let c = from_proto_with_type(c, Some(field.data_type()))?; + builder = builder.add_child_data(c); } // Construct the builder which validates all inputs and aligns data. @@ -810,6 +820,25 @@ mod tests { use mz_proto::ProtoType; use std::sync::Arc; + #[mz_ore::test] + fn from_proto_child_count_mismatch_is_error() { + // A `ProtoArrayData` whose child count disagrees with its declared data + // type must decode to an error, not panic via `zip_eq`. Regression for + // the array_data_proto_roundtrip cargo-fuzz finding. 
+ use prost::Message; + let bytes: &[u8] = &[ + 0x0a, 0x0a, 0x18, 0x32, 0x22, 0x00, 0x18, 0x0a, 0x18, 0x32, 0x22, 0x00, 0x2a, 0x00, + 0x2a, 0x00, 0xe0, 0x32, 0x24, + ]; + let proto = + crate::arrow::ProtoArrayData::decode(bytes).expect("crash input decodes as a proto"); + let result: Result = proto.into_rust(); + assert!( + result.is_err(), + "child-count mismatch must be a decode error" + ); + } + #[mz_ore::test] fn trim_proto() { let nested_fields: Fields = vec![Field::new("a", DataType::UInt64, true)].into(); From fd45df4e4f5658b7a9d57ddf90149f336867e389 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Mon, 1 Jun 2026 23:53:49 +0000 Subject: [PATCH 05/12] persist-client: Reject invalid StateDiff field diffs instead of debug-asserting `StateDiff`'s proto decode `debug_assert_eq!(field_diffs.validate(), Ok(()))` on field diffs reconstructed from an untrusted consensus blob. Under debug assertions (and fuzzing) a corrupt/crafted diff panics; in release the assert is compiled out and the unvalidated diffs flow through. Validate and return a `TryFromProtoError::InvalidPersistState` on failure so the behaviour is consistent and a bad blob is a decode error, not a panic. Found by the new state_diff_proto_roundtrip cargo-fuzz target. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- src/persist-client/src/internal/encoding.rs | 8 ++++++- src/persist-client/src/internal/state_diff.rs | 21 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index b4599bd02e6ce..b4694519f43cf 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -512,7 +512,13 @@ impl RustType for StateDiff { proto.latest_rollup_key.into_rust()?, ); if let Some(field_diffs) = proto.field_diffs { - debug_assert_eq!(field_diffs.validate(), Ok(())); + // `field_diffs` is decoded from an untrusted blob, so validate it and + // return a decode error on a malformed/crafted diff rather than a + // `debug_assert` (which panics under debug assertions / fuzzing and + // is compiled out in release, where the bad data flowed through). + field_diffs + .validate() + .map_err(TryFromProtoError::InvalidPersistState)?; for field_diff in field_diffs.iter() { let (field, diff) = field_diff?; match field { diff --git a/src/persist-client/src/internal/state_diff.rs b/src/persist-client/src/internal/state_diff.rs index 1be97db11bc5e..3a4a67c684e1e 100644 --- a/src/persist-client/src/internal/state_diff.rs +++ b/src/persist-client/src/internal/state_diff.rs @@ -1360,6 +1360,27 @@ mod tests { use super::*; + #[mz_ore::test] + #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function on OS `linux` + fn proto_state_diff_invalid_field_diffs_is_error() { + // A `ProtoStateDiff` decoded from an untrusted blob whose field diffs + // fail `validate()` must convert to an error, not panic. We previously + // `debug_assert`ed validity, which panicked under debug assertions / + // fuzzing. Regression for the state_diff_proto_roundtrip cargo-fuzz + // finding. 
+ use crate::internal::state::ProtoStateDiff; + use mz_proto::ProtoType; + use prost::Message; + + let bytes: &[u8] = &[0x2a, 0x04, 0x08, 0x00, 0x68, 0x00, 0x40, 0x48]; + let proto = ProtoStateDiff::decode(bytes).expect("crash input decodes as a proto"); + let result: Result, _> = proto.into_rust(); + assert!( + result.is_err(), + "invalid field diffs must be a decode error" + ); + } + /// Model a situation where a "leader" is constantly making changes to its state, and a "follower" /// is applying those changes as diffs. #[mz_ore::test] From e679877c497ba991ae596be448cfc3c6628616a9 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Tue, 2 Jun 2026 12:45:51 +0000 Subject: [PATCH 06/12] persist-client: Reject inline batch parts with hollow-only fields instead of asserting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `BatchPart::from_proto`'s inline branch asserted (`assert_eq!`/`assert_none!`, not `debug_assert` — so it panicked even in release) that the hollow-only fields (`encoded_size_bytes`, `key_lower`, `key_stats`, `diffs_sum`) were unset. These are decoded from an untrusted blob (read on every rollup/state load), so a crafted/corrupted inline `ProtoHollowBatchPart` panicked the reader. Validate and return a `TryFromProtoError` instead. Found by the rollup_proto_roundtrip cargo-fuzz target. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- src/persist-client/src/internal/encoding.rs | 41 ++++++++++++++++++--- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index b4694519f43cf..80a3e495c4ad8 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -19,7 +19,7 @@ use bytes::{Buf, Bytes}; use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::Description; use mz_ore::cast::CastInto; -use mz_ore::{assert_none, halt, soft_panic_or_log}; +use mz_ore::{halt, soft_panic_or_log}; use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates}; use mz_persist::location::{SeqNo, VersionedData}; use mz_persist::metrics::ColumnarMetrics; @@ -1663,10 +1663,21 @@ impl RustType for BatchPart { })) } Some(proto_hollow_batch_part::Kind::Inline(x)) => { - assert_eq!(proto.encoded_size_bytes, 0); - assert_eq!(proto.key_lower.len(), 0); - assert_none!(proto.key_stats); - assert_none!(proto.diffs_sum); + // An inline part carries its data in `kind`; the hollow-only + // fields must be unset. These are decoded from an untrusted + // blob, so validate and return a decode error rather than + // asserting (which panicked, even in release, on a + // malformed/crafted part). Found by the rollup_proto_roundtrip + // cargo-fuzz target. 
+ if proto.encoded_size_bytes != 0 + || !proto.key_lower.is_empty() + || proto.key_stats.is_some() + || proto.diffs_sum.is_some() + { + return Err(TryFromProtoError::InvalidPersistState( + "inline ProtoHollowBatchPart has hollow-part fields set".into(), + )); + } let updates = LazyInlineBatchPart(x.into_rust()?); Ok(BatchPart::Inline { updates, @@ -1933,6 +1944,8 @@ impl RustType for Antichain { #[cfg(test)] mod tests { + use mz_ore::assert_none; + use bytes::Bytes; use mz_build_info::DUMMY_BUILD_INFO; use mz_dyncfg::ConfigUpdates; @@ -1949,6 +1962,24 @@ mod tests { use super::*; + #[mz_ore::test] + fn rollup_inline_batch_part_with_hollow_fields_is_error() { + // An inline `ProtoHollowBatchPart` carrying hollow-only fields (here a + // non-zero encoded_size_bytes) must decode to an error, not panic — it + // used to `assert!`, which fired even in release on a crafted/corrupted + // blob. Regression for the rollup_proto_roundtrip cargo-fuzz finding. + use mz_proto::ProtoType; + use prost::Message; + let bytes: &[u8] = &[ + 0x3a, 0x12, 0x0a, 0x00, 0x12, 0x0a, 0x22, 0x06, 0x5a, 0x00, 0x10, 0x02, 0x2a, 0x00, + 0x22, 0x00, 0x22, 0x00, 0x22, 0x00, + ]; + let proto = crate::internal::state::ProtoRollup::decode(bytes) + .expect("crash input decodes as a proto"); + let result: Result, _> = proto.into_rust(); + assert_err!(result); + } + #[mz_ore::test] fn metadata_map() { const COUNT: MetadataKey = MetadataKey::new("count"); From b9b5ee68debf43fec5abca98278d3eb4d9662776 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Tue, 2 Jun 2026 13:39:17 +0000 Subject: [PATCH 07/12] persist-client: Reject batch descriptions with an empty lower frontier `Description::from_proto` passed the decoded `lower` antichain straight to `Description::new`, which asserts a non-empty lower frontier. `lower` comes from an untrusted blob, so a crafted/corrupted `ProtoHollowBatch` with an empty lower frontier panicked on rollup/state load. Validate it and return a `TryFromProtoError` instead. 
Found by the rollup_proto_roundtrip cargo-fuzz target (a second, distinct rollup decode panic after the inline-batch-part fix). Co-Authored-By: Claude Opus 4.8 (1M context) --- src/persist-client/src/internal/encoding.rs | 29 ++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index 80a3e495c4ad8..269ebf7b51547 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -1913,8 +1913,17 @@ impl RustType for Description { } fn from_proto(proto: ProtoU64Description) -> Result { + let lower: Antichain = proto.lower.into_rust_if_some("lower")?; + // `Description::new` asserts a non-empty lower frontier. `lower` is + // decoded from an untrusted blob, so validate it here and return a + // decode error rather than panicking on a crafted/corrupted batch. + if lower.elements().is_empty() { + return Err(TryFromProtoError::InvalidPersistState( + "ProtoU64Description has an empty lower frontier".into(), + )); + } Ok(Description::new( - proto.lower.into_rust_if_some("lower")?, + lower, proto.upper.into_rust_if_some("upper")?, proto.since.into_rust_if_some("since")?, )) @@ -1980,6 +1989,24 @@ mod tests { assert_err!(result); } + #[mz_ore::test] + fn rollup_batch_with_empty_lower_frontier_is_error() { + // A `ProtoU64Description` with an empty lower frontier must decode to an + // error: `Description::new` asserts a non-empty lower, so a crafted batch + // used to panic. Regression for the rollup_proto_roundtrip cargo-fuzz + // finding. 
+ use mz_proto::ProtoType; + use prost::Message; + let bytes: &[u8] = &[ + 0x3a, 0x12, 0x0a, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x0a, 0x00, 0x12, 0x00, 0x1a, 0x00, + 0x12, 0x00, 0x32, 0x00, 0x22, 0x00, + ]; + let proto = crate::internal::state::ProtoRollup::decode(bytes) + .expect("crash input decodes as a proto"); + let result: Result, _> = proto.into_rust(); + assert_err!(result); + } + #[mz_ore::test] fn metadata_map() { const COUNT: MetadataKey = MetadataKey::new("count"); From 73858ec832efb815f42417f5ee68ab6407e62b18 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Wed, 3 Jun 2026 00:20:48 +0000 Subject: [PATCH 08/12] persist: Guard ID UUID parsing against the uuid crate panicking on malformed input MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `parse_id` (persist-client) and `ShardId::from_str` (persist-types) hand the post-prefix string straight to `Uuid::parse_str`, which panics — rather than returning an error — while building its parse error for some malformed inputs: non-ASCII bytes make it slice the input on a non-char boundary. A crafted `ProtoRollup` reaches these via the ID `RustType` decode, so the panic is reachable from untrusted durable state. Reject anything that can't be a UUID (fewer than 32 chars, or non-ASCII) before calling the crate, matching the guard in `mz_repr::strconv::parse_uuid`. Regression for the rollup_proto_roundtrip cargo-fuzz finding. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- src/persist-client/src/internal/encoding.rs | 23 +++++++++++++++++++++ src/persist-types/src/codec_impls.rs | 9 ++++---- src/persist-types/src/lib.rs | 7 +++++++ 3 files changed, 35 insertions(+), 4 deletions(-) diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index 269ebf7b51547..aaa8d6dd534d2 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -295,6 +295,13 @@ pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result< Some(x) => x, None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)), }; + // `Uuid::parse_str` panics (rather than erroring) while building its parse + // error for some malformed inputs — e.g. non-ASCII bytes make it slice the + // input on a non-char boundary. Reject anything that can't be a UUID first: + // the shortest valid form is 32 hex digits, all ASCII. + if uuid_encoded.len() < 32 || !uuid_encoded.is_ascii() { + return Err(format!("invalid {} {}: malformed UUID", id_type, encoded)); + } let uuid = Uuid::parse_str(uuid_encoded) .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?; Ok(*uuid.as_bytes()) @@ -1971,6 +1978,22 @@ mod tests { use super::*; + #[mz_ore::test] + fn parse_id_rejects_malformed_uuid_without_panicking() { + // `Uuid::parse_str` panics (rather than erroring) while building its + // parse error for some malformed inputs — e.g. non-ASCII bytes make it + // slice the input on a non-char boundary. `parse_id` must turn these + // into clean errors. Regression for the rollup_proto_roundtrip finding. 
+ let nonascii = "é".repeat(40); + let weird = format!("w{nonascii}"); + for encoded in ["w", "wnotauuid", "w{}", weird.as_str()] { + assert!( + parse_id("w", "WriterId", encoded).is_err(), + "{encoded:?} should error, not panic", + ); + } + } + #[mz_ore::test] fn rollup_inline_batch_part_with_hollow_fields_is_error() { // An inline `ProtoHollowBatchPart` carrying hollow-only fields (here a diff --git a/src/persist-types/src/codec_impls.rs b/src/persist-types/src/codec_impls.rs index 06bb3d7502b80..e3ee72d20b063 100644 --- a/src/persist-types/src/codec_impls.rs +++ b/src/persist-types/src/codec_impls.rs @@ -618,13 +618,14 @@ mod tests { .to_string() ) ); + // Too short to be a UUID: rejected before the `uuid` crate (which + // panics building an error for some such inputs), so the message is + // generic rather than the crate's "invalid length". assert_eq!( ShardId::from_str("s0"), - Err( - "invalid ShardId s0: invalid length: expected length 32 for simple format, found 1" - .to_string() - ) + Err("invalid ShardId s0: malformed UUID".to_string()) ); + // Long enough and ASCII, so the `uuid` crate's own error survives. assert_eq!( ShardId::from_str("s00000000-0000-0000-0000-000000000000FOO"), Err("invalid ShardId s00000000-0000-0000-0000-000000000000FOO: invalid character: expected an optional prefix of `urn:uuid:` followed by [0-9a-fA-F-], found `O` at 38".to_string()) diff --git a/src/persist-types/src/lib.rs b/src/persist-types/src/lib.rs index 20d0f226fa27d..6b1aa21d87d3a 100644 --- a/src/persist-types/src/lib.rs +++ b/src/persist-types/src/lib.rs @@ -238,6 +238,13 @@ impl std::str::FromStr for ShardId { Some(x) => x, None => return Err(format!("invalid ShardId {}: incorrect prefix", s)), }; + // `Uuid::parse_str` panics (rather than erroring) while building its + // parse error for some malformed inputs — e.g. non-ASCII bytes make it + // slice the input on a non-char boundary. 
Reject anything that can't be + // a UUID first: the shortest valid form is 32 hex digits, all ASCII. + if uuid_encoded.len() < 32 || !uuid_encoded.is_ascii() { + return Err(format!("invalid ShardId {}: malformed UUID", s)); + } let uuid = Uuid::parse_str(uuid_encoded) .map_err(|err| format!("invalid ShardId {}: {}", s, err))?; Ok(ShardId(*uuid.as_bytes())) From 11eb9ae225015020c09541e226658b50bc0e48b6 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Thu, 4 Jun 2026 14:09:51 +0000 Subject: [PATCH 09/12] persist: Reject rollup protos with no rollups instead of panicking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every persisted state maintains the invariant that it has at least one rollup (State::latest_rollup .expect()s it). ProtoRollup::into_rust called latest_rollup while decoding, so an untrusted/corrupt rollup proto whose state carries no rollups panicked instead of being rejected — a rollup_proto_roundtrip fuzz crash (and a decode-time availability risk for a corrupt blob). Validate the invariant in from_proto and return TryFromProtoError:: InvalidPersistState when it is violated, before latest_rollup is reached (and before the decoded Rollup is later re-encoded). Adds a regression test. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/persist-client/src/internal/encoding.rs | 43 +++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index aaa8d6dd534d2..9fa76b3dddd2c 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -1102,6 +1102,16 @@ impl RustType for Rollup { collections, }; + // Every persisted state maintains the invariant that it has at least one + // rollup (see `State::latest_rollup`, which `.expect()`s this). 
A proto + // that violates it is malformed, so reject it here rather than panicking + // on `latest_rollup` below (or later, when this `Rollup` is re-encoded). + if state.collections.rollups.is_empty() { + return Err(TryFromProtoError::InvalidPersistState( + "rollup state has no rollups".into(), + )); + } + let diffs: Option = x.diffs.map(|diffs| diffs.into_rust()).transpose()?; if let Some(diffs) = &diffs { if diffs.lower != state.latest_rollup().0.next() { @@ -2255,6 +2265,39 @@ mod tests { ); } + /// A rollup proto whose state has no rollups violates the "every state has at + /// least one rollup" invariant. Decoding it must return an error rather than + /// panicking in `latest_rollup` (or later, on re-encode). Regression for a + /// rollup_proto_roundtrip fuzz crash. + #[mz_ore::test] + fn rollup_proto_without_rollups_is_rejected() { + let shard_id = ShardId::new(); + let mut state = TypedState::<(), (), u64, i64>::new( + DUMMY_BUILD_INFO.semver_version(), + shard_id, + "host".to_owned(), + 0, + ); + state.state.collections.rollups.insert( + SeqNo(1), + HollowRollup { + key: PartialRollupKey("foo".to_owned()), + encoded_size_bytes: None, + }, + ); + let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto(); + + // Strip every rollup, leaving a state that claims a seqno but no rollups. 
+ proto.rollups.clear(); + proto.deprecated_rollups.clear(); + + let result: Result, _> = proto.into_rust(); + assert!( + result.is_err(), + "decoding a rollup proto with no rollups must error, not panic" + ); + } + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) { From 8761e9c2e49ca5475ac56b4344f9aa5b15682765 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Fri, 5 Jun 2026 09:38:21 +0000 Subject: [PATCH 10/12] persist: Only reject a rollup-less state that carries diffs My earlier guard rejected any rollup proto whose state had no rollups, but that was too broad: it broke applier_version_state, which decodes a freshly initialized state that legitimately has no rollups yet. latest_rollup -- the actual panic site -- is only reached when the proto carries diffs (to validate the diff bounds against it), so move the check there. A rollup-less state without diffs decodes again, while a proto that has diffs but no rollups is still rejected instead of panicking. The regression test is updated to build the diffs-but-no-rollups case it now guards. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/persist-client/src/internal/encoding.rs | 41 ++++++++++++--------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index 9fa76b3dddd2c..52aa0786838e9 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -1102,18 +1102,18 @@ impl RustType for Rollup { collections, }; - // Every persisted state maintains the invariant that it has at least one - // rollup (see `State::latest_rollup`, which `.expect()`s this). 
A proto - // that violates it is malformed, so reject it here rather than panicking - // on `latest_rollup` below (or later, when this `Rollup` is re-encoded). - if state.collections.rollups.is_empty() { - return Err(TryFromProtoError::InvalidPersistState( - "rollup state has no rollups".into(), - )); - } - let diffs: Option = x.diffs.map(|diffs| diffs.into_rust()).transpose()?; if let Some(diffs) = &diffs { + // The diff bounds are validated against the latest rollup, which + // `State::latest_rollup` `.expect()`s to exist. A proto that carries + // diffs but no rollups is malformed, so reject it here rather than + // panicking. (A rollup-less state with no diffs — e.g. a freshly + // initialized state — is fine and must still decode.) + if state.collections.rollups.is_empty() { + return Err(TryFromProtoError::InvalidPersistState( + "rollup state has diffs but no rollups".into(), + )); + } if diffs.lower != state.latest_rollup().0.next() { return Err(TryFromProtoError::InvalidPersistState(format!( "diffs lower ({}) should match latest rollup's successor: ({})", @@ -2265,12 +2265,14 @@ mod tests { ); } - /// A rollup proto whose state has no rollups violates the "every state has at - /// least one rollup" invariant. Decoding it must return an error rather than - /// panicking in `latest_rollup` (or later, on re-encode). Regression for a + /// A rollup proto that carries diffs but whose state has no rollups is + /// malformed: the diff bounds are validated against the latest rollup, which + /// `latest_rollup` `.expect()`s to exist. Decoding it must return an error + /// rather than panicking. (A rollup-less state *without* diffs is legitimate + /// — see `applier_version_state` — and must still decode.) Regression for a /// rollup_proto_roundtrip fuzz crash. 
#[mz_ore::test] - fn rollup_proto_without_rollups_is_rejected() { + fn rollup_proto_with_diffs_but_no_rollups_is_rejected() { let shard_id = ShardId::new(); let mut state = TypedState::<(), (), u64, i64>::new( DUMMY_BUILD_INFO.semver_version(), @@ -2278,23 +2280,26 @@ mod tests { "host".to_owned(), 0, ); + // Anchor a rollup at the state's seqno so `Rollup::from` accepts the + // (empty) diff range, producing a proto that does carry diffs. + let seqno = state.state.seqno; state.state.collections.rollups.insert( - SeqNo(1), + seqno, HollowRollup { key: PartialRollupKey("foo".to_owned()), encoded_size_bytes: None, }, ); - let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto(); + let mut proto = Rollup::from(state.into(), Vec::new()).into_proto(); - // Strip every rollup, leaving a state that claims a seqno but no rollups. + // Strip every rollup, leaving a proto that has diffs but no rollups. proto.rollups.clear(); proto.deprecated_rollups.clear(); let result: Result, _> = proto.into_rust(); assert!( result.is_err(), - "decoding a rollup proto with no rollups must error, not panic" + "a rollup proto with diffs but no rollups must error, not panic" ); } From fe66f7b61d7c7500951b1e7ceb4060763a031ed7 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Wed, 10 Jun 2026 15:22:12 +0000 Subject: [PATCH 11/12] persist: Reject corrupted traces in unflatten instead of panicking A rollup's trace is decoded from an untrusted blob, but Trace::unflatten reconstructed the spine through code whose invariants are enforced by panics, so a corrupted or crafted blob could kill the decoder instead of failing with a decode error. The rollup_proto_roundtrip fuzz target hit the first of these in release qualification: legacy batches that don't tile the timeline contiguously trip Spine::insert's (always-on) contiguity assert. 
Probing the neighboring reconstruction code found the rest of the class, all reachable from crafted bytes (cargo-fuzz builds enable debug assertions and overflow checks, so the debug_asserts and arithmetic count too): * Spine::insert also asserts a non-empty time range per legacy batch. * An absurd batch len overflows the spine's maintenance arithmetic (len.next_power_of_two(), summing lens of merged batches). * An absurd spine batch level overflows `level + 1` and sizes a vec![] allocation; a level past the first batch's indexes out of bounds. * More descs than parts underflows the padding subtraction. * A spine batch whose parts don't tile its id range trips SpineBatch::id's debug_asserts. * MergeState::push_batch asserts id/desc chaining within a level, expects at most two batches per level, and that no incomplete merge is present. * Anything else invalid (e.g. a batch since past the trace since) only failed Trace::validate, which ran under debug_assert: a panic in fuzz/dev builds, silent acceptance of corrupted state in release. Validate all of this in unflatten (plus a fallible try_push_batch that push_batch now wraps) and run Trace::validate unconditionally at the end, returning decode errors. For real blobs nothing changes: every state a correct writer flattens satisfies these checks (the flatten/unflatten roundtrip proptest exercises both the structured and legacy paths), and prod treats a decode error the same as the old panic -- UntypedState::decode expects rollups to decode and State::apply_diffs panics on apply errors -- just with a diagnosable message instead of an assert deep in spine internals. Adds decode-rejection regression tests for each vector alongside the existing rollup proto tests, named after what the crafted proto violates. 
Co-Authored-By: Claude Fable 5 --- src/persist-client/src/internal/encoding.rs | 162 ++++++++++++ src/persist-client/src/internal/trace.rs | 268 +++++++++++++------- 2 files changed, 345 insertions(+), 85 deletions(-) diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index 52aa0786838e9..79bc1d7006cf4 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -1976,6 +1976,7 @@ mod tests { use mz_build_info::DUMMY_BUILD_INFO; use mz_dyncfg::ConfigUpdates; use mz_ore::assert_err; + use mz_ore::cast::CastFrom; use mz_persist::location::SeqNo; use proptest::prelude::*; @@ -2303,6 +2304,167 @@ mod tests { ); } + /// Returns a valid rollup proto whose trace was mutated by `update_trace`. + fn rollup_proto_with_trace(update_trace: impl FnOnce(&mut ProtoTrace)) -> ProtoRollup { + let state = TypedState::<(), (), u64, i64>::new( + DUMMY_BUILD_INFO.semver_version(), + ShardId::new(), + "host".to_owned(), + 0, + ); + let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto(); + update_trace(proto.trace.as_mut().expect("fresh state has a trace")); + proto + } + + fn u64_desc_proto(lower: u64, upper: u64, since: u64) -> ProtoU64Description { + Description::new( + Antichain::from_elem(lower), + Antichain::from_elem(upper), + Antichain::from_elem(since), + ) + .into_proto() + } + + fn legacy_batch_proto(lower: u64, upper: u64, since: u64) -> ProtoHollowBatch { + ProtoHollowBatch { + desc: Some(u64_desc_proto(lower, upper, since)), + ..Default::default() + } + } + + fn rollup_decode_err(proto: ProtoRollup) -> String { + let result: Result, _> = proto.into_rust(); + match result { + Ok(_) => panic!("crafted rollup proto must fail to decode"), + Err(err) => err.to_string(), + } + } + + /// The trace in a rollup proto is decoded from an untrusted blob, and + /// `Trace::unflatten` re-inserts structureless legacy batches into a + /// spine whose (always-on) 
asserts require them to tile the timeline + /// contiguously from the minimum frontier. A batch that doesn't must fail + /// decoding instead. Regression for a rollup_proto_roundtrip fuzz crash + /// (crash-8603829ee2a3b9c28ee988a14136050d1afe984c). + #[mz_ore::test] + fn rollup_proto_with_noncontiguous_legacy_batches_is_rejected() { + let proto = rollup_proto_with_trace(|trace| { + trace.legacy_batches.push(legacy_batch_proto(39, 40, 0)); + }); + let err = rollup_decode_err(proto); + assert!(err.contains("legacy batch lower"), "{err}"); + } + + /// Like [rollup_proto_with_noncontiguous_legacy_batches_is_rejected], but + /// for `Spine::insert`'s other assert: a legacy batch with an empty time + /// range. + #[mz_ore::test] + fn rollup_proto_with_empty_range_legacy_batch_is_rejected() { + let proto = rollup_proto_with_trace(|trace| { + trace.legacy_batches.push(legacy_batch_proto(0, 0, 0)); + }); + let err = rollup_decode_err(proto); + assert!(err.contains("empty time range"), "{err}"); + } + + /// A batch whose since is past the trace's since reconstructs without + /// tripping any spine assert but violates `Trace::validate`, which used + /// to run only under `debug_assert` (a panic in cargo-fuzz builds, silent + /// acceptance of corrupted state in release builds). It must be a decode + /// error. + #[mz_ore::test] + fn rollup_proto_with_batch_since_past_trace_since_is_rejected() { + let proto = rollup_proto_with_trace(|trace| { + trace.legacy_batches.push(legacy_batch_proto(0, 1, 5)); + }); + let err = rollup_decode_err(proto); + assert!(err.contains("past the spine since"), "{err}"); + } + + /// An absurd batch len overflows the spine's maintenance arithmetic + /// (`len.next_power_of_two()`, summing lens of merged batches), which + /// panics in builds with overflow checks. It must be a decode error. 
+ #[mz_ore::test] + fn rollup_proto_with_absurd_batch_len_is_rejected() { + let proto = rollup_proto_with_trace(|trace| { + trace.legacy_batches.push(ProtoHollowBatch { + desc: Some(u64_desc_proto(0, 1, 0)), + len: u64::MAX, + ..Default::default() + }); + }); + let err = rollup_decode_err(proto); + assert!(err.contains("maximum trace size"), "{err}"); + } + + /// An absurd spine batch level previously overflowed `level + 1` (a panic + /// in builds with overflow checks) and sized a `vec![]` allocation (an + /// abort). It must be a decode error. + #[mz_ore::test] + fn rollup_proto_with_absurd_spine_level_is_rejected() { + let proto = rollup_proto_with_trace(|trace| { + trace.spine_batches.push(ProtoIdSpineBatch { + id: Some(SpineId(0, 1).into_proto()), + batch: Some(ProtoSpineBatch { + level: u64::MAX, + desc: Some(u64_desc_proto(0, 1, 0)), + parts: vec![], + descs: vec![], + }), + }); + }); + let err = rollup_decode_err(proto); + assert!(err.contains("exceeds the maximum"), "{err}"); + } + + /// A spine batch whose parts don't tile its id range (here: no parts at + /// all) previously tripped the `debug_assert`s in `SpineBatch::id`. It + /// must be a decode error. + #[mz_ore::test] + fn rollup_proto_with_partless_spine_batch_is_rejected() { + let proto = rollup_proto_with_trace(|trace| { + trace.spine_batches.push(ProtoIdSpineBatch { + id: Some(SpineId(0, 1).into_proto()), + batch: Some(ProtoSpineBatch { + level: 0, + desc: Some(u64_desc_proto(0, 1, 0)), + parts: vec![], + descs: vec![], + }), + }); + }); + let err = rollup_decode_err(proto); + assert!(err.contains("do not cover"), "{err}"); + } + + /// Three spine batches at one level overflow the two-batch layer, which + /// `MergeState::push_batch` previously `expect`ed against. It must be a + /// decode error. 
+ #[mz_ore::test] + fn rollup_proto_with_overfull_spine_level_is_rejected() { + let proto = rollup_proto_with_trace(|trace| { + for i in 0..3u64 { + let id = SpineId(usize::cast_from(i), usize::cast_from(i + 1)); + trace.spine_batches.push(ProtoIdSpineBatch { + id: Some(id.into_proto()), + batch: Some(ProtoSpineBatch { + level: 0, + desc: Some(u64_desc_proto(i, i + 1, 0)), + parts: vec![id.into_proto()], + descs: vec![], + }), + }); + trace.hollow_batches.push(ProtoIdHollowBatch { + id: Some(id.into_proto()), + batch: Some(legacy_batch_proto(i, i + 1, 0)), + }); + } + }); + let err = rollup_decode_err(proto); + assert!(err.contains("full layer"), "{err}"); + } + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) { diff --git a/src/persist-client/src/internal/trace.rs b/src/persist-client/src/internal/trace.rs index 6dc55064fbe69..90501fe6cbe1e 100644 --- a/src/persist-client/src/internal/trace.rs +++ b/src/persist-client/src/internal/trace.rs @@ -287,6 +287,25 @@ impl Trace { // we know to preserve the structure for this trace. let roundtrip_structure = !spine_batches.is_empty() || legacy_batches.is_empty(); + // The flattened trace is decoded from an untrusted blob: any invariant + // a crafted or corrupted value can violate must surface as a decode + // error here, never as a panic in the spine code below. + // + // Bound the total logical len of all batches. The spine's maintenance + // arithmetic (`len.next_power_of_two()`, summing lens of merged + // batches) overflows on absurd lens, and no real trace has anywhere + // near this many updates. 
+ const MAX_TOTAL_LEN: usize = usize::MAX >> 3; + let mut total_len = 0usize; + for batch in legacy_batches.keys().chain(hollow_batches.values()) { + total_len = total_len + .checked_add(batch.len) + .filter(|len| *len <= MAX_TOTAL_LEN) + .ok_or_else(|| { + format!("total len of batches exceeds the maximum trace size: {batch:?}") + })?; + } + // We need to look up legacy batches somehow, but we don't have a spine id for them. // Instead, we rely on the fact that the spine must store them in antichain order. // Our timestamp type may not be totally ordered, so we need to implement our own comparator @@ -304,101 +323,123 @@ impl Trace { let mut legacy_batches: Vec<_> = legacy_batches.into_iter().map(|(k, _)| k).collect(); legacy_batches.sort_by(|a, b| compare_chains(a.desc.lower(), b.desc.lower()).reverse()); - let mut pop_batch = - |id: SpineId, expected_desc: Option<&Description>| -> Result<_, String> { - if let Some(batch) = hollow_batches.remove(&id) { - if let Some(desc) = expected_desc { - // We don't expect the desc's upper and lower to change for a given spine id. - assert_eq!(desc.lower(), batch.desc.lower()); - assert_eq!(desc.upper(), batch.desc.upper()); - // Due to the way thin spine batches are diffed, the sinces can be out of sync. - // This should be rare, and hopefully impossible once we change how diffs work. - if desc.since() != batch.desc.since() { - warn!( - "unexpected since out of sync for spine batch: {:?} != {:?}", - desc.since().elements(), - batch.desc.since().elements() - ); - } + let mut pop_batch = |id: SpineId, + expected_desc: Option<&Description>| + -> Result<_, String> { + if let Some(batch) = hollow_batches.remove(&id) { + if let Some(desc) = expected_desc { + // We don't expect the desc's upper and lower to change for a given spine id. 
+ if desc.lower() != batch.desc.lower() || desc.upper() != batch.desc.upper() { + return Err(format!( + "hollow batch desc {:?} did not match the spine batch desc {:?} for {id:?}", + batch.desc, desc + )); + } + // Due to the way thin spine batches are diffed, the sinces can be out of sync. + // This should be rare, and hopefully impossible once we change how diffs work. + if desc.since() != batch.desc.since() { + warn!( + "unexpected since out of sync for spine batch: {:?} != {:?}", + desc.since().elements(), + batch.desc.since().elements() + ); } - return Ok(IdHollowBatch { id, batch }); } - let mut batch = legacy_batches - .pop() - .ok_or_else(|| format!("missing referenced hollow batch {id:?}"))?; + return Ok(IdHollowBatch { id, batch }); + } + let mut batch = legacy_batches + .pop() + .ok_or_else(|| format!("missing referenced hollow batch {id:?}"))?; - let Some(expected_desc) = expected_desc else { - return Ok(IdHollowBatch { id, batch }); - }; + let Some(expected_desc) = expected_desc else { + return Ok(IdHollowBatch { id, batch }); + }; - if expected_desc.lower() != batch.desc.lower() { - return Err(format!( - "hollow batch lower {:?} did not match expected lower {:?}", - batch.desc.lower().elements(), - expected_desc.lower().elements() - )); - } + if expected_desc.lower() != batch.desc.lower() { + return Err(format!( + "hollow batch lower {:?} did not match expected lower {:?}", + batch.desc.lower().elements(), + expected_desc.lower().elements() + )); + } - // Empty legacy batches are not deterministic: different nodes may split them up - // in different ways. For now, we rearrange them such to match the spine data. - if batch.parts.is_empty() && batch.run_splits.is_empty() && batch.len == 0 { - let mut new_upper = batch.desc.upper().clone(); - - // While our current batch is too small, and there's another empty batch - // in the list, roll it in. 
- while PartialOrder::less_than(&new_upper, expected_desc.upper()) { - let Some(next_batch) = legacy_batches.pop() else { - break; - }; - if next_batch.is_empty() { - new_upper.clone_from(next_batch.desc.upper()); - } else { - legacy_batches.push(next_batch); - break; - } - } + // Empty legacy batches are not deterministic: different nodes may split them up + // in different ways. For now, we rearrange them such to match the spine data. + if batch.parts.is_empty() && batch.run_splits.is_empty() && batch.len == 0 { + let mut new_upper = batch.desc.upper().clone(); - // If our current batch is too large, split it by the expected upper - // and preserve the remainder. - if PartialOrder::less_than(expected_desc.upper(), &new_upper) { - legacy_batches.push(Arc::new(HollowBatch::empty(Description::new( - expected_desc.upper().clone(), - new_upper.clone(), - batch.desc.since().clone(), - )))); - new_upper.clone_from(expected_desc.upper()); + // While our current batch is too small, and there's another empty batch + // in the list, roll it in. + while PartialOrder::less_than(&new_upper, expected_desc.upper()) { + let Some(next_batch) = legacy_batches.pop() else { + break; + }; + if next_batch.is_empty() { + new_upper.clone_from(next_batch.desc.upper()); + } else { + legacy_batches.push(next_batch); + break; } - batch = Arc::new(HollowBatch::empty(Description::new( - batch.desc.lower().clone(), - new_upper, - batch.desc.since().clone(), - ))) } - if expected_desc.upper() != batch.desc.upper() { - return Err(format!( - "hollow batch upper {:?} did not match expected upper {:?}", - batch.desc.upper().elements(), - expected_desc.upper().elements() - )); + // If our current batch is too large, split it by the expected upper + // and preserve the remainder. 
+ if PartialOrder::less_than(expected_desc.upper(), &new_upper) { + legacy_batches.push(Arc::new(HollowBatch::empty(Description::new( + expected_desc.upper().clone(), + new_upper.clone(), + batch.desc.since().clone(), + )))); + new_upper.clone_from(expected_desc.upper()); } + batch = Arc::new(HollowBatch::empty(Description::new( + batch.desc.lower().clone(), + new_upper, + batch.desc.since().clone(), + ))) + } - Ok(IdHollowBatch { id, batch }) - }; + if expected_desc.upper() != batch.desc.upper() { + return Err(format!( + "hollow batch upper {:?} did not match expected upper {:?}", + batch.desc.upper().elements(), + expected_desc.upper().elements() + )); + } + + Ok(IdHollowBatch { id, batch }) + }; let (upper, next_id) = if let Some((id, batch)) = spine_batches.last_key_value() { (batch.desc.upper().clone(), id.1) } else { (Antichain::from_elem(T::minimum()), 0) }; + // Real spine levels are logarithmic in the total len of the trace, so + // this bound is far above any legitimate level while keeping the + // allocation below trivial. + const MAX_LEVELS: usize = 256; let levels = spine_batches .first_key_value() - .map(|(_, batch)| batch.level + 1) + .map(|(_, batch)| batch.level.saturating_add(1)) .unwrap_or(0); + if levels > MAX_LEVELS { + return Err(format!( + "spine level {} exceeds the maximum {MAX_LEVELS}", + levels - 1 + )); + } let mut merging = vec![MergeState::default(); levels]; for (id, batch) in spine_batches { let level = batch.level; + if batch.descs.len() > batch.parts.len() { + return Err(format!( + "spine batch {id:?} has more descs ({}) than parts ({})", + batch.descs.len(), + batch.parts.len() + )); + } let descs = batch.descs.iter().map(Some).chain(std::iter::repeat_n( None, batch.parts.len() - batch.descs.len(), @@ -409,6 +450,16 @@ impl Trace { .zip_eq(descs) .map(|(id, desc)| pop_batch(id, desc)) .collect::, _>>()?; + // A spine batch's parts tile its id range (`SpineBatch::id` + // `debug_assert`s this). 
Real batches always have at least one + // part: an empty batch still carries an empty hollow batch. + if parts.first().map(|x| x.id.0) != Some(id.0) + || parts.last().map(|x| x.id.1) != Some(id.1) + { + return Err(format!( + "spine batch {id:?} parts do not cover the batch's id range" + )); + } let len = parts.iter().map(|p| (*p).batch.len).sum(); let active_compaction = merges.remove(&id).and_then(|m| m.active_compaction); let batch = SpineBatch { @@ -419,9 +470,11 @@ impl Trace { len, }; - let state = &mut merging[level]; + let state = merging.get_mut(level).ok_or_else(|| { + format!("spine batch {id:?} level {level} out of bounds ({levels} levels)") + })?; - state.push_batch(batch); + state.try_push_batch(batch)?; if let Some(id) = state.id() { if let Some(merge) = merges.remove(&id) { state.merge = Some(IdFuelingMerge { @@ -459,13 +512,36 @@ impl Trace { } else { // If the structure wasn't actually serialized, we may have legacy batches left over. for batch in legacy_batches.into_iter().rev() { + // `Spine::insert` asserts that pushed batches are non-empty + // and contiguous; check this here so that a corrupted batch + // results in a decode error instead of a panic. 
+ if batch.desc.lower() == batch.desc.upper() { + return Err(format!( + "legacy batch has an empty time range: {:?}", + batch.desc + )); + } + if batch.desc.lower() != trace.upper() { + return Err(format!( + "legacy batch lower {:?} does not match the trace upper {:?}", + batch.desc.lower().elements(), + trace.upper().elements() + )); + } trace.push_batch_no_merge_reqs(Arc::unwrap_or_clone(batch)); } } check_empty("hollow batches", hollow_batches.len())?; check_empty("merges", merges.len())?; - debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace); + // The same check that's `debug_assert`ed when mutating a trace we + // built ourselves; for a trace reconstructed from untrusted data it + // must be a hard error, both to keep corrupted state from being used + // and because the write side would panic on it anyway (e.g. `Spine`'s + // batch invariants and the full-level/merge correspondence). + trace + .validate() + .map_err(|err| format!("reconstructed trace failed validation: {err}"))?; Ok(trace) } @@ -2170,19 +2246,41 @@ impl MergeState { /// Push a new batch at this level, checking invariants. fn push_batch(&mut self, batch: SpineBatch) { + self.try_push_batch(batch) + .unwrap_or_else(|err| panic!("invalid batch push: {err}")); + } + + /// Fallible version of [Self::push_batch], for [Trace::unflatten], where + /// the batches were decoded from an untrusted blob and a violated + /// invariant must be a decode error rather than a panic. 
+ fn try_push_batch(&mut self, batch: SpineBatch) -> Result<(), String> { if let Some(last) = self.batches.last() { - assert_eq!(last.id().1, batch.id().0); - assert_eq!(last.upper(), batch.lower()); + if last.id().1 != batch.id().0 { + return Err(format!( + "batch id {:?} does not chain with the previous id {:?}", + batch.id(), + last.id() + )); + } + if last.upper() != batch.lower() { + return Err(format!( + "batch lower {:?} does not match the previous upper {:?}", + batch.lower(), + last.upper() + )); + } } - assert!( - self.merge.is_none(), - "Attempted to insert batch into incomplete merge! (batch={:?}, batch_count={})", - batch.id, - self.batches.len(), - ); - self.batches - .try_push(batch) - .expect("Attempted to insert batch into full layer!"); + if self.merge.is_some() { + return Err(format!( + "attempted to insert batch into incomplete merge! (batch={:?}, batch_count={})", + batch.id, + self.batches.len(), + )); + } + if self.batches.try_push(batch).is_err() { + return Err("attempted to insert batch into full layer!".to_string()); + } + Ok(()) } /// The number of actual updates contained in the level. From 2e10d1fcb0e944d3527eaf6d8e7577e39689af56 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Mon, 15 Jun 2026 07:46:12 +0000 Subject: [PATCH 12/12] persist: Validate full part tiling in unflatten, not just endpoints Trace::unflatten's check that a spine batch's reconstructed parts cover its id range only verified the endpoints: the first part starts at the batch id's lower and the last ends at its upper. A crafted rollup could encode an outer id [0, 3) tiled by non-contiguous parts [0, 2) and [1, 3): the endpoint check passes, and Trace::validate also passes because it inspects the outer SpineBatch ids and descriptions, never the adjacency of a SpineBatch's parts. That malformed trace would be accepted at decode and panic later in normal maintenance instead. 
fueled_merge_reqs_before_ms emits a FueledMergeReq from the accepted parts, and Compactor::chunk_runs / compact_all (plus the id-range merge path in apply_merge_res_checked) call id_range on those ids, whose adjacency check is an assert_eq!. So the hardening still converted a corrupted durable rollup into a process panic for this invariant. Validate the full tiling in unflatten: require adjacent parts to be contiguous in addition to the existing endpoint check, so non-adjacent parts fail with a decode error. Adds a decode-rejection regression test for the overlapping-parts vector alongside the existing partless-spine-batch one. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/persist-client/src/internal/encoding.rs | 32 ++++++++++++++++++++- src/persist-client/src/internal/trace.rs | 13 +++++++-- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index 79bc1d7006cf4..18fcefa6fe531 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -2435,7 +2435,37 @@ mod tests { }); }); let err = rollup_decode_err(proto); - assert!(err.contains("do not cover"), "{err}"); + assert!(err.contains("do not tile"), "{err}"); + } + + /// A spine batch whose parts hit the right endpoints but overlap in the + /// middle (id `[0, 3)` tiled by parts `[0, 2)` and `[1, 3)`) passes the + /// endpoint check but is not a valid tiling. Such parts reach compaction's + /// `id_range`, whose contiguity `assert_eq!` would panic later, so decoding + /// must reject them here instead. 
+ #[mz_ore::test] + fn rollup_proto_with_noncontiguous_spine_parts_is_rejected() { + let outer = SpineId(0, 3); + let part_ids = [SpineId(0, 2), SpineId(1, 3)]; + let proto = rollup_proto_with_trace(|trace| { + trace.spine_batches.push(ProtoIdSpineBatch { + id: Some(outer.into_proto()), + batch: Some(ProtoSpineBatch { + level: 0, + desc: Some(u64_desc_proto(0, 3, 0)), + parts: part_ids.iter().map(|id| id.into_proto()).collect(), + descs: vec![], + }), + }); + for id in part_ids { + trace.hollow_batches.push(ProtoIdHollowBatch { + id: Some(id.into_proto()), + batch: Some(legacy_batch_proto(0, 1, 0)), + }); + } + }); + let err = rollup_decode_err(proto); + assert!(err.contains("do not tile"), "{err}"); } /// Three spine batches at one level overflow the two-batch layer, which diff --git a/src/persist-client/src/internal/trace.rs b/src/persist-client/src/internal/trace.rs index 90501fe6cbe1e..790e96b443393 100644 --- a/src/persist-client/src/internal/trace.rs +++ b/src/persist-client/src/internal/trace.rs @@ -451,13 +451,20 @@ impl Trace { .map(|(id, desc)| pop_batch(id, desc)) .collect::, _>>()?; // A spine batch's parts tile its id range (`SpineBatch::id` - // `debug_assert`s this). Real batches always have at least one - // part: an empty batch still carries an empty hollow batch. + // `debug_assert`s the endpoints). Real batches always have at least + // one part: an empty batch still carries an empty hollow batch. + // Validate the full tiling, not just the endpoints: downstream + // maintenance (`fueled_merge_reqs_before_ms` -> `id_range` in + // compaction, `apply_merge_res_checked`) `assert_eq!`s that the + // collected part ids are contiguous, so non-adjacent parts that + // happen to hit the right endpoints would panic later instead of + // here. 
if parts.first().map(|x| x.id.0) != Some(id.0) || parts.last().map(|x| x.id.1) != Some(id.1) + || parts.windows(2).any(|w| w[0].id.1 != w[1].id.0) { return Err(format!( - "spine batch {id:?} parts do not cover the batch's id range" + "spine batch {id:?} parts do not tile the batch's id range" )); } let len = parts.iter().map(|p| (*p).batch.len).sum();