
Commit 56b6029

Initial work on type-safe filter predicates
1 parent 4cc3a0d commit 56b6029

9 files changed: +311 −14 lines changed
Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
package me.mnedokushev.zio.apache.parquet.core.filter

import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.filter2.predicate.FilterApi
import me.mnedokushev.zio.apache.parquet.core.Value
import zio.prelude._

sealed trait Expr[-A]

object Expr {

  final case class Column[A](path: String)(implicit val typeTag: TypeTag[A]) extends Expr[A] { self =>

    def /[B: TypeTag](column: Column[B]): Column[B] =
      Column(s"$path.${column.path}")

    def >(value: A)(implicit ev: OperatorSupport.LessGreater[A]): Predicate[A] =
      Predicate.Binary(self, value, Operator.Binary.GreaterThen())

    def ===(value: A)(implicit ev: OperatorSupport.EqNotEq[A]): Predicate[A] =
      Predicate.Binary(self, value, Operator.Binary.Eq())

  }

  sealed trait Predicate[A] extends Expr[A] { self =>

    def not: Predicate[A] =
      Predicate.Unary(self, Operator.Unary.Not[A]())

    def and[B](other: Predicate[B]): Predicate[A] =
      Predicate.Logical(self, other, Operator.Logical.And[A, B]())

    def or[B](other: Predicate[B]): Predicate[A] =
      Predicate.Logical(self, other, Operator.Logical.Or[A, B]())

  }

  object Predicate {

    final case class Binary[A](column: Column[A], value: A, op: Operator.Binary[A]) extends Predicate[A]

    final case class Unary[A](predicate: Predicate[A], op: Operator.Unary[A]) extends Predicate[A]

    final case class Logical[A, B](left: Predicate[A], right: Predicate[B], op: Operator.Logical[A, B])
        extends Predicate[A]

  }

  def compile[A](predicate: Predicate[A]): Either[String, FilterPredicate] =
    predicate match {
      case Predicate.Unary(predicate0, op) =>
        op match {
          case Operator.Unary.Not() =>
            compile(predicate0).map(FilterApi.not)
        }
      case Predicate.Logical(left, right, op) =>
        (compile(left) <*> compile(right)).map { case (left0, right0) =>
          op match {
            case Operator.Logical.And() =>
              FilterApi.and(left0, right0)
            case Operator.Logical.Or() =>
              FilterApi.or(left0, right0)
          }
        }
      case Predicate.Binary(column, value, op) =>
        (column.typeTag, value) match {
          case (TypeTag.String, v: String) =>
            val column0 = FilterApi.binaryColumn(column.path)
            val value0  = Value.string(v).value

            op match {
              case Operator.Binary.Eq() =>
                Right(FilterApi.eq(column0, value0))
              case Operator.Binary.NotEq() =>
                Right(FilterApi.notEq(column0, value0))
              case _ => ???
            }
          case (TypeTag.Int, v: Int) =>
            val column0 = FilterApi.intColumn(column.path)
            val value0  = Int.box(Value.int(v).value)

            op match {
              case Operator.Binary.GreaterThen() =>
                Right(FilterApi.gt(column0, value0))
              case _ => ???
            }
          case _ => ???
        }
    }

}
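
A minimal usage sketch of the new predicate DSL, assuming this file and its companions are on the classpath. Column names and values below are illustrative only; note that compile still stubs unhandled type/operator combinations with ???.

import org.apache.parquet.filter2.predicate.FilterPredicate
import me.mnedokushev.zio.apache.parquet.core.filter._

object ExprUsageSketch {
  // Columns are normally produced via Filter.columns, but can be built directly
  // whenever a TypeTag instance for the column type is in scope.
  val name: Expr.Column[String] = Expr.Column[String]("name")
  val age: Expr.Column[Int]     = Expr.Column[Int]("age")

  // === requires OperatorSupport.EqNotEq[String]; > requires OperatorSupport.LessGreater[Int].
  val predicate = (name === "Alice") and (age > 18)

  // Translates the typed predicate into Parquet's FilterPredicate.
  val compiled: Either[String, FilterPredicate] = Expr.compile(predicate)
}
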
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
package me.mnedokushev.zio.apache.parquet.core.filter

import zio.schema.AccessorBuilder
import zio.schema.Schema

class ExprAccessorBuilder extends AccessorBuilder {

  override type Lens[F, S, A] = Expr.Column[A]

  override type Prism[F, S, A] = Unit

  override type Traversal[S, A] = Unit

  override def makeLens[F, S, A](product: Schema.Record[S], term: Schema.Field[S, A]): Expr.Column[A] = {
    implicit val typeTag: TypeTag[A] = TypeTag.deriveTypeTag(term.schema).get

    Expr.Column(term.name.toString)
  }

  override def makePrism[F, S, A](sum: Schema.Enum[S], term: Schema.Case[S, A]): Prism[F, S, A] =
    ()

  override def makeTraversal[S, A](collection: Schema.Collection[S, A], element: Schema[A]): Traversal[S, A] =
    ()

}
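
One behavioral note worth illustrating: makeLens derives the field's TypeTag with .get, so building accessors for a record whose field type has no TypeTag yet fails at runtime rather than at compile time. A sketch with a hypothetical record (not part of this commit):

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.filter._

object AccessorBuilderSketch {
  // java.util.UUID has a zio-schema Schema but no TypeTag in this commit.
  case class Event(id: java.util.UUID, name: String)
  implicit val schema = DeriveSchema.gen[Event]

  // Would throw NoSuchElementException from TypeTag.deriveTypeTag(...).get
  // when the lens for the UUID field is built:
  // val accessors = schema.makeAccessors(new ExprAccessorBuilder)
}
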
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
package me.mnedokushev.zio.apache.parquet.core.filter

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core._

trait Filter[Columns0] {

  type Columns

  val columns: Columns0

}

object Filter {

  def columns[A](implicit schema: Schema.Record[A]): schema.Accessors[Lens, Prism, Traversal] =
    new Filter[schema.Accessors[Lens, Prism, Traversal]] {

      val accessorBuilder = new ExprAccessorBuilder

      override type Columns = schema.Accessors[accessorBuilder.Lens, accessorBuilder.Prism, accessorBuilder.Traversal]

      override val columns: Columns = schema.makeAccessors(accessorBuilder)

    }.columns

}
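
A sketch of how these accessors are obtained in practice, mirroring the usage in ExprSpec below; the User record here is hypothetical:

import zio.schema._
import me.mnedokushev.zio.apache.parquet.core.filter._

object FilterColumnsSketch {
  case class User(name: String, age: Int)

  implicit val schema = DeriveSchema.gen[User]

  // One Expr.Column per field, in declaration order, typed by the field's type:
  // name: Expr.Column[String], age: Expr.Column[Int].
  val (name, age) = Filter.columns[User]
}
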
Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
package me.mnedokushev.zio.apache.parquet.core.filter

sealed trait Operator

object Operator {

  sealed trait Binary[A] extends Operator

  object Binary {
    final case class Eq[A: OperatorSupport.EqNotEq]() extends Binary[A]
    final case class NotEq[A: OperatorSupport.EqNotEq]() extends Binary[A]
    final case class LessThen[A: OperatorSupport.LessGreater]() extends Binary[A]
    final case class LessEq[A: OperatorSupport.LessGreater]() extends Binary[A]
    final case class GreaterThen[A: OperatorSupport.LessGreater]() extends Binary[A]
    final case class GreaterEq[A: OperatorSupport.LessGreater]() extends Binary[A]
  }

  sealed trait Unary[A] extends Operator

  object Unary {
    final case class Not[A]() extends Unary[A]
  }

  sealed trait Logical[A, B] extends Operator

  object Logical {
    final case class And[A, B]() extends Logical[A, B]
    final case class Or[A, B]() extends Logical[A, B]
  }

}
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
package me.mnedokushev.zio.apache.parquet.core.filter

sealed trait OperatorSupport[A] {
  def typeTag: TypeTag[A]
}

object OperatorSupport {

  abstract class LessGreater[A: TypeTag] extends OperatorSupport[A] {
    override def typeTag: TypeTag[A] = implicitly[TypeTag[A]]
  }

  object LessGreater {
    implicit case object Byte  extends LessGreater[Byte]
    implicit case object Short extends LessGreater[Short]
    implicit case object Int   extends LessGreater[Int]
  }

  abstract class EqNotEq[A: TypeTag] extends OperatorSupport[A] {
    override def typeTag: TypeTag[A] = implicitly[TypeTag[A]]
  }

  object EqNotEq {
    implicit case object String  extends EqNotEq[String]
    implicit case object Boolean extends EqNotEq[Boolean]
    implicit case object Byte    extends EqNotEq[Byte]
    implicit case object Short   extends EqNotEq[Short]
  }

}
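
These implicit instances are what gate which comparison operators each column type supports; a missing instance surfaces as a compile error at the call site. A small sketch (column names are illustrative):

import me.mnedokushev.zio.apache.parquet.core.filter._

object OperatorSupportSketch {
  val name = Expr.Column[String]("name")
  val age  = Expr.Column[Int]("age")

  val ok = age > 1          // compiles: OperatorSupport.LessGreater[Int] exists
  // val bad1 = name > "a"  // does not compile: no LessGreater[String] instance
  // val bad2 = age === 1   // does not compile yet: no EqNotEq[Int] instance in this commit
}
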
Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
package me.mnedokushev.zio.apache.parquet.core.filter

import zio.schema.Schema
import zio.schema.StandardType

sealed trait TypeTag[+A]

object TypeTag {

  implicit case object String  extends TypeTag[String]
  implicit case object Boolean extends TypeTag[Boolean]
  implicit case object Byte    extends TypeTag[Byte]
  implicit case object Short   extends TypeTag[Short]
  implicit case object Int     extends TypeTag[Int]
  implicit case object Long    extends TypeTag[Long]

  def deriveTypeTag[A](schema: Schema[A]): Option[TypeTag[A]] =
    schema match {
      case s: Schema.Lazy[_]                 => deriveTypeTag(s.schema)
      case s: Schema.Optional[_]             => deriveTypeTag(s.schema.asInstanceOf[Schema[A]])
      case Schema.Primitive(standartType, _) => deriveTypeTag(standartType)
      case _                                 => None
    }

  def deriveTypeTag[A](standartType: StandardType[A]): Option[TypeTag[A]] =
    standartType match {
      case StandardType.StringType => Some(TypeTag.String)
      case StandardType.BoolType   => Some(TypeTag.Boolean)
      case StandardType.ByteType   => Some(TypeTag.Byte)
      case StandardType.ShortType  => Some(TypeTag.Short)
      case StandardType.IntType    => Some(TypeTag.Int)
      case StandardType.LongType   => Some(TypeTag.Long)
      case _                       => None
    }

}
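
Derivation unwraps lazy and optional schemas down to the primitive's StandardType and returns None for anything unsupported. A small sketch, assuming zio-schema's implicit primitive schemas are in scope:

import zio.schema.Schema
import me.mnedokushev.zio.apache.parquet.core.filter.TypeTag

object TypeTagSketch {
  val intTag: Option[TypeTag[Int]] = TypeTag.deriveTypeTag(Schema[Int])            // Some(TypeTag.Int)
  val uuidTag                      = TypeTag.deriveTypeTag(Schema[java.util.UUID]) // None: no UUID tag yet
}
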

modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/hadoop/ParquetReader.scala

Lines changed: 30 additions & 14 deletions
@@ -11,12 +11,14 @@ import zio.schema.Schema
 import zio.stream._
 
 import java.io.IOException
+import me.mnedokushev.zio.apache.parquet.core.filter.Expr
+import org.apache.parquet.filter2.compat.FilterCompat
 
 trait ParquetReader[+A <: Product] {
 
   def readStream(path: Path): ZStream[Scope, Throwable, A]
 
-  def readChunk(path: Path): Task[Chunk[A]]
+  def readChunk[B](path: Path, filter: Option[Expr.Predicate[B]] = None): Task[Chunk[A]]
 
 }
 
@@ -39,10 +41,10 @@ final class ParquetReaderLive[A <: Product: Tag](
       )
     } yield value
 
-  override def readChunk(path: Path): Task[Chunk[A]] =
+  override def readChunk[B](path: Path, filter: Option[Expr.Predicate[B]] = None): Task[Chunk[A]] =
     ZIO.scoped(
       for {
-        reader  <- build(path)
+        reader  <- build(path, filter)
         readNext = for {
                      value  <- ZIO.attemptBlockingIO(reader.read())
                      record <- if (value != null)
@@ -63,14 +65,25 @@ final class ParquetReaderLive[A <: Product: Tag](
       } yield builder.result()
     )
 
-  private def build(path: Path): ZIO[Scope, IOException, HadoopParquetReader[RecordValue]] =
+  private def build[B](
+    path: Path,
+    filter: Option[Expr.Predicate[B]] = None
+  ): ZIO[Scope, IOException, HadoopParquetReader[RecordValue]] =
     for {
-      inputFile <- path.toInputFileZIO(hadoopConf)
-      reader    <- ZIO.fromAutoCloseable(
-                     ZIO.attemptBlockingIO(
-                       new ParquetReader.Builder(inputFile, schema, schemaEncoder).withConf(hadoopConf).build()
-                     )
-                   )
+      inputFile      <- path.toInputFileZIO(hadoopConf)
+      compiledFilter <- ZIO.foreach(filter) { pred =>
+                          ZIO
+                            .fromEither(Expr.compile(pred))
+                            .mapError(new IOException(_))
+                        }
+      reader         <- ZIO.fromAutoCloseable(
+                          ZIO.attemptBlockingIO {
+                            val builder = new ParquetReader.Builder(inputFile, schema, schemaEncoder)
+
+                            compiledFilter.foreach(pred => builder.withFilter(FilterCompat.get(pred)))
+                            builder.withConf(hadoopConf).build()
+                          }
+                        )
     } yield reader
 
 }
@@ -88,14 +101,17 @@ object ParquetReader {
 
   }
 
-  def configured[A <: Product: ValueDecoder](
+  def configured[A <: Product: ValueDecoder: Tag](
     hadoopConf: Configuration = new Configuration()
-  )(implicit tag: Tag[A]): ULayer[ParquetReader[A]] =
+  ): ULayer[ParquetReader[A]] =
     ZLayer.succeed(new ParquetReaderLive[A](hadoopConf))
 
-  def projected[A <: Product: ValueDecoder](
+  def projected[A <: Product: ValueDecoder: Tag](
     hadoopConf: Configuration = new Configuration()
-  )(implicit schema: Schema[A], schemaEncoder: SchemaEncoder[A], tag: Tag[A]): ULayer[ParquetReader[A]] =
+  )(implicit
+    schema: Schema[A],
+    schemaEncoder: SchemaEncoder[A]
+  ): ULayer[ParquetReader[A]] =
     ZLayer.succeed(new ParquetReaderLive[A](hadoopConf, Some(schema), Some(schemaEncoder)))
 
 }
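
A sketch of how a filter threads through the reader, assuming Path lives in the same hadoop package as ParquetReader and that a layer for ParquetReader[User] is provided elsewhere; the User record and column name are hypothetical:

import me.mnedokushev.zio.apache.parquet.core.filter._
import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, Path }
import zio._

object ReadWithFilterSketch {
  case class User(name: String, age: Int)

  // The predicate is compiled to a FilterPredicate inside build() and handed to
  // the underlying Hadoop reader via FilterCompat.get.
  def filteredUsers(path: Path): RIO[ParquetReader[User], Chunk[User]] =
    ZIO.serviceWithZIO[ParquetReader[User]](
      _.readChunk(path, Some(Expr.Column[Int]("age") > 18))
    )
}
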

modules/core/src/main/scala/me/mnedokushev/zio/apache/parquet/core/package.scala

Lines changed: 6 additions & 0 deletions
@@ -1,5 +1,7 @@
 package me.mnedokushev.zio.apache.parquet
 
+import me.mnedokushev.zio.apache.parquet.core.filter.Expr
+
 package object core {
 
   val MILLIS_PER_DAY = 86400000L
@@ -10,4 +12,8 @@ package object core {
   val DECIMAL_PRECISION = 11
   val DECIMAL_SCALE = 2
 
+  type Lens[F, S, A] = Expr.Column[A]
+  type Prism[F, S, A] = Unit
+  type Traversal[S, A] = Unit
+
 }
Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
package me.mnedokushev.zio.apache.parquet.core.filter

import zio.test._
import zio._
import zio.schema._
import org.apache.parquet.filter2.predicate.FilterApi
import me.mnedokushev.zio.apache.parquet.core.Value

object ExprSpec extends ZIOSpecDefault {

  case class MyRecord(a: String, b: Int)
  object MyRecord {
    implicit val recordSchema = DeriveSchema.gen[MyRecord]

    val (a0, b0) = Filter.columns[MyRecord]
  }

  override def spec: Spec[TestEnvironment with Scope, Any] =
    suite("ExprSpec")(
      test("foo") {
        val result   = Expr.compile(MyRecord.a0 === "bar" and MyRecord.b0 > 1)
        val expected =
          FilterApi.and(
            FilterApi.eq(FilterApi.binaryColumn("a"), Value.string("bar").value),
            FilterApi.gt(FilterApi.intColumn("b"), Int.box(Value.int(1).value))
          )

        assertTrue(result.contains(expected))
      }
    )

}
