From 3731e6db2e3320a948b9d25361335cb7e8c62faf Mon Sep 17 00:00:00 2001 From: Kasper Ziemianek Date: Thu, 7 Nov 2024 11:28:49 +0100 Subject: [PATCH 1/2] add start block --- .../executor-core/src/listener.rs | 102 ++++++++++-------- .../omni-executor/executor-worker/src/cli.rs | 1 + .../omni-executor/executor-worker/src/main.rs | 5 +- .../parentchain/listener/src/rpc_client.rs | 13 ++- 4 files changed, 69 insertions(+), 52 deletions(-) diff --git a/tee-worker/omni-executor/executor-core/src/listener.rs b/tee-worker/omni-executor/executor-core/src/listener.rs index 126c45e7a3..ce343f5d31 100644 --- a/tee-worker/omni-executor/executor-core/src/listener.rs +++ b/tee-worker/omni-executor/executor-core/src/listener.rs @@ -96,7 +96,6 @@ impl< log::debug!("Starting sync from {:?}", block_number_to_sync); 'main: loop { - log::info!("Syncing block: {}", block_number_to_sync); if self.stop_signal.try_recv().is_ok() { break; } @@ -135,16 +134,41 @@ impl< None => false, }; + let mut sync_error = false; + if last_finalized_block >= block_number_to_sync { - if let Ok(events) = - self.handle.block_on(self.fetcher.get_block_events(block_number_to_sync)) - { - for event in events { - let event_id = event.get_event_id().clone(); - if let Some(ref checkpoint) = - self.checkpoint_repository.get().expect("Could not read checkpoint") - { - if checkpoint.lt(&event.get_event_id().clone().into()) { + log::info!("Syncing block: {}", block_number_to_sync); + match self.handle.block_on(self.fetcher.get_block_events(block_number_to_sync)) { + Ok(events) => { + for event in events { + let event_id = event.get_event_id().clone(); + if let Some(ref checkpoint) = + self.checkpoint_repository.get().expect("Could not read checkpoint") + { + if checkpoint.lt(&event.get_event_id().clone().into()) { + log::info!("Handling event: {:?}", event_id); + if let Err(e) = + self.handle.block_on(self.intent_event_handler.handle(event)) + { + log::error!("Could not handle event: {:?}", e); + match e { + Error::NonRecoverableError => { + error!("Non-recoverable intent handling error, event: {:?}", event_id); + break 'main; + }, + Error::RecoverableError => { + error!( + "Recoverable intent handling error, event: {:?}", + event_id + ); + continue 'main; + }, + } + } + } else { + log::debug!("Skipping event"); + } + } else { log::info!("Handling event: {:?}", event_id); if let Err(e) = self.handle.block_on(self.intent_event_handler.handle(event)) @@ -152,59 +176,43 @@ impl< log::error!("Could not handle event: {:?}", e); match e { Error::NonRecoverableError => { - error!("Non-recoverable intent handling error, event: {:?}", event_id); - break 'main; - }, - Error::RecoverableError => { error!( - "Recoverable intent handling error, event: {:?}", - event_id - ); - continue 'main; - }, - } - } - } else { - log::debug!("Skipping event"); - } - } else { - log::info!("Handling event: {:?}", event_id); - if let Err(e) = - self.handle.block_on(self.intent_event_handler.handle(event)) - { - log::error!("Could not handle event: {:?}", e); - match e { - Error::NonRecoverableError => { - error!( "Non-recoverable intent handling error, event: {:?}", event_id ); - break 'main; - }, - Error::RecoverableError => { - error!( + break 'main; + }, + Error::RecoverableError => { + error!( "Recoverable intent handling error, event: {:?}", event_id ); - continue 'main; - }, + continue 'main; + }, + } } } + self.checkpoint_repository + .save(event_id.into()) + .expect("Could not save checkpoint"); } + // we processed block completely so store new checkpoint self.checkpoint_repository - .save(event_id.into()) + .save(CheckpointT::from(block_number_to_sync)) .expect("Could not save checkpoint"); + log::info!("Finished syncing block: {}", block_number_to_sync); + block_number_to_sync += 1; + }, + Err(e) => { + log::error!("Could not get block {} events: {:?}", block_number_to_sync, e); + sync_error = true; } - // we processed block completely so store new checkpoint - self.checkpoint_repository - .save(CheckpointT::from(block_number_to_sync)) - .expect("Could not save checkpoint"); - log::info!("Finished syncing block: {}", block_number_to_sync); - block_number_to_sync += 1; } + } else { + log::trace!("Block: {} not yet finalized", block_number_to_sync); } - if !fast { + if !fast || sync_error { sleep(Duration::from_secs(1)) } else { log::trace!("Fast sync skipping 1s wait"); diff --git a/tee-worker/omni-executor/executor-worker/src/cli.rs b/tee-worker/omni-executor/executor-worker/src/cli.rs index 3ee590d34b..ab360db4b9 100644 --- a/tee-worker/omni-executor/executor-worker/src/cli.rs +++ b/tee-worker/omni-executor/executor-worker/src/cli.rs @@ -5,4 +5,5 @@ use clap::Parser; pub struct Cli { pub parentchain_url: String, pub ethereum_url: String, + pub start_block: u64 } diff --git a/tee-worker/omni-executor/executor-worker/src/main.rs b/tee-worker/omni-executor/executor-worker/src/main.rs index 7eb630e1ff..bf8a0a9ea7 100644 --- a/tee-worker/omni-executor/executor-worker/src/main.rs +++ b/tee-worker/omni-executor/executor-worker/src/main.rs @@ -48,7 +48,7 @@ async fn main() -> Result<(), ()> { error!("Could not create data dir: {:?}", e); })?; - listen_to_parentchain(cli.parentchain_url, cli.ethereum_url) + listen_to_parentchain(cli.parentchain_url, cli.ethereum_url, cli.start_block) .await .unwrap() .join() @@ -59,6 +59,7 @@ async fn main() -> Result<(), ()> { async fn listen_to_parentchain( parentchain_url: String, ethereum_url: String, + start_block: u64 ) -> Result, ()> { let (_sub_stop_sender, sub_stop_receiver) = oneshot::channel(); let ethereum_intent_executor = @@ -75,6 +76,6 @@ async fn listen_to_parentchain( Ok(thread::Builder::new() .name("litentry_rococo_sync".to_string()) - .spawn(move || parentchain_listener.sync(0)) + .spawn(move || parentchain_listener.sync(start_block)) .unwrap()) } diff --git a/tee-worker/omni-executor/parentchain/listener/src/rpc_client.rs b/tee-worker/omni-executor/parentchain/listener/src/rpc_client.rs index 727e5fac63..a4f0f91428 100644 --- a/tee-worker/omni-executor/parentchain/listener/src/rpc_client.rs +++ b/tee-worker/omni-executor/parentchain/listener/src/rpc_client.rs @@ -16,7 +16,7 @@ use crate::primitives::{BlockEvent, EventId}; use async_trait::async_trait; -use log::error; +use log::{info, error}; use parity_scale_codec::Encode; use std::marker::PhantomData; use std::ops::Deref; @@ -67,9 +67,16 @@ impl> SubstrateRpcClient Result, ()> { - match self.legacy.chain_get_block_hash(Some(block_num.into())).await.map_err(|_| ())? { + info!("Getting block {} events", block_num); + match self.legacy.chain_get_block_hash(Some(block_num.into())).await + .map_err(|e| + { + error!("Error getting block {} hash: {:?}", block_num, e); + })? { Some(hash) => { - let events = self.events.at(BlockRef::from_hash(hash)).await.map_err(|_| ())?; + let events = self.events.at(BlockRef::from_hash(hash)).await.map_err(|e| { + error!("Error getting block {} events: {:?}", block_num, e); + })?; Ok(events .iter() .enumerate() From 6f78663f98ddfa8f72a14fda0bfea3d52477ddb3 Mon Sep 17 00:00:00 2001 From: Kasper Ziemianek Date: Thu, 7 Nov 2024 13:55:29 +0100 Subject: [PATCH 2/2] improve signal handling --- tee-worker/omni-executor/Cargo.lock | 1 + .../omni-executor/executor-core/src/listener.rs | 13 +++++++------ .../omni-executor/executor-worker/Cargo.toml | 2 +- .../omni-executor/executor-worker/src/cli.rs | 2 +- .../omni-executor/executor-worker/src/main.rs | 14 +++++++++++--- .../parentchain/listener/src/rpc_client.rs | 10 ++++------ 6 files changed, 25 insertions(+), 17 deletions(-) diff --git a/tee-worker/omni-executor/Cargo.lock b/tee-worker/omni-executor/Cargo.lock index 63b09a07f6..6d589b911f 100644 --- a/tee-worker/omni-executor/Cargo.lock +++ b/tee-worker/omni-executor/Cargo.lock @@ -4724,6 +4724,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", diff --git a/tee-worker/omni-executor/executor-core/src/listener.rs b/tee-worker/omni-executor/executor-core/src/listener.rs index ce343f5d31..68e854b327 100644 --- a/tee-worker/omni-executor/executor-core/src/listener.rs +++ b/tee-worker/omni-executor/executor-core/src/listener.rs @@ -147,8 +147,9 @@ impl< { if checkpoint.lt(&event.get_event_id().clone().into()) { log::info!("Handling event: {:?}", event_id); - if let Err(e) = - self.handle.block_on(self.intent_event_handler.handle(event)) + if let Err(e) = self + .handle + .block_on(self.intent_event_handler.handle(event)) { log::error!("Could not handle event: {:?}", e); match e { @@ -184,9 +185,9 @@ impl< }, Error::RecoverableError => { error!( - "Recoverable intent handling error, event: {:?}", - event_id - ); + "Recoverable intent handling error, event: {:?}", + event_id + ); continue 'main; }, } @@ -206,7 +207,7 @@ impl< Err(e) => { log::error!("Could not get block {} events: {:?}", block_number_to_sync, e); sync_error = true; - } + }, } } else { log::trace!("Block: {} not yet finalized", block_number_to_sync); diff --git a/tee-worker/omni-executor/executor-worker/Cargo.toml b/tee-worker/omni-executor/executor-worker/Cargo.toml index 5ee9b46240..5ea483d852 100644 --- a/tee-worker/omni-executor/executor-worker/Cargo.toml +++ b/tee-worker/omni-executor/executor-worker/Cargo.toml @@ -14,7 +14,7 @@ log = { workspace = true } parentchain-listener = { path = "../parentchain/listener" } scale-encode = { workspace = true } serde_json = "1.0.127" -tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } [lints] workspace = true diff --git a/tee-worker/omni-executor/executor-worker/src/cli.rs b/tee-worker/omni-executor/executor-worker/src/cli.rs index ab360db4b9..73e8c27337 100644 --- a/tee-worker/omni-executor/executor-worker/src/cli.rs +++ b/tee-worker/omni-executor/executor-worker/src/cli.rs @@ -5,5 +5,5 @@ use clap::Parser; pub struct Cli { pub parentchain_url: String, pub ethereum_url: String, - pub start_block: u64 + pub start_block: u64, } diff --git a/tee-worker/omni-executor/executor-worker/src/main.rs b/tee-worker/omni-executor/executor-worker/src/main.rs index bf8a0a9ea7..f9742e73b1 100644 --- a/tee-worker/omni-executor/executor-worker/src/main.rs +++ b/tee-worker/omni-executor/executor-worker/src/main.rs @@ -22,6 +22,7 @@ use std::io::Write; use std::thread::JoinHandle; use std::{fs, thread}; use tokio::runtime::Handle; +use tokio::signal; use tokio::sync::oneshot; mod cli; @@ -50,16 +51,23 @@ async fn main() -> Result<(), ()> { listen_to_parentchain(cli.parentchain_url, cli.ethereum_url, cli.start_block) .await - .unwrap() - .join() .unwrap(); + + match signal::ctrl_c().await { + Ok(()) => {}, + Err(err) => { + eprintln!("Unable to listen for shutdown signal: {}", err); + // we also shut down in case of error + }, + } + Ok(()) } async fn listen_to_parentchain( parentchain_url: String, ethereum_url: String, - start_block: u64 + start_block: u64, ) -> Result, ()> { let (_sub_stop_sender, sub_stop_receiver) = oneshot::channel(); let ethereum_intent_executor = diff --git a/tee-worker/omni-executor/parentchain/listener/src/rpc_client.rs b/tee-worker/omni-executor/parentchain/listener/src/rpc_client.rs index a4f0f91428..a18b3a08fc 100644 --- a/tee-worker/omni-executor/parentchain/listener/src/rpc_client.rs +++ b/tee-worker/omni-executor/parentchain/listener/src/rpc_client.rs @@ -16,7 +16,7 @@ use crate::primitives::{BlockEvent, EventId}; use async_trait::async_trait; -use log::{info, error}; +use log::{error, info}; use parity_scale_codec::Encode; use std::marker::PhantomData; use std::ops::Deref; @@ -68,11 +68,9 @@ impl> SubstrateRpcClient Result, ()> { info!("Getting block {} events", block_num); - match self.legacy.chain_get_block_hash(Some(block_num.into())).await - .map_err(|e| - { - error!("Error getting block {} hash: {:?}", block_num, e); - })? { + match self.legacy.chain_get_block_hash(Some(block_num.into())).await.map_err(|e| { + error!("Error getting block {} hash: {:?}", block_num, e); + })? { Some(hash) => { let events = self.events.at(BlockRef::from_hash(hash)).await.map_err(|e| { error!("Error getting block {} events: {:?}", block_num, e);