diff --git a/Cargo.lock b/Cargo.lock index b78e2b8e3..fed98bf54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3769,6 +3769,7 @@ dependencies = [ "magicblock-processor", "magicblock-program", "magicblock-validator-admin", + "num_cpus", "paste", "solana-feature-set", "solana-inline-spl", @@ -4032,6 +4033,7 @@ dependencies = [ "magicblock-ledger", "magicblock-program", "parking_lot 0.12.4", + "rustc-hash 2.1.1", "solana-account", "solana-address-lookup-table-program", "solana-bpf-loader-program", @@ -4039,6 +4041,7 @@ dependencies = [ "solana-feature-set", "solana-fee", "solana-fee-structure", + "solana-keypair", "solana-loader-v4-program", "solana-program", "solana-program-runtime", @@ -4119,6 +4122,7 @@ dependencies = [ "magicblock-api", "magicblock-config", "magicblock-version", + "num_cpus", "solana-sdk", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index a7b8e132b..6326cc3e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,6 +149,7 @@ rand = "0.8.5" rayon = "1.10.0" rusqlite = { version = "0.34.0", features = ["bundled"] } # bundled sqlite 3.44 rustc_version = "0.4" +rustc-hash = "2.1" scc = "2.4" semver = "1.0.22" serde = "1.0.217" diff --git a/magicblock-api/Cargo.toml b/magicblock-api/Cargo.toml index eedf2879a..c372bc911 100644 --- a/magicblock-api/Cargo.toml +++ b/magicblock-api/Cargo.toml @@ -35,6 +35,8 @@ magicblock-program = { workspace = true } magicblock-validator-admin = { workspace = true } magic-domain-program = { workspace = true } +num_cpus = { workspace = true } + solana-feature-set = { workspace = true } solana-inline-spl = { workspace = true } solana-rpc = { workspace = true } diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index 6912c0789..ba2184f51 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -303,8 +303,14 @@ impl MagicValidator { base_fee: config.validator.base_fees.unwrap_or_default(), featureset: txn_scheduler_state.environment.feature_set.clone(), }; - let transaction_scheduler = - TransactionScheduler::new(1, txn_scheduler_state); + // We dedicate half of the available resources to the execution + // runtime, -1 is taken up by the transaction scheduler itself + let transaction_executors = + (num_cpus::get() / 2).saturating_sub(1).max(1) as u32; + let transaction_scheduler = TransactionScheduler::new( + transaction_executors, + txn_scheduler_state, + ); transaction_scheduler.spawn(); let shared_state = SharedState::new( diff --git a/magicblock-processor/Cargo.toml b/magicblock-processor/Cargo.toml index 490720242..21c89339c 100644 --- a/magicblock-processor/Cargo.toml +++ b/magicblock-processor/Cargo.toml @@ -38,8 +38,11 @@ solana-transaction = { workspace = true } solana-transaction-status = { workspace = true } solana-transaction-error = { workspace = true } +rustc-hash = { workspace = true } + [dev-dependencies] guinea = { workspace = true } +solana-keypair = { workspace = true } solana-signature = { workspace = true } solana-signer = { workspace = true } test-kit = { workspace = true } diff --git a/magicblock-processor/src/executor/mod.rs b/magicblock-processor/src/executor/mod.rs index 67f0deda5..44dd90e4d 100644 --- a/magicblock-processor/src/executor/mod.rs +++ b/magicblock-processor/src/executor/mod.rs @@ -20,7 +20,8 @@ use solana_svm::transaction_processor::{ use tokio::{runtime::Builder, sync::mpsc::Sender}; use crate::{ - builtins::BUILTINS, scheduler::state::TransactionSchedulerState, WorkerId, + builtins::BUILTINS, + scheduler::{locks::ExecutorId, 
state::TransactionSchedulerState}, }; /// A dedicated, single-threaded worker responsible for processing transactions using @@ -30,7 +31,7 @@ use crate::{ /// executors can be spawned to process transactions in parallel. pub(super) struct TransactionExecutor { /// A unique identifier for this worker instance. - id: WorkerId, + id: ExecutorId, /// A handle to the global accounts database for reading and writing account state. accountsdb: Arc, /// A handle to the global ledger for writing committed transaction history. @@ -50,7 +51,7 @@ pub(super) struct TransactionExecutor { /// A channel to send out account state updates after processing. accounts_tx: AccountUpdateTx, /// A back-channel to notify the `TransactionScheduler` that this worker is ready for more work. - ready_tx: Sender, + ready_tx: Sender, /// A read lock held during a slot's processing to synchronize with critical global /// operations like `AccountsDb` snapshots. sync: StWLock, @@ -65,10 +66,10 @@ impl TransactionExecutor { /// with a globally shared one. This allows updates made by one executor to be immediately /// visible to all others, preventing redundant program loads. pub(super) fn new( - id: WorkerId, + id: ExecutorId, state: &TransactionSchedulerState, rx: TransactionToProcessRx, - ready_tx: Sender, + ready_tx: Sender, index: Arc, programs_cache: Arc>>, ) -> Self { diff --git a/magicblock-processor/src/lib.rs b/magicblock-processor/src/lib.rs index 57736f81b..37e566dd2 100644 --- a/magicblock-processor/src/lib.rs +++ b/magicblock-processor/src/lib.rs @@ -11,8 +11,6 @@ use solana_program::feature; use solana_rent_collector::RentCollector; use solana_svm::transaction_processor::TransactionProcessingEnvironment; -type WorkerId = u8; - /// Initialize an SVM enviroment for transaction processing pub fn build_svm_env( accountsdb: &AccountsDb, diff --git a/magicblock-processor/src/scheduler.rs b/magicblock-processor/src/scheduler.rs deleted file mode 100644 index bf0f9cf73..000000000 --- a/magicblock-processor/src/scheduler.rs +++ /dev/null @@ -1,157 +0,0 @@ -use std::sync::{atomic::AtomicUsize, Arc, RwLock}; - -use log::info; -use magicblock_core::link::transactions::{ - ProcessableTransaction, TransactionToProcessRx, -}; -use magicblock_ledger::LatestBlock; -use solana_program_runtime::loaded_programs::ProgramCache; -use state::TransactionSchedulerState; -use tokio::{ - runtime::Builder, - sync::mpsc::{channel, Receiver, Sender}, -}; - -use crate::{ - executor::{SimpleForkGraph, TransactionExecutor}, - WorkerId, -}; - -/// The central transaction scheduler responsible for distributing work to a -/// pool of `TransactionExecutor` workers. -/// -/// This struct acts as the single entry point for all transactions entering the processing -/// pipeline. It receives transactions from a global queue and dispatches them to available -/// worker threads for execution or simulation. -pub struct TransactionScheduler { - /// The receiving end of the global queue for all new transactions. - transactions_rx: TransactionToProcessRx, - /// A channel that receives readiness notifications from workers, - /// indicating they are free to accept new work. - ready_rx: Receiver, - /// A list of sender channels, one for each `TransactionExecutor` worker. - executors: Vec>, - /// A handle to the globally shared cache for loaded BPF programs. - program_cache: Arc>>, - /// A handle to the globally shared state of the latest block. - latest_block: LatestBlock, - /// A shared atomic counter for ordering transactions within a single slot. 
- index: Arc, -} - -impl TransactionScheduler { - /// Creates and initializes a new `TransactionScheduler` and its associated pool of workers. - /// - /// This function performs the initial setup for the entire transaction processing pipeline: - /// 1. Prepares the shared program cache and ensures necessary sysvars are in the `AccountsDb`. - /// 2. Creates a pool of `TransactionExecutor` workers, each with its own dedicated channel. - /// 3. Spawns each worker in its own OS thread for maximum isolation and performance. - pub fn new(workers: u8, state: TransactionSchedulerState) -> Self { - let index = Arc::new(AtomicUsize::new(0)); - let mut executors = Vec::with_capacity(workers as usize); - - // Create the back-channel for workers to signal their readiness. - let (ready_tx, ready_rx) = channel(workers as usize); - // Perform one-time setup of the shared program cache and sysvars. - let program_cache = state.prepare_programs_cache(); - state.prepare_sysvars(); - - for id in 0..workers { - // Each executor has a channel capacity of 1, as it - // can only process one transaction at a time. - let (transactions_tx, transactions_rx) = channel(1); - let executor = TransactionExecutor::new( - id, - &state, - transactions_rx, - ready_tx.clone(), - index.clone(), - program_cache.clone(), - ); - executor.populate_builtins(); - executor.spawn(); - executors.push(transactions_tx); - } - Self { - transactions_rx: state.txn_to_process_rx, - ready_rx, - executors, - latest_block: state.ledger.latest_block().clone(), - program_cache, - index, - } - } - - /// Spawns the scheduler's main event loop into a new, dedicated OS thread. - /// - /// Similar to the executors, the scheduler runs in its own thread with a dedicated - /// single-threaded Tokio runtime for performance and to prevent it from interfering - /// with other application tasks. - pub fn spawn(self) { - let task = move || { - let runtime = Builder::new_current_thread() - .thread_name("transaction scheduler") - .build() - .expect( - "building single threaded tokio runtime should succeed", - ); - runtime.block_on(tokio::task::unconstrained(self.run())); - }; - std::thread::spawn(task); - } - - /// The main event loop of the transaction scheduler. - /// - /// This loop multiplexes between three primary events: - /// 1. Receiving a new transaction and dispatching it to an available worker. - /// 2. Receiving a readiness notification from a worker. - /// 3. Receiving a notification of a new block, triggering a slot transition. - async fn run(mut self) { - let mut block_produced = self.latest_block.subscribe(); - let mut ready = true; - loop { - tokio::select! { - biased; - // A worker has finished its task and is ready for more. - Some(_) = self.ready_rx.recv() => { - // TODO(bmuddha): - // This branch will be used by a multi-threaded scheduler - // with account-level locking to manage the pool of ready workers. - ready = true; - } - // Receive new transactions for scheduling. - Some(txn) = self.transactions_rx.recv(), if ready => { - // TODO(bmuddha): - // The current implementation sends to the first worker only. - // A future implementation with account-level locking will enable - // dispatching to any available worker. - let Some(tx) = self.executors.first() else { - continue; - }; - let _ = tx.send(txn).await; - ready = false; - } - // A new block has been produced. - _ = block_produced.recv() => { - self.transition_to_new_slot(); - } - // The main transaction channel has closed, indicating a system shutdown. 
- else => { - break - } - } - } - info!("transaction scheduler has terminated"); - } - - /// Updates the scheduler's state when a new slot begins. - fn transition_to_new_slot(&self) { - // Reset the intra-slot transaction index to zero. - self.index.store(0, std::sync::atomic::Ordering::Relaxed); - // Re-root the shared program cache to the new slot. - self.program_cache.write().unwrap().latest_root_slot = - self.latest_block.load().slot; - } -} - -pub mod state; diff --git a/magicblock-processor/src/scheduler/coordinator.rs b/magicblock-processor/src/scheduler/coordinator.rs new file mode 100644 index 000000000..5309d07cf --- /dev/null +++ b/magicblock-processor/src/scheduler/coordinator.rs @@ -0,0 +1,173 @@ +//! Manages the state of transaction processing across multiple executors. +//! +//! This module contains the `ExecutionCoordinator`, which tracks ready executors, +//! queues of blocked transactions, and the locks held by each worker. It acts +//! as the central state machine for the scheduling process. + +use std::collections::VecDeque; + +use magicblock_core::link::transactions::ProcessableTransaction; + +use super::locks::{ + next_transaction_id, ExecutorId, LocksCache, RcLock, TransactionContention, + TransactionId, MAX_SVM_EXECUTORS, +}; + +/// A queue of transactions waiting for a specific executor to release a lock. +type TransactionQueue = VecDeque; +/// A list of transaction queues, indexed by `ExecutorId`. Each executor has its own queue. +type BlockedTransactionQueues = Vec; +/// A list of all locks acquired by an executor, indexed by `ExecutorId`. +type AcquiredLocks = Vec>; + +/// A transaction bundled with its unique ID for tracking purposes. +pub(super) struct TransactionWithId { + pub(super) id: TransactionId, + pub(super) txn: ProcessableTransaction, +} + +impl TransactionWithId { + /// Creates a new transaction with a unique ID. + pub(super) fn new(txn: ProcessableTransaction) -> Self { + Self { + id: next_transaction_id(), + txn, + } + } +} + +/// Manages the state for all transaction executors, including their +/// readiness, blocked transactions, and acquired account locks. +pub(super) struct ExecutionCoordinator { + /// A queue for each executor to hold transactions that are waiting for its locks. + blocked_transactions: BlockedTransactionQueues, + /// A map tracking which executor is blocking which transaction. + transaction_contention: TransactionContention, + /// A pool of executor IDs that are currently idle and ready for new work. + ready_executors: Vec, + /// A list of locks currently held by each executor. + acquired_locks: AcquiredLocks, + /// The global cache of all account locks. + locks: LocksCache, +} + +impl ExecutionCoordinator { + /// Creates a new `ExecutionCoordinator` for a given number of executors. + pub(super) fn new(count: usize) -> Self { + Self { + blocked_transactions: (0..count).map(|_| VecDeque::new()).collect(), + acquired_locks: (0..count).map(|_| Vec::new()).collect(), + ready_executors: (0..count as u32).collect(), + transaction_contention: TransactionContention::default(), + locks: LocksCache::default(), + } + } + + /// Queues a transaction that is blocked by a contended lock. + /// + /// The `blocker_id` can be either an `ExecutorId` or a `TransactionId`. + /// If it's a `TransactionId`, this function resolves it to the underlying + /// `ExecutorId` that holds the conflicting lock. 
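+    /// Executor IDs are always below `MAX_SVM_EXECUTORS` (63 for the 64-bit lock mask),
+    /// while transaction IDs are handed out at or above it, so the two ID spaces never
+    /// collide. For example (illustrative numbers): a `blocker_id` of 70 is a
+    /// `TransactionId` and is resolved via the contention map to the executor whose
+    /// queue the new transaction should join.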
+ pub(super) fn queue_transaction( + &mut self, + mut blocker_id: u32, + transaction: TransactionWithId, + ) { + // A `blocker_id` greater than `MAX_SVM_EXECUTORS` is a `TransactionId` + // of another waiting transaction. We must resolve it to the actual executor. + if blocker_id >= MAX_SVM_EXECUTORS { + // A `TransactionId` is only returned as a blocker if that + // transaction is already tracked in the contention map. + blocker_id = self + .transaction_contention + .get(&blocker_id) + .copied() + // should never happen, but from a logical + // standpoint, it's not really an error + .unwrap_or(ExecutorId::MIN); + } + + let queue = &mut self.blocked_transactions[blocker_id as usize]; + self.transaction_contention + .insert(transaction.id, blocker_id); + queue.push_back(transaction); + } + + /// Checks if there are any executors ready to process a transaction. + pub(super) fn is_ready(&self) -> bool { + !self.ready_executors.is_empty() + } + + /// Retrieves the ID of a ready executor, if one is available. + pub(super) fn get_ready_executor(&mut self) -> Option { + self.ready_executors.pop() + } + + /// Returns an executor to the pool of ready executors. + pub(super) fn release_executor(&mut self, executor: ExecutorId) { + self.ready_executors.push(executor) + } + + /// Releases all account locks held by a specific executor. + pub(crate) fn unlock_accounts(&mut self, executor: ExecutorId) { + let locks = &mut self.acquired_locks[executor as usize]; + // Iteratively drain the list of acquired locks. + while let Some(lock) = locks.pop() { + lock.borrow_mut().unlock(executor); + } + } + + /// Retrieves the next blocked transaction waiting for a given executor. + pub(super) fn get_blocked_transaction( + &mut self, + executor: ExecutorId, + ) -> Option { + self.blocked_transactions[executor as usize].pop_front() + } + + /// Attempts to acquire all necessary read and write locks for a transaction. + /// + /// This function iterates through all accounts in the transaction's message and + /// attempts to acquire the appropriate lock for each. If any lock is contended, + /// it fails early and returns the ID of the blocking executor or transaction. + pub(super) fn try_acquire_locks( + &mut self, + executor: ExecutorId, + transaction: &TransactionWithId, + ) -> Result<(), u32> { + let message = transaction.txn.transaction.message(); + let accounts_to_lock = message.account_keys().iter().enumerate(); + let acquired_locks = &mut self.acquired_locks[executor as usize]; + + for (i, &acc) in accounts_to_lock { + // Get or create the lock for the account. + let lock = self.locks.entry(acc).or_default().clone(); + + // Attempt to acquire a write or read lock. + let result = if message.is_writable(i) { + lock.borrow_mut().write(executor, transaction.id) + } else { + lock.borrow_mut().read(executor, transaction.id) + }; + + // We couldn't lock all of the accounts, so we are bailing, but + // first we need to set contention, and unlock successful locks + if result.is_err() { + for lock in acquired_locks.drain(..) { + let mut lock = lock.borrow_mut(); + lock.contend(transaction.id); + lock.unlock(executor); + } + // for the lock that we failed to acquire, we just set the contention + lock.borrow_mut().contend(transaction.id); + } + result?; + + acquired_locks.push(lock); + } + + // On success, the transaction is no longer blocking anything. 
+ self.transaction_contention.remove(&transaction.id); + Ok(()) + } +} diff --git a/magicblock-processor/src/scheduler/locks.rs b/magicblock-processor/src/scheduler/locks.rs new file mode 100644 index 000000000..dbf5c9c25 --- /dev/null +++ b/magicblock-processor/src/scheduler/locks.rs @@ -0,0 +1,122 @@ +//! Fast, in-memory account locking primitives for the multi-threaded scheduler. +//! +//! This version uses a single `u64` bitmask to represent the entire lock state, +//! including read locks, write locks, and contention, for maximum efficiency. + +use std::{cell::RefCell, rc::Rc}; + +use rustc_hash::FxHashMap; +use solana_pubkey::Pubkey; + +// A bitmask representing the lock state. +// - MSB: Write lock flag. +// - Remaining bits: Read locks for each executor. +type ReadWriteLock = u64; + +/// Unique identifier for a transaction executor worker. +pub(crate) type ExecutorId = u32; + +/// Unique identifier for a transaction to be scheduled. +pub(super) type TransactionId = u32; + +/// A shared, mutable reference to an `AccountLock`. +pub(super) type RcLock = Rc>; + +/// In-memory cache of account locks. +pub(super) type LocksCache = FxHashMap; +/// A map from a blocked transaction to the executor that holds the conflicting lock. +pub(super) type TransactionContention = FxHashMap; + +/// The maximum number of concurrent executors supported by the bitmask. +/// One bit is reserved for the write flag. +pub(super) const MAX_SVM_EXECUTORS: u32 = ReadWriteLock::BITS - 1; + +/// The bit used to indicate a write lock is held. This is the most significant bit. +const WRITE_BIT_MASK: u64 = 1 << (ReadWriteLock::BITS - 1); + +/// A read/write lock on a single Solana account, represented by a `u64` bitmask. +#[derive(Default, Debug)] +pub(super) struct AccountLock { + rw: ReadWriteLock, + contender: TransactionId, +} + +impl AccountLock { + /// Attempts to acquire a write lock. Fails if any other lock is held. + #[inline] + pub(super) fn write( + &mut self, + executor: ExecutorId, + txn: TransactionId, + ) -> Result<(), u32> { + self.contended(txn)?; + if self.rw != 0 { + // If the lock is held, `trailing_zeros()` will return the index of the + // least significant bit that is set. This corresponds to the ID of the + // executor that holds the lock. + return Err(self.rw.trailing_zeros()); + } + // Set the write lock bit and the bit for the acquiring executor. + self.rw = WRITE_BIT_MASK | (1 << executor); + self.contender = 0; + Ok(()) + } + + /// Attempts to acquire a read lock. Fails if a write lock is held. + #[inline] + pub(super) fn read( + &mut self, + executor: ExecutorId, + txn: TransactionId, + ) -> Result<(), u32> { + self.contended(txn)?; + // Check if the write lock bit is set. + if self.rw & WRITE_BIT_MASK != 0 { + // If a write lock is held, the conflicting executor is the one whose + // bit is set. We can find it using `trailing_zeros()`. + return Err(self.rw.trailing_zeros()); + } + // Set the bit corresponding to the executor to acquire a read lock. + self.rw |= 1 << executor; + self.contender = 0; + Ok(()) + } + + /// Releases a lock held by an executor. + #[inline] + pub(super) fn unlock(&mut self, executor: ExecutorId) { + // To release the lock, we clear both the write bit and the executor's + // read bit. This is done using a bitwise AND with the inverted mask. + self.rw &= !(WRITE_BIT_MASK | (1 << executor)); + } + + /// Checks if the lock is marked as contended by another transaction. 
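+    /// Returning the contender's `TransactionId` forces later arrivals to queue up behind
+    /// the transaction that was blocked first, even if the lock itself is currently free,
+    /// so a parked transaction cannot be starved by newer ones touching the same account.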
+ #[inline] + fn contended(&self, txn: TransactionId) -> Result<(), TransactionId> { + if self.contender != 0 && self.contender != txn { + return Err(self.contender); + } + Ok(()) + } + + #[inline] + pub(super) fn contend(&mut self, txn: TransactionId) { + if self.contender == 0 { + self.contender = txn; + } + } +} + +/// Generates a new, unique transaction ID. +pub(super) fn next_transaction_id() -> TransactionId { + static mut COUNTER: u32 = MAX_SVM_EXECUTORS; + // SAFETY: This is safe because the scheduler, which calls this function, + // operates in a single, dedicated thread. Therefore, there are no concurrent + // access concerns for this static mutable variable. The u32::MAX is large + // enough range to statistically guarantee that no two pending transactions + // have the same ID. + unsafe { + COUNTER = COUNTER.wrapping_add(1).max(MAX_SVM_EXECUTORS); + COUNTER + } +} diff --git a/magicblock-processor/src/scheduler/mod.rs b/magicblock-processor/src/scheduler/mod.rs new file mode 100644 index 000000000..1bda2734d --- /dev/null +++ b/magicblock-processor/src/scheduler/mod.rs @@ -0,0 +1,219 @@ +//! The central transaction scheduler and its event loop. +//! +//! This module is the entry point for all transactions into the processing pipeline. +//! It is responsible for creating and managing a pool of `TransactionExecutor` +//! workers and dispatching transactions to them for execution. + +use std::sync::{atomic::AtomicUsize, Arc, RwLock}; + +use coordinator::{ExecutionCoordinator, TransactionWithId}; +use locks::{ExecutorId, MAX_SVM_EXECUTORS}; +use log::info; +use magicblock_core::link::transactions::{ + ProcessableTransaction, TransactionToProcessRx, +}; +use magicblock_ledger::LatestBlock; +use solana_program_runtime::loaded_programs::ProgramCache; +use state::TransactionSchedulerState; +use tokio::{ + runtime::Builder, + sync::mpsc::{channel, Receiver, Sender}, +}; + +use crate::executor::{SimpleForkGraph, TransactionExecutor}; + +/// The central transaction scheduler responsible for distributing work to a +/// pool of `TransactionExecutor` workers. +/// +/// This struct acts as the single entry point for all transactions entering the processing +/// pipeline. It receives transactions from a global queue and dispatches them to available +/// worker threads for execution or simulation. +pub struct TransactionScheduler { + /// Manages the state of all executors, including locks and blocked transactions. + coordinator: ExecutionCoordinator, + /// The receiving end of the global queue for all new transactions. + transactions_rx: TransactionToProcessRx, + /// A channel that receives readiness notifications from workers, + /// indicating they are free to accept new work. + ready_rx: Receiver, + /// A list of sender channels, one for each `TransactionExecutor` worker. + executors: Vec>, + /// A handle to the globally shared cache for loaded BPF programs. + program_cache: Arc>>, + /// A handle to the globally shared state of the latest block. + latest_block: LatestBlock, + /// A shared atomic counter for ordering transactions within a single slot. + index: Arc, +} + +impl TransactionScheduler { + /// Creates and initializes a new `TransactionScheduler` and its associated pool of workers. + /// + /// This function performs the initial setup for the entire transaction processing pipeline: + /// 1. Prepares the shared program cache and ensures necessary sysvars are in the `AccountsDb`. + /// 2. Creates a pool of `TransactionExecutor` workers, each with its own dedicated channel. + /// 3. 
Spawns each worker in its own OS thread for maximum isolation and performance. + pub fn new(executors: u32, state: TransactionSchedulerState) -> Self { + let count = executors.min(MAX_SVM_EXECUTORS) as usize; + let index = Arc::new(AtomicUsize::new(0)); + let mut executors = Vec::with_capacity(count); + + // Create the back-channel for workers to signal their readiness. + let (ready_tx, ready_rx) = channel(count); + // Perform one-time setup of the shared program cache and sysvars. + let program_cache = state.prepare_programs_cache(); + state.prepare_sysvars(); + + for id in 0..count { + // Each executor has a channel capacity of 1, as it + // can only process one transaction at a time. + let (transactions_tx, transactions_rx) = channel(1); + let executor = TransactionExecutor::new( + id as u32, + &state, + transactions_rx, + ready_tx.clone(), + index.clone(), + program_cache.clone(), + ); + executor.populate_builtins(); + executor.spawn(); + executors.push(transactions_tx); + } + let coordinator = ExecutionCoordinator::new(count); + Self { + coordinator, + transactions_rx: state.txn_to_process_rx, + ready_rx, + executors, + latest_block: state.ledger.latest_block().clone(), + program_cache, + index, + } + } + + /// Spawns the scheduler's main event loop into a new, dedicated OS thread. + /// + /// The scheduler runs in its own thread with a dedicated single-threaded Tokio + /// runtime. This design ensures that the scheduling logic, which is a critical + /// path, does not compete for resources with other tasks. + pub fn spawn(self) { + let task = move || { + let runtime = Builder::new_current_thread() + .thread_name("transaction-scheduler") + .build() + .expect("Failed to build single-threaded Tokio runtime"); + runtime.block_on(tokio::task::unconstrained(self.run())); + }; + std::thread::spawn(task); + } + + /// The main event loop of the transaction scheduler. + /// + /// This loop multiplexes between three primary events using `tokio::select!`: + /// 1. **Worker Readiness**: A worker signals it is ready for a new task. + /// 2. **New Transaction**: A new transaction arrives for processing. + /// 3. **New Block**: A new block is produced, triggering a slot transition. + /// + /// The `biased` selection ensures that ready workers are processed first, + /// which helps to keep the pipeline full and maximize throughput. + async fn run(mut self) { + let mut block_produced = self.latest_block.subscribe(); + loop { + tokio::select! { + biased; + // A worker has finished its task and is ready for more. + Some(executor) = self.ready_rx.recv() => { + self.handle_ready_executor(executor).await; + } + // Receive new transactions for scheduling, but + // only if there is at least one ready worker. + Some(txn) = self.transactions_rx.recv(), if self.coordinator.is_ready() => { + self.handle_new_transaction(txn).await; + } + // A new block has been produced. + _ = block_produced.recv() => { + self.transition_to_new_slot(); + } + // The main transaction channel has closed, indicating a system shutdown. + else => { + break + } + } + } + info!("Transaction scheduler has terminated"); + } + + /// Handles a notification that a worker has become ready. + async fn handle_ready_executor(&mut self, executor: ExecutorId) { + self.coordinator.unlock_accounts(executor); + self.reschedule_blocked_transactions(executor).await; + } + + /// Handles a new transaction from the global queue. 
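+    /// The transaction is tagged with a fresh `TransactionId` and handed to
+    /// `schedule_transaction`; if any of its account locks are contended it is parked
+    /// in the blocking executor's queue rather than dispatched.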
+ async fn handle_new_transaction(&mut self, txn: ProcessableTransaction) { + // SAFETY: + // This unwrap is safe due to the `if self.coordinator.is_ready()` + // guard in the `select!` macro, which calls this method + let executor = self + .coordinator + .get_ready_executor() + .expect("unreacheable code if there're not ready executors"); + let txn = TransactionWithId::new(txn); + self.schedule_transaction(executor, txn).await; + } + + /// Updates the scheduler's state when a new slot begins. + fn transition_to_new_slot(&self) { + // Reset the intra-slot transaction index to zero. + self.index.store(0, std::sync::atomic::Ordering::Relaxed); + // Re-root the shared program cache to the new slot. + self.program_cache.write().unwrap().latest_root_slot = + self.latest_block.load().slot; + } + + /// Attempts to reschedule transactions that were blocked by the newly freed executor. + async fn reschedule_blocked_transactions(&mut self, blocker: ExecutorId) { + let mut executor = Some(blocker); + while let Some(exec) = executor { + let txn = self.coordinator.get_blocked_transaction(blocker); + if let Some(txn) = txn { + self.schedule_transaction(exec, txn).await; + executor = self.coordinator.get_ready_executor(); + } else { + self.coordinator.release_executor(exec); + break; + } + } + } + + /// Attempts to schedule a single transaction for execution. + /// + /// If the transaction's required account locks are acquired, it is sent to the + /// specified executor. Otherwise, it is queued and will be retried later. + async fn schedule_transaction( + &mut self, + executor: ExecutorId, + txn: TransactionWithId, + ) { + if let Err(blocker) = self.coordinator.try_acquire_locks(executor, &txn) + { + self.coordinator.release_executor(executor); + self.coordinator.queue_transaction(blocker, txn); + return; + } + // It's safe to ignore the result of the send operation. If the send fails, + // it means the executor's channel is closed, which only happens on shutdown. + let _ = self.executors[executor as usize].send(txn.txn).await; + } +} + +pub mod coordinator; +pub mod locks; +pub mod state; +#[cfg(test)] +mod tests; + +// SAFETY: +// Rc used within the scheduler never escapes to other threads +unsafe impl Send for TransactionScheduler {} diff --git a/magicblock-processor/src/scheduler/tests.rs b/magicblock-processor/src/scheduler/tests.rs new file mode 100644 index 000000000..7a1c2ba79 --- /dev/null +++ b/magicblock-processor/src/scheduler/tests.rs @@ -0,0 +1,467 @@ +use super::coordinator::{ExecutionCoordinator, TransactionWithId}; + +use magicblock_core::link::transactions::{ + ProcessableTransaction, SanitizeableTransaction, TransactionProcessingMode, +}; +use solana_keypair::Keypair; +use solana_program::{ + hash::Hash, + instruction::{AccountMeta, Instruction}, +}; +use solana_pubkey::Pubkey; +use solana_signer::Signer; +use solana_transaction::Transaction; + +// --- Test Setup --- + +/// Creates a mock transaction with the specified accounts for testing. 
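+/// Each `(Pubkey, is_writable)` pair becomes its own instruction; the payer and blockhash
+/// are freshly generated per call, which is sufficient here because these tests exercise
+/// only lock acquisition in the coordinator, never actual execution.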
+fn create_mock_transaction( + accounts: &[(Pubkey, bool)], // A tuple of (PublicKey, is_writable) +) -> TransactionWithId { + let payer = Keypair::new(); + let instructions: Vec = accounts + .iter() + .map(|(pubkey, is_writable)| { + let meta = if *is_writable { + AccountMeta::new(*pubkey, false) + } else { + AccountMeta::new_readonly(*pubkey, false) + }; + Instruction::new_with_bincode(Pubkey::new_unique(), &(), vec![meta]) + }) + .collect(); + + let transaction = Transaction::new_signed_with_payer( + &instructions, + Some(&payer.pubkey()), + &[payer], + Hash::new_unique(), + ); + + let processable_txn = ProcessableTransaction { + transaction: transaction.sanitize(false).unwrap(), + mode: TransactionProcessingMode::Execution(None), + }; + TransactionWithId::new(processable_txn) +} + +// --- Basic Tests --- + +#[test] +/// Tests that two transactions with no overlapping accounts can be scheduled concurrently. +fn test_non_conflicting_transactions() { + let mut coordinator = ExecutionCoordinator::new(2); + + // Two transactions writing to different accounts + let txn1 = create_mock_transaction(&[(Pubkey::new_unique(), true)]); + let txn2 = create_mock_transaction(&[(Pubkey::new_unique(), true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + + // Both transactions should acquire locks without any issues. + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire lock without conflict" + ); + assert!( + coordinator.try_acquire_locks(exec2, &txn2).is_ok(), + "Txn2 should acquire lock without conflict" + ); +} + +#[test] +/// Tests that multiple transactions can take read locks on the same account concurrently. +fn test_read_read_no_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, false)]); + let txn2 = create_mock_transaction(&[(shared_account, false)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + + // Both transactions should be able to acquire read locks on the same account. + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire read lock" + ); + assert!( + coordinator.try_acquire_locks(exec2, &txn2).is_ok(), + "Txn2 should also acquire read lock" + ); +} + +// --- Contention Tests --- + +#[test] +/// Tests that a write lock blocks another write lock on the same account. +fn test_write_write_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire write lock" + ); + + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + + // Txn2 should be blocked by the executor holding the lock (exec1). + assert_eq!(blocker, exec1, "Txn2 should be blocked by executor 1"); +} + +#[test] +/// Tests that a write lock blocks a read lock on the same account. 
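+/// The blocker is reported as the writer's executor ID because `write()` records the
+/// writer's own bit alongside the write flag, letting `trailing_zeros()` on the mask
+/// recover the writing executor.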
+fn test_write_read_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, false)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire write lock" + ); + + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + + // Txn2 should be blocked by exec1. + assert_eq!( + blocker, exec1, + "Read lock (Txn2) should be blocked by write lock (Txn1)" + ); +} + +#[test] +/// Tests that a read lock blocks a write lock on the same account. +fn test_read_write_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, false)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should acquire read lock" + ); + + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + + // Txn2 should be blocked by exec1. + assert_eq!( + blocker, exec1, + "Write lock (Txn2) should be blocked by read lock (Txn1)" + ); +} + +// --- Advanced Scenarios --- + +#[test] +/// Tests contention with a mix of read and write locks across multiple accounts. +fn test_multiple_mixed_locks_contention() { + let mut coordinator = ExecutionCoordinator::new(2); + + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + let acc_c = Pubkey::new_unique(); + + // Txn 1: Writes A, Reads B + let txn1 = create_mock_transaction(&[(acc_a, true), (acc_b, false)]); + // Txn 2: Reads A, Writes C + let txn2 = create_mock_transaction(&[(acc_a, false), (acc_c, true)]); + // Txn 3: Writes B, Writes C + let txn3 = create_mock_transaction(&[(acc_b, true), (acc_c, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec1, &txn1).is_ok(), + "Txn1 should lock A (write) and B (read)" + ); + + let exec2 = coordinator.get_ready_executor().unwrap(); + // Txn2 should be blocked by Txn1's write lock on A. + assert_eq!( + coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(), + exec1, + "Txn2 should be blocked by Txn1 on account A" + ); + + // Txn3 should be blocked by Txn1's read lock on B. + assert_eq!( + coordinator.try_acquire_locks(exec2, &txn3).unwrap_err(), + exec1, + "Txn3 should be blocked by Txn1 on account B" + ); +} + +#[test] +/// Tests a chain of dependencies: Txn3 waits for Txn2, which waits for Txn1. +fn test_transaction_dependency_chain() { + let mut coordinator = ExecutionCoordinator::new(3); + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(acc_a, true)]); + let txn2 = create_mock_transaction(&[(acc_b, true), (acc_a, false)]); + let txn3 = create_mock_transaction(&[(acc_b, false)]); + + // Schedule Txn1, which locks A for writing. + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + + // Txn2 needs to read A, so it's blocked by Txn1. 
+ let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker1 = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + assert_eq!(blocker1, exec1, "Txn2 should be blocked by exec1"); + coordinator.queue_transaction(blocker1, txn2); + + // Txn3 needs to read B, but Txn2 (which writes to B) is already queued. + // So, Txn3 should be blocked by Txn2's transaction ID. + let exec3 = coordinator.get_ready_executor().unwrap(); + let blocker2 = coordinator.try_acquire_locks(exec3, &txn3).unwrap_err(); + let blocked_txn = coordinator.get_blocked_transaction(exec1).unwrap(); + assert_eq!( + blocker2, blocked_txn.id, + "Txn3 should be blocked by the transaction ID of Txn2" + ); +} + +#[test] +/// Simulates a scenario where all executors are busy, and a new transaction gets queued and then rescheduled. +fn test_full_executor_pool_and_reschedule() { + let mut coordinator = ExecutionCoordinator::new(2); + + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + let acc_c = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(acc_a, true)]); + let txn2 = create_mock_transaction(&[(acc_b, true)]); + let txn3 = create_mock_transaction(&[(acc_a, true), (acc_c, true)]); + + // Occupy both available executors. + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + assert!(coordinator.try_acquire_locks(exec2, &txn2).is_ok()); + + // No more ready executors should be available. + assert!( + coordinator.get_ready_executor().is_none(), + "Executor pool should be empty" + ); + + // Txn3 arrives and contends with Txn1 on account A. + let blocker = coordinator.try_acquire_locks(exec1, &txn3).unwrap_err(); + assert_eq!(blocker, exec1); + coordinator.queue_transaction(blocker, txn3); + + // Executor 1 finishes its work and releases its locks. + coordinator.unlock_accounts(exec1); + coordinator.release_executor(exec1); + + // Now that an executor is free, we should be able to reschedule the blocked transaction. + let ready_exec = coordinator.get_ready_executor().unwrap(); + let blocked_txn = coordinator.get_blocked_transaction(exec1).unwrap(); + assert!( + coordinator + .try_acquire_locks(ready_exec, &blocked_txn) + .is_ok(), + "Should be able to reschedule the blocked transaction" + ); +} + +// --- Edge Cases --- + +#[test] +/// Tests that a transaction with no accounts can be processed without issues. +fn test_transaction_with_no_accounts() { + let mut coordinator = ExecutionCoordinator::new(1); + let txn = create_mock_transaction(&[]); + let exec = coordinator.get_ready_executor().unwrap(); + + assert!( + coordinator.try_acquire_locks(exec, &txn).is_ok(), + "Transaction with no accounts should not fail" + ); +} + +#[test] +/// Tests that many read locks can be acquired on the same account concurrently. +fn test_multiple_read_locks_on_same_account() { + let mut coordinator = ExecutionCoordinator::new(3); + let shared_account = Pubkey::new_unique(); + let txn1 = create_mock_transaction(&[(shared_account, false)]); + let txn2 = create_mock_transaction(&[(shared_account, false)]); + let txn3 = create_mock_transaction(&[(shared_account, false)]); + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + let exec3 = coordinator.get_ready_executor().unwrap(); + + // All three should acquire read locks without contention. 
+ assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + assert!(coordinator.try_acquire_locks(exec2, &txn2).is_ok()); + assert!(coordinator.try_acquire_locks(exec3, &txn3).is_ok()); +} + +#[test] +/// Tests a rapid lock-unlock-lock cycle to ensure state is managed correctly. +fn test_rapid_lock_unlock_cycle() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + + // Lock, unlock, and then lock again with a different transaction. + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + coordinator.unlock_accounts(exec1); + coordinator.release_executor(exec1); + + let exec2 = coordinator.get_ready_executor().unwrap(); + assert!( + coordinator.try_acquire_locks(exec2, &txn2).is_ok(), + "Should be able to lock the account again after it was released" + ); +} + +#[test] +/// Tests rescheduling multiple transactions that were all blocked by the same executor. +fn test_reschedule_multiple_blocked_on_same_executor() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + let txn3 = create_mock_transaction(&[(shared_account, true)]); + + // Txn1 takes the lock. Txn2 and Txn3 are queued as blocked. + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker1 = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + coordinator.queue_transaction(blocker1, txn2); + let blocker2 = coordinator.try_acquire_locks(exec2, &txn3).unwrap_err(); + coordinator.queue_transaction(blocker2, txn3); + + // Txn1 finishes. + coordinator.unlock_accounts(exec1); + coordinator.release_executor(exec1); + + // The first blocked transaction (Txn2) should now be schedulable. + let ready_exec = coordinator.get_ready_executor().unwrap(); + let blocked_txn1 = coordinator.get_blocked_transaction(exec1).unwrap(); + let result = coordinator.try_acquire_locks(ready_exec, &blocked_txn1); + assert!( + result.is_ok(), + "First blocked transaction should be reschedulable" + ); + + // The second blocked transaction (Txn3) should still be in the queue. + assert!( + coordinator.get_blocked_transaction(exec1).is_some(), + "Second blocked transaction should still be queued" + ); +} + +#[test] +/// Tests a transaction that contends on multiple accounts held by different executors. +fn test_contention_on_multiple_accounts() { + let mut coordinator = ExecutionCoordinator::new(3); + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(acc_a, true)]); + let txn2 = create_mock_transaction(&[(acc_b, true)]); + // This transaction will contend with both Txn1 and Txn2. + let txn3 = create_mock_transaction(&[(acc_a, true), (acc_b, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + let exec2 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + assert!(coordinator.try_acquire_locks(exec2, &txn2).is_ok()); + + let exec3 = coordinator.get_ready_executor().unwrap(); + // The coordinator should report the first detected contention. 
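+    // Accounts are locked in message order, so the write conflict on A (held by exec1)
+    // is detected before the conflict on B (held by exec2).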
+ let blocker = coordinator.try_acquire_locks(exec3, &txn3).unwrap_err(); + assert_eq!( + blocker, exec1, + "Should be blocked by the first contended account (A)" + ); +} + +#[test] +/// Tests that no ready executors are available when the pool is fully utilized. +fn test_no_ready_executors() { + let mut coordinator = ExecutionCoordinator::new(1); + let txn1 = create_mock_transaction(&[(Pubkey::new_unique(), true)]); + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + + // The only executor is now busy. + assert!( + coordinator.get_ready_executor().is_none(), + "There should be no ready executors" + ); +} + +#[test] +/// Tests that an executor can release locks and immediately reacquire new ones. +fn test_release_and_reacquire_lock() { + let mut coordinator = ExecutionCoordinator::new(1); + let acc_a = Pubkey::new_unique(); + let acc_b = Pubkey::new_unique(); + let txn1 = create_mock_transaction(&[(acc_a, true)]); + let txn2 = create_mock_transaction(&[(acc_b, true)]); + + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + coordinator.unlock_accounts(exec1); + + // The executor should be able to immediately acquire a lock on a different account. + assert!( + coordinator.try_acquire_locks(exec1, &txn2).is_ok(), + "Executor should be able to reacquire a lock after releasing" + ); +} + +#[test] +/// Tests a scenario where a transaction is blocked by another transaction that is itself already queued. +fn test_transaction_blocked_by_queued_transaction() { + let mut coordinator = ExecutionCoordinator::new(2); + let shared_account = Pubkey::new_unique(); + + let txn1 = create_mock_transaction(&[(shared_account, true)]); + let txn2 = create_mock_transaction(&[(shared_account, true)]); + let txn3 = create_mock_transaction(&[(shared_account, true)]); + + // Txn1 acquires the lock. + let exec1 = coordinator.get_ready_executor().unwrap(); + assert!(coordinator.try_acquire_locks(exec1, &txn1).is_ok()); + + // Txn2 is blocked by Txn1. + let exec2 = coordinator.get_ready_executor().unwrap(); + let blocker1 = coordinator.try_acquire_locks(exec2, &txn2).unwrap_err(); + assert_eq!(blocker1, exec1); + coordinator.queue_transaction(blocker1, txn2); + + // Txn3 is blocked by the already queued Txn2. The error should be the transaction ID. 
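+    // Txn2 marked the lock as contended when its own acquisition failed, so Txn3 receives
+    // Txn2's `TransactionId` rather than an `ExecutorId`.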
+ let blocker2 = coordinator.try_acquire_locks(exec2, &txn3).unwrap_err(); + let blocked_txn = coordinator.get_blocked_transaction(exec1).unwrap(); + assert_eq!( + blocker2, blocked_txn.id, + "Txn3 should be blocked by the ID of the queued Txn2" + ); +} diff --git a/magicblock-validator/Cargo.toml b/magicblock-validator/Cargo.toml index 4820ca4b4..9ac23d13a 100644 --- a/magicblock-validator/Cargo.toml +++ b/magicblock-validator/Cargo.toml @@ -16,6 +16,7 @@ log = { workspace = true } magicblock-api = { workspace = true } magicblock-config = { workspace = true } magicblock-version = { workspace = true } +num_cpus = { workspace = true } solana-sdk = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } diff --git a/magicblock-validator/src/main.rs b/magicblock-validator/src/main.rs index 2496bff0c..f7f0b9d70 100644 --- a/magicblock-validator/src/main.rs +++ b/magicblock-validator/src/main.rs @@ -7,6 +7,7 @@ use magicblock_api::{ }; use magicblock_config::MagicBlockConfig; use solana_sdk::signature::Signer; +use tokio::runtime::Builder; use crate::shutdown::Shutdown; @@ -49,8 +50,21 @@ fn init_logger() { }); } -#[tokio::main] -async fn main() { +fn main() { + // We dedicate half of the threads to async runtime (where RPC and other + // io/timer bound services are running), and the other half is allocated + // for the execution runtime (transaction scheduler/executor threads) + let workers = (num_cpus::get() / 2).max(1); + let runtime = Builder::new_multi_thread() + .worker_threads(workers) + .enable_all() + .thread_name("async-runtime") + .build() + .expect("failed to build async runtime"); + runtime.block_on(run()); +} + +async fn run() { init_logger(); #[cfg(feature = "tokio-console")] console_subscriber::init();
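// A minimal sketch of the thread-budget arithmetic used above; `thread_budget` is a
// hypothetical helper for illustration only, assuming the same num_cpus-based split
// as magic_validator.rs and main.rs.
fn thread_budget(cores: usize) -> (usize, u32) {
    // Half of the cores go to the async (RPC / IO / timer bound) runtime, at least one.
    let async_workers = (cores / 2).max(1);
    // The other half is left for transaction execution: one core is reserved for the
    // scheduler thread itself, the rest become SVM executors (never fewer than one).
    let svm_executors = (cores / 2).saturating_sub(1).max(1) as u32;
    (async_workers, svm_executors)
}
// e.g. 16 cores -> 8 async workers, 7 executors (+ 1 scheduler thread);
//       4 cores -> 2 async workers, 1 executor.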