From 4b7c4221ddb2bf9951cded6c575fcc7351342dc6 Mon Sep 17 00:00:00 2001 From: rahulbhatia023 Date: Fri, 1 Dec 2023 17:08:12 -0500 Subject: [PATCH 01/11] Adding support for configuring spark configs at checkpoint level --- .../restonomer/app/RestonomerContext.scala | 60 ++++++++++++++----- .../restonomer/app/RestonomerWorkflow.scala | 27 +-------- .../restonomer/model/CheckpointConfig.scala | 3 +- 3 files changed, 50 insertions(+), 40 deletions(-) diff --git a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala index 666e4ea2..69e10209 100644 --- a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala +++ b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala @@ -4,7 +4,8 @@ import com.clairvoyant.restonomer.config.{ConfigVariablesSubstitutor, GCSRestono import com.clairvoyant.restonomer.exception.RestonomerException import com.clairvoyant.restonomer.model.{ApplicationConfig, CheckpointConfig} import com.google.cloud.storage.{Storage, StorageOptions} -import zio.Config +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession import zio.config.magnolia.* import java.io.FileNotFoundException @@ -37,9 +38,9 @@ class RestonomerContext( val restonomerContextDirectoryPath: String, val configVariablesFromApplicationArgs: Map[String, String] ) { + // ---------- CONFIG VARIABLES ---------- // + private val CONFIG_VARIABLES_FILE_PATH = s"$restonomerContextDirectoryPath/uncommitted/config_variables.conf" - private val APPLICATION_CONFIG_FILE_PATH = s"$restonomerContextDirectoryPath/application.conf" - private val CHECKPOINTS_CONFIG_DIRECTORY_PATH = s"$restonomerContextDirectoryPath/checkpoints" private val configVariablesFromFile = { given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] = None @@ -51,6 +52,13 @@ class RestonomerContext( Map[String, String]() } + given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] = + Some(ConfigVariablesSubstitutor(configVariablesFromFile, configVariablesFromApplicationArgs)) + + // ---------- APPLICATION CONFIGS ---------- // + + private val APPLICATION_CONFIG_FILE_PATH = s"$restonomerContextDirectoryPath/application.conf" + private val applicationConfig = { given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] = None @@ -63,8 +71,24 @@ class RestonomerContext( ) } - given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] = - Some(ConfigVariablesSubstitutor(configVariablesFromFile, configVariablesFromApplicationArgs)) + // ---------- RUN CHECKPOINTS ---------- // + + private val sparkConf = applicationConfig.sparkConfigs + .map { sparkConfigs => + sparkConfigs.foldLeft(new SparkConf()) { case (sparkConf, sparkConfig) => + sparkConf.set(sparkConfig._1, sparkConfig._2) + } + } + .getOrElse(new SparkConf()) + + private def runCheckpoints(checkpointConfigs: List[CheckpointConfig]): Unit = + checkpointConfigs.foreach { checkpointConfig => + println(s"Checkpoint Name -> ${checkpointConfig.name}\n") + runCheckpoint(checkpointConfig) + println("\n=====================================================\n") + } + + private val CHECKPOINTS_CONFIG_DIRECTORY_PATH = s"$restonomerContextDirectoryPath/checkpoints" def runCheckpoint(checkpointFilePath: String): Unit = { val absoluteCheckpointFilePath = s"$CHECKPOINTS_CONFIG_DIRECTORY_PATH/$checkpointFilePath" @@ -105,14 +129,22 @@ class RestonomerContext( s"The config directory with the path: $CHECKPOINTS_CONFIG_DIRECTORY_PATH does not 
exists." ) - private def runCheckpoints(checkpointConfigs: List[CheckpointConfig]): Unit = - checkpointConfigs.foreach { checkpointConfig => - println(s"Checkpoint Name -> ${checkpointConfig.name}\n") - runCheckpoint(checkpointConfig) - println("\n=====================================================\n") - } - - def runCheckpoint(checkpointConfig: CheckpointConfig): Unit = - RestonomerWorkflow(applicationConfig).run(checkpointConfig) + def runCheckpoint(checkpointConfig: CheckpointConfig): Unit = { + given sparkSession: SparkSession = + SparkSession + .builder() + .config { + checkpointConfig.runtimeConfigs + .map { runtimeConfig => + runtimeConfig.foldLeft(sparkConf) { case (sparkConf, runtimeConfig) => + sparkConf.set(runtimeConfig._1, runtimeConfig._2) + } + } + .getOrElse(sparkConf) + } + .getOrCreate() + + RestonomerWorkflow.run(checkpointConfig) + } } diff --git a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala index 0f9ee99c..a503ad28 100644 --- a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala +++ b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala @@ -8,16 +8,15 @@ import com.clairvoyant.restonomer.http.* import com.clairvoyant.restonomer.model.* import com.clairvoyant.restonomer.sttpBackend import com.jayway.jsonpath.JsonPath -import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} import scala.concurrent.Await import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.Duration -class RestonomerWorkflow(using sparkSession: SparkSession) { +object RestonomerWorkflow { - def run(checkpointConfig: CheckpointConfig): Unit = { + def run(checkpointConfig: CheckpointConfig)(using sparkSession: SparkSession): Unit = { val tokenFunction = checkpointConfig.token .map { tokenConfig => @@ -79,25 +78,3 @@ class RestonomerWorkflow(using sparkSession: SparkSession) { } } - -private object RestonomerWorkflow { - - def apply(applicationConfig: ApplicationConfig): RestonomerWorkflow = { - given sparkSession: SparkSession = - SparkSession - .builder() - .config( - applicationConfig.sparkConfigs - .map { sparkConfigs => - sparkConfigs.foldLeft(new SparkConf()) { case (sparkConf, sparkConfig) => - sparkConf.set(sparkConfig._1, sparkConfig._2) - } - } - .getOrElse(new SparkConf()) - ) - .getOrCreate() - - new RestonomerWorkflow() - } - -} diff --git a/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala b/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala index 0e94d7b1..9156cd91 100644 --- a/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala +++ b/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala @@ -7,7 +7,8 @@ import zio.config.magnolia.* case class CheckpointConfig( name: String, token: Option[TokenConfig], - data: DataConfig + data: DataConfig, + runtimeConfigs: Option[Map[String, String]] ) object CheckpointConfig { From 8bac431878a30282fbd8f787bbc4d6e898200da2 Mon Sep 17 00:00:00 2001 From: Scala Steward Date: Fri, 1 Dec 2023 22:47:16 +0000 Subject: [PATCH 02/11] Update sbt-scalafix to 0.11.1 in staging --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index f00c0173..68a5c7ee 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,5 +1,5 @@ val assemblyVersion = "1.2.0" -val scalafixVersion = "0.11.0" +val scalafixVersion = 
"0.11.1" val scalafmtVersion = "2.4.6" val wartremoverVersion = "3.1.3" From f5387eb7f17dbf6159638b89e2351b6c8c684dbf Mon Sep 17 00:00:00 2001 From: Scala Steward Date: Fri, 1 Dec 2023 22:47:29 +0000 Subject: [PATCH 03/11] Update sbt-assembly to 2.1.5 in staging --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index f00c0173..918fcff8 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,4 +1,4 @@ -val assemblyVersion = "1.2.0" +val assemblyVersion = "2.1.5" val scalafixVersion = "0.11.0" val scalafmtVersion = "2.4.6" val wartremoverVersion = "3.1.3" From c992f6f77c259894b39fe0ad4886870e5fc59d69 Mon Sep 17 00:00:00 2001 From: Scala Steward Date: Fri, 1 Dec 2023 22:47:36 +0000 Subject: [PATCH 04/11] Update sbt to 1.9.7 in staging --- project/build.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/build.properties b/project/build.properties index 27430827..e8a1e246 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.9.6 +sbt.version=1.9.7 From 31797b8f4d45058d5317ae3851ceb11496767107 Mon Sep 17 00:00:00 2001 From: Scala Steward Date: Fri, 1 Dec 2023 22:47:42 +0000 Subject: [PATCH 05/11] Update sbt-scalafmt to 2.5.2 in staging --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index f00c0173..23b5ba64 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,6 +1,6 @@ val assemblyVersion = "1.2.0" val scalafixVersion = "0.11.0" -val scalafmtVersion = "2.4.6" +val scalafmtVersion = "2.5.2" val wartremoverVersion = "3.1.3" val assemblyPluginDependency = "com.eed3si9n" % "sbt-assembly" % assemblyVersion From 668832303a7bd7d1c66337bc92b40753e05adb93 Mon Sep 17 00:00:00 2001 From: Scala Steward Date: Fri, 1 Dec 2023 22:47:48 +0000 Subject: [PATCH 06/11] Update sbt-wartremover, wartremover to 3.1.5 in staging --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index f00c0173..8ceb98e9 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,7 +1,7 @@ val assemblyVersion = "1.2.0" val scalafixVersion = "0.11.0" val scalafmtVersion = "2.4.6" -val wartremoverVersion = "3.1.3" +val wartremoverVersion = "3.1.5" val assemblyPluginDependency = "com.eed3si9n" % "sbt-assembly" % assemblyVersion val scalafixPluginDependency = "ch.epfl.scala" % "sbt-scalafix" % scalafixVersion From 44244e6a5ae3c180d931e73abfff547a76d6f958 Mon Sep 17 00:00:00 2001 From: Scala Steward Date: Tue, 5 Dec 2023 21:13:10 +0000 Subject: [PATCH 07/11] Update google-cloud-storage to 2.30.0 in staging --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index a36101c8..214474a8 100644 --- a/build.sbt +++ b/build.sbt @@ -80,7 +80,7 @@ val catsVersion = "2.10.0" val dataScalaxyReaderVersion = "1.1.0" val dataScalaxyTestUtilVersion = "1.0.0" val dataScalaxyTransformerVersion = "1.2.0" -val googleCloudStorageVersion = "2.29.1" +val googleCloudStorageVersion = "2.30.0" val jsonPathVersion = "2.8.0" val jwtCoreVersion = "9.4.5" val monovoreDeclineVersion = "2.4.1" From c282d3ad5de95d5328d90acb5ba0734860b805b2 Mon Sep 17 00:00:00 2001 From: Scala Steward Date: Thu, 7 Dec 2023 20:26:23 +0000 Subject: [PATCH 08/11] Update google-cloud-storage to 2.30.1 in staging --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt 
b/build.sbt index 214474a8..27e57919 100644 --- a/build.sbt +++ b/build.sbt @@ -80,7 +80,7 @@ val catsVersion = "2.10.0" val dataScalaxyReaderVersion = "1.1.0" val dataScalaxyTestUtilVersion = "1.0.0" val dataScalaxyTransformerVersion = "1.2.0" -val googleCloudStorageVersion = "2.30.0" +val googleCloudStorageVersion = "2.30.1" val jsonPathVersion = "2.8.0" val jwtCoreVersion = "9.4.5" val monovoreDeclineVersion = "2.4.1" From bc6a1694ca69d8721196ff8734f1c2d67cd70452 Mon Sep 17 00:00:00 2001 From: rahulbhatia023 Date: Mon, 11 Dec 2023 12:43:18 -0500 Subject: [PATCH 09/11] Adding support for configuring spark configs at checkpoint level --- src/it/resources/restonomer_context/application.conf | 1 + .../checkpoint_api_key_authentication_query_param.conf | 4 ++++ .../APIKeyAuthenticationIntegrationTest.scala | 2 ++ .../clairvoyant/restonomer/app/RestonomerContext.scala | 8 ++++---- .../clairvoyant/restonomer/model/CheckpointConfig.scala | 2 +- 5 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/it/resources/restonomer_context/application.conf b/src/it/resources/restonomer_context/application.conf index 1624c220..accb7438 100644 --- a/src/it/resources/restonomer_context/application.conf +++ b/src/it/resources/restonomer_context/application.conf @@ -1,3 +1,4 @@ spark-configs = { "spark.master" = "local[*]" + "spark.app.name" = "Restonomer Execution" } diff --git a/src/it/resources/restonomer_context/checkpoints/authentication/api_key_authentication/checkpoint_api_key_authentication_query_param.conf b/src/it/resources/restonomer_context/checkpoints/authentication/api_key_authentication/checkpoint_api_key_authentication_query_param.conf index 2a7a4a64..a3bdf904 100644 --- a/src/it/resources/restonomer_context/checkpoints/authentication/api_key_authentication/checkpoint_api_key_authentication_query_param.conf +++ b/src/it/resources/restonomer_context/checkpoints/authentication/api_key_authentication/checkpoint_api_key_authentication_query_param.conf @@ -29,3 +29,7 @@ data = { } } } + +spark-configs = { + "spark.app.name" = "checkpoint_api_key_authentication_query_param" +} diff --git a/src/it/scala/com/clairvoyant/restonomer/authentication/APIKeyAuthenticationIntegrationTest.scala b/src/it/scala/com/clairvoyant/restonomer/authentication/APIKeyAuthenticationIntegrationTest.scala index 379cb722..2049a91d 100644 --- a/src/it/scala/com/clairvoyant/restonomer/authentication/APIKeyAuthenticationIntegrationTest.scala +++ b/src/it/scala/com/clairvoyant/restonomer/authentication/APIKeyAuthenticationIntegrationTest.scala @@ -8,6 +8,8 @@ class APIKeyAuthenticationIntegrationTest extends IntegrationTestDependencies wi it should "authenticate request with api key authentication using query string" in { runCheckpoint(checkpointFileName = "checkpoint_api_key_authentication_query_param.conf") + + sparkSession.sparkContext.getConf.get("spark.app.name") should be("checkpoint_api_key_authentication_query_param") outputDF should matchExpectedDataFrame("expected_api_key_authentication_query_param.json") } diff --git a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala index 69e10209..ecb625c0 100644 --- a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala +++ b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala @@ -134,10 +134,10 @@ class RestonomerContext( SparkSession .builder() .config { - checkpointConfig.runtimeConfigs - .map { runtimeConfig => - 
runtimeConfig.foldLeft(sparkConf) { case (sparkConf, runtimeConfig) => - sparkConf.set(runtimeConfig._1, runtimeConfig._2) + checkpointConfig.sparkConfigs + .map { sparkConfig => + sparkConfig.foldLeft(sparkConf) { case (sparkConf, sparkConfig) => + sparkConf.set(sparkConfig._1, sparkConfig._2) } } .getOrElse(sparkConf) diff --git a/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala b/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala index 9156cd91..02d4b8ae 100644 --- a/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala +++ b/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala @@ -8,7 +8,7 @@ case class CheckpointConfig( name: String, token: Option[TokenConfig], data: DataConfig, - runtimeConfigs: Option[Map[String, String]] + sparkConfigs: Option[Map[String, String]] ) object CheckpointConfig { From 14eccea7552ad6b4c8909a8dced997c3e8392659 Mon Sep 17 00:00:00 2001 From: rahulbhatia023 Date: Mon, 11 Dec 2023 12:52:48 -0500 Subject: [PATCH 10/11] Adding doc regarding support for configuring spark configs at checkpoint level --- site/docs/config_classes/checkpoint_config.md | 11 ++++++----- site/docs/restonomer_context/checkpoints.md | 4 ++++ .../restonomer/app/RestonomerContext.scala | 7 +------ .../restonomer/model/CheckpointConfig.scala | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/site/docs/config_classes/checkpoint_config.md b/site/docs/config_classes/checkpoint_config.md index cd4cf2bf..a84fbaeb 100644 --- a/site/docs/config_classes/checkpoint_config.md +++ b/site/docs/config_classes/checkpoint_config.md @@ -6,11 +6,12 @@ trigger a checkpoint. The checkpoint configuration contains below config options to be provided by the user: -| Config Name | Mandatory | Default Value | Description | -|:------------|:---------:|:-------------:|:------------------------------------------------------------------| -| name | Yes | - | Unique name for your checkpoint | -| token | No | - | Token request configuration represented by `TokenConfig` class | -| data | Yes | - | Main data request configuration represented by `DataConfig` class | +| Config Name | Mandatory | Default Value | Description | +|:-------------|:---------:|:-------------:|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| name | Yes | - | Unique name for your checkpoint | +| token | No | - | Token request configuration represented by `TokenConfig` class | +| data | Yes | - | Main data request configuration represented by `DataConfig` class | +| sparkConfigs | No | - | Map of spark configurations specific to the checkpoint.
If the same config is also present in `application.conf` file, then checkpoint specific config gets the priority. | User can provide checkpoint configuration file in HOCON format in the below format: diff --git a/site/docs/restonomer_context/checkpoints.md b/site/docs/restonomer_context/checkpoints.md index ad0af194..7dc9d88b 100644 --- a/site/docs/restonomer_context/checkpoints.md +++ b/site/docs/restonomer_context/checkpoints.md @@ -57,6 +57,10 @@ data = { } } } + +spark-configs = { + "spark.app.name" = "sample_postman_checkpoint" +} ``` You can place a checkpoint file either: diff --git a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala index ecb625c0..bfeffcf9 100644 --- a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala +++ b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala @@ -135,12 +135,7 @@ class RestonomerContext( .builder() .config { checkpointConfig.sparkConfigs - .map { sparkConfig => - sparkConfig.foldLeft(sparkConf) { case (sparkConf, sparkConfig) => - sparkConf.set(sparkConfig._1, sparkConfig._2) - } - } - .getOrElse(sparkConf) + .foldLeft(sparkConf) { case (sparkConf, sparkConfig) => sparkConf.set(sparkConfig._1, sparkConfig._2) } } .getOrCreate() diff --git a/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala b/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala index 02d4b8ae..17bead7c 100644 --- a/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala +++ b/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala @@ -8,7 +8,7 @@ case class CheckpointConfig( name: String, token: Option[TokenConfig], data: DataConfig, - sparkConfigs: Option[Map[String, String]] + sparkConfigs: Map[String, String] = Map.empty ) object CheckpointConfig { From 04d3659d9c696238e3f7787e98c4b2c85dbd82c1 Mon Sep 17 00:00:00 2001 From: rahulbhatia023 Date: Mon, 11 Dec 2023 13:08:24 -0500 Subject: [PATCH 11/11] Fixed integration test --- .../checkpoint_api_key_authentication_query_param.conf | 4 ---- .../authentication/APIKeyAuthenticationIntegrationTest.scala | 2 -- 2 files changed, 6 deletions(-) diff --git a/src/it/resources/restonomer_context/checkpoints/authentication/api_key_authentication/checkpoint_api_key_authentication_query_param.conf b/src/it/resources/restonomer_context/checkpoints/authentication/api_key_authentication/checkpoint_api_key_authentication_query_param.conf index a3bdf904..2a7a4a64 100644 --- a/src/it/resources/restonomer_context/checkpoints/authentication/api_key_authentication/checkpoint_api_key_authentication_query_param.conf +++ b/src/it/resources/restonomer_context/checkpoints/authentication/api_key_authentication/checkpoint_api_key_authentication_query_param.conf @@ -29,7 +29,3 @@ data = { } } } - -spark-configs = { - "spark.app.name" = "checkpoint_api_key_authentication_query_param" -} diff --git a/src/it/scala/com/clairvoyant/restonomer/authentication/APIKeyAuthenticationIntegrationTest.scala b/src/it/scala/com/clairvoyant/restonomer/authentication/APIKeyAuthenticationIntegrationTest.scala index 2049a91d..379cb722 100644 --- a/src/it/scala/com/clairvoyant/restonomer/authentication/APIKeyAuthenticationIntegrationTest.scala +++ b/src/it/scala/com/clairvoyant/restonomer/authentication/APIKeyAuthenticationIntegrationTest.scala @@ -8,8 +8,6 @@ class APIKeyAuthenticationIntegrationTest extends IntegrationTestDependencies wi it should "authenticate request with api key 
authentication using query string" in { runCheckpoint(checkpointFileName = "checkpoint_api_key_authentication_query_param.conf") - - sparkSession.sparkContext.getConf.get("spark.app.name") should be("checkpoint_api_key_authentication_query_param") outputDF should matchExpectedDataFrame("expected_api_key_authentication_query_param.json") }
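
Note: the net effect of patches 01, 09 and 10 is that a checkpoint-level `spark-configs` map is folded on top of the application-level Spark configs inside `RestonomerContext.runCheckpoint`, so a key defined in both places takes the checkpoint value (the priority rule stated in the `checkpoint_config.md` table). Below is a minimal, self-contained sketch of that precedence rule only — the object name and the literal config values are illustrative stand-ins, not part of the Restonomer codebase; it simply replays the `foldLeft`-over-`SparkConf` pattern introduced by these patches.

```scala
import org.apache.spark.SparkConf

// Sketch of checkpoint-over-application Spark config precedence.
// Object name and config values are illustrative assumptions.
object SparkConfigPrecedenceSketch {

  // Stand-in for the application-level configs (application.conf).
  val applicationLevelConfigs: Map[String, String] = Map(
    "spark.master"   -> "local[*]",
    "spark.app.name" -> "Restonomer Execution"
  )

  // Stand-in for a checkpoint-level `spark-configs` block.
  val checkpointLevelConfigs: Map[String, String] = Map(
    "spark.app.name" -> "sample_postman_checkpoint"
  )

  def main(args: Array[String]): Unit = {
    // Build the base SparkConf from the application-level entries
    // (loadDefaults = false keeps the example independent of system properties).
    val baseConf = applicationLevelConfigs.foldLeft(new SparkConf(false)) {
      case (conf, (key, value)) => conf.set(key, value)
    }

    // Fold the checkpoint-level entries on top of it, so any key present
    // in both maps ends up with the checkpoint value.
    val effectiveConf = checkpointLevelConfigs.foldLeft(baseConf) {
      case (conf, (key, value)) => conf.set(key, value)
    }

    effectiveConf.getAll.sortBy(_._1).foreach { case (key, value) =>
      println(s"$key = $value")
    }
    // Expected output:
    // spark.app.name = sample_postman_checkpoint
    // spark.master = local[*]
  }
}
```

Because `CheckpointConfig.sparkConfigs` defaults to `Map.empty` after patch 10, a checkpoint file that declares no `spark-configs` block leaves the application-level `SparkConf` unchanged, so existing checkpoints keep their previous behaviour.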