[SPARK-54182][SQL][PYTHON] Optimize non-arrow conversion of df.toPandas (#52897)
Changes from all commits: cd50afd, 26ad2cc, dd32ab3, 55c2634, dc43392, 5d7a578, 6a7b7a3
```diff
@@ -18,6 +18,7 @@
 from typing import (
     Any,
     Callable,
+    Iterator,
     List,
     Optional,
     Union,
@@ -208,18 +209,20 @@ def toPandas(self) -> "PandasDataFrameLike":
         # Below is toPandas without Arrow optimization.
         rows = self.collect()
-        if len(rows) > 0:
-            pdf = pd.DataFrame.from_records(
-                rows, index=range(len(rows)), columns=self.columns  # type: ignore[arg-type]
-            )
-        else:
-            pdf = pd.DataFrame(columns=self.columns)
-
-        if len(pdf.columns) > 0:
+        if len(self.columns) > 0:
             timezone = sessionLocalTimeZone
             struct_in_pandas = pandasStructHandlingMode

-            return pd.concat(
+            # Extract columns from rows and apply converters
+            if len(rows) > 0:
+                # Use iterator to avoid materializing intermediate data structure
+                columns_data: Iterator[Any] = iter(zip(*rows))
+            else:
+                columns_data = iter([] for _ in self.schema.fields)
+
+            # Build DataFrame from columns
+            pdf = pd.concat(
                 [
                     _create_converter_to_pandas(
                         field.dataType,
@@ -230,13 +233,15 @@ def toPandas(self) -> "PandasDataFrameLike":
                         ),
                         error_on_duplicated_field_names=False,
                         timestamp_utc_localized=False,
-                    )(pser)
-                    for (_, pser), field in zip(pdf.items(), self.schema.fields)
+                    )(pd.Series(col_data, dtype=object))
+                    for col_data, field in zip(columns_data, self.schema.fields)
```
Reviewer (Contributor): can we avoid creating …

Author (Contributor): Good point, although I think it is a list of references, so the memory difference won't be too large. I changed it to an iterator.
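As background for this thread, here is a standalone sketch of what the column extraction does; `rows` is invented sample data standing in for the collected `Row` objects:

```python
# Made-up sample data standing in for the collected rows.
rows = [(1, "a"), (2, "b"), (3, "c")]

# zip(*rows) transposes row-major data into column tuples; each tuple
# holds references to the original values, and wrapping it in iter()
# lets the consumer pull one column at a time.
columns_data = iter(zip(*rows))

print(next(columns_data))  # (1, 2, 3)
print(next(columns_data))  # ('a', 'b', 'c')
```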
```diff
                 ],
-                axis="columns",
+                axis=1,
                 keys=self.columns,
             )
-        else:
             return pdf
+        else:
+            return pd.DataFrame(columns=[], index=range(len(rows)))

     def toArrow(self) -> "pa.Table":
         from pyspark.sql.dataframe import DataFrame
```
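Outside of Spark, the new code path can be sketched in plain pandas. This is an illustration of the shape of the change, not the actual implementation: the per-field converter step is elided and the sample rows and column names are made up.

```python
import pandas as pd

# Made-up data standing in for the collected rows and the schema's columns.
rows = [(1, "x"), (2, "y")]
columns = ["id", "label"]

if len(rows) > 0:
    # Transpose rows into per-column tuples lazily.
    columns_data = iter(zip(*rows))
else:
    # One empty column per field so the shapes still line up.
    columns_data = iter([] for _ in columns)

# Assemble the DataFrame column by column; dtype=object skips pandas'
# per-column type inference (the real code then applies per-field
# converters to each series before concatenating).
pdf = pd.concat(
    [pd.Series(col, dtype=object) for col in columns_data],
    axis=1,
    keys=columns,
)

print(pdf)
```

This avoids the old path's intermediate `pd.DataFrame.from_records` construction: columns are built directly from the transposed rows instead of being re-extracted from a row-major DataFrame with `pdf.items()`.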
Reviewer: why is `dtype=object` necessary?

Author: Here it is building a series to pass to `_create_converter_to_pandas`, which will convert the series to the declared type in `field.dataType`, so the dtype here is optional for correctness. But if we do not supply `object`, pandas will start to infer the type when creating the series, which is quite slow.

Author: I also tried using `field.dataType` explicitly, but it would need some conversion to a pandas data type, which is exactly the purpose of `_create_converter_to_pandas`. So I suggest we keep using `object` to disable type inference.
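The inference cost discussed above can be seen in a small standalone example (sample data invented for illustration): without a dtype, pandas scans every element to pick one; with `dtype=object` it stores the Python objects as-is and leaves type conversion to the later converter step.

```python
import pandas as pd

data = [1, 2, 3]

inferred = pd.Series(data)                # pandas inspects values, picks int64
deferred = pd.Series(data, dtype=object)  # no inference; stores references as-is

print(inferred.dtype)  # int64
print(deferred.dtype)  # object
```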