Skip to content

Commit

Permalink
JDBC Driver support pull Multiple result sets
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Sep 13, 2024
1 parent 17d2e03 commit c798324
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class UJESSQLDriver extends UJESSQLDriverMain implements Driver {
static String PASSWORD = "password";
static boolean TABLEAU_SERVER = false;
static String FIXED_SESSION = "fixedSession";
static String ENABLE_MULTI_RESULT = "enableMultiResult";

static String USE_SSL = "useSSL";
static String VERSION = "version";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,6 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope
val runType = EngineType.mapStringToEngineType(engine) match {
case EngineType.SPARK => RunType.SQL
case EngineType.HIVE => RunType.HIVE
case EngineType.REPL => RunType.REPL
case EngineType.DORIS => RunType.DORIS
case EngineType.TRINO => RunType.TRINO_SQL
case EngineType.PRESTO => RunType.PRESTO_SQL
case EngineType.NEBULA => RunType.NEBULA_SQL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ class LinkisSQLStatement(private[jdbc] val ujesSQLConnection: LinkisSQLConnectio
with Logging {

private var jobExecuteResult: JobExecuteResult = _

private val openedResultSets: util.ArrayList[UJESSQLResultSet] =
new util.ArrayList[UJESSQLResultSet]()

private var resultSet: UJESSQLResultSet = _
private var closed = false
private var maxRows: Int = 0
Expand Down Expand Up @@ -190,7 +194,7 @@ class LinkisSQLStatement(private[jdbc] val ujesSQLConnection: LinkisSQLConnectio

override def getUpdateCount: Int = throwWhenClosed(-1)

override def getMoreResults: Boolean = false
override def getMoreResults: Boolean = getMoreResults(Statement.CLOSE_CURRENT_RESULT)

override def setFetchDirection(direction: Int): Unit =
throwWhenClosed(if (direction != ResultSet.FETCH_FORWARD) {
Expand Down Expand Up @@ -230,7 +234,45 @@ class LinkisSQLStatement(private[jdbc] val ujesSQLConnection: LinkisSQLConnectio

override def getConnection: Connection = throwWhenClosed(ujesSQLConnection)

override def getMoreResults(current: Int): Boolean = false
override def getMoreResults(current: Int): Boolean = {
if (this.resultSet == null) {
false
} else {
this.resultSet.getMetaData
val nextResultSet = this.resultSet.getNextResultSet
current match {
case Statement.CLOSE_CURRENT_RESULT =>
// 1 - CLOSE CURRENT RESULT SET
this.resultSet.close()
this.resultSet.clearNextResultSet
case Statement.KEEP_CURRENT_RESULT =>
// 2 - KEEP CURRENT RESULT SET
this.openedResultSets.add(this.resultSet)
this.resultSet.clearNextResultSet
case Statement.CLOSE_ALL_RESULTS =>
// 3 - CLOSE ALL RESULT SET
this.openedResultSets.add(this.resultSet)
closeAllOpenedResultSet()
case _ =>
throw new LinkisSQLException(
LinkisSQLErrorCode.NOSUPPORT_STATEMENT,
"getMoreResults with current not in 1,2,3 is not supported, see Statement.getMoreResults"
)
}
this.resultSet = nextResultSet
this.resultSet != null
}
}

private def closeAllOpenedResultSet(): Any = {
val iterator = this.openedResultSets.iterator()
while (iterator.hasNext) {
val set = iterator.next()
if (!set.isClosed) {
set.close()
}
}
}

override def getGeneratedKeys: ResultSet = throw new LinkisSQLException(
LinkisSQLErrorCode.NOSUPPORT_STATEMENT,
Expand Down Expand Up @@ -302,6 +344,7 @@ class LinkisSQLStatement(private[jdbc] val ujesSQLConnection: LinkisSQLConnectio

/**
* log[0] error log[1] warn log[2] info log[3] all (info + warn + error)
*
* @return
*/
def getAllLog(): Array[String] = {
Expand All @@ -316,6 +359,7 @@ class LinkisSQLStatement(private[jdbc] val ujesSQLConnection: LinkisSQLConnectio

/**
* log[0] error log[1] warn log[2] info log[3] all (info + warn + error)
*
* @return
*/
def getIncrementalLog(): util.List[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ class UJESSQLDriverMain extends Driver with Logging {
case Array(USE_SSL, value) =>
props.setProperty(USE_SSL, value)
false
case Array(ENABLE_MULTI_RESULT, value) =>
props.setProperty(ENABLE_MULTI_RESULT, value)
false
case Array(key, _) =>
if (StringUtils.isBlank(key)) {
throw new LinkisSQLException(
Expand Down Expand Up @@ -141,6 +144,7 @@ object UJESSQLDriverMain {
val PASSWORD = UJESSQLDriver.PASSWORD
val TABLEAU_SERVER = UJESSQLDriver.TABLEAU_SERVER
val FIXED_SESSION = UJESSQLDriver.FIXED_SESSION
val ENABLE_MULTI_RESULT = UJESSQLDriver.ENABLE_MULTI_RESULT

val USE_SSL = UJESSQLDriver.USE_SSL

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.linkis.ujes.jdbc
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.ujes.client.request.ResultSetAction
import org.apache.linkis.ujes.client.response.ResultSetResult
import org.apache.linkis.ujes.client.utils.UJESClientUtils

import org.apache.commons.lang3.StringUtils

Expand Down Expand Up @@ -76,6 +77,7 @@ class UJESSQLResultSet(
private var path: String = _
private var metaData: util.List[util.Map[String, String]] = _
private val statement: LinkisSQLStatement = ujesStatement
private var nextResultSet: UJESSQLResultSet = _

private val connection: LinkisSQLConnection =
ujesStatement.getConnection.asInstanceOf[LinkisSQLConnection]
Expand All @@ -102,14 +104,28 @@ class UJESSQLResultSet(

private def getResultSetPath(resultSetList: Array[String]): String = {
if (resultSetList.length > 0) {
resultSetList(resultSetList.length - 1)
val enableMultiResult = connection.getProps.getProperty(UJESSQLDriverMain.ENABLE_MULTI_RESULT)
enableMultiResult match {
case "Y" =>
// 配置开启时,返回首个结果集
resultSetList(0)
case _ =>
// 配置关闭时,返回以最后一个结果集为准
resultSetList(resultSetList.length - 1)
}
} else {
""
}
}

private def resultSetResultInit(): Unit = {
if (path == null) path = getResultSetPath(resultSetList)
// 设置下一个结果集
val enableMultiResult = connection.getProps.getProperty(UJESSQLDriverMain.ENABLE_MULTI_RESULT)
if (resultSetList.length > 1 && "Y".equals(enableMultiResult)) {
this.nextResultSet =
new UJESSQLResultSet(resultSetList.drop(1), this.statement, maxRows, fetchSize)
}
val user = connection.getProps.getProperty("user")
if (StringUtils.isNotBlank(path)) {
val resultAction =
Expand Down Expand Up @@ -235,38 +251,7 @@ class UJESSQLResultSet(
}

private def evaluate(dataType: String, value: String): Any = {

if (value == null || value.equals("null") || value.equals("NULL") || value.equals("Null")) {
dataType.toLowerCase(Locale.getDefault) match {
case "string" | "char" | "varchar" | "nvarchar" => value
case _ => null
}
} else {
dataType.toLowerCase(Locale.getDefault) match {
case null => throw new LinkisSQLException(LinkisSQLErrorCode.METADATA_EMPTY)
case "char" | "varchar" | "nvarchar" | "string" => value
case "short" => value.toShort
case "smallint" => value.toShort
case "tinyint" => value.toShort
case "int" => value.toInt
case "long" => value.toLong
case "float" => value.toFloat
case "double" => value.toDouble
case "boolean" => value.toBoolean
case "byte" => value.toByte
case "timestamp" => value
case "date" => value
case "bigint" => value.toLong
case "decimal" => value.toDouble
case "array" => value.toArray
case "map" => value
case _ =>
throw new LinkisSQLException(
LinkisSQLErrorCode.PREPARESTATEMENT_TYPEERROR,
s"Can't infer the SQL type to use for an instance of ${dataType}. Use getObject() with an explicit Types value to specify the type to use"
)
}
}
UJESClientUtils.evaluate(dataType, value)
}

private def getColumnValue(columnIndex: Int): Any = {
Expand Down Expand Up @@ -303,6 +288,10 @@ class UJESSQLResultSet(
}
}

def clearNextResultSet: Any = {
this.nextResultSet = null
}

override def getBoolean(columnIndex: Int): Boolean = {
val any = getColumnValue(columnIndex)
if (wasNull()) {
Expand Down Expand Up @@ -683,6 +672,8 @@ class UJESSQLResultSet(
true
}

def getNextResultSet: UJESSQLResultSet = this.nextResultSet

override def setFetchDirection(direction: Int): Unit = {
throw new LinkisSQLException(LinkisSQLErrorCode.NOSUPPORT_RESULTSET)
}
Expand Down
Loading

0 comments on commit c798324

Please sign in to comment.