From 67f8f030a5c472ae111a38102eee10225a31d0d1 Mon Sep 17 00:00:00 2001 From: Nathaniel Bajo Date: Mon, 27 Oct 2025 14:50:32 +0100 Subject: [PATCH 1/4] fix ci conflicts --- miner/src/builder.rs | 5 +- .../parachain_interactor/behavior_control.rs | 152 ++++++++++++++++++ .../parachain_interactor/event_processor.rs | 6 +- .../src/parachain_interactor/registration.rs | 36 ++--- miner/src/traits.rs | 19 +++ miner/src/types.rs | 14 +- miner/src/utils/task_handling.rs | 20 ++- 7 files changed, 215 insertions(+), 37 deletions(-) diff --git a/miner/src/builder.rs b/miner/src/builder.rs index 88ffb20..98a64bd 100644 --- a/miner/src/builder.rs +++ b/miner/src/builder.rs @@ -70,7 +70,10 @@ impl MinerBuilderStage2 { parent_runtime: Arc::new(RwLock::new(ParentRuntime { port: None })), keypair, identity: Arc::new(miner_identity), - current_task: Arc::new(RwLock::new(None)) + current_task: Arc::new(RwLock::new(None)), + last_operational_status: Arc::new(RwLock::new( + crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus::Available + )), })) } } diff --git a/miner/src/parachain_interactor/behavior_control.rs b/miner/src/parachain_interactor/behavior_control.rs index 09a6125..051e07b 100644 --- a/miner/src/parachain_interactor/behavior_control.rs +++ b/miner/src/parachain_interactor/behavior_control.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::error::Result; use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus; use crate::types::Miner; @@ -40,3 +42,153 @@ pub async fn _miner_self_suspend(miner: &Miner) -> Result<()> { Ok(()) } + +/// Updates the operational status on the parachain (non-blocking) +pub async fn update_operational_status(miner: Arc, status: OperationalStatus) -> Result<()> { + let current_status = miner.get_operational_status().await; + + // Only update if status has changed + if matches!( + (¤t_status, &status), + (OperationalStatus::Available, OperationalStatus::Available) + | (OperationalStatus::Busy, OperationalStatus::Busy) + | (OperationalStatus::Suspended, OperationalStatus::Suspended) + ) { + return Ok(()); + } + + // Clone the status before moving into the async block + let status_for_spawn = status.clone(); + let keypair = Arc::clone(&miner.keypair); + let miner_type = Arc::clone(&miner.miner_type); + let miner_id = miner.identity.miner_id.1; + + tokio::spawn(async move { + if let Err(e) = async { + let tx_queue = global_config::get_tx_queue()?; + + // Clone status for the closure + let status_for_closure = status_for_spawn.clone(); + let rx = tx_queue.enqueue(move || { + let keypair = Arc::clone(&keypair); + let miner_type = Arc::clone(&miner_type); + let status_inner = status_for_closure.clone(); + + async move { + let client = global_config::get_parachain_client()?; + + // Print the status before moving it + println!("Updating operational status to: {:?}", status_inner); + + let tx = substrate_interface::api::tx() + .edge_connect() + .update_operational_status( + miner_type.as_ref().clone(), + miner_id, + status_inner, + ); + + println!("Transaction Details:"); + println!("Module: {:?}", tx.pallet_name()); + println!("Call: {:?}", tx.call_name()); + println!("Parameters: {:?}", tx.call_data()); + + let tx_submission = client + .tx() + .sign_and_submit_then_watch_default(&tx, keypair.as_ref()) + .await + .map(|e| { + println!("Operational status update submitted, waiting for transaction to be finalized..."); + e + })? + .wait_for_finalized_success() + .await; + + match tx_submission { + Ok(e) => { + let tx_event = e.find_first::< + substrate_interface::api::edge_connect::events::OperationalStatusUpdated, + >()?; + + if let Some(event) = tx_event { + println!("Operational status updated successfully: {event:?}"); + } else { + println!("No operational status update event found!"); + } + } + Err(e) => { + // Check for acceptable errors + check_for_acceptable_operational_status_error(e)?; + println!("Operational status update completed (acceptable error)"); + } + } + + Ok(crate::utils::tx_queue::TxOutput::Success) + } + }) + .await?; + + match rx.await { + Ok(Ok(crate::utils::tx_queue::TxOutput::Success)) => { + println!("Operational status updated successfully"); + } + Ok(Ok(crate::utils::tx_queue::TxOutput::RegistrationInfo(_))) => { + // This should never happen for operational status updates, but handle it + println!("Unexpected RegistrationInfo in operational status update response"); + } + Ok(Err(e)) => { + println!("Error updating operational status: {}", e); + } + Err(_) => { + println!("Response channel dropped for operational status update"); + } + } + + Ok::<_, crate::error::Error>(()) + } + .await + { + println!("Error in operational status update task: {}", e); + } + }); + + // Update local status cache + miner.set_operational_status(status).await; + + Ok(()) +} + +/// Check for acceptable errors when updating operational status +fn check_for_acceptable_operational_status_error(e: subxt::Error) -> Result<()> { + match e { + subxt::Error::Runtime(err) => { + match err { + subxt::error::DispatchError::Module(returned_error) => { + let returned_error_details = returned_error + .details() + .map_err(|err| crate::error::Error::Custom(err.to_string()))?; + + let returned_error_string = returned_error_details.variant.name.to_string(); + + // Acceptable errors for operational status updates + let acceptable_errors = [ + "MinerDoesNotExist", // Miner might have been removed + "NotAuthorized", // Permission issues (temporary) + "MinerSuspended", // Miner is suspended (temporary state) + ]; + + for acceptable_error in &acceptable_errors { + if returned_error_string == *acceptable_error { + return Ok(()); + } + } + + // If not an acceptable error, propagate it + return Err(crate::error::Error::Custom(returned_error.to_string())); + } + _ => return Err(crate::error::Error::Custom(err.to_string())), + }; + } + _ => return Err(e.into()), + } +} \ No newline at end of file diff --git a/miner/src/parachain_interactor/event_processor.rs b/miner/src/parachain_interactor/event_processor.rs index 24d6234..7536da2 100644 --- a/miner/src/parachain_interactor/event_processor.rs +++ b/miner/src/parachain_interactor/event_processor.rs @@ -13,8 +13,6 @@ use std::fs; use std::sync::Arc; use subxt::{events::EventDetails, PolkadotConfig}; -use super::registration::update_operational_status; - pub async fn process_event(miner: Arc, event: &EventDetails) -> Result<()> { // Extract the task_id before matching to avoid having to clone miner let current_task_id = { @@ -110,7 +108,9 @@ pub async fn process_event(miner: Arc, event: &EventDetails, miner_type: Arc Ok(identity) } -// Add new function to update operational status -pub async fn update_operational_status(miner: Arc, status: OperationalStatus) -> Result<()> { - let client = global_config::get_parachain_client()?; - - println!("Updating operational status to: {:?}", status); - - let tx = substrate_interface::api::tx() - .edge_connect() - .update_operational_status( - miner.miner_type.as_ref().clone(), - miner.identity.miner_id.1, - status - ); - - let _ = client - .tx() - .sign_and_submit_then_watch_default(&tx, miner.keypair.as_ref()) - .await? - .wait_for_finalized_success() - .await?; - - println!("Operational status updated successfully"); - Ok(()) -} - pub async fn start_miner(miner: Arc) -> Result<()> { println!("Starting miner..."); - // Set operational status to Available when starting - update_operational_status(Arc::clone(&miner), OperationalStatus::Available).await?; + // Initialize operational status to Available + miner + .set_operational_status(OperationalStatus::Available) + .await; + + // Set operational status to Available on parachain when starting + miner + .update_operational_status(OperationalStatus::Available) + .await?; println!("Waiting for tasks..."); diff --git a/miner/src/traits.rs b/miner/src/traits.rs index f20f869..bddfd18 100644 --- a/miner/src/traits.rs +++ b/miner/src/traits.rs @@ -104,6 +104,18 @@ pub trait ParachainInteractor { /// # Returns /// A `Result` indicating `Ok(())` if the miner is successfully suspended, or an `Error` if it fails. async fn suspend_miner(&self) -> Result<()>; + + /// Updates the operational status on the parachain (non-blocking) + /// + /// # Arguments + /// * `status` - The new operational status to set + /// + /// # Returns + /// A `Result` indicating `Ok(())` if successful, or an `Error` if it fails. + async fn update_operational_status( + &self, + status: crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus, + ) -> Result<()>; } /// Implementation of `ParachainInteractor` trait for `Miner`. @@ -132,4 +144,11 @@ impl ParachainInteractor for Arc { async fn suspend_miner(&self) -> Result<()> { behavior_control::_miner_self_suspend(self).await } + + async fn update_operational_status( + &self, + status: crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus, + ) -> Result<()> { + behavior_control::update_operational_status(Arc::clone(self), status).await + } } diff --git a/miner/src/types.rs b/miner/src/types.rs index ab327a3..cc97007 100644 --- a/miner/src/types.rs +++ b/miner/src/types.rs @@ -4,7 +4,7 @@ use subxt::utils::AccountId32; use subxt_signer::sr25519::Keypair; use tokio::sync::RwLock; -use crate::{error::Result, substrate_interface::api::runtime_types::cyborg_primitives::{miner::MinerType, task::TaskKind}}; +use crate::{error::Result, substrate_interface::api::runtime_types::cyborg_primitives::{miner::{MinerType, OperationalStatus}, task::TaskKind}}; #[derive(Deserialize, Serialize, Debug)] pub struct MinerIdentity { @@ -51,6 +51,8 @@ pub struct Miner { pub parent_runtime: Arc>, pub identity: Arc, pub current_task: Arc>>, + // Track the last operational status to avoid unnecessary transactions + pub last_operational_status: Arc>, } impl Miner { @@ -78,4 +80,14 @@ impl Miner { pub async fn deactivate_task(&self) -> Option { self.current_task.write().await.take() } + + /// Gets the current operational status + pub async fn get_operational_status(&self) -> OperationalStatus { + self.last_operational_status.read().await.clone() + } + + /// Sets the operational status (non-blocking) + pub async fn set_operational_status(&self, status: OperationalStatus) { + *self.last_operational_status.write().await = status; + } } \ No newline at end of file diff --git a/miner/src/utils/task_handling.rs b/miner/src/utils/task_handling.rs index a44459a..812f268 100644 --- a/miner/src/utils/task_handling.rs +++ b/miner/src/utils/task_handling.rs @@ -27,8 +27,7 @@ use serde::Serialize; use std::{fs, sync::Arc}; use subxt::utils::AccountId32; use tokio::{sync::RwLock, task::JoinHandle}; - -use crate::parachain_interactor::registration::update_operational_status; +use crate::traits::ParachainInteractor; #[derive(Serialize)] struct TaskOwner { @@ -168,6 +167,11 @@ pub async fn pick_up_task(miner: Arc) -> Result { nuke_all_running_task_containers().await?; + // Set operational status to Available when no task is active + miner + .update_operational_status(OperationalStatus::Available) + .await?; + Ok(TaskPickupReturnType::Failure(())) } // Task is assinged but not yet confirmed by the miner, which is necessary so that the parachain won't suspend the miner @@ -175,7 +179,9 @@ pub async fn pick_up_task(miner: Arc) -> Result { println!("Assigned task found, confirming task reception..."); // Update operational status to Busy when picking up a task - update_operational_status(Arc::clone(&miner), OperationalStatus::Busy).await?; + miner + .update_operational_status(OperationalStatus::Busy) + .await?; let task = CurrentTask { task_type: task.task_kind, @@ -200,7 +206,9 @@ pub async fn pick_up_task(miner: Arc) -> Result { // Task should already be running and only needs to be picked back up TaskStatusType::Running => { // Update operational status to Busy when picking up a running task - update_operational_status(Arc::clone(&miner), OperationalStatus::Busy).await?; + miner + .update_operational_status(OperationalStatus::Busy) + .await?; let task = CurrentTask { task_type: task.task_kind, @@ -324,7 +332,9 @@ pub async fn clean_up_current_task_and_vacate(miner: Arc) -> Result<()> { nuke_all_running_task_containers().await?; // Update operational status back to Available after task completion - update_operational_status(Arc::clone(&miner), OperationalStatus::Available).await?; + miner + .update_operational_status(OperationalStatus::Available) + .await?; let keypair = Arc::clone(&miner.keypair); let current_task_id = miner.current_task().await?.read().await.id; From fcd030170fa176c43f57c8c061877f7368fe6685 Mon Sep 17 00:00:00 2001 From: Nathaniel Bajo Date: Mon, 27 Oct 2025 15:23:00 +0100 Subject: [PATCH 2/4] nit --- .../parachain_interactor/behavior_control.rs | 132 +----------------- miner/src/utils/tx_builder.rs | 108 +++++++++++++- 2 files changed, 114 insertions(+), 126 deletions(-) diff --git a/miner/src/parachain_interactor/behavior_control.rs b/miner/src/parachain_interactor/behavior_control.rs index 051e07b..9ed45fa 100644 --- a/miner/src/parachain_interactor/behavior_control.rs +++ b/miner/src/parachain_interactor/behavior_control.rs @@ -4,6 +4,7 @@ use crate::error::Result; use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus; use crate::types::Miner; use crate::{global_config, substrate_interface}; +use crate::utils::tx_builder::pub_update_operational_status; pub async fn _miner_self_suspend(miner: &Miner) -> Result<()> { let client = global_config::get_parachain_client()?; @@ -57,138 +58,19 @@ pub async fn update_operational_status(miner: Arc, status: OperationalSta return Ok(()); } - // Clone the status before moving into the async block - let status_for_spawn = status.clone(); + // Update local status cache first + miner.set_operational_status(status.clone()).await; + + // Use the tx_builder for the actual transaction (non-blocking) let keypair = Arc::clone(&miner.keypair); let miner_type = Arc::clone(&miner.miner_type); let miner_id = miner.identity.miner_id.1; tokio::spawn(async move { - if let Err(e) = async { - let tx_queue = global_config::get_tx_queue()?; - - // Clone status for the closure - let status_for_closure = status_for_spawn.clone(); - let rx = tx_queue.enqueue(move || { - let keypair = Arc::clone(&keypair); - let miner_type = Arc::clone(&miner_type); - let status_inner = status_for_closure.clone(); - - async move { - let client = global_config::get_parachain_client()?; - - // Print the status before moving it - println!("Updating operational status to: {:?}", status_inner); - - let tx = substrate_interface::api::tx() - .edge_connect() - .update_operational_status( - miner_type.as_ref().clone(), - miner_id, - status_inner, - ); - - println!("Transaction Details:"); - println!("Module: {:?}", tx.pallet_name()); - println!("Call: {:?}", tx.call_name()); - println!("Parameters: {:?}", tx.call_data()); - - let tx_submission = client - .tx() - .sign_and_submit_then_watch_default(&tx, keypair.as_ref()) - .await - .map(|e| { - println!("Operational status update submitted, waiting for transaction to be finalized..."); - e - })? - .wait_for_finalized_success() - .await; - - match tx_submission { - Ok(e) => { - let tx_event = e.find_first::< - substrate_interface::api::edge_connect::events::OperationalStatusUpdated, - >()?; - - if let Some(event) = tx_event { - println!("Operational status updated successfully: {event:?}"); - } else { - println!("No operational status update event found!"); - } - } - Err(e) => { - // Check for acceptable errors - check_for_acceptable_operational_status_error(e)?; - println!("Operational status update completed (acceptable error)"); - } - } - - Ok(crate::utils::tx_queue::TxOutput::Success) - } - }) - .await?; - - match rx.await { - Ok(Ok(crate::utils::tx_queue::TxOutput::Success)) => { - println!("Operational status updated successfully"); - } - Ok(Ok(crate::utils::tx_queue::TxOutput::RegistrationInfo(_))) => { - // This should never happen for operational status updates, but handle it - println!("Unexpected RegistrationInfo in operational status update response"); - } - Ok(Err(e)) => { - println!("Error updating operational status: {}", e); - } - Err(_) => { - println!("Response channel dropped for operational status update"); - } - } - - Ok::<_, crate::error::Error>(()) - } - .await - { - println!("Error in operational status update task: {}", e); + if let Err(e) = pub_update_operational_status(keypair, miner_type, miner_id, status).await { + println!("Error updating operational status: {}", e); } }); - // Update local status cache - miner.set_operational_status(status).await; - Ok(()) -} - -/// Check for acceptable errors when updating operational status -fn check_for_acceptable_operational_status_error(e: subxt::Error) -> Result<()> { - match e { - subxt::Error::Runtime(err) => { - match err { - subxt::error::DispatchError::Module(returned_error) => { - let returned_error_details = returned_error - .details() - .map_err(|err| crate::error::Error::Custom(err.to_string()))?; - - let returned_error_string = returned_error_details.variant.name.to_string(); - - // Acceptable errors for operational status updates - let acceptable_errors = [ - "MinerDoesNotExist", // Miner might have been removed - "NotAuthorized", // Permission issues (temporary) - "MinerSuspended", // Miner is suspended (temporary state) - ]; - - for acceptable_error in &acceptable_errors { - if returned_error_string == *acceptable_error { - return Ok(()); - } - } - - // If not an acceptable error, propagate it - return Err(crate::error::Error::Custom(returned_error.to_string())); - } - _ => return Err(crate::error::Error::Custom(err.to_string())), - }; - } - _ => return Err(e.into()), - } } \ No newline at end of file diff --git a/miner/src/utils/tx_builder.rs b/miner/src/utils/tx_builder.rs index d0a78d9..87473d2 100644 --- a/miner/src/utils/tx_builder.rs +++ b/miner/src/utils/tx_builder.rs @@ -14,7 +14,7 @@ use substrate_interface::api::neuro_zk::{Error as NzkError}; use substrate_interface::api::edge_connect::{Error as EdgeConnectError}; use substrate_interface::api::task_management::{Error as TaskManagementError}; use crate::error::Result; -use crate::substrate_interface::{self, api::runtime_types::cyborg_primitives::miner::MinerType}; +use crate::substrate_interface::{self, api::runtime_types::cyborg_primitives::miner::{MinerType, OperationalStatus}}; /// Registers the miner on the blockchain. /// @@ -113,6 +113,112 @@ pub async fn pub_register(keypair: Arc, miner_type: Arc) -> } } +/// Updates the operational status on the parachain +/// +/// # Arguments +/// * `keypair` - The miner's keypair +/// * `miner_type` - The type of miner +/// * `miner_id` - The miner's ID +/// * `status` - The new operational status +/// +/// # Returns +/// A `Result` indicating `Ok(())` if successful, or an `Error` if it fails. +async fn update_operational_status( + keypair: Arc, + miner_type: Arc, + miner_id: u64, + status: OperationalStatus, +) -> Result<()> { + let client = global_config::get_parachain_client()?; + + let tx = substrate_interface::api::tx() + .edge_connect() + .update_operational_status( + miner_type.as_ref().clone(), + miner_id, + status, + ); + + println!("Transaction Details:"); + println!("Module: {:?}", tx.pallet_name()); + println!("Call: {:?}", tx.call_name()); + println!("Parameters: {:?}", tx.call_data()); + + let tx_submission = client + .tx() + .sign_and_submit_then_watch_default(&tx, keypair.as_ref()) + .await + .map(|e| { + println!("Operational status update submitted, waiting for transaction to be finalized..."); + e + })? + .wait_for_finalized_success() + .await; + + match tx_submission { + Ok(e) => { + let tx_event = e.find_first::< + substrate_interface::api::edge_connect::events::OperationalStatusUpdated, + >()?; + + if let Some(event) = tx_event { + println!("Operational status updated successfully: {event:?}"); + } else { + println!("No operational status update event found!"); + } + } + Err(e) => { + // Check for acceptable errors + check_for_acceptable_error(&[ + EdgeConnectError::MinerDoesNotExist, + EdgeConnectError::NotAuthorized, + EdgeConnectError::MinerSuspended, + ], e)?; + println!("Operational status update completed (acceptable error)"); + } + } + + Ok(()) +} + +/// Public interface for updating operational status (non-blocking with queue) +pub async fn pub_update_operational_status( + keypair: Arc, + miner_type: Arc, + miner_id: u64, + status: OperationalStatus, +) -> Result<()> { + let tx_queue = global_config::get_tx_queue()?; + + let rx = tx_queue.enqueue(move || { + let keypair = Arc::clone(&keypair); + let miner_type = Arc::clone(&miner_type); + let status_inner = status.clone(); + async move { + let _ = update_operational_status(keypair, miner_type, miner_id, status_inner).await?; + Ok(TxOutput::Success) + } + }) + .await?; + + match rx.await { + Ok(Ok(TxOutput::Success)) => { + println!("Operational status updated successfully"); + } + Ok(Err(e)) => { + println!("Error updating operational status: {}", e); + } + Err(_) => { + println!("Response channel dropped for operational status update"); + } + _ => { + println!("Unexpected response for operational status update"); + } + } + + Ok(()) +} + /// Submits a zkml (Zero Knowledge Machine Learning) proof to the blockchain. /// /// # Arguments From 399194f5a3714702e4f32965f7436e6214e1233a Mon Sep 17 00:00:00 2001 From: Nathaniel Bajo Date: Tue, 28 Oct 2025 06:47:32 +0100 Subject: [PATCH 3/4] make it more readable --- miner/src/builder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/miner/src/builder.rs b/miner/src/builder.rs index 98a64bd..93f9e25 100644 --- a/miner/src/builder.rs +++ b/miner/src/builder.rs @@ -1,7 +1,7 @@ use crate::{ error::{Error, Result}, parachain_interactor::registration::retrieve_identity, - substrate_interface::api::runtime_types::cyborg_primitives::miner::MinerType, + substrate_interface::api::runtime_types::cyborg_primitives::miner::{MinerType, OperationalStatus}, types::{Miner, ParentRuntime} }; use std::{str::FromStr, sync::Arc}; @@ -72,7 +72,7 @@ impl MinerBuilderStage2 { identity: Arc::new(miner_identity), current_task: Arc::new(RwLock::new(None)), last_operational_status: Arc::new(RwLock::new( - crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus::Available + OperationalStatus::Available )), })) } From 457cc1ad0f2814c2865b8ea3008ecddf4944c937 Mon Sep 17 00:00:00 2001 From: Nathaniel Bajo Date: Sun, 2 Nov 2025 02:00:49 +0100 Subject: [PATCH 4/4] nit --- miner/src/builder.rs | 3 - .../parachain_interactor/behavior_control.rs | 38 +++++---- .../parachain_interactor/event_processor.rs | 5 -- .../src/parachain_interactor/registration.rs | 82 +++++++++++-------- miner/src/types.rs | 13 --- miner/src/utils/substrate_queries.rs | 27 +++++- miner/src/utils/task_handling.rs | 27 ++++-- miner/src/utils/tx_builder.rs | 58 ++++++------- 8 files changed, 146 insertions(+), 107 deletions(-) diff --git a/miner/src/builder.rs b/miner/src/builder.rs index 03094e9..c967712 100644 --- a/miner/src/builder.rs +++ b/miner/src/builder.rs @@ -74,9 +74,6 @@ impl MinerBuilderStage2 { keypair, identity: Arc::new(miner_identity), current_task: Arc::new(RwLock::new(None)), - last_operational_status: Arc::new(RwLock::new( - OperationalStatus::Available - )), })) } } diff --git a/miner/src/parachain_interactor/behavior_control.rs b/miner/src/parachain_interactor/behavior_control.rs index 398bf91..c6d3e85 100644 --- a/miner/src/parachain_interactor/behavior_control.rs +++ b/miner/src/parachain_interactor/behavior_control.rs @@ -4,9 +4,9 @@ use crate::error::Result; use crate::substrate_interface::api::runtime_types::bounded_collections::bounded_vec::BoundedVec; use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus; use crate::types::Miner; - -use crate::{global_config, substrate_interface}; +use crate::utils::substrate_queries::get_miner_operational_status; use crate::utils::tx_builder::pub_update_operational_status; +use crate::{global_config, substrate_interface}; pub async fn _miner_self_suspend(miner: &Miner) -> Result<()> { let client = global_config::get_parachain_client()?; @@ -53,25 +53,35 @@ pub async fn _miner_self_suspend(miner: &Miner) -> Result<()> { /// Updates the operational status on the parachain (non-blocking) pub async fn update_operational_status(miner: Arc, status: OperationalStatus) -> Result<()> { - let current_status = miner.get_operational_status().await; + // Query current status from chainstate + let client = global_config::get_parachain_client()?; + let current_status = get_miner_operational_status( + &client, + &miner.identity.miner_id, + miner.miner_type.as_ref(), + ) + .await + .unwrap_or(None); // Default to None if query fails // Only update if status has changed - if matches!( - (¤t_status, &status), - (OperationalStatus::Available, OperationalStatus::Available) - | (OperationalStatus::Busy, OperationalStatus::Busy) - | (OperationalStatus::Suspended, OperationalStatus::Suspended) - ) { + let should_update = match current_status { + Some(current) => !matches!( + (¤t, &status), + (OperationalStatus::Available, OperationalStatus::Available) + | (OperationalStatus::Busy, OperationalStatus::Busy) + | (OperationalStatus::Suspended, OperationalStatus::Suspended) + ), + None => true, // If we can't get current status, update anyway + }; + + if !should_update { return Ok(()); } - // Update local status cache first - miner.set_operational_status(status.clone()).await; - // Use the tx_builder for the actual transaction (non-blocking) let keypair = Arc::clone(&miner.keypair); let miner_type = Arc::clone(&miner.miner_type); - let miner_id = miner.identity.miner_id.1; + let miner_id = miner.identity.miner_id.0.clone(); tokio::spawn(async move { if let Err(e) = pub_update_operational_status(keypair, miner_type, miner_id, status).await { @@ -80,4 +90,4 @@ pub async fn update_operational_status(miner: Arc, status: OperationalSta }); Ok(()) -} \ No newline at end of file +} diff --git a/miner/src/parachain_interactor/event_processor.rs b/miner/src/parachain_interactor/event_processor.rs index 731db42..efa6b3e 100644 --- a/miner/src/parachain_interactor/event_processor.rs +++ b/miner/src/parachain_interactor/event_processor.rs @@ -107,11 +107,6 @@ pub async fn process_event(miner: Arc, event: &EventDetails>> = Lazy::new(|| Mutex::new(None)); @@ -37,9 +40,8 @@ async fn confirm_registration() -> Result { println!("identity: {:?}", miner_id); - let miner_id_bounded = BoundedVec(miner_id.clone()); - // Since there seems to be a bug in subxt that should have been resolved (and we possibly won't have a separate storage map for querying workers by id) + // Since there seems to be a bug in subxt that should have been resolved (and we possibly won't have a separate storage map for querying workers by id) let miner_registration_confirmation_query = match miner_type { MinerType::Cloud => substrate_interface::api::storage() .edge_connect() @@ -109,43 +111,49 @@ pub async fn retrieve_identity( Ok(identity) } -// Add new function to update operational status -pub async fn update_operational_status(miner: Arc, status: OperationalStatus) -> Result<()> { +/// Check the miner's current status from the parachain and update accordingly +async fn check_and_update_miner_status(miner: Arc) -> Result<()> { let client = global_config::get_parachain_client()?; - println!("Updating operational status to: {:?}", status); - - let tx = substrate_interface::api::tx() - .edge_connect() - .update_operational_status( - miner.miner_type.as_ref().clone(), - miner.identity.miner_id.clone(), - status, - ); - - let _ = client - .tx() - .sign_and_submit_then_watch_default(&tx, miner.keypair.as_ref()) - .await? - .wait_for_finalized_success() - .await?; + // Query operational status from chain + let operational_status = get_miner_operational_status( + &client, + &miner.identity.miner_id, + &miner.miner_type.as_ref().clone(), + ) + .await?; + + println!( + "Miner status from parachain - Operational: {:?}", + operational_status + ); + + // If we have a task but the operational status is Available, update to Busy + if miner.current_task.read().await.is_some() { + if let Some(OperationalStatus::Available) = operational_status { + println!("Miner has task but operational status is Available, updating to Busy"); + miner + .update_operational_status(OperationalStatus::Busy) + .await?; + } + } else { + // If we don't have a task but operational status is Busy, update to Available + if let Some(OperationalStatus::Busy) = operational_status { + println!("Miner has no task but operational status is Busy, updating to Available"); + miner + .update_operational_status(OperationalStatus::Available) + .await?; + } + } - println!("Operational status updated successfully"); Ok(()) } pub async fn start_miner(miner: Arc) -> Result<()> { println!("Starting miner..."); - // Initialize operational status to Available - miner - .set_operational_status(OperationalStatus::Available) - .await; - - // Set operational status to Available on parachain when starting - miner - .update_operational_status(OperationalStatus::Available) - .await?; + // Check current status from parachain and update accordingly + check_and_update_miner_status(Arc::clone(&miner)).await?; println!("Waiting for tasks..."); @@ -169,7 +177,9 @@ pub async fn start_miner(miner: Arc) -> Result<()> { let now = Instant::now(); // Check if 6 hours have passed since the last update attempt - if last_check.map_or(true, |t| now.duration_since(t) > Duration::from_secs(24 * 3600)) { + if last_check.map_or(true, |t| { + now.duration_since(t) > Duration::from_secs(24 * 3600) + }) { println!("Miner doesn't have an active task, trying to apply update!"); if let Err(e) = try_apply_update_if_available() { println!("Update check failed: {:?}", e); diff --git a/miner/src/types.rs b/miner/src/types.rs index 653616a..e183d5a 100644 --- a/miner/src/types.rs +++ b/miner/src/types.rs @@ -53,8 +53,6 @@ pub struct Miner { pub parent_runtime: Arc>, pub identity: Arc, pub current_task: Arc>>, - // Track the last operational status to avoid unnecessary transactions - pub last_operational_status: Arc>, } impl Miner { @@ -82,15 +80,4 @@ impl Miner { pub async fn deactivate_task(&self) -> Option { self.current_task.write().await.take() } - - /// Gets the current operational status - pub async fn get_operational_status(&self) -> OperationalStatus { - self.last_operational_status.read().await.clone() - } - - /// Sets the operational status (non-blocking) - pub async fn set_operational_status(&self, status: OperationalStatus) { - *self.last_operational_status.write().await = status; - } -} } diff --git a/miner/src/utils/substrate_queries.rs b/miner/src/utils/substrate_queries.rs index 7fcb704..7f6bf82 100644 --- a/miner/src/utils/substrate_queries.rs +++ b/miner/src/utils/substrate_queries.rs @@ -1,6 +1,6 @@ use crate::substrate_interface::api::edge_connect::calls::types::remove_miner::MinerId; use crate::substrate_interface::api::runtime_types::bounded_collections::bounded_vec::BoundedVec; -use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::MinerType; +use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::{MinerType, OperationalStatus}; use crate::substrate_interface::api::runtime_types::cyborg_primitives::task::TaskInfo; use crate::types::MinerIdentity; use crate::{error::Result, substrate_interface}; @@ -179,3 +179,28 @@ let miner_info = found_miner.ok_or_else(|| "Miner not found for given ID")?; miner_type, }) } + +/// Query the current operational status of a miner from the parachain +pub async fn get_miner_operational_status( + client: &OnlineClient, + miner_id: &BoundedVec, + miner_type: &MinerType, +) -> Result> { + let miner_query = match miner_type { + MinerType::Cloud => substrate_interface::api::storage() + .edge_connect() + .cloud_miners(miner_id.clone()), + MinerType::Edge => substrate_interface::api::storage() + .edge_connect() + .edge_miners(miner_id.clone()), + }; + + let miner_info = client + .storage() + .at_latest() + .await? + .fetch(&miner_query) + .await?; + + Ok(miner_info.map(|miner| miner.operational_status)) +} \ No newline at end of file diff --git a/miner/src/utils/task_handling.rs b/miner/src/utils/task_handling.rs index 344ab28..8b64c1c 100644 --- a/miner/src/utils/task_handling.rs +++ b/miner/src/utils/task_handling.rs @@ -1,3 +1,5 @@ +use crate::traits::ParachainInteractor; +use crate::utils::substrate_queries::get_miner_operational_status; use crate::{ error::{Error, Result}, global_config::{ @@ -27,7 +29,6 @@ use serde::Serialize; use std::{fs, sync::Arc}; use subxt::utils::AccountId32; use tokio::{sync::RwLock, task::JoinHandle}; -use crate::traits::ParachainInteractor; #[derive(Serialize)] struct TaskOwner { @@ -207,8 +208,8 @@ pub async fn pick_up_task(miner: Arc) -> Result { TaskStatusType::Running => { // Update operational status to Busy when picking up a running task miner - .update_operational_status(OperationalStatus::Busy) - .await?; + .update_operational_status(OperationalStatus::Busy) + .await?; let task = CurrentTask { task_type: task.task_kind, @@ -331,10 +332,22 @@ pub fn return_task_container_name(task_id: TaskId) -> String { pub async fn clean_up_current_task_and_vacate(miner: Arc) -> Result<()> { nuke_all_running_task_containers().await?; - // Update operational status back to Available after task completion - miner - .update_operational_status(OperationalStatus::Available) - .await?; + // Query chainstate to determine if we should set status to Available + let client = global_config::get_parachain_client()?; + let current_status = get_miner_operational_status( + &client, + &miner.identity.miner_id, + &miner.miner_type, + ) + .await + .unwrap_or(Some(OperationalStatus::Available)); + + // Only update to Available if we're not already in that state and don't have another task + if !matches!(current_status, Some(OperationalStatus::Available)) { + miner + .update_operational_status(OperationalStatus::Available) + .await?; + } let keypair = Arc::clone(&miner.keypair); let current_task_id = miner.current_task().await?.read().await.id; diff --git a/miner/src/utils/tx_builder.rs b/miner/src/utils/tx_builder.rs index 272d7c4..80d2856 100644 --- a/miner/src/utils/tx_builder.rs +++ b/miner/src/utils/tx_builder.rs @@ -15,11 +15,7 @@ use substrate_interface::api::edge_connect::Error as EdgeConnectError; use substrate_interface::api::neuro_zk::Error as NzkError; use substrate_interface::api::task_management::Error as TaskManagementError; use subxt_signer::sr25519::Keypair; -use substrate_interface::api::neuro_zk::{Error as NzkError}; -use substrate_interface::api::edge_connect::{Error as EdgeConnectError}; -use substrate_interface::api::task_management::{Error as TaskManagementError}; -use crate::error::Result; -use crate::substrate_interface::{self, api::runtime_types::cyborg_primitives::miner::{MinerType, OperationalStatus}}; +use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus; /// Registers the miner on the blockchain. /// @@ -146,18 +142,16 @@ pub async fn pub_register( async fn update_operational_status( keypair: Arc, miner_type: Arc, - miner_id: u64, + miner_id: Vec, status: OperationalStatus, ) -> Result<()> { let client = global_config::get_parachain_client()?; + let miner_id_bounded = BoundedVec(miner_id); + let tx = substrate_interface::api::tx() .edge_connect() - .update_operational_status( - miner_type.as_ref().clone(), - miner_id, - status, - ); + .update_operational_status(miner_type.as_ref().clone(), miner_id_bounded, status); println!("Transaction Details:"); println!("Module: {:?}", tx.pallet_name()); @@ -169,7 +163,9 @@ async fn update_operational_status( .sign_and_submit_then_watch_default(&tx, keypair.as_ref()) .await .map(|e| { - println!("Operational status update submitted, waiting for transaction to be finalized..."); + println!( + "Operational status update submitted, waiting for transaction to be finalized..." + ); e })? .wait_for_finalized_success() @@ -189,11 +185,14 @@ async fn update_operational_status( } Err(e) => { // Check for acceptable errors - check_for_acceptable_error(&[ - EdgeConnectError::MinerDoesNotExist, - EdgeConnectError::NotAuthorized, - EdgeConnectError::MinerSuspended, - ], e)?; + check_for_acceptable_error( + &[ + EdgeConnectError::MinerDoesNotExist, + EdgeConnectError::NotAuthorized, + EdgeConnectError::MinerSuspended, + ], + e, + )?; println!("Operational status update completed (acceptable error)"); } } @@ -205,21 +204,24 @@ async fn update_operational_status( pub async fn pub_update_operational_status( keypair: Arc, miner_type: Arc, - miner_id: u64, + miner_id: Vec, status: OperationalStatus, ) -> Result<()> { let tx_queue = global_config::get_tx_queue()?; - let rx = tx_queue.enqueue(move || { - let keypair = Arc::clone(&keypair); - let miner_type = Arc::clone(&miner_type); - let status_inner = status.clone(); - async move { - let _ = update_operational_status(keypair, miner_type, miner_id, status_inner).await?; - Ok(TxOutput::Success) - } - }) - .await?; + let rx = tx_queue + .enqueue(move || { + let keypair = Arc::clone(&keypair); + let miner_type = Arc::clone(&miner_type); + let status_inner = status.clone(); + let miner_id_clone = miner_id.clone(); + async move { + let _ = + update_operational_status(keypair, miner_type, miner_id_clone, status_inner).await?; + Ok(TxOutput::Success) + } + }) + .await?; match rx.await { Ok(Ok(TxOutput::Success)) => {