Skip to content
Draft
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ members = [
"vortex-duckdb",
"vortex-cuda",
"vortex-cuda/cub",
"vortex-cuda/gpu-scan-bench",
"vortex-cuda/gpu-scan-cli",
"vortex-cuda/macros",
"vortex-cuda/nvcomp",
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/datafusion-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub fn format_to_df_format(format: Format) -> Arc<dyn FileFormat> {
Format::Csv => Arc::new(CsvFormat::default()) as _,
Format::Arrow => Arc::new(ArrowFormat),
Format::Parquet => Arc::new(ParquetFormat::new()),
Format::OnDiskVortex | Format::VortexCompact => {
Format::OnDiskVortex | Format::VortexCompact | Format::VortexCuda => {
Arc::new(VortexFormat::new(SESSION.clone()))
}
Format::OnDiskDuckDB | Format::Lance => {
Expand Down
14 changes: 13 additions & 1 deletion benchmarks/datafusion-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ async fn main() -> anyhow::Result<()> {
convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Compact)
.await?;
}
Format::VortexCuda => {
convert_parquet_directory_to_vortex(
&base_path,
CompactionStrategy::CudaCompatible,
)
.await?;
}
_ => {}
}
}
Expand Down Expand Up @@ -233,7 +240,12 @@ async fn register_benchmark_tables<B: Benchmark + ?Sized>(
) -> anyhow::Result<()> {
match format {
Format::Arrow => register_arrow_tables(session, benchmark).await,
_ if use_scan_api() && matches!(format, Format::OnDiskVortex | Format::VortexCompact) => {
_ if use_scan_api()
&& matches!(
format,
Format::OnDiskVortex | Format::VortexCompact | Format::VortexCuda
) =>
{
register_v2_tables(session, benchmark, format).await
}
_ => {
Expand Down
9 changes: 9 additions & 0 deletions vortex-bench/src/bin/data-gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ async fn main() -> anyhow::Result<()> {
convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Compact).await?;
}

if args
.formats
.iter()
.any(|f| matches!(f, Format::VortexCuda))
{
convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::CudaCompatible)
.await?;
}

if args
.formats
.iter()
Expand Down
1 change: 1 addition & 0 deletions vortex-bench/src/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ pub async fn convert_parquet_directory_to_vortex(
) -> anyhow::Result<()> {
let (format, dir_name) = match compaction {
CompactionStrategy::Compact => (Format::VortexCompact, Format::VortexCompact.name()),
CompactionStrategy::CudaCompatible => (Format::VortexCuda, Format::VortexCuda.name()),
CompactionStrategy::Default => (Format::OnDiskVortex, Format::OnDiskVortex.name()),
};

Expand Down
13 changes: 13 additions & 0 deletions vortex-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ pub enum Format {
#[clap(name = "vortex-compact")]
#[serde(rename = "vortex-compact")]
VortexCompact,
#[clap(name = "vortex-cuda")]
#[serde(rename = "vortex-cuda")]
VortexCuda,
#[clap(name = "duckdb")]
#[serde(rename = "duckdb")]
OnDiskDuckDB,
Expand Down Expand Up @@ -176,6 +179,7 @@ impl Format {
Format::Parquet => "parquet",
Format::OnDiskVortex => "vortex-file-compressed",
Format::VortexCompact => "vortex-compact",
Format::VortexCuda => "vortex-cuda",
Format::OnDiskDuckDB => "duckdb",
Format::Lance => "lance",
}
Expand All @@ -188,6 +192,7 @@ impl Format {
Format::Parquet => "parquet",
Format::OnDiskVortex => "vortex",
Format::VortexCompact => "vortex",
Format::VortexCuda => "vortex",
Format::OnDiskDuckDB => "duckdb",
Format::Lance => "lance",
}
Expand Down Expand Up @@ -222,18 +227,26 @@ impl Display for Engine {
/// Strategy used when rewriting Parquet datasets into Vortex files.
///
/// Chosen by the benchmark data-generation tools; each variant controls how
/// `apply_options` configures the `VortexWriteOptions` write strategy.
#[derive(Debug, Clone, Copy, Default)]
pub enum CompactionStrategy {
    /// Write with compact encodings (produces `Format::VortexCompact` output).
    Compact,
    /// Write with CUDA-compatible encodings and a larger coalescing block
    /// size, for GPU scan benchmarks (produces `Format::VortexCuda` output).
    CudaCompatible,
    /// Leave the writer options unmodified.
    #[default]
    Default,
}

impl CompactionStrategy {
    /// Apply this compaction strategy to `options`, returning the
    /// (possibly) reconfigured write options.
    pub fn apply_options(&self, options: VortexWriteOptions) -> VortexWriteOptions {
        // 128 MiB coalescing target used only for CUDA-compatible output.
        const CUDA_COALESCING_TARGET_BYTES: u64 = 128 * 1024 * 1024;
        match self {
            // Compact: enable the compact encoding set on the write strategy.
            CompactionStrategy::Compact => options.with_strategy(
                WriteStrategyBuilder::default()
                    .with_compact_encodings()
                    .build(),
            ),
            // CudaCompatible: presumably restricts output to encodings the
            // CUDA scan path can decode — TODO confirm against
            // `WriteStrategyBuilder::with_cuda_compatible_encodings` docs —
            // and raises the coalescing block size to the 128 MiB target.
            CompactionStrategy::CudaCompatible => options.with_strategy(
                WriteStrategyBuilder::default()
                    .with_cuda_compatible_encodings()
                    .with_coalescing_block_size(CUDA_COALESCING_TARGET_BYTES)
                    .build(),
            ),
            // Default: hand back the caller's options untouched.
            CompactionStrategy::Default => options,
        }
    }
Expand Down
29 changes: 29 additions & 0 deletions vortex-cuda/gpu-scan-bench/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Manifest for the `gpu-scan-bench` benchmark crate
# (workspace member "vortex-cuda/gpu-scan-bench").
[package]
name = "gpu-scan-bench"
authors = { workspace = true }
description = "CUDA GPU scan benchmarks for S3/NVMe"
edition = { workspace = true }
homepage = { workspace = true }
include = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
# Internal benchmark tool; must never be published to crates.io.
publish = false
repository = { workspace = true }
rust-version = { workspace = true }
version = { workspace = true }

[lints]
workspace = true

[dependencies]
clap = { workspace = true, features = ["derive"] }
futures = { workspace = true, features = ["executor"] }
# Object-store access for both S3 ("aws") and local NVMe ("fs") sources.
object_store = { workspace = true, features = ["aws", "fs"] }
tokio = { workspace = true, features = ["macros", "full"] }
tracing = { workspace = true, features = ["std", "attributes"] }
tracing-perfetto = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
url = { workspace = true }
vortex = { workspace = true, features = ["tokio", "zstd"] }
# "_test-harness" / "unstable_encodings" expose internals the benchmark
# drives directly; not for production use.
vortex-cuda = { workspace = true, features = ["_test-harness", "unstable_encodings"] }
vortex-cuda-macros = { workspace = true }
101 changes: 101 additions & 0 deletions vortex-cuda/gpu-scan-bench/bench_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "cudf-cu12",
# "s3fs",
# ]
#
# [tool.uv]
# extra-index-url = ["https://pypi.nvidia.com"]
# ///
#
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright the Vortex contributors
#
# Benchmark reading a Parquet file into GPU memory using cuDF.
# This serves as the baseline for comparing against Vortex GPU scans.
#
# Usage:
# uv run bench_parquet.py dataset.parquet --iterations 5
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a standalone uv script that performs the same scan as gpu-scan-bench above, but against Parquet instead of Vortex.


import argparse
import sys
import time


def main():
    """Benchmark reading a parquet file into GPU memory with cuDF.

    Times ``--iterations`` scans of ``source`` and prints per-iteration
    timings plus aggregate results (average latency and MB/s throughput)
    to stderr. By default the file is streamed in row-group batches to
    bound GPU memory; ``--full-file-read`` restores the one-shot read,
    which can OOM on large files.

    Raises:
        ValueError: if ``--iterations`` or ``--row-group-batch-size``
            is less than 1.
    """
    parser = argparse.ArgumentParser(
        description="Benchmark cuDF GPU parquet reads",
    )
    parser.add_argument("source", help="Path to parquet file")
    parser.add_argument("--iterations", type=int, default=1, help="Number of scan iterations")
    parser.add_argument(
        "--row-group-batch-size",
        type=int,
        default=1,
        help="Number of parquet row groups to read per cuDF call when streaming",
    )
    parser.add_argument(
        "--full-file-read",
        action="store_true",
        help="Read the full parquet file in one call (old behavior, can OOM)",
    )
    args = parser.parse_args()

    # GPU-heavy imports are deferred until after argument parsing so that
    # `--help` stays fast and works on machines without CUDA libraries.
    import cudf
    import fsspec
    import pyarrow.parquet as pq

    source = args.source
    # Validate numeric flags up front. Without the iterations check,
    # `--iterations 0` would leave `iteration_secs` empty and crash with
    # ZeroDivisionError when computing the average below.
    if args.iterations < 1:
        raise ValueError("--iterations must be >= 1")
    if args.row_group_batch_size < 1:
        raise ValueError("--row-group-batch-size must be >= 1")

    # Resolve the filesystem (local path or s3:// via s3fs) and file size
    # once; the size feeds the throughput calculation.
    fs, fs_path = fsspec.core.url_to_fs(source)
    file_size = fs.size(fs_path)
    file_size_mb = file_size / (1024 * 1024)

    num_row_groups = None
    if not args.full_file_read:
        # Read only the parquet footer metadata to learn the row-group count.
        with fs.open(fs_path, "rb") as parquet_file:
            num_row_groups = pq.ParquetFile(parquet_file).metadata.num_row_groups
        print(
            f"Streaming parquet by row groups: {num_row_groups} total, "
            f"batch size={args.row_group_batch_size}",
            file=sys.stderr,
        )

    iteration_secs = []
    for i in range(args.iterations):
        start = time.perf_counter()
        if args.full_file_read:
            df = cudf.read_parquet(source)
            # Drop the frame immediately so GPU memory is released before
            # the next iteration's read.
            del df
        else:
            for rg_start in range(0, num_row_groups, args.row_group_batch_size):
                row_groups = list(
                    range(rg_start, min(rg_start + args.row_group_batch_size, num_row_groups))
                )
                df = cudf.read_parquet(source, row_groups=row_groups)
                del df
        elapsed = time.perf_counter() - start
        iteration_secs.append(elapsed)
        print(
            f"Iteration {i + 1}/{args.iterations}: {elapsed:.3f}s",
            file=sys.stderr,
        )

    avg_secs = sum(iteration_secs) / len(iteration_secs)
    throughput_mbs = file_size_mb / avg_secs

    print(file=sys.stderr)
    print("=== Benchmark Results ===", file=sys.stderr)
    print(f"Source: {source}", file=sys.stderr)
    print(f"Iterations: {args.iterations}", file=sys.stderr)
    print(f"Avg time: {avg_secs:.3f}s", file=sys.stderr)
    print(f"File size: {file_size_mb:.2f} MB", file=sys.stderr)
    print(f"Throughput: {throughput_mbs:.2f} MB/s", file=sys.stderr)


if __name__ == "__main__":
    main()
Loading
Loading