@@ -3190,35 +3190,34 @@ pub async fn push<'c, 'd>(
3190
3190
)
3191
3191
}
3192
3192
JobPayload :: Flow { path, dedicated_worker, apply_preprocessor } => {
3193
+ let mut ntx = tx. into_tx ( ) . await ?;
3193
3194
// Fetch the latest version of the flow.
3194
- // Note that this query is performed within an isolated transaction to secure the
3195
- // API surface.
3196
- let version = fetch_scalar_isolated ! (
3197
- sqlx:: query_scalar!(
3198
- "SELECT flow.versions[array_upper(flow.versions, 1)] AS \" version!: i64\"
3199
- FROM flow WHERE path = $1 AND workspace_id = $2" ,
3200
- & path,
3201
- & workspace_id
3202
- ) ,
3203
- tx
3204
- ) ?
3195
+ let version = sqlx:: query_scalar!(
3196
+ "SELECT flow.versions[array_upper(flow.versions, 1)] AS \" version!: i64\"
3197
+ FROM flow WHERE path = $1 AND workspace_id = $2" ,
3198
+ & path,
3199
+ & workspace_id
3200
+ )
3201
+ . fetch_optional ( & mut * ntx)
3202
+ . await ?
3205
3203
. ok_or_else ( || Error :: InternalErr ( format ! ( "not found flow at path {:?}" , path) ) ) ?;
3206
3204
3207
3205
// Do not use the lite version unless all workers are updated.
3208
- // This does not need to be performed within the isolated Tx as checks had been
3209
- // performed before when the version was fetched.
3210
3206
let data = if * DISABLE_FLOW_SCRIPT
3211
3207
|| ( !* MIN_VERSION_IS_AT_LEAST_1_432 . read ( ) . await && !* CLOUD_HOSTED )
3212
3208
{
3213
- cache:: flow:: fetch_version ( _db , version) . await
3209
+ cache:: flow:: fetch_version ( & mut * ntx , version) . await
3214
3210
} else {
3215
3211
// Fallback to the original version if the lite version is not found.
3216
3212
// This also prevent a race condition where the flow is run just after deploy and
3217
3213
// the lite version is still being created.
3218
- cache:: flow:: fetch_version_lite ( _db, version)
3219
- . or_else ( |_| cache:: flow:: fetch_version ( _db, version) )
3220
- . await
3214
+ match cache:: flow:: fetch_version_lite ( & mut * ntx, version) . await {
3215
+ Ok ( data) => Ok ( data) ,
3216
+ Err ( _) => cache:: flow:: fetch_version ( & mut * ntx, version) . await ,
3217
+ }
3221
3218
} ?;
3219
+ tx = PushIsolationLevel :: Transaction ( ntx) ;
3220
+
3222
3221
let value = data. value ( ) ?. clone ( ) ;
3223
3222
let priority = value. priority ;
3224
3223
let cache_ttl = value. cache_ttl . map ( |x| x as i32 ) ;
0 commit comments