Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read/write parquet files using ZIO Schema derivation (PoC) #1

Merged
merged 25 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 231 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,231 @@
# zio-apache-parquet
![Build status](https://github.com/grouzen/zio-apache-parquet/actions/workflows/ci.yml/badge.svg)
![Sonatype Nexus (Releases)](https://img.shields.io/nexus/r/me.mnedokushev/zio-apache-parquet-core_2.13?server=https%3A%2F%2Foss.sonatype.org)
![Sonatype Nexus (Snapshots)](https://img.shields.io/nexus/s/me.mnedokushev/zio-apache-parquet-core_2.13?server=https%3A%2F%2Foss.sonatype.org)
[![Scala Steward badge](https://img.shields.io/badge/Scala_Steward-helping-blue.svg?style=flat&logo=)](https://scala-steward.org)

# ZIO Apache Parquet

ZIO based wrapper for [Apache Parquet Java implementation](https://github.com/apache/parquet-mr) that
leverages [ZIO Schema](https://zio.dev/zio-schema/) to derive codecs

# Overview

## Installation

```scala
libraryDependencies += "me.mnedokushev" %% "zio-apache-parquet-core" % "@VERSION@"
```

## Codecs

To be able to write/read data to/from parquet files you need to define the following schema and value codecs
`SchemaEncoder`, `ValueEncoder`, and `ValueDecoder` for your case classes.

### Schema

You can get Java SDK's `Type` by using `SchemaEncoder` generated by `SchemaEncoderDeriver.default` ZIO Schema deriver:

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._

case class MyRecord(a: Int, b: String, c: Option[Long])

object MyRecord {
implicit val schema: Schema[MyRecord] =
DeriveSchema.gen[MyRecord]
implicit val schemaEncoder: SchemaEncoder[MyRecord] =
Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default)
}

import MyRecord._

val parquetSchema = schemaEncoder.encode(schema, "my_record", optional = false)

println(parquetSchema)
// Outputs:
// required group my_record {
// required int32 a (INTEGER(32,true));
// required binary b (STRING);
// optional int64 c (INTEGER(64,true));
// }
```

Alternatively, you can override the schemas of some fields in your record by defining a custom `SchemaEncoder`
and using `SchemaEncoderDeriver.summoned` deriver.

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1

import me.mnedokushev.zio.apache.parquet.core.Schemas
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._

case class MyRecord(a: Int, b: String, c: Option[Long])

object MyRecord {
implicit val schema: Schema[MyRecord] =
DeriveSchema.gen[MyRecord]
implicit val intEncoder: SchemaEncoder[Int] = new SchemaEncoder[Int] {
override def encode(schema: Schema[Int], name: String, optional: Boolean) =
Schemas.uuid.optionality(optional).named(name)
}
implicit val schemaEncoder: SchemaEncoder[MyRecord] =
Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.summoned)
}

import MyRecord._

val parquetSchema = schemaEncoder.encode(schema, "my_record", optional = false)

println(parquetSchema)
// Outputs:
// required group my_record {
// required fixed_len_byte_array(16) a (UUID);
// required binary b (STRING);
// optional int64 c (INTEGER(64,true));
// }
```

### Value

There is a sealed hierarchy of `Value` types for interop between Scala values and Parquet readers/writers.
For converting Scala values into `Value` and back we need to define instances of `ValueEncoder` and `ValueDecoder`
type classes. This could be done by using `ValueDecoderDeriver.default` ZIO Schema deriver.

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._

case class MyRecord(a: Int, b: String, c: Option[Long])

object MyRecord {
implicit val schema: Schema[MyRecord] =
DeriveSchema.gen[MyRecord]
implicit val encoder: ValueEncoder[MyRecord] =
Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default)
implicit val decoder: ValueDecoder[MyRecord] =
Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default)
}

import MyRecord._

val value = encoder.encode(MyRecord(3, "zio", None))
val record = decoder.decode(value)

println(value)
// Outputs:
// RecordValue(Map(a -> Int32Value(3), b -> BinaryValue(Binary{"zio"}), c -> NullValue))
println(record)
// Outputs:
// MyRecord(3,zio,None)
```

Same as for `SchemaEncoder`, you can override the schemas of some fields in your record by defining custom
`ValueEncoder`/`ValueDecoder` and using `ValueEncoderDeriver.summoned`/`ValueDecoderDeriver.summoned` derivers accordingly.

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1

import me.mnedokushev.zio.apache.parquet.core.Value
import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._

import java.nio.charset.StandardCharsets

case class MyRecord(a: Int, b: String, c: Option[Long])

object MyRecord {
implicit val schema: Schema[MyRecord] =
DeriveSchema.gen[MyRecord]
implicit val intEncoder: ValueEncoder[Int] = new ValueEncoder[Int] {
override def encode(value: Int): Value =
Value.string(value.toString)
}
implicit val intDecoder: ValueDecoder[Int] = new ValueDecoder[Int] {
override def decode(value: Value): Int =
value match {
case Value.PrimitiveValue.BinaryValue(v) =>
new String(v.getBytes, StandardCharsets.UTF_8).toInt
case other =>
throw DecoderError(s"Wrong value: $other")
}
}
implicit val encoder: ValueEncoder[MyRecord] =
Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.summoned)
implicit val decoder: ValueDecoder[MyRecord] =
Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.summoned)
}

import MyRecord._

val value = encoder.encode(MyRecord(3, "zio", None))
val record = decoder.decode(value)

println(value)
// Outputs:
// RecordValue(Map(a -> BinaryValue(Binary{"3"}), b -> BinaryValue(Binary{"zio"}), c -> NullValue))
println(record)
// Outputs:
// MyRecord(3,zio,None)
```

## Reading/Writing files

Finally, to perform some IO operations we need to initialize `ParquetWriter` and `ParquetReader`.

```scala
//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.codec._
import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, ParquetWriter, Path }
import zio._

import java.nio.file.Files

case class MyRecord(a: Int, b: String, c: Option[Long])
object MyRecord {
implicit val schema: Schema[MyRecord] =
DeriveSchema.gen[MyRecord]
implicit val schemaEncoder: SchemaEncoder[MyRecord] =
Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default)
implicit val encoder: ValueEncoder[MyRecord] =
Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default)
implicit val decoder: ValueDecoder[MyRecord] =
Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default)
}

val data =
Chunk(
MyRecord(1, "first", Some(11)),
MyRecord(3, "third", None)
)

val tmpDir = Path(Files.createTempDirectory("records"))
val recordsFile = tmpDir / "records.parquet"

Unsafe.unsafe { implicit unsafe =>
Runtime.default.unsafe
.run(
(for {
writer <- ZIO.service[ParquetWriter[MyRecord]]
reader <- ZIO.service[ParquetReader[MyRecord]]
_ <- writer.write(data)
_ <- writer.close // force to flush parquet data on disk
fromFile <- ZIO.scoped(reader.read(recordsFile).runCollect)
_ <- Console.printLine(fromFile)
} yield ()).provide(
ParquetWriter.configured[MyRecord](recordsFile),
ParquetReader.configured[MyRecord]()
)
)
.getOrThrowFiberFailure()
}
// Outputs:
// Chunk(MyRecord(1,first,Some(11)),MyRecord(3,third,None))
```
17 changes: 1 addition & 16 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ inThisBuild(
lazy val root =
project
.in(file("."))
.aggregate(core, docs)
.aggregate(core)
.settings(publish / skip := true)

lazy val core =
Expand All @@ -49,18 +49,3 @@ lazy val core =
libraryDependencies ++= Dep.core,
testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")
)

lazy val docs =
project
.in(file("docs"))
.dependsOn(core)
.settings(
name := "zio-apache-parquet-docs",
organization := "me.mnedokushev",
publish / skip := true,
mdocIn := file("docs/src/main/mdoc"),
mdocVariables := Map(
"VERSION" -> version.value
)
)
.enablePlugins(MdocPlugin)
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package me.mnedokushev.zio.apache.parquet.core

import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema._
import zio.Chunk

object Schemas {

abstract class Def[Self <: Def[_]] {

def named(name: String): Type

def optionality(condition: Boolean): Self =
if (condition) optional else required

def required: Self

def optional: Self

}

case class PrimitiveDef(
typeName: PrimitiveTypeName,
annotation: LogicalTypeAnnotation,
isOptional: Boolean = false,
length: Int = 0
) extends Def[PrimitiveDef] {

def named(name: String): Type =
Types
.primitive(typeName, repetition(isOptional))
.as(annotation)
.length(length)
.named(name)

def length(len: Int): PrimitiveDef =
this.copy(length = len)

def required: PrimitiveDef =
this.copy(isOptional = false)

def optional: PrimitiveDef =
this.copy(isOptional = true)

}

case class RecordDef(fields: Chunk[Type], isOptional: Boolean = false) extends Def[RecordDef] {

def named(name: String): Type = {
val builder = Types.buildGroup(repetition(isOptional))

fields.foreach(builder.addField)
builder.named(name)
}

def required: RecordDef =
this.copy(isOptional = false)

def optional: RecordDef =
this.copy(isOptional = true)

}

case class ListDef(
element: Type,
isOptional: Boolean = false
) extends Def[ListDef] {

def named(name: String): Type =
Types
.list(repetition(isOptional))
.element(element)
.named(name)

def required: ListDef =
this.copy(isOptional = false)

def optional: ListDef =
this.copy(isOptional = true)

}

case class MapDef(key: Type, value: Type, isOptional: Boolean = false) extends Def[MapDef] {

override def named(name: String): Type =
Types
.map(repetition(isOptional))
.key(key)
.value(value)
.named(name)

override def required: MapDef =
this.copy(isOptional = false)

override def optional: MapDef =
this.copy(isOptional = true)

}

def repetition(optional: Boolean): Repetition =
if (optional) Repetition.OPTIONAL else Repetition.REQUIRED

import PrimitiveTypeName._
import LogicalTypeAnnotation._

val string: PrimitiveDef = PrimitiveDef(BINARY, stringType())
val boolean: PrimitiveDef = PrimitiveDef(INT32, intType(8, false))
val byte: PrimitiveDef = PrimitiveDef(INT32, intType(8, false))
val short: PrimitiveDef = PrimitiveDef(INT32, intType(16, true))
val int: PrimitiveDef = PrimitiveDef(INT32, intType(32, true))
val long: PrimitiveDef = PrimitiveDef(INT64, intType(64, true))
val uuid: PrimitiveDef = PrimitiveDef(FIXED_LEN_BYTE_ARRAY, uuidType()).length(16)

def record(fields: Chunk[Type]): RecordDef = RecordDef(fields)
def list(element: Type): ListDef = ListDef(element)
def map(key: Type, value: Type): MapDef = MapDef(key, value)

}
Loading