Skip to content

Commit

Permalink
Merge branch 'staging' into REST-97
Browse files Browse the repository at this point in the history
  • Loading branch information
harshal359 authored Dec 12, 2023
2 parents a85634e + 39f5c3c commit f659880
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 51 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ val catsVersion = "2.10.0"
val dataScalaxyReaderVersion = "1.1.0"
val dataScalaxyTestUtilVersion = "1.0.0"
val dataScalaxyTransformerVersion = "1.2.0"
val googleCloudStorageVersion = "2.29.1"
val googleCloudStorageVersion = "2.30.1"
val jsonPathVersion = "2.8.0"
val jwtCoreVersion = "9.4.5"
val monovoreDeclineVersion = "2.4.1"
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.9.6
sbt.version=1.9.7
8 changes: 4 additions & 4 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
val assemblyVersion = "1.2.0"
val scalafixVersion = "0.11.0"
val scalafmtVersion = "2.4.6"
val wartremoverVersion = "3.1.3"
val assemblyVersion = "2.1.5"
val scalafixVersion = "0.11.1"
val scalafmtVersion = "2.5.2"
val wartremoverVersion = "3.1.5"

val assemblyPluginDependency = "com.eed3si9n" % "sbt-assembly" % assemblyVersion
val scalafixPluginDependency = "ch.epfl.scala" % "sbt-scalafix" % scalafixVersion
Expand Down
11 changes: 6 additions & 5 deletions site/docs/config_classes/checkpoint_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ trigger a checkpoint.

The checkpoint configuration contains the following config options to be provided by the user:

| Config Name | Mandatory | Default Value | Description |
|:------------|:---------:|:-------------:|:------------------------------------------------------------------|
| name | Yes | - | Unique name for your checkpoint |
| token | No | - | Token request configuration represented by `TokenConfig` class |
| data | Yes | - | Main data request configuration represented by `DataConfig` class |
| Config Name | Mandatory | Default Value | Description |
|:-------------|:---------:|:-------------:|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| name | Yes | - | Unique name for your checkpoint |
| token | No | - | Token request configuration represented by `TokenConfig` class |
| data | Yes | - | Main data request configuration represented by `DataConfig` class |
| sparkConfigs | No | - | Map of Spark configurations specific to the checkpoint. <br>If the same config is also present in the `application.conf` file, the checkpoint-specific value takes priority. |

Users can provide the checkpoint configuration file in HOCON format, as shown below:

Expand Down
4 changes: 4 additions & 0 deletions site/docs/restonomer_context/checkpoints.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ data = {
}
}
}
spark-configs = {
"spark.app.name" = "sample_postman_checkpoint"
}
```

You can place a checkpoint file either:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import com.clairvoyant.restonomer.config.{ConfigVariablesSubstitutor, GCSRestono
import com.clairvoyant.restonomer.exception.RestonomerException
import com.clairvoyant.restonomer.model.{ApplicationConfig, CheckpointConfig}
import com.google.cloud.storage.{Storage, StorageOptions}
import zio.Config
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import zio.config.magnolia.*

import java.io.FileNotFoundException
Expand Down Expand Up @@ -37,9 +38,9 @@ class RestonomerContext(
val restonomerContextDirectoryPath: String,
val configVariablesFromApplicationArgs: Map[String, String]
) {
// ---------- CONFIG VARIABLES ---------- //

private val CONFIG_VARIABLES_FILE_PATH = s"$restonomerContextDirectoryPath/uncommitted/config_variables.conf"
private val APPLICATION_CONFIG_FILE_PATH = s"$restonomerContextDirectoryPath/application.conf"
private val CHECKPOINTS_CONFIG_DIRECTORY_PATH = s"$restonomerContextDirectoryPath/checkpoints"

private val configVariablesFromFile = {
given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] = None
Expand All @@ -51,6 +52,13 @@ class RestonomerContext(
Map[String, String]()
}

// Class-level given substitutor, built from the config variables read from file and the ones
// passed in as application arguments. The blocks that load the config-variables file and the
// application config declare their own local given of the same name set to None instead.
given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] =
Some(ConfigVariablesSubstitutor(configVariablesFromFile, configVariablesFromApplicationArgs))

// ---------- APPLICATION CONFIGS ---------- //

private val APPLICATION_CONFIG_FILE_PATH = s"$restonomerContextDirectoryPath/application.conf"

private val applicationConfig = {
given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] = None

Expand All @@ -63,8 +71,24 @@ class RestonomerContext(
)
}

given configVariablesSubstitutor: Option[ConfigVariablesSubstitutor] =
Some(ConfigVariablesSubstitutor(configVariablesFromFile, configVariablesFromApplicationArgs))
// ---------- RUN CHECKPOINTS ---------- //

// Base Spark configuration for this context: a fresh SparkConf carrying every entry of the
// optional `sparkConfigs` section of the application config (or no entries when it is absent).
private val sparkConf =
  applicationConfig.sparkConfigs.fold(new SparkConf()) { applicationSparkConfigs =>
    applicationSparkConfigs.foldLeft(new SparkConf()) { case (conf, (key, value)) =>
      conf.set(key, value)
    }
  }

// Runs the given checkpoints one after another, in list order, printing the checkpoint name
// before each run and a separator line after it.
private def runCheckpoints(checkpointConfigs: List[CheckpointConfig]): Unit =
  for (checkpointConfig <- checkpointConfigs) {
    println(s"Checkpoint Name -> ${checkpointConfig.name}\n")
    runCheckpoint(checkpointConfig)
    println("\n=====================================================\n")
  }

private val CHECKPOINTS_CONFIG_DIRECTORY_PATH = s"$restonomerContextDirectoryPath/checkpoints"

def runCheckpoint(checkpointFilePath: String): Unit = {
val absoluteCheckpointFilePath = s"$CHECKPOINTS_CONFIG_DIRECTORY_PATH/$checkpointFilePath"
Expand Down Expand Up @@ -105,14 +129,17 @@ class RestonomerContext(
s"The config directory with the path: $CHECKPOINTS_CONFIG_DIRECTORY_PATH does not exists."
)

private def runCheckpoints(checkpointConfigs: List[CheckpointConfig]): Unit =
checkpointConfigs.foreach { checkpointConfig =>
println(s"Checkpoint Name -> ${checkpointConfig.name}\n")
runCheckpoint(checkpointConfig)
println("\n=====================================================\n")
}

def runCheckpoint(checkpointConfig: CheckpointConfig): Unit =
RestonomerWorkflow(applicationConfig).run(checkpointConfig)
/**
 * Runs a single checkpoint with a Spark session configured from the application-level Spark
 * configs overlaid with the checkpoint's own `sparkConfigs` (checkpoint entries win on conflict).
 *
 * @param checkpointConfig
 *   the checkpoint to run; its `sparkConfigs` map may be empty
 */
def runCheckpoint(checkpointConfig: CheckpointConfig): Unit = {
  // SparkConf.set mutates the receiver and returns it, so fold over a copy of the shared base
  // configuration. Folding over `sparkConf` directly would permanently add this checkpoint's
  // overrides to the base and leak them into every checkpoint run afterwards.
  val checkpointSparkConf =
    checkpointConfig.sparkConfigs
      .foldLeft(sparkConf.clone) { case (conf, (key, value)) => conf.set(key, value) }

  // NOTE(review): getOrCreate() hands back an already-active session when one exists, so
  // overrides of later checkpoints may not fully apply within the same JVM - confirm whether
  // sessions need to be isolated between checkpoints.
  given sparkSession: SparkSession =
    SparkSession
      .builder()
      .config(checkpointSparkConf)
      .getOrCreate()

  RestonomerWorkflow.run(checkpointConfig)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ import com.clairvoyant.restonomer.http.*
import com.clairvoyant.restonomer.model.*
import com.clairvoyant.restonomer.sttpBackend
import com.jayway.jsonpath.JsonPath
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

class RestonomerWorkflow(using sparkSession: SparkSession) {
object RestonomerWorkflow {

def run(checkpointConfig: CheckpointConfig): Unit = {
def run(checkpointConfig: CheckpointConfig)(using sparkSession: SparkSession): Unit = {

val tokenFunction = checkpointConfig.token
.map { tokenConfig =>
Expand Down Expand Up @@ -79,25 +78,3 @@ class RestonomerWorkflow(using sparkSession: SparkSession) {
}

}

private object RestonomerWorkflow {

def apply(applicationConfig: ApplicationConfig): RestonomerWorkflow = {
given sparkSession: SparkSession =
SparkSession
.builder()
.config(
applicationConfig.sparkConfigs
.map { sparkConfigs =>
sparkConfigs.foldLeft(new SparkConf()) { case (sparkConf, sparkConfig) =>
sparkConf.set(sparkConfig._1, sparkConfig._2)
}
}
.getOrElse(new SparkConf())
)
.getOrCreate()

new RestonomerWorkflow()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import zio.config.magnolia.*
case class CheckpointConfig(
name: String,
token: Option[TokenConfig],
data: DataConfig
data: DataConfig,
sparkConfigs: Map[String, String] = Map.empty
)

object CheckpointConfig {
Expand Down

0 comments on commit f659880

Please sign in to comment.