Skip to content

Commit

Permalink
[LIVY-1003][RSC] Interactive session - Setting large value of rsc.ser…
Browse files Browse the repository at this point in the history
…ver.connect.timeout blocks other tasks

## What changes were proposed in this pull request?

The main adjustment here is the thread pool used when creating and closing sessions asynchronously. Scala's default thread pool size is limited, which will cause the waiting thread to be blocked.

https://issues.apache.org/jira/browse/LIVY-1003

## How was this patch tested?

How to reproduce:
1. Set `livy.rsc.server.connect.timeout` to something high like 24h.
2. Create enough interactive livy sessions in YARN so that they are queued in ACCEPTED state. The number of sessions that are stuck in ACCEPTED state should be equal to global execution context [thread pool size](https://docs.scala-lang.org/overviews/core/futures.html#the-global-execution-context) (Runtime.availableProcessors)
3. Try to delete a session using DELETE /sessions/{sessionId}
and it should not be hang until one of the sessions is no longer stuck in ACCEPTED state.
  • Loading branch information
wangdengshan committed Sep 13, 2024
1 parent 6dcb294 commit 6097af1
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 6 deletions.
4 changes: 4 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ object LivyConf {
Entry("livy.server.thrift.async.exec.wait.queue.size", 100)
val THRIFT_ASYNC_EXEC_KEEPALIVE_TIME =
Entry("livy.server.thrift.async.exec.keepalive.time", "10s")
val SESSION_MANAGE_THREADS = Entry("livy.server.session.manage.threads", 200)
val SESSION_MANAGE_SHUTDOWN_TIMEOUT = Entry("livy.server.session.manage.shutdown.timeout", "10s")
val SESSION_MANAGE_WAIT_QUEUE_SIZE = Entry("livy.server.session.manage.wait.queue.size", 100)
val SESSION_MANAGE_KEEPALIVE_TIME = Entry("livy.server.session.manage.keepalive.time", "10s")
val THRIFT_BIND_HOST = Entry("livy.server.thrift.bind.host", null)
val THRIFT_WORKER_KEEPALIVE_TIME = Entry("livy.server.thrift.worker.keepalive.time", "60s")
val THRIFT_MIN_WORKER_THREADS = Entry("livy.server.thrift.min.worker.threads", 5)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,15 +476,20 @@ class InteractiveSession(
info(msg)
sessionLog = IndexedSeq(msg)
} else {
val uriFuture = Future { client.get.getServerUri.get() }
val uriFuture = Future {
client.get.getServerUri.get()
}(sessionManageExecutors)

uriFuture.onSuccess { case url =>
rscDriverUri = Option(url)
sessionSaveLock.synchronized {
sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
}
}
uriFuture.onFailure { case e => warn("Fail to get rsc uri", e) }
}(sessionManageExecutors)

uriFuture.onFailure {
case e => warn("Fail to get rsc uri", e)
}(sessionManageExecutors)

// Send a dummy job that will return once the client is ready to be used, and set the
// state to "idle" at that point.
Expand Down Expand Up @@ -576,7 +581,7 @@ class InteractiveSession(
}
}

def interrupt(): Future[Unit] = {
def interrupt(): Future[AnyVal] = {
stop()
}

Expand Down
34 changes: 32 additions & 2 deletions server/src/main/scala/org/apache/livy/sessions/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.livy.sessions
import java.io.InputStream
import java.net.{URI, URISyntaxException}
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{Executors, LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.UUID

import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -141,6 +142,16 @@ object Session {
}
}

class NamedThreadFactory(prefix: String) extends ThreadFactory {
private val defaultFactory = Executors.defaultThreadFactory()

override def newThread(r: Runnable): Thread = {
val thread = defaultFactory.newThread(r)
thread.setName(prefix + "-" + thread.getName)
thread
}
}

abstract class Session(
val id: Int,
val name: Option[String],
Expand All @@ -159,6 +170,25 @@ abstract class Session(

import Session._

protected val sessionManageExecutors: ExecutionContext = {
val poolSize = livyConf.getInt(LivyConf.SESSION_MANAGE_THREADS)
val poolQueueSize = livyConf.getInt(LivyConf.SESSION_MANAGE_WAIT_QUEUE_SIZE)
val keepAliveTime = livyConf.getTimeAsMs(
LivyConf.SESSION_MANAGE_KEEPALIVE_TIME) / 1000
debug(s"Background session manage executors with size=${poolSize}," +
s" wait queue size= ${poolQueueSize}, keepalive time ${keepAliveTime} seconds")
val queue = new LinkedBlockingQueue[Runnable](poolQueueSize)
val executor = new ThreadPoolExecutor(
poolSize,
poolSize,
keepAliveTime,
TimeUnit.SECONDS,
queue,
new NamedThreadFactory("LivyServer2-SessionManageExecutors"))
executor.allowCoreThreadTimeOut(true)
ExecutionContext.fromExecutorService(executor)
}

protected implicit val executionContext = ExecutionContext.global

// validate session name. The name should not be a number
Expand Down Expand Up @@ -202,7 +232,7 @@ abstract class Session(

def start(): Unit

def stop(): Future[Unit] = Future {
def stop(): Future[AnyVal] = Future {
try {
info(s"Stopping $this...")
stopSession()
Expand All @@ -228,7 +258,7 @@ abstract class Session(
case e: Exception =>
warn(s"Error cleaning up session $id staging dir.", e)
}
}
}(sessionManageExecutors)


override def toString(): String = s"${this.getClass.getSimpleName} $id"
Expand Down

0 comments on commit 6097af1

Please sign in to comment.