From 9790be3c63207982791934ead3e7fed6aa19bc90 Mon Sep 17 00:00:00 2001 From: xzchaoo Date: Mon, 18 Mar 2024 11:55:54 +0800 Subject: [PATCH] feat: supports using scripts to modify pipeline execution results --- go.mod | 1 + go.sum | 2 + pkg/pipeline/integration/base/conf.go | 1 + pkg/pipeline/standard/pipeline.go | 6 +- pkg/pipeline/standard/script.go | 148 ++++++++++++++++++++++++++ pkg/util/map.go | 25 ++++- 6 files changed, 181 insertions(+), 2 deletions(-) create mode 100644 pkg/pipeline/standard/script.go diff --git a/go.mod b/go.mod index 6db81ff..7f229ed 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/bep/debounce v1.2.1 github.com/containerd/containerd v1.5.13 github.com/containerd/typeurl v1.0.2 + github.com/d5/tengo/v2 v2.17.0 github.com/docker/docker v20.10.14+incompatible github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 github.com/go-kit/log v0.2.1 diff --git a/go.sum b/go.sum index d4a99eb..bb93aa2 100644 --- a/go.sum +++ b/go.sum @@ -272,6 +272,8 @@ github.com/d2g/dhcp4 v0.0.0-20170904100407-a1d1b6c41b1c/go.mod h1:Ct2BUK8SB0YC1S github.com/d2g/dhcp4client v1.0.0/go.mod h1:j0hNfjhrt2SxUOw55nL0ATM/z4Yt3t2Kd1mW34z5W5s= github.com/d2g/dhcp4server v0.0.0-20181031114812-7d4a0a7f59a5/go.mod h1:Eo87+Kg/IX2hfWJfwxMzLyuSZyxSoAug2nGa1G2QAi8= github.com/d2g/hardwareaddr v0.0.0-20190221164911-e7d9fbe030e4/go.mod h1:bMl4RjIciD2oAxI7DmWRx6gbeqrkoLqv3MV0vzNad+I= +github.com/d5/tengo/v2 v2.17.0 h1:BWUN9NoJzw48jZKiYDXDIF3QrIVZRm1uV1gTzeZ2lqM= +github.com/d5/tengo/v2 v2.17.0/go.mod h1:XRGjEs5I9jYIKTxly6HCF8oiiilk5E/RYXOZ5b0DZC8= github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/pkg/pipeline/integration/base/conf.go b/pkg/pipeline/integration/base/conf.go index 99a6a21..379a7c1 100644 --- a/pkg/pipeline/integration/base/conf.go +++ b/pkg/pipeline/integration/base/conf.go @@ -31,6 +31,7 @@ type ( // Metric black list MetricBlacklist []string `json:"metricBlacklist,omitempty"` MetricConfigs map[string]*MetricConfig `json:"metricConfigs,omitempty"` + Scripts []string `json:"scripts"` } MetricConfig struct { // DELTA = currentValue - lastValue diff --git a/pkg/pipeline/standard/pipeline.go b/pkg/pipeline/standard/pipeline.go index 2f8eef0..f4a882a 100644 --- a/pkg/pipeline/standard/pipeline.go +++ b/pkg/pipeline/standard/pipeline.go @@ -56,6 +56,7 @@ type ( mutex sync.RWMutex lastMetricValueCache map[string]float64 + scriptManager *scriptManager } internalState struct { timer *util.AlignedTimer @@ -149,7 +150,8 @@ func NewPipeline(task *collecttask.CollectTask, baseConf *base.Conf, input inter state: &internalState{ timer: timer, }, - transform: baseConf.Transform, + transform: baseConf.Transform, + scriptManager: newScriptManager(task.Key, baseConf.Transform.Scripts, task.Target.Meta), }, nil } func (p *Pipeline) Start() error { @@ -413,6 +415,8 @@ func (p *Pipeline) transformMetrics(metricTime time.Time, m *accumulator.Memory) } m.Metrics = keep + m.Metrics = p.scriptManager.run(m.Metrics) + if x := p.transform.MetricPrefix; x != "" { for _, metric := range m.Metrics { metric.Name = x + metric.Name diff --git a/pkg/pipeline/standard/script.go b/pkg/pipeline/standard/script.go new file mode 100644 index 0000000..654aefe --- /dev/null +++ b/pkg/pipeline/standard/script.go @@ -0,0 +1,148 @@ +package standard + +import ( + "context" + "github.com/d5/tengo/v2" + "github.com/traas-stack/holoinsight-agent/pkg/logger" + "github.com/traas-stack/holoinsight-agent/pkg/model" + "github.com/traas-stack/holoinsight-agent/pkg/util" + "go.uber.org/zap" + "time" +) + +type ( + scriptManager struct { + key string + scripts []string + inited bool + compiledScripts []*tengo.Compiled + targetMap map[string]tengo.Object + } +) + +func newScriptManager(key string, scripts []string, target map[string]string) *scriptManager { + targetMap := make(map[string]tengo.Object, len(target)) + for k, v := range target { + targetMap[k] = &tengo.String{Value: v} + } + return &scriptManager{key: key, scripts: scripts, targetMap: targetMap} +} + +func (m *scriptManager) ensureInited() { + if m.inited { + return + } + m.inited = true + + for _, script := range m.scripts { + ts := tengo.NewScript([]byte(script)) + ts.Add("result", nil) + ts.Add("tags", nil) + ts.Add("target", nil) + ts.Add("divide", DivideFunc) + if compiled, err := ts.Compile(); err == nil { + m.compiledScripts = append(m.compiledScripts, compiled) + } else { + logger.Errorz("[pipeline] fail to compile script", zap.String("key", m.key), zap.String("script", script), zap.Error(err)) + } + } +} + +func (m *scriptManager) run(metrics []*model.Metric) []*model.Metric { + m.ensureInited() + if len(m.compiledScripts) == 0 { + return metrics + } + var groupByTags = make(map[string][]*model.Metric) + for _, metric := range metrics { + key := util.BuildTagsKey(metric.Tags) + groupByTags[key] = append(groupByTags[key], metric) + } + + var modifiedMetrics []*model.Metric + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + for _, metrics := range groupByTags { + + resultMap := map[string]tengo.Object{} + for _, metric := range metrics { + resultMap[metric.Name] = &tengo.Float{Value: metric.Value} + } + + tagsMap := toTengoMapStringString(metrics[0].Tags) + + // execute scripts + for _, script := range m.compiledScripts { + script.Set("result", resultMap) + script.Set("tags", tagsMap) + script.Set("target", m.targetMap) + script.RunContext(ctx) + script.Set("result", nil) + script.Set("tags", nil) + script.Set("target", nil) + } + + tags := toGoMapStringString(tagsMap) + + for name, v := range resultMap { + f64, ok := tengo.ToFloat64(v) + if !ok { + continue + } + modifiedMetrics = append(modifiedMetrics, &model.Metric{ + Name: name, + Tags: tags, + Timestamp: metrics[0].Timestamp, + Value: f64, + }) + } + } + return modifiedMetrics +} + +func DivideFunc(args ...tengo.Object) (tengo.Object, error) { + m := args[0] + + var leftv float64 + var rightv float64 + + { + left, err := m.IndexGet(args[1]) + if err == nil { + leftv, _ = tengo.ToFloat64(left) + } + } + + { + right, err := m.IndexGet(args[2]) + if err == nil { + rightv, _ = tengo.ToFloat64(right) + } + } + + if rightv == 0 { + return tengo.FromInterface(0.0) + } + + return tengo.FromInterface(leftv / rightv) +} + +func toTengoMapStringString(tags map[string]string) map[string]tengo.Object { + tagsMap := map[string]tengo.Object{} + for k, v := range tags { + tagsMap[k] = &tengo.String{Value: v} + } + return tagsMap +} + +func toGoMapStringString(tagsMap map[string]tengo.Object) map[string]string { + tags := make(map[string]string, len(tagsMap)) + for k, v := range tagsMap { + if s, ok := tengo.ToString(v); ok { + tags[k] = s + } + } + return tags +} diff --git a/pkg/util/map.go b/pkg/util/map.go index fa4e6a5..5a390f6 100644 --- a/pkg/util/map.go +++ b/pkg/util/map.go @@ -4,7 +4,11 @@ package util -import "sync" +import ( + "sort" + "strings" + "sync" +) func CopyStringMap(m map[string]string) map[string]string { newMap := make(map[string]string, len(m)) @@ -46,3 +50,22 @@ func MergeStringMapTo(a map[string]string, to map[string]string, override bool) } return to } + +// BuildTagsKey build string from tags +// "key1=value1 key2=value2 key3=value3" +// key1 ... keyn is sorted +func BuildTagsKey(tags map[string]string) string { + keys := make([]string, 0, len(tags)) + for key := range tags { + keys = append(keys, key) + } + sort.Strings(keys) + sb := strings.Builder{} + for _, key := range keys { + sb.WriteString(key) + sb.WriteByte('=') + sb.WriteString(tags[key]) + sb.WriteByte(' ') + } + return sb.String() +}