From 642eb003450c94c22e8c443bf710b00a6e138558 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Romaric=20Philog=C3=A8ne?= Date: Fri, 5 Jan 2024 14:26:34 +0100 Subject: [PATCH] wip(backend): update database records on catalog service execution --- backend/src/catalog/controllers.rs | 21 +++++++++++++++-- backend/src/catalog/services.rs | 10 +++------ backend/src/database.rs | 36 +++++++++++++++++++++++++++++- 3 files changed, 57 insertions(+), 10 deletions(-) diff --git a/backend/src/catalog/controllers.rs b/backend/src/catalog/controllers.rs index 31cb235..19e93b4 100644 --- a/backend/src/catalog/controllers.rs +++ b/backend/src/catalog/controllers.rs @@ -9,7 +9,7 @@ use tracing::error; use crate::catalog::{check_json_payload_against_yaml_config_fields, execute_command, ExecValidateScriptRequest, find_catalog_by_slug, get_catalog_and_service, JobResponse, JobResults, ResultsResponse}; use crate::catalog::services::BackgroundWorkerTask; use crate::database; -use crate::database::CatalogExecutionStatusJson; +use crate::database::{CatalogExecutionStatusJson, insert_catalog_execution_status, Status}; use crate::yaml_config::{CatalogServiceYamlConfig, CatalogYamlConfig, ExternalCommand, YamlConfig}; #[debug_handler] @@ -99,6 +99,7 @@ pub async fn exec_catalog_service_validate_scripts( pub async fn exec_catalog_service_post_validate_scripts( Extension(yaml_config): Extension>, Extension(tx): Extension>, + Extension(pg_pool): Extension>, Path((catalog_slug, service_slug)): Path<(String, String)>, Json(req): Json, ) -> (StatusCode, Json) { @@ -131,13 +132,29 @@ pub async fn exec_catalog_service_post_validate_scripts( }; } + let ces = match insert_catalog_execution_status( + &pg_pool, + &catalog_slug, + &service_slug, + Status::Queued, + &req.payload, + &serde_json::Value::Object(serde_json::Map::new()), + ).await { + Ok(ces) => ces, + Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(JobResponse { + message: Some(err.to_string()), + results: None, + })) + }; + // execute post validate scripts let _ = tx.send(BackgroundWorkerTask::new( - catalog_slug.clone(), + ces.id(), service.clone(), req, )).await.unwrap_or_else(|err| { error!("failed to send task to background worker: {}", err); + // TODO change catalog execution status to Failure }); (StatusCode::NO_CONTENT, Json(JobResponse { message: Some("workflow executed".to_string()), results: None })) diff --git a/backend/src/catalog/services.rs b/backend/src/catalog/services.rs index 16c9118..d07af17 100644 --- a/backend/src/catalog/services.rs +++ b/backend/src/catalog/services.rs @@ -9,19 +9,19 @@ use crate::yaml_config::CatalogServiceYamlConfig; #[derive(Serialize, Deserialize)] pub struct BackgroundWorkerTask { - pub catalog_slug: String, + pub catalog_execution_status_id: String, pub catalog_service_yaml_config: CatalogServiceYamlConfig, pub req: ExecValidateScriptRequest, } impl BackgroundWorkerTask { pub fn new( - catalog_slug: String, + catalog_execution_status_id: String, catalog_service_yaml_config: CatalogServiceYamlConfig, req: ExecValidateScriptRequest, ) -> Self { Self { - catalog_slug, + catalog_execution_status_id, catalog_service_yaml_config, req, } @@ -43,9 +43,5 @@ pub async fn background_worker(mut rx: Receiver, pg_pool: let _ = job_results.results.push(job_output_result); } - - println!("job_results: {:?}", job_results); - - // TODO persist results in database } } diff --git a/backend/src/database.rs b/backend/src/database.rs index 0864b90..f905bd1 100644 --- a/backend/src/database.rs +++ b/backend/src/database.rs @@ -40,6 +40,10 @@ impl CatalogExecutionStatus { tasks_payload: self.tasks_payload.clone(), } } + + pub fn id(&self) -> String { + self.id.to_string() + } } #[derive(Serialize, Deserialize, Debug)] @@ -65,7 +69,11 @@ pub async fn init_database(pg_pool: &Pool) -> Result<(), QError> { Ok(()) } -pub async fn list_catalog_execution_statuses(pg_pool: &Pool, catalog_slug: &str, service_slug: &str) -> Result, QError> { +pub async fn list_catalog_execution_statuses( + pg_pool: &Pool, + catalog_slug: &str, + service_slug: &str, +) -> Result, QError> { Ok( sqlx::query_as::<_, CatalogExecutionStatus>( r#" @@ -80,3 +88,29 @@ pub async fn list_catalog_execution_statuses(pg_pool: &Pool, catalog_s .await? ) } + +pub async fn insert_catalog_execution_status( + pg_pool: &Pool, + catalog_slug: &str, + service_slug: &str, + status: Status, + input_payload: &serde_json::Value, + tasks_payload: &serde_json::Value, +) -> Result { + Ok( + sqlx::query_as::<_, CatalogExecutionStatus>( + r#" + INSERT INTO catalog_execution_statuses (catalog_slug, service_slug, status, input_payload, tasks_payload) + VALUES ($1, $2, $3, $4, $5) + RETURNING * + "# + ) + .bind(catalog_slug) + .bind(service_slug) + .bind(status) + .bind(input_payload) + .bind(tasks_payload) + .fetch_one(pg_pool) + .await? + ) +}