PoC Time series #27 (closed; wants to merge 14 commits)
108 changes: 108 additions & 0 deletions docs/metrics-data-model.md
@@ -0,0 +1,108 @@
# k6 Time Series

We want to introduce the time series concept to k6, so that each combination of a metric and its tag set can be identified efficiently.

A summary of the benefits it brings across the platform:

* It allows faster lookups and comparisons via a single value, the ID (hash) of the series.
* It reduces storage by allocating each combination (the time series) only once and referencing it everywhere else.
* It allows aggregating data by time series.
* It makes high-cardinality issues easier to detect, simply by counting the unique time series generated.
* It enables time-series-based outputs, which simplifies the integrations' logic (e.g. Prometheus).

## Data model

#### Metrics and Series

* Metric (name, type)
* Time Series (metric, tags)
* Sample (timestamp, value, time series ref)
* Tag - (aka [k6/1831](https://github.com/grafana/k6/issues/1831))

```go
// This is the current k6 Metric, stripped of its Thresholds dependencies.
type Metric struct {
	name     string
	mtype    MetricType // `type` is a reserved word in Go, so the field needs another name
	contains ValueType
}

type Tag struct {
	Key, Value string
}

type TimeSeries struct {
	id         uint64 // hash(metric+tags)
	metricName string
	tags       []Tag
	keys       map[string]struct{}
}

type Sample struct {
	TimeSeries *TimeSeries
	Timestamp  uint64
	Value      float64
}
```

**Review discussion** on the `keys map[string]struct{}` field:

**@na--** (Jun 24, 2022) suggested:

```diff
-	keys    map[string]struct{}
+	tagKeys map[string]int
```

> or something like that, to allow for direct lookup? 🤔

**Author:**

> Hmm, I am wondering whether we should take one more step here. My idea for the tags map was something like `map[keySepValue]struct{}`, for maximum efficiency in `Contains`, but it wouldn't support fetching by `Tag.Key`.
>
> So we could consider this data structure to support a single `GetTag(key)`:
>
> ```go
> tags map[string]struct {
>     value   string
>     valhash uint64
> }
> ```
>
> This way, one data structure could support most of the operations. The only missing operation is returning the sorted `[]Tag`, e.g. for `GetTags()`; I see two potential solutions:
>
> * store just the keys in a sorted slice
> * add another field, like `order`/`index`, to the struct so we know at which position to place each `Tag` item:
>
> ```go
> func (ts TimeSeries) GetTags() []Tag {
>     s := make([]Tag, len(ts.tags))
>     for k, data := range ts.tags {
>         s[data.index] = Tag{Key: k, Value: data.value}
>     }
>     return s
> }
> ```
>
> The list of operations we should potentially consider in comparisons:
>
> ```go
> func (TimeSeries) Contains(subset []Tag) bool
> func (TimeSeries) HasKey(string) bool
> func (TimeSeries) GetTag(string) Tag
> func (TimeSeries) GetTags() []Tag
> ```
>
> @na-- Thoughts?

**Author:**

> Thinking about this some more, it doesn't really work: in `Contains` we don't have the corresponding `tag.valhash` to compare against. So I think we can just support `GetTag` via binary search 🤔

**@na--:**

> I am not sure I understand all of your concerns, or maybe I'm missing something 😕 Please correct me if I'm wrong, but we basically need to be able to efficiently access the tags both in sorted sequential order and in random-access order (i.e. lookup by key). So the basic implementation choices are these:
>
> 1. Store the tag keys and values in a sorted list (`[]Tag`) for efficient sequential access, and keep an index (`map[string]int`) by key for efficient random access.
> 2. Store the tag keys and values in a map (`map[string]string`) for efficient random access, and keep a sorted list of keys as a `[]string` slice for sequential access.
> 3. Something weirder.
>
> The first approach seems preferable to me, since the second one involves an extra map access per tag when iterating across all tags sequentially, and anything else seems like unnecessary over-engineering at this early point. 🤷‍♂️
>
> Given that these time series objects should be measured in thousands at most, not millions, I don't see why we'd want anything more complicated than this. You probably won't save all that much memory by dropping the index and using binary search, for example, since I think the strings for the keys will be reused between the slice and the map 🤷‍♂️
>
> Besides, we might be verging into bike-shedding territory here 🙂 . The actual implementation should be internal to the `TimeSeries` object, right? So I don't think it matters very much which implementation you pick; we should be able to change it at any time in the future without anything breaking.

> **Note**: Tag, and Metric without Sink+Thresholds, are shown here only as a general overview of the potential final data model. Tag is to be considered as [k6/1831](https://github.com/grafana/k6/issues/1831) and isn't expected to be addressed in the same iteration.

#### IDs

The series IDs are `uint64` values generated by hashing the metric name and the tags' key-value pairs.
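The exact hashing scheme is not pinned down by this document; a minimal sketch of the idea follows, assuming tags are sorted by key before hashing so that tag order does not change the ID. The PR's `go.mod` pulls in `github.com/cespare/xxhash/v2` for this purpose, but the sketch uses the stdlib `hash/fnv` to stay self-contained:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

type Tag struct{ Key, Value string }

// seriesID hashes the metric name plus the sorted key-value pairs,
// so the same metric+tag combination always yields the same uint64 ID.
func seriesID(metric string, tags []Tag) uint64 {
	sorted := append([]Tag(nil), tags...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Key < sorted[j].Key })

	h := fnv.New64a()
	h.Write([]byte(metric))
	for _, t := range sorted {
		h.Write([]byte{0xff}) // separators avoid ambiguous concatenations
		h.Write([]byte(t.Key))
		h.Write([]byte{0xfe})
		h.Write([]byte(t.Value))
	}
	return h.Sum64()
}

func main() {
	a := seriesID("http_req_duration", []Tag{{"status", "200"}, {"method", "GET"}})
	b := seriesID("http_req_duration", []Tag{{"method", "GET"}, {"status", "200"}})
	fmt.Println(a == b) // tag order must not matter: true
}
```

The separator bytes matter: without them, `{"ab":"c"}` and `{"a":"bc"}` would hash identically.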

#### Sink

The sinks are implemented per metric type, and they keep each series' value up to date:

* Counter: a monotonically increasing value
* Gauge: the latest value
* SparseHistogram: counters over dynamic ranges (buckets) - https://grafana.com/blog/2021/11/03/how-sparse-histograms-can-improve-efficiency-precision-and-mergeability-in-prometheus-tsdb
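The first two sink types above can be sketched with a small shared interface. The names below are illustrative, not the actual k6 API:

```go
package main

import "fmt"

// Sink is the per-series aggregation state; each metric type
// provides its own implementation (illustrative, not k6's API).
type Sink interface {
	Add(value float64)
}

// CounterSink keeps a monotonically increasing total.
type CounterSink struct{ Sum float64 }

func (c *CounterSink) Add(v float64) { c.Sum += v }

// GaugeSink keeps only the latest observed value.
type GaugeSink struct{ Last float64 }

func (g *GaugeSink) Add(v float64) { g.Last = v }

func main() {
	var c CounterSink
	var g GaugeSink
	for _, v := range []float64{1, 2, 3} {
		c.Add(v)
		g.Add(v)
	}
	fmt.Println(c.Sum, g.Last) // 6 3
}
```

A SparseHistogram sink would follow the same `Add` shape but bucket the value by a dynamic range instead of summing or overwriting it.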

## Storage

#### Time Series database (aka tsdb)

We need to store all the time series generated during a `k6` run, so that the other components (mostly the outputs) can query the storage for time series values. The storage is expected to be in-memory, and it must be concurrent-safe.

```go
type Repository interface {
	InsertSeries(TimeSeries) error
	GetSeriesByID(id uint64) (*TimeSeries, error)
}
```
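A concurrent-safe, in-memory sketch of such a repository could look like the following. The duplicate-insert and not-found error policies are assumptions, and the `TimeSeries` type is simplified here:

```go
package main

import (
	"fmt"
	"sync"
)

// TimeSeries is simplified for this sketch.
type TimeSeries struct {
	ID         uint64
	MetricName string
}

// inMemoryRepository guards its map with a RWMutex so reads
// (the common case for outputs) can proceed concurrently.
type inMemoryRepository struct {
	mu     sync.RWMutex
	series map[uint64]*TimeSeries
}

func newInMemoryRepository() *inMemoryRepository {
	return &inMemoryRepository{series: make(map[uint64]*TimeSeries)}
}

func (r *inMemoryRepository) InsertSeries(ts TimeSeries) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.series[ts.ID]; ok {
		return fmt.Errorf("series %d already exists", ts.ID)
	}
	r.series[ts.ID] = &ts
	return nil
}

func (r *inMemoryRepository) GetSeriesByID(id uint64) (*TimeSeries, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	ts, ok := r.series[id]
	if !ok {
		return nil, fmt.Errorf("series %d not found", id)
	}
	return ts, nil
}

func main() {
	repo := newInMemoryRepository()
	_ = repo.InsertSeries(TimeSeries{ID: 42, MetricName: "http_req_duration"})
	ts, _ := repo.GetSeriesByID(42)
	fmt.Println(ts.MetricName) // http_req_duration
}
```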

## Samples generation

The current sampling process is controlled by the `metrics.PushIfNotDone` method. Before pushing a new Sample, every caller should resolve the time series from the storage, creating and inserting it if none is found.
This means all the callers (e.g. executors, JavaScript modules) need a dependency on the time series database.
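The resolve-then-push flow described above amounts to a get-or-create step before each Sample is emitted. A simplified, non-concurrent sketch (the helper names are illustrative):

```go
package main

import "fmt"

type TimeSeries struct {
	ID         uint64
	MetricName string
}

type Sample struct {
	TimeSeries *TimeSeries
	Timestamp  uint64
	Value      float64
}

type repository struct{ series map[uint64]*TimeSeries }

// getOrCreate resolves a series by ID, inserting it on first sight,
// so every Sample can carry a reference instead of the full tag set.
func (r *repository) getOrCreate(id uint64, metric string) *TimeSeries {
	if ts, ok := r.series[id]; ok {
		return ts
	}
	ts := &TimeSeries{ID: id, MetricName: metric}
	r.series[id] = ts
	return ts
}

func main() {
	r := &repository{series: make(map[uint64]*TimeSeries)}
	s1 := Sample{TimeSeries: r.getOrCreate(7, "http_req_duration"), Value: 120}
	s2 := Sample{TimeSeries: r.getOrCreate(7, "http_req_duration"), Value: 95}
	fmt.Println(s1.TimeSeries == s2.TimeSeries) // both samples share one series: true
}
```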

## metrics.Ingester

The Ingester is responsible for resolving the entire set of Sinks impacted by an ingested time series, and then it adds the Sample's value to the resolved Sinks.

##### Example

At a time `t0` where the set of seen time series is:

```text
http_req_duration{status:200,method:GET}
http_req_duration{method:GET}
http_req_duration{status:200}
http_req_another_one{status:200}
```

When the Ingester collects a Sample for `http_req_duration{status:200,method:GET}`, it resolves its dependencies among the other seen time series into a unique set containing `http_req_duration{status:200}` and `http_req_duration{method:GET}`. It can then resolve the corresponding Metrics' Sinks and invoke them with the Sample's value.
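The resolution step above amounts to a tag-subset check against every seen series. A brute-force sketch follows; a real implementation would at least index series by metric name, and the function names are illustrative:

```go
package main

import "fmt"

type series struct {
	metric string
	tags   map[string]string
}

// contains reports whether every tag of sub also appears in full.
func contains(full, sub map[string]string) bool {
	for k, v := range sub {
		if full[k] != v {
			return false
		}
	}
	return true
}

// resolve returns the seen series whose metric matches the incoming
// sample's and whose tags are a subset of the incoming sample's tags.
func resolve(seen []series, incoming series) []series {
	var out []series
	for _, s := range seen {
		if s.metric == incoming.metric && contains(incoming.tags, s.tags) {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	seen := []series{
		{"http_req_duration", map[string]string{"status": "200", "method": "GET"}},
		{"http_req_duration", map[string]string{"method": "GET"}},
		{"http_req_duration", map[string]string{"status": "200"}},
		{"http_req_another_one", map[string]string{"status": "200"}},
	}
	in := series{"http_req_duration", map[string]string{"status": "200", "method": "GET"}}
	// Matches the sample's own series plus its two subset series,
	// but not the series of the other metric.
	fmt.Println(len(resolve(seen, in))) // 3
}
```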

## Known issues

* Name and URL tags: `k6` tags HTTP requests with their URL, which creates a high-cardinality issue for the time series data model. This should be fixed by making it possible to store some tags as non-indexable `metadata`, excluding them from time series generation. An alternative workaround could be to exclude them from the default set of enabled tags.
* We need to keep all the raw data for the Trend type in order to compute percentiles. We plan to migrate to some form of approximation (Sparse Histogram, OpenHistogram, T-Digest, etc.).

## Acceptance criteria

- [ ] Can the new model enable the Prometheus output integration?
- [ ] Can the new model enable cloud aggregation?
- [ ] Can the new model work with Thresholds?
- [ ] Is the memory footprint generated by the new model reduced? If not, is it acceptable?
- [ ] Is the CPU usage generated by the new model reduced? If not, is it acceptable?
6 changes: 4 additions & 2 deletions go.mod
@@ -3,9 +3,11 @@ module github.com/grafana/xk6-output-prometheus-remote
go 1.17

require (
github.com/cespare/xxhash/v2 v2.1.2
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4
github.com/kubernetes/helm v2.17.0+incompatible
github.com/openhistogram/circonusllhist v0.3.0
github.com/prometheus/common v0.32.1
github.com/prometheus/prometheus v1.8.2-0.20211005150130-f29caccc4255
github.com/sirupsen/logrus v1.8.1
@@ -18,7 +20,6 @@ require (
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 // indirect
github.com/aws/aws-sdk-go v1.40.37 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dennwc/varint v1.0.0 // indirect
Expand All @@ -28,6 +29,7 @@ require (
github.com/go-kit/log v0.1.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand All @@ -52,7 +54,7 @@ require (
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
8 changes: 6 additions & 2 deletions go.sum
@@ -681,8 +681,9 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@@ -1055,6 +1056,8 @@ github.com/opencontainers/runtime-spec v1.0.3-0.20200929063507-e6143ca7d51d/go.m
github.com/opencontainers/runtime-tools v0.0.0-20181011054405-1d69bd0f9c39/go.mod h1:r3f7wjNzSs2extwzU3Y+6pKfobzPh+kKFJ3ofN+3nfs=
github.com/opencontainers/selinux v1.6.0/go.mod h1:VVGKuOLlE7v4PJyT6h7mNWvq1rzqiriPsEqVhc+svHE=
github.com/opencontainers/selinux v1.8.0/go.mod h1:RScLhm78qiWa2gbVCcGkC7tCGdgk3ogry1nUQF8Evvo=
github.com/openhistogram/circonusllhist v0.3.0 h1:CuEawy94hKEzjhSABdqkGirl6o67QrqtRoZg3CXBn6k=
github.com/openhistogram/circonusllhist v0.3.0/go.mod h1:PfeYJ/RW2+Jfv3wTz0upbY2TRour/LLqIm2K2Kw5zg0=
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=
github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9/go.mod h1:PLldrQSroqzH70Xl+1DQcGnefIbqsKR7UDaiux3zV+w=
github.com/opentracing-contrib/go-stdlib v1.0.0 h1:TBS7YuVotp8myLon4Pv7BtCBzOTo1DeZCld0Z63mW2w=
@@ -1910,8 +1913,9 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
41 changes: 13 additions & 28 deletions pkg/remotewrite/config.go
@@ -22,9 +22,7 @@ const (
)

type Config struct {
Mapping null.String `json:"mapping" envconfig:"K6_PROMETHEUS_MAPPING"`

Url null.String `json:"url" envconfig:"K6_PROMETHEUS_REMOTE_URL"` // here, in the name of env variable, we assume that we won't need to distinguish between remote write URL vs remote read URL
URL null.String `json:"url" envconfig:"K6_PROMETHEUS_REMOTE_URL"` // here, in the name of env variable, we assume that we won't need to distinguish between remote write URL vs remote read URL

Headers map[string]string `json:"headers" envconfig:"K6_PROMETHEUS_HEADERS"`

@@ -38,21 +36,20 @@ type Config struct {

KeepTags null.Bool `json:"keepTags" envconfig:"K6_KEEP_TAGS"`
KeepNameTag null.Bool `json:"keepNameTag" envconfig:"K6_KEEP_NAME_TAG"`
KeepUrlTag null.Bool `json:"keepUrlTag" envconfig:"K6_KEEP_URL_TAG"`
KeepURLTag null.Bool `json:"keepUrlTag" envconfig:"K6_KEEP_URL_TAG"`
}

func NewConfig() Config {
return Config{
Mapping: null.StringFrom("prometheus"),
Url: null.StringFrom("http://localhost:9090/api/v1/write"),
URL: null.StringFrom("http://localhost:9090/api/v1/write"),
InsecureSkipTLSVerify: null.BoolFrom(true),
CACert: null.NewString("", false),
User: null.NewString("", false),
Password: null.NewString("", false),
FlushPeriod: types.NullDurationFrom(defaultFlushPeriod),
KeepTags: null.BoolFrom(true),
KeepNameTag: null.BoolFrom(false),
KeepUrlTag: null.BoolFrom(true),
KeepURLTag: null.BoolFrom(true),
Headers: make(map[string]string),
}
}
@@ -79,7 +76,7 @@ func (conf Config) ConstructRemoteConfig() (*remote.ClientConfig, error) {
// TODO: consider if the auth logic should be enforced here
// (e.g. if insecureSkipTLSVerify is switched off, then check for non-empty certificate file and auth, etc.)

u, err := url.Parse(conf.Url.String)
u, err := url.Parse(conf.URL.String)
if err != nil {
return nil, err
}
@@ -97,12 +94,8 @@ func (conf Config) ConstructRemoteConfig() (*remote.ClientConfig, error) {
// From here till the end of the file partial duplicates waiting for config refactor (k6 #883)

func (base Config) Apply(applied Config) Config {
if applied.Mapping.Valid {
base.Mapping = applied.Mapping
}

if applied.Url.Valid {
base.Url = applied.Url
if applied.URL.Valid {
base.URL = applied.URL
}

if applied.InsecureSkipTLSVerify.Valid {
@@ -133,8 +126,8 @@ func (base Config) Apply(applied Config) Config {
base.KeepNameTag = applied.KeepNameTag
}

if applied.KeepUrlTag.Valid {
base.KeepUrlTag = applied.KeepUrlTag
if applied.KeepURLTag.Valid {
base.KeepURLTag = applied.KeepURLTag
}

if len(applied.Headers) > 0 {
@@ -154,12 +147,8 @@ func ParseArg(arg string) (Config, error) {
return c, err
}

if v, ok := params["mapping"].(string); ok {
c.Mapping = null.StringFrom(v)
}

if v, ok := params["url"].(string); ok {
c.Url = null.StringFrom(v)
c.URL = null.StringFrom(v)
}

if v, ok := params["insecureSkipTLSVerify"].(bool); ok {
@@ -193,7 +182,7 @@ func ParseArg(arg string) (Config, error) {
}

if v, ok := params["keepUrlTag"].(bool); ok {
c.KeepUrlTag = null.BoolFrom(v)
c.KeepURLTag = null.BoolFrom(v)
}

c.Headers = make(map[string]string)
@@ -249,12 +238,8 @@ func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, a
}
}

if mapping, mappingDefined := env["K6_PROMETHEUS_MAPPING"]; mappingDefined {
result.Mapping = null.StringFrom(mapping)
}

if url, urlDefined := env["K6_PROMETHEUS_REMOTE_URL"]; urlDefined {
result.Url = null.StringFrom(url)
result.URL = null.StringFrom(url)
}

if b, err := getEnvBool(env, "K6_PROMETHEUS_INSECURE_SKIP_TLS_VERIFY"); err != nil {
@@ -298,7 +283,7 @@ func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, a
return result, err
} else {
if b.Valid {
result.KeepUrlTag = b
result.KeepURLTag = b
}
}
