diff --git a/Cargo.lock b/Cargo.lock index 27205b18086..1a54422dbba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4099,6 +4099,23 @@ dependencies = [ "yansi", ] +[[package]] +name = "gpu-scan-bench" +version = "0.1.0" +dependencies = [ + "clap", + "futures", + "object_store", + "tokio", + "tracing", + "tracing-perfetto", + "tracing-subscriber", + "url", + "vortex", + "vortex-cuda", + "vortex-cuda-macros", +] + [[package]] name = "gpu-scan-cli" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index b5490594dd7..490ccce980a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "vortex-duckdb", "vortex-cuda", "vortex-cuda/cub", + "vortex-cuda/gpu-scan-bench", "vortex-cuda/gpu-scan-cli", "vortex-cuda/macros", "vortex-cuda/nvcomp", diff --git a/benchmarks/datafusion-bench/src/lib.rs b/benchmarks/datafusion-bench/src/lib.rs index 43b86331e9f..626b4ac0f1b 100644 --- a/benchmarks/datafusion-bench/src/lib.rs +++ b/benchmarks/datafusion-bench/src/lib.rs @@ -109,7 +109,7 @@ pub fn format_to_df_format(format: Format) -> Arc { Format::Csv => Arc::new(CsvFormat::default()) as _, Format::Arrow => Arc::new(ArrowFormat), Format::Parquet => Arc::new(ParquetFormat::new()), - Format::OnDiskVortex | Format::VortexCompact => { + Format::OnDiskVortex | Format::VortexCompact | Format::VortexCuda => { Arc::new(VortexFormat::new(SESSION.clone())) } Format::OnDiskDuckDB | Format::Lance => { diff --git a/benchmarks/datafusion-bench/src/main.rs b/benchmarks/datafusion-bench/src/main.rs index f915c9c7d3e..5869e7ec6f3 100644 --- a/benchmarks/datafusion-bench/src/main.rs +++ b/benchmarks/datafusion-bench/src/main.rs @@ -140,6 +140,13 @@ async fn main() -> anyhow::Result<()> { convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Compact) .await?; } + Format::VortexCuda => { + convert_parquet_directory_to_vortex( + &base_path, + CompactionStrategy::CudaCompatible, + ) + .await?; + } _ => {} } } @@ -233,7 +240,12 @@ async fn register_benchmark_tables( ) -> 
anyhow::Result<()> { match format { Format::Arrow => register_arrow_tables(session, benchmark).await, - _ if use_scan_api() && matches!(format, Format::OnDiskVortex | Format::VortexCompact) => { + _ if use_scan_api() + && matches!( + format, + Format::OnDiskVortex | Format::VortexCompact | Format::VortexCuda + ) => + { register_v2_tables(session, benchmark, format).await } _ => { diff --git a/vortex-bench/src/bin/data-gen.rs b/vortex-bench/src/bin/data-gen.rs index d38c977a6c6..c61131479ff 100644 --- a/vortex-bench/src/bin/data-gen.rs +++ b/vortex-bench/src/bin/data-gen.rs @@ -79,6 +79,15 @@ async fn main() -> anyhow::Result<()> { convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Compact).await?; } + if args + .formats + .iter() + .any(|f| matches!(f, Format::VortexCuda)) + { + convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::CudaCompatible) + .await?; + } + if args .formats .iter() diff --git a/vortex-bench/src/conversions.rs b/vortex-bench/src/conversions.rs index 1ab3c1e7f20..f070b8fdd4c 100644 --- a/vortex-bench/src/conversions.rs +++ b/vortex-bench/src/conversions.rs @@ -149,6 +149,7 @@ pub async fn convert_parquet_directory_to_vortex( ) -> anyhow::Result<()> { let (format, dir_name) = match compaction { CompactionStrategy::Compact => (Format::VortexCompact, Format::VortexCompact.name()), + CompactionStrategy::CudaCompatible => (Format::VortexCuda, Format::VortexCuda.name()), CompactionStrategy::Default => (Format::OnDiskVortex, Format::OnDiskVortex.name()), }; diff --git a/vortex-bench/src/lib.rs b/vortex-bench/src/lib.rs index 6dad0f0f6a1..be4b79537bc 100644 --- a/vortex-bench/src/lib.rs +++ b/vortex-bench/src/lib.rs @@ -137,6 +137,9 @@ pub enum Format { #[clap(name = "vortex-compact")] #[serde(rename = "vortex-compact")] VortexCompact, + #[clap(name = "vortex-cuda")] + #[serde(rename = "vortex-cuda")] + VortexCuda, #[clap(name = "duckdb")] #[serde(rename = "duckdb")] OnDiskDuckDB, @@ -176,6 +179,7 @@ impl Format { 
Format::Parquet => "parquet", Format::OnDiskVortex => "vortex-file-compressed", Format::VortexCompact => "vortex-compact", + Format::VortexCuda => "vortex-cuda", Format::OnDiskDuckDB => "duckdb", Format::Lance => "lance", } @@ -188,6 +192,7 @@ impl Format { Format::Parquet => "parquet", Format::OnDiskVortex => "vortex", Format::VortexCompact => "vortex", + Format::VortexCuda => "vortex", Format::OnDiskDuckDB => "duckdb", Format::Lance => "lance", } @@ -222,18 +227,26 @@ impl Display for Engine { #[derive(Debug, Clone, Copy, Default)] pub enum CompactionStrategy { Compact, + CudaCompatible, #[default] Default, } impl CompactionStrategy { pub fn apply_options(&self, options: VortexWriteOptions) -> VortexWriteOptions { + const CUDA_COALESCING_TARGET_BYTES: u64 = 128 * 1024 * 1024; match self { CompactionStrategy::Compact => options.with_strategy( WriteStrategyBuilder::default() .with_compact_encodings() .build(), ), + CompactionStrategy::CudaCompatible => options.with_strategy( + WriteStrategyBuilder::default() + .with_cuda_compatible_encodings() + .with_coalescing_block_size(CUDA_COALESCING_TARGET_BYTES) + .build(), + ), CompactionStrategy::Default => options, } } diff --git a/vortex-cuda/gpu-scan-bench/Cargo.toml b/vortex-cuda/gpu-scan-bench/Cargo.toml new file mode 100644 index 00000000000..557c7506f03 --- /dev/null +++ b/vortex-cuda/gpu-scan-bench/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "gpu-scan-bench" +authors = { workspace = true } +description = "CUDA GPU scan benchmarks for S3/NVMe" +edition = { workspace = true } +homepage = { workspace = true } +include = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +publish = false +repository = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +clap = { workspace = true, features = ["derive"] } +futures = { workspace = true, features = ["executor"] } +object_store = { workspace = true, 
features = ["aws", "fs"] }
+tokio = { workspace = true, features = ["macros", "full"] }
+tracing = { workspace = true, features = ["std", "attributes"] }
+tracing-perfetto = { workspace = true }
+tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
+url = { workspace = true }
+vortex = { workspace = true, features = ["tokio", "zstd"] }
+vortex-cuda = { workspace = true, features = ["_test-harness", "unstable_encodings"] }
+vortex-cuda-macros = { workspace = true }
diff --git a/vortex-cuda/gpu-scan-bench/bench_parquet.py b/vortex-cuda/gpu-scan-bench/bench_parquet.py
new file mode 100644
index 00000000000..8419d740179
--- /dev/null
+++ b/vortex-cuda/gpu-scan-bench/bench_parquet.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env -S uv run --script
+# /// script
+# requires-python = ">=3.12"
+# dependencies = [
+#     "cudf-cu12",
+#     "fsspec",
+#     "pyarrow",
+#     "s3fs",
+# ]
+#
+# [tool.uv]
+# extra-index-url = ["https://pypi.nvidia.com"]
+# ///
+#
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright the Vortex contributors
+#
+# Benchmark reading a Parquet file into GPU memory using cuDF.
+# This serves as the baseline for comparing against Vortex GPU scans.
+#
+# Usage:
+#     uv run bench_parquet.py dataset.parquet --iterations 5
+
+import argparse
+import sys
+import time
+
+
+def main():
+    """Time repeated cuDF reads of one Parquet file and report the results on stderr.
+
+    By default the file is read in batches of ``--row-group-batch-size`` row groups;
+    ``--full-file-read`` reads it with a single ``cudf.read_parquet`` call instead.
+    Throughput is the file size reported by the filesystem divided by the average time.
+    """
+    parser = argparse.ArgumentParser(
+        description="Benchmark cuDF GPU parquet reads",
+    )
+    parser.add_argument("source", help="Path to parquet file")
+    parser.add_argument("--iterations", type=int, default=1, help="Number of scan iterations")
+    parser.add_argument(
+        "--row-group-batch-size",
+        type=int,
+        default=1,
+        help="Number of parquet row groups to read per cuDF call when streaming",
+    )
+    parser.add_argument(
+        "--full-file-read",
+        action="store_true",
+        help="Read the full parquet file in one call (old behavior, can OOM)",
+    )
+    args = parser.parse_args()
+
+    # Validate before the heavy GPU imports so bad arguments fail fast with a usage message.
+    if args.iterations < 1:
+        # Zero iterations would record no timings and divide by zero in the average below.
+        parser.error("--iterations must be >= 1")
+    if args.row_group_batch_size < 1:
+        parser.error("--row-group-batch-size must be >= 1")
+
+    # Imported after argument parsing so --help and argument errors never load cuDF.
+    import cudf
+    import fsspec
+    import pyarrow.parquet as pq
+
+    source = args.source
+    fs, fs_path = fsspec.core.url_to_fs(source)
+    file_size = fs.size(fs_path)
+    file_size_mb = file_size / (1024 * 1024)
+
+    num_row_groups = None
+    if not args.full_file_read:
+        # Read the row-group count once, outside the timed loop.
+        with fs.open(fs_path, "rb") as parquet_file:
+            num_row_groups = pq.ParquetFile(parquet_file).metadata.num_row_groups
+        print(
+            f"Streaming parquet by row groups: {num_row_groups} total, "
+            f"batch size={args.row_group_batch_size}",
+            file=sys.stderr,
+        )
+
+    iteration_secs = []
+    for i in range(args.iterations):
+        start = time.perf_counter()
+        if args.full_file_read:
+            df = cudf.read_parquet(source)
+            del df
+        else:
+            for rg_start in range(0, num_row_groups, args.row_group_batch_size):
+                row_groups = list(
+                    range(rg_start, min(rg_start + args.row_group_batch_size, num_row_groups))
+                )
+                df = cudf.read_parquet(source, row_groups=row_groups)
+                del df
+        elapsed = time.perf_counter() - start
+        iteration_secs.append(elapsed)
+        print(
+            f"Iteration {i + 1}/{args.iterations}: {elapsed:.3f}s",
+            file=sys.stderr,
+        )
+
+    avg_secs = sum(iteration_secs) / len(iteration_secs)
+    throughput_mbs = file_size_mb / avg_secs
+
+    
print(file=sys.stderr)
+    print("=== Benchmark Results ===", file=sys.stderr)
+    print(f"Source: {source}", file=sys.stderr)
+    print(f"Iterations: {args.iterations}", file=sys.stderr)
+    print(f"Avg time: {avg_secs:.3f}s", file=sys.stderr)
+    print(f"File size: {file_size_mb:.2f} MB", file=sys.stderr)
+    print(f"Throughput: {throughput_mbs:.2f} MB/s", file=sys.stderr)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/vortex-cuda/gpu-scan-bench/src/main.rs b/vortex-cuda/gpu-scan-bench/src/main.rs
new file mode 100644
index 00000000000..846d12d3dcd
--- /dev/null
+++ b/vortex-cuda/gpu-scan-bench/src/main.rs
@@ -0,0 +1,238 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+//! Benchmarks scanning a Vortex file and executing every batch through CUDA.
+
+// Without CUDA only the stub `main` is compiled, which leaves the imports below unused.
+#![allow(unused_imports)]
+
+use std::fs::File;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Instant;
+
+use clap::Parser;
+use futures::TryStreamExt;
+use futures::stream::StreamExt;
+use object_store::aws::AmazonS3Builder;
+use object_store::path::Path as ObjectPath;
+use tracing::Instrument;
+use tracing_perfetto::PerfettoLayer;
+use tracing_subscriber::EnvFilter;
+use tracing_subscriber::Layer;
+use tracing_subscriber::fmt::format::FmtSpan;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+use url::Url;
+use vortex::VortexSessionDefault;
+use vortex::error::VortexResult;
+use vortex::file::OpenOptionsSessionExt;
+use vortex::io::session::RuntimeSessionExt;
+use vortex::session::VortexSession;
+use vortex_cuda::CudaSession;
+use vortex_cuda::CudaSessionExt;
+use vortex_cuda::PinnedByteBufferPool;
+use vortex_cuda::PooledFileReadAt;
+use vortex_cuda::PooledObjectStoreReadAt;
+use vortex_cuda::VortexCudaStreamPool;
+use vortex_cuda::executor::CudaArrayExt;
+use vortex_cuda::layout::register_cuda_layout;
+use vortex_cuda_macros::cuda_available;
+use vortex_cuda_macros::cuda_not_available;
+
+#[derive(Parser)]
+#[command(
+    name = "gpu-scan-bench",
+    about = "Benchmark GPU scans of CUDA-compatible Vortex files from S3 or local storage"
+)]
+struct Cli {
+    /// S3 URI (s3://bucket/path) or local path to a CUDA-compatible .vortex file.
+    source: String,
+
+    /// Number of scan iterations. Must be at least 1.
+    #[arg(long, default_value_t = 1)]
+    iterations: usize,
+
+    /// Path to write Perfetto trace output. If omitted, no trace file is written.
+    #[arg(long)]
+    perfetto: Option<PathBuf>,
+
+    /// Number of batches to process concurrently (each on its own CUDA stream). Must be at least 1.
+    #[arg(long, default_value_t = 1)]
+    concurrency: usize,
+
+    /// Output logs as JSON.
+    #[arg(long)]
+    json: bool,
+}
+
+/// Stub entry point for builds without CUDA; it does nothing.
+#[cuda_not_available]
+fn main() {}
+
+/// Benchmarks GPU scans of the Vortex file named on the command line.
+///
+/// Each iteration opens the file, scans it, and executes every batch through CUDA; the average
+/// wall-clock time and the throughput relative to the file size are printed to stderr.
+#[cuda_available]
+#[tokio::main]
+async fn main() -> VortexResult<()> {
+    let cli = Cli::parse();
+
+    // Reject zero up front: with no iterations the average below divides a `Duration` by zero
+    // (a panic), and `buffered(0)` admits no in-flight futures, so the scan would stall.
+    if cli.iterations == 0 {
+        return Err(vortex::error::vortex_err!("--iterations must be at least 1"));
+    }
+    if cli.concurrency == 0 {
+        return Err(vortex::error::vortex_err!("--concurrency must be at least 1"));
+    }
+
+    // Setup tracing
+    let perfetto_guard = if let Some(ref perfetto_path) = cli.perfetto {
+        let perfetto_file = File::create(perfetto_path)?;
+        Some(PerfettoLayer::new(perfetto_file).with_debug_annotations(true))
+    } else {
+        None
+    };
+
+    if cli.json {
+        let log_layer = tracing_subscriber::fmt::layer()
+            .json()
+            .with_span_events(FmtSpan::NONE)
+            .with_ansi(false);
+
+        let registry = tracing_subscriber::registry()
+            .with(log_layer.with_filter(EnvFilter::from_default_env()));
+
+        if let Some(perfetto) = perfetto_guard {
+            registry.with(perfetto).init();
+        } else {
+            registry.init();
+        }
+    } else {
+        let log_layer = tracing_subscriber::fmt::layer()
+            .pretty()
+            .with_span_events(FmtSpan::NONE)
+            .with_ansi(false)
+            .event_format(tracing_subscriber::fmt::format().with_target(true));
+
+        let registry = tracing_subscriber::registry()
+            .with(log_layer.with_filter(EnvFilter::from_default_env()));
+
+        if let Some(perfetto) = perfetto_guard {
+            registry.with(perfetto).init();
+        } else {
+            registry.init();
+        }
+    }
+
+    let session = VortexSession::default().with_tokio();
+    register_cuda_layout(&session);
+
+    let cuda_context = session.cuda_session().context().clone();
+
+    let pool = Arc::new(PinnedByteBufferPool::new(Arc::clone(&cuda_context)));
+    let cuda_stream = VortexCudaStreamPool::new(Arc::clone(&cuda_context), 1).get_stream()?;
+    let handle = session.handle();
+
+    // Parse source and create reader
+    let reader: Arc<dyn vortex::io::VortexReadAt> = if cli.source.starts_with("s3://") {
+        let url = Url::parse(&cli.source)
+            .map_err(|e| vortex::error::vortex_err!("invalid S3 URL: {e}"))?;
+        let bucket = url
+            .host_str()
+            .ok_or_else(|| vortex::error::vortex_err!("S3 URL missing bucket name"))?;
+        let path = ObjectPath::from(url.path());
+        let store: Arc<dyn object_store::ObjectStore> = Arc::new(
+            AmazonS3Builder::from_env()
+                .with_bucket_name(bucket)
+                .build()?,
+        );
+        Arc::new(PooledObjectStoreReadAt::new(
+            store,
+            path,
+            handle,
+            Arc::clone(&pool),
+            cuda_stream,
+        ))
+    } else {
+        let path = PathBuf::from(&cli.source);
+        Arc::new(PooledFileReadAt::open(
+            &path,
+            handle,
+            Arc::clone(&pool),
+            cuda_stream,
+        )?)
+    };
+
+    // Run benchmark iterations
+    let mut iteration_times = Vec::with_capacity(cli.iterations);
+    let concurrency = cli.concurrency;
+
+    for iteration in 0..cli.iterations {
+        let start = Instant::now();
+
+        // The file is re-opened inside the timed region, so opening it counts toward the result.
+        let gpu_file = session.open_options().open(Arc::clone(&reader)).await?;
+
+        let batches = gpu_file.scan()?.into_array_stream()?;
+
+        batches
+            .enumerate()
+            .map(|(chunk, batch)| {
+                let session = &session;
+                async move {
+                    let batch = batch?;
+                    let len = batch.len();
+                    let span = tracing::info_span!(
+                        "batch execution",
+                        iteration = iteration,
+                        chunk = chunk,
+                        len = len,
+                    );
+
+                    async {
+                        let mut cuda_ctx = CudaSession::create_execution_ctx(session)?;
+                        batch.execute_cuda(&mut cuda_ctx).await?;
+                        VortexResult::Ok(())
+                    }
+                    .instrument(span)
+                    .await
+                }
+            })
+            // Keep up to `concurrency` batch executions in flight at once.
+            .buffered(concurrency)
+            .try_collect::<Vec<_>>()
+            .await?;
+
+        let elapsed = start.elapsed();
+        iteration_times.push(elapsed);
+        tracing::info!(
+            "Iteration {}/{}: {:.3}s",
+            iteration + 1,
+            cli.iterations,
+            elapsed.as_secs_f64()
+        );
+    }
+
+    // Compute summary stats
+    let total: std::time::Duration = iteration_times.iter().sum();
+    // `Duration` divides by `u32`; convert with a check instead of truncating through `as`.
+    let runs = u32::try_from(iteration_times.len())
+        .map_err(|e| vortex::error::vortex_err!("iteration count does not fit in u32: {e}"))?;
+    let avg = total / runs;
+    let file_size = reader.size().await?;
+    let file_size_mb = file_size as f64 / (1024.0 * 1024.0);
+    let throughput_mbs = file_size_mb / avg.as_secs_f64();
+    // Always print human-readable to stderr
+    eprintln!();
+    eprintln!("=== Benchmark Results ===");
+    eprintln!("Source: {}", cli.source);
+    eprintln!("Iterations: {}", cli.iterations);
+    eprintln!("Avg time: {:.3}s", avg.as_secs_f64());
+    eprintln!("File size: {file_size_mb:.2} MB");
+    eprintln!("Throughput: {throughput_mbs:.2} MB/s");
+
+    Ok(())
+}
diff --git a/vortex-cuda/src/kernel/arrays/constant.rs b/vortex-cuda/src/kernel/arrays/constant.rs
index 2358ddd0c37..fc11f8bb855 100644
--- a/vortex-cuda/src/kernel/arrays/constant.rs
+++ b/vortex-cuda/src/kernel/arrays/constant.rs
@@ -10,9 +10,11 @@ use cudarc::driver::PushKernelArg;
 use tracing::instrument;
 use vortex::array::ArrayRef;
 use vortex::array::Canonical;
+use vortex::array::IntoArray;
 use vortex::array::arrays::ConstantArray;
 use vortex::array::arrays::ConstantVTable;
 use vortex::array::arrays::DecimalArray;
+use vortex::array::arrays::ExtensionArray;
 use vortex::array::arrays::PrimitiveArray;
 use vortex::array::buffer::BufferHandle;
 use vortex::array::match_each_decimal_value_type;
@@ -76,6 +78,16 @@ impl CudaExecute for ConstantNumericExecutor {
                     materialize_constant_decimal::(array, decimal_dtype, validity, ctx).await
                 })
             }
+            DType::Extension(ext_dtype) => {
+                let ext_dtype = ext_dtype.clone();
+                let storage_scalar = array.scalar().as_extension().to_storage_scalar();
+                let storage_constant = ConstantArray::new(storage_scalar, array.len()).into_array();
+                let storage_canonical = self.execute(storage_constant, ctx).await?;
+                Ok(Canonical::Extension(ExtensionArray::new(
+                    ext_dtype,
+                    storage_canonical.into_array(),
+                )))
+            }
             dt => vortex_bail!(
                 "CUDA constant array only supports numeric types, got {:?}",
                 dt
diff --git a/vortex-cuda/src/kernel/encodings/date_time_parts.rs b/vortex-cuda/src/kernel/encodings/date_time_parts.rs
index
5caf66277f7..7302c17c39a 100644 --- a/vortex-cuda/src/kernel/encodings/date_time_parts.rs +++ b/vortex-cuda/src/kernel/encodings/date_time_parts.rs @@ -106,6 +106,13 @@ impl CudaExecute for DateTimePartsExecutor { let seconds_prim = seconds_canonical.into_primitive(); let subseconds_prim = subseconds_canonical.into_primitive(); + // Components may decompress as unsigned (e.g. from BitPacked). Reinterpret + // as signed since the CUDA kernel only has signed variants and casts + // everything to int64_t anyway — the bit pattern is identical. + let days_prim = days_prim.reinterpret_cast(days_prim.ptype().to_signed()); + let seconds_prim = seconds_prim.reinterpret_cast(seconds_prim.ptype().to_signed()); + let subseconds_prim = subseconds_prim.reinterpret_cast(subseconds_prim.ptype().to_signed()); + let days_ptype = days_prim.ptype(); let seconds_ptype = seconds_prim.ptype(); let subseconds_ptype = subseconds_prim.ptype(); diff --git a/vortex-cuda/src/kernel/encodings/for_.rs b/vortex-cuda/src/kernel/encodings/for_.rs index bfc4a3ac3a0..dc8a8957c6a 100644 --- a/vortex-cuda/src/kernel/encodings/for_.rs +++ b/vortex-cuda/src/kernel/encodings/for_.rs @@ -52,8 +52,11 @@ impl CudaExecute for FoRExecutor { let array = Self::try_specialize(array).ok_or_else(|| vortex_err!("Expected FoRArray"))?; // Fuse FOR + BP => FFOR + // Dispatch on the FoR's ptype (not bitpacked's) so the output has the + // correct signedness. The CUDA kernel only depends on bit-width which is + // the same for signed/unsigned pairs (e.g. i16/u16). 
if let Some(bitpacked) = array.encoded().as_opt::() { - match_each_integer_ptype!(bitpacked.ptype(), |P| { + match_each_integer_ptype!(array.ptype(), |P| { let reference: P = array.reference_scalar().try_into()?; return decode_bitpacked(bitpacked.clone(), reference, ctx).await; }) @@ -64,7 +67,7 @@ impl CudaExecute for FoRExecutor { && let Some(bitpacked) = slice_array.child().as_opt::() { let slice_range = slice_array.slice_range().clone(); - let unpacked = match_each_integer_ptype!(bitpacked.ptype(), |P| { + let unpacked = match_each_integer_ptype!(array.ptype(), |P| { let reference: P = array.reference_scalar().try_into()?; decode_bitpacked(bitpacked.clone(), reference, ctx).await? }); diff --git a/vortex-cuda/src/kernel/encodings/zstd_buffers.rs b/vortex-cuda/src/kernel/encodings/zstd_buffers.rs index 6396f73681c..3fbf1159c49 100644 --- a/vortex-cuda/src/kernel/encodings/zstd_buffers.rs +++ b/vortex-cuda/src/kernel/encodings/zstd_buffers.rs @@ -154,6 +154,7 @@ async fn decode_zstd_buffers( record_statuses, )); + #[cfg(debug_assertions)] validate_decompress_results(&plan, device_actual_sizes, device_statuses).await?; let output_handle = BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(device_output))); diff --git a/vortex-cuda/src/session.rs b/vortex-cuda/src/session.rs index 1da99eb07d6..a53654dd7e2 100644 --- a/vortex-cuda/src/session.rs +++ b/vortex-cuda/src/session.rs @@ -122,6 +122,11 @@ impl CudaSession { .load_function(module_name, type_suffixes, &self.context) } + /// Returns the underlying CUDA context. + pub fn context(&self) -> &Arc { + &self.context + } + /// Get a handle to the exporter that converts Vortex arrays to `ArrowDeviceArray`. 
pub fn export_device_array(&self) -> &Arc { &self.export_device_array diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index a9d805d5447..06b59998862 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -126,6 +126,8 @@ pub static ALLOWED_ENCODINGS: LazyLock = LazyLock::new(|| { pub struct WriteStrategyBuilder { compressor: Option>, row_block_size: usize, + coalescing_block_size_minimum: u64, + coalescing_block_size_target: Option, field_writers: HashMap>, allow_encodings: Option, flat_strategy: Option>, @@ -138,6 +140,8 @@ impl Default for WriteStrategyBuilder { Self { compressor: None, row_block_size: 8192, + coalescing_block_size_minimum: ONE_MEG, + coalescing_block_size_target: Some(ONE_MEG), field_writers: HashMap::new(), allow_encodings: None, flat_strategy: None, @@ -161,6 +165,16 @@ impl WriteStrategyBuilder { self } + /// Override the coalescing partition target used before compression. + /// + /// This controls the uncompressed size of data chunks produced by the coalescing repartition + /// stage. Larger values produce fewer, larger chunks. + pub fn with_coalescing_block_size(mut self, bytes: u64) -> Self { + self.coalescing_block_size_minimum = bytes; + self.coalescing_block_size_target = Some(bytes); + self + } + /// Override the default write layout for a specific field somewhere in the nested /// schema tree. pub fn with_field_writer( @@ -262,9 +276,9 @@ impl WriteStrategyBuilder { // sufficient read concurrency for the desired throughput. One megabyte is small // enough to achieve this for S3 (Durner et al., "Exploiting Cloud Object Storage for // High-Performance Analytics", VLDB Vol 16, Iss 11). - block_size_minimum: ONE_MEG, + block_size_minimum: self.coalescing_block_size_minimum, block_len_multiple: self.row_block_size, - block_size_target: Some(ONE_MEG), + block_size_target: self.coalescing_block_size_target, canonicalize: true, }, );