Skip to content

Commit

Permalink
Add SNS/SQS trace propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
marcotc committed Aug 2, 2024
1 parent acd9271 commit 7ba577f
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 10 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,5 @@ end
#
# TODO: Remove this once the issue is resolved: https://github.com/ffi/ffi/issues/1107
gem 'ffi', '~> 1.16.3', require: false

gem 'aws-sdk'
35 changes: 35 additions & 0 deletions lib/datadog/tracing/contrib.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require_relative '../tracing'
require_relative 'contrib/registry'
require_relative 'contrib/extensions'
require_relative 'contrib/component'

module Datadog
module Tracing
Expand All @@ -26,6 +27,40 @@ module Contrib
# after the tracer has complete initialization. Use `Datadog::Tracing::Contrib::REGISTRY` instead
# of `Datadog.registry` when you code might be called during tracer initialization.
REGISTRY = Registry.new

def self.inject(digest, data)
raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation

@propagation.inject!(digest, data)
end

def self.extract(data)
raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation

@propagation.extract(data)
end

Contrib::Component.register('_contrib') do |config|
tracing = config.tracing
tracing.propagation_style # TODO: do we still need this?

@propagation = Datadog::Tracing::Distributed::Propagation.new(
propagation_styles: {
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_MULTI_HEADER =>
Tracing::Distributed::B3Multi.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_SINGLE_HEADER =>
Tracing::Distributed::B3Single.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_DATADOG =>
Tracing::Distributed::Datadog.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_TRACE_CONTEXT =>
Tracing::Distributed::TraceContext.new(fetcher: Tracing::Distributed::Fetcher),
Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_NONE => Tracing::Distributed::None.new
},
propagation_style_inject: tracing.propagation_style_inject,
propagation_style_extract: tracing.propagation_style_extract,
propagation_extract_first: tracing.propagation_extract_first
)
end
end
end
end
Expand Down
19 changes: 18 additions & 1 deletion lib/datadog/tracing/contrib/aws/configuration/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,24 @@ class Settings < Contrib::Configuration::Settings
o.type :string, nilable: true
o.env Ext::ENV_PEER_SERVICE
end
end

option :propagation do |o|
o.type :bool
# TODO: Add env var for this
# TODO: Add env var for this
# TODO: Add env var for this
# o.env Ext::ENV_PEER_SERVICE
o.default false
end

option :batch_propagation do |o|
o.type :bool
# TODO: Add env var for this
# TODO: Add env var for this
# TODO: Add env var for this
# o.env Ext::ENV_PEER_SERVICE
o.default false
end
end
end
end
Expand Down
18 changes: 10 additions & 8 deletions lib/datadog/tracing/contrib/aws/instrumentation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,34 @@ def add_handlers(handlers, _)
# Generates Spans for all interactions with AWS
class Handler < Seahorse::Client::Handler
def call(context)
Tracing.trace(Ext::SPAN_COMMAND) do |span|
Tracing.trace(Ext::SPAN_COMMAND) do |span, trace|
@handler.call(context).tap do
annotate!(span, ParsedContext.new(context))
annotate!(span, trace, ParsedContext.new(context))
end
end
end

private

# rubocop:disable Metrics/AbcSize
def annotate!(span, context)
span.service = configuration[:service_name]
def annotate!(span, trace, context)
config = configuration
span.service = config[:service_name]
span.type = Tracing::Metadata::Ext::HTTP::TYPE_OUTBOUND
span.name = Ext::SPAN_COMMAND
span.resource = context.safely(:resource)
aws_service = span.resource.split('.')[0]
span.set_tag(Ext::TAG_AWS_SERVICE, aws_service)
params = context.safely(:params)
if (handler = Datadog::Tracing::Contrib::Aws::SERVICE_HANDLERS[aws_service])
handler.process(config, trace, context)
handler.add_tags(span, params)
end

if configuration[:peer_service]
if config[:peer_service]
span.set_tag(
Tracing::Metadata::Ext::TAG_PEER_SERVICE,
configuration[:peer_service]
config[:peer_service]
)
end

Expand All @@ -61,8 +63,8 @@ def annotate!(span, context)
span.set_tag(Tracing::Metadata::Ext::TAG_PEER_HOSTNAME, context.safely(:host))

# Set analytics sample rate
if Contrib::Analytics.enabled?(configuration[:analytics_enabled])
Contrib::Analytics.set_sample_rate(span, configuration[:analytics_sample_rate])
if Contrib::Analytics.enabled?(config[:analytics_enabled])
Contrib::Analytics.set_sample_rate(span, config[:analytics_sample_rate])
end
Contrib::Analytics.set_measured(span)

Expand Down
21 changes: 21 additions & 0 deletions lib/datadog/tracing/contrib/aws/service/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,28 @@ module Aws
module Service
# Base class for all AWS service-specific tag handlers.
class Base
def process(config, trace, context); end
def add_tags(span, params); end

MESSAGE_ATTRIBUTES_LIMIT = 10 # Can't set more than 10 message attributes

def extract_propagation(context)
message_attributes = context.params[:message_attributes]

return unless message_attributes && (datadog = message_attributes['_datadog'])

Tracing.continue_trace!(Contrib.extract(datadog))
end

def inject_propagation(trace, context, data_type)
message_attributes = (context.params[:message_attributes] ||= {})
return if message_attributes.size >= MESSAGE_ATTRIBUTES_LIMIT

data = {}
if Datadog::Tracing::Contrib.inject(trace.to_digest, data)
message_attributes['_datadog'] = { :data_type => data_type, :binary_value => data.to_json }
end
end
end
end
end
Expand Down
19 changes: 19 additions & 0 deletions lib/datadog/tracing/contrib/aws/service/sns.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require 'json'

require_relative './base'
require_relative '../ext'

Expand All @@ -10,6 +12,23 @@ module Aws
module Service
# SNS tag handlers.
class SNS < Base
def process(config, trace, context)
inject_propagation(trace, context, 'Binary') if config[:propagation]

return unless config[:propagation]

case context.operation
when :publish
inject_propagation(trace, context, 'Binary')
when :publish_batch
if config[:batch_propagation]
inject_propagation(trace, context, 'Binary')
else
inject_propagation(trace, context, 'Binary')
end
end
end

def add_tags(span, params)
topic_arn = params[:topic_arn]
topic_name = params[:name]
Expand Down
21 changes: 21 additions & 0 deletions lib/datadog/tracing/contrib/aws/service/sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,27 @@ module Aws
module Service
# SQS tag handlers.
class SQS < Base
def before_span(config, context)
if config[:propagation] && context.operation == :receive_message
extract_propagation(context)
end
end

def process(config, trace, context)
return unless config[:propagation]

case context.operation
when :send_message
inject_propagation(trace, context, 'String')
when :send_message_batch
if config[:batch_propagation]
inject_propagation(trace, context, 'String')
else
inject_propagation(trace, context, 'String')
end
end
end

def add_tags(span, params)
queue_url = params[:queue_url]
queue_name = params[:queue_name]
Expand Down
40 changes: 39 additions & 1 deletion spec/datadog/tracing/contrib/aws/instrumentation_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,37 @@
.to eq('sqs.us-stubbed-1.amazonaws.com')
end
end

describe '#receive_message' do
subject!(:receive_message) do
client.receive_message(
{
queue_url: 'https://sqs.us-stubbed-1.amazonaws.com/123456789012/MyQueueName',
attribute_names: ["All"],
max_number_of_messages: 1,
visibility_timeout: 1,
wait_time_seconds: 1,
receive_request_attempt_id: "my_receive_request_attempt_1",
}
)
end

let(:responses) do
{ receive_message: {
messages: []
} }
end

it 'generates a span' do
expect(span.name).to eq('aws.command')
expect(span.service).to eq('aws')
expect(span.type).to eq('http')
expect(span.resource).to eq('sqs.receive_message')

expect(span.get_tag('aws.agent')).to eq('aws-sdk-ruby')
expect(span.get_tag('aws.operation')).to eq('receive_message')
end
end
end

context 'with an SNS client' do
Expand All @@ -389,7 +420,14 @@
client.publish(
{
topic_arn: 'arn:aws:sns:us-west-2:123456789012:my-topic-name',
message: 'Hello, world!'
message: 'Hello, world!',
message_attributes: {
'String' => {
data_type: 'String', # required
string_value: 'String',
binary_value: 'data',
},
},
}
)
end
Expand Down
62 changes: 62 additions & 0 deletions spec/datadog/tracing/contrib/aws/service/sns_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,66 @@
expect(span).to have_received(:set_tag).with(Datadog::Tracing::Contrib::Aws::Ext::TAG_TOPIC_NAME, nil)
end
end

shared_examples 'injects attribute propagation' do
subject(:inject_propagation) { service.process(config, trace, context) }

let(:config) { { propagation: true } }
let(:trace) { Datadog::Tracing::TraceOperation.new(id: trace_id, parent_span_id: span_id) }
let(:context) { instance_double('Context', params: params) }
let(:params) { {} }
let(:trace_id) { 1 }
let(:span_id) { 2 }

before { Datadog.configure { |c| c.tracing.instrument :aws } }

context 'without preexisting message attributes' do
it 'adds a propagation attribute' do
inject_propagation
expect(params[:message_attributes]).to eq(
'_datadog' => {
binary_value:
'{"x-datadog-trace-id":"1","x-datadog-parent-id":"2",' \
'"traceparent":"00-00000000000000000000000000000001-0000000000000002-00",' \
'"tracestate":"dd=p:0000000000000002"}',
data_type: 'Binary'
}
)
end
end

context 'with existing message attributes' do
let(:params) { { message_attributes: message_attributes } }
let(:message_attributes) { { 'existing' => { data_type: 'String', string_value: 'value' } } }

it 'adds a propagation attribute' do
expect { inject_propagation }.to change { message_attributes.keys }.from(['existing']).to(['existing', '_datadog'])
end
end

context 'with 10 message attributes already set' do
let(:params) { { message_attributes: message_attributes } }
let(:message_attributes) do
Array.new(10) do |i|
["attr#{i}", { data_type: 'Number', string_value: i }]
end.to_h
end

it 'does not add a propagation attribute' do
expect { inject_propagation }.to_not(change { params })
end
end

context 'disabled' do
let(:config) { { propagation: false } }

it 'does not add a propagation attribute' do
expect { inject_propagation }.to_not(change { params })
end
end
end

it_behaves_like 'injects attribute propagation' do
let(:service) { sns }
end
end

0 comments on commit 7ba577f

Please sign in to comment.