From 3414105180dda1034ca4240f90cfbe4bd5e1df42 Mon Sep 17 00:00:00 2001 From: Subham Singhal Date: Tue, 5 May 2026 10:33:47 +0530 Subject: [PATCH] feat: implement retract_batch for array_agg sliding window support --- .../functions-aggregate/src/array_agg.rs | 281 +++++++++++++++++- .../test_files/array_agg_sliding_window.slt | 185 ++++++++++++ 2 files changed, 455 insertions(+), 11 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/array_agg_sliding_window.slt diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 861d7712ba1b0..12254fd6a5c88 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -256,18 +256,22 @@ impl AggregateUDFImpl for ArrayAgg { #[derive(Debug)] pub struct ArrayAggAccumulator { - values: Vec, + values: VecDeque, datatype: DataType, ignore_nulls: bool, + /// Number of elements already consumed (retracted) from the front array. + /// Used by sliding window frames to avoid copying on partial retract. + front_offset: usize, } impl ArrayAggAccumulator { /// new array_agg accumulator based on given item data type pub fn try_new(datatype: &DataType, ignore_nulls: bool) -> Result { Ok(Self { - values: vec![], + values: VecDeque::new(), datatype: datatype.clone(), ignore_nulls, + front_offset: 0, }) } @@ -356,7 +360,7 @@ impl Accumulator for ArrayAggAccumulator { }; if !val.is_empty() { - self.values.push(val) + self.values.push_back(val) } Ok(()) @@ -376,12 +380,12 @@ impl Accumulator for ArrayAggAccumulator { Some(values) => { // Make sure we don't insert empty lists if !values.is_empty() { - self.values.push(values); + self.values.push_back(values); } } None => { for arr in list_arr.iter().flatten() { - self.values.push(arr); + self.values.push_back(arr); } } } @@ -394,19 +398,71 @@ impl Accumulator for ArrayAggAccumulator { } fn evaluate(&mut self) -> Result { - // Transform Vec to ListArr - let element_arrays: Vec<&dyn Array> = - self.values.iter().map(|a| a.as_ref()).collect(); + if self.values.is_empty() { + return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1)); + } + + let element_arrays: Vec = self + .values + .iter() + .enumerate() + .map(|(i, a)| { + if i == 0 && self.front_offset > 0 { + a.slice(self.front_offset, a.len() - self.front_offset) + } else { + Arc::clone(a) + } + }) + .collect(); - if element_arrays.is_empty() { + let element_refs: Vec<&dyn Array> = + element_arrays.iter().map(|a| a.as_ref()).collect(); + + if element_refs.iter().all(|a| a.is_empty()) { return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1)); } - let concated_array = arrow::compute::concat(&element_arrays)?; + let concated_array = arrow::compute::concat(&element_refs)?; Ok(SingleRowListArrayBuilder::new(concated_array).build_list_scalar()) } + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + + assert_eq_or_internal_err!(values.len(), 1, "expects single batch"); + + let val = &values[0]; + let mut to_retract = if self.ignore_nulls { + val.len() - val.null_count() + } else { + val.len() + }; + + while to_retract > 0 { + let Some(front) = self.values.front() else { + break; + }; + let available = front.len() - self.front_offset; + if to_retract >= available { + self.values.pop_front(); + to_retract -= available; + self.front_offset = 0; + } else { + self.front_offset += to_retract; + to_retract = 0; + } + } + + Ok(()) + } + + fn supports_retract_batch(&self) -> bool { + true + } + fn size(&self) -> usize { size_of_val(self) + (size_of::() * self.values.capacity()) @@ -1415,7 +1471,7 @@ mod tests { acc2.update_batch(&[data(["b", "c", "a"])])?; acc1 = merge(acc1, acc2)?; - assert_eq!(acc1.size(), 266); + assert_eq!(acc1.size(), 282); Ok(()) } @@ -1935,4 +1991,207 @@ mod tests { Ok(()) } + + // ---- retract_batch tests ---- + + #[test] + fn retract_basic_sliding_window() -> Result<()> { + let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?; + + // Simulate ROWS BETWEEN 1 PRECEDING AND CURRENT ROW over [A, B, C, D] + // Row 1: frame = [A] + acc.update_batch(&[data(["A"])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A"]); + + // Row 2: frame = [A, B] + acc.update_batch(&[data(["B"])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]); + + // Row 3: frame = [B, C] — A leaves + acc.update_batch(&[data(["C"])])?; + acc.retract_batch(&[data(["A"])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C"]); + + // Row 4: frame = [C, D] — B leaves + acc.update_batch(&[data(["D"])])?; + acc.retract_batch(&[data(["B"])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C", "D"]); + + Ok(()) + } + + #[test] + fn retract_multi_element_across_arrays() -> Result<()> { + let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?; + + // First batch: 3 elements + acc.update_batch(&[data(["A", "B", "C"])])?; + // Second batch: 1 element + acc.update_batch(&[data(["D"])])?; + + assert_eq!( + print_nulls(str_arr(acc.evaluate()?)?), + vec!["A", "B", "C", "D"] + ); + + // Partial retract from front array: A leaves + acc.retract_batch(&[data(["A"])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C", "D"]); + + // Retract spanning two arrays: B, C (rest of first array) + D (second array) + acc.retract_batch(&[data(["B", "C", "D"])])?; + let result = acc.evaluate()?; + assert!( + matches!(&result, ScalarValue::List(arr) if arr.is_null(0)), + "expected null list after full retract, got {result:?}" + ); + + Ok(()) + } + + #[test] + fn retract_with_nulls_preserved() -> Result<()> { + // ignore_nulls = false: NULLs are stored and counted for retract + let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?; + + acc.update_batch(&[data([Some("A"), None, Some("C")])])?; + assert_eq!( + print_nulls(str_arr(acc.evaluate()?)?), + vec!["A", "NULL", "C"] + ); + + // Retract 2 elements: A and NULL both leave + acc.retract_batch(&[data([Some("A"), None])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C"]); + + Ok(()) + } + + #[test] + fn retract_with_ignore_nulls() -> Result<()> { + // ignore_nulls = true: NULLs are NOT stored by update_batch, + // so retract must only count non-null values + let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?; + + // update_batch with [A, NULL, C] → stores only [A, C] (NULL filtered) + acc.update_batch(&[data([Some("A"), None, Some("C")])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "C"]); + + // retract_batch receives the original values including NULL: [A, NULL] + // But only 1 non-null value (A) should be retracted + acc.retract_batch(&[data([Some("A"), None])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["C"]); + + // retract_batch with [NULL, C] — only C (1 non-null) retracted + acc.retract_batch(&[data([None, Some("C")])])?; + let result = acc.evaluate()?; + assert!( + matches!(&result, ScalarValue::List(arr) if arr.is_null(0)), + "expected null list after full retract, got {result:?}" + ); + + Ok(()) + } + + #[test] + fn retract_ignore_nulls_all_nulls_batch() -> Result<()> { + // When ignore_nulls = true and retract batch is all NULLs, nothing is retracted + let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?; + + acc.update_batch(&[data([Some("A"), Some("B")])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]); + + // Retract batch of all NULLs: to_retract = 0, nothing changes + acc.retract_batch(&[data::, 3>([None, None, None])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]); + + Ok(()) + } + + #[test] + fn retract_empty_accumulator() -> Result<()> { + let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?; + + // Retract on empty accumulator should be a no-op + acc.retract_batch(&[data(["A"])])?; + let result = acc.evaluate()?; + assert!( + matches!(&result, ScalarValue::List(arr) if arr.is_null(0)), + "expected null list for empty accumulator, got {result:?}" + ); + + Ok(()) + } + + #[test] + fn retract_front_offset_partial_consume() -> Result<()> { + // Reproduces the RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING scenario: + // ts: 1, 2, 3, 4, 100 + // + // Row 1 (ts=1): update [A,B,C] (3 elements, ts in [-1,3]) + // Row 2 (ts=2): update [D] (ts=4 enters) + // Row 3 (ts=3): no change (same frame [0..4)) + // Row 4 (ts=4): retract [A] (ts=1 leaves, partial consume) + // Row 5 (ts=100): retract [B,C,D] (3-element retract spanning arrays) + let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?; + + // Row 1: update_batch(["A","B","C"]) + acc.update_batch(&[data(["A", "B", "C"])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B", "C"]); + + // Row 2: update_batch(["D"]) + acc.update_batch(&[data(["D"])])?; + assert_eq!( + print_nulls(str_arr(acc.evaluate()?)?), + vec!["A", "B", "C", "D"] + ); + + // Row 4: retract_batch(["A"]) — partial consume, front_offset = 1 + acc.retract_batch(&[data(["A"])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["B", "C", "D"]); + + // Row 5: update_batch(["E"]), then retract_batch(["B","C","D"]) + // retract spans: ["A","B","C"] (offset=1, 2 remaining) + ["D"] (1 element) + acc.update_batch(&[data(["E"])])?; + acc.retract_batch(&[data(["B", "C", "D"])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["E"]); + + Ok(()) + } + + #[test] + fn retract_update_after_full_drain() -> Result<()> { + // Verify accumulator works correctly after being fully drained + let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?; + + acc.update_batch(&[data(["A", "B"])])?; + acc.retract_batch(&[data(["A", "B"])])?; + + // Accumulator is empty now + let result = acc.evaluate()?; + assert!( + matches!(&result, ScalarValue::List(arr) if arr.is_null(0)), + "expected null list, got {result:?}" + ); + + // New values should work normally after drain + acc.update_batch(&[data(["X", "Y"])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["X", "Y"]); + + acc.retract_batch(&[data(["X"])])?; + assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["Y"]); + + Ok(()) + } + + #[test] + fn retract_supports_retract_batch() -> Result<()> { + let acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?; + assert!(acc.supports_retract_batch()); + + let acc_ignore = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?; + assert!(acc_ignore.supports_retract_batch()); + + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/array_agg_sliding_window.slt b/datafusion/sqllogictest/test_files/array_agg_sliding_window.slt new file mode 100644 index 0000000000000..78d48513a6656 --- /dev/null +++ b/datafusion/sqllogictest/test_files/array_agg_sliding_window.slt @@ -0,0 +1,185 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +####### +# Tests for array_agg with sliding (bounded) window frames. +# Validates the retract_batch implementation on ArrayAggAccumulator. +####### + +# Setup test data +statement ok +CREATE TABLE t(ts INT, val TEXT) AS VALUES + (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (5, 'E'); + +# Basic ROWS sliding window +query ? +SELECT array_agg(val) OVER (ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +FROM t; +---- +[A] +[A, B] +[B, C] +[C, D] +[D, E] + +# Wider ROWS frame +query ? +SELECT array_agg(val) OVER (ORDER BY ts ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) +FROM t; +---- +[A] +[A, B] +[A, B, C] +[B, C, D] +[C, D, E] + +# ROWS with FOLLOWING +query ? +SELECT array_agg(val) OVER (ORDER BY ts ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) +FROM t; +---- +[A, B] +[A, B, C] +[B, C, D] +[C, D, E] +[D, E] + +# Regression: unbounded frame still works +query ? +SELECT array_agg(val) OVER (ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) +FROM t; +---- +[A] +[A, B] +[A, B, C] +[A, B, C, D] +[A, B, C, D, E] + +# Setup data with NULLs +statement ok +CREATE TABLE t_nulls(ts INT, val TEXT) AS VALUES + (1, 'A'), (2, NULL), (3, 'C'), (4, NULL), (5, 'E'); + +# Sliding window with NULLs (nulls preserved) +query ? +SELECT array_agg(val) OVER (ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +FROM t_nulls; +---- +[A] +[A, NULL] +[NULL, C] +[C, NULL] +[NULL, E] + +# Setup data with value gaps for RANGE frame +statement ok +CREATE TABLE t_range(ts INT, val TEXT) AS VALUES + (1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'), (100, 'E'); + +# RANGE frame with value gap causes multi-element retract +query ? +SELECT array_agg(val) OVER (ORDER BY ts RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING) +FROM t_range; +---- +[A, B, C] +[A, B, C, D] +[A, B, C, D] +[B, C, D] +[E] + +# Single-row frame +query ? +SELECT array_agg(val) OVER (ORDER BY ts ROWS BETWEEN CURRENT ROW AND CURRENT ROW) +FROM t; +---- +[A] +[B] +[C] +[D] +[E] + +# Integer values in sliding window +statement ok +CREATE TABLE t_int(ts INT, val INT) AS VALUES + (1, 10), (2, 20), (3, 30), (4, 40), (5, 50); + +query ? +SELECT array_agg(val) OVER (ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +FROM t_int; +---- +[10] +[10, 20] +[20, 30] +[30, 40] +[40, 50] + +# Setup data with duplicate sort keys for GROUPS frame +statement ok +CREATE TABLE t_groups(ts INT, val TEXT) AS VALUES + (1, 'A'), (1, 'B'), (2, 'C'), (2, 'D'), (3, 'E'); + +# GROUPS frame: each "group" = rows with same ORDER BY value +# Group 0: ts=1 (A,B), Group 1: ts=2 (C,D), Group 2: ts=3 (E) +# Frame: 1 PRECEDING group + CURRENT group +query ? +SELECT array_agg(val) OVER (ORDER BY ts GROUPS BETWEEN 1 PRECEDING AND CURRENT ROW) +FROM t_groups; +---- +[A, B] +[A, B] +[A, B, C, D] +[A, B, C, D] +[C, D, E] + +# IGNORE NULLS: nulls are filtered out before storage and retract +# t_nulls has: (1,'A'), (2,NULL), (3,'C'), (4,NULL), (5,'E') +query ? +SELECT array_agg(val) IGNORE NULLS OVER (ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) +FROM t_nulls; +---- +[A] +[A] +[C] +[C] +[E] + +# IGNORE NULLS with wider frame +query ? +SELECT array_agg(val) IGNORE NULLS OVER (ORDER BY ts ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) +FROM t_nulls; +---- +[A] +[A] +[A, C] +[C] +[C, E] + +# Cleanup +statement ok +DROP TABLE t; + +statement ok +DROP TABLE t_nulls; + +statement ok +DROP TABLE t_range; + +statement ok +DROP TABLE t_int; + +statement ok +DROP TABLE t_groups; \ No newline at end of file