From 72e664fced7615113b79482905bd31393caf5782 Mon Sep 17 00:00:00 2001
From: Kent Yao <11215016@zju.edu.cn>
Date: Sat, 23 Feb 2019 01:10:05 +0800
Subject: [PATCH] [KYUUBI-116][Experimental] Support long cache spark session
in kerberized cluster (#117)
* fixes @116 Support long caching SparkSession/SparkContext for secured hadoop cluster
* handle sub classes of coarse grained scheduler backend
* fix ut for spark 2.2
* updating doc
---
docs/configurations.md | 71 ++++++-----
.../scala/org/apache/spark/KyuubiConf.scala | 30 ++++-
.../org/apache/spark/KyuubiSparkUtil.scala | 6 +
.../cluster/KyuubiSparkExecutorUtils.scala | 71 +++++++++++
.../kyuubi/operation/KyuubiOperation.scala | 27 ++--
.../kyuubi/spark/SparkSessionWithUGI.scala | 2 +-
kyuubi-server/src/test/resources/log4j.xml | 8 +-
.../KyuubiSparkExecutorUtilsSuite.scala | 117 ++++++++++++++++++
.../spark/SparkSessionWithUGISuite.scala | 4 +-
9 files changed, 282 insertions(+), 54 deletions(-)
create mode 100644 kyuubi-server/src/main/scala/org/apache/spark/scheduler/cluster/KyuubiSparkExecutorUtils.scala
create mode 100644 kyuubi-server/src/test/scala/org/apache/spark/scheduler/cluster/KyuubiSparkExecutorUtilsSuite.scala
diff --git a/docs/configurations.md b/docs/configurations.md
index 0c3a7399c06..1c04218cc18 100644
--- a/docs/configurations.md
+++ b/docs/configurations.md
@@ -30,71 +30,74 @@ $ bin/start-kyuubi.sh \
Name|Default|Description
---|---|---
-spark.kyuubi.ha.enabled|false|Whether KyuubiServer supports dynamic service discovery for its clients. To support this, each instance of KyuubiServer currently uses ZooKeeper to register itself, when it is brought up. JDBC/ODBC clients should use the ZooKeeper ensemble: spark.kyuubi.ha.zk.quorum in their connection string.
-spark.kyuubi.ha.mode|load-balance|High availability mode, one is load-balance which is used by default, another is failover as master-slave mode.
-spark.kyuubi.ha.zk.quorum|none|Comma separated list of ZooKeeper servers to talk to, when KyuubiServer supports service discovery via Zookeeper.
-spark.kyuubi.ha.zk.namespace|kyuubiserver|The parent node in ZooKeeper used by KyuubiServer when supporting dynamic service discovery.
-spark.kyuubi.ha.zk.client.port|2181|The port of ZooKeeper servers to talk to. If the list of Zookeeper servers specified in spark.kyuubi.zookeeper.quorum does not contain port numbers, this value is used.
-spark.kyuubi.ha.zk.session.timeout|1,200,000|ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, if a heartbeat is not sent in the timeout.
-spark.kyuubi.ha.zk.connection.basesleeptime|1,000|Initial amount of time (in milliseconds) to wait between retries when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy.
-spark.kyuubi.ha.zk.connection.max.retries|3|Max retry times for connecting to the zk server
+spark.kyuubi.
ha.enabled|false|Whether KyuubiServer supports dynamic service discovery for its clients. To support this, each instance of KyuubiServer currently uses ZooKeeper to register itself, when it is brought up. JDBC/ODBC clients should use the ZooKeeper ensemble: spark.kyuubi.ha.zk.quorum in their connection string.
+spark.kyuubi.
ha.mode|load-balance|High availability mode, one is load-balance which is used by default, another is failover as master-slave mode.
+spark.kyuubi.
ha.zk.quorum|none|Comma separated list of ZooKeeper servers to talk to, when KyuubiServer supports service discovery via Zookeeper.
+spark.kyuubi.
ha.zk.namespace|kyuubiserver|The parent node in ZooKeeper used by KyuubiServer when supporting dynamic service discovery.
+spark.kyuubi.
ha.zk.client.port|2181|The port of ZooKeeper servers to talk to. If the list of Zookeeper servers specified in spark.kyuubi.zookeeper.quorum does not contain port numbers, this value is used.
+spark.kyuubi.
ha.zk.session.timeout|1,200,000|ZooKeeper client's session timeout (in milliseconds). The client is disconnected, and as a result, all locks released, if a heartbeat is not sent in the timeout.
+spark.kyuubi.
ha.zk.connection.basesleeptime|1,000|Initial amount of time (in milliseconds) to wait between retries when connecting to the ZooKeeper server when using ExponentialBackoffRetry policy.
+spark.kyuubi.
ha.zk.connection.max.retries|3|Max retry times for connecting to the zk server
#### Operation Log
Name|Default|Description
---|---|---
-spark.kyuubi.logging.operation.enabled|true|When true, Kyuubi Server will save operation logs and make them available for clients
-spark.kyuubi.logging.operation.log.dir|`KYUUBI_LOG_DIR` -> `java.io.tmpdir`/operation_logs|Top level directory where operation logs are stored if logging functionality is enabled
+spark.kyuubi.
logging.operation.enabled|true|When true, Kyuubi Server will save operation logs and make them available for clients
+spark.kyuubi.
logging.operation.log.dir|`KYUUBI_LOG_DIR` -> `java.io.tmpdir`/operation_logs|Top level directory where operation logs are stored if logging functionality is enabled
#### Frontend Service options
Name|Default|Description
---|---|---
-spark.kyuubi.frontend.bind.host | localhost | Bind host on which to run the Kyuubi Frontend service.
-spark.kyuubi.frontend.bind.port| 10009 | Port number of Kyuubi Frontend service. set 0 will get a random available one
-spark.kyuubi.frontend.min.worker.threads| 50 | Minimum number of Thrift worker threads.
-spark.kyuubi.frontend.max.worker.threads| 500 | Maximum number of Thrift worker threads
-spark.kyuubi.frontend.worker.keepalive.time | 60s| Keepalive time (in seconds) for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval.
-spark.kyuubi.authentication | NONE | Client authentication types. NONE: no authentication check; NOSASL: no authentication check KERBEROS: Kerberos/GSSAPI authentication.
-spark.kyuubi.frontend.allow.user.substitution | true | Allow alternate user to be specified as part of Kyuubi open connection request.
-spark.kyuubi.frontend.enable.doAs | true | Set true to have Kyuubi execute SQL operations as the user making the calls to it.
-spark.kyuubi.frontend.max.message.size | 104857600 | Maximum message size in bytes a Kyuubi server will accept.
+spark.kyuubi.
frontend.bind.host | localhost | Bind host on which to run the Kyuubi Frontend service.
+spark.kyuubi.
frontend.bind.port| 10009 | Port number of Kyuubi Frontend service. set 0 will get a random available one
+spark.kyuubi.
frontend.min.worker.threads| 50 | Minimum number of Thrift worker threads.
+spark.kyuubi.
frontend.max.worker.threads| 500 | Maximum number of Thrift worker threads
+spark.kyuubi.
frontend.worker.keepalive.time | 60s| Keepalive time (in seconds) for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval.
+spark.kyuubi.
authentication | NONE | Client authentication types. NONE: no authentication check; NOSASL: no authentication check KERBEROS: Kerberos/GSSAPI authentication.
+spark.kyuubi.
frontend.allow.user.substitution | true | Allow alternate user to be specified as part of Kyuubi open connection request.
+spark.kyuubi.
frontend.enable.doAs | true | Set true to have Kyuubi execute SQL operations as the user making the calls to it.
+spark.kyuubi.
frontend.max.message.size | 104857600 | Maximum message size in bytes a Kyuubi server will accept.
#### Background Execution Thread Pool
Name|Default|Description
---|---|---
-spark.kyuubi.async.exec.threads|100|Number of threads in the async thread pool for KyuubiServer.
-spark.kyuubi.async.exec.wait.queue.size|100|Size of the wait queue for async thread pool in KyuubiServer. After hitting this limit, the async thread pool will reject new requests.
-spark.kyuubi.async.exec.keep.alive.time|10,000|Time (in milliseconds) that an idle KyuubiServer async thread (from the thread pool) will wait for a new task to arrive before terminating.
-spark.kyuubi.async.exec.shutdown.timeout|10,000|How long KyuubiServer shutdown will wait for async threads to terminate.
+spark.kyuubi.
async.exec.threads|100|Number of threads in the async thread pool for KyuubiServer.
+spark.kyuubi.
async.exec.wait.queue.size|100|Size of the wait queue for async thread pool in KyuubiServer. After hitting this limit, the async thread pool will reject new requests.
+spark.kyuubi.
async.exec.keep.alive.time|10,000|Time (in milliseconds) that an idle KyuubiServer async thread (from the thread pool) will wait for a new task to arrive before terminating.
+spark.kyuubi.
async.exec.shutdown.timeout|10,000|How long KyuubiServer shutdown will wait for async threads to terminate.
#### Kyuubi Session
Name|Default|Description
---|---|---
-spark.kyuubi.frontend.session.check.interval|6h|The check interval for frontend session/operation timeout, which can be disabled by setting to zero or negative value.
-spark.kyuubi.frontend.session.timeout|8h|The check interval for session/operation timeout, which can be disabled by setting to zero or negative value.
-spark.kyuubi.frontend.session.check.operation| true |Session will be considered to be idle only if there is no activity, and there is no pending operation. This setting takes effect only if session idle timeout `spark.kyuubi.frontend.session.timeout` and checking `spark.kyuubi.frontend.session.check.interval` are enabled.
+spark.kyuubi.
frontend.session.check.interval|6h|The check interval for frontend session/operation timeout, which can be disabled by setting to zero or negative value.
+spark.kyuubi.
frontend.session.timeout|8h|The check interval for session/operation timeout, which can be disabled by setting to zero or negative value.
+spark.kyuubi.
frontend.session.check.operation| true |Session will be considered to be idle only if there is no activity, and there is no pending operation. This setting takes effect only if session idle timeout `spark.kyuubi.frontend.session.timeout` and checking `spark.kyuubi.frontend.session.check.interval` are enabled.
#### Spark Session
Name|Default|Description
---|---|---
-spark.kyuubi.backend.session.wait.other.times | 60 | How many times to check when another session with the same user is initializing SparkContext. Total Time will be times by `spark.kyuubi.backend.session.wait.other.interval`.
-spark.kyuubi.backend.session.wait.other.interval|1s|The interval for checking whether other thread with the same user has completed SparkContext instantiation.
-spark.kyuubi.backend.session.init.timeout|60s|How long we suggest the server to give up instantiating SparkContext.
-spark.kyuubi.backend.session.check.interval|5min|The check interval for backend session a.k.a SparkSession timeout.
-spark.kyuubi.backend.session.idle.timeout|30min|SparkSession timeout.
+spark.kyuubi.
backend.session.wait.other.times | 60 | How many times to check when another session with the same user is initializing SparkContext. Total Time will be times by `spark.kyuubi.backend.session.wait.other.interval`.
+spark.kyuubi.
backend.session.wait.other.interval|1s|The interval for checking whether other thread with the same user has completed SparkContext instantiation.
+spark.kyuubi.
backend.session.init.timeout|60s|How long we suggest the server to give up instantiating SparkContext.
+spark.kyuubi.
backend.session.check.interval|5min|The check interval for backend session a.k.a SparkSession timeout.
+spark.kyuubi.
backend.session.idle.timeout|30min|SparkSession timeout.
+spark.kyuubi.
backend.session.local.dir|KYUUBI_HOME/local|Default value to set `spark.local.dir`. For YARN mode, this only affect the Kyuubi server side settings according to the rule of Spark treating `spark.local.dir`.
+spark.kyuubi.
backend.session.long.cache|${UserGroupInformation.isSecurityEnabled}|Whether to update the tokens of Spark's executor to support long caching SparkSessions iff this is true && `spark.kyuubi.backend.token.update.class` is loadable. This is used towards kerberized hadoop clusters in case of `spark.kyuubi.backend.session.idle.timeout` is set longer than token expiration time limit or SparkSession never idles.
+spark.kyuubi.
backend.session.token.update.class|org.apache.spark.
scheduler.cluster.
CoarseGrainedClusterMessages$
UpdateDelegationTokens|`CoarseGrainedClusterMessages` for token update message from the driver of Spark to executors, it is loadable only by higher version Spark release(2.3 and later)
#### Operation
Name|Default|Description
---|---|---
-spark.kyuubi.operation.idle.timeout|6h|Operation will be closed when it's not accessed for this duration of time.
-spark.kyuubi.operation.incremental.collect|false|Whether to use incremental result collection from Spark executor side to Kyuubi server side.
-spark.kyuubi.operation.result.limit|-1|In non-incremental result collection mode, set this to a positive value to limit the size of result collected to driver side.
+spark.kyuubi.
operation.idle.timeout|6h|Operation will be closed when it's not accessed for this duration of time.
+spark.kyuubi.
operation.incremental.collect|false|Whether to use incremental result collection from Spark executor side to Kyuubi server side.
+spark.kyuubi.
operation.result.limit|-1|In non-incremental result collection mode, set this to a positive value to limit the size of result collected to driver side.
---
diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
index 51ac2943c0e..6d5805ca526 100644
--- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
+++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
@@ -18,12 +18,13 @@
package org.apache.spark
import java.io.File
-import java.util.HashMap
+import java.util.{HashMap => JMap}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.language.implicitConversions
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}
/**
@@ -32,7 +33,7 @@ import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}
*/
object KyuubiConf {
- private[this] val kyuubiConfEntries = new HashMap[String, ConfigEntry[_]]()
+ private val kyuubiConfEntries = new JMap[String, ConfigEntry[_]]()
def register(entry: ConfigEntry[_]): Unit = {
require(!kyuubiConfEntries.containsKey(entry.key),
@@ -40,7 +41,7 @@ object KyuubiConf {
kyuubiConfEntries.put(entry.key, entry)
}
- private[this] object KyuubiConfigBuilder {
+ private object KyuubiConfigBuilder {
def apply(key: String): ConfigBuilder = ConfigBuilder(key).onCreate(register)
}
@@ -265,7 +266,7 @@ object KyuubiConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.SECONDS.toMillis(1L))
- val BACKEND_SESSTION_INIT_TIMEOUT: ConfigEntry[Long] =
+ val BACKEND_SESSION_INIT_TIMEOUT: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.init.timeout")
.doc("How long we suggest the server to give up instantiating SparkContext")
.timeConf(TimeUnit.SECONDS)
@@ -285,12 +286,31 @@ object KyuubiConf {
val BACKEND_SESSION_LOCAL_DIR: ConfigEntry[String] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.local.dir")
- .doc("Default value to set spark.local.dir")
+ .doc("Default value to set `spark.local.dir`, for YARN mode, this only affect the Kyuubi" +
+ " server side settings according to the rule of Spark treating `spark.local.dir`")
.stringConf
.createWithDefault(
s"${sys.env.getOrElse("KYUUBI_HOME", System.getProperty("java.io.tmpdir"))}"
+ File.separator + "local")
+ val BACKEND_SESSION_LONG_CACHE: ConfigEntry[Boolean] =
+ KyuubiConfigBuilder("spark.kyuubi.backend.session.long.cache")
+ .doc("Whether to update the tokens of Spark's executor to support long caching" +
+ " SparkSessions iff this is true && `spark.kyuubi.backend.token.update.class` is" +
+ " loadable. This is used towards kerberized hadoop clusters in case of" +
+ " `spark.kyuubi.backend.session.idle.timeout` is set longer than token expiration time" +
+ " limit or SparkSession never idles.")
+ .booleanConf
+ .createWithDefault(UserGroupInformation.isSecurityEnabled)
+
+ val BACKEND_SESSION_TOKEN_UPDATE_CLASS: ConfigEntry[String] =
+ KyuubiConfigBuilder("spark.kyuubi.backend.session.token.update.class")
+ .doc("`CoarseGrainedClusterMessages` for token update message from the driver of Spark to" +
+ " executors, it is loadable only by higher version Spark release(2.3 and later)")
+ .stringConf
+ .createWithDefault(
+ "org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$UpdateDelegationTokens")
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// Authentication //
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala
index 9aaf1cfab78..8ec7b3d0f5b 100644
--- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala
+++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala
@@ -23,6 +23,7 @@ import java.net.URI
import scala.annotation.tailrec
import scala.collection.Map
+import scala.util.Try
import scala.util.matching.Regex
import org.apache.hadoop.conf.Configuration
@@ -255,6 +256,11 @@ object KyuubiSparkUtil extends Logging {
loader
}
+ /** Determines whether the provided class is loadable in the current thread. */
+ def classIsLoadable(clazz: String): Boolean = {
+ Try { Class.forName(clazz, false, getContextOrSparkClassLoader) }.isSuccess
+ }
+
/**
* Generate proper configurations before server starts
* @param conf the default [[SparkConf]]
diff --git a/kyuubi-server/src/main/scala/org/apache/spark/scheduler/cluster/KyuubiSparkExecutorUtils.scala b/kyuubi-server/src/main/scala/org/apache/spark/scheduler/cluster/KyuubiSparkExecutorUtils.scala
new file mode 100644
index 00000000000..55102ad88d9
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/spark/scheduler/cluster/KyuubiSparkExecutorUtils.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.io.{ByteArrayOutputStream, DataOutputStream}
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.KyuubiConf._
+import org.apache.spark.SparkContext
+
+import yaooqinn.kyuubi.utils.ReflectUtils._
+
+/**
+ * Tool for methods used for Kyuubi to talking to Spark Executors
+ */
+object KyuubiSparkExecutorUtils {
+
+ /**
+ * Populate the tokens contained in the current KyuubiSession's ugi to the all the alive
+ * executors associated with its own SparkContext.
+ *
+ * @param sc The SparkContext with its runtime environment which contains all the executors,
+ * associated with the current KyuubiSession and UserGroupInformation.
+ * @param user the UserGroupInformation associated with the current KyuubiSession
+ */
+ def populateTokens(sc: SparkContext, user: UserGroupInformation): Unit = {
+ val schedulerBackend = sc.schedulerBackend
+ schedulerBackend match {
+ case backend: CoarseGrainedSchedulerBackend =>
+ try {
+ val byteStream = new ByteArrayOutputStream
+ val dataStream = new DataOutputStream(byteStream)
+ user.getCredentials.writeTokenStorageToStream(dataStream)
+ val tokens = byteStream.toByteArray
+ val executorField =
+ classOf[CoarseGrainedSchedulerBackend].getName.replace('.', '$') + "$$executorDataMap"
+ val executors = backend match {
+ case _: YarnClientSchedulerBackend | _: YarnClusterSchedulerBackend |
+ _: StandaloneSchedulerBackend =>
+ getAncestorField(backend, 2, executorField)
+ case _ => getFieldValue(backend, executorField)
+ }
+ val msg = newInstance(sc.conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS),
+ Seq(classOf[Array[Byte]]), Seq(tokens))
+ executors.asInstanceOf[mutable.HashMap[String, ExecutorData]]
+ .values.foreach(_.executorEndpoint.send(msg))
+ } catch {
+ case NonFatal(e) => warn(s"Failed to populate secured tokens to executors", e)
+ }
+ case _ =>
+ }
+ }
+}
diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala
index 658d65ece57..91c5c323fbf 100644
--- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.session.OperationLog
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil
+import org.apache.spark.scheduler.cluster.KyuubiSparkExecutorUtils
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils}
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.catalyst.parser.ParseException
@@ -54,7 +55,8 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
private val opHandle: OperationHandle =
new OperationHandle(EXECUTE_STATEMENT, session.getProtocolVersion)
- private val conf = session.sparkSession.conf
+ private val sparkSession = session.sparkSession
+ private val conf = sparkSession.conf
private val operationTimeout =
KyuubiSparkUtil.timeStringAsMs(conf.get(OPERATION_IDLE_TIMEOUT))
@@ -213,7 +215,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
debug(s"CLOSING $statementId")
cleanup(CLOSED)
cleanupOperationLog()
- session.sparkSession.sparkContext.clearJobGroup()
+ sparkSession.sparkContext.clearJobGroup()
}
def cancel(): Unit = {
@@ -312,7 +314,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
val destFileName = src.getName
val destFile =
new File(session.getResourcesSessionDir, destFileName).getCanonicalPath
- val fs = src.getFileSystem(session.sparkSession.sparkContext.hadoopConfiguration)
+ val fs = src.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
fs.copyToLocalFile(src, new Path(destFile))
FileUtil.chmod(destFile, "ugo+rx", true)
AddJarCommand(destFile).run(session.sparkSession)
@@ -327,7 +329,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
info(s"Running query '$statement' with $statementId")
setState(RUNNING)
- val classLoader = SparkSQLUtils.getUserJarClassLoader(session.sparkSession)
+ val classLoader = SparkSQLUtils.getUserJarClassLoader(sparkSession)
Thread.currentThread().setContextClassLoader(classLoader)
KyuubiServerMonitor.getListener(session.getUserName).foreach {
@@ -338,10 +340,10 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
statementId,
session.getUserName)
}
- session.sparkSession.sparkContext.setJobGroup(statementId, statement)
- KyuubiSparkUtil.setActiveSparkContext(session.sparkSession.sparkContext)
+ sparkSession.sparkContext.setJobGroup(statementId, statement)
+ KyuubiSparkUtil.setActiveSparkContext(sparkSession.sparkContext)
- val parsedPlan = SparkSQLUtils.parsePlan(session.sparkSession, statement)
+ val parsedPlan = SparkSQLUtils.parsePlan(sparkSession, statement)
parsedPlan match {
case c if c.nodeName == "CreateFunctionCommand" =>
val resources =
@@ -354,10 +356,15 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
localizeAndAndResource(path)
case _ =>
}
- result = SparkSQLUtils.toDataFrame(session.sparkSession, parsedPlan)
+ result = SparkSQLUtils.toDataFrame(sparkSession, parsedPlan)
KyuubiServerMonitor.getListener(session.getUserName).foreach {
_.onStatementParsed(statementId, result.queryExecution.toString())
}
+
+ if (conf.get(BACKEND_SESSION_LONG_CACHE).toBoolean &&
+ KyuubiSparkUtil.classIsLoadable(conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS))) {
+ KyuubiSparkExecutorUtils.populateTokens(sparkSession.sparkContext, session.ugi)
+ }
debug(result.queryExecution.toString())
iter = if (incrementalCollect) {
info("Executing query in incremental collection mode")
@@ -402,7 +409,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
}
} finally {
if (statementId != null) {
- session.sparkSession.sparkContext.cancelJobGroup(statementId)
+ sparkSession.sparkContext.cancelJobGroup(statementId)
}
}
}
@@ -429,7 +436,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
backgroundHandle.cancel(true)
}
if (statementId != null) {
- session.sparkSession.sparkContext.cancelJobGroup(statementId)
+ sparkSession.sparkContext.cancelJobGroup(statementId)
}
}
diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala
index 7c04d88f2a4..abe9f7f9f3d 100644
--- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala
+++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala
@@ -158,7 +158,7 @@ class SparkSessionWithUGI(
"kyuubi", userName, conf.get(FRONTEND_BIND_HOST), conf.get(FRONTEND_BIND_PORT)).mkString("|")
conf.setAppName(appName)
configureSparkConf(sessionConf)
- val totalWaitTime: Long = conf.getTimeAsSeconds(BACKEND_SESSTION_INIT_TIMEOUT)
+ val totalWaitTime: Long = conf.getTimeAsSeconds(BACKEND_SESSION_INIT_TIMEOUT)
try {
KyuubiHadoopUtil.doAs(user) {
newContext.start()
diff --git a/kyuubi-server/src/test/resources/log4j.xml b/kyuubi-server/src/test/resources/log4j.xml
index 37072aab6f5..61450b91e0e 100644
--- a/kyuubi-server/src/test/resources/log4j.xml
+++ b/kyuubi-server/src/test/resources/log4j.xml
@@ -18,7 +18,11 @@
-
+
+
+
+
+
@@ -30,7 +34,7 @@
-
+
diff --git a/kyuubi-server/src/test/scala/org/apache/spark/scheduler/cluster/KyuubiSparkExecutorUtilsSuite.scala b/kyuubi-server/src/test/scala/org/apache/spark/scheduler/cluster/KyuubiSparkExecutorUtilsSuite.scala
new file mode 100644
index 00000000000..4d451a3075d
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/spark/scheduler/cluster/KyuubiSparkExecutorUtilsSuite.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.KyuubiConf._
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.local.LocalSchedulerBackend
+import org.scalatest.BeforeAndAfterEach
+
+import yaooqinn.kyuubi.utils.ReflectUtils
+
+class KyuubiSparkExecutorUtilsSuite
+ extends SparkFunSuite with BeforeAndAfterEach {
+ import KyuubiSparkUtil._
+
+ val conf: SparkConf = new SparkConf(true)
+ .setAppName(this.getClass.getSimpleName)
+ .setMaster("local")
+ KyuubiSparkUtil.setupCommonConfig(conf)
+
+ var sc: SparkContext = _
+
+ override def beforeEach(): Unit = {
+ sc = new SparkContext(conf)
+ super.beforeEach()
+ }
+
+ override def afterEach(): Unit = {
+ if (sc != null) {
+ sc.stop()
+ }
+ super.afterEach()
+ }
+
+ test("populate tokens for non CoarseGrainedSchedulerBackend") {
+ val taskSchedulerImpl = new TaskSchedulerImpl(sc)
+ val backend = new LocalSchedulerBackend(conf, taskSchedulerImpl, 1)
+ ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend)
+ val user = UserGroupInformation.getCurrentUser
+ KyuubiSparkExecutorUtils.populateTokens(sc, user)
+ }
+
+ test("populate tokens for CoarseGrainedSchedulerBackend") {
+ val taskSchedulerImpl = new TaskSchedulerImpl(sc)
+ val backend = new CoarseGrainedSchedulerBackend(taskSchedulerImpl, sc.env.rpcEnv)
+ ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend)
+ val user = UserGroupInformation.getCurrentUser
+ KyuubiSparkExecutorUtils.populateTokens(sc, user)
+ }
+
+ test("populate tokens for YarnClientSchedulerBackend") {
+ val taskSchedulerImpl = new TaskSchedulerImpl(sc)
+ val backend = new YarnClientSchedulerBackend(taskSchedulerImpl, sc)
+ ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend)
+ val user = UserGroupInformation.getCurrentUser
+ KyuubiSparkExecutorUtils.populateTokens(sc, user)
+ }
+
+ test("populate tokens for YarnClusterSchedulerBackend") {
+ val taskSchedulerImpl = new TaskSchedulerImpl(sc)
+ val backend = new YarnClusterSchedulerBackend(taskSchedulerImpl, sc)
+ ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend)
+ val user = UserGroupInformation.getCurrentUser
+ KyuubiSparkExecutorUtils.populateTokens(sc, user)
+ }
+
+ test("populate tokens for StandaloneSchedulerBackend") {
+ val taskSchedulerImpl = new TaskSchedulerImpl(sc)
+ val backend = new StandaloneSchedulerBackend(taskSchedulerImpl, sc, null)
+ ReflectUtils.setFieldValue(sc, "_schedulerBackend", backend)
+ val user = UserGroupInformation.getCurrentUser
+ KyuubiSparkExecutorUtils.populateTokens(sc, user)
+ }
+
+ test("get executor data map") {
+ val taskSchedulerImpl = new TaskSchedulerImpl(sc)
+ val backend = new CoarseGrainedSchedulerBackend(taskSchedulerImpl, sc.env.rpcEnv)
+ val executorDataMap = ReflectUtils.getFieldValue(backend,
+ "org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$executorDataMap")
+ assert(executorDataMap.asInstanceOf[mutable.HashMap[String, ExecutorData]].values.isEmpty)
+ sc.stop()
+ }
+
+ test("create update token class via reflection") {
+ val className = conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS)
+ assert(classIsLoadable(className) ===
+ (majorVersion(SPARK_VERSION) == 2 && minorVersion(SPARK_VERSION) >= 3))
+
+ if (classIsLoadable(className)) {
+ val tokens1 = Array(0.toByte)
+ val tokens2 = Array(1, 2, 3, 4).map(_.toByte)
+ val msg1 = ReflectUtils.newInstance(className, Seq(classOf[Array[Byte]]), Seq(tokens1))
+ assert(ReflectUtils.getFieldValue(msg1, "tokens") === tokens1)
+ val msg2 = ReflectUtils.newInstance(className, Seq(classOf[Array[Byte]]), Seq(tokens2))
+ assert(ReflectUtils.getFieldValue(msg2, "tokens") === tokens2)
+ }
+ }
+}
diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGISuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGISuite.scala
index a4111897044..a8d13f9e795 100644
--- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGISuite.scala
+++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGISuite.scala
@@ -63,7 +63,7 @@ class SparkSessionWithUGISuite extends SparkFunSuite {
test("test init failed with sc init failing") {
assert(!spark.sparkContext.isStopped)
val confClone = conf.clone().remove(KyuubiSparkUtil.MULTIPLE_CONTEXTS)
- .set(KyuubiConf.BACKEND_SESSTION_INIT_TIMEOUT.key, "3")
+ .set(KyuubiConf.BACKEND_SESSION_INIT_TIMEOUT.key, "3")
val userName1 = "test1"
val ru = UserGroupInformation.createRemoteUser(userName1)
val sparkSessionWithUGI = new SparkSessionWithUGI(ru, confClone, cache)
@@ -146,7 +146,7 @@ class SparkSessionWithUGISuite extends SparkFunSuite {
test("test init failed with time out exception") {
// point to an non-exist cluster manager
val confClone = conf.clone().setMaster("spark://localhost:7077")
- .set(KyuubiConf.BACKEND_SESSTION_INIT_TIMEOUT.key, "3")
+ .set(KyuubiConf.BACKEND_SESSION_INIT_TIMEOUT.key, "3")
val userName1 = "test"
val ru = UserGroupInformation.createRemoteUser(userName1)
val sparkSessionWithUGI = new SparkSessionWithUGI(ru, confClone, cache)