Skip to content

Commit

Permalink
Save filter progress #1
Browse files Browse the repository at this point in the history
  • Loading branch information
grouzen committed Apr 11, 2024
1 parent 56b6029 commit 7de487d
Show file tree
Hide file tree
Showing 8 changed files with 311 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,34 @@ package me.mnedokushev.zio.apache.parquet.core.filter

import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.filter2.predicate.FilterApi
import me.mnedokushev.zio.apache.parquet.core.Value
// import me.mnedokushev.zio.apache.parquet.core.Value
import zio.prelude._
import org.apache.parquet.filter2.predicate.Operators

sealed trait Expr[-A]

object Expr {

final case class Column[A](path: String)(implicit val typeTag: TypeTag[A]) extends Expr[A] { self =>
sealed trait Column[A] extends Expr[A] { self =>
def typeTag: TypeTag[A]
def path: String

def /[B: TypeTag](column: Column[B]): Column[B] =
Column(s"$path.${column.path}")
def /[B: TypeTag.LtGt](column: ColumnLtGt[B]): ColumnLtGt[B] =
ColumnLtGt(s"$path.${column.path}")

def >(value: A)(implicit ev: OperatorSupport.LessGreater[A]): Predicate[A] =
def >(value: A)(implicit ev: OperatorSupport.LtGt[A]): Predicate[A] =
Predicate.Binary(self, value, Operator.Binary.GreaterThen())

def ===(value: A)(implicit ev: OperatorSupport.EqNotEq[A]): Predicate[A] =
Predicate.Binary(self, value, Operator.Binary.Eq())

}

final case class ColumnDummy[A](path: String)(implicit val typeTag: TypeTag.Dummy[A]) extends Column[A]

final case class ColumnEqNotEq[A](path: String)(implicit val typeTag: TypeTag.EqNotEq[A]) extends Column[A]

final case class ColumnLtGt[A](path: String)(implicit val typeTag: TypeTag.LtGt[A]) extends Column[A]

sealed trait Predicate[A] extends Expr[A] { self =>

def not: Predicate[A] =
Expand All @@ -37,7 +45,7 @@ object Expr {

object Predicate {

final case class Binary[A](column: Column[A], value: A, op: Operator.Binary[A]) extends Predicate[A]
final case class Binary[A, C <: Column[A]](column: C, value: A, op: Operator.Binary[A]) extends Predicate[A]

final case class Unary[A](predicate: Predicate[A], op: Operator.Unary[A]) extends Predicate[A]

Expand All @@ -46,14 +54,47 @@ object Expr {

}

def compile[A](predicate: Predicate[A]): Either[String, FilterPredicate] =
def compile[A](predicate: Predicate[A]): Either[String, FilterPredicate] = {

def handleEqNotEq[T <: Comparable[T], C <: Operators.Column[T] with Operators.SupportsEqNotEq](
column0: C,
value0: T,
op: Operator.Binary[_]
) = op match {
case Operator.Binary.Eq() =>
Right(FilterApi.eq(column0, value0))
case Operator.Binary.NotEq() =>
Right(FilterApi.notEq(column0, value0))
case _ =>
Left("")
}

def handleLtGt[T <: Comparable[T], C <: Operators.Column[T] with Operators.SupportsLtGt](
column0: C,
value0: T,
op: Operator.Binary[_]
) = op match {
case Operator.Binary.Eq() =>
Right(FilterApi.eq(column0, value0))
case Operator.Binary.NotEq() =>
Right(FilterApi.notEq(column0, value0))
case Operator.Binary.LessThen() =>
Right(FilterApi.lt(column0, value0))
case Operator.Binary.LessEq() =>
Right(FilterApi.ltEq(column0, value0))
case Operator.Binary.GreaterThen() =>
Right(FilterApi.gt(column0, value0))
case Operator.Binary.GreaterEq() =>
Right(FilterApi.gtEq(column0, value0))
}

predicate match {
case Predicate.Unary(predicate0, op) =>
case Predicate.Unary(predicate0, op) =>
op match {
case Operator.Unary.Not() =>
compile(predicate0).map(FilterApi.not)
}
case Predicate.Logical(left, right, op) =>
case Predicate.Logical(left, right, op) =>
(compile(left) <*> compile(right)).map { case (left0, right0) =>
op match {
case Operator.Logical.And() =>
Expand All @@ -63,30 +104,19 @@ object Expr {
}
}
case Predicate.Binary(column, value, op) =>
(column.typeTag, value) match {
case (TypeTag.String, v: String) =>
val column0 = FilterApi.binaryColumn(column.path)
val value0 = Value.string(v).value

op match {
case Operator.Binary.Eq() =>
Right(FilterApi.eq(column0, value0))
case Operator.Binary.NotEq() =>
Right(FilterApi.notEq(column0, value0))
case _ => ???
}
case (TypeTag.Int, v: Int) =>
val column0 = FilterApi.intColumn(column.path)
val value0 = Int.box(Value.int(v).value)

op match {
case Operator.Binary.GreaterThen() =>
Right(FilterApi.gt(column0, value0))
case _ => ???
}
case _ => ???
(column, column.typeTag, value) match {
case (c: ColumnEqNotEq[A], TypeTag.TString, v: String) =>
handleEqNotEq(c.typeTag.column(column.path), c.typeTag.value(v), op)
case (c: ColumnEqNotEq[A], TypeTag.TBoolean, v: Boolean) =>
handleEqNotEq(c.typeTag.column(column.path), c.typeTag.value(v), op)
case (c: ColumnLtGt[A], TypeTag.TByte, v: Byte) =>
handleLtGt(c.typeTag.column(column.path), c.typeTag.value(v), op)
case (c: ColumnLtGt[A], TypeTag.TInt, v: Int) =>
handleLtGt(c.typeTag.column(column.path), c.typeTag.value(v), op)
case _ => ???
}

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package me.mnedokushev.zio.apache.parquet.core.filter
import zio.schema.AccessorBuilder
import zio.schema.Schema

class ExprAccessorBuilder extends AccessorBuilder {
final class ExprAccessorBuilder(typeTags: Map[String, TypeTag[_]]) extends AccessorBuilder {

override type Lens[F, S, A] = Expr.Column[A]

Expand All @@ -12,9 +12,20 @@ class ExprAccessorBuilder extends AccessorBuilder {
override type Traversal[S, A] = Unit

override def makeLens[F, S, A](product: Schema.Record[S], term: Schema.Field[S, A]): Expr.Column[A] = {
implicit val typeTag: TypeTag[A] = TypeTag.deriveTypeTag(term.schema).get

Expr.Column(term.name.toString)
// val typeTag = TypeTag.deriveTypeTag(term.schema).get
// val typeTag = Derive.derive[TypeTag, A](TypeTagDeriver.default)(term.schema)
val typeTag = typeTags.get(term.name.toString).map(_.asInstanceOf[TypeTag[A]])

val r: Expr.Column[A] = typeTag match {
case Some(t: TypeTag.EqNotEq[A]) =>
Expr.ColumnEqNotEq[A](term.name.toString)(t)
case Some(t: TypeTag.LtGt[A]) =>
Expr.ColumnLtGt[A](term.name.toString)(t)
case _ =>
Expr.ColumnDummy[A](term.name.toString)(TypeTag.dummy[A])
}

r
}

override def makePrism[F, S, A](sum: Schema.Enum[S], term: Schema.Case[S, A]): Prism[F, S, A] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ trait Filter[Columns0] {

object Filter {

def columns[A](implicit schema: Schema.Record[A]): schema.Accessors[Lens, Prism, Traversal] =
def columns[A](implicit
schema: Schema.Record[A],
typeTag: TypeTag[A]
): schema.Accessors[Lens, Prism, Traversal] =
new Filter[schema.Accessors[Lens, Prism, Traversal]] {

val accessorBuilder = new ExprAccessorBuilder
val accessorBuilder = new ExprAccessorBuilder(typeTag.asInstanceOf[TypeTag.Record[A]].columns)

override type Columns = schema.Accessors[accessorBuilder.Lens, accessorBuilder.Prism, accessorBuilder.Traversal]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ object Operator {
object Binary {
final case class Eq[A: OperatorSupport.EqNotEq]() extends Binary[A]
final case class NotEq[A: OperatorSupport.EqNotEq]() extends Binary[A]
final case class LessThen[A: OperatorSupport.LessGreater]() extends Binary[A]
final case class LessEq[A: OperatorSupport.LessGreater]() extends Binary[A]
final case class GreaterThen[A: OperatorSupport.LessGreater]() extends Binary[A]
final case class GreaterEq[A: OperatorSupport.LessGreater]() extends Binary[A]
final case class LessThen[A: OperatorSupport.LtGt]() extends Binary[A]
final case class LessEq[A: OperatorSupport.LtGt]() extends Binary[A]
final case class GreaterThen[A: OperatorSupport.LtGt]() extends Binary[A]
final case class GreaterEq[A: OperatorSupport.LtGt]() extends Binary[A]

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
package me.mnedokushev.zio.apache.parquet.core.filter

sealed trait OperatorSupport[A] {
def typeTag: TypeTag[A]
}
import scala.annotation.implicitNotFound

sealed trait OperatorSupport[A]

object OperatorSupport {

abstract class LessGreater[A: TypeTag] extends OperatorSupport[A] {
override def typeTag: TypeTag[A] = implicitly[TypeTag[A]]
}
@implicitNotFound("You can't use this operator for the type ${A}")
abstract class LtGt[A: TypeTag] extends OperatorSupport[A]

object LessGreater {
implicit case object Byte extends LessGreater[Byte]
implicit case object Short extends LessGreater[Short]
implicit case object Int extends LessGreater[Int]
object LtGt {
implicit case object SByte extends LtGt[Byte]
implicit case object SShort extends LtGt[Short]
implicit case object SInt extends LtGt[Int]
}

abstract class EqNotEq[A: TypeTag] extends OperatorSupport[A] {
override def typeTag: TypeTag[A] = implicitly[TypeTag[A]]
}
@implicitNotFound("You can't use this operator for the type ${A}")
abstract class EqNotEq[A: TypeTag] extends OperatorSupport[A]

object EqNotEq {
implicit case object String extends EqNotEq[String]
implicit case object Boolean extends EqNotEq[Boolean]
implicit case object Byte extends EqNotEq[Byte]
implicit case object Short extends EqNotEq[Short]
implicit case object SString extends EqNotEq[String]
implicit case object SBoolean extends EqNotEq[Boolean]
implicit case object SByte extends EqNotEq[Byte]
implicit case object SShort extends EqNotEq[Short]
implicit case object SInt extends EqNotEq[Int]
}

}
Loading

0 comments on commit 7de487d

Please sign in to comment.