diff --git a/README.md b/README.md
index f715783..de9c667 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,231 @@
-# zio-apache-parquet
\ No newline at end of file
+![Build status](https://github.com/grouzen/zio-apache-parquet/actions/workflows/ci.yml/badge.svg)
+![Sonatype Nexus (Releases)](https://img.shields.io/nexus/r/me.mnedokushev/zio-apache-parquet-core_2.13?server=https%3A%2F%2Foss.sonatype.org)
+![Sonatype Nexus (Snapshots)](https://img.shields.io/nexus/s/me.mnedokushev/zio-apache-parquet-core_2.13?server=https%3A%2F%2Foss.sonatype.org)
+[![Scala Steward badge](https://img.shields.io/badge/Scala_Steward-helping-blue.svg?style=flat&logo=data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAA4AAAAQCAMAAAARSr4IAAAAVFBMVEUAAACHjojlOy5NWlrKzcYRKjGFjIbp293YycuLa3pYY2LSqql4f3pCUFTgSjNodYRmcXUsPD/NTTbjRS+2jomhgnzNc223cGvZS0HaSD0XLjbaSjElhIr+AAAAAXRSTlMAQObYZgAAAHlJREFUCNdNyosOwyAIhWHAQS1Vt7a77/3fcxxdmv0xwmckutAR1nkm4ggbyEcg/wWmlGLDAA3oL50xi6fk5ffZ3E2E3QfZDCcCN2YtbEWZt+Drc6u6rlqv7Uk0LdKqqr5rk2UCRXOk0vmQKGfc94nOJyQjouF9H/wCc9gECEYfONoAAAAASUVORK5CYII=)](https://scala-steward.org)
+
+# ZIO Apache Parquet
+
+A ZIO-based wrapper for the [Apache Parquet Java implementation](https://github.com/apache/parquet-mr) that
+leverages [ZIO Schema](https://zio.dev/zio-schema/) to derive codecs.
+
+# Overview
+
+## Installation
+
+```scala
+libraryDependencies += "me.mnedokushev" %% "zio-apache-parquet-core" % "@VERSION@"
+```
+
+## Codecs
+
+To write and read data to/from Parquet files, you need to define the schema and value codecs
+`SchemaEncoder`, `ValueEncoder`, and `ValueDecoder` for your case classes.
+
+### Schema
+
+You can get the underlying Java library's `Type` by using a `SchemaEncoder` generated by the `SchemaEncoderDeriver.default` ZIO Schema deriver:
+
+```scala
+//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1
+
+import zio.schema._
+import me.mnedokushev.zio.apache.parquet.core.codec._
+
+case class MyRecord(a: Int, b: String, c: Option[Long])
+
+object MyRecord {
+  implicit val schema: Schema[MyRecord] =
+    DeriveSchema.gen[MyRecord]
+  implicit val schemaEncoder: SchemaEncoder[MyRecord] =
+    Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default)
+}
+
+import MyRecord._
+
+val parquetSchema = schemaEncoder.encode(schema, "my_record", optional = false)
+
+println(parquetSchema)
+// Outputs:
+// required group my_record {
+//   required int32 a (INTEGER(32,true));
+//   required binary b (STRING);
+//   optional int64 c (INTEGER(64,true));
+// }
+```
+
+Alternatively, you can override the schemas of some fields in your record by defining a custom `SchemaEncoder`
+and using the `SchemaEncoderDeriver.summoned` deriver.
+
+```scala
+//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1
+
+import me.mnedokushev.zio.apache.parquet.core.Schemas
+import zio.schema._
+import me.mnedokushev.zio.apache.parquet.core.codec._
+
+case class MyRecord(a: Int, b: String, c: Option[Long])
+
+object MyRecord {
+  implicit val schema: Schema[MyRecord] =
+    DeriveSchema.gen[MyRecord]
+  implicit val intEncoder: SchemaEncoder[Int] = new SchemaEncoder[Int] {
+    override def encode(schema: Schema[Int], name: String, optional: Boolean) =
+      Schemas.uuid.optionality(optional).named(name)
+  }
+  implicit val schemaEncoder: SchemaEncoder[MyRecord] =
+    Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.summoned)
+}
+
+import MyRecord._
+
+val parquetSchema = schemaEncoder.encode(schema, "my_record", optional = false)
+
+println(parquetSchema)
+// Outputs:
+// required group my_record {
+//   required fixed_len_byte_array(16) a (UUID);
+//   required binary b (STRING);
+//   optional int64 c (INTEGER(64,true));
+// }
+```
+
+### Value
+
+There is a sealed hierarchy of `Value` types for interop between Scala values and Parquet readers/writers.
+To convert Scala values into `Value` and back, we need to define instances of the `ValueEncoder` and `ValueDecoder`
+type classes. This can be done by using the `ValueEncoderDeriver.default` and `ValueDecoderDeriver.default` ZIO Schema derivers.
+
+```scala
+//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1
+
+import zio.schema._
+import me.mnedokushev.zio.apache.parquet.core.codec._
+
+case class MyRecord(a: Int, b: String, c: Option[Long])
+
+object MyRecord {
+  implicit val schema: Schema[MyRecord] =
+    DeriveSchema.gen[MyRecord]
+  implicit val encoder: ValueEncoder[MyRecord] =
+    Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default)
+  implicit val decoder: ValueDecoder[MyRecord] =
+    Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default)
+}
+
+import MyRecord._
+
+val value  = encoder.encode(MyRecord(3, "zio", None))
+val record = decoder.decode(value)
+
+println(value)
+// Outputs:
+// RecordValue(Map(a -> Int32Value(3), b -> BinaryValue(Binary{"zio"}), c -> NullValue))
+println(record)
+// Outputs:
+// MyRecord(3,zio,None)
+```
+
+As with `SchemaEncoder`, you can override the schemas of some fields in your record by defining custom
+`ValueEncoder`/`ValueDecoder` instances and using the `ValueEncoderDeriver.summoned`/`ValueDecoderDeriver.summoned` derivers accordingly.
+ +```scala +//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1 + +import me.mnedokushev.zio.apache.parquet.core.Value +import zio.schema._ +import me.mnedokushev.zio.apache.parquet.core.codec._ + +import java.nio.charset.StandardCharsets + +case class MyRecord(a: Int, b: String, c: Option[Long]) + +object MyRecord { + implicit val schema: Schema[MyRecord] = + DeriveSchema.gen[MyRecord] + implicit val intEncoder: ValueEncoder[Int] = new ValueEncoder[Int] { + override def encode(value: Int): Value = + Value.string(value.toString) + } + implicit val intDecoder: ValueDecoder[Int] = new ValueDecoder[Int] { + override def decode(value: Value): Int = + value match { + case Value.PrimitiveValue.BinaryValue(v) => + new String(v.getBytes, StandardCharsets.UTF_8).toInt + case other => + throw DecoderError(s"Wrong value: $other") + } + } + implicit val encoder: ValueEncoder[MyRecord] = + Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.summoned) + implicit val decoder: ValueDecoder[MyRecord] = + Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.summoned) +} + +import MyRecord._ + +val value = encoder.encode(MyRecord(3, "zio", None)) +val record = decoder.decode(value) + +println(value) +// Outputs: +// RecordValue(Map(a -> BinaryValue(Binary{"3"}), b -> BinaryValue(Binary{"zio"}), c -> NullValue)) +println(record) +// Outputs: +// MyRecord(3,zio,None) +``` + +## Reading/Writing files + +Finally, to perform some IO operations we need to initialize `ParquetWriter` and `ParquetReader`. + +```scala +//> using dep me.mnedokushev::zio-apache-parquet-core:0.0.1 + +import zio.schema._ +import me.mnedokushev.zio.apache.parquet.core.codec._ +import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, ParquetWriter, Path } +import zio._ + +import java.nio.file.Files + +case class MyRecord(a: Int, b: String, c: Option[Long]) +object MyRecord { + implicit val schema: Schema[MyRecord] = + DeriveSchema.gen[MyRecord] + implicit val schemaEncoder: SchemaEncoder[MyRecord] = + Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default) + implicit val encoder: ValueEncoder[MyRecord] = + Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default) + implicit val decoder: ValueDecoder[MyRecord] = + Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default) +} + +val data = + Chunk( + MyRecord(1, "first", Some(11)), + MyRecord(3, "third", None) + ) + +val tmpDir = Path(Files.createTempDirectory("records")) +val recordsFile = tmpDir / "records.parquet" + +Unsafe.unsafe { implicit unsafe => + Runtime.default.unsafe + .run( + (for { + writer <- ZIO.service[ParquetWriter[MyRecord]] + reader <- ZIO.service[ParquetReader[MyRecord]] + _ <- writer.write(data) + _ <- writer.close // force to flush parquet data on disk + fromFile <- ZIO.scoped(reader.read(recordsFile).runCollect) + _ <- Console.printLine(fromFile) + } yield ()).provide( + ParquetWriter.configured[MyRecord](recordsFile), + ParquetReader.configured[MyRecord]() + ) + ) + .getOrThrowFiberFailure() +} +// Outputs: +// Chunk(MyRecord(1,first,Some(11)),MyRecord(3,third,None)) +``` \ No newline at end of file diff --git a/build.sbt b/build.sbt index f9cd584..ef12cd0 100644 --- a/build.sbt +++ b/build.sbt @@ -38,7 +38,7 @@ inThisBuild( lazy val root = project .in(file(".")) - .aggregate(core, docs) + .aggregate(core) .settings(publish / skip := true) lazy val core = @@ -49,18 +49,3 @@ lazy val core = libraryDependencies ++= Dep.core, testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework") ) - -lazy val 
docs = - project - .in(file("docs")) - .dependsOn(core) - .settings( - name := "zio-apache-parquet-docs", - organization := "me.mnedokushev", - publish / skip := true, - mdocIn := file("docs/src/main/mdoc"), - mdocVariables := Map( - "VERSION" -> version.value - ) - ) - .enablePlugins(MdocPlugin) diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Schemas.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Schemas.scala new file mode 100644 index 0000000..ec6e3b6 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Schemas.scala @@ -0,0 +1,119 @@ +package me.mnedokushev.zio.apache.parquet.core + +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.apache.parquet.schema.Type.Repetition +import org.apache.parquet.schema._ +import zio.Chunk + +object Schemas { + + abstract class Def[Self <: Def[_]] { + + def named(name: String): Type + + def optionality(condition: Boolean): Self = + if (condition) optional else required + + def required: Self + + def optional: Self + + } + + case class PrimitiveDef( + typeName: PrimitiveTypeName, + annotation: LogicalTypeAnnotation, + isOptional: Boolean = false, + length: Int = 0 + ) extends Def[PrimitiveDef] { + + def named(name: String): Type = + Types + .primitive(typeName, repetition(isOptional)) + .as(annotation) + .length(length) + .named(name) + + def length(len: Int): PrimitiveDef = + this.copy(length = len) + + def required: PrimitiveDef = + this.copy(isOptional = false) + + def optional: PrimitiveDef = + this.copy(isOptional = true) + + } + + case class RecordDef(fields: Chunk[Type], isOptional: Boolean = false) extends Def[RecordDef] { + + def named(name: String): Type = { + val builder = Types.buildGroup(repetition(isOptional)) + + fields.foreach(builder.addField) + builder.named(name) + } + + def required: RecordDef = + this.copy(isOptional = false) + + def optional: RecordDef = + this.copy(isOptional = true) + + } + + case class ListDef( + element: Type, + isOptional: Boolean = false + ) extends Def[ListDef] { + + def named(name: String): Type = + Types + .list(repetition(isOptional)) + .element(element) + .named(name) + + def required: ListDef = + this.copy(isOptional = false) + + def optional: ListDef = + this.copy(isOptional = true) + + } + + case class MapDef(key: Type, value: Type, isOptional: Boolean = false) extends Def[MapDef] { + + override def named(name: String): Type = + Types + .map(repetition(isOptional)) + .key(key) + .value(value) + .named(name) + + override def required: MapDef = + this.copy(isOptional = false) + + override def optional: MapDef = + this.copy(isOptional = true) + + } + + def repetition(optional: Boolean): Repetition = + if (optional) Repetition.OPTIONAL else Repetition.REQUIRED + + import PrimitiveTypeName._ + import LogicalTypeAnnotation._ + + val string: PrimitiveDef = PrimitiveDef(BINARY, stringType()) + val boolean: PrimitiveDef = PrimitiveDef(INT32, intType(8, false)) + val byte: PrimitiveDef = PrimitiveDef(INT32, intType(8, false)) + val short: PrimitiveDef = PrimitiveDef(INT32, intType(16, true)) + val int: PrimitiveDef = PrimitiveDef(INT32, intType(32, true)) + val long: PrimitiveDef = PrimitiveDef(INT64, intType(64, true)) + val uuid: PrimitiveDef = PrimitiveDef(FIXED_LEN_BYTE_ARRAY, uuidType()).length(16) + + def record(fields: Chunk[Type]): RecordDef = RecordDef(fields) + def list(element: Type): ListDef = ListDef(element) + def map(key: Type, value: Type): MapDef = MapDef(key, value) + +} 
diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Value.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Value.scala new file mode 100644 index 0000000..9b088f5 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Value.scala @@ -0,0 +1,220 @@ +package me.mnedokushev.zio.apache.parquet.core + +import org.apache.parquet.io.api.{ Binary, RecordConsumer } +import org.apache.parquet.schema.Type +import zio.Chunk + +import java.nio.ByteBuffer +import java.util.UUID + +sealed trait Value { + def write(schema: Type, recordConsumer: RecordConsumer): Unit +} + +object Value { + + case object NullValue extends Value { + override def write(schema: Type, recordConsumer: RecordConsumer): Unit = + throw new UnsupportedOperationException(s"NullValue cannot be written") + } + + sealed trait PrimitiveValue[A] extends Value { + def value: A + } + + object PrimitiveValue { + + case class BooleanValue(value: Boolean) extends PrimitiveValue[Boolean] { + + override def write(schema: Type, recordConsumer: RecordConsumer): Unit = + recordConsumer.addBoolean(value) + + } + + case class Int32Value(value: Int) extends PrimitiveValue[Int] { + + override def write(schema: Type, recordConsumer: RecordConsumer): Unit = + recordConsumer.addInteger(value) + + } + + case class Int64Value(value: Long) extends PrimitiveValue[Long] { + + override def write(schema: Type, recordConsumer: RecordConsumer): Unit = + recordConsumer.addLong(value) + + } + + case class FloatValue(value: Float) extends PrimitiveValue[Float] { + + override def write(schema: Type, recordConsumer: RecordConsumer): Unit = + recordConsumer.addFloat(value) + + } + + case class DoubleValue(value: Double) extends PrimitiveValue[Double] { + + override def write(schema: Type, recordConsumer: RecordConsumer): Unit = + recordConsumer.addDouble(value) + + } + + case class BinaryValue(value: Binary) extends PrimitiveValue[Binary] { + + override def write(schema: Type, recordConsumer: RecordConsumer): Unit = + recordConsumer.addBinary(value) + + } + + } + + sealed trait GroupValue[Self <: GroupValue[Self]] extends Value { + + def put(name: String, value: Value): Self + + } + + object GroupValue { + + case class RecordValue(values: Map[String, Value]) extends GroupValue[RecordValue] { + + override def write(schema: Type, recordConsumer: RecordConsumer): Unit = { + val groupSchema = schema.asGroupType() + + recordConsumer.startGroup() + + values.foreach { case (name, value) => + val fieldIndex = groupSchema.getFieldIndex(name) + val fieldType = groupSchema.getType(name) + + recordConsumer.startField(name, fieldIndex) + value.write(fieldType, recordConsumer) + recordConsumer.endField(name, fieldIndex) + } + + recordConsumer.endGroup() + } + + override def put(name: String, value: Value): RecordValue = + if (values.contains(name)) + this.copy(values.updated(name, value)) + else + throw new IllegalArgumentException(s"Record doesn't contain field $name") + + } + + case class ListValue(values: Chunk[Value]) extends GroupValue[ListValue] { + + override def write(schema: Type, recordConsumer: RecordConsumer): Unit = { + recordConsumer.startGroup() + + if (values.nonEmpty) { + val groupSchema = schema.asGroupType() + val listSchema = groupSchema.getFields.get(0).asGroupType() + val listFieldName = listSchema.getName + val elementName = listSchema.getFields.get(0).getName // TODO: validate, must be "element" + val listIndex = groupSchema.getFieldIndex(listFieldName) + + 
recordConsumer.startField(listFieldName, listIndex) + + values.foreach { value => + RecordValue(Map(elementName -> value)).write(listSchema, recordConsumer) + } + + recordConsumer.endField(listFieldName, listIndex) + } + + recordConsumer.endGroup() + } + + override def put(name: String, value: Value): ListValue = + this.copy(values = values :+ value) + + } + + case class MapValue(values: Map[Value, Value]) extends GroupValue[MapValue] { + + override def write(schema: Type, recordConsumer: RecordConsumer): Unit = { + recordConsumer.startGroup() + + if (values.nonEmpty) { + val groupSchema = schema.asGroupType() + val mapSchema = groupSchema.getFields.get(0).asGroupType() + val mapFieldName = mapSchema.getName + val mapIndex = groupSchema.getFieldIndex(mapFieldName) + + recordConsumer.startField(mapFieldName, mapIndex) + + values.foreach { case (key, value) => + RecordValue(Map("key" -> key, "value" -> value)).write(mapSchema, recordConsumer) + } + + recordConsumer.endField(mapFieldName, mapIndex) + } + + recordConsumer.endGroup() + } + + override def put(name: String, value: Value): MapValue = + value match { + case RecordValue(values0) => + (values0.get("key"), values0.get("value")) match { + case (Some(k), Some(v)) => + this.copy(values = values.updated(k, v)) + case _ => this + } + case mv: MapValue => mv + case _ => this + } + } + + } + + def nil = + NullValue + + def string(v: String) = + PrimitiveValue.BinaryValue(Binary.fromString(v)) + + def boolean(v: Boolean) = + PrimitiveValue.BooleanValue(v) + + def short(v: Short) = + PrimitiveValue.Int32Value(v.toInt) + + def int(v: Int) = + PrimitiveValue.Int32Value(v) + + def long(v: Long) = + PrimitiveValue.Int64Value(v) + + def float(v: Float) = + PrimitiveValue.FloatValue(v) + + def double(v: Double) = + PrimitiveValue.DoubleValue(v) + + def binary(v: Chunk[Byte]) = + PrimitiveValue.BinaryValue(Binary.fromConstantByteArray(v.toArray)) + + def char(v: Char) = + PrimitiveValue.Int32Value(v.toInt) + + def uuid(v: UUID) = { + val bb = ByteBuffer.wrap(Array.ofDim(16)) + + bb.putLong(v.getMostSignificantBits) + bb.putLong(v.getLeastSignificantBits) + + PrimitiveValue.BinaryValue(Binary.fromConstantByteArray(bb.array())) + } + + def record(r: Map[String, Value]) = + GroupValue.RecordValue(r) + + def list(vs: Chunk[Value]) = + GroupValue.ListValue(vs) + + def map(kvs: Map[Value, Value]) = + GroupValue.MapValue(kvs) +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/DecoderError.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/DecoderError.scala new file mode 100644 index 0000000..690102b --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/DecoderError.scala @@ -0,0 +1,8 @@ +package me.mnedokushev.zio.apache.parquet.core.codec + +import java.io.IOException + +final case class DecoderError( + message: String, + cause: Option[Throwable] = None +) extends IOException(message, cause.getOrElse(new Throwable())) diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/EncoderError.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/EncoderError.scala new file mode 100644 index 0000000..40a17fc --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/EncoderError.scala @@ -0,0 +1,8 @@ +package me.mnedokushev.zio.apache.parquet.core.codec + +import java.io.IOException + +final case class EncoderError( + message: String, + cause: Option[Throwable] = None 
+) extends IOException(message, cause.getOrElse(new Throwable())) diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/SchemaEncoder.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/SchemaEncoder.scala new file mode 100644 index 0000000..84d3c8f --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/SchemaEncoder.scala @@ -0,0 +1,14 @@ +package me.mnedokushev.zio.apache.parquet.core.codec + +import org.apache.parquet.schema.Type +import zio._ +import zio.schema._ + +trait SchemaEncoder[A] { + + def encode(schema: Schema[A], name: String, optional: Boolean): Type + + def encodeZIO(schema: Schema[A], name: String, optional: Boolean): Task[Type] = + ZIO.attempt(encode(schema, name, optional)) + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/SchemaEncoderDeriver.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/SchemaEncoderDeriver.scala new file mode 100644 index 0000000..2b972d6 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/SchemaEncoderDeriver.scala @@ -0,0 +1,115 @@ +package me.mnedokushev.zio.apache.parquet.core.codec + +import me.mnedokushev.zio.apache.parquet.core.Schemas +import org.apache.parquet.schema.Type +import zio.Chunk +import zio.schema.{ Deriver, Schema, StandardType } + +object SchemaEncoderDeriver { + + val default: Deriver[SchemaEncoder] = new Deriver[SchemaEncoder] { + + override def deriveRecord[A]( + record: Schema.Record[A], + fields: => Chunk[Deriver.WrappedF[SchemaEncoder, _]], + summoned: => Option[SchemaEncoder[A]] + ): SchemaEncoder[A] = new SchemaEncoder[A] { + + private def enc[A1](name0: String, schema0: Schema[A1], encoder: SchemaEncoder[_]) = + encoder.asInstanceOf[SchemaEncoder[A1]].encode(schema0, name0, isSchemaOptional(schema0)) + + override def encode(schema: Schema[A], name: String, optional: Boolean): Type = { + val fieldTypes = record.fields.zip(fields.map(_.unwrap)).map { case (field, encoder) => + enc(field.name, field.schema, encoder) + } + + Schemas.record(fieldTypes).optionality(optional).named(name) + } + } + + override def deriveEnum[A]( + `enum`: Schema.Enum[A], + cases: => Chunk[Deriver.WrappedF[SchemaEncoder, _]], + summoned: => Option[SchemaEncoder[A]] + ): SchemaEncoder[A] = ??? + + override def derivePrimitive[A]( + st: StandardType[A], + summoned: => Option[SchemaEncoder[A]] + ): SchemaEncoder[A] = + new SchemaEncoder[A] { + override def encode(schema: Schema[A], name: String, optional: Boolean): Type = + st match { + case StandardType.StringType => + Schemas.string.optionality(optional).named(name) + case StandardType.BoolType => + Schemas.boolean.optionality(optional).named(name) + case StandardType.ByteType => + Schemas.byte.optionality(optional).named(name) + case StandardType.ShortType => + Schemas.short.optionality(optional).named(name) + case StandardType.IntType => + Schemas.int.optionality(optional).named(name) + case StandardType.LongType => + Schemas.long.optionality(optional).named(name) + // TODO: add the other types + case StandardType.UUIDType => + Schemas.uuid.optionality(optional).named(name) + case _ => ??? 
+ } + } + + override def deriveOption[A]( + option: Schema.Optional[A], + inner: => SchemaEncoder[A], + summoned: => Option[SchemaEncoder[Option[A]]] + ): SchemaEncoder[Option[A]] = new SchemaEncoder[Option[A]] { + override def encode(schema: Schema[Option[A]], name: String, optional: Boolean): Type = + inner.encode(option.schema, name, optional = true) + } + + override def deriveSequence[C[_], A]( + sequence: Schema.Sequence[C[A], A, _], + inner: => SchemaEncoder[A], + summoned: => Option[SchemaEncoder[C[A]]] + ): SchemaEncoder[C[A]] = new SchemaEncoder[C[A]] { + override def encode(schema: Schema[C[A]], name: String, optional: Boolean): Type = + Schemas + .list(inner.encode(sequence.elementSchema, "element", isSchemaOptional(sequence.elementSchema))) + .optionality(optional) + .named(name) + } + + override def deriveMap[K, V]( + map: Schema.Map[K, V], + key: => SchemaEncoder[K], + value: => SchemaEncoder[V], + summoned: => Option[SchemaEncoder[Map[K, V]]] + ): SchemaEncoder[Map[K, V]] = new SchemaEncoder[Map[K, V]] { + override def encode(schema: Schema[Map[K, V]], name: String, optional: Boolean): Type = + Schemas + .map( + key.encode(map.keySchema, "key", optional = false), + value.encode(map.valueSchema, "value", optional = isSchemaOptional(map.valueSchema)) + ) + .optionality(optional) + .named(name) + } + + override def deriveTransformedRecord[A, B]( + record: Schema.Record[A], + transform: Schema.Transform[A, B, _], + fields: => Chunk[Deriver.WrappedF[SchemaEncoder, _]], + summoned: => Option[SchemaEncoder[B]] + ): SchemaEncoder[B] = ??? + + }.cached + + val summoned: Deriver[SchemaEncoder] = default.autoAcceptSummoned + + private def isSchemaOptional(schema: Schema[_]): Boolean = + schema match { + case _: Schema.Optional[_] => true + case _ => false + } +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueDecoder.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueDecoder.scala new file mode 100644 index 0000000..06d3afd --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueDecoder.scala @@ -0,0 +1,13 @@ +package me.mnedokushev.zio.apache.parquet.core.codec + +import me.mnedokushev.zio.apache.parquet.core.Value +import zio._ + +trait ValueDecoder[+A] { + + def decode(value: Value): A + + def decodeZIO(value: Value): Task[A] = + ZIO.attempt(decode(value)) + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueDecoderDeriver.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueDecoderDeriver.scala new file mode 100644 index 0000000..f057efa --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueDecoderDeriver.scala @@ -0,0 +1,144 @@ +package me.mnedokushev.zio.apache.parquet.core.codec + +import me.mnedokushev.zio.apache.parquet.core.Value +import me.mnedokushev.zio.apache.parquet.core.Value.{ GroupValue, PrimitiveValue } +import zio._ +import zio.schema._ + +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.util.UUID + +object ValueDecoderDeriver { + + val default: Deriver[ValueDecoder] = new Deriver[ValueDecoder] { + + override def deriveRecord[A]( + record: Schema.Record[A], + fields: => Chunk[Deriver.WrappedF[ValueDecoder, _]], + summoned: => Option[ValueDecoder[A]] + ): ValueDecoder[A] = new ValueDecoder[A] { + override def decode(value: Value): A = + value match { + case GroupValue.RecordValue(values) => + 
Unsafe.unsafe { implicit unsafe => + record.construct( + Chunk + .fromIterable(record.fields.map(f => values(f.name))) + .zip(fields.map(_.unwrap)) + .map { case (v, decoder) => + decoder.decode(v) + } + ) match { + case Right(v) => + v + case Left(reason) => + throw DecoderError(s"Couldn't decode $value: $reason") + } + } + + case other => + throw DecoderError(s"Couldn't decode $other, it must be of type RecordValue") + } + + } + + override def deriveEnum[A]( + `enum`: Schema.Enum[A], + cases: => Chunk[Deriver.WrappedF[ValueDecoder, _]], + summoned: => Option[ValueDecoder[A]] + ): ValueDecoder[A] = ??? + + override def derivePrimitive[A]( + st: StandardType[A], + summoned: => Option[ValueDecoder[A]] + ): ValueDecoder[A] = new ValueDecoder[A] { + override def decode(value: Value): A = + (st, value) match { + case (StandardType.StringType, PrimitiveValue.BinaryValue(v)) => + new String(v.getBytes, StandardCharsets.UTF_8) + case (StandardType.BoolType, PrimitiveValue.BooleanValue(v)) => + v + case (StandardType.ByteType, PrimitiveValue.Int32Value(v)) => + v.toByte + case (StandardType.ShortType, PrimitiveValue.Int32Value(v)) => + v.toShort + case (StandardType.IntType, PrimitiveValue.Int32Value(v)) => + v + case (StandardType.LongType, PrimitiveValue.Int64Value(v)) => + v + case (StandardType.FloatType, PrimitiveValue.FloatValue(v)) => + v + case (StandardType.DoubleType, PrimitiveValue.DoubleValue(v)) => + v + case (StandardType.BinaryType, PrimitiveValue.BinaryValue(v)) => + Chunk.fromArray(v.getBytes) + case (StandardType.CharType, PrimitiveValue.Int32Value(v)) => + v.toChar + case (StandardType.UUIDType, PrimitiveValue.BinaryValue(v)) => + val bb = ByteBuffer.wrap(v.getBytes) + new UUID(bb.getLong, bb.getLong) + case (other, _) => + throw DecoderError(s"Unsupported ZIO Schema StandartType $other") + } + } + + override def deriveOption[A]( + option: Schema.Optional[A], + inner: => ValueDecoder[A], + summoned: => Option[ValueDecoder[Option[A]]] + ): ValueDecoder[Option[A]] = new ValueDecoder[Option[A]] { + override def decode(value: Value): Option[A] = + value match { + case Value.NullValue => + None + case _ => + Some(inner.decode(value)) + } + + } + + override def deriveSequence[C[_], A]( + sequence: Schema.Sequence[C[A], A, _], + inner: => ValueDecoder[A], + summoned: => Option[ValueDecoder[C[A]]] + ): ValueDecoder[C[A]] = new ValueDecoder[C[A]] { + override def decode(value: Value): C[A] = + value match { + case GroupValue.ListValue(values) => + sequence.fromChunk(values.map(inner.decode)) + case other => + throw DecoderError(s"Couldn't decode $other, it must be of type ListValue") + } + } + + override def deriveMap[K, V]( + map: Schema.Map[K, V], + key: => ValueDecoder[K], + value: => ValueDecoder[V], + summoned: => Option[ValueDecoder[Map[K, V]]] + ): ValueDecoder[Map[K, V]] = new ValueDecoder[Map[K, V]] { + override def decode(value0: Value): Map[K, V] = + value0 match { + case GroupValue.MapValue(values) => + values.map { case (k, v) => + key.decode(k) -> value.decode(v) + } + case other => + throw DecoderError(s"Couldn't decode $other, it must be of type MapValue") + } + } + + override def deriveTransformedRecord[A, B]( + record: Schema.Record[A], + transform: Schema.Transform[A, B, _], + fields: => Chunk[Deriver.WrappedF[ValueDecoder, _]], + summoned: => Option[ValueDecoder[B]] + ): ValueDecoder[B] = ??? 
+ + }.cached + + def summoned: Deriver[ValueDecoder] = + default.autoAcceptSummoned + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueEncoder.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueEncoder.scala new file mode 100644 index 0000000..73203e6 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueEncoder.scala @@ -0,0 +1,13 @@ +package me.mnedokushev.zio.apache.parquet.core.codec + +import me.mnedokushev.zio.apache.parquet.core.Value +import zio._ + +trait ValueEncoder[-A] { + + def encode(value: A): Value + + def encodeZIO(value: A): Task[Value] = + ZIO.attemptBlocking(encode(value)) + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueEncoderDeriver.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueEncoderDeriver.scala new file mode 100644 index 0000000..6f2abc2 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueEncoderDeriver.scala @@ -0,0 +1,120 @@ +package me.mnedokushev.zio.apache.parquet.core.codec + +import me.mnedokushev.zio.apache.parquet.core.Value +import zio.Chunk +import zio.schema.{ Deriver, Schema, StandardType } + +import java.util.UUID + +object ValueEncoderDeriver { + + val default: Deriver[ValueEncoder] = new Deriver[ValueEncoder] { + + override def deriveRecord[A]( + record: Schema.Record[A], + fields: => Chunk[Deriver.WrappedF[ValueEncoder, _]], + summoned: => Option[ValueEncoder[A]] + ): ValueEncoder[A] = new ValueEncoder[A] { + + private def enc[A1](v: A, field: Schema.Field[A, A1], encoder: ValueEncoder[_]) = + encoder.asInstanceOf[ValueEncoder[A1]].encode(field.get(v)) + + override def encode(value: A): Value = + Value.record( + record.fields + .zip(fields.map(_.unwrap)) + .map { case (field, encoder) => + field.name -> enc(value, field, encoder) + } + .toMap + ) + } + + override def deriveEnum[A]( + `enum`: Schema.Enum[A], + cases: => Chunk[Deriver.WrappedF[ValueEncoder, _]], + summoned: => Option[ValueEncoder[A]] + ): ValueEncoder[A] = ??? 
+ + override def derivePrimitive[A]( + st: StandardType[A], + summoned: => Option[ValueEncoder[A]] + ): ValueEncoder[A] = + new ValueEncoder[A] { + override def encode(value: A): Value = + (st, value) match { + case (StandardType.StringType, v: String) => + Value.string(v) + case (StandardType.BoolType, v: Boolean) => + Value.boolean(v) + case (StandardType.ByteType, v: Byte) => + Value.int(v.toInt) + case (StandardType.ShortType, v: Short) => + Value.short(v) + case (StandardType.IntType, v: Int) => + Value.int(v) + case (StandardType.LongType, v: Long) => + Value.long(v) + case (StandardType.FloatType, v: Float) => + Value.float(v) + case (StandardType.DoubleType, v: Double) => + Value.double(v) + case (StandardType.BinaryType, v: Chunk[_]) => + Value.binary(v.asInstanceOf[Chunk[Byte]]) + case (StandardType.CharType, v: Char) => + Value.char(v) + case (StandardType.UUIDType, v: UUID) => + Value.uuid(v) + case (other, _) => + throw EncoderError(s"Unsupported ZIO Schema StandardType $other") + } + } + + override def deriveOption[A]( + option: Schema.Optional[A], + inner: => ValueEncoder[A], + summoned: => Option[ValueEncoder[Option[A]]] + ): ValueEncoder[Option[A]] = + new ValueEncoder[Option[A]] { + override def encode(value: Option[A]): Value = + value match { + case Some(v) => inner.encode(v) + case _ => Value.nil + } + } + + override def deriveSequence[C[_], A]( + sequence: Schema.Sequence[C[A], A, _], + inner: => ValueEncoder[A], + summoned: => Option[ValueEncoder[C[A]]] + ): ValueEncoder[C[A]] = new ValueEncoder[C[A]] { + override def encode(value: C[A]): Value = + Value.list(sequence.toChunk(value).map(inner.encode)) + } + + override def deriveMap[K, V]( + map: Schema.Map[K, V], + key: => ValueEncoder[K], + value: => ValueEncoder[V], + summoned: => Option[ValueEncoder[Map[K, V]]] + ): ValueEncoder[Map[K, V]] = new ValueEncoder[Map[K, V]] { + override def encode(value0: Map[K, V]): Value = + Value.map( + value0.map { case (k, v) => + key.encode(k) -> value.encode(v) + } + ) + } + + override def deriveTransformedRecord[A, B]( + record: Schema.Record[A], + transform: Schema.Transform[A, B, _], + fields: => Chunk[Deriver.WrappedF[ValueEncoder, _]], + summoned: => Option[ValueEncoder[B]] + ): ValueEncoder[B] = ??? 
+ + }.cached + + val summoned: Deriver[ValueEncoder] = default.autoAcceptSummoned + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/GroupValueConverter.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/GroupValueConverter.scala new file mode 100644 index 0000000..7923cac --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/GroupValueConverter.scala @@ -0,0 +1,154 @@ +package me.mnedokushev.zio.apache.parquet.core.hadoop + +import me.mnedokushev.zio.apache.parquet.core.Value +import me.mnedokushev.zio.apache.parquet.core.Value.{ GroupValue, PrimitiveValue } +import org.apache.parquet.io.api.{ Binary, Converter, GroupConverter, PrimitiveConverter } +import org.apache.parquet.schema.Type.Repetition +import org.apache.parquet.schema.{ GroupType, LogicalTypeAnnotation } +import zio.Chunk + +import scala.jdk.CollectionConverters._ + +abstract class GroupValueConverter[V <: GroupValue[V]]( + schema: GroupType, + parent: Option[GroupValueConverter[_]] = None +) extends GroupConverter { self => + + def get: V = + this.groupValue + + def put(name: String, value: Value): Unit = + this.groupValue = this.groupValue.put(name, value) + + protected var groupValue: V = _ + + private val converters: Chunk[Converter] = + Chunk.fromIterable( + schema.getFields.asScala.toList.map { schema0 => + val name = schema0.getName + + schema0.getLogicalTypeAnnotation match { + case _ if schema0.isPrimitive => + primitive(name) + case _: LogicalTypeAnnotation.ListLogicalTypeAnnotation => + list(schema0.asGroupType(), name) + case _: LogicalTypeAnnotation.MapLogicalTypeAnnotation => + map(schema0.asGroupType(), name) + case _ => + (name, schema0.getRepetition) match { + case ("list", Repetition.REPEATED) => + listElement(schema0.asGroupType()) + case ("key_value", Repetition.REPEATED) => + mapKeyValue(schema0.asGroupType(), name) + case _ => + record(schema0.asGroupType(), name) + } + } + } + ) + + override def getConverter(fieldIndex: Int): Converter = + converters(fieldIndex) + + private def primitive(name: String) = + new PrimitiveConverter { + + override def addBinary(value: Binary): Unit = + parent.getOrElse(self).put(name, PrimitiveValue.BinaryValue(value)) + + override def addBoolean(value: Boolean): Unit = + parent.getOrElse(self).put(name, PrimitiveValue.BooleanValue(value)) + + override def addDouble(value: Double): Unit = + parent.getOrElse(self).put(name, PrimitiveValue.DoubleValue(value)) + + override def addFloat(value: Float): Unit = + parent.getOrElse(self).put(name, PrimitiveValue.FloatValue(value)) + + override def addInt(value: Int): Unit = + parent.getOrElse(self).put(name, PrimitiveValue.Int32Value(value)) + + override def addLong(value: Long): Unit = + parent.getOrElse(self).put(name, PrimitiveValue.Int64Value(value)) + + } + + private def record( + schema: GroupType, + name: String + ): GroupValueConverter[GroupValue.RecordValue] = + new GroupValueConverter[GroupValue.RecordValue](schema, parent) { + + override def start(): Unit = + this.groupValue = Value.record(Map.empty) + + override def end(): Unit = + put(name, this.groupValue) + + } + + private def list( + schema: GroupType, + name: String + ): GroupValueConverter[GroupValue.ListValue] = + new GroupValueConverter[GroupValue.ListValue](schema) { + + override def start(): Unit = + this.groupValue = Value.list(Chunk.empty) + + override def end(): Unit = + self.put(name, this.groupValue) + } + + private def listElement(schema: 
GroupType): GroupValueConverter[GroupValue.RecordValue] = + new GroupValueConverter[GroupValue.RecordValue](schema, Some(self)) { + + override def start(): Unit = () + + override def end(): Unit = () + + } + + private def map( + schema: GroupType, + name: String + ): GroupValueConverter[GroupValue.MapValue] = + new GroupValueConverter[GroupValue.MapValue](schema) { + + override def start(): Unit = + this.groupValue = Value.map(Map.empty) + + override def end(): Unit = + self.put(name, this.groupValue) + } + + private def mapKeyValue( + schema: GroupType, + name: String + ): GroupValueConverter[GroupValue.RecordValue] = + new GroupValueConverter[GroupValue.RecordValue](schema) { + + override def start(): Unit = + this.groupValue = Value.record(Map("key" -> Value.nil, "value" -> Value.nil)) + + override def end(): Unit = + self.put(name, this.groupValue) + + } + +} + +object GroupValueConverter { + + def root(schema: GroupType): GroupValueConverter[GroupValue.RecordValue] = + new GroupValueConverter[GroupValue.RecordValue](schema) { + + override def start(): Unit = + this.groupValue = Value.record( + schema.getFields.asScala.toList.map(_.getName -> Value.nil).toMap + ) + + override def end(): Unit = () + } + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala new file mode 100644 index 0000000..fe3fdb5 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala @@ -0,0 +1,58 @@ +package me.mnedokushev.zio.apache.parquet.core.hadoop + +import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue +import me.mnedokushev.zio.apache.parquet.core.codec.ValueDecoder +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.{ ReadSupport => HadoopReadSupport } +import org.apache.parquet.hadoop.{ ParquetReader => HadoopParquetReader } +import org.apache.parquet.io.InputFile +import zio._ +import zio.stream._ + +import scala.annotation.nowarn + +trait ParquetReader[+A <: Product] { + + def read(path: Path): ZStream[Scope, Throwable, A] + +} + +final class ParquetReaderLive[A <: Product](conf: Configuration)(implicit decoder: ValueDecoder[A]) + extends ParquetReader[A] { + + override def read(path: Path): ZStream[Scope, Throwable, A] = + for { + inputFile <- ZStream.fromZIO(ZIO.attemptBlockingIO(path.toInputFile(conf))) + reader <- ZStream.fromZIO( + ZIO.fromAutoCloseable( + ZIO.attemptBlockingIO( + new ParquetReader.Builder(inputFile).withConf(conf).build() + ) + ) + ) + value <- ZStream.repeatZIOOption( + ZIO + .attemptBlockingIO(reader.read()) + .asSomeError + .filterOrFail(_ != null)(None) + .flatMap(decoder.decodeZIO(_).asSomeError) + ) + } yield value + +} + +object ParquetReader { + + final class Builder(file: InputFile) extends HadoopParquetReader.Builder[RecordValue](file) { + + override def getReadSupport: HadoopReadSupport[RecordValue] = + new ReadSupport + + } + + def configured[A <: Product: ValueDecoder]( + hadoopConf: Configuration = new Configuration() + )(implicit @nowarn tag: Tag[A]): ULayer[ParquetReader[A]] = + ZLayer.succeed(new ParquetReaderLive[A](hadoopConf)) + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetWriter.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetWriter.scala new file mode 100644 index 0000000..fe78778 --- /dev/null +++ 
b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetWriter.scala @@ -0,0 +1,100 @@ +package me.mnedokushev.zio.apache.parquet.core.hadoop + +import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue +import me.mnedokushev.zio.apache.parquet.core.codec.{ SchemaEncoder, ValueEncoder } +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.{ WriteSupport => HadoopWriteSupport } +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.apache.parquet.hadoop.util.HadoopOutputFile +import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter => HadoopParquetWriter } +import org.apache.parquet.io.OutputFile +import org.apache.parquet.schema.{ MessageType, Type } +import zio._ +import zio.schema.Schema + +trait ParquetWriter[-A <: Product] { + + def write(data: Chunk[A]): Task[Unit] + + def close: Task[Unit] + +} + +final class ParquetWriterLive[A <: Product]( + underlying: HadoopParquetWriter[RecordValue] +)(implicit encoder: ValueEncoder[A]) + extends ParquetWriter[A] { + + override def write(data: Chunk[A]): Task[Unit] = + ZIO.foreachDiscard(data) { value => + for { + record <- encoder.encodeZIO(value) + _ <- ZIO.attemptBlockingIO(underlying.write(record.asInstanceOf[RecordValue])) + } yield () + } + + override def close: Task[Unit] = + ZIO.attemptBlockingIO(underlying.close()) + +} + +object ParquetWriter { + + final private class Builder(file: OutputFile, schema: MessageType) + extends HadoopParquetWriter.Builder[RecordValue, Builder](file) { + + override def self(): Builder = this + + override def getWriteSupport(conf: Configuration): HadoopWriteSupport[RecordValue] = + new WriteSupport(schema, Map.empty) + + } + + def configured[A <: Product: ValueEncoder]( + path: Path, + writeMode: ParquetFileWriter.Mode = ParquetFileWriter.Mode.CREATE, + compressionCodecName: CompressionCodecName = HadoopParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME, + dictionaryEncodingEnabled: Boolean = HadoopParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, + dictionaryPageSize: Int = HadoopParquetWriter.DEFAULT_PAGE_SIZE, + maxPaddingSize: Int = HadoopParquetWriter.MAX_PADDING_SIZE_DEFAULT, + pageSize: Int = HadoopParquetWriter.DEFAULT_PAGE_SIZE, + rowGroupSize: Long = HadoopParquetWriter.DEFAULT_BLOCK_SIZE, + validationEnabled: Boolean = HadoopParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, + hadoopConf: Configuration = new Configuration() + )(implicit + schema: Schema[A], + schemaEncoder: SchemaEncoder[A], + tag: Tag[A] + ): TaskLayer[ParquetWriter[A]] = { + + def castToMessageSchema(schema: Type) = + ZIO.attempt { + val groupSchema = schema.asGroupType() + val name = groupSchema.getName + val fields = groupSchema.getFields + + new MessageType(name, fields) + } + + ZLayer.scoped( + for { + schema <- schemaEncoder.encodeZIO(schema, tag.tag.shortName, optional = false) + messageSchema <- castToMessageSchema(schema) + hadoopFile <- ZIO.attemptBlockingIO(HadoopOutputFile.fromPath(path.toHadoop, hadoopConf)) + builder = new Builder(hadoopFile, messageSchema) + .withWriteMode(writeMode) + .withCompressionCodec(compressionCodecName) + .withDictionaryEncoding(dictionaryEncodingEnabled) + .withDictionaryPageSize(dictionaryPageSize) + .withMaxPaddingSize(maxPaddingSize) + .withPageSize(pageSize) + .withRowGroupSize(rowGroupSize) + .withValidation(validationEnabled) + .withConf(hadoopConf) + underlying <- ZIO.fromAutoCloseable(ZIO.attemptBlockingIO(builder.build())) + writer = new ParquetWriterLive[A](underlying) + } yield writer + ) + 
} + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/Path.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/Path.scala new file mode 100644 index 0000000..22ee182 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/Path.scala @@ -0,0 +1,34 @@ +package me.mnedokushev.zio.apache.parquet.core.hadoop + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{ Path => HadoopPath } +import org.apache.parquet.hadoop.util.HadoopInputFile + +import java.net.URI +import java.nio.file.{ Path => JPath, Paths } + +case class Path(underlying: HadoopPath) { + + def /(child: String): Path = + this.copy(underlying = new HadoopPath(underlying, child)) + + def /(child: JPath): Path = + this.copy(underlying = new HadoopPath(underlying, Path(child).underlying)) + + def toJava: JPath = + Paths.get(underlying.toUri) + + def toHadoop: HadoopPath = + underlying + + def toInputFile(conf: Configuration): HadoopInputFile = + HadoopInputFile.fromPath(underlying, conf) + +} + +object Path { + + def apply(path: JPath): Path = + Path(new HadoopPath(new URI("file", null, path.toAbsolutePath.toString, null, null))) + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ReadSupport.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ReadSupport.scala new file mode 100644 index 0000000..b51a499 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ReadSupport.scala @@ -0,0 +1,34 @@ +package me.mnedokushev.zio.apache.parquet.core.hadoop + +import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.{ InitContext, ReadSupport => HadoopReadSupport } +import org.apache.parquet.io.api.{ GroupConverter, RecordMaterializer } +import org.apache.parquet.schema.MessageType + +import java.util + +class ReadSupport extends HadoopReadSupport[RecordValue] { + + override def prepareForRead( + configuration: Configuration, + keyValueMetaData: util.Map[String, String], + fileSchema: MessageType, + readContext: HadoopReadSupport.ReadContext + ): RecordMaterializer[RecordValue] = new RecordMaterializer[RecordValue] { + + private val converter = + GroupValueConverter.root(fileSchema) + + override def getCurrentRecord: RecordValue = + converter.get + + override def getRootConverter: GroupConverter = + converter + + } + + override def init(context: InitContext): HadoopReadSupport.ReadContext = + new HadoopReadSupport.ReadContext(context.getFileSchema) + +} diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/WriteSupport.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/WriteSupport.scala new file mode 100644 index 0000000..03d3387 --- /dev/null +++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/WriteSupport.scala @@ -0,0 +1,40 @@ +package me.mnedokushev.zio.apache.parquet.core.hadoop + +import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue +import me.mnedokushev.zio.apache.parquet.core.Value.NullValue +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.hadoop.api.{ WriteSupport => HadoopWriteSupport } +import org.apache.parquet.io.api.RecordConsumer +import org.apache.parquet.schema.MessageType + +import scala.jdk.CollectionConverters._ + +class WriteSupport(schema: MessageType, 
metadata: Map[String, String]) extends HadoopWriteSupport[RecordValue] { + + override def init(configuration: Configuration): HadoopWriteSupport.WriteContext = + new HadoopWriteSupport.WriteContext(schema, metadata.asJava) + + override def prepareForWrite(recordConsumer: RecordConsumer): Unit = + this.consumer = recordConsumer + + override def write(record: RecordValue): Unit = { + consumer.startMessage() + + record.values.foreach { + case (_, NullValue) => + () + case (name, value) => + val fieldIndex = schema.getFieldIndex(name) + val fieldType = schema.getType(fieldIndex) + + consumer.startField(name, fieldIndex) + value.write(fieldType, consumer) + consumer.endField(name, fieldIndex) + } + + consumer.endMessage() + } + + private var consumer: RecordConsumer = _ + +} diff --git a/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/codec/SchemaEncoderDeriverSpec.scala b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/codec/SchemaEncoderDeriverSpec.scala new file mode 100644 index 0000000..ffd2b6a --- /dev/null +++ b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/codec/SchemaEncoderDeriverSpec.scala @@ -0,0 +1,202 @@ +package me.mnedokushev.zio.apache.parquet.core.codec + +import me.mnedokushev.zio.apache.parquet.core.Schemas +import me.mnedokushev.zio.apache.parquet.core.Schemas.PrimitiveDef +import zio._ +import zio.schema._ +import zio.test._ + +import java.util.UUID +//import scala.annotation.nowarn + +object SchemaEncoderDeriverSpec extends ZIOSpecDefault { + + case class Record(a: Int, b: Option[String]) + object Record { + implicit val schema: Schema[Record] = DeriveSchema.gen[Record] + } + + // Helper for being able to extract type parameter A from a given schema in order to cast the type of encoder< + private def encode[A](encoder: SchemaEncoder[_], schema: Schema[A], name: String, optional: Boolean) = + encoder.asInstanceOf[SchemaEncoder[A]].encode(schema, name, optional) + + override def spec: Spec[TestEnvironment with Scope, Any] = + suite("SchemaEncoderDeriverSpec")( + test("primitive") { + def named(defs: List[PrimitiveDef], names: List[String]) = + defs.zip(names).map { case (schemaDef, name) => + schemaDef.named(name) + } + + val encoders: List[SchemaEncoder[_]] = + List( + Derive.derive[SchemaEncoder, String](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, Boolean](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, Byte](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, Short](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, Int](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, Long](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, UUID](SchemaEncoderDeriver.default) + ) + val schemas: List[Schema[_]] = + List( + Schema.primitive[String], + Schema.primitive[Boolean], + Schema.primitive[Byte], + Schema.primitive[Short], + Schema.primitive[Int], + Schema.primitive[Long], + Schema.primitive[UUID] + ) + val names = + List( + "string", + "boolean", + "byte", + "short", + "int", + "long", + "uuid" + ) + val schemaDefs = List( + Schemas.string, + Schemas.boolean, + Schemas.byte, + Schemas.short, + Schemas.int, + Schemas.long, + Schemas.uuid + ) + val optionalDefs = + schemaDefs.map(_.optional) + val requiredDefs = + schemaDefs.map(_.required) + + val expectedOptional = named(optionalDefs, names) + val expectedRequired = named(requiredDefs, names) + + encoders + .zip(schemas) + .zip(names) + .zip(expectedOptional) + .zip(expectedRequired) + .map { case 
((((encoder, schema), name), expOptional), expRequired) => + val tpeOptional = encode(encoder, schema, name, optional = true) + val tpeRequired = encode(encoder, schema, name, optional = false) + + assertTrue(tpeOptional == expOptional, tpeRequired == expRequired) + } + .reduce(_ && _) + }, + test("record") { + val name = "record" + val encoder = Derive.derive[SchemaEncoder, Record](SchemaEncoderDeriver.default) + val tpeOptional = encoder.encode(Record.schema, name, optional = true) + val tpeRequired = encoder.encode(Record.schema, name, optional = false) + val schemaDef = Schemas.record( + Chunk( + Schemas.int.required.named("a"), + Schemas.string.optional.named("b") + ) + ) + + assertTrue( + tpeOptional == schemaDef.optional.named(name), + tpeRequired == schemaDef.required.named(name) + ) + }, + test("sequence") { + val name = "mylist" + val encoders: List[SchemaEncoder[_]] = + List( + Derive.derive[SchemaEncoder, List[String]](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, List[Boolean]](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, List[Byte]](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, List[Short]](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, List[Int]](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, List[Long]](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, List[UUID]](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, List[Option[String]]](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, List[Option[Boolean]]](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, List[Option[Byte]]](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, List[Option[Short]]](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, List[Option[Int]]](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, List[Option[Long]]](SchemaEncoderDeriver.default), + Derive.derive[SchemaEncoder, List[Option[UUID]]](SchemaEncoderDeriver.default) + ) + val schemas: List[Schema[_]] = + List( + Schema.list[String], + Schema.list[Int], + Schema.list[Option[String]], + Schema.list[Option[Int]] + ) + val elements = + List( + Schemas.string, + Schemas.boolean, + Schemas.byte, + Schemas.short, + Schemas.int, + Schemas.long, + Schemas.uuid + ) + val schemaDefs = + (elements.map(_.required) ++ elements.map(_.optional)) + .map(_.named("element")) + .map(Schemas.list) + val expectedOptional = + schemaDefs.map(_.optional.named(name)) + val expectedRequired = + schemaDefs.map(_.required.named(name)) + + encoders + .zip(schemas) + .zip(expectedOptional) + .zip(expectedRequired) + .map { case (((encoder, schema), expOptional), expRequired) => + val tpeOptional = encode(encoder, schema, name, optional = true) + val tpeRequired = encode(encoder, schema, name, optional = false) + + assertTrue( + tpeOptional == expOptional, + tpeRequired == expRequired + ) + } + .reduce(_ && _) + }, + test("map") { + val name = "mymap" + + val encoder = Derive.derive[SchemaEncoder, Map[String, Int]](SchemaEncoderDeriver.default) + val tpe = encoder.encode(Schema.map[String, Int], name, optional = true) + + assertTrue( + tpe == Schemas + .map(Schemas.string.required.named("key"), Schemas.int.required.named("value")) + .optional + .named(name) + ) + } +// test("summoned") { + // // @nowarn annotation is needed to avoid having 'variable is not used' compiler error + // @nowarn + // implicit val intEncoder: SchemaEncoder[Int] = new SchemaEncoder[Int] { + // override def encode(schema: Schema[Int], name: 
String, optional: Boolean): Type = + // Schemas.uuid.optionality(optional).named(name) + // } + // + // val name = "myrecord" + // val encoder = Derive.derive[SchemaEncoder, Record](SchemaEncoderDeriver.summoned) + // val tpe = encoder.encode(Record.schema, name, optional = true) + // + // assertTrue( + // tpe == Schemas + // .record(Chunk(Schemas.uuid.required.named("a"), Schemas.string.optional.named("b"))) + // .optional + // .named(name) + // ) + // } + ) + +} diff --git a/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueCodecDeriverSpec.scala b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueCodecDeriverSpec.scala new file mode 100644 index 0000000..1da1451 --- /dev/null +++ b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueCodecDeriverSpec.scala @@ -0,0 +1,156 @@ +package me.mnedokushev.zio.apache.parquet.core.codec + +//import me.mnedokushev.zio.apache.parquet.core.Value +//import me.mnedokushev.zio.apache.parquet.core.Value.PrimitiveValue +import zio._ +import zio.schema._ +import zio.test._ + +import java.util.UUID + +//import java.nio.ByteBuffer +//import java.util.UUID +//import scala.annotation.nowarn + +object ValueCodecDeriverSpec extends ZIOSpecDefault { + + case class Record(a: Int, b: Boolean, c: Option[String], d: List[Int], e: Map[String, Int]) + object Record { + implicit val schema: Schema[Record] = DeriveSchema.gen[Record] + } + + case class SummonedRecord(a: Int, b: Boolean, c: Option[String], d: Option[Int]) + object SummonedRecord { + implicit val schema: Schema[SummonedRecord] = DeriveSchema.gen[SummonedRecord] + } + + override def spec: Spec[TestEnvironment with Scope, Any] = + suite("ValueCodecDeriverSpec")( + test("primitive") { + val stringEncoder = Derive.derive[ValueEncoder, String](ValueEncoderDeriver.default) + val booleanEncoder = Derive.derive[ValueEncoder, Boolean](ValueEncoderDeriver.default) + val byteEncoder = Derive.derive[ValueEncoder, Byte](ValueEncoderDeriver.default) + val shortEncoder = Derive.derive[ValueEncoder, Short](ValueEncoderDeriver.default) + val intEncoder = Derive.derive[ValueEncoder, Int](ValueEncoderDeriver.default) + val longEncoder = Derive.derive[ValueEncoder, Long](ValueEncoderDeriver.default) + val uuidEncoder = Derive.derive[ValueEncoder, UUID](ValueEncoderDeriver.default) + + val stringDecoder = Derive.derive[ValueDecoder, String](ValueDecoderDeriver.default) + val booleanDecoder = Derive.derive[ValueDecoder, Boolean](ValueDecoderDeriver.default) + val byteDecoder = Derive.derive[ValueDecoder, Byte](ValueDecoderDeriver.default) + val shortDecoder = Derive.derive[ValueDecoder, Short](ValueDecoderDeriver.default) + val intDecoder = Derive.derive[ValueDecoder, Int](ValueDecoderDeriver.default) + val longDecoder = Derive.derive[ValueDecoder, Long](ValueDecoderDeriver.default) + val uuidDecoder = Derive.derive[ValueDecoder, UUID](ValueDecoderDeriver.default) + + val stringPayload = "foo" + val booleanPayload = false + val bytePayload = 10.toByte + val shortPayload = 30.toShort + val intPayload = 254 + val longPayload = 398812L + val uuidPayload = UUID.randomUUID() + + for { + stringValue <- stringEncoder.encodeZIO(stringPayload) + stringResult <- stringDecoder.decodeZIO(stringValue) + booleanValue <- booleanEncoder.encodeZIO(booleanPayload) + booleanResult <- booleanDecoder.decodeZIO(booleanValue) + byteValue <- byteEncoder.encodeZIO(bytePayload) + byteResult <- byteDecoder.decodeZIO(byteValue) + shortValue <- 
+          shortResult <- shortDecoder.decodeZIO(shortValue)
+          intValue <- intEncoder.encodeZIO(intPayload)
+          intResult <- intDecoder.decodeZIO(intValue)
+          longValue <- longEncoder.encodeZIO(longPayload)
+          longResult <- longDecoder.decodeZIO(longValue)
+          uuidValue <- uuidEncoder.encodeZIO(uuidPayload)
+          uuidResult <- uuidDecoder.decodeZIO(uuidValue)
+        } yield assertTrue(
+          stringResult == stringPayload,
+          booleanResult == booleanPayload,
+          byteResult == bytePayload,
+          shortResult == shortPayload,
+          intResult == intPayload,
+          longResult == longPayload,
+          uuidResult == uuidPayload
+        )
+      },
+      test("option") {
+        val encoder = Derive.derive[ValueEncoder, Option[Int]](ValueEncoderDeriver.default)
+        val decoder = Derive.derive[ValueDecoder, Option[Int]](ValueDecoderDeriver.default)
+        val payloads = List(Option(3), Option.empty)
+
+        payloads.map { payload =>
+          for {
+            value <- encoder.encodeZIO(payload)
+            result <- decoder.decodeZIO(value)
+          } yield assertTrue(result == payload)
+        }.reduce(_ && _)
+      },
+      test("sequence") {
+        val encoder = Derive.derive[ValueEncoder, List[Map[String, Int]]](ValueEncoderDeriver.default)
+        val decoder = Derive.derive[ValueDecoder, List[Map[String, Int]]](ValueDecoderDeriver.default)
+        val payloads = List(List(Map("foo" -> 1, "bar" -> 2)), List.empty)
+
+        payloads.map { payload =>
+          for {
+            value <- encoder.encodeZIO(payload)
+            result <- decoder.decodeZIO(value)
+          } yield assertTrue(result == payload)
+        }.reduce(_ && _)
+      },
+      test("map") {
+        val encoder = Derive.derive[ValueEncoder, Map[String, Int]](ValueEncoderDeriver.default)
+        val decoder = Derive.derive[ValueDecoder, Map[String, Int]](ValueDecoderDeriver.default)
+        val payloads = List(Map("foo" -> 3), Map.empty[String, Int])
+
+        payloads.map { payload =>
+          for {
+            value <- encoder.encodeZIO(payload)
+            result <- decoder.decodeZIO(value)
+          } yield assertTrue(result == payload)
+        }.reduce(_ && _)
+      },
+      test("record") {
+        val encoder = Derive.derive[ValueEncoder, Record](ValueEncoderDeriver.default)
+        val decoder = Derive.derive[ValueDecoder, Record](ValueDecoderDeriver.default)
+        val payload = Record(2, false, Some("data"), List(1), Map("zio" -> 1))
+
+        for {
+          value <- encoder.encodeZIO(payload)
+          result <- decoder.decodeZIO(value)
+        } yield assertTrue(result == payload)
+      }
+//      test("summoned") {
+//        @nowarn
+//        implicit val stringEncoder: ValueEncoder[String] = new ValueEncoder[String] {
+//          override def encode(value: String): Value =
+//            Value.uuid(UUID.fromString(value))
+//        }
+//        @nowarn
+//        implicit val stringDecoder: ValueDecoder[String] = new ValueDecoder[String] {
+//          override def decode(value: Value): String =
+//            value match {
+//              case PrimitiveValue.ByteArrayValue(v) if v.length == 16 =>
+//                val bb = ByteBuffer.wrap(v.getBytes)
+//                new UUID(bb.getLong, bb.getLong).toString
+//              case other =>
+//                throw DecoderError(s"Wrong value: $other")
+//
+//            }
+//        }
+//
+//        val uuid = UUID.randomUUID()
+//        val encoder = Derive.derive[ValueEncoder, SummonedRecord](ValueEncoderDeriver.summoned)
+//        val decoder = Derive.derive[ValueDecoder, SummonedRecord](ValueDecoderDeriver.summoned)
+//        val payload = SummonedRecord(2, false, Some(uuid.toString), None)
+//
+//        for {
+//          value <- encoder.encodeZIO(payload)
+//          result <- decoder.decodeZIO(value)
+//        } yield assertTrue(result == payload)
+//      }
+    )
+
+}
diff --git a/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetIOSpec.scala b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetIOSpec.scala
new file mode 100644
index 0000000..5a45fa9
--- /dev/null
+++ b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetIOSpec.scala
@@ -0,0 +1,58 @@
+package me.mnedokushev.zio.apache.parquet.core.hadoop
+
+import me.mnedokushev.zio.apache.parquet.core.codec._
+import zio._
+import zio.schema._
+import zio.test.TestAspect._
+import zio.test._
+
+import java.nio.file.Files
+
+object ParquetIOSpec extends ZIOSpecDefault {
+
+  val tmpDir = Path(Files.createTempDirectory("zio-apache-parquet"))
+  val tmpFile = "parquet-writer-spec.parquet"
+  val tmpCrcPath = tmpDir / ".parquet-writer-spec.parquet.crc"
+  val tmpPath = tmpDir / tmpFile
+
+  case class Record(a: Int, b: String, c: Option[Long], d: List[Int], e: Map[String, Int])
+  object Record {
+    implicit val schema: Schema[Record] =
+      DeriveSchema.gen[Record]
+    implicit val schemaEncoder: SchemaEncoder[Record] =
+      Derive.derive[SchemaEncoder, Record](SchemaEncoderDeriver.summoned)
+    implicit val valueEncoder: ValueEncoder[Record] =
+      Derive.derive[ValueEncoder, Record](ValueEncoderDeriver.summoned)
+    implicit val valueDecoder: ValueDecoder[Record] =
+      Derive.derive[ValueDecoder, Record](ValueDecoderDeriver.summoned)
+  }
+
+  override def spec: Spec[TestEnvironment with Scope, Any] =
+    suite("ParquetIOSpec")(
+      test("write and read") {
+        val payload = Chunk(
+          Record(1, "foo", None, List(1, 2), Map("first" -> 1, "second" -> 2)),
+          Record(2, "bar", Some(3L), List.empty, Map("third" -> 3))
+        )
+
+        for {
+          writer <- ZIO.service[ParquetWriter[Record]]
+          reader <- ZIO.service[ParquetReader[Record]]
+          _ <- writer.write(payload)
+          _ <- writer.close // force to flush parquet data on disk
+          result <- ZIO.scoped[Any](reader.read(tmpPath).runCollect)
+        } yield assertTrue(result == payload)
+      }.provide(
+        ParquetWriter.configured[Record](tmpPath),
+        ParquetReader.configured[Record]()
+      ) @@ after(cleanTmpFile(tmpDir))
+    )
+
+  private def cleanTmpFile(path: Path) =
+    for {
+      _ <- ZIO.attemptBlockingIO(Files.delete(tmpCrcPath.toJava))
+      _ <- ZIO.attemptBlockingIO(Files.delete(tmpPath.toJava))
+      _ <- ZIO.attemptBlockingIO(Files.delete(path.toJava))
+    } yield ()
+
+}
diff --git a/project/BuildHelper.scala b/project/BuildHelper.scala
index 3aad316..a90c9cc 100644
--- a/project/BuildHelper.scala
+++ b/project/BuildHelper.scala
@@ -1,5 +1,5 @@
-import sbt._
-import sbt.Keys._
+import sbt.*
+import sbt.Keys.*
 import scalafix.sbt.ScalafixPlugin.autoImport.scalafixSemanticdb
 
 object BuildHelper {
diff --git a/project/Dep.scala b/project/Dep.scala
index aa85b44..cc75b97 100644
--- a/project/Dep.scala
+++ b/project/Dep.scala
@@ -6,9 +6,13 @@ object Dep {
     val zio = "2.0.19"
     val zioSchema = "0.4.16"
     val scalaCollectionCompat = "2.11.0"
+    val apacheParquet = "1.13.0"
+    val apacheHadoop = "3.3.6"
   }
 
   object O {
+    val apacheParquet = "org.apache.parquet"
+    val apacheHadoop = "org.apache.hadoop"
     val scalaLang = "org.scala-lang"
     val zio = "dev.zio"
     val scalaLangModules = "org.scala-lang.modules"
@@ -20,6 +24,12 @@
   lazy val zioTest = O.zio %% "zio-test" % V.zio
   lazy val zioTestSbt = O.zio %% "zio-test-sbt" % V.zio
 
+  lazy val parquetHadoop = O.apacheParquet % "parquet-hadoop" % V.apacheParquet
+  lazy val parquetColumn = O.apacheParquet % "parquet-column" % V.apacheParquet
+
+  lazy val hadoopCommon = O.apacheHadoop % "hadoop-common" % V.apacheHadoop
+  lazy val hadoopMapred = O.apacheHadoop % "hadoop-mapred" % "0.22.0"
+
   lazy val scalaCollectionCompat = O.scalaLangModules %% "scala-collection-compat" % V.scalaCollectionCompat
 
   lazy val core = Seq(
@@ -27,6 +37,10 @@
     zio,
     zioSchema,
     zioSchemaDerivation,
     scalaCollectionCompat,
+    parquetHadoop,
+    parquetColumn,
+    hadoopCommon,
+    hadoopMapred,
     zioTest % Test,
     zioTestSbt % Test
   )
diff --git a/project/plugins.sbt b/project/plugins.sbt
index acb9034..ddc4473 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -12,7 +12,4 @@ addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.0")
 addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.12")
 addSbtPlugin("com.github.sbt" % "sbt-github-actions" % "0.19.0")
 
-// Docs
-addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.5.1")
-
 addDependencyTreePlugin