Skip to content

Commit 411c1c3

Browse files
committed
store: Create views in the primary that union the tables from shards
Fixes #4824 Fixes #4825
1 parent 1508c4d commit 411c1c3

File tree

2 files changed

+147
-18
lines changed

2 files changed

+147
-18
lines changed

store/postgres/src/catalog.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,58 @@ pub fn create_foreign_table(
626626
Ok(query)
627627
}
628628

629+
/// Create a SQL statement unioning imported tables from all shards,
630+
/// something like
631+
///
632+
/// ```sql
633+
/// create view "dst_nsp"."src_table" as
634+
/// select 'shard1' as shard, "col1", "col2" from "shard_shard1_subgraphs"."table_name"
635+
/// union all
636+
/// ...
637+
/// ````
638+
///
639+
/// The list `shard_nsps` consists of pairs `(name, namespace)` where `name`
640+
/// is the name of the shard and `namespace` is the namespace where the
641+
/// `src_table` is mapped
642+
pub fn create_cross_shard_view(
643+
conn: &mut PgConnection,
644+
src_nsp: &str,
645+
src_table: &str,
646+
dst_nsp: &str,
647+
shard_nsps: &[(&str, String)],
648+
) -> Result<String, StoreError> {
649+
fn build_query(
650+
columns: &[table_schema::Column],
651+
table_name: &str,
652+
dst_nsp: &str,
653+
shard_nsps: &[(&str, String)],
654+
) -> Result<String, std::fmt::Error> {
655+
let mut query = String::new();
656+
write!(query, "create view \"{}\".\"{}\" as ", dst_nsp, table_name)?;
657+
for (idx, (name, nsp)) in shard_nsps.into_iter().enumerate() {
658+
if idx > 0 {
659+
write!(query, " union all ")?;
660+
}
661+
write!(query, "select '{name}' as shard")?;
662+
for column in columns {
663+
write!(query, ", \"{}\"", column.column_name)?;
664+
}
665+
writeln!(query, " from \"{}\".\"{}\"", nsp, table_name)?;
666+
}
667+
Ok(query)
668+
}
669+
670+
let columns = table_schema::columns(conn, src_nsp, src_table)?;
671+
let query = build_query(&columns, src_table, dst_nsp, shard_nsps).map_err(|_| {
672+
anyhow!(
673+
"failed to generate 'create foreign table' query for {}.{}",
674+
dst_nsp,
675+
src_table
676+
)
677+
})?;
678+
Ok(query)
679+
}
680+
629681
/// Checks in the database if a given index is valid.
630682
pub(crate) fn check_index_is_valid(
631683
conn: &mut PgConnection,

store/postgres/src/connection_pool.rs

Lines changed: 95 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,23 @@ use crate::primary::{self, NAMESPACE_PUBLIC};
3737
use crate::{advisory_lock, catalog};
3838
use crate::{Shard, PRIMARY_SHARD};
3939

40+
const SHARDED_TABLES: [(&str, &[&str]); 2] = [
41+
("public", &["ethereum_networks"]),
42+
(
43+
"subgraphs",
44+
&[
45+
"copy_state",
46+
"copy_table_state",
47+
"dynamic_ethereum_contract_data_source",
48+
"subgraph_deployment",
49+
"subgraph_error",
50+
"subgraph_features",
51+
"subgraph_manifest",
52+
"table_stats",
53+
],
54+
),
55+
];
56+
4057
pub struct ForeignServer {
4158
pub name: String,
4259
pub shard: Shard,
@@ -49,6 +66,7 @@ pub struct ForeignServer {
4966

5067
impl ForeignServer {
5168
pub(crate) const PRIMARY_PUBLIC: &'static str = "primary_public";
69+
pub(crate) const CROSS_SHARD_NSP: &'static str = "sharded";
5270

5371
/// The name of the foreign server under which data for `shard` is
5472
/// accessible
@@ -206,26 +224,10 @@ impl ForeignServer {
206224
/// Map the `subgraphs` schema from the foreign server `self` into the
207225
/// database accessible through `conn`
208226
fn map_metadata(&self, conn: &mut PgConnection) -> Result<(), StoreError> {
209-
const MAP_TABLES: [(&str, &[&str]); 2] = [
210-
("public", &["ethereum_networks"]),
211-
(
212-
"subgraphs",
213-
&[
214-
"copy_state",
215-
"copy_table_state",
216-
"dynamic_ethereum_contract_data_source",
217-
"subgraph_deployment",
218-
"subgraph_error",
219-
"subgraph_features",
220-
"subgraph_manifest",
221-
"table_stats",
222-
],
223-
),
224-
];
225227
let nsp = Self::metadata_schema(&self.shard);
226228
catalog::recreate_schema(conn, &nsp)?;
227229
let mut query = String::new();
228-
for (src_nsp, src_tables) in MAP_TABLES {
230+
for (src_nsp, src_tables) in SHARDED_TABLES {
229231
for src_table in src_tables {
230232
let create_stmt =
231233
catalog::create_foreign_table(conn, src_nsp, src_table, &nsp, &self.name)?;
@@ -1024,7 +1026,12 @@ impl PoolInner {
10241026
// in the database instead of just in memory
10251027
let result = pool
10261028
.configure_fdw(coord.servers.as_ref())
1027-
.and_then(|()| migrate_schema(&pool.logger, &mut conn));
1029+
.and_then(|()| pool.drop_cross_shard_views())
1030+
.and_then(|()| migrate_schema(&pool.logger, &mut conn))
1031+
.and_then(|count| {
1032+
pool.create_cross_shard_views(coord.servers.as_ref())
1033+
.map(|()| count)
1034+
});
10281035
debug!(&pool.logger, "Release migration lock");
10291036
advisory_lock::unlock_migration(&mut conn).unwrap_or_else(|err| {
10301037
die(&pool.logger, "failed to release migration lock", &err);
@@ -1077,6 +1084,76 @@ impl PoolInner {
10771084
})
10781085
}
10791086

1087+
/// If this is the primary shard, drop the namespace `CROSS_SHARD_NSP`
1088+
fn drop_cross_shard_views(&self) -> Result<(), StoreError> {
1089+
if self.shard != *PRIMARY_SHARD {
1090+
return Ok(());
1091+
}
1092+
1093+
info!(&self.logger, "Dropping cross-shard views");
1094+
let mut conn = self.get()?;
1095+
conn.transaction(|conn| {
1096+
let query = format!(
1097+
"drop schema if exists {} cascade",
1098+
ForeignServer::CROSS_SHARD_NSP
1099+
);
1100+
conn.batch_execute(&query)?;
1101+
Ok(())
1102+
})
1103+
}
1104+
1105+
/// If this is the primary shard, create the namespace `CROSS_SHARD_NSP`
1106+
/// and populate it with tables that union various imported tables
1107+
fn create_cross_shard_views(&self, servers: &[ForeignServer]) -> Result<(), StoreError> {
1108+
fn shard_nsp_pairs<'a>(
1109+
current: &Shard,
1110+
local_nsp: &str,
1111+
servers: &'a [ForeignServer],
1112+
) -> Vec<(&'a str, String)> {
1113+
servers
1114+
.into_iter()
1115+
.map(|server| {
1116+
let nsp = if &server.shard == current {
1117+
local_nsp.to_string()
1118+
} else {
1119+
ForeignServer::metadata_schema(&server.shard)
1120+
};
1121+
(server.shard.as_str(), nsp)
1122+
})
1123+
.collect::<Vec<_>>()
1124+
}
1125+
1126+
if self.shard != *PRIMARY_SHARD {
1127+
return Ok(());
1128+
}
1129+
1130+
info!(&self.logger, "Creating cross-shard views");
1131+
let mut conn = self.get()?;
1132+
1133+
conn.transaction(|conn| {
1134+
let query = format!(
1135+
"create schema if not exists {}",
1136+
ForeignServer::CROSS_SHARD_NSP
1137+
);
1138+
conn.batch_execute(&query)?;
1139+
for (src_nsp, src_tables) in SHARDED_TABLES {
1140+
// Pairs of (shard, nsp) for all servers
1141+
let nsps = shard_nsp_pairs(&self.shard, src_nsp, servers);
1142+
for src_table in src_tables {
1143+
let create_view = catalog::create_cross_shard_view(
1144+
conn,
1145+
src_nsp,
1146+
src_table,
1147+
ForeignServer::CROSS_SHARD_NSP,
1148+
&nsps,
1149+
)?;
1150+
conn.batch_execute(&create_view)?;
1151+
}
1152+
}
1153+
Ok(())
1154+
})
1155+
}
1156+
10801157
/// Copy the data from key tables in the primary into our local schema
10811158
/// so it can be used as a fallback when the primary goes down
10821159
pub async fn mirror_primary_tables(&self) -> Result<(), StoreError> {

0 commit comments

Comments
 (0)