From 7ba577f18f58c094a212f9a018a4cd078327d200 Mon Sep 17 00:00:00 2001 From: Marco Costa Date: Fri, 2 Aug 2024 15:49:53 -0700 Subject: [PATCH] Add SNS/SQS trace propagation --- Gemfile | 2 + lib/datadog/tracing/contrib.rb | 35 +++++++++++ .../contrib/aws/configuration/settings.rb | 19 +++++- .../tracing/contrib/aws/instrumentation.rb | 18 +++--- .../tracing/contrib/aws/service/base.rb | 21 +++++++ .../tracing/contrib/aws/service/sns.rb | 19 ++++++ .../tracing/contrib/aws/service/sqs.rb | 21 +++++++ .../contrib/aws/instrumentation_spec.rb | 40 +++++++++++- .../tracing/contrib/aws/service/sns_spec.rb | 62 +++++++++++++++++++ 9 files changed, 227 insertions(+), 10 deletions(-) diff --git a/Gemfile b/Gemfile index 28ab18df764..2ef97b7c241 100644 --- a/Gemfile +++ b/Gemfile @@ -93,3 +93,5 @@ end # # TODO: Remove this once the issue is resolved: https://github.com/ffi/ffi/issues/1107 gem 'ffi', '~> 1.16.3', require: false + +gem 'aws-sdk' diff --git a/lib/datadog/tracing/contrib.rb b/lib/datadog/tracing/contrib.rb index 13a15640333..0b80d860dc1 100644 --- a/lib/datadog/tracing/contrib.rb +++ b/lib/datadog/tracing/contrib.rb @@ -3,6 +3,7 @@ require_relative '../tracing' require_relative 'contrib/registry' require_relative 'contrib/extensions' +require_relative 'contrib/component' module Datadog module Tracing @@ -26,6 +27,40 @@ module Contrib # after the tracer has complete initialization. Use `Datadog::Tracing::Contrib::REGISTRY` instead # of `Datadog.registry` when you code might be called during tracer initialization. REGISTRY = Registry.new + + def self.inject(digest, data) + raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation + + @propagation.inject!(digest, data) + end + + def self.extract(data) + raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation + + @propagation.extract(data) + end + + Contrib::Component.register('_contrib') do |config| + tracing = config.tracing + tracing.propagation_style # TODO: do we still need this? + + @propagation = Datadog::Tracing::Distributed::Propagation.new( + propagation_styles: { + Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_MULTI_HEADER => + Tracing::Distributed::B3Multi.new(fetcher: Tracing::Distributed::Fetcher), + Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_SINGLE_HEADER => + Tracing::Distributed::B3Single.new(fetcher: Tracing::Distributed::Fetcher), + Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_DATADOG => + Tracing::Distributed::Datadog.new(fetcher: Tracing::Distributed::Fetcher), + Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_TRACE_CONTEXT => + Tracing::Distributed::TraceContext.new(fetcher: Tracing::Distributed::Fetcher), + Tracing::Configuration::Ext::Distributed::PROPAGATION_STYLE_NONE => Tracing::Distributed::None.new + }, + propagation_style_inject: tracing.propagation_style_inject, + propagation_style_extract: tracing.propagation_style_extract, + propagation_extract_first: tracing.propagation_extract_first + ) + end end end end diff --git a/lib/datadog/tracing/contrib/aws/configuration/settings.rb b/lib/datadog/tracing/contrib/aws/configuration/settings.rb index c6c64bf5d9c..893a25a3a8b 100644 --- a/lib/datadog/tracing/contrib/aws/configuration/settings.rb +++ b/lib/datadog/tracing/contrib/aws/configuration/settings.rb @@ -45,7 +45,24 @@ class Settings < Contrib::Configuration::Settings o.type :string, nilable: true o.env Ext::ENV_PEER_SERVICE end - end + + option :propagation do |o| + o.type :bool + # TODO: Add env var for this + # TODO: Add env var for this + # TODO: Add env var for this + # o.env Ext::ENV_PEER_SERVICE + o.default false + end + + option :batch_propagation do |o| + o.type :bool + # TODO: Add env var for this + # TODO: Add env var for this + # TODO: Add env var for this + # o.env Ext::ENV_PEER_SERVICE + o.default false + end end end end diff --git a/lib/datadog/tracing/contrib/aws/instrumentation.rb b/lib/datadog/tracing/contrib/aws/instrumentation.rb index 7194ef9ae01..b7fa9057488 100644 --- a/lib/datadog/tracing/contrib/aws/instrumentation.rb +++ b/lib/datadog/tracing/contrib/aws/instrumentation.rb @@ -19,9 +19,9 @@ def add_handlers(handlers, _) # Generates Spans for all interactions with AWS class Handler < Seahorse::Client::Handler def call(context) - Tracing.trace(Ext::SPAN_COMMAND) do |span| + Tracing.trace(Ext::SPAN_COMMAND) do |span, trace| @handler.call(context).tap do - annotate!(span, ParsedContext.new(context)) + annotate!(span, trace, ParsedContext.new(context)) end end end @@ -29,8 +29,9 @@ def call(context) private # rubocop:disable Metrics/AbcSize - def annotate!(span, context) - span.service = configuration[:service_name] + def annotate!(span, trace, context) + config = configuration + span.service = config[:service_name] span.type = Tracing::Metadata::Ext::HTTP::TYPE_OUTBOUND span.name = Ext::SPAN_COMMAND span.resource = context.safely(:resource) @@ -38,13 +39,14 @@ def annotate!(span, context) span.set_tag(Ext::TAG_AWS_SERVICE, aws_service) params = context.safely(:params) if (handler = Datadog::Tracing::Contrib::Aws::SERVICE_HANDLERS[aws_service]) + handler.process(config, trace, context) handler.add_tags(span, params) end - if configuration[:peer_service] + if config[:peer_service] span.set_tag( Tracing::Metadata::Ext::TAG_PEER_SERVICE, - configuration[:peer_service] + config[:peer_service] ) end @@ -61,8 +63,8 @@ def annotate!(span, context) span.set_tag(Tracing::Metadata::Ext::TAG_PEER_HOSTNAME, context.safely(:host)) # Set analytics sample rate - if Contrib::Analytics.enabled?(configuration[:analytics_enabled]) - Contrib::Analytics.set_sample_rate(span, configuration[:analytics_sample_rate]) + if Contrib::Analytics.enabled?(config[:analytics_enabled]) + Contrib::Analytics.set_sample_rate(span, config[:analytics_sample_rate]) end Contrib::Analytics.set_measured(span) diff --git a/lib/datadog/tracing/contrib/aws/service/base.rb b/lib/datadog/tracing/contrib/aws/service/base.rb index c6b496c1df7..6f7d5a8fb96 100644 --- a/lib/datadog/tracing/contrib/aws/service/base.rb +++ b/lib/datadog/tracing/contrib/aws/service/base.rb @@ -7,7 +7,28 @@ module Aws module Service # Base class for all AWS service-specific tag handlers. class Base + def process(config, trace, context); end def add_tags(span, params); end + + MESSAGE_ATTRIBUTES_LIMIT = 10 # Can't set more than 10 message attributes + + def extract_propagation(context) + message_attributes = context.params[:message_attributes] + + return unless message_attributes && (datadog = message_attributes['_datadog']) + + Tracing.continue_trace!(Contrib.extract(datadog)) + end + + def inject_propagation(trace, context, data_type) + message_attributes = (context.params[:message_attributes] ||= {}) + return if message_attributes.size >= MESSAGE_ATTRIBUTES_LIMIT + + data = {} + if Datadog::Tracing::Contrib.inject(trace.to_digest, data) + message_attributes['_datadog'] = { :data_type => data_type, :binary_value => data.to_json } + end + end end end end diff --git a/lib/datadog/tracing/contrib/aws/service/sns.rb b/lib/datadog/tracing/contrib/aws/service/sns.rb index 40cb0022344..f9cb0c8f37b 100644 --- a/lib/datadog/tracing/contrib/aws/service/sns.rb +++ b/lib/datadog/tracing/contrib/aws/service/sns.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require 'json' + require_relative './base' require_relative '../ext' @@ -10,6 +12,23 @@ module Aws module Service # SNS tag handlers. class SNS < Base + def process(config, trace, context) + inject_propagation(trace, context, 'Binary') if config[:propagation] + + return unless config[:propagation] + + case context.operation + when :publish + inject_propagation(trace, context, 'Binary') + when :publish_batch + if config[:batch_propagation] + inject_propagation(trace, context, 'Binary') + else + inject_propagation(trace, context, 'Binary') + end + end + end + def add_tags(span, params) topic_arn = params[:topic_arn] topic_name = params[:name] diff --git a/lib/datadog/tracing/contrib/aws/service/sqs.rb b/lib/datadog/tracing/contrib/aws/service/sqs.rb index 2ab6c0cb03f..88723177edf 100644 --- a/lib/datadog/tracing/contrib/aws/service/sqs.rb +++ b/lib/datadog/tracing/contrib/aws/service/sqs.rb @@ -10,6 +10,27 @@ module Aws module Service # SQS tag handlers. class SQS < Base + def before_span(config, context) + if config[:propagation] && context.operation == :receive_message + extract_propagation(context) + end + end + + def process(config, trace, context) + return unless config[:propagation] + + case context.operation + when :send_message + inject_propagation(trace, context, 'String') + when :send_message_batch + if config[:batch_propagation] + inject_propagation(trace, context, 'String') + else + inject_propagation(trace, context, 'String') + end + end + end + def add_tags(span, params) queue_url = params[:queue_url] queue_name = params[:queue_name] diff --git a/spec/datadog/tracing/contrib/aws/instrumentation_spec.rb b/spec/datadog/tracing/contrib/aws/instrumentation_spec.rb index 88e6c7cbee0..e0aa2247610 100644 --- a/spec/datadog/tracing/contrib/aws/instrumentation_spec.rb +++ b/spec/datadog/tracing/contrib/aws/instrumentation_spec.rb @@ -379,6 +379,37 @@ .to eq('sqs.us-stubbed-1.amazonaws.com') end end + + describe '#receive_message' do + subject!(:receive_message) do + client.receive_message( + { + queue_url: 'https://sqs.us-stubbed-1.amazonaws.com/123456789012/MyQueueName', + attribute_names: ["All"], + max_number_of_messages: 1, + visibility_timeout: 1, + wait_time_seconds: 1, + receive_request_attempt_id: "my_receive_request_attempt_1", + } + ) + end + + let(:responses) do + { receive_message: { + messages: [] + } } + end + + it 'generates a span' do + expect(span.name).to eq('aws.command') + expect(span.service).to eq('aws') + expect(span.type).to eq('http') + expect(span.resource).to eq('sqs.receive_message') + + expect(span.get_tag('aws.agent')).to eq('aws-sdk-ruby') + expect(span.get_tag('aws.operation')).to eq('receive_message') + end + end end context 'with an SNS client' do @@ -389,7 +420,14 @@ client.publish( { topic_arn: 'arn:aws:sns:us-west-2:123456789012:my-topic-name', - message: 'Hello, world!' + message: 'Hello, world!', + message_attributes: { + 'String' => { + data_type: 'String', # required + string_value: 'String', + binary_value: 'data', + }, + }, } ) end diff --git a/spec/datadog/tracing/contrib/aws/service/sns_spec.rb b/spec/datadog/tracing/contrib/aws/service/sns_spec.rb index 8b3737b43e6..65289ec6f71 100644 --- a/spec/datadog/tracing/contrib/aws/service/sns_spec.rb +++ b/spec/datadog/tracing/contrib/aws/service/sns_spec.rb @@ -37,4 +37,66 @@ expect(span).to have_received(:set_tag).with(Datadog::Tracing::Contrib::Aws::Ext::TAG_TOPIC_NAME, nil) end end + + shared_examples 'injects attribute propagation' do + subject(:inject_propagation) { service.process(config, trace, context) } + + let(:config) { { propagation: true } } + let(:trace) { Datadog::Tracing::TraceOperation.new(id: trace_id, parent_span_id: span_id) } + let(:context) { instance_double('Context', params: params) } + let(:params) { {} } + let(:trace_id) { 1 } + let(:span_id) { 2 } + + before { Datadog.configure { |c| c.tracing.instrument :aws } } + + context 'without preexisting message attributes' do + it 'adds a propagation attribute' do + inject_propagation + expect(params[:message_attributes]).to eq( + '_datadog' => { + binary_value: + '{"x-datadog-trace-id":"1","x-datadog-parent-id":"2",' \ + '"traceparent":"00-00000000000000000000000000000001-0000000000000002-00",' \ + '"tracestate":"dd=p:0000000000000002"}', + data_type: 'Binary' + } + ) + end + end + + context 'with existing message attributes' do + let(:params) { { message_attributes: message_attributes } } + let(:message_attributes) { { 'existing' => { data_type: 'String', string_value: 'value' } } } + + it 'adds a propagation attribute' do + expect { inject_propagation }.to change { message_attributes.keys }.from(['existing']).to(['existing', '_datadog']) + end + end + + context 'with 10 message attributes already set' do + let(:params) { { message_attributes: message_attributes } } + let(:message_attributes) do + Array.new(10) do |i| + ["attr#{i}", { data_type: 'Number', string_value: i }] + end.to_h + end + + it 'does not add a propagation attribute' do + expect { inject_propagation }.to_not(change { params }) + end + end + + context 'disabled' do + let(:config) { { propagation: false } } + + it 'does not add a propagation attribute' do + expect { inject_propagation }.to_not(change { params }) + end + end + end + + it_behaves_like 'injects attribute propagation' do + let(:service) { sns } + end end