From b1ef61d651ff9463a6dd20581a369df8ee9d3791 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Wed, 31 Jul 2024 17:25:41 +0530 Subject: [PATCH 01/27] update: added thiserror JobError --- Cargo.lock | 1 + crates/orchestrator/Cargo.toml | 1 + crates/orchestrator/src/jobs/mod.rs | 82 +++++++++++++++++++++-------- 3 files changed, 62 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6dc64ecd..1295d5bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6315,6 +6315,7 @@ name = "orchestrator" version = "0.1.0" dependencies = [ "alloy 0.1.2", + "anyhow", "arc-swap", "assert_matches", "async-std", diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index b6566e8a..d9430af2 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -16,6 +16,7 @@ alloy = { workspace = true } arc-swap = { workspace = true } assert_matches = "1.5.0" async-std = "1.12.0" +anyhow = "1.0" async-trait = { workspace = true } aws-config = { version = "1.1.7", features = ["behavior-version-latest"] } aws-sdk-s3 = { version = "1.38.0", features = ["behavior-version-latest"] } diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 3a90d2fa..c9c5aa91 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::num::ParseIntError; use std::time::Duration; use async_trait::async_trait; @@ -6,6 +7,7 @@ use color_eyre::eyre::eyre; use color_eyre::Result; use mockall::automock; use mockall_double::double; +use color_eyre::eyre::Context; use tracing::log; use uuid::Uuid; @@ -23,6 +25,8 @@ pub mod proving_job; pub mod register_proof_job; pub mod snos_job; pub mod state_update_job; +pub mod types; +use thiserror::Error; /// The Job trait is used to define the methods that a job /// should implement to be used as a job for the orchestrator. 
The orchestrator automatically @@ -36,15 +40,15 @@ pub trait Job: Send + Sync { config: &Config, internal_id: String, metadata: HashMap, - ) -> Result; + ) -> color_eyre::Result; /// Should process the job and return the external_id which can be used to /// track the status of the job. For example, a DA job will submit the state diff /// to the DA layer and return the txn hash. - async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result; + async fn process_job(&self, config: &Config, job: &mut JobItem) -> color_eyre::Result; /// Should verify the job and return the status of the verification. For example, /// a DA job will verify the inclusion of the state diff in the DA layer and return /// the status of the verification. - async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result; + async fn verify_job(&self, config: &Config, job: &mut JobItem) -> color_eyre::Result; /// Should return the maximum number of attempts to process the job. A new attempt is made /// every time the verification returns `JobVerificationStatus::Rejected` fn max_process_attempts(&self) -> u64; @@ -55,19 +59,18 @@ pub trait Job: Send + Sync { fn verification_polling_delay_seconds(&self) -> u64; } -pub mod types; /// Creates the job in the DB in the created state and adds it to the process queue -pub async fn create_job(job_type: JobType, internal_id: String, metadata: HashMap) -> Result<()> { +pub async fn create_job( + job_type: JobType, + internal_id: String, + metadata: HashMap, +) -> Result<(), JobError> { let config = config().await; let existing_job = config.database().get_job_by_internal_id_and_type(internal_id.as_str(), &job_type).await?; if existing_job.is_some() { log::debug!("Job already exists for internal_id {:?} and job_type {:?}. Skipping.", internal_id, job_type); - return Err(eyre!( - "Job already exists for internal_id {:?} and job_type {:?}. 
Skipping.", - internal_id, - job_type - )); + return Err(JobError::JobAlreadyExists { internal_id, job_type }); } let job_handler = factory::get_job_handler(&job_type).await; @@ -80,7 +83,7 @@ pub async fn create_job(job_type: JobType, internal_id: String, metadata: HashMa /// Processes the job, increments the process attempt count and updates the status of the job in the /// DB. It then adds the job to the verification queue. -pub async fn process_job(id: Uuid) -> Result<()> { +pub async fn process_job(id: Uuid) -> Result<(), JobError> { let config = config().await; let mut job = get_job(id).await?; @@ -92,7 +95,7 @@ pub async fn process_job(id: Uuid) -> Result<()> { } _ => { log::error!("Invalid status {:?} for job with id {:?}. Cannot process.", id, job.status); - return Err(eyre!("Invalid status {:?} for job with id {:?}. Cannot process.", id, job.status)); + return Err(JobError::InvalidStatus { id, job_status: job.status }); } } // this updates the version of the job. this ensures that if another thread was about to process @@ -118,9 +121,9 @@ pub async fn process_job(id: Uuid) -> Result<()> { /// Verifies the job and updates the status of the job in the DB. If the verification fails, it /// retries processing the job if the max attempts have not been exceeded. If the max attempts have -/// been exceeded, it marks the job as timedout. If the verification is still pending, it pushes the +/// been exceeded, it marks the job as timed out. If the verification is still pending, it pushes the /// job back to the queue. -pub async fn verify_job(id: Uuid) -> Result<()> { +pub async fn verify_job(id: Uuid) -> Result<(), JobError> { let config = config().await; let mut job = get_job(id).await?; @@ -130,7 +133,7 @@ pub async fn verify_job(id: Uuid) -> Result<()> { } _ => { log::error!("Invalid status {:?} for job with id {:?}. Cannot verify.", id, job.status); - return Err(eyre!("Invalid status {:?} for job with id {:?}. 
Cannot verify.", id, job.status)); + return Err(JobError::InvalidStatus { id, job_status: job.status }); } } @@ -186,38 +189,73 @@ pub async fn verify_job(id: Uuid) -> Result<()> { Ok(()) } -async fn get_job(id: Uuid) -> Result { +fn get_job_handler(job_type: &JobType) -> Box { + match job_type { + JobType::DataSubmission => Box::new(da_job::DaJob), + JobType::SnosRun => Box::new(snos_job::SnosJob), + JobType::ProofCreation => Box::new(proving_job::ProvingJob), + JobType::StateTransition => Box::new(state_update_job::StateUpdateJob), + _ => unimplemented!("Job type not implemented yet."), + } +} + +async fn get_job(id: Uuid) -> Result { let config = config().await; let job = config.database().get_job_by_id(id).await?; match job { Some(job) => Ok(job), None => { log::error!("Failed to find job with id {:?}", id); - Err(eyre!("Failed to process job with id {:?}", id)) + Err(JobError::JobNotFound { id }) } } } -pub fn increment_key_in_metadata(metadata: &HashMap, key: &str) -> Result> { +fn increment_key_in_metadata( + metadata: &HashMap, + key: &str, +) -> Result, JobError> { let mut new_metadata = metadata.clone(); let attempt = get_u64_from_metadata(metadata, key)?; let incremented_value = attempt.checked_add(1); if incremented_value.is_none() { - return Err(eyre!("Incrementing key {} in metadata would exceed u64::MAX", key)); + return Err(JobError::Arithmetic(format!("Incrementing key {} in metadata would exceed u64::MAX", key))); } new_metadata.insert(key.to_string(), incremented_value.unwrap().to_string()); Ok(new_metadata) } -fn get_u64_from_metadata(metadata: &HashMap, key: &str) -> Result { - Ok(metadata.get(key).unwrap_or(&"0".to_string()).parse::()?) 
+fn get_u64_from_metadata(metadata: &HashMap, key: &str) -> color_eyre::Result { + metadata + .get(key) + .unwrap_or(&"0".to_string()) + .parse::() + .wrap_err_with(|| format!("Failed to parse u64 from metadata key '{}'", key)) +} + +#[derive(Error, Debug)] +pub enum JobError { + #[error("Job already exists for internal_id {internal_id:?} and job_type {job_type:?}. Skipping!")] + JobAlreadyExists { internal_id: String, job_type: JobType }, + + #[error("Invalid status {job_status:?} for job with id {id:?}. Cannot process.")] + InvalidStatus { id: Uuid, job_status: JobStatus }, + + #[error("Failed to find job with id {id:?}")] + JobNotFound { id: Uuid }, + + #[error("Arithmetic error: {0}")] + Arithmetic(String), + + #[error("Other error: {0}")] + Other(#[from] color_eyre::eyre::Error), } #[cfg(test)] mod tests { use super::*; - mod test_incremement_key_in_metadata { + mod test_increment_key_in_metadata { use super::*; #[test] From 11d3ad972d78a99760b48c5144fff3f030016849 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Wed, 31 Jul 2024 19:19:32 +0530 Subject: [PATCH 02/27] update: introducing DaError --- crates/orchestrator/src/jobs/da_job/mod.rs | 57 ++++++++++++---------- crates/orchestrator/src/jobs/mod.rs | 13 ++--- 2 files changed, 38 insertions(+), 32 deletions(-) diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs index 5e49d75c..0b9d2793 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -1,22 +1,21 @@ use std::collections::HashMap; use std::ops::{Add, Mul, Rem}; -use std::result::Result::{Err, Ok as OtherOk}; use std::str::FromStr; use async_trait::async_trait; -use color_eyre::eyre::{eyre, Ok}; -use color_eyre::Result; +use color_eyre::eyre::{eyre, WrapErr}; use lazy_static::lazy_static; use num_bigint::{BigUint, ToBigUint}; use num_traits::{Num, Zero}; -// use starknet::core::types::{BlockId, FieldElement, MaybePendingStateUpdate, StateUpdate,
StorageEntry}; use starknet::providers::Provider; +use std::result::Result::Ok as StdOk; +use thiserror::Error; use tracing::log; use uuid::Uuid; use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; -use super::Job; +use super::{Job, JobError}; use crate::config::Config; use crate::constants::BLOB_DATA_FILE_NAME; @@ -49,7 +48,7 @@ impl Job for DaJob { _config: &Config, internal_id: String, metadata: HashMap, - ) -> Result { + ) -> Result { Ok(JobItem { id: Uuid::new_v4(), internal_id, @@ -61,19 +60,19 @@ impl Job for DaJob { }) } - async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { - let block_no = job.internal_id.parse::()?; + async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { + let block_no = job.internal_id.parse::().wrap_err_with(|| format!("Failed to parse u64"))?; - let state_update = config.starknet_client().get_state_update(BlockId::Number(block_no)).await?; + let state_update = config + .starknet_client() + .get_state_update(BlockId::Number(block_no)) + .await + .wrap_err_with(|| format!("Failed to get state Update."))?; let state_update = match state_update { MaybePendingStateUpdate::PendingUpdate(_) => { log::error!("Cannot process block {} for job id {} as it's still in pending state", block_no, job.id); - return Err(eyre!( - "Cannot process block {} for job id {} as it's still in pending state", - block_no, - job.id - )); + Err(DaError::BlockPending { block_no, job_id: job.id })? 
} MaybePendingStateUpdate::Update(state_update) => state_update, }; @@ -96,13 +95,7 @@ impl Job for DaJob { // there is a limit on number of blobs per txn, checking that here if current_blob_length > max_blob_per_txn { - return Err(eyre!( - "Exceeded the maximum number of blobs per transaction: allowed {}, found {} for block {} and job id {}", - max_blob_per_txn, - current_blob_length, - block_no, - job.id - )); + Err(DaError::MaxBlobsLimitExceeded { max_blob_per_txn, current_blob_length, block_no, job_id: job.id })? } // making the txn to the DA layer @@ -111,7 +104,7 @@ impl Job for DaJob { Ok(external_id) } - async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result { + async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result { Ok(config.da_client().verify_inclusion(job.external_id.unwrap_string()?).await?.into()) } @@ -169,7 +162,7 @@ pub fn convert_to_biguint(elements: Vec) -> Vec { biguint_vec } -fn data_to_blobs(blob_size: u64, block_data: Vec) -> Result>> { +fn data_to_blobs(blob_size: u64, block_data: Vec) -> color_eyre::Result>> { // Validate blob size if blob_size < 32 { return Err(eyre!( @@ -205,7 +198,7 @@ pub async fn state_update_to_blob_data( block_no: u64, state_update: StateUpdate, config: &Config, -) -> Result> { +) -> color_eyre::Result> { let state_diff = state_update.state_diff; let mut blob_data: Vec = vec![ FieldElement::from(state_diff.storage_diffs.len()), @@ -241,7 +234,7 @@ pub async fn state_update_to_blob_data( let get_current_nonce_result = config.starknet_client().get_nonce(BlockId::Number(block_no), addr).await; nonce = match get_current_nonce_result { - OtherOk(get_current_nonce) => Some(get_current_nonce), + StdOk(get_current_nonce) => Some(get_current_nonce), Err(e) => { log::error!("Failed to get nonce: {}", e); return Err(eyre!("Failed to get nonce: {}", e)); @@ -284,7 +277,7 @@ pub async fn state_update_to_blob_data( } /// To store the blob data using the storage client with path /blob_data.txt 
-async fn store_blob_data(blob_data: Vec, block_number: u64, config: &Config) -> Result<()> { +async fn store_blob_data(blob_data: Vec, block_number: u64, config: &Config) -> color_eyre::Result<()> { let storage_client = config.storage(); let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME; let data_blob_big_uint = convert_to_biguint(blob_data.clone()); @@ -549,3 +542,15 @@ pub mod test { } } } + +#[derive(Error, Debug)] +pub enum DaError { + #[error("Cannot process block {block_no:?} for job id {job_id:?} as it's still in pending state.")] + BlockPending { block_no: u64, job_id: Uuid }, + + #[error("Exceeded the maximum number of blobs per transaction: allowed {max_blob_per_txn:?}, found {current_blob_length:?} for block {block_no:?} and job id {job_id:?}")] + MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: u64, job_id: Uuid }, + + #[error("Other error: {0}")] + Other(#[from] color_eyre::eyre::Error), +} diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index c9c5aa91..d66991dc 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -1,10 +1,7 @@ use std::collections::HashMap; -use std::num::ParseIntError; use std::time::Duration; use async_trait::async_trait; -use color_eyre::eyre::eyre; -use color_eyre::Result; use mockall::automock; use mockall_double::double; use color_eyre::eyre::Context; @@ -17,6 +14,7 @@ use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ use crate::jobs::job_handler_factory::factory; use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::queue::job_queue::{add_job_to_process_queue, add_job_to_verification_queue}; +use da_job::DaError; pub mod constants; pub mod da_job; @@ -40,15 +38,15 @@ pub trait Job: Send + Sync { config: &Config, internal_id: String, metadata: HashMap, - ) -> color_eyre::Result; + ) -> Result; /// Should process the job and return the 
external_id which can be used to /// track the status of the job. For example, a DA job will submit the state diff /// to the DA layer and return the txn hash. - async fn process_job(&self, config: &Config, job: &mut JobItem) -> color_eyre::Result; + async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result; /// Should verify the job and return the status of the verification. For example, /// a DA job will verify the inclusion of the state diff in the DA layer and return /// the status of the verification. - async fn verify_job(&self, config: &Config, job: &mut JobItem) -> color_eyre::Result; + async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result; /// Should return the maximum number of attempts to process the job. A new attempt is made /// every time the verification returns `JobVerificationStatus::Rejected` fn max_process_attempts(&self) -> u64; @@ -247,6 +245,9 @@ pub enum JobError { #[error("Arithmetic error: {0}")] Arithmetic(String), + #[error("DA Error: {0}")] + DaJobError(#[from] DaError), + #[error("Other error: {0}")] Other(#[from] color_eyre::eyre::Error), } From cddd2a8d59f8843bebb62fa418f4c36c4e5e6a2c Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 1 Aug 2024 14:53:30 +0530 Subject: [PATCH 03/27] update: add ProverJobError --- crates/orchestrator/src/jobs/mod.rs | 6 ++- .../orchestrator/src/jobs/proving_job/mod.rs | 51 ++++++++++++++----- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index d66991dc..0b6e77e1 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -5,6 +5,8 @@ use async_trait::async_trait; use mockall::automock; use mockall_double::double; use color_eyre::eyre::Context; +use da_job::DaError; +use proving_job::ProvingError; use tracing::log; use uuid::Uuid; @@ -14,7 +16,6 @@ use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ use 
crate::jobs::job_handler_factory::factory; use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::queue::job_queue::{add_job_to_process_queue, add_job_to_verification_queue}; -use da_job::DaError; pub mod constants; pub mod da_job; @@ -248,6 +249,9 @@ pub enum JobError { #[error("DA Error: {0}")] DaJobError(#[from] DaError), + #[error("Proving Error: {0}")] + ProvingJobError(#[from] ProvingError), + #[error("Other error: {0}")] Other(#[from] color_eyre::eyre::Error), } diff --git a/crates/orchestrator/src/jobs/proving_job/mod.rs b/crates/orchestrator/src/jobs/proving_job/mod.rs index 3427f878..6b87b731 100644 --- a/crates/orchestrator/src/jobs/proving_job/mod.rs +++ b/crates/orchestrator/src/jobs/proving_job/mod.rs @@ -4,16 +4,16 @@ use std::str::FromStr; use async_trait::async_trait; use cairo_vm::vm::runners::cairo_pie::CairoPie; -use color_eyre::eyre::eyre; -use color_eyre::Result; +use color_eyre::eyre::WrapErr; use prover_client_interface::{Task, TaskStatus}; +use thiserror::Error; use tracing::log::log; use tracing::log::Level::Error; use uuid::Uuid; use super::constants::JOB_METADATA_CAIRO_PIE_PATH_KEY; use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; -use super::Job; +use super::{Job, JobError}; use crate::config::Config; pub struct ProvingJob; @@ -25,9 +25,10 @@ impl Job for ProvingJob { _config: &Config, internal_id: String, metadata: HashMap, - ) -> Result { + ) -> Result { if !metadata.contains_key(JOB_METADATA_CAIRO_PIE_PATH_KEY) { - return Err(eyre!("Cairo PIE path is not specified (prover job #{})", internal_id)); + // TODO: validate the usage of `.clone()` here, ensure lightweight borrowing of variables + Err(ProvingError::CairoPIEWrongPath { internal_id: internal_id.clone() })? 
} Ok(JobItem { id: Uuid::new_v4(), @@ -40,22 +41,31 @@ impl Job for ProvingJob { }) } - async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { + async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { // TODO: allow to download PIE from storage - let cairo_pie_path: PathBuf = job + let cairo_pie_path = job .metadata .get(JOB_METADATA_CAIRO_PIE_PATH_KEY) .map(|s| PathBuf::from_str(s)) - .ok_or_else(|| eyre!("Cairo PIE path is not specified (prover job #{})", job.internal_id))??; - let cairo_pie = CairoPie::read_zip_file(&cairo_pie_path) - .expect("Not able to read the cairo PIE file from the zip file provided."); - let external_id = config.prover_client().submit_task(Task::CairoPie(cairo_pie)).await?; + .ok_or_else(|| ProvingError::CairoPIEWrongPath { internal_id: job.internal_id.clone() })? + .map_err(|_| ProvingError::CairoPIENotReadable)?; + + let cairo_pie = CairoPie::read_zip_file(&cairo_pie_path).map_err(|_| ProvingError::CairoPIENotReadable)?; + + let external_id = config + .prover_client() + .submit_task(Task::CairoPie(cairo_pie)) + .await + .wrap_err_with(|| format!("Prover Client Error"))?; Ok(external_id) } - async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result { + async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result { let task_id: String = job.external_id.unwrap_string()?.into(); - match config.prover_client().get_task_status(&task_id).await? 
{ + let task_status = + config.prover_client().get_task_status(&task_id).await.wrap_err_with(|| format!("Prover Client Error"))?; + + match task_status { TaskStatus::Processing => Ok(JobVerificationStatus::Pending), TaskStatus::Succeeded => Ok(JobVerificationStatus::Verified), TaskStatus::Failed(err) => { @@ -80,3 +90,18 @@ impl Job for ProvingJob { 60 } } + +#[derive(Error, Debug)] +pub enum ProvingError { + #[error("Cairo PIE path is not specified - prover job #{internal_id:?}")] + CairoPIEWrongPath { internal_id: String }, + + #[error("Not able to read the cairo PIE file from the zip file provided.")] + CairoPIENotReadable, + + #[error("Exceeded the maximum number of blobs per transaction: allowed {max_blob_per_txn:?}, found {current_blob_length:?} for block {block_no:?} and job id {job_id:?}")] + MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: u64, job_id: Uuid }, + + #[error("Other error: {0}")] + Other(#[from] color_eyre::eyre::Error), +} From 64b1559c318ee97bb419911c4aec12d35b46d755 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 1 Aug 2024 14:58:24 +0530 Subject: [PATCH 04/27] update: Register proof JobError --- crates/orchestrator/src/jobs/register_proof_job/mod.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/orchestrator/src/jobs/register_proof_job/mod.rs b/crates/orchestrator/src/jobs/register_proof_job/mod.rs index 3105e8a3..e9a076dc 100644 --- a/crates/orchestrator/src/jobs/register_proof_job/mod.rs +++ b/crates/orchestrator/src/jobs/register_proof_job/mod.rs @@ -8,6 +8,8 @@ use crate::config::Config; use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::jobs::Job; +use super::JobError; + pub struct RegisterProofJob; #[async_trait] @@ -17,7 +19,7 @@ impl Job for RegisterProofJob { _config: &Config, internal_id: String, metadata: HashMap, - ) -> Result { + ) -> Result { Ok(JobItem { id: Uuid::new_v4(), internal_id, @@ -31,14 +33,14 @@ impl 
Job for RegisterProofJob { }) } - async fn process_job(&self, _config: &Config, _job: &mut JobItem) -> Result { + async fn process_job(&self, _config: &Config, _job: &mut JobItem) -> Result { // Get proof from storage and submit on chain for verification // We need to implement a generic trait for this to support multiple // base layers todo!() } - async fn verify_job(&self, _config: &Config, _job: &mut JobItem) -> Result { + async fn verify_job(&self, _config: &Config, _job: &mut JobItem) -> Result { // verify that the proof transaction has been included on chain todo!() } From 1058d5ede8113e6bdcc56314f35b06891dfc2a66 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 1 Aug 2024 15:03:30 +0530 Subject: [PATCH 05/27] update: SNOS proof JobError --- crates/orchestrator/src/jobs/snos_job/mod.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/orchestrator/src/jobs/snos_job/mod.rs b/crates/orchestrator/src/jobs/snos_job/mod.rs index be6cfeee..ab048699 100644 --- a/crates/orchestrator/src/jobs/snos_job/mod.rs +++ b/crates/orchestrator/src/jobs/snos_job/mod.rs @@ -8,6 +8,8 @@ use crate::config::Config; use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::jobs::Job; +use super::JobError; + pub struct SnosJob; #[async_trait] @@ -17,7 +19,7 @@ impl Job for SnosJob { _config: &Config, internal_id: String, metadata: HashMap, - ) -> Result { + ) -> Result { Ok(JobItem { id: Uuid::new_v4(), internal_id, @@ -29,14 +31,14 @@ impl Job for SnosJob { }) } - async fn process_job(&self, _config: &Config, _job: &mut JobItem) -> Result { + async fn process_job(&self, _config: &Config, _job: &mut JobItem) -> Result { // 1. Fetch SNOS input data from Madara // 2. Import SNOS in Rust and execute it with the input data // 3. 
Store the received PIE in DB todo!() } - async fn verify_job(&self, _config: &Config, _job: &mut JobItem) -> Result { + async fn verify_job(&self, _config: &Config, _job: &mut JobItem) -> Result { // No need for verification as of now. If we later on decide to outsource SNOS run // to another servicehow a, verify_job can be used to poll on the status of the job todo!() From afb45a190331f300632da289fd425205818ac2cc Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 1 Aug 2024 17:17:44 +0530 Subject: [PATCH 06/27] update: JobError on Queue --- crates/orchestrator/src/jobs/mod.rs | 4 ++ .../src/jobs/state_update_job/mod.rs | 60 +++++++++++++++---- crates/orchestrator/src/queue/job_queue.rs | 16 ++--- crates/orchestrator/src/queue/mod.rs | 8 ++- 4 files changed, 65 insertions(+), 23 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 0b6e77e1..4fbe4dec 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -7,6 +7,7 @@ use mockall_double::double; use color_eyre::eyre::Context; use da_job::DaError; use proving_job::ProvingError; +use state_update_job::StateUpdateError; use tracing::log; use uuid::Uuid; @@ -252,6 +253,9 @@ pub enum JobError { #[error("Proving Error: {0}")] ProvingJobError(#[from] ProvingError), + #[error("State Update Error: {0}")] + StateUpdateJobError(#[from] StateUpdateError), + #[error("Other error: {0}")] Other(#[from] color_eyre::eyre::Error), } diff --git a/crates/orchestrator/src/jobs/state_update_job/mod.rs b/crates/orchestrator/src/jobs/state_update_job/mod.rs index d60c86d1..59b25200 100644 --- a/crates/orchestrator/src/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/jobs/state_update_job/mod.rs @@ -8,6 +8,7 @@ use cairo_vm::Felt252; use color_eyre::eyre::eyre; use color_eyre::Result; use snos::io::output::StarknetOsOutput; +use thiserror::Error; use uuid::Uuid; use settlement_client_interface::SettlementVerificationStatus; @@ -16,6 +17,7 @@
use super::constants::{ JOB_METADATA_STATE_UPDATE_ATTEMPT_PREFIX, JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO, JOB_PROCESS_ATTEMPT_METADATA_KEY, }; +use super::JobError; use crate::config::{config, Config}; use crate::constants::SNOS_OUTPUT_FILE_NAME; @@ -32,7 +34,7 @@ impl Job for StateUpdateJob { _config: &Config, internal_id: String, metadata: HashMap, - ) -> Result { + ) -> Result { Ok(JobItem { id: Uuid::new_v4(), internal_id, @@ -46,9 +48,11 @@ impl Job for StateUpdateJob { }) } - async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { - let attempt_no = - job.metadata.get(JOB_PROCESS_ATTEMPT_METADATA_KEY).expect("Could not find current attempt number.").clone(); + async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { + let attempt_no = job + .metadata + .get(JOB_PROCESS_ATTEMPT_METADATA_KEY) + .ok_or_else(|| StateUpdateError::AttemptNumberNotFound)?; // Read the metadata to get the blocks for which state update will be performed. // We assume that blocks nbrs are formatted as follow: "2,3,4,5,6". 
@@ -57,17 +61,21 @@ impl Job for StateUpdateJob { // If we had a block state update failing last run, we recover from this block if let Some(last_failed_block) = job.metadata.get(JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO) { - let last_failed_block: u64 = - last_failed_block.parse().expect("last_failed_block should be a positive number"); + let last_failed_block = + last_failed_block.parse().map_err(|_| StateUpdateError::LastFailedBlockNonPositive)?; + block_numbers = block_numbers.into_iter().filter(|&block| block >= last_failed_block).collect::>(); } let mut sent_tx_hashes: Vec = Vec::with_capacity(block_numbers.len()); for block_no in block_numbers.iter() { let snos = self.fetch_snos_for_block(*block_no).await; + let tx_hash = self.update_state_for_block(config, *block_no, snos).await.map_err(|e| { job.metadata.insert(JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO.into(), block_no.to_string()); + self.insert_attempts_into_metadata(job, &attempt_no, &sent_tx_hashes); + eyre!("Block #{block_no} - Error occured during the state update: {e}") })?; sent_tx_hashes.push(tx_hash); @@ -83,13 +91,16 @@ impl Job for StateUpdateJob { /// Status will be verified if: /// 1. the last settlement tx hash is successful, /// 2. the expected last settled block from our configuration is indeed the one found in the provider. 
- async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result { - let attempt_no = - job.metadata.get(JOB_PROCESS_ATTEMPT_METADATA_KEY).expect("Could not find current attempt number.").clone(); + async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result { + let attempt_no = job + .metadata + .get(JOB_PROCESS_ATTEMPT_METADATA_KEY) + .ok_or_else(|| StateUpdateError::AttemptNumberNotFound)?; + let metadata_tx_hashes = job .metadata .get(&format!("{}{}", JOB_METADATA_STATE_UPDATE_ATTEMPT_PREFIX, attempt_no)) - .expect("Could not find tx hashes metadata for the current attempt") + .ok_or_else(|| StateUpdateError::TxnHashMetadataNotFound)? .clone() .replace(' ', ""); @@ -115,7 +126,7 @@ impl Job for StateUpdateJob { return Ok(new_status.into()); } SettlementVerificationStatus::Pending => { - return Err(eyre!("Tx {tx_hash} should not be pending.")) + Err(StateUpdateError::TxnShouldNotBePending { tx_hash: tx_hash.to_string() })? } SettlementVerificationStatus::Verified => {} } @@ -124,7 +135,8 @@ impl Job for StateUpdateJob { } } // verify that the last settled block is indeed the one we expect to be - let expected_last_block_number = block_numbers.last().expect("Block numbers list should not be empty."); + let expected_last_block_number = block_numbers.last().ok_or_else(|| StateUpdateError::EmptyBlockNumberList)?; + let out_last_block_number = settlement_client.get_last_settled_block().await?; let block_status = if out_last_block_number == *expected_last_block_number { SettlementVerificationStatus::Verified @@ -222,3 +234,27 @@ impl StateUpdateJob { job.metadata.insert(new_attempt_metadata_key, tx_hashes.join(",")); } } + +#[derive(Error, Debug)] +pub enum StateUpdateError { + #[error("Block numbers list should not be empty.")] + EmptyBlockNumberList, + + #[error("Could not find current attempt number.")] + AttemptNumberNotFound, + + #[error("last_failed_block should be a positive number")] + LastFailedBlockNonPositive, + + #[error("Could not 
find tx hashes metadata for the current attempt")] + TxnHashMetadataNotFound, + + #[error("Tx {tx_hash:?} should not be pending.")] + TxnShouldNotBePending { tx_hash: String }, + + #[error("Exceeded the maximum number of blobs per transaction: allowed {max_blob_per_txn:?}, found {current_blob_length:?} for block {block_no:?} and job id {job_id:?}")] + MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: u64, job_id: Uuid }, + + #[error("Other error: {0}")] + Other(#[from] color_eyre::eyre::Error), +} diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index aebba4bf..c7204feb 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -2,7 +2,7 @@ use std::future::Future; use std::time::Duration; use color_eyre::eyre::eyre; -use color_eyre::Result; +use color_eyre::Result as EyreResult; use omniqueue::QueueError; use serde::{Deserialize, Serialize}; use tokio::time::sleep; @@ -10,7 +10,7 @@ use tracing::log; use uuid::Uuid; use crate::config::config; -use crate::jobs::{process_job, verify_job}; +use crate::jobs::{process_job, verify_job, JobError}; pub const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; pub const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue"; @@ -20,20 +20,20 @@ pub struct JobQueueMessage { pub(crate) id: Uuid, } -pub async fn add_job_to_process_queue(id: Uuid) -> Result<()> { +pub async fn add_job_to_process_queue(id: Uuid) -> EyreResult<()> { log::info!("Adding job with id {:?} to processing queue", id); add_job_to_queue(id, JOB_PROCESSING_QUEUE.to_string(), None).await } -pub async fn add_job_to_verification_queue(id: Uuid, delay: Duration) -> Result<()> { +pub async fn add_job_to_verification_queue(id: Uuid, delay: Duration) -> EyreResult<()> { log::info!("Adding job with id {:?} to verification queue", id); add_job_to_queue(id, JOB_VERIFICATION_QUEUE.to_string(), 
Some(delay)).await } -pub async fn consume_job_from_queue(queue: String, handler: F) -> Result<()> +pub async fn consume_job_from_queue(queue: String, handler: F) -> EyreResult<()> where F: FnOnce(Uuid) -> Fut, - Fut: Future>, + Fut: Future>, { log::info!("Consuming from queue {:?}", queue); let config = config().await; @@ -68,7 +68,7 @@ where Ok(()) } -pub async fn init_consumers() -> Result<()> { +pub async fn init_consumers() -> Result<(), JobError> { // TODO: figure out a way to generalize this tokio::spawn(async move { loop { @@ -91,7 +91,7 @@ pub async fn init_consumers() -> Result<()> { Ok(()) } -async fn add_job_to_queue(id: Uuid, queue: String, delay: Option) -> Result<()> { +async fn add_job_to_queue(id: Uuid, queue: String, delay: Option) -> EyreResult<()> { let config = config().await; let message = JobQueueMessage { id }; config.queue().send_message_to_queue(queue, serde_json::to_string(&message)?, delay).await?; diff --git a/crates/orchestrator/src/queue/mod.rs b/crates/orchestrator/src/queue/mod.rs index 01829892..48599364 100644 --- a/crates/orchestrator/src/queue/mod.rs +++ b/crates/orchestrator/src/queue/mod.rs @@ -4,20 +4,22 @@ pub mod sqs; use std::time::Duration; use async_trait::async_trait; -use color_eyre::Result; +use color_eyre::Result as EyreResult; use mockall::automock; use omniqueue::{Delivery, QueueError}; +use crate::jobs::JobError; + /// The QueueProvider trait is used to define the methods that a queue /// should implement to be used as a queue for the orchestrator. The /// purpose of this trait is to allow developers to use any queue of their choice. 
#[automock] #[async_trait] pub trait QueueProvider: Send + Sync { - async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option) -> Result<()>; + async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option) -> EyreResult<()>; async fn consume_message_from_queue(&self, queue: String) -> std::result::Result; } -pub async fn init_consumers() -> Result<()> { +pub async fn init_consumers() -> Result<(), JobError> { job_queue::init_consumers().await } From 62cce1d046e201eedd6ec4a1b6f076eddb45154b Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 1 Aug 2024 19:09:49 +0530 Subject: [PATCH 07/27] update: added fix for borrow in state update process job --- .../src/jobs/state_update_job/mod.rs | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/crates/orchestrator/src/jobs/state_update_job/mod.rs b/crates/orchestrator/src/jobs/state_update_job/mod.rs index 59b25200..3ddf0caf 100644 --- a/crates/orchestrator/src/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/jobs/state_update_job/mod.rs @@ -6,7 +6,7 @@ use ::utils::collections::{has_dup, is_sorted}; use async_trait::async_trait; use cairo_vm::Felt252; use color_eyre::eyre::eyre; -use color_eyre::Result; +use color_eyre::Result as EyreResult; use snos::io::output::StarknetOsOutput; use thiserror::Error; use uuid::Uuid; @@ -49,7 +49,9 @@ impl Job for StateUpdateJob { } async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { - let attempt_no = job + let job_cloned = job.clone(); + + let attempt_no = job_cloned .metadata .get(JOB_PROCESS_ATTEMPT_METADATA_KEY) .ok_or_else(|| StateUpdateError::AttemptNumberNotFound)?; @@ -76,7 +78,7 @@ impl Job for StateUpdateJob { self.insert_attempts_into_metadata(job, &attempt_no, &sent_tx_hashes); - eyre!("Block #{block_no} - Error occured during the state update: {e}") + StateUpdateError::Other(eyre!("Block #{block_no} - Error occurred during the state update: {e}")) })?; 
sent_tx_hashes.push(tx_hash); } @@ -84,7 +86,9 @@ impl Job for StateUpdateJob { self.insert_attempts_into_metadata(job, &attempt_no, &sent_tx_hashes); // external_id returned corresponds to the last block number settled - Ok(block_numbers.last().expect("Last number in block_numbers array returned as None. Possible Error : Delay in job processing or Failed job execution.").to_string()) + let val = block_numbers.last().ok_or_else(|| StateUpdateError::LastNumberReturnedError)?; + + Ok(val.to_string()) } /// Returns the status of the passed job. @@ -164,7 +168,7 @@ impl Job for StateUpdateJob { impl StateUpdateJob { /// Read the metadata and parse the block numbers - fn get_block_numbers_from_metadata(&self, job: &JobItem) -> Result> { + fn get_block_numbers_from_metadata(&self, job: &JobItem) -> EyreResult> { let blocks_to_settle = job.metadata.get(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY).ok_or_else(|| { eyre!("Block numbers to settle must be specified (state update job #{})", job.internal_id) })?; @@ -172,7 +176,7 @@ impl StateUpdateJob { } /// Parse a list of blocks comma separated - fn parse_block_numbers(&self, blocks_to_settle: &str) -> Result> { + fn parse_block_numbers(&self, blocks_to_settle: &str) -> EyreResult> { let sanitized_blocks = blocks_to_settle.replace(' ', ""); let block_numbers: Vec = sanitized_blocks .split(',') @@ -183,7 +187,7 @@ impl StateUpdateJob { } /// Validate that the list of block numbers to process is valid. - async fn validate_block_numbers(&self, config: &Config, block_numbers: &[u64]) -> Result<()> { + async fn validate_block_numbers(&self, config: &Config, block_numbers: &[u64]) -> EyreResult<()> { if block_numbers.is_empty() { return Err(eyre!("No block numbers found.")); } @@ -202,7 +206,12 @@ impl StateUpdateJob { } /// Update the state for the corresponding block using the settlement layer. 
- async fn update_state_for_block(&self, config: &Config, block_no: u64, snos: StarknetOsOutput) -> Result { + async fn update_state_for_block( + &self, + config: &Config, + block_no: u64, + snos: StarknetOsOutput, + ) -> EyreResult { let settlement_client = config.settlement_client(); let last_tx_hash_executed = if snos.use_kzg_da == Felt252::ZERO { unimplemented!("update_state_for_block not implemented as of now for calldata DA.") @@ -255,6 +264,9 @@ pub enum StateUpdateError { #[error("Exceeded the maximum number of blobs per transaction: allowed {max_blob_per_txn:?}, found {current_blob_length:?} for block {block_no:?} and job id {job_id:?}")] MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: u64, job_id: Uuid }, + #[error("Last number in block_numbers array returned as None. Possible Error : Delay in job processing or Failed job execution.")] + LastNumberReturnedError, + #[error("Other error: {0}")] Other(#[from] color_eyre::eyre::Error), } From 53dc8c4c43fba2311f9ead75515f5d4124292635 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Fri, 2 Aug 2024 15:49:11 +0530 Subject: [PATCH 08/27] update: cleaning rework for JobError --- Cargo.lock | 1 - crates/orchestrator/Cargo.toml | 1 - crates/orchestrator/src/jobs/da_job/mod.rs | 59 +++++++++---------- crates/orchestrator/src/jobs/mod.rs | 14 ++--- .../orchestrator/src/jobs/proving_job/mod.rs | 7 +-- crates/orchestrator/src/jobs/snos_job/mod.rs | 2 +- .../src/jobs/state_update_job/mod.rs | 55 ++++++++++------- 7 files changed, 69 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1295d5bb..6dc64ecd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6315,7 +6315,6 @@ name = "orchestrator" version = "0.1.0" dependencies = [ "alloy 0.1.2", - "anyhow", "arc-swap", "assert_matches", "async-std", diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index d9430af2..b6566e8a 100644 --- a/crates/orchestrator/Cargo.toml +++ 
b/crates/orchestrator/Cargo.toml @@ -16,7 +16,6 @@ alloy = { workspace = true } arc-swap = { workspace = true } assert_matches = "1.5.0" async-std = "1.12.0" -anyhow = "1.0" async-trait = { workspace = true } aws-config = { version = "1.1.7", features = ["behavior-version-latest"] } aws-sdk-s3 = { version = "1.38.0", features = ["behavior-version-latest"] } diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs index 0b9d2793..f98eb6b9 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -3,13 +3,12 @@ use std::ops::{Add, Mul, Rem}; use std::str::FromStr; use async_trait::async_trait; -use color_eyre::eyre::{eyre, WrapErr}; +use color_eyre::eyre::WrapErr; use lazy_static::lazy_static; use num_bigint::{BigUint, ToBigUint}; use num_traits::{Num, Zero}; use starknet::core::types::{BlockId, FieldElement, MaybePendingStateUpdate, StateUpdate, StorageEntry}; use starknet::providers::Provider; -use std::result::Result::Ok as StdOk; use thiserror::Error; use tracing::log; use uuid::Uuid; @@ -61,19 +60,16 @@ impl Job for DaJob { } async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { - let block_no = job.internal_id.parse::().wrap_err_with(|| format!("Failed to parse u64"))?; + let block_no = job.internal_id.parse::().wrap_err_with(|| "Failed to parse u64".to_string())?; let state_update = config .starknet_client() .get_state_update(BlockId::Number(block_no)) .await - .wrap_err_with(|| format!("Failed to get state Update."))?; + .wrap_err_with(|| "Failed to get state Update.".to_string())?; let state_update = match state_update { - MaybePendingStateUpdate::PendingUpdate(_) => { - log::error!("Cannot process block {} for job id {} as it's still in pending state", block_no, job.id); - Err(DaError::BlockPending { block_no, job_id: job.id })? 
- } + MaybePendingStateUpdate::PendingUpdate(_) => Err(DaError::BlockPending { block_no, job_id: job.id })?, MaybePendingStateUpdate::Update(state_update) => state_update, }; // constructing the data from the rpc @@ -88,10 +84,11 @@ impl Job for DaJob { let max_blob_per_txn = config.da_client().max_blob_per_txn().await; // converting BigUints to Vec, one Vec represents one blob data - let blob_array = - data_to_blobs(max_bytes_per_blob, transformed_data).expect("error while converting blob data to vec"); - let current_blob_length: u64 = - blob_array.len().try_into().expect("Unable to convert the blob length into u64 format."); + let blob_array = data_to_blobs(max_bytes_per_blob, transformed_data)?; + let current_blob_length: u64 = blob_array + .len() + .try_into() + .wrap_err_with(|| "Unable to convert the blob length into u64 format.".to_string())?; // there is a limit on number of blobs per txn, checking that here if current_blob_length > max_blob_per_txn { @@ -162,13 +159,10 @@ pub fn convert_to_biguint(elements: Vec) -> Vec { biguint_vec } -fn data_to_blobs(blob_size: u64, block_data: Vec) -> color_eyre::Result>> { +fn data_to_blobs(blob_size: u64, block_data: Vec) -> Result>, JobError> { // Validate blob size if blob_size < 32 { - return Err(eyre!( - "Blob size must be at least 32 bytes to accommodate a single FieldElement/BigUint, but was {}", - blob_size, - )); + Err(DaError::InsufficientBlobSize { blob_size })? 
} let mut blobs: Vec> = Vec::new(); @@ -188,7 +182,7 @@ fn data_to_blobs(blob_size: u64, block_data: Vec) -> color_eyre::Result let mut last_blob = bytes; last_blob.resize(blob_size as usize, 0); // Pad with zeros blobs.push(last_blob); - println!("Warning: Remaining {} bytes not forming a complete blob were padded", remaining_bytes); + log::debug!("Warning: Remaining {} bytes not forming a complete blob were padded", remaining_bytes); } Ok(blobs) @@ -231,22 +225,20 @@ pub async fn state_update_to_blob_data( // nonce for the block if nonce.is_none() && !writes.is_empty() && addr != FieldElement::ONE { - let get_current_nonce_result = config.starknet_client().get_nonce(BlockId::Number(block_no), addr).await; - - nonce = match get_current_nonce_result { - StdOk(get_current_nonce) => Some(get_current_nonce), - Err(e) => { - log::error!("Failed to get nonce: {}", e); - return Err(eyre!("Failed to get nonce: {}", e)); - } - }; + let get_current_nonce_result = config + .starknet_client() + .get_nonce(BlockId::Number(block_no), addr) + .await + .wrap_err_with(|| "Failed to get nonce ".to_string())?; + + nonce = Some(get_current_nonce_result); } let da_word = da_word(class_flag.is_some(), nonce, writes.len() as u64); // @note: it can be improved if the first push to the data is of block number and hash // @note: ONE address is special address which for now has 1 value and that is current // block number and hash // @note: ONE special address can be used to mark the range of block, if in future - // the team wants to submit multiple blocks in a sinle blob etc. + // the team wants to submit multiple blocks in a single blob etc. 
if addr == FieldElement::ONE && da_word == FieldElement::ONE { continue; } @@ -277,18 +269,18 @@ pub async fn state_update_to_blob_data( } /// To store the blob data using the storage client with path /blob_data.txt -async fn store_blob_data(blob_data: Vec, block_number: u64, config: &Config) -> color_eyre::Result<()> { +async fn store_blob_data(blob_data: Vec, block_number: u64, config: &Config) -> Result<(), JobError> { let storage_client = config.storage(); let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME; let data_blob_big_uint = convert_to_biguint(blob_data.clone()); - let blobs_array = data_to_blobs(config.da_client().max_bytes_per_blob().await, data_blob_big_uint) - .expect("Not able to convert the data into blobs."); + let blobs_array = data_to_blobs(config.da_client().max_bytes_per_blob().await, data_blob_big_uint)?; let blob = blobs_array.clone(); // converting Vec into Vec - let blob_vec_u8 = bincode::serialize(&blob)?; + let blob_vec_u8 = + bincode::serialize(&blob).wrap_err_with(|| "Unable to Serialize blobs (Vec into Vec)".to_string())?; if !blobs_array.is_empty() { storage_client.put_data(blob_vec_u8.into(), &key).await?; @@ -548,6 +540,9 @@ pub enum DaError { #[error("Cannot process block {block_no:?} for job id {job_id:?} as it's still in pending state.")] BlockPending { block_no: u64, job_id: Uuid }, + #[error("Blob size must be at least 32 bytes to accommodate a single FieldElement/BigUint, but was {blob_size:?}")] + InsufficientBlobSize { blob_size: u64 }, + #[error("Exceeded the maximum number of blobs per transaction: allowed {max_blob_per_txn:?}, found {current_blob_length:?} for block {block_no:?} and job id {job_id:?}")] MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: u64, job_id: Uuid }, diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 4fbe4dec..cad5929d 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ 
-69,7 +69,6 @@ pub async fn create_job( let config = config().await; let existing_job = config.database().get_job_by_internal_id_and_type(internal_id.as_str(), &job_type).await?; if existing_job.is_some() { - log::debug!("Job already exists for internal_id {:?} and job_type {:?}. Skipping.", internal_id, job_type); return Err(JobError::JobAlreadyExists { internal_id, job_type }); } @@ -94,7 +93,6 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> { log::info!("Processing job with id {:?}", id); } _ => { - log::error!("Invalid status {:?} for job with id {:?}. Cannot process.", id, job.status); return Err(JobError::InvalidStatus { id, job_status: job.status }); } } @@ -132,7 +130,6 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { log::info!("Verifying job with id {:?}", id); } _ => { - log::error!("Invalid status {:?} for job with id {:?}. Cannot verify.", id, job.status); return Err(JobError::InvalidStatus { id, job_status: job.status }); } } @@ -204,10 +201,7 @@ async fn get_job(id: Uuid) -> Result { let job = config.database().get_job_by_id(id).await?; match job { Some(job) => Ok(job), - None => { - log::error!("Failed to find job with id {:?}", id); - Err(JobError::JobNotFound { id }) - } + None => Err(JobError::JobNotFound { id }), } } @@ -219,7 +213,7 @@ fn increment_key_in_metadata( let attempt = get_u64_from_metadata(metadata, key)?; let incremented_value = attempt.checked_add(1); if incremented_value.is_none() { - return Err(JobError::Arithmetic(format!("Incrementing key {} in metadata would exceed u64::MAX", key))); + return Err(JobError::KeyOutOfBounds { key: key.to_string() }); } new_metadata.insert(key.to_string(), incremented_value.unwrap().to_string()); Ok(new_metadata) @@ -244,8 +238,8 @@ pub enum JobError { #[error("Failed to find job with id {id:?}")] JobNotFound { id: Uuid }, - #[error("Arithmetic error: {0}")] - Arithmetic(String), + #[error("Incrementing key {} in metadata would exceed u64::MAX", key)] + KeyOutOfBounds { 
key: String }, #[error("DA Error: {0}")] DaJobError(#[from] DaError), diff --git a/crates/orchestrator/src/jobs/proving_job/mod.rs b/crates/orchestrator/src/jobs/proving_job/mod.rs index 6b87b731..729ac5d7 100644 --- a/crates/orchestrator/src/jobs/proving_job/mod.rs +++ b/crates/orchestrator/src/jobs/proving_job/mod.rs @@ -56,14 +56,14 @@ impl Job for ProvingJob { .prover_client() .submit_task(Task::CairoPie(cairo_pie)) .await - .wrap_err_with(|| format!("Prover Client Error"))?; + .wrap_err_with(|| "Prover Client Error".to_string())?; Ok(external_id) } async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result { let task_id: String = job.external_id.unwrap_string()?.into(); let task_status = - config.prover_client().get_task_status(&task_id).await.wrap_err_with(|| format!("Prover Client Error"))?; + config.prover_client().get_task_status(&task_id).await.wrap_err_with(|| "Prover Client Error".to_string())?; match task_status { TaskStatus::Processing => Ok(JobVerificationStatus::Pending), @@ -99,9 +99,6 @@ pub enum ProvingError { #[error("Not able to read the cairo PIE file from the zip file provided.")] CairoPIENotReadable, - #[error("Exceeded the maximum number of blobs per transaction: allowed {max_blob_per_txn:?}, found {current_blob_length:?} for block {block_no:?} and job id {job_id:?}")] - MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: u64, job_id: Uuid }, - #[error("Other error: {0}")] Other(#[from] color_eyre::eyre::Error), } diff --git a/crates/orchestrator/src/jobs/snos_job/mod.rs b/crates/orchestrator/src/jobs/snos_job/mod.rs index ab048699..8c532bb2 100644 --- a/crates/orchestrator/src/jobs/snos_job/mod.rs +++ b/crates/orchestrator/src/jobs/snos_job/mod.rs @@ -40,7 +40,7 @@ impl Job for SnosJob { async fn verify_job(&self, _config: &Config, _job: &mut JobItem) -> Result { // No need for verification as of now. 
If we later on decide to outsource SNOS run - // to another servicehow a, verify_job can be used to poll on the status of the job + // to another service, verify_job can be used to poll on the status of the job todo!() } diff --git a/crates/orchestrator/src/jobs/state_update_job/mod.rs b/crates/orchestrator/src/jobs/state_update_job/mod.rs index 3ddf0caf..65a0bbc3 100644 --- a/crates/orchestrator/src/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/jobs/state_update_job/mod.rs @@ -6,7 +6,6 @@ use ::utils::collections::{has_dup, is_sorted}; use async_trait::async_trait; use cairo_vm::Felt252; use color_eyre::eyre::eyre; -use color_eyre::Result as EyreResult; use snos::io::output::StarknetOsOutput; use thiserror::Error; use uuid::Uuid; @@ -49,12 +48,11 @@ impl Job for StateUpdateJob { } async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { - let job_cloned = job.clone(); - - let attempt_no = job_cloned + let attempt_no = job .metadata .get(JOB_PROCESS_ATTEMPT_METADATA_KEY) - .ok_or_else(|| StateUpdateError::AttemptNumberNotFound)?; + .ok_or_else(|| StateUpdateError::AttemptNumberNotFound)? + .clone(); // Read the metadata to get the blocks for which state update will be performed. // We assume that blocks nbrs are formatted as follow: "2,3,4,5,6". 
@@ -168,15 +166,17 @@ impl Job for StateUpdateJob { impl StateUpdateJob { /// Read the metadata and parse the block numbers - fn get_block_numbers_from_metadata(&self, job: &JobItem) -> EyreResult> { - let blocks_to_settle = job.metadata.get(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY).ok_or_else(|| { - eyre!("Block numbers to settle must be specified (state update job #{})", job.internal_id) - })?; + fn get_block_numbers_from_metadata(&self, job: &JobItem) -> Result, JobError> { + let blocks_to_settle = job + .metadata + .get(JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY) + .ok_or_else(|| StateUpdateError::UnspecifiedBlockNumber { internal_id: job.internal_id.clone() })?; + self.parse_block_numbers(blocks_to_settle) } /// Parse a list of blocks comma separated - fn parse_block_numbers(&self, blocks_to_settle: &str) -> EyreResult> { + fn parse_block_numbers(&self, blocks_to_settle: &str) -> Result, JobError> { let sanitized_blocks = blocks_to_settle.replace(' ', ""); let block_numbers: Vec = sanitized_blocks .split(',') @@ -187,20 +187,20 @@ impl StateUpdateJob { } /// Validate that the list of block numbers to process is valid. 
- async fn validate_block_numbers(&self, config: &Config, block_numbers: &[u64]) -> EyreResult<()> { + async fn validate_block_numbers(&self, config: &Config, block_numbers: &[u64]) -> Result<(), JobError> { if block_numbers.is_empty() { - return Err(eyre!("No block numbers found.")); + Err(StateUpdateError::BlockNumberNotFound)?; } if has_dup(block_numbers) { - return Err(eyre!("Duplicated block numbers.")); + Err(StateUpdateError::DuplicateBlockNumbers)?; } if !is_sorted(block_numbers) { - return Err(eyre!("Block numbers aren't sorted in increasing order.")); + Err(StateUpdateError::UnsortedBlockNumbers)?; } // Check for gap between the last settled block and the first block to settle let last_settled_block: u64 = config.settlement_client().get_last_settled_block().await?; if last_settled_block + 1 != block_numbers[0] { - return Err(eyre!("Gap detected between the first block to settle and the last one settled.")); + Err(StateUpdateError::GapBetweenFirstAndLastBlock)?; } Ok(()) } @@ -211,7 +211,7 @@ impl StateUpdateJob { config: &Config, block_no: u64, snos: StarknetOsOutput, - ) -> EyreResult { + ) -> Result { let settlement_client = config.settlement_client(); let last_tx_hash_executed = if snos.use_kzg_da == Felt252::ZERO { unimplemented!("update_state_for_block not implemented as of now for calldata DA.") @@ -221,7 +221,7 @@ impl StateUpdateJob { // Sending update_state transaction from the settlement client settlement_client.update_state_with_blobs(vec![], blob_data).await? } else { - return Err(eyre!("Block #{} - SNOS error, [use_kzg_da] should be either 0 or 1.", block_no)); + Err(StateUpdateError::UseKZGDaError { block_no })? 
}; Ok(last_tx_hash_executed) } @@ -255,18 +255,33 @@ pub enum StateUpdateError { #[error("last_failed_block should be a positive number")] LastFailedBlockNonPositive, + #[error("Block numbers to settle must be specified (state update job #{internal_id:?})")] + UnspecifiedBlockNumber { internal_id: String }, + #[error("Could not find tx hashes metadata for the current attempt")] TxnHashMetadataNotFound, #[error("Tx {tx_hash:?} should not be pending.")] TxnShouldNotBePending { tx_hash: String }, - #[error("Exceeded the maximum number of blobs per transaction: allowed {max_blob_per_txn:?}, found {current_blob_length:?} for block {block_no:?} and job id {job_id:?}")] - MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: u64, job_id: Uuid }, - #[error("Last number in block_numbers array returned as None. Possible Error : Delay in job processing or Failed job execution.")] LastNumberReturnedError, + #[error("No block numbers found.")] + BlockNumberNotFound, + + #[error("Duplicated block numbers.")] + DuplicateBlockNumbers, + + #[error("Block numbers aren't sorted in increasing order.")] + UnsortedBlockNumbers, + + #[error("Gap detected between the first block to settle and the last one settled.")] + GapBetweenFirstAndLastBlock, + + #[error("Block #{block_no:?} - SNOS error, [use_kzg_da] should be either 0 or 1.")] + UseKZGDaError { block_no: u64 }, + #[error("Other error: {0}")] Other(#[from] color_eyre::eyre::Error), } From baebcc3d40de315aaf87d777a25becfea191cce7 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Sat, 10 Aug 2024 01:54:37 +0530 Subject: [PATCH 09/27] update: wrap_err_with -> wrap_err wherever used with string --- crates/orchestrator/src/jobs/da_job/mod.rs | 15 ++++++++++----- crates/orchestrator/src/jobs/mod.rs | 2 +- crates/orchestrator/src/jobs/proving_job/mod.rs | 6 ++++-- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs 
b/crates/orchestrator/src/jobs/da_job/mod.rs index f98eb6b9..b9604def 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -60,13 +60,15 @@ impl Job for DaJob { } async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { - let block_no = job.internal_id.parse::().wrap_err_with(|| "Failed to parse u64".to_string())?; + let block_no = job.internal_id.parse::() + .wrap_err("Failed to parse u64".to_string())?; let state_update = config .starknet_client() .get_state_update(BlockId::Number(block_no)) .await - .wrap_err_with(|| "Failed to get state Update.".to_string())?; + .wrap_err("Failed to get state Update.".to_string())?; + let state_update = match state_update { MaybePendingStateUpdate::PendingUpdate(_) => Err(DaError::BlockPending { block_no, job_id: job.id })?, @@ -88,7 +90,7 @@ impl Job for DaJob { let current_blob_length: u64 = blob_array .len() .try_into() - .wrap_err_with(|| "Unable to convert the blob length into u64 format.".to_string())?; + .wrap_err("Unable to convert the blob length into u64 format.".to_string())?; // there is a limit on number of blobs per txn, checking that here if current_blob_length > max_blob_per_txn { @@ -229,7 +231,8 @@ pub async fn state_update_to_blob_data( .starknet_client() .get_nonce(BlockId::Number(block_no), addr) .await - .wrap_err_with(|| "Failed to get nonce ".to_string())?; + .wrap_err("Failed to get nonce ".to_string())?; + nonce = Some(get_current_nonce_result); } @@ -280,7 +283,9 @@ async fn store_blob_data(blob_data: Vec, block_number: u64, config // converting Vec into Vec let blob_vec_u8 = - bincode::serialize(&blob).wrap_err_with(|| "Unable to Serialize blobs (Vec into Vec)".to_string())?; + bincode::serialize(&blob) + .wrap_err("Unable to Serialize blobs (Vec into Vec)".to_string())?; + if !blobs_array.is_empty() { storage_client.put_data(blob_vec_u8.into(), &key).await?; diff --git a/crates/orchestrator/src/jobs/mod.rs 
b/crates/orchestrator/src/jobs/mod.rs index cad5929d..19f783dd 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -224,7 +224,7 @@ fn get_u64_from_metadata(metadata: &HashMap, key: &str) -> color .get(key) .unwrap_or(&"0".to_string()) .parse::() - .wrap_err_with(|| format!("Failed to parse u64 from metadata key '{}'", key)) + .wrap_err(format!("Failed to parse u64 from metadata key '{}'", key)) } #[derive(Error, Debug)] diff --git a/crates/orchestrator/src/jobs/proving_job/mod.rs b/crates/orchestrator/src/jobs/proving_job/mod.rs index 729ac5d7..533cd22f 100644 --- a/crates/orchestrator/src/jobs/proving_job/mod.rs +++ b/crates/orchestrator/src/jobs/proving_job/mod.rs @@ -56,14 +56,16 @@ impl Job for ProvingJob { .prover_client() .submit_task(Task::CairoPie(cairo_pie)) .await - .wrap_err_with(|| "Prover Client Error".to_string())?; + .wrap_err("Prover Client Error".to_string())?; + Ok(external_id) } async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result { let task_id: String = job.external_id.unwrap_string()?.into(); let task_status = - config.prover_client().get_task_status(&task_id).await.wrap_err_with(|| "Prover Client Error".to_string())?; + config.prover_client().get_task_status(&task_id).await + .wrap_err("Prover Client Error".to_string())?; match task_status { TaskStatus::Processing => Ok(JobVerificationStatus::Pending), From a29453e098679c841833cfc3b364ae453b9b67b7 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Sat, 3 Aug 2024 09:51:41 +0530 Subject: [PATCH 10/27] update: moved all JobErrors to top of file --- crates/orchestrator/src/jobs/da_job/mod.rs | 30 ++++--- crates/orchestrator/src/jobs/mod.rs | 54 ++++++------ .../orchestrator/src/jobs/proving_job/mod.rs | 27 +++--- .../src/jobs/state_update_job/mod.rs | 84 +++++++++---------- 4 files changed, 101 insertions(+), 94 deletions(-) diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs index 
b9604def..02ca75f1 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -38,6 +38,21 @@ lazy_static! { pub static ref BLOB_LEN: usize = 4096; } +#[derive(Error, Debug)] +pub enum DaError { + #[error("Cannot process block {block_no:?} for job id {job_id:?} as it's still in pending state.")] + BlockPending { block_no: u64, job_id: Uuid }, + + #[error("Blob size must be at least 32 bytes to accommodate a single FieldElement/BigUint, but was {blob_size:?}")] + InsufficientBlobSize { blob_size: u64 }, + + #[error("Exceeded the maximum number of blobs per transaction: allowed {max_blob_per_txn:?}, found {current_blob_length:?} for block {block_no:?} and job id {job_id:?}")] + MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: u64, job_id: Uuid }, + + #[error("Other error: {0}")] + Other(#[from] color_eyre::eyre::Error), +} + pub struct DaJob; #[async_trait] @@ -60,8 +75,7 @@ impl Job for DaJob { } async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { - let block_no = job.internal_id.parse::() - .wrap_err("Failed to parse u64".to_string())?; + let block_no = job.internal_id.parse::().wrap_err("Failed to parse u64".to_string())?; let state_update = config .starknet_client() @@ -69,7 +83,6 @@ impl Job for DaJob { .await .wrap_err("Failed to get state Update.".to_string())?; - let state_update = match state_update { MaybePendingStateUpdate::PendingUpdate(_) => Err(DaError::BlockPending { block_no, job_id: job.id })?, MaybePendingStateUpdate::Update(state_update) => state_update, @@ -87,10 +100,8 @@ impl Job for DaJob { // converting BigUints to Vec, one Vec represents one blob data let blob_array = data_to_blobs(max_bytes_per_blob, transformed_data)?; - let current_blob_length: u64 = blob_array - .len() - .try_into() - .wrap_err("Unable to convert the blob length into u64 format.".to_string())?; + let current_blob_length: u64 = + 
blob_array.len().try_into().wrap_err("Unable to convert the blob length into u64 format.".to_string())?; // there is a limit on number of blobs per txn, checking that here if current_blob_length > max_blob_per_txn { @@ -233,7 +244,6 @@ pub async fn state_update_to_blob_data( .await .wrap_err("Failed to get nonce ".to_string())?; - nonce = Some(get_current_nonce_result); } let da_word = da_word(class_flag.is_some(), nonce, writes.len() as u64); @@ -283,9 +293,7 @@ async fn store_blob_data(blob_data: Vec, block_number: u64, config // converting Vec into Vec let blob_vec_u8 = - bincode::serialize(&blob) - .wrap_err("Unable to Serialize blobs (Vec into Vec)".to_string())?; - + bincode::serialize(&blob).wrap_err("Unable to Serialize blobs (Vec into Vec)".to_string())?; if !blobs_array.is_empty() { storage_client.put_data(blob_vec_u8.into(), &key).await?; diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 19f783dd..67ac0c0a 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -28,6 +28,33 @@ pub mod state_update_job; pub mod types; use thiserror::Error; +#[derive(Error, Debug)] +pub enum JobError { + #[error("Job already exists for internal_id {internal_id:?} and job_type {job_type:?}. Skipping!")] + JobAlreadyExists { internal_id: String, job_type: JobType }, + + #[error("Invalid status {id:?} for job with id {job_status:?}. 
Cannot process.")] + InvalidStatus { id: Uuid, job_status: JobStatus }, + + #[error("Failed to find job with id {id:?}")] + JobNotFound { id: Uuid }, + + #[error("Incrementing key {} in metadata would exceed u64::MAX", key)] + KeyOutOfBounds { key: String }, + + #[error("DA Error: {0}")] + DaJobError(#[from] DaError), + + #[error("Proving Error: {0}")] + ProvingJobError(#[from] ProvingError), + + #[error("Proving Error: {0}")] + StateUpdateJobError(#[from] StateUpdateError), + + #[error("Other error: {0}")] + Other(#[from] color_eyre::eyre::Error), +} + /// The Job trait is used to define the methods that a job /// should implement to be used as a job for the orchestrator. The orchestrator automatically /// handles queueing and processing of jobs as long as they implement the trait. @@ -227,33 +254,6 @@ fn get_u64_from_metadata(metadata: &HashMap, key: &str) -> color .wrap_err(format!("Failed to parse u64 from metadata key '{}'", key)) } -#[derive(Error, Debug)] -pub enum JobError { - #[error("Job already exists for internal_id {internal_id:?} and job_type {job_type:?}. Skipping!")] - JobAlreadyExists { internal_id: String, job_type: JobType }, - - #[error("Invalid status {id:?} for job with id {job_status:?}. 
Cannot process.")] - InvalidStatus { id: Uuid, job_status: JobStatus }, - - #[error("Failed to find job with id {id:?}")] - JobNotFound { id: Uuid }, - - #[error("Incrementing key {} in metadata would exceed u64::MAX", key)] - KeyOutOfBounds { key: String }, - - #[error("DA Error: {0}")] - DaJobError(#[from] DaError), - - #[error("Proving Error: {0}")] - ProvingJobError(#[from] ProvingError), - - #[error("Proving Error: {0}")] - StateUpdateJobError(#[from] StateUpdateError), - - #[error("Other error: {0}")] - Other(#[from] color_eyre::eyre::Error), -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/orchestrator/src/jobs/proving_job/mod.rs b/crates/orchestrator/src/jobs/proving_job/mod.rs index 533cd22f..f886bf90 100644 --- a/crates/orchestrator/src/jobs/proving_job/mod.rs +++ b/crates/orchestrator/src/jobs/proving_job/mod.rs @@ -16,6 +16,18 @@ use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; use super::{Job, JobError}; use crate::config::Config; +#[derive(Error, Debug)] +pub enum ProvingError { + #[error("Cairo PIE path is not specified - prover job #{internal_id:?}")] + CairoPIEWrongPath { internal_id: String }, + + #[error("Not able to read the cairo PIE file from the zip file provided.")] + CairoPIENotReadable, + + #[error("Other error: {0}")] + Other(#[from] color_eyre::eyre::Error), +} + pub struct ProvingJob; #[async_trait] @@ -64,8 +76,7 @@ impl Job for ProvingJob { async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result { let task_id: String = job.external_id.unwrap_string()?.into(); let task_status = - config.prover_client().get_task_status(&task_id).await - .wrap_err("Prover Client Error".to_string())?; + config.prover_client().get_task_status(&task_id).await.wrap_err("Prover Client Error".to_string())?; match task_status { TaskStatus::Processing => Ok(JobVerificationStatus::Pending), @@ -92,15 +103,3 @@ impl Job for ProvingJob { 60 } } - -#[derive(Error, Debug)] -pub enum ProvingError { - 
#[error("Cairo PIE path is not specified - prover job #{internal_id:?}")] - CairoPIEWrongPath { internal_id: String }, - - #[error("Not able to read the cairo PIE file from the zip file provided.")] - CairoPIENotReadable, - - #[error("Other error: {0}")] - Other(#[from] color_eyre::eyre::Error), -} diff --git a/crates/orchestrator/src/jobs/state_update_job/mod.rs b/crates/orchestrator/src/jobs/state_update_job/mod.rs index 65a0bbc3..78ec70e3 100644 --- a/crates/orchestrator/src/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/jobs/state_update_job/mod.rs @@ -25,6 +25,48 @@ use crate::jobs::state_update_job::utils::fetch_blob_data_for_block; use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::jobs::Job; +#[derive(Error, Debug)] +pub enum StateUpdateError { + #[error("Block numbers list should not be empty.")] + EmptyBlockNumberList, + + #[error("Could not find current attempt number.")] + AttemptNumberNotFound, + + #[error("last_failed_block should be a positive number")] + LastFailedBlockNonPositive, + + #[error("Block numbers to settle must be specified (state update job #{internal_id:?})")] + UnspecifiedBlockNumber { internal_id: String }, + + #[error("Could not find tx hashes metadata for the current attempt")] + TxnHashMetadataNotFound, + + #[error("Tx {tx_hash:?} should not be pending.")] + TxnShouldNotBePending { tx_hash: String }, + + #[error("Last number in block_numbers array returned as None. 
Possible Error : Delay in job processing or Failed job execution.")] + LastNumberReturnedError, + + #[error("No block numbers found.")] + BlockNumberNotFound, + + #[error("Duplicated block numbers.")] + DuplicateBlockNumbers, + + #[error("Block numbers aren't sorted in increasing order.")] + UnsortedBlockNumbers, + + #[error("Gap detected between the first block to settle and the last one settled.")] + GapBetweenFirstAndLastBlock, + + #[error("Block #{block_no:?} - SNOS error, [use_kzg_da] should be either 0 or 1.")] + UseKZGDaError { block_no: u64 }, + + #[error("Other error: {0}")] + Other(#[from] color_eyre::eyre::Error), +} + pub struct StateUpdateJob; #[async_trait] impl Job for StateUpdateJob { @@ -243,45 +285,3 @@ impl StateUpdateJob { job.metadata.insert(new_attempt_metadata_key, tx_hashes.join(",")); } } - -#[derive(Error, Debug)] -pub enum StateUpdateError { - #[error("Block numbers list should not be empty.")] - EmptyBlockNumberList, - - #[error("Could not find current attempt number.")] - AttemptNumberNotFound, - - #[error("last_failed_block should be a positive number")] - LastFailedBlockNonPositive, - - #[error("Block numbers to settle must be specified (state update job #{internal_id:?})")] - UnspecifiedBlockNumber { internal_id: String }, - - #[error("Could not find tx hashes metadata for the current attempt")] - TxnHashMetadataNotFound, - - #[error("Tx {tx_hash:?} should not be pending.")] - TxnShouldNotBePending { tx_hash: String }, - - #[error("Last number in block_numbers array returned as None. 
Possible Error : Delay in job processing or Failed job execution.")] - LastNumberReturnedError, - - #[error("No block numbers found.")] - BlockNumberNotFound, - - #[error("Duplicated block numbers.")] - DuplicateBlockNumbers, - - #[error("Block numbers aren't sorted in increasing order.")] - UnsortedBlockNumbers, - - #[error("Gap detected between the first block to settle and the last one settled.")] - GapBetweenFirstAndLastBlock, - - #[error("Block #{block_no:?} - SNOS error, [use_kzg_da] should be either 0 or 1.")] - UseKZGDaError { block_no: u64 }, - - #[error("Other error: {0}")] - Other(#[from] color_eyre::eyre::Error), -} From c443dbeb43e20e2fa0021c64d3e86c378f8b4afb Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Sat, 3 Aug 2024 16:03:43 +0530 Subject: [PATCH 11/27] update: Errors for consume_job_from_queue --- crates/orchestrator/src/jobs/mod.rs | 11 ++++--- crates/orchestrator/src/queue/job_queue.rs | 35 +++++++++++++++++----- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 67ac0c0a..a9016ed1 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -1,6 +1,10 @@ use std::collections::HashMap; use std::time::Duration; +use crate::config::{config, Config}; +use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY}; +use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; +use crate::queue::job_queue::{add_job_to_process_queue, add_job_to_verification_queue, ConsumptionError}; use async_trait::async_trait; use mockall::automock; use mockall_double::double; @@ -11,12 +15,8 @@ use state_update_job::StateUpdateError; use tracing::log; use uuid::Uuid; -use crate::config::{config, Config}; -use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY}; #[double] use crate::jobs::job_handler_factory::factory; -use 
crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; -use crate::queue::job_queue::{add_job_to_process_queue, add_job_to_verification_queue}; pub mod constants; pub mod da_job; @@ -51,6 +51,9 @@ pub enum JobError { #[error("Proving Error: {0}")] StateUpdateJobError(#[from] StateUpdateError), + #[error("Queue Handling Error: {0}")] + ConsumptionError(#[from] ConsumptionError), + #[error("Other error: {0}")] Other(#[from] color_eyre::eyre::Error), } diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index c7204feb..8467fc7e 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -1,7 +1,7 @@ use std::future::Future; use std::time::Duration; -use color_eyre::eyre::eyre; +use color_eyre::eyre::Context; use color_eyre::Result as EyreResult; use omniqueue::QueueError; use serde::{Deserialize, Serialize}; @@ -15,6 +15,20 @@ use crate::jobs::{process_job, verify_job, JobError}; pub const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; pub const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue"; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum ConsumptionError { + #[error("Failed to consume message from queue, error {error_msg:?}")] + FailedToConsumeFromQueue { error_msg : String }, + + #[error("Failed to handle job with id {job_id:?}. 
Error: {error_msg:?}")] + FailedToHandleJob {job_id : Uuid, error_msg : String }, + + #[error("Other error: {0}")] + Other(#[from] color_eyre::eyre::Error), +} + #[derive(Debug, Serialize, Deserialize)] pub struct JobQueueMessage { pub(crate) id: Uuid, @@ -30,7 +44,7 @@ pub async fn add_job_to_verification_queue(id: Uuid, delay: Duration) -> EyreRes add_job_to_queue(id, JOB_VERIFICATION_QUEUE.to_string(), Some(delay)).await } -pub async fn consume_job_from_queue(queue: String, handler: F) -> EyreResult<()> +pub async fn consume_job_from_queue(queue: String, handler: F) -> Result<(), ConsumptionError> where F: FnOnce(Uuid) -> Fut, Fut: Future>, @@ -43,22 +57,27 @@ where return Ok(()); } Err(e) => { - return Err(eyre!("Failed to consume message from queue, error {}", e)); + return Err(ConsumptionError::FailedToConsumeFromQueue { error_msg : e.to_string()}); } }; - let job_message: Option = delivery.payload_serde_json()?; + let job_message: Option = delivery.payload_serde_json() + .wrap_err("Payload Serde Error ")?; match job_message { Some(job_message) => { log::info!("Handling job with id {:?} for queue {:?}", job_message.id, queue); - match handler(job_message.id).await { - Ok(_) => delivery.ack().await.map_err(|(e, _)| e)?, + let _ = match handler(job_message.id).await { + Ok(_) => delivery.ack().await.map_err(|(e, _)| e).wrap_err("Queue Error "), Err(e) => { log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e); - + // if the queue as a retry logic at the source, it will be attempted // after the nack - delivery.nack().await.map_err(|(e, _)| e)?; + match delivery.nack().await { + Ok(_) => Err(ConsumptionError::FailedToHandleJob { job_id: job_message.id, error_msg: "Job handling failed, message nack-ed".to_string() })?, + Err(delivery_nack_error) => Err(ConsumptionError::FailedToHandleJob { job_id: job_message.id, error_msg: delivery_nack_error.0.to_string() })? 
+ } + } }; } From ab74a7caf40618fb49ff0295e35d724ca8b3f83c Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Sat, 3 Aug 2024 16:04:58 +0530 Subject: [PATCH 12/27] update: linting fixes --- crates/orchestrator/src/queue/job_queue.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index 8467fc7e..203e1e79 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -20,10 +20,10 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum ConsumptionError { #[error("Failed to consume message from queue, error {error_msg:?}")] - FailedToConsumeFromQueue { error_msg : String }, + FailedToConsumeFromQueue { error_msg: String }, #[error("Failed to handle job with id {job_id:?}. Error: {error_msg:?}")] - FailedToHandleJob {job_id : Uuid, error_msg : String }, + FailedToHandleJob { job_id: Uuid, error_msg: String }, #[error("Other error: {0}")] Other(#[from] color_eyre::eyre::Error), @@ -57,11 +57,10 @@ where return Ok(()); } Err(e) => { - return Err(ConsumptionError::FailedToConsumeFromQueue { error_msg : e.to_string()}); + return Err(ConsumptionError::FailedToConsumeFromQueue { error_msg: e.to_string() }); } }; - let job_message: Option = delivery.payload_serde_json() - .wrap_err("Payload Serde Error ")?; + let job_message: Option = delivery.payload_serde_json().wrap_err("Payload Serde Error ")?; match job_message { Some(job_message) => { @@ -70,14 +69,19 @@ where Ok(_) => delivery.ack().await.map_err(|(e, _)| e).wrap_err("Queue Error "), Err(e) => { log::error!("Failed to handle job with id {:?}. 
Error: {:?}", job_message.id, e); - + // if the queue as a retry logic at the source, it will be attempted // after the nack match delivery.nack().await { - Ok(_) => Err(ConsumptionError::FailedToHandleJob { job_id: job_message.id, error_msg: "Job handling failed, message nack-ed".to_string() })?, - Err(delivery_nack_error) => Err(ConsumptionError::FailedToHandleJob { job_id: job_message.id, error_msg: delivery_nack_error.0.to_string() })? + Ok(_) => Err(ConsumptionError::FailedToHandleJob { + job_id: job_message.id, + error_msg: "Job handling failed, message nack-ed".to_string(), + })?, + Err(delivery_nack_error) => Err(ConsumptionError::FailedToHandleJob { + job_id: job_message.id, + error_msg: delivery_nack_error.0.to_string(), + })?, } - } }; } From 6c8f898cb38527e2463c5f16d0a1220a8fb110ea Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Mon, 5 Aug 2024 09:42:02 +0530 Subject: [PATCH 13/27] update: optimised consume_job_from_queue match statement --- crates/orchestrator/src/queue/job_queue.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index 203e1e79..be634935 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -65,8 +65,8 @@ where match job_message { Some(job_message) => { log::info!("Handling job with id {:?} for queue {:?}", job_message.id, queue); - let _ = match handler(job_message.id).await { - Ok(_) => delivery.ack().await.map_err(|(e, _)| e).wrap_err("Queue Error "), + match handler(job_message.id).await { + Ok(_) => delivery.ack().await.map_err(|(e, _)| e).wrap_err("Queue Error ")?, Err(e) => { log::error!("Failed to handle job with id {:?}. 
Error: {:?}", job_message.id, e); From 9f5267c4507a8de95b9daf1a526f36a3ab7137a8 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Mon, 5 Aug 2024 22:39:34 +0530 Subject: [PATCH 14/27] update: code optimisation based on PR reviews --- crates/orchestrator/src/jobs/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index a9016ed1..4e7c5193 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -242,9 +242,7 @@ fn increment_key_in_metadata( let mut new_metadata = metadata.clone(); let attempt = get_u64_from_metadata(metadata, key)?; let incremented_value = attempt.checked_add(1); - if incremented_value.is_none() { - return Err(JobError::KeyOutOfBounds { key: key.to_string() }); - } + incremented_value.ok_or_else(|| JobError::KeyOutOfBounds { key: key.to_string() })?; new_metadata.insert(key.to_string(), incremented_value.unwrap().to_string()); Ok(new_metadata) } From 1b3f9a683cfaaa8f2eadba41da29f81cc4bc3bbb Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Sat, 10 Aug 2024 02:08:18 +0530 Subject: [PATCH 15/27] chore: lint fixes --- CHANGELOG.md | 3 ++- crates/orchestrator/src/jobs/da_job/mod.rs | 15 --------------- crates/orchestrator/src/jobs/mod.rs | 17 +++-------------- 3 files changed, 5 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cb03bc5c..a8ed5e7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - `AWS_DEFAULT_REGION="localhost"` var. in .env.test for omniqueue queue testing. - Added basic rust-toolchain support. - Tests for DA job. +- Added generalized errors for Jobs : JobError + ## Changed @@ -23,7 +25,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - Orchestrator :Moved TestConfigBuilder to `config.rs` in tests folder. 
- `.env` file requires two more variables which are queue urls for processing and verification. -- Shifted Unit tests to test folder for DA job. ## Removed diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs index 02ca75f1..6700b039 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -547,18 +547,3 @@ pub mod test { } } } - -#[derive(Error, Debug)] -pub enum DaError { - #[error("Cannot process block {block_no:?} for job id {job_id:?} as it's still in pending state.")] - BlockPending { block_no: u64, job_id: Uuid }, - - #[error("Blob size must be at least 32 bytes to accommodate a single FieldElement/BigUint, but was {blob_size:?}")] - InsufficientBlobSize { blob_size: u64 }, - - #[error("Exceeded the maximum number of blobs per transaction: allowed {max_blob_per_txn:?}, found {current_blob_length:?} for block {block_no:?} and job id {job_id:?}")] - MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: u64, job_id: Uuid }, - - #[error("Other error: {0}")] - Other(#[from] color_eyre::eyre::Error), -} diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 4e7c5193..60979476 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -6,10 +6,10 @@ use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::queue::job_queue::{add_job_to_process_queue, add_job_to_verification_queue, ConsumptionError}; use async_trait::async_trait; -use mockall::automock; -use mockall_double::double; use color_eyre::eyre::Context; use da_job::DaError; +use mockall::automock; +use mockall_double::double; use proving_job::ProvingError; use state_update_job::StateUpdateError; use tracing::log; @@ -89,7 +89,6 @@ pub trait Job: Send + Sync { fn 
verification_polling_delay_seconds(&self) -> u64; } - /// Creates the job in the DB in the created state and adds it to the process queue pub async fn create_job( job_type: JobType, @@ -216,16 +215,6 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { Ok(()) } -fn get_job_handler(job_type: &JobType) -> Box { - match job_type { - JobType::DataSubmission => Box::new(da_job::DaJob), - JobType::SnosRun => Box::new(snos_job::SnosJob), - JobType::ProofCreation => Box::new(proving_job::ProvingJob), - JobType::StateTransition => Box::new(state_update_job::StateUpdateJob), - _ => unimplemented!("Job type not implemented yet."), - } -} - async fn get_job(id: Uuid) -> Result { let config = config().await; let job = config.database().get_job_by_id(id).await?; @@ -235,7 +224,7 @@ async fn get_job(id: Uuid) -> Result { } } -fn increment_key_in_metadata( +pub fn increment_key_in_metadata( metadata: &HashMap, key: &str, ) -> Result, JobError> { From 4580e92ad1d087a02cb95a0027de9c81ee675025 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Sat, 10 Aug 2024 02:10:59 +0530 Subject: [PATCH 16/27] chore: lint fixes --- CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8ed5e7d..95d1f450 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,8 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - `AWS_DEFAULT_REGION="localhost"` var. in .env.test for omniqueue queue testing. - Added basic rust-toolchain support. - Tests for DA job. -- Added generalized errors for Jobs : JobError - +- Added generalized errors for Jobs : JobError. 
## Changed From 7ed04b6cfefe38e30e4a43a982fd971d1c7d420c Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Sat, 10 Aug 2024 09:52:22 +0530 Subject: [PATCH 17/27] update: error matching optimised --- crates/orchestrator/src/jobs/da_job/mod.rs | 6 ++++-- crates/orchestrator/src/tests/jobs/da_job/mod.rs | 14 ++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs index 6700b039..7fd72526 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -41,7 +41,7 @@ lazy_static! { #[derive(Error, Debug)] pub enum DaError { #[error("Cannot process block {block_no:?} for job id {job_id:?} as it's still in pending state.")] - BlockPending { block_no: u64, job_id: Uuid }, + BlockPending { block_no: String, job_id: Uuid }, #[error("Blob size must be at least 32 bytes to accommodate a single FieldElement/BigUint, but was {blob_size:?}")] InsufficientBlobSize { blob_size: u64 }, @@ -84,7 +84,9 @@ impl Job for DaJob { .wrap_err("Failed to get state Update.".to_string())?; let state_update = match state_update { - MaybePendingStateUpdate::PendingUpdate(_) => Err(DaError::BlockPending { block_no, job_id: job.id })?, + MaybePendingStateUpdate::PendingUpdate(_) => { + Err(DaError::BlockPending { block_no: block_no.to_string(), job_id: job.id })? 
+ } MaybePendingStateUpdate::Update(state_update) => state_update, }; // constructing the data from the rpc diff --git a/crates/orchestrator/src/tests/jobs/da_job/mod.rs b/crates/orchestrator/src/tests/jobs/da_job/mod.rs index bcc02050..7d194895 100644 --- a/crates/orchestrator/src/tests/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/da_job/mod.rs @@ -1,5 +1,5 @@ use crate::jobs::da_job::test::{get_nonce_attached, read_state_update_from_file}; -use crate::jobs::da_job::DaJob; +use crate::jobs::da_job::{DaError, DaJob}; use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; use crate::tests::common::drop_database; use crate::tests::config::TestConfigBuilder; @@ -137,13 +137,11 @@ async fn test_da_job_process_job_failure_on_pending_block() { assert_matches!(response, Err(e) => { - let expected_error = eyre!( - "Cannot process block {} for job id {} as it's still in pending state", - internal_id.to_string(), - Uuid::default() - ) - .to_string(); - assert_eq!(e.to_string(), expected_error); + let expected_error = DaError::BlockPending { + block_no: internal_id.to_string(), + job_id: Uuid::default() + }; + assert_eq!(e.to_string(), expected_error.to_string()); } ); From 62715cae438f68bfcaa0f38ab8e757028115c8f2 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Mon, 12 Aug 2024 10:25:08 +0530 Subject: [PATCH 18/27] update: block_no to String and correct assert on test_da_job_process_job_failure_on_small_blob_size --- crates/orchestrator/src/jobs/da_job/mod.rs | 6 +++--- crates/orchestrator/src/tests/jobs/da_job/mod.rs | 11 ++--------- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs index 7fd72526..6889c08e 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -47,7 +47,7 @@ pub enum DaError { InsufficientBlobSize { blob_size: u64 }, #[error("Exceeded the maximum number of blobs per 
transaction: allowed {max_blob_per_txn:?}, found {current_blob_length:?} for block {block_no:?} and job id {job_id:?}")] - MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: u64, job_id: Uuid }, + MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: String, job_id: Uuid }, #[error("Other error: {0}")] Other(#[from] color_eyre::eyre::Error), @@ -85,7 +85,7 @@ impl Job for DaJob { let state_update = match state_update { MaybePendingStateUpdate::PendingUpdate(_) => { - Err(DaError::BlockPending { block_no: block_no.to_string(), job_id: job.id })? + Err(DaError::BlockPending { block_no : block_no.to_string(), job_id: job.id })? } MaybePendingStateUpdate::Update(state_update) => state_update, }; @@ -107,7 +107,7 @@ impl Job for DaJob { // there is a limit on number of blobs per txn, checking that here if current_blob_length > max_blob_per_txn { - Err(DaError::MaxBlobsLimitExceeded { max_blob_per_txn, current_blob_length, block_no, job_id: job.id })? + Err(DaError::MaxBlobsLimitExceeded { max_blob_per_txn, current_blob_length, block_no : block_no.to_string(), job_id: job.id })? 
} // making the txn to the DA layer diff --git a/crates/orchestrator/src/tests/jobs/da_job/mod.rs b/crates/orchestrator/src/tests/jobs/da_job/mod.rs index 7d194895..24f7b667 100644 --- a/crates/orchestrator/src/tests/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/da_job/mod.rs @@ -73,15 +73,8 @@ async fn test_da_job_process_job_failure_on_small_blob_size( assert_matches!(response, Err(e) => { - let expected_error = eyre!( - "Exceeded the maximum number of blobs per transaction: allowed {}, found {} for block {} and job id {}", - max_blob_per_txn, - current_blob_length, - internal_id.to_string(), - Uuid::default() - ) - .to_string(); - assert_eq!(e.to_string(), expected_error); + let expected_error = DaError::MaxBlobsLimitExceeded { max_blob_per_txn, current_blob_length, block_no: internal_id.to_string(), job_id: Uuid::default() } ; + assert_eq!(e.to_string(), expected_error.to_string()); } ); From a8104c7dc80b76482a4582ea3944263639e5af49 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Mon, 12 Aug 2024 10:26:39 +0530 Subject: [PATCH 19/27] fix: linting --- crates/orchestrator/src/jobs/da_job/mod.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs index 6889c08e..4d30b086 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -85,7 +85,7 @@ impl Job for DaJob { let state_update = match state_update { MaybePendingStateUpdate::PendingUpdate(_) => { - Err(DaError::BlockPending { block_no : block_no.to_string(), job_id: job.id })? + Err(DaError::BlockPending { block_no: block_no.to_string(), job_id: job.id })? 
} MaybePendingStateUpdate::Update(state_update) => state_update, }; @@ -107,7 +107,12 @@ impl Job for DaJob { // there is a limit on number of blobs per txn, checking that here if current_blob_length > max_blob_per_txn { - Err(DaError::MaxBlobsLimitExceeded { max_blob_per_txn, current_blob_length, block_no : block_no.to_string(), job_id: job.id })? + Err(DaError::MaxBlobsLimitExceeded { + max_blob_per_txn, + current_blob_length, + block_no: block_no.to_string(), + job_id: job.id, + })? } // making the txn to the DA layer From 1d93eb186b7ae3b186fe9e4565f89417a98e5835 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Mon, 12 Aug 2024 10:57:57 +0530 Subject: [PATCH 20/27] update: error matching checks --- crates/orchestrator/src/tests/jobs/da_job/mod.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/orchestrator/src/tests/jobs/da_job/mod.rs b/crates/orchestrator/src/tests/jobs/da_job/mod.rs index 24f7b667..1390e46e 100644 --- a/crates/orchestrator/src/tests/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/da_job/mod.rs @@ -1,6 +1,7 @@ use crate::jobs::da_job::test::{get_nonce_attached, read_state_update_from_file}; use crate::jobs::da_job::{DaError, DaJob}; use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; +use crate::jobs::JobError; use crate::tests::common::drop_database; use crate::tests::config::TestConfigBuilder; use crate::{config::config, jobs::Job}; @@ -73,7 +74,8 @@ async fn test_da_job_process_job_failure_on_small_blob_size( assert_matches!(response, Err(e) => { - let expected_error = DaError::MaxBlobsLimitExceeded { max_blob_per_txn, current_blob_length, block_no: internal_id.to_string(), job_id: Uuid::default() } ; + let err = DaError::MaxBlobsLimitExceeded { max_blob_per_txn, current_blob_length, block_no: internal_id.to_string(), job_id: Uuid::default() }; + let expected_error = JobError::DaJobError(err); assert_eq!(e.to_string(), expected_error.to_string()); } ); @@ -130,10 +132,11 @@ async 
fn test_da_job_process_job_failure_on_pending_block() { assert_matches!(response, Err(e) => { - let expected_error = DaError::BlockPending { + let err = DaError::BlockPending { block_no: internal_id.to_string(), job_id: Uuid::default() }; + let expected_error = JobError::DaJobError(err); assert_eq!(e.to_string(), expected_error.to_string()); } ); From e8f271563dc7a0502e56a4601dd54f8cdf95d3a5 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Mon, 12 Aug 2024 12:04:44 +0530 Subject: [PATCH 21/27] update: test state_update fix --- crates/orchestrator/src/tests/jobs/state_update_job/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs index cd5d8f85..56b8253e 100644 --- a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs @@ -189,7 +189,7 @@ async fn test_process_job_invalid_input_gap() { let mut job = StateUpdateJob.create_job(config().await.as_ref(), String::from("internal_id"), metadata).await.unwrap(); - let _ = StateUpdateJob.process_job(config().await.as_ref(), &mut job).await.unwrap(); + let _ = StateUpdateJob.process_job(config().await.as_ref(), &mut job).await.unwrap_err(); } // ==================== Utility functions =========================== From c4a484a84c31457852fe0859946db59cdafb1ef4 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Mon, 12 Aug 2024 12:40:11 +0530 Subject: [PATCH 22/27] update: test state_update fix --- .../src/tests/jobs/state_update_job/mod.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs index 56b8253e..6bbc77a9 100644 --- a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs @@ -8,6 +8,7 @@ use 
lazy_static::lazy_static; use mockall::predicate::eq; use rstest::*; use settlement_client_interface::MockSettlementClient; +use assert_matches::assert_matches; use super::super::common::init_config; use crate::config::{config, config_force_init}; @@ -18,9 +19,9 @@ use crate::jobs::constants::{ JOB_PROCESS_ATTEMPT_METADATA_KEY, }; use crate::jobs::state_update_job::utils::hex_string_to_u8_vec; -use crate::jobs::state_update_job::StateUpdateJob; +use crate::jobs::state_update_job::{StateUpdateError, StateUpdateJob}; use crate::jobs::types::{JobStatus, JobType}; -use crate::jobs::Job; +use crate::jobs::{Job, JobError}; lazy_static! { pub static ref CURRENT_PATH: PathBuf = std::env::current_dir().unwrap(); @@ -189,7 +190,17 @@ async fn test_process_job_invalid_input_gap() { let mut job = StateUpdateJob.create_job(config().await.as_ref(), String::from("internal_id"), metadata).await.unwrap(); - let _ = StateUpdateJob.process_job(config().await.as_ref(), &mut job).await.unwrap_err(); + let response = StateUpdateJob.process_job(config().await.as_ref(), &mut job).await; + + assert_matches!(response, + Err(e) => { + let err = StateUpdateError::GapBetweenFirstAndLastBlock; + let expected_error = JobError::StateUpdateJobError(err); + assert_eq!(e.to_string(), expected_error.to_string()); + } + ); + + } // ==================== Utility functions =========================== From 43d95cef4815244b864cdf32dd8f4d213100d951 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Mon, 12 Aug 2024 13:27:25 +0530 Subject: [PATCH 23/27] update: test state_update fix --- crates/orchestrator/src/tests/jobs/state_update_job/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs index 6bbc77a9..e35db913 100644 --- a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs @@ -164,7 +164,6 @@ async fn 
test_process_job_invalid_inputs(#[case] block_numbers_to_settle: String #[rstest] #[tokio::test] -#[should_panic(expected = "Gap detected between the first block to settle and the last one settle")] async fn test_process_job_invalid_input_gap() { let server = MockServer::start(); let mut settlement_client = MockSettlementClient::new(); From c040981c0a73f58c44fd8231bb612adacbd5d89b Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Mon, 12 Aug 2024 13:28:40 +0530 Subject: [PATCH 24/27] fix: lint --- crates/orchestrator/src/tests/jobs/state_update_job/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs index e35db913..a3856f54 100644 --- a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs @@ -2,13 +2,13 @@ use std::collections::HashMap; use std::fs; use std::path::PathBuf; +use assert_matches::assert_matches; use bytes::Bytes; use httpmock::prelude::*; use lazy_static::lazy_static; use mockall::predicate::eq; use rstest::*; use settlement_client_interface::MockSettlementClient; -use assert_matches::assert_matches; use super::super::common::init_config; use crate::config::{config, config_force_init}; @@ -198,8 +198,6 @@ async fn test_process_job_invalid_input_gap() { assert_eq!(e.to_string(), expected_error.to_string()); } ); - - } // ==================== Utility functions =========================== From 14204528972ebfea048a4eec542514c051436371 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Tue, 13 Aug 2024 07:53:01 +0530 Subject: [PATCH 25/27] chore: imports fixed --- crates/orchestrator/src/tests/jobs/state_update_job/mod.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs index 899e283a..876ab050 100644 
--- a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs @@ -10,6 +10,7 @@ use rstest::*; use settlement_client_interface::MockSettlementClient; use color_eyre::eyre::eyre; +use utils::env_utils::get_env_var_or_panic; use super::super::common::init_config; use crate::config::{config, config_force_init}; @@ -24,7 +25,7 @@ use crate::jobs::state_update_job::utils::hex_string_to_u8_vec; use crate::jobs::state_update_job::{StateUpdateError, StateUpdateJob}; use crate::jobs::types::{JobStatus, JobType}; use crate::jobs::{Job, JobError}; -use crate::tests::common::default_job_item; +use crate::tests::common::{default_job_item, get_storage_client}; use crate::tests::config::TestConfigBuilder; use lazy_static::lazy_static; @@ -58,6 +59,10 @@ async fn test_process_job_works( #[case] processing_start_index: u8, ) { // Will be used by storage client which we call while storing the data. + + use num::ToPrimitive; + + use crate::jobs::state_update_job::utils::fetch_blob_data_for_block; dotenvy::from_filename("../.env.test").expect("Failed to load the .env file"); // Mocking the settlement client. 
From 2429af94523007b0078c3fc163d74fa7af75c6c5 Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Wed, 14 Aug 2024 17:43:02 +0530 Subject: [PATCH 26/27] feat : updated the errors enum and implemented OtherTest wrapper for eyre errors --- crates/orchestrator/src/jobs/da_job/mod.rs | 46 ++++++--- crates/orchestrator/src/jobs/mod.rs | 97 ++++++++++++++----- .../orchestrator/src/jobs/proving_job/mod.rs | 19 ++-- .../src/jobs/state_update_job/mod.rs | 39 +++++--- crates/orchestrator/src/queue/job_queue.rs | 20 ++-- .../src/tests/jobs/state_update_job/mod.rs | 4 +- 6 files changed, 163 insertions(+), 62 deletions(-) diff --git a/crates/orchestrator/src/jobs/da_job/mod.rs b/crates/orchestrator/src/jobs/da_job/mod.rs index 4d30b086..592e6ad1 100644 --- a/crates/orchestrator/src/jobs/da_job/mod.rs +++ b/crates/orchestrator/src/jobs/da_job/mod.rs @@ -14,7 +14,7 @@ use tracing::log; use uuid::Uuid; use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; -use super::{Job, JobError}; +use super::{Job, JobError, OtherError}; use crate::config::Config; use crate::constants::BLOB_DATA_FILE_NAME; @@ -38,7 +38,7 @@ lazy_static! 
{ pub static ref BLOB_LEN: usize = 4096; } -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq)] pub enum DaError { #[error("Cannot process block {block_no:?} for job id {job_id:?} as it's still in pending state.")] BlockPending { block_no: String, job_id: Uuid }, @@ -50,7 +50,7 @@ pub enum DaError { MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: String, job_id: Uuid }, #[error("Other error: {0}")] - Other(#[from] color_eyre::eyre::Error), + Other(#[from] OtherError), } pub struct DaJob; @@ -75,13 +75,18 @@ impl Job for DaJob { } async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result { - let block_no = job.internal_id.parse::().wrap_err("Failed to parse u64".to_string())?; + let block_no = job + .internal_id + .parse::() + .wrap_err("Failed to parse u64".to_string()) + .map_err(|e| JobError::Other(OtherError(e)))?; let state_update = config .starknet_client() .get_state_update(BlockId::Number(block_no)) .await - .wrap_err("Failed to get state Update.".to_string())?; + .wrap_err("Failed to get state Update.".to_string()) + .map_err(|e| JobError::Other(OtherError(e)))?; let state_update = match state_update { MaybePendingStateUpdate::PendingUpdate(_) => { @@ -90,7 +95,9 @@ impl Job for DaJob { MaybePendingStateUpdate::Update(state_update) => state_update, }; // constructing the data from the rpc - let blob_data = state_update_to_blob_data(block_no, state_update, config).await?; + let blob_data = state_update_to_blob_data(block_no, state_update, config) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; // transforming the data so that we can apply FFT on this. 
// @note: we can skip this step if in the above step we return vec directly let blob_data_biguint = convert_to_biguint(blob_data.clone()); @@ -102,8 +109,11 @@ impl Job for DaJob { // converting BigUints to Vec, one Vec represents one blob data let blob_array = data_to_blobs(max_bytes_per_blob, transformed_data)?; - let current_blob_length: u64 = - blob_array.len().try_into().wrap_err("Unable to convert the blob length into u64 format.".to_string())?; + let current_blob_length: u64 = blob_array + .len() + .try_into() + .wrap_err("Unable to convert the blob length into u64 format.".to_string()) + .map_err(|e| JobError::Other(OtherError(e)))?; // there is a limit on number of blobs per txn, checking that here if current_blob_length > max_blob_per_txn { @@ -116,13 +126,22 @@ impl Job for DaJob { } // making the txn to the DA layer - let external_id = config.da_client().publish_state_diff(blob_array, &[0; 32]).await?; + let external_id = config + .da_client() + .publish_state_diff(blob_array, &[0; 32]) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; Ok(external_id) } async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result { - Ok(config.da_client().verify_inclusion(job.external_id.unwrap_string()?).await?.into()) + Ok(config + .da_client() + .verify_inclusion(job.external_id.unwrap_string().map_err(|e| JobError::Other(OtherError(e)))?) + .await + .map_err(|e| JobError::Other(OtherError(e)))? 
+ .into()) } fn max_process_attempts(&self) -> u64 { @@ -299,11 +318,12 @@ async fn store_blob_data(blob_data: Vec, block_number: u64, config let blob = blobs_array.clone(); // converting Vec into Vec - let blob_vec_u8 = - bincode::serialize(&blob).wrap_err("Unable to Serialize blobs (Vec into Vec)".to_string())?; + let blob_vec_u8 = bincode::serialize(&blob) + .wrap_err("Unable to Serialize blobs (Vec into Vec)".to_string()) + .map_err(|e| JobError::Other(OtherError(e)))?; if !blobs_array.is_empty() { - storage_client.put_data(blob_vec_u8.into(), &key).await?; + storage_client.put_data(blob_vec_u8.into(), &key).await.map_err(|e| JobError::Other(OtherError(e)))?; } Ok(()) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 71c21a79..e4ae1ed6 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::fmt; use std::time::Duration; use crate::config::{config, Config}; @@ -6,7 +7,7 @@ use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::queue::job_queue::{add_job_to_process_queue, add_job_to_verification_queue, ConsumptionError}; use async_trait::async_trait; -use color_eyre::eyre::Context; +use color_eyre::eyre::{eyre, Context}; use da_job::DaError; use mockall::automock; use mockall_double::double; @@ -28,7 +29,7 @@ pub mod state_update_job; pub mod types; use thiserror::Error; -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq)] pub enum JobError { #[error("Job already exists for internal_id {internal_id:?} and job_type {job_type:?}. 
Skipping!")] JobAlreadyExists { internal_id: String, job_type: JobType }, @@ -55,9 +56,41 @@ pub enum JobError { ConsumptionError(#[from] ConsumptionError), #[error("Other error: {0}")] - Other(#[from] color_eyre::eyre::Error), + Other(#[from] OtherError), } +// ==================================================== +/// Wrapper Type for Other(<>) job type +#[derive(Debug)] +pub struct OtherError(color_eyre::eyre::Error); + +impl fmt::Display for OtherError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl std::error::Error for OtherError {} + +impl PartialEq for OtherError { + fn eq(&self, _other: &Self) -> bool { + false + } +} + +impl From for OtherError { + fn from(err: color_eyre::eyre::Error) -> Self { + OtherError(err) + } +} + +impl From for OtherError { + fn from(error_string: String) -> Self { + OtherError(eyre!(error_string)) + } +} +// ==================================================== + /// The Job trait is used to define the methods that a job /// should implement to be used as a job for the orchestrator. The orchestrator automatically /// handles queueing and processing of jobs as long as they implement the trait. 
@@ -96,16 +129,20 @@ pub async fn create_job( metadata: HashMap, ) -> Result<(), JobError> { let config = config().await; - let existing_job = config.database().get_job_by_internal_id_and_type(internal_id.as_str(), &job_type).await?; + let existing_job = config + .database() + .get_job_by_internal_id_and_type(internal_id.as_str(), &job_type) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; if existing_job.is_some() { return Err(JobError::JobAlreadyExists { internal_id, job_type }); } let job_handler = factory::get_job_handler(&job_type).await; let job_item = job_handler.create_job(config.as_ref(), internal_id, metadata).await?; - config.database().create_job(job_item.clone()).await?; + config.database().create_job(job_item.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?; - add_job_to_process_queue(job_item.id).await?; + add_job_to_process_queue(job_item.id).await.map_err(|e| JobError::Other(OtherError(e)))?; Ok(()) } @@ -128,7 +165,11 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> { // this updates the version of the job. 
this ensures that if another thread was about to process // the same job, it would fail to update the job in the database because the version would be // outdated - config.database().update_job_status(&job, JobStatus::LockedForProcessing).await?; + config + .database() + .update_job_status(&job, JobStatus::LockedForProcessing) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; let job_handler = factory::get_job_handler(&job.job_type).await; let external_id = job_handler.process_job(config.as_ref(), &mut job).await?; @@ -141,10 +182,11 @@ pub async fn process_job(id: Uuid) -> Result<(), JobError> { job_updated.status = JobStatus::PendingVerification; job_updated.metadata = metadata; - config.database().update_job(&job_updated).await?; + config.database().update_job(&job_updated).await.map_err(|e| JobError::Other(OtherError(e)))?; add_job_to_verification_queue(job.id, Duration::from_secs(job_handler.verification_polling_delay_seconds())) - .await?; + .await + .map_err(|e| JobError::Other(OtherError(e)))?; Ok(()) } @@ -171,26 +213,31 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { match verification_status { JobVerificationStatus::Verified => { - config.database().update_job_status(&job, JobStatus::Completed).await?; + config + .database() + .update_job_status(&job, JobStatus::Completed) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; } JobVerificationStatus::Rejected(e) => { let mut new_job = job.clone(); new_job.metadata.insert("error".to_string(), e); new_job.status = JobStatus::VerificationFailed; - config.database().update_job(&new_job).await?; + config.database().update_job(&new_job).await.map_err(|e| JobError::Other(OtherError(e)))?; log::error!("Verification failed for job with id {:?}. 
Cannot verify.", id); // retry job processing if we haven't exceeded the max limit - let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; + let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY) + .map_err(|e| JobError::Other(OtherError(e)))?; if process_attempts < job_handler.max_process_attempts() { log::info!( "Verification failed for job {}. Retrying processing attempt {}.", job.id, process_attempts + 1 ); - add_job_to_process_queue(job.id).await?; + add_job_to_process_queue(job.id).await.map_err(|e| JobError::Other(OtherError(e)))?; return Ok(()); } else { // TODO: send alert @@ -198,20 +245,26 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { } JobVerificationStatus::Pending => { log::info!("Inclusion is still pending for job {}. Pushing back to queue.", job.id); - let verify_attempts = get_u64_from_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)?; + let verify_attempts = get_u64_from_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY) + .map_err(|e| JobError::Other(OtherError(e)))?; if verify_attempts >= job_handler.max_verification_attempts() { // TODO: send alert log::info!("Verification attempts exceeded for job {}. 
Marking as timed out.", job.id); - config.database().update_job_status(&job, JobStatus::VerificationTimeout).await?; + config + .database() + .update_job_status(&job, JobStatus::VerificationTimeout) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; return Ok(()); } let metadata = increment_key_in_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)?; - config.database().update_metadata(&job, metadata).await?; + config.database().update_metadata(&job, metadata).await.map_err(|e| JobError::Other(OtherError(e)))?; add_job_to_verification_queue( job.id, Duration::from_secs(job_handler.verification_polling_delay_seconds()), ) - .await?; + .await + .map_err(|e| JobError::Other(OtherError(e)))?; } }; @@ -220,7 +273,7 @@ pub async fn verify_job(id: Uuid) -> Result<(), JobError> { /// Terminates the job and updates the status of the job in the DB. /// Logs error if the job status `Completed` is existing on DL queue. -pub async fn handle_job_failure(id: Uuid) -> Result<()> { +pub async fn handle_job_failure(id: Uuid) -> Result<(), JobError> { let config = config().await; let mut job = get_job(id).await?.clone(); @@ -240,14 +293,14 @@ pub async fn handle_job_failure(id: Uuid) -> Result<()> { job.metadata = metadata; job.status = JobStatus::Failed; - config.database().update_job(&job).await?; + config.database().update_job(&job).await.map_err(|e| JobError::Other(OtherError(e)))?; Ok(()) } -async fn get_job(id: Uuid) -> Result { +async fn get_job(id: Uuid) -> Result { let config = config().await; - let job = config.database().get_job_by_id(id).await?; + let job = config.database().get_job_by_id(id).await.map_err(|e| JobError::Other(OtherError(e)))?; match job { Some(job) => Ok(job), None => Err(JobError::JobNotFound { id }), @@ -259,7 +312,7 @@ pub fn increment_key_in_metadata( key: &str, ) -> Result, JobError> { let mut new_metadata = metadata.clone(); - let attempt = get_u64_from_metadata(metadata, key)?; + let attempt = get_u64_from_metadata(metadata, 
key).map_err(|e| JobError::Other(OtherError(e)))?; let incremented_value = attempt.checked_add(1); incremented_value.ok_or_else(|| JobError::KeyOutOfBounds { key: key.to_string() })?; new_metadata.insert(key.to_string(), incremented_value.unwrap().to_string()); diff --git a/crates/orchestrator/src/jobs/proving_job/mod.rs b/crates/orchestrator/src/jobs/proving_job/mod.rs index f886bf90..c9c38d2b 100644 --- a/crates/orchestrator/src/jobs/proving_job/mod.rs +++ b/crates/orchestrator/src/jobs/proving_job/mod.rs @@ -13,10 +13,10 @@ use uuid::Uuid; use super::constants::JOB_METADATA_CAIRO_PIE_PATH_KEY; use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; -use super::{Job, JobError}; +use super::{Job, JobError, OtherError}; use crate::config::Config; -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq)] pub enum ProvingError { #[error("Cairo PIE path is not specified - prover job #{internal_id:?}")] CairoPIEWrongPath { internal_id: String }, @@ -25,7 +25,7 @@ pub enum ProvingError { CairoPIENotReadable, #[error("Other error: {0}")] - Other(#[from] color_eyre::eyre::Error), + Other(#[from] OtherError), } pub struct ProvingJob; @@ -68,15 +68,20 @@ impl Job for ProvingJob { .prover_client() .submit_task(Task::CairoPie(cairo_pie)) .await - .wrap_err("Prover Client Error".to_string())?; + .wrap_err("Prover Client Error".to_string()) + .map_err(|e| JobError::Other(OtherError(e)))?; Ok(external_id) } async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result { - let task_id: String = job.external_id.unwrap_string()?.into(); - let task_status = - config.prover_client().get_task_status(&task_id).await.wrap_err("Prover Client Error".to_string())?; + let task_id: String = job.external_id.unwrap_string().map_err(|e| JobError::Other(OtherError(e)))?.into(); + let task_status = config + .prover_client() + .get_task_status(&task_id) + .await + .wrap_err("Prover Client Error".to_string()) + .map_err(|e| JobError::Other(OtherError(e)))?; match 
task_status { TaskStatus::Processing => Ok(JobVerificationStatus::Pending), diff --git a/crates/orchestrator/src/jobs/state_update_job/mod.rs b/crates/orchestrator/src/jobs/state_update_job/mod.rs index 78ec70e3..f9515a40 100644 --- a/crates/orchestrator/src/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/jobs/state_update_job/mod.rs @@ -16,7 +16,7 @@ use super::constants::{ JOB_METADATA_STATE_UPDATE_ATTEMPT_PREFIX, JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO, JOB_PROCESS_ATTEMPT_METADATA_KEY, }; -use super::JobError; +use super::{JobError, OtherError}; use crate::config::{config, Config}; use crate::constants::SNOS_OUTPUT_FILE_NAME; @@ -25,7 +25,7 @@ use crate::jobs::state_update_job::utils::fetch_blob_data_for_block; use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::jobs::Job; -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq)] pub enum StateUpdateError { #[error("Block numbers list should not be empty.")] EmptyBlockNumberList, @@ -64,7 +64,7 @@ pub enum StateUpdateError { UseKZGDaError { block_no: u64 }, #[error("Other error: {0}")] - Other(#[from] color_eyre::eyre::Error), + Other(#[from] OtherError), } pub struct StateUpdateJob; @@ -118,7 +118,9 @@ impl Job for StateUpdateJob { self.insert_attempts_into_metadata(job, &attempt_no, &sent_tx_hashes); - StateUpdateError::Other(eyre!("Block #{block_no} - Error occurred during the state update: {e}")) + StateUpdateError::Other(OtherError(eyre!( + "Block #{block_no} - Error occurred during the state update: {e}" + ))) })?; sent_tx_hashes.push(tx_hash); } @@ -153,7 +155,8 @@ impl Job for StateUpdateJob { let settlement_client = config.settlement_client(); for (tx_hash, block_no) in tx_hashes.iter().zip(block_numbers.iter()) { - let tx_inclusion_status = settlement_client.verify_tx_inclusion(tx_hash).await?; + let tx_inclusion_status = + settlement_client.verify_tx_inclusion(tx_hash).await.map_err(|e| JobError::Other(OtherError(e)))?; match 
tx_inclusion_status { SettlementVerificationStatus::Rejected(_) => { job.metadata.insert(JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO.into(), block_no.to_string()); @@ -161,8 +164,14 @@ impl Job for StateUpdateJob { } // If the tx is still pending, we wait for it to be finalized and check again the status. SettlementVerificationStatus::Pending => { - settlement_client.wait_for_tx_finality(tx_hash).await?; - let new_status = settlement_client.verify_tx_inclusion(tx_hash).await?; + settlement_client + .wait_for_tx_finality(tx_hash) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; + let new_status = settlement_client + .verify_tx_inclusion(tx_hash) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; match new_status { SettlementVerificationStatus::Rejected(_) => { job.metadata @@ -181,7 +190,8 @@ impl Job for StateUpdateJob { // verify that the last settled block is indeed the one we expect to be let expected_last_block_number = block_numbers.last().ok_or_else(|| StateUpdateError::EmptyBlockNumberList)?; - let out_last_block_number = settlement_client.get_last_settled_block().await?; + let out_last_block_number = + settlement_client.get_last_settled_block().await.map_err(|e| JobError::Other(OtherError(e)))?; let block_status = if out_last_block_number == *expected_last_block_number { SettlementVerificationStatus::Verified } else { @@ -224,7 +234,8 @@ impl StateUpdateJob { .split(',') .map(|block_no| block_no.parse::()) .collect::, _>>() - .map_err(|e| eyre!("Block numbers to settle list is not correctly formatted: {e}"))?; + .map_err(|e| eyre!("Block numbers to settle list is not correctly formatted: {e}")) + .map_err(|e| JobError::Other(OtherError(e)))?; Ok(block_numbers) } @@ -240,7 +251,8 @@ impl StateUpdateJob { Err(StateUpdateError::UnsortedBlockNumbers)?; } // Check for gap between the last settled block and the first block to settle - let last_settled_block: u64 = config.settlement_client().get_last_settled_block().await?; + let 
last_settled_block: u64 = + config.settlement_client().get_last_settled_block().await.map_err(|e| JobError::Other(OtherError(e)))?; if last_settled_block + 1 != block_numbers[0] { Err(StateUpdateError::GapBetweenFirstAndLastBlock)?; } @@ -258,10 +270,13 @@ impl StateUpdateJob { let last_tx_hash_executed = if snos.use_kzg_da == Felt252::ZERO { unimplemented!("update_state_for_block not implemented as of now for calldata DA.") } else if snos.use_kzg_da == Felt252::ONE { - let blob_data = fetch_blob_data_for_block(block_no).await?; + let blob_data = fetch_blob_data_for_block(block_no).await.map_err(|e| JobError::Other(OtherError(e)))?; // Sending update_state transaction from the settlement client - settlement_client.update_state_with_blobs(vec![], blob_data).await? + settlement_client + .update_state_with_blobs(vec![], blob_data) + .await + .map_err(|e| JobError::Other(OtherError(e)))? } else { Err(StateUpdateError::UseKZGDaError { block_no })? }; diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index 4c3160fb..e881d2ef 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -10,7 +10,7 @@ use tracing::log; use uuid::Uuid; use crate::config::config; -use crate::jobs::{handle_job_failure, process_job, verify_job, JobError}; +use crate::jobs::{handle_job_failure, process_job, verify_job, JobError, OtherError}; pub const JOB_PROCESSING_QUEUE: &str = "madara_orchestrator_job_processing_queue"; pub const JOB_VERIFICATION_QUEUE: &str = "madara_orchestrator_job_verification_queue"; @@ -19,7 +19,7 @@ pub const JOB_HANDLE_FAILURE_QUEUE: &str = "madara_orchestrator_job_handle_failu use thiserror::Error; -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq)] pub enum ConsumptionError { #[error("Failed to consume message from queue, error {error_msg:?}")] FailedToConsumeFromQueue { error_msg: String }, @@ -28,7 +28,7 @@ pub enum ConsumptionError { FailedToHandleJob { 
job_id: Uuid, error_msg: String }, #[error("Other error: {0}")] - Other(#[from] color_eyre::eyre::Error), + Other(#[from] OtherError), } #[derive(Debug, Serialize, Deserialize)] @@ -62,13 +62,21 @@ where return Err(ConsumptionError::FailedToConsumeFromQueue { error_msg: e.to_string() }); } }; - let job_message: Option = delivery.payload_serde_json().wrap_err("Payload Serde Error ")?; + let job_message: Option = delivery + .payload_serde_json() + .wrap_err("Payload Serde Error") + .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; match job_message { Some(job_message) => { log::info!("Handling job with id {:?} for queue {:?}", job_message.id, queue); match handler(job_message.id).await { - Ok(_) => delivery.ack().await.map_err(|(e, _)| e).wrap_err("Queue Error ")?, + Ok(_) => delivery + .ack() + .await + .map_err(|(e, _)| e) + .wrap_err("Queue Error") + .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?, Err(e) => { log::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e); @@ -107,7 +115,7 @@ macro_rules! 
spawn_consumer { }; } -pub async fn init_consumers() -> Result<()> { +pub async fn init_consumers() -> Result<(), JobError> { spawn_consumer!(JOB_PROCESSING_QUEUE.to_string(), process_job); spawn_consumer!(JOB_VERIFICATION_QUEUE.to_string(), verify_job); spawn_consumer!(JOB_HANDLE_FAILURE_QUEUE.to_string(), handle_job_failure); diff --git a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs index 876ab050..3cf8a31b 100644 --- a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs @@ -38,7 +38,6 @@ pub const X_0_FILE_NAME: &str = "x_0.txt"; // ================= Exhaustive tests (with minimum mock) ================= #[rstest] -#[should_panic(expected = "Could not find current attempt number.")] #[tokio::test] async fn test_process_job_attempt_not_present_fails() { TestConfigBuilder::new().build().await; @@ -46,7 +45,8 @@ async fn test_process_job_attempt_not_present_fails() { let mut job = default_job_item(); let config = config().await; let state_update_job = StateUpdateJob {}; - let _ = state_update_job.process_job(&config, &mut job).await; + let res = state_update_job.process_job(&config, &mut job).await.unwrap_err(); + assert_eq!(res, JobError::StateUpdateJobError(StateUpdateError::AttemptNumberNotFound)); } #[rstest] From 8fa7760ff80d1074064b5772a3d2cf634c753e08 Mon Sep 17 00:00:00 2001 From: Arun Jangra Date: Fri, 16 Aug 2024 11:42:04 +0530 Subject: [PATCH 27/27] feat : fixed tests --- crates/orchestrator/src/tests/jobs/state_update_job/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs index 359c63cc..b9b466ee 100644 --- a/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/tests/jobs/state_update_job/mod.rs @@ -252,7 +252,6 @@ async fn 
process_job_invalid_inputs_errors(#[case] block_numbers_to_settle: Stri #[rstest] #[tokio::test] -#[should_panic(expected = "Gap detected between the first block to settle and the last one settle")] async fn process_job_invalid_input_gap_panics() { let server = MockServer::start(); let mut settlement_client = MockSettlementClient::new();