Skip to content

Commit

Permalink
fix: fix redeploying flows with attached schedules
Browse files Browse the repository at this point in the history
  • Loading branch information
uael authored and rubenfiszel committed Dec 18, 2024
1 parent 3339e69 commit fb536df
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 17 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 16 additions & 17 deletions backend/windmill-queue/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3190,35 +3190,34 @@ pub async fn push<'c, 'd>(
)
}
JobPayload::Flow { path, dedicated_worker, apply_preprocessor } => {
let mut ntx = tx.into_tx().await?;
// Fetch the latest version of the flow.
// Note that this query is performed within an isolated transaction to secure the
// API surface.
let version = fetch_scalar_isolated!(
sqlx::query_scalar!(
"SELECT flow.versions[array_upper(flow.versions, 1)] AS \"version!: i64\"
FROM flow WHERE path = $1 AND workspace_id = $2",
&path,
&workspace_id
),
tx
)?
let version = sqlx::query_scalar!(
"SELECT flow.versions[array_upper(flow.versions, 1)] AS \"version!: i64\"
FROM flow WHERE path = $1 AND workspace_id = $2",
&path,
&workspace_id
)
.fetch_optional(&mut *ntx)
.await?
.ok_or_else(|| Error::InternalErr(format!("not found flow at path {:?}", path)))?;

// Do not use the lite version unless all workers are updated.
// This does not need to be performed within the isolated Tx as checks had been
// performed before when the version was fetched.
let data = if *DISABLE_FLOW_SCRIPT
|| (!*MIN_VERSION_IS_AT_LEAST_1_432.read().await && !*CLOUD_HOSTED)
{
cache::flow::fetch_version(_db, version).await
cache::flow::fetch_version(&mut *ntx, version).await
} else {
// Fallback to the original version if the lite version is not found.
// This also prevent a race condition where the flow is run just after deploy and
// the lite version is still being created.
cache::flow::fetch_version_lite(_db, version)
.or_else(|_| cache::flow::fetch_version(_db, version))
.await
match cache::flow::fetch_version_lite(&mut *ntx, version).await {
Ok(data) => Ok(data),
Err(_) => cache::flow::fetch_version(&mut *ntx, version).await,
}
}?;
tx = PushIsolationLevel::Transaction(ntx);

let value = data.value()?.clone();
let priority = value.priority;
let cache_ttl = value.cache_ttl.map(|x| x as i32);
Expand Down

0 comments on commit fb536df

Please sign in to comment.