@@ -32,12 +32,13 @@ use diesel::{
3232} ;
3333use graph:: {
3434 constraint_violation,
35- futures03:: future:: select_all,
35+ futures03:: { future:: select_all, FutureExt as _ } ,
3636 prelude:: {
3737 info, lazy_static, o, warn, BlockNumber , BlockPtr , CheapClone , Logger , StoreError , ENV_VARS ,
3838 } ,
3939 schema:: EntityType ,
4040 slog:: { debug, error} ,
41+ tokio,
4142} ;
4243use itertools:: Itertools ;
4344
@@ -687,6 +688,21 @@ impl CopyProgress {
687688 }
688689}
689690
691+ enum WorkerResult {
692+ Ok ( CopyTableWorker ) ,
693+ Err ( StoreError ) ,
694+ Wake ,
695+ }
696+
697+ impl From < Result < CopyTableWorker , StoreError > > for WorkerResult {
698+ fn from ( result : Result < CopyTableWorker , StoreError > ) -> Self {
699+ match result {
700+ Ok ( worker) => WorkerResult :: Ok ( worker) ,
701+ Err ( e) => WorkerResult :: Err ( e) ,
702+ }
703+ }
704+ }
705+
690706/// A helper to run copying of one table. We need to thread `conn` and
691707/// `table` from the control loop to the background worker and back again to
692708/// the control loop. This worker facilitates that
@@ -705,18 +721,15 @@ impl CopyTableWorker {
705721 }
706722 }
707723
708- async fn run (
709- mut self ,
710- logger : Logger ,
711- progress : Arc < CopyProgress > ,
712- ) -> Result < Self , StoreError > {
724+ async fn run ( mut self , logger : Logger , progress : Arc < CopyProgress > ) -> WorkerResult {
713725 let object = self . table . dst . object . cheap_clone ( ) ;
714726 graph:: spawn_blocking_allow_panic ( move || {
715727 self . result = self . run_inner ( logger, & progress) ;
716728 self
717729 } )
718730 . await
719731 . map_err ( |e| constraint_violation ! ( "copy worker for {} panicked: {}" , object, e) )
732+ . into ( )
720733 }
721734
722735 fn run_inner ( & mut self , logger : Logger , progress : & CopyProgress ) -> Result < Status , StoreError > {
@@ -812,6 +825,57 @@ impl CopyTableWorker {
812825 }
813826}
814827
828+ /// A helper to manage the workers that are copying data. Besides the actual
829+ /// workers it also keeps a worker that wakes us up periodically to give us
830+ /// a chance to create more workers if there are database connections
831+ /// available
832+ struct Workers {
833+ /// The list of workers that are currently running. This will always
834+ /// include a future that wakes us up periodically
835+ futures : Vec < Pin < Box < dyn Future < Output = WorkerResult > > > > ,
836+ }
837+
838+ impl Workers {
839+ fn new ( ) -> Self {
840+ Self {
841+ futures : vec ! [ Self :: waker( ) ] ,
842+ }
843+ }
844+
845+ fn add ( & mut self , worker : Pin < Box < dyn Future < Output = WorkerResult > > > ) {
846+ self . futures . push ( worker) ;
847+ }
848+
849+ fn has_work ( & self ) -> bool {
850+ self . futures . len ( ) > 1
851+ }
852+
853+ async fn select ( & mut self ) -> WorkerResult {
854+ use WorkerResult :: * ;
855+
856+ let futures = std:: mem:: take ( & mut self . futures ) ;
857+ let ( result, _idx, remaining) = select_all ( futures) . await ;
858+ self . futures = remaining;
859+ match result {
860+ Ok ( _) | Err ( _) => { /* nothing to do */ }
861+ Wake => {
862+ self . futures . push ( Self :: waker ( ) ) ;
863+ }
864+ }
865+ result
866+ }
867+
868+ fn waker ( ) -> Pin < Box < dyn Future < Output = WorkerResult > > > {
869+ let sleep = tokio:: time:: sleep ( ENV_VARS . store . batch_target_duration ) ;
870+ Box :: pin ( sleep. map ( |( ) | WorkerResult :: Wake ) )
871+ }
872+
873+ /// Return the number of workers that are not the waker
874+ fn len ( & self ) -> usize {
875+ self . futures . len ( ) - 1
876+ }
877+ }
878+
815879/// A helper for copying subgraphs
816880pub struct Connection {
817881 /// The connection pool for the shard that will contain the destination
@@ -926,7 +990,7 @@ impl Connection {
926990 & mut self ,
927991 state : & mut CopyState ,
928992 progress : & Arc < CopyProgress > ,
929- ) -> Option < Pin < Box < dyn Future < Output = Result < CopyTableWorker , StoreError > > > > > {
993+ ) -> Option < Pin < Box < dyn Future < Output = WorkerResult > > > > {
930994 let Some ( conn) = self . conn . take ( ) else {
931995 return None ;
932996 } ;
@@ -947,7 +1011,7 @@ impl Connection {
9471011 & mut self ,
9481012 state : & mut CopyState ,
9491013 progress : & Arc < CopyProgress > ,
950- ) -> Option < Pin < Box < dyn Future < Output = Result < CopyTableWorker , StoreError > > > > > {
1014+ ) -> Option < Pin < Box < dyn Future < Output = WorkerResult > > > > {
9511015 // It's important that we get the connection before the table since
9521016 // we remove the table from the state and could drop it otherwise
9531017 let Some ( conn) = self
@@ -989,19 +1053,15 @@ impl Connection {
9891053
9901054 /// Wait for all workers to finish. This is called when we a worker has
9911055 /// failed with an error that forces us to abort copying
992- async fn cancel_workers (
993- & mut self ,
994- progress : Arc < CopyProgress > ,
995- mut workers : Vec < Pin < Box < dyn Future < Output = Result < CopyTableWorker , StoreError > > > > > ,
996- ) {
1056+ async fn cancel_workers ( & mut self , progress : Arc < CopyProgress > , mut workers : Workers ) {
9971057 progress. cancel ( ) ;
9981058 error ! (
9991059 self . logger,
10001060 "copying encountered an error; waiting for all workers to finish"
10011061 ) ;
1002- while ! workers. is_empty ( ) {
1003- let ( result , _ , remaining ) = select_all ( workers ) . await ;
1004- workers = remaining ;
1062+ while workers. has_work ( ) {
1063+ use WorkerResult :: * ;
1064+ let result = workers . select ( ) . await ;
10051065 match result {
10061066 Ok ( worker) => {
10071067 self . conn = Some ( worker. conn ) ;
@@ -1010,6 +1070,7 @@ impl Connection {
10101070 /* Ignore; we had an error previously */
10111071 error ! ( self . logger, "copy worker panicked: {}" , e) ;
10121072 }
1073+ Wake => { /* Ignore; this is just a waker */ }
10131074 }
10141075 }
10151076 }
@@ -1031,14 +1092,14 @@ impl Connection {
10311092 //
10321093 // The loop has to be very careful about terminating early so that
10331094 // we do not ever leave the loop with `self.conn == None`
1034- let mut workers = Vec :: new ( ) ;
1035- while !state. unfinished . is_empty ( ) || ! workers. is_empty ( ) {
1095+ let mut workers = Workers :: new ( ) ;
1096+ while !state. unfinished . is_empty ( ) || workers. has_work ( ) {
10361097 // We usually add at least one job here, except if we are out of
10371098 // tables to copy. In that case, we go through the `while` loop
10381099 // every time one of the tables we are currently copying
10391100 // finishes
10401101 if let Some ( worker) = self . default_worker ( & mut state, & progress) {
1041- workers. push ( worker) ;
1102+ workers. add ( worker) ;
10421103 }
10431104 loop {
10441105 if workers. len ( ) >= self . workers {
@@ -1047,24 +1108,24 @@ impl Connection {
10471108 let Some ( worker) = self . extra_worker ( & mut state, & progress) else {
10481109 break ;
10491110 } ;
1050- workers. push ( worker) ;
1111+ workers. add ( worker) ;
10511112 }
10521113
10531114 self . assert_progress ( workers. len ( ) , & state) ?;
1054- let ( result, _idx, remaining) = select_all ( workers) . await ;
1055- workers = remaining;
1115+ let result = workers. select ( ) . await ;
10561116
10571117 // Analyze `result` and take another trip through the loop if
10581118 // everything is ok; wait for pending workers and return if
10591119 // there was an error or if copying was cancelled.
1120+ use WorkerResult as W ;
10601121 match result {
1061- Err ( e) => {
1122+ W :: Err ( e) => {
10621123 // This is a panic in the background task. We need to
10631124 // cancel all other tasks and return the error
10641125 self . cancel_workers ( progress, workers) . await ;
10651126 return Err ( e) ;
10661127 }
1067- Ok ( worker) => {
1128+ W :: Ok ( worker) => {
10681129 // Put the connection back into self.conn so that we can use it
10691130 // in the next iteration.
10701131 self . conn = Some ( worker. conn ) ;
@@ -1090,6 +1151,10 @@ impl Connection {
10901151 }
10911152 }
10921153 }
1154+ W :: Wake => {
1155+ // nothing to do, just try to create more workers by
1156+ // going through the loop again
1157+ }
10931158 } ;
10941159 }
10951160 debug_assert ! ( self . conn. is_some( ) ) ;
0 commit comments