Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(iroh): don't hit network for complete blobs&collections #2576

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions iroh-blobs/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
fmt,
future::Future,
num::NonZeroUsize,
sync::{
atomic::{AtomicU64, Ordering},
Expand Down Expand Up @@ -82,6 +83,8 @@ pub trait Dialer: Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Un
fn pending_count(&self) -> usize;
/// Check if a node is being dialed.
fn is_pending(&self, node: NodeId) -> bool;
/// Get the node id of our node.
fn node_id(&self) -> NodeId;
}

/// Signals what should be done with the request when it fails.
Expand Down Expand Up @@ -111,6 +114,15 @@ pub trait Getter {
conn: Self::Connection,
progress_sender: BroadcastProgressSender,
) -> GetFut;

/// Checks if a blob is available in the local store.
///
/// If it exists and is fully complete, emit progress events as if we downloaded the blob.
fn check_local(
&mut self,
kind: DownloadKind,
progress: Option<ProgressSubscriber>,
) -> impl Future<Output = anyhow::Result<bool>>;
}

/// Concurrency limits for the [`Downloader`].
Expand Down Expand Up @@ -280,7 +292,7 @@ pub struct DownloadHandle {
receiver: oneshot::Receiver<ExternalDownloadResult>,
}

impl std::future::Future for DownloadHandle {
impl Future for DownloadHandle {
type Output = ExternalDownloadResult;

fn poll(
Expand Down Expand Up @@ -666,20 +678,38 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
on_progress: progress,
};

// early exit if no providers.
if nodes.is_empty() && self.providers.get_candidates(&kind.hash()).next().is_none() {
// early exit if the blob/collection is already complete locally.
if self
.getter
.check_local(kind, intent_handlers.on_progress.clone())
.await
.unwrap_or(false)
{
self.finalize_download(
kind,
[(intent_id, intent_handlers)].into(),
Err(DownloadError::NoProviders),
Ok(Default::default()),
);
return;
}

// add the nodes to the provider map
let updated = self
.providers
.add_hash_with_nodes(kind.hash(), nodes.iter().map(|n| n.node_id));
// (skip the node id of our own node - we should never attempt to download from ourselves)
let node_ids = nodes
.iter()
.map(|n| n.node_id)
.filter(|node_id| *node_id != self.dialer.node_id());
let updated = self.providers.add_hash_with_nodes(kind.hash(), node_ids);

// early exit if no providers.
if self.providers.get_candidates(&kind.hash()).next().is_none() {
self.finalize_download(
kind,
[(intent_id, intent_handlers)].into(),
Err(DownloadError::NoProviders),
);
return;
}

// queue the transfer (if not running) or attach to transfer progress (if already running)
if self.active_requests.contains_key(&kind) {
Expand Down Expand Up @@ -860,7 +890,6 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
) {
self.progress_tracker.remove(&kind);
self.remove_hash_if_not_queued(&kind.hash());
let result = result.map_err(|_| DownloadError::DownloadFailed);
for (_id, handlers) in intents.into_iter() {
handlers.on_finish.send(result.clone()).ok();
}
Expand Down Expand Up @@ -1433,4 +1462,8 @@ impl Dialer for iroh_net::dialer::Dialer {
fn is_pending(&self, node: NodeId) -> bool {
self.is_pending(node)
}

fn node_id(&self) -> NodeId {
self.endpoint().node_id()
}
}
14 changes: 13 additions & 1 deletion iroh-blobs/src/downloader/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
//! [`Connection`]: iroh_net::endpoint::Connection

use crate::{
get::{db::get_to_db, error::GetError},
downloader::progress::ProgressSubscriber,
get::{
db::{check_local_with_progress_if_complete, get_to_db},
error::GetError,
},
store::Store,
};
use futures_lite::FutureExt;
Expand Down Expand Up @@ -81,4 +85,12 @@ impl<S: Store> Getter for IoGetter<S> {
};
fut.boxed_local()
}

async fn check_local(
&mut self,
kind: DownloadKind,
progress: Option<ProgressSubscriber>,
) -> anyhow::Result<bool> {
check_local_with_progress_if_complete(&self.store, &kind.hash_and_format(), progress).await
}
}
7 changes: 7 additions & 0 deletions iroh-blobs/src/downloader/test/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ struct TestingDialerInner {
dial_duration: Duration,
/// Fn deciding if a dial is successful.
dial_outcome: Box<dyn Fn(NodeId) -> bool + Send + Sync + 'static>,
/// Our own node id
node_id: NodeId,
}

impl Default for TestingDialerInner {
Expand All @@ -31,6 +33,7 @@ impl Default for TestingDialerInner {
dial_history: Vec::default(),
dial_duration: Duration::from_millis(10),
dial_outcome: Box::new(|_| true),
node_id: NodeId::from_bytes(&[0u8; 32]).unwrap(),
}
}
}
Expand All @@ -55,6 +58,10 @@ impl Dialer for TestingDialer {
fn is_pending(&self, node: NodeId) -> bool {
self.0.read().dialing.contains(&node)
}

fn node_id(&self) -> NodeId {
self.0.read().node_id
}
}

impl Stream for TestingDialer {
Expand Down
8 changes: 8 additions & 0 deletions iroh-blobs/src/downloader/test/getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ impl Getter for TestingGetter {
}
.boxed_local()
}

async fn check_local(
&mut self,
_kind: DownloadKind,
_progress: Option<ProgressSubscriber>,
) -> anyhow::Result<bool> {
Ok(false)
}
}

impl TestingGetter {
Expand Down
67 changes: 67 additions & 0 deletions iroh-blobs/src/get/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,73 @@ pub async fn get_to_db<
}
}

/// Checks if a blob or collection exists fully in the local store.
///
/// Returns `true` if the blob is complete (and for hashseqs if all children are complete too).
///
/// Emits the same sequence of progress events as if we were downloading the blob, but only if the
/// return value is `true`, i.e. if the blob (and all children for hashseqs) are fully available in
/// the store. If the return value is `false`, no events will be emitted at all.
pub async fn check_local_with_progress_if_complete<D: BaoStore>(
db: &D,
hash_and_format: &HashAndFormat,
progress: Option<impl ProgressSender<Msg = DownloadProgress> + IdGenerator>,
) -> anyhow::Result<bool> {
// We collect all progress events that should be emitted if the blob/hashseq is complete.
// We do not emit them right away because we don't want to emit any events in case the
// blob/hashseq is not complete.
let mut progress_events = vec![];

// Check if the root blob is fully available.
let HashAndFormat { hash, format } = *hash_and_format;
let entry = match db.get(&hash).await? {
Some(entry) if entry.is_complete() => entry,
_ => return Ok(false),
};
progress_events.push(DownloadProgress::FoundLocal {
child: BlobId::Root,
hash,
size: entry.size(),
valid_ranges: RangeSpec::all(),
});

match format {
BlobFormat::Raw => {
// For a raw blob, we're done.
}
BlobFormat::HashSeq => {
// For a hashseq, check if all children are complete.
let reader = entry.data_reader().await?;
let (mut hash_seq, children) = parse_hash_seq(reader).await.map_err(|err| {
GetError::NoncompliantNode(anyhow!("Failed to parse downloaded HashSeq: {err}"))
})?;
progress_events.push(DownloadProgress::FoundHashSeq { hash, children });

let mut children: Vec<Hash> = vec![];
while let Some(hash) = hash_seq.next().await? {
children.push(hash);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why collect first, instead of directly doing the db checks?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because with the PR as-is, we may only emit events here if everything is complete. Otherwise the same events would be emitted again when actually starting the download.

This is suboptimal for sure. See #2586 for a revised approach that does not need to collect events by using a generator and turning get_to_db into a two-step process.

}
for hash in children {
let entry = match db.get(&hash).await? {
Some(entry) if entry.is_complete() => entry,
_ => return Ok(false),
};
progress_events.push(DownloadProgress::FoundLocal {
child: BlobId::Root,
hash,
size: entry.size(),
valid_ranges: RangeSpec::all(),
});
}
}
}

for event in progress_events {
progress.send(event).await?;
}
Ok(true)
}

/// Get a blob that was requested completely.
///
/// We need to create our own files and handle the case where an outboard
Expand Down
5 changes: 5 additions & 0 deletions iroh-net/src/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ impl Dialer {
pub fn pending_count(&self) -> usize {
self.pending_dials.len()
}

/// Returns a reference to the endpoint used in this dialer.
pub fn endpoint(&self) -> &Endpoint {
&self.endpoint
}
}

impl Stream for Dialer {
Expand Down
Loading
Loading