Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

output/cloudv2: Implement RequestMetadata output #3100

Merged
merged 40 commits into from
Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
04b6fee
Implement RequestMetadata output
Blinkuu May 29, 2023
6638db1
Add tracesFlusher tests
Blinkuu May 30, 2023
b330ee5
Add requestMetadatasCollector tests
Blinkuu May 30, 2023
6040bab
Add mappers tests
Blinkuu May 30, 2023
a1f85c8
Add client tests
Blinkuu May 30, 2023
fe1140c
Format flush.go
Blinkuu May 31, 2023
16d9fb6
Fix `TestConfigApply` test
Blinkuu May 31, 2023
0b14184
Add TODO in client.go
Blinkuu May 31, 2023
e25c778
Fix flush_test.go
Blinkuu May 31, 2023
fddded2
Add validation logic in mappers.go
Blinkuu May 31, 2023
a9d9739
Rename `ErrAlreadyInitialized` to `ErrClientAlreadyInitialized`
Blinkuu May 31, 2023
becd581
Fix linter errors
Blinkuu May 31, 2023
dc2ed72
Remove unused `shouldStopSendingMetrics()`
Blinkuu May 31, 2023
fc0a6a2
Add //nolint: nestif directive
Blinkuu May 31, 2023
4fa7f4f
Update proto package
Blinkuu Jun 1, 2023
41cbe21
Fix mappers_test.go
Blinkuu Jun 1, 2023
94be8b4
Update proto
Blinkuu Jun 2, 2023
5f7b124
Set HTTP URL label to `name` tag
Blinkuu Jun 5, 2023
7956a52
Update traces host
Blinkuu Jun 7, 2023
1509fde
Remove `gen.sh`
Blinkuu Jun 12, 2023
e6129b2
Make `MetadataTraceIDKeyName` private again
Blinkuu Jun 12, 2023
3ffad13
Update min TLS version to 1.3
Blinkuu Jun 12, 2023
eb58b6e
Reimplement `CollectRequestMetadatas()``
Blinkuu Jun 13, 2023
0f6448b
Format flush_test.go
Blinkuu Jun 13, 2023
4e677f6
Add //nolint:lll directive
Blinkuu Jun 13, 2023
817b18c
Add copyright notice to common.proto
Blinkuu Jun 13, 2023
7afb9fa
Make internal interfaces private
Blinkuu Jun 19, 2023
456da47
Remove obsolete TODO
Blinkuu Jun 19, 2023
2db5ace
Minimize critical section
Blinkuu Jun 19, 2023
39f3da5
Add shutdown hooks
Blinkuu Jun 19, 2023
b3e8f86
Fix after merge
Blinkuu Jun 19, 2023
127f3a8
Remove argument name from the `flusher` interface
Blinkuu Jun 20, 2023
27cd2bc
Update js/modules/k6/experimental/tracing/client.go
Blinkuu Jun 20, 2023
24602fb
Store insightsClient instead of shutdownHooks
Blinkuu Jun 20, 2023
1910235
Update output/cloud/expv2/output.go
Blinkuu Jun 20, 2023
2b82672
Update output/cloud/expv2/output.go
Blinkuu Jun 20, 2023
117858c
Fix flush_test.go
Blinkuu Jun 20, 2023
029632e
Bring back 5s timeout in config_test.go
Blinkuu Jun 22, 2023
3fbac3e
Remove redundant validation logic
Blinkuu Jun 22, 2023
6ffe102
Rollback changes in metric.pb.go
Blinkuu Jun 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions cloudapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"gopkg.in/guregu/null.v3"

"github.com/mstoykov/envconfig"

"go.k6.io/k6/lib/types"
)

Expand Down Expand Up @@ -40,6 +41,15 @@ type Config struct {
// This is how many concurrent pushes will be done at the same time to the cloud
MetricPushConcurrency null.Int `json:"metricPushConcurrency" envconfig:"K6_CLOUD_METRIC_PUSH_CONCURRENCY"`

// Indicates whether to send traces to the k6 Insights backend service.
TracesEnabled null.Bool `json:"tracesEnabled" envconfig:"K6_CLOUD_TRACES_ENABLED"`

// The host of the k6 Insights backend service.
TracesHost null.String `json:"traceHost" envconfig:"K6_CLOUD_TRACES_HOST"`

// The time interval between periodic API calls for sending samples to the cloud ingest service.
TracesPushInterval types.NullDuration `json:"tracesPushInterval" envconfig:"K6_CLOUD_TRACES_PUSH_INTERVAL"`

// Aggregation docs:
//
// If AggregationPeriod is specified and if it is greater than 0, HTTP metric aggregation
Expand Down Expand Up @@ -145,11 +155,16 @@ type Config struct {
// NewConfig creates a new Config instance with default values for some fields.
func NewConfig() Config {
return Config{
Host: null.NewString("https://ingest.k6.io", false),
LogsTailURL: null.NewString("wss://cloudlogs.k6.io/api/v1/tail", false),
WebAppURL: null.NewString("https://app.k6.io", false),
MetricPushInterval: types.NewNullDuration(1*time.Second, false),
MetricPushConcurrency: null.NewInt(1, false),
Host: null.NewString("https://ingest.k6.io", false),
LogsTailURL: null.NewString("wss://cloudlogs.k6.io/api/v1/tail", false),
WebAppURL: null.NewString("https://app.k6.io", false),
MetricPushInterval: types.NewNullDuration(1*time.Second, false),
MetricPushConcurrency: null.NewInt(1, false),

TracesEnabled: null.NewBool(false, false),
TracesHost: null.NewString("insights.k6.io:4443", false),
TracesPushInterval: types.NewNullDuration(1*time.Second, false),

MaxMetricSamplesPerPackage: null.NewInt(100000, false),
Timeout: types.NewNullDuration(1*time.Minute, false),
APIVersion: null.NewInt(1, false),
Expand Down Expand Up @@ -216,6 +231,15 @@ func (c Config) Apply(cfg Config) Config {
if cfg.MetricPushConcurrency.Valid {
c.MetricPushConcurrency = cfg.MetricPushConcurrency
}
if cfg.TracesEnabled.Valid {
c.TracesEnabled = cfg.TracesEnabled
}
if cfg.TracesHost.Valid {
c.TracesHost = cfg.TracesHost
}
if cfg.TracesPushInterval.Valid {
c.TracesPushInterval = cfg.TracesPushInterval
}
if cfg.AggregationPeriod.Valid {
c.AggregationPeriod = cfg.AggregationPeriod
}
Expand Down
5 changes: 4 additions & 1 deletion cloudapi/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@ func TestConfigApply(t *testing.T) {
ProjectID: null.NewInt(1, true),
Name: null.NewString("Name", true),
Host: null.NewString("Host", true),
Timeout: types.NewNullDuration(5*time.Second, true),
LogsTailURL: null.NewString("LogsTailURL", true),
PushRefID: null.NewString("PushRefID", true),
WebAppURL: null.NewString("foo", true),
NoCompress: null.NewBool(true, true),
StopOnError: null.NewBool(true, true),
Timeout: types.NewNullDuration(5*time.Second, true),
APIVersion: null.NewInt(2, true),
MaxMetricSamplesPerPackage: null.NewInt(2, true),
MetricPushInterval: types.NewNullDuration(1*time.Second, true),
MetricPushConcurrency: null.NewInt(3, true),
TracesEnabled: null.NewBool(true, true),
TracesHost: null.NewString("TracesHost", true),
TracesPushInterval: types.NewNullDuration(1*time.Second, true),
AggregationPeriod: types.NewNullDuration(2*time.Second, true),
AggregationCalcInterval: types.NewNullDuration(3*time.Second, true),
AggregationWaitPeriod: types.NewNullDuration(4*time.Second, true),
Expand Down
207 changes: 207 additions & 0 deletions cloudapi/insights/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package insights

import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"go.k6.io/k6/cloudapi/insights/proto/v1/ingester"
)

const (
testRunIDHeader = "X-K6TestRun-Id"
authorizationHeader = "Authorization"
)

var (
// ErrClientAlreadyInitialized is returned when the client is already initialized.
ErrClientAlreadyInitialized = errors.New("insights client already initialized")

// ErrClientClosed is returned when the client is closed.
ErrClientClosed = errors.New("insights client closed")
)

// ClientConfig is the configuration for the client.
type ClientConfig struct {
IngesterHost string
ConnectConfig ClientConnectConfig
AuthConfig ClientAuthConfig
TLSConfig ClientTLSConfig
}

// ClientConnectConfig is the configuration for the client connection.
type ClientConnectConfig struct {
Block bool
FailOnNonTempDialError bool
Dialer func(context.Context, string) (net.Conn, error)
}

// ClientAuthConfig is the configuration for the client authentication.
type ClientAuthConfig struct {
Enabled bool
TestRunID int64
Token string
RequireTransportSecurity bool
}

// ClientTLSConfig is the configuration for the client TLS.
type ClientTLSConfig struct {
Insecure bool
CertFile string
}

// Client is the client for the k6 Insights ingester service.
type Client struct {
cfg ClientConfig
client ingester.IngesterServiceClient
conn *grpc.ClientConn
connMu *sync.RWMutex
}

// NewClient creates a new client.
func NewClient(cfg ClientConfig) *Client {
return &Client{
cfg: cfg,
client: nil,
conn: nil,
connMu: &sync.RWMutex{},
}
}

// Dial creates a client connection using ClientConfig.
func (c *Client) Dial(ctx context.Context) error {
c.connMu.Lock()
defer c.connMu.Unlock()

if c.conn != nil {
return ErrClientAlreadyInitialized
}

opts, err := dialOptionsFromClientConfig(c.cfg)
if err != nil {
return fmt.Errorf("failed to create dial options: %w", err)
}

conn, err := grpc.DialContext(ctx, c.cfg.IngesterHost, opts...)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}

c.client = ingester.NewIngesterServiceClient(conn)
c.conn = conn

return nil
}

// IngestRequestMetadatasBatch ingests a batch of request metadatas.
func (c *Client) IngestRequestMetadatasBatch(ctx context.Context, requestMetadatas RequestMetadatas) error {
c.connMu.RLock()
closed := c.conn == nil
c.connMu.RUnlock()
if closed {
return ErrClientClosed
}

if len(requestMetadatas) < 1 {
return nil
}

req, err := newBatchCreateRequestMetadatasRequest(requestMetadatas)
if err != nil {
return fmt.Errorf("failed to create request from request metadatas: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return fmt.Errorf("failed to create request from request metadatas: %w", err)
return fmt.Errorf("failed to create request from request metadata: %w", err)

Metadata doesn't have the plural, am I missing something? I see it is used all over the code so if we change then keep it consistent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we are aware it doesn't have a plural form. However, together with @vortegatorres we decided to add the s suffix to match the conventions defined in Google API Design Guide. I'd like to keep this as "request metadatas".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not certain which part of the Google API Design Docs apply here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See https://google.aip.dev/233:

The RPC's name must begin with BatchCreate. The remainder of the RPC name should be the plural form of the resource being created.

We use singular for a single request and plural for batch requests (see here)

}

// TODO(lukasz, retry-support): Retry request with returned metadatas.
//
// Note: There is currently no backend support backing up this retry mechanism.
codebien marked this conversation as resolved.
Show resolved Hide resolved
_, err = c.client.BatchCreateRequestMetadatas(ctx, req)
if err != nil {
st := status.Convert(err)
return fmt.Errorf("failed to ingest request metadatas batch: code=%s, msg=%s", st.Code().String(), st.Message())
}

return nil
}

// Close closes the client.
func (c *Client) Close() error {
c.connMu.Lock()
defer c.connMu.Unlock()

if c.conn == nil {
return ErrClientClosed
}

conn := c.conn
c.client = nil
c.conn = nil

return conn.Close()
}

func dialOptionsFromClientConfig(cfg ClientConfig) ([]grpc.DialOption, error) {
var opts []grpc.DialOption

if cfg.ConnectConfig.Block {
opts = append(opts, grpc.WithBlock())
}

if cfg.ConnectConfig.FailOnNonTempDialError {
opts = append(opts, grpc.FailOnNonTempDialError(true))
}

if cfg.ConnectConfig.Dialer != nil {
opts = append(opts, grpc.WithContextDialer(cfg.ConnectConfig.Dialer))
}

if cfg.TLSConfig.Insecure { //nolint: nestif
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
if cfg.TLSConfig.CertFile != "" {
creds, err := credentials.NewClientTLSFromFile(cfg.TLSConfig.CertFile, "")
if err != nil {
return nil, fmt.Errorf("failed to load TLS credentials from file: %w", err)
}
opts = append(opts, grpc.WithTransportCredentials(creds))
codebien marked this conversation as resolved.
Show resolved Hide resolved
} else {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS13})))
}
}

if cfg.AuthConfig.Enabled {
opts = append(opts, grpc.WithPerRPCCredentials(newPerRPCCredentials(cfg.AuthConfig)))
}

return opts, nil
}

type perRPCCredentials struct {
metadata map[string]string
requireTransportSecurity bool
}

func newPerRPCCredentials(cfg ClientAuthConfig) perRPCCredentials {
return perRPCCredentials{
metadata: map[string]string{
testRunIDHeader: fmt.Sprintf("%d", cfg.TestRunID),
authorizationHeader: fmt.Sprintf("Token %s", cfg.Token),
},
requireTransportSecurity: cfg.RequireTransportSecurity,
}
}

func (c perRPCCredentials) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
return c.metadata, nil
}

func (c perRPCCredentials) RequireTransportSecurity() bool {
return c.requireTransportSecurity
}
Loading