diff --git a/PARQUET.md b/PARQUET.md
index 3cdf55ee..37f83808 100644
--- a/PARQUET.md
+++ b/PARQUET.md
@@ -45,12 +45,12 @@ spark.read.parquetMetadata("/path/to/parquet").show()
 spark.read.parquet_metadata("/path/to/parquet").show()
 ```
 ```
-+-------------+------+---------------+-----------------+----+--------------------+--------------------+
-|     filename|blocks|compressedBytes|uncompressedBytes|rows|           createdBy|              schema|
-+-------------+------+---------------+-----------------+----+--------------------+--------------------+
-|file1.parquet|     1|           1268|             1652| 100|parquet-mr versio...|message spark_sch...|
-|file2.parquet|     2|           2539|             3302| 200|parquet-mr versio...|message spark_sch...|
-+-------------+------+---------------+-----------------+----+--------------------+--------------------+
++-------------+------+---------------+-----------------+----+--------------------+--------------------+-----------+--------------------+
+|     filename|blocks|compressedBytes|uncompressedBytes|rows|           createdBy|              schema| encryption|           keyValues|
++-------------+------+---------------+-----------------+----+--------------------+--------------------+-----------+--------------------+
+|file1.parquet|     1|           1268|             1652| 100|parquet-mr versio...|message spark_sch...|UNENCRYPTED|{org.apache.spark...|
+|file2.parquet|     2|           2539|             3302| 200|parquet-mr versio...|message spark_sch...|UNENCRYPTED|{org.apache.spark...|
++-------------+------+---------------+-----------------+----+--------------------+--------------------+-----------+--------------------+
 ```
 
 The Dataframe provides the following per-file information:
@@ -64,6 +64,8 @@ The Dataframe provides the following per-file information:
 |rows |long |Number of rows of all blocks |
 |createdBy |string|The createdBy string of the Parquet file, e.g. library used to write the file|
 |schema |string|The schema |
+|encryption |string|The encryption |
+|keyValues |string-to-string map|Key-value data of the file |
 
 ## Parquet file schema
 
@@ -119,14 +121,13 @@ spark.read.parquetBlocks("/path/to/parquet").show()
 spark.read.parquet_blocks("/path/to/parquet").show()
 ```
 ```
-+-------------+-----+----------+---------------+-----------------+----+
-|     filename|block|blockStart|compressedBytes|uncompressedBytes|rows|
-+-------------+-----+----------+---------------+-----------------+----+
-|file1.parquet|    1|         4|           1269|             1651| 100|
-|file2.parquet|    1|         4|           1268|             1652| 100|
-|file2.parquet|    2|      1273|           1270|             1651| 100|
-+-------------+-----+----------+---------------+-----------------+----+
-
++-------------+-----+----------+---------------+-----------------+----+-------+------+
+|     filename|block|blockStart|compressedBytes|uncompressedBytes|rows|columns|values|
++-------------+-----+----------+---------------+-----------------+----+-------+------+
+|file1.parquet|    1|         4|           1269|             1651| 100|      2|   200|
+|file2.parquet|    1|         4|           1268|             1652| 100|      2|   200|
+|file2.parquet|    2|      1273|           1270|             1651| 100|      2|   200|
++-------------+-----+----------+---------------+-----------------+----+-------+------+
 ```
 
 |column |type |description |
@@ -137,6 +138,8 @@ spark.read.parquet_blocks("/path/to/parquet").show()
 |compressedBytes |long |Number of compressed bytes in block |
 |uncompressedBytes |long |Number of uncompressed bytes in block |
 |rows |long |Number of rows in block |
+|columns |int |Number of columns in block |
+|values |long |Number of values in block |
 
 ## Parquet block column metadata
 
diff --git a/python/gresearch/spark/parquet/__init__.py b/python/gresearch/spark/parquet/__init__.py
index 7e18e8c7..254b98dd 100644
--- a/python/gresearch/spark/parquet/__init__.py
+++ b/python/gresearch/spark/parquet/__init__.py
@@ -41,6 +41,8 @@ def parquet_metadata(self: DataFrameReader, *paths: str, parallelism: Optional[i
     - rows (long): Number of rows of all blocks
     - createdBy (string): The createdBy string of the Parquet file, e.g. library used to write the file
     - schema (string): The schema
+    - encryption (string): The encryption
+    - keyValues (string-to-string map): Key-value data of the file
 
     :param self: a Spark DataFrameReader
     :param paths: paths one or more paths to Parquet files or directories
@@ -105,6 +107,8 @@ def parquet_blocks(self: DataFrameReader, *paths: str, parallelism: Optional[int
     - compressedBytes (long): Number of compressed bytes in block
     - uncompressedBytes (long): Number of uncompressed bytes in block
     - rows (long): Number of rows in block
+    - columns (int): Number of columns in block
+    - values (long): Number of values in block
 
     :param self: a Spark DataFrameReader
     :param paths: paths one or more paths to Parquet files or directories
diff --git a/src/main/scala/uk/co/gresearch/spark/parquet/package.scala b/src/main/scala/uk/co/gresearch/spark/parquet/package.scala
index e5e0ca81..fdce47fc 100644
--- a/src/main/scala/uk/co/gresearch/spark/parquet/package.scala
+++ b/src/main/scala/uk/co/gresearch/spark/parquet/package.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.datasources.FilePartition
 import uk.co.gresearch._
 
-import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter}
 import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`
 
 package object parquet {
@@ -51,6 +51,8 @@ package object parquet {
    * - rows (long): Number of rows of all blocks
    * - createdBy (string): The createdBy string of the Parquet file, e.g. library used to write the file
    * - schema (string): The schema
+   * - encryption (string): The encryption
+   * - keyValues (string-to-string map): Key-value data of the file
    *
    * @param paths one or more paths to Parquet files or directories
    * @return dataframe with Parquet metadata
@@ -71,6 +73,8 @@ package object parquet {
    * - rows (long): Number of rows of all blocks
    * - createdBy (string): The createdBy string of the Parquet file, e.g. library used to write the file
    * - schema (string): The schema
+   * - encryption (string): The encryption
+   * - keyValues (string-to-string map): Key-value data of the file
    *
    * @param parallelism number of partitions of returned DataFrame
    * @param paths one or more paths to Parquet files or directories
@@ -94,9 +98,11 @@ package object parquet {
           footer.getParquetMetadata.getBlocks.asScala.map(_.getRowCount).sum,
           footer.getParquetMetadata.getFileMetaData.getCreatedBy,
           footer.getParquetMetadata.getFileMetaData.getSchema.toString,
+          footer.getParquetMetadata.getFileMetaData.getEncryptionType.name(),
+          footer.getParquetMetadata.getFileMetaData.getKeyValueMetaData.asScala,
         )
       }
-    }.toDF("filename", "blocks", "compressedBytes", "uncompressedBytes", "rows", "createdBy", "schema")
+    }.toDF("filename", "blocks", "compressedBytes", "uncompressedBytes", "rows", "createdBy", "schema", "encryption", "keyValues")
   }
 
   /**
@@ -206,6 +212,8 @@
    * - compressedBytes (long): Number of compressed bytes in block
    * - uncompressedBytes (long): Number of uncompressed bytes in block
    * - rows (long): Number of rows in block
+   * - columns (int): Number of columns in block
+   * - values (long): Number of values in block
    *
    * @param paths one or more paths to Parquet files or directories
    * @return dataframe with Parquet block metadata
@@ -220,11 +228,13 @@
    *
    * This provides the following per-block information:
    * - filename (string): The file name
-   * - block (int): Block / RowGroup number starting at 1
+   * - block (int): Block / RowGroup number starting at 1 (block ordinal + 1)
    * - blockStart (long): Start position of the block in the Parquet file
    * - compressedBytes (long): Number of compressed bytes in block
    * - uncompressedBytes (long): Number of uncompressed bytes in block
    * - rows (long): Number of rows in block
+   * - columns (int): Number of columns in block
+   * - values (long): Number of values in block
    *
    * @param parallelism number of partitions of returned DataFrame
    * @param paths one or more paths to Parquet files or directories
@@ -240,18 +250,20 @@

     files.flatMap { case (_, file) =>
       readFooters(file).flatMap { footer =>
-        footer.getParquetMetadata.getBlocks.asScala.zipWithIndex.map { case (block, idx) =>
+        footer.getParquetMetadata.getBlocks.asScala.map { block =>
           (
             footer.getFile.toString,
-            idx + 1,
+            block.getOrdinal + 1,
             block.getStartingPos,
             block.getCompressedSize,
             block.getTotalByteSize,
             block.getRowCount,
+            block.getColumns.asScala.size,
+            block.getColumns.asScala.map(_.getValueCount).sum,
           )
         }
       }
-    }.toDF("filename", "block", "blockStart", "compressedBytes", "uncompressedBytes", "rows")
+    }.toDF("filename", "block", "blockStart", "compressedBytes", "uncompressedBytes", "rows", "columns", "values")
   }
 
   /**
@@ -287,7 +299,7 @@
    *
    * This provides the following per-block-column information:
    * - filename (string): The file name
-   * - block (int): Block / RowGroup number starting at 1
+   * - block (int): Block / RowGroup number starting at 1 (block ordinal + 1)
    * - column (string): Block / RowGroup column name
    * - codec (string): The coded used to compress the block column values
    * - type (string): The data type of the block column
@@ -313,11 +325,11 @@

     files.flatMap { case (_, file) =>
       readFooters(file).flatMap { footer =>
-        footer.getParquetMetadata.getBlocks.asScala.zipWithIndex.flatMap { case (block, idx) =>
+        footer.getParquetMetadata.getBlocks.asScala.flatMap { block =>
           block.getColumns.asScala.map { column =>
             (
               footer.getFile.toString,
-              idx + 1,
+              block.getOrdinal + 1,
               column.getPath.toSeq,
               column.getCodec.toString,
               column.getPrimitiveType.toString,
diff --git a/src/test/scala/uk/co/gresearch/spark/parquet/ParquetSuite.scala b/src/test/scala/uk/co/gresearch/spark/parquet/ParquetSuite.scala
index 4b3e77fe..f59249d1 100644
--- a/src/test/scala/uk/co/gresearch/spark/parquet/ParquetSuite.scala
+++ b/src/test/scala/uk/co/gresearch/spark/parquet/ParquetSuite.scala
@@ -67,6 +67,11 @@ class ParquetSuite extends AnyFunSuite with SparkTestSession with SparkVersion {
     test(s"read parquet metadata (parallelism=${parallelism.map(_.toString).getOrElse("None")})") {
       val createdBy = "parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)"
       val schema = "message spark_schema {\\n required int64 id;\\n required double val;\\n}\\n"
+      val encrypted = "UNENCRYPTED"
+      val keyValues = Map(
+        "org.apache.spark.version" -> "3.3.0",
+        "org.apache.spark.sql.parquet.row.metadata" -> """{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"val","type":"double","nullable":false,"metadata":{}}]}"""
+      )
 
       assertDf(
         spark.read
@@ -82,10 +87,12 @@ class ParquetSuite extends AnyFunSuite with SparkTestSession with SparkVersion {
           StructField("rows", LongType, nullable = false),
           StructField("createdBy", StringType, nullable = true),
           StructField("schema", StringType, nullable = true),
+          StructField("encryption", StringType, nullable = true),
+          StructField("keyValues", MapType(StringType, StringType, valueContainsNull = true), nullable = true),
         )),
         Seq(
-          Row("file1.parquet", 1, 1268, 1652, 100, createdBy, schema),
-          Row("file2.parquet", 2, 2539, 3302, 200, createdBy, schema),
+          Row("file1.parquet", 1, 1268, 1652, 100, createdBy, schema, encrypted, keyValues),
+          Row("file2.parquet", 2, 2539, 3302, 200, createdBy, schema, encrypted, keyValues),
         ),
         parallelism
       )
@@ -146,11 +153,13 @@ class ParquetSuite extends AnyFunSuite with SparkTestSession with SparkVersion {
          StructField("compressedBytes", LongType, nullable = false),
          StructField("uncompressedBytes", LongType, nullable = false),
          StructField("rows", LongType, nullable = false),
+         StructField("columns", IntegerType, nullable = false),
+         StructField("values", LongType, nullable = false),
        )),
        Seq(
-         Row("file1.parquet", 1, 4, 1268, 1652, 100),
-         Row("file2.parquet", 1, 4, 1269, 1651, 100),
-         Row("file2.parquet", 2, 1273, 1270, 1651, 100),
+         Row("file1.parquet", 1, 4, 1268, 1652, 100, 2, 200),
+         Row("file2.parquet", 1, 4, 1269, 1651, 100, 2, 200),
+         Row("file2.parquet", 2, 1273, 1270, 1651, 100, 2, 200),
        ),
        parallelism
      )
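
Usage note (not part of the patch): a minimal sketch of how the new per-file columns could be consumed from Scala. It assumes the extension's implicit `DataFrameReader` syntax is available via `import uk.co.gresearch.spark.parquet._`, as the PARQUET.md examples suggest, and uses an illustrative path; the column names `encryption` and `keyValues` and the key `org.apache.spark.version` are taken from the diff above.

```scala
import org.apache.spark.sql.SparkSession
import uk.co.gresearch.spark.parquet._  // assumed import providing spark.read.parquetMetadata

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Per-file metadata; the new "encryption" and "keyValues" columns come last.
val meta = spark.read.parquetMetadata("/path/to/parquet")  // illustrative path

// Show the encryption state and the Spark version stored in the key-value metadata.
meta
  .select(
    $"filename",
    $"encryption",
    $"keyValues".getItem("org.apache.spark.version").as("writtenBySpark"))
  .show(truncate = false)
```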
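Similarly for the new per-block columns: for a flat schema each column chunk stores one value per row, so `values` should equal `rows * columns` (the README example shows 100 rows * 2 columns = 200 values), while repeated (array) fields can make per-column value counts diverge from the row count. A sketch reusing the `spark` session and assumptions from the previous snippet:

```scala
import uk.co.gresearch.spark.parquet._  // assumed import providing spark.read.parquetBlocks
import spark.implicits._                // assumes an existing SparkSession named `spark`

val blocks = spark.read.parquetBlocks("/path/to/parquet")  // illustrative path

// Compare the reported value count with rows * columns per block.
blocks
  .withColumn("expectedValues", $"rows" * $"columns")
  .select("filename", "block", "rows", "columns", "values", "expectedValues")
  .show()
```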
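One way to sanity-check the change end to end: the per-file figures produced by `parquetMetadata` are sums over the blocks of each file (see the `.map(_.getRowCount).sum` style aggregation in the hunk above), so aggregating `parquetBlocks` by `filename` should reproduce them. A sketch under the same assumptions as the snippets above:

```scala
import org.apache.spark.sql.functions.sum
import uk.co.gresearch.spark.parquet._  // assumed import
import spark.implicits._                // assumes an existing SparkSession named `spark`

// Roll the per-block numbers up to one row per file.
val fromBlocks = spark.read.parquetBlocks("/path/to/parquet")
  .groupBy($"filename")
  .agg(
    sum($"rows").as("rowsFromBlocks"),
    sum($"compressedBytes").as("compressedFromBlocks"))

// Both aggregated columns should match the per-file metadata for each file.
spark.read.parquetMetadata("/path/to/parquet")
  .select($"filename", $"rows", $"compressedBytes")
  .join(fromBlocks, "filename")
  .show()
```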