Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,5 @@ disallowed-macros = [
disallowed-types = [
{ path = "std::collections::HashMap", reason = "use `std::collections::BTreeMap` or `mz_ore::collections::HashMap` instead" },
{ path = "std::collections::HashSet", reason = "use `std::collections::BTreeSet` or `mz_ore::collections::HashSet` instead" },
{ path = "differential_dataflow::containers::TimelyStack", reason = "use `mz_timely_util::columnation::ColumnationStack` instead" },
]
1 change: 1 addition & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ anyhow.workspace = true
async-stream.workspace = true
bytesize.workspace = true
columnar.workspace = true
columnation.workspace = true
dec = { workspace = true, features = ["serde"] }
differential-dataflow.workspace = true
differential-dogs3.workspace = true
Expand Down
8 changes: 3 additions & 5 deletions src/compute/src/extensions/temporal_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
use std::marker::PhantomData;

use differential_dataflow::Hashable;
use differential_dataflow::containers::TimelyStack;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::implementations::chunker::ColumnationChunker;
use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
use differential_dataflow::trace::implementations::merge_batcher::container::ColInternalMerger;
use differential_dataflow::trace::{Batcher, Builder, Description};
use mz_timely_util::columnation::{ColInternalMerger, ColumnationChunker, ColumnationStack};
use mz_timely_util::temporal::{Bucket, BucketChain, BucketTimestamp};
use timely::container::PushInto;
use timely::dataflow::channels::pact::Exchange;
Expand Down Expand Up @@ -194,8 +192,8 @@ where
}
}

/// Reveal the contents of the `MergeBatcher`, returning a vector of `TimelyStack`s.
fn done(mut self) -> Vec<TimelyStack<(D, T, R)>> {
/// Reveal the contents of the `MergeBatcher`, returning a vector of `ColumnationStack`s.
fn done(mut self) -> Vec<ColumnationStack<(D, T, R)>> {
self.inner.seal::<CapturingBuilder<_, _>>(Antichain::new())
}
}
Expand Down
31 changes: 17 additions & 14 deletions src/compute/src/row_spine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,35 @@ use differential_dataflow::trace::implementations::OffsetList;
mod spines {
use std::rc::Rc;

use differential_dataflow::containers::{Columnation, TimelyStack};
use columnation::Columnation;
use differential_dataflow::trace::implementations::Layout;
use differential_dataflow::trace::implementations::Update;
use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
use differential_dataflow::trace::implementations::spine_fueled::Spine;
use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
use mz_repr::Row;
use mz_timely_util::columnation::ColumnationStack;

use crate::row_spine::{DatumContainer, OffsetOptimized};
use crate::typedefs::{KeyBatcher, KeyValBatcher};

pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
pub type RowRowBuilder<T, R> =
RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, TimelyStack<((Row, Row), T, R)>>>;
pub type RowRowBuilder<T, R> = RcBuilder<
OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, ColumnationStack<((Row, Row), T, R)>>,
>;

pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
pub type RowValBuilder<V, T, R> =
RcBuilder<OrdValBuilder<RowValLayout<((Row, V), T, R)>, TimelyStack<((Row, V), T, R)>>>;
pub type RowValBuilder<V, T, R> = RcBuilder<
OrdValBuilder<RowValLayout<((Row, V), T, R)>, ColumnationStack<((Row, V), T, R)>>,
>;

pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
pub type RowBuilder<T, R> =
RcBuilder<OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, TimelyStack<((Row, ()), T, R)>>>;
RcBuilder<OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, ColumnationStack<((Row, ()), T, R)>>>;

/// A layout based on timely stacks
pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
Expand All @@ -65,8 +68,8 @@ mod spines {
{
type KeyContainer = DatumContainer;
type ValContainer = DatumContainer;
type TimeContainer = TimelyStack<U::Time>;
type DiffContainer = TimelyStack<U::Diff>;
type TimeContainer = ColumnationStack<U::Time>;
type DiffContainer = ColumnationStack<U::Diff>;
type OffsetContainer = OffsetOptimized;
}
impl<U: Update<Key = Row>> Layout for RowValLayout<U>
Expand All @@ -76,9 +79,9 @@ mod spines {
U::Diff: Columnation,
{
type KeyContainer = DatumContainer;
type ValContainer = TimelyStack<U::Val>;
type TimeContainer = TimelyStack<U::Time>;
type DiffContainer = TimelyStack<U::Diff>;
type ValContainer = ColumnationStack<U::Val>;
type TimeContainer = ColumnationStack<U::Time>;
type DiffContainer = ColumnationStack<U::Diff>;
type OffsetContainer = OffsetOptimized;
}
impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
Expand All @@ -87,9 +90,9 @@ mod spines {
U::Diff: Columnation,
{
type KeyContainer = DatumContainer;
type ValContainer = TimelyStack<()>;
type TimeContainer = TimelyStack<U::Time>;
type DiffContainer = TimelyStack<U::Diff>;
type ValContainer = ColumnationStack<()>;
type TimeContainer = ColumnationStack<U::Time>;
type DiffContainer = ColumnationStack<U::Diff>;
type OffsetContainer = OffsetOptimized;
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/compute/src/sink/correction_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,11 @@ use std::collections::{BinaryHeap, VecDeque};
use std::fmt;
use std::rc::Rc;

use differential_dataflow::containers::{Columnation, TimelyStack};
use columnation::Columnation;
use differential_dataflow::trace::implementations::BatchContainer;
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use mz_timely_util::columnation::ColumnationStack;
use timely::PartialOrder;
use timely::container::SizableContainer;
use timely::progress::Antichain;
Expand Down Expand Up @@ -849,19 +850,19 @@ impl<D: Data> From<Cursor<D>> for Chain<D> {
/// All updates in a chunk are sorted by (time, data) and consolidated.
///
/// We would like all chunks to have the same fixed size, to make it easy for the allocator to
/// re-use chunk allocations. Unfortunately, the current `TimelyStack`/`ChunkedStack` API doesn't
/// re-use chunk allocations. Unfortunately, the current `ColumnationStack`/`ChunkedStack` API doesn't
/// provide a convenient way to pre-size regions, so chunks are currently only fixed-size in
/// spirit.
struct Chunk<D: Data> {
/// The contained updates.
data: TimelyStack<(D, Timestamp, Diff)>,
data: ColumnationStack<(D, Timestamp, Diff)>,
/// Cached value of the current chunk size, for efficient updating of metrics.
cached_size: Option<SizeMetrics>,
}

impl<D: Data> Default for Chunk<D> {
fn default() -> Self {
let mut data = TimelyStack::default();
let mut data = ColumnationStack::default();
data.ensure_capacity(&mut None);

Self {
Expand Down
26 changes: 13 additions & 13 deletions src/compute/src/typedefs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
use columnar::{Container, Ref};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::trace::implementations::chunker::ColumnationChunker;
use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
use differential_dataflow::trace::implementations::merge_batcher::container::ColInternalMerger;
use differential_dataflow::trace::wrappers::enter::TraceEnter;
use differential_dataflow::trace::wrappers::frontier::TraceFrontier;
use mz_repr::Diff;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::columnation::{ColInternalMerger, ColumnationChunker};
use timely::dataflow::ScopeParent;

use crate::row_spine::RowValBuilder;
Expand All @@ -32,13 +31,14 @@ pub use crate::typedefs::spines::{ColKeySpine, ColValSpine};
pub(crate) mod spines {
use std::rc::Rc;

use differential_dataflow::containers::{Columnation, TimelyStack};
use columnation::Columnation;
use differential_dataflow::trace::implementations::ord_neu::{
OrdKeyBatch, OrdKeyBuilder, OrdValBatch, OrdValBuilder,
};
use differential_dataflow::trace::implementations::spine_fueled::Spine;
use differential_dataflow::trace::implementations::{Layout, Update};
use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
use mz_timely_util::columnation::ColumnationStack;

use crate::row_spine::OffsetOptimized;
use crate::typedefs::{KeyBatcher, KeyValBatcher};
Expand All @@ -47,13 +47,13 @@ pub(crate) mod spines {
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<MzStack<((K, V), T, R)>>>>;
pub type ColValBatcher<K, V, T, R> = KeyValBatcher<K, V, T, R>;
pub type ColValBuilder<K, V, T, R> =
RcBuilder<OrdValBuilder<MzStack<((K, V), T, R)>, TimelyStack<((K, V), T, R)>>>;
RcBuilder<OrdValBuilder<MzStack<((K, V), T, R)>, ColumnationStack<((K, V), T, R)>>>;

/// A spine for generic keys
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<MzStack<((K, ()), T, R)>>>>;
pub type ColKeyBatcher<K, T, R> = KeyBatcher<K, T, R>;
pub type ColKeyBuilder<K, T, R> =
RcBuilder<OrdKeyBuilder<MzStack<((K, ()), T, R)>, TimelyStack<((K, ()), T, R)>>>;
RcBuilder<OrdKeyBuilder<MzStack<((K, ()), T, R)>, ColumnationStack<((K, ()), T, R)>>>;

/// A layout based on chunked timely stacks
pub struct MzStack<U: Update> {
Expand All @@ -67,10 +67,10 @@ pub(crate) mod spines {
U::Time: Columnation,
U::Diff: Columnation,
{
type KeyContainer = TimelyStack<U::Key>;
type ValContainer = TimelyStack<U::Val>;
type TimeContainer = TimelyStack<U::Time>;
type DiffContainer = TimelyStack<U::Diff>;
type KeyContainer = ColumnationStack<U::Key>;
type ValContainer = ColumnationStack<U::Val>;
type TimeContainer = ColumnationStack<U::Time>;
type DiffContainer = ColumnationStack<U::Diff>;
type OffsetContainer = OffsetOptimized;
}
}
Expand Down Expand Up @@ -143,18 +143,18 @@ where
/// Trait for data types that can be used in Materialize's dataflow, supporting both columnar and
/// columnation.
pub trait MzData:
differential_dataflow::containers::Columnation
columnation::Columnation
+ for<'a> columnar::Columnar<Container: Container<Ref<'a>: Copy + Ord> + Clone + Send>
{
}

impl<T> MzData for T
where
T: differential_dataflow::containers::Columnation,
T: columnation::Columnation,
T: for<'a> columnar::Columnar<Container: Clone + Send>,
for<'a> Ref<'a, T>: Copy + Ord,
{
}

pub trait MzArrangeData: differential_dataflow::containers::Columnation {}
impl<T> MzArrangeData for T where T: differential_dataflow::containers::Columnation {}
pub trait MzArrangeData: columnation::Columnation {}
impl<T> MzArrangeData for T where T: columnation::Columnation {}
6 changes: 3 additions & 3 deletions src/storage-operators/src/s3_oneshot_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::rc::Rc;
use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use differential_dataflow::Hashable;
use differential_dataflow::containers::TimelyStack;
use futures::StreamExt;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
Expand All @@ -31,6 +30,7 @@ use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use mz_timely_util::builder_async::{
Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::columnation::ColumnationStack;
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
Expand Down Expand Up @@ -59,7 +59,7 @@ mod pgcopy;
/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
/// modulo the number of batches.
pub fn copy_to<G, F>(
input_collection: StreamVec<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
input_collection: StreamVec<G, Vec<ColumnationStack<((Row, ()), G::Timestamp, Diff)>>>,
err_stream: StreamVec<G, (DataflowError, G::Timestamp, Diff)>,
up_to: Antichain<G::Timestamp>,
connection_details: S3UploadInfo,
Expand Down Expand Up @@ -297,7 +297,7 @@ fn render_upload_operator<G, T>(
connection_id: CatalogItemId,
connection_details: S3UploadInfo,
sink_id: GlobalId,
input_collection: StreamVec<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
input_collection: StreamVec<G, Vec<ColumnationStack<((Row, ()), G::Timestamp, Diff)>>>,
up_to: Antichain<G::Timestamp>,
start_stream: StreamVec<G, Result<(), String>>,
params: CopyToParameters,
Expand Down
6 changes: 3 additions & 3 deletions src/storage-types/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1012,13 +1012,13 @@ mod columnation {

#[cfg(test)]
mod tests {
use differential_dataflow::containers::TimelyStack;
use mz_timely_util::columnation::ColumnationStack;
use proptest::prelude::*;

use super::*;

fn columnation_roundtrip<T: Columnation>(item: &T) -> TimelyStack<T> {
let mut container = TimelyStack::with_capacity(1);
fn columnation_roundtrip<T: Columnation>(item: &T) -> ColumnationStack<T> {
let mut container = ColumnationStack::with_capacity(1);
container.copy(item);
container
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/source/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ use std::io;
use std::rc::Rc;

use differential_dataflow::AsCollection;
use differential_dataflow::containers::TimelyStack;
use itertools::Itertools;
use mz_mysql_util::quote_identifier;
use mz_ore::cast::CastFrom;
use mz_repr::Diff;
use mz_repr::GlobalId;
use mz_storage_types::errors::{DataflowError, SourceError};
use mz_storage_types::sources::SourceExport;
use mz_timely_util::columnation::ColumnationStack;
use mz_timely_util::containers::stack::AccountedStackBuilder;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
Expand Down Expand Up @@ -380,7 +380,7 @@ pub(crate) struct RewindRequest {

type StackedAsyncOutputHandle<T, D> = AsyncOutputHandle<
T,
AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<(D, T, Diff)>>>,
AccountedStackBuilder<CapacityContainerBuilder<ColumnationStack<(D, T, Diff)>>>,
>;

async fn return_definite_error(
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/source/sql_server/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::AsCollection;
use differential_dataflow::containers::TimelyStack;
use futures::StreamExt;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
Expand All @@ -35,6 +34,7 @@ use mz_storage_types::sources::sql_server::{MAX_LSN_WAIT, SNAPSHOT_PROGRESS_REPO
use mz_timely_util::builder_async::{
AsyncOutputHandle, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::columnation::ColumnationStack;
use mz_timely_util::containers::stack::AccountedStackBuilder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::vec::Map;
Expand Down Expand Up @@ -740,7 +740,7 @@ async fn handle_data_event(

type StackedAsyncOutputHandle<T, D> = AsyncOutputHandle<
T,
AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<(D, T, Diff)>>>,
AccountedStackBuilder<CapacityContainerBuilder<ColumnationStack<(D, T, Diff)>>>,
>;

/// Helper method to decode a row from a [`tiberius::Row`] (or 2 of them in the case of update)
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/source/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ use std::sync::Arc;
use std::task::{Context, Poll, ready};

use differential_dataflow::Collection;
use differential_dataflow::containers::TimelyStack;
use mz_repr::{Diff, GlobalId, Row};
use mz_storage_types::errors::{DataflowError, DecodeError};
use mz_storage_types::sources::SourceTimestamp;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_timely_util::columnation::ColumnationStack;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use timely::dataflow::{Scope, ScopeParent, StreamVec};
Expand Down Expand Up @@ -54,7 +54,7 @@ pub enum ProgressStatisticsUpdate {
}

pub type StackedCollection<G, T> =
Collection<G, TimelyStack<(T, <G as ScopeParent>::Timestamp, Diff)>>;
Collection<G, ColumnationStack<(T, <G as ScopeParent>::Timestamp, Diff)>>;

/// Describes a source that can render itself in a timely scope.
pub trait SourceRender {
Expand Down
Loading
Loading