From b773de0c1cdcf99f046f433d9812ded876b3977b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Romaric=20Philog=C3=A8ne?= Date: Sun, 31 Dec 2023 14:53:30 +0100 Subject: [PATCH] wip(backend): add async background worker --- backend/examples/config.yaml | 12 ++++---- backend/src/catalog/controllers.rs | 29 +++++++----------- backend/src/catalog/mod.rs | 1 + backend/src/catalog/services.rs | 48 ++++++++++++++++++++++++++++++ backend/src/main.rs | 8 +++++ backend/src/yaml_config.rs | 6 ++++ 6 files changed, 79 insertions(+), 25 deletions(-) create mode 100644 backend/src/catalog/services.rs diff --git a/backend/examples/config.yaml b/backend/examples/config.yaml index f3baf1c..d153da7 100644 --- a/backend/examples/config.yaml +++ b/backend/examples/config.yaml @@ -38,16 +38,16 @@ catalogs: - command: - bash - examples/dumb_script_ok.sh # AND then this one - post-validation: + post_validate: - command: - python - examples/validation_script_ok.py # executed first timeout: 60 # timeout in seconds - output-model: string (optional) # model name + output_model: string (optional) # model name - command: - bash - examples/dumb_script_ok.sh # AND then this one - output-model: string (optional) # model name + output_model: string (optional) # model name - slug: stop-testing-environment name: Stop Testing Environment description: stop a testing environment @@ -68,16 +68,16 @@ catalogs: - command: - bash - examples/dumb_script_ok.sh # AND then this one - post-validation: + post_validate: - command: - python - examples/validation_script_ok.py # executed first timeout: 60 # timeout in seconds - output-model: string (optional) # model name + output_model: string (optional) # model name - command: - bash - examples/dumb_script_ok.sh # AND then this one - output-model: string (optional) # model name + output_model: string (optional) # model name models: - name: string description: string (optional) diff --git a/backend/src/catalog/controllers.rs b/backend/src/catalog/controllers.rs index 07c5e3a..981aee3 100644 --- a/backend/src/catalog/controllers.rs +++ b/backend/src/catalog/controllers.rs @@ -3,8 +3,11 @@ use std::sync::Arc; use axum::{debug_handler, Extension, Json}; use axum::extract::Path; use axum::http::StatusCode; +use tokio::sync::mpsc::Sender; +use tracing::error; use crate::catalog::{check_json_payload_against_yaml_config_fields, execute_command, ExecValidateScriptRequest, find_catalog_by_slug, get_catalog_and_service, JobResponse, JobResults, ResultsResponse}; +use crate::catalog::services::BackgroundWorkerTask; use crate::yaml_config::{CatalogServiceYamlConfig, CatalogYamlConfig, ExternalCommand, YamlConfig}; #[debug_handler] @@ -77,6 +80,7 @@ pub async fn exec_catalog_service_validate_scripts( #[debug_handler] pub async fn exec_catalog_service_post_validate_scripts( Extension(yaml_config): Extension>, + Extension(tx): Extension>, Path((catalog_slug, service_slug)): Path<(String, String)>, Json(req): Json, ) -> (StatusCode, Json) { @@ -109,28 +113,15 @@ pub async fn exec_catalog_service_post_validate_scripts( }; } - - let service = service.clone(); // execute post validate scripts - let _ = tokio::spawn(async move { - let mut job_results = JobResults { - user_fields_input: req.payload.clone(), - results: vec![], - }; - - for cmd in service.post_validate.as_ref().unwrap_or(&vec![]) { - let job_output_result = match execute_command(cmd, req.payload.to_string().as_str()).await { - Ok(job_output_result) => job_output_result, - Err(err) => todo!("{}", err) // TODO persist error in database - }; - - let _ = job_results.results.push(job_output_result); - } - - // TODO persist results in database + let _ = tx.send(BackgroundWorkerTask::new( + catalog_slug.clone(), + service.clone(), + req, + )).await.unwrap_or_else(|err| { + error!("failed to send task to background worker: {}", err); }); - (StatusCode::NO_CONTENT, Json(JobResponse { message: Some("workflow executed".to_string()), results: None })) } diff --git a/backend/src/catalog/mod.rs b/backend/src/catalog/mod.rs index cfd631c..d9ba2dc 100644 --- a/backend/src/catalog/mod.rs +++ b/backend/src/catalog/mod.rs @@ -10,6 +10,7 @@ use tracing::debug; use crate::yaml_config::{CatalogServiceYamlConfig, CatalogYamlConfig, ExternalCommand, YamlConfig}; pub mod controllers; +pub mod services; #[derive(Serialize, Deserialize)] pub struct ResultsResponse { diff --git a/backend/src/catalog/services.rs b/backend/src/catalog/services.rs new file mode 100644 index 0000000..6d201d8 --- /dev/null +++ b/backend/src/catalog/services.rs @@ -0,0 +1,48 @@ +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::Receiver; + +use crate::catalog::{execute_command, ExecValidateScriptRequest, JobResults}; +use crate::yaml_config::CatalogServiceYamlConfig; + +#[derive(Serialize, Deserialize)] +pub struct BackgroundWorkerTask { + pub catalog_slug: String, + pub catalog_service_yaml_config: CatalogServiceYamlConfig, + pub req: ExecValidateScriptRequest, +} + +impl BackgroundWorkerTask { + pub fn new( + catalog_slug: String, + catalog_service_yaml_config: CatalogServiceYamlConfig, + req: ExecValidateScriptRequest, + ) -> Self { + Self { + catalog_slug, + catalog_service_yaml_config, + req, + } + } +} + +pub async fn background_worker(mut rx: Receiver) { + while let Some(task) = rx.recv().await { + let mut job_results = JobResults { + user_fields_input: task.req.payload.clone(), + results: vec![], + }; + + for cmd in task.catalog_service_yaml_config.post_validate.as_ref().unwrap_or(&vec![]) { + let job_output_result = match execute_command(cmd, task.req.payload.to_string().as_str()).await { + Ok(job_output_result) => job_output_result, + Err(err) => todo!("{}", err) // TODO persist error in database + }; + + let _ = job_results.results.push(job_output_result); + } + + println!("job_results: {:?}", job_results); + + // TODO persist results in database + } +} diff --git a/backend/src/main.rs b/backend/src/main.rs index ebc94cf..fe8e2ca 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -13,6 +13,7 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use crate::catalog::controllers::{exec_catalog_service_post_validate_scripts, exec_catalog_service_validate_scripts, list_catalog_services, list_catalogs}; +use crate::catalog::services::BackgroundWorkerTask; use crate::cli::CLI; use crate::yaml_config::YamlConfig; @@ -67,6 +68,12 @@ async fn main() { show_loaded_config(&yaml_config); + let (tx, rx) = tokio::sync::mpsc::channel::(100); + + let _ = tokio::spawn(async move { + catalog::services::background_worker(rx).await; + }); + let app = Router::new() .fallback(unknown_route) .route("/", get(|| async { "OK" })) @@ -76,6 +83,7 @@ async fn main() { .route("/catalogs/:slug/services/:slug/validate", post(exec_catalog_service_validate_scripts)) .route("/catalogs/:slug/services/:slug/execute", post(exec_catalog_service_post_validate_scripts)) .layer(Extension(yaml_config)) + .layer(Extension(tx)) .layer(CorsLayer::new().allow_origin(Any)); //.route("/catalog/:id", get(catalog::get_catalog_by_id)) //.route("/catalog", post(catalog::create_catalog)); diff --git a/backend/src/yaml_config.rs b/backend/src/yaml_config.rs index cb1cf09..9328cfa 100644 --- a/backend/src/yaml_config.rs +++ b/backend/src/yaml_config.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use crate::constants::DEFAULT_TIMEOUT_IN_SECONDS; #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "snake_case")] pub struct YamlConfig { pub catalogs: Vec, } @@ -20,6 +21,7 @@ impl YamlConfig { } #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "snake_case")] pub struct CatalogYamlConfig { pub slug: String, pub name: String, @@ -44,6 +46,7 @@ impl CatalogYamlConfig { } #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "snake_case")] pub struct CatalogServiceYamlConfig { pub slug: String, pub name: String, @@ -118,6 +121,7 @@ pub trait ExternalCommand { #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "snake_case")] pub struct CatalogServiceValidateYamlConfig { pub command: Vec, pub timeout: Option, @@ -141,6 +145,7 @@ impl Display for CatalogServiceValidateYamlConfig { } #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "snake_case")] pub struct CatalogServicePostValidateYamlConfig { pub command: Vec, pub timeout: Option, @@ -165,6 +170,7 @@ impl Display for CatalogServicePostValidateYamlConfig { } #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +#[serde(rename_all = "snake_case")] pub struct CatalogFieldYamlConfig { pub slug: String, pub title: String,