diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index e0ae71eab3e..effe2950ee2 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -32,12 +32,13 @@ use diesel::{ }; use graph::{ constraint_violation, - futures03::future::select_all, + futures03::{future::select_all, FutureExt as _}, prelude::{ info, lazy_static, o, warn, BlockNumber, BlockPtr, CheapClone, Logger, StoreError, ENV_VARS, }, schema::EntityType, slog::{debug, error}, + tokio, }; use itertools::Itertools; @@ -687,6 +688,21 @@ impl CopyProgress { } } +enum WorkerResult { + Ok(CopyTableWorker), + Err(StoreError), + Wake, +} + +impl From> for WorkerResult { + fn from(result: Result) -> Self { + match result { + Ok(worker) => WorkerResult::Ok(worker), + Err(e) => WorkerResult::Err(e), + } + } +} + /// A helper to run copying of one table. We need to thread `conn` and /// `table` from the control loop to the background worker and back again to /// the control loop. This worker facilitates that @@ -705,11 +721,7 @@ impl CopyTableWorker { } } - async fn run( - mut self, - logger: Logger, - progress: Arc, - ) -> Result { + async fn run(mut self, logger: Logger, progress: Arc) -> WorkerResult { let object = self.table.dst.object.cheap_clone(); graph::spawn_blocking_allow_panic(move || { self.result = self.run_inner(logger, &progress); @@ -717,6 +729,7 @@ impl CopyTableWorker { }) .await .map_err(|e| constraint_violation!("copy worker for {} panicked: {}", object, e)) + .into() } fn run_inner(&mut self, logger: Logger, progress: &CopyProgress) -> Result { @@ -812,6 +825,57 @@ impl CopyTableWorker { } } +/// A helper to manage the workers that are copying data. Besides the actual +/// workers it also keeps a worker that wakes us up periodically to give us +/// a chance to create more workers if there are database connections +/// available +struct Workers { + /// The list of workers that are currently running. This will always + /// include a future that wakes us up periodically + futures: Vec>>>, +} + +impl Workers { + fn new() -> Self { + Self { + futures: vec![Self::waker()], + } + } + + fn add(&mut self, worker: Pin>>) { + self.futures.push(worker); + } + + fn has_work(&self) -> bool { + self.futures.len() > 1 + } + + async fn select(&mut self) -> WorkerResult { + use WorkerResult::*; + + let futures = std::mem::take(&mut self.futures); + let (result, _idx, remaining) = select_all(futures).await; + self.futures = remaining; + match result { + Ok(_) | Err(_) => { /* nothing to do */ } + Wake => { + self.futures.push(Self::waker()); + } + } + result + } + + fn waker() -> Pin>> { + let sleep = tokio::time::sleep(ENV_VARS.store.batch_target_duration); + Box::pin(sleep.map(|()| WorkerResult::Wake)) + } + + /// Return the number of workers that are not the waker + fn len(&self) -> usize { + self.futures.len() - 1 + } +} + /// A helper for copying subgraphs pub struct Connection { /// The connection pool for the shard that will contain the destination @@ -926,7 +990,7 @@ impl Connection { &mut self, state: &mut CopyState, progress: &Arc, - ) -> Option>>>> { + ) -> Option>>> { let Some(conn) = self.conn.take() else { return None; }; @@ -947,7 +1011,7 @@ impl Connection { &mut self, state: &mut CopyState, progress: &Arc, - ) -> Option>>>> { + ) -> Option>>> { // It's important that we get the connection before the table since // we remove the table from the state and could drop it otherwise let Some(conn) = self @@ -989,19 +1053,15 @@ impl Connection { /// Wait for all workers to finish. This is called when we a worker has /// failed with an error that forces us to abort copying - async fn cancel_workers( - &mut self, - progress: Arc, - mut workers: Vec>>>>, - ) { + async fn cancel_workers(&mut self, progress: Arc, mut workers: Workers) { progress.cancel(); error!( self.logger, "copying encountered an error; waiting for all workers to finish" ); - while !workers.is_empty() { - let (result, _, remaining) = select_all(workers).await; - workers = remaining; + while workers.has_work() { + use WorkerResult::*; + let result = workers.select().await; match result { Ok(worker) => { self.conn = Some(worker.conn); @@ -1010,6 +1070,7 @@ impl Connection { /* Ignore; we had an error previously */ error!(self.logger, "copy worker panicked: {}", e); } + Wake => { /* Ignore; this is just a waker */ } } } } @@ -1031,14 +1092,14 @@ impl Connection { // // The loop has to be very careful about terminating early so that // we do not ever leave the loop with `self.conn == None` - let mut workers = Vec::new(); - while !state.unfinished.is_empty() || !workers.is_empty() { + let mut workers = Workers::new(); + while !state.unfinished.is_empty() || workers.has_work() { // We usually add at least one job here, except if we are out of // tables to copy. In that case, we go through the `while` loop // every time one of the tables we are currently copying // finishes if let Some(worker) = self.default_worker(&mut state, &progress) { - workers.push(worker); + workers.add(worker); } loop { if workers.len() >= self.workers { @@ -1047,24 +1108,24 @@ impl Connection { let Some(worker) = self.extra_worker(&mut state, &progress) else { break; }; - workers.push(worker); + workers.add(worker); } self.assert_progress(workers.len(), &state)?; - let (result, _idx, remaining) = select_all(workers).await; - workers = remaining; + let result = workers.select().await; // Analyze `result` and take another trip through the loop if // everything is ok; wait for pending workers and return if // there was an error or if copying was cancelled. + use WorkerResult as W; match result { - Err(e) => { + W::Err(e) => { // This is a panic in the background task. We need to // cancel all other tasks and return the error self.cancel_workers(progress, workers).await; return Err(e); } - Ok(worker) => { + W::Ok(worker) => { // Put the connection back into self.conn so that we can use it // in the next iteration. self.conn = Some(worker.conn); @@ -1090,6 +1151,10 @@ impl Connection { } } } + W::Wake => { + // nothing to do, just try to create more workers by + // going through the loop again + } }; } debug_assert!(self.conn.is_some());