Skip to content

Commit

Permalink
Merge branch 'master' into refactor/gas-price-args
Browse files Browse the repository at this point in the history
  • Loading branch information
MitchTurner authored Feb 7, 2025
2 parents 5c4cd64 + e41d936 commit 76add3a
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 28 deletions.
6 changes: 5 additions & 1 deletion crates/fraud_proofs/global_merkle_root/storage/src/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,11 @@ where
fn process_create_transaction(&mut self, tx: &Create) -> anyhow::Result<()> {
let bytecode_witness_index = tx.bytecode_witness_index();
let witnesses = tx.witnesses();
let bytecode = witnesses[usize::from(*bytecode_witness_index)].as_vec();
let bytecode = witnesses
.get(usize::from(*bytecode_witness_index))
.ok_or_else(|| anyhow!("invalid witness index {bytecode_witness_index}"))?
.as_vec();

// The Fuel specs mandate that each create transaction has exactly one output of type `Output::ContractCreated`.
// See https://docs.fuel.network/docs/specs/tx-format/transaction/#transactioncreate
let Some(Output::ContractCreated { contract_id, .. }) = tx
Expand Down
1 change: 0 additions & 1 deletion crates/fuel-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ impl ShutdownListener {
}
}

#[async_trait::async_trait]
impl NotifyCancel for ShutdownListener {
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
self.token.cancelled().await;
Expand Down
9 changes: 5 additions & 4 deletions crates/fuel-core/src/service/genesis/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,26 @@ use super::{
NotifyCancel,
};

pub struct Exporter<Fun> {
pub struct Exporter<Fun, N> {
db: CombinedDatabase,
prev_chain_config: ChainConfig,
writer: Fun,
group_size: usize,
task_manager: TaskManager<SnapshotFragment>,
task_manager: TaskManager<SnapshotFragment, N>,
multi_progress: MultipleProgressReporter,
}

impl<Fun> Exporter<Fun>
impl<Fun, N> Exporter<Fun, N>
where
Fun: Fn() -> anyhow::Result<SnapshotWriter>,
N: NotifyCancel + Send + Sync + Clone + 'static,
{
pub fn new(
db: CombinedDatabase,
prev_chain_config: ChainConfig,
writer: Fun,
group_size: usize,
cancel_token: impl NotifyCancel + Send + Sync + 'static,
cancel_token: N,
) -> Self {
Self {
db,
Expand Down
4 changes: 2 additions & 2 deletions crates/fuel-core/src/service/genesis/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ mod on_chain;

const GROUPS_NUMBER_FOR_PARALLELIZATION: usize = 10;

pub struct SnapshotImporter {
pub struct SnapshotImporter<N = StateWatcher> {
db: CombinedGenesisDatabase,
task_manager: TaskManager<()>,
task_manager: TaskManager<(), N>,
genesis_block: Block,
snapshot_reader: SnapshotReader,
multi_progress_reporter: MultipleProgressReporter,
Expand Down
8 changes: 6 additions & 2 deletions crates/fuel-core/src/service/genesis/importer/import_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
service::genesis::{
progress::ProgressReporter,
task_manager::CancellationToken,
NotifyCancel,
},
};

Expand Down Expand Up @@ -100,7 +101,10 @@ where
for<'a> StorageTransaction<&'a mut GenesisDatabase<DbDesc>>:
StorageMutate<GenesisMetadata<DbDesc>, Error = fuel_core_storage::Error>,
{
pub fn run(mut self, cancel_token: CancellationToken) -> anyhow::Result<()> {
pub fn run<N>(mut self, cancel_token: CancellationToken<N>) -> anyhow::Result<()>
where
N: NotifyCancel + Send + Sync + 'static,
{
let mut db = self.db;
let mut is_cancelled = cancel_token.is_cancelled();
self.groups
Expand Down Expand Up @@ -609,7 +613,7 @@ mod tests {
assert!(result.is_err());
}

fn never_cancel() -> CancellationToken {
fn never_cancel() -> CancellationToken<tokio_util::sync::CancellationToken> {
CancellationToken::new(tokio_util::sync::CancellationToken::new())
}
}
40 changes: 22 additions & 18 deletions crates/fuel-core/src/service/genesis/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ use futures::{
use itertools::Itertools;
use tokio::task::JoinSet;

pub struct TaskManager<T> {
pub struct TaskManager<T, N> {
set: JoinSet<anyhow::Result<T>>,
cancel_token: CancellationToken,
cancel_token: CancellationToken<N>,
}

#[async_trait::async_trait]
pub trait NotifyCancel {
async fn wait_until_cancelled(&self) -> anyhow::Result<()>;
fn wait_until_cancelled(
&self,
) -> impl core::future::Future<Output = anyhow::Result<()>> + Send;
fn is_cancelled(&self) -> bool;
}

#[async_trait::async_trait]
impl NotifyCancel for tokio_util::sync::CancellationToken {
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
self.cancelled().await;
Expand All @@ -30,7 +30,6 @@ impl NotifyCancel for tokio_util::sync::CancellationToken {
}
}

#[async_trait::async_trait]
impl NotifyCancel for StateWatcher {
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
let mut state = self.clone();
Expand All @@ -50,13 +49,16 @@ impl NotifyCancel for StateWatcher {
/// A token that implements [`NotifyCancel`]. Given to jobs inside of [`TaskManager`] so they can
/// stop either when commanded by the [`TaskManager`] or by an outside source.
#[derive(Clone)]
pub struct CancellationToken {
outside_signal: Arc<dyn NotifyCancel + Send + Sync>,
pub struct CancellationToken<N> {
outside_signal: Arc<N>,
inner_signal: tokio_util::sync::CancellationToken,
}

impl CancellationToken {
pub fn new(outside_signal: impl NotifyCancel + Send + Sync + 'static) -> Self {
impl<N> CancellationToken<N>
where
N: NotifyCancel,
{
pub fn new(outside_signal: N) -> Self {
Self {
outside_signal: Arc::new(outside_signal),
inner_signal: tokio_util::sync::CancellationToken::new(),
Expand All @@ -66,16 +68,17 @@ impl CancellationToken {
pub fn cancel(&self) {
self.inner_signal.cancel()
}
}

impl CancellationToken {
pub fn is_cancelled(&self) -> bool {
self.inner_signal.is_cancelled() || self.outside_signal.is_cancelled()
}
}

impl<T> TaskManager<T> {
pub fn new(outside_cancel: impl NotifyCancel + Send + Sync + 'static) -> Self {
impl<T, N> TaskManager<T, N>
where
N: NotifyCancel + Clone,
{
pub fn new(outside_cancel: N) -> Self {
Self {
set: JoinSet::new(),
cancel_token: CancellationToken::new(outside_cancel),
Expand All @@ -84,20 +87,21 @@ impl<T> TaskManager<T> {

pub fn run<F>(&mut self, arg: F) -> anyhow::Result<T>
where
F: FnOnce(CancellationToken) -> anyhow::Result<T>,
F: FnOnce(CancellationToken<N>) -> anyhow::Result<T>,
{
arg(self.cancel_token.clone())
}
}

impl<T> TaskManager<T>
impl<T, N> TaskManager<T, N>
where
T: Send + 'static,
N: NotifyCancel + Send + Sync + Clone + 'static,
{
#[cfg(test)]
pub fn spawn<F, Fut>(&mut self, arg: F)
where
F: FnOnce(CancellationToken) -> Fut,
F: FnOnce(CancellationToken<N>) -> Fut,
Fut: futures::Future<Output = anyhow::Result<T>> + Send + 'static,
{
let token = self.cancel_token.clone();
Expand All @@ -106,7 +110,7 @@ where

pub fn spawn_blocking<F>(&mut self, arg: F)
where
F: FnOnce(CancellationToken) -> anyhow::Result<T> + Send + 'static,
F: FnOnce(CancellationToken<N>) -> anyhow::Result<T> + Send + 'static,
{
let token = self.cancel_token.clone();
self.set.spawn_blocking(move || arg(token));
Expand Down

0 comments on commit 76add3a

Please sign in to comment.