-
Notifications
You must be signed in to change notification settings - Fork 148
feat: Add FFI_TableProviderFactory support #1396
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,43 @@ | ||||||
| # Licensed to the Apache Software Foundation (ASF) under one | ||||||
| # or more contributor license agreements. See the NOTICE file | ||||||
| # distributed with this work for additional information | ||||||
| # regarding copyright ownership. The ASF licenses this file | ||||||
| # to you under the Apache License, Version 2.0 (the | ||||||
| # "License"); you may not use this file except in compliance | ||||||
| # with the License. You may obtain a copy of the License at | ||||||
| # | ||||||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||||||
| # | ||||||
| # Unless required by applicable law or agreed to in writing, | ||||||
| # software distributed under the License is distributed on an | ||||||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||
| # KIND, either express or implied. See the License for the | ||||||
| # specific language governing permissions and limitations | ||||||
| # under the License. | ||||||
|
|
||||||
| from __future__ import annotations | ||||||
|
|
||||||
| import pyarrow as pa | ||||||
| import pytest | ||||||
| from datafusion import SessionContext | ||||||
| from datafusion_ffi_example import MyTableProviderFactory | ||||||
|
|
||||||
|
|
||||||
| def test_table_provider_factory_ffi() -> None: | ||||||
| ctx = SessionContext() | ||||||
| table = MyTableProviderFactory() | ||||||
|
|
||||||
| ctx.register_table_factory("MY_FORMAT", table) | ||||||
|
|
||||||
| # Create a new external table | ||||||
| ctx.sql(""" | ||||||
| CREATE EXTERNAL TABLE | ||||||
| foo | ||||||
| STORED AS my_format | ||||||
| LOCATION ''; | ||||||
| """) | ||||||
|
||||||
| """) | |
| """).collect() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| use std::sync::Arc; | ||
|
|
||
| use async_trait::async_trait; | ||
| use datafusion_catalog::{Session, TableProvider, TableProviderFactory}; | ||
| use datafusion_common::error::Result as DataFusionResult; | ||
| use datafusion_expr::CreateExternalTable; | ||
| use datafusion_ffi::table_provider_factory::FFI_TableProviderFactory; | ||
| use pyo3::types::PyCapsule; | ||
| use pyo3::{Bound, PyAny, PyResult, Python, pyclass, pymethods}; | ||
|
|
||
| use crate::catalog_provider; | ||
| use crate::utils::ffi_logical_codec_from_pycapsule; | ||
|
|
||
| #[derive(Debug)] | ||
| pub(crate) struct ExampleTableProviderFactory {} | ||
|
|
||
| impl ExampleTableProviderFactory { | ||
| fn new() -> Self { | ||
| Self {} | ||
| } | ||
| } | ||
|
|
||
| #[async_trait] | ||
| impl TableProviderFactory for ExampleTableProviderFactory { | ||
| async fn create( | ||
| &self, | ||
| _state: &dyn Session, | ||
| _cmd: &CreateExternalTable, | ||
| ) -> DataFusionResult<Arc<dyn TableProvider>> { | ||
| Ok(catalog_provider::my_table()) | ||
| } | ||
| } | ||
|
|
||
| #[pyclass( | ||
| name = "MyTableProviderFactory", | ||
| module = "datafusion_ffi_example", | ||
| subclass | ||
| )] | ||
| #[derive(Debug)] | ||
| pub struct MyTableProviderFactory { | ||
| inner: Arc<ExampleTableProviderFactory>, | ||
| } | ||
|
|
||
| impl Default for MyTableProviderFactory { | ||
| fn default() -> Self { | ||
| let inner = Arc::new(ExampleTableProviderFactory::new()); | ||
| Self { inner } | ||
| } | ||
| } | ||
|
|
||
| #[pymethods] | ||
| impl MyTableProviderFactory { | ||
| #[new] | ||
| pub fn new() -> Self { | ||
| Self::default() | ||
| } | ||
|
|
||
| pub fn __datafusion_table_provider_factory__<'py>( | ||
| &self, | ||
| py: Python<'py>, | ||
| codec: Bound<PyAny>, | ||
| ) -> PyResult<Bound<'py, PyCapsule>> { | ||
| let name = cr"datafusion_table_provider_factory".into(); | ||
| let codec = ffi_logical_codec_from_pycapsule(codec)?; | ||
| let factory = Arc::clone(&self.inner) as Arc<dyn TableProviderFactory + Send>; | ||
| let factory = FFI_TableProviderFactory::new_with_ffi_codec(factory, None, codec); | ||
|
|
||
| PyCapsule::new(py, factory, Some(name)) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -37,6 +37,7 @@ | |||||||||||||||||||||||||
| CatalogProviderExportable, | ||||||||||||||||||||||||||
| CatalogProviderList, | ||||||||||||||||||||||||||
| CatalogProviderListExportable, | ||||||||||||||||||||||||||
| TableProviderFactoryExportable, | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| from datafusion.dataframe import DataFrame | ||||||||||||||||||||||||||
| from datafusion.expr import sort_list_to_raw_sort_list | ||||||||||||||||||||||||||
|
|
@@ -830,6 +831,20 @@ def deregister_table(self, name: str) -> None: | |||||||||||||||||||||||||
| """Remove a table from the session.""" | ||||||||||||||||||||||||||
| self.ctx.deregister_table(name) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| def register_table_factory( | ||||||||||||||||||||||||||
| self, format: str, factory: TableProviderFactoryExportable | ||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||
| """Register a :py:class:`~datafusion.TableProviderFactoryExportable`. | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| The registered factory can be reference from SQL DDL statements executed | ||||||||||||||||||||||||||
| against this context. | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||
| format: The value to be used in `STORED AS ${format}` clause. | ||||||||||||||||||||||||||
| factory: A PyCapsule that implements TableProviderFactoryExportable" | ||||||||||||||||||||||||||
|
Comment on lines
+839
to
+844
|
||||||||||||||||||||||||||
| The registered factory can be reference from SQL DDL statements executed | |
| against this context. | |
| Args: | |
| format: The value to be used in `STORED AS ${format}` clause. | |
| factory: A PyCapsule that implements TableProviderFactoryExportable" | |
| The registered factory can be referenced from SQL DDL statements executed | |
| against this context. | |
| Args: | |
| format: The value to be used in `STORED AS ${format}` clause. | |
| factory: An object implementing :class:`TableProviderFactoryExportable`. |
Copilot
AI
Mar 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No core test coverage is added for SessionContext.register_table_factory. The new test is under examples/..., but the main pytest testpaths (pyproject.toml) only include python/tests and python/datafusion, so this behavior likely won’t run in CI. Consider adding a unit/integration test under python/tests that registers a factory and exercises CREATE EXTERNAL TABLE end-to-end.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -27,7 +27,7 @@ use arrow::pyarrow::FromPyArrow; | |||||||||||||||||||||||||||
| use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef}; | ||||||||||||||||||||||||||||
| use datafusion::arrow::pyarrow::PyArrowType; | ||||||||||||||||||||||||||||
| use datafusion::arrow::record_batch::RecordBatch; | ||||||||||||||||||||||||||||
| use datafusion::catalog::{CatalogProvider, CatalogProviderList}; | ||||||||||||||||||||||||||||
| use datafusion::catalog::{CatalogProvider, CatalogProviderList, TableProviderFactory}; | ||||||||||||||||||||||||||||
| use datafusion::common::{ScalarValue, TableReference, exec_err}; | ||||||||||||||||||||||||||||
| use datafusion::datasource::file_format::file_compression_type::FileCompressionType; | ||||||||||||||||||||||||||||
| use datafusion::datasource::file_format::parquet::ParquetFormat; | ||||||||||||||||||||||||||||
|
|
@@ -51,6 +51,7 @@ use datafusion_ffi::catalog_provider::FFI_CatalogProvider; | |||||||||||||||||||||||||||
| use datafusion_ffi::catalog_provider_list::FFI_CatalogProviderList; | ||||||||||||||||||||||||||||
| use datafusion_ffi::execution::FFI_TaskContextProvider; | ||||||||||||||||||||||||||||
| use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec; | ||||||||||||||||||||||||||||
| use datafusion_ffi::table_provider_factory::FFI_TableProviderFactory; | ||||||||||||||||||||||||||||
| use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec; | ||||||||||||||||||||||||||||
| use object_store::ObjectStore; | ||||||||||||||||||||||||||||
| use pyo3::IntoPyObjectExt; | ||||||||||||||||||||||||||||
|
|
@@ -659,6 +660,34 @@ impl PySessionContext { | |||||||||||||||||||||||||||
| Ok(()) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| pub fn register_table_factory( | ||||||||||||||||||||||||||||
| &self, | ||||||||||||||||||||||||||||
| format: &str, | ||||||||||||||||||||||||||||
| factory: Bound<'_, PyAny>, | ||||||||||||||||||||||||||||
| ) -> PyDataFusionResult<()> { | ||||||||||||||||||||||||||||
| let py = factory.py(); | ||||||||||||||||||||||||||||
| let codec_capsule = create_logical_extension_capsule(py, self.logical_codec.as_ref())?; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| let capsule = factory | ||||||||||||||||||||||||||||
| .getattr("__datafusion_table_provider_factory__")? | ||||||||||||||||||||||||||||
| .call1((codec_capsule,))?; | ||||||||||||||||||||||||||||
| let capsule = capsule.cast::<PyCapsule>().map_err(py_datafusion_err)?; | ||||||||||||||||||||||||||||
|
Comment on lines
+671
to
+674
|
||||||||||||||||||||||||||||
| let capsule = factory | |
| .getattr("__datafusion_table_provider_factory__")? | |
| .call1((codec_capsule,))?; | |
| let capsule = capsule.cast::<PyCapsule>().map_err(py_datafusion_err)?; | |
| // Support both exportable factory objects and already-exported PyCapsules, | |
| // mirroring the pattern used by catalog registration functions. | |
| let capsule = if factory.hasattr("__datafusion_table_provider_factory__")? { | |
| let exporter = factory.getattr("__datafusion_table_provider_factory__")?; | |
| let capsule_obj = exporter.call1((codec_capsule,))?; | |
| capsule_obj.cast::<PyCapsule>().map_err(py_datafusion_err)? | |
| } else { | |
| factory.cast::<PyCapsule>().map_err(py_datafusion_err)? | |
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused imports
pyarrow as paandpytestwill be flagged by the repo's Ruff configuration forexamples/*(no per-file ignore for F401). Please remove them or use them in the test.