diff --git a/Cargo.lock b/Cargo.lock index 91397d0..64a6f6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3286,7 +3286,7 @@ dependencies = [ [[package]] name = "cyborg-miner" -version = "0.0.79-test" +version = "0.0.83" dependencies = [ "async-stream", "async-trait 0.1.88 (git+https://github.com/dtolnay/async-trait.git)", diff --git a/miner/Cargo.toml b/miner/Cargo.toml index 0ed4c00..b5138eb 100644 --- a/miner/Cargo.toml +++ b/miner/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "cyborg-miner" description = "An AI inference mine, designed to connect to the cyborg-network, built, among others with Polkadot SDK." -version = "0.0.73" +version = "0.0.83" license = "Unlicense" authors = ["Cyborg Network "] edition = "2021" diff --git a/miner/src/global_config.rs b/miner/src/global_config.rs index 5e4c506..28b4c9c 100644 --- a/miner/src/global_config.rs +++ b/miner/src/global_config.rs @@ -10,6 +10,8 @@ use subxt::PolkadotConfig; use crate::error::{Error, Result}; use crate::utils::tx_queue::TransactionQueue; use crate::utils::tx_queue::TRANSACTION_QUEUE; +use std::sync::Arc; + #[derive(Debug)] pub struct Paths { @@ -49,7 +51,9 @@ pub static FLASH_INFER_PORT: Lazy = Lazy::new(|| { }); - +// Require TX_QUEUE_DB_PATH to be set by the setup script +pub static TX_QUEUE_DB_PATH: Lazy = + Lazy::new(|| env::var("TX_QUEUE_DB_PATH").expect("TX_QUEUE_DB_PATH must be set")); /* // The gateway for CESS network pub static CESS_GATEWAY: Lazy>> = Lazy::new(|| @@ -88,9 +92,13 @@ pub async fn run_global_config(parachain_url: &str) -> Result<()> { Lazy::force(&FLASH_INFER_PORT); //Lazy::force(&CESS_GATEWAY); Lazy::force(&CURRENT_TASK_PATH); + Lazy::force(&TX_QUEUE_DB_PATH); // Set the transaction queue - if let Err(_) = TRANSACTION_QUEUE.set(TransactionQueue::new()) { + if TRANSACTION_QUEUE + .set(Arc::new(TransactionQueue::new().await)) + .is_err() + { panic!("Failed to set transaction queue."); } @@ -114,12 +122,11 @@ pub fn get_parachain_client() -> Result<&'static OnlineClient> { .ok_or(Error::parachain_client_not_intitialized()) } -pub fn get_tx_queue() -> Result<&'static TransactionQueue> { +pub fn get_tx_queue() -> Result<&'static Arc> { TRANSACTION_QUEUE.get().ok_or(Error::Custom( "Transaction queue not initialized".to_string(), )) } - /* pub async fn get_cess_gateway() -> String { CESS_GATEWAY.read().await.clone() @@ -141,4 +148,4 @@ pub fn update_config_file(path: &str, content: &str) -> Result<()> { fs::write(&path, content)?; Ok(()) -} +} \ No newline at end of file diff --git a/miner/src/main.rs b/miner/src/main.rs index fbd4688..aa359f5 100644 --- a/miner/src/main.rs +++ b/miner/src/main.rs @@ -31,8 +31,11 @@ use clap::Parser; use cli::{Cli, Commands}; use error::Result; use global_config::run_global_config; +use std::sync::Arc; + use traits::ParachainInteractor; use crate::substrate_interface::api::edge_connect::calls::types::remove_miner::MinerId; +use crate::utils::tx_queue::{TRANSACTION_QUEUE, TransactionQueue, TxOutput}; use crate::substrate_interface::api::runtime_types::bounded_collections::bounded_vec::BoundedVec; @@ -58,7 +61,16 @@ async fn main() -> Result<()> { // Initialize logger log::init_logger().expect("Could not initialize logger!"); - + let _queue = TRANSACTION_QUEUE.get_or_init(|| { + tokio::runtime::Handle::current().block_on(async { + Arc::new(TransactionQueue::new().await) + }) + }); + + // Start background processor + _queue.start_processing(); + + let miner_id_bytes = miner_uuid.as_bytes().to_vec(); let miner_uuid_bounded: MinerId = BoundedVec(miner_id_bytes); @@ -98,4 +110,4 @@ async fn main() -> Result<()> { } Ok(()) -} +} \ No newline at end of file diff --git a/miner/src/utils/task_handling.rs b/miner/src/utils/task_handling.rs index 456e05c..b377670 100644 --- a/miner/src/utils/task_handling.rs +++ b/miner/src/utils/task_handling.rs @@ -335,14 +335,13 @@ pub async fn clean_up_current_task_and_vacate(miner: Arc) -> Result<()> { let tx_queue = global_config::get_tx_queue()?; let rx = tx_queue - .enqueue(move || { + .enqueue("vacate_miner", move || { let keypair = Arc::clone(&keypair); let miner_type = Arc::clone(&miner_type); - async move { - let _ = - confirm_miner_vacation(keypair, current_task_id, miner_type).await?; + Box::pin(async move { + let _ = confirm_miner_vacation(keypair, current_task_id, miner_type).await?; Ok(TxOutput::Success) - } + }) }) .await?; diff --git a/miner/src/utils/tx_builder.rs b/miner/src/utils/tx_builder.rs index ecf4016..0e0d27d 100644 --- a/miner/src/utils/tx_builder.rs +++ b/miner/src/utils/tx_builder.rs @@ -110,7 +110,7 @@ pub async fn pub_register( let tx_queue = global_config::get_tx_queue()?; let rx = tx_queue - .enqueue(move || { + .enqueue("register_miner", move || { let keypair = Arc::clone(&keypair); let miner_type = miner_type.clone(); let value = miner_uuid.clone(); @@ -238,13 +238,13 @@ pub async fn pub_confirm_task_reception( let tx_queue = global_config::get_tx_queue()?; let current_task_id_copy = *current_task_id; - let rx = tx_queue - .enqueue(move || { + let rx = tx_queue + .enqueue("confirm_task_reception", move || { let keypair = Arc::clone(&keypair); - async move { + Box::pin(async move { let _ = confirm_task_reception(keypair, ¤t_task_id_copy).await?; Ok(TxOutput::Success) - } + }) }) .await?; diff --git a/miner/src/utils/tx_queue.rs b/miner/src/utils/tx_queue.rs index fbd4e88..e60aba3 100644 --- a/miner/src/utils/tx_queue.rs +++ b/miner/src/utils/tx_queue.rs @@ -1,33 +1,47 @@ -use crate::{error::Result, types::MinerIdentity}; +use crate::error::Result; use once_cell::sync::OnceCell; +use serde::{Deserialize, Serialize}; +use sled; use std::{ - collections::VecDeque, + collections::{HashMap, VecDeque}, future::Future, pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::{Arc, Mutex as StdMutex}, }; +use std::sync::atomic::Ordering; use tokio::sync::{oneshot, Mutex}; use tokio::time::{sleep, Duration}; -const MAX_RETRIES: u32 = 500; +use crate::global_config::TX_QUEUE_DB_PATH; +use crate::types::MinerIdentity; -/// The type of an async transaction executor closure: no args, returns a Future Result -type TxExecutor = - Box Pin> + Send>> + Send + Sync>; +const MAX_RETRIES: u32 = 500; +const EMPTY_SLEEP_MS: u64 = 250; // when queue empty, sleep briefly and continue -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub enum TxOutput { RegistrationInfo(MinerIdentity), Success, } +pub type TxExecutor = + Box Pin> + Send>> + Send + Sync>; + +#[derive(Serialize, Deserialize, Clone)] +pub struct PersistentTx { + pub id: u64, + pub task_key: String, + pub payload: Option>, + pub retry_count: u32, + pub timestamp: u64, +} + +/// In-memory Transaction pub struct Transaction { - executor: TxExecutor, - responder: Option>>, - retry_count: u32, + pub id: u64, + pub executor: TxExecutor, + pub responder: Option>>, + pub retry_count: u32, } impl Transaction { @@ -44,90 +58,339 @@ impl Transaction { } } + +type TxHandler = Arc< + dyn Fn(PersistentTx) -> Pin> + Send>> + + Send + + Sync, +>; + pub struct TransactionQueue { inner: Arc>>, - processing: Arc, + processing: Arc, + db: sled::Db, + registry: Arc>>, } -pub static TRANSACTION_QUEUE: OnceCell = OnceCell::new(); +pub static TRANSACTION_QUEUE: OnceCell> = OnceCell::new(); impl TransactionQueue { - pub fn new() -> Self { + /// Open sled DB and initialize queue + registry + pub async fn new() -> Self { + let db_path = TX_QUEUE_DB_PATH.as_str().to_string(); + let db = tokio::task::spawn_blocking(move || sled::open(db_path)) + .await + .expect("Failed to join blocking task") + .expect("Failed to open sled DB"); + + let queue = Arc::new(Mutex::new(VecDeque::new())); + let mut persisted_count = 0usize; + + { + let db_clone = db.clone(); + let entries: Vec<_> = tokio::task::spawn_blocking(move || { + let mut out = Vec::new(); + for item in db_clone.iter() { + if let Ok((_k, v)) = item { + if bincode::deserialize::(&v).is_ok() { + out.push(()); + } + } + } + out + }) + .await + .unwrap(); + persisted_count = entries.len(); + } + + println!( + "[TX-QUEUE] Initialized sled DB at {}. Found {} persisted tx records.", + TX_QUEUE_DB_PATH.as_str(), + persisted_count + ); + Self { - inner: Arc::new(Mutex::new(VecDeque::new())), - processing: Arc::new(AtomicBool::new(false)), + inner: queue, + processing: Arc::new(std::sync::atomic::AtomicBool::new(false)), + db, + registry: Arc::new(StdMutex::new(HashMap::new())), } } - pub async fn enqueue(&self, executor: F) -> Result>> + + + pub fn register_handler(&self, task_key: &str, handler: F) + where + F: Fn(PersistentTx) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + let mut reg = self.registry.lock().expect("Registry mutex poisoned"); + reg.insert( + task_key.to_string(), + Arc::new(move |ptx: PersistentTx| Box::pin(handler(ptx))), + ); + } + + + /// Returns a oneshot receiver to await the tx result. + pub async fn enqueue( + self: &Arc, + task_key: &str, + executor_fn: F, + ) -> Result>> where F: Fn() -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, { let (tx, rx) = oneshot::channel(); + let tx_id = self.next_id().await; + + let boxed_executor: TxExecutor = Box::new(move || Box::pin(executor_fn())); - let tx = Transaction { - executor: Box::new(move || Box::pin(executor())), + let tx_obj = Transaction { + id: tx_id, + executor: boxed_executor, responder: Some(tx), retry_count: 0, }; - self.inner.lock().await.push_back(tx); - self.start_processing(); + let persistent_tx = PersistentTx { + id: tx_id, + task_key: task_key.to_string(), + payload: None, + retry_count: 0, + timestamp: chrono::Utc::now().timestamp_millis() as u64, + }; + + self.persist_tx(&persistent_tx).await; + + { + let mut q = self.inner.lock().await; + q.push_back(tx_obj); + + println!( + "[TX-QUEUE] Enqueued tx #{} (task_key={}). Queue size: {}", + tx_id, + task_key, + q.len() + ); + } Ok(rx) } pub fn start_processing(&self) { if self.processing.swap(true, Ordering::SeqCst) { - // Already processing return; } let inner = Arc::clone(&self.inner); - let processing_flag = Arc::clone(&self.processing); + let db = self.db.clone(); tokio::spawn(async move { loop { let tx_opt = { let mut queue = inner.lock().await; - println!("Queue size: {}", queue.len()); queue.pop_front() }; match tx_opt { - Some(mut tx) => match tx.execute().await { - Ok(result) => { - println!("Transaction succeeded: {result:?}"); - if let Some(responder) = tx.responder.take() { - let _ = responder.send(Ok(result)); + Some(mut tx) => { + println!( + "[TX-QUEUE] Processing tx #{} (retry #{})", + tx.id, + tx.retry_count() + ); + + match tx.execute().await { + Ok(result) => { + println!("[TX-QUEUE] Tx #{} succeeded: {:?}", tx.id, result); + + // delete persisted entry + let _ = tokio::task::spawn_blocking({ + let db = db.clone(); + let id = tx.id; + move || { + let key = id.to_be_bytes(); + let _ = db.remove(key); + } + }) + .await; + + if let Some(responder) = tx.responder.take() { + let _ = responder.send(Ok(result)); + } } - } - Err(e) if tx.retry_count < MAX_RETRIES => { - println!("Transaction failed: {}", e); - tx.increment_retry(); + Err(e) if tx.retry_count < MAX_RETRIES => { + println!("[TX-QUEUE] Tx #{} failed: {}", tx.id, e); - let delay_ms = 1000 * 2u64.pow(tx.retry_count().min(10)); - println!("Retrying after {} ms", delay_ms); - sleep(Duration::from_millis(delay_ms)).await; + // increment retry count and persist the updated retry count + let new_retry = tx.retry_count + 1; + let _ = tokio::task::spawn_blocking({ + let db = db.clone(); + let id = tx.id; + let retry_val = new_retry; + move || { + if let Ok(Some(v)) = db.get(id.to_be_bytes()) { + if let Ok(mut ptx) = bincode::deserialize::(&v) { + ptx.retry_count = retry_val; + let _ = db.insert(id.to_be_bytes(), bincode::serialize(&ptx).unwrap()); + } + } + } + }) + .await; - let mut queue = inner.lock().await; - queue.push_front(tx); - } - Err(e) => { - println!("Transaction failed: {}", e); - if let Some(responder) = tx.responder.take() { - let _ = responder.send(Err(e)); + let delay_ms = 1000u64.saturating_mul(2u64.pow(std::cmp::min(tx.retry_count(), 10))); + println!( + "[TX-QUEUE] Retrying tx #{} after {} ms (attempt #{})", + tx.id, + delay_ms, + new_retry + ); + sleep(Duration::from_millis(delay_ms)).await; + + let mut queue = inner.lock().await; + tx.retry_count = new_retry; + queue.push_front(tx); + } + Err(e) => { + println!("[TX-QUEUE] Tx #{} permanently failed: {}", tx.id, e); + + // delete persisted entry + let _ = tokio::task::spawn_blocking({ + let db = db.clone(); + let id = tx.id; + move || { + let key = id.to_be_bytes(); + let _ = db.remove(key); + } + }) + .await; + + if let Some(responder) = tx.responder.take() { + let _ = responder.send(Err(e)); + } } } - }, + } None => { - processing_flag.store(false, Ordering::SeqCst); - println!("Transaction queue is empty"); - break; + // queue empty -> sleep briefly then continue + sleep(Duration::from_millis(EMPTY_SLEEP_MS)).await; + continue; } } } }); } -} \ No newline at end of file + + /// Returns number of restored transactions. + pub async fn restore_from_db_using_registry(&self) -> usize { + let mut restored_count = 0usize; + + // read persisted entries in a blocking task + let entries: Vec = { + let db = self.db.clone(); + tokio::task::spawn_blocking(move || { + let mut out = Vec::new(); + for item in db.iter() { + if let Ok((_k, v)) = item { + if let Ok(p) = bincode::deserialize::(&v) { + out.push(p); + } + } + } + out + }) + .await + .unwrap() + }; + + { + let mut q = self.inner.lock().await; + let registry_guard = self.registry.lock().expect("Registry mutex poisoned"); + + for p in entries { + let tx_obj = if let Some(handler) = registry_guard.get(&p.task_key) { + let handler_clone = Arc::clone(handler); + let p_clone = p.clone(); + let exec: TxExecutor = Box::new(move || { + let handler_clone = Arc::clone(&handler_clone); + let ptx = p_clone.clone(); + Box::pin(async move { (handler_clone)(ptx).await }) + }); + + Transaction { + id: p.id, + executor: exec, + responder: None, + retry_count: p.retry_count, + } + } else { + let missing_key = p.task_key.clone(); + let id_for_delete = p.id; + let exec: TxExecutor = Box::new(move || { + let missing_key_clone = missing_key.clone(); + + Box::pin(async move { + Err(crate::error::Error::Custom(format!( + "No handler registered for task_key '{}'", + missing_key_clone + ))) + }) + }); + + Transaction { + id: p.id, + executor: exec, + responder: None, + retry_count: p.retry_count, + } + }; + + q.push_back(tx_obj); + restored_count += 1; + } + } + + restored_count + } + + async fn persist_tx(&self, tx: &PersistentTx) { + let db = self.db.clone(); + let key = tx.id.to_be_bytes(); + let value = bincode::serialize(tx).unwrap_or_else(|e| { + panic!("Failed to serialize PersistentTx: {}", e); + }); + + let _ = tokio::task::spawn_blocking(move || db.insert(key, value)).await; + } + + async fn next_id(&self) -> u64 { + let counter_key = b"tx_counter"; + let db = self.db.clone(); + + tokio::task::spawn_blocking(move || { + let next_id = db + .update_and_fetch(counter_key, |old| { + let next = match old { + Some(v) => { + let mut arr = [0u8; 8]; + arr.copy_from_slice(&v); + u64::from_be_bytes(arr) + 1 + } + None => 1, + }; + Some(next.to_be_bytes().to_vec()) + }) + .unwrap(); + + let mut arr = [0u8; 8]; + arr.copy_from_slice(&next_id.unwrap()); + u64::from_be_bytes(arr) + }) + .await + .unwrap() + } +}