From 04a89d5968dfba4c9ce8a11736d1b747a5f22b94 Mon Sep 17 00:00:00 2001 From: akhercha Date: Mon, 1 Jul 2024 16:36:54 +0200 Subject: [PATCH] feat(state_update_job): Updating job --- crates/orchestrator/src/database/mod.rs | 4 ++-- .../orchestrator/src/database/mongodb/mod.rs | 22 ++++++++++--------- crates/orchestrator/src/jobs/mod.rs | 7 ++---- .../src/jobs/state_update_job/mod.rs | 13 ++--------- 4 files changed, 18 insertions(+), 28 deletions(-) diff --git a/crates/orchestrator/src/database/mod.rs b/crates/orchestrator/src/database/mod.rs index c5480b91..f59a31e9 100644 --- a/crates/orchestrator/src/database/mod.rs +++ b/crates/orchestrator/src/database/mod.rs @@ -26,14 +26,14 @@ pub trait Database: Send + Sync { async fn create_job(&self, job: JobItem) -> Result; async fn get_job_by_id(&self, id: Uuid) -> Result>; async fn get_job_by_internal_id_and_type(&self, internal_id: &str, job_type: &JobType) -> Result>; - async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()>; - async fn update_external_id_and_status_and_metadata( + async fn update_job( &self, job: &JobItem, external_id: String, new_status: JobStatus, metadata: HashMap, ) -> Result<()>; + async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()>; async fn update_metadata(&self, job: &JobItem, metadata: HashMap) -> Result<()>; async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result>; async fn get_jobs_without_successor( diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index f0ea886a..99fafca4 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -83,28 +83,30 @@ impl Database for MongoDb { Ok(self.get_job_collection().find_one(filter, None).await?) } - async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()> { + async fn update_job( + &self, + job: &JobItem, + external_id: String, + new_status: JobStatus, + metadata: HashMap, + ) -> Result<()> { let update = doc! { "$set": { + "internal_id": &job.internal_id, + "job_type": mongodb::bson::to_bson(&job.job_type)?, "status": mongodb::bson::to_bson(&new_status)?, + "external_id": external_id, + "metadata": mongodb::bson::to_document(&metadata)? } }; self.update_job_optimistically(job, update).await?; Ok(()) } - async fn update_external_id_and_status_and_metadata( - &self, - job: &JobItem, - external_id: String, - new_status: JobStatus, - metadata: HashMap, - ) -> Result<()> { + async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()> { let update = doc! { "$set": { "status": mongodb::bson::to_bson(&new_status)?, - "external_id": external_id, - "metadata": mongodb::bson::to_document(&metadata)? } }; self.update_job_optimistically(job, update).await?; diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index c11d06b7..cd60c185 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -95,12 +95,9 @@ pub async fn process_job(id: Uuid) -> Result<()> { let job_handler = get_job_handler(&job.job_type); let external_id = job_handler.process_job(config.as_ref(), &mut job).await?; - let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?; - config - .database() - .update_external_id_and_status_and_metadata(&job, external_id, JobStatus::PendingVerification, metadata) - .await?; + + config.database().update_job(&job, external_id.clone(), JobStatus::PendingVerification, metadata).await?; add_job_to_verification_queue(job.id, Duration::from_secs(job_handler.verification_polling_delay_seconds())) .await?; diff --git a/crates/orchestrator/src/jobs/state_update_job/mod.rs b/crates/orchestrator/src/jobs/state_update_job/mod.rs index 9062d5e1..8aecb0d2 100644 --- a/crates/orchestrator/src/jobs/state_update_job/mod.rs +++ b/crates/orchestrator/src/jobs/state_update_job/mod.rs @@ -79,17 +79,8 @@ impl Job for StateUpdateJob { let new_attempt_metadata_key = format!("{}{}", JOB_METADATA_STATE_UPDATE_ATTEMPT_PREFIX, attempt_no); job.metadata.insert(new_attempt_metadata_key, sent_tx_hashes.join(",")); - if sent_tx_hashes.len() == block_numbers.len() { - // Return the last block number that should be settled as external_id - // (Safe unwrap since block numbers have been validated) - Ok(block_numbers.last().unwrap().to_string()) - } else { - return Err(eyre!( - "[Attempt #{}] Not enough settlement TX sent (state update job #{})", - attempt_no, - job.internal_id - )); - } + // external_id returned corresponds to the last block number settled + Ok(block_numbers.last().unwrap().to_string()) } /// Returns the status of the passed job.