From ff92295124821326f4c46a7a845157de9d2d9a77 Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Wed, 15 Nov 2023 14:57:43 +0100 Subject: [PATCH] feat: use websocket stream block listener (#371) --- Cargo.toml | 2 +- .../topos-sequencer-subnet-client/src/lib.rs | 64 +++++++++++++-- .../src/proxy.rs | 79 +++++++++++++------ 3 files changed, 116 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c4b1ba63a..be74fa7a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ tower = "0.4" ethereum-types = { version = "0.13.1"} secp256k1 = {version = "0.27", features = ["recovery"]} tiny-keccak = {version = "1.5"} -ethers = {version = "2.0.4", features = ["legacy", "abigen-online"]} +ethers = {version = "2.0.9", features = ["legacy", "abigen-online"]} # Log, Tracing & telemetry opentelemetry = { version = "0.19", features = ["rt-tokio", "metrics"] } diff --git a/crates/topos-sequencer-subnet-client/src/lib.rs b/crates/topos-sequencer-subnet-client/src/lib.rs index 9762d9ce9..cc02e075a 100644 --- a/crates/topos-sequencer-subnet-client/src/lib.rs +++ b/crates/topos-sequencer-subnet-client/src/lib.rs @@ -9,10 +9,10 @@ use ethers::{ abi::Token, core::rand::thread_rng, middleware::SignerMiddleware, - providers::{Http, Provider, ProviderError, Ws, WsClientError}, + providers::{Http, Provider, ProviderError, StreamExt, Ws, WsClientError}, signers::{LocalWallet, Signer, WalletError}, }; -use ethers_providers::Middleware; +use ethers_providers::{Middleware, SubscriptionStream}; use serde::{Deserialize, Serialize}; use std::sync::Arc; use std::time::Duration; @@ -57,10 +57,12 @@ pub struct BlockInfo { pub enum Error { #[error("new finalized block not available")] BlockNotAvailable(u64), + #[error("next stream block is not available")] + StreamBlockNotAvailable, #[error("invalid block number: {0}")] InvalidBlockNumber(u64), - #[error("data not available")] - DataNotAvailable, + #[error("block number not available")] + BlockNumberNotAvailable, #[error("failed mutable cast")] MutableCastFailed, #[error("json error: {source}")] @@ -127,6 +129,15 @@ impl SubnetClientListener { Ok(SubnetClientListener { contract, provider }) } + pub async fn new_block_subscription_stream( + &self, + ) -> Result>, Error> { + self.provider + .subscribe_blocks() + .await + .map_err(Error::EthersProviderError) + } + /// Subscribe and listen to runtime finalized blocks pub async fn get_finalized_block( &mut self, @@ -160,7 +171,11 @@ impl SubnetClientListener { Ok(events) => events, Err(Error::EventDecodingError(e)) => { // FIXME: Happens in block before subnet contract is deployed, seems like bug in ethers - error!("Unable to parse events from block {}: {e}", block_number); + error!( + "Error decoding events from block {}: {e} \nTopos smart contracts may not be \ + deployed?", + block_number + ); Vec::new() } Err(e) => { @@ -194,6 +209,45 @@ impl SubnetClientListener { .map(|block_number| block_number.as_u64()) .map_err(Error::EthersProviderError) } + + pub async fn wait_for_new_block( + &self, + stream: &mut SubscriptionStream<'_, Ws, ethers::types::Block>, + ) -> Result { + if let Some(block) = stream.next().await { + let block_number = block.number.ok_or(Error::BlockNumberNotAvailable)?; + let events = match get_block_events(&self.contract, block_number).await { + Ok(events) => events, + Err(Error::EventDecodingError(e)) => { + // FIXME: Happens in block before subnet contract is deployed, seems like bug in ethers + error!( + "Error decoding events from block {}: {e} \nTopos smart contracts may not \ + be deployed?", + block_number + ); + Vec::new() + } + Err(e) => { + error!("Unable to parse events from block {}: {e}", block_number); + return Err(e); + } + }; + + // Make block info result from all collected info + let block_info = BlockInfo { + hash: block.hash.unwrap_or_default().to_string(), + parent_hash: block.parent_hash.to_string(), + number: block_number.to_owned().as_u64(), + state_root: block.state_root.0, + tx_root_hash: block.transactions_root.0, + receipts_root_hash: block.receipts_root.0, + events, + }; + Ok(block_info) + } else { + Err(Error::StreamBlockNotAvailable) + } + } } /// Create subnet client listener and open connection to the subnet diff --git a/crates/topos-sequencer-subnet-runtime/src/proxy.rs b/crates/topos-sequencer-subnet-runtime/src/proxy.rs index dd48e0878..b36a9dde2 100644 --- a/crates/topos-sequencer-subnet-runtime/src/proxy.rs +++ b/crates/topos-sequencer-subnet-runtime/src/proxy.rs @@ -8,16 +8,13 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use tokio::sync::Mutex; use tokio::sync::{mpsc, oneshot}; -use tokio::time::{self, Duration}; +use tokio::time::Duration; use topos_core::api::grpc::checkpoints::TargetStreamPosition; use topos_core::uci::{Certificate, CertificateId, SubnetId}; -use topos_sequencer_subnet_client::{self, SubnetClient, SubnetClientListener}; +use topos_sequencer_subnet_client::{self, BlockInfo, SubnetClient, SubnetClientListener}; use tracing::{debug, error, field, info, info_span, instrument, warn, Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; -/// Arbitrary tick duration for fetching new finalized blocks -const SUBNET_BLOCK_TIME: Duration = Duration::new(2, 0); - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Authorities { // TODO: proper dependencies to block type etc @@ -205,7 +202,7 @@ impl SubnetRuntimeProxy { while latest_acquired_subnet_block_number < current_subnet_block_number { let next_block_number = latest_acquired_subnet_block_number + 1; info!("Retrieving historical block {}", next_block_number); - if let Err(e) = SubnetRuntimeProxy::process_next_block( + if let Err(e) = SubnetRuntimeProxy::retrieve_and_process_block( runtime_proxy.clone(), &mut subnet_listener, certification.clone(), @@ -223,30 +220,44 @@ impl SubnetRuntimeProxy { } } + // Create a new subscription stream to listen for new blocks from subnet node + let mut subscription_stream = + match subnet_listener.new_block_subscription_stream().await { + Ok(stream) => stream, + Err(e) => { + panic!( + "Failed to open subnet node block subscription stream, unable to \ + proceed with certificate generation: {e}" + ) + } + }; + + info!("Block subscription stream opened, listening for new blocks..."); + // Go to standard mode of listening for new blocks - let mut interval = time::interval(SUBNET_BLOCK_TIME); let shutdowned: Option> = loop { tokio::select! { - _ = interval.tick() => { - let next_block_number = latest_acquired_subnet_block_number + 1; - if let Err(e) = SubnetRuntimeProxy::process_next_block( - runtime_proxy.clone(), - &mut subnet_listener, - certification.clone(), - next_block_number as u64 - ).await { - match e { - Error::SubnetError { source: topos_sequencer_subnet_client::Error::BlockNotAvailable(_) } => { - continue; - } - _ => { + result = subnet_listener.wait_for_new_block(&mut subscription_stream) => { + match result { + Ok(block) => { + let new_block_number = block.number as i128; + info!("Successfully received new block {} from the subnet subscription", new_block_number); + if let Err(e) = SubnetRuntimeProxy::process_block( + runtime_proxy.clone(), + certification.clone(), + block + ).await { error!("Failed to process next block: {}", e); break None; } } + Err(e) => { + error!("Failed to retrieve next block: {}, trying again soon", e); + tokio::time::sleep(Duration::from_millis(1000)).await; + continue; + } } - latest_acquired_subnet_block_number = next_block_number; - }, + } shutdown = block_task_shutdown.recv() => { break shutdown; } @@ -303,7 +314,7 @@ impl SubnetRuntimeProxy { Ok(runtime_proxy) } - async fn process_next_block( + async fn retrieve_and_process_block( subnet_runtime_proxy: Arc>, subnet_listener: &mut SubnetClientListener, certification: Arc>, @@ -352,6 +363,28 @@ impl SubnetRuntimeProxy { } } + async fn process_block( + subnet_runtime_proxy: Arc>, + certification: Arc>, + block_info: BlockInfo, + ) -> Result<(), Error> { + let mut certification = certification.lock().await; + let block_number = block_info.number; + + // Update certificate block history + certification.append_blocks(vec![block_info]); + + let new_certificates = certification.generate_certificates().await?; + + debug!("Generated new certificates {new_certificates:?}"); + + for cert in new_certificates { + Self::send_new_certificate(subnet_runtime_proxy.clone(), cert, block_number).await + } + info!("Block {} processed", block_number); + Ok(()) + } + /// Dispatch newly generated certificate to TCE client #[instrument(name = "NewCertificate", fields(certification = field::Empty, source_subnet_id = field::Empty, certificate_id = field::Empty))] async fn send_new_certificate(