Skip to content

Commit

Permalink
Verify if messages comes from the same peer
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha committed Nov 16, 2024
1 parent 54f7346 commit bc43cea
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 10 deletions.
2 changes: 1 addition & 1 deletion rhio-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl NetworkMessage {
self.signature = Some(signature);
}

pub fn verify(&mut self, public_key: &PublicKey) -> bool {
pub fn verify(&self, public_key: &PublicKey) -> bool {
match &self.signature {
Some(signature) => {
let mut header = self.clone();
Expand Down
38 changes: 29 additions & 9 deletions rhio/src/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use s3::error::S3Error;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use tracing::{debug, error};
use tracing::{debug, error, warn};

use crate::blobs::watcher::{S3Event, S3Watcher};
use crate::blobs::Blobs;
Expand Down Expand Up @@ -274,7 +274,7 @@ impl NodeActor {
///
/// These events can come from either gossip broadcast or sync sessions with other peers.
async fn on_network_event(&mut self, event: FromNetwork) -> Result<()> {
let bytes = match event {
let (bytes, delivered_from) = match event {
FromNetwork::GossipMessage {
bytes,
delivered_from,
Expand All @@ -285,7 +285,7 @@ impl NodeActor {
delivered_from = %delivered_from,
"received network message"
);
bytes
(bytes, delivered_from)
}
FromNetwork::SyncMessage {
header,
Expand All @@ -298,26 +298,46 @@ impl NodeActor {
delivered_from = %delivered_from,
"received network message"
);
header
(header, delivered_from)
}
};

let network_message = NetworkMessage::from_bytes(&bytes)?;
match network_message.payload {

// Make sure the message comes from the same peer.
if !network_message.verify(&delivered_from) {
warn!(node_id = %delivered_from, "ignored network message with invalid signature");
return Ok(());
}

match &network_message.payload {
NetworkPayload::BlobAnnouncement(hash, bucket, key, size) => {
// Make sure the bucket comes from the same peer.
if !network_message.verify(&bucket.public_key()) {
warn!(node_id = %delivered_from, "ignored blob announcement with invalid owner");
return Ok(());
}

// We're interested in a bucket from a _specific_ public key. Filter out everything
// which is not the right bucket name or not the right author.
if is_bucket_matching(&self.subscriptions, &bucket) {
if is_bucket_matching(&self.subscriptions, bucket) {
self.blobs
.download(hash, bucket.bucket_name(), key, size)
.download(*hash, bucket.bucket_name(), key.to_owned(), *size)
.await?;
}
}
NetworkPayload::NatsMessage(message) => {
// Filter out all incoming messages we're not subscribed to. This can happen
// especially when receiving messages over the gossip overlay as they are not
// necessarily for us.
let subject = message.subject.clone().try_into()?;
let subject: ScopedSubject = message.subject.clone().try_into()?;

// Make sure the NATS message comes from the same peer.
if !network_message.verify(&subject.public_key()) {
warn!(node_id = %delivered_from, "ignored NATS message with invalid owner");
return Ok(());
}

if !is_subject_matching(&self.subscriptions, &subject) {
return Ok(());
}
Expand All @@ -326,7 +346,7 @@ impl NodeActor {
.publish(
true,
message.subject.to_string(),
message.headers,
message.headers.clone(),
message.payload.to_vec(),
)
.await?;
Expand Down

0 comments on commit bc43cea

Please sign in to comment.