File tree Expand file tree Collapse file tree 2 files changed +16
-12
lines changed
Expand file tree Collapse file tree 2 files changed +16
-12
lines changed Original file line number Diff line number Diff line change @@ -641,16 +641,22 @@ impl ConnectionPool {
641641
642642 /// Get a connection from the pool for foreign data wrapper access if
643643 /// one is available
644- pub fn try_get_fdw (
644+ pub async fn try_get_fdw (
645645 & self ,
646646 logger : & Logger ,
647647 timeout : Duration ,
648648 ) -> Option < PooledConnection < ConnectionManager < PgConnection > > > {
649- let Ok ( inner) = self . get_ready ( ) else {
650- return None ;
651- } ;
652- self . state_tracker
653- . ignore_timeout ( || inner. try_get_fdw ( logger, timeout) )
649+ let pool = self . clone ( ) ;
650+ let logger = logger. cheap_clone ( ) ;
651+ tokio:: task:: spawn_blocking ( move || {
652+ let Ok ( inner) = pool. get_ready ( ) else {
653+ return None ;
654+ } ;
655+ pool. state_tracker
656+ . ignore_timeout ( || inner. try_get_fdw ( & logger, timeout) )
657+ } )
658+ . await
659+ . unwrap_or ( None )
654660 }
655661
656662 pub ( crate ) async fn query_permit ( & self ) -> QueryPermit {
Original file line number Diff line number Diff line change @@ -1033,19 +1033,17 @@ impl Connection {
10331033 /// Opportunistically create an extra worker if we have more tables to
10341034 /// copy and there are idle fdw connections. If there are no more tables
10351035 /// or no idle connections, this will return `None`.
1036- fn extra_worker (
1036+ async fn extra_worker (
10371037 & mut self ,
10381038 state : & mut CopyState ,
10391039 progress : & Arc < CopyProgress > ,
10401040 ) -> Option < Pin < Box < dyn Future < Output = WorkerResult > > > > {
10411041 // It's important that we get the connection before the table since
10421042 // we remove the table from the state and could drop it otherwise
1043- let Some ( conn) = self
1043+ let conn = self
10441044 . pool
10451045 . try_get_fdw ( & self . logger , ENV_VARS . store . batch_worker_wait )
1046- else {
1047- return None ;
1048- } ;
1046+ . await ?;
10491047 let Some ( table) = state. unfinished . pop ( ) else {
10501048 return None ;
10511049 } ;
@@ -1134,7 +1132,7 @@ impl Connection {
11341132 if workers. len ( ) >= self . workers {
11351133 break ;
11361134 }
1137- let Some ( worker) = self . extra_worker ( & mut state, & progress) else {
1135+ let Some ( worker) = self . extra_worker ( & mut state, & progress) . await else {
11381136 break ;
11391137 } ;
11401138 workers. add ( worker) ;
You can’t perform that action at this time.
0 commit comments