From bc1c0ea78ffceb9d85ec9a42c59917cb7856b019 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 19 Feb 2025 18:28:09 -0500 Subject: [PATCH 1/4] pyo3 update required changes to deprecated interfaces --- src/config.rs | 10 +++--- src/context.rs | 12 +++---- src/dataframe.rs | 17 +++++---- src/dataset.rs | 2 +- src/dataset_exec.rs | 8 ++--- src/errors.rs | 4 +++ src/expr.rs | 61 ++++++++++++++++---------------- src/expr/aggregate.rs | 6 ++-- src/expr/analyze.rs | 6 ++-- src/expr/create_memory_table.rs | 6 ++-- src/expr/create_view.rs | 6 ++-- src/expr/distinct.rs | 6 ++-- src/expr/drop_table.rs | 6 ++-- src/expr/empty_relation.rs | 6 ++-- src/expr/explain.rs | 6 ++-- src/expr/extension.rs | 6 ++-- src/expr/filter.rs | 6 ++-- src/expr/join.rs | 6 ++-- src/expr/limit.rs | 6 ++-- src/expr/literal.rs | 6 ++-- src/expr/logical_node.rs | 4 +-- src/expr/projection.rs | 6 ++-- src/expr/repartition.rs | 6 ++-- src/expr/sort.rs | 6 ++-- src/expr/subquery.rs | 6 ++-- src/expr/subquery_alias.rs | 6 ++-- src/expr/table_scan.rs | 6 ++-- src/expr/union.rs | 6 ++-- src/expr/unnest.rs | 6 ++-- src/expr/window.rs | 6 ++-- src/lib.rs | 8 ++--- src/physical_plan.rs | 2 +- src/pyarrow_filter_expression.rs | 36 ++++++++++--------- src/pyarrow_util.rs | 4 +-- src/sql/logical.rs | 4 +-- src/udaf.rs | 5 +-- src/udf.rs | 5 +-- src/udwf.rs | 49 ++++++++++++++----------- 38 files changed, 190 insertions(+), 173 deletions(-) diff --git a/src/config.rs b/src/config.rs index cc725b9a3..667d5c590 100644 --- a/src/config.rs +++ b/src/config.rs @@ -47,14 +47,14 @@ impl PyConfig { } /// Get a configuration option - pub fn get(&mut self, key: &str, py: Python) -> PyResult { + pub fn get<'py>(&mut self, key: &str, py: Python<'py>) -> PyResult> { let options = self.config.to_owned(); for entry in options.entries() { if entry.key == key { - return Ok(entry.value.into_py(py)); + return Ok(entry.value.into_pyobject(py)?); } } - Ok(None::.into_py(py)) + Ok(None::.into_pyobject(py)?) 
} /// Set a configuration option @@ -66,10 +66,10 @@ impl PyConfig { /// Get all configuration options pub fn get_all(&mut self, py: Python) -> PyResult { - let dict = PyDict::new_bound(py); + let dict = PyDict::new(py); let options = self.config.to_owned(); for entry in options.entries() { - dict.set_item(entry.key, entry.value.clone().into_py(py))?; + dict.set_item(entry.key, entry.value.clone().into_pyobject(py)?)?; } Ok(dict.into()) } diff --git a/src/context.rs b/src/context.rs index ebe7db230..0f962638e 100644 --- a/src/context.rs +++ b/src/context.rs @@ -458,8 +458,8 @@ impl PySessionContext { let py = data.py(); // Instantiate pyarrow Table object & convert to Arrow Table - let table_class = py.import_bound("pyarrow")?.getattr("Table")?; - let args = PyTuple::new_bound(py, &[data]); + let table_class = py.import("pyarrow")?.getattr("Table")?; + let args = PyTuple::new(py, &[data])?; let table = table_class.call_method1("from_pylist", args)?; // Convert Arrow Table to datafusion DataFrame @@ -478,8 +478,8 @@ impl PySessionContext { let py = data.py(); // Instantiate pyarrow Table object & convert to Arrow Table - let table_class = py.import_bound("pyarrow")?.getattr("Table")?; - let args = PyTuple::new_bound(py, &[data]); + let table_class = py.import("pyarrow")?.getattr("Table")?; + let args = PyTuple::new(py, &[data])?; let table = table_class.call_method1("from_pydict", args)?; // Convert Arrow Table to datafusion DataFrame @@ -533,8 +533,8 @@ impl PySessionContext { let py = data.py(); // Instantiate pyarrow Table object & convert to Arrow Table - let table_class = py.import_bound("pyarrow")?.getattr("Table")?; - let args = PyTuple::new_bound(py, &[data]); + let table_class = py.import("pyarrow")?.getattr("Table")?; + let args = PyTuple::new(py, &[data])?; let table = table_class.call_method1("from_pandas", args)?; // Convert Arrow Table to datafusion DataFrame diff --git a/src/dataframe.rs b/src/dataframe.rs index 13d7ae838..ed9578a71 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -545,12 +545,12 @@ impl PyDataFrame { /// Convert to Arrow Table /// Collect the batches and pass to Arrow Table fn to_arrow_table(&self, py: Python<'_>) -> PyResult { - let batches = self.collect(py)?.to_object(py); - let schema: PyObject = self.schema().into_pyobject(py)?.to_object(py); + let batches = self.collect(py)?.into_pyobject(py)?; + let schema = self.schema().into_pyobject(py)?; // Instantiate pyarrow Table object and use its from_batches method - let table_class = py.import_bound("pyarrow")?.getattr("Table")?; - let args = PyTuple::new_bound(py, &[batches, schema]); + let table_class = py.import("pyarrow")?.getattr("Table")?; + let args = PyTuple::new(py, &[batches, schema])?; let table: PyObject = table_class.call_method1("from_batches", args)?.into(); Ok(table) } @@ -585,8 +585,7 @@ impl PyDataFrame { let ffi_stream = FFI_ArrowArrayStream::new(reader); let stream_capsule_name = CString::new("arrow_array_stream").unwrap(); - PyCapsule::new_bound(py, ffi_stream, Some(stream_capsule_name)) - .map_err(PyDataFusionError::from) + PyCapsule::new(py, ffi_stream, Some(stream_capsule_name)).map_err(PyDataFusionError::from) } fn execute_stream(&self, py: Python) -> PyDataFusionResult { @@ -649,8 +648,8 @@ impl PyDataFrame { /// Collect the batches, pass to Arrow Table & then convert to polars DataFrame fn to_polars(&self, py: Python<'_>) -> PyResult { let table = self.to_arrow_table(py)?; - let dataframe = py.import_bound("polars")?.getattr("DataFrame")?; - let args = 
PyTuple::new_bound(py, &[table]); + let dataframe = py.import("polars")?.getattr("DataFrame")?; + let args = PyTuple::new(py, &[table])?; let result: PyObject = dataframe.call1(args)?.into(); Ok(result) } @@ -673,7 +672,7 @@ fn print_dataframe(py: Python, df: DataFrame) -> PyDataFusionResult<()> { // Import the Python 'builtins' module to access the print function // Note that println! does not print to the Python debug console and is not visible in notebooks for instance - let print = py.import_bound("builtins")?.getattr("print")?; + let print = py.import("builtins")?.getattr("print")?; print.call1((result,))?; Ok(()) } diff --git a/src/dataset.rs b/src/dataset.rs index a8fa21ec5..0baf4da2a 100644 --- a/src/dataset.rs +++ b/src/dataset.rs @@ -48,7 +48,7 @@ impl Dataset { // Creates a Python PyArrow.Dataset pub fn new(dataset: &Bound<'_, PyAny>, py: Python) -> PyResult { // Ensure that we were passed an instance of pyarrow.dataset.Dataset - let ds = PyModule::import_bound(py, "pyarrow.dataset")?; + let ds = PyModule::import(py, "pyarrow.dataset")?; let ds_attr = ds.getattr("Dataset")?; let ds_type = ds_attr.downcast::()?; if dataset.is_instance(ds_type)? { diff --git a/src/dataset_exec.rs b/src/dataset_exec.rs index ace42115b..445e4fe74 100644 --- a/src/dataset_exec.rs +++ b/src/dataset_exec.rs @@ -104,7 +104,7 @@ impl DatasetExec { }) .transpose()?; - let kwargs = PyDict::new_bound(py); + let kwargs = PyDict::new(py); kwargs.set_item("columns", columns.clone())?; kwargs.set_item( @@ -121,7 +121,7 @@ impl DatasetExec { .0, ); - let builtins = Python::import_bound(py, "builtins")?; + let builtins = Python::import(py, "builtins")?; let pylist = builtins.getattr("list")?; // Get the fragments or partitions of the dataset @@ -198,7 +198,7 @@ impl ExecutionPlan for DatasetExec { let dataset_schema = dataset .getattr("schema") .map_err(|err| InnerDataFusionError::External(Box::new(err)))?; - let kwargs = PyDict::new_bound(py); + let kwargs = PyDict::new(py); kwargs .set_item("columns", self.columns.clone()) .map_err(|err| InnerDataFusionError::External(Box::new(err)))?; @@ -223,7 +223,7 @@ impl ExecutionPlan for DatasetExec { let record_batches: Bound<'_, PyIterator> = scanner .call_method0("to_batches") .map_err(|err| InnerDataFusionError::External(Box::new(err)))? 
- .iter() + .try_iter() .map_err(|err| InnerDataFusionError::External(Box::new(err)))?; let record_batches = PyArrowBatchesAdapter { diff --git a/src/errors.rs b/src/errors.rs index b02b754a2..f1d5aeb23 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -91,3 +91,7 @@ pub fn py_datafusion_err(e: impl Debug) -> PyErr { pub fn py_unsupported_variant_err(e: impl Debug) -> PyErr { PyErr::new::(format!("{e:?}")) } + +pub fn to_datafusion_err(e: impl Debug) -> InnerDataFusionError { + InnerDataFusionError::Execution(format!("{e:?}")) +} diff --git a/src/expr.rs b/src/expr.rs index 1e9983d42..e750be6a4 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -19,6 +19,7 @@ use datafusion::logical_expr::utils::exprlist_to_fields; use datafusion::logical_expr::{ ExprFuncBuilder, ExprFunctionExt, LogicalPlan, WindowFunctionDefinition, }; +use pyo3::IntoPyObjectExt; use pyo3::{basic::CompareOp, prelude::*}; use std::convert::{From, Into}; use std::sync::Arc; @@ -126,35 +127,35 @@ pub fn py_expr_list(expr: &[Expr]) -> PyResult> { #[pymethods] impl PyExpr { /// Return the specific expression - fn to_variant(&self, py: Python) -> PyResult { + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { Python::with_gil(|_| { match &self.expr { - Expr::Alias(alias) => Ok(PyAlias::from(alias.clone()).into_py(py)), - Expr::Column(col) => Ok(PyColumn::from(col.clone()).into_py(py)), + Expr::Alias(alias) => Ok(PyAlias::from(alias.clone()).into_bound_py_any(py)?), + Expr::Column(col) => Ok(PyColumn::from(col.clone()).into_bound_py_any(py)?), Expr::ScalarVariable(data_type, variables) => { - Ok(PyScalarVariable::new(data_type, variables).into_py(py)) + Ok(PyScalarVariable::new(data_type, variables).into_bound_py_any(py)?) } - Expr::Like(value) => Ok(PyLike::from(value.clone()).into_py(py)), - Expr::Literal(value) => Ok(PyLiteral::from(value.clone()).into_py(py)), - Expr::BinaryExpr(expr) => Ok(PyBinaryExpr::from(expr.clone()).into_py(py)), - Expr::Not(expr) => Ok(PyNot::new(*expr.clone()).into_py(py)), - Expr::IsNotNull(expr) => Ok(PyIsNotNull::new(*expr.clone()).into_py(py)), - Expr::IsNull(expr) => Ok(PyIsNull::new(*expr.clone()).into_py(py)), - Expr::IsTrue(expr) => Ok(PyIsTrue::new(*expr.clone()).into_py(py)), - Expr::IsFalse(expr) => Ok(PyIsFalse::new(*expr.clone()).into_py(py)), - Expr::IsUnknown(expr) => Ok(PyIsUnknown::new(*expr.clone()).into_py(py)), - Expr::IsNotTrue(expr) => Ok(PyIsNotTrue::new(*expr.clone()).into_py(py)), - Expr::IsNotFalse(expr) => Ok(PyIsNotFalse::new(*expr.clone()).into_py(py)), - Expr::IsNotUnknown(expr) => Ok(PyIsNotUnknown::new(*expr.clone()).into_py(py)), - Expr::Negative(expr) => Ok(PyNegative::new(*expr.clone()).into_py(py)), + Expr::Like(value) => Ok(PyLike::from(value.clone()).into_bound_py_any(py)?), + Expr::Literal(value) => Ok(PyLiteral::from(value.clone()).into_bound_py_any(py)?), + Expr::BinaryExpr(expr) => Ok(PyBinaryExpr::from(expr.clone()).into_bound_py_any(py)?), + Expr::Not(expr) => Ok(PyNot::new(*expr.clone()).into_bound_py_any(py)?), + Expr::IsNotNull(expr) => Ok(PyIsNotNull::new(*expr.clone()).into_bound_py_any(py)?), + Expr::IsNull(expr) => Ok(PyIsNull::new(*expr.clone()).into_bound_py_any(py)?), + Expr::IsTrue(expr) => Ok(PyIsTrue::new(*expr.clone()).into_bound_py_any(py)?), + Expr::IsFalse(expr) => Ok(PyIsFalse::new(*expr.clone()).into_bound_py_any(py)?), + Expr::IsUnknown(expr) => Ok(PyIsUnknown::new(*expr.clone()).into_bound_py_any(py)?), + Expr::IsNotTrue(expr) => Ok(PyIsNotTrue::new(*expr.clone()).into_bound_py_any(py)?), + Expr::IsNotFalse(expr) => 
Ok(PyIsNotFalse::new(*expr.clone()).into_bound_py_any(py)?), + Expr::IsNotUnknown(expr) => Ok(PyIsNotUnknown::new(*expr.clone()).into_bound_py_any(py)?), + Expr::Negative(expr) => Ok(PyNegative::new(*expr.clone()).into_bound_py_any(py)?), Expr::AggregateFunction(expr) => { - Ok(PyAggregateFunction::from(expr.clone()).into_py(py)) + Ok(PyAggregateFunction::from(expr.clone()).into_bound_py_any(py)?) } - Expr::SimilarTo(value) => Ok(PySimilarTo::from(value.clone()).into_py(py)), - Expr::Between(value) => Ok(between::PyBetween::from(value.clone()).into_py(py)), - Expr::Case(value) => Ok(case::PyCase::from(value.clone()).into_py(py)), - Expr::Cast(value) => Ok(cast::PyCast::from(value.clone()).into_py(py)), - Expr::TryCast(value) => Ok(cast::PyTryCast::from(value.clone()).into_py(py)), + Expr::SimilarTo(value) => Ok(PySimilarTo::from(value.clone()).into_bound_py_any(py)?), + Expr::Between(value) => Ok(between::PyBetween::from(value.clone()).into_bound_py_any(py)?), + Expr::Case(value) => Ok(case::PyCase::from(value.clone()).into_bound_py_any(py)?), + Expr::Cast(value) => Ok(cast::PyCast::from(value.clone()).into_bound_py_any(py)?), + Expr::TryCast(value) => Ok(cast::PyTryCast::from(value.clone()).into_bound_py_any(py)?), Expr::ScalarFunction(value) => Err(py_unsupported_variant_err(format!( "Converting Expr::ScalarFunction to a Python object is not implemented: {:?}", value @@ -163,29 +164,29 @@ impl PyExpr { "Converting Expr::WindowFunction to a Python object is not implemented: {:?}", value ))), - Expr::InList(value) => Ok(in_list::PyInList::from(value.clone()).into_py(py)), - Expr::Exists(value) => Ok(exists::PyExists::from(value.clone()).into_py(py)), + Expr::InList(value) => Ok(in_list::PyInList::from(value.clone()).into_bound_py_any(py)?), + Expr::Exists(value) => Ok(exists::PyExists::from(value.clone()).into_bound_py_any(py)?), Expr::InSubquery(value) => { - Ok(in_subquery::PyInSubquery::from(value.clone()).into_py(py)) + Ok(in_subquery::PyInSubquery::from(value.clone()).into_bound_py_any(py)?) } Expr::ScalarSubquery(value) => { - Ok(scalar_subquery::PyScalarSubquery::from(value.clone()).into_py(py)) + Ok(scalar_subquery::PyScalarSubquery::from(value.clone()).into_bound_py_any(py)?) } Expr::Wildcard { qualifier, options } => Err(py_unsupported_variant_err(format!( "Converting Expr::Wildcard to a Python object is not implemented : {:?} {:?}", qualifier, options ))), Expr::GroupingSet(value) => { - Ok(grouping_set::PyGroupingSet::from(value.clone()).into_py(py)) + Ok(grouping_set::PyGroupingSet::from(value.clone()).into_bound_py_any(py)?) } Expr::Placeholder(value) => { - Ok(placeholder::PyPlaceholder::from(value.clone()).into_py(py)) + Ok(placeholder::PyPlaceholder::from(value.clone()).into_bound_py_any(py)?) 
} Expr::OuterReferenceColumn(data_type, column) => Err(py_unsupported_variant_err(format!( "Converting Expr::OuterReferenceColumn to a Python object is not implemented: {:?} - {:?}", data_type, column ))), - Expr::Unnest(value) => Ok(unnest_expr::PyUnnestExpr::from(value.clone()).into_py(py)), + Expr::Unnest(value) => Ok(unnest_expr::PyUnnestExpr::from(value.clone()).into_bound_py_any(py)?), } }) } diff --git a/src/expr/aggregate.rs b/src/expr/aggregate.rs index 389bfb332..8fc9da5b0 100644 --- a/src/expr/aggregate.rs +++ b/src/expr/aggregate.rs @@ -19,7 +19,7 @@ use datafusion::common::DataFusionError; use datafusion::logical_expr::expr::{AggregateFunction, Alias}; use datafusion::logical_expr::logical_plan::Aggregate; use datafusion::logical_expr::Expr; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use std::fmt::{self, Display, Formatter}; use super::logical_node::LogicalNode; @@ -151,7 +151,7 @@ impl LogicalNode for PyAggregate { vec![PyLogicalPlan::from((*self.aggregate.input).clone())] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/analyze.rs b/src/expr/analyze.rs index 084513971..62f93cd26 100644 --- a/src/expr/analyze.rs +++ b/src/expr/analyze.rs @@ -16,7 +16,7 @@ // under the License. use datafusion::logical_expr::logical_plan::Analyze; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use std::fmt::{self, Display, Formatter}; use super::logical_node::LogicalNode; @@ -78,7 +78,7 @@ impl LogicalNode for PyAnalyze { vec![PyLogicalPlan::from((*self.analyze.input).clone())] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/create_memory_table.rs b/src/expr/create_memory_table.rs index 01ebb66b0..8872b2d47 100644 --- a/src/expr/create_memory_table.rs +++ b/src/expr/create_memory_table.rs @@ -18,7 +18,7 @@ use std::fmt::{self, Display, Formatter}; use datafusion::logical_expr::CreateMemoryTable; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::sql::logical::PyLogicalPlan; @@ -91,7 +91,7 @@ impl LogicalNode for PyCreateMemoryTable { vec![PyLogicalPlan::from((*self.create.input).clone())] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/create_view.rs b/src/expr/create_view.rs index d119f5c21..87bb76876 100644 --- a/src/expr/create_view.rs +++ b/src/expr/create_view.rs @@ -18,7 +18,7 @@ use std::fmt::{self, Display, Formatter}; use datafusion::logical_expr::{CreateView, DdlStatement, LogicalPlan}; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::{errors::py_type_err, sql::logical::PyLogicalPlan}; @@ -88,8 +88,8 @@ impl LogicalNode for PyCreateView { vec![PyLogicalPlan::from((*self.create.input).clone())] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/distinct.rs b/src/expr/distinct.rs index 061ab4824..b62b776f8 100644 --- a/src/expr/distinct.rs +++ b/src/expr/distinct.rs @@ -18,7 +18,7 @@ use std::fmt::{self, Display, Formatter}; use datafusion::logical_expr::Distinct; -use 
pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::sql::logical::PyLogicalPlan; @@ -89,7 +89,7 @@ impl LogicalNode for PyDistinct { } } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/drop_table.rs b/src/expr/drop_table.rs index 330156abe..96983c1cf 100644 --- a/src/expr/drop_table.rs +++ b/src/expr/drop_table.rs @@ -18,7 +18,7 @@ use std::fmt::{self, Display, Formatter}; use datafusion::logical_expr::logical_plan::DropTable; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::sql::logical::PyLogicalPlan; @@ -83,7 +83,7 @@ impl LogicalNode for PyDropTable { vec![] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/empty_relation.rs b/src/expr/empty_relation.rs index ce7163466..a1534ac15 100644 --- a/src/expr/empty_relation.rs +++ b/src/expr/empty_relation.rs @@ -17,7 +17,7 @@ use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; use datafusion::logical_expr::EmptyRelation; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use std::fmt::{self, Display, Formatter}; use super::logical_node::LogicalNode; @@ -79,7 +79,7 @@ impl LogicalNode for PyEmptyRelation { vec![] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/explain.rs b/src/expr/explain.rs index 8e7fb8843..fc02fe2b5 100644 --- a/src/expr/explain.rs +++ b/src/expr/explain.rs @@ -18,7 +18,7 @@ use std::fmt::{self, Display, Formatter}; use datafusion::logical_expr::{logical_plan::Explain, LogicalPlan}; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::{common::df_schema::PyDFSchema, errors::py_type_err, sql::logical::PyLogicalPlan}; @@ -104,7 +104,7 @@ impl LogicalNode for PyExplain { vec![] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/extension.rs b/src/expr/extension.rs index a29802b0b..1e3fbb199 100644 --- a/src/expr/extension.rs +++ b/src/expr/extension.rs @@ -16,7 +16,7 @@ // under the License. use datafusion::logical_expr::Extension; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::sql::logical::PyLogicalPlan; @@ -46,7 +46,7 @@ impl LogicalNode for PyExtension { vec![] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/filter.rs b/src/expr/filter.rs index a6d8aa7ee..9bdb667cd 100644 --- a/src/expr/filter.rs +++ b/src/expr/filter.rs @@ -16,7 +16,7 @@ // under the License. 
use datafusion::logical_expr::logical_plan::Filter; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use std::fmt::{self, Display, Formatter}; use crate::common::df_schema::PyDFSchema; @@ -81,7 +81,7 @@ impl LogicalNode for PyFilter { vec![PyLogicalPlan::from((*self.filter.input).clone())] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/join.rs b/src/expr/join.rs index 66e677f8a..76ec532e7 100644 --- a/src/expr/join.rs +++ b/src/expr/join.rs @@ -16,7 +16,7 @@ // under the License. use datafusion::logical_expr::logical_plan::{Join, JoinConstraint, JoinType}; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use std::fmt::{self, Display, Formatter}; use crate::common::df_schema::PyDFSchema; @@ -193,7 +193,7 @@ impl LogicalNode for PyJoin { ] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/limit.rs b/src/expr/limit.rs index 84ad7d68b..c2a33ff89 100644 --- a/src/expr/limit.rs +++ b/src/expr/limit.rs @@ -16,7 +16,7 @@ // under the License. use datafusion::logical_expr::logical_plan::Limit; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use std::fmt::{self, Display, Formatter}; use crate::common::df_schema::PyDFSchema; @@ -90,7 +90,7 @@ impl LogicalNode for PyLimit { vec![PyLogicalPlan::from((*self.limit.input).clone())] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/literal.rs b/src/expr/literal.rs index 2cb2079f1..a660ac914 100644 --- a/src/expr/literal.rs +++ b/src/expr/literal.rs @@ -17,7 +17,7 @@ use crate::errors::PyDataFusionError; use datafusion::common::ScalarValue; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; #[pyclass(name = "Literal", module = "datafusion.expr", subclass)] #[derive(Clone)] @@ -144,8 +144,8 @@ impl PyLiteral { } #[allow(clippy::wrong_self_convention)] - fn into_type(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn into_type<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } fn __repr__(&self) -> PyResult { diff --git a/src/expr/logical_node.rs b/src/expr/logical_node.rs index 757e4f94b..5aff70059 100644 --- a/src/expr/logical_node.rs +++ b/src/expr/logical_node.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use pyo3::{PyObject, PyResult, Python}; +use pyo3::{Bound, PyAny, PyResult, Python}; use crate::sql::logical::PyLogicalPlan; @@ -25,5 +25,5 @@ pub trait LogicalNode { /// The input plan to the current logical node instance. 
fn inputs(&self) -> Vec; - fn to_variant(&self, py: Python) -> PyResult; + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult>; } diff --git a/src/expr/projection.rs b/src/expr/projection.rs index 36534fdb2..dc7e5e3c1 100644 --- a/src/expr/projection.rs +++ b/src/expr/projection.rs @@ -17,7 +17,7 @@ use datafusion::logical_expr::logical_plan::Projection; use datafusion::logical_expr::Expr; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use std::fmt::{self, Display, Formatter}; use crate::common::df_schema::PyDFSchema; @@ -113,7 +113,7 @@ impl LogicalNode for PyProjection { vec![PyLogicalPlan::from((*self.projection.input).clone())] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/repartition.rs b/src/expr/repartition.rs index 4e680e181..3e782d6af 100644 --- a/src/expr/repartition.rs +++ b/src/expr/repartition.rs @@ -18,7 +18,7 @@ use std::fmt::{self, Display, Formatter}; use datafusion::logical_expr::{logical_plan::Repartition, Expr, Partitioning}; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::{errors::py_type_err, sql::logical::PyLogicalPlan}; @@ -121,7 +121,7 @@ impl LogicalNode for PyRepartition { vec![PyLogicalPlan::from((*self.repartition.input).clone())] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/sort.rs b/src/expr/sort.rs index a1803ccaf..ed4947591 100644 --- a/src/expr/sort.rs +++ b/src/expr/sort.rs @@ -17,7 +17,7 @@ use datafusion::common::DataFusionError; use datafusion::logical_expr::logical_plan::Sort; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use std::fmt::{self, Display, Formatter}; use crate::common::df_schema::PyDFSchema; @@ -96,7 +96,7 @@ impl LogicalNode for PySort { vec![PyLogicalPlan::from((*self.sort.input).clone())] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/subquery.rs b/src/expr/subquery.rs index dac8d0a2b..5ebfe6927 100644 --- a/src/expr/subquery.rs +++ b/src/expr/subquery.rs @@ -18,7 +18,7 @@ use std::fmt::{self, Display, Formatter}; use datafusion::logical_expr::Subquery; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::sql::logical::PyLogicalPlan; @@ -75,7 +75,7 @@ impl LogicalNode for PySubquery { vec![] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/subquery_alias.rs b/src/expr/subquery_alias.rs index a83cff96d..267a4d485 100644 --- a/src/expr/subquery_alias.rs +++ b/src/expr/subquery_alias.rs @@ -18,7 +18,7 @@ use std::fmt::{self, Display, Formatter}; use datafusion::logical_expr::SubqueryAlias; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; @@ -85,7 +85,7 @@ impl LogicalNode for PySubqueryAlias { vec![PyLogicalPlan::from((*self.subquery_alias.input).clone())] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff 
--git a/src/expr/table_scan.rs b/src/expr/table_scan.rs index f61be7fe4..6a0d53f0f 100644 --- a/src/expr/table_scan.rs +++ b/src/expr/table_scan.rs @@ -17,7 +17,7 @@ use datafusion::common::TableReference; use datafusion::logical_expr::logical_plan::TableScan; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use std::fmt::{self, Display, Formatter}; use crate::expr::logical_node::LogicalNode; @@ -146,7 +146,7 @@ impl LogicalNode for PyTableScan { vec![] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/union.rs b/src/expr/union.rs index 62488d9a1..5a08ccc13 100644 --- a/src/expr/union.rs +++ b/src/expr/union.rs @@ -16,7 +16,7 @@ // under the License. use datafusion::logical_expr::logical_plan::Union; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use std::fmt::{self, Display, Formatter}; use crate::common::df_schema::PyDFSchema; @@ -83,7 +83,7 @@ impl LogicalNode for PyUnion { .collect() } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/unnest.rs b/src/expr/unnest.rs index adc705035..8e70e0990 100644 --- a/src/expr/unnest.rs +++ b/src/expr/unnest.rs @@ -16,7 +16,7 @@ // under the License. use datafusion::logical_expr::logical_plan::Unnest; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use std::fmt::{self, Display, Formatter}; use crate::common::df_schema::PyDFSchema; @@ -79,7 +79,7 @@ impl LogicalNode for PyUnnest { vec![PyLogicalPlan::from((*self.unnest_.input).clone())] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/expr/window.rs b/src/expr/window.rs index 4dc6cb9c9..13deaec25 100644 --- a/src/expr/window.rs +++ b/src/expr/window.rs @@ -18,7 +18,7 @@ use datafusion::common::{DataFusionError, ScalarValue}; use datafusion::logical_expr::expr::WindowFunction; use datafusion::logical_expr::{Expr, Window, WindowFrame, WindowFrameBound, WindowFrameUnits}; -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use std::fmt::{self, Display, Formatter}; use crate::common::data_type::PyScalarValue; @@ -289,7 +289,7 @@ impl LogicalNode for PyWindowExpr { vec![self.window.input.as_ref().clone().into()] } - fn to_variant(&self, py: Python) -> PyResult { - Ok(self.clone().into_py(py)) + fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { + self.clone().into_bound_py_any(py) } } diff --git a/src/lib.rs b/src/lib.rs index 317c3a49a..0310f95d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -94,21 +94,21 @@ fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; // Register `common` as a submodule. Matching `datafusion-common` https://docs.rs/datafusion-common/latest/datafusion_common/ - let common = PyModule::new_bound(py, "common")?; + let common = PyModule::new(py, "common")?; common::init_module(&common)?; m.add_submodule(&common)?; // Register `expr` as a submodule. 
Matching `datafusion-expr` https://docs.rs/datafusion-expr/latest/datafusion_expr/ - let expr = PyModule::new_bound(py, "expr")?; + let expr = PyModule::new(py, "expr")?; expr::init_module(&expr)?; m.add_submodule(&expr)?; // Register the functions as a submodule - let funcs = PyModule::new_bound(py, "functions")?; + let funcs = PyModule::new(py, "functions")?; functions::init_module(&funcs)?; m.add_submodule(&funcs)?; - let store = PyModule::new_bound(py, "object_store")?; + let store = PyModule::new(py, "object_store")?; store::init_module(&store)?; m.add_submodule(&store)?; diff --git a/src/physical_plan.rs b/src/physical_plan.rs index 295908dc7..f0be45c6a 100644 --- a/src/physical_plan.rs +++ b/src/physical_plan.rs @@ -66,7 +66,7 @@ impl PyExecutionPlan { )?; let bytes = proto.encode_to_vec(); - Ok(PyBytes::new_bound(py, &bytes)) + Ok(PyBytes::new(py, &bytes)) } #[staticmethod] diff --git a/src/pyarrow_filter_expression.rs b/src/pyarrow_filter_expression.rs index 314eebf4f..4b4c86597 100644 --- a/src/pyarrow_filter_expression.rs +++ b/src/pyarrow_filter_expression.rs @@ -16,7 +16,7 @@ // under the License. /// Converts a Datafusion logical plan expression (Expr) into a PyArrow compute expression -use pyo3::prelude::*; +use pyo3::{prelude::*, IntoPyObjectExt}; use std::convert::TryFrom; use std::result::Result; @@ -53,24 +53,28 @@ fn operator_to_py<'py>( Ok(py_op) } -fn extract_scalar_list(exprs: &[Expr], py: Python) -> PyDataFusionResult> { +fn extract_scalar_list<'py>( + exprs: &[Expr], + py: Python<'py>, +) -> PyDataFusionResult>> { let ret = exprs .iter() .map(|expr| match expr { // TODO: should we also leverage `ScalarValue::to_pyarrow` here? Expr::Literal(v) => match v { - ScalarValue::Boolean(Some(b)) => Ok(b.into_py(py)), - ScalarValue::Int8(Some(i)) => Ok(i.into_py(py)), - ScalarValue::Int16(Some(i)) => Ok(i.into_py(py)), - ScalarValue::Int32(Some(i)) => Ok(i.into_py(py)), - ScalarValue::Int64(Some(i)) => Ok(i.into_py(py)), - ScalarValue::UInt8(Some(i)) => Ok(i.into_py(py)), - ScalarValue::UInt16(Some(i)) => Ok(i.into_py(py)), - ScalarValue::UInt32(Some(i)) => Ok(i.into_py(py)), - ScalarValue::UInt64(Some(i)) => Ok(i.into_py(py)), - ScalarValue::Float32(Some(f)) => Ok(f.into_py(py)), - ScalarValue::Float64(Some(f)) => Ok(f.into_py(py)), - ScalarValue::Utf8(Some(s)) => Ok(s.into_py(py)), + // The unwraps here are for infallible conversions + ScalarValue::Boolean(Some(b)) => Ok(b.into_bound_py_any(py)?), + ScalarValue::Int8(Some(i)) => Ok(i.into_bound_py_any(py)?), + ScalarValue::Int16(Some(i)) => Ok(i.into_bound_py_any(py)?), + ScalarValue::Int32(Some(i)) => Ok(i.into_bound_py_any(py)?), + ScalarValue::Int64(Some(i)) => Ok(i.into_bound_py_any(py)?), + ScalarValue::UInt8(Some(i)) => Ok(i.into_bound_py_any(py)?), + ScalarValue::UInt16(Some(i)) => Ok(i.into_bound_py_any(py)?), + ScalarValue::UInt32(Some(i)) => Ok(i.into_bound_py_any(py)?), + ScalarValue::UInt64(Some(i)) => Ok(i.into_bound_py_any(py)?), + ScalarValue::Float32(Some(f)) => Ok(f.into_bound_py_any(py)?), + ScalarValue::Float64(Some(f)) => Ok(f.into_bound_py_any(py)?), + ScalarValue::Utf8(Some(s)) => Ok(s.into_bound_py_any(py)?), _ => Err(PyDataFusionError::Common(format!( "PyArrow can't handle ScalarValue: {v:?}" ))), @@ -98,8 +102,8 @@ impl TryFrom<&Expr> for PyArrowFilterExpression { // https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Expression.html#pyarrow-dataset-expression fn try_from(expr: &Expr) -> Result { Python::with_gil(|py| { - let pc = Python::import_bound(py, "pyarrow.compute")?; - let 
op_module = Python::import_bound(py, "operator")?; + let pc = Python::import(py, "pyarrow.compute")?; + let op_module = Python::import(py, "operator")?; let pc_expr: PyDataFusionResult> = match expr { Expr::Column(Column { name, .. }) => Ok(pc.getattr("field")?.call1((name,))?), Expr::Literal(scalar) => Ok(scalar_to_pyarrow(scalar, py)?.into_bound(py)), diff --git a/src/pyarrow_util.rs b/src/pyarrow_util.rs index 2b31467f8..cab708458 100644 --- a/src/pyarrow_util.rs +++ b/src/pyarrow_util.rs @@ -33,8 +33,8 @@ impl FromPyArrow for PyScalarValue { let val = value.call_method0("as_py")?; // construct pyarrow array from the python value and pyarrow type - let factory = py.import_bound("pyarrow")?.getattr("array")?; - let args = PyList::new_bound(py, [val]); + let factory = py.import("pyarrow")?.getattr("array")?; + let args = PyList::new(py, [val])?; let array = factory.call1((args, typ))?; // convert the pyarrow array to rust array using C data interface diff --git a/src/sql/logical.rs b/src/sql/logical.rs index 1be33b75f..96561c434 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -64,7 +64,7 @@ impl PyLogicalPlan { #[pymethods] impl PyLogicalPlan { /// Return the specific logical operator - pub fn to_variant(&self, py: Python) -> PyResult { + pub fn to_variant<'py>(&self, py: Python<'py>) -> PyResult> { match self.plan.as_ref() { LogicalPlan::Aggregate(plan) => PyAggregate::from(plan.clone()).to_variant(py), LogicalPlan::Analyze(plan) => PyAnalyze::from(plan.clone()).to_variant(py), @@ -132,7 +132,7 @@ impl PyLogicalPlan { datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(&self.plan, &codec)?; let bytes = proto.encode_to_vec(); - Ok(PyBytes::new_bound(py, &bytes)) + Ok(PyBytes::new(py, &bytes)) } #[staticmethod] diff --git a/src/udaf.rs b/src/udaf.rs index 5f21533e0..34a9cd51d 100644 --- a/src/udaf.rs +++ b/src/udaf.rs @@ -29,6 +29,7 @@ use datafusion::logical_expr::{ }; use crate::common::data_type::PyScalarValue; +use crate::errors::to_datafusion_err; use crate::expr::PyExpr; use crate::utils::parse_volatility; @@ -73,7 +74,7 @@ impl Accumulator for RustAccumulator { .iter() .map(|arg| arg.into_data().to_pyarrow(py).unwrap()) .collect::>(); - let py_args = PyTuple::new_bound(py, py_args); + let py_args = PyTuple::new(py, py_args).map_err(to_datafusion_err)?; // 2. call function self.accum @@ -119,7 +120,7 @@ impl Accumulator for RustAccumulator { .iter() .map(|arg| arg.into_data().to_pyarrow(py).unwrap()) .collect::>(); - let py_args = PyTuple::new_bound(py, py_args); + let py_args = PyTuple::new(py, py_args).map_err(to_datafusion_err)?; // 2. call function self.accum diff --git a/src/udf.rs b/src/udf.rs index 4570e77a6..574c9d7b5 100644 --- a/src/udf.rs +++ b/src/udf.rs @@ -28,6 +28,7 @@ use datafusion::logical_expr::function::ScalarFunctionImplementation; use datafusion::logical_expr::ScalarUDF; use datafusion::logical_expr::{create_udf, ColumnarValue}; +use crate::errors::to_datafusion_err; use crate::expr::PyExpr; use crate::utils::parse_volatility; @@ -46,11 +47,11 @@ fn pyarrow_function_to_rust( .map_err(|e| DataFusionError::Execution(format!("{e:?}"))) }) .collect::, _>>()?; - let py_args = PyTuple::new_bound(py, py_args); + let py_args = PyTuple::new(py, py_args).map_err(to_datafusion_err)?; // 2. call function let value = func - .call_bound(py, py_args, None) + .call(py, py_args, None) .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; // 3. 
cast to arrow::array::Array diff --git a/src/udwf.rs b/src/udwf.rs index 04a4a1640..a0d557c07 100644 --- a/src/udwf.rs +++ b/src/udwf.rs @@ -27,6 +27,7 @@ use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use crate::common::data_type::PyScalarValue; +use crate::errors::to_datafusion_err; use crate::expr::PyExpr; use crate::utils::parse_volatility; use datafusion::arrow::datatypes::DataType; @@ -56,8 +57,8 @@ impl PartitionEvaluator for RustPartitionEvaluator { fn get_range(&self, idx: usize, n_rows: usize) -> Result> { Python::with_gil(|py| { - let py_args = vec![idx.to_object(py), n_rows.to_object(py)]; - let py_args = PyTuple::new_bound(py, py_args); + let py_args = vec![idx.into_pyobject(py)?, n_rows.into_pyobject(py)?]; + let py_args = PyTuple::new(py, py_args)?; self.evaluator .bind(py) @@ -93,17 +94,17 @@ impl PartitionEvaluator for RustPartitionEvaluator { fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result { println!("evaluate all called with number of values {}", values.len()); Python::with_gil(|py| { - let py_values = PyList::new_bound( + let py_values = PyList::new( py, values .iter() .map(|arg| arg.into_data().to_pyarrow(py).unwrap()), - ); - let py_num_rows = num_rows.to_object(py).into_bound(py); - let py_args = PyTuple::new_bound( + )?; + let py_num_rows = num_rows.into_pyobject(py)?; + let py_args = PyTuple::new( py, - PyTuple::new_bound(py, vec![py_values.as_any(), &py_num_rows]), - ); + PyTuple::new(py, vec![py_values.as_any(), &py_num_rows])?, + )?; self.evaluator .bind(py) @@ -112,32 +113,34 @@ impl PartitionEvaluator for RustPartitionEvaluator { let array_data = ArrayData::from_pyarrow_bound(&v).unwrap(); make_array(array_data) }) - .map_err(|e| DataFusionError::Execution(format!("{e}"))) }) + .map_err(to_datafusion_err) } fn evaluate(&mut self, values: &[ArrayRef], range: &Range) -> Result { Python::with_gil(|py| { - let py_values = PyList::new_bound( + let py_values = PyList::new( py, values .iter() .map(|arg| arg.into_data().to_pyarrow(py).unwrap()), - ); - let range_tuple = - PyTuple::new_bound(py, vec![range.start.to_object(py), range.end.to_object(py)]); - let py_args = PyTuple::new_bound( + )?; + let range_tuple = PyTuple::new( py, - PyTuple::new_bound(py, vec![py_values.as_any(), range_tuple.as_any()]), - ); + vec![range.start.into_pyobject(py)?, range.end.into_pyobject(py)?], + )?; + let py_args = PyTuple::new( + py, + PyTuple::new(py, vec![py_values.as_any(), range_tuple.as_any()]), + )?; self.evaluator .bind(py) .call_method1("evaluate", py_args) .and_then(|v| v.extract::()) .map(|v| v.0) - .map_err(|e| DataFusionError::Execution(format!("{e}"))) }) + .map_err(to_datafusion_err) } fn evaluate_all_with_rank( @@ -148,23 +151,27 @@ impl PartitionEvaluator for RustPartitionEvaluator { Python::with_gil(|py| { let ranks = ranks_in_partition .iter() - .map(|r| PyTuple::new_bound(py, vec![r.start, r.end])); + .map(|r| PyTuple::new(py, vec![r.start, r.end])) + .collect::>>()?; // 1. cast args to Pyarrow array - let py_args = vec![num_rows.to_object(py), PyList::new_bound(py, ranks).into()]; + let py_args = vec![ + num_rows.into_pyobject(py)?.into_any(), + PyList::new(py, ranks)?.into_any(), + ]; - let py_args = PyTuple::new_bound(py, py_args); + let py_args = PyTuple::new(py, py_args)?; // 2. 
call function self.evaluator .bind(py) .call_method1("evaluate_all_with_rank", py_args) - .map_err(|e| DataFusionError::Execution(format!("{e}"))) .map(|v| { let array_data = ArrayData::from_pyarrow_bound(&v).unwrap(); make_array(array_data) }) }) + .map_err(to_datafusion_err) } fn supports_bounded_execution(&self) -> bool { From 33efc4fa437abc51d22dbe9654eff30e290aec6e Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 19 Feb 2025 18:36:26 -0500 Subject: [PATCH 2/4] Substrait feature clippy updates --- src/lib.rs | 2 +- src/substrait.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 0310f95d3..ce93ff0c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -121,7 +121,7 @@ fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { #[cfg(feature = "substrait")] fn setup_substrait_module(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { - let substrait = PyModule::new_bound(py, "substrait")?; + let substrait = PyModule::new(py, "substrait")?; substrait::init_module(&substrait)?; m.add_submodule(&substrait)?; Ok(()) diff --git a/src/substrait.rs b/src/substrait.rs index 8dcf3e8a7..1fefc0bbd 100644 --- a/src/substrait.rs +++ b/src/substrait.rs @@ -40,7 +40,7 @@ impl PyPlan { self.plan .encode(&mut proto_bytes) .map_err(PyDataFusionError::EncodeError)?; - Ok(PyBytes::new_bound(py, &proto_bytes).unbind().into()) + Ok(PyBytes::new(py, &proto_bytes).into()) } } @@ -95,7 +95,7 @@ impl PySubstraitSerializer { py: Python, ) -> PyDataFusionResult { let proto_bytes: Vec = wait_for_future(py, serializer::serialize_bytes(sql, &ctx.ctx))?; - Ok(PyBytes::new_bound(py, &proto_bytes).unbind().into()) + Ok(PyBytes::new(py, &proto_bytes).into()) } #[staticmethod] From 8224be40fede24ed15a463d2e6d141459b8d972f Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Wed, 19 Feb 2025 18:57:05 -0500 Subject: [PATCH 3/4] PyTuple was called twice --- src/udwf.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/src/udwf.rs b/src/udwf.rs index a0d557c07..defd9c522 100644 --- a/src/udwf.rs +++ b/src/udwf.rs @@ -101,10 +101,7 @@ impl PartitionEvaluator for RustPartitionEvaluator { .map(|arg| arg.into_data().to_pyarrow(py).unwrap()), )?; let py_num_rows = num_rows.into_pyobject(py)?; - let py_args = PyTuple::new( - py, - PyTuple::new(py, vec![py_values.as_any(), &py_num_rows])?, - )?; + let py_args = PyTuple::new(py, vec![py_values.as_any(), &py_num_rows])?; self.evaluator .bind(py) @@ -125,14 +122,8 @@ impl PartitionEvaluator for RustPartitionEvaluator { .iter() .map(|arg| arg.into_data().to_pyarrow(py).unwrap()), )?; - let range_tuple = PyTuple::new( - py, - vec![range.start.into_pyobject(py)?, range.end.into_pyobject(py)?], - )?; - let py_args = PyTuple::new( - py, - PyTuple::new(py, vec![py_values.as_any(), range_tuple.as_any()]), - )?; + let range_tuple = PyTuple::new(py, vec![range.start, range.end])?; + let py_args = PyTuple::new(py, vec![py_values.as_any(), range_tuple.as_any()])?; self.evaluator .bind(py) From ad375524d90d6d180d3b35175c8972568b4adff6 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Wed, 19 Feb 2025 10:28:24 -0800 Subject: [PATCH 4/4] add -D warnings option --- .github/workflows/test.yaml | 2 +- .pre-commit-config.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index c93d4c06f..c1d9ac838 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -71,7 +71,7 @@ jobs: - name: Run Clippy if: ${{ matrix.python-version 
== '3.10' && matrix.toolchain == 'stable' }} - run: cargo clippy --all-targets --all-features -- -D clippy::all -A clippy::redundant_closure + run: cargo clippy --all-targets --all-features -- -D clippy::all -D warnings -A clippy::redundant_closure - name: Install dependencies and build uses: astral-sh/setup-uv@v5 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e20fedf5c..b548ff18f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -40,7 +40,7 @@ repos: - id: rust-clippy name: Rust clippy description: Run cargo clippy on files included in the commit. clippy should be installed before-hand. - entry: cargo clippy --all-targets --all-features -- -Dclippy::all -Aclippy::redundant_closure + entry: cargo clippy --all-targets --all-features -- -Dclippy::all -D warnings -Aclippy::redundant_closure pass_filenames: false types: [file, rust] language: system
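
For reference, the API migration applied throughout these commits follows one recurring pattern: the deprecated `*_bound` constructors become the plain constructors returning `Bound` values, the infallible `IntoPy::into_py` conversions become the fallible `into_pyobject` / `into_bound_py_any` calls, and `PyTuple::new` / `PyList::new` now return `PyResult`. The sketch below is illustrative only and not part of the patch; the function name `config_to_py` is hypothetical, written against pyo3 0.23 as the commits assume.

    use pyo3::prelude::*;
    use pyo3::types::{PyDict, PyTuple};
    use pyo3::IntoPyObjectExt;

    fn config_to_py<'py>(py: Python<'py>, value: u64) -> PyResult<Bound<'py, PyAny>> {
        // Bound constructors: PyDict::new_bound(py) becomes PyDict::new(py).
        let dict = PyDict::new(py);

        // Conversions: IntoPy::into_py(py) becomes the fallible into_pyobject(py).
        dict.set_item("value", value.into_pyobject(py)?)?;

        // PyTuple::new_bound(py, ..) becomes PyTuple::new(py, ..), which now returns PyResult.
        let _args = PyTuple::new(py, [value])?;

        // IntoPyObjectExt::into_bound_py_any erases the concrete type to Bound<'py, PyAny>,
        // replacing the earlier into_py(py) -> PyObject pattern used by to_variant and friends.
        dict.into_bound_py_any(py)
    }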