diff --git a/cmd/containerhelper/handlers/all.go b/cmd/containerhelper/handlers/all.go
index c770db3..325b7b1 100644
--- a/cmd/containerhelper/handlers/all.go
+++ b/cmd/containerhelper/handlers/all.go
@@ -20,4 +20,5 @@ func init() {
 	model.RegisterHandler("tcpProxy", tcpProxyHandler)
 	model.RegisterHandler("fixout", fixOutHandler)
 	model.RegisterHandler("countZombies", countZombiesHandler)
+	model.RegisterHandler("countThread", countThreadHandler)
 }
diff --git a/cmd/containerhelper/handlers/countThread.go b/cmd/containerhelper/handlers/countThread.go
new file mode 100644
index 0000000..762e371
--- /dev/null
+++ b/cmd/containerhelper/handlers/countThread.go
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
+ */
+
+package handlers
+
+import (
+	"bufio"
+	"fmt"
+	"github.com/shirou/gopsutil/v3/process"
+	"github.com/traas-stack/holoinsight-agent/cmd/containerhelper/model"
+	"os"
+	"strconv"
+	"strings"
+)
+
+// countThreadHandler reports how many process ids process.Pids returns and
+// how many threads those processes own in total, under the keys
+// "process_pids" and "process_threads" of resp.Data.
+func countThreadHandler(_ string, resp *model.Resp) error {
+	pids, err := process.Pids()
+	if err != nil {
+		return err
+	}
+
+	resp.Data = map[string]interface{}{
+		"process_pids":    len(pids),
+		"process_threads": readTotalThreads(pids),
+	}
+	return nil
+}
+
+// readTotalThreads sums the "Threads:" field of /proc/<pid>/status over pids.
+// Only runs on Linux. A pid whose status file cannot be opened or parsed
+// contributes 0 to the sum.
+func readTotalThreads(pids []int32) int {
+	total := int64(0)
+	for _, pid := range pids {
+		total += readProcStatusThreads(pid)
+	}
+	return int(total)
+}
+
+// readProcStatusThreads returns the value of the first parsable "Threads:"
+// line of /proc/<pid>/status, or 0 when the file cannot be opened or no such
+// line is found.
+func readProcStatusThreads(pid int32) int64 {
+	const prefix = "Threads:\t"
+
+	file, err := os.Open(fmt.Sprintf("/proc/%d/status", pid))
+	if err != nil {
+		return 0
+	}
+	// Deferred so the descriptor is released on every return path.
+	defer file.Close()
+
+	scanner := bufio.NewScanner(file)
+	for scanner.Scan() {
+		line := scanner.Text()
+		if !strings.HasPrefix(line, prefix) {
+			continue
+		}
+		threads, err := strconv.ParseInt(line[len(prefix):], 10, 64)
+		if err != nil {
+			continue
+		}
+		// Stop at the first match instead of scanning the rest of the file.
+		return threads
+	}
+	return 0
+}
diff --git 
a/pkg/collectconfig/executor/consumer_log_sub_detail.go b/pkg/collectconfig/executor/consumer_log_sub_detail.go index 9391467..1fe8513 100644 --- a/pkg/collectconfig/executor/consumer_log_sub_detail.go +++ b/pkg/collectconfig/executor/consumer_log_sub_detail.go @@ -11,6 +11,7 @@ import ( "github.com/traas-stack/holoinsight-agent/pkg/logger" "github.com/traas-stack/holoinsight-agent/pkg/model" "github.com/traas-stack/holoinsight-agent/pkg/plugin/output/gateway" + "github.com/traas-stack/holoinsight-agent/pkg/plugin/output/sls" "github.com/traas-stack/holoinsight-agent/pkg/server/gateway/pb" "github.com/traas-stack/holoinsight-agent/pkg/util" "go.uber.org/zap" @@ -42,7 +43,9 @@ func (c *detailConsumer) MaybeFlush() { }, }, Extension: map[string]string{ - "details": "1", + // for ceresdb4 + "details": "1", + "configKey": c.parent.ct.Config.Key, }, Timestamp: 0, Completeness: nil, @@ -65,7 +68,17 @@ func (c *detailConsumer) MaybeFlush() { } begin := time.Now() - err := gateway.GetWriteService().WriteV4(context.Background(), &gateway.WriteV4Request{Batch: []*pb.WriteMetricsRequestV4_TaskResult{tr}}) + + var err error + switch c.parent.task.Output.Type { + case "gateway": + err = gateway.GetWriteService().WriteV4(context.Background(), &gateway.WriteV4Request{Batch: []*pb.WriteMetricsRequestV4_TaskResult{tr}}) + case "sls": + err = sls.GetOutPut().WriteToSLS(c.parent.ct.Config.Key, c.parent.ct.Target.Key, c.table, c.parent.task.Output.Sls) + default: + logger.Warnf("detail output type %s is not support!", c.parent.task.Output.Type) + } + sendCost := time.Since(begin) logger.Infoz("detail", zap.String("key", c.parent.key), zap.Int("count", len(c.table.Rows)), zap.Duration("sendCost", sendCost), zap.Error(err)) c.table = nil diff --git a/pkg/collectconfig/executor/log-to-sls.json b/pkg/collectconfig/executor/log-to-sls.json new file mode 100644 index 0000000..0e2f629 --- /dev/null +++ b/pkg/collectconfig/executor/log-to-sls.json @@ -0,0 +1,70 @@ +{ + "json": { + "select": { 
+ }, + "from": { + "type": "log", + "log": { + "path": [ + { + "type": "sls" + } + ] + } + }, + "groupBy": { + "groups": [ + { + "name": "value", + "elect": { + "type": "line" + } + } + ], + "details": { + "enabled": true + } + }, + "window": { + "interval": 60000 + }, + "output": { + "type": "sls", + "sls": { + "endpoint": "", + "project": "", + "logstore": "", + "ak": "", + "sk": "" + } + } + }, + "collectRange": { + "type": "cloudmonitor", + "cloudmonitor": { + "table": "sls_shard", + "condition": [ + { + "endpoint": [ + "" + ], + "sk": [ + "" + ], + "project": [ + "" + ], + "ak": [ + "" + ], + "logstore": [ + "" + ] + } + ] + } + }, + "executeRule": { + "fixedRate": 0 + } +} \ No newline at end of file diff --git a/pkg/collectconfig/type.go b/pkg/collectconfig/type.go index 5f35c31..f882f98 100644 --- a/pkg/collectconfig/type.go +++ b/pkg/collectconfig/type.go @@ -89,8 +89,16 @@ type ( Interval interface{} `json:"interval"` } Output struct { - Type string `json:"type"` - Gateway *Gateway `json:"gateway"` + Type string `json:"type"` + Gateway *Gateway `json:"gateway"` + Sls *SlsConfig `json:"sls"` + } + SlsConfig struct { + Endpoint string `json:"endpoint"` + AK string `json:"ak"` + SK string `json:"sk"` + Project string `json:"project"` + Logstore string `json:"logstore"` } Gateway struct { // 用户可以覆盖, 否则默认使用 tableName diff --git a/pkg/pipeline/standard/outputs.go b/pkg/pipeline/standard/outputs.go index 294394d..3012d6a 100644 --- a/pkg/pipeline/standard/outputs.go +++ b/pkg/pipeline/standard/outputs.go @@ -11,14 +11,16 @@ import ( type ( Output struct { - Tenant string - O output.Output + Tenant string + O output.Output + ConfigKey string } ) func (o *Output) Write(metrics []*model.Metric) { oe := output.Extension{ - Tenant: o.Tenant, + Tenant: o.Tenant, + ConfigKey: o.ConfigKey, } o.O.WriteMetricsV1(metrics, oe) } diff --git a/pkg/pipeline/standard/parse.go b/pkg/pipeline/standard/parse.go index db61453..cae2e91 100644 --- a/pkg/pipeline/standard/parse.go +++ 
b/pkg/pipeline/standard/parse.go
@@ -39,8 +39,9 @@ func ParsePipeline(task *collecttask.CollectTask) (*Pipeline, error) {
 
 	tenant := task.Target.GetTenant()
 	to := &Output{
-		Tenant: tenant,
-		O:      out,
+		Tenant:    tenant,
+		ConfigKey: task.Config.Key,
+		O:         out,
 	}
 
 	return NewPipeline(task, baseConf, i, to)
diff --git a/pkg/plugin/input/all/all.go b/pkg/plugin/input/all/all.go
index a2f6647..21c2902 100644
--- a/pkg/plugin/input/all/all.go
+++ b/pkg/plugin/input/all/all.go
@@ -13,5 +13,6 @@ import (
 	_ "github.com/traas-stack/holoinsight-agent/pkg/plugin/input/process"
 	_ "github.com/traas-stack/holoinsight-agent/pkg/plugin/input/swap"
 	_ "github.com/traas-stack/holoinsight-agent/pkg/plugin/input/tcp"
+	_ "github.com/traas-stack/holoinsight-agent/pkg/plugin/input/thread"
 	_ "github.com/traas-stack/holoinsight-agent/pkg/plugin/input/traffic"
 )
diff --git a/pkg/plugin/input/thread/init.go b/pkg/plugin/input/thread/init.go
new file mode 100644
index 0000000..39e3d8c
--- /dev/null
+++ b/pkg/plugin/input/thread/init.go
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
+ */
+
+package thread
+
+import (
+	"github.com/traas-stack/holoinsight-agent/pkg/collecttask"
+	"github.com/traas-stack/holoinsight-agent/pkg/plugin/api"
+	"github.com/traas-stack/holoinsight-agent/pkg/plugin/input/standard/providers"
+)
+
+// init registers the "countthreadtask" input provider, which builds an input
+// bound to the given collect task.
+func init() {
+	providers.RegisterInputProvider("countthreadtask", func(task *collecttask.CollectTask) (api.Input, error) {
+		return &input{
+			task: task,
+		}, nil
+	})
+}
diff --git a/pkg/plugin/input/thread/input.go b/pkg/plugin/input/thread/input.go
new file mode 100644
index 0000000..599c74e
--- /dev/null
+++ b/pkg/plugin/input/thread/input.go
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
+ */
+
+package thread
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/spf13/cast"
+	cm "github.com/traas-stack/holoinsight-agent/cmd/containerhelper/model"
+	"github.com/traas-stack/holoinsight-agent/pkg/collecttask"
+	"github.com/traas-stack/holoinsight-agent/pkg/core"
+	"github.com/traas-stack/holoinsight-agent/pkg/cri"
+	"github.com/traas-stack/holoinsight-agent/pkg/cri/criutils"
+	"github.com/traas-stack/holoinsight-agent/pkg/ioc"
+	"github.com/traas-stack/holoinsight-agent/pkg/model"
+	"github.com/traas-stack/holoinsight-agent/pkg/plugin/api"
+)
+
+type (
+	// input collects the metrics printed by the helper tool's "countThread"
+	// command for the target of task.
+	input struct {
+		task *collecttask.CollectTask
+	}
+)
+
+// GetDefaultPrefix returns an empty prefix.
+func (i *input) GetDefaultPrefix() string {
+	return ""
+}
+
+// Collect runs the helper tool's "countThread" command in the main biz
+// container of the target pod and reports every entry of the returned data map
+// as a metric named after its key. Targets that are not pods are skipped
+// without error.
+func (i *input) Collect(a api.Accumulator) error {
+	if !i.task.Target.IsTypePod() {
+		return nil
+	}
+
+	biz, err := criutils.GetMainBizContainerE(ioc.Crii, i.task.Target.GetNamespace(), i.task.Target.GetPodName())
+	if err != nil {
+		return err
+	}
+
+	r, err := ioc.Crii.Exec(context.Background(), biz, cri.ExecRequest{
+		Cmd: []string{core.HelperToolPath, "countThread"},
+	})
+	if err != nil {
+		return err
+	}
+	resp := &cm.Resp{}
+	if err := json.NewDecoder(r.Stdout).Decode(resp); err != nil {
+		return err
+	}
+	// resp.Data holds whatever JSON value the helper printed, so check its
+	// shape rather than panic on a failed type assertion.
+	data, ok := resp.Data.(map[string]interface{})
+	if !ok {
+		return fmt.Errorf("countThread: unexpected data type %T", resp.Data)
+	}
+	for metric, value := range data {
+		a.AddMetric(&model.Metric{
+			Name:  metric,
+			Tags:  make(map[string]string),
+			Value: cast.ToFloat64(value),
+		})
+	}
+	return nil
+}
diff --git a/pkg/plugin/output/all/all.go b/pkg/plugin/output/all/all.go
index d54b5df..e8f08ab 100644
--- a/pkg/plugin/output/all/all.go
+++ b/pkg/plugin/output/all/all.go
@@ -7,4 +7,5 @@ package all
 import (
 	_ "github.com/traas-stack/holoinsight-agent/pkg/plugin/output/console"
 	_ "github.com/traas-stack/holoinsight-agent/pkg/plugin/output/gateway"
+	_ "github.com/traas-stack/holoinsight-agent/pkg/plugin/output/sls"
 )
diff --git a/pkg/plugin/output/gateway/gateway.go b/pkg/plugin/output/gateway/gateway.go
index 4aa91a7..ffaa295 100644
--- 
a/pkg/plugin/output/gateway/gateway.go
+++ b/pkg/plugin/output/gateway/gateway.go
@@ -46,13 +46,14 @@ func (c *gatewayOutput) WriteMetricsV1(metrics []*model.Metric, oe output.Extens
 		NoMerge: false,
 	}
 
+	request.Extension = make(map[string]string)
 	if oe.Tenant != "" {
-
-		request.Extension = map[string]string{
-			"tenant": oe.Tenant,
-		}
+		request.Extension["tenant"] = oe.Tenant
 		request.NoMerge = true
 	}
+	if oe.ConfigKey != "" {
+		request.Extension["configKey"] = oe.ConfigKey
+	}
 
 	err := GetWriteService().WriteV1(context.Background(), request)
 	if err != nil {
diff --git a/pkg/plugin/output/output.go b/pkg/plugin/output/output.go
index fc726c6..636aa4e 100644
--- a/pkg/plugin/output/output.go
+++ b/pkg/plugin/output/output.go
@@ -8,7 +8,8 @@ import "github.com/traas-stack/holoinsight-agent/pkg/model"
 
 type (
 	Extension struct {
-		Tenant string
+		Tenant    string
+		ConfigKey string
 	}
 	Output interface {
 		WriteMetricsV1([]*model.Metric, Extension)
diff --git a/pkg/plugin/output/sls/init.go b/pkg/plugin/output/sls/init.go
new file mode 100644
index 0000000..b96554a
--- /dev/null
+++ b/pkg/plugin/output/sls/init.go
@@ -0,0 +1,13 @@
+/*
+ * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
+ */
+
+package sls
+
+import "github.com/traas-stack/holoinsight-agent/pkg/plugin/output"
+
+func init() {
+	output.Register("sls", func(config output.Config) (output.Output, error) {
+		return NewSLSOutput()
+	})
+}
diff --git a/pkg/plugin/output/sls/sls.go b/pkg/plugin/output/sls/sls.go
new file mode 100644
index 0000000..db6076d
--- /dev/null
+++ b/pkg/plugin/output/sls/sls.go
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
+ */
+
+package sls
+
+import (
+	"fmt"
+	aliyunsls "github.com/aliyun/aliyun-log-go-sdk"
+	"github.com/traas-stack/holoinsight-agent/pkg/collectconfig"
+	"github.com/traas-stack/holoinsight-agent/pkg/logger"
+	"github.com/traas-stack/holoinsight-agent/pkg/model"
+	"github.com/traas-stack/holoinsight-agent/pkg/plugin/output"
+	"github.com/traas-stack/holoinsight-agent/pkg/util"
+	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
+	"net/http"
+	"sync"
+	"time"
+)
+
+var (
+	dnsCacheHelper *util.DnsCacheHelper
+	slsHttpclient  *http.Client
+	slsOutput      = &SLSOutput{clientCache: make(map[string]*aliyunsls.Client)}
+)
+
+func init() {
+	dnsCacheHelper = util.NewDnsCacheHelper()
+	dnsCacheHelper.Start()
+	slsHttpclient = dnsCacheHelper.NewHttpClient()
+}
+
+type (
+	// SLSOutput writes detail tables to SLS logstores through cached clients.
+	SLSOutput struct {
+		// mu guards clientCache: slsOutput is a package-level singleton.
+		mu          sync.Mutex
+		clientCache map[string]*aliyunsls.Client
+	}
+)
+
+// GetOutPut returns the shared SLSOutput instance.
+func GetOutPut() *SLSOutput {
+	return slsOutput
+}
+
+// WriteMetricsV1 is a no-op.
+func (c *SLSOutput) WriteMetricsV1(metrics []*model.Metric, extension output.Extension) {
+}
+
+// NewSLSOutput returns the shared SLSOutput instance as an output.Output.
+func NewSLSOutput() (output.Output, error) {
+	return slsOutput, nil
+}
+
+// WriteBatchAsync is a no-op that always returns nil.
+func (c *SLSOutput) WriteBatchAsync(configKey, targetKey, metricName string, array []*model.DetailData) error {
+	return nil
+}
+
+// WriteBatchV4 is a no-op that always returns nil.
+func (c *SLSOutput) WriteBatchV4(configKey, targetKey, metricName string, array []*model.DetailData, _ *output.PeriodCompleteness) error {
+	return nil
+}
+
+// Start is a no-op.
+func (c *SLSOutput) Start() {
+}
+
+// Stop is a no-op.
+func (c *SLSOutput) Stop() {
+}
+
+// WriteToSLS sends every row of table to the project and logstore named in
+// slsConfig as one log group. Each row becomes a log whose contents pair the
+// table's tag keys with the row's tag values. It returns nil without sending
+// anything when slsConfig or table is nil or the table has no rows.
+func (c *SLSOutput) WriteToSLS(configKey, targetKey string, table *model.Table, slsConfig *collectconfig.SlsConfig) error {
+	if slsConfig == nil || table == nil || len(table.Rows) == 0 {
+		return nil
+	}
+	client := c.getSLSClient(slsConfig)
+
+	now := uint32(time.Now().Unix())
+	logGroup := &aliyunsls.LogGroup{
+		Logs: make([]*aliyunsls.Log, 0, len(table.Rows)),
+	}
+	for _, row := range table.Rows {
+		contents := make([]*aliyunsls.LogContent, 0, len(row.TagValues))
+		for i, value := range row.TagValues {
+			if i >= len(table.Header.TagKeys) {
+				// A value without a matching header key cannot be named;
+				// drop it rather than index out of range.
+				break
+			}
+			contents = append(contents, &aliyunsls.LogContent{
+				Key:   proto.String(table.Header.TagKeys[i]),
+				Value: proto.String(value),
+			})
+		}
+		logGroup.Logs = append(logGroup.Logs, &aliyunsls.Log{
+			Time:     proto.Uint32(now),
+			Contents: contents,
+		})
+	}
+
+	// Send the group once, after all rows are added. Sending inside the loop
+	// would re-upload every earlier row together with each new one.
+	if err := client.PutLogs(slsConfig.Project, slsConfig.Logstore, logGroup); err != nil {
+		logger.Errorz("detail log write to sls error", zap.String("configKey", configKey), zap.String("targetKey", targetKey), zap.Error(err))
+		return err
+	}
+
+	logger.Infoz("detail log write to sls success", zap.String("configKey", configKey), zap.String("targetKey", targetKey), zap.Int("count", len(table.Rows)))
+	return nil
+}
+
+// buildKey returns the client cache key for an endpoint/project/logstore triple.
+func (c *SLSOutput) buildKey(endpoint, project, logstore string) string {
+	return fmt.Sprintf("%s/%s/%s", endpoint, project, logstore)
+}
+
+// getSLSClient returns the cached client for slsConfig's endpoint, project and
+// logstore, creating and caching a new one when none exists or when the cached
+// client was built with different credentials.
+func (c *SLSOutput) getSLSClient(slsConfig *collectconfig.SlsConfig) *aliyunsls.Client {
+	key := c.buildKey(slsConfig.Endpoint, slsConfig.Project, slsConfig.Logstore)
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if cached, ok := c.clientCache[key]; ok &&
+		cached.AccessKeyID == slsConfig.AK && cached.AccessKeySecret == slsConfig.SK {
+		return cached
+	}
+	client := &aliyunsls.Client{
+		Endpoint:        slsConfig.Endpoint,
+		AccessKeyID:     slsConfig.AK,
+		AccessKeySecret: slsConfig.SK,
+		RequestTimeOut:  5 * time.Second,
+		RetryTimeOut:    5 * time.Second,
+		HTTPClient:      slsHttpclient,
+	}
+	c.clientCache[key] = client
+	return client
+}