Skip to content

Commit 1345838

Browse files
committed
[SPARK-54217] Synchronize PythonRunner's MonitorThread kill decision
This diff addresses the synchronization issue described in SPARK-54217 by respecting the existing releasedOrClosed AtomicBoolean in the PythonRunner's kill codepath, which is currently only used in the "released" codepath - not the "closed" one. In doing so, we avoid erroneously destroying a still-healthy Python worker; in the current state, it will be destroyed & a new one will be created.
1 parent 37689bf commit 1345838

File tree

1 file changed

+8
-4
lines changed

1 file changed

+8
-4
lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -325,10 +325,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
325325
// SPARK-35009: avoid creating multiple monitor threads for the same python worker
326326
// and task context
327327
if (PythonRunner.runningMonitorThreads.add(key)) {
328-
new MonitorThread(SparkEnv.get, worker, context).start()
328+
new MonitorThread(SparkEnv.get, worker, context, releasedOrClosed).start()
329329
}
330330
} else {
331-
new MonitorThread(SparkEnv.get, worker, context).start()
331+
new MonitorThread(SparkEnv.get, worker, context, releasedOrClosed).start()
332332
}
333333

334334
// Return an iterator that read lines from the process's stdout
@@ -685,7 +685,11 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
685685
* interrupts disabled. In that case we will need to explicitly kill the worker, otherwise the
686686
* threads can block indefinitely.
687687
*/
688-
class MonitorThread(env: SparkEnv, worker: PythonWorker, context: TaskContext)
688+
class MonitorThread(
689+
env: SparkEnv,
690+
worker: PythonWorker,
691+
context: TaskContext,
692+
releasedOrClosed: AtomicBoolean)
689693
extends Thread(s"Worker Monitor for $pythonExec") {
690694

691695
/** How long to wait before killing the python worker if a task cannot be interrupted. */
@@ -701,7 +705,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
701705
}
702706
if (!context.isCompleted()) {
703707
Thread.sleep(taskKillTimeout)
704-
if (!context.isCompleted()) {
708+
if (!context.isCompleted() && releasedOrClosed.compareAndSet(false, true)) {
705709
try {
706710
// Mimic the task name used in `Executor` to help the user find out the task to blame.
707711
val taskName = s"${context.partitionId()}.${context.attemptNumber()} " +

0 commit comments

Comments
 (0)