diff --git a/docker/docker_image_dest.go b/docker/docker_image_dest.go index 44b45c472c..17b459cca7 100644 --- a/docker/docker_image_dest.go +++ b/docker/docker_image_dest.go @@ -332,21 +332,31 @@ func (d *dockerImageDestination) TryReusingBlobWithOptions(ctx context.Context, // Then try reusing blobs from other locations. candidates := options.Cache.CandidateLocations2(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, options.CanSubstitute) for _, candidate := range candidates { - candidateRepo, err := parseBICLocationReference(candidate.Location) - if err != nil { - logrus.Debugf("Error parsing BlobInfoCache location reference: %s", err) - continue + var candidateRepo reference.Named + if !candidate.UnknownLocation { + candidateRepo, err = parseBICLocationReference(candidate.Location) + if err != nil { + logrus.Debugf("Error parsing BlobInfoCache location reference: %s", err) + continue + } } if candidate.CompressorName != blobinfocache.Uncompressed { - logrus.Debugf("Trying to reuse cached location %s compressed with %s in %s", candidate.Digest.String(), candidate.CompressorName, candidateRepo.Name()) + logrus.Debugf("Trying to reuse cached location %s compressed with %s", candidate.Digest.String(), candidate.CompressorName) } else { - logrus.Debugf("Trying to reuse cached location %s with no compression in %s", candidate.Digest.String(), candidateRepo.Name()) + logrus.Debugf("Trying to reuse cached location %s with no compression", candidate.Digest.String()) } // Sanity checks: - if reference.Domain(candidateRepo) != reference.Domain(d.ref.ref) { - logrus.Debugf("... Internal error: domain %s does not match destination %s", reference.Domain(candidateRepo), reference.Domain(d.ref.ref)) - continue + if candidate.UnknownLocation || reference.Domain(candidateRepo) != reference.Domain(d.ref.ref) { + if !candidate.UnknownLocation { + logrus.Debugf("... Internal error: domain %s does not match destination %s", reference.Domain(candidateRepo), reference.Domain(d.ref.ref)) + continue + } + // Found following blob in different registry but + // we need to check blob presence against the registry + // where we are planning to push, hence switch back the + // candidate repo to the one where we are planning to push. + candidateRepo = d.ref.ref } if candidateRepo.Name() == d.ref.ref.Name() && candidate.Digest == info.Digest { logrus.Debug("... Already tried the primary destination") diff --git a/internal/blobinfocache/types.go b/internal/blobinfocache/types.go index 3c2be57f32..cd3fce4418 100644 --- a/internal/blobinfocache/types.go +++ b/internal/blobinfocache/types.go @@ -39,7 +39,8 @@ type BlobInfoCache2 interface { // BICReplacementCandidate2 is an item returned by BlobInfoCache2.CandidateLocations2. type BICReplacementCandidate2 struct { - Digest digest.Digest - CompressorName string // either the Name() of a known pkg/compression.Algorithm, or Uncompressed or UnknownCompression - Location types.BICLocationReference + Digest digest.Digest + CompressorName string // either the Name() of a known pkg/compression.Algorithm, or Uncompressed or UnknownCompression + Location types.BICLocationReference + UnknownLocation bool // is true when `Location` for this blob is not set } diff --git a/pkg/blobinfocache/boltdb/boltdb.go b/pkg/blobinfocache/boltdb/boltdb.go index a472efd95b..12cefcd863 100644 --- a/pkg/blobinfocache/boltdb/boltdb.go +++ b/pkg/blobinfocache/boltdb/boltdb.go @@ -285,9 +285,9 @@ func (bdc *cache) RecordKnownLocation(transport types.ImageTransport, scope type // appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in scopeBucket with corresponding compression info from compressionBucket (if compressionBucket is not nil), and returns the result of appending them to candidates. func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, scopeBucket, compressionBucket *bolt.Bucket, digest digest.Digest, requireCompressionInfo bool) []prioritize.CandidateWithTime { digestKey := []byte(digest.String()) - b := scopeBucket.Bucket(digestKey) - if b == nil { - return candidates + var b *bolt.Bucket + if scopeBucket != nil { + b = scopeBucket.Bucket(digestKey) } compressorName := blobinfocache.UnknownCompression if compressionBucket != nil { @@ -300,21 +300,33 @@ func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateW if compressorName == blobinfocache.UnknownCompression && requireCompressionInfo { return candidates } - _ = b.ForEach(func(k, v []byte) error { - t := time.Time{} - if err := t.UnmarshalBinary(v); err != nil { - return err - } + if b != nil { + _ = b.ForEach(func(k, v []byte) error { + t := time.Time{} + if err := t.UnmarshalBinary(v); err != nil { + return err + } + candidates = append(candidates, prioritize.CandidateWithTime{ + Candidate: blobinfocache.BICReplacementCandidate2{ + Digest: digest, + CompressorName: compressorName, + Location: types.BICLocationReference{Opaque: string(k)}, + }, + LastSeen: t, + }) + return nil + }) // FIXME? Log error (but throttle the log volume on repeated accesses)? + } else { candidates = append(candidates, prioritize.CandidateWithTime{ Candidate: blobinfocache.BICReplacementCandidate2{ - Digest: digest, - CompressorName: compressorName, - Location: types.BICLocationReference{Opaque: string(k)}, + Digest: digest, + CompressorName: compressorName, + Location: types.BICLocationReference{Opaque: ""}, + UnknownLocation: true, }, - LastSeen: t, + LastSeen: time.Time{}, }) - return nil - }) // FIXME? Log error (but throttle the log volume on repeated accesses)? + } return candidates } @@ -325,7 +337,49 @@ func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateW // data from previous RecordDigestUncompressedPair calls is used to also look up variants of the blob which have the same // uncompressed digest. func (bdc *cache) CandidateLocations2(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute bool) []blobinfocache.BICReplacementCandidate2 { - return bdc.candidateLocations(transport, scope, primaryDigest, canSubstitute, true) + resAllBlobs := []prioritize.CandidateWithTime{} + var uncompressedDigestValue digest.Digest // = "" + requireCompressionInfo := true + resultPrioritized := bdc.candidateLocations(transport, scope, primaryDigest, canSubstitute, requireCompressionInfo) + // Reprocess all blobs and append them to resultPrioritized in lower priority + if err := bdc.view(func(tx *bolt.Tx) error { + // compressionBucket won't have been created if previous writers never recorded info about compression, + // and we don't want to fail just because of that + compressionBucket := tx.Bucket(digestCompressorBucket) + if uncompressedDigestValue = bdc.uncompressedDigest(tx, primaryDigest); uncompressedDigestValue != "" { + b := tx.Bucket(digestByUncompressedBucket) + if b != nil { + b = b.Bucket([]byte(uncompressedDigestValue.String())) + if b != nil { + if err := b.ForEach(func(k, _ []byte) error { + d, err := digest.Parse(string(k)) + if err != nil { + return err + } + if d != primaryDigest && d != uncompressedDigestValue { + resAllBlobs = append(resAllBlobs, bdc.appendReplacementCandidates(resAllBlobs, nil, compressionBucket, d, requireCompressionInfo)...) + } + return nil + }); err != nil { + return err + } + } + } + if uncompressedDigestValue != primaryDigest { + resAllBlobs = append(resAllBlobs, bdc.appendReplacementCandidates(resAllBlobs, nil, compressionBucket, uncompressedDigestValue, requireCompressionInfo)...) + } + } + return nil + }); err != nil { + return []blobinfocache.BICReplacementCandidate2{} + } + + resultPrioritizedAllBlobs := prioritize.DestructivelyPrioritizeReplacementCandidates(resAllBlobs, primaryDigest, uncompressedDigestValue) + // Append `resultPrioritizedAllBlobs` after blobs + // which are generated from bucket with provided `scope` + // as a result `resultPrioritizedAllBlobs` will always + // get lower priority while processing candidates. + return append(resultPrioritized, resultPrioritizedAllBlobs...) } func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, requireCompressionInfo bool) []blobinfocache.BICReplacementCandidate2 { diff --git a/pkg/blobinfocache/memory/memory.go b/pkg/blobinfocache/memory/memory.go index 426640366f..f2f1a44924 100644 --- a/pkg/blobinfocache/memory/memory.go +++ b/pkg/blobinfocache/memory/memory.go @@ -121,23 +121,34 @@ func (mem *cache) RecordDigestCompressorName(blobDigest digest.Digest, compresso } // appendReplacementCandidates creates prioritize.CandidateWithTime values for (transport, scope, digest), and returns the result of appending them to candidates. -func (mem *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, requireCompressionInfo bool) []prioritize.CandidateWithTime { - locations := mem.knownLocations[locationKey{transport: transport.Name(), scope: scope, blobDigest: digest}] // nil if not present - for l, t := range locations { - compressorName, compressorKnown := mem.compressors[digest] - if !compressorKnown { - if requireCompressionInfo { - continue - } +func (mem *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, transport types.ImageTransport, scope *types.BICTransportScope, digest digest.Digest, requireCompressionInfo bool) []prioritize.CandidateWithTime { + compressorName, compressorKnown := mem.compressors[digest] + if !compressorKnown { + if !requireCompressionInfo { compressorName = blobinfocache.UnknownCompression } + } + if scope != nil { + locations := mem.knownLocations[locationKey{transport: transport.Name(), scope: *scope, blobDigest: digest}] // nil if not present + for l, t := range locations { + candidates = append(candidates, prioritize.CandidateWithTime{ + Candidate: blobinfocache.BICReplacementCandidate2{ + Digest: digest, + CompressorName: compressorName, + Location: l, + }, + LastSeen: t, + }) + } + } else { candidates = append(candidates, prioritize.CandidateWithTime{ Candidate: blobinfocache.BICReplacementCandidate2{ - Digest: digest, - CompressorName: compressorName, - Location: l, + Digest: digest, + CompressorName: compressorName, + Location: types.BICLocationReference{Opaque: ""}, + UnknownLocation: true, }, - LastSeen: t, + LastSeen: time.Time{}, }) } return candidates @@ -160,25 +171,45 @@ func (mem *cache) CandidateLocations(transport types.ImageTransport, scope types // data from previous RecordDigestUncompressedPair calls is used to also look up variants of the blob which have the same // uncompressed digest. func (mem *cache) CandidateLocations2(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute bool) []blobinfocache.BICReplacementCandidate2 { - return mem.candidateLocations(transport, scope, primaryDigest, canSubstitute, true) + resAllBlobs := []prioritize.CandidateWithTime{} + requireCompressionInfo := true + resultPrioritized := mem.candidateLocations(transport, scope, primaryDigest, canSubstitute, requireCompressionInfo) + var uncompressedDigest digest.Digest // = "" + if uncompressedDigest = mem.uncompressedDigestLocked(primaryDigest); uncompressedDigest != "" { + otherDigests := mem.digestsByUncompressed[uncompressedDigest] // nil if not present in the map + for d := range otherDigests { + if d != primaryDigest && d != uncompressedDigest { + resAllBlobs = append(resAllBlobs, mem.appendReplacementCandidates(resAllBlobs, transport, nil, d, requireCompressionInfo)...) + } + } + if uncompressedDigest != primaryDigest { + resAllBlobs = append(resAllBlobs, mem.appendReplacementCandidates(resAllBlobs, transport, nil, uncompressedDigest, requireCompressionInfo)...) + } + } + resultPrioritizedAllBlobs := prioritize.DestructivelyPrioritizeReplacementCandidates(resAllBlobs, primaryDigest, uncompressedDigest) + // Append `resultPrioritizedAllBlobs` after blobs + // which are generated from bucket with provided `scope` + // as a result `resultPrioritizedAllBlobs` will always + // get lower priority while processing candidates. + return append(resultPrioritized, resultPrioritizedAllBlobs...) } func (mem *cache) candidateLocations(transport types.ImageTransport, scope types.BICTransportScope, primaryDigest digest.Digest, canSubstitute, requireCompressionInfo bool) []blobinfocache.BICReplacementCandidate2 { mem.mutex.Lock() defer mem.mutex.Unlock() res := []prioritize.CandidateWithTime{} - res = mem.appendReplacementCandidates(res, transport, scope, primaryDigest, requireCompressionInfo) + res = mem.appendReplacementCandidates(res, transport, &scope, primaryDigest, requireCompressionInfo) var uncompressedDigest digest.Digest // = "" if canSubstitute { if uncompressedDigest = mem.uncompressedDigestLocked(primaryDigest); uncompressedDigest != "" { otherDigests := mem.digestsByUncompressed[uncompressedDigest] // nil if not present in the map for d := range otherDigests { if d != primaryDigest && d != uncompressedDigest { - res = mem.appendReplacementCandidates(res, transport, scope, d, requireCompressionInfo) + res = mem.appendReplacementCandidates(res, transport, &scope, d, requireCompressionInfo) } } if uncompressedDigest != primaryDigest { - res = mem.appendReplacementCandidates(res, transport, scope, uncompressedDigest, requireCompressionInfo) + res = mem.appendReplacementCandidates(res, transport, &scope, uncompressedDigest, requireCompressionInfo) } } }