@@ -37,6 +37,23 @@ use crate::primary::{self, NAMESPACE_PUBLIC};
3737use crate :: { advisory_lock, catalog} ;
3838use crate :: { Shard , PRIMARY_SHARD } ;
3939
40+ const SHARDED_TABLES : [ ( & str , & [ & str ] ) ; 2 ] = [
41+ ( "public" , & [ "ethereum_networks" ] ) ,
42+ (
43+ "subgraphs" ,
44+ & [
45+ "copy_state" ,
46+ "copy_table_state" ,
47+ "dynamic_ethereum_contract_data_source" ,
48+ "subgraph_deployment" ,
49+ "subgraph_error" ,
50+ "subgraph_features" ,
51+ "subgraph_manifest" ,
52+ "table_stats" ,
53+ ] ,
54+ ) ,
55+ ] ;
56+
4057pub struct ForeignServer {
4158 pub name : String ,
4259 pub shard : Shard ,
@@ -49,6 +66,7 @@ pub struct ForeignServer {
4966
5067impl ForeignServer {
5168 pub ( crate ) const PRIMARY_PUBLIC : & ' static str = "primary_public" ;
69+ pub ( crate ) const CROSS_SHARD_NSP : & ' static str = "sharded" ;
5270
5371 /// The name of the foreign server under which data for `shard` is
5472 /// accessible
@@ -206,26 +224,10 @@ impl ForeignServer {
206224 /// Map the `subgraphs` schema from the foreign server `self` into the
207225 /// database accessible through `conn`
208226 fn map_metadata ( & self , conn : & mut PgConnection ) -> Result < ( ) , StoreError > {
209- const MAP_TABLES : [ ( & str , & [ & str ] ) ; 2 ] = [
210- ( "public" , & [ "ethereum_networks" ] ) ,
211- (
212- "subgraphs" ,
213- & [
214- "copy_state" ,
215- "copy_table_state" ,
216- "dynamic_ethereum_contract_data_source" ,
217- "subgraph_deployment" ,
218- "subgraph_error" ,
219- "subgraph_features" ,
220- "subgraph_manifest" ,
221- "table_stats" ,
222- ] ,
223- ) ,
224- ] ;
225227 let nsp = Self :: metadata_schema ( & self . shard ) ;
226228 catalog:: recreate_schema ( conn, & nsp) ?;
227229 let mut query = String :: new ( ) ;
228- for ( src_nsp, src_tables) in MAP_TABLES {
230+ for ( src_nsp, src_tables) in SHARDED_TABLES {
229231 for src_table in src_tables {
230232 let create_stmt =
231233 catalog:: create_foreign_table ( conn, src_nsp, src_table, & nsp, & self . name ) ?;
@@ -1024,7 +1026,12 @@ impl PoolInner {
10241026 // in the database instead of just in memory
10251027 let result = pool
10261028 . configure_fdw ( coord. servers . as_ref ( ) )
1027- . and_then ( |( ) | migrate_schema ( & pool. logger , & mut conn) ) ;
1029+ . and_then ( |( ) | pool. drop_cross_shard_views ( ) )
1030+ . and_then ( |( ) | migrate_schema ( & pool. logger , & mut conn) )
1031+ . and_then ( |count| {
1032+ pool. create_cross_shard_views ( coord. servers . as_ref ( ) )
1033+ . map ( |( ) | count)
1034+ } ) ;
10281035 debug ! ( & pool. logger, "Release migration lock" ) ;
10291036 advisory_lock:: unlock_migration ( & mut conn) . unwrap_or_else ( |err| {
10301037 die ( & pool. logger , "failed to release migration lock" , & err) ;
@@ -1077,6 +1084,76 @@ impl PoolInner {
10771084 } )
10781085 }
10791086
1087+ /// If this is the primary shard, drop the namespace `CROSS_SHARD_NSP`
1088+ fn drop_cross_shard_views ( & self ) -> Result < ( ) , StoreError > {
1089+ if self . shard != * PRIMARY_SHARD {
1090+ return Ok ( ( ) ) ;
1091+ }
1092+
1093+ info ! ( & self . logger, "Dropping cross-shard views" ) ;
1094+ let mut conn = self . get ( ) ?;
1095+ conn. transaction ( |conn| {
1096+ let query = format ! (
1097+ "drop schema if exists {} cascade" ,
1098+ ForeignServer :: CROSS_SHARD_NSP
1099+ ) ;
1100+ conn. batch_execute ( & query) ?;
1101+ Ok ( ( ) )
1102+ } )
1103+ }
1104+
1105+ /// If this is the primary shard, create the namespace `CROSS_SHARD_NSP`
1106+ /// and populate it with tables that union various imported tables
1107+ fn create_cross_shard_views ( & self , servers : & [ ForeignServer ] ) -> Result < ( ) , StoreError > {
1108+ fn shard_nsp_pairs < ' a > (
1109+ current : & Shard ,
1110+ local_nsp : & str ,
1111+ servers : & ' a [ ForeignServer ] ,
1112+ ) -> Vec < ( & ' a str , String ) > {
1113+ servers
1114+ . into_iter ( )
1115+ . map ( |server| {
1116+ let nsp = if & server. shard == current {
1117+ local_nsp. to_string ( )
1118+ } else {
1119+ ForeignServer :: metadata_schema ( & server. shard )
1120+ } ;
1121+ ( server. shard . as_str ( ) , nsp)
1122+ } )
1123+ . collect :: < Vec < _ > > ( )
1124+ }
1125+
1126+ if self . shard != * PRIMARY_SHARD {
1127+ return Ok ( ( ) ) ;
1128+ }
1129+
1130+ info ! ( & self . logger, "Creating cross-shard views" ) ;
1131+ let mut conn = self . get ( ) ?;
1132+
1133+ conn. transaction ( |conn| {
1134+ let query = format ! (
1135+ "create schema if not exists {}" ,
1136+ ForeignServer :: CROSS_SHARD_NSP
1137+ ) ;
1138+ conn. batch_execute ( & query) ?;
1139+ for ( src_nsp, src_tables) in SHARDED_TABLES {
1140+ // Pairs of (shard, nsp) for all servers
1141+ let nsps = shard_nsp_pairs ( & self . shard , src_nsp, servers) ;
1142+ for src_table in src_tables {
1143+ let create_view = catalog:: create_cross_shard_view (
1144+ conn,
1145+ src_nsp,
1146+ src_table,
1147+ ForeignServer :: CROSS_SHARD_NSP ,
1148+ & nsps,
1149+ ) ?;
1150+ conn. batch_execute ( & create_view) ?;
1151+ }
1152+ }
1153+ Ok ( ( ) )
1154+ } )
1155+ }
1156+
10801157 /// Copy the data from key tables in the primary into our local schema
10811158 /// so it can be used as a fallback when the primary goes down
10821159 pub async fn mirror_primary_tables ( & self ) -> Result < ( ) , StoreError > {
0 commit comments