Skip to content

Commit

Permalink
Remove most uses of flume in iroh itself
Browse files Browse the repository at this point in the history
The rest requires the docs flume purge PR to be merged.
  • Loading branch information
rklaehn committed Jul 24, 2024
1 parent 407ff53 commit 31bb997
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 76 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions iroh-blobs/src/downloader/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ use parking_lot::Mutex;

use crate::{
get::{db::DownloadProgress, progress::TransferState},
util::progress::{FlumeProgressSender, IdGenerator, ProgressSendError, ProgressSender},
util::progress::{AsyncChannelProgressSender, IdGenerator, ProgressSendError, ProgressSender},
};

use super::DownloadKind;

/// The channel that can be used to subscribe to progress updates.
pub type ProgressSubscriber = FlumeProgressSender<DownloadProgress>;
pub type ProgressSubscriber = AsyncChannelProgressSender<DownloadProgress>;

/// Track the progress of downloads.
///
Expand Down
26 changes: 13 additions & 13 deletions iroh-blobs/src/downloader/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
get::{db::BlobId, progress::TransferState},
util::{
local_pool::LocalPool,
progress::{FlumeProgressSender, IdGenerator},
progress::{AsyncChannelProgressSender, IdGenerator},
},
};

Expand Down Expand Up @@ -276,13 +276,13 @@ async fn concurrent_progress() {
let hash = Hash::new([0u8; 32]);
let kind_1 = HashAndFormat::raw(hash);

let (prog_a_tx, prog_a_rx) = flume::bounded(64);
let prog_a_tx = FlumeProgressSender::new(prog_a_tx);
let (prog_a_tx, prog_a_rx) = async_channel::bounded(64);
let prog_a_tx = AsyncChannelProgressSender::new(prog_a_tx);
let req = DownloadRequest::new(kind_1, vec![peer]).progress_sender(prog_a_tx);
let handle_a = downloader.queue(req).await;

let (prog_b_tx, prog_b_rx) = flume::bounded(64);
let prog_b_tx = FlumeProgressSender::new(prog_b_tx);
let (prog_b_tx, prog_b_rx) = async_channel::bounded(64);
let prog_b_tx = AsyncChannelProgressSender::new(prog_b_tx);
let req = DownloadRequest::new(kind_1, vec![peer]).progress_sender(prog_b_tx);
let handle_b = downloader.queue(req).await;

Expand All @@ -292,21 +292,21 @@ async fn concurrent_progress() {
let mut state_b = TransferState::new(hash);
let mut state_c = TransferState::new(hash);

let prog1_a = prog_a_rx.recv_async().await.unwrap();
let prog1_b = prog_b_rx.recv_async().await.unwrap();
let prog1_a = prog_a_rx.recv().await.unwrap();
let prog1_b = prog_b_rx.recv().await.unwrap();
assert!(matches!(prog1_a, DownloadProgress::Found { hash, size: 100, ..} if hash == hash));
assert!(matches!(prog1_b, DownloadProgress::Found { hash, size: 100, ..} if hash == hash));

state_a.on_progress(prog1_a);
state_b.on_progress(prog1_b);
assert_eq!(state_a, state_b);

let (prog_c_tx, prog_c_rx) = flume::bounded(64);
let prog_c_tx = FlumeProgressSender::new(prog_c_tx);
let (prog_c_tx, prog_c_rx) = async_channel::bounded(64);
let prog_c_tx = AsyncChannelProgressSender::new(prog_c_tx);
let req = DownloadRequest::new(kind_1, vec![peer]).progress_sender(prog_c_tx);
let handle_c = downloader.queue(req).await;

let prog1_c = prog_c_rx.recv_async().await.unwrap();
let prog1_c = prog_c_rx.recv().await.unwrap();
assert!(matches!(&prog1_c, DownloadProgress::InitialState(state) if state == &state_a));
state_c.on_progress(prog1_c);

Expand All @@ -317,9 +317,9 @@ async fn concurrent_progress() {
res_b.unwrap();
res_c.unwrap();

let prog_a: Vec<_> = prog_a_rx.into_stream().collect().await;
let prog_b: Vec<_> = prog_b_rx.into_stream().collect().await;
let prog_c: Vec<_> = prog_c_rx.into_stream().collect().await;
let prog_a: Vec<_> = prog_a_rx.collect().await;
let prog_b: Vec<_> = prog_b_rx.collect().await;
let prog_c: Vec<_> = prog_c_rx.collect().await;

assert_eq!(prog_a.len(), 1);
assert_eq!(prog_b.len(), 1);
Expand Down
1 change: 1 addition & 0 deletions iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ tracing = "0.1"
walkdir = "2"

# Examples
async-channel = "2.3.1"
clap = { version = "4", features = ["derive"], optional = true }
indicatif = { version = "0.17", features = ["tokio"], optional = true }
ref-cast = "1.0.23"
Expand Down
Loading

0 comments on commit 31bb997

Please sign in to comment.