Skip to content

Commit

Permalink
validator/pow: stoppable mining
Browse files Browse the repository at this point in the history
  • Loading branch information
aggstam authored and bobsummerwill committed Sep 29, 2023
1 parent 555854a commit 451f178
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 88 deletions.
11 changes: 7 additions & 4 deletions bin/darkfid2/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,13 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
darkfid.validator.write().await.purge_pending_txs().await?;

// Consensus protocol
let consensus_task = if args.consensus {
let (consensus_task, consensus_sender) = if args.consensus {
info!(target: "darkfid", "Starting consensus protocol task");
let (sender, recvr) = smol::channel::bounded(1);
let task = StoppableTask::new();
task.clone().start(
// Weird hack to prevent lifetimes hell
async move { miner_task(&darkfid).await },
async move { miner_task(&darkfid, &recvr).await },
|res| async {
match res {
Ok(()) | Err(Error::MinerTaskStopped) => { /* Do nothing */ }
Expand All @@ -256,10 +257,10 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
Error::MinerTaskStopped,
ex.clone(),
);
Some(task)
(Some(task), Some(sender))
} else {
info!(target: "darkfid", "Not participating in consensus");
None
(None, None)
};

// Signal handling for graceful termination.
Expand All @@ -278,6 +279,8 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
consensus_p2p.unwrap().stop().await;

info!(target: "darkfid", "Stopping consensus task...");
// Send signal to spawned miner threads to stop
consensus_sender.unwrap().send(()).await?;
consensus_task.unwrap().stop().await;
}

Expand Down
23 changes: 12 additions & 11 deletions bin/darkfid2/src/task/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@
*/

use darkfi::{
blockchain::BlockInfo,
system::sleep,
util::time::Timestamp,
validator::pow::{mine_block, PoWModule},
Result,
blockchain::BlockInfo, system::sleep, util::time::Timestamp, validator::pow::PoWModule, Result,
};
use log::info;
use smol::channel::Receiver;

use crate::{proto::BlockInfoMessage, Darkfid};

/// async task used for participating in the PoW consensus protocol
pub async fn miner_task(node: &Darkfid) -> Result<()> {
pub async fn miner_task(node: &Darkfid, stop_signal: &Receiver<()>) -> Result<()> {
// TODO: For now we asume we have a single miner that produces block,
// until the PoW consensus and proper validations have been added.
// The miner workflow would be:
Expand All @@ -52,6 +49,14 @@ pub async fn miner_task(node: &Darkfid) -> Result<()> {
// We sleep so our miner can grab their pickaxe
sleep(10).await;

// Start miner loop
miner_loop(node, stop_signal).await?;

Ok(())
}

/// Miner loop
async fn miner_loop(node: &Darkfid, stop_signal: &Receiver<()>) -> Result<()> {
// TODO: add miner threads arg
// Generate a PoW module
let mut module = PoWModule::new(node.validator.read().await.blockchain.clone(), None, Some(90));
Expand All @@ -73,7 +78,7 @@ pub async fn miner_task(node: &Darkfid) -> Result<()> {
next_block.header.previous = last.hash()?;
next_block.header.height = last.header.height + 1;
next_block.header.timestamp = Timestamp::current_time();
mine_block(module.clone(), &mut next_block);
module.mine_block(&mut next_block, stop_signal)?;

// Verify it
module.verify_block(&next_block)?;
Expand All @@ -90,9 +95,5 @@ pub async fn miner_task(node: &Darkfid) -> Result<()> {

// Update PoW module
module.append(timestamp, &difficulty);

// TODO: remove this once mining is not blocking
// Lazy way to enable stopping this task
sleep(10).await;
}
}
165 changes: 92 additions & 73 deletions src/validator/pow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ use darkfi_sdk::{
use log::info;
use num_bigint::BigUint;
use randomx::{RandomXCache, RandomXDataset, RandomXFlags, RandomXVM};
use smol::channel::Receiver;

use crate::{
blockchain::{BlockInfo, Blockchain},
util::{ringbuffer::RingBuffer, time::Timestamp},
Result,
Error, Result,
};

// TODO: replace asserts with error returns
Expand Down Expand Up @@ -232,6 +233,93 @@ impl PoWModule {
self.cummulative_difficulty += difficulty;
self.difficulties.push(self.cummulative_difficulty.clone());
}

/// Mine provided block, based on provided PoW module next mine target and difficulty
pub fn mine_block(
&self,
miner_block: &mut BlockInfo,
stop_signal: &Receiver<()>,
) -> Result<()> {
let miner_setup = Instant::now();

// Grab the next mine target
let target = self.next_mine_target();
info!(target: "validator::pow::mine_block", "[MINER] Mine target: 0x{:064x}", target);

// Get the PoW input. The key changes with every mined block.
let input = miner_block.header.previous;
info!(target: "validator::pow::mine_block", "[MINER] PoW input: {}", input.to_hex());
let flags = RandomXFlags::default() | RandomXFlags::FULLMEM;
info!(target: "validator::pow::mine_block", "[MINER] Initializing RandomX dataset...");
let dataset = Arc::new(RandomXDataset::new(flags, input.as_bytes(), self.threads).unwrap());
info!(target: "validator::pow::mine_block", "[MINER] Setup time: {:?}", miner_setup.elapsed());

// Multithreaded mining setup
let mining_time = Instant::now();
let mut handles = vec![];
let found_block = Arc::new(AtomicBool::new(false));
let found_nonce = Arc::new(AtomicU32::new(0));
let threads = self.threads as u32;
for t in 0..threads {
let target = target.clone();
let mut block = miner_block.clone();
let found_block = Arc::clone(&found_block);
let found_nonce = Arc::clone(&found_nonce);
let dataset = Arc::clone(&dataset);
let stop_signal = stop_signal.clone();

handles.push(thread::spawn(move || {
info!(target: "validator::pow::mine_block", "[MINER] Initializing RandomX VM #{}...", t);
let mut miner_nonce = t;
let vm = RandomXVM::new_fast(flags, &dataset).unwrap();
loop {
// Check if stop signal was received
if stop_signal.is_full() {
info!(target: "validator::pow::mine_block", "[MINER] Stop signal received, thread #{} exiting", t);
break
}

block.header.nonce = pallas::Base::from(miner_nonce as u64);
if found_block.load(Ordering::SeqCst) {
info!(target: "validator::pow::mine_block", "[MINER] Block found, thread #{} exiting", t);
break
}

let out_hash = vm.hash(block.hash().unwrap().as_bytes());
let out_hash = BigUint::from_bytes_be(&out_hash);
if out_hash <= target {
found_block.store(true, Ordering::SeqCst);
found_nonce.store(miner_nonce, Ordering::SeqCst);
info!(target: "validator::pow::mine_block", "[MINER] Thread #{} found block using nonce {}",
t, miner_nonce
);
info!(target: "validator::pow::mine_block", "[MINER] Block hash {}", block.hash().unwrap().to_hex());
info!(target: "validator::pow::mine_block", "[MINER] RandomX output: 0x{:064x}", out_hash);
break
}

// This means thread 0 will use nonces, 0, 4, 8, ...
// and thread 1 will use nonces, 1, 5, 9, ...
miner_nonce += threads;
}
}));
}

for handle in handles {
let _ = handle.join();
}
// Check if stop signal was received
if stop_signal.is_full() {
return Err(Error::MinerTaskStopped)
}

info!(target: "validator::pow::mine_block", "[MINER] Mining time: {:?}", mining_time.elapsed());

// Set the valid mined nonce in the block
miner_block.header.nonce = pallas::Base::from(found_nonce.load(Ordering::SeqCst) as u64);

Ok(())
}
}

impl std::fmt::Display for PoWModule {
Expand All @@ -245,76 +333,6 @@ impl std::fmt::Display for PoWModule {
}
}

// TODO: Move this inside PoWModule(if possible) and use stoppable task so
// we can stop it externally
/// Mine provided block, based on provided PoW module next mine target and difficulty
pub fn mine_block(module: PoWModule, miner_block: &mut BlockInfo) {
let miner_setup = Instant::now();

// Grab the next mine target
let target = module.next_mine_target();
info!(target: "validator::pow::mine_block", "[MINER] Mine target: 0x{:064x}", target);

// Get the PoW input. The key changes with every mined block.
let input = miner_block.header.previous;
info!(target: "validator::pow::mine_block", "[MINER] PoW input: {}", input.to_hex());
let flags = RandomXFlags::default() | RandomXFlags::FULLMEM;
info!(target: "validator::pow::mine_block", "[MINER] Initializing RandomX dataset...");
let dataset = Arc::new(RandomXDataset::new(flags, input.as_bytes(), module.threads).unwrap());
info!(target: "validator::pow::mine_block", "[MINER] Setup time: {:?}", miner_setup.elapsed());

// Multithreaded mining setup
let mining_time = Instant::now();
let mut handles = vec![];
let found_block = Arc::new(AtomicBool::new(false));
let found_nonce = Arc::new(AtomicU32::new(0));
for t in 0..module.threads {
let target = target.clone();
let mut block = miner_block.clone();
let found_block = Arc::clone(&found_block);
let found_nonce = Arc::clone(&found_nonce);
let dataset = Arc::clone(&dataset);

handles.push(thread::spawn(move || {
info!(target: "validator::pow::mine_block", "[MINER] Initializing RandomX VM #{}...", t);
let mut miner_nonce = t as u32;
let vm = RandomXVM::new_fast(flags, &dataset).unwrap();
loop {
block.header.nonce = pallas::Base::from(miner_nonce as u64);
if found_block.load(Ordering::SeqCst) {
info!(target: "validator::pow::mine_block", "[MINER] Block found, thread #{} exiting", t);
break
}

let out_hash = vm.hash(block.hash().unwrap().as_bytes());
let out_hash = BigUint::from_bytes_be(&out_hash);
if out_hash <= target {
found_block.store(true, Ordering::SeqCst);
found_nonce.store(miner_nonce, Ordering::SeqCst);
info!(target: "validator::pow::mine_block", "[MINER] Thread #{} found block using nonce {}",
t, miner_nonce
);
info!(target: "validator::pow::mine_block", "[MINER] Block hash {}", block.hash().unwrap().to_hex());
info!(target: "validator::pow::mine_block", "[MINER] RandomX output: 0x{:064x}", out_hash);
break
}

// This means thread 0 will use nonces, 0, 4, 8, ...
// and thread 1 will use nonces, 1, 5, 9, ...
miner_nonce += module.threads as u32;
}
}));
}

for handle in handles {
let _ = handle.join();
}
info!(target: "validator::pow::mine_block", "[MINER] Mining time: {:?}", mining_time.elapsed());

// Set the valid mined nonce in the block
miner_block.header.nonce = pallas::Base::from(found_nonce.load(Ordering::SeqCst) as u64);
}

// TODO: move these to utils or something
fn get_mid(a: u64, b: u64) -> u64 {
(a / 2) + (b / 2) + ((a - 2 * (a / 2)) + (b - 2 * (b / 2))) / 2
Expand Down Expand Up @@ -352,7 +370,7 @@ mod tests {
Result,
};

use super::{mine_block, PoWModule};
use super::PoWModule;

const DEFAULT_TEST_DIFFICULTY_TARGET: usize = 120;

Expand Down Expand Up @@ -394,12 +412,13 @@ mod tests {
let sled_db = sled::Config::new().temporary(true).open()?;
let blockchain = Blockchain::new(&sled_db)?;
let module = PoWModule::new(blockchain, None, Some(DEFAULT_TEST_DIFFICULTY_TARGET));
let (_, recvr) = smol::channel::bounded(1);
let genesis_block = BlockInfo::default();

// Mine next block
let mut next_block = BlockInfo::default();
next_block.header.previous = genesis_block.hash()?;
mine_block(module.clone(), &mut next_block);
module.mine_block(&mut next_block, &recvr)?;

// Verify it
module.verify_block(&next_block)?;
Expand Down

0 comments on commit 451f178

Please sign in to comment.