diff --git a/pkg/converter/converter.go b/pkg/converter/converter.go
index fae1d97a..fde5bf2b 100644
--- a/pkg/converter/converter.go
+++ b/pkg/converter/converter.go
@@ -88,7 +88,7 @@ func (cvt *Converter) pull(ctx context.Context, source string) error {
 	return nil
 }
 
-func (cvt *Converter) Convert(ctx context.Context, source, target string) (*Metric, error) {
+func (cvt *Converter) Convert(ctx context.Context, source, target, cache string) (*Metric, error) {
 	var metric Metric
 	sourceNamed, err := docker.ParseDockerRef(source)
 	if err != nil {
@@ -119,8 +119,9 @@ func (cvt *Converter) Convert(ctx context.Context, source, target string) (*Metr
 		return nil, errors.Wrap(err, "get source image size")
 	}
 	logger.Infof("pulled image %s, elapse %s", source, metric.SourcePullElapsed)
-
+
 	logger.Infof("converting image %s", source)
+	// Pass the remote cache image reference to the driver through the context.
+	ctx = context.WithValue(ctx, "remote cache", cache)
 	start = time.Now()
 	desc, err := cvt.driver.Convert(ctx, cvt.provider, source)
 	if err != nil {
diff --git a/pkg/driver/nydus/nydus.go b/pkg/driver/nydus/nydus.go
index 2c621c0c..65b8ad16 100644
--- a/pkg/driver/nydus/nydus.go
+++ b/pkg/driver/nydus/nydus.go
@@ -17,9 +17,13 @@ package nydus
 import (
 	"bytes"
 	"context"
+	"encoding/json"
 	"fmt"
+	"io"
 	"os"
 	"os/exec"
+	"path/filepath"
 	"regexp"
 	"strconv"
 	"strings"
@@ -30,11 +34,14 @@ import (
 	"github.com/goharbor/acceleration-service/pkg/driver/nydus/parser"
 	nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils"
 	"github.com/goharbor/acceleration-service/pkg/errdefs"
+	"github.com/goharbor/acceleration-service/pkg/remote"
 	"github.com/goharbor/acceleration-service/pkg/utils"
 
+	"github.com/containerd/containerd"
 	"github.com/containerd/containerd/content"
+	containerdErrDefs "github.com/containerd/containerd/errdefs"
 	"github.com/containerd/containerd/images"
 	"github.com/containerd/containerd/images/converter"
+	"github.com/containerd/containerd/namespaces"
 	"github.com/containerd/containerd/platforms"
 	"github.com/containerd/nydus-snapshotter/pkg/backend"
 	nydusify "github.com/containerd/nydus-snapshotter/pkg/converter"
@@ -224,17 +231,45 @@ func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, so
 	if err != nil {
 		return nil, errors.Wrap(err, "get source image")
 	}
-	desc, err := d.convert(ctx, provider, *image, sourceRef)
+
+	cacheRef, _ := ctx.Value("remote cache").(string)
+	useRemoteCache := cacheRef != ""
+	if useRemoteCache {
+		if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil {
+			if errors.Is(err, errdefs.ErrNotSupport) {
+				logrus.Warn("content store does not support remote cache")
+			} else {
+				return nil, errors.Wrap(err, "fetch remote cache")
+			}
+		}
+	} else {
+		logrus.Info("remote cache is disabled")
+	}
+
+	desc, err := d.convert(ctx, provider, *image, sourceRef, useRemoteCache)
 	if err != nil {
 		return nil, err
 	}
 	if d.mergeManifest {
 		return d.makeManifestIndex(ctx, provider.ContentStore(), *image, *desc)
 	}
+
+	if useRemoteCache {
+		// Refresh the cache manifest from the remote registry before updating it.
+		if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil {
+			return nil, errors.Wrap(err, "fetch remote cache")
+		}
+		if err := d.UpdateRemoteCache(ctx, provider, *image, *desc); err != nil {
+			return nil, errors.Wrap(err, "update remote cache")
+		}
+		if err := d.PushRemoteCache(ctx, provider, *desc, cacheRef); err != nil {
+			return nil, errors.Wrap(err, "push remote cache")
+		}
+	}
 	return desc, err
 }
 
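+// convert converts the source image to the nydus format. When useRemoteCache is
+// enabled, the cache manifest has already been merged into the local content
+// store (see FetchRemoteCache), and the hook below fetches the bootstrap of a
+// cached blob on demand when the blob data is not present locally.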
-func (d *Driver) convert(ctx context.Context, provider accelcontent.Provider, source ocispec.Descriptor, sourceRef string) (*ocispec.Descriptor, error) {
+func (d *Driver) convert(ctx context.Context, provider accelcontent.Provider, source ocispec.Descriptor, sourceRef string, useRemoteCache bool) (*ocispec.Descriptor, error) {
 	cs := provider.ContentStore()
 
 	chunkDictPath := ""
@@ -275,6 +310,21 @@ func (d *Driver) convert(ctx context.Context, provider accelcontent.Provider, so
 	convertHookFunc := func(
 		ctx context.Context, cs content.Store, orgDesc ocispec.Descriptor, newDesc *ocispec.Descriptor,
 	) (*ocispec.Descriptor, error) {
+		if useRemoteCache {
+			if newDesc.MediaType == nydusutils.MediaTypeNydusBlob {
+				if _, err := cs.ReaderAt(ctx, *newDesc); err != nil {
+					if errors.Is(err, containerdErrDefs.ErrNotFound) {
+						newCtx := namespaces.WithNamespace(context.Background(), "acceleration-service")
+						cacheRef, _ := ctx.Value("remote cache").(string)
+						if err := d.FetchRemoteBoot(newCtx, provider, *newDesc, cacheRef); err != nil {
+							return nil, errors.Wrap(err, "fetch remote bootstrap")
+						}
+					} else {
+						return nil, errors.Wrap(err, "get cache layer reader")
+					}
+				}
+			}
+		}
 		// Append the nydus bootstrap layer for image manifest.
 		desc, err := nydusify.ConvertHookFunc(mergeOpt)(ctx, cs, orgDesc, newDesc)
 		if err != nil {
@@ -399,3 +449,230 @@ func (d *Driver) getChunkDict(ctx context.Context, provider accelcontent.Provide
 
 	return &chunkDict, nil
 }
+
+// FetchRemoteBoot fetches the bootstrap portion of a cached nydus blob from the
+// remote cache and writes it into the work dir.
+func (d *Driver) FetchRemoteBoot(ctx context.Context, provider accelcontent.Provider, desc ocispec.Descriptor, cacheRef string) error {
+	cs := provider.ContentStore()
+
+	info, err := cs.Info(ctx, desc.Digest)
+	if err != nil {
+		return err
+	}
+	if info.Labels == nil || info.Labels[nydusutils.LayerAnnotationNydusBootOffsetSize] == "" {
+		return errors.Errorf("cannot get bootstrap offset and size for %s", desc.Digest.String())
+	}
+	offsetSize := strings.Split(info.Labels[nydusutils.LayerAnnotationNydusBootOffsetSize], ":")
+	if len(offsetSize) != 2 {
+		return errors.Errorf("invalid bootstrap offset and size %s", offsetSize)
+	}
+	offset, err := strconv.ParseInt(offsetSize[0], 10, 64)
+	if err != nil {
+		return errors.Wrapf(err, "invalid bootstrap offset %s", offsetSize[0])
+	}
+	size, err := strconv.ParseInt(offsetSize[1], 10, 64)
+	if err != nil {
+		return errors.Wrapf(err, "invalid bootstrap size %s", offsetSize[1])
+	}
+
+	// Write the fetched bootstrap to <workDir>/<digest>.
+	bootDst := filepath.Join(d.workDir, desc.Digest.Encoded())
+	bootFile, err := os.OpenFile(bootDst, os.O_CREATE|os.O_RDWR, 0655)
+	if err != nil {
+		return errors.Wrapf(err, "create boot file %s", bootDst)
+	}
+	defer bootFile.Close()
+
+	var ra io.ReadCloser
+	ra, err = remote.Fetch(ctx, cacheRef, desc, offset, size, false, false)
+	if err != nil {
+		return err
+	}
+	defer ra.Close()
+
+	if _, err := io.Copy(bootFile, ra); err != nil {
+		if errdefs.NeedsRetryWithHTTP(err) {
+			// Drop any partially written data before retrying over plain HTTP.
+			if err := bootFile.Truncate(0); err != nil {
+				return errors.Wrapf(err, "truncate boot file %s", bootDst)
+			}
+			if _, err := bootFile.Seek(0, io.SeekStart); err != nil {
+				return errors.Wrapf(err, "rewind boot file %s", bootDst)
+			}
+			ra, err = remote.Fetch(ctx, cacheRef, desc, offset, size, true, true)
+			if err != nil {
+				return err
+			}
+			defer ra.Close()
+			if _, err := io.Copy(bootFile, ra); err != nil {
+				return errors.Wrapf(err, "fetch bootstrap %s", cacheRef)
+			}
+		} else {
+			return errors.Wrapf(err, "write boot file %s", bootDst)
+		}
+	}
+
+	info.Labels[nydusutils.LayerAnnotationNydusReaderAtPath] = bootDst
+	if _, err = cs.Update(ctx, info); err != nil {
+		return errors.Wrap(err, "update layer label")
+	}
+	return nil
+}
+
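+// The remote cache is an OCI manifest whose layers are the converted nydus
+// blobs; each layer descriptor carries the conversion labels (source digest,
+// bootstrap offset/size) as annotations, so a later conversion can restore the
+// local content store state from the manifest alone.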
+// FetchRemoteCache fetches the cache manifest from the remote registry and
+// records its layer annotations in the local content store.
+func (d *Driver) FetchRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error {
+	resolver, err := pvd.Resolver(ref)
+	if err != nil {
+		return err
+	}
+
+	// TODO: set the max concurrent download limit via containerd.WithMaxConcurrentDownloads.
+	rc := &containerd.RemoteContext{
+		Resolver: resolver,
+	}
+	name, desc, err := rc.Resolver.Resolve(ctx, ref)
+	if err != nil {
+		if errors.Is(err, containerdErrDefs.ErrNotFound) {
+			// The remote cache may not exist yet, just return nil.
+			return nil
+		}
+		return err
+	}
+	fetcher, err := rc.Resolver.Fetcher(ctx, name)
+	if err != nil {
+		return err
+	}
+	ir, err := fetcher.Fetch(ctx, desc)
+	if err != nil {
+		if errdefs.NeedsRetryWithHTTP(err) {
+			pvd.UsePlainHTTP()
+			ir, err = fetcher.Fetch(ctx, desc)
+			if err != nil {
+				return errors.Wrap(err, "try to pull remote cache")
+			}
+		} else {
+			return errors.Wrap(err, "pull remote cache")
+		}
+	}
+
+	manifestBytes, err := io.ReadAll(ir)
+	if err != nil {
+		return errors.Wrap(err, "read remote cache to bytes")
+	}
+	manifest := ocispec.Manifest{}
+	if err = json.Unmarshal(manifestBytes, &manifest); err != nil {
+		return err
+	}
+
+	cs := pvd.ContentStore()
+	for _, layer := range manifest.Layers {
+		if _, err := cs.Update(ctx, content.Info{
+			Digest: layer.Digest,
+			Size:   layer.Size,
+			Labels: layer.Annotations,
+		}); err != nil {
+			if errors.Is(err, containerdErrDefs.ErrNotFound) {
+				return errdefs.ErrNotSupport
+			}
+			return errors.Wrap(err, "update cached layer info")
+		}
+	}
+	return nil
+}
+
+// PushRemoteCache assembles the cache manifest from the local content store and
+// pushes it to the remote registry.
+func (d *Driver) PushRemoteCache(ctx context.Context, pvd accelcontent.Provider, targetDesc ocispec.Descriptor, ref string) error {
+	imageConfig := ocispec.ImageConfig{}
+	imageConfigDesc, imageConfigBytes, err := nydusutils.MarshalToDesc(imageConfig, ocispec.MediaTypeImageConfig)
+	if err != nil {
+		return errors.Wrap(err, "remote cache image config marshal failed")
+	}
+	configReader := bytes.NewReader(imageConfigBytes)
+	if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, configReader, *imageConfigDesc); err != nil {
+		return errors.Wrap(err, "remote cache image config write blob failed")
+	}
+
+	cs := pvd.ContentStore()
+	layers := []ocispec.Descriptor{}
+	if err = cs.Walk(ctx, func(info content.Info) error {
+		if _, ok := info.Labels[nydusutils.LayerAnnotationNydusSourceDigest]; ok {
+			layers = append(layers, ocispec.Descriptor{
+				MediaType:   nydusutils.MediaTypeNydusBlob,
+				Digest:      info.Digest,
+				Size:        info.Size,
+				Annotations: info.Labels,
+			})
+		}
+		return nil
+	}); err != nil {
+		return errors.Wrap(err, "get remote cache layers failed")
+	}
+
+	manifest := ocispec.Manifest{
+		Versioned: specs.Versioned{
+			SchemaVersion: 2,
+		},
+		MediaType: ocispec.MediaTypeImageManifest,
+		Config:    *imageConfigDesc,
+		Layers:    layers,
+	}
+	manifestDesc, manifestBytes, err := nydusutils.MarshalToDesc(manifest, ocispec.MediaTypeImageManifest)
+	if err != nil {
+		return errors.Wrap(err, "remote cache manifest marshal failed")
+	}
+	manifestReader := bytes.NewReader(manifestBytes)
+	if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, manifestReader, *manifestDesc); err != nil {
+		return errors.Wrap(err, "remote cache write blob failed")
+	}
+
+	if err = pvd.Push(ctx, *manifestDesc, ref); err != nil {
+		return err
+	}
+	return nil
+}
+
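+// Example of the labels recorded for each cached blob (values are illustrative):
+//
+//	containerd.io/snapshot/nydus-source-digest    = sha256:5452...
+//	containerd.io/snapshot/nydus-boot-offset-size = 1048576:8192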
+// UpdateRemoteCache updates the cached layer labels, from upper to lower.
+func (d *Driver) UpdateRemoteCache(ctx context.Context, provider accelcontent.Provider, orgDesc ocispec.Descriptor, newDesc ocispec.Descriptor) error {
+	cs := provider.ContentStore()
+
+	orgManifest := ocispec.Manifest{}
+	_, err := utils.ReadJSON(ctx, cs, &orgManifest, orgDesc)
+	if err != nil {
+		return errors.Wrap(err, "read original manifest json")
+	}
+
+	newManifest := ocispec.Manifest{}
+	_, err = utils.ReadJSON(ctx, cs, &newManifest, newDesc)
+	if err != nil {
+		return errors.Wrap(err, "read new manifest json")
+	}
+	// The last layer of the converted manifest is the merged bootstrap; only the
+	// blob layers are cached.
+	newLayers := newManifest.Layers[:len(newManifest.Layers)-1]
+
+	// Update the source digest and bootstrap offset/size labels for each layer.
+	for i, layer := range newLayers {
+		layer.Annotations[nydusutils.LayerAnnotationNydusSourceDigest] = orgManifest.Layers[i].Digest.String()
+		info, err := cs.Info(ctx, layer.Digest)
+		if err != nil {
+			return errors.Wrap(err, "get layer info")
+		}
+		if _, ok := info.Labels[nydusutils.LayerAnnotationNydusBootOffsetSize]; !ok {
+			ra, err := cs.ReaderAt(ctx, layer)
+			if err != nil {
+				return errors.Wrap(err, "get layer reader")
+			}
+			offset, size, err := nydusutils.SeekBootOffsetAndSize(ra, nydusify.EntryBootstrap, nil)
+			if err != nil {
+				return errors.Wrap(err, "seek bootstrap offset and size")
+			}
+			layer.Annotations[nydusutils.LayerAnnotationNydusBootOffsetSize] = fmt.Sprintf("%d:%d", offset, size)
+		} else {
+			layer.Annotations[nydusutils.LayerAnnotationNydusBootOffsetSize] = info.Labels[nydusutils.LayerAnnotationNydusBootOffsetSize]
+			os.Remove(info.Labels[nydusutils.LayerAnnotationNydusReaderAtPath])
+		}
+	}
+
+	// Update layers from upper to lower so that lower layers stay the most
+	// recently used entries in the cache.
+	for i := len(newLayers) - 1; i >= 0; i-- {
+		layer := newLayers[i]
+		if _, err := cs.Update(ctx, content.Info{
+			Digest: layer.Digest,
+			Size:   layer.Size,
+			Labels: layer.Annotations,
+		}); err != nil {
+			return errors.Wrap(err, "update cache layer")
+		}
+	}
+	return nil
+}
diff --git a/pkg/driver/nydus/utils/archive.go b/pkg/driver/nydus/utils/archive.go
index e2883418..d9cf1146 100644
--- a/pkg/driver/nydus/utils/archive.go
+++ b/pkg/driver/nydus/utils/archive.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/containerd/containerd/archive/compression"
+	"github.com/containerd/containerd/content"
 	"github.com/opencontainers/go-digest"
 )
 
 // PackTargz makes .tar(.gz) stream of file named `name` and returns reader
@@ -175,3 +176,80 @@ func UnpackFile(reader io.Reader, source, target string) error {
 
 	return nil
 }
+
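+// SeekBootOffsetAndSize walks the nydus formatted tar stream backwards (each
+// entry is laid out as data followed by its 512-byte tar header) and returns
+// the offset of the entry named targetName plus its size including the
+// trailing header.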
+func SeekBootOffsetAndSize(ra content.ReaderAt, targetName string, maxSize *int64) (int64, int64, error) {
+	const headerSize = 512
+
+	if headerSize > ra.Size() {
+		return 0, 0, fmt.Errorf("invalid nydus tar size %d", ra.Size())
+	}
+
+	cur := ra.Size() - headerSize
+	reader := newSeekReader(ra)
+
+	// Seek from tail to head of nydus formatted tar stream to find
+	// target data.
+	for {
+		// Try to seek the part of tar header.
+		_, err := reader.Seek(cur, io.SeekStart)
+		if err != nil {
+			return 0, 0, fmt.Errorf("seek %d for nydus tar header: %w", cur, err)
+		}
+
+		// Parse tar header.
+		tr := tar.NewReader(reader)
+		hdr, err := tr.Next()
+		if err != nil {
+			return 0, 0, fmt.Errorf("parse nydus tar header: %w", err)
+		}
+
+		if cur < hdr.Size {
+			return 0, 0, fmt.Errorf("invalid nydus tar data, name %s, size %d", hdr.Name, hdr.Size)
+		}
+
+		if hdr.Name == targetName {
+			if maxSize != nil && hdr.Size > *maxSize {
+				return 0, 0, fmt.Errorf("invalid nydus tar entry size %d, max allowed %d", hdr.Size, *maxSize)
+			}
+			return cur - hdr.Size, hdr.Size + headerSize, nil
+		}
+
+		cur = cur - hdr.Size - headerSize
+		if cur < 0 {
+			break
+		}
+	}
+
+	return 0, 0, fmt.Errorf("can't find target %s by seeking tar", targetName)
+}
+
+type seekReader struct {
+	io.ReaderAt
+	pos int64
+}
+
+func (ra *seekReader) Read(p []byte) (int, error) {
+	n, err := ra.ReaderAt.ReadAt(p, ra.pos)
+	ra.pos += int64(n)
+	return n, err
+}
+
+func (ra *seekReader) Seek(offset int64, whence int) (int64, error) {
+	switch {
+	case whence == io.SeekCurrent:
+		ra.pos += offset
+	case whence == io.SeekStart:
+		ra.pos = offset
+	default:
+		return 0, fmt.Errorf("unsupported whence %d", whence)
+	}
+
+	return ra.pos, nil
+}
+
+func newSeekReader(ra io.ReaderAt) *seekReader {
+	return &seekReader{
+		ReaderAt: ra,
+		pos:      0,
+	}
+}
\ No newline at end of file
diff --git a/pkg/driver/nydus/utils/constant.go b/pkg/driver/nydus/utils/constant.go
index 9053233b..4f7f46a9 100644
--- a/pkg/driver/nydus/utils/constant.go
+++ b/pkg/driver/nydus/utils/constant.go
@@ -26,6 +26,10 @@ const (
 	LayerAnnotationNydusBootstrapDigest = "containerd.io/snapshot/nydus-bootstrap-digest"
 	LayerAnnotationNydusFsVersion       = "containerd.io/snapshot/nydus-fs-version"
 	LayerAnnotationUncompressed         = "containerd.io/uncompressed"
+	LayerAnnotationNydusSourceDigest    = "containerd.io/snapshot/nydus-source-digest"
+	LayerAnnotationNydusTargetDigest    = "containerd.io/snapshot/nydus-target-digest"
+	LayerAnnotationNydusReaderAtPath    = "containerd.io/snapshot/nydus-readerat-path"
+	LayerAnnotationNydusBootOffsetSize  = "containerd.io/snapshot/nydus-boot-offset-size"
 )
 
 var NydusAnnotations = []string{LayerAnnotationNydusBlob, LayerAnnotationNydusBootstrap, LayerAnnotationNydusRAFSVersion}
diff --git a/pkg/errdefs/errors.go b/pkg/errdefs/errors.go
index 39effa84..baa650da 100644
--- a/pkg/errdefs/errors.go
+++ b/pkg/errdefs/errors.go
@@ -17,6 +17,8 @@ var (
 	ErrConvertFailed    = errors.New("ERR_CONVERT_FAILED")
 	ErrAlreadyConverted = errors.New("ERR_ALREADY_CONVERTED")
 	ErrUnhealthy        = errors.New("ERR_UNHEALTHY")
+	ErrIsRemoteCache    = errors.New("ERR_IS_REMOTE_CACHE")
+	ErrNotSupport       = errors.New("ERR_NOT_SUPPORT")
 )
 
 // IsErrHTTPResponseToHTTPSClient returns whether err is
diff --git a/pkg/remote/ported.go b/pkg/remote/ported.go
new file mode 100644
index 00000000..f78611a4
--- /dev/null
+++ b/pkg/remote/ported.go
@@ -0,0 +1,587 @@
+// Copyright Project Harbor Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
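+//
+// This file is adapted from containerd's remotes/docker package (Apache 2.0)
+// so that a byte range of a cached blob (the nydus bootstrap) can be fetched
+// directly from the registry without pulling the whole blob.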
+
+package remote
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"path"
+	"strings"
+
+	"github.com/containerd/containerd/errdefs"
+	"github.com/containerd/containerd/images"
+	"github.com/containerd/containerd/log"
+	"github.com/containerd/containerd/reference"
+	docker "github.com/containerd/containerd/remotes/docker"
+	"github.com/containerd/containerd/tracing"
+	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
+)
+
+const maxRetry = 3
+
+type request struct {
+	method string
+	path   string
+	header http.Header
+	host   docker.RegistryHost
+	body   func() (io.ReadCloser, error)
+	size   int64
+}
+
+type httpReadSeeker struct {
+	size   int64
+	offset int64
+	rc     io.ReadCloser
+	open   func(offset int64) (io.ReadCloser, error)
+	closed bool
+
+	errsWithNoProgress int
+}
+
+func newHTTPReadSeeker(size, offset int64, open func(offset int64) (io.ReadCloser, error)) (io.ReadCloser, error) {
+	return &httpReadSeeker{
+		size:   size,
+		offset: offset,
+		open:   open,
+	}, nil
+}
+
+func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) {
+	if hrs.closed {
+		return 0, io.EOF
+	}
+
+	rd, err := hrs.reader()
+	if err != nil {
+		return 0, err
+	}
+
+	n, err = rd.Read(p)
+	hrs.offset += int64(n)
+	if n > 0 || err == nil {
+		hrs.errsWithNoProgress = 0
+	}
+	if err == io.ErrUnexpectedEOF {
+		// connection closed unexpectedly. try reconnecting.
+		if n == 0 {
+			hrs.errsWithNoProgress++
+			if hrs.errsWithNoProgress > maxRetry {
+				return // too many retries for this offset with no progress
+			}
+		}
+		if hrs.rc != nil {
+			if clsErr := hrs.rc.Close(); clsErr != nil {
+				log.L.WithError(clsErr).Error("httpReadSeeker: failed to close ReadCloser")
+			}
+			hrs.rc = nil
+		}
+		if _, err2 := hrs.reader(); err2 == nil {
+			return n, nil
+		}
+	}
+	return
+}
+
+func (hrs *httpReadSeeker) Close() error {
+	if hrs.closed {
+		return nil
+	}
+	hrs.closed = true
+	if hrs.rc != nil {
+		return hrs.rc.Close()
+	}
+
+	return nil
+}
+
+func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) {
+	if hrs.closed {
+		return 0, fmt.Errorf("Fetcher.Seek: closed: %w", errdefs.ErrUnavailable)
+	}
+
+	abs := hrs.offset
+	switch whence {
+	case io.SeekStart:
+		abs = offset
+	case io.SeekCurrent:
+		abs += offset
+	case io.SeekEnd:
+		if hrs.size == -1 {
+			return 0, fmt.Errorf("Fetcher.Seek: unknown size, cannot seek from end: %w", errdefs.ErrUnavailable)
+		}
+		abs = hrs.size + offset
+	default:
+		return 0, fmt.Errorf("Fetcher.Seek: invalid whence: %w", errdefs.ErrInvalidArgument)
+	}
+
+	if abs < 0 {
+		return 0, fmt.Errorf("Fetcher.Seek: negative offset: %w", errdefs.ErrInvalidArgument)
+	}
+
+	if abs != hrs.offset {
+		if hrs.rc != nil {
+			if err := hrs.rc.Close(); err != nil {
+				log.L.WithError(err).Error("Fetcher.Seek: failed to close ReadCloser")
+			}
+
+			hrs.rc = nil
+		}
+
+		hrs.offset = abs
+	}
+
+	return hrs.offset, nil
+}
+
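+// reader lazily (re)opens the underlying HTTP body at the current offset and
+// returns an empty reader once the offset has reached the known size.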
+func (hrs *httpReadSeeker) reader() (io.Reader, error) {
+	if hrs.rc != nil {
+		return hrs.rc, nil
+	}
+
+	if hrs.size == -1 || hrs.offset < hrs.size {
+		// only try to reopen the body request if we are seeking to a value
+		// less than the actual size.
+		if hrs.open == nil {
+			return nil, fmt.Errorf("cannot open: %w", errdefs.ErrNotImplemented)
+		}
+
+		rc, err := hrs.open(hrs.offset)
+		if err != nil {
+			return nil, fmt.Errorf("httpReadSeeker: failed open: %w", err)
+		}
+
+		if hrs.rc != nil {
+			if err := hrs.rc.Close(); err != nil {
+				log.L.WithError(err).Error("httpReadSeeker: failed to close ReadCloser")
+			}
+		}
+		hrs.rc = rc
+	} else {
+		// There is an edge case here where offset == size of the content. If
+		// we seek, we will probably get an error for content that cannot be
+		// sought (?). In that case, we should err on committing the content,
+		// as the length is already satisfied but we just return the empty
+		// reader instead.
+
+		hrs.rc = io.NopCloser(bytes.NewReader([]byte{}))
+	}
+
+	return hrs.rc, nil
+}
+
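+// Fetch resolves cacheRef and opens a read over the descriptor's blob starting
+// at offset, trying external URLs, the manifests endpoint and the blobs
+// endpoint in the same way as containerd's docker fetcher. insecure and
+// plainHTTP control TLS verification and the URL scheme.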
+func Fetch(ctx context.Context, cacheRef string, desc ocispec.Descriptor, offset, size int64, insecure, plainHTTP bool) (io.ReadCloser, error) {
+	ctx = log.WithLogger(ctx, log.G(ctx).WithField("digest", desc.Digest))
+
+	refspec, err := reference.Parse(cacheRef)
+	if err != nil {
+		return nil, err
+	}
+	cred := NewDockerConfigCredFunc()
+
+	registryHosts := docker.ConfigureDefaultRegistries(
+		docker.WithAuthorizer(
+			docker.NewDockerAuthorizer(
+				docker.WithAuthClient(newDefaultClient(insecure)),
+				docker.WithAuthCreds(cred),
+			),
+		),
+		docker.WithClient(newDefaultClient(insecure)),
+		docker.WithPlainHTTP(func(host string) (bool, error) {
+			return plainHTTP, nil
+		}),
+	)
+	hosts, err := registryHosts(refspec.Hostname())
+	if err != nil {
+		return nil, err
+	}
+
+	hosts = filterHosts(hosts, docker.HostCapabilityPull)
+	if len(hosts) == 0 {
+		return nil, fmt.Errorf("no pull hosts: %w", errdefs.ErrNotFound)
+	}
+
+	parts := strings.SplitN(refspec.Locator, "/", 2)
+	if len(parts) < 2 {
+		return nil, fmt.Errorf("invalid cache ref: %w", errdefs.ErrInvalidArgument)
+	}
+	repository := parts[1]
+
+	ctx, err = docker.ContextWithRepositoryScope(ctx, refspec, false)
+	if err != nil {
+		return nil, err
+	}
+
+	return newHTTPReadSeeker(size+offset, offset, func(offset int64) (io.ReadCloser, error) {
+		// firstly try fetch via external urls
+		for _, us := range desc.URLs {
+			u, err := url.Parse(us)
+			if err != nil {
+				log.G(ctx).WithError(err).Debugf("failed to parse %q", us)
+				continue
+			}
+			if u.Scheme != "http" && u.Scheme != "https" {
+				log.G(ctx).Debug("non-http(s) alternative url is unsupported")
+				continue
+			}
+			ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", u))
+			log.G(ctx).Info("request")
+
+			// Try this first, parse it
+			host := docker.RegistryHost{
+				Client:       http.DefaultClient,
+				Host:         u.Host,
+				Scheme:       u.Scheme,
+				Path:         u.Path,
+				Capabilities: docker.HostCapabilityPull,
+			}
+			header := http.Header{}
+			req := newRequest(header, host, http.MethodGet, repository)
+			// Strip namespace from base
+			req.path = u.Path
+			if u.RawQuery != "" {
+				req.path = req.path + "?" + u.RawQuery
+			}
+
+			rc, err := open(ctx, req, desc.MediaType, offset)
+			if err != nil {
+				if errdefs.IsNotFound(err) {
+					continue // try one of the other urls.
+				}
+
+				return nil, err
+			}
+			return rc, nil
+		}
+
+		// Try manifests endpoints for manifests types
+		switch desc.MediaType {
+		case images.MediaTypeDockerSchema2Manifest, images.MediaTypeDockerSchema2ManifestList,
+			images.MediaTypeDockerSchema1Manifest,
+			ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex:
+
+			var firstErr error
+			for _, host := range hosts {
+				header := http.Header{}
+				req := newRequest(header, host, http.MethodGet, repository, "manifests", desc.Digest.String())
+				if err := req.addNamespace(refspec.Hostname()); err != nil {
+					return nil, err
+				}
+
+				rc, err := open(ctx, req, desc.MediaType, offset)
+				if err != nil {
+					// Store the error for referencing later
+					if firstErr == nil {
+						firstErr = err
+					}
+					continue // try another host
+				}
+
+				return rc, nil
+			}
+
+			return nil, firstErr
+		}
+
+		// Finally use blobs endpoints
+		var firstErr error
+		for _, host := range hosts {
+			header := http.Header{}
+			req := newRequest(header, host, http.MethodGet, repository, "blobs", desc.Digest.String())
+			if err := req.addNamespace(refspec.Hostname()); err != nil {
+				return nil, err
+			}
+
+			rc, err := open(ctx, req, desc.MediaType, offset)
+			if err != nil {
+				// Store the error for referencing later
+				if firstErr == nil {
+					firstErr = err
+				}
+				continue // try another host
+			}
+
+			return rc, nil
+		}
+
+		if errdefs.IsNotFound(firstErr) {
+			firstErr = fmt.Errorf("could not fetch content descriptor %v (%v) from remote: %w",
+				desc.Digest, desc.MediaType, errdefs.ErrNotFound,
+			)
+		}
+
+		return nil, firstErr
+	})
+}
+
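+// open issues the GET described by req, asking for the given media type and,
+// when offset > 0, a byte range; it validates the Content-Range header (or
+// discards up to offset when the server ignored the range) before returning
+// the response body.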
+func open(ctx context.Context, req *request, mediatype string, offset int64) (_ io.ReadCloser, retErr error) {
+	if mediatype == "" {
+		req.header.Set("Accept", "*/*")
+	} else {
+		req.header.Set("Accept", strings.Join([]string{mediatype, `*/*`}, ", "))
+	}
+
+	if offset > 0 {
+		// Note: "Accept-Ranges: bytes" cannot be trusted as some endpoints
+		// will return the header without supporting the range. The content
+		// range must always be checked.
+		req.header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
+	}
+
+	resp, err := req.doWithRetries(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer func() {
+		if retErr != nil {
+			resp.Body.Close()
+		}
+	}()
+
+	if resp.StatusCode > 299 {
+		// TODO(stevvooe): When doing a offset specific request, we should
+		// really distinguish between a 206 and a 200. In the case of 200, we
+		// can discard the bytes, hiding the seek behavior from the
+		// implementation.
+
+		if resp.StatusCode == http.StatusNotFound {
+			return nil, fmt.Errorf("content at %v not found: %w", req.String(), errdefs.ErrNotFound)
+		}
+		var registryErr docker.Errors
+		if err := json.NewDecoder(resp.Body).Decode(&registryErr); err != nil || registryErr.Len() < 1 {
+			return nil, fmt.Errorf("unexpected status code %v: %v", req.String(), resp.Status)
+		}
+		return nil, fmt.Errorf("unexpected status code %v: %s - Server message: %s", req.String(), resp.Status, registryErr.Error())
+	}
+	if offset > 0 {
+		cr := resp.Header.Get("content-range")
+		if cr != "" {
+			if !strings.HasPrefix(cr, fmt.Sprintf("bytes %d-", offset)) {
+				return nil, fmt.Errorf("unhandled content range in response: %v", cr)
+			}
+		} else {
+			// TODO: Should any cases where use of content range
+			// without the proper header be considered?
+			// 206 responses?
+
+			// Discard up to offset
+			// Could use buffer pool here but this case should be rare
+			n, err := io.Copy(io.Discard, io.LimitReader(resp.Body, offset))
+			if err != nil {
+				return nil, fmt.Errorf("failed to discard to offset: %w", err)
+			}
+			if n != offset {
+				return nil, errors.New("unable to discard to offset")
+			}
+		}
+	}
+
+	return resp.Body, nil
+}
+
+func newRequest(dockerHeader http.Header, host docker.RegistryHost, method string,
+	repository string, ps ...string) *request {
+	header := dockerHeader.Clone()
+	if header == nil {
+		header = http.Header{}
+	}
+
+	for key, value := range host.Header {
+		header[key] = append(header[key], value...)
+	}
+	parts := append([]string{"/", host.Path, repository}, ps...)
+	p := path.Join(parts...)
+	// Join strips trailing slash, re-add ending "/" if included
+	if len(parts) > 0 && strings.HasSuffix(parts[len(parts)-1], "/") {
+		p = p + "/"
+	}
+	return &request{
+		method: method,
+		path:   p,
+		header: header,
+		host:   host,
+	}
+}
+
+func (r *request) do(ctx context.Context) (*http.Response, error) {
+	u := r.host.Scheme + "://" + r.host.Host + r.path
+	req, err := http.NewRequestWithContext(ctx, r.method, u, nil)
+	if err != nil {
+		return nil, err
+	}
+	req.Header = http.Header{} // headers need to be copied to avoid concurrent map access
+	for k, v := range r.header {
+		req.Header[k] = v
+	}
+	if r.body != nil {
+		body, err := r.body()
+		if err != nil {
+			return nil, err
+		}
+		req.Body = body
+		req.GetBody = r.body
+		if r.size > 0 {
+			req.ContentLength = r.size
+		}
+	}
+
+	ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", u))
+	if err := r.authorize(ctx, req); err != nil {
+		return nil, fmt.Errorf("failed to authorize: %w", err)
+	}
+
+	client := &http.Client{}
+	if r.host.Client != nil {
+		*client = *r.host.Client
+	}
+	if client.CheckRedirect == nil {
+		client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
+			if len(via) >= 10 {
+				return errors.New("stopped after 10 redirects")
+			}
+			if err := r.authorize(ctx, req); err != nil {
+				return fmt.Errorf("failed to authorize redirect: %w", err)
+			}
+			return nil
+		}
+	}
+	_, httpSpan := tracing.StartSpan(
+		ctx,
+		tracing.Name("remotes.docker.resolver", "HTTPRequest"),
+		tracing.WithHTTPRequest(req),
+	)
+	defer httpSpan.End()
+	resp, err := client.Do(req)
+	if err != nil {
+		httpSpan.SetStatus(err)
+		return nil, fmt.Errorf("failed to do request: %w", err)
+	}
+	httpSpan.SetAttributes(tracing.HTTPStatusCodeAttributes(resp.StatusCode)...)
+	return resp, nil
+}
+
+func (r *request) doWithRetries(ctx context.Context, responses []*http.Response) (*http.Response, error) {
+	resp, err := r.do(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	responses = append(responses, resp)
+	retry, err := r.retryRequest(ctx, responses)
+	if err != nil {
+		resp.Body.Close()
+		return nil, err
+	}
+	if retry {
+		resp.Body.Close()
+		return r.doWithRetries(ctx, responses)
+	}
+	return resp, err
+}
+
+func (r *request) authorize(ctx context.Context, req *http.Request) error {
+	// Check if has header for host
+	if r.host.Authorizer != nil {
+		if err := r.host.Authorizer.Authorize(ctx, req); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
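+// addNamespace adds an "ns=<registry>" query parameter when the request goes
+// through a mirror/proxy rather than the origin registry, mirroring
+// containerd's behaviour.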
+func (r *request) addNamespace(ns string) (err error) {
+	if !isProxy(r.host.Host, ns) {
+		return nil
+	}
+	var q url.Values
+	// Parse query
+	if i := strings.IndexByte(r.path, '?'); i > 0 {
+		r.path = r.path[:i+1]
+		q, err = url.ParseQuery(r.path[i+1:])
+		if err != nil {
+			return
+		}
+	} else {
+		r.path = r.path + "?"
+		q = url.Values{}
+	}
+	q.Add("ns", ns)
+
+	r.path = r.path + q.Encode()
+
+	return
+}
+
+func (r *request) retryRequest(ctx context.Context, responses []*http.Response) (bool, error) {
+	if len(responses) > 5 {
+		return false, nil
+	}
+	last := responses[len(responses)-1]
+	switch last.StatusCode {
+	case http.StatusUnauthorized:
+		log.G(ctx).WithField("header", last.Header.Get("WWW-Authenticate")).Debug("Unauthorized")
+		if r.host.Authorizer != nil {
+			if err := r.host.Authorizer.AddResponses(ctx, responses); err == nil {
+				return true, nil
+			} else if !errdefs.IsNotImplemented(err) {
+				return false, err
+			}
+		}
+
+		return false, nil
+	case http.StatusMethodNotAllowed:
+		// Support registries which have not properly implemented the HEAD method for
+		// manifests endpoint
+		if r.method == http.MethodHead && strings.Contains(r.path, "/manifests/") {
+			r.method = http.MethodGet
+			return true, nil
+		}
+	case http.StatusRequestTimeout, http.StatusTooManyRequests:
+		return true, nil
+	}
+
+	// TODO: Handle 50x errors accounting for attempt history
+	return false, nil
+}
+
+func isProxy(host, refhost string) bool {
+	if refhost != host {
+		if refhost != "docker.io" || host != "registry-1.docker.io" {
+			return true
+		}
+	}
+	return false
+}
+
+func filterHosts(hosts []docker.RegistryHost, caps docker.HostCapabilities) []docker.RegistryHost {
+	// Collect matches into a new slice; appending to the input would return
+	// every host and duplicate the matching ones.
+	filtered := []docker.RegistryHost{}
+	for _, host := range hosts {
+		if host.Capabilities.Has(caps) {
+			filtered = append(filtered, host)
+		}
+	}
+	return filtered
+}
+
+func (r *request) String() string {
+	return r.host.Scheme + "://" + r.host.Host + r.path
+}
\ No newline at end of file