Skip to content

Commit 6f60bd8

Browse files
committed
replace flume in iroh-net with async_channel
Rationale: see #2536
1 parent 9052905 commit 6f60bd8

File tree

10 files changed

+37
-37
lines changed

10 files changed

+37
-37
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-net/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ workspace = true
1717

1818
[dependencies]
1919
anyhow = { version = "1" }
20+
async-channel = "2.3.1"
2021
base64 = "0.22.1"
2122
backoff = "0.4.0"
2223
bytes = "1"
2324
netdev = "0.30.0"
2425
der = { version = "0.7", features = ["alloc", "derive"] }
2526
derive_more = { version = "1.0.0-beta.6", features = ["debug", "display", "from", "try_into", "deref"] }
26-
flume = "0.11"
2727
futures-buffered = "0.2.4"
2828
futures-concurrency = "7.6.0"
2929
futures-lite = "2.3"

iroh-net/src/discovery/local_swarm_discovery.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use derive_more::FromStr;
1414
use futures_lite::{stream::Boxed as BoxStream, StreamExt};
1515
use tracing::{debug, error, trace, warn};
1616

17-
use flume::Sender;
17+
use async_channel::Sender;
1818
use iroh_base::key::PublicKey;
1919
use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer};
2020
use tokio::task::JoinSet;
@@ -62,7 +62,7 @@ impl LocalSwarmDiscovery {
6262
/// This relies on [`tokio::runtime::Handle::current`] and will panic if called outside of the context of a tokio runtime.
6363
pub fn new(node_id: NodeId) -> Result<Self> {
6464
debug!("Creating new LocalSwarmDiscovery service");
65-
let (send, recv) = flume::bounded(64);
65+
let (send, recv) = async_channel::bounded(64);
6666
let task_sender = send.clone();
6767
let rt = tokio::runtime::Handle::current();
6868
let mut guard = Some(LocalSwarmDiscovery::spawn_discoverer(
@@ -80,7 +80,7 @@ impl LocalSwarmDiscovery {
8080
let mut timeouts = JoinSet::new();
8181
loop {
8282
trace!(?node_addrs, "LocalSwarmDiscovery Service loop tick");
83-
let msg = match recv.recv_async().await {
83+
let msg = match recv.recv().await {
8484
Err(err) => {
8585
error!("LocalSwarmDiscovery service error: {err:?}");
8686
error!("closing LocalSwarmDiscovery");
@@ -124,7 +124,7 @@ impl LocalSwarmDiscovery {
124124
for sender in senders.values() {
125125
let item: DiscoveryItem = (&peer_info).into();
126126
trace!(?item, "sending DiscoveryItem");
127-
sender.send_async(Ok(item)).await.ok();
127+
sender.send(Ok(item)).await.ok();
128128
}
129129
}
130130
trace!(
@@ -141,7 +141,7 @@ impl LocalSwarmDiscovery {
141141
if let Some(peer_info) = node_addrs.get(&node_id) {
142142
let item: DiscoveryItem = peer_info.into();
143143
debug!(?item, "sending DiscoveryItem");
144-
sender.send_async(Ok(item)).await.ok();
144+
sender.send(Ok(item)).await.ok();
145145
}
146146
if let Some(senders_for_node_id) = senders.get_mut(&node_id) {
147147
senders_for_node_id.insert(id, sender);
@@ -155,7 +155,7 @@ impl LocalSwarmDiscovery {
155155
tokio::time::sleep(DISCOVERY_DURATION).await;
156156
trace!(?node_id, "discovery timeout");
157157
timeout_sender
158-
.send_async(Message::Timeout(node_id, id))
158+
.send(Message::Timeout(node_id, id))
159159
.await
160160
.ok();
161161
});
@@ -210,7 +210,7 @@ impl LocalSwarmDiscovery {
210210
);
211211

212212
sender
213-
.send(Message::Discovery(node_id.to_string(), peer.clone()))
213+
.send_blocking(Message::Discovery(node_id.to_string(), peer.clone()))
214214
.ok();
215215
};
216216
let mut addrs: HashMap<u16, Vec<IpAddr>> = HashMap::default();
@@ -267,23 +267,23 @@ impl From<&Peer> for DiscoveryItem {
267267

268268
impl Discovery for LocalSwarmDiscovery {
269269
fn resolve(&self, _ep: Endpoint, node_id: NodeId) -> Option<BoxStream<Result<DiscoveryItem>>> {
270-
let (send, recv) = flume::bounded(20);
270+
let (send, recv) = async_channel::bounded(20);
271271
let discovery_sender = self.sender.clone();
272272
tokio::spawn(async move {
273273
discovery_sender
274-
.send_async(Message::SendAddrs(node_id, send))
274+
.send(Message::SendAddrs(node_id, send))
275275
.await
276276
.ok();
277277
});
278-
Some(recv.into_stream().boxed())
278+
Some(recv.boxed())
279279
}
280280

281281
fn publish(&self, info: &AddrInfo) {
282282
let discovery_sender = self.sender.clone();
283283
let info = info.clone();
284284
tokio::spawn(async move {
285285
discovery_sender
286-
.send_async(Message::ChangeLocalAddrs(info))
286+
.send(Message::ChangeLocalAddrs(info))
287287
.await
288288
.ok();
289289
});

iroh-net/src/magicsock.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ pub(crate) struct MagicSock {
177177
proxy_url: Option<Url>,
178178

179179
/// Used for receiving relay messages.
180-
relay_recv_receiver: flume::Receiver<RelayRecvResult>,
180+
relay_recv_receiver: async_channel::Receiver<RelayRecvResult>,
181181
/// Stores wakers, to be called when relay_recv_ch receives new data.
182182
network_recv_wakers: parking_lot::Mutex<Option<Waker>>,
183183
network_send_wakers: parking_lot::Mutex<Option<Waker>>,
@@ -786,11 +786,11 @@ impl MagicSock {
786786
break;
787787
}
788788
match self.relay_recv_receiver.try_recv() {
789-
Err(flume::TryRecvError::Empty) => {
789+
Err(async_channel::TryRecvError::Empty) => {
790790
self.network_recv_wakers.lock().replace(cx.waker().clone());
791791
break;
792792
}
793-
Err(flume::TryRecvError::Disconnected) => {
793+
Err(async_channel::TryRecvError::Closed) => {
794794
return Poll::Ready(Err(io::Error::new(
795795
io::ErrorKind::NotConnected,
796796
"connection closed",
@@ -1375,7 +1375,7 @@ impl Handle {
13751375
insecure_skip_relay_cert_verify,
13761376
} = opts;
13771377

1378-
let (relay_recv_sender, relay_recv_receiver) = flume::bounded(128);
1378+
let (relay_recv_sender, relay_recv_receiver) = async_channel::bounded(128);
13791379

13801380
let (pconn4, pconn6) = bind(port)?;
13811381
let port = pconn4.port();
@@ -1701,7 +1701,7 @@ struct Actor {
17011701
relay_actor_sender: mpsc::Sender<RelayActorMessage>,
17021702
relay_actor_cancel_token: CancellationToken,
17031703
/// Channel to send received relay messages on, for processing.
1704-
relay_recv_sender: flume::Sender<RelayRecvResult>,
1704+
relay_recv_sender: async_channel::Sender<RelayRecvResult>,
17051705
/// When set, is an AfterFunc timer that will call MagicSock::do_periodic_stun.
17061706
periodic_re_stun_timer: time::Interval,
17071707
/// The `NetInfo` provided in the last call to `net_info_func`. It's used to deduplicate calls to netInfoFunc.
@@ -1855,7 +1855,7 @@ impl Actor {
18551855
let passthroughs = self.process_relay_read_result(read_result);
18561856
for passthrough in passthroughs {
18571857
self.relay_recv_sender
1858-
.send_async(passthrough)
1858+
.send(passthrough)
18591859
.await
18601860
.expect("missing recv sender");
18611861
let mut wakers = self.msock.network_recv_wakers.lock();

iroh-net/src/magicsock/udp_conn.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,15 +192,15 @@ mod tests {
192192
let (m2, _m2_key) = wrap_socket(m2)?;
193193

194194
let m1_addr = SocketAddr::new(network.local_addr(), m1.local_addr()?.port());
195-
let (m1_send, m1_recv) = flume::bounded(8);
195+
let (m1_send, m1_recv) = async_channel::bounded(8);
196196

197197
let m1_task = tokio::task::spawn(async move {
198198
if let Some(conn) = m1.accept().await {
199199
let conn = conn.await?;
200200
let (mut send_bi, mut recv_bi) = conn.accept_bi().await?;
201201

202202
let val = recv_bi.read_to_end(usize::MAX).await?;
203-
m1_send.send_async(val).await?;
203+
m1_send.send(val).await?;
204204
send_bi.finish().await?;
205205
}
206206

@@ -220,7 +220,7 @@ mod tests {
220220
drop(send_bi);
221221

222222
// make sure the right values arrived
223-
let val = m1_recv.recv_async().await?;
223+
let val = m1_recv.recv().await?;
224224
assert_eq!(val, b"hello");
225225

226226
m1_task.await??;

iroh-net/src/net/netmon/actor.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub(super) struct Actor {
5757
/// OS specific monitor.
5858
#[allow(dead_code)]
5959
route_monitor: RouteMonitor,
60-
mon_receiver: flume::Receiver<NetworkMessage>,
60+
mon_receiver: async_channel::Receiver<NetworkMessage>,
6161
actor_receiver: mpsc::Receiver<ActorMessage>,
6262
actor_sender: mpsc::Sender<ActorMessage>,
6363
/// Callback registry.
@@ -84,7 +84,7 @@ impl Actor {
8484
let wall_time = Instant::now();
8585

8686
// Use flume channels, as tokio::mpsc is not safe to use across ffi boundaries.
87-
let (mon_sender, mon_receiver) = flume::bounded(MON_CHAN_CAPACITY);
87+
let (mon_sender, mon_receiver) = async_channel::bounded(MON_CHAN_CAPACITY);
8888
let route_monitor = RouteMonitor::new(mon_sender)?;
8989
let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY);
9090

@@ -129,7 +129,7 @@ impl Actor {
129129
debounce_interval.reset_immediately();
130130
}
131131
}
132-
Ok(_event) = self.mon_receiver.recv_async() => {
132+
Ok(_event) = self.mon_receiver.recv() => {
133133
trace!("network activity detected");
134134
last_event.replace(false);
135135
debounce_interval.reset_immediately();

iroh-net/src/net/netmon/android.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use super::actor::NetworkMessage;
66
pub(super) struct RouteMonitor {}
77

88
impl RouteMonitor {
9-
pub(super) fn new(_sender: flume::Sender<NetworkMessage>) -> Result<Self> {
9+
pub(super) fn new(_sender: async_channel::Sender<NetworkMessage>) -> Result<Self> {
1010
// Very sad monitor. Android doesn't allow us to do this
1111

1212
Ok(RouteMonitor {})

iroh-net/src/net/netmon/bsd.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ impl Drop for RouteMonitor {
2323
}
2424

2525
impl RouteMonitor {
26-
pub(super) fn new(sender: flume::Sender<NetworkMessage>) -> Result<Self> {
26+
pub(super) fn new(sender: async_channel::Sender<NetworkMessage>) -> Result<Self> {
2727
let socket = socket2::Socket::new(libc::AF_ROUTE.into(), socket2::Type::RAW, None)?;
2828
socket.set_nonblocking(true)?;
2929
let socket_std: std::os::unix::net::UnixStream = socket.into();
@@ -44,7 +44,7 @@ impl RouteMonitor {
4444
) {
4545
Ok(msgs) => {
4646
if contains_interesting_message(&msgs) {
47-
sender.send_async(NetworkMessage::Change).await.ok();
47+
sender.send(NetworkMessage::Change).await.ok();
4848
}
4949
}
5050
Err(err) => {

iroh-net/src/net/netmon/linux.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ macro_rules! get_nla {
4949
}
5050

5151
impl RouteMonitor {
52-
pub(super) fn new(sender: flume::Sender<NetworkMessage>) -> Result<Self> {
52+
pub(super) fn new(sender: async_channel::Sender<NetworkMessage>) -> Result<Self> {
5353
let (mut conn, mut _handle, mut messages) = new_connection()?;
5454

5555
// Specify flags to listen on.
@@ -87,7 +87,7 @@ impl RouteMonitor {
8787
continue;
8888
} else {
8989
addrs.insert(addr.clone());
90-
sender.send_async(NetworkMessage::Change).await.ok();
90+
sender.send(NetworkMessage::Change).await.ok();
9191
}
9292
}
9393
}
@@ -97,7 +97,7 @@ impl RouteMonitor {
9797
if let Some(addr) = get_nla!(msg, address::Nla::Address) {
9898
addrs.remove(addr);
9999
}
100-
sender.send_async(NetworkMessage::Change).await.ok();
100+
sender.send(NetworkMessage::Change).await.ok();
101101
}
102102
RtnlMessage::NewRoute(msg) | RtnlMessage::DelRoute(msg) => {
103103
trace!("ROUTE:: {:?}", msg);
@@ -124,15 +124,15 @@ impl RouteMonitor {
124124
}
125125
}
126126
}
127-
sender.send_async(NetworkMessage::Change).await.ok();
127+
sender.send(NetworkMessage::Change).await.ok();
128128
}
129129
RtnlMessage::NewRule(msg) => {
130130
trace!("NEWRULE: {:?}", msg);
131-
sender.send_async(NetworkMessage::Change).await.ok();
131+
sender.send(NetworkMessage::Change).await.ok();
132132
}
133133
RtnlMessage::DelRule(msg) => {
134134
trace!("DELRULE: {:?}", msg);
135-
sender.send_async(NetworkMessage::Change).await.ok();
135+
sender.send(NetworkMessage::Change).await.ok();
136136
}
137137
RtnlMessage::NewLink(msg) => {
138138
trace!("NEWLINK: {:?}", msg);

iroh-net/src/net/netmon/windows.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,21 @@ pub(super) struct RouteMonitor {
1919
}
2020

2121
impl RouteMonitor {
22-
pub(super) fn new(sender: flume::Sender<NetworkMessage>) -> Result<Self> {
22+
pub(super) fn new(sender: async_channel::Sender<NetworkMessage>) -> Result<Self> {
2323
// Register two callbacks with the windows api
2424
let mut cb_handler = CallbackHandler::default();
2525

2626
// 1. Unicast Address Changes
2727
let s = sender.clone();
2828
cb_handler.register_unicast_address_change_callback(Box::new(move || {
29-
if let Err(err) = s.send(NetworkMessage::Change) {
29+
if let Err(err) = s.send_blocking(NetworkMessage::Change) {
3030
warn!("unable to send: unicast change notification: {:?}", err);
3131
}
3232
}))?;
3333

3434
// 2. Route Changes
3535
cb_handler.register_route_change_callback(Box::new(move || {
36-
if let Err(err) = sender.send(NetworkMessage::Change) {
36+
if let Err(err) = sender.send_blocking(NetworkMessage::Change) {
3737
warn!("unable to send: route change notification: {:?}", err);
3838
}
3939
}))?;

0 commit comments

Comments
 (0)