From 02618d145acff4d7cd3b90d362c2c4161718e7d6 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Wed, 13 Nov 2024 23:42:29 +0530 Subject: [PATCH] update: e2e fixed --- .../orchestrator/src/cli/cron/event_bridge.rs | 4 +- crates/orchestrator/src/cli/mod.rs | 2 +- crates/orchestrator/src/queue/mod.rs | 2 +- crates/orchestrator/src/setup/mod.rs | 10 +-- e2e-tests/src/node.rs | 82 ++++++++++++------- e2e-tests/tests.rs | 4 +- 6 files changed, 64 insertions(+), 40 deletions(-) diff --git a/crates/orchestrator/src/cli/cron/event_bridge.rs b/crates/orchestrator/src/cli/cron/event_bridge.rs index 321eab9b..db32a091 100644 --- a/crates/orchestrator/src/cli/cron/event_bridge.rs +++ b/crates/orchestrator/src/cli/cron/event_bridge.rs @@ -9,10 +9,10 @@ pub struct AWSEventBridgeCliArgs { pub aws_event_bridge: bool, /// The name of the S3 bucket. - #[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TARGET_QUEUE_NAME", long, default_value = Some("madara-orchestrator-event-bridge-target-queue-name"), help = "The name of the SNS queue to send messages to from the event bridge.")] + #[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TARGET_QUEUE_NAME", long, default_value = Some("madara_orchestrator_worker_trigger_queue"), help = "The name of the SNS queue to send messages to from the event bridge.")] pub target_queue_name: Option, /// The cron time for the event bridge trigger rule. - #[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_CRON_TIME", long, default_value = Some("10"), help = "The cron time for the event bridge trigger rule. Defaults to 10 seconds.")] + #[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_CRON_TIME", long, default_value = Some("300"), help = "The cron time for the event bridge trigger rule. Defaults to 10 seconds.")] pub cron_time: Option, /// The name of the event bridge trigger rule. diff --git a/crates/orchestrator/src/cli/mod.rs b/crates/orchestrator/src/cli/mod.rs index 886738c3..e8320fa3 100644 --- a/crates/orchestrator/src/cli/mod.rs +++ b/crates/orchestrator/src/cli/mod.rs @@ -114,7 +114,7 @@ pub enum Commands { ), )] pub struct RunCmd { - // AWS Config + // Provider Config #[clap(flatten)] pub aws_config_args: AWSConfigCliArgs, diff --git a/crates/orchestrator/src/queue/mod.rs b/crates/orchestrator/src/queue/mod.rs index 57fa8f83..cf9d9ece 100644 --- a/crates/orchestrator/src/queue/mod.rs +++ b/crates/orchestrator/src/queue/mod.rs @@ -58,6 +58,7 @@ pub struct QueueConfig { // TODO: use QueueType::iter() or format! lazy_static! { pub static ref QUEUES: Vec = vec![ + QueueConfig { name: QueueType::JobHandleFailure, visibility_timeout: 300, dlq_config: None }, QueueConfig { name: QueueType::SnosJobProcessing, visibility_timeout: 300, @@ -98,7 +99,6 @@ lazy_static! { visibility_timeout: 300, dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: QueueType::JobHandleFailure }) }, - QueueConfig { name: QueueType::JobHandleFailure, visibility_timeout: 300, dlq_config: None }, QueueConfig { name: QueueType::WorkerTrigger, visibility_timeout: 300, dlq_config: None }, ]; } diff --git a/crates/orchestrator/src/setup/mod.rs b/crates/orchestrator/src/setup/mod.rs index 3957816f..10c4e8b0 100644 --- a/crates/orchestrator/src/setup/mod.rs +++ b/crates/orchestrator/src/setup/mod.rs @@ -24,13 +24,13 @@ pub enum SetupConfig { // TODO : move this to main.rs after moving to clap. pub async fn setup_cloud(run_cmd: &SetupCmd) -> color_eyre::Result<()> { - println!("Setting up cloud."); + println!("Setting up cloud. ⏳"); // AWS let provider_params = run_cmd.validate_provider_params().expect("Failed to validate provider params"); let provider_config = build_provider_config(&provider_params).await; // Data Storage - println!("Setting up data storage."); + println!("Setting up data storage. ⏳"); let data_storage_params = run_cmd.validate_storage_params().expect("Failed to validate storage params"); let aws_config = provider_config.get_aws_client_or_panic(); @@ -43,7 +43,7 @@ pub async fn setup_cloud(run_cmd: &SetupCmd) -> color_eyre::Result<()> { println!("Data storage setup completed ✅"); // Queues - println!("Setting up queues"); + println!("Setting up queues. ⏳"); let queue_params = run_cmd.validate_queue_params().expect("Failed to validate queue params"); match queue_params { QueueValidatedArgs::AWSSQS(aws_sqs_params) => { @@ -54,7 +54,7 @@ pub async fn setup_cloud(run_cmd: &SetupCmd) -> color_eyre::Result<()> { println!("Queues setup completed ✅"); // Cron - println!("Setting up cron"); + println!("Setting up cron. ⏳"); let cron_params = run_cmd.validate_cron_params().expect("Failed to validate cron params"); match cron_params { CronValidatedArgs::AWSEventBridge(aws_event_bridge_params) => { @@ -66,7 +66,7 @@ pub async fn setup_cloud(run_cmd: &SetupCmd) -> color_eyre::Result<()> { println!("Cron setup completed ✅"); // Alerts - println!("Setting up alerts."); + println!("Setting up alerts. ⏳"); let alert_params = run_cmd.validate_alert_params().expect("Failed to validate alert params"); match alert_params { AlertValidatedArgs::AWSSNS(aws_sns_params) => { diff --git a/e2e-tests/src/node.rs b/e2e-tests/src/node.rs index f491efd3..58c50a45 100644 --- a/e2e-tests/src/node.rs +++ b/e2e-tests/src/node.rs @@ -34,7 +34,6 @@ pub enum OrchestratorMode { #[strum(serialize = "setup")] Setup, } - impl Orchestrator { pub fn new(mode: OrchestratorMode, mut envs: Vec<(String, String)>) -> Option { let repository_root = &get_repository_root(); @@ -55,18 +54,19 @@ impl Orchestrator { .arg("--features") .arg("testing") .arg(mode_str) - .arg("--") .arg("--aws") - .arg("--settle-on-ethereum") .arg("--aws-s3") .arg("--aws-sqs") - .arg("--aws-sns") - .arg("--mongodb") - .arg("--sharp") - .arg("--da-on-ethereum"); + .arg("--aws-sns"); // Add event bridge arg only for setup mode - if !is_run_mode { + if is_run_mode { + command.arg("--settle-on-ethereum"); + command.arg("--da-on-ethereum"); + command.arg("--sharp"); + command.arg("--mongodb"); + + } else { command.arg("--aws-event-bridge"); } @@ -80,34 +80,58 @@ impl Orchestrator { String::new() }; - command.current_dir(repository_root).envs(envs).stdout(Stdio::piped()).stderr(Stdio::piped()); + command.current_dir(repository_root).envs(envs); + + // Handle stdout/stderr differently based on mode + if is_run_mode { + command.stdout(Stdio::piped()).stderr(Stdio::piped()); + } else { + // For setup mode, inherit the stdio to show output directly + command.stdout(Stdio::inherit()).stderr(Stdio::inherit()); + } let mut process = command.spawn().expect("Failed to start process"); - // Set up stdout and stderr handling for run mode - let stdout = process.stdout.take().expect("Failed to capture stdout"); - thread::spawn(move || { - let reader = BufReader::new(stdout); - reader.lines().for_each(|line| { - if let Ok(line) = line { - println!("STDOUT: {}", line); - } + // Set up stdout and stderr handling only for run mode + if is_run_mode { + let stdout = process.stdout.take().expect("Failed to capture stdout"); + thread::spawn(move || { + let reader = BufReader::new(stdout); + reader.lines().for_each(|line| { + if let Ok(line) = line { + println!("STDOUT: {}", line); + } + }); }); - }); - - let stderr = process.stderr.take().expect("Failed to capture stderr"); - thread::spawn(move || { - let reader = BufReader::new(stderr); - reader.lines().for_each(|line| { - if let Ok(line) = line { - eprintln!("STDERR: {}", line); - } + + let stderr = process.stderr.take().expect("Failed to capture stderr"); + thread::spawn(move || { + let reader = BufReader::new(stderr); + reader.lines().for_each(|line| { + if let Ok(line) = line { + eprintln!("STDERR: {}", line); + } + }); }); - }); + } - if is_run_mode { Some(Self { process, address }) } else { None } + if is_run_mode { Some(Self { process, address }) } else { + // Wait for the process to complete and get its exit status + let status = process.wait().expect("Failed to wait for process"); + if status.success() { + println!("Orchestrator cloud setup completed ✅"); + } else { + // Get the exit code if available + if let Some(code) = status.code() { + println!("Orchestrator cloud setup failed with exit code: {}", code); + } else { + println!("Orchestrator cloud setup terminated by signal"); + } + } + None + } } - + pub fn endpoint(&self) -> Url { Url::parse(&format!("http://{}", self.address)).unwrap() } diff --git a/e2e-tests/tests.rs b/e2e-tests/tests.rs index 2cd91a59..8ff2900f 100644 --- a/e2e-tests/tests.rs +++ b/e2e-tests/tests.rs @@ -317,14 +317,14 @@ pub async fn put_message_in_queue(message: JobQueueMessage, queue_url: String) - let rsp = client.send_message().queue_url(queue_url).message_body(serde_json::to_string(&message)?).send().await?; - println!("Successfully sent message with ID: {:?}", rsp.message_id()); + println!("✅ Successfully sent message with ID: {:?}", rsp.message_id()); Ok(()) } /// Mocks the endpoint for sharp client pub async fn mock_proving_job_endpoint_output(sharp_client: &mut SharpClient) { - // Add job response + // Add job response, let add_job_response = json!( { "code" : "JOB_RECEIVED_SUCCESSFULLY"