Commit: Sync blob announcements

adzialocha committed Nov 16, 2024
1 parent 787d004 commit 89a4a4f

Showing 8 changed files with 158 additions and 15 deletions.
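In short, the commit fills in the two `@TODO`s in the sync protocol: on a `Query::Bucket` session the initiating peer announces the blob hashes it already holds (`Message::BlobsHave`), and the accepting peer replies with signed announcements for everything the initiator is missing (`Message::Blobs`). A minimal, self-contained sketch of that exchange (a hypothetical simplification: `u64` stands in for `BlobHash`, in-process channels stand in for the CBOR-framed network streams used by the real code):

use tokio::sync::mpsc;

#[derive(Debug)]
enum Message {
    BlobsHave(Vec<u64>), // hashes the initiator already has
    Blobs(Vec<u64>),     // announcements for blobs the initiator is missing
}

#[tokio::main]
async fn main() {
    let (to_acceptor, mut acceptor_rx) = mpsc::channel(1);
    let (to_initiator, mut initiator_rx) = mpsc::channel(1);

    // Acceptor side: owns complete blobs {1, 2, 3} and answers with the delta.
    tokio::spawn(async move {
        if let Some(Message::BlobsHave(have)) = acceptor_rx.recv().await {
            let missing: Vec<u64> = [1u64, 2, 3]
                .into_iter()
                .filter(|hash| !have.contains(hash))
                .collect();
            to_initiator.send(Message::Blobs(missing)).await.unwrap();
        }
    });

    // Initiator side: announce what we have, then receive what we're missing.
    to_acceptor.send(Message::BlobsHave(vec![1])).await.unwrap();
    println!("{:?}", initiator_rx.recv().await); // Some(Blobs([2, 3]))
}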
2 changes: 1 addition & 1 deletion rhio-blobs/src/bao_file.rs

@@ -8,8 +8,8 @@ use iroh_io::AsyncSliceReader;
 use s3::Bucket;
 use serde::{Deserialize, Serialize};
 
-use crate::paths::Paths;
 use crate::s3_file::S3File;
+use crate::Paths;
 
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub struct BaoMeta {
7 changes: 4 additions & 3 deletions rhio-blobs/src/lib.rs

@@ -1,10 +1,11 @@
-pub mod bao_file;
-pub mod paths;
-pub mod s3_file;
+mod bao_file;
+mod paths;
+mod s3_file;
 mod store;
 
 pub use iroh_blobs::Hash as BlobHash;
 
+pub use paths::{Paths, META_SUFFIX, NO_PREFIX, OUTBOARD_SUFFIX};
 pub use store::S3Store;
 
 pub type ObjectSize = u64;
1 change: 1 addition & 0 deletions rhio-blobs/src/s3_file.rs

@@ -21,6 +21,7 @@ pub enum MultiPartBufferError {
     PartBufferDrained,
 }
 
+#[allow(dead_code)]
 pub enum MultiPartBufferResult {
     PartComplete(PartNumber, Offset, Vec<u8>),
     PartExtended(PartNumber, Offset),
16 changes: 15 additions & 1 deletion rhio/src/blobs/actor.rs

@@ -5,7 +5,7 @@ use iroh_blobs::store::bao_tree::io::fsm::AsyncSliceReader;
 use iroh_blobs::store::{MapEntry, Store};
 use p2panda_blobs::{Blobs as BlobsHandler, DownloadBlobEvent, ImportBlobEvent};
 use p2panda_core::Hash;
-use rhio_blobs::{BlobHash, ObjectKey, ObjectSize, S3Store};
+use rhio_blobs::{BlobHash, BucketName, ObjectKey, ObjectSize, Paths, S3Store};
 use rhio_core::ScopedBucket;
 use s3::creds::Credentials;
 use s3::error::S3Error;
@@ -32,6 +32,12 @@ pub enum ToBlobsActor {
         size: ObjectSize,
         reply: oneshot::Sender<Result<()>>,
     },
+    CompleteBlobs {
+        reply: oneshot::Sender<Vec<(BlobHash, BucketName, Paths, ObjectSize)>>,
+    },
+    IncompleteBlobs {
+        reply: oneshot::Sender<Vec<(BlobHash, BucketName, Paths, ObjectSize)>>,
+    },
     Shutdown {
         reply: oneshot::Sender<()>,
     },
@@ -120,6 +126,14 @@ impl BlobsActor {
                 let result = self.on_download_blob(hash, bucket, key, size).await;
                 reply.send(result).ok();
             }
+            ToBlobsActor::CompleteBlobs { reply } => {
+                let result = self.store.complete_blobs().await;
+                reply.send(result).ok();
+            }
+            ToBlobsActor::IncompleteBlobs { reply } => {
+                let result = self.store.incomplete_blobs().await;
+                reply.send(result).ok();
+            }
             ToBlobsActor::Shutdown { .. } => {
                 unreachable!("handled in run_inner");
             }
24 changes: 22 additions & 2 deletions rhio/src/blobs/mod.rs

@@ -11,7 +11,7 @@ use futures_util::{FutureExt, TryFutureExt};
 use iroh_blobs::store::Store;
 use p2panda_blobs::Blobs as BlobsHandler;
 use p2panda_core::Hash;
-use rhio_blobs::{BlobHash, ObjectKey, ObjectSize, S3Store};
+use rhio_blobs::{BlobHash, BucketName, ObjectKey, ObjectSize, Paths, S3Store};
 use rhio_core::ScopedBucket;
 use s3::creds::Credentials;
 use s3::{Bucket, Region};
@@ -61,6 +61,26 @@
         }
     }
 
+    /// Query the blob store for all complete blobs.
+    pub async fn complete_blobs(&self) -> Result<Vec<(BlobHash, BucketName, Paths, ObjectSize)>> {
+        let (reply, reply_rx) = oneshot::channel();
+        self.blobs_actor_tx
+            .send(ToBlobsActor::CompleteBlobs { reply })
+            .await?;
+        let result = reply_rx.await?;
+        Ok(result)
+    }
+
+    /// Query the blob store for all incomplete blobs.
+    pub async fn incomplete_blobs(&self) -> Result<Vec<(BlobHash, BucketName, Paths, ObjectSize)>> {
+        let (reply, reply_rx) = oneshot::channel();
+        self.blobs_actor_tx
+            .send(ToBlobsActor::IncompleteBlobs { reply })
+            .await?;
+        let result = reply_rx.await?;
+        Ok(result)
+    }
+
     /// Download a blob from the network.
     ///
     /// Attempt to download a blob from peers on the network and place it into the node's MinIO
@@ -87,7 +107,7 @@ impl Blobs {
         Ok(())
     }
 
-    /// Import an existing S3 object into our blob store, preparing it for p2p sync.
+    /// Import an existing, local S3 object into the blob store, preparing it for p2p sync.
     pub async fn import_s3_object(
         &self,
         bucket_name: String,
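The two new query methods follow the request/reply actor pattern used throughout rhio's blobs module: the caller creates a `oneshot` channel, sends the sender half along with the message to the actor, and awaits the reply. A standalone sketch of that pattern with a hypothetical `Ping` message (not rhio's actual types):

use tokio::sync::{mpsc, oneshot};

enum ToActor {
    // Request/reply: the caller encloses a oneshot sender for the answer.
    Ping { reply: oneshot::Sender<String> },
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<ToActor>(16);

    // Actor task: owns its state and handles one message at a time.
    tokio::spawn(async move {
        while let Some(message) = rx.recv().await {
            match message {
                ToActor::Ping { reply } => {
                    // `.ok()`: if the caller gave up and dropped the
                    // receiver, there is nobody left to notify.
                    reply.send("pong".to_string()).ok();
                }
            }
        }
    });

    // Caller side, mirroring `Blobs::complete_blobs` above.
    let (reply, reply_rx) = oneshot::channel();
    tx.send(ToActor::Ping { reply }).await.unwrap();
    println!("{}", reply_rx.await.unwrap());
}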
5 changes: 3 additions & 2 deletions rhio/src/blobs/watcher.rs

@@ -2,8 +2,9 @@ use std::collections::HashSet;
 use std::sync::Arc;
 use std::time::Duration;
 
-use rhio_blobs::paths::{META_SUFFIX, NO_PREFIX, OUTBOARD_SUFFIX};
-use rhio_blobs::{BlobHash, BucketName, ObjectKey, ObjectSize, S3Store};
+use rhio_blobs::{
+    BlobHash, BucketName, ObjectKey, ObjectSize, S3Store, META_SUFFIX, NO_PREFIX, OUTBOARD_SUFFIX,
+};
 use s3::error::S3Error;
 use tokio::sync::{broadcast, mpsc, RwLock};
 use tokio_util::task::LocalPoolHandle;
107 changes: 104 additions & 3 deletions rhio/src/network/sync.rs

@@ -10,6 +10,7 @@ use p2panda_core::{Hash, PrivateKey};
 use p2panda_net::TopicId;
 use p2panda_sync::cbor::{into_cbor_sink, into_cbor_stream};
 use p2panda_sync::{FromSync, SyncError, SyncProtocol};
+use rhio_blobs::{BlobHash, BucketName, ObjectSize, Paths, S3Store};
 use rhio_core::{NetworkMessage, ScopedSubject};
 use serde::{Deserialize, Serialize};
 use tokio_stream::wrappers::BroadcastStream;
@@ -24,12 +25,15 @@ pub enum Message {
     Query(Query),
     NatsHave(Vec<Hash>),
     NatsMessages(Vec<NetworkMessage>),
+    BlobsHave(Vec<BlobHash>),
+    Blobs(Vec<NetworkMessage>),
 }
 
 #[derive(Clone, Debug)]
 pub struct RhioSyncProtocol {
     config: Config,
     nats: Nats,
+    blob_store: S3Store,
     private_key: PrivateKey,
 }
 
@@ -71,7 +75,36 @@
         // 2. We can sync over NATS messages or S3 blob announcements.
         match query {
             Query::Bucket { ref bucket } => {
-                // @TODO
+                // Send the remote peer the list of blob hashes we already have.
+                let blob_hashes: Vec<BlobHash> = self
+                    .complete_blobs(&bucket.bucket_name())
+                    .await
+                    .iter()
+                    .map(|(hash, _, _, _)| hash.to_owned())
+                    .collect();
+                debug!(parent: &span, "we have {} completed blobs", blob_hashes.len());
+                sink.send(Message::BlobsHave(blob_hashes)).await?;
+
+                // Wait for the other peer to send us what we're missing.
+                let message = stream.next().await.ok_or(SyncError::UnexpectedBehaviour(
+                    "incoming message stream ended prematurely".into(),
+                ))??;
+                let Message::Blobs(remote_blob_announcements) = message else {
+                    return Err(SyncError::UnexpectedBehaviour(
+                        "did not receive expected message".into(),
+                    ));
+                };
+
+                debug!(parent: &span,
+                    "received {} new blob announcements from remote peer",
+                    remote_blob_announcements.len()
+                );
+
+                for blob_announcement in remote_blob_announcements {
+                    app_tx
+                        .send(FromSync::Data(blob_announcement.to_bytes(), None))
+                        .await?;
+                }
             }
             Query::Subject { ref subject } => {
                 // NATS streams are configured locally for every peer, so we need to look it up
@@ -186,7 +219,61 @@
         // 2. We can sync over NATS messages or S3 blob announcements.
         match &query {
             Query::Bucket { bucket } => {
-                // @TODO
+                // Await the other peer's message on the blobs they _have_, so we can
+                // calculate what they're missing and send that delta to them.
+                let message = stream.next().await.ok_or(SyncError::UnexpectedBehaviour(
+                    "incoming message stream ended prematurely".into(),
+                ))??;
+                let Message::BlobsHave(remote_blob_hashes) = message else {
+                    return Err(SyncError::UnexpectedBehaviour(
+                        "did not receive expected message".into(),
+                    ));
+                };
+
+                let is_publishing = match &self.config.publish {
+                    Some(publications) => publications
+                        .s3_buckets
+                        .iter()
+                        .find(|bucket_name| &&bucket.bucket_name() == bucket_name),
+                    None => None,
+                }
+                .is_some();
+
+                if !is_publishing {
+                    // We can't provide data for this S3 bucket, so politely end the
+                    // session by sending back an empty array.
+                    debug!(parent: &span,
+                        "can't provide data, politely send empty array back",
+                    );
+                    sink.send(Message::Blobs(vec![])).await?;
+                    return Ok(());
+                }
+
+                let blob_announcements: Vec<NetworkMessage> = self
+                    .complete_blobs(&bucket.bucket_name())
+                    .await
+                    .into_iter()
+                    .filter_map(|(hash, _, paths, size)| {
+                        if remote_blob_hashes.contains(&hash) {
+                            None
+                        } else {
+                            Some({
+                                let mut signed_msg = NetworkMessage::new_blob_announcement(
+                                    hash,
+                                    bucket.clone(),
+                                    paths.data(),
+                                    size,
+                                );
+                                signed_msg.sign(&self.private_key);
+                                signed_msg
+                            })
+                        }
+                    })
+                    .collect();
+
+                debug!(parent: &span, "send {} blob announcements", blob_announcements.len());
+
+                sink.send(Message::Blobs(blob_announcements)).await?;
             }
             Query::Subject { subject } => {
                 // Look up our config to find out if we have a NATS stream somewhere which fits the
@@ -276,14 +363,28 @@
 }
 
 impl RhioSyncProtocol {
-    pub fn new(config: Config, nats: Nats, private_key: PrivateKey) -> Self {
+    pub fn new(config: Config, nats: Nats, blob_store: S3Store, private_key: PrivateKey) -> Self {
         Self {
             config,
             nats,
+            blob_store,
            private_key,
         }
     }
 
+    /// Get a list of blobs we already have in the blob store for this query.
+    async fn complete_blobs(
+        &self,
+        bucket_name: &BucketName,
+    ) -> Vec<(BlobHash, BucketName, Paths, ObjectSize)> {
+        self.blob_store
+            .complete_blobs()
+            .await
+            .into_iter()
+            .filter(|(_, store_bucket_name, _, _)| store_bucket_name == bucket_name)
+            .collect()
+    }
+
     /// Download all NATS messages we have for that subject and return them as a stream.
     async fn nats_stream(
         &self,
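One design note on the accepting side above: the delta is computed with `remote_blob_hashes.contains(&hash)`, a linear scan per local blob. For stores with many blobs, collecting the remote hashes into a `HashSet` first gives constant-time lookups. A sketch of that variant (hypothetical helper and type aliases, not part of the commit):

use std::collections::HashSet;

// Hypothetical stand-ins for BlobHash and the announcement payload.
type BlobHash = [u8; 32];
type Announcement = (BlobHash, u64);

/// Return only the local announcements the remote peer is missing.
fn missing_for_remote(
    remote_blob_hashes: &[BlobHash],
    local_blobs: &[Announcement],
) -> Vec<Announcement> {
    // One O(n) pass to build the set, then O(1) membership checks,
    // instead of an O(n) `Vec::contains` scan per local blob.
    let remote: HashSet<&BlobHash> = remote_blob_hashes.iter().collect();
    local_blobs
        .iter()
        .filter(|(hash, _)| !remote.contains(hash))
        .cloned()
        .collect()
}

fn main() {
    let remote = vec![[1u8; 32]];
    let local = vec![([1u8; 32], 64), ([2u8; 32], 128)];
    assert_eq!(missing_for_remote(&remote, &local), vec![([2u8; 32], 128)]);
    println!("delta computed");
}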
11 changes: 8 additions & 3 deletions rhio/src/node/mod.rs

@@ -55,15 +55,20 @@ impl Node {
             ));
         }
 
-        let sync_protocol =
-            RhioSyncProtocol::new(config.clone(), nats.clone(), private_key.clone());
+        let blob_store = store_from_config(&config).await?;
+
+        let sync_protocol = RhioSyncProtocol::new(
+            config.clone(),
+            nats.clone(),
+            blob_store.clone(),
+            private_key.clone(),
+        );
 
         let builder = NetworkBuilder::from_config(network_config)
             .private_key(private_key.clone())
             .sync(sync_protocol);
 
         // 3. Configure and set up S3 store and connection handlers for blob replication.
-        let blob_store = store_from_config(&config).await?;
         let (network, blobs_handler) =
             BlobsHandler::from_builder(builder, blob_store.clone()).await?;
         let blobs = Blobs::new(config.s3.clone(), blob_store.clone(), blobs_handler);
