Skip to content

Commit

Permalink
Make new columns work with all Spark / Parquet versions
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Nov 24, 2023
1 parent 07b87e0 commit 5eabd0a
Show file tree
Hide file tree
Showing 16 changed files with 113 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2023 G-Research
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.co.gresearch.spark.parquet

import org.apache.parquet.hadoop.metadata.BlockMetaData

object BlockMetaDataUtil {
  /**
   * Returns the ordinal (position) of the given row group within its Parquet file.
   *
   * Compatibility shim for Parquet versions whose `BlockMetaData` does not expose
   * `getOrdinal`: no ordinal can be obtained here, so this always yields an empty
   * result and callers must fall back to their own index.
   *
   * @param block row-group metadata (unused in this variant)
   * @return always `None`
   */
  def getOrdinal(block: BlockMetaData): Option[Int] = Option.empty[Int]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2023 G-Research
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.co.gresearch.spark.parquet

import org.apache.parquet.hadoop.metadata.BlockMetaData

object BlockMetaDataUtil {
  /**
   * Returns the ordinal (position) of the given row group within its Parquet file.
   *
   * Compatibility shim for Parquet versions whose `BlockMetaData` exposes
   * `getOrdinal`; the value is read directly from the metadata.
   *
   * @param block row-group metadata to read the ordinal from
   * @return the row group's ordinal wrapped in `Some`
   */
  def getOrdinal(block: BlockMetaData): Option[Int] = {
    val ordinal = block.getOrdinal
    Some(ordinal)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2023 G-Research
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.co.gresearch.spark.parquet

import org.apache.parquet.hadoop.metadata.FileMetaData

object FileMetaDataUtil {
  /**
   * Returns the encryption type name of the given Parquet file metadata.
   *
   * Compatibility shim for Parquet versions whose `FileMetaData` does not expose
   * `getEncryptionType`: the information is unavailable, so this always yields an
   * empty result.
   *
   * @param fileMetaData file metadata (unused in this variant)
   * @return always `None`
   */
  def getEncryptionType(fileMetaData: FileMetaData): Option[String] = Option.empty[String]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2023 G-Research
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.co.gresearch.spark.parquet

import org.apache.parquet.hadoop.metadata.FileMetaData

object FileMetaDataUtil {
  /**
   * Returns the encryption type name of the given Parquet file metadata.
   *
   * Compatibility shim for Parquet versions whose `FileMetaData` exposes
   * `getEncryptionType`.
   *
   * Wraps the enum in `Option(...)` before calling `.name()` so that a null
   * encryption type (NOTE(review): presumably possible for metadata written
   * without that field — confirm against parquet-mr) yields `None` instead of
   * throwing a `NullPointerException`. Behavior for non-null values is unchanged.
   *
   * @param fileMetaData file metadata to read the encryption type from
   * @return the encryption type's enum name, or `None` when absent
   */
  def getEncryptionType(fileMetaData: FileMetaData): Option[String] =
    Option(fileMetaData.getEncryptionType).map(_.name())
}
10 changes: 5 additions & 5 deletions src/main/scala/uk/co/gresearch/spark/parquet/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ package object parquet {
footer.getParquetMetadata.getBlocks.asScala.map(_.getRowCount).sum,
footer.getParquetMetadata.getFileMetaData.getCreatedBy,
footer.getParquetMetadata.getFileMetaData.getSchema.toString,
footer.getParquetMetadata.getFileMetaData.getEncryptionType.name(),
FileMetaDataUtil.getEncryptionType(footer.getParquetMetadata.getFileMetaData),
footer.getParquetMetadata.getFileMetaData.getKeyValueMetaData.asScala,
)
}
Expand Down Expand Up @@ -250,10 +250,10 @@ package object parquet {

files.flatMap { case (_, file) =>
readFooters(file).flatMap { footer =>
footer.getParquetMetadata.getBlocks.asScala.map { block =>
footer.getParquetMetadata.getBlocks.asScala.zipWithIndex.map { case (block, idx) =>
(
footer.getFile.toString,
block.getOrdinal + 1,
BlockMetaDataUtil.getOrdinal(block).getOrElse(idx) + 1,
block.getStartingPos,
block.getCompressedSize,
block.getTotalByteSize,
Expand Down Expand Up @@ -325,11 +325,11 @@ package object parquet {

files.flatMap { case (_, file) =>
readFooters(file).flatMap { footer =>
footer.getParquetMetadata.getBlocks.asScala.flatMap { block =>
footer.getParquetMetadata.getBlocks.asScala.zipWithIndex.flatMap { case (block, idx) =>
block.getColumns.asScala.map { column =>
(
footer.getFile.toString,
block.getOrdinal + 1,
BlockMetaDataUtil.getOrdinal(block).getOrElse(idx) + 1,
column.getPath.toSeq,
column.getCodec.toString,
column.getPrimitiveType.toString,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ class ParquetSuite extends AnyFunSuite with SparkTestSession with SparkVersion {
assert(replaced.collect() === expectedRows)
}

val hasEncryption: Boolean = SparkMajorVersion > 3 || SparkMinorVersion > 4
val UNENCRYPTED: String = if (hasEncryption) "UNENCRYPTED" else null

parallelisms.foreach { parallelism =>
test(s"read parquet metadata (parallelism=${parallelism.map(_.toString).getOrElse("None")})") {
val createdBy = "parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)"
val schema = "message spark_schema {\\n required int64 id;\\n required double val;\\n}\\n"
val encrypted = "UNENCRYPTED"
val keyValues = Map(
"org.apache.spark.version" -> "3.3.0",
"org.apache.spark.sql.parquet.row.metadata" -> """{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"val","type":"double","nullable":false,"metadata":{}}]}"""
Expand All @@ -91,8 +93,8 @@ class ParquetSuite extends AnyFunSuite with SparkTestSession with SparkVersion {
StructField("keyValues", MapType(StringType, StringType, valueContainsNull = true), nullable = true),
)),
Seq(
Row("file1.parquet", 1, 1268, 1652, 100, createdBy, schema, encrypted, keyValues),
Row("file2.parquet", 2, 2539, 3302, 200, createdBy, schema, encrypted, keyValues),
Row("file1.parquet", 1, 1268, 1652, 100, createdBy, schema, UNENCRYPTED, keyValues),
Row("file2.parquet", 2, 2539, 3302, 200, createdBy, schema, UNENCRYPTED, keyValues),
),
parallelism
)
Expand Down

0 comments on commit 5eabd0a

Please sign in to comment.