Skip to content

Commit

Permalink
Merge pull request #58 from EspressoSystems/rm/message-hooking
Browse files Browse the repository at this point in the history
Introduce message hooking
  • Loading branch information
rob-maron authored Sep 9, 2024
2 parents 5406fde + 5bd4b0e commit 98422e6
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 19 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion cdn-broker/src/binaries/bad-broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
use std::time::Duration;

use cdn_broker::{Broker, Config};
use cdn_proto::{crypto::signature::KeyPair, def::ProductionRunDef, error::Result};
use cdn_proto::{
crypto::signature::KeyPair,
def::{NoMessageHook, ProductionRunDef},
error::Result,
};
use clap::Parser;
use jf_signature::{bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme};
use rand::{rngs::StdRng, SeedableRng};
Expand Down Expand Up @@ -76,6 +80,9 @@ async fn main() -> Result<()> {
private_bind_endpoint: format!("0.0.0.0:{private_port}"),
private_advertise_endpoint: format!("local_ip:{private_port}"),
global_memory_pool_size: None,

user_message_hook: NoMessageHook,
broker_message_hook: NoMessageHook,
};

// Create new `Broker`
Expand Down
9 changes: 8 additions & 1 deletion cdn-broker/src/binaries/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
//! The following is the main `Broker` binary, which just instantiates and runs
//! a `Broker` object.
use cdn_broker::{Broker, Config};
use cdn_proto::{crypto::signature::KeyPair, def::ProductionRunDef, error::Result};
use cdn_proto::{
crypto::signature::KeyPair,
def::{NoMessageHook, ProductionRunDef},
error::Result,
};
use clap::Parser;
use jf_signature::{bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS, SignatureScheme};
use rand::{rngs::StdRng, SeedableRng};
Expand Down Expand Up @@ -111,6 +115,9 @@ async fn main() -> Result<()> {
private_bind_endpoint: args.private_bind_endpoint,
private_advertise_endpoint: args.private_advertise_endpoint,
global_memory_pool_size: Some(args.global_memory_pool_size),

user_message_hook: NoMessageHook,
broker_message_hook: NoMessageHook,
};

// Create new `Broker`
Expand Down
19 changes: 18 additions & 1 deletion cdn-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use cdn_proto::{
bail,
connection::{limiter::Limiter, protocols::Protocol as _},
crypto::tls::{generate_cert_from_ca, load_ca},
def::{Listener, Protocol, RunDef, Scheme},
def::{Listener, MessageHook, Protocol, RunDef, Scheme},
discovery::{BrokerIdentifier, DiscoveryClient},
error::{Error, Result},
util::AbortOnDropHandle,
Expand Down Expand Up @@ -74,6 +74,12 @@ pub struct Config<R: RunDef> {
/// tries to allocate more than this amount until some memory is freed.
/// Default is 1GB.
pub global_memory_pool_size: Option<usize>,

/// The hook we use when receiving incoming messages from users
pub user_message_hook: MessageHook<R::User>,

/// The hook we use when receiving incoming messages from brokers
pub broker_message_hook: MessageHook<R::Broker>,
}

/// The broker `Inner` that we use to share common data between broker tasks.
Expand All @@ -93,6 +99,12 @@ struct Inner<R: RunDef> {

/// The shared limiter that we use for all connections.
limiter: Limiter,

/// The hook we use when receiving incoming messages from users
user_message_hook: MessageHook<R::User>,

/// The hook we use when receiving incoming messages from brokers
broker_message_hook: MessageHook<R::Broker>,
}

/// The main `Broker` struct. We instantiate this when we want to run a broker.
Expand Down Expand Up @@ -136,6 +148,9 @@ impl<R: RunDef> Broker<R> {
ca_key_path,

global_memory_pool_size,

user_message_hook,
broker_message_hook,
} = config;

// Get the local IP address so we can replace in
Expand Down Expand Up @@ -233,6 +248,8 @@ impl<R: RunDef> Broker<R> {
keypair,
connections: Arc::from(RwLock::from(Connections::new(identity))),
limiter,
user_message_hook,
broker_message_hook,
}),
metrics_bind_endpoint,
user_listener,
Expand Down
7 changes: 7 additions & 0 deletions cdn-broker/src/reexports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub mod discovery {

pub mod def {
pub use cdn_proto::def::{ConnectionDef, RunDef, Topic};
pub mod hook {
pub use cdn_proto::def::{HookResult, MessageHook, MessageHookDef, NoMessageHook};
}
}

pub mod crypto {
Expand All @@ -32,6 +35,10 @@ pub mod error {
pub use cdn_proto::error::{Error, Result};
}

pub mod message {
pub use cdn_proto::message::{Broadcast, Direct, Message};
}

/// This is not guarded by `![cfg(test)]` because we use the same functions
/// when doing benchmarks.
pub mod tests {
Expand Down
16 changes: 14 additions & 2 deletions cdn-broker/src/tasks/broker/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{sync::Arc, time::Duration};
use cdn_proto::{
authenticate_with_broker, bail,
connection::{auth::broker::BrokerAuth, protocols::Connection, Bytes, UserPublicKey},
def::RunDef,
def::{HookResult, MessageHookDef, RunDef},
discovery::BrokerIdentifier,
error::{Error, Result},
message::{Message, Topic},
Expand Down Expand Up @@ -123,12 +123,24 @@ impl<Def: RunDef> Inner<Def> {
broker_identifier: &BrokerIdentifier,
connection: Connection,
) -> Result<()> {
// Clone the hook
let mut local_message_hook = self.broker_message_hook.clone();

loop {
// Receive a message from the broker
let raw_message = connection.recv_message_raw().await?;

// Attempt to deserialize the message
let message = Message::deserialize(&raw_message)?;
let mut message = Message::deserialize(&raw_message)?;

// Call the hook for the broker and handle the result
match local_message_hook.on_message_received(&mut message) {
Ok(HookResult::SkipMessage) => continue,
Ok(HookResult::ProcessMessage) => (),
Err(err) => {
Err(Error::Connection(format!("hook failed: {err}")))?;
}
}

match message {
// If we receive a direct message from a broker, we want to send it to the user with that key
Expand Down
16 changes: 14 additions & 2 deletions cdn-broker/src/tasks/user/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::sync::Arc;
use std::time::Duration;

use cdn_proto::connection::{protocols::Connection, UserPublicKey};
use cdn_proto::def::{RunDef, Topic as _};
use cdn_proto::def::{HookResult, MessageHookDef, RunDef, Topic as _};
use cdn_proto::error::{Error, Result};
use cdn_proto::util::mnemonic;
use cdn_proto::{connection::auth::broker::BrokerAuth, message::Message};
Expand Down Expand Up @@ -97,12 +97,24 @@ impl<Def: RunDef> Inner<Def> {
public_key: &UserPublicKey,
connection: Connection,
) -> Result<()> {
// Clone the hook
let mut local_message_hook = self.user_message_hook.clone();

loop {
// Receive a message from the user
let raw_message = connection.recv_message_raw().await?;

// Attempt to deserialize the message
let message = Message::deserialize(&raw_message)?;
let mut message = Message::deserialize(&raw_message)?;

// Call the hook for the user and handle the result
match local_message_hook.on_message_received(&mut message) {
Ok(HookResult::SkipMessage) => continue,
Ok(HookResult::ProcessMessage) => (),
Err(err) => {
Err(Error::Connection(format!("hook failed: {err}")))?;
}
}

match message {
// If we get a direct message from a user, send it to both users and brokers.
Expand Down
4 changes: 3 additions & 1 deletion cdn-broker/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use cdn_proto::{
signature::KeyPair,
tls::{generate_cert_from_ca, LOCAL_CA_CERT, LOCAL_CA_KEY},
},
def::TestingRunDef,
def::{NoMessageHook, TestingRunDef},
discovery::BrokerIdentifier,
message::{Message, Topic},
};
Expand Down Expand Up @@ -240,6 +240,8 @@ async fn new_broker_under_test<B: Protocol, U: Protocol>() -> Broker<TestingRunD
global_memory_pool_size: None,
ca_cert_path: None,
ca_key_path: None,
user_message_hook: NoMessageHook,
broker_message_hook: NoMessageHook,
};

// Create and return the broker
Expand Down
2 changes: 1 addition & 1 deletion cdn-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ harness = false


[dependencies]
redis = { version = "0.26", default-features = false, features = [
redis = { version = "0.25", default-features = false, features = [
"connection-manager",
"tokio-comp",
] }
Expand Down
2 changes: 1 addition & 1 deletion cdn-proto/src/connection/auth/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl<R: RunDef> BrokerAuth<R> {
let public_key_bytes = bail!(
keypair.public_key.serialize(),
Serialize,
"failed to serialize publi key"
"failed to serialize public key"
);

// We authenticate to the marshal with a key
Expand Down
8 changes: 4 additions & 4 deletions cdn-proto/src/connection/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ use prometheus::{register_gauge, register_histogram, Gauge, Histogram};
lazy_static! {
// The total number of bytes sent
pub static ref BYTES_SENT: Option<Gauge> =
register_gauge!("total_bytes_sent", "the total number of bytes sent").map_err(|e| eprintln!("Could not register metric: {:?}", e)).ok();
register_gauge!("total_bytes_sent", "the total number of bytes sent").map_err(|e| eprintln!("Could not register metric: {e:?}")).ok();

// The total number of bytes received
pub static ref BYTES_RECV: Option<Gauge> =
register_gauge!("total_bytes_recv", "the total number of bytes received").map_err(|e| eprintln!("Could not register metric: {:?}", e)).ok();
register_gauge!("total_bytes_recv", "the total number of bytes received").map_err(|e| eprintln!("Could not register metric: {e:?}")).ok();

// Per-message latency
pub static ref LATENCY: Option<Histogram> =
register_histogram!("latency", "message delivery latency").map_err(|e| eprintln!("Could not register metric: {:?}", e)).ok();
register_histogram!("latency", "message delivery latency").map_err(|e| eprintln!("Could not register metric: {e:?}")).ok();

// The per-message latency over the last 30 seconds
pub static ref RUNNING_LATENCY: Option<Gauge> =
register_gauge!("running_latency", "average tail latency over the last 30s").map_err(|e| eprintln!("Could not register metric: {:?}", e)).ok();
register_gauge!("running_latency", "average tail latency over the last 30s").map_err(|e| eprintln!("Could not register metric: {e:?}")).ok();
}
36 changes: 35 additions & 1 deletion cdn-proto/src/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::crypto::signature::SignatureScheme;
use crate::discovery::embedded::Embedded;
use crate::discovery::{redis::Redis, DiscoveryClient};
use crate::error::{Error, Result};
use crate::message::Message;
use anyhow::Result as AnyhowResult;

/// An implementation of `Topic` for testing purposes.
#[repr(u8)]
Expand Down Expand Up @@ -55,12 +57,39 @@ pub trait RunDef: 'static {
type Topic: Topic;
}

/// This trait defines the connection configuration for a single CDN component.
/// This trait defines the connection configuration for a single CDN component
pub trait ConnectionDef: 'static {
type Scheme: SignatureScheme;
type Protocol: ProtocolType;
type MessageHook: MessageHookDef;
}

/// The result of a message hooking operation
pub enum HookResult {
/// Skip processing the message
SkipMessage,

/// Process the message
ProcessMessage,
}

/// This trait defines a hook that we use to perform additional actions on receiving a message
pub trait MessageHookDef: Send + Sync + 'static + Clone {
/// The hook that is called when a message is received. If an error is returned, the connection
/// will be closed.
///
/// # Errors
/// Is supposed to return an error if the other end should be disconnected.
fn on_message_received(&mut self, _message: &mut Message) -> AnyhowResult<HookResult> {
Ok(HookResult::ProcessMessage)
}
}

/// The no-op hook
#[derive(Clone)]
pub struct NoMessageHook;
impl MessageHookDef for NoMessageHook {}

/// The production run configuration.
/// Uses the real network protocols and Redis for discovery.
pub struct ProductionRunDef;
Expand All @@ -77,6 +106,7 @@ pub struct ProductionBrokerConnection;
impl ConnectionDef for ProductionBrokerConnection {
type Scheme = BLS;
type Protocol = Tcp;
type MessageHook = NoMessageHook;
}

/// The production user connection configuration.
Expand All @@ -85,6 +115,7 @@ pub struct ProductionUserConnection;
impl ConnectionDef for ProductionUserConnection {
type Scheme = BLS;
type Protocol = Quic;
type MessageHook = NoMessageHook;
}

/// The production client connection configuration.
Expand All @@ -95,6 +126,7 @@ pub struct ProductionClientConnection;
impl ConnectionDef for ProductionClientConnection {
type Scheme = Scheme<<ProductionRunDef as RunDef>::User>;
type Protocol = Protocol<<ProductionRunDef as RunDef>::User>;
type MessageHook = NoMessageHook;
}

/// The testing run configuration.
Expand All @@ -117,11 +149,13 @@ pub struct TestingConnection<P: ProtocolType> {
impl<P: ProtocolType> ConnectionDef for TestingConnection<P> {
type Scheme = BLS;
type Protocol = P;
type MessageHook = NoMessageHook;
}

// Type aliases to automatically disambiguate usage
pub type Scheme<A> = <A as ConnectionDef>::Scheme;
pub type PublicKey<A> = <Scheme<A> as SignatureScheme>::PublicKey;
pub type MessageHook<A> = <A as ConnectionDef>::MessageHook;

// Type aliases to automatically disambiguate usage
pub type Protocol<A> = <A as ConnectionDef>::Protocol;
Expand Down
4 changes: 3 additions & 1 deletion tests/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use cdn_marshal::{Config as MarshalConfig, Marshal};
use cdn_proto::{
connection::protocols::memory::Memory,
crypto::signature::{KeyPair, Serializable, SignatureScheme},
def::{TestingConnection, TestingRunDef},
def::{NoMessageHook, TestingConnection, TestingRunDef},
discovery::{embedded::Embedded, BrokerIdentifier, DiscoveryClient},
message::Topic,
};
Expand Down Expand Up @@ -78,6 +78,8 @@ async fn new_broker(key: u64, public_ep: &str, private_ep: &str, discovery_ep: &
public_advertise_endpoint: public_ep.to_string(),
public_bind_endpoint: public_ep.to_string(),
global_memory_pool_size: None,
user_message_hook: NoMessageHook,
broker_message_hook: NoMessageHook,
};

// Create broker
Expand Down

0 comments on commit 98422e6

Please sign in to comment.