From e2a825c5519c69ea5ff1a6ebdcbca2efb60f9d86 Mon Sep 17 00:00:00 2001 From: Andrey Shitov Date: Tue, 12 Dec 2023 16:32:16 +0300 Subject: [PATCH] SPARK-39910 backport --- dev/.rat-excludes | 2 + .../execution/datasources/DataSource.scala | 2 +- .../test-data/test-archive.har/_index | 7 +++ .../test-data/test-archive.har/_masterindex | 2 + .../test-data/test-archive.har/part-0 | 0 .../datasources/DataSourceSuite.scala | 57 +++++++++++++++++++ .../sql/test/DataFrameReaderWriterSuite.scala | 14 +++++ 7 files changed, 83 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/test/resources/test-data/test-archive.har/_index create mode 100644 sql/core/src/test/resources/test-data/test-archive.har/_masterindex create mode 100644 sql/core/src/test/resources/test-data/test-archive.har/part-0 diff --git a/dev/.rat-excludes b/dev/.rat-excludes index 59a7e8ac20fe9..340aea5217b26 100644 --- a/dev/.rat-excludes +++ b/dev/.rat-excludes @@ -138,6 +138,8 @@ over10k exported_table/* ansible-for-test-node/* node_modules +.*\.har + spark-events-broken/* # Spark Connect related files with custom licence any.proto diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 94dd3bc0bd63e..2e24087d507bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -722,7 +722,7 @@ object DataSource extends Logging { val qualifiedPaths = pathStrings.map { pathString => val path = new Path(pathString) val fs = path.getFileSystem(hadoopConf) - path.makeQualified(fs.getUri, fs.getWorkingDirectory) + fs.makeQualified(path) } // Split the paths into glob and non glob paths, because we don't need to do an existence check diff --git a/sql/core/src/test/resources/test-data/test-archive.har/_index b/sql/core/src/test/resources/test-data/test-archive.har/_index new file mode 100644 index 0000000000000..fc1ad0227fad0 --- /dev/null +++ b/sql/core/src/test/resources/test-data/test-archive.har/_index @@ -0,0 +1,7 @@ +%2F dir 1697722622766+493+tigrulya+hadoop 0 0 test.txt test.orc test.parquet test.json test.csv +%2Ftest.json file part-0 18 36 1697722536260+420+tigrulya+hadoop +%2Ftest.parquet file part-0 54 448 1697722536359+420+tigrulya+hadoop +%2Ftest.csv file part-0 6 6 1697722536193+420+tigrulya+hadoop +%2Ftest.orc file part-0 502 259 1697722536317+420+tigrulya+hadoop +%2Ftest.txt file part-0 12 6 1697722536407+420+tigrulya+hadoop +%2Ftest.txt file part-0 0 6 1697722536407+420+tigrulya+hadoop diff --git a/sql/core/src/test/resources/test-data/test-archive.har/_masterindex b/sql/core/src/test/resources/test-data/test-archive.har/_masterindex new file mode 100644 index 0000000000000..8762c0ae5843e --- /dev/null +++ b/sql/core/src/test/resources/test-data/test-archive.har/_masterindex @@ -0,0 +1,2 @@ +3 +0 1948563523 0 490 diff --git a/sql/core/src/test/resources/test-data/test-archive.har/part-0 b/sql/core/src/test/resources/test-data/test-archive.har/part-0 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala index 06e570cb016b0..4da42df41e8b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources +import java.net.URI + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} import org.scalatest.PrivateMethodTester @@ -156,6 +158,35 @@ class DataSourceSuite extends SharedSparkSession with PrivateMethodTester { val expectMessage = "No FileSystem for scheme nonexistentFs" assert(message.filterNot(Set(':', '"').contains) == expectMessage) } + + test("SPARK-39910: test Hadoop archive non glob paths") { + val absoluteHarPaths = buildFullHarPaths(allRelativeHarPaths) + + val resultPaths = DataSource.checkAndGlobPathIfNecessary( + absoluteHarPaths.map(_.toString), + hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = true, + enableGlobbing = true + ) + + assert(resultPaths.toSet === absoluteHarPaths.toSet) + } + + test("SPARK-39910: test Hadoop archive glob paths") { + val harGlobPaths = buildFullHarPaths(Seq(globRelativeHarPath)) + + val resultPaths = DataSource.checkAndGlobPathIfNecessary( + harGlobPaths.map(_.toString), + hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = true, + enableGlobbing = true + ) + + val expectedHarPaths = buildFullHarPaths(allRelativeHarPaths) + assert(resultPaths.toSet === expectedHarPaths.toSet) + } } object TestPaths { @@ -197,11 +228,35 @@ object TestPaths { ) ) + val txtRelativeHarPath = new Path("/test.txt") + val csvRelativeHarPath = new Path("/test.csv") + val jsonRelativeHarPath = new Path("/test.json") + val parquetRelativeHarPath = new Path("/test.parquet") + val orcRelativeHarPath = new Path("/test.orc") + val globRelativeHarPath = new Path("/test.*") + + val allRelativeHarPaths = Seq( + txtRelativeHarPath, + csvRelativeHarPath, + jsonRelativeHarPath, + parquetRelativeHarPath, + orcRelativeHarPath + ) + def createMockFileStatus(path: String): FileStatus = { val fileStatus = new FileStatus() fileStatus.setPath(new Path(path)) fileStatus } + + def buildFullHarPaths(relativePaths: Seq[Path]): Seq[Path] = { + val archiveUrl = Thread.currentThread().getContextClassLoader + .getResource("test-data/test-archive.har") + .getPath + + val archivePath = new Path("har", "file-:", archiveUrl) + relativePaths.map(Path.mergePaths(archivePath, _)) + } } class MockFileSystem extends RawLocalFileSystem { @@ -214,4 +269,6 @@ class MockFileSystem extends RawLocalFileSystem { override def globStatus(pathPattern: Path): Array[FileStatus] = { mockGlobResults.getOrElse(pathPattern, Array()) } + + override def getUri: URI = URI.create("mockFs://mockFs/") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 44c9fbadfac66..5ab9eebe0046f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -1344,4 +1344,18 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with } } } + test("SPARK-39910: read files from Hadoop archives") { + val fileSchema = new StructType().add("str", StringType) + val harPath = testFile("test-data/test-archive.har") + .replaceFirst("file:/", "har:/") + + testRead(spark.read.textFile(s"$harPath/test.txt").toDF(), data, textSchema) + testRead(spark.read.schema(fileSchema).csv(s"$harPath/test.csv"), data, fileSchema) + testRead(spark.read.schema(fileSchema).json(s"$harPath/test.json"), data, fileSchema) + testRead(spark.read.schema(fileSchema).parquet(s"$harPath/test.parquet"), data, fileSchema) + + withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") { + testRead(spark.read.schema(fileSchema).orc(s"$harPath/test.orc"), data, fileSchema) + } + } }