Skip to content

Commit

Permalink
refactor(starknet_sequencer_infra): remove ComponentError (#4231)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlonLStarkWare authored Feb 18, 2025
1 parent 4030eac commit e6d3862
Show file tree
Hide file tree
Showing 15 changed files with 46 additions and 67 deletions.
6 changes: 2 additions & 4 deletions crates/starknet_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use starknet_sequencer_infra::component_definitions::{
default_component_start_fn,
ComponentStarter,
};
use starknet_sequencer_infra::errors::ComponentError;
use starknet_sequencer_metrics::metric_definitions::{
BATCHED_TRANSACTIONS,
REJECTED_TRANSACTIONS,
Expand Down Expand Up @@ -755,13 +754,12 @@ impl BatcherStorageWriterTrait for papyrus_storage::StorageWriter {

#[async_trait]
impl ComponentStarter for Batcher {
async fn start(&mut self) -> Result<(), ComponentError> {
default_component_start_fn::<Self>().await?;
async fn start(&mut self) {
default_component_start_fn::<Self>().await;
let storage_height = self
.storage_reader
.height()
.expect("Failed to get height from storage during batcher creation.");
register_metrics(storage_height);
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/starknet_batcher/src/batcher_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async fn create_batcher(mock_dependencies: MockDependencies) -> Batcher {
Box::new(mock_dependencies.block_builder_factory),
);
// Call post-creation functionality (e.g., metrics registration).
batcher.start().await.unwrap();
batcher.start().await;
batcher
}

Expand Down
10 changes: 4 additions & 6 deletions crates/starknet_consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use starknet_consensus_orchestrator::cende::CendeAmbassador;
use starknet_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext;
use starknet_infra_utils::type_name::short_type_name;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::errors::ComponentError;
use starknet_sequencer_metrics::metric_definitions::{
CONSENSUS_NUM_CONNECTED_PEERS,
CONSENSUS_NUM_RECEIVED_MESSAGES,
Expand Down Expand Up @@ -183,11 +182,10 @@ pub fn create_consensus_manager(

#[async_trait]
impl ComponentStarter for ConsensusManager {
async fn start(&mut self) -> Result<(), ComponentError> {
async fn start(&mut self) {
info!("Starting component {}.", short_type_name::<Self>());
self.run().await.map_err(|e| {
error!("Error running component ConsensusManager: {:?}", e);
ComponentError::InternalComponentError
})
self.run()
.await
.unwrap_or_else(|e| panic!("Failed to start ConsensusManager component: {:?}", e))
}
}
5 changes: 2 additions & 3 deletions crates/starknet_http_server/src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use starknet_gateway_types::communication::SharedGatewayClient;
use starknet_gateway_types::gateway_types::GatewayInput;
use starknet_infra_utils::type_name::short_type_name;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::errors::ComponentError;
use tracing::{debug, info, instrument, trace};

use crate::config::HttpServerConfig;
Expand Down Expand Up @@ -104,8 +103,8 @@ pub fn create_http_server(

#[async_trait]
impl ComponentStarter for HttpServer {
async fn start(&mut self) -> Result<(), ComponentError> {
async fn start(&mut self) {
info!("Starting component {}.", short_type_name::<Self>());
self.run().await.map_err(|_| ComponentError::InternalComponentError)
self.run().await.unwrap_or_else(|e| panic!("Failed to start HttpServer component: {:?}", e))
}
}
5 changes: 2 additions & 3 deletions crates/starknet_l1_provider/src/l1_scraper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use starknet_l1_provider_types::errors::L1ProviderClientError;
use starknet_l1_provider_types::{Event, SharedL1ProviderClient};
use starknet_sequencer_infra::component_client::ClientError;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::errors::ComponentError;
use thiserror::Error;
use tokio::time::sleep;
use tracing::{error, info, instrument};
Expand Down Expand Up @@ -215,9 +214,9 @@ impl<B: BaseLayerContract + Send + Sync> L1Scraper<B> {

#[async_trait]
impl<B: BaseLayerContract + Send + Sync> ComponentStarter for L1Scraper<B> {
async fn start(&mut self) -> Result<(), ComponentError> {
async fn start(&mut self) {
info!("Starting component {}.", type_name::<Self>());
self.run().await.map_err(|_| ComponentError::InternalComponentError)
self.run().await.unwrap_or_else(|e| panic!("Failed to start L1Scraper component: {}", e))
}
}

Expand Down
7 changes: 3 additions & 4 deletions crates/starknet_mempool_p2p/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use starknet_gateway_types::errors::GatewayError;
use starknet_gateway_types::gateway_types::GatewayInput;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::component_server::WrapperServer;
use starknet_sequencer_infra::errors::ComponentError;
use tracing::{debug, info, warn};

pub struct MempoolP2pRunner {
Expand All @@ -40,12 +39,12 @@ impl MempoolP2pRunner {

#[async_trait]
impl ComponentStarter for MempoolP2pRunner {
async fn start(&mut self) -> Result<(), ComponentError> {
async fn start(&mut self) {
let mut gateway_futures = FuturesUnordered::new();
loop {
tokio::select! {
result = &mut self.network_future => {
return result.map_err(|_| ComponentError::InternalComponentError);
_ = &mut self.network_future => {
panic!("MempoolP2pRunner failed - network stopped unexpectedly");
}
Some(result) = gateway_futures.next() => {
match result {
Expand Down
14 changes: 8 additions & 6 deletions crates/starknet_mempool_p2p/src/runner/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,22 @@ fn setup(
}

#[test]
fn run_returns_when_network_future_returns() {
#[should_panic]
fn run_panics_when_network_future_returns() {
let network_future = ready(Ok(())).boxed();
let gateway_client = Arc::new(MockGatewayClient::new());
let (mut mempool_p2p_runner, _) = setup(network_future, gateway_client);
mempool_p2p_runner.start().now_or_never().unwrap().unwrap();
mempool_p2p_runner.start().now_or_never().unwrap();
}

#[test]
fn run_returns_error_when_network_future_returns_error() {
#[should_panic]
fn run_panics_when_network_future_returns_error() {
let network_future =
ready(Err(NetworkError::DialError(libp2p::swarm::DialError::Aborted))).boxed();
let gateway_client = Arc::new(MockGatewayClient::new());
let (mut mempool_p2p_runner, _) = setup(network_future, gateway_client);
mempool_p2p_runner.start().now_or_never().unwrap().unwrap_err();
mempool_p2p_runner.start().now_or_never().unwrap();
}

#[tokio::test]
Expand Down Expand Up @@ -96,7 +98,7 @@ async fn incoming_p2p_tx_reaches_gateway_client() {
// if the runner fails, there was a network issue => panic.
// if the runner returns successfully, we panic because the runner should never terminate.
res = tokio::time::timeout(Duration::from_secs(5), mempool_p2p_runner.start()) => {
res.expect("Test timed out").expect("MempoolP2pRunner failed - network stopped unexpectedly");
res.expect("Test timed out");
panic!("MempoolP2pRunner terminated");
},
// if a message was received on this oneshot channel, the gateway client received the tx and the test succeeded.
Expand Down Expand Up @@ -144,7 +146,7 @@ async fn incoming_p2p_tx_fails_on_gateway_client() {
// if the runner fails, there was a network issue => panic.
// if the runner returns successfully, we panic because the runner should never terminate.
res = tokio::time::timeout(Duration::from_secs(5), mempool_p2p_runner.start()) => {
res.expect("Test timed out (MempoolP2pRunner took too long to start)").expect("MempoolP2pRunner failed - network stopped unexpectedly");
res.expect("Test timed out (MempoolP2pRunner took too long to start)");
panic!("MempoolP2pRunner terminated");
},
// if a message was received on this oneshot channel, the gateway client received the tx.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use hyper::Error;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use starknet_infra_utils::type_name::short_type_name;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::errors::ComponentError;
use tracing::{info, instrument};

use crate::config::MonitoringEndpointConfig;
Expand Down Expand Up @@ -94,9 +93,9 @@ pub fn create_monitoring_endpoint(

#[async_trait]
impl ComponentStarter for MonitoringEndpoint {
async fn start(&mut self) -> Result<(), ComponentError> {
async fn start(&mut self) {
info!("Starting component {}.", short_type_name::<Self>());
self.run().await.map_err(|_| ComponentError::InternalComponentError)
self.run().await.unwrap_or_else(|e| panic!("Failed to start MointoringEndpoint: {:?}", e));
}
}

Expand Down
7 changes: 2 additions & 5 deletions crates/starknet_sequencer_infra/src/component_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use tracing::{error, info};
use validator::Validate;

use crate::component_client::ClientResult;
use crate::errors::ComponentError;

pub const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 32;
Expand All @@ -35,17 +34,15 @@ where
async fn send(&self, request: Request) -> ClientResult<Response>;
}

pub async fn default_component_start_fn<T: ComponentStarter + ?Sized>() -> Result<(), ComponentError>
{
pub async fn default_component_start_fn<T: ComponentStarter + ?Sized>() {
info!("Starting component {} with the default starter.", short_type_name::<T>());
Ok(())
}

// TODO(Lev/Tsabary): Enforce metrics registration at the start of the component to avoid missing
// metrics in the monitoring server.
#[async_trait]
pub trait ComponentStarter {
async fn start(&mut self) -> Result<(), ComponentError> {
async fn start(&mut self) {
default_component_start_fn::<Self>().await
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ impl<Component: Send> WrapperServer<Component> {
impl<Component: ComponentStarter + Send> ComponentServerStarter for WrapperServer<Component> {
async fn start(&mut self) {
info!("Starting WrapperServer for {}.", short_type_name::<Component>());
self.component.start().await.unwrap_or_else(|_| {
panic!("WrapperServer stopped for {}", short_type_name::<Component>())
});
self.component.start().await;
panic!("WrapperServer stopped for {}", short_type_name::<Component>())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ where
{
async fn start(&mut self) {
info!("Starting LocalComponentServer for {}.", short_type_name::<Component>());
self.component.start().await.unwrap_or_else(|_| {
panic!("LocalComponentServer stopped for {}", short_type_name::<Component>())
});
self.component.start().await;
request_response_loop(&mut self.rx, &mut self.component, self.metrics.clone()).await;
panic!("Finished LocalComponentServer for {}.", short_type_name::<Component>());
}
Expand All @@ -75,9 +73,7 @@ where
{
async fn start(&mut self) {
info!("Starting ConcurrentLocalComponentServer for {}.", short_type_name::<Component>());
self.component.start().await.unwrap_or_else(|_| {
panic!("ConcurrentLocalComponentServer stopped for {}", short_type_name::<Component>())
});
self.component.start().await;
concurrent_request_response_loop(
&mut self.rx,
&mut self.component,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use crate::serde_utils::SerdeWrapper;
/// ComponentServerStarter,
/// RemoteComponentServer,
/// };
/// use crate::starknet_sequencer_infra::errors::ComponentError;
///
/// // Define your component
/// struct MyComponent {}
Expand Down
8 changes: 0 additions & 8 deletions crates/starknet_sequencer_infra/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
use thiserror::Error;

#[derive(Error, Debug, PartialEq, Clone)]
pub enum ComponentError {
#[error("Error in the component configuration.")]
ComponentConfigError,
#[error("An internal component error.")]
InternalComponentError,
}

#[derive(Clone, Debug, Error)]
pub enum ReplaceComponentError {
#[error("Internal error.")]
Expand Down
15 changes: 7 additions & 8 deletions crates/starknet_state_sync/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockO
use starknet_client::reader::PendingData;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use starknet_sequencer_infra::component_server::WrapperServer;
use starknet_sequencer_infra::errors::ComponentError;
use starknet_sequencer_metrics::metric_definitions::{
STATE_SYNC_P2P_NUM_ACTIVE_INBOUND_SESSIONS,
STATE_SYNC_P2P_NUM_ACTIVE_OUTBOUND_SESSIONS,
Expand All @@ -58,19 +57,19 @@ pub struct StateSyncRunner {

#[async_trait]
impl ComponentStarter for StateSyncRunner {
async fn start(&mut self) -> Result<(), ComponentError> {
async fn start(&mut self) {
tokio::select! {
result = &mut self.network_future => {
result.map_err(|_| ComponentError::InternalComponentError)
_ = &mut self.network_future => {
panic!("StateSyncRunner failed - network stopped unexpectedly");
}
result = &mut self.p2p_sync_client_future => {
result.map_err(|_| ComponentError::InternalComponentError).map(|_never| ())
_ = &mut self.p2p_sync_client_future => {
panic!("StateSyncRunner failed - p2p sync client stopped unexpectedly");
}
_never = &mut self.p2p_sync_server_future => {
unreachable!("Return type Never should never be constructed")
}
result = &mut self.central_sync_client_future => {
result.map_err(|_| ComponentError::InternalComponentError)
_ = &mut self.central_sync_client_future => {
panic!("StateSyncRunner failed - central sync client stopped unexpectedly");
}
_never = &mut self.new_block_dev_null_future => {
unreachable!("Return type Never should never be constructed")
Expand Down
15 changes: 9 additions & 6 deletions crates/starknet_state_sync/src/runner/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use starknet_sequencer_infra::component_definitions::ComponentStarter;
use super::StateSyncRunner;

#[test]
fn run_returns_when_network_future_returns() {
#[should_panic]
fn run_panics_when_network_future_returns() {
let network_future = ready(Ok(())).boxed();
let p2p_sync_client_future = pending().boxed();
let p2p_sync_server_future = pending().boxed();
Expand All @@ -20,11 +21,12 @@ fn run_returns_when_network_future_returns() {
central_sync_client_future,
new_block_dev_null_future,
};
state_sync_runner.start().now_or_never().unwrap().unwrap();
state_sync_runner.start().now_or_never().unwrap();
}

#[test]
fn run_returns_error_when_network_future_returns_error() {
#[should_panic]
fn run_panics_when_network_future_returns_error() {
let network_future =
ready(Err(NetworkError::DialError(libp2p::swarm::DialError::Aborted))).boxed();
let p2p_sync_client_future = pending().boxed();
Expand All @@ -38,11 +40,12 @@ fn run_returns_error_when_network_future_returns_error() {
central_sync_client_future,
new_block_dev_null_future,
};
state_sync_runner.start().now_or_never().unwrap().unwrap_err();
state_sync_runner.start().now_or_never().unwrap();
}

#[test]
fn run_returns_error_when_sync_client_future_returns_error() {
#[should_panic]
fn run_panics_when_sync_client_future_returns_error() {
let network_future = pending().boxed();
let p2p_sync_client_future = ready(Err(P2pSyncClientError::TooManyResponses)).boxed();
let p2p_sync_server_future = pending().boxed();
Expand All @@ -55,5 +58,5 @@ fn run_returns_error_when_sync_client_future_returns_error() {
central_sync_client_future,
new_block_dev_null_future,
};
state_sync_runner.start().now_or_never().unwrap().unwrap_err();
state_sync_runner.start().now_or_never().unwrap();
}

0 comments on commit e6d3862

Please sign in to comment.