atomic create job (#152)
* make create job atomic

* changelog

* comment

* fix prettier

* fix tests and upsert logic

* update: lint fix

---------

Co-authored-by: Heemank Verma <heemankv@gmail.com>
apoorvsadana and heemankv authored Oct 11, 2024
1 parent 3b04d66 commit 2b957be
Showing 7 changed files with 145 additions and 45 deletions.
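At a high level, this commit replaces the earlier "check for an existing job, then insert" sequence with a single upsert guarded by `$setOnInsert`, backed by a unique `(job_type, internal_id)` index so that two concurrent creators cannot both insert the same job. The sketch below is not part of the diff; it assumes a `Collection<Document>` handle for the jobs collection and condenses the pattern the new `create_job` uses:

```rust
use mongodb::bson::{doc, Bson, Document};
use mongodb::options::UpdateOptions;
use mongodb::Collection;

/// Returns `true` if the job document was inserted, `false` if a job with the
/// same (job_type, internal_id) pair already existed. Relies on the unique
/// index added by the migration in this commit.
async fn create_job_atomically(
    jobs: &Collection<Document>,
    job_doc: Document,
) -> mongodb::error::Result<bool> {
    let filter = doc! {
        "job_type": job_doc.get("job_type").cloned().unwrap_or(Bson::Null),
        "internal_id": job_doc.get("internal_id").cloned().unwrap_or(Bson::Null),
    };
    // `$setOnInsert` only writes the document when the filter matches nothing,
    // so an existing job is never overwritten.
    let update = doc! { "$setOnInsert": job_doc };
    let options = UpdateOptions::builder().upsert(true).build();
    let result = jobs.update_one(filter, update, options).await?;
    // matched_count == 0 means nothing existed before, i.e. we inserted it.
    Ok(result.matched_count == 0)
}
```

A caller would treat `true` as a successful creation and `false` as a duplicate, which is how the actual implementation in this diff maps `matched_count` onto `JobError::JobAlreadyExists`.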
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Changed

- made create_job atomic to avoid race conditions
- handle jobs in tokio tasks
- handle workers in tokio tasks
- cleaned .env.example and .env.test files
3 changes: 2 additions & 1 deletion crates/orchestrator/src/database/mod.rs
@@ -6,6 +6,7 @@ use utils::settings::Settings;
use uuid::Uuid;

use crate::jobs::types::{JobItem, JobStatus, JobType};
use crate::jobs::JobError;

/// MongoDB
pub mod mongodb;
@@ -24,7 +25,7 @@ pub mod mongodb;
#[automock]
#[async_trait]
pub trait Database: Send + Sync {
async fn create_job(&self, job: JobItem) -> Result<JobItem>;
async fn create_job(&self, job: JobItem) -> Result<JobItem, JobError>;
async fn get_job_by_id(&self, id: Uuid) -> Result<Option<JobItem>>;
async fn get_job_by_internal_id_and_type(&self, internal_id: &str, job_type: &JobType) -> Result<Option<JobItem>>;
async fn update_job(&self, current_job: &JobItem, updates: crate::jobs::types::JobItemUpdates) -> Result<()>;
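With `create_job` now returning `Result<JobItem, JobError>`, callers can branch on the duplicate case explicitly instead of unwrapping a generic `eyre` error. A hypothetical caller fragment (illustration only, not part of the diff; `db` stands for any `Database` implementation and `job` for a `JobItem`):

```rust
match db.create_job(job).await {
    Ok(created) => tracing::info!(job_id = %created.id, "job created"),
    Err(JobError::JobAlreadyExists { internal_id, job_type }) => {
        // Duplicate creation is now a typed, recoverable error.
        tracing::warn!(%internal_id, ?job_type, "job already exists, skipping");
    }
    Err(other) => return Err(other),
}
```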
95 changes: 54 additions & 41 deletions crates/orchestrator/src/database/mongodb/mod.rs
@@ -1,3 +1,4 @@
use ::utils::settings::Settings;
use async_std::stream::StreamExt;
use async_trait::async_trait;
use chrono::{SubsecRound, Utc};
@@ -7,14 +8,16 @@ use futures::TryStreamExt;
use mongodb::bson::{doc, Bson, Document};
use mongodb::options::{ClientOptions, FindOneOptions, FindOptions, ServerApi, ServerApiVersion, UpdateOptions};
use mongodb::{bson, Client, Collection};
use utils::settings::Settings;
use utils::ToDocument;
use uuid::Uuid;

use crate::database::mongodb::config::MongoDbConfig;
use crate::database::{Database, DatabaseConfig};
use crate::jobs::types::{JobItem, JobItemUpdates, JobStatus, JobType};
use crate::jobs::JobError;

pub mod config;
mod utils;

pub struct MongoDb {
client: Client,
@@ -39,40 +42,6 @@ impl MongoDb {
Self { client, database_name: mongo_db_settings.database_name }
}

pub fn to_document(&self, current_job: &JobItem, updates: &JobItemUpdates) -> Result<Document> {
let mut doc = Document::new();

// Serialize the struct to BSON
let bson = bson::to_bson(updates)?;

match bson {
// If serialization was successful and it's a document
Bson::Document(bson_doc) => {
let mut is_update_available: bool = false;
// Add non-null fields to our document
for (key, value) in bson_doc.iter() {
if !matches!(value, Bson::Null) {
is_update_available = true;
doc.insert(key, value.clone());
}
}

// checks if is_update_available is still false.
// if it is still false that means there's no field to be updated
// and the call is likely a false call, so raise an error.
if !is_update_available {
return Err(eyre!("No field to be updated, likely a false call"));
}

// Add additional fields that are always updated
doc.insert("version", Bson::Int32(current_job.version + 1));
doc.insert("updated_at", Bson::DateTime(Utc::now().round_subsecs(0).into()));
}
_ => return Err(eyre!("Bson object is not a document.")),
}
Ok(doc)
}

/// Mongodb client uses Arc internally, reducing the cost of clone.
/// Directly using clone is not recommended for libraries not using Arc internally.
pub fn client(&self) -> Client {
@@ -87,10 +56,37 @@ impl MongoDb {
#[async_trait]
impl Database for MongoDb {
#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn create_job(&self, job: JobItem) -> Result<JobItem> {
self.get_job_collection().insert_one(&job, None).await?;
tracing::debug!(job_id = %job.id, category = "db_call", "Job created successfully");
Ok(job)
async fn create_job(&self, job: JobItem) -> Result<JobItem, JobError> {
let options = UpdateOptions::builder().upsert(true).build();

let updates = job.to_document().map_err(|e| JobError::Other(e.into()))?;
let job_type =
updates.get("job_type").ok_or(eyre!("Job type not found")).map_err(|e| JobError::Other(e.into()))?;
let internal_id =
updates.get("internal_id").ok_or(eyre!("Internal ID not found")).map_err(|e| JobError::Other(e.into()))?;

// Filter using only two fields
let filter = doc! {
"job_type": job_type.clone(),
"internal_id": internal_id.clone()
};

let updates = doc! {
// only set when the document is inserted for the first time
"$setOnInsert": updates
};

let result = self
.get_job_collection()
.update_one(filter, updates, options)
.await
.map_err(|e| JobError::Other(e.to_string().into()))?;

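// `matched_count == 0` means the filter found no existing (job_type, internal_id)
// pair, so the upsert inserted this document and creation succeeded. A non-zero
// match means a job with the same key already exists; `$setOnInsert` left it
// untouched, so report a duplicate instead of overwriting it.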
if result.matched_count == 0 {
Ok(job)
} else {
Err(JobError::JobAlreadyExists { internal_id: job.internal_id, job_type: job.job_type })
}
}

#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
@@ -121,10 +117,27 @@ impl Database for MongoDb {
};
let options = UpdateOptions::builder().upsert(false).build();

let values = self.to_document(current_job, &updates)?;
let mut updates = updates.to_document()?;

// remove null values from the updates
let mut non_null_updates = Document::new();
updates.iter_mut().for_each(|(k, v)| {
if v != &Bson::Null {
non_null_updates.insert(k, v);
}
});

// throw an error if there's no field to be updated
if non_null_updates.is_empty() {
return Err(eyre!("No field to be updated, likely a false call"));
}

// Add additional fields that are always updated
non_null_updates.insert("version", Bson::Int32(current_job.version + 1));
non_null_updates.insert("updated_at", Bson::DateTime(Utc::now().round_subsecs(0).into()));

let update = doc! {
"$set": values
"$set": non_null_updates
};

let result = self.get_job_collection().update_one(filter, update, options).await?;
54 changes: 54 additions & 0 deletions crates/orchestrator/src/database/mongodb/utils.rs
@@ -0,0 +1,54 @@
use color_eyre::eyre::{eyre, Result};
use mongodb::bson::{Bson, Document};
use serde::Serialize;

pub trait ToDocument {
fn to_document(&self) -> Result<Document>;
}

impl<T: Serialize> ToDocument for T {
fn to_document(&self) -> Result<Document> {
match mongodb::bson::to_bson(self)? {
Bson::Document(document) => Ok(document),
_ => Err(eyre!("Failed to convert to Document")),
}
}
}

#[cfg(test)]
mod tests {
use mongodb::bson::{Bson, Document};
use serde::{Deserialize, Serialize};

use super::*;

#[derive(Serialize, Deserialize)]
struct TestStruct {
id: i32,
name: String,
optional_field: Option<String>,
}

#[test]
fn test_to_document() {
let test_struct = TestStruct { id: 1, name: "Test".to_string(), optional_field: None };

let document = test_struct.to_document().expect("Failed to convert to Document");

let mut expected_document = Document::new();
expected_document.insert("id", 1);
expected_document.insert("name", "Test");
expected_document.insert("optional_field", Bson::Null);

assert_eq!(document, expected_document);
}

#[test]
fn test_to_document_fail() {
let non_document_value = 1;

let result = non_document_value.to_document();

assert!(result.is_err() && result.unwrap_err().to_string().contains("Failed to convert to Document"));
}
}
3 changes: 2 additions & 1 deletion crates/orchestrator/src/jobs/mod.rs
@@ -159,13 +159,14 @@ pub async fn create_job(
.await
.map_err(|e| JobError::Other(OtherError(e)))?;

// This check is technically redundant; the database's `create_job` performs the same duplicate check atomically.
if existing_job.is_some() {
return Err(JobError::JobAlreadyExists { internal_id, job_type });
}

let job_handler = factory::get_job_handler(&job_type).await;
let job_item = job_handler.create_job(config.clone(), internal_id.clone(), metadata).await?;
config.database().create_job(job_item.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?;
config.database().create_job(job_item.clone()).await?;

add_job_to_process_queue(job_item.id, config.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?;

32 changes: 31 additions & 1 deletion crates/orchestrator/src/tests/database/mod.rs
@@ -4,8 +4,8 @@ use chrono::{SubsecRound, Utc};
use rstest::*;
use uuid::Uuid;

use crate::jobs::increment_key_in_metadata;
use crate::jobs::types::{ExternalId, JobItem, JobItemUpdates, JobStatus, JobType};
use crate::jobs::{increment_key_in_metadata, JobError};
use crate::tests::config::{ConfigType, TestConfigBuilder};

#[rstest]
@@ -46,6 +46,36 @@ async fn database_create_job_works() {
assert_eq!(get_job_3, job_vec[2].clone());
}

/// Tests for `create_job` operation in database trait.
/// Creates a job with the same job type and internal id as an existing job.
/// Should fail.
#[rstest]
#[tokio::test]
async fn database_create_job_with_job_exists_fails() {
let services: crate::tests::config::TestConfigBuilderReturns =
TestConfigBuilder::new().configure_database(ConfigType::Actual).build().await;
let config = services.config;
let database_client = config.database();

let job_one = build_job_item(JobType::ProofCreation, JobStatus::Created, 1);

// same job type and internal id
let job_two = build_job_item(JobType::ProofCreation, JobStatus::LockedForProcessing, 1);

database_client.create_job(job_one).await.unwrap();

let result = database_client.create_job(job_two).await;

assert_eq!(
result.unwrap_err(),
JobError::JobAlreadyExists { internal_id: "1".to_string(), job_type: JobType::ProofCreation }
);
// fetch job to see status wasn't updated
let fetched_job =
database_client.get_job_by_internal_id_and_type("1", &JobType::ProofCreation).await.unwrap().unwrap();
assert_eq!(fetched_job.status, JobStatus::Created);
}

/// Test for `get_jobs_without_successor` operation in database trait.
/// Creates jobs in the following sequence :
///
2 changes: 1 addition & 1 deletion migrations/00000000000000-init.js
@@ -5,7 +5,7 @@ module.exports = {
.collection("jobs")
.createIndexes([
{ key: { id: 1 } },
{ key: { job_type: 1, internal_id: -1 } },
{ key: { job_type: 1, internal_id: -1 }, unique: true },
{ key: { job_type: 1, status: 1, internal_id: -1 } },
{ key: { status: 1 } },
]);
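The unique compound index is what makes the upsert-based duplicate detection safe under concurrency: without it, two racing upserts on the same filter could both insert. For reference, the same index could be created from the Rust driver roughly like this (a sketch, not part of the migration; the database name is an assumption):

```rust
use mongodb::bson::doc;
use mongodb::options::IndexOptions;
use mongodb::{Client, IndexModel};

async fn ensure_jobs_unique_index(client: &Client) -> mongodb::error::Result<()> {
    let jobs = client
        .database("orchestrator") // hypothetical database name
        .collection::<mongodb::bson::Document>("jobs");
    let index = IndexModel::builder()
        .keys(doc! { "job_type": 1, "internal_id": -1 })
        .options(IndexOptions::builder().unique(true).build())
        .build();
    jobs.create_index(index, None).await?;
    Ok(())
}
```

Note that building a unique index fails if the collection already contains duplicate `(job_type, internal_id)` pairs, so any pre-existing duplicates would need to be cleaned up before the migration can apply.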
