[LIVY-1007] Livy should not spawn one thread per job to track the job on Kubernetes

## What changes were proposed in this pull request?

Instead of spawning a monitor thread for every session, create a centralised thread to monitor all Kubernetes apps.

JIRA link: https://issues.apache.org/jira/browse/LIVY-1007
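
For illustration, here is a minimal, hypothetical sketch of the pattern this patch adopts: sessions register their app handle in a shared queue, and a small fixed pool of worker threads polls bounded batches of apps on a timer, instead of each session owning a dedicated monitor thread. The names `MonitoredApp`, `CentralizedMonitorSketch`, `register`, and `start`, as well as the pool size and poll interval values, are invented for this example; the actual implementation is in `SparkKubernetesApp.scala` in this commit.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}

// Hypothetical stand-in for a per-session application handle; in Livy this
// role is played by SparkKubernetesApp.
final class MonitoredApp(val tag: String) {
  @volatile var finished = false
  def refreshState(): Unit = {
    // In Livy this would query the Kubernetes API for the driver pod state.
    println(s"polling state of $tag")
  }
}

object CentralizedMonitorSketch {
  private val appQueue = new ConcurrentLinkedQueue[MonitoredApp]()

  def register(app: MonitoredApp): Unit = appQueue.add(app)

  def start(poolSize: Int, pollIntervalMs: Long): Unit = {
    val pool = Executors.newFixedThreadPool(poolSize)
    val worker: Runnable = () => {
      while (!Thread.currentThread().isInterrupted) {
        // Each worker handles a bounded batch per cycle so that no single
        // thread monopolizes the queue.
        val batch = math.max(1, math.min(20, appQueue.size() / poolSize))
        for (_ <- 0 until batch) {
          val app = appQueue.poll()
          if (app != null) {
            app.refreshState()
            if (!app.finished) appQueue.add(app) // re-enqueue apps still running
          }
        }
        Thread.sleep(pollIntervalMs)
      }
    }
    (0 until poolSize).foreach(_ => pool.execute(worker))
  }

  def main(args: Array[String]): Unit = {
    register(new MonitoredApp("spark-app-1"))
    register(new MonitoredApp("spark-app-2"))
    start(poolSize = 2, pollIntervalMs = 500)
    TimeUnit.SECONDS.sleep(3) // let the workers poll a few times, then exit
    sys.exit(0)
  }
}
```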

## How was this patch tested?

**Unit Tests:** The patch has been verified through comprehensive unit tests.
**Manual Testing:** Conducted manual testing using Kubernetes on Docker Desktop.

Test environment: Helm charts. For detailed instructions on testing with Helm charts, see the [livycluster](https://github.com/askhatri/livycluster) documentation.
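
For reference, the new tuning knobs introduced by this patch can be set in `livy.conf`; the values shown here are the defaults added to `conf/livy.conf.template` below.

```
# Maximum number of failed app lookups before the Kubernetes app is considered lost.
livy.server.kubernetes.app-lookup.max-failed.times = 120
# Size of the shared thread pool that monitors all Kubernetes apps.
livy.server.kubernetes.app-lookup.thread-pool.size = 4
```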

Co-authored-by: Asif Khatri <asif.khatri@cloudera.com>
askhatri and Asif Khatri authored Aug 12, 2024
1 parent b089dd6 commit 6dcb294
Showing 4 changed files with 215 additions and 37 deletions.
4 changes: 4 additions & 0 deletions conf/livy.conf.template
@@ -216,6 +216,10 @@

# If Livy can't find the Kubernetes app within this time, consider it lost.
# livy.server.kubernetes.app-lookup-timeout = 600s
# If Livy can't find the Kubernetes app after this many failed lookup attempts, consider it lost.
# livy.server.kubernetes.app-lookup.max-failed.times = 120
# The size of the thread pool used to monitor all Kubernetes apps.
# livy.server.kubernetes.app-lookup.thread-pool.size = 4
# When the cluster is busy, the app may fail to launch within app-lookup-timeout, which would
# leak the session, so Livy needs to check for session leakage.
# How long to check livy session leakage
7 changes: 7 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -271,6 +271,13 @@ object LivyConf {

// If Livy can't find the Kubernetes app within this time, consider it lost.
val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s")
// If Livy can't find the Kubernetes app after this many failed lookup attempts, consider it lost.
val KUBERNETES_APP_LOOKUP_MAX_FAILED_TIMES =
Entry("livy.server.kubernetes.app-lookup.max-failed.times", 120)
// The size of the thread pool used to monitor all Kubernetes apps.
val KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE =
Entry("livy.server.kubernetes.app-lookup.thread-pool.size", 4)

// How often Livy polls Kubernetes to refresh Kubernetes app state.
val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s")

219 changes: 184 additions & 35 deletions server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala
@@ -18,7 +18,7 @@ package org.apache.livy.utils

import java.net.URLEncoder
import java.util.Collections
import java.util.concurrent.TimeoutException
import java.util.concurrent._

import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
@@ -33,12 +33,16 @@ import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressBuilder}
import io.fabric8.kubernetes.client.{Config, ConfigBuilder, _}
import org.apache.commons.lang.StringUtils

import org.apache.livy.{LivyConf, Logging, Utils}
import org.apache.livy.{LivyConf, Logging}

object SparkKubernetesApp extends Logging {

private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()

private val monitorAppThreadMap = new java.util.concurrent.ConcurrentHashMap[Thread, Long]()

private val appQueue = new ConcurrentLinkedQueue[SparkKubernetesApp]()

private val leakedAppsGCThread = new Thread() {
override def run(): Unit = {
import KubernetesExtensions._
@@ -97,6 +101,33 @@
}
}

private val checkMonitorAppTimeoutThread = new Thread() {
override def run(): Unit = {
while (true) {
try {
val iter = monitorAppThreadMap.entrySet().iterator()
val now = System.currentTimeMillis()

while (iter.hasNext) {
val entry = iter.next()
val thread = entry.getKey
val updatedTime = entry.getValue

val remaining: Long = now - updatedTime - pollInterval.toMillis
if (remaining > appLookupTimeout.toMillis) {
thread.interrupt()
}
}

Thread.sleep(pollInterval.toMillis)
} catch {
case e: InterruptedException =>
error("Apps timeout monitoring thread was interrupted.", e)
}
}
}
}

private var livyConf: LivyConf = _

private var cacheLogSize: Int = _
@@ -108,7 +139,10 @@ object SparkKubernetesApp extends Logging {

var kubernetesClient: DefaultKubernetesClient = _

def init(livyConf: LivyConf): Unit = {
private var appLookupThreadPoolSize: Long = _
private var appLookupMaxFailedTimes: Long = _

def init(livyConf: LivyConf, client: Option[KubernetesClient] = None): Unit = {
this.livyConf = livyConf

// KubernetesClient is thread safe. Create once, share it across threads.
@@ -119,6 +153,9 @@
appLookupTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LOOKUP_TIMEOUT).milliseconds
pollInterval = livyConf.getTimeAsMs(LivyConf.KUBERNETES_POLL_INTERVAL).milliseconds

appLookupThreadPoolSize = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE)
appLookupMaxFailedTimes = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_MAX_FAILED_TIMES)

sessionLeakageCheckInterval =
livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL)
sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT)
@@ -131,6 +168,12 @@
setName("RefreshServiceAccountTokenThread")
RefreshServiceAccountTokenThread.setDaemon(true)
RefreshServiceAccountTokenThread.start()

checkMonitorAppTimeoutThread.setDaemon(true)
checkMonitorAppTimeoutThread.setName("CheckMonitorAppTimeoutThread")
checkMonitorAppTimeoutThread.start()

initKubernetesAppMonitorThreadPool(livyConf)
}

// Returning T, throwing the exception on failure
@@ -147,6 +190,53 @@
}
}

class KubernetesAppMonitorRunnable extends Runnable {
override def run(): Unit = {
while (true) {
try {
val poolSize = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE)
var numberOfAppsToProcess = appQueue.size() / poolSize
if (numberOfAppsToProcess < 1) {
numberOfAppsToProcess = 1
} else if (numberOfAppsToProcess > 20) {
numberOfAppsToProcess = 20
}
for (_ <- 0 until numberOfAppsToProcess) {
// Record the time before monitoring each app so that
// checkMonitorAppTimeoutThread can detect whether this thread is blocked on monitoring
monitorAppThreadMap.put(Thread.currentThread(), System.currentTimeMillis())
val app = appQueue.poll()
if (app != null) {
app.monitorSparkKubernetesApp()
if (app.isRunning) {
appQueue.add(app)
}
}
}
Thread.sleep(pollInterval.toMillis)
} catch {
case e: InterruptedException =>
error(s"Kubernetes app monitoring was interrupted.", e)
}
}
}
}

private def initKubernetesAppMonitorThreadPool(livyConf: LivyConf): Unit = {
val poolSize = livyConf.getInt(LivyConf.KUBERNETES_APP_LOOKUP_THREAD_POOL_SIZE)
val KubernetesAppMonitorThreadPool: ExecutorService =
Executors.newFixedThreadPool(poolSize)

val runnable = new KubernetesAppMonitorRunnable()

for (_ <- 0 until poolSize) {
KubernetesAppMonitorThreadPool.execute(runnable)
}
}

def getAppSize: Int = appQueue.size()

def clearApps(): Unit = appQueue.clear()
}

class SparkKubernetesApp private[utils] (
@@ -162,26 +252,59 @@
import KubernetesExtensions._
import SparkKubernetesApp._

appQueue.add(this)
private var killed = false
private val appPromise: Promise[KubernetesApplication] = Promise()
private[utils] var state: SparkApp.State = SparkApp.State.STARTING
private var kubernetesDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
private var kubernetesAppLog: IndexedSeq[String] = IndexedSeq.empty[String]

// Exposed for unit test.
// TODO Instead of spawning a thread for every session, create a centralized thread and
// batch Kubernetes queries.
private[utils] val kubernetesAppMonitorThread = Utils
.startDaemonThread(s"kubernetesAppMonitorThread-$this") {
private var kubernetesTagToAppIdFailedTimes: Int = _
private var kubernetesAppMonitorFailedTimes: Int = _

private def failToMonitor(): Unit = {
changeState(SparkApp.State.FAILED)
process.foreach(_.destroy())
leakedAppTags.put(appTag, System.currentTimeMillis())
}

private def failToGetAppId(): Unit = {
kubernetesTagToAppIdFailedTimes += 1
if (kubernetesTagToAppIdFailedTimes > appLookupMaxFailedTimes) {
val msg = "No KUBERNETES application is found with tag " +
s"${appTag.toLowerCase}. This may be because " +
"1) spark-submit fail to submit application to KUBERNETES; " +
"or 2) KUBERNETES cluster doesn't have enough resource to start the application in time. " +
"Please check Livy log and KUBERNETES log to know the details."

error(s"Failed monitoring the app $appTag: $msg")
kubernetesDiagnostics = ArrayBuffer(msg)
failToMonitor()
}
}

private def monitorSparkKubernetesApp(): Unit = {
try {
if (killed) {
changeState(SparkApp.State.KILLED)
} else if (isProcessErrExit) {
changeState(SparkApp.State.FAILED)
}
// Get KubernetesApplication by appTag.
val app: KubernetesApplication = try {
val appOption: Option[KubernetesApplication] = try {
getAppFromTag(appTag, pollInterval, appLookupTimeout.fromNow)
} catch {
case e: Exception =>
failToGetAppId()
appPromise.failure(e)
throw e
return
}
if (appOption.isEmpty) {
failToGetAppId()
return
}
appPromise.success(app)
val app: KubernetesApplication = appOption.get
appPromise.trySuccess(app)
val appId = app.getApplicationId

Thread.currentThread().setName(s"kubernetesAppMonitorThread-$appId")
@@ -192,7 +315,9 @@
}

var appInfo = AppInfo()
while (isRunning) {

// The while loop is replaced with an "if" so that this thread can move on; an app that is
// still running is re-enqueued and picked up again by another pool thread
if (isRunning) {
try {
Clock.sleep(pollInterval.toMillis)

@@ -222,14 +347,22 @@
} catch {
// TODO analyse available exceptions
case e: Throwable =>
throw e
error(s"Failed to refresh application state for $appTag.", e)
}
}

kubernetesTagToAppIdFailedTimes = 0
kubernetesAppMonitorFailedTimes = 0
debug(s"$appId $state ${kubernetesDiagnostics.mkString(" ")}")
Thread.currentThread().setName(s"appMonitorCommonThreadPool")
} catch {
case _: InterruptedException =>
kubernetesDiagnostics = ArrayBuffer("Application stopped by user.")
changeState(SparkApp.State.KILLED)
case e: InterruptedException =>
kubernetesAppMonitorFailedTimes += 1
if (kubernetesAppMonitorFailedTimes > appLookupMaxFailedTimes) {
error(s"Monitoring of the app $appTag was interrupted.", e)
kubernetesDiagnostics = ArrayBuffer(e.getMessage)
failToMonitor()
}
case NonFatal(e) =>
error(s"Error while refreshing Kubernetes state", e)
kubernetesDiagnostics = ArrayBuffer(e.getMessage)
@@ -250,18 +383,38 @@
("\nKubernetes Diagnostics: " +: kubernetesDiagnostics)

override def kill(): Unit = synchronized {
try {
withRetry(kubernetesClient.killApplication(Await.result(appPromise.future, appLookupTimeout)))
} catch {
// We cannot kill the Kubernetes app without the appTag.
// There's a chance the Kubernetes app hasn't been submitted during a livy-server failure.
// We don't want a stuck session that can't be deleted. Emit a warning and move on.
case _: TimeoutException | _: InterruptedException =>
warn("Deleting a session while its Kubernetes application is not found.")
kubernetesAppMonitorThread.interrupt()
} finally {
process.foreach(_.destroy())
killed = true

if (!isRunning) {
return
}

process.foreach(_.destroy())

def applicationDetails: Option[Try[KubernetesApplication]] = appPromise.future.value
if (applicationDetails.isEmpty) {
leakedAppTags.put(appTag, System.currentTimeMillis())
return
}
def kubernetesApplication: KubernetesApplication = applicationDetails.get.get
if (kubernetesApplication != null && kubernetesApplication.getApplicationId != null) {
try {
withRetry(kubernetesClient.killApplication(
Await.result(appPromise.future, appLookupTimeout)))
} catch {
// We cannot kill the Kubernetes app without the appTag.
// There's a chance the Kubernetes app hasn't been submitted during a livy-server failure.
// We don't want a stuck session that can't be deleted. Emit a warning and move on.
case _: TimeoutException | _: InterruptedException =>
warn("Deleting a session while its Kubernetes application is not found.")
}
} else {
leakedAppTags.put(appTag, System.currentTimeMillis())
}
}

private def isProcessErrExit: Boolean = {
process.isDefined && !process.get.isAlive && process.get.exitValue() != 0
}

private def isRunning: Boolean = {
@@ -282,18 +435,17 @@
*
* @param appTag The application tag tagged on the target application.
* If the tag is not unique, it returns the first application it found.
* @return KubernetesApplication or the failure.
* @return Option[KubernetesApplication] or the failure.
*/
@tailrec
private def getAppFromTag(
appTag: String,
pollInterval: duration.Duration,
deadline: Deadline): KubernetesApplication = {
deadline: Deadline): Option[KubernetesApplication] = {
import KubernetesExtensions._

withRetry(kubernetesClient.getApplications().find(_.getApplicationTag.contains(appTag)))
match {
case Some(app) => app
case Some(app) => Some(app)
case None =>
if (deadline.isOverdue) {
process.foreach(_.destroy())
@@ -307,10 +459,7 @@
throw new IllegalStateException(s"Failed to submit Kubernetes application with tag" +
s" $appTag. 'spark-submit' exited with non-zero status. " +
s"Please check Livy log and Kubernetes log to know the details.")
} else {
Clock.sleep(pollInterval.toMillis)
getAppFromTag(appTag, pollInterval, deadline)
}
} else None
}
}

22 changes: 20 additions & 2 deletions server/src/test/scala/org/apache/livy/utils/SparkKubernetesAppSpec.scala
@@ -20,14 +20,32 @@ import java.util.Objects._

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.api.model.networking.v1.{Ingress, IngressRule, IngressSpec}
import io.fabric8.kubernetes.client.KubernetesClient
import org.mockito.Mockito.when
import org.scalatest.FunSpec
import org.scalatest.{BeforeAndAfterAll, FunSpec}
import org.scalatestplus.mockito.MockitoSugar._

import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
import org.apache.livy.utils.KubernetesConstants.SPARK_APP_TAG_LABEL

class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite {
class SparkKubernetesAppSpec extends FunSpec with LivyBaseUnitTestSuite with BeforeAndAfterAll {

override def beforeAll(): Unit = {
super.beforeAll()
val livyConf = new LivyConf()
livyConf.set(LivyConf.KUBERNETES_POLL_INTERVAL, "500ms")
livyConf.set(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL, "100ms")
livyConf.set(LivyConf.KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT, "1000ms")

val client = mock[KubernetesClient]
SparkKubernetesApp.init(livyConf, Some(client))
SparkKubernetesApp.clearApps
}

override def afterAll(): Unit = {
super.afterAll()
assert(SparkKubernetesApp.getAppSize === 0)
}

describe("KubernetesAppReport") {
import scala.collection.JavaConverters._
