diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index b4599bd02e6ce..18fcefa6fe531 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -19,7 +19,7 @@ use bytes::{Buf, Bytes}; use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::Description; use mz_ore::cast::CastInto; -use mz_ore::{assert_none, halt, soft_panic_or_log}; +use mz_ore::{halt, soft_panic_or_log}; use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates}; use mz_persist::location::{SeqNo, VersionedData}; use mz_persist::metrics::ColumnarMetrics; @@ -295,6 +295,13 @@ pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result< Some(x) => x, None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)), }; + // `Uuid::parse_str` panics (rather than erroring) while building its parse + // error for some malformed inputs — e.g. non-ASCII bytes make it slice the + // input on a non-char boundary. Reject anything that can't be a UUID first: + // the shortest valid form is 32 hex digits, all ASCII. + if uuid_encoded.len() < 32 || !uuid_encoded.is_ascii() { + return Err(format!("invalid {} {}: malformed UUID", id_type, encoded)); + } let uuid = Uuid::parse_str(uuid_encoded) .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?; Ok(*uuid.as_bytes()) @@ -512,7 +519,13 @@ impl RustType for StateDiff { proto.latest_rollup_key.into_rust()?, ); if let Some(field_diffs) = proto.field_diffs { - debug_assert_eq!(field_diffs.validate(), Ok(())); + // `field_diffs` is decoded from an untrusted blob, so validate it and + // return a decode error on a malformed/crafted diff rather than a + // `debug_assert` (which panics under debug assertions / fuzzing and + // is compiled out in release, where the bad data flowed through). + field_diffs + .validate() + .map_err(TryFromProtoError::InvalidPersistState)?; for field_diff in field_diffs.iter() { let (field, diff) = field_diff?; match field { @@ -1091,6 +1104,16 @@ impl RustType for Rollup { let diffs: Option = x.diffs.map(|diffs| diffs.into_rust()).transpose()?; if let Some(diffs) = &diffs { + // The diff bounds are validated against the latest rollup, which + // `State::latest_rollup` `.expect()`s to exist. A proto that carries + // diffs but no rollups is malformed, so reject it here rather than + // panicking. (A rollup-less state with no diffs — e.g. a freshly + // initialized state — is fine and must still decode.) + if state.collections.rollups.is_empty() { + return Err(TryFromProtoError::InvalidPersistState( + "rollup state has diffs but no rollups".into(), + )); + } if diffs.lower != state.latest_rollup().0.next() { return Err(TryFromProtoError::InvalidPersistState(format!( "diffs lower ({}) should match latest rollup's successor: ({})", @@ -1657,10 +1680,21 @@ impl RustType for BatchPart { })) } Some(proto_hollow_batch_part::Kind::Inline(x)) => { - assert_eq!(proto.encoded_size_bytes, 0); - assert_eq!(proto.key_lower.len(), 0); - assert_none!(proto.key_stats); - assert_none!(proto.diffs_sum); + // An inline part carries its data in `kind`; the hollow-only + // fields must be unset. These are decoded from an untrusted + // blob, so validate and return a decode error rather than + // asserting (which panicked, even in release, on a + // malformed/crafted part). Found by the rollup_proto_roundtrip + // cargo-fuzz target. + if proto.encoded_size_bytes != 0 + || !proto.key_lower.is_empty() + || proto.key_stats.is_some() + || proto.diffs_sum.is_some() + { + return Err(TryFromProtoError::InvalidPersistState( + "inline ProtoHollowBatchPart has hollow-part fields set".into(), + )); + } let updates = LazyInlineBatchPart(x.into_rust()?); Ok(BatchPart::Inline { updates, @@ -1896,8 +1930,17 @@ impl RustType for Description { } fn from_proto(proto: ProtoU64Description) -> Result { + let lower: Antichain = proto.lower.into_rust_if_some("lower")?; + // `Description::new` asserts a non-empty lower frontier. `lower` is + // decoded from an untrusted blob, so validate it here and return a + // decode error rather than panicking on a crafted/corrupted batch. + if lower.elements().is_empty() { + return Err(TryFromProtoError::InvalidPersistState( + "ProtoU64Description has an empty lower frontier".into(), + )); + } Ok(Description::new( - proto.lower.into_rust_if_some("lower")?, + lower, proto.upper.into_rust_if_some("upper")?, proto.since.into_rust_if_some("since")?, )) @@ -1927,10 +1970,13 @@ impl RustType for Antichain { #[cfg(test)] mod tests { + use mz_ore::assert_none; + use bytes::Bytes; use mz_build_info::DUMMY_BUILD_INFO; use mz_dyncfg::ConfigUpdates; use mz_ore::assert_err; + use mz_ore::cast::CastFrom; use mz_persist::location::SeqNo; use proptest::prelude::*; @@ -1943,6 +1989,58 @@ mod tests { use super::*; + #[mz_ore::test] + fn parse_id_rejects_malformed_uuid_without_panicking() { + // `Uuid::parse_str` panics (rather than erroring) while building its + // parse error for some malformed inputs — e.g. non-ASCII bytes make it + // slice the input on a non-char boundary. `parse_id` must turn these + // into clean errors. Regression for the rollup_proto_roundtrip finding. + let nonascii = "é".repeat(40); + let weird = format!("w{nonascii}"); + for encoded in ["w", "wnotauuid", "w{}", weird.as_str()] { + assert!( + parse_id("w", "WriterId", encoded).is_err(), + "{encoded:?} should error, not panic", + ); + } + } + + #[mz_ore::test] + fn rollup_inline_batch_part_with_hollow_fields_is_error() { + // An inline `ProtoHollowBatchPart` carrying hollow-only fields (here a + // non-zero encoded_size_bytes) must decode to an error, not panic — it + // used to `assert!`, which fired even in release on a crafted/corrupted + // blob. Regression for the rollup_proto_roundtrip cargo-fuzz finding. + use mz_proto::ProtoType; + use prost::Message; + let bytes: &[u8] = &[ + 0x3a, 0x12, 0x0a, 0x00, 0x12, 0x0a, 0x22, 0x06, 0x5a, 0x00, 0x10, 0x02, 0x2a, 0x00, + 0x22, 0x00, 0x22, 0x00, 0x22, 0x00, + ]; + let proto = crate::internal::state::ProtoRollup::decode(bytes) + .expect("crash input decodes as a proto"); + let result: Result, _> = proto.into_rust(); + assert_err!(result); + } + + #[mz_ore::test] + fn rollup_batch_with_empty_lower_frontier_is_error() { + // A `ProtoU64Description` with an empty lower frontier must decode to an + // error: `Description::new` asserts a non-empty lower, so a crafted batch + // used to panic. Regression for the rollup_proto_roundtrip cargo-fuzz + // finding. + use mz_proto::ProtoType; + use prost::Message; + let bytes: &[u8] = &[ + 0x3a, 0x12, 0x0a, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x0a, 0x00, 0x12, 0x00, 0x1a, 0x00, + 0x12, 0x00, 0x32, 0x00, 0x22, 0x00, + ]; + let proto = crate::internal::state::ProtoRollup::decode(bytes) + .expect("crash input decodes as a proto"); + let result: Result, _> = proto.into_rust(); + assert_err!(result); + } + #[mz_ore::test] fn metadata_map() { const COUNT: MetadataKey = MetadataKey::new("count"); @@ -2168,6 +2266,235 @@ mod tests { ); } + /// A rollup proto that carries diffs but whose state has no rollups is + /// malformed: the diff bounds are validated against the latest rollup, which + /// `latest_rollup` `.expect()`s to exist. Decoding it must return an error + /// rather than panicking. (A rollup-less state *without* diffs is legitimate + /// — see `applier_version_state` — and must still decode.) Regression for a + /// rollup_proto_roundtrip fuzz crash. + #[mz_ore::test] + fn rollup_proto_with_diffs_but_no_rollups_is_rejected() { + let shard_id = ShardId::new(); + let mut state = TypedState::<(), (), u64, i64>::new( + DUMMY_BUILD_INFO.semver_version(), + shard_id, + "host".to_owned(), + 0, + ); + // Anchor a rollup at the state's seqno so `Rollup::from` accepts the + // (empty) diff range, producing a proto that does carry diffs. + let seqno = state.state.seqno; + state.state.collections.rollups.insert( + seqno, + HollowRollup { + key: PartialRollupKey("foo".to_owned()), + encoded_size_bytes: None, + }, + ); + let mut proto = Rollup::from(state.into(), Vec::new()).into_proto(); + + // Strip every rollup, leaving a proto that has diffs but no rollups. + proto.rollups.clear(); + proto.deprecated_rollups.clear(); + + let result: Result, _> = proto.into_rust(); + assert!( + result.is_err(), + "a rollup proto with diffs but no rollups must error, not panic" + ); + } + + /// Returns a valid rollup proto whose trace was mutated by `update_trace`. + fn rollup_proto_with_trace(update_trace: impl FnOnce(&mut ProtoTrace)) -> ProtoRollup { + let state = TypedState::<(), (), u64, i64>::new( + DUMMY_BUILD_INFO.semver_version(), + ShardId::new(), + "host".to_owned(), + 0, + ); + let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto(); + update_trace(proto.trace.as_mut().expect("fresh state has a trace")); + proto + } + + fn u64_desc_proto(lower: u64, upper: u64, since: u64) -> ProtoU64Description { + Description::new( + Antichain::from_elem(lower), + Antichain::from_elem(upper), + Antichain::from_elem(since), + ) + .into_proto() + } + + fn legacy_batch_proto(lower: u64, upper: u64, since: u64) -> ProtoHollowBatch { + ProtoHollowBatch { + desc: Some(u64_desc_proto(lower, upper, since)), + ..Default::default() + } + } + + fn rollup_decode_err(proto: ProtoRollup) -> String { + let result: Result, _> = proto.into_rust(); + match result { + Ok(_) => panic!("crafted rollup proto must fail to decode"), + Err(err) => err.to_string(), + } + } + + /// The trace in a rollup proto is decoded from an untrusted blob, and + /// `Trace::unflatten` re-inserts structureless legacy batches into a + /// spine whose (always-on) asserts require them to tile the timeline + /// contiguously from the minimum frontier. A batch that doesn't must fail + /// decoding instead. Regression for a rollup_proto_roundtrip fuzz crash + /// (crash-8603829ee2a3b9c28ee988a14136050d1afe984c). + #[mz_ore::test] + fn rollup_proto_with_noncontiguous_legacy_batches_is_rejected() { + let proto = rollup_proto_with_trace(|trace| { + trace.legacy_batches.push(legacy_batch_proto(39, 40, 0)); + }); + let err = rollup_decode_err(proto); + assert!(err.contains("legacy batch lower"), "{err}"); + } + + /// Like [rollup_proto_with_noncontiguous_legacy_batches_is_rejected], but + /// for `Spine::insert`'s other assert: a legacy batch with an empty time + /// range. + #[mz_ore::test] + fn rollup_proto_with_empty_range_legacy_batch_is_rejected() { + let proto = rollup_proto_with_trace(|trace| { + trace.legacy_batches.push(legacy_batch_proto(0, 0, 0)); + }); + let err = rollup_decode_err(proto); + assert!(err.contains("empty time range"), "{err}"); + } + + /// A batch whose since is past the trace's since reconstructs without + /// tripping any spine assert but violates `Trace::validate`, which used + /// to run only under `debug_assert` (a panic in cargo-fuzz builds, silent + /// acceptance of corrupted state in release builds). It must be a decode + /// error. + #[mz_ore::test] + fn rollup_proto_with_batch_since_past_trace_since_is_rejected() { + let proto = rollup_proto_with_trace(|trace| { + trace.legacy_batches.push(legacy_batch_proto(0, 1, 5)); + }); + let err = rollup_decode_err(proto); + assert!(err.contains("past the spine since"), "{err}"); + } + + /// An absurd batch len overflows the spine's maintenance arithmetic + /// (`len.next_power_of_two()`, summing lens of merged batches), which + /// panics in builds with overflow checks. It must be a decode error. + #[mz_ore::test] + fn rollup_proto_with_absurd_batch_len_is_rejected() { + let proto = rollup_proto_with_trace(|trace| { + trace.legacy_batches.push(ProtoHollowBatch { + desc: Some(u64_desc_proto(0, 1, 0)), + len: u64::MAX, + ..Default::default() + }); + }); + let err = rollup_decode_err(proto); + assert!(err.contains("maximum trace size"), "{err}"); + } + + /// An absurd spine batch level previously overflowed `level + 1` (a panic + /// in builds with overflow checks) and sized a `vec![]` allocation (an + /// abort). It must be a decode error. + #[mz_ore::test] + fn rollup_proto_with_absurd_spine_level_is_rejected() { + let proto = rollup_proto_with_trace(|trace| { + trace.spine_batches.push(ProtoIdSpineBatch { + id: Some(SpineId(0, 1).into_proto()), + batch: Some(ProtoSpineBatch { + level: u64::MAX, + desc: Some(u64_desc_proto(0, 1, 0)), + parts: vec![], + descs: vec![], + }), + }); + }); + let err = rollup_decode_err(proto); + assert!(err.contains("exceeds the maximum"), "{err}"); + } + + /// A spine batch whose parts don't tile its id range (here: no parts at + /// all) previously tripped the `debug_assert`s in `SpineBatch::id`. It + /// must be a decode error. + #[mz_ore::test] + fn rollup_proto_with_partless_spine_batch_is_rejected() { + let proto = rollup_proto_with_trace(|trace| { + trace.spine_batches.push(ProtoIdSpineBatch { + id: Some(SpineId(0, 1).into_proto()), + batch: Some(ProtoSpineBatch { + level: 0, + desc: Some(u64_desc_proto(0, 1, 0)), + parts: vec![], + descs: vec![], + }), + }); + }); + let err = rollup_decode_err(proto); + assert!(err.contains("do not tile"), "{err}"); + } + + /// A spine batch whose parts hit the right endpoints but overlap in the + /// middle (id `[0, 3)` tiled by parts `[0, 2)` and `[1, 3)`) passes the + /// endpoint check but is not a valid tiling. Such parts reach compaction's + /// `id_range`, whose contiguity `assert_eq!` would panic later, so decoding + /// must reject them here instead. + #[mz_ore::test] + fn rollup_proto_with_noncontiguous_spine_parts_is_rejected() { + let outer = SpineId(0, 3); + let part_ids = [SpineId(0, 2), SpineId(1, 3)]; + let proto = rollup_proto_with_trace(|trace| { + trace.spine_batches.push(ProtoIdSpineBatch { + id: Some(outer.into_proto()), + batch: Some(ProtoSpineBatch { + level: 0, + desc: Some(u64_desc_proto(0, 3, 0)), + parts: part_ids.iter().map(|id| id.into_proto()).collect(), + descs: vec![], + }), + }); + for id in part_ids { + trace.hollow_batches.push(ProtoIdHollowBatch { + id: Some(id.into_proto()), + batch: Some(legacy_batch_proto(0, 1, 0)), + }); + } + }); + let err = rollup_decode_err(proto); + assert!(err.contains("do not tile"), "{err}"); + } + + /// Three spine batches at one level overflow the two-batch layer, which + /// `MergeState::push_batch` previously `expect`ed against. It must be a + /// decode error. + #[mz_ore::test] + fn rollup_proto_with_overfull_spine_level_is_rejected() { + let proto = rollup_proto_with_trace(|trace| { + for i in 0..3u64 { + let id = SpineId(usize::cast_from(i), usize::cast_from(i + 1)); + trace.spine_batches.push(ProtoIdSpineBatch { + id: Some(id.into_proto()), + batch: Some(ProtoSpineBatch { + level: 0, + desc: Some(u64_desc_proto(i, i + 1, 0)), + parts: vec![id.into_proto()], + descs: vec![], + }), + }); + trace.hollow_batches.push(ProtoIdHollowBatch { + id: Some(id.into_proto()), + batch: Some(legacy_batch_proto(i, i + 1, 0)), + }); + } + }); + let err = rollup_decode_err(proto); + assert!(err.contains("full layer"), "{err}"); + } + #[mz_persist_proc::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) { diff --git a/src/persist-client/src/internal/state_diff.rs b/src/persist-client/src/internal/state_diff.rs index 1be97db11bc5e..3a4a67c684e1e 100644 --- a/src/persist-client/src/internal/state_diff.rs +++ b/src/persist-client/src/internal/state_diff.rs @@ -1360,6 +1360,27 @@ mod tests { use super::*; + #[mz_ore::test] + #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function on OS `linux` + fn proto_state_diff_invalid_field_diffs_is_error() { + // A `ProtoStateDiff` decoded from an untrusted blob whose field diffs + // fail `validate()` must convert to an error, not panic. We previously + // `debug_assert`ed validity, which panicked under debug assertions / + // fuzzing. Regression for the state_diff_proto_roundtrip cargo-fuzz + // finding. + use crate::internal::state::ProtoStateDiff; + use mz_proto::ProtoType; + use prost::Message; + + let bytes: &[u8] = &[0x2a, 0x04, 0x08, 0x00, 0x68, 0x00, 0x40, 0x48]; + let proto = ProtoStateDiff::decode(bytes).expect("crash input decodes as a proto"); + let result: Result, _> = proto.into_rust(); + assert!( + result.is_err(), + "invalid field diffs must be a decode error" + ); + } + /// Model a situation where a "leader" is constantly making changes to its state, and a "follower" /// is applying those changes as diffs. #[mz_ore::test] diff --git a/src/persist-client/src/internal/trace.rs b/src/persist-client/src/internal/trace.rs index 6dc55064fbe69..790e96b443393 100644 --- a/src/persist-client/src/internal/trace.rs +++ b/src/persist-client/src/internal/trace.rs @@ -287,6 +287,25 @@ impl Trace { // we know to preserve the structure for this trace. let roundtrip_structure = !spine_batches.is_empty() || legacy_batches.is_empty(); + // The flattened trace is decoded from an untrusted blob: any invariant + // a crafted or corrupted value can violate must surface as a decode + // error here, never as a panic in the spine code below. + // + // Bound the total logical len of all batches. The spine's maintenance + // arithmetic (`len.next_power_of_two()`, summing lens of merged + // batches) overflows on absurd lens, and no real trace has anywhere + // near this many updates. + const MAX_TOTAL_LEN: usize = usize::MAX >> 3; + let mut total_len = 0usize; + for batch in legacy_batches.keys().chain(hollow_batches.values()) { + total_len = total_len + .checked_add(batch.len) + .filter(|len| *len <= MAX_TOTAL_LEN) + .ok_or_else(|| { + format!("total len of batches exceeds the maximum trace size: {batch:?}") + })?; + } + // We need to look up legacy batches somehow, but we don't have a spine id for them. // Instead, we rely on the fact that the spine must store them in antichain order. // Our timestamp type may not be totally ordered, so we need to implement our own comparator @@ -304,101 +323,123 @@ impl Trace { let mut legacy_batches: Vec<_> = legacy_batches.into_iter().map(|(k, _)| k).collect(); legacy_batches.sort_by(|a, b| compare_chains(a.desc.lower(), b.desc.lower()).reverse()); - let mut pop_batch = - |id: SpineId, expected_desc: Option<&Description>| -> Result<_, String> { - if let Some(batch) = hollow_batches.remove(&id) { - if let Some(desc) = expected_desc { - // We don't expect the desc's upper and lower to change for a given spine id. - assert_eq!(desc.lower(), batch.desc.lower()); - assert_eq!(desc.upper(), batch.desc.upper()); - // Due to the way thin spine batches are diffed, the sinces can be out of sync. - // This should be rare, and hopefully impossible once we change how diffs work. - if desc.since() != batch.desc.since() { - warn!( - "unexpected since out of sync for spine batch: {:?} != {:?}", - desc.since().elements(), - batch.desc.since().elements() - ); - } + let mut pop_batch = |id: SpineId, + expected_desc: Option<&Description>| + -> Result<_, String> { + if let Some(batch) = hollow_batches.remove(&id) { + if let Some(desc) = expected_desc { + // We don't expect the desc's upper and lower to change for a given spine id. + if desc.lower() != batch.desc.lower() || desc.upper() != batch.desc.upper() { + return Err(format!( + "hollow batch desc {:?} did not match the spine batch desc {:?} for {id:?}", + batch.desc, desc + )); + } + // Due to the way thin spine batches are diffed, the sinces can be out of sync. + // This should be rare, and hopefully impossible once we change how diffs work. + if desc.since() != batch.desc.since() { + warn!( + "unexpected since out of sync for spine batch: {:?} != {:?}", + desc.since().elements(), + batch.desc.since().elements() + ); } - return Ok(IdHollowBatch { id, batch }); } - let mut batch = legacy_batches - .pop() - .ok_or_else(|| format!("missing referenced hollow batch {id:?}"))?; + return Ok(IdHollowBatch { id, batch }); + } + let mut batch = legacy_batches + .pop() + .ok_or_else(|| format!("missing referenced hollow batch {id:?}"))?; - let Some(expected_desc) = expected_desc else { - return Ok(IdHollowBatch { id, batch }); - }; + let Some(expected_desc) = expected_desc else { + return Ok(IdHollowBatch { id, batch }); + }; - if expected_desc.lower() != batch.desc.lower() { - return Err(format!( - "hollow batch lower {:?} did not match expected lower {:?}", - batch.desc.lower().elements(), - expected_desc.lower().elements() - )); - } + if expected_desc.lower() != batch.desc.lower() { + return Err(format!( + "hollow batch lower {:?} did not match expected lower {:?}", + batch.desc.lower().elements(), + expected_desc.lower().elements() + )); + } - // Empty legacy batches are not deterministic: different nodes may split them up - // in different ways. For now, we rearrange them such to match the spine data. - if batch.parts.is_empty() && batch.run_splits.is_empty() && batch.len == 0 { - let mut new_upper = batch.desc.upper().clone(); - - // While our current batch is too small, and there's another empty batch - // in the list, roll it in. - while PartialOrder::less_than(&new_upper, expected_desc.upper()) { - let Some(next_batch) = legacy_batches.pop() else { - break; - }; - if next_batch.is_empty() { - new_upper.clone_from(next_batch.desc.upper()); - } else { - legacy_batches.push(next_batch); - break; - } - } + // Empty legacy batches are not deterministic: different nodes may split them up + // in different ways. For now, we rearrange them such to match the spine data. + if batch.parts.is_empty() && batch.run_splits.is_empty() && batch.len == 0 { + let mut new_upper = batch.desc.upper().clone(); - // If our current batch is too large, split it by the expected upper - // and preserve the remainder. - if PartialOrder::less_than(expected_desc.upper(), &new_upper) { - legacy_batches.push(Arc::new(HollowBatch::empty(Description::new( - expected_desc.upper().clone(), - new_upper.clone(), - batch.desc.since().clone(), - )))); - new_upper.clone_from(expected_desc.upper()); + // While our current batch is too small, and there's another empty batch + // in the list, roll it in. + while PartialOrder::less_than(&new_upper, expected_desc.upper()) { + let Some(next_batch) = legacy_batches.pop() else { + break; + }; + if next_batch.is_empty() { + new_upper.clone_from(next_batch.desc.upper()); + } else { + legacy_batches.push(next_batch); + break; } - batch = Arc::new(HollowBatch::empty(Description::new( - batch.desc.lower().clone(), - new_upper, - batch.desc.since().clone(), - ))) } - if expected_desc.upper() != batch.desc.upper() { - return Err(format!( - "hollow batch upper {:?} did not match expected upper {:?}", - batch.desc.upper().elements(), - expected_desc.upper().elements() - )); + // If our current batch is too large, split it by the expected upper + // and preserve the remainder. + if PartialOrder::less_than(expected_desc.upper(), &new_upper) { + legacy_batches.push(Arc::new(HollowBatch::empty(Description::new( + expected_desc.upper().clone(), + new_upper.clone(), + batch.desc.since().clone(), + )))); + new_upper.clone_from(expected_desc.upper()); } + batch = Arc::new(HollowBatch::empty(Description::new( + batch.desc.lower().clone(), + new_upper, + batch.desc.since().clone(), + ))) + } - Ok(IdHollowBatch { id, batch }) - }; + if expected_desc.upper() != batch.desc.upper() { + return Err(format!( + "hollow batch upper {:?} did not match expected upper {:?}", + batch.desc.upper().elements(), + expected_desc.upper().elements() + )); + } + + Ok(IdHollowBatch { id, batch }) + }; let (upper, next_id) = if let Some((id, batch)) = spine_batches.last_key_value() { (batch.desc.upper().clone(), id.1) } else { (Antichain::from_elem(T::minimum()), 0) }; + // Real spine levels are logarithmic in the total len of the trace, so + // this bound is far above any legitimate level while keeping the + // allocation below trivial. + const MAX_LEVELS: usize = 256; let levels = spine_batches .first_key_value() - .map(|(_, batch)| batch.level + 1) + .map(|(_, batch)| batch.level.saturating_add(1)) .unwrap_or(0); + if levels > MAX_LEVELS { + return Err(format!( + "spine level {} exceeds the maximum {MAX_LEVELS}", + levels - 1 + )); + } let mut merging = vec![MergeState::default(); levels]; for (id, batch) in spine_batches { let level = batch.level; + if batch.descs.len() > batch.parts.len() { + return Err(format!( + "spine batch {id:?} has more descs ({}) than parts ({})", + batch.descs.len(), + batch.parts.len() + )); + } let descs = batch.descs.iter().map(Some).chain(std::iter::repeat_n( None, batch.parts.len() - batch.descs.len(), @@ -409,6 +450,23 @@ impl Trace { .zip_eq(descs) .map(|(id, desc)| pop_batch(id, desc)) .collect::, _>>()?; + // A spine batch's parts tile its id range (`SpineBatch::id` + // `debug_assert`s the endpoints). Real batches always have at least + // one part: an empty batch still carries an empty hollow batch. + // Validate the full tiling, not just the endpoints: downstream + // maintenance (`fueled_merge_reqs_before_ms` -> `id_range` in + // compaction, `apply_merge_res_checked`) `assert_eq!`s that the + // collected part ids are contiguous, so non-adjacent parts that + // happen to hit the right endpoints would panic later instead of + // here. + if parts.first().map(|x| x.id.0) != Some(id.0) + || parts.last().map(|x| x.id.1) != Some(id.1) + || parts.windows(2).any(|w| w[0].id.1 != w[1].id.0) + { + return Err(format!( + "spine batch {id:?} parts do not tile the batch's id range" + )); + } let len = parts.iter().map(|p| (*p).batch.len).sum(); let active_compaction = merges.remove(&id).and_then(|m| m.active_compaction); let batch = SpineBatch { @@ -419,9 +477,11 @@ impl Trace { len, }; - let state = &mut merging[level]; + let state = merging.get_mut(level).ok_or_else(|| { + format!("spine batch {id:?} level {level} out of bounds ({levels} levels)") + })?; - state.push_batch(batch); + state.try_push_batch(batch)?; if let Some(id) = state.id() { if let Some(merge) = merges.remove(&id) { state.merge = Some(IdFuelingMerge { @@ -459,13 +519,36 @@ impl Trace { } else { // If the structure wasn't actually serialized, we may have legacy batches left over. for batch in legacy_batches.into_iter().rev() { + // `Spine::insert` asserts that pushed batches are non-empty + // and contiguous; check this here so that a corrupted batch + // results in a decode error instead of a panic. + if batch.desc.lower() == batch.desc.upper() { + return Err(format!( + "legacy batch has an empty time range: {:?}", + batch.desc + )); + } + if batch.desc.lower() != trace.upper() { + return Err(format!( + "legacy batch lower {:?} does not match the trace upper {:?}", + batch.desc.lower().elements(), + trace.upper().elements() + )); + } trace.push_batch_no_merge_reqs(Arc::unwrap_or_clone(batch)); } } check_empty("hollow batches", hollow_batches.len())?; check_empty("merges", merges.len())?; - debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace); + // The same check that's `debug_assert`ed when mutating a trace we + // built ourselves; for a trace reconstructed from untrusted data it + // must be a hard error, both to keep corrupted state from being used + // and because the write side would panic on it anyway (e.g. `Spine`'s + // batch invariants and the full-level/merge correspondence). + trace + .validate() + .map_err(|err| format!("reconstructed trace failed validation: {err}"))?; Ok(trace) } @@ -2170,19 +2253,41 @@ impl MergeState { /// Push a new batch at this level, checking invariants. fn push_batch(&mut self, batch: SpineBatch) { + self.try_push_batch(batch) + .unwrap_or_else(|err| panic!("invalid batch push: {err}")); + } + + /// Fallible version of [Self::push_batch], for [Trace::unflatten], where + /// the batches were decoded from an untrusted blob and a violated + /// invariant must be a decode error rather than a panic. + fn try_push_batch(&mut self, batch: SpineBatch) -> Result<(), String> { if let Some(last) = self.batches.last() { - assert_eq!(last.id().1, batch.id().0); - assert_eq!(last.upper(), batch.lower()); + if last.id().1 != batch.id().0 { + return Err(format!( + "batch id {:?} does not chain with the previous id {:?}", + batch.id(), + last.id() + )); + } + if last.upper() != batch.lower() { + return Err(format!( + "batch lower {:?} does not match the previous upper {:?}", + batch.lower(), + last.upper() + )); + } } - assert!( - self.merge.is_none(), - "Attempted to insert batch into incomplete merge! (batch={:?}, batch_count={})", - batch.id, - self.batches.len(), - ); - self.batches - .try_push(batch) - .expect("Attempted to insert batch into full layer!"); + if self.merge.is_some() { + return Err(format!( + "attempted to insert batch into incomplete merge! (batch={:?}, batch_count={})", + batch.id, + self.batches.len(), + )); + } + if self.batches.try_push(batch).is_err() { + return Err("attempted to insert batch into full layer!".to_string()); + } + Ok(()) } /// The number of actual updates contained in the level. diff --git a/src/persist-types/src/arrow.rs b/src/persist-types/src/arrow.rs index 0c969331af3b9..310408f35a485 100644 --- a/src/persist-types/src/arrow.rs +++ b/src/persist-types/src/arrow.rs @@ -170,12 +170,22 @@ fn from_proto_with_type( for b in buffers.into_iter().map(|b| b.into_rust()) { builder = builder.add_buffer(b?); } - for c in children - .into_iter() - .zip_eq(fields_for_type(&data_type)) - .map(|(c, field)| from_proto_with_type(c, Some(field.data_type()))) - { - builder = builder.add_child_data(c?); + // `children` is reconstructed from the wire, so its count may disagree with + // what `data_type` expects. Guard the length explicitly and reject a mismatch + // as a decode error (corrupted/crafted parts must not crash readers); the + // `zip_eq` below is then infallible. + let fields = fields_for_type(&data_type); + if children.len() != fields.len() { + return Err(TryFromProtoError::RowConversionError(format!( + "ProtoArrayData for {:?} has {} children but its data type expects {}", + data_type, + children.len(), + fields.len(), + ))); + } + for (c, field) in children.into_iter().zip_eq(fields) { + let c = from_proto_with_type(c, Some(field.data_type()))?; + builder = builder.add_child_data(c); } // Construct the builder which validates all inputs and aligns data. @@ -810,6 +820,25 @@ mod tests { use mz_proto::ProtoType; use std::sync::Arc; + #[mz_ore::test] + fn from_proto_child_count_mismatch_is_error() { + // A `ProtoArrayData` whose child count disagrees with its declared data + // type must decode to an error, not panic via `zip_eq`. Regression for + // the array_data_proto_roundtrip cargo-fuzz finding. + use prost::Message; + let bytes: &[u8] = &[ + 0x0a, 0x0a, 0x18, 0x32, 0x22, 0x00, 0x18, 0x0a, 0x18, 0x32, 0x22, 0x00, 0x2a, 0x00, + 0x2a, 0x00, 0xe0, 0x32, 0x24, + ]; + let proto = + crate::arrow::ProtoArrayData::decode(bytes).expect("crash input decodes as a proto"); + let result: Result = proto.into_rust(); + assert!( + result.is_err(), + "child-count mismatch must be a decode error" + ); + } + #[mz_ore::test] fn trim_proto() { let nested_fields: Fields = vec![Field::new("a", DataType::UInt64, true)].into(); diff --git a/src/persist-types/src/codec_impls.rs b/src/persist-types/src/codec_impls.rs index 06bb3d7502b80..e3ee72d20b063 100644 --- a/src/persist-types/src/codec_impls.rs +++ b/src/persist-types/src/codec_impls.rs @@ -618,13 +618,14 @@ mod tests { .to_string() ) ); + // Too short to be a UUID: rejected before the `uuid` crate (which + // panics building an error for some such inputs), so the message is + // generic rather than the crate's "invalid length". assert_eq!( ShardId::from_str("s0"), - Err( - "invalid ShardId s0: invalid length: expected length 32 for simple format, found 1" - .to_string() - ) + Err("invalid ShardId s0: malformed UUID".to_string()) ); + // Long enough and ASCII, so the `uuid` crate's own error survives. assert_eq!( ShardId::from_str("s00000000-0000-0000-0000-000000000000FOO"), Err("invalid ShardId s00000000-0000-0000-0000-000000000000FOO: invalid character: expected an optional prefix of `urn:uuid:` followed by [0-9a-fA-F-], found `O` at 38".to_string()) diff --git a/src/persist-types/src/lib.rs b/src/persist-types/src/lib.rs index 20d0f226fa27d..6b1aa21d87d3a 100644 --- a/src/persist-types/src/lib.rs +++ b/src/persist-types/src/lib.rs @@ -238,6 +238,13 @@ impl std::str::FromStr for ShardId { Some(x) => x, None => return Err(format!("invalid ShardId {}: incorrect prefix", s)), }; + // `Uuid::parse_str` panics (rather than erroring) while building its + // parse error for some malformed inputs — e.g. non-ASCII bytes make it + // slice the input on a non-char boundary. Reject anything that can't be + // a UUID first: the shortest valid form is 32 hex digits, all ASCII. + if uuid_encoded.len() < 32 || !uuid_encoded.is_ascii() { + return Err(format!("invalid ShardId {}: malformed UUID", s)); + } let uuid = Uuid::parse_str(uuid_encoded) .map_err(|err| format!("invalid ShardId {}: {}", s, err))?; Ok(ShardId(*uuid.as_bytes())) diff --git a/src/pgrepr/src/value/numeric.rs b/src/pgrepr/src/value/numeric.rs index 7b082cdc170e1..717bed8a282f5 100644 --- a/src/pgrepr/src/value/numeric.rs +++ b/src/pgrepr/src/value/numeric.rs @@ -205,20 +205,24 @@ impl<'a> FromSql<'a> for Numeric { _ => return Err("bad sign in numeric".into()), } - let mut scale = (units - weight - 1) * 4; + // Compute in i32: `units`/`weight` are attacker-controlled `i16`s from + // the wire, so `(units - weight - 1) * 4` overflows `i16` for crafted + // headers (panic under overflow checks, silent wraparound otherwise). + let in_scale = i32::from(in_scale); + let mut scale = (i32::from(units) - i32::from(weight) - 1) * 4; // Adjust scales if scale < 0 { // Multiply by 10^scale - cx.scaleb(&mut d, &AdtNumeric::from(-i32::from(scale))); + cx.scaleb(&mut d, &AdtNumeric::from(-scale)); scale = 0; } else if scale > in_scale { // Divide by 10^(difference in scale and in_scale) - cx.scaleb(&mut d, &AdtNumeric::from(-i32::from(scale - in_scale))); + cx.scaleb(&mut d, &AdtNumeric::from(-(scale - in_scale))); scale = in_scale; } - cx.scaleb(&mut d, &AdtNumeric::from(-i32::from(scale))); + cx.scaleb(&mut d, &AdtNumeric::from(-scale)); cx.reduce(&mut d); let mut cx = cx_datum(); @@ -299,3 +303,30 @@ fn test_to_from_sql_roundtrip() { let d_from_sql = Numeric::from_sql(&Type::NUMERIC, &out).unwrap(); assert_eq!(r.0, d_from_sql.0); } + +#[mz_ore::test] +#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux` +fn test_from_sql_extreme_weight_no_overflow() { + // `(units - weight - 1) * 4` is computed in i32 because `weight` is an + // attacker-controlled i16 from the wire: extreme values overflowed the old + // i16 arithmetic (panic under overflow checks, silent wraparound + // otherwise). `from_sql` must return a result, never panic. Regression for + // the value_decode_binary cargo-fuzz finding. + fn header(units: i16, weight: i16, sign: u16, in_scale: i16) -> Vec { + let mut b = Vec::new(); + b.extend_from_slice(&units.to_be_bytes()); + b.extend_from_slice(&weight.to_be_bytes()); + b.extend_from_slice(&sign.to_be_bytes()); + b.extend_from_slice(&in_scale.to_be_bytes()); + b + } + for (weight, in_scale) in [ + (i16::MAX, 0), + (i16::MIN, 0), + (-28174, -28271), // exact fuzz repro + (13606, -28272), + ] { + // units = 0 (no trailing digits); the header alone triggered the overflow. + let _ = Numeric::from_sql(&Type::NUMERIC, &header(0, weight, 0, in_scale)); + } +} diff --git a/src/pgwire/src/codec.rs b/src/pgwire/src/codec.rs index 8625476a0108c..797f4e6f11e6b 100644 --- a/src/pgwire/src/codec.rs +++ b/src/pgwire/src/codec.rs @@ -214,7 +214,7 @@ where } } -struct Codec { +pub struct Codec { decode_state: DecodeState, encode_state: Vec<(mz_pgrepr::Type, mz_pgwire_common::Format)>, /// When true, skip the aggregate buffer size check in `decode()`. diff --git a/src/postgres-util/src/desc.rs b/src/postgres-util/src/desc.rs index 2cad0f48cf3d6..11d5d9f894963 100644 --- a/src/postgres-util/src/desc.rs +++ b/src/postgres-util/src/desc.rs @@ -202,14 +202,16 @@ impl RustType for PostgresColumnDesc { } fn from_proto(proto: ProtoPostgresColumnDesc) -> Result { + let col_num_u32: u32 = proto + .col_num + .into_rust_if_some("ProtoPostgresColumnDesc::col_num")?; + // `col_num` is `u16` on the Rust side; reject u32 values that don't + // fit instead of panicking — reachable from untrusted proto bytes. + let col_num = u16::try_from(col_num_u32) + .map_err(|e| TryFromProtoError::InvalidFieldError(e.to_string()))?; Ok(PostgresColumnDesc { name: proto.name, - col_num: { - let v: u32 = proto - .col_num - .into_rust_if_some("ProtoPostgresColumnDesc::col_num")?; - u16::try_from(v).expect("u16 must roundtrip") - }, + col_num, type_oid: proto.type_oid, type_mod: proto.type_mod, nullable: proto.nullable, @@ -257,14 +259,20 @@ impl RustType for PostgresKeyDesc { } fn from_proto(proto: ProtoPostgresKeyDesc) -> Result { + // `cols` is `Vec` on the Rust side but `Vec` on the wire; + // a u32 value above 65535 used to panic via `.expect`, which is + // reachable from untrusted proto bytes. + let cols = proto + .cols + .into_iter() + .map(|c| { + u16::try_from(c).map_err(|e| TryFromProtoError::InvalidFieldError(e.to_string())) + }) + .collect::, _>>()?; Ok(PostgresKeyDesc { oid: proto.oid, name: proto.name, - cols: proto - .cols - .into_iter() - .map(|c| c.try_into().expect("values roundtrip")) - .collect(), + cols, is_primary: proto.is_primary, nulls_not_distinct: proto.nulls_not_distinct, })