Skip to content

Commit

Permalink
feat: supports using scripts to modify pipeline execution results
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo committed Mar 18, 2024
1 parent 3ccbd6b commit 9790be3
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 2 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/bep/debounce v1.2.1
github.com/containerd/containerd v1.5.13
github.com/containerd/typeurl v1.0.2
github.com/d5/tengo/v2 v2.17.0
github.com/docker/docker v20.10.14+incompatible
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153
github.com/go-kit/log v0.2.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ github.com/d2g/dhcp4 v0.0.0-20170904100407-a1d1b6c41b1c/go.mod h1:Ct2BUK8SB0YC1S
github.com/d2g/dhcp4client v1.0.0/go.mod h1:j0hNfjhrt2SxUOw55nL0ATM/z4Yt3t2Kd1mW34z5W5s=
github.com/d2g/dhcp4server v0.0.0-20181031114812-7d4a0a7f59a5/go.mod h1:Eo87+Kg/IX2hfWJfwxMzLyuSZyxSoAug2nGa1G2QAi8=
github.com/d2g/hardwareaddr v0.0.0-20190221164911-e7d9fbe030e4/go.mod h1:bMl4RjIciD2oAxI7DmWRx6gbeqrkoLqv3MV0vzNad+I=
github.com/d5/tengo/v2 v2.17.0 h1:BWUN9NoJzw48jZKiYDXDIF3QrIVZRm1uV1gTzeZ2lqM=
github.com/d5/tengo/v2 v2.17.0/go.mod h1:XRGjEs5I9jYIKTxly6HCF8oiiilk5E/RYXOZ5b0DZC8=
github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline/integration/base/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type (
// Metric black list
MetricBlacklist []string `json:"metricBlacklist,omitempty"`
MetricConfigs map[string]*MetricConfig `json:"metricConfigs,omitempty"`
Scripts []string `json:"scripts"`
}
MetricConfig struct {
// DELTA = currentValue - lastValue
Expand Down
6 changes: 5 additions & 1 deletion pkg/pipeline/standard/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type (
mutex sync.RWMutex

lastMetricValueCache map[string]float64
scriptManager *scriptManager
}
internalState struct {
timer *util.AlignedTimer
Expand Down Expand Up @@ -149,7 +150,8 @@ func NewPipeline(task *collecttask.CollectTask, baseConf *base.Conf, input inter
state: &internalState{
timer: timer,
},
transform: baseConf.Transform,
transform: baseConf.Transform,
scriptManager: newScriptManager(task.Key, baseConf.Transform.Scripts, task.Target.Meta),
}, nil
}
func (p *Pipeline) Start() error {
Expand Down Expand Up @@ -413,6 +415,8 @@ func (p *Pipeline) transformMetrics(metricTime time.Time, m *accumulator.Memory)
}
m.Metrics = keep

m.Metrics = p.scriptManager.run(m.Metrics)

if x := p.transform.MetricPrefix; x != "" {
for _, metric := range m.Metrics {
metric.Name = x + metric.Name
Expand Down
148 changes: 148 additions & 0 deletions pkg/pipeline/standard/script.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package standard

import (
"context"
"github.com/d5/tengo/v2"
"github.com/traas-stack/holoinsight-agent/pkg/logger"
"github.com/traas-stack/holoinsight-agent/pkg/model"
"github.com/traas-stack/holoinsight-agent/pkg/util"
"go.uber.org/zap"
"time"
)

type (
scriptManager struct {
key string
scripts []string
inited bool
compiledScripts []*tengo.Compiled
targetMap map[string]tengo.Object
}
)

func newScriptManager(key string, scripts []string, target map[string]string) *scriptManager {
targetMap := make(map[string]tengo.Object, len(target))
for k, v := range target {
targetMap[k] = &tengo.String{Value: v}
}
return &scriptManager{key: key, scripts: scripts, targetMap: targetMap}
}

func (m *scriptManager) ensureInited() {
if m.inited {
return
}
m.inited = true

for _, script := range m.scripts {
ts := tengo.NewScript([]byte(script))
ts.Add("result", nil)
ts.Add("tags", nil)
ts.Add("target", nil)
ts.Add("divide", DivideFunc)
if compiled, err := ts.Compile(); err == nil {
m.compiledScripts = append(m.compiledScripts, compiled)
} else {
logger.Errorz("[pipeline] fail to compile script", zap.String("key", m.key), zap.String("script", script), zap.Error(err))
}
}
}

func (m *scriptManager) run(metrics []*model.Metric) []*model.Metric {
m.ensureInited()
if len(m.compiledScripts) == 0 {
return metrics
}
var groupByTags = make(map[string][]*model.Metric)
for _, metric := range metrics {
key := util.BuildTagsKey(metric.Tags)
groupByTags[key] = append(groupByTags[key], metric)
}

var modifiedMetrics []*model.Metric

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

for _, metrics := range groupByTags {

resultMap := map[string]tengo.Object{}
for _, metric := range metrics {
resultMap[metric.Name] = &tengo.Float{Value: metric.Value}
}

tagsMap := toTengoMapStringString(metrics[0].Tags)

// execute scripts
for _, script := range m.compiledScripts {
script.Set("result", resultMap)
script.Set("tags", tagsMap)
script.Set("target", m.targetMap)
script.RunContext(ctx)
script.Set("result", nil)
script.Set("tags", nil)
script.Set("target", nil)
}

tags := toGoMapStringString(tagsMap)

for name, v := range resultMap {
f64, ok := tengo.ToFloat64(v)
if !ok {
continue
}
modifiedMetrics = append(modifiedMetrics, &model.Metric{
Name: name,
Tags: tags,
Timestamp: metrics[0].Timestamp,
Value: f64,
})
}
}
return modifiedMetrics
}

func DivideFunc(args ...tengo.Object) (tengo.Object, error) {
m := args[0]

var leftv float64
var rightv float64

{
left, err := m.IndexGet(args[1])
if err == nil {
leftv, _ = tengo.ToFloat64(left)
}
}

{
right, err := m.IndexGet(args[2])
if err == nil {
rightv, _ = tengo.ToFloat64(right)
}
}

if rightv == 0 {
return tengo.FromInterface(0.0)
}

return tengo.FromInterface(leftv / rightv)
}

func toTengoMapStringString(tags map[string]string) map[string]tengo.Object {
tagsMap := map[string]tengo.Object{}
for k, v := range tags {
tagsMap[k] = &tengo.String{Value: v}
}
return tagsMap
}

func toGoMapStringString(tagsMap map[string]tengo.Object) map[string]string {
tags := make(map[string]string, len(tagsMap))
for k, v := range tagsMap {
if s, ok := tengo.ToString(v); ok {
tags[k] = s
}
}
return tags
}
25 changes: 24 additions & 1 deletion pkg/util/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

package util

import "sync"
import (
"sort"
"strings"
"sync"
)

func CopyStringMap(m map[string]string) map[string]string {
newMap := make(map[string]string, len(m))
Expand Down Expand Up @@ -46,3 +50,22 @@ func MergeStringMapTo(a map[string]string, to map[string]string, override bool)
}
return to
}

// BuildTagsKey build string from tags
// "key1=value1 key2=value2 key3=value3"
// key1 ... keyn is sorted
func BuildTagsKey(tags map[string]string) string {
keys := make([]string, 0, len(tags))
for key := range tags {
keys = append(keys, key)
}
sort.Strings(keys)
sb := strings.Builder{}
for _, key := range keys {
sb.WriteString(key)
sb.WriteByte('=')
sb.WriteString(tags[key])
sb.WriteByte(' ')
}
return sb.String()
}

0 comments on commit 9790be3

Please sign in to comment.