[KYUUBI-186] Add a max cache time to clean SparkSessions that may have token expiry issue

---

fix #186 fix #187

---
Squashed commit of the following:

commit 4b8297f
Author: Kent Yao <yaooqinn@hotmail.com>
Date:   Mon May 20 19:36:34 2019 +0800

    add log

commit 7b811f8
Author: Kent Yao <yaooqinn@hotmail.com>
Date:   Thu May 16 16:46:04 2019 +0800

    add ut

commit 19f67fa
Author: Kent Yao <yaooqinn@hotmail.com>
Date:   Thu May 16 10:57:24 2019 +0800

    fix ut

commit 7ad7c20
Author: Kent Yao <yaooqinn@hotmail.com>
Date:   Thu May 16 10:40:42 2019 +0800

    fix ut

commit 9a114ab
Author: Kent Yao <yaooqinn@hotmail.com>
Date:   Thu May 16 00:10:57 2019 +0800

    mv cache validating in its own class

commit 0afba5a
Author: Kent Yao <yaooqinn@hotmail.com>
Date:   Wed May 15 11:47:42 2019 +0800

    fix ut

commit eff3f41
Author: Kent Yao <yaooqinn@hotmail.com>
Date:   Wed May 15 11:12:51 2019 +0800

    add ut

commit 9bbbea7
Author: Kent Yao <yaooqinn@hotmail.com>
Date:   Tue May 14 23:17:00 2019 +0800

    add ut

commit 0e0e59e
Author: Kent Yao <yaooqinn@hotmail.com>
Date:   Tue May 14 19:39:09 2019 +0800

    mv init time to spark session cache

commit a4a1c69
Author: Kent Yao <yaooqinn@hotmail.com>
Date:   Tue May 14 18:36:06 2019 +0800

    Add a max cache time to clean SparkSessions that may have token expiry issue fix #186
yaooqinn committed May 21, 2019
1 parent d7dd005 commit ab7dde8
Showing 12 changed files with 310 additions and 375 deletions.
5 changes: 2 additions & 3 deletions docs/configurations.md
@@ -85,10 +85,9 @@ spark.kyuubi.<br />backend.session.wait.other.times | 60 | How many times to che
spark.kyuubi.<br />backend.session.wait.other.interval|1s|The interval for checking whether other thread with the same user has completed SparkContext instantiation.
spark.kyuubi.<br />backend.session.init.timeout|60s|How long we suggest the server to give up instantiating SparkContext.
spark.kyuubi.<br />backend.session.check.interval|5min|The check interval for backend session a.k.a SparkSession timeout.
-spark.kyuubi.<br />backend.session.idle.timeout|30min|SparkSession timeout.
+spark.kyuubi.<br />backend.session.idle.timeout|30min|How long the SparkSession instance will be cached after the user logs out. Reusing a cached SparkSession can significantly cut the startup time for SparkContext, which matters for short-lived queries. The timeout is counted from when all sessions of the user are disconnected.
+spark.kyuubi.<br />backend.session.max.cache.time|5d|Max cache time for a SparkSession instance, counted from when it was created. If `spark.kyuubi.backend.session.idle.timeout` is never reached because the user keeps running queries, this setting stops the cached SparkSession, which could otherwise hit token expiry issues in kerberized clusters. Within the interval [t, t * 1.25], we try to stop the SparkSession gracefully once it has no connections; if it is still not stopped by the end of that interval, we force-stop it.
spark.kyuubi.<br />backend.session.local.dir|KYUUBI_HOME/<br />local|Default value to set `spark.local.dir`. For YARN mode, this only affects the Kyuubi server-side settings, following how Spark treats `spark.local.dir`.
-spark.kyuubi.<br />backend.session.long.cache|${UserGroupInformation.<br />isSecurityEnabled}|Whether to update the tokens of Spark's executor to support long caching SparkSessions iff this is true && `spark.kyuubi.backend.token.update.class` is loadable. This is used towards kerberized hadoop clusters in case of `spark.kyuubi.backend.session.idle.timeout` is set longer than token expiration time limit or SparkSession never idles.
-spark.kyuubi.<br />backend.session.token.update.class|org.apache.spark.<br />scheduler.cluster.<br />CoarseGrainedClusterMessages$<br />UpdateDelegationTokens|`CoarseGrainedClusterMessages` for token update message from the driver of Spark to executors, it is loadable only by higher version Spark release(2.3 and later)

#### Operation

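As a hedged aside: these `spark.kyuubi.*` keys travel through SparkConf like any other `spark.*` entry, so a minimal sketch of setting the two timeouts programmatically could look like this, with the keys and example values copied from the table above.

```scala
import org.apache.spark.SparkConf

// A minimal sketch, assuming kyuubi-server is on the classpath; the keys and
// example values come straight from the configuration table above.
val conf = new SparkConf()
  .set("spark.kyuubi.backend.session.idle.timeout", "30min")
  .set("spark.kyuubi.backend.session.max.cache.time", "5d")
```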
34 changes: 15 additions & 19 deletions kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
@@ -280,10 +280,24 @@ object KyuubiConf {

val BACKEND_SESSION_IDLE_TIMEOUT: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.idle.timeout")
.doc("SparkSession timeout")
.doc("How long the SparkSession instance will be cached after user logout. Using cached" +
" SparkSession can significantly cut the startup time for SparkContext, which makes" +
" sense for queries that are short lived. The timeout is calculated from when all" +
" sessions of the user are disconnected")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.MINUTES.toMillis(30L))

+val BACKEND_SESSION_MAX_CACHE_TIME: ConfigEntry[Long] =
+KyuubiConfigBuilder("spark.kyuubi.backend.session.max.cache.time")
+.doc("Max cache time for a SparkSession instance, counted from its creation. If" +
+" `spark.kyuubi.backend.session.idle.timeout` is never reached because the user keeps" +
+" running queries, this configuration stops the cached SparkSession, which could" +
+" otherwise hit token expiry issues in kerberized clusters. Within the interval" +
+" [t, t * 1.25], we try to stop the SparkSession gracefully once it has no connections;" +
+" if it is still not stopped by the end of that interval, we force-stop it")
+.timeConf(TimeUnit.MILLISECONDS)
+.createWithDefault(TimeUnit.DAYS.toMillis(5))

val BACKEND_SESSION_LOCAL_DIR: ConfigEntry[String] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.local.dir")
.doc("Default value to set `spark.local.dir`, for YARN mode, this only affect the Kyuubi" +
@@ -293,24 +307,6 @@ object KyuubiConf {
s"${sys.env.getOrElse("KYUUBI_HOME", System.getProperty("java.io.tmpdir"))}"
+ File.separator + "local")

-val BACKEND_SESSION_LONG_CACHE: ConfigEntry[Boolean] =
-KyuubiConfigBuilder("spark.kyuubi.backend.session.long.cache")
-.doc("Whether to update the tokens of Spark's executor to support long caching" +
-" SparkSessions iff this is true && `spark.kyuubi.backend.token.update.class` is" +
-" loadable. This is used towards kerberized hadoop clusters in case of" +
-" `spark.kyuubi.backend.session.idle.timeout` is set longer than token expiration time" +
-" limit or SparkSession never idles.")
-.booleanConf
-.createWithDefault(UserGroupInformation.isSecurityEnabled)
-
-val BACKEND_SESSION_TOKEN_UPDATE_CLASS: ConfigEntry[String] =
-KyuubiConfigBuilder("spark.kyuubi.backend.session.token.update.class")
-.doc("`CoarseGrainedClusterMessages` for token update message from the driver of Spark to" +
-" executors, it is loadable only by higher version Spark release(2.3 and later)")
-.stringConf
-.createWithDefault(
-"org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$UpdateDelegationTokens")

/////////////////////////////////////////////////////////////////////////////////////////////////
// Authentication //
/////////////////////////////////////////////////////////////////////////////////////////////////
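For orientation, a hedged sketch (not code from this commit): the two `timeConf` entries above resolve to milliseconds, the same way stock Spark's `SparkConf.getTimeAsMs` does with an explicit default string.

```scala
import org.apache.spark.SparkConf

// A minimal sketch: resolving the two timeouts to milliseconds with plain
// Spark API; the default strings mirror createWithDefault above.
val conf = new SparkConf()
val idleTimeoutMs = conf.getTimeAsMs("spark.kyuubi.backend.session.idle.timeout", "30min")
val maxCacheTimeMs = conf.getTimeAsMs("spark.kyuubi.backend.session.max.cache.time", "5d")
// With the defaults: idleTimeoutMs == 1800000L, maxCacheTimeMs == 432000000L,
// so the graceful-stop window [t, t * 1.25] spans days 5 through 6.25.
```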
@@ -269,11 +269,6 @@ object KyuubiSparkUtil extends Logging {
loader
}

-/** Determines whether the provided class is loadable in the current thread. */
-def classIsLoadable(clazz: String): Boolean = {
-Try { Class.forName(clazz, false, getContextOrSparkClassLoader) }.isSuccess
-}

/**
* Generate proper configurations before server starts
* @param conf the default [[SparkConf]]

This file was deleted.

@@ -34,7 +34,6 @@ import org.apache.hadoop.hive.ql.session.OperationLog
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil
-import org.apache.spark.scheduler.cluster.KyuubiSparkExecutorUtils
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils}
import org.apache.spark.sql.catalyst.catalog.{FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.catalyst.parser.ParseException
@@ -376,10 +375,6 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
_.onStatementParsed(statementId, result.queryExecution.toString())
}

-if (conf.get(BACKEND_SESSION_LONG_CACHE).toBoolean &&
-KyuubiSparkUtil.classIsLoadable(conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS))) {
-KyuubiSparkExecutorUtils.populateTokens(sparkSession.sparkContext, session.ugi)
-}
debug(result.queryExecution.toString())
iter = if (incrementalCollect) {
val numParts = result.rdd.getNumPartitions
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package yaooqinn.kyuubi.spark

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil
import org.apache.spark.sql.SparkSession

/**
* A recorder for how many client sessions have been cloned by the original [[SparkSession]], which
* helps the [[SparkSessionCacheManager]] cache and recycle [[SparkSession]] instances.
*
* @param spark the original [[SparkSession]] instance
* @param times how many times the original [[SparkSession]] instance has been cloned, starting from 1
* @param initTime start time of the SparkSession
*/
private[spark]
class SparkSessionCache private(val spark: SparkSession, times: AtomicInteger, initTime: Long) {

private val maxCacheTime =
KyuubiSparkUtil.timeStringAsMs(spark.conf.get(BACKEND_SESSION_MAX_CACHE_TIME))

private val idleTimeout: Long =
KyuubiSparkUtil.timeStringAsMs(spark.conf.get(BACKEND_SESSION_IDLE_TIMEOUT))

@volatile
private var latestLogout: Long = Long.MaxValue

def updateLogoutTime(time: Long): Unit = latestLogout = time

/**
* Whether all connections are disconnected and the idle timeout has elapsed since the user's
* last logout.
*/
def isIdle: Boolean = {
times.get <= 0 && System.currentTimeMillis - latestLogout > idleTimeout
}

/**
* Whether the cached [[SparkSession]] instance is already stopped.
*/
def isCrashed: Boolean = spark.sparkContext.isStopped

/**
* If the cache age is within [maxCacheTime, maxCacheTime * 1.25], we try to stop this
* SparkSession only when all connections are disconnected.
* If the cache age exceeds maxCacheTime * 1.25, we stop this SparkSession regardless of
* whether it still has connections attached or jobs running.
*/
def isExpired: Boolean = {
val now = System.currentTimeMillis
(now - initTime >= maxCacheTime && times.get <= 0) || (now - initTime > maxCacheTime * 1.25)
}

def needClear: Boolean = isCrashed || isExpired

def getReuseTimes: Int = times.get()

def incReuseTimeAndGet: Int = times.incrementAndGet()

def decReuseTimeAndGet: Int = times.decrementAndGet()
}

object SparkSessionCache {
def init(spark: SparkSession): SparkSessionCache =
new SparkSessionCache(spark, new AtomicInteger(1), System.currentTimeMillis)
}
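To make the lifecycle concrete, here is a minimal usage sketch of this class. The periodic sweep actually lives in `SparkSessionCacheManager`, which this excerpt does not show, so the driving code below is an illustration under that assumption, not the manager's real loop.

```scala
import org.apache.spark.sql.SparkSession
import yaooqinn.kyuubi.spark.SparkSessionCache

// A minimal sketch, assuming a local SparkSession with the Kyuubi configs
// available; it drives the reuse counter the way the manager would.
val spark = SparkSession.builder().master("local[2]").getOrCreate()
val cache = SparkSessionCache.init(spark) // reuse count starts at 1

cache.incReuseTimeAndGet                  // a second client clones the session
cache.decReuseTimeAndGet                  // that client disconnects
cache.decReuseTimeAndGet                  // the first client disconnects too
cache.updateLogoutTime(System.currentTimeMillis)

// What one round of the periodic check (backend.session.check.interval) does:
// stop when crashed/expired, or when all clients are gone past idle timeout.
if (cache.needClear || cache.isIdle) {
  spark.stop()
}
```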
