Skip to content

Commit

Permalink
output/cloudv2: Add retry support to insights client (#3150)
Browse files Browse the repository at this point in the history
* Add retry support to insights client
* Execute flushRequestMetadatas from a pool of flushers instead of from a periodic invocation

---------

Co-authored-by: Mihail Stoykov <312246+mstoykov@users.noreply.github.com>
  • Loading branch information
vortegatorres and mstoykov authored Jul 10, 2023
1 parent 9c019a4 commit c995529
Show file tree
Hide file tree
Showing 19 changed files with 1,331 additions and 54 deletions.
13 changes: 10 additions & 3 deletions cloudapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type Config struct {
// The host of the k6 Insights backend service.
TracesHost null.String `json:"traceHost" envconfig:"K6_CLOUD_TRACES_HOST"`

// The number of concurrent pushes to the cloud.
TracesPushConcurrency null.Int `json:"tracesPushConcurrency" envconfig:"K6_CLOUD_TRACES_PUSH_CONCURRENCY"`

// The time interval between periodic API calls for sending samples to the cloud ingest service.
TracesPushInterval types.NullDuration `json:"tracesPushInterval" envconfig:"K6_CLOUD_TRACES_PUSH_INTERVAL"`

Expand Down Expand Up @@ -166,9 +169,10 @@ func NewConfig() Config {
MetricPushInterval: types.NewNullDuration(1*time.Second, false),
MetricPushConcurrency: null.NewInt(1, false),

TracesEnabled: null.NewBool(false, false),
TracesHost: null.NewString("insights.k6.io:4443", false),
TracesPushInterval: types.NewNullDuration(1*time.Second, false),
TracesEnabled: null.NewBool(false, false),
TracesHost: null.NewString("insights.k6.io:4443", false),
TracesPushInterval: types.NewNullDuration(1*time.Second, false),
TracesPushConcurrency: null.NewInt(1, false),

MaxMetricSamplesPerPackage: null.NewInt(100000, false),
MaxTimeSeriesInBatch: null.NewInt(10000, false),
Expand Down Expand Up @@ -249,6 +253,9 @@ func (c Config) Apply(cfg Config) Config {
if cfg.TracesPushInterval.Valid {
c.TracesPushInterval = cfg.TracesPushInterval
}
if cfg.TracesPushConcurrency.Valid {
c.TracesPushConcurrency = cfg.TracesPushConcurrency
}
if cfg.AggregationPeriod.Valid {
c.AggregationPeriod = cfg.AggregationPeriod
}
Expand Down
3 changes: 2 additions & 1 deletion cloudapi/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func TestConfigApply(t *testing.T) {
MetricPushConcurrency: null.NewInt(3, true),
TracesEnabled: null.NewBool(true, true),
TracesHost: null.NewString("TracesHost", true),
TracesPushInterval: types.NewNullDuration(1*time.Second, true),
TracesPushInterval: types.NewNullDuration(10*time.Second, true),
TracesPushConcurrency: null.NewInt(6, true),
AggregationPeriod: types.NewNullDuration(2*time.Second, true),
AggregationCalcInterval: types.NewNullDuration(3*time.Second, true),
AggregationWaitPeriod: types.NewNullDuration(4*time.Second, true),
Expand Down
73 changes: 70 additions & 3 deletions cloudapi/insights/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ import (
"errors"
"fmt"
"net"
"strings"
"sync"
"time"

grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"go.k6.io/k6/lib/types"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
Expand All @@ -32,9 +37,11 @@ var (
// ClientConfig is the configuration for the client.
//
// It groups the connection, authentication, TLS and retry settings.
type ClientConfig struct {
// IngesterHost identifies the ingester service to talk to.
// NOTE(review): presumably a "host:port" address — confirm against the dial code.
IngesterHost string
// Timeout bounds the context used for a single IngestRequestMetadatasBatch call.
Timeout types.NullDuration
// ConnectConfig holds the client connection settings.
ConnectConfig ClientConnectConfig
// AuthConfig holds the settings used to build the per-RPC credentials.
AuthConfig ClientAuthConfig
// TLSConfig holds the TLS settings.
TLSConfig ClientTLSConfig
// RetryConfig holds the settings used to build the retry interceptor
// that is added to the dial options.
RetryConfig ClientRetryConfig
}

// ClientConnectConfig is the configuration for the client connection.
Expand All @@ -58,6 +65,21 @@ type ClientTLSConfig struct {
CertFile string
}

// ClientRetryConfig is the configuration for the client retries.
type ClientRetryConfig struct {
// RetryableStatusCodes is a comma-separated list of gRPC status codes that
// trigger a retry. Each element is decoded with codes.Code.UnmarshalJSON;
// an empty string is rejected when the retry interceptor is built.
RetryableStatusCodes string
// MaxAttempts is passed to the retry middleware's WithMax option.
MaxAttempts uint
// PerRetryTimeout is passed to the retry middleware's WithPerRetryTimeout option.
PerRetryTimeout time.Duration
// BackoffConfig configures the optional backoff between retries.
BackoffConfig ClientBackoffConfig
}

// ClientBackoffConfig is the configuration for the client retries using the backoff exponential with jitter algorithm.
type ClientBackoffConfig struct {
// Enabled controls whether the backoff option is added to the retry interceptor at all.
Enabled bool
// JitterFraction is the second argument passed to BackoffExponentialWithJitter.
JitterFraction float64
// WaitBetween is the first argument passed to BackoffExponentialWithJitter.
WaitBetween time.Duration
}

// Client is the client for the k6 Insights ingester service.
type Client struct {
cfg ClientConfig
Expand Down Expand Up @@ -110,6 +132,9 @@ func (c *Client) IngestRequestMetadatasBatch(ctx context.Context, requestMetadat
return ErrClientClosed
}

ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout.TimeDuration())
defer cancel()

if len(requestMetadatas) < 1 {
return nil
}
Expand All @@ -119,9 +144,6 @@ func (c *Client) IngestRequestMetadatasBatch(ctx context.Context, requestMetadat
return fmt.Errorf("failed to create request from request metadatas: %w", err)
}

// TODO(lukasz, retry-support): Retry request with returned metadatas.
//
// Note: There is currently no backend support backing up this retry mechanism.
_, err = c.client.BatchCreateRequestMetadatas(ctx, req)
if err != nil {
st := status.Convert(err)
Expand Down Expand Up @@ -180,9 +202,54 @@ func dialOptionsFromClientConfig(cfg ClientConfig) ([]grpc.DialOption, error) {
opts = append(opts, grpc.WithPerRPCCredentials(newPerRPCCredentials(cfg.AuthConfig)))
}

rI, err := retryInterceptor(cfg.RetryConfig)
if err != nil {
return nil, fmt.Errorf("failed to create retry interceptors: %w", err)
}

opts = append(opts, grpc.WithChainUnaryInterceptor([]grpc.UnaryClientInterceptor{rI}...))

return opts, nil
}

// retryInterceptor builds the unary client interceptor that retries failed
// calls according to retryConfig.
//
// The retryable status codes, the maximum number of attempts and the per-retry
// timeout are always applied; the exponential backoff with jitter is only added
// when it is enabled in the backoff configuration.
//
// It returns an error when the retryable status codes cannot be parsed.
func retryInterceptor(retryConfig ClientRetryConfig) (grpc.UnaryClientInterceptor, error) {
	retryCodes, err := retryableStatusCodes(retryConfig.RetryableStatusCodes)
	if err != nil {
		return nil, fmt.Errorf("failed to parse retryable status codes: %w", err)
	}

	opts := []grpcRetry.CallOption{
		grpcRetry.WithCodes(retryCodes...),
		grpcRetry.WithMax(retryConfig.MaxAttempts),
		grpcRetry.WithPerRetryTimeout(retryConfig.PerRetryTimeout),
	}

	// Backoff is opt-in: without it the middleware's default behaviour applies.
	if bc := retryConfig.BackoffConfig; bc.Enabled {
		backoffFunc := grpcRetry.BackoffExponentialWithJitter(bc.WaitBetween, bc.JitterFraction)
		opts = append(opts, grpcRetry.WithBackoff(backoffFunc))
	}

	return grpcRetry.UnaryClientInterceptor(opts...), nil
}

// retryableStatusCodes parses a comma-separated list of gRPC status codes.
//
// Each element is decoded with codes.Code.UnmarshalJSON, so it has to be
// spelled in a form that method accepts. Whitespace around an element is
// ignored, so a list written with a space after each comma parses the same as
// one written without.
//
// It returns an error when the input is empty or when any element cannot be
// decoded; no partial result is returned in that case.
func retryableStatusCodes(retryableStatusCodes string) ([]codes.Code, error) {
	if len(retryableStatusCodes) == 0 {
		return nil, fmt.Errorf("no retryable status codes provided")
	}

	statusCodes := strings.Split(retryableStatusCodes, ",")
	errorCodes := make([]codes.Code, len(statusCodes))
	for i, code := range statusCodes {
		// The value comes from configuration, where "A, B" is a natural way to
		// write a list; without trimming, the leading space makes " B" invalid.
		code = strings.TrimSpace(code)
		if err := errorCodes[i].UnmarshalJSON([]byte(code)); err != nil {
			return nil, fmt.Errorf("invalid status code %s provided", code)
		}
	}

	return errorCodes, nil
}

type perRPCCredentials struct {
metadata map[string]string
requireTransportSecurity bool
Expand Down
Loading

0 comments on commit c995529

Please sign in to comment.