Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(iroh): don't hit network for complete blobs&collections #2576

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions iroh-blobs/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
fmt,
future::Future,
num::NonZeroUsize,
sync::{
atomic::{AtomicU64, Ordering},
Expand All @@ -50,7 +51,10 @@ use tokio_util::{sync::CancellationToken, time::delay_queue};
use tracing::{debug, error_span, trace, warn, Instrument};

use crate::{
get::{db::DownloadProgress, Stats},
get::{
db::{BlobId, DownloadProgress},
Stats,
},
metrics::Metrics,
store::Store,
util::{local_pool::LocalPoolHandle, progress::ProgressSender},
Expand Down Expand Up @@ -82,6 +86,8 @@ pub trait Dialer: Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Un
fn pending_count(&self) -> usize;
/// Check if a node is being dialed.
fn is_pending(&self, node: NodeId) -> bool;
/// Get the node id of our node.
///
/// The download service compares the node ids of requested providers against
/// this value and drops matches, so the local node is never registered as a
/// provider for a hash.
fn node_id(&self) -> NodeId;
}

/// Signals what should be done with the request when it fails.
Expand Down Expand Up @@ -111,6 +117,9 @@ pub trait Getter {
conn: Self::Connection,
progress_sender: BroadcastProgressSender,
) -> GetFut;

/// Checks if a blob is available in the local store.
///
/// Resolves to `Some(size)` when the content identified by `kind` is already
/// complete locally, and to `None` otherwise. On `Some`, the downloader reports
/// the value as the verified local size and finishes the request successfully
/// before any provider is registered or any transfer is queued.
fn has_complete(&mut self, kind: DownloadKind) -> impl Future<Output = Option<u64>>;
}

/// Concurrency limits for the [`Downloader`].
Expand Down Expand Up @@ -280,7 +289,7 @@ pub struct DownloadHandle {
receiver: oneshot::Receiver<ExternalDownloadResult>,
}

impl std::future::Future for DownloadHandle {
impl Future for DownloadHandle {
type Output = ExternalDownloadResult;

fn poll(
Expand Down Expand Up @@ -666,20 +675,43 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
on_progress: progress,
};

// early exit if no providers.
if nodes.is_empty() && self.providers.get_candidates(&kind.hash()).next().is_none() {
// early exit if the hash is already available.
if let Some(local_size) = self.getter.has_complete(kind).await {
intent_handlers
.on_progress
.send(DownloadProgress::FoundLocal {
child: BlobId::Root,
hash: kind.hash(),
size: crate::store::BaoBlobSize::Verified(local_size),
valid_ranges: crate::protocol::RangeSpec::all(),
})
.await
.ok();
self.finalize_download(
kind,
[(intent_id, intent_handlers)].into(),
Err(DownloadError::NoProviders),
Ok(Default::default()),
);
return;
}

// add the nodes to the provider map
let updated = self
.providers
.add_hash_with_nodes(kind.hash(), nodes.iter().map(|n| n.node_id));
// (skip the node id of our own node - we should never attempt to download from ourselves)
let node_ids = nodes
.iter()
.map(|n| n.node_id)
.filter(|node_id| *node_id != self.dialer.node_id());
let updated = self.providers.add_hash_with_nodes(kind.hash(), node_ids);

// early exit if no providers.
if self.providers.get_candidates(&kind.hash()).next().is_none() {
self.finalize_download(
kind,
[(intent_id, intent_handlers)].into(),
Err(DownloadError::NoProviders),
);
return;
}

// queue the transfer (if not running) or attach to transfer progress (if already running)
if self.active_requests.contains_key(&kind) {
Expand Down Expand Up @@ -1433,4 +1465,8 @@ impl Dialer for iroh_net::dialer::Dialer {
fn is_pending(&self, node: NodeId) -> bool {
// NOTE(review): this relies on method resolution selecting an inherent
// `is_pending` on `iroh_net::dialer::Dialer` instead of this trait method
// (which would recurse). The inherent method is not visible here — confirm.
self.is_pending(node)
}

/// Returns the node id reported by the endpoint this dialer wraps.
fn node_id(&self) -> NodeId {
    let endpoint = self.endpoint();
    endpoint.node_id()
}
}
11 changes: 10 additions & 1 deletion iroh-blobs/src/downloader/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::{
get::{db::get_to_db, error::GetError},
store::Store,
store::{MapEntry, Store},
};
use futures_lite::FutureExt;
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -81,4 +81,13 @@ impl<S: Store> Getter for IoGetter<S> {
};
fut.boxed_local()
}

/// Looks up the hash of `kind` in the local store and reports the entry's
/// size if that entry is complete.
///
/// A store error, a missing entry and an incomplete entry all yield `None`.
///
/// NOTE(review): only the entry for `kind.hash()` is inspected. Whether that
/// is sufficient for formats whose content spans several entries (e.g.
/// collections) cannot be determined from here — confirm.
async fn has_complete(&mut self, kind: DownloadKind) -> Option<u64> {
    let hash = kind.hash();
    match self.store.get(&hash).await {
        Ok(Some(entry)) if entry.is_complete() => Some(entry.size().value()),
        _ => None,
    }
}
}
7 changes: 7 additions & 0 deletions iroh-blobs/src/downloader/test/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ struct TestingDialerInner {
dial_duration: Duration,
/// Fn deciding if a dial is successful.
dial_outcome: Box<dyn Fn(NodeId) -> bool + Send + Sync + 'static>,
/// Our own node id
node_id: NodeId,
}

impl Default for TestingDialerInner {
Expand All @@ -31,6 +33,7 @@ impl Default for TestingDialerInner {
dial_history: Vec::default(),
dial_duration: Duration::from_millis(10),
dial_outcome: Box::new(|_| true),
node_id: NodeId::from_bytes(&[0u8; 32]).unwrap(),
}
}
}
Expand All @@ -55,6 +58,10 @@ impl Dialer for TestingDialer {
fn is_pending(&self, node: NodeId) -> bool {
    // Take the read guard once, then test membership in the dialing set.
    let inner = self.0.read();
    inner.dialing.contains(&node)
}

/// Returns the node id stored in the shared dialer state.
fn node_id(&self) -> NodeId {
    let inner = self.0.read();
    inner.node_id
}
}

impl Stream for TestingDialer {
Expand Down
4 changes: 4 additions & 0 deletions iroh-blobs/src/downloader/test/getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ impl Getter for TestingGetter {
}
.boxed_local()
}

/// Always returns `None`: this getter never reports content as being complete
/// locally, regardless of the requested `_kind`.
async fn has_complete(&mut self, _kind: DownloadKind) -> Option<u64> {
None
}
}

impl TestingGetter {
Expand Down
5 changes: 5 additions & 0 deletions iroh-net/src/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ impl Dialer {
pub fn pending_count(&self) -> usize {
    // The count is simply the number of entries tracked in `pending_dials`.
    let pending = &self.pending_dials;
    pending.len()
}

/// Returns a reference to the endpoint used in this dialer.
///
/// This lets callers reach endpoint-level information through the dialer,
/// for example the endpoint's node id.
pub fn endpoint(&self) -> &Endpoint {
&self.endpoint
}
}

impl Stream for Dialer {
Expand Down
87 changes: 87 additions & 0 deletions iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1248,4 +1248,91 @@ mod tests {

Ok(())
}

/// Download a blob from oneself
#[tokio::test]
async fn test_blob_get_self() -> Result<()> {
matheus23 marked this conversation as resolved.
Show resolved Hide resolved
let _guard = iroh_test::logging::setup();

let node = crate::node::Node::memory().spawn().await?;

// create temp file
let temp_dir = tempfile::tempdir().context("tempdir")?;

let in_root = temp_dir.path().join("in");
tokio::fs::create_dir_all(in_root.clone())
.await
.context("create dir all")?;

let path = in_root.join("test-blob");
let size = 1024 * 128;
let buf: Vec<u8> = (0..size).map(|i| i as u8).collect();
let mut file = tokio::fs::File::create(path.clone())
.await
.context("create file")?;
file.write_all(&buf.clone()).await.context("write_all")?;
file.flush().await.context("flush")?;

let client = node.client();

let import_outcome = client
.blobs()
.add_from_path(
path.to_path_buf(),
false,
SetTagOption::Auto,
WrapOption::NoWrap,
)
.await
.context("import file")?
.finish()
.await
.context("import finish")?;

let hash = import_outcome.hash;

let node_id = node.node_id();

// Direct
let res = client
.blobs()
.download_with_opts(
hash,
DownloadOptions {
format: BlobFormat::Raw,
nodes: vec![node_id.into()],
tag: SetTagOption::Auto,
mode: DownloadMode::Direct,
},
)
.await
.context("direct")?
.await
.context("direct")?;

assert_eq!(res.local_size, size);
assert_eq!(res.downloaded_size, 0);

// Queued
let res = client
.blobs()
.download_with_opts(
hash,
DownloadOptions {
format: BlobFormat::Raw,
nodes: vec![node_id.into()],
tag: SetTagOption::Auto,
mode: DownloadMode::Queued,
},
)
.await
.context("queued")?
.await
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably also check the events emitted in these tests to make sure they look the way we expect, especially in the collection case.

.context("queued")?;

assert_eq!(res.local_size, size);
assert_eq!(res.downloaded_size, 0);

Ok(())
}
}
Loading