Skip to content

Commit

Permalink
feat(state_update_job): Updating job
Browse files Browse the repository at this point in the history
  • Loading branch information
akhercha committed Jul 1, 2024
1 parent 9da6baa commit 04a89d5
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 28 deletions.
4 changes: 2 additions & 2 deletions crates/orchestrator/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ pub trait Database: Send + Sync {
async fn create_job(&self, job: JobItem) -> Result<JobItem>;
async fn get_job_by_id(&self, id: Uuid) -> Result<Option<JobItem>>;
async fn get_job_by_internal_id_and_type(&self, internal_id: &str, job_type: &JobType) -> Result<Option<JobItem>>;
async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()>;
async fn update_external_id_and_status_and_metadata(
async fn update_job(
&self,
job: &JobItem,
external_id: String,
new_status: JobStatus,
metadata: HashMap<String, String>,
) -> Result<()>;
async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()>;
async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_jobs_without_successor(
Expand Down
22 changes: 12 additions & 10 deletions crates/orchestrator/src/database/mongodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,28 +83,30 @@ impl Database for MongoDb {
Ok(self.get_job_collection().find_one(filter, None).await?)
}

async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()> {
async fn update_job(
&self,
job: &JobItem,
external_id: String,
new_status: JobStatus,
metadata: HashMap<String, String>,
) -> Result<()> {
let update = doc! {
"$set": {
"internal_id": &job.internal_id,
"job_type": mongodb::bson::to_bson(&job.job_type)?,
"status": mongodb::bson::to_bson(&new_status)?,
"external_id": external_id,
"metadata": mongodb::bson::to_document(&metadata)?
}
};
self.update_job_optimistically(job, update).await?;
Ok(())
}

async fn update_external_id_and_status_and_metadata(
&self,
job: &JobItem,
external_id: String,
new_status: JobStatus,
metadata: HashMap<String, String>,
) -> Result<()> {
async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()> {
let update = doc! {
"$set": {
"status": mongodb::bson::to_bson(&new_status)?,
"external_id": external_id,
"metadata": mongodb::bson::to_document(&metadata)?
}
};
self.update_job_optimistically(job, update).await?;
Expand Down
7 changes: 2 additions & 5 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,9 @@ pub async fn process_job(id: Uuid) -> Result<()> {

let job_handler = get_job_handler(&job.job_type);
let external_id = job_handler.process_job(config.as_ref(), &mut job).await?;

let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;
config
.database()
.update_external_id_and_status_and_metadata(&job, external_id, JobStatus::PendingVerification, metadata)
.await?;

config.database().update_job(&job, external_id.clone(), JobStatus::PendingVerification, metadata).await?;

add_job_to_verification_queue(job.id, Duration::from_secs(job_handler.verification_polling_delay_seconds()))
.await?;
Expand Down
13 changes: 2 additions & 11 deletions crates/orchestrator/src/jobs/state_update_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,8 @@ impl Job for StateUpdateJob {
let new_attempt_metadata_key = format!("{}{}", JOB_METADATA_STATE_UPDATE_ATTEMPT_PREFIX, attempt_no);
job.metadata.insert(new_attempt_metadata_key, sent_tx_hashes.join(","));

if sent_tx_hashes.len() == block_numbers.len() {
// Return the last block number that should be settled as external_id
// (Safe unwrap since block numbers have been validated)
Ok(block_numbers.last().unwrap().to_string())
} else {
return Err(eyre!(
"[Attempt #{}] Not enough settlement TX sent (state update job #{})",
attempt_no,
job.internal_id
));
}
// external_id returned corresponds to the last block number settled
Ok(block_numbers.last().unwrap().to_string())
}

/// Returns the status of the passed job.
Expand Down

0 comments on commit 04a89d5

Please sign in to comment.