
Commit 1eb754e
fix(build): Fix types.
Asmoday committed Jan 21, 2025
Parent: 2d67bb1
Showing 9 changed files with 406 additions and 519 deletions.

(One additional file with a large diff is not rendered by default.)

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
627 changes: 266 additions & 361 deletions (large diff not rendered by default)

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala

@@ -89,12 +89,6 @@ private[hive] object IsolatedClientLoader extends Logging {
 
   def hiveVersion(version: String): HiveVersion = {
     VersionUtils.majorMinorPatchVersion(version).flatMap {
-      case (12, _, _) | (0, 12, _) => Some(hive.v12)
-      case (13, _, _) | (0, 13, _) => Some(hive.v13)
-      case (14, _, _) | (0, 14, _) => Some(hive.v14)
-      case (1, 0, _) => Some(hive.v1_0)
-      case (1, 1, _) => Some(hive.v1_1)
-      case (1, 2, _) => Some(hive.v1_2)
       case (2, 0, _) => Some(hive.v2_0)
       case (2, 1, _) => Some(hive.v2_1)
       case (2, 2, _) => Some(hive.v2_2)
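
Note: with the pre-2.0 arms gone, the match above no longer produces a result for Hive versions below 2.0 (the 2.3-and-later arms and the surrounding error handling are collapsed in the rendered diff). A standalone Scala sketch of the resulting matching behavior; the function names are stand-ins, not Spark's VersionUtils API, and numeric version components are assumed:

    // Stand-in for VersionUtils.majorMinorPatchVersion (assumes numeric parts).
    def majorMinorPatch(v: String): Option[(Int, Int, Int)] =
      v.split("\\.") match {
        case Array(a, b, c, _*) => Some((a.toInt, b.toInt, c.toInt))
        case Array(a, b) => Some((a.toInt, b.toInt, 0))
        case _ => None
      }

    // After this commit only the 2.0+ arms remain; anything older yields None.
    def resolve(version: String): Option[String] =
      majorMinorPatch(version).collect {
        case (2, 0, _) => "v2_0"
        case (2, 1, _) => "v2_1"
        case (2, 2, _) => "v2_2"
        case (2, 3, _) => "v2_3"
        case (3, 0, _) => "v3_0"
        case (3, 1, _) => "v3_1"
      }

    assert(resolve("2.3.10").contains("v2_3"))
    assert(resolve("1.2.1").isEmpty) // previously matched hive.v1_2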

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala

@@ -59,13 +59,12 @@ package object client {
       "org.pentaho:pentaho-aggdesigner-algorithm"))
 
     // Since HIVE-23980, calcite-core included in Hive package jar.
-    case object v2_3 extends HiveVersion("2.3.9",
+    case object v2_3 extends HiveVersion("2.3.10",
       exclusions = Seq("org.apache.calcite:calcite-core",
         "org.apache.calcite:calcite-druid",
         "org.apache.calcite.avatica:avatica",
         "com.fasterxml.jackson.core:*",
         "org.apache.curator:*",
-        "org.pentaho:pentaho-aggdesigner-algorithm",
+        "net.hydromatic:aggdesigner-algorithm",
         "org.apache.hive:hive-vector-code-gen"))
 
     // Since Hive 3.0, HookUtils uses org.apache.logging.log4j.util.Strings
@@ -107,4 +106,4 @@ package object client {
   }
   // scalastyle:on
 
-}
+}
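
Note: the v2_3 exclusion swap tracks Hive 2.3.10 replacing the org.pentaho aggregate designer artifact with net.hydromatic:aggdesigner-algorithm. As a hedged illustration of what such exclusions look like on the consumer side (the hive-exec coordinate is an assumption, not named in this diff), an sbt fragment:

    // Hypothetical build.sbt fragment mirroring two of the exclusions above.
    libraryDependencies += ("org.apache.hive" % "hive-exec" % "2.3.10")
      .exclude("org.apache.calcite", "calcite-core")
      .exclude("net.hydromatic", "aggdesigner-algorithm")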

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala

@@ -19,7 +19,8 @@ package org.apache.spark.sql.hive.execution
 
 import java.io.IOException
 import java.net.URI
-import java.text.SimpleDateFormat
+import java.time.ZoneId
+import java.time.format.DateTimeFormatter
 import java.util.{Date, Locale, Random}
 
 import scala.util.control.NonFatal
@@ -29,69 +30,43 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.client.HiveVersion
 
 class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: Path)
   extends Logging {
   private var stagingDirForCreating: Option[Path] = None
 
   lazy val externalTempPath: Path = getExternalTmpPath(path)
 
+  private lazy val dateTimeFormatter =
+    DateTimeFormatter
+      .ofPattern("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
+      .withZone(ZoneId.systemDefault())
+
   private def getExternalTmpPath(path: Path): Path = {
     import org.apache.spark.sql.hive.client.hive._
 
-    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
-    // a common scratch directory. After the writing is finished, Hive will simply empty the table
-    // directory and move the staging directory to it.
-    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
+    // Hive creates the staging directory under the table directory, and when
     // moving staging directory to table directory, Hive will still empty the table directory, but
     // will exclude the staging directory there.
-    // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
-    // staging directory under the table director for Hive prior to 1.1, the staging directory will
-    // be removed by Hive when Hive is trying to empty the table directory.
-    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
-    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
-      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
-
-    // Ensure all the supported versions are considered here.
-    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
-      allSupportedHiveVersions)
-
     val externalCatalog = session.sharedState.externalCatalog
     val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
     val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
-    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
-    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
-      oldVersionExternalTempPath(path, scratchDir)
-    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
-      newVersionExternalTempPath(path, stagingDir)
+    if (allSupportedHiveVersions.contains(hiveVersion)) {
+      externalTempPath(path, stagingDir)
     } else {
-      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+      throw SparkException.internalError("Unsupported hive version: " + hiveVersion.fullVersion)
     }
   }
 
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
-  private def oldVersionExternalTempPath(path: Path, scratchDir: String): Path = {
-    val extURI: URI = path.toUri
-    val scratchPath = new Path(scratchDir, executionId)
-    var dirPath = new Path(
-      extURI.getScheme,
-      extURI.getAuthority,
-      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
-
-    val fs = dirPath.getFileSystem(hadoopConf)
-    dirPath = new Path(fs.makeQualified(dirPath).toString())
-    stagingDirForCreating = Some(dirPath)
-    dirPath
-  }
-
   // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
-  private def newVersionExternalTempPath(path: Path, stagingDir: String): Path = {
+  private def externalTempPath(path: Path, stagingDir: String): Path = {
     val extURI: URI = path.toUri
     if (extURI.getScheme == "viewfs") {
       val qualifiedStagingDir = getStagingDir(path, stagingDir)
@@ -148,8 +123,7 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P
 
   private def executionId: String = {
     val rand: Random = new Random
-    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
-    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+    "hive_" + dateTimeFormatter.format(new Date().toInstant) + "_" + Math.abs(rand.nextLong)
   }
 
   def deleteTmpPath() : Unit = {
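
Note: this hunk is why the dateTimeFormatter field was introduced above: java.text.SimpleDateFormat is mutable and not thread-safe, so the old code built a fresh instance per call, while java.time.format.DateTimeFormatter is immutable and one shared instance suffices. A self-contained sketch of the same pattern; the object name is illustrative, the format string is copied from the diff:

    import java.time.ZoneId
    import java.time.format.DateTimeFormatter
    import java.util.{Date, Locale, Random}

    object ExecutionIds {
      // Immutable and thread-safe, so it can be shared across calls.
      private val formatter = DateTimeFormatter
        .ofPattern("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
        .withZone(ZoneId.systemDefault())

      def next(): String = {
        val rand = new Random
        "hive_" + formatter.format(new Date().toInstant) + "_" + Math.abs(rand.nextLong)
      }
    }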
@@ -175,7 +149,7 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P
     stagingDirForCreating.foreach { stagingDir =>
       val fs: FileSystem = stagingDir.getFileSystem(hadoopConf)
       if (!FileUtils.mkdir(fs, stagingDir, true, hadoopConf)) {
-        throw new IllegalStateException(
+        throw SparkException.internalError(
           "Cannot create staging directory '" + stagingDir.toString + "'")
       }
       fs.deleteOnExit(stagingDir)
@@ -190,4 +164,6 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P
   def deleteIfNotStagingDir(path: Path, fs: FileSystem): Unit = {
     if (Option(path) != stagingDirForCreating) fs.delete(path, true)
   }
+
+  override def toString: String = s"HiveTempPath($path)"
 }
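
Note: both IllegalStateException sites in this file now go through SparkException.internalError, which raises a SparkException carrying the INTERNAL_ERROR error class instead of a bare JDK exception. A small sketch of the pattern; the requireSupported helper is illustrative, not part of the diff:

    import org.apache.spark.SparkException

    // Fail with Spark's error-class machinery rather than IllegalStateException.
    def requireSupported(supported: Boolean, version: String): Unit = {
      if (!supported) {
        throw SparkException.internalError("Unsupported hive version: " + version)
      }
    }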

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuites.scala

@@ -91,6 +91,6 @@ class HiveClientSuites extends SparkFunSuite with HiveClientVersions {
   }
 
   override def nestedSuites: IndexedSeq[Suite] = {
-    versions.map(new HiveClientSuite(_, versions))
+    versions.map(new HiveClientSuite(_, versions)).toIndexedSeq
   }
 }
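
Note: this .toIndexedSeq, repeated in the suites below, is the "Fix types." of the commit title. versions is now declared Seq[String] (see HiveClientVersions further down), so map returns a Seq, while ScalaTest declares nestedSuites as IndexedSeq[Suite]. A stand-in sketch of the mismatch, not the ScalaTest API:

    // A member typed IndexedSeq cannot be satisfied by a plain Seq.
    trait Runner { def nested: IndexedSeq[String] }

    class MyRunner(versions: Seq[String]) extends Runner {
      // versions.map(...) yields Seq[String]; .toIndexedSeq fixes the type.
      override def nested: IndexedSeq[String] =
        versions.map(v => s"suite-$v").toIndexedSeq
    }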

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientUserNameSuites.scala

@@ -23,6 +23,6 @@ import org.scalatest.Suite
 
 class HiveClientUserNameSuites extends Suite with HiveClientVersions {
   override def nestedSuites: IndexedSeq[Suite] = {
-    versions.map(new HiveClientUserNameSuite(_))
+    versions.map(new HiveClientUserNameSuite(_)).toIndexedSeq
   }
 }

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala

@@ -19,9 +19,9 @@ package org.apache.spark.sql.hive.client
 
 private[client] trait HiveClientVersions {
   private val testVersions = sys.env.get("SPARK_TEST_HIVE_CLIENT_VERSIONS")
-  protected val versions = if (testVersions.nonEmpty) {
+  protected val versions: Seq[String] = if (testVersions.nonEmpty) {
     testVersions.get.split(",").map(_.trim).filter(_.nonEmpty).toIndexedSeq
   } else {
-    IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1", "4.0")
+    Seq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1", "4.0")
   }
 }
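
Note: testVersions is an Option[String], so testVersions.nonEmpty in the branch above means "the environment variable is set", not "its value is non-blank". A tiny self-contained check:

    // SPARK_TEST_HIVE_CLIENT_VERSIONS="" still takes the first branch and
    // produces an empty version list after trimming and filtering.
    val fromEnv: Option[String] = Some("")
    assert(fromEnv.nonEmpty) // the Option is defined
    assert(fromEnv.get.split(",").map(_.trim).filter(_.nonEmpty).isEmpty)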

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuites.scala

@@ -24,6 +24,6 @@ import org.scalatest.Suite
 class HivePartitionFilteringSuites extends Suite with HiveClientVersions {
   override def nestedSuites: IndexedSeq[Suite] = {
     // Hive 0.12 does not provide the partition filtering API we call
-    versions.filterNot(_ == "0.12").map(new HivePartitionFilteringSuite(_))
+    versions.filterNot(_ == "0.12").map(new HivePartitionFilteringSuite(_)).toIndexedSeq
   }
 }
