@@ -64,8 +64,6 @@ const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30);
6464/// the lag again
6565const REPLICATION_SLEEP : Duration = Duration :: from_secs ( 10 ) ;
6666
67- type PooledPgConnection = PooledConnection < ConnectionManager < PgConnection > > ;
68-
6967lazy_static ! {
7068 static ref STATEMENT_TIMEOUT : Option <String > = ENV_VARS
7169 . store
@@ -667,17 +665,72 @@ impl From<Result<CopyTableWorker, StoreError>> for WorkerResult {
667665 }
668666}
669667
668+ /// We pass connections back and forth between the control loop and various
669+ /// workers. We need to make sure that we end up with the connection that
670+ /// was used to acquire the copy lock in the right place so we can release
671+ /// the copy lock which is only possible with the connection that acquired
672+ /// it.
673+ ///
674+ /// This struct helps us with that. It wraps a connection and tracks whether
675+ /// the connection was used to acquire the copy lock
676+ struct LockTrackingConnection {
677+ inner : PooledConnection < ConnectionManager < PgConnection > > ,
678+ has_lock : bool ,
679+ }
680+
681+ impl LockTrackingConnection {
682+ fn new ( inner : PooledConnection < ConnectionManager < PgConnection > > ) -> Self {
683+ Self {
684+ inner,
685+ has_lock : false ,
686+ }
687+ }
688+
689+ fn transaction < T , F > ( & mut self , f : F ) -> Result < T , StoreError >
690+ where
691+ F : FnOnce ( & mut PgConnection ) -> Result < T , StoreError > ,
692+ {
693+ let conn = & mut self . inner ;
694+ conn. transaction ( |conn| f ( conn) )
695+ }
696+
697+ /// Put `self` into `other` if `self` has the lock.
698+ fn switch ( self , other : & mut Option < Self > ) {
699+ if self . has_lock {
700+ * other = Some ( self ) ;
701+ }
702+ }
703+
704+ fn lock ( & mut self , dst : & Site ) -> Result < ( ) , StoreError > {
705+ if self . has_lock {
706+ return Ok ( ( ) ) ;
707+ }
708+ advisory_lock:: lock_copying ( & mut self . inner , dst) ?;
709+ self . has_lock = true ;
710+ Ok ( ( ) )
711+ }
712+
713+ fn unlock ( & mut self , dst : & Site ) -> Result < ( ) , StoreError > {
714+ if !self . has_lock {
715+ return Ok ( ( ) ) ;
716+ }
717+ advisory_lock:: unlock_copying ( & mut self . inner , dst) ?;
718+ self . has_lock = false ;
719+ Ok ( ( ) )
720+ }
721+ }
722+
670723/// A helper to run copying of one table. We need to thread `conn` and
671724/// `table` from the control loop to the background worker and back again to
672725/// the control loop. This worker facilitates that
673726struct CopyTableWorker {
674- conn : PooledPgConnection ,
727+ conn : LockTrackingConnection ,
675728 table : TableState ,
676729 result : Result < Status , StoreError > ,
677730}
678731
679732impl CopyTableWorker {
680- fn new ( conn : PooledPgConnection , table : TableState ) -> Self {
733+ fn new ( conn : LockTrackingConnection , table : TableState ) -> Self {
681734 Self {
682735 conn,
683736 table,
@@ -699,7 +752,7 @@ impl CopyTableWorker {
699752 fn run_inner ( & mut self , logger : Logger , progress : & CopyProgress ) -> Result < Status , StoreError > {
700753 use Status :: * ;
701754
702- let conn = & mut self . conn ;
755+ let conn = & mut self . conn . inner ;
703756 progress. start_table ( & self . table ) ;
704757 while !self . table . finished ( ) {
705758 // It is important that this check happens outside the write
@@ -855,7 +908,7 @@ pub struct Connection {
855908 /// individual table. Except for that case, this will always be
856909 /// `Some(..)`. Most code shouldn't access `self.conn` directly, but use
857910 /// `self.transaction`
858- conn : Option < PooledPgConnection > ,
911+ conn : Option < LockTrackingConnection > ,
859912 pool : ConnectionPool ,
860913 primary : Primary ,
861914 workers : usize ,
@@ -901,9 +954,9 @@ impl Connection {
901954 }
902955 false
903956 } ) ?;
904- let conn = Some ( conn) ;
905957 let src_manifest_idx_and_name = Arc :: new ( src_manifest_idx_and_name) ;
906958 let dst_manifest_idx_and_name = Arc :: new ( dst_manifest_idx_and_name) ;
959+ let conn = Some ( LockTrackingConnection :: new ( conn) ) ;
907960 Ok ( Self {
908961 logger,
909962 conn,
@@ -990,6 +1043,7 @@ impl Connection {
9901043 let Some ( table) = state. unfinished . pop ( ) else {
9911044 return None ;
9921045 } ;
1046+ let conn = LockTrackingConnection :: new ( conn) ;
9931047
9941048 let worker = CopyTableWorker :: new ( conn, table) ;
9951049 Some ( Box :: pin (
@@ -1031,7 +1085,7 @@ impl Connection {
10311085 let result = workers. select ( ) . await ;
10321086 match result {
10331087 Ok ( worker) => {
1034- self . conn = Some ( worker . conn ) ;
1088+ worker . conn . switch ( & mut self . conn ) ;
10351089 }
10361090 Err ( e) => {
10371091 /* Ignore; we had an error previously */
@@ -1098,7 +1152,7 @@ impl Connection {
10981152 W :: Ok ( worker) => {
10991153 // Put the connection back into self.conn so that we can use it
11001154 // in the next iteration.
1101- self . conn = Some ( worker . conn ) ;
1155+ worker . conn . switch ( & mut self . conn ) ;
11021156
11031157 match ( worker. result , progress. is_cancelled ( ) ) {
11041158 ( Ok ( Status :: Finished ) , false ) => {
@@ -1207,20 +1261,30 @@ impl Connection {
12071261 ) ;
12081262
12091263 let dst_site = self . dst . site . cheap_clone ( ) ;
1210- self . transaction ( |conn| advisory_lock:: lock_copying ( conn, & dst_site) ) ?;
1264+ let Some ( conn) = self . conn . as_mut ( ) else {
1265+ return Err ( constraint_violation ! (
1266+ "copy connection went missing (copy_data)"
1267+ ) ) ;
1268+ } ;
1269+ conn. lock ( & dst_site) ?;
12111270
12121271 let res = self . copy_data_internal ( index_list) . await ;
12131272
1214- if self . conn . is_none ( ) {
1215- // A background worker panicked and left us without our
1216- // dedicated connection, but we still need to release the copy
1217- // lock; get a normal connection, not from the fdw pool for that
1218- // as that will be much less contended. We won't be holding on
1219- // to the connection for long as `res` will be an error and we
1220- // will abort starting this subgraph
1221- self . conn = Some ( self . pool . get ( ) ?) ;
1273+ match self . conn . as_mut ( ) {
1274+ None => {
1275+ // A background worker panicked and left us without our
1276+ // dedicated connection; we would need to get that
1277+ // connection to unlock the advisory lock. We can't do that,
1278+ // so we just log an error
1279+ warn ! (
1280+ self . logger,
1281+ "can't unlock copy lock since the default worker panicked; lock will linger until session ends"
1282+ ) ;
1283+ }
1284+ Some ( conn) => {
1285+ conn. unlock ( & dst_site) ?;
1286+ }
12221287 }
1223- self . transaction ( |conn| advisory_lock:: unlock_copying ( conn, & dst_site) ) ?;
12241288
12251289 if matches ! ( res, Ok ( Status :: Cancelled ) ) {
12261290 warn ! ( & self . logger, "Copying was cancelled and is incomplete" ) ;
0 commit comments