Skip to content

Commit bf27b19

Browse files
committed
Clarify the code around locking migrations a bit
1 parent 843278a commit bf27b19

File tree

3 files changed

+37
-21
lines changed

3 files changed

+37
-21
lines changed

graph/src/components/store/err.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ impl Clone for StoreError {
141141
}
142142

143143
impl StoreError {
144-
fn from_diesel_error(e: &DieselError) -> Option<Self> {
144+
pub fn from_diesel_error(e: &DieselError) -> Option<Self> {
145145
const CONN_CLOSE: &str = "server closed the connection unexpectedly";
146146
const STMT_TIMEOUT: &str = "canceling statement due to statement timeout";
147147
let DieselError::DatabaseError(_, info) = e else {

store/postgres/src/advisory_lock.rs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,18 +70,43 @@ const WRITE: Scope = Scope { id: 2 };
7070
const PRUNE: Scope = Scope { id: 3 };
7171

7272
/// Get a lock for running migrations. Blocks until we get the lock.
73-
pub(crate) fn lock_migration(conn: &mut PgConnection) -> Result<(), StoreError> {
74-
sql_query("select pg_advisory_lock(1)").execute(conn)?;
73+
fn lock_migration(conn: &mut PgConnection) -> Result<(), StoreError> {
74+
sql_query("select pg_advisory_lock(1)")
75+
.execute(conn)
76+
.map_err(|e| {
77+
StoreError::from_diesel_error(&e).unwrap_or_else(|| {
78+
StoreError::Unknown(anyhow::anyhow!("failed to acquire migration lock: {}", e))
79+
})
80+
})?;
7581

7682
Ok(())
7783
}
7884

7985
/// Release the migration lock.
80-
pub(crate) fn unlock_migration(conn: &mut PgConnection) -> Result<(), StoreError> {
81-
sql_query("select pg_advisory_unlock(1)").execute(conn)?;
86+
fn unlock_migration(conn: &mut PgConnection) -> Result<(), StoreError> {
87+
sql_query("select pg_advisory_unlock(1)")
88+
.execute(conn)
89+
.map_err(|e| {
90+
StoreError::from_diesel_error(&e).unwrap_or_else(|| {
91+
StoreError::Unknown(anyhow::anyhow!("failed to release migration lock: {}", e))
92+
})
93+
})?;
8294
Ok(())
8395
}
8496

97+
/// Block until we can get the migration lock, then run `f` and unlock when
98+
/// it is done. This is used to make sure that only one node runs setup at a
99+
/// time.
100+
pub(crate) fn with_migration_lock<F, R>(conn: &mut PgConnection, f: F) -> Result<R, StoreError>
101+
where
102+
F: FnOnce(&mut PgConnection) -> Result<R, StoreError>,
103+
{
104+
lock_migration(conn)?;
105+
let res = f(conn);
106+
unlock_migration(conn)?;
107+
res
108+
}
109+
85110
/// Take the lock used to keep two copy operations to run simultaneously on
86111
/// the same deployment. Block until we can get the lock
87112
pub(crate) fn lock_copying(conn: &mut PgConnection, dst: &Site) -> Result<(), StoreError> {

store/postgres/src/connection_pool.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ use std::{collections::HashMap, sync::RwLock};
3333

3434
use postgres::config::{Config, Host};
3535

36+
use crate::advisory_lock::with_migration_lock;
37+
use crate::catalog;
3638
use crate::primary::{self, NAMESPACE_PUBLIC};
37-
use crate::{advisory_lock, catalog};
3839
use crate::{Shard, PRIMARY_SHARD};
3940

4041
/// Tables that we map from the primary into `primary_public` in each shard
@@ -1113,8 +1114,6 @@ impl PoolInner {
11131114

11141115
let start = Instant::now();
11151116

1116-
advisory_lock::lock_migration(&mut conn)
1117-
.unwrap_or_else(|err| die(&pool.logger, "failed to get migration lock", &err));
11181117
// This code can cause a race in database setup: if pool A has had
11191118
// schema changes and pool B then tries to map tables from pool A,
11201119
// but does so before the concurrent thread running this code for
@@ -1128,13 +1127,10 @@ impl PoolInner {
11281127
// the schema but before finishing remapping in all shards.
11291128
// Addressing that would require keeping track of the need to remap
11301129
// in the database instead of just in memory
1131-
let result = pool
1132-
.configure_fdw(coord.servers.as_ref())
1133-
.and_then(|()| pool.drop_cross_shard_views())
1134-
.and_then(|()| migrate_schema(&pool.logger, &mut conn));
1135-
debug!(&pool.logger, "Release migration lock");
1136-
advisory_lock::unlock_migration(&mut conn).unwrap_or_else(|err| {
1137-
die(&pool.logger, "failed to release migration lock", &err);
1130+
let result = with_migration_lock(&mut conn, |conn| {
1131+
pool.configure_fdw(coord.servers.as_ref())
1132+
.and_then(|()| pool.drop_cross_shard_views())
1133+
.and_then(|()| migrate_schema(&pool.logger, conn))
11381134
});
11391135
let result = result
11401136
.and_then(|count| coord.propagate(&pool, count))
@@ -1451,12 +1447,7 @@ impl PoolCoordinator {
14511447
let server = self.server(&pool.shard)?;
14521448
for pool in self.pools.lock().unwrap().values() {
14531449
let mut conn = pool.get()?;
1454-
let remap_res = {
1455-
advisory_lock::lock_migration(&mut conn)?;
1456-
let res = pool.remap(server);
1457-
advisory_lock::unlock_migration(&mut conn)?;
1458-
res
1459-
};
1450+
let remap_res = { with_migration_lock(&mut conn, |_| pool.remap(server)) };
14601451
if let Err(e) = remap_res {
14611452
error!(pool.logger, "Failed to map imports from {}", server.shard; "error" => e.to_string());
14621453
return Err(e);

0 commit comments

Comments
 (0)