Skip to content

Commit 2ea0886

Browse files
feat: Add timeout to KV client
1 parent c259b77 commit 2ea0886

File tree

5 files changed

+37
-20
lines changed

5 files changed

+37
-20
lines changed

cmd/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func createKVClient(ctx context.Context,
3838
}
3939
logger.Debugf("Build Cache host: %s", buildCacheHost)
4040

41-
kvClient, err := kv.NewClient(ctx, kv.NewClientParams{
41+
kvClient, err := kv.NewClient(kv.NewClientParams{
4242
UseInsecure: insecureGRPC,
4343
Host: buildCacheHost,
4444
DialTimeout: 5 * time.Second,

internal/build_cache/kv/client.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package kv
22

33
import (
44
"bytes"
5-
"context"
65
"crypto/tls"
76
"errors"
87
"fmt"
@@ -43,16 +42,13 @@ type NewClientParams struct {
4342
CacheOperationID string
4443
}
4544

46-
func NewClient(ctx context.Context, p NewClientParams) (*Client, error) {
47-
ctx, cancel := context.WithTimeout(ctx, p.DialTimeout)
48-
defer cancel()
45+
func NewClient(p NewClientParams) (*Client, error) {
4946
creds := credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12})
5047
if p.UseInsecure {
5148
creds = insecure.NewCredentials()
5249
}
53-
transportOpt := grpc.WithTransportCredentials(creds)
54-
// nolint: staticcheck
55-
conn, err := grpc.DialContext(ctx, p.Host, transportOpt)
50+
51+
conn, err := grpc.NewClient(p.Host, grpc.WithTransportCredentials(creds))
5652
if err != nil {
5753
return nil, fmt.Errorf("dial %s: %w", p.Host, err)
5854
}

internal/build_cache/kv/download.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os"
99
"path/filepath"
1010
"syscall"
11+
"time"
1112

1213
"google.golang.org/grpc/codes"
1314
"google.golang.org/grpc/status"
@@ -79,7 +80,10 @@ func (c *Client) DownloadFile(ctx context.Context, filePath, key string, fileMod
7980
}
8081

8182
func (c *Client) DownloadStream(ctx context.Context, destination io.Writer, key string) error {
82-
kvReader, err := c.Get(ctx, key)
83+
timeoutCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
84+
defer cancel()
85+
86+
kvReader, err := c.InitiateGet(timeoutCtx, key)
8387
if err != nil {
8488
return fmt.Errorf("create kv get client (with key %s): %w", key, err)
8589
}

internal/build_cache/kv/methods.go

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"io"
88
"strings"
9+
"time"
910

1011
remoteexecution "github.com/bitrise-io/bitrise-build-cache-cli/proto/build/bazel/remote/execution/v2"
1112
"github.com/dustin/go-humanize"
@@ -25,22 +26,25 @@ type FileDigest struct {
2526
}
2627

2728
func (c *Client) GetCapabilities(ctx context.Context) error {
28-
ctx = metadata.NewOutgoingContext(ctx, c.getMethodCallMetadata())
29+
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
30+
defer cancel()
31+
callCtx := metadata.NewOutgoingContext(timeoutCtx, c.getMethodCallMetadata())
2932

30-
_, err := c.capabilitiesClient.GetCapabilities(ctx, &remoteexecution.GetCapabilitiesRequest{})
33+
_, err := c.capabilitiesClient.GetCapabilities(callCtx, &remoteexecution.GetCapabilitiesRequest{})
3134
if err != nil {
3235
return fmt.Errorf("get capabilities: %w", err)
3336
}
3437

3538
return nil
3639
}
3740

38-
func (c *Client) Put(ctx context.Context, params PutParams) (io.WriteCloser, error) {
41+
func (c *Client) InitiatePut(ctx context.Context, params PutParams) (io.WriteCloser, error) {
3942
md := metadata.Join(c.getMethodCallMetadata(), metadata.Pairs(
4043
"x-flare-blob-validation-sha256", params.Sha256Sum,
4144
"x-flare-blob-validation-level", "error",
4245
"x-flare-no-skip-duplicate-writes", "true",
4346
))
47+
// Timeout is the responsibility of the caller
4448
ctx = metadata.NewOutgoingContext(ctx, md)
4549

4650
stream, err := c.bitriseKVClient.Put(ctx)
@@ -58,9 +62,10 @@ func (c *Client) Put(ctx context.Context, params PutParams) (io.WriteCloser, err
5862
}, nil
5963
}
6064

61-
func (c *Client) Get(ctx context.Context, name string) (io.ReadCloser, error) {
65+
func (c *Client) InitiateGet(ctx context.Context, name string) (io.ReadCloser, error) {
6266
resourceName := fmt.Sprintf("kv/%s", name)
6367

68+
// Timeout is the responsibility of the caller
6469
ctx = metadata.NewOutgoingContext(ctx, c.getMethodCallMetadata())
6570

6671
readReq := &bytestream.ReadRequest{
@@ -82,14 +87,16 @@ func (c *Client) Get(ctx context.Context, name string) (io.ReadCloser, error) {
8287
func (c *Client) Delete(ctx context.Context, name string) error {
8388
resourceName := fmt.Sprintf("kv/%s", name)
8489

85-
ctx = metadata.NewOutgoingContext(ctx, c.getMethodCallMetadata())
90+
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
91+
defer cancel()
92+
callCtx := metadata.NewOutgoingContext(timeoutCtx, c.getMethodCallMetadata())
8693

8794
readReq := &bytestream.ReadRequest{
8895
ResourceName: resourceName,
8996
ReadOffset: 0,
9097
ReadLimit: 0,
9198
}
92-
_, err := c.bitriseKVClient.Delete(ctx, readReq)
99+
_, err := c.bitriseKVClient.Delete(callCtx, readReq)
93100
if err != nil {
94101
return fmt.Errorf("initiate delete: %w", err)
95102
}
@@ -98,8 +105,6 @@ func (c *Client) Delete(ctx context.Context, name string) error {
98105
}
99106

100107
func (c *Client) FindMissing(ctx context.Context, digests []*FileDigest) ([]*FileDigest, error) {
101-
ctx = metadata.NewOutgoingContext(ctx, c.getMethodCallMetadata())
102-
103108
var missingBlobs []*FileDigest
104109
blobDigests := convertToBlobDigests(digests)
105110
req := &remoteexecution.FindMissingBlobsRequest{
@@ -119,16 +124,25 @@ func (c *Client) FindMissing(ctx context.Context, digests []*FileDigest) ([]*Fil
119124
}
120125
req.BlobDigests = blobDigests[startIndex:endIndex]
121126

127+
timeoutCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
128+
callCtx := metadata.NewOutgoingContext(timeoutCtx, c.getMethodCallMetadata())
129+
122130
c.logger.Debugf("Calling FindMissingBlobs for chunk: digests[%d:%d]", startIndex, endIndex)
123-
resp, err := c.casClient.FindMissingBlobs(ctx, req)
131+
resp, err := c.casClient.FindMissingBlobs(callCtx, req)
124132

133+
cancel()
125134
if err != nil {
126135
return nil, fmt.Errorf("find missing blobs[%d:%d]: %w", startIndex, endIndex, err)
127136
}
128137
missingBlobs = append(missingBlobs, convertToFileDigests(resp.GetMissingBlobDigests())...)
129138
}
130139
} else {
131-
resp, err := c.casClient.FindMissingBlobs(ctx, req)
140+
timeoutCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
141+
callCtx := metadata.NewOutgoingContext(timeoutCtx, c.getMethodCallMetadata())
142+
143+
resp, err := c.casClient.FindMissingBlobs(callCtx, req)
144+
145+
cancel()
132146

133147
if err != nil {
134148
return nil, fmt.Errorf("find missing blobs: %w", err)

internal/build_cache/kv/upload.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ func (c *Client) uploadFile(ctx context.Context, filePath, key, checksum string)
9393
}
9494

9595
func (c *Client) uploadStream(ctx context.Context, source io.Reader, key, checksum string, size int64) error {
96-
kvWriter, err := c.Put(ctx, PutParams{
96+
timeoutCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
97+
defer cancel()
98+
99+
kvWriter, err := c.InitiatePut(timeoutCtx, PutParams{
97100
Name: key,
98101
Sha256Sum: checksum,
99102
FileSize: size,

0 commit comments

Comments
 (0)