Add more columns, replace block id with ordinal+1
EnricoMi committed Nov 24, 2023
1 parent 1695420 commit 07b87e0
Showing 4 changed files with 56 additions and 28 deletions.
31 changes: 17 additions & 14 deletions PARQUET.md
@@ -45,12 +45,12 @@ spark.read.parquetMetadata("/path/to/parquet").show()
spark.read.parquet_metadata("/path/to/parquet").show()
```
```
+-------------+------+---------------+-----------------+----+--------------------+--------------------+
| filename|blocks|compressedBytes|uncompressedBytes|rows| createdBy| schema|
+-------------+------+---------------+-----------------+----+--------------------+--------------------+
|file1.parquet| 1| 1268| 1652| 100|parquet-mr versio...|message spark_sch...|
|file2.parquet| 2| 2539| 3302| 200|parquet-mr versio...|message spark_sch...|
+-------------+------+---------------+-----------------+----+--------------------+--------------------+
+-------------+------+---------------+-----------------+----+--------------------+--------------------+-----------+--------------------+
| filename|blocks|compressedBytes|uncompressedBytes|rows| createdBy| schema| encryption| keyValues|
+-------------+------+---------------+-----------------+----+--------------------+--------------------+-----------+--------------------+
|file1.parquet| 1| 1268| 1652| 100|parquet-mr versio...|message spark_sch...|UNENCRYPTED|{org.apache.spark...|
|file2.parquet| 2| 2539| 3302| 200|parquet-mr versio...|message spark_sch...|UNENCRYPTED|{org.apache.spark...|
+-------------+------+---------------+-----------------+----+--------------------+--------------------+-----------+--------------------+
```

The Dataframe provides the following per-file information:
@@ -64,6 +64,8 @@ The Dataframe provides the following per-file information:
|rows |long |Number of rows of all blocks |
|createdBy |string|The createdBy string of the Parquet file, e.g. library used to write the file|
|schema |string|The schema |
|encryption |string|The encryption |
|keyValues |string-to-string map|Key-value data of the file |
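
For illustration, a minimal sketch of querying the new `encryption` and `keyValues` columns in Scala (assuming `import uk.co.gresearch.spark.parquet._` and a `SparkSession` named `spark`, as in the examples above):
```
import org.apache.spark.sql.functions.explode
import uk.co.gresearch.spark.parquet._
import spark.implicits._

val meta = spark.read.parquetMetadata("/path/to/parquet")

// files that report anything other than UNENCRYPTED
meta.where($"encryption" =!= "UNENCRYPTED")
  .select($"filename", $"encryption")
  .show()

// one row per entry of the key-value file metadata
meta.select($"filename", explode($"keyValues").as(Seq("key", "value")))
  .show(truncate = false)
```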

## Parquet file schema

@@ -119,14 +121,13 @@ spark.read.parquetBlocks("/path/to/parquet").show()
spark.read.parquet_blocks("/path/to/parquet").show()
```
```
+-------------+-----+----------+---------------+-----------------+----+
| filename|block|blockStart|compressedBytes|uncompressedBytes|rows|
+-------------+-----+----------+---------------+-----------------+----+
|file1.parquet| 1| 4| 1269| 1651| 100|
|file2.parquet| 1| 4| 1268| 1652| 100|
|file2.parquet| 2| 1273| 1270| 1651| 100|
+-------------+-----+----------+---------------+-----------------+----+
+-------------+-----+----------+---------------+-----------------+----+-------+------+
| filename|block|blockStart|compressedBytes|uncompressedBytes|rows|columns|values|
+-------------+-----+----------+---------------+-----------------+----+-------+------+
|file1.parquet| 1| 4| 1269| 1651| 100| 2| 200|
|file2.parquet| 1| 4| 1268| 1652| 100| 2| 200|
|file2.parquet| 2| 1273| 1270| 1651| 100| 2| 200|
+-------------+-----+----------+---------------+-----------------+----+-------+------+
```

|column |type |description |
@@ -137,6 +138,8 @@ spark.read.parquet_blocks("/path/to/parquet").show()
|compressedBytes |long |Number of compressed bytes in block |
|uncompressedBytes |long |Number of uncompressed bytes in block |
|rows |long |Number of rows in block |
|columns |int |Number of columns in block |
|values |long |Number of values in block |
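
As a rough sketch, the new `columns` and `values` columns can be combined with the existing ones to derive per-block ratios (same assumptions as the sketch above):
```
import uk.co.gresearch.spark.parquet._
import spark.implicits._

spark.read.parquetBlocks("/path/to/parquet")
  // values per column and compression ratio for each block / row group
  .withColumn("valuesPerColumn", $"values" / $"columns")
  .withColumn("compressionRatio", $"uncompressedBytes" / $"compressedBytes")
  .orderBy($"filename", $"block")
  .show()
```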

## Parquet block column metadata

4 changes: 4 additions & 0 deletions python/gresearch/spark/parquet/__init__.py
@@ -41,6 +41,8 @@ def parquet_metadata(self: DataFrameReader, *paths: str, parallelism: Optional[i
- rows (long): Number of rows of all blocks
- createdBy (string): The createdBy string of the Parquet file, e.g. library used to write the file
- schema (string): The schema
- encryption (string): The encryption
- keyValues (string-to-string map): Key-value data of the file
:param self: a Spark DataFrameReader
:param paths: one or more paths to Parquet files or directories
@@ -105,6 +107,8 @@ def parquet_blocks(self: DataFrameReader, *paths: str, parallelism: Optional[int
- compressedBytes (long): Number of compressed bytes in block
- uncompressedBytes (long): Number of uncompressed bytes in block
- rows (long): Number of rows in block
- columns (int): Number of columns in block
- values (long): Number of values in block
:param self: a Spark DataFrameReader
:param paths: one or more paths to Parquet files or directories
30 changes: 21 additions & 9 deletions src/main/scala/uk/co/gresearch/spark/parquet/package.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.execution.datasources.FilePartition
import uk.co.gresearch._

import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter}
import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`

package object parquet {
@@ -51,6 +51,8 @@ package object parquet {
* - rows (long): Number of rows of all blocks
* - createdBy (string): The createdBy string of the Parquet file, e.g. library used to write the file
* - schema (string): The schema
* - encryption (string): The encryption
* - keyValues (string-to-string map): Key-value data of the file
*
* @param paths one or more paths to Parquet files or directories
* @return dataframe with Parquet metadata
@@ -71,6 +73,8 @@ package object parquet {
* - rows (long): Number of rows of all blocks
* - createdBy (string): The createdBy string of the Parquet file, e.g. library used to write the file
* - schema (string): The schema
* - encryption (string): The encryption
* - keyValues (string-to-string map): Key-value data of the file
*
* @param parallelism number of partitions of returned DataFrame
* @param paths one or more paths to Parquet files or directories
@@ -94,9 +98,11 @@ package object parquet {
footer.getParquetMetadata.getBlocks.asScala.map(_.getRowCount).sum,
footer.getParquetMetadata.getFileMetaData.getCreatedBy,
footer.getParquetMetadata.getFileMetaData.getSchema.toString,
footer.getParquetMetadata.getFileMetaData.getEncryptionType.name(),
footer.getParquetMetadata.getFileMetaData.getKeyValueMetaData.asScala,
)
}
}.toDF("filename", "blocks", "compressedBytes", "uncompressedBytes", "rows", "createdBy", "schema")
}.toDF("filename", "blocks", "compressedBytes", "uncompressedBytes", "rows", "createdBy", "schema", "encryption", "keyValues")
}

/**
@@ -206,6 +212,8 @@ package object parquet {
* - compressedBytes (long): Number of compressed bytes in block
* - uncompressedBytes (long): Number of uncompressed bytes in block
* - rows (long): Number of rows in block
* - columns (int): Number of columns in block
* - values (long): Number of values in block
*
* @param paths one or more paths to Parquet files or directories
* @return dataframe with Parquet block metadata
@@ -220,11 +228,13 @@
*
* This provides the following per-block information:
* - filename (string): The file name
* - block (int): Block / RowGroup number starting at 1
* - block (int): Block / RowGroup number starting at 1 (block ordinal + 1)
* - blockStart (long): Start position of the block in the Parquet file
* - compressedBytes (long): Number of compressed bytes in block
* - uncompressedBytes (long): Number of uncompressed bytes in block
* - rows (long): Number of rows in block
* - columns (int): Number of columns in block
* - values (long): Number of values in block
*
* @param parallelism number of partitions of returned DataFrame
* @param paths one or more paths to Parquet files or directories
@@ -240,18 +250,20 @@

files.flatMap { case (_, file) =>
readFooters(file).flatMap { footer =>
footer.getParquetMetadata.getBlocks.asScala.zipWithIndex.map { case (block, idx) =>
footer.getParquetMetadata.getBlocks.asScala.map { block =>
(
footer.getFile.toString,
idx + 1,
block.getOrdinal + 1,
block.getStartingPos,
block.getCompressedSize,
block.getTotalByteSize,
block.getRowCount,
block.getColumns.asScala.size,
block.getColumns.asScala.map(_.getValueCount).sum,
)
}
}
}.toDF("filename", "block", "blockStart", "compressedBytes", "uncompressedBytes", "rows")
}.toDF("filename", "block", "blockStart", "compressedBytes", "uncompressedBytes", "rows", "columns", "values")
}

/**
@@ -287,7 +299,7 @@ package object parquet {
*
* This provides the following per-block-column information:
* - filename (string): The file name
* - block (int): Block / RowGroup number starting at 1
* - block (int): Block / RowGroup number starting at 1 (block ordinal + 1)
* - column (string): Block / RowGroup column name
* - codec (string): The codec used to compress the block column values
* - type (string): The data type of the block column
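*
* Illustrative sketch (assuming this reader is exposed as `spark.read.parquetBlockColumns`,
* analogous to `parquetBlocks`): both readers derive the 1-based block number from the
* row-group ordinal, so block-level and column-level metadata can be joined on it:
* {{{
*   val blocks  = spark.read.parquetBlocks("/path/to/parquet")
*   val columns = spark.read.parquetBlockColumns("/path/to/parquet")
*   blocks.join(columns, Seq("filename", "block")).show()
* }}}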
@@ -313,11 +325,11 @@

files.flatMap { case (_, file) =>
readFooters(file).flatMap { footer =>
footer.getParquetMetadata.getBlocks.asScala.zipWithIndex.flatMap { case (block, idx) =>
footer.getParquetMetadata.getBlocks.asScala.flatMap { block =>
block.getColumns.asScala.map { column =>
(
footer.getFile.toString,
idx + 1,
block.getOrdinal + 1,
column.getPath.toSeq,
column.getCodec.toString,
column.getPrimitiveType.toString,
19 changes: 14 additions & 5 deletions src/test/scala/uk/co/gresearch/spark/parquet/ParquetSuite.scala
@@ -67,6 +67,11 @@ class ParquetSuite extends AnyFunSuite with SparkTestSession with SparkVersion {
test(s"read parquet metadata (parallelism=${parallelism.map(_.toString).getOrElse("None")})") {
val createdBy = "parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)"
val schema = "message spark_schema {\\n required int64 id;\\n required double val;\\n}\\n"
val encrypted = "UNENCRYPTED"
val keyValues = Map(
"org.apache.spark.version" -> "3.3.0",
"org.apache.spark.sql.parquet.row.metadata" -> """{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"val","type":"double","nullable":false,"metadata":{}}]}"""
)

assertDf(
spark.read
@@ -82,10 +87,12 @@ class ParquetSuite extends AnyFunSuite with SparkTestSession with SparkVersion {
StructField("rows", LongType, nullable = false),
StructField("createdBy", StringType, nullable = true),
StructField("schema", StringType, nullable = true),
StructField("encryption", StringType, nullable = true),
StructField("keyValues", MapType(StringType, StringType, valueContainsNull = true), nullable = true),
)),
Seq(
Row("file1.parquet", 1, 1268, 1652, 100, createdBy, schema),
Row("file2.parquet", 2, 2539, 3302, 200, createdBy, schema),
Row("file1.parquet", 1, 1268, 1652, 100, createdBy, schema, encrypted, keyValues),
Row("file2.parquet", 2, 2539, 3302, 200, createdBy, schema, encrypted, keyValues),
),
parallelism
)
@@ -146,11 +153,13 @@ class ParquetSuite extends AnyFunSuite with SparkTestSession with SparkVersion {
StructField("compressedBytes", LongType, nullable = false),
StructField("uncompressedBytes", LongType, nullable = false),
StructField("rows", LongType, nullable = false),
StructField("columns", IntegerType, nullable = false),
StructField("values", LongType, nullable = false),
)),
Seq(
Row("file1.parquet", 1, 4, 1268, 1652, 100),
Row("file2.parquet", 1, 4, 1269, 1651, 100),
Row("file2.parquet", 2, 1273, 1270, 1651, 100),
Row("file1.parquet", 1, 4, 1268, 1652, 100, 2, 200),
Row("file2.parquet", 1, 4, 1269, 1651, 100, 2, 200),
Row("file2.parquet", 2, 1273, 1270, 1651, 100, 2, 200),
),
parallelism
)
