From 58da2cc62b9c3e4596d4fdbd7ba3cbbdcc77be5c Mon Sep 17 00:00:00 2001 From: Bryan Moffatt Date: Thu, 25 Jan 2024 09:54:07 -0800 Subject: [PATCH 1/3] Plumb error info to X-ray (#544) * WIP: Plumb error info to x-ray * cleanup * fix xrayException shape - the json tags differ from InvokeResponse_Error * no stack should be empty list instead of nil * gofmt * stable order of the paths array --- lambda/invoke_loop.go | 46 ++++++++++++- lambda/invoke_loop_test.go | 106 ++++++++++++++++++++++++++++++ lambda/runtime_api_client.go | 14 ++-- lambda/runtime_api_client_test.go | 8 +-- 4 files changed, 165 insertions(+), 9 deletions(-) diff --git a/lambda/invoke_loop.go b/lambda/invoke_loop.go index 9e2d6598..338237ea 100644 --- a/lambda/invoke_loop.go +++ b/lambda/invoke_loop.go @@ -104,7 +104,13 @@ func handleInvoke(invoke *invoke, handler *handlerOptions) error { func reportFailure(invoke *invoke, invokeErr *messages.InvokeResponse_Error) error { errorPayload := safeMarshal(invokeErr) log.Printf("%s", errorPayload) - if err := invoke.failure(bytes.NewReader(errorPayload), contentTypeJSON); err != nil { + + causeForXRay, err := json.Marshal(makeXRayError(invokeErr)) + if err != nil { + return fmt.Errorf("unexpected error occured when serializing the function error cause for X-Ray: %v", err) + } + + if err := invoke.failure(bytes.NewReader(errorPayload), contentTypeJSON, causeForXRay); err != nil { return fmt.Errorf("unexpected error occurred when sending the function error to the API: %v", err) } return nil @@ -166,3 +172,41 @@ func safeMarshal(v interface{}) []byte { } return payload } + +type xrayException struct { + Type string `json:"type"` + Message string `json:"message"` + Stack []*messages.InvokeResponse_Error_StackFrame `json:"stack"` +} + +type xrayError struct { + WorkingDirectory string `json:"working_directory"` + Exceptions []xrayException `json:"exceptions"` + Paths []string `json:"paths"` +} + +func makeXRayError(invokeResponseError *messages.InvokeResponse_Error) *xrayError { + paths := make([]string, 0, len(invokeResponseError.StackTrace)) + visitedPaths := make(map[string]struct{}, len(invokeResponseError.StackTrace)) + for _, frame := range invokeResponseError.StackTrace { + if _, exists := visitedPaths[frame.Path]; !exists { + visitedPaths[frame.Path] = struct{}{} + paths = append(paths, frame.Path) + } + } + + cwd, _ := os.Getwd() + exceptions := []xrayException{{ + Type: invokeResponseError.Type, + Message: invokeResponseError.Message, + Stack: invokeResponseError.StackTrace, + }} + if exceptions[0].Stack == nil { + exceptions[0].Stack = []*messages.InvokeResponse_Error_StackFrame{} + } + return &xrayError{ + WorkingDirectory: cwd, + Paths: paths, + Exceptions: exceptions, + } +} diff --git a/lambda/invoke_loop_test.go b/lambda/invoke_loop_test.go index fab800b9..3374dc2a 100644 --- a/lambda/invoke_loop_test.go +++ b/lambda/invoke_loop_test.go @@ -90,6 +90,110 @@ func TestCustomErrorMarshaling(t *testing.T) { } } +func TestXRayCausePlumbing(t *testing.T) { + errors := []error{ + errors.New("barf"), + messages.InvokeResponse_Error{ + Type: "yoloError", + Message: "hello yolo", + StackTrace: []*messages.InvokeResponse_Error_StackFrame{ + {Label: "yolo", Path: "yolo", Line: 2}, + {Label: "hi", Path: "hello/hello", Line: 12}, + }, + }, + messages.InvokeResponse_Error{ + Type: "yoloError", + Message: "hello yolo", + StackTrace: []*messages.InvokeResponse_Error_StackFrame{ + {Label: "hi", Path: "hello/hello", Line: 12}, + {Label: "hihi", Path: "hello/hello", Line: 13}, + {Label: "yolo", Path: "yolo", Line: 2}, + {Label: "hi", Path: "hello/hello", Line: 14}, + }, + }, + messages.InvokeResponse_Error{ + Type: "yoloError", + Message: "hello yolo", + StackTrace: []*messages.InvokeResponse_Error_StackFrame{}, + }, + messages.InvokeResponse_Error{ + Type: "yoloError", + Message: "hello yolo", + }, + } + wd, _ := os.Getwd() + expected := []string{ + `{ + "working_directory":"` + wd + `", + "paths": [], + "exceptions": [{ + "type": "errorString", + "message": "barf", + "stack": [] + }] + }`, + `{ + "working_directory":"` + wd + `", + "paths": ["yolo", "hello/hello"], + "exceptions": [{ + "type": "yoloError", + "message": "hello yolo", + "stack": [ + {"label": "yolo", "path": "yolo", "line": 2}, + {"label": "hi", "path": "hello/hello", "line": 12} + ] + }] + }`, + `{ + "working_directory":"` + wd + `", + "paths": ["hello/hello", "yolo"], + "exceptions": [{ + "type": "yoloError", + "message": "hello yolo", + "stack": [ + {"label": "hi", "path": "hello/hello", "line": 12}, + {"label": "hihi", "path": "hello/hello", "line": 13}, + {"label": "yolo", "path": "yolo", "line": 2}, + {"label": "hi", "path": "hello/hello", "line": 14} + ] + }] + }`, + `{ + "working_directory":"` + wd + `", + "paths": [], + "exceptions": [{ + "type": "yoloError", + "message": "hello yolo", + "stack": [] + }] + }`, + `{ + "working_directory":"` + wd + `", + "paths": [], + "exceptions": [{ + "type": "yoloError", + "message": "hello yolo", + "stack": [] + }] + }`, + } + require.Equal(t, len(errors), len(expected)) + ts, record := runtimeAPIServer(``, len(errors)) + defer ts.Close() + n := 0 + handler := NewHandler(func() error { + defer func() { n++ }() + return errors[n] + }) + endpoint := strings.Split(ts.URL, "://")[1] + expectedError := fmt.Sprintf("failed to GET http://%s/2018-06-01/runtime/invocation/next: got unexpected status code: 410", endpoint) + assert.EqualError(t, startRuntimeAPILoop(endpoint, handler), expectedError) + for i := range errors { + assert.JSONEq(t, expected[i], string(record.xrayCauses[i])) + } + +} + func TestRuntimeAPIContextPlumbing(t *testing.T) { handler := NewHandler(func(ctx context.Context) (interface{}, error) { lc, _ := lambdacontext.FromContext(ctx) @@ -271,6 +375,7 @@ type requestRecord struct { nPosts int responses [][]byte contentTypes []string + xrayCauses []string } type eventMetadata struct { @@ -336,6 +441,7 @@ func runtimeAPIServer(eventPayload string, failAfter int, overrides ...eventMeta w.WriteHeader(http.StatusAccepted) record.responses = append(record.responses, response.Bytes()) record.contentTypes = append(record.contentTypes, r.Header.Get("Content-Type")) + record.xrayCauses = append(record.xrayCauses, r.Header.Get(headerXRayErrorCause)) default: w.WriteHeader(http.StatusBadRequest) } diff --git a/lambda/runtime_api_client.go b/lambda/runtime_api_client.go index 84384c29..158bd6b4 100644 --- a/lambda/runtime_api_client.go +++ b/lambda/runtime_api_client.go @@ -22,11 +22,13 @@ const ( headerCognitoIdentity = "Lambda-Runtime-Cognito-Identity" headerClientContext = "Lambda-Runtime-Client-Context" headerInvokedFunctionARN = "Lambda-Runtime-Invoked-Function-Arn" + headerXRayErrorCause = "Lambda-Runtime-Function-Xray-Error-Cause" trailerLambdaErrorType = "Lambda-Runtime-Function-Error-Type" trailerLambdaErrorBody = "Lambda-Runtime-Function-Error-Body" contentTypeJSON = "application/json" contentTypeBytes = "application/octet-stream" apiVersion = "2018-06-01" + xrayErrorCauseMaxSize = 1024 * 1024 ) type runtimeAPIClient struct { @@ -57,7 +59,7 @@ type invoke struct { // - An invoke is not complete until next() is called again! func (i *invoke) success(body io.Reader, contentType string) error { url := i.client.baseURL + i.id + "/response" - return i.client.post(url, body, contentType) + return i.client.post(url, body, contentType, nil) } // failure sends the payload to the Runtime API. This marks the function's invoke as a failure. @@ -65,9 +67,9 @@ func (i *invoke) success(body io.Reader, contentType string) error { // - The execution of the function process continues, and is billed, until next() is called again! // - A Lambda Function continues to be re-used for future invokes even after a failure. // If the error is fatal (panic, unrecoverable state), exit the process immediately after calling failure() -func (i *invoke) failure(body io.Reader, contentType string) error { +func (i *invoke) failure(body io.Reader, contentType string, causeForXRay []byte) error { url := i.client.baseURL + i.id + "/error" - return i.client.post(url, body, contentType) + return i.client.post(url, body, contentType, causeForXRay) } // next connects to the Runtime API and waits for a new invoke Request to be available. @@ -108,7 +110,7 @@ func (c *runtimeAPIClient) next() (*invoke, error) { }, nil } -func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string) error { +func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string, xrayErrorCause []byte) error { b := newErrorCapturingReader(body) req, err := http.NewRequest(http.MethodPost, url, b) if err != nil { @@ -118,6 +120,10 @@ func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string) req.Header.Set("User-Agent", c.userAgent) req.Header.Set("Content-Type", contentType) + if xrayErrorCause != nil && len(xrayErrorCause) < xrayErrorCauseMaxSize { + req.Header.Set(headerXRayErrorCause, string(xrayErrorCause)) + } + resp, err := c.httpClient.Do(req) if err != nil { return fmt.Errorf("failed to POST to %s: %v", url, err) diff --git a/lambda/runtime_api_client_test.go b/lambda/runtime_api_client_test.go index 7ccd47fb..3f41403f 100644 --- a/lambda/runtime_api_client_test.go +++ b/lambda/runtime_api_client_test.go @@ -92,7 +92,7 @@ func TestClientDoneAndError(t *testing.T) { assert.NoError(t, err) }) t.Run(fmt.Sprintf("happy Error with payload[%d]", i), func(t *testing.T) { - err := invoke.failure(bytes.NewReader(payload), contentTypeJSON) + err := invoke.failure(bytes.NewReader(payload), contentTypeJSON, nil) assert.NoError(t, err) }) } @@ -105,7 +105,7 @@ func TestInvalidRequestsForMalformedEndpoint(t *testing.T) { require.Error(t, err) err = (&invoke{client: newRuntimeAPIClient("🚨")}).success(nil, "") require.Error(t, err) - err = (&invoke{client: newRuntimeAPIClient("🚨")}).failure(nil, "") + err = (&invoke{client: newRuntimeAPIClient("🚨")}).failure(nil, "", nil) require.Error(t, err) } @@ -145,7 +145,7 @@ func TestStatusCodes(t *testing.T) { require.NoError(t, err) }) t.Run("failure should not error", func(t *testing.T) { - err := invoke.failure(nil, "") + err := invoke.failure(nil, "", nil) require.NoError(t, err) }) } else { @@ -158,7 +158,7 @@ func TestStatusCodes(t *testing.T) { } }) t.Run("failure should error", func(t *testing.T) { - err := invoke.failure(nil, "") + err := invoke.failure(nil, "", nil) require.Error(t, err) if i != 301 && i != 302 && i != 303 { assert.Contains(t, err.Error(), "unexpected status code") From eae55b8c04c5836edbfc5135e17a758495183ff1 Mon Sep 17 00:00:00 2001 From: Luke Young <91491244+lyoung-confluent@users.noreply.github.com> Date: Mon, 29 Jan 2024 13:15:37 -0800 Subject: [PATCH 2/3] Add S3 Batch Operations 2.0 Schema Fields to S3BatchJob/S3BatchJobTask (#550) * Add S3 Batch Operations 2.0 Schema Fields * Create s3-batch-job-event-request-2.0.json * Update s3_batch_job_test.go * Rename s3-batch-job-event-request.json to s3-batch-job-event-request-1.0.json * Fix tests * Split into S3BatchJobV2 --- events/s3_batch_job.go | 22 +++++++++++++++++++ events/s3_batch_job_test.go | 22 ++++++++++++++++++- ...on => s3-batch-job-event-request-1.0.json} | 2 +- .../s3-batch-job-event-request-2.0.json | 19 ++++++++++++++++ 4 files changed, 63 insertions(+), 2 deletions(-) rename events/testdata/{s3-batch-job-event-request.json => s3-batch-job-event-request-1.0.json} (99%) create mode 100644 events/testdata/s3-batch-job-event-request-2.0.json diff --git a/events/s3_batch_job.go b/events/s3_batch_job.go index f2626edd..183ec907 100644 --- a/events/s3_batch_job.go +++ b/events/s3_batch_job.go @@ -23,6 +23,28 @@ type S3BatchJobTask struct { S3BucketARN string `json:"s3BucketArn"` } +// S3BatchJobEventV2 encapsulates the detail of a s3 batch job +type S3BatchJobEventV2 struct { + InvocationSchemaVersion string `json:"invocationSchemaVersion"` + InvocationID string `json:"invocationId"` + Job S3BatchJobV2 `json:"job"` + Tasks []S3BatchJobTaskV2 `json:"tasks"` +} + +// S3BatchJobV2 whichs have the job id +type S3BatchJobV2 struct { + ID string `json:"id"` + UserArguments map[string]string `json:"userArguments"` +} + +// S3BatchJobTaskV2 represents one task in the s3 batch job and have all task details +type S3BatchJobTaskV2 struct { + TaskID string `json:"taskId"` + S3Key string `json:"s3Key"` + S3VersionID string `json:"s3VersionId"` + S3Bucket string `json:"s3Bucket"` +} + // S3BatchJobResponse is the response of a iven s3 batch job with the results type S3BatchJobResponse struct { InvocationSchemaVersion string `json:"invocationSchemaVersion"` diff --git a/events/s3_batch_job_test.go b/events/s3_batch_job_test.go index 802d38b2..3067c256 100644 --- a/events/s3_batch_job_test.go +++ b/events/s3_batch_job_test.go @@ -13,7 +13,7 @@ import ( func TestS3BatchJobEventMarshaling(t *testing.T) { // 1. read JSON from file - inputJSON := test.ReadJSONFromFile(t, "./testdata/s3-batch-job-event-request.json") + inputJSON := test.ReadJSONFromFile(t, "./testdata/s3-batch-job-event-request-1.0.json") // 2. de-serialize into Go object var inputEvent S3BatchJobEvent @@ -31,6 +31,26 @@ func TestS3BatchJobEventMarshaling(t *testing.T) { assert.JSONEq(t, string(inputJSON), string(outputJSON)) } +func TestS3BatchJobEventV2Marshaling(t *testing.T) { + // 1. read JSON from file + inputJSON := test.ReadJSONFromFile(t, "./testdata/s3-batch-job-event-request-2.0.json") + + // 2. de-serialize into Go object + var inputEvent S3BatchJobEventV2 + if err := json.Unmarshal(inputJSON, &inputEvent); err != nil { + t.Errorf("could not unmarshal event. details: %v", err) + } + + // 3. serialize to JSON + outputJSON, err := json.Marshal(inputEvent) + if err != nil { + t.Errorf("could not marshal event. details: %v", err) + } + + // 4. check result + assert.JSONEq(t, string(inputJSON), string(outputJSON)) +} + func TestS3BatchJobResponseMarshaling(t *testing.T) { // 1. read JSON from file diff --git a/events/testdata/s3-batch-job-event-request.json b/events/testdata/s3-batch-job-event-request-1.0.json similarity index 99% rename from events/testdata/s3-batch-job-event-request.json rename to events/testdata/s3-batch-job-event-request-1.0.json index 210173f4..4c3038ec 100644 --- a/events/testdata/s3-batch-job-event-request.json +++ b/events/testdata/s3-batch-job-event-request-1.0.json @@ -12,4 +12,4 @@ "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:awsexamplebucket" } ] -} \ No newline at end of file +} diff --git a/events/testdata/s3-batch-job-event-request-2.0.json b/events/testdata/s3-batch-job-event-request-2.0.json new file mode 100644 index 00000000..738339cc --- /dev/null +++ b/events/testdata/s3-batch-job-event-request-2.0.json @@ -0,0 +1,19 @@ +{ + "invocationSchemaVersion": "2.0", + "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", + "job": { + "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce", + "userArguments": { + "k1": "v1", + "k2": "v2" + } + }, + "tasks": [ + { + "taskId": "dGFza2lkZ29lc2hlcmUK", + "s3Key": "customerImage1.jpg", + "s3VersionId": "jbo9_jhdPEyB4RrmOxWS0kU0EoNrU_oI", + "s3Bucket": "awsexamplebucket" + } + ] +} From de51f6811a68226f2238b77fd74b84df29c446f4 Mon Sep 17 00:00:00 2001 From: KIDANI Akito Date: Sun, 4 Feb 2024 10:19:36 +0900 Subject: [PATCH 3/3] Fix compile error in sample code for S3ObjectLambdaEvent (#551) --- events/README_S3_Object_Lambda.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/events/README_S3_Object_Lambda.md b/events/README_S3_Object_Lambda.md index 2166cb91..f9ae1a73 100644 --- a/events/README_S3_Object_Lambda.md +++ b/events/README_S3_Object_Lambda.md @@ -21,7 +21,7 @@ import ( ) func handler(ctx context.Context, event events.S3ObjectLambdaEvent) error { - url := event.GetObjectContext.InputS3Url + url := event.GetObjectContext.InputS3URL resp, err := http.Get(url) if err != nil { return err