Skip to content

Commit

Permalink
Read from redis only if blob size is below limit. (#303)
Browse files Browse the repository at this point in the history
Previously we read from Redis even for blobs whose size is above maxSize, which should therefore never be found there. Skipping those lookups will hopefully limit the number of calls and help with the load on Redis.
  • Loading branch information
fische authored Apr 22, 2024
1 parent 339b2d5 commit 2a4bb1f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 29 deletions.
5 changes: 5 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
Version 11.9.9
--------------
* Read from redis only if blob size is below limit (and could be found in
Redis).

Version 11.9.8
--------------
* Allow specifying redis timeout through flags/environment.
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.9.8
11.9.9
72 changes: 44 additions & 28 deletions mettle/worker/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,19 @@ func (r *elanRedisWrapper) Healthcheck() error {
}

func (r *elanRedisWrapper) ReadBlob(dg *pb.Digest) ([]byte, error) {
if r.limiter.Tokens() >= 1.0 {
cmd := r.readRedis.Get(context.Background(), dg.Hash)
if err := cmd.Err(); err == nil {
blob, _ := cmd.Bytes()
return blob, nil
} else if err != redis.Nil {
log.Warningf("Failed to read blob from Redis: %s", err)
r.limiter.Reserve()
if dg.SizeBytes < r.maxSize {
if r.limiter.Tokens() >= 1.0 {
cmd := r.readRedis.Get(context.Background(), dg.Hash)
if err := cmd.Err(); err == nil {
blob, _ := cmd.Bytes()
return blob, nil
} else if err != redis.Nil {
log.Warningf("Failed to read blob from Redis: %s", err)
r.limiter.Reserve()
}
} else {
log.Warningf("limiter error rate exceeded, skipping Redis lookup")
}
} else {
log.Warningf("limiter error rate exceeded, skipping Redis lookup")
}
// If we get here, the blob didn't exist. Download it then write back to Redis.
blob, err := r.elan.ReadBlob(dg)
Expand Down Expand Up @@ -107,30 +109,36 @@ func (r *elanRedisWrapper) UploadIfMissing(entries []*uploadinfo.Entry, compress
// This is approximate only and assumes that Redis always has a strict subset of the total keys.
missing := make([]*uploadinfo.Entry, 0, len(entries))
uploads := make([]interface{}, 0, 2*len(entries))
keys := make([]string, len(entries))
keys := make([]string, 0, len(entries))
entriesByHash := make(map[string]*uploadinfo.Entry, len(entries))
// Unfortunately there is no MEXISTS, so we reuse MGET but chuck away the values.
// We could also do lots of EXISTS in parallel but this seems simpler...
for i, entry := range entries {
keys[i] = entry.Digest.Hash
for _, entry := range entries {
if entry.Digest.Size < r.maxSize {
keys = append(keys, entry.Digest.Hash)
} else {
missing = append(missing, entry)
}
entriesByHash[entry.Digest.Hash] = entry
}
blobs := r.readBlobs(keys, false)
if blobs == nil {
return r.elan.UploadIfMissing(entries, compressors)
}
for i, blob := range blobs {
if blob == nil {
e := entries[i]
e := entriesByHash[keys[i]]
missing = append(missing, e)
if dg := e.Digest; dg.Size < r.maxSize && dg.Hash != emptyHash {
if e.Contents != nil {
uploads = append(uploads, keys[i], e.Contents)
uploads = append(uploads, e.Digest.Hash, e.Contents)
} else {
b, err := os.ReadFile(e.Path)
if err != nil {
log.Warning("Failed to read file %s: %s", e.Path, err)
continue
}
uploads = append(uploads, keys[i], b)
uploads = append(uploads, e.Digest.Hash, b)
}
}
}
Expand All @@ -154,15 +162,19 @@ func (r *elanRedisWrapper) UploadIfMissing(entries []*uploadinfo.Entry, compress

func (r *elanRedisWrapper) BatchDownload(dgs []digest.Digest) (map[digest.Digest][]byte, error) {
log.Debug("Checking Redis for batch of %d files...", len(dgs))
keys := make([]string, len(dgs))
for i, dg := range dgs {
keys[i] = dg.Hash
keys := make([]string, 0, len(dgs))
missingDigests := make([]digest.Digest, 0, len(dgs))
for _, dg := range dgs {
if dg.Size < r.maxSize {
keys = append(keys, dg.Hash)
} else {
missingDigests = append(missingDigests, dg)
}
}
blobs := r.readBlobs(keys, true)
if blobs == nil {
return r.elan.BatchDownload(dgs)
}
missingDigests := make([]digest.Digest, 0, len(dgs))
ret := make(map[digest.Digest][]byte, len(dgs))
for i, blob := range blobs {
if blob == nil {
Expand Down Expand Up @@ -245,16 +257,20 @@ func (r *elanRedisWrapper) writeBlobs(uploads []interface{}) {
}

func (r *elanRedisWrapper) ReadToFile(dg digest.Digest, filename string, compressed bool) error {
blob, err := r.readRedis.Get(context.Background(), dg.Hash).Bytes()
if err != nil {
if err != redis.Nil { // Not found.
log.Warning("Error reading blob from Redis: %s", err)
if dg.Size < r.maxSize {
blob, err := r.readRedis.Get(context.Background(), dg.Hash).Bytes()
if err != nil {
if err != redis.Nil { // Not found.
log.Warning("Error reading blob from Redis: %s", err)
} else {
redisMisses.Inc()
}
} else {
redisHits.Inc()
return os.WriteFile(filename, blob, 0644)
}
redisMisses.Inc()
return r.elan.ReadToFile(dg, filename, compressed)
}
redisHits.Inc()
return os.WriteFile(filename, blob, 0644)
return r.elan.ReadToFile(dg, filename, compressed)
}

func (r *elanRedisWrapper) GetDirectoryTree(dg *pb.Digest, usePacks bool) ([]*pb.Directory, error) {
Expand Down

0 comments on commit 2a4bb1f

Please sign in to comment.