Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docker: use official client instead of fsouza/go-dockerclient #23966

Merged
merged 29 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
09855dd
docker: use official docker sdk instead of fsouza/go-dockerclient
pkazmierczak Sep 24, 2024
abe8716
docker: remove fsouza/go-dockerclient from unit tests
pkazmierczak Sep 24, 2024
4a6c39b
docker: go.mod and go.sum updates
pkazmierczak Sep 24, 2024
78e57b5
Update drivers/docker/utils.go
pkazmierczak Sep 25, 2024
6ffc2a8
Update drivers/docker/driver.go
pkazmierczak Sep 25, 2024
32495eb
Update drivers/docker/driver_unix_test.go
pkazmierczak Sep 25, 2024
1b39837
Update drivers/docker/driver_unix_test.go
pkazmierczak Sep 25, 2024
55f9670
remove unnecessary ContainerInspect call
pkazmierczak Sep 25, 2024
d54747a
context correction for findPauseContainer and recoverPauseContainers
pkazmierczak Sep 25, 2024
2d44b72
catch errors when decoding stats
pkazmierczak Sep 25, 2024
ee27028
stats revamp
pkazmierczak Sep 25, 2024
1bbe9cc
fix TestDockerDriver_PidsLimit
pkazmierczak Sep 25, 2024
351b18f
Apply suggestions from code review
pkazmierczak Sep 25, 2024
854ec69
addressed some of the review comments
pkazmierczak Sep 25, 2024
15ef385
stats improvement
pkazmierczak Sep 25, 2024
e13d840
missing error handling
pkazmierczak Sep 25, 2024
b70bbac
stats unit test
pkazmierczak Sep 25, 2024
5f284d8
TestDockerDriver_Stats fixes
pkazmierczak Sep 25, 2024
c75dfb2
Update drivers/docker/stats.go
pkazmierczak Sep 25, 2024
c1e02bc
review suggestiong
pkazmierczak Sep 26, 2024
5dc4caa
refactor TestDockerDriver_Stats
pkazmierczak Sep 26, 2024
b180c59
fix ctx.Err() handling in recoverPauseContainers
pkazmierczak Sep 26, 2024
62a4b9e
cl
pkazmierczak Sep 26, 2024
c47f248
fix conditional in driver.ExecTaskStreaming
pkazmierczak Sep 26, 2024
10ec432
i love races
pkazmierczak Sep 26, 2024
e0fc1a3
stats refactor
pkazmierczak Sep 26, 2024
a778f85
wip: HI
shoenig Sep 26, 2024
4d6acac
execTaskStreaming refactor
pkazmierczak Sep 26, 2024
e61408c
TestDockerDriver_PidsLimit
pkazmierczak Sep 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/23966.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
docker: Use official docker SDK instead of a 3rd party client
```
15 changes: 5 additions & 10 deletions client/testutil/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"runtime"
"testing"

docker "github.com/fsouza/go-dockerclient"
docker "github.com/docker/docker/client"
"github.com/hashicorp/nomad/testutil"
)

Expand All @@ -23,20 +23,15 @@ func DockerIsConnected(t *testing.T) bool {
return runtime.GOOS == "windows"
}

client, err := docker.NewClientFromEnv()
client, err := docker.NewClientWithOpts(docker.FromEnv, docker.WithAPIVersionNegotiation())
if err != nil {
return false
}

// Creating a client doesn't actually connect, so make sure we do something
// like call Version() on it.
env, err := client.Version()
if err != nil {
t.Logf("Failed to connect to docker daemon: %s", err)
return false
}

t.Logf("Successfully connected to docker daemon running version %s", env.Get("Version"))
// like call ClientVersion() on it.
ver := client.ClientVersion()
t.Logf("Successfully connected to docker daemon running version %s", ver)
return true
}

Expand Down
28 changes: 15 additions & 13 deletions drivers/docker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ package docker
import (
"context"
"fmt"
"io/fs"
"runtime"
"strconv"
"strings"
"time"

docker "github.com/fsouza/go-dockerclient"
containerapi "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/drivers/shared/capabilities"
"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
Expand All @@ -29,7 +31,7 @@ const (

// ContainerNotRunningError is returned by the docker daemon if the container
// is not running, yet we requested it to stop
ContainerNotRunningError = "Container not running"
ContainerNotRunningError = "is not running" // exact string is "Container %s is not running"

// pluginName is the name of the plugin
pluginName = "docker"
Expand Down Expand Up @@ -522,8 +524,8 @@ type DockerDevice struct {
CgroupPermissions string `codec:"cgroup_permissions"`
}

func (d DockerDevice) toDockerDevice() (docker.Device, error) {
dd := docker.Device{
func (d DockerDevice) toDockerDevice() (containerapi.DeviceMapping, error) {
dd := containerapi.DeviceMapping{
PathOnHost: d.HostPath,
PathInContainer: d.ContainerPath,
CgroupPermissions: d.CgroupPermissions,
Expand Down Expand Up @@ -573,41 +575,41 @@ type DockerMount struct {
TmpfsOptions DockerTmpfsOptions `codec:"tmpfs_options"`
}

func (m DockerMount) toDockerHostMount() (docker.HostMount, error) {
func (m DockerMount) toDockerHostMount() (mount.Mount, error) {
if m.Type == "" {
// for backward compatibility, as type is optional
m.Type = "volume"
}

hm := docker.HostMount{
hm := mount.Mount{
Target: m.Target,
Source: m.Source,
Type: m.Type,
Type: mount.Type(m.Type),
ReadOnly: m.ReadOnly,
}

switch m.Type {
case "volume":
vo := m.VolumeOptions
hm.VolumeOptions = &docker.VolumeOptions{
hm.VolumeOptions = &mount.VolumeOptions{
NoCopy: vo.NoCopy,
Labels: vo.Labels,
DriverConfig: docker.VolumeDriverConfig{
DriverConfig: &mount.Driver{
Name: vo.DriverConfig.Name,
Options: vo.DriverConfig.Options,
},
}
case "bind":
hm.BindOptions = &docker.BindOptions{
Propagation: m.BindOptions.Propagation,
hm.BindOptions = &mount.BindOptions{
Propagation: mount.Propagation(m.BindOptions.Propagation),
}
case "tmpfs":
if m.Source != "" {
return hm, fmt.Errorf(`invalid source, must be "" for tmpfs`)
}
hm.TempfsOptions = &docker.TempfsOptions{
hm.TmpfsOptions = &mount.TmpfsOptions{
SizeBytes: m.TmpfsOptions.SizeBytes,
Mode: m.TmpfsOptions.Mode,
Mode: fs.FileMode(m.TmpfsOptions.Mode),
}
default:
return hm, fmt.Errorf(`invalid mount type, must be "bind", "volume", "tmpfs": %q`, m.Type)
Expand Down
61 changes: 34 additions & 27 deletions drivers/docker/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ package docker

import (
"context"
"errors"
"fmt"
"io"
"regexp"
"strings"
"sync"
"time"

docker "github.com/fsouza/go-dockerclient"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/image"
"github.com/docker/docker/api/types/registry"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand Down Expand Up @@ -62,9 +67,9 @@ func (p *pullFuture) set(imageID, imageUser string, err error) {
// DockerImageClient provides the methods required to do CRUD operations on the
// Docker images
type DockerImageClient interface {
PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error
InspectImage(id string) (*docker.Image, error)
RemoveImageExtended(id string, opts docker.RemoveImageOptions) error
ImagePull(ctx context.Context, refStr string, opts image.PullOptions) (io.ReadCloser, error)
ImageInspectWithRaw(ctx context.Context, id string) (types.ImageInspect, []byte, error)
ImageRemove(ctx context.Context, id string, opts image.RemoveOptions) ([]image.DeleteResponse, error)
}

// LogEventFn is a callback which allows Drivers to emit task events.
Expand Down Expand Up @@ -136,7 +141,7 @@ func newDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {

// PullImage is used to pull an image. It returns the pulled imaged ID or an
// error that occurred during the pull
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string,
func (d *dockerCoordinator) PullImage(image string, authOptions *registry.AuthConfig, callerID string,
emitFn LogEventFn, pullTimeout, pullActivityTimeout time.Duration) (imageID, imageUser string, err error) {
// Get the future
d.imageLock.Lock()
Expand Down Expand Up @@ -171,53 +176,55 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf

// pullImageImpl is the implementation of pulling an image. The results are
// returned via the passed future
func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration,
func (d *dockerCoordinator) pullImageImpl(imageID string, authOptions *registry.AuthConfig,
pullTimeout, pullActivityTimeout time.Duration, future *pullFuture) {

defer d.clearPullLogger(image)
defer d.clearPullLogger(imageID)
// Parse the repo and tag
repo, tag := parseDockerImage(image)
repo, tag := parseDockerImage(imageID)
ctx, cancel := context.WithTimeout(context.Background(), pullTimeout)
defer cancel()

pm := newImageProgressManager(image, cancel, pullActivityTimeout, d.handlePullInactivity,
pm := newImageProgressManager(imageID, cancel, pullActivityTimeout, d.handlePullInactivity,
d.handlePullProgressReport, d.handleSlowPullProgressReport)
defer pm.stop()

pullOptions := docker.PullImageOptions{
Repository: repo,
Tag: tag,
OutputStream: pm,
RawJSONStream: true,
Context: ctx,
}

// Attempt to pull the image
var auth docker.AuthConfiguration
var auth registry.AuthConfig
if authOptions != nil {
auth = *authOptions
}

err := d.client.PullImage(pullOptions, auth)
pullOptions := image.PullOptions{RegistryAuth: auth.Auth}
reader, err := d.client.ImagePull(d.ctx, dockerImageRef(repo, tag), pullOptions)

if ctxErr := ctx.Err(); ctxErr == context.DeadlineExceeded {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
d.logger.Error("timeout pulling container", "image_ref", dockerImageRef(repo, tag))
future.set("", "", recoverablePullError(ctxErr, image))
future.set("", "", recoverablePullError(ctx.Err(), imageID))
return
}

if err != nil {
d.logger.Error("failed pulling container", "image_ref", dockerImageRef(repo, tag),
"error", err)
future.set("", "", recoverablePullError(err, image))
future.set("", "", recoverablePullError(err, imageID))
return
}

if reader != nil {
defer reader.Close()
_, err = io.Copy(pm, reader)
if err != nil && !errors.Is(err, io.EOF) {
d.logger.Error("error reading image pull progress", "error", err)
return
}
}

d.logger.Debug("docker pull succeeded", "image_ref", dockerImageRef(repo, tag))

dockerImage, err := d.client.InspectImage(image)
dockerImage, _, err := d.client.ImageInspectWithRaw(d.ctx, imageID)
if err != nil {
d.logger.Error("failed getting image id", "image_name", image, "error", err)
d.logger.Error("failed getting image id", "image_name", imageID, "error", err)
future.set("", "", recoverableErrTimeouts(err))
return
}
Expand Down Expand Up @@ -330,18 +337,18 @@ func (d *dockerCoordinator) removeImageImpl(id string, ctx context.Context) {
d.imageLock.Unlock()

for i := 0; i < 3; i++ {
err := d.client.RemoveImageExtended(id, docker.RemoveImageOptions{
_, err := d.client.ImageRemove(d.ctx, id, image.RemoveOptions{
Force: true, // necessary to GC images referenced by multiple tags
})
if err == nil {
break
}

if err == docker.ErrNoSuchImage {
if strings.Contains(err.Error(), "No such image") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are the underlying error types not public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are... https://pkg.go.dev/github.com/docker/docker@v27.1.1+incompatible/errdefs
but the tricky part is that you can never be quite sure which error type the API returns, as this isn't documented (at least not in the Go SDK). So unless you sit down for a long session with a debugger, there's no way of knowing.

I'll try to update as many checks of this sort as I can, because of course I agree it's sloppy to rely on string comparisons, but this PR has already been a lot of work, and I may end up creating a follow-up ticket to make these improvements. Then again I'm in the trenches already, might as well do it.

d.logger.Debug("unable to cleanup image, does not exist", "image_id", id)
return
}
if derr, ok := err.(*docker.Error); ok && derr.Status == 409 {
if derr, ok := err.(*types.ErrorResponse); ok && strings.Contains(derr.Error(), "Conflict") {
pkazmierczak marked this conversation as resolved.
Show resolved Hide resolved
d.logger.Debug("unable to cleanup image, still in use", "image_id", id)
return
}
Expand Down
20 changes: 11 additions & 9 deletions drivers/docker/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package docker
import (
"context"
"fmt"
"io"
"sync"
"testing"
"time"

docker "github.com/fsouza/go-dockerclient"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/image"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
Expand All @@ -35,27 +37,27 @@ func newMockImageClient(idToName map[string]string, pullDelay time.Duration) *mo
}
}

func (m *mockImageClient) PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error {
func (m *mockImageClient) ImagePull(ctx context.Context, refStr string, opts image.PullOptions) (io.ReadCloser, error) {
time.Sleep(m.pullDelay)
m.lock.Lock()
defer m.lock.Unlock()
m.pulled[opts.Repository]++
return nil
m.pulled[refStr]++
return nil, nil
}

func (m *mockImageClient) InspectImage(id string) (*docker.Image, error) {
func (m *mockImageClient) ImageInspectWithRaw(ctx context.Context, id string) (types.ImageInspect, []byte, error) {
m.lock.Lock()
defer m.lock.Unlock()
return &docker.Image{
return types.ImageInspect{
ID: m.idToName[id],
}, nil
}, []byte{}, nil
}

func (m *mockImageClient) RemoveImageExtended(id string, options docker.RemoveImageOptions) error {
func (m *mockImageClient) ImageRemove(ctx context.Context, id string, opts image.RemoveOptions) ([]image.DeleteResponse, error) {
m.lock.Lock()
defer m.lock.Unlock()
m.removed[id]++
return nil
return []image.DeleteResponse{}, nil
}

func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {
Expand Down
Loading