diff --git a/docs/sql_state_code.md b/docs/sql_state_code.md deleted file mode 100644 index dbfe25f5ba9..00000000000 --- a/docs/sql_state_code.md +++ /dev/null @@ -1,7 +0,0 @@ -|SQL State|Condition|Vendor Code|Type| -|:---:|:---|:---:|:---:| -|ServerError| internal error | 1000 | Kyuubi | -|08S01|SQL client unable to establish SQL connection | 1001| | -|ParseException|failed parsing sql statement|2000|Spark| -|AnalysisException|failed analysing sql statement|2001|| -|HiveAccessControlException| failed privilege checking|3000|Hive| diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementInClientMode.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementInClientMode.scala index 64040eb2321..1538d5662e5 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementInClientMode.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementInClientMode.scala @@ -20,11 +20,10 @@ package yaooqinn.kyuubi.operation.statement import java.io.File import scala.collection.JavaConverters._ -import scala.util.{Success, Try} +import scala.util.Try import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.{FileUtil, Path} -import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException import org.apache.spark.KyuubiConf._ import org.apache.spark.KyuubiSparkUtil import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSQLUtils} @@ -43,10 +42,7 @@ import yaooqinn.kyuubi.session.KyuubiSession import yaooqinn.kyuubi.ui.KyuubiServerMonitor import yaooqinn.kyuubi.utils.ReflectUtils -class ExecuteStatementInClientMode( - session: KyuubiSession, - statement: String, - runAsync: Boolean = true) +class ExecuteStatementInClientMode(session: KyuubiSession, statement: String, runAsync: Boolean) extends ExecuteStatementOperation(session, statement, runAsync) { import ExecuteStatementInClientMode._ @@ -131,50 +127,46 @@ class ExecuteStatementInClientMode( override protected def execute(): Unit = { try { - info(s"Running query '$statement' with $statementId") + val userName = session.getUserName + info(s"Running $userName's query '$statement' with $statementId") setState(RUNNING) MetricsSystem.get.foreach(_.RUNNING_QUERIES.inc) val classLoader = SparkSQLUtils.getUserJarClassLoader(sparkSession) Thread.currentThread().setContextClassLoader(classLoader) - KyuubiServerMonitor.getListener(session.getUserName).foreach { + KyuubiServerMonitor.getListener(userName).foreach { _.onStatementStart( statementId, session.getSessionHandle.getSessionId.toString, statement, statementId, - session.getUserName) + userName) } sparkSession.sparkContext.setJobGroup(statementId, statement) KyuubiSparkUtil.setActiveSparkContext(sparkSession.sparkContext) val parsedPlan = SparkSQLUtils.parsePlan(sparkSession, statement) result = SparkSQLUtils.toDataFrame(sparkSession, transform(parsedPlan)) - KyuubiServerMonitor.getListener(session.getUserName).foreach { + KyuubiServerMonitor.getListener(userName).foreach { _.onStatementParsed(statementId, result.queryExecution.toString()) } - debug(result.queryExecution.toString()) iter = if (incrementalCollect) { - val numParts = result.rdd.getNumPartitions - info(s"Executing query in incremental mode, running $numParts jobs before optimization") + val parts = result.rdd.getNumPartitions + info("Run " + userName + "'s query " + statementId + " incrementally, " + parts + " jobs") val limit = conf.get(OPERATION_INCREMENTAL_RDD_PARTITIONS_LIMIT).toInt - if (numParts > limit) { + if (parts > limit) { val partRows = conf.get(OPERATION_INCREMENTAL_PARTITION_ROWS).toInt - val count = Try { result.persist.count() } match { - case Success(outputSize) => - val num = math.min(math.max(outputSize / partRows, 1), numParts) - info(s"The total query output is $outputSize and will be coalesced to $num of" + - s" partitions with $partRows rows on average") - num - case _ => - warn("Failed to calculate the query output size, do not coalesce") - numParts + val outputSize = Try(result.persist.count()).getOrElse(Long.MaxValue) + val finalJobNums = math.max(math.min(math.max(outputSize / partRows, 1), parts), 1) + info("Run " + userName + "'s query " + statementId + " incrementally, records: " + + outputSize + ", " + parts + " -> " + finalJobNums + " jobs after") + try { + result.coalesce(finalJobNums.toInt).toLocalIterator().asScala + } finally { + result.unpersist() } - info(s"Executing query in incremental mode, running $count jobs after optimization") - result.coalesce(count.toInt).toLocalIterator().asScala } else { - info(s"Executing query in incremental mode, running $numParts jobs without optimization") result.toLocalIterator().asScala } } else { @@ -188,37 +180,14 @@ class ExecuteStatementInClientMode( setState(FINISHED) KyuubiServerMonitor.getListener(session.getUserName).foreach(_.onStatementFinish(statementId)) } catch { - case e: KyuubiSQLException => - if (!isClosedOrCanceled) { - val err = KyuubiSparkUtil.exceptionString(e) - onStatementError(statementId, e.getMessage, err) - throw e - } - case e: ParseException => - if (!isClosedOrCanceled) { - val err = KyuubiSparkUtil.exceptionString(e) - onStatementError(statementId, e.withCommand(statement).getMessage, err) - throw new KyuubiSQLException( - e.withCommand(statement).getMessage + err, "ParseException", 2000, e) - } - case e: AnalysisException => - if (!isClosedOrCanceled) { - val err = KyuubiSparkUtil.exceptionString(e) - onStatementError(statementId, e.getMessage, err) - throw new KyuubiSQLException(err, "AnalysisException", 2001, e) - } - case e: HiveAccessControlException => - if (!isClosedOrCanceled) { - val err = KyuubiSparkUtil.exceptionString(e) - onStatementError(statementId, e.getMessage, err) - throw new KyuubiSQLException(err, "HiveAccessControlException", 3000, e) - } - case e: Throwable => - if (!isClosedOrCanceled) { - val err = KyuubiSparkUtil.exceptionString(e) - onStatementError(statementId, e.getMessage, err) - throw new KyuubiSQLException(err, "", 10000, e) - } + case e: KyuubiSQLException if !isClosedOrCanceled => + val err = KyuubiSparkUtil.exceptionString(e) + onStatementError(statementId, e.getMessage, err) + throw e + case e: Throwable if !isClosedOrCanceled => + val err = KyuubiSparkUtil.exceptionString(e) + onStatementError(statementId, e.getMessage, err) + throw new KyuubiSQLException(err, e.getClass.getSimpleName, e) } finally { MetricsSystem.get.foreach {m => m.RUNNING_QUERIES.dec() @@ -237,9 +206,7 @@ class ExecuteStatementInClientMode( override protected def cleanup(state: OperationState) { super.cleanup(state) - if (statementId != null) { - sparkSession.sparkContext.cancelJobGroup(statementId) - } + sparkSession.sparkContext.cancelJobGroup(statementId) } } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala index 89e74e5040d..af424ac51ca 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala @@ -138,7 +138,6 @@ class SparkSessionWithUGI( conf.get(FRONTEND_BIND_PORT), UUID.randomUUID().toString).mkString("|") conf.setAppName(appName) - configureSparkConf(sessionConf) info(s"Create new SparkSession for " + userName + " as " + appName) try { @@ -180,6 +179,7 @@ class SparkSessionWithUGI( @throws[KyuubiSQLException] def init(sessionConf: Map[String, String]): Unit = { + configureSparkConf(sessionConf) getOrCreate(sessionConf) try { diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala index cde8b488a30..2522bbc5851 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala @@ -606,6 +606,59 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu } } + test("select") { + val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { + val kyuubiSession = server.beService.getSessionManager.getSession(new SessionHandle(handle)) + kyuubiSession.sparkSession.sql( + "create table if not exists default.select_tbl(key int) using parquet") + val ct = new TExecuteStatementReq(handle, "select * from default.select_tbl") + val tExecuteStatementResp = fe.ExecuteStatement(ct) + val statusReq = new TGetOperationStatusReq(tExecuteStatementResp.getOperationHandle) + + while(fe.GetOperationStatus(statusReq) + .getOperationState.getValue < TOperationState.FINISHED_STATE.getValue) { + Thread.sleep(10) + } + Thread.sleep(2000) + + val tFetchResultsReq = new TFetchResultsReq( + tExecuteStatementResp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + tFetchResultsResp.getResults.getRows.size() should be(0) + } + withFEServiceAndHandleIncAndCal(block) + withFEServiceAndHandleInc(block) + withFEServiceAndHandle(block) + withFEServiceAndHandleAndResultLimit(block) + } + + test("select with exception") { + val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { + val kyuubiSession = server.beService.getSessionManager.getSession(new SessionHandle(handle)) + kyuubiSession.sparkSession.sql( + "create table if not exists default.select_tbl(key int) using parquet") + val ct = new TExecuteStatementReq(handle, "select * from default.select_tbl") + val tExecuteStatementResp = fe.ExecuteStatement(ct) + val statusReq = new TGetOperationStatusReq(tExecuteStatementResp.getOperationHandle) + + while(fe.GetOperationStatus(statusReq) + .getOperationState.getValue < TOperationState.FINISHED_STATE.getValue) { + Thread.sleep(10) + } + Thread.sleep(2000) + + val tFetchResultsReq = new TFetchResultsReq( + tExecuteStatementResp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) + } + + withFEServiceAndHandleAndException(block) + } + def withFEServiceAndHandle(block: (FrontendService, TSessionHandle) => Unit): Unit = { val feService = server.feService val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) @@ -616,6 +669,32 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu block(feService, handle) } + def withFEServiceAndHandleAndResultLimit( + block: (FrontendService, TSessionHandle) => Unit): Unit = { + val feService = server.feService + val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) + req.setUsername(user) + req.setConfiguration( + Map("set:hivevar:" + KyuubiConf.OPERATION_RESULT_LIMIT.key -> "1").asJava) + val resp = feService.OpenSession(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + val handle = resp.getSessionHandle + block(feService, handle) + } + + def withFEServiceAndHandleAndException( + block: (FrontendService, TSessionHandle) => Unit): Unit = { + val feService = server.feService + val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) + req.setUsername(user) + req.setConfiguration( + Map("set:hivevar:" + KyuubiConf.OPERATION_RESULT_LIMIT.key -> "invaild put").asJava) + val resp = feService.OpenSession(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + val handle = resp.getSessionHandle + block(feService, handle) + } + def withFEServiceAndHandleInc(block: (FrontendService, TSessionHandle) => Unit): Unit = { val feService = server.feService val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)