diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala index ead543924b6..4d7a5e65cb5 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala @@ -56,6 +56,8 @@ trait ProcBuilder { } @volatile private var error: Throwable = UNCAUGHT_ERROR + // Visible for test + private[kyuubi] var logCaptureThread: Thread = null final def start: Process = synchronized { val procLog = Paths.get(workingDir.toAbsolutePath.toString, @@ -86,15 +88,23 @@ trait ProcBuilder { } } catch { case _: IOException => + case _: InterruptedException => } finally { reader.close() } } - PROC_BUILD_LOGGER.newThread(redirect).start() + logCaptureThread = PROC_BUILD_LOGGER.newThread(redirect) + logCaptureThread.start() proc } + def close(): Unit = { + if (logCaptureThread != null) { + logCaptureThread.interrupt() + } + } + def getError: Throwable = synchronized { if (error == UNCAUGHT_ERROR) { Thread.sleep(3000) diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index a791d6635f9..0ec5de17c69 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -106,28 +106,35 @@ class KyuubiSessionImpl( sessionConf.set(SparkProcessBuilder.APP_KEY, boundAppName.toString) sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace) val builder = new SparkProcessBuilder(appUser, sessionConf.toSparkPrefixedConf) - val process = builder.start - info(s"Launching SQL engine: $builder") - var sh = getServerHost - val started = System.currentTimeMillis() - var exitValue: Option[Int] = None - while (sh.isEmpty) { - if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) { - exitValue = Some(process.exitValue()) - if (exitValue.get != 0) { - throw builder.getError + try { + val process = builder.start + info(s"Launching SQL engine: $builder") + var sh = getServerHost + val started = System.currentTimeMillis() + var exitValue: Option[Int] = None + while (sh.isEmpty) { + if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) { + exitValue = Some(process.exitValue()) + if (exitValue.get != 0) { + throw builder.getError + } } + if (started + timeout <= System.currentTimeMillis()) { + process.destroyForcibly() + throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder", + builder.getError) + } + sh = getServerHost } - if (started + timeout <= System.currentTimeMillis()) { - process.destroyForcibly() - throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder", - builder.getError) - } - sh = getServerHost + val Some((host, port)) = sh + openSession(host, port) + } finally { + // we must close the process builder whether session open is success or failure since + // we have a log capture thread in process builder. + builder.close() } - val Some((host, port)) = sh - openSession(host, port) } + try { zkClient.close() } catch { diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala index cd54ab9fc24..4cc5ba4a7e8 100644 --- a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala +++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala @@ -99,4 +99,20 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper { assert(!b6.toString.contains("--proxy-user kentyao")) } } + + test("log capture should release after close") { + val process = new FakeSparkProcessBuilder + try { + val subProcess = process.start + assert(!process.logCaptureThread.isInterrupted) + subProcess.waitFor(3, TimeUnit.SECONDS) + } finally { + process.close() + } + assert(process.logCaptureThread.isInterrupted) + } +} + +class FakeSparkProcessBuilder extends SparkProcessBuilder("fake", Map.empty) { + override protected def commands: Array[String] = Array("ls") }