From 440c1035373948dea704113d881ea6cbd70b54ce Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Fri, 8 Nov 2024 12:04:31 +0700 Subject: [PATCH] chore : refactored code according to comments --- crates/orchestrator/src/alerts/aws_sns/mod.rs | 2 +- crates/orchestrator/src/alerts/mod.rs | 6 +- crates/orchestrator/src/cron/event_bridge.rs | 102 +++++++++++++++ crates/orchestrator/src/cron/mod.rs | 50 ++++++++ crates/orchestrator/src/data_storage/mod.rs | 5 +- crates/orchestrator/src/lib.rs | 3 + crates/orchestrator/src/queue/mod.rs | 116 +++++++++++------- crates/orchestrator/src/queue/sqs/mod.rs | 38 ++---- crates/orchestrator/src/setup/mod.rs | 53 ++------ .../src/setup/worker_triggers/event_bridge.rs | 62 ---------- .../src/setup/worker_triggers/mod.rs | 1 - crates/utils/src/settings/mod.rs | 2 +- 12 files changed, 256 insertions(+), 184 deletions(-) create mode 100644 crates/orchestrator/src/cron/event_bridge.rs create mode 100644 crates/orchestrator/src/cron/mod.rs delete mode 100644 crates/orchestrator/src/setup/worker_triggers/event_bridge.rs delete mode 100644 crates/orchestrator/src/setup/worker_triggers/mod.rs diff --git a/crates/orchestrator/src/alerts/aws_sns/mod.rs b/crates/orchestrator/src/alerts/aws_sns/mod.rs index 031ec7fe..3416be91 100644 --- a/crates/orchestrator/src/alerts/aws_sns/mod.rs +++ b/crates/orchestrator/src/alerts/aws_sns/mod.rs @@ -35,7 +35,7 @@ impl Alerts for AWSSNS { async fn create_alert(&self, topic_name: &str) -> color_eyre::Result<()> { let response = self.client.create_topic().name(topic_name).send().await?; - let topic_arn = response.topic_arn().unwrap_or_default(); + let topic_arn = response.topic_arn().expect("Topic Not found"); log::info!("SNS topic created. Topic ARN: {}", topic_arn); Ok(()) } diff --git a/crates/orchestrator/src/alerts/mod.rs b/crates/orchestrator/src/alerts/mod.rs index 088cd416..30220cf8 100644 --- a/crates/orchestrator/src/alerts/mod.rs +++ b/crates/orchestrator/src/alerts/mod.rs @@ -1,6 +1,5 @@ use async_trait::async_trait; use mockall::automock; -use utils::settings::env::EnvSettingsProvider; use utils::settings::Settings; pub mod aws_sns; @@ -11,9 +10,8 @@ pub trait Alerts: Send + Sync { /// To send an alert message to our alert service async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>; async fn create_alert(&self, topic_name: &str) -> color_eyre::Result<()>; - async fn setup(&self) -> color_eyre::Result<()> { - let settings_provider = EnvSettingsProvider {}; - let sns_topic_name = settings_provider.get_settings_or_panic("SNS_TOPIC_NAME"); + async fn setup(&self, settings_provider: Box) -> color_eyre::Result<()> { + let sns_topic_name = settings_provider.get_settings_or_panic("ALERT_TOPIC_NAME"); self.create_alert(&sns_topic_name).await?; Ok(()) } diff --git a/crates/orchestrator/src/cron/event_bridge.rs b/crates/orchestrator/src/cron/event_bridge.rs new file mode 100644 index 00000000..7c9676d7 --- /dev/null +++ b/crates/orchestrator/src/cron/event_bridge.rs @@ -0,0 +1,102 @@ +use std::time::Duration; + +use async_trait::async_trait; +use aws_sdk_eventbridge::types::{InputTransformer, RuleState, Target}; +use aws_sdk_sqs::types::QueueAttributeName; + +use crate::cron::Cron; +use crate::queue::job_queue::WorkerTriggerType; +use crate::setup::SetupConfig; + +pub struct AWSEventBridge {} + +const WORKER_TRIGGER_RULE_NAME: &str = "worker_trigger_scheduled"; + +#[async_trait] +impl Cron for AWSEventBridge { + #[allow(unreachable_patterns)] + async fn setup_cron( + &self, + config: &SetupConfig, + cron_time: Duration, + target_queue_name: String, + message: String, + worker_trigger_type: WorkerTriggerType, + ) -> color_eyre::Result<()> { + let config = match config { + SetupConfig::AWS(config) => config, + _ => panic!("Unsupported Event Bridge configuration"), + }; + let event_bridge_client = aws_sdk_eventbridge::Client::new(config); + let sqs_client = aws_sdk_sqs::Client::new(config); + + event_bridge_client + .put_rule() + .name(WORKER_TRIGGER_RULE_NAME) + .schedule_expression(duration_to_rate_string(cron_time)) + .state(RuleState::Enabled) + .send() + .await?; + let queue_url = sqs_client.get_queue_url().queue_name(target_queue_name).send().await?; + + let queue_attributes = sqs_client + .get_queue_attributes() + .queue_url(queue_url.queue_url.unwrap()) + .attribute_names(QueueAttributeName::QueueArn) + .send() + .await?; + let queue_arn = queue_attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap(); + + // Create the EventBridge target with the input transformer + let input_transformer = + InputTransformer::builder().input_paths_map("$.time", "time").input_template(message).build()?; + + event_bridge_client + .put_targets() + .rule(WORKER_TRIGGER_RULE_NAME) + .targets( + Target::builder() + .id(format!("worker-trigger-target-{:?}", worker_trigger_type)) + .arn(queue_arn) + .input_transformer(input_transformer) + .build()?, + ) + .send() + .await?; + + Ok(()) + } +} + +fn duration_to_rate_string(duration: Duration) -> String { + let total_secs = duration.as_secs(); + let total_mins = duration.as_secs() / 60; + let total_hours = duration.as_secs() / 3600; + let total_days = duration.as_secs() / 86400; + + if total_days > 0 { + format!("rate({} day{})", total_days, if total_days == 1 { "" } else { "s" }) + } else if total_hours > 0 { + format!("rate({} hour{})", total_hours, if total_hours == 1 { "" } else { "s" }) + } else if total_mins > 0 { + format!("rate({} minute{})", total_mins, if total_mins == 1 { "" } else { "s" }) + } else { + format!("rate({} second{})", total_secs, if total_secs == 1 { "" } else { "s" }) + } +} + +#[cfg(test)] +mod event_bridge_utils_test { + use rstest::rstest; + + use super::*; + + #[rstest] + fn test_duration_to_rate_string() { + assert_eq!(duration_to_rate_string(Duration::from_secs(60)), "rate(1 minute)"); + assert_eq!(duration_to_rate_string(Duration::from_secs(120)), "rate(2 minutes)"); + assert_eq!(duration_to_rate_string(Duration::from_secs(30)), "rate(30 seconds)"); + assert_eq!(duration_to_rate_string(Duration::from_secs(3600)), "rate(1 hour)"); + assert_eq!(duration_to_rate_string(Duration::from_secs(86400)), "rate(1 day)"); + } +} diff --git a/crates/orchestrator/src/cron/mod.rs b/crates/orchestrator/src/cron/mod.rs new file mode 100644 index 00000000..7e8b02dd --- /dev/null +++ b/crates/orchestrator/src/cron/mod.rs @@ -0,0 +1,50 @@ +use std::time::Duration; + +use async_trait::async_trait; +use lazy_static::lazy_static; + +use crate::queue::job_queue::{WorkerTriggerMessage, WorkerTriggerType}; +use crate::setup::SetupConfig; + +pub mod event_bridge; + +lazy_static! { + pub static ref CRON_DURATION: Duration = Duration::from_mins(1); + pub static ref TARGET_QUEUE_NAME: String = String::from("madara_orchestrator_worker_trigger_queue"); + pub static ref WORKER_TRIGGERS: Vec = vec![ + WorkerTriggerType::Snos, + WorkerTriggerType::Proving, + WorkerTriggerType::DataSubmission, + WorkerTriggerType::UpdateState + ]; +} + +#[async_trait] +pub trait Cron { + async fn setup_cron( + &self, + config: &SetupConfig, + cron_time: Duration, + target_queue_name: String, + message: String, + worker_trigger_type: WorkerTriggerType, + ) -> color_eyre::Result<()>; + async fn setup(&self, config: SetupConfig) -> color_eyre::Result<()> { + for triggers in WORKER_TRIGGERS.iter() { + self.setup_cron( + &config, + *CRON_DURATION, + TARGET_QUEUE_NAME.clone(), + get_worker_trigger_message(triggers.clone())?, + triggers.clone(), + ) + .await?; + } + Ok(()) + } +} + +fn get_worker_trigger_message(worker_trigger_type: WorkerTriggerType) -> color_eyre::Result { + let message = WorkerTriggerMessage { worker: worker_trigger_type }; + Ok(serde_json::to_string(&message)?) +} diff --git a/crates/orchestrator/src/data_storage/mod.rs b/crates/orchestrator/src/data_storage/mod.rs index 71ffcfd4..f85b74f2 100644 --- a/crates/orchestrator/src/data_storage/mod.rs +++ b/crates/orchestrator/src/data_storage/mod.rs @@ -22,8 +22,9 @@ pub trait DataStorage: Send + Sync { async fn get_data(&self, key: &str) -> Result; async fn put_data(&self, data: Bytes, key: &str) -> Result<()>; async fn create_bucket(&self, bucket_name: &str) -> Result<()>; - async fn setup(&self, bucket_name: &str) -> Result<()> { - self.create_bucket(bucket_name).await + async fn setup(&self, settings_provider: Box) -> Result<()> { + let bucket_name = settings_provider.get_settings_or_panic("STORAGE_BUCKET_NAME"); + self.create_bucket(&bucket_name).await } } diff --git a/crates/orchestrator/src/lib.rs b/crates/orchestrator/src/lib.rs index 6013bc80..86d432b5 100644 --- a/crates/orchestrator/src/lib.rs +++ b/crates/orchestrator/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(duration_constructors)] + /// Contains the trait implementations for alerts pub mod alerts; /// Config of the service. Contains configurations for DB, Queues and other services. @@ -5,6 +7,7 @@ pub mod config; pub mod constants; /// Controllers for the routes pub mod controllers; +pub mod cron; /// Contains the trait that implements the fetching functions /// for blob and SNOS data from cloud for a particular block. pub mod data_storage; diff --git a/crates/orchestrator/src/queue/mod.rs b/crates/orchestrator/src/queue/mod.rs index 3cc342d3..a1d46399 100644 --- a/crates/orchestrator/src/queue/mod.rs +++ b/crates/orchestrator/src/queue/mod.rs @@ -14,22 +14,78 @@ use crate::config::Config; use crate::jobs::JobError; use crate::setup::SetupConfig; +#[derive(Clone)] +pub struct QueueConfig { + pub name: String, + pub visibility_timeout: i32, + pub max_receive_count: Option, + pub dlq_name: Option, +} + lazy_static! { - pub static ref JOB_QUEUES: Vec = vec![ - String::from("madara_orchestrator_snos_job_processing_queue"), - String::from("madara_orchestrator_snos_job_verification_queue"), - String::from("madara_orchestrator_proving_job_processing_queue"), - String::from("madara_orchestrator_proving_job_verification_queue"), - String::from("madara_orchestrator_data_submission_job_processing_queue"), - String::from("madara_orchestrator_data_submission_job_verification_queue"), - String::from("madara_orchestrator_update_state_job_processing_queue"), - String::from("madara_orchestrator_update_state_job_verification_queue"), - ]; - pub static ref OTHER_QUEUES: Vec = vec![ - String::from("madara_orchestrator_job_handle_failure_queue"), - String::from("madara_orchestrator_worker_trigger_queue"), - ]; pub static ref JOB_HANDLE_FAILURE_QUEUE: String = String::from("madara_orchestrator_job_handle_failure_queue"); + pub static ref QUEUES: Vec = vec![ + QueueConfig { + name: String::from("madara_orchestrator_snos_job_processing_queue"), + visibility_timeout: 300, + max_receive_count: Some(5), + dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone()) + }, + QueueConfig { + name: String::from("madara_orchestrator_snos_job_verification_queue"), + visibility_timeout: 300, + max_receive_count: Some(5), + dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone()) + }, + QueueConfig { + name: String::from("madara_orchestrator_proving_job_processing_queue"), + visibility_timeout: 300, + max_receive_count: Some(5), + dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone()) + }, + QueueConfig { + name: String::from("madara_orchestrator_proving_job_verification_queue"), + visibility_timeout: 300, + max_receive_count: Some(5), + dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone()) + }, + QueueConfig { + name: String::from("madara_orchestrator_data_submission_job_processing_queue"), + visibility_timeout: 300, + max_receive_count: Some(5), + dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone()) + }, + QueueConfig { + name: String::from("madara_orchestrator_data_submission_job_verification_queue"), + visibility_timeout: 300, + max_receive_count: Some(5), + dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone()) + }, + QueueConfig { + name: String::from("madara_orchestrator_update_state_job_processing_queue"), + visibility_timeout: 300, + max_receive_count: Some(5), + dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone()) + }, + QueueConfig { + name: String::from("madara_orchestrator_update_state_job_verification_queue"), + visibility_timeout: 300, + max_receive_count: Some(5), + dlq_name: Some(JOB_HANDLE_FAILURE_QUEUE.clone()) + }, + QueueConfig { + name: String::from("madara_orchestrator_job_handle_failure_queue"), + visibility_timeout: 300, + max_receive_count: None, + dlq_name: None + }, + QueueConfig { + name: String::from("madara_orchestrator_worker_trigger_queue"), + visibility_timeout: 300, + max_receive_count: None, + dlq_name: None + }, + ]; } /// Queue Provider Trait @@ -42,36 +98,12 @@ lazy_static! { pub trait QueueProvider: Send + Sync { async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option) -> EyreResult<()>; async fn consume_message_from_queue(&self, queue: String) -> Result; - async fn create_queue(&self, queue_name: &str, config: &SetupConfig) -> EyreResult<()>; - async fn setup_queue( - &self, - queue_name: &str, - config: &SetupConfig, - needs_dlq: Option, - visibility_timeout: u32, - max_receive_count: Option, - ) -> EyreResult<()>; - async fn setup(&self, config: SetupConfig, visibility_timeout: u32, max_receive_count: u32) -> EyreResult<()> { - // Creating the job queues : - for queue in JOB_QUEUES.iter() { + async fn create_queue(&self, queue_config: &QueueConfig, config: &SetupConfig) -> EyreResult<()>; + async fn setup(&self, config: SetupConfig) -> EyreResult<()> { + // Creating the queues : + for queue in QUEUES.iter() { self.create_queue(queue, &config).await?; } - // Creating the other queues : - for queue in OTHER_QUEUES.iter() { - self.create_queue(queue, &config).await?; - } - - // Setting up the job queues : - for queue in JOB_QUEUES.iter() { - self.setup_queue( - queue, - &config, - Some(JOB_HANDLE_FAILURE_QUEUE.clone()), - visibility_timeout, - Some(max_receive_count), - ) - .await?; - } Ok(()) } } diff --git a/crates/orchestrator/src/queue/sqs/mod.rs b/crates/orchestrator/src/queue/sqs/mod.rs index 7b2b73c8..e5234d3c 100644 --- a/crates/orchestrator/src/queue/sqs/mod.rs +++ b/crates/orchestrator/src/queue/sqs/mod.rs @@ -16,7 +16,7 @@ use crate::queue::job_queue::{ PROVING_JOB_VERIFICATION_QUEUE, SNOS_JOB_PROCESSING_QUEUE, SNOS_JOB_VERIFICATION_QUEUE, UPDATE_STATE_JOB_PROCESSING_QUEUE, UPDATE_STATE_JOB_VERIFICATION_QUEUE, WORKER_TRIGGER_QUEUE, }; -use crate::queue::QueueProvider; +use crate::queue::{QueueConfig, QueueProvider}; use crate::setup::SetupConfig; pub struct SqsQueue; @@ -60,45 +60,27 @@ impl QueueProvider for SqsQueue { consumer.receive().await } - async fn create_queue(&self, queue_name: &str, config: &SetupConfig) -> Result<()> { + async fn create_queue(&self, queue_config: &QueueConfig, config: &SetupConfig) -> Result<()> { let config = match config { SetupConfig::AWS(config) => config, _ => panic!("Unsupported SQS configuration"), }; let sqs_client = Client::new(config); - let res = sqs_client.create_queue().queue_name(queue_name).send().await?; - log::debug!("Created queue: {} | Queue URL : {:?}", queue_name, res.queue_url); - - Ok(()) - } - - async fn setup_queue( - &self, - queue_name: &str, - config: &SetupConfig, - needs_dlq: Option, - visibility_timeout: u32, - max_receive_count: Option, - ) -> Result<()> { - let config = match config { - SetupConfig::AWS(config) => config, - _ => panic!("Unsupported SQS configuration"), - }; - let sqs_client = Client::new(config); - let queue_url = Self::determine_queue_url(queue_name, &sqs_client).await?; + let res = sqs_client.create_queue().queue_name(&queue_config.name).send().await?; + let queue_url = res.queue_url().expect("Not able to get queue url from result"); let mut attributes = HashMap::new(); - attributes.insert(QueueAttributeName::VisibilityTimeout, visibility_timeout.to_string()); + attributes.insert(QueueAttributeName::VisibilityTimeout, queue_config.visibility_timeout.to_string()); - if let Some(queue) = needs_dlq { - let dlq_url = Self::determine_queue_url(&queue, &sqs_client).await?; + if let Some(queue) = &queue_config.dlq_name { + let dlq_url = Self::get_queue_url_from_client(queue, &sqs_client).await?; let dlq_arn = Self::get_queue_arn(&sqs_client, &dlq_url).await?; let policy = format!( r#"{{"deadLetterTargetArn":"{}","maxReceiveCount":"{}"}}"#, dlq_arn, - max_receive_count.unwrap_or(0) + &queue_config.max_receive_count.unwrap_or(0) ); - attributes.insert(QueueAttributeName::RedrivePolicy, policy.to_string()); + attributes.insert(QueueAttributeName::RedrivePolicy, policy); } sqs_client.set_queue_attributes().queue_url(queue_url).set_attributes(Some(attributes)).send().await?; @@ -109,7 +91,7 @@ impl QueueProvider for SqsQueue { impl SqsQueue { /// To get the queue url from the given queue name - async fn determine_queue_url(queue_name: &str, sqs_client: &Client) -> Result { + async fn get_queue_url_from_client(queue_name: &str, sqs_client: &Client) -> Result { Ok(sqs_client .get_queue_url() .queue_name(queue_name) diff --git a/crates/orchestrator/src/setup/mod.rs b/crates/orchestrator/src/setup/mod.rs index f17f4c85..317453a2 100644 --- a/crates/orchestrator/src/setup/mod.rs +++ b/crates/orchestrator/src/setup/mod.rs @@ -1,5 +1,3 @@ -mod worker_triggers; - use std::process::Command; use std::sync::Arc; @@ -8,16 +6,14 @@ use aws_config::{from_env, Region, SdkConfig}; use aws_credential_types::provider::ProvideCredentials; use utils::env_utils::get_env_var_or_panic; use utils::settings::env::EnvSettingsProvider; -use utils::settings::Settings; use crate::alerts::aws_sns::AWSSNS; use crate::alerts::Alerts; use crate::config::{get_aws_config, ProviderConfig}; +use crate::cron::Cron; use crate::data_storage::aws_s3::AWSS3; use crate::data_storage::DataStorage; -use crate::queue::job_queue::WorkerTriggerType; use crate::queue::QueueProvider; -use crate::setup::worker_triggers::event_bridge::setup_event_bridge_for_trigger_type; #[derive(Clone)] pub enum SetupConfig { @@ -28,11 +24,6 @@ pub enum ConfigType { AWS, } -const WORKER_TRIGGER_RULE: &str = "worker_trigger_scheduled"; -const WORKER_TRIGGER_QUEUE_NAME: &str = "madara_orchestrator_worker_trigger_queue"; -const QUEUE_VISIBILITY_TIMEOUT: u32 = 1800; -const QUEUE_MAX_RECEIVE_COUNT: u32 = 5; - async fn setup_config(client_type: ConfigType) -> SetupConfig { match client_type { ConfigType::AWS => { @@ -43,6 +34,7 @@ async fn setup_config(client_type: ConfigType) -> SetupConfig { } } +// TODO : move this to main.rs after moving to clap. pub async fn setup_cloud() -> color_eyre::Result<()> { log::info!("Setting up cloud."); let settings_provider = EnvSettingsProvider {}; @@ -52,7 +44,7 @@ pub async fn setup_cloud() -> color_eyre::Result<()> { match get_env_var_or_panic("DATA_STORAGE").as_str() { "s3" => { let s3 = Box::new(AWSS3::new_with_settings(&settings_provider, provider_config.clone()).await); - s3.setup(&settings_provider.get_settings_or_panic("AWS_S3_BUCKET_NAME")).await? + s3.setup(Box::new(settings_provider.clone())).await? } _ => panic!("Unsupported Storage Client"), } @@ -63,53 +55,28 @@ pub async fn setup_cloud() -> color_eyre::Result<()> { "sqs" => { let config = setup_config(ConfigType::AWS).await; let sqs = Box::new(crate::queue::sqs::SqsQueue {}); - sqs.setup(config, QUEUE_VISIBILITY_TIMEOUT, QUEUE_MAX_RECEIVE_COUNT).await? + sqs.setup(config).await? } _ => panic!("Unsupported Queue Client"), } log::info!("Queues setup completed ✅"); - log::info!("Setting up event bridge"); - match get_env_var_or_panic("EVENT_BRIDGE_PROVIDER").as_str() { + log::info!("Setting up cron"); + match get_env_var_or_panic("CRON_PROVIDER").as_str() { "event_bridge" => { let config = setup_config(ConfigType::AWS).await; - setup_event_bridge_for_trigger_type( - WorkerTriggerType::Snos, - &config, - WORKER_TRIGGER_RULE, - WORKER_TRIGGER_QUEUE_NAME, - ) - .await?; - setup_event_bridge_for_trigger_type( - WorkerTriggerType::Proving, - &config, - WORKER_TRIGGER_RULE, - WORKER_TRIGGER_QUEUE_NAME, - ) - .await?; - setup_event_bridge_for_trigger_type( - WorkerTriggerType::DataSubmission, - &config, - WORKER_TRIGGER_RULE, - WORKER_TRIGGER_QUEUE_NAME, - ) - .await?; - setup_event_bridge_for_trigger_type( - WorkerTriggerType::UpdateState, - &config, - WORKER_TRIGGER_RULE, - WORKER_TRIGGER_QUEUE_NAME, - ) - .await?; + let event_bridge = Box::new(crate::cron::event_bridge::AWSEventBridge {}); + event_bridge.setup(config).await? } _ => panic!("Unsupported Event Bridge Client"), } + log::info!("Cron setup completed ✅"); log::info!("Setting up alerts."); match get_env_var_or_panic("ALERTS").as_str() { "sns" => { let sns = Box::new(AWSSNS::new_with_settings(&settings_provider, provider_config).await); - sns.setup().await? + sns.setup(Box::new(settings_provider)).await? } _ => panic!("Unsupported Alert Client"), } diff --git a/crates/orchestrator/src/setup/worker_triggers/event_bridge.rs b/crates/orchestrator/src/setup/worker_triggers/event_bridge.rs deleted file mode 100644 index 04ae4f64..00000000 --- a/crates/orchestrator/src/setup/worker_triggers/event_bridge.rs +++ /dev/null @@ -1,62 +0,0 @@ -use aws_sdk_eventbridge::types::{InputTransformer, RuleState, Target}; -use aws_sdk_sqs::types::QueueAttributeName; - -use crate::queue::job_queue::{WorkerTriggerMessage, WorkerTriggerType}; -use crate::setup::SetupConfig; - -#[allow(unreachable_patterns)] -pub async fn setup_event_bridge_for_trigger_type( - worker_trigger_type: WorkerTriggerType, - config: &SetupConfig, - rule_name: &str, - worker_trigger_queue_name: &str, -) -> color_eyre::Result<()> { - let config = match config { - SetupConfig::AWS(config) => config, - _ => panic!("Unsupported Event Bridge configuration"), - }; - let event_bridge_client = aws_sdk_eventbridge::Client::new(config); - let sqs_client = aws_sdk_sqs::Client::new(config); - - event_bridge_client - .put_rule() - .name(rule_name) - .schedule_expression("rate(1 minute)") - .state(RuleState::Enabled) - .send() - .await?; - let queue_url = sqs_client.get_queue_url().queue_name(worker_trigger_queue_name).send().await?; - - let queue_attributes = sqs_client - .get_queue_attributes() - .queue_url(queue_url.queue_url.unwrap()) - .attribute_names(QueueAttributeName::QueueArn) - .send() - .await?; - let queue_arn = queue_attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap(); - - // Create a sample WorkerTriggerMessage - let message = WorkerTriggerMessage { worker: worker_trigger_type.clone() }; - let event_detail = serde_json::to_string(&message)?; - - // Create the EventBridge target with the input transformer - let input_transformer = InputTransformer::builder() - .input_paths_map("$.time", "time") - .input_template(event_detail.to_string()) - .build()?; - - event_bridge_client - .put_targets() - .rule(rule_name) - .targets( - Target::builder() - .id(format!("worker-trigger-target-{:?}", worker_trigger_type)) - .arn(queue_arn) - .input_transformer(input_transformer) - .build()?, - ) - .send() - .await?; - - Ok(()) -} diff --git a/crates/orchestrator/src/setup/worker_triggers/mod.rs b/crates/orchestrator/src/setup/worker_triggers/mod.rs deleted file mode 100644 index bc93c735..00000000 --- a/crates/orchestrator/src/setup/worker_triggers/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod event_bridge; diff --git a/crates/utils/src/settings/mod.rs b/crates/utils/src/settings/mod.rs index 05b0d787..b6fa3bba 100644 --- a/crates/utils/src/settings/mod.rs +++ b/crates/utils/src/settings/mod.rs @@ -6,6 +6,6 @@ pub enum SettingsProviderError { Internal(#[source] Box), } -pub trait Settings { +pub trait Settings: Send { fn get_settings_or_panic(&self, name: &'static str) -> String; }