diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 42e9040d86d..2cb77bb7190 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -270,6 +270,7 @@ jobs: run: | git ls-files vortex-cuda vortex-cxx vortex-duckdb vortex-ffi \ | grep -E '\.(cpp|hpp|cu|cuh|h)$' \ + | grep -v 'arrow/reference/arrow_c_device\.h$' \ | grep -v 'kernels/src/bit_unpack_.*\.cu$' \ | grep -v 'kernels/src/bit_unpack_.*_lanes\.cuh$' \ | xargs clang-format --dry-run --Werror --style=file diff --git a/Cargo.lock b/Cargo.lock index 6f2b7bb670e..0805c0f1b4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9481,6 +9481,7 @@ name = "vortex-cuda" version = "0.1.0" dependencies = [ "arc-swap", + "arrow-schema", "async-trait", "bindgen", "bytes", diff --git a/REUSE.toml b/REUSE.toml index 8e406c95c90..d864907d6be 100644 --- a/REUSE.toml +++ b/REUSE.toml @@ -46,3 +46,8 @@ path = ["vortex-cuda/kernels/src/bit_unpack_*"] precedence = "override" SPDX-FileCopyrightText = "Copyright the Vortex contributors" SPDX-License-Identifier = "Apache-2.0" + +[[annotations]] +path = "vortex-cuda/src/arrow/reference/arrow_c_device.h" +SPDX-FileCopyrightText = "2016-2025 Copyright The Apache Software Foundation" +SPDX-License-Identifier = "Apache-2.0" diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index 6cab27e6bc3..d540c54b7e6 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -24,6 +24,7 @@ unstable_encodings = ["vortex/unstable_encodings"] [dependencies] arc-swap = { workspace = true } +arrow-schema = { workspace = true, features = ["ffi"] } async-trait = { workspace = true } bytes = { workspace = true } cudarc = { workspace = true, features = ["f16"] } diff --git a/vortex-cuda/build.rs b/vortex-cuda/build.rs index b8673cdda83..972625cd6fb 100644 --- a/vortex-cuda/build.rs +++ b/vortex-cuda/build.rs @@ -56,6 +56,7 @@ fn main() { generate_unpack::(&kernels_src, 16).expect("Failed to generate unpack for u64"); let out_dir = PathBuf::from(env::var("OUT_DIR").expect("OUT_DIR not set")); + generate_arrow_device_array_bindings(Path::new(&manifest_dir), &out_dir); generate_dynamic_dispatch_bindings(&kernels_src, &out_dir); generate_patches_bindings(&kernels_src, &out_dir); @@ -196,6 +197,28 @@ fn nvcc_compile_ptx( Ok(()) } +/// Generate bindings for the vendored Arrow C Device ABI header. +fn generate_arrow_device_array_bindings(manifest_dir: &Path, out_dir: &Path) { + let header = manifest_dir.join("src/arrow/reference/arrow_c_device.h"); + println!("cargo:rerun-if-changed={}", header.display()); + + let bindings = bindgen::Builder::default() + .header(header.to_string_lossy()) + .allowlist_type("ArrowArray") + .allowlist_type("ArrowDeviceArray") + .allowlist_type("ArrowDeviceType") + .allowlist_var("ARROW_DEVICE_.*") + .derive_copy(true) + .derive_debug(true) + .layout_tests(false) + .generate() + .expect("Failed to generate Arrow C Device bindings"); + + bindings + .write_to_file(out_dir.join("arrow_c_abi.rs")) + .expect("Failed to write arrow_c_abi.rs"); +} + /// Generate bindings for the dynamic dispatch shared header. fn generate_dynamic_dispatch_bindings(kernels_src: &Path, out_dir: &Path) { let header = kernels_src.join("dynamic_dispatch.h"); diff --git a/vortex-cuda/kernels/src/varbinview_compute_offsets.cu b/vortex-cuda/kernels/src/varbinview_compute_offsets.cu deleted file mode 100644 index 7b7434fd313..00000000000 --- a/vortex-cuda/kernels/src/varbinview_compute_offsets.cu +++ /dev/null @@ -1,30 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -#include "config.cuh" -#include "varbinview.cuh" - -// single-threaded, compute offsets -extern "C" __global__ void varbinview_compute_offsets(const BinaryView *views, - int64_t num_strings, - Offsets out_offsets, - int32_t *last_offset) { - const int64_t tid = blockIdx.x * blockDim.x + threadIdx.x; - - // force execution to be single-threaded to compute the prefix - // sum. - // TODO(aduffy): we could do this with a CUB kernel instead. - // Check the profiles later to see where this shows up. - if (tid != 0) { - return; - } - - int32_t offset = 0; - out_offsets[0] = 0; - for (int i = 0; i < num_strings; i++) { - offset += views[i].inlined.size; - out_offsets[i + 1] = offset; - } - - *last_offset = offset; -} diff --git a/vortex-cuda/kernels/src/varbinview_copy_strings.cu b/vortex-cuda/kernels/src/varbinview_copy_strings.cu deleted file mode 100644 index f843c9f3b3c..00000000000 --- a/vortex-cuda/kernels/src/varbinview_copy_strings.cu +++ /dev/null @@ -1,39 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -#include "config.cuh" -#include "varbinview.cuh" - -// Lookup a string from a binary view, copying it into a destination buffer. -__device__ void copy_string_to_dst(BinaryView &view, Buffer *buffers, uint8_t *dst) { - int32_t size = view.inlined.size; - uint8_t *src; - if (size <= MAX_INLINED_SIZE) { - // TODO(aduffy): use uint64_t loads instead? - src = view.inlined.data; - } else { - auto ref = view.ref; - src = buffers[ref.index] + ref.offset; - } - memcpy(dst, src, size); -} - -extern "C" __global__ void varbinview_copy_strings(int64_t len, - BinaryView *views, - Buffer *buffers, - Buffer dst_buffer, - Offsets dst_offsets) { - const int64_t tid = blockIdx.x * blockDim.x + threadIdx.x; - - // Each thread is responsible for copying a single string. - // Any excess threads do no work. - if (tid >= len) { - return; - } - - auto view = views[tid]; - int32_t offset = dst_offsets[tid]; - uint8_t *dst = &dst_buffer[offset]; - - copy_string_to_dst(view, buffers, dst); -} diff --git a/vortex-cuda/src/arrow/canonical.rs b/vortex-cuda/src/arrow/canonical.rs index 0ccace213c9..9b5e5bb55ef 100644 --- a/vortex-cuda/src/arrow/canonical.rs +++ b/vortex-cuda/src/arrow/canonical.rs @@ -15,7 +15,9 @@ use vortex::array::arrays::decimal::DecimalDataParts; use vortex::array::arrays::extension::ExtensionArrayExt; use vortex::array::arrays::primitive::PrimitiveDataParts; use vortex::array::arrays::struct_::StructDataParts; +use vortex::array::arrays::varbinview::VarBinViewDataParts; use vortex::array::buffer::BufferHandle; +use vortex::buffer::Buffer; use vortex::dtype::DecimalType; use vortex::error::VortexResult; use vortex::error::vortex_bail; @@ -23,15 +25,13 @@ use vortex::error::vortex_ensure; use vortex::extension::datetime::AnyTemporal; use crate::CudaExecutionCtx; +use crate::arrow::ARROW_DEVICE_CUDA; use crate::arrow::ArrowArray; use crate::arrow::ArrowDeviceArray; -use crate::arrow::DeviceType; use crate::arrow::ExportDeviceArray; use crate::arrow::PrivateData; use crate::arrow::SyncEvent; use crate::arrow::check_validity_empty; -use crate::arrow::varbinview::BinaryParts; -use crate::arrow::varbinview::copy_varbinview_to_varbin; use crate::executor::CudaArrayExt; /// An implementation of `ExportDeviceArray` that exports Vortex arrays to `ArrowDeviceArray` by @@ -49,14 +49,14 @@ impl ExportDeviceArray for CanonicalDeviceArrayExport { ) -> VortexResult { let cuda_array = array.execute_cuda(ctx).await?; - let (arrow_array, _) = export_canonical(cuda_array, ctx).await?; + let (arrow_array, sync_event) = export_canonical(cuda_array, ctx).await?; Ok(ArrowDeviceArray { array: arrow_array, - sync_event: None, device_id: ctx.stream().context().ordinal() as i64, - device_type: DeviceType::Cuda, - _reserved: Default::default(), + device_type: ARROW_DEVICE_CUDA, + sync_event, + reserved: Default::default(), }) } } @@ -90,7 +90,7 @@ fn export_canonical( array.release = Some(release_array); // we don't need a sync event for Null since no data is copied. - Ok((array, None)) + Ok((array, ptr::null_mut())) } Canonical::Decimal(decimal) => { let len = decimal.len(); @@ -143,28 +143,50 @@ fn export_canonical( check_validity_empty(&validity)?; + let bits = ctx.ensure_on_device(bits).await?; export_fixed_size(bits, len, offset, ctx) } Canonical::VarBinView(varbinview) => { let len = varbinview.len(); - check_validity_empty(&varbinview.validity()?)?; - - let BinaryParts { offsets, bytes } = - copy_varbinview_to_varbin(varbinview, ctx).await?; + let VarBinViewDataParts { + views, + buffers: data_buffers, + validity, + .. + } = varbinview.into_data_parts(); - let offsets = ctx.ensure_on_device(offsets).await?; - let bytes = ctx.ensure_on_device(bytes).await?; + check_validity_empty(&validity)?; - let buffers = vec![None, Some(offsets), Some(bytes)]; + let views = ctx.ensure_on_device(views).await?; + let mut buffers = Vec::with_capacity(data_buffers.len() + 3); + buffers.push(None); + buffers.push(Some(views)); + for buffer in data_buffers.iter() { + buffers.push(Some(ctx.ensure_on_device(buffer.clone()).await?)); + } + // Nanoarrow's Utf8View/BinaryView C layout stores the variadic data buffer sizes + // as the final buffer slot, after the null bitmap, views, and data buffers. + let variadic_buffer_sizes = data_buffers + .iter() + .map(|buffer| i64::try_from(buffer.len())) + .collect::, _>>()?; + buffers.push(Some( + ctx.ensure_on_device(BufferHandle::new_host( + Buffer::from(variadic_buffer_sizes).into_byte_buffer(), + )) + .await?, + )); + + let n_buffers = i64::try_from(buffers.len())?; let mut private_data = PrivateData::new(buffers, vec![], ctx)?; let sync_event = private_data.sync_event(); - // let arrow_array = ArrowArray { length: len as i64, null_count: 0, offset: 0, - // 1 (optional) buffer for nulls, one buffer for the data - n_buffers: 2, + // Arrow Utf8View/BinaryView layout: optional null bitmap, views, data buffers, + // and trailing variadic buffer sizes. + n_buffers, buffers: private_data.buffer_ptrs.as_mut_ptr(), n_children: 0, children: ptr::null_mut(), @@ -175,9 +197,7 @@ fn export_canonical( Ok((arrow_array, sync_event)) } - // TODO(aduffy): implement VarBinView. cudf doesn't support it, so we need to - // execute a kernel to translate from VarBinView -> VarBin. - c => todo!("support for exporting {} arrays", c.dtype()), + c => vortex_bail!("unsupported Arrow Device export for {} array", c.dtype()), } }) } @@ -233,8 +253,8 @@ fn export_fixed_size( "buffer must already be copied to device before calling" ); - // TODO(aduffy): currently the null buffer is always None, in the future we will need - // to pass it. + // Non-trivial validity is rejected before fixed-size export, so the Arrow null bitmap slot is + // always null for now. Future nullable export support should pass the validity bitmap here. let mut private_data = PrivateData::new(vec![None, Some(buffer)], vec![], ctx)?; let sync_event: SyncEvent = private_data.sync_event(); @@ -261,14 +281,15 @@ unsafe extern "C" fn release_array(array: *mut ArrowArray) { // code. This is necessary to ensure that the fields inside the CudaPrivateData // get dropped to free native/GPU memory. unsafe { + if array.is_null() || (*array).release.is_none() { + return; + } + let private_data_ptr = ptr::replace(&raw mut (*array).private_data, ptr::null_mut()); if !private_data_ptr.is_null() { let mut private_data = Box::from_raw(private_data_ptr.cast::()); - let children = mem::take(&mut private_data.children); - for child in children { - release_array(child); - } + release_children(&mut private_data); } // update the release function to NULL to avoid any possibility of double-frees. @@ -276,11 +297,31 @@ unsafe extern "C" fn release_array(array: *mut ArrowArray) { } } +unsafe fn release_children(private_data: &mut PrivateData) { + unsafe { + let children = mem::take(&mut private_data.children); + for child in children { + if !child.is_null() { + if let Some(release) = (*child).release { + release(child); + } + // Children are allocated with Box::into_raw in PrivateData::new, so the + // release callback must also reclaim the ArrowArray allocation itself. + drop(Box::from_raw(child)); + } + } + } +} + #[cfg(test)] mod tests { + use arrow_schema::DataType; + use arrow_schema::Field; + use arrow_schema::Schema; use rstest::rstest; use vortex::array::ArrayRef; use vortex::array::IntoArray; + use vortex::array::arrays::BoolArray; use vortex::array::arrays::DecimalArray; use vortex::array::arrays::NullArray; use vortex::array::arrays::PrimitiveArray; @@ -295,11 +336,19 @@ mod tests { use vortex::extension::datetime::TimeUnit; use vortex::session::VortexSession; - use super::release_array; + use crate::arrow::ARROW_DEVICE_CUDA; + use crate::arrow::ArrowArray; use crate::arrow::DeviceArrayExt; - use crate::arrow::DeviceType; use crate::session::CudaSession; + unsafe fn release_exported_array(array: *mut ArrowArray) { + unsafe { + if let Some(release) = (*array).release { + release(array); + } + } + } + #[rstest] #[case::u8(PrimitiveArray::from_iter(0u8..10).into_array(), 10)] #[case::u16(PrimitiveArray::from_iter(0u16..10).into_array(), 10)] @@ -325,9 +374,9 @@ mod tests { assert_eq!(device_array.array.n_buffers, 2); assert_eq!(device_array.array.n_children, 0); assert!(device_array.array.release.is_some()); - assert!(matches!(device_array.device_type, DeviceType::Cuda)); + assert_eq!(device_array.device_type, ARROW_DEVICE_CUDA); - unsafe { release_array(&raw mut device_array.array) }; + unsafe { release_exported_array(&raw mut device_array.array) }; Ok(()) } @@ -341,9 +390,9 @@ mod tests { assert_eq!(device_array.array.length, 7); assert_eq!(device_array.array.null_count, 7); - assert!(matches!(device_array.device_type, DeviceType::Cuda)); + assert_eq!(device_array.device_type, ARROW_DEVICE_CUDA); - unsafe { release_array(&raw mut device_array.array) }; + unsafe { release_exported_array(&raw mut device_array.array) }; Ok(()) } @@ -360,9 +409,9 @@ mod tests { assert_eq!(device_array.array.n_buffers, 2); assert_eq!(device_array.array.n_children, 0); assert!(device_array.array.release.is_some()); - assert!(matches!(device_array.device_type, DeviceType::Cuda)); + assert_eq!(device_array.device_type, ARROW_DEVICE_CUDA); - unsafe { release_array(&raw mut device_array.array) }; + unsafe { release_exported_array(&raw mut device_array.array) }; Ok(()) } @@ -383,9 +432,28 @@ mod tests { assert_eq!(device_array.array.n_buffers, 2); assert_eq!(device_array.array.n_children, 0); assert!(device_array.array.release.is_some()); - assert!(matches!(device_array.device_type, DeviceType::Cuda)); + assert_eq!(device_array.device_type, ARROW_DEVICE_CUDA); - unsafe { release_array(&raw mut device_array.array) }; + unsafe { release_exported_array(&raw mut device_array.array) }; + Ok(()) + } + + #[crate::test] + async fn test_export_bool() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let array = BoolArray::from_iter([true, false, true]).into_array(); + let mut device_array = array.export_device_array(&mut ctx).await?; + + assert_eq!(device_array.array.length, 3); + assert_eq!(device_array.array.null_count, 0); + assert_eq!(device_array.array.n_buffers, 2); + assert_eq!(device_array.array.n_children, 0); + assert!(device_array.array.release.is_some()); + assert_eq!(device_array.device_type, ARROW_DEVICE_CUDA); + + unsafe { release_exported_array(&raw mut device_array.array) }; Ok(()) } @@ -404,13 +472,19 @@ mod tests { assert_eq!(device_array.array.length, 3); assert_eq!(device_array.array.null_count, 0); - // VarBin export: null buffer + offsets + data - assert_eq!(device_array.array.n_buffers, 2); + // VarBinView export: null buffer + views + data buffers + variadic buffer sizes + assert_eq!(device_array.array.n_buffers, 4); + let n_buffers = usize::try_from(device_array.array.n_buffers)?; + let buffers = unsafe { std::slice::from_raw_parts(device_array.array.buffers, n_buffers) }; + assert!(buffers[0].is_null()); + assert!(!buffers[1].is_null()); + assert!(!buffers[2].is_null()); + assert!(!buffers[3].is_null()); assert_eq!(device_array.array.n_children, 0); assert!(device_array.array.release.is_some()); - assert!(matches!(device_array.device_type, DeviceType::Cuda)); + assert_eq!(device_array.device_type, ARROW_DEVICE_CUDA); - unsafe { release_array(&raw mut device_array.array) }; + unsafe { release_exported_array(&raw mut device_array.array) }; Ok(()) } @@ -437,9 +511,89 @@ mod tests { assert_eq!(device_array.array.n_buffers, 1); assert_eq!(device_array.array.n_children, 2); assert!(device_array.array.release.is_some()); - assert!(matches!(device_array.device_type, DeviceType::Cuda)); + assert_eq!(device_array.device_type, ARROW_DEVICE_CUDA); + + unsafe { release_exported_array(&raw mut device_array.array) }; + Ok(()) + } + + #[crate::test] + async fn test_export_struct_with_schema() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let array = StructArray::new( + FieldNames::from_iter(["a", "b", "c"]), + vec![ + PrimitiveArray::from_iter(0u32..5).into_array(), + PrimitiveArray::from_iter(0i64..5).into_array(), + VarBinViewArray::from_iter_str(["one", "two", "three", "four", "five"]) + .into_array(), + ], + 5, + Validity::NonNullable, + ) + .into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + let schema = Schema::try_from(&exported.schema)?; + assert_eq!( + schema, + Schema::new(vec![ + Field::new("a", DataType::UInt32, false), + Field::new("b", DataType::Int64, false), + Field::new("c", DataType::Utf8View, false), + ]) + ); + assert_eq!(exported.array.array.length, 5); + assert_eq!(exported.array.array.n_buffers, 1); + assert_eq!(exported.array.array.n_children, 3); + assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + + #[crate::test] + async fn test_export_primitive_with_schema_is_column_shaped() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let array = PrimitiveArray::from_iter(0u32..5).into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + let field = Field::try_from(&exported.schema)?; + assert_eq!(field, Field::new("", DataType::UInt32, false)); + assert_eq!(exported.array.array.length, 5); + assert_eq!(exported.array.array.n_buffers, 2); + assert_eq!(exported.array.array.n_children, 0); + assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + + #[crate::test] + async fn test_export_varbinview_with_schema_uses_utf8_view_layout() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let array = VarBinViewArray::from_iter_str([ + "one", + "two", + "this is a longer string for out-of-line storage", + ]) + .into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + let field = Field::try_from(&exported.schema)?; + assert_eq!(field, Field::new("", DataType::Utf8View, false)); + assert_eq!(exported.array.array.length, 3); + assert_eq!(exported.array.array.n_buffers, 4); + assert_eq!(exported.array.array.n_children, 0); + assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); - unsafe { release_array(&raw mut device_array.array) }; + unsafe { release_exported_array(&raw mut exported.array.array) }; Ok(()) } } diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 89580290a6a..90e44c2413a 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -9,22 +9,23 @@ //! More documentation at mod canonical; -mod varbinview; use std::ffi::c_void; use std::fmt::Debug; -use std::ptr::NonNull; +use std::ptr; use std::sync::Arc; +use arrow_schema::ffi::FFI_ArrowSchema; use async_trait::async_trait; pub(crate) use canonical::CanonicalDeviceArrayExport; use cudarc::driver::CudaEvent; use cudarc::driver::CudaStream; -use cudarc::driver::sys; use cudarc::runtime::sys::cudaEvent_t; use vortex::array::ArrayRef; +use vortex::array::arrow::ArrowSessionExt; use vortex::array::buffer::BufferHandle; use vortex::array::validity::Validity; +use vortex::dtype::DType; use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_err; @@ -32,67 +33,25 @@ use vortex::error::vortex_err; use crate::CudaBufferExt; use crate::CudaExecutionCtx; -#[derive(Debug, Copy, Clone)] -#[repr(i32)] -pub enum DeviceType { - /// Host-resident data buffer - Cpu = 1, - Cuda = 2, - CudaHost = 3, - // OpenCL = 4, - // Vulkan = 7, - // Metal = 8, - // Vpi = 9, - // Rocm = 10, - // RocmHost = 11, - CudaManaged = 13, - // OneApi = 14, - // WebGPU = 15, - // Hexagon = 16, -} - -/// A (potentially null) pointer to a `cudaEvent_t`. -pub type SyncEvent = Option>; +mod arrow_c_abi { + #![allow(dead_code)] + #![allow(non_camel_case_types)] + #![allow(non_snake_case)] + #![allow(non_upper_case_globals)] + #![allow(clippy::absolute_paths)] -/// The C Device data interface representation of an Arrow array. -/// -/// This array contains on-device pointers to Arrow array data, along with a synchronization -/// event that the client must wait on. -#[repr(C)] -#[derive(Debug)] -pub struct ArrowDeviceArray { - array: ArrowArray, - device_id: i64, - device_type: DeviceType, - sync_event: SyncEvent, - - // unused space reserved for future fields - _reserved: [i64; 3], + include!(concat!(env!("OUT_DIR"), "/arrow_c_abi.rs")); } -unsafe impl Send for ArrowDeviceArray {} +pub use arrow_c_abi::ArrowArray; +pub use arrow_c_abi::ArrowDeviceArray; +pub use arrow_c_abi::ArrowDeviceType; -/// An FFI-compatible version of the ArrowArray that holds pointers to device buffers. -#[repr(C)] -#[derive(Debug)] -pub(crate) struct ArrowArray { - length: i64, - null_count: i64, - offset: i64, - n_buffers: i64, - n_children: i64, - buffers: *mut sys::CUdeviceptr, - children: *mut *mut ArrowArray, - // NOTE: we don't support exporting dictionary arrays, so we leave this as an opaque pointer. - dictionary: *mut (), - release: Option, - // When exported, this MUST contain everything that is owned by this array. - // for example, any buffer pointed to in `buffers` must be here, as well - // as the `buffers` pointer itself. - // In other words, everything in ArrowArray must be owned by - // `private_data` and can assume that they do not outlive `private_data`. - private_data: *mut c_void, -} +/// CUDA device memory. +pub const ARROW_DEVICE_CUDA: ArrowDeviceType = arrow_c_abi::ARROW_DEVICE_CUDA as ArrowDeviceType; + +/// A pointer to a device-specific synchronization event, or null if synchronization is not needed. +pub type SyncEvent = *mut c_void; impl ArrowArray { pub fn empty() -> Self { @@ -102,11 +61,11 @@ impl ArrowArray { offset: 0, n_buffers: 0, n_children: 0, - buffers: std::ptr::null_mut(), - children: std::ptr::null_mut(), - dictionary: std::ptr::null_mut(), + buffers: ptr::null_mut(), + children: ptr::null_mut(), + dictionary: ptr::null_mut(), release: None, - private_data: std::ptr::null_mut(), + private_data: ptr::null_mut(), } } } @@ -126,7 +85,7 @@ pub(crate) struct PrivateData { pub(crate) buffers: Box<[Option]>, /// Boxed slice of buffer pointers. We return a pointer to the start of this allocation over /// the interface, so we hold it here so the Box contents are not freed. - pub(crate) buffer_ptrs: Box<[sys::CUdeviceptr]>, + pub(crate) buffer_ptrs: Box<[*const c_void]>, pub(crate) cuda_event: CudaEvent, pub(crate) cuda_event_ptr: cudaEvent_t, pub(crate) children: Box<[*mut ArrowArray]>, @@ -139,15 +98,17 @@ impl PrivateData { ctx: &mut CudaExecutionCtx, ) -> VortexResult> { let buffers = buffers.into_boxed_slice(); - let buffer_ptrs: Box<[sys::CUdeviceptr]> = buffers + let buffer_ptrs: Box<[*const c_void]> = buffers .iter() .map(|buf| { match buf { None => { // null pointer - Ok(sys::CUdeviceptr::default()) + Ok(ptr::null()) } - Some(handle) => handle.cuda_device_ptr(), + Some(handle) => usize::try_from(handle.cuda_device_ptr()?) + .map(|ptr| ptr as *const c_void) + .map_err(|_| vortex_err!("CUDA device pointer does not fit in usize")), } }) .collect::>>()? @@ -175,16 +136,48 @@ impl PrivateData { } pub(crate) fn sync_event(&mut self) -> SyncEvent { - NonNull::new(&raw mut self.cuda_event_ptr) + (&raw mut self.cuda_event_ptr).cast() } } +/// A Vortex array exported as an Arrow schema and Arrow Device array pair. +#[derive(Debug)] +pub struct ArrowDeviceArrayWithSchema { + /// The Arrow C Data schema describing [`Self::array`]. + /// + /// For top-level Vortex struct arrays this is an Arrow schema (a struct with one child per + /// field). For top-level non-struct arrays this is a single Arrow field schema matching the + /// column-shaped device array. + pub schema: FFI_ArrowSchema, + /// The Arrow C Device array containing the exported device-resident buffers. + pub array: ArrowDeviceArray, +} + #[async_trait] pub trait DeviceArrayExt { + /// Export this array as an Arrow C Device array. + /// + /// The returned array owns any device buffers allocated during export. Call the embedded + /// Arrow release callback when the consumer is done with the array. async fn export_device_array( self, ctx: &mut CudaExecutionCtx, ) -> VortexResult; + + /// Export this array as an Arrow C Device array together with its matching Arrow C schema. + /// + /// Arrow arrays are not self-describing: consumers need both the [`ArrowDeviceArray`] and an + /// Arrow schema to interpret the buffer layout. This helper derives the schema from the + /// Vortex dtype using the session's Arrow conversion rules and returns it alongside the device + /// array. + /// + /// Top-level struct arrays are exported as table-like Arrow schemas and struct-shaped device + /// arrays. Top-level non-struct arrays are exported as column-shaped field schemas and + /// column-shaped device arrays; this method does not wrap them in a single-field struct. + async fn export_device_array_with_schema( + self, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult; } #[async_trait] @@ -196,6 +189,35 @@ impl DeviceArrayExt for ArrayRef { let exporter = Arc::clone(ctx.exporter()); exporter.export_device_array(self, ctx).await } + + async fn export_device_array_with_schema( + self, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult { + let schema = arrow_schema_for_array(&self, ctx)?; + let array = self.export_device_array(ctx).await?; + Ok(ArrowDeviceArrayWithSchema { schema, array }) + } +} + +/// Build the Arrow C schema that describes the device array exported for `array`. +/// +/// Top-level Vortex structs are represented as Arrow schemas, which is the shape expected for +/// table-like consumers. Non-struct arrays are represented as a single Arrow field schema, matching +/// the column-shaped [`ArrowDeviceArray`] returned by [`DeviceArrayExt::export_device_array`]. +fn arrow_schema_for_array( + array: &ArrayRef, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + let arrow = ctx.execution_ctx().session().arrow(); + match array.dtype() { + DType::Struct(..) => Ok(FFI_ArrowSchema::try_from( + arrow.to_arrow_schema(array.dtype())?, + )?), + _ => Ok(FFI_ArrowSchema::try_from( + arrow.to_arrow_field("", array.dtype())?, + )?), + } } /// A type that can convert a Vortex array into an [`ArrowDeviceArray`]. diff --git a/vortex-cuda/src/arrow/reference/arrow_c_device.h b/vortex-cuda/src/arrow/reference/arrow_c_device.h new file mode 100644 index 00000000000..ae632f2dbd2 --- /dev/null +++ b/vortex-cuda/src/arrow/reference/arrow_c_device.h @@ -0,0 +1,460 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// \file abi.h Arrow C Data Interface +/// +/// The Arrow C Data interface defines a very small, stable set +/// of C definitions which can be easily copied into any project's +/// source code and vendored to be used for columnar data interchange +/// in the Arrow format. For non-C/C++ languages and runtimes, +/// it should be almost as easy to translate the C definitions into +/// the corresponding C FFI declarations. +/// +/// Applications and libraries can therefore work with Arrow memory +/// without necessarily using the Arrow libraries or reinventing +/// the wheel. Developers can choose between tight integration +/// with the Arrow software project or minimal integration with +/// the Arrow format only. + +#pragma once + +#include + +// Spec and documentation: https://arrow.apache.org/docs/format/CDataInterface.html + +#ifdef __cplusplus +extern "C" { +#endif + +#ifndef ARROW_C_DATA_INTERFACE +# define ARROW_C_DATA_INTERFACE + +# define ARROW_FLAG_DICTIONARY_ORDERED 1 +# define ARROW_FLAG_NULLABLE 2 +# define ARROW_FLAG_MAP_KEYS_SORTED 4 + +struct ArrowSchema { + // Array type description + const char* format; + const char* name; + const char* metadata; + int64_t flags; + int64_t n_children; + struct ArrowSchema** children; + struct ArrowSchema* dictionary; + + // Release callback + void (*release)(struct ArrowSchema*); + // Opaque producer-specific data + void* private_data; +}; + +struct ArrowArray { + // Array data description + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void** buffers; + struct ArrowArray** children; + struct ArrowArray* dictionary; + + // Release callback + void (*release)(struct ArrowArray*); + // Opaque producer-specific data + void* private_data; +}; + +# define ARROW_STATISTICS_KEY_AVERAGE_BYTE_WIDTH_EXACT "ARROW:average_byte_width:exact" +# define ARROW_STATISTICS_KEY_AVERAGE_BYTE_WIDTH_APPROXIMATE \ + "ARROW:average_byte_width:approximate" +# define ARROW_STATISTICS_KEY_DISTINCT_COUNT_EXACT "ARROW:distinct_count:exact" +# define ARROW_STATISTICS_KEY_DISTINCT_COUNT_APPROXIMATE \ + "ARROW:distinct_count:approximate" +# define ARROW_STATISTICS_KEY_MAX_BYTE_WIDTH_EXACT "ARROW:max_byte_width:exact" +# define ARROW_STATISTICS_KEY_MAX_BYTE_WIDTH_APPROXIMATE \ + "ARROW:max_byte_width:approximate" +# define ARROW_STATISTICS_KEY_MAX_VALUE_EXACT "ARROW:max_value:exact" +# define ARROW_STATISTICS_KEY_MAX_VALUE_APPROXIMATE "ARROW:max_value:approximate" +# define ARROW_STATISTICS_KEY_MIN_VALUE_EXACT "ARROW:min_value:exact" +# define ARROW_STATISTICS_KEY_MIN_VALUE_APPROXIMATE "ARROW:min_value:approximate" +# define ARROW_STATISTICS_KEY_NULL_COUNT_EXACT "ARROW:null_count:exact" +# define ARROW_STATISTICS_KEY_NULL_COUNT_APPROXIMATE "ARROW:null_count:approximate" +# define ARROW_STATISTICS_KEY_ROW_COUNT_EXACT "ARROW:row_count:exact" +# define ARROW_STATISTICS_KEY_ROW_COUNT_APPROXIMATE "ARROW:row_count:approximate" + +#endif // ARROW_C_DATA_INTERFACE + +#ifndef ARROW_C_DEVICE_DATA_INTERFACE +# define ARROW_C_DEVICE_DATA_INTERFACE + +// Spec and Documentation: https://arrow.apache.org/docs/format/CDeviceDataInterface.html + +// DeviceType for the allocated memory +typedef int32_t ArrowDeviceType; + +// CPU device, same as using ArrowArray directly +# define ARROW_DEVICE_CPU 1 +// CUDA GPU Device +# define ARROW_DEVICE_CUDA 2 +// Pinned CUDA CPU memory by cudaMallocHost +# define ARROW_DEVICE_CUDA_HOST 3 +// OpenCL Device +# define ARROW_DEVICE_OPENCL 4 +// Vulkan buffer for next-gen graphics +# define ARROW_DEVICE_VULKAN 7 +// Metal for Apple GPU +# define ARROW_DEVICE_METAL 8 +// Verilog simulator buffer +# define ARROW_DEVICE_VPI 9 +// ROCm GPUs for AMD GPUs +# define ARROW_DEVICE_ROCM 10 +// Pinned ROCm CPU memory allocated by hipMallocHost +# define ARROW_DEVICE_ROCM_HOST 11 +// Reserved for extension +# define ARROW_DEVICE_EXT_DEV 12 +// CUDA managed/unified memory allocated by cudaMallocManaged +# define ARROW_DEVICE_CUDA_MANAGED 13 +// unified shared memory allocated on a oneAPI non-partitioned device. +# define ARROW_DEVICE_ONEAPI 14 +// GPU support for next-gen WebGPU standard +# define ARROW_DEVICE_WEBGPU 15 +// Qualcomm Hexagon DSP +# define ARROW_DEVICE_HEXAGON 16 + +struct ArrowDeviceArray { + // the Allocated Array + // + // the buffers in the array (along with the buffers of any + // children) are what is allocated on the device. + struct ArrowArray array; + // The device id to identify a specific device + int64_t device_id; + // The type of device which can access this memory. + ArrowDeviceType device_type; + // An event-like object to synchronize on if needed. + void* sync_event; + // Reserved bytes for future expansion. + int64_t reserved[3]; +}; + +#endif // ARROW_C_DEVICE_DATA_INTERFACE + +#ifndef ARROW_C_STREAM_INTERFACE +# define ARROW_C_STREAM_INTERFACE + +struct ArrowArrayStream { + // Callback to get the stream type + // (will be the same for all arrays in the stream). + // + // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowSchema must be released independently from the stream. + int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out); + + // Callback to get the next array + // (if no error and the array is released, the stream has ended) + // + // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowArray must be released independently from the stream. + int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out); + + // Callback to get optional detailed error information. + // This must only be called if the last stream operation failed + // with a non-0 return code. + // + // Return value: pointer to a null-terminated character array describing + // the last error, or NULL if no description is available. + // + // The returned pointer is only valid until the next operation on this stream + // (including release). + const char* (*get_last_error)(struct ArrowArrayStream*); + + // Release callback: release the stream's own resources. + // Note that arrays returned by `get_next` must be individually released. + void (*release)(struct ArrowArrayStream*); + + // Opaque producer-specific data + void* private_data; +}; + +#endif // ARROW_C_STREAM_INTERFACE + +#ifndef ARROW_C_DEVICE_STREAM_INTERFACE +# define ARROW_C_DEVICE_STREAM_INTERFACE + +// Equivalent to ArrowArrayStream, but for ArrowDeviceArrays. +// +// This stream is intended to provide a stream of data on a single +// device, if a producer wants data to be produced on multiple devices +// then multiple streams should be provided. One per device. +struct ArrowDeviceArrayStream { + // The device that this stream produces data on. + ArrowDeviceType device_type; + + // Callback to get the stream schema + // (will be the same for all arrays in the stream). + // + // Return value 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowSchema must be released independently from the stream. + // The schema should be accessible via CPU memory. + int (*get_schema)(struct ArrowDeviceArrayStream* self, struct ArrowSchema* out); + + // Callback to get the next array + // (if no error and the array is released, the stream has ended) + // + // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowDeviceArray must be released independently from the stream. + int (*get_next)(struct ArrowDeviceArrayStream* self, struct ArrowDeviceArray* out); + + // Callback to get optional detailed error information. + // This must only be called if the last stream operation failed + // with a non-0 return code. + // + // Return value: pointer to a null-terminated character array describing + // the last error, or NULL if no description is available. + // + // The returned pointer is only valid until the next operation on this stream + // (including release). + const char* (*get_last_error)(struct ArrowDeviceArrayStream* self); + + // Release callback: release the stream's own resources. + // Note that arrays returned by `get_next` must be individually released. + void (*release)(struct ArrowDeviceArrayStream* self); + + // Opaque producer-specific data + void* private_data; +}; + +#endif // ARROW_C_DEVICE_STREAM_INTERFACE + +#ifndef ARROW_C_ASYNC_STREAM_INTERFACE +# define ARROW_C_ASYNC_STREAM_INTERFACE + +// EXPERIMENTAL: ArrowAsyncTask represents available data from a producer that was passed +// to an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler. +// +// The reason for this Task approach instead of the Async interface returning +// the Array directly is to allow for more complex thread handling and reducing +// context switching and data transfers between CPU cores (e.g. from one L1/L2 +// cache to another) if desired. +// +// For example, the `on_next_task` callback can be called when data is ready, while +// the producer puts potential "decoding" logic in the `ArrowAsyncTask` object. This +// allows for the producer to manage the I/O on one thread which calls `on_next_task` +// and the consumer can determine when the decoding (producer logic in the `extract_data` +// callback of the task) occurs and on which thread, to avoid a CPU core transfer +// (data staying in the L2 cache). +struct ArrowAsyncTask { + // This callback should populate the ArrowDeviceArray associated with this task. + // The order of ArrowAsyncTasks provided by the producer enables a consumer to + // ensure the order of data to process. + // + // This function is expected to be synchronous, but should not perform any blocking + // I/O. Ideally it should be as cheap as possible so as to not tie up the consumer + // thread unnecessarily. + // + // Returns: 0 if successful, errno-compatible error otherwise. + // + // If a non-0 value is returned then it should be followed by a call to `on_error` + // on the appropriate ArrowAsyncDeviceStreamHandler. This is because it's highly + // likely that whatever is calling this function may be entirely disconnected from + // the current control flow. Indicating an error here with a non-zero return allows + // the current flow to be aware of the error occurring, while still allowing any + // logging or error handling to still be centralized in the `on_error` callback of + // the original Async handler. + // + // Rather than a release callback, any required cleanup should be performed as part + // of the invocation of `extract_data`. Ownership of the Array is passed to the consumer + // calling this, and so it must be released separately. + // + // It is only valid to call this method exactly once. + int (*extract_data)(struct ArrowAsyncTask* self, struct ArrowDeviceArray* out); + + // opaque task-specific data + void* private_data; +}; + +// EXPERIMENTAL: ArrowAsyncProducer represents a 1-to-1 relationship between an async +// producer and consumer. This object allows the consumer to perform backpressure and flow +// control on the asynchronous stream processing. This object must be owned by the +// producer who creates it, and thus is responsible for cleaning it up. +struct ArrowAsyncProducer { + // The device type that this stream produces data on. + ArrowDeviceType device_type; + + // A consumer must call this function to start receiving on_next_task calls. + // + // It *must* be valid to call this synchronously from within `on_next_task` or + // `on_schema`, but this function *must not* immediately call `on_next_task` so as + // to avoid recursion and reentrant callbacks. + // + // After cancel has been called, additional calls to this function must be NOPs, + // but allowed. While not cancelled, calling this function must register the + // given number of additional arrays/batches to be produced with the producer. + // The producer should only call `on_next_task` at most the registered number + // of arrays before propagating backpressure. + // + // Any error encountered by calling request must be propagated by calling the `on_error` + // callback of the ArrowAsyncDeviceStreamHandler. + // + // While not cancelled, any subsequent calls to `on_next_task`, `on_error` or + // `release` should be scheduled by the producer to be called later. + // + // It is invalid for a consumer to call this with a value of n <= 0, producers should + // error if given such a value. + void (*request)(struct ArrowAsyncProducer* self, int64_t n); + + // This cancel callback signals a producer that it must eventually stop making calls + // to on_next_task. It must be idempotent and thread-safe. After calling cancel once, + // subsequent calls must be NOPs. This must not call any consumer-side handlers other + // than `on_error`. + // + // It is not required that calling cancel affect the producer immediately, only that it + // must eventually stop calling on_next_task and subsequently call release on the + // async handler. As such, a consumer must be prepared to receive one or more calls to + // `on_next_task` even after calling cancel if there are still requested arrays pending. + // + // Successful cancellation should *not* result in the producer calling `on_error`, it + // should finish out any remaining tasks and eventually call `release`. + // + // Any error encountered during handling a call to cancel must be reported via the + // on_error callback on the async stream handler. + void (*cancel)(struct ArrowAsyncProducer* self); + + // Any additional metadata tied to a specific stream of data. This must either be NULL + // or a valid pointer to metadata which is encoded in the same way schema metadata + // would be. Non-null metadata must be valid for the lifetime of this object. As an + // example a producer could use this to provide the total number of rows and/or batches + // in the stream if known. + const char* additional_metadata; + + // producer-specific opaque data. + void* private_data; +}; + +// EXPERIMENTAL: Similar to ArrowDeviceArrayStream, except designed for an asynchronous +// style of interaction. While ArrowDeviceArrayStream provides producer +// defined callbacks, this is intended to be created by the consumer instead. +// The consumer passes this handler to the producer, which in turn uses the +// callbacks to inform the consumer of events in the stream. +struct ArrowAsyncDeviceStreamHandler { + // Handler for receiving a schema. The passed in stream_schema must be + // released or moved by the handler (producer is giving ownership of the schema to + // the handler, but not ownership of the top level object itself). + // + // With the exception of an error occurring (on_error), this must be the first + // callback function which is called by a producer and must only be called exactly + // once. As such, the producer should provide a valid ArrowAsyncProducer instance + // so the consumer can control the flow. See the documentation on ArrowAsyncProducer + // for how it works. The ArrowAsyncProducer is owned by the producer who calls this + // function and thus the producer is responsible for cleaning it up when calling + // the release callback of this handler. + // + // If there is any additional metadata tied to this stream, it will be provided as + // a non-null value for the `additional_metadata` field of the ArrowAsyncProducer + // which will be valid at least until the release callback is called. + // + // Return value: 0 if successful, `errno`-compatible error otherwise + // + // A producer that receives a non-zero return here should stop producing and eventually + // call release instead. + int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self, + struct ArrowSchema* stream_schema); + + // Handler for receiving data. This is called when data is available providing an + // ArrowAsyncTask struct to signify it. The producer indicates the end of the stream + // by passing NULL as the value for the task rather than a valid pointer to a task. + // The task object is only valid for the lifetime of this function call, if a consumer + // wants to utilize it after this function returns, it must copy or move the contents + // of it to a new ArrowAsyncTask object. + // + // The `request` callback of a provided ArrowAsyncProducer must be called in order + // to start receiving calls to this handler. + // + // The metadata argument can be null or can be used by a producer + // to pass arbitrary extra information to the consumer (such as total number + // of rows, context info, or otherwise). The data should be passed using the same + // encoding as the metadata within the ArrowSchema struct itself (defined in + // the spec at + // https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata) + // + // If metadata is non-null then it only needs to exist for the lifetime of this call, + // a consumer who wants it to live after that must copy it to ensure lifetime. + // + // A producer *must not* call this concurrently from multiple different threads. + // + // A consumer must be prepared to receive one or more calls to this callback even + // after calling cancel on the corresponding ArrowAsyncProducer, as cancel does not + // guarantee it happens immediately. + // + // Return value: 0 if successful, `errno`-compatible error otherwise. + // + // If the consumer returns a non-zero return from this method, that indicates to the + // producer that it should stop propagating data as an error occurred. After receiving + // such a return, the only interaction with this object is for the producer to call + // the `release` callback. + int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self, + struct ArrowAsyncTask* task, const char* metadata); + + // Handler for encountering an error. The producer should call release after + // this returns to clean up any resources. The `code` passed in can be any error + // code that a producer wants, but should be errno-compatible for consistency. + // + // If the message or metadata are non-null, they will only last as long as this + // function call. The consumer would need to perform a copy of the data if it is + // necessary for them to live past the lifetime of this call. + // + // Error metadata should be encoded as with metadata in ArrowSchema, defined in + // the spec at + // https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.metadata + // + // It is valid for this to be called by a producer with or without a preceding call + // to ArrowAsyncProducer.request. + // + // This callback must not call any methods of an ArrowAsyncProducer object. + void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self, int code, + const char* message, const char* metadata); + + // Release callback to release any resources for the handler. Should always be + // called by a producer when it is done utilizing a handler. No callbacks should + // be called after this is called. + // + // It is valid for the release callback to be called by a producer with or without + // a preceding call to ArrowAsyncProducer.request. + // + // The release callback must not call any methods of an ArrowAsyncProducer object. + void (*release)(struct ArrowAsyncDeviceStreamHandler* self); + + // MUST be populated by the producer BEFORE calling any callbacks other than release. + // This provides the connection between a handler and its producer, and must exist until + // the release callback is called. + struct ArrowAsyncProducer* producer; + + // Opaque handler-specific data + void* private_data; +}; + +#endif // ARROW_C_ASYNC_STREAM_INTERFACE + +#ifdef __cplusplus +} +#endif diff --git a/vortex-cuda/src/arrow/varbinview.rs b/vortex-cuda/src/arrow/varbinview.rs deleted file mode 100644 index c000d44908c..00000000000 --- a/vortex-cuda/src/arrow/varbinview.rs +++ /dev/null @@ -1,160 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -//! We force conversion to VarBin from VarBinView. We parallelize -//! all the necessary string copying using some kernels. - -use std::sync::Arc; - -use cudarc::driver::LaunchConfig; -use cudarc::driver::PushKernelArg; -use vortex::array::arrays::VarBinViewArray; -use vortex::array::arrays::varbinview::VarBinViewDataParts; -use vortex::array::buffer::BufferHandle; -use vortex::error::VortexExpect; -use vortex::error::VortexResult; -use vortex::error::vortex_err; - -use crate::CudaBufferExt; -use crate::CudaDeviceBuffer; -use crate::CudaExecutionCtx; -use crate::arrow::check_validity_empty; - -/// Parts of a binary array (VarBin). -/// -/// We return them as buffer handles directly since we don't need the full VarBin array. -pub(crate) struct BinaryParts { - pub(crate) offsets: BufferHandle, - pub(crate) bytes: BufferHandle, -} - -pub(crate) async fn copy_varbinview_to_varbin( - array: VarBinViewArray, - ctx: &mut CudaExecutionCtx, -) -> VortexResult { - let len = array.len(); - let VarBinViewDataParts { - views, - buffers, - validity, - .. - } = array.into_data_parts(); - - // TODO(aduffy): handle nulls - check_validity_empty(&validity)?; - - // copy all buffers over to device. - let views = ctx.ensure_on_device(views).await?; - // before string copying, we must copy all string data buffers to the device. - let mut device_buffers = vec![]; - for buffer in buffers.iter() { - device_buffers.push(ctx.ensure_on_device(buffer.clone()).await?); - } - - let buffer_ptrs = device_buffers - .iter() - .map(|b| b.cuda_device_ptr()) - .collect::>>()?; - - // clone the buffer_ptrs to device so that we can pass it as an `uint8_t**` to the kernels - let buffer_ptrs_device = ctx - .stream() - .clone_htod(&buffer_ptrs) - .map_err(|e| vortex_err!("failed copying buffer_ptrs to device: {e}"))?; - - // single-threaded, launch the kernel for building the assets - let compute_offsets = ctx.load_function("varbinview_compute_offsets", &[])?; - - // allocate the final offsets buffer. - let offsets = ctx.device_alloc::(len + 1)?; - let len_i64 = len as i64; - - let views_view = views.cuda_view::()?; - let offsets_view = offsets.as_view(); - - let last_offset_device = ctx.device_alloc::(1)?; - let last_offset_device_view = last_offset_device.as_view(); - - let mut kernel = ctx.launch_builder(&compute_offsets); - kernel.arg(&views_view); - kernel.arg(&len_i64); - kernel.arg(&offsets_view); - kernel.arg(&last_offset_device_view); - - let single_threaded_cfg = LaunchConfig { - grid_dim: (1, 1, 1), - block_dim: (1, 1, 1), - shared_mem_bytes: 0, - }; - - // Launch the kernel - // SAFETY: we do not access any of the buffers we passed in until after the - // kernel completes and we synchronize the stream. - unsafe { - kernel - .launch(single_threaded_cfg) - .map_err(|d| vortex_err!("compute_offsets kernel failure: {d}"))?; - } - - // synchronize so the offset writes complete - // now it is safe to read the memory again. - ctx.stream() - .synchronize() - .map_err(|d| vortex_err!("synchronize stream failed: {d}"))?; - - let last_offset_host = ctx - .stream() - .clone_dtoh(&last_offset_device) - .map_err(|e| vortex_err!("failed reading last_offset_device back to host: {e}"))?; - - // allocate a string buffer large enough to hold all strings. - let data_buf = ctx.device_alloc::(last_offset_host[0] as usize)?; - - // now setup and launch the parallel string copy kernel - let copy_strings = ctx.load_function("varbinview_copy_strings", &[])?; - - let buffer_ptrs_view = buffer_ptrs_device.as_view(); - let data_buf_view = data_buf.as_view(); - - let mut kernel = ctx.launch_builder(©_strings); - kernel.arg(&len_i64); - kernel.arg(&views_view); - kernel.arg(&buffer_ptrs_view); - kernel.arg(&data_buf_view); - kernel.arg(&offsets_view); - - // we do a fully parallel string copy. each thread is responsible for issuing copy - // for a single string. - let threads_per_blocks = 256u32; - let n_blocks = len - .div_ceil(threads_per_blocks as usize) - .try_into() - .vortex_expect("n_blocks should never overflow u32"); - let fully_parallel_cfg = LaunchConfig { - grid_dim: (n_blocks, 1, 1), - block_dim: (threads_per_blocks, 1, 1), - shared_mem_bytes: 0, - }; - - // SAFETY: downstream callers should synchronize the stream before accessing the values. - unsafe { - kernel - .launch(fully_parallel_cfg) - .map_err(|d| vortex_err!("copy_strings kernel failure: {d}"))?; - } - - // synchronize? - ctx.stream() - .synchronize() - .map_err(|e| vortex_err!("synchronize failure: {e}"))?; - - // now, offsets should contain the final offsets, and data_buf should contain all the - // string data. - let bytes_handle = BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(data_buf))); - let offsets_handle = BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(offsets))); - - Ok(BinaryParts { - bytes: bytes_handle, - offsets: offsets_handle, - }) -} diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index 5692bb6affc..d2227579d9f 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -22,6 +22,8 @@ mod session; mod stream; mod stream_pool; +pub use arrow::ArrowDeviceArrayWithSchema; +pub use arrow::DeviceArrayExt; pub use arrow::ExportDeviceArray; pub use canonical::CanonicalCudaExt; pub use device_buffer::CudaBufferExt; diff --git a/vortex-test/e2e-cuda/src/lib.rs b/vortex-test/e2e-cuda/src/lib.rs index 6ae3b2fccf7..52fff823600 100644 --- a/vortex-test/e2e-cuda/src/lib.rs +++ b/vortex-test/e2e-cuda/src/lib.rs @@ -27,7 +27,6 @@ use arrow_array::cast::AsArray; use arrow_array::ffi::FFI_ArrowArray; use arrow_array::ffi::from_ffi; use arrow_array::make_array; -use arrow_schema::DataType; use arrow_schema::Field; use arrow_schema::Fields; use arrow_schema::ffi::FFI_ArrowSchema; @@ -96,18 +95,10 @@ pub unsafe extern "C" fn export_array( ) .into_array(); - let data_type = DataType::Struct(Fields::from_iter([ - Field::new("prims", DataType::UInt32, false), - Field::new("decimals", DataType::Decimal128(38, 2), false), - Field::new("strings", DataType::Utf8, false), - Field::new("dates", DataType::Date32, false), - ])); - - *schema_ptr = FFI_ArrowSchema::try_from(data_type).expect("data_type to FFI_ArrowSchema"); - - match block_on(array.export_device_array(&mut ctx)) { + match block_on(array.export_device_array_with_schema(&mut ctx)) { Ok(exported) => { - *array_ptr = exported; + *schema_ptr = exported.schema; + *array_ptr = exported.array; 0 } Err(err) => {