@@ -64,8 +64,6 @@ const ACCEPTABLE_REPLICATION_LAG: Duration = Duration::from_secs(30);
6464/// the lag again
6565const REPLICATION_SLEEP : Duration = Duration :: from_secs ( 10 ) ;
6666
67- type PooledPgConnection = PooledConnection < ConnectionManager < PgConnection > > ;
68-
6967lazy_static ! {
7068 static ref STATEMENT_TIMEOUT : Option <String > = ENV_VARS
7169 . store
@@ -667,17 +665,77 @@ impl From<Result<CopyTableWorker, StoreError>> for WorkerResult {
667665 }
668666}
669667
668+ /// We pass connections back and forth between the control loop and various
669+ /// workers. We need to make sure that we end up with the connection that
670+ /// was used to acquire the copy lock in the right place so we can release
671+ /// the copy lock which is only possible with the connection that acquired
672+ /// it.
673+ ///
674+ /// This struct helps us with that. It wraps a connection and tracks whether
675+ /// the connection was used to acquire the copy lock
676+ struct LockTrackingConnection {
677+ inner : PooledConnection < ConnectionManager < PgConnection > > ,
678+ has_lock : bool ,
679+ }
680+
681+ impl LockTrackingConnection {
682+ fn new ( inner : PooledConnection < ConnectionManager < PgConnection > > ) -> Self {
683+ Self {
684+ inner,
685+ has_lock : false ,
686+ }
687+ }
688+
689+ fn transaction < T , F > ( & mut self , f : F ) -> Result < T , StoreError >
690+ where
691+ F : FnOnce ( & mut PgConnection ) -> Result < T , StoreError > ,
692+ {
693+ let conn = & mut self . inner ;
694+ conn. transaction ( |conn| f ( conn) )
695+ }
696+
697+ /// Put `self` into `other` if `self` has the lock.
698+ fn extract ( self , other : & mut Option < Self > ) {
699+ if self . has_lock {
700+ * other = Some ( self ) ;
701+ }
702+ }
703+
704+ fn lock ( & mut self , logger : & Logger , dst : & Site ) -> Result < ( ) , StoreError > {
705+ if self . has_lock {
706+ warn ! ( logger, "already acquired copy lock for {}" , dst) ;
707+ return Ok ( ( ) ) ;
708+ }
709+ advisory_lock:: lock_copying ( & mut self . inner , dst) ?;
710+ self . has_lock = true ;
711+ Ok ( ( ) )
712+ }
713+
714+ fn unlock ( & mut self , logger : & Logger , dst : & Site ) -> Result < ( ) , StoreError > {
715+ if !self . has_lock {
716+ error ! (
717+ logger,
718+ "tried to release copy lock for {} even though we are not the owner" , dst
719+ ) ;
720+ return Ok ( ( ) ) ;
721+ }
722+ advisory_lock:: unlock_copying ( & mut self . inner , dst) ?;
723+ self . has_lock = false ;
724+ Ok ( ( ) )
725+ }
726+ }
727+
670728/// A helper to run copying of one table. We need to thread `conn` and
671729/// `table` from the control loop to the background worker and back again to
672730/// the control loop. This worker facilitates that
673731struct CopyTableWorker {
674- conn : PooledPgConnection ,
732+ conn : LockTrackingConnection ,
675733 table : TableState ,
676734 result : Result < Status , StoreError > ,
677735}
678736
679737impl CopyTableWorker {
680- fn new ( conn : PooledPgConnection , table : TableState ) -> Self {
738+ fn new ( conn : LockTrackingConnection , table : TableState ) -> Self {
681739 Self {
682740 conn,
683741 table,
@@ -699,7 +757,7 @@ impl CopyTableWorker {
699757 fn run_inner ( & mut self , logger : Logger , progress : & CopyProgress ) -> Result < Status , StoreError > {
700758 use Status :: * ;
701759
702- let conn = & mut self . conn ;
760+ let conn = & mut self . conn . inner ;
703761 progress. start_table ( & self . table ) ;
704762 while !self . table . finished ( ) {
705763 // It is important that this check happens outside the write
@@ -855,7 +913,7 @@ pub struct Connection {
855913 /// individual table. Except for that case, this will always be
856914 /// `Some(..)`. Most code shouldn't access `self.conn` directly, but use
857915 /// `self.transaction`
858- conn : Option < PooledPgConnection > ,
916+ conn : Option < LockTrackingConnection > ,
859917 pool : ConnectionPool ,
860918 primary : Primary ,
861919 workers : usize ,
@@ -901,9 +959,9 @@ impl Connection {
901959 }
902960 false
903961 } ) ?;
904- let conn = Some ( conn) ;
905962 let src_manifest_idx_and_name = Arc :: new ( src_manifest_idx_and_name) ;
906963 let dst_manifest_idx_and_name = Arc :: new ( dst_manifest_idx_and_name) ;
964+ let conn = Some ( LockTrackingConnection :: new ( conn) ) ;
907965 Ok ( Self {
908966 logger,
909967 conn,
@@ -990,6 +1048,7 @@ impl Connection {
9901048 let Some ( table) = state. unfinished . pop ( ) else {
9911049 return None ;
9921050 } ;
1051+ let conn = LockTrackingConnection :: new ( conn) ;
9931052
9941053 let worker = CopyTableWorker :: new ( conn, table) ;
9951054 Some ( Box :: pin (
@@ -1031,7 +1090,7 @@ impl Connection {
10311090 let result = workers. select ( ) . await ;
10321091 match result {
10331092 Ok ( worker) => {
1034- self . conn = Some ( worker . conn ) ;
1093+ worker . conn . extract ( & mut self . conn ) ;
10351094 }
10361095 Err ( e) => {
10371096 /* Ignore; we had an error previously */
@@ -1098,7 +1157,7 @@ impl Connection {
10981157 W :: Ok ( worker) => {
10991158 // Put the connection back into self.conn so that we can use it
11001159 // in the next iteration.
1101- self . conn = Some ( worker . conn ) ;
1160+ worker . conn . extract ( & mut self . conn ) ;
11021161
11031162 match ( worker. result , progress. is_cancelled ( ) ) {
11041163 ( Ok ( Status :: Finished ) , false ) => {
@@ -1207,20 +1266,30 @@ impl Connection {
12071266 ) ;
12081267
12091268 let dst_site = self . dst . site . cheap_clone ( ) ;
1210- self . transaction ( |conn| advisory_lock:: lock_copying ( conn, & dst_site) ) ?;
1269+ let Some ( conn) = self . conn . as_mut ( ) else {
1270+ return Err ( constraint_violation ! (
1271+ "copy connection went missing (copy_data)"
1272+ ) ) ;
1273+ } ;
1274+ conn. lock ( & self . logger , & dst_site) ?;
12111275
12121276 let res = self . copy_data_internal ( index_list) . await ;
12131277
1214- if self . conn . is_none ( ) {
1215- // A background worker panicked and left us without our
1216- // dedicated connection, but we still need to release the copy
1217- // lock; get a normal connection, not from the fdw pool for that
1218- // as that will be much less contended. We won't be holding on
1219- // to the connection for long as `res` will be an error and we
1220- // will abort starting this subgraph
1221- self . conn = Some ( self . pool . get ( ) ?) ;
1278+ match self . conn . as_mut ( ) {
1279+ None => {
1280+ // A background worker panicked and left us without our
1281+ // dedicated connection; we would need to get that
1282+ // connection to unlock the advisory lock. We can't do that,
1283+ // so we just log a warning
1284+ warn ! (
1285+ self . logger,
1286+ "can't unlock copy lock since the default worker panicked; lock will linger until session ends"
1287+ ) ;
1288+ }
1289+ Some ( conn) => {
1290+ conn. unlock ( & self . logger , & dst_site) ?;
1291+ }
12221292 }
1223- self . transaction ( |conn| advisory_lock:: unlock_copying ( conn, & dst_site) ) ?;
12241293
12251294 if matches ! ( res, Ok ( Status :: Cancelled ) ) {
12261295 warn ! ( & self . logger, "Copying was cancelled and is incomplete" ) ;
0 commit comments