diff --git a/cmd/containerhelper/handlers/all.go b/cmd/containerhelper/handlers/all.go index c770db3..325b7b1 100644 --- a/cmd/containerhelper/handlers/all.go +++ b/cmd/containerhelper/handlers/all.go @@ -20,4 +20,5 @@ func init() { model.RegisterHandler("tcpProxy", tcpProxyHandler) model.RegisterHandler("fixout", fixOutHandler) model.RegisterHandler("countZombies", countZombiesHandler) + model.RegisterHandler("countThread", countThreadHandler) } diff --git a/cmd/containerhelper/handlers/countThread.go b/cmd/containerhelper/handlers/countThread.go new file mode 100644 index 0000000..762e371 --- /dev/null +++ b/cmd/containerhelper/handlers/countThread.go @@ -0,0 +1,59 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ + +package handlers + +import ( + "bufio" + "fmt" + "github.com/shirou/gopsutil/v3/process" + "github.com/traas-stack/holoinsight-agent/cmd/containerhelper/model" + "os" + "strconv" + "strings" +) + +// Count the number of thread +func countThreadHandler(_ string, resp *model.Resp) error { + pids, err := process.Pids() + if err != nil { + return err + } + + values := make(map[string]interface{}) + totalThreads := readTotalThreads(pids) + values["process_pids"] = len(pids) + values["process_threads"] = totalThreads + resp.Data = values + return nil +} + +// Only runs on Linux +// Get total threads count of OS +func readTotalThreads(pids []int32) int { + totalThreads := int64(0) + for _, pid := range pids { + path := fmt.Sprintf("/proc/%d/status", pid) + file, err := os.Open(path) + if err != nil { + continue + } else { + scanner := bufio.NewScanner(file) + scanner.Split(bufio.ScanLines) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "Threads:\t") { + x, err := strconv.ParseInt(line[len("Threads:\t"):], 10, 64) + if err != nil { + continue + } else { + totalThreads += x + } + } + } + file.Close() + } + } + return int(totalThreads) +} diff --git a/pkg/plugin/input/all/all.go b/pkg/plugin/input/all/all.go index a2f6647..21c2902 100644 --- a/pkg/plugin/input/all/all.go +++ b/pkg/plugin/input/all/all.go @@ -13,5 +13,6 @@ import ( _ "github.com/traas-stack/holoinsight-agent/pkg/plugin/input/process" _ "github.com/traas-stack/holoinsight-agent/pkg/plugin/input/swap" _ "github.com/traas-stack/holoinsight-agent/pkg/plugin/input/tcp" + _ "github.com/traas-stack/holoinsight-agent/pkg/plugin/input/thread" _ "github.com/traas-stack/holoinsight-agent/pkg/plugin/input/traffic" ) diff --git a/pkg/plugin/input/thread/init.go b/pkg/plugin/input/thread/init.go new file mode 100644 index 0000000..39e3d8c --- /dev/null +++ b/pkg/plugin/input/thread/init.go @@ -0,0 +1,19 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ + +package thread + +import ( + "github.com/traas-stack/holoinsight-agent/pkg/collecttask" + "github.com/traas-stack/holoinsight-agent/pkg/plugin/api" + "github.com/traas-stack/holoinsight-agent/pkg/plugin/input/standard/providers" +) + +func init() { + providers.RegisterInputProvider("countthreadtask", func(task *collecttask.CollectTask) (api.Input, error) { + return &input{ + task: task, + }, nil + }) +} diff --git a/pkg/plugin/input/thread/input.go b/pkg/plugin/input/thread/input.go new file mode 100644 index 0000000..e741161 --- /dev/null +++ b/pkg/plugin/input/thread/input.go @@ -0,0 +1,63 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ + +package thread + +import ( + "context" + "encoding/json" + "github.com/spf13/cast" + cm "github.com/traas-stack/holoinsight-agent/cmd/containerhelper/model" + "github.com/traas-stack/holoinsight-agent/pkg/collecttask" + "github.com/traas-stack/holoinsight-agent/pkg/core" + "github.com/traas-stack/holoinsight-agent/pkg/cri" + "github.com/traas-stack/holoinsight-agent/pkg/cri/criutils" + "github.com/traas-stack/holoinsight-agent/pkg/ioc" + "github.com/traas-stack/holoinsight-agent/pkg/logger" + "github.com/traas-stack/holoinsight-agent/pkg/model" + "github.com/traas-stack/holoinsight-agent/pkg/plugin/api" + "go.uber.org/zap" +) + +type ( + input struct { + task *collecttask.CollectTask + } +) + +func (i *input) GetDefaultPrefix() string { + return "" +} + +func (i *input) Collect(a api.Accumulator) error { + if !i.task.Target.IsTypePod() { + return nil + } + + biz, err := criutils.GetMainBizContainerE(ioc.Crii, i.task.Target.GetNamespace(), i.task.Target.GetPodName()) + if err != nil { + return err + } + + r, err := ioc.Crii.Exec(context.Background(), biz, cri.ExecRequest{ + Cmd: []string{core.HelperToolPath, "countThread"}, + }) + if err != nil { + return err + } + resp := &cm.Resp{} + if err := json.NewDecoder(r.Stdout).Decode(resp); err != nil { + return err + } + data := resp.Data.(map[string]interface{}) + logger.Infoz("test thread count ", zap.Any("result", data)) + for metric, value := range data { + a.AddMetric(&model.Metric{ + Name: metric, + Tags: make(map[string]string), + Value: cast.ToFloat64(value), + }) + } + return nil +}