diff --git a/core/Cargo.toml b/core/Cargo.toml index 293a07d6a4ff..941b6424a689 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -35,10 +35,6 @@ path = "bin/oauth.rs" name = "oauth_generate_key" path = "bin/oauth_generate_key.rs" -[[bin]] -name = "migration_clean_legacy_gcs" -path = "bin/migrations/20241024_clean_legacy_gcs.rs" - [[test]] name = "oauth_connections_test" path = "src/oauth/tests/functional_connections.rs" diff --git a/core/bin/migrations/20241025_scrub_old_superseded_verions.rs b/core/bin/migrations/20241025_scrub_old_superseded_verions.rs new file mode 100644 index 000000000000..861f98f99754 --- /dev/null +++ b/core/bin/migrations/20241025_scrub_old_superseded_verions.rs @@ -0,0 +1,206 @@ +use anyhow::{anyhow, Context, Error, Result}; +use dust::data_sources::data_source::{DataSource, DocumentVersion}; +use dust::stores::{postgres, store}; +use tokio_postgres::Row; + +use bb8::Pool; +use bb8_postgres::PostgresConnectionManager; +use futures::prelude::*; +use futures::{StreamExt, TryStreamExt}; +use std::time::Duration; +use tokio_postgres::NoTls; +use tokio_stream::{self as stream}; + +pub async fn with_retryable_back_off( + mut f: impl FnMut() -> F, + log_retry: impl Fn(&Error, &Duration, usize) -> (), + log_error: impl Fn(&Error) -> (), +) -> Result +where + F: Future>, +{ + let factor = 2; + let retries = 3; + let mut sleep = Duration::from_millis(500); + let mut attempts = 0_usize; + let out = loop { + match f().await { + Err(err) => { + log_error(&err); + attempts += 1; + log_retry(&err, &sleep, attempts); + tokio::time::sleep(sleep).await; + sleep *= factor; + if attempts > retries { + break Err(anyhow!("Too many retries ({}): {}", retries, err)); + } + } + Ok(out) => break Ok(out), + } + }; + out +} + +async fn fetch_data_sources_documents_batch( + pool: &Pool>, + data_source_id: i64, + last_id: u64, + limit: usize, +) -> Result, anyhow::Error> { + let c = pool.get().await?; + + c.query( + "SELECT id, document_id FROM data_sources_documents WHERE data_source = $1 AND status='latest' AND id > $2 ORDER BY id ASC LIMIT $3", + &[&data_source_id, &(last_id as i64), &(limit as i64)], + ) + .await + .context("fetch_data_sources_documents") +} + +async fn scrub_wrapper( + store: Box, + data_source: &DataSource, + document_id: &str, +) -> Result> { + data_source + .scrub_document_superseded_versions(store, document_id) + .await +} + +async fn scrub_superseded_versions_for_data_source( + store: Box, + data_source_internal_id: &str, + data_source_id: i64, +) -> Result<()> { + let data_source = match store + .load_data_source_by_internal_id(&data_source_internal_id) + .await? + { + Some(ds) => ds, + None => Err(anyhow!("Data source not found"))?, + }; + + let pool = store.raw_pool(); + + let limit: usize = 1024; + let mut last_data_source_document_id = 0; + let mut iteration = 0; + + loop { + let rows = fetch_data_sources_documents_batch( + pool, + data_source_id, + last_data_source_document_id, + limit, + ) + .await?; + + stream::iter( + rows.iter() + .map(|row| { + let document_id: String = row.get(1); + (store.clone(), document_id, data_source.clone()) + }) + .map(|(store, document_id, data_source)| async move { + let v = with_retryable_back_off( + || scrub_wrapper(store.clone(), &data_source, &document_id), + |err, sleep, attempts| { + println!( + "Retrying scrub: err_msg={}, sleep={:?}, attempts={}", + err, sleep, attempts + ); + }, + |err| { + println!( + "Error scrubbing: data_source_id={}, err_msg={}", + data_source_id, err + ); + }, + ) + .await?; + if v.len() > 0 { + println!( + "Scrubbed document: data_source_id={} document_id={} scrubbed={}", + data_source_id, + document_id, + v.len() + ); + } + Ok::<(), anyhow::Error>(()) + }), + ) + .buffer_unordered(8) + .try_collect::>() + .await?; + + if rows.len() < limit { + println!("Scrub loop done: data_source_id={}", data_source_id); + break; + } + + last_data_source_document_id = match rows.last() { + Some(r) => { + let id: i64 = r.get(0); + println!( + "Scrub loop: data_source_id={} iteration={}, last_data_source_document_id={}", + data_source_id, iteration, id + ); + + id as u64 + } + None => { + println!( + "Scrub loop done: data_source_id={} iteration={}", + data_source_id, iteration + ); + break; + } + }; + + iteration += 1; + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + let store: Box = match std::env::var("CORE_DATABASE_URI") { + Ok(db_uri) => { + let store = postgres::PostgresStore::new(&db_uri).await?; + store.init().await?; + Box::new(store) + } + Err(_) => Err(anyhow!("CORE_DATABASE_URI is required (postgres)"))?, + }; + + let pool = store.raw_pool(); + + let c = pool.get().await?; + + let rows = c + .query("SELECT id, internal_id FROM data_sources ORDER BY id", &[]) + .await + .context("fetch_data_sources")?; + + stream::iter(rows.iter().map(|row| { + let store = store.clone(); + + async move { + let data_source_id: i64 = row.get(0); + let data_source_internal_id: String = row.get(1); + + scrub_superseded_versions_for_data_source( + store, + &data_source_internal_id, + data_source_id, + ) + .await + } + })) + .buffer_unordered(8) + .try_collect::>() + .await?; + + Ok(()) +} diff --git a/core/src/data_sources/data_source.rs b/core/src/data_sources/data_source.rs index c67e7f775d0d..1c855e7198a2 100644 --- a/core/src/data_sources/data_source.rs +++ b/core/src/data_sources/data_source.rs @@ -227,6 +227,16 @@ impl FromStr for DocumentStatus { } } +impl ToString for DocumentStatus { + fn to_string(&self) -> String { + match self { + DocumentStatus::Latest => "latest".to_string(), + DocumentStatus::Superseded => "superseded".to_string(), + DocumentStatus::Deleted => "deleted".to_string(), + } + } +} + #[derive(Debug, Serialize, Clone)] pub struct DocumentVersion { pub created: u64, @@ -699,7 +709,7 @@ impl DataSource { .await?; } - // Upsert document (SQL) + // Upsert document (SQL). store .upsert_data_source_document( &self.project, @@ -708,6 +718,10 @@ impl DataSource { ) .await?; + // Clean-up old superseded versions. + self.scrub_document_superseded_versions(store, &document_id) + .await?; + Ok(main_collection_document) } @@ -1769,7 +1783,7 @@ impl DataSource { .filter(|v| v.status == DocumentStatus::Deleted) .collect::>(); - let mut scrubbed_hashes: Vec = vec![]; + let mut scrubbed_versions: Vec = vec![]; for v in versions { let document_id_hash = make_document_id_hash(document_id); @@ -1782,7 +1796,7 @@ impl DataSource { .await?; store - .scrub_data_source_document_version( + .delete_data_source_document_version( &self.project, &self.data_source_id, document_id, @@ -1798,10 +1812,77 @@ impl DataSource { "Scrubbed deleted document version" ); - scrubbed_hashes.push(v); + scrubbed_versions.push(v); } - Ok(scrubbed_hashes) + Ok(scrubbed_versions) + } + + pub async fn scrub_document_superseded_versions( + &self, + store: Box, + document_id: &str, + ) -> Result> { + let (versions, _) = store + .list_data_source_document_versions( + &self.project, + &self.data_source_id, + document_id, + None, + &None, + &None, + ) + .await?; + + // We scrub only superseded version keeping always the last one as well as the ones that + // have been created within the past 24h. Document versions are ordered by creation date + // (descending) but we resort here just to be safe in case the API of the store changes. + let now = utils::now(); + let scrubbed_versions = versions + .into_iter() + .sorted_by(|a, b| Ord::cmp(&b.created, &a.created)) + .filter(|v| v.status == DocumentStatus::Superseded) + .skip(1) + .filter(|v| now - v.created > 24 * 60 * 60 * 1000) + .collect::>(); + + for v in scrubbed_versions.iter() { + let document_id_hash = make_document_id_hash(document_id); + + FileStorageDocument::scrub_document_version_from_file_storage( + &self, + document_id, + &document_id_hash, + v, + ) + .await?; + + store + .delete_data_source_document_version( + &self.project, + &self.data_source_id, + document_id, + v, + ) + .await?; + + info!( + data_source_internal_id = self.internal_id, + document_id = document_id, + version_created = v.created, + version_hash = v.hash, + "Scrubbed superseded document version" + ); + } + + info!( + data_source_internal_id = self.internal_id, + document_id = document_id, + scrubbed_version_count = scrubbed_versions.len(), + "Scrubbed superseded document versions" + ); + + Ok(scrubbed_versions) } pub async fn delete( diff --git a/core/src/data_sources/file_storage_document.rs b/core/src/data_sources/file_storage_document.rs index 35dca04fecd5..012b2d4e22d9 100644 --- a/core/src/data_sources/file_storage_document.rs +++ b/core/src/data_sources/file_storage_document.rs @@ -61,10 +61,7 @@ impl FileStorageDocument { let bucket = FileStorageDocument::get_bucket().await?; match Object::delete(&bucket, &path).await { - Ok(_) => { - // println!("Deleted: path={}", path); - Ok(true) - } + Ok(_) => Ok(true), Err(e) => match e { cloud_storage::Error::Google(GoogleErrorResponse { error: ErrorList { code: 404, .. }, diff --git a/core/src/stores/postgres.rs b/core/src/stores/postgres.rs index 5c75a2c469ad..997792339e8b 100644 --- a/core/src/stores/postgres.rs +++ b/core/src/stores/postgres.rs @@ -1938,7 +1938,7 @@ impl Store for PostgresStore { Ok(()) } - async fn scrub_data_source_document_version( + async fn delete_data_source_document_version( &self, project: &Project, data_source_id: &str, @@ -1950,6 +1950,7 @@ impl Store for PostgresStore { let document_id = document_id.to_string(); let created = version.created as i64; let hash = version.hash.clone(); + let status = version.status.to_string(); let pool = self.pool.clone(); let c = pool.get().await?; @@ -1971,11 +1972,14 @@ impl Store for PostgresStore { .prepare( "DELETE FROM data_sources_documents \ WHERE data_source = $1 AND document_id = $2 \ - AND created = $3 AND hash = $4 AND status='deleted'", + AND created = $3 AND hash = $4 AND status=$5", ) .await?; let _ = c - .query(&stmt, &[&data_source_row_id, &document_id, &created, &hash]) + .query( + &stmt, + &[&data_source_row_id, &document_id, &created, &hash, &status], + ) .await?; Ok(()) diff --git a/core/src/stores/store.rs b/core/src/stores/store.rs index 912eeaad963a..0ed5bcd5048c 100644 --- a/core/src/stores/store.rs +++ b/core/src/stores/store.rs @@ -179,7 +179,7 @@ pub trait Store { data_source_id: &str, document_id: &str, ) -> Result<()>; - async fn scrub_data_source_document_version( + async fn delete_data_source_document_version( &self, project: &Project, data_source_id: &str,