Skip to content

Commit dced544

Browse files
refactor(iroh): avoid futures crate
1 parent 199934e commit dced544

File tree

20 files changed

+122
-109
lines changed

20 files changed

+122
-109
lines changed

Cargo.lock

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-bytes/src/downloader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use std::{
3737

3838
use crate::{get::Stats, protocol::RangeSpecSeq, store::Store, Hash, HashAndFormat};
3939
use bao_tree::ChunkRanges;
40-
use futures_lite::{future::BoxedLocal, StreamExt, Stream};
40+
use futures_lite::{future::BoxedLocal, Stream, StreamExt};
4141
use iroh_net::{MagicEndpoint, NodeId};
4242
use tokio::{
4343
sync::{mpsc, oneshot},

iroh-bytes/src/downloader/test/getter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ impl Getter for TestingGetter {
3131
tokio::time::sleep(request_duration).await;
3232
Ok(Stats::default())
3333
}
34-
.boxed_local()
34+
.boxed_local()
3535
}
3636
}
3737

iroh/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ bytes = "1"
2222
data-encoding = "2.4.0"
2323
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into", "from_str"] }
2424
flume = "0.11"
25-
futures = "0.3.25"
25+
futures-buffered = "0.2.4"
26+
futures-lite = "2.3"
27+
futures-util = "0.3"
2628
genawaiter = { version = "0.99", default-features = false, features = ["futures03"] }
2729
hashlink = "0.8.4"
2830
hex = { version = "0.4.3" }

iroh/src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//!
33
//! TODO: Contains only iroh sync related methods. Add other methods.
44
5-
use futures::{Stream, StreamExt};
5+
use futures_lite::{Stream, StreamExt};
66
use quic_rpc::{RpcClient, ServiceConnection};
77

88
use crate::rpc_protocol::ProviderService;

iroh/src/client/authors.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use anyhow::Result;
2-
use futures::{Stream, TryStreamExt};
2+
use futures_lite::{stream::StreamExt, Stream};
33
use iroh_sync::AuthorId;
44
use quic_rpc::{RpcClient, ServiceConnection};
55

@@ -26,6 +26,6 @@ where
2626
/// List document authors for which we have a secret key.
2727
pub async fn list(&self) -> Result<impl Stream<Item = Result<AuthorId>>> {
2828
let stream = self.rpc.server_streaming(AuthorListRequest {}).await?;
29-
Ok(flatten(stream).map_ok(|res| res.author_id))
29+
Ok(flatten(stream).map(|res| res.map(|res| res.author_id)))
3030
}
3131
}

iroh/src/client/blobs.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
future::Future,
23
io,
34
path::PathBuf,
45
pin::Pin,
@@ -8,7 +9,8 @@ use std::{
89

910
use anyhow::{anyhow, Result};
1011
use bytes::Bytes;
11-
use futures::{Future, SinkExt, Stream, StreamExt, TryStreamExt};
12+
use futures_lite::{Stream, StreamExt};
13+
use futures_util::SinkExt;
1214
use iroh_base::ticket::BlobTicket;
1315
use iroh_bytes::{
1416
format::collection::Collection, get::db::DownloadProgress, provider::AddProgress,
@@ -176,7 +178,7 @@ where
176178

177179
/// Write a blob by passing bytes.
178180
pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> anyhow::Result<BlobAddOutcome> {
179-
let input = futures::stream::once(futures::future::ready(Ok(bytes.into())));
181+
let input = futures_lite::stream::once(Ok(bytes.into()));
180182
self.add_stream(input, SetTagOption::Auto).await?.await
181183
}
182184

@@ -186,7 +188,7 @@ where
186188
bytes: impl Into<Bytes>,
187189
name: impl Into<Tag>,
188190
) -> anyhow::Result<BlobAddOutcome> {
189-
let input = futures::stream::once(futures::future::ready(Ok(bytes.into())));
191+
let input = futures_lite::stream::once(Ok(bytes.into()));
190192
self.add_stream(input, SetTagOption::Named(name.into()))
191193
.await?
192194
.await
@@ -203,14 +205,14 @@ where
203205
.rpc
204206
.server_streaming(BlobValidateRequest { repair })
205207
.await?;
206-
Ok(stream.map_err(anyhow::Error::from))
208+
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
207209
}
208210

209211
/// Download a blob from another node and add it to the local database.
210212
pub async fn download(&self, req: BlobDownloadRequest) -> Result<BlobDownloadProgress> {
211213
let stream = self.rpc.server_streaming(req).await?;
212214
Ok(BlobDownloadProgress::new(
213-
stream.map_err(anyhow::Error::from),
215+
stream.map(|res| res.map_err(anyhow::Error::from)),
214216
))
215217
}
216218

@@ -384,7 +386,7 @@ impl BlobAddProgress {
384386
impl Stream for BlobAddProgress {
385387
type Item = Result<AddProgress>;
386388
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
387-
self.stream.poll_next_unpin(cx)
389+
Pin::new(&mut self.stream).poll_next(cx)
388390
}
389391
}
390392

@@ -393,7 +395,7 @@ impl Future for BlobAddProgress {
393395

394396
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
395397
loop {
396-
match self.stream.poll_next_unpin(cx) {
398+
match Pin::new(&mut self.stream).poll_next(cx) {
397399
Poll::Pending => return Poll::Pending,
398400
Poll::Ready(None) => {
399401
return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
@@ -487,7 +489,7 @@ impl BlobDownloadProgress {
487489
impl Stream for BlobDownloadProgress {
488490
type Item = Result<DownloadProgress>;
489491
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
490-
self.stream.poll_next_unpin(cx)
492+
Pin::new(&mut self.stream).poll_next(cx)
491493
}
492494
}
493495

@@ -496,7 +498,7 @@ impl Future for BlobDownloadProgress {
496498

497499
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
498500
loop {
499-
match self.stream.poll_next_unpin(cx) {
501+
match Pin::new(&mut self.stream).poll_next(cx) {
500502
Poll::Pending => return Poll::Pending,
501503
Poll::Ready(None) => {
502504
return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))

iroh/src/client/docs.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77

88
use anyhow::{anyhow, Context as _, Result};
99
use bytes::Bytes;
10-
use futures::{Stream, StreamExt, TryStreamExt};
10+
use futures_lite::{Stream, StreamExt};
1111
use iroh_base::key::PublicKey;
1212
use iroh_bytes::{export::ExportProgress, store::ExportMode, Hash};
1313
use iroh_net::NodeAddr;
@@ -72,7 +72,7 @@ where
7272
/// List all documents.
7373
pub async fn list(&self) -> Result<impl Stream<Item = Result<(NamespaceId, CapabilityKind)>>> {
7474
let stream = self.rpc.server_streaming(DocListRequest {}).await?;
75-
Ok(flatten(stream).map_ok(|res| (res.id, res.capability)))
75+
Ok(flatten(stream).map(|res| res.map(|res| (res.id, res.capability))))
7676
}
7777

7878
/// Get a [`Doc`] client for a single document. Return None if the document cannot be found.
@@ -293,7 +293,7 @@ where
293293
query: query.into(),
294294
})
295295
.await?;
296-
Ok(flatten(stream).map_ok(|res| res.entry.into()))
296+
Ok(flatten(stream).map(|res| res.map(|res| res.entry.into())))
297297
}
298298

299299
/// Get a single entry.
@@ -340,9 +340,10 @@ where
340340
.rpc
341341
.server_streaming(DocSubscribeRequest { doc_id: self.id() })
342342
.await?;
343-
Ok(flatten(stream)
344-
.map_ok(|res| res.event.into())
345-
.map_err(Into::into))
343+
Ok(flatten(stream).map(|res| match res {
344+
Ok(res) => Ok(res.event.into()),
345+
Err(err) => Err(err.into()),
346+
}))
346347
}
347348

348349
/// Get status info for this document
@@ -588,7 +589,7 @@ pub struct DocImportFileOutcome {
588589
impl Stream for DocImportFileProgress {
589590
type Item = Result<DocImportProgress>;
590591
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
591-
self.stream.poll_next_unpin(cx)
592+
Pin::new(&mut self.stream).poll_next(cx)
592593
}
593594
}
594595

@@ -653,8 +654,9 @@ pub struct DocExportFileOutcome {
653654

654655
impl Stream for DocExportFileProgress {
655656
type Item = Result<ExportProgress>;
657+
656658
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
657-
self.stream.poll_next_unpin(cx)
659+
Pin::new(&mut self.stream).poll_next(cx)
658660
}
659661
}
660662

iroh/src/client/node.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::collections::BTreeMap;
22

33
use anyhow::Result;
4-
use futures::{Stream, TryStreamExt};
4+
use futures_lite::{Stream, StreamExt};
55
use iroh_base::key::PublicKey;
66
use iroh_net::magic_endpoint::ConnectionInfo;
77
use quic_rpc::{RpcClient, ServiceConnection};
@@ -32,7 +32,7 @@ where
3232
/// Get information about the different connections we have made
3333
pub async fn connections(&self) -> Result<impl Stream<Item = Result<ConnectionInfo>>> {
3434
let stream = self.rpc.server_streaming(NodeConnectionsRequest {}).await?;
35-
Ok(flatten(stream).map_ok(|res| res.conn_info))
35+
Ok(flatten(stream).map(|res| res.map(|res| res.conn_info)))
3636
}
3737

3838
/// Get connection information about a node

iroh/src/client/tags.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use anyhow::Result;
2-
use futures::{Stream, TryStreamExt};
2+
use futures_lite::{Stream, StreamExt};
33
use iroh_bytes::Tag;
44
use quic_rpc::{RpcClient, ServiceConnection};
55

@@ -18,7 +18,7 @@ where
1818
/// List all tags.
1919
pub async fn list(&self) -> Result<impl Stream<Item = Result<ListTagsResponse>>> {
2020
let stream = self.rpc.server_streaming(ListTagsRequest).await?;
21-
Ok(stream.map_err(anyhow::Error::from))
21+
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
2222
}
2323

2424
/// Delete a tag.

iroh/src/node.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ use std::sync::Arc;
1414
use std::task::Poll;
1515

1616
use anyhow::{anyhow, Result};
17-
use futures::future::{BoxFuture, Shared};
18-
use futures::{FutureExt, StreamExt};
17+
use futures_lite::{future::Boxed as BoxFuture, FutureExt, StreamExt};
1918
use iroh_bytes::store::Store as BaoStore;
2019
use iroh_bytes::BlobFormat;
2120
use iroh_bytes::Hash;
@@ -46,7 +45,7 @@ mod rpc_status;
4645
pub use builder::{Builder, GcPolicy, StorageConfig};
4746
pub use rpc_status::RpcStatus;
4847

49-
type EventCallback = Box<dyn Fn(Event) -> BoxFuture<'static, ()> + 'static + Sync + Send>;
48+
type EventCallback = Box<dyn Fn(Event) -> BoxFuture<()> + 'static + Sync + Send>;
5049

5150
#[derive(Default, derive_more::Debug, Clone)]
5251
struct Callbacks(#[debug("..")] Arc<RwLock<Vec<EventCallback>>>);
@@ -67,8 +66,9 @@ impl Callbacks {
6766

6867
impl iroh_bytes::provider::EventSender for Callbacks {
6968
fn send(&self, event: iroh_bytes::provider::Event) -> BoxFuture<()> {
69+
let this = self.clone();
7070
async move {
71-
let cbs = self.0.read().await;
71+
let cbs = this.0.read().await;
7272
for cb in &*cbs {
7373
cb(Event::ByteProvide(event.clone())).await;
7474
}
@@ -90,7 +90,7 @@ impl iroh_bytes::provider::EventSender for Callbacks {
9090
#[derive(Debug, Clone)]
9191
pub struct Node<D> {
9292
inner: Arc<NodeInner<D>>,
93-
task: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
93+
task: (), // Arc<BoxFuture<anyhow::Result<()>>>,
9494
client: crate::client::mem::Iroh,
9595
}
9696

@@ -102,7 +102,7 @@ struct NodeInner<D> {
102102
cancel_token: CancellationToken,
103103
controller: FlumeConnection<ProviderResponse, ProviderRequest>,
104104
#[debug("callbacks: Sender<Box<dyn Fn(Event)>>")]
105-
cb_sender: mpsc::Sender<Box<dyn Fn(Event) -> BoxFuture<'static, ()> + Send + Sync + 'static>>,
105+
cb_sender: mpsc::Sender<Box<dyn Fn(Event) -> BoxFuture<()> + Send + Sync + 'static>>,
106106
callbacks: Callbacks,
107107
#[allow(dead_code)]
108108
gc_task: Option<AbortingJoinHandle<()>>,
@@ -190,7 +190,7 @@ impl<D: BaoStore> Node<D> {
190190
/// progress.
191191
///
192192
/// Warning: The callback must complete quickly, as otherwise it will block ongoing work.
193-
pub async fn subscribe<F: Fn(Event) -> BoxFuture<'static, ()> + Send + Sync + 'static>(
193+
pub async fn subscribe<F: Fn(Event) -> BoxFuture<()> + Send + Sync + 'static>(
194194
&self,
195195
cb: F,
196196
) -> Result<()> {
@@ -254,7 +254,8 @@ impl<D> Future for Node<D> {
254254
type Output = Result<(), Arc<JoinError>>;
255255

256256
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
257-
Pin::new(&mut self.task).poll(cx)
257+
// Pin::new(&mut self.task).poll(cx)
258+
todo!()
258259
}
259260
}
260261

iroh/src/node/builder.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
};
88

99
use anyhow::{bail, Context, Result};
10-
use futures::{FutureExt, StreamExt, TryFutureExt};
10+
use futures_lite::{FutureExt, StreamExt};
1111
use iroh_base::key::SecretKey;
1212
use iroh_bytes::{
1313
downloader::Downloader,
@@ -382,9 +382,17 @@ where
382382
)
383383
};
384384

385+
let task = Arc::new(
386+
async move {
387+
task.await?;
388+
anyhow::Ok(())
389+
}
390+
.boxed(),
391+
);
392+
385393
let node = Node {
386394
inner,
387-
task: task.map_err(Arc::new).boxed().shared(),
395+
task: (),
388396
client,
389397
};
390398

0 commit comments

Comments
 (0)