Skip to content

fix(notify_cancel): removed usage of async trait and made the trait dyn incompatible #2665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/fuel-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ impl ShutdownListener {
}
}

#[async_trait::async_trait]
impl NotifyCancel for ShutdownListener {
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
self.token.cancelled().await;
Expand Down
9 changes: 5 additions & 4 deletions crates/fuel-core/src/service/genesis/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,26 @@ use super::{
NotifyCancel,
};

pub struct Exporter<Fun> {
pub struct Exporter<Fun, N> {
db: CombinedDatabase,
prev_chain_config: ChainConfig,
writer: Fun,
group_size: usize,
task_manager: TaskManager<SnapshotFragment>,
task_manager: TaskManager<SnapshotFragment, N>,
multi_progress: MultipleProgressReporter,
}

impl<Fun> Exporter<Fun>
impl<Fun, N> Exporter<Fun, N>
where
Fun: Fn() -> anyhow::Result<SnapshotWriter>,
N: NotifyCancel + Send + Sync + Clone + 'static,
{
pub fn new(
db: CombinedDatabase,
prev_chain_config: ChainConfig,
writer: Fun,
group_size: usize,
cancel_token: impl NotifyCancel + Send + Sync + 'static,
cancel_token: N,
) -> Self {
Self {
db,
Expand Down
4 changes: 2 additions & 2 deletions crates/fuel-core/src/service/genesis/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ mod on_chain;

const GROUPS_NUMBER_FOR_PARALLELIZATION: usize = 10;

pub struct SnapshotImporter {
pub struct SnapshotImporter<N = StateWatcher> {
db: CombinedGenesisDatabase,
task_manager: TaskManager<()>,
task_manager: TaskManager<(), N>,
genesis_block: Block,
snapshot_reader: SnapshotReader,
multi_progress_reporter: MultipleProgressReporter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
service::genesis::{
progress::ProgressReporter,
task_manager::CancellationToken,
NotifyCancel,
},
};

Expand Down Expand Up @@ -100,7 +101,10 @@ where
for<'a> StorageTransaction<&'a mut GenesisDatabase<DbDesc>>:
StorageMutate<GenesisMetadata<DbDesc>, Error = fuel_core_storage::Error>,
{
pub fn run(mut self, cancel_token: CancellationToken) -> anyhow::Result<()> {
pub fn run<N>(mut self, cancel_token: CancellationToken<N>) -> anyhow::Result<()>
where
N: NotifyCancel + Send + Sync + 'static,
{
let mut db = self.db;
let mut is_cancelled = cancel_token.is_cancelled();
self.groups
Expand Down Expand Up @@ -609,7 +613,7 @@ mod tests {
assert!(result.is_err());
}

fn never_cancel() -> CancellationToken {
fn never_cancel() -> CancellationToken<tokio_util::sync::CancellationToken> {
CancellationToken::new(tokio_util::sync::CancellationToken::new())
}
}
40 changes: 22 additions & 18 deletions crates/fuel-core/src/service/genesis/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ use futures::{
use itertools::Itertools;
use tokio::task::JoinSet;

pub struct TaskManager<T> {
pub struct TaskManager<T, N> {
set: JoinSet<anyhow::Result<T>>,
cancel_token: CancellationToken,
cancel_token: CancellationToken<N>,
}

#[async_trait::async_trait]
pub trait NotifyCancel {
async fn wait_until_cancelled(&self) -> anyhow::Result<()>;
fn wait_until_cancelled(
&self,
) -> impl core::future::Future<Output = anyhow::Result<()>> + Send;
fn is_cancelled(&self) -> bool;
}

#[async_trait::async_trait]
impl NotifyCancel for tokio_util::sync::CancellationToken {
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
self.cancelled().await;
Expand All @@ -30,7 +30,6 @@ impl NotifyCancel for tokio_util::sync::CancellationToken {
}
}

#[async_trait::async_trait]
impl NotifyCancel for StateWatcher {
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
let mut state = self.clone();
Expand All @@ -50,13 +49,16 @@ impl NotifyCancel for StateWatcher {
/// A token that implements [`NotifyCancel`]. Given to jobs inside of [`TaskManager`] so they can
/// stop either when commanded by the [`TaskManager`] or by an outside source.
#[derive(Clone)]
pub struct CancellationToken {
outside_signal: Arc<dyn NotifyCancel + Send + Sync>,
pub struct CancellationToken<N> {
outside_signal: Arc<N>,
inner_signal: tokio_util::sync::CancellationToken,
}

impl CancellationToken {
pub fn new(outside_signal: impl NotifyCancel + Send + Sync + 'static) -> Self {
impl<N> CancellationToken<N>
where
N: NotifyCancel,
{
pub fn new(outside_signal: N) -> Self {
Self {
outside_signal: Arc::new(outside_signal),
inner_signal: tokio_util::sync::CancellationToken::new(),
Expand All @@ -66,16 +68,17 @@ impl CancellationToken {
pub fn cancel(&self) {
self.inner_signal.cancel()
}
}

impl CancellationToken {
pub fn is_cancelled(&self) -> bool {
self.inner_signal.is_cancelled() || self.outside_signal.is_cancelled()
}
}

impl<T> TaskManager<T> {
pub fn new(outside_cancel: impl NotifyCancel + Send + Sync + 'static) -> Self {
impl<T, N> TaskManager<T, N>
where
N: NotifyCancel + Clone,
{
pub fn new(outside_cancel: N) -> Self {
Self {
set: JoinSet::new(),
cancel_token: CancellationToken::new(outside_cancel),
Expand All @@ -84,20 +87,21 @@ impl<T> TaskManager<T> {

pub fn run<F>(&mut self, arg: F) -> anyhow::Result<T>
where
F: FnOnce(CancellationToken) -> anyhow::Result<T>,
F: FnOnce(CancellationToken<N>) -> anyhow::Result<T>,
{
arg(self.cancel_token.clone())
}
}

impl<T> TaskManager<T>
impl<T, N> TaskManager<T, N>
where
T: Send + 'static,
N: NotifyCancel + Send + Sync + Clone + 'static,
{
#[cfg(test)]
pub fn spawn<F, Fut>(&mut self, arg: F)
where
F: FnOnce(CancellationToken) -> Fut,
F: FnOnce(CancellationToken<N>) -> Fut,
Fut: futures::Future<Output = anyhow::Result<T>> + Send + 'static,
{
let token = self.cancel_token.clone();
Expand All @@ -106,7 +110,7 @@ where

pub fn spawn_blocking<F>(&mut self, arg: F)
where
F: FnOnce(CancellationToken) -> anyhow::Result<T> + Send + 'static,
F: FnOnce(CancellationToken<N>) -> anyhow::Result<T> + Send + 'static,
{
let token = self.cancel_token.clone();
self.set.spawn_blocking(move || arg(token));
Expand Down
Loading