diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index be75f69e..47f8e69e 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -1,11 +1,8 @@ -use axum::Router; use dotenvy::dotenv; use orchestrator::config::init_config; use orchestrator::queue::init_consumers; -use orchestrator::routes::app_routes::{app_router, handler_404}; -use orchestrator::routes::job_routes::job_routes; +use orchestrator::routes::setup_server; use orchestrator::telemetry::{setup_analytics, shutdown_analytics}; -use utils::env_utils::get_env_var_or_default; /// Start the server #[tokio::main] @@ -24,15 +21,8 @@ async fn main() { let config = init_config().await.expect("Config instantiation failed"); tracing::debug!(service = "orchestrator", "Configuration initialized"); - let host = get_env_var_or_default("HOST", "127.0.0.1"); - let port = get_env_var_or_default("PORT", "3000").parse::().expect("PORT must be a u16"); - let address = format!("{}:{}", host, port); - let listener = tokio::net::TcpListener::bind(address.clone()).await.expect("Failed to get listener"); - - let job_routes = job_routes(config.clone()); - let app_routes = app_router(); - - let app = Router::new().merge(app_routes).merge(job_routes).fallback(handler_404); + // initialize the server + let _ = setup_server(config.clone()).await; tracing::debug!(service = "orchestrator", "Application router initialized"); @@ -45,11 +35,6 @@ async fn main() { } } - if let Err(e) = axum::serve(listener, app).await { - tracing::error!(service = "orchestrator", error = %e, "Server failed to start"); - panic!("Failed to start axum server: {}", e); - } - // Analytics Shutdown shutdown_analytics(meter_provider); tracing::info!(service = "orchestrator", "Orchestrator service shutting down"); diff --git a/crates/orchestrator/src/routes/job_routes.rs b/crates/orchestrator/src/routes/job_routes.rs index 8f592e8d..aeeb25c2 100644 --- a/crates/orchestrator/src/routes/job_routes.rs +++ b/crates/orchestrator/src/routes/job_routes.rs @@ -65,11 +65,11 @@ async fn handle_verify_job_request( Err(e) => ApiResponse::::error(e.to_string()).into_response(), } } -pub fn job_routes(config: Arc) -> Router { - Router::new().nest("/jobs", trigger_routes(config.clone())) +pub fn job_router(config: Arc) -> Router { + Router::new().nest("/jobs", trigger_router(config.clone())) } -fn trigger_routes(config: Arc) -> Router { +fn trigger_router(config: Arc) -> Router { Router::new() .route("/:id/process", get(handle_process_job_request)) .route("/:id/verify", get(handle_verify_job_request)) diff --git a/crates/orchestrator/src/routes/mod.rs b/crates/orchestrator/src/routes/mod.rs index 961989af..1799c88c 100644 --- a/crates/orchestrator/src/routes/mod.rs +++ b/crates/orchestrator/src/routes/mod.rs @@ -1,7 +1,15 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use app_routes::{app_router, handler_404}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; -use axum::Json; +use axum::{Json, Router}; +use job_routes::job_router; use serde::Serialize; +use utils::env_utils::get_env_var_or_default; + +use crate::config::Config; pub mod app_routes; pub mod job_routes; @@ -40,3 +48,28 @@ where (status, json).into_response() } } + +pub async fn setup_server(config: Arc) -> SocketAddr { + let (api_server_url, listener) = get_server_url().await; + + let job_routes = job_router(config.clone()); + let app_routes = app_router(); + let app = Router::new().merge(app_routes).merge(job_routes).fallback(handler_404); + + if let Err(e) = axum::serve(listener, app).await { + tracing::error!(service = "orchestrator", error = %e, "Server failed to start"); + panic!("Failed to start axum server: {}", e); + } + + api_server_url +} + +pub async fn get_server_url() -> (SocketAddr, tokio::net::TcpListener) { + let host = get_env_var_or_default("HOST", "127.0.0.1"); + let port = get_env_var_or_default("PORT", "3000").parse::().expect("PORT must be a u16"); + let address = format!("{}:{}", host, port); + let listener = tokio::net::TcpListener::bind(address.clone()).await.expect("Failed to get listener"); + let api_server_url = listener.local_addr().expect("Unable to bind address to listener."); + + (api_server_url, listener) +} diff --git a/crates/orchestrator/src/tests/config.rs b/crates/orchestrator/src/tests/config.rs index 8fe11648..77439103 100644 --- a/crates/orchestrator/src/tests/config.rs +++ b/crates/orchestrator/src/tests/config.rs @@ -1,5 +1,7 @@ +use std::net::SocketAddr; use std::sync::Arc; +use axum::Router; use da_client_interface::{DaClient, MockDaClient}; use httpmock::MockServer; use prover_client_interface::{MockProverClient, ProverClient}; @@ -15,6 +17,7 @@ use crate::config::{get_aws_config, Config, ProviderConfig}; use crate::data_storage::{DataStorage, MockDataStorage}; use crate::database::{Database, MockDatabase}; use crate::queue::{MockQueueProvider, QueueProvider}; +use crate::routes::{get_server_url, setup_server}; use crate::tests::common::{create_sns_arn, create_sqs_queues, drop_database}; // Inspiration : https://rust-unofficial.github.io/patterns/patterns/creational/builder.html @@ -22,6 +25,7 @@ use crate::tests::common::{create_sns_arn, create_sqs_queues, drop_database}; // Eg: We want to mock only the da client and leave rest to be as it is, use mock_da_client. pub enum MockType { + Server(Router), RpcUrl(Url), StarknetClient(Arc>), DaClient(Box), @@ -91,6 +95,8 @@ pub struct TestConfigBuilder { queue_type: ConfigType, /// Storage client storage_type: ConfigType, + /// API Service + api_server_type: ConfigType, } impl Default for TestConfigBuilder { @@ -100,10 +106,12 @@ impl Default for TestConfigBuilder { } pub struct TestConfigBuilderReturns { - pub server: Option, + pub starknet_server: Option, pub config: Arc, pub provider_config: Arc, + pub api_server_address: Option, } + impl TestConfigBuilder { /// Create a new config pub fn new() -> TestConfigBuilder { @@ -117,6 +125,7 @@ impl TestConfigBuilder { queue_type: ConfigType::default(), storage_type: ConfigType::default(), alerts_type: ConfigType::default(), + api_server_type: ConfigType::default(), } } @@ -164,6 +173,11 @@ impl TestConfigBuilder { self } + pub fn configure_api_server(mut self, api_server_type: ConfigType) -> TestConfigBuilder { + self.api_server_type = api_server_type; + self + } + pub async fn build(self) -> TestConfigBuilderReturns { dotenvy::from_filename("../.env.test").expect("Failed to load the .env.test file"); @@ -182,9 +196,10 @@ impl TestConfigBuilder { database_type, queue_type, storage_type, + api_server_type, } = self; - let (starknet_rpc_url, starknet_client, server) = + let (starknet_rpc_url, starknet_client, starknet_server) = implement_client::init_starknet_client(starknet_rpc_url_type, starknet_client_type).await; let alerts = implement_client::init_alerts(alerts_type, &settings_provider, provider_config.clone()).await; let da_client = implement_client::init_da_client(da_client_type, &settings_provider).await; @@ -221,7 +236,35 @@ impl TestConfigBuilder { alerts, )); - TestConfigBuilderReturns { server, config, provider_config: provider_config.clone() } + let api_server_address = implement_api_server(api_server_type, config.clone()).await; + + TestConfigBuilderReturns { + starknet_server, + config, + provider_config: provider_config.clone(), + api_server_address, + } + } +} + +async fn implement_api_server(api_server_type: ConfigType, config: Arc) -> Option { + match api_server_type { + ConfigType::Mock(client) => { + if let MockType::Server(router) = client { + let (api_server_url, listener) = get_server_url().await; + let app = Router::new().merge(router); + + if let Err(e) = axum::serve(listener, app).await { + tracing::error!(service = "orchestrator", error = %e, "Server failed to start"); + panic!("Failed to start axum server: {}", e); + } + Some(api_server_url) + } else { + panic!(concat!("Mock client is not a ", stringify!($client_type))); + } + } + ConfigType::Actual => Some(setup_server(config.clone()).await), + ConfigType::Dummy => None, } } diff --git a/crates/orchestrator/src/tests/jobs/da_job/mod.rs b/crates/orchestrator/src/tests/jobs/da_job/mod.rs index 5ce427a5..871318e7 100644 --- a/crates/orchestrator/src/tests/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/da_job/mod.rs @@ -53,7 +53,7 @@ async fn test_da_job_process_job_failure_on_small_blob_size( let state_update = MaybePendingStateUpdate::Update(state_update); let state_update = serde_json::to_value(&state_update).unwrap(); let response = json!({ "id": 640641,"jsonrpc":"2.0","result": state_update }); - let server = services.server.unwrap(); + let server = services.starknet_server.unwrap(); get_nonce_attached(&server, nonces_file.as_str()); let state_update_mock = server.mock(|when, then| { @@ -104,7 +104,7 @@ async fn test_da_job_process_job_failure_on_pending_block() { .configure_da_client(ConfigType::Actual) .build() .await; - let server = services.server.unwrap(); + let server = services.starknet_server.unwrap(); let internal_id = "1"; let pending_state_update = MaybePendingStateUpdate::PendingUpdate(PendingStateUpdate { @@ -196,7 +196,7 @@ async fn test_da_job_process_job_success( .configure_da_client(da_client.into()) .build() .await; - let server = services.server.unwrap(); + let server = services.starknet_server.unwrap(); let state_update = read_state_update_from_file(state_update_file.as_str()).expect("issue while reading"); diff --git a/crates/orchestrator/src/tests/server/job_routes.rs b/crates/orchestrator/src/tests/server/job_routes.rs index 680e6b4c..e90d6dc8 100644 --- a/crates/orchestrator/src/tests/server/job_routes.rs +++ b/crates/orchestrator/src/tests/server/job_routes.rs @@ -6,6 +6,10 @@ use chrono::{SubsecRound as _, Utc}; use hyper::{Body, Request}; use mockall::predicate::eq; use rstest::*; +use starknet::providers::jsonrpc::HttpTransport; +use starknet::providers::JsonRpcClient; +use url::Url; +use utils::env_utils::get_env_var_or_panic; use uuid::Uuid; use crate::config::Config; @@ -13,13 +17,34 @@ use crate::jobs::job_handler_factory::mock_factory; use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::jobs::{Job, MockJob}; use crate::queue::init_consumers; -use crate::tests::config::TestConfigBuilder; -use crate::tests::server::setup_server; +use crate::tests::config::{ConfigType, TestConfigBuilder}; + +#[fixture] +async fn setup_trigger() -> (SocketAddr, Arc) { + dotenvy::from_filename("../.env.test").expect("Failed to load the .env.test file"); + + let madara_url = get_env_var_or_panic("MADARA_RPC_URL"); + let provider = JsonRpcClient::new(HttpTransport::new( + Url::parse(madara_url.as_str().to_string().as_str()).expect("Failed to parse URL"), + )); + + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .configure_starknet_client(provider.into()) + .configure_api_server(ConfigType::Actual) + .build() + .await; + + let addr = services.api_server_address.unwrap(); + let config = services.config; + (addr, config) +} #[tokio::test] #[rstest] -async fn test_trigger_process_job(#[future] setup_server: (SocketAddr, Arc)) { - let (addr, config) = setup_server.await; +async fn test_trigger_process_job(#[future] setup_trigger: (SocketAddr, Arc)) { + let (addr, config) = setup_trigger.await; let job_type = JobType::DataSubmission; @@ -57,8 +82,8 @@ async fn test_trigger_process_job(#[future] setup_server: (SocketAddr, Arc)) { - let (addr, config) = setup_server.await; +async fn test_trigger_verify_job(#[future] setup_trigger: (SocketAddr, Arc)) { + let (addr, config) = setup_trigger.await; let job_type = JobType::DataSubmission; diff --git a/crates/orchestrator/src/tests/server/mod.rs b/crates/orchestrator/src/tests/server/mod.rs index ae18c60b..2a12b848 100644 --- a/crates/orchestrator/src/tests/server/mod.rs +++ b/crates/orchestrator/src/tests/server/mod.rs @@ -1,64 +1,22 @@ pub mod job_routes; use std::io::Read; -use std::net::SocketAddr; -use std::sync::Arc; use axum::http::StatusCode; -use axum::Router; use hyper::body::Buf; use hyper::{Body, Request}; use rstest::*; -use starknet::providers::jsonrpc::HttpTransport; -use starknet::providers::JsonRpcClient; -use url::Url; -use utils::env_utils::{get_env_var_or_default, get_env_var_or_panic}; -use crate::config::Config; use crate::queue::init_consumers; -use crate::routes::app_routes::{app_router, handler_404}; -use crate::routes::job_routes::job_routes; use crate::tests::config::{ConfigType, TestConfigBuilder}; -#[fixture] -pub async fn setup_server() -> (SocketAddr, Arc) { - dotenvy::from_filename("../.env.test").expect("Failed to load the .env.test file"); - - let madara_url = get_env_var_or_panic("MADARA_RPC_URL"); - let provider = JsonRpcClient::new(HttpTransport::new( - Url::parse(madara_url.as_str().to_string().as_str()).expect("Failed to parse URL"), - )); - - let services = TestConfigBuilder::new() - .configure_database(ConfigType::Actual) - .configure_queue_client(ConfigType::Actual) - .configure_starknet_client(provider.into()) - .build() - .await; - - let host = get_env_var_or_default("HOST", "0.0.0.0"); - let port = get_env_var_or_default("PORT", "3000").parse::().expect("PORT must be a u16"); - let address = format!("{}:{}", host, port); - - let listener = tokio::net::TcpListener::bind(address.clone()).await.expect("Failed to get listener"); - let addr = listener.local_addr().expect("Unable to bind address to listener."); - - let job_routes = job_routes(services.config.clone()); - let app_routes = app_router(); - - let app = Router::new().merge(app_routes).merge(job_routes).fallback(handler_404); - - tokio::spawn(async move { - axum::serve(listener, app).await.expect("Failed to start axum server"); - }); - - (addr, services.config.clone()) -} - #[rstest] #[tokio::test] -async fn test_health_endpoint(#[future] setup_server: (SocketAddr, Arc)) { - let (addr, _) = setup_server.await; +async fn test_health_endpoint() { + dotenvy::from_filename("../.env.test").expect("Failed to load the .env.test file"); + + let services = TestConfigBuilder::new().configure_api_server(ConfigType::Actual).build().await; + let addr = services.api_server_address.unwrap(); let client = hyper::Client::new(); let response = client .request(Request::builder().uri(format!("http://{}/health", addr)).body(Body::empty()).unwrap())