Skip to content

Commit

Permalink
feat: allow custom blob providing event handling
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Aug 2, 2024
1 parent d662bfc commit 309257b
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 18 deletions.
2 changes: 1 addition & 1 deletion iroh-blobs/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ pub async fn transfer_collection<D: Map, E: EventSender>(
}

/// Trait for sending events.
pub trait EventSender: Clone + Sync + Send + 'static {
pub trait EventSender: std::fmt::Debug + Clone + Sync + Send + 'static {
/// Send an event.
fn send(&self, event: Event) -> BoxFuture<()>;
}
Expand Down
84 changes: 83 additions & 1 deletion iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ mod tests {

use anyhow::Context as _;
use rand::RngCore;
use tokio::io::AsyncWriteExt;
use tokio::{io::AsyncWriteExt, sync::mpsc};

#[tokio::test]
async fn test_blob_create_collection() -> Result<()> {
Expand Down Expand Up @@ -1248,4 +1248,86 @@ mod tests {

Ok(())
}

#[derive(Debug, Clone)]
struct BlobEvents {
sender: mpsc::Sender<iroh_blobs::provider::Event>,
}
impl BlobEvents {
fn new(cap: usize) -> (Self, mpsc::Receiver<iroh_blobs::provider::Event>) {
let (s, r) = mpsc::channel(cap);
(Self { sender: s }, r)
}
}

impl iroh_blobs::provider::EventSender for BlobEvents {
fn send(&self, event: iroh_blobs::provider::Event) -> futures_lite::future::Boxed<()> {
let sender = self.sender.clone();
Box::pin(async move {
sender.send(event).await.ok();
})
}
}

#[tokio::test]
async fn test_blob_provide_events() -> Result<()> {
let _guard = iroh_test::logging::setup();

let (node1_events, mut node1_events_r) = BlobEvents::new(16);
let node1 = crate::node::Node::memory()
.set_blobs_events(node1_events)
.spawn()
.await?;

let (node2_events, mut node2_events_r) = BlobEvents::new(16);
let node2 = crate::node::Node::memory()
.set_blobs_events(node2_events)
.spawn()
.await?;

let import_outcome = node1.blobs().add_bytes(&b"hello world"[..]).await?;

// Download in node2
let node1_addr = node1.node_addr().await?;
let res = node2
.blobs()
.download(import_outcome.hash, node1_addr)
.await?
.await?;
dbg!(&res);
assert_eq!(res.local_size, 0);
assert_eq!(res.downloaded_size, 11);

node1.shutdown().await?;
node2.shutdown().await?;

let mut ev1 = Vec::new();
while let Some(ev) = node1_events_r.recv().await {
ev1.push(ev);
}
assert_eq!(ev1.len(), 3);
assert!(matches!(
ev1[0],
iroh_blobs::provider::Event::ClientConnected { .. }
));
assert!(matches!(
ev1[1],
iroh_blobs::provider::Event::GetRequestReceived { .. }
));
assert!(matches!(
ev1[2],
iroh_blobs::provider::Event::TransferCompleted { .. }
));
dbg!(&ev1);

let mut ev2 = Vec::new();
while let Some(ev) = node2_events_r.recv().await {
ev2.push(ev);
}

// Node 2 did not provide anything
assert!(ev2.is_empty());

Ok(())
}
}
2 changes: 1 addition & 1 deletion iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub use self::builder::{
DEFAULT_RPC_ADDR,
};
pub use self::rpc_status::RpcStatus;
pub use protocol::ProtocolHandler;
pub use protocol::{MockEventSender, ProtocolHandler};

/// How often to save node data.
const SAVE_NODES_INTERVAL: Duration = Duration::from_secs(30);
Expand Down
65 changes: 55 additions & 10 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ use crate::{
util::{fs::load_secret_key, path::IrohPaths},
};

use super::{docs::DocsEngine, rpc_status::RpcStatus, IrohServerEndpoint, Node, NodeInner};
use super::{
docs::DocsEngine, rpc_status::RpcStatus, IrohServerEndpoint, MockEventSender, Node, NodeInner,
};

/// Default bind address for the node.
/// 11204 is "iroh" in leetspeak <https://simple.wikipedia.org/wiki/Leet>
Expand Down Expand Up @@ -84,9 +86,10 @@ pub enum DocsStorage {
/// The returned [`Node`] is awaitable to know when it finishes. It can be terminated
/// using [`Node::shutdown`].
#[derive(derive_more::Debug)]
pub struct Builder<D>
pub struct Builder<D, E = MockEventSender>
where
D: Map,
E: iroh_blobs::provider::EventSender,
{
storage: StorageConfig,
bind_port: Option<u16>,
Expand All @@ -105,6 +108,7 @@ where
/// Callback to register when a gc loop is done
#[debug("callback")]
gc_done_callback: Option<Box<dyn Fn() + Send>>,
blob_events: E,
}

/// Configuration for storage.
Expand Down Expand Up @@ -206,6 +210,7 @@ impl Default for Builder<iroh_blobs::store::mem::Store> {
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
gc_done_callback: None,
blob_events: MockEventSender,
}
}
}
Expand Down Expand Up @@ -239,19 +244,52 @@ impl<D: Map> Builder<D> {
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
gc_done_callback: None,
blob_events: MockEventSender,
}
}
}

impl<D, E> Builder<D, E>
where
D: BaoStore,
E: iroh_blobs::provider::EventSender,
{
/// Set the blobs event sender.
pub fn set_blobs_events<F: iroh_blobs::provider::EventSender>(
self,
blob_events: F,
) -> Builder<D, F> {
Builder {
storage: self.storage,
bind_port: self.bind_port,
secret_key: self.secret_key,
blobs_store: self.blobs_store,
keylog: self.keylog,
rpc_endpoint: self.rpc_endpoint,
rpc_addr: self.rpc_addr,
relay_mode: self.relay_mode,
dns_resolver: self.dns_resolver,
gc_policy: self.gc_policy,
docs_storage: self.docs_storage,
node_discovery: self.node_discovery,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
gc_done_callback: self.gc_done_callback,
blob_events,
}
}
}

impl<D> Builder<D>
impl<D, E> Builder<D, E>
where
D: BaoStore,
E: iroh_blobs::provider::EventSender,
{
/// Persist all node data in the provided directory.
pub async fn persist(
self,
root: impl AsRef<Path>,
) -> Result<Builder<iroh_blobs::store::fs::Store>> {
) -> Result<Builder<iroh_blobs::store::fs::Store, E>> {
let root = root.as_ref();
let blob_dir = IrohPaths::BaoStoreDir.with_root(root);

Expand Down Expand Up @@ -303,6 +341,7 @@ where
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
gc_done_callback: self.gc_done_callback,
blob_events: self.blob_events,
})
}

Expand All @@ -316,12 +355,12 @@ where
}

/// Configure the default iroh rpc endpoint, on the default address.
pub async fn enable_rpc(self) -> Result<Builder<D>> {
pub async fn enable_rpc(self) -> Result<Builder<D, E>> {
self.enable_rpc_with_addr(DEFAULT_RPC_ADDR).await
}

/// Configure the default iroh rpc endpoint.
pub async fn enable_rpc_with_addr(self, mut rpc_addr: SocketAddr) -> Result<Builder<D>> {
pub async fn enable_rpc_with_addr(self, mut rpc_addr: SocketAddr) -> Result<Builder<D, E>> {
let (ep, actual_rpc_port) = make_rpc_endpoint(&self.secret_key, rpc_addr)?;
rpc_addr.set_port(actual_rpc_port);

Expand Down Expand Up @@ -592,7 +631,7 @@ where
local_pool: lp,
};

let protocol_builder = protocol_builder.register_iroh_protocols();
let protocol_builder = protocol_builder.register_iroh_protocols(self.blob_events);

Ok(protocol_builder)
}
Expand Down Expand Up @@ -715,10 +754,16 @@ impl<D: iroh_blobs::store::Store> ProtocolBuilder<D> {
}

/// Registers the core iroh protocols (blobs, gossip, docs).
fn register_iroh_protocols(mut self) -> Self {
fn register_iroh_protocols<E: iroh_blobs::provider::EventSender>(
mut self,
blob_events: E,
) -> Self {
// Register blobs.
let blobs_proto =
BlobsProtocol::new(self.blobs_db().clone(), self.local_pool_handle().clone());
let blobs_proto = BlobsProtocol::new_with_events(
self.blobs_db().clone(),
self.local_pool_handle().clone(),
blob_events,
);
self = self.accept(iroh_blobs::protocol::ALPN, Arc::new(blobs_proto));

// Register gossip.
Expand Down
19 changes: 14 additions & 5 deletions iroh/src/node/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,32 @@ impl ProtocolMap {
}

#[derive(Debug)]
pub(crate) struct BlobsProtocol<S> {
pub(crate) struct BlobsProtocol<S, E = MockEventSender> {
rt: LocalPoolHandle,
store: S,
events: E,
}

impl<S: iroh_blobs::store::Store> BlobsProtocol<S> {
pub fn new(store: S, rt: LocalPoolHandle) -> Self {
Self { rt, store }
Self::new_with_events(store, rt, MockEventSender)
}
}
impl<S: iroh_blobs::store::Store, E: iroh_blobs::provider::EventSender> BlobsProtocol<S, E> {
pub fn new_with_events(store: S, rt: LocalPoolHandle, events: E) -> Self {
Self { rt, store, events }
}
}

impl<S: iroh_blobs::store::Store> ProtocolHandler for BlobsProtocol<S> {
impl<S: iroh_blobs::store::Store, E: iroh_blobs::provider::EventSender> ProtocolHandler
for BlobsProtocol<S, E>
{
fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
Box::pin(async move {
iroh_blobs::provider::handle_connection(
conn.await?,
self.store.clone(),
MockEventSender,
self.events.clone(),
self.rt.clone(),
)
.await;
Expand All @@ -104,8 +112,9 @@ impl<S: iroh_blobs::store::Store> ProtocolHandler for BlobsProtocol<S> {
}
}

/// An event sender that consumes all events without doing anything with them.
#[derive(Debug, Clone)]
struct MockEventSender;
pub struct MockEventSender;

impl iroh_blobs::provider::EventSender for MockEventSender {
fn send(&self, _event: iroh_blobs::provider::Event) -> futures_lite::future::Boxed<()> {
Expand Down

0 comments on commit 309257b

Please sign in to comment.