Skip to content

Commit

Permalink
Try async_channel instead of flume
Browse files Browse the repository at this point in the history
also remove the backup timer
  • Loading branch information
rklaehn committed Jul 23, 2024
1 parent fb5c6fa commit b9a9a4b
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 11 deletions.
36 changes: 35 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions iroh-blobs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
async-channel = "2.3.1"
bao-tree = { version = "0.13", features = ["tokio_fsm", "validate"], default-features = false }
bytes = { version = "1.4", features = ["serde"] }
chrono = "0.4.31"
Expand Down
17 changes: 7 additions & 10 deletions iroh-blobs/src/util/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl Deref for LocalPool {
#[derive(Debug, Clone)]
pub struct LocalPoolHandle {
/// The sender half of the channel used to send tasks to the pool
send: flume::Sender<Message>,
send: async_channel::Sender<Message>,
}

/// What to do when a panic occurs in a pool thread
Expand Down Expand Up @@ -124,7 +124,7 @@ impl LocalPool {
panic_mode,
} = config;
let cancel_token = CancellationToken::new();
let (send, recv) = flume::unbounded::<Message>();
let (send, recv) = async_channel::unbounded::<Message>();
let shutdown_sem = Arc::new(Semaphore::new(0));
let handle = tokio::runtime::Handle::current();
let handles = (0..threads)
Expand Down Expand Up @@ -159,7 +159,7 @@ impl LocalPool {
/// Spawn a new pool thread.
fn spawn_pool_thread(
thread_name: String,
recv: flume::Receiver<Message>,
recv: async_channel::Receiver<Message>,
cancel_token: CancellationToken,
panic_mode: PanicMode,
shutdown_sem: Arc<Semaphore>,
Expand Down Expand Up @@ -203,7 +203,7 @@ impl LocalPool {
break ShutdownMode::Stop;
},
// if we receive a message, execute it
msg = recv.recv_async() => {
msg = recv.recv() => {
match msg {
// just push into the join set
Ok(Message::Execute(f)) => {
Expand All @@ -216,12 +216,9 @@ impl LocalPool {
break ShutdownMode::Finish;
}
// if the sender is dropped, break the loop immediately
Err(flume::RecvError::Disconnected) => break ShutdownMode::Stop,
Err(async_channel::RecvError) => break ShutdownMode::Stop,
}
},
_ = tokio::time::sleep(std::time::Duration::from_secs(100)) => {
tracing::trace!("Thread {} tick", get_thread_name());
}
}
}
}));
Expand Down Expand Up @@ -291,7 +288,7 @@ impl LocalPool {
// Threads will add a permit in any case, so await_thread_completion
// will then immediately return.
// tracing::trace!("Sending finish message");
let sent = self.send.send(Message::Finish);
let sent = self.send.send_blocking(Message::Finish);
res.push(sent);
// tracing::trace!("Sent finish message {:?}", sent);
}
Expand Down Expand Up @@ -479,7 +476,7 @@ impl LocalPoolHandle {
/// spawn a task in the pool.
pub fn try_spawn_detached_boxed(&self, gen: SpawnFn) -> SpawnResult<()> {
self.send
.send(Message::Execute(gen))
.send_blocking(Message::Execute(gen))
.map_err(|_| SpawnError::Cancelled)
}
}
Expand Down

0 comments on commit b9a9a4b

Please sign in to comment.