diff --git a/cmd/geras/main.go b/cmd/geras/main.go index 7ba3129..23020bf 100644 --- a/cmd/geras/main.go +++ b/cmd/geras/main.go @@ -183,7 +183,11 @@ func main() { prometheus.DefaultRegisterer.MustRegister(version.NewCollector("geras")) // create openTSDBStore and expose its api on a grpc server - srv := store.NewOpenTSDBStore(logger, client, prometheus.DefaultRegisterer, *refreshInterval, storeLabels, allowedMetricNames, blockedMetricNames, *enableMetricSuggestions, *healthcheckMetric) + srv, err := store.NewOpenTSDBStore(logger, client, prometheus.DefaultRegisterer, *refreshInterval, storeLabels, allowedMetricNames, blockedMetricNames, *enableMetricSuggestions, *healthcheckMetric) + if err != nil { + level.Error(logger).Log("err", err) + os.Exit(1) + } grpcSrv := grpc.NewServer( grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), diff --git a/go.mod b/go.mod index dd666d5..b8e4e7f 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/G-Research/geras require ( - github.com/G-Research/opentsdb-goclient v0.0.0-20191028155047-1a0d357f6ca7 + github.com/G-Research/opentsdb-goclient v0.0.0-20191219200136-ccb48600721a github.com/go-kit/kit v0.9.0 github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20181025070259-68e3a13e4117 github.com/pkg/errors v0.8.1 diff --git a/go.sum b/go.sum index 756693c..70bc8f6 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,10 @@ github.com/Azure/go-autorest v11.2.8+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSW github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= -github.com/G-Research/opentsdb-goclient v0.0.0-20191028155047-1a0d357f6ca7 h1:LCY7S5FtlqIWHXV0HBX4yGMelG7DCZX90gSPD1TrmqU= -github.com/G-Research/opentsdb-goclient v0.0.0-20191028155047-1a0d357f6ca7/go.mod h1:oVqXUYMDThF8Uz8WH3f80BlB7+Y5BV4wDtqQ06e4m7g= +github.com/G-Research/opentsdb-goclient v0.0.0-20191210204552-9a5d3f5d556d h1:gsMpaf4Y8Zm70kWestox0+M6jmThzV0zXH78wYQVCuE= +github.com/G-Research/opentsdb-goclient v0.0.0-20191210204552-9a5d3f5d556d/go.mod h1:oVqXUYMDThF8Uz8WH3f80BlB7+Y5BV4wDtqQ06e4m7g= +github.com/G-Research/opentsdb-goclient v0.0.0-20191219200136-ccb48600721a h1:T3zoIUSc6BzpUMsiyw7afr3IiTrELVX4u+tkzk7RWbQ= +github.com/G-Research/opentsdb-goclient v0.0.0-20191219200136-ccb48600721a/go.mod h1:oVqXUYMDThF8Uz8WH3f80BlB7+Y5BV4wDtqQ06e4m7g= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= diff --git a/pkg/store/store.go b/pkg/store/store.go index 366ff66..2792e09 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -38,9 +38,11 @@ type OpenTSDBStore struct { enableMetricSuggestions bool storeLabels []storepb.Label healthcheckMetric string + aggregateToDownsample map[storepb.Aggr]string + downsampleToAggregate map[string]storepb.Aggr } -func NewOpenTSDBStore(logger log.Logger, client opentsdb.ClientContext, reg prometheus.Registerer, interval time.Duration, storeLabels []storepb.Label, allowedMetricNames, blockedMetricNames *regexp.Regexp, enableMetricSuggestions bool, healthcheckMetric string) *OpenTSDBStore { +func NewOpenTSDBStore(logger log.Logger, client opentsdb.ClientContext, reg prometheus.Registerer, interval time.Duration, storeLabels []storepb.Label, allowedMetricNames, blockedMetricNames *regexp.Regexp, enableMetricSuggestions bool, healthcheckMetric string) (*OpenTSDBStore, error) { store := &OpenTSDBStore{ logger: log.With(logger, "component", "opentsdb"), openTSDBClient: client, @@ -52,8 +54,30 @@ func NewOpenTSDBStore(logger log.Logger, client opentsdb.ClientContext, reg prom blockedMetricNames: blockedMetricNames, healthcheckMetric: healthcheckMetric, } + err := store.populateMaps() + if err != nil { + return nil, err + } store.updateMetrics(context.Background(), logger) - return store + return store, nil +} + +func (store *OpenTSDBStore) populateMaps() error { + store.aggregateToDownsample = map[storepb.Aggr]string{ + storepb.Aggr_COUNT: "count", + storepb.Aggr_SUM: "sum", + storepb.Aggr_MIN: "min", + storepb.Aggr_MAX: "max", + storepb.Aggr_COUNTER: "avg", + } + store.downsampleToAggregate = map[string]storepb.Aggr{} + for a, d := range store.aggregateToDownsample { + if _, exists := store.downsampleToAggregate[d]; exists { + return errors.New(fmt.Sprintf("Invalid aggregate/downsample mapping - not reversible for downsample function %s", d)) + } + store.downsampleToAggregate[d] = a + } + return nil } type internalMetrics struct { @@ -209,7 +233,7 @@ func (store *OpenTSDBStore) Series( if respI.Error != nil { return respI.Error } - res, count, err := convertOpenTSDBResultsToSeriesResponse(respI) + res, count, err := convertOpenTSDBResultsToSeriesResponse(respI, store.downsampleToAggregate) if err != nil { return err } @@ -417,12 +441,67 @@ func (store *OpenTSDBStore) composeOpenTSDBQuery(req *storepb.SeriesRequest) (op return opentsdb.QueryParam{}, nil, nil } - subQueries := make([]opentsdb.SubQuery, len(metricNames)) + aggregationCount := 0 + needRawAggregation := true + var downsampleSecs int64 + if req.MaxResolutionWindow != 0 { + needRawAggregation = false + for _, agg := range req.Aggregates { + switch agg { + case storepb.Aggr_RAW: + needRawAggregation = true + break + case storepb.Aggr_COUNT: + fallthrough + case storepb.Aggr_SUM: + fallthrough + case storepb.Aggr_MIN: + fallthrough + case storepb.Aggr_MAX: + fallthrough + case storepb.Aggr_COUNTER: + aggregationCount++ + break + default: + level.Info(store.logger).Log("err", fmt.Sprintf("Unrecognised series aggregator: %v", agg)) + needRawAggregation = true + break + } + } + downsampleSecs = req.MaxResolutionWindow / 1000 + } + if needRawAggregation { + aggregationCount++ + } + subQueries := make([]opentsdb.SubQuery, len(metricNames)*aggregationCount) for i, mn := range metricNames { - subQueries[i] = opentsdb.SubQuery{ - Aggregator: "none", - Metric: mn, - Fiters: tagFilters, + aggregationIndex := 0 + if req.MaxResolutionWindow != 0 { + for _, agg := range req.Aggregates { + addAgg := true + var downsample string + if ds, exists := store.aggregateToDownsample[agg]; exists { + downsample = ds + } else { + addAgg = false + } + if addAgg { + subQueries[(i*aggregationCount)+aggregationIndex] = opentsdb.SubQuery{ + Aggregator: "none", + Downsample: fmt.Sprintf("%vs-%s", downsampleSecs, downsample), + Metric: mn, + Fiters: tagFilters, + } + aggregationIndex++ + } + } + } + if needRawAggregation { + subQueries[(i*aggregationCount)+aggregationIndex] = opentsdb.SubQuery{ + Aggregator: "none", + Metric: mn, + Fiters: tagFilters, + } } } query := opentsdb.QueryParam{ @@ -430,6 +509,7 @@ func (store *OpenTSDBStore) composeOpenTSDBQuery(req *storepb.SeriesRequest) (op End: req.MaxTime, Queries: subQueries, MsResolution: true, + ShowQuery: true, } level.Debug(store.logger).Log("tsdb-query", query.String()) return query, warnings, nil @@ -464,7 +544,7 @@ func (store *OpenTSDBStore) checkMetricNames(metricNames []string, fullBlock boo return allowed, warnings, nil } -func convertOpenTSDBResultsToSeriesResponse(respI *opentsdb.QueryRespItem) (*storepb.SeriesResponse, int, error) { +func convertOpenTSDBResultsToSeriesResponse(respI *opentsdb.QueryRespItem, downsampleToAggregate map[string]storepb.Aggr) (*storepb.SeriesResponse, int, error) { seriesLabels := make([]storepb.Label, len(respI.Tags)) i := 0 for k, v := range respI.Tags { @@ -473,6 +553,17 @@ func convertOpenTSDBResultsToSeriesResponse(respI *opentsdb.QueryRespItem) (*sto } seriesLabels = append(seriesLabels, storepb.Label{Name: "__name__", Value: respI.Metric}) + downsampleFunction := "none" + if hyphenIndex := strings.Index(respI.Query.Downsample, "-"); hyphenIndex >= 0 { + downsampleFunction = respI.Query.Downsample[hyphenIndex+1:] + } + var aggregate storepb.Aggr + if v, exists := downsampleToAggregate[downsampleFunction]; exists { + aggregate = v + } else { + aggregate = storepb.Aggr_RAW + } + // Turn datapoints into chunks (Prometheus's tsdb encoding) dps := respI.GetDataPoints() chunks := []storepb.AggrChunk{} @@ -493,11 +584,28 @@ func convertOpenTSDBResultsToSeriesResponse(respI *opentsdb.QueryRespItem) (*sto } a.Append(int64(dp.Timestamp), dp.Value.(float64)) } - chunks = append(chunks, storepb.AggrChunk{ + chunk := &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()} + aggrChunk := storepb.AggrChunk{ MinTime: minTime, MaxTime: int64(dps[i-1].Timestamp), - Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}, - }) + } + switch aggregate { + case storepb.Aggr_COUNT: + aggrChunk.Count = chunk + case storepb.Aggr_SUM: + aggrChunk.Sum = chunk + case storepb.Aggr_MIN: + aggrChunk.Min = chunk + case storepb.Aggr_MAX: + aggrChunk.Max = chunk + case storepb.Aggr_COUNTER: + aggrChunk.Counter = chunk + case storepb.Aggr_RAW: + fallthrough + default: + aggrChunk.Raw = chunk + } + chunks = append(chunks, aggrChunk) } return storepb.NewSeriesResponse(&storepb.Series{ Labels: seriesLabels, diff --git a/pkg/store/store_test.go b/pkg/store/store_test.go index 926ec24..b1f8e54 100644 --- a/pkg/store/store_test.go +++ b/pkg/store/store_test.go @@ -38,7 +38,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: "test.metric", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_SUM}, PartialResponseDisabled: false, }, @@ -72,7 +72,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: "test.metric", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_MAX}, PartialResponseDisabled: false, }, @@ -105,7 +105,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: "test.metric2", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_MIN}, PartialResponseDisabled: false, }, @@ -145,7 +145,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: "test.metric2", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_MIN}, PartialResponseDisabled: false, }, @@ -185,7 +185,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: "test.metric2", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_MIN}, PartialResponseDisabled: false, }, @@ -224,7 +224,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: ".*", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_MIN}, PartialResponseDisabled: false, }, @@ -241,7 +241,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: "test.metric2", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_MIN}, PartialResponseDisabled: false, }, @@ -268,7 +268,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: "v", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_MIN}, PartialResponseDisabled: false, }, @@ -313,7 +313,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: "test.metric2", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_MIN}, PartialResponseDisabled: false, }, @@ -357,7 +357,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: "v", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_MIN}, PartialResponseDisabled: false, }, @@ -402,7 +402,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: "test.metric", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_MIN}, PartialResponseDisabled: false, }, @@ -442,7 +442,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: "v", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_MIN}, PartialResponseDisabled: false, }, @@ -502,7 +502,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: `(other|test)\.metric`, }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_SUM}, PartialResponseDisabled: false, }, @@ -530,7 +530,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: "up", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_SUM}, PartialResponseDisabled: false, }, @@ -550,7 +550,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: "bad.metric", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_SUM}, PartialResponseDisabled: false, }, @@ -569,12 +569,231 @@ func TestComposeOpenTSDBQuery(t *testing.T) { Value: "bad:metric", }, }, - MaxResolutionWindow: 5, + MaxResolutionWindow: 0, Aggregates: []storepb.Aggr{storepb.Aggr_SUM}, PartialResponseDisabled: false, }, err: errors.New(`Metric "bad.metric" is blocked on Geras`), }, + { + req: storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 100, + Matchers: []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_EQ, + Name: "__name__", + Value: "test.metric", + }, + }, + MaxResolutionWindow: 60000, + Aggregates: []storepb.Aggr{storepb.Aggr_RAW}, + PartialResponseDisabled: false, + }, + tsdbQ: &opentsdb.QueryParam{ + Start: 0, + End: 100, + Queries: []opentsdb.SubQuery{ + { + Aggregator: "none", + Metric: "test.metric", + Fiters: []opentsdb.Filter{}, + }, + }, + }, + }, + { + req: storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 100, + Matchers: []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_EQ, + Name: "__name__", + Value: "test.metric", + }, + }, + MaxResolutionWindow: 60000, + Aggregates: []storepb.Aggr{storepb.Aggr_COUNT}, + PartialResponseDisabled: false, + }, + tsdbQ: &opentsdb.QueryParam{ + Start: 0, + End: 100, + Queries: []opentsdb.SubQuery{ + { + Aggregator: "none", + Downsample: "60s-count", + Metric: "test.metric", + Fiters: []opentsdb.Filter{}, + }, + }, + }, + }, + { + req: storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 100, + Matchers: []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_EQ, + Name: "__name__", + Value: "test.metric", + }, + }, + MaxResolutionWindow: 120000, + Aggregates: []storepb.Aggr{storepb.Aggr_MAX}, + PartialResponseDisabled: false, + }, + tsdbQ: &opentsdb.QueryParam{ + Start: 0, + End: 100, + Queries: []opentsdb.SubQuery{ + { + Aggregator: "none", + Downsample: "120s-max", + Metric: "test.metric", + Fiters: []opentsdb.Filter{}, + }, + }, + }, + }, + { + req: storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 100, + Matchers: []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_EQ, + Name: "__name__", + Value: "test.metric", + }, + }, + MaxResolutionWindow: 3600000, + Aggregates: []storepb.Aggr{storepb.Aggr_MIN}, + PartialResponseDisabled: false, + }, + tsdbQ: &opentsdb.QueryParam{ + Start: 0, + End: 100, + Queries: []opentsdb.SubQuery{ + { + Aggregator: "none", + Downsample: "3600s-min", + Metric: "test.metric", + Fiters: []opentsdb.Filter{}, + }, + }, + }, + }, + { + req: storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 100, + Matchers: []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_EQ, + Name: "__name__", + Value: "test.metric", + }, + }, + MaxResolutionWindow: 15000, + Aggregates: []storepb.Aggr{storepb.Aggr_SUM}, + PartialResponseDisabled: false, + }, + tsdbQ: &opentsdb.QueryParam{ + Start: 0, + End: 100, + Queries: []opentsdb.SubQuery{ + { + Aggregator: "none", + Downsample: "15s-sum", + Metric: "test.metric", + Fiters: []opentsdb.Filter{}, + }, + }, + }, + }, + { + req: storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 100, + Matchers: []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_EQ, + Name: "__name__", + Value: "test.metric", + }, + }, + MaxResolutionWindow: 60000, + Aggregates: []storepb.Aggr{storepb.Aggr_COUNTER}, + PartialResponseDisabled: false, + }, + tsdbQ: &opentsdb.QueryParam{ + Start: 0, + End: 100, + Queries: []opentsdb.SubQuery{ + { + Aggregator: "none", + Downsample: "60s-avg", + Metric: "test.metric", + Fiters: []opentsdb.Filter{}, + }, + }, + }, + }, + { + req: storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 100, + Matchers: []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_EQ, + Name: "__name__", + Value: "test.metric", + }, + }, + MaxResolutionWindow: 60000, + Aggregates: []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM, storepb.Aggr_MIN, storepb.Aggr_MAX, storepb.Aggr_COUNTER}, + PartialResponseDisabled: false, + }, + tsdbQ: &opentsdb.QueryParam{ + Start: 0, + End: 100, + Queries: []opentsdb.SubQuery{ + { + Aggregator: "none", + Downsample: "60s-count", + Metric: "test.metric", + Fiters: []opentsdb.Filter{}, + }, + { + Aggregator: "none", + Downsample: "60s-sum", + Metric: "test.metric", + Fiters: []opentsdb.Filter{}, + }, + { + Aggregator: "none", + Downsample: "60s-min", + Metric: "test.metric", + Fiters: []opentsdb.Filter{}, + }, + { + Aggregator: "none", + Downsample: "60s-max", + Metric: "test.metric", + Fiters: []opentsdb.Filter{}, + }, + { + Aggregator: "none", + Downsample: "60s-avg", + Metric: "test.metric", + Fiters: []opentsdb.Filter{}, + }, + }, + }, + }, } for i, test := range testCases { @@ -588,6 +807,7 @@ func TestComposeOpenTSDBQuery(t *testing.T) { allowedMetricNames: allowedMetrics, blockedMetricNames: test.blockedMetrics, } + store.populateMaps() p, _, err := store.composeOpenTSDBQuery(&test.req) if test.err != nil { @@ -669,8 +889,9 @@ func newDps(in map[string]interface{}) (out opentsdb.DataPoints) { func TestConvertOpenTSDBResultsToSeriesResponse(t *testing.T) { testCases := []struct { - input opentsdb.QueryRespItem - expectedOutput *storepb.SeriesResponse + input opentsdb.QueryRespItem + expectedOutput *storepb.SeriesResponse + expectedChunkTypes []storepb.Aggr }{ { input: opentsdb.QueryRespItem{ @@ -682,6 +903,7 @@ func TestConvertOpenTSDBResultsToSeriesResponse(t *testing.T) { Labels: []storepb.Label{{Name: "__name__", Value: "metric"}}, Chunks: []storepb.AggrChunk{}, }), + expectedChunkTypes: []storepb.Aggr{}, }, { input: opentsdb.QueryRespItem{ @@ -697,6 +919,7 @@ func TestConvertOpenTSDBResultsToSeriesResponse(t *testing.T) { Labels: []storepb.Label{{Name: "__name__", Value: "metric"}, {Name: "a", Value: "b"}}, Chunks: []storepb.AggrChunk{{MinTime: 1, MaxTime: 3}}, }), + expectedChunkTypes: []storepb.Aggr{storepb.Aggr_RAW}, }, { input: opentsdb.QueryRespItem{ @@ -715,10 +938,147 @@ func TestConvertOpenTSDBResultsToSeriesResponse(t *testing.T) { {Name: "a", Value: "b"}}, Chunks: []storepb.AggrChunk{{MinTime: 10, MaxTime: 13}}, }), + expectedChunkTypes: []storepb.Aggr{storepb.Aggr_RAW}, + }, + { + input: opentsdb.QueryRespItem{ + Metric: "metric1", + Tags: map[string]string{}, + Dps: newDps(map[string]interface{}{ + "10": 1.0, + "12": 1.5, + "13": 2.0, + }), + Query: opentsdb.SubQuery{ + Aggregator: "none", + Metric: "metric", + Rate: false, + }, + }, + expectedOutput: storepb.NewSeriesResponse(&storepb.Series{ + Labels: []storepb.Label{{Name: "__name__", Value: "metric1"}}, + Chunks: []storepb.AggrChunk{{MinTime: 10, MaxTime: 13}}, + }), + expectedChunkTypes: []storepb.Aggr{storepb.Aggr_RAW}, + }, + { + input: opentsdb.QueryRespItem{ + Metric: "metric1", + Tags: map[string]string{}, + Dps: newDps(map[string]interface{}{ + "10": 1.0, + "12": 1.5, + "13": 2.0, + }), + Query: opentsdb.SubQuery{ + Aggregator: "none", + Metric: "metric", + Rate: false, + Downsample: "60s-count", + }, + }, + expectedOutput: storepb.NewSeriesResponse(&storepb.Series{ + Labels: []storepb.Label{{Name: "__name__", Value: "metric1"}}, + Chunks: []storepb.AggrChunk{{MinTime: 10, MaxTime: 13}}, + }), + expectedChunkTypes: []storepb.Aggr{storepb.Aggr_COUNT}, + }, + { + input: opentsdb.QueryRespItem{ + Metric: "metric1", + Tags: map[string]string{}, + Dps: newDps(map[string]interface{}{ + "10": 1.0, + "12": 1.5, + "13": 2.0, + }), + Query: opentsdb.SubQuery{ + Aggregator: "none", + Metric: "metric", + Rate: false, + Downsample: "60s-sum", + }, + }, + expectedOutput: storepb.NewSeriesResponse(&storepb.Series{ + Labels: []storepb.Label{{Name: "__name__", Value: "metric1"}}, + Chunks: []storepb.AggrChunk{{MinTime: 10, MaxTime: 13}}, + }), + expectedChunkTypes: []storepb.Aggr{storepb.Aggr_SUM}, + }, + { + input: opentsdb.QueryRespItem{ + Metric: "metric1", + Tags: map[string]string{}, + Dps: newDps(map[string]interface{}{ + "10": 1.0, + "12": 1.5, + "13": 2.0, + }), + Query: opentsdb.SubQuery{ + Aggregator: "none", + Metric: "metric", + Rate: false, + Downsample: "60s-min", + }, + }, + expectedOutput: storepb.NewSeriesResponse(&storepb.Series{ + Labels: []storepb.Label{{Name: "__name__", Value: "metric1"}}, + Chunks: []storepb.AggrChunk{{MinTime: 10, MaxTime: 13}}, + }), + expectedChunkTypes: []storepb.Aggr{storepb.Aggr_MIN}, + }, + { + input: opentsdb.QueryRespItem{ + Metric: "metric1", + Tags: map[string]string{}, + Dps: newDps(map[string]interface{}{ + "10": 1.0, + "12": 1.5, + "13": 2.0, + }), + Query: opentsdb.SubQuery{ + Aggregator: "none", + Metric: "metric", + Rate: false, + Downsample: "60s-max", + }, + }, + expectedOutput: storepb.NewSeriesResponse(&storepb.Series{ + Labels: []storepb.Label{{Name: "__name__", Value: "metric1"}}, + Chunks: []storepb.AggrChunk{{MinTime: 10, MaxTime: 13}}, + }), + expectedChunkTypes: []storepb.Aggr{storepb.Aggr_MAX}, + }, + { + input: opentsdb.QueryRespItem{ + Metric: "metric1", + Tags: map[string]string{}, + Dps: newDps(map[string]interface{}{ + "10": 1.0, + "12": 1.5, + "13": 2.0, + }), + Query: opentsdb.SubQuery{ + Aggregator: "none", + Metric: "metric", + Rate: false, + Downsample: "60s-avg", + }, + }, + expectedOutput: storepb.NewSeriesResponse(&storepb.Series{ + Labels: []storepb.Label{{Name: "__name__", Value: "metric1"}}, + Chunks: []storepb.AggrChunk{{MinTime: 10, MaxTime: 13}}, + }), + expectedChunkTypes: []storepb.Aggr{storepb.Aggr_COUNTER}, }, } for _, test := range testCases { - converted, _, err := convertOpenTSDBResultsToSeriesResponse(&test.input) + store := OpenTSDBStore{} + err := store.populateMaps() + if err != nil { + t.Errorf("unexpected error: %s", err.Error()) + } + converted, _, err := convertOpenTSDBResultsToSeriesResponse(&test.input, store.downsampleToAggregate) if err != nil { t.Errorf("unexpected error: %s", err.Error()) } @@ -739,12 +1099,48 @@ func TestConvertOpenTSDBResultsToSeriesResponse(t *testing.T) { t.Error("number of chunks does not match") } for ci, chunk := range test.expectedOutput.GetSeries().Chunks { - if chunk.MinTime != converted.GetSeries().Chunks[ci].MinTime { + convertedChunk := converted.GetSeries().Chunks[ci] + if chunk.MinTime != convertedChunk.MinTime { t.Errorf("chunk %d min time is not the expected: %d", ci, chunk.MinTime) } - if chunk.MaxTime != converted.GetSeries().Chunks[ci].MaxTime { + if chunk.MaxTime != convertedChunk.MaxTime { t.Errorf("chunk %d max time is not the expected: %d != %d ", ci, chunk.MaxTime, converted.GetSeries().Chunks[ci].MaxTime) } + expectedChunkType := test.expectedChunkTypes[ci] + switch expectedChunkType { + case storepb.Aggr_RAW: + if convertedChunk.Raw == nil { + t.Errorf("chunk %d raw content not set", ci) + } + break + case storepb.Aggr_COUNT: + if convertedChunk.Count == nil { + t.Errorf("chunk %d raw content not set", ci) + } + break + case storepb.Aggr_SUM: + if convertedChunk.Sum == nil { + t.Errorf("chunk %d raw content not set", ci) + } + break + case storepb.Aggr_MIN: + if convertedChunk.Min == nil { + t.Errorf("chunk %d raw content not set", ci) + } + break + case storepb.Aggr_MAX: + if convertedChunk.Max == nil { + t.Errorf("chunk %d raw content not set", ci) + } + break + case storepb.Aggr_COUNTER: + if convertedChunk.Counter == nil { + t.Errorf("chunk %d raw content not set", ci) + } + break + default: + t.Errorf("Unknown chunk type %d expected for chunk %d", expectedChunkType, ci) + } } } } @@ -817,18 +1213,19 @@ func TestGetMatchingMetricNames(t *testing.T) { } for _, test := range testCases { - store := &OpenTSDBStore{} - store.metricNames = []string{ - "cpu.idle", - "cpu.irq", - "cpu.nice", - "cpu.sys", - "cpu.usr", - "tsd.rpc.errors", - "tsd.rpc.exceptions", - "tsd.rpc.forbidden", - "tsd.rpc.received", - "tsd.rpc.unauthorized", + store := &OpenTSDBStore{ + metricNames: []string{ + "cpu.idle", + "cpu.irq", + "cpu.nice", + "cpu.sys", + "cpu.usr", + "tsd.rpc.errors", + "tsd.rpc.exceptions", + "tsd.rpc.forbidden", + "tsd.rpc.received", + "tsd.rpc.unauthorized", + }, } output, err := store.getMatchingMetricNames(test.input) diff --git a/test/faketsdb/go.mod b/test/faketsdb/go.mod index 9f6f1d4..14cf6db 100644 --- a/test/faketsdb/go.mod +++ b/test/faketsdb/go.mod @@ -2,4 +2,4 @@ module github.com/G-Research/geras/test/faketsdb go 1.12 -require github.com/G-Research/opentsdb-goclient v0.0.0-20191028155047-1a0d357f6ca7 // indirect +require github.com/G-Research/opentsdb-goclient v0.0.0-20191219200136-ccb48600721a // indirect diff --git a/test/faketsdb/go.sum b/test/faketsdb/go.sum index 3858205..6b957b2 100644 --- a/test/faketsdb/go.sum +++ b/test/faketsdb/go.sum @@ -1,2 +1,5 @@ github.com/G-Research/opentsdb-goclient v0.0.0-20191028155047-1a0d357f6ca7 h1:LCY7S5FtlqIWHXV0HBX4yGMelG7DCZX90gSPD1TrmqU= github.com/G-Research/opentsdb-goclient v0.0.0-20191028155047-1a0d357f6ca7/go.mod h1:oVqXUYMDThF8Uz8WH3f80BlB7+Y5BV4wDtqQ06e4m7g= +github.com/G-Research/opentsdb-goclient v0.0.0-20191210204552-9a5d3f5d556d/go.mod h1:oVqXUYMDThF8Uz8WH3f80BlB7+Y5BV4wDtqQ06e4m7g= +github.com/G-Research/opentsdb-goclient v0.0.0-20191210204552-ccb48600721a/go.mod h1:oVqXUYMDThF8Uz8WH3f80BlB7+Y5BV4wDtqQ06e4m7g= +github.com/G-Research/opentsdb-goclient v0.0.0-20191219200136-ccb48600721a/go.mod h1:oVqXUYMDThF8Uz8WH3f80BlB7+Y5BV4wDtqQ06e4m7g=