From d69c5d423b9a41b064c60814b813caded5259f73 Mon Sep 17 00:00:00 2001 From: adz Date: Fri, 15 Nov 2024 17:51:48 +0100 Subject: [PATCH] Send events when watcher detected object --- rhio/src/blobs/watcher.rs | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/rhio/src/blobs/watcher.rs b/rhio/src/blobs/watcher.rs index 4d9ea666..b616c5b6 100644 --- a/rhio/src/blobs/watcher.rs +++ b/rhio/src/blobs/watcher.rs @@ -8,9 +8,6 @@ use rhio_blobs::{ use tokio::sync::{broadcast, RwLock}; use tracing::debug; -#[derive(Clone, Debug)] -pub enum S3WatcherEvent {} - const POLL_FREQUENCY: Duration = Duration::from_secs(1); const NO_PREFIX: String = String::new(); // Empty string. @@ -70,7 +67,7 @@ impl S3Watcher { })); let watcher = Self { - event_tx, + event_tx: event_tx.clone(), inner: inner.clone(), }; @@ -134,7 +131,11 @@ impl S3Watcher { // as soon as a new object was completed. if is_new && !first_run { debug!(key = %path.data(), size = %size, hash = %hash, "detected newly completed S3 object"); - // @TODO: Send event, this object was just completed. + event_tx.send(S3WatcherEvent::ShareableBlobDetected( + hash, + size, + path.data(), + )); } } @@ -143,7 +144,10 @@ impl S3Watcher { for object in maybe_to_be_imported { if !inner.completed.contains(&object) { debug!(key = %object.key, size = %object.size, "detected new S3 object, needs to be imported"); - // @TODO: Send event, this object has not yet been imported! + event_tx.send(S3WatcherEvent::NewLocalBlobDetected( + object.size, + object.key, + )); } } } @@ -161,7 +165,11 @@ impl S3Watcher { }); if is_new { debug!(key = %path.data(), size = %size, hash = %hash, "detected incomplete S3 object, download needs to be resumed"); - // @TODO: Send event, this download needs to be resumed + event_tx.send(S3WatcherEvent::IncompleteBlobDetected( + hash, + size, + path.data(), + )); } } } @@ -182,3 +190,10 @@ impl S3Watcher { self.event_tx.subscribe() } } + +#[derive(Clone, Debug)] +pub enum S3WatcherEvent { + NewLocalBlobDetected(u64, String), + ShareableBlobDetected(BlobsHash, u64, String), + IncompleteBlobDetected(BlobsHash, u64, String), +}