From 2471e1af05c8ff17374daeaa60911ed6a4491f71 Mon Sep 17 00:00:00 2001 From: GregoryNwosu Date: Tue, 2 Oct 2018 22:21:57 +0100 Subject: [PATCH 1/7] implement the FlatMap typeclass on Job. This allows Job to be used as a Reader and composed --- .../main/scala/frameless/cats/implicits.scala | 21 +++++++++++++++++++ dataset/src/main/scala/frameless/Job.scala | 5 +++++ 2 files changed, 26 insertions(+) diff --git a/cats/src/main/scala/frameless/cats/implicits.scala b/cats/src/main/scala/frameless/cats/implicits.scala index f5de3576..2296fdaa 100644 --- a/cats/src/main/scala/frameless/cats/implicits.scala +++ b/cats/src/main/scala/frameless/cats/implicits.scala @@ -72,3 +72,24 @@ object outer { } } } + +object jobber { + implicit def flatMapJob: FlatMap[Job] = + new FlatMap[Job] { + + override def flatMap[A, B](j: Job[A])(f: A => Job[B]): Job[B] = j.flatMap(f) + + override def tailRecM[A, B](a: A)(f: A => Job[Either[A, B]]): Job[B] = { + val j = f(a) + for { + either_ab <- j + b <- either_ab match { + case Left(la) => tailRecM(la)(f) + case Right(_) => j.map(_.right.get) + } + } yield b + } + + override def map[A, B](fa: Job[A])(f: A => B): Job[B] = fa.map(f) + } +} diff --git a/dataset/src/main/scala/frameless/Job.scala b/dataset/src/main/scala/frameless/Job.scala index 40931b8b..3c4abbb5 100644 --- a/dataset/src/main/scala/frameless/Job.scala +++ b/dataset/src/main/scala/frameless/Job.scala @@ -2,6 +2,7 @@ package frameless import org.apache.spark.sql.SparkSession + sealed abstract class Job[A](implicit spark: SparkSession) { self => /** Runs a new Spark job. 
*/ def run(): A @@ -30,10 +31,14 @@ sealed abstract class Job[A](implicit spark: SparkSession) { self => def flatMap[B](fn: A => Job[B]): Job[B] = new Job[B]()(spark) { def run(): B = fn(Job.this.run()).run() } + + } object Job { + + def apply[A](a: => A)(implicit spark: SparkSession): Job[A] = new Job[A] { def run(): A = a } From a3e56c693be11aaf1a9f0d92686885055bde1246 Mon Sep 17 00:00:00 2001 From: Gregory Nwosu Date: Tue, 2 Oct 2018 22:43:23 +0100 Subject: [PATCH 2/7] Update Job.scala --- dataset/src/main/scala/frameless/Job.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/dataset/src/main/scala/frameless/Job.scala b/dataset/src/main/scala/frameless/Job.scala index 3c4abbb5..0ac5fa8e 100644 --- a/dataset/src/main/scala/frameless/Job.scala +++ b/dataset/src/main/scala/frameless/Job.scala @@ -2,7 +2,6 @@ package frameless import org.apache.spark.sql.SparkSession - sealed abstract class Job[A](implicit spark: SparkSession) { self => /** Runs a new Spark job. */ def run(): A From 555bb3bfed66cbacd9cfdc94381c97a0f6045675 Mon Sep 17 00:00:00 2001 From: Gregory Nwosu Date: Tue, 2 Oct 2018 22:43:59 +0100 Subject: [PATCH 3/7] Update Job.scala --- dataset/src/main/scala/frameless/Job.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dataset/src/main/scala/frameless/Job.scala b/dataset/src/main/scala/frameless/Job.scala index 0ac5fa8e..40931b8b 100644 --- a/dataset/src/main/scala/frameless/Job.scala +++ b/dataset/src/main/scala/frameless/Job.scala @@ -30,14 +30,10 @@ sealed abstract class Job[A](implicit spark: SparkSession) { self => def flatMap[B](fn: A => Job[B]): Job[B] = new Job[B]()(spark) { def run(): B = fn(Job.this.run()).run() } - - } object Job { - - def apply[A](a: => A)(implicit spark: SparkSession): Job[A] = new Job[A] { def run(): A = a } From 949f23bfb7698e95fbd4d1b2c25f7b27b1348388 Mon Sep 17 00:00:00 2001 From: Greg Nwosu Date: Sun, 2 Dec 2018 02:37:53 +0000 Subject: [PATCH 4/7] added for monad integer monads --- build.sbt | 4 +++ 
.../main/scala/frameless/cats/implicits.scala | 11 ++++++++ .../test/scala/frameless/cats/LawTests.scala | 25 +++++++++++++++++++ 3 files changed, 40 insertions(+) create mode 100644 cats/src/test/scala/frameless/cats/LawTests.scala diff --git a/build.sbt b/build.sbt index 064f0ff3..13db1203 100644 --- a/build.sbt +++ b/build.sbt @@ -1,10 +1,12 @@ val sparkVersion = "2.3.1" val catsCoreVersion = "1.4.0" +val catsLawVersion = "1.1.0" val catsEffectVersion = "1.0.0" val catsMtlVersion = "0.3.0" val scalatest = "3.0.3" val shapeless = "2.3.2" val scalacheck = "1.13.5" +val scalacheckShaplessVersion = "1.1.6" lazy val root = Project("frameless", file("." + "frameless")).in(file(".")) .aggregate(core, cats, dataset, ml, docs) @@ -32,6 +34,8 @@ lazy val cats = project "org.typelevel" %% "cats-effect" % catsEffectVersion, "org.typelevel" %% "cats-mtl-core" % catsMtlVersion, "org.typelevel" %% "alleycats-core" % catsCoreVersion, + "org.typelevel" %% "cats-testkit" % "1.1.0" % Test, //or `cats-testkit` if you are using ScalaTest + "com.github.alexarchambault" %% "scalacheck-shapeless_1.13" % scalacheckShaplessVersion % Test, "org.apache.spark" %% "spark-core" % sparkVersion % "provided", "org.apache.spark" %% "spark-sql" % sparkVersion % "provided")) .dependsOn(dataset % "test->test;compile->compile") diff --git a/cats/src/main/scala/frameless/cats/implicits.scala b/cats/src/main/scala/frameless/cats/implicits.scala index 2296fdaa..5a76bb1e 100644 --- a/cats/src/main/scala/frameless/cats/implicits.scala +++ b/cats/src/main/scala/frameless/cats/implicits.scala @@ -8,6 +8,7 @@ import alleycats.Empty import scala.reflect.ClassTag import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession object implicits extends FramelessSyntax with SparkDelayInstances { implicit class rddOps[A: ClassTag](lhs: RDD[A]) { @@ -74,6 +75,7 @@ object outer { } object jobber { + implicit def flatMapJob: FlatMap[Job] = new FlatMap[Job] { @@ -92,4 +94,13 @@ object jobber { override def 
map[A, B](fa: Job[A])(f: A => B): Job[B] = fa.map(f) } + implicit def eqJob[A: Eq]: Eq[Job[A]] = Eq.fromUniversalEquals + implicit def monadJob(implicit spark: SparkSession): Monad[Job] = new Monad[Job]{ + + override def pure[A](x: A): Job[A] = Job(x) + + override def flatMap[A, B](fa: Job[A])(f: A => Job[B]): Job[B] = flatMapJob.flatMap(fa)(f) + + override def tailRecM[A, B](a: A)(f: A => Job[Either[A, B]]): Job[B] = flatMapJob.tailRecM(a)(f) + } } diff --git a/cats/src/test/scala/frameless/cats/LawTests.scala b/cats/src/test/scala/frameless/cats/LawTests.scala new file mode 100644 index 00000000..697d954d --- /dev/null +++ b/cats/src/test/scala/frameless/cats/LawTests.scala @@ -0,0 +1,25 @@ +package frameless.cats + + +import cats.laws.discipline.MonadTests +import cats.tests.CatsSuite +import frameless.Job +import org.apache.spark.sql.SparkSession +import jobber._ +import org.scalacheck.{Arbitrary, Gen} + +class LawTests extends CatsSuite with SparkTests { + implicit val spark: SparkSession = session + val genJob: Gen[Job[Int]] = for { + i: Int <- Arbitrary.arbitrary[Int] + }yield Job(i) + + val genJob2: Gen[Job[Function1[Int,Int]]] = for { + a: Int <- Arbitrary.arbitrary[Int] + }yield Job((b: Int) => a + b) + + implicit val arbJob: Arbitrary[Job[Int]] = Arbitrary(genJob) + implicit val arbJob2: Arbitrary[Job[Function1[Int,Int]]] = Arbitrary(genJob2) + + checkAll("Job.MonadLaws", MonadTests[Job].monad[Int,Int,Int]) +} \ No newline at end of file From 2f2fc4e8234c17a61ff42f75f3f4350457aa7318 Mon Sep 17 00:00:00 2001 From: Greg Nwosu Date: Sun, 2 Dec 2018 08:59:01 +0000 Subject: [PATCH 5/7] added for monad integer monads --- .../main/scala/frameless/cats/implicits.scala | 3 ++- .../test/scala/frameless/cats/LawTests.scala | 24 +++++++++---------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/cats/src/main/scala/frameless/cats/implicits.scala b/cats/src/main/scala/frameless/cats/implicits.scala index 5a76bb1e..64d68749 100644 --- 
a/cats/src/main/scala/frameless/cats/implicits.scala +++ b/cats/src/main/scala/frameless/cats/implicits.scala @@ -94,7 +94,8 @@ object jobber { override def map[A, B](fa: Job[A])(f: A => B): Job[B] = fa.map(f) } - implicit def eqJob[A: Eq]: Eq[Job[A]] = Eq.fromUniversalEquals + implicit def eqJob[A: Eq]: Eq[Job[A]] = Eq.allEqual + implicit def monadJob(implicit spark: SparkSession): Monad[Job] = new Monad[Job]{ override def pure[A](x: A): Job[A] = Job(x) diff --git a/cats/src/test/scala/frameless/cats/LawTests.scala b/cats/src/test/scala/frameless/cats/LawTests.scala index 697d954d..4fa01d86 100644 --- a/cats/src/test/scala/frameless/cats/LawTests.scala +++ b/cats/src/test/scala/frameless/cats/LawTests.scala @@ -3,23 +3,21 @@ package frameless.cats import cats.laws.discipline.MonadTests import cats.tests.CatsSuite -import frameless.Job -import org.apache.spark.sql.SparkSession +import frameless.{Job, TypedDatasetSuite} +import org.scalacheck.ScalacheckShapeless._ import jobber._ import org.scalacheck.{Arbitrary, Gen} -class LawTests extends CatsSuite with SparkTests { - implicit val spark: SparkSession = session - val genJob: Gen[Job[Int]] = for { - i: Int <- Arbitrary.arbitrary[Int] - }yield Job(i) - val genJob2: Gen[Job[Function1[Int,Int]]] = for { - a: Int <- Arbitrary.arbitrary[Int] - }yield Job((b: Int) => a + b) - implicit val arbJob: Arbitrary[Job[Int]] = Arbitrary(genJob) - implicit val arbJob2: Arbitrary[Job[Function1[Int,Int]]] = Arbitrary(genJob2) +class LawTests extends TypedDatasetSuite with CatsSuite { + + + implicit def genJob[A: Arbitrary]: Gen[Job[A]] = for { + a <- Arbitrary.arbitrary[A] + } yield Job(a) + + implicit def arbJob[A: Arbitrary]: Arbitrary[Job[A]] = Arbitrary[Job[A]](genJob) + checkAll("Job.MonadLaws", MonadTests[Job].monad[Int,String,Map[String,Int]]) - checkAll("Job.MonadLaws", MonadTests[Job].monad[Int,Int,Int]) } \ No newline at end of file From 7b4149ec8c3884d272a60596bfca95695435b8b6 Mon Sep 17 00:00:00 2001 From: Greg Nwosu 
Date: Sun, 2 Dec 2018 16:18:52 +0000 Subject: [PATCH 6/7] added for functor tests integer monads --- build.sbt | 5 ++++- cats/src/test/scala/frameless/cats/LawTests.scala | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/build.sbt b/build.sbt index 13db1203..95b34810 100644 --- a/build.sbt +++ b/build.sbt @@ -7,6 +7,8 @@ val scalatest = "3.0.3" val shapeless = "2.3.2" val scalacheck = "1.13.5" val scalacheckShaplessVersion = "1.1.6" +val magnoliaVersion = "0.6.1" +val catsTestkitVersion = "1.1.0" lazy val root = Project("frameless", file("." + "frameless")).in(file(".")) .aggregate(core, cats, dataset, ml, docs) @@ -34,7 +36,8 @@ lazy val cats = project "org.typelevel" %% "cats-effect" % catsEffectVersion, "org.typelevel" %% "cats-mtl-core" % catsMtlVersion, "org.typelevel" %% "alleycats-core" % catsCoreVersion, - "org.typelevel" %% "cats-testkit" % "1.1.0" % Test, //or `cats-testkit` if you are using ScalaTest + "com.propensive" %% "magnolia" % magnoliaVersion % Test, + "org.typelevel" %% "cats-testkit" % catsTestkitVersion % Test, //or `cats-testkit` if you are using ScalaTest "com.github.alexarchambault" %% "scalacheck-shapeless_1.13" % scalacheckShaplessVersion % Test, "org.apache.spark" %% "spark-core" % sparkVersion % "provided", "org.apache.spark" %% "spark-sql" % sparkVersion % "provided")) diff --git a/cats/src/test/scala/frameless/cats/LawTests.scala b/cats/src/test/scala/frameless/cats/LawTests.scala index 4fa01d86..808c6ded 100644 --- a/cats/src/test/scala/frameless/cats/LawTests.scala +++ b/cats/src/test/scala/frameless/cats/LawTests.scala @@ -1,7 +1,7 @@ package frameless.cats -import cats.laws.discipline.MonadTests +import cats.laws.discipline.{FunctorTests, MonadTests} import cats.tests.CatsSuite import frameless.{Job, TypedDatasetSuite} import org.scalacheck.ScalacheckShapeless._ @@ -19,5 +19,5 @@ class LawTests extends TypedDatasetSuite with CatsSuite { implicit def arbJob[A: Arbitrary]: Arbitrary[Job[A]] = 
Arbitrary[Job[A]](genJob) checkAll("Job.MonadLaws", MonadTests[Job].monad[Int,String,Map[String,Int]]) - + checkAll("Job.FunctorLaws", FunctorTests[Job].functor[Int,String,Map[String,Int]]) } \ No newline at end of file From e0b1752494ef5f47c350a62ab34be40600980a82 Mon Sep 17 00:00:00 2001 From: Greg Nwosu Date: Sun, 2 Dec 2018 21:08:24 +0000 Subject: [PATCH 7/7] better equals --- cats/src/main/scala/frameless/cats/implicits.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cats/src/main/scala/frameless/cats/implicits.scala b/cats/src/main/scala/frameless/cats/implicits.scala index 64d68749..d66a9fd9 100644 --- a/cats/src/main/scala/frameless/cats/implicits.scala +++ b/cats/src/main/scala/frameless/cats/implicits.scala @@ -94,7 +94,7 @@ object jobber { override def map[A, B](fa: Job[A])(f: A => B): Job[B] = fa.map(f) } - implicit def eqJob[A: Eq]: Eq[Job[A]] = Eq.allEqual + implicit def eqJob[A: Eq]: Eq[Job[A]] = Eq.fromUniversalEquals implicit def monadJob(implicit spark: SparkSession): Monad[Job] = new Monad[Job]{