Skip to content

Commit

Permalink
add user info and statement id in executing log (#212)
Browse files Browse the repository at this point in the history
* add user info and statement id in executing log

* add ut

* update conf

* long max

* modify log

* modify log

* unpersist

* typo

* add ut

* fix ut

* typo

* code cov

* add log

* ex order

* rm some ex

* none get

* Revert "none get"

This reverts commit 6792e33.

* rm debug
  • Loading branch information
yaooqinn committed Aug 16, 2019
1 parent 552bb10 commit 6621d36
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 68 deletions.
7 changes: 0 additions & 7 deletions docs/sql_state_code.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ package yaooqinn.kyuubi.operation.statement
import java.io.File

import scala.collection.JavaConverters._
import scala.util.{Success, Try}
import scala.util.Try

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException
import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSQLUtils}
Expand All @@ -43,10 +42,7 @@ import yaooqinn.kyuubi.session.KyuubiSession
import yaooqinn.kyuubi.ui.KyuubiServerMonitor
import yaooqinn.kyuubi.utils.ReflectUtils

class ExecuteStatementInClientMode(
session: KyuubiSession,
statement: String,
runAsync: Boolean = true)
class ExecuteStatementInClientMode(session: KyuubiSession, statement: String, runAsync: Boolean)
extends ExecuteStatementOperation(session, statement, runAsync) {

import ExecuteStatementInClientMode._
Expand Down Expand Up @@ -131,50 +127,46 @@ class ExecuteStatementInClientMode(

override protected def execute(): Unit = {
try {
info(s"Running query '$statement' with $statementId")
val userName = session.getUserName
info(s"Running $userName's query '$statement' with $statementId")
setState(RUNNING)
MetricsSystem.get.foreach(_.RUNNING_QUERIES.inc)
val classLoader = SparkSQLUtils.getUserJarClassLoader(sparkSession)
Thread.currentThread().setContextClassLoader(classLoader)

KyuubiServerMonitor.getListener(session.getUserName).foreach {
KyuubiServerMonitor.getListener(userName).foreach {
_.onStatementStart(
statementId,
session.getSessionHandle.getSessionId.toString,
statement,
statementId,
session.getUserName)
userName)
}
sparkSession.sparkContext.setJobGroup(statementId, statement)
KyuubiSparkUtil.setActiveSparkContext(sparkSession.sparkContext)

val parsedPlan = SparkSQLUtils.parsePlan(sparkSession, statement)
result = SparkSQLUtils.toDataFrame(sparkSession, transform(parsedPlan))
KyuubiServerMonitor.getListener(session.getUserName).foreach {
KyuubiServerMonitor.getListener(userName).foreach {
_.onStatementParsed(statementId, result.queryExecution.toString())
}

debug(result.queryExecution.toString())
iter = if (incrementalCollect) {
val numParts = result.rdd.getNumPartitions
info(s"Executing query in incremental mode, running $numParts jobs before optimization")
val parts = result.rdd.getNumPartitions
info("Run " + userName + "'s query " + statementId + " incrementally, " + parts + " jobs")
val limit = conf.get(OPERATION_INCREMENTAL_RDD_PARTITIONS_LIMIT).toInt
if (numParts > limit) {
if (parts > limit) {
val partRows = conf.get(OPERATION_INCREMENTAL_PARTITION_ROWS).toInt
val count = Try { result.persist.count() } match {
case Success(outputSize) =>
val num = math.min(math.max(outputSize / partRows, 1), numParts)
info(s"The total query output is $outputSize and will be coalesced to $num of" +
s" partitions with $partRows rows on average")
num
case _ =>
warn("Failed to calculate the query output size, do not coalesce")
numParts
val outputSize = Try(result.persist.count()).getOrElse(Long.MaxValue)
val finalJobNums = math.max(math.min(math.max(outputSize / partRows, 1), parts), 1)
info("Run " + userName + "'s query " + statementId + " incrementally, records: " +
outputSize + ", " + parts + " -> " + finalJobNums + " jobs after")
try {
result.coalesce(finalJobNums.toInt).toLocalIterator().asScala
} finally {
result.unpersist()
}
info(s"Executing query in incremental mode, running $count jobs after optimization")
result.coalesce(count.toInt).toLocalIterator().asScala
} else {
info(s"Executing query in incremental mode, running $numParts jobs without optimization")
result.toLocalIterator().asScala
}
} else {
Expand All @@ -188,37 +180,14 @@ class ExecuteStatementInClientMode(
setState(FINISHED)
KyuubiServerMonitor.getListener(session.getUserName).foreach(_.onStatementFinish(statementId))
} catch {
case e: KyuubiSQLException =>
if (!isClosedOrCanceled) {
val err = KyuubiSparkUtil.exceptionString(e)
onStatementError(statementId, e.getMessage, err)
throw e
}
case e: ParseException =>
if (!isClosedOrCanceled) {
val err = KyuubiSparkUtil.exceptionString(e)
onStatementError(statementId, e.withCommand(statement).getMessage, err)
throw new KyuubiSQLException(
e.withCommand(statement).getMessage + err, "ParseException", 2000, e)
}
case e: AnalysisException =>
if (!isClosedOrCanceled) {
val err = KyuubiSparkUtil.exceptionString(e)
onStatementError(statementId, e.getMessage, err)
throw new KyuubiSQLException(err, "AnalysisException", 2001, e)
}
case e: HiveAccessControlException =>
if (!isClosedOrCanceled) {
val err = KyuubiSparkUtil.exceptionString(e)
onStatementError(statementId, e.getMessage, err)
throw new KyuubiSQLException(err, "HiveAccessControlException", 3000, e)
}
case e: Throwable =>
if (!isClosedOrCanceled) {
val err = KyuubiSparkUtil.exceptionString(e)
onStatementError(statementId, e.getMessage, err)
throw new KyuubiSQLException(err, "<unknown>", 10000, e)
}
case e: KyuubiSQLException if !isClosedOrCanceled =>
val err = KyuubiSparkUtil.exceptionString(e)
onStatementError(statementId, e.getMessage, err)
throw e
case e: Throwable if !isClosedOrCanceled =>
val err = KyuubiSparkUtil.exceptionString(e)
onStatementError(statementId, e.getMessage, err)
throw new KyuubiSQLException(err, e.getClass.getSimpleName, e)
} finally {
MetricsSystem.get.foreach {m =>
m.RUNNING_QUERIES.dec()
Expand All @@ -237,9 +206,7 @@ class ExecuteStatementInClientMode(

// Operation cleanup hook: delegates to the parent implementation first, then cancels the
// Spark job group keyed by this statement's id.
// NOTE(review): this span is taken from a rendered diff, so the null-guarded call and the
// unguarded call below are presumably the removed and the added version of the same line
// rather than two consecutive calls -- confirm against the actual file.
override protected def cleanup(state: OperationState) {
super.cleanup(state)
if (statementId != null) {
sparkSession.sparkContext.cancelJobGroup(statementId)
}
sparkSession.sparkContext.cancelJobGroup(statementId)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ class SparkSessionWithUGI(
conf.get(FRONTEND_BIND_PORT),
UUID.randomUUID().toString).mkString("|")
conf.setAppName(appName)
configureSparkConf(sessionConf)
info(s"Create new SparkSession for " + userName + " as " + appName)

try {
Expand Down Expand Up @@ -180,6 +179,7 @@ class SparkSessionWithUGI(

@throws[KyuubiSQLException]
def init(sessionConf: Map[String, String]): Unit = {
configureSparkConf(sessionConf)
getOrCreate(sessionConf)

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,59 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
}
}

// Runs a SELECT through the Thrift frontend under several session setups and expects the
// fetch to succeed and return zero rows.
test("select") {
  val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
    val kyuubiSession = server.beService.getSessionManager.getSession(new SessionHandle(handle))
    kyuubiSession.sparkSession.sql(
      "create table if not exists default.select_tbl(key int) using parquet")
    val ct = new TExecuteStatementReq(handle, "select * from default.select_tbl")
    val tExecuteStatementResp = fe.ExecuteStatement(ct)
    val statusReq = new TGetOperationStatusReq(tExecuteStatementResp.getOperationHandle)

    // States in which the operation has not produced an outcome yet. They are listed
    // explicitly because comparing numeric enum values against FINISHED_STATE would treat
    // PENDING_STATE (which has a larger value in Hive's Thrift IDL) as already done.
    val unfinishedStates = Set(
      TOperationState.INITIALIZED_STATE,
      TOperationState.PENDING_STATE,
      TOperationState.RUNNING_STATE)
    // Bound the wait so a stuck operation fails the assertions below instead of hanging
    // the whole suite.
    val deadline = System.currentTimeMillis() + 60000L
    while (unfinishedStates.contains(fe.GetOperationStatus(statusReq).getOperationState) &&
        System.currentTimeMillis() < deadline) {
      Thread.sleep(10)
    }
    // Extra settle time kept from the previous version of this test.
    // TODO confirm whether it is still required now that PENDING_STATE is waited on.
    Thread.sleep(2000)

    val tFetchResultsReq = new TFetchResultsReq(
      tExecuteStatementResp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)

    val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
    tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
    tFetchResultsResp.getResults.getRows.size() should be(0)
  }
  // Same scenario against each session flavour provided by the suite's helpers.
  withFEServiceAndHandleIncAndCal(block)
  withFEServiceAndHandleInc(block)
  withFEServiceAndHandle(block)
  withFEServiceAndHandleAndResultLimit(block)
}

// Runs a SELECT in a session opened by withFEServiceAndHandleAndException (which passes a
// non-numeric value for the result limit setting) and expects fetching the results to
// report an error status.
test("select with exception") {
  val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
    val kyuubiSession = server.beService.getSessionManager.getSession(new SessionHandle(handle))
    kyuubiSession.sparkSession.sql(
      "create table if not exists default.select_tbl(key int) using parquet")
    val ct = new TExecuteStatementReq(handle, "select * from default.select_tbl")
    val tExecuteStatementResp = fe.ExecuteStatement(ct)
    val statusReq = new TGetOperationStatusReq(tExecuteStatementResp.getOperationHandle)

    // States in which the operation has not produced an outcome yet. They are listed
    // explicitly because comparing numeric enum values against FINISHED_STATE would treat
    // PENDING_STATE (which has a larger value in Hive's Thrift IDL) as already done.
    val unfinishedStates = Set(
      TOperationState.INITIALIZED_STATE,
      TOperationState.PENDING_STATE,
      TOperationState.RUNNING_STATE)
    // Bound the wait so a stuck operation fails the assertion below instead of hanging
    // the whole suite.
    val deadline = System.currentTimeMillis() + 60000L
    while (unfinishedStates.contains(fe.GetOperationStatus(statusReq).getOperationState) &&
        System.currentTimeMillis() < deadline) {
      Thread.sleep(10)
    }
    // Extra settle time kept from the previous version of this test.
    // TODO confirm whether it is still required now that PENDING_STATE is waited on.
    Thread.sleep(2000)

    val tFetchResultsReq = new TFetchResultsReq(
      tExecuteStatementResp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)

    val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
    tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
  }

  withFEServiceAndHandleAndException(block)
}

def withFEServiceAndHandle(block: (FrontendService, TSessionHandle) => Unit): Unit = {
val feService = server.feService
val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
Expand All @@ -616,6 +669,32 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
block(feService, handle)
}

/**
 * Opens a session for the suite's user with the operation result limit set to "1" through a
 * `set:hivevar:` session configuration entry, checks that the session opened successfully,
 * and then runs `block` with the frontend service and the new session handle.
 *
 * @param block test body to run against the frontend service and the opened session
 */
def withFEServiceAndHandleAndResultLimit(
    block: (FrontendService, TSessionHandle) => Unit): Unit = {
  val frontend = server.feService
  val limitKey = s"set:hivevar:${KyuubiConf.OPERATION_RESULT_LIMIT.key}"
  val openReq = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
  openReq.setUsername(user)
  openReq.setConfiguration(Map(limitKey -> "1").asJava)
  val openResp = frontend.OpenSession(openReq)
  // The session must open cleanly before the test body runs.
  openResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
  block(frontend, openResp.getSessionHandle)
}

/**
 * Opens a session for the suite's user with a non-numeric value for the operation result
 * limit, passed through a `set:hivevar:` session configuration entry. Opening the session is
 * still expected to succeed; `block` is then run with the frontend service and the new
 * session handle.
 *
 * @param block test body to run against the frontend service and the opened session
 */
def withFEServiceAndHandleAndException(
    block: (FrontendService, TSessionHandle) => Unit): Unit = {
  val frontend = server.feService
  val limitKey = s"set:hivevar:${KyuubiConf.OPERATION_RESULT_LIMIT.key}"
  val openReq = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
  openReq.setUsername(user)
  // Deliberately not a number; the literal is used verbatim as the bad setting.
  openReq.setConfiguration(Map(limitKey -> "invaild put").asJava)
  val openResp = frontend.OpenSession(openReq)
  openResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
  block(frontend, openResp.getSessionHandle)
}

def withFEServiceAndHandleInc(block: (FrontendService, TSessionHandle) => Unit): Unit = {
val feService = server.feService
val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
Expand Down

0 comments on commit 6621d36

Please sign in to comment.