move failed jobs to the failed state #159

Merged: 5 commits, Oct 15, 2024

2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Changed

- update_job returns the updated job item
- made create_job atomic to avoid race conditions
- handle jobs in tokio tasks
- handle workers in tokio tasks
@@ -63,6 +64,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Fixed

- all failed jobs should move to failed state
- Fixes all unwraps() in code to improve error logging
- Simplified Update_Job for Database.
- Simplified otel setup.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion crates/orchestrator/Cargo.toml
@@ -86,7 +86,6 @@ tracing-core = { workspace = true, default-features = false }
tracing-opentelemetry = "0.26.0"
tracing-subscriber = { workspace = true, features = ["env-filter"] }


[features]
default = ["ethereum", "with_mongodb", "with_sqs"]
ethereum = ["ethereum-da-client"]
2 changes: 1 addition & 1 deletion crates/orchestrator/src/database/mod.rs
@@ -28,7 +28,7 @@ pub trait Database: Send + Sync {
async fn create_job(&self, job: JobItem) -> Result<JobItem, JobError>;
async fn get_job_by_id(&self, id: Uuid) -> Result<Option<JobItem>>;
async fn get_job_by_internal_id_and_type(&self, internal_id: &str, job_type: &JobType) -> Result<Option<JobItem>>;
async fn update_job(&self, current_job: &JobItem, updates: crate::jobs::types::JobItemUpdates) -> Result<()>;
async fn update_job(&self, current_job: &JobItem, updates: crate::jobs::types::JobItemUpdates) -> Result<JobItem>;
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_jobs_without_successor(
&self,
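
Since `update_job` now hands back the updated `JobItem`, callers can keep working with the post-update document (bumped version, new status) instead of re-fetching it or patching a clone by hand. A minimal caller-side sketch; the helper is illustrative only and assumes the crate's `Database`, `JobItem`, `JobItemUpdates`, and `JobStatus` are in scope:

```rust
use color_eyre::Result;

// Illustrative helper, not part of this PR: lock a job and get back the
// post-update JobItem (bumped version, new status) in a single call.
async fn lock_for_processing(db: &dyn Database, job: &JobItem) -> Result<JobItem> {
    db.update_job(job, JobItemUpdates::new().update_status(JobStatus::LockedForProcessing).build())
        .await
}
```
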
26 changes: 16 additions & 10 deletions crates/orchestrator/src/database/mongodb/mod.rs
@@ -6,7 +6,10 @@ use color_eyre::eyre::eyre;
use color_eyre::Result;
use futures::TryStreamExt;
use mongodb::bson::{doc, Bson, Document};
use mongodb::options::{ClientOptions, FindOneOptions, FindOptions, ServerApi, ServerApiVersion, UpdateOptions};
use mongodb::options::{
ClientOptions, FindOneAndUpdateOptions, FindOneOptions, FindOptions, ReturnDocument, ServerApi, ServerApiVersion,
UpdateOptions,
};
use mongodb::{bson, Client, Collection};
use utils::ToDocument;
use uuid::Uuid;
@@ -109,13 +112,13 @@ impl Database for MongoDb {
}

#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn update_job(&self, current_job: &JobItem, updates: JobItemUpdates) -> Result<()> {
async fn update_job(&self, current_job: &JobItem, updates: JobItemUpdates) -> Result<JobItem> {
// Filters to search for the job
let filter = doc! {
"id": current_job.id,
"version": current_job.version,
};
let options = UpdateOptions::builder().upsert(false).build();
let options = FindOneAndUpdateOptions::builder().upsert(false).return_document(ReturnDocument::After).build();

let mut updates = updates.to_document()?;

@@ -140,14 +143,17 @@ impl Database for MongoDb {
"$set": non_null_updates
};

let result = self.get_job_collection().update_one(filter, update, options).await?;
if result.modified_count == 0 {
tracing::warn!(job_id = %current_job.id, category = "db_call", "Failed to update job. Job version is likely outdated");
return Err(eyre!("Failed to update job. Job version is likely outdated"));
let result = self.get_job_collection().find_one_and_update(filter, update, options).await?;
match result {
Some(job) => {
tracing::debug!(job_id = %current_job.id, category = "db_call", "Job updated successfully");
Ok(job)
}
None => {
tracing::warn!(job_id = %current_job.id, category = "db_call", "Failed to update job. Job version is likely outdated");
return Err(eyre!("Failed to update job. Job version is likely outdated"));
}
}

tracing::debug!(job_id = %current_job.id, category = "db_call", "Job updated successfully");
Ok(())
}

#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
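
Switching from `update_one` to `find_one_and_update` with `ReturnDocument::After` lets the driver return the post-update document in the same round trip, so an empty result now doubles as the optimistic-lock failure signal. A standalone sketch of the same pattern against a plain `Document` collection; the helper, its signature, and the explicit `$inc` on `version` are illustrative assumptions, not the PR's exact code:

```rust
use mongodb::bson::{doc, Document, Uuid};
use mongodb::error::Result;
use mongodb::options::{FindOneAndUpdateOptions, ReturnDocument};
use mongodb::Collection;

// Illustrative optimistic-locking update: filter on id *and* version so a
// concurrent writer makes the filter miss, and return the updated document.
async fn set_status_if_unchanged(
    jobs: &Collection<Document>,
    id: Uuid,
    expected_version: i32,
    new_status: &str,
) -> Result<Option<Document>> {
    let filter = doc! { "id": id, "version": expected_version };
    let update = doc! { "$set": { "status": new_status }, "$inc": { "version": 1 } };
    // ReturnDocument::After yields the document as it looks after the update.
    let options = FindOneAndUpdateOptions::builder()
        .upsert(false)
        .return_document(ReturnDocument::After)
        .build();
    // None here means the version was stale (or the job does not exist).
    jobs.find_one_and_update(filter, update, options).await
}
```
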
1 change: 1 addition & 0 deletions crates/orchestrator/src/jobs/constants.rs
@@ -6,3 +6,4 @@ pub const JOB_METADATA_STATE_UPDATE_ATTEMPT_PREFIX: &str = "attempt_tx_hashes_";
pub const JOB_METADATA_STATE_UPDATE_LAST_FAILED_BLOCK_NO: &str = "last_failed_block_no";
pub const JOB_METADATA_SNOS_BLOCK: &str = "block_number_to_run";
pub const JOB_METADATA_SNOS_FACT: &str = "snos_fact";
pub const JOB_METADATA_FAILURE_REASON: &str = "failure_reason";
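
`JOB_METADATA_FAILURE_REASON` is the metadata key under which `move_job_to_failed` (in `jobs/mod.rs` below) records why a job ended up `Failed`. A hedged sketch of reading it back, e.g. from a debugging tool; the helper is hypothetical and assumes the crate's `Database`, `JobItem`, and `JobStatus` types are in scope:

```rust
use color_eyre::Result;
use uuid::Uuid;

// Hypothetical helper (not part of this PR): fetch the recorded failure
// reason for a job, returning None if the job is missing or not Failed.
async fn failure_reason(db: &dyn Database, id: Uuid) -> Result<Option<String>> {
    Ok(db
        .get_job_by_id(id)
        .await?
        .filter(|job| job.status == JobStatus::Failed)
        .and_then(|job| job.metadata.get(JOB_METADATA_FAILURE_REASON).cloned()))
}
```
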
77 changes: 49 additions & 28 deletions crates/orchestrator/src/jobs/mod.rs
@@ -5,6 +5,7 @@ use std::time::Duration;

use async_trait::async_trait;
use color_eyre::eyre::{eyre, Context};
use constants::JOB_METADATA_FAILURE_REASON;
use conversion::parse_string;
use da_job::DaError;
use mockall::automock;
@@ -185,7 +186,7 @@ pub async fn create_job(
/// DB. It then adds the job to the verification queue.
#[tracing::instrument(skip(config), fields(category = "general", job, job_type, internal_id), ret, err)]
pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let mut job = get_job(id, config.clone()).await?;
let job = get_job(id, config.clone()).await?;
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "general", function_type = "process_job", block_no = %internal_id, "General process job started for block");

@@ -209,7 +210,7 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
// the same job, it would fail to update the job in the database because the version would be
// outdated
tracing::debug!(job_id = ?id, "Updating job status to LockedForProcessing");
config
let mut job = config
.database()
.update_job(&job, JobItemUpdates::new().update_status(JobStatus::LockedForProcessing).build())
.await

tracing::debug!(job_id = ?id, job_type = ?job.job_type, "Getting job handler");
let job_handler = factory::get_job_handler(&job.job_type).await;
let external_id = job_handler.process_job(config.clone(), &mut job).await?;
let external_id = match job_handler.process_job(config.clone(), &mut job).await {
Ok(external_id) => {
tracing::debug!(job_id = ?id, "Successfully processed job");
external_id
}
Err(e) => {
// TODO: most of the time these errors will not be fixed by simply retrying;
// however, some failures (e.g. transient DB issues) might succeed on retry,
// so retry logic could be added here to improve robustness.
tracing::error!(job_id = ?id, error = ?e, "Failed to process job");
return move_job_to_failed(&job, config.clone(), format!("Processing failed: {}", e)).await;
}
};
tracing::debug!(job_id = ?id, "Incrementing process attempt count in metadata");
let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;

let mut job_cloned = job.clone();
job_cloned.version += 1;

// Fetching the job again because update status above will update the job version
tracing::debug!(job_id = ?id, "Updating job status to PendingVerification");
config
.database()
.update_job(
&job_cloned,
&job,
JobItemUpdates::new()
.update_status(JobStatus::PendingVerification)
.update_metadata(metadata)
@@ -322,21 +332,6 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
new_job.metadata.insert("error".to_string(), e);
new_job.status = JobStatus::VerificationFailed;

config
.database()
.update_job(
&job,
JobItemUpdates::new()
.update_status(JobStatus::VerificationFailed)
.update_metadata(new_job.metadata)
.build(),
)
.await
.map_err(|e| {
tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to VerificationFailed");
JobError::Other(OtherError(e))
})?;

let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)
.map_err(|e| JobError::Other(OtherError(e)))?;
if process_attempts < job_handler.max_process_attempts() {
@@ -345,10 +340,30 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
attempt = process_attempts + 1,
"Verification failed. Retrying job processing"
);
config
.database()
.update_job(
&job,
JobItemUpdates::new()
.update_status(JobStatus::VerificationFailed)
.update_metadata(new_job.metadata)
.build(),
)
.await
.map_err(|e| {
tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to VerificationFailed");
JobError::Other(OtherError(e))
})?;
add_job_to_process_queue(job.id, config.clone()).await.map_err(|e| JobError::Other(OtherError(e)))?;
return Ok(());
} else {
tracing::warn!(job_id = ?id, "Max process attempts reached. Job will not be retried");
return move_job_to_failed(
&job,
config.clone(),
format!("Verification rejected. Max process attempts reached: {}", process_attempts),
)
.await;
}
}
JobVerificationStatus::Pending => {
@@ -408,30 +423,36 @@ pub async fn handle_job_failure(id: Uuid, config: Arc<Config>) -> Result<(), Job
let job = get_job(id, config.clone()).await?.clone();
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "general", function_type = "handle_job_failure", block_no = %internal_id, "General handle job failure started for block");
let mut metadata = job.metadata.clone();

tracing::Span::current().record("job_status", format!("{:?}", job.status));
tracing::Span::current().record("job_type", format!("{:?}", job.job_type));

tracing::debug!(job_id = ?id, job_status = ?job.status, job_type = ?job.job_type, block_no = %internal_id, "Job details for failure handling for block");
let status = job.status.clone().to_string();
move_job_to_failed(&job, config.clone(), format!("Received failure queue message for job with status: {}", status))
.await
}

async fn move_job_to_failed(job: &JobItem, config: Arc<Config>, reason: String) -> Result<(), JobError> {
if job.status == JobStatus::Completed {
tracing::error!(job_id = ?id, job_status = ?job.status, "Invalid state exists on DL queue");
tracing::error!(job_id = ?job.id, job_status = ?job.status, "Invalid state exists on DL queue");
return Ok(());
}
// We assume that a Failure status will only show up if the message is sent twice from a queue
// Can return silently because it's already been processed.
else if job.status == JobStatus::Failed {
tracing::warn!(job_id = ?id, "Job already marked as failed, skipping processing");
tracing::warn!(job_id = ?job.id, "Job already marked as failed, skipping processing");
return Ok(());
}

metadata.insert("last_job_status".to_string(), job.status.to_string());
let mut metadata = job.metadata.clone();
let internal_id = job.internal_id.clone();
metadata.insert(JOB_METADATA_FAILURE_REASON.to_string(), reason);

tracing::debug!(job_id = ?id, "Updating job status to Failed in database");
tracing::debug!(job_id = ?job.id, "Updating job status to Failed in database");
match config
.database()
.update_job(&job, JobItemUpdates::new().update_status(JobStatus::Failed).update_metadata(metadata).build())
.update_job(job, JobItemUpdates::new().update_status(JobStatus::Failed).update_metadata(metadata).build())
.await
{
Ok(_) => {
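
Taken together, the hunks above route every terminal failure through `move_job_to_failed`: a processing error, a rejected verification that has exhausted its attempts, and a failure-queue message all leave the job in `Failed` with a reason stored under `JOB_METADATA_FAILURE_REASON`. A condensed, illustrative sketch of that routing (simplified types; the real code also updates metadata counters, queues, and tracing spans):

```rust
// Illustrative condensation of the new failure routing, not literal PR code.
enum FailureTrigger {
    ProcessingError(String),
    VerificationRejected { process_attempts: u64, max_process_attempts: u64 },
    FailureQueueMessage { last_status: String },
}

// Returns Some(reason) when the job should move to Failed, None when it should
// instead be retried (VerificationFailed + re-queued for processing).
fn failure_reason_for(trigger: &FailureTrigger) -> Option<String> {
    match trigger {
        FailureTrigger::ProcessingError(e) => Some(format!("Processing failed: {}", e)),
        FailureTrigger::VerificationRejected { process_attempts, max_process_attempts } => {
            // Only an exhausted retry budget is terminal.
            (process_attempts >= max_process_attempts).then(|| {
                format!("Verification rejected. Max process attempts reached: {}", process_attempts)
            })
        }
        FailureTrigger::FailureQueueMessage { last_status } => {
            Some(format!("Received failure queue message for job with status: {}", last_status))
        }
    }
}
```
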
7 changes: 0 additions & 7 deletions crates/orchestrator/src/jobs/types.rs
@@ -89,26 +89,19 @@ pub enum JobType {
pub enum JobStatus {
/// An acknowledgement that the job has been received by the
/// orchestrator and is waiting to be processed
#[strum(to_string = "Created")]
Created,
/// Some system has taken a lock over the job for processing and no
/// other system should process the job
#[strum(to_string = "Locked for Processing")]
LockedForProcessing,
/// The job has been processed and is pending verification
#[strum(to_string = "Pending Verification")]
PendingVerification,
/// The job has been processed and verified. No other actions needs to be taken
#[strum(to_string = "Completed")]
Completed,
/// The job was processed but could not be verified within the given time
#[strum(to_string = "Verification Timeout")]
VerificationTimeout,
/// The job failed processing
#[strum(to_string = "Verification Failed")]
VerificationFailed,
/// The job failed completing
#[strum(to_string = "Failed")]
Failed,
}

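
Dropping the `#[strum(to_string = ...)]` overrides removes the spaced labels ("Pending Verification", "Locked for Processing"): assuming `JobStatus` keeps its strum `Display` derive and has no `serialize_all` attribute, `to_string()` now yields the bare variant name, which is what the failure path stores via `job.status.to_string()`. A small hedged check of that assumption:

```rust
#[test]
fn job_status_display_uses_variant_names() {
    // Assumes strum's Display derive with no overrides: the rendered string
    // is the plain variant name rather than the old spaced label.
    assert_eq!(JobStatus::PendingVerification.to_string(), "PendingVerification");
    assert_eq!(JobStatus::LockedForProcessing.to_string(), "LockedForProcessing");
}
```
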
7 changes: 6 additions & 1 deletion crates/orchestrator/src/tests/database/mod.rs
@@ -208,7 +208,7 @@ async fn database_test_update_job() {
let updated_metadata = increment_key_in_metadata(&metadata, key).unwrap();

let job_cloned = job.clone();
let _ = database_client
let updated_job = database_client
.update_job(
&job_cloned,
JobItemUpdates::new()
@@ -219,10 +219,15 @@ async fn database_test_update_job() {
.await;

if let Some(job_after_updates_db) = database_client.get_job_by_id(job_id).await.unwrap() {
// check if job is updated
assert_eq!(JobType::DataSubmission, job_after_updates_db.job_type);
assert_eq!(JobStatus::LockedForProcessing, job_after_updates_db.status);
assert_eq!(1, job_after_updates_db.version);
assert_eq!(456.to_string(), job_after_updates_db.internal_id);

// check if value returned by `update_job` is the correct one
// and matches the one in database
assert_eq!(updated_job.unwrap(), job_after_updates_db);
} else {
panic!("Job not found in Database.")
}
54 changes: 50 additions & 4 deletions crates/orchestrator/src/tests/jobs/mod.rs
@@ -10,10 +10,14 @@ use tokio::time::sleep;
use uuid::Uuid;

use super::database::build_job_item;
use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY};
use crate::jobs::constants::{
JOB_METADATA_FAILURE_REASON, JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY,
};
use crate::jobs::job_handler_factory::mock_factory;
use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::{create_job, handle_job_failure, increment_key_in_metadata, process_job, verify_job, Job, MockJob};
use crate::jobs::{
create_job, handle_job_failure, increment_key_in_metadata, process_job, verify_job, Job, JobError, MockJob,
};
use crate::queue::job_queue::{JOB_PROCESSING_QUEUE, JOB_VERIFICATION_QUEUE};
use crate::tests::common::MessagePayloadType;
use crate::tests::config::{ConfigType, TestConfigBuilder};
@@ -298,6 +302,45 @@ async fn process_job_two_workers_process_same_job_works() {
assert_eq!(final_job_in_db.status, JobStatus::PendingVerification);
}

/// Tests `process_job` function when the job handler returns an error.
/// The job should be moved to the failed status.
#[rstest]
#[tokio::test]
async fn process_job_job_handler_returns_error_works() {
let mut job_handler = MockJob::new();
// Expecting the process_job call on the job handler to return an error.
let failure_reason = "Failed to process job";
job_handler
.expect_process_job()
.times(1)
.returning(move |_, _| Err(JobError::Other(failure_reason.to_string().into())));
job_handler.expect_verification_polling_delay_seconds().return_const(1u64);

// Mocking the `get_job_handler` call in create_job function.
let job_handler: Arc<Box<dyn Job>> = Arc::new(Box::new(job_handler));
let ctx = mock_factory::get_job_handler_context();
ctx.expect().times(1).with(eq(JobType::SnosRun)).returning(move |_| Arc::clone(&job_handler));

// building config
let services = TestConfigBuilder::new()
.configure_database(ConfigType::Actual)
.configure_queue_client(ConfigType::Actual)
.build()
.await;
let db_client = services.config.database();

let job_item = build_job_item_by_type_and_status(JobType::SnosRun, JobStatus::Created, "1".to_string());

// Creating the job in the db
db_client.create_job(job_item.clone()).await.unwrap();

assert!(process_job(job_item.id, services.config.clone()).await.is_ok());

let final_job_in_db = db_client.get_job_by_id(job_item.id).await.unwrap().unwrap();
assert_eq!(final_job_in_db.status, JobStatus::Failed);
assert!(final_job_in_db.metadata.get(JOB_METADATA_FAILURE_REASON).unwrap().to_string().contains(failure_reason));
}

/// Tests `verify_job` function when job is having expected status
/// and returns a `Verified` verification status.
#[rstest]
@@ -427,7 +470,7 @@ async fn verify_job_with_rejected_status_works() {

// DB checks.
let updated_job = database_client.get_job_by_id(job_item.id).await.unwrap().unwrap();
assert_eq!(updated_job.status, JobStatus::VerificationFailed);
assert_eq!(updated_job.status, JobStatus::Failed);
assert_eq!(updated_job.metadata.get(JOB_PROCESS_ATTEMPT_METADATA_KEY).unwrap(), "1");

// Waiting for 5 secs for message to be passed into the queue
@@ -623,7 +666,10 @@ async fn handle_job_failure_with_correct_job_status_works(#[case] job_type: JobT
// creating expected output
let mut job_expected = job.clone();
let mut job_metadata = job_expected.metadata.clone();
job_metadata.insert("last_job_status".to_string(), job_status.to_string());
job_metadata.insert(
JOB_METADATA_FAILURE_REASON.to_string(),
format!("Received failure queue message for job with status: {}", job_status),
);
job_expected.metadata.clone_from(&job_metadata);
job_expected.status = JobStatus::Failed;
job_expected.version = 1;