Skip to content

Commit

Permalink
chore: update query
Browse files Browse the repository at this point in the history
Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
  • Loading branch information
shivanshuraj1333 committed Jan 7, 2025
1 parent 807c1c6 commit 50ac256
Showing 1 changed file with 13 additions and 15 deletions.
28 changes: 13 additions & 15 deletions pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,35 +339,36 @@ func generateOverviewSQL(start, end int64, filters *QueueFilters) string {
var whereClauses []string

// Mandatory base filter: show only kafka/celery
whereClauses = append(whereClauses, "aggregated_metrics.messaging_system IN ('kafka', 'celery')")
whereClauses = append(whereClauses, "messaging_system IN ('kafka', 'celery')")

if len(filters.ServiceName) > 0 {
whereClauses = append(whereClauses, inClause("aggregated_metrics.service_name", filters.ServiceName))
whereClauses = append(whereClauses, inClause("service_name", filters.ServiceName))
}
if len(filters.SpanName) > 0 {
whereClauses = append(whereClauses, inClause("aggregated_metrics.span_name", filters.SpanName))
whereClauses = append(whereClauses, inClause("span_name", filters.SpanName))
}
if len(filters.Queue) > 0 {
// "queue" in the struct refers to the messaging_system in the DB
whereClauses = append(whereClauses, inClause("aggregated_metrics.messaging_system", filters.Queue))
whereClauses = append(whereClauses, inClause("messaging_system", filters.Queue))
}
if len(filters.Destination) > 0 {
whereClauses = append(whereClauses, inClause("aggregated_metrics.destination", filters.Destination))
whereClauses = append(whereClauses, inClause("destination", filters.Destination))
}
if len(filters.Kind) > 0 {
whereClauses = append(whereClauses, inClause("aggregated_metrics.kind_string", filters.Kind))
whereClauses = append(whereClauses, inClause("kind_string", filters.Kind))
}

// Combine all WHERE clauses with AND
whereSQL := strings.Join(whereClauses, "\n AND ")

if len(whereSQL) > 0 {
whereSQL = fmt.Sprintf("AND %s", whereSQL)
}

// Final query string
// Note the use of %f for float64 values in fmt.Sprintf
query := fmt.Sprintf(`
WITH
timeRange AS (
SELECT %f AS seconds
),
processed_traces AS (
SELECT
resource_string_service$$name AS service_name,
Expand Down Expand Up @@ -395,6 +396,7 @@ WITH
OR has(attributes_string, 'celery.action')
OR has(attributes_string, 'celery.task_name')
)
%s
),
aggregated_metrics AS (
SELECT
Expand All @@ -421,22 +423,18 @@ SELECT
aggregated_metrics.messaging_system,
aggregated_metrics.destination,
aggregated_metrics.kind_string,
COALESCE(aggregated_metrics.total_count / timeRange.seconds, 0) AS throughput,
COALESCE(aggregated_metrics.total_count / %f, 0) AS throughput,
COALESCE((aggregated_metrics.error_count * 100.0) / aggregated_metrics.total_count, 0) AS error_percentage,
aggregated_metrics.p95_latency
FROM
aggregated_metrics
CROSS JOIN timeRange
WHERE
%s
ORDER BY
aggregated_metrics.service_name,
aggregated_metrics.span_name;
`,
timeRangeSecs, // timeRange AS (SELECT %f AS seconds)
startSeconds, endSeconds,
tsBucketStart, tsBucketEnd,
whereSQL,
whereSQL, timeRangeSecs,
)

return query
Expand Down

0 comments on commit 50ac256

Please sign in to comment.