Skip to content

Commit

Permalink
[KYUUBI#209][Server] Fix high cpu load due to log capture thread not …
Browse files Browse the repository at this point in the history
…release

fixes #279
Squashed commit of the following:

commit adc8113
Author: ulysses-you <ulyssesyou18@gmail.com>
Date:   Wed Jan 13 14:26:38 2021 +0800

    remove sleep

commit 41ad117
Author: ulysses-you <ulyssesyou18@gmail.com>
Date:   Wed Jan 13 11:58:43 2021 +0800

    comment

commit fa7a7e9
Author: ulysses-you <ulyssesyou18@gmail.com>
Date:   Wed Jan 13 11:54:46 2021 +0800

    add test

commit 9f579e8
Author: ulysses-you <ulyssesyou18@gmail.com>
Date:   Wed Jan 13 11:54:38 2021 +0800

    release log capture thread
  • Loading branch information
ulysses-you authored and yaooqinn committed Jan 13, 2021
1 parent eb31f47 commit 70c28cb
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ trait ProcBuilder {
}

@volatile private var error: Throwable = UNCAUGHT_ERROR
// Visible for test
private[kyuubi] var logCaptureThread: Thread = null

final def start: Process = synchronized {
val procLog = Paths.get(workingDir.toAbsolutePath.toString,
Expand Down Expand Up @@ -86,15 +88,23 @@ trait ProcBuilder {
}
} catch {
case _: IOException =>
case _: InterruptedException =>
} finally {
reader.close()
}
}

PROC_BUILD_LOGGER.newThread(redirect).start()
logCaptureThread = PROC_BUILD_LOGGER.newThread(redirect)
logCaptureThread.start()
proc
}

def close(): Unit = {
if (logCaptureThread != null) {
logCaptureThread.interrupt()
}
}

def getError: Throwable = synchronized {
if (error == UNCAUGHT_ERROR) {
Thread.sleep(3000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,28 +106,35 @@ class KyuubiSessionImpl(
sessionConf.set(SparkProcessBuilder.APP_KEY, boundAppName.toString)
sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace)
val builder = new SparkProcessBuilder(appUser, sessionConf.toSparkPrefixedConf)
val process = builder.start
info(s"Launching SQL engine: $builder")
var sh = getServerHost
val started = System.currentTimeMillis()
var exitValue: Option[Int] = None
while (sh.isEmpty) {
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
exitValue = Some(process.exitValue())
if (exitValue.get != 0) {
throw builder.getError
try {
val process = builder.start
info(s"Launching SQL engine: $builder")
var sh = getServerHost
val started = System.currentTimeMillis()
var exitValue: Option[Int] = None
while (sh.isEmpty) {
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
exitValue = Some(process.exitValue())
if (exitValue.get != 0) {
throw builder.getError
}
}
if (started + timeout <= System.currentTimeMillis()) {
process.destroyForcibly()
throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder",
builder.getError)
}
sh = getServerHost
}
if (started + timeout <= System.currentTimeMillis()) {
process.destroyForcibly()
throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder",
builder.getError)
}
sh = getServerHost
val Some((host, port)) = sh
openSession(host, port)
} finally {
// we must close the process builder whether session open is success or failure since
// we have a log capture thread in process builder.
builder.close()
}
val Some((host, port)) = sh
openSession(host, port)
}

try {
zkClient.close()
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,20 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
assert(!b6.toString.contains("--proxy-user kentyao"))
}
}

test("log capture should release after close") {
val process = new FakeSparkProcessBuilder
try {
val subProcess = process.start
assert(!process.logCaptureThread.isInterrupted)
subProcess.waitFor(3, TimeUnit.SECONDS)
} finally {
process.close()
}
assert(process.logCaptureThread.isInterrupted)
}
}

class FakeSparkProcessBuilder extends SparkProcessBuilder("fake", Map.empty) {
override protected def commands: Array[String] = Array("ls")
}

0 comments on commit 70c28cb

Please sign in to comment.