Skip to content

Commit d259f0b

Browse files
committed
backend: finalize v2 migration (v2 phase 4 - final)
1 parent 1a2839a commit d259f0b

File tree

2 files changed

+143
-14
lines changed

2 files changed

+143
-14
lines changed

backend/windmill-api/src/db.rs

Lines changed: 135 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@
66
* LICENSE-AGPL for a copy of the license.
77
*/
88

9-
use futures::FutureExt;
10-
use sqlx::Executor;
9+
use std::sync::atomic::Ordering;
10+
use std::time::Duration;
1111

12+
use futures::FutureExt;
1213
use sqlx::{
1314
migrate::{Migrate, MigrateError},
1415
pool::PoolConnection,
15-
PgConnection, Pool, Postgres,
16+
Executor, PgConnection, Pool, Postgres,
1617
};
18+
1719
use windmill_audit::audit_ee::{AuditAuthor, AuditAuthorable};
1820
use windmill_common::utils::generate_lock_id;
1921
use windmill_common::{
@@ -199,6 +201,24 @@ pub async fn migrate(db: &DB) -> Result<(), Error> {
199201
}
200202
});
201203

204+
let db2 = db.clone();
205+
let _ = tokio::task::spawn(async move {
206+
use windmill_common::worker::MIN_VERSION_IS_LATEST;
207+
loop {
208+
if !MIN_VERSION_IS_LATEST.load(Ordering::Relaxed) {
209+
tokio::time::sleep(Duration::from_secs(30)).await;
210+
continue;
211+
}
212+
if let Err(err) = v2_finalize(&db2).await {
213+
tracing::error!("{err:#}: Could not apply v2 finalize migration, retry in 30s..");
214+
tokio::time::sleep(Duration::from_secs(30)).await;
215+
continue;
216+
}
217+
tracing::info!("v2 finalization step successfully applied.");
218+
break;
219+
}
220+
});
221+
202222
Ok(())
203223
}
204224

@@ -250,7 +270,7 @@ async fn fix_flow_versioning_migration(
250270
}
251271

252272
macro_rules! run_windmill_migration {
253-
($migration_job_name:expr, $db:expr, $code:block) => {
273+
($migration_job_name:expr, $db:expr, |$tx:ident| $code:block) => {
254274
{
255275
let migration_job_name = $migration_job_name;
256276
let db: &Pool<Postgres> = $db;
@@ -264,11 +284,11 @@ macro_rules! run_windmill_migration {
264284
.unwrap_or(false);
265285
if !has_done_migration {
266286
tracing::info!("Applying {migration_job_name} migration");
267-
let mut tx = db.begin().await?;
287+
let mut $tx = db.begin().await?;
268288
let mut r = false;
269289
while !r {
270290
r = sqlx::query_scalar!("SELECT pg_try_advisory_lock(4242)")
271-
.fetch_one(&mut *tx)
291+
.fetch_one(&mut *$tx)
272292
.await
273293
.map_err(|e| {
274294
tracing::error!("Error acquiring {migration_job_name} lock: {e:#}");
@@ -298,17 +318,17 @@ macro_rules! run_windmill_migration {
298318
"INSERT INTO windmill_migrations (name) VALUES ($1) ON CONFLICT DO NOTHING",
299319
migration_job_name
300320
)
301-
.execute(&mut *tx)
321+
.execute(&mut *$tx)
302322
.await?;
303323
tracing::info!("Finished applying {migration_job_name} migration");
304324
} else {
305325
tracing::debug!("migration {migration_job_name} already done");
306326
}
307327

308328
let _ = sqlx::query("SELECT pg_advisory_unlock(4242)")
309-
.execute(&mut *tx)
329+
.execute(&mut *$tx)
310330
.await?;
311-
tx.commit().await?;
331+
$tx.commit().await?;
312332
tracing::info!("released lock for {migration_job_name}");
313333
} else {
314334
tracing::debug!("migration {migration_job_name} already done");
@@ -318,6 +338,108 @@ macro_rules! run_windmill_migration {
318338
};
319339
}
320340

341+
async fn v2_finalize(db: &DB) -> Result<(), Error> {
342+
run_windmill_migration!("v2_finalize_disable_sync", db, |tx| {
343+
tx.execute(
344+
r#"
345+
CREATE OR REPLACE TRIGGER v2_queue_instead_of_update_trigger
346+
INSTEAD OF UPDATE ON v2_queue
347+
FOR EACH ROW
348+
EXECUTE PROCEDURE v2_queue_instead_of_update();
349+
350+
CREATE OR REPLACE TRIGGER v2_completed_job_instead_of_update_trigger
351+
INSTEAD OF UPDATE ON v2_completed_job
352+
FOR EACH ROW
353+
EXECUTE PROCEDURE v2_completed_job_instead_of_update();
354+
355+
DROP FUNCTION v2_queue_instead_of_update_overlay() CASCADE;
356+
DROP FUNCTION v2_completed_job_instead_of_update_overlay() CASCADE;
357+
DROP FUNCTION v2_job_completed_before_insert() CASCADE;
358+
DROP FUNCTION v2_job_flow_runtime_before_insert() CASCADE;
359+
DROP FUNCTION v2_job_flow_runtime_before_update() CASCADE;
360+
DROP FUNCTION v2_job_queue_after_insert() CASCADE;
361+
DROP FUNCTION v2_job_queue_before_insert() CASCADE;
362+
DROP FUNCTION v2_job_queue_before_update() CASCADE;
363+
DROP FUNCTION v2_job_runtime_before_insert() CASCADE;
364+
DROP FUNCTION v2_job_runtime_before_update() CASCADE;
365+
366+
DROP VIEW completed_job, completed_job_view, job, queue, queue_view CASCADE;
367+
"#,
368+
)
369+
.await?;
370+
});
371+
run_windmill_migration!("v2_finalize_job_queue", db, |tx| {
372+
tx.execute(
373+
r#"
374+
ALTER TABLE v2_job_queue
375+
DROP COLUMN __parent_job CASCADE,
376+
DROP COLUMN __created_by CASCADE,
377+
DROP COLUMN __script_hash CASCADE,
378+
DROP COLUMN __script_path CASCADE,
379+
DROP COLUMN __args CASCADE,
380+
DROP COLUMN __logs CASCADE,
381+
DROP COLUMN __raw_code CASCADE,
382+
DROP COLUMN __canceled CASCADE,
383+
DROP COLUMN __last_ping CASCADE,
384+
DROP COLUMN __job_kind CASCADE,
385+
DROP COLUMN __env_id CASCADE,
386+
DROP COLUMN __schedule_path CASCADE,
387+
DROP COLUMN __permissioned_as CASCADE,
388+
DROP COLUMN __flow_status CASCADE,
389+
DROP COLUMN __raw_flow CASCADE,
390+
DROP COLUMN __is_flow_step CASCADE,
391+
DROP COLUMN __language CASCADE,
392+
DROP COLUMN __same_worker CASCADE,
393+
DROP COLUMN __raw_lock CASCADE,
394+
DROP COLUMN __pre_run_error CASCADE,
395+
DROP COLUMN __email CASCADE,
396+
DROP COLUMN __visible_to_owner CASCADE,
397+
DROP COLUMN __mem_peak CASCADE,
398+
DROP COLUMN __root_job CASCADE,
399+
DROP COLUMN __leaf_jobs CASCADE,
400+
DROP COLUMN __concurrent_limit CASCADE,
401+
DROP COLUMN __concurrency_time_window_s CASCADE,
402+
DROP COLUMN __timeout CASCADE,
403+
DROP COLUMN __flow_step_id CASCADE,
404+
DROP COLUMN __cache_ttl CASCADE;
405+
"#,
406+
)
407+
.await?;
408+
});
409+
run_windmill_migration!("v2_finalize_job_completed", db, |tx| {
410+
tx.execute(
411+
r#"
412+
ALTER TABLE v2_job_completed
413+
DROP COLUMN __parent_job CASCADE,
414+
DROP COLUMN __created_by CASCADE,
415+
DROP COLUMN __created_at CASCADE,
416+
DROP COLUMN __success CASCADE,
417+
DROP COLUMN __script_hash CASCADE,
418+
DROP COLUMN __script_path CASCADE,
419+
DROP COLUMN __args CASCADE,
420+
DROP COLUMN __logs CASCADE,
421+
DROP COLUMN __raw_code CASCADE,
422+
DROP COLUMN __canceled CASCADE,
423+
DROP COLUMN __job_kind CASCADE,
424+
DROP COLUMN __env_id CASCADE,
425+
DROP COLUMN __schedule_path CASCADE,
426+
DROP COLUMN __permissioned_as CASCADE,
427+
DROP COLUMN __raw_flow CASCADE,
428+
DROP COLUMN __is_flow_step CASCADE,
429+
DROP COLUMN __language CASCADE,
430+
DROP COLUMN __is_skipped CASCADE,
431+
DROP COLUMN __raw_lock CASCADE,
432+
DROP COLUMN __email CASCADE,
433+
DROP COLUMN __visible_to_owner CASCADE,
434+
DROP COLUMN __tag CASCADE,
435+
DROP COLUMN __priority CASCADE;
436+
"#,
437+
)
438+
.await?;
439+
});
440+
Ok(())
441+
}
442+
321443
async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
322444
// let has_done_migration = sqlx::query_scalar!(
323445
// "SELECT EXISTS(SELECT name FROM windmill_migrations WHERE name = 'fix_job_completed_index')"
@@ -360,7 +482,7 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
360482
// tx.commit().await?;
361483
// }
362484

363-
run_windmill_migration!("fix_job_completed_index_2", &db, {
485+
run_windmill_migration!("fix_job_completed_index_2", &db, |tx| {
364486
// sqlx::query(
365487
// "CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_completed_job_workspace_id_created_at_new_2 ON completed_job (workspace_id, job_kind, success, is_skipped, is_flow_step, created_at DESC)"
366488
// ).execute(db).await?;
@@ -380,7 +502,7 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
380502
.await?;
381503
});
382504

383-
run_windmill_migration!("fix_job_completed_index_3", &db, {
505+
run_windmill_migration!("fix_job_completed_index_3", &db, |tx| {
384506
sqlx::query("DROP INDEX CONCURRENTLY IF EXISTS index_completed_job_on_schedule_path")
385507
.execute(db)
386508
.await?;
@@ -398,7 +520,7 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
398520
.await?;
399521
});
400522

401-
run_windmill_migration!("fix_job_index_1", &db, {
523+
run_windmill_migration!("fix_job_index_1", &db, |tx| {
402524
let migration_job_name = "fix_job_completed_index_4";
403525
let mut i = 1;
404526
tracing::info!("step {i} of {migration_job_name} migration");
@@ -479,7 +601,7 @@ async fn fix_job_completed_index(db: &DB) -> Result<(), Error> {
479601
.await?;
480602
});
481603

482-
run_windmill_migration!("fix_labeled_jobs_index", &db, {
604+
run_windmill_migration!("fix_labeled_jobs_index", &db, |tx| {
483605
tracing::info!("Special migration to add index concurrently on job labels 2");
484606
sqlx::query!("DROP INDEX CONCURRENTLY IF EXISTS labeled_jobs_on_jobs")
485607
.execute(db)

backend/windmill-common/src/worker.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ use std::{
1212
io::Write,
1313
path::{Component, Path, PathBuf},
1414
str::FromStr,
15-
sync::{atomic::AtomicBool, Arc},
15+
sync::{
16+
atomic::{AtomicBool, Ordering},
17+
Arc,
18+
},
1619
};
1720
use tokio::sync::RwLock;
1821
use windmill_macros::annotations;
@@ -103,6 +106,8 @@ lazy_static::lazy_static! {
103106
pub static ref DISABLE_FLOW_SCRIPT: bool = std::env::var("DISABLE_FLOW_SCRIPT").ok().is_some_and(|x| x == "1" || x == "true");
104107
}
105108

109+
pub static MIN_VERSION_IS_LATEST: AtomicBool = AtomicBool::new(false);
110+
106111
pub async fn make_suspended_pull_query(wc: &WorkerConfig) {
107112
if wc.worker_tags.len() == 0 {
108113
tracing::error!("Empty tags in worker tags, skipping");
@@ -636,6 +641,8 @@ pub async fn update_min_version<'c, E: sqlx::Executor<'c, Database = sqlx::Postg
636641
tracing::info!("Minimal worker version: {min_version}");
637642
}
638643

644+
MIN_VERSION_IS_LATEST.store(min_version == cur_version, Ordering::Relaxed);
645+
639646
*MIN_VERSION_IS_AT_LEAST_1_427.write().await = min_version >= Version::new(1, 427, 0);
640647
*MIN_VERSION_IS_AT_LEAST_1_432.write().await = min_version >= Version::new(1, 432, 0);
641648
*MIN_VERSION_IS_AT_LEAST_1_440.write().await = min_version >= Version::new(1, 440, 0);

0 commit comments

Comments
 (0)