diff --git a/configuration/update.go b/configuration/update.go
index 1a02fc1..2d3ffae 100644
--- a/configuration/update.go
+++ b/configuration/update.go
@@ -65,6 +65,21 @@ func (uc *updateCommand) SetMetadataMetric(key string, info metadata.MetricInfo)
     uc.existedMetrics[key] = true
 }
 
+// MetricExists checks whether the metric exists
+func (uc *updateCommand) MetricExists(key string) bool {
+    uc.lock.Lock()
+    defer uc.lock.Unlock()
+    _, ok := uc.existedMetrics[key]
+    return ok
+}
+
+// SetMetricExists marks the metric as existent
+func (uc *updateCommand) SetMetricExists(key string) {
+    uc.lock.Lock()
+    defer uc.lock.Unlock()
+    uc.existedMetrics[key] = true
+}
+
 func introspectSchema(ctx context.Context, args *UpdateArguments) error {
     start := time.Now()
     slog.Info("introspecting metadata", slog.String("dir", args.Dir))
@@ -184,13 +199,14 @@ func (uc *updateCommand) introspectMetric(ctx context.Context, key string, infos
             continue
         }
 
-        if _, ok := uc.existedMetrics[key]; ok {
+        if uc.MetricExists(key) {
             slog.Warn(fmt.Sprintf("metric %s exists", key))
         }
+
         switch metricType {
         case model.MetricTypeGauge, model.MetricTypeGaugeHistogram:
             for _, suffix := range []string{"sum", "bucket", "count"} {
-                uc.existedMetrics[fmt.Sprintf("%s_%s", key, suffix)] = true
+                uc.SetMetricExists(fmt.Sprintf("%s_%s", key, suffix))
             }
         }
 
@@ -350,8 +366,11 @@ var defaultConfiguration = metadata.Configuration{
         UnixTimeUnit:     client.UnixTimeSecond,
         ConcurrencyLimit: 5,
         Format: metadata.RuntimeFormatSettings{
-            Timestamp: metadata.TimestampUnix,
-            Value:     metadata.ValueFloat64,
+            Timestamp:   metadata.TimestampUnix,
+            Value:       metadata.ValueFloat64,
+            NaN:         "NaN",
+            Inf:         "+Inf",
+            NegativeInf: "-Inf",
         },
     },
 }
diff --git a/connector-definition/configuration.yaml b/connector-definition/configuration.yaml
index b47860a..e1a1795 100644
--- a/connector-definition/configuration.yaml
+++ b/connector-definition/configuration.yaml
@@ -20,3 +20,6 @@ runtime:
   format:
     timestamp: unix
     value: float64
+    nan: "NaN"
+    inf: "+Inf"
+    negative_inf: "-Inf"
diff --git a/connector/api/decimal.go b/connector/api/decimal.go
index c3e0990..c56877a 100644
--- a/connector/api/decimal.go
+++ b/connector/api/decimal.go
@@ -3,6 +3,7 @@ package api
 import (
     "encoding/json"
     "fmt"
+    "math"
 
     "github.com/hasura/ndc-sdk-go/utils"
 )
@@ -16,7 +17,7 @@ type Decimal struct {
     raw *string
 }
 
-// NewDecimal creates a BigDecimal instance
+// NewDecimal creates a Decimal instance
 func NewDecimal[T comparable](value T) (Decimal, error) {
     result := Decimal{}
     if err := result.FromValue(value); err != nil {
@@ -25,25 +26,55 @@ func NewDecimal[T comparable](value T) (Decimal, error) {
     return result, nil
 }
 
+// NewDecimalValue creates a Decimal instance from a number value
+func NewDecimalValue[T int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64 | float32 | float64](value T) Decimal {
+    v := float64(value)
+    return Decimal{value: &v}
+}
+
 // ScalarName get the schema name of the scalar
 func (bd Decimal) IsNil() bool {
     return bd.raw == nil
 }
 
+// Value returns the decimal value
+func (bd Decimal) Value() any {
+    if bd.value == nil {
+        if bd.raw != nil {
+            return *bd.raw
+        }
+        return nil
+    }
+
+    if math.IsNaN(*bd.value) {
+        return "NaN"
+    }
+    if *bd.value > 0 && math.IsInf(*bd.value, 1) {
+        return "+Inf"
+    }
+    if *bd.value < 0 && math.IsInf(*bd.value, -1) {
+        return "-Inf"
+    }
+
+    return *bd.value
+}
+
 // Stringer implements fmt.Stringer interface.
 func (bd Decimal) String() string {
-    if bd.raw != nil {
-        return *bd.raw
-    }
-    if bd.value != nil {
-        return fmt.Sprint(*bd.value)
+    v := bd.Value()
+    if v == nil {
+        return "NaN"
     }
-    return "Inf"
+    return fmt.Sprint(v)
 }
 
 // MarshalJSON implements json.Marshaler.
 func (bi Decimal) MarshalJSON() ([]byte, error) {
-    return json.Marshal(bi.String())
+    v := bi.Value()
+    if v != nil {
+        v = fmt.Sprint(v)
+    }
+    return json.Marshal(v)
 }
 
 // UnmarshalJSON implements json.Unmarshaler.
diff --git a/connector/api/decimal_test.go b/connector/api/decimal_test.go
index b50d43f..212ec42 100644
--- a/connector/api/decimal_test.go
+++ b/connector/api/decimal_test.go
@@ -4,14 +4,13 @@ import (
     "encoding/json"
     "testing"
 
-    "github.com/hasura/ndc-sdk-go/utils"
     "gotest.tools/v3/assert"
 )
 
 func TestDecimal(t *testing.T) {
     assert.Assert(t, Decimal{}.IsNil())
-    assert.Equal(t, Decimal{}.String(), "Inf")
-    assert.Equal(t, Decimal{value: utils.ToPtr(1.2)}.String(), "1.2")
+    assert.Equal(t, Decimal{}.String(), "NaN")
+    assert.Equal(t, NewDecimalValue(1.2).String(), "1.2")
 
     _, err := NewDecimal("foo")
     assert.ErrorContains(t, err, "failed to convert Float, got: foo")
diff --git a/connector/internal/native_query.go b/connector/internal/native_query.go
index 3622861..ca08414 100644
--- a/connector/internal/native_query.go
+++ b/connector/internal/native_query.go
@@ -12,6 +12,7 @@ import (
     "github.com/hasura/ndc-sdk-go/schema"
     "github.com/hasura/ndc-sdk-go/utils"
     "github.com/prometheus/common/model"
+    "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/trace"
 )
 
@@ -138,12 +139,16 @@ func (nqe *NativeQueryExecutor) queryInstant(ctx context.Context, queryString st
     }
 
     span := trace.SpanFromContext(ctx)
-    span.AddEvent("post_filter_results", trace.WithAttributes(utils.JSONAttribute("expression", params.Expression)))
+    span.AddEvent("post_filter", trace.WithAttributes(
+        utils.JSONAttribute("expression", params.Expression),
+        attribute.Int("pre_filter_count", len(vector)),
+    ))
     vector, err = nqe.filterVectorResults(vector, params.Expression)
     if err != nil {
         return nil, schema.UnprocessableContentError(err.Error(), nil)
     }
+    span.AddEvent("post_filter_results", trace.WithAttributes(attribute.Int("post_filter_count", len(vector))))
     sortVector(vector, params.OrderBy)
     vector = paginateVector(vector, nqe.Request.Query)
     results := createQueryResultsFromVector(vector, nqe.NativeQuery.Labels, nqe.Runtime, flat)
@@ -158,12 +163,16 @@ func (nqe *NativeQueryExecutor) queryRange(ctx context.Context, queryString stri
     }
 
     span := trace.SpanFromContext(ctx)
-    span.AddEvent("post_filter_results", trace.WithAttributes(utils.JSONAttribute("expression", params.Expression)))
+    span.AddEvent("post_filter", trace.WithAttributes(
+        utils.JSONAttribute("expression", params.Expression),
+        attribute.Int("pre_filter_count", len(matrix)),
+    ))
     matrix, err = nqe.filterMatrixResults(matrix, params)
     if err != nil {
         return nil, schema.UnprocessableContentError(err.Error(), nil)
     }
+    span.AddEvent("post_filter_results", trace.WithAttributes(attribute.Int("post_filter_count", len(matrix))))
     sortMatrix(matrix, params.OrderBy)
     results := createQueryResultsFromMatrix(matrix, nqe.NativeQuery.Labels, nqe.Runtime, flat)
diff --git a/connector/internal/native_query_test.go b/connector/internal/native_query_test.go
index 7de9be5..0bbc480 100644
--- a/connector/internal/native_query_test.go
+++ b/connector/internal/native_query_test.go
@@ -73,10 +73,10 @@ func TestFilterVectorResults(t *testing.T) {
                 schema.NewExpressionBinaryComparisonOperator(
                     *schema.NewComparisonTargetColumn("job", []string{}, []schema.PathElement{}),
                     "_in",
-                    schema.NewComparisonValueScalar([]string{"foo"}),
+                    schema.NewComparisonValueScalar([]string{"ndc-prometheus"}),
                 ),
             ).Encode(),
-            Expected: model.Vector{},
+            Expected: vectorFixtures,
         },
         {
             Name: "nin",
diff --git a/connector/internal/utils.go b/connector/internal/utils.go
index fe7ec59..1a95a3b 100644
--- a/connector/internal/utils.go
+++ b/connector/internal/utils.go
@@ -3,6 +3,7 @@ package internal
 import (
     "encoding/json"
     "fmt"
+    "math"
     "slices"
     "time"
 
@@ -17,7 +18,7 @@ func createQueryResultsFromVector(vector model.Vector, labels map[string]metadat
     results := make([]map[string]any, len(vector))
     for i, item := range vector {
         ts := formatTimestamp(item.Timestamp, runtime.Format.Timestamp, runtime.UnixTimeUnit)
-        value := formatValue(item.Value, runtime.Format.Value)
+        value := formatValue(item.Value, runtime.Format)
         r := map[string]any{
             metadata.TimestampKey: ts,
             metadata.ValueKey:     value,
@@ -65,7 +66,7 @@ func createGroupQueryResultsFromMatrix(matrix model.Matrix, labels map[string]me
         values := make([]map[string]any, valuesLen)
         for i, value := range item.Values {
             ts := formatTimestamp(value.Timestamp, runtime.Format.Timestamp, runtime.UnixTimeUnit)
-            v := formatValue(value.Value, runtime.Format.Value)
+            v := formatValue(value.Value, runtime.Format)
             values[i] = map[string]any{
                 metadata.TimestampKey: ts,
                 metadata.ValueKey:     v,
@@ -89,7 +90,7 @@ func createFlatQueryResultsFromMatrix(matrix model.Matrix, labels map[string]met
     for _, item := range matrix {
         for _, value := range item.Values {
             ts := formatTimestamp(value.Timestamp, runtime.Format.Timestamp, runtime.UnixTimeUnit)
-            v := formatValue(value.Value, runtime.Format.Value)
+            v := formatValue(value.Value, runtime.Format)
             r := map[string]any{
                 metadata.LabelsKey:    item.Metric,
                 metadata.TimestampKey: ts,
@@ -117,9 +118,19 @@ func formatTimestamp(ts model.Time, format metadata.TimestampFormat, unixTime cl
     }
 }
 
-func formatValue(value model.SampleValue, format metadata.ValueFormat) any {
-    switch format {
+func formatValue(value model.SampleValue, format metadata.RuntimeFormatSettings) any {
+    switch format.Value {
     case metadata.ValueFloat64:
+        if math.IsNaN(float64(value)) {
+            return format.NaN
+        }
+        if value > 0 && math.IsInf(float64(value), 1) {
+            return format.Inf
+        }
+        if value < 0 && math.IsInf(float64(value), -1) {
+            return format.NegativeInf
+        }
+
         return float64(value)
     default:
         return value.String()
diff --git a/connector/metadata/configuration.go b/connector/metadata/configuration.go
index f91a422..62e0509 100644
--- a/connector/metadata/configuration.go
+++ b/connector/metadata/configuration.go
@@ -80,6 +80,12 @@ type RuntimeFormatSettings struct {
     Timestamp TimestampFormat `json:"timestamp" yaml:"timestamp" jsonschema:"enum=rfc3339,enum=unix,default=unix"`
     // The serialization format for value
     Value ValueFormat `json:"value" yaml:"value" jsonschema:"enum=string,enum=float64,default=string"`
+    // The serialization format for not-a-number values
+    NaN any `json:"nan" yaml:"nan" jsonschema:"oneof_type=string;number"`
+    // The serialization format for infinite values
+    Inf any `json:"inf" yaml:"inf" jsonschema:"oneof_type=string;number"`
+    // The serialization format for negative infinite values
+    NegativeInf any `json:"negative_inf" yaml:"negative_inf" jsonschema:"oneof_type=string;number"`
 }
 
 // RuntimeSettings contain settings for the runtime engine
diff --git a/connector/metadata/native_operation.go b/connector/metadata/native_operation.go
index 6d59706..7b5ffb4 100644
--- a/connector/metadata/native_operation.go
+++ b/connector/metadata/native_operation.go
@@ -3,6 +3,7 @@ package metadata
 import (
     "fmt"
     "regexp"
+    "slices"
     "strings"
 
     "github.com/hasura/ndc-sdk-go/schema"
@@ -12,6 +13,7 @@ import (
 // The variable syntax for native queries is ${} which is compatible with Grafana
 var promQLVariableRegex = regexp.MustCompile(`\${(\w+)}`)
+var allowedNativeQueryScalars = []ScalarName{ScalarString, ScalarDuration, ScalarInt64, ScalarFloat64}
 
 // NativeOperations the list of native query and mutation definitions
 type NativeOperations struct {
@@ -59,9 +61,18 @@ func (scb *connectorSchemaBuilder) buildNativeQuery(name string, query *NativeQu
         if _, ok := arguments[key]; ok {
             return fmt.Errorf("argument `%s` is already used by the function", key)
         }
+        scalarName := arg.Type
+        if arg.Type != "" {
+            if !slices.Contains(allowedNativeQueryScalars, ScalarName(arg.Type)) {
+                return fmt.Errorf("%s: unsupported native query argument type %s; argument: %s", name, scalarName, key)
+            }
+        } else {
+            scalarName = string(ScalarString)
+        }
+
         arguments[key] = schema.ArgumentInfo{
             Description: arg.Description,
-            Type:        schema.NewNamedType(string(ScalarString)).Encode(),
+            Type:        schema.NewNamedType(scalarName).Encode(),
         }
     }
 
diff --git a/connector/query.go b/connector/query.go
index 47b0d39..199bc2d 100644
--- a/connector/query.go
+++ b/connector/query.go
@@ -195,6 +195,7 @@ func (c *PrometheusConnector) QueryExplain(ctx context.Context, conf *metadata.C
         Client:    state.Client,
         Request:   request,
         Arguments: arguments,
+        Runtime:   c.runtime,
     }
     _, queryString, err := executor.Explain(ctx)
     if err != nil {
@@ -215,6 +216,7 @@ func (c *PrometheusConnector) QueryExplain(ctx context.Context, conf *metadata.C
         Request:     request,
         NativeQuery: &nativeQuery,
         Arguments:   arguments,
+        Runtime:     c.runtime,
     }
     _, queryString, err := executor.Explain(ctx)
     if err != nil {
@@ -245,6 +247,7 @@ func (c *PrometheusConnector) QueryExplain(ctx context.Context, conf *metadata.C
         Metric:    collection,
         Variables: requestVars[0],
         Arguments: arguments,
+        Runtime:   c.runtime,
     }
 
     _, queryString, _, err := executor.Explain(ctx)
diff --git a/connector/testdata/query/process_cpu_seconds_total_variables/request.json b/connector/testdata/query/process_cpu_seconds_total_variables/request.json
index d358909..9e51a11 100644
--- a/connector/testdata/query/process_cpu_seconds_total_variables/request.json
+++ b/connector/testdata/query/process_cpu_seconds_total_variables/request.json
@@ -2,9 +2,7 @@
   "collection": "process_cpu_seconds_total",
   "query": {
     "fields": {
-      "job": { "type": "column", "column": "job", "fields": null },
-      "value": { "type": "column", "column": "value", "fields": null },
-      "timestamp": { "type": "column", "column": "timestamp", "fields": null }
+      "job": { "type": "column", "column": "job", "fields": null }
     },
     "limit": 1,
     "order_by": {
diff --git a/connector/testdata/query/prometheus_alertmanagers/expected.json b/connector/testdata/query/prometheus_alertmanagers/expected.json
new file mode 100644
index 0000000..c121ead
--- /dev/null
+++ b/connector/testdata/query/prometheus_alertmanagers/expected.json
@@ -0,0 +1,16 @@
+[
+  {
+    "rows": [
+      {
+        "__value": {
+          "activeAlertManagers": [
+            {
+              "url": "http://alertmanager:9093/api/v2/alerts"
+            }
+          ],
+          "droppedAlertManagers": []
+        }
+      }
+    ]
+  }
+]
\ No newline at end of file
diff --git a/connector/testdata/query/prometheus_alerts/request.json b/connector/testdata/query/prometheus_alerts/request.json
index d238e2a..3fa767e 100644
--- a/connector/testdata/query/prometheus_alerts/request.json
+++ b/connector/testdata/query/prometheus_alerts/request.json
@@ -24,10 +24,6 @@
           "state": {
             "column": "state",
             "type": "column"
-          },
-          "value": {
-            "column": "value",
-            "type": "column"
           }
         },
         "type": "object"
@@ -38,4 +34,4 @@
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/connector/testdata/query/prometheus_targets/expected.json b/connector/testdata/query/prometheus_targets/expected.json
new file mode 100644
index 0000000..f66f315
--- /dev/null
+++ b/connector/testdata/query/prometheus_targets/expected.json
@@ -0,0 +1,11 @@
+[
+  {
+    "rows": [
+      {
+        "__value": {
+          "droppedTargets": []
+        }
+      }
+    ]
+  }
+]
diff --git a/connector/testdata/query/prometheus_targets/request.json b/connector/testdata/query/prometheus_targets/request.json
index f889c37..e3e1154 100644
--- a/connector/testdata/query/prometheus_targets/request.json
+++ b/connector/testdata/query/prometheus_targets/request.json
@@ -8,54 +8,6 @@
       "column": "__value",
       "fields": {
         "fields": {
-          "activeTargets": {
-            "column": "activeTargets",
-            "fields": {
-              "fields": {
-                "fields": {
-                  "discoveredLabels": {
-                    "column": "discoveredLabels",
-                    "type": "column"
-                  },
-                  "globalUrl": {
-                    "column": "globalUrl",
-                    "type": "column"
-                  },
-                  "health": {
-                    "column": "health",
-                    "type": "column"
-                  },
-                  "labels": {
-                    "column": "labels",
-                    "type": "column"
-                  },
-                  "lastError": {
-                    "column": "lastError",
-                    "type": "column"
-                  },
-                  "lastScrape": {
-                    "column": "lastScrape",
-                    "type": "column"
-                  },
-                  "lastScrapeDuration": {
-                    "column": "lastScrapeDuration",
-                    "type": "column"
-                  },
-                  "scrapePool": {
-                    "column": "scrapePool",
-                    "type": "column"
-                  },
-                  "scrapeUrl": {
-                    "column": "scrapeUrl",
-                    "type": "column"
-                  }
-                },
-                "type": "object"
-              },
-              "type": "array"
-            },
-            "type": "column"
-          },
           "droppedTargets": {
             "column": "droppedTargets",
             "fields": {
@@ -79,4 +31,4 @@
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/jsonschema/configuration.json b/jsonschema/configuration.json
index 7d49e5a..5e1a80a 100644
--- a/jsonschema/configuration.json
+++ b/jsonschema/configuration.json
@@ -440,13 +440,46 @@
             "float64"
           ],
          "default": "string"
+        },
+        "nan": {
+          "oneOf": [
+            {
+              "type": "string"
+            },
+            {
+              "type": "number"
+            }
+          ]
+        },
+        "inf": {
+          "oneOf": [
+            {
+              "type": "string"
+            },
+            {
+              "type": "number"
+            }
+          ]
+        },
+        "negative_inf": {
+          "oneOf": [
+            {
+              "type": "string"
+            },
+            {
+              "type": "number"
+            }
+          ]
         }
       },
       "additionalProperties": false,
       "type": "object",
       "required": [
         "timestamp",
-        "value"
+        "value",
+        "nan",
+        "inf",
+        "negative_inf"
       ]
     },
     "RuntimeSettings": {
diff --git a/tests/configuration/configuration.yaml b/tests/configuration/configuration.yaml
index 20d3020..91f8ebf 100644
--- a/tests/configuration/configuration.yaml
+++ b/tests/configuration/configuration.yaml
@@ -197,6 +197,15 @@ metadata:
         telemetry_sdk_version: {}
   native_operations:
     queries:
+      ndc_prometheus_query_latency_avg:
+        query: sum by (collection)(rate(ndc_prometheus_query_total_time_sum{job="${job}"}[${interval}])) / sum by (collection)(rate(ndc_prometheus_query_total_time_count{job="${job}"}[${interval}]))
+        labels:
+          collection: {}
+        arguments:
+          interval:
+            type: Duration
+          job:
+            type: String
       service_up:
         query: up{job="${job}", instance="${instance}"}
         labels:
@@ -213,4 +222,7 @@ runtime:
   format:
     timestamp: rfc3339
     value: float64
+    nan: null
+    inf: "+Inf"
+    negative_inf: "-Inf"
   concurrency_limit: 3
diff --git a/tests/engine/app/metadata/ndc_prometheus_query_latency_avg.hml b/tests/engine/app/metadata/ndc_prometheus_query_latency_avg.hml
new file mode 100644
index 0000000..8a4603a
--- /dev/null
+++ b/tests/engine/app/metadata/ndc_prometheus_query_latency_avg.hml
@@ -0,0 +1,125 @@
+---
+kind: ObjectType
+version: v1
+definition:
+  name: NdcPrometheusQueryLatencyAvg
+  fields:
+    - name: collection
+      type: String!
+    - name: labels
+      type: LabelSet!
+      description: Labels of the metric
+    - name: timestamp
+      type: Timestamp!
+      description: An instant timestamp or the last timestamp of a range query result
+    - name: value
+      type: Decimal!
+      description: Value of the instant query or the last value of a range query
+    - name: values
+      type: "[QueryResultValue!]!"
+      description: An array of query result values
+  graphql:
+    typeName: NdcPrometheusQueryLatencyAvg
+    inputTypeName: NdcPrometheusQueryLatencyAvg_input
+  dataConnectorTypeMapping:
+    - dataConnectorName: prometheus
+      dataConnectorObjectType: NdcPrometheusQueryLatencyAvg
+
+---
+kind: TypePermissions
+version: v1
+definition:
+  typeName: NdcPrometheusQueryLatencyAvg
+  permissions:
+    - role: admin
+      output:
+        allowedFields:
+          - collection
+          - labels
+          - timestamp
+          - value
+          - values
+
+---
+kind: BooleanExpressionType
+version: v1
+definition:
+  name: NdcPrometheusQueryLatencyAvg_bool_exp
+  operand:
+    object:
+      type: NdcPrometheusQueryLatencyAvg
+      comparableFields:
+        - fieldName: collection
+          booleanExpressionType: String_bool_exp
+        - fieldName: labels
+          booleanExpressionType: LabelSet_bool_exp
+        - fieldName: timestamp
+          booleanExpressionType: Timestamp_bool_exp
+        - fieldName: value
+          booleanExpressionType: Decimal_bool_exp
+      comparableRelationships: []
+  logicalOperators:
+    enable: true
+  isNull:
+    enable: true
+  graphql:
+    typeName: NdcPrometheusQueryLatencyAvg_bool_exp
+
+---
+kind: Model
+version: v1
+definition:
+  name: ndc_prometheus_query_latency_avg
+  objectType: NdcPrometheusQueryLatencyAvg
+  arguments:
+    - name: flat
+      type: Boolean
+      description: Flatten grouped values out the root array
+    - name: interval
+      type: Duration!
+    - name: job
+      type: String!
+    - name: offset
+      type: Duration
+      description: The offset modifier allows changing the time offset for individual instant and range vectors in a query
+    - name: step
+      type: Duration
+      description: Query resolution step width in duration format or float number of seconds
+    - name: timeout
+      type: Duration
+      description: Evaluation timeout
+  source:
+    dataConnectorName: prometheus
+    collection: ndc_prometheus_query_latency_avg
+  filterExpressionType: NdcPrometheusQueryLatencyAvg_bool_exp
+  orderableFields:
+    - fieldName: collection
+      orderByDirections:
+        enableAll: true
+    - fieldName: labels
+      orderByDirections:
+        enableAll: true
+    - fieldName: timestamp
+      orderByDirections:
+        enableAll: true
+    - fieldName: value
+      orderByDirections:
+        enableAll: true
+  graphql:
+    selectMany:
+      queryRootField: ndc_prometheus_query_latency_avg
+    selectUniques: []
+    argumentsInputType: ndc_prometheus_query_latency_avg_arguments
+    orderByExpressionType: ndc_prometheus_query_latency_avg_order_by
+
+---
+kind: ModelPermissions
+version: v1
+definition:
+  modelName: ndc_prometheus_query_latency_avg
+  permissions:
+    - role: admin
+      select:
+        filter: null
+
diff --git a/tests/engine/app/metadata/prometheus.hml b/tests/engine/app/metadata/prometheus.hml
index c257f4d..4859ee7 100644
--- a/tests/engine/app/metadata/prometheus.hml
+++ b/tests/engine/app/metadata/prometheus.hml
@@ -3864,6 +3864,34 @@ definition:
           type:
             type: named
            name: String
+      NdcPrometheusQueryLatencyAvg:
+        fields:
+          collection:
+            type:
+              type: named
+              name: String
+          labels:
+            description: Labels of the metric
+            type:
+              type: named
+              name: LabelSet
+          timestamp:
+            description: An instant timestamp or the last timestamp of a range query result
+            type:
+              type: named
+              name: Timestamp
+          value:
+            description: Value of the instant query or the last value of a range query
+            type:
+              type: named
+              name: Decimal
+          values:
+            description: An array of query result values
+            type:
+              type: array
+              element_type:
+                type: named
+                name: QueryResultValue
       NdcPrometheusQueryTotal:
         fields:
           collection:
@@ -18440,6 +18468,47 @@ definition:
              type: HttpClientResponseSizeBytesTotal
          uniqueness_constraints: {}
          foreign_keys: {}
+        - name: ndc_prometheus_query_latency_avg
+          arguments:
+            flat:
+              description: Flatten grouped values out the root array
+              type:
+                type: nullable
+                underlying_type:
+                  type: named
+                  name: Boolean
+            interval:
+              type:
+                type: named
+                name: Duration
+            job:
+              type:
+                type: named
+                name: String
+            offset:
+              description: The offset modifier allows changing the time offset for individual instant and range vectors in a query
+              type:
+                type: nullable
+                underlying_type:
+                  type: named
+                  name: Duration
+            step:
+              description: Query resolution step width in duration format or float number of seconds
+              type:
+                type: nullable
+                underlying_type:
+                  type: named
+                  name: Duration
+            timeout:
+              description: Evaluation timeout
+              type:
+                type: nullable
+                underlying_type:
+                  type: named
+                  name: Duration
+          type: NdcPrometheusQueryLatencyAvg
+          uniqueness_constraints: {}
+          foreign_keys: {}
        - name: ndc_prometheus_query_total
          description: Total number of query requests
          arguments: