From 02d110693a09feac4bf6dcead9ba297191fa4b9a Mon Sep 17 00:00:00 2001 From: ronnie-ahmed Date: Fri, 21 Nov 2025 17:53:24 +0600 Subject: [PATCH 1/4] Added updated queue persistance --- Cargo.lock | 2 +- miner/Cargo.toml | 2 +- miner/src/global_config.rs | 10 +- miner/src/main.rs | 13 ++- miner/src/utils/tx_queue.rs | 210 +++++++++++++++++++++++++----------- 5 files changed, 164 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 91397d0..64a6f6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3286,7 +3286,7 @@ dependencies = [ [[package]] name = "cyborg-miner" -version = "0.0.79-test" +version = "0.0.83" dependencies = [ "async-stream", "async-trait 0.1.88 (git+https://github.com/dtolnay/async-trait.git)", diff --git a/miner/Cargo.toml b/miner/Cargo.toml index 382c7b7..b5138eb 100644 --- a/miner/Cargo.toml +++ b/miner/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "cyborg-miner" description = "An AI inference mine, designed to connect to the cyborg-network, built, among others with Polkadot SDK." -version = "0.0.79-test" +version = "0.0.83" license = "Unlicense" authors = ["Cyborg Network "] edition = "2021" diff --git a/miner/src/global_config.rs b/miner/src/global_config.rs index dfcef9c..2ed2b7a 100644 --- a/miner/src/global_config.rs +++ b/miner/src/global_config.rs @@ -10,6 +10,8 @@ use subxt::PolkadotConfig; use crate::error::{Error, Result}; use crate::utils::tx_queue::TransactionQueue; use crate::utils::tx_queue::TRANSACTION_QUEUE; +use std::sync::Arc; + #[derive(Debug)] pub struct Paths { @@ -93,7 +95,10 @@ pub async fn run_global_config(parachain_url: &str) -> Result<()> { Lazy::force(&TX_QUEUE_DB_PATH); // Set the transaction queue - if let Err(_) = TRANSACTION_QUEUE.set(TransactionQueue::new().await) { + if TRANSACTION_QUEUE + .set(Arc::new(TransactionQueue::new().await)) + .is_err() + { panic!("Failed to set transaction queue."); } @@ -117,12 +122,11 @@ pub fn get_parachain_client() -> Result<&'static OnlineClient> { .ok_or(Error::parachain_client_not_intitialized()) } -pub fn get_tx_queue() -> Result<&'static TransactionQueue> { +pub fn get_tx_queue() -> Result<&'static Arc> { TRANSACTION_QUEUE.get().ok_or(Error::Custom( "Transaction queue not initialized".to_string(), )) } - /* pub async fn get_cess_gateway() -> String { CESS_GATEWAY.read().await.clone() diff --git a/miner/src/main.rs b/miner/src/main.rs index 82f3c37..605475f 100644 --- a/miner/src/main.rs +++ b/miner/src/main.rs @@ -31,6 +31,8 @@ use clap::Parser; use cli::{Cli, Commands}; use error::Result; use global_config::run_global_config; +use std::sync::Arc; + use traits::ParachainInteractor; use crate::substrate_interface::api::edge_connect::calls::types::remove_miner::MinerId; use crate::utils::tx_queue::{TRANSACTION_QUEUE, TransactionQueue, TxOutput}; @@ -59,12 +61,15 @@ async fn main() -> Result<()> { // Initialize logger log::init_logger().expect("Could not initialize logger!"); - let _queue = TRANSACTION_QUEUE.get_or_init(|| { - tokio::runtime::Handle::current().block_on(async { - TransactionQueue::new().await - }) + let _queue = TRANSACTION_QUEUE.get_or_init(|| { + tokio::runtime::Handle::current().block_on(async { + Arc::new(TransactionQueue::new().await) + }) }); + // Start background processor + _queue.start_processing(); + let miner_id_bytes = miner_uuid.as_bytes().to_vec(); let miner_uuid_bounded: MinerId = BoundedVec(miner_id_bytes); diff --git a/miner/src/utils/tx_queue.rs b/miner/src/utils/tx_queue.rs index 798a706..bbca52a 100644 --- a/miner/src/utils/tx_queue.rs +++ b/miner/src/utils/tx_queue.rs @@ -1,8 +1,10 @@ -use crate::{error::Result, types::MinerIdentity}; +use crate::error::Result; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; -use sled::{self}; +use sled; + use crate::global_config::TX_QUEUE_DB_PATH; +use crate::types::MinerIdentity; use std::{ collections::VecDeque, future::Future, @@ -16,10 +18,7 @@ use tokio::sync::{oneshot, Mutex}; use tokio::time::{sleep, Duration}; const MAX_RETRIES: u32 = 500; - -/// Async transaction executor closure type. -type TxExecutor = - Box Pin> + Send>> + Send + Sync>; +const EMPTY_SLEEP_MS: u64 = 250; // when queue empty, sleep briefly and continue #[derive(Debug, Serialize, Deserialize, Clone)] pub enum TxOutput { @@ -27,12 +26,16 @@ pub enum TxOutput { Success, } + +pub type TxExecutor = + Box Pin> + Send>> + Send + Sync>; + #[derive(Serialize, Deserialize, Clone)] pub struct PersistentTx { pub id: u64, pub retry_count: u32, pub timestamp: u64, - pub data: Option>, + // pub kind: TxKind, } pub struct Transaction { @@ -62,61 +65,56 @@ pub struct TransactionQueue { db: sled::Db, } -pub static TRANSACTION_QUEUE: OnceCell = OnceCell::new(); +pub static TRANSACTION_QUEUE: OnceCell> = OnceCell::new(); impl TransactionQueue { + /// Create/open sled DB and initialize queue. pub async fn new() -> Self { - let db_path = TX_QUEUE_DB_PATH.as_str(); + let db_path = TX_QUEUE_DB_PATH.as_str().to_string(); let db = tokio::task::spawn_blocking(move || sled::open(db_path)) .await .expect("Failed to join blocking task") .expect("Failed to open sled DB"); + let queue = Arc::new(Mutex::new(VecDeque::new())); - let mut restored_count = 0; + let mut persisted_count = 0usize; - // Restore persisted transactions { - let mut queue_lock = queue.lock().await; - for item in db.iter() { - if let Ok((_, value)) = item { - if let Ok(persistent_tx) = bincode::deserialize::(&value) { - let tx_obj = Transaction { - id: persistent_tx.id, - executor: Box::new(|| { - Box::pin(async { - println!("[TX-QUEUE] Restored tx executed."); - Ok(TxOutput::Success) - }) - }), - responder: None, - retry_count: persistent_tx.retry_count, - }; - queue_lock.push_back(tx_obj); - restored_count += 1; + let db_clone = db.clone(); + let entries: Vec<_> = tokio::task::spawn_blocking(move || { + let mut out = Vec::new(); + for item in db_clone.iter() { + if let Ok((_k, v)) = item { + if bincode::deserialize::(&v).is_ok() { + out.push(()); + } } } - } + out + }) + .await + .unwrap(); + persisted_count = entries.len(); } println!( - "[TX-QUEUE] Initialized sled DB at {db_path}. Restored {restored_count} txs." + "[TX-QUEUE] Initialized sled DB at {}. Found {} persisted tx records.", + TX_QUEUE_DB_PATH.as_str(), + persisted_count ); - let tx_queue = Self { + Self { inner: queue, processing: Arc::new(AtomicBool::new(false)), db, - }; - - if restored_count > 0 { - println!("[TX-QUEUE] Resuming restored transactions..."); - tx_queue.start_processing(); } - - tx_queue } - pub async fn enqueue(&self, executor: F) -> Result>> + + pub async fn enqueue( + self: &Arc, + executor_fn: F, + ) -> Result>> where F: Fn() -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, @@ -124,9 +122,11 @@ impl TransactionQueue { let (tx, rx) = oneshot::channel(); let tx_id = self.next_id().await; + let boxed_executor: TxExecutor = Box::new(move || Box::pin(executor_fn())); + let tx_obj = Transaction { id: tx_id, - executor: Box::new(move || Box::pin(executor())), + executor: boxed_executor, responder: Some(tx), retry_count: 0, }; @@ -135,24 +135,30 @@ impl TransactionQueue { id: tx_id, retry_count: 0, timestamp: chrono::Utc::now().timestamp_millis() as u64, - data: None, + }; + // persist then push to queue self.persist_tx(&persistent_tx).await; - self.inner.lock().await.push_back(tx_obj); - - println!( - "[TX-QUEUE] Enqueued tx #{}. Queue size: {}", - tx_id, - self.inner.lock().await.len() - ); - self.start_processing(); + { + let mut q = self.inner.lock().await; + q.push_back(tx_obj); + + println!( + "[TX-QUEUE] Enqueued tx #{}. Queue size: {}", + tx_id, + q.len() + ); + } Ok(rx) } - fn start_processing(&self) { + /// Start background processing. This method is idempotent. + /// Processor stays alive (loops forever) and polls the queue, sleeping when empty. + pub fn start_processing(&self) { + // If already running, do nothing if self.processing.swap(true, Ordering::SeqCst) { return; } @@ -162,6 +168,7 @@ impl TransactionQueue { let db = self.db.clone(); tokio::spawn(async move { + // keep processor alive until process exits loop { let tx_opt = { let mut queue = inner.lock().await; @@ -179,30 +186,70 @@ impl TransactionQueue { match tx.execute().await { Ok(result) => { println!("[TX-QUEUE] Tx #{} succeeded: {:?}", tx.id, result); - TransactionQueue::delete_persisted(&db, tx.id).await; + + // delete persisted entry (run in blocking task) + let _ = tokio::task::spawn_blocking({ + let db = db.clone(); + let id = tx.id; + move || { + let key = id.to_be_bytes(); + let _ = db.remove(key); + } + }) + .await; + if let Some(responder) = tx.responder.take() { let _ = responder.send(Ok(result)); } } Err(e) if tx.retry_count < MAX_RETRIES => { println!("[TX-QUEUE] Tx #{} failed: {}", tx.id, e); - tx.increment_retry(); - let delay_ms = 1000 * 2u64.pow(tx.retry_count().min(10)); + // increment retry count and persist the updated retry count + let mut new_retry = tx.retry_count + 1; + // Save updated retry_count to DB + let _ = tokio::task::spawn_blocking({ + let db = db.clone(); + let id = tx.id; + let retry = new_retry; + move || { + if let Ok(Some(v)) = db.get(id.to_be_bytes()) { + if let Ok(mut ptx) = bincode::deserialize::(&v) { + ptx.retry_count = retry; + let _ = db.insert(id.to_be_bytes(), bincode::serialize(&ptx).unwrap()); + } + } + } + }) + .await; + + let delay_ms = 1000u64.saturating_mul(2u64.pow(std::cmp::min(tx.retry_count(), 10))); println!( "[TX-QUEUE] Retrying tx #{} after {} ms (attempt #{})", tx.id, delay_ms, - tx.retry_count() + new_retry ); sleep(Duration::from_millis(delay_ms)).await; let mut queue = inner.lock().await; + tx.retry_count = new_retry; queue.push_front(tx); } Err(e) => { println!("[TX-QUEUE] Tx #{} permanently failed: {}", tx.id, e); - TransactionQueue::delete_persisted(&db, tx.id).await; + + // delete persisted entry + let _ = tokio::task::spawn_blocking({ + let db = db.clone(); + let id = tx.id; + move || { + let key = id.to_be_bytes(); + let _ = db.remove(key); + } + }) + .await; + if let Some(responder) = tx.responder.take() { let _ = responder.send(Err(e)); } @@ -210,26 +257,61 @@ impl TransactionQueue { } } None => { - println!("[TX-QUEUE] Queue empty. Halting processor."); - processing_flag.store(false, Ordering::SeqCst); - break; + // queue empty -> pause briefly then continue (processor stays running) + // set processing flag true (already true) + sleep(Duration::from_millis(EMPTY_SLEEP_MS)).await; + continue; } } } + + }); } + /// Restore persisted transactions by using a build function that converts PersistentTx -> Transaction. + /// This function returns the number of restored txs. After restore, call `start_processing()` (it will be no-op if already running). + pub async fn restore_from_db(&self, mut build_fn: F) -> usize + where + F: FnMut(PersistentTx) -> Transaction, + { + let mut restored_count = 0usize; + let entries: Vec = { + let db = self.db.clone(); + tokio::task::spawn_blocking(move || { + let mut out = Vec::new(); + for item in db.iter() { + if let Ok((_k, v)) = item { + if let Ok(p) = bincode::deserialize::(&v) { + out.push(p); + } + } + } + out + }) + .await + .unwrap() + }; + + { + let mut q = self.inner.lock().await; + for p in entries { + let tx_obj = build_fn(p.clone()); + q.push_back(tx_obj); + restored_count += 1; + } + } + + restored_count + } + + /// write PersistentTx to sled async fn persist_tx(&self, tx: &PersistentTx) { let db = self.db.clone(); let key = tx.id.to_be_bytes(); let value = bincode::serialize(tx).unwrap(); - let _ = tokio::task::spawn_blocking(move || db.insert(key, value)).await; - } - async fn delete_persisted(db: &sled::Db, id: u64) { - let key = id.to_be_bytes(); - let db = db.clone(); - let _ = tokio::task::spawn_blocking(move || db.remove(key)).await; + let _ = tokio::task::spawn_blocking(move || db.insert(key, value)).await; } async fn next_id(&self) -> u64 { From ef3753be225e23821bb34e8b27108d69877d8cec Mon Sep 17 00:00:00 2001 From: ronnie-ahmed Date: Fri, 21 Nov 2025 21:35:00 +0600 Subject: [PATCH 2/4] resolving conflicts --- miner/Cargo.toml | 2 +- miner/src/global_config.rs | 7 +- miner/src/main.rs | 3 +- miner/src/utils/tx_queue.rs | 22 +- setup.sh | 465 ++++++++++++++++++++++++++++++++++++ 5 files changed, 485 insertions(+), 14 deletions(-) create mode 100755 setup.sh diff --git a/miner/Cargo.toml b/miner/Cargo.toml index 0ed4c00..b5138eb 100644 --- a/miner/Cargo.toml +++ b/miner/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "cyborg-miner" description = "An AI inference mine, designed to connect to the cyborg-network, built, among others with Polkadot SDK." -version = "0.0.73" +version = "0.0.83" license = "Unlicense" authors = ["Cyborg Network "] edition = "2021" diff --git a/miner/src/global_config.rs b/miner/src/global_config.rs index 1cb1ed4..28b4c9c 100644 --- a/miner/src/global_config.rs +++ b/miner/src/global_config.rs @@ -51,7 +51,9 @@ pub static FLASH_INFER_PORT: Lazy = Lazy::new(|| { }); - +// Require TX_QUEUE_DB_PATH to be set by the setup script +pub static TX_QUEUE_DB_PATH: Lazy = + Lazy::new(|| env::var("TX_QUEUE_DB_PATH").expect("TX_QUEUE_DB_PATH must be set")); /* // The gateway for CESS network pub static CESS_GATEWAY: Lazy>> = Lazy::new(|| @@ -90,6 +92,7 @@ pub async fn run_global_config(parachain_url: &str) -> Result<()> { Lazy::force(&FLASH_INFER_PORT); //Lazy::force(&CESS_GATEWAY); Lazy::force(&CURRENT_TASK_PATH); + Lazy::force(&TX_QUEUE_DB_PATH); // Set the transaction queue if TRANSACTION_QUEUE @@ -145,4 +148,4 @@ pub fn update_config_file(path: &str, content: &str) -> Result<()> { fs::write(&path, content)?; Ok(()) -} +} \ No newline at end of file diff --git a/miner/src/main.rs b/miner/src/main.rs index 24436d1..5fc86d7 100644 --- a/miner/src/main.rs +++ b/miner/src/main.rs @@ -35,6 +35,7 @@ use std::sync::Arc; use traits::ParachainInteractor; use crate::substrate_interface::api::edge_connect::calls::types::remove_miner::MinerId; +use crate::utils::tx_queue::{TRANSACTION_QUEUE, TransactionQueue, TxOutput}; use crate::substrate_interface::api::runtime_types::bounded_collections::bounded_vec::BoundedVec; @@ -107,4 +108,4 @@ async fn main() -> Result<()> { } Ok(()) -} +} \ No newline at end of file diff --git a/miner/src/utils/tx_queue.rs b/miner/src/utils/tx_queue.rs index 9fe301b..1367c27 100644 --- a/miner/src/utils/tx_queue.rs +++ b/miner/src/utils/tx_queue.rs @@ -20,7 +20,7 @@ use tokio::time::{sleep, Duration}; const MAX_RETRIES: u32 = 500; const EMPTY_SLEEP_MS: u64 = 250; // when queue empty, sleep briefly and continue -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub enum TxOutput { RegistrationInfo(MinerIdentity), Success, @@ -35,13 +35,14 @@ pub struct PersistentTx { pub id: u64, pub retry_count: u32, pub timestamp: u64, - + // pub kind: TxKind, } pub struct Transaction { - executor: TxExecutor, - responder: Option>>, - retry_count: u32, + pub id: u64, + pub executor: TxExecutor, + pub responder: Option>>, + pub retry_count: u32, } impl Transaction { @@ -61,6 +62,7 @@ impl Transaction { pub struct TransactionQueue { inner: Arc>>, processing: Arc, + db: sled::Db, } pub static TRANSACTION_QUEUE: OnceCell> = OnceCell::new(); @@ -75,7 +77,7 @@ impl TransactionQueue { .expect("Failed to open sled DB"); let queue = Arc::new(Mutex::new(VecDeque::new())); - let mut persisted_count = 0usize; + let mut _persisted_count = 0usize; { let db_clone = db.clone(); @@ -118,6 +120,7 @@ impl TransactionQueue { Fut: Future> + Send + 'static, { let (tx, rx) = oneshot::channel(); + let tx_id = self.next_id().await; let boxed_executor: TxExecutor = Box::new(move || Box::pin(executor_fn())); @@ -157,19 +160,18 @@ impl TransactionQueue { pub fn start_processing(&self) { // If already running, do nothing if self.processing.swap(true, Ordering::SeqCst) { - // Already processing return; } let inner = Arc::clone(&self.inner); let processing_flag = Arc::clone(&self.processing); + let db = self.db.clone(); tokio::spawn(async move { // keep processor alive until process exits loop { let tx_opt = { let mut queue = inner.lock().await; - println!("Queue size: {}", queue.len()); queue.pop_front() }; @@ -253,7 +255,7 @@ impl TransactionQueue { } } } - }, + } None => { // queue empty -> pause briefly then continue (processor stays running) // set processing flag true (already true) @@ -338,4 +340,4 @@ impl TransactionQueue { .await .unwrap() } -} +} \ No newline at end of file diff --git a/setup.sh b/setup.sh new file mode 100755 index 0000000..0cf7963 --- /dev/null +++ b/setup.sh @@ -0,0 +1,465 @@ +#!/usr/bin/env bash +set -euo pipefail + +if [[ $EUID -ne 0 ]]; then + echo "This script must be run as root" + exit 1 +fi + +# =================================== SHARED CONFIG ========================================== +REPO="Cyborg-Network/Cyborg-miner" +MINER_ASSET_NAME="cyborg-miner" +PLATFORM="linux" + +case "$(uname -m)" in + x86_64) + ARCH="x86_64" + ;; + aarch64 | arm64) + ARCH="aarch64" + ;; + *) + echo "Unsupported architecture: $(uname -m)" + exit 1 + ;; +esac + +# We direct some output to stderr, as to not pollute stdout for check-update +echo "Detected architecture: ${ARCH}" >&2 + +# File names as they appear after installation +MINER_FILE_NAME="cyborg-miner" +AGENT_FILE_NAME="cyborg-agent" +SETUP_SCRIPT_FILE_NAME="setup.sh" + +# Paths for the files +AGENT_DIR="/home/ronnie/Cyborg/Cyborg-miner" +MINER_DIR="/home/ronnie/Cyborg/Cyborg-miner/target/release" +SCRIPT_DIR="/home/ronnie/Cyborg/Cyborg-miner" +TX_QUEUE_DB_PATH="/var/lib/cyborg/tx_queue_db" + +# Full paths +MINER_BINARY_PATH="$MINER_DIR/$MINER_FILE_NAME" +AGENT_BINARY_PATH="$AGENT_DIR/$AGENT_FILE_NAME" +SETUP_SCRIPT_PATH="$SCRIPT_DIR/$SETUP_SCRIPT_FILE_NAME" + +# Ports to be opened at the end of the script +MINER_INFERENCE_PORT=3000 +AGENT_HTTP_PORT=8080 +AGENT_WS_PORT=8081 +FLASH_INFER_PORT=3005 + +# Service files +MINER_SERVICE_FILE="/etc/systemd/system/$MINER_FILE_NAME.service" +AGENT_SERVICE_FILE="/etc/systemd/system/$AGENT_FILE_NAME.service" + +# ENV variables for the miner +MINER_TASK_DIR="/var/lib/cyborg/miner/task" +MINER_CONFIG_DIR="/etc/cyborg/miner" +MINER_TMP_DIR="/var/lib/cyborg/miner/tmp" +MINER_LOG_DIR="/var/log/cyborg/miner" + +# The tailscale network (only for testnet) on which the miner will be reachable +TAILSCALE_NET="tail78ea2b.ts.net" + +verify_release() { + #local file="$1" + #local sig_file="${file}.sig" + + #curl -L "${URL}.sig" -o "$sig_file" + + #if ! minisign -Vm "$file" -P ""; then + #echo "SIGNATURE VERIFICATION FAILED!" + #exit 1 + #fi + + echo "CRITICAL WARNING: Signature verification is not implemented yet!" +} + +# ======================================= UTIL =============================================================== +download_and_extract() { + # local tag="${1:-}" + + # if [[ -z "$tag" ]]; then + # echo "No tag provided, fetching latest release..." + # tag=$(curl -s https://api.github.com/repos/${REPO}/releases/latest | grep -Po '"tag_name": "\K.*?(?=")') + # fi + + # local asset="${MINER_ASSET_NAME}-${PLATFORM}-${ARCH}.tar.gz" + # local url="https://github.com/${REPO}/releases/download/${tag}/${asset}" + + verify_release + + TMP_DIR=$(mktemp -d) + trap "rm -rf \"$TMP_DIR\"" EXIT + + # echo "Downloading latest release: $tag..." + # curl -L "$url" -o "$TMP_DIR/release.tar.gz" + # tar -xf "$TMP_DIR/release.tar.gz" -C "$TMP_DIR" + + echo "Using local binaries from current directory..." + + # Files are in the current directory + MINER_BIN_X="/home/ronnie/Cyborg/Cyborg-miner/target/release/cyborg-miner" + AGENT_BIN_X="/home/ronnie/Cyborg/Cyborg-miner/cyborg-agent" + SETUP_SCRIPT="/home/ronnie/Cyborg/Cyborg-miner/setup.sh" + + if [[ -z "$MINER_BIN_X" || -z "$AGENT_BIN_X" || -z "$SETUP_SCRIPT" ]]; then + echo "Required files not found." + exit 1 + fi + + chmod +x "$MINER_BIN_X" "$AGENT_BIN_X" "$SETUP_SCRIPT" +} + +prepare_environment() { + echo "Preparing file system structure..." + + for dir in \ + "$AGENT_DIR" \ + "$MINER_DIR" \ + "$SCRIPT_DIR" \ + "$MINER_TASK_DIR" \ + "$MINER_CONFIG_DIR" \ + "$MINER_LOG_DIR" \ + "$MINER_TMP_DIR" \ + "/var/log/cyborg/agent" \ + "/var/lib/cyborg" \ + "/var/log/cyborg" \ + "/etc/cyborg" + do + if [[ ! -d "$dir" ]]; then + echo "Creating directory: $dir" + mkdir -p "$dir" + fi + done + + echo "Setting ownership and permissions..." + chown -R root:root /var/lib/cyborg /var/log/cyborg /etc/cyborg + chmod -R 755 /var/lib/cyborg /var/log/cyborg /etc/cyborg +} + +setup_docker() { + if ! command -v docker &> /dev/null; then + echo "[!] Docker not found. Installing Docker..." + apt-get update + apt-get install -y apt-transport-https ca-certificates curl gnupg lsb-release + + curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg + + echo \ + "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] \ + https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | \ + tee /etc/apt/sources.list.d/docker.list > /dev/null + + apt-get update + apt-get install -y docker-ce docker-ce-cli containerd.io + echo "Docker installed successfully." + else + echo "Docker is already installed." + fi + + echo "Docker setup complete." +} + +prepare_triton() { + echo "[*] Triton model repository directory: $MINER_TASK_DIR" + + if [ ! -d "$MINER_TASK_DIR" ]; then + echo "[!] Triton Model repository folder '$MINER_TASK_DIR' does not exist. Creating it..." + mkdir -p "$MINER_TASK_DIR" + echo "[✓] Created empty model directory." + fi + + if ! command -v docker &> /dev/null; then + echo "[!] Docker is not installed. Installing Docker..." + apt-get update + apt-get install -y apt-transport-https ca-certificates curl gnupg lsb-release + + curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg + + echo \ + "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] \ + https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | \ + tee /etc/apt/sources.list.d/docker.list > /dev/null + + apt-get update + apt-get install -y docker-ce docker-ce-cli containerd.io + echo "[✓] Docker installed." + else + echo "[✓] Docker is already installed." + fi + + TRITON_IMAGE="nvcr.io/nvidia/tritonserver:25.06-py3" + TRITON_CONTAINER_NAME="triton_server" + + if docker ps -a --format '{{.Names}}' | grep -q "^$TRITON_CONTAINER_NAME\$"; then + if docker inspect -f '{{.State.Running}}' "$TRITON_CONTAINER_NAME" | grep -q "true"; then + echo "[✓] Triton container '$TRITON_CONTAINER_NAME' is already running." + else + echo "[~] Triton container exists but is not running. Restarting..." + docker start "$TRITON_CONTAINER_NAME" + fi + else + echo "[*] Pulling Triton server image..." + docker pull "$TRITON_IMAGE" + + echo "[🚀] Starting Triton server..." + docker run -d --name "$TRITON_CONTAINER_NAME" --restart unless-stopped \ + -p8000:8000 -p8001:8001 -p8002:8002 \ + -v "$MINER_TASK_DIR":/models \ + "$TRITON_IMAGE" \ + tritonserver --model-repository=/models --model-control-mode=explicit + fi +} + +setup_systemd() { + local PARACHAIN_URL="$1" + local ACCOUNT_SEED="$2" + local MINER_TYPE="$3" + local MINER_UUID="$4" + + echo "Creating systemd service for worker node: $MINER_SERVICE_FILE" + + bash -c "cat > $MINER_SERVICE_FILE" << EOL + [Unit] + Description=Service running the cyborg-miner. + After=network.target + Requires=docker.service + + [Service] + Type=simple + User=root + SupplementaryGroups=docker + Environment=PARACHAIN_URL=$PARACHAIN_URL + Environment="ACCOUNT_SEED=\"$ACCOUNT_SEED\"" + Environment=LOG_FILE_PATH=$MINER_LOG_DIR/miner.log + Environment=TASK_FILE_NAME=model.onnx + Environment=TASK_DIR_PATH=$MINER_TASK_DIR + Environment=IDENTITY_FILE_PATH=$MINER_CONFIG_DIR/miner_identity.json + Environment=TASK_OWNER_FILE_PATH=$MINER_CONFIG_DIR/task_owner.json + Environment=CURRENT_TASK_PATH=$MINER_CONFIG_DIR/current_task.json + Environment=MINER_TMP_DIR=$MINER_TMP_DIR + Environment=TAILSCALE_NET=$TAILSCALE_NET + Environment=FLASH_INFER_PORT=$FLASH_INFER_PORT + Environment=TX_QUEUE_DB_PATH=$TX_QUEUE_DB_PATH + Environment=MINER_TYPE=$MINER_TYPE + Environment=MINER_UUID=$MINER_UUID + ExecStart=$MINER_BINARY_PATH start-miner --parachain-url \$PARACHAIN_URL --account-seed "\$ACCOUNT_SEED" --miner-type $MINER_TYPE --miner-uuid \$MINER_UUID + Restart=always + SuccessExitStatus=75 + RestartSec=3 + + [Install] + WantedBy=multi-user.target +EOL + + echo "systemd service for $MINER_FILE_NAME created successfully!" + + echo "Creating systemd service for agent: $AGENT_SERVICE_FILE" + + bash -c "cat > $AGENT_SERVICE_FILE" << EOL + [Unit] + Description=Agent that is able to check the health of the miner, provide required info to the cyborg-parachain, and stream usage metrics and logs of the cyborg node. + After=network.target + + [Service] + User=root + Group=root + SupplementaryGroups=docker + Environment=LOG_FILE_PATH=$MINER_LOG_DIR/miner.log + Environment=TASK_OWNER_FILE_PATH=$MINER_CONFIG_DIR/task_owner.json + Environment=IDENTITY_FILE_PATH=$MINER_CONFIG_DIR/miner_identity.json + ExecStart=$AGENT_BINARY_PATH run + Restart=always + RestartSec=3 + + [Install] + WantedBy=multi-user.target +EOL + + echo "systemd service for $AGENT_FILE_NAME created successfully!" + + echo "Reloading systemd, enabling and starting $MINER_FILE_NAME and $AGENT_FILE_NAME services..." + + systemctl daemon-reexec + systemctl daemon-reload + systemctl enable "$MINER_FILE_NAME" + systemctl enable "$AGENT_FILE_NAME" + systemctl restart "$MINER_FILE_NAME" + systemctl restart "$AGENT_FILE_NAME" + + systemctl status "$MINER_FILE_NAME" --no-pager + systemctl status "$AGENT_FILE_NAME" --no-pager + + echo "Cyborg Miner and Agent are installed and running. Binaries are located at $MINER_BINARY_PATH and $AGENT_BINARY_PATH. Now attempting to open Port $AGENT_HTTP_PORT, $AGENT_WS_PORT and $MINER_INFERENCE_PORT to enable communication with Cyborg Connect and provide an inference endpoint." +} + +move_files() { + + echo "Moving the setup script to $SCRIPT_DIR..." + + +} + +open_firewall() { + if command -v ufw &> /dev/null; then + FIREWALL="ufw" + elif command -v firewall-cmd &> /dev/null; then + FIREWALL="firewalld" + elif command -v iptables &> /dev/null; then + FIREWALL="iptables" + else + echo "Firewall management tool not detected. Please open $AGENT_HTTP_PORT, $AGENT_WS_PORT and $MINER_INFERENCE_PORT manually for the miner to work." + echo "If in doubt, refer to the documentation of your firewall management tool for instructions." + fi + + open_ports_ufw() { + ufw allow $AGENT_WS_PORT + ufw allow $AGENT_HTTP_PORT + ufw allow $MINER_INFERENCE_PORT + echo "Ports opened in UFW." + } + + # Function to open ports with firewalld + open_ports_firewalld() { + firewall-cmd --permanent --add-port=$AGENT_HTTP_PORT/tcp + firewall-cmd --permanent --add-port=$AGENT_WS_PORT/tcp + firewall-cmd --permanent --add-port=$MINER_INFERENCE_PORT/tcp + firewall-cmd --reload + echo "Ports opened in firewalld." + } + + # Function to open ports with iptables + open_ports_iptables() { + iptables -A INPUT -p tcp --dport $AGENT_HTTP_PORT -j ACCEPT + iptables -A INPUT -p tcp --dport $AGENT_WS_PORT -j ACCEPT + iptables -A INPUT -p tcp --dport $MINER_INFERENCE_PORT -j ACCEPT + # Note: Rules added with iptables are not persistent across reboots unless saved. + echo "Ports opened in iptables." + } + + if [[ -n "${FIREWALL:-}" ]]; then + case $FIREWALL in + "ufw") + open_ports_ufw + ;; + "firewalld") + open_ports_firewalld + ;; + "iptables") + open_ports_iptables + ;; + esac + fi +} + +install() { + PARACHAIN_URL="ws://127.0.0.1:9988" + ACCOUNT_SEED="//Dave" + MINER_TYPE="cloud" + MINER_UUID="CD-9c9bfca6-9ee8-1bff-1fc0-a2952ee26f41" + + + if [[ -z "$PARACHAIN_URL" || -z "$ACCOUNT_SEED" || -z "$MINER_TYPE" || -z "$MINER_UUID" ]]; then + echo "ERROR: PARACHAIN_URL and ACCOUNT_SEED must be set in environment." + exit 1 + fi + + download_and_extract + prepare_environment + setup_docker + move_files + setup_systemd "$PARACHAIN_URL" "$ACCOUNT_SEED" "$MINER_TYPE" "$MINER_UUID" + open_firewall + #prepare_triton +} + + + +update() { + local current_version="$1" + local latest_tag="$2" + + ################### WARNING this is not safe, we need proper key management ########################## + echo "Reading current configuration from systemd service file..." + SERVICE_FILE="/etc/systemd/system/cyborg-miner.service" + + if [[ ! -f "$SERVICE_FILE" ]]; then + echo "Service file not found: $SERVICE_FILE" + echo "Cannot extract PARACHAIN_URL or ACCOUNT_SEED." + exit 1 + fi + + PARACHAIN_URL=$(systemctl show cyborg-miner.service -p Environment | grep -o 'PARACHAIN_URL=[^ ]*' | cut -d= -f2) + ACCOUNT_SEED=$(systemctl show cyborg-miner.service -p Environment | grep -o 'ACCOUNT_SEED=[^ ]*' | cut -d= -f2) + MINER_TYPE=$(systemctl show cyborg-miner.service -p Environment | grep -o 'MINER_TYPE=[^ ]*' | cut -d= -f2) + + if [[ -z "$PARACHAIN_URL" || -z "$ACCOUNT_SEED" || -z "$MINER_TYPE" ]]; then + echo "Failed to extract required variables from $SERVICE_FILE" + exit 1 + fi + + echo "PARACHAIN_URL: $PARACHAIN_URL" + echo "ACCOUNT_SEED: $ACCOUNT_SEED" + echo "MINER_TYPE: $MINER_TYPE" + + ############################################################################################################### + + echo "Updating from $current_version to $latest_tag..." + download_and_extract "$latest_tag" + prepare_environment + setup_docker + + # Avoid race condition + systemctl stop cyborg-miner.service + systemctl stop cyborg-agent.service + + move_files + setup_systemd "$PARACHAIN_URL" "$ACCOUNT_SEED" "$MINER_TYPE" + open_firewall + + echo "Update complete: $CURRENT_VERSION to $latest_tag" +} + +check_update() { + # We again direct some output to stderr, as to not pollute stdout for update + local current_version="$1" + + echo "Fetching latest release tag..." >&2 + + local latest_tag + latest_tag=$(curl -s https://api.github.com/repos/${REPO}/releases/latest | grep -Po '"tag_name": "\K.*?(?=")') + + echo "Current version: $current_version" >&2 + echo "Latest version available: $latest_tag" >&2 + + if [[ "$latest_tag" == "v$current_version" || "$latest_tag" == "$current_version" ]]; then + echo "Already up-to-date." >&2 + exit 0 + fi + + # As we can see, either stdout should be empty or contain the latest tag + echo "$latest_tag" +} + +# ======================================== DISPATCH ================================================== + +case "${1:-install}" in + install) + install + ;; + update) + CURRENT_VERSION="${2:-unknown}" + LATEST_TAG="${3:-unknown}" + update "$CURRENT_VERSION" "$LATEST_TAG" + ;; + check-update) + CURRENT_VERSION="${2:-unknown}" + check_update "$CURRENT_VERSION" + ;; + *) + echo "Usage: $0 {install|update current_version latest_tag|check-update current_version}" + exit 1 + ;; +esac From 63facfeea531d62882433b29d8733253ee093cbf Mon Sep 17 00:00:00 2001 From: ronnie-ahmed Date: Fri, 21 Nov 2025 21:35:20 +0600 Subject: [PATCH 3/4] resolving conflicts --- setup.sh | 465 ------------------------------------------------------- 1 file changed, 465 deletions(-) delete mode 100755 setup.sh diff --git a/setup.sh b/setup.sh deleted file mode 100755 index 0cf7963..0000000 --- a/setup.sh +++ /dev/null @@ -1,465 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -if [[ $EUID -ne 0 ]]; then - echo "This script must be run as root" - exit 1 -fi - -# =================================== SHARED CONFIG ========================================== -REPO="Cyborg-Network/Cyborg-miner" -MINER_ASSET_NAME="cyborg-miner" -PLATFORM="linux" - -case "$(uname -m)" in - x86_64) - ARCH="x86_64" - ;; - aarch64 | arm64) - ARCH="aarch64" - ;; - *) - echo "Unsupported architecture: $(uname -m)" - exit 1 - ;; -esac - -# We direct some output to stderr, as to not pollute stdout for check-update -echo "Detected architecture: ${ARCH}" >&2 - -# File names as they appear after installation -MINER_FILE_NAME="cyborg-miner" -AGENT_FILE_NAME="cyborg-agent" -SETUP_SCRIPT_FILE_NAME="setup.sh" - -# Paths for the files -AGENT_DIR="/home/ronnie/Cyborg/Cyborg-miner" -MINER_DIR="/home/ronnie/Cyborg/Cyborg-miner/target/release" -SCRIPT_DIR="/home/ronnie/Cyborg/Cyborg-miner" -TX_QUEUE_DB_PATH="/var/lib/cyborg/tx_queue_db" - -# Full paths -MINER_BINARY_PATH="$MINER_DIR/$MINER_FILE_NAME" -AGENT_BINARY_PATH="$AGENT_DIR/$AGENT_FILE_NAME" -SETUP_SCRIPT_PATH="$SCRIPT_DIR/$SETUP_SCRIPT_FILE_NAME" - -# Ports to be opened at the end of the script -MINER_INFERENCE_PORT=3000 -AGENT_HTTP_PORT=8080 -AGENT_WS_PORT=8081 -FLASH_INFER_PORT=3005 - -# Service files -MINER_SERVICE_FILE="/etc/systemd/system/$MINER_FILE_NAME.service" -AGENT_SERVICE_FILE="/etc/systemd/system/$AGENT_FILE_NAME.service" - -# ENV variables for the miner -MINER_TASK_DIR="/var/lib/cyborg/miner/task" -MINER_CONFIG_DIR="/etc/cyborg/miner" -MINER_TMP_DIR="/var/lib/cyborg/miner/tmp" -MINER_LOG_DIR="/var/log/cyborg/miner" - -# The tailscale network (only for testnet) on which the miner will be reachable -TAILSCALE_NET="tail78ea2b.ts.net" - -verify_release() { - #local file="$1" - #local sig_file="${file}.sig" - - #curl -L "${URL}.sig" -o "$sig_file" - - #if ! minisign -Vm "$file" -P ""; then - #echo "SIGNATURE VERIFICATION FAILED!" - #exit 1 - #fi - - echo "CRITICAL WARNING: Signature verification is not implemented yet!" -} - -# ======================================= UTIL =============================================================== -download_and_extract() { - # local tag="${1:-}" - - # if [[ -z "$tag" ]]; then - # echo "No tag provided, fetching latest release..." - # tag=$(curl -s https://api.github.com/repos/${REPO}/releases/latest | grep -Po '"tag_name": "\K.*?(?=")') - # fi - - # local asset="${MINER_ASSET_NAME}-${PLATFORM}-${ARCH}.tar.gz" - # local url="https://github.com/${REPO}/releases/download/${tag}/${asset}" - - verify_release - - TMP_DIR=$(mktemp -d) - trap "rm -rf \"$TMP_DIR\"" EXIT - - # echo "Downloading latest release: $tag..." - # curl -L "$url" -o "$TMP_DIR/release.tar.gz" - # tar -xf "$TMP_DIR/release.tar.gz" -C "$TMP_DIR" - - echo "Using local binaries from current directory..." - - # Files are in the current directory - MINER_BIN_X="/home/ronnie/Cyborg/Cyborg-miner/target/release/cyborg-miner" - AGENT_BIN_X="/home/ronnie/Cyborg/Cyborg-miner/cyborg-agent" - SETUP_SCRIPT="/home/ronnie/Cyborg/Cyborg-miner/setup.sh" - - if [[ -z "$MINER_BIN_X" || -z "$AGENT_BIN_X" || -z "$SETUP_SCRIPT" ]]; then - echo "Required files not found." - exit 1 - fi - - chmod +x "$MINER_BIN_X" "$AGENT_BIN_X" "$SETUP_SCRIPT" -} - -prepare_environment() { - echo "Preparing file system structure..." - - for dir in \ - "$AGENT_DIR" \ - "$MINER_DIR" \ - "$SCRIPT_DIR" \ - "$MINER_TASK_DIR" \ - "$MINER_CONFIG_DIR" \ - "$MINER_LOG_DIR" \ - "$MINER_TMP_DIR" \ - "/var/log/cyborg/agent" \ - "/var/lib/cyborg" \ - "/var/log/cyborg" \ - "/etc/cyborg" - do - if [[ ! -d "$dir" ]]; then - echo "Creating directory: $dir" - mkdir -p "$dir" - fi - done - - echo "Setting ownership and permissions..." - chown -R root:root /var/lib/cyborg /var/log/cyborg /etc/cyborg - chmod -R 755 /var/lib/cyborg /var/log/cyborg /etc/cyborg -} - -setup_docker() { - if ! command -v docker &> /dev/null; then - echo "[!] Docker not found. Installing Docker..." - apt-get update - apt-get install -y apt-transport-https ca-certificates curl gnupg lsb-release - - curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg - - echo \ - "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] \ - https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | \ - tee /etc/apt/sources.list.d/docker.list > /dev/null - - apt-get update - apt-get install -y docker-ce docker-ce-cli containerd.io - echo "Docker installed successfully." - else - echo "Docker is already installed." - fi - - echo "Docker setup complete." -} - -prepare_triton() { - echo "[*] Triton model repository directory: $MINER_TASK_DIR" - - if [ ! -d "$MINER_TASK_DIR" ]; then - echo "[!] Triton Model repository folder '$MINER_TASK_DIR' does not exist. Creating it..." - mkdir -p "$MINER_TASK_DIR" - echo "[✓] Created empty model directory." - fi - - if ! command -v docker &> /dev/null; then - echo "[!] Docker is not installed. Installing Docker..." - apt-get update - apt-get install -y apt-transport-https ca-certificates curl gnupg lsb-release - - curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg - - echo \ - "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] \ - https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | \ - tee /etc/apt/sources.list.d/docker.list > /dev/null - - apt-get update - apt-get install -y docker-ce docker-ce-cli containerd.io - echo "[✓] Docker installed." - else - echo "[✓] Docker is already installed." - fi - - TRITON_IMAGE="nvcr.io/nvidia/tritonserver:25.06-py3" - TRITON_CONTAINER_NAME="triton_server" - - if docker ps -a --format '{{.Names}}' | grep -q "^$TRITON_CONTAINER_NAME\$"; then - if docker inspect -f '{{.State.Running}}' "$TRITON_CONTAINER_NAME" | grep -q "true"; then - echo "[✓] Triton container '$TRITON_CONTAINER_NAME' is already running." - else - echo "[~] Triton container exists but is not running. Restarting..." - docker start "$TRITON_CONTAINER_NAME" - fi - else - echo "[*] Pulling Triton server image..." - docker pull "$TRITON_IMAGE" - - echo "[🚀] Starting Triton server..." - docker run -d --name "$TRITON_CONTAINER_NAME" --restart unless-stopped \ - -p8000:8000 -p8001:8001 -p8002:8002 \ - -v "$MINER_TASK_DIR":/models \ - "$TRITON_IMAGE" \ - tritonserver --model-repository=/models --model-control-mode=explicit - fi -} - -setup_systemd() { - local PARACHAIN_URL="$1" - local ACCOUNT_SEED="$2" - local MINER_TYPE="$3" - local MINER_UUID="$4" - - echo "Creating systemd service for worker node: $MINER_SERVICE_FILE" - - bash -c "cat > $MINER_SERVICE_FILE" << EOL - [Unit] - Description=Service running the cyborg-miner. - After=network.target - Requires=docker.service - - [Service] - Type=simple - User=root - SupplementaryGroups=docker - Environment=PARACHAIN_URL=$PARACHAIN_URL - Environment="ACCOUNT_SEED=\"$ACCOUNT_SEED\"" - Environment=LOG_FILE_PATH=$MINER_LOG_DIR/miner.log - Environment=TASK_FILE_NAME=model.onnx - Environment=TASK_DIR_PATH=$MINER_TASK_DIR - Environment=IDENTITY_FILE_PATH=$MINER_CONFIG_DIR/miner_identity.json - Environment=TASK_OWNER_FILE_PATH=$MINER_CONFIG_DIR/task_owner.json - Environment=CURRENT_TASK_PATH=$MINER_CONFIG_DIR/current_task.json - Environment=MINER_TMP_DIR=$MINER_TMP_DIR - Environment=TAILSCALE_NET=$TAILSCALE_NET - Environment=FLASH_INFER_PORT=$FLASH_INFER_PORT - Environment=TX_QUEUE_DB_PATH=$TX_QUEUE_DB_PATH - Environment=MINER_TYPE=$MINER_TYPE - Environment=MINER_UUID=$MINER_UUID - ExecStart=$MINER_BINARY_PATH start-miner --parachain-url \$PARACHAIN_URL --account-seed "\$ACCOUNT_SEED" --miner-type $MINER_TYPE --miner-uuid \$MINER_UUID - Restart=always - SuccessExitStatus=75 - RestartSec=3 - - [Install] - WantedBy=multi-user.target -EOL - - echo "systemd service for $MINER_FILE_NAME created successfully!" - - echo "Creating systemd service for agent: $AGENT_SERVICE_FILE" - - bash -c "cat > $AGENT_SERVICE_FILE" << EOL - [Unit] - Description=Agent that is able to check the health of the miner, provide required info to the cyborg-parachain, and stream usage metrics and logs of the cyborg node. - After=network.target - - [Service] - User=root - Group=root - SupplementaryGroups=docker - Environment=LOG_FILE_PATH=$MINER_LOG_DIR/miner.log - Environment=TASK_OWNER_FILE_PATH=$MINER_CONFIG_DIR/task_owner.json - Environment=IDENTITY_FILE_PATH=$MINER_CONFIG_DIR/miner_identity.json - ExecStart=$AGENT_BINARY_PATH run - Restart=always - RestartSec=3 - - [Install] - WantedBy=multi-user.target -EOL - - echo "systemd service for $AGENT_FILE_NAME created successfully!" - - echo "Reloading systemd, enabling and starting $MINER_FILE_NAME and $AGENT_FILE_NAME services..." - - systemctl daemon-reexec - systemctl daemon-reload - systemctl enable "$MINER_FILE_NAME" - systemctl enable "$AGENT_FILE_NAME" - systemctl restart "$MINER_FILE_NAME" - systemctl restart "$AGENT_FILE_NAME" - - systemctl status "$MINER_FILE_NAME" --no-pager - systemctl status "$AGENT_FILE_NAME" --no-pager - - echo "Cyborg Miner and Agent are installed and running. Binaries are located at $MINER_BINARY_PATH and $AGENT_BINARY_PATH. Now attempting to open Port $AGENT_HTTP_PORT, $AGENT_WS_PORT and $MINER_INFERENCE_PORT to enable communication with Cyborg Connect and provide an inference endpoint." -} - -move_files() { - - echo "Moving the setup script to $SCRIPT_DIR..." - - -} - -open_firewall() { - if command -v ufw &> /dev/null; then - FIREWALL="ufw" - elif command -v firewall-cmd &> /dev/null; then - FIREWALL="firewalld" - elif command -v iptables &> /dev/null; then - FIREWALL="iptables" - else - echo "Firewall management tool not detected. Please open $AGENT_HTTP_PORT, $AGENT_WS_PORT and $MINER_INFERENCE_PORT manually for the miner to work." - echo "If in doubt, refer to the documentation of your firewall management tool for instructions." - fi - - open_ports_ufw() { - ufw allow $AGENT_WS_PORT - ufw allow $AGENT_HTTP_PORT - ufw allow $MINER_INFERENCE_PORT - echo "Ports opened in UFW." - } - - # Function to open ports with firewalld - open_ports_firewalld() { - firewall-cmd --permanent --add-port=$AGENT_HTTP_PORT/tcp - firewall-cmd --permanent --add-port=$AGENT_WS_PORT/tcp - firewall-cmd --permanent --add-port=$MINER_INFERENCE_PORT/tcp - firewall-cmd --reload - echo "Ports opened in firewalld." - } - - # Function to open ports with iptables - open_ports_iptables() { - iptables -A INPUT -p tcp --dport $AGENT_HTTP_PORT -j ACCEPT - iptables -A INPUT -p tcp --dport $AGENT_WS_PORT -j ACCEPT - iptables -A INPUT -p tcp --dport $MINER_INFERENCE_PORT -j ACCEPT - # Note: Rules added with iptables are not persistent across reboots unless saved. - echo "Ports opened in iptables." - } - - if [[ -n "${FIREWALL:-}" ]]; then - case $FIREWALL in - "ufw") - open_ports_ufw - ;; - "firewalld") - open_ports_firewalld - ;; - "iptables") - open_ports_iptables - ;; - esac - fi -} - -install() { - PARACHAIN_URL="ws://127.0.0.1:9988" - ACCOUNT_SEED="//Dave" - MINER_TYPE="cloud" - MINER_UUID="CD-9c9bfca6-9ee8-1bff-1fc0-a2952ee26f41" - - - if [[ -z "$PARACHAIN_URL" || -z "$ACCOUNT_SEED" || -z "$MINER_TYPE" || -z "$MINER_UUID" ]]; then - echo "ERROR: PARACHAIN_URL and ACCOUNT_SEED must be set in environment." - exit 1 - fi - - download_and_extract - prepare_environment - setup_docker - move_files - setup_systemd "$PARACHAIN_URL" "$ACCOUNT_SEED" "$MINER_TYPE" "$MINER_UUID" - open_firewall - #prepare_triton -} - - - -update() { - local current_version="$1" - local latest_tag="$2" - - ################### WARNING this is not safe, we need proper key management ########################## - echo "Reading current configuration from systemd service file..." - SERVICE_FILE="/etc/systemd/system/cyborg-miner.service" - - if [[ ! -f "$SERVICE_FILE" ]]; then - echo "Service file not found: $SERVICE_FILE" - echo "Cannot extract PARACHAIN_URL or ACCOUNT_SEED." - exit 1 - fi - - PARACHAIN_URL=$(systemctl show cyborg-miner.service -p Environment | grep -o 'PARACHAIN_URL=[^ ]*' | cut -d= -f2) - ACCOUNT_SEED=$(systemctl show cyborg-miner.service -p Environment | grep -o 'ACCOUNT_SEED=[^ ]*' | cut -d= -f2) - MINER_TYPE=$(systemctl show cyborg-miner.service -p Environment | grep -o 'MINER_TYPE=[^ ]*' | cut -d= -f2) - - if [[ -z "$PARACHAIN_URL" || -z "$ACCOUNT_SEED" || -z "$MINER_TYPE" ]]; then - echo "Failed to extract required variables from $SERVICE_FILE" - exit 1 - fi - - echo "PARACHAIN_URL: $PARACHAIN_URL" - echo "ACCOUNT_SEED: $ACCOUNT_SEED" - echo "MINER_TYPE: $MINER_TYPE" - - ############################################################################################################### - - echo "Updating from $current_version to $latest_tag..." - download_and_extract "$latest_tag" - prepare_environment - setup_docker - - # Avoid race condition - systemctl stop cyborg-miner.service - systemctl stop cyborg-agent.service - - move_files - setup_systemd "$PARACHAIN_URL" "$ACCOUNT_SEED" "$MINER_TYPE" - open_firewall - - echo "Update complete: $CURRENT_VERSION to $latest_tag" -} - -check_update() { - # We again direct some output to stderr, as to not pollute stdout for update - local current_version="$1" - - echo "Fetching latest release tag..." >&2 - - local latest_tag - latest_tag=$(curl -s https://api.github.com/repos/${REPO}/releases/latest | grep -Po '"tag_name": "\K.*?(?=")') - - echo "Current version: $current_version" >&2 - echo "Latest version available: $latest_tag" >&2 - - if [[ "$latest_tag" == "v$current_version" || "$latest_tag" == "$current_version" ]]; then - echo "Already up-to-date." >&2 - exit 0 - fi - - # As we can see, either stdout should be empty or contain the latest tag - echo "$latest_tag" -} - -# ======================================== DISPATCH ================================================== - -case "${1:-install}" in - install) - install - ;; - update) - CURRENT_VERSION="${2:-unknown}" - LATEST_TAG="${3:-unknown}" - update "$CURRENT_VERSION" "$LATEST_TAG" - ;; - check-update) - CURRENT_VERSION="${2:-unknown}" - check_update "$CURRENT_VERSION" - ;; - *) - echo "Usage: $0 {install|update current_version latest_tag|check-update current_version}" - exit 1 - ;; -esac From f9812afcffa1dd4ac0d452d715052912dec1abf7 Mon Sep 17 00:00:00 2001 From: ronnie-ahmed Date: Tue, 25 Nov 2025 00:31:46 +0600 Subject: [PATCH 4/4] Modified persistent Tx --- miner/src/main.rs | 2 + miner/src/utils/task_handling.rs | 9 +- miner/src/utils/tx_builder.rs | 10 +-- miner/src/utils/tx_queue.rs | 137 +++++++++++++++++++++---------- 4 files changed, 106 insertions(+), 52 deletions(-) diff --git a/miner/src/main.rs b/miner/src/main.rs index 5fc86d7..aa359f5 100644 --- a/miner/src/main.rs +++ b/miner/src/main.rs @@ -69,6 +69,8 @@ async fn main() -> Result<()> { // Start background processor _queue.start_processing(); + + let miner_id_bytes = miner_uuid.as_bytes().to_vec(); let miner_uuid_bounded: MinerId = BoundedVec(miner_id_bytes); diff --git a/miner/src/utils/task_handling.rs b/miner/src/utils/task_handling.rs index 456e05c..b377670 100644 --- a/miner/src/utils/task_handling.rs +++ b/miner/src/utils/task_handling.rs @@ -335,14 +335,13 @@ pub async fn clean_up_current_task_and_vacate(miner: Arc) -> Result<()> { let tx_queue = global_config::get_tx_queue()?; let rx = tx_queue - .enqueue(move || { + .enqueue("vacate_miner", move || { let keypair = Arc::clone(&keypair); let miner_type = Arc::clone(&miner_type); - async move { - let _ = - confirm_miner_vacation(keypair, current_task_id, miner_type).await?; + Box::pin(async move { + let _ = confirm_miner_vacation(keypair, current_task_id, miner_type).await?; Ok(TxOutput::Success) - } + }) }) .await?; diff --git a/miner/src/utils/tx_builder.rs b/miner/src/utils/tx_builder.rs index ecf4016..0e0d27d 100644 --- a/miner/src/utils/tx_builder.rs +++ b/miner/src/utils/tx_builder.rs @@ -110,7 +110,7 @@ pub async fn pub_register( let tx_queue = global_config::get_tx_queue()?; let rx = tx_queue - .enqueue(move || { + .enqueue("register_miner", move || { let keypair = Arc::clone(&keypair); let miner_type = miner_type.clone(); let value = miner_uuid.clone(); @@ -238,13 +238,13 @@ pub async fn pub_confirm_task_reception( let tx_queue = global_config::get_tx_queue()?; let current_task_id_copy = *current_task_id; - let rx = tx_queue - .enqueue(move || { + let rx = tx_queue + .enqueue("confirm_task_reception", move || { let keypair = Arc::clone(&keypair); - async move { + Box::pin(async move { let _ = confirm_task_reception(keypair, ¤t_task_id_copy).await?; Ok(TxOutput::Success) - } + }) }) .await?; diff --git a/miner/src/utils/tx_queue.rs b/miner/src/utils/tx_queue.rs index 1367c27..e60aba3 100644 --- a/miner/src/utils/tx_queue.rs +++ b/miner/src/utils/tx_queue.rs @@ -2,21 +2,19 @@ use crate::error::Result; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use sled; - -use crate::global_config::TX_QUEUE_DB_PATH; -use crate::types::MinerIdentity; use std::{ - collections::VecDeque, + collections::{HashMap, VecDeque}, future::Future, pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::{Arc, Mutex as StdMutex}, }; +use std::sync::atomic::Ordering; use tokio::sync::{oneshot, Mutex}; use tokio::time::{sleep, Duration}; +use crate::global_config::TX_QUEUE_DB_PATH; +use crate::types::MinerIdentity; + const MAX_RETRIES: u32 = 500; const EMPTY_SLEEP_MS: u64 = 250; // when queue empty, sleep briefly and continue @@ -26,18 +24,19 @@ pub enum TxOutput { Success, } - pub type TxExecutor = Box Pin> + Send>> + Send + Sync>; #[derive(Serialize, Deserialize, Clone)] pub struct PersistentTx { pub id: u64, + pub task_key: String, + pub payload: Option>, pub retry_count: u32, pub timestamp: u64, - // pub kind: TxKind, } +/// In-memory Transaction pub struct Transaction { pub id: u64, pub executor: TxExecutor, @@ -59,16 +58,24 @@ impl Transaction { } } + +type TxHandler = Arc< + dyn Fn(PersistentTx) -> Pin> + Send>> + + Send + + Sync, +>; + pub struct TransactionQueue { inner: Arc>>, - processing: Arc, + processing: Arc, db: sled::Db, + registry: Arc>>, } pub static TRANSACTION_QUEUE: OnceCell> = OnceCell::new(); impl TransactionQueue { - /// Create/open sled DB and initialize queue. + /// Open sled DB and initialize queue + registry pub async fn new() -> Self { let db_path = TX_QUEUE_DB_PATH.as_str().to_string(); let db = tokio::task::spawn_blocking(move || sled::open(db_path)) @@ -77,7 +84,7 @@ impl TransactionQueue { .expect("Failed to open sled DB"); let queue = Arc::new(Mutex::new(VecDeque::new())); - let mut _persisted_count = 0usize; + let mut persisted_count = 0usize; { let db_clone = db.clone(); @@ -105,14 +112,31 @@ impl TransactionQueue { Self { inner: queue, - processing: Arc::new(AtomicBool::new(false)), + processing: Arc::new(std::sync::atomic::AtomicBool::new(false)), db, + registry: Arc::new(StdMutex::new(HashMap::new())), } } - + + + pub fn register_handler(&self, task_key: &str, handler: F) + where + F: Fn(PersistentTx) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + let mut reg = self.registry.lock().expect("Registry mutex poisoned"); + reg.insert( + task_key.to_string(), + Arc::new(move |ptx: PersistentTx| Box::pin(handler(ptx))), + ); + } + + + /// Returns a oneshot receiver to await the tx result. pub async fn enqueue( self: &Arc, + task_key: &str, executor_fn: F, ) -> Result>> where @@ -133,12 +157,12 @@ impl TransactionQueue { let persistent_tx = PersistentTx { id: tx_id, + task_key: task_key.to_string(), + payload: None, retry_count: 0, timestamp: chrono::Utc::now().timestamp_millis() as u64, - }; - // persist then push to queue self.persist_tx(&persistent_tx).await; { @@ -146,8 +170,9 @@ impl TransactionQueue { q.push_back(tx_obj); println!( - "[TX-QUEUE] Enqueued tx #{}. Queue size: {}", + "[TX-QUEUE] Enqueued tx #{} (task_key={}). Queue size: {}", tx_id, + task_key, q.len() ); } @@ -155,20 +180,15 @@ impl TransactionQueue { Ok(rx) } - /// Start background processing. This method is idempotent. - /// Processor stays alive (loops forever) and polls the queue, sleeping when empty. pub fn start_processing(&self) { - // If already running, do nothing if self.processing.swap(true, Ordering::SeqCst) { return; } let inner = Arc::clone(&self.inner); - let processing_flag = Arc::clone(&self.processing); let db = self.db.clone(); tokio::spawn(async move { - // keep processor alive until process exits loop { let tx_opt = { let mut queue = inner.lock().await; @@ -187,7 +207,7 @@ impl TransactionQueue { Ok(result) => { println!("[TX-QUEUE] Tx #{} succeeded: {:?}", tx.id, result); - // delete persisted entry (run in blocking task) + // delete persisted entry let _ = tokio::task::spawn_blocking({ let db = db.clone(); let id = tx.id; @@ -206,16 +226,15 @@ impl TransactionQueue { println!("[TX-QUEUE] Tx #{} failed: {}", tx.id, e); // increment retry count and persist the updated retry count - let mut new_retry = tx.retry_count + 1; - // Save updated retry_count to DB + let new_retry = tx.retry_count + 1; let _ = tokio::task::spawn_blocking({ let db = db.clone(); let id = tx.id; - let retry = new_retry; + let retry_val = new_retry; move || { if let Ok(Some(v)) = db.get(id.to_be_bytes()) { if let Ok(mut ptx) = bincode::deserialize::(&v) { - ptx.retry_count = retry; + ptx.retry_count = retry_val; let _ = db.insert(id.to_be_bytes(), bincode::serialize(&ptx).unwrap()); } } @@ -257,25 +276,20 @@ impl TransactionQueue { } } None => { - // queue empty -> pause briefly then continue (processor stays running) - // set processing flag true (already true) + // queue empty -> sleep briefly then continue sleep(Duration::from_millis(EMPTY_SLEEP_MS)).await; continue; } } } - - }); } - /// Restore persisted transactions by using a build function that converts PersistentTx -> Transaction. - /// This function returns the number of restored txs. After restore, call `start_processing()` (it will be no-op if already running). - pub async fn restore_from_db(&self, mut build_fn: F) -> usize - where - F: FnMut(PersistentTx) -> Transaction, - { + /// Returns number of restored transactions. + pub async fn restore_from_db_using_registry(&self) -> usize { let mut restored_count = 0usize; + + // read persisted entries in a blocking task let entries: Vec = { let db = self.db.clone(); tokio::task::spawn_blocking(move || { @@ -295,8 +309,46 @@ impl TransactionQueue { { let mut q = self.inner.lock().await; + let registry_guard = self.registry.lock().expect("Registry mutex poisoned"); + for p in entries { - let tx_obj = build_fn(p.clone()); + let tx_obj = if let Some(handler) = registry_guard.get(&p.task_key) { + let handler_clone = Arc::clone(handler); + let p_clone = p.clone(); + let exec: TxExecutor = Box::new(move || { + let handler_clone = Arc::clone(&handler_clone); + let ptx = p_clone.clone(); + Box::pin(async move { (handler_clone)(ptx).await }) + }); + + Transaction { + id: p.id, + executor: exec, + responder: None, + retry_count: p.retry_count, + } + } else { + let missing_key = p.task_key.clone(); + let id_for_delete = p.id; + let exec: TxExecutor = Box::new(move || { + let missing_key_clone = missing_key.clone(); + + Box::pin(async move { + Err(crate::error::Error::Custom(format!( + "No handler registered for task_key '{}'", + missing_key_clone + ))) + }) + }); + + Transaction { + id: p.id, + executor: exec, + responder: None, + retry_count: p.retry_count, + } + }; + q.push_back(tx_obj); restored_count += 1; } @@ -305,11 +357,12 @@ impl TransactionQueue { restored_count } - /// write PersistentTx to sled async fn persist_tx(&self, tx: &PersistentTx) { let db = self.db.clone(); let key = tx.id.to_be_bytes(); - let value = bincode::serialize(tx).unwrap(); + let value = bincode::serialize(tx).unwrap_or_else(|e| { + panic!("Failed to serialize PersistentTx: {}", e); + }); let _ = tokio::task::spawn_blocking(move || db.insert(key, value)).await; } @@ -340,4 +393,4 @@ impl TransactionQueue { .await .unwrap() } -} \ No newline at end of file +}