From 6eadf6d8283517d94a29bd5ec9ec2e65036aada0 Mon Sep 17 00:00:00 2001 From: Alexander Bianchi Date: Sun, 15 Mar 2026 16:01:08 +0000 Subject: [PATCH 1/4] Add support for nested lists in substrait consumer --- .../src/logical_plan/consumer/expr/mod.rs | 2 + .../src/logical_plan/consumer/expr/nested.rs | 178 ++++++++++++++++++ .../consumer/substrait_consumer.rs | 8 +- .../substrait/tests/cases/logical_plans.rs | 24 +++ .../nested_list_expressions.substrait.json | 92 +++++++++ 5 files changed, 300 insertions(+), 4 deletions(-) create mode 100644 datafusion/substrait/src/logical_plan/consumer/expr/nested.rs create mode 100644 datafusion/substrait/tests/testdata/test_plans/nested_list_expressions.substrait.json diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs b/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs index a0468dbd451b9..295456e95f9f3 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/mod.rs @@ -21,6 +21,7 @@ mod field_reference; mod function_arguments; mod if_then; mod literal; +mod nested; mod scalar_function; mod singular_or_list; mod subquery; @@ -32,6 +33,7 @@ pub use field_reference::*; pub use function_arguments::*; pub use if_then::*; pub use literal::*; +pub use nested::*; pub use scalar_function::*; pub use singular_or_list::*; pub use subquery::*; diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/nested.rs b/datafusion/substrait/src/logical_plan/consumer/expr/nested.rs new file mode 100644 index 0000000000000..cdf811d76789e --- /dev/null +++ b/datafusion/substrait/src/logical_plan/consumer/expr/nested.rs @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::logical_plan::consumer::SubstraitConsumer; +use datafusion::arrow::datatypes::DataType; +use datafusion::common::{DFSchema, ScalarValue, not_impl_err, substrait_err}; +use datafusion::execution::FunctionRegistry; +use datafusion::logical_expr::Expr; +use substrait::proto::expression::Nested; +use substrait::proto::expression::nested::NestedType; + +/// Converts a Substrait [Nested] expression into a DataFusion [Expr]. +/// +/// Substrait Nested expressions represent complex type constructors (list, struct, map) +/// where elements are full expressions rather than just literals. This is used by +/// producers that emit `Nested { list: ... }` for array construction, as opposed to +/// `Literal { list: ... }` which only supports scalar values. +/// +/// When all elements are literals, the result is a scalar literal (e.g. `ScalarValue::List`). +/// When elements include non-literal expressions (e.g. column references), the result uses +/// the `make_array` / `named_struct` UDFs to construct the value at execution time. +pub async fn from_nested( + consumer: &impl SubstraitConsumer, + nested: &Nested, + input_schema: &DFSchema, +) -> datafusion::common::Result { + let Some(nested_type) = &nested.nested_type else { + return substrait_err!("Nested expression requires a nested_type"); + }; + + match nested_type { + NestedType::List(list) => { + let mut exprs = Vec::with_capacity(list.values.len()); + for value in &list.values { + exprs.push(consumer.consume_expression(value, input_schema).await?); + } + + // Fast path: if all expressions are literals, construct a ScalarValue::List directly + let all_literals: Option> = exprs + .iter() + .map(|e| match e { + Expr::Literal(sv, _) => Some(sv.clone()), + _ => None, + }) + .collect(); + + if let Some(literals) = all_literals { + // For empty lists, use Null as the element type. A surrounding Cast + // in the Substrait plan can refine the type if needed. + let element_type = literals + .first() + .map(|l| l.data_type()) + .unwrap_or(DataType::Null); + let arr = ScalarValue::new_list_nullable(&literals, &element_type); + Ok(Expr::Literal(ScalarValue::List(arr), None)) + } else { + // General case: use make_array UDF for non-literal expressions + let make_array_udf = + consumer.get_function_registry().udf("make_array")?; + Ok(Expr::ScalarFunction( + datafusion::logical_expr::expr::ScalarFunction::new_udf( + make_array_udf, + exprs, + ), + )) + } + } + NestedType::Struct(_) => { + not_impl_err!("Nested struct expressions are not yet supported") + } + NestedType::Map(_) => { + not_impl_err!("Nested map expressions are not yet supported") + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::logical_plan::consumer::utils::tests::test_consumer; + use datafusion::arrow::array::Array; + use substrait::proto::expression::Literal; + use substrait::proto::expression::nested::List; + use substrait::proto::{self, Expression}; + + fn make_i64_literal(value: i64) -> Expression { + Expression { + rex_type: Some(proto::expression::RexType::Literal(Literal { + nullable: false, + type_variation_reference: 0, + literal_type: Some(proto::expression::literal::LiteralType::I64(value)), + })), + } + } + + #[tokio::test] + async fn nested_list_with_literals() -> datafusion::common::Result<()> { + let consumer = test_consumer(); + let schema = DFSchema::empty(); + let nested = Nested { + nullable: false, + type_variation_reference: 0, + nested_type: Some(NestedType::List(List { + values: vec![ + make_i64_literal(1), + make_i64_literal(2), + make_i64_literal(3), + ], + })), + }; + + let expr = from_nested(&consumer, &nested, &schema).await?; + match &expr { + Expr::Literal(ScalarValue::List(arr), _) => { + assert_eq!(arr.len(), 1); + assert_eq!(arr.value_length(0), 3); + } + other => panic!("Expected Expr::Literal(List), got: {other:?}"), + } + assert_eq!(format!("{expr}"), "List([1, 2, 3])"); + + Ok(()) + } + + #[tokio::test] + async fn nested_list_empty() -> datafusion::common::Result<()> { + let consumer = test_consumer(); + let schema = DFSchema::empty(); + let nested = Nested { + nullable: true, + type_variation_reference: 0, + nested_type: Some(NestedType::List(List { values: vec![] })), + }; + + let expr = from_nested(&consumer, &nested, &schema).await?; + match &expr { + Expr::Literal(ScalarValue::List(arr), _) => { + assert_eq!(arr.len(), 1); + assert_eq!(arr.value_length(0), 0); + } + other => panic!("Expected Expr::Literal(List), got: {other:?}"), + } + assert_eq!(format!("{expr}"), "List([])"); + + Ok(()) + } + + #[tokio::test] + async fn nested_list_missing_type() -> datafusion::common::Result<()> { + let consumer = test_consumer(); + let schema = DFSchema::empty(); + let nested = Nested { + nullable: false, + type_variation_reference: 0, + nested_type: None, + }; + + let result = from_nested(&consumer, &nested, &schema).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("nested_type"),); + + Ok(()) + } +} diff --git a/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs b/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs index 730ceab8ccef3..14385888a8de4 100644 --- a/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs @@ -18,7 +18,7 @@ use super::{ from_aggregate_rel, from_cast, from_cross_rel, from_exchange_rel, from_fetch_rel, from_field_reference, from_filter_rel, from_if_then, from_join_rel, from_literal, - from_project_rel, from_read_rel, from_scalar_function, from_set_rel, + from_nested, from_project_rel, from_read_rel, from_scalar_function, from_set_rel, from_singular_or_list, from_sort_rel, from_subquery, from_substrait_rel, from_substrait_rex, from_window_function, }; @@ -350,10 +350,10 @@ pub trait SubstraitConsumer: Send + Sync + Sized { async fn consume_nested( &self, - _expr: &Nested, - _input_schema: &DFSchema, + expr: &Nested, + input_schema: &DFSchema, ) -> datafusion::common::Result { - not_impl_err!("Nested expression not supported") + from_nested(self, expr, input_schema).await } async fn consume_enum( diff --git a/datafusion/substrait/tests/cases/logical_plans.rs b/datafusion/substrait/tests/cases/logical_plans.rs index 9de7cb8f3835e..87c4cf0a81831 100644 --- a/datafusion/substrait/tests/cases/logical_plans.rs +++ b/datafusion/substrait/tests/cases/logical_plans.rs @@ -270,4 +270,28 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn nested_list_expressions() -> Result<()> { + // Tests that a Substrait Nested list expression containing non-literal + // expressions (column references) uses the make_array UDF. + let proto_plan = read_json( + "tests/testdata/test_plans/nested_list_expressions.substrait.json", + ); + let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?; + let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?; + + assert_snapshot!( + plan, + @r" + Projection: make_array(DATA.a, DATA.b) AS my_list + TableScan: DATA + " + ); + + // Trigger execution to ensure plan validity + DataFrame::new(ctx.state(), plan).show().await?; + + Ok(()) + } } diff --git a/datafusion/substrait/tests/testdata/test_plans/nested_list_expressions.substrait.json b/datafusion/substrait/tests/testdata/test_plans/nested_list_expressions.substrait.json new file mode 100644 index 0000000000000..77d5ca126eb4c --- /dev/null +++ b/datafusion/substrait/tests/testdata/test_plans/nested_list_expressions.substrait.json @@ -0,0 +1,92 @@ +{ + "extensionUris": [ + { + "extensionUriAnchor": 1, + "uri": "https://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml" + } + ], + "extensions": [ + { + "extensionFunction": { + "extensionUriReference": 1, + "functionAnchor": 0, + "name": "add:i32_i32" + } + } + ], + "relations": [ + { + "root": { + "input": { + "project": { + "common": { + "emit": { + "outputMapping": [2] + } + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": ["a", "b"], + "struct": { + "types": [ + { + "i32": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i32": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "nullability": "NULLABILITY_REQUIRED" + } + }, + "namedTable": { + "names": ["DATA"] + } + } + }, + "expressions": [ + { + "nested": { + "nullable": false, + "list": { + "values": [ + { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": {} + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + }, + "rootReference": {} + } + } + ] + } + } + } + ] + } + }, + "names": ["my_list"] + } + } + ] +} From ca14e75b2a35b9c22f222641d7c17246a63ac175 Mon Sep 17 00:00:00 2001 From: Alexander Bianchi Date: Sun, 15 Mar 2026 16:42:36 +0000 Subject: [PATCH 2/4] fully delegate to make_array --- .../src/logical_plan/consumer/expr/nested.rs | 71 +++++-------------- 1 file changed, 16 insertions(+), 55 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/nested.rs b/datafusion/substrait/src/logical_plan/consumer/expr/nested.rs index cdf811d76789e..dc1cc1d18b4bf 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/nested.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/nested.rs @@ -16,8 +16,7 @@ // under the License. use crate::logical_plan::consumer::SubstraitConsumer; -use datafusion::arrow::datatypes::DataType; -use datafusion::common::{DFSchema, ScalarValue, not_impl_err, substrait_err}; +use datafusion::common::{DFSchema, not_impl_err, substrait_err}; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::Expr; use substrait::proto::expression::Nested; @@ -29,10 +28,6 @@ use substrait::proto::expression::nested::NestedType; /// where elements are full expressions rather than just literals. This is used by /// producers that emit `Nested { list: ... }` for array construction, as opposed to /// `Literal { list: ... }` which only supports scalar values. -/// -/// When all elements are literals, the result is a scalar literal (e.g. `ScalarValue::List`). -/// When elements include non-literal expressions (e.g. column references), the result uses -/// the `make_array` / `named_struct` UDFs to construct the value at execution time. pub async fn from_nested( consumer: &impl SubstraitConsumer, nested: &Nested, @@ -44,40 +39,18 @@ pub async fn from_nested( match nested_type { NestedType::List(list) => { - let mut exprs = Vec::with_capacity(list.values.len()); + let mut args = Vec::with_capacity(list.values.len()); for value in &list.values { - exprs.push(consumer.consume_expression(value, input_schema).await?); + args.push(consumer.consume_expression(value, input_schema).await?); } - // Fast path: if all expressions are literals, construct a ScalarValue::List directly - let all_literals: Option> = exprs - .iter() - .map(|e| match e { - Expr::Literal(sv, _) => Some(sv.clone()), - _ => None, - }) - .collect(); - - if let Some(literals) = all_literals { - // For empty lists, use Null as the element type. A surrounding Cast - // in the Substrait plan can refine the type if needed. - let element_type = literals - .first() - .map(|l| l.data_type()) - .unwrap_or(DataType::Null); - let arr = ScalarValue::new_list_nullable(&literals, &element_type); - Ok(Expr::Literal(ScalarValue::List(arr), None)) - } else { - // General case: use make_array UDF for non-literal expressions - let make_array_udf = - consumer.get_function_registry().udf("make_array")?; - Ok(Expr::ScalarFunction( - datafusion::logical_expr::expr::ScalarFunction::new_udf( - make_array_udf, - exprs, - ), - )) - } + let make_array_udf = consumer.get_function_registry().udf("make_array")?; + Ok(Expr::ScalarFunction( + datafusion::logical_expr::expr::ScalarFunction::new_udf( + make_array_udf, + args, + ), + )) } NestedType::Struct(_) => { not_impl_err!("Nested struct expressions are not yet supported") @@ -92,7 +65,6 @@ pub async fn from_nested( mod tests { use super::*; use crate::logical_plan::consumer::utils::tests::test_consumer; - use datafusion::arrow::array::Array; use substrait::proto::expression::Literal; use substrait::proto::expression::nested::List; use substrait::proto::{self, Expression}; @@ -124,14 +96,10 @@ mod tests { }; let expr = from_nested(&consumer, &nested, &schema).await?; - match &expr { - Expr::Literal(ScalarValue::List(arr), _) => { - assert_eq!(arr.len(), 1); - assert_eq!(arr.value_length(0), 3); - } - other => panic!("Expected Expr::Literal(List), got: {other:?}"), - } - assert_eq!(format!("{expr}"), "List([1, 2, 3])"); + assert_eq!( + format!("{expr}"), + "make_array(Int64(1), Int64(2), Int64(3))" + ); Ok(()) } @@ -147,14 +115,7 @@ mod tests { }; let expr = from_nested(&consumer, &nested, &schema).await?; - match &expr { - Expr::Literal(ScalarValue::List(arr), _) => { - assert_eq!(arr.len(), 1); - assert_eq!(arr.value_length(0), 0); - } - other => panic!("Expected Expr::Literal(List), got: {other:?}"), - } - assert_eq!(format!("{expr}"), "List([])"); + assert_eq!(format!("{expr}"), "make_array()"); Ok(()) } @@ -171,7 +132,7 @@ mod tests { let result = from_nested(&consumer, &nested, &schema).await; assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("nested_type"),); + assert!(result.unwrap_err().to_string().contains("nested_type")); Ok(()) } From 3e5c80166b4097ec49ddd3738eb5cb35341f2cbe Mon Sep 17 00:00:00 2001 From: Alexander Bianchi Date: Sun, 15 Mar 2026 16:50:40 +0000 Subject: [PATCH 3/4] cargo fmt --- datafusion/substrait/tests/cases/logical_plans.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/substrait/tests/cases/logical_plans.rs b/datafusion/substrait/tests/cases/logical_plans.rs index 87c4cf0a81831..663a372fe2e4f 100644 --- a/datafusion/substrait/tests/cases/logical_plans.rs +++ b/datafusion/substrait/tests/cases/logical_plans.rs @@ -275,9 +275,8 @@ mod tests { async fn nested_list_expressions() -> Result<()> { // Tests that a Substrait Nested list expression containing non-literal // expressions (column references) uses the make_array UDF. - let proto_plan = read_json( - "tests/testdata/test_plans/nested_list_expressions.substrait.json", - ); + let proto_plan = + read_json("tests/testdata/test_plans/nested_list_expressions.substrait.json"); let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?; let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?; From fc7b3e850702bbf84f8d27231e68c5397cb7678c Mon Sep 17 00:00:00 2001 From: Alexander Bianchi Date: Mon, 16 Mar 2026 14:16:02 +0000 Subject: [PATCH 4/4] respond to nits and error on empty nested list --- .../src/logical_plan/consumer/expr/nested.rs | 20 +++++++++++++++---- .../nested_list_expressions.substrait.json | 15 -------------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer/expr/nested.rs b/datafusion/substrait/src/logical_plan/consumer/expr/nested.rs index dc1cc1d18b4bf..f94a701342826 100644 --- a/datafusion/substrait/src/logical_plan/consumer/expr/nested.rs +++ b/datafusion/substrait/src/logical_plan/consumer/expr/nested.rs @@ -39,6 +39,12 @@ pub async fn from_nested( match nested_type { NestedType::List(list) => { + if list.values.is_empty() { + return substrait_err!( + "Empty Nested lists are not supported; use Literal.empty_list instead" + ); + } + let mut args = Vec::with_capacity(list.values.len()); for value in &list.values { args.push(consumer.consume_expression(value, input_schema).await?); @@ -105,7 +111,7 @@ mod tests { } #[tokio::test] - async fn nested_list_empty() -> datafusion::common::Result<()> { + async fn nested_list_empty_rejected() -> datafusion::common::Result<()> { let consumer = test_consumer(); let schema = DFSchema::empty(); let nested = Nested { @@ -114,14 +120,20 @@ mod tests { nested_type: Some(NestedType::List(List { values: vec![] })), }; - let expr = from_nested(&consumer, &nested, &schema).await?; - assert_eq!(format!("{expr}"), "make_array()"); + let result = from_nested(&consumer, &nested, &schema).await; + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("Empty Nested lists are not supported") + ); Ok(()) } #[tokio::test] - async fn nested_list_missing_type() -> datafusion::common::Result<()> { + async fn nested_missing_type() -> datafusion::common::Result<()> { let consumer = test_consumer(); let schema = DFSchema::empty(); let nested = Nested { diff --git a/datafusion/substrait/tests/testdata/test_plans/nested_list_expressions.substrait.json b/datafusion/substrait/tests/testdata/test_plans/nested_list_expressions.substrait.json index 77d5ca126eb4c..85a69c41c5eb1 100644 --- a/datafusion/substrait/tests/testdata/test_plans/nested_list_expressions.substrait.json +++ b/datafusion/substrait/tests/testdata/test_plans/nested_list_expressions.substrait.json @@ -1,19 +1,4 @@ { - "extensionUris": [ - { - "extensionUriAnchor": 1, - "uri": "https://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml" - } - ], - "extensions": [ - { - "extensionFunction": { - "extensionUriReference": 1, - "functionAnchor": 0, - "name": "add:i32_i32" - } - } - ], "relations": [ { "root": {