From 0ad9bfb20c44aa4626b0755e21ddad129ef53c00 Mon Sep 17 00:00:00 2001 From: Apoorv Sadana <95699312+apoorvsadana@users.noreply.github.com> Date: Sat, 24 Aug 2024 15:00:25 +0530 Subject: [PATCH 1/3] show logs (#100) * show logs * fix CI * prettier * logs * remove no capture * pass secrets * yml fix * revert changes * prettier * Empty commit * change delay --- .github/workflows/coverage.yml | 13 ++++--------- .github/workflows/linters-cargo.yml | 1 - .github/workflows/linters.yml | 1 - .github/workflows/pull-request.yml | 4 +++- .github/workflows/rust-build.yml | 1 - CHANGELOG.md | 1 + crates/settlement-clients/ethereum/src/tests/mod.rs | 2 +- 7 files changed, 9 insertions(+), 14 deletions(-) diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index b841c16b..f6afb593 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -1,15 +1,10 @@ name: Rust Test & Coverage on: - pull_request_target: - branches: - - main - types: [opened, synchronize, reopened] - push: - branches-ignore: - - main workflow_call: - workflow_dispatch: + secrets: + ETHEREUM_BLAST_RPC_URL: + required: true jobs: coverage: @@ -74,7 +69,7 @@ jobs: - name: Run llvm-cov tests env: ETHEREUM_BLAST_RPC_URL: ${{ secrets.ETHEREUM_BLAST_RPC_URL }} - run: cargo llvm-cov nextest --release --lcov --output-path lcov.info --test-threads=1 + run: RUST_LOG=debug RUST_BACKTRACE=1 cargo llvm-cov nextest --release --lcov --output-path lcov.info --test-threads=1 - name: Coveralls uses: coverallsapp/github-action@v2 diff --git a/.github/workflows/linters-cargo.yml b/.github/workflows/linters-cargo.yml index d3e5ff90..96172230 100644 --- a/.github/workflows/linters-cargo.yml +++ b/.github/workflows/linters-cargo.yml @@ -4,7 +4,6 @@ name: Task - Linters Cargo on: workflow_dispatch: workflow_call: - push: jobs: cargo-lint: diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml index 901b7f1f..42f8c8de 100644 --- a/.github/workflows/linters.yml +++ b/.github/workflows/linters.yml @@ -4,7 +4,6 @@ name: Task - Linters on: workflow_dispatch: workflow_call: - push: jobs: prettier: diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index 2863c998..3698d8c5 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -2,7 +2,8 @@ name: Workflow - Pull Request on: - pull_request_target: + workflow_dispatch: + pull_request: branches: [main] push: branches: [main] @@ -24,3 +25,4 @@ jobs: name: Run coverage uses: ./.github/workflows/coverage.yml needs: rust_build + secrets: inherit diff --git a/.github/workflows/rust-build.yml b/.github/workflows/rust-build.yml index 1406606b..f7cb3e08 100644 --- a/.github/workflows/rust-build.yml +++ b/.github/workflows/rust-build.yml @@ -4,7 +4,6 @@ name: Task - Build Rust on: workflow_dispatch: workflow_call: - push: jobs: rust_build: diff --git a/CHANGELOG.md b/CHANGELOG.md index c6f1f690..60de9a90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Removed +- revert CI changes from settlement client PR. - `init_config` from all the tests. - `fetch_from_test` argument diff --git a/crates/settlement-clients/ethereum/src/tests/mod.rs b/crates/settlement-clients/ethereum/src/tests/mod.rs index 65694e46..078cbe53 100644 --- a/crates/settlement-clients/ethereum/src/tests/mod.rs +++ b/crates/settlement-clients/ethereum/src/tests/mod.rs @@ -228,7 +228,7 @@ async fn update_state_blob_with_impersonation_works(#[case] fork_block_no: u64) // Asserting, Expected to receive transaction hash. assert!(!update_state_result.is_empty(), "No transaction Hash received."); - sleep(Duration::from_secs(2)).await; + sleep(Duration::from_secs(5)).await; ethereum_settlement_client .wait_for_tx_finality(update_state_result.as_str()) .await From 77bf9c61d67c4a4b628b087568f709541e6747c7 Mon Sep 17 00:00:00 2001 From: Apoorv Sadana <95699312+apoorvsadana@users.noreply.github.com> Date: Mon, 26 Aug 2024 09:58:11 +0530 Subject: [PATCH 2/3] refactor aws and clean .env files (#98) * refactor aws * env cleanup and build_database_client * remove get_credentials_and_region_from_config * region for test case * fix localstack not working * add port and host to .env.test * fix eth pk * fix CI --------- Co-authored-by: Arun Jangra --- .env.example | 57 ++++++++-------- .env.test | 57 +++++++++------- CHANGELOG.md | 1 + crates/da-clients/ethereum/src/config.rs | 4 +- crates/orchestrator/src/config.rs | 33 +++++++-- .../src/data_storage/aws_s3/config.rs | 51 +------------- .../src/data_storage/aws_s3/mod.rs | 68 +++---------------- crates/orchestrator/src/tests/common/mod.rs | 7 +- .../src/tests/data_storage/mod.rs | 11 ++- .../settlement-clients/ethereum/src/config.rs | 13 ++-- 10 files changed, 120 insertions(+), 182 deletions(-) diff --git a/.env.example b/.env.example index 310ac77c..fc5dc314 100644 --- a/.env.example +++ b/.env.example @@ -1,50 +1,51 @@ +##### ORCHESTRATOR ##### + HOST= PORT= -DATABASE_URL= -MADARA_RPC_URL= -DA_LAYER= -SETTLEMENT_LAYER= -# Ethereum -ETHEREUM_PRIVATE_KEY= -ETHEREUM_RPC_URL= -MEMORY_PAGES_CONTRACT_ADDRESS= -STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS= +##### AWS CONFIG ##### +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= +AWS_REGION= +AWS_ENDPOINT_URL= -# Starknet -STARKNET_PUBLIC_KEY= -STARNET_PRIVATE_KEY= -STARKNET_RPC_URL= -STARKNET_CAIRO_CORE_CONTRACT_ADDRESS= +##### STORAGE ##### -# MongoDB connection string -MONGODB_CONNECTION_STRING= +DATA_STORAGE= +AWS_S3_BUCKET_NAME= -# AWS -AWS_ACCESS_KEY_ID= -AWS_SECRET_ACCESS_KEY= -AWS_DEFAULT_REGION= +##### QUEUE ##### -# SQS +QUEUE_PROVIDER= SQS_JOB_PROCESSING_QUEUE_URL= SQS_JOB_VERIFICATION_QUEUE_URL= SQS_JOB_HANDLE_FAILURE_QUEUE_URL= SQS_WORKER_TRIGGER_QUEUE_URL= -# S3 -AWS_S3_BUCKET_NAME= -AWS_S3_BUCKET_REGION= +##### DATABASE ##### +DATABASE= +MONGODB_CONNECTION_STRING= -# Ethereum Settlement -DEFAULT_SETTLEMENT_CLIENT_RPC= -DEFAULT_L1_CORE_CONTRACT_ADDRESS= +##### PROVER ##### -# Sharp Services +PROVER_SERVICE= SHARP_CUSTOMER_ID= +SHARP_URL= +# [IMP!!!] These are test certificates (they don't work) SHARP_USER_CRT= SHARP_USER_KEY= SHARP_SERVER_CRT= SHARP_PROOF_LAYOUT= +##### ON CHAIN CONFIG ##### + +DA_LAYER= +SETTLEMENT_LAYER= +ETHEREUM_RPC_URL= +MADARA_RPC_URL= +MEMORY_PAGES_CONTRACT_ADDRESS= +PRIVATE_KEY= +ETHEREUM_PRIVATE_KEY= +STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS= diff --git a/.env.test b/.env.test index 08b22c73..b87267ed 100644 --- a/.env.test +++ b/.env.test @@ -1,41 +1,36 @@ -##### AWS config ##### +##### ORCHESTRATOR ##### + +HOST=127.0.0.1 +PORT=3000 + +##### AWS CONFIG ##### AWS_ACCESS_KEY_ID="AWS_ACCESS_KEY_ID" AWS_SECRET_ACCESS_KEY="AWS_SECRET_ACCESS_KEY" -AWS_S3_BUCKET_NAME="madara-orchestrator-test-bucket" -AWS_S3_BUCKET_REGION="us-east-1" +AWS_REGION="us-east-1" AWS_ENDPOINT_URL="http://localhost.localstack.cloud:4566" + +##### STORAGE ##### + +DATA_STORAGE="s3" +AWS_S3_BUCKET_NAME="madara-orchestrator-test-bucket" + +##### QUEUE ##### + +QUEUE_PROVIDER="sqs" SQS_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_processing_queue" SQS_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_verification_queue" SQS_JOB_HANDLE_FAILURE_QUEUE_URL= SQS_WORKER_TRIGGER_QUEUE_URL= -AWS_DEFAULT_REGION="localhost" - -##### On chain config ##### - -MADARA_RPC_URL="http://localhost:3000" -ETHEREUM_RPC_URL="http://localhost:3001" -MEMORY_PAGES_CONTRACT_ADDRESS="0x000000000000000000000000000000000001dead" -PRIVATE_KEY="0xdead" -# Private key of Test wallet provided by Anvil -ETHEREUM_PRIVATE_KEY="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" -STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS="0x000000000000000000000000000000000002dead" -##### Config URLs ##### +##### DATABASE ##### -DA_LAYER="ethereum" -PROVER_SERVICE="sharp" -SETTLEMENT_LAYER="ethereum" -DATA_STORAGE="s3" +DATABASE="mongodb" MONGODB_CONNECTION_STRING="mongodb://localhost:27017" -DEFAULT_SETTLEMENT_CLIENT_RPC="http://localhost:3000" -# Ethereum Settlement -DEFAULT_L1_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4" -SHOULD_IMPERSONATE_ACCOUNT="true" -TEST_DUMMY_CONTRACT_ADDRESS="0xE5b6F5e695BA6E4aeD92B68c4CC8Df1160D69A81" +##### PROVER ##### -# Sharp Services +PROVER_SERVICE="sharp" SHARP_CUSTOMER_ID="sharp_consumer_id" SHARP_URL="http://127.0.0.1:5000" # [IMP!!!] These are test certificates (they don't work) @@ -43,3 +38,15 @@ SHARP_USER_CRT="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUR4ekNDQXErZ0F3SUJBZ0lV SHARP_USER_KEY="LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2UUlCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktjd2dnU2pBZ0VBQW9JQkFRRFJ5bVQzWTJQdkFJL0gKVkwxaTBNU011ZDVQanhuRE9UbW5tODBscWd4aXhTVHpiM1Z0cUdkUVNiQTZJWVpueEIvc25RSVFLdjZPL09QWApKMStrOWt2VjVwSjZiN0o4SUVLeFBZTW95WHNOU3dCdHhwclVpRGRXeUNhSXNtOEFBOHVKVHNDNUZ1ZDdWcDhhClVaT1ZGWDAwbHltazBEY2hzSSt3cFN4NUdxOG1HbU8vV1BjaHRFYnhRSGFFZDdOS01wR3VnQ2xocm1mRGJvTWUKZXVxUHRtV0VNNjVzbmpFa2hXeHpMcG1VUzE2QjRwUEFDU25xN1o3QUEveXRVVUc2T0hKcDVhb1RFMmdTbXl3RgpGZVpzVktObWFNVHlGRHBnd1BlNjl0blVtdzNSVkhNZjNmRDZkTTFMSzZzRDZ3WHFwMGRjQ3ZEdU5TQm8rZHhiCnQxWlFzdjhkQWdNQkFBRUNnZ0VBQU9mcDFiT2xLOVFKeXVlUHhjeDIvTkNVcUMxTEJDL01FdkEyUzVKWGFWbkcKbGhLR0pFb1U0Q0RoVk83dUlLYVZLTFZvMjk4RHFHUnBLM1d0RVE1TE40bytXYTcveTA5c1drMlVzbWxrVWFOZwpSaGtVZEJSK2dLNXVsQ3FKRml2dUJoTEQvRWlnQ1VWUGZKS2JtNG96TnpYcjVSMU5ENlV1aWFtODdtenlFcTBLCmZsVXlhR0RZNGdIdFNBOVBENVBFYlUveFpKeitKaHk5T2l3aVRXV0MrSHoyb2c3UWRDRDE2RlhGcit2VHpQN0MKb2tFb0VDZFNPRWlMalVENjBhS2ZxRmFCVm5MTkVudC9QSytmY1RBM05mNGtSMnFDNk9ZWjVFb09zYm1ka29ZTgpyU3NJZW9XblMxOEhvekZud2w3Z05wTUtjNmRzQzRBTldOVDFsTkhCb1FLQmdRRHlaUDFJSlppZUh6NlExaUVTCm5zd2tnblZCQUQ0SlVLR1ZDMHA3dk4yclNDZXh4c05ZZXFPTEEyZGZCUGpOVjd3blFKcUgxT05XellOMUJVSUUKeThLTCtFZVl6Q3RZa21LL21wSGJIMzNjd2tJODBuMHJROU1BalZMTlJ2YVVEOWp1NFBsRzFqaEFZUVVyTkViZQpKRlVpSk83aDVQa1llZG50SitqSHFpQnRoUUtCZ1FEZGtPbndmL0szYk4xenR0bXZQd0VicjhkVWJjRVh5NDFOCkl5VWwrZW1WSlgzYktKM0duNDZnQ2RsTTdkYmpwS3JVZ3oxL2JsZTgvMkVFckJvSEFRNkMrU2pEaGhvL01CbnIKekZheTBoK3YxbjBnZnNNVzRoOEF4cEFwc25OYnh6K2g1Wm5uSnRTd0srUjB3U0VJVVEzRjAxL2hMWWhLQ2l5OApwbW5HQi9hU3VRS0JnRzdxd1cvVExGd214ZlYyMXBsenFzeUdHZXVObGRXalhOMGIxcEI2b3lDdW11TmhwYUFHCk5uSDFNOGNxT2tPVWd4ZWZHMWRPbGx6eEc5ZGZlWTlDUWhyVW1NYVZucndmK0NuZkxDRU43d1VtcXpLenl1MFMKVXlwc2dOaElRYXNNK1dLTjllTnhRVHBNYXhZVERONjMxM0VSWDNKazJZdFdydDh6cFBSQXFDZ1ZBb0dCQU54egpUa0NMbmJ6aFphbTNlZm9DenlCMEVma3dSdHBkSGxkc3E0NlFqTmRuK1VSd3NpTXBLR2lWeEE3bDZsU1B4NlV3CmU2VHA3Z1JQZUlHRWwxVDJ1VENacGZSODNtcVdlb1FCeVJXZE9nZmplcFkxYWZpL3ZhY3c2Y21ERTRKeXloNVUKYTMveFE5ZVJwSHFDbWxKREMxZ1V5eVlwL3B2a2FjUytNeW5sVEhHSkFvR0FQekdTSzdXOHBUYldSVEFoaTVrSQpwZk5kWk1tcnRodUxNT3F6TGhyRjZublpldk9OdTBoYXVhZktlVElFd2w0clhYZHFKQlJBaWZKMFFsLzZKWFFkCmd1VzFrZWk1Ui8rUFZ5eUhab042c3NXSTNWYklwUUloUmt6UENnTDZhbHEwSzFpT1dlV1lIOHdORGRRdlB1T2UKRkZPOEovSzNxV0NtWjU0ODBBbTNhT0U9Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0K" SHARP_SERVER_CRT="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURhekNDQWxPZ0F3SUJBZ0lVRUR0Rjd0YVNTUnVQQTJ6Uk1aNWNzY2JCRm5jd0RRWUpLb1pJaHZjTkFRRUwKQlFBd1JURUxNQWtHQTFVRUJoTUNTVTR4RXpBUkJnTlZCQWdNQ2xOdmJXVXRVM1JoZEdVeElUQWZCZ05WQkFvTQpHRWx1ZEdWeWJtVjBJRmRwWkdkcGRITWdVSFI1SUV4MFpEQWVGdzB5TkRBNE1UTXhNekEzTVROYUZ3MHlOVEE0Ck1UTXhNekEzTVROYU1FVXhDekFKQmdOVkJBWVRBa2xPTVJNd0VRWURWUVFJREFwVGIyMWxMVk4wWVhSbE1TRXcKSHdZRFZRUUtEQmhKYm5SbGNtNWxkQ0JYYVdSbmFYUnpJRkIwZVNCTWRHUXdnZ0VpTUEwR0NTcUdTSWIzRFFFQgpBUVVBQTRJQkR3QXdnZ0VLQW9JQkFRRFRHcEEwNEZ1QlNFaE5PNVYvMGxTaDkvSEgxeVRZT2dRVFdoOG43eDlRCnZGMHpvZFZueVFIdjE5elU5eVdia2xvOEkvOXFBVm9lRzdXTnpUVFg2Q295ZlNjb1YvazN0Q2UwVnVWMlFJTVQKdW82SzJSU05CVHB1TlNqNTlzUiszVTQ2OFRBQnY0YVpsYjU4TU5CRXM3MVRieVpLRHBGRVRkMkg3T0ZKajg4QQpNRi9MaXJkeDZPOFdZL0tDeisxd1ZXL1JRdytYYjRJSWx4bXJFOC9UZ3FNSEo4dFUxYkZiOWJNcTEvOTN5YWtJClU1V2J2NVhXKzFwZFVyTUFNcTFFaC9vZThMN2pFaFdvZXZrNzgyU0kwUk0xeG5MaEtrUUVBYXd6Zkg2ODZiR2YKUHQ3RkFIQ1pGaWJ4KzZzSkg0R1M3S25iK0x5bk9ud3phMWZPUXZEZmcvRm5BZ01CQUFHalV6QlJNQjBHQTFVZApEZ1FXQkJUYlFUdmlUTW1xNXlNK2ZJRVI4VjdTZk1pK3B6QWZCZ05WSFNNRUdEQVdnQlRiUVR2aVRNbXE1eU0rCmZJRVI4VjdTZk1pK3B6QVBCZ05WSFJNQkFmOEVCVEFEQVFIL01BMEdDU3FHU0liM0RRRUJDd1VBQTRJQkFRREYKTllyRnpBa2RIVkhjUkd5SUNsTi9IVGswaldOcTVSdTB1RUpDQ21Dbm9ZY1pRSTlDQlcwTkl3dGpZUkpTKzR1UwordWh4VWpSYTA5YXdOWDhvYmU0dDZjK25HRnhZMGZqamk0cGZnbU1kMWNJeGdsM3E3Nlp0ZkllRGR6alRLRXN1CjRFUTVadnEwMnJvTEZ0ZjEvL3dRVG0xNkNKdFpGWnhNZ1phYnNxc2JRc3M2dWdMUGtTTmdBWjI1L2VhcWhnQ20KTjFUV2FxL0xJMVBLSkxPK085NFlMa2FsNVpyOTJCOXk4Q0VKVUVuSTA1R1N1MmJUOFM2a0ZBMEpadEszTW9SbwpqRWZWV1lQVHR5TFR4amNvRndCcDlHaXZYSDdSdHBxMDlmSmFhU1pNekxmNGlyNHpBdXprbExBNWZvampPNXlKCllnYlVaQUU2aS81N1NFWjR3VmxTCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K" SHARP_PROOF_LAYOUT="small" + +##### ON CHAIN CONFIG ##### + +DA_LAYER="ethereum" +SETTLEMENT_LAYER="ethereum" +SETTLEMENT_RPC_URL="http://localhost:3001" +MADARA_RPC_URL="http://localhost:3000" +L1_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4" +MEMORY_PAGES_CONTRACT_ADDRESS="0x000000000000000000000000000000000001dead" +PRIVATE_KEY="0xdead" +ETHEREUM_PRIVATE_KEY="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" +STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS="0x000000000000000000000000000000000002dead" diff --git a/CHANGELOG.md b/CHANGELOG.md index 60de9a90..952e7162 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Changed +- refactor AWS config usage and clean .env files - GitHub's coverage CI yml file for localstack and db testing. - Orchestrator :Moved TestConfigBuilder to `config.rs` in tests folder. - `.env` file requires two more variables which are queue urls for processing diff --git a/crates/da-clients/ethereum/src/config.rs b/crates/da-clients/ethereum/src/config.rs index b50604b2..34613486 100644 --- a/crates/da-clients/ethereum/src/config.rs +++ b/crates/da-clients/ethereum/src/config.rs @@ -19,14 +19,14 @@ pub struct EthereumDaConfig { impl DaConfig for EthereumDaConfig { fn new_from_env() -> Self { Self { - rpc_url: get_env_var_or_panic("ETHEREUM_RPC_URL"), + rpc_url: get_env_var_or_panic("SETTLEMENT_RPC_URL"), memory_pages_contract: get_env_var_or_panic("MEMORY_PAGES_CONTRACT_ADDRESS"), private_key: get_env_var_or_panic("PRIVATE_KEY"), } } async fn build_client(&self) -> EthereumDaClient { let client = - RpcClient::new_http(Url::from_str(self.rpc_url.as_str()).expect("Failed to parse ETHEREUM_RPC_URL")); + RpcClient::new_http(Url::from_str(self.rpc_url.as_str()).expect("Failed to parse SETTLEMENT_RPC_URL")); let provider = ProviderBuilder::<_, Ethereum>::new().on_client(client); EthereumDaClient { provider } diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index 447b25ec..e3b7b975 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -1,9 +1,10 @@ use std::sync::Arc; -use crate::data_storage::aws_s3::config::{AWSS3Config, AWSS3ConfigType}; +use crate::data_storage::aws_s3::config::AWSS3Config; use crate::data_storage::aws_s3::AWSS3; use crate::data_storage::{DataStorage, DataStorageConfig}; use arc_swap::{ArcSwap, Guard}; +use aws_config::SdkConfig; use da_client_interface::{DaClient, DaConfig}; use dotenvy::dotenv; use ethereum_da_client::config::EthereumDaConfig; @@ -54,10 +55,16 @@ pub async fn init_config() -> Config { )); // init database - let database = Box::new(MongoDb::new(MongoDbConfig::new_from_env()).await); + let database = build_database_client().await; + + // init AWS + let aws_config = aws_config::load_from_env().await; // init the queue - let queue = Box::new(SqsQueue {}); + // TODO: we use omniqueue for now which doesn't support loading AWS config + // from `SdkConfig`. We can later move to using `aws_sdk_sqs`. This would require + // us stop using the generic omniqueue abstractions for message ack/nack + let queue = build_queue_client(&aws_config); let da_client = build_da_client().await; @@ -65,7 +72,7 @@ pub async fn init_config() -> Config { let settlement_client = build_settlement_client(&settings_provider).await; let prover_client = build_prover_service(&settings_provider); - let storage_client = build_storage_client().await; + let storage_client = build_storage_client(&aws_config).await; Config::new(Arc::new(provider), da_client, prover_client, settlement_client, database, queue, storage_client) } @@ -177,9 +184,23 @@ pub async fn build_settlement_client( } } -pub async fn build_storage_client() -> Box { +pub async fn build_storage_client(aws_config: &SdkConfig) -> Box { match get_env_var_or_panic("DATA_STORAGE").as_str() { - "s3" => Box::new(AWSS3::new(AWSS3ConfigType::WithoutEndpoint(AWSS3Config::new_from_env())).await), + "s3" => Box::new(AWSS3::new(AWSS3Config::new_from_env(), aws_config)), _ => panic!("Unsupported Storage Client"), } } + +pub fn build_queue_client(_aws_config: &SdkConfig) -> Box { + match get_env_var_or_panic("QUEUE_PROVIDER").as_str() { + "sqs" => Box::new(SqsQueue {}), + _ => panic!("Unsupported Queue Client"), + } +} + +pub async fn build_database_client() -> Box { + match get_env_var_or_panic("DATABASE").as_str() { + "mongodb" => Box::new(MongoDb::new(MongoDbConfig::new_from_env()).await), + _ => panic!("Unsupported Database Client"), + } +} diff --git a/crates/orchestrator/src/data_storage/aws_s3/config.rs b/crates/orchestrator/src/data_storage/aws_s3/config.rs index 0970d80d..665caeb4 100644 --- a/crates/orchestrator/src/data_storage/aws_s3/config.rs +++ b/crates/orchestrator/src/data_storage/aws_s3/config.rs @@ -2,64 +2,17 @@ use utils::env_utils::get_env_var_or_panic; use crate::data_storage::DataStorageConfig; -/// Represents the type of the config which one wants to pass to create the client -#[derive(Clone)] -pub enum AWSS3ConfigType { - WithEndpoint(S3LocalStackConfig), - WithoutEndpoint(AWSS3Config), -} - /// Represents AWS S3 config struct with all the necessary variables. #[derive(Clone)] pub struct AWSS3Config { - /// AWS ACCESS KEY ID - pub s3_key_id: String, - /// AWS ACCESS KEY SECRET - pub s3_key_secret: String, /// S3 Bucket Name - pub s3_bucket_name: String, - /// S3 Bucket region - pub s3_bucket_region: String, -} - -/// Represents AWS S3 config struct with all the necessary variables. -#[derive(Clone)] -pub struct S3LocalStackConfig { - /// AWS ACCESS KEY ID - pub s3_key_id: String, - /// AWS ACCESS KEY SECRET - pub s3_key_secret: String, - /// S3 Bucket Name - pub s3_bucket_name: String, - /// S3 Bucket region - pub s3_bucket_region: String, - /// Endpoint url - pub endpoint_url: String, + pub bucket_name: String, } /// Implementation of `DataStorageConfig` for `AWSS3Config` impl DataStorageConfig for AWSS3Config { /// To return the config struct by creating it from the environment variables. fn new_from_env() -> Self { - Self { - s3_key_id: get_env_var_or_panic("AWS_ACCESS_KEY_ID"), - s3_key_secret: get_env_var_or_panic("AWS_SECRET_ACCESS_KEY"), - s3_bucket_name: get_env_var_or_panic("AWS_S3_BUCKET_NAME"), - s3_bucket_region: get_env_var_or_panic("AWS_S3_BUCKET_REGION"), - } - } -} - -/// Implementation of `DataStorageConfig` for `S3LocalStackConfig` -impl DataStorageConfig for S3LocalStackConfig { - /// To return the config struct by creating it from the environment variables. - fn new_from_env() -> Self { - Self { - s3_key_id: get_env_var_or_panic("AWS_ACCESS_KEY_ID"), - s3_key_secret: get_env_var_or_panic("AWS_SECRET_ACCESS_KEY"), - s3_bucket_name: get_env_var_or_panic("AWS_S3_BUCKET_NAME"), - s3_bucket_region: get_env_var_or_panic("AWS_S3_BUCKET_REGION"), - endpoint_url: get_env_var_or_panic("AWS_ENDPOINT_URL"), - } + Self { bucket_name: get_env_var_or_panic("AWS_S3_BUCKET_NAME") } } } diff --git a/crates/orchestrator/src/data_storage/aws_s3/mod.rs b/crates/orchestrator/src/data_storage/aws_s3/mod.rs index 990a5d0a..0f920c7a 100644 --- a/crates/orchestrator/src/data_storage/aws_s3/mod.rs +++ b/crates/orchestrator/src/data_storage/aws_s3/mod.rs @@ -1,7 +1,7 @@ -use crate::data_storage::aws_s3::config::AWSS3ConfigType; +use crate::data_storage::aws_s3::config::AWSS3Config; use crate::data_storage::DataStorage; use async_trait::async_trait; -use aws_sdk_s3::config::{Builder, Credentials, Region}; +use aws_config::SdkConfig; use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::Client; use bytes::Bytes; @@ -13,7 +13,7 @@ pub mod config; /// AWSS3 represents AWS S3 client object containing the client and the config itself. pub struct AWSS3 { client: Client, - config: AWSS3ConfigType, + bucket: String, } /// Implementation for AWS S3 client. Contains the function for : @@ -22,63 +22,17 @@ pub struct AWSS3 { impl AWSS3 { /// Initializes a new AWS S3 client by passing the config /// and returning it. - pub async fn new(config: AWSS3ConfigType) -> Self { - let (config_builder, config) = match config { - AWSS3ConfigType::WithoutEndpoint(config) => { - let (credentials, region) = get_credentials_and_region_from_config( - config.s3_key_id.clone(), - config.s3_key_secret.clone(), - config.s3_bucket_region.clone(), - ); - ( - Builder::new().region(region).credentials_provider(credentials).force_path_style(true), - AWSS3ConfigType::WithoutEndpoint(config), - ) - } - AWSS3ConfigType::WithEndpoint(config) => { - let (credentials, region) = get_credentials_and_region_from_config( - config.s3_key_id.clone(), - config.s3_key_secret.clone(), - config.s3_bucket_region.clone(), - ); - ( - Builder::new() - .region(region) - .credentials_provider(credentials) - .force_path_style(true) - .endpoint_url(config.endpoint_url.clone()), - AWSS3ConfigType::WithEndpoint(config), - ) - } - }; - - let conf = config_builder.build(); - + pub fn new(s3_config: AWSS3Config, aws_config: &SdkConfig) -> Self { // Building AWS S3 config - let client = Client::from_conf(conf); - - Self { client, config } - } + let mut s3_config_builder = aws_sdk_s3::config::Builder::from(aws_config); - pub fn get_bucket_name(&self) -> String { - match self.config.clone() { - AWSS3ConfigType::WithEndpoint(config) => config.s3_bucket_name, - AWSS3ConfigType::WithoutEndpoint(config) => config.s3_bucket_name, - } + // this is necessary for it to work with localstack in test cases + s3_config_builder.set_force_path_style(Some(true)); + let client = Client::from_conf(s3_config_builder.build()); + Self { client, bucket: s3_config.bucket_name } } } -/// Return the constructed `Credentials` and `Region` -fn get_credentials_and_region_from_config( - s3_key_id: String, - s3_key_secret: String, - s3_bucket_region: String, -) -> (Credentials, Region) { - let credentials = Credentials::new(s3_key_id, s3_key_secret, None, None, "loaded_from_custom_env"); - let region = Region::new(s3_bucket_region); - (credentials, region) -} - /// Implementation of `DataStorage` for `AWSS3` /// contains the function for getting the data and putting the data /// by taking the key as an argument. @@ -86,7 +40,7 @@ fn get_credentials_and_region_from_config( impl DataStorage for AWSS3 { /// Function to get the data from S3 bucket by Key. async fn get_data(&self, key: &str) -> Result { - let response = self.client.get_object().bucket(self.get_bucket_name()).key(key).send().await?; + let response = self.client.get_object().bucket(&self.bucket).key(key).send().await?; let data_stream = response.body.collect().await.expect("Failed to convert body into AggregatedBytes."); let data_bytes = data_stream.into_bytes(); Ok(data_bytes) @@ -96,7 +50,7 @@ impl DataStorage for AWSS3 { async fn put_data(&self, data: Bytes, key: &str) -> Result<()> { self.client .put_object() - .bucket(self.get_bucket_name()) + .bucket(&self.bucket) .key(key) .body(ByteStream::from(data)) .content_type("application/json") diff --git a/crates/orchestrator/src/tests/common/mod.rs b/crates/orchestrator/src/tests/common/mod.rs index 9d56b7e3..ee29ac34 100644 --- a/crates/orchestrator/src/tests/common/mod.rs +++ b/crates/orchestrator/src/tests/common/mod.rs @@ -7,8 +7,9 @@ use aws_config::Region; use mongodb::Client; use rstest::*; use serde::Deserialize; +use utils::env_utils::get_env_var_or_panic; -use crate::data_storage::aws_s3::config::{AWSS3ConfigType, S3LocalStackConfig}; +use crate::data_storage::aws_s3::config::AWSS3Config; use crate::data_storage::aws_s3::AWSS3; use crate::data_storage::{DataStorage, DataStorageConfig}; use crate::database::mongodb::config::MongoDbConfig; @@ -84,5 +85,7 @@ pub struct MessagePayloadType { } pub async fn get_storage_client() -> Box { - Box::new(AWSS3::new(AWSS3ConfigType::WithEndpoint(S3LocalStackConfig::new_from_env())).await) + let aws_config = + aws_config::load_from_env().await.into_builder().endpoint_url(get_env_var_or_panic("AWS_ENDPOINT_URL")).build(); + Box::new(AWSS3::new(AWSS3Config::new_from_env(), &aws_config)) } diff --git a/crates/orchestrator/src/tests/data_storage/mod.rs b/crates/orchestrator/src/tests/data_storage/mod.rs index d127917a..a3055acb 100644 --- a/crates/orchestrator/src/tests/data_storage/mod.rs +++ b/crates/orchestrator/src/tests/data_storage/mod.rs @@ -1,7 +1,6 @@ -use crate::data_storage::aws_s3::config::{AWSS3ConfigType, S3LocalStackConfig}; +use crate::data_storage::aws_s3::config::AWSS3Config; use crate::data_storage::aws_s3::AWSS3; use crate::data_storage::{DataStorage, DataStorageConfig}; -use crate::tests::config::TestConfigBuilder; use bytes::Bytes; use rstest::rstest; use serde_json::json; @@ -14,12 +13,12 @@ use utils::env_utils::get_env_var_or_panic; #[rstest] #[tokio::test] async fn test_put_and_get_data_s3() -> color_eyre::Result<()> { - TestConfigBuilder::new().build().await; - dotenvy::from_filename("../.env.test")?; - let config = S3LocalStackConfig::new_from_env(); - let s3_client = AWSS3::new(AWSS3ConfigType::WithEndpoint(config)).await; + let config = AWSS3Config::new_from_env(); + let aws_config = + aws_config::load_from_env().await.into_builder().endpoint_url(get_env_var_or_panic("AWS_ENDPOINT_URL")).build(); + let s3_client = AWSS3::new(config, &aws_config); s3_client.build_test_bucket(&get_env_var_or_panic("AWS_S3_BUCKET_NAME")).await.unwrap(); let mock_data = json!( diff --git a/crates/settlement-clients/ethereum/src/config.rs b/crates/settlement-clients/ethereum/src/config.rs index 294c2a54..3d038390 100644 --- a/crates/settlement-clients/ethereum/src/config.rs +++ b/crates/settlement-clients/ethereum/src/config.rs @@ -6,8 +6,8 @@ use url::Url; use utils::env_utils::get_env_var_or_panic; pub const ENV_CORE_CONTRACT_ADDRESS: &str = "STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS"; -pub const DEFAULT_SETTLEMENT_CLIENT_RPC: &str = "DEFAULT_SETTLEMENT_CLIENT_RPC"; -pub const DEFAULT_L1_CORE_CONTRACT_ADDRESS: &str = "DEFAULT_L1_CORE_CONTRACT_ADDRESS"; +pub const SETTLEMENT_RPC_URL: &str = "SETTLEMENT_RPC_URL"; +pub const L1_CORE_CONTRACT_ADDRESS: &str = "L1_CORE_CONTRACT_ADDRESS"; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct EthereumSettlementConfig { @@ -17,9 +17,8 @@ pub struct EthereumSettlementConfig { impl SettlementConfig for EthereumSettlementConfig { fn new_from_env() -> Self { - let rpc_url = get_env_var_or_panic(DEFAULT_SETTLEMENT_CLIENT_RPC); - let rpc_url = - Url::from_str(&rpc_url).unwrap_or_else(|_| panic!("Failed to parse {}", DEFAULT_SETTLEMENT_CLIENT_RPC)); + let rpc_url = get_env_var_or_panic(SETTLEMENT_RPC_URL); + let rpc_url = Url::from_str(&rpc_url).unwrap_or_else(|_| panic!("Failed to parse {}", SETTLEMENT_RPC_URL)); let core_contract_address = get_env_var_or_panic(ENV_CORE_CONTRACT_ADDRESS); Self { rpc_url, core_contract_address } } @@ -28,8 +27,8 @@ impl SettlementConfig for EthereumSettlementConfig { impl Default for EthereumSettlementConfig { fn default() -> Self { Self { - rpc_url: get_env_var_or_panic(DEFAULT_SETTLEMENT_CLIENT_RPC).parse().unwrap(), - core_contract_address: get_env_var_or_panic(DEFAULT_L1_CORE_CONTRACT_ADDRESS), + rpc_url: get_env_var_or_panic(SETTLEMENT_RPC_URL).parse().unwrap(), + core_contract_address: get_env_var_or_panic(L1_CORE_CONTRACT_ADDRESS), } } } From f84ba4101f6ca3cc58f5fd375bfa0a9b1392028c Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Fri, 30 Aug 2024 12:18:27 +0530 Subject: [PATCH 3/3] Feat : alerts module (#95) * feat : alerts module init * feat : added alerts in jobs/mod.rs * feat : updated messages * feat : updated tests with alerts module mock * changelog : update * feat : added additional alerts * feat : added test for alerts and added logs at queue level * refactor : removed redundant alerts * feat : updated workflow * feat : resolved comments * feat : added all alerts for queues * feat : updated code * remove queue url const * feat : tests and lint fixes --------- Co-authored-by: apoorvsadana <95699312+apoorvsadana@users.noreply.github.com> --- .env.example | 3 +- .env.test | 23 +++++-- .github/workflows/coverage.yml | 2 +- CHANGELOG.md | 1 + Cargo.lock | 41 ++++++++++--- crates/orchestrator/Cargo.toml | 1 + crates/orchestrator/src/alerts/aws_sns/mod.rs | 27 ++++++++ crates/orchestrator/src/alerts/mod.rs | 11 ++++ crates/orchestrator/src/config.rs | 32 +++++++++- crates/orchestrator/src/jobs/mod.rs | 7 +-- crates/orchestrator/src/lib.rs | 2 + crates/orchestrator/src/queue/job_queue.rs | 14 +++++ crates/orchestrator/src/tests/alerts/mod.rs | 61 +++++++++++++++++++ crates/orchestrator/src/tests/common/mod.rs | 16 ++++- crates/orchestrator/src/tests/config.rs | 22 ++++++- crates/orchestrator/src/tests/mod.rs | 1 + 16 files changed, 240 insertions(+), 24 deletions(-) create mode 100644 crates/orchestrator/src/alerts/aws_sns/mod.rs create mode 100644 crates/orchestrator/src/alerts/mod.rs create mode 100644 crates/orchestrator/src/tests/alerts/mod.rs diff --git a/.env.example b/.env.example index fc5dc314..e0355037 100644 --- a/.env.example +++ b/.env.example @@ -33,7 +33,6 @@ MONGODB_CONNECTION_STRING= PROVER_SERVICE= SHARP_CUSTOMER_ID= SHARP_URL= -# [IMP!!!] These are test certificates (they don't work) SHARP_USER_CRT= SHARP_USER_KEY= SHARP_SERVER_CRT= @@ -48,4 +47,4 @@ MADARA_RPC_URL= MEMORY_PAGES_CONTRACT_ADDRESS= PRIVATE_KEY= ETHEREUM_PRIVATE_KEY= -STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS= +STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS= \ No newline at end of file diff --git a/.env.test b/.env.test index b87267ed..4d09e45b 100644 --- a/.env.test +++ b/.env.test @@ -9,6 +9,7 @@ AWS_ACCESS_KEY_ID="AWS_ACCESS_KEY_ID" AWS_SECRET_ACCESS_KEY="AWS_SECRET_ACCESS_KEY" AWS_REGION="us-east-1" AWS_ENDPOINT_URL="http://localhost.localstack.cloud:4566" +AWS_DEFAULT_REGION="localhost" ##### STORAGE ##### @@ -20,8 +21,14 @@ AWS_S3_BUCKET_NAME="madara-orchestrator-test-bucket" QUEUE_PROVIDER="sqs" SQS_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_processing_queue" SQS_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_verification_queue" -SQS_JOB_HANDLE_FAILURE_QUEUE_URL= -SQS_WORKER_TRIGGER_QUEUE_URL= +SQS_JOB_HANDLE_FAILURE_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_handle_failure_queue" +SQS_WORKER_TRIGGER_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_worker_trigger_queue" + +##### SNS ##### +ALERTS="sns" +AWS_SNS_REGION="us-east-1" +AWS_SNS_ARN="arn:aws:sns:us-east-1:000000000000:madara-orchestrator-arn" +AWS_SNS_ARN_NAME="madara-orchestrator-arn" ##### DATABASE ##### @@ -46,7 +53,15 @@ SETTLEMENT_LAYER="ethereum" SETTLEMENT_RPC_URL="http://localhost:3001" MADARA_RPC_URL="http://localhost:3000" L1_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4" -MEMORY_PAGES_CONTRACT_ADDRESS="0x000000000000000000000000000000000001dead" +MEMORY_PAGES_CONTRACT_ADDRESS="0x47312450B3Ac8b5b8e247a6bB6d523e7605bDb60" PRIVATE_KEY="0xdead" ETHEREUM_PRIVATE_KEY="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" -STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS="0x000000000000000000000000000000000002dead" +STARKNET_SOLIDITY_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4" +DEFAULT_L1_CORE_CONTRACT_ADDRESS="0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4" +TEST_DUMMY_CONTRACT_ADDRESS="0xE5b6F5e695BA6E4aeD92B68c4CC8Df1160D69A81" +STARKNET_OPERATOR_ADDRESS="0x2C169DFe5fBbA12957Bdd0Ba47d9CEDbFE260CA7" +ETHEREUM_BLAST_RPC_URL="https://eth-mainnet.public.blastapi.io" + +##### E2E test vars ##### + +L2_BLOCK_NUMBER_FOR_TEST=671070 \ No newline at end of file diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index f6afb593..2284f89b 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -14,7 +14,7 @@ jobs: localstack: image: localstack/localstack env: - SERVICES: s3, sqs + SERVICES: s3, sqs, sns DEFAULT_REGION: us-east-1 AWS_ACCESS_KEY_ID: "AWS_ACCESS_KEY_ID" AWS_SECRET_ACCESS_KEY: "AWS_SECRET_ACCESS_KEY" diff --git a/CHANGELOG.md b/CHANGELOG.md index 952e7162..c79ba808 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Added +- alerts module. - Tests for Settlement client. - Worker queues to listen for trigger events. - Tests for prover client. diff --git a/Cargo.lock b/Cargo.lock index 8414b14e..ccbd7a75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1697,9 +1697,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.3.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87c5f920ffd1e0526ec9e70e50bf444db50b204395a0fa7016bbf9e31ea1698f" +checksum = "f42c2d4218de4dcd890a109461e2f799a1a2ba3bcd2cde9af88360f5df9266c6" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -1713,6 +1713,7 @@ dependencies = [ "fastrand 2.1.0", "http 0.2.12", "http-body 0.4.6", + "once_cell", "percent-encoding", "pin-project-lite", "tracing", @@ -1754,6 +1755,29 @@ dependencies = [ "url", ] +[[package]] +name = "aws-sdk-sns" +version = "1.40.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dab2b9787b8d9d3094ace9585e785079cfc583199ec620ab067b599e8850c1a6" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-sqs" version = "1.36.0" @@ -1957,9 +1981,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.6.2" +version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce87155eba55e11768b8c1afa607f3e864ae82f03caf63258b37455b0ad02537" +checksum = "0abbf454960d0db2ad12684a1640120e7557294b0ff8e2f11236290a1b293225" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1984,9 +2008,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.7.1" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30819352ed0a04ecf6a2f3477e344d2d1ba33d43e0f09ad9047c12e0d923616f" +checksum = "e086682a53d3aa241192aa110fa8dfce98f2f5ac2ead0de84d41582c7e8fdb96" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -2001,9 +2025,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.0" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfe321a6b21f5d8eabd0ade9c55d3d0335f3c3157fc2b3e87f05f34b539e4df5" +checksum = "6cee7cadb433c781d3299b916fbf620fea813bf38f49db282fb6858141a05cc8" dependencies = [ "base64-simd", "bytes", @@ -6374,6 +6398,7 @@ dependencies = [ "async-trait", "aws-config", "aws-sdk-s3", + "aws-sdk-sns", "aws-sdk-sqs", "axum 0.7.5", "axum-macros", diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index e5121529..1d67b237 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -19,6 +19,7 @@ async-std = "1.12.0" async-trait = { workspace = true } aws-config = { version = "1.1.7", features = ["behavior-version-latest"] } aws-sdk-s3 = { version = "1.38.0", features = ["behavior-version-latest"] } +aws-sdk-sns = { version = "1.40.0", features = ["behavior-version-latest"] } aws-sdk-sqs = "1.36.0" axum = { workspace = true, features = ["macros"] } axum-macros = { workspace = true } diff --git a/crates/orchestrator/src/alerts/aws_sns/mod.rs b/crates/orchestrator/src/alerts/aws_sns/mod.rs new file mode 100644 index 00000000..9aa95f9a --- /dev/null +++ b/crates/orchestrator/src/alerts/aws_sns/mod.rs @@ -0,0 +1,27 @@ +use crate::alerts::Alerts; +use async_trait::async_trait; +use aws_sdk_sns::config::Region; +use aws_sdk_sns::Client; +use utils::env_utils::get_env_var_or_panic; + +pub struct AWSSNS { + client: Client, +} + +impl AWSSNS { + /// To create a new SNS client + pub async fn new() -> Self { + let sns_region = get_env_var_or_panic("AWS_SNS_REGION"); + let config = aws_config::from_env().region(Region::new(sns_region)).load().await; + AWSSNS { client: Client::new(&config) } + } +} + +#[async_trait] +impl Alerts for AWSSNS { + async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()> { + let topic_arn = get_env_var_or_panic("AWS_SNS_ARN"); + self.client.publish().topic_arn(topic_arn).message(message_body).send().await?; + Ok(()) + } +} diff --git a/crates/orchestrator/src/alerts/mod.rs b/crates/orchestrator/src/alerts/mod.rs new file mode 100644 index 00000000..1e36129d --- /dev/null +++ b/crates/orchestrator/src/alerts/mod.rs @@ -0,0 +1,11 @@ +use async_trait::async_trait; +use mockall::automock; + +pub mod aws_sns; + +#[automock] +#[async_trait] +pub trait Alerts: Send + Sync { + /// To send an alert message to our alert service + async fn send_alert_message(&self, message_body: String) -> color_eyre::Result<()>; +} diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index e3b7b975..4f29cac5 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +use crate::alerts::aws_sns::AWSSNS; +use crate::alerts::Alerts; use crate::data_storage::aws_s3::config::AWSS3Config; use crate::data_storage::aws_s3::AWSS3; use crate::data_storage::{DataStorage, DataStorageConfig}; @@ -43,6 +45,8 @@ pub struct Config { queue: Box, /// Storage client storage: Box, + /// Alerts client + alerts: Box, } /// Initializes the app config @@ -74,11 +78,23 @@ pub async fn init_config() -> Config { let storage_client = build_storage_client(&aws_config).await; - Config::new(Arc::new(provider), da_client, prover_client, settlement_client, database, queue, storage_client) + let alerts_client = build_alert_client().await; + + Config::new( + Arc::new(provider), + da_client, + prover_client, + settlement_client, + database, + queue, + storage_client, + alerts_client, + ) } impl Config { /// Create a new config + #[allow(clippy::too_many_arguments)] pub fn new( starknet_client: Arc>, da_client: Box, @@ -87,8 +103,9 @@ impl Config { database: Box, queue: Box, storage: Box, + alerts: Box, ) -> Self { - Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage } + Self { starknet_client, da_client, prover_client, settlement_client, database, queue, storage, alerts } } /// Returns the starknet client @@ -125,6 +142,11 @@ impl Config { pub fn storage(&self) -> &dyn DataStorage { self.storage.as_ref() } + + /// Returns the alerts client + pub fn alerts(&self) -> &dyn Alerts { + self.alerts.as_ref() + } } /// The app config. It can be accessed from anywhere inside the service. @@ -191,6 +213,12 @@ pub async fn build_storage_client(aws_config: &SdkConfig) -> Box Box { + match get_env_var_or_panic("ALERTS").as_str() { + "sns" => Box::new(AWSSNS::new().await), + _ => panic!("Unsupported Alert Client"), + } +} pub fn build_queue_client(_aws_config: &SdkConfig) -> Box { match get_env_var_or_panic("QUEUE_PROVIDER").as_str() { "sqs" => Box::new(SqsQueue {}), diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index e4ae1ed6..11da33d8 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -139,7 +139,8 @@ pub async fn create_job( } let job_handler = factory::get_job_handler(&job_type).await; - let job_item = job_handler.create_job(config.as_ref(), internal_id, metadata).await?; + let job_item = job_handler.create_job(config.as_ref(), internal_id.clone(), metadata.clone()).await?; + config.database().create_job(job_item.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?; add_job_to_process_queue(job_item.id).await.map_err(|e| JobError::Other(OtherError(e)))?; @@ -173,6 +174,7 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> { let job_handler = factory::get_job_handler(&job.job_type).await; let external_id = job_handler.process_job(config.as_ref(), &mut job).await?; + let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; // Fetching the job again because update status above will update the job version @@ -239,8 +241,6 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { ); add_job_to_process_queue(job.id).await.map_err(|e| JobError::Other(OtherError(e)))?; return Ok(()); - } else { - // TODO: send alert } } JobVerificationStatus::Pending => { @@ -248,7 +248,6 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { let verify_attempts = get_u64_from_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY) .map_err(|e| JobError::Other(OtherError(e)))?; if verify_attempts >= job_handler.max_verification_attempts() { - // TODO: send alert log::info!("Verification attempts exceeded for job {}. Marking as timed out.", job.id); config .database() diff --git a/crates/orchestrator/src/lib.rs b/crates/orchestrator/src/lib.rs index 3d02378b..4212f381 100644 --- a/crates/orchestrator/src/lib.rs +++ b/crates/orchestrator/src/lib.rs @@ -1,3 +1,5 @@ +/// Contains the trait implementations for alerts +pub mod alerts; /// Config of the service. Contains configurations for DB, Queues and other services. pub mod config; pub mod constants; diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index d6769aea..44d492da 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -163,6 +163,13 @@ where } Err(e) => { log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e); + config() + .await + .alerts() + .send_alert_message(e.to_string()) + .await + .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; + match message.nack().await { Ok(_) => Err(ConsumptionError::FailedToHandleJob { job_id: job_message.id, @@ -201,6 +208,13 @@ where } Err(e) => { log::error!("Failed to handle worker trigger {:?}. Error: {:?}", job_message.worker, e); + config() + .await + .alerts() + .send_alert_message(e.to_string()) + .await + .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; + message.nack().await.map_err(|(e, _)| ConsumptionError::Other(OtherError::from(e.to_string())))?; Err(ConsumptionError::FailedToSpawnWorker { worker_trigger_type: job_message.worker, diff --git a/crates/orchestrator/src/tests/alerts/mod.rs b/crates/orchestrator/src/tests/alerts/mod.rs new file mode 100644 index 00000000..2026061b --- /dev/null +++ b/crates/orchestrator/src/tests/alerts/mod.rs @@ -0,0 +1,61 @@ +use crate::config::config; +use crate::tests::common::{get_sns_client, get_sqs_client}; +use crate::tests::config::TestConfigBuilder; +use aws_sdk_sqs::types::QueueAttributeName::QueueArn; +use rstest::rstest; +use std::time::Duration; +use tokio::time::sleep; +use utils::env_utils::get_env_var_or_panic; + +pub const SNS_ALERT_TEST_QUEUE: &str = "orchestrator_sns_alert_testing_queue"; + +#[rstest] +#[tokio::test] +async fn sns_alert_subscribe_to_topic_receive_alert_works() { + TestConfigBuilder::new().build().await; + + let sqs_client = get_sqs_client().await; + let queue = sqs_client.create_queue().queue_name(SNS_ALERT_TEST_QUEUE).send().await.unwrap(); + let queue_url = queue.queue_url().unwrap(); + + let sns_client = get_sns_client().await; + let config = config().await; + + let queue_attributes = + sqs_client.get_queue_attributes().queue_url(queue_url).attribute_names(QueueArn).send().await.unwrap(); + + let queue_arn = queue_attributes.attributes().unwrap().get(&QueueArn).unwrap(); + + // subscribing the queue with the alerts + sns_client + .subscribe() + .topic_arn(get_env_var_or_panic("AWS_SNS_ARN").as_str()) + .protocol("sqs") + .endpoint(queue_arn) + .send() + .await + .unwrap(); + + let message_to_send = "Hello World :)"; + + // Getting sns client from the module + let alerts_client = config.alerts(); + // Sending the alert message + alerts_client.send_alert_message(message_to_send.to_string()).await.unwrap(); + + sleep(Duration::from_secs(5)).await; + + // Checking the queue for message + let receive_message_result = sqs_client + .receive_message() + .queue_url(queue_url) + .max_number_of_messages(1) + .send() + .await + .unwrap() + .messages + .unwrap(); + + assert_eq!(receive_message_result.len(), 1, "Alert message length assertion failed"); + assert!(receive_message_result[0].body.clone().unwrap().contains(message_to_send)); +} diff --git a/crates/orchestrator/src/tests/common/mod.rs b/crates/orchestrator/src/tests/common/mod.rs index ee29ac34..83cf7e68 100644 --- a/crates/orchestrator/src/tests/common/mod.rs +++ b/crates/orchestrator/src/tests/common/mod.rs @@ -4,6 +4,8 @@ use std::collections::HashMap; use ::uuid::Uuid; use aws_config::Region; +use aws_sdk_sns::error::SdkError; +use aws_sdk_sns::operation::create_topic::CreateTopicError; use mongodb::Client; use rstest::*; use serde::Deserialize; @@ -41,6 +43,18 @@ pub fn custom_job_item(default_job_item: JobItem, #[default(String::from("0"))] job_item } +pub async fn create_sns_arn() -> Result<(), SdkError> { + let sns_client = get_sns_client().await; + sns_client.create_topic().name(get_env_var_or_panic("AWS_SNS_ARN_NAME")).send().await?; + Ok(()) +} + +pub async fn get_sns_client() -> aws_sdk_sns::client::Client { + let sns_region = get_env_var_or_panic("AWS_SNS_REGION"); + let config = aws_config::from_env().region(Region::new(sns_region)).load().await; + aws_sdk_sns::Client::new(&config) +} + pub async fn drop_database() -> color_eyre::Result<()> { let db_client: Client = MongoDb::new(MongoDbConfig::new_from_env()).await.client(); // dropping all the collection. @@ -72,7 +86,7 @@ pub async fn create_sqs_queues() -> color_eyre::Result<()> { Ok(()) } -async fn get_sqs_client() -> aws_sdk_sqs::Client { +pub async fn get_sqs_client() -> aws_sdk_sqs::Client { // This function is for localstack. So we can hardcode the region for this as of now. let region_provider = Region::new("us-east-1"); let config = aws_config::from_env().region(region_provider).load().await; diff --git a/crates/orchestrator/src/tests/config.rs b/crates/orchestrator/src/tests/config.rs index 21510375..8424d6d7 100644 --- a/crates/orchestrator/src/tests/config.rs +++ b/crates/orchestrator/src/tests/config.rs @@ -1,10 +1,13 @@ use std::sync::Arc; -use crate::config::{build_da_client, build_prover_service, build_settlement_client, config_force_init, Config}; +use crate::config::{ + build_alert_client, build_da_client, build_prover_service, build_settlement_client, config_force_init, Config, +}; use crate::data_storage::DataStorage; use da_client_interface::DaClient; use httpmock::MockServer; +use crate::alerts::Alerts; use prover_client_interface::ProverClient; use settlement_client_interface::SettlementClient; use starknet::providers::jsonrpc::HttpTransport; @@ -17,7 +20,7 @@ use crate::database::mongodb::MongoDb; use crate::database::{Database, DatabaseConfig}; use crate::queue::sqs::SqsQueue; use crate::queue::QueueProvider; -use crate::tests::common::{create_sqs_queues, drop_database, get_storage_client}; +use crate::tests::common::{create_sns_arn, create_sqs_queues, drop_database, get_storage_client}; // Inspiration : https://rust-unofficial.github.io/patterns/patterns/creational/builder.html // TestConfigBuilder allows to heavily customise the global configs based on the test's requirement. @@ -39,6 +42,8 @@ pub struct TestConfigBuilder { queue: Option>, /// Storage client storage: Option>, + /// Alerts client + alerts: Option>, } impl Default for TestConfigBuilder { @@ -58,6 +63,7 @@ impl TestConfigBuilder { database: None, queue: None, storage: None, + alerts: None, } } @@ -96,6 +102,11 @@ impl TestConfigBuilder { self } + pub fn mock_alerts(mut self, alerts: Box) -> TestConfigBuilder { + self.alerts = Some(alerts); + self + } + pub async fn build(mut self) -> MockServer { dotenvy::from_filename("../.env.test").expect("Failed to load the .env file"); @@ -132,10 +143,16 @@ impl TestConfigBuilder { } } + if self.alerts.is_none() { + self.alerts = Some(build_alert_client().await); + } + // Deleting and Creating the queues in sqs. create_sqs_queues().await.expect("Not able to delete and create the queues."); // Deleting the database drop_database().await.expect("Unable to drop the database."); + // Creating the SNS ARN + create_sns_arn().await.expect("Unable to create the sns arn"); let config = Config::new( self.starknet_client.unwrap_or_else(|| { @@ -150,6 +167,7 @@ impl TestConfigBuilder { self.database.unwrap(), self.queue.unwrap_or_else(|| Box::new(SqsQueue {})), self.storage.unwrap(), + self.alerts.unwrap(), ); drop_database().await.unwrap(); diff --git a/crates/orchestrator/src/tests/mod.rs b/crates/orchestrator/src/tests/mod.rs index 1dbc21a2..4f264304 100644 --- a/crates/orchestrator/src/tests/mod.rs +++ b/crates/orchestrator/src/tests/mod.rs @@ -7,6 +7,7 @@ pub mod server; pub mod queue; +pub mod alerts; pub mod common; mod data_storage; pub mod workers;