Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Commit

Permalink
wip(backend): update database records on catalog service execution
Browse files Browse the repository at this point in the history
  • Loading branch information
evoxmusic committed Jan 5, 2024
1 parent 55bc9d2 commit 642eb00
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 10 deletions.
21 changes: 19 additions & 2 deletions backend/src/catalog/controllers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::error;
use crate::catalog::{check_json_payload_against_yaml_config_fields, execute_command, ExecValidateScriptRequest, find_catalog_by_slug, get_catalog_and_service, JobResponse, JobResults, ResultsResponse};
use crate::catalog::services::BackgroundWorkerTask;
use crate::database;
use crate::database::CatalogExecutionStatusJson;
use crate::database::{CatalogExecutionStatusJson, insert_catalog_execution_status, Status};
use crate::yaml_config::{CatalogServiceYamlConfig, CatalogYamlConfig, ExternalCommand, YamlConfig};

#[debug_handler]
Expand Down Expand Up @@ -99,6 +99,7 @@ pub async fn exec_catalog_service_validate_scripts(
pub async fn exec_catalog_service_post_validate_scripts(
Extension(yaml_config): Extension<Arc<YamlConfig>>,
Extension(tx): Extension<Sender<BackgroundWorkerTask>>,
Extension(pg_pool): Extension<Arc<sqlx::PgPool>>,
Path((catalog_slug, service_slug)): Path<(String, String)>,
Json(req): Json<ExecValidateScriptRequest>,
) -> (StatusCode, Json<JobResponse>) {
Expand Down Expand Up @@ -131,13 +132,29 @@ pub async fn exec_catalog_service_post_validate_scripts(
};
}

let ces = match insert_catalog_execution_status(
&pg_pool,
&catalog_slug,
&service_slug,
Status::Queued,
&req.payload,
&serde_json::Value::Object(serde_json::Map::new()),
).await {
Ok(ces) => ces,
Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(JobResponse {
message: Some(err.to_string()),
results: None,
}))
};

// execute post validate scripts
let _ = tx.send(BackgroundWorkerTask::new(
catalog_slug.clone(),
ces.id(),
service.clone(),
req,
)).await.unwrap_or_else(|err| {
error!("failed to send task to background worker: {}", err);
// TODO change catalog execution status to Failure
});

(StatusCode::NO_CONTENT, Json(JobResponse { message: Some("workflow executed".to_string()), results: None }))
Expand Down
10 changes: 3 additions & 7 deletions backend/src/catalog/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ use crate::yaml_config::CatalogServiceYamlConfig;

#[derive(Serialize, Deserialize)]
pub struct BackgroundWorkerTask {
pub catalog_slug: String,
pub catalog_execution_status_id: String,
pub catalog_service_yaml_config: CatalogServiceYamlConfig,
pub req: ExecValidateScriptRequest,
}

impl BackgroundWorkerTask {
pub fn new(
catalog_slug: String,
catalog_execution_status_id: String,
catalog_service_yaml_config: CatalogServiceYamlConfig,
req: ExecValidateScriptRequest,
) -> Self {
Self {
catalog_slug,
catalog_execution_status_id,
catalog_service_yaml_config,
req,
}
Expand All @@ -43,9 +43,5 @@ pub async fn background_worker(mut rx: Receiver<BackgroundWorkerTask>, pg_pool:

let _ = job_results.results.push(job_output_result);
}

println!("job_results: {:?}", job_results);

// TODO persist results in database
}
}
36 changes: 35 additions & 1 deletion backend/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ impl CatalogExecutionStatus {
tasks_payload: self.tasks_payload.clone(),
}
}

pub fn id(&self) -> String {
self.id.to_string()
}
}

#[derive(Serialize, Deserialize, Debug)]
Expand All @@ -65,7 +69,11 @@ pub async fn init_database(pg_pool: &Pool<Postgres>) -> Result<(), QError> {
Ok(())
}

pub async fn list_catalog_execution_statuses(pg_pool: &Pool<Postgres>, catalog_slug: &str, service_slug: &str) -> Result<Vec<CatalogExecutionStatus>, QError> {
pub async fn list_catalog_execution_statuses(
pg_pool: &Pool<Postgres>,
catalog_slug: &str,
service_slug: &str,
) -> Result<Vec<CatalogExecutionStatus>, QError> {
Ok(
sqlx::query_as::<_, CatalogExecutionStatus>(
r#"
Expand All @@ -80,3 +88,29 @@ pub async fn list_catalog_execution_statuses(pg_pool: &Pool<Postgres>, catalog_s
.await?
)
}

pub async fn insert_catalog_execution_status(
pg_pool: &Pool<Postgres>,
catalog_slug: &str,
service_slug: &str,
status: Status,
input_payload: &serde_json::Value,
tasks_payload: &serde_json::Value,
) -> Result<CatalogExecutionStatus, QError> {
Ok(
sqlx::query_as::<_, CatalogExecutionStatus>(
r#"
INSERT INTO catalog_execution_statuses (catalog_slug, service_slug, status, input_payload, tasks_payload)
VALUES ($1, $2, $3, $4, $5)
RETURNING *
"#
)
.bind(catalog_slug)
.bind(service_slug)
.bind(status)
.bind(input_payload)
.bind(tasks_payload)
.fetch_one(pg_pool)
.await?
)
}

0 comments on commit 642eb00

Please sign in to comment.