From aecfe063e90e8d148bec3efd64ca03cab47bace3 Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Fri, 10 Jan 2025 10:47:12 -0800 Subject: [PATCH] Make message delivery in run_sync() ordered by default --- manul/src/dev/run_sync.rs | 72 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 69 insertions(+), 3 deletions(-) diff --git a/manul/src/dev/run_sync.rs b/manul/src/dev/run_sync.rs index 35ab393..3650de7 100644 --- a/manul/src/dev/run_sync.rs +++ b/manul/src/dev/run_sync.rs @@ -27,6 +27,73 @@ struct RoundMessage { message: Message, } +enum Messages { + /// For each node, if message A was sent before message B, it will be popped before message B as well. + Ordered(BTreeMap>>), + /// The messages will be popped completely at random. + Unordered(Vec>), +} + +impl Messages +where + SP: SessionParameters, +{ + fn new(ordered: bool) -> Self { + if ordered { + Self::Ordered(BTreeMap::new()) + } else { + Self::Unordered(Vec::new()) + } + } + + /// Adds a message to the queue. + fn push(&mut self, message: RoundMessage) { + match self { + Self::Ordered(m) => m.entry(message.from.clone()).or_insert(Vec::new()).push(message), + Self::Unordered(v) => v.push(message), + } + } + + /// Adds a a vector of messages to the queue. + fn extend(&mut self, messages: Vec>) { + for message in messages { + self.push(message) + } + } + + /// Removes a random message from the queue and returns it. + fn pop(&mut self, rng: &mut impl CryptoRngCore) -> RoundMessage { + match self { + Self::Ordered(m) => { + let senders = m.keys().cloned().collect::>(); + let sender_idx = rng.gen_range(0..senders.len()); + let sender = &senders.get(sender_idx).expect("the entry exists"); + + let (message, is_empty) = { + let messages = m.get_mut(sender).expect("the entry exists"); + let message = messages.remove(0); + (message, messages.is_empty()) + }; + if is_empty { + m.remove(sender); + } + message + } + Self::Unordered(v) => { + let message_idx = rng.gen_range(0..v.len()); + v.swap_remove(message_idx) + } + } + } + + fn is_empty(&self) -> bool { + match self { + Self::Ordered(m) => m.is_empty(), + Self::Unordered(v) => v.is_empty(), + } + } +} + #[allow(clippy::type_complexity)] fn propagate( rng: &mut impl CryptoRngCore, @@ -98,7 +165,7 @@ where { let session_id = SessionId::random::(rng); - let mut messages = Vec::new(); + let mut messages = Messages::new(true); let mut states = BTreeMap::new(); for (signer, entry_point) in entry_points { @@ -124,8 +191,7 @@ where loop { // Pick a random message and deliver it - let message_idx = rng.gen_range(0..messages.len()); - let message = messages.swap_remove(message_idx); + let message = messages.pop(rng); debug!("Delivering message from {:?} to {:?}", message.from, message.to);