Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expect removed from various places #197

Merged
merged 5 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Changed

- refactor: expect removed and added error wraps
- refactor: Readme and .env.example
- refactor: http_mock version updated
- refactor: prover-services renamed to prover-clients
Expand Down
34 changes: 29 additions & 5 deletions crates/orchestrator/src/data_storage/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{BucketLocationConstraint, CreateBucketConfiguration};
use aws_sdk_s3::Client;
use bytes::Bytes;
use color_eyre::eyre::Context;
use color_eyre::Result;

use crate::data_storage::DataStorage;
Expand Down Expand Up @@ -50,8 +51,20 @@ impl AWSS3 {
impl DataStorage for AWSS3 {
/// Function to get the data from S3 bucket by Key.
async fn get_data(&self, key: &str) -> Result<Bytes> {
let response = self.client.get_object().bucket(&self.bucket).key(key).send().await?;
let data_stream = response.body.collect().await.expect("Failed to convert body into AggregatedBytes.");
let response = self
.client
.get_object()
.bucket(&self.bucket)
.key(key)
.send()
.await
.context(format!("Failed to get object from bucket: {}, key: {}", self.bucket, key))?;

let data_stream = response.body.collect().await.context(format!(
"Failed to collect body into AggregatedBytes for bucket: {}, key: {}",
self.bucket, key
))?;

tracing::debug!("DataStorage: Collected response body into data stream from {}, key={}", self.bucket, key);
let data_bytes = data_stream.into_bytes();
tracing::debug!(
Expand All @@ -74,7 +87,8 @@ impl DataStorage for AWSS3 {
.body(ByteStream::from(data))
.content_type("application/json")
.send()
.await?;
.await
.context(format!("Failed to put object in bucket: {}, key: {}", self.bucket, key))?;

tracing::debug!(
log_type = "DataStorage",
Expand All @@ -88,7 +102,12 @@ impl DataStorage for AWSS3 {

async fn create_bucket(&self, bucket_name: &str) -> Result<()> {
if self.bucket_location_constraint.as_str() == "us-east-1" {
self.client.create_bucket().bucket(bucket_name).send().await?;
self.client
.create_bucket()
.bucket(bucket_name)
.send()
.await
.context(format!("Failed to create bucket: {} in us-east-1", bucket_name))?;
heemankv marked this conversation as resolved.
Show resolved Hide resolved
return Ok(());
}

Expand All @@ -101,7 +120,12 @@ impl DataStorage for AWSS3 {
.bucket(bucket_name)
.set_create_bucket_configuration(bucket_configuration)
.send()
.await?;
.await
.context(format!(
"Failed to create bucket: {} in region: {}",
bucket_name,
self.bucket_location_constraint.as_str()
))?;

Ok(())
}
Expand Down
46 changes: 25 additions & 21 deletions crates/orchestrator/src/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ impl Job for DaJob {
let blob_data_biguint = convert_to_biguint(blob_data.clone());
tracing::trace!(job_id = ?job.id, "Converted blob data to BigUint");

let transformed_data = fft_transformation(blob_data_biguint);
let transformed_data =
fft_transformation(blob_data_biguint).wrap_err("Failed to apply FFT transformation").map_err(|e| {
tracing::error!(job_id = ?job.id, error = ?e, "Failed to apply FFT transformation");
JobError::Other(OtherError(e))
})?;
// data transformation on the data
tracing::trace!(job_id = ?job.id, "Applied FFT transformation");

Expand Down Expand Up @@ -204,17 +208,18 @@ impl Job for DaJob {
}

#[tracing::instrument(skip(elements))]
pub fn fft_transformation(elements: Vec<BigUint>) -> Vec<BigUint> {
pub fn fft_transformation(elements: Vec<BigUint>) -> Result<Vec<BigUint>, JobError> {
let xs: Vec<BigUint> = (0..*BLOB_LEN)
.map(|i| {
let bin = format!("{:012b}", i);
let bin_rev = bin.chars().rev().collect::<String>();
GENERATOR.modpow(
&BigUint::from_str_radix(&bin_rev, 2).expect("Not able to convert the parameters into exponent."),
&BLS_MODULUS,
)
let exponent = BigUint::from_str_radix(&bin_rev, 2)
.wrap_err("Failed to convert binary string to exponent")
.map_err(|e| JobError::Other(OtherError(e)))?;
Ok(GENERATOR.modpow(&exponent, &BLS_MODULUS))
})
.collect();
.collect::<Result<Vec<BigUint>, JobError>>()?;

let n = elements.len();
let mut transform: Vec<BigUint> = vec![BigUint::zero(); n];

Expand All @@ -223,7 +228,7 @@ pub fn fft_transformation(elements: Vec<BigUint>) -> Vec<BigUint> {
transform[i] = (transform[i].clone().mul(&xs[i]).add(&elements[j])).rem(&*BLS_MODULUS);
}
}
transform
Ok(transform)
}

pub fn convert_to_biguint(elements: Vec<Felt>) -> Vec<BigUint> {
Expand Down Expand Up @@ -310,7 +315,7 @@ pub async fn state_update_to_blob_data(

nonce = Some(get_current_nonce_result);
}
let da_word = da_word(class_flag.is_some(), nonce, storage_entries.len() as u64);
let da_word = da_word(class_flag.is_some(), nonce, storage_entries.len() as u64)?;
blob_data.push(address);
blob_data.push(da_word);

Expand Down Expand Up @@ -355,7 +360,7 @@ async fn store_blob_data(blob_data: Vec<BigUint>, block_number: u64, config: Arc
/// DA word encoding:
/// |---padding---|---class flag---|---new nonce---|---num changes---|
/// 127 bits 1 bit 64 bits 64 bits
fn da_word(class_flag: bool, nonce_change: Option<Felt>, num_changes: u64) -> Felt {
fn da_word(class_flag: bool, nonce_change: Option<Felt>, num_changes: u64) -> Result<Felt, JobError> {
// padding of 127 bits
let mut binary_string = "0".repeat(127);

Expand All @@ -367,13 +372,8 @@ fn da_word(class_flag: bool, nonce_change: Option<Felt>, num_changes: u64) -> Fe
}

// checking for nonce here
if let Some(_new_nonce) = nonce_change {
let bytes: [u8; 32] = nonce_change
.expect(
"Not able to convert the nonce_change var into [u8; 32] type. Possible Error : Improper parameter \
length.",
)
.to_bytes_be();
if let Some(new_nonce) = nonce_change {
let bytes: [u8; 32] = new_nonce.to_bytes_be();
let biguint = BigUint::from_bytes_be(&bytes);
let binary_string_local = format!("{:b}", biguint);
let padded_binary_string = format!("{:0>64}", binary_string_local);
Expand All @@ -387,12 +387,16 @@ fn da_word(class_flag: bool, nonce_change: Option<Felt>, num_changes: u64) -> Fe
let padded_binary_string = format!("{:0>64}", binary_representation);
binary_string += &padded_binary_string;

let biguint = BigUint::from_str_radix(binary_string.as_str(), 2).expect("Invalid binary string");
let biguint = BigUint::from_str_radix(binary_string.as_str(), 2)
.wrap_err("Failed to convert binary string to BigUint")
.map_err(|e| JobError::Other(OtherError(e)))?;

// Now convert the BigUint to a decimal string
let decimal_string = biguint.to_str_radix(10);

Felt::from_dec_str(&decimal_string).expect("issue while converting to fieldElement")
Felt::from_dec_str(&decimal_string)
.wrap_err("Failed to convert decimal string to FieldElement")
.map_err(|e| JobError::Other(OtherError(e)))
}

fn refactor_state_update(state_update: &mut StateDiff) {
Expand Down Expand Up @@ -453,7 +457,7 @@ pub mod test {
#[case] expected: String,
) {
let new_nonce = if new_nonce > 0 { Some(Felt::from(new_nonce)) } else { None };
let da_word = da_word(class_flag, new_nonce, num_changes);
let da_word = da_word(class_flag, new_nonce, num_changes).expect("Failed to create DA word");
let expected = Felt::from_dec_str(expected.as_str()).unwrap();
assert_eq!(da_word, expected);
}
Expand Down Expand Up @@ -562,7 +566,7 @@ pub mod test {
// converting the data to its original format
let ifft_blob_data = blob::recover(original_blob_data.clone());
// applying the fft function again on the original format
let fft_blob_data = fft_transformation(ifft_blob_data);
let fft_blob_data = fft_transformation(ifft_blob_data).expect("FFT transformation failed during test");

// ideally the data after fft transformation and the data before ifft should be same.
assert_eq!(fft_blob_data, original_blob_data);
Expand Down
31 changes: 20 additions & 11 deletions crates/orchestrator/src/jobs/state_update_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl Job for StateUpdateJob {
for block_no in block_numbers.iter() {
tracing::debug!(job_id = %job.internal_id, block_no = %block_no, "Processing block");

let snos = self.fetch_snos_for_block(*block_no, config.clone()).await;
let snos = self.fetch_snos_for_block(*block_no, config.clone()).await?;
let txn_hash = self
.update_state_for_block(config.clone(), *block_no, snos, nonce)
.await
Expand Down Expand Up @@ -320,7 +320,7 @@ impl StateUpdateJob {
.await
.map_err(|e| JobError::Other(OtherError(e)))?;

let program_output = self.fetch_program_output_for_block(block_no, config.clone()).await;
let program_output = self.fetch_program_output_for_block(block_no, config.clone()).await?;

// TODO :
// Fetching nonce before the transaction is run
Expand All @@ -336,21 +336,30 @@ impl StateUpdateJob {
}

/// Retrieves the SNOS output for the corresponding block.
async fn fetch_snos_for_block(&self, block_no: u64, config: Arc<Config>) -> StarknetOsOutput {
async fn fetch_snos_for_block(&self, block_no: u64, config: Arc<Config>) -> Result<StarknetOsOutput, JobError> {
let storage_client = config.storage();
let key = block_no.to_string() + "/" + SNOS_OUTPUT_FILE_NAME;
let snos_output_bytes = storage_client.get_data(&key).await.expect("Unable to fetch snos output for block");
serde_json::from_slice(snos_output_bytes.iter().as_slice())
.expect("Unable to convert the data into snos output")

let snos_output_bytes = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;

serde_json::from_slice(snos_output_bytes.iter().as_slice()).map_err(|e| {
JobError::Other(OtherError(eyre!("Failed to deserialize SNOS output for block {}: {}", block_no, e)))
})
}

async fn fetch_program_output_for_block(&self, block_number: u64, config: Arc<Config>) -> Vec<[u8; 32]> {
async fn fetch_program_output_for_block(
&self,
block_number: u64,
config: Arc<Config>,
) -> Result<Vec<[u8; 32]>, JobError> {
let storage_client = config.storage();
let key = block_number.to_string() + "/" + PROGRAM_OUTPUT_FILE_NAME;
let program_output = storage_client.get_data(&key).await.expect("Unable to fetch snos output for block");
let decode_data: Vec<[u8; 32]> =
bincode::deserialize(&program_output).expect("Unable to decode the fetched data from storage provider.");
decode_data

let program_output = storage_client.get_data(&key).await.map_err(|e| JobError::Other(OtherError(e)))?;

bincode::deserialize(&program_output).map_err(|e| {
JobError::Other(OtherError(eyre!("Failed to deserialize program output for block {}: {}", block_number, e)))
})
}

/// Insert the tx hashes into the metadata for the attempt number - will be used later by
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/jobs/state_update_job/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub fn bytes_to_vec_u8(bytes: &[u8]) -> color_eyre::Result<Vec<[u8; 32]>> {
let trimmed = line.trim();
assert!(!trimmed.is_empty());

let result = U256::from_str(trimmed).expect("Unable to convert line");
let result = U256::from_str(trimmed)?;
let res_vec = result.to_be_bytes_vec();
let hex = to_padded_hex(res_vec.as_slice());
let vec_hex = hex_string_to_u8_vec(&hex)
Expand Down
3 changes: 2 additions & 1 deletion crates/orchestrator/src/queue/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ fn parse_worker_message(message: &Delivery) -> Result<Option<WorkerTriggerMessag
.borrow_payload()
.ok_or_else(|| ConsumptionError::Other(OtherError::from("Empty payload".to_string())))?;
let message_string = String::from_utf8_lossy(payload).to_string().trim_matches('\"').to_string();
let trigger_type = WorkerTriggerType::from_str(message_string.as_str()).expect("trigger type unwrapping failed");
let trigger_type = WorkerTriggerType::from_str(message_string.as_str())
.map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;
heemankv marked this conversation as resolved.
Show resolved Hide resolved
Ok(Some(WorkerTriggerMessage { worker: trigger_type }))
}

Expand Down
12 changes: 8 additions & 4 deletions crates/orchestrator/src/workers/snos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use color_eyre::eyre::WrapErr;
use opentelemetry::KeyValue;
use starknet::providers::Provider;

Expand Down Expand Up @@ -35,10 +36,13 @@ impl Worker for SnosWorker {

let latest_job_in_db = config.database().get_latest_job_by_type(JobType::SnosRun).await?;

let latest_job_id = match latest_job_in_db {
Some(job) => job.internal_id.parse::<u64>().expect("Failed to parse job internal ID to u64"),
None => "0".to_string().parse::<u64>().expect("Failed to parse '0' to u64"),
};
let latest_job_id = latest_job_in_db
.map(|job| {
job.internal_id
.parse::<u64>()
.wrap_err_with(|| format!("Failed to parse job internal ID: {}", job.internal_id))
})
.unwrap_or(Ok(0))?;

// To be used when testing in specific block range
let block_start = if let Some(min_block_to_process) = config.service_config().min_block_to_process {
Expand Down
3 changes: 1 addition & 2 deletions crates/prover-clients/atlantic-service/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ impl AtlanticClient {
)
.send()
.await
.map_err(AtlanticError::AddJobFailure)
.expect("Failed to add job");
.map_err(AtlanticError::AddJobFailure)?;
heemankv marked this conversation as resolved.
Show resolved Hide resolved

if response.status().is_success() {
response.json().await.map_err(AtlanticError::AddJobFailure)
Expand Down
20 changes: 10 additions & 10 deletions crates/settlement-clients/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use alloy::signers::local::PrivateKeySigner;
use alloy_primitives::Bytes;
use async_trait::async_trait;
use c_kzg::{Blob, Bytes32, KzgCommitment, KzgProof, KzgSettings};
use color_eyre::eyre::{bail, eyre, Ok};
use color_eyre::eyre::{bail, Ok};
use color_eyre::Result;
use conversion::{get_input_data_for_eip_4844, prepare_sidecar};
use settlement_client_interface::{SettlementClient, SettlementVerificationStatus};
Expand All @@ -36,6 +36,7 @@ pub mod tests;
pub mod types;
use alloy::providers::RootProvider;
use alloy::transports::http::Http;
use color_eyre::eyre::WrapErr;
use lazy_static::lazy_static;
use mockall::automock;
use reqwest::Client;
Expand Down Expand Up @@ -164,7 +165,10 @@ impl EthereumSettlementClient {
&KZG_SETTINGS,
)?;

if !eval { Err(eyre!("ERROR : Assertion failed, not able to verify the proof.")) } else { Ok(kzg_proof) }
if !eval {
bail!("ERROR : Assertion failed, not able to verify the proof.");
}
Ok(kzg_proof)
}
}

Expand Down Expand Up @@ -238,14 +242,10 @@ impl SettlementClient for EthereumSettlementClient {

// x_0_value : program_output[10]
// Updated with starknet 0.13.2 spec
let kzg_proof = Self::build_proof(
state_diff,
Bytes32::from_bytes(program_output[X_0_POINT_OFFSET].as_slice())
.expect("Not able to get x_0 point params."),
y_0,
)
.expect("Unable to build KZG proof for given params.")
.to_owned();
let x_0_point = Bytes32::from_bytes(program_output[X_0_POINT_OFFSET].as_slice())
.wrap_err("Failed to get x_0 point params")?;

let kzg_proof = Self::build_proof(state_diff, x_0_point, y_0).wrap_err("Failed to build KZG proof")?.to_owned();

let input_bytes = get_input_data_for_eip_4844(program_output, kzg_proof)?;

Expand Down
4 changes: 2 additions & 2 deletions crates/settlement-clients/starknet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::Arc;
use appchain_core_contract_client::clients::StarknetCoreContractClient;
use appchain_core_contract_client::interfaces::core_contract::CoreContract;
use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::eyre::{eyre, WrapErr};
use color_eyre::Result;
use crypto_bigint::Encoding;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -250,7 +250,7 @@ impl SettlementClient for StarknetSettlementClient {
return Err(eyre!("Could not fetch last block number from core contract."));
}

Ok(u64_from_felt(block_number[0]).expect("Failed to convert to u64"))
u64_from_felt(block_number[0]).wrap_err("Failed to convert block number from Felt to u64")
}

/// Returns the nonce for the wallet in use.
Expand Down
Loading