# chore: move pyspark tests into main test suite #1761
## `pyproject.toml`

```diff
@@ -164,11 +164,16 @@ filterwarnings = [
     'ignore:.*Passing a BlockManager to DataFrame:DeprecationWarning',
     # This warning was temporarily raised by Polars but then reverted.
     'ignore:.*The default coalesce behavior of left join will change:DeprecationWarning',
+    'ignore: unclosed <socket.socket',
+    'ignore:.*The distutils package is deprecated and slated for removal in Python 3.12:DeprecationWarning:pyspark',
+    'ignore:.*distutils Version classes are deprecated. Use packaging.version instead.*:DeprecationWarning:pyspark',
```
> **Comment on lines +168 to +169:** @MarcoGorelli I moved these back to pyproject.toml, yet targeting only the `pyspark` module.
>
> **Reply:** TIL, nice!
```diff
 ]
 xfail_strict = true
 markers = ["slow: marks tests as slow (deselect with '-m \"not slow\"')"]
 env = [
-    "MODIN_ENGINE=python"
+    "MODIN_ENGINE=python",
+    "PYARROW_IGNORE_TIMEZONE=1"
 ]

 [tool.coverage.run]
```
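The `:pyspark` suffix on the new entries is what scopes them: pytest `filterwarnings` strings follow the standard `action:message:category:module` filter spec. As a rough illustration (not part of the diff), the second new entry is equivalent to this Python call:

```python
import warnings

# Ignore matching DeprecationWarnings, but only when they are raised
# from modules whose name starts with "pyspark".
warnings.filterwarnings(
    "ignore",
    message=r".*distutils Version classes are deprecated. Use packaging.version instead.*",
    category=DeprecationWarning,
    module="pyspark",
)
```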
## `conftest.py`

```diff
@@ -1,10 +1,10 @@
 from __future__ import annotations

 import os
+import sys
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import Callable
-from typing import Generator
 from typing import Sequence

 import pandas as pd
@@ -14,7 +14,6 @@

 if TYPE_CHECKING:
     import duckdb
-    from pyspark.sql import SparkSession

 from narwhals.typing import IntoDataFrame
 from narwhals.typing import IntoFrame
```
```diff
@@ -129,23 +128,23 @@ def pyarrow_table_constructor(obj: Any) -> IntoDataFrame:
     return pa.table(obj)  # type: ignore[no-any-return]


-@pytest.fixture(scope="session")
-def spark_session() -> Generator[SparkSession, None, None]:  # pragma: no cover
+def pyspark_lazy_constructor() -> Callable[[Any], IntoFrame]:  # pragma: no cover
     try:
         from pyspark.sql import SparkSession
     except ImportError:  # pragma: no cover
         pytest.skip("pyspark is not installed")
-        return
+        return None

     import warnings
+    from atexit import register

     os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
     with warnings.catch_warnings():
         # The spark session seems to trigger a polars warning.
         # Polars is imported in the tests, but not used in the spark operations
         warnings.filterwarnings(
             "ignore", r"Using fork\(\) can cause Polars", category=RuntimeWarning
         )

         session = (
             SparkSession.builder.appName("unit-tests")
             .master("local[1]")
@@ -155,8 +154,26 @@
             .config("spark.sql.shuffle.partitions", "2")
             .getOrCreate()
         )
-    yield session
-    session.stop()
+
+    register(session.stop)
```
> **Comment** (on `register(session.stop)`): TIL
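For illustration, a minimal sketch of the `atexit` pattern the diff switches to: since the constructor is now a plain callable rather than a yielding fixture, teardown is registered to run at interpreter exit instead. The names below are hypothetical stand-ins.

```python
from atexit import register


def stop_session() -> None:
    # Stand-in for session.stop: atexit runs this once when Python exits.
    print("session stopped")


# Register the cleanup callback; it fires even if tests end early.
register(stop_session)
```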
```diff
+
+    def _constructor(obj: Any) -> IntoFrame:
+        with warnings.catch_warnings():
+            warnings.filterwarnings(
+                "ignore",
+                r".*is_datetime64tz_dtype is deprecated and will be removed in a future version.*",
+                module="pyspark",
+                category=DeprecationWarning,
+            )
+            pd_df = pd.DataFrame(obj).replace({float("nan"): None}).reset_index()
```
> **Comment:** If the objects that come into these constructors are (always?) dictionaries, I think we can skip the trip through pandas and construct from a built-in Python object that Spark knows how to ingest directly (a list of dictionaries). Could be overly cautious, but Spark may infer data types differently if it is handed a pandas DataFrame rather than lists of Python objects. Since pyspark supports a list of records, we could convert dict → list of dicts like so:
>
> ```python
> if isinstance(obj, dict):
>     obj = [{k: v for k, v in zip(obj, row)} for row in zip(*obj.values())]
> ```
>
> Or we could pass in the rows & schema separately:
>
> ```python
> if isinstance(obj, dict):
>     df = ...createDataFrame([*zip(*obj.values())], schema=[*obj.keys()])
> ```
>
> **Reply:** I remember having issues with some tests where we may need to specify the schema with column types (but I don't remember exactly what the problem was). But if we can skip pandas here, it would be great!
>
> **Reply:** I had the same thought when migrating the codebase, yet I can confirm the data type being an issue for a subset of the tests. I would say to keep it like this for now and eventually address it.
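For reference, a runnable sketch of the records-based construction proposed in the thread, assuming the input is a dict of equal-length columns; the sample data and session setup here are illustrative only:

```python
from pyspark.sql import SparkSession

session = SparkSession.builder.appName("sketch").master("local[1]").getOrCreate()

obj = {"a": [1, 2, 3], "b": ["x", "y", "z"]}

# Transpose the columnar dict into row tuples and pass the column names as
# the schema, skipping the pandas round-trip entirely.
rows = [*zip(*obj.values())]  # [(1, 'x'), (2, 'y'), (3, 'z')]
df = session.createDataFrame(rows, schema=[*obj.keys()])
df.show()
```

As the replies note, some tests may still need explicit column types (e.g. a full `StructType` schema) rather than just names, which is why the pandas path was kept for now.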
```diff
+        return (  # type: ignore[no-any-return]
+            session.createDataFrame(pd_df)
+            .repartition(2)
+            .orderBy("index")
+            .drop("index")
+        )
+
+    return _constructor


 EAGER_CONSTRUCTORS: dict[str, Callable[[Any], IntoDataFrame]] = {
@@ -173,6 +190,7 @@
     "dask": dask_lazy_p2_constructor,
     "polars[lazy]": polars_lazy_constructor,
     "duckdb": duckdb_lazy_constructor,
+    "pyspark": pyspark_lazy_constructor,  # type: ignore[dict-item]
 }
 GPU_CONSTRUCTORS: dict[str, Callable[[Any], IntoFrame]] = {"cudf": cudf_constructor}
```
```diff
@@ -201,7 +219,13 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
             constructors.append(EAGER_CONSTRUCTORS[constructor])
             constructors_ids.append(constructor)
         elif constructor in LAZY_CONSTRUCTORS:
-            constructors.append(LAZY_CONSTRUCTORS[constructor])
+            if constructor == "pyspark":
+                if sys.version_info < (3, 12):  # pragma: no cover
+                    constructors.append(pyspark_lazy_constructor())
+                else:  # pragma: no cover
+                    continue
+            else:
+                constructors.append(LAZY_CONSTRUCTORS[constructor])
             constructors_ids.append(constructor)
         else:  # pragma: no cover
             msg = f"Expected one of {EAGER_CONSTRUCTORS.keys()} or {LAZY_CONSTRUCTORS.keys()}, got {constructor}"
```
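The version gate above presumably reflects pyspark's lack of Python 3.12 support (consistent with the distutils filters added to pyproject.toml). As a simplified, self-contained sketch of the parametrization mechanism (with stand-in constructors, not the real ones):

```python
import sys

import pytest


def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
    # Parametrize every test that requests a `constructor` argument.
    if "constructor" in metafunc.fixturenames:
        constructors = [list, tuple]  # stand-ins for real frame constructors
        if sys.version_info < (3, 12):
            constructors.append(dict)  # e.g. pyspark, only below Python 3.12
        metafunc.parametrize("constructor", constructors)
```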
> **Comment:** Needed to implement `Expr.__eq__` to get this to work. It overlaps with @EdAbati's PR.
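For context, a minimal sketch of what overriding `__eq__` on an expression type involves, so that `==` builds a lazy comparison expression rather than returning a plain bool. The class and internals below are hypothetical, not the actual narwhals implementation:

```python
from __future__ import annotations

from typing import Any, Callable


class Expr:
    """Toy lazy expression: wraps a function mapping a frame to a column."""

    def __init__(self, call: Callable[[Any], Any]) -> None:
        self._call = call

    def __eq__(self, other: Any) -> Expr:  # type: ignore[override]
        # Instead of the default identity comparison, return a new expression
        # that compares the evaluated column against `other` at execution time.
        return Expr(lambda df: self._call(df) == other)

    # Overriding __eq__ disables hashing by default; restore it explicitly.
    __hash__ = object.__hash__


def col(name: str) -> Expr:
    return Expr(lambda df: df[name])


# `col("a") == 1` now yields an Expr that a backend can evaluate lazily.
expr = col("a") == 1
print(expr._call({"a": 1}))  # True
```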