Conversation


@pan3793 pan3793 commented Sep 18, 2025

What changes were proposed in this pull request?

Reuse the InputStream in the vectorized Parquet reader between reading the footer and the row groups, on the executor side.

This PR is part of SPARK-52011, you can check more details at #50765
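As a rough illustration of the pattern (not the actual Spark code; the file layout and names below are invented for the demo), opening the file once and reusing the same channel for both the footer and the data avoids a second open, which on HDFS translates to one fewer NameNode RPC per file:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative only: a toy file ends with a 4-byte "footer" recording the
// length of the "row group" bytes at the start of the file.
public class ReuseStreamDemo {
    // Read the footer at the end, then seek back and read the row-group
    // bytes through the SAME channel instead of reopening the file.
    static byte[] readWithSingleOpen(Path file) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer footer = ByteBuffer.allocate(4);
            ch.read(footer, ch.size() - 4);     // positional read: footer
            footer.flip();
            int rowGroupLen = footer.getInt();  // footer tells us how much data to read
            ByteBuffer data = ByteBuffer.allocate(rowGroupLen);
            ch.read(data, 0);                   // reuse the open channel for the data
            return data.array();
        }
    }

    public static void main(String[] args) throws IOException {
        Path f = Files.createTempFile("demo", ".bin");
        byte[] payload = "rowgroup".getBytes();
        ByteBuffer out = ByteBuffer.allocate(payload.length + 4);
        out.put(payload).putInt(payload.length).flip();
        try (FileChannel ch = FileChannel.open(f, StandardOpenOption.WRITE)) {
            ch.write(out);
        }
        System.out.println(new String(readWithSingleOpen(f)));
        Files.delete(f);
    }
}
```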

Why are the changes needed?

Reduce unnecessary RPCs of NameNode to improve performance and stability for large Hadoop clusters.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

See #50765

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Sep 18, 2025

pan3793 commented Sep 18, 2025

cc @sunchao @cloud-fan @LuciferYang @viirya

As explained in #50765 (comment), I split the executor-side changes into a dedicated PR; please take a look when you have time, thank you in advance.

import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.util.Utils

object ParquetFooterReader {
Contributor

is there a strong reason to rewrite it from java to scala?

@pan3793 pan3793 (Member Author) Sep 19, 2025

Over 2/3 of the code in the original file is removed, and the newly added method openFileAndReadFooter uses Scala Tuple and Option, which would be ugly if written in Java.

Contributor

we should create a java record to wrap it instead of using a tuple...

Member Author

@cloud-fan I created a case class OpenedParquetFooter to replace the tuple; please let me know if you have a better idea.

Contributor

Why not keep it as Java then? Java is more AI-friendly, and I'm a bit hesitant to turn existing Java code into Scala.

Member Author

Okay, let me convert it back.

Member Author

@cloud-fan done, I converted it to Java now.
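The wrapper discussed above could be sketched as a Java record along these lines (hypothetical: field types are simplified, and the real class in the PR holds a parquet-mr ParquetMetadata rather than an Object):

```java
import java.io.InputStream;
import java.util.Optional;

// Hypothetical sketch of a footer-plus-stream wrapper. When the stream is
// present, ownership passes to the caller, which must eventually close it.
record OpenedParquetFooter(Object footer, Optional<InputStream> inputStream) {}

class OpenedParquetFooterDemo {
    public static void main(String[] args) {
        OpenedParquetFooter closed =
            new OpenedParquetFooter("footer-bytes", Optional.empty());
        // No stream kept open, so there is nothing for the caller to close.
        System.out.println(closed.inputStream().isPresent());
    }
}
```

A record (or a small case class on the Scala side) makes the two fields self-describing at call sites, which a bare tuple does not.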

val split = new FileSplit(file.toPath, file.start, file.length, Array.empty[String])
val sharedConf = broadcastedHadoopConf.value.value

val fileFooter = if (enableVectorizedReader) {
Contributor

This buildReaderWithPartitionValues method is super long now; can we create some smaller methods to split it, so that this PR is easier to review?

Member Author

The actual change is relatively small if you enable the "Hide whitespace" option. (screenshot)

Contributor

yea we can use this trick to help review, but my point is that this method is too long and we need to split it sooner or later. Since we are changing it now, maybe we should also split it now?

Member Author

Three code blocks were extracted from this method as independent methods.

@pan3793 pan3793 requested a review from cloud-fan September 22, 2025 05:31
@sunchao sunchao (Member) left a comment

LGTM mostly - just a few nits!

/**
* Reads footer for the input Parquet file 'split'. If 'skipRowGroup' is true,
* this will skip reading the Parquet row group metadata.
* Build a filter for reading footer of the input Parquet file 'split'.
Member

nit: I think the doc is out-dated - there is no 'split'

Member Author

The doc hasn't changed; 'split' refers to the PartitionedFile here.

PartitionedFile file,
boolean keepInputStreamOpen) throws IOException {
var readOptions = HadoopReadOptions.builder(hadoopConf, file.toPath())
.withMetadataFilter(buildFilter(hadoopConf, file, !keepInputStreamOpen))
Member

maybe worth adding some comments here to explain why we choose to skip row groups when keepInputStreamOpen is false

Member Author

added a comment

// Before transferring the ownership of inputStream to the vectorizedReader,
// we must take responsibility to close the inputStream if something goes wrong
// to avoid resource leak.
val shouldCloseInputStream = new AtomicBoolean(openedFooter.inputStreamOpt.isPresent)
Member

Curious why this needs to be an AtomicBoolean? Also, is a boolean flag needed? Can we just do

openedFooter.inputStreamOpt.ifPresent(Utils.closeQuietly)

@pan3793 pan3793 (Member Author) Sep 23, 2025

We pass shouldCloseInputStream to def buildVectorizedIterator, and the flag is updated by that method, so we must use a reference instead of a primitive type.

The suggestion works, but it would introduce many unnecessary close() calls in normal cases, so I added a flag to avoid that as much as possible.
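The hand-off described here can be sketched as follows (hypothetical: buildIterator stands in for buildVectorizedIterator, and its body is invented). The callee clears the flag once it assumes ownership of the stream, so the caller's cleanup only fires on the error path:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

class StreamOwnershipDemo {
    // Stand-in for buildVectorizedIterator: once the reader takes the
    // stream, it becomes responsible for closing it, so clear the flag.
    static void buildIterator(InputStream in, AtomicBoolean shouldClose) {
        // ... hand the stream to the vectorized reader ...
        shouldClose.set(false);
    }

    public static void main(String[] args) throws Exception {
        Optional<InputStream> streamOpt =
            Optional.of(new ByteArrayInputStream(new byte[0]));
        // AtomicBoolean gives us a mutable reference the callee can update.
        AtomicBoolean shouldClose = new AtomicBoolean(streamOpt.isPresent());
        try {
            buildIterator(streamOpt.get(), shouldClose);
        } finally {
            // Only closes if ownership was never transferred,
            // e.g. buildIterator threw before taking the stream.
            if (shouldClose.get()) {
                streamOpt.get().close();
            }
        }
        System.out.println(shouldClose.get());
    }
}
```

On the happy path the flag ends up false and close() is never called redundantly; if an exception fires before the hand-off, the flag is still true and the stream is closed exactly once.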

}
}

// scalastyle:off argcount
Member

I assume the following changes are just refactoring?

Member Author

The effective change is at L338-344; the rest is just method extraction.

@dongjoon-hyun dongjoon-hyun (Member) left a comment

+1, LGTM. Thank you, @pan3793 , @cloud-fan , @sunchao .

Merged to master for Apache Spark 4.1.0-preview2.
