Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Commit

Permalink
wip(backend): add execution time property for JobOutputResult
Browse files Browse the repository at this point in the history
  • Loading branch information
evoxmusic committed Jan 6, 2024
1 parent bdb54de commit 30aaa99
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 8 deletions.
12 changes: 9 additions & 3 deletions backend/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ pub struct ExecValidateScriptRequest {
payload: serde_json::Value,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct JobOutputResult {
pub one_liner_command: String,
pub output: serde_json::Value,
pub execution_time_in_millis: u128,
}

fn find_catalog_by_slug<'a>(catalogs: &'a Vec<CatalogYamlConfig>, catalog_slug: &str) -> Option<&'a CatalogYamlConfig> {
Expand All @@ -43,15 +44,17 @@ fn find_catalog_service_by_slug<'a>(catalog: &'a CatalogYamlConfig, service_slug
}

/// Extract the job output from the environment variable TORII_JSON_OUTPUT and reset it to an empty JSON object
fn consume_job_output_result_from_json_output_env(service_slug: &str) -> JobOutputResult {
fn consume_job_output_result_from_json_output_env(service_slug: &str, execution_time: u128) -> JobOutputResult {
let job_output_result = match std::env::var("TORII_JSON_OUTPUT") {
Ok(json_output) => JobOutputResult {
one_liner_command: service_slug.to_string(),
output: serde_json::from_str(json_output.as_str()).unwrap_or(serde_json::json!({})),
execution_time_in_millis: execution_time,
},
Err(_) => JobOutputResult {
one_liner_command: service_slug.to_string(),
output: serde_json::json!({}),
execution_time_in_millis: execution_time,
}
};

Expand Down Expand Up @@ -118,6 +121,9 @@ async fn execute_command<T>(

cmd.arg(json_payload);

// start execution timer
let start = std::time::Instant::now();

let mut child = match cmd.spawn() {
Ok(child) => child,
Err(err) => return Err(format!("Validate script '{}' failed: {}", &cmd_one_line, err))
Expand All @@ -144,7 +150,7 @@ async fn execute_command<T>(

// TODO parse output.stdout and output.stderr and forward to the frontend

Ok(consume_job_output_result_from_json_output_env(cmd_one_line.as_str()))
Ok(consume_job_output_result_from_json_output_env(cmd_one_line.as_str(), start.elapsed().as_millis()))
}

fn get_catalog_and_service<'a>(
Expand Down
14 changes: 9 additions & 5 deletions backend/src/catalog/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use sqlx::{Pool, Postgres};
use tokio::sync::mpsc::Receiver;
use tracing::error;

use crate::catalog::{execute_command, ExecValidateScriptRequest};
use crate::catalog::{execute_command, ExecValidateScriptRequest, JobOutputResult};
use crate::database::{Status, update_catalog_execution_status};
use crate::yaml_config::{CatalogServicePostValidateYamlConfig, CatalogServiceYamlConfig};

Expand Down Expand Up @@ -39,7 +39,8 @@ pub struct TasksPayload {
pub struct TaskPayload {
status: Status,
message: Option<String>,
post_validate: CatalogServicePostValidateYamlConfig,
post_validate_input: CatalogServicePostValidateYamlConfig,
post_validate_output: Option<JobOutputResult>,
}

pub async fn background_worker(mut rx: Receiver<BackgroundWorkerTask>, pg_pool: Arc<Pool<Postgres>>) {
Expand All @@ -56,7 +57,6 @@ pub async fn background_worker(mut rx: Receiver<BackgroundWorkerTask>, pg_pool:
continue;
}

// TODO pass output from post_validate script to next post_validate script
let mut tasks_payload = TasksPayload { tasks: vec![] };

for cmd in task.catalog_service_yaml_config.post_validate.as_ref().unwrap_or(&vec![]) {
Expand All @@ -66,7 +66,8 @@ pub async fn background_worker(mut rx: Receiver<BackgroundWorkerTask>, pg_pool:
let task_payload = TaskPayload {
status: Status::Failure,
message: Some(err.to_string()),
post_validate: cmd.clone(),
post_validate_input: cmd.clone(),
post_validate_output: None,
};

let _ = tasks_payload.tasks.push(task_payload);
Expand All @@ -82,10 +83,13 @@ pub async fn background_worker(mut rx: Receiver<BackgroundWorkerTask>, pg_pool:
}
};

// TODO pass output to next command

let task_payload = TaskPayload {
status: Status::Success,
message: None,
post_validate: cmd.clone(),
post_validate_input: cmd.clone(),
post_validate_output: Some(job_output_result),
};

let _ = tasks_payload.tasks.push(task_payload);
Expand Down

0 comments on commit 30aaa99

Please sign in to comment.