From a1aeb3d8725cfcc5c58bd8c56fdb302c5e18dd6c Mon Sep 17 00:00:00 2001 From: Jannis Pohlmann Date: Mon, 29 Apr 2024 19:08:25 +0200 Subject: [PATCH] fix: fix reporting inconsistencies, preserve query_id in kafka messages (#701) Instead of having a mix of `client_query`, `client_request`, `indexer_query`, `indexer_request`, `query_id` and `request_id` all over the place, use the `request` variants everywhere. However, preserve `"query_id"` fields in kafka messages to avoid breaking the topic schema. --------- Co-authored-by: Theo Butler --- .../src/http/middleware/request_tracing.rs | 2 +- graph-gateway/src/client_query.rs | 20 +++++------ graph-gateway/src/reports.rs | 33 +++++++++---------- 3 files changed, 26 insertions(+), 29 deletions(-) diff --git a/gateway-framework/src/http/middleware/request_tracing.rs b/gateway-framework/src/http/middleware/request_tracing.rs index 4f9b5976..b6983f31 100644 --- a/gateway-framework/src/http/middleware/request_tracing.rs +++ b/gateway-framework/src/http/middleware/request_tracing.rs @@ -56,7 +56,7 @@ where target: REQUEST_SPAN_TARGET, "client request", // name graph_env = %self.env_id, - query_id = field::Empty, + request_id = field::Empty, selector = field::Empty, ) .entered(); diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index b164f0cb..56e84f81 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -32,7 +32,7 @@ use gateway_framework::{ discovery::Status, indexing_performance::{IndexingPerformance, Snapshot}, }, - reporting::{with_metric, KafkaClient, METRICS}, + reporting::{with_metric, KafkaClient, CLIENT_REQUEST_TARGET, INDEXER_REQUEST_TARGET, METRICS}, scalar::{ReceiptStatus, ScalarReceipt}, topology::network::{Deployment, GraphNetwork, Subgraph}, }; @@ -170,7 +170,7 @@ pub async fn handle_query( }; let (legacy_status_message, legacy_status_code) = reports::legacy_status(&result); tracing::info!( - target: reports::CLIENT_QUERY_TARGET, + target: CLIENT_REQUEST_TARGET, start_time_ms = timestamp, deployment, %status_message, @@ -241,7 +241,7 @@ async fn handle_client_query_inner( .last() .map(|deployment| deployment.manifest.network.clone()) .ok_or_else(|| Error::SubgraphNotFound(anyhow!("no matching deployments")))?; - tracing::info!(target: reports::CLIENT_QUERY_TARGET, subgraph_chain); + tracing::info!(target: CLIENT_REQUEST_TARGET, subgraph_chain); let manifest_min_block = deployments.last().unwrap().manifest.min_block; let chain = ctx.chains.chain(&subgraph_chain).await; @@ -284,7 +284,7 @@ async fn handle_client_query_inner( let mut context = AgoraContext::new(&payload.query, &variables) .map_err(|err| Error::BadQuery(anyhow!("{err}")))?; tracing::info!( - target: reports::CLIENT_QUERY_TARGET, + target: CLIENT_REQUEST_TARGET, query = %payload.query, %variables, ); @@ -305,7 +305,7 @@ async fn handle_client_query_inner( budget = (*(user_budget_usd * grt_per_usd * one_grt) as u128).min(max_budget); } tracing::info!( - target: reports::CLIENT_QUERY_TARGET, + target: CLIENT_REQUEST_TARGET, query_count = 1, budget_grt = (budget as f64 * 1e-18) as f32, ); @@ -481,8 +481,8 @@ async fn handle_client_query_inner( // there's a race between creating this span and another indexer responding which will // close the outer client_query span. let span = tracing::info_span!( - target: reports::INDEXER_QUERY_TARGET, - "indexer_query", + target: INDEXER_REQUEST_TARGET, + "indexer_request", indexer = ?selection.indexing.indexer, ); let receipt_signer = ctx.receipt_signer; @@ -511,7 +511,7 @@ async fn handle_client_query_inner( let total_indexer_fees_usd = USD(NotNan::new(total_indexer_fees_grt as f64 * 1e-18).unwrap() / grt_per_usd); tracing::info!( - target: reports::CLIENT_QUERY_TARGET, + target: CLIENT_REQUEST_TARGET, indexer_fees_grt = (total_indexer_fees_grt as f64 * 1e-18) as f32, indexer_fees_usd = *total_indexer_fees_usd.0 as f32, ); @@ -644,7 +644,7 @@ async fn handle_indexer_query( let latency_ms = ctx.response_time.as_millis() as u32; tracing::info!( - target: reports::INDEXER_QUERY_TARGET, + target: INDEXER_REQUEST_TARGET, %deployment, url = %selection.url, blocks_behind = selection.blocks_behind, @@ -714,7 +714,7 @@ async fn handle_indexer_query_inner( .collect::>() .join("; "); tracing::info!( - target: reports::INDEXER_QUERY_TARGET, + target: INDEXER_REQUEST_TARGET, indexer_errors = errors_repr, ); diff --git a/graph-gateway/src/reports.rs b/graph-gateway/src/reports.rs index 96829f90..4205bd61 100644 --- a/graph-gateway/src/reports.rs +++ b/graph-gateway/src/reports.rs @@ -2,7 +2,7 @@ use alloy_primitives::Address; use gateway_common::utils::timestamp::unix_timestamp; use gateway_framework::{ errors::{self, IndexerError}, - reporting::{error_log, KafkaClient}, + reporting::{error_log, KafkaClient, CLIENT_REQUEST_TARGET, INDEXER_REQUEST_TARGET}, }; use prost::Message as _; use serde::Deserialize; @@ -12,13 +12,10 @@ use toolshed::concat_bytes; use crate::indexer_client::ResponsePayload; -pub const CLIENT_QUERY_TARGET: &str = "client_query"; -pub const INDEXER_QUERY_TARGET: &str = "indexer_query"; - pub fn report_client_query(kafka: &KafkaClient, fields: Map) { #[derive(Deserialize)] struct Fields { - query_id: String, + request_id: String, graph_env: String, legacy_status_message: String, legacy_status_code: u32, @@ -36,7 +33,7 @@ pub fn report_client_query(kafka: &KafkaClient, fields: Map fields, Err(err) => { error_log( - CLIENT_QUERY_TARGET, + CLIENT_REQUEST_TARGET, &format!("failed to report client query: {}", err), ); return; @@ -48,13 +45,13 @@ pub fn report_client_query(kafka: &KafkaClient, fields: Map) { #[derive(Deserialize)] struct Fields { - query_id: String, + request_id: String, graph_env: String, api_key: Option, user_address: Option, @@ -119,7 +116,7 @@ pub fn report_indexer_query(kafka: &KafkaClient, fields: Map fields, Err(err) => { error_log( - INDEXER_QUERY_TARGET, + INDEXER_REQUEST_TARGET, &format!("failed to report indexer query: {}", err), ); return; @@ -128,13 +125,13 @@ pub fn report_indexer_query(kafka: &KafkaClient, fields: Map