Skip to content

Commit

Permalink
update: simpler api server setup
Browse files Browse the repository at this point in the history
  • Loading branch information
heemankv committed Oct 24, 2024
1 parent d47ff8c commit 1c86624
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 81 deletions.
21 changes: 3 additions & 18 deletions crates/orchestrator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use axum::Router;
use dotenvy::dotenv;
use orchestrator::config::init_config;
use orchestrator::queue::init_consumers;
use orchestrator::routes::app_routes::{app_router, handler_404};
use orchestrator::routes::job_routes::job_routes;
use orchestrator::routes::setup_server;
use orchestrator::telemetry::{setup_analytics, shutdown_analytics};
use utils::env_utils::get_env_var_or_default;

/// Start the server
#[tokio::main]
Expand All @@ -24,15 +21,8 @@ async fn main() {
let config = init_config().await.expect("Config instantiation failed");
tracing::debug!(service = "orchestrator", "Configuration initialized");

let host = get_env_var_or_default("HOST", "127.0.0.1");
let port = get_env_var_or_default("PORT", "3000").parse::<u16>().expect("PORT must be a u16");
let address = format!("{}:{}", host, port);
let listener = tokio::net::TcpListener::bind(address.clone()).await.expect("Failed to get listener");

let job_routes = job_routes(config.clone());
let app_routes = app_router();

let app = Router::new().merge(app_routes).merge(job_routes).fallback(handler_404);
// initialize the server
let _ = setup_server(config.clone()).await;

tracing::debug!(service = "orchestrator", "Application router initialized");

Expand All @@ -45,11 +35,6 @@ async fn main() {
}
}

if let Err(e) = axum::serve(listener, app).await {
tracing::error!(service = "orchestrator", error = %e, "Server failed to start");
panic!("Failed to start axum server: {}", e);
}

// Analytics Shutdown
shutdown_analytics(meter_provider);
tracing::info!(service = "orchestrator", "Orchestrator service shutting down");
Expand Down
6 changes: 3 additions & 3 deletions crates/orchestrator/src/routes/job_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ async fn handle_verify_job_request(
Err(e) => ApiResponse::<JobApiResponse>::error(e.to_string()).into_response(),
}
}
pub fn job_routes(config: Arc<Config>) -> Router {
Router::new().nest("/jobs", trigger_routes(config.clone()))
pub fn job_router(config: Arc<Config>) -> Router {
Router::new().nest("/jobs", trigger_router(config.clone()))
}

fn trigger_routes(config: Arc<Config>) -> Router {
fn trigger_router(config: Arc<Config>) -> Router {
Router::new()
.route("/:id/process", get(handle_process_job_request))
.route("/:id/verify", get(handle_verify_job_request))
Expand Down
35 changes: 34 additions & 1 deletion crates/orchestrator/src/routes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
use std::net::SocketAddr;
use std::sync::Arc;

use app_routes::{app_router, handler_404};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::Json;
use axum::{Json, Router};
use job_routes::job_router;
use serde::Serialize;
use utils::env_utils::get_env_var_or_default;

use crate::config::Config;

pub mod app_routes;
pub mod job_routes;
Expand Down Expand Up @@ -40,3 +48,28 @@ where
(status, json).into_response()
}
}

pub async fn setup_server(config: Arc<Config>) -> SocketAddr {
let (api_server_url, listener) = get_server_url().await;

let job_routes = job_router(config.clone());
let app_routes = app_router();
let app = Router::new().merge(app_routes).merge(job_routes).fallback(handler_404);

if let Err(e) = axum::serve(listener, app).await {
tracing::error!(service = "orchestrator", error = %e, "Server failed to start");
panic!("Failed to start axum server: {}", e);
}

api_server_url
}

pub async fn get_server_url() -> (SocketAddr, tokio::net::TcpListener) {
let host = get_env_var_or_default("HOST", "127.0.0.1");
let port = get_env_var_or_default("PORT", "3000").parse::<u16>().expect("PORT must be a u16");
let address = format!("{}:{}", host, port);
let listener = tokio::net::TcpListener::bind(address.clone()).await.expect("Failed to get listener");
let api_server_url = listener.local_addr().expect("Unable to bind address to listener.");

(api_server_url, listener)
}
49 changes: 46 additions & 3 deletions crates/orchestrator/src/tests/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::net::SocketAddr;
use std::sync::Arc;

use axum::Router;
use da_client_interface::{DaClient, MockDaClient};
use httpmock::MockServer;
use prover_client_interface::{MockProverClient, ProverClient};
Expand All @@ -15,13 +17,15 @@ use crate::config::{get_aws_config, Config, ProviderConfig};
use crate::data_storage::{DataStorage, MockDataStorage};
use crate::database::{Database, MockDatabase};
use crate::queue::{MockQueueProvider, QueueProvider};
use crate::routes::{get_server_url, setup_server};
use crate::tests::common::{create_sns_arn, create_sqs_queues, drop_database};

// Inspiration : https://rust-unofficial.github.io/patterns/patterns/creational/builder.html
// TestConfigBuilder allows to heavily customise the global configs based on the test's requirement.
// Eg: We want to mock only the da client and leave rest to be as it is, use mock_da_client.

pub enum MockType {
Server(Router),
RpcUrl(Url),
StarknetClient(Arc<JsonRpcClient<HttpTransport>>),
DaClient(Box<dyn DaClient>),
Expand Down Expand Up @@ -91,6 +95,8 @@ pub struct TestConfigBuilder {
queue_type: ConfigType,
/// Storage client
storage_type: ConfigType,
/// API Service
api_server_type: ConfigType,
}

impl Default for TestConfigBuilder {
Expand All @@ -100,10 +106,12 @@ impl Default for TestConfigBuilder {
}

pub struct TestConfigBuilderReturns {
pub server: Option<MockServer>,
pub starknet_server: Option<MockServer>,
pub config: Arc<Config>,
pub provider_config: Arc<ProviderConfig>,
pub api_server_address: Option<SocketAddr>,
}

impl TestConfigBuilder {
/// Create a new config
pub fn new() -> TestConfigBuilder {
Expand All @@ -117,6 +125,7 @@ impl TestConfigBuilder {
queue_type: ConfigType::default(),
storage_type: ConfigType::default(),
alerts_type: ConfigType::default(),
api_server_type: ConfigType::default(),
}
}

Expand Down Expand Up @@ -164,6 +173,11 @@ impl TestConfigBuilder {
self
}

pub fn configure_api_server(mut self, api_server_type: ConfigType) -> TestConfigBuilder {
self.api_server_type = api_server_type;
self
}

pub async fn build(self) -> TestConfigBuilderReturns {
dotenvy::from_filename("../.env.test").expect("Failed to load the .env.test file");

Expand All @@ -182,9 +196,10 @@ impl TestConfigBuilder {
database_type,
queue_type,
storage_type,
api_server_type,
} = self;

let (starknet_rpc_url, starknet_client, server) =
let (starknet_rpc_url, starknet_client, starknet_server) =
implement_client::init_starknet_client(starknet_rpc_url_type, starknet_client_type).await;
let alerts = implement_client::init_alerts(alerts_type, &settings_provider, provider_config.clone()).await;
let da_client = implement_client::init_da_client(da_client_type, &settings_provider).await;
Expand Down Expand Up @@ -221,7 +236,35 @@ impl TestConfigBuilder {
alerts,
));

TestConfigBuilderReturns { server, config, provider_config: provider_config.clone() }
let api_server_address = implement_api_server(api_server_type, config.clone()).await;

TestConfigBuilderReturns {
starknet_server,
config,
provider_config: provider_config.clone(),
api_server_address,
}
}
}

async fn implement_api_server(api_server_type: ConfigType, config: Arc<Config>) -> Option<SocketAddr> {
match api_server_type {
ConfigType::Mock(client) => {
if let MockType::Server(router) = client {
let (api_server_url, listener) = get_server_url().await;
let app = Router::new().merge(router);

if let Err(e) = axum::serve(listener, app).await {
tracing::error!(service = "orchestrator", error = %e, "Server failed to start");
panic!("Failed to start axum server: {}", e);
}
Some(api_server_url)
} else {
panic!(concat!("Mock client is not a ", stringify!($client_type)));
}
}
ConfigType::Actual => Some(setup_server(config.clone()).await),
ConfigType::Dummy => None,
}
}

Expand Down
6 changes: 3 additions & 3 deletions crates/orchestrator/src/tests/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn test_da_job_process_job_failure_on_small_blob_size(
let state_update = MaybePendingStateUpdate::Update(state_update);
let state_update = serde_json::to_value(&state_update).unwrap();
let response = json!({ "id": 640641,"jsonrpc":"2.0","result": state_update });
let server = services.server.unwrap();
let server = services.starknet_server.unwrap();
get_nonce_attached(&server, nonces_file.as_str());

let state_update_mock = server.mock(|when, then| {
Expand Down Expand Up @@ -104,7 +104,7 @@ async fn test_da_job_process_job_failure_on_pending_block() {
.configure_da_client(ConfigType::Actual)
.build()
.await;
let server = services.server.unwrap();
let server = services.starknet_server.unwrap();
let internal_id = "1";

let pending_state_update = MaybePendingStateUpdate::PendingUpdate(PendingStateUpdate {
Expand Down Expand Up @@ -196,7 +196,7 @@ async fn test_da_job_process_job_success(
.configure_da_client(da_client.into())
.build()
.await;
let server = services.server.unwrap();
let server = services.starknet_server.unwrap();

let state_update = read_state_update_from_file(state_update_file.as_str()).expect("issue while reading");

Expand Down
37 changes: 31 additions & 6 deletions crates/orchestrator/src/tests/server/job_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,45 @@ use chrono::{SubsecRound as _, Utc};
use hyper::{Body, Request};
use mockall::predicate::eq;
use rstest::*;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use url::Url;
use utils::env_utils::get_env_var_or_panic;
use uuid::Uuid;

use crate::config::Config;
use crate::jobs::job_handler_factory::mock_factory;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::{Job, MockJob};
use crate::queue::init_consumers;
use crate::tests::config::TestConfigBuilder;
use crate::tests::server::setup_server;
use crate::tests::config::{ConfigType, TestConfigBuilder};

#[fixture]
async fn setup_trigger() -> (SocketAddr, Arc<Config>) {
dotenvy::from_filename("../.env.test").expect("Failed to load the .env.test file");

let madara_url = get_env_var_or_panic("MADARA_RPC_URL");
let provider = JsonRpcClient::new(HttpTransport::new(
Url::parse(madara_url.as_str().to_string().as_str()).expect("Failed to parse URL"),
));

let services = TestConfigBuilder::new()
.configure_database(ConfigType::Actual)
.configure_queue_client(ConfigType::Actual)
.configure_starknet_client(provider.into())
.configure_api_server(ConfigType::Actual)
.build()
.await;

let addr = services.api_server_address.unwrap();
let config = services.config;
(addr, config)
}

#[tokio::test]
#[rstest]
async fn test_trigger_process_job(#[future] setup_server: (SocketAddr, Arc<Config>)) {
let (addr, config) = setup_server.await;
async fn test_trigger_process_job(#[future] setup_trigger: (SocketAddr, Arc<Config>)) {
let (addr, config) = setup_trigger.await;

let job_type = JobType::DataSubmission;

Expand Down Expand Up @@ -57,8 +82,8 @@ async fn test_trigger_process_job(#[future] setup_server: (SocketAddr, Arc<Confi

#[tokio::test]
#[rstest]
async fn test_trigger_verify_job(#[future] setup_server: (SocketAddr, Arc<Config>)) {
let (addr, config) = setup_server.await;
async fn test_trigger_verify_job(#[future] setup_trigger: (SocketAddr, Arc<Config>)) {
let (addr, config) = setup_trigger.await;

let job_type = JobType::DataSubmission;

Expand Down
52 changes: 5 additions & 47 deletions crates/orchestrator/src/tests/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,64 +1,22 @@
pub mod job_routes;
use std::io::Read;
use std::net::SocketAddr;
use std::sync::Arc;

use axum::http::StatusCode;
use axum::Router;
use hyper::body::Buf;
use hyper::{Body, Request};
use rstest::*;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use url::Url;
use utils::env_utils::{get_env_var_or_default, get_env_var_or_panic};

use crate::config::Config;
use crate::queue::init_consumers;
use crate::routes::app_routes::{app_router, handler_404};
use crate::routes::job_routes::job_routes;
use crate::tests::config::{ConfigType, TestConfigBuilder};

#[fixture]
pub async fn setup_server() -> (SocketAddr, Arc<Config>) {
dotenvy::from_filename("../.env.test").expect("Failed to load the .env.test file");

let madara_url = get_env_var_or_panic("MADARA_RPC_URL");
let provider = JsonRpcClient::new(HttpTransport::new(
Url::parse(madara_url.as_str().to_string().as_str()).expect("Failed to parse URL"),
));

let services = TestConfigBuilder::new()
.configure_database(ConfigType::Actual)
.configure_queue_client(ConfigType::Actual)
.configure_starknet_client(provider.into())
.build()
.await;

let host = get_env_var_or_default("HOST", "0.0.0.0");
let port = get_env_var_or_default("PORT", "3000").parse::<u16>().expect("PORT must be a u16");
let address = format!("{}:{}", host, port);

let listener = tokio::net::TcpListener::bind(address.clone()).await.expect("Failed to get listener");
let addr = listener.local_addr().expect("Unable to bind address to listener.");

let job_routes = job_routes(services.config.clone());
let app_routes = app_router();

let app = Router::new().merge(app_routes).merge(job_routes).fallback(handler_404);

tokio::spawn(async move {
axum::serve(listener, app).await.expect("Failed to start axum server");
});

(addr, services.config.clone())
}

#[rstest]
#[tokio::test]
async fn test_health_endpoint(#[future] setup_server: (SocketAddr, Arc<Config>)) {
let (addr, _) = setup_server.await;
async fn test_health_endpoint() {
dotenvy::from_filename("../.env.test").expect("Failed to load the .env.test file");

let services = TestConfigBuilder::new().configure_api_server(ConfigType::Actual).build().await;

let addr = services.api_server_address.unwrap();
let client = hyper::Client::new();
let response = client
.request(Request::builder().uri(format!("http://{}/health", addr)).body(Body::empty()).unwrap())
Expand Down

0 comments on commit 1c86624

Please sign in to comment.