Skip to content

Commit

Permalink
feat: ref meta from pod meta (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo authored Mar 27, 2024
1 parent 07840cf commit fa72556
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 27 deletions.
9 changes: 8 additions & 1 deletion pkg/agent/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package server

import (
"fmt"
"github.com/traas-stack/holoinsight-agent/pkg/appconfig"
"github.com/traas-stack/holoinsight-agent/pkg/core"
"net/http"
"reflect"
"runtime"
Expand Down Expand Up @@ -37,7 +39,12 @@ func (h *HttpServerComponent) Start() {
go func() {
apiHandleFuncMux.HandleFunc("/", printHelp)

addr := fmt.Sprintf("127.0.0.1:%d", bindPort)
bindIp := "127.0.0.1"
if appconfig.StdAgentConfig.Mode != core.AgentModeDaemonset {
bindIp = "0.0.0.0"
}
addr := fmt.Sprintf("%s:%d", bindIp, bindPort)

logger.Infoz("[http] start http server", zap.String("addr", addr))
h.server = &http.Server{Addr: addr, Handler: apiHandleFuncMux}
if err := h.server.ListenAndServe(); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/collectconfig/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ const (
EElectRefVar = "refVar"
)

const (
ElectRefMetaTypePodLabels = "labels"
ElectRefMetaTypePodAnnotations = "annotations"
)

type (
// 1. pre-where: 可选, 只能支持左起右至的elect
// 2. structure: 可选, 使得数据结构化
Expand Down
99 changes: 99 additions & 0 deletions pkg/pipeline/standard/meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/

package standard

import (
"github.com/traas-stack/holoinsight-agent/pkg/appconfig"
"github.com/traas-stack/holoinsight-agent/pkg/collectconfig"
"github.com/traas-stack/holoinsight-agent/pkg/collecttask"
"github.com/traas-stack/holoinsight-agent/pkg/cri"
"github.com/traas-stack/holoinsight-agent/pkg/ioc"
"github.com/traas-stack/holoinsight-agent/pkg/meta"
"github.com/traas-stack/holoinsight-agent/pkg/pipeline/integration/base"
"github.com/traas-stack/holoinsight-agent/pkg/util"
"strings"
)

func getPodMeta(task *collecttask.CollectTask, name string, pod *cri.Pod) (*cri.Pod, string) {
if pod == nil && ioc.Crii != nil {
pod, _ = ioc.Crii.GetPod(task.Target.GetNamespace(), task.Target.GetPodName())
}
if pod == nil {
return pod, ""
}
if v, ok := pod.Labels[name]; ok && v != "" {
return pod, v
}
if v, ok := pod.Annotations[name]; ok && v != "" {
return pod, v
}

ss := strings.SplitN(name, ".", 2)
if len(ss) != 2 {
return pod, ""
}
subType := ss[0]
subKey := ss[1]
switch subType {
case collectconfig.ElectRefMetaTypePodLabels:
return pod, pod.Labels[subKey]
case collectconfig.ElectRefMetaTypePodAnnotations:
return pod, pod.Annotations[subKey]
default:
return pod, ""
}
}

// createCommonTags creates common tags for this Pipeline
func createCommonTags(task *collecttask.CollectTask, baseConf *base.Conf) map[string]string {
tags := make(map[string]string, len(baseConf.RefMetas))

for key, item := range baseConf.RefMetas {
if value, ok := task.Target.Meta[item.Name]; ok && value != "" {
tags[key] = value
}
}

switch task.Target.Type {
case collecttask.TargetLocalhost:
for key, item := range baseConf.RefMetas {
if _, ok := tags[key]; ok {
continue
}
if value := getLocalhostMeta(item.Name); value != "" {
tags[key] = value
}
}
case collecttask.TargetPod:
var pod *cri.Pod
var value string
for key, item := range baseConf.RefMetas {
if _, ok := tags[key]; ok {
continue
}
if pod, value = getPodMeta(task, item.Name, pod); value != "" {
tags[key] = value
}
}
}

meta.SuppressCommonTags(tags)
return tags
}

func getLocalhostMeta(name string) string {
var value string
switch name {
case "app":
value = appconfig.StdAgentConfig.App
case "ip":
value = util.GetLocalIp()
case "host":
fallthrough
case "hostname":
value = util.GetHostname()
}
return value
}
32 changes: 6 additions & 26 deletions pkg/pipeline/standard/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/spf13/cast"
containerhelpermodel "github.com/traas-stack/holoinsight-agent/cmd/containerhelper/model"
"github.com/traas-stack/holoinsight-agent/pkg/accumulator"
"github.com/traas-stack/holoinsight-agent/pkg/appconfig"
"github.com/traas-stack/holoinsight-agent/pkg/collecttask"
"github.com/traas-stack/holoinsight-agent/pkg/collecttask/collecttaskcri"
"github.com/traas-stack/holoinsight-agent/pkg/core"
Expand Down Expand Up @@ -50,7 +49,7 @@ type (
stopCh chan struct{}
stoppedCh chan struct{}

transform base.Transform
baseConf *base.Conf

state *internalState
mutex sync.RWMutex
Expand Down Expand Up @@ -90,26 +89,7 @@ func (p *Pipeline) View(f func(api2.Pipeline)) {
}

func NewPipeline(task *collecttask.CollectTask, baseConf *base.Conf, input interface{}, output *Output) (*Pipeline, error) {
tags := make(map[string]string, len(baseConf.RefMetas))
for key, item := range baseConf.RefMetas {
value := task.Target.Meta[item.Name]
if value == "" && task.Target.Type == collecttask.TargetLocalhost {
switch item.Name {
case "app":
value = appconfig.StdAgentConfig.App
case "ip":
value = util.GetLocalIp()
case "host":
fallthrough
case "hostname":
value = util.GetHostname()
}
}
if value != "" {
tags[key] = value
}
}
meta.SuppressCommonTags(tags)
tags := createCommonTags(task, baseConf)

intervalMills := 0
offsetMills := 0
Expand Down Expand Up @@ -150,7 +130,7 @@ func NewPipeline(task *collecttask.CollectTask, baseConf *base.Conf, input inter
state: &internalState{
timer: timer,
},
transform: baseConf.Transform,
baseConf: baseConf,
scriptManager: newScriptManager(task.Key, baseConf.Transform.Scripts, task.Target.Meta),
}, nil
}
Expand Down Expand Up @@ -378,7 +358,7 @@ func (p *Pipeline) transformMetrics(metricTime time.Time, m *accumulator.Memory)

metric.Timestamp = ts

if mc, ok := p.transform.MetricConfigs[metric.Name]; ok {
if mc, ok := p.baseConf.Transform.MetricConfigs[metric.Name]; ok {
metricKey := model.BuildMetricKey(metric)
metricValueCache[metricKey] = metric.Value

Expand Down Expand Up @@ -417,11 +397,11 @@ func (p *Pipeline) transformMetrics(metricTime time.Time, m *accumulator.Memory)

m.Metrics = p.scriptManager.run(m.Metrics)

if x := p.transform.MetricPrefix; x != "" {
if x := p.baseConf.Transform.MetricPrefix; x != "" {
for _, metric := range m.Metrics {
metric.Name = x + metric.Name
}
} else if x := p.transform.MetricFormat; x != "" {
} else if x := p.baseConf.Transform.MetricFormat; x != "" {
for _, metric := range m.Metrics {
metric.Name = fmt.Sprintf(x, metric.Name)
}
Expand Down

0 comments on commit fa72556

Please sign in to comment.