From 947394350f46093378645077dc219f89d3aeece4 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 26 Aug 2019 15:04:04 +0800 Subject: [PATCH] [KYUUBI-213] Check and wait the old spark context completely stopped before create a new one (#214) * [KYUUBI-213] Check and wait the old spark context completely stopped before create a new one #213 * stop to not stop --- .../statement/ExecuteStatementInClientMode.scala | 11 +++++++---- .../kyuubi/spark/SparkSessionCacheManager.scala | 15 ++++++++++++++- .../kyuubi/spark/SparkSessionWithUGI.scala | 9 +++++++-- 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementInClientMode.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementInClientMode.scala index 1538d5662e5..0af21cc6c14 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementInClientMode.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementInClientMode.scala @@ -26,9 +26,8 @@ import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.{FileUtil, Path} import org.apache.spark.KyuubiConf._ import org.apache.spark.KyuubiSparkUtil -import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSQLUtils} +import org.apache.spark.sql.{DataFrame, SparkSQLUtils} import org.apache.spark.sql.catalyst.catalog.{FileResource, FunctionResource, JarResource} -import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.{AddFileCommand, AddJarCommand, CreateFunctionCommand} import org.apache.spark.sql.types._ @@ -193,12 +192,14 @@ class ExecuteStatementInClientMode(session: KyuubiSession, statement: String, ru m.RUNNING_QUERIES.dec() m.TOTAL_QUERIES.inc() } - sparkSession.sparkContext.cancelJobGroup(statementId) } } override protected def onStatementError(id: String, 
message: String, trace: String): Unit = { super.onStatementError(id, message, trace) + if (!sparkSession.sparkContext.isStopped) { + sparkSession.sparkContext.cancelJobGroup(statementId) + } KyuubiServerMonitor.getListener(session.getUserName) .foreach(_.onStatementError(id, message, trace)) MetricsSystem.get.foreach(_.ERROR_QUERIES.inc) @@ -206,7 +207,9 @@ class ExecuteStatementInClientMode(session: KyuubiSession, statement: String, ru override protected def cleanup(state: OperationState) { super.cleanup(state) - sparkSession.sparkContext.cancelJobGroup(statementId) + if (!sparkSession.sparkContext.isStopped) { + sparkSession.sparkContext.cancelJobGroup(statementId) + } } } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionCacheManager.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionCacheManager.scala index 8012d34b2b7..c307940d30c 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionCacheManager.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionCacheManager.scala @@ -22,8 +22,8 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit} import scala.collection.JavaConverters._ import com.google.common.util.concurrent.ThreadFactoryBuilder +import org.apache.spark.{KyuubiSparkUtil, SparkConf} import org.apache.spark.KyuubiConf._ -import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import yaooqinn.kyuubi.Logging @@ -53,6 +53,18 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam } } + /** + * This is a little bit hacky. Spark sometimes calls the active context to get configuration, + * and when that context stops, Spark removes it as the active context, so Kyuubi may then + * encounter a `None.get()` exception. This is neither a thread-safe nor a complete solution + * for that issue, but it can significantly reduce the probability of hitting it. 
+ */ + private def resetActiveSparkContext(): Unit = { + userToSession.values().asScala.find(!_.isCrashed).foreach { ssc => + KyuubiSparkUtil.setActiveSparkContext(ssc.spark.sparkContext) + } + } + /** * Stop the idle [[SparkSession]] instance, then it can be cleared by the `sessionCleaner` or * when the user reconnecting action. @@ -62,6 +74,7 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam if (ssc.isIdle) { info(s"Stopping idle SparkSession for user [$user]") ssc.spark.stop() + resetActiveSparkContext() KyuubiServerMonitor.detachUITab(user) System.setProperty("SPARK_YARN_MODE", "true") } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala index af424ac51ca..936ec3f90a1 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala @@ -25,7 +25,7 @@ import scala.concurrent.duration.Duration import scala.util.Try import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext} +import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkEnv} import org.apache.spark.KyuubiConf._ import org.apache.spark.KyuubiSparkUtil._ import org.apache.spark.sql.{SparkSession, SparkSQLUtils} @@ -53,7 +53,12 @@ class SparkSessionWithUGI( private lazy val newContext: Thread = { val threadName = "SparkContext-Starter-" + userName new Thread(threadName) { - override def run(): Unit = promisedSparkContext.complete(Try(new SparkContext(conf))) + override def run(): Unit = promisedSparkContext.complete(Try { + if (SparkEnv.get != null) { + Thread.sleep(2000) + } + new SparkContext(conf) + }) } }