diff --git a/docs/configurations.md b/docs/configurations.md index 9a30f8924d7..486aa35e477 100644 --- a/docs/configurations.md +++ b/docs/configurations.md @@ -85,10 +85,9 @@ spark.kyuubi.
backend.session.wait.other.times | 60 | How many times to check whether the other thread with the same user has completed SparkContext instantiation. spark.kyuubi.
backend.session.wait.other.interval|1s|The interval for checking whether other thread with the same user has completed SparkContext instantiation. spark.kyuubi.
backend.session.init.timeout|60s|How long we suggest the server to give up instantiating SparkContext. spark.kyuubi.
backend.session.check.interval|5min|The check interval for backend session a.k.a SparkSession timeout. -spark.kyuubi.
backend.session.idle.timeout|30min|SparkSession timeout. +spark.kyuubi.
backend.session.idle.timeout|30min|How long the SparkSession instance will be cached after the user logs out. Reusing a cached SparkSession can significantly cut the startup time of a SparkContext, which matters for short-lived queries. The timeout is counted from the moment all sessions of the user are disconnected. +spark.kyuubi.
backend.session.max.cache.time|5d|Max cache time for a SparkSession instance, counted from when its original copy was created. Because `spark.kyuubi.backend.session.idle.timeout` may never be reached for a user who continuously runs queries, this configuration is needed to stop the cached SparkSession, which could otherwise hit token expiry issues in kerberized clusters. While the cache age is within the interval [t, t * 1.25], we try to stop the SparkSession gracefully once it has no connections; if it is not stopped within that interval, we force-stop it. spark.kyuubi.
backend.session.local.dir|KYUUBI_HOME/
local|Default value to set `spark.local.dir`. For YARN mode, this only affect the Kyuubi server side settings according to the rule of Spark treating `spark.local.dir`. -spark.kyuubi.
backend.session.long.cache|${UserGroupInformation.
isSecurityEnabled}|Whether to update the tokens of Spark's executor to support long caching SparkSessions iff this is true && `spark.kyuubi.backend.token.update.class` is loadable. This is used towards kerberized hadoop clusters in case of `spark.kyuubi.backend.session.idle.timeout` is set longer than token expiration time limit or SparkSession never idles. -spark.kyuubi.
backend.session.token.update.class|org.apache.spark.
scheduler.cluster.
CoarseGrainedClusterMessages$
UpdateDelegationTokens|`CoarseGrainedClusterMessages` for token update message from the driver of Spark to executors, it is loadable only by higher version Spark release(2.3 and later) #### Operation diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala index ba733e9a3ea..cb3549e0f7b 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala @@ -280,10 +280,24 @@ object KyuubiConf { val BACKEND_SESSION_IDLE_TIMEOUT: ConfigEntry[Long] = KyuubiConfigBuilder("spark.kyuubi.backend.session.idle.timeout") - .doc("SparkSession timeout") + .doc("How long the SparkSession instance will be cached after the user logs out. Reusing a" + + " cached SparkSession can significantly cut the startup time of a SparkContext, which" + + " matters for short-lived queries. The timeout is counted from the moment all sessions" + + " of the user are disconnected") .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(TimeUnit.MINUTES.toMillis(30L)) + val BACKEND_SESSION_MAX_CACHE_TIME: ConfigEntry[Long] = + KyuubiConfigBuilder("spark.kyuubi.backend.session.max.cache.time") + .doc("Max cache time for a SparkSession instance, counted from when its original copy was" + + " created. Because `spark.kyuubi.backend.session.idle.timeout` may never be reached for a" + + " user who continuously runs queries, this configuration is needed to stop the cached" + + " SparkSession, which could otherwise hit token expiry issues in kerberized clusters." + + " While the cache age is within the interval [t, t * 1.25], we try to stop the" + + " SparkSession gracefully once it has no connections; if it is not stopped within that" + + " interval, we force-stop it") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(TimeUnit.DAYS.toMillis(5)) + val BACKEND_SESSION_LOCAL_DIR: ConfigEntry[String] = KyuubiConfigBuilder("spark.kyuubi.backend.session.local.dir") .doc("Default value to set `spark.local.dir`, for YARN mode, this only affect the Kyuubi" + @@ -293,24 +307,6 @@ object KyuubiConf { s"${sys.env.getOrElse("KYUUBI_HOME", System.getProperty("java.io.tmpdir"))}" + File.separator + "local") - val BACKEND_SESSION_LONG_CACHE: ConfigEntry[Boolean] = - KyuubiConfigBuilder("spark.kyuubi.backend.session.long.cache") - .doc("Whether to update the tokens of Spark's executor to support long caching" + - " SparkSessions iff this is true && `spark.kyuubi.backend.token.update.class` is" + - " loadable. 
This is used towards kerberized hadoop clusters in case of" + - " `spark.kyuubi.backend.session.idle.timeout` is set longer than token expiration time" + - " limit or SparkSession never idles.") - .booleanConf - .createWithDefault(UserGroupInformation.isSecurityEnabled) - - val BACKEND_SESSION_TOKEN_UPDATE_CLASS: ConfigEntry[String] = - KyuubiConfigBuilder("spark.kyuubi.backend.session.token.update.class") - .doc("`CoarseGrainedClusterMessages` for token update message from the driver of Spark to" + - " executors, it is loadable only by higher version Spark release(2.3 and later)") - .stringConf - .createWithDefault( - "org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$UpdateDelegationTokens") - ///////////////////////////////////////////////////////////////////////////////////////////////// // Authentication // ///////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala index 4bbac8e87fe..f20dd7ee99c 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala @@ -269,11 +269,6 @@ object KyuubiSparkUtil extends Logging { loader } - /** Determines whether the provided class is loadable in the current thread. */ - def classIsLoadable(clazz: String): Boolean = { - Try { Class.forName(clazz, false, getContextOrSparkClassLoader) }.isSuccess - } - /** * Generate proper configurations before server starts * @param conf the default [[SparkConf]] diff --git a/kyuubi-server/src/main/scala/org/apache/spark/scheduler/cluster/KyuubiSparkExecutorUtils.scala b/kyuubi-server/src/main/scala/org/apache/spark/scheduler/cluster/KyuubiSparkExecutorUtils.scala deleted file mode 100644 index b34ad97fccf..00000000000 --- a/kyuubi-server/src/main/scala/org/apache/spark/scheduler/cluster/KyuubiSparkExecutorUtils.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import java.io.{ByteArrayOutputStream, DataOutputStream} - -import scala.collection.mutable -import scala.util.control.NonFatal - -import org.apache.hadoop.security.{Credentials, UserGroupInformation} -import org.apache.spark.KyuubiConf._ -import org.apache.spark.SparkContext - -import yaooqinn.kyuubi.Logging -import yaooqinn.kyuubi.utils.ReflectUtils._ - -/** - * Tool for methods used for Kyuubi to talking to Spark Executors - */ -object KyuubiSparkExecutorUtils extends Logging { - - /** - * Populate the tokens contained in the current KyuubiSession's ugi to the all the alive - * executors associated with its own SparkContext. 
- * - * @param sc The SparkContext with its runtime environment which contains all the executors, - * associated with the current KyuubiSession and UserGroupInformation. - * @param user the UserGroupInformation associated with the current KyuubiSession - */ - def populateTokens(sc: SparkContext, user: UserGroupInformation): Unit = { - populateTokens(sc, user.getCredentials) - } - - /** - * Populate the tokens contained in the current KyuubiSession's ugi to the all the alive - * executors associated with its own SparkContext. - */ - def populateTokens(sc: SparkContext, creds: Credentials): Unit = { - val schedulerBackend = sc.schedulerBackend - schedulerBackend match { - case backend: CoarseGrainedSchedulerBackend => - try { - val byteStream = new ByteArrayOutputStream - val dataStream = new DataOutputStream(byteStream) - creds.writeTokenStorageToStream(dataStream) - val tokens = byteStream.toByteArray - val executorField = - classOf[CoarseGrainedSchedulerBackend].getName.replace('.', '$') + "$$executorDataMap" - val executors = backend match { - case _: YarnClientSchedulerBackend | _: YarnClusterSchedulerBackend | - _: StandaloneSchedulerBackend => - getAncestorField(backend, 2, executorField) - case _ => getFieldValue(backend, executorField) - } - val msg = newInstance(sc.conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS), - Seq(classOf[Array[Byte]]), Seq(tokens)) - executors.asInstanceOf[mutable.HashMap[String, ExecutorData]] - .values.foreach(_.executorEndpoint.send(msg)) - } catch { - case NonFatal(e) => warn(s"Failed to populate secured tokens to executors", e) - } - case _ => - } - } -} diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index dc5d7c0e02b..d0bce0f5663 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -34,7 +34,6 @@ import org.apache.hadoop.hive.ql.session.OperationLog import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.KyuubiConf._ import org.apache.spark.KyuubiSparkUtil -import org.apache.spark.scheduler.cluster.KyuubiSparkExecutorUtils import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils} import org.apache.spark.sql.catalyst.catalog.{FileResource, FunctionResource, JarResource} import org.apache.spark.sql.catalyst.parser.ParseException @@ -376,10 +375,6 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging _.onStatementParsed(statementId, result.queryExecution.toString()) } - if (conf.get(BACKEND_SESSION_LONG_CACHE).toBoolean && - KyuubiSparkUtil.classIsLoadable(conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS))) { - KyuubiSparkExecutorUtils.populateTokens(sparkSession.sparkContext, session.ugi) - } debug(result.queryExecution.toString()) iter = if (incrementalCollect) { val numParts = result.rdd.getNumPartitions diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionCache.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionCache.scala new file mode 100644 index 00000000000..3eb4dc2297c --- /dev/null +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionCache.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaooqinn.kyuubi.spark + +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.spark.KyuubiConf._ +import org.apache.spark.KyuubiSparkUtil +import org.apache.spark.sql.SparkSession + +/** + * A recorder of how many client sessions have been cloned from the original [[SparkSession]], + * which helps the [[SparkSessionCacheManager]] cache and recycle [[SparkSession]] instances. + * + * @param spark the original [[SparkSession]] instance + * @param times how many times the original [[SparkSession]] instance has been cloned, starting from 1 + * @param initTime start time of the SparkSession + */ +private[spark] +class SparkSessionCache private(val spark: SparkSession, times: AtomicInteger, initTime: Long) { + + private val maxCacheTime = + KyuubiSparkUtil.timeStringAsMs(spark.conf.get(BACKEND_SESSION_MAX_CACHE_TIME)) + + private val idleTimeout: Long = + KyuubiSparkUtil.timeStringAsMs(spark.conf.get(BACKEND_SESSION_IDLE_TIMEOUT)) + + @volatile + private var latestLogout: Long = Long.MaxValue + + def updateLogoutTime(time: Long): Unit = latestLogout = time + + /** + * Whether all connections are disconnected and the idle timeout has elapsed since the user + * last logged out. + * + */ + def isIdle: Boolean = { + times.get <= 0 && System.currentTimeMillis - latestLogout > idleTimeout + } + + /** + * Whether the cached [[SparkSession]] instance is already stopped. + */ + def isCrashed: Boolean = spark.sparkContext.isStopped + + /** + * If the cache age is within [maxCacheTime, maxCacheTime * 1.25], we will stop this + * SparkSession only when all connections are disconnected. + * If the cache age exceeds maxCacheTime * 1.25, we will stop this SparkSession regardless of + * whether it still has connections or running jobs. 
+ * + */ + def isExpired: Boolean = { + val now = System.currentTimeMillis + (now - initTime >= maxCacheTime && times.get <= 0 ) || (now - initTime > maxCacheTime * 1.25) + } + + def needClear: Boolean = isCrashed || isExpired + + def getReuseTimes: Int = times.get() + + def incReuseTimeAndGet: Int = times.incrementAndGet() + + def decReuseTimeAndGet: Int = times.decrementAndGet() +} + +object SparkSessionCache { + def init(spark: SparkSession): SparkSessionCache = + new SparkSessionCache(spark, new AtomicInteger(1), System.currentTimeMillis) +} diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionCacheManager.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionCacheManager.scala index 5b617a610c5..c109a426795 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionCacheManager.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionCacheManager.scala @@ -17,24 +17,22 @@ package yaooqinn.kyuubi.spark -import java.text.SimpleDateFormat -import java.util.Date import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit} -import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import com.google.common.util.concurrent.ThreadFactoryBuilder -import org.apache.hadoop.security.Credentials -import org.apache.spark.{KyuubiSparkUtil, SparkConf} import org.apache.spark.KyuubiConf._ -import org.apache.spark.scheduler.cluster.KyuubiSparkExecutorUtils +import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import yaooqinn.kyuubi.Logging import yaooqinn.kyuubi.service.AbstractService import yaooqinn.kyuubi.ui.KyuubiServerMonitor +/** + * Manager for cached [[SparkSession]]s. + */ class SparkSessionCacheManager private(name: String) extends AbstractService(name) with Logging { def this() = this(classOf[SparkSessionCacheManager].getSimpleName) @@ -44,87 +42,81 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam new ThreadFactoryBuilder() .setDaemon(true).setNameFormat(getClass.getSimpleName + "-%d").build()) - private val userToSession = new ConcurrentHashMap[String, (SparkSession, AtomicInteger)] - private val userLatestLogout = new ConcurrentHashMap[String, Long] - private var idleTimeout: Long = _ - - private val userToCredentials = new ConcurrentHashMap[String, Credentials] - private var needPopulateToken: Boolean = _ - + private val userToSession = new ConcurrentHashMap[String, SparkSessionCache] private val sessionCleaner = new Runnable { override def run(): Unit = { userToSession.asScala.foreach { - case (user, (session, _)) if session.sparkContext.isStopped => - warn(s"SparkSession for $user might already be stopped outside Kyuubi," + - s" cleaning it..") - removeSparkSession(user) - case (user, (session, times)) if times.get() > 0 || !userLatestLogout.containsKey(user) => - debug(s"There are $times active connection(s) bound to the SparkSession instance" + - s" of $user ") - if (needPopulateToken) { - val credentials = userToCredentials.getOrDefault(user, new Credentials) - KyuubiSparkExecutorUtils.populateTokens(session.sparkContext, credentials) - } - case (user, (session, _)) - if userLatestLogout.get(user) + idleTimeout <= System.currentTimeMillis() => - info(s"Stopping idle SparkSession for user [$user].") - removeSparkSession(user) - session.stop() - System.setProperty("SPARK_YARN_MODE", "true") - case (user, (session, _)) => - if (needPopulateToken) { - val credentials = userToCredentials.getOrDefault(user, new 
Credentials) - KyuubiSparkExecutorUtils.populateTokens(session.sparkContext, credentials) - } + case (user, ssc) if ssc.isCrashed => removeSparkSession(user, doCheck = true) + case (user, ssc) => tryStopIdleCache(user, ssc) } } } - private def removeSparkSession(user: String): Unit = { - Option(userLatestLogout.remove(user)) match { - case Some(t) => info(s"User [$user] last time logout at " + - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(t))) - case _ => + /** + * Stop the idle [[SparkSession]] instance so that it can be cleared by the `sessionCleaner` + * or replaced when the user reconnects. + * + */ + private def tryStopIdleCache(user: String, ssc: SparkSessionCache): Unit = this.synchronized { + if (ssc.isIdle) { + info(s"Stopping idle SparkSession for user [$user]") + ssc.spark.stop() + KyuubiServerMonitor.detachUITab(user) + System.setProperty("SPARK_YARN_MODE", "true") + } + } + + private def removeSparkSession(user: String, doCheck: Boolean = false): Unit = this.synchronized { + if (doCheck) { + // When removing a SparkSession from the sessionCleaner thread, double check that the + // SparkSessionCache has not already been removed or recreated in the meantime. + val cache = userToSession.get(user) + if (cache != null && cache.isCrashed) { + info(s"Cleaning stopped SparkSession for user [$user]") + userToSession.remove(user) + KyuubiServerMonitor.detachUITab(user) + } + } else { + val cache = userToSession.remove(user) + cache.spark.stop() + KyuubiServerMonitor.detachUITab(user) + System.setProperty("SPARK_YARN_MODE", "true") + } - userToSession.remove(user) - KyuubiServerMonitor.detachUITab(user) } def set(user: String, sparkSession: SparkSession): Unit = { - userToSession.put(user, (sparkSession, new AtomicInteger(1))) + val sessionCache = SparkSessionCache.init(sparkSession) + userToSession.put(user, sessionCache) } - def getAndIncrease(user: String): Option[SparkSession] = { + def getAndIncrease(user: String): Option[SparkSessionCache] = this.synchronized { Option(userToSession.get(user)) match { - case Some((ss, times)) if !ss.sparkContext.isStopped => - val currentTime = times.incrementAndGet() + case Some(ssc) if ssc.needClear => + removeSparkSession(user) + info(s"SparkSession for [$user] needs to be cleared, will create a new one") + None + case Some(ssc) if !ssc.needClear => + val currentTime = ssc.incReuseTimeAndGet info(s"SparkSession for [$user] is reused for $currentTime time(s) after + 1") - Some(ss) + Some(ssc) case _ => - info(s"SparkSession for [$user] isn't cached, will create a new one.") + info(s"SparkSession for [$user] isn't cached, will create a new one") None } } def decrease(user: String): Unit = { Option(userToSession.get(user)) match { - case Some((ss, times)) if !ss.sparkContext.isStopped => - userLatestLogout.put(user, System.currentTimeMillis()) - val currentTime = times.decrementAndGet() + case Some(ssc) => + ssc.updateLogoutTime(System.currentTimeMillis) + val currentTime = ssc.decReuseTimeAndGet info(s"SparkSession for [$user] is reused for $currentTime time(s) after - 1") case _ => warn(s"SparkSession for [$user] was not found in the cache.") } } - def setupCredentials(user: String, creds: Credentials): Unit = { - userToCredentials.put(user, creds) - } - override def init(conf: SparkConf): Unit = { - idleTimeout = math.max(conf.getTimeAsMs(BACKEND_SESSION_IDLE_TIMEOUT.key), 60 * 1000) - needPopulateToken = conf.get(BACKEND_SESSION_LONG_CACHE).toBoolean && - KyuubiSparkUtil.classIsLoadable(conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS)) super.init(conf) } @@ 
-132,7 +124,7 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam * Periodically close idle SparkSessions in 'spark.kyuubi.session.clean.interval(default 1min)' */ override def start(): Unit = { - val interval = math.max(conf.getTimeAsSeconds(BACKEND_SESSION_CHECK_INTERVAL.key), 1) + val interval = math.max(conf.getTimeAsSeconds(BACKEND_SESSION_CHECK_INTERVAL), 1) info(s"Scheduling SparkSession cache cleaning every $interval seconds") cacheManager.scheduleAtFixedRate(sessionCleaner, interval, interval, TimeUnit.SECONDS) super.start() @@ -141,7 +133,7 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam override def stop(): Unit = { info("Stopping SparkSession Cache Manager") cacheManager.shutdown() - userToSession.asScala.values.foreach { kv => kv._1.stop() } + userToSession.asScala.values.foreach(_.spark.stop()) userToSession.clear() super.stop() } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala index 75420e3e30c..d2fe59596df 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala @@ -40,7 +40,7 @@ import yaooqinn.kyuubi.utils.{KyuubiHadoopUtil, ReflectUtils} class SparkSessionWithUGI( user: UserGroupInformation, conf: SparkConf, - cache: SparkSessionCacheManager) extends Logging { + cacheMgr: SparkSessionCacheManager) extends Logging { import SparkSessionWithUGI._ private var _sparkSession: SparkSession = _ @@ -142,9 +142,9 @@ class SparkSessionWithUGI( SPARK_INSTANTIATION_LOCK.wait(interval) } - cache.getAndIncrease(userName) match { - case Some(ss) => - _sparkSession = ss.newSession() + cacheMgr.getAndIncrease(userName) match { + case Some(ssc) => + _sparkSession = ssc.spark.newSession() configureSparkSession(sessionConf) case _ => setPartiallyConstructed(userName) @@ -171,7 +171,7 @@ class SparkSessionWithUGI( Seq(classOf[SparkContext]), Seq(context)).asInstanceOf[SparkSession] } - cache.set(userName, _sparkSession) + cacheMgr.set(userName, _sparkSession) } catch { case e: Exception => if (conf.getOption("spark.master").contains("yarn")) { @@ -207,8 +207,6 @@ class SparkSessionWithUGI( def init(sessionConf: Map[String, String]): Unit = { getOrCreate(sessionConf) - cache.setupCredentials(userName, user.getCredentials) - try { initialDatabase.foreach { db => KyuubiHadoopUtil.doAs(user) { @@ -217,7 +215,7 @@ class SparkSessionWithUGI( } } catch { case e: Exception => - cache.decrease(userName) + cacheMgr.decrease(userName) throw findCause(e) } diff --git a/kyuubi-server/src/test/scala/org/apache/spark/scheduler/cluster/KyuubiSparkExecutorUtilsSuite.scala b/kyuubi-server/src/test/scala/org/apache/spark/scheduler/cluster/KyuubiSparkExecutorUtilsSuite.scala deleted file mode 100644 index 705ef05229d..00000000000 --- a/kyuubi-server/src/test/scala/org/apache/spark/scheduler/cluster/KyuubiSparkExecutorUtilsSuite.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import scala.collection.mutable - -import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite} -import org.apache.spark.KyuubiConf._ -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.scheduler.local.LocalSchedulerBackend -import org.scalatest.BeforeAndAfterEach - -import yaooqinn.kyuubi.utils.ReflectUtils -import yaooqinn.kyuubi.Logging - -class KyuubiSparkExecutorUtilsSuite - extends SparkFunSuite with BeforeAndAfterEach { - import KyuubiSparkUtil._ - - val conf: SparkConf = new SparkConf(true) - .setAppName(this.getClass.getSimpleName) - .setMaster("local") - KyuubiSparkUtil.setupCommonConfig(conf) - - var sc: SparkContext = _ - - override def beforeEach(): Unit = { - sc = new SparkContext(conf) - super.beforeEach() - } - - override def afterEach(): Unit = { - if (sc != null) { - sc.stop() - } - super.afterEach() - } - - test("logging") { - assert(KyuubiSparkExecutorUtils.isInstanceOf[Logging]) - } - - test("populate tokens for non CoarseGrainedSchedulerBackend") { - val taskSchedulerImpl = new TaskSchedulerImpl(sc) - val backend = new LocalSchedulerBackend(conf, taskSchedulerImpl, 1) - ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend) - val user = UserGroupInformation.getCurrentUser - KyuubiSparkExecutorUtils.populateTokens(sc, user) - } - - test("populate tokens for CoarseGrainedSchedulerBackend") { - val taskSchedulerImpl = new TaskSchedulerImpl(sc) - val backend = new CoarseGrainedSchedulerBackend(taskSchedulerImpl, sc.env.rpcEnv) - ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend) - val user = UserGroupInformation.getCurrentUser - KyuubiSparkExecutorUtils.populateTokens(sc, user) - } - - test("populate tokens for YarnClientSchedulerBackend") { - val taskSchedulerImpl = new TaskSchedulerImpl(sc) - val backend = new YarnClientSchedulerBackend(taskSchedulerImpl, sc) - ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend) - val user = UserGroupInformation.getCurrentUser - KyuubiSparkExecutorUtils.populateTokens(sc, user) - } - - test("populate tokens for YarnClusterSchedulerBackend") { - val taskSchedulerImpl = new TaskSchedulerImpl(sc) - val backend = new YarnClusterSchedulerBackend(taskSchedulerImpl, sc) - ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend) - val user = UserGroupInformation.getCurrentUser - KyuubiSparkExecutorUtils.populateTokens(sc, user) - } - - test("populate tokens for StandaloneSchedulerBackend") { - val taskSchedulerImpl = new TaskSchedulerImpl(sc) - val backend = new StandaloneSchedulerBackend(taskSchedulerImpl, sc, null) - ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend) - val user = UserGroupInformation.getCurrentUser - KyuubiSparkExecutorUtils.populateTokens(sc, user) - } - - test("get executor data map") { - val taskSchedulerImpl = new TaskSchedulerImpl(sc) - val backend = new CoarseGrainedSchedulerBackend(taskSchedulerImpl, sc.env.rpcEnv) - val executorDataMap = ReflectUtils.getFieldValue(backend, - 
"org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$executorDataMap") - assert(executorDataMap.asInstanceOf[mutable.HashMap[String, ExecutorData]].values.isEmpty) - sc.stop() - } - - test("create update token class via reflection") { - val className = conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS) - assert(classIsLoadable(className) === - (majorVersion(SPARK_VERSION) == 2 && minorVersion(SPARK_VERSION) >= 3)) - - if (classIsLoadable(className)) { - val tokens1 = Array(0.toByte) - val tokens2 = Array(1, 2, 3, 4).map(_.toByte) - val msg1 = ReflectUtils.newInstance(className, Seq(classOf[Array[Byte]]), Seq(tokens1)) - assert(ReflectUtils.getFieldValue(msg1, "tokens") === tokens1) - val msg2 = ReflectUtils.newInstance(className, Seq(classOf[Array[Byte]]), Seq(tokens2)) - assert(ReflectUtils.getFieldValue(msg2, "tokens") === tokens2) - } - } -} diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/BackendServiceSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/BackendServiceSuite.scala index db10154edc1..bb2a0b12474 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/BackendServiceSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/BackendServiceSuite.scala @@ -62,13 +62,6 @@ class BackendServiceSuite extends SparkFunSuite { "", "localhost", Map.empty) - val session2 = backendService.openSession( - TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8, - user, - "", - "localhost", - Map(OPERATION_RESULT_LIMIT.key -> "1") - ) assert( backendService.getInfo( session, GetInfoType.SERVER_NAME).toTGetInfoValue.getStringValue === "Kyuubi Server") @@ -85,7 +78,6 @@ class BackendServiceSuite extends SparkFunSuite { val showTables = "show tables" val op1 = backendService.executeStatement(session, showTables) val op2 = backendService.executeStatementAsync(session, "show databases") - val op3 = backendService.executeStatement(session, showTables) val e1 = intercept[KyuubiSQLException](backendService.getTypeInfo(session)) assert(e1.toTStatus.getErrorMessage === "Method Not Implemented!") val e2 = intercept[KyuubiSQLException](backendService.getCatalogs(session)) @@ -99,7 +91,6 @@ class BackendServiceSuite extends SparkFunSuite { assert(backendService.getOperationStatus(op1).getState === RUNNING) assert(backendService.getOperationStatus(op2).getState === RUNNING) - assert(backendService.getOperationStatus(op3).getState === RUNNING) assert(backendService.getResultSetMetadata(op1).head.name === "Result") backendService.cancelOperation(op1) diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionCacheManagerSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionCacheManagerSuite.scala index b77c21efc3c..8bb8f1fe2a1 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionCacheManagerSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionCacheManagerSuite.scala @@ -18,17 +18,19 @@ package yaooqinn.kyuubi.spark import org.apache.spark._ -import org.apache.spark.KyuubiConf._ import org.apache.spark.sql.SparkSession -import org.mockito.Mockito._ +import org.mockito.Mockito.when import org.scalatest.Matchers import org.scalatest.mock.MockitoSugar import yaooqinn.kyuubi.service.State -import yaooqinn.kyuubi.utils.ReflectUtils class SparkSessionCacheManagerSuite extends SparkFunSuite with Matchers with MockitoSugar { + override def afterAll(): Unit = { + System.clearProperty("SPARK_YARN_MODE") + super.afterAll() + } test("new cache") { val cache = new 
SparkSessionCacheManager() cache.getStartTime should be(0) @@ -39,76 +41,94 @@ class SparkSessionCacheManagerSuite extends SparkFunSuite with Matchers with Moc } test("init cache") { - val cache = new SparkSessionCacheManager() - val conf = new SparkConf() - KyuubiSparkUtil.setupCommonConfig(conf) - cache.init(conf) - cache.getStartTime should be(0) - cache.getConf should be(conf) - cache.getServiceState should be(State.INITED) - cache.stop() + withCacheMgrInit { (cache, conf) => + cache.getStartTime should be(0) + cache.getConf should be(conf) + cache.getServiceState should be(State.INITED) + } } test("start cache") { + withCacheMgrStarted { (cache, conf) => + cache.getStartTime / 1000 should be(System.currentTimeMillis() / 1000) + cache.getConf should be(conf) + cache.getServiceState should be(State.STARTED) + } + } + + test("stop cache") { + withCacheMgrStarted { (cache, conf) => + cache.stop() + cache.getConf should be(conf) + cache.getServiceState should be(State.STOPPED) + } + } + + test("spark session cache should be null if max cache time reached") { + withCacheMgrStarted { (cache, conf) => + val session = SparkSession.builder().config(conf).getOrCreate() + val userName = KyuubiSparkUtil.getCurrentUserName + cache.set(userName, session) + assert(cache.getAndIncrease(userName).nonEmpty) + cache.decrease(userName) + Thread.sleep(10000) + val maybeCache2 = cache.getAndIncrease(userName) + assert(maybeCache2.isEmpty, s"reason, crash ${maybeCache2.map(_.isCrashed).mkString}") + session.stop() + } + } + + test("spark session cleaner thread test") { + withCacheMgrStarted { (cacheManager, conf) => + val session = SparkSession.builder().config(conf).getOrCreate() + val ss1 = mock[SparkSession] + val sc1 = mock[SparkContext] + + when(ss1.sparkContext).thenReturn(sc1) + when(sc1.isStopped).thenReturn(true) + when(ss1.conf).thenReturn(session.conf) + + cacheManager.set("alice", ss1) + cacheManager.set("bob", session) + val latestLogout = System.currentTimeMillis() - 2 * KyuubiSparkUtil.timeStringAsMs( + conf.get(KyuubiConf.BACKEND_SESSION_IDLE_TIMEOUT.key)) + cacheManager.getAndIncrease("bob").foreach { c => + c.updateLogoutTime(latestLogout) + c.decReuseTimeAndGet + c.decReuseTimeAndGet + } + Thread.sleep(1000) + assert(cacheManager.getAndIncrease("alice").isEmpty) + assert(cacheManager.getAndIncrease("bob").isEmpty) + assert(cacheManager.getAndIncrease("tom").isEmpty) + } + } + + def withCacheMgrInit(f: (SparkSessionCacheManager, SparkConf) => Unit): Unit = { val cache = new SparkSessionCacheManager() val conf = new SparkConf() - conf.set(BACKEND_SESSION_LONG_CACHE, "true") - KyuubiSparkUtil.setupCommonConfig(conf) - cache.init(conf) - cache.start() - cache.getStartTime / 100 should be(System.currentTimeMillis() / 100) - cache.getConf should be(conf) - cache.getServiceState should be(State.STARTED) - val ss = mock[SparkSession] - val userName = KyuubiSparkUtil.getCurrentUserName - val sc = mock[SparkContext] - when(ss.sparkContext).thenReturn(sc) - - cache.decrease(userName) // None - cache.getAndIncrease(userName) // None - - cache.set(userName, ss) - when(sc.isStopped).thenReturn(false) - cache.getAndIncrease(userName) - - when(sc.isStopped).thenReturn(true) - cache.getAndIncrease(userName) - - when(sc.isStopped).thenReturn(false) - cache.decrease(userName) - when(sc.isStopped).thenReturn(true) - cache.decrease(userName) - - Thread.sleep(2000) - val field = cache.getClass.getDeclaredField("sessionCleaner") - field.setAccessible(true) - - when(sc.isStopped).thenReturn(false) - val runnable 
= field.get(cache).asInstanceOf[Runnable] - runnable.run() // > 0 - - when(sc.isStopped).thenReturn(false) - cache.decrease(userName) - when(sc.isStopped).thenReturn(false) - runnable.run() // not expiry - when(sc.isStopped).thenReturn(false) - ReflectUtils.setFieldValue( - cache, "yaooqinn$kyuubi$spark$SparkSessionCacheManager$$idleTimeout", 0) - runnable.run() - System.clearProperty("SPARK_YARN_MODE") - cache.stop() + try { + cache.init(conf) + f(cache, conf) + } finally { + cache.stop() + } } - test("stop cache") { + def withCacheMgrStarted(f: (SparkSessionCacheManager, SparkConf) => Unit): Unit = { val cache = new SparkSessionCacheManager() - val conf = new SparkConf().set(KyuubiConf.BACKEND_SESSION_CHECK_INTERVAL.key, "1s") + val conf = new SparkConf().setMaster("local") + .set(KyuubiConf.BACKEND_SESSION_MAX_CACHE_TIME.key, "5s") + .set(KyuubiConf.BACKEND_SESSION_CHECK_INTERVAL.key, "50ms") KyuubiSparkUtil.setupCommonConfig(conf) - cache.init(conf) - cache.start() - Thread.sleep(2000) - cache.stop() - cache.getConf should be(conf) - cache.getServiceState should be(State.STOPPED) + + try { + cache.init(conf) + cache.start() + f(cache, conf) + } finally { + cache.stop() + } } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionCacheSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionCacheSuite.scala new file mode 100644 index 00000000000..6fd60e1a212 --- /dev/null +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionCacheSuite.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package yaooqinn.kyuubi.spark + +import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite} +import org.apache.spark.sql.SparkSession + +class SparkSessionCacheSuite extends SparkFunSuite { + private val conf = new SparkConf() + .setMaster("local") + .set(KyuubiConf.BACKEND_SESSION_MAX_CACHE_TIME.key, "10s") + KyuubiSparkUtil.setupCommonConfig(conf) + private val spark = SparkSession.builder().config(conf).getOrCreate() + + override def afterAll(): Unit = { + spark.stop() + super.afterAll() + } + + test("spark session cache") { + val cache = SparkSessionCache.init(spark) + assert(!cache.isCrashed) + assert(!cache.isIdle) + assert(!cache.needClear, s"cache status [crash:${cache.isCrashed}, expired:${cache.isExpired}]") + assert(!cache.isExpired) + assert(cache.spark === spark) + assert(cache.getReuseTimes === 1) + assert(cache.incReuseTimeAndGet === 2) + assert(cache.decReuseTimeAndGet === 1) + val expired = cache.isExpired + assert(!expired) + Thread.sleep(10000) + assert(cache.decReuseTimeAndGet === 0) + assert(cache.isExpired) + assert(cache.needClear) + } + + test("cache status idled") { + val cache = SparkSessionCache.init(spark) + assert(!cache.isIdle, "cache is not idled, reuse time = 1") + cache.decReuseTimeAndGet + assert(!cache.isIdle, "cache is not idled, reuse time = 0, but latest logout is unset") + val latestLogout = System.currentTimeMillis() - 2 * KyuubiSparkUtil.timeStringAsMs( + spark.conf.get(KyuubiConf.BACKEND_SESSION_IDLE_TIMEOUT.key)) + cache.updateLogoutTime(latestLogout) + assert(cache.isIdle, "cache is idled, reuse time = 0, idle timeout exceeded") + cache.incReuseTimeAndGet + assert(!cache.isIdle, "cache is not idled, idle timeout exceeded however reuse time = 1") + } +}
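For reference, here is a minimal sketch (not part of the patch) of how the cache-related options touched above might be set on the Kyuubi server's SparkConf. The option names come from `KyuubiConf` in this patch; the values shown are purely illustrative.

```scala
import org.apache.spark.SparkConf

// Illustrative values only; the defaults in this patch are 30min, 5d and 5min respectively.
val conf = new SparkConf()
  // Keep the cached SparkSession alive for 1 hour after the user's last session disconnects.
  .set("spark.kyuubi.backend.session.idle.timeout", "1h")
  // Recycle the cached SparkSession at most 3 days after it was created, so delegation
  // tokens cannot outlive their renewal window on kerberized clusters.
  .set("spark.kyuubi.backend.session.max.cache.time", "3d")
  // How often the cleaner thread of SparkSessionCacheManager inspects cached sessions.
  .set("spark.kyuubi.backend.session.check.interval", "1min")
```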
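And as a rough, standalone illustration of the recycling rules that `SparkSessionCache` and `SparkSessionCacheManager` encode in this patch (simplified types of my own, not the patch's classes): an entry is idle once its reuse count drops to zero and the idle timeout has passed since the last logout; it is expired once it is older than the max cache time with no connections left, or unconditionally once it exceeds 1.25 times the max cache time.

```scala
object SessionCacheRules {
  // Standalone sketch; all times are epoch milliseconds.
  final case class CacheState(reuseTimes: Int, initTime: Long, latestLogout: Long)

  def isIdle(s: CacheState, now: Long, idleTimeout: Long): Boolean =
    s.reuseTimes <= 0 && now - s.latestLogout > idleTimeout

  def isExpired(s: CacheState, now: Long, maxCacheTime: Long): Boolean =
    (now - s.initTime >= maxCacheTime && s.reuseTimes <= 0) ||
      (now - s.initTime > maxCacheTime * 1.25)

  // With the default of 5 days, a session whose connections have all closed is stopped
  // gracefully somewhere in the [5d, 6.25d] window; past 6.25 days it is force-stopped
  // even if connections or jobs are still attached.
}
```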