Skip to content

Commit 210b6d9

Browse files
Added tests for flattening schema and counting total number of leaves
1 parent e3b4d6a commit 210b6d9

2 files changed

Lines changed: 137 additions & 14 deletions

File tree

datafusion/src/datasource/file_format/parquet.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ impl FileFormat for ParquetFormat {
124124
fn summarize_min_max(
125125
max_values: &mut Vec<Option<MaxAccumulator>>,
126126
min_values: &mut Vec<Option<MinAccumulator>>,
127-
fields: &Vec<&Field>,
127+
fields: &Vec<Field>,
128128
i: usize,
129129
stat: &ParquetStatistics,
130130
) {
@@ -280,9 +280,6 @@ fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics>
280280
.iter()
281281
.flat_map(|c| c.statistics().map(|stats| stats.null_count()));
282282

283-
let cols_vec: Vec<u64> = columns_null_counts.clone().collect();
284-
let cols_vec_num = cols_vec.len();
285-
286283
for (i, cnt) in columns_null_counts.enumerate() {
287284
null_counts[i] += cnt as usize
288285
}

datafusion/src/datasource/mod.rs

Lines changed: 136 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ pub async fn get_statistics_with_limit(
4949
) -> Result<(Vec<PartitionedFile>, Statistics)> {
5050
let mut result_files = vec![];
5151

52-
let flat_schema = flatten_schema(&file_schema);
52+
let total_num_fields = total_number_of_fields(&file_schema);
5353
let mut total_byte_size = 0;
54-
let mut null_counts = vec![0; flat_schema.len()];
54+
let mut null_counts = vec![0; total_num_fields];
5555
let mut has_statistics = false;
5656
let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
5757

@@ -180,8 +180,8 @@ fn get_col_stats(
180180
max_values: &mut Vec<Option<MaxAccumulator>>,
181181
min_values: &mut Vec<Option<MinAccumulator>>,
182182
) -> Vec<ColumnStatistics> {
183-
let flat_schema = flatten_schema(schema);
184-
(0..flat_schema.len())
183+
let total_num_fields = total_number_of_fields(schema);
184+
(0..total_num_fields)
185185
.map(|i| {
186186
let max_value = match &max_values[i] {
187187
Some(max_value) => max_value.evaluate().ok(),
@@ -201,26 +201,152 @@ fn get_col_stats(
201201
.collect()
202202
}
203203

204-
fn flatten_schema(schema: &Schema) -> Vec<&Field> {
205-
fn fetch_children(field: &Field) -> Vec<&Field> {
206-
let mut collected_fields: Vec<&Field> = vec![];
204+
fn total_number_of_fields(schema: &Schema) -> usize {
205+
fn count_children(field: &Field) -> usize {
206+
let mut num_children: usize = 0;
207+
match field.data_type() {
208+
DataType::Struct(fields) | DataType::Union(fields) => {
209+
let counts_arr = fields.iter().map(|f| count_children(f)).collect::<Vec<usize>>();
210+
let c: usize = counts_arr.iter().sum();
211+
num_children += c
212+
},
213+
DataType::List(f)
214+
| DataType::LargeList(f)
215+
| DataType::FixedSizeList(f, _)
216+
| DataType::Map(f, _) => {
217+
let c: usize = count_children(f);
218+
num_children += c
219+
},
220+
_ => num_children += 1,
221+
}
222+
num_children
223+
}
224+
let top_level_fields = schema.fields();
225+
let top_level_counts = top_level_fields.iter().map(|i| count_children(i)).collect::<Vec<usize>>();
226+
top_level_counts.iter().sum()
227+
}
228+
229+
fn flatten_schema(schema: &Schema) -> Vec<Field> {
230+
fn fetch_children(field: Field) -> Vec<Field> {
231+
let mut collected_fields: Vec<Field> = vec![];
207232
let data_type = field.data_type();
208233
match data_type {
209234
DataType::Struct(fields) | DataType::Union(fields) => collected_fields
210-
.extend(fields.iter().map(|f| fetch_children(f)).flatten()),
235+
.extend(fields.iter().map(|f| {
236+
let full_name = format!("{}.{}", field.name(), f.name());
237+
let f_new = Field::new(&full_name, f.data_type().clone(), f.is_nullable());
238+
fetch_children(f_new)
239+
}).flatten()),
211240
DataType::List(f)
212241
| DataType::LargeList(f)
213242
| DataType::FixedSizeList(f, _)
214-
| DataType::Map(f, _) => collected_fields.extend(fetch_children(f)),
243+
| DataType::Map(f, _) => {
244+
let full_name = format!("{}.{}", field.name(), f.name());
245+
let f_new = Field::new(&full_name, f.data_type().clone(), f.is_nullable());
246+
collected_fields.extend(fetch_children(f_new))
247+
},
215248
_ => collected_fields.push(field),
216249
}
217250
collected_fields
218251
}
219252
let top_level_fields = schema.fields();
220253
let flatten = top_level_fields
221254
.iter()
222-
.map(|f| fetch_children(f))
255+
.map(|f| fetch_children(f.clone()))
223256
.flatten()
224257
.collect();
225258
flatten
226259
}
260+
261+
#[cfg(test)]
mod tests {
    use arrow::datatypes::TimeUnit;

    use super::*;

    /// Builds the nested fixture shared by the leaf-counting and flattening
    /// tests: two top-level primitive columns plus two struct columns nested
    /// up to four struct levels deep, for eight leaf fields in total.
    fn nested_fields() -> Vec<Field> {
        vec![
            Field::new("id", DataType::Int16, false),
            Field::new("name", DataType::Utf8, false),
            Field::new(
                "nested1",
                DataType::Struct(vec![
                    Field::new("str1", DataType::Utf8, false),
                    Field::new(
                        "ts",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        false,
                    ),
                ]),
                false,
            ),
            Field::new(
                "nested2",
                DataType::Struct(vec![
                    Field::new(
                        "nested_a",
                        DataType::Struct(vec![
                            Field::new(
                                "another_nested",
                                DataType::Struct(vec![
                                    Field::new("idx", DataType::UInt8, false),
                                    Field::new("no", DataType::UInt8, false),
                                ]),
                                false,
                            ),
                            Field::new("id2", DataType::UInt16, false),
                        ]),
                        false,
                    ),
                    Field::new(
                        "nested_b",
                        DataType::Struct(vec![Field::new(
                            "nested_x",
                            DataType::Struct(vec![Field::new(
                                "nested_y",
                                DataType::Struct(vec![Field::new(
                                    "desc",
                                    DataType::Utf8,
                                    false,
                                )]),
                                false,
                            )]),
                            false,
                        )]),
                        false,
                    ),
                ]),
                false,
            ),
        ]
    }

    // These tests never await anything, so they run as plain synchronous
    // tests rather than on an async runtime.

    #[test]
    fn test_total_number_of_fields() {
        assert_eq!(8, total_number_of_fields(&Schema::new(nested_fields())));
    }

    #[test]
    fn test_total_number_of_fields_empty_struct() {
        let fields = vec![Field::new(
            "empty_nested",
            DataType::Struct(vec![]),
            false,
        )];

        // A struct without children has no leaves to count.
        assert_eq!(0, total_number_of_fields(&Schema::new(fields)));
    }

    #[test]
    fn test_flatten_schema() {
        let flat_schema = flatten_schema(&Schema::new(nested_fields()));

        // Leaves appear in declaration order with their full dotted path.
        let expected = vec![
            Field::new("id", DataType::Int16, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("nested1.str1", DataType::Utf8, false),
            Field::new(
                "nested1.ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "nested2.nested_a.another_nested.idx",
                DataType::UInt8,
                false,
            ),
            Field::new(
                "nested2.nested_a.another_nested.no",
                DataType::UInt8,
                false,
            ),
            Field::new("nested2.nested_a.id2", DataType::UInt16, false),
            Field::new(
                "nested2.nested_b.nested_x.nested_y.desc",
                DataType::Utf8,
                false,
            ),
        ];

        assert_eq!(8, flat_schema.len());
        for (want, got) in expected.iter().zip(flat_schema.iter()) {
            assert_eq!(want, got);
        }
    }
}

0 commit comments

Comments
 (0)