Skip to content

Commit 43db46e

Browse files
feat: Data Submission Worker Integration. (#51)
* update: DA job draft #1 * docs: changelog updated * update: is_worker_enabled impl & usage in da_submission, removal of String from VerificationFailed * update: renamed to * update: run worker only if it's enabled using is_worker_enabled check * build: linter fixes * Update CHANGELOG.md Co-authored-by: Apoorv Sadana <95699312+apoorvsadana@users.noreply.github.com> * update: limit_to_one on get_jobs_by_status * update: removed get_last_successful_job_by_type, added get_latest_job_by_type_and_status * update: added error to job metadata * update: pr resolution, simplifying get_jobs_by_status, rejected status in verify_jobs * update: linting fixes * Update crates/orchestrator/src/jobs/mod.rs Co-authored-by: Apoorv Sadana <95699312+apoorvsadana@users.noreply.github.com> * update: removing .expect from mongodb mod file * update: fixed testcase for snos worker * chore: correct variable name * update: added support to check against multiple statuses - is_worker_enabled, get_jobs_by_statuses * docs: rewrote 1 job per block assumption * docs: DataSubmissionWorker -> DataAvailabilitySynchronizer * chore: linting fix * update: changed name : DataAvailabilitySynchronizer -> DataSubmissionWorker --------- Co-authored-by: Apoorv Sadana <95699312+apoorvsadana@users.noreply.github.com>
1 parent d3e57f9 commit 43db46e

File tree

13 files changed

+152
-52
lines changed

13 files changed

+152
-52
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
66

77
## Added
88

9+
- implemented DA worker.
910
- Function to calculate the kzg proof of x_0.
1011
- Tests for updating the state.
1112
- Function to update the state and publish blob on ethereum in state update job.

crates/orchestrator/src/database/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,27 @@ pub trait Database: Send + Sync {
2929
async fn update_job(&self, job: &JobItem) -> Result<()>;
3030
async fn update_job_status(&self, job: &JobItem, new_status: JobStatus) -> Result<()>;
3131
async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
32-
async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>>;
32+
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
3333
async fn get_jobs_without_successor(
3434
&self,
3535
job_a_type: JobType,
3636
job_a_status: JobStatus,
3737
job_b_type: JobType,
3838
) -> Result<Vec<JobItem>>;
39-
async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
39+
async fn get_latest_job_by_type_and_status(
40+
&self,
41+
job_type: JobType,
42+
job_status: JobStatus,
43+
) -> Result<Option<JobItem>>;
4044
async fn get_jobs_after_internal_id_by_job_type(
4145
&self,
4246
job_type: JobType,
4347
job_status: JobStatus,
4448
internal_id: String,
4549
) -> Result<Vec<JobItem>>;
50+
51+
// TODO: can be extended to support additional status filters.
52+
async fn get_jobs_by_statuses(&self, status: Vec<JobStatus>, limit: Option<i64>) -> Result<Vec<JobItem>>;
4653
}
4754

4855
pub trait DatabaseConfig {

crates/orchestrator/src/database/mongodb/mod.rs

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use async_std::stream::StreamExt;
2+
use futures::TryStreamExt;
23
use std::collections::HashMap;
34

45
use async_trait::async_trait;
56
use color_eyre::eyre::eyre;
67
use color_eyre::Result;
78
use mongodb::bson::{Bson, Document};
8-
use mongodb::options::{FindOneOptions, UpdateOptions};
9+
use mongodb::options::{FindOneOptions, FindOptions, UpdateOptions};
910
use mongodb::{
1011
bson,
1112
bson::doc,
@@ -112,16 +113,12 @@ impl Database for MongoDb {
112113
Ok(())
113114
}
114115

115-
async fn get_latest_job_by_type_and_internal_id(&self, job_type: JobType) -> Result<Option<JobItem>> {
116+
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
116117
let filter = doc! {
117118
"job_type": mongodb::bson::to_bson(&job_type)?,
118119
};
119120
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();
120-
Ok(self
121-
.get_job_collection()
122-
.find_one(filter, find_options)
123-
.await
124-
.expect("Failed to fetch latest job by given job type"))
121+
Ok(self.get_job_collection().find_one(filter, find_options).await?)
125122
}
126123

127124
/// function to get jobs that don't have a successor job.
@@ -226,8 +223,7 @@ impl Database for MongoDb {
226223
// }
227224
// }
228225

229-
let collection = self.get_job_collection();
230-
let mut cursor = collection.aggregate(pipeline, None).await?;
226+
let mut cursor = self.get_job_collection().aggregate(pipeline, None).await?;
231227

232228
let mut vec_jobs: Vec<JobItem> = Vec::new();
233229

@@ -245,18 +241,18 @@ impl Database for MongoDb {
245241
Ok(vec_jobs)
246242
}
247243

248-
async fn get_last_successful_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
244+
async fn get_latest_job_by_type_and_status(
245+
&self,
246+
job_type: JobType,
247+
job_status: JobStatus,
248+
) -> Result<Option<JobItem>> {
249249
let filter = doc! {
250250
"job_type": bson::to_bson(&job_type)?,
251-
"job_status": bson::to_bson(&JobStatus::Completed)?
251+
"job_status": bson::to_bson(&job_status)?
252252
};
253253
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();
254254

255-
Ok(self
256-
.get_job_collection()
257-
.find_one(filter, find_options)
258-
.await
259-
.expect("Failed to fetch latest job by given job type"))
255+
Ok(self.get_job_collection().find_one(filter, find_options).await?)
260256
}
261257

262258
async fn get_jobs_after_internal_id_by_job_type(
@@ -271,23 +267,23 @@ impl Database for MongoDb {
271267
"internal_id": { "$gt": internal_id }
272268
};
273269

274-
let mut jobs = self
275-
.get_job_collection()
276-
.find(filter, None)
277-
.await
278-
.expect("Failed to fetch latest jobs by given job type and internal_od conditions");
270+
let jobs = self.get_job_collection().find(filter, None).await?.try_collect().await?;
279271

280-
let mut results = Vec::new();
272+
Ok(jobs)
273+
}
281274

282-
while let Some(result) = jobs.next().await {
283-
match result {
284-
Ok(job_item) => {
285-
results.push(job_item);
286-
}
287-
Err(e) => return Err(e.into()),
275+
async fn get_jobs_by_statuses(&self, job_status: Vec<JobStatus>, limit: Option<i64>) -> Result<Vec<JobItem>> {
276+
let filter = doc! {
277+
"job_status": {
278+
// TODO: Check that the conversion leads to valid output!
279+
"$in": job_status.iter().map(|status| bson::to_bson(status).unwrap_or(Bson::Null)).collect::<Vec<Bson>>()
288280
}
289-
}
281+
};
282+
283+
let find_options = limit.map(|val| FindOptions::builder().limit(Some(val)).build());
284+
285+
let jobs = self.get_job_collection().find(filter, find_options).await?.try_collect().await?;
290286

291-
Ok(results)
287+
Ok(jobs)
292288
}
293289
}

crates/orchestrator/src/jobs/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ pub async fn process_job(id: Uuid) -> Result<()> {
8080
match job.status {
8181
// we only want to process jobs that are in the created or verification failed state.
8282
// verification failed state means that the previous processing failed and we want to retry
83-
JobStatus::Created | JobStatus::VerificationFailed(_) => {
83+
JobStatus::Created | JobStatus::VerificationFailed => {
8484
log::info!("Processing job with id {:?}", id);
8585
}
8686
_ => {
@@ -135,7 +135,13 @@ pub async fn verify_job(id: Uuid) -> Result<()> {
135135
config.database().update_job_status(&job, JobStatus::Completed).await?;
136136
}
137137
JobVerificationStatus::Rejected(e) => {
138-
config.database().update_job_status(&job, JobStatus::VerificationFailed(e)).await?;
138+
let mut new_job = job.clone();
139+
new_job.metadata.insert("error".to_string(), e);
140+
new_job.status = JobStatus::VerificationFailed;
141+
142+
config.database().update_job(&new_job).await?;
143+
144+
log::error!("Verification failed for job with id {:?}. Cannot verify.", id);
139145

140146
// retry job processing if we haven't exceeded the max limit
141147
let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;

crates/orchestrator/src/jobs/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ pub enum JobStatus {
9898
/// The job was processed but the was unable to be verified under the given time
9999
VerificationTimeout,
100100
/// The job failed processing
101-
VerificationFailed(String),
101+
VerificationFailed,
102102
}
103103

104104
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]

crates/orchestrator/src/main.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use dotenvy::dotenv;
22
use orchestrator::config::config;
33
use orchestrator::queue::init_consumers;
44
use orchestrator::routes::app_router;
5+
use orchestrator::workers::data_submission_worker::DataSubmissionWorker;
56
use orchestrator::workers::proof_registration::ProofRegistrationWorker;
67
use orchestrator::workers::proving::ProvingWorker;
78
use orchestrator::workers::snos::SnosWorker;
@@ -33,14 +34,15 @@ async fn main() {
3334
tokio::spawn(start_cron(Box::new(ProvingWorker), 60));
3435
tokio::spawn(start_cron(Box::new(ProofRegistrationWorker), 60));
3536
tokio::spawn(start_cron(Box::new(UpdateStateWorker), 60));
37+
tokio::spawn(start_cron(Box::new(DataSubmissionWorker), 60));
3638

3739
tracing::info!("Listening on http://{}", address);
3840
axum::serve(listener, app).await.expect("Failed to start axum server");
3941
}
4042

4143
async fn start_cron(worker: Box<dyn Worker>, interval: u64) {
4244
loop {
43-
worker.run_worker().await.expect("Error in running the worker.");
45+
worker.run_worker_if_enabled().await.expect("Error in running the worker.");
4446
tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
4547
}
4648
}

crates/orchestrator/src/tests/workers/snos/mod.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::config::config_force_init;
22
use crate::database::MockDatabase;
3-
use crate::jobs::types::JobType;
3+
use crate::jobs::types::{JobStatus, JobType};
44
use crate::queue::MockQueueProvider;
55
use crate::tests::common::init_config;
66
use crate::tests::workers::utils::get_job_item_mock_by_id;
@@ -30,15 +30,18 @@ async fn test_snos_worker(#[case] db_val: bool) -> Result<(), Box<dyn Error>> {
3030

3131
// Mocking db function expectations
3232
if !db_val {
33-
db.expect_get_last_successful_job_by_type().times(1).with(eq(JobType::SnosRun)).returning(|_| Ok(None));
33+
db.expect_get_latest_job_by_type_and_status()
34+
.times(1)
35+
.with(eq(JobType::SnosRun), eq(JobStatus::Completed))
36+
.returning(|_, _| Ok(None));
3437
start_job_index = 1;
3538
block = 5;
3639
} else {
3740
let uuid_temp = Uuid::new_v4();
3841

39-
db.expect_get_last_successful_job_by_type()
40-
.with(eq(JobType::SnosRun))
41-
.returning(move |_| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp))));
42+
db.expect_get_latest_job_by_type_and_status()
43+
.with(eq(JobType::SnosRun), eq(JobStatus::Completed))
44+
.returning(move |_, _| Ok(Some(get_job_item_mock_by_id("1".to_string(), uuid_temp))));
4245
block = 6;
4346
start_job_index = 2;
4447
}

crates/orchestrator/src/tests/workers/update_state/mod.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,18 @@ async fn test_update_state_worker(
3333
// Mocking db function expectations
3434
// If no successful state update jobs exist
3535
if !last_successful_job_exists {
36-
db.expect_get_last_successful_job_by_type().with(eq(JobType::StateTransition)).times(1).returning(|_| Ok(None));
36+
db.expect_get_latest_job_by_type_and_status()
37+
.with(eq(JobType::StateTransition), eq(JobStatus::Completed))
38+
.times(1)
39+
.returning(|_, _| Ok(None));
3740
} else {
3841
// if successful state update job exists
3942

4043
// mocking the return value of first function call (getting last successful jobs):
41-
db.expect_get_last_successful_job_by_type()
42-
.with(eq(JobType::StateTransition))
44+
db.expect_get_latest_job_by_type_and_status()
45+
.with(eq(JobType::StateTransition), eq(JobStatus::Completed))
4346
.times(1)
44-
.returning(|_| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()))));
47+
.returning(|_, _| Ok(Some(get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()))));
4548

4649
// mocking the return values of second function call (getting completed proving worker jobs)
4750
db.expect_get_jobs_after_internal_id_by_job_type()
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use crate::config::config;
2+
use crate::jobs::create_job;
3+
use crate::jobs::types::{JobStatus, JobType};
4+
use crate::workers::Worker;
5+
use async_trait::async_trait;
6+
use std::collections::HashMap;
7+
use std::error::Error;
8+
9+
pub struct DataSubmissionWorker;
10+
11+
#[async_trait]
12+
impl Worker for DataSubmissionWorker {
13+
// 0. All ids are assumed to be block numbers.
14+
// 1. Fetch the latest completed Proving job.
15+
// 2. Fetch the latest DA job creation.
16+
// 3. Create jobs from after the lastest DA job already created till latest completed proving job.
17+
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
18+
let config = config().await;
19+
20+
// provides latest completed proof creation job id
21+
let latest_proven_job_id = config
22+
.database()
23+
.get_latest_job_by_type_and_status(JobType::ProofCreation, JobStatus::Completed)
24+
.await
25+
.unwrap()
26+
.map(|item| item.internal_id)
27+
.unwrap_or("0".to_string());
28+
29+
// provides latest triggered data submission job id
30+
let latest_data_submission_job_id = config
31+
.database()
32+
.get_latest_job_by_type(JobType::DataSubmission)
33+
.await
34+
.unwrap()
35+
.map(|item| item.internal_id)
36+
.unwrap_or("0".to_string());
37+
38+
let latest_data_submission_id: u64 = latest_data_submission_job_id.parse()?;
39+
let latest_proven_id: u64 = latest_proven_job_id.parse()?;
40+
41+
// creating data submission jobs for latest blocks that don't have existing data submission jobs yet.
42+
for new_job_id in latest_data_submission_id + 1..latest_proven_id + 1 {
43+
create_job(JobType::DataSubmission, new_job_id.to_string(), HashMap::new()).await?;
44+
}
45+
46+
Ok(())
47+
}
48+
}
Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,46 @@
1-
use std::error::Error;
2-
1+
use crate::{config::config, jobs::types::JobStatus};
32
use async_trait::async_trait;
3+
use std::error::Error;
44

5+
pub mod data_submission_worker;
56
pub mod proof_registration;
67
pub mod proving;
78
pub mod snos;
89
pub mod update_state;
910

1011
#[async_trait]
1112
pub trait Worker: Send + Sync {
13+
async fn run_worker_if_enabled(&self) -> Result<(), Box<dyn Error>> {
14+
if !self.is_worker_enabled().await? {
15+
return Ok(());
16+
}
17+
self.run_worker().await
18+
}
19+
1220
async fn run_worker(&self) -> Result<(), Box<dyn Error>>;
21+
22+
// Assumption
23+
// If say a job for block X fails, we don't want the worker to respawn another job for the same block
24+
// we will resolve the existing failed job first.
25+
26+
// We assume the system to keep working till a job hasn't failed,
27+
// as soon as it fails we currently halt any more execution and wait for manual intervention.
28+
29+
// Checks if any of the jobs have failed
30+
// Failure : JobStatus::VerificationFailed, JobStatus::VerificationTimeout, JobStatus::Failed
31+
// Halts any new job creation till all the count of failed jobs is not Zero.
32+
async fn is_worker_enabled(&self) -> Result<bool, Box<dyn Error>> {
33+
let config = config().await;
34+
35+
let failed_jobs = config
36+
.database()
37+
.get_jobs_by_statuses(vec![JobStatus::VerificationFailed, JobStatus::VerificationTimeout], Some(1))
38+
.await?;
39+
40+
if !failed_jobs.is_empty() {
41+
return Ok(false);
42+
}
43+
44+
Ok(true)
45+
}
1346
}

crates/orchestrator/src/workers/snos.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use starknet::providers::Provider;
66

77
use crate::config::config;
88
use crate::jobs::create_job;
9-
use crate::jobs::types::JobType;
9+
use crate::jobs::types::{JobStatus, JobType};
1010
use crate::workers::Worker;
1111

1212
pub struct SnosWorker;
@@ -22,7 +22,7 @@ impl Worker for SnosWorker {
2222
let latest_block_number = provider.block_number().await?;
2323
let latest_block_processed_data = config
2424
.database()
25-
.get_last_successful_job_by_type(JobType::SnosRun)
25+
.get_latest_job_by_type_and_status(JobType::SnosRun, JobStatus::Completed)
2626
.await
2727
.unwrap()
2828
.map(|item| item.internal_id)

crates/orchestrator/src/workers/update_state.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ impl Worker for UpdateStateWorker {
1616
/// 3. Create state updates for all the blocks that don't have a state update job
1717
async fn run_worker(&self) -> Result<(), Box<dyn Error>> {
1818
let config = config().await;
19-
let latest_successful_job = config.database().get_last_successful_job_by_type(JobType::StateTransition).await?;
19+
let latest_successful_job =
20+
config.database().get_latest_job_by_type_and_status(JobType::StateTransition, JobStatus::Completed).await?;
2021

2122
match latest_successful_job {
2223
Some(job) => {

crates/prover-services/gps-fact-checker/src/fact_node.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
//! constructed using a stack of nodes (initialized to an empty stack) by repeating for each pair:
1313
//! 1. Add #n_pages lead nodes to the stack.
1414
//! 2. Pop the top #n_nodes, construct a parent node for them, and push it back to the stack.
15-
//! After applying the steps above, the stack must contain exactly one node, which will
16-
//! constitute the root of the Merkle tree.
15+
//! After applying the steps above, the stack must contain exactly one node, which will
16+
//! constitute the root of the Merkle tree.
1717
//!
1818
//! For example, [(2, 2)] will create a Merkle tree with a root and two direct children, while
1919
//! [(3, 2), (0, 2)] will create a Merkle tree with a root whose left child is a leaf and

0 commit comments

Comments
 (0)