Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SNS/SQS trace propagation #3842

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/GettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ require 'aws-sdk'
require 'datadog'

Datadog.configure do |c|
c.tracing.instrument :aws, **options
c.tracing.instrument :aws, propagation: true, **options
end

# Perform traced call
Expand All @@ -562,6 +562,8 @@ Aws::S3::Client.new.list_buckets
| `enabled` | `DD_TRACE_AWS_ENABLED` | `Bool` | Whether the integration should create spans. | `true` |
| `service_name` | `DD_TRACE_AWS_SERVICE_NAME` | `String` | Name of application running the `aws` instrumentation. May be overridden by `global_default_service_name`. [See _Additional Configuration_ for more details](#additional-configuration) | `aws` |
| `peer_service` | `DD_TRACE_AWS_PEER_SERVICE` | `String` | Name of external service the application connects to | `nil` |
| `propagation` | `DD_TRACE_AWS_PROPAGATION_ENABLED` | `Bool` | Enables distributed trace propagation for SNS and SQS messages. | `false` |
| `parentage_style` | `DD_TRACE_AWS_TRACE_PARENTAGE_STYLE` | `String` | Controls whether the local trace is parented to the SQS message consumed. Possible values are: `local`, `distributed`. This option is always disable (the equivalent to `local`) if `propagation` is disabled. | `propagation` |

### Concurrent Ruby

Expand Down
1 change: 1 addition & 0 deletions lib/datadog/tracing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require_relative 'core'
require_relative 'tracing/pipeline'
require_relative 'tracing/distributed'

module Datadog
# Datadog APM tracing public API.
Expand Down
23 changes: 23 additions & 0 deletions lib/datadog/tracing/contrib/aws/configuration/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,29 @@ class Settings < Contrib::Configuration::Settings
o.type :string, nilable: true
o.env Ext::ENV_PEER_SERVICE
end

# Enables distributed trace propagation for SNS and SQS messages.
# @default `DD_TRACE_AWS_PROPAGATION_ENABLED` environment variable, otherwise `false`
# @return [Boolean]
option :propagation do |o|
o.type :bool
o.env Ext::ENV_PROPAGATION_ENABLED
o.default false
end

# Controls whether the local trace is parented to the SQS message consumed.
# Possible values are:
# `local`: The local active trace is used; SNS has no effect on trace parentage.
# `distributed`: The local active trace becomes a child of the propagation context from the SQS message.
#
# This option is always disable (the equivalent to`local`) if `propagation` is disabled.
# @default `DD_TRACE_AWS_TRACE_PARENTAGE_STYLE` environment variable, otherwise `local`
# @return [String]
option :parentage_style do |o|
o.type :string
o.env Ext::ENV_TRACE_PARENTAGE_STYLE
o.default 'distributed'
end
end
end
end
Expand Down
3 changes: 3 additions & 0 deletions lib/datadog/tracing/contrib/aws/ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ module Ext
# @!visibility private
ENV_ANALYTICS_ENABLED = 'DD_TRACE_AWS_ANALYTICS_ENABLED'
ENV_ANALYTICS_SAMPLE_RATE = 'DD_TRACE_AWS_ANALYTICS_SAMPLE_RATE'
ENV_PROPAGATION_ENABLED = 'DD_TRACE_AWS_PROPAGATION_ENABLED'
ENV_TRACE_PARENTAGE_STYLE = 'DD_TRACE_AWS_TRACE_PARENTAGE_STYLE'

DEFAULT_PEER_SERVICE_NAME = 'aws'
SPAN_COMMAND = 'aws.command'
TAG_AGENT = 'aws.agent'
Expand Down
48 changes: 35 additions & 13 deletions lib/datadog/tracing/contrib/aws/instrumentation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,56 @@ def add_handlers(handlers, _)

# Generates Spans for all interactions with AWS
class Handler < Seahorse::Client::Handler
# Some services contain trace propagation information (e.g. SQS) that affect what active trace
# we'll use for the AWS span.
# But because this information is only available after the request is made, we need to make the AWS
# request first, then create the trace and span with correct distributed trace parenting.
def call(context)
Tracing.trace(Ext::SPAN_COMMAND) do |span|
@handler.call(context).tap do
annotate!(span, ParsedContext.new(context))
end
config = configuration

# Find the AWS service instrumentation
parsed_context = ParsedContext.new(context)
aws_service = parsed_context.safely(:resource).split('.')[0]
handler = Datadog::Tracing::Contrib::Aws::SERVICE_HANDLERS[aws_service]

# Execute handler stack, to ensure we have the response object before the trace and span are created
start_time = Core::Utils::Time.now.utc # Save the start time as the span creation is delayed
begin
response = @handler.call(context)
rescue Exception => e # rubocop:disable Lint/RescueException
# Catch exception to reraise it inside the trace block, to ensure the span has correct error information
# This matches the behavior of {Datadog::Tracing::SpanOperation#measure}
end

Tracing.trace(Ext::SPAN_COMMAND, start_time: start_time) do |span, trace|
handler.before_span(config, context, response) if handler

annotate!(config, span, trace, parsed_context, aws_service)

raise e if e
end

response
end

private

# rubocop:disable Metrics/AbcSize
def annotate!(span, context)
span.service = configuration[:service_name]
def annotate!(config, span, trace, context, aws_service)
span.service = config[:service_name]
span.type = Tracing::Metadata::Ext::HTTP::TYPE_OUTBOUND
span.name = Ext::SPAN_COMMAND
span.resource = context.safely(:resource)
aws_service = span.resource.split('.')[0]
span.set_tag(Ext::TAG_AWS_SERVICE, aws_service)
params = context.safely(:params)
if (handler = Datadog::Tracing::Contrib::Aws::SERVICE_HANDLERS[aws_service])
handler.process(config, trace, context)
handler.add_tags(span, params)
end

if configuration[:peer_service]
if config[:peer_service]
span.set_tag(
Tracing::Metadata::Ext::TAG_PEER_SERVICE,
configuration[:peer_service]
config[:peer_service]
)
end

Expand All @@ -61,8 +84,8 @@ def annotate!(span, context)
span.set_tag(Tracing::Metadata::Ext::TAG_PEER_HOSTNAME, context.safely(:host))

# Set analytics sample rate
if Contrib::Analytics.enabled?(configuration[:analytics_enabled])
Contrib::Analytics.set_sample_rate(span, configuration[:analytics_sample_rate])
if Contrib::Analytics.enabled?(config[:analytics_enabled])
Contrib::Analytics.set_sample_rate(span, config[:analytics_sample_rate])
end
Contrib::Analytics.set_measured(span)

Expand All @@ -77,7 +100,6 @@ def annotate!(span, context)

Contrib::SpanAttributeSchema.set_peer_service!(span, Ext::PEER_SERVICE_SOURCES)
end
# rubocop:enable Metrics/AbcSize

def configuration
Datadog.configuration.tracing[:aws]
Expand Down
31 changes: 31 additions & 0 deletions lib/datadog/tracing/contrib/aws/service/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,38 @@ module Aws
module Service
# Base class for all AWS service-specific tag handlers.
class Base
def before_span(config, context, response); end
def process(config, trace, context); end
def add_tags(span, params); end

MESSAGE_ATTRIBUTES_LIMIT = 10 # Can't set more than 10 message attributes

# Extract the `_datadog` message attribute and decode its JSON content.
def extract_propagation!(response, data_type)
messages = response.data.messages

# DEV: Extract the context from the first message today.
# DEV: Use span links in the future to support multiple messages related to a single span.
return unless (message = messages[0])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Quality Violation

Suggested change
return unless (message = messages[0])
return unless (message = messages.first)
Improve readability with first (...read more)

This rule encourages the use of first and last methods over array indexing to access the first and last elements of an array, respectively. The primary reason behind this rule is to improve code readability. Using first and last makes it immediately clear that you are accessing the first or last element of the array, which might not be immediately obvious with array indexing, especially for developers who are new to Ruby.

The use of these methods also helps to make your code more idiomatic, which is a crucial aspect of writing effective Ruby code. Idiomatic code is easier to read, understand, and maintain. It also tends to be more efficient, as idioms often reflect patterns that are optimized for the language.

To adhere to this rule, replace the use of array indexing with first or last methods when you want to access the first and last elements of an array. For instance, instead of arr[0] use arr.first and instead of arr[-1] use arr.last. However, note that this rule should be applied only when reading values. When modifying the first or last elements, array indexing should still be used. For example, arr[0] = 'new_value' and arr[-1] = 'new_value'.

View in Datadog  Leave us feedback  Documentation


message_attributes = message.message_attributes

return unless message_attributes && (datadog = message_attributes['_datadog'])

if (data = datadog[data_type]) && (parsed_data = JSON.parse(data))
Tracing.continue_trace!(Distributed.extract(parsed_data))
end
end

def inject_propagation(trace, params, data_type)
message_attributes = (params[:message_attributes] ||= {})
return if message_attributes.size >= MESSAGE_ATTRIBUTES_LIMIT

data = {}
if Distributed.inject(trace.to_digest, data)
message_attributes['_datadog'] = { :data_type => data_type, :binary_value => data.to_json }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Quality Violation

Use newer syntax for a hash with symbols as keys (...read more)

The rule "Use new syntax when keys are symbols" is a coding standard in modern Ruby development. It encourages the use of the new hash syntax, introduced in Ruby 1.9, where symbols are used as keys. The old hash rocket syntax (:symbol => value) is replaced with the more elegant and succinct symbol: value syntax.

This rule is important as it promotes a cleaner, more readable code. The new syntax is more concise and less cluttered, making it easier to understand the structure and purpose of the hash. This is particularly beneficial in large codebases or when hashes are nested or complex.

To adhere to this rule, always use the new syntax when defining hashes where keys are symbols. Instead of defining a hash with :symbol => value, use symbol: value. This approach will not only make your code more readable but also ensure consistency across your codebase.

View in Datadog  Leave us feedback  Documentation

end
end
end
end
end
Expand Down
12 changes: 12 additions & 0 deletions lib/datadog/tracing/contrib/aws/service/sns.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ module Aws
module Service
# SNS tag handlers.
class SNS < Base
PROPAGATION_DATATYPE = 'Binary'

def process(config, trace, context)
return unless config[:propagation]

case context.operation
when :publish
inject_propagation(trace, context.params, PROPAGATION_DATATYPE)
# TODO: when :publish_batch # Future support for batch publishing
end
end

def add_tags(span, params)
topic_arn = params[:topic_arn]
topic_name = params[:name]
Expand Down
18 changes: 18 additions & 0 deletions lib/datadog/tracing/contrib/aws/service/sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,24 @@ module Aws
module Service
# SQS tag handlers.
class SQS < Base
DATATYPE = 'String'
def before_span(config, context, response)
return unless context.operation == :receive_message && config[:propagation]

# Parent the current trace based on distributed message attributes
extract_propagation!(response, 'string_value') if config[:parentage_style] == 'distributed'
end

def process(config, trace, context)
return unless config[:propagation]

case context.operation
when :send_message
inject_propagation(trace, context.params, 'String')
# TODO: when :send_message_batch # Future support for batch sending
end
end

def add_tags(span, params)
queue_url = params[:queue_url]
queue_name = params[:queue_name]
Expand Down
59 changes: 59 additions & 0 deletions lib/datadog/tracing/distributed.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# frozen_string_literal: true

require_relative 'distributed/b3_multi'
require_relative 'distributed/b3_single'
require_relative 'distributed/datadog'
require_relative 'distributed/none'
require_relative 'distributed/propagation'
require_relative 'distributed/trace_context'
require_relative 'contrib/component'

module Datadog
module Tracing
# Namespace for distributed tracing propagation and correlation
module Distributed
module_function

# Inject distributed headers into the given request
# @param digest [Datadog::Tracing::TraceDigest] the trace to inject
# @param data [Hash] the request to inject
def inject(digest, data)
raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation

@propagation.inject!(digest, data)
end

# Extract distributed headers from the given request
# @param data [Hash] the request to extract from
# @return [Datadog::Tracing::TraceDigest,nil] the extracted trace digest or nil if none was found
def extract(data)
raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation

@propagation.extract(data)
end

Contrib::Component.register('distributed') do |config|
tracing = config.tracing
# DEV: evaluate propagation_style in case it overrides propagation_style_extract & propagation_extract_first
tracing.propagation_style

@propagation = Propagation.new(
propagation_styles: {
Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_MULTI_HEADER =>
B3Multi.new(fetcher: Fetcher),
Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_SINGLE_HEADER =>
B3Single.new(fetcher: Fetcher),
Configuration::Ext::Distributed::PROPAGATION_STYLE_DATADOG =>
Datadog.new(fetcher: Fetcher),
Configuration::Ext::Distributed::PROPAGATION_STYLE_TRACE_CONTEXT =>
TraceContext.new(fetcher: Fetcher),
Configuration::Ext::Distributed::PROPAGATION_STYLE_NONE => None.new
},
propagation_style_inject: tracing.propagation_style_inject,
propagation_style_extract: tracing.propagation_style_extract,
propagation_extract_first: tracing.propagation_extract_first
)
end
end
end
end
54 changes: 54 additions & 0 deletions lib/datadog/tracing/trace_digest.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,60 @@ def merge(field_value_pairs)
}.merge!(field_value_pairs)
)
end

# rubocop:disable Metrics/AbcSize,Metrics/PerceivedComplexity,Metrics/CyclomaticComplexity
def ==(other)
self.class == other.class &&
span_id == other.span_id &&
span_name == other.span_name &&
span_resource == other.span_resource &&
span_service == other.span_service &&
span_type == other.span_type &&
trace_distributed_tags == other.trace_distributed_tags &&
trace_hostname == other.trace_hostname &&
trace_id == other.trace_id &&
trace_name == other.trace_name &&
trace_origin == other.trace_origin &&
trace_process_id == other.trace_process_id &&
trace_resource == other.trace_resource &&
trace_runtime_id == other.trace_runtime_id &&
trace_sampling_priority == other.trace_sampling_priority &&
trace_service == other.trace_service &&
trace_distributed_id == other.trace_distributed_id &&
trace_flags == other.trace_flags &&
trace_state == other.trace_state &&
trace_state_unknown_fields == other.trace_state_unknown_fields &&
span_remote == other.span_remote
end
# rubocop:enable Metrics/AbcSize,Metrics/PerceivedComplexity,Metrics/CyclomaticComplexity

alias eql? ==

def hash
[
self.class,
span_id,
span_name,
span_resource,
span_service,
span_type,
trace_distributed_tags,
trace_hostname,
trace_id,
trace_name,
trace_origin,
trace_process_id,
trace_resource,
trace_runtime_id,
trace_sampling_priority,
trace_service,
trace_distributed_id,
trace_flags,
trace_state,
trace_state_unknown_fields,
span_remote
].hash
end
end
end
end
2 changes: 2 additions & 0 deletions sig/datadog/tracing/contrib/aws/ext.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Datadog
ENV_ENABLED: "DD_TRACE_AWS_ENABLED"

ENV_PEER_SERVICE: "DD_TRACE_AWS_PEER_SERVICE"
ENV_PROPAGATION_ENABLED: string
ENV_SERVICE_NAME: "DD_TRACE_AWS_SERVICE_NAME"

ENV_ANALYTICS_ENABLED: "DD_TRACE_AWS_ANALYTICS_ENABLED"
Expand All @@ -14,6 +15,7 @@ module Datadog

DEFAULT_PEER_SERVICE_NAME: "aws"

ENV_TRACE_PARENTAGE_STYLE: string
PEER_SERVICE_SOURCES: Array[String]

SPAN_COMMAND: "aws.command"
Expand Down
2 changes: 2 additions & 0 deletions sig/datadog/tracing/contrib/aws/service/base.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module Datadog
module Aws
module Service
class Base
MESSAGE_ATTRIBUTES_LIMIT: int

def add_tags: (untyped span, untyped params) -> nil
end
end
Expand Down
7 changes: 7 additions & 0 deletions sig/datadog/tracing/contrib/component.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ module Datadog
module Contrib
module Component
@registry: Hash[String, Proc]

def self.register: (string name) { (untyped) -> void } -> void
def self.configure: (Core::Configuration::Settings config) -> void

private

def self.unregister: (string name) -> void
end
end
end
Expand Down
Loading
Loading