Skip to content

Commit

Permalink
Merge pull request #167 from teamclairvoyant/REST-152
Browse files Browse the repository at this point in the history
[REST-152] Provision to supply spark configs at a checkpoint level
  • Loading branch information
rahulbhatia023 authored Dec 11, 2023
2 parents 45f0f27 + 04d3659 commit 39f5c3c
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 45 deletions.
11 changes: 6 additions & 5 deletions site/docs/config_classes/checkpoint_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ trigger a checkpoint.

The checkpoint configuration contains below config options to be provided by the user:

| Config Name | Mandatory | Default Value | Description |
|:------------|:---------:|:-------------:|:------------------------------------------------------------------|
| name | Yes | - | Unique name for your checkpoint |
| token | No | - | Token request configuration represented by `TokenConfig` class |
| data | Yes | - | Main data request configuration represented by `DataConfig` class |
| Config Name | Mandatory | Default Value | Description |
|:-------------|:---------:|:-------------:|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| name | Yes | - | Unique name for your checkpoint |
| token | No | - | Token request configuration represented by `TokenConfig` class |
| data | Yes | - | Main data request configuration represented by `DataConfig` class |
| sparkConfigs | No | - | Map of spark configurations specific to the checkpoint. <br>If the same config is also present in `application.conf` file, then checkpoint specific config gets the priority. |

User can provide checkpoint configuration file in HOCON format in the below format:

Expand Down
4 changes: 4 additions & 0 deletions site/docs/restonomer_context/checkpoints.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ data = {
}
}
}
spark-configs = {
"spark.app.name" = "sample_postman_checkpoint"
}
```

You can place a checkpoint file either:
Expand Down
1 change: 1 addition & 0 deletions src/it/resources/restonomer_context/application.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
spark-configs = {
"spark.master" = "local[*]"
"spark.app.name" = "Restonomer Execution"
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import com.clairvoyant.restonomer.config.{ConfigVariablesSubstitutor, GCSRestono
import com.clairvoyant.restonomer.exception.RestonomerException
import com.clairvoyant.restonomer.model.{ApplicationConfig, CheckpointConfig}
import com.google.cloud.storage.{Storage, StorageOptions}
import zio.Config
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import zio.config.magnolia.*

import java.io.FileNotFoundException
Expand Down Expand Up @@ -37,9 +38,9 @@ class RestonomerContext(
val restonomerContextDirectoryPath: String,
val configVariablesFromApplicationArgs: Map[String, String]
) {
// ---------- CONFIG VARIABLES ---------- //

private val CONFIG_VARIABLES_FILE_PATH = s"$restonomerContextDirectoryPath/uncommitted/config_variables.conf"
private val APPLICATION_CONFIG_FILE_PATH = s"$restonomerContextDirectoryPath/application.conf"
private val CHECKPOINTS_CONFIG_DIRECTORY_PATH = s"$restonomerContextDirectoryPath/checkpoints"

private val configVariablesFromFile = {
given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] = None
Expand All @@ -51,6 +52,13 @@ class RestonomerContext(
Map[String, String]()
}

given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] =
Some(ConfigVariablesSubstitutor(configVariablesFromFile, configVariablesFromApplicationArgs))

// ---------- APPLICATION CONFIGS ---------- //

private val APPLICATION_CONFIG_FILE_PATH = s"$restonomerContextDirectoryPath/application.conf"

private val applicationConfig = {
given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] = None

Expand All @@ -63,8 +71,24 @@ class RestonomerContext(
)
}

given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] =
Some(ConfigVariablesSubstitutor(configVariablesFromFile, configVariablesFromApplicationArgs))
// ---------- RUN CHECKPOINTS ---------- //

private val sparkConf = applicationConfig.sparkConfigs
.map { sparkConfigs =>
sparkConfigs.foldLeft(new SparkConf()) { case (sparkConf, sparkConfig) =>
sparkConf.set(sparkConfig._1, sparkConfig._2)
}
}
.getOrElse(new SparkConf())

private def runCheckpoints(checkpointConfigs: List[CheckpointConfig]): Unit =
checkpointConfigs.foreach { checkpointConfig =>
println(s"Checkpoint Name -> ${checkpointConfig.name}\n")
runCheckpoint(checkpointConfig)
println("\n=====================================================\n")
}

private val CHECKPOINTS_CONFIG_DIRECTORY_PATH = s"$restonomerContextDirectoryPath/checkpoints"

def runCheckpoint(checkpointFilePath: String): Unit = {
val absoluteCheckpointFilePath = s"$CHECKPOINTS_CONFIG_DIRECTORY_PATH/$checkpointFilePath"
Expand Down Expand Up @@ -105,14 +129,17 @@ class RestonomerContext(
s"The config directory with the path: $CHECKPOINTS_CONFIG_DIRECTORY_PATH does not exists."
)

private def runCheckpoints(checkpointConfigs: List[CheckpointConfig]): Unit =
checkpointConfigs.foreach { checkpointConfig =>
println(s"Checkpoint Name -> ${checkpointConfig.name}\n")
runCheckpoint(checkpointConfig)
println("\n=====================================================\n")
}

def runCheckpoint(checkpointConfig: CheckpointConfig): Unit =
RestonomerWorkflow(applicationConfig).run(checkpointConfig)
def runCheckpoint(checkpointConfig: CheckpointConfig): Unit = {
given sparkSession: SparkSession =
SparkSession
.builder()
.config {
checkpointConfig.sparkConfigs
.foldLeft(sparkConf) { case (sparkConf, sparkConfig) => sparkConf.set(sparkConfig._1, sparkConfig._2) }
}
.getOrCreate()

RestonomerWorkflow.run(checkpointConfig)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ import com.clairvoyant.restonomer.http.*
import com.clairvoyant.restonomer.model.*
import com.clairvoyant.restonomer.sttpBackend
import com.jayway.jsonpath.JsonPath
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

class RestonomerWorkflow(using sparkSession: SparkSession) {
object RestonomerWorkflow {

def run(checkpointConfig: CheckpointConfig): Unit = {
def run(checkpointConfig: CheckpointConfig)(using sparkSession: SparkSession): Unit = {

val tokenFunction = checkpointConfig.token
.map { tokenConfig =>
Expand Down Expand Up @@ -79,25 +78,3 @@ class RestonomerWorkflow(using sparkSession: SparkSession) {
}

}

private object RestonomerWorkflow {

def apply(applicationConfig: ApplicationConfig): RestonomerWorkflow = {
given sparkSession: SparkSession =
SparkSession
.builder()
.config(
applicationConfig.sparkConfigs
.map { sparkConfigs =>
sparkConfigs.foldLeft(new SparkConf()) { case (sparkConf, sparkConfig) =>
sparkConf.set(sparkConfig._1, sparkConfig._2)
}
}
.getOrElse(new SparkConf())
)
.getOrCreate()

new RestonomerWorkflow()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import zio.config.magnolia.*
case class CheckpointConfig(
name: String,
token: Option[TokenConfig],
data: DataConfig
data: DataConfig,
sparkConfigs: Map[String, String] = Map.empty
)

object CheckpointConfig {
Expand Down

0 comments on commit 39f5c3c

Please sign in to comment.