From 928b05c3276dc6aada5d458a92a7281a0d71da3a Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sun, 2 Nov 2025 22:42:18 -0500 Subject: [PATCH 1/9] Add profiling support to iterator based udfs --- python/pyspark/sql/tests/test_udf_profiler.py | 51 +++++++++++++------ python/pyspark/worker.py | 50 +++++++++++++++--- .../python/UserDefinedPythonDataSource.scala | 3 +- .../python/MapInBatchEvaluatorFactory.scala | 5 +- .../sql/execution/python/MapInBatchExec.scala | 3 +- 5 files changed, 85 insertions(+), 27 deletions(-) diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py index 4e8f722c22cbd..3730bec5e9f5b 100644 --- a/python/pyspark/sql/tests/test_udf_profiler.py +++ b/python/pyspark/sql/tests/test_udf_profiler.py @@ -325,12 +325,13 @@ def add2(x): not have_pandas or not have_pyarrow, cast(str, pandas_requirement_message or pyarrow_requirement_message), ) - def test_perf_profiler_pandas_udf_iterator_not_supported(self): + def test_perf_profiler_pandas_udf_iterator(self): import pandas as pd @pandas_udf("long") - def add1(x): - return x + 1 + def add1(iter: Iterator[pd.Series]) -> Iterator[pd.Series]: + for s in iter: + yield s + 1 @pandas_udf("long") def add2(iter: Iterator[pd.Series]) -> Iterator[pd.Series]: @@ -339,22 +340,23 @@ def add2(iter: Iterator[pd.Series]) -> Iterator[pd.Series]: with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): df = self.spark.range(10, numPartitions=2).select( - add1("id"), add2("id"), add1("id"), add2(col("id") + 1) + add1("id"), add2("id"), add1("id") ) df.collect() - self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys())) + self.assertEqual(2, len(self.profile_results), str(self.profile_results.keys())) for id in self.profile_results: - self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2) + self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=4) @unittest.skipIf(not have_pyarrow, pyarrow_requirement_message) - def test_perf_profiler_arrow_udf_iterator_not_supported(self): + def test_perf_profiler_arrow_udf_iterator(self): import pyarrow as pa @arrow_udf("long") - def add1(x): - return pa.compute.add(x, 1) + def add1(iter: Iterator[pa.Array]) -> Iterator[pa.Array]: + for s in iter: + yield pa.compute.add(s, 1) @arrow_udf("long") def add2(iter: Iterator[pa.Array]) -> Iterator[pa.Array]: @@ -363,21 +365,21 @@ def add2(iter: Iterator[pa.Array]) -> Iterator[pa.Array]: with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): df = self.spark.range(10, numPartitions=2).select( - add1("id"), add2("id"), add1("id"), add2(col("id") + 1) + add1("id"), add2("id"), add1("id") ) df.collect() - self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys())) + self.assertEqual(2, len(self.profile_results), str(self.profile_results.keys())) for id in self.profile_results: - self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2) + self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=4) @unittest.skipIf( not have_pandas or not have_pyarrow, cast(str, pandas_requirement_message or pyarrow_requirement_message), ) - def test_perf_profiler_map_in_pandas_not_supported(self): - df = self.spark.createDataFrame([(1, 21), (2, 30)], ("id", "age")) + def test_perf_profiler_map_in_pandas(self): + df = self.spark.createDataFrame([(1, 21), (2, 30)], ("id", "age")).repartition(1) def filter_func(iterator): for pdf in iterator: @@ -386,7 +388,26 @@ def filter_func(iterator): with 
self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): df.mapInPandas(filter_func, df.schema).show() - self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys())) + self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys())) + + for id in self.profile_results: + self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2) + + @unittest.skipIf(not have_pyarrow, pyarrow_requirement_message) + def test_perf_profiler_map_in_arrow(self): + import pyarrow as pa + + df = self.spark.createDataFrame([(1, 21), (2, 30)], ("id", "age")).repartition(1) + + def map_func(iterator: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]: + for batch in iterator: + yield pa.RecordBatch.from_arrays([batch.column("id"), pa.compute.add(batch.column("age"), 1)], ["id", "age"]) + + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): + df.mapInArrow(map_func, df.schema).show() + + for id in self.profile_results: + self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2) @unittest.skipIf( not have_pandas or not have_pyarrow, diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 09c6a40a33db9..2eac797f65a44 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -1158,13 +1158,15 @@ def func(*args): return f, args_offsets -def _supports_profiler(eval_type: int) -> bool: - return eval_type not in ( +def _is_iter_based(eval_type: int) -> bool: + return eval_type in ( PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF, PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF, PythonEvalType.SQL_MAP_PANDAS_ITER_UDF, PythonEvalType.SQL_MAP_ARROW_ITER_UDF, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE, + PythonEvalType.SQL_GROUPED_MAP_ARROW_ITER_UDF, + PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF, ) @@ -1192,6 +1194,36 @@ def profiling_func(*args, **kwargs): return profiling_func +def wrap_iter_perf_profiler(f, result_id): + import cProfile + import pstats + + from pyspark.sql.profiler import ProfileResultsParam + + accumulator = _deserialize_accumulator( + SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam + ) + + def profiling_func(*args, **kwargs): + iterator = iter(f(*args, **kwargs)) + pr = cProfile.Profile() + while True: + try: + with pr: + item = next(iterator) + yield item + except StopIteration: + break + + st = pstats.Stats(pr) + st.stream = None # make it picklable + st.strip_dirs() + + accumulator.add({result_id: (st, None)}) + + return profiling_func + + def wrap_memory_profiler(f, result_id): from pyspark.sql.profiler import ProfileResultsParam from pyspark.profiler import UDFLineProfilerV2 @@ -1254,17 +1286,19 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index, profil if profiler == "perf": result_id = read_long(infile) - if _supports_profiler(eval_type): - profiling_func = wrap_perf_profiler(chained_func, result_id) + if _is_iter_based(eval_type): + profiling_func = wrap_iter_perf_profiler(chained_func, result_id) else: - profiling_func = chained_func + profiling_func = wrap_perf_profiler(chained_func, result_id) elif profiler == "memory": result_id = read_long(infile) - if _supports_profiler(eval_type) and has_memory_profiler: - profiling_func = wrap_memory_profiler(chained_func, result_id) - else: + if not has_memory_profiler: profiling_func = chained_func + elif _is_iter_based(eval_type): + profiling_func = chained_func + else: + profiling_func = wrap_memory_profiler(chained_func, result_id) else: profiling_func = chained_func diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/UserDefinedPythonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/UserDefinedPythonDataSource.scala index c147030037cd8..57366b6f165bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/UserDefinedPythonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/UserDefinedPythonDataSource.scala @@ -172,7 +172,8 @@ case class UserDefinedPythonDataSource(dataSourceCls: PythonFunction) { pythonRunnerConf, metrics, jobArtifactUUID, - sessionUUID) + sessionUUID, + None) } def createPythonMetrics(): Array[CustomMetric] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala index 4e78b3035a7ec..51909df26a567 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala @@ -41,7 +41,8 @@ class MapInBatchEvaluatorFactory( pythonRunnerConf: Map[String, String], val pythonMetrics: Map[String, SQLMetric], jobArtifactUUID: Option[String], - sessionUUID: Option[String]) + sessionUUID: Option[String], + profiler: Option[String]) extends PartitionEvaluatorFactory[InternalRow, InternalRow] { override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] = @@ -74,7 +75,7 @@ class MapInBatchEvaluatorFactory( pythonMetrics, jobArtifactUUID, sessionUUID, - None) with BatchedPythonArrowInput + profiler) with BatchedPythonArrowInput val columnarBatchIter = pyRunner.compute(batchIter, context.partitionId(), context) val unsafeProj = UnsafeProjection.create(output, output) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala index 1d03c0cf76037..c4f090674e7c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala @@ -70,7 +70,8 @@ trait MapInBatchExec extends UnaryExecNode with PythonSQLMetrics { pythonRunnerConf, pythonMetrics, jobArtifactUUID, - sessionUUID) + sessionUUID, + conf.pythonUDFProfiler) val rdd = if (isBarrier) { val rddBarrier = child.execute().barrier() From 96fd0d5ba4bd568f65af8f704245475063501c23 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Mon, 3 Nov 2025 07:27:38 -0500 Subject: [PATCH 2/9] Support memory and format --- python/pyspark/sql/tests/test_udf_profiler.py | 12 ++++---- python/pyspark/worker.py | 30 +++++++++++++++++++ 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py index 3730bec5e9f5b..e4a0785b8a5da 100644 --- a/python/pyspark/sql/tests/test_udf_profiler.py +++ b/python/pyspark/sql/tests/test_udf_profiler.py @@ -339,9 +339,7 @@ def add2(iter: Iterator[pd.Series]) -> Iterator[pd.Series]: yield s + 2 with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): - df = self.spark.range(10, numPartitions=2).select( - add1("id"), add2("id"), add1("id") - ) + df = self.spark.range(10, numPartitions=2).select(add1("id"), add2("id"), add1("id")) df.collect() self.assertEqual(2, len(self.profile_results), 
str(self.profile_results.keys())) @@ -364,9 +362,7 @@ def add2(iter: Iterator[pa.Array]) -> Iterator[pa.Array]: yield pa.compute.add(s, 2) with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): - df = self.spark.range(10, numPartitions=2).select( - add1("id"), add2("id"), add1("id") - ) + df = self.spark.range(10, numPartitions=2).select(add1("id"), add2("id"), add1("id")) df.collect() self.assertEqual(2, len(self.profile_results), str(self.profile_results.keys())) @@ -401,7 +397,9 @@ def test_perf_profiler_map_in_arrow(self): def map_func(iterator: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]: for batch in iterator: - yield pa.RecordBatch.from_arrays([batch.column("id"), pa.compute.add(batch.column("age"), 1)], ["id", "age"]) + yield pa.RecordBatch.from_arrays( + [batch.column("id"), pa.compute.add(batch.column("age"), 1)], ["id", "age"] + ) with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): df.mapInArrow(map_func, df.schema).show() diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 2eac797f65a44..18dd5497db0c3 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -1246,6 +1246,36 @@ def profiling_func(*args, **kwargs): return profiling_func +def wrap_iter_memory_profiler(f, result_id): + from pyspark.sql.profiler import ProfileResultsParam + from pyspark.profiler import UDFLineProfilerV2 + + accumulator = _deserialize_accumulator( + SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam + ) + + def profiling_func(*args, **kwargs): + profiler = UDFLineProfilerV2() + profiler.add_function(f) + + iterator = iter(f(*args, **kwargs)) + + while True: + try: + with profiler: + item = next(iterator) + yield item + except StopIteration: + break + + codemap_dict = { + filename: list(line_iterator) for filename, line_iterator in profiler.code_map.items() + } + accumulator.add({result_id: (None, codemap_dict)}) + + return profiling_func + + def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index, profiler): num_arg = read_int(infile) From e7a55e0f860577cad5012ddceacb99285b7db49c Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Mon, 3 Nov 2025 15:26:44 +0000 Subject: [PATCH 3/9] Wrap memory iter --- python/pyspark/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 18dd5497db0c3..694d3c0282110 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -1326,7 +1326,7 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index, profil if not has_memory_profiler: profiling_func = chained_func elif _is_iter_based(eval_type): - profiling_func = chained_func + profiling_func = wrap_iter_memory_profiler(chained_func, result_id) else: profiling_func = wrap_memory_profiler(chained_func, result_id) else: From 290474afa3560f114b1ef50731a98e17a473e42f Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Mon, 3 Nov 2025 15:47:49 +0000 Subject: [PATCH 4/9] Update memory profiler tests --- python/pyspark/tests/test_memory_profiler.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/python/pyspark/tests/test_memory_profiler.py b/python/pyspark/tests/test_memory_profiler.py index df9d63c5260f9..1909358aa2bc2 100644 --- a/python/pyspark/tests/test_memory_profiler.py +++ b/python/pyspark/tests/test_memory_profiler.py @@ -341,12 +341,13 @@ def add2(x): not have_pandas or not have_pyarrow, cast(str, pandas_requirement_message or pyarrow_requirement_message), ) - def 
test_memory_profiler_pandas_udf_iterator_not_supported(self): + def test_memory_profiler_pandas_udf_iterator(self): import pandas as pd @pandas_udf("long") - def add1(x): - return x + 1 + def add1(iter: Iterator[pd.Series]) -> Iterator[pd.Series]: + for s in iter: + yield s + 1 @pandas_udf("long") def add2(iter: Iterator[pd.Series]) -> Iterator[pd.Series]: @@ -359,7 +360,7 @@ def add2(iter: Iterator[pd.Series]) -> Iterator[pd.Series]: ) df.collect() - self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys())) + self.assertEqual(3, len(self.profile_results), str(self.profile_results.keys())) for id in self.profile_results: self.assert_udf_memory_profile_present(udf_id=id) @@ -368,7 +369,7 @@ def add2(iter: Iterator[pd.Series]) -> Iterator[pd.Series]: not have_pandas or not have_pyarrow, cast(str, pandas_requirement_message or pyarrow_requirement_message), ) - def test_memory_profiler_map_in_pandas_not_supported(self): + def test_memory_profiler_map_in_pandas(self): df = self.spark.createDataFrame([(1, 21), (2, 30)], ("id", "age")) def filter_func(iterator): @@ -378,7 +379,10 @@ def filter_func(iterator): with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}): df.mapInPandas(filter_func, df.schema).show() - self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys())) + self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys())) + + for id in self.profile_results: + self.assert_udf_memory_profile_present(udf_id=id) @unittest.skipIf( not have_pandas or not have_pyarrow, From a9d3aeabe318b3b6c303185900bba67166262518 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Mon, 3 Nov 2025 19:52:57 -0500 Subject: [PATCH 5/9] Simplify --- python/pyspark/worker.py | 131 +++++++++++++++++---------------------- 1 file changed, 56 insertions(+), 75 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 694d3c0282110..2db16c4517594 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -1170,7 +1170,7 @@ def _is_iter_based(eval_type: int) -> bool: ) -def wrap_perf_profiler(f, result_id): +def wrap_perf_profiler(f, eval_type, result_id): import cProfile import pstats @@ -1180,51 +1180,42 @@ def wrap_perf_profiler(f, result_id): SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam ) - def profiling_func(*args, **kwargs): - with cProfile.Profile() as pr: - ret = f(*args, **kwargs) - st = pstats.Stats(pr) - st.stream = None # make it picklable - st.strip_dirs() + if _is_iter_based(eval_type): - accumulator.add({result_id: (st, None)}) - - return ret - - return profiling_func + def profiling_func(*args, **kwargs): + iterator = iter(f(*args, **kwargs)) + pr = cProfile.Profile() + while True: + try: + with pr: + item = next(iterator) + yield item + except StopIteration: + break + st = pstats.Stats(pr) + st.stream = None # make it picklable + st.strip_dirs() -def wrap_iter_perf_profiler(f, result_id): - import cProfile - import pstats + accumulator.add({result_id: (st, None)}) - from pyspark.sql.profiler import ProfileResultsParam - - accumulator = _deserialize_accumulator( - SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam - ) + else: - def profiling_func(*args, **kwargs): - iterator = iter(f(*args, **kwargs)) - pr = cProfile.Profile() - while True: - try: - with pr: - item = next(iterator) - yield item - except StopIteration: - break + def profiling_func(*args, **kwargs): + with cProfile.Profile() as pr: + ret = f(*args, **kwargs) + st = pstats.Stats(pr) + st.stream = 
None # make it picklable + st.strip_dirs() - st = pstats.Stats(pr) - st.stream = None # make it picklable - st.strip_dirs() + accumulator.add({result_id: (st, None)}) - accumulator.add({result_id: (st, None)}) + return ret return profiling_func -def wrap_memory_profiler(f, result_id): +def wrap_memory_profiler(f, eval_type, result_id): from pyspark.sql.profiler import ProfileResultsParam from pyspark.profiler import UDFLineProfilerV2 @@ -1232,46 +1223,41 @@ def wrap_memory_profiler(f, result_id): SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam ) - def profiling_func(*args, **kwargs): - profiler = UDFLineProfilerV2() - - wrapped = profiler(f) - ret = wrapped(*args, **kwargs) - codemap_dict = { - filename: list(line_iterator) for filename, line_iterator in profiler.code_map.items() - } - accumulator.add({result_id: (None, codemap_dict)}) - return ret + if _is_iter_based(eval_type): - return profiling_func + def profiling_func(*args, **kwargs): + profiler = UDFLineProfilerV2() + profiler.add_function(f) + iterator = iter(f(*args, **kwargs)) -def wrap_iter_memory_profiler(f, result_id): - from pyspark.sql.profiler import ProfileResultsParam - from pyspark.profiler import UDFLineProfilerV2 + while True: + try: + with profiler: + item = next(iterator) + yield item + except StopIteration: + break - accumulator = _deserialize_accumulator( - SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam - ) + codemap_dict = { + filename: list(line_iterator) + for filename, line_iterator in profiler.code_map.items() + } + accumulator.add({result_id: (None, codemap_dict)}) - def profiling_func(*args, **kwargs): - profiler = UDFLineProfilerV2() - profiler.add_function(f) + else: - iterator = iter(f(*args, **kwargs)) + def profiling_func(*args, **kwargs): + profiler = UDFLineProfilerV2() - while True: - try: - with profiler: - item = next(iterator) - yield item - except StopIteration: - break - - codemap_dict = { - filename: list(line_iterator) for filename, line_iterator in profiler.code_map.items() - } - accumulator.add({result_id: (None, codemap_dict)}) + wrapped = profiler(f) + ret = wrapped(*args, **kwargs) + codemap_dict = { + filename: list(line_iterator) + for filename, line_iterator in profiler.code_map.items() + } + accumulator.add({result_id: (None, codemap_dict)}) + return ret return profiling_func @@ -1316,19 +1302,14 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index, profil if profiler == "perf": result_id = read_long(infile) - if _is_iter_based(eval_type): - profiling_func = wrap_iter_perf_profiler(chained_func, result_id) - else: - profiling_func = wrap_perf_profiler(chained_func, result_id) + profiling_func = wrap_perf_profiler(chained_func, eval_type, result_id) elif profiler == "memory": result_id = read_long(infile) if not has_memory_profiler: profiling_func = chained_func - elif _is_iter_based(eval_type): - profiling_func = wrap_iter_memory_profiler(chained_func, result_id) else: - profiling_func = wrap_memory_profiler(chained_func, result_id) + profiling_func = wrap_memory_profiler(chained_func, eval_type, result_id) else: profiling_func = chained_func From caa6b3f7ca25c1a53daefb01db74fecf33597383 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Mon, 3 Nov 2025 22:41:07 -0500 Subject: [PATCH 6/9] Support python data source --- python/pyspark/sql/tests/test_udf_profiler.py | 29 +++++++++++++++++++ .../python/UserDefinedPythonDataSource.scala | 2 +- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git 
a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py index e4a0785b8a5da..6897e87f49f7b 100644 --- a/python/pyspark/sql/tests/test_udf_profiler.py +++ b/python/pyspark/sql/tests/test_udf_profiler.py @@ -28,6 +28,7 @@ from pyspark import SparkConf from pyspark.errors import PySparkValueError from pyspark.sql import SparkSession +from pyspark.sql.datasource import DataSource, DataSourceReader from pyspark.sql.functions import col, arrow_udf, pandas_udf, udf from pyspark.sql.window import Window from pyspark.profiler import UDFBasicProfiler @@ -594,6 +595,34 @@ def summarize(left, right): for id in self.profile_results: self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2) + def test_perf_profiler_data_source(self): + class TestDataSourceReader(DataSourceReader): + def __init__(self, schema): + self.schema = schema + + def partitions(self): + raise NotImplementedError + + def read(self, partition): + yield from ((1,), (2,), (3,)) + + class TestDataSource(DataSource): + def schema(self): + return "id long" + + def reader(self, schema) -> "DataSourceReader": + return TestDataSourceReader(schema) + + self.spark.dataSource.register(TestDataSource) + + with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): + self.spark.read.format("TestDataSource").load().collect() + + self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys())) + + for id in self.profile_results: + self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=4) + def test_perf_profiler_render(self): with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): _do_computation(self.spark) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/UserDefinedPythonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/UserDefinedPythonDataSource.scala index 57366b6f165bd..47e64a5b4041a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/UserDefinedPythonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/UserDefinedPythonDataSource.scala @@ -173,7 +173,7 @@ case class UserDefinedPythonDataSource(dataSourceCls: PythonFunction) { metrics, jobArtifactUUID, sessionUUID, - None) + conf.pythonUDFProfiler) } def createPythonMetrics(): Array[CustomMetric] = { From 4f4e4cb233a9a20cded8c49ff00e8b334ec18205 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Mon, 3 Nov 2025 22:44:21 -0500 Subject: [PATCH 7/9] Simplify test --- python/pyspark/sql/tests/test_udf_profiler.py | 22 +++++-------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py index 6897e87f49f7b..37f4a70fabd27 100644 --- a/python/pyspark/sql/tests/test_udf_profiler.py +++ b/python/pyspark/sql/tests/test_udf_profiler.py @@ -330,20 +330,15 @@ def test_perf_profiler_pandas_udf_iterator(self): import pandas as pd @pandas_udf("long") - def add1(iter: Iterator[pd.Series]) -> Iterator[pd.Series]: + def add(iter: Iterator[pd.Series]) -> Iterator[pd.Series]: for s in iter: yield s + 1 - @pandas_udf("long") - def add2(iter: Iterator[pd.Series]) -> Iterator[pd.Series]: - for s in iter: - yield s + 2 - with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): - df = self.spark.range(10, numPartitions=2).select(add1("id"), add2("id"), add1("id")) + df = self.spark.range(10, numPartitions=2).select(add("id")) df.collect() - 
self.assertEqual(2, len(self.profile_results), str(self.profile_results.keys())) + self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys())) for id in self.profile_results: self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=4) @@ -353,20 +348,15 @@ def test_perf_profiler_arrow_udf_iterator(self): import pyarrow as pa @arrow_udf("long") - def add1(iter: Iterator[pa.Array]) -> Iterator[pa.Array]: + def add(iter: Iterator[pa.Array]) -> Iterator[pa.Array]: for s in iter: yield pa.compute.add(s, 1) - @arrow_udf("long") - def add2(iter: Iterator[pa.Array]) -> Iterator[pa.Array]: - for s in iter: - yield pa.compute.add(s, 2) - with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): - df = self.spark.range(10, numPartitions=2).select(add1("id"), add2("id"), add1("id")) + df = self.spark.range(10, numPartitions=2).select(add("id")) df.collect() - self.assertEqual(2, len(self.profile_results), str(self.profile_results.keys())) + self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys())) for id in self.profile_results: self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=4) From f3144e70e15812b66d2e2240a354b2fbffa43f87 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Mon, 3 Nov 2025 22:46:11 -0500 Subject: [PATCH 8/9] Use same context manager logic for non-iter memory wrapper --- python/pyspark/worker.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 2db16c4517594..c697cafba6edd 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -1249,9 +1249,11 @@ def profiling_func(*args, **kwargs): def profiling_func(*args, **kwargs): profiler = UDFLineProfilerV2() + profiler.add_function(f) + + with profiler: + ret = f(*args, **kwargs) - wrapped = profiler(f) - ret = wrapped(*args, **kwargs) codemap_dict = { filename: list(line_iterator) for filename, line_iterator in profiler.code_map.items() From 2daa80aa1f5214c735bc8b5a51824508b268ec1a Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Mon, 3 Nov 2025 22:47:03 -0500 Subject: [PATCH 9/9] Move memory profiler check into wrap function --- python/pyspark/worker.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index c697cafba6edd..6e34b041665ac 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -1219,6 +1219,9 @@ def wrap_memory_profiler(f, eval_type, result_id): from pyspark.sql.profiler import ProfileResultsParam from pyspark.profiler import UDFLineProfilerV2 + if not has_memory_profiler: + return f + accumulator = _deserialize_accumulator( SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam ) @@ -1308,10 +1311,8 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index, profil elif profiler == "memory": result_id = read_long(infile) - if not has_memory_profiler: - profiling_func = chained_func - else: - profiling_func = wrap_memory_profiler(chained_func, eval_type, result_id) + + profiling_func = wrap_memory_profiler(chained_func, eval_type, result_id) else: profiling_func = chained_func
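
A minimal end-to-end sketch of the behavior this series enables, mirroring the new test_perf_profiler_map_in_pandas test. Assumptions: the patches above are applied, and spark.profile.show() is the existing helper for rendering accumulated UDF profile results; nothing below is part of the patches themselves.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    # Enable the Python UDF perf profiler (the same conf the tests toggle).
    spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")

    df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

    def filter_func(iterator):
        # Iterator[pd.DataFrame] -> Iterator[pd.DataFrame]; with this series,
        # each next() call on the iterator runs under the profiler wrapper.
        for pdf in iterator:
            yield pdf[pdf.id == 1]

    df.mapInPandas(filter_func, df.schema).show()

    # Before this series, iterator-based UDFs produced no profile results;
    # now one result per mapInPandas UDF is accumulated and can be rendered.
    spark.profile.show(type="perf")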