From 8dfecdf99070bc70f8b99ad322c53f130c6d5ef8 Mon Sep 17 00:00:00 2001 From: xzchaoo Date: Wed, 27 Mar 2024 11:19:44 +0800 Subject: [PATCH 1/2] fix: fix queue backlog issue --- pkg/plugin/output/gateway/consumer.go | 87 +++++++++++++++------------ pkg/plugin/output/gateway/service.go | 21 ++++++- 2 files changed, 68 insertions(+), 40 deletions(-) diff --git a/pkg/plugin/output/gateway/consumer.go b/pkg/plugin/output/gateway/consumer.go index 5bc3467..67b5387 100644 --- a/pkg/plugin/output/gateway/consumer.go +++ b/pkg/plugin/output/gateway/consumer.go @@ -22,7 +22,8 @@ type ( gw *gateway.Service } batchConsumerV4 struct { - gw *gateway.Service + gw *gateway.Service + semaphore chan struct{} } result struct { Resp *pb.WriteMetricsResponse @@ -45,8 +46,8 @@ type ( ) var ( - gatewayDiscardStat = stat.DefaultManager.Counter("gateway.discard") - gatewaySendStat = stat.DefaultManager.Counter("gateway.send") + gatewayDiscardStat = stat.DefaultManager1S.Counter("gateway.discard") + gatewaySendStat = stat.DefaultManager1S.Counter("gateway.send") ) func (b *batchConsumerV1) Consume(a []interface{}) { @@ -88,8 +89,11 @@ func (b *batchConsumerV1) Consume(a []interface{}) { } func (b *batchConsumerV4) Consume(a []interface{}) { + // copy one + a = append([]interface{}(nil), a...) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() + var taskResults []*pb.WriteMetricsRequestV4_TaskResult points := 0 for _, i := range a { @@ -103,45 +107,52 @@ func (b *batchConsumerV4) Consume(a []interface{}) { } begin := time.Now() - var err error - var resp *pb.WriteMetricsResponse + b.semaphore <- struct{}{} + go func() { + defer func() { + cancel() + <-b.semaphore + }() + var err error + var resp *pb.WriteMetricsResponse - for i := 0; i < 3; i++ { - resp, err = b.gw.WriteMetrics(ctx, taskResults) - if err != nil && strings.Contains(err.Error(), "connection refused") { - time.Sleep(300 * time.Millisecond) - continue + for i := 0; i < 3; i++ { + resp, err = b.gw.WriteMetrics(ctx, taskResults) + if err != nil && strings.Contains(err.Error(), "connection refused") { + time.Sleep(300 * time.Millisecond) + continue + } + break } - break - } - cost := time.Now().Sub(begin) - if err == nil && resp.Header.Code != 0 { - err = fmt.Errorf("server error %+v", resp.Header) - } + cost := time.Now().Sub(begin) + if err == nil && resp.Header.Code != 0 { + err = fmt.Errorf("server error %+v", resp.Header) + } - if err != nil { - logger.Errorz("[gateway] write error", zap.Error(err)) - // 统计丢数据数量 - gatewayDiscardStat.Add(nil, []int64{ // - int64(len(a)), // - }) - gatewaySendStat.Add([]string{"v4", "N"}, []int64{1, int64(len(a)), int64(points), cost.Milliseconds()}) - } else { - gatewaySendStat.Add([]string{"v4", "Y"}, []int64{1, int64(len(a)), int64(points), cost.Milliseconds()}) - } + if err != nil { + logger.Errorz("[gateway] write error", zap.Error(err)) + // 统计丢数据数量 + gatewayDiscardStat.Add(nil, []int64{ // + int64(len(a)), // + }) + gatewaySendStat.Add([]string{"v4", "N"}, []int64{1, int64(len(a)), int64(points), cost.Milliseconds()}) + } else { + gatewaySendStat.Add([]string{"v4", "Y"}, []int64{1, int64(len(a)), int64(points), cost.Milliseconds()}) + } - taskResult := &result{ - Resp: resp, - Err: err, - } - for _, i := range a { - switch x := i.(type) { - case *taskV4: - if x.resultCh != nil { - x.resultCh <- taskResult - close(x.resultCh) + taskResult := &result{ + Resp: resp, + Err: err, + } + for _, i := range a { + switch x := i.(type) { + case *taskV4: + if x.resultCh != nil { + x.resultCh <- taskResult + close(x.resultCh) + } } } - } + }() } diff --git a/pkg/plugin/output/gateway/service.go b/pkg/plugin/output/gateway/service.go index 519ea49..6d538b2 100644 --- a/pkg/plugin/output/gateway/service.go +++ b/pkg/plugin/output/gateway/service.go @@ -14,13 +14,15 @@ import ( "github.com/traas-stack/holoinsight-agent/pkg/server/gateway" "github.com/traas-stack/holoinsight-agent/pkg/server/gateway/pb" "github.com/traas-stack/holoinsight-agent/pkg/util/batch" + "github.com/traas-stack/holoinsight-agent/pkg/util/stat" "sync" "time" ) const ( defaultWriteBatchPointSize = 4096 - defaultWriteQueueSize = 65536 + defaultWriteQueueSize = 4096 + defaultSemaphore = 64 defaultWriteBatchWait = 500 * time.Millisecond ) @@ -77,7 +79,7 @@ func (w *writeServiceImpl) ensureGatewayInited0() { }, defaultWriteBatchPointSize)) bpV1.Run() - bpV4 := batch.NewBatchProcessor(defaultWriteQueueSize, &batchConsumerV4{gw: gateway}, + bpV4 := batch.NewBatchProcessor(defaultWriteQueueSize, &batchConsumerV4{gw: gateway, semaphore: make(chan struct{}, defaultSemaphore)}, batch.WithMaxWaitStrategy(defaultWriteBatchWait), batch.WithItemsWeightStrategy(func(i interface{}) int { switch x := i.(type) { @@ -96,6 +98,21 @@ func (w *writeServiceImpl) ensureGatewayInited0() { w.gateway = gateway w.bpV1 = bpV1 w.bpV4 = bpV4 + + stat.DefaultManager1S.Gauge("bpv1.pending", func() []stat.GaugeSubItem { + return []stat.GaugeSubItem{ + { + Values: []int64{int64(bpV1.Num())}, + }, + } + }) + stat.DefaultManager1S.Gauge("bpv4.pending", func() []stat.GaugeSubItem { + return []stat.GaugeSubItem{ + { + Values: []int64{int64(bpV4.Num())}, + }, + } + }) } func (w *writeServiceImpl) WriteV1(ctx context.Context, req *WriteV1Request) error { From b81d520568dd501432a992409a22b2f226b07394 Mon Sep 17 00:00:00 2001 From: xzchaoo Date: Wed, 27 Mar 2024 11:19:44 +0800 Subject: [PATCH 2/2] fix: do not execute the timeout command, it will generate zombie processes --- cmd/containerhelper/handlers/all.go | 1 + cmd/containerhelper/handlers/countZombies.go | 54 +++++ cmd/containerhelper/handlers/countZombies.sh | 20 ++ cmd/containerhelper/main.go | 11 +- pkg/core/helper.go | 2 + pkg/cri/criutils/zombies.go | 28 +++ pkg/cri/impl/default_cri.go | 218 ++++++++++++++----- pkg/cri/impl/engine/containerd_engine.go | 4 +- pkg/cri/impl/engine/docker_engine.go | 4 +- pkg/cri/impl/engine/utils.go | 8 +- pkg/cri/model.go | 32 +++ 11 files changed, 315 insertions(+), 67 deletions(-) create mode 100644 cmd/containerhelper/handlers/countZombies.go create mode 100644 cmd/containerhelper/handlers/countZombies.sh create mode 100644 pkg/cri/criutils/zombies.go diff --git a/cmd/containerhelper/handlers/all.go b/cmd/containerhelper/handlers/all.go index 3ad9428..c770db3 100644 --- a/cmd/containerhelper/handlers/all.go +++ b/cmd/containerhelper/handlers/all.go @@ -19,4 +19,5 @@ func init() { model.RegisterHandler("httpProxy", httpProxyHandler) model.RegisterHandler("tcpProxy", tcpProxyHandler) model.RegisterHandler("fixout", fixOutHandler) + model.RegisterHandler("countZombies", countZombiesHandler) } diff --git a/cmd/containerhelper/handlers/countZombies.go b/cmd/containerhelper/handlers/countZombies.go new file mode 100644 index 0000000..f8da392 --- /dev/null +++ b/cmd/containerhelper/handlers/countZombies.go @@ -0,0 +1,54 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ + +package handlers + +import ( + "errors" + "github.com/traas-stack/holoinsight-agent/cmd/containerhelper/model" + "io/fs" + "os" + "path/filepath" + "strconv" + "strings" +) + +const ( + countLimit = 100 +) + +// Count the number of zombie processes +func countZombiesHandler(_ string, resp *model.Resp) error { + count := 0 + filepath.Walk("/proc", func(path string, info fs.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + return nil + } + if _, err := strconv.ParseInt(info.Name(), 10, 32); err != nil { + return nil + } + content, err := os.ReadFile(filepath.Join(path, "/status")) + if err != nil { + return nil + } + lines := strings.Split(string(content), "\n") + for _, line := range lines { + if strings.HasPrefix(line, "State:") { + if strings.Contains(line, "zombie") { + count++ + } + break + } + } + if count > countLimit { + return errors.New("SkipAll") + } + return filepath.SkipDir + }) + resp.Data = count + return nil +} diff --git a/cmd/containerhelper/handlers/countZombies.sh b/cmd/containerhelper/handlers/countZombies.sh new file mode 100644 index 0000000..bc4a0e0 --- /dev/null +++ b/cmd/containerhelper/handlers/countZombies.sh @@ -0,0 +1,20 @@ +#!/bin/sh +set -e + +# docs: count zombies processes +COUNT_LIMIT=100 + +count=0 +for i in /proc/*; do + b=`basename $i` + if [ -e "$i/status" ]; then + if grep 'State: Z (zombie)' "$i/status" >/dev/null 2>&1; then + count=`expr $count + 1` + if [ "$count" -gt "$COUNT_LIMIT" ]; then + break + fi + fi + fi +done + +echo $count diff --git a/cmd/containerhelper/main.go b/cmd/containerhelper/main.go index 0f3c832..18ef4b3 100644 --- a/cmd/containerhelper/main.go +++ b/cmd/containerhelper/main.go @@ -10,12 +10,17 @@ import ( _ "github.com/traas-stack/holoinsight-agent/cmd/containerhelper/handlers" "github.com/traas-stack/holoinsight-agent/cmd/containerhelper/model" "os" + "time" ) -// 这是一个辅助的bin, k8s场景下, 该bin会被复制到容器内, 然后 daemonset agent 会 使用 docker exec 调用该 bin 在容器内部执行一些命令(从而避免在daemonset agent 通过 切换namespace去访问) -// 入参 通过 args 和 stdin(内容是一个json) 传输 -// 出参 通过 stdout(内容是一个json) 传输 +// This is a helper binary. +// It will be copied to container. +// It will be called by docker exec API to detect certain information inside the container. func main() { + // Under normal circumstances, the helper program should not be executed for a long time. If the time is exceeded, it will be forced to exit. + time.AfterFunc(3*time.Minute, func() { + os.Exit(97) + }) var resp = &model.Resp{} defer func() { diff --git a/pkg/core/helper.go b/pkg/core/helper.go index 31ea89f..8da986d 100644 --- a/pkg/core/helper.go +++ b/pkg/core/helper.go @@ -10,7 +10,9 @@ import ( ) var HelperToolLocalPath = "/usr/local/holoinsight/agent/bin/helper" +var BusyboxLocalPath = "/usr/local/holoinsight/agent/bin/busybox" var HelperToolPath = "/tmp/holoinsight/helper" +var BusyboxPath = "/tmp/holoinsight/busybox" type ( HelperBaseResp struct { diff --git a/pkg/cri/criutils/zombies.go b/pkg/cri/criutils/zombies.go new file mode 100644 index 0000000..922e267 --- /dev/null +++ b/pkg/cri/criutils/zombies.go @@ -0,0 +1,28 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ + +package criutils + +import ( + "context" + "encoding/json" + "github.com/spf13/cast" + "github.com/traas-stack/holoinsight-agent/cmd/containerhelper/model" + "github.com/traas-stack/holoinsight-agent/pkg/core" + "github.com/traas-stack/holoinsight-agent/pkg/cri" +) + +func CountZombies(i cri.Interface, ctx context.Context, c *cri.Container) (int, error) { + r, err := i.Exec(ctx, c, cri.ExecRequest{ + Cmd: []string{core.HelperToolPath, "countZombies"}, + }) + if err != nil { + return 0, err + } + resp := &model.Resp{} + if err := json.NewDecoder(r.Stdout).Decode(resp); err != nil { + return 0, err + } + return cast.ToIntE(resp.Data) +} diff --git a/pkg/cri/impl/default_cri.go b/pkg/cri/impl/default_cri.go index 35f2e32..c724a9c 100644 --- a/pkg/cri/impl/default_cri.go +++ b/pkg/cri/impl/default_cri.go @@ -5,6 +5,7 @@ package impl import ( + "bytes" "context" "crypto/md5" "encoding/hex" @@ -17,7 +18,6 @@ import ( k8smetaextractor "github.com/traas-stack/holoinsight-agent/pkg/k8s/k8smeta/extractor" "github.com/traas-stack/holoinsight-agent/pkg/k8s/k8sutils" "github.com/traas-stack/holoinsight-agent/pkg/logger" - pb2 "github.com/traas-stack/holoinsight-agent/pkg/server/registry/pb" "github.com/traas-stack/holoinsight-agent/pkg/util" "github.com/traas-stack/holoinsight-agent/pkg/util/throttle" "github.com/txthinking/socks5" @@ -58,6 +58,8 @@ type ( chunkCpCh chan *cri.Container httpProxyServer *http.Server socks5ProxyServer *socks5.Server + localFileMd5 sync.Map + lastForceUpdateTime time.Time } ) @@ -83,13 +85,7 @@ func (e *defaultCri) Engine() cri.ContainerEngine { } func (e *defaultCri) Start() error { - if file, err := os.Open(core.HelperToolLocalPath); err == nil { - defer file.Close() - md5 := md5.New() - if _, err := io.Copy(md5, file); err == nil { - e.helperToolLocalMd5sum = hex.EncodeToString(md5.Sum(nil)) - } - } + e.helperToolLocalMd5sum, _ = calcMd5(core.HelperToolLocalPath) if err := e.defaultMetaStore.Start(); err != nil { return err @@ -556,6 +552,7 @@ func (e *defaultCri) buildCriContainer(criPod *cri.Pod, dc *cri.EngineDetailCont Runtime: dc.Runtime, NetworkMode: dc.NetworkMode, Hacked: cri.HackInit, + IsAlpine: cri.AlpineStatusUnknown, } if dc.LogPath != "" { @@ -603,13 +600,11 @@ func (e *defaultCri) buildCriContainer(criPod *cri.Pod, dc *cri.EngineDetailCont criPod.All = append(criPod.All, criContainer) - if criContainer.IsRunning() && criContainer.Hacked == cri.HackInit && criContainer.MainBiz { + if criContainer.IsRunning() && criContainer.Hacked == cri.HackInit && !criContainer.Sandbox { criContainer.Hacked = cri.HackIng var err error - e.setupTimezone(ctx, criContainer) - if criContainer.Hostname == "" { criContainer.Hostname, err = e.getHostname(criContainer) if err != nil { @@ -623,60 +618,98 @@ func (e *defaultCri) buildCriContainer(criPod *cri.Pod, dc *cri.EngineDetailCont // skip kube-system containers if !strings.HasPrefix(criPod.Namespace, "kube-") { + // check alpine based container + if bs, err := criutils.ReadContainerFileUsingExecCat(ctx, e, criContainer, "/etc/alpine-release"); err == nil && len(bs) > 0 { + criContainer.IsAlpine = cri.AlpineStatusYes + logger.Metaz("find alpine based container", zap.String("cid", criContainer.ShortContainerID())) + } else { + if strings.Contains(err.Error(), "No such file or directory") { + criContainer.IsAlpine = cri.AlpineStatusNo + } + } + + // check pid 1 process + if bs, err := criutils.ReadContainerFileUsingExecCat(ctx, e, criContainer, "/proc/1/cmdline"); err == nil && len(bs) > 0 { + // seperated by \0 + criContainer.Pid1Name = filepath.Base(string(bytes.Split(bs, []byte{0})[0])) + } + alreadyExists := false - if e.checkHelperMd5(ctx, criContainer) { + copyCount := 0 + + if err := e.ensureHelperCopied(ctx, criContainer, core.HelperToolLocalPath, core.HelperToolPath); err == nil { + copyCount++ + } + + if err := e.ensureHelperCopied(ctx, criContainer, core.BusyboxLocalPath, core.BusyboxPath); err == nil { + copyCount++ + } + + criContainer.ZombiesCount, _ = criutils.CountZombies(e, ctx, criContainer) + + if copyCount == 2 { alreadyExists = true criContainer.Hacked = cri.HackOk } - if !alreadyExists { - err = e.copyHelper(ctx, criContainer) - if err == nil { - criContainer.Hacked = cri.HackOk - logger.Metaz("[local] hack success", - zap.String("cid", criContainer.ShortContainerID()), - zap.String("ns", criPod.Namespace), - zap.String("pod", criPod.Name), - zap.Error(err)) - } else { - logger.Metaz("[local] hack error", - zap.String("cid", criContainer.ShortContainerID()), - zap.String("ns", criPod.Namespace), - zap.String("pod", criPod.Name), - zap.Error(err)) - - ioc.RegistryService.ReportEventAsync(&pb2.ReportEventRequest_Event{ - EventTimestamp: time.Now().UnixMilli(), - EventType: "DIGEST", - PayloadType: "init_container_error", - Tags: map[string]string{ - "namespace": criPod.Namespace, - "pod": criPod.Name, - "cid": criContainer.ShortContainerID(), - "agent": e.localAgentMeta.PodName(), - }, - Numbers: nil, - Strings: map[string]string{ - "err": err.Error(), - }, - Logs: nil, - Json: "", - }) - - // It makes sense to retry with a timeout - if context.DeadlineExceeded == err { - time.AfterFunc(3*time.Second, func() { - select { - case e.chunkCpCh <- criContainer: - default: - } - }) - } - } + if alreadyExists { + logger.Metaz("[local] hack success", + zap.String("cid", criContainer.ShortContainerID()), + zap.String("ns", criPod.Namespace), + zap.String("pod", criPod.Name), + zap.Error(err)) } + + //if !alreadyExists { + // err = e.copyHelper(ctx, criContainer) + // if err == nil { + // criContainer.Hacked = cri.HackOk + // logger.Metaz("[local] hack success", + // zap.String("cid", criContainer.ShortContainerID()), + // zap.String("ns", criPod.Namespace), + // zap.String("pod", criPod.Name), + // zap.Error(err)) + // } else { + // logger.Metaz("[local] hack error", + // zap.String("cid", criContainer.ShortContainerID()), + // zap.String("ns", criPod.Namespace), + // zap.String("pod", criPod.Name), + // zap.Error(err)) + // + // ioc.RegistryService.ReportEventAsync(&pb2.ReportEventRequest_Event{ + // EventTimestamp: time.Now().UnixMilli(), + // EventType: "DIGEST", + // PayloadType: "init_container_error", + // Tags: map[string]string{ + // "namespace": criPod.Namespace, + // "pod": criPod.Name, + // "cid": criContainer.ShortContainerID(), + // "agent": e.localAgentMeta.PodName(), + // }, + // Numbers: nil, + // Strings: map[string]string{ + // "err": err.Error(), + // }, + // Logs: nil, + // Json: "", + // }) + // + // // It makes sense to retry with a timeout + // if context.DeadlineExceeded == err { + // time.AfterFunc(3*time.Second, func() { + // select { + // case e.chunkCpCh <- criContainer: + // default: + // } + // }) + // } + // } + //} } else { criContainer.Hacked = cri.HackSkipped } + + e.setupTimezone(ctx, criContainer) } return criContainer @@ -722,6 +755,12 @@ func (e *defaultCri) syncOnce() { begin := time.Now() + forceUpdateContainerInfo := false + if e.lastForceUpdateTime.IsZero() || begin.Sub(e.lastForceUpdateTime) > time.Minute { + forceUpdateContainerInfo = true + e.lastForceUpdateTime = begin + } + containers, err := e.listContainers() if err != nil { return @@ -780,7 +819,7 @@ func (e *defaultCri) syncOnce() { wg.Done() }() - criPod, podExpiredContainers, podChanged, _ := e.buildPod(pod, oldState, newState, newStateLock, containersByPod) + criPod, podExpiredContainers, podChanged, _ := e.buildPod(pod, oldState, newState, newStateLock, containersByPod, forceUpdateContainerInfo) if podChanged { anyChanged = true } @@ -837,7 +876,7 @@ func (e *defaultCri) firePodChange() { } } -func (e *defaultCri) buildPod(pod *v1.Pod, oldState *internalState, newState *internalState, newStateLock *sync.Mutex, containersByPod map[string][]*cri.EngineDetailContainer) (*cri.Pod, int, bool, error) { +func (e *defaultCri) buildPod(pod *v1.Pod, oldState *internalState, newState *internalState, newStateLock *sync.Mutex, containersByPod map[string][]*cri.EngineDetailContainer, forceUpdateContainerInfo bool) (*cri.Pod, int, bool, error) { criPod := &cri.Pod{ Pod: pod, @@ -893,6 +932,12 @@ func (e *defaultCri) buildPod(pod *v1.Pod, oldState *internalState, newState *in if cached != nil && !isContainerChanged(cached.engineContainer, container) { cached.criContainer.Pod = criPod + if forceUpdateContainerInfo { + ctx, cancel := context.WithTimeout(context.Background(), defaultOpTimeout) + cached.criContainer.ZombiesCount, _ = criutils.CountZombies(e, ctx, cached.criContainer) + cancel() + } + newStateLock.Lock() newState.containerMap[container.ID] = &cachedContainer{ criContainer: cached.criContainer, @@ -964,3 +1009,60 @@ func (e *defaultCri) ExecAsync(ctx context.Context, c *cri.Container, req cri.Ex zap.Strings("env", req.Env)) return e.engine.ExecAsync(ctx, c, req) } + +func (e *defaultCri) ensureHelperCopied(ctx context.Context, c *cri.Container, from string, to string) (ret error) { + begin := time.Now() + hitCache := false + defer func() { + logger.Metaz("copy helper", + zap.String("cid", c.ShortContainerID()), + zap.String("from", from), + zap.String("to", to), + zap.Bool("hitCache", hitCache), + zap.Duration("cost", time.Since(begin)), + zap.Error(ret), + ) + }() + + i, ok := e.localFileMd5.Load(from) + + var fromMd5 string + if !ok { + if calc, err := calcMd5(from); err == nil { + fromMd5 = calc + e.localFileMd5.Store(from, fromMd5) + } else { + e.localFileMd5.Store(from, "") + return err + } + } else { + fromMd5 = i.(string) + } + + if fromMd5 == "" { + return nil + } + + if cmd5, err := criutils.Md5sum(ctx, e, c, core.HelperToolPath); err == nil { + if fromMd5 == cmd5 { + hitCache = true + return nil + } + } + + return e.CopyToContainer(ctx, c, from, to) +} + +func calcMd5(path string) (string, error) { + if file, err := os.Open(path); err == nil { + defer file.Close() + md5 := md5.New() + if _, err := io.Copy(md5, file); err == nil { + return hex.EncodeToString(md5.Sum(nil)), nil + } else { + return "", err + } + } else { + return "", err + } +} diff --git a/pkg/cri/impl/engine/containerd_engine.go b/pkg/cri/impl/engine/containerd_engine.go index 3e681e1..92c17c4 100644 --- a/pkg/cri/impl/engine/containerd_engine.go +++ b/pkg/cri/impl/engine/containerd_engine.go @@ -283,7 +283,7 @@ func (e *ContainerdContainerEngine) Exec(ctx context.Context, c *cri.Container, pspec := spec.Process pspec.Terminal = false - pspec.Args = wrapTimeout(req.Cmd) + pspec.Args = wrapTimeout(c, req.Cmd) if req.WorkingDir != "" { pspec.Cwd = req.WorkingDir } @@ -424,7 +424,7 @@ func (e *ContainerdContainerEngine) ExecAsync(ctx context.Context, c *cri.Contai pspec := spec.Process pspec.Terminal = false - pspec.Args = wrapTimeout(req.Cmd) + pspec.Args = wrapTimeout(c, req.Cmd) if req.WorkingDir != "" { pspec.Cwd = req.WorkingDir } diff --git a/pkg/cri/impl/engine/docker_engine.go b/pkg/cri/impl/engine/docker_engine.go index bab04c6..8a18530 100644 --- a/pkg/cri/impl/engine/docker_engine.go +++ b/pkg/cri/impl/engine/docker_engine.go @@ -133,7 +133,7 @@ func (e *DockerContainerEngine) Exec(ctx context.Context, c *cri.Container, req DetachKeys: "", Env: wrapEnv(req.Env), WorkingDir: req.WorkingDir, - Cmd: wrapTimeout(req.Cmd), + Cmd: wrapTimeout(c, req.Cmd), }) if err != nil { return invalidResult, err @@ -241,7 +241,7 @@ func (e *DockerContainerEngine) ExecAsync(ctx context.Context, c *cri.Container, DetachKeys: "", Env: wrapEnv(req.Env), WorkingDir: req.WorkingDir, - Cmd: wrapTimeout(hackedCmd), + Cmd: wrapTimeout(c, hackedCmd), }) if err != nil { return invalidResult, err diff --git a/pkg/cri/impl/engine/utils.go b/pkg/cri/impl/engine/utils.go index f9ab966..04d8107 100644 --- a/pkg/cri/impl/engine/utils.go +++ b/pkg/cri/impl/engine/utils.go @@ -6,6 +6,7 @@ package engine import ( "github.com/spf13/cast" + "github.com/traas-stack/holoinsight-agent/pkg/cri" "os" ) @@ -21,9 +22,12 @@ func init() { } // wrapTimeout wraps cmd with timeout -s KILL to prevent the process from hanging and not exiting for any reason. -func wrapTimeout(cmd []string) []string { +func wrapTimeout(c *cri.Container, cmd []string) []string { + // TODO Different busybox versions have different timeout command formats + // TODO In alpined based container, timeout will generate zombie processes // timeout -s KILL cmd... - return append([]string{"timeout", "-s", "KILL", timeout}, cmd...) + // return append([]string{"timeout", "-s", "KILL", timeout}, cmd...) + return cmd } // wrapEnv wraps envs with _FROM=holoinsight-agent. This env is used to mark the source of the call. diff --git a/pkg/cri/model.go b/pkg/cri/model.go index f9fdd86..de8ea43 100644 --- a/pkg/cri/model.go +++ b/pkg/cri/model.go @@ -27,6 +27,14 @@ const ( HackRetryError ) +const ( + AlpineStatusUnknown AlpineStatus = iota + // AlpineStatusYes indicates that the container is an alpine-based container + AlpineStatusYes + // AlpineStatusNo indicates that the container is not an alpine-based container + AlpineStatusNo +) + // TODO 我们推出一个规范 让用户按我们规范做 就认为它是主容器 var ( ErrMultiBiz = errors.New("multi biz containers") @@ -106,6 +114,16 @@ type ( // Attributes can be used to prevent arbitrary extension fields Attributes sync.Map + + // name of pid 1 + // tini systemd java python + Pid1Name string + + // Is this container based on alpine? + IsAlpine AlpineStatus + + // The number of zombie processes inside the container + ZombiesCount int } ContainerState struct { Pid int @@ -146,6 +164,7 @@ type ( // EnvTz is timezone name read from Env 'TZ' EnvTz string } + AlpineStatus uint8 ) // 如果有且只有一个 main biz 就直接返回 否则返回 nil 让用户自己检查去 @@ -226,3 +245,16 @@ func (r *ExecResult) SampleOutputLength(length int) (stdout string, stderr strin } return } + +func (a AlpineStatus) MarshalText() ([]byte, error) { + switch a { + case AlpineStatusYes: + return []byte("yes"), nil + case AlpineStatusNo: + return []byte("no"), nil + case AlpineStatusUnknown: + fallthrough + default: + return []byte("unknown"), nil + } +}