diff --git a/miner/src/builder.rs b/miner/src/builder.rs index b15d8c9..c967712 100644 --- a/miner/src/builder.rs +++ b/miner/src/builder.rs @@ -1,8 +1,8 @@ use crate::{ - error::{Error, Result}, - parachain_interactor::registration::retrieve_identity, - substrate_interface::api::runtime_types::cyborg_primitives::miner::MinerType, - types::{Miner, ParentRuntime}, + error::{Error, Result}, + parachain_interactor::registration::retrieve_identity, + substrate_interface::api::runtime_types::cyborg_primitives::miner::{MinerType, OperationalStatus}, + types::{Miner, ParentRuntime} }; use std::{str::FromStr, sync::Arc}; use subxt_signer::{sr25519::Keypair as SR25519Keypair, SecretUri}; diff --git a/miner/src/parachain_interactor/behavior_control.rs b/miner/src/parachain_interactor/behavior_control.rs index f7eda02..c6d3e85 100644 --- a/miner/src/parachain_interactor/behavior_control.rs +++ b/miner/src/parachain_interactor/behavior_control.rs @@ -1,8 +1,11 @@ +use std::sync::Arc; + use crate::error::Result; use crate::substrate_interface::api::runtime_types::bounded_collections::bounded_vec::BoundedVec; use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus; use crate::types::Miner; - +use crate::utils::substrate_queries::get_miner_operational_status; +use crate::utils::tx_builder::pub_update_operational_status; use crate::{global_config, substrate_interface}; pub async fn _miner_self_suspend(miner: &Miner) -> Result<()> { @@ -47,3 +50,44 @@ pub async fn _miner_self_suspend(miner: &Miner) -> Result<()> { Ok(()) } + +/// Updates the operational status on the parachain (non-blocking) +pub async fn update_operational_status(miner: Arc, status: OperationalStatus) -> Result<()> { + // Query current status from chainstate + let client = global_config::get_parachain_client()?; + let current_status = get_miner_operational_status( + &client, + &miner.identity.miner_id, + miner.miner_type.as_ref(), + ) + .await + .unwrap_or(None); // Default to None if query fails + + // Only update if status has changed + let should_update = match current_status { + Some(current) => !matches!( + (¤t, &status), + (OperationalStatus::Available, OperationalStatus::Available) + | (OperationalStatus::Busy, OperationalStatus::Busy) + | (OperationalStatus::Suspended, OperationalStatus::Suspended) + ), + None => true, // If we can't get current status, update anyway + }; + + if !should_update { + return Ok(()); + } + + // Use the tx_builder for the actual transaction (non-blocking) + let keypair = Arc::clone(&miner.keypair); + let miner_type = Arc::clone(&miner.miner_type); + let miner_id = miner.identity.miner_id.0.clone(); + + tokio::spawn(async move { + if let Err(e) = pub_update_operational_status(keypair, miner_type, miner_id, status).await { + println!("Error updating operational status: {}", e); + } + }); + + Ok(()) +} diff --git a/miner/src/parachain_interactor/event_processor.rs b/miner/src/parachain_interactor/event_processor.rs index f251cc3..efa6b3e 100644 --- a/miner/src/parachain_interactor/event_processor.rs +++ b/miner/src/parachain_interactor/event_processor.rs @@ -13,8 +13,6 @@ use std::fs; use std::sync::Arc; use subxt::{events::EventDetails, PolkadotConfig}; -use super::registration::update_operational_status; - pub async fn process_event(miner: Arc, event: &EventDetails) -> Result<()> { // Extract the task_id before matching to avoid having to clone miner let current_task_id = { @@ -109,9 +107,6 @@ pub async fn process_event(miner: Arc, event: &EventDetails>> = Lazy::new(|| Mutex::new(None)); @@ -37,9 +40,8 @@ async fn confirm_registration() -> Result { println!("identity: {:?}", miner_id); - let miner_id_bounded = BoundedVec(miner_id.clone()); - // Since there seems to be a bug in subxt that should have been resolved (and we possibly won't have a separate storage map for querying workers by id) + // Since there seems to be a bug in subxt that should have been resolved (and we possibly won't have a separate storage map for querying workers by id) let miner_registration_confirmation_query = match miner_type { MinerType::Cloud => substrate_interface::api::storage() .edge_connect() @@ -109,36 +111,49 @@ pub async fn retrieve_identity( Ok(identity) } -// Add new function to update operational status -pub async fn update_operational_status(miner: Arc, status: OperationalStatus) -> Result<()> { +/// Check the miner's current status from the parachain and update accordingly +async fn check_and_update_miner_status(miner: Arc) -> Result<()> { let client = global_config::get_parachain_client()?; - println!("Updating operational status to: {:?}", status); - - let tx = substrate_interface::api::tx() - .edge_connect() - .update_operational_status( - miner.miner_type.as_ref().clone(), - miner.identity.miner_id.clone(), - status, - ); - - let _ = client - .tx() - .sign_and_submit_then_watch_default(&tx, miner.keypair.as_ref()) - .await? - .wait_for_finalized_success() - .await?; + // Query operational status from chain + let operational_status = get_miner_operational_status( + &client, + &miner.identity.miner_id, + &miner.miner_type.as_ref().clone(), + ) + .await?; + + println!( + "Miner status from parachain - Operational: {:?}", + operational_status + ); + + // If we have a task but the operational status is Available, update to Busy + if miner.current_task.read().await.is_some() { + if let Some(OperationalStatus::Available) = operational_status { + println!("Miner has task but operational status is Available, updating to Busy"); + miner + .update_operational_status(OperationalStatus::Busy) + .await?; + } + } else { + // If we don't have a task but operational status is Busy, update to Available + if let Some(OperationalStatus::Busy) = operational_status { + println!("Miner has no task but operational status is Busy, updating to Available"); + miner + .update_operational_status(OperationalStatus::Available) + .await?; + } + } - println!("Operational status updated successfully"); Ok(()) } pub async fn start_miner(miner: Arc) -> Result<()> { println!("Starting miner..."); - // Set operational status to Available when starting - update_operational_status(Arc::clone(&miner), OperationalStatus::Available).await?; + // Check current status from parachain and update accordingly + check_and_update_miner_status(Arc::clone(&miner)).await?; println!("Waiting for tasks..."); @@ -162,7 +177,9 @@ pub async fn start_miner(miner: Arc) -> Result<()> { let now = Instant::now(); // Check if 6 hours have passed since the last update attempt - if last_check.map_or(true, |t| now.duration_since(t) > Duration::from_secs(24 * 3600)) { + if last_check.map_or(true, |t| { + now.duration_since(t) > Duration::from_secs(24 * 3600) + }) { println!("Miner doesn't have an active task, trying to apply update!"); if let Err(e) = try_apply_update_if_available() { println!("Update check failed: {:?}", e); diff --git a/miner/src/traits.rs b/miner/src/traits.rs index f1cb5a3..39a8911 100644 --- a/miner/src/traits.rs +++ b/miner/src/traits.rs @@ -109,6 +109,18 @@ pub trait ParachainInteractor { /// # Returns /// A `Result` indicating `Ok(())` if the miner is successfully suspended, or an `Error` if it fails. async fn suspend_miner(&self) -> Result<()>; + + /// Updates the operational status on the parachain (non-blocking) + /// + /// # Arguments + /// * `status` - The new operational status to set + /// + /// # Returns + /// A `Result` indicating `Ok(())` if successful, or an `Error` if it fails. + async fn update_operational_status( + &self, + status: crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus, + ) -> Result<()>; } /// Implementation of `ParachainInteractor` trait for `Miner`. @@ -137,4 +149,11 @@ impl ParachainInteractor for Arc { async fn suspend_miner(&self) -> Result<()> { behavior_control::_miner_self_suspend(self).await } + + async fn update_operational_status( + &self, + status: crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus, + ) -> Result<()> { + behavior_control::update_operational_status(Arc::clone(self), status).await + } } diff --git a/miner/src/types.rs b/miner/src/types.rs index d08f3be..e183d5a 100644 --- a/miner/src/types.rs +++ b/miner/src/types.rs @@ -4,13 +4,9 @@ use subxt::utils::AccountId32; use subxt_signer::sr25519::Keypair; use tokio::sync::RwLock; +use crate::{error::Result, substrate_interface::api::runtime_types::cyborg_primitives::{miner::{MinerType, OperationalStatus}, task::TaskKind}}; use crate::substrate_interface::api::edge_connect::calls::types::remove_miner::MinerId; -use crate::{ - error::Result, - substrate_interface::api::runtime_types::cyborg_primitives::{ - miner::MinerType, task::TaskKind, - }, -}; + #[derive(Deserialize, Serialize, Debug)] pub struct MinerIdentity { diff --git a/miner/src/utils/substrate_queries.rs b/miner/src/utils/substrate_queries.rs index 7fcb704..7f6bf82 100644 --- a/miner/src/utils/substrate_queries.rs +++ b/miner/src/utils/substrate_queries.rs @@ -1,6 +1,6 @@ use crate::substrate_interface::api::edge_connect::calls::types::remove_miner::MinerId; use crate::substrate_interface::api::runtime_types::bounded_collections::bounded_vec::BoundedVec; -use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::MinerType; +use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::{MinerType, OperationalStatus}; use crate::substrate_interface::api::runtime_types::cyborg_primitives::task::TaskInfo; use crate::types::MinerIdentity; use crate::{error::Result, substrate_interface}; @@ -179,3 +179,28 @@ let miner_info = found_miner.ok_or_else(|| "Miner not found for given ID")?; miner_type, }) } + +/// Query the current operational status of a miner from the parachain +pub async fn get_miner_operational_status( + client: &OnlineClient, + miner_id: &BoundedVec, + miner_type: &MinerType, +) -> Result> { + let miner_query = match miner_type { + MinerType::Cloud => substrate_interface::api::storage() + .edge_connect() + .cloud_miners(miner_id.clone()), + MinerType::Edge => substrate_interface::api::storage() + .edge_connect() + .edge_miners(miner_id.clone()), + }; + + let miner_info = client + .storage() + .at_latest() + .await? + .fetch(&miner_query) + .await?; + + Ok(miner_info.map(|miner| miner.operational_status)) +} \ No newline at end of file diff --git a/miner/src/utils/task_handling.rs b/miner/src/utils/task_handling.rs index 456e05c..8b64c1c 100644 --- a/miner/src/utils/task_handling.rs +++ b/miner/src/utils/task_handling.rs @@ -1,3 +1,5 @@ +use crate::traits::ParachainInteractor; +use crate::utils::substrate_queries::get_miner_operational_status; use crate::{ error::{Error, Result}, global_config::{ @@ -28,8 +30,6 @@ use std::{fs, sync::Arc}; use subxt::utils::AccountId32; use tokio::{sync::RwLock, task::JoinHandle}; -use crate::parachain_interactor::registration::update_operational_status; - #[derive(Serialize)] struct TaskOwner { address: AccountId32, @@ -168,6 +168,11 @@ pub async fn pick_up_task(miner: Arc) -> Result { nuke_all_running_task_containers().await?; + // Set operational status to Available when no task is active + miner + .update_operational_status(OperationalStatus::Available) + .await?; + Ok(TaskPickupReturnType::Failure(())) } // Task is assinged but not yet confirmed by the miner, which is necessary so that the parachain won't suspend the miner @@ -175,7 +180,9 @@ pub async fn pick_up_task(miner: Arc) -> Result { println!("Assigned task found, confirming task reception..."); // Update operational status to Busy when picking up a task - update_operational_status(Arc::clone(&miner), OperationalStatus::Busy).await?; + miner + .update_operational_status(OperationalStatus::Busy) + .await?; let task = CurrentTask { task_type: task.task_kind, @@ -200,7 +207,9 @@ pub async fn pick_up_task(miner: Arc) -> Result { // Task should already be running and only needs to be picked back up TaskStatusType::Running => { // Update operational status to Busy when picking up a running task - update_operational_status(Arc::clone(&miner), OperationalStatus::Busy).await?; + miner + .update_operational_status(OperationalStatus::Busy) + .await?; let task = CurrentTask { task_type: task.task_kind, @@ -323,8 +332,22 @@ pub fn return_task_container_name(task_id: TaskId) -> String { pub async fn clean_up_current_task_and_vacate(miner: Arc) -> Result<()> { nuke_all_running_task_containers().await?; - // Update operational status back to Available after task completion - update_operational_status(Arc::clone(&miner), OperationalStatus::Available).await?; + // Query chainstate to determine if we should set status to Available + let client = global_config::get_parachain_client()?; + let current_status = get_miner_operational_status( + &client, + &miner.identity.miner_id, + &miner.miner_type, + ) + .await + .unwrap_or(Some(OperationalStatus::Available)); + + // Only update to Available if we're not already in that state and don't have another task + if !matches!(current_status, Some(OperationalStatus::Available)) { + miner + .update_operational_status(OperationalStatus::Available) + .await?; + } let keypair = Arc::clone(&miner.keypair); let current_task_id = miner.current_task().await?.read().await.id; diff --git a/miner/src/utils/tx_builder.rs b/miner/src/utils/tx_builder.rs index c7cdf41..80d2856 100644 --- a/miner/src/utils/tx_builder.rs +++ b/miner/src/utils/tx_builder.rs @@ -15,6 +15,7 @@ use substrate_interface::api::edge_connect::Error as EdgeConnectError; use substrate_interface::api::neuro_zk::Error as NzkError; use substrate_interface::api::task_management::Error as TaskManagementError; use subxt_signer::sr25519::Keypair; +use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus; /// Registers the miner on the blockchain. /// @@ -128,6 +129,118 @@ pub async fn pub_register( } } +/// Updates the operational status on the parachain +/// +/// # Arguments +/// * `keypair` - The miner's keypair +/// * `miner_type` - The type of miner +/// * `miner_id` - The miner's ID +/// * `status` - The new operational status +/// +/// # Returns +/// A `Result` indicating `Ok(())` if successful, or an `Error` if it fails. +async fn update_operational_status( + keypair: Arc, + miner_type: Arc, + miner_id: Vec, + status: OperationalStatus, +) -> Result<()> { + let client = global_config::get_parachain_client()?; + + let miner_id_bounded = BoundedVec(miner_id); + + let tx = substrate_interface::api::tx() + .edge_connect() + .update_operational_status(miner_type.as_ref().clone(), miner_id_bounded, status); + + println!("Transaction Details:"); + println!("Module: {:?}", tx.pallet_name()); + println!("Call: {:?}", tx.call_name()); + println!("Parameters: {:?}", tx.call_data()); + + let tx_submission = client + .tx() + .sign_and_submit_then_watch_default(&tx, keypair.as_ref()) + .await + .map(|e| { + println!( + "Operational status update submitted, waiting for transaction to be finalized..." + ); + e + })? + .wait_for_finalized_success() + .await; + + match tx_submission { + Ok(e) => { + let tx_event = e.find_first::< + substrate_interface::api::edge_connect::events::OperationalStatusUpdated, + >()?; + + if let Some(event) = tx_event { + println!("Operational status updated successfully: {event:?}"); + } else { + println!("No operational status update event found!"); + } + } + Err(e) => { + // Check for acceptable errors + check_for_acceptable_error( + &[ + EdgeConnectError::MinerDoesNotExist, + EdgeConnectError::NotAuthorized, + EdgeConnectError::MinerSuspended, + ], + e, + )?; + println!("Operational status update completed (acceptable error)"); + } + } + + Ok(()) +} + +/// Public interface for updating operational status (non-blocking with queue) +pub async fn pub_update_operational_status( + keypair: Arc, + miner_type: Arc, + miner_id: Vec, + status: OperationalStatus, +) -> Result<()> { + let tx_queue = global_config::get_tx_queue()?; + + let rx = tx_queue + .enqueue(move || { + let keypair = Arc::clone(&keypair); + let miner_type = Arc::clone(&miner_type); + let status_inner = status.clone(); + let miner_id_clone = miner_id.clone(); + async move { + let _ = + update_operational_status(keypair, miner_type, miner_id_clone, status_inner).await?; + Ok(TxOutput::Success) + } + }) + .await?; + + match rx.await { + Ok(Ok(TxOutput::Success)) => { + println!("Operational status updated successfully"); + } + Ok(Err(e)) => { + println!("Error updating operational status: {}", e); + } + Err(_) => { + println!("Response channel dropped for operational status update"); + } + _ => { + println!("Unexpected response for operational status update"); + } + } + + Ok(()) +} + /// Submits a zkml (Zero Knowledge Machine Learning) proof to the blockchain. /// /// # Arguments