Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: avoid using futures crate directly #2117

Merged
merged 35 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
4af11f1
refactor(iroh-net): stop using the main futures crate
dignifiedquire Mar 22, 2024
caef7ba
refactor(iroh-bytes): avoid using futures crate
dignifiedquire Mar 22, 2024
da3ba14
refactor(iroh-sync): avoid using futures crate directly
dignifiedquire Mar 22, 2024
ca287a9
refactor(iroh-gossip): avoid futures crate
dignifiedquire Mar 22, 2024
c7b9ffb
fixup: iroh-bytes downloader
dignifiedquire Mar 22, 2024
010bce4
refactor(iroh): avoid futures crate
dignifiedquire Mar 22, 2024
e934047
refactor(iroh-cli): avoid futures crate
dignifiedquire Mar 22, 2024
ab582b1
fixup
dignifiedquire Mar 25, 2024
887eb6c
update shutdown logic
dignifiedquire Mar 25, 2024
871f180
fixup
dignifiedquire Mar 25, 2024
6ee446b
cleanup
dignifiedquire Mar 25, 2024
8de4283
more fixes
dignifiedquire Mar 25, 2024
3f422f1
more fixes
dignifiedquire Mar 25, 2024
00492f6
fixup
dignifiedquire Mar 25, 2024
a604583
Merge remote-tracking branch 'origin/main' into refactor-futures
dignifiedquire Apr 5, 2024
9c838e0
first pass of merge updates
dignifiedquire Apr 5, 2024
82559e2
fix shutdown
dignifiedquire Apr 5, 2024
a26a253
Merge remote-tracking branch 'origin/main' into refactor-futures
dignifiedquire Apr 22, 2024
dbe2299
merge fixup
dignifiedquire Apr 22, 2024
a3e894b
fixup: iroh-dns-server
dignifiedquire Apr 22, 2024
1713a16
Merge remote-tracking branch 'origin/main' into refactor-futures
dignifiedquire Apr 22, 2024
b7a7da0
Merge remote-tracking branch 'origin/main' into refactor-futures
dignifiedquire Apr 24, 2024
dda6dde
try updating to new quic rpc
dignifiedquire Apr 24, 2024
e9aa5d9
Fix compile errors
rklaehn Apr 25, 2024
416fc57
fix linux
dignifiedquire Apr 26, 2024
2a94478
Merge remote-tracking branch 'origin/main' into refactor-futures
dignifiedquire Apr 26, 2024
ceaa997
update deps:
dignifiedquire Apr 26, 2024
1c9dc7c
Merge remote-tracking branch 'origin/main' into refactor-futures
dignifiedquire Apr 26, 2024
0c3f2f2
improve shutdown
dignifiedquire Apr 26, 2024
a2cb330
Merge remote-tracking branch 'origin/main' into refactor-futures
dignifiedquire Apr 26, 2024
f12562d
fixup
dignifiedquire Apr 26, 2024
b4994d7
Merge remote-tracking branch 'origin/main' into refactor-futures
dignifiedquire Apr 29, 2024
f6945a7
fixup
dignifiedquire Apr 29, 2024
5dfc961
Merge branch 'main' into refactor-futures
dignifiedquire Apr 29, 2024
61014e8
Merge remote-tracking branch 'origin/main' into refactor-futures
dignifiedquire Apr 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
285 changes: 164 additions & 121 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 7 additions & 4 deletions iroh-bytes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ bytes = { version = "1.4", features = ["serde"] }
chrono = "0.4.31"
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "deref", "deref_mut", "from", "try_into", "into"] }
flume = "0.11"
futures = "0.3.25"
futures-buffered = "0.2.4"
futures-lite = "2.3"
genawaiter = { version = "0.99.1", features = ["futures03"] }
hashlink = { version = "0.9.0", optional = true }
hex = "0.4.3"
Expand Down Expand Up @@ -54,6 +54,7 @@ tracing-futures = "0.2.5"
http-body = "0.4.5"
iroh-bytes = { path = ".", features = ["downloader"] }
iroh-test = { path = "../iroh-test" }
futures-buffered = "0.2.4"
proptest = "1.0.0"
serde_json = "1.0.107"
serde_test = "1.0.176"
Expand All @@ -62,12 +63,14 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
rcgen = "0.12.0"
rustls = { version = "0.21.11", default-features = false, features = ["quic"] }
tempfile = "3.10.0"
futures-util = "0.3.30"

[features]
default = ["fs-store"]
downloader = ["iroh-net", "parking_lot", "tokio-util/time", "hashlink"]
fs-store = ["reflink-copy", "redb", "redb_v1", "tempfile"]
metrics = ["iroh-metrics"]
downloader = ["dep:iroh-net", "dep:parking_lot", "tokio-util/time", "dep:hashlink"]
fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"]
metrics = ["dep:iroh-metrics"]
redb = ["dep:redb"]

[[example]]
name = "provide-bytes"
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/examples/fetch-stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::io;

use bao_tree::io::fsm::BaoContentItem;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use futures_lite::{Stream, StreamExt};
use genawaiter::sync::Co;
use genawaiter::sync::Gen;
use tokio::io::AsyncWriteExt;
Expand Down
4 changes: 2 additions & 2 deletions iroh-bytes/examples/provide-bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ async fn main() -> Result<()> {
#[derive(Clone)]
struct MockEventSender;

use futures::future::FutureExt;
use futures_lite::future::FutureExt;

impl iroh_bytes::provider::EventSender for MockEventSender {
fn send(&self, _event: iroh_bytes::provider::Event) -> futures::future::BoxFuture<()> {
fn send(&self, _event: iroh_bytes::provider::Event) -> futures_lite::future::Boxed<()> {
async move {}.boxed()
}
}
10 changes: 4 additions & 6 deletions iroh-bytes/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use std::{
time::Duration,
};

use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
use futures_lite::{future::BoxedLocal, Stream, StreamExt};
use hashlink::LinkedHashSet;
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_net::{MagicEndpoint, NodeAddr, NodeId};
Expand Down Expand Up @@ -72,9 +72,7 @@ const SERVICE_CHANNEL_CAPACITY: usize = 128;
pub struct IntentId(pub u64);

/// Trait modeling a dialer. This allows for IO-less testing.
pub trait Dialer:
futures::Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Unpin
{
pub trait Dialer: Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Unpin {
/// Type of connections returned by the Dialer.
type Connection: Clone;
/// Dial a node.
Expand All @@ -99,7 +97,7 @@ pub enum FailureAction {
}

/// Future of a get request.
type GetFut = LocalBoxFuture<'static, InternalDownloadResult>;
type GetFut = BoxedLocal<InternalDownloadResult>;

/// Trait modelling performing a single request over a connection. This allows for IO-less testing.
pub trait Getter {
Expand Down Expand Up @@ -307,7 +305,7 @@ impl std::future::Future for DownloadHandle {
use std::task::Poll::*;
// make it easier on holders of the handle to poll the result, removing the receiver error
// from the middle
match self.receiver.poll_unpin(cx) {
match std::pin::Pin::new(&mut self.receiver).poll(cx) {
Ready(Ok(result)) => Ready(result),
Ready(Err(_recv_err)) => Ready(Err(DownloadError::ActorClosed)),
Pending => Pending,
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/downloader/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
get::{db::get_to_db, error::GetError},
store::Store,
};
use futures::FutureExt;
use futures_lite::FutureExt;
#[cfg(feature = "metrics")]
use iroh_metrics::{inc, inc_by};

Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/downloader/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl ProgressSender for BroadcastProgressSender {
futs
};

let failed_senders = futures::future::join_all(futs).await;
let failed_senders = futures_buffered::join_all(futs).await;
// remove senders where the receiver is dropped
if failed_senders.iter().any(|s| s.is_some()) {
let mut inner = self.shared.lock();
Expand Down
12 changes: 6 additions & 6 deletions iroh-bytes/src/downloader/test.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#![cfg(test)]
use anyhow::anyhow;
use futures::FutureExt;
use std::{
sync::atomic::AtomicUsize,
time::{Duration, Instant},
};

use futures_util::future::FutureExt;
use iroh_net::key::SecretKey;

use crate::{
Expand Down Expand Up @@ -101,7 +101,7 @@ async fn deduplication() {
handles.push(h);
}
assert!(
futures::future::join_all(handles)
futures_buffered::join_all(handles)
.await
.into_iter()
.all(|r| r.is_ok()),
Expand Down Expand Up @@ -175,7 +175,7 @@ async fn max_concurrent_requests_total() {
}

assert!(
futures::future::join_all(handles)
futures_buffered::join_all(handles)
.await
.into_iter()
.all(|r| r.is_ok()),
Expand Down Expand Up @@ -215,7 +215,7 @@ async fn max_concurrent_requests_per_peer() {
handles.push(h);
}

futures::future::join_all(handles).await;
futures_buffered::join_all(handles).await;
}

/// Tests concurrent progress reporting for multiple intents.
Expand Down Expand Up @@ -301,7 +301,7 @@ async fn concurrent_progress() {

done_tx.send(()).unwrap();

let (res_a, res_b, res_c) = futures::future::join3(handle_a, handle_b, handle_c).await;
let (res_a, res_b, res_c) = tokio::join!(handle_a, handle_b, handle_c);
res_a.unwrap();
res_b.unwrap();
res_c.unwrap();
Expand Down Expand Up @@ -359,7 +359,7 @@ async fn long_queue() {
handles.push(h);
}

let res = futures::future::join_all(handles).await;
let res = futures_buffered::join_all(handles).await;
for res in res {
res.expect("all downloads to succeed");
}
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/downloader/test/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Dialer for TestingDialer {
}
}

impl futures::Stream for TestingDialer {
impl Stream for TestingDialer {
type Item = (NodeId, anyhow::Result<NodeId>);

fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
7 changes: 3 additions & 4 deletions iroh-bytes/src/downloader/test/getter.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
//! Implementation of [`super::Getter`] used for testing.
use std::{sync::Arc, time::Duration};

use futures::future::BoxFuture;
use futures_lite::{future::Boxed as BoxFuture, FutureExt};
use parking_lot::RwLock;
use std::{sync::Arc, time::Duration};

use super::*;

Expand All @@ -16,7 +15,7 @@ pub(super) type RequestHandlerFn = Arc<
NodeId,
BroadcastProgressSender,
Duration,
) -> BoxFuture<'static, InternalDownloadResult>
) -> BoxFuture<InternalDownloadResult>
+ Send
+ Sync
+ 'static,
Expand Down
19 changes: 10 additions & 9 deletions iroh-bytes/src/get/db.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
//! Functions that use the iroh-bytes protocol in conjunction with a bao store.
use bao_tree::ChunkNum;
use futures::{Future, StreamExt};

use std::future::Future;
use std::io;
use std::num::NonZeroU64;

use futures_lite::StreamExt;
use iroh_base::hash::Hash;
use iroh_base::rpc::RpcError;
use serde::{Deserialize, Serialize};

use crate::hashseq::parse_hash_seq;
use crate::protocol::RangeSpec;
use crate::store::BaoBatchWriter;
use crate::store::BaoBlobSize;
use crate::store::FallibleProgressBatchWriter;
use std::io;
use std::num::NonZeroU64;

use crate::hashseq::parse_hash_seq;
use crate::store::BaoBatchWriter;

use crate::{
get::{
Expand All @@ -28,7 +29,7 @@ use crate::{
BlobFormat, HashAndFormat,
};
use anyhow::anyhow;
use bao_tree::ChunkRanges;
use bao_tree::{ChunkNum, ChunkRanges};
use iroh_io::AsyncSliceReader;
use tracing::trace;

Expand Down Expand Up @@ -294,7 +295,7 @@ pub async fn blob_info<D: BaoStore>(db: &D, hash: &Hash) -> io::Result<BlobInfo<

/// Like `get_blob_info`, but for multiple hashes
async fn blob_infos<D: BaoStore>(db: &D, hash_seq: &[Hash]) -> io::Result<Vec<BlobInfo<D>>> {
let items = futures::stream::iter(hash_seq)
let items = futures_lite::stream::iter(hash_seq)
.then(|hash| blob_info(db, hash))
.collect::<Vec<_>>();
items.await.into_iter().collect()
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::Duration;
use anyhow::{Context, Result};
use bao_tree::io::fsm::{encode_ranges_validated, Outboard};
use bao_tree::io::EncodeError;
use futures::future::BoxFuture;
use futures_lite::future::Boxed as BoxFuture;
use iroh_base::rpc::RpcError;
use iroh_io::stats::{
SliceReaderStats, StreamWriterStats, TrackingSliceReader, TrackingStreamWriter,
Expand Down
9 changes: 5 additions & 4 deletions iroh-bytes/src/store/bao_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ impl BaoBatchWriter for BaoFileWriter {

#[cfg(test)]
pub mod test_support {
use std::{io::Cursor, ops::Range};
use std::{future::Future, io::Cursor, ops::Range};

use bao_tree::{
io::{
Expand All @@ -731,7 +731,8 @@ pub mod test_support {
},
BlockSize, ChunkRanges,
};
use futures::{Future, Stream, StreamExt};
use futures_lite::{Stream, StreamExt};
use iroh_base::hash::Hash;
use iroh_io::AsyncStreamReader;
use rand::RngCore;
use range_collections::RangeSet2;
Expand Down Expand Up @@ -853,7 +854,7 @@ pub mod test_support {
.chunks(mtu)
.map(Bytes::copy_from_slice)
.collect::<Vec<_>>();
futures::stream::iter(parts).then(move |part| async move {
futures_lite::stream::iter(parts).then(move |part| async move {
tokio::time::sleep(delay).await;
part
})
Expand All @@ -872,7 +873,7 @@ mod tests {
use std::io::Write;

use bao_tree::{blake3, ChunkNum, ChunkRanges};
use futures::StreamExt;
use futures_lite::StreamExt;
use iroh_io::TokioStreamReader;
use tests::test_support::{
decode_response_into_batch, local, make_wire_data, random_test_data, trickle, validate,
Expand Down
6 changes: 3 additions & 3 deletions iroh-bytes/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ use bao_tree::io::{
sync::{ReadAt, Size},
};
use bytes::Bytes;
use futures::{channel::oneshot, Stream, StreamExt};
use futures_lite::{Stream, StreamExt};

use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_io::AsyncSliceReader;
use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use tokio::io::AsyncWriteExt;
use tokio::{io::AsyncWriteExt, sync::oneshot};
use tracing::trace_span;

mod import_flat_store;
Expand Down Expand Up @@ -1250,7 +1250,7 @@ pub(crate) enum OuterError {
#[error("progress send error: {0}")]
ProgressSend(#[from] ProgressSendError),
#[error("recv error: {0}")]
Recv(#[from] oneshot::Canceled),
Recv(#[from] oneshot::error::RecvError),
#[error("recv error: {0}")]
FlumeRecv(#[from] flume::RecvError),
#[error("join error: {0}")]
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/store/fs/test_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
path::{Path, PathBuf},
};

use futures::channel::oneshot;
use tokio::sync::oneshot;

use super::{
tables::{ReadableTables, Tables},
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/store/fs/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub fn to_stream(
.chunks(mtu)
.map(Bytes::copy_from_slice)
.collect::<Vec<_>>();
futures::stream::iter(parts)
futures_lite::stream::iter(parts)
.then(move |part| async move {
tokio::time::sleep(delay).await;
io::Result::Ok(part)
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/store/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use bao_tree::{
BaoTree,
};
use bytes::{Bytes, BytesMut};
use futures::{Stream, StreamExt};
use futures_lite::{Stream, StreamExt};
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_io::AsyncSliceReader;
use std::{
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/store/readonly_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use bao_tree::{
io::{outboard::PreOrderMemOutboard, sync::Outboard},
};
use bytes::Bytes;
use futures::Stream;
use futures_lite::Stream;
use iroh_io::AsyncSliceReader;
use tokio::io::AsyncWriteExt;

Expand Down
8 changes: 4 additions & 4 deletions iroh-bytes/src/store/traits.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! Traits for in-memory or persistent maps of blob with bao encoded outboards.
use std::{collections::BTreeSet, io, path::PathBuf};
use std::{collections::BTreeSet, future::Future, io, path::PathBuf};

use bao_tree::{
io::fsm::{BaoContentItem, Outboard},
BaoTree, ChunkRanges,
};
use bytes::Bytes;
use futures::{Future, Stream, StreamExt};
use futures_lite::{Stream, StreamExt};
use genawaiter::rc::{Co, Gen};
use iroh_base::rpc::RpcError;
use iroh_io::AsyncSliceReader;
Expand Down Expand Up @@ -433,7 +433,7 @@ async fn validate_impl(
total: complete.len() as u64,
})
.await?;
let complete_result = futures::stream::iter(complete)
let complete_result = futures_lite::stream::iter(complete)
.map(|hash| {
let store = store.clone();
let tx = tx.clone();
Expand Down Expand Up @@ -482,7 +482,7 @@ async fn validate_impl(
.buffered_unordered(validate_parallelism)
.collect::<Vec<_>>()
.await;
let partial_result = futures::stream::iter(partial)
let partial_result = futures_lite::stream::iter(partial)
.map(|hash| {
let store = store.clone();
let tx = tx.clone();
Expand Down
Loading
Loading