diff --git a/build.sbt b/build.sbt index a36101c8..27e57919 100644 --- a/build.sbt +++ b/build.sbt @@ -80,7 +80,7 @@ val catsVersion = "2.10.0" val dataScalaxyReaderVersion = "1.1.0" val dataScalaxyTestUtilVersion = "1.0.0" val dataScalaxyTransformerVersion = "1.2.0" -val googleCloudStorageVersion = "2.29.1" +val googleCloudStorageVersion = "2.30.1" val jsonPathVersion = "2.8.0" val jwtCoreVersion = "9.4.5" val monovoreDeclineVersion = "2.4.1" diff --git a/project/build.properties b/project/build.properties index 27430827..e8a1e246 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.9.6 +sbt.version=1.9.7 diff --git a/project/plugins.sbt b/project/plugins.sbt index f00c0173..c9c1ad07 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,7 +1,7 @@ -val assemblyVersion = "1.2.0" -val scalafixVersion = "0.11.0" -val scalafmtVersion = "2.4.6" -val wartremoverVersion = "3.1.3" +val assemblyVersion = "2.1.5" +val scalafixVersion = "0.11.1" +val scalafmtVersion = "2.5.2" +val wartremoverVersion = "3.1.5" val assemblyPluginDependency = "com.eed3si9n" % "sbt-assembly" % assemblyVersion val scalafixPluginDependency = "ch.epfl.scala" % "sbt-scalafix" % scalafixVersion diff --git a/site/docs/config_classes/checkpoint_config.md b/site/docs/config_classes/checkpoint_config.md index cd4cf2bf..a84fbaeb 100644 --- a/site/docs/config_classes/checkpoint_config.md +++ b/site/docs/config_classes/checkpoint_config.md @@ -6,11 +6,12 @@ trigger a checkpoint. The checkpoint configuration contains below config options to be provided by the user: -| Config Name | Mandatory | Default Value | Description | -|:------------|:---------:|:-------------:|:------------------------------------------------------------------| -| name | Yes | - | Unique name for your checkpoint | -| token | No | - | Token request configuration represented by `TokenConfig` class | -| data | Yes | - | Main data request configuration represented by `DataConfig` class | +| Config Name | Mandatory | Default Value | Description | +|:-------------|:---------:|:-------------:|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| name | Yes | - | Unique name for your checkpoint | +| token | No | - | Token request configuration represented by `TokenConfig` class | +| data | Yes | - | Main data request configuration represented by `DataConfig` class | +| sparkConfigs | No | - | Map of spark configurations specific to the checkpoint.
If the same config is also present in `application.conf` file, then checkpoint specific config gets the priority. | User can provide checkpoint configuration file in HOCON format in the below format: diff --git a/site/docs/restonomer_context/checkpoints.md b/site/docs/restonomer_context/checkpoints.md index ad0af194..7dc9d88b 100644 --- a/site/docs/restonomer_context/checkpoints.md +++ b/site/docs/restonomer_context/checkpoints.md @@ -57,6 +57,10 @@ data = { } } } + +spark-configs = { + "spark.app.name" = "sample_postman_checkpoint" +} ``` You can place a checkpoint file either: diff --git a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala index 666e4ea2..bfeffcf9 100644 --- a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala +++ b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerContext.scala @@ -4,7 +4,8 @@ import com.clairvoyant.restonomer.config.{ConfigVariablesSubstitutor, GCSRestono import com.clairvoyant.restonomer.exception.RestonomerException import com.clairvoyant.restonomer.model.{ApplicationConfig, CheckpointConfig} import com.google.cloud.storage.{Storage, StorageOptions} -import zio.Config +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession import zio.config.magnolia.* import java.io.FileNotFoundException @@ -37,9 +38,9 @@ class RestonomerContext( val restonomerContextDirectoryPath: String, val configVariablesFromApplicationArgs: Map[String, String] ) { + // ---------- CONFIG VARIABLES ---------- // + private val CONFIG_VARIABLES_FILE_PATH = s"$restonomerContextDirectoryPath/uncommitted/config_variables.conf" - private val APPLICATION_CONFIG_FILE_PATH = s"$restonomerContextDirectoryPath/application.conf" - private val CHECKPOINTS_CONFIG_DIRECTORY_PATH = s"$restonomerContextDirectoryPath/checkpoints" private val configVariablesFromFile = { given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] = None @@ -51,6 +52,13 @@ class RestonomerContext( Map[String, String]() } + given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] = + Some(ConfigVariablesSubstitutor(configVariablesFromFile, configVariablesFromApplicationArgs)) + + // ---------- APPLICATION CONFIGS ---------- // + + private val APPLICATION_CONFIG_FILE_PATH = s"$restonomerContextDirectoryPath/application.conf" + private val applicationConfig = { given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] = None @@ -63,8 +71,24 @@ class RestonomerContext( ) } - given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] = - Some(ConfigVariablesSubstitutor(configVariablesFromFile, configVariablesFromApplicationArgs)) + // ---------- RUN CHECKPOINTS ---------- // + + private val sparkConf = applicationConfig.sparkConfigs + .map { sparkConfigs => + sparkConfigs.foldLeft(new SparkConf()) { case (sparkConf, sparkConfig) => + sparkConf.set(sparkConfig._1, sparkConfig._2) + } + } + .getOrElse(new SparkConf()) + + private def runCheckpoints(checkpointConfigs: List[CheckpointConfig]): Unit = + checkpointConfigs.foreach { checkpointConfig => + println(s"Checkpoint Name -> ${checkpointConfig.name}\n") + runCheckpoint(checkpointConfig) + println("\n=====================================================\n") + } + + private val CHECKPOINTS_CONFIG_DIRECTORY_PATH = s"$restonomerContextDirectoryPath/checkpoints" def runCheckpoint(checkpointFilePath: String): Unit = { val absoluteCheckpointFilePath = s"$CHECKPOINTS_CONFIG_DIRECTORY_PATH/$checkpointFilePath" @@ -105,14 +129,17 @@ class RestonomerContext( s"The config directory with the path: $CHECKPOINTS_CONFIG_DIRECTORY_PATH does not exists." ) - private def runCheckpoints(checkpointConfigs: List[CheckpointConfig]): Unit = - checkpointConfigs.foreach { checkpointConfig => - println(s"Checkpoint Name -> ${checkpointConfig.name}\n") - runCheckpoint(checkpointConfig) - println("\n=====================================================\n") - } - - def runCheckpoint(checkpointConfig: CheckpointConfig): Unit = - RestonomerWorkflow(applicationConfig).run(checkpointConfig) + def runCheckpoint(checkpointConfig: CheckpointConfig): Unit = { + given sparkSession: SparkSession = + SparkSession + .builder() + .config { + checkpointConfig.sparkConfigs + .foldLeft(sparkConf) { case (sparkConf, sparkConfig) => sparkConf.set(sparkConfig._1, sparkConfig._2) } + } + .getOrCreate() + + RestonomerWorkflow.run(checkpointConfig) + } } diff --git a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala index 0f9ee99c..a503ad28 100644 --- a/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala +++ b/src/main/scala/com/clairvoyant/restonomer/app/RestonomerWorkflow.scala @@ -8,16 +8,15 @@ import com.clairvoyant.restonomer.http.* import com.clairvoyant.restonomer.model.* import com.clairvoyant.restonomer.sttpBackend import com.jayway.jsonpath.JsonPath -import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} import scala.concurrent.Await import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.Duration -class RestonomerWorkflow(using sparkSession: SparkSession) { +object RestonomerWorkflow { - def run(checkpointConfig: CheckpointConfig): Unit = { + def run(checkpointConfig: CheckpointConfig)(using sparkSession: SparkSession): Unit = { val tokenFunction = checkpointConfig.token .map { tokenConfig => @@ -79,25 +78,3 @@ class RestonomerWorkflow(using sparkSession: SparkSession) { } } - -private object RestonomerWorkflow { - - def apply(applicationConfig: ApplicationConfig): RestonomerWorkflow = { - given sparkSession: SparkSession = - SparkSession - .builder() - .config( - applicationConfig.sparkConfigs - .map { sparkConfigs => - sparkConfigs.foldLeft(new SparkConf()) { case (sparkConf, sparkConfig) => - sparkConf.set(sparkConfig._1, sparkConfig._2) - } - } - .getOrElse(new SparkConf()) - ) - .getOrCreate() - - new RestonomerWorkflow() - } - -} diff --git a/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala b/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala index 0e94d7b1..17bead7c 100644 --- a/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala +++ b/src/main/scala/com/clairvoyant/restonomer/model/CheckpointConfig.scala @@ -7,7 +7,8 @@ import zio.config.magnolia.* case class CheckpointConfig( name: String, token: Option[TokenConfig], - data: DataConfig + data: DataConfig, + sparkConfigs: Map[String, String] = Map.empty ) object CheckpointConfig {