Feature : JobError using thiserror (#78)
* update: added thiserror JobError

* update: introducing DaError

* update: add ProverJobError

* update: Register proof JobError

* update: SNOS proof JobError

* update: JobError on Queue

* update: added fix for borrow in state update process job

* update: cleaning rework for JobError

* update: wrap_err_with -> wrap_err wherever used with a plain string (see the sketch after this list)

* update: moved all JobErrors to top of file

* update: Errors for consume_job_from_queue

* update: linting fixes

* update: optimised consume_job_from_queue match statement

* update: code optimisation based on PR reviews

* chore: lint fixes

* chore: lint fixes

* update: error matching optimised

* update: block_no to String and correct assert on test_da_job_process_job_failure_on_small_blob_size

* fix: linting

* update: error matching checks

* update: test state_update fix

* update: test state_update fix

* update: test state_update fix

* fix: lint

* chore: imports fixed

* feat : updated the errors enum and implemented the OtherError wrapper for eyre errors

* feat : fixed tests

---------

Co-authored-by: Arun Jangra <arunjangra1001@gmail.com>
heemankv and ocdbytes authored Aug 16, 2024
1 parent 8377c07 commit 465415b
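
One bullet in the message above swaps wrap_err_with for wrap_err wherever the message is a plain string. A minimal sketch of the trade-off (illustrative only; parse_block_no is a made-up helper mirroring the parse in process_job below):

    use color_eyre::eyre::{Result, WrapErr};

    fn parse_block_no(internal_id: &str) -> Result<u64> {
        // wrap_err: the message is a cheap fixed string, so building it
        // unconditionally costs nothing, and no closure is needed.
        let block_no = internal_id.parse::<u64>().wrap_err("Failed to parse u64")?;

        // wrap_err_with: the closure runs only on failure, which pays off
        // when the message must be formatted or allocated.
        let _checked = internal_id
            .parse::<u64>()
            .wrap_err_with(|| format!("Failed to parse {internal_id} as u64"))?;

        Ok(block_no)
    }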
Showing 11 changed files with 461 additions and 204 deletions.
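
Only two of the eleven changed files are expanded below. The da_job diff depends on two types from crates/orchestrator/src/jobs/mod.rs that this page does not show: JobError and OtherError. What follows is a minimal sketch of what their definitions plausibly look like, inferred purely from how the diff uses them; the JobError variant names are assumptions, and only the Other/OtherError shape appears verbatim in the code below:

    use thiserror::Error;

    #[derive(Error, Debug)]
    pub enum JobError {
        // `Err(DaError::...)?` in process_job only compiles if JobError
        // implements From<DaError>, e.g. via a #[from] variant like this.
        #[error(transparent)]
        DaJobError(#[from] DaError),

        #[error("Other error: {0}")]
        Other(#[from] OtherError),
    }

    // DaError derives PartialEq, but color_eyre::eyre::Error does not
    // implement it, so this wrapper presumably compares rendered messages.
    #[derive(Debug, Error)]
    #[error("{0}")]
    pub struct OtherError(pub color_eyre::eyre::Error);

    impl PartialEq for OtherError {
        fn eq(&self, other: &Self) -> bool {
            self.0.to_string() == other.0.to_string()
        }
    }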
CHANGELOG.md (2 changes: 1 addition & 1 deletion)
@@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 - Implement DL queue for handling failed jobs.
 - Added tests for state update job.
 - Tests for DA job.
+- Added generalized errors for Jobs : JobError.
 - Database tests

 ## Changed
@@ -28,7 +29,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
 - Orchestrator :Moved TestConfigBuilder to `config.rs` in tests folder.
 - `.env` file requires two more variables which are queue urls for processing
   and verification.
-- Shifted Unit tests to test folder for DA job.

 ## Removed

crates/orchestrator/src/jobs/da_job/mod.rs (125 changes: 75 additions & 50 deletions)
@@ -1,22 +1,20 @@
 use std::collections::HashMap;
 use std::ops::{Add, Mul, Rem};
-use std::result::Result::{Err, Ok as OtherOk};
 use std::str::FromStr;

 use async_trait::async_trait;
-use color_eyre::eyre::{eyre, Ok};
-use color_eyre::Result;
+use color_eyre::eyre::WrapErr;
 use lazy_static::lazy_static;
 use num_bigint::{BigUint, ToBigUint};
 use num_traits::{Num, Zero};
-//
 use starknet::core::types::{BlockId, FieldElement, MaybePendingStateUpdate, StateUpdate, StorageEntry};
 use starknet::providers::Provider;
+use thiserror::Error;
 use tracing::log;
 use uuid::Uuid;

 use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
-use super::Job;
+use super::{Job, JobError, OtherError};
 use crate::config::Config;
 use crate::constants::BLOB_DATA_FILE_NAME;

@@ -40,6 +38,21 @@ lazy_static! {
     pub static ref BLOB_LEN: usize = 4096;
 }

+#[derive(Error, Debug, PartialEq)]
+pub enum DaError {
+    #[error("Cannot process block {block_no:?} for job id {job_id:?} as it's still in pending state.")]
+    BlockPending { block_no: String, job_id: Uuid },
+
+    #[error("Blob size must be at least 32 bytes to accommodate a single FieldElement/BigUint, but was {blob_size:?}")]
+    InsufficientBlobSize { blob_size: u64 },
+
+    #[error("Exceeded the maximum number of blobs per transaction: allowed {max_blob_per_txn:?}, found {current_blob_length:?} for block {block_no:?} and job id {job_id:?}")]
+    MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: String, job_id: Uuid },
+
+    #[error("Other error: {0}")]
+    Other(#[from] OtherError),
+}
+
 pub struct DaJob;

 #[async_trait]
Expand All @@ -49,7 +62,7 @@ impl Job for DaJob {
_config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem> {
) -> Result<JobItem, JobError> {
Ok(JobItem {
id: Uuid::new_v4(),
internal_id,
@@ -61,24 +74,30 @@ impl Job for DaJob {
         })
     }

-    async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result<String> {
-        let block_no = job.internal_id.parse::<u64>()?;
+    async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result<String, JobError> {
+        let block_no = job
+            .internal_id
+            .parse::<u64>()
+            .wrap_err("Failed to parse u64".to_string())
+            .map_err(|e| JobError::Other(OtherError(e)))?;

-        let state_update = config.starknet_client().get_state_update(BlockId::Number(block_no)).await?;
+        let state_update = config
+            .starknet_client()
+            .get_state_update(BlockId::Number(block_no))
+            .await
+            .wrap_err("Failed to get state Update.".to_string())
+            .map_err(|e| JobError::Other(OtherError(e)))?;

         let state_update = match state_update {
             MaybePendingStateUpdate::PendingUpdate(_) => {
-                log::error!("Cannot process block {} for job id {} as it's still in pending state", block_no, job.id);
-                return Err(eyre!(
-                    "Cannot process block {} for job id {} as it's still in pending state",
-                    block_no,
-                    job.id
-                ));
+                Err(DaError::BlockPending { block_no: block_no.to_string(), job_id: job.id })?
             }
             MaybePendingStateUpdate::Update(state_update) => state_update,
         };
         // constructing the data from the rpc
-        let blob_data = state_update_to_blob_data(block_no, state_update, config).await?;
+        let blob_data = state_update_to_blob_data(block_no, state_update, config)
+            .await
+            .map_err(|e| JobError::Other(OtherError(e)))?;
         // transforming the data so that we can apply FFT on this.
         // @note: we can skip this step if in the above step we return vec<BigUint> directly
         let blob_data_biguint = convert_to_biguint(blob_data.clone());
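
A note on the Err(...)? construction in the pending-state arm above: unlike return Err(...), the ? operator applies a From conversion before propagating, which is how a DaError becomes the JobError that process_job returns. A condensed, illustrative sketch (unwrap_update is a made-up helper; it assumes the From<DaError> impl sketched earlier on this page):

    fn unwrap_update(
        maybe: MaybePendingStateUpdate,
        block_no: u64,
        job_id: Uuid,
    ) -> Result<StateUpdate, JobError> {
        let update = match maybe {
            // This arm never yields a StateUpdate: `Err(...)?` converts the
            // DaError into JobError via From and returns early instead.
            MaybePendingStateUpdate::PendingUpdate(_) => {
                Err(DaError::BlockPending { block_no: block_no.to_string(), job_id })?
            }
            MaybePendingStateUpdate::Update(update) => update,
        };
        Ok(update)
    }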
@@ -89,30 +108,40 @@ impl Job for DaJob {
         let max_blob_per_txn = config.da_client().max_blob_per_txn().await;

         // converting BigUints to Vec<u8>, one Vec<u8> represents one blob data
-        let blob_array =
-            data_to_blobs(max_bytes_per_blob, transformed_data).expect("error while converting blob data to vec<u8>");
-        let current_blob_length: u64 =
-            blob_array.len().try_into().expect("Unable to convert the blob length into u64 format.");
+        let blob_array = data_to_blobs(max_bytes_per_blob, transformed_data)?;
+        let current_blob_length: u64 = blob_array
+            .len()
+            .try_into()
+            .wrap_err("Unable to convert the blob length into u64 format.".to_string())
+            .map_err(|e| JobError::Other(OtherError(e)))?;

         // there is a limit on number of blobs per txn, checking that here
         if current_blob_length > max_blob_per_txn {
-            return Err(eyre!(
-                "Exceeded the maximum number of blobs per transaction: allowed {}, found {} for block {} and job id {}",
+            Err(DaError::MaxBlobsLimitExceeded {
                 max_blob_per_txn,
                 current_blob_length,
-                block_no,
-                job.id
-            ));
+                block_no: block_no.to_string(),
+                job_id: job.id,
+            })?
         }

         // making the txn to the DA layer
-        let external_id = config.da_client().publish_state_diff(blob_array, &[0; 32]).await?;
+        let external_id = config
+            .da_client()
+            .publish_state_diff(blob_array, &[0; 32])
+            .await
+            .map_err(|e| JobError::Other(OtherError(e)))?;

         Ok(external_id)
     }

-    async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result<JobVerificationStatus> {
-        Ok(config.da_client().verify_inclusion(job.external_id.unwrap_string()?).await?.into())
+    async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
+        Ok(config
+            .da_client()
+            .verify_inclusion(job.external_id.unwrap_string().map_err(|e| JobError::Other(OtherError(e)))?)
+            .await
+            .map_err(|e| JobError::Other(OtherError(e)))?
+            .into())
     }

     fn max_process_attempts(&self) -> u64 {
@@ -169,13 +198,10 @@ pub fn convert_to_biguint(elements: Vec<FieldElement>) -> Vec<BigUint> {
     biguint_vec
 }

-fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> Result<Vec<Vec<u8>>> {
+fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> Result<Vec<Vec<u8>>, JobError> {
     // Validate blob size
     if blob_size < 32 {
-        return Err(eyre!(
-            "Blob size must be at least 32 bytes to accommodate a single FieldElement/BigUint, but was {}",
-            blob_size,
-        ));
+        Err(DaError::InsufficientBlobSize { blob_size })?
     }

     let mut blobs: Vec<Vec<u8>> = Vec::new();
@@ -195,7 +221,7 @@ fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> Result<Vec<Vec<u8>
         let mut last_blob = bytes;
         last_blob.resize(blob_size as usize, 0); // Pad with zeros
         blobs.push(last_blob);
-        println!("Warning: Remaining {} bytes not forming a complete blob were padded", remaining_bytes);
+        log::debug!("Warning: Remaining {} bytes not forming a complete blob were padded", remaining_bytes);
     }

     Ok(blobs)
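
The visible tail of data_to_blobs pads the final, partially filled blob out to blob_size with zero bytes. A standalone sketch of just that resize-based padding step (pad_last_blob is a made-up helper):

    fn pad_last_blob(bytes: Vec<u8>, blob_size: usize) -> Vec<u8> {
        let mut last_blob = bytes;
        last_blob.resize(blob_size, 0); // pad with zeros, as in the diff
        last_blob
    }

    // pad_last_blob(vec![1, 2, 3], 8) yields [1, 2, 3, 0, 0, 0, 0, 0]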
@@ -205,7 +231,7 @@ pub async fn state_update_to_blob_data(
     block_no: u64,
     state_update: StateUpdate,
     config: &Config,
-) -> Result<Vec<FieldElement>> {
+) -> color_eyre::Result<Vec<FieldElement>> {
     let state_diff = state_update.state_diff;
     let mut blob_data: Vec<FieldElement> = vec![
         FieldElement::from(state_diff.storage_diffs.len()),
@@ -238,22 +264,20 @@ pub async fn state_update_to_blob_data(
         // nonce for the block

         if nonce.is_none() && !writes.is_empty() && addr != FieldElement::ONE {
-            let get_current_nonce_result = config.starknet_client().get_nonce(BlockId::Number(block_no), addr).await;
-
-            nonce = match get_current_nonce_result {
-                OtherOk(get_current_nonce) => Some(get_current_nonce),
-                Err(e) => {
-                    log::error!("Failed to get nonce: {}", e);
-                    return Err(eyre!("Failed to get nonce: {}", e));
-                }
-            };
+            let get_current_nonce_result = config
+                .starknet_client()
+                .get_nonce(BlockId::Number(block_no), addr)
+                .await
+                .wrap_err("Failed to get nonce ".to_string())?;
+
+            nonce = Some(get_current_nonce_result);
         }
         let da_word = da_word(class_flag.is_some(), nonce, writes.len() as u64);
         // @note: it can be improved if the first push to the data is of block number and hash
         // @note: ONE address is special address which for now has 1 value and that is current
         // block number and hash
         // @note: ONE special address can be used to mark the range of block, if in future
-        // the team wants to submit multiple blocks in a sinle blob etc.
+        // the team wants to submit multiple blocks in a single blob etc.
         if addr == FieldElement::ONE && da_word == FieldElement::ONE {
             continue;
         }
@@ -284,21 +308,22 @@ pub async fn state_update_to_blob_data(
 }

 /// To store the blob data using the storage client with path <block_number>/blob_data.txt
-async fn store_blob_data(blob_data: Vec<FieldElement>, block_number: u64, config: &Config) -> Result<()> {
+async fn store_blob_data(blob_data: Vec<FieldElement>, block_number: u64, config: &Config) -> Result<(), JobError> {
     let storage_client = config.storage();
     let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;
     let data_blob_big_uint = convert_to_biguint(blob_data.clone());

-    let blobs_array = data_to_blobs(config.da_client().max_bytes_per_blob().await, data_blob_big_uint)
-        .expect("Not able to convert the data into blobs.");
+    let blobs_array = data_to_blobs(config.da_client().max_bytes_per_blob().await, data_blob_big_uint)?;

     let blob = blobs_array.clone();

     // converting Vec<Vec<u8> into Vec<u8>
-    let blob_vec_u8 = bincode::serialize(&blob)?;
+    let blob_vec_u8 = bincode::serialize(&blob)
+        .wrap_err("Unable to Serialize blobs (Vec<Vec<u8> into Vec<u8>)".to_string())
+        .map_err(|e| JobError::Other(OtherError(e)))?;

     if !blobs_array.is_empty() {
-        storage_client.put_data(blob_vec_u8.into(), &key).await?;
+        storage_client.put_data(blob_vec_u8.into(), &key).await.map_err(|e| JobError::Other(OtherError(e)))?;
     }

     Ok(())
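Because DaError derives PartialEq and thiserror derives Display from the #[error(...)] attributes, tests can now assert on typed errors instead of comparing error strings — which is what the reworked asserts mentioned in the commit message rely on. A test-style sketch using only what the diff above defines (the module and test names are made up; the commit's real tests live in files not expanded on this page):

    #[cfg(test)]
    mod da_error_examples {
        use super::*;

        #[test]
        fn insufficient_blob_size_renders_and_compares() {
            // data_to_blobs(16, vec![]) would surface this variant, wrapped
            // into JobError by the `?` conversion.
            let err = DaError::InsufficientBlobSize { blob_size: 16 };

            // Display comes straight from the #[error(...)] attribute.
            assert_eq!(
                err.to_string(),
                "Blob size must be at least 32 bytes to accommodate a single FieldElement/BigUint, but was 16"
            );

            // PartialEq allows direct equality assertions on the enum.
            assert_eq!(err, DaError::InsufficientBlobSize { blob_size: 16 });
        }
    }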
(The remaining nine changed files are not expanded on this page.)
