Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions graph/src/env/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ pub struct EnvVarsStore {
/// least 1
pub batch_workers: usize,

/// How long to wait to get an additional connection for a batch worker.
/// This should be just big enough to allow the connection pool to
/// establish a connection. Set by `GRAPH_STORE_BATCH_WORKER_WAIT`.
/// The value is in milliseconds and defaults to 2000ms.
pub batch_worker_wait: Duration,

/// Prune tables where we will remove at least this fraction of entity
/// versions by rebuilding the table. Set by
/// `GRAPH_STORE_HISTORY_REBUILD_THRESHOLD`. The default is 0.5
Expand Down Expand Up @@ -182,6 +188,7 @@ impl TryFrom<InnerStore> for EnvVarsStore {
batch_target_duration: Duration::from_secs(x.batch_target_duration_in_secs),
batch_timeout: x.batch_timeout_in_secs.map(Duration::from_secs),
batch_workers: x.batch_workers,
batch_worker_wait: Duration::from_millis(x.batch_worker_wait),
rebuild_threshold: x.rebuild_threshold.0,
delete_threshold: x.delete_threshold.0,
history_slack_factor: x.history_slack_factor.0,
Expand Down Expand Up @@ -251,6 +258,8 @@ pub struct InnerStore {
batch_timeout_in_secs: Option<u64>,
#[envconfig(from = "GRAPH_STORE_BATCH_WORKERS", default = "1")]
batch_workers: usize,
#[envconfig(from = "GRAPH_STORE_BATCH_WORKER_WAIT", default = "2000")]
batch_worker_wait: u64,
#[envconfig(from = "GRAPH_STORE_HISTORY_REBUILD_THRESHOLD", default = "0.5")]
rebuild_threshold: ZeroToOneF64,
#[envconfig(from = "GRAPH_STORE_HISTORY_DELETE_THRESHOLD", default = "0.05")]
Expand Down
17 changes: 14 additions & 3 deletions store/postgres/src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,9 @@ impl ConnectionPool {
pub fn try_get_fdw(
&self,
logger: &Logger,
timeout: Duration,
) -> Result<Option<PooledConnection<ConnectionManager<PgConnection>>>, StoreError> {
self.get_ready()?.try_get_fdw(logger)
self.get_ready()?.try_get_fdw(logger, timeout)
}

pub fn connection_detail(&self) -> Result<ForeignServer, StoreError> {
Expand Down Expand Up @@ -1034,12 +1035,22 @@ impl PoolInner {
}
}

/// Get a connection from the fdw pool if one is available
/// Get a connection from the fdw pool if one is available. We wait up to
/// `timeout` for a connection; the timeout should be set just big enough
/// to allow the pool to establish a new connection.
pub fn try_get_fdw(
&self,
logger: &Logger,
timeout: Duration,
) -> Result<Option<PooledConnection<ConnectionManager<PgConnection>>>, StoreError> {
Ok(self.fdw_pool(logger)?.try_get())
// Any error trying to get a connection is treated as "couldn't get
// a connection in time". If there is a serious error with the
// database, e.g., because it's not available, the next database
// operation will run into it and report it.
self.fdw_pool(logger)?
.get_timeout(timeout)
.map(|conn| Some(conn))
.or_else(|_| Ok(None))
}

pub fn connection_detail(&self) -> Result<ForeignServer, StoreError> {
Expand Down
11 changes: 8 additions & 3 deletions store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,9 @@ impl CopyState {
target_block: BlockPtr,
) -> Result<CopyState, StoreError> {
let tables = TableState::load(conn, src.as_ref(), dst.as_ref())?;
let (finished, unfinished) = tables.into_iter().partition(|table| table.finished());
let (finished, mut unfinished): (Vec<_>, Vec<_>) =
tables.into_iter().partition(|table| table.finished());
unfinished.sort_by_key(|table| table.dst.object.to_string());
Ok(CopyState {
src,
dst,
Expand Down Expand Up @@ -694,6 +696,7 @@ impl CopyTableWorker {
use Status::*;

let conn = &mut self.conn;
progress.start_table(&self.table);
while !self.table.finished() {
// It is important that this check happens outside the write
// transaction so that we do not hold on to locks acquired
Expand All @@ -720,7 +723,6 @@ impl CopyTableWorker {
}
}

progress.start_table(&self.table);
let status = {
loop {
if progress.is_cancelled() {
Expand Down Expand Up @@ -914,7 +916,10 @@ impl Connection {
) -> Result<Option<Pin<Box<dyn Future<Output = CopyTableWorker>>>>, StoreError> {
// It's important that we get the connection before the table since
// we remove the table from the state and could drop it otherwise
let Some(conn) = self.pool.try_get_fdw(&self.logger)? else {
let Some(conn) = self
.pool
.try_get_fdw(&self.logger, ENV_VARS.store.batch_worker_wait)?
else {
return Ok(None);
};
let Some(table) = state.unfinished.pop() else {
Expand Down
Loading