From 9e5d63cf64e330a55327c5d55dc72e7396967e2a Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Thu, 25 Sep 2025 14:38:10 +0400 Subject: [PATCH 1/9] feat: implementation of the multi-threaded scheduler --- Cargo.lock | 1 + Cargo.toml | 1 + magicblock-processor/Cargo.toml | 2 + magicblock-processor/src/executor/mod.rs | 3 +- magicblock-processor/src/lib.rs | 2 - magicblock-processor/src/scheduler.rs | 11 +-- magicblock-processor/src/scheduler/locks.rs | 82 +++++++++++++++++++++ magicblock-processor/src/scheduler/queue.rs | 14 ++++ 8 files changed, 108 insertions(+), 8 deletions(-) create mode 100644 magicblock-processor/src/scheduler/locks.rs create mode 100644 magicblock-processor/src/scheduler/queue.rs diff --git a/Cargo.lock b/Cargo.lock index 14b7c3617..e989e0a35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4031,6 +4031,7 @@ dependencies = [ "magicblock-ledger", "magicblock-program", "parking_lot 0.12.4", + "rustc-hash 2.1.1", "solana-account", "solana-address-lookup-table-program", "solana-bpf-loader-program", diff --git a/Cargo.toml b/Cargo.toml index a7b8e132b..6326cc3e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,6 +149,7 @@ rand = "0.8.5" rayon = "1.10.0" rusqlite = { version = "0.34.0", features = ["bundled"] } # bundled sqlite 3.44 rustc_version = "0.4" +rustc-hash = "2.1" scc = "2.4" semver = "1.0.22" serde = "1.0.217" diff --git a/magicblock-processor/Cargo.toml b/magicblock-processor/Cargo.toml index 490720242..3fbb12c84 100644 --- a/magicblock-processor/Cargo.toml +++ b/magicblock-processor/Cargo.toml @@ -38,6 +38,8 @@ solana-transaction = { workspace = true } solana-transaction-status = { workspace = true } solana-transaction-error = { workspace = true } +rustc-hash = { workspace = true } + [dev-dependencies] guinea = { workspace = true } solana-signature = { workspace = true } diff --git a/magicblock-processor/src/executor/mod.rs b/magicblock-processor/src/executor/mod.rs index 67f0deda5..613007c88 100644 --- a/magicblock-processor/src/executor/mod.rs +++ b/magicblock-processor/src/executor/mod.rs @@ -20,7 +20,8 @@ use solana_svm::transaction_processor::{ use tokio::{runtime::Builder, sync::mpsc::Sender}; use crate::{ - builtins::BUILTINS, scheduler::state::TransactionSchedulerState, WorkerId, + builtins::BUILTINS, + scheduler::{locks::WorkerId, state::TransactionSchedulerState}, }; /// A dedicated, single-threaded worker responsible for processing transactions using diff --git a/magicblock-processor/src/lib.rs b/magicblock-processor/src/lib.rs index d85cc2b9c..009fb54f1 100644 --- a/magicblock-processor/src/lib.rs +++ b/magicblock-processor/src/lib.rs @@ -10,8 +10,6 @@ use solana_program::feature; use solana_rent_collector::RentCollector; use solana_svm::transaction_processor::TransactionProcessingEnvironment; -type WorkerId = u8; - /// Initialize an SVM environment for transaction processing pub fn build_svm_env( accountsdb: &AccountsDb, diff --git a/magicblock-processor/src/scheduler.rs b/magicblock-processor/src/scheduler.rs index bf0f9cf73..9227ecf17 100644 --- a/magicblock-processor/src/scheduler.rs +++ b/magicblock-processor/src/scheduler.rs @@ -1,5 +1,6 @@ use std::sync::{atomic::AtomicUsize, Arc, RwLock}; +use locks::{WorkerId, MAX_SVM_WORKERS}; use log::info; use magicblock_core::link::transactions::{ ProcessableTransaction, TransactionToProcessRx, @@ -12,10 +13,7 @@ use tokio::{ sync::mpsc::{channel, Receiver, Sender}, }; -use crate::{ - executor::{SimpleForkGraph, TransactionExecutor}, - WorkerId, -}; +use crate::executor::{SimpleForkGraph, 
TransactionExecutor}; /// The central transaction scheduler responsible for distributing work to a /// pool of `TransactionExecutor` workers. @@ -46,7 +44,8 @@ impl TransactionScheduler { /// 1. Prepares the shared program cache and ensures necessary sysvars are in the `AccountsDb`. /// 2. Creates a pool of `TransactionExecutor` workers, each with its own dedicated channel. /// 3. Spawns each worker in its own OS thread for maximum isolation and performance. - pub fn new(workers: u8, state: TransactionSchedulerState) -> Self { + pub fn new(workers: u32, state: TransactionSchedulerState) -> Self { + let workers = workers.min(MAX_SVM_WORKERS); let index = Arc::new(AtomicUsize::new(0)); let mut executors = Vec::with_capacity(workers as usize); @@ -154,4 +153,6 @@ impl TransactionScheduler { } } +pub mod locks; +pub mod queue; pub mod state; diff --git a/magicblock-processor/src/scheduler/locks.rs b/magicblock-processor/src/scheduler/locks.rs new file mode 100644 index 000000000..4be787fdb --- /dev/null +++ b/magicblock-processor/src/scheduler/locks.rs @@ -0,0 +1,82 @@ +use std::{cell::RefCell, rc::Rc}; + +use rustc_hash::FxHashMap; +use solana_pubkey::Pubkey; + +pub(crate) type WorkerId = u32; + +pub type LocksCache = FxHashMap<Pubkey, Rc<RefCell<AccountLock>>>; + +type ReadWriteLock = u64; +#[derive(Clone, Copy, Default)] +#[repr(transparent)] +struct TransactionId(u32); + +pub(crate) const MAX_SVM_WORKERS: u32 = ReadWriteLock::BITS - 1; +const WRITE_BIT_MASK: u64 = 1 << (ReadWriteLock::BITS - 1); + +impl TransactionId { + pub(crate) fn next() -> Self { + static mut COUNTER: u32 = 0; + let id = unsafe { + COUNTER = COUNTER.wrapping_add(1).max(1); + COUNTER + }; + Self(id) + } + + fn clear(&mut self) { + self.0 = 0; + } +} + +#[derive(Default)] +pub(crate) struct AccountLock { + rw: ReadWriteLock, + contender: TransactionId, +} + +impl AccountLock { + pub(crate) fn write( + &mut self, + worker: WorkerId, + txn: TransactionId, + ) -> Result<(), WorkerId> { + if self.rw != 0 { + self.contend(txn); + return Err(self.rw.trailing_zeros()); + } + self.contender.clear(); + self.rw |= WRITE_BIT_MASK | (1 << worker); + Ok(()) + } + + pub(crate) fn read( + &mut self, + worker: WorkerId, + txn: TransactionId, + ) -> Result<(), WorkerId> { + if self.rw & WRITE_BIT_MASK != 0 { + self.contend(txn); + return Err(self.rw.trailing_zeros()); + } + self.contender.clear(); + self.rw |= 1 << worker; + Ok(()) + } + + pub(crate) fn unlock(&mut self, worker: WorkerId) { + self.rw &= !(WRITE_BIT_MASK | (1 << worker)); + } + + fn contended(&self, txn: TransactionId) -> bool { + return self.contender.0 != 0 && self.contender.0 == txn.0; + } + + fn contend(&mut self, txn: TransactionId) { + if self.contended(txn) { + return; + } + self.contender = txn; + } +} diff --git a/magicblock-processor/src/scheduler/queue.rs b/magicblock-processor/src/scheduler/queue.rs new file mode 100644 index 000000000..eb4eef8cb --- /dev/null +++ b/magicblock-processor/src/scheduler/queue.rs @@ -0,0 +1,14 @@ +use std::collections::VecDeque; + +use magicblock_core::link::transactions::ProcessableTransaction; + +pub struct BlockedTransactionsQueue { + inner: Vec<VecDeque<ProcessableTransaction>>, +} + +impl BlockedTransactionsQueue { + pub(crate) fn new(workers: u32) -> Self { + let inner = (0..workers).map(|_| VecDeque::new()).collect(); + Self { inner } + } +} From b8cd7679bab57b61d00d384ef5d4bd700e983017 Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Mon, 29 Sep 2025 12:34:20 +0400 Subject: [PATCH 2/9] feat: add locking coordinator to the scheduler --- magicblock-processor/src/executor/mod.rs | 10 
+- magicblock-processor/src/scheduler.rs | 97 ++++++++++++---- .../src/scheduler/coordinator.rs | 109 ++++++++++++++++++ magicblock-processor/src/scheduler/locks.rs | 104 +++++++++-------- magicblock-processor/src/scheduler/queue.rs | 14 --- 5 files changed, 239 insertions(+), 95 deletions(-) create mode 100644 magicblock-processor/src/scheduler/coordinator.rs delete mode 100644 magicblock-processor/src/scheduler/queue.rs diff --git a/magicblock-processor/src/executor/mod.rs b/magicblock-processor/src/executor/mod.rs index 613007c88..44dd90e4d 100644 --- a/magicblock-processor/src/executor/mod.rs +++ b/magicblock-processor/src/executor/mod.rs @@ -21,7 +21,7 @@ use tokio::{runtime::Builder, sync::mpsc::Sender}; use crate::{ builtins::BUILTINS, - scheduler::{locks::WorkerId, state::TransactionSchedulerState}, + scheduler::{locks::ExecutorId, state::TransactionSchedulerState}, }; /// A dedicated, single-threaded worker responsible for processing transactions using @@ -31,7 +31,7 @@ use crate::{ /// executors can be spawned to process transactions in parallel. pub(super) struct TransactionExecutor { /// A unique identifier for this worker instance. - id: WorkerId, + id: ExecutorId, /// A handle to the global accounts database for reading and writing account state. accountsdb: Arc<AccountsDb>, /// A handle to the global ledger for writing committed transaction history. @@ -51,7 +51,7 @@ pub(super) struct TransactionExecutor { /// A channel to send out account state updates after processing. accounts_tx: AccountUpdateTx, /// A back-channel to notify the `TransactionScheduler` that this worker is ready for more work. - ready_tx: Sender<WorkerId>, + ready_tx: Sender<ExecutorId>, /// A read lock held during a slot's processing to synchronize with critical global /// operations like `AccountsDb` snapshots. sync: StWLock, @@ -66,10 +66,10 @@ impl TransactionExecutor { /// with a globally shared one. This allows updates made by one executor to be immediately /// visible to all others, preventing redundant program loads. pub(super) fn new( - id: WorkerId, + id: ExecutorId, state: &TransactionSchedulerState, rx: TransactionToProcessRx, - ready_tx: Sender<WorkerId>, + ready_tx: Sender<ExecutorId>, index: Arc<AtomicUsize>, programs_cache: Arc<RwLock<ProgramCache<SimpleForkGraph>>>, ) -> Self { diff --git a/magicblock-processor/src/scheduler.rs b/magicblock-processor/src/scheduler.rs index 9227ecf17..24c82bc0e 100644 --- a/magicblock-processor/src/scheduler.rs +++ b/magicblock-processor/src/scheduler.rs @@ -1,6 +1,7 @@ use std::sync::{atomic::AtomicUsize, Arc, RwLock}; -use locks::{WorkerId, MAX_SVM_WORKERS}; +use coordinator::ExecutionCoordinator; +use locks::{ExecutorId, MAX_SVM_EXECUTORS}; use log::info; use magicblock_core::link::transactions::{ ProcessableTransaction, TransactionToProcessRx, @@ -22,11 +23,13 @@ use crate::executor::{SimpleForkGraph, TransactionExecutor}; /// pipeline. It receives transactions from a global queue and dispatches them to available /// worker threads for execution or simulation. pub struct TransactionScheduler { + /// Scheduling/Execution orchestrator + coordinator: ExecutionCoordinator, /// The receiving end of the global queue for all new transactions. transactions_rx: TransactionToProcessRx, /// A channel that receives readiness notifications from workers, /// indicating they are free to accept new work. - ready_rx: Receiver<WorkerId>, + ready_rx: Receiver<ExecutorId>, /// A list of sender channels, one for each `TransactionExecutor` worker. executors: Vec<Sender<ProcessableTransaction>>, /// A handle to the globally shared cache for loaded BPF programs. @@ -44,23 +47,23 @@ impl TransactionScheduler { /// 1. 
Prepares the shared program cache and ensures necessary sysvars are in the `AccountsDb`. /// 2. Creates a pool of `TransactionExecutor` workers, each with its own dedicated channel. /// 3. Spawns each worker in its own OS thread for maximum isolation and performance. - pub fn new(workers: u32, state: TransactionSchedulerState) -> Self { - let workers = workers.min(MAX_SVM_WORKERS); + pub fn new(executors: u32, state: TransactionSchedulerState) -> Self { + let count = executors.min(MAX_SVM_EXECUTORS) as usize; let index = Arc::new(AtomicUsize::new(0)); - let mut executors = Vec::with_capacity(workers as usize); + let mut executors = Vec::with_capacity(count); // Create the back-channel for workers to signal their readiness. - let (ready_tx, ready_rx) = channel(workers as usize); + let (ready_tx, ready_rx) = channel(count); // Perform one-time setup of the shared program cache and sysvars. let program_cache = state.prepare_programs_cache(); state.prepare_sysvars(); - for id in 0..workers { + for id in 0..count { // Each executor has a channel capacity of 1, as it // can only process one transaction at a time. let (transactions_tx, transactions_rx) = channel(1); let executor = TransactionExecutor::new( - id, + id as u32, &state, transactions_rx, ready_tx.clone(), @@ -71,7 +74,9 @@ impl TransactionScheduler { executor.spawn(); executors.push(transactions_tx); } + let coordinator = ExecutionCoordinator::new(count); Self { + coordinator, transactions_rx: state.txn_to_process_rx, ready_rx, executors, @@ -107,28 +112,23 @@ impl TransactionScheduler { /// 3. Receiving a notification of a new block, triggering a slot transition. async fn run(mut self) { let mut block_produced = self.latest_block.subscribe(); - let mut ready = true; loop { tokio::select! { biased; // A worker has finished its task and is ready for more. - Some(_) = self.ready_rx.recv() => { - // TODO(bmuddha): - // This branch will be used by a multi-threaded scheduler - // with account-level locking to manage the pool of ready workers. - ready = true; + Some(executor) = self.ready_rx.recv() => { + self.coordinator.unlock_accounts(executor); + self.reschedule_blocked_transactions(executor).await; } // Receive new transactions for scheduling. - Some(txn) = self.transactions_rx.recv(), if ready => { - // TODO(bmuddha): - // The current implementation sends to the first worker only. - // A future implementation with account-level locking will enable - // dispatching to any available worker. - let Some(tx) = self.executors.first() else { - continue; - }; - let _ = tx.send(txn).await; - ready = false; + Some(txn) = self.transactions_rx.recv(), if self.coordinator.is_ready() => { + // NOTE: the unwrap_or fallback is never taken, as the is_ready() guard + // guarantees that Some(_) is returned; it reads better than panicking + let executor = self + .coordinator + .get_ready_executor() + .unwrap_or(ExecutorId::MIN); + self.schedule_transaction(executor, txn).await; + } // A new block has been produced. 
_ = block_produced.recv() => { @@ -151,8 +151,55 @@ impl TransactionScheduler { self.program_cache.write().unwrap().latest_root_slot = self.latest_block.load().slot; } + + async fn reschedule_blocked_transactions(&mut self, blocking: ExecutorId) { + let mut executor = Some(blocking); + loop { + let Some(exec) = executor else { + break; + }; + let Some(txn) = self.coordinator.get_blocked_transaction(blocking) + else { + self.coordinator.release_executor(exec); + break; + }; + + self.schedule_transaction(exec, txn).await; + executor = self.coordinator.get_ready_executor(); + } + } + + async fn schedule_transaction( + &mut self, + executor: ExecutorId, + txn: ProcessableTransaction, + ) { + let result = self.coordinator.try_acquire_locks(executor, &txn); + if let Err(blocking) = result { + self.coordinator.unlock_accounts(executor); + self.coordinator.release_executor(executor); + self.coordinator.queue_transaction(blocking, txn); + return; + } + let _ = self.get_executor(executor).send(txn).await; + } + + #[inline] + fn get_executor( + &self, + executor: ExecutorId, + ) -> &Sender<ProcessableTransaction> { + let idx = executor as usize; + // SAFETY: + // executor id is always within the bounds of the vec + unsafe { self.executors.get_unchecked(idx) } + } } +pub mod coordinator; pub mod locks; -pub mod queue; pub mod state; + +// SAFETY: +// Rc used within the scheduler never escapes to other threads +unsafe impl Send for TransactionScheduler {} diff --git a/magicblock-processor/src/scheduler/coordinator.rs b/magicblock-processor/src/scheduler/coordinator.rs new file mode 100644 index 000000000..137cb49cb --- /dev/null +++ b/magicblock-processor/src/scheduler/coordinator.rs @@ -0,0 +1,109 @@ +//! Manages the state of transaction processing across multiple executors. +//! +//! This module contains the `ExecutionCoordinator`, which tracks ready executors, +//! queues of blocked transactions, and the locks held by each worker. + +use std::collections::VecDeque; + +use magicblock_core::link::transactions::ProcessableTransaction; + +use super::locks::{ExecutorId, LocksCache, RcLock}; + +/// A queue of transactions waiting to be processed by a specific executor. +type TransactionQueue = VecDeque<ProcessableTransaction>; +/// A list of transaction queues, indexed by `ExecutorId`. +type BlockedTransactionQueues = Vec<TransactionQueue>; +/// A list of all locks acquired by an executor, indexed by `ExecutorId`. +type AcquiredLocks = Vec<Vec<RcLock>>; + +/// Manages the state for all transaction executors, including their +/// readiness, blocked transactions, and acquired account locks. +pub(super) struct ExecutionCoordinator { + /// A queue for each executor to hold transactions that are waiting for locks. + blocked_transactions: BlockedTransactionQueues, + /// A pool of executor IDs that are currently idle and ready for new work. + ready_executors: Vec<ExecutorId>, + /// A list of locks currently held by each executor. + acquired_locks: AcquiredLocks, + /// The cache of all account locks. + locks: LocksCache, +} + +impl ExecutionCoordinator { + /// Creates a new `ExecutionCoordinator` for a given number of executors. + pub(super) fn new(count: usize) -> Self { + Self { + blocked_transactions: (0..count).map(|_| VecDeque::new()).collect(), + acquired_locks: (0..count).map(|_| Vec::new()).collect(), + ready_executors: (0..count as u32).collect(), + locks: LocksCache::default(), + } + } + + /// Queues a transaction to be processed by a specific executor once its + /// required locks are available. 
+ pub(super) fn queue_transaction( + &mut self, + executor: ExecutorId, + transaction: ProcessableTransaction, + ) { + let queue = &mut self.blocked_transactions[executor as usize]; + queue.push_back(transaction); + } + + /// Checks if there are any executors ready to process a transaction. + pub(super) fn is_ready(&self) -> bool { + !self.ready_executors.is_empty() + } + + /// Retrieves the ID of a ready executor, if one is available. + pub(super) fn get_ready_executor(&mut self) -> Option<ExecutorId> { + self.ready_executors.pop() + } + + /// Returns an executor to the pool of ready executors. + pub(super) fn release_executor(&mut self, executor: ExecutorId) { + self.ready_executors.push(executor) + } + + /// Releases all account locks held by a specific executor. + pub(crate) fn unlock_accounts(&mut self, executor: ExecutorId) { + let locks = &mut self.acquired_locks[executor as usize]; + while let Some(lock) = locks.pop() { + lock.borrow_mut().unlock(executor); + } + } + + /// Retrieves the next blocked transaction for a given executor. + pub(super) fn get_blocked_transaction( + &mut self, + executor: ExecutorId, + ) -> Option<ProcessableTransaction> { + self.blocked_transactions[executor as usize].pop_front() + } + + /// Attempts to acquire all necessary read and write locks for a transaction. + /// + /// If any lock is contended, this function will fail and return the ID of the + /// executor that holds the conflicting lock. + pub(super) fn try_acquire_locks( + &mut self, + executor: ExecutorId, + txn: &ProcessableTransaction, + ) -> Result<(), ExecutorId> { + let message = txn.transaction.message(); + let accounts_to_lock = message.account_keys().iter().enumerate(); + + for (i, &acc) in accounts_to_lock { + let lock = self.locks.entry(acc).or_default().clone(); + if message.is_writable(i) { + lock.borrow_mut().write(executor)?; + } else { + lock.borrow_mut().read(executor)?; + }; + + self.acquired_locks[executor as usize].push(lock); + } + Ok(()) + } +} diff --git a/magicblock-processor/src/scheduler/locks.rs b/magicblock-processor/src/scheduler/locks.rs index 4be787fdb..d4744378f 100644 --- a/magicblock-processor/src/scheduler/locks.rs +++ b/magicblock-processor/src/scheduler/locks.rs @@ -1,82 +1,84 @@ +//! Fast, in-memory account locking primitives for the multi-threaded scheduler. +//! +//! This version uses a single `u64` bitmask to represent the entire lock state, +//! including read locks, write locks, and contention, for maximum efficiency. + use std::{cell::RefCell, rc::Rc}; use rustc_hash::FxHashMap; use solana_pubkey::Pubkey; -pub(crate) type WorkerId = u32; +// A bitmask representing the lock state. +// - MSB: Write lock flag. +// - MSB-1: Contention flag. +// - Remaining bits: Read locks for each executor. +type ReadWriteLock = u64; -pub type LocksCache = FxHashMap<Pubkey, Rc<RefCell<AccountLock>>>; +/// Unique identifier for a transaction executor worker. +pub(crate) type ExecutorId = u32; -type ReadWriteLock = u64; -#[derive(Clone, Copy, Default)] -#[repr(transparent)] -struct TransactionId(u32); +/// A shared, mutable reference to an `AccountLock`. +pub(crate) type RcLock = Rc<RefCell<AccountLock>>; -pub(crate) const MAX_SVM_WORKERS: u32 = ReadWriteLock::BITS - 1; -const WRITE_BIT_MASK: u64 = 1 << (ReadWriteLock::BITS - 1); +/// In-memory cache of account locks. +pub(crate) type LocksCache = FxHashMap<Pubkey, RcLock>; -impl TransactionId { - pub(crate) fn next() -> Self { - static mut COUNTER: u32 = 0; - let id = unsafe { - COUNTER = COUNTER.wrapping_add(1).max(1); - COUNTER - }; - Self(id) - } - - fn clear(&mut self) { - self.0 = 0; - } -} +/// The maximum number of concurrent executors supported by the bitmask. 
+/// Two bits are reserved for the write and contention flags. +pub(crate) const MAX_SVM_EXECUTORS: u32 = ReadWriteLock::BITS - 2; - fn clear(&mut self) { - self.0 = 0; - } -} +/// The bit used to indicate a write lock is held. +const WRITE_BIT_MASK: u64 = 1 << (ReadWriteLock::BITS - 1); -#[derive(Default)] -pub(crate) struct AccountLock { - rw: ReadWriteLock, - contender: TransactionId, -} +/// The bit used to indicate that the lock is contended. +const CONTENTION_BIT_MASK: u64 = 1 << (ReadWriteLock::BITS - 2); + +/// A read/write lock on a single Solana account, represented by a `u64` bitmask. +#[derive(Default, Debug)] +#[repr(transparent)] +pub(crate) struct AccountLock(ReadWriteLock); impl AccountLock { + /// Attempts to acquire a write lock. Fails if any other lock is held. pub(crate) fn write( &mut self, - worker: WorkerId, - txn: TransactionId, - ) -> Result<(), WorkerId> { - if self.rw != 0 { - self.contend(txn); - return Err(self.rw.trailing_zeros()); + executor: ExecutorId, + ) -> Result<(), ExecutorId> { + if self.0 != 0 { + self.contend(); + return Err(self.0.trailing_zeros()); } - self.contender.clear(); - self.rw |= WRITE_BIT_MASK | (1 << worker); + // Acquiring the lock clears any previous contention. + self.0 = WRITE_BIT_MASK | (1 << executor); Ok(()) } + /// Attempts to acquire a read lock. Fails if a write lock is held or if the + /// lock is marked as contended. pub(crate) fn read( &mut self, - worker: WorkerId, - txn: TransactionId, - ) -> Result<(), WorkerId> { - if self.rw & WRITE_BIT_MASK != 0 { - self.contend(txn); - return Err(self.rw.trailing_zeros()); + executor: ExecutorId, + ) -> Result<(), ExecutorId> { + if self.0 & WRITE_BIT_MASK != 0 || self.is_contended() { + return Err(self.0.trailing_zeros()); } - self.contender.clear(); - self.rw |= 1 << worker; + self.0 |= 1 << executor; Ok(()) } - pub(crate) fn unlock(&mut self, worker: WorkerId) { - self.rw &= !(WRITE_BIT_MASK | (1 << worker)); + /// Releases a lock held by an executor. + pub(crate) fn unlock(&mut self, executor: ExecutorId) { + // Clear the executor's read bit and the global write bit + self.0 &= !(WRITE_BIT_MASK | CONTENTION_BIT_MASK | (1 << executor)); } - fn contended(&self, txn: TransactionId) -> bool { - return self.contender.0 != 0 && self.contender.0 == txn.0; + /// Checks if the lock is marked as contended. + fn is_contended(&self) -> bool { + self.0 & CONTENTION_BIT_MASK != 0 } - fn contend(&mut self, txn: TransactionId) { - if self.contended(txn) { - return; - } - self.contender = txn; + /// Marks the lock as contended when an acquisition fails. 
+ fn contend(&mut self) { + self.0 |= CONTENTION_BIT_MASK; + } } diff --git a/magicblock-processor/src/scheduler/queue.rs b/magicblock-processor/src/scheduler/queue.rs deleted file mode 100644 index eb4eef8cb..000000000 --- a/magicblock-processor/src/scheduler/queue.rs +++ /dev/null @@ -1,14 +0,0 @@ -use std::collections::VecDeque; - -use magicblock_core::link::transactions::ProcessableTransaction; - -pub struct BlockedTransactionsQueue { - inner: Vec<VecDeque<ProcessableTransaction>>, -} - -impl BlockedTransactionsQueue { - pub(crate) fn new(workers: u32) -> Self { - let inner = (0..workers).map(|_| VecDeque::new()).collect(); - Self { inner } - } -} From d4ee71b1515c949dd5c18ff724c0dcd60c0ebb84 Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Tue, 30 Sep 2025 22:56:07 +0400 Subject: [PATCH 3/9] fix: fair scheduling --- magicblock-processor/src/scheduler.rs | 11 ++-- .../src/scheduler/coordinator.rs | 48 +++++++++++--- magicblock-processor/src/scheduler/locks.rs | 65 ++++++++++++------- 3 files changed, 84 insertions(+), 40 deletions(-) diff --git a/magicblock-processor/src/scheduler.rs b/magicblock-processor/src/scheduler.rs index 24c82bc0e..e84a37ba1 100644 --- a/magicblock-processor/src/scheduler.rs +++ b/magicblock-processor/src/scheduler.rs @@ -1,6 +1,6 @@ use std::sync::{atomic::AtomicUsize, Arc, RwLock}; -use coordinator::ExecutionCoordinator; +use coordinator::{ExecutionCoordinator, TransactionWithId}; use locks::{ExecutorId, MAX_SVM_EXECUTORS}; use log::info; use magicblock_core::link::transactions::{ ProcessableTransaction, TransactionToProcessRx, @@ -128,6 +128,7 @@ impl TransactionScheduler { .coordinator .get_ready_executor() .unwrap_or(ExecutorId::MIN); + let txn = TransactionWithId::new(txn); self.schedule_transaction(executor, txn).await; } // A new block has been produced. @@ -172,16 +173,16 @@ impl TransactionScheduler { async fn schedule_transaction( &mut self, executor: ExecutorId, - txn: ProcessableTransaction, + txn: TransactionWithId, ) { let result = self.coordinator.try_acquire_locks(executor, &txn); - if let Err(blocking) = result { + if let Err(blocker) = result { self.coordinator.unlock_accounts(executor); self.coordinator.release_executor(executor); - self.coordinator.queue_transaction(blocking, txn); + self.coordinator.queue_transaction(blocker, txn); return; } - let _ = self.get_executor(executor).send(txn).await; + let _ = self.get_executor(executor).send(txn.txn).await; } #[inline] diff --git a/magicblock-processor/src/scheduler/coordinator.rs b/magicblock-processor/src/scheduler/coordinator.rs index 137cb49cb..12076d3cd 100644 --- a/magicblock-processor/src/scheduler/coordinator.rs +++ b/magicblock-processor/src/scheduler/coordinator.rs @@ -7,20 +7,38 @@ use std::collections::VecDeque; use magicblock_core::link::transactions::ProcessableTransaction; -use super::locks::{ExecutorId, LocksCache, RcLock}; +use super::locks::{ + next_transaction_id, ExecutorId, LocksCache, RcLock, TransactionId, + TransactionQueues, MAX_SVM_EXECUTORS, +}; /// A queue of transactions waiting to be processed by a specific executor. -type TransactionQueue = VecDeque<ProcessableTransaction>; +type TransactionQueue = VecDeque<TransactionWithId>; /// A list of transaction queues, indexed by `ExecutorId`. type BlockedTransactionQueues = Vec<TransactionQueue>; /// A list of all locks acquired by an executor, indexed by `ExecutorId`. 
type AcquiredLocks = Vec<Vec<RcLock>>; +pub(super) struct TransactionWithId { + pub(super) id: TransactionId, + pub(super) txn: ProcessableTransaction, +} + +impl TransactionWithId { + pub(super) fn new(txn: ProcessableTransaction) -> Self { + Self { + id: next_transaction_id(), + txn, + } + } +} + /// Manages the state for all transaction executors, including their /// readiness, blocked transactions, and acquired account locks. pub(super) struct ExecutionCoordinator { /// A queue for each executor to hold transactions that are waiting for locks. blocked_transactions: BlockedTransactionQueues, + transaction_queues: TransactionQueues, /// A pool of executor IDs that are currently idle and ready for new work. ready_executors: Vec<ExecutorId>, /// A list of locks currently held by each executor. @@ -36,6 +54,7 @@ impl ExecutionCoordinator { blocked_transactions: (0..count).map(|_| VecDeque::new()).collect(), acquired_locks: (0..count).map(|_| Vec::new()).collect(), ready_executors: (0..count as u32).collect(), + transaction_queues: TransactionQueues::default(), locks: LocksCache::default(), } } @@ -44,10 +63,19 @@ impl ExecutionCoordinator { /// Queues a transaction to be processed by a specific executor once its /// required locks are available. pub(super) fn queue_transaction( &mut self, - executor: ExecutorId, - transaction: ProcessableTransaction, + mut blocker: u32, + transaction: TransactionWithId, ) { - let queue = &mut self.blocked_transactions[executor as usize]; + if blocker >= MAX_SVM_EXECUTORS { + // unwrap will never happen, as every pending transaction (which is a contender/blocker) will have an associated entry in the hashmap + blocker = self + .transaction_queues + .get(&blocker) + .copied() + .unwrap_or(ExecutorId::MIN); + } + let queue = &mut self.blocked_transactions[blocker as usize]; + self.transaction_queues.insert(transaction.id, blocker); queue.push_back(transaction); } /// Retrieves the next blocked transaction for a given executor. pub(super) fn get_blocked_transaction( &mut self, executor: ExecutorId, - ) -> Option<ProcessableTransaction> { + ) -> Option<TransactionWithId> { self.blocked_transactions[executor as usize].pop_front() } /// Attempts to acquire all necessary read and write locks for a transaction. pub(super) fn try_acquire_locks( &mut self, executor: ExecutorId, - txn: &ProcessableTransaction, + transaction: &TransactionWithId, ) -> Result<(), ExecutorId> { - let message = txn.transaction.message(); + let message = transaction.txn.transaction.message(); let accounts_to_lock = message.account_keys().iter().enumerate(); for (i, &acc) in accounts_to_lock { let lock = self.locks.entry(acc).or_default().clone(); if message.is_writable(i) { - lock.borrow_mut().write(executor)?; + lock.borrow_mut().write(executor, transaction.id)?; } else { - lock.borrow_mut().read(executor)?; + lock.borrow_mut().read(executor, transaction.id)?; }; self.acquired_locks[executor as usize].push(lock); diff --git a/magicblock-processor/src/scheduler/locks.rs b/magicblock-processor/src/scheduler/locks.rs index d4744378f..e20accd60 100644 --- a/magicblock-processor/src/scheduler/locks.rs +++ b/magicblock-processor/src/scheduler/locks.rs @@ -17,68 +17,83 @@ type ReadWriteLock = u64; /// Unique identifier for a transaction executor worker. pub(crate) type ExecutorId = u32; +pub(super) type TransactionId = u32; + /// A shared, mutable reference to an `AccountLock`. -pub(crate) type RcLock = Rc<RefCell<AccountLock>>; +pub(super) type RcLock = Rc<RefCell<AccountLock>>; /// In-memory cache of account locks. -pub(crate) type LocksCache = FxHashMap<Pubkey, RcLock>; +pub(super) type LocksCache = FxHashMap<Pubkey, RcLock>; +pub(super) type TransactionQueues = FxHashMap<TransactionId, ExecutorId>; /// The maximum number of concurrent executors supported by the bitmask. 
/// Two bits are reserved for the write and contention flags. -pub(crate) const MAX_SVM_EXECUTORS: u32 = ReadWriteLock::BITS - 2; +pub(super) const MAX_SVM_EXECUTORS: u32 = ReadWriteLock::BITS - 1; /// The bit used to indicate a write lock is held. const WRITE_BIT_MASK: u64 = 1 << (ReadWriteLock::BITS - 1); -/// The bit used to indicate that the lock is contended. -const CONTENTION_BIT_MASK: u64 = 1 << (ReadWriteLock::BITS - 2); - /// A read/write lock on a single Solana account, represented by a `u64` bitmask. #[derive(Default, Debug)] -#[repr(transparent)] -pub(crate) struct AccountLock(ReadWriteLock); +pub(super) struct AccountLock { + rw: ReadWriteLock, + contender: TransactionId, +} impl AccountLock { /// Attempts to acquire a write lock. Fails if any other lock is held. - pub(crate) fn write( + pub(super) fn write( &mut self, executor: ExecutorId, + txn: TransactionId, ) -> Result<(), ExecutorId> { - if self.0 != 0 { - self.contend(); - return Err(self.0.trailing_zeros()); + self.contended(txn)?; + if self.rw != 0 { + self.contender = txn; + return Err(self.rw.trailing_zeros()); } - // Acquiring the lock clears any previous contention. - self.0 = WRITE_BIT_MASK | (1 << executor); + self.rw = WRITE_BIT_MASK | (1 << executor); + self.contender = 0; Ok(()) } /// Attempts to acquire a read lock. Fails if a write lock is held or if the /// lock is marked as contended. - pub(crate) fn read( + pub(super) fn read( &mut self, executor: ExecutorId, + txn: TransactionId, ) -> Result<(), ExecutorId> { - if self.0 & WRITE_BIT_MASK != 0 || self.is_contended() { - return Err(self.0.trailing_zeros()); + self.contended(txn)?; + if self.rw & WRITE_BIT_MASK != 0 { + self.contender = txn; + return Err(self.rw.trailing_zeros()); } - self.0 |= 1 << executor; + self.rw |= 1 << executor; + self.contender = 0; Ok(()) } /// Releases a lock held by an executor. - pub(crate) fn unlock(&mut self, executor: ExecutorId) { + pub(super) fn unlock(&mut self, executor: ExecutorId) { // Clear the executor's read bit and the global write bit - self.0 &= !(WRITE_BIT_MASK | CONTENTION_BIT_MASK | (1 << executor)); + self.rw &= !(WRITE_BIT_MASK | (1 << executor)); } /// Checks if the lock is marked as contended. - fn is_contended(&self) -> bool { - self.0 & CONTENTION_BIT_MASK != 0 + fn contended(&self, txn: TransactionId) -> Result<(), TransactionId> { + if self.contender != 0 && self.contender != txn { + return Err(self.contender); + } + Ok(()) } +} - /// Marks the lock as contended when an acquisition fails. - fn contend(&mut self) { - self.0 |= CONTENTION_BIT_MASK; +pub(super) fn next_transaction_id() -> TransactionId { + static mut COUNTER: u32 = MAX_SVM_EXECUTORS; + // # SAFETY: This is safe because the scheduler operates in a single thread. 
+ unsafe { + COUNTER = COUNTER.wrapping_add(1).max(MAX_SVM_EXECUTORS); + COUNTER } } From 4e84847fd85bf59cb0fbde1f18a074e9c802420b Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Tue, 30 Sep 2025 23:14:55 +0400 Subject: [PATCH 4/9] fix: cleanup the transaction to executor mapping --- magicblock-processor/src/scheduler/coordinator.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/magicblock-processor/src/scheduler/coordinator.rs b/magicblock-processor/src/scheduler/coordinator.rs index 12076d3cd..a74b4eb66 100644 --- a/magicblock-processor/src/scheduler/coordinator.rs +++ b/magicblock-processor/src/scheduler/coordinator.rs @@ -132,6 +132,7 @@ impl ExecutionCoordinator { self.acquired_locks[executor as usize].push(lock); } + self.transaction_queues.remove(&transaction.id); Ok(()) } } From 0d2361793febed4d76499b0bc53772ad78be17fb Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Tue, 30 Sep 2025 23:52:06 +0400 Subject: [PATCH 5/9] refactor: improved documentation and code structure --- magicblock-processor/src/scheduler.rs | 111 ++++++++++-------- .../src/scheduler/coordinator.rs | 72 ++++++++---- magicblock-processor/src/scheduler/locks.rs | 34 ++++-- 3 files changed, 131 insertions(+), 86 deletions(-) diff --git a/magicblock-processor/src/scheduler.rs b/magicblock-processor/src/scheduler.rs index e84a37ba1..ae34afe3e 100644 --- a/magicblock-processor/src/scheduler.rs +++ b/magicblock-processor/src/scheduler.rs @@ -1,3 +1,9 @@ +//! The central transaction scheduler and its event loop. +//! +//! This module is the entry point for all transactions into the processing pipeline. +//! It is responsible for creating and managing a pool of `TransactionExecutor` +//! workers and dispatching transactions to them for execution. + use std::sync::{atomic::AtomicUsize, Arc, RwLock}; use coordinator::{ExecutionCoordinator, TransactionWithId}; @@ -23,7 +29,7 @@ use crate::executor::{SimpleForkGraph, TransactionExecutor}; /// pipeline. It receives transactions from a global queue and dispatches them to available /// worker threads for execution or simulation. pub struct TransactionScheduler { - /// Scheduling/Execution orchestrator + /// Manages the state of all executors, including locks and blocked transactions. coordinator: ExecutionCoordinator, /// The receiving end of the global queue for all new transactions. transactions_rx: TransactionToProcessRx, @@ -88,17 +94,15 @@ impl TransactionScheduler { /// Spawns the scheduler's main event loop into a new, dedicated OS thread. /// - /// Similar to the executors, the scheduler runs in its own thread with a dedicated - /// single-threaded Tokio runtime for performance and to prevent it from interfering - /// with other application tasks. + /// The scheduler runs in its own thread with a dedicated single-threaded Tokio + /// runtime. This design ensures that the scheduling logic, which is a critical + /// path, does not compete for resources with other tasks. pub fn spawn(self) { let task = move || { let runtime = Builder::new_current_thread() - .thread_name("transaction scheduler") + .thread_name("transaction-scheduler") .build() - .expect( - "building single threaded tokio runtime should succeed", - ); + .expect("Failed to build single-threaded Tokio runtime"); runtime.block_on(tokio::task::unconstrained(self.run())); }; std::thread::spawn(task); @@ -106,10 +110,13 @@ impl TransactionScheduler { /// The main event loop of the transaction scheduler. /// - /// This loop multiplexes between three primary events: - /// 1. 
Receiving a new transaction and dispatching it to an available worker. -/// 2. Receiving a readiness notification from a worker. -/// 3. Receiving a notification of a new block, triggering a slot transition. +/// This loop multiplexes between three primary events using `tokio::select!`: +/// 1. **Worker Readiness**: A worker signals it is ready for a new task. +/// 2. **New Transaction**: A new transaction arrives for processing. +/// 3. **New Block**: A new block is produced, triggering a slot transition. +/// +/// The `biased` selection ensures that ready workers are processed first, +/// which helps to keep the pipeline full and maximize throughput. async fn run(mut self) { let mut block_produced = self.latest_block.subscribe(); loop { tokio::select! { biased; // A worker has finished its task and is ready for more. Some(executor) = self.ready_rx.recv() => { - self.coordinator.unlock_accounts(executor); - self.reschedule_blocked_transactions(executor).await; + self.handle_ready_executor(executor).await; } - // Receive new transactions for scheduling. + // Receive new transactions for scheduling, but + // only if there is at least one ready worker. Some(txn) = self.transactions_rx.recv(), if self.coordinator.is_ready() => { - // NOTE: the unwrap_or fallback is never taken, as the is_ready() guard - // guarantees that Some(_) is returned; it reads better than panicking - let executor = self - .coordinator - .get_ready_executor() - .unwrap_or(ExecutorId::MIN); - let txn = TransactionWithId::new(txn); - self.schedule_transaction(executor, txn).await; + self.handle_new_transaction(txn).await; } // A new block has been produced. _ = block_produced.recv() => { @@ -141,7 +141,25 @@ impl TransactionScheduler { } } } - info!("transaction scheduler has terminated"); + info!("Transaction scheduler has terminated"); } + + /// Handles a notification that a worker has become ready. + async fn handle_ready_executor(&mut self, executor: ExecutorId) { + self.coordinator.unlock_accounts(executor); + self.reschedule_blocked_transactions(executor).await; + } + + /// Handles a new transaction from the global queue. + async fn handle_new_transaction(&mut self, txn: ProcessableTransaction) { + // The unwrap_or fallback is never taken: the `if self.coordinator.is_ready()` + // guard in the `select!` macro guarantees a ready executor. + let executor = self + .coordinator + .get_ready_executor() + .unwrap_or(ExecutorId::MIN); + let txn = TransactionWithId::new(txn); + self.schedule_transaction(executor, txn).await; + } /// Updates the scheduler's state when a new slot begins. @@ -153,47 +171,40 @@ impl TransactionScheduler { self.latest_block.load().slot; } - async fn reschedule_blocked_transactions(&mut self, blocking: ExecutorId) { - let mut executor = Some(blocking); - loop { - let Some(exec) = executor else { - break; - }; - let Some(txn) = self.coordinator.get_blocked_transaction(blocking) - else { - self.coordinator.release_executor(exec); - break; - }; - - self.schedule_transaction(exec, txn).await; - executor = self.coordinator.get_ready_executor(); - } - } + /// Attempts to reschedule transactions that were blocked by the newly freed executor. 
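+ /// Draining continues for as long as blocked transactions and idle executors both remain.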
+ async fn reschedule_blocked_transactions(&mut self, blocker: ExecutorId) { + let mut executor = Some(blocker); + while let Some(exec) = executor { + let txn = self.coordinator.get_blocked_transaction(blocker); + if let Some(txn) = txn { + self.schedule_transaction(exec, txn).await; + executor = self.coordinator.get_ready_executor(); + } else { + self.coordinator.release_executor(exec); + break; + } + } + } /// Attempts to schedule a single transaction for execution. + /// + /// If the transaction's required account locks are acquired, it is sent to the + /// specified executor. Otherwise, it is queued and will be retried later. async fn schedule_transaction( &mut self, executor: ExecutorId, txn: TransactionWithId, ) { - let result = self.coordinator.try_acquire_locks(executor, &txn); - if let Err(blocker) = result { + if let Err(blocker) = self.coordinator.try_acquire_locks(executor, &txn) + { self.coordinator.unlock_accounts(executor); self.coordinator.release_executor(executor); self.coordinator.queue_transaction(blocker, txn); return; } - let _ = self.get_executor(executor).send(txn.txn).await; - } - - #[inline] - fn get_executor( - &self, - executor: ExecutorId, - ) -> &Sender<ProcessableTransaction> { - let idx = executor as usize; - // SAFETY: - // executor id is always within the bounds of the vec - unsafe { self.executors.get_unchecked(idx) } + // It's safe to ignore the result of the send operation. If the send fails, + // it means the executor's channel is closed, which only happens on shutdown. + let _ = self.executors[executor as usize].send(txn.txn).await; } } diff --git a/magicblock-processor/src/scheduler/coordinator.rs b/magicblock-processor/src/scheduler/coordinator.rs index a74b4eb66..bdc5e8ed9 100644 --- a/magicblock-processor/src/scheduler/coordinator.rs +++ b/magicblock-processor/src/scheduler/coordinator.rs @@ -1,30 +1,35 @@ //! Manages the state of transaction processing across multiple executors. //! //! This module contains the `ExecutionCoordinator`, which tracks ready executors, -//! queues of blocked transactions, and the locks held by each worker. +//! queues of blocked transactions, and the locks held by each worker. It acts +//! as the central state machine for the scheduling process. use std::collections::VecDeque; use magicblock_core::link::transactions::ProcessableTransaction; use super::locks::{ - next_transaction_id, ExecutorId, LocksCache, RcLock, TransactionId, - TransactionQueues, MAX_SVM_EXECUTORS, + next_transaction_id, ExecutorId, LocksCache, RcLock, TransactionContention, + TransactionId, MAX_SVM_EXECUTORS, }; +// --- Type Aliases --- + -/// A queue of transactions waiting to be processed by a specific executor. +/// A queue of transactions waiting for a specific executor to release a lock. type TransactionQueue = VecDeque<TransactionWithId>; -/// A list of transaction queues, indexed by `ExecutorId`. +/// A list of transaction queues, indexed by `ExecutorId`. Each executor has its own queue. type BlockedTransactionQueues = Vec<TransactionQueue>; /// A list of all locks acquired by an executor, indexed by `ExecutorId`. type AcquiredLocks = Vec<Vec<RcLock>>; +/// A transaction bundled with its unique ID for tracking purposes. pub(super) struct TransactionWithId { pub(super) id: TransactionId, pub(super) txn: ProcessableTransaction, } impl TransactionWithId { + /// Creates a new transaction with a unique ID. 
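+ /// The ID is later used to resolve which executor a blocked transaction is waiting on.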
pub(super) fn new(txn: ProcessableTransaction) -> Self { Self { id: next_transaction_id(), txn, } } } /// Manages the state for all transaction executors, including their /// readiness, blocked transactions, and acquired account locks. pub(super) struct ExecutionCoordinator { - /// A queue for each executor to hold transactions that are waiting for locks. + /// A queue for each executor to hold transactions that are waiting for its locks. blocked_transactions: BlockedTransactionQueues, - transaction_queues: TransactionQueues, + /// A map tracking which executor is blocking which transaction. + transaction_contention: TransactionContention, /// A pool of executor IDs that are currently idle and ready for new work. ready_executors: Vec<ExecutorId>, /// A list of locks currently held by each executor. acquired_locks: AcquiredLocks, - /// The cache of all account locks. + /// The global cache of all account locks. locks: LocksCache, } @@ -54,7 +60,7 @@ impl ExecutionCoordinator { blocked_transactions: (0..count).map(|_| VecDeque::new()).collect(), acquired_locks: (0..count).map(|_| Vec::new()).collect(), ready_executors: (0..count as u32).collect(), - transaction_queues: TransactionQueues::default(), + transaction_contention: TransactionContention::default(), locks: LocksCache::default(), } } - /// Queues a transaction to be processed by a specific executor once its - /// required locks are available. + /// Queues a transaction that is blocked by a contended lock. + /// + /// The `blocker_id` can be either an `ExecutorId` or a `TransactionId`. + /// If it's a `TransactionId`, this function resolves it to the underlying + /// `ExecutorId` that holds the conflicting lock. pub(super) fn queue_transaction( &mut self, - mut blocker: u32, + mut blocker_id: u32, transaction: TransactionWithId, ) { - if blocker >= MAX_SVM_EXECUTORS { - // unwrap will never happen, as every pending transaction (which is a contender/blocker) will have an associated entry in the hashmap - blocker = self - .transaction_queues - .get(&blocker) - .copied() - .unwrap_or(ExecutorId::MIN); + // A `blocker_id` at or above `MAX_SVM_EXECUTORS` is a `TransactionId` + // of another waiting transaction. We must resolve it to the actual executor. + if blocker_id >= MAX_SVM_EXECUTORS { + // The fallback is never taken: a `TransactionId` is only returned as a + // blocker if that transaction is already tracked in the contention map. + blocker_id = self + .transaction_contention + .get(&blocker_id) + .copied() + .unwrap_or(ExecutorId::MIN); } - let queue = &mut self.blocked_transactions[blocker as usize]; - self.transaction_queues.insert(transaction.id, blocker); + + let queue = &mut self.blocked_transactions[blocker_id as usize]; + self.transaction_contention + .insert(transaction.id, blocker_id); queue.push_back(transaction); } /// Releases all account locks held by a specific executor. pub(crate) fn unlock_accounts(&mut self, executor: ExecutorId) { let locks = &mut self.acquired_locks[executor as usize]; + // Iteratively drain the list of acquired locks. while let Some(lock) = locks.pop() { lock.borrow_mut().unlock(executor); } } - /// Retrieves the next blocked transaction for a given executor. + /// Retrieves the next blocked transaction waiting for a given executor. pub(super) fn get_blocked_transaction( &mut self, executor: ExecutorId, ) -> Option<TransactionWithId> { self.blocked_transactions[executor as usize].pop_front() } /// Attempts to acquire all necessary read and write locks for a transaction. 
/// - /// If any lock is contended, this function will fail and return the ID of the - /// executor that holds the conflicting lock. + /// This function iterates through all accounts in the transaction's message and + /// attempts to acquire the appropriate lock for each. If any lock is contended, + /// it fails early and returns the ID of the blocking executor or transaction. pub(super) fn try_acquire_locks( &mut self, executor: ExecutorId, transaction: &TransactionWithId, - ) -> Result<(), ExecutorId> { + ) -> Result<(), u32> { let message = transaction.txn.transaction.message(); let accounts_to_lock = message.account_keys().iter().enumerate(); + let acquired_locks = &mut self.acquired_locks[executor as usize]; for (i, &acc) in accounts_to_lock { + // Get or create the lock for the account. let lock = self.locks.entry(acc).or_default().clone(); + + // Attempt to acquire a write or read lock. if message.is_writable(i) { lock.borrow_mut().write(executor, transaction.id)?; } else { lock.borrow_mut().read(executor, transaction.id)?; }; - self.acquired_locks[executor as usize].push(lock); + acquired_locks.push(lock); } + + // On success, the transaction is no longer blocked. self.transaction_contention.remove(&transaction.id); Ok(()) } } diff --git a/magicblock-processor/src/scheduler/locks.rs b/magicblock-processor/src/scheduler/locks.rs index e20accd60..4c73dd898 100644 --- a/magicblock-processor/src/scheduler/locks.rs +++ b/magicblock-processor/src/scheduler/locks.rs @@ -10,13 +10,13 @@ use solana_pubkey::Pubkey; // A bitmask representing the lock state. // - MSB: Write lock flag. -// - MSB-1: Contention flag. // - Remaining bits: Read locks for each executor. type ReadWriteLock = u64; /// Unique identifier for a transaction executor worker. pub(crate) type ExecutorId = u32; +/// Unique identifier for a transaction to be scheduled. pub(super) type TransactionId = u32; /// A shared, mutable reference to an `AccountLock`. pub(super) type RcLock = Rc<RefCell<AccountLock>>; /// In-memory cache of account locks. pub(super) type LocksCache = FxHashMap<Pubkey, RcLock>; -pub(super) type TransactionQueues = FxHashMap<TransactionId, ExecutorId>; +/// A map from a blocked transaction to the executor that holds the conflicting lock. +pub(super) type TransactionContention = FxHashMap<TransactionId, ExecutorId>; /// The maximum number of concurrent executors supported by the bitmask. -/// Two bits are reserved for the write and contention flags. +/// One bit is reserved for the write flag. pub(super) const MAX_SVM_EXECUTORS: u32 = ReadWriteLock::BITS - 1; -/// The bit used to indicate a write lock is held. +/// The bit used to indicate a write lock is held. This is the most significant bit. const WRITE_BIT_MASK: u64 = 1 << (ReadWriteLock::BITS - 1); /// A read/write lock on a single Solana account, represented by a `u64` bitmask. #[derive(Default, Debug)] pub(super) struct AccountLock { rw: ReadWriteLock, contender: TransactionId, } impl AccountLock { /// Attempts to acquire a write lock. Fails if any other lock is held. pub(super) fn write( &mut self, executor: ExecutorId, txn: TransactionId, - ) -> Result<(), ExecutorId> { + ) -> Result<(), u32> { self.contended(txn)?; if self.rw != 0 { self.contender = txn; + // If the lock is held, `trailing_zeros()` will return the index of the + // least significant bit that is set. This corresponds to the ID of the + // executor that holds the lock. return Err(self.rw.trailing_zeros()); } + // Set the write lock bit and the bit for the acquiring executor. self.rw = WRITE_BIT_MASK | (1 << executor); self.contender = 0; Ok(()) } - /// Attempts to acquire a read lock. Fails if a write lock is held or if the - /// lock is marked as contended. 
+ /// Attempts to acquire a read lock. Fails if a write lock is held. pub(super) fn read( &mut self, executor: ExecutorId, txn: TransactionId, - ) -> Result<(), ExecutorId> { + ) -> Result<(), u32> { self.contended(txn)?; + // Check if the write lock bit is set. if self.rw & WRITE_BIT_MASK != 0 { self.contender = txn; + // If a write lock is held, the conflicting executor is the one whose + // bit is set. We can find it using `trailing_zeros()`. return Err(self.rw.trailing_zeros()); } + // Set the bit corresponding to the executor to acquire a read lock. self.rw |= 1 << executor; self.contender = 0; Ok(()) @@ -76,11 +84,12 @@ impl AccountLock { /// Releases a lock held by an executor. pub(super) fn unlock(&mut self, executor: ExecutorId) { - // Clear the executor's read bit and the global write bit + // To release the lock, we clear both the write bit and the executor's + // read bit. This is done using a bitwise AND with the inverted mask. self.rw &= !(WRITE_BIT_MASK | (1 << executor)); } - /// Checks if the lock is marked as contended. + /// Checks if the lock is marked as contended by another transaction. fn contended(&self, txn: TransactionId) -> Result<(), TransactionId> { if self.contender != 0 && self.contender != txn { return Err(self.contender); @@ -89,9 +98,12 @@ impl AccountLock { } } +/// Generates a new, unique transaction ID. pub(super) fn next_transaction_id() -> TransactionId { static mut COUNTER: u32 = MAX_SVM_EXECUTORS; - // # SAFETY: This is safe because the scheduler operates in a single thread. + // SAFETY: This is safe because the scheduler, which calls this function, + // operates in a single, dedicated thread. Therefore, there are no concurrent + // access concerns for this static mutable variable. unsafe { COUNTER = COUNTER.wrapping_add(1).max(MAX_SVM_EXECUTORS); COUNTER From 895887c1236222cbe26a4f708e3807fef7d7347e Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Wed, 1 Oct 2025 11:29:03 +0400 Subject: [PATCH 6/9] fix: correct contention resolution --- Cargo.lock | 1 + magicblock-processor/Cargo.toml | 1 + magicblock-processor/src/scheduler.rs | 3 ++- .../src/scheduler/coordinator.rs | 17 ++++++++++++----- magicblock-processor/src/scheduler/locks.rs | 15 +++++++++++++-- 5 files changed, 29 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ef5abb561..f19783b19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4040,6 +4040,7 @@ dependencies = [ "solana-feature-set", "solana-fee", "solana-fee-structure", + "solana-keypair", "solana-loader-v4-program", "solana-program", "solana-program-runtime", diff --git a/magicblock-processor/Cargo.toml b/magicblock-processor/Cargo.toml index 3fbb12c84..21c89339c 100644 --- a/magicblock-processor/Cargo.toml +++ b/magicblock-processor/Cargo.toml @@ -42,6 +42,7 @@ rustc-hash = { workspace = true } [dev-dependencies] guinea = { workspace = true } +solana-keypair = { workspace = true } solana-signature = { workspace = true } solana-signer = { workspace = true } test-kit = { workspace = true } diff --git a/magicblock-processor/src/scheduler.rs b/magicblock-processor/src/scheduler.rs index ae34afe3e..f50fb1346 100644 --- a/magicblock-processor/src/scheduler.rs +++ b/magicblock-processor/src/scheduler.rs @@ -197,7 +197,6 @@ impl TransactionScheduler { ) { if let Err(blocker) = self.coordinator.try_acquire_locks(executor, &txn) { - self.coordinator.unlock_accounts(executor); self.coordinator.release_executor(executor); self.coordinator.queue_transaction(blocker, txn); return; @@ -211,6 +210,8 @@ impl 
TransactionScheduler { pub mod coordinator; pub mod locks; pub mod state; +#[cfg(test)] +mod tests; // SAFETY: // Rc used within the scheduler never escapes to other threads unsafe impl Send for TransactionScheduler {} diff --git a/magicblock-processor/src/scheduler/coordinator.rs b/magicblock-processor/src/scheduler/coordinator.rs index bdc5e8ed9..cc51953e9 100644 --- a/magicblock-processor/src/scheduler/coordinator.rs +++ b/magicblock-processor/src/scheduler/coordinator.rs @@ -144,16 +144,24 @@ impl ExecutionCoordinator { let lock = self.locks.entry(acc).or_default().clone(); // Attempt to acquire a write or read lock. - if message.is_writable(i) { - lock.borrow_mut().write(executor, transaction.id)?; + let result = if message.is_writable(i) { + lock.borrow_mut().write(executor, transaction.id) } else { - lock.borrow_mut().read(executor, transaction.id)?; + lock.borrow_mut().read(executor, transaction.id) }; - acquired_locks.push(lock); + + if result.is_err() { + for lock in acquired_locks.drain(..) { + let mut lock = lock.borrow_mut(); + lock.unlock_with_contention(executor, transaction.id); + } + } + result?; + acquired_locks.push(lock); } - // On success, the transaction is no longer blocked. + // On success, the transaction is no longer blocking anything. self.transaction_contention.remove(&transaction.id); Ok(()) } diff --git a/magicblock-processor/src/scheduler/locks.rs b/magicblock-processor/src/scheduler/locks.rs index 4c73dd898..6acdcb4ac 100644 --- a/magicblock-processor/src/scheduler/locks.rs +++ b/magicblock-processor/src/scheduler/locks.rs @@ -49,8 +49,7 @@ impl AccountLock { txn: TransactionId, ) -> Result<(), u32> { self.contended(txn)?; if self.rw != 0 { - self.contender = txn; // If the lock is held, `trailing_zeros()` will return the index of the // least significant bit that is set. This corresponds to the ID of the // executor that holds the lock. return Err(self.rw.trailing_zeros()); } @@ -71,7 +70,6 @@ impl AccountLock { self.contended(txn)?; // Check if the write lock bit is set. if self.rw & WRITE_BIT_MASK != 0 { - self.contender = txn; // If a write lock is held, the conflicting executor is the one whose // bit is set. We can find it using `trailing_zeros()`. return Err(self.rw.trailing_zeros()); } @@ -89,6 +88,18 @@ impl AccountLock { self.rw &= !(WRITE_BIT_MASK | (1 << executor)); } + /// Releases a lock held by an executor, marking it as contended by the + /// given transaction so the blocked transaction gets priority on retry. + pub(super) fn unlock_with_contention( + &mut self, + executor: ExecutorId, + txn: TransactionId, + ) { + if self.contender == 0 { + self.contender = txn; + } + self.unlock(executor); + } /// Checks if the lock is marked as contended by another transaction. 
fn contended(&self, txn: TransactionId) -> Result<(), TransactionId> { if self.contender != 0 && self.contender != txn { return Err(self.contender); } From 74bc3d7ac8d0c191d71d52f05ab7453a2c5c9ad6 Mon Sep 17 00:00:00 2001 From: Babur Makhmudov Date: Wed, 1 Oct 2025 13:09:24 +0400 Subject: [PATCH 7/9] chore: stage tests.rs file --- magicblock-processor/src/scheduler/tests.rs | 468 ++++++++++++++++++++ 1 file changed, 468 insertions(+) create mode 100644 magicblock-processor/src/scheduler/tests.rs diff --git a/magicblock-processor/src/scheduler/tests.rs b/magicblock-processor/src/scheduler/tests.rs new file mode 100644 index 000000000..19ec9b501 --- /dev/null +++ b/magicblock-processor/src/scheduler/tests.rs @@ -0,0 +1,468 @@ +use super::coordinator::{ExecutionCoordinator, TransactionWithId}; + +use magicblock_core::link::transactions::{ + ProcessableTransaction, SanitizeableTransaction, TransactionProcessingMode, +}; +use solana_keypair::Keypair; +use solana_program::{ + hash::Hash, + instruction::{AccountMeta, Instruction}, +}; +use solana_pubkey::Pubkey; +use solana_signer::Signer; +use solana_transaction::Transaction; + +// --- Test Setup --- + +/// Creates a mock transaction with the specified accounts for testing. +fn create_mock_transaction( + accounts: &[(Pubkey, bool)], // A tuple of (PublicKey, is_writable) +) -> TransactionWithId { + let payer = Keypair::new(); + let instructions: Vec<Instruction> = accounts + .iter() + .map(|(pubkey, is_writable)| { + let meta = if *is_writable { + AccountMeta::new(*pubkey, false) + } else { + AccountMeta::new_readonly(*pubkey, false) + }; + Instruction::new_with_bincode(Pubkey::new_unique(), &(), vec![meta]) + }) + .collect(); + + let transaction = Transaction::new_signed_with_payer( + &instructions, + Some(&payer.pubkey()), + &[payer], + Hash::new_unique(), + ); + + let processable_txn = ProcessableTransaction { + transaction: transaction.sanitize(false).unwrap(), + mode: TransactionProcessingMode::Execution(None), + }; + TransactionWithId::new(processable_txn) +} + +// --- Basic Tests --- + +#[test] +/// Tests that two transactions with no overlapping accounts can be scheduled concurrently. +fn test_non_conflicting_transactions() { + let mut coordinator = ExecutionCoordinator::new(2); + + // Two transactions writing to different accounts + let txn1 = create_mock_transaction(&[(Pubkey::new_unique(), true)]); + let txn2 = create_mock_transaction(&[(Pubkey::new_unique(), true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + + // Both transactions should acquire locks without any issues. + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire lock without conflict" + ); + assert!( + coordinator.try_acquire_locks(exec2, &txn2).is_ok(), + "Txn2 should acquire lock without conflict" + ); +} + +#[test] +/// Tests that multiple transactions can take read locks on the same account concurrently. +fn test_read_read_no_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, false)]); + let txn2 = create_mock_transaction(&[(shared_account, false)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + + // Both transactions should be able to acquire read locks on the same account. 
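+ // Read locks only set per-executor bits; only a write lock is exclusive.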
+ assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire read lock" + ); + assert!( + coordinator.try_acquire_locks(exec2, &txn2).is_ok(), + "Txn2 should also acquire read lock" + ); +} + +// --- Contention Tests --- + +#[test] +/// Tests that a write lock blocks another write lock on the same account. +fn test_write_write_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire write lock" + ); + + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + + // Txn2 should be blocked by the executor holding the lock (exec1). + assert_eq!(blocker, exec1, "Txn2 should be blocked by executor 1"); +} + +#[test] +/// Tests that a write lock blocks a read lock on the same account. +fn test_write_read_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, false)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire write lock" + ); + + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + + // Txn2 should be blocked by exec1. + assert_eq!( + blocker, exec1, + "Read lock (Txn2) should be blocked by write lock (Txn1)" + ); +} + +#[test] +/// Tests that a read lock blocks a write lock on the same account. +fn test_read_write_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, false)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire read lock" + ); + + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + + // Txn2 should be blocked by exec1. + assert_eq!( + blocker, exec1, + "Write lock (Txn2) should be blocked by read lock (Txn1)" + ); +} + +// --- Advanced Scenarios --- + +#[test] +/// Tests contention with a mix of read and write locks across multiple accounts. +fn test_multiple_mixed_locks_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + let acc_c = Pubkey::new_unique(); + + // Txn 1: Writes A, Reads B + let txn1 = create_mock_transaction(&[(acc_a, true), (acc_b, false)]); + // Txn 2: Reads A, Writes C + let txn2 = create_mock_transaction(&[(acc_a, false), (acc_c, true)]); + // Txn 3: Writes B, Writes C + let txn3 = create_mock_transaction(&[(acc_b, true), (acc_c, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should lock A (write) and B (read)" + ); + + let exec2 = coordinator.get_ready_executor().unwrap(); + // Txn2 should be blocked by Txn1's write lock on A. 
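+    // (The error value is the ID of the executor holding the conflicting lock.)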
+ assert_eq!( + coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(), + exec1, + "Txn2 should be blocked by Txn1 on account A" + ); + + // Txn3 should be blocked by Txn1's read lock on B. + assert_eq!( + coordinator.try_acquire_locks(exec2, &txn3).unwrap_err(), + exec1, + "Txn3 should be blocked by Txn1 on account B" + ); +} + +#[test] +/// Tests a chain of dependencies: Txn3 waits for Txn2, which waits for Txn1. +fn test_transaction_dependency_chain() { + let mut coordinator = ExecutionCoordinator::new(3); + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(acc_a, true)]); + let txn2 = create_mock_transaction(&[(acc_b, true), (acc_a, false)]); + let txn3 = create_mock_transaction(&[(acc_b, false)]); + + // Schedule Txn1, which locks A for writing. + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + + // Txn2 needs to read A, so it's blocked by Txn1. + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker1 = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + assert_eq!(blocker1, exec1, "Txn2 should be blocked by exec1"); + coordinator.queue_transaction(blocker1, txn2); + + // Txn3 needs to read B, but Txn2 (which writes to B) is already queued. + // So, Txn3 should be blocked by Txn2's transaction ID. + let exec3 = coordinator.get_ready_executor().unwrap(); + let blocker2 = coordinator.try_acquire_locks(exec3, &txn3).unwrap_err(); + let blocked_txn = coordinator.get_blocked_transaction(exec1).unwrap(); + assert_eq!( + blocker2, blocked_txn.id, + "Txn3 should be blocked by the transaction ID of Txn2" + ); +} + +#[test] +/// Simulates a scenario where all executors are busy, and a new transaction gets queued and then rescheduled. +fn test_full_executor_pool_and_reschedule() { + let mut coordinator = ExecutionCoordinator::new(2); + + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + let acc_c = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(acc_a, true)]); + let txn2 = create_mock_transaction(&[(acc_b, true)]); + let txn3 = create_mock_transaction(&[(acc_a, true), (acc_c, true)]); + + // Occupy both available executors. + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + assert!(coordinator.try_acquire_locks(exec2, &txn2).is_ok()); + + // No more ready executors should be available. + assert!( + coordinator.get_ready_executor().is_none(), + "Executor pool should be empty" + ); + + // Txn3 arrives and contends with Txn1 on account A. + let blocker = coordinator.try_acquire_locks(exec1, &txn3).unwrap_err(); + assert_eq!(blocker, exec1); + coordinator.queue_transaction(blocker, txn3); + + // Executor 1 finishes its work and releases its locks. + coordinator.unlock_accounts(exec1); + coordinator.release_executor(exec1); + + // Now that an executor is free, we should be able to reschedule the blocked transaction. + let ready_exec = coordinator.get_ready_executor().unwrap(); + let blocked_txn = coordinator.get_blocked_transaction(exec1).unwrap(); + assert!( + coordinator + .try_acquire_locks(ready_exec, &blocked_txn) + .is_ok(), + "Should be able to reschedule the blocked transaction" + ); +} + +// --- Edge Cases --- + +#[test] +/// Tests that a transaction with no accounts can be processed without issues. 
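+/// No locks are needed, so acquisition succeeds trivially.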
+fn test_transaction_with_no_accounts() { + let mut coordinator = ExecutionCoordinator::new(1); + let txn = create_mock_transaction(&[]); + let exec = coordinator.get_ready_executor().unwrap(); + + assert!( + coordinator.try_acquire_locks(exec, &txn).is_ok(), + "Transaction with no accounts should not fail" + ); +} + +#[test] +/// Tests that many read locks can be acquired on the same account concurrently. +fn test_multiple_read_locks_on_same_account() { + let mut coordinator = ExecutionCoordinator::new(3); + let shared_account = Pubkey::new_unique(); + let txn1 = create_mock_transaction(&[(shared_account, false)]); + let txn2 = create_mock_transaction(&[(shared_account, false)]); + let txn3 = create_mock_transaction(&[(shared_account, false)]); + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + let exec3 = coordinator.get_ready_executor().unwrap(); + + // All three should acquire read locks without contention. + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + assert!(coordinator.try_acquire_locks(exec2, &txn2).is_ok()); + assert!(coordinator.try_acquire_locks(exec3, &txn3).is_ok()); +} + +#[test] +/// Tests a rapid lock-unlock-lock cycle to ensure state is managed correctly. +fn test_rapid_lock_unlock_cycle() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + + // Lock, unlock, and then lock again with a different transaction. + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + coordinator.unlock_accounts(exec1); + coordinator.release_executor(exec1); + + let exec2 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec2, &txn2).is_ok(), + "Should be able to lock the account again after it was released" + ); +} + +#[test] +/// Tests rescheduling multiple transactions that were all blocked by the same executor. +fn test_reschedule_multiple_blocked_on_same_executor() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + let txn3 = create_mock_transaction(&[(shared_account, true)]); + + // Txn1 takes the lock. Txn2 and Txn3 are queued as blocked. + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker1 = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + coordinator.queue_transaction(blocker1, txn2); + let blocker2 = coordinator.try_acquire_locks(exec2, &txn3).unwrap_err(); + coordinator.queue_transaction(blocker2, txn3); + + // Txn1 finishes. + coordinator.unlock_accounts(exec1); + coordinator.release_executor(exec1); + + // The first blocked transaction (Txn2) should now be schedulable. + let ready_exec = coordinator.get_ready_executor().unwrap(); + let blocked_txn1 = coordinator.get_blocked_transaction(exec1).unwrap(); + let result = coordinator.try_acquire_locks(ready_exec, &blocked_txn1); + println!("R: {result:?}"); + assert!( + result.is_ok(), + "First blocked transaction should be reschedulable" + ); + + // The second blocked transaction (Txn3) should still be in the queue. 
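+    // (Each call to get_blocked_transaction pops at most one transaction
+    // from the executor's queue.)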
+ assert!( + coordinator.get_blocked_transaction(exec1).is_some(), + "Second blocked transaction should still be queued" + ); +} + +#[test] +/// Tests a transaction that contends on multiple accounts held by different executors. +fn test_contention_on_multiple_accounts() { + let mut coordinator = ExecutionCoordinator::new(3); + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(acc_a, true)]); + let txn2 = create_mock_transaction(&[(acc_b, true)]); + // This transaction will contend with both Txn1 and Txn2. + let txn3 = create_mock_transaction(&[(acc_a, true), (acc_b, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + assert!(coordinator.try_acquire_locks(exec2, &txn2).is_ok()); + + let exec3 = coordinator.get_ready_executor().unwrap(); + // The coordinator should report the first detected contention. + let blocker = coordinator.try_acquire_locks(exec3, &txn3).unwrap_err(); + assert_eq!( + blocker, exec1, + "Should be blocked by the first contended account (A)" + ); +} + +#[test] +/// Tests that no ready executors are available when the pool is fully utilized. +fn test_no_ready_executors() { + let mut coordinator = ExecutionCoordinator::new(1); + let txn1 = create_mock_transaction(&[(Pubkey::new_unique(), true)]); + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + + // The only executor is now busy. + assert!( + coordinator.get_ready_executor().is_none(), + "There should be no ready executors" + ); +} + +#[test] +/// Tests that an executor can release locks and immediately reacquire new ones. +fn test_release_and_reacquire_lock() { + let mut coordinator = ExecutionCoordinator::new(1); + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + let txn1 = create_mock_transaction(&[(acc_a, true)]); + let txn2 = create_mock_transaction(&[(acc_b, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + coordinator.unlock_accounts(exec1); + + // The executor should be able to immediately acquire a lock on a different account. + assert!( + coordinator.try_acquire_locks(exec1, &txn2).is_ok(), + "Executor should be able to reacquire a lock after releasing" + ); +} + +#[test] +/// Tests a scenario where a transaction is blocked by another transaction that is itself already queued. +fn test_transaction_blocked_by_queued_transaction() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + let txn3 = create_mock_transaction(&[(shared_account, true)]); + + // Txn1 acquires the lock. + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + + // Txn2 is blocked by Txn1. + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker1 = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + assert_eq!(blocker1, exec1); + coordinator.queue_transaction(blocker1, txn2); + + // Txn3 is blocked by the already queued Txn2. The error should be the transaction ID. 
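+    // (Transaction IDs start above MAX_SVM_EXECUTORS, so they never collide
+    // with executor IDs in the shared u32 error value.)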
+    let blocker2 = coordinator.try_acquire_locks(exec2, &txn3).unwrap_err();
+    let blocked_txn = coordinator.get_blocked_transaction(exec1).unwrap();
+    assert_eq!(
+        blocker2, blocked_txn.id,
+        "Txn3 should be blocked by the ID of the queued Txn2"
+    );
+}

From 61226eecd3a4f39e9bace87cc62d2979e5aaffc2 Mon Sep 17 00:00:00 2001
From: Babur Makhmudov
Date: Wed, 1 Oct 2025 16:20:51 +0400
Subject: [PATCH 8/9] fix: divide the CPU resources between RPC and execution

---
 Cargo.lock                                  |  2 ++
 magicblock-api/Cargo.toml                   |  2 ++
 magicblock-api/src/magic_validator.rs       | 10 ++++++++--
 .../src/scheduler/coordinator.rs            |  2 ++
 .../src/{scheduler.rs => scheduler/mod.rs}  |  0
 magicblock-validator/Cargo.toml             |  1 +
 magicblock-validator/src/main.rs            | 18 ++++++++++++++++--
 7 files changed, 31 insertions(+), 4 deletions(-)
 rename magicblock-processor/src/{scheduler.rs => scheduler/mod.rs} (100%)

diff --git a/Cargo.lock b/Cargo.lock
index f19783b19..fed98bf54 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3769,6 +3769,7 @@ dependencies = [
 "magicblock-processor",
 "magicblock-program",
 "magicblock-validator-admin",
+ "num_cpus",
 "paste",
 "solana-feature-set",
 "solana-inline-spl",
@@ -4121,6 +4122,7 @@ dependencies = [
 "magicblock-api",
 "magicblock-config",
 "magicblock-version",
+ "num_cpus",
 "solana-sdk",
 "tokio",
 ]
diff --git a/magicblock-api/Cargo.toml b/magicblock-api/Cargo.toml
index eedf2879a..c372bc911 100644
--- a/magicblock-api/Cargo.toml
+++ b/magicblock-api/Cargo.toml
@@ -35,6 +35,8 @@ magicblock-program = { workspace = true }
 magicblock-validator-admin = { workspace = true }
 magic-domain-program = { workspace = true }
 
+num_cpus = { workspace = true }
+
 solana-feature-set = { workspace = true }
 solana-inline-spl = { workspace = true }
 solana-rpc = { workspace = true }
diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs
index 6912c0789..ba2184f51 100644
--- a/magicblock-api/src/magic_validator.rs
+++ b/magicblock-api/src/magic_validator.rs
@@ -303,8 +303,14 @@ impl MagicValidator {
             base_fee: config.validator.base_fees.unwrap_or_default(),
             featureset: txn_scheduler_state.environment.feature_set.clone(),
         };
-        let transaction_scheduler =
-            TransactionScheduler::new(1, txn_scheduler_state);
+        // We dedicate half of the available CPU cores to the execution
+        // runtime; one of them is taken up by the transaction scheduler itself
+        let transaction_executors =
+            (num_cpus::get() / 2).saturating_sub(1).max(1) as u32;
+        let transaction_scheduler = TransactionScheduler::new(
+            transaction_executors,
+            txn_scheduler_state,
+        );
         transaction_scheduler.spawn();
 
         let shared_state = SharedState::new(
diff --git a/magicblock-processor/src/scheduler/coordinator.rs b/magicblock-processor/src/scheduler/coordinator.rs
index cc51953e9..cf3ae75e6 100644
--- a/magicblock-processor/src/scheduler/coordinator.rs
+++ b/magicblock-processor/src/scheduler/coordinator.rs
@@ -151,6 +151,8 @@ impl ExecutionCoordinator {
             };
             acquired_locks.push(lock);
 
+            // We couldn't lock all of the accounts, so we bail out, but first
+            // we record the contention and release the locks acquired so far
             if result.is_err() {
                 for lock in acquired_locks.drain(..) {
                     let mut lock = lock.borrow_mut();
diff --git a/magicblock-processor/src/scheduler.rs b/magicblock-processor/src/scheduler/mod.rs
similarity index 100%
rename from magicblock-processor/src/scheduler.rs
rename to magicblock-processor/src/scheduler/mod.rs
diff --git a/magicblock-validator/Cargo.toml b/magicblock-validator/Cargo.toml
index 4820ca4b4..9ac23d13a 100644
--- a/magicblock-validator/Cargo.toml
+++ b/magicblock-validator/Cargo.toml
@@ -16,6 +16,7 @@ log = { workspace = true }
 magicblock-api = { workspace = true }
 magicblock-config = { workspace = true }
 magicblock-version = { workspace = true }
+num_cpus = { workspace = true }
 solana-sdk = { workspace = true }
 tokio = { workspace = true, features = ["rt-multi-thread"] }
 
diff --git a/magicblock-validator/src/main.rs b/magicblock-validator/src/main.rs
index 2496bff0c..f7f0b9d70 100644
--- a/magicblock-validator/src/main.rs
+++ b/magicblock-validator/src/main.rs
@@ -7,6 +7,7 @@ use magicblock_api::{
 };
 use magicblock_config::MagicBlockConfig;
 use solana_sdk::signature::Signer;
+use tokio::runtime::Builder;
 
 use crate::shutdown::Shutdown;
 
@@ -49,8 +50,21 @@ fn init_logger() {
     });
 }
 
-#[tokio::main]
-async fn main() {
+fn main() {
+    // We dedicate half of the threads to the async runtime (where RPC and
+    // other IO/timer-bound services run), and the other half is allocated
+    // to the execution runtime (transaction scheduler/executor threads)
+    let workers = (num_cpus::get() / 2).max(1);
+    let runtime = Builder::new_multi_thread()
+        .worker_threads(workers)
+        .enable_all()
+        .thread_name("async-runtime")
+        .build()
+        .expect("failed to build async runtime");
+    runtime.block_on(run());
+}
+
+async fn run() {
     init_logger();
     #[cfg(feature = "tokio-console")]
     console_subscriber::init();

From 082d943701e7a2168990076069a177f9c0cd41db Mon Sep 17 00:00:00 2001
From: Babur Makhmudov
Date: Thu, 2 Oct 2025 13:53:47 +0400
Subject: [PATCH 9/9] cleanup: remove stray prints and improve comments

---
 .../src/scheduler/coordinator.rs            | 16 +++++----
 magicblock-processor/src/scheduler/locks.rs | 28 +++++++++----------
 magicblock-processor/src/scheduler/mod.rs   |  5 ++--
 magicblock-processor/src/scheduler/tests.rs |  1 -
 4 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/magicblock-processor/src/scheduler/coordinator.rs b/magicblock-processor/src/scheduler/coordinator.rs
index cf3ae75e6..5309d07cf 100644
--- a/magicblock-processor/src/scheduler/coordinator.rs
+++ b/magicblock-processor/src/scheduler/coordinator.rs
@@ -13,8 +13,6 @@ use super::locks::{
     TransactionId, MAX_SVM_EXECUTORS,
 };
 
-// --- Type Aliases ---
-
 /// A queue of transactions waiting for a specific executor to release a lock.
 type TransactionQueue = VecDeque<TransactionWithId>;
 /// A list of transaction queues, indexed by `ExecutorId`. Each executor has its own queue.
@@ -78,12 +76,14 @@ impl ExecutionCoordinator {
         // A `blocker_id` at or above `MAX_SVM_EXECUTORS` is a `TransactionId`
         // of another waiting transaction. We must resolve it to the actual executor.
         if blocker_id >= MAX_SVM_EXECUTORS {
-            // SAFETY: This unwrap is safe. A `TransactionId` is only returned as a
-            // blocker if that transaction is already tracked in the contention map.
+            // A `TransactionId` is only returned as a blocker if that
+            // transaction is already tracked in the contention map.
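+            // Resolving it yields the executor whose queue that transaction occupies.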
             blocker_id = self
                 .transaction_contention
                 .get(&blocker_id)
                 .copied()
+                // This should never happen, but even if it does, falling
+                // back to the default executor is not a logical error
                 .unwrap_or(ExecutorId::MIN);
         }
 
@@ -149,17 +149,21 @@ impl ExecutionCoordinator {
             } else {
                 lock.borrow_mut().read(executor, transaction.id)
             };
-            acquired_locks.push(lock);
 
             // We couldn't lock all of the accounts, so we bail out, but first
             // we record the contention and release the locks acquired so far
             if result.is_err() {
                 for lock in acquired_locks.drain(..) {
                     let mut lock = lock.borrow_mut();
-                    lock.unlock_with_contention(executor, transaction.id);
+                    lock.contend(transaction.id);
+                    lock.unlock(executor);
                 }
+                // For the lock that we failed to acquire, we only set the contention
+                lock.borrow_mut().contend(transaction.id);
             }
             result?;
+
+            acquired_locks.push(lock);
diff --git a/magicblock-processor/src/scheduler/locks.rs b/magicblock-processor/src/scheduler/locks.rs
index 6acdcb4ac..dbf5c9c25 100644
--- a/magicblock-processor/src/scheduler/locks.rs
+++ b/magicblock-processor/src/scheduler/locks.rs
@@ -43,13 +43,13 @@ impl AccountLock {
     /// Attempts to acquire a write lock. Fails if any other lock is held.
+    #[inline]
     pub(super) fn write(
         &mut self,
         executor: ExecutorId,
         txn: TransactionId,
     ) -> Result<(), u32> {
         self.contended(txn)?;
-        println!("rw: {:b} -> {}", self.rw, self.contender);
         if self.rw != 0 {
             // If the lock is held, `trailing_zeros()` will return the index of the
             // least significant bit that is set. This corresponds to the ID of the
@@ -63,6 +63,7 @@ impl AccountLock {
     }
 
     /// Attempts to acquire a read lock. Fails if a write lock is held.
+    #[inline]
     pub(super) fn read(
         &mut self,
         executor: ExecutorId,
@@ -82,31 +83,28 @@ impl AccountLock {
     }
 
     /// Releases a lock held by an executor.
+    #[inline]
     pub(super) fn unlock(&mut self, executor: ExecutorId) {
         // To release the lock, we clear both the write bit and the executor's
         // read bit. This is done using a bitwise AND with the inverted mask.
         self.rw &= !(WRITE_BIT_MASK | (1 << executor));
     }
 
-    /// Releases a lock, marking it as contended if it isn't already.
-    pub(super) fn unlock_with_contention(
-        &mut self,
-        executor: ExecutorId,
-        txn: TransactionId,
-    ) {
-        if self.contender == 0 {
-            self.contender = txn;
-        }
-        self.unlock(executor);
-    }
-
     /// Checks if the lock is marked as contended by another transaction.
     /// The error carries the contending transaction's ID.
+    #[inline]
     fn contended(&self, txn: TransactionId) -> Result<(), TransactionId> {
         if self.contender != 0 && self.contender != txn {
             return Err(self.contender);
         }
         Ok(())
     }
+
+    #[inline]
+    pub(super) fn contend(&mut self, txn: TransactionId) {
+        if self.contender == 0 {
+            self.contender = txn;
+        }
+    }
 }
 
 /// Generates a new, unique transaction ID.
@@ -114,7 +112,9 @@ pub(super) fn next_transaction_id() -> TransactionId {
     static mut COUNTER: u32 = MAX_SVM_EXECUTORS;
     // SAFETY: This is safe because the scheduler, which calls this function,
     // operates in a single, dedicated thread. Therefore, there are no concurrent
-    // access concerns for this static mutable variable.
+    // access concerns for this static mutable variable. The `u32` range is
+    // large enough to guarantee that no two pending transactions share the
+    // same ID.
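+    // Starting the counter at MAX_SVM_EXECUTORS keeps transaction IDs
+    // disjoint from executor IDs, so both can share the same error channel.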
     unsafe {
         COUNTER = COUNTER.wrapping_add(1).max(MAX_SVM_EXECUTORS);
         COUNTER
diff --git a/magicblock-processor/src/scheduler/mod.rs b/magicblock-processor/src/scheduler/mod.rs
index f50fb1346..1bda2734d 100644
--- a/magicblock-processor/src/scheduler/mod.rs
+++ b/magicblock-processor/src/scheduler/mod.rs
@@ -152,12 +152,13 @@ impl TransactionScheduler {
 
     /// Handles a new transaction from the global queue.
     async fn handle_new_transaction(&mut self, txn: ProcessableTransaction) {
+        // SAFETY:
         // This unwrap is safe due to the `if self.coordinator.is_ready()`
-        // guard in the `select!` macro.
+        // guard in the `select!` macro, which calls this method.
        let executor = self
             .coordinator
             .get_ready_executor()
-            .unwrap_or(ExecutorId::MIN);
+            .expect("unreachable code if there are no ready executors");
         let txn = TransactionWithId::new(txn);
         self.schedule_transaction(executor, txn).await;
     }
diff --git a/magicblock-processor/src/scheduler/tests.rs b/magicblock-processor/src/scheduler/tests.rs
index 19ec9b501..7a1c2ba79 100644
--- a/magicblock-processor/src/scheduler/tests.rs
+++ b/magicblock-processor/src/scheduler/tests.rs
@@ -364,7 +364,6 @@ fn test_reschedule_multiple_blocked_on_same_executor() {
     let ready_exec = coordinator.get_ready_executor().unwrap();
     let blocked_txn1 = coordinator.get_blocked_transaction(exec1).unwrap();
     let result = coordinator.try_acquire_locks(ready_exec, &blocked_txn1);
-    println!("R: {result:?}");
     assert!(
         result.is_ok(),
         "First blocked transaction should be reschedulable"