Skip to content

Commit

Permalink
feat: add batch message and update double echo (#383)
Browse files Browse the repository at this point in the history
  • Loading branch information
atanmarko authored Nov 30, 2023
1 parent 2820664 commit f0bc90c
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 98 deletions.
5 changes: 5 additions & 0 deletions crates/topos-api/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
".topos.tce.v1.DoubleEchoRequest",
"#[derive(serde::Deserialize, serde::Serialize)]",
)
.type_attribute(
".topos.tce.v1.Batch",
"#[derive(serde::Deserialize, serde::Serialize)]",
)
.type_attribute(
".topos.uci.v1.Certificate",
"#[derive(Eq, Hash, serde::Deserialize, serde::Serialize)]",
Expand All @@ -87,6 +91,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"proto/topos/tce/v1/console.proto",
"proto/topos/tce/v1/synchronization.proto",
"proto/topos/tce/v1/double_echo.proto",
"proto/topos/tce/v1/gossipsub.proto",
"proto/topos/uci/v1/certification.proto",
"proto/topos/p2p/info.proto",
],
Expand Down
9 changes: 9 additions & 0 deletions crates/topos-api/proto/topos/tce/v1/gossipsub.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
syntax = "proto3";

package topos.tce.v1;

import "topos/tce/v1/double_echo.proto";

message Batch {
repeated bytes messages = 1;
}
Binary file modified crates/topos-api/src/grpc/generated/topos.bin
Binary file not shown.
7 changes: 7 additions & 0 deletions crates/topos-api/src/grpc/generated/topos.tce.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1495,3 +1495,10 @@ pub mod double_echo_request {
Ready(super::Ready),
}
}
#[derive(serde::Deserialize, serde::Serialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Batch {
#[prost(bytes = "vec", repeated, tag = "1")]
pub messages: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
}
92 changes: 32 additions & 60 deletions crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{HashSet, VecDeque},
collections::{HashMap, HashSet, VecDeque},
env,
task::Poll,
time::Duration,
Expand All @@ -10,37 +10,34 @@ use libp2p::{
identity::Keypair,
swarm::{NetworkBehaviour, THandlerInEvent, ToSwarm},
};
use serde::{Deserialize, Serialize};
use topos_metrics::{
P2P_DUPLICATE_MESSAGE_ID_RECEIVED_TOTAL, P2P_GOSSIP_BATCH_SIZE,
P2P_MESSAGE_SERIALIZE_FAILURE_TOTAL,
};
use prost::Message as ProstMessage;
use topos_api::grpc::tce::v1::Batch;
use topos_metrics::{P2P_DUPLICATE_MESSAGE_ID_RECEIVED_TOTAL, P2P_GOSSIP_BATCH_SIZE};
use tracing::{debug, error};

use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct Batch {
pub(crate) data: Vec<Vec<u8>>,
}
const MAX_BATCH_SIZE: usize = 10;

pub struct Behaviour {
batch_size: usize,
gossipsub: gossipsub::Behaviour,
echo_queue: VecDeque<Vec<u8>>,
ready_queue: VecDeque<Vec<u8>>,
pending: HashMap<&'static str, VecDeque<Vec<u8>>>,
tick: tokio::time::Interval,
cache: HashSet<MessageId>,
}

impl Behaviour {
pub fn publish(&mut self, topic: &'static str, data: Vec<u8>) -> Result<usize, &'static str> {
pub fn publish(
&mut self,
topic: &'static str,
message: Vec<u8>,
) -> Result<usize, &'static str> {
match topic {
TOPOS_GOSSIP => {
_ = self.gossipsub.publish(IdentTopic::new(topic), data);
_ = self.gossipsub.publish(IdentTopic::new(topic), message);
}
TOPOS_ECHO => self.echo_queue.push_back(data),
TOPOS_READY => self.ready_queue.push_back(data),
TOPOS_ECHO | TOPOS_READY => self.pending.entry(topic).or_default().push_back(message),
_ => return Err("Invalid topic"),
}

Expand All @@ -66,7 +63,7 @@ impl Behaviour {
pub async fn new(peer_key: Keypair) -> Self {
let batch_size = env::var("TOPOS_GOSSIP_BATCH_SIZE")
.map(|v| v.parse::<usize>())
.unwrap_or(Ok(10))
.unwrap_or(Ok(MAX_BATCH_SIZE))
.unwrap();
let gossipsub = gossipsub::ConfigBuilder::default()
.max_transmit_size(2 * 1024 * 1024)
Expand All @@ -88,8 +85,12 @@ impl Behaviour {
Self {
batch_size,
gossipsub,
echo_queue: VecDeque::new(),
ready_queue: VecDeque::new(),
pending: [
(TOPOS_ECHO, VecDeque::new()),
(TOPOS_READY, VecDeque::new()),
]
.into_iter()
.collect(),
tick: tokio::time::interval(Duration::from_millis(
env::var("TOPOS_GOSSIP_INTERVAL")
.map(|v| v.parse::<u64>())
Expand Down Expand Up @@ -157,49 +158,20 @@ impl NetworkBehaviour for Behaviour {
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if self.tick.poll_tick(cx).is_ready() {
// Publish batch
if !self.echo_queue.is_empty() {
let mut echos = Batch { data: Vec::new() };
for _ in 0..self.batch_size {
if let Some(data) = self.echo_queue.pop_front() {
echos.data.push(data);
} else {
break;
}
}

debug!("Publishing {} echos", echos.data.len());
if let Ok(msg) = bincode::serialize::<Batch>(&echos) {
P2P_GOSSIP_BATCH_SIZE.observe(echos.data.len() as f64);
for (topic, queue) in self.pending.iter_mut() {
if !queue.is_empty() {
let num_of_message = queue.len().min(self.batch_size);
let batch = Batch {
messages: queue.drain(0..num_of_message).collect(),
};

match self.gossipsub.publish(IdentTopic::new(TOPOS_ECHO), msg) {
Ok(message_id) => debug!("Published echo {}", message_id),
Err(error) => error!("Failed to publish echo: {}", error),
debug!("Publishing {} {}", batch.messages.len(), topic);
let msg = batch.encode_to_vec();
P2P_GOSSIP_BATCH_SIZE.observe(batch.messages.len() as f64);
match self.gossipsub.publish(IdentTopic::new(*topic), msg) {
Ok(message_id) => debug!("Published {} {}", topic, message_id),
Err(error) => error!("Failed to publish {}: {}", topic, error),
}
} else {
P2P_MESSAGE_SERIALIZE_FAILURE_TOTAL
.with_label_values(&["echo"])
.inc();
}
}

if !self.ready_queue.is_empty() {
let mut readies = Batch { data: Vec::new() };
for _ in 0..self.batch_size {
if let Some(data) = self.ready_queue.pop_front() {
readies.data.push(data);
} else {
break;
}
}

debug!("Publishing {} readies", readies.data.len());
if let Ok(msg) = bincode::serialize::<Batch>(&readies) {
P2P_GOSSIP_BATCH_SIZE.observe(readies.data.len() as f64);
_ = self.gossipsub.publish(IdentTopic::new(TOPOS_READY), msg);
} else {
P2P_MESSAGE_SERIALIZE_FAILURE_TOTAL
.with_label_values(&["ready"])
.inc();
}
}
}
Expand Down
1 change: 0 additions & 1 deletion crates/topos-p2p/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ pub enum Command {
to: PeerId,
sender: oneshot::Sender<Result<Vec<Multiaddr>, CommandExecutionError>>,
},

Gossip {
topic: &'static str,
data: Vec<u8>,
Expand Down
17 changes: 9 additions & 8 deletions crates/topos-p2p/src/runtime/handle_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,16 @@ impl Runtime {
}
}

Command::Gossip { topic, data } => {
match self.swarm.behaviour_mut().gossipsub.publish(topic, data) {
Ok(message_id) => {
debug!("Published message to {topic}");
P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL.inc();
}
Err(err) => error!("Failed to publish message to {topic}: {err}"),
Command::Gossip {
topic,
data: message,
} => match self.swarm.behaviour_mut().gossipsub.publish(topic, message) {
Ok(message_id) => {
debug!("Published message to {topic}");
P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL.inc();
}
}
Err(err) => error!("Failed to publish message to {topic}: {err}"),
},
}
}
}
44 changes: 15 additions & 29 deletions crates/topos-p2p/src/runtime/handle_event/gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ use topos_metrics::{
};
use tracing::{debug, error};

use crate::{
behaviour::gossip::Batch, constants, event::GossipEvent, Event, Runtime, TOPOS_ECHO,
TOPOS_GOSSIP, TOPOS_READY,
};
use crate::{constants, event::GossipEvent, Event, Runtime, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY};
use prost::Message;
use topos_api::grpc::tce::v1::Batch;

use super::EventHandler;

Expand Down Expand Up @@ -41,41 +40,28 @@ impl EventHandler<GossipEvent> for Runtime {
error!("Failed to send gossip event to runtime: {:?}", e);
}
}
TOPOS_ECHO => {
P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL.inc();

if let Ok(msg) = bincode::deserialize::<Batch>(&message) {
for data in msg.data {
if let Err(e) = self
.event_sender
.send(Event::Gossip { from: source, data })
.await
{
error!("Failed to send gossip event to runtime: {:?}", e);
}
}
TOPOS_ECHO | TOPOS_READY => {
if topic == TOPOS_ECHO {
P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL.inc();
} else {
P2P_MESSAGE_DESERIALIZE_FAILURE_TOTAL
.with_label_values(&["echo"])
.inc();
P2P_MESSAGE_RECEIVED_ON_READY_TOTAL.inc();
}
}
TOPOS_READY => {
P2P_MESSAGE_RECEIVED_ON_READY_TOTAL.inc();

if let Ok(msg) = bincode::deserialize::<Batch>(&message) {
for data in msg.data {
if let Ok(Batch { messages }) = Batch::decode(&message[..]) {
for message in messages {
if let Err(e) = self
.event_sender
.send(Event::Gossip { from: source, data })
.send(Event::Gossip {
from: source,
data: message,
})
.await
{
error!("Failed to send gossip event to runtime: {:?}", e);
error!("Failed to send gossip {} event to runtime: {:?}", topic, e);
}
}
} else {
P2P_MESSAGE_DESERIALIZE_FAILURE_TOTAL
.with_label_values(&["ready"])
.with_label_values(&[topic])
.inc();
}
}
Expand Down

0 comments on commit f0bc90c

Please sign in to comment.