From 31bb997951b57eac5df2c9e40dd303c830c913cb Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 24 Jul 2024 15:27:13 +0300 Subject: [PATCH] Remove most uses of flume in iroh itself The rest requires the docs flume purge PR to be merged. --- Cargo.lock | 1 + iroh-blobs/src/downloader/progress.rs | 4 +- iroh-blobs/src/downloader/test.rs | 26 +++---- iroh/Cargo.toml | 1 + iroh/src/node/rpc.rs | 99 +++++++++++++-------------- iroh/tests/gc.rs | 23 ++++--- 6 files changed, 78 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eb970b9915..7f2bbb25cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2478,6 +2478,7 @@ name = "iroh" version = "0.21.0" dependencies = [ "anyhow", + "async-channel", "bao-tree", "bytes", "clap", diff --git a/iroh-blobs/src/downloader/progress.rs b/iroh-blobs/src/downloader/progress.rs index 8a0114dda2..eac80985d5 100644 --- a/iroh-blobs/src/downloader/progress.rs +++ b/iroh-blobs/src/downloader/progress.rs @@ -11,13 +11,13 @@ use parking_lot::Mutex; use crate::{ get::{db::DownloadProgress, progress::TransferState}, - util::progress::{FlumeProgressSender, IdGenerator, ProgressSendError, ProgressSender}, + util::progress::{AsyncChannelProgressSender, IdGenerator, ProgressSendError, ProgressSender}, }; use super::DownloadKind; /// The channel that can be used to subscribe to progress updates. -pub type ProgressSubscriber = FlumeProgressSender; +pub type ProgressSubscriber = AsyncChannelProgressSender; /// Track the progress of downloads. /// diff --git a/iroh-blobs/src/downloader/test.rs b/iroh-blobs/src/downloader/test.rs index 871b835ba7..2e734eaf3b 100644 --- a/iroh-blobs/src/downloader/test.rs +++ b/iroh-blobs/src/downloader/test.rs @@ -12,7 +12,7 @@ use crate::{ get::{db::BlobId, progress::TransferState}, util::{ local_pool::LocalPool, - progress::{FlumeProgressSender, IdGenerator}, + progress::{AsyncChannelProgressSender, IdGenerator}, }, }; @@ -276,13 +276,13 @@ async fn concurrent_progress() { let hash = Hash::new([0u8; 32]); let kind_1 = HashAndFormat::raw(hash); - let (prog_a_tx, prog_a_rx) = flume::bounded(64); - let prog_a_tx = FlumeProgressSender::new(prog_a_tx); + let (prog_a_tx, prog_a_rx) = async_channel::bounded(64); + let prog_a_tx = AsyncChannelProgressSender::new(prog_a_tx); let req = DownloadRequest::new(kind_1, vec![peer]).progress_sender(prog_a_tx); let handle_a = downloader.queue(req).await; - let (prog_b_tx, prog_b_rx) = flume::bounded(64); - let prog_b_tx = FlumeProgressSender::new(prog_b_tx); + let (prog_b_tx, prog_b_rx) = async_channel::bounded(64); + let prog_b_tx = AsyncChannelProgressSender::new(prog_b_tx); let req = DownloadRequest::new(kind_1, vec![peer]).progress_sender(prog_b_tx); let handle_b = downloader.queue(req).await; @@ -292,8 +292,8 @@ async fn concurrent_progress() { let mut state_b = TransferState::new(hash); let mut state_c = TransferState::new(hash); - let prog1_a = prog_a_rx.recv_async().await.unwrap(); - let prog1_b = prog_b_rx.recv_async().await.unwrap(); + let prog1_a = prog_a_rx.recv().await.unwrap(); + let prog1_b = prog_b_rx.recv().await.unwrap(); assert!(matches!(prog1_a, DownloadProgress::Found { hash, size: 100, ..} if hash == hash)); assert!(matches!(prog1_b, DownloadProgress::Found { hash, size: 100, ..} if hash == hash)); @@ -301,12 +301,12 @@ async fn concurrent_progress() { state_b.on_progress(prog1_b); assert_eq!(state_a, state_b); - let (prog_c_tx, prog_c_rx) = flume::bounded(64); - let prog_c_tx = FlumeProgressSender::new(prog_c_tx); + let (prog_c_tx, prog_c_rx) = 
async_channel::bounded(64); + let prog_c_tx = AsyncChannelProgressSender::new(prog_c_tx); let req = DownloadRequest::new(kind_1, vec![peer]).progress_sender(prog_c_tx); let handle_c = downloader.queue(req).await; - let prog1_c = prog_c_rx.recv_async().await.unwrap(); + let prog1_c = prog_c_rx.recv().await.unwrap(); assert!(matches!(&prog1_c, DownloadProgress::InitialState(state) if state == &state_a)); state_c.on_progress(prog1_c); @@ -317,9 +317,9 @@ async fn concurrent_progress() { res_b.unwrap(); res_c.unwrap(); - let prog_a: Vec<_> = prog_a_rx.into_stream().collect().await; - let prog_b: Vec<_> = prog_b_rx.into_stream().collect().await; - let prog_c: Vec<_> = prog_c_rx.into_stream().collect().await; + let prog_a: Vec<_> = prog_a_rx.collect().await; + let prog_b: Vec<_> = prog_b_rx.collect().await; + let prog_c: Vec<_> = prog_c_rx.collect().await; assert_eq!(prog_a.len(), 1); assert_eq!(prog_b.len(), 1); diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 4ec069fe30..f10a3b9029 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -53,6 +53,7 @@ tracing = "0.1" walkdir = "2" # Examples +async-channel = "2.3.1" clap = { version = "4", features = ["derive"], optional = true } indicatif = { version = "0.17", features = ["tokio"], optional = true } ref-cast = "1.0.23" diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 0796a0d86e..467e91d402 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -15,13 +15,12 @@ use iroh_blobs::get::db::DownloadProgress; use iroh_blobs::get::Stats; use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress, MapEntry}; use iroh_blobs::util::local_pool::LocalPoolHandle; -use iroh_blobs::util::progress::ProgressSender; +use iroh_blobs::util::progress::{AsyncChannelProgressSender, ProgressSender}; use iroh_blobs::util::SetTagOption; use iroh_blobs::BlobFormat; use iroh_blobs::{ provider::AddProgress, store::{Store as BaoStore, ValidateProgress}, - util::progress::FlumeProgressSender, HashAndFormat, }; use iroh_io::AsyncSliceReader; @@ -527,18 +526,18 @@ impl Handler { self, msg: ValidateRequest, ) -> impl Stream + Send + 'static { - let (tx, rx) = flume::bounded(1); + let (tx, rx) = async_channel::bounded(1); let tx2 = tx.clone(); let db = self.inner.db.clone(); tokio::task::spawn(async move { if let Err(e) = db - .validate(msg.repair, FlumeProgressSender::new(tx).boxed()) + .validate(msg.repair, AsyncChannelProgressSender::new(tx).boxed()) .await { - tx2.send_async(ValidateProgress::Abort(e.into())).await.ok(); + tx2.send(ValidateProgress::Abort(e.into())).await.ok(); } }); - rx.into_stream() + rx } /// Invoke validate on the database and stream out the result @@ -546,59 +545,59 @@ impl Handler { self, msg: ConsistencyCheckRequest, ) -> impl Stream + Send + 'static { - let (tx, rx) = flume::bounded(1); + let (tx, rx) = async_channel::bounded(1); let tx2 = tx.clone(); let db = self.inner.db.clone(); tokio::task::spawn(async move { if let Err(e) = db - .consistency_check(msg.repair, FlumeProgressSender::new(tx).boxed()) + .consistency_check(msg.repair, AsyncChannelProgressSender::new(tx).boxed()) .await { - tx2.send_async(ConsistencyCheckProgress::Abort(e.into())) + tx2.send(ConsistencyCheckProgress::Abort(e.into())) .await .ok(); } }); - rx.into_stream() + rx } fn blob_add_from_path(self, msg: AddPathRequest) -> impl Stream { // provide a little buffer so that we don't slow down the sender - let (tx, rx) = flume::bounded(32); + let (tx, rx) = async_channel::bounded(32); let tx2 = tx.clone(); 
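        // `tx` is moved into `blob_add_from_path0` for the regular progress
        // events; the `tx2` clone stays behind so the task spawned below can
        // still emit a final `AddProgress::Abort` if the add fails.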
self.local_pool_handle().spawn_detached(|| async move { if let Err(e) = self.blob_add_from_path0(msg, tx).await { - tx2.send_async(AddProgress::Abort(e.into())).await.ok(); + tx2.send(AddProgress::Abort(e.into())).await.ok(); } }); - rx.into_stream().map(AddPathResponse) + rx.map(AddPathResponse) } fn doc_import_file(self, msg: ImportFileRequest) -> impl Stream { // provide a little buffer so that we don't slow down the sender - let (tx, rx) = flume::bounded(32); + let (tx, rx) = async_channel::bounded(32); let tx2 = tx.clone(); self.local_pool_handle().spawn_detached(|| async move { if let Err(e) = self.doc_import_file0(msg, tx).await { - tx2.send_async(crate::client::docs::ImportProgress::Abort(e.into())) + tx2.send(crate::client::docs::ImportProgress::Abort(e.into())) .await .ok(); } }); - rx.into_stream().map(ImportFileResponse) + rx.map(ImportFileResponse) } async fn doc_import_file0( self, msg: ImportFileRequest, - progress: flume::Sender, + progress: async_channel::Sender, ) -> anyhow::Result<()> { let docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?; use crate::client::docs::ImportProgress as DocImportProgress; use iroh_blobs::store::ImportMode; use std::collections::BTreeMap; - let progress = FlumeProgressSender::new(progress); + let progress = AsyncChannelProgressSender::new(progress); let names = Arc::new(Mutex::new(BTreeMap::new())); // convert import progress to provide progress let import_progress = progress.clone().with_filter_map(move |x| match x { @@ -660,23 +659,23 @@ impl Handler { } fn doc_export_file(self, msg: ExportFileRequest) -> impl Stream { - let (tx, rx) = flume::bounded(1024); + let (tx, rx) = async_channel::bounded(1024); let tx2 = tx.clone(); self.local_pool_handle().spawn_detached(|| async move { if let Err(e) = self.doc_export_file0(msg, tx).await { - tx2.send_async(ExportProgress::Abort(e.into())).await.ok(); + tx2.send(ExportProgress::Abort(e.into())).await.ok(); } }); - rx.into_stream().map(ExportFileResponse) + rx.map(ExportFileResponse) } async fn doc_export_file0( self, msg: ExportFileRequest, - progress: flume::Sender, + progress: async_channel::Sender, ) -> anyhow::Result<()> { let _docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?; - let progress = FlumeProgressSender::new(progress); + let progress = AsyncChannelProgressSender::new(progress); let ExportFileRequest { entry, path, mode } = msg; let key = bytes::Bytes::from(entry.key().to_vec()); let export_progress = progress.clone().with_map(move |mut x| { @@ -700,11 +699,11 @@ impl Handler { } fn blob_download(self, msg: BlobDownloadRequest) -> impl Stream { - let (sender, receiver) = flume::bounded(1024); + let (sender, receiver) = async_channel::bounded(1024); let db = self.inner.db.clone(); let downloader = self.inner.downloader.clone(); let endpoint = self.inner.endpoint.clone(); - let progress = FlumeProgressSender::new(sender); + let progress = AsyncChannelProgressSender::new(sender); self.local_pool_handle().spawn_detached(move || async move { if let Err(err) = download(&db, endpoint, &downloader, msg, progress.clone()).await { progress @@ -714,12 +713,12 @@ impl Handler { } }); - receiver.into_stream().map(DownloadResponse) + receiver.map(DownloadResponse) } fn blob_export(self, msg: ExportRequest) -> impl Stream { - let (tx, rx) = flume::bounded(1024); - let progress = FlumeProgressSender::new(tx); + let (tx, rx) = async_channel::bounded(1024); + let progress = AsyncChannelProgressSender::new(tx); self.local_pool_handle().spawn_detached(move || async move { let 
res = iroh_blobs::export::export( &self.inner.db, @@ -735,18 +734,18 @@ impl Handler { Err(err) => progress.send(ExportProgress::Abort(err.into())).await.ok(), }; }); - rx.into_stream().map(ExportResponse) + rx.map(ExportResponse) } async fn blob_add_from_path0( self, msg: AddPathRequest, - progress: flume::Sender, + progress: async_channel::Sender, ) -> anyhow::Result<()> { use iroh_blobs::store::ImportMode; use std::collections::BTreeMap; - let progress = FlumeProgressSender::new(progress); + let progress = AsyncChannelProgressSender::new(progress); let names = Arc::new(Mutex::new(BTreeMap::new())); // convert import progress to provide progress let import_progress = progress.clone().with_filter_map(move |x| match x { @@ -923,25 +922,25 @@ impl Handler { msg: AddStreamRequest, stream: impl Stream + Send + Unpin + 'static, ) -> impl Stream { - let (tx, rx) = flume::bounded(32); + let (tx, rx) = async_channel::bounded(32); let this = self.clone(); self.local_pool_handle().spawn_detached(|| async move { if let Err(err) = this.blob_add_stream0(msg, stream, tx.clone()).await { - tx.send_async(AddProgress::Abort(err.into())).await.ok(); + tx.send(AddProgress::Abort(err.into())).await.ok(); } }); - rx.into_stream().map(AddStreamResponse) + rx.map(AddStreamResponse) } async fn blob_add_stream0( self, msg: AddStreamRequest, stream: impl Stream + Send + Unpin + 'static, - progress: flume::Sender, + progress: async_channel::Sender, ) -> anyhow::Result<()> { - let progress = FlumeProgressSender::new(progress); + let progress = AsyncChannelProgressSender::new(progress); let stream = stream.map(|item| match item { AddStreamUpdate::Chunk(chunk) => Ok(chunk), @@ -993,24 +992,24 @@ impl Handler { self, req: ReadAtRequest, ) -> impl Stream> + Send + 'static { - let (tx, rx) = flume::bounded(RPC_BLOB_GET_CHANNEL_CAP); + let (tx, rx) = async_channel::bounded(RPC_BLOB_GET_CHANNEL_CAP); let db = self.inner.db.clone(); self.local_pool_handle().spawn_detached(move || async move { if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await { - tx.send_async(RpcResult::Err(err.into())).await.ok(); + tx.send(RpcResult::Err(err.into())).await.ok(); } }); async fn read_loop( req: ReadAtRequest, db: D, - tx: flume::Sender>, + tx: async_channel::Sender>, max_chunk_size: usize, ) -> anyhow::Result<()> { let entry = db.get(&req.hash).await?; let entry = entry.ok_or_else(|| anyhow!("Blob not found"))?; let size = entry.size(); - tx.send_async(Ok(ReadAtResponse::Entry { + tx.send(Ok(ReadAtResponse::Entry { size, is_complete: entry.is_complete(), })) @@ -1037,7 +1036,7 @@ impl Handler { let chunk = reader.read_at(req.offset + read, chunk_size).await?; let chunk_len = chunk.len(); if !chunk.is_empty() { - tx.send_async(Ok(ReadAtResponse::Data { chunk })).await?; + tx.send(Ok(ReadAtResponse::Data { chunk })).await?; } if chunk_len < chunk_size { break; @@ -1048,7 +1047,7 @@ impl Handler { Ok(()) } - rx.into_stream() + rx } fn node_connections( @@ -1056,17 +1055,15 @@ impl Handler { _: ConnectionsRequest, ) -> impl Stream> + Send + 'static { // provide a little buffer so that we don't slow down the sender - let (tx, rx) = flume::bounded(32); + let (tx, rx) = async_channel::bounded(32); let mut conn_infos = self.inner.endpoint.connection_infos(); conn_infos.sort_by_key(|n| n.node_id.to_string()); self.local_pool_handle().spawn_detached(|| async move { for conn_info in conn_infos { - tx.send_async(Ok(ConnectionsResponse { conn_info })) - .await - .ok(); + tx.send(Ok(ConnectionsResponse { conn_info 
})).await.ok(); } }); - rx.into_stream() + rx } // This method is called as an RPC method, which have to be async @@ -1125,7 +1122,7 @@ async fn download( endpoint: Endpoint, downloader: &Downloader, req: BlobDownloadRequest, - progress: FlumeProgressSender, + progress: AsyncChannelProgressSender, ) -> Result<()> where D: iroh_blobs::store::Store, @@ -1175,7 +1172,7 @@ async fn download_queued( downloader: &Downloader, hash_and_format: HashAndFormat, nodes: Vec, - progress: FlumeProgressSender, + progress: AsyncChannelProgressSender, ) -> Result { let mut node_ids = Vec::with_capacity(nodes.len()); let mut any_added = false; @@ -1199,7 +1196,7 @@ async fn download_direct_from_nodes( endpoint: Endpoint, hash_and_format: HashAndFormat, nodes: Vec, - progress: FlumeProgressSender, + progress: AsyncChannelProgressSender, ) -> Result where D: BaoStore, @@ -1232,7 +1229,7 @@ async fn download_direct( endpoint: Endpoint, hash_and_format: HashAndFormat, node: NodeAddr, - progress: FlumeProgressSender, + progress: AsyncChannelProgressSender, ) -> Result where D: BaoStore, diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs index e032691df9..83a21f8d7f 100644 --- a/iroh/tests/gc.rs +++ b/iroh/tests/gc.rs @@ -37,11 +37,14 @@ pub fn simulate_remote(data: &[u8]) -> (blake3::Hash, Cursor) { } /// Wrap a bao store in a node that has gc enabled. -async fn wrap_in_node(bao_store: S, gc_period: Duration) -> (Node, flume::Receiver<()>) +async fn wrap_in_node( + bao_store: S, + gc_period: Duration, +) -> (Node, async_channel::Receiver<()>) where S: iroh_blobs::store::Store, { - let (gc_send, gc_recv) = flume::unbounded(); + let (gc_send, gc_recv) = async_channel::unbounded(); let node = node::Builder::with_db_and_store( bao_store, DocsStorage::Memory, @@ -49,7 +52,7 @@ where ) .gc_policy(iroh::node::GcPolicy::Interval(gc_period)) .register_gc_done_cb(Box::new(move || { - gc_send.send(()).ok(); + gc_send.send_blocking(()).ok(); })) .spawn() .await @@ -60,19 +63,19 @@ where async fn gc_test_node() -> ( Node, iroh_blobs::store::mem::Store, - flume::Receiver<()>, + async_channel::Receiver<()>, ) { let bao_store = iroh_blobs::store::mem::Store::new(); let (node, gc_recv) = wrap_in_node(bao_store.clone(), Duration::from_millis(500)).await; (node, bao_store, gc_recv) } -async fn step(evs: &flume::Receiver<()>) { +async fn step(evs: &async_channel::Receiver<()>) { // drain the event queue, we want a new GC while evs.try_recv().is_ok() {} // wait for several GC cycles for _ in 0..3 { - evs.recv_async().await.unwrap(); + evs.recv().await.unwrap(); } } @@ -191,7 +194,7 @@ mod file { use iroh_blobs::{ store::{BaoBatchWriter, ConsistencyCheckProgress, Map, MapEntryMut, ReportLevel}, - util::progress::{FlumeProgressSender, ProgressSender as _}, + util::progress::{AsyncChannelProgressSender, ProgressSender as _}, TempTag, }; use tokio::io::AsyncReadExt; @@ -212,16 +215,16 @@ mod file { async fn check_consistency(store: &impl Store) -> anyhow::Result { let mut max_level = ReportLevel::Trace; - let (tx, rx) = flume::bounded(1); + let (tx, rx) = async_channel::bounded(1); let task = tokio::task::spawn(async move { - while let Ok(ev) = rx.recv_async().await { + while let Ok(ev) = rx.recv().await { if let ConsistencyCheckProgress::Update { level, .. } = &ev { max_level = max_level.max(*level); } } }); store - .consistency_check(false, FlumeProgressSender::new(tx).boxed()) + .consistency_check(false, AsyncChannelProgressSender::new(tx).boxed()) .await?; task.await?; Ok(max_level)
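
Note: the conversion applied throughout this patch is mechanical. For reference,
a minimal sketch of the flume -> async-channel (2.x) mapping, as a standalone
illustration rather than part of the diff; it assumes a tokio runtime and a
`StreamExt` import (futures-lite here, but any provider works):

    // flume::bounded(n)        -> async_channel::bounded(n)
    // flume::unbounded()       -> async_channel::unbounded()
    // tx.send_async(v).await   -> tx.send(v).await
    // rx.recv_async().await    -> rx.recv().await
    // rx.into_stream()         -> rx  (Receiver implements Stream directly)

    use futures_lite::StreamExt;

    async fn demo() -> Vec<u32> {
        let (tx, rx) = async_channel::bounded::<u32>(8);
        tokio::task::spawn(async move {
            for i in 0..3 {
                // was: tx.send_async(i).await
                tx.send(i).await.ok();
            }
            // the channel closes once the last sender is dropped,
            // which is what terminates the stream below
        });
        // was: rx.into_stream().collect().await
        rx.collect().await
    }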
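
The one non-mechanical spot is in iroh/tests/gc.rs: `register_gc_done_cb` takes
a synchronous callback, which cannot `.await`, so flume's sync `send` becomes
`send_blocking`. That is safe here only because the channel is unbounded, so
`send_blocking` never actually blocks a runtime thread. A sketch (the callback
shape is illustrative, not the exact iroh signature):

    fn demo_sync_callback() {
        let (done_tx, done_rx) = async_channel::unbounded::<()>();

        // Sync context: no .await available. On an unbounded channel,
        // `send_blocking` succeeds immediately unless every receiver has
        // been dropped, in which case it returns Err and we ignore it.
        let on_gc_done: Box<dyn Fn() + Send + Sync> = Box::new(move || {
            done_tx.send_blocking(()).ok();
        });
        on_gc_done();

        // The async side then awaits completion signals as usual:
        // done_rx.recv().await.unwrap();
        drop(done_rx);
    }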