|
6 | 6 | //! has more details on advisory locks. |
7 | 7 | //! |
8 | 8 | //! We use the following 64 bit locks: |
9 | | -//! * 1,2: to synchronize on migratons |
| 9 | +//! * 1: to synchronize on migratons |
10 | 10 | //! |
11 | 11 | //! We use the following 2x 32-bit locks |
12 | 12 | //! * 1, n: to lock copying of the deployment with id n in the destination |
@@ -69,41 +69,26 @@ const COPY: Scope = Scope { id: 1 }; |
69 | 69 | const WRITE: Scope = Scope { id: 2 }; |
70 | 70 | const PRUNE: Scope = Scope { id: 3 }; |
71 | 71 |
|
72 | | -/// Get a lock for running migrations. Blocks until we get the lock. |
73 | | -fn lock_migration(conn: &mut PgConnection) -> Result<(), StoreError> { |
74 | | - sql_query("select pg_advisory_lock(1)") |
75 | | - .execute(conn) |
76 | | - .map_err(|e| { |
77 | | - StoreError::from_diesel_error(&e).unwrap_or_else(|| { |
78 | | - StoreError::Unknown(anyhow::anyhow!("failed to acquire migration lock: {}", e)) |
79 | | - }) |
80 | | - })?; |
81 | | - |
82 | | - Ok(()) |
83 | | -} |
84 | | - |
85 | | -/// Release the migration lock. |
86 | | -fn unlock_migration(conn: &mut PgConnection) -> Result<(), StoreError> { |
87 | | - sql_query("select pg_advisory_unlock(1)") |
88 | | - .execute(conn) |
89 | | - .map_err(|e| { |
90 | | - StoreError::from_diesel_error(&e).unwrap_or_else(|| { |
91 | | - StoreError::Unknown(anyhow::anyhow!("failed to release migration lock: {}", e)) |
92 | | - }) |
93 | | - })?; |
94 | | - Ok(()) |
95 | | -} |
96 | | - |
97 | 72 | /// Block until we can get the migration lock, then run `f` and unlock when |
98 | 73 | /// it is done. This is used to make sure that only one node runs setup at a |
99 | 74 | /// time. |
100 | 75 | pub(crate) fn with_migration_lock<F, R>(conn: &mut PgConnection, f: F) -> Result<R, StoreError> |
101 | 76 | where |
102 | 77 | F: FnOnce(&mut PgConnection) -> Result<R, StoreError>, |
103 | 78 | { |
104 | | - lock_migration(conn)?; |
| 79 | + fn execute(conn: &mut PgConnection, query: &str, msg: &str) -> Result<(), StoreError> { |
| 80 | + sql_query(query).execute(conn).map(|_| ()).map_err(|e| { |
| 81 | + StoreError::from_diesel_error(&e) |
| 82 | + .unwrap_or_else(|| StoreError::Unknown(anyhow::anyhow!("{}: {}", msg, e))) |
| 83 | + }) |
| 84 | + } |
| 85 | + |
| 86 | + const LOCK: &str = "select pg_advisory_lock(1)"; |
| 87 | + const UNLOCK: &str = "select pg_advisory_unlock(1)"; |
| 88 | + |
| 89 | + execute(conn, LOCK, "failed to acquire migration lock")?; |
105 | 90 | let res = f(conn); |
106 | | - unlock_migration(conn)?; |
| 91 | + execute(conn, UNLOCK, "failed to release migration lock")?; |
107 | 92 | res |
108 | 93 | } |
109 | 94 |
|
|
0 commit comments