Skip to content

Commit 90258c5

Browse files
committed
fix: properly check local availability before downloading
1 parent 44cf421 commit 90258c5

File tree

5 files changed

+105
-30
lines changed

5 files changed

+105
-30
lines changed

iroh-blobs/src/downloader.rs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,7 @@ use tokio_util::{sync::CancellationToken, time::delay_queue};
5151
use tracing::{debug, error_span, trace, warn, Instrument};
5252

5353
use crate::{
54-
get::{
55-
db::{BlobId, DownloadProgress},
56-
Stats,
57-
},
54+
get::{db::DownloadProgress, Stats},
5855
metrics::Metrics,
5956
store::Store,
6057
util::{local_pool::LocalPoolHandle, progress::ProgressSender},
@@ -119,7 +116,13 @@ pub trait Getter {
119116
) -> GetFut;
120117

121118
/// Checks if a blob is available in the local store.
122-
fn has_complete(&mut self, kind: DownloadKind) -> impl Future<Output = Option<u64>>;
119+
///
120+
/// If it exists and is fully complete, emit progress events as if we downloaded the blob.
121+
fn check_local(
122+
&mut self,
123+
kind: DownloadKind,
124+
progress: Option<ProgressSubscriber>,
125+
) -> impl Future<Output = anyhow::Result<bool>>;
123126
}
124127

125128
/// Concurrency limits for the [`Downloader`].
@@ -675,18 +678,13 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
675678
on_progress: progress,
676679
};
677680

678-
// early exit if the hash is already available.
679-
if let Some(local_size) = self.getter.has_complete(kind).await {
680-
intent_handlers
681-
.on_progress
682-
.send(DownloadProgress::FoundLocal {
683-
child: BlobId::Root,
684-
hash: kind.hash(),
685-
size: crate::store::BaoBlobSize::Verified(local_size),
686-
valid_ranges: crate::protocol::RangeSpec::all(),
687-
})
688-
.await
689-
.ok();
681+
// early exit if the blob/collection is already complete locally.
682+
if self
683+
.getter
684+
.check_local(kind, intent_handlers.on_progress.clone())
685+
.await
686+
.unwrap_or(false)
687+
{
690688
self.finalize_download(
691689
kind,
692690
[(intent_id, intent_handlers)].into(),

iroh-blobs/src/downloader/get.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,12 @@
33
//! [`Connection`]: iroh_net::endpoint::Connection
44
55
use crate::{
6-
get::{db::get_to_db, error::GetError},
7-
store::{MapEntry, Store},
6+
downloader::progress::ProgressSubscriber,
7+
get::{
8+
db::{check_local_with_progress_if_complete, get_to_db},
9+
error::GetError,
10+
},
11+
store::Store,
812
};
913
use futures_lite::FutureExt;
1014
#[cfg(feature = "metrics")]
@@ -82,12 +86,11 @@ impl<S: Store> Getter for IoGetter<S> {
8286
fut.boxed_local()
8387
}
8488

85-
async fn has_complete(&mut self, kind: DownloadKind) -> Option<u64> {
86-
let entry = self.store.get(&kind.hash()).await.ok().flatten()?;
87-
if entry.is_complete() {
88-
Some(entry.size().value())
89-
} else {
90-
None
91-
}
89+
async fn check_local(
90+
&mut self,
91+
kind: DownloadKind,
92+
progress: Option<ProgressSubscriber>,
93+
) -> anyhow::Result<bool> {
94+
check_local_with_progress_if_complete(&self.store, &kind.hash_and_format(), progress).await
9295
}
9396
}

iroh-blobs/src/downloader/test/getter.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,12 @@ impl Getter for TestingGetter {
5656
.boxed_local()
5757
}
5858

59-
async fn has_complete(&mut self, _kind: DownloadKind) -> Option<u64> {
60-
None
59+
async fn check_local(
60+
&mut self,
61+
_kind: DownloadKind,
62+
_progress: Option<ProgressSubscriber>,
63+
) -> anyhow::Result<bool> {
64+
Ok(false)
6165
}
6266
}
6367

iroh-blobs/src/get/db.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,73 @@ pub async fn get_to_db<
5959
}
6060
}
6161

62+
/// Checks if a blob or collection exists fully in the local store.
63+
///
64+
/// Returns `true` if the blob is complete (and for hashseqs if all children are complete too).
65+
///
66+
/// Emits the same sequence of progress events as if we were downloading the blob, but only if the
67+
/// return value is `true`, i.e. if the blob (and all children for hashseqs) are fully available in
68+
/// the store. If the return value is `false`, no events will be emitted at all.
69+
pub async fn check_local_with_progress_if_complete<D: BaoStore>(
70+
db: &D,
71+
hash_and_format: &HashAndFormat,
72+
progress: Option<impl ProgressSender<Msg = DownloadProgress> + IdGenerator>,
73+
) -> anyhow::Result<bool> {
74+
// We collect all progress events that should be emitted if the blob/hashseq is complete.
75+
// We do not emit them right away because we don't want to emit any events in case the
76+
// blob/hashseq is not complete.
77+
let mut progress_events = vec![];
78+
79+
// Check if the root blob is fully available.
80+
let HashAndFormat { hash, format } = *hash_and_format;
81+
let entry = match db.get(&hash).await? {
82+
Some(entry) if entry.is_complete() => entry,
83+
_ => return Ok(false),
84+
};
85+
progress_events.push(DownloadProgress::FoundLocal {
86+
child: BlobId::Root,
87+
hash,
88+
size: entry.size(),
89+
valid_ranges: RangeSpec::all(),
90+
});
91+
92+
match format {
93+
BlobFormat::Raw => {
94+
// For a raw blob, we're done.
95+
}
96+
BlobFormat::HashSeq => {
97+
// For a hashseq, check if all children are complete.
98+
let reader = entry.data_reader().await?;
99+
let (mut hash_seq, children) = parse_hash_seq(reader).await.map_err(|err| {
100+
GetError::NoncompliantNode(anyhow!("Failed to parse downloaded HashSeq: {err}"))
101+
})?;
102+
progress_events.push(DownloadProgress::FoundHashSeq { hash, children });
103+
104+
let mut children: Vec<Hash> = vec![];
105+
while let Some(hash) = hash_seq.next().await? {
106+
children.push(hash);
107+
}
108+
for hash in children {
109+
let entry = match db.get(&hash).await? {
110+
Some(entry) if entry.is_complete() => entry,
111+
_ => return Ok(false),
112+
};
113+
progress_events.push(DownloadProgress::FoundLocal {
114+
child: BlobId::Root,
115+
hash,
116+
size: entry.size(),
117+
valid_ranges: RangeSpec::all(),
118+
});
119+
}
120+
}
121+
}
122+
123+
for event in progress_events {
124+
progress.send(event).await?;
125+
}
126+
Ok(true)
127+
}
128+
62129
/// Get a blob that was requested completely.
63130
///
64131
/// We need to create our own files and handle the case where an outboard

iroh/src/node/rpc.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use futures_lite::{Stream, StreamExt};
99
use genawaiter::sync::{Co, Gen};
1010
use iroh_base::rpc::{RpcError, RpcResult};
1111
use iroh_blobs::downloader::{DownloadRequest, Downloader};
12-
use iroh_blobs::export::ExportProgress;
1312
use iroh_blobs::format::collection::Collection;
1413
use iroh_blobs::get::db::DownloadProgress;
1514
use iroh_blobs::get::Stats;
@@ -18,6 +17,7 @@ use iroh_blobs::util::local_pool::LocalPoolHandle;
1817
use iroh_blobs::util::progress::{AsyncChannelProgressSender, ProgressSender};
1918
use iroh_blobs::util::SetTagOption;
2019
use iroh_blobs::BlobFormat;
20+
use iroh_blobs::{export::ExportProgress, get::db::check_local_with_progress_if_complete};
2121
use iroh_blobs::{
2222
provider::AddProgress,
2323
store::{Store as BaoStore, ValidateProgress},
@@ -1201,6 +1201,9 @@ async fn download_direct_from_nodes<D>(
12011201
where
12021202
D: BaoStore,
12031203
{
1204+
if check_local_with_progress_if_complete(db, &hash_and_format, Some(progress.clone())).await? {
1205+
return Ok(Default::default());
1206+
}
12041207
let mut last_err = None;
12051208
for node in nodes {
12061209
let node_id = node.node_id;
@@ -1226,7 +1229,7 @@ where
12261229
}
12271230
match last_err {
12281231
Some(err) => Err(err),
1229-
None => Err(anyhow!("No nodes to download from provided"))
1232+
None => Err(anyhow!("No nodes to download from provided")),
12301233
}
12311234
}
12321235

0 commit comments

Comments
 (0)