Read/write parquet files using ZIO Schema derivation (PoC) (#1)
* Add apache parquet dep

* Add Value.scala

* Start working on SchemaEncoder, added Schemas

* Schemas code deduplication

* Implement deriveSequence for SchemaEncoder

* Add test case for summoned SchemaEncoder deriver

* Basic implementation for ValueEncoder deriver

* Add test case for record case of ValueEncoder deriver

* Add test case for summoning ValueEncoder

* Remove worksheet

* Implement ParquetWriter

* Basic implementation for ValueDecoder deriver

* Add Path.toHadoop method

* Implement ParquetReader

* scalafmt

* scalafix

* Fix unused Tag[A] warning

* Fix scala 2.13 compilation error

* Add docs

* Implement map case for SchemaEncoder

* Fix list converter

* Fix list and map converters

* Update README

* Improve codec tests

* Add type annotation for Schemas.map
grouzen authored Dec 11, 2023
1 parent 042db40 commit 286a160
Showing 24 changed files with 1,858 additions and 22 deletions.
232 changes: 231 additions & 1 deletion README.md
@@ -1 +1,231 @@
# zio-apache-parquet
![Build status](https://github.com/grouzen/zio-apache-parquet/actions/workflows/ci.yml/badge.svg)
![Sonatype Nexus (Releases)](https://img.shields.io/nexus/r/me.mnedokushev/zio-apache-parquet-core_2.13?server=https%3A%2F%2Foss.sonatype.org)
![Sonatype Nexus (Snapshots)](https://img.shields.io/nexus/s/me.mnedokushev/zio-apache-parquet-core_2.13?server=https%3A%2F%2Foss.sonatype.org)
[![Scala Steward badge](https://img.shields.io/badge/Scala_Steward-helping-blue.svg?style=flat&logo=data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAA4AAAAQCAMAAAARSr4IAAAAVFBMVEUAAACHjojlOy5NWlrKzcYRKjGFjIbp293YycuLa3pYY2LSqql4f3pCUFTgSjNodYRmcXUsPD/NTTbjRS+2jomhgnzNc223cGvZS0HaSD0XLjbaSjElhIr+AAAAAXRSTlMAQObYZgAAAHlJREFUCNdNyosOwyAIhWHAQS1Vt7a77/3fcxxdmv0xwmckutAR1nkm4ggbyEcg/wWmlGLDAA3oL50xi6fk5ffZ3E2E3QfZDCcCN2YtbEWZt+Drc6u6rlqv7Uk0LdKqqr5rk2UCRXOk0vmQKGfc94nOJyQjouF9H/wCc9gECEYfONoAAAAASUVORK5CYII=)](https://scala-steward.org)

# ZIO Apache Parquet

A ZIO-based wrapper for the [Apache Parquet Java implementation](https://github.com/apache/parquet-mr) that
leverages [ZIO Schema](https://zio.dev/zio-schema/) to derive codecs.

# Overview

## Installation

```scala
libraryDependencies += "me.mnedokushev" %% "zio-apache-parquet-core" % "@VERSION@"
```

## Codecs

To write and read data to and from Parquet files, you need to define the following schema and value codecs
for your case classes: `SchemaEncoder`, `ValueEncoder`, and `ValueDecoder`.
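Each of them can be derived automatically from an implicit `Schema` instance via ZIO Schema's `Derive.derive`, as the sections below show.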

### Schema

You can get the Java SDK's `Type` by using a `SchemaEncoder` generated by the `SchemaEncoderDeriver.default` ZIO Schema deriver:

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._

case class MyRecord(a: Int, b: String, c: Option[Long])

object MyRecord {
  implicit val schema: Schema[MyRecord] =
    DeriveSchema.gen[MyRecord]
  implicit val schemaEncoder: SchemaEncoder[MyRecord] =
    Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default)
}

import MyRecord._

val parquetSchema = schemaEncoder.encode(schema, "my_record", optional = false)

println(parquetSchema)
// Outputs:
// required group my_record {
//   required int32 a (INTEGER(32,true));
//   required binary b (STRING);
//   optional int64 c (INTEGER(64,true));
// }
```
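The `optional` flag controls the repetition of the resulting top-level group. A quick sketch; the expected output is inferred from the `Schemas` definitions added in this commit, where `optional = true` maps to the `OPTIONAL` repetition:

```scala
println(schemaEncoder.encode(schema, "my_record", optional = true))
// Should print the same group marked `optional` instead of `required`
```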

Alternatively, you can override the schemas of some fields in your record by defining a custom `SchemaEncoder`
and using the `SchemaEncoderDeriver.summoned` deriver:

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1

import me.mnedokushev.zio.apache.parquet.core.Schemas
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._

case class MyRecord(a: Int, b: String, c: Option[Long])

object MyRecord {
  implicit val schema: Schema[MyRecord] =
    DeriveSchema.gen[MyRecord]
  implicit val intEncoder: SchemaEncoder[Int] = new SchemaEncoder[Int] {
    override def encode(schema: Schema[Int], name: String, optional: Boolean) =
      Schemas.uuid.optionality(optional).named(name)
  }
  implicit val schemaEncoder: SchemaEncoder[MyRecord] =
    Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.summoned)
}

import MyRecord._

val parquetSchema = schemaEncoder.encode(schema, "my_record", optional = false)

println(parquetSchema)
// Outputs:
// required group my_record {
//   required fixed_len_byte_array(16) a (UUID);
//   required binary b (STRING);
//   optional int64 c (INTEGER(64,true));
// }
```
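Note that only `a`, which has a custom `SchemaEncoder[Int]` in implicit scope, is affected: as the output above shows, `b` and `c` are still derived by the default rules.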

### Value

There is a sealed hierarchy of `Value` types for interop between Scala values and Parquet readers/writers.
To convert Scala values into `Value` and back, we need to define instances of the `ValueEncoder` and `ValueDecoder`
type classes. This can be done using the `ValueEncoderDeriver.default` and `ValueDecoderDeriver.default` ZIO Schema derivers:

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._

case class MyRecord(a: Int, b: String, c: Option[Long])

object MyRecord {
  implicit val schema: Schema[MyRecord] =
    DeriveSchema.gen[MyRecord]
  implicit val encoder: ValueEncoder[MyRecord] =
    Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default)
  implicit val decoder: ValueDecoder[MyRecord] =
    Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default)
}

import MyRecord._

val value = encoder.encode(MyRecord(3, "zio", None))
val record = decoder.decode(value)

println(value)
// Outputs:
// RecordValue(Map(a -> Int32Value(3), b -> BinaryValue(Binary{"zio"}), c -> NullValue))
println(record)
// Outputs:
// MyRecord(3,zio,None)
```

As with `SchemaEncoder`, you can override the encoding of some fields in your record by defining custom
`ValueEncoder`/`ValueDecoder` instances and using the `ValueEncoderDeriver.summoned`/`ValueDecoderDeriver.summoned` derivers accordingly:

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1

import me.mnedokushev.zio.apache.parquet.core.Value
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._

import java.nio.charset.StandardCharsets

case class MyRecord(a: Int, b: String, c: Option[Long])

object MyRecord {
  implicit val schema: Schema[MyRecord] =
    DeriveSchema.gen[MyRecord]
  implicit val intEncoder: ValueEncoder[Int] = new ValueEncoder[Int] {
    override def encode(value: Int): Value =
      Value.string(value.toString)
  }
  implicit val intDecoder: ValueDecoder[Int] = new ValueDecoder[Int] {
    override def decode(value: Value): Int =
      value match {
        case Value.PrimitiveValue.BinaryValue(v) =>
          new String(v.getBytes, StandardCharsets.UTF_8).toInt
        case other                               =>
          throw DecoderError(s"Wrong value: $other")
      }
  }
  implicit val encoder: ValueEncoder[MyRecord] =
    Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.summoned)
  implicit val decoder: ValueDecoder[MyRecord] =
    Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.summoned)
}

import MyRecord._

val value = encoder.encode(MyRecord(3, "zio", None))
val record = decoder.decode(value)

println(value)
// Outputs:
// RecordValue(Map(a -> BinaryValue(Binary{"3"}), b -> BinaryValue(Binary{"zio"}), c -> NullValue))
println(record)
// Outputs:
// MyRecord(3,zio,None)
```

## Reading/Writing files

Finally, to perform the actual I/O, we need to initialize a `ParquetWriter` and a `ParquetReader`:

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, ParquetWriter, Path }
import zio._

import java.nio.file.Files

case class MyRecord(a: Int, b: String, c: Option[Long])
object MyRecord {
  implicit val schema: Schema[MyRecord] =
    DeriveSchema.gen[MyRecord]
  implicit val schemaEncoder: SchemaEncoder[MyRecord] =
    Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default)
  implicit val encoder: ValueEncoder[MyRecord] =
    Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default)
  implicit val decoder: ValueDecoder[MyRecord] =
    Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default)
}

val data =
  Chunk(
    MyRecord(1, "first", Some(11)),
    MyRecord(3, "third", None)
  )

val tmpDir      = Path(Files.createTempDirectory("records"))
val recordsFile = tmpDir / "records.parquet"

Unsafe.unsafe { implicit unsafe =>
  Runtime.default.unsafe
    .run(
      (for {
        writer   <- ZIO.service[ParquetWriter[MyRecord]]
        reader   <- ZIO.service[ParquetReader[MyRecord]]
        _        <- writer.write(data)
        _        <- writer.close // force flushing parquet data to disk
        fromFile <- ZIO.scoped(reader.read(recordsFile).runCollect)
        _        <- Console.printLine(fromFile)
      } yield ()).provide(
        ParquetWriter.configured[MyRecord](recordsFile),
        ParquetReader.configured[MyRecord]()
      )
    )
    .getOrThrowFiberFailure()
}
// Outputs:
// Chunk(MyRecord(1,first,Some(11)),MyRecord(3,third,None))
```
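In a long-running application, the same wiring fits a standard ZIO entry point instead of `Unsafe.unsafe`. A minimal sketch, assuming the `MyRecord` definitions, `data`, and `recordsFile` from the snippet above are in scope (`Main` is an illustrative name):

```scala
import zio._

object Main extends ZIOAppDefault {
  override def run =
    (for {
      writer   <- ZIO.service[ParquetWriter[MyRecord]]
      reader   <- ZIO.service[ParquetReader[MyRecord]]
      _        <- writer.write(data)
      _        <- writer.close // flush parquet data to disk before reading
      fromFile <- ZIO.scoped(reader.read(recordsFile).runCollect)
      _        <- Console.printLine(fromFile)
    } yield ()).provide(
      ParquetWriter.configured[MyRecord](recordsFile),
      ParquetReader.configured[MyRecord]()
    )
}
```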
17 changes: 1 addition & 16 deletions build.sbt
@@ -38,7 +38,7 @@ inThisBuild(
lazy val root =
  project
    .in(file("."))
    .aggregate(core, docs)
    .aggregate(core)
    .settings(publish / skip := true)

lazy val core =
@@ -49,18 +49,3 @@ lazy val core =
      libraryDependencies ++= Dep.core,
      testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")
    )

lazy val docs =
  project
    .in(file("docs"))
    .dependsOn(core)
    .settings(
      name := "zio-apache-parquet-docs",
      organization := "me.mnedokushev",
      publish / skip := true,
      mdocIn := file("docs/src/main/mdoc"),
      mdocVariables := Map(
        "VERSION" -> version.value
      )
    )
    .enablePlugins(MdocPlugin)
@@ -0,0 +1,119 @@
package me.mnedokushev.zio.apache.parquet.core

import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema._
import zio.Chunk

object Schemas {

  abstract class Def[Self <: Def[_]] {

    def named(name: String): Type

    def optionality(condition: Boolean): Self =
      if (condition) optional else required

    def required: Self

    def optional: Self

  }

  case class PrimitiveDef(
    typeName: PrimitiveTypeName,
    annotation: LogicalTypeAnnotation,
    isOptional: Boolean = false,
    length: Int = 0
  ) extends Def[PrimitiveDef] {

    def named(name: String): Type =
      Types
        .primitive(typeName, repetition(isOptional))
        .as(annotation)
        .length(length)
        .named(name)

    def length(len: Int): PrimitiveDef =
      this.copy(length = len)

    def required: PrimitiveDef =
      this.copy(isOptional = false)

    def optional: PrimitiveDef =
      this.copy(isOptional = true)

  }

  case class RecordDef(fields: Chunk[Type], isOptional: Boolean = false) extends Def[RecordDef] {

    def named(name: String): Type = {
      val builder = Types.buildGroup(repetition(isOptional))

      fields.foreach(builder.addField)
      builder.named(name)
    }

    def required: RecordDef =
      this.copy(isOptional = false)

    def optional: RecordDef =
      this.copy(isOptional = true)

  }

  case class ListDef(
    element: Type,
    isOptional: Boolean = false
  ) extends Def[ListDef] {

    def named(name: String): Type =
      Types
        .list(repetition(isOptional))
        .element(element)
        .named(name)

    def required: ListDef =
      this.copy(isOptional = false)

    def optional: ListDef =
      this.copy(isOptional = true)

  }

  case class MapDef(key: Type, value: Type, isOptional: Boolean = false) extends Def[MapDef] {

    override def named(name: String): Type =
      Types
        .map(repetition(isOptional))
        .key(key)
        .value(value)
        .named(name)

    override def required: MapDef =
      this.copy(isOptional = false)

    override def optional: MapDef =
      this.copy(isOptional = true)

  }

  def repetition(optional: Boolean): Repetition =
    if (optional) Repetition.OPTIONAL else Repetition.REQUIRED

  import PrimitiveTypeName._
  import LogicalTypeAnnotation._

  val string: PrimitiveDef  = PrimitiveDef(BINARY, stringType())
  val boolean: PrimitiveDef = PrimitiveDef(INT32, intType(8, false))
  val byte: PrimitiveDef    = PrimitiveDef(INT32, intType(8, false))
  val short: PrimitiveDef   = PrimitiveDef(INT32, intType(16, true))
  val int: PrimitiveDef     = PrimitiveDef(INT32, intType(32, true))
  val long: PrimitiveDef    = PrimitiveDef(INT64, intType(64, true))
  val uuid: PrimitiveDef    = PrimitiveDef(FIXED_LEN_BYTE_ARRAY, uuidType()).length(16)

  def record(fields: Chunk[Type]): RecordDef = RecordDef(fields)
  def list(element: Type): ListDef           = ListDef(element)
  def map(key: Type, value: Type): MapDef    = MapDef(key, value)

}
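For context, these `Def` combinators compose directly into a parquet-mr `Type`. A small usage sketch (names are illustrative; the expected result follows from the definitions above):

```scala
import me.mnedokushev.zio.apache.parquet.core.Schemas
import zio.Chunk

// Builds a group equivalent to:
//   required group my_record {
//     required int32 a (INTEGER(32,true));
//     optional binary b (STRING);
//   }
val myRecordType =
  Schemas.record(
    Chunk(
      Schemas.int.named("a"),
      Schemas.string.optional.named("b")
    )
  ).named("my_record")
```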