Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion store/postgres/src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::{collections::HashMap, sync::RwLock};

use postgres::config::{Config, Host};

use crate::primary::{self, NAMESPACE_PUBLIC};
use crate::primary::{self, Mirror, NAMESPACE_PUBLIC};
use crate::{advisory_lock, catalog};
use crate::{Shard, PRIMARY_SHARD};

Expand All @@ -54,10 +54,36 @@ const SHARDED_TABLES: [(&str, &[&str]); 2] = [
"subgraph_error",
"subgraph_manifest",
"table_stats",
"subgraph",
"subgraph_version",
"subgraph_deployment_assignment",
],
),
];

/// Make sure that the tables that `jobs::MirrorJob` wants to mirror are
/// actually mapped into the various shards. A failure here is simply a
/// coding mistake
fn check_mirrored_tables() {
for table in Mirror::PUBLIC_TABLES {
if !PRIMARY_TABLES.contains(&table) {
panic!("table {} is not in PRIMARY_TABLES", table);
}
}

let subgraphs_tables = *SHARDED_TABLES
.iter()
.find(|(nsp, _)| *nsp == "subgraphs")
.map(|(_, tables)| tables)
.unwrap();

for table in Mirror::SUBGRAPHS_TABLES {
if !subgraphs_tables.contains(&table) {
panic!("table {} is not in SHARDED_TABLES[subgraphs]", table);
}
}
}

pub struct ForeignServer {
pub name: String,
pub shard: Shard,
Expand Down Expand Up @@ -817,6 +843,8 @@ impl PoolInner {
registry: Arc<MetricsRegistry>,
state_tracker: PoolStateTracker,
) -> PoolInner {
check_mirrored_tables();

let logger_store = logger.new(o!("component" => "Store"));
let logger_pool = logger.new(o!("component" => "ConnectionPool"));
let const_labels = {
Expand Down
32 changes: 17 additions & 15 deletions store/postgres/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1839,6 +1839,20 @@ pub struct Mirror {
}

impl Mirror {
// The tables that we mirror
//
// `chains` needs to be mirrored before `deployment_schemas` because
// of the fk constraint on `deployment_schemas.network`. We don't
// care much about mirroring `active_copies` but it has a fk
// constraint on `deployment_schemas` and is tiny, therefore it's
// easiest to just mirror it
pub(crate) const PUBLIC_TABLES: [&str; 3] = ["chains", "deployment_schemas", "active_copies"];
pub(crate) const SUBGRAPHS_TABLES: [&str; 3] = [
"subgraph_deployment_assignment",
"subgraph",
"subgraph_version",
];

pub fn new(pools: &HashMap<Shard, ConnectionPool>) -> Mirror {
let primary = pools
.get(&PRIMARY_SHARD)
Expand Down Expand Up @@ -1895,18 +1909,6 @@ impl Mirror {
conn: &mut PgConnection,
handle: &CancelHandle,
) -> Result<(), StoreError> {
// `chains` needs to be mirrored before `deployment_schemas` because
// of the fk constraint on `deployment_schemas.network`. We don't
// care much about mirroring `active_copies` but it has a fk
// constraint on `deployment_schemas` and is tiny, therefore it's
// easiest to just mirror it
const PUBLIC_TABLES: [&str; 3] = ["chains", "deployment_schemas", "active_copies"];
const SUBGRAPHS_TABLES: [&str; 3] = [
"subgraph_deployment_assignment",
"subgraph",
"subgraph_version",
];

fn run_query(conn: &mut PgConnection, query: String) -> Result<(), StoreError> {
conn.batch_execute(&query).map_err(StoreError::from)
}
Expand Down Expand Up @@ -1938,11 +1940,11 @@ impl Mirror {

// Truncate all tables at once, otherwise truncation can fail
// because of foreign key constraints
let tables = PUBLIC_TABLES
let tables = Self::PUBLIC_TABLES
.iter()
.map(|name| (NAMESPACE_PUBLIC, name))
.chain(
SUBGRAPHS_TABLES
Self::SUBGRAPHS_TABLES
.iter()
.map(|name| (NAMESPACE_SUBGRAPHS, name)),
)
Expand All @@ -1953,7 +1955,7 @@ impl Mirror {
check_cancel()?;

// Repopulate `PUBLIC_TABLES` by copying their data wholesale
for table_name in PUBLIC_TABLES {
for table_name in Self::PUBLIC_TABLES {
copy_table(
conn,
ForeignServer::PRIMARY_PUBLIC,
Expand Down
Loading