diff --git a/modelwriter.go b/modelwriter.go index 2d7d2825b..cef59c0eb 100644 --- a/modelwriter.go +++ b/modelwriter.go @@ -48,7 +48,7 @@ type modelWriter struct { // writeTransaction encodes tx as JSON to the buffer, and then resets tx. func (w *modelWriter) writeTransaction(tx *Transaction, td *TransactionData) { var modelTx model.Transaction - w.buildModelTransaction(&modelTx, tx, td) + BuildModelTransaction(&modelTx, tx, td) w.json.RawString(`{"transaction":`) modelTx.MarshalFastJSON(&w.json) w.json.RawByte('}') @@ -103,39 +103,6 @@ func (w *modelWriter) writeMetrics(m *Metrics) { m.reset() } -func (w *modelWriter) buildModelTransaction(out *model.Transaction, tx *Transaction, td *TransactionData) { - out.ID = model.SpanID(tx.traceContext.Span) - out.TraceID = model.TraceID(tx.traceContext.Trace) - sampled := tx.traceContext.Options.Recorded() - if !sampled { - out.Sampled = ¬Sampled - } - if tx.traceContext.State.haveSampleRate { - out.SampleRate = &tx.traceContext.State.sampleRate - } - - out.ParentID = model.SpanID(tx.parentID) - out.Name = truncateString(td.Name) - out.Type = truncateString(td.Type) - out.Result = truncateString(td.Result) - out.Outcome = normalizeOutcome(td.Outcome) - out.Timestamp = model.Time(td.timestamp.UTC()) - out.Duration = td.Duration.Seconds() * 1000 - out.SpanCount.Started = td.spansCreated - out.SpanCount.Dropped = td.spansDropped - out.OTel = td.Context.otel - for _, sl := range td.links { - out.Links = append(out.Links, model.SpanLink{TraceID: model.TraceID(sl.Trace), SpanID: model.SpanID(sl.Span)}) - } - if dss := buildDroppedSpansStats(td.droppedSpansStats); len(dss) > 0 { - out.DroppedSpansStats = dss - } - - if sampled { - out.Context = td.Context.build() - } -} - func (w *modelWriter) buildModelSpan(out *model.Span, span *Span, sd *SpanData) { w.modelStacktrace = w.modelStacktrace[:0] out.ID = model.SpanID(span.traceContext.Span) @@ -258,6 +225,40 @@ func (w *modelWriter) buildModelError(out *model.Error, e *ErrorData) { out.Culprit = truncateString(out.Culprit) } +// BuildModelTransaction converts apm transaction to model transaction +func BuildModelTransaction(out *model.Transaction, tx *Transaction, td *TransactionData) { + out.ID = model.SpanID(tx.traceContext.Span) + out.TraceID = model.TraceID(tx.traceContext.Trace) + sampled := tx.traceContext.Options.Recorded() + if !sampled { + out.Sampled = ¬Sampled + } + if tx.traceContext.State.haveSampleRate { + out.SampleRate = &tx.traceContext.State.sampleRate + } + + out.ParentID = model.SpanID(tx.parentID) + out.Name = truncateString(td.Name) + out.Type = truncateString(td.Type) + out.Result = truncateString(td.Result) + out.Outcome = normalizeOutcome(td.Outcome) + out.Timestamp = model.Time(td.timestamp.UTC()) + out.Duration = td.Duration.Seconds() * 1000 + out.SpanCount.Started = td.spansCreated + out.SpanCount.Dropped = td.spansDropped + out.OTel = td.Context.otel + for _, sl := range td.links { + out.Links = append(out.Links, model.SpanLink{TraceID: model.TraceID(sl.Trace), SpanID: model.SpanID(sl.Span)}) + } + if dss := buildDroppedSpansStats(td.droppedSpansStats); len(dss) > 0 { + out.DroppedSpansStats = dss + } + + if sampled { + out.Context = td.Context.build() + } +} + func stacktraceCulprit(frames []model.StacktraceFrame) string { for _, frame := range frames { if !frame.LibraryFrame { diff --git a/module/apmlambda/go.mod b/module/apmlambda/go.mod index d149c0b7a..1cb8ca81a 100644 --- a/module/apmlambda/go.mod +++ b/module/apmlambda/go.mod @@ -3,6 +3,7 @@ module go.elastic.co/apm/module/apmlambda/v2 require ( github.com/aws/aws-lambda-go v1.8.0 go.elastic.co/apm/v2 v2.2.0 + go.elastic.co/fastjson v1.1.0 ) replace go.elastic.co/apm/v2 => ../.. diff --git a/module/apmlambda/lambda.go b/module/apmlambda/lambda.go index 449a84640..7863d772c 100644 --- a/module/apmlambda/lambda.go +++ b/module/apmlambda/lambda.go @@ -18,8 +18,10 @@ package apmlambda // import "go.elastic.co/apm/module/apmlambda/v2" import ( + "bytes" "log" "net" + "net/http" "net/rpc" "os" "unicode/utf8" @@ -28,7 +30,9 @@ import ( "github.com/aws/aws-lambda-go/lambdacontext" "go.elastic.co/apm/v2" + "go.elastic.co/apm/v2/model" "go.elastic.co/apm/v2/stacktrace" + "go.elastic.co/fastjson" ) const ( @@ -51,6 +55,10 @@ var ( Request string `json:"request,omitempty"` Response string `json:"response,omitempty"` } + + jsonw fastjson.Writer + + ignoreTxnRegistration bool ) func init() { @@ -72,6 +80,17 @@ func (f *Function) Ping(req *messages.PingRequest, response *messages.PingRespon return f.client.Call("Function.Ping", req, response) } +func createPartialTransactionJSON(apmTx *apm.Transaction, w *fastjson.Writer) error { + var tx model.Transaction + apm.BuildModelTransaction(&tx, apmTx, apmTx.TransactionData) + w.RawString(`{"transaction":`) + if err := tx.MarshalFastJSON(w); err != nil { + return err + } + w.RawByte('}') + return nil +} + // Invoke invokes the Lambda function. This is our main trace point. func (f *Function) Invoke(req *messages.InvokeRequest, response *messages.InvokeResponse) error { tx := f.tracer.StartTransaction(lambdacontext.FunctionName, "function") @@ -92,6 +111,30 @@ func (f *Function) Invoke(req *messages.InvokeRequest, response *messages.Invoke lambdaContext.Request = formatPayload(req.Payload) lambdaContext.Response = "" + if !ignoreTxnRegistration { + defer jsonw.Reset() + if err := createPartialTransactionJSON(tx, &jsonw); err != nil { + log.Printf("failed to create partial transaction for registration: %v", err) + } else { + resp, err := http.Post( + // TODO: @lahsivjar better way to get base URI + "http://localhost:8200/register/transaction", + "application/vnd.elastic.apm.transaction+json", + bytes.NewReader(jsonw.Bytes()), + ) + // Don't attempt registration for next invocations if network + // error or the registration endpoint is not found. + if err != nil || resp.StatusCode == 404 { + ignoreTxnRegistration = true + } + if err != nil { + log.Printf("failed to register transaction, req failed with error: %v", err) + } + if resp.StatusCode/100 != 2 { + log.Printf("failed to register transaction, req failed with status code: %d", resp.StatusCode) + } + } + } err := f.client.Call("Function.Invoke", req, response) if err != nil { e := f.tracer.NewError(err) diff --git a/module/apmlambda/lambda_test.go b/module/apmlambda/lambda_test.go new file mode 100644 index 000000000..f25bf3158 --- /dev/null +++ b/module/apmlambda/lambda_test.go @@ -0,0 +1,36 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package apmlambda + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "go.elastic.co/apm/v2" + "go.elastic.co/fastjson" +) + +func TestCreatePartialTransactionJSON(t *testing.T) { + var w fastjson.Writer + tx := apm.DefaultTracer().StartTransaction("test", "function") + defer tx.End() + assert.NoError(t, createPartialTransactionJSON(tx, &w)) + assert.True(t, json.Valid(w.Bytes())) + assert.Equal(t, "test", string(w.Bytes())) +}