diff --git a/Cargo.lock b/Cargo.lock index 1d9cd243c..45f665cdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1573,14 +1573,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crond" -version = "0.7.19" -dependencies = [ - "revolt-database", - "revolt-files", -] - [[package]] name = "crossbeam-channel" version = "0.5.13" @@ -2459,9 +2451,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", @@ -2490,9 +2482,9 @@ checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" dependencies = [ "futures-core", "futures-task", @@ -5670,6 +5662,18 @@ dependencies = [ "serde", ] +[[package]] +name = "revolt-crond" +version = "0.7.19" +dependencies = [ + "log", + "revolt-config", + "revolt-database", + "revolt-files", + "revolt-result", + "tokio 1.35.1", +] + [[package]] name = "revolt-database" version = "0.7.19" diff --git a/crates/core/config/Revolt.toml b/crates/core/config/Revolt.toml index 359c0e998..1028a7e83 100644 --- a/crates/core/config/Revolt.toml +++ b/crates/core/config/Revolt.toml @@ -234,3 +234,4 @@ api = "" events = "" files = "" proxy = "" +crond = "" diff --git a/crates/core/config/src/lib.rs b/crates/core/config/src/lib.rs index e0f57d17b..2f55441e7 100644 --- a/crates/core/config/src/lib.rs +++ b/crates/core/config/src/lib.rs @@ -245,6 +245,7 @@ pub struct Sentry { pub events: String, pub files: String, pub proxy: String, + pub crond: String, } #[derive(Deserialize, Debug, Clone)] diff --git a/crates/core/database/src/models/file_hashes/ops.rs b/crates/core/database/src/models/file_hashes/ops.rs index e7f626584..648a1a2f6 100644 --- a/crates/core/database/src/models/file_hashes/ops.rs +++ b/crates/core/database/src/models/file_hashes/ops.rs @@ -15,4 +15,7 @@ pub trait AbstractAttachmentHashes: Sync + Send { /// Update an attachment hash nonce value. async fn set_attachment_hash_nonce(&self, hash: &str, nonce: &str) -> Result<()>; + + /// Delete attachment hash by id. + async fn delete_attachment_hash(&self, id: &str) -> Result<()>; } diff --git a/crates/core/database/src/models/file_hashes/ops/mongodb.rs b/crates/core/database/src/models/file_hashes/ops/mongodb.rs index c55a06104..153cb72f1 100644 --- a/crates/core/database/src/models/file_hashes/ops/mongodb.rs +++ b/crates/core/database/src/models/file_hashes/ops/mongodb.rs @@ -48,4 +48,9 @@ impl AbstractAttachmentHashes for MongoDb { .map(|_| ()) .map_err(|_| create_database_error!("update_one", COL)) } + + /// Delete attachment hash by id. + async fn delete_attachment_hash(&self, id: &str) -> Result<()> { + query!(self, delete_one_by_id, COL, id).map(|_| ()) + } } diff --git a/crates/core/database/src/models/file_hashes/ops/reference.rs b/crates/core/database/src/models/file_hashes/ops/reference.rs index 86bbeee8b..7733523e3 100644 --- a/crates/core/database/src/models/file_hashes/ops/reference.rs +++ b/crates/core/database/src/models/file_hashes/ops/reference.rs @@ -23,8 +23,8 @@ impl AbstractAttachmentHashes for ReferenceDb { let hashes = self.file_hashes.lock().await; hashes .values() + .find(|&hash| hash.id == hash_value || hash.processed_hash == hash_value) .cloned() - .find(|hash| hash.id == hash_value || hash.processed_hash == hash_value) .ok_or(create_error!(NotFound)) } @@ -38,4 +38,14 @@ impl AbstractAttachmentHashes for ReferenceDb { Err(create_error!(NotFound)) } } + + /// Delete attachment hash by id. + async fn delete_attachment_hash(&self, id: &str) -> Result<()> { + let mut file_hashes = self.file_hashes.lock().await; + if file_hashes.remove(id).is_some() { + Ok(()) + } else { + Err(create_error!(NotFound)) + } + } } diff --git a/crates/core/database/src/models/files/ops.rs b/crates/core/database/src/models/files/ops.rs index 58d599d35..b30d3acc8 100644 --- a/crates/core/database/src/models/files/ops.rs +++ b/crates/core/database/src/models/files/ops.rs @@ -15,6 +15,15 @@ pub trait AbstractAttachments: Sync + Send { /// Fetch an attachment by its id. async fn fetch_attachment(&self, tag: &str, file_id: &str) -> Result; + /// Fetch all deleted attachments. + async fn fetch_deleted_attachments(&self) -> Result>; + + /// Fetch all dangling attachments. + async fn fetch_dangling_files(&self) -> Result>; + + /// Count references to a given hash. + async fn count_file_hash_references(&self, hash: &str) -> Result; + /// Find an attachment by its details and mark it as used by a given parent. async fn find_and_use_attachment( &self, @@ -32,4 +41,7 @@ pub trait AbstractAttachments: Sync + Send { /// Mark multiple attachments as having been deleted. async fn mark_attachments_as_deleted(&self, ids: &[String]) -> Result<()>; + + /// Delete the attachment entry. + async fn delete_attachment(&self, id: &str) -> Result<()>; } diff --git a/crates/core/database/src/models/files/ops/mongodb.rs b/crates/core/database/src/models/files/ops/mongodb.rs index c2295cf3d..43e8e946d 100644 --- a/crates/core/database/src/models/files/ops/mongodb.rs +++ b/crates/core/database/src/models/files/ops/mongodb.rs @@ -32,6 +32,51 @@ impl AbstractAttachments for MongoDb { .ok_or_else(|| create_error!(NotFound)) } + /// Fetch all deleted attachments. + async fn fetch_deleted_attachments(&self) -> Result> { + query!( + self, + find, + COL, + doc! { + "deleted": true, + "reported": { + "$ne": true + } + } + ) + } + + /// Fetch all dangling attachments. + async fn fetch_dangling_files(&self) -> Result> { + query!( + self, + find, + COL, + doc! { + "used_for.type": { + "$exists": 0 + }, + "deleted": { + "$ne": true + } + } + ) + } + + /// Count references to a given hash. + async fn count_file_hash_references(&self, hash: &str) -> Result { + query!( + self, + count_documents, + COL, + doc! { + "hash": hash + } + ) + .map(|count| count as usize) + } + /// Find an attachment by its details and mark it as used by a given parent. async fn find_and_use_attachment( &self, @@ -114,7 +159,7 @@ impl AbstractAttachments for MongoDb { /// Mark multiple attachments as having been deleted. async fn mark_attachments_as_deleted(&self, ids: &[String]) -> Result<()> { self.col::(COL) - .update_one( + .update_many( doc! { "_id": { "$in": ids @@ -129,7 +174,12 @@ impl AbstractAttachments for MongoDb { ) .await .map(|_| ()) - .map_err(|_| create_database_error!("update_one", COL)) + .map_err(|_| create_database_error!("update_many", COL)) + } + + /// Delete the attachment entry. + async fn delete_attachment(&self, id: &str) -> Result<()> { + query!(self, delete_one_by_id, COL, id).map(|_| ()) } } diff --git a/crates/core/database/src/models/files/ops/reference.rs b/crates/core/database/src/models/files/ops/reference.rs index e18794272..208b0b0a9 100644 --- a/crates/core/database/src/models/files/ops/reference.rs +++ b/crates/core/database/src/models/files/ops/reference.rs @@ -33,6 +33,41 @@ impl AbstractAttachments for ReferenceDb { } } + /// Fetch all deleted attachments. + async fn fetch_deleted_attachments(&self) -> Result> { + let files = self.files.lock().await; + Ok(files + .values() + .filter(|file| { + // file has been marked as deleted + file.deleted.is_some_and(|v| v) + // and it has not been reported + && !file.reported.is_some_and(|v| v) + }) + .cloned() + .collect()) + } + + /// Fetch all dangling attachments. + async fn fetch_dangling_files(&self) -> Result> { + let files = self.files.lock().await; + Ok(files + .values() + .filter(|file| file.used_for.is_none() && !file.deleted.is_some_and(|v| v)) + .cloned() + .collect()) + } + + /// Count references to a given hash. + async fn count_file_hash_references(&self, hash: &str) -> Result { + let files = self.files.lock().await; + Ok(files + .values() + .filter(|file| file.hash.as_ref().is_some_and(|h| h == hash)) + .cloned() + .count()) + } + /// Find an attachment by its details and mark it as used by a given parent. async fn find_and_use_attachment( &self, @@ -96,4 +131,14 @@ impl AbstractAttachments for ReferenceDb { Ok(()) } + + /// Delete the attachment entry. + async fn delete_attachment(&self, id: &str) -> Result<()> { + let mut files = self.files.lock().await; + if files.remove(id).is_some() { + Ok(()) + } else { + Err(create_error!(NotFound)) + } + } } diff --git a/crates/core/files/src/lib.rs b/crates/core/files/src/lib.rs index c09853dd9..dacc988e5 100644 --- a/crates/core/files/src/lib.rs +++ b/crates/core/files/src/lib.rs @@ -113,6 +113,23 @@ pub async fn upload_to_s3(bucket_id: &str, path: &str, buf: &[u8]) -> Result Result<()> { + let config = config().await; + let client = create_client(config.files.s3); + + report_internal_error!( + client + .delete_object() + .bucket(bucket_id) + .key(path) + .send() + .await + )?; + + Ok(()) +} + /// Determine size of image at temp file pub fn image_size(f: &NamedTempFile) -> Option<(usize, usize)> { if let Ok(size) = imagesize::size(f.path()) diff --git a/crates/daemons/crond/Cargo.toml b/crates/daemons/crond/Cargo.toml index 85ec4349a..f2c4b94fe 100644 --- a/crates/daemons/crond/Cargo.toml +++ b/crates/daemons/crond/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "crond" +name = "revolt-crond" version = "0.7.19" license = "AGPL-3.0-or-later" authors = ["Paul Makles "] @@ -9,6 +9,14 @@ description = "Revolt Daemon Service: Timed data clean up tasks" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +# Utility +log = "0.4" + +# Async +tokio = { version = "1" } + # Core revolt-database = { version = "0.7.19", path = "../../core/database" } +revolt-result = { version = "0.7.19", path = "../../core/result" } +revolt-config = { version = "0.7.19", path = "../../core/config" } revolt-files = { version = "0.7.19", path = "../../core/files" } diff --git a/crates/daemons/crond/src/main.rs b/crates/daemons/crond/src/main.rs index e7a11a969..40af6a2bd 100644 --- a/crates/daemons/crond/src/main.rs +++ b/crates/daemons/crond/src/main.rs @@ -1,3 +1,19 @@ -fn main() { - println!("Hello, world!"); +use revolt_config::configure; +use revolt_database::DatabaseInfo; +use revolt_result::Result; +use tasks::{file_deletion, prune_dangling_files}; +use tokio::try_join; + +pub mod tasks; + +#[tokio::main] +async fn main() -> Result<()> { + configure!(crond); + + let db = DatabaseInfo::Auto.connect().await.expect("database"); + try_join!( + file_deletion::task(db.clone()), + prune_dangling_files::task(db) + ) + .map(|_| ()) } diff --git a/crates/daemons/crond/src/tasks/file_deletion.rs b/crates/daemons/crond/src/tasks/file_deletion.rs new file mode 100644 index 000000000..912173b3f --- /dev/null +++ b/crates/daemons/crond/src/tasks/file_deletion.rs @@ -0,0 +1,39 @@ +use std::time::Duration; + +use log::info; +use revolt_database::Database; +use revolt_files::delete_from_s3; +use revolt_result::Result; +use tokio::time::sleep; + +pub async fn task(db: Database) -> Result<()> { + loop { + let files = db.fetch_deleted_attachments().await?; + + for file in files { + let count = db + .count_file_hash_references(file.hash.as_ref().expect("no `hash` present")) + .await?; + + // No other files reference this file on disk anymore + if count <= 1 { + let file_hash = db + .fetch_attachment_hash(file.hash.as_ref().expect("no `hash` present")) + .await?; + + // Delete from S3 + delete_from_s3(&file_hash.bucket_id, &file_hash.path).await?; + + // Delete the hash + db.delete_attachment_hash(&file_hash.id).await?; + info!("Deleted file hash {}", file_hash.id); + } + + // Delete the file + db.delete_attachment(&file.id).await?; + info!("Deleted file {}", file.id); + } + + sleep(Duration::from_secs(60)).await; + } +} diff --git a/crates/daemons/crond/src/tasks/mod.rs b/crates/daemons/crond/src/tasks/mod.rs new file mode 100644 index 000000000..2a668f14c --- /dev/null +++ b/crates/daemons/crond/src/tasks/mod.rs @@ -0,0 +1,2 @@ +pub mod file_deletion; +pub mod prune_dangling_files; diff --git a/crates/daemons/crond/src/tasks/prune_dangling_files.rs b/crates/daemons/crond/src/tasks/prune_dangling_files.rs new file mode 100644 index 000000000..be1eb6d6b --- /dev/null +++ b/crates/daemons/crond/src/tasks/prune_dangling_files.rs @@ -0,0 +1,36 @@ +use std::time::Duration; + +use revolt_database::{iso8601_timestamp::Timestamp, Database}; +use revolt_result::Result; +use tokio::time::sleep; + +use log::info; + +pub async fn task(db: Database) -> Result<()> { + loop { + // This could just be a single database query + // ... but timestamps are inconsistently serialised + // ... sometimes they are dates/numbers, hard to query + // ... in the future, we could use Postgres instead! :D + // ... + // ... on the plus side, it's still only 2 queries + + let files = db.fetch_dangling_files().await?; + let file_ids: Vec = files + .into_iter() + .filter(|file| { + file.uploaded_at.is_some_and(|uploaded_at| { + Timestamp::now_utc().duration_since(uploaded_at) > Duration::from_secs(60 * 60) + }) + }) + .map(|file| file.id) + .collect(); + + if !file_ids.is_empty() { + db.mark_attachments_as_deleted(&file_ids).await?; + info!("Marked {} dangling files for deletion", file_ids.len()); + } + + sleep(Duration::from_secs(60)).await; + } +}