diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index 31dfdfd8eb2..ace5cddd719 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -33,7 +33,7 @@ use std::{collections::HashMap, sync::RwLock}; use postgres::config::{Config, Host}; -use crate::primary::{self, NAMESPACE_PUBLIC}; +use crate::primary::{self, Mirror, NAMESPACE_PUBLIC}; use crate::{advisory_lock, catalog}; use crate::{Shard, PRIMARY_SHARD}; @@ -54,10 +54,36 @@ const SHARDED_TABLES: [(&str, &[&str]); 2] = [ "subgraph_error", "subgraph_manifest", "table_stats", + "subgraph", + "subgraph_version", + "subgraph_deployment_assignment", ], ), ]; +/// Make sure that the tables that `jobs::MirrorJob` wants to mirror are +/// actually mapped into the various shards. A failure here is simply a +/// coding mistake +fn check_mirrored_tables() { + for table in Mirror::PUBLIC_TABLES { + if !PRIMARY_TABLES.contains(&table) { + panic!("table {} is not in PRIMARY_TABLES", table); + } + } + + let subgraphs_tables = *SHARDED_TABLES + .iter() + .find(|(nsp, _)| *nsp == "subgraphs") + .map(|(_, tables)| tables) + .unwrap(); + + for table in Mirror::SUBGRAPHS_TABLES { + if !subgraphs_tables.contains(&table) { + panic!("table {} is not in SHARDED_TABLES[subgraphs]", table); + } + } +} + pub struct ForeignServer { pub name: String, pub shard: Shard, @@ -817,6 +843,8 @@ impl PoolInner { registry: Arc, state_tracker: PoolStateTracker, ) -> PoolInner { + check_mirrored_tables(); + let logger_store = logger.new(o!("component" => "Store")); let logger_pool = logger.new(o!("component" => "ConnectionPool")); let const_labels = { diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index ab6be9ee0ba..5ec81dcbd61 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -1839,6 +1839,20 @@ pub struct Mirror { } impl Mirror { + // The tables that we mirror + // + // `chains` needs to be mirrored before `deployment_schemas` because + // of the fk constraint on `deployment_schemas.network`. We don't + // care much about mirroring `active_copies` but it has a fk + // constraint on `deployment_schemas` and is tiny, therefore it's + // easiest to just mirror it + pub(crate) const PUBLIC_TABLES: [&str; 3] = ["chains", "deployment_schemas", "active_copies"]; + pub(crate) const SUBGRAPHS_TABLES: [&str; 3] = [ + "subgraph_deployment_assignment", + "subgraph", + "subgraph_version", + ]; + pub fn new(pools: &HashMap) -> Mirror { let primary = pools .get(&PRIMARY_SHARD) @@ -1895,18 +1909,6 @@ impl Mirror { conn: &mut PgConnection, handle: &CancelHandle, ) -> Result<(), StoreError> { - // `chains` needs to be mirrored before `deployment_schemas` because - // of the fk constraint on `deployment_schemas.network`. We don't - // care much about mirroring `active_copies` but it has a fk - // constraint on `deployment_schemas` and is tiny, therefore it's - // easiest to just mirror it - const PUBLIC_TABLES: [&str; 3] = ["chains", "deployment_schemas", "active_copies"]; - const SUBGRAPHS_TABLES: [&str; 3] = [ - "subgraph_deployment_assignment", - "subgraph", - "subgraph_version", - ]; - fn run_query(conn: &mut PgConnection, query: String) -> Result<(), StoreError> { conn.batch_execute(&query).map_err(StoreError::from) } @@ -1938,11 +1940,11 @@ impl Mirror { // Truncate all tables at once, otherwise truncation can fail // because of foreign key constraints - let tables = PUBLIC_TABLES + let tables = Self::PUBLIC_TABLES .iter() .map(|name| (NAMESPACE_PUBLIC, name)) .chain( - SUBGRAPHS_TABLES + Self::SUBGRAPHS_TABLES .iter() .map(|name| (NAMESPACE_SUBGRAPHS, name)), ) @@ -1953,7 +1955,7 @@ impl Mirror { check_cancel()?; // Repopulate `PUBLIC_TABLES` by copying their data wholesale - for table_name in PUBLIC_TABLES { + for table_name in Self::PUBLIC_TABLES { copy_table( conn, ForeignServer::PRIMARY_PUBLIC,