Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions miner/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
error::{Error, Result},
parachain_interactor::registration::retrieve_identity,
substrate_interface::api::runtime_types::cyborg_primitives::miner::MinerType,
types::{Miner, ParentRuntime},
error::{Error, Result},
parachain_interactor::registration::retrieve_identity,
substrate_interface::api::runtime_types::cyborg_primitives::miner::{MinerType, OperationalStatus},
types::{Miner, ParentRuntime}
};
use std::{str::FromStr, sync::Arc};
use subxt_signer::{sr25519::Keypair as SR25519Keypair, SecretUri};
Expand Down
46 changes: 45 additions & 1 deletion miner/src/parachain_interactor/behavior_control.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::sync::Arc;

use crate::error::Result;
use crate::substrate_interface::api::runtime_types::bounded_collections::bounded_vec::BoundedVec;
use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus;
use crate::types::Miner;

use crate::utils::substrate_queries::get_miner_operational_status;
use crate::utils::tx_builder::pub_update_operational_status;
use crate::{global_config, substrate_interface};

pub async fn _miner_self_suspend(miner: &Miner) -> Result<()> {
Expand Down Expand Up @@ -47,3 +50,44 @@ pub async fn _miner_self_suspend(miner: &Miner) -> Result<()> {

Ok(())
}

/// Updates the operational status on the parachain (non-blocking)
pub async fn update_operational_status(miner: Arc<Miner>, status: OperationalStatus) -> Result<()> {
// Query current status from chainstate
let client = global_config::get_parachain_client()?;
let current_status = get_miner_operational_status(
&client,
&miner.identity.miner_id,
miner.miner_type.as_ref(),
)
.await
.unwrap_or(None); // Default to None if query fails

// Only update if status has changed
let should_update = match current_status {
Some(current) => !matches!(
(&current, &status),
(OperationalStatus::Available, OperationalStatus::Available)
| (OperationalStatus::Busy, OperationalStatus::Busy)
| (OperationalStatus::Suspended, OperationalStatus::Suspended)
),
None => true, // If we can't get current status, update anyway
};

if !should_update {
return Ok(());
}

// Use the tx_builder for the actual transaction (non-blocking)
let keypair = Arc::clone(&miner.keypair);
let miner_type = Arc::clone(&miner.miner_type);
let miner_id = miner.identity.miner_id.0.clone();

tokio::spawn(async move {
if let Err(e) = pub_update_operational_status(keypair, miner_type, miner_id, status).await {
println!("Error updating operational status: {}", e);
}
});

Ok(())
}
5 changes: 0 additions & 5 deletions miner/src/parachain_interactor/event_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ use std::fs;
use std::sync::Arc;
use subxt::{events::EventDetails, PolkadotConfig};

use super::registration::update_operational_status;

pub async fn process_event(miner: Arc<Miner>, event: &EventDetails<PolkadotConfig>) -> Result<()> {
// Extract the task_id before matching to avoid having to clone miner
let current_task_id = {
Expand Down Expand Up @@ -109,9 +107,6 @@ pub async fn process_event(miner: Arc<Miner>, event: &EventDetails<PolkadotConfi
if assigned_miner.1 .0.to_vec() == miner_data.miner_id.0.to_vec() {
println!("New task scheduled: {:?}", task_scheduled.task_id);

// Update operational status to Busy when task is assigned
update_operational_status(Arc::clone(&miner), OperationalStatus::Busy).await?;

let current_task = CurrentTask {
task_owner: task_scheduled.task_owner,
task_type: task_scheduled.task_kind,
Expand Down
75 changes: 46 additions & 29 deletions miner/src/parachain_interactor/registration.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use crate::global_config::{PATHS, self, update_config_file};
use crate::error::Result;
use crate::global_config::{self, update_config_file, PATHS};
use crate::self_management::try_apply_update_if_available;
use crate::substrate_interface;
use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::{MinerType, OperationalStatus};
use crate::utils::task_handling::pick_up_task;
use crate::substrate_interface::api::runtime_types::bounded_collections::bounded_vec::BoundedVec;
use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::{
MinerType, OperationalStatus,
};
use crate::traits::ParachainInteractor;
use crate::types::{Miner, MinerIdentity};
use crate::utils::substrate_queries::get_miner_operational_status;
use crate::utils::task_handling::pick_up_task;
use crate::utils::tx_builder::pub_register;
use once_cell::sync::Lazy;
use subxt_signer::sr25519::Keypair;
use std::fs;
use std::sync::Arc;
use std::time::{Duration, Instant};
use subxt_signer::sr25519::Keypair;
use tokio::sync::Mutex;
use crate::substrate_interface::api::runtime_types::bounded_collections::bounded_vec::BoundedVec;

static LAST_UPDATE_CHECK: Lazy<Mutex<Option<Instant>>> = Lazy::new(|| Mutex::new(None));

Expand All @@ -37,9 +40,8 @@ async fn confirm_registration() -> Result<RegistrationStatus> {

println!("identity: {:?}", miner_id);


let miner_id_bounded = BoundedVec(miner_id.clone());
// Since there seems to be a bug in subxt that should have been resolved (and we possibly won't have a separate storage map for querying workers by id)
// Since there seems to be a bug in subxt that should have been resolved (and we possibly won't have a separate storage map for querying workers by id)
let miner_registration_confirmation_query = match miner_type {
MinerType::Cloud => substrate_interface::api::storage()
.edge_connect()
Expand Down Expand Up @@ -109,36 +111,49 @@ pub async fn retrieve_identity(
Ok(identity)
}

// Add new function to update operational status
pub async fn update_operational_status(miner: Arc<Miner>, status: OperationalStatus) -> Result<()> {
/// Check the miner's current status from the parachain and update accordingly
async fn check_and_update_miner_status(miner: Arc<Miner>) -> Result<()> {
let client = global_config::get_parachain_client()?;

println!("Updating operational status to: {:?}", status);

let tx = substrate_interface::api::tx()
.edge_connect()
.update_operational_status(
miner.miner_type.as_ref().clone(),
miner.identity.miner_id.clone(),
status,
);

let _ = client
.tx()
.sign_and_submit_then_watch_default(&tx, miner.keypair.as_ref())
.await?
.wait_for_finalized_success()
.await?;
// Query operational status from chain
let operational_status = get_miner_operational_status(
&client,
&miner.identity.miner_id,
&miner.miner_type.as_ref().clone(),
)
.await?;

println!(
"Miner status from parachain - Operational: {:?}",
operational_status
);

// If we have a task but the operational status is Available, update to Busy
if miner.current_task.read().await.is_some() {
if let Some(OperationalStatus::Available) = operational_status {
println!("Miner has task but operational status is Available, updating to Busy");
miner
.update_operational_status(OperationalStatus::Busy)
.await?;
}
} else {
// If we don't have a task but operational status is Busy, update to Available
if let Some(OperationalStatus::Busy) = operational_status {
println!("Miner has no task but operational status is Busy, updating to Available");
miner
.update_operational_status(OperationalStatus::Available)
.await?;
}
}

println!("Operational status updated successfully");
Ok(())
}

pub async fn start_miner(miner: Arc<Miner>) -> Result<()> {
println!("Starting miner...");

// Set operational status to Available when starting
update_operational_status(Arc::clone(&miner), OperationalStatus::Available).await?;
// Check current status from parachain and update accordingly
check_and_update_miner_status(Arc::clone(&miner)).await?;

println!("Waiting for tasks...");

Expand All @@ -162,7 +177,9 @@ pub async fn start_miner(miner: Arc<Miner>) -> Result<()> {
let now = Instant::now();

// Check if 6 hours have passed since the last update attempt
if last_check.map_or(true, |t| now.duration_since(t) > Duration::from_secs(24 * 3600)) {
if last_check.map_or(true, |t| {
now.duration_since(t) > Duration::from_secs(24 * 3600)
}) {
println!("Miner doesn't have an active task, trying to apply update!");
if let Err(e) = try_apply_update_if_available() {
println!("Update check failed: {:?}", e);
Expand Down
19 changes: 19 additions & 0 deletions miner/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ pub trait ParachainInteractor {
/// # Returns
/// A `Result` indicating `Ok(())` if the miner is successfully suspended, or an `Error` if it fails.
async fn suspend_miner(&self) -> Result<()>;

/// Updates the operational status on the parachain (non-blocking)
///
/// # Arguments
/// * `status` - The new operational status to set
///
/// # Returns
/// A `Result` indicating `Ok(())` if successful, or an `Error` if it fails.
async fn update_operational_status(
&self,
status: crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus,
) -> Result<()>;
}

/// Implementation of `ParachainInteractor` trait for `Miner`.
Expand Down Expand Up @@ -137,4 +149,11 @@ impl ParachainInteractor for Arc<Miner> {
async fn suspend_miner(&self) -> Result<()> {
behavior_control::_miner_self_suspend(self).await
}

async fn update_operational_status(
&self,
status: crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::OperationalStatus,
) -> Result<()> {
behavior_control::update_operational_status(Arc::clone(self), status).await
}
}
8 changes: 2 additions & 6 deletions miner/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@ use subxt::utils::AccountId32;
use subxt_signer::sr25519::Keypair;
use tokio::sync::RwLock;

use crate::{error::Result, substrate_interface::api::runtime_types::cyborg_primitives::{miner::{MinerType, OperationalStatus}, task::TaskKind}};
use crate::substrate_interface::api::edge_connect::calls::types::remove_miner::MinerId;
use crate::{
error::Result,
substrate_interface::api::runtime_types::cyborg_primitives::{
miner::MinerType, task::TaskKind,
},
};


#[derive(Deserialize, Serialize, Debug)]
pub struct MinerIdentity {
Expand Down
27 changes: 26 additions & 1 deletion miner/src/utils/substrate_queries.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::substrate_interface::api::edge_connect::calls::types::remove_miner::MinerId;
use crate::substrate_interface::api::runtime_types::bounded_collections::bounded_vec::BoundedVec;
use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::MinerType;
use crate::substrate_interface::api::runtime_types::cyborg_primitives::miner::{MinerType, OperationalStatus};
use crate::substrate_interface::api::runtime_types::cyborg_primitives::task::TaskInfo;
use crate::types::MinerIdentity;
use crate::{error::Result, substrate_interface};
Expand Down Expand Up @@ -179,3 +179,28 @@ let miner_info = found_miner.ok_or_else(|| "Miner not found for given ID")?;
miner_type,
})
}

/// Query the current operational status of a miner from the parachain
pub async fn get_miner_operational_status(
client: &OnlineClient<subxt::PolkadotConfig>,
miner_id: &BoundedVec<u8>,
miner_type: &MinerType,
) -> Result<Option<OperationalStatus>> {
let miner_query = match miner_type {
MinerType::Cloud => substrate_interface::api::storage()
.edge_connect()
.cloud_miners(miner_id.clone()),
MinerType::Edge => substrate_interface::api::storage()
.edge_connect()
.edge_miners(miner_id.clone()),
};

let miner_info = client
.storage()
.at_latest()
.await?
.fetch(&miner_query)
.await?;

Ok(miner_info.map(|miner| miner.operational_status))
}
35 changes: 29 additions & 6 deletions miner/src/utils/task_handling.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::traits::ParachainInteractor;
use crate::utils::substrate_queries::get_miner_operational_status;
use crate::{
error::{Error, Result},
global_config::{
Expand Down Expand Up @@ -28,8 +30,6 @@ use std::{fs, sync::Arc};
use subxt::utils::AccountId32;
use tokio::{sync::RwLock, task::JoinHandle};

use crate::parachain_interactor::registration::update_operational_status;

#[derive(Serialize)]
struct TaskOwner {
address: AccountId32,
Expand Down Expand Up @@ -168,14 +168,21 @@ pub async fn pick_up_task(miner: Arc<Miner>) -> Result<TaskPickupReturnType> {

nuke_all_running_task_containers().await?;

// Set operational status to Available when no task is active
miner
.update_operational_status(OperationalStatus::Available)
.await?;

Ok(TaskPickupReturnType::Failure(()))
}
// Task is assinged but not yet confirmed by the miner, which is necessary so that the parachain won't suspend the miner
TaskStatusType::Assigned => {
println!("Assigned task found, confirming task reception...");

// Update operational status to Busy when picking up a task
update_operational_status(Arc::clone(&miner), OperationalStatus::Busy).await?;
miner
.update_operational_status(OperationalStatus::Busy)
.await?;

let task = CurrentTask {
task_type: task.task_kind,
Expand All @@ -200,7 +207,9 @@ pub async fn pick_up_task(miner: Arc<Miner>) -> Result<TaskPickupReturnType> {
// Task should already be running and only needs to be picked back up
TaskStatusType::Running => {
// Update operational status to Busy when picking up a running task
update_operational_status(Arc::clone(&miner), OperationalStatus::Busy).await?;
miner
.update_operational_status(OperationalStatus::Busy)
.await?;

let task = CurrentTask {
task_type: task.task_kind,
Expand Down Expand Up @@ -323,8 +332,22 @@ pub fn return_task_container_name(task_id: TaskId) -> String {
pub async fn clean_up_current_task_and_vacate(miner: Arc<Miner>) -> Result<()> {
nuke_all_running_task_containers().await?;

// Update operational status back to Available after task completion
update_operational_status(Arc::clone(&miner), OperationalStatus::Available).await?;
// Query chainstate to determine if we should set status to Available
let client = global_config::get_parachain_client()?;
let current_status = get_miner_operational_status(
&client,
&miner.identity.miner_id,
&miner.miner_type,
)
.await
.unwrap_or(Some(OperationalStatus::Available));

// Only update to Available if we're not already in that state and don't have another task
if !matches!(current_status, Some(OperationalStatus::Available)) {
miner
.update_operational_status(OperationalStatus::Available)
.await?;
}

let keypair = Arc::clone(&miner.keypair);
let current_task_id = miner.current_task().await?.read().await.id;
Expand Down
Loading