Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Usage telemetry infrastructure #1780

Merged
merged 10 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -458,10 +458,11 @@ This information is passed along when the library makes calls to the Stripe
API. Note that while `Name` is always required, `URL` and `Version` are
optional.

### Request latency telemetry
### Telemetry

By default, the library sends request latency telemetry to Stripe. These
numbers help Stripe improve the overall latency of its API for all users.
By default, the library sends telemetry to Stripe regarding request latency and feature usage. These
numbers help Stripe improve the overall latency of its API for all users, and
improve popular features.

You can disable this behavior if you prefer:

Expand Down
8 changes: 8 additions & 0 deletions params.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ type Params struct {
// account instead of under the account of the owner of the configured
// Stripe key.
StripeAccount *string `form:"-"` // Passed as header

usage []string `form:"-"` // Tracked behaviors
}

// AddExpand on the Params embedded struct is deprecated.
Expand All @@ -210,6 +212,12 @@ func (p *Params) AddExpand(f string) {
p.Expand = append(p.Expand, &f)
}

// InternalSetUsage sets the usage field on the Params struct.
// Unstable: for internal stripe-go usage only.
func (p *Params) InternalSetUsage(usage []string) {
p.usage = usage
}

// AddExtra adds a new arbitrary key-value pair to the request data
func (p *Params) AddExtra(key, value string) {
if p.Extra == nil {
Expand Down
123 changes: 91 additions & 32 deletions stripe.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ type APIResponse struct {

// StatusCode is a status code as integer. e.g. 200
StatusCode int

duration *time.Duration
}

// StreamingAPIResponse encapsulates some common features of a response from the
Expand All @@ -122,27 +124,30 @@ type StreamingAPIResponse struct {
RequestID string
Status string
StatusCode int
duration *time.Duration
}

func newAPIResponse(res *http.Response, resBody []byte) *APIResponse {
func newAPIResponse(res *http.Response, resBody []byte, requestDuration *time.Duration) *APIResponse {
return &APIResponse{
Header: res.Header,
IdempotencyKey: res.Header.Get("Idempotency-Key"),
RawJSON: resBody,
RequestID: res.Header.Get("Request-Id"),
Status: res.Status,
StatusCode: res.StatusCode,
duration: requestDuration,
}
}

func newStreamingAPIResponse(res *http.Response, body io.ReadCloser) *StreamingAPIResponse {
func newStreamingAPIResponse(res *http.Response, body io.ReadCloser, requestDuration *time.Duration) *StreamingAPIResponse {
return &StreamingAPIResponse{
Header: res.Header,
IdempotencyKey: res.Header.Get("Idempotency-Key"),
Body: body,
RequestID: res.Header.Get("Request-Id"),
Status: res.Status,
StatusCode: res.StatusCode,
duration: requestDuration,
}
}

Expand Down Expand Up @@ -275,6 +280,38 @@ type BackendImplementation struct {
requestMetricsBuffer chan requestMetrics
}

type metricsResponseSetter struct {
LastResponseSetter
backend *BackendImplementation
params *Params
}

func (s *metricsResponseSetter) SetLastResponse(response *APIResponse) {
var usage []string
if s.params != nil {
usage = s.params.usage
}
s.backend.maybeEnqueueTelemetryMetrics(response.RequestID, response.duration, usage)
s.LastResponseSetter.SetLastResponse(response)
}

func (s *metricsResponseSetter) UnmarshalJSON(b []byte) error {
return json.Unmarshal(b, s.LastResponseSetter)
}

type streamingLastResponseSetterWrapper struct {
StreamingLastResponseSetter
f func(*StreamingAPIResponse)
}

func (l *streamingLastResponseSetterWrapper) SetLastResponse(response *StreamingAPIResponse) {
l.f(response)
l.StreamingLastResponseSetter.SetLastResponse(response)
}
func (l *streamingLastResponseSetterWrapper) UnmarshalJSON(b []byte) error {
return json.Unmarshal(b, l.StreamingLastResponseSetter)
}

func extractParams(params ParamsContainer) (*form.Values, *Params, error) {
var formValues *form.Values
var commonParams *Params
Expand Down Expand Up @@ -346,7 +383,18 @@ func (s *BackendImplementation) CallStreaming(method, path, key string, params P
return err
}

if err := s.DoStreaming(req, bodyBuffer, v); err != nil {
responseSetter := streamingLastResponseSetterWrapper{
v,
func(response *StreamingAPIResponse) {
var usage []string
if commonParams != nil {
usage = commonParams.usage
}
s.maybeEnqueueTelemetryMetrics(response.RequestID, response.duration, usage)
},
}

if err := s.DoStreaming(req, bodyBuffer, &responseSetter); err != nil {
return err
}

Expand Down Expand Up @@ -388,7 +436,13 @@ func (s *BackendImplementation) CallRaw(method, path, key string, form *form.Val
return err
}

if err := s.Do(req, bodyBuffer, v); err != nil {
responseSetter := metricsResponseSetter{
LastResponseSetter: v,
backend: s,
params: params,
}

if err := s.Do(req, bodyBuffer, &responseSetter); err != nil {
return err
}

Expand Down Expand Up @@ -468,22 +522,27 @@ func (s *BackendImplementation) maybeSetTelemetryHeader(req *http.Request) {
}
}

func (s *BackendImplementation) maybeEnqueueTelemetryMetrics(res *http.Response, requestDuration time.Duration) {
if s.enableTelemetry && res != nil {
reqID := res.Header.Get("Request-Id")
if len(reqID) > 0 {
metrics := requestMetrics{
RequestDurationMS: int(requestDuration / time.Millisecond),
RequestID: reqID,
}

// If the metrics buffer is full, discard the new metrics. Otherwise, add
// them to the buffer.
select {
case s.requestMetricsBuffer <- metrics:
default:
}
}
func (s *BackendImplementation) maybeEnqueueTelemetryMetrics(requestID string, requestDuration *time.Duration, usage []string) {
if !s.enableTelemetry || requestID == "" {
return
}
// If there's no duration to report and no usage to report, don't bother
if requestDuration == nil && len(usage) == 0 {
return
}
metrics := requestMetrics{
RequestID: requestID,
}
if requestDuration != nil {
requestDurationMS := int(*requestDuration / time.Millisecond)
metrics.RequestDurationMS = &requestDurationMS
}
if len(usage) > 0 {
metrics.Usage = usage
}
select {
case s.requestMetricsBuffer <- metrics:
default:
}
}

Expand Down Expand Up @@ -541,7 +600,7 @@ func (s *BackendImplementation) requestWithRetriesAndTelemetry(
req *http.Request,
body *bytes.Buffer,
handleResponse func(*http.Response, error) (interface{}, error),
) (*http.Response, interface{}, error) {
) (*http.Response, interface{}, *time.Duration, error) {
s.LeveledLogger.Infof("Requesting %v %v%v", req.Method, req.URL.Host, req.URL.Path)
s.maybeSetTelemetryHeader(req)
var resp *http.Response
Expand Down Expand Up @@ -577,13 +636,11 @@ func (s *BackendImplementation) requestWithRetriesAndTelemetry(
time.Sleep(sleepDuration)
}

s.maybeEnqueueTelemetryMetrics(resp, requestDuration)

if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

return resp, result, nil
return resp, result, &requestDuration, nil
}

func (s *BackendImplementation) logError(statusCode int, err error) {
Expand Down Expand Up @@ -645,11 +702,11 @@ func (s *BackendImplementation) DoStreaming(req *http.Request, body *bytes.Buffe
return res.Body, err
}

resp, result, err := s.requestWithRetriesAndTelemetry(req, body, handleResponse)
resp, result, requestDuration, err := s.requestWithRetriesAndTelemetry(req, body, handleResponse)
if err != nil {
return err
}
v.SetLastResponse(newStreamingAPIResponse(resp, result.(io.ReadCloser)))
v.SetLastResponse(newStreamingAPIResponse(resp, result.(io.ReadCloser), requestDuration))
return nil
}

Expand All @@ -675,14 +732,15 @@ func (s *BackendImplementation) Do(req *http.Request, body *bytes.Buffer, v Last
return resBody, err
}

res, result, err := s.requestWithRetriesAndTelemetry(req, body, handleResponse)
res, result, requestDuration, err := s.requestWithRetriesAndTelemetry(req, body, handleResponse)
if err != nil {
return err
}
resBody := result.([]byte)
s.LeveledLogger.Debugf("Response: %s", string(resBody))

err = s.UnmarshalJSONVerbose(res.StatusCode, resBody, v)
v.SetLastResponse(newAPIResponse(res, resBody))
v.SetLastResponse(newAPIResponse(res, resBody, requestDuration))
return err
}

Expand Down Expand Up @@ -733,7 +791,7 @@ func (s *BackendImplementation) ResponseToError(res *http.Response, resBody []by
}
raw.Error.Err = typedError

raw.Error.SetLastResponse(newAPIResponse(res, resBody))
raw.Error.SetLastResponse(newAPIResponse(res, resBody, nil))

return raw.Error
}
Expand Down Expand Up @@ -1270,8 +1328,9 @@ type stripeClientUserAgent struct {

// requestMetrics contains the id and duration of the last request sent
type requestMetrics struct {
RequestDurationMS int `json:"request_duration_ms"`
RequestID string `json:"request_id"`
RequestDurationMS *int `json:"request_duration_ms"`
RequestID string `json:"request_id"`
Usage []string `json:"usage"`
}

// requestTelemetry contains the payload sent in the
Expand Down
58 changes: 19 additions & 39 deletions stripe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func TestDo_LastResponsePopulated(t *testing.T) {
}

// Test that telemetry metrics are not sent by default
func TestDo_TelemetryDisabled(t *testing.T) {
func TestCall_TelemetryDisabled(t *testing.T) {
type testServerResponse struct {
APIResource
Message string `json:"message"`
Expand Down Expand Up @@ -474,17 +474,8 @@ func TestDo_TelemetryDisabled(t *testing.T) {
// _next_ request via the `X-Stripe-Client-Telemetry header`. To test that
// metrics aren't being sent, we need to fire off two requests in sequence.
for i := 0; i < 2; i++ {
request, err := backend.NewRequest(
http.MethodGet,
"/hello",
"sk_test_123",
"application/x-www-form-urlencoded",
nil,
)
assert.NoError(t, err)

var response testServerResponse
err = backend.Do(request, nil, &response)
err := backend.Call("get", "/hello", "sk_test_xyz", nil, &response)

assert.NoError(t, err)
assert.Equal(t, message, response.Message)
Expand All @@ -496,17 +487,12 @@ func TestDo_TelemetryDisabled(t *testing.T) {

// Test that telemetry metrics are sent on subsequent requests when
// EnableTelemetry = true.
func TestDo_TelemetryEnabled(t *testing.T) {
func TestCall_TelemetryEnabled(t *testing.T) {
type testServerResponse struct {
APIResource
Message string `json:"message"`
}

type requestMetrics struct {
RequestDurationMS int `json:"request_duration_ms"`
RequestID string `json:"request_id"`
}

type requestTelemetry struct {
LastRequestMetrics requestMetrics `json:"last_request_metrics"`
}
Expand All @@ -526,15 +512,19 @@ func TestDo_TelemetryEnabled(t *testing.T) {
case 2:
assert.True(t, len(telemetryStr) > 0, "telemetryStr should not be empty")

// the telemetry should properly unmarshal into RequestTelemetry
var telemetry requestTelemetry
// the telemetry should properly unmarshal into RequestTelemetry
err := json.Unmarshal([]byte(telemetryStr), &telemetry)
assert.NoError(t, err)

// the second request should include the metrics for the first request
assert.Equal(t, telemetry.LastRequestMetrics.RequestID, "req_1")
assert.True(t, telemetry.LastRequestMetrics.RequestDurationMS > 20,
assert.True(t, *telemetry.LastRequestMetrics.RequestDurationMS > 20,
"request_duration_ms should be > 20ms")

// The telemetry in the second request should contain the
// expected usage
assert.Equal(t, telemetry.LastRequestMetrics.Usage, []string{"llama", "bufo"})
default:
assert.Fail(t, "Should not have reached request %v", requestNum)
}
Expand All @@ -560,18 +550,17 @@ func TestDo_TelemetryEnabled(t *testing.T) {
},
).(*BackendImplementation)

type myCreateParams struct {
Params `form:"*"`
Foo string `form:"foo"`
}
params := &myCreateParams{
Foo: "bar",
}
params.InternalSetUsage([]string{"llama", "bufo"})
for i := 0; i < 2; i++ {
request, err := backend.NewRequest(
http.MethodGet,
"/hello",
"sk_test_123",
"application/x-www-form-urlencoded",
nil,
)
assert.NoError(t, err)

var response testServerResponse
err = backend.Do(request, nil, &response)
err := backend.Call("get", "/hello", "sk_test_xyz", params, &response)

assert.NoError(t, err)
assert.Equal(t, message, response.Message)
Expand Down Expand Up @@ -623,17 +612,8 @@ func TestDo_TelemetryEnabledNoDataRace(t *testing.T) {

for i := 0; i < times; i++ {
go func() {
request, err := backend.NewRequest(
http.MethodGet,
"/hello",
"sk_test_123",
"application/x-www-form-urlencoded",
nil,
)
assert.NoError(t, err)

var response testServerResponse
err = backend.Do(request, nil, &response)
err := backend.Call("get", "/hello", "sk_test_xyz", nil, &response)

assert.NoError(t, err)
assert.Equal(t, message, response.Message)
Expand Down
Loading