Timeout copy batches that take too long and copy tables in parallel#5918
Conversation
Pull Request Overview
This PR implements a mechanism to handle long-running copy batch operations by timing out queries and retrying with a smaller batch size if necessary. Key changes include:
- Adding a setter method for batch size in VidBatcher.
- Wrapping batch copy transactions in a loop that sets a local statement timeout and resets the batch size on a timeout.
- Introducing a new environment variable and error variant (StatementTimeout) to support batch timeout functionality.
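The retry mechanism described above can be sketched roughly as follows. This is an illustrative stand-in, not the PR's actual code: `CopyError` and `run_batch` are hypothetical names, and in the real implementation the closure wraps a transaction that issues a local statement timeout and surfaces the new `StatementTimeout` error variant.

```rust
use std::time::Duration;

/// Hypothetical error type; the real code adds a StatementTimeout
/// variant to the store's error enum.
#[derive(Debug)]
enum CopyError {
    StatementTimeout,
}

/// Sketch of the retry loop: run one batch under a statement timeout,
/// and on timeout shrink the batch size and try again. Returns the
/// batch size that finally succeeded.
fn copy_with_timeout(
    mut batch_size: usize,
    timeout: Duration,
    run_batch: impl Fn(usize, Duration) -> Result<(), CopyError>,
) -> Result<usize, CopyError> {
    loop {
        match run_batch(batch_size, timeout) {
            Ok(()) => return Ok(batch_size),
            Err(CopyError::StatementTimeout) if batch_size > 1 => {
                // Retry with a smaller batch; the real code resets the
                // VidBatcher's size via the new setter.
                batch_size /= 2;
            }
            Err(e) => return Err(e),
        }
    }
}
```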
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| store/postgres/src/vid_batcher.rs | Added a new method to adjust the batch size dynamically. |
| store/postgres/src/copy.rs | Implemented statement timeouts for copy batches, with retry logic that reduces the batch size on timeout. |
| graph/src/env/store.rs | Updated EnvVarsStore to include batch_timeout with a constraint check. |
| graph/src/env/mod.rs | Modified from_env to use try_into for proper error handling. |
| graph/src/components/store/err.rs | Added a new StatementTimeout error variant and refactored error conversion logic. |
| docs/environment-variables.md | Updated documentation to describe the new GRAPH_STORE_BATCH_TIMEOUT variable. |
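Enabling the feature would look something like the following. Note this is illustrative only: the value format (a plain number of seconds) is an assumption here; the authoritative syntax is whatever the updated docs/environment-variables.md specifies.

```shell
# Illustrative: cap each copy batch statement at 180 seconds.
# (A plain number of seconds is an assumption; check the updated
# docs/environment-variables.md for the exact value format.)
export GRAPH_STORE_BATCH_TIMEOUT=180
```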
store/postgres/src/copy.rs (outdated):

```diff
- let status = self.transaction(|conn| table.copy_batch(conn))?;
+ let status = {
```

Consider introducing a maximum retry limit in this loop to avoid potential infinite retries in the event of persistent statement timeouts.
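A bounded variant along the lines of this suggestion might look like the sketch below. `MAX_RETRIES`, `BatchError`, and `run_batch` are hypothetical names introduced for illustration, not part of the PR.

```rust
#[derive(Debug, PartialEq)]
enum BatchError {
    StatementTimeout,
    RetriesExhausted,
}

/// Hypothetical cap on timeout retries, per the review suggestion.
const MAX_RETRIES: u32 = 5;

/// Like the PR's retry loop, but gives up after MAX_RETRIES timeouts
/// instead of potentially retrying forever.
fn copy_with_retry_cap(
    mut batch_size: usize,
    run_batch: impl Fn(usize) -> Result<(), BatchError>,
) -> Result<usize, BatchError> {
    for _ in 0..=MAX_RETRIES {
        match run_batch(batch_size) {
            Ok(()) => return Ok(batch_size),
            // Halve the batch on timeout, but never go below one row.
            Err(BatchError::StatementTimeout) => batch_size = (batch_size / 2).max(1),
            Err(e) => return Err(e),
        }
    }
    // Persistent timeouts: surface an error rather than looping forever.
    Err(BatchError::RetriesExhausted)
}
```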
Force-pushed from c456c0a to 417ee08.
graph/src/env/store.rs (outdated):

```rust
pub batch_timeout: Option<Duration>,

/// The number of workers to use for batch operations. If there are idle
/// connectiosn, each subgraph copy operation will use up to this many
```

Suggested change:

```diff
- /// connectiosn, each subgraph copy operation will use up to this many
+ /// connections, each subgraph copy operation will use up to this many
```
```rust
if let Some(worker) = self.default_worker(&mut state, &progress)? {
    workers.push(worker);
}
loop {
```
Probably a question of taste, but I would move the loop inside the `if` above. The current way is correct too.
That wouldn't be correct: we can only call `self.default_worker` when `self.conn.is_some()`, i.e., once per `while` loop. The `loop { .. }` is about trying to get more workers than just the one we always have.
Just noticed that I misread your comment: yes, putting the loop inside the `if` would also have been possible, but as you said, it's more a matter of taste.
zorancv left a comment:
Hope it helps the copies.
Force-pushed from a39eaf1 to b3543bb.
Messed up my git commands, this was merged at b3543bb.
Our estimation of batch sizes is generally good and stays within the prescribed bounds, but there are cases where proper estimation of the batch size is nearly impossible since the size of the rows in the table jumps sharply at some point that is hard to predict. This mechanism ensures that if our estimation is wrong, the consequences aren't too severe.
That's what the first three commits do; the rest of this PR changes how we copy so that the tables for a deployment can be copied in parallel. The copying parallelizes opportunistically: it always copies at least one table, and more if database connections are available and the configuration allows copying more than one table at a time.
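The opportunistic scheduling can be sketched as follows. This is a simplification with illustrative names (`tables_to_copy` is not a function in the PR); the real code acquires workers from the connection pool as it goes rather than counting idle connections up front.

```rust
/// Decide how many tables to copy in parallel: always at least one,
/// plus extras bounded by both the number of idle connections and the
/// configured worker limit. Illustrative stand-in for the PR's logic.
fn tables_to_copy(pending: &[&str], idle_connections: usize, max_workers: usize) -> Vec<String> {
    // One worker is always available; extras need both an idle
    // connection and headroom under the configured maximum.
    let extra = idle_connections.min(max_workers.saturating_sub(1));
    pending.iter().take(1 + extra).map(|t| t.to_string()).collect()
}
```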