Skip to content
This repository has been archived by the owner on Mar 12, 2024. It is now read-only.

Commit

Permalink
nugget
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 committed Jul 31, 2023
1 parent 3bf5e84 commit 590a020
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 71 deletions.
89 changes: 39 additions & 50 deletions offchain/authority-claimer/src/claimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,45 @@
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use async_trait::async_trait;
use ethers::types::Address;
use std::marker::PhantomData;
use tracing::{trace, warn};

use crate::{listener::BrokerListener, sender::ClaimSender};

/// The `AuthorityClaimer` starts an event loop that waits for claim messages
/// from the broker, and then sends the claims to the blockchain.
///
/// It uses a `BrokerListener` for listening for messages from the broker.
///
/// It also uses a `ClaimSender` that interacts with the blockchain and
/// effectively submits the claims.
#[async_trait]
pub trait AuthorityClaimer<S: ClaimSender, L: BrokerListener> {
pub trait AuthorityClaimer<'a, L: BrokerListener, S: ClaimSender>
where
L: 'a + Sync,
S: 'a,
{
async fn start(
self,
claim_sender: S,
&'a self,
broker_listener: L,
dapp_address: Address,
) -> Result<(), AuthorityClaimerError<S, L>>;
claim_sender: S,
) -> Result<(), AuthorityClaimerError<S, L>> {
trace!("Starting the authority claimer loop");
let mut claim_sender = claim_sender;
loop {
match broker_listener.listen().await {
Ok(rollups_claim) => {
trace!("Got a claim from the broker: {:?}", rollups_claim);
claim_sender = claim_sender
.send_claim(rollups_claim)
.await
.map_err(AuthorityClaimerError::ClaimSenderError)?;
}
Err(e) => {
warn!("Broker error `{}`", e);
}
}
}
}
}

#[derive(Debug, thiserror::Error)]
Expand All @@ -25,60 +50,24 @@ pub enum AuthorityClaimerError<S: ClaimSender, L: BrokerListener> {

#[error("broker listener error: {0}")]
BrokerListenerError(L::Error),

#[error("connection with the broker was closed")]
BrokerListenerClosed,
}

// ------------------------------------------------------------------------------------------------
// DefaultAuthorityClaimer
// ------------------------------------------------------------------------------------------------

#[derive(Default)]
pub struct DefaultAuthorityClaimer<'a> {
_lifetime: PhantomData<&'a ()>,
}
pub struct DefaultAuthorityClaimer;

impl DefaultAuthorityClaimer<'_> {
impl DefaultAuthorityClaimer {
pub fn new() -> Self {
Self::default()
Self
}
}

#[async_trait]
impl<'a, S, L> AuthorityClaimer<S, L> for DefaultAuthorityClaimer<'a>
impl<'a, L: BrokerListener, S: ClaimSender> AuthorityClaimer<'a, L, S>
for DefaultAuthorityClaimer
where
S: 'a + ClaimSender,
L: 'a + BrokerListener + Sync,
L: 'a + Sync,
S: 'a,
{
async fn start(
self,
claim_sender: S,
broker_listener: L,
dapp_address: Address,
) -> Result<(), AuthorityClaimerError<S, L>> {
trace!("Starting the authority claimer loop");
let mut claim_sender = claim_sender;
loop {
match broker_listener.listen().await {
Some(Ok(rollups_claim)) => {
trace!(
"Got a claim from the broker channel: {:?}",
rollups_claim
);
claim_sender = claim_sender
.send_claim(dapp_address.clone(), rollups_claim)
.await
.map_err(AuthorityClaimerError::ClaimSenderError)?;
}
Some(Err(e)) => {
warn!("Broker channel error `{}`", e);
}

None => {
return Err(AuthorityClaimerError::BrokerListenerClosed);
}
}
}
}
}
11 changes: 9 additions & 2 deletions offchain/authority-claimer/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@ use rollups_events::{BrokerConfig, DAppMetadata, RollupsClaim};
use snafu::Snafu;
use std::fmt::Debug;

use crate::metrics::AuthorityClaimerMetrics;

/// The `BrokerListener` listens for new claims from the broker.
///
/// The `listen` function should preferably yield to other processes while
/// waiting for new messages (instead of busy-waiting).
#[async_trait]
pub trait BrokerListener: Sized + Send + Debug {
type Error: snafu::Error + Send;

async fn listen(&self) -> Option<Result<RollupsClaim, Self::Error>>;
async fn listen(&self) -> Result<RollupsClaim, Self::Error>;
}

// ------------------------------------------------------------------------------------------------
Expand All @@ -29,6 +35,7 @@ impl DefaultBrokerListener {
pub fn new(
_broker_config: BrokerConfig,
_dapp_metadata: DAppMetadata,
_metrics: AuthorityClaimerMetrics,
) -> Result<Self, DefaultBrokerListenerError> {
todo!()
}
Expand All @@ -38,7 +45,7 @@ impl DefaultBrokerListener {
impl BrokerListener for DefaultBrokerListener {
type Error = DefaultBrokerListenerError;

async fn listen(&self) -> Option<Result<RollupsClaim, Self::Error>> {
async fn listen(&self) -> Result<RollupsClaim, Self::Error> {
todo!()
}
}
Expand Down
31 changes: 15 additions & 16 deletions offchain/authority-claimer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use authority_claimer::{

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Reading the configurations.
// Getting the configuration.
let config = Config::new().map_err(Box::new)?;

tracing::info!(?config, "starting authority-claimer");
Expand All @@ -32,33 +32,32 @@ async fn main() -> Result<(), Box<dyn Error>> {
let http_server_handle =
http_server::start(config.http_server_config, metrics.clone().into());

let dapp_address = config.authority_claimer_config.dapp_address;
let dapp_metadata = DAppMetadata {
chain_id: config.authority_claimer_config.txm_config.chain_id,
dapp_address: rollups_events::Address::new(
config.authority_claimer_config.dapp_address.into(),
),
dapp_address: rollups_events::Address::new(dapp_address.into()),
};

// Creating the transaction manager claim sender.
trace!("Creating the claim sender");
let claim_sender =
TxManagerClaimSender::new(dapp_metadata.clone(), metrics)
.map_err(Box::new)?;

// Creating the broker listener.
// Creating the default broker listener.
trace!("Creating the broker listener");
let broker_listener = DefaultBrokerListener::new(
let default_broker_listener = DefaultBrokerListener::new(
config.authority_claimer_config.broker_config.clone(),
dapp_metadata,
dapp_metadata.clone(),
metrics.clone(),
)
.map_err(Box::new)?;

// Creating the transaction manager claim sender.
trace!("Creating the claim sender");
let tx_manager_claim_sender =
TxManagerClaimSender::new(dapp_metadata, metrics).map_err(Box::new)?;

// Creating the claimer loop.
let dapp_address = config.authority_claimer_config.dapp_address;
let authority_claimer = DefaultAuthorityClaimer::new();
let claimer_handle =
authority_claimer.start(claim_sender, broker_listener, dapp_address);
let claimer_handle = authority_claimer
.start(default_broker_listener, tx_manager_claim_sender);

// Starting the HTTP server and the claimer loop.
tokio::select! {
ret = http_server_handle => { ret.map_err(Box::new)? }
ret = claimer_handle => { ret.map_err(Box::new)? }
Expand Down
9 changes: 6 additions & 3 deletions offchain/authority-claimer/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use async_trait::async_trait;
use ethers::types::Address;
use rollups_events::{DAppMetadata, RollupsClaim};
use snafu::Snafu;
use std::fmt::Debug;

use crate::metrics::AuthorityClaimerMetrics;

/// The `ClaimSender` sends claims to the blockchain.
///
/// It should wait for N blockchain confirmations.
#[async_trait]
pub trait ClaimSender: Sized + Send + Debug {
type Error: snafu::Error + Send;

/// The `send_claim` function consumes the `ClaimSender` object
/// and then returns it to avoid that processes use the claim sender
/// concurrently.
async fn send_claim(
self,
dapp_address: Address,
rollups_claim: RollupsClaim,
) -> Result<Self, Self::Error>;
}
Expand Down Expand Up @@ -47,7 +51,6 @@ impl ClaimSender for TxManagerClaimSender {

async fn send_claim(
self,
_dapp_address: Address,
_rollups_claim: RollupsClaim,
) -> Result<Self, Self::Error> {
todo!()
Expand Down

0 comments on commit 590a020

Please sign in to comment.