Skip to content
This repository has been archived by the owner on Mar 12, 2024. It is now read-only.

Commit

Permalink
pending style reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 committed Jul 31, 2023
1 parent d0e79ca commit c36b25d
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 70 deletions.
107 changes: 66 additions & 41 deletions offchain/authority-claimer/src/claimer.rs
Original file line number Diff line number Diff line change
@@ -1,81 +1,106 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use rollups_events::{Address, DAppMetadata};
use snafu::{whatever, ResultExt};
use async_trait::async_trait;
use rollups_events::DAppMetadata;
use tracing::{info, trace, warn};

use crate::{
config::AuthorityClaimerConfig,
error::{AuthorityClaimerError, BrokerListenerSnafu, ClaimSenderSnafu},
error::AuthorityClaimerError,
listener::{BrokerListener, RedisBrokerListener},
metrics::AuthorityClaimerMetrics,
sender::{ClaimSender, TxManagerClaimSender},
};

pub struct AuthorityClaimer<S: ClaimSender, L: BrokerListener> {
claim_sender: S,
broker_listener: L,
#[async_trait]
pub trait AuthorityClaimerServer<S: ClaimSender, L: BrokerListener> {
async fn start(
self,
dapp_address: ethers::types::Address,
) -> Result<(), AuthorityClaimerError<S, L>>;
}

impl AuthorityClaimer<TxManagerClaimSender, RedisBrokerListener> {
pub struct DefaultAuthorityClaimerServer {
claim_sender: TxManagerClaimSender,
broker_listener: RedisBrokerListener,
}

impl DefaultAuthorityClaimerServer {
pub async fn new(
config: AuthorityClaimerConfig,
metrics: AuthorityClaimerMetrics,
) -> Result<Self, AuthorityClaimerError> {
) -> Result<
Self,
AuthorityClaimerError<TxManagerClaimSender, RedisBrokerListener>,
> {
info!("Setting up authority claimer with config: {:?}", config);

let dapp_metadata = DAppMetadata {
chain_id: config.txm_config.chain_id,
dapp_address: Address::new(config.dapp_address.into()),
dapp_address: rollups_events::Address::new(
config.dapp_address.into(),
),
};

trace!("Creating the claim sender");
let claim_sender =
TxManagerClaimSender::new(dapp_metadata.clone(), metrics)
.map_err(|e| Box::new(e) as Box<dyn snafu::Error>)
.context(ClaimSenderSnafu)?;
.map_err(AuthorityClaimerError::ClaimSenderError)?;

trace!("Creating the broker listener");
let broker_listener = RedisBrokerListener::new(dapp_metadata.clone())
.map_err(|e| Box::new(e) as Box<dyn snafu::Error>)
.context(BrokerListenerSnafu)?;
let broker_listener =
RedisBrokerListener::new(dapp_metadata.clone())
.map_err(AuthorityClaimerError::BrokerListenerError)?;

Ok(AuthorityClaimer {
Ok(DefaultAuthorityClaimerServer {
claim_sender,
broker_listener,
})
}
}

impl<S: ClaimSender, L: BrokerListener> AuthorityClaimer<S, L> {
pub async fn start(
#[async_trait]
impl AuthorityClaimerServer<TxManagerClaimSender, RedisBrokerListener>
for DefaultAuthorityClaimerServer
{
async fn start(
self,
config: AuthorityClaimerConfig,
metrics: AuthorityClaimerMetrics,
) -> Result<(), AuthorityClaimerError> {
trace!("Starting the authority claimer loop");
let mut claim_sender = self.claim_sender;
loop {
match self.broker_listener.listen().await {
Some(Ok(rollups_claim)) => {
trace!(
"Got a claim from the broker channel: {:?}",
rollups_claim
);
claim_sender = claim_sender
.send_claim(config.dapp_address.clone(), rollups_claim)
.await
.map_err(|e| Box::new(e) as Box<dyn snafu::Error>)
.context(ClaimSenderSnafu)?;
}
Some(Err(e)) => {
warn!("Broker channel error `{}`", e);
}
dapp_address: ethers::types::Address,
) -> Result<
(),
AuthorityClaimerError<TxManagerClaimSender, RedisBrokerListener>,
> {
start_server(self.claim_sender, self.broker_listener, dapp_address)
.await
}
}

async fn start_server<S: ClaimSender, L: BrokerListener>(
claim_sender: S,
broker_listener: L,
dapp_address: ethers::types::Address,
) -> Result<(), AuthorityClaimerError<S, L>> {
trace!("Starting the authority claimer loop");
let mut claim_sender = claim_sender;
loop {
match broker_listener.listen().await {
Some(Ok(rollups_claim)) => {
trace!(
"Got a claim from the broker channel: {:?}",
rollups_claim
);
claim_sender = claim_sender
.send_claim(dapp_address.clone(), rollups_claim)
.await
.map_err(AuthorityClaimerError::ClaimSenderError)?;
}
Some(Err(e)) => {
warn!("Broker channel error `{}`", e);
}

None => {
whatever!("Broker channel closed");
}
None => {
return Err(AuthorityClaimerError::BrokerListenerClosed);
}
}
}
Expand Down
24 changes: 8 additions & 16 deletions offchain/authority-claimer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,16 @@
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use http_server::HttpServerError;
use snafu::Snafu;

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum AuthorityClaimerError {
#[snafu(display("http server error"))]
HttpServerError { source: HttpServerError },
use crate::{listener::BrokerListener, sender::ClaimSender};

#[snafu(display("claim sender error"))]
ClaimSenderError { source: Box<dyn snafu::Error> },
#[derive(Debug)]
pub enum AuthorityClaimerError<S: ClaimSender, L: BrokerListener> {
HttpServerError(HttpServerError),

#[snafu(display("broker listener error"))]
BrokerListenerError { source: Box<dyn snafu::Error> },
ClaimSenderError(S::Error),

#[snafu(whatever, display("{message}"))]
Whatever {
message: String,
#[snafu(source(from(Box<dyn std::error::Error>, Some)))]
source: Option<Box<dyn std::error::Error>>,
},
BrokerListenerError(L::Error),

BrokerListenerClosed,
}
6 changes: 3 additions & 3 deletions offchain/authority-claimer/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ pub trait BrokerListener: Debug {
pub struct RedisBrokerListener {}

#[derive(Debug, Snafu)]
pub enum BrokerListenerError {}
pub enum RedisBrokerListenerError {}

impl RedisBrokerListener {
pub fn new(
_dapp_metadata: DAppMetadata,
) -> Result<Self, BrokerListenerError> {
) -> Result<Self, RedisBrokerListenerError> {
todo!()
}
}

#[async_trait]
impl BrokerListener for RedisBrokerListener {
type Error = BrokerListenerError;
type Error = RedisBrokerListenerError;

async fn listen(&self) -> Option<Result<RollupsClaim, Self::Error>> {
Some(Ok(RollupsClaim::default()))
Expand Down
23 changes: 15 additions & 8 deletions offchain/authority-claimer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use snafu::ResultExt;
use tracing_subscriber::filter::{EnvFilter, LevelFilter};

use authority_claimer::{
claimer, config::Config, error::HttpServerSnafu,
claimer::{AuthorityClaimerServer, DefaultAuthorityClaimerServer},
config::Config,
error::AuthorityClaimerError,
metrics::AuthorityClaimerMetrics,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() {
// Configurations.
let config = Config::new()?;
let config = Config::new().unwrap();

tracing::info!(?config, "starting authority-claimer");

Expand All @@ -28,17 +29,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
http_server::start(config.http_server_config, metrics.clone().into());

// Claimer loop.
let claimer_handle =
claimer::start(config.authority_claimer_config, metrics);
let dapp_address = config.authority_claimer_config.dapp_address;
let authority_claimer = DefaultAuthorityClaimerServer::new(
config.authority_claimer_config,
metrics,
)
.await
.unwrap();
let claimer_handle = authority_claimer.start(dapp_address);

tokio::select! {
ret = http_server_handle => {
ret.context(HttpServerSnafu)
ret.map_err(AuthorityClaimerError::HttpServerError)
}

ret = claimer_handle => {
ret
}
}
.map_err(|e| e.into())
.unwrap()
}
4 changes: 2 additions & 2 deletions offchain/authority-claimer/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::fmt::Debug;
use crate::metrics::AuthorityClaimerMetrics;

#[async_trait]
pub trait ClaimSender: Debug + Sized {
pub trait ClaimSender: Sized + Debug + Clone {
type Error: snafu::Error;

async fn send_claim(
Expand All @@ -24,7 +24,7 @@ pub trait ClaimSender: Debug + Sized {
// TxManagerClaimSender
// ------------------------------------------------------------------------------------------------

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct TxManagerClaimSender {}

#[derive(Debug, Snafu)]
Expand Down

0 comments on commit c36b25d

Please sign in to comment.