diff --git a/misc/config/config.nydus.ref.yaml b/misc/config/config.nydus.ref.yaml index 8610b0d2..63dc0304 100644 --- a/misc/config/config.nydus.ref.yaml +++ b/misc/config/config.nydus.ref.yaml @@ -31,6 +31,8 @@ provider: gcpolicy: # size threshold that triggers GC, the oldest used blobs will be reclaimed if exceeds the size. threshold: 1000MB + # remote cache record capacity of converted layers, default is 200. + cache_size: 200 converter: # number of worker for executing conversion task @@ -61,3 +63,5 @@ converter: rules: # add suffix to tag of source image reference as target image reference - tag_suffix: -nydus-oci-ref + # add suffix to tag of source image reference as remote cache reference, leave empty to disable remote cache. + - cache_tag_suffix: -nydus-cache diff --git a/misc/config/config.nydus.yaml b/misc/config/config.nydus.yaml index abb9733e..35758e58 100644 --- a/misc/config/config.nydus.yaml +++ b/misc/config/config.nydus.yaml @@ -31,6 +31,8 @@ provider: gcpolicy: # size threshold that triggers GC, the oldest used blobs will be reclaimed if exceeds the size. threshold: 1000MB + # remote cache record capacity of converted layers, default is 200. + cache_size: 200 converter: # number of worker for executing conversion task @@ -100,3 +102,5 @@ converter: rules: # add suffix to tag of source image reference as target image reference - tag_suffix: -nydus + # add suffix to tag of source image reference as remote cache reference, leave empty to disable remote cache. 
+ - cache_tag_suffix: -nydus-cache diff --git a/pkg/adapter/adapter.go b/pkg/adapter/adapter.go index 2f4350ac..37318091 100644 --- a/pkg/adapter/adapter.go +++ b/pkg/adapter/adapter.go @@ -61,7 +61,7 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) { return nil, errors.Wrap(err, "invalid platform configuration") } - provider, content, err := content.NewLocalProvider(cfg.Provider.WorkDir, cfg.Provider.GCPolicy.Threshold, cfg.Host, platformMC) + provider, content, err := content.NewLocalProvider(cfg, platformMC) if err != nil { return nil, errors.Wrap(err, "create content provider") } @@ -95,7 +95,7 @@ func NewLocalAdapter(cfg *config.Config) (*LocalAdapter, error) { } func (adp *LocalAdapter) Convert(ctx context.Context, source string) error { - target, err := adp.rule.Map(source) + target, err := adp.rule.Map(source, TagSuffix) if err != nil { if errors.Is(err, errdefs.ErrAlreadyConverted) { logrus.Infof("image has been converted: %s", source) @@ -103,7 +103,17 @@ func (adp *LocalAdapter) Convert(ctx context.Context, source string) error { } return errors.Wrap(err, "create target reference by rule") } - if _, err = adp.cvt.Convert(ctx, source, target); err != nil { + cacheRef, err := adp.rule.Map(source, CacheTagSuffix) + if err != nil { + if errors.Is(err, errdefs.ErrIsRemoteCache) { + logrus.Infof("image was remote cache: %s", source) + return nil + } + } + if err = adp.content.NewRemoteCache(cacheRef); err != nil { + return err + } + if _, err = adp.cvt.Convert(ctx, source, target, cacheRef); err != nil { return err } if err := adp.content.GC(ctx); err != nil { diff --git a/pkg/adapter/rule.go b/pkg/adapter/rule.go index ccbb8e0a..4e765818 100644 --- a/pkg/adapter/rule.go +++ b/pkg/adapter/rule.go @@ -24,6 +24,11 @@ import ( "github.com/goharbor/acceleration-service/pkg/errdefs" ) +const ( + TagSuffix = "tag_suffix" + CacheTagSuffix = "cache_tag_suffix" +) + // Add suffix to source image reference as the target // image reference, for example: 
// Source: 192.168.1.1/nginx:latest @@ -47,16 +52,33 @@ type Rule struct { // Map maps the source image reference to a new one according to // a rule, the new one will be used as the reference of target image. -func (rule *Rule) Map(ref string) (string, error) { - for _, item := range rule.items { - if item.TagSuffix != "" { - if strings.HasSuffix(ref, item.TagSuffix) { - // FIXME: To check if an image has been converted, a better solution - // is to use the annotation on image manifest. - return "", errdefs.ErrAlreadyConverted +func (rule *Rule) Map(ref, opt string) (string, error) { + switch opt { + case TagSuffix: + for _, item := range rule.items { + if item.TagSuffix != "" { + if strings.HasSuffix(ref, item.TagSuffix) { + // FIXME: To check if an image has been converted, a better solution + // is to use the annotation on image manifest. + return "", errdefs.ErrAlreadyConverted + } + return addSuffix(ref, item.TagSuffix) + } + } + case CacheTagSuffix: + for _, item := range rule.items { + if item.CacheTagSuffix != "" { + if strings.HasSuffix(ref, item.CacheTagSuffix) { + // FIXME: Ditto.A better way is to use the annotation on image manifest. + return "", errdefs.ErrIsRemoteCache + } + return addSuffix(ref, item.CacheTagSuffix) } - return addSuffix(ref, item.TagSuffix) } + // CacheTagSuffix empty means do not provide remote cache, just return empty string. 
+ return "", nil + default: + return "", fmt.Errorf("unsupported map option: %s", opt) } return "", errors.New("not found matched conversion rule") } diff --git a/pkg/config/config.go b/pkg/config/config.go index 7d3e2d5e..9bb6e881 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -46,9 +46,10 @@ type MetricConfig struct { } type ProviderConfig struct { - Source map[string]SourceConfig `yaml:"source"` - WorkDir string `yaml:"work_dir"` - GCPolicy GCPolicy `yaml:"gcpolicy"` + Source map[string]SourceConfig `yaml:"source"` + WorkDir string `yaml:"work_dir"` + GCPolicy GCPolicy `yaml:"gcpolicy"` + CacheSize int `yaml:"cache_size"` } type GCPolicy struct { @@ -66,7 +67,8 @@ type SourceConfig struct { } type ConversionRule struct { - TagSuffix string `yaml:"tag_suffix"` + TagSuffix string `yaml:"tag_suffix"` + CacheTagSuffix string `yaml:"cache_tag_suffix"` } type ConverterConfig struct { @@ -137,3 +139,12 @@ func (cfg *Config) Host(ref string) (remote.CredentialFunc, bool, error) { return ary[0], ary[1], nil }, auth.Insecure, nil } + +func (cfg *Config) EnableRemoteCache() bool { + for _, rule := range cfg.Converter.Rules { + if rule.CacheTagSuffix != "" { + return true + } + } + return false +} diff --git a/pkg/content/cache.go b/pkg/content/cache.go index 34a2bf0a..c98282c9 100644 --- a/pkg/content/cache.go +++ b/pkg/content/cache.go @@ -23,7 +23,9 @@ import ( "github.com/containerd/containerd/leases" "github.com/containerd/containerd/namespaces" + "github.com/goharbor/acceleration-service/pkg/remote" lru "github.com/hashicorp/golang-lru/v2" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) // This is not thread-safe, which means it will depend on the parent implementation to do the locking mechanism. 
@@ -122,3 +124,61 @@ func (leaseCache *leaseCache) remove(key string, usedCount string) { func (leaseCache *leaseCache) Len() int { return leaseCache.size } + +type RemoteCache struct { + // remoteCache is an LRU cache for caching target layer descriptors, the cache key is the source layer digest, + // and the cache value is the target layer descriptor after conversion. + remoteCache *lru.Cache[string, ocispec.Descriptor] + // cacheRef is the remote cache reference. + cacheRef string + // host is a func to provide registry credential by host name. + host remote.HostFunc + // cacheSize is the remote cache record capacity of converted layers. + cacheSize int +} + +func NewRemoteCache(cacheSize int, host remote.HostFunc) (*RemoteCache, error) { + remoteCache, err := lru.New[string, ocispec.Descriptor](cacheSize) + if err != nil { + return nil, err + } + return &RemoteCache{ + remoteCache: remoteCache, + host: host, + cacheSize: cacheSize, + }, nil +} + +func (rc *RemoteCache) Values() []ocispec.Descriptor { + return rc.remoteCache.Values() +} + +func (rc *RemoteCache) Get(key string) (ocispec.Descriptor, bool) { + return rc.remoteCache.Get(key) +} + +func (rc *RemoteCache) Add(key string, value ocispec.Descriptor) { + rc.remoteCache.Add(key, value) +} + +func (rc *RemoteCache) Remove(key string) { + rc.remoteCache.Remove(key) +} + +// Size returns the number of items in the cache. 
+func (rc *RemoteCache) Size() int { + return rc.remoteCache.Len() + +} + +func (rc *RemoteCache) NewLRUCache(cacheSize int, cacheRef string) error { + if rc != nil { + remoteCache, err := lru.New[string, ocispec.Descriptor](cacheSize) + if err != nil { + return err + } + rc.remoteCache = remoteCache + rc.cacheRef = cacheRef + } + return nil +} diff --git a/pkg/content/cache_test.go b/pkg/content/cache_test.go index 148d04f5..52603705 100644 --- a/pkg/content/cache_test.go +++ b/pkg/content/cache_test.go @@ -44,7 +44,7 @@ func TestLeaseCache(t *testing.T) { func TestLeaseCacheInit(t *testing.T) { os.MkdirAll("./tmp", 0755) defer os.RemoveAll("./tmp") - content, err := NewContent("./tmp", "./tmp", "100MB") + content, err := NewContent("./tmp", "./tmp", "100MB", false, 200, nil) require.NoError(t, err) testDigest := []string{ "sha256:9bb13890319dc01e5f8a4d3d0c4c72685654d682d568350fd38a02b1d70aee6b", diff --git a/pkg/content/content.go b/pkg/content/content.go index 7f0d3126..6612aae6 100644 --- a/pkg/content/content.go +++ b/pkg/content/content.go @@ -21,13 +21,17 @@ import ( "strconv" "time" - "github.com/containerd/containerd/content" + ctrcontent "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/filters" "github.com/containerd/containerd/leases" "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/metadata/boltutil" "github.com/containerd/containerd/namespaces" "github.com/dustin/go-humanize" + nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils" + "github.com/goharbor/acceleration-service/pkg/remote" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -46,15 +50,17 @@ type Content struct { // lc cache the used count and reference order of lease lc *leaseCache // store is the local content store wrapped inner db - store 
content.Store + store ctrcontent.Store // threshold is the maximum capacity of the local caches storage threshold int64 + // remoteCache is the cache of remote layers + remoteCache *RemoteCache } // NewContent return content support by content store, bolt database and threshold. // content store created in contentDir and bolt database created in databaseDir. // content.db supported by bolt database and content store, content.lm supported by content.db. -func NewContent(contentDir string, databaseDir string, threshold string) (*Content, error) { +func NewContent(contentDir string, databaseDir string, threshold string, useRemoteCache bool, cacheSize int, host remote.HostFunc) (*Content, error) { store, err := local.NewLabeledStore(contentDir, newMemoryLabelStore()) if err != nil { return nil, errors.Wrap(err, "create local provider content store") @@ -76,6 +82,15 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte if err := lc.Init(lm); err != nil { return nil, err } + var remoteCache *RemoteCache + if useRemoteCache { + remoteCache, err = NewRemoteCache(cacheSize, host) + if err != nil { + return nil, err + } + } else { + remoteCache = nil + } content := Content{ db: db, lm: metadata.NewLeaseManager(db), @@ -83,6 +98,7 @@ func NewContent(contentDir string, databaseDir string, threshold string) (*Conte lc: lc, store: db.ContentStore(), threshold: int64(t), + remoteCache: remoteCache, } return &content, nil } @@ -227,38 +243,100 @@ func (content *Content) updateLease(digest *digest.Digest) error { }) } -func (content *Content) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { - return content.store.Info(ctx, dgst) +func (content *Content) Info(ctx context.Context, dgst digest.Digest) (ctrcontent.Info, error) { + info, err := content.store.Info(ctx, dgst) + if content.remoteCache != nil { + if err != nil { + if errors.Is(err, errdefs.ErrNotFound) { + layers := content.remoteCache.Values() + for _, layer := range 
layers { + if layer.Digest == dgst { + return ctrcontent.Info{ + Digest: layer.Digest, + Size: layer.Size, + Labels: layer.Annotations, + }, nil + } + } + } + return info, err + } + if desc, ok := content.remoteCache.Get(dgst.String()); ok { + if info.Labels == nil { + info.Labels = map[string]string{} + } + info.Labels[nydusutils.LayerAnnotationNydusTargetDigest] = desc.Digest.String() + } + } + return info, err } -func (content *Content) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { +func (content *Content) Update(ctx context.Context, info ctrcontent.Info, fieldpaths ...string) (ctrcontent.Info, error) { + if content.remoteCache != nil { + sourceDesc, ok := info.Labels[nydusutils.LayerAnnotationNydusSourceDigest] + if ok { + l := ocispec.Descriptor{ + MediaType: nydusutils.MediaTypeNydusBlob, + Digest: info.Digest, + Size: info.Size, + Annotations: info.Labels, + } + content.remoteCache.Add(sourceDesc, l) + return info, nil + } + } if info.Labels != nil { info.Labels = nil } return content.store.Update(ctx, info, fieldpaths...) } -func (content *Content) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error { - return content.store.Walk(ctx, fn, filters...) +func (content *Content) Walk(ctx context.Context, fn ctrcontent.WalkFunc, fs ...string) error { + if content.remoteCache != nil { + filter, err := filters.ParseAll(fs...) + if err != nil { + return err + } + for _, layer := range content.remoteCache.Values() { + info := ctrcontent.Info{ + Digest: layer.Digest, + Size: layer.Size, + Labels: layer.Annotations, + } + if filter.Match(ctrcontent.AdaptInfo(info)) { + if err := fn(info); err != nil { + return err + } + } + } + } + return content.store.Walk(ctx, fn, fs...) 
} func (content *Content) Delete(ctx context.Context, dgst digest.Digest) error { return content.store.Delete(ctx, dgst) } -func (content *Content) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) { +func (content *Content) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (ctrcontent.ReaderAt, error) { readerAt, err := content.store.ReaderAt(ctx, desc) if err != nil { + if content.remoteCache != nil && errors.Is(err, errdefs.ErrNotFound) { + for _, layer := range content.remoteCache.Values() { + if layer.Digest == desc.Digest { + return remote.Fetch(ctx, content.remoteCache.cacheRef, desc, content.remoteCache.host, false) + } + } + } return readerAt, err } return readerAt, content.updateLease(&desc.Digest) } -func (content *Content) Status(ctx context.Context, ref string) (content.Status, error) { +func (content *Content) Status(ctx context.Context, ref string) (ctrcontent.Status, error) { return content.store.Status(ctx, ref) } -func (content *Content) ListStatuses(ctx context.Context, filters ...string) ([]content.Status, error) { +func (content *Content) ListStatuses(ctx context.Context, filters ...string) ([]ctrcontent.Status, error) { return content.store.ListStatuses(ctx, filters...) } @@ -266,18 +344,26 @@ func (content *Content) Abort(ctx context.Context, ref string) error { return content.store.Abort(ctx, ref) } -func (content *Content) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { +func (content *Content) Writer(ctx context.Context, opts ...ctrcontent.WriterOpt) (ctrcontent.Writer, error) { writer, err := content.store.Writer(ctx, opts...) 
return &localWriter{writer, content}, err } +func (content *Content) NewRemoteCache(cacheRef string) error { + if content.remoteCache != nil { + cacheSize := content.remoteCache.cacheSize + return content.remoteCache.NewLRUCache(cacheSize, cacheRef) + } + return nil +} + // localWriter wrap the content.Writer type localWriter struct { - content.Writer + ctrcontent.Writer content *Content } -func (localWriter localWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { +func (localWriter localWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...ctrcontent.Opt) error { // we don't write any lables, drop the opts localWriter.content.updateLease(&expected) return localWriter.Writer.Commit(ctx, size, expected) diff --git a/pkg/content/content_test.go b/pkg/content/content_test.go index ab4eef36..bfe07ddc 100644 --- a/pkg/content/content_test.go +++ b/pkg/content/content_test.go @@ -15,18 +15,112 @@ package content import ( + "context" "os" "testing" + "github.com/containerd/containerd/content" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/stretchr/testify/require" ) func TestContenSize(t *testing.T) { os.MkdirAll("./tmp", 0755) defer os.RemoveAll("./tmp") - content, err := NewContent("./tmp", "./tmp", "1000MB") + content, err := NewContent("./tmp", "./tmp", "1000MB", false, 200, nil) require.NoError(t, err) size, err := content.Size() require.NoError(t, err) require.Equal(t, size, int64(0)) } + +func TestRemoteCacheFIFO(t *testing.T) { + os.MkdirAll("./tmp", 0755) + defer os.RemoveAll("./tmp") + cs, err := NewContent("./tmp", "./tmp", "1000MB", true, 5, nil) + require.NoError(t, err) + + // Assume the following layers were already cached in remote cache + cs.remoteCache.Add("sha256:3ea3641165a4082d1feb7f933b11589a98f6b44906b3ab7224fd57afdb81bd22", ocispec.Descriptor{ + Digest: 
digest.Digest("sha256:937705f5a7ed2202ef5efab32f37940bcdc3f3bd8da5d472a605f92a1ee2abb8"), + MediaType: "application/vnd.oci.image.layer.nydus.blob.v1", + Size: 28104, + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-source-digest": "sha256:3ea3641165a4082d1feb7f933b11589a98f6b44906b3ab7224fd57afdb81bd22", + }, + }) + // This layer will be update by testTargetDesc, with different size. + cs.remoteCache.Add("sha256:d2a0c55811ee17d810756e8abe1c0e1680b83c1674b167116b5d22c92426271e", ocispec.Descriptor{ + Digest: digest.Digest("sha256:29de1420e90dc712e218943e2503bb8f8e7e0ea33094d21386fc6dfb6296cdaf"), + MediaType: "application/vnd.oci.image.layer.nydus.blob.v1", + Size: 30000000, + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-source-digest": "sha256:d2a0c55811ee17d810756e8abe1c0e1680b83c1674b167116b5d22c92426271e", + }, + }) + cs.remoteCache.Add("sha256:61c5a878f9b478eabfb7d805f8b28037bc75fc2723a8ed34a25ed926bb235630", ocispec.Descriptor{ + Digest: digest.Digest("sha256:5068ad3783f63b82ecf9434161923dec024275c2c09f79f517a8eaeba9765488"), + MediaType: "application/vnd.oci.image.layer.nydus.blob.v1", + Size: 4629756, + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-source-digest": "sha256:61c5a878f9b478eabfb7d805f8b28037bc75fc2723a8ed34a25ed926bb235630", + }, + }) + + // The testTargetDesc is from a manifest, will be update to the remote cache. + // More closer to the front, more lower. More closer to the back, more upper. 
+ testTargetDesc := []ocispec.Descriptor{ + { + Digest: digest.Digest("sha256:281acccc8425676d6cb1840e2656409f58da7e0e8d4c07f9092d35f9c9810e20"), + MediaType: "application/vnd.oci.image.layer.nydus.blob.v1", + Size: 39784900, + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-source-digest": "sha256:ccc37bca66e7c29e0d65a4279511fe9a93932a4bb80e79e95144f3812632b61a", + }, + }, + { + Digest: digest.Digest("sha256:29de1420e90dc712e218943e2503bb8f8e7e0ea33094d21386fc6dfb6296cdaf"), + MediaType: "application/vnd.oci.image.layer.nydus.blob.v1", + Size: 38238606, + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-source-digest": "sha256:d2a0c55811ee17d810756e8abe1c0e1680b83c1674b167116b5d22c92426271e", + }, + }, + { + Digest: digest.Digest("sha256:a3a8cb24ca266f5d7513f2c8be9a140c9f5abe223e0d80c68024b2ad5b6c928a"), + MediaType: "application/vnd.oci.image.layer.nydus.blob.v1", + Size: 26308, + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-source-digest": "sha256:163d9761f77314ae2beb0cfdb0f86245bef6071233fece6a3e4a3d1d4db23c5f", + }, + }, + { + Digest: digest.Digest("sha256:a9453f674413979bd6cdaaeb4399cd8d9c5449cf7625860e6d93eb2f916b4e50"), + MediaType: "application/vnd.oci.image.layer.nydus.blob.v1", + Size: 26315, + Annotations: map[string]string{ + "containerd.io/snapshot/nydus-source-digest": "sha256:c767afe512614d4adf05b9136a92c89b6151804d41ca92f005ce3af0032c36de", + }, + }, + } + + // Apdate cache to lru from upper to lower + ctx := context.Background() + for i := len(testTargetDesc) - 1; i >= 0; i-- { + layer := testTargetDesc[i] + _, err := cs.Update(ctx, content.Info{ + Digest: layer.Digest, + Size: layer.Size, + Labels: layer.Annotations, + }) + require.NoError(t, err) + } + require.Equal(t, cs.remoteCache.Size(), 5) + require.Equal(t, "sha256:5068ad3783f63b82ecf9434161923dec024275c2c09f79f517a8eaeba9765488", cs.remoteCache.Values()[0].Digest.String()) + require.Equal(t, 
"sha256:a9453f674413979bd6cdaaeb4399cd8d9c5449cf7625860e6d93eb2f916b4e50", cs.remoteCache.Values()[1].Digest.String()) + require.Equal(t, "sha256:a3a8cb24ca266f5d7513f2c8be9a140c9f5abe223e0d80c68024b2ad5b6c928a", cs.remoteCache.Values()[2].Digest.String()) + require.Equal(t, "sha256:29de1420e90dc712e218943e2503bb8f8e7e0ea33094d21386fc6dfb6296cdaf", cs.remoteCache.Values()[3].Digest.String()) + require.Equal(t, int64(38238606), cs.remoteCache.Values()[3].Size) + require.Equal(t, "sha256:281acccc8425676d6cb1840e2656409f58da7e0e8d4c07f9092d35f9c9810e20", cs.remoteCache.Values()[4].Digest.String()) +} diff --git a/pkg/content/local.go b/pkg/content/local.go index c1b2e19f..2582ba86 100644 --- a/pkg/content/local.go +++ b/pkg/content/local.go @@ -25,6 +25,7 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/remotes" + "github.com/goharbor/acceleration-service/pkg/config" "github.com/goharbor/acceleration-service/pkg/remote" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -37,26 +38,24 @@ type LocalProvider struct { content *Content hosts remote.HostFunc platformMC platforms.MatchComparer + cacheRef string } -func NewLocalProvider( - workDir string, - threshold string, - hosts remote.HostFunc, - platformMC platforms.MatchComparer, -) (Provider, *Content, error) { - contentDir := filepath.Join(workDir, "content") +func NewLocalProvider(cfg *config.Config, platformMC platforms.MatchComparer) (Provider, *Content, error) { + contentDir := filepath.Join(cfg.Provider.WorkDir, "content") if err := os.MkdirAll(contentDir, 0755); err != nil { return nil, nil, errors.Wrap(err, "create local provider work directory") } - content, err := NewContent(contentDir, workDir, threshold) + content, err := NewContent(contentDir, cfg.Provider.WorkDir, cfg.Provider.GCPolicy.Threshold, + cfg.EnableRemoteCache(), cfg.Provider.CacheSize, cfg.Host) + if err != nil { return nil, 
nil, errors.Wrap(err, "create local provider content") } return &LocalProvider{ content: content, images: make(map[string]*ocispec.Descriptor), - hosts: hosts, + hosts: cfg.Host, platformMC: platformMC, }, content, nil } @@ -130,3 +129,15 @@ func (pvd *LocalProvider) getImage(ref string) (*ocispec.Descriptor, error) { } return nil, errdefs.ErrNotFound } + +func (pvd *LocalProvider) SetCacheRef(ref string) { + pvd.mutex.Lock() + defer pvd.mutex.Unlock() + pvd.cacheRef = ref +} + +func (pvd *LocalProvider) GetCacheRef() string { + pvd.mutex.Lock() + defer pvd.mutex.Unlock() + return pvd.cacheRef +} diff --git a/pkg/content/provider.go b/pkg/content/provider.go index 9dcf6046..e11d4d57 100644 --- a/pkg/content/provider.go +++ b/pkg/content/provider.go @@ -42,4 +42,8 @@ type Provider interface { Image(ctx context.Context, ref string) (*ocispec.Descriptor, error) // ContentStore gets the content store object of containerd. ContentStore() content.Store + // SetCacheRef sets the cache reference of the source image. + SetCacheRef(ref string) + // GetCacheRef gets the cache reference of the source image. 
+ GetCacheRef() string } diff --git a/pkg/converter/converter.go b/pkg/converter/converter.go index fae1d97a..5fe55c15 100644 --- a/pkg/converter/converter.go +++ b/pkg/converter/converter.go @@ -88,7 +88,7 @@ func (cvt *Converter) pull(ctx context.Context, source string) error { return nil } -func (cvt *Converter) Convert(ctx context.Context, source, target string) (*Metric, error) { +func (cvt *Converter) Convert(ctx context.Context, source, target, cacheRef string) (*Metric, error) { var metric Metric sourceNamed, err := docker.ParseDockerRef(source) if err != nil { @@ -122,6 +122,7 @@ func (cvt *Converter) Convert(ctx context.Context, source, target string) (*Metr logger.Infof("converting image %s", source) start = time.Now() + cvt.provider.SetCacheRef(cacheRef) desc, err := cvt.driver.Convert(ctx, cvt.provider, source) if err != nil { return nil, errors.Wrap(err, "convert image") diff --git a/pkg/driver/nydus/nydus.go b/pkg/driver/nydus/nydus.go index 2c621c0c..7e8b7de0 100644 --- a/pkg/driver/nydus/nydus.go +++ b/pkg/driver/nydus/nydus.go @@ -17,7 +17,9 @@ package nydus import ( "bytes" "context" + "encoding/json" "fmt" + "io" "os" "os/exec" "regexp" @@ -25,19 +27,20 @@ import ( "strings" "sync" - "github.com/goharbor/acceleration-service/pkg/adapter/annotation" - accelcontent "github.com/goharbor/acceleration-service/pkg/content" - "github.com/goharbor/acceleration-service/pkg/driver/nydus/parser" - nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils" - "github.com/goharbor/acceleration-service/pkg/errdefs" - "github.com/goharbor/acceleration-service/pkg/utils" - + "github.com/containerd/containerd" "github.com/containerd/containerd/content" + containerdErrDefs "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/images/converter" "github.com/containerd/containerd/platforms" "github.com/containerd/nydus-snapshotter/pkg/backend" nydusify 
"github.com/containerd/nydus-snapshotter/pkg/converter" + "github.com/goharbor/acceleration-service/pkg/adapter/annotation" + accelcontent "github.com/goharbor/acceleration-service/pkg/content" + "github.com/goharbor/acceleration-service/pkg/driver/nydus/parser" + nydusutils "github.com/goharbor/acceleration-service/pkg/driver/nydus/utils" + "github.com/goharbor/acceleration-service/pkg/errdefs" + "github.com/goharbor/acceleration-service/pkg/utils" "github.com/opencontainers/image-spec/specs-go" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -58,6 +61,8 @@ const ( var builderVersion string var builderVersionOnce sync.Once +type CacheRef struct{} + type chunkDictInfo struct { BootstrapPath string } @@ -224,6 +229,20 @@ func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, so if err != nil { return nil, errors.Wrap(err, "get source image") } + + cacheRef := provider.GetCacheRef() + useRemoteCache := cacheRef != "" + if useRemoteCache { + logrus.Infof("remote cache image reference: %s", cacheRef) + if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil { + if errors.Is(err, errdefs.ErrNotSupport) { + logrus.Warn("Content store does not support remote cache") + } else { + return nil, errors.Wrap(err, "fetch remote cache") + } + } + } + desc, err := d.convert(ctx, provider, *image, sourceRef) if err != nil { return nil, err @@ -231,6 +250,19 @@ func (d *Driver) Convert(ctx context.Context, provider accelcontent.Provider, so if d.mergeManifest { return d.makeManifestIndex(ctx, provider.ContentStore(), *image, *desc) } + + if useRemoteCache { + // Fetch the old remote cache before updating and pushing the new one to avoid conflict. 
+ if err := d.FetchRemoteCache(ctx, provider, cacheRef); err != nil { + return nil, errors.Wrap(err, "fetch remote cache") + } + if err := d.UpdateRemoteCache(ctx, provider, *image, *desc); err != nil { + return nil, errors.Wrap(err, "update remote cache") + } + if err := d.PushRemoteCache(ctx, provider, cacheRef); err != nil { + return nil, errors.Wrap(err, "push remote cache") + } + } return desc, err } @@ -399,3 +431,152 @@ func (d *Driver) getChunkDict(ctx context.Context, provider accelcontent.Provide return &chunkDict, nil } + +// FetchRemoteCache fetch cache manifest from remote +func (d *Driver) FetchRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error { + resolver, err := pvd.Resolver(ref) + if err != nil { + return err + } + + rc := &containerd.RemoteContext{ + Resolver: resolver, + } + name, desc, err := rc.Resolver.Resolve(ctx, ref) + if err != nil { + if errors.Is(err, containerdErrDefs.ErrNotFound) { + // Remote cache may do not exist, just return nil + return nil + } + return err + } + fetcher, err := rc.Resolver.Fetcher(ctx, name) + if err != nil { + return err + } + ir, err := fetcher.Fetch(ctx, desc) + if err != nil { + if errdefs.NeedsRetryWithHTTP(err) { + pvd.UsePlainHTTP() + ir, err = fetcher.Fetch(ctx, desc) + if err != nil { + return errors.Wrap(err, "try to pull remote cache") + } + } else { + return errors.Wrap(err, "pull remote cache") + } + } + + bytes, err := io.ReadAll(ir) + if err != nil { + return errors.Wrap(err, "read remote cache to bytes") + } + + // TODO: handle manifest list for multiple platform. 
+	manifest := ocispec.Manifest{}
+	if err = json.Unmarshal(bytes, &manifest); err != nil {
+		return err
+	}
+
+	// Touch every cache layer in the content store so the GC/LRU sees it
+	// as recently used; a missing blob means the store cannot serve this
+	// cache and the caller must fall back to a full conversion.
+	cs := pvd.ContentStore()
+	for _, layer := range manifest.Layers {
+		if _, err := cs.Update(ctx, content.Info{
+			Digest: layer.Digest,
+			Size:   layer.Size,
+			Labels: layer.Annotations,
+		}); err != nil {
+			if errors.Is(err, containerdErrDefs.ErrNotFound) {
+				return errdefs.ErrNotSupport
+			}
+			return errors.Wrap(err, "update cache layer")
+		}
+	}
+	return nil
+}
+
+// PushRemoteCache assembles a cache manifest from all blobs in the local
+// content store that carry the nydus source-digest label, then pushes the
+// manifest (with an empty image config) to the remote registry under ref.
+func (d *Driver) PushRemoteCache(ctx context.Context, pvd accelcontent.Provider, ref string) error {
+	// The cache image does not need a real config; write an empty one so
+	// the manifest is structurally valid for registries.
+	imageConfig := ocispec.ImageConfig{}
+	imageConfigDesc, imageConfigBytes, err := nydusutils.MarshalToDesc(imageConfig, ocispec.MediaTypeImageConfig)
+	if err != nil {
+		return errors.Wrap(err, "remote cache image config marshal failed")
+	}
+	configReader := bytes.NewReader(imageConfigBytes)
+	if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, configReader, *imageConfigDesc); err != nil {
+		return errors.Wrap(err, "remote cache image config write blob failed")
+	}
+
+	// Collect cache layers: any stored blob labeled with a nydus source
+	// digest is part of the conversion cache.
+	cs := pvd.ContentStore()
+	layers := []ocispec.Descriptor{}
+	if err = cs.Walk(ctx, func(info content.Info) error {
+		if _, ok := info.Labels[nydusutils.LayerAnnotationNydusSourceDigest]; ok {
+			layers = append(layers, ocispec.Descriptor{
+				MediaType:   nydusutils.MediaTypeNydusBlob,
+				Digest:      info.Digest,
+				Size:        info.Size,
+				Annotations: info.Labels,
+			})
+		}
+		return nil
+	}); err != nil {
+		return errors.Wrap(err, "get remote cache layers failed")
+	}
+
+	manifest := ocispec.Manifest{
+		Versioned: specs.Versioned{
+			SchemaVersion: 2,
+		},
+		MediaType: ocispec.MediaTypeImageManifest,
+		Config:    *imageConfigDesc,
+		Layers:    layers,
+	}
+	manifestDesc, manifestBytes, err := nydusutils.MarshalToDesc(manifest, ocispec.MediaTypeImageManifest)
+	if err != nil {
+		return errors.Wrap(err, "remote cache manifest marshal failed")
+	}
+	manifestReader := bytes.NewReader(manifestBytes)
+	if err = content.WriteBlob(ctx, pvd.ContentStore(), ref, manifestReader, *manifestDesc); err != nil {
+		return errors.Wrap(err, "remote cache write blob failed")
+	}
+
+	if err = pvd.Push(ctx, *manifestDesc, ref); err != nil {
+		return err
+	}
+	return nil
+}
+
+// UpdateRemoteCache labels each converted layer with the digest of the
+// corresponding source layer, then refreshes the layers in the content
+// store from upper to lower so LRU ordering favors lower layers.
+func (d *Driver) UpdateRemoteCache(ctx context.Context, provider accelcontent.Provider, orgDesc ocispec.Descriptor, newDesc ocispec.Descriptor) error {
+	cs := provider.ContentStore()
+
+	orgManifest := ocispec.Manifest{}
+	_, err := utils.ReadJSON(ctx, cs, &orgManifest, orgDesc)
+	if err != nil {
+		return errors.Wrap(err, "read original manifest json")
+	}
+
+	newManifest := ocispec.Manifest{}
+	_, err = utils.ReadJSON(ctx, cs, &newManifest, newDesc)
+	if err != nil {
+		return errors.Wrap(err, "read new manifest json")
+	}
+	// Drop the last layer (presumably the nydus bootstrap, which has no
+	// source counterpart — TODO confirm). NOTE(review): this slice panics
+	// if newManifest.Layers is empty, and the loop below assumes
+	// len(newLayers) <= len(orgManifest.Layers) and non-nil Annotations
+	// maps — verify against callers.
+	newLayers := newManifest.Layers[:len(newManifest.Layers)-1]
+
+	// Update label for each layer; the Annotations map is shared with the
+	// slice element, so mutating through the range copy is effective.
+	for i, layer := range newLayers {
+		layer.Annotations[nydusutils.LayerAnnotationNydusSourceDigest] = orgManifest.Layers[i].Digest.String()
+	}
+
+	// Update cache to lru from upper to lower
+	for i := len(newLayers) - 1; i >= 0; i-- {
+		layer := newLayers[i]
+		if _, err := cs.Update(ctx, content.Info{
+			Digest: layer.Digest,
+			Size:   layer.Size,
+			Labels: layer.Annotations,
+		}); err != nil {
+			return errors.Wrap(err, "update cache layer")
+		}
+	}
+	return nil
+}
diff --git a/pkg/driver/nydus/utils/constant.go b/pkg/driver/nydus/utils/constant.go
index 9053233b..7c552762 100644
--- a/pkg/driver/nydus/utils/constant.go
+++ b/pkg/driver/nydus/utils/constant.go
@@ -26,6 +26,8 @@ const (
 	LayerAnnotationNydusBootstrapDigest = "containerd.io/snapshot/nydus-bootstrap-digest"
 	LayerAnnotationNydusFsVersion       = "containerd.io/snapshot/nydus-fs-version"
 	LayerAnnotationUncompressed         = "containerd.io/uncompressed"
+	LayerAnnotationNydusSourceDigest    = "containerd.io/snapshot/nydus-source-digest"
+	LayerAnnotationNydusTargetDigest    = "containerd.io/snapshot/nydus-target-digest"
 )
 
 var NydusAnnotations = []string{LayerAnnotationNydusBlob, LayerAnnotationNydusBootstrap, LayerAnnotationNydusRAFSVersion}
diff --git a/pkg/errdefs/errors.go b/pkg/errdefs/errors.go
index 39effa84..bf3d06e9 100644
--- a/pkg/errdefs/errors.go
+++ b/pkg/errdefs/errors.go
@@ -17,6 +17,8 @@ var (
 	ErrConvertFailed    = errors.New("ERR_CONVERT_FAILED")
 	ErrAlreadyConverted = errors.New("ERR_ALREADY_CONVERTED")
 	ErrUnhealthy        = errors.New("ERR_UNHEALTHY")
+	ErrIsRemoteCache    = errors.New("ERR_IS_REMOTE_CACHE")
+	ErrNotSupport       = errors.New("ERR_NOT_SUPPORT")
 )
 
 // IsErrHTTPResponseToHTTPSClient returns whether err is
diff --git a/pkg/remote/ported.go b/pkg/remote/ported.go
new file mode 100644
index 00000000..70c48414
--- /dev/null
+++ b/pkg/remote/ported.go
@@ -0,0 +1,617 @@
+// Copyright Project Harbor Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package remote + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "path" + "strings" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/reference" + docker "github.com/containerd/containerd/remotes/docker" + "github.com/containerd/containerd/tracing" + acceldErrdefs "github.com/goharbor/acceleration-service/pkg/errdefs" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +const maxRetry = 3 + +// Modified from containerd project, copyright The containerd Authors. +// https://github.com/containerd/containerd/remotes/docker/fetcher.go +func Fetch(ctx context.Context, cacheRef string, desc ocispec.Descriptor, insecureHost HostFunc, plainHTTP bool) (content.ReaderAt, error) { + ctx = log.WithLogger(ctx, log.G(ctx).WithField("digest", desc.Digest)) + + refspec, err := reference.Parse(cacheRef) + if err != nil { + return nil, err + } + cred, insecure, err := insecureHost(cacheRef) + if err != nil { + return nil, err + } + + registryHosts := docker.ConfigureDefaultRegistries( + docker.WithAuthorizer( + docker.NewDockerAuthorizer( + docker.WithAuthClient(newDefaultClient(insecure)), + docker.WithAuthCreds(cred), + ), + ), + docker.WithClient(newDefaultClient(insecure)), + docker.WithPlainHTTP(func(host string) (bool, error) { + return plainHTTP, nil + }), + ) + hosts, err := registryHosts(refspec.Hostname()) + if err != nil { + return nil, err + } + + hosts = filterHosts(hosts, docker.HostCapabilityPull) + if len(hosts) == 0 { + return nil, fmt.Errorf("no pull hosts: %w", errdefs.ErrNotFound) + } + + parts := strings.SplitN(refspec.Locator, "/", 2) + if len(parts) < 2 { + return nil, fmt.Errorf("invalid cache ref: %w", errdefs.ErrInvalidArgument) + } + repository := parts[1] + + ctx, err = docker.ContextWithRepositoryScope(ctx, refspec, false) 
+ if err != nil { + return nil, err + } + + hrs, _ := newHTTPReadSeeker(desc.Size, func(offset int64) (io.ReadCloser, error) { + // firstly try fetch via external urls + for _, us := range desc.URLs { + u, err := url.Parse(us) + if err != nil { + log.G(ctx).WithError(err).Debugf("failed to parse %q", us) + continue + } + if u.Scheme != "http" && u.Scheme != "https" { + log.G(ctx).Debug("non-http(s) alternative url is unsupported") + continue + } + ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", u)) + log.G(ctx).Info("request") + + // Try this first, parse it + host := docker.RegistryHost{ + Client: http.DefaultClient, + Host: u.Host, + Scheme: u.Scheme, + Path: u.Path, + Capabilities: docker.HostCapabilityPull, + } + header := http.Header{} + req := newRequest(header, host, http.MethodGet, repository) + // Strip namespace from base + req.path = u.Path + if u.RawQuery != "" { + req.path = req.path + "?" + u.RawQuery + } + + rc, err := open(ctx, req, desc.MediaType, offset) + if err != nil { + if errdefs.IsNotFound(err) { + continue // try one of the other urls. 
+ } + + return nil, err + } + return rc, nil + } + + // Try manifests endpoints for manifests types + switch desc.MediaType { + case images.MediaTypeDockerSchema2Manifest, images.MediaTypeDockerSchema2ManifestList, + images.MediaTypeDockerSchema1Manifest, + ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex: + + var firstErr error + for _, host := range hosts { + header := http.Header{} + req := newRequest(header, host, http.MethodGet, repository, "manifests", desc.Digest.String()) + if err := req.addNamespace(refspec.Hostname()); err != nil { + return nil, err + } + + rc, err := open(ctx, req, desc.MediaType, offset) + if err != nil { + // Store the error for referencing later + if firstErr == nil { + firstErr = err + } + continue // try another host + } + + return rc, nil + } + + return nil, firstErr + } + + // Finally use blobs endpoints + var firstErr error + for _, host := range hosts { + header := http.Header{} + req := newRequest(header, host, http.MethodGet, repository, "blobs", desc.Digest.String()) + if err := req.addNamespace(refspec.Hostname()); err != nil { + return nil, err + } + + rc, err := open(ctx, req, desc.MediaType, offset) + if err != nil { + // Store the error for referencing later + if firstErr == nil { + firstErr = err + } + continue // try another host + } + + return rc, nil + } + + if errdefs.IsNotFound(firstErr) { + firstErr = fmt.Errorf("could not fetch content descriptor %v (%v) from remote: %w", + desc.Digest, desc.MediaType, errdefs.ErrNotFound, + ) + } + + return nil, firstErr + + }) + // try to use open to trigger http request + if _, err = hrs.open(0); err != nil { + if acceldErrdefs.NeedsRetryWithHTTP(err) { + return Fetch(ctx, cacheRef, desc, insecureHost, true) + } + } + return hrs, nil +} + +// Modified from containerd project, copyright The containerd Authors. 
+// https://github.com/containerd/containerd/remotes/docker/httpreadseeker.go +type httpReadSeeker struct { + size int64 + offset int64 + rc io.ReadCloser + open func(offset int64) (io.ReadCloser, error) + closed bool + + errsWithNoProgress int +} + +func newHTTPReadSeeker(size int64, open func(offset int64) (io.ReadCloser, error)) (*httpReadSeeker, error) { + return &httpReadSeeker{ + size: size, + open: open, + }, nil +} + +func (hrs *httpReadSeeker) ReadAt(p []byte, offset int64) (n int, err error) { + if _, err = hrs.Seek(offset, io.SeekStart); err != nil { + return 0, err + } + return hrs.Read(p) +} + +func (hrs *httpReadSeeker) Size() int64 { + return hrs.size +} + +func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) { + if hrs.closed { + return 0, io.EOF + } + + rd, err := hrs.reader() + if err != nil { + return 0, err + } + + n, err = rd.Read(p) + hrs.offset += int64(n) + if n > 0 || err == nil { + hrs.errsWithNoProgress = 0 + } + if err == io.ErrUnexpectedEOF { + // connection closed unexpectedly. try reconnecting. 
+ if n == 0 { + hrs.errsWithNoProgress++ + if hrs.errsWithNoProgress > maxRetry { + return // too many retries for this offset with no progress + } + } + if hrs.rc != nil { + if clsErr := hrs.rc.Close(); clsErr != nil { + log.L.WithError(clsErr).Error("httpReadSeeker: failed to close ReadCloser") + } + hrs.rc = nil + } + if _, err2 := hrs.reader(); err2 == nil { + return n, nil + } + } + return +} + +func (hrs *httpReadSeeker) Close() error { + if hrs.closed { + return nil + } + hrs.closed = true + if hrs.rc != nil { + return hrs.rc.Close() + } + + return nil +} + +func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) { + if hrs.closed { + return 0, fmt.Errorf("Fetcher.Seek: closed: %w", errdefs.ErrUnavailable) + } + + abs := hrs.offset + switch whence { + case io.SeekStart: + abs = offset + case io.SeekCurrent: + abs += offset + case io.SeekEnd: + if hrs.size == -1 { + return 0, fmt.Errorf("Fetcher.Seek: unknown size, cannot seek from end: %w", errdefs.ErrUnavailable) + } + abs = hrs.size + offset + default: + return 0, fmt.Errorf("Fetcher.Seek: invalid whence: %w", errdefs.ErrInvalidArgument) + } + + if abs < 0 { + return 0, fmt.Errorf("Fetcher.Seek: negative offset: %w", errdefs.ErrInvalidArgument) + } + + if abs != hrs.offset { + if hrs.rc != nil { + if err := hrs.rc.Close(); err != nil { + log.L.WithError(err).Error("Fetcher.Seek: failed to close ReadCloser") + } + + hrs.rc = nil + } + + hrs.offset = abs + } + + return hrs.offset, nil +} + +func (hrs *httpReadSeeker) reader() (io.Reader, error) { + if hrs.rc != nil { + return hrs.rc, nil + } + + if hrs.size == -1 || hrs.offset < hrs.size { + // only try to reopen the body request if we are seeking to a value + // less than the actual size. 
+ if hrs.open == nil { + return nil, fmt.Errorf("cannot open: %w", errdefs.ErrNotImplemented) + } + + rc, err := hrs.open(hrs.offset) + if err != nil { + return nil, fmt.Errorf("httpReadSeeker: failed open: %w", err) + } + + if hrs.rc != nil { + if err := hrs.rc.Close(); err != nil { + log.L.WithError(err).Error("httpReadSeeker: failed to close ReadCloser") + } + } + hrs.rc = rc + } else { + // There is an edge case here where offset == size of the content. If + // we seek, we will probably get an error for content that cannot be + // sought (?). In that case, we should err on committing the content, + // as the length is already satisfied but we just return the empty + // reader instead. + + hrs.rc = io.NopCloser(bytes.NewReader([]byte{})) + } + + return hrs.rc, nil +} + +// Ported from containerd project, copyright The containerd Authors. +// https://github.com/containerd/containerd/remotes/docker/fetcher.go +func open(ctx context.Context, req *request, mediatype string, offset int64) (_ io.ReadCloser, retErr error) { + if mediatype == "" { + req.header.Set("Accept", "*/*") + } else { + req.header.Set("Accept", strings.Join([]string{mediatype, `*/*`}, ", ")) + } + + if offset > 0 { + // Note: "Accept-Ranges: bytes" cannot be trusted as some endpoints + // will return the header without supporting the range. The content + // range must always be checked. + req.header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) + } + + resp, err := req.doWithRetries(ctx, nil) + if err != nil { + return nil, err + } + defer func() { + if retErr != nil { + resp.Body.Close() + } + }() + + if resp.StatusCode > 299 { + // TODO(stevvooe): When doing a offset specific request, we should + // really distinguish between a 206 and a 200. In the case of 200, we + // can discard the bytes, hiding the seek behavior from the + // implementation. 
+ + if resp.StatusCode == http.StatusNotFound { + return nil, fmt.Errorf("content at %v not found: %w", req.String(), errdefs.ErrNotFound) + } + var registryErr docker.Errors + if err := json.NewDecoder(resp.Body).Decode(®istryErr); err != nil || registryErr.Len() < 1 { + return nil, fmt.Errorf("unexpected status code %v: %v", req.String(), resp.Status) + } + return nil, fmt.Errorf("unexpected status code %v: %s - Server message: %s", req.String(), resp.Status, registryErr.Error()) + } + if offset > 0 { + cr := resp.Header.Get("content-range") + if cr != "" { + if !strings.HasPrefix(cr, fmt.Sprintf("bytes %d-", offset)) { + return nil, fmt.Errorf("unhandled content range in response: %v", cr) + + } + } else { + // TODO: Should any cases where use of content range + // without the proper header be considered? + // 206 responses? + + // Discard up to offset + // Could use buffer pool here but this case should be rare + n, err := io.Copy(io.Discard, io.LimitReader(resp.Body, offset)) + if err != nil { + return nil, fmt.Errorf("failed to discard to offset: %w", err) + } + if n != offset { + return nil, errors.New("unable to discard to offset") + } + + } + } + + return resp.Body, nil +} + +// Ported from containerd project, copyright The containerd Authors. +// https://github.com/containerd/containerd/remotes/docker/resolver.go +type request struct { + method string + path string + header http.Header + host docker.RegistryHost + body func() (io.ReadCloser, error) + size int64 +} + +func newRequest(dockerHeader http.Header, host docker.RegistryHost, method string, + repository string, ps ...string) *request { + header := dockerHeader.Clone() + if header == nil { + header = http.Header{} + } + + for key, value := range host.Header { + header[key] = append(header[key], value...) + } + parts := append([]string{"/", host.Path, repository}, ps...) + p := path.Join(parts...) 
+ // Join strips trailing slash, re-add ending "/" if included + if len(parts) > 0 && strings.HasSuffix(parts[len(parts)-1], "/") { + p = p + "/" + } + return &request{ + method: method, + path: p, + header: header, + host: host, + } +} + +func (r *request) do(ctx context.Context) (*http.Response, error) { + u := r.host.Scheme + "://" + r.host.Host + r.path + req, err := http.NewRequestWithContext(ctx, r.method, u, nil) + if err != nil { + return nil, err + } + req.Header = http.Header{} // headers need to be copied to avoid concurrent map access + for k, v := range r.header { + req.Header[k] = v + } + if r.body != nil { + body, err := r.body() + if err != nil { + return nil, err + } + req.Body = body + req.GetBody = r.body + if r.size > 0 { + req.ContentLength = r.size + } + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", u)) + if err := r.authorize(ctx, req); err != nil { + return nil, fmt.Errorf("failed to authorize: %w", err) + } + + client := &http.Client{} + if r.host.Client != nil { + *client = *r.host.Client + } + if client.CheckRedirect == nil { + client.CheckRedirect = func(req *http.Request, via []*http.Request) error { + if len(via) >= 10 { + return errors.New("stopped after 10 redirects") + } + if err := r.authorize(ctx, req); err != nil { + return fmt.Errorf("failed to authorize redirect: %w", err) + } + return nil + } + } + _, httpSpan := tracing.StartSpan( + ctx, + tracing.Name("remotes.docker.resolver", "HTTPRequest"), + tracing.WithHTTPRequest(req), + ) + defer httpSpan.End() + resp, err := client.Do(req) + if err != nil { + httpSpan.SetStatus(err) + return nil, fmt.Errorf("failed to do request: %w", err) + } + httpSpan.SetAttributes(tracing.HTTPStatusCodeAttributes(resp.StatusCode)...) 
+ return resp, nil +} + +func (r *request) doWithRetries(ctx context.Context, responses []*http.Response) (*http.Response, error) { + resp, err := r.do(ctx) + if err != nil { + return nil, err + } + + responses = append(responses, resp) + retry, err := r.retryRequest(ctx, responses) + if err != nil { + resp.Body.Close() + return nil, err + } + if retry { + resp.Body.Close() + return r.doWithRetries(ctx, responses) + } + return resp, err +} + +func (r *request) authorize(ctx context.Context, req *http.Request) error { + // Check if has header for host + if r.host.Authorizer != nil { + if err := r.host.Authorizer.Authorize(ctx, req); err != nil { + return err + } + } + return nil +} + +func (r *request) addNamespace(ns string) (err error) { + if !isProxy(r.host.Host, ns) { + return nil + } + var q url.Values + // Parse query + if i := strings.IndexByte(r.path, '?'); i > 0 { + r.path = r.path[:i+1] + q, err = url.ParseQuery(r.path[i+1:]) + if err != nil { + return + } + } else { + r.path = r.path + "?" 
+ q = url.Values{} + } + q.Add("ns", ns) + + r.path = r.path + q.Encode() + + return +} + +func (r *request) retryRequest(ctx context.Context, responses []*http.Response) (bool, error) { + if len(responses) > 5 { + return false, nil + } + last := responses[len(responses)-1] + switch last.StatusCode { + case http.StatusUnauthorized: + log.G(ctx).WithField("header", last.Header.Get("WWW-Authenticate")).Debug("Unauthorized") + if r.host.Authorizer != nil { + if err := r.host.Authorizer.AddResponses(ctx, responses); err == nil { + return true, nil + } else if !errdefs.IsNotImplemented(err) { + return false, err + } + } + + return false, nil + case http.StatusMethodNotAllowed: + // Support registries which have not properly implemented the HEAD method for + // manifests endpoint + if r.method == http.MethodHead && strings.Contains(r.path, "/manifests/") { + r.method = http.MethodGet + return true, nil + } + case http.StatusRequestTimeout, http.StatusTooManyRequests: + return true, nil + } + + // TODO: Handle 50x errors accounting for attempt history + return false, nil +} + +func (r *request) String() string { + return r.host.Scheme + "://" + r.host.Host + r.path +} + +func isProxy(host, refhost string) bool { + if refhost != host { + if refhost != "docker.io" || host != "registry-1.docker.io" { + return true + } + } + return false +} + +func filterHosts(hosts []docker.RegistryHost, caps docker.HostCapabilities) []docker.RegistryHost { + for _, host := range hosts { + if host.Capabilities.Has(caps) { + hosts = append(hosts, host) + } + } + return hosts +}