Skip to content

Commit 1cb1941

Browse files
lutterencalypto
authored andcommitted
store: Check that we map all the tables that MirrorJob needs
1 parent 5c1b62c commit 1cb1941

File tree

2 files changed

+46
-16
lines changed

2 files changed

+46
-16
lines changed

store/postgres/src/connection_pool.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use std::{collections::HashMap, sync::RwLock};
3333

3434
use postgres::config::{Config, Host};
3535

36-
use crate::primary::{self, NAMESPACE_PUBLIC};
36+
use crate::primary::{self, Mirror, NAMESPACE_PUBLIC};
3737
use crate::{advisory_lock, catalog};
3838
use crate::{Shard, PRIMARY_SHARD};
3939

@@ -54,10 +54,36 @@ const SHARDED_TABLES: [(&str, &[&str]); 2] = [
5454
"subgraph_error",
5555
"subgraph_manifest",
5656
"table_stats",
57+
"subgraph",
58+
"subgraph_version",
59+
"subgraph_deployment_assignment",
5760
],
5861
),
5962
];
6063

64+
/// Make sure that the tables that `jobs::MirrorJob` wants to mirror are
65+
/// actually mapped into the various shards. A failure here is simply a
66+
/// coding mistake
67+
fn check_mirrored_tables() {
68+
for table in Mirror::PUBLIC_TABLES {
69+
if !PRIMARY_TABLES.contains(&table) {
70+
panic!("table {} is not in PRIMARY_TABLES", table);
71+
}
72+
}
73+
74+
let subgraphs_tables = *SHARDED_TABLES
75+
.iter()
76+
.find(|(nsp, _)| *nsp == "subgraphs")
77+
.map(|(_, tables)| tables)
78+
.unwrap();
79+
80+
for table in Mirror::SUBGRAPHS_TABLES {
81+
if !subgraphs_tables.contains(&table) {
82+
panic!("table {} is not in SHARDED_TABLES[subgraphs]", table);
83+
}
84+
}
85+
}
86+
6187
pub struct ForeignServer {
6288
pub name: String,
6389
pub shard: Shard,
@@ -784,6 +810,8 @@ impl PoolInner {
784810
registry: Arc<MetricsRegistry>,
785811
state_tracker: PoolStateTracker,
786812
) -> PoolInner {
813+
check_mirrored_tables();
814+
787815
let logger_store = logger.new(o!("component" => "Store"));
788816
let logger_pool = logger.new(o!("component" => "ConnectionPool"));
789817
let const_labels = {

store/postgres/src/primary.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1839,6 +1839,20 @@ pub struct Mirror {
18391839
}
18401840

18411841
impl Mirror {
1842+
// The tables that we mirror
1843+
//
1844+
// `chains` needs to be mirrored before `deployment_schemas` because
1845+
// of the fk constraint on `deployment_schemas.network`. We don't
1846+
// care much about mirroring `active_copies` but it has a fk
1847+
// constraint on `deployment_schemas` and is tiny, therefore it's
1848+
// easiest to just mirror it
1849+
pub(crate) const PUBLIC_TABLES: [&str; 3] = ["chains", "deployment_schemas", "active_copies"];
1850+
pub(crate) const SUBGRAPHS_TABLES: [&str; 3] = [
1851+
"subgraph_deployment_assignment",
1852+
"subgraph",
1853+
"subgraph_version",
1854+
];
1855+
18421856
pub fn new(pools: &HashMap<Shard, ConnectionPool>) -> Mirror {
18431857
let primary = pools
18441858
.get(&PRIMARY_SHARD)
@@ -1895,18 +1909,6 @@ impl Mirror {
18951909
conn: &mut PgConnection,
18961910
handle: &CancelHandle,
18971911
) -> Result<(), StoreError> {
1898-
// `chains` needs to be mirrored before `deployment_schemas` because
1899-
// of the fk constraint on `deployment_schemas.network`. We don't
1900-
// care much about mirroring `active_copies` but it has a fk
1901-
// constraint on `deployment_schemas` and is tiny, therefore it's
1902-
// easiest to just mirror it
1903-
const PUBLIC_TABLES: [&str; 3] = ["chains", "deployment_schemas", "active_copies"];
1904-
const SUBGRAPHS_TABLES: [&str; 3] = [
1905-
"subgraph_deployment_assignment",
1906-
"subgraph",
1907-
"subgraph_version",
1908-
];
1909-
19101912
fn run_query(conn: &mut PgConnection, query: String) -> Result<(), StoreError> {
19111913
conn.batch_execute(&query).map_err(StoreError::from)
19121914
}
@@ -1938,11 +1940,11 @@ impl Mirror {
19381940

19391941
// Truncate all tables at once, otherwise truncation can fail
19401942
// because of foreign key constraints
1941-
let tables = PUBLIC_TABLES
1943+
let tables = Self::PUBLIC_TABLES
19421944
.iter()
19431945
.map(|name| (NAMESPACE_PUBLIC, name))
19441946
.chain(
1945-
SUBGRAPHS_TABLES
1947+
Self::SUBGRAPHS_TABLES
19461948
.iter()
19471949
.map(|name| (NAMESPACE_SUBGRAPHS, name)),
19481950
)
@@ -1953,7 +1955,7 @@ impl Mirror {
19531955
check_cancel()?;
19541956

19551957
// Repopulate `PUBLIC_TABLES` by copying their data wholesale
1956-
for table_name in PUBLIC_TABLES {
1958+
for table_name in Self::PUBLIC_TABLES {
19571959
copy_table(
19581960
conn,
19591961
ForeignServer::PRIMARY_PUBLIC,

0 commit comments

Comments
 (0)