This repository was archived by the owner on May 24, 2025. It is now read-only.

Adding Logs #147

Merged · 17 commits · Oct 11, 2024
1 change: 1 addition & 0 deletions .env.test
@@ -70,6 +70,7 @@ STARKNET_ACCOUNT_ADDRESS=0x3bb306a004034dba19e6cf7b161e7a4fef64bc1078419e8ad1876
## Instrumentation
OTEL_SERVICE_NAME="madara_orchestrator"
+

##### Tests #####

STARKNET_OPERATOR_ADDRESS="0x5b98B836969A60FEC50Fa925905Dd1D382a7db43"
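The `## Instrumentation` block above feeds OpenTelemetry: `OTEL_SERVICE_NAME` is the conventional env var the OpenTelemetry SDK reads to name the service emitting traces. As a rough sketch of subscriber wiring (an assumption for illustration — the orchestrator's actual initialization code is not shown in this diff):

```rust
// Hypothetical tracing setup; layer choice and filtering are assumptions.
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

fn init_tracing() {
    // OTEL_SERVICE_NAME is typically consumed by the OpenTelemetry SDK's
    // Resource detection; this sketch only installs a console formatter
    // gated by RUST_LOG so the new tracing::debug!/info! events are visible.
    tracing_subscriber::registry()
        .with(fmt::layer())
        .with(EnvFilter::from_default_env())
        .init();
}
```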
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

+ - added logs
- added MongoDB migrations using nodejs
- added dockerfile
- `SnosJob` implementation and e2e
3 changes: 1 addition & 2 deletions crates/orchestrator/src/controllers/errors.rs
@@ -2,7 +2,6 @@ use axum::response::IntoResponse;
use axum::Json;
use color_eyre::eyre::ErrReport;
use serde_json::json;
- use tracing::log;

/// Root level error which is sent back to the client
#[derive(thiserror::Error, Debug)]
@@ -15,7 +14,7 @@ pub enum AppError {
/// Convert the error into a response so that it can be sent back to the client
impl IntoResponse for AppError {
    fn into_response(self) -> axum::http::Response<axum::body::Body> {
-       log::error!("Error: {:?}", self);
+       tracing::error!("Error: {:?}", self);
        let (status, err_msg) = match self {
            Self::InternalServerError(msg) => (axum::http::StatusCode::INTERNAL_SERVER_ERROR, msg.to_string()),
        };
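Dropping `use tracing::log` and switching `log::error!` to `tracing::error!` is the pattern this PR repeats across the codebase: `tracing` events can carry structured key-value fields and attach to the active span, while `log` emits flat strings. A standalone contrast (field names illustrative, not code from this repository):

```rust
// Minimal contrast of the two macros.
fn report_failure(job_id: u64, err: &str) {
    // log: a single formatted string with no machine-readable structure.
    log::error!("job {} failed: {}", job_id, err);

    // tracing: same message, plus fields a collector can index and filter on,
    // and the event inherits context from the surrounding span, if any.
    tracing::error!(job_id, error = %err, "job failed");
}
```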
16 changes: 16 additions & 0 deletions crates/orchestrator/src/data_storage/aws_s3/mod.rs
@@ -48,7 +48,16 @@ impl DataStorage for AWSS3 {
    async fn get_data(&self, key: &str) -> Result<Bytes> {
        let response = self.client.get_object().bucket(&self.bucket).key(key).send().await?;
        let data_stream = response.body.collect().await.expect("Failed to convert body into AggregatedBytes.");
+       tracing::debug!("DataStorage: Collected response body into data stream from {}, key={}", self.bucket, key);
        let data_bytes = data_stream.into_bytes();
+       tracing::debug!(
+           log_type = "DataStorage",
+           category = "data_storage_call",
+           data_bytes = data_bytes.len(),
+           "Successfully retrieved and converted data from {}, key={}",
+           self.bucket,
+           key
+       );
        Ok(data_bytes)
    }

@@ -63,6 +72,13 @@
            .send()
            .await?;

+       tracing::debug!(
+           log_type = "DataStorage",
+           category = "data_storage_call",
+           "Successfully put data into {}. key={}",
+           self.bucket,
+           key
+       );
        Ok(())
    }

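The `log_type` / `category` fields on these events give a log pipeline something to filter on without parsing message strings. With the default `fmt` subscriber the fields are appended after the message; the exact rendering below is an assumption (it depends on the configured layer), and the bucket/key values are placeholders:

```rust
// Illustrative event with structured fields (placeholder values).
tracing::debug!(
    log_type = "DataStorage",
    category = "data_storage_call",
    data_bytes = 1024usize,
    "Successfully retrieved and converted data from {}, key={}",
    "example-bucket",
    "example/key",
);
// Rendered by the default fmt layer roughly as:
// DEBUG ...: Successfully retrieved and converted data from example-bucket,
//            key=example/key log_type="DataStorage"
//            category="data_storage_call" data_bytes=1024
```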
44 changes: 25 additions & 19 deletions crates/orchestrator/src/database/mongodb/mod.rs
@@ -34,7 +34,7 @@ impl MongoDb {
        let client = Client::with_options(client_options).expect("Failed to create MongoDB client");
        // Ping the server to see if you can connect to the cluster
        client.database("admin").run_command(doc! {"ping": 1}, None).await.expect("Failed to ping MongoDB deployment");
-       log::debug!("Pinged your deployment. You successfully connected to MongoDB!");
+       tracing::debug!("Pinged your deployment. You successfully connected to MongoDB!");

        Self { client, database_name: mongo_db_settings.database_name }
    }
@@ -86,30 +86,33 @@

#[async_trait]
impl Database for MongoDb {
-   #[tracing::instrument(skip(self), fields(function_type = "db_call"))]
+   #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
    async fn create_job(&self, job: JobItem) -> Result<JobItem> {
        self.get_job_collection().insert_one(&job, None).await?;
+       tracing::debug!(job_id = %job.id, category = "db_call", "Job created successfully");
        Ok(job)
    }

-   #[tracing::instrument(skip(self), fields(function_type = "db_call"))]
+   #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
    async fn get_job_by_id(&self, id: Uuid) -> Result<Option<JobItem>> {
        let filter = doc! {
            "id": id
        };
+       tracing::debug!(job_id = %id, category = "db_call", "Fetched job by ID");
        Ok(self.get_job_collection().find_one(filter, None).await?)
    }

-   #[tracing::instrument(skip(self), fields(function_type = "db_call"))]
+   #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
    async fn get_job_by_internal_id_and_type(&self, internal_id: &str, job_type: &JobType) -> Result<Option<JobItem>> {
        let filter = doc! {
            "internal_id": internal_id,
            "job_type": mongodb::bson::to_bson(&job_type)?,
        };
+       tracing::debug!(internal_id = %internal_id, job_type = ?job_type, category = "db_call", "Fetched job by internal ID and type");
        Ok(self.get_job_collection().find_one(filter, None).await?)
    }

-   #[tracing::instrument(skip(self), fields(function_type = "db_call"))]
+   #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
    async fn update_job(&self, current_job: &JobItem, updates: JobItemUpdates) -> Result<()> {
        // Filters to search for the job
        let filter = doc! {
@@ -126,18 +129,21 @@

        let result = self.get_job_collection().update_one(filter, update, options).await?;
        if result.modified_count == 0 {
+           tracing::warn!(job_id = %current_job.id, category = "db_call", "Failed to update job. Job version is likely outdated");
            return Err(eyre!("Failed to update job. Job version is likely outdated"));
        }

+       tracing::debug!(job_id = %current_job.id, category = "db_call", "Job updated successfully");
        Ok(())
    }

-   #[tracing::instrument(skip(self), fields(function_type = "db_call"))]
+   #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
    async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
        let filter = doc! {
            "job_type": mongodb::bson::to_bson(&job_type)?,
        };
        let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();
+       tracing::debug!(job_type = ?job_type, category = "db_call", "Fetching latest job by type");
        Ok(self.get_job_collection().find_one(filter, find_options).await?)
    }
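The `modified_count == 0` branch in `update_job` above is an optimistic-concurrency guard: the update filter matches the job's current `version`, so if a concurrent writer bumped the version first, this update matches nothing and the caller is told to retry. A minimal standalone sketch of the pattern (document shape and field names are assumptions, not the orchestrator's exact schema):

```rust
use mongodb::{bson::doc, Collection};

// Hypothetical document type for illustration only.
#[derive(serde::Serialize, serde::Deserialize)]
struct Job {
    id: String,
    status: String,
    version: i64,
}

async fn update_with_version_check(
    jobs: &Collection<Job>,
    job: &Job,
    new_status: &str,
) -> color_eyre::Result<()> {
    // Match on id AND the version we read; bump the version atomically so a
    // stale writer cannot clobber a newer update.
    let result = jobs
        .update_one(
            doc! { "id": &job.id, "version": job.version },
            doc! { "$set": { "status": new_status }, "$inc": { "version": 1 } },
            None,
        )
        .await?;
    if result.modified_count == 0 {
        // Someone else updated the job since we read it.
        return Err(color_eyre::eyre::eyre!("job version is outdated; re-read and retry"));
    }
    Ok(())
}
```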

@@ -162,7 +168,7 @@
    /// job_b_type : ProofCreation
    ///
    /// TODO : For now Job B status implementation is pending so we can pass None
-   #[tracing::instrument(skip(self), fields(function_type = "db_call"))]
+   #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
    async fn get_jobs_without_successor(
        &self,
        job_a_type: JobType,
@@ -226,7 +232,6 @@
            }
        },
    ];
-
    // TODO : Job B status code :
    // // Conditionally add status matching for job_b_status
    // if let Some(status) = job_b_status {
@@ -253,16 +258,17 @@
            match result {
                Ok(document) => match bson::from_bson(Bson::Document(document)) {
                    Ok(job_item) => vec_jobs.push(job_item),
-                   Err(e) => eprintln!("Failed to deserialize JobItem: {:?}", e),
+                   Err(e) => tracing::error!(error = %e, category = "db_call", "Failed to deserialize JobItem"),
                },
-               Err(e) => eprintln!("Error retrieving document: {:?}", e),
+               Err(e) => tracing::error!(error = %e, category = "db_call", "Error retrieving document"),
            }
        }

+       tracing::debug!(job_count = vec_jobs.len(), category = "db_call", "Retrieved jobs without successor");
        Ok(vec_jobs)
    }

-   #[tracing::instrument(skip(self), fields(function_type = "db_call"))]
+   #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
    async fn get_latest_job_by_type_and_status(
        &self,
        job_type: JobType,
@@ -274,10 +280,11 @@
        };
        let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();

+       tracing::debug!(job_type = ?job_type, job_status = ?job_status, category = "db_call", "Fetched latest job by type and status");
        Ok(self.get_job_collection().find_one(filter, find_options).await?)
    }

-   #[tracing::instrument(skip(self), fields(function_type = "db_call"))]
+   #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
    async fn get_jobs_after_internal_id_by_job_type(
        &self,
        job_type: JobType,
@@ -287,15 +294,14 @@
        let filter = doc! {
            "job_type": bson::to_bson(&job_type)?,
            "status": bson::to_bson(&job_status)?,
-           "internal_id": { "$gt": internal_id }
+           "internal_id": { "$gt": internal_id.clone() }
        };

-       let jobs = self.get_job_collection().find(filter, None).await?.try_collect().await?;
-
+       let jobs: Vec<JobItem> = self.get_job_collection().find(filter, None).await?.try_collect().await?;
+       tracing::debug!(job_type = ?job_type, job_status = ?job_status, internal_id = internal_id, category = "db_call", "Fetched jobs after internal ID by job type");
        Ok(jobs)
    }

-   #[tracing::instrument(skip(self, limit), fields(function_type = "db_call"))]
+   #[tracing::instrument(skip(self, limit), fields(function_type = "db_call"), ret, err)]
    async fn get_jobs_by_statuses(&self, job_status: Vec<JobStatus>, limit: Option<i64>) -> Result<Vec<JobItem>> {
        let filter = doc! {
            "status": {
@@ -306,8 +312,8 @@

        let find_options = limit.map(|val| FindOptions::builder().limit(Some(val)).build());

-       let jobs = self.get_job_collection().find(filter, find_options).await?.try_collect().await?;
-
+       let jobs: Vec<JobItem> = self.get_job_collection().find(filter, find_options).await?.try_collect().await?;
+       tracing::debug!(job_count = jobs.len(), category = "db_call", "Retrieved jobs by statuses");
        Ok(jobs)
    }
}
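Beyond the explicit `tracing::debug!` calls, each `#[tracing::instrument]` here gains `ret, err`: the generated span records the function's `Ok` value when it returns and emits an error-level event when it returns `Err`, with no manual logging in the body. A self-contained sketch of the behavior (function and inputs are illustrative):

```rust
use tracing::instrument;

// `ret` records the Ok value when the span closes; `err` turns Err returns
// into error events automatically.
#[instrument(fields(function_type = "db_call"), ret, err)]
fn parse_block_no(raw: &str) -> Result<u64, std::num::ParseIntError> {
    raw.parse::<u64>()
}

fn main() {
    tracing_subscriber::fmt().with_max_level(tracing::Level::TRACE).init();
    let _ = parse_block_no("640641"); // span records ret = 640641
    let _ = parse_block_no("oops");   // span records the ParseIntError
}
```

One caveat worth noting: `ret` on functions that return whole `JobItem`s or `Vec<JobItem>`s will render those values into the logs via `Debug`, which can get verbose at scale.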
92 changes: 61 additions & 31 deletions crates/orchestrator/src/jobs/da_job/mod.rs
@@ -14,7 +14,6 @@ use starknet::core::types::{
};
use starknet::providers::Provider;
use thiserror::Error;
- use tracing::log;
use uuid::Uuid;

use super::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
@@ -65,73 +64,93 @@ pub struct DaJob;

#[async_trait]
impl Job for DaJob {
-   #[tracing::instrument(fields(category = "da"), skip(self, _config, metadata))]
+   #[tracing::instrument(fields(category = "da"), skip(self, _config, metadata), ret, err)]
    async fn create_job(
        &self,
        _config: Arc<Config>,
        internal_id: String,
        metadata: HashMap<String, String>,
    ) -> Result<JobItem, JobError> {
-       Ok(JobItem {
-           id: Uuid::new_v4(),
-           internal_id,
+       let job_id = Uuid::new_v4();
+       tracing::info!(log_type = "starting", category = "da", function_type = "create_job", block_no = %internal_id, "DA job creation started.");
+       let job_item = JobItem {
+           id: job_id,
+           internal_id: internal_id.clone(),
            job_type: JobType::DataSubmission,
            status: JobStatus::Created,
            external_id: String::new().into(),
            metadata,
            version: 0,
            created_at: Utc::now().round_subsecs(0),
            updated_at: Utc::now().round_subsecs(0),
-       })
+       };
+       tracing::info!(log_type = "completed", category = "da", function_type = "create_job", block_no = %internal_id, "DA job creation completed.");
+       Ok(job_item)
    }

-   #[tracing::instrument(fields(category = "da"), skip(self, config))]
+   #[tracing::instrument(fields(category = "da"), skip(self, config), ret, err)]
    async fn process_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<String, JobError> {
-       let block_no = job
-           .internal_id
-           .parse::<u64>()
-           .wrap_err("Failed to parse u64".to_string())
-           .map_err(|e| JobError::Other(OtherError(e)))?;
+       let internal_id = job.internal_id.clone();
+       tracing::info!(log_type = "starting", category = "da", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, "DA job processing started.");
+       let block_no = job.internal_id.parse::<u64>().wrap_err("Failed to parse u64".to_string()).map_err(|e| {
+           tracing::error!(job_id = ?job.id, error = ?e, "Failed to parse block number");
+           JobError::Other(OtherError(e))
+       })?;

        let state_update = config
            .starknet_client()
            .get_state_update(BlockId::Number(block_no))
            .await
            .wrap_err("Failed to get state Update.".to_string())
-           .map_err(|e| JobError::Other(OtherError(e)))?;
+           .map_err(|e| {
+               tracing::error!(job_id = ?job.id, error = ?e, "Failed to get state update");
+               JobError::Other(OtherError(e))
+           })?;

        let state_update = match state_update {
            MaybePendingStateUpdate::PendingUpdate(_) => {
+               tracing::warn!(job_id = ?job.id, block_no = block_no, "Block is still pending");
                Err(DaError::BlockPending { block_no: block_no.to_string(), job_id: job.id })?
            }
            MaybePendingStateUpdate::Update(state_update) => state_update,
        };
+       tracing::debug!(job_id = ?job.id, "Retrieved state update");
        // constructing the data from the rpc
-       let blob_data = state_update_to_blob_data(block_no, state_update, config.clone())
-           .await
-           .map_err(|e| JobError::Other(OtherError(e)))?;
+       let blob_data = state_update_to_blob_data(block_no, state_update, config.clone()).await.map_err(|e| {
+           tracing::error!(job_id = ?job.id, error = ?e, "Failed to convert state update to blob data");
+           JobError::Other(OtherError(e))
+       })?;
        // transforming the data so that we can apply FFT on this.
        // @note: we can skip this step if in the above step we return vec<BigUint> directly
        let blob_data_biguint = convert_to_biguint(blob_data.clone());
+       tracing::trace!(job_id = ?job.id, "Converted blob data to BigUint");

        // data transformation on the data
        let transformed_data = fft_transformation(blob_data_biguint);
-       // data transformation on the data
+       tracing::trace!(job_id = ?job.id, "Applied FFT transformation");

        store_blob_data(transformed_data.clone(), block_no, config.clone()).await?;
+       tracing::debug!(job_id = ?job.id, "Stored blob data");

        let max_bytes_per_blob = config.da_client().max_bytes_per_blob().await;
        let max_blob_per_txn = config.da_client().max_blob_per_txn().await;
+       tracing::trace!(job_id = ?job.id, max_bytes_per_blob = max_bytes_per_blob, max_blob_per_txn = max_blob_per_txn, "Retrieved DA client configuration");

        // converting BigUints to Vec<u8>, one Vec<u8> represents one blob data
        let blob_array = data_to_blobs(max_bytes_per_blob, transformed_data)?;
        let current_blob_length: u64 = blob_array
            .len()
            .try_into()
            .wrap_err("Unable to convert the blob length into u64 format.".to_string())
-           .map_err(|e| JobError::Other(OtherError(e)))?;
+           .map_err(|e| {
+               tracing::error!(job_id = ?job.id, error = ?e, "Failed to convert blob length to u64");
+               JobError::Other(OtherError(e))
+           })?;
+       tracing::debug!(job_id = ?job.id, blob_count = current_blob_length, "Converted data to blobs");

        // there is a limit on number of blobs per txn, checking that here
        if current_blob_length > max_blob_per_txn {
+           tracing::warn!(job_id = ?job.id, current_blob_length = current_blob_length, max_blob_per_txn = max_blob_per_txn, "Exceeded maximum number of blobs per transaction");
            Err(DaError::MaxBlobsLimitExceeded {
                max_blob_per_txn,
                current_blob_length,
@@ -141,23 +160,34 @@
        }

        // making the txn to the DA layer
-       let external_id = config
-           .da_client()
-           .publish_state_diff(blob_array, &[0; 32])
-           .await
-           .map_err(|e| JobError::Other(OtherError(e)))?;
+       let external_id = config.da_client().publish_state_diff(blob_array, &[0; 32]).await.map_err(|e| {
+           tracing::error!(job_id = ?job.id, error = ?e, "Failed to publish state diff to DA layer");
+           JobError::Other(OtherError(e))
+       })?;

+       tracing::info!(log_type = "completed", category = "da", function_type = "process_job", job_id = ?job.id, block_no = %internal_id, external_id = ?external_id, "Successfully published state diff to DA layer.");
        Ok(external_id)
    }

-   #[tracing::instrument(fields(category = "da"), skip(self, config))]
+   #[tracing::instrument(fields(category = "da"), skip(self, config), ret, err)]
    async fn verify_job(&self, config: Arc<Config>, job: &mut JobItem) -> Result<JobVerificationStatus, JobError> {
-       Ok(config
+       let internal_id = job.internal_id.clone();
+       tracing::info!(log_type = "starting", category = "da", function_type = "verify_job", job_id = ?job.id, block_no = %internal_id, "DA job verification started.");
+       let verification_status = config
            .da_client()
-           .verify_inclusion(job.external_id.unwrap_string().map_err(|e| JobError::Other(OtherError(e)))?)
+           .verify_inclusion(job.external_id.unwrap_string().map_err(|e| {
+               tracing::error!(job_id = ?job.id, error = ?e, "Failed to unwrap external ID");
+               JobError::Other(OtherError(e))
+           })?)
            .await
-           .map_err(|e| JobError::Other(OtherError(e)))?
-           .into())
+           .map_err(|e| {
+               tracing::error!(job_id = ?job.id, error = ?e, "Job verification failed");
+               JobError::Other(OtherError(e))
+           })?
+           .into();

+       tracing::info!(log_type = "completed", category = "da", function_type = "verify_job", job_id = ?job.id, block_no = %internal_id, verification_status = ?verification_status, "DA job verification completed.");
+       Ok(verification_status)
    }

    fn max_process_attempts(&self) -> u64 {
@@ -173,7 +203,7 @@
    }
}

- #[tracing::instrument(skip(elements))]
+ #[tracing::instrument(skip(elements), ret)]
pub fn fft_transformation(elements: Vec<BigUint>) -> Vec<BigUint> {
    let xs: Vec<BigUint> = (0..*BLOB_LEN)
        .map(|i| {
@@ -234,7 +264,7 @@ fn data_to_blobs(blob_size: u64, block_data: Vec<BigUint>) -> Result<Vec<Vec<u8>
        let mut blob = chunk.to_vec();
        if blob.len() < chunk_size {
            blob.resize(chunk_size, 0);
-           log::debug!("Warning: Last chunk of {} bytes was padded to full blob size", chunk.len());
+           tracing::debug!("Warning: Last chunk of {} bytes was padded to full blob size", chunk.len());
        }
        blobs.push(blob);
    }
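The padding branch above exists because DA blobs have a fixed size: the last chunk of a state diff is usually short and must be zero-filled. A standalone sketch of the same chunk-and-pad idea over plain bytes (the real function operates on FFT-transformed `BigUint`s and takes the blob size from the DA client; the 4096 here is an assumed size for illustration):

```rust
/// Split `data` into fixed-size blobs, zero-padding the final short chunk.
fn chunk_into_blobs(blob_size: usize, data: Vec<u8>) -> Vec<Vec<u8>> {
    let mut blobs = Vec::new();
    for chunk in data.chunks(blob_size) {
        let mut blob = chunk.to_vec();
        if blob.len() < blob_size {
            // Last chunk is short: pad with zeroes so every blob is full-size.
            blob.resize(blob_size, 0);
            tracing::debug!("padded last chunk of {} bytes to full blob size", chunk.len());
        }
        blobs.push(blob);
    }
    blobs
}

fn main() {
    let blobs = chunk_into_blobs(4096, vec![1u8; 5000]);
    assert_eq!(blobs.len(), 2);
    assert!(blobs[1][904..].iter().all(|&b| b == 0)); // zero padding applied
}
```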