Skip to content

Commit

Permalink
update: e2e fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
heemankv committed Nov 13, 2024
1 parent 414d624 commit 02618d1
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 40 deletions.
4 changes: 2 additions & 2 deletions crates/orchestrator/src/cli/cron/event_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ pub struct AWSEventBridgeCliArgs {
pub aws_event_bridge: bool,

/// The name of the S3 bucket.
#[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TARGET_QUEUE_NAME", long, default_value = Some("madara-orchestrator-event-bridge-target-queue-name"), help = "The name of the SNS queue to send messages to from the event bridge.")]
#[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TARGET_QUEUE_NAME", long, default_value = Some("madara_orchestrator_worker_trigger_queue"), help = "The name of the SNS queue to send messages to from the event bridge.")]
pub target_queue_name: Option<String>,
/// The cron time for the event bridge trigger rule.
#[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_CRON_TIME", long, default_value = Some("10"), help = "The cron time for the event bridge trigger rule. Defaults to 10 seconds.")]
#[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_CRON_TIME", long, default_value = Some("300"), help = "The cron time for the event bridge trigger rule. Defaults to 10 seconds.")]
pub cron_time: Option<String>,

/// The name of the event bridge trigger rule.
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub enum Commands {
),
)]
pub struct RunCmd {
// AWS Config
// Provider Config
#[clap(flatten)]
pub aws_config_args: AWSConfigCliArgs,

Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub struct QueueConfig {
// TODO: use QueueType::iter() or format!
lazy_static! {
pub static ref QUEUES: Vec<QueueConfig> = vec![
QueueConfig { name: QueueType::JobHandleFailure, visibility_timeout: 300, dlq_config: None },
QueueConfig {
name: QueueType::SnosJobProcessing,
visibility_timeout: 300,
Expand Down Expand Up @@ -98,7 +99,6 @@ lazy_static! {
visibility_timeout: 300,
dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: QueueType::JobHandleFailure })
},
QueueConfig { name: QueueType::JobHandleFailure, visibility_timeout: 300, dlq_config: None },
QueueConfig { name: QueueType::WorkerTrigger, visibility_timeout: 300, dlq_config: None },
];
}
Expand Down
10 changes: 5 additions & 5 deletions crates/orchestrator/src/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ pub enum SetupConfig {

// TODO : move this to main.rs after moving to clap.
pub async fn setup_cloud(run_cmd: &SetupCmd) -> color_eyre::Result<()> {
println!("Setting up cloud.");
println!("Setting up cloud.");
// AWS
let provider_params = run_cmd.validate_provider_params().expect("Failed to validate provider params");
let provider_config = build_provider_config(&provider_params).await;

// Data Storage
println!("Setting up data storage.");
println!("Setting up data storage.");
let data_storage_params = run_cmd.validate_storage_params().expect("Failed to validate storage params");
let aws_config = provider_config.get_aws_client_or_panic();

Expand All @@ -43,7 +43,7 @@ pub async fn setup_cloud(run_cmd: &SetupCmd) -> color_eyre::Result<()> {
println!("Data storage setup completed ✅");

// Queues
println!("Setting up queues");
println!("Setting up queues. ⏳");
let queue_params = run_cmd.validate_queue_params().expect("Failed to validate queue params");
match queue_params {
QueueValidatedArgs::AWSSQS(aws_sqs_params) => {
Expand All @@ -54,7 +54,7 @@ pub async fn setup_cloud(run_cmd: &SetupCmd) -> color_eyre::Result<()> {
println!("Queues setup completed ✅");

// Cron
println!("Setting up cron");
println!("Setting up cron. ⏳");
let cron_params = run_cmd.validate_cron_params().expect("Failed to validate cron params");
match cron_params {
CronValidatedArgs::AWSEventBridge(aws_event_bridge_params) => {
Expand All @@ -66,7 +66,7 @@ pub async fn setup_cloud(run_cmd: &SetupCmd) -> color_eyre::Result<()> {
println!("Cron setup completed ✅");

// Alerts
println!("Setting up alerts.");
println!("Setting up alerts.");
let alert_params = run_cmd.validate_alert_params().expect("Failed to validate alert params");
match alert_params {
AlertValidatedArgs::AWSSNS(aws_sns_params) => {
Expand Down
82 changes: 53 additions & 29 deletions e2e-tests/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ pub enum OrchestratorMode {
#[strum(serialize = "setup")]
Setup,
}

impl Orchestrator {
pub fn new(mode: OrchestratorMode, mut envs: Vec<(String, String)>) -> Option<Self> {
let repository_root = &get_repository_root();
Expand All @@ -55,18 +54,19 @@ impl Orchestrator {
.arg("--features")
.arg("testing")
.arg(mode_str)
.arg("--")
.arg("--aws")
.arg("--settle-on-ethereum")
.arg("--aws-s3")
.arg("--aws-sqs")
.arg("--aws-sns")
.arg("--mongodb")
.arg("--sharp")
.arg("--da-on-ethereum");
.arg("--aws-sns");

// Add event bridge arg only for setup mode
if !is_run_mode {
if is_run_mode {
command.arg("--settle-on-ethereum");
command.arg("--da-on-ethereum");
command.arg("--sharp");
command.arg("--mongodb");

} else {
command.arg("--aws-event-bridge");
}

Expand All @@ -80,34 +80,58 @@ impl Orchestrator {
String::new()
};

command.current_dir(repository_root).envs(envs).stdout(Stdio::piped()).stderr(Stdio::piped());
command.current_dir(repository_root).envs(envs);

// Handle stdout/stderr differently based on mode
if is_run_mode {
command.stdout(Stdio::piped()).stderr(Stdio::piped());
} else {
// For setup mode, inherit the stdio to show output directly
command.stdout(Stdio::inherit()).stderr(Stdio::inherit());
}

let mut process = command.spawn().expect("Failed to start process");

// Set up stdout and stderr handling for run mode
let stdout = process.stdout.take().expect("Failed to capture stdout");
thread::spawn(move || {
let reader = BufReader::new(stdout);
reader.lines().for_each(|line| {
if let Ok(line) = line {
println!("STDOUT: {}", line);
}
// Set up stdout and stderr handling only for run mode
if is_run_mode {
let stdout = process.stdout.take().expect("Failed to capture stdout");
thread::spawn(move || {
let reader = BufReader::new(stdout);
reader.lines().for_each(|line| {
if let Ok(line) = line {
println!("STDOUT: {}", line);
}
});
});
});

let stderr = process.stderr.take().expect("Failed to capture stderr");
thread::spawn(move || {
let reader = BufReader::new(stderr);
reader.lines().for_each(|line| {
if let Ok(line) = line {
eprintln!("STDERR: {}", line);
}

let stderr = process.stderr.take().expect("Failed to capture stderr");
thread::spawn(move || {
let reader = BufReader::new(stderr);
reader.lines().for_each(|line| {
if let Ok(line) = line {
eprintln!("STDERR: {}", line);
}
});
});
});
}

if is_run_mode { Some(Self { process, address }) } else { None }
if is_run_mode { Some(Self { process, address }) } else {
// Wait for the process to complete and get its exit status
let status = process.wait().expect("Failed to wait for process");
if status.success() {
println!("Orchestrator cloud setup completed ✅");
} else {
// Get the exit code if available
if let Some(code) = status.code() {
println!("Orchestrator cloud setup failed with exit code: {}", code);
} else {
println!("Orchestrator cloud setup terminated by signal");
}
}
None
}
}

pub fn endpoint(&self) -> Url {
Url::parse(&format!("http://{}", self.address)).unwrap()
}
Expand Down
4 changes: 2 additions & 2 deletions e2e-tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,14 +317,14 @@ pub async fn put_message_in_queue(message: JobQueueMessage, queue_url: String) -

let rsp = client.send_message().queue_url(queue_url).message_body(serde_json::to_string(&message)?).send().await?;

println!("Successfully sent message with ID: {:?}", rsp.message_id());
println!("Successfully sent message with ID: {:?}", rsp.message_id());

Ok(())
}

/// Mocks the endpoint for sharp client
pub async fn mock_proving_job_endpoint_output(sharp_client: &mut SharpClient) {
// Add job response
// Add job response,
let add_job_response = json!(
{
"code" : "JOB_RECEIVED_SUCCESSFULLY"
Expand Down

0 comments on commit 02618d1

Please sign in to comment.