diff --git a/rhio/src/blobs/mod.rs b/rhio/src/blobs/mod.rs index bdab4ae2..0a7c07b1 100644 --- a/rhio/src/blobs/mod.rs +++ b/rhio/src/blobs/mod.rs @@ -1,52 +1,40 @@ mod actor; +mod proxy; pub mod watcher; use std::collections::HashMap; use std::time::Duration; use anyhow::{anyhow, Context, Result}; -use futures_util::future::{MapErr, Shared}; -use futures_util::{FutureExt, TryFutureExt}; use p2panda_blobs::{Blobs as BlobsHandler, Config as BlobsConfig}; +use proxy::BlobsActorProxy; use rhio_blobs::{NotImportedObject, S3Store, SignedBlobInfo}; use s3::{Bucket, Region}; -use tokio::sync::{mpsc, oneshot}; -use tokio::task::JoinError; -use tokio_util::task::{AbortOnDropHandle, LocalPoolHandle}; -use tracing::error; +use tokio::sync::mpsc; +use watcher::S3Event; -use crate::blobs::actor::{BlobsActor, ToBlobsActor}; use crate::config::Config; use crate::topic::Query; -use crate::JoinErrToStr; + +use crate::blobs::watcher::S3Watcher; +use s3::error::S3Error; #[derive(Debug)] pub struct Blobs { - blobs_actor_tx: mpsc::Sender, + blobs: BlobsActorProxy, #[allow(dead_code)] - actor_handle: Shared, JoinErrToStr>>, + watcher: S3Watcher, } impl Blobs { - pub fn new(blob_store: S3Store, blobs_handler: BlobsHandler) -> Self { - let (blobs_actor_tx, blobs_actor_rx) = mpsc::channel(512); - let blobs_actor = BlobsActor::new(blob_store.clone(), blobs_handler, blobs_actor_rx); - - let pool = LocalPoolHandle::new(1); - let actor_handle = pool.spawn_pinned(|| async move { - if let Err(err) = blobs_actor.run().await { - error!("blobs actor failed: {err:?}"); - } - }); - - let actor_drop_handle = AbortOnDropHandle::new(actor_handle) - .map_err(Box::new(|e: JoinError| e.to_string()) as JoinErrToStr) - .shared(); - - Self { - blobs_actor_tx, - actor_handle: actor_drop_handle, - } + pub fn new( + blob_store: S3Store, + blobs_handler: BlobsHandler, + watcher_tx: mpsc::Sender>, + ) -> Blobs { + let blobs = BlobsActorProxy::new(blob_store.clone(), blobs_handler); + let watcher = S3Watcher::new(blob_store, watcher_tx); + Blobs { blobs, watcher } } /// Download a blob from the network. @@ -54,32 +42,16 @@ impl Blobs { /// Attempt to download a blob from peers on the network and place it into the nodes MinIO /// bucket. pub async fn download(&self, blob: SignedBlobInfo) -> Result<()> { - let (reply, reply_rx) = oneshot::channel(); - self.blobs_actor_tx - .send(ToBlobsActor::DownloadBlob { blob, reply }) - .await?; - let result = reply_rx.await?; - result?; - Ok(()) + self.blobs.download(blob).await } /// Import an existing, local S3 object into the blob store, preparing it for p2p sync. pub async fn import_s3_object(&self, object: NotImportedObject) -> Result<()> { - let (reply, reply_rx) = oneshot::channel(); - self.blobs_actor_tx - .send(ToBlobsActor::ImportS3Object { object, reply }) - .await?; - reply_rx.await??; - Ok(()) + self.blobs.import_s3_object(object).await } pub async fn shutdown(&self) -> Result<()> { - let (reply, reply_rx) = oneshot::channel(); - self.blobs_actor_tx - .send(ToBlobsActor::Shutdown { reply }) - .await?; - reply_rx.await?; - Ok(()) + self.blobs.shutdown().await } } diff --git a/rhio/src/blobs/proxy.rs b/rhio/src/blobs/proxy.rs new file mode 100644 index 00000000..6dfd4e13 --- /dev/null +++ b/rhio/src/blobs/proxy.rs @@ -0,0 +1,76 @@ +use anyhow::Result; +use futures_util::future::{MapErr, Shared}; +use futures_util::{FutureExt, TryFutureExt}; +use p2panda_blobs::Blobs as BlobsHandler; +use rhio_blobs::{NotImportedObject, S3Store, SignedBlobInfo}; +use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinError; +use tokio_util::task::{AbortOnDropHandle, LocalPoolHandle}; +use tracing::error; + +use crate::blobs::actor::{BlobsActor, ToBlobsActor}; +use crate::topic::Query; +use crate::JoinErrToStr; + +#[derive(Debug)] +pub struct BlobsActorProxy { + blobs_actor_tx: mpsc::Sender, + #[allow(dead_code)] + actor_handle: Shared, JoinErrToStr>>, +} + +impl BlobsActorProxy { + pub fn new(blob_store: S3Store, blobs_handler: BlobsHandler) -> Self { + let (blobs_actor_tx, blobs_actor_rx) = mpsc::channel(512); + let blobs_actor = BlobsActor::new(blob_store.clone(), blobs_handler, blobs_actor_rx); + + let pool = LocalPoolHandle::new(1); + let actor_handle = pool.spawn_pinned(|| async move { + if let Err(err) = blobs_actor.run().await { + error!("blobs actor failed: {err:?}"); + } + }); + + let actor_drop_handle = AbortOnDropHandle::new(actor_handle) + .map_err(Box::new(|e: JoinError| e.to_string()) as JoinErrToStr) + .shared(); + + Self { + blobs_actor_tx, + actor_handle: actor_drop_handle, + } + } + + /// Download a blob from the network. + /// + /// Attempt to download a blob from peers on the network and place it into the nodes MinIO + /// bucket. + pub async fn download(&self, blob: SignedBlobInfo) -> Result<()> { + let (reply, reply_rx) = oneshot::channel(); + self.blobs_actor_tx + .send(ToBlobsActor::DownloadBlob { blob, reply }) + .await?; + let result = reply_rx.await?; + result?; + Ok(()) + } + + /// Import an existing, local S3 object into the blob store, preparing it for p2p sync. + pub async fn import_s3_object(&self, object: NotImportedObject) -> Result<()> { + let (reply, reply_rx) = oneshot::channel(); + self.blobs_actor_tx + .send(ToBlobsActor::ImportS3Object { object, reply }) + .await?; + reply_rx.await??; + Ok(()) + } + + pub async fn shutdown(&self) -> Result<()> { + let (reply, reply_rx) = oneshot::channel(); + self.blobs_actor_tx + .send(ToBlobsActor::Shutdown { reply }) + .await?; + reply_rx.await?; + Ok(()) + } +} diff --git a/rhio/src/node/actor.rs b/rhio/src/node/actor.rs index 22a4af13..bbc3aff4 100644 --- a/rhio/src/node/actor.rs +++ b/rhio/src/node/actor.rs @@ -14,7 +14,7 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; use tracing::{debug, error, trace, warn}; -use crate::blobs::watcher::{S3Event, S3Watcher}; +use crate::blobs::watcher::S3Event; use crate::blobs::Blobs; use crate::nats::{JetStreamEvent, Nats}; use crate::network::Panda; @@ -47,8 +47,6 @@ pub struct NodeActor { nats: Nats, panda: Panda, blobs: Option, - #[allow(dead_code)] - watcher: S3Watcher, } impl NodeActor { @@ -59,7 +57,6 @@ impl NodeActor { nats: Nats, panda: Panda, blobs: Option, - watcher: S3Watcher, inbox: mpsc::Receiver, s3_watcher_rx: mpsc::Receiver>, ) -> Self { @@ -74,7 +71,6 @@ impl NodeActor { nats, panda, blobs, - watcher, } } diff --git a/rhio/src/node/rhio.rs b/rhio/src/node/rhio.rs index 40e579e1..c561102c 100644 --- a/rhio/src/node/rhio.rs +++ b/rhio/src/node/rhio.rs @@ -13,7 +13,6 @@ use tokio::task::JoinError; use tokio_util::task::AbortOnDropHandle; use tracing::error; -use crate::blobs::watcher::S3Watcher; use crate::blobs::{blobs_config, store_from_config, Blobs}; use crate::config::Config; use crate::nats::Nats; @@ -57,21 +56,21 @@ impl Node { .sync(sync_config); // 3. Configure and set up blob store and connection handlers for blob replication. - let (network, blobs) = if config.s3.is_some() { + let (network, blobs, watcher_rx) = if config.s3.is_some() { let (network, blobs_handler) = BlobsHandler::from_builder_with_config(builder, blob_store.clone(), blobs_config()) .await?; - let blobs = Blobs::new(blob_store.clone(), blobs_handler); - (network, Some(blobs)) + // 3.1. Start a service which watches the S3 buckets for changes. + let (watcher_tx, watcher_rx) = mpsc::channel(512); + + let blobs = Blobs::new(blob_store.clone(), blobs_handler, watcher_tx); + (network, Some(blobs), watcher_rx) } else { let network = builder.build().await?; - (network, None) + let (_dummy_watcher_tx, dummy_watcher_rx) = mpsc::channel(512); + (network, None, dummy_watcher_rx) }; - // 4. Start a service which watches the S3 buckets for changes. - let (watcher_tx, watcher_rx) = mpsc::channel(512); - let watcher = S3Watcher::new(blob_store, watcher_tx); - // 5. Move all networking logic into dedicated "p2panda" actor, dealing with p2p // networking, data replication and gossipping. let node_id = network.node_id(); @@ -90,7 +89,6 @@ impl Node { nats, panda, blobs, - watcher, node_actor_rx, watcher_rx, );