
Feature: JobError using thiserror #78

Merged: 30 commits, Aug 16, 2024

Commits
b1ef61d
update: added thiserror JobError
heemankv Jul 31, 2024
11d3ad9
update: introducing DaError
heemankv Jul 31, 2024
cddd2a8
update: add ProverJobError
heemankv Aug 1, 2024
64b1559
update: Register proof JobError
heemankv Aug 1, 2024
1058d5e
update: SNOS proof JobError
heemankv Aug 1, 2024
afb45a1
update: JobError on Queue
heemankv Aug 1, 2024
62cce1d
update: added fix for borrow in state update process job
heemankv Aug 1, 2024
53dc8c4
update: cleaning rework for JobError
heemankv Aug 2, 2024
baebcc3
update: wrap_err_with -> wrap_err wherever used with string
heemankv Aug 9, 2024
a29453e
update: moved all JobErrors to top of file
heemankv Aug 3, 2024
c443dbe
update: Errors for consume_job_from_queue
heemankv Aug 3, 2024
ab74a7c
update: linting fixes
heemankv Aug 3, 2024
6c8f898
update: optimised consume_job_from_queue match statement
heemankv Aug 5, 2024
9f5267c
update: code optimisation based on PR reviews
heemankv Aug 5, 2024
1b3f9a6
chore: lint fixes
heemankv Aug 9, 2024
4580e92
chore: lint fixes
heemankv Aug 9, 2024
7ed04b6
update: error matching optimised
heemankv Aug 10, 2024
62715ca
update: block_no to String and correct assert on test_da_job_process_…
heemankv Aug 12, 2024
a8104c7
fix: linting
heemankv Aug 12, 2024
1d93eb1
update: error matching checks
heemankv Aug 12, 2024
e8f2715
update: test state_update fix
heemankv Aug 12, 2024
c4a484a
update: test state_update fix
heemankv Aug 12, 2024
43d95ce
update: test state_update fix
heemankv Aug 12, 2024
c040981
fix: lint
heemankv Aug 12, 2024
ba2cb27
Merge branch 'main' into feat/JobError-Using-ThisError
heemankv Aug 13, 2024
1420452
chore: imports fixed
heemankv Aug 13, 2024
8987033
Merge branch 'main' into feat/JobError-Using-ThisError
ocdbytes Aug 14, 2024
2429af9
feat : updated the errors enum and implemented OtherTest wrapper for …
ocdbytes Aug 14, 2024
f9b3985
Merge branch 'main' into feat/JobError-Using-ThisError
ocdbytes Aug 14, 2024
8fa7760
feat : fixed tests
ocdbytes Aug 16, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- Implement DL queue for handling failed jobs.
- Added tests for state update job.
- Tests for DA job.
+- Added generalized errors for Jobs : JobError.
- Database tests

## Changed
@@ -28,7 +29,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- Orchestrator :Moved TestConfigBuilder to `config.rs` in tests folder.
- `.env` file requires two more variables which are queue urls for processing
and verification.
-- Shifted Unit tests to test folder for DA job.

## Removed

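The diff below leans on the JobError and OtherError types from crates/orchestrator/src/jobs/mod.rs, which this page does not show. Here is a minimal, self-contained sketch of the pattern, using the crate's existing thiserror and color_eyre dependencies; the JobError variant names and the PartialEq strategy for OtherError are assumptions for illustration, not the merged definitions.

use thiserror::Error;

// Minimal stand-in for the DaError introduced in da_job/mod.rs below.
#[derive(Error, Debug, PartialEq)]
pub enum DaError {
    #[error("Blob size must be at least 32 bytes, but was {blob_size:?}")]
    InsufficientBlobSize { blob_size: u64 },
}

// Newtype over eyre::Report so catch-all errors keep their context. Assumed
// detail: PartialEq compares the rendered message, since eyre::Report itself
// does not implement PartialEq.
#[derive(Debug)]
pub struct OtherError(pub color_eyre::eyre::Report);

impl std::fmt::Display for OtherError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl std::error::Error for OtherError {}

impl PartialEq for OtherError {
    fn eq(&self, other: &Self) -> bool {
        self.0.to_string() == other.0.to_string()
    }
}

// Assumed shape of the top-level JobError.
#[derive(Error, Debug, PartialEq)]
pub enum JobError {
    // #[from] generates `impl From<DaError> for JobError`, which is what lets
    // the diff write `Err(DaError::BlockPending { .. })?` inside functions
    // returning Result<_, JobError>.
    #[error(transparent)]
    DaJobError(#[from] DaError),

    #[error("Other error: {0}")]
    Other(#[from] OtherError),
}

fn check_blob_size(blob_size: u64) -> Result<(), JobError> {
    if blob_size < 32 {
        // `?` on an Err value returns early, converting DaError into JobError.
        Err(DaError::InsufficientBlobSize { blob_size })?
    }
    Ok(())
}

fn main() {
    let err = check_blob_size(8).unwrap_err();
    // thiserror renders the #[error(...)] template via Display; the
    // transparent variant delegates straight to DaError's message.
    assert_eq!(err.to_string(), "Blob size must be at least 32 bytes, but was 8");
}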
125 changes: 75 additions & 50 deletions crates/orchestrator/src/jobs/da_job/mod.rs
@@ -1,22 +1,20 @@
use std::collections::HashMap;
use std::ops::{Add, Mul, Rem};
-use std::result::Result::{Err, Ok as OtherOk};
use std::str::FromStr;

use async_trait::async_trait;
-use color_eyre::eyre::{eyre, Ok};
-use color_eyre::Result;
+use color_eyre::eyre::WrapErr;
use lazy_static::lazy_static;
use num_bigint::{BigUint, ToBigUint};
use num_traits::{Num, Zero};
-//
use starknet::core::types::{BlockId, FieldElement, MaybePendingStateUpdate, StateUpdate, StorageEntry};
use starknet::providers::Provider;
+use thiserror::Error;
use tracing::log;
use uuid::Uuid;

use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
-use super::Job;
+use super::{Job, JobError, OtherError};
use crate::config::Config;
use crate::constants::BLOB_DATA_FILE_NAME;

@@ -40,6 +38,21 @@ lazy_static! {
pub static ref BLOB_LEN: usize = 4096;
}

+#[derive(Error, Debug, PartialEq)]
+pub enum DaError {
+#[error("Cannot process block {block_no:?} for job id {job_id:?} as it's still in pending state.")]
+BlockPending { block_no: String, job_id: Uuid },
+
+#[error("Blob size must be at least 32 bytes to accommodate a single FieldElement/BigUint, but was {blob_size:?}")]
+InsufficientBlobSize { blob_size: u64 },
+
+#[error("Exceeded the maximum number of blobs per transaction: allowed {max_blob_per_txn:?}, found {current_blob_length:?} for block {block_no:?} and job id {job_id:?}")]
+MaxBlobsLimitExceeded { max_blob_per_txn: u64, current_blob_length: u64, block_no: String, job_id: Uuid },
+
+#[error("Other error: {0}")]
+Other(#[from] OtherError),
+}
+
pub struct DaJob;

#[async_trait]
@@ -49,7 +62,7 @@ impl Job for DaJob {
_config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
-) -> Result<JobItem> {
+) -> Result<JobItem, JobError> {
Ok(JobItem {
id: Uuid::new_v4(),
internal_id,
@@ -61,24 +74,30 @@ impl Job for DaJob {
})
}

-async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result<String> {
-let block_no = job.internal_id.parse::<u64>()?;
+async fn process_job(&self, config: &Config, job: &mut JobItem) -> Result<String, JobError> {
+let block_no = job
+.internal_id
+.parse::<u64>()
+.wrap_err("Failed to parse u64".to_string())
+.map_err(|e| JobError::Other(OtherError(e)))?;

-let state_update = config.starknet_client().get_state_update(BlockId::Number(block_no)).await?;
+let state_update = config
+.starknet_client()
+.get_state_update(BlockId::Number(block_no))
+.await
+.wrap_err("Failed to get state Update.".to_string())
+.map_err(|e| JobError::Other(OtherError(e)))?;

let state_update = match state_update {
MaybePendingStateUpdate::PendingUpdate(_) => {
-log::error!("Cannot process block {} for job id {} as it's still in pending state", block_no, job.id);
-return Err(eyre!(
-"Cannot process block {} for job id {} as it's still in pending state",
-block_no,
-job.id
-));
+Err(DaError::BlockPending { block_no: block_no.to_string(), job_id: job.id })?
}
MaybePendingStateUpdate::Update(state_update) => state_update,
};
// constructing the data from the rpc
-let blob_data = state_update_to_blob_data(block_no, state_update, config).await?;
+let blob_data = state_update_to_blob_data(block_no, state_update, config)
+.await
+.map_err(|e| JobError::Other(OtherError(e)))?;
// transforming the data so that we can apply FFT on this.
// @note: we can skip this step if in the above step we return vec<BigUint> directly
let blob_data_biguint = convert_to_biguint(blob_data.clone());
@@ -89,30 +108,40 @@ impl Job for DaJob {
let max_blob_per_txn = config.da_client().max_blob_per_txn().await;

// converting BigUints to Vec<u8>, one Vec<u8> represents one blob data
-let blob_array =
-data_to_blobs(max_bytes_per_blob, transformed_data).expect("error while converting blob data to vec<u8>");
-let current_blob_length: u64 =
-blob_array.len().try_into().expect("Unable to convert the blob length into u64 format.");
+let blob_array = data_to_blobs(max_bytes_per_blob, transformed_data)?;
+let current_blob_length: u64 = blob_array
+.len()
+.try_into()
+.wrap_err("Unable to convert the blob length into u64 format.".to_string())
+.map_err(|e| JobError::Other(OtherError(e)))?;

// there is a limit on number of blobs per txn, checking that here
if current_blob_length > max_blob_per_txn {
-return Err(eyre!(
-"Exceeded the maximum number of blobs per transaction: allowed {}, found {} for block {} and job id {}",
+Err(DaError::MaxBlobsLimitExceeded {
max_blob_per_txn,
current_blob_length,
-block_no,
-job.id
-));
+block_no: block_no.to_string(),
+job_id: job.id,
+})?
}

// making the txn to the DA layer
-let external_id = config.da_client().publish_state_diff(blob_array, &[0; 32]).await?;
+let external_id = config
+.da_client()
+.publish_state_diff(blob_array, &[0; 32])
+.await
+.map_err(|e| JobError::Other(OtherError(e)))?;

Ok(external_id)
}

-async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result<JobVerificationStatus> {
-Ok(config.da_client().verify_inclusion(job.external_id.unwrap_string()?).await?.into())
+async fn verify_job(&self, config: &Config, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
+Ok(config
+.da_client()
+.verify_inclusion(job.external_id.unwrap_string().map_err(|e| JobError::Other(OtherError(e)))?)
+.await
+.map_err(|e| JobError::Other(OtherError(e)))?
+.into())
}

fn max_process_attempts(&self) -> u64 {
@@ -169,13 +198,10 @@ pub fn convert_to_biguint(elements: Vec<FieldElement>) -> Vec<BigUint> {
biguint_vec
}

-fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> Result<Vec<Vec<u8>>> {
+fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> Result<Vec<Vec<u8>>, JobError> {
// Validate blob size
if blob_size < 32 {
-return Err(eyre!(
-"Blob size must be at least 32 bytes to accommodate a single FieldElement/BigUint, but was {}",
-blob_size,
-));
+Err(DaError::InsufficientBlobSize { blob_size })?
}

let mut blobs: Vec<Vec<u8>> = Vec::new();
@@ -195,7 +221,7 @@ fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> Result<Vec<Vec<u8>
let mut last_blob = bytes;
last_blob.resize(blob_size as usize, 0); // Pad with zeros
blobs.push(last_blob);
-println!("Warning: Remaining {} bytes not forming a complete blob were padded", remaining_bytes);
+log::debug!("Warning: Remaining {} bytes not forming a complete blob were padded", remaining_bytes);
}

Ok(blobs)
Expand All @@ -205,7 +231,7 @@ pub async fn state_update_to_blob_data(
block_no: u64,
state_update: StateUpdate,
config: &Config,
-) -> Result<Vec<FieldElement>> {
+) -> color_eyre::Result<Vec<FieldElement>> {
let state_diff = state_update.state_diff;
let mut blob_data: Vec<FieldElement> = vec![
FieldElement::from(state_diff.storage_diffs.len()),
@@ -238,22 +264,20 @@ pub async fn state_update_to_blob_data(
// nonce for the block

if nonce.is_none() && !writes.is_empty() && addr != FieldElement::ONE {
-let get_current_nonce_result = config.starknet_client().get_nonce(BlockId::Number(block_no), addr).await;
-
-nonce = match get_current_nonce_result {
-OtherOk(get_current_nonce) => Some(get_current_nonce),
-Err(e) => {
-log::error!("Failed to get nonce: {}", e);
-return Err(eyre!("Failed to get nonce: {}", e));
-}
-};
+let get_current_nonce_result = config
+.starknet_client()
+.get_nonce(BlockId::Number(block_no), addr)
+.await
+.wrap_err("Failed to get nonce ".to_string())?;
+
+nonce = Some(get_current_nonce_result);
}
let da_word = da_word(class_flag.is_some(), nonce, writes.len() as u64);
// @note: it can be improved if the first push to the data is of block number and hash
// @note: ONE address is special address which for now has 1 value and that is current
// block number and hash
// @note: ONE special address can be used to mark the range of block, if in future
-// the team wants to submit multiple blocks in a sinle blob etc.
+// the team wants to submit multiple blocks in a single blob etc.
if addr == FieldElement::ONE && da_word == FieldElement::ONE {
continue;
}
@@ -284,21 +308,22 @@
}

/// To store the blob data using the storage client with path <block_number>/blob_data.txt
-async fn store_blob_data(blob_data: Vec<FieldElement>, block_number: u64, config: &Config) -> Result<()> {
+async fn store_blob_data(blob_data: Vec<FieldElement>, block_number: u64, config: &Config) -> Result<(), JobError> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + BLOB_DATA_FILE_NAME;
let data_blob_big_uint = convert_to_biguint(blob_data.clone());

-let blobs_array = data_to_blobs(config.da_client().max_bytes_per_blob().await, data_blob_big_uint)
-.expect("Not able to convert the data into blobs.");
+let blobs_array = data_to_blobs(config.da_client().max_bytes_per_blob().await, data_blob_big_uint)?;

let blob = blobs_array.clone();

// converting Vec<Vec<u8> into Vec<u8>
-let blob_vec_u8 = bincode::serialize(&blob)?;
+let blob_vec_u8 = bincode::serialize(&blob)
+.wrap_err("Unable to Serialize blobs (Vec<Vec<u8> into Vec<u8>)".to_string())
+.map_err(|e| JobError::Other(OtherError(e)))?;

if !blobs_array.is_empty() {
-storage_client.put_data(blob_vec_u8.into(), &key).await?;
+storage_client.put_data(blob_vec_u8.into(), &key).await.map_err(|e| JobError::Other(OtherError(e)))?;
}

Ok(())
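
A closing note on the blob packing above: data_to_blobs slices the byte stream derived from the BigUints into blob_size-byte blobs and zero-pads the trailing partial blob, as the resize call in the diff shows. A minimal sketch of just that chunk-and-pad step, assuming the input is already a flat byte stream; pack_into_blobs is a hypothetical helper for illustration, not a function in this PR.

// Split a byte stream into fixed-size blobs, zero-padding the trailing
// partial blob (mirrors the resize(blob_size as usize, 0) call above; the
// real function also logs a debug warning when it pads).
fn pack_into_blobs(blob_size: usize, bytes: Vec<u8>) -> Vec<Vec<u8>> {
    let mut blobs: Vec<Vec<u8>> = bytes.chunks(blob_size).map(|c| c.to_vec()).collect();
    if let Some(last) = blobs.last_mut() {
        last.resize(blob_size, 0); // no-op when the last blob is already full
    }
    blobs
}

fn main() {
    // 70 input bytes with 32-byte blobs: two full blobs plus a 6-byte
    // remainder that gets padded out to 32 bytes.
    let blobs = pack_into_blobs(32, vec![1u8; 70]);
    assert_eq!(blobs.len(), 3);
    assert_eq!(blobs[2][..6], [1u8; 6]);
    assert_eq!(blobs[2][6..], [0u8; 26]);
}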