Skip to content

Commit

Permalink
fix(middleware): change mod visibility to pass build.
Browse files Browse the repository at this point in the history
fix(middleware): customize clone implemention.

fix: cleanup unsed Clone and Copy trait.
  • Loading branch information
andysim3d committed Sep 27, 2024
1 parent 83ba4ef commit 9e97977
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 57 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ tokio-util.workspace = true
tonic.workspace = true
tonic-health.workspace = true
tonic-reflection.workspace = true
tower.workspace = true
tracing.workspace = true
url.workspace = true

Expand Down
8 changes: 6 additions & 2 deletions crates/pool/src/server/remote/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use async_trait::async_trait;
use futures_util::StreamExt;
use rundler_task::{
grpc::{grpc_metrics::HttpMethodExtractor, protos::from_bytes},
metrics::{MetricsLayer, RequestMethodNameInfo},
metrics::MetricsLayer,
};
use rundler_types::{
chain::ChainSpec,
Expand Down Expand Up @@ -81,7 +81,11 @@ pub(crate) async fn spawn_remote_mempool_server(
.set_serving::<OpPoolServer<OpPoolImpl>>()
.await;

let metrics_layer = MetricsLayer::<HttpMethodExtractor, http::Request>::new("op_pool_service".to_string(), "http-grpc".to_string());
let metrics_layer = MetricsLayer::<HttpMethodExtractor, _>::new(
"op_pool_service".to_string(),
"http-grpc".to_string(),
);

let handle = tokio::spawn(async move {
Server::builder()
.layer(metrics_layer)
Expand Down
2 changes: 1 addition & 1 deletion crates/provider/src/traits/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use rundler_types::task::traits::RequestExtractor;
use alloy_json_rpc::RequestPacket;

#[derive(Clone, Copy)]
struct AlloyMethodExtractor;
pub struct AlloyMethodExtractor;

impl RequestExtractor<RequestPacket> for RPCMethodExtractor {
fn get_method_name(req: &RequestPacket) -> String {
Expand Down
5 changes: 2 additions & 3 deletions crates/rpc/src/rpc_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@
use jsonrpsee::types::Request;
use rundler_types::task::traits::RequestExtractor;

#[derive(Copy, Clone)]
struct RPCMethodExtractor;
pub struct RPCMethodExtractor;

impl RequestExtractor<Request<'static>> for RPCMethodExtractor {
fn get_method_name(req: & Request<'static>) -> String {
fn get_method_name(req: &Request<'static>) -> String {
req.method_name().to_string()
}
}
13 changes: 8 additions & 5 deletions crates/rpc/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use std::{net::SocketAddr, sync::Arc, time::Duration};
use anyhow::{bail, Context};
use async_trait::async_trait;
use jsonrpsee::{
server::{middleware::http::ProxyGetRequestLayer, RpcServiceBuilder, ServerBuilder}, types::Request, RpcModule
server::{middleware::http::ProxyGetRequestLayer, RpcServiceBuilder, ServerBuilder},
types::Request,
RpcModule,
};
use rundler_provider::{EntryPointProvider, Provider};
use rundler_sim::{
Expand All @@ -42,10 +44,9 @@ use crate::{
EthApiSettings, UserOperationEventProviderV0_6, UserOperationEventProviderV0_7,
},
health::{HealthChecker, SystemApiServer},
rpc_metrics,
rpc_metrics::RPCMethodExtractor,
rundler::{RundlerApi, RundlerApiServer, Settings as RundlerApiSettings},
types::ApiNamespace,
rpc_metrics::RPCMethodExtractor,
};

/// RPC server arguments.
Expand Down Expand Up @@ -187,8 +188,10 @@ where
.layer(ProxyGetRequestLayer::new("/health", "system_health")?)
.timeout(self.args.rpc_timeout);

let rpc_metric_middleware =
MetricsLayer::<RPCMethodExtractor, Request>::new("rundler-eth-service".to_string(), "rpc".to_string());
let rpc_metric_middleware = MetricsLayer::<RPCMethodExtractor, Request<'static>>::new(
"rundler-eth-service".to_string(),
"rpc".to_string(),
);

let server = ServerBuilder::default()
.set_http_middleware(http_middleware)
Expand Down
5 changes: 2 additions & 3 deletions crates/task/src/grpc/grpc_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
use rundler_types::task::traits::RequestExtractor;
use tonic::codegen::http;

/// http request method extractor.
#[derive(Copy, Clone)]
struct HttpMethodExtractor;
/// http request method extractor.
pub struct HttpMethodExtractor;

impl<Body> RequestExtractor<http::Request<Body>> for HttpMethodExtractor {
fn get_method_name(req: &http::Request<Body>) -> String {
Expand Down
1 change: 1 addition & 0 deletions crates/task/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

//! Utilities for working with gRPC

/// grpc method extractor implmentation.
pub mod grpc_metrics;
#[allow(non_snake_case)]
pub mod protos;
100 changes: 58 additions & 42 deletions crates/task/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ use rundler_types::task::traits::RequestExtractor;
use tower::{Layer, Service};

/// tower network layer: https://github.com/tower-rs/tower/blob/master/guides/building-a-middleware-from-scratch.md
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct MetricsLayer<T, R> {
service_name: String,
protocal: String,
_request_extractor_: PhantomData<T>,
_request_type_: PhantomData<R>,
_request_extractor: PhantomData<T>,
_request_type: PhantomData<R>,
}

impl<T, R> MetricsLayer<T, R>
Expand All @@ -41,8 +41,22 @@ where
MetricsLayer {
service_name,
protocal,
_request_extractor_: PhantomData,
_request_type_: PhantomData,
_request_extractor: PhantomData,
_request_type: PhantomData,
}
}
}

impl<T, R> Clone for MetricsLayer<T, R>
where
T: RequestExtractor<R>,
{
fn clone(&self) -> Self {
Self {
service_name: self.service_name.clone(),
protocal: self.protocal.clone(),
_request_extractor: PhantomData,
_request_type: PhantomData,
}
}
}
Expand All @@ -52,42 +66,58 @@ where
T: RequestExtractor<R>,
{
type Service = MetricsMiddleware<S, T, R>;

fn layer(&self, service: S) -> Self::Service {
MetricsMiddleware::<S, T, R>::new(service, self.service_name.clone(), self.protocal.clone())
Self::Service::new(service, self.service_name.clone(), self.protocal.clone())
}
}

/// Middleware implementation.
pub struct MetricsMiddleware<S, T, R> {
inner: S,
service_name: String,
protocal: String,
_request_extractor_: PhantomData<T>,
_request_type_: PhantomData<R>,
protocol: String,
_request_extractor: PhantomData<T>,
_request_type: PhantomData<R>,
}

impl<S, T, R> Clone for MetricsMiddleware<S, T, R>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
service_name: self.service_name.clone(),
protocol: self.protocol.clone(),
_request_extractor: PhantomData,
_request_type: PhantomData,
}
}
}

impl<S, T, R> MetricsMiddleware<S, T, R>
where
T: RequestExtractor<R>,
{
/// Initialize a middleware.
pub fn new(inner: S, service_name: String, protocal: String) -> Self {
pub fn new(inner: S, service_name: String, protocol: String) -> Self {
Self {
inner: inner,
inner,
service_name: service_name.clone(),
protocal: protocal,
_request_extractor_: PhantomData,
_request_type_: PhantomData,
protocol,
_request_extractor: PhantomData,
_request_type: PhantomData,
}
}
}

impl<S, T, Request> Service<Request> for MetricsMiddleware<S, T, Request>
impl<S, T, R> Service<R> for MetricsMiddleware<S, T, R>
where
S: Service<Request> + Send + Sync + Clone + 'static,
S::Future: Send + Sync + 'static,
T: RequestExtractor<Request> + 'static,
Request: Send + Sync + 'static,
S: Service<R> + Send + Clone + 'static,
S::Future: Send + 'static,
T: RequestExtractor<R> + 'static,
R: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
Expand All @@ -97,51 +127,37 @@ where
self.inner.poll_ready(cx)
}

fn call(&mut self, request: Request) -> Self::Future {
fn call(&mut self, request: R) -> Self::Future {
let method_name = T::get_method_name(&request);

MethodMetrics::increment_num_requests(
self.service_name.as_str(),
method_name.as_str(),
self.protocal.as_str(),
);
MethodMetrics::increment_num_requests(&self.service_name, &method_name, &self.protocol);
MethodMetrics::increment_open_requests(
self.service_name.as_str(),
method_name.as_str(),
self.protocal.as_str(),
self.protocol.as_str(),
);

let start = Instant::now();
let mut svc = self.inner.clone();
let service_name = self.service_name.clone();
let protocal = self.protocal.clone();
let protocal = self.protocol.clone();
async move {
let rsp = svc.call(request).await;
MethodMetrics::record_request_latency(
method_name.as_str(),
service_name.as_str(),
protocal.as_str(),
&method_name,
&service_name,
&protocal,
start.elapsed(),
);
MethodMetrics::decrement_open_requests(
method_name.as_str(),
service_name.as_str(),
protocal.as_str(),
);
MethodMetrics::decrement_open_requests(&method_name, &service_name, &protocal);
if rsp.is_err() {
MethodMetrics::increment_error_count(
method_name.as_str(),
service_name.as_str(),
protocal.as_str(),
);
MethodMetrics::increment_error_count(&method_name, &service_name, &protocal);
}
rsp
}
.boxed()
}
}

#[derive(Clone)]
struct MethodMetrics {}

impl MethodMetrics {
Expand Down
4 changes: 4 additions & 0 deletions crates/types/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@
// If not, see https://www.gnu.org/licenses/.

//! Rundler task traits.
//!
//! This module contains traits related to Rundler tasks.

/// method extractor trait.
pub mod traits;
2 changes: 1 addition & 1 deletion crates/types/src/task/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

/// Trait to expose request method name.

pub trait RequestExtractor<R>: Copy + Sync + Send {
pub trait RequestExtractor<R>: Sync + Send {
/// Get method name.
fn get_method_name(request: &R) -> String;
}

0 comments on commit 9e97977

Please sign in to comment.