Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore/upgrade datafusion 44 #973

Merged
merged 12 commits into from
Jan 9, 2025
Merged
783 changes: 408 additions & 375 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ tokio = { version = "1.41", features = ["macros", "rt", "rt-multi-thread", "sync
pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"] }
pyo3-async-runtimes = { version = "0.22", features = ["tokio-runtime"]}
arrow = { version = "53", features = ["pyarrow"] }
datafusion = { version = "43.0.0", features = ["pyarrow", "avro", "unicode_expressions"] }
datafusion-substrait = { version = "43.0.0", optional = true }
datafusion-proto = { version = "43.0.0" }
datafusion-ffi = { version = "43.0.0" }
datafusion-functions-window-common = { version = "43.0.0" }
datafusion = { version = "44.0.0", features = ["pyarrow", "avro", "unicode_expressions"] }
datafusion-substrait = { version = "44.0.0", optional = true }
datafusion-proto = { version = "44.0.0" }
datafusion-ffi = { version = "44.0.0" }
prost = "0.13" # keep in line with `datafusion-substrait`
uuid = { version = "1.11", features = ["v4"] }
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ It is possible to configure runtime (memory and disk settings) and configuration

```python
runtime = (
RuntimeConfig()
RuntimeEnvBuilder()
.with_disk_manager_os()
.with_fair_spill_pool(10000000)
)
Expand Down
6 changes: 4 additions & 2 deletions benchmarks/db-benchmark/groupby-datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from datafusion import (
col,
functions as f,
RuntimeConfig,
RuntimeEnvBuilder,
SessionConfig,
SessionContext,
)
Expand Down Expand Up @@ -85,7 +85,9 @@ def execute(df):

# create a session context with explicit runtime and config settings
runtime = (
RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(64 * 1024 * 1024 * 1024)
RuntimeEnvBuilder()
.with_disk_manager_os()
.with_fair_spill_pool(64 * 1024 * 1024 * 1024)
)
config = (
SessionConfig()
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/tpch/tpch.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def bench(data_path, query_path):

# create context
# runtime = (
# RuntimeConfig()
# RuntimeEnvBuilder()
# .with_disk_manager_os()
# .with_fair_spill_pool(10000000)
# )
Expand Down
8 changes: 4 additions & 4 deletions docs/source/user-guide/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ Configuration
=============

Let's look at how we can configure DataFusion. When creating a :py:class:`~datafusion.context.SessionContext`, you can pass in
a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.context.RuntimeConfig` object. These two cover a wide range of options.
a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.context.RuntimeEnvBuilder` object. These two cover a wide range of options.

.. code-block:: python

from datafusion import RuntimeConfig, SessionConfig, SessionContext
from datafusion import RuntimeEnvBuilder, SessionConfig, SessionContext

# create a session context with default settings
ctx = SessionContext()
print(ctx)

# create a session context with explicit runtime and config settings
runtime = RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
runtime = RuntimeEnvBuilder().with_disk_manager_os().with_fair_spill_pool(10000000)
config = (
SessionConfig()
.with_create_default_catalog_and_schema(True)
Expand All @@ -48,4 +48,4 @@ a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.conte


You can read more about available :py:class:`~datafusion.context.SessionConfig` options in the `Rust DataFusion Configuration guide <https://arrow.apache.org/datafusion/user-guide/configs.html>`_,
and about :code:`RuntimeConfig` options in the rust `online API documentation <https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeConfig.html>`_.
and about :code:`RuntimeEnvBuilder` options in the Rust `online API documentation <https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeEnvBuilder.html>`_.
4 changes: 2 additions & 2 deletions examples/create-context.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
# specific language governing permissions and limitations
# under the License.

from datafusion import RuntimeConfig, SessionConfig, SessionContext
from datafusion import RuntimeEnvBuilder, SessionConfig, SessionContext

# create a session context with default settings
ctx = SessionContext()
print(ctx)

# create a session context with explicit runtime and config settings
runtime = RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
runtime = RuntimeEnvBuilder().with_disk_manager_os().with_fair_spill_pool(10000000)
config = (
SessionConfig()
.with_create_default_catalog_and_schema(True)
Expand Down
4 changes: 2 additions & 2 deletions examples/ffi-table-provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ version = "0.1.0"
edition = "2021"

[dependencies]
datafusion = { version = "43.0.0" }
datafusion-ffi = { version = "43.0.0" }
datafusion = { version = "44.0.0" }
datafusion-ffi = { version = "44.0.0" }
pyo3 = { version = "0.22.6", features = ["extension-module", "abi3", "abi3-py38"] }
arrow = { version = "53.2.0" }
arrow-array = { version = "53.2.0" }
Expand Down
4 changes: 2 additions & 2 deletions python/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from .context import (
SessionContext,
SessionConfig,
RuntimeConfig,
RuntimeEnvBuilder,
SQLOptions,
)

Expand Down Expand Up @@ -66,7 +66,7 @@
"SessionContext",
"SessionConfig",
"SQLOptions",
"RuntimeConfig",
"RuntimeEnvBuilder",
"Expr",
"ScalarUDF",
"WindowFrame",
Expand Down
55 changes: 33 additions & 22 deletions python/datafusion/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from __future__ import annotations

from ._internal import SessionConfig as SessionConfigInternal
from ._internal import RuntimeConfig as RuntimeConfigInternal
from ._internal import RuntimeEnvBuilder as RuntimeEnvBuilderInternal
from ._internal import SQLOptions as SQLOptionsInternal
from ._internal import SessionContext as SessionContextInternal

Expand Down Expand Up @@ -265,56 +265,58 @@ def set(self, key: str, value: str) -> SessionConfig:
return self


class RuntimeConfig:
class RuntimeEnvBuilder:
"""Runtime configuration options."""

def __init__(self) -> None:
"""Create a new :py:class:`RuntimeConfig` with default values."""
self.config_internal = RuntimeConfigInternal()
"""Create a new :py:class:`RuntimeEnvBuilder` with default values."""
self.config_internal = RuntimeEnvBuilderInternal()

def with_disk_manager_disabled(self) -> RuntimeConfig:
def with_disk_manager_disabled(self) -> RuntimeEnvBuilder:
"""Disable the disk manager, attempts to create temporary files will error.

Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.
"""
self.config_internal = self.config_internal.with_disk_manager_disabled()
return self

def with_disk_manager_os(self) -> RuntimeConfig:
def with_disk_manager_os(self) -> RuntimeEnvBuilder:
"""Use the operating system's temporary directory for disk manager.

Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.
"""
self.config_internal = self.config_internal.with_disk_manager_os()
return self

def with_disk_manager_specified(self, *paths: str | pathlib.Path) -> RuntimeConfig:
def with_disk_manager_specified(
self, *paths: str | pathlib.Path
) -> RuntimeEnvBuilder:
"""Use the specified paths for the disk manager's temporary files.

Args:
paths: Paths to use for the disk manager's temporary files.

Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.
"""
paths_list = [str(p) for p in paths]
self.config_internal = self.config_internal.with_disk_manager_specified(
paths_list
)
return self

def with_unbounded_memory_pool(self) -> RuntimeConfig:
def with_unbounded_memory_pool(self) -> RuntimeEnvBuilder:
"""Use an unbounded memory pool.

Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.
"""
self.config_internal = self.config_internal.with_unbounded_memory_pool()
return self

def with_fair_spill_pool(self, size: int) -> RuntimeConfig:
def with_fair_spill_pool(self, size: int) -> RuntimeEnvBuilder:
"""Use a fair spill pool with the specified size.

This pool works best when you know beforehand the query has multiple spillable
Expand All @@ -335,16 +337,16 @@ def with_fair_spill_pool(self, size: int) -> RuntimeConfig:
size: Size of the memory pool in bytes.

Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.

Example usage::

config = RuntimeConfig().with_fair_spill_pool(1024)
config = RuntimeEnvBuilder().with_fair_spill_pool(1024)
"""
self.config_internal = self.config_internal.with_fair_spill_pool(size)
return self

def with_greedy_memory_pool(self, size: int) -> RuntimeConfig:
def with_greedy_memory_pool(self, size: int) -> RuntimeEnvBuilder:
"""Use a greedy memory pool with the specified size.

This pool works well for queries that do not need to spill or have a single
Expand All @@ -355,32 +357,39 @@ def with_greedy_memory_pool(self, size: int) -> RuntimeConfig:
size: Size of the memory pool in bytes.

Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.

Example usage::

config = RuntimeConfig().with_greedy_memory_pool(1024)
config = RuntimeEnvBuilder().with_greedy_memory_pool(1024)
"""
self.config_internal = self.config_internal.with_greedy_memory_pool(size)
return self

def with_temp_file_path(self, path: str | pathlib.Path) -> RuntimeConfig:
def with_temp_file_path(self, path: str | pathlib.Path) -> RuntimeEnvBuilder:
"""Use the specified path to create any needed temporary files.

Args:
path: Path to use for temporary files.

Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
A new :py:class:`RuntimeEnvBuilder` object with the updated setting.

Example usage::

config = RuntimeConfig().with_temp_file_path("/tmp")
config = RuntimeEnvBuilder().with_temp_file_path("/tmp")
"""
self.config_internal = self.config_internal.with_temp_file_path(str(path))
return self


@deprecated("Use `RuntimeEnvBuilder` instead.")
class RuntimeConfig(RuntimeEnvBuilder):
"""See `RuntimeEnvBuilder`."""

pass


class SQLOptions:
"""Options to be used when performing SQL queries."""

Expand Down Expand Up @@ -454,7 +463,9 @@ class SessionContext:
"""

def __init__(
self, config: SessionConfig | None = None, runtime: RuntimeConfig | None = None
self,
config: SessionConfig | None = None,
runtime: RuntimeEnvBuilder | None = None,
) -> None:
"""Main interface for executing queries with DataFusion.

Expand Down
10 changes: 5 additions & 5 deletions python/tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from datafusion import (
DataFrame,
RuntimeConfig,
RuntimeEnvBuilder,
SessionConfig,
SessionContext,
SQLOptions,
Expand All @@ -43,7 +43,7 @@ def test_create_context_session_config_only():


def test_create_context_runtime_config_only():
SessionContext(runtime=RuntimeConfig())
SessionContext(runtime=RuntimeEnvBuilder())


@pytest.mark.parametrize("path_to_str", (True, False))
Expand All @@ -54,7 +54,7 @@ def test_runtime_configs(tmp_path, path_to_str):
path1 = str(path1) if path_to_str else path1
path2 = str(path2) if path_to_str else path2

runtime = RuntimeConfig().with_disk_manager_specified(path1, path2)
runtime = RuntimeEnvBuilder().with_disk_manager_specified(path1, path2)
config = SessionConfig().with_default_catalog_and_schema("foo", "bar")
ctx = SessionContext(config, runtime)
assert ctx is not None
Expand All @@ -67,7 +67,7 @@ def test_runtime_configs(tmp_path, path_to_str):
def test_temporary_files(tmp_path, path_to_str):
path = str(tmp_path) if path_to_str else tmp_path

runtime = RuntimeConfig().with_temp_file_path(path)
runtime = RuntimeEnvBuilder().with_temp_file_path(path)
config = SessionConfig().with_default_catalog_and_schema("foo", "bar")
ctx = SessionContext(config, runtime)
assert ctx is not None
Expand All @@ -77,7 +77,7 @@ def test_temporary_files(tmp_path, path_to_str):


def test_create_context_with_all_valid_args():
runtime = RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
runtime = RuntimeEnvBuilder().with_disk_manager_os().with_fair_spill_pool(10000000)
config = (
SessionConfig()
.with_create_default_catalog_and_schema(True)
Expand Down
18 changes: 12 additions & 6 deletions python/tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,11 @@ def test_lit_arith(df):
result = df.collect()
assert len(result) == 1
result = result[0]

assert result.column(0) == pa.array([5, 6, 7])
assert result.column(1) == pa.array(["Hello!", "World!", "!!"])
assert result.column(1) == pa.array(
["Hello!", "World!", "!!"], type=pa.string_view()
)


def test_math_functions():
Expand Down Expand Up @@ -661,9 +664,12 @@ def test_array_function_obj_tests(stmt, py_expr):
),
(
f.concat(column("a").cast(pa.string()), literal("?")),
pa.array(["Hello?", "World?", "!?"]),
pa.array(["Hello?", "World?", "!?"], type=pa.string_view()),
),
(
f.initcap(column("c")),
pa.array(["Hello ", " World ", " !"], type=pa.string_view()),
),
(f.initcap(column("c")), pa.array(["Hello ", " World ", " !"])),
(f.left(column("a"), literal(3)), pa.array(["Hel", "Wor", "!"])),
(f.length(column("c")), pa.array([6, 7, 2], type=pa.int32())),
(f.lower(column("a")), pa.array(["hello", "world", "!"])),
Expand Down Expand Up @@ -871,8 +877,8 @@ def test_temporal_functions(df):
result = df.collect()
assert len(result) == 1
result = result[0]
assert result.column(0) == pa.array([12, 6, 7], type=pa.float64())
assert result.column(1) == pa.array([2022, 2027, 2020], type=pa.float64())
assert result.column(0) == pa.array([12, 6, 7], type=pa.int32())
assert result.column(1) == pa.array([2022, 2027, 2020], type=pa.int32())
assert result.column(2) == pa.array(
[datetime(2022, 12, 1), datetime(2027, 6, 1), datetime(2020, 7, 1)],
type=pa.timestamp("us"),
Expand Down Expand Up @@ -904,7 +910,7 @@ def test_temporal_functions(df):
assert result.column(9) == pa.array(
[datetime(2023, 9, 7, 5, 6, 14, 523952)] * 3, type=pa.timestamp("us")
)
assert result.column(10) == pa.array([31, 26, 2], type=pa.float64())
assert result.column(10) == pa.array([31, 26, 2], type=pa.int32())


def test_arrow_cast(df):
Expand Down
Loading
Loading