Skip to content

Commit

Permalink
docker: use streaming stats collection to correct CPU stats (#24229)
Browse files Browse the repository at this point in the history
In #23966 we switched to the official Docker SDK for the `docker` driver. In the
process we refactored code around stats collection to use the "one shot" version
of stats. Unfortunately this "one shot" stats collection does not include the
`PreCPU` stats, which are the stats from the previous read. This breaks the
calculation we use to determine CPU ticks, because now we're subtracting 0 from
the current value to get the delta.

Switch back to using the streaming stats collection. Add a test that fully
exercises the `TaskStats` API.

Fixes: #24224
Ref: https://hashicorp.atlassian.net/browse/NET-11348
  • Loading branch information
tgross authored Oct 17, 2024
1 parent a22e563 commit d12128c
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 10 deletions.
3 changes: 3 additions & 0 deletions .changelog/24229.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
docker: Fixed a bug where task CPU stats were reported incorrectly
```
64 changes: 64 additions & 0 deletions drivers/docker/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/docker/docker/errdefs"
"github.com/docker/go-connections/nat"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-set/v3"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/lib/numalib"
"github.com/hashicorp/nomad/client/taskenv"
Expand Down Expand Up @@ -3228,3 +3229,66 @@ func TestDockerDriver_GroupAdd(t *testing.T) {

must.Eq(t, cfg.GroupAdd, container.HostConfig.GroupAdd)
}

// TestDockerDriver_CollectStats verifies that the TaskStats API collects stats
// periodically and that the reported CPU tick values are non-zero and differ
// from one collection interval to the next.
func TestDockerDriver_CollectStats(t *testing.T) {
	ci.Parallel(t)
	testutil.RequireLinux(t) // stats outputs are different on Windows
	testutil.DockerCompatible(t)

	// we want to generate at least some CPU usage
	args := []string{"/bin/sh", "-c", "cat /dev/urandom | base64 > /dev/null"}
	taskCfg := newTaskConfig("", args)
	task := &drivers.TaskConfig{
		ID:        uuid.Generate(),
		Name:      "nc-demo",
		AllocID:   uuid.Generate(),
		Resources: basicResources,
	}
	must.NoError(t, task.EncodeConcreteDriverConfig(&taskCfg))

	d := dockerDriverHarness(t, nil)
	plugin, ok := d.Impl().(*Driver)
	must.True(t, ok)
	plugin.compute.TotalCompute = 1000
	plugin.compute.NumCores = 1

	cleanup := d.MkAllocDir(task, true)
	defer cleanup()
	copyImage(t, task.TaskDir(), "busybox.tar")

	_, _, err := d.StartTask(task)
	must.NoError(t, err)

	defer d.DestroyTask(task.ID, true)

	// this test has to run for a while because the minimum stats interval we
	// can get from Docker is 1s
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	t.Cleanup(cancel)
	recv, err := d.TaskStats(ctx, task.ID, time.Second)
	must.NoError(t, err)

	statsReceived := 0
	tickValues := set.From([]float64{})

DONE:
	for {
		select {
		case stats, ok := <-recv:
			if !ok {
				// the stats channel was closed (e.g. because the context
				// ended); a receive on a closed channel yields a nil value,
				// so stop here rather than dereferencing it
				break DONE
			}
			statsReceived++
			ticks := stats.ResourceUsage.CpuStats.TotalTicks
			must.Greater(t, 0, ticks)
			tickValues.Insert(ticks)
			if statsReceived >= 3 {
				// 3 is plenty; stop collection and leave the loop right
				// away instead of racing the channel against ctx.Done()
				cancel()
				break DONE
			}
		case <-ctx.Done():
			break DONE
		}
	}

	// without this check the length comparison below passes vacuously when
	// no stats arrive before the timeout
	must.Greater(t, 0, statsReceived)

	// CPU stats should change with every interval, so every sample received
	// should have produced a distinct tick value
	must.Len(t, statsReceived, tickValues.Slice())
}
26 changes: 16 additions & 10 deletions drivers/docker/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func (u *usageSender) send(tru *cstructs.TaskResourceUsage) {
func (u *usageSender) close() {
u.mu.Lock()
defer u.mu.Unlock()

if u.closed {
// already closed
return
Expand Down Expand Up @@ -97,22 +96,29 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
timer, cancel := helper.NewSafeTimer(interval)
defer cancel()

// we need to use the streaming stats API here because our calculation for
// CPU usage depends on having the values from the previous read, which are
// not available in one-shot
statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, true)
if err != nil && err != io.EOF {
h.logger.Debug("error collecting stats from container", "error", err)
return
}
defer statsReader.Body.Close()

collectOnce := func() {
defer timer.Reset(interval)
statsReader, err := h.dockerClient.ContainerStatsOneShot(ctx, h.containerID)
var stats *containerapi.Stats
err := json.NewDecoder(statsReader.Body).Decode(&stats)
if err != nil && err != io.EOF {
h.logger.Debug("error collecting stats from container", "error", err)
h.logger.Debug("error decoding stats data from container", "error", err)
return
}
defer statsReader.Body.Close()

var stats containerapi.Stats
if err := json.NewDecoder(statsReader.Body).Decode(&stats); err != nil {
h.logger.Error("error decoding stats data for container", "error", err)
if stats == nil {
h.logger.Debug("error decoding stats data: stats were nil")
return
}

resourceUsage := util.DockerStatsToTaskResourceUsage(&stats, compute)
resourceUsage := util.DockerStatsToTaskResourceUsage(stats, compute)
destCh.send(resourceUsage)
}

Expand Down

0 comments on commit d12128c

Please sign in to comment.