Commit 78a29f8

Merge branch 'main' into env_refactor

heemankv authored Oct 9, 2024
2 parents 2f0a505 + 63e4fa1 commit 78a29f8
Showing 7 changed files with 178 additions and 126 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -58,6 +58,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Fixed

- Simplified update_job for Database.
- Simplified otel setup.
- Added new_with_settings to SharpClient.
- Calculate root hash logic and added a simple test for it.
6 changes: 1 addition & 5 deletions crates/orchestrator/src/database/mod.rs
@@ -1,5 +1,3 @@
use std::collections::HashMap;

use ::mongodb::bson::doc;
use async_trait::async_trait;
use color_eyre::Result;
@@ -29,9 +27,7 @@ pub trait Database: Send + Sync {
async fn create_job(&self, job: JobItem) -> Result<JobItem>;
async fn get_job_by_id(&self, id: Uuid) -> Result<Option<JobItem>>;
async fn get_job_by_internal_id_and_type(&self, internal_id: &str, job_type: &JobType) -> Result<Option<JobItem>>;
async fn update_job(&self, job: &JobItem) -> Result<()>;
async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()>;
async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
async fn update_job(&self, current_job: &JobItem, updates: crate::jobs::types::JobItemUpdates) -> Result<()>;
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_jobs_without_successor(
&self,
116 changes: 49 additions & 67 deletions crates/orchestrator/src/database/mongodb/mod.rs
@@ -1,5 +1,3 @@
use std::collections::HashMap;

use async_std::stream::StreamExt;
use async_trait::async_trait;
use chrono::{SubsecRound, Utc};
@@ -14,7 +12,7 @@ use uuid::Uuid;

use crate::database::mongodb::config::MongoDbConfig;
use crate::database::{Database, DatabaseConfig};
use crate::jobs::types::{JobItem, JobStatus, JobType};
use crate::jobs::types::{JobItem, JobItemUpdates, JobStatus, JobType};

pub mod config;

@@ -42,6 +40,40 @@ impl MongoDb {
Self { client }
}

pub fn to_document(&self, current_job: &JobItem, updates: &JobItemUpdates) -> Result<Document> {
let mut doc = Document::new();

// Serialize the struct to BSON
let bson = bson::to_bson(updates)?;

match bson {
// If serialization was successful and it's a document
Bson::Document(bson_doc) => {
let mut is_update_available: bool = false;
// Add non-null fields to our document
for (key, value) in bson_doc.iter() {
if !matches!(value, Bson::Null) {
is_update_available = true;
doc.insert(key, value.clone());
}
}

// If no non-null field was found, there is nothing to update and
// the call is most likely a mistake, so raise an error.
if !is_update_available {
return Err(eyre!("No field to be updated, likely a false call"));
}

// Add additional fields that are always updated
doc.insert("version", Bson::Int32(current_job.version + 1));
doc.insert("updated_at", Bson::DateTime(Utc::now().round_subsecs(0).into()));
}
_ => return Err(eyre!("Bson object is not a document.")),
}
Ok(doc)
}

/// Mongodb client uses Arc internally, reducing the cost of clone.
/// Directly using clone is not recommended for libraries not using Arc internally.
pub fn client(&self) -> Client {
@@ -51,47 +83,6 @@
fn get_job_collection(&self) -> Collection<JobItem> {
self.client.database("orchestrator").collection("jobs")
}

/// Updates the job in the database optimistically. This means that the job is updated only if
/// the version of the job in the database is the same as the version of the job passed in.
/// If the version is different, the update fails.
#[tracing::instrument(skip(self, update), fields(function_type = "db_call"))]
async fn update_job_optimistically(&self, current_job: &JobItem, update: Document) -> Result<()> {
let filter = doc! {
"id": current_job.id,
"version": current_job.version,
};
let options = UpdateOptions::builder().upsert(false).build();
let result = self.get_job_collection().update_one(filter, update, options).await?;
if result.modified_count == 0 {
return Err(eyre!("Failed to update job. Job version is likely outdated"));
}
self.post_job_update(current_job).await?;

Ok(())
}

// TODO : remove this function
// Do this process in single db transaction.
/// To update the document version
#[tracing::instrument(skip(self), fields(function_type = "db_call"))]
async fn post_job_update(&self, current_job: &JobItem) -> Result<()> {
let filter = doc! {
"id": current_job.id,
};
let combined_update = doc! {
"$inc": { "version": 1 },
"$set" : {
"updated_at": Utc::now().round_subsecs(0)
}
};
let options = UpdateOptions::builder().upsert(false).build();
let result = self.get_job_collection().update_one(filter, combined_update, options).await?;
if result.modified_count == 0 {
return Err(eyre!("Failed to update job. version"));
}
Ok(())
}
}

#[async_trait]
@@ -120,34 +111,25 @@ impl Database for MongoDb {
}

#[tracing::instrument(skip(self), fields(function_type = "db_call"))]
async fn update_job(&self, job: &JobItem) -> Result<()> {
let job_doc = bson::to_document(job)?;
let update = doc! {
"$set": job_doc
async fn update_job(&self, current_job: &JobItem, updates: JobItemUpdates) -> Result<()> {
// Filters to search for the job
let filter = doc! {
"id": current_job.id,
"version": current_job.version,
};
self.update_job_optimistically(job, update).await?;
Ok(())
}
let options = UpdateOptions::builder().upsert(false).build();

#[tracing::instrument(skip(self), fields(function_type = "db_call"))]
async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()> {
let update = doc! {
"$set": {
"status": mongodb::bson::to_bson(&new_status)?,
}
};
self.update_job_optimistically(job, update).await?;
Ok(())
}
let values = self.to_document(current_job, &updates)?;

#[tracing::instrument(skip(self), fields(function_type = "db_call"))]
async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()> {
let update = doc! {
"$set": {
"metadata": mongodb::bson::to_document(&metadata)?
}
"$set": values
};
self.update_job_optimistically(job, update).await?;

let result = self.get_job_collection().update_one(filter, update, options).await?;
if result.modified_count == 0 {
return Err(eyre!("Failed to update job. Job version is likely outdated"));
}

Ok(())
}

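For context, here is a minimal, self-contained sketch of the skip-null conversion that to_document performs above. ExampleUpdates and to_set_document are illustrative stand-ins rather than the crate's JobItemUpdates and to_document, and only the bson (2.x) and serde crates are assumed:

// Stand-in types, not the orchestrator crate's own.
use bson::{Bson, Document};
use serde::Serialize;
use std::collections::HashMap;

#[derive(Serialize)]
struct ExampleUpdates {
    status: Option<String>,
    metadata: Option<HashMap<String, String>>,
}

fn to_set_document(updates: &ExampleUpdates) -> Result<Document, Box<dyn std::error::Error>> {
    let mut doc = Document::new();
    match bson::to_bson(updates)? {
        Bson::Document(fields) => {
            // Unset (None) fields serialize to Bson::Null and are skipped,
            // so they never overwrite existing values in the stored job.
            for (key, value) in fields.iter() {
                if !matches!(value, Bson::Null) {
                    doc.insert(key, value.clone());
                }
            }
        }
        _ => return Err("updates did not serialize to a BSON document".into()),
    }
    Ok(doc)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let updates = ExampleUpdates { status: Some("Completed".to_string()), metadata: None };
    // Prints a document containing only `status`; `metadata` is dropped.
    println!("{}", to_set_document(&updates)?);
    Ok(())
}

The real implementation additionally inserts version and updated_at on every update and returns an error when all fields are None.
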
60 changes: 43 additions & 17 deletions crates/orchestrator/src/jobs/mod.rs
@@ -14,6 +14,7 @@ use snos_job::error::FactError;
use snos_job::SnosError;
use state_update_job::StateUpdateError;
use tracing::log;
use types::JobItemUpdates;
use uuid::Uuid;

use crate::config::Config;
@@ -194,22 +195,30 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
// outdated
config
.database()
.update_job_status(&job, JobStatus::LockedForProcessing)
.update_job(&job, JobItemUpdates::new().update_status(JobStatus::LockedForProcessing).build())
.await
.map_err(|e| JobError::Other(OtherError(e)))?;

let job_handler = factory::get_job_handler(&job.job_type).await;
let external_id = job_handler.process_job(config.clone(), &mut job).await?;
let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;

// Fetching the job again because update status above will update the job version
let mut job_updated = get_job(id, config.clone()).await?;

job_updated.external_id = external_id.into();
job_updated.status = JobStatus::PendingVerification;
job_updated.metadata = metadata;
let mut job_cloned = job.clone();
job_cloned.version += 1;

config.database().update_job(&job_updated).await.map_err(|e| JobError::Other(OtherError(e)))?;
// The status update above bumped the stored job version, so use job_cloned with the
// incremented version instead of fetching the job again
config
.database()
.update_job(
&job_cloned,
JobItemUpdates::new()
.update_status(JobStatus::PendingVerification)
.update_metadata(metadata)
.update_external_id(external_id.into())
.build(),
)
.await
.map_err(|e| JobError::Other(OtherError(e)))?;

add_job_to_verification_queue(
job.id,
@@ -261,7 +270,7 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
JobVerificationStatus::Verified => {
config
.database()
.update_job_status(&job, JobStatus::Completed)
.update_job(&job, JobItemUpdates::new().update_status(JobStatus::Completed).build())
.await
.map_err(|e| JobError::Other(OtherError(e)))?;
}
@@ -270,7 +279,17 @@
new_job.metadata.insert("error".to_string(), e);
new_job.status = JobStatus::VerificationFailed;

config.database().update_job(&new_job).await.map_err(|e| JobError::Other(OtherError(e)))?;
config
.database()
.update_job(
&job,
JobItemUpdates::new()
.update_status(JobStatus::VerificationFailed)
.update_metadata(new_job.metadata)
.build(),
)
.await
.map_err(|e| JobError::Other(OtherError(e)))?;

log::error!("Verification failed for job with id {:?}. Cannot verify.", id);

@@ -295,13 +314,19 @@
log::info!("Verification attempts exceeded for job {}. Marking as timed out.", job.id);
config
.database()
.update_job_status(&job, JobStatus::VerificationTimeout)
.update_job(&job, JobItemUpdates::new().update_status(JobStatus::VerificationTimeout).build())
.await
.map_err(|e| JobError::Other(OtherError(e)))?;
return Ok(());
}
let metadata = increment_key_in_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)?;
config.database().update_metadata(&job, metadata).await.map_err(|e| JobError::Other(OtherError(e)))?;

config
.database()
.update_job(&job, JobItemUpdates::new().update_metadata(metadata).build())
.await
.map_err(|e| JobError::Other(OtherError(e)))?;

add_job_to_verification_queue(
job.id,
Duration::from_secs(job_handler.verification_polling_delay_seconds()),
@@ -327,7 +352,7 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
/// Logs an error if a job with status `Completed` is found on the DL queue.
#[tracing::instrument(skip(config), fields(job_status, job_type))]
pub async fn handle_job_failure(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let mut job = get_job(id, config.clone()).await?.clone();
let job = get_job(id, config.clone()).await?.clone();
let mut metadata = job.metadata.clone();

tracing::Span::current().record("job_status", format!("{:?}", job.status));
@@ -344,10 +369,11 @@ pub async fn handle_job_failure(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
}

metadata.insert("last_job_status".to_string(), job.status.to_string());
job.metadata = metadata;
job.status = JobStatus::Failed;

config.database().update_job(&job).await.map_err(|e| JobError::Other(OtherError(e)))?;
config
.database()
.update_job(&job, JobItemUpdates::new().update_status(JobStatus::Failed).update_metadata(metadata).build())
.await
.map_err(|e| JobError::Other(OtherError(e)))?;

Ok(())
}
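The version bump in process_job is the subtle part of these changes. Below is a small, self-contained sketch using an in-memory stand-in (not the real MongoDB-backed store) of why the code bumps job_cloned.version between its two update_job calls: every successful update increments the stored version, and an update carrying a stale version is rejected.

#[derive(Clone, Debug)]
struct Job {
    version: i32,
    status: String,
}

struct Store {
    job: Job,
}

impl Store {
    // Optimistic update: only succeeds if the caller's version matches the stored one.
    fn update(&mut self, current: &Job, new_status: &str) -> Result<(), String> {
        if current.version != self.job.version {
            return Err("Failed to update job. Job version is likely outdated".to_string());
        }
        self.job.status = new_status.to_string();
        self.job.version += 1;
        Ok(())
    }
}

fn main() -> Result<(), String> {
    let mut store = Store { job: Job { version: 0, status: "Created".to_string() } };
    let job = store.job.clone();

    store.update(&job, "LockedForProcessing")?; // stored version is now 1

    // Reusing `job` (still version 0) here would fail the version check, so bump a
    // local clone first, mirroring `job_cloned.version += 1` in process_job.
    let mut job_cloned = job.clone();
    job_cloned.version += 1;
    store.update(&job_cloned, "PendingVerification")?;

    println!("{:?}", store.job); // Job { version: 2, status: "PendingVerification" }
    Ok(())
}
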
52 changes: 52 additions & 0 deletions crates/orchestrator/src/jobs/types.rs
@@ -138,6 +138,58 @@ pub struct JobItem {
pub updated_at: DateTime<Utc>,
}

/// Holds the changes to be applied to a job object; `id` and `created_at` are not allowed to change.
/// `version` and `updated_at` are always updated when these changes are applied to the job.
#[derive(Serialize, Debug)]
pub struct JobItemUpdates {
pub internal_id: Option<String>,
pub job_type: Option<JobType>,
pub status: Option<JobStatus>,
pub external_id: Option<ExternalId>,
pub metadata: Option<HashMap<String, String>>,
}

/// Only the individual setters that are currently needed are implemented.
impl Default for JobItemUpdates {
fn default() -> Self {
Self::new()
}
}

impl JobItemUpdates {
pub fn new() -> Self {
JobItemUpdates { internal_id: None, job_type: None, status: None, external_id: None, metadata: None }
}

pub fn update_internal_id(mut self, internal_id: String) -> JobItemUpdates {
self.internal_id = Some(internal_id);
self
}

pub fn update_job_type(mut self, job_type: JobType) -> JobItemUpdates {
self.job_type = Some(job_type);
self
}

pub fn update_status(mut self, status: JobStatus) -> JobItemUpdates {
self.status = Some(status);
self
}
pub fn update_external_id(mut self, external_id: ExternalId) -> JobItemUpdates {
self.external_id = Some(external_id);
self
}
pub fn update_metadata(mut self, metadata: HashMap<String, String>) -> JobItemUpdates {
self.metadata = Some(metadata);
self
}
// Creating a separate JobItemUpdatesBuilder type would be overkill.
pub fn build(self) -> JobItemUpdates {
self
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobVerificationStatus {
#[allow(dead_code)]
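A stand-alone sketch of the chained-setter style JobItemUpdates uses at the call sites in jobs/mod.rs. The Updates struct below is a simplified stand-in (the real struct also carries internal_id, job_type, and external_id); fields left unset stay None so that to_document can skip them:

use std::collections::HashMap;

#[derive(Debug, Default)]
struct Updates {
    status: Option<String>,
    metadata: Option<HashMap<String, String>>,
}

impl Updates {
    fn new() -> Self {
        Self::default()
    }

    fn update_status(mut self, status: String) -> Self {
        self.status = Some(status);
        self
    }

    fn update_metadata(mut self, metadata: HashMap<String, String>) -> Self {
        self.metadata = Some(metadata);
        self
    }

    // Kept for parity with JobItemUpdates: `build` is a no-op that makes the
    // chain read like a conventional builder.
    fn build(self) -> Self {
        self
    }
}

fn main() {
    let mut metadata = HashMap::new();
    metadata.insert("last_job_status".to_string(), "PendingVerification".to_string());

    let updates = Updates::new()
        .update_status("Failed".to_string())
        .update_metadata(metadata)
        .build();

    // Fields left unset remain None and are later skipped by to_document.
    println!("{:?}", updates);
}
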