Skip to content

Commit b91b684

Browse files
refactor!: avoid using futures crate directly (#2117)
Due to the low maintenance and absurd high amount of `unsafe` code in parts of the `futures` crate, we are trying to avoid usage of it. Usages are replaced with - `futures-lite` : general `Future` and `Stream` tooling - `futures-sink`: `Sink` trait - `futures-buffered`: faster and safer version of `Futures{Un}Ordered` - If must be `futures-util` for sink specific things that are missing ## Breaking Changes - `iroh::node::Node` does not implement `Future` anymore - `iroh::node::Node::shutdown()` is now `async` and can be awaited upon to wait for the node to exit - `iroh_net::util::AbortingJoinHandle`s inner field is not public anymore, use the `From<JoinHandle>` implementation to contruct it ## Followups - [x] Apply this to `bao-tree`: n0-computer/bao-tree#49 - [x] Apply this to `iroh-io`: n0-computer/iroh-io#6 - [x] Apply this to `quic-rpc`: n0-computer/quic-rpc#73 --------- Co-authored-by: Ruediger Klaehn <rklaehn@protonmail.com>
1 parent a26a350 commit b91b684

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

87 files changed

+607
-533
lines changed

Cargo.lock

Lines changed: 164 additions & 121 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-bytes/Cargo.toml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ bytes = { version = "1.4", features = ["serde"] }
2222
chrono = "0.4.31"
2323
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "deref", "deref_mut", "from", "try_into", "into"] }
2424
flume = "0.11"
25-
futures = "0.3.25"
2625
futures-buffered = "0.2.4"
26+
futures-lite = "2.3"
2727
genawaiter = { version = "0.99.1", features = ["futures03"] }
2828
hashlink = { version = "0.9.0", optional = true }
2929
hex = "0.4.3"
@@ -54,6 +54,7 @@ tracing-futures = "0.2.5"
5454
http-body = "0.4.5"
5555
iroh-bytes = { path = ".", features = ["downloader"] }
5656
iroh-test = { path = "../iroh-test" }
57+
futures-buffered = "0.2.4"
5758
proptest = "1.0.0"
5859
serde_json = "1.0.107"
5960
serde_test = "1.0.176"
@@ -62,12 +63,14 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
6263
rcgen = "0.12.0"
6364
rustls = { version = "0.21.11", default-features = false, features = ["quic"] }
6465
tempfile = "3.10.0"
66+
futures-util = "0.3.30"
6567

6668
[features]
6769
default = ["fs-store"]
68-
downloader = ["iroh-net", "parking_lot", "tokio-util/time", "hashlink"]
69-
fs-store = ["reflink-copy", "redb", "redb_v1", "tempfile"]
70-
metrics = ["iroh-metrics"]
70+
downloader = ["dep:iroh-net", "dep:parking_lot", "tokio-util/time", "dep:hashlink"]
71+
fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"]
72+
metrics = ["dep:iroh-metrics"]
73+
redb = ["dep:redb"]
7174

7275
[[example]]
7376
name = "provide-bytes"

iroh-bytes/examples/fetch-stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::io;
1212

1313
use bao_tree::io::fsm::BaoContentItem;
1414
use bytes::Bytes;
15-
use futures::{Stream, StreamExt};
15+
use futures_lite::{Stream, StreamExt};
1616
use genawaiter::sync::Co;
1717
use genawaiter::sync::Gen;
1818
use tokio::io::AsyncWriteExt;

iroh-bytes/examples/provide-bytes.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,10 @@ async fn main() -> Result<()> {
110110
#[derive(Clone)]
111111
struct MockEventSender;
112112

113-
use futures::future::FutureExt;
113+
use futures_lite::future::FutureExt;
114114

115115
impl iroh_bytes::provider::EventSender for MockEventSender {
116-
fn send(&self, _event: iroh_bytes::provider::Event) -> futures::future::BoxFuture<()> {
116+
fn send(&self, _event: iroh_bytes::provider::Event) -> futures_lite::future::Boxed<()> {
117117
async move {}.boxed()
118118
}
119119
}

iroh-bytes/src/downloader.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use std::{
3737
time::Duration,
3838
};
3939

40-
use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
40+
use futures_lite::{future::BoxedLocal, Stream, StreamExt};
4141
use hashlink::LinkedHashSet;
4242
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
4343
use iroh_net::{MagicEndpoint, NodeAddr, NodeId};
@@ -72,9 +72,7 @@ const SERVICE_CHANNEL_CAPACITY: usize = 128;
7272
pub struct IntentId(pub u64);
7373

7474
/// Trait modeling a dialer. This allows for IO-less testing.
75-
pub trait Dialer:
76-
futures::Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Unpin
77-
{
75+
pub trait Dialer: Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Unpin {
7876
/// Type of connections returned by the Dialer.
7977
type Connection: Clone;
8078
/// Dial a node.
@@ -99,7 +97,7 @@ pub enum FailureAction {
9997
}
10098

10199
/// Future of a get request.
102-
type GetFut = LocalBoxFuture<'static, InternalDownloadResult>;
100+
type GetFut = BoxedLocal<InternalDownloadResult>;
103101

104102
/// Trait modelling performing a single request over a connection. This allows for IO-less testing.
105103
pub trait Getter {
@@ -307,7 +305,7 @@ impl std::future::Future for DownloadHandle {
307305
use std::task::Poll::*;
308306
// make it easier on holders of the handle to poll the result, removing the receiver error
309307
// from the middle
310-
match self.receiver.poll_unpin(cx) {
308+
match std::pin::Pin::new(&mut self.receiver).poll(cx) {
311309
Ready(Ok(result)) => Ready(result),
312310
Ready(Err(_recv_err)) => Ready(Err(DownloadError::ActorClosed)),
313311
Pending => Pending,

iroh-bytes/src/downloader/get.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{
44
get::{db::get_to_db, error::GetError},
55
store::Store,
66
};
7-
use futures::FutureExt;
7+
use futures_lite::FutureExt;
88
#[cfg(feature = "metrics")]
99
use iroh_metrics::{inc, inc_by};
1010

iroh-bytes/src/downloader/progress.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ impl ProgressSender for BroadcastProgressSender {
155155
futs
156156
};
157157

158-
let failed_senders = futures::future::join_all(futs).await;
158+
let failed_senders = futures_buffered::join_all(futs).await;
159159
// remove senders where the receiver is dropped
160160
if failed_senders.iter().any(|s| s.is_some()) {
161161
let mut inner = self.shared.lock();

iroh-bytes/src/downloader/test.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
#![cfg(test)]
22
use anyhow::anyhow;
3-
use futures::FutureExt;
43
use std::{
54
sync::atomic::AtomicUsize,
65
time::{Duration, Instant},
76
};
87

8+
use futures_util::future::FutureExt;
99
use iroh_net::key::SecretKey;
1010

1111
use crate::{
@@ -101,7 +101,7 @@ async fn deduplication() {
101101
handles.push(h);
102102
}
103103
assert!(
104-
futures::future::join_all(handles)
104+
futures_buffered::join_all(handles)
105105
.await
106106
.into_iter()
107107
.all(|r| r.is_ok()),
@@ -175,7 +175,7 @@ async fn max_concurrent_requests_total() {
175175
}
176176

177177
assert!(
178-
futures::future::join_all(handles)
178+
futures_buffered::join_all(handles)
179179
.await
180180
.into_iter()
181181
.all(|r| r.is_ok()),
@@ -215,7 +215,7 @@ async fn max_concurrent_requests_per_peer() {
215215
handles.push(h);
216216
}
217217

218-
futures::future::join_all(handles).await;
218+
futures_buffered::join_all(handles).await;
219219
}
220220

221221
/// Tests concurrent progress reporting for multiple intents.
@@ -301,7 +301,7 @@ async fn concurrent_progress() {
301301

302302
done_tx.send(()).unwrap();
303303

304-
let (res_a, res_b, res_c) = futures::future::join3(handle_a, handle_b, handle_c).await;
304+
let (res_a, res_b, res_c) = tokio::join!(handle_a, handle_b, handle_c);
305305
res_a.unwrap();
306306
res_b.unwrap();
307307
res_c.unwrap();
@@ -359,7 +359,7 @@ async fn long_queue() {
359359
handles.push(h);
360360
}
361361

362-
let res = futures::future::join_all(handles).await;
362+
let res = futures_buffered::join_all(handles).await;
363363
for res in res {
364364
res.expect("all downloads to succeed");
365365
}

iroh-bytes/src/downloader/test/dialer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl Dialer for TestingDialer {
6060
}
6161
}
6262

63-
impl futures::Stream for TestingDialer {
63+
impl Stream for TestingDialer {
6464
type Item = (NodeId, anyhow::Result<NodeId>);
6565

6666
fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

iroh-bytes/src/downloader/test/getter.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
//! Implementation of [`super::Getter`] used for testing.
22
3-
use std::{sync::Arc, time::Duration};
4-
5-
use futures::future::BoxFuture;
3+
use futures_lite::{future::Boxed as BoxFuture, FutureExt};
64
use parking_lot::RwLock;
5+
use std::{sync::Arc, time::Duration};
76

87
use super::*;
98

@@ -16,7 +15,7 @@ pub(super) type RequestHandlerFn = Arc<
1615
NodeId,
1716
BroadcastProgressSender,
1817
Duration,
19-
) -> BoxFuture<'static, InternalDownloadResult>
18+
) -> BoxFuture<InternalDownloadResult>
2019
+ Send
2120
+ Sync
2221
+ 'static,

iroh-bytes/src/get/db.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
//! Functions that use the iroh-bytes protocol in conjunction with a bao store.
2-
use bao_tree::ChunkNum;
3-
use futures::{Future, StreamExt};
2+
3+
use std::future::Future;
4+
use std::io;
5+
use std::num::NonZeroU64;
6+
7+
use futures_lite::StreamExt;
48
use iroh_base::hash::Hash;
59
use iroh_base::rpc::RpcError;
610
use serde::{Deserialize, Serialize};
711

12+
use crate::hashseq::parse_hash_seq;
813
use crate::protocol::RangeSpec;
14+
use crate::store::BaoBatchWriter;
915
use crate::store::BaoBlobSize;
1016
use crate::store::FallibleProgressBatchWriter;
11-
use std::io;
12-
use std::num::NonZeroU64;
13-
14-
use crate::hashseq::parse_hash_seq;
15-
use crate::store::BaoBatchWriter;
1617

1718
use crate::{
1819
get::{
@@ -28,7 +29,7 @@ use crate::{
2829
BlobFormat, HashAndFormat,
2930
};
3031
use anyhow::anyhow;
31-
use bao_tree::ChunkRanges;
32+
use bao_tree::{ChunkNum, ChunkRanges};
3233
use iroh_io::AsyncSliceReader;
3334
use tracing::trace;
3435

@@ -294,7 +295,7 @@ pub async fn blob_info<D: BaoStore>(db: &D, hash: &Hash) -> io::Result<BlobInfo<
294295

295296
/// Like `get_blob_info`, but for multiple hashes
296297
async fn blob_infos<D: BaoStore>(db: &D, hash_seq: &[Hash]) -> io::Result<Vec<BlobInfo<D>>> {
297-
let items = futures::stream::iter(hash_seq)
298+
let items = futures_lite::stream::iter(hash_seq)
298299
.then(|hash| blob_info(db, hash))
299300
.collect::<Vec<_>>();
300301
items.await.into_iter().collect()

iroh-bytes/src/provider.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::time::Duration;
55
use anyhow::{Context, Result};
66
use bao_tree::io::fsm::{encode_ranges_validated, Outboard};
77
use bao_tree::io::EncodeError;
8-
use futures::future::BoxFuture;
8+
use futures_lite::future::Boxed as BoxFuture;
99
use iroh_base::rpc::RpcError;
1010
use iroh_io::stats::{
1111
SliceReaderStats, StreamWriterStats, TrackingSliceReader, TrackingStreamWriter,

iroh-bytes/src/store/bao_file.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -720,7 +720,7 @@ impl BaoBatchWriter for BaoFileWriter {
720720

721721
#[cfg(test)]
722722
pub mod test_support {
723-
use std::{io::Cursor, ops::Range};
723+
use std::{future::Future, io::Cursor, ops::Range};
724724

725725
use bao_tree::{
726726
io::{
@@ -731,7 +731,8 @@ pub mod test_support {
731731
},
732732
BlockSize, ChunkRanges,
733733
};
734-
use futures::{Future, Stream, StreamExt};
734+
use futures_lite::{Stream, StreamExt};
735+
use iroh_base::hash::Hash;
735736
use iroh_io::AsyncStreamReader;
736737
use rand::RngCore;
737738
use range_collections::RangeSet2;
@@ -853,7 +854,7 @@ pub mod test_support {
853854
.chunks(mtu)
854855
.map(Bytes::copy_from_slice)
855856
.collect::<Vec<_>>();
856-
futures::stream::iter(parts).then(move |part| async move {
857+
futures_lite::stream::iter(parts).then(move |part| async move {
857858
tokio::time::sleep(delay).await;
858859
part
859860
})
@@ -872,7 +873,7 @@ mod tests {
872873
use std::io::Write;
873874

874875
use bao_tree::{blake3, ChunkNum, ChunkRanges};
875-
use futures::StreamExt;
876+
use futures_lite::StreamExt;
876877
use iroh_io::TokioStreamReader;
877878
use tests::test_support::{
878879
decode_response_into_batch, local, make_wire_data, random_test_data, trickle, validate,

iroh-bytes/src/store/fs.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,14 @@ use bao_tree::io::{
7878
sync::{ReadAt, Size},
7979
};
8080
use bytes::Bytes;
81-
use futures::{channel::oneshot, Stream, StreamExt};
81+
use futures_lite::{Stream, StreamExt};
8282

8383
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
8484
use iroh_io::AsyncSliceReader;
8585
use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
8686
use serde::{Deserialize, Serialize};
8787
use smallvec::SmallVec;
88-
use tokio::io::AsyncWriteExt;
88+
use tokio::{io::AsyncWriteExt, sync::oneshot};
8989
use tracing::trace_span;
9090

9191
mod import_flat_store;
@@ -1250,7 +1250,7 @@ pub(crate) enum OuterError {
12501250
#[error("progress send error: {0}")]
12511251
ProgressSend(#[from] ProgressSendError),
12521252
#[error("recv error: {0}")]
1253-
Recv(#[from] oneshot::Canceled),
1253+
Recv(#[from] oneshot::error::RecvError),
12541254
#[error("recv error: {0}")]
12551255
FlumeRecv(#[from] flume::RecvError),
12561256
#[error("join error: {0}")]

iroh-bytes/src/store/fs/test_support.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
path::{Path, PathBuf},
88
};
99

10-
use futures::channel::oneshot;
10+
use tokio::sync::oneshot;
1111

1212
use super::{
1313
tables::{ReadableTables, Tables},

iroh-bytes/src/store/fs/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub fn to_stream(
3636
.chunks(mtu)
3737
.map(Bytes::copy_from_slice)
3838
.collect::<Vec<_>>();
39-
futures::stream::iter(parts)
39+
futures_lite::stream::iter(parts)
4040
.then(move |part| async move {
4141
tokio::time::sleep(delay).await;
4242
io::Result::Ok(part)

iroh-bytes/src/store/mem.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use bao_tree::{
66
BaoTree,
77
};
88
use bytes::{Bytes, BytesMut};
9-
use futures::{Stream, StreamExt};
9+
use futures_lite::{Stream, StreamExt};
1010
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
1111
use iroh_io::AsyncSliceReader;
1212
use std::{

iroh-bytes/src/store/readonly_mem.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use bao_tree::{
2424
io::{outboard::PreOrderMemOutboard, sync::Outboard},
2525
};
2626
use bytes::Bytes;
27-
use futures::Stream;
27+
use futures_lite::Stream;
2828
use iroh_io::AsyncSliceReader;
2929
use tokio::io::AsyncWriteExt;
3030

iroh-bytes/src/store/traits.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
//! Traits for in-memory or persistent maps of blob with bao encoded outboards.
2-
use std::{collections::BTreeSet, io, path::PathBuf};
2+
use std::{collections::BTreeSet, future::Future, io, path::PathBuf};
33

44
use bao_tree::{
55
io::fsm::{BaoContentItem, Outboard},
66
BaoTree, ChunkRanges,
77
};
88
use bytes::Bytes;
9-
use futures::{Future, Stream, StreamExt};
9+
use futures_lite::{Stream, StreamExt};
1010
use genawaiter::rc::{Co, Gen};
1111
use iroh_base::rpc::RpcError;
1212
use iroh_io::AsyncSliceReader;
@@ -433,7 +433,7 @@ async fn validate_impl(
433433
total: complete.len() as u64,
434434
})
435435
.await?;
436-
let complete_result = futures::stream::iter(complete)
436+
let complete_result = futures_lite::stream::iter(complete)
437437
.map(|hash| {
438438
let store = store.clone();
439439
let tx = tx.clone();
@@ -482,7 +482,7 @@ async fn validate_impl(
482482
.buffered_unordered(validate_parallelism)
483483
.collect::<Vec<_>>()
484484
.await;
485-
let partial_result = futures::stream::iter(partial)
485+
let partial_result = futures_lite::stream::iter(partial)
486486
.map(|hash| {
487487
let store = store.clone();
488488
let tx = tx.clone();

0 commit comments

Comments
 (0)