Skip to content

Commit db0a237

Browse files
committed
More moving around of utils to clean up
1 parent 3ae769f commit db0a237

File tree

21 files changed

+54
-148
lines changed

21 files changed

+54
-148
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/src/array.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ use arrow::array::{Array, ArrayRef};
2222
use arrow::datatypes::{Field, FieldRef};
2323
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
2424
use arrow::pyarrow::ToPyArrow;
25+
use datafusion_python_util::validate_pycapsule;
2526
use pyo3::ffi::c_str;
2627
use pyo3::prelude::{PyAnyMethods, PyCapsuleMethods};
2728
use pyo3::types::PyCapsule;
2829
use pyo3::{Bound, PyAny, PyResult, Python, pyclass, pymethods};
2930

3031
use crate::errors::PyDataFusionResult;
31-
use crate::utils::validate_pycapsule;
3232

3333
/// A Python object which implements the Arrow PyCapsule for importing
3434
/// into other libraries.

crates/core/src/catalog.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,20 @@ use datafusion::datasource::TableProvider;
3030
use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
3131
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
3232
use datafusion_ffi::schema_provider::FFI_SchemaProvider;
33+
use datafusion_python_util::{
34+
create_logical_extension_capsule, ffi_logical_codec_from_pycapsule, validate_pycapsule,
35+
wait_for_future,
36+
};
3337
use pyo3::IntoPyObjectExt;
3438
use pyo3::exceptions::PyKeyError;
3539
use pyo3::ffi::c_str;
3640
use pyo3::prelude::*;
3741
use pyo3::types::PyCapsule;
3842

43+
use crate::context::PySessionContext;
3944
use crate::dataset::Dataset;
4045
use crate::errors::{PyDataFusionError, PyDataFusionResult, py_datafusion_err, to_datafusion_err};
4146
use crate::table::PyTable;
42-
use crate::utils::{
43-
create_logical_extension_capsule, extract_logical_extension_codec, validate_pycapsule,
44-
wait_for_future,
45-
};
4647

4748
#[pyclass(
4849
from_py_object,
@@ -710,6 +711,17 @@ fn extract_schema_provider_from_pyobj(
710711
Ok(provider)
711712
}
712713

714+
fn extract_logical_extension_codec(
715+
py: Python,
716+
obj: Option<Bound<PyAny>>,
717+
) -> PyResult<Arc<FFI_LogicalExtensionCodec>> {
718+
let obj = match obj {
719+
Some(obj) => obj,
720+
None => PySessionContext::global_ctx()?.into_bound_py_any(py)?,
721+
};
722+
ffi_logical_codec_from_pycapsule(obj).map(Arc::new)
723+
}
724+
713725
pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
714726
m.add_class::<PyCatalog>()?;
715727
m.add_class::<PySchema>()?;

crates/core/src/context.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ use datafusion_ffi::catalog_provider_list::FFI_CatalogProviderList;
5252
use datafusion_ffi::execution::FFI_TaskContextProvider;
5353
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
5454
use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;
55+
use datafusion_python_util::{
56+
create_logical_extension_capsule, ffi_logical_codec_from_pycapsule, get_global_ctx,
57+
get_tokio_runtime, spawn_future, validate_pycapsule, wait_for_future,
58+
};
5559
use object_store::ObjectStore;
5660
use pyo3::IntoPyObjectExt;
5761
use pyo3::exceptions::{PyKeyError, PyValueError};
@@ -82,10 +86,6 @@ use crate::udaf::PyAggregateUDF;
8286
use crate::udf::PyScalarUDF;
8387
use crate::udtf::PyTableFunction;
8488
use crate::udwf::PyWindowUDF;
85-
use crate::utils::{
86-
create_logical_extension_capsule, extract_logical_extension_codec, get_global_ctx,
87-
get_tokio_runtime, spawn_future, validate_pycapsule, wait_for_future,
88-
};
8989

9090
/// Configuration options for a SessionContext
9191
#[pyclass(
@@ -1187,8 +1187,7 @@ impl PySessionContext {
11871187
&self,
11881188
codec: Bound<'py, PyAny>,
11891189
) -> PyDataFusionResult<Self> {
1190-
let py = codec.py();
1191-
let logical_codec = extract_logical_extension_codec(py, Some(codec))?;
1190+
let logical_codec = Arc::new(ffi_logical_codec_from_pycapsule(codec)?);
11921191

11931192
Ok({
11941193
Self {

crates/core/src/dataframe.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use datafusion::logical_expr::SortExpr;
4141
use datafusion::logical_expr::dml::InsertOp;
4242
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
4343
use datafusion::prelude::*;
44+
use datafusion_python_util::{is_ipython_env, spawn_future, validate_pycapsule, wait_for_future};
4445
use futures::{StreamExt, TryStreamExt};
4546
use parking_lot::Mutex;
4647
use pyo3::PyErr;
@@ -58,7 +59,6 @@ use crate::physical_plan::PyExecutionPlan;
5859
use crate::record_batch::{PyRecordBatchStream, poll_next_batch};
5960
use crate::sql::logical::PyLogicalPlan;
6061
use crate::table::{PyTable, TempViewTable};
61-
use crate::utils::{is_ipython_env, spawn_future, validate_pycapsule, wait_for_future};
6262

6363
/// File-level static CStr for the Arrow array stream capsule name.
6464
static ARROW_ARRAY_STREAM_NAME: &CStr = cstr!("arrow_array_stream");

crates/core/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ mod udaf;
6262
mod udf;
6363
pub mod udtf;
6464
mod udwf;
65-
pub mod utils;
6665

6766
#[cfg(feature = "mimalloc")]
6867
#[global_allocator]

crates/core/src/record_batch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ use std::sync::Arc;
2020
use datafusion::arrow::pyarrow::ToPyArrow;
2121
use datafusion::arrow::record_batch::RecordBatch;
2222
use datafusion::physical_plan::SendableRecordBatchStream;
23+
use datafusion_python_util::wait_for_future;
2324
use futures::StreamExt;
2425
use pyo3::exceptions::{PyStopAsyncIteration, PyStopIteration};
2526
use pyo3::prelude::*;
2627
use pyo3::{PyAny, PyResult, Python, pyclass, pymethods};
2728
use tokio::sync::Mutex;
2829

2930
use crate::errors::PyDataFusionError;
30-
use crate::utils::wait_for_future;
3131

3232
#[pyclass(name = "RecordBatch", module = "datafusion", subclass, frozen)]
3333
pub struct PyRecordBatch {

crates/core/src/substrait.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use datafusion_python_util::wait_for_future;
1819
use datafusion_substrait::logical_plan::{consumer, producer};
1920
use datafusion_substrait::serializer;
2021
use datafusion_substrait::substrait::proto::Plan;
@@ -25,7 +26,6 @@ use pyo3::types::PyBytes;
2526
use crate::context::PySessionContext;
2627
use crate::errors::{PyDataFusionError, PyDataFusionResult, py_datafusion_err, to_datafusion_err};
2728
use crate::sql::logical::PyLogicalPlan;
28-
use crate::utils::wait_for_future;
2929

3030
#[pyclass(
3131
from_py_object,

crates/core/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ use datafusion::datasource::{TableProvider, TableType};
2727
use datafusion::logical_expr::{Expr, LogicalPlanBuilder, TableProviderFilterPushDown};
2828
use datafusion::physical_plan::ExecutionPlan;
2929
use datafusion::prelude::DataFrame;
30+
use datafusion_python_util::table_provider_from_pycapsule;
3031
use pyo3::IntoPyObjectExt;
3132
use pyo3::prelude::*;
3233

3334
use crate::context::PySessionContext;
3435
use crate::dataframe::PyDataFrame;
3536
use crate::dataset::Dataset;
36-
use crate::utils::table_provider_from_pycapsule;
3737

3838
/// This struct is used as a common method for all TableProviders,
3939
/// whether they refer to an FFI provider, an internally known

crates/core/src/udaf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ use datafusion::logical_expr::{
2727
Accumulator, AccumulatorFactoryFunction, AggregateUDF, AggregateUDFImpl, create_udaf,
2828
};
2929
use datafusion_ffi::udaf::FFI_AggregateUDF;
30+
use datafusion_python_util::{parse_volatility, validate_pycapsule};
3031
use pyo3::ffi::c_str;
3132
use pyo3::prelude::*;
3233
use pyo3::types::{PyCapsule, PyTuple};
3334

3435
use crate::common::data_type::PyScalarValue;
3536
use crate::errors::{PyDataFusionResult, py_datafusion_err, to_datafusion_err};
3637
use crate::expr::PyExpr;
37-
use crate::utils::{parse_volatility, validate_pycapsule};
3838

3939
#[derive(Debug)]
4040
struct RustAccumulator {

0 commit comments

Comments
 (0)