Skip to content

Commit

Permalink
Merge pull request #74 from bitrise-io/timeout
Browse files Browse the repository at this point in the history
feat: Add timeout to KV client
  • Loading branch information
zsolt-marta-bitrise authored Dec 12, 2024
2 parents fe6371b + ba3a397 commit 8102e5c
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 20 deletions.
2 changes: 1 addition & 1 deletion cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func createKVClient(ctx context.Context,
}
logger.Debugf("Build Cache host: %s", buildCacheHost)

kvClient, err := kv.NewClient(ctx, kv.NewClientParams{
kvClient, err := kv.NewClient(kv.NewClientParams{
UseInsecure: insecureGRPC,
Host: buildCacheHost,
DialTimeout: 5 * time.Second,
Expand Down
10 changes: 3 additions & 7 deletions internal/build_cache/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package kv

import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
Expand Down Expand Up @@ -43,16 +42,13 @@ type NewClientParams struct {
CacheOperationID string
}

func NewClient(ctx context.Context, p NewClientParams) (*Client, error) {
ctx, cancel := context.WithTimeout(ctx, p.DialTimeout)
defer cancel()
func NewClient(p NewClientParams) (*Client, error) {
creds := credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12})
if p.UseInsecure {
creds = insecure.NewCredentials()
}
transportOpt := grpc.WithTransportCredentials(creds)
// nolint: staticcheck
conn, err := grpc.DialContext(ctx, p.Host, transportOpt)

conn, err := grpc.NewClient(p.Host, grpc.WithTransportCredentials(creds))
if err != nil {
return nil, fmt.Errorf("dial %s: %w", p.Host, err)
}
Expand Down
6 changes: 5 additions & 1 deletion internal/build_cache/kv/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"syscall"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -79,7 +80,10 @@ func (c *Client) DownloadFile(ctx context.Context, filePath, key string, fileMod
}

func (c *Client) DownloadStream(ctx context.Context, destination io.Writer, key string) error {
kvReader, err := c.Get(ctx, key)
timeoutCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

kvReader, err := c.InitiateGet(timeoutCtx, key)
if err != nil {
return fmt.Errorf("create kv get client (with key %s): %w", key, err)
}
Expand Down
34 changes: 24 additions & 10 deletions internal/build_cache/kv/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"strings"
"time"

remoteexecution "github.com/bitrise-io/bitrise-build-cache-cli/proto/build/bazel/remote/execution/v2"
"github.com/dustin/go-humanize"
Expand All @@ -25,22 +26,25 @@ type FileDigest struct {
}

func (c *Client) GetCapabilities(ctx context.Context) error {
ctx = metadata.NewOutgoingContext(ctx, c.getMethodCallMetadata())
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
callCtx := metadata.NewOutgoingContext(timeoutCtx, c.getMethodCallMetadata())

_, err := c.capabilitiesClient.GetCapabilities(ctx, &remoteexecution.GetCapabilitiesRequest{})
_, err := c.capabilitiesClient.GetCapabilities(callCtx, &remoteexecution.GetCapabilitiesRequest{})
if err != nil {
return fmt.Errorf("get capabilities: %w", err)
}

return nil
}

func (c *Client) Put(ctx context.Context, params PutParams) (io.WriteCloser, error) {
func (c *Client) InitiatePut(ctx context.Context, params PutParams) (io.WriteCloser, error) {
md := metadata.Join(c.getMethodCallMetadata(), metadata.Pairs(
"x-flare-blob-validation-sha256", params.Sha256Sum,
"x-flare-blob-validation-level", "error",
"x-flare-no-skip-duplicate-writes", "true",
))
// Timeout is the responsibility of the caller
ctx = metadata.NewOutgoingContext(ctx, md)

stream, err := c.bitriseKVClient.Put(ctx)
Expand All @@ -58,9 +62,10 @@ func (c *Client) Put(ctx context.Context, params PutParams) (io.WriteCloser, err
}, nil
}

func (c *Client) Get(ctx context.Context, name string) (io.ReadCloser, error) {
func (c *Client) InitiateGet(ctx context.Context, name string) (io.ReadCloser, error) {
resourceName := fmt.Sprintf("kv/%s", name)

// Timeout is the responsibility of the caller
ctx = metadata.NewOutgoingContext(ctx, c.getMethodCallMetadata())

readReq := &bytestream.ReadRequest{
Expand All @@ -82,14 +87,16 @@ func (c *Client) Get(ctx context.Context, name string) (io.ReadCloser, error) {
func (c *Client) Delete(ctx context.Context, name string) error {
resourceName := fmt.Sprintf("kv/%s", name)

ctx = metadata.NewOutgoingContext(ctx, c.getMethodCallMetadata())
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
callCtx := metadata.NewOutgoingContext(timeoutCtx, c.getMethodCallMetadata())

readReq := &bytestream.ReadRequest{
ResourceName: resourceName,
ReadOffset: 0,
ReadLimit: 0,
}
_, err := c.bitriseKVClient.Delete(ctx, readReq)
_, err := c.bitriseKVClient.Delete(callCtx, readReq)
if err != nil {
return fmt.Errorf("initiate delete: %w", err)
}
Expand All @@ -98,8 +105,6 @@ func (c *Client) Delete(ctx context.Context, name string) error {
}

func (c *Client) FindMissing(ctx context.Context, digests []*FileDigest) ([]*FileDigest, error) {
ctx = metadata.NewOutgoingContext(ctx, c.getMethodCallMetadata())

var missingBlobs []*FileDigest
blobDigests := convertToBlobDigests(digests)
req := &remoteexecution.FindMissingBlobsRequest{
Expand All @@ -119,16 +124,25 @@ func (c *Client) FindMissing(ctx context.Context, digests []*FileDigest) ([]*Fil
}
req.BlobDigests = blobDigests[startIndex:endIndex]

timeoutCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
callCtx := metadata.NewOutgoingContext(timeoutCtx, c.getMethodCallMetadata())

c.logger.Debugf("Calling FindMissingBlobs for chunk: digests[%d:%d]", startIndex, endIndex)
resp, err := c.casClient.FindMissingBlobs(ctx, req)
resp, err := c.casClient.FindMissingBlobs(callCtx, req)

cancel()
if err != nil {
return nil, fmt.Errorf("find missing blobs[%d:%d]: %w", startIndex, endIndex, err)
}
missingBlobs = append(missingBlobs, convertToFileDigests(resp.GetMissingBlobDigests())...)
}
} else {
resp, err := c.casClient.FindMissingBlobs(ctx, req)
timeoutCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
callCtx := metadata.NewOutgoingContext(timeoutCtx, c.getMethodCallMetadata())

resp, err := c.casClient.FindMissingBlobs(callCtx, req)

cancel()

if err != nil {
return nil, fmt.Errorf("find missing blobs: %w", err)
Expand Down
5 changes: 4 additions & 1 deletion internal/build_cache/kv/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ func (c *Client) uploadFile(ctx context.Context, filePath, key, checksum string)
}

func (c *Client) uploadStream(ctx context.Context, source io.Reader, key, checksum string, size int64) error {
kvWriter, err := c.Put(ctx, PutParams{
timeoutCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

kvWriter, err := c.InitiatePut(timeoutCtx, PutParams{
Name: key,
Sha256Sum: checksum,
FileSize: size,
Expand Down

0 comments on commit 8102e5c

Please sign in to comment.