From 8d4b4ffbec05ac70e7971d2245b83a3ac645c590 Mon Sep 17 00:00:00 2001 From: Stanislas Polu Date: Tue, 29 Oct 2024 00:26:59 +0100 Subject: [PATCH] batch upsert to databases_store (#8283) --- core/src/databases_store/store.rs | 36 +++++++++++++------------------ 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/core/src/databases_store/store.rs b/core/src/databases_store/store.rs index 1d8d309414b2..b9b244d8642b 100644 --- a/core/src/databases_store/store.rs +++ b/core/src/databases_store/store.rs @@ -150,9 +150,7 @@ impl DatabasesStore for PostgresDatabasesStore { truncate: bool, ) -> Result<()> { let pool = self.pool.clone(); - let mut c = pool.get().await?; - // Start transaction. - let c = c.transaction().await?; + let c = pool.get().await?; // Truncate table if required. if truncate { @@ -165,31 +163,27 @@ impl DatabasesStore for PostgresDatabasesStore { c.execute(&stmt, &[&table_id]).await?; } - // Prepare insertion/updation statement. let stmt = c .prepare( "INSERT INTO tables_rows - (id, table_id, row_id, created, content) - VALUES (DEFAULT, $1, $2, $3, $4) - ON CONFLICT (table_id, row_id) DO UPDATE - SET content = EXCLUDED.content", + (table_id, row_id, created, content) + SELECT * FROM UNNEST($1::text[], $2::text[], $3::bigint[], $4::text[]) + ON CONFLICT (table_id, row_id) DO UPDATE + SET content = EXCLUDED.content", ) .await?; - for row in rows { - c.execute( - &stmt, - &[ - &table_id, - &row.row_id(), - &(utils::now() as i64), - &row.content().to_string(), - ], - ) - .await?; - } + for chunk in rows.chunks(1024) { + let now = utils::now() as i64; - c.commit().await?; + let table_ids: Vec<&str> = vec![table_id; chunk.len()]; + let row_ids: Vec<&str> = chunk.iter().map(|r| r.row_id()).collect(); + let createds: Vec = vec![now; chunk.len()]; + let contents: Vec = chunk.iter().map(|r| r.content().to_string()).collect(); + + c.execute(&stmt, &[&table_ids, &row_ids, &createds, &contents]) + .await?; + } Ok(()) }