From 50ac256b9440cbebb1e9f17f1cc55412af78bb7c Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Tue, 7 Jan 2025 22:59:00 +0530 Subject: [PATCH] chore: update query Signed-off-by: Shivanshu Raj Shrivastava --- .../integrations/messagingQueues/kafka/sql.go | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index f90eb95420..8f1e010939 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -339,35 +339,36 @@ func generateOverviewSQL(start, end int64, filters *QueueFilters) string { var whereClauses []string // Mandatory base filter: show only kafka/celery - whereClauses = append(whereClauses, "aggregated_metrics.messaging_system IN ('kafka', 'celery')") + whereClauses = append(whereClauses, "messaging_system IN ('kafka', 'celery')") if len(filters.ServiceName) > 0 { - whereClauses = append(whereClauses, inClause("aggregated_metrics.service_name", filters.ServiceName)) + whereClauses = append(whereClauses, inClause("service_name", filters.ServiceName)) } if len(filters.SpanName) > 0 { - whereClauses = append(whereClauses, inClause("aggregated_metrics.span_name", filters.SpanName)) + whereClauses = append(whereClauses, inClause("span_name", filters.SpanName)) } if len(filters.Queue) > 0 { // "queue" in the struct refers to the messaging_system in the DB - whereClauses = append(whereClauses, inClause("aggregated_metrics.messaging_system", filters.Queue)) + whereClauses = append(whereClauses, inClause("messaging_system", filters.Queue)) } if len(filters.Destination) > 0 { - whereClauses = append(whereClauses, inClause("aggregated_metrics.destination", filters.Destination)) + whereClauses = append(whereClauses, inClause("destination", filters.Destination)) } if len(filters.Kind) > 0 { - whereClauses = append(whereClauses, inClause("aggregated_metrics.kind_string", filters.Kind)) + whereClauses = append(whereClauses, inClause("kind_string", filters.Kind)) } // Combine all WHERE clauses with AND whereSQL := strings.Join(whereClauses, "\n AND ") + if len(whereSQL) > 0 { + whereSQL = fmt.Sprintf("AND %s", whereSQL) + } + // Final query string // Note the use of %f for float64 values in fmt.Sprintf query := fmt.Sprintf(` WITH - timeRange AS ( - SELECT %f AS seconds - ), processed_traces AS ( SELECT resource_string_service$$name AS service_name, @@ -395,6 +396,7 @@ WITH OR has(attributes_string, 'celery.action') OR has(attributes_string, 'celery.task_name') ) + %s ), aggregated_metrics AS ( SELECT @@ -421,22 +423,18 @@ SELECT aggregated_metrics.messaging_system, aggregated_metrics.destination, aggregated_metrics.kind_string, - COALESCE(aggregated_metrics.total_count / timeRange.seconds, 0) AS throughput, + COALESCE(aggregated_metrics.total_count / %f, 0) AS throughput, COALESCE((aggregated_metrics.error_count * 100.0) / aggregated_metrics.total_count, 0) AS error_percentage, aggregated_metrics.p95_latency FROM aggregated_metrics -CROSS JOIN timeRange -WHERE - %s ORDER BY aggregated_metrics.service_name, aggregated_metrics.span_name; `, - timeRangeSecs, // timeRange AS (SELECT %f AS seconds) startSeconds, endSeconds, tsBucketStart, tsBucketEnd, - whereSQL, + whereSQL, timeRangeSecs, ) return query