From 969bfbce5d972f1b73d5b8586a639990469c12e7 Mon Sep 17 00:00:00 2001 From: "Pengfei(Andy) Zhang" Date: Fri, 4 Oct 2024 17:07:26 -0400 Subject: [PATCH] feat(middleware): rework middleware, split rpc and http. --- crates/provider/src/alloy/metrics.rs | 71 ++++++------- crates/provider/src/alloy/mod.rs | 13 ++- crates/rpc/Cargo.toml | 1 + crates/rpc/src/rpc_metrics.rs | 126 ++++++++++++++++------- crates/rpc/src/task.rs | 9 +- crates/task/src/grpc/grpc_metrics.rs | 29 +++--- crates/types/src/task/metric_recorder.rs | 8 +- crates/types/src/task/status_code.rs | 10 ++ 8 files changed, 169 insertions(+), 98 deletions(-) diff --git a/crates/provider/src/alloy/metrics.rs b/crates/provider/src/alloy/metrics.rs index e7432ead9..c04c91c59 100644 --- a/crates/provider/src/alloy/metrics.rs +++ b/crates/provider/src/alloy/metrics.rs @@ -14,7 +14,7 @@ use std::task::{Context, Poll}; use alloy_json_rpc::{RequestPacket, ResponsePacket}; -use alloy_transport::{BoxFuture, TransportError}; +use alloy_transport::{BoxFuture, HttpError, TransportError, TransportErrorKind}; use futures_util::FutureExt; use rundler_types::task::{ metric_recorder::MethodSessionLogger, @@ -99,34 +99,20 @@ where method_logger.record_rpc(get_rpc_status_code(resp)); } Err(e) => match e { - alloy_json_rpc::RpcError::ErrorResp(_) => { - method_logger.record_http(HttpCode::FiveHundreds); - method_logger.record_rpc(RpcCode::ServerError); + alloy_json_rpc::RpcError::ErrorResp(rpc_error) => { + method_logger.record_http(HttpCode::TwoHundreds); + method_logger.record_rpc(get_rpc_status_from_code(rpc_error.code)); } - alloy_json_rpc::RpcError::NullResp => { - method_logger.record_http(HttpCode::FiveHundreds); - method_logger.record_rpc(RpcCode::InternalError); - } - alloy_json_rpc::RpcError::UnsupportedFeature(_) => { - method_logger.record_http(HttpCode::FourHundreds); - method_logger.record_rpc(RpcCode::MethodNotFound); - } - alloy_json_rpc::RpcError::LocalUsageError(_) => { - method_logger.record_http(HttpCode::FourHundreds); - method_logger.record_rpc(RpcCode::InvalidRequest); - } - alloy_json_rpc::RpcError::SerError(_) => { - method_logger.record_http(HttpCode::FiveHundreds); - method_logger.record_rpc(RpcCode::InternalError); + alloy_json_rpc::RpcError::Transport(TransportErrorKind::HttpError( + HttpError { status, body: _ }, + )) => { + method_logger.record_http(get_http_status_from_code(*status)); } - alloy_json_rpc::RpcError::DeserError { .. } => { - method_logger.record_http(HttpCode::FourHundreds); - method_logger.record_rpc(RpcCode::ParseError); - } - alloy_json_rpc::RpcError::Transport(transport_error) => { - method_logger.record_http(HttpCode::FiveHundreds); - method_logger.record_rpc(RpcCode::ServerError); + alloy_json_rpc::RpcError::NullResp => { + method_logger.record_http(HttpCode::TwoHundreds); + method_logger.record_rpc(RpcCode::Success); } + _ => {} }, } response @@ -146,6 +132,27 @@ fn get_method_name(req: &RequestPacket) -> String { } } +fn get_rpc_status_from_code(code: i64) -> RpcCode { + match code { + -32700 => RpcCode::ParseError, + -32600 => RpcCode::InvalidRequest, + -32601 => RpcCode::MethodNotFound, + -32602 => RpcCode::InvalidParams, + -32603 => RpcCode::InternalError, + x if (-32099..=-32000).contains(&x) => RpcCode::ServerError, + _ => RpcCode::Other, + } +} + +fn get_http_status_from_code(code: u16) -> HttpCode { + match code { + x if (200..=299).contains(&x) => HttpCode::TwoHundreds, + x if (400..=499).contains(&x) => HttpCode::FourHundreds, + x if (500..=599).contains(&x) => HttpCode::FiveHundreds, + _ => HttpCode::Other, + } +} + fn get_rpc_status_code(response_packet: &ResponsePacket) -> RpcCode { let response: &alloy_json_rpc::Response = match response_packet { ResponsePacket::Batch(resps) => &resps[0], @@ -155,15 +162,5 @@ fn get_rpc_status_code(response_packet: &ResponsePacket) -> RpcCode { alloy_json_rpc::ResponsePayload::Success(_) => 0, alloy_json_rpc::ResponsePayload::Failure(error_payload) => error_payload.code, }; - - let rpc_code = match response_code { - -32700 => RpcCode::ParseError, - -32600 => RpcCode::InvalidRequest, - -32601 => RpcCode::MethodNotFound, - -32602 => RpcCode::InvalidParams, - -32603 => RpcCode::InternalError, - x if (-32000..=-32099).contains(&x) => RpcCode::ServerError, - _ => RpcCode::Other, - }; - rpc_code + get_rpc_status_from_code(response_code) } diff --git a/crates/provider/src/alloy/mod.rs b/crates/provider/src/alloy/mod.rs index 05d798c41..ccfea9e77 100644 --- a/crates/provider/src/alloy/mod.rs +++ b/crates/provider/src/alloy/mod.rs @@ -13,6 +13,7 @@ use alloy_provider::{Provider as AlloyProvider, ProviderBuilder}; use alloy_rpc_client::ClientBuilder; +use alloy_transport::layers::RetryBackoffService; use alloy_transport_http::Http; use anyhow::Context; use evm::AlloyEvmProvider; @@ -35,10 +36,16 @@ pub fn new_alloy_evm_provider(rpc_url: &str) -> anyhow::Result anyhow::Result>> + Clone> { +) -> anyhow::Result< + impl AlloyProvider>>> + Clone, +> { let url = Url::parse(rpc_url).context("invalid rpc url")?; - let metric_layer = AlloyMetricLayer::new(); - let client = ClientBuilder::default().layer(metric_layer).http(url); + let metric_layer = AlloyMetricLayer::default(); + let retry_layer = alloy_transport::layers::RetryBackoffLayer::new(10, 500, 0); + let client = ClientBuilder::default() + .layer(metric_layer) + .layer(retry_layer) + .http(url); let provider = ProviderBuilder::new().on_client(client); Ok(provider) } diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 349799738..7ab597cef 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -32,6 +32,7 @@ serde.workspace = true strum.workspace = true url.workspace = true futures-util.workspace = true +http = "1.1.0" [dev-dependencies] mockall.workspace = true diff --git a/crates/rpc/src/rpc_metrics.rs b/crates/rpc/src/rpc_metrics.rs index cc220b1c3..13f808871 100644 --- a/crates/rpc/src/rpc_metrics.rs +++ b/crates/rpc/src/rpc_metrics.rs @@ -12,6 +12,7 @@ // If not, see https://www.gnu.org/licenses/. use futures_util::{future::BoxFuture, FutureExt}; +use http::{Request as httpRequest, Response as httpResponse}; use jsonrpsee::{ server::middleware::rpc::RpcServiceT, types::{ErrorCode, Request}, @@ -19,9 +20,9 @@ use jsonrpsee::{ }; use rundler_types::task::{ metric_recorder::MethodSessionLogger, - status_code::{HttpCode, RpcCode}, + status_code::{get_http_status_from_code, HttpCode, RpcCode}, }; -use tower::Layer; +use tower::{Layer, Service}; #[derive(Clone)] pub(crate) struct RpcMetricsMiddlewareLayer { @@ -30,9 +31,7 @@ pub(crate) struct RpcMetricsMiddlewareLayer { impl RpcMetricsMiddlewareLayer { pub(crate) fn new(service_name: String) -> Self { - Self { - service_name: service_name, - } + Self { service_name } } } @@ -41,7 +40,7 @@ impl Layer for RpcMetricsMiddlewareLayer { fn layer(&self, service: S) -> Self::Service { RpcMetricsMiddleware { - service: service, + service, service_name: self.service_name.clone(), } } @@ -76,37 +75,21 @@ where if rp.is_success() { method_logger.record_http(HttpCode::TwoHundreds); method_logger.record_rpc(RpcCode::Success); + } else if let Some(error) = rp.as_error_code() { + let error_code: ErrorCode = error.into(); + let rpc_code = match error_code { + ErrorCode::ParseError => RpcCode::ParseError, + ErrorCode::OversizedRequest => RpcCode::InvalidRequest, + ErrorCode::InvalidRequest => RpcCode::InvalidRequest, + ErrorCode::MethodNotFound => RpcCode::MethodNotFound, + ErrorCode::ServerIsBusy => RpcCode::ResourceExhausted, + ErrorCode::InvalidParams => RpcCode::InvalidParams, + ErrorCode::InternalError => RpcCode::InternalError, + ErrorCode::ServerError(_) => RpcCode::ServerError, + }; + method_logger.record_rpc(rpc_code); } else { - if let Some(error) = rp.as_error_code() { - let error_code: ErrorCode = error.into(); - let (http_code, rpc_code) = match error_code { - ErrorCode::ParseError => (HttpCode::FourHundreds, RpcCode::ParseError), - ErrorCode::OversizedRequest => { - (HttpCode::FourHundreds, RpcCode::InvalidRequest) - } - ErrorCode::InvalidRequest => { - (HttpCode::FourHundreds, RpcCode::InvalidRequest) - } - ErrorCode::MethodNotFound => { - (HttpCode::FourHundreds, RpcCode::MethodNotFound) - } - ErrorCode::ServerIsBusy => { - (HttpCode::FiveHundreds, RpcCode::ResourceExhausted) - } - ErrorCode::InvalidParams => { - (HttpCode::FourHundreds, RpcCode::InvalidParams) - } - ErrorCode::InternalError => { - (HttpCode::FiveHundreds, RpcCode::InternalError) - } - ErrorCode::ServerError(_) => (HttpCode::FiveHundreds, RpcCode::ServerError), - }; - method_logger.record_http(http_code); - method_logger.record_rpc(rpc_code); - } else { - method_logger.record_http(HttpCode::FiveHundreds); - method_logger.record_rpc(RpcCode::Other); - } + method_logger.record_rpc(RpcCode::Other); } rp @@ -114,3 +97,74 @@ where .boxed() } } + +#[derive(Clone)] +pub(crate) struct HttpMetricMiddlewareLayer { + service_name: String, +} + +impl HttpMetricMiddlewareLayer { + pub(crate) fn new(service_name: String) -> Self { + Self { service_name } + } +} + +impl Layer for HttpMetricMiddlewareLayer { + type Service = HttpMetricMiddleware; + + fn layer(&self, service: S) -> Self::Service { + HttpMetricMiddleware { + service, + service_name: self.service_name.clone(), + } + } +} + +#[derive(Clone)] +pub(crate) struct HttpMetricMiddleware { + service: S, + service_name: String, +} + +impl Service> for HttpMetricMiddleware +where + S: Service, Response = httpResponse> + Send + Sync + Clone + 'static, + R: 'static, + S::Future: Send, + R: Send, +{ + type Response = S::Response; + + type Error = S::Error; + + type Future = BoxFuture<'static, Result>; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.service.poll_ready(cx) + } + + fn call(&mut self, req: httpRequest) -> Self::Future { + let uri = req.uri().clone(); + let method_name = uri.path().split('/').last().unwrap_or("unknown"); + let mut method_logger = MethodSessionLogger::new( + self.service_name.clone(), + method_name.to_string(), + "http".to_string(), + ); + method_logger.start(); + let mut svc = self.service.clone(); + async move { + let rp = svc.call(req).await; + method_logger.done(); + let http_status = rp.as_ref().ok().map(|rp| rp.status()); + if let Some(status_code) = http_status { + method_logger.record_http(get_http_status_from_code(status_code.as_u16())); + } + rp + } + .boxed() + } +} diff --git a/crates/rpc/src/task.rs b/crates/rpc/src/task.rs index 9384a9520..85e180851 100644 --- a/crates/rpc/src/task.rs +++ b/crates/rpc/src/task.rs @@ -42,7 +42,7 @@ use crate::{ EthApiSettings, UserOperationEventProviderV0_6, UserOperationEventProviderV0_7, }, health::{HealthChecker, SystemApiServer}, - rpc_metrics::RpcMetricsMiddlewareLayer, + rpc_metrics::{HttpMetricMiddlewareLayer, RpcMetricsMiddlewareLayer}, rundler::{RundlerApi, RundlerApiServer, Settings as RundlerApiSettings}, types::ApiNamespace, }; @@ -204,10 +204,13 @@ where let http_middleware = tower::ServiceBuilder::new() // Proxy `GET /health` requests to internal `system_health` method. .layer(ProxyGetRequestLayer::new("/health", "system_health")?) - .timeout(self.args.rpc_timeout); + .timeout(self.args.rpc_timeout) + .layer(HttpMetricMiddlewareLayer::new( + "rundler-rpc-service-http".to_string(), + )); let rpc_metric_middleware = RpcServiceBuilder::new().layer(RpcMetricsMiddlewareLayer::new( - "rundler-eth-service".to_string(), + "rundler-rpc-service".to_string(), )); let server = ServerBuilder::default() diff --git a/crates/task/src/grpc/grpc_metrics.rs b/crates/task/src/grpc/grpc_metrics.rs index 28964d6b0..0bd4d79bb 100644 --- a/crates/task/src/grpc/grpc_metrics.rs +++ b/crates/task/src/grpc/grpc_metrics.rs @@ -20,7 +20,9 @@ use std::{ }; use pin_project::pin_project; -use rundler_types::task::{metric_recorder::MethodSessionLogger, status_code::HttpCode}; +use rundler_types::task::{ + metric_recorder::MethodSessionLogger, status_code::get_http_status_from_code, +}; use tonic::codegen::http; use tower::{Layer, Service}; @@ -59,9 +61,9 @@ impl GrpcMetrics { } } -impl Service> for GrpcMetrics +impl Service> for GrpcMetrics where - S: Service> + Sync, + S: Service, Response = http::Response> + Sync, { type Response = S::Response; type Error = S::Error; @@ -84,7 +86,7 @@ where method_logger.start(); ResponseFuture { response_future: self.inner.call(request), - method_logger: method_logger, + method_logger, } } } @@ -100,11 +102,11 @@ pub struct ResponseFuture { method_logger: MethodSessionLogger, } -impl Future for ResponseFuture +impl Future for ResponseFuture where - F: Future>, + F: Future, Error>>, { - type Output = Result; + type Output = Result, Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -112,14 +114,11 @@ where match &res { Poll::Ready(response) => { this.method_logger.done(); - match response { - Ok(_) => { - this.method_logger.record_http(HttpCode::TwoHundreds); - } - Err(_) => { - // extract the error message form the error trait - this.method_logger.record_http(HttpCode::FiveHundreds); - } + + let http_status = response.as_ref().ok().map(|response| response.status()); + if let Some(status_code) = http_status { + this.method_logger + .record_http(get_http_status_from_code(status_code.as_u16())); } } Poll::Pending => {} diff --git a/crates/types/src/task/metric_recorder.rs b/crates/types/src/task/metric_recorder.rs index 455686cbf..dac3d8daf 100644 --- a/crates/types/src/task/metric_recorder.rs +++ b/crates/types/src/task/metric_recorder.rs @@ -29,12 +29,12 @@ impl MethodSessionLogger { pub fn new(service_name: String, method_name: String, protocol: String) -> Self { Self { start_time: Instant::now(), - method_name: method_name, - service_name: service_name, - protocol: protocol, + method_name, + service_name, + protocol, } } - + /// start the session. time will be initialized. pub fn start(&mut self) { self.start_time = Instant::now(); diff --git a/crates/types/src/task/status_code.rs b/crates/types/src/task/status_code.rs index aef93f2a6..610e0ccdd 100644 --- a/crates/types/src/task/status_code.rs +++ b/crates/types/src/task/status_code.rs @@ -52,3 +52,13 @@ pub enum HttpCode { FiveHundreds, Other, } + +/// utility function to conert a http status code to HttpCode object. +pub fn get_http_status_from_code(code: u16) -> HttpCode { + match code { + x if (200..=299).contains(&x) => HttpCode::TwoHundreds, + x if (400..=499).contains(&x) => HttpCode::FourHundreds, + x if (500..=599).contains(&x) => HttpCode::FiveHundreds, + _ => HttpCode::Other, + } +}