Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to specify start block #3171

Merged
merged 3 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tee-worker/omni-executor/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

99 changes: 54 additions & 45 deletions tee-worker/omni-executor/executor-core/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ impl<
log::debug!("Starting sync from {:?}", block_number_to_sync);

'main: loop {
log::info!("Syncing block: {}", block_number_to_sync);
if self.stop_signal.try_recv().is_ok() {
break;
}
Expand Down Expand Up @@ -135,24 +134,53 @@ impl<
None => false,
};

let mut sync_error = false;

if last_finalized_block >= block_number_to_sync {
if let Ok(events) =
self.handle.block_on(self.fetcher.get_block_events(block_number_to_sync))
{
for event in events {
let event_id = event.get_event_id().clone();
if let Some(ref checkpoint) =
self.checkpoint_repository.get().expect("Could not read checkpoint")
{
if checkpoint.lt(&event.get_event_id().clone().into()) {
log::info!("Syncing block: {}", block_number_to_sync);
match self.handle.block_on(self.fetcher.get_block_events(block_number_to_sync)) {
Ok(events) => {
for event in events {
let event_id = event.get_event_id().clone();
if let Some(ref checkpoint) =
self.checkpoint_repository.get().expect("Could not read checkpoint")
{
if checkpoint.lt(&event.get_event_id().clone().into()) {
log::info!("Handling event: {:?}", event_id);
if let Err(e) = self
.handle
.block_on(self.intent_event_handler.handle(event))
{
log::error!("Could not handle event: {:?}", e);
match e {
Error::NonRecoverableError => {
error!("Non-recoverable intent handling error, event: {:?}", event_id);
break 'main;
},
Error::RecoverableError => {
error!(
"Recoverable intent handling error, event: {:?}",
event_id
);
continue 'main;
},
}
}
} else {
log::debug!("Skipping event");
}
} else {
log::info!("Handling event: {:?}", event_id);
if let Err(e) =
self.handle.block_on(self.intent_event_handler.handle(event))
{
log::error!("Could not handle event: {:?}", e);
match e {
Error::NonRecoverableError => {
error!("Non-recoverable intent handling error, event: {:?}", event_id);
error!(
"Non-recoverable intent handling error, event: {:?}",
event_id
);
break 'main;
},
Error::RecoverableError => {
Expand All @@ -164,47 +192,28 @@ impl<
},
}
}
} else {
log::debug!("Skipping event");
}
} else {
log::info!("Handling event: {:?}", event_id);
if let Err(e) =
self.handle.block_on(self.intent_event_handler.handle(event))
{
log::error!("Could not handle event: {:?}", e);
match e {
Error::NonRecoverableError => {
error!(
"Non-recoverable intent handling error, event: {:?}",
event_id
);
break 'main;
},
Error::RecoverableError => {
error!(
"Recoverable intent handling error, event: {:?}",
event_id
);
continue 'main;
},
}
}
self.checkpoint_repository
.save(event_id.into())
.expect("Could not save checkpoint");
}
// we processed block completely so store new checkpoint
self.checkpoint_repository
.save(event_id.into())
.save(CheckpointT::from(block_number_to_sync))
.expect("Could not save checkpoint");
}
// we processed block completely so store new checkpoint
self.checkpoint_repository
.save(CheckpointT::from(block_number_to_sync))
.expect("Could not save checkpoint");
log::info!("Finished syncing block: {}", block_number_to_sync);
block_number_to_sync += 1;
log::info!("Finished syncing block: {}", block_number_to_sync);
block_number_to_sync += 1;
},
Err(e) => {
log::error!("Could not get block {} events: {:?}", block_number_to_sync, e);
sync_error = true;
},
}
} else {
log::trace!("Block: {} not yet finalized", block_number_to_sync);
}

if !fast {
if !fast || sync_error {
sleep(Duration::from_secs(1))
} else {
log::trace!("Fast sync skipping 1s wait");
Expand Down
2 changes: 1 addition & 1 deletion tee-worker/omni-executor/executor-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ log = { workspace = true }
parentchain-listener = { path = "../parentchain/listener" }
scale-encode = { workspace = true }
serde_json = "1.0.127"
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] }

[lints]
workspace = true
1 change: 1 addition & 0 deletions tee-worker/omni-executor/executor-worker/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ use clap::Parser;
pub struct Cli {
pub parentchain_url: String,
pub ethereum_url: String,
pub start_block: u64,
}
17 changes: 13 additions & 4 deletions tee-worker/omni-executor/executor-worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::io::Write;
use std::thread::JoinHandle;
use std::{fs, thread};
use tokio::runtime::Handle;
use tokio::signal;
use tokio::sync::oneshot;

mod cli;
Expand All @@ -48,17 +49,25 @@ async fn main() -> Result<(), ()> {
error!("Could not create data dir: {:?}", e);
})?;

listen_to_parentchain(cli.parentchain_url, cli.ethereum_url)
listen_to_parentchain(cli.parentchain_url, cli.ethereum_url, cli.start_block)
.await
.unwrap()
.join()
.unwrap();

match signal::ctrl_c().await {
Ok(()) => {},
Err(err) => {
eprintln!("Unable to listen for shutdown signal: {}", err);
// we also shut down in case of error
},
}

Ok(())
}

async fn listen_to_parentchain(
parentchain_url: String,
ethereum_url: String,
start_block: u64,
) -> Result<JoinHandle<()>, ()> {
let (_sub_stop_sender, sub_stop_receiver) = oneshot::channel();
let ethereum_intent_executor =
Expand All @@ -75,6 +84,6 @@ async fn listen_to_parentchain(

Ok(thread::Builder::new()
.name("litentry_rococo_sync".to_string())
.spawn(move || parentchain_listener.sync(0))
.spawn(move || parentchain_listener.sync(start_block))
.unwrap())
}
11 changes: 8 additions & 3 deletions tee-worker/omni-executor/parentchain/listener/src/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use crate::primitives::{BlockEvent, EventId};
use async_trait::async_trait;
use log::error;
use log::{error, info};
use parity_scale_codec::Encode;
use std::marker::PhantomData;
use std::ops::Deref;
Expand Down Expand Up @@ -67,9 +67,14 @@ impl<ChainConfig: Config<AccountId = AccountId32>> SubstrateRpcClient<ChainConfi
}
}
async fn get_block_events(&mut self, block_num: u64) -> Result<Vec<BlockEvent>, ()> {
match self.legacy.chain_get_block_hash(Some(block_num.into())).await.map_err(|_| ())? {
info!("Getting block {} events", block_num);
match self.legacy.chain_get_block_hash(Some(block_num.into())).await.map_err(|e| {
error!("Error getting block {} hash: {:?}", block_num, e);
})? {
Some(hash) => {
let events = self.events.at(BlockRef::from_hash(hash)).await.map_err(|_| ())?;
let events = self.events.at(BlockRef::from_hash(hash)).await.map_err(|e| {
error!("Error getting block {} events: {:?}", block_num, e);
})?;
Ok(events
.iter()
.enumerate()
Expand Down