diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e3a7709..6348117 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,7 +22,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - scala: [2.12.19, 2.13.14, 3.5.0] + scala: [2.13.14, 3.5.0] java: [temurin@11, temurin@17] runs-on: ${{ matrix.os }} steps: diff --git a/.gitignore b/.gitignore index 29a3f7d..07f600e 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ private/ .metals/ .vscode/ metals.sbt +.scala-build diff --git a/.scalafmt.conf b/.scalafmt.conf index 08e8add..995ca42 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -11,5 +11,5 @@ spaces { inImportCurlyBraces = true } optIn.annotationNewlines = true -runner.dialect = scala213 +runner.dialect = scala3 rewrite.rules = [SortImports, RedundantBraces] diff --git a/README.md b/README.md index f6a5125..42b8d3f 100644 --- a/README.md +++ b/README.md @@ -5,10 +5,18 @@ # ZIO Apache Parquet -ZIO based wrapper for [Apache Parquet Java implementation](https://github.com/apache/parquet-mr) that -leverages [ZIO Schema](https://zio.dev/zio-schema/) to derive codecs +A ZIO-powered wrapper for [Apache Parquet's Java implementation](https://github.com/apache/parquet-mr), leveraging [ZIO Schema](https://zio.dev/zio-schema/) to automatically derive codecs and provide type-safe filter predicates. Operate your parquet files easily using a top-notch ZIO-powered ecosystem without running a Spark cluster. -## Content +Ready for more? Check out my other game-changing library that makes working with Apache Arrow format a breeze - [ZIO Apache Arrow](https://github.com/grouzen/zio-apache-arrow). + +## Why? + +- **No Spark required** - you don't need to run a Spark cluster to read/write Parquet files. +- **ZIO native** - utilizes various ZIO features to offer a FP-oriented way of working with the Parquet API. +- **ZIO Schema** - the backbone that powers all the cool features of this library such as type-safe filter predicates and codecs derivation. + + +## Contents - [Installation](#installation) - [Usage](#usage) @@ -16,6 +24,7 @@ leverages [ZIO Schema](https://zio.dev/zio-schema/) to derive codecs - [Schema](#schema) - [Value](#value) - [Reading & Writing files](#reading--writing-files) + - [Filtering](#filtering) ## Installation @@ -25,6 +34,8 @@ libraryDependencies += "me.mnedokushev" %% "zio-apache-parquet-core" % "@VERSION ## Usage +All examples are self-contained [Scala CLI](https://scala-cli.virtuslab.org) snippets. You can find copies of them in `docs/scala-cli`. 
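+For instance, assuming you have [Scala CLI](https://scala-cli.virtuslab.org) installed, you can run a snippet with a command along these lines:
+
+```shell
+# compile and run the self-contained Schema example
+scala-cli run docs/scala-cli/Schema.scala
+```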
+ ### Codecs To be able to write/read data to/from parquet files you need to define the following schema and value codecs @@ -35,152 +46,153 @@ To be able to write/read data to/from parquet files you need to define the follo You can get Java SDK's `Type` by using `SchemaEncoder` generated by `SchemaEncoderDeriver.default` ZIO Schema deriver: ```scala -//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4 - -import zio.schema._ -import me.mnedokushev.zio.apache.parquet.core.codec._ +//> using scala "3.5.0" +//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0 -case class MyRecord(a: Int, b: String, c: Option[Long]) +import zio.schema.* +import me.mnedokushev.zio.apache.parquet.core.codec.* -object MyRecord { - implicit val schema: Schema[MyRecord] = - DeriveSchema.gen[MyRecord] - implicit val schemaEncoder: SchemaEncoder[MyRecord] = - Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default) -} +object Schema extends App: -import MyRecord._ + case class MyRecord(a: Int, b: String, c: Option[Long]) + object MyRecord { + given schema: Schema[MyRecord] = + DeriveSchema.gen[MyRecord] + given schemaEncoder: SchemaEncoder[MyRecord] = + Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default) + } -val parquetSchema = schemaEncoder.encode(schema, "my_record", optional = false) + val parquetSchema = MyRecord.schemaEncoder.encode(MyRecord.schema, "my_record", optional = false) -println(parquetSchema) -// Outputs: -// required group my_record { -// required int32 a (INTEGER(32,true)); -// required binary b (STRING); -// optional int64 c (INTEGER(64,true)); -// } + println(parquetSchema) + // Outputs: + // required group my_record { + // required int32 a (INTEGER(32,true)); + // required binary b (STRING); + // optional int64 c (INTEGER(64,true)); + // } ``` -Alternatively, you can override the schemas of some fields in your record by defining a custom `SchemaEncoder` -and using `SchemaEncoderDeriver.summoned` deriver. +Alternatively, you can customize the schemas of [primitive](https://zio.dev/zio-schema/standard-type-reference) fields within your record by defining a custom `SchemaEncoder` +and using the `SchemaEncoderDeriver.summoned` deriver. ```scala -//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4 +//> using scala "3.5.0" +//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0 import me.mnedokushev.zio.apache.parquet.core.Schemas import zio.schema._ import me.mnedokushev.zio.apache.parquet.core.codec._ -case class MyRecord(a: Int, b: String, c: Option[Long]) - -object MyRecord { - implicit val schema: Schema[MyRecord] = - DeriveSchema.gen[MyRecord] - implicit val intEncoder: SchemaEncoder[Int] = new SchemaEncoder[Int] { - override def encode(schema: Schema[Int], name: String, optional: Boolean) = - Schemas.uuid.optionality(optional).named(name) - } - implicit val schemaEncoder: SchemaEncoder[MyRecord] = - Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.summoned) -} - -import MyRecord._ - -val parquetSchema = schemaEncoder.encode(schema, "my_record", optional = false) - -println(parquetSchema) -// Outputs: -// required group my_record { -// required fixed_len_byte_array(16) a (UUID); -// required binary b (STRING); -// optional int64 c (INTEGER(64,true)); -// } +object SchemaSummoned extends App: + + case class MyRecord(a: Int, b: String, c: Option[Long]) + + object MyRecord: + given schema: Schema[MyRecord] = + DeriveSchema.gen[MyRecord] + // The custom encoder must be defined before the definition for your record type. 
+ given SchemaEncoder[Int] with { + override def encode(schema: Schema[Int], name: String, optional: Boolean) = + Schemas.uuid.optionality(optional).named(name) + } + given schemaEncoder: SchemaEncoder[MyRecord] = + Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.summoned) + + val parquetSchema = MyRecord.schemaEncoder.encode(MyRecord.schema, "my_record", optional = false) + + println(parquetSchema) + // Outputs: + // required group my_record { + // required fixed_len_byte_array(16) a (UUID); + // required binary b (STRING); + // optional int64 c (INTEGER(64,true)); + // } ``` #### Value -There is a sealed hierarchy of `Value` types for interop between Scala values and Parquet readers/writers. +`Value` is a sealed hierarchy of types for interop between Scala values and Parquet readers/writers. For converting Scala values into `Value` and back we need to define instances of `ValueEncoder` and `ValueDecoder` type classes. This could be done by using `ValueDecoderDeriver.default` ZIO Schema deriver. ```scala -//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4 +//> using scala "3.5.0" +//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0 import zio.schema._ import me.mnedokushev.zio.apache.parquet.core.codec._ -case class MyRecord(a: Int, b: String, c: Option[Long]) +object Value extends App: -object MyRecord { - implicit val schema: Schema[MyRecord] = - DeriveSchema.gen[MyRecord] - implicit val encoder: ValueEncoder[MyRecord] = - Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default) - implicit val decoder: ValueDecoder[MyRecord] = - Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default) -} + case class MyRecord(a: Int, b: String, c: Option[Long]) -import MyRecord._ + object MyRecord: + given Schema[MyRecord] = + DeriveSchema.gen[MyRecord] + given encoder: ValueEncoder[MyRecord] = + Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default) + given decoder: ValueDecoder[MyRecord] = + Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default) -val value = encoder.encode(MyRecord(3, "zio", None)) -val record = decoder.decode(value) + val value = MyRecord.encoder.encode(MyRecord(3, "zio", None)) + val record = MyRecord.decoder.decode(value) -println(value) -// Outputs: -// RecordValue(Map(a -> Int32Value(3), b -> BinaryValue(Binary{"zio"}), c -> NullValue)) -println(record) -// Outputs: -// MyRecord(3,zio,None) + println(value) + // Outputs: + // RecordValue(Map(a -> Int32Value(3), b -> BinaryValue(Binary{"zio"}), c -> NullValue)) + println(record) + // Outputs: + // MyRecord(3,zio,None) ``` -Same as for `SchemaEncoder`, you can override the schemas of some fields in your record by defining custom +Same as for `SchemaEncoder`, you can customize the codecs of primitive types by defining custom `ValueEncoder`/`ValueDecoder` and using `ValueEncoderDeriver.summoned`/`ValueDecoderDeriver.summoned` derivers accordingly. 
```scala -//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4 +//> using scala "3.5.0" +//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0 import me.mnedokushev.zio.apache.parquet.core.Value -import zio.schema._ -import me.mnedokushev.zio.apache.parquet.core.codec._ +import zio.schema.* +import me.mnedokushev.zio.apache.parquet.core.codec.* import java.nio.charset.StandardCharsets -case class MyRecord(a: Int, b: String, c: Option[Long]) - -object MyRecord { - implicit val schema: Schema[MyRecord] = - DeriveSchema.gen[MyRecord] - implicit val intEncoder: ValueEncoder[Int] = new ValueEncoder[Int] { - override def encode(value: Int): Value = - Value.string(value.toString) - } - implicit val intDecoder: ValueDecoder[Int] = new ValueDecoder[Int] { - override def decode(value: Value): Int = - value match { - case Value.PrimitiveValue.BinaryValue(v) => - new String(v.getBytes, StandardCharsets.UTF_8).toInt - case other => - throw DecoderError(s"Wrong value: $other") - } - } - implicit val encoder: ValueEncoder[MyRecord] = - Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.summoned) - implicit val decoder: ValueDecoder[MyRecord] = - Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.summoned) -} - -import MyRecord._ - -val value = encoder.encode(MyRecord(3, "zio", None)) -val record = decoder.decode(value) - -println(value) -// Outputs: -// RecordValue(Map(a -> BinaryValue(Binary{"3"}), b -> BinaryValue(Binary{"zio"}), c -> NullValue)) -println(record) -// Outputs: -// MyRecord(3,zio,None) +object ValueSummoned extends App: + + case class MyRecord(a: Int, b: String, c: Option[Long]) + + object MyRecord: + given Schema[MyRecord] = + DeriveSchema.gen[MyRecord] + given ValueEncoder[Int] with { + override def encode(value: Int): Value = + Value.string(value.toString) + } + given ValueDecoder[Int] with { + override def decode(value: Value): Int = + value match { + case Value.PrimitiveValue.BinaryValue(v) => + new String(v.getBytes, StandardCharsets.UTF_8).toInt + case other => + throw DecoderError(s"Wrong value: $other") + } + } + given encoder: ValueEncoder[MyRecord] = + Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.summoned) + given decoder: ValueDecoder[MyRecord] = + Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.summoned) + + val value = MyRecord.encoder.encode(MyRecord(3, "zio", None)) + val record = MyRecord.decoder.decode(value) + + println(value) + // Outputs: + // RecordValue(Map(a -> BinaryValue(Binary{"3"}), b -> BinaryValue(Binary{"zio"}), c -> NullValue)) + println(record) + // Outputs: + // MyRecord(3,zio,None) ``` ### Reading & Writing files @@ -189,55 +201,127 @@ Finally, to perform some IO operations we need to initialize `ParquetWriter` and `writeChunk`/`readChunk` or `writeStream`/`readStream` methods. 
```scala -//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.4 +//> using scala "3.5.0" +//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0 -import zio.schema._ -import me.mnedokushev.zio.apache.parquet.core.codec._ +import zio.schema.* +import me.mnedokushev.zio.apache.parquet.core.codec.* import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, ParquetWriter, Path } -import zio._ +import zio.* import java.nio.file.Files -case class MyRecord(a: Int, b: String, c: Option[Long]) -object MyRecord { - implicit val schema: Schema[MyRecord] = - DeriveSchema.gen[MyRecord] - implicit val schemaEncoder: SchemaEncoder[MyRecord] = - Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default) - implicit val encoder: ValueEncoder[MyRecord] = - Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default) - implicit val decoder: ValueDecoder[MyRecord] = - Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default) -} - -val data = - Chunk( - MyRecord(1, "first", Some(11)), - MyRecord(3, "third", None) - ) - -val recordsFile = Path(Files.createTempDirectory("records")) / "records.parquet" - -Unsafe.unsafe { implicit unsafe => - Runtime.default.unsafe - .run( - (for { - writer <- ZIO.service[ParquetWriter[MyRecord]] - reader <- ZIO.service[ParquetReader[MyRecord]] - _ <- writer.writeChunk(recordsFile, data) - fromFile <- reader.readChunk(recordsFile) - _ <- Console.printLine(fromFile) - } yield ()).provide( - ParquetWriter.configured[MyRecord](), - ParquetReader.configured[MyRecord]() - ) +object ParquetIO extends ZIOAppDefault: + + case class MyRecord(a: Int, b: String, c: Option[Long]) + + object MyRecord: + given Schema[MyRecord] = + DeriveSchema.gen[MyRecord] + given SchemaEncoder[MyRecord] = + Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default) + given ValueEncoder[MyRecord] = + Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default) + given ValueDecoder[MyRecord] = + Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default) + + val data = + Chunk( + MyRecord(1, "first", Some(11)), + MyRecord(3, "third", None) + ) + + val recordsFile = Path(Files.createTempDirectory("records")) / "records.parquet" + + override def run = + (for { + writer <- ZIO.service[ParquetWriter[MyRecord]] + reader <- ZIO.service[ParquetReader[MyRecord]] + _ <- writer.writeChunk(recordsFile, data) + fromFile <- reader.readChunk(recordsFile) + _ <- Console.printLine(fromFile) + } yield ()).provide( + ParquetWriter.configured[MyRecord](), + ParquetReader.configured[MyRecord]() ) - .getOrThrowFiberFailure() -} -// Outputs: -// Chunk(MyRecord(1,first,Some(11)),MyRecord(3,third,None)) + // Outputs: + // Chunk(MyRecord(1,first,Some(11)),MyRecord(3,third,None)) ``` In the previous code snippet we used `ParquetReader.configured[A]()` to initialize a reader that uses a parquet schema taken from a given file. Such a reader will always try to read all columns from a given file. -In case you need to read only part of the columns, use `ParquetReader.projected[A]()` that always will use the schema of the provided type. +In case you need to read only part of the columns, use `ParquetReader.projected[A]()`. This skips columns that are not present in the schema and reads only those that are, saving precious CPU cycles and time. + +#### Filtering + +Say goodbye to type-unsafe filter predicates such as `Col("foo") != "bar"`. 
The library takes advantage of an underdocumented feature of ZIO Schema - [Accessors](https://github.com/zio/zio-schema/blob/main/zio-schema/shared/src/main/scala/zio/schema/Schema.scala#L38) - a hidden gem that makes it possible to extract type-level information about the fields of case classes. In addition to the codecs shown above, you need to provide an instance of `TypeTag` for your record type. For this, use the `TypeTagDeriver.default` deriver.
+
+```scala
+//> using scala "3.5.0"
+//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0
+
+import zio.*
+import zio.schema.*
+import me.mnedokushev.zio.apache.parquet.core.codec.*
+import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, ParquetWriter, Path }
+import me.mnedokushev.zio.apache.parquet.core.filter.syntax.*
+import me.mnedokushev.zio.apache.parquet.core.filter.*
+
+import java.nio.file.Files
+
+object Filtering extends ZIOAppDefault:
+
+  case class MyRecord(a: Int, b: String, c: Option[Long])
+
+  object MyRecord:
+    // We need to provide field names using singleton types
+    given Schema.CaseClass3.WithFields["a", "b", "c", Int, String, Option[Long], MyRecord] =
+      DeriveSchema.gen[MyRecord]
+    given SchemaEncoder[MyRecord] =
+      Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default)
+    given ValueEncoder[MyRecord] =
+      Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default)
+    given ValueDecoder[MyRecord] =
+      Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default)
+    given TypeTag[MyRecord] =
+      Derive.derive[TypeTag, MyRecord](TypeTagDeriver.default)
+
+  // Define accessors to use them later in the filter predicate.
+  // You can give any names to the accessors as we demonstrate here.
+  val (id, name, age) = Filter[MyRecord].columns
+
+  val data =
+    Chunk(
+      MyRecord(1, "bob", Some(10L)),
+      MyRecord(2, "bob", Some(12L)),
+      MyRecord(3, "alice", Some(13L)),
+      MyRecord(4, "john", None)
+    )
+
+  val recordsFile = Path(Files.createTempDirectory("records")) / "records.parquet"
+
+  override def run =
+    (
+      for {
+        writer   <- ZIO.service[ParquetWriter[MyRecord]]
+        reader   <- ZIO.service[ParquetReader[MyRecord]]
+        _        <- writer.writeChunk(recordsFile, data)
+        fromFile <- reader.readChunkFiltered(
+                      recordsFile,
+                      filter(
+                        MyRecord.id > 1 `and` (
+                          MyRecord.name =!= "bob" `or`
+                            // Use .nullable syntax for optional fields.
+ MyRecord.age.nullable > 10L + ) + ) + ) + _ <- Console.printLine(fromFile) + } yield () + ).provide( + ParquetWriter.configured[MyRecord](), + ParquetReader.configured[MyRecord]() + ) + // Outputs: + // Chunk(MyRecord(2,bob,Some(12)),MyRecord(3,alice,Some(13)),MyRecord(4,john,None)) +``` \ No newline at end of file diff --git a/build.sbt b/build.sbt index b7baf9e..9dd32ff 100644 --- a/build.sbt +++ b/build.sbt @@ -20,8 +20,9 @@ inThisBuild( "scm:git:git@github.com:grouzen/zio-apache-parquet.git" ) ), - crossScalaVersions := Seq(Scala212, Scala213, Scala3), - githubWorkflowJavaVersions := Seq(JavaSpec.temurin("11"), JavaSpec.temurin("17")), + crossScalaVersions := Seq(Scala213, Scala3), + ThisBuild / scalaVersion := Scala3, + githubWorkflowJavaVersions := Seq(JavaSpec.temurin("11"), JavaSpec.temurin("17")), githubWorkflowPublishTargetBranches := Seq(), githubWorkflowBuildPreamble := Seq( WorkflowStep.Sbt( @@ -40,6 +41,9 @@ lazy val root = .in(file(".")) .aggregate(core) .settings(publish / skip := true) + .settings( + addCommandAlias("fmtAll", "+scalafmtAll; +scalafixAll") + ) lazy val core = project diff --git a/docs/scala-cli/Filtering.scala b/docs/scala-cli/Filtering.scala new file mode 100644 index 0000000..51236e3 --- /dev/null +++ b/docs/scala-cli/Filtering.scala @@ -0,0 +1,67 @@ +//> using scala "3.5.0" +//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0 + +import zio.* +import zio.schema.* +import me.mnedokushev.zio.apache.parquet.core.codec.* +import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, ParquetWriter, Path } +import me.mnedokushev.zio.apache.parquet.core.filter.syntax.* +import me.mnedokushev.zio.apache.parquet.core.filter.* + +import java.nio.file.Files + +object Filtering extends ZIOAppDefault: + + case class MyRecord(a: Int, b: String, c: Option[Long]) + + object MyRecord: + // We need to provide field names using singleton types + given Schema.CaseClass3.WithFields["a", "b", "c", Int, String, Option[Long], MyRecord] = + DeriveSchema.gen[MyRecord] + given SchemaEncoder[MyRecord] = + Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default) + given ValueEncoder[MyRecord] = + Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default) + given ValueDecoder[MyRecord] = + Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default) + given TypeTag[MyRecord] = + Derive.derive[TypeTag, MyRecord](TypeTagDeriver.default) + + // Define accessors to use them later in the filter predicate. + // You can give any names to the accessors as we demonstrate here. + val (id, name, age) = Filter[MyRecord].columns + + val data = + Chunk( + MyRecord(1, "bob", Some(10L)), + MyRecord(2, "bob", Some(12L)), + MyRecord(3, "alice", Some(13L)), + MyRecord(4, "john", None) + ) + + val recordsFile = Path(Files.createTempDirectory("records")) / "records.parquet" + + override def run = + ( + for { + writer <- ZIO.service[ParquetWriter[MyRecord]] + reader <- ZIO.service[ParquetReader[MyRecord]] + _ <- writer.writeChunk(recordsFile, data) + fromFile <- reader.readChunkFiltered( + recordsFile, + filter( + MyRecord.id > 1 `and` ( + MyRecord.name =!= "bob" `or` + // Use .nullable syntax for optional fields. 
+ MyRecord.age.nullable > 10L + ) + ) + ) + _ <- Console.printLine(fromFile) + } yield () + ).provide( + ParquetWriter.configured[MyRecord](), + ParquetReader.configured[MyRecord]() + ) + // Outputs: + // Chunk(MyRecord(2,bob,Some(12)),MyRecord(3,alice,Some(13)),MyRecord(4,john,None)) diff --git a/docs/scala-cli/ParquetIO.scala b/docs/scala-cli/ParquetIO.scala new file mode 100644 index 0000000..058fcfb --- /dev/null +++ b/docs/scala-cli/ParquetIO.scala @@ -0,0 +1,45 @@ +//> using scala "3.5.0" +//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0 + +import zio.schema.* +import me.mnedokushev.zio.apache.parquet.core.codec.* +import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, ParquetWriter, Path } +import zio.* + +import java.nio.file.Files + +object ParquetIO extends ZIOAppDefault: + + case class MyRecord(a: Int, b: String, c: Option[Long]) + + object MyRecord: + given Schema[MyRecord] = + DeriveSchema.gen[MyRecord] + given SchemaEncoder[MyRecord] = + Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default) + given ValueEncoder[MyRecord] = + Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default) + given ValueDecoder[MyRecord] = + Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default) + + val data = + Chunk( + MyRecord(1, "first", Some(11)), + MyRecord(3, "third", None) + ) + + val recordsFile = Path(Files.createTempDirectory("records")) / "records.parquet" + + override def run = + (for { + writer <- ZIO.service[ParquetWriter[MyRecord]] + reader <- ZIO.service[ParquetReader[MyRecord]] + _ <- writer.writeChunk(recordsFile, data) + fromFile <- reader.readChunk(recordsFile) + _ <- Console.printLine(fromFile) + } yield ()).provide( + ParquetWriter.configured[MyRecord](), + ParquetReader.configured[MyRecord]() + ) + // Outputs: + // Chunk(MyRecord(1,first,Some(11)),MyRecord(3,third,None)) diff --git a/docs/scala-cli/Schema.scala b/docs/scala-cli/Schema.scala new file mode 100644 index 0000000..a12b483 --- /dev/null +++ b/docs/scala-cli/Schema.scala @@ -0,0 +1,25 @@ +//> using scala "3.5.0" +//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0 + +import zio.schema.* +import me.mnedokushev.zio.apache.parquet.core.codec.* + +object Schema extends App: + + case class MyRecord(a: Int, b: String, c: Option[Long]) + + object MyRecord: + given schema: Schema[MyRecord] = + DeriveSchema.gen[MyRecord] + given schemaEncoder: SchemaEncoder[MyRecord] = + Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default) + + val parquetSchema = MyRecord.schemaEncoder.encode(MyRecord.schema, "my_record", optional = false) + + println(parquetSchema) + // Outputs: + // required group my_record { + // required int32 a (INTEGER(32,true)); + // required binary b (STRING); + // optional int64 c (INTEGER(64,true)); + // } diff --git a/docs/scala-cli/SchemaSummoned.scala b/docs/scala-cli/SchemaSummoned.scala new file mode 100644 index 0000000..543e5f2 --- /dev/null +++ b/docs/scala-cli/SchemaSummoned.scala @@ -0,0 +1,31 @@ +//> using scala "3.5.0" +//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0 + +import me.mnedokushev.zio.apache.parquet.core.Schemas +import zio.schema.* +import me.mnedokushev.zio.apache.parquet.core.codec.* + +object SchemaSummoned extends App: + + case class MyRecord(a: Int, b: String, c: Option[Long]) + + object MyRecord: + given schema: Schema[MyRecord] = + DeriveSchema.gen[MyRecord] + // The custom encoder must be defined before the definition for your record type. 
+ given SchemaEncoder[Int] with { + override def encode(schema: Schema[Int], name: String, optional: Boolean) = + Schemas.uuid.optionality(optional).named(name) + } + given schemaEncoder: SchemaEncoder[MyRecord] = + Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.summoned) + + val parquetSchema = MyRecord.schemaEncoder.encode(MyRecord.schema, "my_record", optional = false) + + println(parquetSchema) + // Outputs: + // required group my_record { + // required fixed_len_byte_array(16) a (UUID); + // required binary b (STRING); + // optional int64 c (INTEGER(64,true)); + // } diff --git a/docs/scala-cli/Value.scala b/docs/scala-cli/Value.scala new file mode 100644 index 0000000..908bf15 --- /dev/null +++ b/docs/scala-cli/Value.scala @@ -0,0 +1,27 @@ +//> using scala "3.5.0" +//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0 + +import zio.schema.* +import me.mnedokushev.zio.apache.parquet.core.codec.* + +object Value extends App: + + case class MyRecord(a: Int, b: String, c: Option[Long]) + + object MyRecord: + given Schema[MyRecord] = + DeriveSchema.gen[MyRecord] + given encoder: ValueEncoder[MyRecord] = + Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default) + given decoder: ValueDecoder[MyRecord] = + Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default) + + val value = MyRecord.encoder.encode(MyRecord(3, "zio", None)) + val record = MyRecord.decoder.decode(value) + + println(value) + // Outputs: + // RecordValue(Map(a -> Int32Value(3), b -> BinaryValue(Binary{"zio"}), c -> NullValue)) + println(record) + // Outputs: + // MyRecord(3,zio,None) diff --git a/docs/scala-cli/ValueSummoned.scala b/docs/scala-cli/ValueSummoned.scala new file mode 100644 index 0000000..a3f8b5f --- /dev/null +++ b/docs/scala-cli/ValueSummoned.scala @@ -0,0 +1,43 @@ +//> using scala "3.5.0" +//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0 + +import me.mnedokushev.zio.apache.parquet.core.Value +import zio.schema.* +import me.mnedokushev.zio.apache.parquet.core.codec.* + +import java.nio.charset.StandardCharsets + +object ValueSummoned extends App: + + case class MyRecord(a: Int, b: String, c: Option[Long]) + + object MyRecord: + given Schema[MyRecord] = + DeriveSchema.gen[MyRecord] + given ValueEncoder[Int] with { + override def encode(value: Int): Value = + Value.string(value.toString) + } + given ValueDecoder[Int] with { + override def decode(value: Value): Int = + value match { + case Value.PrimitiveValue.BinaryValue(v) => + new String(v.getBytes, StandardCharsets.UTF_8).toInt + case other => + throw DecoderError(s"Wrong value: $other") + } + } + given encoder: ValueEncoder[MyRecord] = + Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.summoned) + given decoder: ValueDecoder[MyRecord] = + Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.summoned) + + val value = MyRecord.encoder.encode(MyRecord(3, "zio", None)) + val record = MyRecord.decoder.decode(value) + + println(value) + // Outputs: + // RecordValue(Map(a -> BinaryValue(Binary{"3"}), b -> BinaryValue(Binary{"zio"}), c -> NullValue)) + println(record) + // Outputs: + // MyRecord(3,zio,None) diff --git a/modules/core/src/main/scala-2.13/me/mnedokushev/zio/apache/parquet/core/filter/internal/ColumnPathConcatMacro.scala b/modules/core/src/main/scala-2.13/me/mnedokushev/zio/apache/parquet/core/filter/internal/ColumnPathConcatMacro.scala new file mode 100644 index 0000000..2409eb3 --- /dev/null +++ 
b/modules/core/src/main/scala-2.13/me/mnedokushev/zio/apache/parquet/core/filter/internal/ColumnPathConcatMacro.scala @@ -0,0 +1,36 @@ +package me.mnedokushev.zio.apache.parquet.core.filter.internal + +import me.mnedokushev.zio.apache.parquet.core.filter.Column + +import scala.reflect.macros.blackbox + +class ColumnPathConcatMacro(val c: blackbox.Context) extends MacroUtils(c) { + import c.universe._ + + def concatImpl[A, B, F](parent: Expr[Column[A]], child: Expr[Column.Named[B, F]])(implicit + ptt: c.WeakTypeTag[A], + ftt: c.WeakTypeTag[F] + ): Tree = { + val childField = getSingletonTypeName(ftt.tpe) + val parentFields = ptt.tpe.members.collect { + case p: TermSymbol if p.isCaseAccessor && !p.isMethod => p.name.toString.trim + }.toList + + if (parentFields.exists(_ == childField)) { + val pathTermName = "path" + val dotStringLiteral = "." + val concatExpr = + q"${parent.tree}.${TermName(pathTermName)} + ${Literal(Constant(dotStringLiteral))} + ${child.tree}.${TermName(pathTermName)}" + + q"_root_.me.mnedokushev.zio.apache.parquet.core.filter.Column.Named($concatExpr)" + } else + c.abort(c.enclosingPosition, s"Parent column doesn't contain a column named '$childField'") + } + + private def getSingletonTypeName(tpe: Type): String = + tpe match { + case ConstantType(Constant(name)) => name.toString + case _ => c.abort(c.enclosingPosition, s"Couldn't get a name of a singleton type ${showRaw(tpe)}") + } + +} diff --git a/modules/core/src/main/scala-2.13/me/mnedokushev/zio/apache/parquet/core/filter/internal/MacroUtils.scala b/modules/core/src/main/scala-2.13/me/mnedokushev/zio/apache/parquet/core/filter/internal/MacroUtils.scala new file mode 100644 index 0000000..144a61f --- /dev/null +++ b/modules/core/src/main/scala-2.13/me/mnedokushev/zio/apache/parquet/core/filter/internal/MacroUtils.scala @@ -0,0 +1,17 @@ +package me.mnedokushev.zio.apache.parquet.core.filter.internal + +import scala.reflect.macros.blackbox + +abstract class MacroUtils(c: blackbox.Context) { + + import c.universe._ + + private def debugEnabled: Boolean = true + + implicit class Debugged[A](self: A) { + def debugged(): Unit = + if (debugEnabled) + c.info(c.enclosingPosition, s"tree=${showRaw(self)}", force = true) + } + +} diff --git a/modules/core/src/main/scala-2.13/me/mnedokushev/zio/apache/parquet/core/filter/internal/SanitizeOptionalsMacro.scala b/modules/core/src/main/scala-2.13/me/mnedokushev/zio/apache/parquet/core/filter/internal/SanitizeOptionalsMacro.scala new file mode 100644 index 0000000..2189d84 --- /dev/null +++ b/modules/core/src/main/scala-2.13/me/mnedokushev/zio/apache/parquet/core/filter/internal/SanitizeOptionalsMacro.scala @@ -0,0 +1,64 @@ +package me.mnedokushev.zio.apache.parquet.core.filter.internal + +import me.mnedokushev.zio.apache.parquet.core.filter.Predicate + +import scala.reflect.macros.blackbox + +class SanitizeOptionalsMacro(val c: blackbox.Context) extends MacroUtils(c) { + import c.universe._ + + def sanitizeImpl[A](predicate: Expr[Predicate[A]])(ptt: c.WeakTypeTag[A]): Tree = { + + // Example of a tree for A type: + // RefinedType( + // List( + // RefinedType( + // List( + // TypeRef( + // ThisType(java.lang), + // java.lang.String, + // List() + // ), + // TypeRef( + // ThisType(scala), + // scala.Option, + // List( + // TypeRef( + // ThisType(scala), + // scala.Int, + // List() + // ) + // ) + // ) + // ), + // Scope() + // ), + // TypeRef(ThisType(scala), scala.Int, List()) + // ), + // Scope() + // ) + // TODO: rewrite using limited stack for safety + def 
containsOptionalValue(tpe: Type): Boolean = + tpe match { + case RefinedType(tpes, _) => + tpes.exists(containsOptionalValue) + case TypeRef(_, sym, _) => + List("scala.Option", "scala.Some", "scala.None").contains(sym.fullName) + case _ => + false + } + + if (containsOptionalValue(ptt.tpe)) + c.abort( + c.enclosingPosition, + s""" + | The use of optional columns in filter predicate is prohibited. Please, use .nullable: + | column.nullable > 3 + | Predicate: ${predicate.tree} + """.stripMargin + ) + else + q"_root_.me.mnedokushev.zio.apache.parquet.core.filter.Predicate.compile0($predicate)" + } + +} diff --git a/modules/core/src/main/scala-2.13/me/mnedokushev/zio/apache/parquet/core/filter/syntax.scala b/modules/core/src/main/scala-2.13/me/mnedokushev/zio/apache/parquet/core/filter/syntax.scala new file mode 100644 index 0000000..7ff27f9 --- /dev/null +++ b/modules/core/src/main/scala-2.13/me/mnedokushev/zio/apache/parquet/core/filter/syntax.scala @@ -0,0 +1,21 @@ +package me.mnedokushev.zio.apache.parquet.core.filter + +import me.mnedokushev.zio.apache.parquet.core.Lens +import me.mnedokushev.zio.apache.parquet.core.filter.CompiledPredicate +import me.mnedokushev.zio.apache.parquet.core.filter.internal.{ ColumnPathConcatMacro, SanitizeOptionalsMacro } + +package object syntax extends Predicate.Syntax { + + implicit class NullableColumnSyntax[F, S, A](val column: Lens[F, S, Option[A]]) { + def nullable(implicit typeTag: TypeTag[A]): Column.Named[A, column.Identity] = + Column.Named(column.path) + } + + def filter[A](predicate: Predicate[A]): CompiledPredicate = macro SanitizeOptionalsMacro.sanitizeImpl[A] + + def concat[A, B, F]( + parent: Column[A], + child: Column.Named[B, F] + ): Column[B] = macro ColumnPathConcatMacro.concatImpl[A, B, F] + +} diff --git a/modules/core/src/main/scala-3/me/mnedokushev/zio/apache/parquet/core/filter/internal/ColumnPathConcatMacro.scala b/modules/core/src/main/scala-3/me/mnedokushev/zio/apache/parquet/core/filter/internal/ColumnPathConcatMacro.scala new file mode 100644 index 0000000..880c1d7 --- /dev/null +++ b/modules/core/src/main/scala-3/me/mnedokushev/zio/apache/parquet/core/filter/internal/ColumnPathConcatMacro.scala @@ -0,0 +1,34 @@ +package me.mnedokushev.zio.apache.parquet.core.filter.internal + +import me.mnedokushev.zio.apache.parquet.core.filter.{ Column, TypeTag } + +import scala.quoted.* + +object ColumnPathConcatMacro { + + def concatImpl[A: Type, B: Type, F: Type]( + parent: Expr[Column[A]], + child: Expr[Column.Named[B, F]], + childTypeTag: Expr[TypeTag[B]] + )(using + Quotes + ): Expr[Column[B]] = { + import quotes.reflect.* + + val childField = TypeRepr.of[F] match { + case ConstantType(StringConstant(name)) => + name + case tpe => + report.errorAndAbort(s"Couldn't get a name of a singleton type $tpe") + } + val parentFields = TypeRepr.of[A].typeSymbol.caseFields.map(_.name) + + if (parentFields.contains(childField)) { + val concatExpr = '{ ${ parent }.path + "." 
+ ${ child }.path } + + '{ me.mnedokushev.zio.apache.parquet.core.filter.Column.Named[B, F]($concatExpr)(using $childTypeTag) } + } else + report.errorAndAbort(s"Parent column doesn't contain a column named '$childField'") + } + +} diff --git a/modules/core/src/main/scala-3/me/mnedokushev/zio/apache/parquet/core/filter/internal/SanitizeOptionalsMacro.scala b/modules/core/src/main/scala-3/me/mnedokushev/zio/apache/parquet/core/filter/internal/SanitizeOptionalsMacro.scala new file mode 100644 index 0000000..b07dd08 --- /dev/null +++ b/modules/core/src/main/scala-3/me/mnedokushev/zio/apache/parquet/core/filter/internal/SanitizeOptionalsMacro.scala @@ -0,0 +1,51 @@ +package me.mnedokushev.zio.apache.parquet.core.filter.internal + +import me.mnedokushev.zio.apache.parquet.core.filter.{ CompiledPredicate, Predicate } +import org.apache.parquet.filter2.predicate.FilterPredicate + +import scala.quoted.* + +object SanitizeOptionalsMacro { + + // TODO: tests + def sanitizeImpl[A: Type](predicate: Expr[Predicate[A]])(using Quotes): Expr[CompiledPredicate] = { + import quotes.reflect.* + + // Example of a type representation of A type: + // AndType( + // AndType( + // TypeRef(TermRef(ThisType(TypeRef(NoPrefix(), "scala")), "Predef"), "String"), + // AppliedType( + // TypeRef(TermRef(ThisType(TypeRef(NoPrefix(), "")), "scala"), "Option"), + // List( + // TypeRef(TermRef(ThisType(TypeRef(NoPrefix(), "")), "scala"), "Int") + // ) + // ) + // ), + // TypeRef(TermRef(ThisType(TypeRef(NoPrefix(), "")), "scala"), "Int") + // ) + // TODO: rewrite using limited stack for safety + def containsOptionalValue(tpe: TypeRepr): Boolean = + tpe match { + case AndType(a, b) => + containsOptionalValue(a) || containsOptionalValue(b) + case AppliedType(tpe, _) => + containsOptionalValue(tpe) + case TypeRef(_, name) => + List("Option", "Some", "None").contains(name) + } + + if (containsOptionalValue(TypeRepr.of[A])) + report.errorAndAbort( + s""" + | The use of optional columns in filter predicate is prohibited. 
Please, use .nullable: + | column.nullable > 3 + | Predicate tree: ${predicate.show} + """.stripMargin + ) + else + '{ _root_.me.mnedokushev.zio.apache.parquet.core.filter.Predicate.compile0($predicate) } + + } + +} diff --git a/modules/core/src/main/scala-3/me/mnedokushev/zio/apache/parquet/core/filter/syntax.scala b/modules/core/src/main/scala-3/me/mnedokushev/zio/apache/parquet/core/filter/syntax.scala new file mode 100644 index 0000000..a7e78f2 --- /dev/null +++ b/modules/core/src/main/scala-3/me/mnedokushev/zio/apache/parquet/core/filter/syntax.scala @@ -0,0 +1,21 @@ +package me.mnedokushev.zio.apache.parquet.core.filter + +import me.mnedokushev.zio.apache.parquet.core.Lens +import me.mnedokushev.zio.apache.parquet.core.filter.internal.{ ColumnPathConcatMacro, SanitizeOptionalsMacro } + +package object syntax extends Predicate.Syntax { + + extension [F, S, A](column: Lens[F, S, Option[A]]) { + def nullable(implicit typeTag: TypeTag[A]): Column.Named[A, column.Identity] = + Column.Named(column.path) + } + + inline def filter[A](inline predicate: Predicate[A]): CompiledPredicate = + ${ SanitizeOptionalsMacro.sanitizeImpl[A]('predicate) } + + inline def concat[A, B, F](inline parent: Column[A], inline child: Column.Named[B, F])(using + ctt: TypeTag[B] + ): Column[B] = + ${ ColumnPathConcatMacro.concatImpl[A, B, F]('parent, 'child, 'ctt) } + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Value.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Value.scala index 42973a8..b8155e6 100644 --- a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Value.scala +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Value.scala @@ -301,27 +301,32 @@ object Value { } def offsetTime(v: OffsetTime) = { - val timeMillis = v.toLocalTime.toNanoOfDay / MICROS_FACTOR - val offsetMillis = v.getOffset.getTotalSeconds * MILLIS_FACTOR - val dayMillis = timeMillis - offsetMillis + val timeMillis = v.toLocalTime.toNanoOfDay / MICROS_FACTOR + val offsetMillis = v.getOffset.getTotalSeconds * MILLIS_FACTOR + val timeOffsetMillis = timeMillis - offsetMillis + val dayMillis = if (timeOffsetMillis < 0) MILLIS_PER_DAY - timeOffsetMillis else timeOffsetMillis int(dayMillis.toInt) } def offsetDateTime(v: OffsetDateTime) = { - val dateMillis = v.toLocalDate.toEpochDay * MILLIS_PER_DAY - val timeMillis = v.toLocalTime.toNanoOfDay / MICROS_FACTOR - val offsetMillis = v.getOffset.getTotalSeconds * MILLIS_FACTOR - val epochMillis = dateMillis + timeMillis - offsetMillis + val dateMillis = v.toLocalDate.toEpochDay * MILLIS_PER_DAY + val timeMillis = v.toLocalTime.toNanoOfDay / MICROS_FACTOR + val offsetMillis = v.getOffset.getTotalSeconds * MILLIS_FACTOR + val timeOffsetMillis = timeMillis - offsetMillis + val dayMillis = if (timeOffsetMillis < 0) MILLIS_PER_DAY - timeOffsetMillis else timeOffsetMillis + val epochMillis = dateMillis + dayMillis long(epochMillis) } def zonedDateTime(v: ZonedDateTime) = { - val dateMillis = v.toLocalDate.toEpochDay * MILLIS_PER_DAY - val timeMillis = v.toLocalTime.toNanoOfDay / MICROS_FACTOR - val offsetMillis = v.getOffset.getTotalSeconds * MILLIS_FACTOR - val epochMillis = dateMillis + timeMillis - offsetMillis + val dateMillis = v.toLocalDate.toEpochDay * MILLIS_PER_DAY + val timeMillis = v.toLocalTime.toNanoOfDay / MICROS_FACTOR + val offsetMillis = v.getOffset.getTotalSeconds * MILLIS_FACTOR + val timeOffsetMillis = timeMillis - offsetMillis + val dayMillis = if (timeOffsetMillis < 0) 
MILLIS_PER_DAY - timeOffsetMillis else timeOffsetMillis + val epochMillis = dateMillis + dayMillis long(epochMillis) } diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/Column.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/Column.scala new file mode 100644 index 0000000..33e8704 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/Column.scala @@ -0,0 +1,50 @@ +package me.mnedokushev.zio.apache.parquet.core.filter + +trait Column[A] { self => + + type Identity + + val path: String + val typeTag: TypeTag[A] + + // TODO: overcome the limitation of scala macros for having a better API + // I found out the compiler throws an error that macro is not found as + // the macro itself depends on Column. The only option is to move the definition + // of "concat" outside the Column class. + // def /[B](child: Column[B]): Column[B] = + // ColumnPathConcatMacro.concatImpl[A, B] + + def >(value: A)(implicit ev: OperatorSupport.LtGt[A]): Predicate[A] = + Predicate.Binary(self, value, Operator.Binary.GreaterThen()) + + def <(value: A)(implicit ev: OperatorSupport.LtGt[A]): Predicate[A] = + Predicate.Binary(self, value, Operator.Binary.LessThen()) + + def >=(value: A)(implicit ev: OperatorSupport.LtGt[A]): Predicate[A] = + Predicate.Binary(self, value, Operator.Binary.GreaterEq()) + + def <=(value: A)(implicit ev: OperatorSupport.LtGt[A]): Predicate[A] = + Predicate.Binary(self, value, Operator.Binary.LessEq()) + + def ===(value: A)(implicit ev: OperatorSupport.EqNotEq[A]): Predicate[A] = + Predicate.Binary(self, value, Operator.Binary.Eq()) + + def =!=(value: A)(implicit ev: OperatorSupport.EqNotEq[A]): Predicate[A] = + Predicate.Binary(self, value, Operator.Binary.NotEq()) + + def in(values: Set[A])(implicit ev: OperatorSupport.EqNotEq[A]): Predicate[A] = + Predicate.BinarySet(self, values, Operator.Binary.Set.In()) + + def notIn(values: Set[A])(implicit ev: OperatorSupport.EqNotEq[A]): Predicate[A] = + Predicate.BinarySet(self, values, Operator.Binary.Set.NotIn()) + +} + +object Column { + + final case class Named[A: TypeTag, Identity0](path: String) extends Column[A] { + override type Identity = Identity0 + override val typeTag: TypeTag[A] = implicitly[TypeTag[A]] + } + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/ExprAccessorBuilder.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/ExprAccessorBuilder.scala new file mode 100644 index 0000000..999d6db --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/ExprAccessorBuilder.scala @@ -0,0 +1,26 @@ +package me.mnedokushev.zio.apache.parquet.core.filter + +import zio.schema.{ AccessorBuilder, Schema } + +final class ExprAccessorBuilder(typeTags: Map[String, TypeTag[?]]) extends AccessorBuilder { + + override type Lens[F, S, A] = Column.Named[A, F] + + override type Prism[F, S, A] = Unit + + override type Traversal[S, A] = Unit + + override def makeLens[F, S, A](product: Schema.Record[S], term: Schema.Field[S, A]): Column.Named[A, F] = { + val name = term.name.toString + implicit val typeTag = typeTags(name).asInstanceOf[TypeTag[A]] + + Column.Named[A, F](name) + } + + override def makePrism[F, S, A](sum: Schema.Enum[S], term: Schema.Case[S, A]): Prism[F, S, A] = + () + + override def makeTraversal[S, A](collection: Schema.Collection[S, A], element: Schema[A]): Traversal[S, A] = + () + +} diff --git 
a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/Filter.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/Filter.scala new file mode 100644 index 0000000..1baa650 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/Filter.scala @@ -0,0 +1,35 @@ +package me.mnedokushev.zio.apache.parquet.core.filter + +import me.mnedokushev.zio.apache.parquet.core.{ Lens, Prism, Traversal } +import zio.schema._ + +trait Filter { + + type Columns + + val columns: Columns + +} + +object Filter { + + type Aux[Columns0] = Filter { + type Columns = Columns0 + } + + def apply[A](implicit + schema: Schema[A], + typeTag: TypeTag[A] + ): Filter.Aux[schema.Accessors[Lens, Prism, Traversal]] = + new Filter { + val accessorBuilder = + new ExprAccessorBuilder(typeTag.asInstanceOf[TypeTag.Record[A]].columns) + + override type Columns = + schema.Accessors[accessorBuilder.Lens, accessorBuilder.Prism, accessorBuilder.Traversal] + + override val columns: Columns = + schema.makeAccessors(accessorBuilder) + } + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/FilterError.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/FilterError.scala new file mode 100644 index 0000000..41ad3ab --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/FilterError.scala @@ -0,0 +1,6 @@ +package me.mnedokushev.zio.apache.parquet.core.filter + +final case class FilterError( + message: String, + cause: Option[Throwable] = None +) extends IllegalArgumentException(message, cause.getOrElse(new Throwable())) diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/Operator.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/Operator.scala new file mode 100644 index 0000000..93a74d2 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/Operator.scala @@ -0,0 +1,59 @@ +package me.mnedokushev.zio.apache.parquet.core.filter + +sealed trait Operator + +object Operator { + + sealed trait Binary[A] extends Operator { + def operatorSupport: OperatorSupport[A] + } + + object Binary { + final case class Eq[A: OperatorSupport.EqNotEq]() extends Binary[A] { + override def operatorSupport: OperatorSupport[A] = implicitly[OperatorSupport.EqNotEq[A]] + } + final case class NotEq[A: OperatorSupport.EqNotEq]() extends Binary[A] { + override def operatorSupport: OperatorSupport[A] = implicitly[OperatorSupport.EqNotEq[A]] + } + final case class LessThen[A: OperatorSupport.LtGt]() extends Binary[A] { + override def operatorSupport: OperatorSupport[A] = implicitly[OperatorSupport.LtGt[A]] + } + final case class LessEq[A: OperatorSupport.LtGt]() extends Binary[A] { + override def operatorSupport: OperatorSupport[A] = implicitly[OperatorSupport.LtGt[A]] + } + final case class GreaterThen[A: OperatorSupport.LtGt]() extends Binary[A] { + override def operatorSupport: OperatorSupport[A] = implicitly[OperatorSupport.LtGt[A]] + } + final case class GreaterEq[A: OperatorSupport.LtGt]() extends Binary[A] { + override def operatorSupport: OperatorSupport[A] = implicitly[OperatorSupport.LtGt[A]] + } + + sealed trait Set[A] extends Binary[A] + + object Set { + + final case class In[A: OperatorSupport.EqNotEq]() extends Set[A] { + override def operatorSupport: OperatorSupport[A] = implicitly[OperatorSupport.EqNotEq[A]] + } + final case class NotIn[A: OperatorSupport.EqNotEq]() 
extends Set[A] { + override def operatorSupport: OperatorSupport[A] = implicitly[OperatorSupport.EqNotEq[A]] + } + + } + + } + + sealed trait Unary[A] extends Operator + + object Unary { + final case class Not[A]() extends Unary[A] + } + + sealed trait Logical[A, B] extends Operator + + object Logical { + final case class And[A, B]() extends Logical[A, B] + final case class Or[A, B]() extends Logical[A, B] + } + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/OperatorSupport.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/OperatorSupport.scala new file mode 100644 index 0000000..2291742 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/OperatorSupport.scala @@ -0,0 +1,116 @@ +package me.mnedokushev.zio.apache.parquet.core.filter + +import zio.{ Chunk, Duration } + +import java.time.{ + DayOfWeek, + Instant, + LocalDate, + LocalDateTime, + LocalTime, + Month, + MonthDay, + OffsetDateTime, + OffsetTime, + Period, + Year, + YearMonth, + ZoneId, + ZoneOffset, + ZonedDateTime +} +import java.util.UUID +import scala.annotation.implicitNotFound + +sealed trait OperatorSupport[A] + +object OperatorSupport { + + trait Optional[A, S[_] <: OperatorSupport[?]] { + val operatorSupport: S[A] + } + + @implicitNotFound("You can't use this operator for the type ${A}") + abstract class LtGt[A: TypeTag] extends OperatorSupport[A] { + val typeTag: TypeTag[A] = implicitly[TypeTag[A]] + } + + object LtGt { + + implicit def optional[A: TypeTag: LtGt]: LtGt[Option[A]] = + new LtGt[Option[A]] with Optional[A, LtGt] { + override val operatorSupport: LtGt[A] = implicitly[LtGt[A]] + } + + implicit case object byte extends LtGt[Byte] + implicit case object short extends LtGt[Short] + implicit case object int extends LtGt[Int] + implicit case object long extends LtGt[Long] + implicit case object float extends LtGt[Float] + implicit case object double extends LtGt[Double] + implicit case object bigDecimal extends LtGt[java.math.BigDecimal] + implicit case object bigInteger extends LtGt[java.math.BigInteger] + implicit case object dayOfWeek extends LtGt[DayOfWeek] + implicit case object month extends LtGt[Month] + implicit case object monthDay extends LtGt[MonthDay] + implicit case object period extends LtGt[Period] + implicit case object year extends LtGt[Year] + implicit case object yearMonth extends LtGt[YearMonth] + implicit case object duration extends LtGt[Duration] + implicit case object instant extends LtGt[Instant] + implicit case object localDate extends LtGt[LocalDate] + implicit case object localTime extends LtGt[LocalTime] + implicit case object localDateTime extends LtGt[LocalDateTime] + implicit case object offsetTime extends LtGt[OffsetTime] + implicit case object offsetDateTime extends LtGt[OffsetDateTime] + implicit case object zonedDateTime extends LtGt[ZonedDateTime] + + } + + @implicitNotFound("You can't use this operator for the type ${A}") + abstract class EqNotEq[A: TypeTag] extends OperatorSupport[A] { + val typeTag: TypeTag[A] = implicitly[TypeTag[A]] + } + + object EqNotEq { + + implicit def enum0[A: TypeTag]: EqNotEq[A] = new EqNotEq[A] {} + + implicit def optional[A: TypeTag: EqNotEq]: EqNotEq[Option[A]] = + new EqNotEq[Option[A]] with Optional[A, EqNotEq] { + override val operatorSupport: EqNotEq[A] = implicitly[EqNotEq[A]] + } + + implicit case object string extends EqNotEq[String] + implicit case object boolean extends EqNotEq[Boolean] + implicit case object byte extends 
EqNotEq[Byte] + implicit case object short extends EqNotEq[Short] + implicit case object int extends EqNotEq[Int] + implicit case object long extends EqNotEq[Long] + implicit case object float extends EqNotEq[Float] + implicit case object double extends EqNotEq[Double] + implicit case object binary extends EqNotEq[Chunk[Byte]] + implicit case object char extends EqNotEq[Char] + implicit case object uuid extends EqNotEq[UUID] + implicit case object bigDecimal extends EqNotEq[java.math.BigDecimal] + implicit case object bigInteger extends EqNotEq[java.math.BigInteger] + implicit case object dayOfWeek extends EqNotEq[DayOfWeek] + implicit case object month extends EqNotEq[Month] + implicit case object monthDay extends EqNotEq[MonthDay] + implicit case object period extends EqNotEq[Period] + implicit case object year extends EqNotEq[Year] + implicit case object yearMonth extends EqNotEq[YearMonth] + implicit case object zoneId extends EqNotEq[ZoneId] + implicit case object zoneOffset extends EqNotEq[ZoneOffset] + implicit case object duration extends EqNotEq[Duration] + implicit case object instant extends EqNotEq[Instant] + implicit case object localDate extends EqNotEq[LocalDate] + implicit case object localTime extends EqNotEq[LocalTime] + implicit case object localDateTime extends EqNotEq[LocalDateTime] + implicit case object offsetTime extends EqNotEq[OffsetTime] + implicit case object offsetDateTime extends EqNotEq[OffsetDateTime] + implicit case object zonedDateTime extends EqNotEq[ZonedDateTime] + + } + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/Predicate.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/Predicate.scala new file mode 100644 index 0000000..387dc13 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/Predicate.scala @@ -0,0 +1,124 @@ +package me.mnedokushev.zio.apache.parquet.core.filter + +import org.apache.parquet.filter2.predicate.{ FilterApi, FilterPredicate, Operators } +import zio.prelude._ + +sealed trait Predicate[A] { self => + + def and[B](other: Predicate[B]): Predicate[A & B] = + Predicate.Logical[A, B](self, other, Operator.Logical.And[A, B]()) + + def or[B](other: Predicate[B]): Predicate[A & B] = + Predicate.Logical[A, B](self, other, Operator.Logical.Or[A, B]()) + +} + +object Predicate { + + private[filter] trait Syntax { + def not[A](pred: Predicate[A]) = + Predicate.Unary(pred, Operator.Unary.Not[A]()) + } + + final case class Binary[A](column: Column[A], value: A, op: Operator.Binary[A]) extends Predicate[A] + + final case class BinarySet[A](column: Column[A], values: Set[A], op: Operator.Binary.Set[A]) extends Predicate[A] + + final case class Unary[A](predicate: Predicate[A], op: Operator.Unary[A]) extends Predicate[A] + + final case class Logical[A, B](left: Predicate[A], right: Predicate[B], op: Operator.Logical[A, B]) + extends Predicate[A & B] + + private[zio] def compile0[A](predicate: Predicate[A]): Either[String, FilterPredicate] = { + + def error(op: Operator) = + Left(s"Operator $op is not supported by $predicate") + + def binarySet[T <: Comparable[T], C <: Operators.Column[T] & Operators.SupportsEqNotEq]( + column: C, + values: java.util.Set[T], + op: Operator.Binary.Set[?] 
+ ) = + op match { + case Operator.Binary.Set.In() => + Right(FilterApi.in(column, values)) + case Operator.Binary.Set.NotIn() => + Right(FilterApi.notIn(column, values)) + } + + predicate match { + case Predicate.Unary(predicate0, op) => + op match { + case Operator.Unary.Not() => + compile0(predicate0).map(FilterApi.not) + } + case Predicate.Logical(left, right, op) => + (compile0(left) <*> compile0(right)).map { case (left0, right0) => + op match { + case Operator.Logical.And() => + FilterApi.and(left0, right0) + case Operator.Logical.Or() => + FilterApi.or(left0, right0) + } + } + case Predicate.Binary(column, value, op) => + column.typeTag match { + case typeTag: TypeTag.EqNotEq[_] => + val typeTag0 = typeTag.cast[A] + val column0 = typeTag0.column(column.path) + val value0 = typeTag0.value(value) + + op match { + case Operator.Binary.Eq() => + Right(FilterApi.eq(column0, value0)) + case Operator.Binary.NotEq() => + Right(FilterApi.notEq(column0, value0)) + case _ => + error(op) + } + case typeTag: TypeTag.LtGt[_] => + val typeTag0 = typeTag.cast[A] + val column0 = typeTag0.column(column.path) + val value0 = typeTag0.value(value) + + op match { + case Operator.Binary.Eq() => + Right(FilterApi.eq(column0, value0)) + case Operator.Binary.NotEq() => + Right(FilterApi.notEq(column0, value0)) + case Operator.Binary.LessThen() => + Right(FilterApi.lt(column0, value0)) + case Operator.Binary.LessEq() => + Right(FilterApi.ltEq(column0, value0)) + case Operator.Binary.GreaterThen() => + Right(FilterApi.gt(column0, value0)) + case Operator.Binary.GreaterEq() => + Right(FilterApi.gtEq(column0, value0)) + case _ => + error(op) + } + case _ => + error(op) + } + case Predicate.BinarySet(column, values, op) => + column.typeTag match { + case typeTag: TypeTag.EqNotEq[_] => + val typeTag0 = typeTag.cast[A] + val column0 = typeTag0.column(column.path) + val values0 = typeTag0.values(values) + + binarySet(column0, values0, op) + case typeTag: TypeTag.LtGt[_] => + val typeTag0 = typeTag.cast[A] + val column0 = typeTag0.column(column.path) + val values0 = typeTag0.values(values) + + binarySet(column0, values0, op) + case _ => + error(op) + } + } + + } + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/TypeTag.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/TypeTag.scala new file mode 100644 index 0000000..59ed94b --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/TypeTag.scala @@ -0,0 +1,271 @@ +package me.mnedokushev.zio.apache.parquet.core.filter + +import _root_.java.time.Instant +import me.mnedokushev.zio.apache.parquet.core.Value +import me.mnedokushev.zio.apache.parquet.core.filter.TypeTag.{ Dummy, EqNotEq, LtGt, Optional, Record } +import org.apache.parquet.filter2.predicate.{ FilterApi, Operators } +import org.apache.parquet.io.api.Binary +import zio.{ Chunk, Duration } + +import java.time.{ + DayOfWeek, + LocalDate, + LocalDateTime, + LocalTime, + Month, + MonthDay, + OffsetDateTime, + OffsetTime, + Period, + Year, + YearMonth, + ZoneId, + ZoneOffset, + ZonedDateTime +} +import java.util.UUID +import scala.jdk.CollectionConverters._ + +sealed trait TypeTag[+A] { self => + + override def toString: String = + self match { + case _: Dummy[_] => "Dummy[A]" + case _: Optional[_] => "Optional[A]" + case _: Record[_] => "Record[A]" + case _: EqNotEq[_] => "EqNotEq[A]" + case _: LtGt[_] => "LtGt[A]" + } + +} + +object TypeTag { + + trait Dummy[+A] extends TypeTag[A] + + def dummy[A]: 
TypeTag.Dummy[A] = + new Dummy[A] {} + + final case class Optional[+A: TypeTag]() extends TypeTag[Option[A]] { + val typeTag: TypeTag[A] = implicitly[TypeTag[A]] + } + + implicit def optional[A: TypeTag]: TypeTag[Option[A]] = + Optional[A]() + + final case class Record[+A](columns: Map[String, TypeTag[?]]) extends TypeTag[A] + + trait EqNotEq[A] extends TypeTag[A] { self => + type T <: Comparable[T] + type C <: Operators.Column[T] & Operators.SupportsEqNotEq + + def cast[A0]: EqNotEq[A0] = self.asInstanceOf[EqNotEq[A0]] + + def column(path: String): C + def value(v: A): T + def values(vs: Set[A]): java.util.Set[T] = + vs.map(value).asJava + } + + trait LtGt[A] extends TypeTag[A] { self => + type T <: Comparable[T] + type C <: Operators.Column[T] & Operators.SupportsLtGt + + def cast[A0]: LtGt[A0] = self.asInstanceOf[LtGt[A0]] + + def column(path: String): C + def value(v: A): T + def values(vs: Set[A]): java.util.Set[T] = + vs.map(value).asJava + } + + def eqnoteq[A, T0 <: Comparable[T0], C0 <: Operators.Column[T0] & Operators.SupportsEqNotEq]( + column0: String => C0, + value0: A => T0 + ): TypeTag.EqNotEq[A] = + new TypeTag.EqNotEq[A] { + + override type T = T0 + + override type C = C0 + + override def column(path: String): C = + column0(path) + + override def value(v: A): T = + value0(v) + + } + + def ltgt[A, T0 <: Comparable[T0], C0 <: Operators.Column[T0] & Operators.SupportsLtGt]( + column0: String => C0, + value0: A => T0 + ): TypeTag.LtGt[A] = + new TypeTag.LtGt[A] { + + override type T = T0 + + override type C = C0 + + override def column(path: String): C = + column0(path) + + override def value(v: A): T = + value0(v) + + } + + def enum0[A](casesMap: Map[A, String]): TypeTag.EqNotEq[A] = + eqnoteq[A, Binary, Operators.BinaryColumn]( + FilterApi.binaryColumn, + v => Value.string(casesMap.getOrElse(v, throw FilterError(s"Failed to encode enum for value $v"))).value + ) + + implicit val string: TypeTag.EqNotEq[String] = + eqnoteq[String, Binary, Operators.BinaryColumn]( + FilterApi.binaryColumn, + Value.string(_).value + ) + implicit val boolean: TypeTag.EqNotEq[Boolean] = + eqnoteq[Boolean, java.lang.Boolean, Operators.BooleanColumn]( + FilterApi.booleanColumn, + Value.boolean(_).value + ) + implicit val byte: TypeTag.LtGt[Byte] = + ltgt[Byte, java.lang.Integer, Operators.IntColumn]( + FilterApi.intColumn, + Value.byte(_).value + ) + implicit val short: TypeTag.LtGt[Short] = + ltgt[Short, java.lang.Integer, Operators.IntColumn]( + FilterApi.intColumn, + Value.short(_).value + ) + implicit val int: TypeTag.LtGt[Int] = + ltgt[Int, java.lang.Integer, Operators.IntColumn]( + FilterApi.intColumn, + Value.int(_).value + ) + implicit val long: TypeTag.LtGt[Long] = + ltgt[Long, java.lang.Long, Operators.LongColumn]( + FilterApi.longColumn, + Value.long(_).value + ) + implicit val float: TypeTag.LtGt[Float] = + ltgt[Float, java.lang.Float, Operators.FloatColumn]( + FilterApi.floatColumn, + Value.float(_).value + ) + implicit val double: TypeTag.LtGt[Double] = + ltgt[Double, java.lang.Double, Operators.DoubleColumn]( + FilterApi.doubleColumn, + Value.double(_).value + ) + implicit val binary: TypeTag.EqNotEq[Chunk[Byte]] = + eqnoteq[Chunk[Byte], Binary, Operators.BinaryColumn]( + FilterApi.binaryColumn, + Value.binary(_).value + ) + implicit val char: TypeTag.EqNotEq[Char] = + eqnoteq[Char, java.lang.Integer, Operators.IntColumn]( + FilterApi.intColumn, + Value.char(_).value + ) + implicit val uuid: TypeTag.EqNotEq[UUID] = + eqnoteq[UUID, Binary, Operators.BinaryColumn]( + 
FilterApi.binaryColumn, + Value.uuid(_).value + ) + implicit val bigDecimal: TypeTag.LtGt[java.math.BigDecimal] = + ltgt[java.math.BigDecimal, java.lang.Long, Operators.LongColumn]( + FilterApi.longColumn, + Value.bigDecimal(_).value + ) + implicit val bigInteger: TypeTag.LtGt[java.math.BigInteger] = + ltgt[java.math.BigInteger, Binary, Operators.BinaryColumn]( + FilterApi.binaryColumn, + Value.bigInteger(_).value + ) + implicit val dayOfWeek: TypeTag.LtGt[DayOfWeek] = + ltgt[DayOfWeek, java.lang.Integer, Operators.IntColumn]( + FilterApi.intColumn, + Value.dayOfWeek(_).value + ) + implicit val month: TypeTag.LtGt[Month] = + ltgt[Month, java.lang.Integer, Operators.IntColumn]( + FilterApi.intColumn, + Value.month(_).value + ) + implicit val monthDay: TypeTag.LtGt[MonthDay] = + ltgt[MonthDay, Binary, Operators.BinaryColumn]( + FilterApi.binaryColumn, + Value.monthDay(_).value + ) + implicit val period: TypeTag.LtGt[Period] = + ltgt[Period, Binary, Operators.BinaryColumn]( + FilterApi.binaryColumn, + Value.period(_).value + ) + implicit val year: TypeTag.LtGt[Year] = + ltgt[Year, java.lang.Integer, Operators.IntColumn]( + FilterApi.intColumn, + Value.year(_).value + ) + implicit val yearMonth: TypeTag.LtGt[YearMonth] = + ltgt[YearMonth, Binary, Operators.BinaryColumn]( + FilterApi.binaryColumn, + Value.yearMonth(_).value + ) + // NOTE: it is not implicit to make scalac happy since ZoneOffset is a subtype of ZoneId + val zoneId: TypeTag.EqNotEq[ZoneId] = + eqnoteq[ZoneId, Binary, Operators.BinaryColumn]( + FilterApi.binaryColumn, + Value.zoneId(_).value + ) + implicit val zoneOffset: TypeTag.EqNotEq[ZoneOffset] = + eqnoteq[ZoneOffset, Binary, Operators.BinaryColumn]( + FilterApi.binaryColumn, + Value.zoneOffset(_).value + ) + implicit val duration: TypeTag.LtGt[Duration] = + ltgt[Duration, java.lang.Long, Operators.LongColumn]( + FilterApi.longColumn, + Value.duration(_).value + ) + implicit val instant: TypeTag.LtGt[Instant] = + ltgt[Instant, java.lang.Long, Operators.LongColumn]( + FilterApi.longColumn, + Value.instant(_).value + ) + implicit val localDate: TypeTag.LtGt[LocalDate] = + ltgt[LocalDate, java.lang.Integer, Operators.IntColumn]( + FilterApi.intColumn, + Value.localDate(_).value + ) + implicit val localTime: TypeTag.LtGt[LocalTime] = + ltgt[LocalTime, java.lang.Integer, Operators.IntColumn]( + FilterApi.intColumn, + Value.localTime(_).value + ) + implicit val localDateTime: TypeTag.LtGt[LocalDateTime] = + ltgt[LocalDateTime, java.lang.Long, Operators.LongColumn]( + FilterApi.longColumn, + Value.localDateTime(_).value + ) + implicit val offsetTime: TypeTag.LtGt[OffsetTime] = + ltgt[OffsetTime, java.lang.Integer, Operators.IntColumn]( + FilterApi.intColumn, + Value.offsetTime(_).value + ) + implicit val offsetDateTime: TypeTag.LtGt[OffsetDateTime] = + ltgt[OffsetDateTime, java.lang.Long, Operators.LongColumn]( + FilterApi.longColumn, + Value.offsetDateTime(_).value + ) + implicit val zonedDateTime: TypeTag.LtGt[ZonedDateTime] = + ltgt[ZonedDateTime, java.lang.Long, Operators.LongColumn]( + FilterApi.longColumn, + Value.zonedDateTime(_).value + ) + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/TypeTagDeriver.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/TypeTagDeriver.scala new file mode 100644 index 0000000..1175a51 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/TypeTagDeriver.scala @@ -0,0 +1,105 @@ +package me.mnedokushev.zio.apache.parquet.core.filter + 
+import zio.Chunk +import zio.schema.{ Deriver, Schema, StandardType } + +object TypeTagDeriver { + + val default: Deriver[TypeTag] = new Deriver[TypeTag] { + + override def deriveRecord[A]( + record: Schema.Record[A], + fields: => Chunk[Deriver.WrappedF[TypeTag, ?]], + summoned: => Option[TypeTag[A]] + ): TypeTag[A] = + TypeTag.Record( + record.fields + .map(_.name.toString) + .zip(fields.map(_.unwrap)) + .toMap + ) + + override def deriveEnum[A]( + `enum`: Schema.Enum[A], + cases: => Chunk[Deriver.WrappedF[TypeTag, ?]], + summoned: => Option[TypeTag[A]] + ): TypeTag[A] = { + val casesMap = `enum`.cases.map { case0 => + case0.schema.asInstanceOf[Schema.CaseClass0[A]].defaultConstruct() -> case0.id + }.toMap + + TypeTag.enum0(casesMap) + } + + override def derivePrimitive[A]( + st: StandardType[A], + summoned: => Option[TypeTag[A]] + ): TypeTag[A] = + st match { + case StandardType.StringType => TypeTag.string + case StandardType.BoolType => TypeTag.boolean + case StandardType.ByteType => TypeTag.byte + case StandardType.ShortType => TypeTag.short + case StandardType.IntType => TypeTag.int + case StandardType.LongType => TypeTag.long + case StandardType.FloatType => TypeTag.float + case StandardType.DoubleType => TypeTag.double + case StandardType.BinaryType => TypeTag.binary + case StandardType.CharType => TypeTag.char + case StandardType.UUIDType => TypeTag.uuid + case StandardType.BigDecimalType => TypeTag.bigDecimal + case StandardType.BigIntegerType => TypeTag.bigInteger + case StandardType.DayOfWeekType => TypeTag.dayOfWeek + case StandardType.MonthType => TypeTag.month + case StandardType.MonthDayType => TypeTag.monthDay + case StandardType.PeriodType => TypeTag.period + case StandardType.YearType => TypeTag.year + case StandardType.YearMonthType => TypeTag.yearMonth + case StandardType.ZoneIdType => TypeTag.zoneId + case StandardType.ZoneOffsetType => TypeTag.zoneOffset + case StandardType.DurationType => TypeTag.duration + case StandardType.InstantType => TypeTag.instant + case StandardType.LocalDateType => TypeTag.localDate + case StandardType.LocalTimeType => TypeTag.localTime + case StandardType.LocalDateTimeType => TypeTag.localDateTime + case StandardType.OffsetTimeType => TypeTag.offsetTime + case StandardType.OffsetDateTimeType => TypeTag.offsetDateTime + case StandardType.ZonedDateTimeType => TypeTag.zonedDateTime + case _ => TypeTag.dummy[A] + } + + override def deriveOption[A]( + option: Schema.Optional[A], + inner: => TypeTag[A], + summoned: => Option[TypeTag[Option[A]]] + ): TypeTag[Option[A]] = + TypeTag.optional[A](using inner) + + override def deriveSequence[C[_], A]( + sequence: Schema.Sequence[C[A], A, ?], + inner: => TypeTag[A], + summoned: => Option[TypeTag[C[A]]] + ): TypeTag[C[A]] = + TypeTag.dummy[C[A]] + + override def deriveMap[K, V]( + map: Schema.Map[K, V], + key: => TypeTag[K], + value: => TypeTag[V], + summoned: => Option[TypeTag[Map[K, V]]] + ): TypeTag[Map[K, V]] = + TypeTag.dummy[Map[K, V]] + + override def deriveTransformedRecord[A, B]( + record: Schema.Record[A], + transform: Schema.Transform[A, B, ?], + fields: => Chunk[Deriver.WrappedF[TypeTag, ?]], + summoned: => Option[TypeTag[B]] + ): TypeTag[B] = + TypeTag.dummy[B] + + }.cached + + val summoned: Deriver[TypeTag] = default.autoAcceptSummoned + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/package.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/package.scala new file mode 100644 index 0000000..ed4e4b0 --- /dev/null 
+++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/filter/package.scala @@ -0,0 +1,9 @@ +package me.mnedokushev.zio.apache.parquet.core + +import org.apache.parquet.filter2.predicate.FilterPredicate + +package object filter { + + type CompiledPredicate = Either[String, FilterPredicate] + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala index 1ff6c0b..8ee09d1 100644 --- a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala @@ -2,7 +2,9 @@ package me.mnedokushev.zio.apache.parquet.core.hadoop import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue import me.mnedokushev.zio.apache.parquet.core.codec.{ SchemaEncoder, ValueDecoder } +import me.mnedokushev.zio.apache.parquet.core.filter.CompiledPredicate import org.apache.hadoop.conf.Configuration +import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.hadoop.api.{ ReadSupport => HadoopReadSupport } import org.apache.parquet.hadoop.{ ParquetReader => HadoopParquetReader } import org.apache.parquet.io.InputFile @@ -16,7 +18,11 @@ trait ParquetReader[+A <: Product] { def readStream(path: Path): ZStream[Scope, Throwable, A] - def readChunk(path: Path): Task[Chunk[A]] + def readStreamFiltered(path: Path, filter: CompiledPredicate): ZStream[Scope, Throwable, A] + + def readChunk[B](path: Path): Task[Chunk[A]] + + def readChunkFiltered[B](path: Path, filter: CompiledPredicate): Task[Chunk[A]] } @@ -29,28 +35,53 @@ final class ParquetReaderLive[A <: Product: Tag]( override def readStream(path: Path): ZStream[Scope, Throwable, A] = for { - reader <- ZStream.fromZIO(build(path)) - value <- ZStream.repeatZIOOption( - ZIO - .attemptBlockingIO(reader.read()) - .asSomeError - .filterOrFail(_ != null)(None) - .flatMap(decoder.decodeZIO(_).asSomeError) - ) + reader <- ZStream.fromZIO(build(path, None)) + value <- readStream0(reader) + } yield value + + override def readStreamFiltered(path: Path, filter: CompiledPredicate): ZStream[Scope, Throwable, A] = + for { + reader <- ZStream.fromZIO(build(path, Some(filter))) + value <- readStream0(reader) } yield value - override def readChunk(path: Path): Task[Chunk[A]] = + override def readChunk[B](path: Path): Task[Chunk[A]] = + ZIO.scoped( + for { + reader <- build(path, None) + result <- readChunk0(reader) + } yield result + ) + + override def readChunkFiltered[B](path: Path, filter: CompiledPredicate): Task[Chunk[A]] = + ZIO.scoped( + for { + reader <- build(path, Some(filter)) + result <- readChunk0(reader) + } yield result + ) + + private def readStream0(reader: HadoopParquetReader[RecordValue]): ZStream[Any, Throwable, A] = + ZStream.repeatZIOOption( + ZIO + .attemptBlockingIO(reader.read()) + .asSomeError + .filterOrFail(_ != null)(None) + .flatMap(decoder.decodeZIO(_).asSomeError) + ) + + private def readChunk0[B](reader: HadoopParquetReader[RecordValue]): Task[Chunk[A]] = { + val readNext = for { + value <- ZIO.attemptBlockingIO(reader.read()) + record <- if (value != null) + decoder.decodeZIO(value) + else + ZIO.succeed(null.asInstanceOf[A]) + } yield record + val builder = Chunk.newBuilder[A] + ZIO.scoped( for { - reader <- build(path) - readNext = for { - value <- ZIO.attemptBlockingIO(reader.read()) - record <- if (value != null) - 
decoder.decodeZIO(value) - else - ZIO.succeed(null.asInstanceOf[A]) - } yield record - builder = Chunk.newBuilder[A] initial <- readNext _ <- { var current = initial @@ -62,15 +93,27 @@ final class ParquetReaderLive[A <: Product: Tag]( } } yield builder.result() ) + } - private def build(path: Path): ZIO[Scope, IOException, HadoopParquetReader[RecordValue]] = + private def build[B]( + path: Path, + filter: Option[CompiledPredicate] + ): ZIO[Scope, IOException, HadoopParquetReader[RecordValue]] = for { - inputFile <- path.toInputFileZIO(hadoopConf) - reader <- ZIO.fromAutoCloseable( - ZIO.attemptBlockingIO( - new ParquetReader.Builder(inputFile, schema, schemaEncoder).withConf(hadoopConf).build() - ) - ) + inputFile <- path.toInputFileZIO(hadoopConf) + compiledFilter <- ZIO.foreach(filter) { pred => + ZIO + .fromEither(pred) + .mapError(new IOException(_)) + } + reader <- ZIO.fromAutoCloseable( + ZIO.attemptBlockingIO { + val builder = new ParquetReader.Builder(inputFile, schema, schemaEncoder) + + compiledFilter.foreach(pred => builder.withFilter(FilterCompat.get(pred))) + builder.withConf(hadoopConf).build() + } + ) } yield reader } @@ -88,14 +131,17 @@ object ParquetReader { } - def configured[A <: Product: ValueDecoder]( + def configured[A <: Product: ValueDecoder: Tag]( hadoopConf: Configuration = new Configuration() - )(implicit tag: Tag[A]): ULayer[ParquetReader[A]] = + ): ULayer[ParquetReader[A]] = ZLayer.succeed(new ParquetReaderLive[A](hadoopConf)) - def projected[A <: Product: ValueDecoder]( + def projected[A <: Product: ValueDecoder: Tag]( hadoopConf: Configuration = new Configuration() - )(implicit schema: Schema[A], schemaEncoder: SchemaEncoder[A], tag: Tag[A]): ULayer[ParquetReader[A]] = + )(implicit + schema: Schema[A], + schemaEncoder: SchemaEncoder[A] + ): ULayer[ParquetReader[A]] = ZLayer.succeed(new ParquetReaderLive[A](hadoopConf, Some(schema), Some(schemaEncoder))) } diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/package.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/package.scala index bff5e61..c0211bd 100644 --- a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/package.scala +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/package.scala @@ -10,4 +10,8 @@ package object core { val DECIMAL_PRECISION = 11 val DECIMAL_SCALE = 2 + type Lens[F, S, A] = filter.Column.Named[A, F] + type Prism[F, S, A] = Unit + type Traversal[S, A] = Unit + } diff --git a/modules/core/src/test/scala-2.13+/me/mnedokushev/zio/apache/parquet/core/Fixtures.scala b/modules/core/src/test/scala-2.13+/me/mnedokushev/zio/apache/parquet/core/Fixtures.scala new file mode 100644 index 0000000..c6c5b43 --- /dev/null +++ b/modules/core/src/test/scala-2.13+/me/mnedokushev/zio/apache/parquet/core/Fixtures.scala @@ -0,0 +1,234 @@ +package me.mnedokushev.zio.apache.parquet.core + +import me.mnedokushev.zio.apache.parquet.core.codec.{ + SchemaEncoder, + SchemaEncoderDeriver, + ValueDecoder, + ValueDecoderDeriver, + ValueEncoder, + ValueEncoderDeriver +} +import me.mnedokushev.zio.apache.parquet.core.filter.{ TypeTag, TypeTagDeriver } +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.filter2.predicate.Operators.BinaryColumn +import org.apache.parquet.io.api.Binary +import zio.Chunk +import zio.schema._ + +import java.time._ +import java.util.UUID + +object Fixtures { + + case class MyRecord(a: String, b: Int, child: MyRecord.Child, enm: MyRecord.Enum, opt: Option[Int]) + + 
object MyRecord { + implicit val schema: Schema.CaseClass5.WithFields[ + "a", + "b", + "child", + "enm", + "opt", + String, + Int, + MyRecord.Child, + MyRecord.Enum, + Option[Int], + MyRecord + ] = + DeriveSchema.gen[MyRecord] + implicit val typeTag: TypeTag[MyRecord] = + Derive.derive[TypeTag, MyRecord](TypeTagDeriver.default) + + case class Child(c: Int, d: Option[Long]) + object Child { + implicit val schema: Schema.CaseClass2.WithFields["c", "d", Int, Option[Long], MyRecord.Child] = + DeriveSchema.gen[Child] + implicit val typeTag: TypeTag[Child] = + Derive.derive[TypeTag, Child](TypeTagDeriver.default) + } + + sealed trait Enum + object Enum { + case object Started extends Enum + case object InProgress extends Enum + case object Done extends Enum + + implicit val schema: Schema[Enum] = + DeriveSchema.gen[Enum] + implicit val typeTag: TypeTag[Enum] = + Derive.derive[TypeTag, Enum](TypeTagDeriver.default) + } + } + + case class MyRecordSummoned(a: Int, b: String) + + object MyRecordSummoned { + implicit val schema: zio.schema.Schema.CaseClass2.WithFields["a", "b", Int, String, MyRecordSummoned] = + DeriveSchema.gen[MyRecordSummoned] + + implicit val intTypeTag: TypeTag.EqNotEq[Int] = + TypeTag.eqnoteq[Int, Binary, BinaryColumn]( + FilterApi.binaryColumn, + v => Value.string(v.toString).value + ) + implicit val typeTag: TypeTag[MyRecordSummoned] = Derive.derive[TypeTag, MyRecordSummoned](TypeTagDeriver.summoned) + } + + case class MyRecordIO(a: Int, b: String, c: Option[Long], d: List[Int], e: Map[String, Int]) + object MyRecordIO { + implicit val schema: zio.schema.Schema.CaseClass5.WithFields[ + "a", + "b", + "c", + "d", + "e", + Int, + String, + Option[Long], + List[Int], + Map[String, Int], + MyRecordIO + ] = + DeriveSchema.gen[MyRecordIO] + implicit val schemaEncoder: SchemaEncoder[MyRecordIO] = + Derive.derive[SchemaEncoder, MyRecordIO](SchemaEncoderDeriver.summoned) + implicit val valueEncoder: ValueEncoder[MyRecordIO] = + Derive.derive[ValueEncoder, MyRecordIO](ValueEncoderDeriver.summoned) + implicit val valueDecoder: ValueDecoder[MyRecordIO] = + Derive.derive[ValueDecoder, MyRecordIO](ValueDecoderDeriver.summoned) + implicit val typeTag: TypeTag[MyRecordIO] = + Derive.derive[TypeTag, MyRecordIO](TypeTagDeriver.default) + } + + case class MyProjectedRecordIO(a: Int, c: Option[Long], d: List[Int], e: Map[String, Int]) + object MyProjectedRecordIO { + implicit val schema: zio.schema.Schema.CaseClass4.WithFields[ + "a", + "c", + "d", + "e", + Int, + Option[Long], + List[Int], + Map[String, Int], + MyProjectedRecordIO + ] = + DeriveSchema.gen[MyProjectedRecordIO] + implicit val schemaEncoder: SchemaEncoder[MyProjectedRecordIO] = + Derive.derive[SchemaEncoder, MyProjectedRecordIO](SchemaEncoderDeriver.summoned) + implicit val valueEncoder: ValueEncoder[MyProjectedRecordIO] = + Derive.derive[ValueEncoder, MyProjectedRecordIO](ValueEncoderDeriver.summoned) + implicit val valueDecoder: ValueDecoder[MyProjectedRecordIO] = + Derive.derive[ValueDecoder, MyProjectedRecordIO](ValueDecoderDeriver.summoned) + } + + case class MyRecordAllTypes1( + string: String, + boolean: Boolean, + byte: Byte, + short: Short, + int: Int, + long: Long, + float: Float, + double: Double, + binary: Chunk[Byte], + char: Char, + uuid: UUID, + bigDecimal: java.math.BigDecimal, + bigInteger: java.math.BigInteger, + dayOfWeek: DayOfWeek, + month: Month, + monthDay: MonthDay, + period: Period, + year: Year, + yearMonth: YearMonth, + zoneId: ZoneId, + zoneOffset: ZoneOffset + ) + object MyRecordAllTypes1 { + implicit 
val schema: zio.schema.Schema.CaseClass21.WithFields[ + "string", + "boolean", + "byte", + "short", + "int", + "long", + "float", + "double", + "binary", + "char", + "uuid", + "bigDecimal", + "bigInteger", + "dayOfWeek", + "month", + "monthDay", + "period", + "year", + "yearMonth", + "zoneId", + "zoneOffset", + String, + Boolean, + Byte, + Short, + Int, + Long, + Float, + Double, + zio.Chunk[Byte], + Char, + java.util.UUID, + java.math.BigDecimal, + java.math.BigInteger, + java.time.DayOfWeek, + java.time.Month, + java.time.MonthDay, + java.time.Period, + java.time.Year, + java.time.YearMonth, + java.time.ZoneId, + java.time.ZoneOffset, + MyRecordAllTypes1 + ] = + DeriveSchema.gen[MyRecordAllTypes1] + implicit val typeTag: TypeTag[MyRecordAllTypes1] = + Derive.derive[TypeTag, MyRecordAllTypes1](TypeTagDeriver.default) + } + case class MyRecordAllTypes2( + duration: Duration, + instant: Instant, + localDate: LocalDate, + localTime: LocalTime, + localDateTime: LocalDateTime, + offsetTime: OffsetTime, + offsetDateTime: OffsetDateTime, + zonedDateTime: ZonedDateTime + ) + object MyRecordAllTypes2 { + implicit val schema: zio.schema.Schema.CaseClass8.WithFields[ + "duration", + "instant", + "localDate", + "localTime", + "localDateTime", + "offsetTime", + "offsetDateTime", + "zonedDateTime", + java.time.Duration, + java.time.Instant, + java.time.LocalDate, + java.time.LocalTime, + java.time.LocalDateTime, + java.time.OffsetTime, + java.time.OffsetDateTime, + java.time.ZonedDateTime, + MyRecordAllTypes2 + ] = + DeriveSchema.gen[MyRecordAllTypes2] + implicit val typeTag: TypeTag[MyRecordAllTypes2] = + Derive.derive[TypeTag, MyRecordAllTypes2](TypeTagDeriver.default) + } + +} diff --git a/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueCodecDeriverSpec.scala b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueCodecDeriverSpec.scala index 82a2938..b9a4d85 100644 --- a/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueCodecDeriverSpec.scala +++ b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueCodecDeriverSpec.scala @@ -146,12 +146,14 @@ object ValueCodecDeriverSpec extends ZIOSpecDefault { val offsetDateTimePayload = OffsetDateTime.of(localDateTimePayload, zoneOffsetPayload) val zonedDateTimePayload = ZonedDateTime.of(localDateTimePayload, zoneIdPayload) + // TODO: fails when the current time is after midnight val expectedOffsetTimeUTC = { - val timeNanos = offsetTimePayload.toLocalTime.toNanoOfDay - val offsetNanos = offsetTimePayload.getOffset.getTotalSeconds * NANOS_FACTOR - val dayNanos = timeNanos - offsetNanos + val timeNanos = offsetTimePayload.toLocalTime.toNanoOfDay + val offsetNanos = offsetTimePayload.getOffset.getTotalSeconds * NANOS_FACTOR + val timeOffsetNanos = timeNanos - offsetNanos + val nanoOfDay = if (timeOffsetNanos < 0) NANOS_PER_DAY - timeOffsetNanos else timeOffsetNanos - OffsetTime.of(LocalTime.ofNanoOfDay(dayNanos), ZoneOffset.UTC) + OffsetTime.of(LocalTime.ofNanoOfDay(nanoOfDay), ZoneOffset.UTC) } val expectedOffsetDateTimeUTC = { diff --git a/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/filter/ExprSpec.scala b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/filter/ExprSpec.scala new file mode 100644 index 0000000..72bdcb8 --- /dev/null +++ b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/filter/ExprSpec.scala @@ -0,0 +1,352 @@ +package me.mnedokushev.zio.apache.parquet.core.filter + +import 
me.mnedokushev.zio.apache.parquet.core.Fixtures._ +import me.mnedokushev.zio.apache.parquet.core.Value +import me.mnedokushev.zio.apache.parquet.core.filter.TypeTag._ +import me.mnedokushev.zio.apache.parquet.core.filter.syntax._ +import org.apache.parquet.filter2.predicate.FilterApi +import zio._ +import zio.test.Assertion.{ equalTo, isRight } +import zio.test._ + +import java.time._ +import java.util.UUID +import scala.jdk.CollectionConverters._ + +object ExprSpec extends ZIOSpecDefault { + + override def spec: Spec[TestEnvironment & Scope, Any] = + suite("ExprSpec")( + test("compile all operators") { + val (a, b, _, _, _) = Filter[MyRecord].columns + + val result = filter( + not( + (b >= 3 `or` b <= 100 `and` a.in(Set("foo", "bar"))) `or` + (a === "foo" `and` (b === 20 `or` b.notIn(Set(1, 2, 3)))) `or` + (a =!= "foo" `and` b > 2 `and` b < 10) + ) + ) + + val acol = FilterApi.binaryColumn("a") + val bcol = FilterApi.intColumn("b") + val expected = + FilterApi.not( + FilterApi.or( + FilterApi.or( + FilterApi.and( + FilterApi.or( + FilterApi.gtEq(bcol, Int.box(Value.int(3).value)), + FilterApi.ltEq(bcol, Int.box(Value.int(100).value)) + ), + FilterApi.in(acol, Set(Value.string("foo").value, Value.string("bar").value).asJava) + ), + FilterApi.and( + FilterApi.eq(acol, Value.string("foo").value), + FilterApi.or( + FilterApi.eq(bcol, Int.box(Value.int(20).value)), + FilterApi.notIn(bcol, Set(1, 2, 3).map(i => Int.box(Value.int(i).value)).asJava) + ) + ) + ), + FilterApi.and( + FilterApi.and( + FilterApi.notEq(acol, Value.string("foo").value), + FilterApi.gt(bcol, Int.box(Value.int(2).value)) + ), + FilterApi.lt(bcol, Int.box(Value.int(10).value)) + ) + ) + ) + + assert(result)(isRight(equalTo(expected))) + }, + test("compile summoned") { + val (a, b) = Filter[MyRecordSummoned].columns + + val result = filter( + a === 3 `and` b === "foo" + ) + + val acol = FilterApi.binaryColumn("a") + val bcol = FilterApi.binaryColumn("b") + val expected = FilterApi.and( + FilterApi.eq(acol, Value.string("3").value), + FilterApi.eq(bcol, Value.string("foo").value) + ) + + assert(result)(isRight(equalTo(expected))) + }, + test("compile all primitive types") { + val ( + string, + boolean, + byte, + short, + int, + long, + float, + double, + binary, + char, + uuid, + bigDecimal, + bigInteger, + dayOfWeek, + month, + monthDay, + period, + year, + yearMonth, + zoneId, + zoneOffset + ) = Filter[MyRecordAllTypes1].columns + + val ( + duration, + instant, + localDate, + localTime, + localDateTime, + offsetTime, + offsetDateTime, + zonedDateTime + ) = Filter[MyRecordAllTypes2].columns + + val stringPayload = "foo" + val booleanPayload = true + val bytePayload = 1.toByte + val shortPayload = 1.toShort + val intPayload = 1 + val longPayload = 1L + val floatPayload = 1.0f + val doublePayload = 1.0 + val binaryPayload = Chunk(1.toByte, 2.toByte) + val charPayload = 'c' + val uuidPayload = UUID.randomUUID() + val bigDecimalPayload = new java.math.BigDecimal("1.0") + val bigIntegerPayload = new java.math.BigInteger("99999999999") + val dayOfWeekPayload = DayOfWeek.of(1) + val monthPayload = Month.of(1) + val monthDayPayload = MonthDay.of(1, 1) + val periodPayload = Period.of(1, 1, 1) + val yearPayload = Year.of(1) + val yearMonthPayload = YearMonth.of(1, 1) + val zoneIdPayload = ZoneId.of("Europe/Paris") + val zoneOffsetPayload = ZoneOffset.of("+02:00") + val durationPayload = 1.second + val instantPayload = Instant.ofEpochMilli(1) + val localDatePayload = LocalDate.ofEpochDay(1) + val localTimePayload = 
LocalTime.ofInstant(instantPayload, zoneIdPayload) + val localDateTimePayload = LocalDateTime.of(localDatePayload, localTimePayload) + val offsetTimePayload = OffsetTime.ofInstant(instantPayload, zoneIdPayload) + val offsetDateTimePayload = OffsetDateTime.ofInstant(instantPayload, zoneIdPayload) + val zonedDateTimePayload = ZonedDateTime.ofInstant(localDateTimePayload, zoneOffsetPayload, zoneIdPayload) + + val stringExpected = FilterApi.eq( + FilterApi.binaryColumn("string"), + Value.string(stringPayload).value + ) + val booleanExpected = FilterApi.eq( + FilterApi.booleanColumn("boolean"), + Boolean.box(Value.boolean(booleanPayload).value) + ) + val byteExpected = FilterApi.eq( + FilterApi.intColumn("byte"), + Int.box(Value.byte(bytePayload).value) + ) + val shortExpected = FilterApi.eq( + FilterApi.intColumn("short"), + Int.box(Value.short(shortPayload).value) + ) + val intExpected = FilterApi.eq( + FilterApi.intColumn("int"), + Int.box(Value.int(intPayload).value) + ) + val longExpected = FilterApi.eq( + FilterApi.longColumn("long"), + Long.box(Value.long(longPayload).value) + ) + val floatExpected = FilterApi.eq( + FilterApi.floatColumn("float"), + Float.box(Value.float(floatPayload).value) + ) + val doubleExpected = FilterApi.eq( + FilterApi.doubleColumn("double"), + Double.box(Value.double(doublePayload).value) + ) + val binaryExpected = FilterApi.eq( + FilterApi.binaryColumn("binary"), + Value.binary(binaryPayload).value + ) + val charExpected = FilterApi.eq( + FilterApi.intColumn("char"), + Int.box(Value.char(charPayload).value) + ) + val uuidExpected = FilterApi.eq( + FilterApi.binaryColumn("uuid"), + Value.uuid(uuidPayload).value + ) + val bigDecimalExpected = FilterApi.eq( + FilterApi.longColumn("bigDecimal"), + Long.box(Value.bigDecimal(bigDecimalPayload).value) + ) + val bigIntegerExpected = FilterApi.eq( + FilterApi.binaryColumn("bigInteger"), + Value.bigInteger(bigIntegerPayload).value + ) + val dayOfWeekExpected = FilterApi.eq( + FilterApi.intColumn("dayOfWeek"), + Int.box(Value.dayOfWeek(dayOfWeekPayload).value) + ) + val monthExpected = FilterApi.eq( + FilterApi.intColumn("month"), + Int.box(Value.month(monthPayload).value) + ) + val monthDayExpected = FilterApi.eq( + FilterApi.binaryColumn("monthDay"), + Value.monthDay(monthDayPayload).value + ) + val periodExpected = FilterApi.eq( + FilterApi.binaryColumn("period"), + Value.period(periodPayload).value + ) + val yearExpected = FilterApi.eq( + FilterApi.intColumn("year"), + Int.box(Value.year(yearPayload).value) + ) + val yearMonthExpected = FilterApi.eq( + FilterApi.binaryColumn("yearMonth"), + Value.yearMonth(yearMonthPayload).value + ) + val zoneIdExpected = FilterApi.eq( + FilterApi.binaryColumn("zoneId"), + Value.zoneId(zoneIdPayload).value + ) + val zoneOffsetExpected = FilterApi.eq( + FilterApi.binaryColumn("zoneOffset"), + Value.zoneOffset(zoneOffsetPayload).value + ) + val durationExpected = FilterApi.eq( + FilterApi.longColumn("duration"), + Long.box(Value.duration(durationPayload).value) + ) + val instantExpected = FilterApi.eq( + FilterApi.longColumn("instant"), + Long.box(Value.instant(instantPayload).value) + ) + val localDateExpected = FilterApi.eq( + FilterApi.intColumn("localDate"), + Int.box(Value.localDate(localDatePayload).value) + ) + val localTimeExpected = FilterApi.eq( + FilterApi.intColumn("localTime"), + Int.box(Value.localTime(localTimePayload).value) + ) + val localDateTimeExpected = FilterApi.eq( + FilterApi.longColumn("localDateTime"), + 
Long.box(Value.localDateTime(localDateTimePayload).value) + ) + val offsetTimeExpected = FilterApi.eq( + FilterApi.intColumn("offsetTime"), + Int.box(Value.offsetTime(offsetTimePayload).value) + ) + val offsetDateTimeExpected = FilterApi.eq( + FilterApi.longColumn("offsetDateTime"), + Long.box(Value.offsetDateTime(offsetDateTimePayload).value) + ) + val zonedDateTimeExpected = FilterApi.eq( + FilterApi.longColumn("zonedDateTime"), + Long.box(Value.zonedDateTime(zonedDateTimePayload).value) + ) + + val stringResul = filter(string === stringPayload) + val booleanResult = filter(boolean === booleanPayload) + val byteResult = filter(byte === bytePayload) + val shortResult = filter(short === shortPayload) + val intResult = filter(int === intPayload) + val longResult = filter(long === longPayload) + val floatResult = filter(float === floatPayload) + val doubleResult = filter(double === doublePayload) + val binaryResult = filter(binary === binaryPayload) + val charResult = filter(char === charPayload) + val uuidResult = filter(uuid === uuidPayload) + val bigDecimalResult = filter(bigDecimal === bigDecimalPayload) + val bigIntegerResult = filter(bigInteger === bigIntegerPayload) + val dayOfWeekResult = filter(dayOfWeek === dayOfWeekPayload) + val monthResult = filter(month === monthPayload) + val monthDayResult = filter(monthDay === monthDayPayload) + val periodResult = filter(period === periodPayload) + val yearResult = filter(year === yearPayload) + val yearMonthResult = filter(yearMonth === yearMonthPayload) + val zoneIdResult = filter(zoneId === zoneIdPayload) + val zoneOffsetResult = filter(zoneOffset === zoneOffsetPayload) + val durationResult = filter(duration === durationPayload) + val instantResult = filter(instant === instantPayload) + val localDateResult = filter(localDate === localDatePayload) + val localTimeResult = filter(localTime === localTimePayload) + val localDateTimeResult = filter(localDateTime === localDateTimePayload) + val offsetTimeResult = filter(offsetTime === offsetTimePayload) + val offsetDateTimeResult = filter(offsetDateTime === offsetDateTimePayload) + val zonedDateTimeResult = filter(zonedDateTime === zonedDateTimePayload) + + assert(stringResul)(isRight(equalTo(stringExpected))) && + assert(booleanResult)(isRight(equalTo(booleanExpected))) && + assert(byteResult)(isRight(equalTo(byteExpected))) && + assert(shortResult)(isRight(equalTo(shortExpected))) && + assert(intResult)(isRight(equalTo(intExpected))) && + assert(longResult)(isRight(equalTo(longExpected))) && + assert(floatResult)(isRight(equalTo(floatExpected))) && + assert(doubleResult)(isRight(equalTo(doubleExpected))) && + assert(binaryResult)(isRight(equalTo(binaryExpected))) && + assert(charResult)(isRight(equalTo(charExpected))) && + assert(uuidResult)(isRight(equalTo(uuidExpected))) && + assert(bigDecimalResult)(isRight(equalTo(bigDecimalExpected))) && + assert(bigIntegerResult)(isRight(equalTo(bigIntegerExpected))) && + assert(dayOfWeekResult)(isRight(equalTo(dayOfWeekExpected))) && + assert(monthResult)(isRight(equalTo(monthExpected))) && + assert(monthDayResult)(isRight(equalTo(monthDayExpected))) && + assert(periodResult)(isRight(equalTo(periodExpected))) && + assert(yearResult)(isRight(equalTo(yearExpected))) && + assert(yearMonthResult)(isRight(equalTo(yearMonthExpected))) && + assert(zoneIdResult)(isRight(equalTo(zoneIdExpected))) && + assert(zoneOffsetResult)(isRight(equalTo(zoneOffsetExpected))) && + assert(durationResult)(isRight(equalTo(durationExpected))) && + 
assert(instantResult)(isRight(equalTo(instantExpected))) && + assert(localDateResult)(isRight(equalTo(localDateExpected))) && + assert(localTimeResult)(isRight(equalTo(localTimeExpected))) && + assert(localDateTimeResult)(isRight(equalTo(localDateTimeExpected))) && + assert(offsetTimeResult)(isRight(equalTo(offsetTimeExpected))) && + assert(offsetDateTimeResult)(isRight(equalTo(offsetDateTimeExpected))) && + assert(zonedDateTimeResult)(isRight(equalTo(zonedDateTimeExpected))) + }, + test("compile option") { + // TODO: test failing compile-time cases + val (_, _, _, _, opt) = Filter[MyRecord].columns + + val expected = FilterApi.gt(FilterApi.intColumn("opt"), Int.box(Value.int(3).value)) + val result = filter(opt.nullable > 3) + + assert(result)(isRight(equalTo(expected))) + }, + test("compile enum") { + val (_, _, _, enm, _) = Filter[MyRecord].columns + + val result = filter(enm === MyRecord.Enum.Done) + val expected = FilterApi.eq(FilterApi.binaryColumn("enm"), Value.string("Done").value) + + assert(result)(isRight(equalTo(expected))) + }, + test("column path concatenation") { + // TODO: test failing compile-time cases + // Show the macro determines the names of the parent/child fields no matter how we name + // the variables that represent columns + val (_, _, child0, _, _) = Filter[MyRecord].columns + val (c0, d0) = Filter[MyRecord.Child].columns + + assert(concat(child0, c0).path)(equalTo("child.c")) && + assert(concat(child0, d0).path)(equalTo("child.d")) + } + ) + +} diff --git a/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetIOSpec.scala b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetIOSpec.scala index 4b2fda5..83ed1a1 100644 --- a/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetIOSpec.scala +++ b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetIOSpec.scala @@ -1,9 +1,11 @@ package me.mnedokushev.zio.apache.parquet.core.hadoop -import me.mnedokushev.zio.apache.parquet.core.codec._ +import me.mnedokushev.zio.apache.parquet.core.Fixtures._ +import me.mnedokushev.zio.apache.parquet.core.filter.Filter +import me.mnedokushev.zio.apache.parquet.core.filter.syntax._ import zio._ -import zio.schema._ import zio.stream._ +import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ @@ -16,78 +18,70 @@ object ParquetIOSpec extends ZIOSpecDefault { val tmpCrcPath = tmpDir / ".parquet-writer-spec.parquet.crc" val tmpPath = tmpDir / tmpFile - case class Record(a: Int, b: String, c: Option[Long], d: List[Int], e: Map[String, Int]) - object Record { - implicit val schema: Schema[Record] = - DeriveSchema.gen[Record] - implicit val schemaEncoder: SchemaEncoder[Record] = - Derive.derive[SchemaEncoder, Record](SchemaEncoderDeriver.summoned) - implicit val valueEncoder: ValueEncoder[Record] = - Derive.derive[ValueEncoder, Record](ValueEncoderDeriver.summoned) - implicit val valueDecoder: ValueDecoder[Record] = - Derive.derive[ValueDecoder, Record](ValueDecoderDeriver.summoned) - } - - case class ProjectedRecord(a: Int, c: Option[Long], d: List[Int], e: Map[String, Int]) - object ProjectedRecord { - implicit val schema: Schema[ProjectedRecord] = - DeriveSchema.gen[ProjectedRecord] - implicit val schemaEncoder: SchemaEncoder[ProjectedRecord] = - Derive.derive[SchemaEncoder, ProjectedRecord](SchemaEncoderDeriver.summoned) - implicit val valueEncoder: ValueEncoder[ProjectedRecord] = - Derive.derive[ValueEncoder, ProjectedRecord](ValueEncoderDeriver.summoned) 
- implicit val valueDecoder: ValueDecoder[ProjectedRecord] = - Derive.derive[ValueDecoder, ProjectedRecord](ValueDecoderDeriver.summoned) - } - override def spec: Spec[TestEnvironment & Scope, Any] = suite("ParquetIOSpec")( test("write and read - chunk") { val payload = Chunk( - Record(1, "foo", None, List(1, 2), Map("first" -> 1, "second" -> 2)), - Record(2, "bar", Some(3L), List.empty, Map("third" -> 3)) + MyRecordIO(1, "foo", None, List(1, 2), Map("first" -> 1, "second" -> 2)), + MyRecordIO(2, "bar", Some(3L), List.empty, Map("third" -> 3)) ) for { - writer <- ZIO.service[ParquetWriter[Record]] - reader <- ZIO.service[ParquetReader[Record]] + writer <- ZIO.service[ParquetWriter[MyRecordIO]] + reader <- ZIO.service[ParquetReader[MyRecordIO]] _ <- writer.writeChunk(tmpPath, payload) result <- reader.readChunk(tmpPath) } yield assertTrue(result == payload) } @@ after(cleanTmpFile(tmpDir)), test("write and read - stream") { val payload = Chunk( - Record(1, "foo", None, List(1, 2), Map("first" -> 1, "second" -> 2)), - Record(2, "bar", Some(3L), List.empty, Map("third" -> 3)) + MyRecordIO(1, "foo", None, List(1, 2), Map("first" -> 1, "second" -> 2)), + MyRecordIO(2, "bar", Some(3L), List.empty, Map("third" -> 3)) ) for { - writer <- ZIO.service[ParquetWriter[Record]] - reader <- ZIO.service[ParquetReader[Record]] + writer <- ZIO.service[ParquetWriter[MyRecordIO]] + reader <- ZIO.service[ParquetReader[MyRecordIO]] _ <- writer.writeStream(tmpPath, ZStream.fromChunk(payload)) resultStream <- ZIO.scoped[Any](reader.readStream(tmpPath).runCollect) } yield assertTrue(resultStream == payload) } @@ after(cleanTmpFile(tmpDir)), test("write full and read projected") { val payload = Chunk( - Record(1, "foo", None, List(1, 2), Map("first" -> 1, "second" -> 2)), - Record(2, "bar", Some(3L), List.empty, Map("third" -> 3)) + MyRecordIO(1, "foo", None, List(1, 2), Map("first" -> 1, "second" -> 2)), + MyRecordIO(2, "bar", Some(3L), List.empty, Map("third" -> 3)) ) val projectedPayload = payload.map { r => - ProjectedRecord(r.a, r.c, r.d, r.e) + MyProjectedRecordIO(r.a, r.c, r.d, r.e) } for { - writer <- ZIO.service[ParquetWriter[Record]] - reader <- ZIO.service[ParquetReader[ProjectedRecord]] + writer <- ZIO.service[ParquetWriter[MyRecordIO]] + reader <- ZIO.service[ParquetReader[MyProjectedRecordIO]] _ <- writer.writeChunk(tmpPath, payload) result <- reader.readChunk(tmpPath) } yield assertTrue(result == projectedPayload) + } @@ after(cleanTmpFile(tmpDir)), + test("write and read with filter") { + val payload = Chunk( + MyRecordIO(1, "foo", None, List(1, 2), Map("first" -> 1, "second" -> 2)), + MyRecordIO(2, "foo", None, List(1, 2), Map.empty), + MyRecordIO(3, "bar", Some(3L), List.empty, Map("third" -> 3)), + MyRecordIO(4, "baz", None, List.empty, Map("fourth" -> 3)) + ) + val (id, name, _, _, _) = Filter[MyRecordIO].columns + + for { + writer <- ZIO.service[ParquetWriter[MyRecordIO]] + reader <- ZIO.service[ParquetReader[MyRecordIO]] + _ <- writer.writeChunk(tmpPath, payload) + result <- reader.readChunkFiltered(tmpPath, filter(id > 1 `and` name =!= "foo")) + } yield assertTrue(result.size == 2) && assert(result)(equalTo(payload.drop(2))) } @@ after(cleanTmpFile(tmpDir)) ).provide( - ParquetWriter.configured[Record](), - ParquetReader.configured[Record](), - ParquetReader.projected[ProjectedRecord]() + ParquetWriter.configured[MyRecordIO](), + ParquetReader.configured[MyRecordIO](), + ParquetReader.projected[MyProjectedRecordIO]() ) @@ sequential private def cleanTmpFile(path: Path) = diff --git 
a/project/BuildHelper.scala b/project/BuildHelper.scala index 1cd6391..d4047fd 100644 --- a/project/BuildHelper.scala +++ b/project/BuildHelper.scala @@ -10,7 +10,17 @@ object BuildHelper { libraryDependencies ++= betterMonadicFor(scalaVersion.value), semanticdbEnabled := true, semanticdbVersion := scalafixSemanticdb.revision, - Test / fork := true + Test / fork := true, + Test / unmanagedSourceDirectories ++= crossVersionSources(scalaVersion.value, "test", baseDirectory.value), + Test / unmanagedSourceDirectories ++= crossVersionSources(scalaVersion.value, "main", baseDirectory.value), + libraryDependencies ++= { + CrossVersion.partialVersion(scalaVersion.value) match { + case Some((2, _)) => + Seq(Dep.scalaReflect.value) + case _ => Seq.empty + } + }, + scalacOptions ++= source3Compatibility(scalaVersion.value) ) val Scala212 = "2.12.19" @@ -23,4 +33,26 @@ object BuildHelper { case _ => Seq() } + private def source3Compatibility(scalaVersion: String) = + CrossVersion.partialVersion(scalaVersion) match { + case Some((2, 13)) => Seq("-Xsource:3") + case _ => Seq() + } + + def crossVersionSources(scalaVersion: String, conf: String, baseDirectory: File): List[File] = { + val versions = CrossVersion.partialVersion(scalaVersion) match { + case Some((2, 13)) => + List("2", "2.13", "2.13+") + case Some((3, _)) => + List("2.13+", "3") + case _ => + List.empty + } + + for { + version <- "scala" :: versions.map("scala-" + _) + file = baseDirectory / "src" / conf / version if file.exists + } yield file + } + } diff --git a/project/Dep.scala b/project/Dep.scala index 5d64019..aad8a0a 100644 --- a/project/Dep.scala +++ b/project/Dep.scala @@ -1,4 +1,5 @@ import sbt._ +import sbt.Keys.scalaVersion object Dep { @@ -13,9 +14,9 @@ object Dep { object O { val apacheParquet = "org.apache.parquet" val apacheHadoop = "org.apache.hadoop" - val scalaLang = "org.scala-lang" val zio = "dev.zio" - val scalaLangModules = "org.scala-lang.modules" + val scalaLang = "org.scala-lang" + val scalaLangModules = s"$scalaLang.modules" } lazy val zio = O.zio %% "zio" % V.zio @@ -32,6 +33,8 @@ object Dep { lazy val scalaCollectionCompat = O.scalaLangModules %% "scala-collection-compat" % V.scalaCollectionCompat + lazy val scalaReflect = Def.setting("org.scala-lang" % "scala-reflect" % scalaVersion.value) + lazy val core = Seq( zio, zioSchema,