Skip to content

Commit f4d1e71

Browse files
authored
refactor(iroh)!: Remove server channel type parameter (#2461)
## Description This is a followup to the modularize services PR. I did not want to do it in that one since it was big enough already. ## Breaking Changes - A `#[must_use]` annotation was added to AbortingJoinHandle ``` Checked [ 0.095s] 75 checks; 74 passed, 1 failed, 0 unnecessary --- failure struct_must_use_added: struct #[must_use] added --- Description: A struct is now #[must_use]. Downstream crates that did not use its value will get a compiler lint. ref: https://doc.rust-lang.org/reference/attributes/diagnostics.html#the-must_use-attribute impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.32.0/src/lints/struct_must_use_added.ron Failed in: struct AbortingJoinHandle in /home/runner/work/iroh/iroh/iroh-net/src/util.rs:18 Summary semver requires new minor version: 0 major and 1 minor checks failed ``` ## Notes & open questions ## Change checklist - [x] Self-review. - [x] ~~Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant.~~ - [x] ~~Tests if relevant.~~ - [x] All breaking changes documented.
1 parent 74e8a6a commit f4d1e71

File tree

5 files changed

+56
-85
lines changed

5 files changed

+56
-85
lines changed

iroh-net/src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub mod chain;
1414

1515
/// A join handle that owns the task it is running, and aborts it when dropped.
1616
#[derive(Debug, derive_more::Deref)]
17+
#[must_use = "Aborting join handles abort the task when dropped"]
1718
pub struct AbortingJoinHandle<T> {
1819
handle: tokio::task::JoinHandle<T>,
1920
}

iroh/src/client/quic.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::{
1010
use anyhow::{bail, Context};
1111
use quic_rpc::transport::{boxed::Connection as BoxedConnection, quinn::QuinnConnection};
1212

13-
use super::Iroh;
13+
use super::{Iroh, RpcClient};
1414
use crate::{
1515
node::RpcStatus,
1616
rpc_protocol::{node::StatusRequest, RpcService},
@@ -20,9 +20,6 @@ use crate::{
2020
// TODO: Change to "/iroh-rpc/1"
2121
pub(crate) const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1";
2222

23-
/// RPC client to an iroh node running in a separate process.
24-
pub type RpcClient = quic_rpc::RpcClient<RpcService, BoxedConnection<RpcService>>;
25-
2623
impl Iroh {
2724
/// Connect to an iroh node running on the same computer, but in a different process.
2825
pub async fn connect_path(root: impl AsRef<Path>) -> anyhow::Result<Self> {

iroh/src/node.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,14 @@ use iroh_gossip::net::Gossip;
5050
use iroh_net::key::SecretKey;
5151
use iroh_net::Endpoint;
5252
use iroh_net::{endpoint::DirectAddrsStream, util::SharedAbortingJoinHandle};
53-
use quic_rpc::{RpcServer, ServiceEndpoint};
53+
use quic_rpc::transport::ServerEndpoint as _;
54+
use quic_rpc::RpcServer;
5455
use tokio::task::JoinSet;
5556
use tokio_util::sync::CancellationToken;
5657
use tokio_util::task::LocalPoolHandle;
5758
use tracing::{debug, error, info, warn};
5859

59-
use crate::{
60-
client::RpcService,
61-
node::{docs::DocsEngine, protocol::ProtocolMap},
62-
};
60+
use crate::node::{docs::DocsEngine, protocol::ProtocolMap};
6361

6462
mod builder;
6563
mod docs;
@@ -73,6 +71,14 @@ pub use self::builder::{
7371
pub use self::rpc_status::RpcStatus;
7472
pub use protocol::ProtocolHandler;
7573

74+
/// The quic-rpc server endpoint for the iroh node.
75+
///
76+
/// We use a boxed endpoint here to allow having a concrete type for the server endpoint.
77+
pub type IrohServerEndpoint = quic_rpc::transport::boxed::ServerEndpoint<
78+
crate::rpc_protocol::Request,
79+
crate::rpc_protocol::Response,
80+
>;
81+
7682
/// A server which implements the iroh node.
7783
///
7884
/// Clients can connect to this server and requests hashes from it.
@@ -245,8 +251,8 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
245251

246252
async fn run(
247253
self: Arc<Self>,
248-
external_rpc: impl ServiceEndpoint<RpcService>,
249-
internal_rpc: impl ServiceEndpoint<RpcService>,
254+
external_rpc: IrohServerEndpoint,
255+
internal_rpc: IrohServerEndpoint,
250256
protocols: Arc<ProtocolMap>,
251257
gc_policy: GcPolicy,
252258
gc_done_callback: Option<Box<dyn Fn() + Send>>,

iroh/src/node/builder.rs

Lines changed: 15 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ use iroh_net::{
2424
relay::RelayMode,
2525
Endpoint,
2626
};
27-
use quic_rpc::transport::{
28-
boxed::BoxableServerEndpoint, flume::FlumeServerEndpoint, quinn::QuinnServerEndpoint,
29-
};
27+
use quic_rpc::transport::{boxed::BoxableServerEndpoint, quinn::QuinnServerEndpoint};
3028
use serde::{Deserialize, Serialize};
3129
use tokio_util::{sync::CancellationToken, task::LocalPoolHandle};
3230
use tracing::{debug, error_span, trace, Instrument};
@@ -41,7 +39,7 @@ use crate::{
4139
util::{fs::load_secret_key, path::IrohPaths},
4240
};
4341

44-
use super::{docs::DocsEngine, rpc_status::RpcStatus, Node, NodeInner};
42+
use super::{docs::DocsEngine, rpc_status::RpcStatus, IrohServerEndpoint, Node, NodeInner};
4543

4644
/// Default bind address for the node.
4745
/// 11204 is "iroh" in leetspeak <https://simple.wikipedia.org/wiki/Leet>
@@ -56,11 +54,6 @@ const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5);
5654
const MAX_CONNECTIONS: u32 = 1024;
5755
const MAX_STREAMS: u64 = 10;
5856

59-
type BoxedServerEndpoint = quic_rpc::transport::boxed::ServerEndpoint<
60-
crate::rpc_protocol::Request,
61-
crate::rpc_protocol::Response,
62-
>;
63-
6457
/// Storage backend for documents.
6558
#[derive(Debug, Clone)]
6659
pub enum DocsStorage {
@@ -93,7 +86,7 @@ where
9386
storage: StorageConfig,
9487
bind_port: Option<u16>,
9588
secret_key: SecretKey,
96-
rpc_endpoint: BoxedServerEndpoint,
89+
rpc_endpoint: IrohServerEndpoint,
9790
rpc_port: Option<u16>,
9891
blobs_store: D,
9992
keylog: bool,
@@ -180,7 +173,7 @@ impl BoxableServerEndpoint<crate::rpc_protocol::Request, crate::rpc_protocol::Re
180173
}
181174
}
182175

183-
fn mk_external_rpc() -> BoxedServerEndpoint {
176+
fn mk_external_rpc() -> IrohServerEndpoint {
184177
quic_rpc::transport::boxed::ServerEndpoint::new(DummyServerEndpoint)
185178
}
186179

@@ -308,53 +301,28 @@ where
308301
})
309302
}
310303

311-
/// Configure rpc endpoint, changing the type of the builder to the new endpoint type.
312-
pub fn rpc_endpoint(self, value: BoxedServerEndpoint, port: Option<u16>) -> Builder<D> {
313-
// we can't use ..self here because the return type is different
314-
Builder {
315-
storage: self.storage,
316-
bind_port: self.bind_port,
317-
secret_key: self.secret_key,
318-
blobs_store: self.blobs_store,
319-
keylog: self.keylog,
304+
/// Configure rpc endpoint.
305+
pub fn rpc_endpoint(self, value: IrohServerEndpoint, port: Option<u16>) -> Self {
306+
Self {
320307
rpc_endpoint: value,
321308
rpc_port: port,
322-
relay_mode: self.relay_mode,
323-
dns_resolver: self.dns_resolver,
324-
gc_policy: self.gc_policy,
325-
docs_storage: self.docs_storage,
326-
node_discovery: self.node_discovery,
327-
#[cfg(any(test, feature = "test-utils"))]
328-
insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
329-
gc_done_callback: self.gc_done_callback,
309+
..self
330310
}
331311
}
332312

333313
/// Configure the default iroh rpc endpoint.
334-
pub async fn enable_rpc(self) -> Result<Builder<D>> {
314+
pub async fn enable_rpc(self) -> Result<Self> {
335315
let (ep, actual_rpc_port) = make_rpc_endpoint(&self.secret_key, DEFAULT_RPC_PORT)?;
336316
let ep = quic_rpc::transport::boxed::ServerEndpoint::new(ep);
337317
if let StorageConfig::Persistent(ref root) = self.storage {
338318
// store rpc endpoint
339319
RpcStatus::store(root, actual_rpc_port).await?;
340320
}
341321

342-
Ok(Builder {
343-
storage: self.storage,
344-
bind_port: self.bind_port,
345-
secret_key: self.secret_key,
346-
blobs_store: self.blobs_store,
347-
keylog: self.keylog,
322+
Ok(Self {
348323
rpc_endpoint: ep,
349324
rpc_port: Some(actual_rpc_port),
350-
relay_mode: self.relay_mode,
351-
dns_resolver: self.dns_resolver,
352-
gc_policy: self.gc_policy,
353-
docs_storage: self.docs_storage,
354-
node_discovery: self.node_discovery,
355-
#[cfg(any(test, feature = "test-utils"))]
356-
insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
357-
gc_done_callback: self.gc_done_callback,
325+
..self
358326
})
359327
}
360328

@@ -551,7 +519,8 @@ where
551519
let gossip_dispatcher = GossipDispatcher::new(gossip.clone());
552520

553521
// Initialize the internal RPC connection.
554-
let (internal_rpc, controller) = quic_rpc::transport::flume::connection(32);
522+
let (internal_rpc, controller) = quic_rpc::transport::flume::connection::<RpcService>(32);
523+
let internal_rpc = quic_rpc::transport::boxed::ServerEndpoint::new(internal_rpc);
555524
// box the controller. Boxing has a special case for the flume channel that avoids allocations,
556525
// so this has zero overhead.
557526
let controller = quic_rpc::transport::boxed::Connection::new(controller);
@@ -597,9 +566,8 @@ where
597566
#[derive(derive_more::Debug)]
598567
pub struct ProtocolBuilder<D> {
599568
inner: Arc<NodeInner<D>>,
600-
internal_rpc: FlumeServerEndpoint<RpcService>,
601-
#[debug("external rpc")]
602-
external_rpc: BoxedServerEndpoint,
569+
internal_rpc: IrohServerEndpoint,
570+
external_rpc: IrohServerEndpoint,
603571
protocols: ProtocolMap,
604572
#[debug("callback")]
605573
gc_done_callback: Option<Box<dyn Fn() + Send>>,

iroh/src/node/rpc.rs

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,7 @@ use iroh_blobs::{
2626
use iroh_io::AsyncSliceReader;
2727
use iroh_net::relay::RelayUrl;
2828
use iroh_net::{Endpoint, NodeAddr, NodeId};
29-
use quic_rpc::{
30-
server::{RpcChannel, RpcServerError},
31-
ServiceEndpoint,
32-
};
29+
use quic_rpc::server::{RpcChannel, RpcServerError};
3330
use tokio::task::JoinSet;
3431
use tokio_util::{either::Either, task::LocalPoolHandle};
3532
use tracing::{debug, info, warn};
@@ -64,6 +61,8 @@ use crate::rpc_protocol::{
6461
Request, RpcService,
6562
};
6663

64+
use super::IrohServerEndpoint;
65+
6766
mod docs;
6867

6968
const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1);
@@ -118,10 +117,10 @@ impl<D: BaoStore> Handler<D> {
118117
}
119118
}
120119

121-
pub(crate) fn spawn_rpc_request<E: ServiceEndpoint<RpcService>>(
120+
pub(crate) fn spawn_rpc_request(
122121
inner: Arc<NodeInner<D>>,
123122
join_set: &mut JoinSet<anyhow::Result<()>>,
124-
accepting: quic_rpc::server::Accepting<RpcService, E>,
123+
accepting: quic_rpc::server::Accepting<RpcService, IrohServerEndpoint>,
125124
) {
126125
let handler = Self::new(inner);
127126
join_set.spawn(async move {
@@ -133,11 +132,11 @@ impl<D: BaoStore> Handler<D> {
133132
});
134133
}
135134

136-
async fn handle_node_request<E: ServiceEndpoint<RpcService>>(
135+
async fn handle_node_request(
137136
self,
138137
msg: node::Request,
139-
chan: RpcChannel<RpcService, E>,
140-
) -> Result<(), RpcServerError<E>> {
138+
chan: RpcChannel<RpcService, IrohServerEndpoint>,
139+
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
141140
use node::Request::*;
142141
debug!("handling node request: {msg}");
143142
match msg {
@@ -157,11 +156,11 @@ impl<D: BaoStore> Handler<D> {
157156
}
158157
}
159158

160-
async fn handle_blobs_request<E: ServiceEndpoint<RpcService>>(
159+
async fn handle_blobs_request(
161160
self,
162161
msg: blobs::Request,
163-
chan: RpcChannel<RpcService, E>,
164-
) -> Result<(), RpcServerError<E>> {
162+
chan: RpcChannel<RpcService, IrohServerEndpoint>,
163+
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
165164
use blobs::Request::*;
166165
debug!("handling blob request: {msg}");
167166
match msg {
@@ -189,23 +188,23 @@ impl<D: BaoStore> Handler<D> {
189188
}
190189
}
191190

192-
async fn handle_tags_request<E: ServiceEndpoint<RpcService>>(
191+
async fn handle_tags_request(
193192
self,
194193
msg: tags::Request,
195-
chan: RpcChannel<RpcService, E>,
196-
) -> Result<(), RpcServerError<E>> {
194+
chan: RpcChannel<RpcService, IrohServerEndpoint>,
195+
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
197196
use tags::Request::*;
198197
match msg {
199198
ListTags(msg) => chan.server_streaming(msg, self, Self::blob_list_tags).await,
200199
DeleteTag(msg) => chan.rpc(msg, self, Self::blob_delete_tag).await,
201200
}
202201
}
203202

204-
async fn handle_gossip_request<E: ServiceEndpoint<RpcService>>(
203+
async fn handle_gossip_request(
205204
self,
206205
msg: gossip::Request,
207-
chan: RpcChannel<RpcService, E>,
208-
) -> Result<(), RpcServerError<E>> {
206+
chan: RpcChannel<RpcService, IrohServerEndpoint>,
207+
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
209208
use gossip::Request::*;
210209
match msg {
211210
Subscribe(msg) => {
@@ -225,11 +224,11 @@ impl<D: BaoStore> Handler<D> {
225224
}
226225
}
227226

228-
async fn handle_authors_request<E: ServiceEndpoint<RpcService>>(
227+
async fn handle_authors_request(
229228
self,
230229
msg: authors::Request,
231-
chan: RpcChannel<RpcService, E>,
232-
) -> Result<(), RpcServerError<E>> {
230+
chan: RpcChannel<RpcService, IrohServerEndpoint>,
231+
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
233232
use authors::Request::*;
234233
match msg {
235234
List(msg) => {
@@ -277,11 +276,11 @@ impl<D: BaoStore> Handler<D> {
277276
}
278277
}
279278

280-
async fn handle_docs_request<E: ServiceEndpoint<RpcService>>(
279+
async fn handle_docs_request(
281280
self,
282281
msg: DocsRequest,
283-
chan: RpcChannel<RpcService, E>,
284-
) -> Result<(), RpcServerError<E>> {
282+
chan: RpcChannel<RpcService, IrohServerEndpoint>,
283+
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
285284
use DocsRequest::*;
286285
match msg {
287286
Open(msg) => {
@@ -412,11 +411,11 @@ impl<D: BaoStore> Handler<D> {
412411
}
413412
}
414413

415-
pub(crate) async fn handle_rpc_request<E: ServiceEndpoint<RpcService>>(
414+
pub(crate) async fn handle_rpc_request(
416415
self,
417416
msg: Request,
418-
chan: RpcChannel<RpcService, E>,
419-
) -> Result<(), RpcServerError<E>> {
417+
chan: RpcChannel<RpcService, IrohServerEndpoint>,
418+
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
420419
use Request::*;
421420
debug!("handling rpc request: {msg}");
422421
match msg {

0 commit comments

Comments
 (0)