From 0482bac6b83316d887e1dadf6aa58145e9e4df3d Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 2 Aug 2024 09:54:10 +0200 Subject: [PATCH 1/7] test: add failing test Ref #2575 --- iroh/src/client/blobs.rs | 87 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 3151c3fb1f..d0812cd8a0 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -1248,4 +1248,91 @@ mod tests { Ok(()) } + + /// Download a blob from oneself + #[tokio::test] + async fn test_blob_get_self() -> Result<()> { + let _guard = iroh_test::logging::setup(); + + let node = crate::node::Node::memory().spawn().await?; + + // create temp file + let temp_dir = tempfile::tempdir().context("tempdir")?; + + let in_root = temp_dir.path().join("in"); + tokio::fs::create_dir_all(in_root.clone()) + .await + .context("create dir all")?; + + let path = in_root.join("test-blob"); + let size = 1024 * 128; + let buf: Vec = (0..size).map(|i| i as u8).collect(); + let mut file = tokio::fs::File::create(path.clone()) + .await + .context("create file")?; + file.write_all(&buf.clone()).await.context("write_all")?; + file.flush().await.context("flush")?; + + let client = node.client(); + + let import_outcome = client + .blobs() + .add_from_path( + path.to_path_buf(), + false, + SetTagOption::Auto, + WrapOption::NoWrap, + ) + .await + .context("import file")? + .finish() + .await + .context("import finish")?; + + let hash = import_outcome.hash; + + let node_id = node.node_id(); + + // Direct + let res = client + .blobs() + .download_with_opts( + hash, + DownloadOptions { + format: BlobFormat::Raw, + nodes: vec![node_id.into()], + tag: SetTagOption::Auto, + mode: DownloadMode::Direct, + }, + ) + .await + .context("direct")? + .await + .context("direct")?; + + assert_eq!(res.local_size, size); + assert_eq!(res.downloaded_size, 0); + + // Queued + let res = client + .blobs() + .download_with_opts( + hash, + DownloadOptions { + format: BlobFormat::Raw, + nodes: vec![node_id.into()], + tag: SetTagOption::Auto, + mode: DownloadMode::Queued, + }, + ) + .await + .context("queued")? + .await + .context("queued")?; + + assert_eq!(res.local_size, size); + assert_eq!(res.downloaded_size, 0); + + Ok(()) + } } From 8845025ea57c74938c8d5e0a7277c4a45c62c062 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Fri, 2 Aug 2024 12:01:45 +0200 Subject: [PATCH 2/7] fix(iroh-blobs): in downloader, check if blob is available locally, and do not attempt to download from ourselves --- iroh-blobs/src/downloader.rs | 52 ++++++++++++++++++++---- iroh-blobs/src/downloader/get.rs | 11 ++++- iroh-blobs/src/downloader/test/dialer.rs | 7 ++++ iroh-blobs/src/downloader/test/getter.rs | 4 ++ iroh-net/src/dialer.rs | 5 +++ 5 files changed, 70 insertions(+), 9 deletions(-) diff --git a/iroh-blobs/src/downloader.rs b/iroh-blobs/src/downloader.rs index dd26a8bc6d..2ab6079423 100644 --- a/iroh-blobs/src/downloader.rs +++ b/iroh-blobs/src/downloader.rs @@ -29,6 +29,7 @@ use std::{ collections::{hash_map::Entry, HashMap, HashSet}, fmt, + future::Future, num::NonZeroUsize, sync::{ atomic::{AtomicU64, Ordering}, @@ -50,7 +51,10 @@ use tokio_util::{sync::CancellationToken, time::delay_queue}; use tracing::{debug, error_span, trace, warn, Instrument}; use crate::{ - get::{db::DownloadProgress, Stats}, + get::{ + db::{BlobId, DownloadProgress}, + Stats, + }, metrics::Metrics, store::Store, util::{local_pool::LocalPoolHandle, progress::ProgressSender}, @@ -82,6 +86,8 @@ pub trait Dialer: Stream)> + Un fn pending_count(&self) -> usize; /// Check if a node is being dialed. fn is_pending(&self, node: NodeId) -> bool; + /// Get the node id of our node. + fn node_id(&self) -> NodeId; } /// Signals what should be done with the request when it fails. @@ -111,6 +117,9 @@ pub trait Getter { conn: Self::Connection, progress_sender: BroadcastProgressSender, ) -> GetFut; + + /// Checks if a blob is available in the local store. + fn has_complete(&mut self, kind: DownloadKind) -> impl Future>; } /// Concurrency limits for the [`Downloader`]. @@ -280,7 +289,7 @@ pub struct DownloadHandle { receiver: oneshot::Receiver, } -impl std::future::Future for DownloadHandle { +impl Future for DownloadHandle { type Output = ExternalDownloadResult; fn poll( @@ -666,20 +675,43 @@ impl, D: Dialer> Service { on_progress: progress, }; - // early exit if no providers. - if nodes.is_empty() && self.providers.get_candidates(&kind.hash()).next().is_none() { + // early exit if the hash is already available. + if let Some(local_size) = self.getter.has_complete(kind).await { + intent_handlers + .on_progress + .send(DownloadProgress::FoundLocal { + child: BlobId::Root, + hash: kind.hash(), + size: crate::store::BaoBlobSize::Verified(local_size), + valid_ranges: crate::protocol::RangeSpec::all(), + }) + .await + .ok(); self.finalize_download( kind, [(intent_id, intent_handlers)].into(), - Err(DownloadError::NoProviders), + Ok(Default::default()), ); return; } // add the nodes to the provider map - let updated = self - .providers - .add_hash_with_nodes(kind.hash(), nodes.iter().map(|n| n.node_id)); + // (skip the node id of our own node - we should never attempt to download from ourselves) + let node_ids = nodes + .iter() + .map(|n| n.node_id) + .filter(|node_id| *node_id != self.dialer.node_id()); + let updated = self.providers.add_hash_with_nodes(kind.hash(), node_ids); + + // early exit if no providers. + if self.providers.get_candidates(&kind.hash()).next().is_none() { + self.finalize_download( + kind, + [(intent_id, intent_handlers)].into(), + Err(DownloadError::NoProviders), + ); + return; + } // queue the transfer (if not running) or attach to transfer progress (if already running) if self.active_requests.contains_key(&kind) { @@ -1433,4 +1465,8 @@ impl Dialer for iroh_net::dialer::Dialer { fn is_pending(&self, node: NodeId) -> bool { self.is_pending(node) } + + fn node_id(&self) -> NodeId { + self.endpoint().node_id() + } } diff --git a/iroh-blobs/src/downloader/get.rs b/iroh-blobs/src/downloader/get.rs index e48370d42c..8cc6cc1204 100644 --- a/iroh-blobs/src/downloader/get.rs +++ b/iroh-blobs/src/downloader/get.rs @@ -4,7 +4,7 @@ use crate::{ get::{db::get_to_db, error::GetError}, - store::Store, + store::{MapEntry, Store}, }; use futures_lite::FutureExt; #[cfg(feature = "metrics")] @@ -81,4 +81,13 @@ impl Getter for IoGetter { }; fut.boxed_local() } + + async fn has_complete(&mut self, kind: DownloadKind) -> Option { + let entry = self.store.get(&kind.hash()).await.ok().flatten()?; + if entry.is_complete() { + Some(entry.size().value()) + } else { + None + } + } } diff --git a/iroh-blobs/src/downloader/test/dialer.rs b/iroh-blobs/src/downloader/test/dialer.rs index d099552a11..fc5a939959 100644 --- a/iroh-blobs/src/downloader/test/dialer.rs +++ b/iroh-blobs/src/downloader/test/dialer.rs @@ -21,6 +21,8 @@ struct TestingDialerInner { dial_duration: Duration, /// Fn deciding if a dial is successful. dial_outcome: Box bool + Send + Sync + 'static>, + /// Our own node id + node_id: NodeId, } impl Default for TestingDialerInner { @@ -31,6 +33,7 @@ impl Default for TestingDialerInner { dial_history: Vec::default(), dial_duration: Duration::from_millis(10), dial_outcome: Box::new(|_| true), + node_id: NodeId::from_bytes(&[0u8; 32]).unwrap(), } } } @@ -55,6 +58,10 @@ impl Dialer for TestingDialer { fn is_pending(&self, node: NodeId) -> bool { self.0.read().dialing.contains(&node) } + + fn node_id(&self) -> NodeId { + self.0.read().node_id + } } impl Stream for TestingDialer { diff --git a/iroh-blobs/src/downloader/test/getter.rs b/iroh-blobs/src/downloader/test/getter.rs index 397f1134f1..2afe7a4b9c 100644 --- a/iroh-blobs/src/downloader/test/getter.rs +++ b/iroh-blobs/src/downloader/test/getter.rs @@ -55,6 +55,10 @@ impl Getter for TestingGetter { } .boxed_local() } + + async fn has_complete(&mut self, _kind: DownloadKind) -> Option { + None + } } impl TestingGetter { diff --git a/iroh-net/src/dialer.rs b/iroh-net/src/dialer.rs index 7a7685d97b..8c37b08c08 100644 --- a/iroh-net/src/dialer.rs +++ b/iroh-net/src/dialer.rs @@ -99,6 +99,11 @@ impl Dialer { pub fn pending_count(&self) -> usize { self.pending_dials.len() } + + /// Returns a reference to the endpoint used in this dialer. + pub fn endpoint(&self) -> &Endpoint { + &self.endpoint + } } impl Stream for Dialer { From 44cf42169df2f518f12d22fb42783c276a061c8a Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Fri, 2 Aug 2024 12:30:31 +0200 Subject: [PATCH 3/7] fix/test: never download from ourselves, also not in direct mode --- iroh-blobs/src/downloader.rs | 1 - iroh/src/client/blobs.rs | 104 +++++++++++++++++++++-------------- iroh/src/node/rpc.rs | 12 +++- 3 files changed, 71 insertions(+), 46 deletions(-) diff --git a/iroh-blobs/src/downloader.rs b/iroh-blobs/src/downloader.rs index 2ab6079423..8b941673bf 100644 --- a/iroh-blobs/src/downloader.rs +++ b/iroh-blobs/src/downloader.rs @@ -892,7 +892,6 @@ impl, D: Dialer> Service { ) { self.progress_tracker.remove(&kind); self.remove_hash_if_not_queued(&kind.hash()); - let result = result.map_err(|_| DownloadError::DownloadFailed); for (_id, handlers) in intents.into_iter() { handlers.on_finish.send(result.clone()).ok(); } diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index d0812cd8a0..6ccd148cb3 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -1249,49 +1249,17 @@ mod tests { Ok(()) } - /// Download a blob from oneself + /// Download a existing blob from oneself #[tokio::test] - async fn test_blob_get_self() -> Result<()> { + async fn test_blob_get_self_existing() -> Result<()> { let _guard = iroh_test::logging::setup(); let node = crate::node::Node::memory().spawn().await?; - - // create temp file - let temp_dir = tempfile::tempdir().context("tempdir")?; - - let in_root = temp_dir.path().join("in"); - tokio::fs::create_dir_all(in_root.clone()) - .await - .context("create dir all")?; - - let path = in_root.join("test-blob"); - let size = 1024 * 128; - let buf: Vec = (0..size).map(|i| i as u8).collect(); - let mut file = tokio::fs::File::create(path.clone()) - .await - .context("create file")?; - file.write_all(&buf.clone()).await.context("write_all")?; - file.flush().await.context("flush")?; - + let node_id = node.node_id(); let client = node.client(); - let import_outcome = client - .blobs() - .add_from_path( - path.to_path_buf(), - false, - SetTagOption::Auto, - WrapOption::NoWrap, - ) - .await - .context("import file")? - .finish() - .await - .context("import finish")?; - - let hash = import_outcome.hash; - - let node_id = node.node_id(); + let AddOutcome { hash, size, .. } = + client.blobs().add_bytes("foo").await.context("add bytes")?; // Direct let res = client @@ -1305,10 +1273,9 @@ mod tests { mode: DownloadMode::Direct, }, ) + .await? .await - .context("direct")? - .await - .context("direct")?; + .context("direct (download)")?; assert_eq!(res.local_size, size); assert_eq!(res.downloaded_size, 0); @@ -1325,8 +1292,7 @@ mod tests { mode: DownloadMode::Queued, }, ) - .await - .context("queued")? + .await? .await .context("queued")?; @@ -1335,4 +1301,58 @@ mod tests { Ok(()) } + + /// Download a missing blob from oneself + #[tokio::test] + async fn test_blob_get_self_missing() -> Result<()> { + let _guard = iroh_test::logging::setup(); + + let node = crate::node::Node::memory().spawn().await?; + let node_id = node.node_id(); + let client = node.client(); + + let hash = Hash::from_bytes([0u8; 32]); + + // Direct + let res = client + .blobs() + .download_with_opts( + hash, + DownloadOptions { + format: BlobFormat::Raw, + nodes: vec![node_id.into()], + tag: SetTagOption::Auto, + mode: DownloadMode::Direct, + }, + ) + .await? + .await; + assert!(res.is_err()); + assert_eq!( + res.err().unwrap().to_string().as_str(), + "No nodes to download from provided" + ); + + // Queued + let res = client + .blobs() + .download_with_opts( + hash, + DownloadOptions { + format: BlobFormat::Raw, + nodes: vec![node_id.into()], + tag: SetTagOption::Auto, + mode: DownloadMode::Queued, + }, + ) + .await? + .await; + assert!(res.is_err()); + assert_eq!( + res.err().unwrap().to_string().as_str(), + "No provider nodes found" + ); + + Ok(()) + } } diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 467e91d402..49e1cd0dca 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -3,7 +3,7 @@ use std::io; use std::sync::{Arc, Mutex}; use std::time::Duration; -use anyhow::{anyhow, ensure, Result}; +use anyhow::{anyhow, Result}; use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; use genawaiter::sync::{Co, Gen}; @@ -1201,10 +1201,13 @@ async fn download_direct_from_nodes( where D: BaoStore, { - ensure!(!nodes.is_empty(), "No nodes to download from provided."); let mut last_err = None; for node in nodes { let node_id = node.node_id; + // never attempt to download from ourselves. + if node_id == endpoint.node_id() { + continue; + } match download_direct( db, endpoint.clone(), @@ -1221,7 +1224,10 @@ where } } } - Err(last_err.unwrap()) + match last_err { + Some(err) => Err(err), + None => Err(anyhow!("No nodes to download from provided")) + } } async fn download_direct( From 90258c5f3894009f694f2f64dc60a8d8eadda513 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Fri, 2 Aug 2024 16:06:44 +0200 Subject: [PATCH 4/7] fix: properly check local availability before downloading --- iroh-blobs/src/downloader.rs | 32 ++++++----- iroh-blobs/src/downloader/get.rs | 21 ++++---- iroh-blobs/src/downloader/test/getter.rs | 8 ++- iroh-blobs/src/get/db.rs | 67 ++++++++++++++++++++++++ iroh/src/node/rpc.rs | 7 ++- 5 files changed, 105 insertions(+), 30 deletions(-) diff --git a/iroh-blobs/src/downloader.rs b/iroh-blobs/src/downloader.rs index 8b941673bf..1505324458 100644 --- a/iroh-blobs/src/downloader.rs +++ b/iroh-blobs/src/downloader.rs @@ -51,10 +51,7 @@ use tokio_util::{sync::CancellationToken, time::delay_queue}; use tracing::{debug, error_span, trace, warn, Instrument}; use crate::{ - get::{ - db::{BlobId, DownloadProgress}, - Stats, - }, + get::{db::DownloadProgress, Stats}, metrics::Metrics, store::Store, util::{local_pool::LocalPoolHandle, progress::ProgressSender}, @@ -119,7 +116,13 @@ pub trait Getter { ) -> GetFut; /// Checks if a blob is available in the local store. - fn has_complete(&mut self, kind: DownloadKind) -> impl Future>; + /// + /// If it exists and is fully complete, emit progress events as if we downloaded the blob. + fn check_local( + &mut self, + kind: DownloadKind, + progress: Option, + ) -> impl Future>; } /// Concurrency limits for the [`Downloader`]. @@ -675,18 +678,13 @@ impl, D: Dialer> Service { on_progress: progress, }; - // early exit if the hash is already available. - if let Some(local_size) = self.getter.has_complete(kind).await { - intent_handlers - .on_progress - .send(DownloadProgress::FoundLocal { - child: BlobId::Root, - hash: kind.hash(), - size: crate::store::BaoBlobSize::Verified(local_size), - valid_ranges: crate::protocol::RangeSpec::all(), - }) - .await - .ok(); + // early exit if the blob/collection is already complete locally. + if self + .getter + .check_local(kind, intent_handlers.on_progress.clone()) + .await + .unwrap_or(false) + { self.finalize_download( kind, [(intent_id, intent_handlers)].into(), diff --git a/iroh-blobs/src/downloader/get.rs b/iroh-blobs/src/downloader/get.rs index 8cc6cc1204..a154de9036 100644 --- a/iroh-blobs/src/downloader/get.rs +++ b/iroh-blobs/src/downloader/get.rs @@ -3,8 +3,12 @@ //! [`Connection`]: iroh_net::endpoint::Connection use crate::{ - get::{db::get_to_db, error::GetError}, - store::{MapEntry, Store}, + downloader::progress::ProgressSubscriber, + get::{ + db::{check_local_with_progress_if_complete, get_to_db}, + error::GetError, + }, + store::Store, }; use futures_lite::FutureExt; #[cfg(feature = "metrics")] @@ -82,12 +86,11 @@ impl Getter for IoGetter { fut.boxed_local() } - async fn has_complete(&mut self, kind: DownloadKind) -> Option { - let entry = self.store.get(&kind.hash()).await.ok().flatten()?; - if entry.is_complete() { - Some(entry.size().value()) - } else { - None - } + async fn check_local( + &mut self, + kind: DownloadKind, + progress: Option, + ) -> anyhow::Result { + check_local_with_progress_if_complete(&self.store, &kind.hash_and_format(), progress).await } } diff --git a/iroh-blobs/src/downloader/test/getter.rs b/iroh-blobs/src/downloader/test/getter.rs index 2afe7a4b9c..98de2bcea5 100644 --- a/iroh-blobs/src/downloader/test/getter.rs +++ b/iroh-blobs/src/downloader/test/getter.rs @@ -56,8 +56,12 @@ impl Getter for TestingGetter { .boxed_local() } - async fn has_complete(&mut self, _kind: DownloadKind) -> Option { - None + async fn check_local( + &mut self, + _kind: DownloadKind, + _progress: Option, + ) -> anyhow::Result { + Ok(false) } } diff --git a/iroh-blobs/src/get/db.rs b/iroh-blobs/src/get/db.rs index 08ef2f82c7..af58f3aa42 100644 --- a/iroh-blobs/src/get/db.rs +++ b/iroh-blobs/src/get/db.rs @@ -59,6 +59,73 @@ pub async fn get_to_db< } } +/// Checks if a blob or collection exists fully in the local store. +/// +/// Returns `true` if the blob is complete (and for hashseqs if all children are complete too). +/// +/// Emits the same sequence of progress events as if we were downloading the blob, but only if the +/// return value is `true`, i.e. if the blob (and all children for hashseqs) are fully available in +/// the store. If the return value is `false`, no events will be emitted at all. +pub async fn check_local_with_progress_if_complete( + db: &D, + hash_and_format: &HashAndFormat, + progress: Option + IdGenerator>, +) -> anyhow::Result { + // We collect all progress events that should be emitted if the blob/hashseq is complete. + // We do not emit them right away because we don't want to emit any events in case the + // blob/hashseq is not complete. + let mut progress_events = vec![]; + + // Check if the root blob is fully available. + let HashAndFormat { hash, format } = *hash_and_format; + let entry = match db.get(&hash).await? { + Some(entry) if entry.is_complete() => entry, + _ => return Ok(false), + }; + progress_events.push(DownloadProgress::FoundLocal { + child: BlobId::Root, + hash, + size: entry.size(), + valid_ranges: RangeSpec::all(), + }); + + match format { + BlobFormat::Raw => { + // For a raw blob, we're done. + } + BlobFormat::HashSeq => { + // For a hashseq, check if all children are complete. + let reader = entry.data_reader().await?; + let (mut hash_seq, children) = parse_hash_seq(reader).await.map_err(|err| { + GetError::NoncompliantNode(anyhow!("Failed to parse downloaded HashSeq: {err}")) + })?; + progress_events.push(DownloadProgress::FoundHashSeq { hash, children }); + + let mut children: Vec = vec![]; + while let Some(hash) = hash_seq.next().await? { + children.push(hash); + } + for hash in children { + let entry = match db.get(&hash).await? { + Some(entry) if entry.is_complete() => entry, + _ => return Ok(false), + }; + progress_events.push(DownloadProgress::FoundLocal { + child: BlobId::Root, + hash, + size: entry.size(), + valid_ranges: RangeSpec::all(), + }); + } + } + } + + for event in progress_events { + progress.send(event).await?; + } + Ok(true) +} + /// Get a blob that was requested completely. /// /// We need to create our own files and handle the case where an outboard diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 49e1cd0dca..178736f153 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -9,7 +9,6 @@ use futures_lite::{Stream, StreamExt}; use genawaiter::sync::{Co, Gen}; use iroh_base::rpc::{RpcError, RpcResult}; use iroh_blobs::downloader::{DownloadRequest, Downloader}; -use iroh_blobs::export::ExportProgress; use iroh_blobs::format::collection::Collection; use iroh_blobs::get::db::DownloadProgress; use iroh_blobs::get::Stats; @@ -18,6 +17,7 @@ use iroh_blobs::util::local_pool::LocalPoolHandle; use iroh_blobs::util::progress::{AsyncChannelProgressSender, ProgressSender}; use iroh_blobs::util::SetTagOption; use iroh_blobs::BlobFormat; +use iroh_blobs::{export::ExportProgress, get::db::check_local_with_progress_if_complete}; use iroh_blobs::{ provider::AddProgress, store::{Store as BaoStore, ValidateProgress}, @@ -1201,6 +1201,9 @@ async fn download_direct_from_nodes( where D: BaoStore, { + if check_local_with_progress_if_complete(db, &hash_and_format, Some(progress.clone())).await? { + return Ok(Default::default()); + } let mut last_err = None; for node in nodes { let node_id = node.node_id; @@ -1226,7 +1229,7 @@ where } match last_err { Some(err) => Err(err), - None => Err(anyhow!("No nodes to download from provided")) + None => Err(anyhow!("No nodes to download from provided")), } } From ccf15d40b2df2655261c7885ff2a9381b667f5f8 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Fri, 2 Aug 2024 16:22:19 +0200 Subject: [PATCH 5/7] tests: add test for downloading existing collection --- iroh/src/client/blobs.rs | 79 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 6ccd148cb3..9594b6e9e3 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -944,6 +944,8 @@ mod tests { use super::*; use anyhow::Context as _; + use iroh_blobs::hashseq::HashSeq; + use iroh_net::NodeId; use rand::RngCore; use tokio::io::AsyncWriteExt; @@ -1355,4 +1357,81 @@ mod tests { Ok(()) } + + /// Download a existing collection. Check that things succeed and no download is performed. + #[tokio::test] + async fn test_blob_get_existing_collection() -> Result<()> { + let _guard = iroh_test::logging::setup(); + + let node = crate::node::Node::memory().spawn().await?; + // We use a nonexisting node id because we just want to check that this succeeds without + // hitting the network. + let node_id = NodeId::from_bytes(&[0u8; 32])?; + let client = node.client(); + + let mut collection = Collection::default(); + let mut tags = Vec::new(); + let mut size = 0; + for value in ["f", "fo", "foo"] { + let import_outcome = client.blobs().add_bytes(value).await.context("add bytes")?; + collection.push(value.to_string(), import_outcome.hash); + tags.push(import_outcome.tag); + size += import_outcome.size; + } + + let (hash, _tag) = client + .blobs() + .create_collection(collection, SetTagOption::Auto, tags) + .await?; + + // load the hashseq and collection header manually to calculate our expected size + let hashseq_bytes = client.blobs().read_to_bytes(hash).await?; + size += hashseq_bytes.len() as u64; + let hashseq = HashSeq::try_from(hashseq_bytes)?; + let collection_header_bytes = client + .blobs() + .read_to_bytes(hashseq.into_iter().next().expect("header to exist")) + .await?; + size += collection_header_bytes.len() as u64; + + // Direct + let res = client + .blobs() + .download_with_opts( + hash, + DownloadOptions { + format: BlobFormat::HashSeq, + nodes: vec![node_id.into()], + tag: SetTagOption::Auto, + mode: DownloadMode::Direct, + }, + ) + .await? + .await + .context("direct (download)")?; + + assert_eq!(res.local_size, size); + assert_eq!(res.downloaded_size, 0); + + // Queued + let res = client + .blobs() + .download_with_opts( + hash, + DownloadOptions { + format: BlobFormat::HashSeq, + nodes: vec![node_id.into()], + tag: SetTagOption::Auto, + mode: DownloadMode::Queued, + }, + ) + .await? + .await + .context("queued")?; + + assert_eq!(res.local_size, size); + assert_eq!(res.downloaded_size, 0); + + Ok(()) + } } From d0bdb8ede9a828ca105b228c1418899e6b852f3c Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Fri, 2 Aug 2024 16:33:47 +0200 Subject: [PATCH 6/7] tests: use TestResult --- iroh/src/client/blobs.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 9594b6e9e3..7620c4178d 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -948,6 +948,7 @@ mod tests { use iroh_net::NodeId; use rand::RngCore; use tokio::io::AsyncWriteExt; + use testresult::TestResult; #[tokio::test] async fn test_blob_create_collection() -> Result<()> { @@ -1253,7 +1254,7 @@ mod tests { /// Download a existing blob from oneself #[tokio::test] - async fn test_blob_get_self_existing() -> Result<()> { + async fn test_blob_get_self_existing() -> TestResult<()> { let _guard = iroh_test::logging::setup(); let node = crate::node::Node::memory().spawn().await?; @@ -1261,7 +1262,7 @@ mod tests { let client = node.client(); let AddOutcome { hash, size, .. } = - client.blobs().add_bytes("foo").await.context("add bytes")?; + client.blobs().add_bytes("foo").await?; // Direct let res = client @@ -1276,8 +1277,7 @@ mod tests { }, ) .await? - .await - .context("direct (download)")?; + .await?; assert_eq!(res.local_size, size); assert_eq!(res.downloaded_size, 0); @@ -1295,8 +1295,7 @@ mod tests { }, ) .await? - .await - .context("queued")?; + .await?; assert_eq!(res.local_size, size); assert_eq!(res.downloaded_size, 0); @@ -1306,7 +1305,7 @@ mod tests { /// Download a missing blob from oneself #[tokio::test] - async fn test_blob_get_self_missing() -> Result<()> { + async fn test_blob_get_self_missing() -> TestResult<()> { let _guard = iroh_test::logging::setup(); let node = crate::node::Node::memory().spawn().await?; @@ -1360,7 +1359,7 @@ mod tests { /// Download a existing collection. Check that things succeed and no download is performed. #[tokio::test] - async fn test_blob_get_existing_collection() -> Result<()> { + async fn test_blob_get_existing_collection() -> TestResult<()> { let _guard = iroh_test::logging::setup(); let node = crate::node::Node::memory().spawn().await?; From be28e29c28e7e227aed3ecc763cfed384c20778d Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Fri, 2 Aug 2024 16:36:54 +0200 Subject: [PATCH 7/7] chore: make typechecker happy & fmt --- iroh/src/client/blobs.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 7620c4178d..04e544e8b1 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -947,8 +947,8 @@ mod tests { use iroh_blobs::hashseq::HashSeq; use iroh_net::NodeId; use rand::RngCore; - use tokio::io::AsyncWriteExt; use testresult::TestResult; + use tokio::io::AsyncWriteExt; #[tokio::test] async fn test_blob_create_collection() -> Result<()> { @@ -1261,8 +1261,7 @@ mod tests { let node_id = node.node_id(); let client = node.client(); - let AddOutcome { hash, size, .. } = - client.blobs().add_bytes("foo").await?; + let AddOutcome { hash, size, .. } = client.blobs().add_bytes("foo").await?; // Direct let res = client @@ -1371,7 +1370,7 @@ mod tests { let mut collection = Collection::default(); let mut tags = Vec::new(); let mut size = 0; - for value in ["f", "fo", "foo"] { + for value in ["iroh", "is", "cool"] { let import_outcome = client.blobs().add_bytes(value).await.context("add bytes")?; collection.push(value.to_string(), import_outcome.hash); tags.push(import_outcome.tag);