88 changes: 63 additions & 25 deletions python/pyspark/sql/tests/test_udf_profiler.py
@@ -28,6 +28,7 @@
from pyspark import SparkConf
from pyspark.errors import PySparkValueError
from pyspark.sql import SparkSession
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.functions import col, arrow_udf, pandas_udf, udf
from pyspark.sql.window import Window
from pyspark.profiler import UDFBasicProfiler
@@ -325,59 +326,47 @@ def add2(x):
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
def test_perf_profiler_pandas_udf_iterator_not_supported(self):
def test_perf_profiler_pandas_udf_iterator(self):
import pandas as pd

@pandas_udf("long")
def add1(x):
return x + 1

@pandas_udf("long")
def add2(iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
def add(iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for s in iter:
yield s + 2
yield s + 1

with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df = self.spark.range(10, numPartitions=2).select(
add1("id"), add2("id"), add1("id"), add2(col("id") + 1)
)
df = self.spark.range(10, numPartitions=2).select(add("id"))
df.collect()

self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))

for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=4)

@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
def test_perf_profiler_arrow_udf_iterator_not_supported(self):
def test_perf_profiler_arrow_udf_iterator(self):
import pyarrow as pa

@arrow_udf("long")
def add1(x):
return pa.compute.add(x, 1)

@arrow_udf("long")
def add2(iter: Iterator[pa.Array]) -> Iterator[pa.Array]:
def add(iter: Iterator[pa.Array]) -> Iterator[pa.Array]:
for s in iter:
yield pa.compute.add(s, 2)
yield pa.compute.add(s, 1)

with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df = self.spark.range(10, numPartitions=2).select(
add1("id"), add2("id"), add1("id"), add2(col("id") + 1)
)
df = self.spark.range(10, numPartitions=2).select(add("id"))
df.collect()

self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))

for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=4)

@unittest.skipIf(
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
def test_perf_profiler_map_in_pandas_not_supported(self):
df = self.spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def test_perf_profiler_map_in_pandas(self):
df = self.spark.createDataFrame([(1, 21), (2, 30)], ("id", "age")).repartition(1)

def filter_func(iterator):
for pdf in iterator:
@@ -386,7 +375,28 @@ def filter_func(iterator):
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df.mapInPandas(filter_func, df.schema).show()

self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys()))
self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))

for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)

@unittest.skipIf(not have_pyarrow, pyarrow_requirement_message)
def test_perf_profiler_map_in_arrow(self):
import pyarrow as pa

df = self.spark.createDataFrame([(1, 21), (2, 30)], ("id", "age")).repartition(1)

def map_func(iterator: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
for batch in iterator:
yield pa.RecordBatch.from_arrays(
[batch.column("id"), pa.compute.add(batch.column("age"), 1)], ["id", "age"]
)

with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
df.mapInArrow(map_func, df.schema).show()

for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)

@unittest.skipIf(
not have_pandas or not have_pyarrow,
@@ -575,6 +585,34 @@ def summarize(left, right):
for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=2)

def test_perf_profiler_data_source(self):
class TestDataSourceReader(DataSourceReader):
def __init__(self, schema):
self.schema = schema

def partitions(self):
raise NotImplementedError

def read(self, partition):
yield from ((1,), (2,), (3,))

class TestDataSource(DataSource):
def schema(self):
return "id long"

def reader(self, schema) -> "DataSourceReader":
return TestDataSourceReader(schema)

self.spark.dataSource.register(TestDataSource)

with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
self.spark.read.format("TestDataSource").load().collect()

self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))

for id in self.profile_results:
self.assert_udf_profile_present(udf_id=id, expected_line_count_prefix=4)

def test_perf_profiler_render(self):
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
_do_computation(self.spark)
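For context, a minimal usage sketch of what the updated tests exercise: with this change, iterator-based pandas UDFs are profiled instead of being skipped. The sketch is not part of the diff; it assumes an existing SparkSession bound to `spark` and that the `spark.profile.show` API is available to render the collected results.

    from typing import Iterator

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")

    @pandas_udf("long")
    def add_one(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
        # Each yielded batch now runs under cProfile on the worker.
        for s in batches:
            yield s + 1

    spark.range(10, numPartitions=2).select(add_one("id")).collect()
    spark.profile.show(type="perf")  # per-UDF cProfile stats, keyed by UDF id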
16 changes: 10 additions & 6 deletions python/pyspark/tests/test_memory_profiler.py
@@ -341,12 +341,13 @@ def add2(x):
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
def test_memory_profiler_pandas_udf_iterator_not_supported(self):
def test_memory_profiler_pandas_udf_iterator(self):
import pandas as pd

@pandas_udf("long")
def add1(x):
return x + 1
def add1(iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for s in iter:
yield s + 1

@pandas_udf("long")
def add2(iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
@@ -359,7 +360,7 @@ def add2(iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
)
df.collect()

self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
self.assertEqual(3, len(self.profile_results), str(self.profile_results.keys()))

for id in self.profile_results:
self.assert_udf_memory_profile_present(udf_id=id)
@@ -368,7 +369,7 @@ def add2(iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
not have_pandas or not have_pyarrow,
cast(str, pandas_requirement_message or pyarrow_requirement_message),
)
def test_memory_profiler_map_in_pandas_not_supported(self):
def test_memory_profiler_map_in_pandas(self):
df = self.spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
@@ -378,7 +379,10 @@ def filter_func(iterator):
with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
df.mapInPandas(filter_func, df.schema).show()

self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys()))
self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))

for id in self.profile_results:
self.assert_udf_memory_profile_present(udf_id=id)

@unittest.skipIf(
not have_pandas or not have_pyarrow,
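Similarly, a hedged sketch of the memory-profiler path covered by the updated mapInPandas test; it assumes the memory-profiler package is installed on the executors and a SparkSession bound to `spark`.

    def filter_adults(batches):
        for pdf in batches:
            # Line-by-line memory usage of this body is recorded per UDF id.
            yield pdf[pdf.age > 21]

    df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
    spark.conf.set("spark.sql.pyspark.udf.profiler", "memory")
    df.mapInPandas(filter_adults, df.schema).show()
    spark.profile.show(type="memory")  # line-by-line memory report per UDF id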
106 changes: 77 additions & 29 deletions python/pyspark/worker.py
@@ -1158,17 +1158,19 @@ def func(*args):
return f, args_offsets


def _supports_profiler(eval_type: int) -> bool:
Review comment (Contributor): cc @ueshin to check whether iterator-based UDFs can be supported in memory profiler now

Review comment (Member): There is some code to support memory profiler: https://github.com/apache/spark/pull/52853/files#diff-e91c3ea5c69ba1ce65bbaae56a7164a927bb013aac053d5167936ce9c9522051R1228? Also the description contains the example output.
return eval_type not in (
def _is_iter_based(eval_type: int) -> bool:
return eval_type in (
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF,
PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE,
PythonEvalType.SQL_GROUPED_MAP_ARROW_ITER_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_ITER_UDF,
)


def wrap_perf_profiler(f, result_id):
def wrap_perf_profiler(f, eval_type, result_id):
import cProfile
import pstats

@@ -1178,38 +1180,89 @@ def wrap_perf_profiler(f, result_id):
SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam
)

def profiling_func(*args, **kwargs):
with cProfile.Profile() as pr:
ret = f(*args, **kwargs)
st = pstats.Stats(pr)
st.stream = None # make it picklable
st.strip_dirs()
if _is_iter_based(eval_type):

def profiling_func(*args, **kwargs):
iterator = iter(f(*args, **kwargs))
pr = cProfile.Profile()
while True:
try:
with pr:
item = next(iterator)
yield item
except StopIteration:
break

st = pstats.Stats(pr)
st.stream = None # make it picklable
st.strip_dirs()

accumulator.add({result_id: (st, None)})

accumulator.add({result_id: (st, None)})
else:

def profiling_func(*args, **kwargs):
with cProfile.Profile() as pr:
ret = f(*args, **kwargs)
st = pstats.Stats(pr)
st.stream = None # make it picklable
st.strip_dirs()

return ret
accumulator.add({result_id: (st, None)})

return ret

return profiling_func


def wrap_memory_profiler(f, result_id):
def wrap_memory_profiler(f, eval_type, result_id):
from pyspark.sql.profiler import ProfileResultsParam
from pyspark.profiler import UDFLineProfilerV2

if not has_memory_profiler:
return f

accumulator = _deserialize_accumulator(
SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam
)

def profiling_func(*args, **kwargs):
profiler = UDFLineProfilerV2()
if _is_iter_based(eval_type):

wrapped = profiler(f)
ret = wrapped(*args, **kwargs)
codemap_dict = {
filename: list(line_iterator) for filename, line_iterator in profiler.code_map.items()
}
accumulator.add({result_id: (None, codemap_dict)})
return ret
def profiling_func(*args, **kwargs):
profiler = UDFLineProfilerV2()
profiler.add_function(f)

iterator = iter(f(*args, **kwargs))

while True:
try:
with profiler:
item = next(iterator)
yield item
except StopIteration:
break

codemap_dict = {
filename: list(line_iterator)
for filename, line_iterator in profiler.code_map.items()
}
accumulator.add({result_id: (None, codemap_dict)})

else:

def profiling_func(*args, **kwargs):
profiler = UDFLineProfilerV2()
profiler.add_function(f)

with profiler:
ret = f(*args, **kwargs)

codemap_dict = {
filename: list(line_iterator)
for filename, line_iterator in profiler.code_map.items()
}
accumulator.add({result_id: (None, codemap_dict)})
return ret

return profiling_func

@@ -1254,17 +1307,12 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index, profiler):
if profiler == "perf":
result_id = read_long(infile)

if _supports_profiler(eval_type):
profiling_func = wrap_perf_profiler(chained_func, result_id)
else:
profiling_func = chained_func
profiling_func = wrap_perf_profiler(chained_func, eval_type, result_id)

elif profiler == "memory":
result_id = read_long(infile)
if _supports_profiler(eval_type) and has_memory_profiler:
profiling_func = wrap_memory_profiler(chained_func, result_id)
else:
profiling_func = chained_func

profiling_func = wrap_memory_profiler(chained_func, eval_type, result_id)
else:
profiling_func = chained_func

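The core pattern in wrap_perf_profiler above is to toggle the profiler around each next() of the UDF's generator, so only the UDF body is measured and downstream work (for example, Arrow serialization of the yielded batch) is excluded. A self-contained sketch of that pattern, with hypothetical names chosen for illustration rather than taken from the worker code:

    import cProfile
    import pstats


    def profile_each_item(gen_fn, *args, **kwargs):
        """Run each next() of a generator under cProfile, accumulating stats."""
        iterator = iter(gen_fn(*args, **kwargs))
        pr = cProfile.Profile()
        while True:
            try:
                with pr:  # active only while the generator computes the next item
                    item = next(iterator)
                yield item  # consumer-side work after this point is not profiled
            except StopIteration:
                break
        stats = pstats.Stats(pr)
        stats.strip_dirs()
        stats.sort_stats("cumulative").print_stats(5)


    def add_one(values):
        for v in values:
            yield v + 1


    list(profile_each_item(add_one, range(5)))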
@@ -172,7 +172,8 @@ case class UserDefinedPythonDataSource(dataSourceCls: PythonFunction) {
pythonRunnerConf,
metrics,
jobArtifactUUID,
sessionUUID)
sessionUUID,
conf.pythonUDFProfiler)
}

def createPythonMetrics(): Array[CustomMetric] = {
@@ -41,7 +41,8 @@ class MapInBatchEvaluatorFactory(
pythonRunnerConf: Map[String, String],
val pythonMetrics: Map[String, SQLMetric],
jobArtifactUUID: Option[String],
sessionUUID: Option[String])
sessionUUID: Option[String],
profiler: Option[String])
extends PartitionEvaluatorFactory[InternalRow, InternalRow] {

override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] =
@@ -74,7 +75,7 @@
pythonMetrics,
jobArtifactUUID,
sessionUUID,
None) with BatchedPythonArrowInput
profiler) with BatchedPythonArrowInput
val columnarBatchIter = pyRunner.compute(batchIter, context.partitionId(), context)

val unsafeProj = UnsafeProjection.create(output, output)
@@ -70,7 +70,8 @@ trait MapInBatchExec extends UnaryExecNode with PythonSQLMetrics {
pythonRunnerConf,
pythonMetrics,
jobArtifactUUID,
sessionUUID)
sessionUUID,
conf.pythonUDFProfiler)

val rdd = if (isBarrier) {
val rddBarrier = child.execute().barrier()