Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect messaging.{system,operation} in TranslateTransaction #204

Merged
merged 3 commits into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions input/elasticapm/internal/modeldecoder/v2/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,24 @@ func TestDecodeMapToTransactionModel(t *testing.T) {
}, event.Transaction.Message)
})

t.Run("messaging_without_destination", func(t *testing.T) {
var input transaction
var event modelpb.APMEvent
modeldecodertest.SetStructValues(&input, modeldecodertest.DefaultValues())
input.Type.Reset()
attrs := map[string]interface{}{
"messaging.system": "kafka",
"messaging.operation": "publish",
}
input.OTel.Attributes = attrs
input.OTel.SpanKind.Reset()

mapToTransactionModel(&input, &event)
assert.Equal(t, "messaging", event.Transaction.Type)
assert.Equal(t, "CONSUMER", event.Span.Kind)
assert.Nil(t, event.Transaction.Message)
})

t.Run("network", func(t *testing.T) {
attrs := map[string]interface{}{
"network.connection.type": "cell",
Expand Down
17 changes: 14 additions & 3 deletions input/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func TranslateTransaction(
)

var isHTTP, isRPC, isMessaging bool
var message modelpb.Message
var messagingQueueName string

var samplerType, samplerParam pcommon.Value
attributes.Range(func(kDots string, v pcommon.Value) bool {
Expand Down Expand Up @@ -411,8 +411,14 @@ func TranslateTransaction(

// messaging.*
case "message_bus.destination", semconv.AttributeMessagingDestination:
message.QueueName = stringval
isMessaging = true
messagingQueueName = stringval
case semconv.AttributeMessagingSystem:
isMessaging = true
modelpb.Labels(event.Labels).Set(k, stringval)
case semconv.AttributeMessagingOperation:
isMessaging = true
modelpb.Labels(event.Labels).Set(k, stringval)

// rpc.*
//
Expand Down Expand Up @@ -495,7 +501,12 @@ func TranslateTransaction(
event.Url = modelpb.ParseURL(httpURL, httpHost, httpScheme)
}
if isMessaging {
event.Transaction.Message = &message
// Overwrite existing event.Transaction.Message
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this code back here. It is important to overwrite existing event.Transaction.Message to avoid breaking otel bridge test when event.Transaction.Message and the otel attribute exist at the same time.

=== RUN   TestDecodeMapToTransactionModel/otel-bridge/messaging
    transaction_test.go:559: 
                Error Trace:    /home/carson/projects/apm-data/input/elasticapm/internal/modeldecoder/v2/transaction_test.go:559
                Error:          Not equal: 
                                expected: &modelpb.Message{state:impl.MessageState{NoUnkeyedLiterals:pragma.NoUnkeyedLiterals{}, DoNotCompare:pragma.DoNotCompare{}, DoNotCopy:pragma.DoNotCopy{}, atomicMessageInfo:(*impl.MessageInfo)(nil)}, sizeCache:0, unknownFields:[]uint8(nil), Body:"", Headers:[]*modelpb.HTTPHeader(nil), AgeMillis:(*uint64)(nil), QueueName:"myQueue", RoutingKey:""}
                                actual  : &modelpb.Message{state:impl.MessageState{NoUnkeyedLiterals:pragma.NoUnkeyedLiterals{}, DoNotCompare:pragma.DoNotCompare{}, DoNotCopy:pragma.DoNotCopy{}, atomicMessageInfo:(*impl.MessageInfo)(nil)}, sizeCache:0, unknownFields:[]uint8(nil), Body:"init", Headers:[]*modelpb.HTTPHeader{(*modelpb.HTTPHeader)(0xc00040aa50)}, AgeMillis:(*uint64)(0xc000327058), QueueName:"myQueue", RoutingKey:"init"}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how the message instance would already be non-nil at this point for otel data, so it should be fine.

event.Transaction.Message = nil
if messagingQueueName != "" {
event.Transaction.Message = modelpb.MessageFromVTPool()
event.Transaction.Message.QueueName = messagingQueueName
}
}

if event.Client == nil && event.Source != nil {
Expand Down
64 changes: 50 additions & 14 deletions input/otlp/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,20 +609,56 @@ func TestRPCSpan(t *testing.T) {
}

func TestMessagingTransaction(t *testing.T) {
event := transformTransactionWithAttributes(t, map[string]interface{}{
"messaging.destination": "myQueue",
}, func(s ptrace.Span) {
s.SetKind(ptrace.SpanKindConsumer)
// Set parentID to imply this isn't the root, but
// kind==Consumer should still force the span to be translated
// as a transaction.
s.SetParentSpanID(pcommon.SpanID{3})
})
assert.Equal(t, "messaging", event.Transaction.Type)
assert.Empty(t, event.Labels)
assert.Equal(t, &modelpb.Message{
QueueName: "myQueue",
}, event.Transaction.Message)
for _, tc := range []struct {
attrs map[string]interface{}

expectedLabels map[string]*modelpb.LabelValue
expectedTxnMessage *modelpb.Message
}{
{
attrs: map[string]interface{}{
"messaging.destination": "myQueue",
},
expectedLabels: nil,
expectedTxnMessage: &modelpb.Message{
QueueName: "myQueue",
},
},
{
attrs: map[string]interface{}{
"messaging.system": "kafka",
},
expectedLabels: map[string]*modelpb.LabelValue{
"messaging_system": {Value: "kafka"},
},
expectedTxnMessage: nil,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[For reviewer] As demonstrated by the test case, this is a new case that doesn't exist before. When messaging.system or messaging.operation is present AND messaging.destination is absent, do we want event.Transaction.Message to be empty or nil? Does it make a difference?

To rephrase the question, is it ok to have event.Transaction.Message == nil when event.Transaction.Type == "messaging"?

Copy link
Contributor

@simitt simitt Jan 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; this PR fixes the bug that the transaction type is not set although messaging attributes are sent.
However, the apm message model is pretty outdated compared to the otel messaging spec. So my suggestion would be to open a follow up github issue for updating the supported semconv version and then also update the messaging model definition. I'd see it outside the scope of this PR though.

},
{
attrs: map[string]interface{}{
"messaging.operation": "publish",
},
expectedLabels: map[string]*modelpb.LabelValue{
"messaging_operation": {Value: "publish"},
},
expectedTxnMessage: nil,
},
} {
tcName, err := json.Marshal(tc.attrs)
require.NoError(t, err)
t.Run(string(tcName), func(t *testing.T) {
event := transformTransactionWithAttributes(t, tc.attrs, func(s ptrace.Span) {
s.SetKind(ptrace.SpanKindConsumer)
// Set parentID to imply this isn't the root, but
// kind==Consumer should still force the span to be translated
// as a transaction.
s.SetParentSpanID(pcommon.SpanID{3})
})
assert.Equal(t, "messaging", event.Transaction.Type)
assert.Equal(t, tc.expectedLabels, event.Labels)
assert.Equal(t, tc.expectedTxnMessage, event.Transaction.Message)
})
}

}

func TestMessagingSpan(t *testing.T) {
Expand Down
Loading