Skip to content

Commit

Permalink
fix: fix docker exec zombies (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo authored Mar 28, 2024
1 parent 81797dd commit a3edd7e
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 51 deletions.
12 changes: 3 additions & 9 deletions cmd/containerhelper/handlers/countZombies.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,12 @@ func countZombiesHandler(_ string, resp *model.Resp) error {
if _, err := strconv.ParseInt(info.Name(), 10, 32); err != nil {
return nil
}
content, err := os.ReadFile(filepath.Join(path, "/status"))
content, err := os.ReadFile(filepath.Join(path, "/stat"))
if err != nil {
return nil
}
lines := strings.Split(string(content), "\n")
for _, line := range lines {
if strings.HasPrefix(line, "State:") {
if strings.Contains(line, "zombie") {
count++
}
break
}
if strings.Contains(string(content), ") Z 1 ") {
count++
}
if count > countLimit {
return filepath.SkipAll
Expand Down
1 change: 1 addition & 0 deletions pkg/cri/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type (
Hostname string
Runtime string
NetworkMode string
PidMode string
MergedDir string
LogPath string
Mounts []*MountPoint
Expand Down
2 changes: 1 addition & 1 deletion pkg/cri/criutils/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func CopyToContainerByChunk(ctx context.Context, i cri.Interface, c *cri.Contain
})

if err != nil {
logger.Criz("[digest] copy chunk error", zap.String("cid", c.ShortContainerID()), zap.Int("accSize", accSize), zap.Error(err))
logger.Criz("[digest] copy chunk error", zap.String("cid", c.ShortID()), zap.Int("accSize", accSize), zap.Error(err))
return err
}

Expand Down
96 changes: 70 additions & 26 deletions pkg/cri/impl/default_cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ func (e *defaultCri) chunkCpLoop() {
cost := time.Since(begin)

if err == nil {
logger.Metaz("[local] retry hack success", zap.String("cid", c.ShortContainerID()), zap.Duration("cost", cost))
logger.Metaz("[local] retry hack success", zap.String("cid", c.ShortID()), zap.Duration("cost", cost))
c.Hacked = cri.HackOk
} else {
logger.Metaz("[local] retry hack error", zap.String("cid", c.ShortContainerID()), zap.Duration("cost", cost), zap.Error(err))
logger.Metaz("[local] retry hack error", zap.String("cid", c.ShortID()), zap.Duration("cost", cost), zap.Error(err))
c.Hacked = cri.HackRetryError
}
}()
Expand Down Expand Up @@ -229,7 +229,7 @@ func (e *defaultCri) CopyToContainer(ctx context.Context, c *cri.Container, srcP
cost := time.Now().Sub(begin)
logger.Criz("[digest] copy to container",
zap.String("engine", e.engine.Type()),
zap.String("cid", c.ShortContainerID()),
zap.String("cid", c.ShortID()),
zap.String("runtime", c.Runtime),
zap.String("method", method),
zap.String("src", srcPath),
Expand All @@ -250,7 +250,7 @@ func (e *defaultCri) CopyFromContainer(ctx context.Context, c *cri.Container, sr
defer func() {
logger.Criz("[digest] copy from container",
zap.String("engine", e.engine.Type()),
zap.String("cid", c.ShortContainerID()),
zap.String("cid", c.ShortID()),
zap.String("runtime", c.Runtime),
zap.String("method", method),
zap.String("src", srcPath),
Expand Down Expand Up @@ -281,7 +281,7 @@ func (e *defaultCri) Exec(ctx context.Context, c *cri.Container, req cri.ExecReq

logger.Criz("[digest] exec",
zap.String("engine", e.engine.Type()),
zap.String("cid", c.ShortContainerID()),
zap.String("cid", c.ShortID()),
zap.String("runtime", c.Runtime),
zap.Strings("cmd", req.Cmd),
zap.Int("code", r.ExitCode),
Expand Down Expand Up @@ -512,13 +512,13 @@ func (e *defaultCri) checkHelperMd5(ctx context.Context, c *cri.Container) bool
}
if md5, err := criutils.Md5sum(ctx, e, c, core.HelperToolPath); err == nil {
logger.Metaz("[local] helper exists",
zap.String("cid", c.ShortContainerID()),
zap.String("cid", c.ShortID()),
zap.String("md5", md5),
zap.String("local-md5", e.helperToolLocalMd5sum),
)
if md5 == e.helperToolLocalMd5sum {
logger.Metaz("[local] already hack",
zap.String("cid", c.ShortContainerID()),
zap.String("cid", c.ShortID()),
zap.String("ns", c.Pod.Namespace),
zap.String("pod", c.Pod.Name))
return true
Expand Down Expand Up @@ -551,8 +551,9 @@ func (e *defaultCri) buildCriContainer(criPod *cri.Pod, dc *cri.EngineDetailCont
SandboxID: dc.SandboxId,
Runtime: dc.Runtime,
NetworkMode: dc.NetworkMode,
PidMode: dc.PidMode,
Hacked: cri.HackInit,
IsAlpine: cri.AlpineStatusUnknown,
ZombieCount: -1,
}

if dc.LogPath != "" {
Expand Down Expand Up @@ -611,27 +612,24 @@ func (e *defaultCri) buildCriContainer(criPod *cri.Pod, dc *cri.EngineDetailCont
logger.Metaz("[local] fail to get hostname",
zap.String("ns", criPod.Namespace), //
zap.String("pod", criPod.Name), //
zap.String("cid", criContainer.ShortContainerID()),
zap.String("cid", criContainer.ShortID()),
zap.Error(err))
}
}

// skip kube-system containers
if !strings.HasPrefix(criPod.Namespace, "kube-") {

// check alpine based container
if bs, err := criutils.ReadContainerFileUsingExecCat(ctx, e, criContainer, "/etc/alpine-release"); err == nil && len(bs) > 0 {
criContainer.IsAlpine = cri.AlpineStatusYes
logger.Metaz("find alpine based container", zap.String("cid", criContainer.ShortContainerID()))
} else {
if strings.Contains(err.Error(), "No such file or directory") {
criContainer.IsAlpine = cri.AlpineStatusNo
}
logger.Metaz("find alpine based container", zap.String("cid", criContainer.ShortID()))
}

// check pid 1 process
if bs, err := criutils.ReadContainerFileUsingExecCat(ctx, e, criContainer, "/proc/1/cmdline"); err == nil && len(bs) > 0 {
// separated by \0
criContainer.Pid1Name = filepath.Base(string(bytes.Split(bs, []byte{0})[0]))
logger.Metaz("find pid1Name", zap.String("cid", criContainer.ShortID()), zap.String("pid1Name", criContainer.Pid1Name))
}

alreadyExists := false
Expand All @@ -645,7 +643,7 @@ func (e *defaultCri) buildCriContainer(criPod *cri.Pod, dc *cri.EngineDetailCont
copyCount++
}

criContainer.ZombiesCount, _ = criutils.CountZombies(e, ctx, criContainer)
go e.updateZombieCheck(criContainer)

if copyCount == 2 {
alreadyExists = true
Expand All @@ -654,7 +652,7 @@ func (e *defaultCri) buildCriContainer(criPod *cri.Pod, dc *cri.EngineDetailCont

if alreadyExists {
logger.Metaz("[local] hack success",
zap.String("cid", criContainer.ShortContainerID()),
zap.String("cid", criContainer.ShortID()),
zap.String("ns", criPod.Namespace),
zap.String("pod", criPod.Name),
zap.Error(err))
Expand Down Expand Up @@ -933,9 +931,7 @@ func (e *defaultCri) buildPod(pod *v1.Pod, oldState *internalState, newState *in
cached.criContainer.Pod = criPod

if forceUpdateContainerInfo {
ctx, cancel := context.WithTimeout(context.Background(), defaultOpTimeout)
cached.criContainer.ZombiesCount, _ = criutils.CountZombies(e, ctx, cached.criContainer)
cancel()
go e.updateZombieCheck(cached.criContainer)
}

newStateLock.Lock()
Expand All @@ -947,7 +943,7 @@ func (e *defaultCri) buildPod(pod *v1.Pod, oldState *internalState, newState *in

} else {
if cached != nil {
logger.Metaz("container changed", zap.String("cid", cached.criContainer.ShortContainerID()))
logger.Metaz("container changed", zap.String("cid", cached.criContainer.ShortID()))
}
changed = true
criContainer := e.buildCriContainer(criPod, container)
Expand Down Expand Up @@ -1003,7 +999,7 @@ func (e *defaultCri) ExecAsync(ctx context.Context, c *cri.Container, req cri.Ex
}
logger.Criz("[digest] exec async",
zap.String("engine", e.engine.Type()),
zap.String("cid", c.ShortContainerID()),
zap.String("cid", c.ShortID()),
zap.String("runtime", c.Runtime),
zap.Strings("cmd", req.Cmd),
zap.Strings("env", req.Env))
Expand All @@ -1013,11 +1009,15 @@ func (e *defaultCri) ExecAsync(ctx context.Context, c *cri.Container, req cri.Ex
func (e *defaultCri) ensureHelperCopied(ctx context.Context, c *cri.Container, from string, to string) (ret error) {
begin := time.Now()
hitCache := false
var fromMd5 string
var toMd5 string
defer func() {
logger.Metaz("copy helper",
zap.String("cid", c.ShortContainerID()),
zap.String("cid", c.ShortID()),
zap.String("from", from),
zap.String("to", to),
//zap.String("fromMd5", fromMd5),
//zap.String("toMd5", toMd5),
zap.Bool("hitCache", hitCache),
zap.Duration("cost", time.Since(begin)),
zap.Error(ret),
Expand All @@ -1026,7 +1026,6 @@ func (e *defaultCri) ensureHelperCopied(ctx context.Context, c *cri.Container, f

i, ok := e.localFileMd5.Load(from)

var fromMd5 string
if !ok {
if calc, err := calcMd5(from); err == nil {
fromMd5 = calc
Expand All @@ -1043,8 +1042,9 @@ func (e *defaultCri) ensureHelperCopied(ctx context.Context, c *cri.Container, f
return nil
}

if cmd5, err := criutils.Md5sum(ctx, e, c, core.HelperToolPath); err == nil {
if fromMd5 == cmd5 {
var err error
if toMd5, err = criutils.Md5sum(ctx, e, c, to); err == nil {
if fromMd5 == toMd5 {
hitCache = true
return nil
}
Expand All @@ -1053,6 +1053,50 @@ func (e *defaultCri) ensureHelperCopied(ctx context.Context, c *cri.Container, f
return e.CopyToContainer(ctx, c, from, to)
}

// updateZombieCheck refreshes c.ZombieCount and probes whether pid 1 of the
// container reaps zombie processes, storing the verdict in
// c.Pid1CanRecycleZombieProcesses.
//
// Callers in this file start it on its own goroutine. It writes the fields of
// c without synchronization, so readers may see slightly inconsistent values
// for a short period of time; this is considered non-critical.
func (e *defaultCri) updateZombieCheck(c *cri.Container) {
	// The host pid namespace and well-known pid 1 processes are treated as
	// able to reap zombies, so no probing is performed for them.
	if c.PidMode == "host" || c.Pid1Name == "systemd" || c.Pid1Name == "init" || c.Pid1Name == "tini" {
		c.Pid1CanRecycleZombieProcesses = true
		c.ZombieCount = 0
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), defaultOpTimeout)
	defer cancel()

	baseline, err := criutils.CountZombies(e, ctx, c)
	if err != nil {
		logger.Errorz("count zombies error", zap.String("cid", c.ShortID()), zap.Error(err))
		return
	}
	c.ZombieCount = baseline
	if baseline > 0 {
		// Zombies already exist: leave the flag untouched and do not run the
		// probe below.
		return
	}

	// already checked
	if c.Pid1CanRecycleZombieProcesses {
		return
	}

	// Probe: run a short-lived command through busybox timeout, then see
	// whether it left a zombie behind.
	if _, err := e.Exec(ctx, c, cri.ExecRequest{Cmd: []string{core.BusyboxPath, "timeout", "1", "true"}}); err != nil {
		logger.Errorz("check timeout error", zap.String("cid", c.ShortID()), zap.Error(err))
		return
	}

	// We have to wait long enough for zombie processes to appear.
	// This func runs on a separate goroutine, so sleeping does not matter.
	time.Sleep(2 * time.Second)

	after, err := criutils.CountZombies(e, ctx, c)
	if err != nil {
		// Without a second measurement nothing can be concluded. Keep the
		// flag false instead of treating "count unchanged" as proof that
		// pid 1 reaps zombies.
		logger.Errorz("recount zombies error", zap.String("cid", c.ShortID()), zap.Error(err))
		return
	}
	c.ZombieCount = after

	// pid 1 reaps zombies only if the probe did not increase the count.
	c.Pid1CanRecycleZombieProcesses = after == baseline
}

func calcMd5(path string) (string, error) {
if file, err := os.Open(path); err == nil {
defer file.Close()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cri/impl/default_cri_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (e *defaultCri) registerHttpHandlers() {
continue
}
if container.Hacked == 1 || container.Hacked == 5 {
ret = append(ret, container.ShortContainerID())
ret = append(ret, container.ShortID())
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cri/impl/engine/containerd_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (e *ContainerdContainerEngine) GetContainerDetail(ctx context.Context, cid

if detail.IsSandbox {
detail.NetworkMode = "netns:" + sandboxMeta.NetNSPath
// TODO detail.PidMode
}

// I don't know how to get containerd's state dir.
Expand Down
1 change: 1 addition & 0 deletions pkg/cri/impl/engine/docker_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (e *DockerContainerEngine) GetContainerDetail(ctx context.Context, cid stri
Hostname: i.Config.Hostname,
Runtime: i.HostConfig.Runtime,
NetworkMode: string(i.HostConfig.NetworkMode),
PidMode: string(i.HostConfig.PidMode),
MergedDir: "",
Mounts: nil,
State: cri.ContainerState{
Expand Down
12 changes: 9 additions & 3 deletions pkg/cri/impl/engine/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package engine

import (
"github.com/spf13/cast"
"github.com/traas-stack/holoinsight-agent/pkg/core"
"github.com/traas-stack/holoinsight-agent/pkg/cri"
"os"
"time"
)

var (
Expand All @@ -23,14 +25,18 @@ func init() {

// wrapTimeout prefixes cmd with `busybox timeout -s KILL <seconds>` so that a
// hanging process is killed instead of never exiting.
//
// Note:
// Different busybox versions have different timeout command formats.
// In alpine-based containers, timeout will generate zombie processes, so the
// wrapper is applied only when pid 1 of the container is known to recycle
// zombie processes; otherwise cmd is returned unchanged.
func wrapTimeout(c *cri.Container, cmd []string) []string {
	if !c.Pid1CanRecycleZombieProcesses {
		return cmd
	}
	// Resulting command line: <busybox> timeout -s KILL <seconds> cmd...
	prefix := []string{core.BusyboxPath, "timeout", "-s", "KILL", timeout}
	return append(prefix, cmd...)
}

// wrapEnv wraps envs with _FROM=holoinsight-agent. This env is used to mark the source of the call.
func wrapEnv(envs []string) []string {
return append(envs, "_FROM=holoinsight-agent")
return append(envs, "_FROM=holoinsight-agent", "_TS="+cast.ToString(time.Now().UnixMilli()))
}
2 changes: 1 addition & 1 deletion pkg/cri/impl/metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func newInternalState() *internalState {

func (s *internalState) build() {
for _, c := range s.containerMap {
s.shortCidContainerMap[c.criContainer.ShortContainerID()] = c
s.shortCidContainerMap[c.criContainer.ShortID()] = c
}
for _, pod := range s.pods {
if pod.IsRunning() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cri/impl/netproxy/port_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (t *PortForwardTask) Start(ctx context.Context) (string, error) {
stopCh <- struct{}{}
}
}()
logCtx := zap.Fields(zap.String("uuid", uuid.New().String()), zap.String("cid", biz.ShortContainerID()), zap.String("listenAddr", listener.Addr().String()), zap.String("toAddr", t.Addr))
logCtx := zap.Fields(zap.String("uuid", uuid.New().String()), zap.String("cid", biz.ShortID()), zap.String("listenAddr", listener.Addr().String()), zap.String("toAddr", t.Addr))
logger.Infozo(logCtx, "[netproxy] create port forward")

go func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cri/impl/netproxy/proxy_socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (h *CriHandle) tcpHandle(s *socks5.Server, c *net.TCPConn, r *socks5.Reques

uuid2 := uuid.New().String()

logCtx := zap.Fields(zap.String("uuid", uuid2), zap.String("protocol", "socks5"), zap.String("cid", biz.ShortContainerID()), zap.String("addr", addr))
logCtx := zap.Fields(zap.String("uuid", uuid2), zap.String("protocol", "socks5"), zap.String("cid", biz.ShortID()), zap.String("addr", addr))

proxied, err := TcpProxy(logger.WithLogCtx(context.Background(), logCtx), h.Cri, biz, addr, DefaultDialTimeout)
if err != nil {
Expand Down
18 changes: 10 additions & 8 deletions pkg/cri/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,23 @@ type (
// NetworkMode
NetworkMode string

// If PidMode is "host", it means that the container uses the pid namespace of the physical machine.
PidMode string

// docker json log: https://docs.docker.com/config/containers/logging/json-file/
LogPath string

// Attributes can be used to prevent arbitrary extension fields
Attributes sync.Map

// name of pid 1
// tini systemd java python
Pid1Name string
// The number of zombie processes inside the container
ZombieCount int

// Is this container based on alpine?
IsAlpine AlpineStatus
// pid 1 process name
Pid1Name string

// The number of zombie processes inside the container
ZombiesCount int
// Whether pid 1 has the ability to recycle zombie processes
Pid1CanRecycleZombieProcesses bool
}
ContainerState struct {
Pid int
Expand Down Expand Up @@ -211,7 +213,7 @@ func (c *Container) IsRunning() bool {
return c.State.Pid > 0 && c.State.Status == "running"
}

func (c *Container) ShortContainerID() string {
func (c *Container) ShortID() string {
return ShortContainerId(c.Id)
}

Expand Down

0 comments on commit a3edd7e

Please sign in to comment.