diff --git a/offchain/authority-claimer/src/claimer.rs b/offchain/authority-claimer/src/claimer.rs index 1c4ea83c..803eb7fb 100644 --- a/offchain/authority-claimer/src/claimer.rs +++ b/offchain/authority-claimer/src/claimer.rs @@ -2,20 +2,45 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use async_trait::async_trait; -use ethers::types::Address; -use std::marker::PhantomData; use tracing::{trace, warn}; use crate::{listener::BrokerListener, sender::ClaimSender}; +/// The `AuthorityClaimer` starts an event loop that waits for claim messages +/// from the broker, and then sends the claims to the blockchain. +/// +/// It uses a `BrokerListener` for listening for messages from the broker. +/// +/// It also uses a `ClaimSender` that interacts with the blockchain and +/// effectively submits the claims. #[async_trait] -pub trait AuthorityClaimer { +pub trait AuthorityClaimer<'a, L: BrokerListener, S: ClaimSender> +where + L: 'a + Sync, + S: 'a, +{ async fn start( - self, - claim_sender: S, + &'a self, broker_listener: L, - dapp_address: Address, - ) -> Result<(), AuthorityClaimerError>; + claim_sender: S, + ) -> Result<(), AuthorityClaimerError> { + trace!("Starting the authority claimer loop"); + let mut claim_sender = claim_sender; + loop { + match broker_listener.listen().await { + Ok(rollups_claim) => { + trace!("Got a claim from the broker: {:?}", rollups_claim); + claim_sender = claim_sender + .send_claim(rollups_claim) + .await + .map_err(AuthorityClaimerError::ClaimSenderError)?; + } + Err(e) => { + warn!("Broker error `{}`", e); + } + } + } + } } #[derive(Debug, thiserror::Error)] @@ -25,60 +50,24 @@ pub enum AuthorityClaimerError { #[error("broker listener error: {0}")] BrokerListenerError(L::Error), - - #[error("connection with the broker was closed")] - BrokerListenerClosed, } // ------------------------------------------------------------------------------------------------ // DefaultAuthorityClaimer // ------------------------------------------------------------------------------------------------ -#[derive(Default)] -pub struct DefaultAuthorityClaimer<'a> { - _lifetime: PhantomData<&'a ()>, -} +pub struct DefaultAuthorityClaimer; -impl DefaultAuthorityClaimer<'_> { +impl DefaultAuthorityClaimer { pub fn new() -> Self { - Self::default() + Self } } -#[async_trait] -impl<'a, S, L> AuthorityClaimer for DefaultAuthorityClaimer<'a> +impl<'a, L: BrokerListener, S: ClaimSender> AuthorityClaimer<'a, L, S> + for DefaultAuthorityClaimer where - S: 'a + ClaimSender, - L: 'a + BrokerListener + Sync, + L: 'a + Sync, + S: 'a, { - async fn start( - self, - claim_sender: S, - broker_listener: L, - dapp_address: Address, - ) -> Result<(), AuthorityClaimerError> { - trace!("Starting the authority claimer loop"); - let mut claim_sender = claim_sender; - loop { - match broker_listener.listen().await { - Some(Ok(rollups_claim)) => { - trace!( - "Got a claim from the broker channel: {:?}", - rollups_claim - ); - claim_sender = claim_sender - .send_claim(dapp_address.clone(), rollups_claim) - .await - .map_err(AuthorityClaimerError::ClaimSenderError)?; - } - Some(Err(e)) => { - warn!("Broker channel error `{}`", e); - } - - None => { - return Err(AuthorityClaimerError::BrokerListenerClosed); - } - } - } - } } diff --git a/offchain/authority-claimer/src/listener.rs b/offchain/authority-claimer/src/listener.rs index 7ab23afa..362a0d69 100644 --- a/offchain/authority-claimer/src/listener.rs +++ b/offchain/authority-claimer/src/listener.rs @@ -6,11 +6,17 @@ use rollups_events::{BrokerConfig, DAppMetadata, RollupsClaim}; use snafu::Snafu; use std::fmt::Debug; +use crate::metrics::AuthorityClaimerMetrics; + +/// The `BrokerListener` listens for new claims from the broker. +/// +/// The `listen` function should preferably yield to other processes while +/// waiting for new messages (instead of busy-waiting). #[async_trait] pub trait BrokerListener: Sized + Send + Debug { type Error: snafu::Error + Send; - async fn listen(&self) -> Option>; + async fn listen(&self) -> Result; } // ------------------------------------------------------------------------------------------------ @@ -29,6 +35,7 @@ impl DefaultBrokerListener { pub fn new( _broker_config: BrokerConfig, _dapp_metadata: DAppMetadata, + _metrics: AuthorityClaimerMetrics, ) -> Result { todo!() } @@ -38,7 +45,7 @@ impl DefaultBrokerListener { impl BrokerListener for DefaultBrokerListener { type Error = DefaultBrokerListenerError; - async fn listen(&self) -> Option> { + async fn listen(&self) -> Result { todo!() } } diff --git a/offchain/authority-claimer/src/main.rs b/offchain/authority-claimer/src/main.rs index cb70a6d9..a8bbf867 100644 --- a/offchain/authority-claimer/src/main.rs +++ b/offchain/authority-claimer/src/main.rs @@ -16,7 +16,7 @@ use authority_claimer::{ #[tokio::main] async fn main() -> Result<(), Box> { - // Reading the configurations. + // Getting the configuration. let config = Config::new().map_err(Box::new)?; tracing::info!(?config, "starting authority-claimer"); @@ -32,33 +32,32 @@ async fn main() -> Result<(), Box> { let http_server_handle = http_server::start(config.http_server_config, metrics.clone().into()); + let dapp_address = config.authority_claimer_config.dapp_address; let dapp_metadata = DAppMetadata { chain_id: config.authority_claimer_config.txm_config.chain_id, - dapp_address: rollups_events::Address::new( - config.authority_claimer_config.dapp_address.into(), - ), + dapp_address: rollups_events::Address::new(dapp_address.into()), }; - // Creating the transaction manager claim sender. - trace!("Creating the claim sender"); - let claim_sender = - TxManagerClaimSender::new(dapp_metadata.clone(), metrics) - .map_err(Box::new)?; - - // Creating the broker listener. + // Creating the default broker listener. trace!("Creating the broker listener"); - let broker_listener = DefaultBrokerListener::new( + let default_broker_listener = DefaultBrokerListener::new( config.authority_claimer_config.broker_config.clone(), - dapp_metadata, + dapp_metadata.clone(), + metrics.clone(), ) .map_err(Box::new)?; + // Creating the transaction manager claim sender. + trace!("Creating the claim sender"); + let tx_manager_claim_sender = + TxManagerClaimSender::new(dapp_metadata, metrics).map_err(Box::new)?; + // Creating the claimer loop. - let dapp_address = config.authority_claimer_config.dapp_address; let authority_claimer = DefaultAuthorityClaimer::new(); - let claimer_handle = - authority_claimer.start(claim_sender, broker_listener, dapp_address); + let claimer_handle = authority_claimer + .start(default_broker_listener, tx_manager_claim_sender); + // Starting the HTTP server and the claimer loop. tokio::select! { ret = http_server_handle => { ret.map_err(Box::new)? } ret = claimer_handle => { ret.map_err(Box::new)? } diff --git a/offchain/authority-claimer/src/sender.rs b/offchain/authority-claimer/src/sender.rs index bf9b09f6..5eddc5ec 100644 --- a/offchain/authority-claimer/src/sender.rs +++ b/offchain/authority-claimer/src/sender.rs @@ -2,20 +2,24 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use async_trait::async_trait; -use ethers::types::Address; use rollups_events::{DAppMetadata, RollupsClaim}; use snafu::Snafu; use std::fmt::Debug; use crate::metrics::AuthorityClaimerMetrics; +/// The `ClaimSender` sends claims to the blockchain. +/// +/// It should wait for N blockchain confirmations. #[async_trait] pub trait ClaimSender: Sized + Send + Debug { type Error: snafu::Error + Send; + /// The `send_claim` function consumes the `ClaimSender` object + /// and then returns it to avoid that processes use the claim sender + /// concurrently. async fn send_claim( self, - dapp_address: Address, rollups_claim: RollupsClaim, ) -> Result; } @@ -47,7 +51,6 @@ impl ClaimSender for TxManagerClaimSender { async fn send_claim( self, - _dapp_address: Address, _rollups_claim: RollupsClaim, ) -> Result { todo!()