Skip to content

Commit f1b235e

Browse files
committed
Support exhaustive list of primitive types for ValueEncoder/ValueDecoder (WIP)
1 parent dea5531 commit f1b235e

File tree

6 files changed

+523
-80
lines changed

6 files changed

+523
-80
lines changed

modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Schemas.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -106,36 +106,37 @@ object Schemas {
106106
import PrimitiveTypeName._
107107
import LogicalTypeAnnotation._
108108

109+
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
109110
def enum0: PrimitiveDef = PrimitiveDef(BINARY, Some(enumType()))
110111
val string: PrimitiveDef = PrimitiveDef(BINARY, Some(stringType()))
111-
val boolean: PrimitiveDef = PrimitiveDef(INT32, Some(intType(8, false)))
112+
val boolean: PrimitiveDef = PrimitiveDef(BOOLEAN)
112113
val byte: PrimitiveDef = PrimitiveDef(INT32, Some(intType(8, false)))
113114
val short: PrimitiveDef = PrimitiveDef(INT32, Some(intType(16, true)))
114115
val int: PrimitiveDef = PrimitiveDef(INT32, Some(intType(32, true)))
115116
val long: PrimitiveDef = PrimitiveDef(INT64, Some(intType(64, true)))
116117
val float: PrimitiveDef = PrimitiveDef(FLOAT)
117118
val double: PrimitiveDef = PrimitiveDef(DOUBLE)
118119
val binary: PrimitiveDef = PrimitiveDef(BINARY)
119-
val char: PrimitiveDef = PrimitiveDef(INT32, Some(intType(8, false)))
120+
val char: PrimitiveDef = byte
120121
val uuid: PrimitiveDef = PrimitiveDef(FIXED_LEN_BYTE_ARRAY, Some(uuidType())).length(16)
121-
val bigDecimal: PrimitiveDef = PrimitiveDef(INT64, Some(decimalType(11, 2)))
122+
val bigDecimal: PrimitiveDef = PrimitiveDef(INT64, Some(decimalType(DECIMAL_PRECISION, DECIMAL_SCALE)))
122123
val bigInteger: PrimitiveDef = PrimitiveDef(BINARY)
123-
val dayOfWeek: PrimitiveDef = PrimitiveDef(INT32, Some(intType(8, false)))
124-
val monthType: PrimitiveDef = PrimitiveDef(INT32, Some(intType(8, false)))
125-
val monthDay: PrimitiveDef = PrimitiveDef(INT32, Some(intType(8, false)))
124+
val dayOfWeek: PrimitiveDef = byte
125+
val monthType: PrimitiveDef = byte
126+
val monthDay: PrimitiveDef = PrimitiveDef(FIXED_LEN_BYTE_ARRAY).length(2)
126127
val period: PrimitiveDef = PrimitiveDef(FIXED_LEN_BYTE_ARRAY).length(12)
127128
val year: PrimitiveDef = PrimitiveDef(INT32, Some(intType(16, false)))
128-
val yearMonth: PrimitiveDef = PrimitiveDef(INT32, Some(intType(32, false)))
129-
val zoneId: PrimitiveDef = PrimitiveDef(BINARY, Some(stringType()))
130-
val zoneOffset: PrimitiveDef = PrimitiveDef(BINARY, Some(stringType()))
129+
val yearMonth: PrimitiveDef = PrimitiveDef(FIXED_LEN_BYTE_ARRAY).length(4)
130+
val zoneId: PrimitiveDef = string
131+
val zoneOffset: PrimitiveDef = string
131132
val duration: PrimitiveDef = PrimitiveDef(INT64, Some(intType(64, false)))
132133
val instant: PrimitiveDef = PrimitiveDef(INT64, Some(intType(64, false)))
133134
val localDate: PrimitiveDef = PrimitiveDef(INT32, Some(dateType()))
134135
val localTime: PrimitiveDef = PrimitiveDef(INT32, Some(timeType(true, TimeUnit.MILLIS)))
135136
val localDateTime: PrimitiveDef = PrimitiveDef(INT64, Some(timestampType(true, TimeUnit.MILLIS)))
136137
val offsetTime: PrimitiveDef = PrimitiveDef(INT32, Some(timeType(false, TimeUnit.MILLIS)))
137138
val offsetDateTime: PrimitiveDef = PrimitiveDef(INT64, Some(timestampType(false, TimeUnit.MILLIS)))
138-
val zonedDateTime: PrimitiveDef = PrimitiveDef(INT64, Some(timestampType(false, TimeUnit.MILLIS)))
139+
val zonedDateTime: PrimitiveDef = offsetDateTime
139140

140141
def record(fields: Chunk[Type]): RecordDef = RecordDef(fields)
141142
def list(element: Type): ListDef = ListDef(element)

modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/Value.scala

Lines changed: 120 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,26 @@ import org.apache.parquet.io.api.{ Binary, RecordConsumer }
44
import org.apache.parquet.schema.Type
55
import zio.Chunk
66

7-
import java.nio.ByteBuffer
7+
import java.math.{ BigDecimal, BigInteger }
8+
import java.nio.{ ByteBuffer, ByteOrder }
9+
import java.time.{
10+
DayOfWeek,
11+
Duration,
12+
Instant,
13+
LocalDate,
14+
LocalDateTime,
15+
LocalTime,
16+
Month,
17+
MonthDay,
18+
OffsetDateTime,
19+
OffsetTime,
20+
Period,
21+
Year,
22+
YearMonth,
23+
ZoneId,
24+
ZoneOffset,
25+
ZonedDateTime
26+
}
827
import java.util.UUID
928

1029
sealed trait Value {
@@ -179,8 +198,11 @@ object Value {
179198
def boolean(v: Boolean) =
180199
PrimitiveValue.BooleanValue(v)
181200

201+
def byte(v: Byte) =
202+
int(v.toInt)
203+
182204
def short(v: Short) =
183-
PrimitiveValue.Int32Value(v.toInt)
205+
int(v.toInt)
184206

185207
def int(v: Int) =
186208
PrimitiveValue.Int32Value(v)
@@ -198,7 +220,7 @@ object Value {
198220
PrimitiveValue.BinaryValue(Binary.fromConstantByteArray(v.toArray))
199221

200222
def char(v: Char) =
201-
PrimitiveValue.Int32Value(v.toInt)
223+
int(v.toInt)
202224

203225
def uuid(v: UUID) = {
204226
val bb = ByteBuffer.wrap(Array.ofDim(16))
@@ -209,6 +231,101 @@ object Value {
209231
PrimitiveValue.BinaryValue(Binary.fromConstantByteArray(bb.array()))
210232
}
211233

234+
def bigDecimal(v: BigDecimal) =
235+
long(v.unscaledValue.longValue)
236+
237+
def bigInteger(v: BigInteger) =
238+
PrimitiveValue.BinaryValue(Binary.fromConstantByteArray(v.toByteArray))
239+
240+
def dayOfWeek(v: DayOfWeek) =
241+
byte(v.getValue.toByte)
242+
243+
def month(v: Month) =
244+
byte(v.getValue.toByte)
245+
246+
def monthDay(v: MonthDay) = {
247+
val bb = ByteBuffer.allocate(2).order(ByteOrder.LITTLE_ENDIAN)
248+
249+
bb.put(v.getMonthValue.toByte)
250+
bb.put(v.getDayOfMonth.toByte)
251+
252+
PrimitiveValue.BinaryValue(Binary.fromReusedByteArray(bb.array()))
253+
}
254+
255+
def period(v: Period) = {
256+
val bb = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
257+
258+
bb.putInt(v.getYears)
259+
bb.putInt(v.getMonths)
260+
bb.putInt(v.getDays)
261+
262+
PrimitiveValue.BinaryValue(Binary.fromReusedByteArray(bb.array()))
263+
}
264+
265+
def year(v: Year) =
266+
short(v.getValue.toShort)
267+
268+
def yearMonth(v: YearMonth) = {
269+
val bb = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN)
270+
271+
bb.putShort(v.getYear.toShort)
272+
bb.putShort(v.getMonthValue.toShort)
273+
274+
PrimitiveValue.BinaryValue(Binary.fromReusedByteArray(bb.array()))
275+
}
276+
277+
def zoneId(v: ZoneId) =
278+
string(v.getId)
279+
280+
def zoneOffset(v: ZoneOffset) =
281+
string(v.getId)
282+
283+
def duration(v: Duration) =
284+
long(v.toMillis)
285+
286+
def instant(v: Instant) =
287+
long(v.toEpochMilli)
288+
289+
def localDate(v: LocalDate) =
290+
int(v.toEpochDay.toInt)
291+
292+
def localTime(v: LocalTime) =
293+
int((v.toNanoOfDay / MICROS_FACTOR).toInt)
294+
295+
def localDateTime(v: LocalDateTime) = {
296+
val dateMillis = v.toLocalDate.toEpochDay * MILLIS_PER_DAY
297+
val timeMillis = v.toLocalTime.toNanoOfDay / MICROS_FACTOR
298+
val epochMillis = dateMillis + timeMillis
299+
300+
long(epochMillis)
301+
}
302+
303+
def offsetTime(v: OffsetTime) = {
304+
val timeMillis = v.toLocalTime.toNanoOfDay / MICROS_FACTOR
305+
val offsetMillis = v.getOffset.getTotalSeconds * MILLIS_FACTOR
306+
val dayMillis = timeMillis - offsetMillis
307+
308+
int(dayMillis.toInt)
309+
}
310+
311+
def offsetDateTime(v: OffsetDateTime) = {
312+
val dateMillis = v.toLocalDate.toEpochDay * MILLIS_PER_DAY
313+
val timeMillis = v.toLocalTime.toNanoOfDay / MICROS_FACTOR
314+
val offsetMillis = v.getOffset.getTotalSeconds * MILLIS_FACTOR
315+
val epochMillis = dateMillis + timeMillis - offsetMillis
316+
317+
long(epochMillis)
318+
}
319+
320+
def zonedDateTime(v: ZonedDateTime) = {
321+
val dateMillis = v.toLocalDate.toEpochDay * MILLIS_PER_DAY
322+
val timeMillis = v.toLocalTime.toNanoOfDay / MICROS_FACTOR
323+
val offsetMillis = v.getOffset.getTotalSeconds * MILLIS_FACTOR
324+
val epochMillis = dateMillis + timeMillis - offsetMillis
325+
326+
long(epochMillis)
327+
}
328+
212329
def record(r: Map[String, Value]) =
213330
GroupValue.RecordValue(r)
214331

modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/codec/ValueDecoderDeriver.scala

Lines changed: 87 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,29 @@
11
package me.mnedokushev.zio.apache.parquet.core.codec
22

3-
import me.mnedokushev.zio.apache.parquet.core.Value
3+
import me.mnedokushev.zio.apache.parquet.core.{ DECIMAL_SCALE, MICROS_FACTOR, MILLIS_PER_DAY, Value }
44
import me.mnedokushev.zio.apache.parquet.core.Value.{ GroupValue, PrimitiveValue }
55
import zio._
66
import zio.schema._
77

8-
import java.nio.ByteBuffer
9-
import java.nio.charset.StandardCharsets
8+
import java.math.{ BigDecimal, BigInteger }
9+
import java.nio.{ ByteBuffer, ByteOrder }
10+
import java.time.{
11+
DayOfWeek,
12+
Instant,
13+
LocalDate,
14+
LocalDateTime,
15+
LocalTime,
16+
Month,
17+
MonthDay,
18+
OffsetDateTime,
19+
OffsetTime,
20+
Period,
21+
Year,
22+
YearMonth,
23+
ZoneId,
24+
ZoneOffset,
25+
ZonedDateTime
26+
}
1027
import java.util.UUID
1128

1229
object ValueDecoderDeriver {
@@ -63,32 +80,86 @@ object ValueDecoderDeriver {
6380
st: StandardType[A],
6481
summoned: => Option[ValueDecoder[A]]
6582
): ValueDecoder[A] = new ValueDecoder[A] {
83+
84+
private def localTime(v: Int) =
85+
LocalTime.ofNanoOfDay(v * MICROS_FACTOR)
86+
87+
private def localDateTime(v: Long) = {
88+
val epochDay = v / MILLIS_PER_DAY
89+
val nanoOfDay = (v - (epochDay * MILLIS_PER_DAY)) * MICROS_FACTOR
90+
91+
LocalDateTime.of(LocalDate.ofEpochDay(epochDay), LocalTime.ofNanoOfDay(nanoOfDay))
92+
}
93+
6694
override def decode(value: Value): A =
6795
(st, value) match {
68-
case (StandardType.StringType, PrimitiveValue.BinaryValue(v)) =>
69-
new String(v.getBytes, StandardCharsets.UTF_8)
70-
case (StandardType.BoolType, PrimitiveValue.BooleanValue(v)) =>
96+
case (StandardType.StringType, PrimitiveValue.BinaryValue(v)) =>
97+
v.toStringUsingUTF8
98+
case (StandardType.BoolType, PrimitiveValue.BooleanValue(v)) =>
7199
v
72-
case (StandardType.ByteType, PrimitiveValue.Int32Value(v)) =>
100+
case (StandardType.ByteType, PrimitiveValue.Int32Value(v)) =>
73101
v.toByte
74-
case (StandardType.ShortType, PrimitiveValue.Int32Value(v)) =>
102+
case (StandardType.ShortType, PrimitiveValue.Int32Value(v)) =>
75103
v.toShort
76-
case (StandardType.IntType, PrimitiveValue.Int32Value(v)) =>
104+
case (StandardType.IntType, PrimitiveValue.Int32Value(v)) =>
77105
v
78-
case (StandardType.LongType, PrimitiveValue.Int64Value(v)) =>
106+
case (StandardType.LongType, PrimitiveValue.Int64Value(v)) =>
79107
v
80-
case (StandardType.FloatType, PrimitiveValue.FloatValue(v)) =>
108+
case (StandardType.FloatType, PrimitiveValue.FloatValue(v)) =>
81109
v
82-
case (StandardType.DoubleType, PrimitiveValue.DoubleValue(v)) =>
110+
case (StandardType.DoubleType, PrimitiveValue.DoubleValue(v)) =>
83111
v
84-
case (StandardType.BinaryType, PrimitiveValue.BinaryValue(v)) =>
112+
case (StandardType.BinaryType, PrimitiveValue.BinaryValue(v)) =>
85113
Chunk.fromArray(v.getBytes)
86-
case (StandardType.CharType, PrimitiveValue.Int32Value(v)) =>
114+
case (StandardType.CharType, PrimitiveValue.Int32Value(v)) =>
87115
v.toChar
88-
case (StandardType.UUIDType, PrimitiveValue.BinaryValue(v)) =>
116+
case (StandardType.UUIDType, PrimitiveValue.BinaryValue(v)) =>
89117
val bb = ByteBuffer.wrap(v.getBytes)
118+
90119
new UUID(bb.getLong, bb.getLong)
91-
case (other, _) =>
120+
case (StandardType.BigDecimalType, PrimitiveValue.Int64Value(v)) =>
121+
BigDecimal.valueOf(v, DECIMAL_SCALE)
122+
case (StandardType.BigIntegerType, PrimitiveValue.BinaryValue(v)) =>
123+
new BigInteger(v.getBytes)
124+
case (StandardType.DayOfWeekType, PrimitiveValue.Int32Value(v)) =>
125+
DayOfWeek.of(v)
126+
case (StandardType.MonthType, PrimitiveValue.Int32Value(v)) =>
127+
Month.of(v)
128+
case (StandardType.MonthDayType, PrimitiveValue.BinaryValue(v)) =>
129+
val bb = ByteBuffer.wrap(v.getBytes).order(ByteOrder.LITTLE_ENDIAN)
130+
131+
MonthDay.of(bb.get.toInt, bb.get.toInt)
132+
case (StandardType.PeriodType, PrimitiveValue.BinaryValue(v)) =>
133+
val bb = ByteBuffer.wrap(v.getBytes).order(ByteOrder.LITTLE_ENDIAN)
134+
135+
Period.of(bb.getInt, bb.getInt, bb.getInt)
136+
case (StandardType.YearType, PrimitiveValue.Int32Value(v)) =>
137+
Year.of(v)
138+
case (StandardType.YearMonthType, PrimitiveValue.BinaryValue(v)) =>
139+
val bb = ByteBuffer.wrap(v.getBytes).order(ByteOrder.LITTLE_ENDIAN)
140+
141+
YearMonth.of(bb.getShort.toInt, bb.getShort.toInt)
142+
case (StandardType.ZoneIdType, PrimitiveValue.BinaryValue(v)) =>
143+
ZoneId.of(v.toStringUsingUTF8)
144+
case (StandardType.ZoneOffsetType, PrimitiveValue.BinaryValue(v)) =>
145+
ZoneOffset.of(v.toStringUsingUTF8)
146+
case (StandardType.DurationType, PrimitiveValue.Int64Value(v)) =>
147+
Duration.fromMillis(v)
148+
case (StandardType.InstantType, PrimitiveValue.Int64Value(v)) =>
149+
Instant.ofEpochMilli(v)
150+
case (StandardType.LocalDateType, PrimitiveValue.Int32Value(v)) =>
151+
LocalDate.ofEpochDay(v.toLong)
152+
case (StandardType.LocalTimeType, PrimitiveValue.Int32Value(v)) =>
153+
localTime(v)
154+
case (StandardType.LocalDateTimeType, PrimitiveValue.Int64Value(v)) =>
155+
localDateTime(v)
156+
case (StandardType.OffsetTimeType, PrimitiveValue.Int32Value(v)) =>
157+
OffsetTime.of(localTime(v), ZoneOffset.UTC)
158+
case (StandardType.OffsetDateTimeType, PrimitiveValue.Int64Value(v)) =>
159+
OffsetDateTime.of(localDateTime(v), ZoneOffset.UTC)
160+
case (StandardType.ZonedDateTimeType, PrimitiveValue.Int64Value(v)) =>
161+
ZonedDateTime.of(localDateTime(v), ZoneId.of("Z"))
162+
case (other, _) =>
92163
throw DecoderError(s"Unsupported ZIO Schema StandartType $other")
93164
}
94165
}

0 commit comments

Comments
 (0)