Skip to content

Commit 004dfe2

Browse files
committed
store: Guard against a race setting up the 'sharded' namespace
1 parent 08e043b commit 004dfe2

File tree

1 file changed

+30
-22
lines changed

1 file changed

+30
-22
lines changed

store/postgres/src/connection_pool.rs

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use postgres::config::{Config, Host};
3535

3636
use crate::advisory_lock::with_migration_lock;
3737
use crate::catalog;
38-
use crate::primary::{self, NAMESPACE_PUBLIC};
38+
use crate::primary::{self, Namespace, NAMESPACE_PUBLIC};
3939
use crate::{Shard, PRIMARY_SHARD};
4040

4141
/// Tables that we map from the primary into `primary_public` in each shard
@@ -1225,30 +1225,38 @@ impl PoolInner {
12251225
return Ok(());
12261226
}
12271227

1228-
info!(&self.logger, "Creating cross-shard views");
12291228
let mut conn = self.get()?;
1229+
let sharded = Namespace::special(ForeignServer::CROSS_SHARD_NSP);
1230+
if catalog::has_namespace(&mut conn, &sharded)? {
1231+
// We dropped the namespace before, but another node must have
1232+
// recreated it in the meantime so we don't need to do anything
1233+
return Ok(());
1234+
}
12301235

1231-
conn.transaction(|conn| {
1232-
let query = format!(
1233-
"create schema if not exists {}",
1234-
ForeignServer::CROSS_SHARD_NSP
1235-
);
1236-
conn.batch_execute(&query)?;
1237-
for (src_nsp, src_tables) in SHARDED_TABLES {
1238-
// Pairs of (shard, nsp) for all servers
1239-
let nsps = shard_nsp_pairs(&self.shard, src_nsp, servers);
1240-
for src_table in src_tables {
1241-
let create_view = catalog::create_cross_shard_view(
1242-
conn,
1243-
src_nsp,
1244-
src_table,
1245-
ForeignServer::CROSS_SHARD_NSP,
1246-
&nsps,
1247-
)?;
1248-
conn.batch_execute(&create_view)?;
1249-
}
1236+
info!(&self.logger, "Creating cross-shard views");
1237+
with_migration_lock(&mut conn, |conn| {
1238+
if catalog::has_namespace(conn, &sharded)? {
1239+
return Ok(());
12501240
}
1251-
Ok(())
1241+
conn.transaction(|conn| {
1242+
let query = format!("create schema {}", ForeignServer::CROSS_SHARD_NSP);
1243+
conn.batch_execute(&query)?;
1244+
for (src_nsp, src_tables) in SHARDED_TABLES {
1245+
// Pairs of (shard, nsp) for all servers
1246+
let nsps = shard_nsp_pairs(&self.shard, src_nsp, servers);
1247+
for src_table in src_tables {
1248+
let create_view = catalog::create_cross_shard_view(
1249+
conn,
1250+
src_nsp,
1251+
src_table,
1252+
ForeignServer::CROSS_SHARD_NSP,
1253+
&nsps,
1254+
)?;
1255+
conn.batch_execute(&create_view)?;
1256+
}
1257+
}
1258+
Ok(())
1259+
})
12521260
})
12531261
}
12541262

0 commit comments

Comments
 (0)