Skip to content

Commit

Permalink
chore : refactored code according to comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ocdbytes committed Nov 8, 2024
1 parent bbe4b8f commit 440c103
Show file tree
Hide file tree
Showing 12 changed files with 256 additions and 184 deletions.
2 changes: 1 addition & 1 deletion crates/orchestrator/src/alerts/aws_sns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Alerts for AWSSNS {

async fn create_alert(&self, topic_name: &str) -> color_eyre::Result<()> {
let response = self.client.create_topic().name(topic_name).send().await?;
let topic_arn = response.topic_arn().unwrap_or_default();
let topic_arn = response.topic_arn().expect("Topic Not found");
log::info!("SNS topic created. Topic ARN: {}", topic_arn);
Ok(())
}
Expand Down
6 changes: 2 additions & 4 deletions crates/orchestrator/src/alerts/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use async_trait::async_trait;
use mockall::automock;
use utils::settings::env::EnvSettingsProvider;
use utils::settings::Settings;

pub mod aws_sns;
Expand All @@ -11,9 +10,8 @@ pub trait Alerts: Send + Sync {
/// To send an alert message to our alert service
async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>;
async fn create_alert(&self, topic_name: &str) -> color_eyre::Result<()>;
async fn setup(&self) -> color_eyre::Result<()> {
let settings_provider = EnvSettingsProvider {};
let sns_topic_name = settings_provider.get_settings_or_panic("SNS_TOPIC_NAME");
async fn setup(&self, settings_provider: Box<dyn Settings>) -> color_eyre::Result<()> {
let sns_topic_name = settings_provider.get_settings_or_panic("ALERT_TOPIC_NAME");
self.create_alert(&sns_topic_name).await?;
Ok(())
}
Expand Down
102 changes: 102 additions & 0 deletions crates/orchestrator/src/cron/event_bridge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use std::time::Duration;

use async_trait::async_trait;
use aws_sdk_eventbridge::types::{InputTransformer, RuleState, Target};
use aws_sdk_sqs::types::QueueAttributeName;

use crate::cron::Cron;
use crate::queue::job_queue::WorkerTriggerType;
use crate::setup::SetupConfig;

pub struct AWSEventBridge {}

const WORKER_TRIGGER_RULE_NAME: &str = "worker_trigger_scheduled";

#[async_trait]
impl Cron for AWSEventBridge {
#[allow(unreachable_patterns)]
async fn setup_cron(
&self,
config: &SetupConfig,
cron_time: Duration,
target_queue_name: String,
message: String,
worker_trigger_type: WorkerTriggerType,
) -> color_eyre::Result<()> {
let config = match config {
SetupConfig::AWS(config) => config,
_ => panic!("Unsupported Event Bridge configuration"),
};
let event_bridge_client = aws_sdk_eventbridge::Client::new(config);
let sqs_client = aws_sdk_sqs::Client::new(config);

event_bridge_client
.put_rule()
.name(WORKER_TRIGGER_RULE_NAME)
.schedule_expression(duration_to_rate_string(cron_time))
.state(RuleState::Enabled)
.send()
.await?;
let queue_url = sqs_client.get_queue_url().queue_name(target_queue_name).send().await?;

let queue_attributes = sqs_client
.get_queue_attributes()
.queue_url(queue_url.queue_url.unwrap())
.attribute_names(QueueAttributeName::QueueArn)
.send()
.await?;
let queue_arn = queue_attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap();

// Create the EventBridge target with the input transformer
let input_transformer =
InputTransformer::builder().input_paths_map("$.time", "time").input_template(message).build()?;

event_bridge_client
.put_targets()
.rule(WORKER_TRIGGER_RULE_NAME)
.targets(
Target::builder()
.id(format!("worker-trigger-target-{:?}", worker_trigger_type))
.arn(queue_arn)
.input_transformer(input_transformer)
.build()?,
)
.send()
.await?;

Ok(())
}
}

fn duration_to_rate_string(duration: Duration) -> String {
let total_secs = duration.as_secs();
let total_mins = duration.as_secs() / 60;
let total_hours = duration.as_secs() / 3600;
let total_days = duration.as_secs() / 86400;

if total_days > 0 {
format!("rate({} day{})", total_days, if total_days == 1 { "" } else { "s" })
} else if total_hours > 0 {
format!("rate({} hour{})", total_hours, if total_hours == 1 { "" } else { "s" })
} else if total_mins > 0 {
format!("rate({} minute{})", total_mins, if total_mins == 1 { "" } else { "s" })
} else {
format!("rate({} second{})", total_secs, if total_secs == 1 { "" } else { "s" })
}
}

#[cfg(test)]
mod event_bridge_utils_test {
use rstest::rstest;

use super::*;

#[rstest]
fn test_duration_to_rate_string() {
assert_eq!(duration_to_rate_string(Duration::from_secs(60)), "rate(1 minute)");
assert_eq!(duration_to_rate_string(Duration::from_secs(120)), "rate(2 minutes)");
assert_eq!(duration_to_rate_string(Duration::from_secs(30)), "rate(30 seconds)");
assert_eq!(duration_to_rate_string(Duration::from_secs(3600)), "rate(1 hour)");
assert_eq!(duration_to_rate_string(Duration::from_secs(86400)), "rate(1 day)");
}
}
50 changes: 50 additions & 0 deletions crates/orchestrator/src/cron/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::time::Duration;

use async_trait::async_trait;
use lazy_static::lazy_static;

use crate::queue::job_queue::{WorkerTriggerMessage, WorkerTriggerType};
use crate::setup::SetupConfig;

pub mod event_bridge;

lazy_static! {
pub static ref CRON_DURATION: Duration = Duration::from_mins(1);
pub static ref TARGET_QUEUE_NAME: String = String::from("madara_orchestrator_worker_trigger_queue");
pub static ref WORKER_TRIGGERS: Vec<WorkerTriggerType> = vec![
WorkerTriggerType::Snos,
WorkerTriggerType::Proving,
WorkerTriggerType::DataSubmission,
WorkerTriggerType::UpdateState
];
}

#[async_trait]
pub trait Cron {
async fn setup_cron(
&self,
config: &SetupConfig,
cron_time: Duration,
target_queue_name: String,
message: String,
worker_trigger_type: WorkerTriggerType,
) -> color_eyre::Result<()>;
async fn setup(&self, config: SetupConfig) -> color_eyre::Result<()> {
for triggers in WORKER_TRIGGERS.iter() {
self.setup_cron(
&config,
*CRON_DURATION,
TARGET_QUEUE_NAME.clone(),
get_worker_trigger_message(triggers.clone())?,
triggers.clone(),
)
.await?;
}
Ok(())
}
}

fn get_worker_trigger_message(worker_trigger_type: WorkerTriggerType) -> color_eyre::Result<String> {
let message = WorkerTriggerMessage { worker: worker_trigger_type };
Ok(serde_json::to_string(&message)?)
}
5 changes: 3 additions & 2 deletions crates/orchestrator/src/data_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ pub trait DataStorage: Send + Sync {
async fn get_data(&self, key: &str) -> Result<Bytes>;
async fn put_data(&self, data: Bytes, key: &str) -> Result<()>;
async fn create_bucket(&self, bucket_name: &str) -> Result<()>;
async fn setup(&self, bucket_name: &str) -> Result<()> {
self.create_bucket(bucket_name).await
async fn setup(&self, settings_provider: Box<dyn Settings>) -> Result<()> {
let bucket_name = settings_provider.get_settings_or_panic("STORAGE_BUCKET_NAME");
self.create_bucket(&bucket_name).await
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#![feature(duration_constructors)]

/// Contains the trait implementations for alerts
pub mod alerts;
/// Config of the service. Contains configurations for DB, Queues and other services.
pub mod config;
pub mod constants;
/// Controllers for the routes
pub mod controllers;
pub mod cron;
/// Contains the trait that implements the fetching functions
/// for blob and SNOS data from cloud for a particular block.
pub mod data_storage;
Expand Down
116 changes: 74 additions & 42 deletions crates/orchestrator/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,78 @@ use crate::config::Config;
use crate::jobs::JobError;
use crate::setup::SetupConfig;

#[derive(Clone)]
pub struct QueueConfig {
pub name: String,
pub visibility_timeout: i32,
pub max_receive_count: Option<i32>,
pub dlq_name: Option<String>,
}

lazy_static! {
pub static ref JOB_QUEUES: Vec<String> = vec![
String::from("madara_orchestrator_snos_job_processing_queue"),
String::from("madara_orchestrator_snos_job_verification_queue"),
String::from("madara_orchestrator_proving_job_processing_queue"),
String::from("madara_orchestrator_proving_job_verification_queue"),
String::from("madara_orchestrator_data_submission_job_processing_queue"),
String::from("madara_orchestrator_data_submission_job_verification_queue"),
String::from("madara_orchestrator_update_state_job_processing_queue"),
String::from("madara_orchestrator_update_state_job_verification_queue"),
];
pub static ref OTHER_QUEUES: Vec<String> = vec![
String::from("madara_orchestrator_job_handle_failure_queue"),
String::from("madara_orchestrator_worker_trigger_queue"),
];
pub static ref JOB_HANDLE_FAILURE_QUEUE: String = String::from("madara_orchestrator_job_handle_failure_queue");
pub static ref QUEUES: Vec<QueueConfig> = vec![
QueueConfig {
name: String::from("madara_orchestrator_snos_job_processing_queue"),
visibility_timeout: 300,
max_receive_count: Some(5),
dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone())
},
QueueConfig {
name: String::from("madara_orchestrator_snos_job_verification_queue"),
visibility_timeout: 300,
max_receive_count: Some(5),
dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone())
},
QueueConfig {
name: String::from("madara_orchestrator_proving_job_processing_queue"),
visibility_timeout: 300,
max_receive_count: Some(5),
dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone())
},
QueueConfig {
name: String::from("madara_orchestrator_proving_job_verification_queue"),
visibility_timeout: 300,
max_receive_count: Some(5),
dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone())
},
QueueConfig {
name: String::from("madara_orchestrator_data_submission_job_processing_queue"),
visibility_timeout: 300,
max_receive_count: Some(5),
dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone())
},
QueueConfig {
name: String::from("madara_orchestrator_data_submission_job_verification_queue"),
visibility_timeout: 300,
max_receive_count: Some(5),
dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone())
},
QueueConfig {
name: String::from("madara_orchestrator_update_state_job_processing_queue"),
visibility_timeout: 300,
max_receive_count: Some(5),
dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone())
},
QueueConfig {
name: String::from("madara_orchestrator_update_state_job_verification_queue"),
visibility_timeout: 300,
max_receive_count: Some(5),
dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone())
},
QueueConfig {
name: String::from("madara_orchestrator_job_handle_failure_queue"),
visibility_timeout: 300,
max_receive_count: None,
dlq_name: None
},
QueueConfig {
name: String::from("madara_orchestrator_worker_trigger_queue"),
visibility_timeout: 300,
max_receive_count: None,
dlq_name: None
},
];
}

/// Queue Provider Trait
Expand All @@ -42,36 +98,12 @@ lazy_static! {
pub trait QueueProvider: Send + Sync {
async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option<Duration>) -> EyreResult<()>;
async fn consume_message_from_queue(&self, queue: String) -> Result<Delivery, QueueError>;
async fn create_queue(&self, queue_name: &str, config: &SetupConfig) -> EyreResult<()>;
async fn setup_queue(
&self,
queue_name: &str,
config: &SetupConfig,
needs_dlq: Option<String>,
visibility_timeout: u32,
max_receive_count: Option<u32>,
) -> EyreResult<()>;
async fn setup(&self, config: SetupConfig, visibility_timeout: u32, max_receive_count: u32) -> EyreResult<()> {
// Creating the job queues :
for queue in JOB_QUEUES.iter() {
async fn create_queue(&self, queue_config: &QueueConfig, config: &SetupConfig) -> EyreResult<()>;
async fn setup(&self, config: SetupConfig) -> EyreResult<()> {
// Creating the queues :
for queue in QUEUES.iter() {
self.create_queue(queue, &config).await?;
}
// Creating the other queues :
for queue in OTHER_QUEUES.iter() {
self.create_queue(queue, &config).await?;
}

// Setting up the job queues :
for queue in JOB_QUEUES.iter() {
self.setup_queue(
queue,
&config,
Some(JOB_HANDLE_FAILURE_QUEUE.clone()),
visibility_timeout,
Some(max_receive_count),
)
.await?;
}
Ok(())
}
}
Expand Down
Loading

0 comments on commit 440c103

Please sign in to comment.