Skip to content

Commit

Permalink
Batch delete and bump pool (#8285)
Browse files Browse the repository at this point in the history
* batch delete

* bump pool to 128 :o
  • Loading branch information
spolu authored Oct 29, 2024
1 parent 6f2a657 commit 6ab5fa6
Showing 1 changed file with 34 additions and 7 deletions.
41 changes: 34 additions & 7 deletions core/src/databases_store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct PostgresDatabasesStore {
impl PostgresDatabasesStore {
pub async fn new(db_uri: &str) -> Result<Self> {
let manager = PostgresConnectionManager::new_from_stringlike(db_uri, NoTls)?;
let pool = Pool::builder().max_size(16).build(manager).await?;
let pool = Pool::builder().max_size(128).build(manager).await?;
Ok(Self { pool })
}
}
Expand Down Expand Up @@ -152,15 +152,28 @@ impl DatabasesStore for PostgresDatabasesStore {
let pool = self.pool.clone();
let c = pool.get().await?;

// Truncate table if required.
// Truncate table if required. Rows can be numerous so we delete rows in small batches to
// avoid long running operations.
if truncate {
let deletion_batch_size: u64 = 512;

let stmt = c
.prepare(
"DELETE FROM tables_rows
WHERE table_id = $1",
"DELETE FROM table_rows WHERE id IN (
SELECT id FROM table_rows WHERE table_id = $1 LIMIT $2
)",
)
.await?;
c.execute(&stmt, &[&table_id]).await?;

loop {
let deleted_rows = c
.execute(&stmt, &[&table_id, &(deletion_batch_size as i64)])
.await?;

if deleted_rows < deletion_batch_size {
break;
}
}
}

let stmt = c
Expand Down Expand Up @@ -192,11 +205,25 @@ impl DatabasesStore for PostgresDatabasesStore {
let pool = self.pool.clone();
let c = pool.get().await?;

let deletion_batch_size: u64 = 512;

let stmt = c
.prepare("DELETE FROM tables_rows WHERE table_id = $1")
.prepare(
"DELETE FROM table_rows WHERE id IN (
SELECT id FROM table_rows WHERE table_id = $1 LIMIT $2
)",
)
.await?;

c.execute(&stmt, &[&table_id]).await?;
loop {
let deleted_rows = c
.execute(&stmt, &[&table_id, &(deletion_batch_size as i64)])
.await?;

if deleted_rows < deletion_batch_size {
break;
}
}

Ok(())
}
Expand Down

0 comments on commit 6ab5fa6

Please sign in to comment.