Skip to content

Chore/upgrade datafusion 44 #973

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jan 9, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
783 changes: 408 additions & 375 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -38,11 +38,10 @@ tokio = { version = "1.41", features = ["macros", "rt", "rt-multi-thread", "sync
pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"] }
pyo3-async-runtimes = { version = "0.22", features = ["tokio-runtime"]}
arrow = { version = "53", features = ["pyarrow"] }
datafusion = { version = "43.0.0", features = ["pyarrow", "avro", "unicode_expressions"] }
datafusion-substrait = { version = "43.0.0", optional = true }
datafusion-proto = { version = "43.0.0" }
datafusion-ffi = { version = "43.0.0" }
datafusion-functions-window-common = { version = "43.0.0" }
datafusion = { version = "44.0.0", features = ["pyarrow", "avro", "unicode_expressions"] }
datafusion-substrait = { version = "44.0.0", optional = true }
datafusion-proto = { version = "44.0.0" }
datafusion-ffi = { version = "44.0.0" }
prost = "0.13" # keep in line with `datafusion-substrait`
uuid = { version = "1.11", features = ["v4"] }
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -87,7 +87,7 @@ It is possible to configure runtime (memory and disk settings) and configuration

```python
runtime = (
RuntimeConfig()
RuntimeEnvBuilder()
.with_disk_manager_os()
.with_fair_spill_pool(10000000)
)
6 changes: 4 additions & 2 deletions benchmarks/db-benchmark/groupby-datafusion.py
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@
from datafusion import (
col,
functions as f,
RuntimeConfig,
RuntimeEnvBuilder,
SessionConfig,
SessionContext,
)
@@ -85,7 +85,9 @@ def execute(df):

# create a session context with explicit runtime and config settings
runtime = (
RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(64 * 1024 * 1024 * 1024)
RuntimeEnvBuilder()
.with_disk_manager_os()
.with_fair_spill_pool(64 * 1024 * 1024 * 1024)
)
config = (
SessionConfig()
2 changes: 1 addition & 1 deletion benchmarks/tpch/tpch.py
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ def bench(data_path, query_path):

# create context
# runtime = (
# RuntimeConfig()
# RuntimeEnvBuilder()
# .with_disk_manager_os()
# .with_fair_spill_pool(10000000)
# )
8 changes: 4 additions & 4 deletions docs/source/user-guide/configuration.rst
Original file line number Diff line number Diff line change
@@ -19,18 +19,18 @@ Configuration
=============

Let's look at how we can configure DataFusion. When creating a :py:class:`~datafusion.context.SessionContext`, you can pass in
a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.context.RuntimeConfig` object. These two cover a wide range of options.
a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.context.RuntimeEnvBuilder` object. These two cover a wide range of options.

.. code-block:: python
from datafusion import RuntimeConfig, SessionConfig, SessionContext
from datafusion import RuntimeEnvBuilder, SessionConfig, SessionContext
# create a session context with default settings
ctx = SessionContext()
print(ctx)
# create a session context with explicit runtime and config settings
runtime = RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
runtime = RuntimeEnvBuilder().with_disk_manager_os().with_fair_spill_pool(10000000)
config = (
SessionConfig()
.with_create_default_catalog_and_schema(True)
@@ -48,4 +48,4 @@ a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.conte
You can read more about available :py:class:`~datafusion.context.SessionConfig` options in the `rust DataFusion Configuration guide <https://arrow.apache.org/datafusion/user-guide/configs.html>`_,
and about :code:`RuntimeConfig` options in the rust `online API documentation <https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeConfig.html>`_.
and about :code:`RuntimeEnvBuilder` options in the rust `online API documentation <https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeEnvBuilder.html>`_.
4 changes: 2 additions & 2 deletions examples/create-context.py
Original file line number Diff line number Diff line change
@@ -15,14 +15,14 @@
# specific language governing permissions and limitations
# under the License.

from datafusion import RuntimeConfig, SessionConfig, SessionContext
from datafusion import RuntimeEnvBuilder, SessionConfig, SessionContext

# create a session context with default settings
ctx = SessionContext()
print(ctx)

# create a session context with explicit runtime and config settings
runtime = RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
runtime = RuntimeEnvBuilder().with_disk_manager_os().with_fair_spill_pool(10000000)
config = (
SessionConfig()
.with_create_default_catalog_and_schema(True)
4 changes: 2 additions & 2 deletions examples/ffi-table-provider/Cargo.toml
Original file line number Diff line number Diff line change
@@ -21,8 +21,8 @@ version = "0.1.0"
edition = "2021"

[dependencies]
datafusion = { version = "43.0.0" }
datafusion-ffi = { version = "43.0.0" }
datafusion = { version = "44.0.0" }
datafusion-ffi = { version = "44.0.0" }
pyo3 = { version = "0.22.6", features = ["extension-module", "abi3", "abi3-py38"] }
arrow = { version = "53.2.0" }
arrow-array = { version = "53.2.0" }
4 changes: 2 additions & 2 deletions python/datafusion/__init__.py
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@
from .context import (
SessionContext,
SessionConfig,
RuntimeConfig,
RuntimeEnvBuilder,
SQLOptions,
)

@@ -66,7 +66,7 @@
"SessionContext",
"SessionConfig",
"SQLOptions",
"RuntimeConfig",
"RuntimeEnvBuilder",
"Expr",
"ScalarUDF",
"WindowFrame",
55 changes: 33 additions & 22 deletions python/datafusion/context.py
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@
from __future__ import annotations

from ._internal import SessionConfig as SessionConfigInternal
from ._internal import RuntimeConfig as RuntimeConfigInternal
from ._internal import RuntimeEnvBuilder as RuntimeEnvBuilderInternal
from ._internal import SQLOptions as SQLOptionsInternal
from ._internal import SessionContext as SessionContextInternal

@@ -265,56 +265,58 @@ def set(self, key: str, value: str) -> SessionConfig:
return self


class RuntimeConfig:
class RuntimeEnvBuilder:
"""Runtime configuration options."""

def __init__(self) -> None:
"""Create a new :py:class:`RuntimeConfig` with default values."""
self.config_internal = RuntimeConfigInternal()
"""Create a new :py:class:`RuntimeEnvBuilder` with default values."""
self.config_internal = RuntimeEnvBuilderInternal()

def with_disk_manager_disabled(self) -> RuntimeConfig:
def with_disk_manager_disabled(self) -> RuntimeEnvBuilder:
"""Disable the disk manager; attempts to create temporary files will error.
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
This same :py:class:`RuntimeEnvBuilder` object (``self``) with the updated setting.
"""
self.config_internal = self.config_internal.with_disk_manager_disabled()
return self

def with_disk_manager_os(self) -> RuntimeConfig:
def with_disk_manager_os(self) -> RuntimeEnvBuilder:
"""Use the operating system's temporary directory for disk manager.
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
This same :py:class:`RuntimeEnvBuilder` object (``self``) with the updated setting.
"""
self.config_internal = self.config_internal.with_disk_manager_os()
return self

def with_disk_manager_specified(self, *paths: str | pathlib.Path) -> RuntimeConfig:
def with_disk_manager_specified(
self, *paths: str | pathlib.Path
) -> RuntimeEnvBuilder:
"""Use the specified paths for the disk manager's temporary files.
Args:
paths: Paths to use for the disk manager's temporary files.
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
This same :py:class:`RuntimeEnvBuilder` object (``self``) with the updated setting.
"""
paths_list = [str(p) for p in paths]
self.config_internal = self.config_internal.with_disk_manager_specified(
paths_list
)
return self

def with_unbounded_memory_pool(self) -> RuntimeConfig:
def with_unbounded_memory_pool(self) -> RuntimeEnvBuilder:
"""Use an unbounded memory pool.
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
This same :py:class:`RuntimeEnvBuilder` object (``self``) with the updated setting.
"""
self.config_internal = self.config_internal.with_unbounded_memory_pool()
return self

def with_fair_spill_pool(self, size: int) -> RuntimeConfig:
def with_fair_spill_pool(self, size: int) -> RuntimeEnvBuilder:
"""Use a fair spill pool with the specified size.
This pool works best when you know beforehand the query has multiple spillable
@@ -335,16 +337,16 @@ def with_fair_spill_pool(self, size: int) -> RuntimeConfig:
size: Size of the memory pool in bytes.
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
This same :py:class:`RuntimeEnvBuilder` object (``self``) with the updated setting.
Example usage::
config = RuntimeConfig().with_fair_spill_pool(1024)
config = RuntimeEnvBuilder().with_fair_spill_pool(1024)
"""
self.config_internal = self.config_internal.with_fair_spill_pool(size)
return self

def with_greedy_memory_pool(self, size: int) -> RuntimeConfig:
def with_greedy_memory_pool(self, size: int) -> RuntimeEnvBuilder:
"""Use a greedy memory pool with the specified size.
This pool works well for queries that do not need to spill or have a single
@@ -355,32 +357,39 @@ def with_greedy_memory_pool(self, size: int) -> RuntimeConfig:
size: Size of the memory pool in bytes.
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
This same :py:class:`RuntimeEnvBuilder` object (``self``) with the updated setting.
Example usage::
config = RuntimeConfig().with_greedy_memory_pool(1024)
config = RuntimeEnvBuilder().with_greedy_memory_pool(1024)
"""
self.config_internal = self.config_internal.with_greedy_memory_pool(size)
return self

def with_temp_file_path(self, path: str | pathlib.Path) -> RuntimeConfig:
def with_temp_file_path(self, path: str | pathlib.Path) -> RuntimeEnvBuilder:
"""Use the specified path to create any needed temporary files.
Args:
path: Path to use for temporary files.
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
This same :py:class:`RuntimeEnvBuilder` object (``self``) with the updated setting.
Example usage::
config = RuntimeConfig().with_temp_file_path("/tmp")
config = RuntimeEnvBuilder().with_temp_file_path("/tmp")
"""
self.config_internal = self.config_internal.with_temp_file_path(str(path))
return self


@deprecated("Use `RuntimeEnvBuilder` instead.")
class RuntimeConfig(RuntimeEnvBuilder):
"""See `RuntimeEnvBuilder`."""

pass


class SQLOptions:
"""Options to be used when performing SQL queries."""

@@ -454,7 +463,9 @@ class SessionContext:
"""

def __init__(
self, config: SessionConfig | None = None, runtime: RuntimeConfig | None = None
self,
config: SessionConfig | None = None,
runtime: RuntimeEnvBuilder | None = None,
) -> None:
"""Main interface for executing queries with DataFusion.
10 changes: 5 additions & 5 deletions python/tests/test_context.py
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@

from datafusion import (
DataFrame,
RuntimeConfig,
RuntimeEnvBuilder,
SessionConfig,
SessionContext,
SQLOptions,
@@ -43,7 +43,7 @@ def test_create_context_session_config_only():


def test_create_context_runtime_config_only():
SessionContext(runtime=RuntimeConfig())
SessionContext(runtime=RuntimeEnvBuilder())


@pytest.mark.parametrize("path_to_str", (True, False))
@@ -54,7 +54,7 @@ def test_runtime_configs(tmp_path, path_to_str):
path1 = str(path1) if path_to_str else path1
path2 = str(path2) if path_to_str else path2

runtime = RuntimeConfig().with_disk_manager_specified(path1, path2)
runtime = RuntimeEnvBuilder().with_disk_manager_specified(path1, path2)
config = SessionConfig().with_default_catalog_and_schema("foo", "bar")
ctx = SessionContext(config, runtime)
assert ctx is not None
@@ -67,7 +67,7 @@ def test_runtime_configs(tmp_path, path_to_str):
def test_temporary_files(tmp_path, path_to_str):
path = str(tmp_path) if path_to_str else tmp_path

runtime = RuntimeConfig().with_temp_file_path(path)
runtime = RuntimeEnvBuilder().with_temp_file_path(path)
config = SessionConfig().with_default_catalog_and_schema("foo", "bar")
ctx = SessionContext(config, runtime)
assert ctx is not None
@@ -77,7 +77,7 @@ def test_temporary_files(tmp_path, path_to_str):


def test_create_context_with_all_valid_args():
runtime = RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
runtime = RuntimeEnvBuilder().with_disk_manager_os().with_fair_spill_pool(10000000)
config = (
SessionConfig()
.with_create_default_catalog_and_schema(True)
18 changes: 12 additions & 6 deletions python/tests/test_functions.py
Original file line number Diff line number Diff line change
@@ -103,8 +103,11 @@ def test_lit_arith(df):
result = df.collect()
assert len(result) == 1
result = result[0]

assert result.column(0) == pa.array([5, 6, 7])
assert result.column(1) == pa.array(["Hello!", "World!", "!!"])
assert result.column(1) == pa.array(
["Hello!", "World!", "!!"], type=pa.string_view()
)


def test_math_functions():
@@ -661,9 +664,12 @@ def test_array_function_obj_tests(stmt, py_expr):
),
(
f.concat(column("a").cast(pa.string()), literal("?")),
pa.array(["Hello?", "World?", "!?"]),
pa.array(["Hello?", "World?", "!?"], type=pa.string_view()),
),
(
f.initcap(column("c")),
pa.array(["Hello ", " World ", " !"], type=pa.string_view()),
),
(f.initcap(column("c")), pa.array(["Hello ", " World ", " !"])),
(f.left(column("a"), literal(3)), pa.array(["Hel", "Wor", "!"])),
(f.length(column("c")), pa.array([6, 7, 2], type=pa.int32())),
(f.lower(column("a")), pa.array(["hello", "world", "!"])),
@@ -871,8 +877,8 @@ def test_temporal_functions(df):
result = df.collect()
assert len(result) == 1
result = result[0]
assert result.column(0) == pa.array([12, 6, 7], type=pa.float64())
assert result.column(1) == pa.array([2022, 2027, 2020], type=pa.float64())
assert result.column(0) == pa.array([12, 6, 7], type=pa.int32())
assert result.column(1) == pa.array([2022, 2027, 2020], type=pa.int32())
assert result.column(2) == pa.array(
[datetime(2022, 12, 1), datetime(2027, 6, 1), datetime(2020, 7, 1)],
type=pa.timestamp("us"),
@@ -904,7 +910,7 @@ def test_temporal_functions(df):
assert result.column(9) == pa.array(
[datetime(2023, 9, 7, 5, 6, 14, 523952)] * 3, type=pa.timestamp("us")
)
assert result.column(10) == pa.array([31, 26, 2], type=pa.float64())
assert result.column(10) == pa.array([31, 26, 2], type=pa.int32())


def test_arrow_cast(df):
64 changes: 32 additions & 32 deletions src/context.rs
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ use datafusion::execution::context::{
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, UnboundedMemoryPool};
use datafusion::execution::options::ReadOptions;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, DataFrame, NdJsonReadOptions, ParquetReadOptions,
@@ -165,62 +165,62 @@ impl PySessionConfig {
}

/// Runtime options for a SessionContext
#[pyclass(name = "RuntimeConfig", module = "datafusion", subclass)]
#[pyclass(name = "RuntimeEnvBuilder", module = "datafusion", subclass)]
#[derive(Clone)]
pub struct PyRuntimeConfig {
pub config: RuntimeConfig,
pub struct PyRuntimeEnvBuilder {
pub builder: RuntimeEnvBuilder,
}

#[pymethods]
impl PyRuntimeConfig {
impl PyRuntimeEnvBuilder {
#[new]
fn new() -> Self {
Self {
config: RuntimeConfig::default(),
builder: RuntimeEnvBuilder::default(),
}
}

fn with_disk_manager_disabled(&self) -> Self {
let config = self.config.clone();
let config = config.with_disk_manager(DiskManagerConfig::Disabled);
Self { config }
let mut builder = self.builder.clone();
builder = builder.with_disk_manager(DiskManagerConfig::Disabled);
Self { builder }
}

fn with_disk_manager_os(&self) -> Self {
let config = self.config.clone();
let config = config.with_disk_manager(DiskManagerConfig::NewOs);
Self { config }
let builder = self.builder.clone();
let builder = builder.with_disk_manager(DiskManagerConfig::NewOs);
Self { builder }
}

fn with_disk_manager_specified(&self, paths: Vec<String>) -> Self {
let config = self.config.clone();
let builder = self.builder.clone();
let paths = paths.iter().map(|s| s.into()).collect();
let config = config.with_disk_manager(DiskManagerConfig::NewSpecified(paths));
Self { config }
let builder = builder.with_disk_manager(DiskManagerConfig::NewSpecified(paths));
Self { builder }
}

fn with_unbounded_memory_pool(&self) -> Self {
let config = self.config.clone();
let config = config.with_memory_pool(Arc::new(UnboundedMemoryPool::default()));
Self { config }
let builder = self.builder.clone();
let builder = builder.with_memory_pool(Arc::new(UnboundedMemoryPool::default()));
Self { builder }
}

fn with_fair_spill_pool(&self, size: usize) -> Self {
let config = self.config.clone();
let config = config.with_memory_pool(Arc::new(FairSpillPool::new(size)));
Self { config }
let builder = self.builder.clone();
let builder = builder.with_memory_pool(Arc::new(FairSpillPool::new(size)));
Self { builder }
}

fn with_greedy_memory_pool(&self, size: usize) -> Self {
let config = self.config.clone();
let config = config.with_memory_pool(Arc::new(GreedyMemoryPool::new(size)));
Self { config }
let builder = self.builder.clone();
let builder = builder.with_memory_pool(Arc::new(GreedyMemoryPool::new(size)));
Self { builder }
}

fn with_temp_file_path(&self, path: &str) -> Self {
let config = self.config.clone();
let config = config.with_temp_file_path(path);
Self { config }
let builder = self.builder.clone();
let builder = builder.with_temp_file_path(path);
Self { builder }
}
}

@@ -276,19 +276,19 @@ impl PySessionContext {
#[new]
pub fn new(
config: Option<PySessionConfig>,
runtime: Option<PyRuntimeConfig>,
runtime: Option<PyRuntimeEnvBuilder>,
) -> PyResult<Self> {
let config = if let Some(c) = config {
c.config
} else {
SessionConfig::default().with_information_schema(true)
};
let runtime_config = if let Some(c) = runtime {
c.config
let runtime_env_builder = if let Some(c) = runtime {
c.builder
} else {
RuntimeConfig::default()
RuntimeEnvBuilder::default()
};
let runtime = Arc::new(RuntimeEnv::try_new(runtime_config)?);
let runtime = Arc::new(runtime_env_builder.build()?);
let session_state = SessionStateBuilder::new()
.with_config(config)
.with_runtime_env(runtime)
20 changes: 13 additions & 7 deletions src/dataset_exec.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
/// Implements a Datafusion physical ExecutionPlan that delegates to a PyArrow Dataset
/// This actually performs the projection, filtering and scanning of a Dataset
use pyo3::prelude::*;
@@ -34,11 +35,11 @@ use datafusion::error::{DataFusionError as InnerDataFusionError, Result as DFRes
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::Expr;
use datafusion::physical_expr::{EquivalenceProperties, PhysicalSortExpr};
use datafusion::physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
Partitioning, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
SendableRecordBatchStream, Statistics,
};

use crate::errors::DataFusionError;
@@ -136,7 +137,8 @@ impl DatasetExec {
let plan_properties = datafusion::physical_plan::PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Partitioning::UnknownPartitioning(fragments.len()),
ExecutionMode::Bounded,
EmissionType::Final,
Boundedness::Bounded,
);

Ok(DatasetExec {
@@ -251,12 +253,16 @@ impl ExecutionPlanProperties for DatasetExec {
self.plan_properties.output_partitioning()
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
fn output_ordering(&self) -> Option<&LexOrdering> {
None
}

fn execution_mode(&self) -> datafusion::physical_plan::ExecutionMode {
self.plan_properties.execution_mode
fn boundedness(&self) -> Boundedness {
self.plan_properties.boundedness
}

fn pipeline_behavior(&self) -> EmissionType {
self.plan_properties.emission_type
}

fn equivalence_properties(&self) -> &datafusion::physical_expr::EquivalenceProperties {
11 changes: 1 addition & 10 deletions src/functions.rs
Original file line number Diff line number Diff line change
@@ -36,10 +36,7 @@ use datafusion::functions_aggregate;
use datafusion::functions_window;
use datafusion::logical_expr::expr::Alias;
use datafusion::logical_expr::sqlparser::ast::NullTreatment as DFNullTreatment;
use datafusion::logical_expr::{
expr::{find_df_window_func, WindowFunction},
lit, Expr, WindowFunctionDefinition,
};
use datafusion::logical_expr::{expr::WindowFunction, lit, Expr, WindowFunctionDefinition};

fn add_builder_fns_to_aggregate(
agg_fn: Expr,
@@ -232,12 +229,6 @@ fn when(when: PyExpr, then: PyExpr) -> PyResult<PyCaseBuilder> {
///
/// NOTE: the legacy built-in lookup (`find_df_window_func`) was removed with the DataFusion 44 upgrade, so the search now starts with the UDAFs registered on the session context (when one is provided).
fn find_window_fn(name: &str, ctx: Option<PySessionContext>) -> PyResult<WindowFunctionDefinition> {
// search built in window functions (soon to be deprecated)
let df_window_func = find_df_window_func(name);
if let Some(df_window_func) = df_window_func {
return Ok(df_window_func);
}

if let Some(ctx) = ctx {
// search UDAFs
let udaf = ctx
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -78,7 +78,7 @@ fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<catalog::PyCatalog>()?;
m.add_class::<catalog::PyDatabase>()?;
m.add_class::<catalog::PyTable>()?;
m.add_class::<context::PyRuntimeConfig>()?;
m.add_class::<context::PyRuntimeEnvBuilder>()?;
m.add_class::<context::PySessionConfig>()?;
m.add_class::<context::PySessionContext>()?;
m.add_class::<context::PySQLOptions>()?;
2 changes: 0 additions & 2 deletions src/sql/logical.rs
Original file line number Diff line number Diff line change
@@ -85,12 +85,10 @@ impl PyLogicalPlan {
| LogicalPlan::Union(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Values(_)
| LogicalPlan::Prepare(_)
| LogicalPlan::Dml(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Execute(_)
| LogicalPlan::RecursiveQuery(_) => Err(py_unsupported_variant_err(format!(
"Conversion of variant not implemented: {:?}",
self.plan
6 changes: 4 additions & 2 deletions src/substrait.rs
Original file line number Diff line number Diff line change
@@ -114,7 +114,8 @@ impl PySubstraitProducer {
/// Convert DataFusion LogicalPlan to Substrait Plan
#[staticmethod]
pub fn to_substrait_plan(plan: PyLogicalPlan, ctx: &PySessionContext) -> PyResult<PyPlan> {
match producer::to_substrait_plan(&plan.plan, &ctx.ctx) {
let session_state = ctx.ctx.state();
match producer::to_substrait_plan(&plan.plan, &session_state) {
Ok(plan) => Ok(PyPlan { plan: *plan }),
Err(e) => Err(py_datafusion_err(e)),
}
@@ -134,7 +135,8 @@ impl PySubstraitConsumer {
plan: PyPlan,
py: Python,
) -> PyResult<PyLogicalPlan> {
let result = consumer::from_substrait_plan(&ctx.ctx, &plan.plan);
let session_state = ctx.ctx.state();
let result = consumer::from_substrait_plan(&session_state, &plan.plan);
let logical_plan = wait_for_future(py, result).map_err(DataFusionError::from)?;
Ok(PyLogicalPlan::new(logical_plan))
}
6 changes: 0 additions & 6 deletions src/udwf.rs
Original file line number Diff line number Diff line change
@@ -22,9 +22,7 @@ use std::sync::Arc;
use arrow::array::{make_array, Array, ArrayData, ArrayRef};
use datafusion::logical_expr::function::{PartitionEvaluatorArgs, WindowUDFFieldArgs};
use datafusion::logical_expr::window_state::WindowAggState;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::scalar::ScalarValue;
use datafusion_functions_window_common::expr::ExpressionArgs;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;

@@ -319,8 +317,4 @@ impl WindowUDFImpl for MultiColumnWindowUDF {
let _ = _partition_evaluator_args;
(self.partition_evaluator_factory)()
}

fn expressions(&self, expr_args: ExpressionArgs) -> Vec<Arc<dyn PhysicalExpr>> {
expr_args.input_exprs().into()
}
}