diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs index ded0be9f144..661d0356446 100644 --- a/graph/src/env/store.rs +++ b/graph/src/env/store.rs @@ -91,6 +91,12 @@ pub struct EnvVarsStore { /// least 1 pub batch_workers: usize, + /// How long to wait to get an additional connection for a batch worker. + /// This should just be big enough to allow the connection pool to + /// establish a connection. Set by `GRAPH_STORE_BATCH_WORKER_WAIT`. + /// Value is in ms and defaults to 2000ms + pub batch_worker_wait: Duration, + /// Prune tables where we will remove at least this fraction of entity /// versions by rebuilding the table. Set by /// `GRAPH_STORE_HISTORY_REBUILD_THRESHOLD`. The default is 0.5 @@ -182,6 +188,7 @@ impl TryFrom for EnvVarsStore { batch_target_duration: Duration::from_secs(x.batch_target_duration_in_secs), batch_timeout: x.batch_timeout_in_secs.map(Duration::from_secs), batch_workers: x.batch_workers, + batch_worker_wait: Duration::from_millis(x.batch_worker_wait), rebuild_threshold: x.rebuild_threshold.0, delete_threshold: x.delete_threshold.0, history_slack_factor: x.history_slack_factor.0, @@ -251,6 +258,8 @@ pub struct InnerStore { batch_timeout_in_secs: Option, #[envconfig(from = "GRAPH_STORE_BATCH_WORKERS", default = "1")] batch_workers: usize, + #[envconfig(from = "GRAPH_STORE_BATCH_WORKER_WAIT", default = "2000")] + batch_worker_wait: u64, #[envconfig(from = "GRAPH_STORE_HISTORY_REBUILD_THRESHOLD", default = "0.5")] rebuild_threshold: ZeroToOneF64, #[envconfig(from = "GRAPH_STORE_HISTORY_DELETE_THRESHOLD", default = "0.05")] diff --git a/store/postgres/src/connection_pool.rs b/store/postgres/src/connection_pool.rs index f710fd2316d..782c2a57489 100644 --- a/store/postgres/src/connection_pool.rs +++ b/store/postgres/src/connection_pool.rs @@ -529,8 +529,9 @@ impl ConnectionPool { pub fn try_get_fdw( &self, logger: &Logger, + timeout: Duration, ) -> Result>>, StoreError> { - self.get_ready()?.try_get_fdw(logger) + self.get_ready()?.try_get_fdw(logger, timeout) } pub fn connection_detail(&self) -> Result { @@ -1034,12 +1035,22 @@ impl PoolInner { } } - /// Get a connection from the fdw pool if one is available + /// Get a connection from the fdw pool if one is available. We wait for + /// `timeout` for a connection which should be set just big enough to + /// allow establishing a connection pub fn try_get_fdw( &self, logger: &Logger, + timeout: Duration, ) -> Result>>, StoreError> { - Ok(self.fdw_pool(logger)?.try_get()) + // Any error trying to get a connection is treated as "couldn't get + // a connection in time". If there is a serious error with the + // database, e.g., because it's not available, the next database + // operation will run into it and report it. + self.fdw_pool(logger)? + .get_timeout(timeout) + .map(|conn| Some(conn)) + .or_else(|_| Ok(None)) } pub fn connection_detail(&self) -> Result { diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 4dbc312bc9d..25792bba04e 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -195,7 +195,9 @@ impl CopyState { target_block: BlockPtr, ) -> Result { let tables = TableState::load(conn, src.as_ref(), dst.as_ref())?; - let (finished, unfinished) = tables.into_iter().partition(|table| table.finished()); + let (finished, mut unfinished): (Vec<_>, Vec<_>) = + tables.into_iter().partition(|table| table.finished()); + unfinished.sort_by_key(|table| table.dst.object.to_string()); Ok(CopyState { src, dst, @@ -694,6 +696,7 @@ impl CopyTableWorker { use Status::*; let conn = &mut self.conn; + progress.start_table(&self.table); while !self.table.finished() { // It is important that this check happens outside the write // transaction so that we do not hold on to locks acquired @@ -720,7 +723,6 @@ impl CopyTableWorker { } } - progress.start_table(&self.table); let status = { loop { if progress.is_cancelled() { @@ -914,7 +916,10 @@ impl Connection { ) -> Result>>>, StoreError> { // It's important that we get the connection before the table since // we remove the table from the state and could drop it otherwise - let Some(conn) = self.pool.try_get_fdw(&self.logger)? else { + let Some(conn) = self + .pool + .try_get_fdw(&self.logger, ENV_VARS.store.batch_worker_wait)? + else { return Ok(None); }; let Some(table) = state.unfinished.pop() else {