From f08430d4b74a29852986a0440eae398fa37f2ef6 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Fri, 27 Feb 2026 11:17:06 +0000 Subject: [PATCH 01/14] gpu scan bench Signed-off-by: Onur Satici --- Cargo.lock | 17 ++ Cargo.toml | 1 + vortex-cuda/gpu-scan-bench/Cargo.toml | 29 ++++ vortex-cuda/gpu-scan-bench/src/main.rs | 208 +++++++++++++++++++++++++ 4 files changed, 255 insertions(+) create mode 100644 vortex-cuda/gpu-scan-bench/Cargo.toml create mode 100644 vortex-cuda/gpu-scan-bench/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 27205b18086..1a54422dbba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4099,6 +4099,23 @@ dependencies = [ "yansi", ] +[[package]] +name = "gpu-scan-bench" +version = "0.1.0" +dependencies = [ + "clap", + "futures", + "object_store", + "tokio", + "tracing", + "tracing-perfetto", + "tracing-subscriber", + "url", + "vortex", + "vortex-cuda", + "vortex-cuda-macros", +] + [[package]] name = "gpu-scan-cli" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index b5490594dd7..490ccce980a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "vortex-duckdb", "vortex-cuda", "vortex-cuda/cub", + "vortex-cuda/gpu-scan-bench", "vortex-cuda/gpu-scan-cli", "vortex-cuda/macros", "vortex-cuda/nvcomp", diff --git a/vortex-cuda/gpu-scan-bench/Cargo.toml b/vortex-cuda/gpu-scan-bench/Cargo.toml new file mode 100644 index 00000000000..ec4a736fe7a --- /dev/null +++ b/vortex-cuda/gpu-scan-bench/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "gpu-scan-bench" +authors = { workspace = true } +description = "CUDA GPU scan benchmarks for S3/NVMe" +edition = { workspace = true } +homepage = { workspace = true } +include = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +publish = false +repository = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +clap = { workspace = true, features = ["derive"] } +futures = { 
workspace = true, features = ["executor"] } +object_store = { workspace = true, features = ["aws", "fs"] } +tokio = { workspace = true, features = ["macros", "full"] } +tracing = { workspace = true, features = ["std", "attributes"] } +tracing-perfetto = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } +url = { workspace = true } +vortex = { workspace = true } +vortex-cuda = { workspace = true, features = ["_test-harness"] } +vortex-cuda-macros = { workspace = true } diff --git a/vortex-cuda/gpu-scan-bench/src/main.rs b/vortex-cuda/gpu-scan-bench/src/main.rs new file mode 100644 index 00000000000..a1418161e12 --- /dev/null +++ b/vortex-cuda/gpu-scan-bench/src/main.rs @@ -0,0 +1,208 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#![allow(unused_imports)] + +use std::fs::File; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; + +use clap::Parser; +use futures::StreamExt; +use object_store::aws::AmazonS3Builder; +use object_store::path::Path as ObjectPath; +use tracing::Instrument; +use tracing_perfetto::PerfettoLayer; +use tracing_subscriber::EnvFilter; +use tracing_subscriber::Layer; +use tracing_subscriber::fmt::format::FmtSpan; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use url::Url; +use vortex::VortexSessionDefault; +use vortex::error::VortexResult; +use vortex::error::vortex_bail; +use vortex::file::OpenOptionsSessionExt; +use vortex::io::session::RuntimeSessionExt; +use vortex::session::VortexSession; +use vortex_cuda::CudaSession; +use vortex_cuda::PinnedByteBufferPool; +use vortex_cuda::PooledFileReadAt; +use vortex_cuda::PooledObjectStoreReadAt; +use vortex_cuda::TracingLaunchStrategy; +use vortex_cuda::VortexCudaStreamPool; +use vortex_cuda::executor::CudaArrayExt; +use vortex_cuda::layout::register_cuda_layout; +use vortex_cuda_macros::cuda_available; +use 
vortex_cuda_macros::cuda_not_available; + +#[derive(Parser)] +#[command( + name = "gpu-scan-bench", + about = "Benchmark GPU scans of CUDA-compatible Vortex files from S3 or local storage" +)] +struct Cli { + /// S3 URI (s3://bucket/path) or local path to a CUDA-compatible .vortex file. + source: String, + + /// Number of scan iterations. + #[arg(long, default_value_t = 1)] + iterations: usize, + + /// Path to write Perfetto trace output. If omitted, no trace file is written. + #[arg(long)] + perfetto: Option, + + /// Output logs as JSON. + #[arg(long)] + json: bool, +} + +#[cuda_not_available] +fn main() {} + +#[cuda_available] +#[tokio::main] +async fn main() -> VortexResult<()> { + let cli = Cli::parse(); + + // Setup tracing + let perfetto_guard = if let Some(ref perfetto_path) = cli.perfetto { + let perfetto_file = File::create(perfetto_path)?; + Some(PerfettoLayer::new(perfetto_file).with_debug_annotations(true)) + } else { + None + }; + + if cli.json { + let log_layer = tracing_subscriber::fmt::layer() + .json() + .with_span_events(FmtSpan::NONE) + .with_ansi(false); + + let mut registry = tracing_subscriber::registry() + .with(log_layer.with_filter(EnvFilter::from_default_env())); + + if let Some(perfetto) = perfetto_guard { + registry.with(perfetto).init(); + } else { + registry.init(); + } + } else { + let log_layer = tracing_subscriber::fmt::layer() + .pretty() + .with_span_events(FmtSpan::NONE) + .with_ansi(false) + .event_format(tracing_subscriber::fmt::format().with_target(true)); + + let mut registry = tracing_subscriber::registry() + .with(log_layer.with_filter(EnvFilter::from_default_env())); + + if let Some(perfetto) = perfetto_guard { + registry.with(perfetto).init(); + } else { + registry.init(); + } + } + + let session = VortexSession::default(); + register_cuda_layout(&session); + + let mut cuda_ctx = CudaSession::create_execution_ctx(&session)? 
+ .with_launch_strategy(Arc::new(TracingLaunchStrategy)); + + let pool = Arc::new(PinnedByteBufferPool::new(Arc::clone( + cuda_ctx.stream().context(), + ))); + let cuda_stream = + VortexCudaStreamPool::new(Arc::clone(cuda_ctx.stream().context()), 1).get_stream()?; + let handle = session.handle(); + + // Parse source and create reader + let reader: Arc = if cli.source.starts_with("s3://") { + let url = Url::parse(&cli.source)?; + let bucket = url + .host_str() + .ok_or_else(|| vortex::error::vortex_err!("S3 URL missing bucket name"))?; + let path = ObjectPath::from(url.path()); + let store: Arc = Arc::new( + AmazonS3Builder::from_env() + .with_bucket_name(bucket) + .build()?, + ); + Arc::new(PooledObjectStoreReadAt::new( + store, + path, + handle, + Arc::clone(&pool), + cuda_stream, + )) + } else { + let path = PathBuf::from(&cli.source); + Arc::new(PooledFileReadAt::open( + &path, + handle, + Arc::clone(&pool), + cuda_stream, + )?) + }; + + // Run benchmark iterations + let mut iteration_times = Vec::with_capacity(cli.iterations); + + for iteration in 0..cli.iterations { + let start = Instant::now(); + + let gpu_file = session.open_options().open(Arc::clone(&reader)).await?; + + let mut batches = gpu_file.scan()?.into_array_stream()?; + + let mut chunk = 0; + while let Some(next) = batches.next().await.transpose()? 
{ + let len = next.len(); + let span = tracing::info_span!( + "batch execution", + iteration = iteration, + chunk = chunk, + len = len, + ); + + async { + next.execute_cuda(&mut cuda_ctx).await?; + VortexResult::Ok(()) + } + .instrument(span) + .await?; + + chunk += 1; + } + + let elapsed = start.elapsed(); + iteration_times.push(elapsed); + tracing::info!( + "Iteration {}/{}: {:.3}s", + iteration + 1, + cli.iterations, + elapsed.as_secs_f64() + ); + } + + // Print summary + let total: std::time::Duration = iteration_times.iter().sum(); + let avg = total / iteration_times.len() as u32; + + // Get file size for throughput + let file_size = reader.size().await?; + let throughput_mbs = (file_size as f64 / (1024.0 * 1024.0)) / avg.as_secs_f64(); + + eprintln!(); + eprintln!("=== Benchmark Results ==="); + eprintln!("Source: {}", cli.source); + eprintln!("Iterations: {}", cli.iterations); + eprintln!("Avg time: {:.3}s", avg.as_secs_f64()); + eprintln!("File size: {:.2} MB", file_size as f64 / (1024.0 * 1024.0)); + eprintln!("Throughput: {throughput_mbs:.2} MB/s"); + + Ok(()) +} From 15b87c4af941921758963592862abc483a162338 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Fri, 27 Feb 2026 17:53:51 +0000 Subject: [PATCH 02/14] parquet bench Signed-off-by: Onur Satici --- vortex-cuda/gpu-scan-bench/bench_parquet.py | 69 +++++++++++++++++++++ vortex-cuda/gpu-scan-bench/src/main.rs | 11 ++-- 2 files changed, 75 insertions(+), 5 deletions(-) create mode 100644 vortex-cuda/gpu-scan-bench/bench_parquet.py diff --git a/vortex-cuda/gpu-scan-bench/bench_parquet.py b/vortex-cuda/gpu-scan-bench/bench_parquet.py new file mode 100644 index 00000000000..0bcbc5aa47d --- /dev/null +++ b/vortex-cuda/gpu-scan-bench/bench_parquet.py @@ -0,0 +1,69 @@ +#!/usr/bin/env -S uv run --script +# /// script +# requires-python = ">=3.12" +# dependencies = [ +# "cudf-cu12", +# ] +# +# [tool.uv] +# extra-index-url = ["https://pypi.nvidia.com"] +# /// +# +# SPDX-License-Identifier: Apache-2.0 +# 
SPDX-FileCopyrightText: Copyright the Vortex contributors +# +# Benchmark reading a Parquet file into GPU memory using cuDF. +# This serves as the baseline for comparing against Vortex GPU scans. +# +# Usage: +# uv run bench_parquet.py dataset.parquet --iterations 5 + +import argparse +import json +import os +import sys +import time + + +def main(): + parser = argparse.ArgumentParser( + description="Benchmark cuDF GPU parquet reads", + ) + parser.add_argument("source", help="Path to parquet file") + parser.add_argument("--iterations", type=int, default=1, help="Number of scan iterations") + args = parser.parse_args() + + import cudf + import fsspec + + source = args.source + fs, fs_path = fsspec.core.url_to_fs(source) + file_size = fs.size(fs_path) + file_size_mb = file_size / (1024 * 1024) + + iteration_secs = [] + for i in range(args.iterations): + start = time.perf_counter() + df = cudf.read_parquet(source) + elapsed = time.perf_counter() - start + iteration_secs.append(elapsed) + print( + f"Iteration {i + 1}/{args.iterations}: {elapsed:.3f}s", + file=sys.stderr, + ) + del df + + avg_secs = sum(iteration_secs) / len(iteration_secs) + throughput_mbs = file_size_mb / avg_secs + + print(file=sys.stderr) + print("=== Benchmark Results ===", file=sys.stderr) + print(f"Source: {source}", file=sys.stderr) + print(f"Iterations: {args.iterations}", file=sys.stderr) + print(f"Avg time: {avg_secs:.3f}s", file=sys.stderr) + print(f"File size: {file_size_mb:.2f} MB", file=sys.stderr) + print(f"Throughput: {throughput_mbs:.2f} MB/s", file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/vortex-cuda/gpu-scan-bench/src/main.rs b/vortex-cuda/gpu-scan-bench/src/main.rs index a1418161e12..678cde7baa4 100644 --- a/vortex-cuda/gpu-scan-bench/src/main.rs +++ b/vortex-cuda/gpu-scan-bench/src/main.rs @@ -188,20 +188,21 @@ async fn main() -> VortexResult<()> { ); } - // Print summary + // Compute summary stats let total: std::time::Duration = 
iteration_times.iter().sum(); let avg = total / iteration_times.len() as u32; - - // Get file size for throughput let file_size = reader.size().await?; - let throughput_mbs = (file_size as f64 / (1024.0 * 1024.0)) / avg.as_secs_f64(); + let file_size_mb = file_size as f64 / (1024.0 * 1024.0); + let throughput_mbs = file_size_mb / avg.as_secs_f64(); + let iteration_secs: Vec = iteration_times.iter().map(|d| d.as_secs_f64()).collect(); + // Always print human-readable to stderr eprintln!(); eprintln!("=== Benchmark Results ==="); eprintln!("Source: {}", cli.source); eprintln!("Iterations: {}", cli.iterations); eprintln!("Avg time: {:.3}s", avg.as_secs_f64()); - eprintln!("File size: {:.2} MB", file_size as f64 / (1024.0 * 1024.0)); + eprintln!("File size: {file_size_mb:.2} MB"); eprintln!("Throughput: {throughput_mbs:.2} MB/s"); Ok(()) From 0a7058d4a85d059cb5875ea9beea7e26e572cd60 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Sun, 1 Mar 2026 18:11:06 +0000 Subject: [PATCH 03/14] fix from impl Signed-off-by: Onur Satici --- vortex-cuda/gpu-scan-bench/src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vortex-cuda/gpu-scan-bench/src/main.rs b/vortex-cuda/gpu-scan-bench/src/main.rs index 678cde7baa4..74380c8f7cd 100644 --- a/vortex-cuda/gpu-scan-bench/src/main.rs +++ b/vortex-cuda/gpu-scan-bench/src/main.rs @@ -121,7 +121,8 @@ async fn main() -> VortexResult<()> { // Parse source and create reader let reader: Arc = if cli.source.starts_with("s3://") { - let url = Url::parse(&cli.source)?; + let url = Url::parse(&cli.source) + .map_err(|e| vortex::error::vortex_err!("invalid S3 URL: {e}"))?; let bucket = url .host_str() .ok_or_else(|| vortex::error::vortex_err!("S3 URL missing bucket name"))?; From 674416674c90781f164b03be4f506e5cc011f8b1 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Sun, 1 Mar 2026 18:15:56 +0000 Subject: [PATCH 04/14] tokio Signed-off-by: Onur Satici --- vortex-cuda/gpu-scan-bench/Cargo.toml | 2 +- 
vortex-cuda/gpu-scan-bench/src/main.rs | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/vortex-cuda/gpu-scan-bench/Cargo.toml b/vortex-cuda/gpu-scan-bench/Cargo.toml index ec4a736fe7a..fab85568c32 100644 --- a/vortex-cuda/gpu-scan-bench/Cargo.toml +++ b/vortex-cuda/gpu-scan-bench/Cargo.toml @@ -24,6 +24,6 @@ tracing = { workspace = true, features = ["std", "attributes"] } tracing-perfetto = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } url = { workspace = true } -vortex = { workspace = true } +vortex = { workspace = true, features = ["tokio"] } vortex-cuda = { workspace = true, features = ["_test-harness"] } vortex-cuda-macros = { workspace = true } diff --git a/vortex-cuda/gpu-scan-bench/src/main.rs b/vortex-cuda/gpu-scan-bench/src/main.rs index 74380c8f7cd..15bf8c87fbd 100644 --- a/vortex-cuda/gpu-scan-bench/src/main.rs +++ b/vortex-cuda/gpu-scan-bench/src/main.rs @@ -81,7 +81,7 @@ async fn main() -> VortexResult<()> { .with_span_events(FmtSpan::NONE) .with_ansi(false); - let mut registry = tracing_subscriber::registry() + let registry = tracing_subscriber::registry() .with(log_layer.with_filter(EnvFilter::from_default_env())); if let Some(perfetto) = perfetto_guard { @@ -96,7 +96,7 @@ async fn main() -> VortexResult<()> { .with_ansi(false) .event_format(tracing_subscriber::fmt::format().with_target(true)); - let mut registry = tracing_subscriber::registry() + let registry = tracing_subscriber::registry() .with(log_layer.with_filter(EnvFilter::from_default_env())); if let Some(perfetto) = perfetto_guard { @@ -106,7 +106,7 @@ async fn main() -> VortexResult<()> { } } - let session = VortexSession::default(); + let session = VortexSession::default().with_tokio(); register_cuda_layout(&session); let mut cuda_ctx = CudaSession::create_execution_ctx(&session)? 
@@ -195,8 +195,6 @@ async fn main() -> VortexResult<()> { let file_size = reader.size().await?; let file_size_mb = file_size as f64 / (1024.0 * 1024.0); let throughput_mbs = file_size_mb / avg.as_secs_f64(); - let iteration_secs: Vec = iteration_times.iter().map(|d| d.as_secs_f64()).collect(); - // Always print human-readable to stderr eprintln!(); eprintln!("=== Benchmark Results ==="); From 96d37fb22c9df50504202887bc0ad95455e900e6 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Sun, 1 Mar 2026 18:22:26 +0000 Subject: [PATCH 05/14] zstd Signed-off-by: Onur Satici --- vortex-cuda/gpu-scan-bench/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vortex-cuda/gpu-scan-bench/Cargo.toml b/vortex-cuda/gpu-scan-bench/Cargo.toml index fab85568c32..557c7506f03 100644 --- a/vortex-cuda/gpu-scan-bench/Cargo.toml +++ b/vortex-cuda/gpu-scan-bench/Cargo.toml @@ -24,6 +24,6 @@ tracing = { workspace = true, features = ["std", "attributes"] } tracing-perfetto = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } url = { workspace = true } -vortex = { workspace = true, features = ["tokio"] } -vortex-cuda = { workspace = true, features = ["_test-harness"] } +vortex = { workspace = true, features = ["tokio", "zstd"] } +vortex-cuda = { workspace = true, features = ["_test-harness", "unstable_encodings"] } vortex-cuda-macros = { workspace = true } From 36bbe76e3d81119cbc2bb417f80098204fdc96f5 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Sun, 1 Mar 2026 18:31:51 +0000 Subject: [PATCH 06/14] ffor, datetimeparts Signed-off-by: Onur Satici --- vortex-cuda/src/kernel/encodings/date_time_parts.rs | 7 +++++++ vortex-cuda/src/kernel/encodings/for_.rs | 7 +++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/vortex-cuda/src/kernel/encodings/date_time_parts.rs b/vortex-cuda/src/kernel/encodings/date_time_parts.rs index 5caf66277f7..7302c17c39a 100644 --- 
a/vortex-cuda/src/kernel/encodings/date_time_parts.rs +++ b/vortex-cuda/src/kernel/encodings/date_time_parts.rs @@ -106,6 +106,13 @@ impl CudaExecute for DateTimePartsExecutor { let seconds_prim = seconds_canonical.into_primitive(); let subseconds_prim = subseconds_canonical.into_primitive(); + // Components may decompress as unsigned (e.g. from BitPacked). Reinterpret + // as signed since the CUDA kernel only has signed variants and casts + // everything to int64_t anyway — the bit pattern is identical. + let days_prim = days_prim.reinterpret_cast(days_prim.ptype().to_signed()); + let seconds_prim = seconds_prim.reinterpret_cast(seconds_prim.ptype().to_signed()); + let subseconds_prim = subseconds_prim.reinterpret_cast(subseconds_prim.ptype().to_signed()); + let days_ptype = days_prim.ptype(); let seconds_ptype = seconds_prim.ptype(); let subseconds_ptype = subseconds_prim.ptype(); diff --git a/vortex-cuda/src/kernel/encodings/for_.rs b/vortex-cuda/src/kernel/encodings/for_.rs index bfc4a3ac3a0..dc8a8957c6a 100644 --- a/vortex-cuda/src/kernel/encodings/for_.rs +++ b/vortex-cuda/src/kernel/encodings/for_.rs @@ -52,8 +52,11 @@ impl CudaExecute for FoRExecutor { let array = Self::try_specialize(array).ok_or_else(|| vortex_err!("Expected FoRArray"))?; // Fuse FOR + BP => FFOR + // Dispatch on the FoR's ptype (not bitpacked's) so the output has the + // correct signedness. The CUDA kernel only depends on bit-width which is + // the same for signed/unsigned pairs (e.g. i16/u16). 
if let Some(bitpacked) = array.encoded().as_opt::() { - match_each_integer_ptype!(bitpacked.ptype(), |P| { + match_each_integer_ptype!(array.ptype(), |P| { let reference: P = array.reference_scalar().try_into()?; return decode_bitpacked(bitpacked.clone(), reference, ctx).await; }) @@ -64,7 +67,7 @@ impl CudaExecute for FoRExecutor { && let Some(bitpacked) = slice_array.child().as_opt::() { let slice_range = slice_array.slice_range().clone(); - let unpacked = match_each_integer_ptype!(bitpacked.ptype(), |P| { + let unpacked = match_each_integer_ptype!(array.ptype(), |P| { let reference: P = array.reference_scalar().try_into()?; decode_bitpacked(bitpacked.clone(), reference, ctx).await? }); From 967667074b2122fd72bb55b099170c7fc7d81323 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Sun, 1 Mar 2026 18:45:20 +0000 Subject: [PATCH 07/14] constant Signed-off-by: Onur Satici --- vortex-cuda/src/kernel/arrays/constant.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/vortex-cuda/src/kernel/arrays/constant.rs b/vortex-cuda/src/kernel/arrays/constant.rs index 2358ddd0c37..fc11f8bb855 100644 --- a/vortex-cuda/src/kernel/arrays/constant.rs +++ b/vortex-cuda/src/kernel/arrays/constant.rs @@ -10,9 +10,11 @@ use cudarc::driver::PushKernelArg; use tracing::instrument; use vortex::array::ArrayRef; use vortex::array::Canonical; +use vortex::array::IntoArray; use vortex::array::arrays::ConstantArray; use vortex::array::arrays::ConstantVTable; use vortex::array::arrays::DecimalArray; +use vortex::array::arrays::ExtensionArray; use vortex::array::arrays::PrimitiveArray; use vortex::array::buffer::BufferHandle; use vortex::array::match_each_decimal_value_type; @@ -76,6 +78,16 @@ impl CudaExecute for ConstantNumericExecutor { materialize_constant_decimal::(array, decimal_dtype, validity, ctx).await }) } + DType::Extension(ext_dtype) => { + let ext_dtype = ext_dtype.clone(); + let storage_scalar = array.scalar().as_extension().to_storage_scalar(); + let storage_constant 
= ConstantArray::new(storage_scalar, array.len()).into_array(); + let storage_canonical = self.execute(storage_constant, ctx).await?; + Ok(Canonical::Extension(ExtensionArray::new( + ext_dtype, + storage_canonical.into_array(), + ))) + } dt => vortex_bail!( "CUDA constant array only supports numeric types, got {:?}", dt From 8bbe84305f485e8eb315f2b893b64075a75e31d3 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Sun, 1 Mar 2026 18:58:59 +0000 Subject: [PATCH 08/14] tracing launch strategy only on perfetto Signed-off-by: Onur Satici --- vortex-cuda/gpu-scan-bench/src/main.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/vortex-cuda/gpu-scan-bench/src/main.rs b/vortex-cuda/gpu-scan-bench/src/main.rs index 15bf8c87fbd..f0e0123a6be 100644 --- a/vortex-cuda/gpu-scan-bench/src/main.rs +++ b/vortex-cuda/gpu-scan-bench/src/main.rs @@ -109,8 +109,10 @@ async fn main() -> VortexResult<()> { let session = VortexSession::default().with_tokio(); register_cuda_layout(&session); - let mut cuda_ctx = CudaSession::create_execution_ctx(&session)? 
- .with_launch_strategy(Arc::new(TracingLaunchStrategy)); + let mut cuda_ctx = CudaSession::create_execution_ctx(&session)?; + if cli.perfetto.is_some() { + cuda_ctx = cuda_ctx.with_launch_strategy(Arc::new(TracingLaunchStrategy)); + } let pool = Arc::new(PinnedByteBufferPool::new(Arc::clone( cuda_ctx.stream().context(), From e474389719e72e772db0a7a942adc1a0c95f2bd9 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Sun, 1 Mar 2026 19:04:07 +0000 Subject: [PATCH 09/14] validate only on debug Signed-off-by: Onur Satici --- vortex-cuda/src/kernel/encodings/zstd_buffers.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/vortex-cuda/src/kernel/encodings/zstd_buffers.rs b/vortex-cuda/src/kernel/encodings/zstd_buffers.rs index 6396f73681c..3fbf1159c49 100644 --- a/vortex-cuda/src/kernel/encodings/zstd_buffers.rs +++ b/vortex-cuda/src/kernel/encodings/zstd_buffers.rs @@ -154,6 +154,7 @@ async fn decode_zstd_buffers( record_statuses, )); + #[cfg(debug_assertions)] validate_decompress_results(&plan, device_actual_sizes, device_statuses).await?; let output_handle = BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(device_output))); From c2370316b5e8d31e126a903c86f7afbf8b1a9fee Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Mon, 2 Mar 2026 10:42:26 +0000 Subject: [PATCH 10/14] use a cuda context per batch and add buffered Signed-off-by: Onur Satici --- vortex-cuda/gpu-scan-bench/src/main.rs | 70 ++++++++++++++------------ vortex-cuda/src/session.rs | 5 ++ 2 files changed, 43 insertions(+), 32 deletions(-) diff --git a/vortex-cuda/gpu-scan-bench/src/main.rs b/vortex-cuda/gpu-scan-bench/src/main.rs index f0e0123a6be..846d12d3dcd 100644 --- a/vortex-cuda/gpu-scan-bench/src/main.rs +++ b/vortex-cuda/gpu-scan-bench/src/main.rs @@ -9,7 +9,8 @@ use std::sync::Arc; use std::time::Instant; use clap::Parser; -use futures::StreamExt; +use futures::TryStreamExt; +use futures::stream::StreamExt; use object_store::aws::AmazonS3Builder; use object_store::path::Path as ObjectPath; 
use tracing::Instrument; @@ -22,15 +23,14 @@ use tracing_subscriber::util::SubscriberInitExt; use url::Url; use vortex::VortexSessionDefault; use vortex::error::VortexResult; -use vortex::error::vortex_bail; use vortex::file::OpenOptionsSessionExt; use vortex::io::session::RuntimeSessionExt; use vortex::session::VortexSession; use vortex_cuda::CudaSession; +use vortex_cuda::CudaSessionExt; use vortex_cuda::PinnedByteBufferPool; use vortex_cuda::PooledFileReadAt; use vortex_cuda::PooledObjectStoreReadAt; -use vortex_cuda::TracingLaunchStrategy; use vortex_cuda::VortexCudaStreamPool; use vortex_cuda::executor::CudaArrayExt; use vortex_cuda::layout::register_cuda_layout; @@ -54,6 +54,10 @@ struct Cli { #[arg(long)] perfetto: Option, + /// Number of batches to process concurrently (each on its own CUDA stream). + #[arg(long, default_value_t = 1)] + concurrency: usize, + /// Output logs as JSON. #[arg(long)] json: bool, @@ -109,16 +113,10 @@ async fn main() -> VortexResult<()> { let session = VortexSession::default().with_tokio(); register_cuda_layout(&session); - let mut cuda_ctx = CudaSession::create_execution_ctx(&session)?; - if cli.perfetto.is_some() { - cuda_ctx = cuda_ctx.with_launch_strategy(Arc::new(TracingLaunchStrategy)); - } + let cuda_context = session.cuda_session().context().clone(); - let pool = Arc::new(PinnedByteBufferPool::new(Arc::clone( - cuda_ctx.stream().context(), - ))); - let cuda_stream = - VortexCudaStreamPool::new(Arc::clone(cuda_ctx.stream().context()), 1).get_stream()?; + let pool = Arc::new(PinnedByteBufferPool::new(Arc::clone(&cuda_context))); + let cuda_stream = VortexCudaStreamPool::new(Arc::clone(&cuda_context), 1).get_stream()?; let handle = session.handle(); // Parse source and create reader @@ -153,34 +151,42 @@ async fn main() -> VortexResult<()> { // Run benchmark iterations let mut iteration_times = Vec::with_capacity(cli.iterations); + let concurrency = cli.concurrency; for iteration in 0..cli.iterations { let start = 
Instant::now(); let gpu_file = session.open_options().open(Arc::clone(&reader)).await?; - let mut batches = gpu_file.scan()?.into_array_stream()?; - - let mut chunk = 0; - while let Some(next) = batches.next().await.transpose()? { - let len = next.len(); - let span = tracing::info_span!( - "batch execution", - iteration = iteration, - chunk = chunk, - len = len, - ); - - async { - next.execute_cuda(&mut cuda_ctx).await?; - VortexResult::Ok(()) - } - .instrument(span) + let batches = gpu_file.scan()?.into_array_stream()?; + + batches + .enumerate() + .map(|(chunk, batch)| { + let session = &session; + async move { + let batch = batch?; + let len = batch.len(); + let span = tracing::info_span!( + "batch execution", + iteration = iteration, + chunk = chunk, + len = len, + ); + + async { + let mut cuda_ctx = CudaSession::create_execution_ctx(session)?; + batch.execute_cuda(&mut cuda_ctx).await?; + VortexResult::Ok(()) + } + .instrument(span) + .await + } + }) + .buffered(concurrency) + .try_collect::>() .await?; - chunk += 1; - } - let elapsed = start.elapsed(); iteration_times.push(elapsed); tracing::info!( diff --git a/vortex-cuda/src/session.rs b/vortex-cuda/src/session.rs index 1da99eb07d6..a53654dd7e2 100644 --- a/vortex-cuda/src/session.rs +++ b/vortex-cuda/src/session.rs @@ -122,6 +122,11 @@ impl CudaSession { .load_function(module_name, type_suffixes, &self.context) } + /// Returns the underlying CUDA context. + pub fn context(&self) -> &Arc { + &self.context + } + /// Get a handle to the exporter that converts Vortex arrays to `ArrowDeviceArray`. 
pub fn export_device_array(&self) -> &Arc { &self.export_device_array From eb12dd9998b2b0229f55013b8e771127e9b25efd Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Mon, 2 Mar 2026 12:14:59 +0000 Subject: [PATCH 11/14] data-gen can create vortex-cuda for benchmarks Signed-off-by: Onur Satici --- benchmarks/datafusion-bench/src/lib.rs | 2 +- benchmarks/datafusion-bench/src/main.rs | 14 +++++++++++++- vortex-bench/src/bin/data-gen.rs | 9 +++++++++ vortex-bench/src/conversions.rs | 1 + vortex-bench/src/lib.rs | 11 +++++++++++ 5 files changed, 35 insertions(+), 2 deletions(-) diff --git a/benchmarks/datafusion-bench/src/lib.rs b/benchmarks/datafusion-bench/src/lib.rs index 43b86331e9f..626b4ac0f1b 100644 --- a/benchmarks/datafusion-bench/src/lib.rs +++ b/benchmarks/datafusion-bench/src/lib.rs @@ -109,7 +109,7 @@ pub fn format_to_df_format(format: Format) -> Arc { Format::Csv => Arc::new(CsvFormat::default()) as _, Format::Arrow => Arc::new(ArrowFormat), Format::Parquet => Arc::new(ParquetFormat::new()), - Format::OnDiskVortex | Format::VortexCompact => { + Format::OnDiskVortex | Format::VortexCompact | Format::VortexCuda => { Arc::new(VortexFormat::new(SESSION.clone())) } Format::OnDiskDuckDB | Format::Lance => { diff --git a/benchmarks/datafusion-bench/src/main.rs b/benchmarks/datafusion-bench/src/main.rs index f915c9c7d3e..5869e7ec6f3 100644 --- a/benchmarks/datafusion-bench/src/main.rs +++ b/benchmarks/datafusion-bench/src/main.rs @@ -140,6 +140,13 @@ async fn main() -> anyhow::Result<()> { convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Compact) .await?; } + Format::VortexCuda => { + convert_parquet_directory_to_vortex( + &base_path, + CompactionStrategy::CudaCompatible, + ) + .await?; + } _ => {} } } @@ -233,7 +240,12 @@ async fn register_benchmark_tables( ) -> anyhow::Result<()> { match format { Format::Arrow => register_arrow_tables(session, benchmark).await, - _ if use_scan_api() && matches!(format, Format::OnDiskVortex | 
Format::VortexCompact) => { + _ if use_scan_api() + && matches!( + format, + Format::OnDiskVortex | Format::VortexCompact | Format::VortexCuda + ) => + { register_v2_tables(session, benchmark, format).await } _ => { diff --git a/vortex-bench/src/bin/data-gen.rs b/vortex-bench/src/bin/data-gen.rs index d38c977a6c6..c61131479ff 100644 --- a/vortex-bench/src/bin/data-gen.rs +++ b/vortex-bench/src/bin/data-gen.rs @@ -79,6 +79,15 @@ async fn main() -> anyhow::Result<()> { convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Compact).await?; } + if args + .formats + .iter() + .any(|f| matches!(f, Format::VortexCuda)) + { + convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::CudaCompatible) + .await?; + } + if args .formats .iter() diff --git a/vortex-bench/src/conversions.rs b/vortex-bench/src/conversions.rs index 1ab3c1e7f20..f070b8fdd4c 100644 --- a/vortex-bench/src/conversions.rs +++ b/vortex-bench/src/conversions.rs @@ -149,6 +149,7 @@ pub async fn convert_parquet_directory_to_vortex( ) -> anyhow::Result<()> { let (format, dir_name) = match compaction { CompactionStrategy::Compact => (Format::VortexCompact, Format::VortexCompact.name()), + CompactionStrategy::CudaCompatible => (Format::VortexCuda, Format::VortexCuda.name()), CompactionStrategy::Default => (Format::OnDiskVortex, Format::OnDiskVortex.name()), }; diff --git a/vortex-bench/src/lib.rs b/vortex-bench/src/lib.rs index 6dad0f0f6a1..feccd9d79e2 100644 --- a/vortex-bench/src/lib.rs +++ b/vortex-bench/src/lib.rs @@ -137,6 +137,9 @@ pub enum Format { #[clap(name = "vortex-compact")] #[serde(rename = "vortex-compact")] VortexCompact, + #[clap(name = "vortex-cuda")] + #[serde(rename = "vortex-cuda")] + VortexCuda, #[clap(name = "duckdb")] #[serde(rename = "duckdb")] OnDiskDuckDB, @@ -176,6 +179,7 @@ impl Format { Format::Parquet => "parquet", Format::OnDiskVortex => "vortex-file-compressed", Format::VortexCompact => "vortex-compact", + Format::VortexCuda => "vortex-cuda", 
Format::OnDiskDuckDB => "duckdb", Format::Lance => "lance", } @@ -188,6 +192,7 @@ impl Format { Format::Parquet => "parquet", Format::OnDiskVortex => "vortex", Format::VortexCompact => "vortex", + Format::VortexCuda => "vortex", Format::OnDiskDuckDB => "duckdb", Format::Lance => "lance", } @@ -222,6 +227,7 @@ impl Display for Engine { #[derive(Debug, Clone, Copy, Default)] pub enum CompactionStrategy { Compact, + CudaCompatible, #[default] Default, } @@ -234,6 +240,11 @@ impl CompactionStrategy { .with_compact_encodings() .build(), ), + CompactionStrategy::CudaCompatible => options.with_strategy( + WriteStrategyBuilder::default() + .with_cuda_compatible_encodings() + .build(), + ), CompactionStrategy::Default => options, } } From c6e294fd5c592db1a609a49d31521bcc00941b0a Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Mon, 2 Mar 2026 13:48:49 +0000 Subject: [PATCH 12/14] stream parquet Signed-off-by: Onur Satici --- vortex-cuda/gpu-scan-bench/bench_parquet.py | 39 ++++++++++++++++++--- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/vortex-cuda/gpu-scan-bench/bench_parquet.py b/vortex-cuda/gpu-scan-bench/bench_parquet.py index 0bcbc5aa47d..f91210a3072 100644 --- a/vortex-cuda/gpu-scan-bench/bench_parquet.py +++ b/vortex-cuda/gpu-scan-bench/bench_parquet.py @@ -19,8 +19,6 @@ # uv run bench_parquet.py dataset.parquet --iterations 5 import argparse -import json -import os import sys import time @@ -31,27 +29,60 @@ def main(): ) parser.add_argument("source", help="Path to parquet file") parser.add_argument("--iterations", type=int, default=1, help="Number of scan iterations") + parser.add_argument( + "--row-group-batch-size", + type=int, + default=1, + help="Number of parquet row groups to read per cuDF call when streaming", + ) + parser.add_argument( + "--full-file-read", + action="store_true", + help="Read the full parquet file in one call (old behavior, can OOM)", + ) args = parser.parse_args() import cudf import fsspec + import pyarrow.parquet as pq 
source = args.source + if args.row_group_batch_size < 1: + raise ValueError("--row-group-batch-size must be >= 1") + fs, fs_path = fsspec.core.url_to_fs(source) file_size = fs.size(fs_path) file_size_mb = file_size / (1024 * 1024) + num_row_groups = None + if not args.full_file_read: + with fs.open(fs_path, "rb") as parquet_file: + num_row_groups = pq.ParquetFile(parquet_file).metadata.num_row_groups + print( + f"Streaming parquet by row groups: {num_row_groups} total, " + f"batch size={args.row_group_batch_size}", + file=sys.stderr, + ) + iteration_secs = [] for i in range(args.iterations): start = time.perf_counter() - df = cudf.read_parquet(source) + if args.full_file_read: + df = cudf.read_parquet(source) + del df + else: + for rg_start in range(0, num_row_groups, args.row_group_batch_size): + row_groups = list( + range(rg_start, min(rg_start + args.row_group_batch_size, num_row_groups)) + ) + df = cudf.read_parquet(source, row_groups=row_groups) + del df elapsed = time.perf_counter() - start iteration_secs.append(elapsed) print( f"Iteration {i + 1}/{args.iterations}: {elapsed:.3f}s", file=sys.stderr, ) - del df avg_secs = sum(iteration_secs) / len(iteration_secs) throughput_mbs = file_size_mb / avg_secs From 0780c87891b527922e36df4d6abb40b5650d3bbb Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Mon, 2 Mar 2026 14:00:05 +0000 Subject: [PATCH 13/14] cuda to have 128MB chunks Signed-off-by: Onur Satici --- vortex-bench/src/lib.rs | 2 ++ vortex-file/src/strategy.rs | 18 ++++++++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/vortex-bench/src/lib.rs b/vortex-bench/src/lib.rs index feccd9d79e2..be4b79537bc 100644 --- a/vortex-bench/src/lib.rs +++ b/vortex-bench/src/lib.rs @@ -234,6 +234,7 @@ pub enum CompactionStrategy { impl CompactionStrategy { pub fn apply_options(&self, options: VortexWriteOptions) -> VortexWriteOptions { + const CUDA_COALESCING_TARGET_BYTES: u64 = 128 * 1024 * 1024; match self { CompactionStrategy::Compact => 
options.with_strategy( WriteStrategyBuilder::default() @@ -243,6 +244,7 @@ impl CompactionStrategy { CompactionStrategy::CudaCompatible => options.with_strategy( WriteStrategyBuilder::default() .with_cuda_compatible_encodings() + .with_coalescing_block_size(CUDA_COALESCING_TARGET_BYTES) .build(), ), CompactionStrategy::Default => options, diff --git a/vortex-file/src/strategy.rs b/vortex-file/src/strategy.rs index a9d805d5447..06b59998862 100644 --- a/vortex-file/src/strategy.rs +++ b/vortex-file/src/strategy.rs @@ -126,6 +126,8 @@ pub static ALLOWED_ENCODINGS: LazyLock = LazyLock::new(|| { pub struct WriteStrategyBuilder { compressor: Option>, row_block_size: usize, + coalescing_block_size_minimum: u64, + coalescing_block_size_target: Option<u64>, field_writers: HashMap>, allow_encodings: Option, flat_strategy: Option>, @@ -138,6 +140,8 @@ impl Default for WriteStrategyBuilder { Self { compressor: None, row_block_size: 8192, + coalescing_block_size_minimum: ONE_MEG, + coalescing_block_size_target: Some(ONE_MEG), field_writers: HashMap::new(), allow_encodings: None, flat_strategy: None, @@ -161,6 +165,16 @@ impl WriteStrategyBuilder { self } + /// Override the coalescing block size (both minimum and target) used before compression. + /// + /// This controls the uncompressed size of data chunks produced by the coalescing repartition + /// stage. Larger values produce fewer, larger chunks. + pub fn with_coalescing_block_size(mut self, bytes: u64) -> Self { + self.coalescing_block_size_minimum = bytes; + self.coalescing_block_size_target = Some(bytes); + self + } + /// Override the default write layout for a specific field somewhere in the nested /// schema tree. pub fn with_field_writer( @@ -262,9 +276,9 @@ impl WriteStrategyBuilder { // sufficient read concurrency for the desired throughput. One megabyte is small // enough to achieve this for S3 (Durner et al., "Exploiting Cloud Object Storage for // High-Performance Analytics", VLDB Vol 16, Iss 11).
- block_size_minimum: ONE_MEG, + block_size_minimum: self.coalescing_block_size_minimum, block_len_multiple: self.row_block_size, - block_size_target: Some(ONE_MEG), + block_size_target: self.coalescing_block_size_target, canonicalize: true, }, ); From 4be081e069ffed979b66715a6c44c811c80cce86 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Mon, 2 Mar 2026 14:01:16 +0000 Subject: [PATCH 14/14] s3fs to uv Signed-off-by: Onur Satici --- vortex-cuda/gpu-scan-bench/bench_parquet.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vortex-cuda/gpu-scan-bench/bench_parquet.py b/vortex-cuda/gpu-scan-bench/bench_parquet.py index f91210a3072..8419d740179 100644 --- a/vortex-cuda/gpu-scan-bench/bench_parquet.py +++ b/vortex-cuda/gpu-scan-bench/bench_parquet.py @@ -3,6 +3,7 @@ # requires-python = ">=3.12" # dependencies = [ # "cudf-cu12", +# "s3fs", # ] # # [tool.uv]