Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 44 additions & 49 deletions vortex-cuda/src/kernel/patches/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

use vortex::buffer::Buffer;
use vortex::buffer::BufferMut;
use vortex::buffer::buffer_mut;
use vortex_array::Canonical;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::IntegerPType;
Expand Down Expand Up @@ -40,19 +39,6 @@ const fn patch_lanes<V: Sized>() -> usize {
if size_of::<V>() < 8 { 32 } else { 16 }
}

#[derive(Clone)]
struct Chunk<V> {
lanes: Vec<Lane<V>>,
}

impl<V: Copy + Default> Default for Chunk<V> {
fn default() -> Self {
Self {
lanes: vec![Lane::<V>::default(); patch_lanes::<V>()],
}
}
}

/// A set of patches of values `V` existing in host buffers.
#[allow(dead_code)]
pub struct HostPatches<V> {
Expand Down Expand Up @@ -122,23 +108,6 @@ impl<V: Copy> HostPatches<V> {
}
}

#[derive(Debug, Default, Clone)]
struct Lane<V> {
indices: Vec<u16>,
values: Vec<V>,
}

impl<V: Copy> Lane<V> {
fn push(&mut self, index: u16, value: V) {
self.indices.push(index);
self.values.push(value);
}

fn len(&self) -> usize {
self.indices.len()
}
}

/// Transpose a set of patches from the default sorted layout into the data parallel layout.
#[allow(clippy::cognitive_complexity)]
pub async fn transpose_patches(
Expand Down Expand Up @@ -180,8 +149,8 @@ pub async fn transpose_patches(

#[allow(clippy::cast_possible_truncation)]
fn transpose<I: IntegerPType, V: NativePType>(
indices: &[I],
values: &[V],
indices_in: &[I],
values_in: &[V],
offset: usize,
array_len: usize,
) -> HostPatches<V> {
Expand All @@ -193,30 +162,56 @@ fn transpose<I: IntegerPType, V: NativePType>(
);

let n_lanes = patch_lanes::<V>();
let mut chunks: Vec<Chunk<V>> = vec![Chunk::default(); n_chunks];

// For each chunk, for each lane, push new values
for (index, &value) in std::iter::zip(indices, values) {
// We know upfront how many indices and values we'll have.
let mut indices_buffer = BufferMut::with_capacity(indices_in.len());
let mut values_buffer = BufferMut::with_capacity(values_in.len());

// number of patches in each chunk.
let mut lane_offsets: BufferMut<u32> = BufferMut::zeroed(n_chunks * n_lanes + 1);

// Scan the index/values once to get chunk/lane counts
for index in indices_in {
let index = index.as_() - offset;
let chunk = index / 1024;
let lane = index % n_lanes;

lane_offsets[chunk * n_lanes + lane + 1] += 1;
}

// Prefix-sum sizes -> offsets
for index in 1..lane_offsets.len() {
lane_offsets[index] += lane_offsets[index - 1];
}

// Loop over patches, writing them to final positions
let indices_out = indices_buffer.spare_capacity_mut();
let values_out = values_buffer.spare_capacity_mut();
for (index, &value) in std::iter::zip(indices_in, values_in) {
let index = index.as_() - offset;
let chunk = index / 1024;
let lane = index % n_lanes;

chunks[chunk].lanes[lane].push((index % 1024) as u16, value);
let position = &mut lane_offsets[chunk * n_lanes + lane];
indices_out[*position as usize].write((index % 1024) as u16);
values_out[*position as usize].write(value);
*position += 1;
}

// Reshuffle the different containers into a single contiguous buffer each for indices/values
let mut lane_offset = 0;
let mut lane_offsets = buffer_mut![0u32];
let mut indices_buffer = BufferMut::empty();
let mut values_buffer = BufferMut::empty();
for chunk in chunks {
for lane in chunk.lanes {
indices_buffer.extend_from_slice(&lane.indices);
values_buffer.extend_from_slice(&lane.values);
lane_offset += lane.len() as u32;
lane_offsets.push(lane_offset);
}
// SAFETY: we know there are exactly indices_in.len() indices/values, and we just
// set them to the appropriate values in the loop above.
unsafe {
indices_buffer.set_len(indices_in.len());
values_buffer.set_len(values_in.len());
}

// Now, pass over all the indices and values again and subtract out the position increments.
for index in indices_in {
let index = index.as_() - offset;
let chunk = index / 1024;
let lane = index % n_lanes;

lane_offsets[chunk * n_lanes + lane] -= 1;
}

HostPatches {
Expand Down
Loading