Skip to content

Commit

Permalink
feat(middleware): create RequestInfo trait to get method form generic…
Browse files Browse the repository at this point in the history
… Request.
  • Loading branch information
andysim3d committed Sep 20, 2024
1 parent 1060c2e commit 2fc3160
Showing 1 changed file with 61 additions and 67 deletions.
128 changes: 61 additions & 67 deletions crates/task/src/metriclayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,45 @@ use std::{
time::{Duration, Instant},
};

use futures::{future::BoxFuture, FutureExt};
use jsonrpsee::server::HttpBody;
use pin_project::pin_project;
use tower::{Layer, Service};
use tonic::codegen::http;

#![allow(dead_code)]

pub fn get_method_name(request : http::Request<Body>) -> String {
let uri = request.uri();
let method_name = uri.path().split('/').last().unwrap_or("unknown");
method_name.to_string()
pub trait RequestInfo {
pub fn get_method_name(&self) -> String;
}

pub fn get_method_name<'a>(request : jsonrpsee::types::Request<'a>) -> String {
request.method_name().to_string()
pub struct MethodExtractor<Request>
{}

impl<Request> MethodExtractor<Request> where Request: RequestInfo
{
pub fn new () -> Self {
Self { }
}
pub fn extract_method_name(&self, request: Request) -> String{
request.get_method_name()
}
}

impl<Body> RequestInfo for http::Request<Body>{
fn get_method_name(&self) -> String {
let method_name = self.uri().path().split('/').last().unwrap_or("unknown");
method_name.to_string()
}
}

impl<'a> RequestInfo for jsonrpsee::types::Request<'a>{
fn get_method_name<'a>(&self) -> String {
self.
request.method_name().to_string()
}
}

#[derive(Debug, Clone)]
pub struct MetricsLayer {
service_name: String,
Expand All @@ -43,89 +66,60 @@ impl<S> Layer<S> for MetricsLayer {


#[derive(Clone, Debug)]
pub struct MetricsMiddleware<S> {
pub struct MetricsMiddleware<S, Request> {
inner: S,
service_name: String,
service_metrics: ServiceMetrics,
method_extractor: MethodExtractor<Request>
}

impl<S> MetricsMiddleware<S> {
impl<S, Request> MetricsMiddleware<S, Request>
where Request: RequestInfo
{
pub fn new(inner: S, service_name: String) -> Self {
Self {
inner: inner,
service_name: service_name,
service_metrics: ServiceMetrics::new(service_name.as_str()),
method_extractor: MethodExtractor<Request>::new(),
}
}
}

#[pin_project]
pub struct ResponseFuture<F> {
#[pin]
response_future: F,
start_time: Instant,
method_name: String,
service_name: String,
}

impl<S> Service<RequestInfo> for MetricsMiddleware<S>
impl<S, Request> Service<Request> for MetricsMiddleware<S>
where
S: Service<RequestInfo>,
S: Service<Request>,
Request: RequestInfo,
{
type Response = S::Response;
type Error = S::Error;
type Future = ResponseFuture<S::Future>;
type Future = BoxFuture<Response>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, request: http::Request<Body>) -> Self::Future {
let uri = request.uri().clone();
let method_name = uri.path().split('/').last().unwrap_or("unknown");
{
let mut metric = method_metric.lock().unwarp();
MethodMetrics::increment_num_requests(self.service_name.as_str(), method_name);
MethodMetrics::increment_open_requests(self.service_name.as_str(), method_name);
}
ResponseFuture{
response_future: self.inner.call(request),
start_time: Instant::now(),
method_name: method_name.to_string(),
service_name: self.service_name.clone(),
fn call(&mut self, request: Request) -> Self::Future {
let method_name = self.method_extractor.extract_method_name(request);
MethodMetrics::increment_num_requests(self.service_name.clone(), method_name.to_string());
MethodMetrics::increment_open_requests(self.service_name.clone(), method_name);

let start = Instant::now();
let svc = self.inner.clone();
let service_name = self.service_name.clone();
async move{
let rsp: Result<Response, Error> = svc.call(request).await;
MethodMetrics::record_request_latency(method_name, service_name, start.elapsed());
MethodMetrics::decrement_open_requests(method_name, service_name);
if rsp.is_err(){
MethodMetrics::increment_error_count(method_name, service_name);
}
rsp
}
.boxed()
}
}

impl<F, Response, Error> Future for ResponseFuture<F>
where
F: Future<Output = Result<Response, Error>>,
{
type Output = Result<Response, Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = this.response_future.poll(cx);

if res.is_ready() {
MethodMetrics::decrement_open_requests(self.service_name.as_str(), method_name);
MethodMetrics::record_request_latency(self.service_name.as_str(), method_name, this.start_time.elapsed());
}
if let Poll::Ready(Err(_)) = res {
MethodMetrics::increment_error_count(self.service_name.as_str(), method_name);
}
res
}
}

#[pin_project]
pub struct ResponseFuture<F> {
#[pin]
response_future: F,
start_time: Instant,
method_name: String,
}


#[derive(Clone)]
// service metrics tracks all method metrics of specific service.
Expand All @@ -145,23 +139,23 @@ impl ServiceMetrics {
struct MethodMetrics {}

impl MethodMetrics {
fn increment_num_requests(&self, method_name: String, service_name: String) {
fn increment_num_requests(method_name: String, service_name: String) {
metrics::counter!("num_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string()).increment(1)
}

fn increment_open_requests(&self, method_name: String, service_name: String) {
fn increment_open_requests(method_name: String, service_name: String) {
metrics::gauge!("open_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string()).increment(1_f64)
}

fn decrement_open_requests(&self, method_name: String, service_name: String) {
fn decrement_open_requests( method_name: String, service_name: String) {
metrics::gauge!("open_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string()).decrement(1_f64)
}

fn increment_error_count(&self, method_name: String, service_name: String) {
fn increment_error_count( method_name: String, service_name: String) {
metrics::counter!("open_requests", "method_name" => method_name.to_string(), "service_name" => service_name.to_string()).increment(1)
}

fn record_request_latency(&self, , method_name: String, service_name: String, latency: Duration) {
fn record_request_latency( method_name: String, service_name: String, latency: Duration) {
metrics::histogram!("request_latency", "method_name" => method_name.to_string(), "service_name" => service_name.to_string()).record(latency)
}
}

0 comments on commit 2fc3160

Please sign in to comment.