Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Commit

Permalink
wip(backend): add async background worker
Browse files Browse the repository at this point in the history
  • Loading branch information
evoxmusic committed Dec 31, 2023
1 parent 37364d9 commit b773de0
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 25 deletions.
12 changes: 6 additions & 6 deletions backend/examples/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ catalogs:
- command:
- bash
- examples/dumb_script_ok.sh # AND then this one
post-validation:
post_validate:
- command:
- python
- examples/validation_script_ok.py # executed first
timeout: 60 # timeout in seconds
output-model: string (optional) # model name
output_model: string (optional) # model name
- command:
- bash
- examples/dumb_script_ok.sh # AND then this one
output-model: string (optional) # model name
output_model: string (optional) # model name
- slug: stop-testing-environment
name: Stop Testing Environment
description: stop a testing environment
Expand All @@ -68,16 +68,16 @@ catalogs:
- command:
- bash
- examples/dumb_script_ok.sh # AND then this one
post-validation:
post_validate:
- command:
- python
- examples/validation_script_ok.py # executed first
timeout: 60 # timeout in seconds
output-model: string (optional) # model name
output_model: string (optional) # model name
- command:
- bash
- examples/dumb_script_ok.sh # AND then this one
output-model: string (optional) # model name
output_model: string (optional) # model name
models:
- name: string
description: string (optional)
Expand Down
29 changes: 10 additions & 19 deletions backend/src/catalog/controllers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ use std::sync::Arc;
use axum::{debug_handler, Extension, Json};
use axum::extract::Path;
use axum::http::StatusCode;
use tokio::sync::mpsc::Sender;
use tracing::error;

use crate::catalog::{check_json_payload_against_yaml_config_fields, execute_command, ExecValidateScriptRequest, find_catalog_by_slug, get_catalog_and_service, JobResponse, JobResults, ResultsResponse};
use crate::catalog::services::BackgroundWorkerTask;
use crate::yaml_config::{CatalogServiceYamlConfig, CatalogYamlConfig, ExternalCommand, YamlConfig};

#[debug_handler]
Expand Down Expand Up @@ -77,6 +80,7 @@ pub async fn exec_catalog_service_validate_scripts(
#[debug_handler]
pub async fn exec_catalog_service_post_validate_scripts(
Extension(yaml_config): Extension<Arc<YamlConfig>>,
Extension(tx): Extension<Sender<BackgroundWorkerTask>>,
Path((catalog_slug, service_slug)): Path<(String, String)>,
Json(req): Json<ExecValidateScriptRequest>,
) -> (StatusCode, Json<JobResponse>) {
Expand Down Expand Up @@ -109,28 +113,15 @@ pub async fn exec_catalog_service_post_validate_scripts(
};
}


let service = service.clone();
// execute post validate scripts
let _ = tokio::spawn(async move {
let mut job_results = JobResults {
user_fields_input: req.payload.clone(),
results: vec![],
};

for cmd in service.post_validate.as_ref().unwrap_or(&vec![]) {
let job_output_result = match execute_command(cmd, req.payload.to_string().as_str()).await {
Ok(job_output_result) => job_output_result,
Err(err) => todo!("{}", err) // TODO persist error in database
};

let _ = job_results.results.push(job_output_result);
}

// TODO persist results in database
let _ = tx.send(BackgroundWorkerTask::new(
catalog_slug.clone(),
service.clone(),
req,
)).await.unwrap_or_else(|err| {
error!("failed to send task to background worker: {}", err);
});


(StatusCode::NO_CONTENT, Json(JobResponse { message: Some("workflow executed".to_string()), results: None }))
}

Expand Down
1 change: 1 addition & 0 deletions backend/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tracing::debug;
use crate::yaml_config::{CatalogServiceYamlConfig, CatalogYamlConfig, ExternalCommand, YamlConfig};

pub mod controllers;
pub mod services;

#[derive(Serialize, Deserialize)]
pub struct ResultsResponse<T> {
Expand Down
48 changes: 48 additions & 0 deletions backend/src/catalog/services.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::Receiver;

use crate::catalog::{execute_command, ExecValidateScriptRequest, JobResults};
use crate::yaml_config::CatalogServiceYamlConfig;

#[derive(Serialize, Deserialize)]
pub struct BackgroundWorkerTask {
pub catalog_slug: String,
pub catalog_service_yaml_config: CatalogServiceYamlConfig,
pub req: ExecValidateScriptRequest,
}

impl BackgroundWorkerTask {
pub fn new(
catalog_slug: String,
catalog_service_yaml_config: CatalogServiceYamlConfig,
req: ExecValidateScriptRequest,
) -> Self {
Self {
catalog_slug,
catalog_service_yaml_config,
req,
}
}
}

pub async fn background_worker(mut rx: Receiver<BackgroundWorkerTask>) {
while let Some(task) = rx.recv().await {
let mut job_results = JobResults {
user_fields_input: task.req.payload.clone(),
results: vec![],
};

for cmd in task.catalog_service_yaml_config.post_validate.as_ref().unwrap_or(&vec![]) {
let job_output_result = match execute_command(cmd, task.req.payload.to_string().as_str()).await {
Ok(job_output_result) => job_output_result,
Err(err) => todo!("{}", err) // TODO persist error in database
};

let _ = job_results.results.push(job_output_result);
}

println!("job_results: {:?}", job_results);

// TODO persist results in database
}
}
8 changes: 8 additions & 0 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

use crate::catalog::controllers::{exec_catalog_service_post_validate_scripts, exec_catalog_service_validate_scripts, list_catalog_services, list_catalogs};
use crate::catalog::services::BackgroundWorkerTask;
use crate::cli::CLI;
use crate::yaml_config::YamlConfig;

Expand Down Expand Up @@ -67,6 +68,12 @@ async fn main() {

show_loaded_config(&yaml_config);

let (tx, rx) = tokio::sync::mpsc::channel::<BackgroundWorkerTask>(100);

let _ = tokio::spawn(async move {
catalog::services::background_worker(rx).await;
});

let app = Router::new()
.fallback(unknown_route)
.route("/", get(|| async { "OK" }))
Expand All @@ -76,6 +83,7 @@ async fn main() {
.route("/catalogs/:slug/services/:slug/validate", post(exec_catalog_service_validate_scripts))
.route("/catalogs/:slug/services/:slug/execute", post(exec_catalog_service_post_validate_scripts))
.layer(Extension(yaml_config))
.layer(Extension(tx))
.layer(CorsLayer::new().allow_origin(Any));
//.route("/catalog/:id", get(catalog::get_catalog_by_id))
//.route("/catalog", post(catalog::create_catalog));
Expand Down
6 changes: 6 additions & 0 deletions backend/src/yaml_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use crate::constants::DEFAULT_TIMEOUT_IN_SECONDS;

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub struct YamlConfig {
pub catalogs: Vec<CatalogYamlConfig>,
}
Expand All @@ -20,6 +21,7 @@ impl YamlConfig {
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub struct CatalogYamlConfig {
pub slug: String,
pub name: String,
Expand All @@ -44,6 +46,7 @@ impl CatalogYamlConfig {
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub struct CatalogServiceYamlConfig {
pub slug: String,
pub name: String,
Expand Down Expand Up @@ -118,6 +121,7 @@ pub trait ExternalCommand {


#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub struct CatalogServiceValidateYamlConfig {
pub command: Vec<String>,
pub timeout: Option<u64>,
Expand All @@ -141,6 +145,7 @@ impl Display for CatalogServiceValidateYamlConfig {
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub struct CatalogServicePostValidateYamlConfig {
pub command: Vec<String>,
pub timeout: Option<u64>,
Expand All @@ -165,6 +170,7 @@ impl Display for CatalogServicePostValidateYamlConfig {
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub struct CatalogFieldYamlConfig {
pub slug: String,
pub title: String,
Expand Down

0 comments on commit b773de0

Please sign in to comment.