Skip to content

Commit

Permalink
Resume incomplete downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha committed Nov 16, 2024
1 parent 89a4a4f commit 8c52612
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 13 deletions.
12 changes: 6 additions & 6 deletions rhio/src/blobs/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub enum ToBlobsActor {
},
DownloadBlob {
hash: BlobHash,
bucket: ScopedBucket,
bucket_name: BucketName,
key: ObjectKey,
size: ObjectSize,
reply: oneshot::Sender<Result<()>>,
Expand Down Expand Up @@ -118,12 +118,12 @@ impl BlobsActor {
}
ToBlobsActor::DownloadBlob {
hash,
bucket,
bucket_name,
key,
size,
reply,
} => {
let result = self.on_download_blob(hash, bucket, key, size).await;
let result = self.on_download_blob(hash, bucket_name, key, size).await;
reply.send(result).ok();
}
ToBlobsActor::CompleteBlobs { reply } => {
Expand All @@ -145,12 +145,12 @@ impl BlobsActor {
async fn on_download_blob(
&mut self,
hash: BlobHash,
bucket: ScopedBucket,
bucket_name: BucketName,
key: ObjectKey,
size: ObjectSize,
) -> Result<()> {
self.store
.blob_discovered(hash, &bucket.bucket_name(), key.clone(), size)
.blob_discovered(hash, &bucket_name, key.clone(), size)
.await?;

let hash = Hash::from_bytes(*hash.as_bytes());
Expand All @@ -161,7 +161,7 @@ impl BlobsActor {
error!(%err, "failed downloading blob");
}
DownloadBlobEvent::Done => {
debug!(%hash, bucket = %bucket.bucket_name(), %key, %size, "finished downloading blob");
debug!(%hash, %bucket_name, %key, %size, "finished downloading blob");
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions rhio/src/blobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ impl Blobs {
pub async fn download(
&self,
hash: BlobHash,
bucket: ScopedBucket,
bucket_name: BucketName,
key: ObjectKey,
size: ObjectSize,
) -> Result<()> {
let (reply, reply_rx) = oneshot::channel();
self.blobs_actor_tx
.send(ToBlobsActor::DownloadBlob {
hash,
bucket,
bucket_name,
key,
size,
reply,
Expand Down
24 changes: 19 additions & 5 deletions rhio/src/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ impl NodeActor {
match network_message.payload {
NetworkPayload::BlobAnnouncement(hash, bucket, key, size) => {
if is_bucket_matching(&self.subscriptions, &bucket) {
self.blobs.download(hash, bucket, key, size).await?;
self.blobs
.download(hash, bucket.bucket_name(), key, size)
.await?;
}
}
NetworkPayload::NatsMessage(message) => {
Expand Down Expand Up @@ -346,10 +348,9 @@ impl NodeActor {
self.on_import_finished(hash, bucket_name, key, size)
.await?;
}
// We've detected an uncomplete blob, probably the process was exited before the
// download finished.
S3Event::DetectedIncompleteBlob(_hash, _bucket_name, _key, _size) => {
// @TODO: Resume download!
S3Event::DetectedIncompleteBlob(hash, bucket_name, key, size) => {
self.on_incomplete_blob_detected(hash, bucket_name, key, size)
.await?;
}
}

Expand Down Expand Up @@ -396,6 +397,19 @@ impl NodeActor {
Ok(())
}

/// Handler when incomplete blob was detected, probably the process was exited before the
/// download hash finished.
async fn on_incomplete_blob_detected(
&self,
hash: BlobHash,
bucket_name: BucketName,
key: ObjectKey,
size: ObjectSize,
) -> Result<()> {
self.blobs.download(hash, bucket_name, key, size).await?;
Ok(())
}

async fn shutdown(&self) -> Result<()> {
self.nats.shutdown().await?;
self.panda.shutdown().await?;
Expand Down

0 comments on commit 8c52612

Please sign in to comment.