Skip to content

Commit

Permalink
improve configuration time definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Apr 19, 2024
1 parent ea397a3 commit dd8cd09
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 146 deletions.
47 changes: 23 additions & 24 deletions cdn-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ use cdn_proto::{
bail,
connection::protocols::Protocol as _,
crypto::tls::{generate_cert_from_ca, load_ca},
def::{Listener, Protocol, Scheme},
def::{Listener, Protocol, RunDef, Scheme},
discovery::{BrokerIdentifier, DiscoveryClient},
error::{Error, Result},
};
use cdn_proto::{crypto::signature::KeyPair, def::RunDef, metrics as proto_metrics};
use cdn_proto::{crypto::signature::KeyPair, metrics as proto_metrics};
use connections::Connections;
use local_ip_address::local_ip;
use tokio::{select, spawn, sync::Semaphore};
use tracing::info;

/// The broker's configuration. We need this when we create a new one.
pub struct Config<Def: RunDef> {
pub struct Config<R: RunDef> {
/// The user (public) advertise endpoint in `IP:port` form: what the marshals send to
/// users upon authentication. Users connect to us with this endpoint.
pub public_advertise_endpoint: String,
Expand All @@ -53,7 +53,7 @@ pub struct Config<Def: RunDef> {
/// The discovery endpoint. We use this to maintain consistency between brokers and marshals.
pub discovery_endpoint: String,

pub keypair: KeyPair<Scheme<Def::Broker>>,
pub keypair: KeyPair<Scheme<R::Broker>>,

/// An optional TLS CA cert path. If not specified, will use the local one.
pub ca_cert_path: Option<String>,
Expand All @@ -63,50 +63,49 @@ pub struct Config<Def: RunDef> {
}

/// The broker `Inner` that we use to share common data between broker tasks.
struct Inner<Def: RunDef> {
struct Inner<R: RunDef> {
/// A broker identifier that we can use to establish uniqueness among brokers.
identity: BrokerIdentifier,

/// The (clonable) `Discovery` client that we will use to maintain consistency between brokers and marshals
discovery_client: Def::DiscoveryClientType,
discovery_client: R::DiscoveryClientType,

/// The underlying (public) verification key, used to authenticate with the server. Checked
/// against the stake table.
keypair: KeyPair<Scheme<Def::Broker>>,
/// The underlying (public) verification key, used to authenticate with other brokers.
keypair: KeyPair<Scheme<R::Broker>>,

/// A lock on authentication so we don't thrash when authenticating with brokers.
/// Only lets us authenticate to one broker at a time.
auth_lock: Semaphore,

/// The connections that currently exist. We use this everywhere we need to update connection
/// state or send messages.
connections: Arc<Connections<Def>>,
connections: Arc<Connections<R>>,
}

/// The main `Broker` struct. We instantiate this when we want to run a broker.
pub struct Broker<Def: RunDef> {
pub struct Broker<R: RunDef> {
/// The broker's `Inner`. We clone this and pass it around when needed.
inner: Arc<Inner<Def>>,
inner: Arc<Inner<R>>,

/// The public (user -> broker) listener
user_listener: Listener<Def::User>,
user_listener: Listener<R::User>,

/// The private (broker <-> broker) listener
broker_listener: Listener<Def::Broker>,
broker_listener: Listener<R::Broker>,

/// The endpoint to bind to for externalizing metrics (in `IP:port` form). If not provided,
/// metrics are not exposed.
metrics_bind_endpoint: Option<SocketAddr>,
}

impl<Def: RunDef> Broker<Def> {
impl<R: RunDef> Broker<R> {
/// Create a new `Broker` from a `Config`
///
/// # Errors
/// - If we fail to create the `Discovery` client
/// - If we fail to bind to our public endpoint
/// - If we fail to bind to our private endpoint
pub async fn new(config: Config<Def>) -> Result<Self> {
pub async fn new(config: Config<R>) -> Result<Self> {
// Extrapolate values from the underlying broker configuration
let Config {
public_advertise_endpoint,
Expand Down Expand Up @@ -146,7 +145,7 @@ impl<Def: RunDef> Broker<Def> {

// Create the `Discovery` client we will use to maintain consistency
let discovery_client = bail!(
Def::DiscoveryClientType::new(discovery_endpoint, Some(identity.clone()),).await,
R::DiscoveryClientType::new(discovery_endpoint, Some(identity.clone()),).await,
Parse,
"failed to create discovery client"
);
Expand All @@ -159,7 +158,7 @@ impl<Def: RunDef> Broker<Def> {

// Create the user (public) listener
let user_listener = bail!(
Protocol::<Def::User>::bind(
Protocol::<R::User>::bind(
public_bind_endpoint.as_str(),
tls_cert.clone(),
tls_key.clone()
Expand All @@ -174,7 +173,7 @@ impl<Def: RunDef> Broker<Def> {

// Create the broker (private) listener
let broker_listener = bail!(
Protocol::<Def::Broker>::bind(private_bind_endpoint.as_str(), tls_cert, tls_key).await,
Protocol::<R::Broker>::bind(private_bind_endpoint.as_str(), tls_cert, tls_key).await,
Connection,
format!(
"failed to bind to private (broker) bind endpoint {}",
Expand All @@ -188,15 +187,15 @@ impl<Def: RunDef> Broker<Def> {
// Parse the metrics bind endpoint
let metrics_bind_endpoint: Option<SocketAddr> = metrics_bind_endpoint
.map(|m| {
Ok(bail!(
bail!(
m.to_socket_addrs(),
Parse,
"failed to parse metrics bind endpoint"
)
.find(|s| s.is_ipv4())
.ok_or(Error::Connection(
"failed to resolve metrics bind endpoint".to_string(),
))?)
.find(SocketAddr::is_ipv4)
.ok_or_else(|| {
Error::Connection("failed to resolve metrics bind endpoint".to_string())
})
})
.transpose()?;

Expand Down
5 changes: 4 additions & 1 deletion cdn-broker/src/reexports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ pub mod connection {
pub use cdn_proto::connection::protocols::quic::Quic;
pub use cdn_proto::connection::protocols::tcp::Tcp;
}
pub use cdn_proto::connection::middleware::{
Middleware, NoMiddleware, TrustedMiddleware, UntrustedMiddleware,
};
}

pub mod discovery {
pub use cdn_proto::discovery::{embedded::Embedded, redis::Redis, DiscoveryClient};
}

pub mod def {
pub use cdn_proto::def::RunDef;
pub use cdn_proto::def::{ConnectionDef, RunDef};
}

pub mod crypto {
Expand Down
12 changes: 6 additions & 6 deletions cdn-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod retry;
use cdn_proto::{
bail,
crypto::signature::Serializable,
def::{PublicKey, RunDef},
def::{ConnectionDef, PublicKey},
error::{Error, Result},
message::{Broadcast, Direct, Message, Topic},
};
Expand All @@ -19,13 +19,13 @@ use retry::Retry;
/// for common operations to and from a server. Mostly just used to make the API
/// more ergonomic. Also keeps track of subscriptions.
#[derive(Clone)]
pub struct Client<Def: RunDef>(Retry<Def>);
pub struct Client<C: ConnectionDef>(Retry<C>);

pub type Config<D> = retry::Config<D>;
pub type Config<C> = retry::Config<C>;

impl<Def: RunDef> Client<Def> {
impl<C: ConnectionDef> Client<C> {
/// Creates a new `Retry` from a configuration.
pub fn new(config: Config<Def>) -> Self {
pub fn new(config: Config<C>) -> Self {
Self(Retry::from_config(config))
}

Expand Down Expand Up @@ -65,7 +65,7 @@ impl<Def: RunDef> Client<Def> {
/// If the connection or serialization has failed
pub async fn send_direct_message(
&self,
recipient: &PublicKey<Def::User>,
recipient: &PublicKey<C>,
message: Vec<u8>,
) -> Result<()> {
// Serialize recipient to a byte array before sending the message
Expand Down
4 changes: 2 additions & 2 deletions cdn-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::time::Duration;
use cdn_client::{Client, Config};
use cdn_proto::{
crypto::signature::{KeyPair, Serializable},
def::ProductionRunDef,
def::ProductionClientConnection,
message::{Broadcast, Direct, Message, Topic},
};
use clap::Parser;
Expand Down Expand Up @@ -56,7 +56,7 @@ async fn main() {

// Create a client, specifying the BLS signature algorithm
// and the `QUIC` protocol.
let client = Client::<ProductionRunDef>::new(config);
let client = Client::<ProductionClientConnection>::new(config);

// In a loop,
loop {
Expand Down
5 changes: 4 additions & 1 deletion cdn-client/src/reexports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ pub mod connection {
pub use cdn_proto::connection::protocols::quic::Quic;
pub use cdn_proto::connection::protocols::tcp::Tcp;
}
pub use cdn_proto::connection::middleware::{
Middleware, NoMiddleware, TrustedMiddleware, UntrustedMiddleware,
};
}

pub mod discovery {
pub use cdn_proto::discovery::{embedded::Embedded, redis::Redis, DiscoveryClient};
}

pub mod def {
pub use cdn_proto::def::RunDef;
pub use cdn_proto::def::{ConnectionDef, RunDef};
}

pub mod crypto {
Expand Down
32 changes: 16 additions & 16 deletions cdn-client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use cdn_proto::{
protocols::{Protocol as _, Receiver as _, Sender as _},
},
crypto::signature::KeyPair,
def::{Connection, Protocol, Receiver, RunDef, Scheme, Sender},
def::{Connection, ConnectionDef, Protocol, Scheme},
error::{Error, Result},
message::{Message, Topic},
};
Expand All @@ -30,13 +30,13 @@ use crate::bail;
/// It employs synchronization as well as retry logic.
/// Can be cloned to provide a handle to the same underlying elastic connection.
#[derive(Clone)]
pub struct Retry<Def: RunDef> {
pub inner: Arc<Inner<Def>>,
pub struct Retry<C: ConnectionDef> {
pub inner: Arc<Inner<C>>,
}

/// `Inner` is held exclusively by `Retry`, wherein an `Arc` is used
/// to facilitate interior mutability.
pub struct Inner<Def: RunDef> {
pub struct Inner<C: ConnectionDef> {
/// This is the remote endpoint of the marshal that we authenticate with.
endpoint: String,

Expand All @@ -45,49 +45,49 @@ pub struct Inner<Def: RunDef> {
use_local_authority: bool,

/// The underlying connection
connection: Arc<RwLock<OnceCell<Connection<Def::User>>>>,
connection: Arc<RwLock<OnceCell<Connection<C>>>>,

/// The keypair to use when authenticating
pub keypair: KeyPair<Scheme<Def::User>>,
pub keypair: KeyPair<Scheme<C>>,

/// The topics we're currently subscribed to. We need this so we can send our subscriptions
/// when we connect to a new server.
pub subscribed_topics: RwLock<HashSet<Topic>>,
}

impl<Def: RunDef> Inner<Def> {
impl<C: ConnectionDef> Inner<C> {
/// Attempt a reconnection to the remote marshal endpoint.
/// Returns the connection verbatim without updating any internal
/// structs.
///
/// # Errors
/// - If the connection failed
/// - If authentication failed
async fn connect(self: &Arc<Self>) -> Result<(Sender<Def::User>, Receiver<Def::User>)> {
async fn connect(self: &Arc<Self>) -> Result<Connection<C>> {
// Make the connection to the marshal
let connection = bail!(
Protocol::<Def::User>::connect(&self.endpoint, self.use_local_authority).await,
Protocol::<C>::connect(&self.endpoint, self.use_local_authority).await,
Connection,
"failed to connect to endpoint"
);

// Authenticate the connection to the marshal (if not provided)
let (broker_endpoint, permit) = bail!(
UserAuth::<Def::User>::authenticate_with_marshal(&connection, &self.keypair).await,
UserAuth::<C>::authenticate_with_marshal(&connection, &self.keypair).await,
Authentication,
"failed to authenticate to marshal"
);

// Make the connection to the broker
let connection = bail!(
Protocol::<Def::User>::connect(&broker_endpoint, self.use_local_authority).await,
Protocol::<C>::connect(&broker_endpoint, self.use_local_authority).await,
Connection,
"failed to connect to broker"
);

// Authenticate the connection to the broker
bail!(
UserAuth::<Def::User>::authenticate_with_broker(
UserAuth::<C>::authenticate_with_broker(
&connection,
permit,
self.subscribed_topics.read().await.clone()
Expand All @@ -105,7 +105,7 @@ impl<Def: RunDef> Inner<Def> {

/// The configuration needed to construct a `Retry` connection.
#[derive(Clone)]
pub struct Config<Def: RunDef> {
pub struct Config<C: ConnectionDef> {
/// This is the remote endpoint of the marshal that we authenticate with.
pub endpoint: String,

Expand All @@ -115,7 +115,7 @@ pub struct Config<Def: RunDef> {

/// The underlying (public) verification key, used to authenticate with the server. Checked
/// against the stake table.
pub keypair: KeyPair<Scheme<Def::User>>,
pub keypair: KeyPair<Scheme<C>>,

/// The topics we're currently subscribed to. We need this here so we can send our subscriptions
/// when we connect to a new server.
Expand Down Expand Up @@ -166,15 +166,15 @@ macro_rules! try_with_reconnect {
}};
}

impl<Def: RunDef> Retry<Def> {
impl<C: ConnectionDef> Retry<C> {
/// Creates a new `Retry` connection from a `Config`
/// Attempts to make an initial connection.
/// This allows us to create elastic clients that always try to maintain a connection.
///
/// # Errors
/// - If we are unable to either parse or bind an endpoint to the local endpoint.
/// - If we are unable to make the initial connection
pub fn from_config(config: Config<Def>) -> Self {
pub fn from_config(config: Config<C>) -> Self {
// Extrapolate values from the underlying client configuration
let Config {
endpoint,
Expand Down
10 changes: 5 additions & 5 deletions cdn-marshal/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@ use std::time::Duration;

use cdn_proto::{
connection::{auth::marshal::MarshalAuth, protocols::Sender as _},
def::{Receiver, RunDef, Sender},
def::{Connection, RunDef},
mnemonic,
};
use tokio::time::timeout;
use tracing::info;

use crate::Marshal;

impl<Def: RunDef> Marshal<Def> {
impl<R: RunDef> Marshal<R> {
/// Handles a user's connection, including authentication.
pub async fn handle_connection(
connection: (Sender<Def::User>, Receiver<Def::User>),
mut discovery_client: Def::DiscoveryClientType,
connection: Connection<R::User>,
mut discovery_client: R::DiscoveryClientType,
) {
// Verify (authenticate) the connection
if let Ok(Ok(user_public_key)) = timeout(
Duration::from_secs(5),
MarshalAuth::<Def>::verify_user(&connection, &mut discovery_client),
MarshalAuth::<R>::verify_user(&connection, &mut discovery_client),
)
.await
{
Expand Down
Loading

0 comments on commit dd8cd09

Please sign in to comment.