Skip to content

Commit

Permalink
feat(middleware): rework middleware, split rpc and http.
Browse files Browse the repository at this point in the history
  • Loading branch information
andysim3d committed Oct 4, 2024
1 parent 58656bb commit 969bfbc
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 98 deletions.
71 changes: 34 additions & 37 deletions crates/provider/src/alloy/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::task::{Context, Poll};

use alloy_json_rpc::{RequestPacket, ResponsePacket};
use alloy_transport::{BoxFuture, TransportError};
use alloy_transport::{BoxFuture, HttpError, TransportError, TransportErrorKind};
use futures_util::FutureExt;
use rundler_types::task::{
metric_recorder::MethodSessionLogger,
Expand Down Expand Up @@ -99,34 +99,20 @@ where
method_logger.record_rpc(get_rpc_status_code(resp));
}
Err(e) => match e {
alloy_json_rpc::RpcError::ErrorResp(_) => {
method_logger.record_http(HttpCode::FiveHundreds);
method_logger.record_rpc(RpcCode::ServerError);
alloy_json_rpc::RpcError::ErrorResp(rpc_error) => {
method_logger.record_http(HttpCode::TwoHundreds);
method_logger.record_rpc(get_rpc_status_from_code(rpc_error.code));
}
alloy_json_rpc::RpcError::NullResp => {
method_logger.record_http(HttpCode::FiveHundreds);
method_logger.record_rpc(RpcCode::InternalError);
}
alloy_json_rpc::RpcError::UnsupportedFeature(_) => {
method_logger.record_http(HttpCode::FourHundreds);
method_logger.record_rpc(RpcCode::MethodNotFound);
}
alloy_json_rpc::RpcError::LocalUsageError(_) => {
method_logger.record_http(HttpCode::FourHundreds);
method_logger.record_rpc(RpcCode::InvalidRequest);
}
alloy_json_rpc::RpcError::SerError(_) => {
method_logger.record_http(HttpCode::FiveHundreds);
method_logger.record_rpc(RpcCode::InternalError);
alloy_json_rpc::RpcError::Transport(TransportErrorKind::HttpError(
HttpError { status, body: _ },
)) => {
method_logger.record_http(get_http_status_from_code(*status));
}
alloy_json_rpc::RpcError::DeserError { .. } => {
method_logger.record_http(HttpCode::FourHundreds);
method_logger.record_rpc(RpcCode::ParseError);
}
alloy_json_rpc::RpcError::Transport(transport_error) => {
method_logger.record_http(HttpCode::FiveHundreds);
method_logger.record_rpc(RpcCode::ServerError);
alloy_json_rpc::RpcError::NullResp => {
method_logger.record_http(HttpCode::TwoHundreds);
method_logger.record_rpc(RpcCode::Success);
}
_ => {}
},
}
response
Expand All @@ -146,6 +132,27 @@ fn get_method_name(req: &RequestPacket) -> String {
}
}

fn get_rpc_status_from_code(code: i64) -> RpcCode {
match code {
-32700 => RpcCode::ParseError,
-32600 => RpcCode::InvalidRequest,
-32601 => RpcCode::MethodNotFound,
-32602 => RpcCode::InvalidParams,
-32603 => RpcCode::InternalError,
x if (-32099..=-32000).contains(&x) => RpcCode::ServerError,
_ => RpcCode::Other,
}
}

fn get_http_status_from_code(code: u16) -> HttpCode {
match code {
x if (200..=299).contains(&x) => HttpCode::TwoHundreds,
x if (400..=499).contains(&x) => HttpCode::FourHundreds,
x if (500..=599).contains(&x) => HttpCode::FiveHundreds,
_ => HttpCode::Other,
}
}

fn get_rpc_status_code(response_packet: &ResponsePacket) -> RpcCode {
let response: &alloy_json_rpc::Response = match response_packet {
ResponsePacket::Batch(resps) => &resps[0],
Expand All @@ -155,15 +162,5 @@ fn get_rpc_status_code(response_packet: &ResponsePacket) -> RpcCode {
alloy_json_rpc::ResponsePayload::Success(_) => 0,
alloy_json_rpc::ResponsePayload::Failure(error_payload) => error_payload.code,
};

let rpc_code = match response_code {
-32700 => RpcCode::ParseError,
-32600 => RpcCode::InvalidRequest,
-32601 => RpcCode::MethodNotFound,
-32602 => RpcCode::InvalidParams,
-32603 => RpcCode::InternalError,
x if (-32000..=-32099).contains(&x) => RpcCode::ServerError,
_ => RpcCode::Other,
};
rpc_code
get_rpc_status_from_code(response_code)
}
13 changes: 10 additions & 3 deletions crates/provider/src/alloy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use alloy_provider::{Provider as AlloyProvider, ProviderBuilder};
use alloy_rpc_client::ClientBuilder;
use alloy_transport::layers::RetryBackoffService;
use alloy_transport_http::Http;
use anyhow::Context;
use evm::AlloyEvmProvider;
Expand All @@ -35,10 +36,16 @@ pub fn new_alloy_evm_provider(rpc_url: &str) -> anyhow::Result<impl EvmProvider
/// Create a new alloy provider from a given RPC URL
pub fn new_alloy_provider(
rpc_url: &str,
) -> anyhow::Result<impl AlloyProvider<AlloyMetricMiddleware<Http<Client>>> + Clone> {
) -> anyhow::Result<
impl AlloyProvider<AlloyMetricMiddleware<RetryBackoffService<Http<Client>>>> + Clone,
> {
let url = Url::parse(rpc_url).context("invalid rpc url")?;
let metric_layer = AlloyMetricLayer::new();
let client = ClientBuilder::default().layer(metric_layer).http(url);
let metric_layer = AlloyMetricLayer::default();
let retry_layer = alloy_transport::layers::RetryBackoffLayer::new(10, 500, 0);
let client = ClientBuilder::default()
.layer(metric_layer)
.layer(retry_layer)
.http(url);
let provider = ProviderBuilder::new().on_client(client);
Ok(provider)
}
1 change: 1 addition & 0 deletions crates/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ serde.workspace = true
strum.workspace = true
url.workspace = true
futures-util.workspace = true
http = "1.1.0"

[dev-dependencies]
mockall.workspace = true
Expand Down
126 changes: 90 additions & 36 deletions crates/rpc/src/rpc_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@
// If not, see https://www.gnu.org/licenses/.

use futures_util::{future::BoxFuture, FutureExt};
use http::{Request as httpRequest, Response as httpResponse};
use jsonrpsee::{
server::middleware::rpc::RpcServiceT,
types::{ErrorCode, Request},
MethodResponse,
};
use rundler_types::task::{
metric_recorder::MethodSessionLogger,
status_code::{HttpCode, RpcCode},
status_code::{get_http_status_from_code, HttpCode, RpcCode},
};
use tower::Layer;
use tower::{Layer, Service};

#[derive(Clone)]
pub(crate) struct RpcMetricsMiddlewareLayer {
Expand All @@ -30,9 +31,7 @@ pub(crate) struct RpcMetricsMiddlewareLayer {

impl RpcMetricsMiddlewareLayer {
pub(crate) fn new(service_name: String) -> Self {
Self {
service_name: service_name,
}
Self { service_name }
}
}

Expand All @@ -41,7 +40,7 @@ impl<S> Layer<S> for RpcMetricsMiddlewareLayer {

fn layer(&self, service: S) -> Self::Service {
RpcMetricsMiddleware {
service: service,
service,
service_name: self.service_name.clone(),
}
}
Expand Down Expand Up @@ -76,41 +75,96 @@ where
if rp.is_success() {
method_logger.record_http(HttpCode::TwoHundreds);
method_logger.record_rpc(RpcCode::Success);
} else if let Some(error) = rp.as_error_code() {
let error_code: ErrorCode = error.into();
let rpc_code = match error_code {
ErrorCode::ParseError => RpcCode::ParseError,
ErrorCode::OversizedRequest => RpcCode::InvalidRequest,
ErrorCode::InvalidRequest => RpcCode::InvalidRequest,
ErrorCode::MethodNotFound => RpcCode::MethodNotFound,
ErrorCode::ServerIsBusy => RpcCode::ResourceExhausted,
ErrorCode::InvalidParams => RpcCode::InvalidParams,
ErrorCode::InternalError => RpcCode::InternalError,
ErrorCode::ServerError(_) => RpcCode::ServerError,
};
method_logger.record_rpc(rpc_code);
} else {
if let Some(error) = rp.as_error_code() {
let error_code: ErrorCode = error.into();
let (http_code, rpc_code) = match error_code {
ErrorCode::ParseError => (HttpCode::FourHundreds, RpcCode::ParseError),
ErrorCode::OversizedRequest => {
(HttpCode::FourHundreds, RpcCode::InvalidRequest)
}
ErrorCode::InvalidRequest => {
(HttpCode::FourHundreds, RpcCode::InvalidRequest)
}
ErrorCode::MethodNotFound => {
(HttpCode::FourHundreds, RpcCode::MethodNotFound)
}
ErrorCode::ServerIsBusy => {
(HttpCode::FiveHundreds, RpcCode::ResourceExhausted)
}
ErrorCode::InvalidParams => {
(HttpCode::FourHundreds, RpcCode::InvalidParams)
}
ErrorCode::InternalError => {
(HttpCode::FiveHundreds, RpcCode::InternalError)
}
ErrorCode::ServerError(_) => (HttpCode::FiveHundreds, RpcCode::ServerError),
};
method_logger.record_http(http_code);
method_logger.record_rpc(rpc_code);
} else {
method_logger.record_http(HttpCode::FiveHundreds);
method_logger.record_rpc(RpcCode::Other);
}
method_logger.record_rpc(RpcCode::Other);
}

rp
}
.boxed()
}
}

#[derive(Clone)]
pub(crate) struct HttpMetricMiddlewareLayer {
service_name: String,
}

impl HttpMetricMiddlewareLayer {
pub(crate) fn new(service_name: String) -> Self {
Self { service_name }
}
}

impl<S> Layer<S> for HttpMetricMiddlewareLayer {
type Service = HttpMetricMiddleware<S>;

fn layer(&self, service: S) -> Self::Service {
HttpMetricMiddleware {
service,
service_name: self.service_name.clone(),
}
}
}

#[derive(Clone)]
pub(crate) struct HttpMetricMiddleware<S> {
service: S,
service_name: String,
}

impl<S, R, ResBody> Service<httpRequest<R>> for HttpMetricMiddleware<S>
where
S: Service<httpRequest<R>, Response = httpResponse<ResBody>> + Send + Sync + Clone + 'static,
R: 'static,
S::Future: Send,
R: Send,
{
type Response = S::Response;

type Error = S::Error;

type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}

fn call(&mut self, req: httpRequest<R>) -> Self::Future {
let uri = req.uri().clone();
let method_name = uri.path().split('/').last().unwrap_or("unknown");
let mut method_logger = MethodSessionLogger::new(
self.service_name.clone(),
method_name.to_string(),
"http".to_string(),
);
method_logger.start();
let mut svc = self.service.clone();
async move {
let rp = svc.call(req).await;
method_logger.done();
let http_status = rp.as_ref().ok().map(|rp| rp.status());
if let Some(status_code) = http_status {
method_logger.record_http(get_http_status_from_code(status_code.as_u16()));
}
rp
}
.boxed()
}
}
9 changes: 6 additions & 3 deletions crates/rpc/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::{
EthApiSettings, UserOperationEventProviderV0_6, UserOperationEventProviderV0_7,
},
health::{HealthChecker, SystemApiServer},
rpc_metrics::RpcMetricsMiddlewareLayer,
rpc_metrics::{HttpMetricMiddlewareLayer, RpcMetricsMiddlewareLayer},
rundler::{RundlerApi, RundlerApiServer, Settings as RundlerApiSettings},
types::ApiNamespace,
};
Expand Down Expand Up @@ -204,10 +204,13 @@ where
let http_middleware = tower::ServiceBuilder::new()
// Proxy `GET /health` requests to internal `system_health` method.
.layer(ProxyGetRequestLayer::new("/health", "system_health")?)
.timeout(self.args.rpc_timeout);
.timeout(self.args.rpc_timeout)
.layer(HttpMetricMiddlewareLayer::new(
"rundler-rpc-service-http".to_string(),
));

let rpc_metric_middleware = RpcServiceBuilder::new().layer(RpcMetricsMiddlewareLayer::new(
"rundler-eth-service".to_string(),
"rundler-rpc-service".to_string(),
));

let server = ServerBuilder::default()
Expand Down
29 changes: 14 additions & 15 deletions crates/task/src/grpc/grpc_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use std::{
};

use pin_project::pin_project;
use rundler_types::task::{metric_recorder::MethodSessionLogger, status_code::HttpCode};
use rundler_types::task::{
metric_recorder::MethodSessionLogger, status_code::get_http_status_from_code,
};
use tonic::codegen::http;
use tower::{Layer, Service};

Expand Down Expand Up @@ -59,9 +61,9 @@ impl<S> GrpcMetrics<S> {
}
}

impl<S, Body> Service<http::Request<Body>> for GrpcMetrics<S>
impl<S, Body, ResBody> Service<http::Request<Body>> for GrpcMetrics<S>
where
S: Service<http::Request<Body>> + Sync,
S: Service<http::Request<Body>, Response = http::Response<ResBody>> + Sync,
{
type Response = S::Response;
type Error = S::Error;
Expand All @@ -84,7 +86,7 @@ where
method_logger.start();
ResponseFuture {
response_future: self.inner.call(request),
method_logger: method_logger,
method_logger,
}
}
}
Expand All @@ -100,26 +102,23 @@ pub struct ResponseFuture<F> {
method_logger: MethodSessionLogger,
}

impl<F, Response, E> Future for ResponseFuture<F>
impl<F, ResBody, Error> Future for ResponseFuture<F>
where
F: Future<Output = Result<Response, E>>,
F: Future<Output = Result<http::Response<ResBody>, Error>>,
{
type Output = Result<Response, E>;
type Output = Result<http::Response<ResBody>, Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = this.response_future.poll(cx);
match &res {
Poll::Ready(response) => {
this.method_logger.done();
match response {
Ok(_) => {
this.method_logger.record_http(HttpCode::TwoHundreds);
}
Err(_) => {
// extract the error message form the error trait
this.method_logger.record_http(HttpCode::FiveHundreds);
}

let http_status = response.as_ref().ok().map(|response| response.status());
if let Some(status_code) = http_status {
this.method_logger
.record_http(get_http_status_from_code(status_code.as_u16()));
}
}
Poll::Pending => {}
Expand Down
Loading

0 comments on commit 969bfbc

Please sign in to comment.