Skip to content

Commit

Permalink
SPARK-39910 backport
Browse files Browse the repository at this point in the history
  • Loading branch information
Asmoday committed Dec 12, 2023
1 parent 0c0e7d4 commit e2a825c
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 1 deletion.
2 changes: 2 additions & 0 deletions dev/.rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ over10k
exported_table/*
ansible-for-test-node/*
node_modules
.*\.har

spark-events-broken/*
# Spark Connect related files with custom licence
any.proto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ object DataSource extends Logging {
val qualifiedPaths = pathStrings.map { pathString =>
val path = new Path(pathString)
val fs = path.getFileSystem(hadoopConf)
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
fs.makeQualified(path)
}

// Split the paths into glob and non glob paths, because we don't need to do an existence check
Expand Down
7 changes: 7 additions & 0 deletions sql/core/src/test/resources/test-data/test-archive.har/_index
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
%2F dir 1697722622766+493+tigrulya+hadoop 0 0 test.txt test.orc test.parquet test.json test.csv
%2Ftest.json file part-0 18 36 1697722536260+420+tigrulya+hadoop
%2Ftest.parquet file part-0 54 448 1697722536359+420+tigrulya+hadoop
%2Ftest.csv file part-0 6 6 1697722536193+420+tigrulya+hadoop
%2Ftest.orc file part-0 502 259 1697722536317+420+tigrulya+hadoop
%2Ftest.txt file part-0 12 6 1697722536407+420+tigrulya+hadoop
%2Ftest.txt file part-0 0 6 1697722536407+420+tigrulya+hadoop
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
3
0 1948563523 0 490
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.scalatest.PrivateMethodTester
Expand Down Expand Up @@ -156,6 +158,35 @@ class DataSourceSuite extends SharedSparkSession with PrivateMethodTester {
val expectMessage = "No FileSystem for scheme nonexistentFs"
assert(message.filterNot(Set(':', '"').contains) == expectMessage)
}

test("SPARK-39910: test Hadoop archive non glob paths") {
  // Fully qualified har:// paths for every concrete file in the test archive.
  val harPaths = buildFullHarPaths(allRelativeHarPaths)

  val globbed = DataSource.checkAndGlobPathIfNecessary(
    harPaths.map(_.toString),
    hadoopConf,
    checkEmptyGlobPath = true,
    checkFilesExist = true,
    enableGlobbing = true
  )

  // Non-glob archive paths must survive the check/glob pass unchanged.
  assert(globbed.toSet === harPaths.toSet)
}

test("SPARK-39910: test Hadoop archive glob paths") {
  // A single glob ("/test.*") inside the archive, as a fully qualified har:// path.
  val globPaths = buildFullHarPaths(Seq(globRelativeHarPath))

  val resolved = DataSource.checkAndGlobPathIfNecessary(
    globPaths.map(_.toString),
    hadoopConf,
    checkEmptyGlobPath = true,
    checkFilesExist = true,
    enableGlobbing = true
  )

  // Globbing inside the archive should expand to every concrete file it contains.
  val expected = buildFullHarPaths(allRelativeHarPaths)
  assert(resolved.toSet === expected.toSet)
}
}

object TestPaths {
Expand Down Expand Up @@ -197,11 +228,35 @@ object TestPaths {
)
)

// Paths of the fixture files inside test-data/test-archive.har, relative to
// the archive root (these match the entries in the archive's _index file).
val txtRelativeHarPath = new Path("/test.txt")
val csvRelativeHarPath = new Path("/test.csv")
val jsonRelativeHarPath = new Path("/test.json")
val parquetRelativeHarPath = new Path("/test.parquet")
val orcRelativeHarPath = new Path("/test.orc")
// Glob pattern matching every fixture file above.
val globRelativeHarPath = new Path("/test.*")

// Every concrete (non-glob) file in the archive; the glob tests expect
// expansion to exactly this set.
val allRelativeHarPaths = Seq(
txtRelativeHarPath,
csvRelativeHarPath,
jsonRelativeHarPath,
parquetRelativeHarPath,
orcRelativeHarPath
)

/** Builds a `FileStatus` stub whose only populated field is the given path string. */
def createMockFileStatus(path: String): FileStatus = {
  val status = new FileStatus()
  status.setPath(new Path(path))
  status
}

/**
 * Prefixes each archive-relative path with the har:// URI of the test archive
 * bundled in the test resources, yielding fully qualified har paths.
 */
def buildFullHarPaths(relativePaths: Seq[Path]): Seq[Path] = {
  // Locate the .har directory on the test classpath.
  val resourcePath = Thread.currentThread().getContextClassLoader
    .getResource("test-data/test-archive.har")
    .getPath

  // Scheme "har" with authority "file-:" — har URIs encode the underlying
  // filesystem in the authority (here the local file system, no host).
  val archiveRoot = new Path("har", "file-:", resourcePath)
  relativePaths.map(rel => Path.mergePaths(archiveRoot, rel))
}
}

class MockFileSystem extends RawLocalFileSystem {
Expand All @@ -214,4 +269,6 @@ class MockFileSystem extends RawLocalFileSystem {
override def globStatus(pathPattern: Path): Array[FileStatus] = {
  // Serve canned results; patterns without a registered answer match nothing.
  mockGlobResults.get(pathPattern).getOrElse(Array())
}

// Fixed scheme/authority so tests can build deterministic mockFs:// URIs.
override def getUri: URI = URI.create("mockFs://mockFs/")
}
Original file line number Diff line number Diff line change
Expand Up @@ -1344,4 +1344,18 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
}
}
}
test("SPARK-39910: read files from Hadoop archives") {
  val schema = new StructType().add("str", StringType)
  // Rewrite the resource's file:/ URI into har:/ so reads go through the
  // Hadoop archive filesystem instead of the local one.
  val harRoot = testFile("test-data/test-archive.har")
    .replaceFirst("file:/", "har:/")

  testRead(spark.read.textFile(s"$harRoot/test.txt").toDF(), data, textSchema)
  testRead(spark.read.schema(schema).csv(s"$harRoot/test.csv"), data, schema)
  testRead(spark.read.schema(schema).json(s"$harRoot/test.json"), data, schema)
  testRead(spark.read.schema(schema).parquet(s"$harRoot/test.parquet"), data, schema)

  // NOTE(review): the ORC fixture is only exercised with the native reader here —
  // presumably the hive reader cannot handle har paths; confirm if extending.
  withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") {
    testRead(spark.read.schema(schema).orc(s"$harRoot/test.orc"), data, schema)
  }
}
}

0 comments on commit e2a825c

Please sign in to comment.