Skip to content

Commit 43a8bac

Browse files
committed
replace flume in iroh-net with async_channel
Rationale: see #2536
1 parent 9052905 commit 43a8bac

File tree

5 files changed

+15
-15
lines changed

5 files changed

+15
-15
lines changed

iroh-net/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ workspace = true
1717

1818
[dependencies]
1919
anyhow = { version = "1" }
20+
async-channel = "2.3.1"
2021
base64 = "0.22.1"
2122
backoff = "0.4.0"
2223
bytes = "1"
2324
netdev = "0.30.0"
2425
der = { version = "0.7", features = ["alloc", "derive"] }
2526
derive_more = { version = "1.0.0-beta.6", features = ["debug", "display", "from", "try_into", "deref"] }
26-
flume = "0.11"
2727
futures-buffered = "0.2.4"
2828
futures-concurrency = "7.6.0"
2929
futures-lite = "2.3"

iroh-net/src/magicsock.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ pub(crate) struct MagicSock {
177177
proxy_url: Option<Url>,
178178

179179
/// Used for receiving relay messages.
180-
relay_recv_receiver: flume::Receiver<RelayRecvResult>,
180+
relay_recv_receiver: async_channel::Receiver<RelayRecvResult>,
181181
/// Stores wakers, to be called when relay_recv_ch receives new data.
182182
network_recv_wakers: parking_lot::Mutex<Option<Waker>>,
183183
network_send_wakers: parking_lot::Mutex<Option<Waker>>,
@@ -786,11 +786,11 @@ impl MagicSock {
786786
break;
787787
}
788788
match self.relay_recv_receiver.try_recv() {
789-
Err(flume::TryRecvError::Empty) => {
789+
Err(async_channel::TryRecvError::Empty) => {
790790
self.network_recv_wakers.lock().replace(cx.waker().clone());
791791
break;
792792
}
793-
Err(flume::TryRecvError::Disconnected) => {
793+
Err(async_channel::TryRecvError::Closed) => {
794794
return Poll::Ready(Err(io::Error::new(
795795
io::ErrorKind::NotConnected,
796796
"connection closed",
@@ -1375,7 +1375,7 @@ impl Handle {
13751375
insecure_skip_relay_cert_verify,
13761376
} = opts;
13771377

1378-
let (relay_recv_sender, relay_recv_receiver) = flume::bounded(128);
1378+
let (relay_recv_sender, relay_recv_receiver) = async_channel::bounded(128);
13791379

13801380
let (pconn4, pconn6) = bind(port)?;
13811381
let port = pconn4.port();
@@ -1701,7 +1701,7 @@ struct Actor {
17011701
relay_actor_sender: mpsc::Sender<RelayActorMessage>,
17021702
relay_actor_cancel_token: CancellationToken,
17031703
/// Channel to send received relay messages on, for processing.
1704-
relay_recv_sender: flume::Sender<RelayRecvResult>,
1704+
relay_recv_sender: async_channel::Sender<RelayRecvResult>,
17051705
/// When set, is an AfterFunc timer that will call MagicSock::do_periodic_stun.
17061706
periodic_re_stun_timer: time::Interval,
17071707
/// The `NetInfo` provided in the last call to `net_info_func`. It's used to deduplicate calls to netInfoFunc.
@@ -1855,7 +1855,7 @@ impl Actor {
18551855
let passthroughs = self.process_relay_read_result(read_result);
18561856
for passthrough in passthroughs {
18571857
self.relay_recv_sender
1858-
.send_async(passthrough)
1858+
.send(passthrough)
18591859
.await
18601860
.expect("missing recv sender");
18611861
let mut wakers = self.msock.network_recv_wakers.lock();

iroh-net/src/magicsock/udp_conn.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,15 +192,15 @@ mod tests {
192192
let (m2, _m2_key) = wrap_socket(m2)?;
193193

194194
let m1_addr = SocketAddr::new(network.local_addr(), m1.local_addr()?.port());
195-
let (m1_send, m1_recv) = flume::bounded(8);
195+
let (m1_send, m1_recv) = async_channel::bounded(8);
196196

197197
let m1_task = tokio::task::spawn(async move {
198198
if let Some(conn) = m1.accept().await {
199199
let conn = conn.await?;
200200
let (mut send_bi, mut recv_bi) = conn.accept_bi().await?;
201201

202202
let val = recv_bi.read_to_end(usize::MAX).await?;
203-
m1_send.send_async(val).await?;
203+
m1_send.send(val).await?;
204204
send_bi.finish().await?;
205205
}
206206

@@ -220,7 +220,7 @@ mod tests {
220220
drop(send_bi);
221221

222222
// make sure the right values arrived
223-
let val = m1_recv.recv_async().await?;
223+
let val = m1_recv.recv().await?;
224224
assert_eq!(val, b"hello");
225225

226226
m1_task.await??;

iroh-net/src/net/netmon/actor.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub(super) struct Actor {
5757
/// OS specific monitor.
5858
#[allow(dead_code)]
5959
route_monitor: RouteMonitor,
60-
mon_receiver: flume::Receiver<NetworkMessage>,
60+
mon_receiver: async_channel::Receiver<NetworkMessage>,
6161
actor_receiver: mpsc::Receiver<ActorMessage>,
6262
actor_sender: mpsc::Sender<ActorMessage>,
6363
/// Callback registry.
@@ -84,7 +84,7 @@ impl Actor {
8484
let wall_time = Instant::now();
8585

8686
// Use flume channels, as tokio::mpsc is not safe to use across ffi boundaries.
87-
let (mon_sender, mon_receiver) = flume::bounded(MON_CHAN_CAPACITY);
87+
let (mon_sender, mon_receiver) = async_channel::bounded(MON_CHAN_CAPACITY);
8888
let route_monitor = RouteMonitor::new(mon_sender)?;
8989
let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY);
9090

@@ -129,7 +129,7 @@ impl Actor {
129129
debounce_interval.reset_immediately();
130130
}
131131
}
132-
Ok(_event) = self.mon_receiver.recv_async() => {
132+
Ok(_event) = self.mon_receiver.recv() => {
133133
trace!("network activity detected");
134134
last_event.replace(false);
135135
debounce_interval.reset_immediately();

iroh-net/src/net/netmon/bsd.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ impl Drop for RouteMonitor {
2323
}
2424

2525
impl RouteMonitor {
26-
pub(super) fn new(sender: flume::Sender<NetworkMessage>) -> Result<Self> {
26+
pub(super) fn new(sender: async_channel::Sender<NetworkMessage>) -> Result<Self> {
2727
let socket = socket2::Socket::new(libc::AF_ROUTE.into(), socket2::Type::RAW, None)?;
2828
socket.set_nonblocking(true)?;
2929
let socket_std: std::os::unix::net::UnixStream = socket.into();
@@ -44,7 +44,7 @@ impl RouteMonitor {
4444
) {
4545
Ok(msgs) => {
4646
if contains_interesting_message(&msgs) {
47-
sender.send_async(NetworkMessage::Change).await.ok();
47+
sender.send(NetworkMessage::Change).await.ok();
4848
}
4949
}
5050
Err(err) => {

0 commit comments

Comments
 (0)