Skip to content

Commit

Permalink
feat: use websocket stream block listener (#371)
Browse files Browse the repository at this point in the history
  • Loading branch information
atanmarko authored Nov 15, 2023
1 parent d951171 commit ff92295
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 29 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ tower = "0.4"
ethereum-types = { version = "0.13.1"}
secp256k1 = {version = "0.27", features = ["recovery"]}
tiny-keccak = {version = "1.5"}
ethers = {version = "2.0.4", features = ["legacy", "abigen-online"]}
ethers = {version = "2.0.9", features = ["legacy", "abigen-online"]}

# Log, Tracing & telemetry
opentelemetry = { version = "0.19", features = ["rt-tokio", "metrics"] }
Expand Down
64 changes: 59 additions & 5 deletions crates/topos-sequencer-subnet-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use ethers::{
abi::Token,
core::rand::thread_rng,
middleware::SignerMiddleware,
providers::{Http, Provider, ProviderError, Ws, WsClientError},
providers::{Http, Provider, ProviderError, StreamExt, Ws, WsClientError},
signers::{LocalWallet, Signer, WalletError},
};
use ethers_providers::Middleware;
use ethers_providers::{Middleware, SubscriptionStream};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -57,10 +57,12 @@ pub struct BlockInfo {
pub enum Error {
#[error("new finalized block not available")]
BlockNotAvailable(u64),
#[error("next stream block is not available")]
StreamBlockNotAvailable,
#[error("invalid block number: {0}")]
InvalidBlockNumber(u64),
#[error("data not available")]
DataNotAvailable,
#[error("block number not available")]
BlockNumberNotAvailable,
#[error("failed mutable cast")]
MutableCastFailed,
#[error("json error: {source}")]
Expand Down Expand Up @@ -127,6 +129,15 @@ impl SubnetClientListener {
Ok(SubnetClientListener { contract, provider })
}

pub async fn new_block_subscription_stream(
&self,
) -> Result<SubscriptionStream<Ws, ethers::types::Block<ethers::types::H256>>, Error> {
self.provider
.subscribe_blocks()
.await
.map_err(Error::EthersProviderError)
}

/// Subscribe and listen to runtime finalized blocks
pub async fn get_finalized_block(
&mut self,
Expand Down Expand Up @@ -160,7 +171,11 @@ impl SubnetClientListener {
Ok(events) => events,
Err(Error::EventDecodingError(e)) => {
// FIXME: Happens in block before subnet contract is deployed, seems like bug in ethers
error!("Unable to parse events from block {}: {e}", block_number);
error!(
"Error decoding events from block {}: {e} \nTopos smart contracts may not be \
deployed?",
block_number
);
Vec::new()
}
Err(e) => {
Expand Down Expand Up @@ -194,6 +209,45 @@ impl SubnetClientListener {
.map(|block_number| block_number.as_u64())
.map_err(Error::EthersProviderError)
}

pub async fn wait_for_new_block(
&self,
stream: &mut SubscriptionStream<'_, Ws, ethers::types::Block<ethers::types::H256>>,
) -> Result<BlockInfo, Error> {
if let Some(block) = stream.next().await {
let block_number = block.number.ok_or(Error::BlockNumberNotAvailable)?;
let events = match get_block_events(&self.contract, block_number).await {
Ok(events) => events,
Err(Error::EventDecodingError(e)) => {
// FIXME: Happens in block before subnet contract is deployed, seems like bug in ethers
error!(
"Error decoding events from block {}: {e} \nTopos smart contracts may not \
be deployed?",
block_number
);
Vec::new()
}
Err(e) => {
error!("Unable to parse events from block {}: {e}", block_number);
return Err(e);
}
};

// Make block info result from all collected info
let block_info = BlockInfo {
hash: block.hash.unwrap_or_default().to_string(),
parent_hash: block.parent_hash.to_string(),
number: block_number.to_owned().as_u64(),
state_root: block.state_root.0,
tx_root_hash: block.transactions_root.0,
receipts_root_hash: block.receipts_root.0,
events,
};
Ok(block_info)
} else {
Err(Error::StreamBlockNotAvailable)
}
}
}

/// Create subnet client listener and open connection to the subnet
Expand Down
79 changes: 56 additions & 23 deletions crates/topos-sequencer-subnet-runtime/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{self, Duration};
use tokio::time::Duration;
use topos_core::api::grpc::checkpoints::TargetStreamPosition;
use topos_core::uci::{Certificate, CertificateId, SubnetId};
use topos_sequencer_subnet_client::{self, SubnetClient, SubnetClientListener};
use topos_sequencer_subnet_client::{self, BlockInfo, SubnetClient, SubnetClientListener};
use tracing::{debug, error, field, info, info_span, instrument, warn, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

/// Arbitrary tick duration for fetching new finalized blocks
const SUBNET_BLOCK_TIME: Duration = Duration::new(2, 0);

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Authorities {
// TODO: proper dependencies to block type etc
Expand Down Expand Up @@ -205,7 +202,7 @@ impl SubnetRuntimeProxy {
while latest_acquired_subnet_block_number < current_subnet_block_number {
let next_block_number = latest_acquired_subnet_block_number + 1;
info!("Retrieving historical block {}", next_block_number);
if let Err(e) = SubnetRuntimeProxy::process_next_block(
if let Err(e) = SubnetRuntimeProxy::retrieve_and_process_block(
runtime_proxy.clone(),
&mut subnet_listener,
certification.clone(),
Expand All @@ -223,30 +220,44 @@ impl SubnetRuntimeProxy {
}
}

// Create a new subscription stream to listen for new blocks from subnet node
let mut subscription_stream =
match subnet_listener.new_block_subscription_stream().await {
Ok(stream) => stream,
Err(e) => {
panic!(
"Failed to open subnet node block subscription stream, unable to \
proceed with certificate generation: {e}"
)
}
};

info!("Block subscription stream opened, listening for new blocks...");

// Go to standard mode of listening for new blocks
let mut interval = time::interval(SUBNET_BLOCK_TIME);
let shutdowned: Option<oneshot::Sender<()>> = loop {
tokio::select! {
_ = interval.tick() => {
let next_block_number = latest_acquired_subnet_block_number + 1;
if let Err(e) = SubnetRuntimeProxy::process_next_block(
runtime_proxy.clone(),
&mut subnet_listener,
certification.clone(),
next_block_number as u64
).await {
match e {
Error::SubnetError { source: topos_sequencer_subnet_client::Error::BlockNotAvailable(_) } => {
continue;
}
_ => {
result = subnet_listener.wait_for_new_block(&mut subscription_stream) => {
match result {
Ok(block) => {
let new_block_number = block.number as i128;
info!("Successfully received new block {} from the subnet subscription", new_block_number);
if let Err(e) = SubnetRuntimeProxy::process_block(
runtime_proxy.clone(),
certification.clone(),
block
).await {
error!("Failed to process next block: {}", e);
break None;
}
}
Err(e) => {
error!("Failed to retrieve next block: {}, trying again soon", e);
tokio::time::sleep(Duration::from_millis(1000)).await;
continue;
}
}
latest_acquired_subnet_block_number = next_block_number;
},
}
shutdown = block_task_shutdown.recv() => {
break shutdown;
}
Expand Down Expand Up @@ -303,7 +314,7 @@ impl SubnetRuntimeProxy {
Ok(runtime_proxy)
}

async fn process_next_block(
async fn retrieve_and_process_block(
subnet_runtime_proxy: Arc<Mutex<SubnetRuntimeProxy>>,
subnet_listener: &mut SubnetClientListener,
certification: Arc<Mutex<Certification>>,
Expand Down Expand Up @@ -352,6 +363,28 @@ impl SubnetRuntimeProxy {
}
}

async fn process_block(
subnet_runtime_proxy: Arc<Mutex<SubnetRuntimeProxy>>,
certification: Arc<Mutex<Certification>>,
block_info: BlockInfo,
) -> Result<(), Error> {
let mut certification = certification.lock().await;
let block_number = block_info.number;

// Update certificate block history
certification.append_blocks(vec![block_info]);

let new_certificates = certification.generate_certificates().await?;

debug!("Generated new certificates {new_certificates:?}");

for cert in new_certificates {
Self::send_new_certificate(subnet_runtime_proxy.clone(), cert, block_number).await
}
info!("Block {} processed", block_number);
Ok(())
}

/// Dispatch newly generated certificate to TCE client
#[instrument(name = "NewCertificate", fields(certification = field::Empty, source_subnet_id = field::Empty, certificate_id = field::Empty))]
async fn send_new_certificate(
Expand Down

0 comments on commit ff92295

Please sign in to comment.