diff --git a/backend/.sqlx/query-97048ce0bcabb9baecb80cde5ab3c989e1575fbd20ef22766d2887a86dce15e1.json b/backend/.sqlx/query-97048ce0bcabb9baecb80cde5ab3c989e1575fbd20ef22766d2887a86dce15e1.json new file mode 100644 index 0000000000000..c8ab870c76ae7 --- /dev/null +++ b/backend/.sqlx/query-97048ce0bcabb9baecb80cde5ab3c989e1575fbd20ef22766d2887a86dce15e1.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT flow.versions[array_upper(flow.versions, 1)] AS \"version!: i64\"\n FROM flow WHERE path = $1 AND workspace_id = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "version!: i64", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "97048ce0bcabb9baecb80cde5ab3c989e1575fbd20ef22766d2887a86dce15e1" +} diff --git a/backend/windmill-queue/src/jobs.rs b/backend/windmill-queue/src/jobs.rs index 1592eebe20428..88d53e50db5fa 100644 --- a/backend/windmill-queue/src/jobs.rs +++ b/backend/windmill-queue/src/jobs.rs @@ -3190,35 +3190,34 @@ pub async fn push<'c, 'd>( ) } JobPayload::Flow { path, dedicated_worker, apply_preprocessor } => { + let mut ntx = tx.into_tx().await?; // Fetch the latest version of the flow. - // Note that this query is performed within an isolated transaction to secure the - // API surface. - let version = fetch_scalar_isolated!( - sqlx::query_scalar!( - "SELECT flow.versions[array_upper(flow.versions, 1)] AS \"version!: i64\" - FROM flow WHERE path = $1 AND workspace_id = $2", - &path, - &workspace_id - ), - tx - )? + let version = sqlx::query_scalar!( + "SELECT flow.versions[array_upper(flow.versions, 1)] AS \"version!: i64\" + FROM flow WHERE path = $1 AND workspace_id = $2", + &path, + &workspace_id + ) + .fetch_optional(&mut *ntx) + .await? .ok_or_else(|| Error::InternalErr(format!("not found flow at path {:?}", path)))?; // Do not use the lite version unless all workers are updated. - // This does not need to be performed within the isolated Tx as checks had been - // performed before when the version was fetched. let data = if *DISABLE_FLOW_SCRIPT || (!*MIN_VERSION_IS_AT_LEAST_1_432.read().await && !*CLOUD_HOSTED) { - cache::flow::fetch_version(_db, version).await + cache::flow::fetch_version(&mut *ntx, version).await } else { // Fallback to the original version if the lite version is not found. // This also prevent a race condition where the flow is run just after deploy and // the lite version is still being created. - cache::flow::fetch_version_lite(_db, version) - .or_else(|_| cache::flow::fetch_version(_db, version)) - .await + match cache::flow::fetch_version_lite(&mut *ntx, version).await { + Ok(data) => Ok(data), + Err(_) => cache::flow::fetch_version(&mut *ntx, version).await, + } }?; + tx = PushIsolationLevel::Transaction(ntx); + let value = data.value()?.clone(); let priority = value.priority; let cache_ttl = value.cache_ttl.map(|x| x as i32);