From 0e7f8b423140d3668b1973ef5ebe001965636c25 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Wed, 23 Oct 2024 20:51:26 -0700 Subject: [PATCH] use pdata for sql data --- .../pkg/monitors/sql/expressions.go | 24 +++-- .../pkg/monitors/sql/monitor.go | 12 +++ .../pkg/monitors/sql/querier.go | 91 ++++++++++++------- 3 files changed, 86 insertions(+), 41 deletions(-) diff --git a/internal/signalfx-agent/pkg/monitors/sql/expressions.go b/internal/signalfx-agent/pkg/monitors/sql/expressions.go index eb48d2adf3..b5f300e2bb 100644 --- a/internal/signalfx-agent/pkg/monitors/sql/expressions.go +++ b/internal/signalfx-agent/pkg/monitors/sql/expressions.go @@ -2,11 +2,8 @@ package sql import ( "database/sql" + "go.opentelemetry.io/collector/pdata/pmetric" "reflect" - - "github.com/signalfx/golib/v3/datapoint" - "github.com/signalfx/golib/v3/sfxclient" - "github.com/signalfx/signalfx-agent/pkg/utils" ) type ExprEnv map[string]interface{} @@ -80,11 +77,20 @@ func convertToFloatOrPanic(val interface{}) float64 { return rVal.Convert(floatType).Float() } -func (e ExprEnv) GAUGE(metric string, dims map[string]interface{}, val interface{}) *datapoint.Datapoint { - - return sfxclient.GaugeF(metric, utils.StringInterfaceMapToStringMap(dims), convertToFloatOrPanic(val)) +func (e ExprEnv) GAUGE(metric string, dims map[string]interface{}, val interface{}) pmetric.Metric { + m := pmetric.NewMetric() + dp := m.SetEmptyGauge().DataPoints().AppendEmpty() + dp.SetDoubleValue(convertToFloatOrPanic(val)) + dp.Attributes().FromRaw(dims) + m.SetName(metric) + return m } -func (e ExprEnv) CUMULATIVE(metric string, dims map[string]interface{}, val interface{}) *datapoint.Datapoint { - return sfxclient.CumulativeF(metric, utils.StringInterfaceMapToStringMap(dims), convertToFloatOrPanic(val)) +func (e ExprEnv) CUMULATIVE(metric string, dims map[string]interface{}, val interface{}) pmetric.Metric { + m := pmetric.NewMetric() + dp := m.SetEmptySum().DataPoints().AppendEmpty() + dp.SetDoubleValue(convertToFloatOrPanic(val)) + dp.Attributes().FromRaw(dims) + m.SetName(metric) + return m } diff --git a/internal/signalfx-agent/pkg/monitors/sql/monitor.go b/internal/signalfx-agent/pkg/monitors/sql/monitor.go index f350a76c9a..3a8cfea70a 100644 --- a/internal/signalfx-agent/pkg/monitors/sql/monitor.go +++ b/internal/signalfx-agent/pkg/monitors/sql/monitor.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "go.opentelemetry.io/collector/pdata/pmetric" "strings" "time" @@ -75,6 +76,17 @@ func (m *Metric) NewDatapoint() *datapoint.Datapoint { return datapoint.New(m.MetricName, map[string]string{}, nil, typ, time.Time{}) } +func (m *Metric) NewMetric() pmetric.Metric { + met := pmetric.NewMetric() + if m.IsCumulative { + met.SetEmptySum() + } else { + met.SetEmptyGauge() + } + met.SetName(m.MetricName) + return met +} + // Config for this monitor type Config struct { config.MonitorConfig `yaml:",inline" acceptsEndpoints:"true"` diff --git a/internal/signalfx-agent/pkg/monitors/sql/querier.go b/internal/signalfx-agent/pkg/monitors/sql/querier.go index afb6fbe510..9b45240a0c 100644 --- a/internal/signalfx-agent/pkg/monitors/sql/querier.go +++ b/internal/signalfx-agent/pkg/monitors/sql/querier.go @@ -3,7 +3,9 @@ package sql import ( "context" "database/sql" + "errors" "fmt" + "go.opentelemetry.io/collector/pdata/pmetric" "reflect" "strings" "time" @@ -11,7 +13,6 @@ import ( "github.com/antonmedv/expr" "github.com/antonmedv/expr/vm" "github.com/davecgh/go-spew/spew" - "github.com/signalfx/golib/v3/datapoint" "github.com/sirupsen/logrus" "github.com/signalfx/signalfx-agent/pkg/monitors/types" @@ -95,13 +96,13 @@ func (q *querier) doQuery(ctx context.Context, database *sql.DB, output types.Ou return fmt.Errorf("error executing statement %s: %v", q.query.Query, err) } for rows.Next() { - dps, dims, err := q.convertCurrentRowToDatapointAndDimensions(rows) + metrics, dims, err := q.convertCurrentRowToDatapointAndDimensions(rows) if err != nil { rows.Close() return err } - output.SendDatapoints(dps...) + output.SendMetrics(metrics...) for i := range dims { for _, dim := range dims[i] { @@ -112,7 +113,7 @@ func (q *querier) doQuery(ctx context.Context, database *sql.DB, output types.Ou return rows.Close() } -func (q *querier) convertCurrentRowToDatapointAndDimensions(rows *sql.Rows) ([]*datapoint.Datapoint, [][]*types.Dimension, error) { +func (q *querier) convertCurrentRowToDatapointAndDimensions(rows *sql.Rows) ([]pmetric.Metric, [][]*types.Dimension, error) { rowSlice, err := q.getRowSlice(rows) if err != nil { return nil, nil, err @@ -128,31 +129,30 @@ func (q *querier) convertCurrentRowToDatapointAndDimensions(rows *sql.Rows) ([]* if q.logQueries { q.logger.Info("Got results %s", spew.Sdump(rowSlice)) } + m := make([]pmetric.Metric, 0, len(q.query.Metrics)+len(q.query.DatapointExpressions)) - dps := make([]*datapoint.Datapoint, 0, len(q.query.Metrics)+len(q.query.DatapointExpressions)) var dims [][]*types.Dimension if len(q.query.Metrics) > 0 { var err error - var structuredDPs []*datapoint.Datapoint - structuredDPs, dims, err = q.convertCurrentRowStructured(rowSlice, columnNames) + var structuredMetrics []pmetric.Metric + structuredMetrics, dims, err = q.convertCurrentRowStructured(rowSlice, columnNames) + m = append(m, structuredMetrics...) if err != nil { q.logger.WithError(err).Warn("Failed to convert row to datapoints and dimensions") } - - dps = append(dps, structuredDPs...) } if len(q.query.DatapointExpressions) > 0 { - exprDPs := q.convertCurrentRowExpressions(rowSlice, columnNames) - dps = append(dps, exprDPs...) + exprMetrics := q.convertCurrentRowExpressions(rowSlice, columnNames) + m = append(m, exprMetrics...) } - return dps, dims, nil + return m, dims, nil } -func (q *querier) convertCurrentRowExpressions(rowSlice []interface{}, columnNames []string) []*datapoint.Datapoint { - dps := make([]*datapoint.Datapoint, 0, len(q.compiledExprs)) +func (q *querier) convertCurrentRowExpressions(rowSlice []interface{}, columnNames []string) []pmetric.Metric { + result := make([]pmetric.Metric, 0, len(q.compiledExprs)) for _, compiled := range q.compiledExprs { env := newExprEnv(rowSlice, columnNames) @@ -176,18 +176,17 @@ func (q *querier) convertCurrentRowExpressions(rowSlice []interface{}, columnNam continue } - if v, ok := dp.(*datapoint.Datapoint); ok { - dps = append(dps, v) + if v, ok := dp.(pmetric.Metric); ok { + result = append(result, v) } else { - q.logger.WithField("expression", compiled.Source.Content()).WithField("result", dp).Warn("Result of expression is not a datapoint") + q.logger.WithField("expression", compiled.Source.Content()).WithField("result", dp).Warn("Result of expression is not a pmetric.Metric") continue } } - - return dps + return result } -func (q *querier) convertCurrentRowStructured(rowSlice []interface{}, columnNames []string) ([]*datapoint.Datapoint, [][]*types.Dimension, error) { +func (q *querier) convertCurrentRowStructured(rowSlice []interface{}, columnNames []string) ([]pmetric.Metric, [][]*types.Dimension, error) { // Clone all properties before updating them for i := range q.dimensions { for j := range q.dimensions[i] { @@ -202,11 +201,13 @@ func (q *querier) convertCurrentRowStructured(rowSlice []interface{}, columnName } } - dps := make([]*datapoint.Datapoint, 0, len(q.query.Metrics)) - for i := range q.query.Metrics { - dps = append(dps, q.query.Metrics[i].NewDatapoint()) + exprMetrics := make([]pmetric.Metric, 0, len(q.query.Metrics)) + for _, m := range q.query.Metrics { + exprMetrics = append(exprMetrics, m.NewMetric()) } + var emptyValues []int + for i := range rowSlice { switch v := rowSlice[i].(type) { case *sql.NullFloat64: @@ -219,9 +220,16 @@ func (q *querier) convertCurrentRowStructured(rowSlice []interface{}, columnName // This is a logical error in the code, not user input error panic("valueColumn was not properly mapped to metric") } - - dp := dps[q.metricToIndex[metric]] - dp.Value = datapoint.NewFloatValue(v.Float64) + q.query.Metrics[i].NewDatapoint() + dp := exprMetrics[q.metricToIndex[metric]] + switch dp.Type() { + case pmetric.MetricTypeSum: + dp.Sum().DataPoints().AppendEmpty().SetDoubleValue(v.Float64) + case pmetric.MetricTypeGauge: + dp.Gauge().DataPoints().AppendEmpty().SetDoubleValue(v.Float64) + default: + return nil, nil, errors.New("invalid metric type") + } case *sql.NullString: dimVal := v.String @@ -245,23 +253,42 @@ func (q *querier) convertCurrentRowStructured(rowSlice []interface{}, columnName if !q.dimensionColumnSets[j][strings.ToLower(columnNames[i])] { continue } - - dps[j].Dimensions[columnNames[i]] = dimVal + dp := exprMetrics[j] + switch dp.Type() { + case pmetric.MetricTypeSum: + dp.Sum().DataPoints().At(0).Attributes().PutStr(columnNames[i], dimVal) + case pmetric.MetricTypeGauge: + dp.Gauge().DataPoints().At(0).Attributes().PutStr(columnNames[i], dimVal) + default: + return nil, nil, errors.New("invalid metric type") + } } + default: + emptyValues = append(emptyValues, i) } } var n int - for i := range dps { - if dps[i].Value == nil { + for i := range exprMetrics { + dp := exprMetrics[i] + var empty bool + switch dp.Type() { + case pmetric.MetricTypeSum: + empty = dp.Sum().DataPoints().Len() == 0 + case pmetric.MetricTypeGauge: + empty = dp.Gauge().DataPoints().Len() == 0 + default: + empty = true + } + if empty { q.logger.Warnf("Metric %s's value column '%s' did not correspond to a value\nrowSlice: %s", q.query.Metrics[i].MetricName, q.query.Metrics[i].ValueColumn, spew.Sdump(rowSlice)) continue } - dps[n] = dps[i] + exprMetrics[n] = exprMetrics[i] n++ } - return dps[:n], q.dimensions, nil + return exprMetrics, q.dimensions, nil } func (q *querier) getRowSlice(rows *sql.Rows) ([]interface{}, error) {