diff --git a/crates/topos-api/build.rs b/crates/topos-api/build.rs index 208f1640f..5b2d70926 100644 --- a/crates/topos-api/build.rs +++ b/crates/topos-api/build.rs @@ -73,6 +73,10 @@ fn main() -> Result<(), Box> { ".topos.tce.v1.DoubleEchoRequest", "#[derive(serde::Deserialize, serde::Serialize)]", ) + .type_attribute( + ".topos.tce.v1.Batch", + "#[derive(serde::Deserialize, serde::Serialize)]", + ) .type_attribute( ".topos.uci.v1.Certificate", "#[derive(Eq, Hash, serde::Deserialize, serde::Serialize)]", @@ -87,6 +91,7 @@ fn main() -> Result<(), Box> { "proto/topos/tce/v1/console.proto", "proto/topos/tce/v1/synchronization.proto", "proto/topos/tce/v1/double_echo.proto", + "proto/topos/tce/v1/gossipsub.proto", "proto/topos/uci/v1/certification.proto", "proto/topos/p2p/info.proto", ], diff --git a/crates/topos-api/proto/topos/tce/v1/gossipsub.proto b/crates/topos-api/proto/topos/tce/v1/gossipsub.proto new file mode 100644 index 000000000..76059823c --- /dev/null +++ b/crates/topos-api/proto/topos/tce/v1/gossipsub.proto @@ -0,0 +1,9 @@ +syntax = "proto3"; + +package topos.tce.v1; + +import "topos/tce/v1/double_echo.proto"; + +message Batch { + repeated bytes messages = 1; +} diff --git a/crates/topos-api/src/grpc/generated/topos.bin b/crates/topos-api/src/grpc/generated/topos.bin index c5a791666..635dba028 100644 Binary files a/crates/topos-api/src/grpc/generated/topos.bin and b/crates/topos-api/src/grpc/generated/topos.bin differ diff --git a/crates/topos-api/src/grpc/generated/topos.tce.v1.rs b/crates/topos-api/src/grpc/generated/topos.tce.v1.rs index 448e4323e..7f2ea84af 100644 --- a/crates/topos-api/src/grpc/generated/topos.tce.v1.rs +++ b/crates/topos-api/src/grpc/generated/topos.tce.v1.rs @@ -1495,3 +1495,10 @@ pub mod double_echo_request { Ready(super::Ready), } } +#[derive(serde::Deserialize, serde::Serialize)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Batch { + #[prost(bytes = "vec", repeated, tag = "1")] + pub messages: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, +} diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index 05dd5ef61..04dacb879 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashSet, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, env, task::Poll, time::Duration, @@ -10,37 +10,34 @@ use libp2p::{ identity::Keypair, swarm::{NetworkBehaviour, THandlerInEvent, ToSwarm}, }; -use serde::{Deserialize, Serialize}; -use topos_metrics::{ - P2P_DUPLICATE_MESSAGE_ID_RECEIVED_TOTAL, P2P_GOSSIP_BATCH_SIZE, - P2P_MESSAGE_SERIALIZE_FAILURE_TOTAL, -}; +use prost::Message as ProstMessage; +use topos_api::grpc::tce::v1::Batch; +use topos_metrics::{P2P_DUPLICATE_MESSAGE_ID_RECEIVED_TOTAL, P2P_GOSSIP_BATCH_SIZE}; use tracing::{debug, error}; use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY}; -#[derive(Debug, Clone, Serialize, Deserialize)] -pub(crate) struct Batch { - pub(crate) data: Vec>, -} +const MAX_BATCH_SIZE: usize = 10; pub struct Behaviour { batch_size: usize, gossipsub: gossipsub::Behaviour, - echo_queue: VecDeque>, - ready_queue: VecDeque>, + pending: HashMap<&'static str, VecDeque>>, tick: tokio::time::Interval, cache: HashSet, } impl Behaviour { - pub fn publish(&mut self, topic: &'static str, data: Vec) -> Result { + pub fn publish( + &mut self, + topic: &'static str, + message: Vec, + ) -> Result { match topic { TOPOS_GOSSIP => { - _ = self.gossipsub.publish(IdentTopic::new(topic), data); + _ = self.gossipsub.publish(IdentTopic::new(topic), message); } - TOPOS_ECHO => self.echo_queue.push_back(data), - TOPOS_READY => self.ready_queue.push_back(data), + TOPOS_ECHO | TOPOS_READY => self.pending.entry(topic).or_default().push_back(message), _ => return Err("Invalid topic"), } @@ -66,7 +63,7 @@ impl Behaviour { pub async fn new(peer_key: Keypair) -> Self { let batch_size = env::var("TOPOS_GOSSIP_BATCH_SIZE") .map(|v| v.parse::()) - .unwrap_or(Ok(10)) + .unwrap_or(Ok(MAX_BATCH_SIZE)) .unwrap(); let gossipsub = gossipsub::ConfigBuilder::default() .max_transmit_size(2 * 1024 * 1024) @@ -88,8 +85,12 @@ impl Behaviour { Self { batch_size, gossipsub, - echo_queue: VecDeque::new(), - ready_queue: VecDeque::new(), + pending: [ + (TOPOS_ECHO, VecDeque::new()), + (TOPOS_READY, VecDeque::new()), + ] + .into_iter() + .collect(), tick: tokio::time::interval(Duration::from_millis( env::var("TOPOS_GOSSIP_INTERVAL") .map(|v| v.parse::()) @@ -157,49 +158,20 @@ impl NetworkBehaviour for Behaviour { ) -> Poll>> { if self.tick.poll_tick(cx).is_ready() { // Publish batch - if !self.echo_queue.is_empty() { - let mut echos = Batch { data: Vec::new() }; - for _ in 0..self.batch_size { - if let Some(data) = self.echo_queue.pop_front() { - echos.data.push(data); - } else { - break; - } - } - - debug!("Publishing {} echos", echos.data.len()); - if let Ok(msg) = bincode::serialize::(&echos) { - P2P_GOSSIP_BATCH_SIZE.observe(echos.data.len() as f64); + for (topic, queue) in self.pending.iter_mut() { + if !queue.is_empty() { + let num_of_message = queue.len().min(self.batch_size); + let batch = Batch { + messages: queue.drain(0..num_of_message).collect(), + }; - match self.gossipsub.publish(IdentTopic::new(TOPOS_ECHO), msg) { - Ok(message_id) => debug!("Published echo {}", message_id), - Err(error) => error!("Failed to publish echo: {}", error), + debug!("Publishing {} {}", batch.messages.len(), topic); + let msg = batch.encode_to_vec(); + P2P_GOSSIP_BATCH_SIZE.observe(batch.messages.len() as f64); + match self.gossipsub.publish(IdentTopic::new(*topic), msg) { + Ok(message_id) => debug!("Published {} {}", topic, message_id), + Err(error) => error!("Failed to publish {}: {}", topic, error), } - } else { - P2P_MESSAGE_SERIALIZE_FAILURE_TOTAL - .with_label_values(&["echo"]) - .inc(); - } - } - - if !self.ready_queue.is_empty() { - let mut readies = Batch { data: Vec::new() }; - for _ in 0..self.batch_size { - if let Some(data) = self.ready_queue.pop_front() { - readies.data.push(data); - } else { - break; - } - } - - debug!("Publishing {} readies", readies.data.len()); - if let Ok(msg) = bincode::serialize::(&readies) { - P2P_GOSSIP_BATCH_SIZE.observe(readies.data.len() as f64); - _ = self.gossipsub.publish(IdentTopic::new(TOPOS_READY), msg); - } else { - P2P_MESSAGE_SERIALIZE_FAILURE_TOTAL - .with_label_values(&["ready"]) - .inc(); } } } diff --git a/crates/topos-p2p/src/command.rs b/crates/topos-p2p/src/command.rs index ea7833686..4b046d159 100644 --- a/crates/topos-p2p/src/command.rs +++ b/crates/topos-p2p/src/command.rs @@ -31,7 +31,6 @@ pub enum Command { to: PeerId, sender: oneshot::Sender, CommandExecutionError>>, }, - Gossip { topic: &'static str, data: Vec, diff --git a/crates/topos-p2p/src/runtime/handle_command.rs b/crates/topos-p2p/src/runtime/handle_command.rs index b61fae0f7..efda95dff 100644 --- a/crates/topos-p2p/src/runtime/handle_command.rs +++ b/crates/topos-p2p/src/runtime/handle_command.rs @@ -90,15 +90,16 @@ impl Runtime { } } - Command::Gossip { topic, data } => { - match self.swarm.behaviour_mut().gossipsub.publish(topic, data) { - Ok(message_id) => { - debug!("Published message to {topic}"); - P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL.inc(); - } - Err(err) => error!("Failed to publish message to {topic}: {err}"), + Command::Gossip { + topic, + data: message, + } => match self.swarm.behaviour_mut().gossipsub.publish(topic, message) { + Ok(message_id) => { + debug!("Published message to {topic}"); + P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL.inc(); } - } + Err(err) => error!("Failed to publish message to {topic}: {err}"), + }, } } } diff --git a/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs b/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs index 9ca052f22..ee71b1855 100644 --- a/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs +++ b/crates/topos-p2p/src/runtime/handle_event/gossipsub.rs @@ -5,10 +5,9 @@ use topos_metrics::{ }; use tracing::{debug, error}; -use crate::{ - behaviour::gossip::Batch, constants, event::GossipEvent, Event, Runtime, TOPOS_ECHO, - TOPOS_GOSSIP, TOPOS_READY, -}; +use crate::{constants, event::GossipEvent, Event, Runtime, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY}; +use prost::Message; +use topos_api::grpc::tce::v1::Batch; use super::EventHandler; @@ -41,41 +40,28 @@ impl EventHandler for Runtime { error!("Failed to send gossip event to runtime: {:?}", e); } } - TOPOS_ECHO => { - P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL.inc(); - - if let Ok(msg) = bincode::deserialize::(&message) { - for data in msg.data { - if let Err(e) = self - .event_sender - .send(Event::Gossip { from: source, data }) - .await - { - error!("Failed to send gossip event to runtime: {:?}", e); - } - } + TOPOS_ECHO | TOPOS_READY => { + if topic == TOPOS_ECHO { + P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL.inc(); } else { - P2P_MESSAGE_DESERIALIZE_FAILURE_TOTAL - .with_label_values(&["echo"]) - .inc(); + P2P_MESSAGE_RECEIVED_ON_READY_TOTAL.inc(); } - } - TOPOS_READY => { - P2P_MESSAGE_RECEIVED_ON_READY_TOTAL.inc(); - - if let Ok(msg) = bincode::deserialize::(&message) { - for data in msg.data { + if let Ok(Batch { messages }) = Batch::decode(&message[..]) { + for message in messages { if let Err(e) = self .event_sender - .send(Event::Gossip { from: source, data }) + .send(Event::Gossip { + from: source, + data: message, + }) .await { - error!("Failed to send gossip event to runtime: {:?}", e); + error!("Failed to send gossip {} event to runtime: {:?}", topic, e); } } } else { P2P_MESSAGE_DESERIALIZE_FAILURE_TOTAL - .with_label_values(&["ready"]) + .with_label_values(&[topic]) .inc(); } }