diff --git a/README.md b/README.md
index dda0518..5f741aa 100644
--- a/README.md
+++ b/README.md
@@ -1,14 +1,33 @@
 # Spark JDBC Driver
 
-This driver wraps the Spark Hive Driver and adds metadata capability using Spark's Catalog API.
+Because the Spark 2.x ThriftServer doesn't implement the Hive JDBC Driver's Catalog API, when using Spark's version of the Hive JDBC driver (`org.spark-project.hive:hive-jdbc:1.2.1.spark2`)
+within BI tools such as [DataGrip](https://www.jetbrains.com/datagrip/), the tools are unable to introspect the catalog schema (databases, tables, views, columns, etc.).
+
+This driver fixes that by wrapping the default Spark Hive Driver (`org.spark-project.hive:hive-jdbc:1.2.1.spark2`) and adding introspection support by implementing the missing calls
+via Spark's [SHOW/DESCRIBE](https://docs.databricks.com/spark/latest/spark-sql/language-manual/index.html#describe-statements) commands.
 
 ### Download
 
-Go to Releases page and download the latest JAR version
+Go to [Releases](https://github.com/hindog/spark-jdbc/releases) and download the latest JAR version.
 
 ### Configuration
 
-JDBC URL patterns are same as [Hive's](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-ConnectionURLs).
+Add the new driver to your BI tool using the downloaded jar.
+
+The driver class is `com.hindog.spark.jdbc.SparkDriver`.
+
+JDBC URL patterns are the same as the [Hive Driver's](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-ConnectionURLs). You can also use `spark` for the driver scheme, so the JDBC URL `jdbc:hive2://<host>:<port>/<db>` is equivalent to `jdbc:spark://<host>:<port>/<db>`.
+
+### Building
+
+Run `sbt assembly:assembly` to build the driver jar.
+
+The output jar will be written to `target/scala-2.11/spark-jdbc-<version>.jar`; it includes all required dependencies, so it will be quite large.
+
+### Limitations
 
-You can also use `spark` 
+- Spark 2.x does not support multiple "catalogs", so a hard-coded catalog named `Spark` will be used. You can override this by setting the system property `com.hindog.spark.jdbc.catalog.name` to the desired catalog name.
+- Spark does not support column sizes, so the driver returns the column type's `defaultSize` value.
+- Spark `DESCRIBE` & `SHOW` commands return a limited amount of metadata, so most JDBC metadata will not be populated.
+- Introspection can take some time to run because a separate SQL statement must be issued to introspect each database/table/column/function, etc.
 
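A minimal connection sketch to go with the Configuration and Limitations sections above, assuming the assembled driver jar is on the classpath and a Spark ThriftServer is reachable; the host, port, database, credentials, and the `MyCatalog` override are placeholders, not values from this repo:

```scala
import java.sql.DriverManager

object SparkJdbcSmokeTest extends App {
  // Optional: override the hard-coded "Spark" catalog name noted in the Limitations section.
  sys.props("com.hindog.spark.jdbc.catalog.name") = "MyCatalog"

  // Register the wrapper driver class from the README (often auto-discovered on JDBC 4+, explicit here).
  Class.forName("com.hindog.spark.jdbc.SparkDriver")

  // `jdbc:spark://...` is interchangeable with `jdbc:hive2://...` per the README.
  val conn = DriverManager.getConnection("jdbc:spark://localhost:10000/default", "user", "")
  try {
    // Exercise the introspection calls that this driver backs with SHOW/DESCRIBE statements.
    val tables = conn.getMetaData.getTables(null, null, "%", null)
    while (tables.next()) {
      println(s"${tables.getString("TABLE_SCHEM")}.${tables.getString("TABLE_NAME")}")
    }
  } finally {
    conn.close()
  }
}
```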
diff --git a/build.sbt b/build.sbt
index 51d1ca9..a2df956 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,7 +1,5 @@
 name := "spark-jdbc"
 
-version := "1.0.0"
-
 scalaVersion := "2.11.12"
 
 libraryDependencies += "org.spark-project.hive" % "hive-jdbc" % "1.2.1.spark2"
@@ -10,7 +8,6 @@ libraryDependencies += "com.h2database" % "h2" % "1.4.200" exclude("org.apache.c
 libraryDependencies += "com.google.code.gson" % "gson" % "2.8.6" exclude("org.apache.commons", "commons-lang3")
 libraryDependencies += "org.apache.commons" % "commons-lang3" % "3.5"
 libraryDependencies += "org.scalatest" %% "scalatest" % "3.1.0" % "test"
-libraryDependencies += "org.jsoup" % "jsoup" % "1.12.1" % "test" // used to pull function definitions from API docs
 
 // Github Release
 ghreleaseRepoOrg := "hindog"
diff --git a/notes/1.1.0.markdown b/notes/1.1.0.markdown
new file mode 100644
index 0000000..80342b2
--- /dev/null
+++ b/notes/1.1.0.markdown
@@ -0,0 +1,7 @@
+## Spark JDBC 1.1.0 Release
+
+- Fix an issue where Spark truncating the DDL of very large column types caused an introspection exception
+- Return Spark's raw column type name in column metadata (but it may be truncated by Spark)
+- Allow the root catalog name to be overridden using the `com.hindog.spark.jdbc.catalog.name` system property
+- Remove some unused code
+
diff --git a/src/main/scala/com/hindog/spark/jdbc/catalog/CatalogMetadata.scala b/src/main/scala/com/hindog/spark/jdbc/catalog/CatalogMetadata.scala
index 2a6974e..06b3192 100644
--- a/src/main/scala/com/hindog/spark/jdbc/catalog/CatalogMetadata.scala
+++ b/src/main/scala/com/hindog/spark/jdbc/catalog/CatalogMetadata.scala
@@ -1,12 +1,11 @@
 package com.hindog.spark.jdbc.catalog
 
-import java.sql.{Connection, ResultSet}
-
-import com.hindog.spark.jdbc.SparkConnection
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
+import java.sql.{Connection, ResultSet}
+
 class CatalogMetadata(conn: Connection) extends AbstractMetadata(conn) {
   override def fetch() = {
     if (isDebugEnabled) {
@@ -22,5 +21,5 @@ class CatalogMetadata(conn: Connection) extends AbstractMetadata(conn) {
 }
 
 object CatalogMetadata {
-  def catalogName: String = "Spark"
+  def catalogName: String = sys.props.getOrElse("com.hindog.spark.jdbc.catalog.name", "Spark")
 }
diff --git a/src/main/scala/com/hindog/spark/jdbc/catalog/ColumnMetadata.scala b/src/main/scala/com/hindog/spark/jdbc/catalog/ColumnMetadata.scala
index a3e5b39..ce0e53d 100644
--- a/src/main/scala/com/hindog/spark/jdbc/catalog/ColumnMetadata.scala
+++ b/src/main/scala/com/hindog/spark/jdbc/catalog/ColumnMetadata.scala
@@ -8,9 +8,16 @@ import org.apache.spark.SparkException
 
 import scala.collection._
 import scala.util.control.NonFatal
+import scala.util.matching.Regex
 
 class ColumnMetadata(catalog: String, schemaPattern: String, tablePattern: String, columnPattern: String, conn: Connection) extends AbstractMetadata(conn) {
 
+  // Used to fix column type DDL definitions when Spark truncates them due to excessive size.
+  // For example, Spark will append "... 5 more fields" for very large struct types, and then we are unable to parse the column DDL.
+  // This regex removes that suffix so that the type can be parsed, minus the additional columns.
+  // This doesn't really impact anything on the JDBC side since we don't return the full type, only the "top-level" type of a column.
+  lazy val truncatedTypeRegex: Regex = "(.*),\\.\\.\\.\\s\\d+\\smore\\sfields(.*)".r
+
   override def fetch(): List[Row] = {
     if (isDebugEnabled) {
       log.info(s"Fetching Table Metadata for catalog: $catalog, schemaPattern: $schemaPattern, tablePattern: $tablePattern, columnPattern: $columnPattern")
@@ -26,15 +33,25 @@ class ColumnMetadata(catalog: String, schemaPattern: String, tablePattern: Strin
 
       while (columns.next() && !columns.getString("col_name").startsWith("#")) {
        try {
-          val jdbcType = JdbcType.apply(DataType.fromDDL(columns.getString("DATA_TYPE")))
+          val rawDataType = columns.getString("DATA_TYPE")
+
+          // Parse the Spark column DDL. If the type was truncated due to size, we'll patch it up first.
+          val dataType = truncatedTypeRegex.findFirstMatchIn(rawDataType) match {
+            case Some(m) => m.group(1) + m.group(2) // combine the parts before and after the "... N more fields"
+            case None => rawDataType
+          }
+
+          val jdbcType = JdbcType.apply(DataType.fromDDL(dataType))
           val sparkType = jdbcType.toSparkType
+
+          // https://docs.oracle.com/javase/7/docs/api/java/sql/DatabaseMetaData.html#getColumns(java.lang.String,%20java.lang.String,%20java.lang.String,%20java.lang.String)
           val row = new GenericRowWithSchema(Array(
             tables.getString("TABLE_CAT"),
             tables.getString("TABLE_SCHEM"),
             tables.getString("TABLE_NAME"),
             columns.getString("col_name"),
             jdbcType.sqlType,
-            jdbcType.name,
+            rawDataType,
             sparkType.defaultSize,
             null,
             if (jdbcType.scale == 0) null else jdbcType.scale,
diff --git a/src/test/scala/com/hindog/spark/jdbc/ExtractSparkFunctions.scala b/src/test/scala/com/hindog/spark/jdbc/ExtractSparkFunctions.scala
deleted file mode 100644
index a441f93..0000000
--- a/src/test/scala/com/hindog/spark/jdbc/ExtractSparkFunctions.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.hindog.spark.jdbc
-
-import org.jsoup.Jsoup
-
-import scala.collection.JavaConverters._
-
-import java.net.URL
-
-object ExtractSparkFunctions extends App {
-  val sparkVersion = "2.4.4"
-  val url = s"https://spark.apache.org/docs/${sparkVersion}/api/sql/"
-  val doc = Jsoup.parse(new URL(url), 60000)
-  val main = doc.select("div[role=main]")
-  val elements = main.select("h3, h3+p")
-
-  elements.asScala.grouped(2).foreach { func =>
-    val name = func(0).text()
-    val desc = func(1).text()
-    if (desc.trim.nonEmpty) {
-      println(s"$name\t$desc")
-    }
-  }
-}
diff --git a/version.sbt b/version.sbt
index 2de7261..46307ee 100644
--- a/version.sbt
+++ b/version.sbt
@@ -1 +1 @@
-ThisBuild / version := "1.0.0"
+ThisBuild / version := "1.1.0"
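To make the ColumnMetadata change above concrete, here is a small standalone sketch of the truncated-DDL patch-up; the input string is a made-up example of Spark's "... N more fields" truncation, and only the regex handling mirrors the diff (the real code goes on to call `DataType.fromDDL` and build JDBC metadata rows):

```scala
import scala.util.matching.Regex

object TruncatedTypeDemo extends App {
  // Same pattern as the diff: capture everything before and after ",... N more fields".
  val truncatedTypeRegex: Regex = "(.*),\\.\\.\\.\\s\\d+\\smore\\sfields(.*)".r

  // Hypothetical truncated DDL, as Spark might report it for a very wide struct column.
  val rawDataType = "struct<a:int,b:string,... 5 more fields>"

  // Drop the truncation marker so the remaining DDL is parseable (without the extra fields).
  val dataType = truncatedTypeRegex.findFirstMatchIn(rawDataType) match {
    case Some(m) => m.group(1) + m.group(2)
    case None => rawDataType
  }

  println(dataType) // struct<a:int,b:string>
}
```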