
Commit a68c2e6

ueshin authored and dongjoon-hyun committed
[SPARK-54246][PYTHON][DOCS] Add the user guide for python worker logging
### What changes were proposed in this pull request?

Adds the user guide for python worker logging.

### Why are the changes needed?

The documentation for python worker logging will be in the user guide: "Chapter 4: Bug Busting - Debugging PySpark".

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

Yes, asked cursor for the draft.

Closes #52949 from ueshin/issues/SPARK-54246/user_guide.

Authored-by: Takuya Ueshin <ueshin@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit b144965)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 136246c commit a68c2e6
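In short, the guide added below documents a feature that comes down to one SQL conf and one session-scoped table, both introduced in the notebook diff that follows. A condensed sketch of the workflow (assumes an active SparkSession named spark on Spark 4.1+; not taken verbatim from the notebook):

import logging
from pyspark.sql.functions import udf

# Capture of Python worker output is disabled by default.
spark.conf.set("spark.sql.pyspark.worker.logging.enabled", "true")

@udf("string")
def shout(value):
    # Standard logging, print, and stderr output inside the worker is captured.
    logging.getLogger("demo").warning("processing %s", value)
    return value.upper()

spark.createDataFrame([("hi",)], ["text"]).select(shout("text")).show()

# Captured entries are exposed as a queryable, session-scoped table.
spark.table("system.session.python_worker_logs").show(truncate=False)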

File tree

1 file changed: +358 -1 lines changed

python/docs/source/user_guide/bugbusting.ipynb

Lines changed: 358 additions & 1 deletion
@@ -792,7 +792,7 @@
792792
"id": "09b420ba",
793793
"metadata": {},
794794
"source": [
795-
"## Disply Stacktraces"
795+
"## Display Stacktraces"
796796
]
797797
},
798798
{
@@ -900,6 +900,363 @@
900900
"See also [Stack Traces](https://spark.apache.org/docs/latest/api/python/development/debugging.html#stack-traces) for more details."
901901
]
902902
},
903+
{
904+
"cell_type": "markdown",
905+
"id": "cff22ba8",
906+
"metadata": {},
907+
"source": [
908+
"## Python Worker Logging\n",
909+
"\n",
910+
"<div class=\"alert alert-block alert-info\">\n",
911+
"<b>Note:</b> This section applies to Spark 4.1\n",
912+
"</div>\n",
913+
"\n",
914+
"PySpark provides a logging mechanism for Python workers that execute UDFs, UDTFs, Pandas UDFs, and Python data sources. When enabled, all logging output (including `print` statements, standard logging, and exceptions) is captured and made available for querying and analysis.\n",
915+
"\n",
916+
"### Enabling Worker Logging\n",
917+
"\n",
918+
"Worker logging is **disabled by default**. Enable it by setting the Spark SQL configuration:"
919+
]
920+
},
921+
{
922+
"cell_type": "code",
923+
"execution_count": 20,
924+
"id": "74786d45",
925+
"metadata": {},
926+
"outputs": [],
927+
"source": [
928+
"spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")"
929+
]
930+
},
931+
{
932+
"cell_type": "markdown",
933+
"id": "0f23fee2",
934+
"metadata": {},
935+
"source": [
936+
"### Accessing Logs\n",
937+
"\n",
938+
"All captured logs can be queried as a DataFrame:"
939+
]
940+
},
941+
{
942+
"cell_type": "code",
943+
"execution_count": 21,
944+
"id": "9db0c509",
945+
"metadata": {},
946+
"outputs": [],
947+
"source": [
948+
"logs = spark.table(\"system.session.python_worker_logs\")"
949+
]
950+
},
951+
{
952+
"cell_type": "markdown",
953+
"id": "34bca836",
954+
"metadata": {},
955+
"source": [
956+
"The logs DataFrame contains the following columns:\n",
957+
"\n",
958+
"- **ts**: Timestamp of the log entry\n",
959+
"- **level**: Log level (e.g., `\"INFO\"`, `\"WARNING\"`, `\"ERROR\"`)\n",
960+
"- **logger**: Logger name (e.g., custom logger name, `\"stdout\"`, `\"stderr\"`)\n",
961+
"- **msg**: The log message\n",
962+
"- **context**: A map containing contextual information (e.g., `func_name`, `class_name`, custom fields)\n",
963+
"- **exception**: Exception details (if an exception was logged)\n",
964+
"\n",
965+
"### Examples\n",
966+
"\n",
967+
"#### Basic UDF Logging"
968+
]
969+
},
970+
{
971+
"cell_type": "code",
972+
"execution_count": 22,
973+
"id": "4cb5bbca",
974+
"metadata": {},
975+
"outputs": [
976+
{
977+
"name": "stdout",
978+
"output_type": "stream",
979+
"text": [
980+
"+------------+\n",
981+
"|my_udf(text)|\n",
982+
"+------------+\n",
983+
"| HELLO|\n",
984+
"| WORLD|\n",
985+
"+------------+\n",
986+
"\n",
987+
"+-------+------------------------+----------------+---------------------+\n",
988+
"|level |msg |logger |context |\n",
989+
"+-------+------------------------+----------------+---------------------+\n",
990+
"|INFO |Processing value: hello |my_custom_logger|{func_name -> my_udf}|\n",
991+
"|WARNING|This is a warning |my_custom_logger|{func_name -> my_udf}|\n",
992+
"|INFO |This is a stdout message|stdout |{func_name -> my_udf}|\n",
993+
"|ERROR |This is a stderr message|stderr |{func_name -> my_udf}|\n",
994+
"|INFO |Processing value: world |my_custom_logger|{func_name -> my_udf}|\n",
995+
"|WARNING|This is a warning |my_custom_logger|{func_name -> my_udf}|\n",
996+
"|INFO |This is a stdout message|stdout |{func_name -> my_udf}|\n",
997+
"|ERROR |This is a stderr message|stderr |{func_name -> my_udf}|\n",
998+
"+-------+------------------------+----------------+---------------------+\n",
999+
"\n"
1000+
]
1001+
}
1002+
],
1003+
"source": [
1004+
"from pyspark.sql.functions import udf\n",
1005+
"import logging\n",
1006+
"import sys\n",
1007+
"\n",
1008+
"@udf(\"string\")\n",
1009+
"def my_udf(value):\n",
1010+
" logger = logging.getLogger(\"my_custom_logger\")\n",
1011+
" logger.setLevel(logging.INFO) # Set level to INFO to capture info messages\n",
1012+
" logger.info(f\"Processing value: {value}\")\n",
1013+
" logger.warning(\"This is a warning\")\n",
1014+
" print(\"This is a stdout message\") # INFO level, logger=stdout\n",
1015+
" print(\"This is a stderr message\", file=sys.stderr) # ERROR level, logger=stderr\n",
1016+
" return value.upper()\n",
1017+
"\n",
1018+
"# Enable logging and execute\n",
1019+
"spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
1020+
"df = spark.createDataFrame([(\"hello\",), (\"world\",)], [\"text\"])\n",
1021+
"df.select(my_udf(\"text\")).show()\n",
1022+
"\n",
1023+
"# Query the logs\n",
1024+
"logs = spark.table(\"system.session.python_worker_logs\")\n",
1025+
"logs.select(\"level\", \"msg\", \"logger\", \"context\").show(truncate=False)"
1026+
]
1027+
},
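The basic example above uses a row-at-a-time UDF; the introductory paragraph of this section also lists Pandas UDFs, which are captured the same way. A minimal sketch of the same pattern with a Pandas UDF (not part of the committed notebook; assumes pandas and PyArrow are installed, the logging conf is already enabled as above, and the logger name is made up for illustration):

import logging
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def batch_double(s: pd.Series) -> pd.Series:
    # A vectorized UDF logs per executed batch rather than per row.
    logging.getLogger("pandas_udf_logger").warning("doubling a batch of %d rows", len(s))
    return s * 2

spark.range(3).select(batch_double("id")).show()

logs = spark.table("system.session.python_worker_logs")
logs.filter("logger = 'pandas_udf_logger'").select("level", "msg", "context").show(truncate=False)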
1028+
{
1029+
"cell_type": "markdown",
1030+
"id": "15a80ffb",
1031+
"metadata": {},
1032+
"source": [
1033+
"#### Logging with Custom Context\n",
1034+
"\n",
1035+
"You can add custom context information to your logs:"
1036+
]
1037+
},
1038+
{
1039+
"cell_type": "code",
1040+
"execution_count": 23,
1041+
"id": "427a06c5",
1042+
"metadata": {},
1043+
"outputs": [
1044+
{
1045+
"name": "stdout",
1046+
"output_type": "stream",
1047+
"text": [
1048+
"+--------------------+\n",
1049+
"|contextual_udf(test)|\n",
1050+
"+--------------------+\n",
1051+
"| test|\n",
1052+
"+--------------------+\n",
1053+
"\n",
1054+
"+-----------------------------+---------------------------------------------------------------------+\n",
1055+
"|msg |context |\n",
1056+
"+-----------------------------+---------------------------------------------------------------------+\n",
1057+
"|Processing with extra context|{func_name -> contextual_udf, user_id -> 123, operation -> transform}|\n",
1058+
"+-----------------------------+---------------------------------------------------------------------+\n",
1059+
"\n"
1060+
]
1061+
}
1062+
],
1063+
"source": [
1064+
"from pyspark.sql.functions import lit, udf\n",
1065+
"import logging\n",
1066+
"\n",
1067+
"@udf(\"string\")\n",
1068+
"def contextual_udf(value):\n",
1069+
" logger = logging.getLogger(\"contextual\")\n",
1070+
" logger.warning(\n",
1071+
" \"Processing with extra context\",\n",
1072+
" extra={\"context\": {\"user_id\": 123, \"operation\": \"transform\"}}\n",
1073+
" )\n",
1074+
" return value\n",
1075+
"\n",
1076+
"spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
1077+
"spark.range(1).select(contextual_udf(lit(\"test\"))).show()\n",
1078+
"\n",
1079+
"logs = spark.table(\"system.session.python_worker_logs\")\n",
1080+
"logs.filter(\"logger = 'contextual'\").select(\"msg\", \"context\").show(truncate=False)"
1081+
]
1082+
},
1083+
{
1084+
"cell_type": "markdown",
1085+
"id": "a19db296",
1086+
"metadata": {},
1087+
"source": [
1088+
"The context includes both automatic fields (like `func_name`) and custom fields (like `user_id`, `operation`).\n",
1089+
"\n",
1090+
"#### Exception Logging\n",
1091+
"\n",
1092+
"Exceptions are automatically captured with full stack traces:"
1093+
]
1094+
},
1095+
{
1096+
"cell_type": "code",
1097+
"execution_count": 24,
1098+
"id": "3ab34a4c",
1099+
"metadata": {},
1100+
"outputs": [
1101+
{
1102+
"name": "stdout",
1103+
"output_type": "stream",
1104+
"text": [
1105+
"+------------------+\n",
1106+
"|failing_udf(value)|\n",
1107+
"+------------------+\n",
1108+
"| -1|\n",
1109+
"| 20|\n",
1110+
"+------------------+\n",
1111+
"\n",
1112+
"+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+\n",
1113+
"|msg |exception |\n",
1114+
"+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+\n",
1115+
"|Division by zero occurred|{ZeroDivisionError, division by zero, [{NULL, failing_udf, /var/folders/r8/0v7zwfbd59q4ym2gn6kxjq8h0000gp/T/ipykernel_79089/916837455.py, 8}]}|\n",
1116+
"+-------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+\n",
1117+
"\n"
1118+
]
1119+
}
1120+
],
1121+
"source": [
1122+
"from pyspark.sql.functions import udf\n",
1123+
"import logging\n",
1124+
"\n",
1125+
"@udf(\"int\")\n",
1126+
"def failing_udf(x):\n",
1127+
" logger = logging.getLogger(\"error_handler\")\n",
1128+
" try:\n",
1129+
" result = 100 / x\n",
1130+
" except ZeroDivisionError:\n",
1131+
" logger.exception(\"Division by zero occurred\")\n",
1132+
" return -1\n",
1133+
" return int(result)\n",
1134+
"\n",
1135+
"spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
1136+
"spark.createDataFrame([(0,), (5,)], [\"value\"]).select(failing_udf(\"value\")).show()\n",
1137+
"\n",
1138+
"logs = spark.table(\"system.session.python_worker_logs\")\n",
1139+
"logs.filter(\"logger = 'error_handler'\").select(\"msg\", \"exception\").show(truncate=False)"
1140+
]
1141+
},
1142+
{
1143+
"cell_type": "markdown",
1144+
"id": "e54f6ac3",
1145+
"metadata": {},
1146+
"source": [
1147+
"#### UDTF and Python Data Source Logging\n",
1148+
"\n",
1149+
"Worker logging also works with UDTFs and Python Data Sources, capturing both the class and function names:"
1150+
]
1151+
},
1152+
{
1153+
"cell_type": "code",
1154+
"execution_count": 25,
1155+
"id": "02d454b0",
1156+
"metadata": {},
1157+
"outputs": [
1158+
{
1159+
"name": "stdout",
1160+
"output_type": "stream",
1161+
"text": [
1162+
"+-----------+-----+------+\n",
1163+
"| text| word|length|\n",
1164+
"+-----------+-----+------+\n",
1165+
"|hello world|hello| 5|\n",
1166+
"|hello world|world| 5|\n",
1167+
"+-----------+-----+------+\n",
1168+
"\n",
1169+
"+-----------------------------+---------------------------------------------------------------------+\n",
1170+
"|msg |context |\n",
1171+
"+-----------------------------+---------------------------------------------------------------------+\n",
1172+
"|Processing 2 words |{func_name -> eval, class_name -> WordSplitter} |\n",
1173+
"+-----------------------------+---------------------------------------------------------------------+\n",
1174+
"\n"
1175+
]
1176+
}
1177+
],
1178+
"source": [
1179+
"from pyspark.sql.functions import col, udtf\n",
1180+
"import logging\n",
1181+
"\n",
1182+
"@udtf(returnType=\"word: string, length: int\")\n",
1183+
"class WordSplitter:\n",
1184+
" def eval(self, text: str):\n",
1185+
" logger = logging.getLogger(\"udtf_logger\")\n",
1186+
" logger.setLevel(logging.INFO) # Set level to INFO to capture info messages\n",
1187+
" words = text.split()\n",
1188+
" logger.info(f\"Processing {len(words)} words\")\n",
1189+
" for word in words:\n",
1190+
" yield (word, len(word))\n",
1191+
"\n",
1192+
"spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
1193+
"df = spark.createDataFrame([(\"hello world\",)], [\"text\"])\n",
1194+
"df.lateralJoin(WordSplitter(col(\"text\").outer())).show()\n",
1195+
"\n",
1196+
"logs = spark.table(\"system.session.python_worker_logs\")\n",
1197+
"logs.filter(\"logger = 'udtf_logger'\").select(\"msg\", \"context\").show(truncate=False)"
1198+
]
1199+
},
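The cell above covers a UDTF; the same markdown cell also mentions Python Data Sources, which the notebook does not demonstrate. A minimal sketch using the Python Data Source API available since Spark 4.0 (not part of the committed notebook; the source, class, and logger names are hypothetical):

import logging
from pyspark.sql.datasource import DataSource, DataSourceReader

class DemoReader(DataSourceReader):
    def read(self, partition):
        # Log entries emitted while reading a partition are captured per worker.
        logging.getLogger("datasource_logger").warning("reading a partition")
        yield (0, "a")
        yield (1, "b")

class DemoSource(DataSource):
    @classmethod
    def name(cls):
        return "demo_source"

    def schema(self):
        return "id int, value string"

    def reader(self, schema):
        return DemoReader()

spark.dataSource.register(DemoSource)
spark.read.format("demo_source").load().show()

logs = spark.table("system.session.python_worker_logs")
logs.filter("logger = 'datasource_logger'").select("msg", "context").show(truncate=False)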
1200+
{
1201+
"cell_type": "markdown",
1202+
"id": "9d4c119b",
1203+
"metadata": {},
1204+
"source": [
1205+
"### Querying and Analyzing Logs\n",
1206+
"\n",
1207+
"You can use standard DataFrame operations to analyze logs:"
1208+
]
1209+
},
1210+
{
1211+
"cell_type": "code",
1212+
"execution_count": 26,
1213+
"id": "5b061011",
1214+
"metadata": {},
1215+
"outputs": [
1216+
{
1217+
"name": "stdout",
1218+
"output_type": "stream",
1219+
"text": [
1220+
"+-------+-----+\n",
1221+
"| level|count|\n",
1222+
"+-------+-----+\n",
1223+
"| INFO| 5|\n",
1224+
"|WARNING| 3|\n",
1225+
"| ERROR| 3|\n",
1226+
"+-------+-----+\n",
1227+
"\n",
1228+
"...\n",
1229+
"\n"
1230+
]
1231+
}
1232+
],
1233+
"source": [
1234+
"logs = spark.table(\"system.session.python_worker_logs\")\n",
1235+
"\n",
1236+
"# Count logs by level\n",
1237+
"logs.groupBy(\"level\").count().show()\n",
1238+
"\n",
1239+
"# Find all errors\n",
1240+
"logs.filter(\"level = 'ERROR'\").show()\n",
1241+
"\n",
1242+
"# Logs from a specific function\n",
1243+
"logs.filter(\"context.func_name = 'my_udf'\").show()\n",
1244+
"\n",
1245+
"# Logs with exceptions\n",
1246+
"logs.filter(\"exception is not null\").show()\n",
1247+
"\n",
1248+
"# Time-based analysis\n",
1249+
"logs.orderBy(\"ts\").show()"
1250+
]
1251+
},
1252+
{
1253+
"cell_type": "markdown",
1254+
"id": "7eaa72b9",
1255+
"metadata": {},
1256+
"source": [
1257+
"\n"
1258+
]
1259+
},
9031260
{
9041261
"attachments": {},
9051262
"cell_type": "markdown",
