Skip to content

Commit

Permalink
Make message delivery in run_sync() ordered by default
Browse files Browse the repository at this point in the history
  • Loading branch information
fjarri committed Jan 10, 2025
1 parent f47631b commit aecfe06
Showing 1 changed file with 69 additions and 3 deletions.
72 changes: 69 additions & 3 deletions manul/src/dev/run_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,73 @@ struct RoundMessage<SP: SessionParameters> {
message: Message<SP::Verifier>,
}

enum Messages<SP: SessionParameters> {
/// For each node, if message A was sent before message B, it will be popped before message B as well.
Ordered(BTreeMap<SP::Verifier, Vec<RoundMessage<SP>>>),
/// The messages will be popped completely at random.
Unordered(Vec<RoundMessage<SP>>),
}

impl<SP> Messages<SP>
where
SP: SessionParameters,
{
fn new(ordered: bool) -> Self {
if ordered {
Self::Ordered(BTreeMap::new())
} else {
Self::Unordered(Vec::new())
}
}

/// Adds a message to the queue.
fn push(&mut self, message: RoundMessage<SP>) {
match self {
Self::Ordered(m) => m.entry(message.from.clone()).or_insert(Vec::new()).push(message),
Self::Unordered(v) => v.push(message),
}
}

/// Adds a a vector of messages to the queue.
fn extend(&mut self, messages: Vec<RoundMessage<SP>>) {
for message in messages {
self.push(message)
}
}

/// Removes a random message from the queue and returns it.
fn pop(&mut self, rng: &mut impl CryptoRngCore) -> RoundMessage<SP> {
match self {
Self::Ordered(m) => {
let senders = m.keys().cloned().collect::<Vec<_>>();
let sender_idx = rng.gen_range(0..senders.len());
let sender = &senders.get(sender_idx).expect("the entry exists");

let (message, is_empty) = {
let messages = m.get_mut(sender).expect("the entry exists");
let message = messages.remove(0);
(message, messages.is_empty())
};
if is_empty {
m.remove(sender);
}
message
}
Self::Unordered(v) => {
let message_idx = rng.gen_range(0..v.len());
v.swap_remove(message_idx)
}
}
}

fn is_empty(&self) -> bool {
match self {
Self::Ordered(m) => m.is_empty(),
Self::Unordered(v) => v.is_empty(),
}
}
}

#[allow(clippy::type_complexity)]
fn propagate<P, SP>(
rng: &mut impl CryptoRngCore,
Expand Down Expand Up @@ -98,7 +165,7 @@ where
{
let session_id = SessionId::random::<SP>(rng);

let mut messages = Vec::new();
let mut messages = Messages::new(true);
let mut states = BTreeMap::new();

for (signer, entry_point) in entry_points {
Expand All @@ -124,8 +191,7 @@ where

loop {
// Pick a random message and deliver it
let message_idx = rng.gen_range(0..messages.len());
let message = messages.swap_remove(message_idx);
let message = messages.pop(rng);

debug!("Delivering message from {:?} to {:?}", message.from, message.to);

Expand Down

0 comments on commit aecfe06

Please sign in to comment.