update: fixed Reviews
heemankv committed Oct 10, 2024
1 parent 14f70ee commit 68ad2eb
Showing 20 changed files with 217 additions and 76 deletions.
18 changes: 15 additions & 3 deletions crates/orchestrator/src/data_storage/aws_s3/mod.rs
@@ -48,9 +48,15 @@ impl DataStorage for AWSS3 {
async fn get_data(&self, key: &str) -> Result<Bytes> {
let response = self.client.get_object().bucket(&self.bucket).key(key).send().await?;
let data_stream = response.body.collect().await.expect("Failed to convert body into AggregatedBytes.");
tracing::trace!("DataStorage: Collected response body into data stream. key={}", key);
tracing::debug!("DataStorage: Collected response body into data stream from {}, key={}", self.bucket, key);
let data_bytes = data_stream.into_bytes();
tracing::info!("DataStorage: Successfully retrieved and converted data from S3. key={}", key);
tracing::info!(
log_type = "DataStorage",
category = "data_storage_call",
"Successfully retrieved and converted data from {}, key={}",
self.bucket,
key
);
Ok(data_bytes)
}

@@ -65,7 +71,13 @@ impl DataStorage for AWSS3 {
.send()
.await?;

tracing::info!("DataStorage: Successfully put data into S3 bucket. key={}", key);
tracing::info!(
log_type = "DataStorage",
category = "data_storage_call",
"Successfully put data into {}. key={}",
self.bucket,
key
);
Ok(())
}

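The events added in this file attach machine-readable fields (log_type, category) to each message instead of packing everything into the string. A minimal, self-contained sketch of that tracing pattern, assuming the tracing and tracing-subscriber crates as dependencies; the bucket and key values are invented for illustration:

use tracing::info;

fn main() {
    // A fmt subscriber prints events to stdout; JSON or OTLP backends
    // could instead index the log_type/category fields directly.
    tracing_subscriber::fmt().init();

    let bucket = "example-bucket"; // illustrative values only
    let key = "blob/123";

    // Fields come first, then the format string and its arguments,
    // mirroring the events added in this commit.
    info!(
        log_type = "DataStorage",
        category = "data_storage_call",
        "Successfully retrieved and converted data from {}, key={}",
        bucket,
        key
    );
}

Filtering on these fields downstream is the point of the change: a collector can match category == "data_storage_call" without parsing message text.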
28 changes: 14 additions & 14 deletions crates/orchestrator/src/jobs/mod.rs
@@ -143,7 +143,7 @@ pub async fn create_job(
metadata: HashMap<String, String>,
config: Arc<Config>,
) -> Result<(), JobError> {
tracing::info!(log_type = "starting", category = "general", function_type = "create_job", job_type = ?job_type, block_no = %internal_id, "General create job started for block: {:?}", internal_id);
tracing::info!(log_type = "starting", category = "general", function_type = "create_job", job_type = ?job_type, block_no = %internal_id, "General create job started for block");

tracing::debug!(
job_type = ?job_type,
@@ -174,7 +174,7 @@ pub async fn create_job(
];

ORCHESTRATOR_METRICS.block_gauge.record(internal_id.parse::<f64>().unwrap(), &attributes);
tracing::info!(log_type = "completed", category = "general", function_type = "create_job", block_no = %internal_id, "General create job completed for block: {:?}", internal_id);
tracing::info!(log_type = "completed", category = "general", function_type = "create_job", block_no = %internal_id, "General create job completed for block");
Ok(())
}

@@ -184,7 +184,7 @@ pub async fn create_job(
pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let mut job = get_job(id, config.clone()).await?;
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "general", function_type = "process_job", block_no = %internal_id, "General process job started for block: {:?}", internal_id);
tracing::info!(log_type = "starting", category = "general", function_type = "process_job", block_no = %internal_id, "General process job started for block");

tracing::Span::current().record("job", format!("{:?}", job.clone()));
tracing::Span::current().record("job_type", format!("{:?}", job.job_type.clone()));
@@ -195,10 +195,10 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
// we only want to process jobs that are in the created or verification failed state.
// verification failed state means that the previous processing failed and we want to retry
JobStatus::Created | JobStatus::VerificationFailed => {
tracing::info!(job_id = ?id, "Processing job");
tracing::info!(job_id = ?id, status = ?job.status, "Job status is Created or VerificationFailed, proceeding with processing");
}
_ => {
tracing::warn!(job_id = ?id, status = ?job.status, "Invalid job status for processing");
tracing::warn!(job_id = ?id, status = ?job.status, "Job status is Invalid. Cannot process.");
return Err(JobError::InvalidStatus { id, job_status: job.status });
}
}
@@ -215,10 +215,10 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
JobError::Other(OtherError(e))
})?;

tracing::trace!(job_id = ?id, job_type = ?job.job_type, "Getting job handler");
tracing::debug!(job_id = ?id, job_type = ?job.job_type, "Getting job handler");
let job_handler = factory::get_job_handler(&job.job_type).await;
let external_id = job_handler.process_job(config.clone(), &mut job).await?;
tracing::trace!(job_id = ?id, "Incrementing process attempt count in metadata");
tracing::debug!(job_id = ?id, "Incrementing process attempt count in metadata");
let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY)?;

let mut job_cloned = job.clone();
@@ -262,7 +262,7 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>

ORCHESTRATOR_METRICS.block_gauge.record(job.internal_id.parse::<f64>().unwrap(), &attributes);

tracing::info!(log_type = "completed", category = "general", function_type = "process_job", block_no = %internal_id, "General process job completed for block: {:?}", internal_id);
tracing::info!(log_type = "completed", category = "general", function_type = "process_job", block_no = %internal_id, "General process job completed for block");
Ok(())
}

@@ -281,7 +281,7 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError>
pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let mut job = get_job(id, config.clone()).await?;
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job started for block: {:?}", internal_id.clone());
tracing::info!(log_type = "starting", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job started for block");

tracing::Span::current().record("job", format!("{:?}", job.clone()));
tracing::Span::current().record("job_type", format!("{:?}", job.job_type.clone()));
@@ -396,7 +396,7 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {

ORCHESTRATOR_METRICS.block_gauge.record(job.internal_id.parse::<f64>().unwrap(), &attributes);

tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block: {:?}", internal_id);
tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block");
Ok(())
}

@@ -406,13 +406,13 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
pub async fn handle_job_failure(id: Uuid, config: Arc<Config>) -> Result<(), JobError> {
let job = get_job(id, config.clone()).await?.clone();
let internal_id = job.internal_id.clone();
tracing::info!(log_type = "starting", category = "general", function_type = "handle_job_failure", block_no = %internal_id, "General handle job failure started for block: {:?}", internal_id);
tracing::info!(log_type = "starting", category = "general", function_type = "handle_job_failure", block_no = %internal_id, "General handle job failure started for block");
let mut metadata = job.metadata.clone();

tracing::Span::current().record("job_status", format!("{:?}", job.status));
tracing::Span::current().record("job_type", format!("{:?}", job.job_type));

tracing::debug!(job_id = ?id, job_status = ?job.status, job_type = ?job.job_type, "Job details for failure handling for block: {:?}", internal_id);
tracing::debug!(job_id = ?id, job_status = ?job.status, job_type = ?job.job_type, block_no = %internal_id, "Job details for failure handling for block");

if job.status == JobStatus::Completed {
tracing::error!(job_id = ?id, job_status = ?job.status, "Invalid state exists on DL queue");
@@ -434,11 +434,11 @@ pub async fn handle_job_failure(id: Uuid, config: Arc<Config>) -> Result<(), Job
.await
{
Ok(_) => {
tracing::info!(log_type = "completed", category = "general", function_type = "handle_job_failure", block_no = %internal_id, "General handle job failure completed for block: {:?}", internal_id);
tracing::info!(log_type = "completed", category = "general", function_type = "handle_job_failure", block_no = %internal_id, "General handle job failure completed for block");
Ok(())
}
Err(e) => {
tracing::error!(log_type = "error", category = "general", function_type = "handle_job_failure", block_no = %internal_id, error = %e, "General handle job failure failed for block: {:?}", internal_id);
tracing::error!(log_type = "error", category = "general", function_type = "handle_job_failure", block_no = %internal_id, error = %e, "General handle job failure failed for block");
Err(JobError::Other(OtherError(e)))
}
}
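process_job admits only jobs in the Created or VerificationFailed states, as the match above shows; anything else is rejected with InvalidStatus. A stripped-down sketch of that gate, with a simplified enum and error type standing in for the orchestrator's real JobStatus and JobError:

#[derive(Debug, Clone, PartialEq)]
enum JobStatus {
    Created,
    VerificationFailed,
    Completed,
}

#[derive(Debug)]
struct InvalidStatus {
    job_status: JobStatus,
}

// Fresh jobs and failed-verification retries may proceed; any other
// state (e.g. Completed) is an error, as in process_job above.
fn check_processable(status: &JobStatus) -> Result<(), InvalidStatus> {
    match status {
        JobStatus::Created | JobStatus::VerificationFailed => Ok(()),
        other => Err(InvalidStatus { job_status: other.clone() }),
    }
}

fn main() {
    assert!(check_processable(&JobStatus::Created).is_ok());
    assert!(check_processable(&JobStatus::Completed).is_err());
}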
2 changes: 1 addition & 1 deletion crates/orchestrator/src/jobs/proving_job/mod.rs
@@ -72,7 +72,7 @@ impl Job for ProvingJob {
ProvingError::CairoPIEFileFetchFailed(e.to_string())
})?;

tracing::trace!(job_id = %job.internal_id, "Parsing Cairo PIE file");
tracing::debug!(job_id = %job.internal_id, "Parsing Cairo PIE file");
let cairo_pie = CairoPie::from_bytes(cairo_pie_file.to_vec().as_slice()).map_err(|e| {
tracing::error!(job_id = %job.internal_id, error = %e, "Failed to parse Cairo PIE file");
ProvingError::CairoPIENotReadable(e.to_string())
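The only change in this file bumps the parsing log from trace to debug. That matters because subscribers are commonly filtered below trace, so the event was effectively invisible. A small sketch of the effect, assuming tracing-subscriber with its env-filter feature enabled:

use tracing::{debug, trace};
use tracing_subscriber::EnvFilter;

fn main() {
    // With the max level set to debug, the debug! event is emitted and
    // the trace! event is filtered out.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new("debug"))
        .init();

    trace!("Parsing Cairo PIE file"); // dropped at this filter level
    debug!("Parsing Cairo PIE file"); // emitted
}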
2 changes: 1 addition & 1 deletion crates/orchestrator/src/jobs/register_proof_job/mod.rs
@@ -31,7 +31,7 @@ impl Job for RegisterProofJob {
external_id: String::new().into(),
// metadata must contain the blocks that have been included inside this proof
// this will allow state update jobs to be created for each block
metadata: metadata.clone(),
metadata,
version: 0,
created_at: Utc::now().round_subsecs(0),
updated_at: Utc::now().round_subsecs(0),
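The one-line change above drops a redundant .clone(): metadata is not used again after the struct literal, so it can be moved rather than copied. A small illustration with a hypothetical stand-in for the job type (field values invented):

use std::collections::HashMap;

struct JobItem {
    metadata: HashMap<String, String>,
}

fn build(metadata: HashMap<String, String>) -> JobItem {
    // Before: `metadata: metadata.clone()` allocated a second map even
    // though the original was dropped immediately afterwards. Moving
    // transfers ownership with no extra allocation.
    JobItem { metadata }
}

fn main() {
    let mut metadata = HashMap::new();
    metadata.insert("block".to_string(), "42".to_string());
    let job = build(metadata);
    assert_eq!(job.metadata["block"], "42");
}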
71 changes: 60 additions & 11 deletions crates/orchestrator/src/jobs/snos_job/fact_info.rs
@@ -27,36 +27,85 @@ pub struct FactInfo {
}

pub fn get_fact_info(cairo_pie: &CairoPie, program_hash: Option<Felt>) -> Result<FactInfo, FactError> {
tracing::info!("[FactInfo] Starting get_fact_info function");

tracing::debug!("[FactInfo] Getting program output");
tracing::info!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Starting get_fact_info function"
);

tracing::debug!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Getting program output"
);
let program_output = get_program_output(cairo_pie)?;
tracing::trace!("[FactInfo] Program output length: {}", program_output.len());

tracing::debug!("[FactInfo] Getting fact topology");
tracing::debug!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Program output length: {}",
program_output.len()
);

tracing::debug!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Getting fact topology"
);
let fact_topology = get_fact_topology(cairo_pie, program_output.len())?;

let program_hash = match program_hash {
Some(hash) => {
tracing::debug!("[FactInfo] Using provided program hash");
tracing::debug!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Using provided program hash"
);
hash
}
None => {
tracing::debug!("[FactInfo] Computing program hash");
tracing::debug!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Computing program hash"
);
Felt::from_bytes_be(
&compute_program_hash_chain(&cairo_pie.metadata.program, BOOTLOADER_VERSION)
.map_err(|e| {
tracing::error!("[FactInfo] Failed to compute program hash: {}", e);
tracing::error!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Failed to compute program hash: {}",
e
);
FactError::ProgramHashCompute(e.to_string())
})?
.to_bytes_be(),
)
}
};
tracing::trace!("[FactInfo] Program hash: {:?} and now generating merkle root", program_hash);
tracing::trace!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Program hash: {:?} and now generating merkle root",
program_hash
);
let output_root = generate_merkle_root(&program_output, &fact_topology)?;
let fact = keccak256([program_hash.to_bytes_be(), *output_root.node_hash].concat());
tracing::info!("[FactInfo] Fact computed successfully: {:?}", fact);
tracing::info!(
log_type = "FactInfo",
category = "fact_info",
function_type = "get_fact_info",
"Fact computed successfully: {:?}",
fact
);

Ok(FactInfo { program_output, fact_topology, fact })
}
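The last lines of get_fact_info compute the fact as a Keccak-256 digest over the program hash concatenated with the Merkle root of the program output. A rough sketch of that final step, using the sha3 crate in place of the orchestrator's own keccak256 helper, with fixed-size byte arrays assumed for simplicity:

use sha3::{Digest, Keccak256};

// fact = keccak256(program_hash.to_bytes_be() ++ output_root.node_hash)
fn compute_fact(program_hash: [u8; 32], output_root: [u8; 32]) -> [u8; 32] {
    let mut hasher = Keccak256::new();
    hasher.update(program_hash);
    hasher.update(output_root);
    hasher.finalize().into()
}

fn main() {
    let fact = compute_fact([1u8; 32], [2u8; 32]);
    let hex: String = fact.iter().map(|b| format!("{b:02x}")).collect();
    println!("fact: 0x{hex}");
}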
64 changes: 56 additions & 8 deletions crates/orchestrator/src/jobs/snos_job/fact_node.rs
@@ -50,35 +50,71 @@ pub struct FactNode {
/// Basically it transforms the flat fact topology into a non-binary Merkle tree and then computes
/// its root, enriching the nodes with metadata such as page sizes and hashes.
pub fn generate_merkle_root(program_output: &[Felt252], fact_topology: &FactTopology) -> Result<FactNode, FactError> {
tracing::info!("[FactNode] Starting generate_merkle_root function");
tracing::info!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"Starting generate_merkle_root function"
);
let FactTopology { tree_structure, mut page_sizes } = fact_topology.clone();

let mut end_offset: usize = 0;
let mut node_stack: Vec<FactNode> = Vec::with_capacity(page_sizes.len());
let mut output_iter = program_output.iter();

tracing::debug!("[FactNode] Processing tree structure: {:?}", tree_structure.len());
tracing::debug!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"Processing tree structure: {:?}",
tree_structure.len()
);
for (n_pages, n_nodes) in tree_structure.into_iter().tuples() {
tracing::trace!("[FactNode] Processing (n_pages: {}, n_nodes: {})", n_pages, n_nodes);
tracing::trace!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"(n_pages: {}, n_nodes: {})",
n_pages,
n_nodes
);
ensure!(n_pages <= page_sizes.len(), FactError::TreeStructurePagesCountOutOfRange(n_pages, page_sizes.len()));

// Push n_pages (leaves) to the stack
for _ in 0..n_pages {
let page_size = page_sizes.remove(0);
tracing::trace!("[FactNode] Processing page with size: {}", page_size);
tracing::trace!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"Processing page with size: {}",
page_size
);
// Page size is already validated upon retrieving the topology
let page = output_iter.by_ref().take(page_size).map(|felt| felt.to_bytes_be().to_vec()).concat();
let node_hash = keccak256(&page);
end_offset += page_size;
// Add leaf node (no children)
node_stack.push(FactNode { node_hash, end_offset, page_size, children: vec![] });
tracing::debug!("[FactNode] Added leaf node with hash: {:?}", node_hash.to_string());
tracing::debug!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"Added leaf node with hash: {:?}",
node_hash.to_string()
);
}

ensure!(n_nodes <= node_stack.len(), FactError::TreeStructureNodesCountOutOfRange(n_nodes, node_stack.len()));

if n_nodes > 0 {
tracing::trace!("[FactNode] Creating parent node for {} children", n_nodes);
tracing::trace!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"Creating parent node for {} children",
n_nodes
);
// Create a parent node to the last n_nodes in the head of the stack.
let children: Vec<FactNode> = node_stack.drain(node_stack.len() - n_nodes..).collect();
let mut node_data = Vec::with_capacity(2 * 32 * children.len());
@@ -100,7 +136,13 @@ pub fn generate_merkle_root(program_output: &[Felt252], fact_topology: &FactTopo
children,
};
node_stack.push(parent_node.clone());
tracing::debug!("[FactNode] Added parent node with hash: {:?}", parent_node.node_hash);
tracing::debug!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"Added parent node with hash: {:?}",
parent_node.node_hash
);
}
}

Expand All @@ -116,7 +158,13 @@ pub fn generate_merkle_root(program_output: &[Felt252], fact_topology: &FactTopo
);

let root = node_stack.remove(0);
tracing::info!("[FactNode] Successfully generated Merkle root with hash: {:?}", root.node_hash);
tracing::info!(
log_type = "FactNode",
category = "generate_merkle_root",
function_type = "generate_merkle_root",
"Successfully generated Merkle root with hash: {:?}",
root.node_hash
);
Ok(root)
}

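generate_merkle_root walks the flat (n_pages, n_nodes) topology with a stack: each entry pushes n_pages leaves, and a nonzero n_nodes collapses that many nodes from the top of the stack under a new parent. A stripped-down sketch of that stack discipline, with string labels standing in for page data and Keccak hashes:

#[derive(Debug, Clone)]
struct Node {
    label: String,
    children: Vec<Node>,
}

// Fold a flat (n_pages, n_nodes) topology into a single root, mirroring
// the stack walk in generate_merkle_root (validation omitted).
fn fold_topology(topology: &[(usize, usize)]) -> Option<Node> {
    let mut stack: Vec<Node> = Vec::new();
    let mut next_leaf = 0;

    for &(n_pages, n_nodes) in topology {
        // Push n_pages leaves (pages) onto the stack.
        for _ in 0..n_pages {
            stack.push(Node { label: format!("page{next_leaf}"), children: vec![] });
            next_leaf += 1;
        }
        // Collapse the top n_nodes entries into one parent node.
        if n_nodes > 0 {
            let children: Vec<Node> = stack.drain(stack.len() - n_nodes..).collect();
            let label = children.iter().map(|c| c.label.as_str()).collect::<Vec<_>>().join("+");
            stack.push(Node { label: format!("({label})"), children });
        }
    }

    // A well-formed topology leaves exactly one node: the root.
    (stack.len() == 1).then(|| stack.remove(0))
}

fn main() {
    // Two pages merged under one parent, then that parent plus a third
    // page merged into the root: ((page0+page1)+page2).
    let root = fold_topology(&[(2, 2), (1, 2)]).expect("single root");
    assert_eq!(root.label, "((page0+page1)+page2)");
}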