Skip to content

Commit

Permalink
Add context parameter to Cache.Put
Browse files Browse the repository at this point in the history
Propagate context parameter to Put, just like it is already
done for Get and Contains.

This is preparation for future updates of metrics.go.
  • Loading branch information
ulrfa authored and mostynb committed Mar 31, 2022
1 parent 50efa15 commit 4323662
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 55 deletions.
2 changes: 1 addition & 1 deletion cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Proxy interface {
// `rc` is in the same format as used by the disk.Cache instance.
//
// This is allowed to fail silently (for example when under heavy load).
Put(kind EntryKind, hash string, size int64, rc io.ReadCloser)
Put(ctx context.Context, kind EntryKind, hash string, size int64, rc io.ReadCloser)

// Get returns an io.ReadCloser from which the cache item identified by
// `hash` can be read, its logical size, and an error if something went
Expand Down
6 changes: 3 additions & 3 deletions cache/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Cache interface {
Get(ctx context.Context, kind cache.EntryKind, hash string, size int64, offset int64) (io.ReadCloser, int64, error)
GetValidatedActionResult(ctx context.Context, hash string) (*pb.ActionResult, []byte, error)
GetZstd(ctx context.Context, hash string, size int64, offset int64) (io.ReadCloser, int64, error)
Put(kind cache.EntryKind, hash string, size int64, r io.Reader) error
Put(ctx context.Context, kind cache.EntryKind, hash string, size int64, r io.Reader) error
Contains(ctx context.Context, kind cache.EntryKind, hash string, size int64) (bool, int64)
FindMissingCasBlobs(ctx context.Context, blobs []*pb.Digest) ([]*pb.Digest, error)

Expand Down Expand Up @@ -536,7 +536,7 @@ func (c *diskCache) loadExistingFiles() error {
// If `hash` is not the empty string, and the contents don't match it,
// a non-nil error is returned. All data will be read from `r` before
// this function returns.
func (c *diskCache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader) (rErr error) {
func (c *diskCache) Put(ctx context.Context, kind cache.EntryKind, hash string, size int64, r io.Reader) (rErr error) {
defer func() {
if r != nil {
_, _ = io.Copy(ioutil.Discard, r)
Expand Down Expand Up @@ -646,7 +646,7 @@ func (c *diskCache) Put(kind cache.EntryKind, hash string, size int64, r io.Read
log.Println("Failed to proxy Put:", err)
} else {
// Doesn't block, should be fast.
c.proxy.Put(kind, hash, sizeOnDisk, rc)
c.proxy.Put(ctx, kind, hash, sizeOnDisk, rc)
}
}

Expand Down
53 changes: 31 additions & 22 deletions cache/disk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestCacheBasics(t *testing.T) {
}

// Add an item.
err = testCache.Put(cache.CAS, hash, itemSize,
err = testCache.Put(ctx, cache.CAS, hash, itemSize,
ioutil.NopCloser(bytes.NewReader(data)))
if err != nil {
t.Fatal(err)
Expand All @@ -98,6 +98,9 @@ func TestCacheBasics(t *testing.T) {
}

func TestCachePutWrongSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cacheDir := tempDir(t)
defer os.RemoveAll(cacheDir)
testCache, err := New(cacheDir, BlockSize, WithAccessLogger(testutils.NewSilentLogger()))
Expand All @@ -109,21 +112,21 @@ func TestCachePutWrongSize(t *testing.T) {
hash := hashStr(content)

for _, kind := range []cache.EntryKind{cache.AC, cache.CAS, cache.RAW} {
err = testCache.Put(kind, hash, int64(len(content)), strings.NewReader(content))
err = testCache.Put(ctx, kind, hash, int64(len(content)), strings.NewReader(content))
if err != nil {
t.Fatal("Expected success", err)
}

err = testCache.Put(kind, hash, int64(len(content))+1, strings.NewReader(content))
err = testCache.Put(ctx, kind, hash, int64(len(content))+1, strings.NewReader(content))
if err == nil {
t.Error("Expected error due to size being different")
}

err = testCache.Put(kind, hash, int64(len(content))-1, strings.NewReader(content))
err = testCache.Put(ctx, kind, hash, int64(len(content))-1, strings.NewReader(content))
if err == nil {
t.Error("Expected error due to size being different")
}
err = testCache.Put(kind, hashStr(content[:len(content)-1]), int64(len(content))-1, strings.NewReader(content))
err = testCache.Put(ctx, kind, hashStr(content[:len(content)-1]), int64(len(content))-1, strings.NewReader(content))
if err == nil {
t.Error("Expected error due to size being different")
}
Expand All @@ -144,7 +147,7 @@ func TestCacheGetContainsWrongSize(t *testing.T) {
var found bool
var rdr io.ReadCloser

err = testCache.Put(cache.CAS, contentsHash, contentsLength, strings.NewReader(contents))
err = testCache.Put(ctx, cache.CAS, contentsHash, contentsLength, strings.NewReader(contents))
if err != nil {
t.Fatal("Expected success", err)
}
Expand Down Expand Up @@ -227,7 +230,7 @@ func TestCacheGetContainsWrongSizeWithProxy(t *testing.T) {
// digest {contentsHash, contentsLength}.
type proxyStub struct{}

func (d proxyStub) Put(kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
func (d proxyStub) Put(ctx context.Context, kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
// Not implemented.
}

Expand Down Expand Up @@ -294,7 +297,7 @@ func putGetCompareBytes(ctx context.Context, kind cache.EntryKind, hash string,

r := bytes.NewReader(data)

err := testCache.Put(kind, hash, int64(len(data)), r)
err := testCache.Put(ctx, kind, hash, int64(len(data)), r)
if err != nil {
return err
}
Expand Down Expand Up @@ -448,7 +451,7 @@ func TestCacheExistingFiles(t *testing.T) {
continue
}

err = testCache.Put(cache.CAS, hash, int64(len(data)),
err = testCache.Put(ctx, cache.CAS, hash, int64(len(data)),
bytes.NewReader(data))
if err != nil {
t.Fatal("failed to Put CAS blob", hash, err)
Expand Down Expand Up @@ -476,6 +479,9 @@ func TestCacheExistingFiles(t *testing.T) {
// Make sure that the cache returns http.StatusInsufficientStorage when trying to upload an item
// that's bigger than the maximum size.
func TestCacheBlobTooLarge(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cacheDir := tempDir(t)
defer os.RemoveAll(cacheDir)
testCacheI, err := New(cacheDir, BlockSize, WithAccessLogger(testutils.NewSilentLogger()))
Expand All @@ -486,7 +492,7 @@ func TestCacheBlobTooLarge(t *testing.T) {

for k := range []cache.EntryKind{cache.AC, cache.RAW} {
kind := cache.EntryKind(k)
err := testCache.Put(kind, hashStr("foo"), 10000, strings.NewReader(contents))
err := testCache.Put(ctx, kind, hashStr("foo"), 10000, strings.NewReader(contents))
if err == nil {
t.Fatal("Expected an error")
}
Expand All @@ -503,6 +509,9 @@ func TestCacheBlobTooLarge(t *testing.T) {

// Make sure that Cache rejects an upload whose hashsum doesn't match
func TestCacheCorruptedCASBlob(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cacheDir := tempDir(t)
defer os.RemoveAll(cacheDir)
testCacheI, err := New(cacheDir, BlockSize, WithAccessLogger(testutils.NewSilentLogger()))
Expand All @@ -511,14 +520,14 @@ func TestCacheCorruptedCASBlob(t *testing.T) {
}
testCache := testCacheI.(*diskCache)

err = testCache.Put(cache.CAS, hashStr("foo"), int64(len(contents)),
err = testCache.Put(ctx, cache.CAS, hashStr("foo"), int64(len(contents)),
strings.NewReader(contents))
if err == nil {
t.Fatal("expected hash mismatch error")
}

// We expect the upload to succeed without validation:
err = testCache.Put(cache.RAW, hashStr("foo"), int64(len(contents)),
err = testCache.Put(ctx, cache.RAW, hashStr("foo"), int64(len(contents)),
strings.NewReader(contents))
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -874,7 +883,7 @@ func TestHttpProxyBackend(t *testing.T) {
t.Fatal("Expected empty backend")
}

err = testCache.Put(cache.CAS, casHash, int64(len(blob)),
err = testCache.Put(ctx, cache.CAS, casHash, int64(len(blob)),
bytes.NewReader(blob))
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -988,7 +997,7 @@ func TestGetValidatedActionResult(t *testing.T) {
grokHash := sha256.Sum256(grokData)
grokHashStr := hex.EncodeToString(grokHash[:])

err = testCache.Put(cache.CAS, grokHashStr, int64(len(grokData)),
err = testCache.Put(ctx, cache.CAS, grokHashStr, int64(len(grokData)),
bytes.NewReader(grokData))
if err != nil {
t.Fatal(err)
Expand All @@ -998,7 +1007,7 @@ func TestGetValidatedActionResult(t *testing.T) {
fooHash := sha256.Sum256(fooData)
fooHashStr := hex.EncodeToString(fooHash[:])

err = testCache.Put(cache.CAS, fooHashStr, int64(len(fooData)),
err = testCache.Put(ctx, cache.CAS, fooHashStr, int64(len(fooData)),
bytes.NewReader(fooData))
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1030,7 +1039,7 @@ func TestGetValidatedActionResult(t *testing.T) {
barDataHash := sha256.Sum256(barData)
barDataHashStr := hex.EncodeToString(barDataHash[:])

err = testCache.Put(cache.CAS, barDataHashStr, int64(len(barData)),
err = testCache.Put(ctx, cache.CAS, barDataHashStr, int64(len(barData)),
bytes.NewReader(barData))
if err != nil {
t.Fatal(err)
Expand All @@ -1055,7 +1064,7 @@ func TestGetValidatedActionResult(t *testing.T) {
rootDataHash := sha256.Sum256(rootData)
rootDataHashStr := hex.EncodeToString(rootDataHash[:])

err = testCache.Put(cache.CAS, rootDataHashStr, int64(len(rootData)),
err = testCache.Put(ctx, cache.CAS, rootDataHashStr, int64(len(rootData)),
bytes.NewReader(rootData))
if err != nil {
t.Fatal(err)
Expand All @@ -1072,7 +1081,7 @@ func TestGetValidatedActionResult(t *testing.T) {
treeDataHash := sha256.Sum256(treeData)
treeDataHashStr := hex.EncodeToString(treeDataHash[:])

err = testCache.Put(cache.CAS, treeDataHashStr, int64(len(treeData)),
err = testCache.Put(ctx, cache.CAS, treeDataHashStr, int64(len(treeData)),
bytes.NewReader(treeData))
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1114,7 +1123,7 @@ func TestGetValidatedActionResult(t *testing.T) {
arDataHash := sha256.Sum256([]byte("pretend action"))
arDataHashStr := hex.EncodeToString(arDataHash[:])

err = testCache.Put(cache.AC, arDataHashStr, int64(len(arData)),
err = testCache.Put(ctx, cache.AC, arDataHashStr, int64(len(arData)),
bytes.NewReader(arData))
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1158,7 +1167,7 @@ func TestGetWithOffset(t *testing.T) {

data, hash := testutils.RandomDataAndHash(blobSize)

err = testCache.Put(cache.CAS, hash, blobSize,
err = testCache.Put(ctx, cache.CAS, hash, blobSize,
ioutil.NopCloser(bytes.NewReader(data)))
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1240,7 +1249,7 @@ func TestMetricsUnvalidatedAC(t *testing.T) {
}
fakeActionHash := "8f279f9d8bc605b4d733d0ba9386de2376004ab628fee6b000144fdc7b30a6a1"

err = testCache.Put(cache.AC, fakeActionHash, int64(len(arData)), bytes.NewReader(arData))
err = testCache.Put(context.Background(), cache.AC, fakeActionHash, int64(len(arData)), bytes.NewReader(arData))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1350,7 +1359,7 @@ func TestMetricsValidatedAC(t *testing.T) {
}
fakeActionHash := "8f279f9d8bc605b4d733d0ba9386de2376004ab628fee6b000144fdc7b30a6a1"

err = testCache.Put(cache.AC, fakeActionHash, int64(len(arData)), bytes.NewReader(arData))
err = testCache.Put(context.Background(), cache.AC, fakeActionHash, int64(len(arData)), bytes.NewReader(arData))
if err != nil {
t.Fatal(err)
}
Expand Down
32 changes: 16 additions & 16 deletions cache/disk/findmissing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type testCWProxy struct {
blob string
}

func (p *testCWProxy) Put(kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
func (p *testCWProxy) Put(ctx context.Context, kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
}
func (p *testCWProxy) Get(ctx context.Context, kind cache.EntryKind, hash string) (io.ReadCloser, int64, error) {
return nil, -1, nil
Expand Down Expand Up @@ -168,8 +168,8 @@ func NewProxyAdapter(cache Cache) (*proxyAdapter, error) {
}, nil
}

func (p *proxyAdapter) Put(kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
err := p.cache.Put(kind, hash, size, rc)
func (p *proxyAdapter) Put(ctx context.Context, kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
err := p.cache.Put(ctx, kind, hash, size, rc)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -213,8 +213,8 @@ func TestFindMissingCasBlobsWithProxy(t *testing.T) {
data3, digest3 := testutils.RandomDataAndDigest(300)
_, digest4 := testutils.RandomDataAndDigest(400)

proxy.Put(cache.CAS, digest1.Hash, digest1.SizeBytes, ioutil.NopCloser(bytes.NewReader(data1)))
proxy.Put(cache.CAS, digest3.Hash, digest3.SizeBytes, ioutil.NopCloser(bytes.NewReader(data3)))
proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, ioutil.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest3.Hash, digest3.SizeBytes, ioutil.NopCloser(bytes.NewReader(data3)))

missing, err := testCache.FindMissingCasBlobs(ctx, []*pb.Digest{
&digest1,
Expand Down Expand Up @@ -279,8 +279,8 @@ func TestFindMissingCasBlobsWithProxyFailFast(t *testing.T) {
data3, digest3 := testutils.RandomDataAndDigest(300)
_, digest4 := testutils.RandomDataAndDigest(400)

proxy.Put(cache.CAS, digest1.Hash, digest1.SizeBytes, ioutil.NopCloser(bytes.NewReader(data1)))
proxy.Put(cache.CAS, digest3.Hash, digest3.SizeBytes, ioutil.NopCloser(bytes.NewReader(data3)))
proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, ioutil.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest3.Hash, digest3.SizeBytes, ioutil.NopCloser(bytes.NewReader(data3)))

blobs := []*pb.Digest{
&digest1,
Expand Down Expand Up @@ -338,10 +338,10 @@ func TestFindMissingCasBlobsWithProxyFailFastNoneMissing(t *testing.T) {
data3, digest3 := testutils.RandomDataAndDigest(300)
data4, digest4 := testutils.RandomDataAndDigest(400)

proxy.Put(cache.CAS, digest1.Hash, digest1.SizeBytes, ioutil.NopCloser(bytes.NewReader(data1)))
proxy.Put(cache.CAS, digest2.Hash, digest2.SizeBytes, ioutil.NopCloser(bytes.NewReader(data2)))
proxy.Put(cache.CAS, digest3.Hash, digest3.SizeBytes, ioutil.NopCloser(bytes.NewReader(data3)))
proxy.Put(cache.CAS, digest4.Hash, digest4.SizeBytes, ioutil.NopCloser(bytes.NewReader(data4)))
proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, ioutil.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest2.Hash, digest2.SizeBytes, ioutil.NopCloser(bytes.NewReader(data2)))
proxy.Put(ctx, cache.CAS, digest3.Hash, digest3.SizeBytes, ioutil.NopCloser(bytes.NewReader(data3)))
proxy.Put(ctx, cache.CAS, digest4.Hash, digest4.SizeBytes, ioutil.NopCloser(bytes.NewReader(data4)))

blobs := []*pb.Digest{
&digest1,
Expand Down Expand Up @@ -412,9 +412,9 @@ func TestFindMissingCasBlobsWithProxyFailFastMaxProxyBlobSize(t *testing.T) {
data3, digest3 := testutils.RandomDataAndDigest(300) // We expect this blob to not be found.

// Put blobs directly into proxy backend, where it will not be filtered out.
proxy.Put(cache.CAS, digest1.Hash, digest1.SizeBytes, ioutil.NopCloser(bytes.NewReader(data1)))
proxy.Put(cache.CAS, digest2.Hash, digest2.SizeBytes, ioutil.NopCloser(bytes.NewReader(data2)))
proxy.Put(cache.CAS, digest3.Hash, digest3.SizeBytes, ioutil.NopCloser(bytes.NewReader(data3)))
proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, ioutil.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest2.Hash, digest2.SizeBytes, ioutil.NopCloser(bytes.NewReader(data2)))
proxy.Put(ctx, cache.CAS, digest3.Hash, digest3.SizeBytes, ioutil.NopCloser(bytes.NewReader(data3)))

blobs := []*pb.Digest{
&digest1,
Expand Down Expand Up @@ -468,8 +468,8 @@ func TestFindMissingCasBlobsWithProxyMaxProxyBlobSize(t *testing.T) {
data1, digest1 := testutils.RandomDataAndDigest(100)
data2, digest2 := testutils.RandomDataAndDigest(600)

proxy.Put(cache.CAS, digest1.Hash, digest1.SizeBytes, ioutil.NopCloser(bytes.NewReader(data1)))
proxy.Put(cache.CAS, digest2.Hash, digest2.SizeBytes, ioutil.NopCloser(bytes.NewReader(data2)))
proxy.Put(ctx, cache.CAS, digest1.Hash, digest1.SizeBytes, ioutil.NopCloser(bytes.NewReader(data1)))
proxy.Put(ctx, cache.CAS, digest2.Hash, digest2.SizeBytes, ioutil.NopCloser(bytes.NewReader(data2)))

missing, err := testCache.FindMissingCasBlobs(ctx, []*pb.Digest{
&digest1,
Expand Down
2 changes: 1 addition & 1 deletion cache/httpproxy/httpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func logResponse(logger cache.Logger, method string, code int, url string) {
logger.Printf("HTTP %s %d %s", method, code, url)
}

func (r *remoteHTTPProxyCache) Put(kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
func (r *remoteHTTPProxyCache) Put(ctx context.Context, kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
if r.uploadQueue == nil {
rc.Close()
return
Expand Down
4 changes: 2 additions & 2 deletions cache/httpproxy/httpproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestEverything(t *testing.T) {

// PUT two different values with the same key in ac and cas.

err = diskCache.Put(cache.AC, hash, int64(len(acData)), bytes.NewReader(acData))
err = diskCache.Put(ctx, cache.AC, hash, int64(len(acData)), bytes.NewReader(acData))
if err != nil {
t.Error(err)
}
Expand All @@ -146,7 +146,7 @@ func TestEverything(t *testing.T) {
}
s.mu.Unlock()

err = diskCache.Put(cache.CAS, hash, int64(len(casData)), bytes.NewReader(casData))
err = diskCache.Put(ctx, cache.CAS, hash, int64(len(casData)), bytes.NewReader(casData))
if err != nil {
t.Error(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cache/s3proxy/s3proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (c *s3Cache) uploadFile(item uploadReq) {
item.rc.Close()
}

func (c *s3Cache) Put(kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
func (c *s3Cache) Put(ctx context.Context, kind cache.EntryKind, hash string, size int64, rc io.ReadCloser) {
if c.uploadQueue == nil {
rc.Close()
return
Expand Down
Loading

0 comments on commit 4323662

Please sign in to comment.