Skip to content

Commit

Permalink
[KYUUBI-213] Check and wait until the old Spark context has completely stopped …
Browse files Browse the repository at this point in the history
…before creating a new one (#214)

* [KYUUBI-213] Check and wait until the old Spark context has completely stopped before creating a new one #213

* stop to not stop
  • Loading branch information
yaooqinn committed Aug 26, 2019
1 parent 6621d36 commit 9473943
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSQLUtils}
import org.apache.spark.sql.{DataFrame, SparkSQLUtils}
import org.apache.spark.sql.catalyst.catalog.{FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{AddFileCommand, AddJarCommand, CreateFunctionCommand}
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -193,20 +192,24 @@ class ExecuteStatementInClientMode(session: KyuubiSession, statement: String, ru
m.RUNNING_QUERIES.dec()
m.TOTAL_QUERIES.inc()
}
sparkSession.sparkContext.cancelJobGroup(statementId)
}
}

override protected def onStatementError(id: String, message: String, trace: String): Unit = {
  super.onStatementError(id, message, trace)
  // Skip job-group cancellation once the underlying context has stopped;
  // only cancel while it is still alive.
  val sc = sparkSession.sparkContext
  if (!sc.isStopped) {
    sc.cancelJobGroup(statementId)
  }
  // Propagate the error to the per-user server monitor listener, if any.
  KyuubiServerMonitor.getListener(session.getUserName)
    .foreach(_.onStatementError(id, message, trace))
  // Bump the error-query counter when a metrics system is configured.
  MetricsSystem.get.foreach(_.ERROR_QUERIES.inc)
}

override protected def cleanup(state: OperationState) {
  super.cleanup(state)
  // Cancel this statement's job group only while the context is still running;
  // a stopped context is left untouched.
  val sc = sparkSession.sparkContext
  if (!sc.isStopped) {
    sc.cancelJobGroup(statementId)
  }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import scala.collection.JavaConverters._

import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
import org.apache.spark.KyuubiConf._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import yaooqinn.kyuubi.Logging
Expand Down Expand Up @@ -53,6 +53,18 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
}
}

/**
 * Slightly hacky workaround: Spark sometimes consults the active context for
 * configuration, and stopping a context clears that active reference, which can
 * surface in Kyuubi as a `None.get()` exception. Re-pointing the active context
 * at any still-healthy cached session is neither thread-safe nor a complete fix
 * for that race, but it significantly reduces its probability.
 */
private def resetActiveSparkContext(): Unit = {
  // Pick the first cached session that has not crashed and promote its context.
  for (cache <- userToSession.values().asScala.find(c => !c.isCrashed)) {
    KyuubiSparkUtil.setActiveSparkContext(cache.spark.sparkContext)
  }
}

/**
* Stop the idle [[SparkSession]] instance, then it can be cleared by the `sessionCleaner` or
* when the user reconnecting action.
Expand All @@ -62,6 +74,7 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
if (ssc.isIdle) {
info(s"Stopping idle SparkSession for user [$user]")
ssc.spark.stop()
resetActiveSparkContext()
KyuubiServerMonitor.detachUITab(user)
System.setProperty("SPARK_YARN_MODE", "true")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.concurrent.duration.Duration
import scala.util.Try

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext}
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil._
import org.apache.spark.sql.{SparkSession, SparkSQLUtils}
Expand Down Expand Up @@ -53,7 +53,12 @@ class SparkSessionWithUGI(
/**
 * Thread that asynchronously constructs a new [[SparkContext]] and fulfills
 * `promisedSparkContext` with the result (or the failure, via [[Try]]).
 *
 * If a previous context is still tearing down, a lingering `SparkEnv` is
 * visible; instead of a single blind 2-second sleep, poll (bounded) until the
 * old `SparkEnv` is cleared before constructing the new context. This still
 * cannot guarantee the race is closed — after the timeout we proceed anyway,
 * preserving the original best-effort behavior.
 */
private lazy val newContext: Thread = {
  val threadName = "SparkContext-Starter-" + userName
  new Thread(threadName) {
    override def run(): Unit = promisedSparkContext.complete(Try {
      // Wait up to 10s in 200ms steps for the old context's SparkEnv to go away.
      var waitedMs = 0L
      while (SparkEnv.get != null && waitedMs < 10000L) {
        Thread.sleep(200)
        waitedMs += 200
      }
      new SparkContext(conf)
    })
  }
}

Expand Down

0 comments on commit 9473943

Please sign in to comment.