Skip to content

Commit

Permalink
docker, BlobInfoCache: try to reuse compressed blobs when pushing acr…
Browse files Browse the repository at this point in the history
…oss registries

It seems we try to reuse blobs only for the specified registry, however
we can have valid known compressed digests across registry as well
following pr attempts to use that by doing following steps.

* `CandidateLocations2` now processes all known blobs and appends them
  to returned candidates at the lowest priority. As a result when
`TryReusingBlob` tries to process these candidates and if the blobs
filtered by the `Opaque` set by the `transport` fail to match then
attempt is made against all known blobs (ones which do not belong to the
current registry).

* Increase the sample set of potential blob reuse to all known
  compressed digests , also involving the one which do not belong to
current registry.

* If a blob is found match it against the registry where we are
  attempting to push. If blob is already there consider it a `CACHE
HIT!` and reply skipping blob, since its already there.

How to verify this ?

* Remove all images `buildah rmi --all` // needed so all new blobs can
  be tagged again in common bucket
* Remove any previous `blob-info-cache` by

```console
rm /home/<user>/.local/share/containers/cache/blob-info-cache-v1.boltdb
```

```console
$ skopeo copy docker://registry.fedoraproject.org/fedora-minimal docker://quay.io/fl/test:some-tag
$ buildah pull registry.fedoraproject.org/fedora-minimal
$ buildah tag registry.fedoraproject.org/fedora-minimal quay.io/fl/test
$ buildah push quay.io/fl/test
```

```console
Getting image source signatures
Copying blob a3497ca15bbf skipped: already exists
Copying config f7e02de757 done
Writing manifest to image destination
Storing signatures
```

Signed-off-by: Aditya R <arajan@redhat.com>
  • Loading branch information
flouthoc committed Sep 8, 2022
1 parent 6517443 commit 822867a
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 96 deletions.
44 changes: 28 additions & 16 deletions docker/docker_image_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,21 +332,33 @@ func (d *dockerImageDestination) TryReusingBlobWithOptions(ctx context.Context,
// Then try reusing blobs from other locations.
candidates := options.Cache.CandidateLocations2(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, options.CanSubstitute)
for _, candidate := range candidates {
candidateRepo, err := parseBICLocationReference(candidate.Location)
if err != nil {
logrus.Debugf("Error parsing BlobInfoCache location reference: %s", err)
continue
}
if candidate.CompressorName != blobinfocache.Uncompressed {
logrus.Debugf("Trying to reuse cached location %s compressed with %s in %s", candidate.Digest.String(), candidate.CompressorName, candidateRepo.Name())
var candidateRepo reference.Named
if !candidate.UnknownLocation {
candidateRepo, err = parseBICLocationReference(candidate.Location)
if err != nil {
logrus.Debugf("Error parsing BlobInfoCache location reference: %s", err)
continue
}
if candidate.CompressorName != blobinfocache.Uncompressed {
logrus.Debugf("Trying to reuse cached location %s compressed with %s in %s", candidate.Digest.String(), candidate.CompressorName, candidateRepo.Name())
} else {
logrus.Debugf("Trying to reuse cached location %s with no compression in %s", candidate.Digest.String(), candidateRepo.Name())
}
// Sanity checks:
if reference.Domain(candidateRepo) != reference.Domain(d.ref.ref) {
logrus.Debugf("... Internal error: domain %s does not match destination %s", reference.Domain(candidateRepo), reference.Domain(d.ref.ref))
continue
}
} else {
logrus.Debugf("Trying to reuse cached location %s with no compression in %s", candidate.Digest.String(), candidateRepo.Name())
}

// Sanity checks:
if reference.Domain(candidateRepo) != reference.Domain(d.ref.ref) {
logrus.Debugf("... Internal error: domain %s does not match destination %s", reference.Domain(candidateRepo), reference.Domain(d.ref.ref))
continue
if candidate.CompressorName != blobinfocache.Uncompressed {
logrus.Debugf("Trying to reuse cached location %s compressed with %s", candidate.Digest.String(), candidate.CompressorName)
} else {
logrus.Debugf("Trying to reuse cached location %s with no compression", candidate.Digest.String())
}
// This digest is a known variant of this blob but we don’t
// have a recorded location in this registry, let’s try looking
// for it in the current repo.
candidateRepo = reference.TrimNamed(d.ref.ref)
}
if candidateRepo.Name() == d.ref.ref.Name() && candidate.Digest == info.Digest {
logrus.Debug("... Already tried the primary destination")
Expand All @@ -359,8 +371,8 @@ func (d *dockerImageDestination) TryReusingBlobWithOptions(ctx context.Context,
// expanded token scope.
extraScope := &authScope{
resourceType: "repository",
remoteName: reference.Path(candidateRepo),
actions: "pull",
remoteName: reference.Path(candidateRepo),
actions: "pull",
}
// This existence check is not, strictly speaking, necessary: We only _really_ need it to get the blob size, and we could record that in the cache instead.
// But a "failed" d.mountBlob currently leaves around an unterminated server-side upload, which we would try to cancel.
Expand Down
7 changes: 4 additions & 3 deletions internal/blobinfocache/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ type BlobInfoCache2 interface {

// BICReplacementCandidate2 is an item returned by BlobInfoCache2.CandidateLocations2.
type BICReplacementCandidate2 struct {
Digest digest.Digest
CompressorName string // either the Name() of a known pkg/compression.Algorithm, or Uncompressed or UnknownCompression
Location types.BICLocationReference
Digest digest.Digest
CompressorName string // either the Name() of a known pkg/compression.Algorithm, or Uncompressed or UnknownCompression
UnknownLocation bool // is true when `Location` for this blob is not set
Location types.BICLocationReference // not set if UnknownLocation is set to `true`
}
140 changes: 86 additions & 54 deletions pkg/blobinfocache/boltdb/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,15 @@ func (bdc *cache) RecordKnownLocation(transport types.ImageTransport, scope type
}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}

// appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in scopeBucket with corresponding compression info from compressionBucket (if compressionBucket is not nil), and returns the result of appending them to candidates.
func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, scopeBucket, compressionBucket *bolt.Bucket, digest digest.Digest, requireCompressionInfo bool) []prioritize.CandidateWithTime {
// appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in scopeBucket
// (or with an unknown location if scopeBucket is nil and v2Output is set) with corresponding compression
// info from compressionBucket (if compressionBucket is not nil), and returns the result of appending them
// to candidates.
func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, scopeBucket, compressionBucket *bolt.Bucket, digest digest.Digest, v2Output bool) []prioritize.CandidateWithTime {
digestKey := []byte(digest.String())
b := scopeBucket.Bucket(digestKey)
if b == nil {
return candidates
var b *bolt.Bucket
if scopeBucket != nil {
b = scopeBucket.Bucket(digestKey)
}
compressorName := blobinfocache.UnknownCompression
if compressionBucket != nil {
Expand All @@ -297,24 +300,38 @@ func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateW
compressorName = string(compressorNameValue)
}
}
if compressorName == blobinfocache.UnknownCompression && requireCompressionInfo {
if compressorName == blobinfocache.UnknownCompression && v2Output {
return candidates
}
_ = b.ForEach(func(k, v []byte) error {
t := time.Time{}
if err := t.UnmarshalBinary(v); err != nil {
return err
if b != nil {
_ = b.ForEach(func(k, v []byte) error {
t := time.Time{}
if err := t.UnmarshalBinary(v); err != nil {
return err
}
candidates = append(candidates, prioritize.CandidateWithTime{
Candidate: blobinfocache.BICReplacementCandidate2{
Digest: digest,
CompressorName: compressorName,
Location: types.BICLocationReference{Opaque: string(k)},
},
LastSeen: t,
})
return nil
}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
} else {
if v2Output {
candidates = append(candidates, prioritize.CandidateWithTime{
Candidate: blobinfocache.BICReplacementCandidate2{
Digest: digest,
CompressorName: compressorName,
UnknownLocation: true,
Location: types.BICLocationReference{Opaque: ""},
},
LastSeen: time.Time{},
})
}
candidates = append(candidates, prioritize.CandidateWithTime{
Candidate: blobinfocache.BICReplacementCandidate2{
Digest: digest,
CompressorName: compressorName,
Location: types.BICLocationReference{Opaque: string(k)},
},
LastSeen: t,
})
return nil
}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}
return candidates
}

Expand All @@ -328,57 +345,72 @@ func (bdc *cache) CandidateLocations2(transport types.ImageTransport, scope type
return bdc.candidateLocations(transport, scope, primaryDigest, canSubstitute, true)
}

func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, requireCompressionInfo bool) []blobinfocache.BICReplacementCandidate2 {
func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, v2Output bool) []blobinfocache.BICReplacementCandidate2 {
var err error
res := []prioritize.CandidateWithTime{}
resAllBlobs := []prioritize.CandidateWithTime{}
var uncompressedDigestValue digest.Digest // = ""

getCandidatesForUncompressedDigest := func(tx *bolt.Tx, res []prioritize.CandidateWithTime, scopeBucket, compressionBucket *bolt.Bucket, primaryDigest digest.Digest, v2Output bool) ([]prioritize.CandidateWithTime, error) {
if uncompressedDigestValue = bdc.uncompressedDigest(tx, primaryDigest); uncompressedDigestValue != "" {
b := tx.Bucket(digestByUncompressedBucket)
if b != nil {
b = b.Bucket([]byte(uncompressedDigestValue.String()))
if b != nil {
if err := b.ForEach(func(k, _ []byte) error {
d, err := digest.Parse(string(k))
if err != nil {
return err
}
if d != primaryDigest && d != uncompressedDigestValue {
res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, d, v2Output)
}
return nil
}); err != nil {
return nil, err
}
}
}
if uncompressedDigestValue != primaryDigest {
res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, uncompressedDigestValue, v2Output)
}
}
return res, nil
}

if err := bdc.view(func(tx *bolt.Tx) error {
scopeBucket := tx.Bucket(knownLocationsBucket)
if scopeBucket == nil {
return nil
if scopeBucket != nil {
scopeBucket = scopeBucket.Bucket([]byte(transport.Name()))
}
scopeBucket = scopeBucket.Bucket([]byte(transport.Name()))
if scopeBucket == nil {
return nil
}
scopeBucket = scopeBucket.Bucket([]byte(scope.Opaque))
if scopeBucket == nil {
return nil
if scopeBucket != nil {
scopeBucket = scopeBucket.Bucket([]byte(scope.Opaque))
}
// compressionBucket won't have been created if previous writers never recorded info about compression,
// and we don't want to fail just because of that
compressionBucket := tx.Bucket(digestCompressorBucket)

res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, primaryDigest, requireCompressionInfo)
if canSubstitute {
if uncompressedDigestValue = bdc.uncompressedDigest(tx, primaryDigest); uncompressedDigestValue != "" {
b := tx.Bucket(digestByUncompressedBucket)
if b != nil {
b = b.Bucket([]byte(uncompressedDigestValue.String()))
if b != nil {
if err := b.ForEach(func(k, _ []byte) error {
d, err := digest.Parse(string(k))
if err != nil {
return err
}
if d != primaryDigest && d != uncompressedDigestValue {
res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, d, requireCompressionInfo)
}
return nil
}); err != nil {
return err
}
}
}
if uncompressedDigestValue != primaryDigest {
res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, uncompressedDigestValue, requireCompressionInfo)
// Add alls potential blobs with location unknown if v2Output is configured true.
resAllBlobs, err = getCandidatesForUncompressedDigest(tx, resAllBlobs, nil, compressionBucket, primaryDigest, v2Output)
if err != nil {
return err
}

if scopeBucket != nil {
res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, primaryDigest, v2Output)
if canSubstitute {
res, err = getCandidatesForUncompressedDigest(tx, res, scopeBucket, compressionBucket, primaryDigest, v2Output)
if err != nil {
return err
}
}
}
return nil
}); err != nil { // Including os.IsNotExist(err)
return []blobinfocache.BICReplacementCandidate2{} // FIXME? Log err (but throttle the log volume on repeated accesses)?
}

res = append(res, resAllBlobs...)
// priortize the result
return prioritize.DestructivelyPrioritizeReplacementCandidates(res, primaryDigest, uncompressedDigestValue)
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/blobinfocache/internal/prioritize/prioritize.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ func (css *candidateSortState) Less(i, j int) bool {
xi := css.cs[i]
xj := css.cs[j]

// If i has location unknown and j has known location
// always treat i as lesser than j or vice-versa so the
// candidate with location unknown alaways gets lower priority.
if xi.Candidate.UnknownLocation && !xj.Candidate.UnknownLocation {
return true
}
if !xi.Candidate.UnknownLocation && xj.Candidate.UnknownLocation {
return false
}

// primaryDigest entries come first, more recent first.
// uncompressedDigest entries, if uncompressedDigest is set and != primaryDigest, come last, more recent entry first.
// Other digest values are primarily sorted by time (more recent first), secondarily by digest (to provide a deterministic order)
Expand Down
72 changes: 49 additions & 23 deletions pkg/blobinfocache/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,25 +120,43 @@ func (mem *cache) RecordDigestCompressorName(blobDigest digest.Digest, compresso
mem.compressors[blobDigest] = compressorName
}

// appendReplacementCandidates creates prioritize.CandidateWithTime values for (transport, scope, digest), and returns the result of appending them to candidates.
func (mem *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, requireCompressionInfo bool) []prioritize.CandidateWithTime {
locations := mem.knownLocations[locationKey{transport: transport.Name(), scope: scope, blobDigest: digest}] // nil if not present
for l, t := range locations {
compressorName, compressorKnown := mem.compressors[digest]
if !compressorKnown {
if requireCompressionInfo {
continue
}
compressorName = blobinfocache.UnknownCompression
// appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in scopeBucket
// (or with an unknown location if scopeBucket is nil and v2Output is set) with corresponding compression
// info from compressionBucket (if compressionBucket is not nil), and returns the result of appending them
// to candidates.
func (mem *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, transport types.ImageTransport, scope *types.BICTransportScope, digest digest.Digest, v2Output bool) []prioritize.CandidateWithTime {
compressorName := blobinfocache.UnknownCompression
compressorNameValue, compressorKnown := mem.compressors[digest]
if compressorKnown {
compressorName = compressorNameValue
}
if compressorName == blobinfocache.UnknownCompression && v2Output {
return candidates
}
if scope != nil {
locations := mem.knownLocations[locationKey{transport: transport.Name(), scope: *scope, blobDigest: digest}] // nil if not present
for l, t := range locations {
candidates = append(candidates, prioritize.CandidateWithTime{
Candidate: blobinfocache.BICReplacementCandidate2{
Digest: digest,
CompressorName: compressorName,
Location: l,
},
LastSeen: t,
})
}
} else {
if v2Output {
candidates = append(candidates, prioritize.CandidateWithTime{
Candidate: blobinfocache.BICReplacementCandidate2{
Digest: digest,
CompressorName: compressorName,
UnknownLocation: true,
Location: types.BICLocationReference{Opaque: ""},
},
LastSeen: time.Time{},
})
}
candidates = append(candidates, prioritize.CandidateWithTime{
Candidate: blobinfocache.BICReplacementCandidate2{
Digest: digest,
CompressorName: compressorName,
Location: l,
},
LastSeen: t,
})
}
return candidates
}
Expand All @@ -163,24 +181,32 @@ func (mem *cache) CandidateLocations2(transport types.ImageTransport, scope type
return mem.candidateLocations(transport, scope, primaryDigest, canSubstitute, true)
}

func (mem *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, requireCompressionInfo bool) []blobinfocache.BICReplacementCandidate2 {
func (mem *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, v2Output bool) []blobinfocache.BICReplacementCandidate2 {
mem.mutex.Lock()
defer mem.mutex.Unlock()
res := []prioritize.CandidateWithTime{}
res = mem.appendReplacementCandidates(res, transport, scope, primaryDigest, requireCompressionInfo)
resAllBlobs := []prioritize.CandidateWithTime{}
var uncompressedDigest digest.Digest // = ""
if canSubstitute {
getCandidatesForUncompressedDigest := func(res []prioritize.CandidateWithTime, scope *types.BICTransportScope, digest digest.Digest, v2Output bool) []prioritize.CandidateWithTime {
if uncompressedDigest = mem.uncompressedDigestLocked(primaryDigest); uncompressedDigest != "" {
otherDigests := mem.digestsByUncompressed[uncompressedDigest] // nil if not present in the map
for d := range otherDigests {
if d != primaryDigest && d != uncompressedDigest {
res = mem.appendReplacementCandidates(res, transport, scope, d, requireCompressionInfo)
res = mem.appendReplacementCandidates(res, transport, scope, d, v2Output)
}
}
if uncompressedDigest != primaryDigest {
res = mem.appendReplacementCandidates(res, transport, scope, uncompressedDigest, requireCompressionInfo)
res = mem.appendReplacementCandidates(res, transport, scope, uncompressedDigest, v2Output)
}
}
return res
}
res = mem.appendReplacementCandidates(res, transport, &scope, primaryDigest, v2Output)
if canSubstitute {
res = getCandidatesForUncompressedDigest(res, &scope, primaryDigest, v2Output)
}
// Add alls potential blobs with location unknown if v2Output is configured true.
resAllBlobs = getCandidatesForUncompressedDigest(resAllBlobs, nil, primaryDigest, v2Output)
res = append(res, resAllBlobs...)
return prioritize.DestructivelyPrioritizeReplacementCandidates(res, primaryDigest, uncompressedDigest)
}

0 comments on commit 822867a

Please sign in to comment.