Skip to content

Commit

Permalink
[102] turning Blobs into a facade to BlobActor and S3Watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
ktatarnikov committed Jan 6, 2025
1 parent e84f052 commit ed7749a
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 63 deletions.
68 changes: 20 additions & 48 deletions rhio/src/blobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,85 +1,57 @@
mod actor;
mod proxy;
pub mod watcher;

use std::collections::HashMap;
use std::time::Duration;

use anyhow::{anyhow, Context, Result};
use futures_util::future::{MapErr, Shared};
use futures_util::{FutureExt, TryFutureExt};
use p2panda_blobs::{Blobs as BlobsHandler, Config as BlobsConfig};
use proxy::BlobsActorProxy;
use rhio_blobs::{NotImportedObject, S3Store, SignedBlobInfo};
use s3::{Bucket, Region};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinError;
use tokio_util::task::{AbortOnDropHandle, LocalPoolHandle};
use tracing::error;
use tokio::sync::mpsc;
use watcher::S3Event;

use crate::blobs::actor::{BlobsActor, ToBlobsActor};
use crate::config::Config;
use crate::topic::Query;
use crate::JoinErrToStr;

use crate::blobs::watcher::S3Watcher;
use s3::error::S3Error;

#[derive(Debug)]
pub struct Blobs {
blobs_actor_tx: mpsc::Sender<ToBlobsActor>,
blobs: BlobsActorProxy,
#[allow(dead_code)]
actor_handle: Shared<MapErr<AbortOnDropHandle<()>, JoinErrToStr>>,
watcher: S3Watcher,
}

impl Blobs {
pub fn new(blob_store: S3Store, blobs_handler: BlobsHandler<Query, S3Store>) -> Self {
let (blobs_actor_tx, blobs_actor_rx) = mpsc::channel(512);
let blobs_actor = BlobsActor::new(blob_store.clone(), blobs_handler, blobs_actor_rx);

let pool = LocalPoolHandle::new(1);
let actor_handle = pool.spawn_pinned(|| async move {
if let Err(err) = blobs_actor.run().await {
error!("blobs actor failed: {err:?}");
}
});

let actor_drop_handle = AbortOnDropHandle::new(actor_handle)
.map_err(Box::new(|e: JoinError| e.to_string()) as JoinErrToStr)
.shared();

Self {
blobs_actor_tx,
actor_handle: actor_drop_handle,
}
pub fn new(
blob_store: S3Store,
blobs_handler: BlobsHandler<Query, S3Store>,
watcher_tx: mpsc::Sender<Result<S3Event, S3Error>>,
) -> Blobs {
let blobs = BlobsActorProxy::new(blob_store.clone(), blobs_handler);
let watcher = S3Watcher::new(blob_store, watcher_tx);
Blobs { blobs, watcher }
}

/// Download a blob from the network.
///
/// Attempt to download a blob from peers on the network and place it into the nodes MinIO
/// bucket.
pub async fn download(&self, blob: SignedBlobInfo) -> Result<()> {
let (reply, reply_rx) = oneshot::channel();
self.blobs_actor_tx
.send(ToBlobsActor::DownloadBlob { blob, reply })
.await?;
let result = reply_rx.await?;
result?;
Ok(())
self.blobs.download(blob).await
}

/// Import an existing, local S3 object into the blob store, preparing it for p2p sync.
pub async fn import_s3_object(&self, object: NotImportedObject) -> Result<()> {
let (reply, reply_rx) = oneshot::channel();
self.blobs_actor_tx
.send(ToBlobsActor::ImportS3Object { object, reply })
.await?;
reply_rx.await??;
Ok(())
self.blobs.import_s3_object(object).await
}

pub async fn shutdown(&self) -> Result<()> {
let (reply, reply_rx) = oneshot::channel();
self.blobs_actor_tx
.send(ToBlobsActor::Shutdown { reply })
.await?;
reply_rx.await?;
Ok(())
self.blobs.shutdown().await
}
}

Expand Down
76 changes: 76 additions & 0 deletions rhio/src/blobs/proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use anyhow::Result;
use futures_util::future::{MapErr, Shared};
use futures_util::{FutureExt, TryFutureExt};
use p2panda_blobs::Blobs as BlobsHandler;
use rhio_blobs::{NotImportedObject, S3Store, SignedBlobInfo};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinError;
use tokio_util::task::{AbortOnDropHandle, LocalPoolHandle};
use tracing::error;

use crate::blobs::actor::{BlobsActor, ToBlobsActor};
use crate::topic::Query;
use crate::JoinErrToStr;

#[derive(Debug)]
pub struct BlobsActorProxy {
blobs_actor_tx: mpsc::Sender<ToBlobsActor>,
#[allow(dead_code)]
actor_handle: Shared<MapErr<AbortOnDropHandle<()>, JoinErrToStr>>,
}

impl BlobsActorProxy {
pub fn new(blob_store: S3Store, blobs_handler: BlobsHandler<Query, S3Store>) -> Self {
let (blobs_actor_tx, blobs_actor_rx) = mpsc::channel(512);
let blobs_actor = BlobsActor::new(blob_store.clone(), blobs_handler, blobs_actor_rx);

let pool = LocalPoolHandle::new(1);
let actor_handle = pool.spawn_pinned(|| async move {
if let Err(err) = blobs_actor.run().await {
error!("blobs actor failed: {err:?}");
}
});

let actor_drop_handle = AbortOnDropHandle::new(actor_handle)
.map_err(Box::new(|e: JoinError| e.to_string()) as JoinErrToStr)
.shared();

Self {
blobs_actor_tx,
actor_handle: actor_drop_handle,
}
}

/// Download a blob from the network.
///
/// Attempt to download a blob from peers on the network and place it into the nodes MinIO
/// bucket.
pub async fn download(&self, blob: SignedBlobInfo) -> Result<()> {
let (reply, reply_rx) = oneshot::channel();
self.blobs_actor_tx
.send(ToBlobsActor::DownloadBlob { blob, reply })
.await?;
let result = reply_rx.await?;
result?;
Ok(())
}

/// Import an existing, local S3 object into the blob store, preparing it for p2p sync.
pub async fn import_s3_object(&self, object: NotImportedObject) -> Result<()> {
let (reply, reply_rx) = oneshot::channel();
self.blobs_actor_tx
.send(ToBlobsActor::ImportS3Object { object, reply })
.await?;
reply_rx.await??;
Ok(())
}

pub async fn shutdown(&self) -> Result<()> {
let (reply, reply_rx) = oneshot::channel();
self.blobs_actor_tx
.send(ToBlobsActor::Shutdown { reply })
.await?;
reply_rx.await?;
Ok(())
}
}
6 changes: 1 addition & 5 deletions rhio/src/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tracing::{debug, error, trace, warn};

use crate::blobs::watcher::{S3Event, S3Watcher};
use crate::blobs::watcher::S3Event;
use crate::blobs::Blobs;
use crate::nats::{JetStreamEvent, Nats};
use crate::network::Panda;
Expand Down Expand Up @@ -47,8 +47,6 @@ pub struct NodeActor {
nats: Nats,
panda: Panda,
blobs: Option<Blobs>,
#[allow(dead_code)]
watcher: S3Watcher,
}

impl NodeActor {
Expand All @@ -59,7 +57,6 @@ impl NodeActor {
nats: Nats,
panda: Panda,
blobs: Option<Blobs>,
watcher: S3Watcher,
inbox: mpsc::Receiver<ToNodeActor>,
s3_watcher_rx: mpsc::Receiver<Result<S3Event, S3Error>>,
) -> Self {
Expand All @@ -74,7 +71,6 @@ impl NodeActor {
nats,
panda,
blobs,
watcher,
}
}

Expand Down
18 changes: 8 additions & 10 deletions rhio/src/node/rhio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use tokio::task::JoinError;
use tokio_util::task::AbortOnDropHandle;
use tracing::error;

use crate::blobs::watcher::S3Watcher;
use crate::blobs::{blobs_config, store_from_config, Blobs};
use crate::config::Config;
use crate::nats::Nats;
Expand Down Expand Up @@ -57,21 +56,21 @@ impl Node {
.sync(sync_config);

// 3. Configure and set up blob store and connection handlers for blob replication.
let (network, blobs) = if config.s3.is_some() {
let (network, blobs, watcher_rx) = if config.s3.is_some() {
let (network, blobs_handler) =
BlobsHandler::from_builder_with_config(builder, blob_store.clone(), blobs_config())
.await?;
let blobs = Blobs::new(blob_store.clone(), blobs_handler);
(network, Some(blobs))
// 3.1. Start a service which watches the S3 buckets for changes.
let (watcher_tx, watcher_rx) = mpsc::channel(512);

let blobs = Blobs::new(blob_store.clone(), blobs_handler, watcher_tx);
(network, Some(blobs), watcher_rx)
} else {
let network = builder.build().await?;
(network, None)
let (_dummy_watcher_tx, dummy_watcher_rx) = mpsc::channel(512);
(network, None, dummy_watcher_rx)
};

// 4. Start a service which watches the S3 buckets for changes.
let (watcher_tx, watcher_rx) = mpsc::channel(512);
let watcher = S3Watcher::new(blob_store, watcher_tx);

// 5. Move all networking logic into dedicated "p2panda" actor, dealing with p2p
// networking, data replication and gossipping.
let node_id = network.node_id();
Expand All @@ -90,7 +89,6 @@ impl Node {
nats,
panda,
blobs,
watcher,
node_actor_rx,
watcher_rx,
);
Expand Down

0 comments on commit ed7749a

Please sign in to comment.