Skip to content

Commit

Permalink
fix: fix reporting inconsistencies, preserve query_id in kafka messages (#701)
Browse files Browse the repository at this point in the history

Instead of having a mix of `client_query`, `client_request`,
`indexer_query`, `indexer_request`, `query_id` and `request_id` all over
the place, use the `request` variants everywhere.

However, preserve `"query_id"` fields in kafka messages to avoid
breaking the topic schema.

---------

Co-authored-by: Theo Butler <theodusbutler@gmail.com>
  • Loading branch information
Jannis and Theodus authored Apr 29, 2024
1 parent b98a3ca commit a1aeb3d
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 29 deletions.
2 changes: 1 addition & 1 deletion gateway-framework/src/http/middleware/request_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where
target: REQUEST_SPAN_TARGET,
"client request", // name
graph_env = %self.env_id,
query_id = field::Empty,
request_id = field::Empty,
selector = field::Empty,
)
.entered();
Expand Down
20 changes: 10 additions & 10 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use gateway_framework::{
discovery::Status,
indexing_performance::{IndexingPerformance, Snapshot},
},
reporting::{with_metric, KafkaClient, METRICS},
reporting::{with_metric, KafkaClient, CLIENT_REQUEST_TARGET, INDEXER_REQUEST_TARGET, METRICS},
scalar::{ReceiptStatus, ScalarReceipt},
topology::network::{Deployment, GraphNetwork, Subgraph},
};
Expand Down Expand Up @@ -170,7 +170,7 @@ pub async fn handle_query(
};
let (legacy_status_message, legacy_status_code) = reports::legacy_status(&result);
tracing::info!(
target: reports::CLIENT_QUERY_TARGET,
target: CLIENT_REQUEST_TARGET,
start_time_ms = timestamp,
deployment,
%status_message,
Expand Down Expand Up @@ -241,7 +241,7 @@ async fn handle_client_query_inner(
.last()
.map(|deployment| deployment.manifest.network.clone())
.ok_or_else(|| Error::SubgraphNotFound(anyhow!("no matching deployments")))?;
tracing::info!(target: reports::CLIENT_QUERY_TARGET, subgraph_chain);
tracing::info!(target: CLIENT_REQUEST_TARGET, subgraph_chain);

let manifest_min_block = deployments.last().unwrap().manifest.min_block;
let chain = ctx.chains.chain(&subgraph_chain).await;
Expand Down Expand Up @@ -284,7 +284,7 @@ async fn handle_client_query_inner(
let mut context = AgoraContext::new(&payload.query, &variables)
.map_err(|err| Error::BadQuery(anyhow!("{err}")))?;
tracing::info!(
target: reports::CLIENT_QUERY_TARGET,
target: CLIENT_REQUEST_TARGET,
query = %payload.query,
%variables,
);
Expand All @@ -305,7 +305,7 @@ async fn handle_client_query_inner(
budget = (*(user_budget_usd * grt_per_usd * one_grt) as u128).min(max_budget);
}
tracing::info!(
target: reports::CLIENT_QUERY_TARGET,
target: CLIENT_REQUEST_TARGET,
query_count = 1,
budget_grt = (budget as f64 * 1e-18) as f32,
);
Expand Down Expand Up @@ -481,8 +481,8 @@ async fn handle_client_query_inner(
// there's a race between creating this span and another indexer responding which will
// close the outer client_query span.
let span = tracing::info_span!(
target: reports::INDEXER_QUERY_TARGET,
"indexer_query",
target: INDEXER_REQUEST_TARGET,
"indexer_request",
indexer = ?selection.indexing.indexer,
);
let receipt_signer = ctx.receipt_signer;
Expand Down Expand Up @@ -511,7 +511,7 @@ async fn handle_client_query_inner(
let total_indexer_fees_usd =
USD(NotNan::new(total_indexer_fees_grt as f64 * 1e-18).unwrap() / grt_per_usd);
tracing::info!(
target: reports::CLIENT_QUERY_TARGET,
target: CLIENT_REQUEST_TARGET,
indexer_fees_grt = (total_indexer_fees_grt as f64 * 1e-18) as f32,
indexer_fees_usd = *total_indexer_fees_usd.0 as f32,
);
Expand Down Expand Up @@ -644,7 +644,7 @@ async fn handle_indexer_query(

let latency_ms = ctx.response_time.as_millis() as u32;
tracing::info!(
target: reports::INDEXER_QUERY_TARGET,
target: INDEXER_REQUEST_TARGET,
%deployment,
url = %selection.url,
blocks_behind = selection.blocks_behind,
Expand Down Expand Up @@ -714,7 +714,7 @@ async fn handle_indexer_query_inner(
.collect::<Vec<&str>>()
.join("; ");
tracing::info!(
target: reports::INDEXER_QUERY_TARGET,
target: INDEXER_REQUEST_TARGET,
indexer_errors = errors_repr,
);

Expand Down
33 changes: 15 additions & 18 deletions graph-gateway/src/reports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use alloy_primitives::Address;
use gateway_common::utils::timestamp::unix_timestamp;
use gateway_framework::{
errors::{self, IndexerError},
reporting::{error_log, KafkaClient},
reporting::{error_log, KafkaClient, CLIENT_REQUEST_TARGET, INDEXER_REQUEST_TARGET},
};
use prost::Message as _;
use serde::Deserialize;
Expand All @@ -12,13 +12,10 @@ use toolshed::concat_bytes;

use crate::indexer_client::ResponsePayload;

pub const CLIENT_QUERY_TARGET: &str = "client_query";
pub const INDEXER_QUERY_TARGET: &str = "indexer_query";

pub fn report_client_query(kafka: &KafkaClient, fields: Map<String, serde_json::Value>) {
#[derive(Deserialize)]
struct Fields {
query_id: String,
request_id: String,
graph_env: String,
legacy_status_message: String,
legacy_status_code: u32,
Expand All @@ -36,7 +33,7 @@ pub fn report_client_query(kafka: &KafkaClient, fields: Map<String, serde_json::
Ok(fields) => fields,
Err(err) => {
error_log(
CLIENT_QUERY_TARGET,
CLIENT_REQUEST_TARGET,
&format!("failed to report client query: {}", err),
);
return;
Expand All @@ -48,13 +45,13 @@ pub fn report_client_query(kafka: &KafkaClient, fields: Map<String, serde_json::

// data science: bigquery datasets still rely on this log line
let log = serde_json::to_string(&json!({
"target": CLIENT_QUERY_TARGET,
"target": CLIENT_REQUEST_TARGET,
"level": "INFO",
"timestamp": chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
"fields": {
"message": "Client query result",
"query_id": &fields.query_id,
"ray_id": &fields.query_id, // In production this will be the Ray ID.
"query_id": &fields.request_id,
"ray_id": &fields.request_id, // In production this will be the Ray ID.
"deployment": fields.deployment.as_deref().unwrap_or(""),
"network": fields.subgraph_chain.as_deref().unwrap_or(""),
"user": &fields.user_address,
Expand All @@ -72,8 +69,8 @@ pub fn report_client_query(kafka: &KafkaClient, fields: Map<String, serde_json::
println!("{log}");

let kafka_msg = json!({
"query_id": &fields.query_id,
"ray_id": &fields.query_id, // In production this will be the Ray ID.
"query_id": &fields.request_id,
"ray_id": &fields.request_id, // In production this will be the Ray ID.
"graph_env": &fields.graph_env,
"timestamp": timestamp,
"user": &fields.user_address,
Expand All @@ -98,7 +95,7 @@ pub fn report_client_query(kafka: &KafkaClient, fields: Map<String, serde_json::
pub fn report_indexer_query(kafka: &KafkaClient, fields: Map<String, serde_json::Value>) {
#[derive(Deserialize)]
struct Fields {
query_id: String,
request_id: String,
graph_env: String,
api_key: Option<String>,
user_address: Option<String>,
Expand All @@ -119,7 +116,7 @@ pub fn report_indexer_query(kafka: &KafkaClient, fields: Map<String, serde_json:
Ok(fields) => fields,
Err(err) => {
error_log(
INDEXER_QUERY_TARGET,
INDEXER_REQUEST_TARGET,
&format!("failed to report indexer query: {}", err),
);
return;
Expand All @@ -128,13 +125,13 @@ pub fn report_indexer_query(kafka: &KafkaClient, fields: Map<String, serde_json:

// data science: bigquery datasets still rely on this log line
let log = serde_json::to_string(&json!({
"target": INDEXER_QUERY_TARGET,
"target": INDEXER_REQUEST_TARGET,
"level": "INFO",
"timestamp": chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Nanos, true),
"fields": {
"message": "Indexer attempt",
"query_id": &fields.query_id,
"ray_id": &fields.query_id, // In production this will be the Ray ID.
"query_id": &fields.request_id,
"ray_id": &fields.request_id, // In production this will be the Ray ID.
"deployment": &fields.deployment,
"indexer": &fields.indexer,
"url": &fields.url,
Expand All @@ -153,8 +150,8 @@ pub fn report_indexer_query(kafka: &KafkaClient, fields: Map<String, serde_json:
println!("{log}");

let kafka_msg = json!({
"query_id": &fields.query_id,
"ray_id": &fields.query_id, // In production this will be the Ray ID.
"query_id": &fields.request_id,
"ray_id": &fields.request_id, // In production this will be the Ray ID.
"graph_env": &fields.graph_env,
"timestamp": unix_timestamp(),
"api_key": fields.api_key.as_deref().unwrap_or(""),
Expand Down

0 comments on commit a1aeb3d

Please sign in to comment.