[SPARK-49991][SQL] Make HadoopMapReduceCommitProtocol respect 'mapreduce.output.basename' to generate file names

### What changes were proposed in this pull request?

In `HadoopMapReduceCommitProtocol`, task output file names are generated up front rather than by calling `org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getDefaultWorkFile`, which uses `mapreduce.output.basename` as the prefix of output files.
This pull request modifies the `HadoopMapReduceCommitProtocol.getFilename` method to also look up this configuration instead of using the hardcoded 'part' prefix.
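
For illustration, here is a minimal standalone sketch of the same lookup (the object name `BasenameLookupSketch` and the sample job id are invented for this example; the actual change is the two-line diff shown below):

```scala
import org.apache.hadoop.conf.Configuration

// Simplified sketch of the new naming logic: fall back to "part" when
// 'mapreduce.output.basename' is unset, so default output names stay the same.
object BasenameLookupSketch {
  def fileName(conf: Configuration, split: Int, jobId: String): String = {
    val basename = conf.get("mapreduce.output.basename", "part")
    f"$basename-$split%05d-$jobId"
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    println(fileName(conf, 3, "jobUUID"))   // part-00003-jobUUID
    conf.set("mapreduce.output.basename", "apachespark")
    println(fileName(conf, 3, "jobUUID"))   // apachespark-00003-jobUUID
  }
}
```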

### Why are the changes needed?

Customizing the output file name is a useful feature for users: they can use it to distinguish files written by different engines, on different days, and so on. It also aligns this usage scenario with other SQL-on-Hadoop engines for better Hadoop compatibility.

### Does this PR introduce _any_ user-facing change?

Yes, the Hadoop configuration 'mapreduce.output.basename' is now respected when generating file data source output file names.
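
As a usage sketch (assuming an active `SparkSession` named `spark`; the prefix `apachespark` and the output path are illustrative), the property can be set through the runtime conf before writing:

```scala
// Hypothetical example: the custom basename is picked up by the file data
// source writer for the subsequent write.
spark.conf.set("mapreduce.output.basename", "apachespark")
spark.range(10).write.mode("overwrite").parquet("/tmp/spark-49991-demo")
// Data files are now named like
//   apachespark-00000-<jobId>-c000.snappy.parquet
// instead of the default
//   part-00000-<jobId>-c000.snappy.parquet
spark.conf.unset("mapreduce.output.basename")
```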

### How was this patch tested?

new tests

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#48494 from yaooqinn/SPARK-49991.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
yaooqinn authored and dongjoon-hyun committed Oct 18, 2024
1 parent 75b8666 commit d90145d
Showing 2 changed files with 13 additions and 1 deletion.
@@ -168,7 +168,8 @@ class HadoopMapReduceCommitProtocol(
     // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
     val split = taskContext.getTaskAttemptID.getTaskID.getId
-    f"${spec.prefix}part-$split%05d-$jobId${spec.suffix}"
+    val basename = taskContext.getConfiguration.get("mapreduce.output.basename", "part")
+    f"${spec.prefix}$basename-$split%05d-$jobId${spec.suffix}"
   }
 
   override def setupJob(jobContext: JobContext): Unit = {
@@ -1585,6 +1585,17 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       }
     }
   }
+
+  test("SPARK-49991: Respect 'mapreduce.output.basename' to generate file names") {
+    withTempPath { dir =>
+      withSQLConf("mapreduce.output.basename" -> "apachespark") {
+        spark.range(1).coalesce(1).write.parquet(dir.getCanonicalPath)
+        val df = spark.read.parquet(dir.getCanonicalPath)
+        assert(df.inputFiles.head.contains("apachespark"))
+        checkAnswer(spark.read.parquet(dir.getCanonicalPath), Row(0))
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
