Skip to content

Commit

Permalink
Send events when watcher detected object
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha committed Nov 15, 2024
1 parent cd07cf6 commit d69c5d4
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions rhio/src/blobs/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ use rhio_blobs::{
use tokio::sync::{broadcast, RwLock};
use tracing::debug;

#[derive(Clone, Debug)]
pub enum S3WatcherEvent {}

const POLL_FREQUENCY: Duration = Duration::from_secs(1);

const NO_PREFIX: String = String::new(); // Empty string.
Expand Down Expand Up @@ -70,7 +67,7 @@ impl S3Watcher {
}));

let watcher = Self {
event_tx,
event_tx: event_tx.clone(),
inner: inner.clone(),
};

Expand Down Expand Up @@ -134,7 +131,11 @@ impl S3Watcher {
// as soon as a new object was completed.
if is_new && !first_run {
debug!(key = %path.data(), size = %size, hash = %hash, "detected newly completed S3 object");
// @TODO: Send event, this object was just completed.
event_tx.send(S3WatcherEvent::ShareableBlobDetected(
hash,
size,
path.data(),
));
}
}

Expand All @@ -143,7 +144,10 @@ impl S3Watcher {
for object in maybe_to_be_imported {
if !inner.completed.contains(&object) {
debug!(key = %object.key, size = %object.size, "detected new S3 object, needs to be imported");
// @TODO: Send event, this object has not yet been imported!
event_tx.send(S3WatcherEvent::NewLocalBlobDetected(
object.size,
object.key,
));
}
}
}
Expand All @@ -161,7 +165,11 @@ impl S3Watcher {
});
if is_new {
debug!(key = %path.data(), size = %size, hash = %hash, "detected incomplete S3 object, download needs to be resumed");
// @TODO: Send event, this download needs to be resumed
event_tx.send(S3WatcherEvent::IncompleteBlobDetected(
hash,
size,
path.data(),
));
}
}
}
Expand All @@ -182,3 +190,10 @@ impl S3Watcher {
self.event_tx.subscribe()
}
}

#[derive(Clone, Debug)]
pub enum S3WatcherEvent {
NewLocalBlobDetected(u64, String),
ShareableBlobDetected(BlobsHash, u64, String),
IncompleteBlobDetected(BlobsHash, u64, String),
}

0 comments on commit d69c5d4

Please sign in to comment.