Skip to content

Commit 6580b7e

Browse files
committed
fix(notify_cancel): removed usage of async trait and made the trait dyn incompatible
1 parent 09a31ba commit 6580b7e

File tree

5 files changed

+35
-27
lines changed

5 files changed

+35
-27
lines changed

crates/fuel-core/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ impl ShutdownListener {
9393
}
9494
}
9595

96-
#[async_trait::async_trait]
9796
impl NotifyCancel for ShutdownListener {
9897
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
9998
self.token.cancelled().await;

crates/fuel-core/src/service/genesis/exporter.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,25 +65,26 @@ use super::{
6565
NotifyCancel,
6666
};
6767

68-
pub struct Exporter<Fun> {
68+
pub struct Exporter<Fun, N> {
6969
db: CombinedDatabase,
7070
prev_chain_config: ChainConfig,
7171
writer: Fun,
7272
group_size: usize,
73-
task_manager: TaskManager<SnapshotFragment>,
73+
task_manager: TaskManager<SnapshotFragment, N>,
7474
multi_progress: MultipleProgressReporter,
7575
}
7676

77-
impl<Fun> Exporter<Fun>
77+
impl<Fun, N> Exporter<Fun, N>
7878
where
7979
Fun: Fn() -> anyhow::Result<SnapshotWriter>,
80+
N: NotifyCancel + Send + Sync + Clone + 'static,
8081
{
8182
pub fn new(
8283
db: CombinedDatabase,
8384
prev_chain_config: ChainConfig,
8485
writer: Fun,
8586
group_size: usize,
86-
cancel_token: impl NotifyCancel + Send + Sync + 'static,
87+
cancel_token: N,
8788
) -> Self {
8889
Self {
8990
db,

crates/fuel-core/src/service/genesis/importer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ mod on_chain;
7373

7474
const GROUPS_NUMBER_FOR_PARALLELIZATION: usize = 10;
7575

76-
pub struct SnapshotImporter {
76+
pub struct SnapshotImporter<N = StateWatcher> {
7777
db: CombinedGenesisDatabase,
78-
task_manager: TaskManager<()>,
78+
task_manager: TaskManager<(), N>,
7979
genesis_block: Block,
8080
snapshot_reader: SnapshotReader,
8181
multi_progress_reporter: MultipleProgressReporter,

crates/fuel-core/src/service/genesis/importer/import_task.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::{
2424
service::genesis::{
2525
progress::ProgressReporter,
2626
task_manager::CancellationToken,
27+
NotifyCancel,
2728
},
2829
};
2930

@@ -100,7 +101,10 @@ where
100101
for<'a> StorageTransaction<&'a mut GenesisDatabase<DbDesc>>:
101102
StorageMutate<GenesisMetadata<DbDesc>, Error = fuel_core_storage::Error>,
102103
{
103-
pub fn run(mut self, cancel_token: CancellationToken) -> anyhow::Result<()> {
104+
pub fn run<N>(mut self, cancel_token: CancellationToken<N>) -> anyhow::Result<()>
105+
where
106+
N: NotifyCancel + Send + Sync + 'static,
107+
{
104108
let mut db = self.db;
105109
let mut is_cancelled = cancel_token.is_cancelled();
106110
self.groups
@@ -609,7 +613,7 @@ mod tests {
609613
assert!(result.is_err());
610614
}
611615

612-
fn never_cancel() -> CancellationToken {
616+
fn never_cancel() -> CancellationToken<tokio_util::sync::CancellationToken> {
613617
CancellationToken::new(tokio_util::sync::CancellationToken::new())
614618
}
615619
}

crates/fuel-core/src/service/genesis/task_manager.rs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,18 @@ use futures::{
88
use itertools::Itertools;
99
use tokio::task::JoinSet;
1010

11-
pub struct TaskManager<T> {
11+
pub struct TaskManager<T, N> {
1212
set: JoinSet<anyhow::Result<T>>,
13-
cancel_token: CancellationToken,
13+
cancel_token: CancellationToken<N>,
1414
}
1515

16-
#[async_trait::async_trait]
1716
pub trait NotifyCancel {
18-
async fn wait_until_cancelled(&self) -> anyhow::Result<()>;
17+
fn wait_until_cancelled(
18+
&self,
19+
) -> impl core::future::Future<Output = anyhow::Result<()>> + Send;
1920
fn is_cancelled(&self) -> bool;
2021
}
2122

22-
#[async_trait::async_trait]
2323
impl NotifyCancel for tokio_util::sync::CancellationToken {
2424
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
2525
self.cancelled().await;
@@ -30,7 +30,6 @@ impl NotifyCancel for tokio_util::sync::CancellationToken {
3030
}
3131
}
3232

33-
#[async_trait::async_trait]
3433
impl NotifyCancel for StateWatcher {
3534
async fn wait_until_cancelled(&self) -> anyhow::Result<()> {
3635
let mut state = self.clone();
@@ -50,13 +49,16 @@ impl NotifyCancel for StateWatcher {
5049
/// A token that implements [`NotifyCancel`]. Given to jobs inside of [`TaskManager`] so they can
5150
/// stop either when commanded by the [`TaskManager`] or by an outside source.
5251
#[derive(Clone)]
53-
pub struct CancellationToken {
54-
outside_signal: Arc<dyn NotifyCancel + Send + Sync>,
52+
pub struct CancellationToken<N> {
53+
outside_signal: Arc<N>,
5554
inner_signal: tokio_util::sync::CancellationToken,
5655
}
5756

58-
impl CancellationToken {
59-
pub fn new(outside_signal: impl NotifyCancel + Send + Sync + 'static) -> Self {
57+
impl<N> CancellationToken<N>
58+
where
59+
N: NotifyCancel,
60+
{
61+
pub fn new(outside_signal: N) -> Self {
6062
Self {
6163
outside_signal: Arc::new(outside_signal),
6264
inner_signal: tokio_util::sync::CancellationToken::new(),
@@ -66,16 +68,17 @@ impl CancellationToken {
6668
pub fn cancel(&self) {
6769
self.inner_signal.cancel()
6870
}
69-
}
7071

71-
impl CancellationToken {
7272
pub fn is_cancelled(&self) -> bool {
7373
self.inner_signal.is_cancelled() || self.outside_signal.is_cancelled()
7474
}
7575
}
7676

77-
impl<T> TaskManager<T> {
78-
pub fn new(outside_cancel: impl NotifyCancel + Send + Sync + 'static) -> Self {
77+
impl<T, N> TaskManager<T, N>
78+
where
79+
N: NotifyCancel + Clone,
80+
{
81+
pub fn new(outside_cancel: N) -> Self {
7982
Self {
8083
set: JoinSet::new(),
8184
cancel_token: CancellationToken::new(outside_cancel),
@@ -84,20 +87,21 @@ impl<T> TaskManager<T> {
8487

8588
pub fn run<F>(&mut self, arg: F) -> anyhow::Result<T>
8689
where
87-
F: FnOnce(CancellationToken) -> anyhow::Result<T>,
90+
F: FnOnce(CancellationToken<N>) -> anyhow::Result<T>,
8891
{
8992
arg(self.cancel_token.clone())
9093
}
9194
}
9295

93-
impl<T> TaskManager<T>
96+
impl<T, N> TaskManager<T, N>
9497
where
9598
T: Send + 'static,
99+
N: NotifyCancel + Send + Sync + Clone + 'static,
96100
{
97101
#[cfg(test)]
98102
pub fn spawn<F, Fut>(&mut self, arg: F)
99103
where
100-
F: FnOnce(CancellationToken) -> Fut,
104+
F: FnOnce(CancellationToken<N>) -> Fut,
101105
Fut: futures::Future<Output = anyhow::Result<T>> + Send + 'static,
102106
{
103107
let token = self.cancel_token.clone();
@@ -106,7 +110,7 @@ where
106110

107111
pub fn spawn_blocking<F>(&mut self, arg: F)
108112
where
109-
F: FnOnce(CancellationToken) -> anyhow::Result<T> + Send + 'static,
113+
F: FnOnce(CancellationToken<N>) -> anyhow::Result<T> + Send + 'static,
110114
{
111115
let token = self.cancel_token.clone();
112116
self.set.spawn_blocking(move || arg(token));

0 commit comments

Comments
 (0)