@@ -37,7 +37,7 @@ use graph::{
3737 info, lazy_static, o, warn, BlockNumber , BlockPtr , CheapClone , Logger , StoreError , ENV_VARS ,
3838 } ,
3939 schema:: EntityType ,
40- slog:: error,
40+ slog:: { debug , error} ,
4141} ;
4242use itertools:: Itertools ;
4343
@@ -113,16 +113,33 @@ table! {
113113}
114114
115115/// Return `true` if the site is the source of a copy operation. The copy
116- /// operation might be just queued or in progress already
117- pub fn is_source ( conn : & mut PgConnection , site : & Site ) -> Result < bool , StoreError > {
116+ /// operation might be just queued or in progress already. This method will
117+ /// block until a fdw connection becomes available.
118+ pub fn is_source ( logger : & Logger , pool : & ConnectionPool , site : & Site ) -> Result < bool , StoreError > {
118119 use active_copies as ac;
119120
121+ // We use a fdw connection to check if the site is being copied. If we
122+ // used an ordinary connection and there are many calls to this method,
123+ // postgres_fdw might open an unmanageable number of connections into
124+ // the primary, which makes the primary run out of connections
125+ let mut last_log = Instant :: now ( ) ;
126+ let mut conn = pool. get_fdw ( & logger, || {
127+ if last_log. elapsed ( ) > LOG_INTERVAL {
128+ last_log = Instant :: now ( ) ;
129+ debug ! (
130+ logger,
131+ "Waiting for fdw connection to check if site {} is being copied" , site. namespace
132+ ) ;
133+ }
134+ false
135+ } ) ?;
136+
120137 select ( diesel:: dsl:: exists (
121138 ac:: table
122139 . filter ( ac:: src. eq ( site. id ) )
123140 . filter ( ac:: cancelled_at. is_null ( ) ) ,
124141 ) )
125- . get_result :: < bool > ( conn)
142+ . get_result :: < bool > ( & mut conn)
126143 . map_err ( StoreError :: from)
127144}
128145
0 commit comments