diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs
index 8d2b32f39fb..4ac557f00fd 100644
--- a/vortex-datafusion/src/convert/exprs.rs
+++ b/vortex-datafusion/src/convert/exprs.rs
@@ -235,19 +235,6 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
             return Ok(TreeNodeRecursion::Stop);
         }
 
-        // If the projection contains a `CastColumnExpr` that casts to physical types that don't have a 1:1 mapping
-        // with Vortex's types system, we make sure to pull the input from the file and keep the projection
-        if let Some(cast_expr) = node.as_any().downcast_ref::<CastColumnExpr>()
-            && is_dtype_incompatible(cast_expr.target_field().data_type())
-        {
-            scan_projection.push((
-                cast_expr.input_field().name().clone(),
-                self.convert(cast_expr.expr().as_ref())?,
-            ));
-            leftover_projection.push(projection_expr.clone());
-            return Ok(TreeNodeRecursion::Stop);
-        }
-
         // DataFusion assumes different decimal types can be coerced.
         // Vortex expects a perfect match so we don't push it down.
         if let Some(binary_expr) = node.as_any().downcast_ref::<BinaryExpr>()
@@ -430,25 +417,6 @@ fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr) -> bool {
     ScalarFunctionExpr::try_downcast_func::(scalar_fn).is_some()
 }
 
-fn is_dtype_incompatible(dt: &DataType) -> bool {
-    use DataType::*;
-
-    dt.is_run_ends_type()
-        || is_decimal(dt)
-        || matches!(
-            dt,
-            Dictionary(..)
-                | Utf8
-                | LargeUtf8
-                | Binary
-                | LargeBinary
-                | FixedSizeBinary(_)
-                | FixedSizeList(..)
-                | ListView(..)
-                | LargeListView(..)
-        )
-}
-
 // TODO(adam): Replace with `DataType::is_decimal` once its released.
 fn is_decimal(dt: &DataType) -> bool {
     matches!(
diff --git a/vortex-datafusion/src/convert/mod.rs b/vortex-datafusion/src/convert/mod.rs
index 51779914b28..84fcc333086 100644
--- a/vortex-datafusion/src/convert/mod.rs
+++ b/vortex-datafusion/src/convert/mod.rs
@@ -5,6 +5,7 @@ use vortex::error::VortexResult;
 
 pub(crate) mod exprs;
 mod scalars;
+pub(crate) mod schema;
 
 /// First-party trait for implementing conversion from DataFusion types to Vortex types.
 pub(crate) trait FromDataFusion: Sized {
diff --git a/vortex-datafusion/src/convert/schema.rs b/vortex-datafusion/src/convert/schema.rs
new file mode 100644
index 00000000000..a6a4d6c40bf
--- /dev/null
+++ b/vortex-datafusion/src/convert/schema.rs
@@ -0,0 +1,397 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use arrow_schema::DataType;
+use arrow_schema::Field;
+use arrow_schema::Schema;
+use datafusion_common::Result as DFResult;
+use datafusion_common::exec_datafusion_err;
+use vortex::dtype::DType;
+
+/// Calculate the physical Arrow schema for a Vortex file given its DType and the expected logical schema.
+///
+/// Some Arrow types don't roundtrip cleanly through Vortex's DType system:
+/// - Dictionary types lose their encoding (become the value type)
+/// - Utf8/LargeUtf8 become Utf8View
+/// - Binary/LargeBinary become BinaryView
+/// - RunEndEncoded loses its encoding
+/// - Lists are even more complex, with various sizes and physical layouts that are lost
+///
+/// For these types, we use the logical schema's type instead of the DType's natural Arrow
+/// conversion, since Vortex's Arrow executor can produce these types when requested.
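+///
+/// For example (the scenario exercised by `test_dict_conversion` below): a column
+/// stored with DType `Utf8` whose logical field is `Dictionary(Int32, Utf8)` keeps
+/// the dictionary type in the physical schema, rather than the default `Utf8View`
+/// conversion.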
+pub fn calculate_physical_schema(
+    dtype: &DType,
+    reference_logical_schema: &Schema,
+) -> DFResult<Schema> {
+    let DType::Struct(struct_dtype, _) = dtype else {
+        return Err(exec_datafusion_err!(
+            "Expected struct dtype for schema conversion"
+        ));
+    };
+
+    let fields: Vec<Field> = struct_dtype
+        .names()
+        .iter()
+        .zip(struct_dtype.fields())
+        .map(|(name, field_dtype)| {
+            let arrow_type = match reference_logical_schema.field_with_name(name.as_ref()).ok() {
+                Some(logical_field) => {
+                    calculate_physical_field_type(&field_dtype, logical_field.data_type())?
+                }
+                None => {
+                    // Field not in logical schema, use default conversion
+                    field_dtype.to_arrow_dtype().map_err(|e| {
+                        exec_datafusion_err!("Failed to convert dtype to arrow: {e}")
+                    })?
+                }
+            };
+
+            Ok(Field::new(
+                name.to_string(),
+                arrow_type,
+                field_dtype.is_nullable(),
+            ))
+        })
+        .collect::<DFResult<Vec<_>>>()?;
+
+    Ok(Schema::new(fields))
+}
+
+/// Calculate the physical Arrow type for a field, preferring the logical type when the
+/// DType doesn't roundtrip cleanly.
+fn calculate_physical_field_type(dtype: &DType, logical_type: &DataType) -> DFResult<DataType> {
+    // Check if the logical type is one that doesn't roundtrip through DType
+    Ok(match logical_type {
+        // Dictionary types lose their encoding when converted to DType
+        DataType::Dictionary(..) => logical_type.clone(),
+
+        // Non-view string/binary types become view types after roundtrip
+        DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
+            if dtype.is_binary() || dtype.is_utf8() {
+                logical_type.clone()
+            } else {
+                return Err(exec_datafusion_err!(
+                    "Failed to convert dtype to arrow: Vortex DType is {dtype} which is not compatible with {logical_type}"
+                ));
+            }
+        }
+
+        // RunEndEncoded loses its encoding
+        DataType::RunEndEncoded(..) => logical_type.clone(),
+
+        // For struct types, recursively check each field
+        DataType::Struct(logical_fields) => {
+            if let DType::Struct(struct_dtype, _) = dtype {
+                let physical_fields: Vec<Field> = struct_dtype
+                    .names()
+                    .iter()
+                    .zip(struct_dtype.fields())
+                    .map(|(name, field_dtype)| {
+                        let arrow_type =
+                            match logical_fields.iter().find(|f| f.name() == name.as_ref()) {
+                                Some(logical_field) => calculate_physical_field_type(
+                                    &field_dtype,
+                                    logical_field.data_type(),
+                                )?,
+                                None => field_dtype.to_arrow_dtype().map_err(|e| {
+                                    exec_datafusion_err!("Failed to convert dtype to arrow: {e}")
+                                })?,
+                            };
+
+                        Ok(Field::new(
+                            name.to_string(),
+                            arrow_type,
+                            field_dtype.is_nullable(),
+                        ))
+                    })
+                    .collect::<DFResult<Vec<_>>>()?;
+
+                DataType::Struct(physical_fields.into())
+            } else {
+                return Err(exec_datafusion_err!(
+                    "Failed to convert dtype to arrow: Vortex DType is {dtype} which is not compatible with {logical_type}"
+                ));
+            }
+        }
+
+        // For list types, recursively check the element type
+        DataType::List(logical_elem) | DataType::LargeList(logical_elem) => {
+            if let DType::List(elem_dtype, _) = dtype {
+                let physical_elem_type =
+                    calculate_physical_field_type(elem_dtype, logical_elem.data_type())?;
+                let physical_field = Field::new(
+                    logical_elem.name(),
+                    physical_elem_type,
+                    logical_elem.is_nullable(),
+                );
+                match logical_type {
+                    DataType::List(_) => DataType::List(physical_field.into()),
+                    DataType::LargeList(_) => DataType::LargeList(physical_field.into()),
+                    _ => unreachable!(),
+                }
+            } else {
+                return Err(exec_datafusion_err!(
+                    "Failed to convert dtype to arrow: Vortex DType is {dtype} which is not compatible with {logical_type}"
+                ));
+            }
+        }
+
+        // For fixed-size list types, recursively check the element type
+        DataType::FixedSizeList(logical_elem, size) => {
+            if let DType::FixedSizeList(elem_dtype, ..)
= dtype { + let physical_elem_type = + calculate_physical_field_type(elem_dtype, logical_elem.data_type())?; + let physical_field = Field::new( + logical_elem.name(), + physical_elem_type, + logical_elem.is_nullable(), + ); + DataType::FixedSizeList(physical_field.into(), *size) + } else { + return Err(exec_datafusion_err!( + "Failed to convert dtype to arrow: Vortex DType is {dtype} which is not compatible with {logical_type}" + )); + } + } + + // For list view types, recursively check the element type + DataType::ListView(logical_elem) | DataType::LargeListView(logical_elem) => { + if let DType::List(elem_dtype, _) = dtype { + let physical_elem_type = + calculate_physical_field_type(elem_dtype, logical_elem.data_type())?; + let physical_field = Field::new( + logical_elem.name(), + physical_elem_type, + logical_elem.is_nullable(), + ); + match logical_type { + DataType::ListView(_) => DataType::ListView(physical_field.into()), + DataType::LargeListView(_) => DataType::LargeListView(physical_field.into()), + _ => unreachable!(), + } + } else { + return Err(exec_datafusion_err!( + "Failed to convert dtype to arrow: Vortex DType is {dtype} which is not compatible with {logical_type}" + )); + } + } + // All other types roundtrip cleanly, use the DType's natural conversion + _ => dtype + .to_arrow_dtype() + .map_err(|e| exec_datafusion_err!("Failed to convert dtype to arrow: {e}"))?, + }) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_schema::Fields; + use vortex::dtype::Nullability; + use vortex::dtype::PType; + use vortex::dtype::StructFields; + + use super::*; + + #[test] + fn test_dict_conversion() { + // Dictionary types lose their encoding when converted to DType, but we should + // preserve the original logical type in the physical schema. + let logical_schema = Schema::new(vec![Field::new( + "dict_col", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + true, + )]); + + // Vortex DType for dictionary is just the value type (Utf8) + let dtype = DType::Struct( + StructFields::from_iter([("dict_col", DType::Utf8(Nullability::Nullable))]), + Nullability::NonNullable, + ); + + let physical_schema = calculate_physical_schema(&dtype, &logical_schema).unwrap(); + + // Should preserve the dictionary type from the logical schema + assert_eq!( + physical_schema.field(0).data_type(), + &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)) + ); + } + + #[test] + fn test_utf8_variants_preserved() { + // Non-view string types become view types after roundtrip through DType, + // but we should preserve the original logical type. 
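+        // (A plain DType-to-Arrow conversion would otherwise produce the
+        // Utf8View/BinaryView forms described in the module docs above.)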
+ let logical_schema = Schema::new(vec![ + Field::new("utf8_col", DataType::Utf8, false), + Field::new("large_utf8_col", DataType::LargeUtf8, true), + Field::new("binary_col", DataType::Binary, false), + Field::new("large_binary_col", DataType::LargeBinary, true), + ]); + + let dtype = DType::Struct( + StructFields::from_iter([ + ("utf8_col", DType::Utf8(Nullability::NonNullable)), + ("large_utf8_col", DType::Utf8(Nullability::Nullable)), + ("binary_col", DType::Binary(Nullability::NonNullable)), + ("large_binary_col", DType::Binary(Nullability::Nullable)), + ]), + Nullability::NonNullable, + ); + + let physical_schema = calculate_physical_schema(&dtype, &logical_schema).unwrap(); + + assert_eq!(physical_schema.field(0).data_type(), &DataType::Utf8); + assert_eq!(physical_schema.field(1).data_type(), &DataType::LargeUtf8); + assert_eq!(physical_schema.field(2).data_type(), &DataType::Binary); + assert_eq!(physical_schema.field(3).data_type(), &DataType::LargeBinary); + } + + #[test] + fn test_failing_conversion_incompatible_types() { + let logical_schema = Schema::new(vec![Field::new("col", DataType::Utf8, false)]); + + let dtype = DType::Struct( + StructFields::from_iter([( + "col", + DType::Primitive(PType::I32, Nullability::NonNullable), + )]), + Nullability::NonNullable, + ); + + let result = calculate_physical_schema(&dtype, &logical_schema); + assert!( + result + .unwrap_err() + .to_string() + .contains("not compatible with") + ); + + // Test struct vs non-struct mismatch + let logical_schema = Schema::new(vec![Field::new( + "col", + DataType::Struct(Fields::empty()), + false, + )]); + + let dtype = DType::Struct( + StructFields::from_iter([("col", DType::Utf8(Nullability::NonNullable))]), + Nullability::NonNullable, + ); + + let result = calculate_physical_schema(&dtype, &logical_schema); + assert!( + result + .unwrap_err() + .to_string() + .contains("not compatible with") + ); + } + + #[test] + fn test_nested_struct_conversion() { + let logical_schema = Schema::new(vec![ + Field::new( + "outer_col", + DataType::Struct(Fields::from(vec![ + Field::new("inner_utf8", DataType::Utf8, false), + Field::new("inner_int", DataType::Int64, true), + ])), + true, + ), + Field::new("simple_col", DataType::Int32, false), + ]); + + let dtype = DType::Struct( + StructFields::from_iter([ + ( + "outer_col", + DType::Struct( + StructFields::from_iter([ + ("inner_utf8", DType::Utf8(Nullability::NonNullable)), + ( + "inner_int", + DType::Primitive(PType::I64, Nullability::Nullable), + ), + ]), + Nullability::Nullable, + ), + ), + ( + "simple_col", + DType::Primitive(PType::I32, Nullability::NonNullable), + ), + ]), + Nullability::NonNullable, + ); + + let physical_schema = calculate_physical_schema(&dtype, &logical_schema).unwrap(); + + // Check outer structure + assert_eq!(physical_schema.fields().len(), 2); + + // Check nested struct preserves Utf8 (not Utf8View) + let outer_field = physical_schema.field(0); + if let DataType::Struct(inner_fields) = outer_field.data_type() { + assert_eq!(inner_fields.len(), 2); + assert_eq!(inner_fields[0].data_type(), &DataType::Utf8); + assert_eq!(inner_fields[1].data_type(), &DataType::Int64); + } else { + panic!("Expected struct type for outer_col"); + } + } + + #[test] + fn test_list_with_dict_elements() { + // Test that list types with dictionary elements preserve the dictionary type + let inner_field = Field::new( + "item", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + true, + ); + let logical_schema = 
Schema::new(vec![Field::new(
+            "list_col",
+            DataType::List(Arc::new(inner_field)),
+            true,
+        )]);
+
+        let dtype = DType::Struct(
+            StructFields::from_iter([(
+                "list_col",
+                DType::List(
+                    Arc::new(DType::Utf8(Nullability::Nullable)),
+                    Nullability::Nullable,
+                ),
+            )]),
+            Nullability::NonNullable,
+        );
+
+        let physical_schema = calculate_physical_schema(&dtype, &logical_schema).unwrap();
+
+        if let DataType::List(elem_field) = physical_schema.field(0).data_type() {
+            assert_eq!(
+                elem_field.data_type(),
+                &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
+            );
+        } else {
+            panic!("Expected list type");
+        }
+    }
+
+    #[test]
+    fn test_non_struct_dtype_error() {
+        // Test that non-struct DType produces an error
+        let logical_schema = Schema::new(vec![Field::new("col", DataType::Int32, false)]);
+
+        let dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
+
+        let result = calculate_physical_schema(&dtype, &logical_schema);
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("Expected struct dtype")
+        );
+    }
+}
diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index c6c2badc528..ae6d3578a33 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -47,6 +47,7 @@ use crate::VortexAccessPlan;
 use crate::convert::exprs::ExpressionConvertor;
 use crate::convert::exprs::ProcessedProjection;
 use crate::convert::exprs::make_vortex_predicate;
+use crate::convert::schema::calculate_physical_schema;
 use crate::persistent::stream::PrunableStream;
 
 #[derive(Clone)]
@@ -98,8 +99,7 @@ impl FileOpener for VortexOpener {
         let expr_adapter_factory = self.expr_adapter_factory.clone();
         let file_cache = self.file_cache.clone();
 
-        let table_schema = self.table_schema.clone();
-        let logical_file_schema = table_schema.file_schema().clone();
+        let unified_file_schema = self.table_schema.file_schema().clone();
         let batch_size = self.batch_size;
         let limit = self.limit;
         let metrics = self.metrics.clone();
@@ -110,7 +110,8 @@
 
         // Replace column access for partition columns with literals
         #[allow(clippy::disallowed_types)]
-        let literal_value_cols = table_schema
+        let literal_value_cols = self
+            .table_schema
             .table_partition_cols()
             .iter()
             .map(|f| f.name())
@@ -142,7 +143,7 @@
                 .and_then(|predicate| {
                     FilePruner::try_new(
                         predicate.clone(),
-                        &logical_file_schema,
+                        &unified_file_schema,
                        &file,
                        Count::default(),
                    )
@@ -161,18 +162,21 @@
                 .await
                 .map_err(|e| exec_datafusion_err!("Failed to open Vortex file {e}"))?;
 
-            let physical_file_schema = Arc::new(vxf.dtype().to_arrow_schema().map_err(|e| {
-                exec_datafusion_err!("Failed to convert file schema to arrow: {e}")
-            })?);
+            // These are the expected Arrow types of the actual columns in this file, which might
+            // differ from the unified logical schema or be missing some of its columns entirely.
+            let this_file_schema = Arc::new(calculate_physical_schema(
+                vxf.dtype(),
+                &unified_file_schema,
+            )?);
 
-            let projected_physical_schema = projection.project_schema(&logical_file_schema)?;
+            let projected_physical_schema = projection.project_schema(&unified_file_schema)?;
 
             let expr_adapter = expr_adapter_factory.create(
-                Arc::clone(&logical_file_schema),
-                Arc::clone(&physical_file_schema),
+                Arc::clone(&unified_file_schema),
+                Arc::clone(&this_file_schema),
             );
 
-            let simplifier = PhysicalExprSimplifier::new(&physical_file_schema);
+            let simplifier = PhysicalExprSimplifier::new(&this_file_schema);
 
             // The adapter rewrites the expressions to the local file schema, allowing
             // for schema evolution and divergence between the table's schema and individual files.
@@ -191,17 +195,16 @@
                 leftover_projection,
             } = expr_convertor.split_projection(
                 projection,
-                &physical_file_schema,
+                &this_file_schema,
                 &projected_physical_schema,
             )?;
 
             // The schema of the stream returned from the vortex scan.
+            // We use the projected physical schema as the reference for types that don't roundtrip.
             let scan_dtype = scan_projection.return_dtype(vxf.dtype()).map_err(|_e| {
                 exec_datafusion_err!("Couldn't get the dtype for the underlying Vortex scan")
             })?;
-            let stream_schema = scan_dtype.to_arrow_schema().map_err(|_e| {
-                exec_datafusion_err!("Couldn't get the schema for the underlying Vortex scan")
-            })?;
+            let stream_schema = calculate_physical_schema(&scan_dtype, &projected_physical_schema)?;
 
             let leftover_projection = leftover_projection
                 .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
@@ -264,7 +267,7 @@
                     .into_iter()
                     .cloned()
                     .partition(|expr| {
-                        expr_convertor.can_be_pushed_down(expr, &physical_file_schema)
+                        expr_convertor.can_be_pushed_down(expr, &this_file_schema)
                     });
 
                 if !unpushed.is_empty() {
@@ -384,11 +387,13 @@ mod tests {
     use arrow_schema::Field;
     use arrow_schema::Fields;
     use arrow_schema::SchemaRef;
+    use datafusion::arrow::array::DictionaryArray;
     use datafusion::arrow::array::RecordBatch;
     use datafusion::arrow::array::StringArray;
     use datafusion::arrow::array::StructArray;
     use datafusion::arrow::datatypes::DataType;
     use datafusion::arrow::datatypes::Schema;
+    use datafusion::arrow::datatypes::UInt32Type;
     use datafusion::arrow::util::display::FormatOptions;
     use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
     use datafusion::common::record_batch;
@@ -1103,4 +1108,52 @@
 
         Ok(())
     }
+
+    /// When a Struct contains Dictionary fields, writing to Vortex and reading back
+    /// should preserve the Dictionary type.
+    #[tokio::test]
+    async fn test_struct_with_dictionary_roundtrip() -> anyhow::Result<()> {
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        let struct_fields = Fields::from(vec![
+            Field::new_dictionary("a", DataType::UInt32, DataType::Utf8, true),
+            Field::new_dictionary("b", DataType::UInt32, DataType::Utf8, true),
+        ]);
+        let struct_array = StructArray::new(
+            struct_fields.clone(),
+            vec![
+                Arc::new(DictionaryArray::<UInt32Type>::from_iter(["x", "y", "x"])),
+                Arc::new(DictionaryArray::<UInt32Type>::from_iter(["p", "p", "q"])),
+            ],
+            None,
+        );
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "labels",
+            DataType::Struct(struct_fields.clone()),
+            false,
+        )]));
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)])?;
+
+        let file_path = "/test.vortex";
+        let data_size = write_arrow_to_vortex(object_store.clone(), file_path, batch).await?;
+
+        let opener = make_test_opener(
+            object_store.clone(),
+            schema.clone(),
+            ProjectionExprs::from_indices(&[0], &schema),
+        );
+        let data: Vec<_> = opener
+            .open(PartitionedFile::new(file_path.to_string(), data_size))?
+            .await?
+            .try_collect()
+            .await?;
+
+        assert_eq!(
+            data[0].schema().field(0).data_type(),
+            &DataType::Struct(struct_fields),
+            "Struct(Dictionary) type should be preserved"
+        );
+        Ok(())
+    }
 }
diff --git a/vortex-datafusion/tests/schema_evolution.rs b/vortex-datafusion/tests/schema_evolution.rs
index f088b023f96..0848b0aa7dc 100644
--- a/vortex-datafusion/tests/schema_evolution.rs
+++ b/vortex-datafusion/tests/schema_evolution.rs
@@ -14,18 +14,22 @@ use std::sync::LazyLock;
 
 use arrow_schema::DataType;
 use arrow_schema::Field;
+use arrow_schema::Fields;
 use arrow_schema::Schema;
 use datafusion::arrow::array::Array;
 use datafusion::arrow::array::ArrayRef as ArrowArrayRef;
+use datafusion::arrow::array::DictionaryArray;
 use datafusion::arrow::array::RecordBatch;
 use datafusion::arrow::array::StructArray;
+use datafusion::arrow::datatypes::UInt16Type;
+use datafusion::arrow::datatypes::UInt32Type;
+use datafusion::assert_batches_sorted_eq;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::datasource::listing::ListingTable;
 use datafusion::datasource::listing::ListingTableConfig;
 use datafusion::execution::SessionStateBuilder;
 use datafusion::execution::context::SessionContext;
 use datafusion_common::assert_batches_eq;
-use datafusion_common::assert_batches_sorted_eq;
 use datafusion_common::create_array;
 use datafusion_common::record_batch;
 use datafusion_datasource::ListingTableUrl;
@@ -336,7 +340,7 @@ async fn test_filter_schema_evolution_struct_fields() {
     let table = ListingTable::try_new(
         ListingTableConfig::new(table_url)
             .with_listing_options(list_opts)
-            .with_schema(read_schema.clone()), // .with_expr_adapter_factory(Arc::new(DF52PhysicalExprAdapterFactory)),
+            .with_schema(read_schema.clone()),
     )
     .unwrap();
@@ -402,3 +406,123 @@
         &filtered_scan
     );
 }
+
+#[tokio::test]
+async fn test_schema_evolution_struct_of_dict() -> anyhow::Result<()> {
+    let (ctx, store) = make_session_ctx();
+
+    // First file
+    let struct_fields = Fields::from(vec![
+        Field::new_dictionary("a", DataType::UInt16, DataType::Utf8, true),
+        Field::new_dictionary("b", DataType::UInt16, DataType::Utf8, true),
+    ]);
+    let struct_array = StructArray::new(
+        struct_fields.clone(),
+        vec![
+            Arc::new(DictionaryArray::<UInt16Type>::from_iter(["x1", "y1", "x1"])),
+            Arc::new(DictionaryArray::<UInt16Type>::from_iter(["p1", "p1", "q1"])),
+        ],
+        None,
+    );
+
+    let batch = RecordBatch::try_new(
+        Arc::new(Schema::new(vec![Field::new(
+            "my_struct",
+            DataType::Struct(struct_fields),
+            true,
+        )])),
+        vec![Arc::new(struct_array)],
+    )?;
+
+    write_file(&store, "files/file1.vortex", &batch).await;
+
+    // Second file
+    let struct_fields = Fields::from(vec![
+        Field::new_dictionary("a", DataType::UInt32, DataType::Utf8, true),
+        Field::new_dictionary("b", DataType::UInt32, DataType::Utf8, true),
+        Field::new_dictionary("c", DataType::UInt32, DataType::Utf8, true),
+    ]);
+    let struct_array = StructArray::new(
+        struct_fields.clone(),
+        vec![
+            Arc::new(DictionaryArray::<UInt32Type>::from_iter(["x2", "y2", "x2"])),
+            Arc::new(DictionaryArray::<UInt32Type>::from_iter(["p2", "p2", "q2"])),
+            Arc::new(DictionaryArray::<UInt32Type>::from_iter(["a2", "b2", "c2"])),
+        ],
+        None,
+    );
+
+    let batch = RecordBatch::try_new(
+        Arc::new(Schema::new(vec![Field::new(
+            "my_struct",
+            DataType::Struct(struct_fields.clone()),
+            true,
+        )])),
+        vec![Arc::new(struct_array)],
+    )?;
+
+    write_file(&store, "files/file2.vortex", &batch).await;
+
+    let read_schema = batch.schema();
+
+    // Read the table back as Vortex
+    let table_url = ListingTableUrl::parse("s3://in-memory/files").unwrap();
+    let list_opts = ListingOptions::new(Arc::new(VortexFormat::new(SESSION.clone())))
+        .with_session_config_options(ctx.state().config())
+        .with_file_extension("vortex");
+
+    let table = Arc::new(ListingTable::try_new(
+        ListingTableConfig::new(table_url)
+            .with_listing_options(list_opts)
+            .with_schema(read_schema.clone()),
+    )?);
+
+    let df = ctx.read_table(table.clone()).unwrap();
+    let table_schema = Arc::new(df.schema().as_arrow().clone());
+
+    assert_eq!(table_schema.as_ref(), read_schema.as_ref());
+
+    let full_scan = df.collect().await.unwrap();
+
+    assert_batches_sorted_eq!(
+        &[
+            "+-----------------------+",
+            "| my_struct             |",
+            "+-----------------------+",
+            "| {a: x1, b: p1, c: }   |",
+            "| {a: x1, b: q1, c: }   |",
+            "| {a: x2, b: p2, c: a2} |",
+            "| {a: x2, b: q2, c: c2} |",
+            "| {a: y1, b: p1, c: }   |",
+            "| {a: y2, b: p2, c: b2} |",
+            "+-----------------------+",
+        ],
+        &full_scan
+    );
+
+    let filter = get_field(col("my_struct"), "a")
+        .eq(lit("x1"))
+        .or(get_field(col("my_struct"), "a").eq(lit("x2")));
+    // Run a filter on the nested `my_struct.a` field, which spans both files.
+    let df = ctx.read_table(table.clone())?;
+    let filtered_scan = df.filter(filter)?.collect().await?;
+
+    assert_eq!(filtered_scan[0].schema(), read_schema);
+
+    assert_batches_sorted_eq!(
+        &[
+            "+-----------------------+",
+            "| my_struct             |",
+            "+-----------------------+",
+            "| {a: x1, b: p1, c: }   |",
+            "| {a: x1, b: q1, c: }   |",
+            "| {a: x2, b: p2, c: a2} |",
+            "| {a: x2, b: q2, c: c2} |",
+            "+-----------------------+",
+        ],
+        &filtered_scan
+    );
+
+    Ok(())
+}