Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion miner/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "cyborg-miner"
description = "An AI inference mine, designed to connect to the cyborg-network, built, among others with Polkadot SDK."
version = "0.0.73"
version = "0.0.83"
license = "Unlicense"
authors = ["Cyborg Network <https://github.com/Cyborg-Network>"]
edition = "2021"
Expand Down
17 changes: 12 additions & 5 deletions miner/src/global_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use subxt::PolkadotConfig;
use crate::error::{Error, Result};
use crate::utils::tx_queue::TransactionQueue;
use crate::utils::tx_queue::TRANSACTION_QUEUE;
use std::sync::Arc;


#[derive(Debug)]
pub struct Paths {
Expand Down Expand Up @@ -49,7 +51,9 @@ pub static FLASH_INFER_PORT: Lazy<u16> = Lazy::new(|| {
});



// Require TX_QUEUE_DB_PATH to be set by the setup script
pub static TX_QUEUE_DB_PATH: Lazy<String> =
Lazy::new(|| env::var("TX_QUEUE_DB_PATH").expect("TX_QUEUE_DB_PATH must be set"));
/*
// The gateway for CESS network
pub static CESS_GATEWAY: Lazy<Arc<RwLock<String>>> = Lazy::new(||
Expand Down Expand Up @@ -88,9 +92,13 @@ pub async fn run_global_config(parachain_url: &str) -> Result<()> {
Lazy::force(&FLASH_INFER_PORT);
//Lazy::force(&CESS_GATEWAY);
Lazy::force(&CURRENT_TASK_PATH);
Lazy::force(&TX_QUEUE_DB_PATH);

// Set the transaction queue
if let Err(_) = TRANSACTION_QUEUE.set(TransactionQueue::new()) {
if TRANSACTION_QUEUE
.set(Arc::new(TransactionQueue::new().await))
.is_err()
{
panic!("Failed to set transaction queue.");
}

Expand All @@ -114,12 +122,11 @@ pub fn get_parachain_client() -> Result<&'static OnlineClient<PolkadotConfig>> {
.ok_or(Error::parachain_client_not_intitialized())
}

pub fn get_tx_queue() -> Result<&'static TransactionQueue> {
pub fn get_tx_queue() -> Result<&'static Arc<TransactionQueue>> {
TRANSACTION_QUEUE.get().ok_or(Error::Custom(
"Transaction queue not initialized".to_string(),
))
}

/*
pub async fn get_cess_gateway() -> String {
CESS_GATEWAY.read().await.clone()
Expand All @@ -141,4 +148,4 @@ pub fn update_config_file(path: &str, content: &str) -> Result<()> {
fs::write(&path, content)?;

Ok(())
}
}
16 changes: 14 additions & 2 deletions miner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ use clap::Parser;
use cli::{Cli, Commands};
use error::Result;
use global_config::run_global_config;
use std::sync::Arc;

use traits::ParachainInteractor;
use crate::substrate_interface::api::edge_connect::calls::types::remove_miner::MinerId;
use crate::utils::tx_queue::{TRANSACTION_QUEUE, TransactionQueue, TxOutput};
use crate::substrate_interface::api::runtime_types::bounded_collections::bounded_vec::BoundedVec;


Expand All @@ -58,7 +61,16 @@ async fn main() -> Result<()> {
// Initialize logger
log::init_logger().expect("Could not initialize logger!");


let _queue = TRANSACTION_QUEUE.get_or_init(|| {
tokio::runtime::Handle::current().block_on(async {
Arc::new(TransactionQueue::new().await)
})
});

// Start background processor
_queue.start_processing();



let miner_id_bytes = miner_uuid.as_bytes().to_vec();
let miner_uuid_bounded: MinerId = BoundedVec(miner_id_bytes);
Expand Down Expand Up @@ -98,4 +110,4 @@ async fn main() -> Result<()> {
}

Ok(())
}
}
9 changes: 4 additions & 5 deletions miner/src/utils/task_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,14 +335,13 @@ pub async fn clean_up_current_task_and_vacate(miner: Arc<Miner>) -> Result<()> {
let tx_queue = global_config::get_tx_queue()?;

let rx = tx_queue
.enqueue(move || {
.enqueue("vacate_miner", move || {
let keypair = Arc::clone(&keypair);
let miner_type = Arc::clone(&miner_type);
async move {
let _ =
confirm_miner_vacation(keypair, current_task_id, miner_type).await?;
Box::pin(async move {
let _ = confirm_miner_vacation(keypair, current_task_id, miner_type).await?;
Ok(TxOutput::Success)
}
})
})
.await?;

Expand Down
10 changes: 5 additions & 5 deletions miner/src/utils/tx_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub async fn pub_register(
let tx_queue = global_config::get_tx_queue()?;

let rx = tx_queue
.enqueue(move || {
.enqueue("register_miner", move || {
let keypair = Arc::clone(&keypair);
let miner_type = miner_type.clone();
let value = miner_uuid.clone();
Expand Down Expand Up @@ -238,13 +238,13 @@ pub async fn pub_confirm_task_reception(
let tx_queue = global_config::get_tx_queue()?;
let current_task_id_copy = *current_task_id;

let rx = tx_queue
.enqueue(move || {
let rx = tx_queue
.enqueue("confirm_task_reception", move || {
let keypair = Arc::clone(&keypair);
async move {
Box::pin(async move {
let _ = confirm_task_reception(keypair, &current_task_id_copy).await?;
Ok(TxOutput::Success)
}
})
})
.await?;

Expand Down
Loading