Skip to content

Commit 3369381

Browse files
committed
graph, store: Wait for a little bit for an addl fdw connection
If we don't wait at all, we can use fewer connections than are available since the pool might be below its capacity but has no idle connections open currently
1 parent 5d75dc6 commit 3369381

File tree

3 files changed

+27
-4
lines changed

3 files changed

+27
-4
lines changed

graph/src/env/store.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,12 @@ pub struct EnvVarsStore {
9191
/// least 1
9292
pub batch_workers: usize,
9393

94+
/// How long to wait to get an additional connection for a batch worker.
95+
/// This should just be big enough to allow the connection pool to
96+
/// establish a connection. Set by `GRAPH_STORE_BATCH_WORKER_WAIT`.
97+
/// Value is in ms and defaults to 2000ms
98+
pub batch_worker_wait: Duration,
99+
94100
/// Prune tables where we will remove at least this fraction of entity
95101
/// versions by rebuilding the table. Set by
96102
/// `GRAPH_STORE_HISTORY_REBUILD_THRESHOLD`. The default is 0.5
@@ -182,6 +188,7 @@ impl TryFrom<InnerStore> for EnvVarsStore {
182188
batch_target_duration: Duration::from_secs(x.batch_target_duration_in_secs),
183189
batch_timeout: x.batch_timeout_in_secs.map(Duration::from_secs),
184190
batch_workers: x.batch_workers,
191+
batch_worker_wait: Duration::from_millis(x.batch_worker_wait),
185192
rebuild_threshold: x.rebuild_threshold.0,
186193
delete_threshold: x.delete_threshold.0,
187194
history_slack_factor: x.history_slack_factor.0,
@@ -251,6 +258,8 @@ pub struct InnerStore {
251258
batch_timeout_in_secs: Option<u64>,
252259
#[envconfig(from = "GRAPH_STORE_BATCH_WORKERS", default = "1")]
253260
batch_workers: usize,
261+
#[envconfig(from = "GRAPH_STORE_BATCH_WORKER_WAIT", default = "2000")]
262+
batch_worker_wait: u64,
254263
#[envconfig(from = "GRAPH_STORE_HISTORY_REBUILD_THRESHOLD", default = "0.5")]
255264
rebuild_threshold: ZeroToOneF64,
256265
#[envconfig(from = "GRAPH_STORE_HISTORY_DELETE_THRESHOLD", default = "0.05")]

store/postgres/src/connection_pool.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -529,8 +529,9 @@ impl ConnectionPool {
529529
pub fn try_get_fdw(
530530
&self,
531531
logger: &Logger,
532+
timeout: Duration,
532533
) -> Result<Option<PooledConnection<ConnectionManager<PgConnection>>>, StoreError> {
533-
self.get_ready()?.try_get_fdw(logger)
534+
self.get_ready()?.try_get_fdw(logger, timeout)
534535
}
535536

536537
pub fn connection_detail(&self) -> Result<ForeignServer, StoreError> {
@@ -1034,12 +1035,22 @@ impl PoolInner {
10341035
}
10351036
}
10361037

1037-
/// Get a connection from the fdw pool if one is available
1038+
/// Get a connection from the fdw pool if one is available. We wait for
1039+
/// `timeout` for a connection which should be set just big enough to
1040+
/// allow establishing a connection
10381041
pub fn try_get_fdw(
10391042
&self,
10401043
logger: &Logger,
1044+
timeout: Duration,
10411045
) -> Result<Option<PooledConnection<ConnectionManager<PgConnection>>>, StoreError> {
1042-
Ok(self.fdw_pool(logger)?.try_get())
1046+
// Any error trying to get a connection is treated as "couldn't get
1047+
// a connection in time". If there is a serious error with the
1048+
// database, e.g., because it's not available, the next database
1049+
// operation will run into it and report it.
1050+
self.fdw_pool(logger)?
1051+
.get_timeout(timeout)
1052+
.map(|conn| Some(conn))
1053+
.or_else(|_| Ok(None))
10431054
}
10441055

10451056
pub fn connection_detail(&self) -> Result<ForeignServer, StoreError> {

store/postgres/src/copy.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -914,7 +914,10 @@ impl Connection {
914914
) -> Result<Option<Pin<Box<dyn Future<Output = CopyTableWorker>>>>, StoreError> {
915915
// It's important that we get the connection before the table since
916916
// we remove the table from the state and could drop it otherwise
917-
let Some(conn) = self.pool.try_get_fdw(&self.logger)? else {
917+
let Some(conn) = self
918+
.pool
919+
.try_get_fdw(&self.logger, ENV_VARS.store.batch_worker_wait)?
920+
else {
918921
return Ok(None);
919922
};
920923
let Some(table) = state.unfinished.pop() else {

0 commit comments

Comments
 (0)