Skip to content

Commit

Permalink
Merge pull request #2 from k6io/dropDependancies
Browse files Browse the repository at this point in the history
Drop dependancies
  • Loading branch information
mstoykov authored Apr 2, 2021
2 parents 2d98b43 + 37d8bb4 commit d507d01
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 12 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.16

require (
github.com/Shopify/sarama v1.16.0
github.com/influxdata/influxdb1-client v0.0.0-20190402204710-8ff2fc3824fc
github.com/kelseyhightower/envconfig v1.4.0
github.com/kubernetes/helm v2.9.0+incompatible
github.com/loadimpact/k6 v0.31.1
Expand Down
11 changes: 4 additions & 7 deletions pkg/kafka/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ import (
"github.com/sirupsen/logrus"

"github.com/loadimpact/k6/output"
jsono "github.com/loadimpact/k6/output/json"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/stats/influxdb"
)

// Collector implements the lib.Collector interface and should be used only for testing
Expand Down Expand Up @@ -118,19 +116,18 @@ func (c *Collector) formatSamples(samples stats.Samples) ([]string, error) {

switch c.Config.Format.String {
case "influxdb":
i, err := influxdb.New(c.logger, c.Config.InfluxDBConfig)
var err error
fieldKinds, err := makeInfluxdbFieldKinds(c.Config.InfluxDBConfig.TagsAsFields)
if err != nil {
return nil, err
}

metrics, err = i.Format(samples)
metrics, err = formatAsInfluxdbV1(c.logger, samples, newExtractTagsFields(fieldKinds))
if err != nil {
return nil, err
}
default:
for _, sample := range samples {
env := jsono.WrapSample(sample)
metric, err := json.Marshal(env)
metric, err := json.Marshal(wrapSample(sample))
if err != nil {
return nil, err
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"gopkg.in/guregu/null.v3"

"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats/influxdb"
)

// Config is the config for the kafka collector
Expand All @@ -43,7 +42,7 @@ type Config struct {
Format null.String `json:"format" envconfig:"K6_KAFKA_FORMAT"`
PushInterval types.NullDuration `json:"push_interval" envconfig:"K6_KAFKA_PUSH_INTERVAL"`

InfluxDBConfig influxdb.Config `json:"influxdb"`
InfluxDBConfig influxdbConfig `json:"influxdb"`
}

// config is a duplicate of ConfigFields as we can not mapstructure.Decode into
Expand All @@ -54,15 +53,15 @@ type config struct {
Format string `json:"format" mapstructure:"format" envconfig:"K6_KAFKA_FORMAT"`
PushInterval string `json:"push_interval" mapstructure:"push_interval" envconfig:"K6_KAFKA_PUSH_INTERVAL"`

InfluxDBConfig influxdb.Config `json:"influxdb" mapstructure:"influxdb"`
InfluxDBConfig influxdbConfig `json:"influxdb" mapstructure:"influxdb"`
}

// NewConfig creates a new Config instance with default values for some fields.
func NewConfig() Config {
return Config{
Format: null.StringFrom("json"),
PushInterval: types.NullDurationFrom(1 * time.Second),
InfluxDBConfig: influxdb.NewConfig(),
InfluxDBConfig: newInfluxdbConfig(),
}
}

Expand Down Expand Up @@ -96,7 +95,7 @@ func ParseArg(arg string) (Config, error) {
}

if v, ok := params["influxdb"].(map[string]interface{}); ok {
influxConfig, err := influxdb.ParseMap(v)
influxConfig, err := influxdbParseMap(v)
if err != nil {
return c, err
}
Expand Down
198 changes: 198 additions & 0 deletions pkg/kafka/format_influxdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
*
* k6 - a next-generation load testing tool
* Copyright (C) 2021 Load Impact
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package kafka

import (
"fmt"
"strconv"
"strings"

client "github.com/influxdata/influxdb1-client/v2"
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
"github.com/mitchellh/mapstructure"
"github.com/sirupsen/logrus"
)

type extractTagsToValuesFunc func(map[string]string, map[string]interface{}) map[string]interface{}

// format returns a string array of metrics in influx line-protocol
func formatAsInfluxdbV1(
logger logrus.FieldLogger, samples []stats.Sample, extractTagsToValues extractTagsToValuesFunc,
) ([]string, error) {
var metrics []string
type cacheItem struct {
tags map[string]string
values map[string]interface{}
}
cache := map[*stats.SampleTags]cacheItem{}
for _, sample := range samples {
var tags map[string]string
values := make(map[string]interface{})
if cached, ok := cache[sample.Tags]; ok {
tags = cached.tags
for k, v := range cached.values {
values[k] = v
}
} else {
tags = sample.Tags.CloneTags()
extractTagsToValues(tags, values)
cache[sample.Tags] = cacheItem{tags, values}
}
values["value"] = sample.Value
p, err := client.NewPoint(
sample.Metric.Name,
tags,
values,
sample.Time,
)
if err != nil {
logger.WithError(err).Error("InfluxDB: Couldn't make point from sample!")
return nil, err
}
metrics = append(metrics, p.String())
}

return metrics, nil
}

// FieldKind defines Enum for tag-to-field type conversion
type FieldKind int

const (
// String field (default)
String FieldKind = iota
// Int field
Int
// Float field
Float
// Bool field
Bool
)

func newExtractTagsFields(fieldKinds map[string]FieldKind) extractTagsToValuesFunc {
return func(tags map[string]string, values map[string]interface{}) map[string]interface{} {
for tag, kind := range fieldKinds {
if val, ok := tags[tag]; ok {
var v interface{}
var err error

switch kind {
case String:
v = val
case Bool:
v, err = strconv.ParseBool(val)
case Float:
v, err = strconv.ParseFloat(val, 64)
case Int:
v, err = strconv.ParseInt(val, 10, 64)
}
if err == nil {
values[tag] = v
} else {
values[tag] = val
}

delete(tags, tag)
}
}
return values
}
}

// makeFieldKinds reads the Config and returns a lookup map of tag names to
// the field type their values should be converted to.
func makeInfluxdbFieldKinds(tagsAsFields []string) (map[string]FieldKind, error) {
fieldKinds := make(map[string]FieldKind)
for _, tag := range tagsAsFields {
var fieldName, fieldType string
s := strings.SplitN(tag, ":", 2)
if len(s) == 1 {
fieldName, fieldType = s[0], "string"
} else {
fieldName, fieldType = s[0], s[1]
}

err := checkDuplicatedTypeDefinitions(fieldKinds, fieldName)
if err != nil {
return nil, err
}

switch fieldType {
case "string":
fieldKinds[fieldName] = String
case "bool":
fieldKinds[fieldName] = Bool
case "float":
fieldKinds[fieldName] = Float
case "int":
fieldKinds[fieldName] = Int
default:
return nil, fmt.Errorf("An invalid type (%s) is specified for an InfluxDB field (%s).",
fieldType, fieldName)
}
}

return fieldKinds, nil
}

func checkDuplicatedTypeDefinitions(fieldKinds map[string]FieldKind, tag string) error {
if _, found := fieldKinds[tag]; found {
return fmt.Errorf("A tag name (%s) shows up more than once in InfluxDB field type configurations.", tag)
}
return nil
}

func (c influxdbConfig) Apply(cfg influxdbConfig) influxdbConfig {
if len(cfg.TagsAsFields) > 0 {
c.TagsAsFields = cfg.TagsAsFields
}
return c
}

// ParseMap parses a map[string]interface{} into a Config
func influxdbParseMap(m map[string]interface{}) (influxdbConfig, error) {
c := influxdbConfig{}
if v, ok := m["tagsAsFields"].(string); ok {
m["tagsAsFields"] = []string{v}
}
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: types.NullDecoder,
Result: &c,
})
if err != nil {
return c, err
}

err = dec.Decode(m)
return c, err
}

type influxdbConfig struct {
TagsAsFields []string `json:"tagsAsFields,omitempty" envconfig:"K6_INFLUXDB_TAGS_AS_FIELDS"`
}

func newInfluxdbConfig() influxdbConfig {
c := influxdbConfig{
TagsAsFields: []string{"vu", "iter", "url"},
}
return c
}
60 changes: 60 additions & 0 deletions pkg/kafka/format_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
*
* k6 - a next-generation load testing tool
* Copyright (C) 2021 Load Impact
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package kafka

import (
"time"

"github.com/loadimpact/k6/stats"
)

// wrapSample is used to package a metric sample in a way that's nice to export
// to JSON.
func wrapSample(sample stats.Sample) envolope {
return envolope{
Type: "Point",
Metric: sample.Metric.Name,
Data: newJSONSample(sample),
}
}

// envolope is the data format we use to export both metrics and metric samples
// to the JSON file.
type envolope struct {
Type string `json:"type"`
Data interface{} `json:"data"`
Metric string `json:"metric,omitempty"`
}

// jsonSample is the data format for metric sample data in the JSON file.
type jsonSample struct {
Time time.Time `json:"time"`
Value float64 `json:"value"`
Tags *stats.SampleTags `json:"tags"`
}

func newJSONSample(sample stats.Sample) jsonSample {
return jsonSample{
Time: sample.Time,
Value: sample.Value,
Tags: sample.Tags,
}
}

0 comments on commit d507d01

Please sign in to comment.