From e505b6f722341fe4c3d15b74d241f3d95de477bb Mon Sep 17 00:00:00 2001 From: Abel Lucas Date: Wed, 18 Dec 2024 11:29:52 +0100 Subject: [PATCH] flow: add path to flow script as well --- backend/windmill-common/src/jobs.rs | 3 ++- backend/windmill-queue/src/jobs.rs | 5 +++-- backend/windmill-worker/src/worker_flow.rs | 11 +++++++++++ 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/backend/windmill-common/src/jobs.rs b/backend/windmill-common/src/jobs.rs index 086976e701e0f..1190f09f47d7b 100644 --- a/backend/windmill-common/src/jobs.rs +++ b/backend/windmill-common/src/jobs.rs @@ -274,7 +274,8 @@ pub enum JobPayload { apply_preprocessor: bool, }, FlowScript { - id: FlowNodeId, // flow_node(id). + id: FlowNodeId, // flow_node(id). + path: Option, // flow step path. language: ScriptLang, custom_concurrency_key: Option, concurrent_limit: Option, diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index 9df0b70d897d8..3a361890fbf24 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -2845,7 +2845,8 @@ pub async fn push<'c, 'd>( ) } JobPayload::FlowScript { - id, // flow_node(id). + id, // flow_node(id). + path, // flow step path. language, custom_concurrency_key, concurrent_limit, @@ -2854,7 +2855,7 @@ pub async fn push<'c, 'd>( dedicated_worker, } => ( Some(id.0), - None, + path, None, JobKind::FlowScript, None, diff --git a/backend/windmill-worker/src/worker_flow.rs b/backend/windmill-worker/src/worker_flow.rs index 7d7ea8d0acd4d..8626ef44501e1 100644 --- a/backend/windmill-worker/src/worker_flow.rs +++ b/backend/windmill-worker/src/worker_flow.rs @@ -3089,9 +3089,19 @@ async fn compute_next_flow_transform( concurrency_time_window_s, .. } => { + let path = if status + .preprocessor_module + .as_ref() + .is_some_and(|x| x.id() == module.id) + { + format!("{}/preprocessor", flow_job.script_path()) + } else { + format!("{}/step-{}", flow_job.script_path(), status.step) + }; let payload = JobPayloadWithTag { payload: JobPayload::FlowScript { id, + path: Some(path), language, custom_concurrency_key: custom_concurrency_key.clone(), concurrent_limit, @@ -3693,6 +3703,7 @@ async fn payload_from_simple_module( } => JobPayloadWithTag { payload: JobPayload::FlowScript { id, + path: inner_path, language, custom_concurrency_key, concurrent_limit,