diff --git a/docs/metrics-data-model.md b/docs/metrics-data-model.md
new file mode 100644
index 0000000..17e60fa
--- /dev/null
+++ b/docs/metrics-data-model.md
@@ -0,0 +1,125 @@
+# k6 Time Series
+
+We want to introduce the time series concept to k6, so that it can efficiently identify each unique combination of a metric and its related tags.
+
+A summary of the benefits it brings across the platform:
+
+* It enables faster lookups and comparisons via a single value, the ID (hash) of the series.
+* It reduces storage by allocating each unique combination (the time series) only once and referencing it everywhere else.
+* It allows aggregating data by time series.
+* It makes high-cardinality issues easier to detect, simply by counting the unique time series generated.
+* It enables time-series-based outputs, simplifying the integrations' logic (e.g. Prometheus).
+
+## Data model
+
+#### Metrics and Series
+
+* Metric (name, type)
+* Time Series (metric, tags)
+* Sample (timestamp, value, time series ref)
+* Tag (see [k6/1831](https://github.com/grafana/k6/issues/1831))
+
+```go
+// This is the current k6 Metric, cleaned of the Thresholds dependencies.
+type Metric struct {
+	name     string
+	typ      MetricType // "type" is a reserved word in Go
+	contains ValueType
+}
+
+type Tag struct {
+	Key, Value string
+}
+
+type TimeSeries struct {
+	// hash is a 64-bit hash generated by hashing
+	// the metric name and the tag set.
+	// The tag set must be sorted to guarantee hashing consistency.
+	// The value must be consistent across distributed instances.
+	hash uint64
+
+	// sequence is a unique and sequential number in the current instance.
+	// It is used for storing multiple time series in a slice,
+	// providing fast access by slice indexing.
+	sequence uint32
+
+	// metricName is the name of the related metric.
+	metricName string
+
+	// tags is the sorted list of tags.
+	tags []Tag
+
+	// tagIndex provides a resolver for a single Tag:
+	// it maps each Tag's key to its index in the tags slice.
+	tagIndex map[string]int
+}
+
+type Sample struct {
+	TimeSeries *TimeSeries
+	Timestamp  uint64
+	Value      float64
+}
+```
+
+#### Hash
+
+Time series hashes are `uint64` values generated by hashing the metric name and the tags' sorted key-value pairs.
+
+#### Sink
+
+Sinks are implemented per metric type, and they keep values up to date per time series and/or as aggregated views:
+
+* Counter: a monotonically increasing value
+* Gauge: the latest value
+* Rate: the percentage of non-zero values across the total number of events
+* SparseHistogram: counters per dynamic ranges (buckets) - https://grafana.com/blog/2021/11/03/how-sparse-histograms-can-improve-efficiency-precision-and-mergeability-in-prometheus-tsdb
+
+## Storage
+
+#### Time Series database (aka tsdb)
+
+We need to store all the time series generated during a `k6` run, so that the other components (mostly the outputs) can query the storage for time series values. The storage is expected to be in-memory, and it must be safe for concurrent use.
+
+It can extend the current `metrics.Registry` struct by adding the following method:
+
+```go
+type Registry struct {
+	...
+}
+
+// GetOrCreateSeries returns the unique TimeSeries for the given metric
+// and tag set, creating and storing it on first access.
+func (r *Registry) GetOrCreateSeries(metric *Metric, tags []Tag) *TimeSeries
+```
+
+## Samples generation
+
+The current sampling process is controlled by the `metrics.PushIfNotDone` method. All the actual callers should resolve the time series from the storage before pushing a new Sample; in the event none is found, the storage will create and insert a new one (as sketched below). This requires the time series database as a dependency for all the callers (e.g. executors, JavaScript modules); most of them already have the `metrics.Registry` dependency.
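+
+For illustration, here is a minimal sketch of this caller-side flow, reusing the types declared above; the `emitSample` helper and its signature are hypothetical, not part of the proposal:
+
+```go
+// emitSample resolves the time series once and attaches it to the
+// built Sample, so pushed samples carry only a series reference.
+func emitSample(reg *Registry, m *Metric, tags []Tag, ts uint64, value float64) Sample {
+	// GetOrCreateSeries hashes the metric name plus the sorted tag set;
+	// if no series exists for that hash yet, it creates and stores one.
+	series := reg.GetOrCreateSeries(m, tags)
+
+	return Sample{
+		TimeSeries: series,
+		Timestamp:  ts,
+		Value:      value,
+	}
+}
+```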
+
+## metrics.Ingester
+
+The Ingester is responsible for resolving the entire set of Sinks impacted by an ingested time series; it then adds the Sample's value to the resolved Sinks.
+
+##### Example
+
+At a time `t0` where the set of seen time series is:
+
+```text
+http_req_duration{status:200,method:GET}
+http_req_duration{method:GET}
+http_req_duration{status:200}
+http_req_another_one{status:200}
+```
+
+When the Ingester collects a Sample for `http_req_duration{status:200,method:GET}`, it resolves the dependencies against the other seen time series into a unique set, which here contains `http_req_duration{status:200}` and `http_req_duration{method:GET}`. It can then resolve the related Metric's Sinks and invoke them, passing the Sample's value (a rough sketch of this resolution follows the acceptance criteria below).
+
+## Known issues
+
+* Name and URL tags: `k6` tags HTTP requests with their URL, which will create a high-cardinality issue for the time series data model. This should be fixed by adding the possibility of not storing all the tags as indexable, with the option to mark some of them as `metadata` and exclude them from the time series generation. An alternative workaround could be to exclude them from the default set of enabled tags.
+* We need to keep all the data for the Trend type for computing the percentiles. We plan to migrate to some form of approximation (Sparse Histogram, OpenHistogram, T-Digest, etc.).
+
+## Acceptance criteria
+
+- [ ] Can the new model enable the Prometheus output integration?
+- [ ] Can the new model enable cloud aggregation?
+- [ ] Can the new model work with Thresholds?
+- [ ] Is the memory footprint of the new model reduced? If not, is it acceptable?
+- [ ] Is the CPU usage of the new model reduced? If not, is it acceptable?
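+
+##### Ingester resolution sketch
+
+For illustration, a rough sketch of the Ingester's resolution step, expressed in terms of the data model above. The `resolveImpacted` and `isSubset` helpers are hypothetical, and a real implementation would index the series by metric name rather than scanning all of them:
+
+```go
+// resolveImpacted returns the seen series (excluding the sample's own one)
+// that share the sample's metric and whose tag set is a subset of its tags.
+func resolveImpacted(seen []*TimeSeries, sample Sample) []*TimeSeries {
+	impacted := make([]*TimeSeries, 0)
+	for _, ts := range seen {
+		if ts == sample.TimeSeries || ts.metricName != sample.TimeSeries.metricName {
+			continue
+		}
+		if isSubset(ts.tags, sample.TimeSeries.tags) {
+			impacted = append(impacted, ts)
+		}
+	}
+	return impacted
+}
+
+// isSubset reports whether every tag in sub also appears in super.
+// Both slices are sorted by key, so a linear merge would also work;
+// the nested loop is kept for clarity.
+func isSubset(sub, super []Tag) bool {
+	for _, st := range sub {
+		found := false
+		for _, t := range super {
+			if t == st {
+				found = true
+				break
+			}
+		}
+		if !found {
+			return false
+		}
+	}
+	return true
+}
+```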
diff --git a/go.mod b/go.mod index 84448f2..4743376 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,11 @@ module github.com/grafana/xk6-output-prometheus-remote go 1.17 require ( + github.com/cespare/xxhash/v2 v2.1.2 github.com/golang/protobuf v1.5.2 github.com/golang/snappy v0.0.4 github.com/kubernetes/helm v2.17.0+incompatible + github.com/openhistogram/circonusllhist v0.3.0 github.com/prometheus/common v0.32.1 github.com/prometheus/prometheus v1.8.2-0.20211005150130-f29caccc4255 github.com/sirupsen/logrus v1.8.1 @@ -18,7 +20,6 @@ require ( github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 // indirect github.com/aws/aws-sdk-go v1.40.37 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dennwc/varint v1.0.0 // indirect @@ -28,6 +29,7 @@ require ( github.com/go-kit/log v0.1.0 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/gofuzz v1.2.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -52,7 +54,7 @@ require ( golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/protobuf v1.27.1 // indirect + google.golang.org/protobuf v1.28.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect diff --git a/go.sum b/go.sum index bcfc233..6844760 100644 --- a/go.sum +++ b/go.sum @@ -681,8 +681,9 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g= github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -1055,6 +1056,8 @@ github.com/opencontainers/runtime-spec v1.0.3-0.20200929063507-e6143ca7d51d/go.m github.com/opencontainers/runtime-tools v0.0.0-20181011054405-1d69bd0f9c39/go.mod h1:r3f7wjNzSs2extwzU3Y+6pKfobzPh+kKFJ3ofN+3nfs= github.com/opencontainers/selinux v1.6.0/go.mod h1:VVGKuOLlE7v4PJyT6h7mNWvq1rzqiriPsEqVhc+svHE= github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3ogry1nUQF8Evvo= +github.com/openhistogram/circonusllhist v0.3.0 h1:CuEawy94hKEzjhSABdqkGirl6o67QrqtRoZg3CXBn6k= +github.com/openhistogram/circonusllhist v0.3.0/go.mod h1:PfeYJ/RW2+Jfv3wTz0upbY2TRour/LLqIm2K2Kw5zg0= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= 
github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9/go.mod h1:PLldrQSroqzH70Xl+1DQcGnefIbqsKR7UDaiux3zV+w= github.com/opentracing-contrib/go-stdlib v1.0.0 h1:TBS7YuVotp8myLon4Pv7BtCBzOTo1DeZCld0Z63mW2w= @@ -1910,8 +1913,9 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= +google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/remotewrite/config.go b/pkg/remotewrite/config.go index 720af82..2edadca 100644 --- a/pkg/remotewrite/config.go +++ b/pkg/remotewrite/config.go @@ -22,9 +22,7 @@ const ( ) type Config struct { - Mapping null.String `json:"mapping" envconfig:"K6_PROMETHEUS_MAPPING"` - - Url null.String `json:"url" envconfig:"K6_PROMETHEUS_REMOTE_URL"` // here, in the name of env variable, we assume that we won't need to distinguish between remote write URL vs remote read URL + URL null.String `json:"url" envconfig:"K6_PROMETHEUS_REMOTE_URL"` // here, in the name of env variable, we assume that we won't need to distinguish between remote write URL vs remote read URL Headers map[string]string `json:"headers" envconfig:"K6_PROMETHEUS_HEADERS"` @@ -38,13 +36,12 @@ type Config struct { KeepTags null.Bool `json:"keepTags" envconfig:"K6_KEEP_TAGS"` KeepNameTag null.Bool `json:"keepNameTag" envconfig:"K6_KEEP_NAME_TAG"` - KeepUrlTag null.Bool `json:"keepUrlTag" envconfig:"K6_KEEP_URL_TAG"` + KeepURLTag null.Bool `json:"keepUrlTag" envconfig:"K6_KEEP_URL_TAG"` } func NewConfig() Config { return Config{ - Mapping: null.StringFrom("prometheus"), - Url: null.StringFrom("http://localhost:9090/api/v1/write"), + URL: null.StringFrom("http://localhost:9090/api/v1/write"), InsecureSkipTLSVerify: null.BoolFrom(true), CACert: null.NewString("", false), User: null.NewString("", false), @@ -52,7 +49,7 @@ func NewConfig() Config { FlushPeriod: types.NullDurationFrom(defaultFlushPeriod), KeepTags: null.BoolFrom(true), KeepNameTag: null.BoolFrom(false), - KeepUrlTag: null.BoolFrom(true), + KeepURLTag: null.BoolFrom(true), Headers: make(map[string]string), } } @@ -79,7 +76,7 @@ func (conf Config) ConstructRemoteConfig() (*remote.ClientConfig, error) { // TODO: consider if the auth logic should be enforced here // (e.g. if insecureSkipTLSVerify is switched off, then check for non-empty certificate file and auth, etc.) 
- u, err := url.Parse(conf.Url.String) + u, err := url.Parse(conf.URL.String) if err != nil { return nil, err } @@ -97,12 +94,8 @@ func (conf Config) ConstructRemoteConfig() (*remote.ClientConfig, error) { // From here till the end of the file partial duplicates waiting for config refactor (k6 #883) func (base Config) Apply(applied Config) Config { - if applied.Mapping.Valid { - base.Mapping = applied.Mapping - } - - if applied.Url.Valid { - base.Url = applied.Url + if applied.URL.Valid { + base.URL = applied.URL } if applied.InsecureSkipTLSVerify.Valid { @@ -133,8 +126,8 @@ func (base Config) Apply(applied Config) Config { base.KeepNameTag = applied.KeepNameTag } - if applied.KeepUrlTag.Valid { - base.KeepUrlTag = applied.KeepUrlTag + if applied.KeepURLTag.Valid { + base.KeepURLTag = applied.KeepURLTag } if len(applied.Headers) > 0 { @@ -154,12 +147,8 @@ func ParseArg(arg string) (Config, error) { return c, err } - if v, ok := params["mapping"].(string); ok { - c.Mapping = null.StringFrom(v) - } - if v, ok := params["url"].(string); ok { - c.Url = null.StringFrom(v) + c.URL = null.StringFrom(v) } if v, ok := params["insecureSkipTLSVerify"].(bool); ok { @@ -193,7 +182,7 @@ func ParseArg(arg string) (Config, error) { } if v, ok := params["keepUrlTag"].(bool); ok { - c.KeepUrlTag = null.BoolFrom(v) + c.KeepURLTag = null.BoolFrom(v) } c.Headers = make(map[string]string) @@ -249,12 +238,8 @@ func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, a } } - if mapping, mappingDefined := env["K6_PROMETHEUS_MAPPING"]; mappingDefined { - result.Mapping = null.StringFrom(mapping) - } - if url, urlDefined := env["K6_PROMETHEUS_REMOTE_URL"]; urlDefined { - result.Url = null.StringFrom(url) + result.URL = null.StringFrom(url) } if b, err := getEnvBool(env, "K6_PROMETHEUS_INSECURE_SKIP_TLS_VERIFY"); err != nil { @@ -298,7 +283,7 @@ func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, a return result, err } else { if b.Valid { - result.KeepUrlTag = b + result.KeepURLTag = b } } diff --git a/pkg/remotewrite/config_test.go b/pkg/remotewrite/config_test.go index af4ddf0..98235e3 100644 --- a/pkg/remotewrite/config_test.go +++ b/pkg/remotewrite/config_test.go @@ -19,7 +19,7 @@ func TestApply(t *testing.T) { t.Parallel() fullConfig := Config{ - Url: null.StringFrom("some-url"), + URL: null.StringFrom("some-url"), InsecureSkipTLSVerify: null.BoolFrom(false), CACert: null.StringFrom("some-file"), User: null.StringFrom("user"), @@ -33,7 +33,7 @@ func TestApply(t *testing.T) { // Defaults should be overwritten by valid values c := NewConfig() c = c.Apply(fullConfig) - assert.Equal(t, fullConfig.Url, c.Url) + assert.Equal(t, fullConfig.URL, c.URL) assert.Equal(t, fullConfig.InsecureSkipTLSVerify, c.InsecureSkipTLSVerify) assert.Equal(t, fullConfig.CACert, c.CACert) assert.Equal(t, fullConfig.User, c.User) @@ -58,21 +58,21 @@ func TestConfigParseArg(t *testing.T) { c, err := ParseArg("url=http://prometheus.remote:3412/write") assert.Nil(t, err) - assert.Equal(t, null.StringFrom("http://prometheus.remote:3412/write"), c.Url) + assert.Equal(t, null.StringFrom("http://prometheus.remote:3412/write"), c.URL) c, err = ParseArg("url=http://prometheus.remote:3412/write,insecureSkipTLSVerify=false") assert.Nil(t, err) - assert.Equal(t, null.StringFrom("http://prometheus.remote:3412/write"), c.Url) + assert.Equal(t, null.StringFrom("http://prometheus.remote:3412/write"), c.URL) assert.Equal(t, null.BoolFrom(false), c.InsecureSkipTLSVerify) c, err = 
ParseArg("url=https://prometheus.remote:3412/write,caCertFile=f.crt") assert.Nil(t, err) - assert.Equal(t, null.StringFrom("https://prometheus.remote:3412/write"), c.Url) + assert.Equal(t, null.StringFrom("https://prometheus.remote:3412/write"), c.URL) assert.Equal(t, null.StringFrom("f.crt"), c.CACert) c, err = ParseArg("url=https://prometheus.remote:3412/write,insecureSkipTLSVerify=false,caCertFile=f.crt,user=user,password=pass") assert.Nil(t, err) - assert.Equal(t, null.StringFrom("https://prometheus.remote:3412/write"), c.Url) + assert.Equal(t, null.StringFrom("https://prometheus.remote:3412/write"), c.URL) assert.Equal(t, null.BoolFrom(false), c.InsecureSkipTLSVerify) assert.Equal(t, null.StringFrom("f.crt"), c.CACert) assert.Equal(t, null.StringFrom("user"), c.User) @@ -80,12 +80,12 @@ func TestConfigParseArg(t *testing.T) { c, err = ParseArg("url=http://prometheus.remote:3412/write,flushPeriod=2s") assert.Nil(t, err) - assert.Equal(t, null.StringFrom("http://prometheus.remote:3412/write"), c.Url) + assert.Equal(t, null.StringFrom("http://prometheus.remote:3412/write"), c.URL) assert.Equal(t, types.NullDurationFrom(time.Second*2), c.FlushPeriod) c, err = ParseArg("url=http://prometheus.remote:3412/write,headers.X-Header=value") assert.Nil(t, err) - assert.Equal(t, null.StringFrom("http://prometheus.remote:3412/write"), c.Url) + assert.Equal(t, null.StringFrom("http://prometheus.remote:3412/write"), c.URL) assert.Equal(t, map[string]string{"X-Header": "value"}, c.Headers) } @@ -108,8 +108,7 @@ func TestConstructRemoteConfig(t *testing.T) { env: nil, arg: "", config: Config{ - Mapping: null.StringFrom("prometheus"), - Url: null.StringFrom(u.String()), + URL: null.StringFrom(u.String()), InsecureSkipTLSVerify: null.BoolFrom(true), CACert: null.NewString("", false), User: null.NewString("", false), @@ -117,7 +116,7 @@ func TestConstructRemoteConfig(t *testing.T) { FlushPeriod: types.NullDurationFrom(defaultFlushPeriod), KeepTags: null.BoolFrom(true), KeepNameTag: null.BoolFrom(false), - KeepUrlTag: null.BoolFrom(true), + KeepURLTag: null.BoolFrom(true), Headers: make(map[string]string), }, errString: "", @@ -138,8 +137,7 @@ func TestConstructRemoteConfig(t *testing.T) { env: map[string]string{"K6_PROMETHEUS_INSECURE_SKIP_TLS_VERIFY": "false", "K6_PROMETHEUS_USER": "u"}, arg: "user=user", config: Config{ - Mapping: null.StringFrom("raw"), - Url: null.StringFrom(u.String()), + URL: null.StringFrom(u.String()), InsecureSkipTLSVerify: null.BoolFrom(false), CACert: null.NewString("", false), User: null.NewString("user", true), @@ -147,7 +145,7 @@ func TestConstructRemoteConfig(t *testing.T) { FlushPeriod: types.NullDurationFrom(defaultFlushPeriod), KeepTags: null.BoolFrom(true), KeepNameTag: null.BoolFrom(false), - KeepUrlTag: null.BoolFrom(true), + KeepURLTag: null.BoolFrom(true), Headers: make(map[string]string), }, errString: "", @@ -187,8 +185,7 @@ func TestConstructRemoteConfig(t *testing.T) { env: nil, arg: "", config: Config{ - Mapping: null.NewString("mapping", true), - Url: null.StringFrom(u.String()), + URL: null.StringFrom(u.String()), InsecureSkipTLSVerify: null.BoolFrom(true), CACert: null.NewString("", false), User: null.NewString("", false), @@ -196,7 +193,7 @@ func TestConstructRemoteConfig(t *testing.T) { FlushPeriod: types.NullDurationFrom(defaultFlushPeriod), KeepTags: null.BoolFrom(true), KeepNameTag: null.BoolFrom(false), - KeepUrlTag: null.BoolFrom(true), + KeepURLTag: null.BoolFrom(true), Headers: map[string]string{ "X-Header": "value", }, @@ -224,8 +221,7 @@ func 
TestConstructRemoteConfig(t *testing.T) { }, arg: "", config: Config{ - Mapping: null.NewString("mapping", true), - Url: null.StringFrom(u.String()), + URL: null.StringFrom(u.String()), InsecureSkipTLSVerify: null.BoolFrom(true), CACert: null.NewString("", false), User: null.NewString("", false), @@ -233,7 +229,7 @@ func TestConstructRemoteConfig(t *testing.T) { FlushPeriod: types.NullDurationFrom(defaultFlushPeriod), KeepTags: null.BoolFrom(true), KeepNameTag: null.BoolFrom(false), - KeepUrlTag: null.BoolFrom(true), + KeepURLTag: null.BoolFrom(true), Headers: map[string]string{ "X-Header": "value_from_env", }, @@ -261,8 +257,7 @@ func TestConstructRemoteConfig(t *testing.T) { }, arg: "headers.X-Header=value_from_arg", config: Config{ - Mapping: null.NewString("mapping", true), - Url: null.StringFrom(u.String()), + URL: null.StringFrom(u.String()), InsecureSkipTLSVerify: null.BoolFrom(true), CACert: null.NewString("", false), User: null.NewString("", false), @@ -270,7 +265,7 @@ func TestConstructRemoteConfig(t *testing.T) { FlushPeriod: types.NullDurationFrom(defaultFlushPeriod), KeepTags: null.BoolFrom(true), KeepNameTag: null.BoolFrom(false), - KeepUrlTag: null.BoolFrom(true), + KeepURLTag: null.BoolFrom(true), Headers: map[string]string{ "X-Header": "value_from_arg", }, @@ -311,8 +306,7 @@ func TestConstructRemoteConfig(t *testing.T) { } func assertConfig(t *testing.T, actual, expected Config) { - assert.Equal(t, expected.Mapping, actual.Mapping) - assert.Equal(t, expected.Url, actual.Url) + assert.Equal(t, expected.URL, actual.URL) assert.Equal(t, expected.InsecureSkipTLSVerify, actual.InsecureSkipTLSVerify) assert.Equal(t, expected.CACert, actual.CACert) assert.Equal(t, expected.User, actual.User) @@ -320,7 +314,7 @@ func assertConfig(t *testing.T, actual, expected Config) { assert.Equal(t, expected.FlushPeriod, actual.FlushPeriod) assert.Equal(t, expected.KeepTags, actual.KeepTags) assert.Equal(t, expected.KeepNameTag, expected.KeepNameTag) - assert.Equal(t, expected.KeepUrlTag, expected.KeepUrlTag) + assert.Equal(t, expected.KeepURLTag, actual.KeepURLTag) assert.Equal(t, expected.Headers, actual.Headers) } diff --git a/pkg/remotewrite/labels.go b/pkg/remotewrite/labels.go deleted file mode 100644 index e545cce..0000000 --- a/pkg/remotewrite/labels.go +++ /dev/null @@ -1,38 +0,0 @@ -package remotewrite - -import ( - "github.com/prometheus/prometheus/prompb" - "go.k6.io/k6/metrics" -) - -func tagsToLabels(tags *metrics.SampleTags, config Config) ([]prompb.Label, error) { - if !config.KeepTags.Bool { - return []prompb.Label{}, nil - } - - tagsMap := tags.CloneTags() - labelPairs := make([]prompb.Label, 0, len(tagsMap)) - - for name, value := range tagsMap { - if len(name) < 1 || len(value) < 1 { - continue - } - - if !config.KeepNameTag.Bool && name == "name" { - continue - } - - if !config.KeepUrlTag.Bool && name == "url" { - continue - } - - labelPairs = append(labelPairs, prompb.Label{ - Name: name, - Value: value, - }) - } - - // names of the metrics might be remote agent dependent so let Mapping set those - - return labelPairs[:len(labelPairs):len(labelPairs)], nil -} diff --git a/pkg/remotewrite/labels_test.go b/pkg/remotewrite/labels_test.go deleted file mode 100644 index c0e4c72..0000000 --- a/pkg/remotewrite/labels_test.go +++ /dev/null @@ -1,109 +0,0 @@ -package remotewrite - -import ( - "fmt" - "testing" - - "github.com/prometheus/prometheus/prompb" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.k6.io/k6/metrics" - 
"gopkg.in/guregu/null.v3" -) - -func TestTagsToLabels(t *testing.T) { - t.Parallel() - - testCases := map[string]struct { - tags *metrics.SampleTags - config Config - labels []prompb.Label - }{ - "empty-tags": { - tags: &metrics.SampleTags{}, - config: Config{ - KeepTags: null.BoolFrom(true), - KeepNameTag: null.BoolFrom(false), - }, - labels: []prompb.Label{}, - }, - "name-tag-discard": { - tags: metrics.NewSampleTags(map[string]string{"foo": "bar", "name": "nnn"}), - config: Config{ - KeepTags: null.BoolFrom(true), - KeepNameTag: null.BoolFrom(false), - }, - labels: []prompb.Label{ - {Name: "foo", Value: "bar"}, - }, - }, - "name-tag-keep": { - tags: metrics.NewSampleTags(map[string]string{"foo": "bar", "name": "nnn"}), - config: Config{ - KeepTags: null.BoolFrom(true), - KeepNameTag: null.BoolFrom(true), - }, - labels: []prompb.Label{ - {Name: "foo", Value: "bar"}, - {Name: "name", Value: "nnn"}, - }, - }, - "url-tag-discard": { - tags: metrics.NewSampleTags(map[string]string{"foo": "bar", "url": "uuu"}), - config: Config{ - KeepTags: null.BoolFrom(true), - KeepUrlTag: null.BoolFrom(false), - }, - labels: []prompb.Label{ - {Name: "foo", Value: "bar"}, - }, - }, - "url-tag-keep": { - tags: metrics.NewSampleTags(map[string]string{"foo": "bar", "url": "uuu"}), - config: Config{ - KeepTags: null.BoolFrom(true), - KeepUrlTag: null.BoolFrom(true), - }, - labels: []prompb.Label{ - {Name: "foo", Value: "bar"}, - {Name: "url", Value: "uuu"}, - }, - }, - "discard-tags": { - tags: metrics.NewSampleTags(map[string]string{"foo": "bar", "name": "nnn"}), - config: Config{ - KeepTags: null.BoolFrom(false), - }, - labels: []prompb.Label{}, - }, - } - - for name, testCase := range testCases { - testCase := testCase - t.Run(name, func(t *testing.T) { - labels, err := tagsToLabels(testCase.tags, testCase.config) - require.NoError(t, err) - - assert.Equal(t, len(testCase.labels), len(labels)) - - for i := range testCase.labels { - var found bool - - // order is not guaranteed ATM - for j := range labels { - if labels[j].Name == testCase.labels[i].Name { - assert.Equal(t, testCase.labels[i].Value, labels[j].Value) - found = true - break - } - - } - if !found { - assert.Fail(t, fmt.Sprintf("Not found label %s: \n"+ - "expected: %v\n"+ - "actual : %v", testCase.labels[i].Name, testCase.labels, labels)) - } - } - }) - } -} diff --git a/pkg/remotewrite/metrics.go b/pkg/remotewrite/metrics.go deleted file mode 100644 index e0a2f6d..0000000 --- a/pkg/remotewrite/metrics.go +++ /dev/null @@ -1,152 +0,0 @@ -package remotewrite - -import ( - "fmt" - - "github.com/prometheus/prometheus/pkg/timestamp" - "github.com/prometheus/prometheus/prompb" - "go.k6.io/k6/metrics" -) - -// Note: k6 Registry is not used here since Output is getting -// samples only from k6 engine, hence we assume they are already vetted. 
- -// metricsStorage is an in-memory gather point for metrics -type metricsStorage struct { - m map[string]*metrics.Metric -} - -func newMetricsStorage() *metricsStorage { - return &metricsStorage{ - m: make(map[string]*metrics.Metric), - } -} - -// update modifies metricsStorage and returns updated sample -// so that the stored metric and the returned metric hold the same value -func (ms *metricsStorage) update(sample metrics.Sample, add func(*metrics.Metric, metrics.Sample)) *metrics.Metric { - m, ok := ms.m[sample.Metric.Name] - if !ok { - var sink metrics.Sink - switch sample.Metric.Type { - case metrics.Counter: - sink = &metrics.CounterSink{} - case metrics.Gauge: - sink = &metrics.GaugeSink{} - case metrics.Trend: - sink = &metrics.TrendSink{} - case metrics.Rate: - sink = &metrics.RateSink{} - default: - panic("the Metric Type is not supported") - } - - m = &metrics.Metric{ - Name: sample.Metric.Name, - Type: sample.Metric.Type, - Contains: sample.Metric.Contains, - Sink: sink, - } - - ms.m[m.Name] = m - } - - // TODO: https://github.com/grafana/xk6-output-prometheus-remote/issues/11 - // - // Sometimes remote write endpoint throws an error about duplicates even if the values - // sent were different. By current observations, this is a hard to repeat case and - // potentially a bug. - // Related: https://github.com/prometheus/prometheus/issues/9210 - - // TODO: Trend is the unique type that benefits from this logic. - // so this logic can be removed just creating - // a new implementation in this extension - // for TrendSink and its Add method. - if add == nil { - m.Sink.Add(sample) - } else { - add(m, sample) - } - - return m -} - -// transform k6 sample into TimeSeries for remote-write -func (ms *metricsStorage) transform(mapping Mapping, sample metrics.Sample, labels []prompb.Label) ([]prompb.TimeSeries, error) { - var newts []prompb.TimeSeries - - switch sample.Metric.Type { - case metrics.Counter: - newts = mapping.MapCounter(ms, sample, labels) - - case metrics.Gauge: - newts = mapping.MapGauge(ms, sample, labels) - - case metrics.Rate: - newts = mapping.MapRate(ms, sample, labels) - - case metrics.Trend: - newts = mapping.MapTrend(ms, sample, labels) - - default: - return nil, fmt.Errorf("Something is really off as I cannot recognize the type of metric %s: `%s`", sample.Metric.Name, sample.Metric.Type) - } - - return newts, nil -} - -// Mapping represents the specific way k6 metrics can be mapped to metrics of -// remote agent. As each remote agent can use different ways to store metrics as well as -// expect different values on remote write endpoint, they must have their own support. 
-type Mapping interface { - MapCounter(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries - MapGauge(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries - MapRate(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries - MapTrend(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries - - // AdjustLabels(labels []prompb.Label) []prompb.Label -} - -func NewMapping(mapping string) Mapping { - switch mapping { - case "prometheus": - return &PrometheusMapping{} - default: - return &RawMapping{} - } -} - -type RawMapping struct{} - -func (rm *RawMapping) MapCounter(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries { - return rm.processSample(sample, labels) -} - -func (rm *RawMapping) MapGauge(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries { - return rm.processSample(sample, labels) -} - -func (rm *RawMapping) MapRate(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries { - return rm.processSample(sample, labels) -} - -func (rm *RawMapping) MapTrend(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries { - return rm.processSample(sample, labels) -} - -func (rm *RawMapping) processSample(sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries { - return []prompb.TimeSeries{ - { - Labels: append(labels, prompb.Label{ - Name: "__name__", - Value: fmt.Sprintf("%s%s", defaultMetricPrefix, sample.Metric.Name), - }), - Samples: []prompb.Sample{ - { - Value: sample.Value, - Timestamp: timestamp.FromTime(sample.Time), - }, - }, - }, - } -} diff --git a/pkg/remotewrite/prometheus.go b/pkg/remotewrite/prometheus.go index aff38d6..9b662dd 100644 --- a/pkg/remotewrite/prometheus.go +++ b/pkg/remotewrite/prometheus.go @@ -2,217 +2,122 @@ package remotewrite import ( "fmt" - "math" - "sort" + "time" + "github.com/grafana/xk6-output-prometheus-remote/pkg/tsdb" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/prompb" - "go.k6.io/k6/metrics" ) -type PrometheusMapping struct{} +func tagsToLabels(config Config, tags tsdb.TagSet) []prompb.Label { + if !config.KeepTags.Bool { + return []prompb.Label{} + } -func (pm *PrometheusMapping) MapCounter(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries { - metric := ms.update(sample, nil) - aggr := metric.Sink.Format(0) + // Adding one more because with high probability + // a __name__ label will be added after + labels := make([]prompb.Label, 0, 1+len(tags)) - return []prompb.TimeSeries{ - { - Labels: append(labels, prompb.Label{ - Name: "__name__", - Value: fmt.Sprintf("%s%s", defaultMetricPrefix, sample.Metric.Name), - }), - Samples: []prompb.Sample{ - { - Value: aggr["count"], - Timestamp: timestamp.FromTime(sample.Time), - }, - }, - }, - } -} + for _, tag := range tags { + if len(tag.Key) < 1 || len(tag.Value) < 1 { + continue + } -func (pm *PrometheusMapping) MapGauge(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries { - return []prompb.TimeSeries{ - { - Labels: append(labels, prompb.Label{ - Name: "__name__", - Value: fmt.Sprintf("%s%s", defaultMetricPrefix, sample.Metric.Name), - }), - Samples: []prompb.Sample{ - { - // Gauge is just the latest value - // so we can skip the sink using directly the value from the sample. 
- Value: sample.Value, - Timestamp: timestamp.FromTime(sample.Time), - }, - }, - }, + if !config.KeepNameTag.Bool && tag.Key == "name" { + continue + } + + if !config.KeepURLTag.Bool && tag.Key == "url" { + continue + } + + labels = append(labels, prompb.Label{ + Name: tag.Key, + Value: tag.Value, + }) } + + return labels } -func (pm *PrometheusMapping) MapRate(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries { - metric := ms.update(sample, nil) - aggr := metric.Sink.Format(0) +func appendNameLabel(labels []prompb.Label, name string) []prompb.Label { + return append(labels, prompb.Label{ + Name: "__name__", + Value: fmt.Sprintf("%s%s", defaultMetricPrefix, name), + }) +} - return []prompb.TimeSeries{ - { - Labels: append(labels, prompb.Label{ - Name: "__name__", - Value: fmt.Sprintf("%s%s", defaultMetricPrefix, sample.Metric.Name), - }), - Samples: []prompb.Sample{ - { - Value: aggr["rate"], - Timestamp: timestamp.FromTime(sample.Time), - }, - }, - }, +func MapTrend(metricName string, s *tsdb.TrendSeries, labels []prompb.Label) []prompb.TimeSeries { + // TODO: assign in-place + aggr := map[string]float64{ + "min": s.Min(), + "max": s.Max(), + "avg": s.Avg(), + "med": s.Med(), + "p(90)": s.P(0.90), + "p(95)": s.P(0.95), } -} -func (pm *PrometheusMapping) MapTrend(ms *metricsStorage, sample metrics.Sample, labels []prompb.Label) []prompb.TimeSeries { - metric := ms.update(sample, trendAdd) + ts := time.Now() // Prometheus metric system does not support Trend so this mapping will store gauges // to keep track of key values. // TODO: when Prometheus implements support for sparse histograms, re-visit this implementation - s := metric.Sink.(*metrics.TrendSink) - aggr := map[string]float64{ - "min": s.Min, - "max": s.Max, - "avg": s.Avg, - "med": s.Med, - "p(90)": p(s, 0.90), - "p(95)": p(s, 0.95), - } - return []prompb.TimeSeries{ { - Labels: append(labels, prompb.Label{ - Name: "__name__", - Value: fmt.Sprintf("%s%s_min", defaultMetricPrefix, sample.Metric.Name), - }), + Labels: appendNameLabel(labels, metricName+"_min"), Samples: []prompb.Sample{ { Value: aggr["min"], - Timestamp: timestamp.FromTime(sample.Time), + Timestamp: timestamp.FromTime(ts), }, }, }, { - Labels: append(labels, prompb.Label{ - Name: "__name__", - Value: fmt.Sprintf("%s%s_max", defaultMetricPrefix, sample.Metric.Name), - }), + Labels: appendNameLabel(labels, metricName+"_max"), Samples: []prompb.Sample{ { Value: aggr["max"], - Timestamp: timestamp.FromTime(sample.Time), + Timestamp: timestamp.FromTime(ts), }, }, }, { - Labels: append(labels, prompb.Label{ - Name: "__name__", - Value: fmt.Sprintf("%s%s_avg", defaultMetricPrefix, sample.Metric.Name), - }), + Labels: appendNameLabel(labels, metricName+"_avg"), Samples: []prompb.Sample{ { Value: aggr["avg"], - Timestamp: timestamp.FromTime(sample.Time), + Timestamp: timestamp.FromTime(ts), }, }, }, { - Labels: append(labels, prompb.Label{ - Name: "__name__", - Value: fmt.Sprintf("%s%s_med", defaultMetricPrefix, sample.Metric.Name), - }), + Labels: appendNameLabel(labels, metricName+"_med"), Samples: []prompb.Sample{ { Value: aggr["med"], - Timestamp: timestamp.FromTime(sample.Time), + Timestamp: timestamp.FromTime(ts), }, }, }, { - Labels: append(labels, prompb.Label{ - Name: "__name__", - Value: fmt.Sprintf("%s%s_p90", defaultMetricPrefix, sample.Metric.Name), - }), + Labels: appendNameLabel(labels, metricName+"_p90"), Samples: []prompb.Sample{ { Value: aggr["p(90)"], - Timestamp: timestamp.FromTime(sample.Time), + Timestamp: 
timestamp.FromTime(ts), }, }, }, { - Labels: append(labels, prompb.Label{ - Name: "__name__", - Value: fmt.Sprintf("%s%s_p95", defaultMetricPrefix, sample.Metric.Name), - }), + Labels: appendNameLabel(labels, metricName+"_p95"), Samples: []prompb.Sample{ { Value: aggr["p(95)"], - Timestamp: timestamp.FromTime(sample.Time), + Timestamp: timestamp.FromTime(ts), }, }, }, } } - -// The following functions are an attempt to add ad-hoc optimization to TrendSink, -// and are a partial copy-paste from k6/metrics. -// TODO: re-write & refactor this once metrics refactoring progresses in k6. - -func trendAdd(current *metrics.Metric, s metrics.Sample) { - t := current.Sink.(*metrics.TrendSink) - - // insert into sorted array instead of sorting anew on each addition - index := sort.Search(len(t.Values), func(i int) bool { - return t.Values[i] > s.Value - }) - t.Values = append(t.Values, 0) - copy(t.Values[index+1:], t.Values[index:]) - t.Values[index] = s.Value - - t.Count += 1 - t.Sum += s.Value - t.Avg = t.Sum / float64(t.Count) - - if s.Value > t.Max { - t.Max = s.Value - } - if s.Value < t.Min || t.Count == 1 { - t.Min = s.Value - } - - if (t.Count & 0x01) == 0 { - t.Med = (t.Values[(t.Count/2)-1] + t.Values[(t.Count/2)]) / 2 - } else { - t.Med = t.Values[t.Count/2] - } - - current.Sink = t -} - -func p(t *metrics.TrendSink, pct float64) float64 { - switch t.Count { - case 0: - return 0 - case 1: - return t.Values[0] - default: - // If percentile falls on a value in Values slice, we return that value. - // If percentile does not fall on a value in Values slice, we calculate (linear interpolation) - // the value that would fall at percentile, given the values above and below that percentile. - i := pct * (float64(t.Count) - 1.0) - j := t.Values[int(math.Floor(i))] - k := t.Values[int(math.Ceil(i))] - f := i - math.Floor(i) - return j + (k-j)*f - } -} diff --git a/pkg/remotewrite/prometheus_test.go b/pkg/remotewrite/prometheus_test.go index b18ba61..d030bd7 100644 --- a/pkg/remotewrite/prometheus_test.go +++ b/pkg/remotewrite/prometheus_test.go @@ -1,110 +1,112 @@ package remotewrite import ( - "math/rand" + "fmt" "testing" - "time" + "github.com/grafana/xk6-output-prometheus-remote/pkg/tsdb" + "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" - "go.k6.io/k6/metrics" + "gopkg.in/guregu/null.v3" ) -// check that ad-hoc optimization doesn't produce wrong values -func TestTrendAdd(t *testing.T) { +func TestTagsToLabels(t *testing.T) { t.Parallel() - testCases := []struct { - current *metrics.Metric - s metrics.Sample - expected metrics.TrendSink + testCases := map[string]struct { + tags map[string]string + config Config + labels []prompb.Label }{ - { - current: &metrics.Metric{ - Sink: &metrics.TrendSink{}, + "empty-tags": { + tags: nil, + config: Config{ + KeepTags: null.BoolFrom(true), + KeepNameTag: null.BoolFrom(false), }, - s: metrics.Sample{Value: 2}, - expected: metrics.TrendSink{ - Values: []float64{2}, - Count: 1, - Min: 2, - Max: 2, - Sum: 2, - Avg: 2, - Med: 2, + labels: []prompb.Label{}, + }, + "name-tag-discard": { + tags: map[string]string{"foo": "bar", "name": "nnn"}, + config: Config{ + KeepTags: null.BoolFrom(true), + KeepNameTag: null.BoolFrom(false), + }, + labels: []prompb.Label{ + {Name: "foo", Value: "bar"}, + }, + }, + "name-tag-keep": { + tags: map[string]string{"foo": "bar", "name": "nnn"}, + config: Config{ + KeepTags: null.BoolFrom(true), + KeepNameTag: null.BoolFrom(true), + }, + labels: []prompb.Label{ + {Name: "foo", Value: "bar"}, + {Name: 
"name", Value: "nnn"}, }, }, - { - current: &metrics.Metric{ - Sink: &metrics.TrendSink{ - Values: []float64{8, 3, 1, 7, 4, 2}, - Count: 6, - Min: 1, - Max: 8, - Sum: 25, - }, + "url-tag-discard": { + tags: map[string]string{"foo": "bar", "url": "uuu"}, + config: Config{ + KeepTags: null.BoolFrom(true), + KeepURLTag: null.BoolFrom(false), }, - s: metrics.Sample{Value: 12.3}, - expected: metrics.TrendSink{ - Values: []float64{8, 3, 1, 7, 4, 2, 12.3}, - Count: 7, - Min: 1, - Max: 12.3, - Sum: 37.3, - Avg: 37.3 / 7, - Med: 7, + labels: []prompb.Label{ + {Name: "foo", Value: "bar"}, }, }, + "url-tag-keep": { + tags: map[string]string{"foo": "bar", "url": "uuu"}, + config: Config{ + KeepTags: null.BoolFrom(true), + KeepURLTag: null.BoolFrom(true), + }, + labels: []prompb.Label{ + {Name: "foo", Value: "bar"}, + {Name: "url", Value: "uuu"}, + }, + }, + "discard-tags": { + tags: map[string]string{"foo": "bar", "name": "nnn"}, + config: Config{ + KeepTags: null.BoolFrom(false), + }, + labels: []prompb.Label{}, + }, } - for _, testCase := range testCases { - // trendAdd should result in the same values as Sink.Add - - trendAdd(testCase.current, testCase.s) - sink := testCase.current.Sink.(*metrics.TrendSink) - - assert.Equal(t, testCase.expected.Count, sink.Count) - assert.Equal(t, testCase.expected.Min, sink.Min) - assert.Equal(t, testCase.expected.Max, sink.Max) - assert.Equal(t, testCase.expected.Sum, sink.Sum) - assert.Equal(t, testCase.expected.Avg, sink.Avg) - assert.Equal(t, testCase.expected.Med, sink.Med) - assert.Equal(t, testCase.expected.Values, sink.Values) - } -} + for name, testCase := range testCases { + testCase := testCase + t.Run(name, func(t *testing.T) { + var tagset tsdb.TagSet + if testCase.tags != nil { + for k, v := range testCase.tags { + tagset = append(tagset, &tsdb.Tag{Key: k, Value: v}) + } + } + labels := tagsToLabels(testCase.config, tagset) -func BenchmarkTrendAdd(b *testing.B) { - benchF := []func(b *testing.B, start metrics.Metric){ - func(b *testing.B, m metrics.Metric) { - b.ResetTimer() - rand.Seed(time.Now().Unix()) + assert.Equal(t, len(testCase.labels), len(labels)) - for i := 0; i < b.N; i++ { - trendAdd(&m, metrics.Sample{Value: rand.Float64() * 1000}) - sink := m.Sink.(*metrics.TrendSink) - p(sink, 0.90) - p(sink, 0.95) - } - }, - func(b *testing.B, start metrics.Metric) { - b.ResetTimer() - rand.Seed(time.Now().Unix()) + for i := range testCase.labels { + var found bool - for i := 0; i < b.N; i++ { - start.Sink.Add(metrics.Sample{Value: rand.Float64() * 1000}) - start.Sink.Format(0) + // order is not guaranteed ATM + for j := range labels { + if labels[j].Name == testCase.labels[i].Name { + assert.Equal(t, testCase.labels[i].Value, labels[j].Value) + found = true + break + } + } + if !found { + assert.Fail(t, fmt.Sprintf("Not found label %s: \n"+ + "expected: %v\n"+ + "actual : %v", testCase.labels[i].Name, testCase.labels, labels)) + } } - }, + }) } - - start := metrics.Metric{ - Type: metrics.Trend, - Sink: &metrics.TrendSink{}, - } - - b.Run("trendAdd", func(b *testing.B) { - benchF[0](b, start) - }) - b.Run("TrendSink.Add", func(b *testing.B) { - benchF[1](b, start) - }) } diff --git a/pkg/remotewrite/remotewrite.go b/pkg/remotewrite/remotewrite.go index cc82683..0875f6e 100644 --- a/pkg/remotewrite/remotewrite.go +++ b/pkg/remotewrite/remotewrite.go @@ -1,13 +1,19 @@ package remotewrite import ( + "bufio" "context" + "errors" "fmt" + "hash/maphash" + "os" "time" - //nolint:staticcheck + "github.com/grafana/xk6-output-prometheus-remote/pkg/tsdb" + 
"github.com/golang/protobuf/proto" "github.com/golang/snappy" + "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage/remote" "github.com/sirupsen/logrus" @@ -16,23 +22,21 @@ import ( ) type Output struct { - config Config - - client remote.WriteClient - metrics *metricsStorage - mapping Mapping - periodicFlusher *output.PeriodicFlusher output.SampleBuffer - logger logrus.FieldLogger + config Config + logger logrus.FieldLogger + client remote.WriteClient + tsdb tsdb.Repository + tagmap tsdb.TagMap + periodicCollector *output.PeriodicFlusher + periodicFlusher *output.PeriodicFlusher } var _ output.Output = new(Output) -// toggle to indicate whether we should stop dropping samples -var flushTooLong bool - func New(params output.Params) (*Output, error) { + logger := params.Logger.WithFields(logrus.Fields{"output": "Prometheus Remote-Write"}) config, err := GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument) if err != nil { return nil, err @@ -49,44 +53,110 @@ func New(params output.Params) (*Output, error) { return nil, err } - params.Logger.Info(fmt.Sprintf("Prometheus: configuring remote-write with %s mapping", config.Mapping.String)) - return &Output{ - client: client, - config: config, - metrics: newMetricsStorage(), - mapping: NewMapping(config.Mapping.String), - logger: params.Logger, + client: client, + config: config, + logger: logger, + tsdb: tsdb.NewInMemoryRepository(), }, nil } func (*Output) Description() string { - return "Output k6 metrics to prometheus remote-write endpoint" + return "Output k6 metrics to Prometheus remote-write endpoint" } func (o *Output) Start() error { - if periodicFlusher, err := output.NewPeriodicFlusher(time.Duration(o.config.FlushPeriod.Duration), o.flush); err != nil { + collector, err := output.NewPeriodicFlusher(time.Duration(o.config.FlushPeriod.Duration), o.collect) + if err != nil { + return err + } + o.periodicCollector = collector + + flusher, err := output.NewPeriodicFlusher(time.Duration(o.config.FlushPeriod.Duration), o.flushSeries) + if err != nil { return err - } else { - o.periodicFlusher = periodicFlusher } - o.logger.Debug("Prometheus: starting remote-write") + o.periodicFlusher = flusher + o.logger.Debug("Periodic collector and series flusher have started") return nil } func (o *Output) Stop() error { - o.logger.Debug("Prometheus: stopping remote-write") + o.logger.Debug("Stopping the output") + o.periodicCollector.Stop() o.periodicFlusher.Stop() - return nil + + series, err := o.tsdb.GetSeries() + if err != nil { + return err + } + o.logger.WithField("unique-series-generated", len(series)).Debug("Output stopped") + + // TODO: remove before merge + f, err := os.CreateTemp("/home/codebien/code/grafana/xk6-output-prometheus-remote", "k6-") + if err != nil { + return err + } + w := bufio.NewWriter(f) + var line string + for _, s := range series { + line = fmt.Sprintf("%s{%+v} %f", s.MetricName, func() map[string]string { + m := make(map[string]string) + for _, t := range s.Tags { + m[t.Key] = t.Value + } + return m + }(), s.Sink.Value()) + fmt.Fprintln(w, line) + } + return w.Flush() +} + +func (o *Output) collect() { + samplesContainers := o.GetBufferedSamples() + + if len(samplesContainers) < 1 { + return + } + + // Remote write endpoint accepts TimeSeries structure defined in gRPC. 
It must: + // a) contain Labels array + // b) have a __name__ label: without it, metric might be unqueryable or even rejected + // as a metric without a name. This behaviour depends on underlying storage used. + // c) not have duplicate timestamps within 1 timeseries, see https://github.com/prometheus/prometheus/issues/9210 + // Prometheus write handler processes only some fields as of now, so here we'll add only them. + + series, err := o.sinkSeriesFromSamples(samplesContainers) + if err != nil { + o.logger.WithError(err).Error("Failed to convert the samples to time series") + return + } + + o.logger.WithField("series", series).Debug("Converted samples to time series and updated their sinks") + + // TODO: sink the series here? } -func (o *Output) flush() { +func (o *Output) flushSeries() { var ( start = time.Now() nts int ) + series, err := o.tsdb.GetSeries() + if err != nil { + o.logger.WithError(err).Error("Failed to fetch the time series") + return + } + + if len(series) < 1 { + o.logger.Debug("Skipping the flush operation, no time series found") + return + } + + nts = len(series) + defer func() { d := time.Since(start) if d > time.Duration(o.config.FlushPeriod.Duration) { @@ -94,71 +164,136 @@ func (o *Output) flush() { o.logger.WithField("nts", nts). Warn(fmt.Sprintf("Remote write took %s while flush period is %s. Some samples may be dropped.", d.String(), o.config.FlushPeriod.String())) - flushTooLong = true } else { o.logger.WithField("nts", nts).Debug(fmt.Sprintf("Remote write took %s.", d.String())) - flushTooLong = false } }() - samplesContainers := o.GetBufferedSamples() - - // Remote write endpoint accepts TimeSeries structure defined in gRPC. It must: - // a) contain Labels array - // b) have a __name__ label: without it, metric might be unquerable or even rejected - // as a metric without a name. This behaviour depends on underlying storage used. - // c) not have duplicate timestamps within 1 timeseries, see https://github.com/prometheus/prometheus/issues/9210 - // Prometheus write handler processes only some fields as of now, so here we'll add only them. - promTimeSeries := o.convertToTimeSeries(samplesContainers) - nts = len(promTimeSeries) + o.logger.WithField("nts", nts).Debug("Preparing time series for flushing") - o.logger.WithField("nts", nts).Debug("Converted samples to time series in preparation for sending.") + // TODO: maybe some cache? + var promSeries []prompb.TimeSeries + for _, s := range series { + promSeries = append(promSeries, o.formatSeriesAsProm(s)...)
+ } req := prompb.WriteRequest{ - Timeseries: promTimeSeries, + Timeseries: promSeries, + } + + buf, err := proto.Marshal(&req) + if err != nil { + o.logger.WithError(err).Fatal("Failed to marshal timeseries") + return + } + + // TODO: this call can panic, find the source and fix it + encoded := snappy.Encode(nil, buf) + if err := o.client.Store(context.Background(), encoded); err != nil { + o.logger.WithError(err).Error("Failed to store timeseries") + return } +} + +// TODO: cache the prom time series (name + tags) - if buf, err := proto.Marshal(&req); err != nil { - o.logger.WithError(err).Fatal("Failed to marshal timeseries.") - } else { - encoded := snappy.Encode(nil, buf) // this call can panic - if err = o.client.Store(context.Background(), encoded); err != nil { - o.logger.WithError(err).Error("Failed to store timeseries.") +func (o *Output) formatSeriesAsProm(series *tsdb.TimeSeries) []prompb.TimeSeries { + labels := tagsToLabels(o.config, series.Tags) + + switch styp := series.Sink.(type) { + case *tsdb.CountSeries, *tsdb.GaugeSeries, *tsdb.RateSeries: + return []prompb.TimeSeries{ + { + Labels: appendNameLabel(labels, series.MetricName), + Samples: []prompb.Sample{ + { + Value: series.Sink.Value(), + Timestamp: timestamp.FromTime(time.Now()), + }, + }, + }, } + + case *tsdb.TrendSeries: + return MapTrend(series.MetricName, styp, labels) + default: + panic("the time series type is not supported") } } -func (o *Output) convertToTimeSeries(samplesContainers []metrics.SampleContainer) []prompb.TimeSeries { - promTimeSeries := make([]prompb.TimeSeries, 0) +func (o *Output) sinkSeriesFromSamples(samplesContainers []metrics.SampleContainer) (uint, error) { + sinked := make(map[uint64]*tsdb.TimeSeries) + // TODO: aggregate same time series across containers (maybe caching?) + // useless to fetch and update for each single sample for _, samplesContainer := range samplesContainers { samples := samplesContainer.GetSamples() for _, sample := range samples { - // Prometheus remote write treats each label array in TimeSeries as the same - // for all Samples in those TimeSeries (https://github.com/prometheus/prometheus/blob/03d084f8629477907cab39fc3d314b375eeac010/storage/remote/write_handler.go#L75). - // But K6 metrics can have different tags per each Sample so in order not to - // lose info in tags or assign tags wrongly, let's store each Sample in a different TimeSeries, for now. - // This approach also allows to avoid hard to replicate issues with duplicate timestamps. - - labels, err := tagsToLabels(sample.Tags, o.config) + tset, err := o.tagSetFromTags(sample.Tags.CloneTags()) if err != nil { - o.logger.Error(err) + return 0, err + } + + id := tsdb.HashKey(sample.Metric.Name, tset) + series, ok := sinked[id] + if ok { + series.AddPoint(sample.Value) + continue } + series, err = o.tsdb.GetSeriesByID(id) + if err != nil { + if !errors.Is(err, tsdb.ErrSeriesNotFound) { + return 0, err + } - if newts, err := o.metrics.transform(o.mapping, sample, labels); err != nil { - o.logger.Error(err) - } else { - promTimeSeries = append(promTimeSeries, newts...) + // TODO: avoid to re-hash + series = o.k6MetricToSeries(sample.Metric, tset) + err = o.tsdb.InsertSeries(series) + if err != nil { + return 0, err + } } + + series.AddPoint(sample.Value) + sinked[id] = series } + } + return uint(len(sinked)), nil +} - // Do not blow up if remote endpoint is overloaded and responds too slowly. 
- // TODO: consider other approaches - if flushTooLong && len(promTimeSeries) > 150000 { - break +var tagHasher maphash.Hash + +func (o *Output) tagSetFromTags(tags map[string]string) (tsdb.TagSet, error) { + tset := tsdb.NewTagSet(len(tags)) + for k, v := range tags { + // TODO: define a better separator + tagHasher.WriteString(k + "_" + v) + h := tagHasher.Sum64() + tagHasher.Reset() + + tag := o.tagmap.Get(h) + if tag == nil { + tag = &tsdb.Tag{Key: k, Value: v} + o.tagmap.Set(h, tag) } + tset.Add(tag) } + return tset, nil +} - return promTimeSeries +// TODO: test in case of sub-metric +func (o *Output) k6MetricToSeries(m *metrics.Metric, tset tsdb.TagSet) *tsdb.TimeSeries { + switch m.Type { + case metrics.Counter: + return tsdb.NewCountSeries(m.Name, tset) + case metrics.Gauge: + return tsdb.NewGaugeSeries(m.Name, tset) + case metrics.Trend: + return tsdb.NewTrendSeries(m.Name, tset) + case metrics.Rate: + return tsdb.NewRateSeries(m.Name, tset) + default: + panic("the metric type is not supported") + } } diff --git a/pkg/remotewrite/remotewrite_test.go b/pkg/remotewrite/remotewrite_test.go new file mode 100644 index 0000000..b975537 --- /dev/null +++ b/pkg/remotewrite/remotewrite_test.go @@ -0,0 +1,111 @@ +package remotewrite + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/grafana/xk6-output-prometheus-remote/pkg/tsdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.k6.io/k6/lib/testutils" + "go.k6.io/k6/metrics" + "go.k6.io/k6/output" +) + +func TestOutputCollect(t *testing.T) { + o, err := New(output.Params{ + Logger: testutils.NewLogger(t), + }) + require.NoError(t, err) + + o.tsdb = tsdb.NewInMemoryRepository() + metric := &metrics.Metric{ + Name: "myMetric", + Type: metrics.Counter, + } + o.AddMetricSamples([]metrics.SampleContainer{ + metrics.Samples([]metrics.Sample{ + { + Metric: metric, + Time: time.Now(), + Tags: metrics.NewSampleTags(map[string]string{"key1": "val1"}), + Value: 3.14, + }, + }), + }) + o.AddMetricSamples([]metrics.SampleContainer{ + metrics.Samples([]metrics.Sample{ + { + Metric: metric, + Time: time.Now(), + Tags: metrics.NewSampleTags(map[string]string{"key1": "val1"}), + Value: 2.71, + }, + { + Metric: metric, + Time: time.Now(), + Tags: metrics.NewSampleTags(map[string]string{"key1": "val2"}), + Value: 1.61, + }, + }), + }) + + o.collect() + + serieses, err := o.tsdb.GetSeries() + require.NoError(t, err) + require.Len(t, serieses, 2) + + series, err := o.tsdb.GetSeriesByID(tsdb.HashKey( + "myMetric", + tsdb.TagSet{&tsdb.Tag{Key: "key1", Value: "val1"}}, + )) + require.NoError(t, err) + + cs, ok := series.Sink.(*tsdb.CountSeries) + require.True(t, ok) + + assert.Equal(t, cs.Value(), 5.85) +} + +type remoteWriteMock struct{} + +// Store stores the given samples in the remote storage. +func (rwm remoteWriteMock) Store(_ context.Context, b []byte) error { + if len(b) < 1 { + return fmt.Errorf("received an empty body") + } + return nil +} + +// Name uniquely identifies the remote storage. +func (rwm remoteWriteMock) Name() string { + panic("Name not implemented") +} + +// Endpoint is the remote read or write endpoint for the storage client. 
+func (rwm remoteWriteMock) Endpoint() string { + panic("Endpoint not implemented") +} + +func TestOutputFlushSeries(t *testing.T) { + o, err := New(output.Params{ + Logger: testutils.NewLogger(t), + }) + require.NoError(t, err) + + o.client = remoteWriteMock{} + + o.tsdb = tsdb.NewInMemoryRepository() + metric := &metrics.Metric{ + Name: "myMetric", + Type: metrics.Counter, + } + series := tsdb.NewCountSeries(metric.Name, tsdb.TagSet{{Key: "tag1", Value: "val1"}}) + require.NoError(t, o.tsdb.InsertSeries(series)) + + series.AddPoint(42) + o.flushSeries() +} diff --git a/pkg/tsdb/repository.go b/pkg/tsdb/repository.go new file mode 100644 index 0000000..edf4b0f --- /dev/null +++ b/pkg/tsdb/repository.go @@ -0,0 +1,67 @@ +package tsdb + +import ( + "errors" + "sync" +) + +var ( + ErrSeriesNotFound = errors.New("series not found") + ErrSeriesAlreadyExists = errors.New("series already exists") +) + +type Repository interface { + GetSeries() ([]*TimeSeries, error) + GetSeriesByID(hash uint64) (*TimeSeries, error) + InsertSeries(*TimeSeries) error +} + +// InMemory is a basic in-memory time series storage. +type InMemory struct { + rwm sync.RWMutex + + // series is the storage of the available series + // we expect time series to be reused as much as possible + // so it should be able to maintain the writes low + // and instead reading most of the time. + series map[uint64]*TimeSeries +} + +func NewInMemoryRepository() *InMemory { + return &InMemory{ + series: make(map[uint64]*TimeSeries), + rwm: sync.RWMutex{}, + } +} + +func (inmem *InMemory) GetSeries() ([]*TimeSeries, error) { + inmem.rwm.RLock() + all := make([]*TimeSeries, 0, len(inmem.series)) + for _, s := range inmem.series { + all = append(all, s) + } + inmem.rwm.RUnlock() + + return all, nil +} + +func (inmem *InMemory) GetSeriesByID(id uint64) (*TimeSeries, error) { + inmem.rwm.RLock() + s, ok := inmem.series[id] + inmem.rwm.RUnlock() + if !ok { + return nil, ErrSeriesNotFound + } + return s, nil +} + +func (inmem *InMemory) InsertSeries(series *TimeSeries) error { + inmem.rwm.Lock() + defer inmem.rwm.Unlock() + if _, ok := inmem.series[series.ID]; ok { + return ErrSeriesAlreadyExists + } + + inmem.series[series.ID] = series + return nil +} diff --git a/pkg/tsdb/repository_test.go b/pkg/tsdb/repository_test.go new file mode 100644 index 0000000..a3d3546 --- /dev/null +++ b/pkg/tsdb/repository_test.go @@ -0,0 +1,64 @@ +package tsdb + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRepositoryGetSeries(t *testing.T) { + r := NewInMemoryRepository() + series, err := r.GetSeries() + require.NoError(t, err) + assert.Empty(t, series) + + // TODO: decoupling, they have direct dependency in this way + // a bug on one side could impact the other side + // or the bug could be hidden + err = r.InsertSeries(NewCountSeries("metric-name", nil)) + require.NoError(t, err) + + series, err = r.GetSeries() + require.NoError(t, err) + assert.Len(t, series, 1) +} + +func TestRepositoryGetSeriesByID(t *testing.T) { + r := NewInMemoryRepository() + counterSeries := NewCountSeries("metric-name", nil) + + series, err := r.GetSeriesByID(counterSeries.ID) + assert.Equal(t, ErrSeriesNotFound, err) + assert.Nil(t, series) + + err = r.InsertSeries(counterSeries) + require.NoError(t, err) + + series, err = r.GetSeriesByID(counterSeries.ID) + require.NoError(t, err) + assert.Equal(t, "metric-name", series.MetricName) +} + +func TestRepositoryInsertSeries(t *testing.T) { + tags := []*Tag{ + {Key: 
"tag1", Value: "value1"}, + {Key: "tag2", Value: "value2"}, + } + counterSeries := NewCountSeries("metric-name", tags) + + r := NewInMemoryRepository() + err := r.InsertSeries(counterSeries) + require.NoError(t, err) + + err = r.InsertSeries(counterSeries) + assert.Equal(t, ErrSeriesAlreadyExists, err) + + counterSeries2 := NewCountSeries("metric-name", tags) + err = r.InsertSeries(counterSeries2) + assert.Equal(t, ErrSeriesAlreadyExists, err) + + series, err := r.GetSeries() + require.NoError(t, err) + assert.Len(t, series, 1) +} diff --git a/pkg/tsdb/tsdb.go b/pkg/tsdb/tsdb.go new file mode 100644 index 0000000..c88cc51 --- /dev/null +++ b/pkg/tsdb/tsdb.go @@ -0,0 +1,221 @@ +package tsdb + +import ( + "sort" + "sync" + + "github.com/cespare/xxhash/v2" + "github.com/openhistogram/circonusllhist" + "go.k6.io/k6/metrics" +) + +type Tag struct { + Key, Value string +} + +type TagSet []*Tag + +func NewTagSet(ncap int) TagSet { + return make([]*Tag, 0, ncap) +} + +// Add adds a new item to the inner slice in an ordered way. +// Add is a not safe concurrent-operation. +func (set *TagSet) Add(t *Tag) { + index := sort.Search(len(*set), func(i int) bool { + return (*set)[i].Key > t.Key + }) + + if len(*set) == index { + *set = append(*set, t) + return + } + + *set = append((*set)[:index], append(make([]*Tag, 1), (*set)[index:]...)...) + (*set)[index] = t +} + +type TagMap struct { + m sync.Map +} + +func (tagmap *TagMap) Get(hash uint64) *Tag { + t, ok := tagmap.m.Load(hash) + if !ok { + return nil + } + return t.(*Tag) +} + +func (tagmap *TagMap) Set(hash uint64, t *Tag) { + // TODO: refactor, in this way it's hashing twice + tagmap.m.Store(hash, t) +} + +type TimeSeries struct { + Sink Sink + MetricName string + Tags TagSet + ID uint64 +} + +func newTimeSeries(metric string, tags TagSet) *TimeSeries { + return &TimeSeries{ + MetricName: metric, + Tags: tags, + ID: HashKey(metric, tags), + } +} + +func (s TimeSeries) AddPoint(v float64) { + s.Sink.Add(v) +} + +type Sink interface { + Add(v float64) + Value() float64 +} + +var bytesep = []byte{0xff} + +func HashKey(metric string, tags TagSet) uint64 { + hasher := xxhash.New() + hasher.WriteString(metric) + for i := 0; i < len(tags); i++ { + hasher.Write(bytesep) + hasher.WriteString(tags[i].Key) + hasher.Write(bytesep) + hasher.WriteString(tags[i].Value) + } + h := hasher.Sum64() + hasher.Reset() + return h +} + +type CountSeries struct { + rwm sync.RWMutex + value float64 +} + +func NewCountSeries(metric string, tags TagSet) *TimeSeries { + s := newTimeSeries(metric, tags) + s.Sink = &CountSeries{} + return s +} + +func (cs *CountSeries) Add(v float64) { + if v < 0 { + return + } + cs.rwm.Lock() + cs.value += v + cs.rwm.Unlock() +} + +func (cs *CountSeries) Value() float64 { + cs.rwm.RLock() + defer cs.rwm.RUnlock() + return cs.value +} + +type GaugeSeries struct { + rwm sync.RWMutex + value float64 +} + +func NewGaugeSeries(metric string, tags TagSet) *TimeSeries { + s := newTimeSeries(metric, tags) + s.Sink = &GaugeSeries{} + return s +} + +func (gs *GaugeSeries) Add(v float64) { + gs.rwm.Lock() + gs.value = v + gs.rwm.Unlock() +} + +func (gs *GaugeSeries) Value() float64 { + gs.rwm.RLock() + defer gs.rwm.RUnlock() + return gs.value +} + +type RateSeries struct { + inner *metrics.RateSink + rwm sync.RWMutex +} + +func NewRateSeries(metric string, tags TagSet) *TimeSeries { + s := newTimeSeries(metric, tags) + s.Sink = &RateSeries{ + inner: &metrics.RateSink{}, + } + return s +} + +func (rs *RateSeries) Add(v float64) { + rs.rwm.Lock() + 
+type CountSeries struct {
+	rwm   sync.RWMutex
+	value float64
+}
+
+func NewCountSeries(metric string, tags TagSet) *TimeSeries {
+	s := newTimeSeries(metric, tags)
+	s.Sink = &CountSeries{}
+	return s
+}
+
+// Add accumulates the value; negative values are discarded
+// because a counter is monotonically increasing.
+func (cs *CountSeries) Add(v float64) {
+	if v < 0 {
+		return
+	}
+	cs.rwm.Lock()
+	cs.value += v
+	cs.rwm.Unlock()
+}
+
+func (cs *CountSeries) Value() float64 {
+	cs.rwm.RLock()
+	defer cs.rwm.RUnlock()
+	return cs.value
+}
+
+type GaugeSeries struct {
+	rwm   sync.RWMutex
+	value float64
+}
+
+func NewGaugeSeries(metric string, tags TagSet) *TimeSeries {
+	s := newTimeSeries(metric, tags)
+	s.Sink = &GaugeSeries{}
+	return s
+}
+
+func (gs *GaugeSeries) Add(v float64) {
+	gs.rwm.Lock()
+	gs.value = v
+	gs.rwm.Unlock()
+}
+
+func (gs *GaugeSeries) Value() float64 {
+	gs.rwm.RLock()
+	defer gs.rwm.RUnlock()
+	return gs.value
+}
+
+type RateSeries struct {
+	inner *metrics.RateSink
+	rwm   sync.RWMutex
+}
+
+func NewRateSeries(metric string, tags TagSet) *TimeSeries {
+	s := newTimeSeries(metric, tags)
+	s.Sink = &RateSeries{
+		inner: &metrics.RateSink{},
+	}
+	return s
+}
+
+func (rs *RateSeries) Add(v float64) {
+	rs.rwm.Lock()
+	rs.inner.Add(metrics.Sample{Value: v})
+	rs.rwm.Unlock()
+}
+
+func (rs *RateSeries) Value() float64 {
+	rs.rwm.RLock()
+	defer rs.rwm.RUnlock()
+	if rs.inner.Total == 0 {
+		// avoid a zero division (NaN) when no value has been added yet
+		return 0
+	}
+	return float64(rs.inner.Trues) / float64(rs.inner.Total)
+}
+
+type TrendSeries struct {
+	rwm sync.RWMutex
+
+	histogram *circonusllhist.Histogram
+	count     int
+	sum       float64
+	avg       float64
+	max, min  float64
+}
+
+func NewTrendSeries(metric string, tags TagSet) *TimeSeries {
+	s := newTimeSeries(metric, tags)
+	s.Sink = &TrendSeries{
+		histogram: circonusllhist.New(),
+	}
+	return s
+}
+
+func (t *TrendSeries) Add(v float64) {
+	t.rwm.Lock()
+	t.histogram.RecordValue(v)
+	t.count++
+	t.sum += v
+	t.avg = t.sum / float64(t.count)
+	// the first recorded value initializes both bounds; otherwise a stream
+	// of only negative values would never update max from its zero value
+	if t.count == 1 || v > t.max {
+		t.max = v
+	}
+	if t.count == 1 || v < t.min {
+		t.min = v
+	}
+	t.rwm.Unlock()
+}
+
+func (t *TrendSeries) Value() float64 {
+	// TODO: p(95) is the main percentile used by k6, maybe switch to 0.99
+	return t.P(0.95)
+}
+
+func (t *TrendSeries) onRLock(f func() float64) float64 {
+	t.rwm.RLock()
+	v := f()
+	t.rwm.RUnlock()
+	return v
+}
+
+func (t *TrendSeries) Min() float64 { return t.onRLock(func() float64 { return t.min }) }
+func (t *TrendSeries) Max() float64 { return t.onRLock(func() float64 { return t.max }) }
+func (t *TrendSeries) Avg() float64 { return t.onRLock(func() float64 { return t.avg }) }
+func (t *TrendSeries) Med() float64 { return t.P(0.5) }
+
+func (t *TrendSeries) P(pct float64) float64 {
+	return t.onRLock(func() float64 { return t.histogram.ValueAtQuantile(pct) })
+}
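+
+// Illustrative note, not part of the original change: every series type is
+// consumed through the same Sink interface, so an output's flush loop can
+// treat them uniformly; repo below is a hypothetical Repository instance:
+//
+//	all, _ := repo.GetSeries()
+//	for _, s := range all {
+//		fmt.Println(s.MetricName, s.Sink.Value())
+//	}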
diff --git a/pkg/tsdb/tsdb_test.go b/pkg/tsdb/tsdb_test.go
new file mode 100644
index 0000000..f6a6869
--- /dev/null
+++ b/pkg/tsdb/tsdb_test.go
@@ -0,0 +1,101 @@
+package tsdb_test
+
+import (
+	"math"
+	"testing"
+
+	"github.com/grafana/xk6-output-prometheus-remote/pkg/tsdb"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestTagSetAdd(t *testing.T) {
+	tags := map[string]string{
+		"group":             "",
+		"method":            "GET",
+		"expected_response": "true",
+	}
+	set := tsdb.NewTagSet(len(tags))
+	for k, v := range tags {
+		set.Add(&tsdb.Tag{Key: k, Value: v})
+	}
+
+	exp := []string{
+		"expected_response",
+		"group",
+		"method",
+	}
+	keys := func() []string {
+		s := []string{}
+		for _, t := range set {
+			s = append(s, t.Key)
+		}
+		return s
+	}()
+	assert.Equal(t, exp, keys)
+}
+
+func TestHashKey(t *testing.T) {
+	tags := map[string]string{
+		"expected_response": "true", "group": "",
+		"method": "GET", "name": "https://test.k6.io",
+		"proto": "HTTP/1.1", "scenario": "default",
+		"status": "200", "tls_version": "tls1.3", "url": "https://test.k6.io",
+	}
+
+	var tagset tsdb.TagSet
+	for k, v := range tags {
+		tagset = append(tagset, &tsdb.Tag{Key: k, Value: v})
+	}
+
+	key := tsdb.HashKey("http_reqs", tagset)
+	twice := tsdb.HashKey("http_reqs", tagset)
+	assert.Equal(t, key, twice)
+}
+
+func TestCounterSeriesAddPoint(t *testing.T) {
+	cs := tsdb.NewCountSeries("test", nil)
+	cs.AddPoint(5.12)
+	cs.AddPoint(42.1394)
+	cs.AddPoint(-1) // it's discarded
+	cs.AddPoint(64.8524)
+	assert.Equal(t, 112.1118, cs.Sink.(*tsdb.CountSeries).Value())
+}
+
+func TestGaugeSeriesAddPoint(t *testing.T) {
+	gs := tsdb.NewGaugeSeries("test", nil)
+	gs.AddPoint(1)
+	gs.AddPoint(1)
+	gs.AddPoint(-5)
+	assert.Equal(t, float64(-5), gs.Sink.(*tsdb.GaugeSeries).Value())
+}
+
+func TestRateSeriesAddPoint(t *testing.T) {
+	rs := tsdb.NewRateSeries("test", nil)
+	rs.AddPoint(1)
+	rs.AddPoint(0)
+	rs.AddPoint(7)
+
+	rate, ok := rs.Sink.(*tsdb.RateSeries)
+	require.True(t, ok)
+
+	assert.Equal(t, float64(0.667), math.Round(rate.Value()*1000)/1000)
+}
+
+func TestTrendSeriesAddPoint(t *testing.T) {
+	ts := tsdb.NewTrendSeries("test", nil)
+	ts.AddPoint(3.14)
+	ts.AddPoint(2.718)
+	ts.AddPoint(1.618)
+
+	trend, ok := ts.Sink.(*tsdb.TrendSeries)
+	require.True(t, ok)
+
+	assert.Equal(t, float64(1.618), trend.Min())
+	assert.Equal(t, float64(3.14), trend.Max())
+	assert.Equal(t, float64(2.492), math.Round(trend.Avg()*1000)/1000)
+	assert.Equal(t, 2.75, trend.Med())    // 2.718
+	assert.Equal(t, 3.17, trend.P(0.90))  // 3.2244
+	assert.Equal(t, 3.185, trend.P(0.95)) // 3.2877
+	assert.Equal(t, 3.197, trend.P(0.99)) // 3.2877
+}
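+
+// TestEndToEndCounterSketch is an illustrative sketch, not part of the
+// original change: a series is created once, registered in the repository,
+// and its Sink accumulates every point pushed afterwards.
+func TestEndToEndCounterSketch(t *testing.T) {
+	r := tsdb.NewInMemoryRepository()
+	s := tsdb.NewCountSeries("http_reqs", tsdb.TagSet{{Key: "status", Value: "200"}})
+	require.NoError(t, r.InsertSeries(s))
+
+	s.AddPoint(1)
+	s.AddPoint(1)
+
+	got, err := r.GetSeriesByID(s.ID)
+	require.NoError(t, err)
+	assert.Equal(t, float64(2), got.Sink.Value())
+}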