Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(middleware): add response extractor. #816

Open
wants to merge 1 commit into
base: feat/v0.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions crates/pool/src/server/remote/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ use alloy_primitives::{Address, B256};
use async_trait::async_trait;
use futures_util::StreamExt;
use rundler_task::{
grpc::{grpc_metrics::HttpMethodExtractor, protos::from_bytes},
metrics::{MetricsLayer, RequestMethodNameInfo},
grpc::{
grpc_metrics::{HttpMethodExtractor, HttpResponseCodeExtractor},
protos::from_bytes,
},
metrics::MetricsLayer,
};
use rundler_types::{
chain::ChainSpec,
Expand Down Expand Up @@ -81,7 +84,11 @@ pub(crate) async fn spawn_remote_mempool_server(
.set_serving::<OpPoolServer<OpPoolImpl>>()
.await;

let metrics_layer = MetricsLayer::<HttpMethodExtractor, http::Request>::new("op_pool_service".to_string(), "http-grpc".to_string());
let metrics_layer =
MetricsLayer::<HttpMethodExtractor, http::Request, HttpResponseCodeExtractor>::new(
"op_pool_service".to_string(),
"http-grpc".to_string(),
);
let handle = tokio::spawn(async move {
Server::builder()
.layer(metrics_layer)
Expand Down
27 changes: 20 additions & 7 deletions crates/provider/src/traits/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,36 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use alloy_json_rpc::{RequestPacket, ResponsePacket};
/// Method extractor
use rundler_types::task::traits::RequestExtractor;
use alloy_json_rpc::RequestPacket;
use rundler_types::task::traits::{RequestExtractor, ResponseExtractor};

#[allow(dead_code)]
#[derive(Clone, Copy)]
struct AlloyMethodExtractor;
pub struct AlloyMethodExtractor;

impl RequestExtractor<RequestPacket> for RPCMethodExtractor {
impl RequestExtractor<RequestPacket> for AlloyMethodExtractor {
fn get_method_name(req: &RequestPacket) -> String {
match req {
RequestPacket::Single(request) => {
request.method().to_string()
}
RequestPacket::Single(request) => request.method().to_string(),
_ => {
// can't extract method name for batch.
"batch".to_string()
}
}
}
}

#[allow(dead_code)]
#[derive(Clone, Copy)]
pub struct AlloyResponseCodeExtractor;

impl ResponseExtractor<ResponsePacket> for AlloyMethodExtractor {
fn get_response_code(response: &ResponsePacket) -> String {
if response.is_error() {
response.as_error().unwrap().code.to_string()
} else {
"200".to_string()
}
}
}
1 change: 1 addition & 0 deletions crates/provider/src/traits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
//! Traits for the provider module.

mod error;
mod metrics;
pub use error::*;

mod entry_point;
Expand Down
20 changes: 17 additions & 3 deletions crates/rpc/src/rpc_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,28 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use jsonrpsee::types::Request;
use rundler_types::task::traits::RequestExtractor;
use jsonrpsee::{types::Request, MethodResponse};
use rundler_types::task::traits::{RequestExtractor, ResponseExtractor};

#[derive(Copy, Clone)]
struct RPCMethodExtractor;

impl RequestExtractor<Request<'static>> for RPCMethodExtractor {
fn get_method_name(req: & Request<'static>) -> String {
fn get_method_name(req: &Request<'static>) -> String {
req.method_name().to_string()
}
}

/// http response extractor.
#[derive(Copy, Clone)]
pub struct RPCResponseCodeExtractor;

impl ResponseExtractor<MethodResponse> for RPCResponseCodeExtractor {
fn get_response_code(response: &MethodResponse) -> String {
if response.is_error() {
response.as_error_code().unwrap().to_string()
} else {
"200".to_string()
}
}
}
12 changes: 8 additions & 4 deletions crates/rpc/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use std::{net::SocketAddr, sync::Arc, time::Duration};
use anyhow::{bail, Context};
use async_trait::async_trait;
use jsonrpsee::{
server::{middleware::http::ProxyGetRequestLayer, RpcServiceBuilder, ServerBuilder}, types::Request, RpcModule
server::{middleware::http::ProxyGetRequestLayer, RpcServiceBuilder, ServerBuilder},
types::Request,
RpcModule,
};
use rundler_provider::{EntryPointProvider, Provider};
use rundler_sim::{
Expand All @@ -42,10 +44,9 @@ use crate::{
EthApiSettings, UserOperationEventProviderV0_6, UserOperationEventProviderV0_7,
},
health::{HealthChecker, SystemApiServer},
rpc_metrics,
rpc_metrics::{RPCMethodExtractor, RPCResponseCodeExtractor},
rundler::{RundlerApi, RundlerApiServer, Settings as RundlerApiSettings},
types::ApiNamespace,
rpc_metrics::RPCMethodExtractor,
};

/// RPC server arguments.
Expand Down Expand Up @@ -188,7 +189,10 @@ where
.timeout(self.args.rpc_timeout);

let rpc_metric_middleware =
MetricsLayer::<RPCMethodExtractor, Request>::new("rundler-eth-service".to_string(), "rpc".to_string());
MetricsLayer::<RPCMethodExtractor, Request<'static>, RPCResponseCodeExtractor>::new(
"rundler-eth-service".to_string(),
"rpc".to_string(),
);

let server = ServerBuilder::default()
.set_http_middleware(http_middleware)
Expand Down
14 changes: 12 additions & 2 deletions crates/task/src/grpc/grpc_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use rundler_types::task::traits::RequestExtractor;
use rundler_types::task::traits::{RequestExtractor, ResponseExtractor};
use tonic::codegen::http;

/// http request method extractor.
/// http request method extractor.
#[derive(Copy, Clone)]
struct HttpMethodExtractor;

Expand All @@ -24,3 +24,13 @@ impl<Body> RequestExtractor<http::Request<Body>> for HttpMethodExtractor {
method_name.to_string()
}
}

/// http response extractor.
#[derive(Copy, Clone)]
pub struct HttpResponseCodeExtractor;

impl<B> ResponseExtractor<http::Response<B>> for HttpResponseCodeExtractor {
fn get_response_code(response: &http::Response<B>) -> String {
response.status().to_string()
}
}
60 changes: 45 additions & 15 deletions crates/task/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,20 @@ use std::{
};

use futures::{future::BoxFuture, FutureExt};
use rundler_types::task::traits::RequestExtractor;
use rundler_types::task::traits::{RequestExtractor, ResponseExtractor};
use tower::{Layer, Service};

/// tower network layer: https://github.com/tower-rs/tower/blob/master/guides/building-a-middleware-from-scratch.md
#[derive(Debug, Clone)]
pub struct MetricsLayer<T, R> {
pub struct MetricsLayer<T, R, RE> {
service_name: String,
protocal: String,
_request_extractor_: PhantomData<T>,
_request_type_: PhantomData<R>,
_response_extractor_: PhantomData<RE>,
}

impl<T, R> MetricsLayer<T, R>
impl<T, R, RE> MetricsLayer<T, R, RE>
where
T: RequestExtractor<R>,
{
Expand All @@ -43,30 +44,36 @@ where
protocal,
_request_extractor_: PhantomData,
_request_type_: PhantomData,
_response_extractor_: PhantomData,
}
}
}

impl<S, T, R> Layer<S> for MetricsLayer<T, R>
impl<S, T, R, RE> Layer<S> for MetricsLayer<T, R, RE>
where
T: RequestExtractor<R>,
{
type Service = MetricsMiddleware<S, T, R>;
type Service = MetricsMiddleware<S, T, R, RE>;
fn layer(&self, service: S) -> Self::Service {
MetricsMiddleware::<S, T, R>::new(service, self.service_name.clone(), self.protocal.clone())
MetricsMiddleware::<S, T, R, RE>::new(
service,
self.service_name.clone(),
self.protocal.clone(),
)
}
}

/// Middleware implementation.
pub struct MetricsMiddleware<S, T, R> {
pub struct MetricsMiddleware<S, T, R, RE> {
inner: S,
service_name: String,
protocal: String,
_request_extractor_: PhantomData<T>,
_request_type_: PhantomData<R>,
_response_extractor_: PhantomData<RE>,
}

impl<S, T, R> MetricsMiddleware<S, T, R>
impl<S, T, R, RE> MetricsMiddleware<S, T, R, RE>
where
T: RequestExtractor<R>,
{
Expand All @@ -78,16 +85,18 @@ where
protocal: protocal,
_request_extractor_: PhantomData,
_request_type_: PhantomData,
_response_extractor_: PhantomData,
}
}
}

impl<S, T, Request> Service<Request> for MetricsMiddleware<S, T, Request>
impl<S, T, Request, RE> Service<Request> for MetricsMiddleware<S, T, Request, RE>
where
S: Service<Request> + Send + Sync + Clone + 'static,
S::Future: Send + Sync + 'static,
T: RequestExtractor<Request> + 'static,
Request: Send + Sync + 'static,
RE: ResponseExtractor<S::Response> + Send + Sync + 'static,
{
type Response = S::Response;
type Error = S::Error;
Expand Down Expand Up @@ -128,12 +137,24 @@ where
service_name.as_str(),
protocal.as_str(),
);
if rsp.is_err() {
MethodMetrics::increment_error_count(
method_name.as_str(),
service_name.as_str(),
protocal.as_str(),
);

match &rsp {
Ok(response) => {
let response_code = RE::get_response_code(response);
MethodMetrics::increment_response_code(
method_name.as_str(),
service_name.as_str(),
protocal.as_str(),
response_code.as_str(),
);
}
Err(_) => {
MethodMetrics::increment_error_count(
method_name.as_str(),
service_name.as_str(),
protocal.as_str(),
);
}
}
rsp
}
Expand Down Expand Up @@ -161,6 +182,15 @@ impl MethodMetrics {
metrics::counter!("open_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocal" => protocal.to_string()).increment(1)
}

fn increment_response_code(
method_name: &str,
service_name: &str,
protocal: &str,
response_code: &str,
) {
metrics::counter!("response_stats", "method_name" => method_name.to_string(), "service_name" => service_name.to_string(), "protocal" => protocal.to_string(), "response_code" => response_code.to_string()).increment(1)
}

fn record_request_latency(
method_name: &str,
service_name: &str,
Expand Down
6 changes: 6 additions & 0 deletions crates/types/src/task/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,9 @@ pub trait RequestExtractor<R>: Copy + Sync + Send {
/// Get method name.
fn get_method_name(request: &R) -> String;
}

/// Trait to extract response code.
pub trait ResponseExtractor<R>: Copy + Sync + Send {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be Copy? Thats a pretty strict bound to put on a trait

/// Get response code.
fn get_response_code(response: &R) -> String;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the code a string?

}
Loading