From 98920e513e7f39456d4b900c54a813f1710690ea Mon Sep 17 00:00:00 2001
From: Michael Nedokushev
Date: Sat, 9 Dec 2023 01:42:51 +0000
Subject: [PATCH] Implement ParquetReader (WIP)

---
 .../zio/apache/parquet/core/Value.scala       |  33 +++--
 .../core/codec/ValueDecoderDeriver.scala      |   7 +-
 .../core/hadoop/GroupValueConverter.scala     | 135 ++++++++++++++++++
 .../parquet/core/hadoop/ParquetReader.scala   |  56 ++++++++
 .../parquet/core/hadoop/ParquetWriter.scala   |   9 +-
 .../zio/apache/parquet/core/hadoop/Path.scala |   5 +
 .../parquet/core/hadoop/ReadSupport.scala     |  34 +++++
 ...etWriterSpec.scala => ParquetIOSpec.scala} |  26 ++--
 8 files changed, 279 insertions(+), 26 deletions(-)
 create mode 100644 modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/GroupValueConverter.scala
 create mode 100644 modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala
 create mode 100644 modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ReadSupport.scala
 rename modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/{ParquetWriterSpec.scala => ParquetIOSpec.scala} (56%)

diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Value.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Value.scala
index 23f4aa7..244b22f 100644
--- a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Value.scala
+++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Value.scala
@@ -59,7 +59,7 @@ object Value {
 
     }
 
-    case class ByteArrayValue(value: Binary) extends PrimitiveValue[Binary] {
+    case class BinaryValue(value: Binary) extends PrimitiveValue[Binary] {
 
      override def write(schema: Type, recordConsumer: RecordConsumer): Unit =
        recordConsumer.addBinary(value)
@@ -68,11 +68,15 @@
 
  }
 
-  sealed trait GroupValue extends Value
+  sealed trait GroupValue[Self <: GroupValue[Self]] extends Value {
+
+    def put(name: String, value: Value): Self
+
+  }
 
  object GroupValue {
 
-    case class RecordValue(values: Map[String, Value]) extends GroupValue {
+    case class RecordValue(values: Map[String, Value]) extends GroupValue[RecordValue] {
 
      override def write(schema: Type, recordConsumer: RecordConsumer): Unit = {
        val groupSchema = schema.asGroupType()
@@ -91,9 +95,15 @@
        recordConsumer.endGroup()
      }
 
+      override def put(name: String, value: Value): RecordValue =
+        if (values.contains(name))
+          this.copy(values.updated(name, value))
+        else
+          throw new IllegalArgumentException(s"Record doesn't contain field $name")
+
    }
 
-    case class ListValue(values: Chunk[Value]) extends GroupValue {
+    case class ListValue(values: Chunk[Value]) extends GroupValue[ListValue] {
 
      override def write(schema: Type, recordConsumer: RecordConsumer): Unit = {
        recordConsumer.startGroup()
@@ -102,7 +112,7 @@
        val groupSchema = schema.asGroupType()
        val listSchema = groupSchema.getFields.get(0).asGroupType()
        val listFieldName = listSchema.getName
-        val elementName = listSchema.getFields.get(0).getName
+        val elementName = listSchema.getFields.get(0).getName // TODO: validate, must be "element"
        val listIndex = groupSchema.getFieldIndex(listFieldName)
 
        recordConsumer.startField(listFieldName, listIndex)
@@ -117,9 +127,12 @@
        recordConsumer.endGroup()
      }
 
+      override def put(name: String, value: Value): ListValue =
+        this.copy(values = values :+ value)
+
    }
 
-    case class MapValue(values: Map[Value, Value]) extends GroupValue {
+    case class MapValue(values: Map[Value, Value]) extends GroupValue[MapValue] {
 
      override def write(schema: Type, recordConsumer: RecordConsumer): Unit = {
        recordConsumer.startGroup()
@@ -142,6 +155,8 @@
        recordConsumer.endGroup()
      }
 
+      override def put(name: String, value: Value): MapValue = ???
+//        this.copy(values = values.updated(name, value))
    }
 
  }
@@ -150,7 +165,7 @@
    NullValue
 
  def string(v: String) =
-    PrimitiveValue.ByteArrayValue(Binary.fromString(v))
+    PrimitiveValue.BinaryValue(Binary.fromString(v))
 
  def boolean(v: Boolean) =
    PrimitiveValue.BooleanValue(v)
@@ -171,7 +186,7 @@
    PrimitiveValue.DoubleValue(v)
 
  def binary(v: Chunk[Byte]) =
-    PrimitiveValue.ByteArrayValue(Binary.fromConstantByteArray(v.toArray))
+    PrimitiveValue.BinaryValue(Binary.fromConstantByteArray(v.toArray))
 
  def char(v: Char) =
    PrimitiveValue.Int32Value(v.toInt)
@@ -182,7 +197,7 @@
    bb.putLong(v.getMostSignificantBits)
    bb.putLong(v.getLeastSignificantBits)
 
-    PrimitiveValue.ByteArrayValue(Binary.fromConstantByteArray(bb.array()))
+    PrimitiveValue.BinaryValue(Binary.fromConstantByteArray(bb.array()))
  }
 
  def record(r: Map[String, Value]) =
diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueDecoderDeriver.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueDecoderDeriver.scala
index 0076f7a..bee3002 100644
--- a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueDecoderDeriver.scala
+++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueDecoderDeriver.scala
@@ -55,7 +55,7 @@
    ): ValueDecoder[A] = new ValueDecoder[A] {
      override def decode(value: Value): A = (st, value) match {
-        case (StandardType.StringType, PrimitiveValue.ByteArrayValue(v)) =>
+        case (StandardType.StringType, PrimitiveValue.BinaryValue(v)) =>
          new String(v.getBytes, StandardCharsets.UTF_8)
        case (StandardType.BoolType, PrimitiveValue.BooleanValue(v)) =>
          v
@@ -71,11 +71,11 @@
          v
        case (StandardType.DoubleType, PrimitiveValue.DoubleValue(v)) =>
          v
-        case (StandardType.BinaryType, PrimitiveValue.ByteArrayValue(v)) =>
+        case (StandardType.BinaryType, PrimitiveValue.BinaryValue(v)) =>
          Chunk.fromArray(v.getBytes)
        case (StandardType.CharType, PrimitiveValue.Int32Value(v)) =>
          v.toChar
-        case (StandardType.UUIDType, PrimitiveValue.ByteArrayValue(v)) =>
+        case (StandardType.UUIDType, PrimitiveValue.BinaryValue(v)) =>
          val bb = ByteBuffer.wrap(v.getBytes)
          new UUID(bb.getLong, bb.getLong)
        case (other, _) =>
@@ -135,6 +135,7 @@
      fields: => Chunk[Deriver.WrappedF[ValueDecoder, _]],
      summoned: => Option[ValueDecoder[B]]
    ): ValueDecoder[B] = ???
+
  }.cached
 
  def summoned: Deriver[ValueDecoder] =
diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/GroupValueConverter.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/GroupValueConverter.scala
new file mode 100644
index 0000000..4d1497c
--- /dev/null
+++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/GroupValueConverter.scala
@@ -0,0 +1,135 @@
+package me.mnedokushev.zio.apache.parquet.core.hadoop
+
+import me.mnedokushev.zio.apache.parquet.core.Value
+import me.mnedokushev.zio.apache.parquet.core.Value.{ GroupValue, PrimitiveValue }
+import org.apache.parquet.io.api.{ Binary, Converter, GroupConverter, PrimitiveConverter }
+import org.apache.parquet.schema.{ GroupType, LogicalTypeAnnotation, Type }
+import zio.Chunk
+
+import scala.jdk.CollectionConverters._
+
+abstract class GroupValueConverter[V <: GroupValue[V]](schema: GroupType) extends GroupConverter { parent =>
+
+  def get: V =
+    this.groupValue
+
+  def put(name: String, value: Value): Unit =
+    this.groupValue = this.groupValue.put(name, value)
+
+  protected var groupValue: V = _
+
+  private val converters: Chunk[Converter] =
+    Chunk.fromIterable(schema.getFields.asScala.toList.map(fromSchema))
+
+  private def fromSchema(schema0: Type) = {
+    val name             = schema0.getName
+    lazy val groupSchema = schema0.asGroupType() // must stay lazy: asGroupType() throws for primitive fields
+
+    schema0.getLogicalTypeAnnotation match {
+      case _ if schema0.isPrimitive =>
+        primitive(name)
+      case _: LogicalTypeAnnotation.ListLogicalTypeAnnotation =>
+        GroupValueConverter.list(groupSchema, name, parent)
+      case _: LogicalTypeAnnotation.MapLogicalTypeAnnotation =>
+        GroupValueConverter.map(groupSchema, name, parent)
+      case _ =>
+        GroupValueConverter.record(groupSchema, name, parent)
+    }
+  }
+
+  override def getConverter(fieldIndex: Int): Converter =
+    converters(fieldIndex)
+
+  private def primitive(name: String) =
+    new PrimitiveConverter {
+
+      override def addBinary(value: Binary): Unit =
+        parent.groupValue = parent.groupValue.put(name, PrimitiveValue.BinaryValue(value))
+
+      override def addBoolean(value: Boolean): Unit =
+        parent.groupValue = parent.groupValue.put(name, PrimitiveValue.BooleanValue(value))
+
+      override def addDouble(value: Double): Unit =
+        parent.groupValue = parent.groupValue.put(name, PrimitiveValue.DoubleValue(value))
+
+      override def addFloat(value: Float): Unit =
+        parent.groupValue = parent.groupValue.put(name, PrimitiveValue.FloatValue(value))
+
+      override def addInt(value: Int): Unit =
+        parent.groupValue = parent.groupValue.put(name, PrimitiveValue.Int32Value(value))
+
+      override def addLong(value: Long): Unit =
+        parent.groupValue = parent.groupValue.put(name, PrimitiveValue.Int64Value(value))
+
+    }
+
+}
+
+object GroupValueConverter {
+
+  private abstract class RecordValueConverter(schema: GroupType)
+      extends GroupValueConverter[GroupValue.RecordValue](schema) {
+
+    override def start(): Unit =
+      this.groupValue = Value.record(Map.empty)
+
+  }
+
+// private abstract class NestedValueConverter[V <: GroupValue[V]](
+//   schema: GroupType,
+//   name: String,
+//   parent: GroupValueConverter[GroupValue.RecordValue]
+// ) extends GroupValueConverter[V](schema) {
+//
+//   override def end(): Unit =
+//     parent.put(name, this.groupValue)
+//
+// }
+
+  def root(schema: GroupType): GroupValueConverter[GroupValue.RecordValue] =
+    new RecordValueConverter(schema) {
+
+      override def end(): Unit = ()
+    }
+
+  def record[V <: GroupValue[V]](
+    schema: GroupType,
+    name: String,
+    parent: GroupValueConverter[V]
+  ): GroupValueConverter[GroupValue.RecordValue] =
+    new RecordValueConverter(schema) {
+
+      override def end(): Unit =
+        parent.put(name, this.groupValue)
+
+    }
+
+  def list[V <: GroupValue[V]](
+    schema: GroupType,
+    name: String,
+    parent: GroupValueConverter[V]
+  ): GroupValueConverter[GroupValue.ListValue] =
+    new GroupValueConverter[GroupValue.ListValue](schema) {
+
+      override def start(): Unit =
+        this.groupValue = Value.list(Chunk.empty)
+
+      override def end(): Unit =
+        parent.put(name, this.groupValue)
+    }
+
+  def map[V <: GroupValue[V]](
+    schema: GroupType,
+    name: String,
+    parent: GroupValueConverter[V]
+  ): GroupValueConverter[GroupValue.MapValue] =
+    new GroupValueConverter[GroupValue.MapValue](schema) {
+
+      override def start(): Unit =
+        this.groupValue = Value.map(Map.empty)
+
+      override def end(): Unit =
+        parent.put(name, this.groupValue)
+    }
+
+}
diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala
new file mode 100644
index 0000000..25f72f0
--- /dev/null
+++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala
@@ -0,0 +1,56 @@
+package me.mnedokushev.zio.apache.parquet.core.hadoop
+
+import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue
+import me.mnedokushev.zio.apache.parquet.core.codec.ValueDecoder
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.{ ParquetReader => HadoopParquetReader }
+import org.apache.parquet.hadoop.api.{ ReadSupport => HadoopReadSupport }
+import org.apache.parquet.io.InputFile
+import zio._
+import zio.stream._
+
+trait ParquetReader[A <: Product] {
+
+  def read(path: Path): ZStream[Scope, Throwable, A]
+
+}
+
+final class ParquetReaderLive[A <: Product](conf: Configuration)(implicit decoder: ValueDecoder[A])
+    extends ParquetReader[A] {
+
+  override def read(path: Path): ZStream[Scope, Throwable, A] =
+    for {
+      inputFile <- ZStream.fromZIO(ZIO.attemptBlockingIO(path.toInputFile(conf)))
+      reader    <- ZStream.fromZIO(
+                     ZIO.fromAutoCloseable(
+                       ZIO.attemptBlockingIO(
+                         new ParquetReader.Builder(inputFile).withConf(conf).build()
+                       )
+                     )
+                   )
+      value     <- ZStream.repeatZIOOption(
+                     ZIO
+                       .attemptBlockingIO(reader.read())
+                       .asSomeError
+                       .filterOrFail(_ != null)(None)
+                       .flatMap(decoder.decodeZIO(_).asSomeError)
+                   )
+    } yield value
+
+}
+
+object ParquetReader {
+
+  final class Builder(file: InputFile) extends HadoopParquetReader.Builder[RecordValue](file) {
+
+    override def getReadSupport: HadoopReadSupport[RecordValue] =
+      new ReadSupport
+
+  }
+
+  def configured[A <: Product: ValueDecoder: Tag](
+    hadoopConf: Configuration = new Configuration()
+  ): ULayer[ParquetReader[A]] =
+    ZLayer.succeed(new ParquetReaderLive[A](hadoopConf))
+
+}
diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetWriter.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetWriter.scala
index 5c84219..6c7a24f 100644
--- a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetWriter.scala
+++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetWriter.scala
@@ -42,7 +42,7 @@ object ParquetWriter {
 
  }
 
-  def configured[A <: Product](
+  def configured[A <: Product: ValueEncoder](
    path: Path,
    writeMode: ParquetFileWriter.Mode = ParquetFileWriter.Mode.CREATE,
    compressionCodecName: CompressionCodecName = HadoopParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME,
@@ -56,11 +56,10 @@
  )(implicit
    schema: Schema[A],
    schemaEncoder: SchemaEncoder[A],
-    encoder: ValueEncoder[A],
    tag: Tag[A]
  ): TaskLayer[ParquetWriter[A]] = {
 
-    def castSchema(schema: Type) =
+    def castToMessageSchema(schema: Type) =
      ZIO.attempt {
        val groupSchema = schema.asGroupType()
        val name = groupSchema.getName
@@ -72,7 +71,7 @@
    ZLayer.scoped(
      for {
        schema <- schemaEncoder.encodeZIO(schema, tag.tag.shortName, optional = false)
-        messageSchema <- castSchema(schema)
+        messageSchema <- castToMessageSchema(schema)
        hadoopFile <- ZIO.attemptBlockingIO(HadoopOutputFile.fromPath(path.toHadoop, hadoopConf))
        builder = new Builder(hadoopFile, messageSchema)
                    .withWriteMode(writeMode)
@@ -84,7 +83,7 @@
                    .withRowGroupSize(rowGroupSize)
                    .withValidation(validationEnabled)
                    .withConf(hadoopConf)
-        underlying <- ZIO.fromAutoCloseable(ZIO.attemptBlocking(builder.build()))
+        underlying <- ZIO.fromAutoCloseable(ZIO.attemptBlockingIO(builder.build()))
        writer = new ParquetWriterLive[A](underlying)
      } yield writer
    )
diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/Path.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/Path.scala
index 4814758..22ee182 100644
--- a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/Path.scala
+++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/Path.scala
@@ -1,6 +1,8 @@
 package me.mnedokushev.zio.apache.parquet.core.hadoop
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{ Path => HadoopPath }
+import org.apache.parquet.hadoop.util.HadoopInputFile
 
 import java.net.URI
 import java.nio.file.{ Path => JPath, Paths }
@@ -19,6 +21,9 @@ case class Path(underlying: HadoopPath) {
  def toHadoop: HadoopPath =
    underlying
 
+  def toInputFile(conf: Configuration): HadoopInputFile =
+    HadoopInputFile.fromPath(underlying, conf)
+
 }
 
 object Path {
diff --git a/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ReadSupport.scala b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ReadSupport.scala
new file mode 100644
index 0000000..b51a499
--- /dev/null
+++ b/modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ReadSupport.scala
@@ -0,0 +1,34 @@
+package me.mnedokushev.zio.apache.parquet.core.hadoop
+
+import me.mnedokushev.zio.apache.parquet.core.Value.GroupValue.RecordValue
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.hadoop.api.{ InitContext, ReadSupport => HadoopReadSupport }
+import org.apache.parquet.io.api.{ GroupConverter, RecordMaterializer }
+import org.apache.parquet.schema.MessageType
+
+import java.util
+
+class ReadSupport extends HadoopReadSupport[RecordValue] {
+
+  override def prepareForRead(
+    configuration: Configuration,
+    keyValueMetaData: util.Map[String, String],
+    fileSchema: MessageType,
+    readContext: HadoopReadSupport.ReadContext
+  ): RecordMaterializer[RecordValue] = new RecordMaterializer[RecordValue] {
+
+    private val converter =
+      GroupValueConverter.root(fileSchema)
+
+    override def getCurrentRecord: RecordValue =
+      converter.get
+
+    override def getRootConverter: GroupConverter =
+      converter
+
+  }
+
+  override def init(context: InitContext): HadoopReadSupport.ReadContext =
+    new HadoopReadSupport.ReadContext(context.getFileSchema)
+
+}
diff --git a/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetWriterSpec.scala b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetIOSpec.scala
similarity index 56%
rename from modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetWriterSpec.scala
rename to modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetIOSpec.scala
index f0ca4c1..d3f8cb0 100644
--- a/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetWriterSpec.scala
+++ b/modules/core/src/test/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetIOSpec.scala
@@ -8,7 +8,7 @@ import zio.test.TestAspect._
 
 import java.nio.file.Files
 
-object ParquetWriterSpec extends ZIOSpecDefault {
+object ParquetIOSpec extends ZIOSpecDefault {
 
  val tmpDir = Files.createTempDirectory("zio-apache-parquet")
  val tmpFile = "parquet-writer-spec.parquet"
@@ -22,17 +22,25 @@ implicit val schemaEncoder: SchemaEncoder[Record] =
      Derive.derive[SchemaEncoder, Record](SchemaEncoderDeriver.summoned)
    implicit val valueEncoder: ValueEncoder[Record] =
      Derive.derive[ValueEncoder, Record](ValueEncoderDeriver.summoned)
+    implicit val valueDecoder: ValueDecoder[Record] =
+      Derive.derive[ValueDecoder, Record](ValueDecoderDeriver.summoned)
  }
 
  override def spec: Spec[TestEnvironment with Scope, Any] =
-    suite("ParquetWriterSpec")(
-      test("write") {
-        ZIO.serviceWithZIO[ParquetWriter[Record]] { writer =>
-          for {
-            _ <- writer.write(Chunk(Record(1, "foo"), Record(2, "bar")))
-          } yield assertTrue(true)
-        }
-      }.provide(ParquetWriter.configured[Record](tmpPath)) @@ after(cleanTmpFile(tmpPath))
+    suite("ParquetIOSpec")(
+      test("write and read") {
+        val payload = Chunk(Record(1, "foo"), Record(2, "bar"))
+
+        for {
+          writer <- ZIO.service[ParquetWriter[Record]]
+          reader <- ZIO.service[ParquetReader[Record]]
+          _      <- writer.write(payload)
+          result <- ZIO.scoped(reader.read(tmpPath).runCollect)
+        } yield assertTrue(result == payload)
+      }.provide(
+        ParquetWriter.configured[Record](tmpPath),
+        ParquetReader.configured[Record]()
+      ) @@ after(cleanTmpFile(tmpPath))
    )
 
  def cleanTmpFile(path: Path) =