Skip to content

Commit

Permalink
feat: support Remote Config sampling rules (#116)
Browse files Browse the repository at this point in the history
Sampling updates received via remote configuration no longer create a new trace sampler,
which had the side effect of resetting the rate limiter.

- add sampling rules RC support
- fix: report telemetry sampling rules
- fix: report telemetry rps and sample rate for span
- report remote trace sample rate as RULE instead of REMOTE_RULE for legacy reasons
- update REMOTE_RULES and REMOTE_ADAPTIVE_RULE values to match the spec
- report default sample rate for telemetry
- add _dd.psr for new remote rules
  • Loading branch information
dmehala authored May 23, 2024
1 parent 9fedff2 commit 0ada79d
Show file tree
Hide file tree
Showing 14 changed files with 239 additions and 90 deletions.
114 changes: 96 additions & 18 deletions src/datadog/config_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,81 @@

namespace datadog {
namespace tracing {
namespace {

using Rules =
std::unordered_map<SpanMatcher, TraceSamplerRate, SpanMatcher::Hash>;

// Parse the trace sampling rules in `json_rules` into a map from span matcher
// to sampling rate.
//
// `json_rules` must be a JSON array. Each element is converted to a
// `SpanMatcher` via `SpanMatcher::from_json`, and may additionally carry:
//   - "sample_rate": a JSON number, validated by `Rate::from`.
//   - "provenance": a JSON string; "customer" selects
//     `SamplingMechanism::REMOTE_RULE` and "dynamic" selects
//     `SamplingMechanism::REMOTE_ADAPTIVE_RULE`.
//
// Return the parsed rules, or an `Error` describing the first malformed
// element encountered.
Expected<Rules> parse_trace_sampling_rules(const nlohmann::json& json_rules) {
  if (!json_rules.is_array()) {
    std::string message =
        "Trace sampling rules must be an array, but the JSON value has type \"";
    message += json_rules.type_name();
    message += "\".";
    return Error{Error::TRACE_SAMPLING_RULES_WRONG_TYPE, std::move(message)};
  }

  Rules parsed_rules;

  for (const auto& json_rule : json_rules) {
    auto matcher = SpanMatcher::from_json(json_rule);
    if (auto* error = matcher.if_error()) {
      std::string prefix = "Unable to parse trace sampling rule: ";
      return error->with_prefix(prefix);
    }

    TraceSamplerRate rate;
    if (auto sample_rate = json_rule.find("sample_rate");
        sample_rate != json_rule.end()) {
      if (!sample_rate->is_number()) {
        std::string message =
            "Trace sampling rule \"sample_rate\" must be a number, but has "
            "type \"";
        message += sample_rate->type_name();
        message += "\".";
        return Error{Error::TRACE_SAMPLING_RULES_SAMPLE_RATE_WRONG_TYPE,
                     std::move(message)};
      }

      auto maybe_rate = Rate::from(*sample_rate);
      if (auto* error = maybe_rate.if_error()) {
        return *error;
      }

      rate.value = *maybe_rate;
    }

    if (auto provenance_it = json_rule.find("provenance");
        provenance_it != json_rule.cend()) {
      if (!provenance_it->is_string()) {
        std::string message =
            "Trace sampling rule \"provenance\" must be a string, but has "
            "type \"";
        message += provenance_it->type_name();
        message += "\".";
        // NOTE(review): this reuses the "sample_rate" error code for a
        // "provenance" failure; the code is kept unchanged for callers.
        return Error{Error::TRACE_SAMPLING_RULES_SAMPLE_RATE_WRONG_TYPE,
                     std::move(message)};
      }

      // Any other provenance value leaves `rate.mechanism` untouched.
      const auto provenance = provenance_it->get<std::string_view>();
      if (provenance == "customer") {
        rate.mechanism = SamplingMechanism::REMOTE_RULE;
      } else if (provenance == "dynamic") {
        rate.mechanism = SamplingMechanism::REMOTE_ADAPTIVE_RULE;
      }
    }

    // `emplace` does not overwrite: when two rules have equal matchers, the
    // first one parsed is kept.
    parsed_rules.emplace(std::move(*matcher), std::move(rate));
  }

  return parsed_rules;
}

} // namespace

// Construct a `ConfigManager` from the finalized tracer configuration
// `config`: copy its clock and metadata, create the trace sampler, keep a copy
// of the configured trace sampling rules, and initialize the span defaults and
// the report-traces flag.
ConfigManager::ConfigManager(const FinalizedTracerConfig& config)
: clock_(config.clock),
default_metadata_(config.metadata),
// The sampler is created once here, using the already-initialized `clock_`.
trace_sampler_(
std::make_shared<TraceSampler>(config.trace_sampler, clock_)),
// Copy of the rules from `config.trace_sampler`.
rules_(config.trace_sampler.rules),
span_defaults_(std::make_shared<SpanDefaults>(config.defaults)),
report_traces_(config.report_traces) {}

// Return the trace sampler owned by this manager. `mutex_` is held while the
// shared pointer is copied.
std::shared_ptr<TraceSampler> ConfigManager::trace_sampler() {
  std::lock_guard<std::mutex> lock(mutex_);
  return trace_sampler_;
}

std::shared_ptr<const SpanDefaults> ConfigManager::span_defaults() {
Expand All @@ -35,32 +98,48 @@ std::vector<ConfigMetadata> ConfigManager::update(const ConfigUpdate& conf) {

std::lock_guard<std::mutex> lock(mutex_);

decltype(rules_) rules;

if (!conf.trace_sampling_rate) {
reset_config(ConfigName::TRACE_SAMPLING_RATE, trace_sampler_, metadata);
auto found = default_metadata_.find(ConfigName::TRACE_SAMPLING_RATE);
if (found != default_metadata_.cend()) {
metadata.push_back(found->second);
}
} else {
ConfigMetadata trace_sampling_metadata(
ConfigName::TRACE_SAMPLING_RATE,
to_string(*conf.trace_sampling_rate, 1),
ConfigMetadata::Origin::REMOTE_CONFIG);

TraceSamplerConfig trace_sampler_cfg;
trace_sampler_cfg.sample_rate = *conf.trace_sampling_rate;
auto rate = Rate::from(*conf.trace_sampling_rate);
rules[catch_all] = TraceSamplerRate{*rate, SamplingMechanism::RULE};

metadata.emplace_back(std::move(trace_sampling_metadata));
}

auto finalized_trace_sampler_cfg = finalize_config(trace_sampler_cfg);
if (auto error = finalized_trace_sampler_cfg.if_error()) {
trace_sampling_metadata.error = *error;
if (!conf.trace_sampling_rules) {
auto found = default_metadata_.find(ConfigName::TRACE_SAMPLING_RULES);
if (found != default_metadata_.cend()) {
metadata.emplace_back(found->second);
}
} else {
ConfigMetadata trace_sampling_rules_metadata(
ConfigName::TRACE_SAMPLING_RULES, conf.trace_sampling_rules->dump(),
ConfigMetadata::Origin::REMOTE_CONFIG);

auto trace_sampler =
std::make_shared<TraceSampler>(*finalized_trace_sampler_cfg, clock_);
auto maybe_rules = parse_trace_sampling_rules(*conf.trace_sampling_rules);
if (auto error = maybe_rules.if_error()) {
trace_sampling_rules_metadata.error = std::move(*error);
} else {
rules.merge(*maybe_rules);
}

// This reset rate limiting and `TraceSampler` has no `operator==`.
// TODO: Instead of creating another `TraceSampler`, we should
// update the default sampling rate.
trace_sampler_ = std::move(trace_sampler);
metadata.emplace_back(std::move(trace_sampling_metadata));
metadata.emplace_back(std::move(trace_sampling_rules_metadata));
}

rules.insert(rules_.cbegin(), rules_.cend());
trace_sampler_->set_rules(rules);

if (!conf.tags) {
reset_config(ConfigName::TAGS, span_defaults_, metadata);
} else {
Expand Down Expand Up @@ -109,10 +188,9 @@ std::vector<ConfigMetadata> ConfigManager::reset() { return update({}); }

// Return a JSON object with three entries: "defaults" (the current span
// defaults), "trace_sampler" (the sampler's own `config_json()`), and
// "report_traces" (the current flag value). `mutex_` is held while the
// object is built.
nlohmann::json ConfigManager::config_json() const {
  std::lock_guard<std::mutex> lock(mutex_);
  return nlohmann::json{{"defaults", to_json(*span_defaults_.value())},
                        {"trace_sampler", trace_sampler_->config_json()},
                        {"report_traces", report_traces_.value()}};
}

} // namespace tracing
Expand Down
4 changes: 3 additions & 1 deletion src/datadog/config_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class ConfigManager {
Clock clock_;
std::unordered_map<ConfigName, ConfigMetadata> default_metadata_;

DynamicConfig<std::shared_ptr<TraceSampler>> trace_sampler_;
std::shared_ptr<TraceSampler> trace_sampler_;
std::unordered_map<SpanMatcher, TraceSamplerRate, SpanMatcher::Hash> rules_;

DynamicConfig<std::shared_ptr<const SpanDefaults>> span_defaults_;
DynamicConfig<bool> report_traces_;

Expand Down
1 change: 1 addition & 0 deletions src/datadog/config_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct ConfigUpdate {
Optional<bool> report_traces;
Optional<double> trace_sampling_rate;
Optional<std::vector<StringView>> tags;
const nlohmann::json* trace_sampling_rules = nullptr;
};

} // namespace tracing
Expand Down
11 changes: 9 additions & 2 deletions src/datadog/remote_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ namespace {
// Bit flags that are OR-ed together to form `k_apm_capabilities`. Each
// enumerator occupies a single bit of the 64-bit underlying type; the shifts
// are done in 64 bits so that bit positions above 30 stay well-defined.
enum CapabilitiesFlag : uint64_t {
  APM_TRACING_SAMPLE_RATE = uint64_t{1} << 12,
  APM_TRACING_TAGS = uint64_t{1} << 15,
  APM_TRACING_ENABLED = uint64_t{1} << 19,
  APM_TRACING_SAMPLE_RULES = uint64_t{1} << 29,
};

constexpr std::array<uint8_t, sizeof(uint64_t)> capabilities_byte_array(
Expand All @@ -46,7 +47,7 @@ constexpr std::array<uint8_t, sizeof(uint64_t)> capabilities_byte_array(

constexpr std::array<uint8_t, sizeof(uint64_t)> k_apm_capabilities =
capabilities_byte_array(APM_TRACING_SAMPLE_RATE | APM_TRACING_TAGS |
APM_TRACING_ENABLED);
APM_TRACING_ENABLED | APM_TRACING_SAMPLE_RULES);

constexpr StringView k_apm_product = "APM_TRACING";
constexpr StringView k_apm_product_path_substring = "/APM_TRACING/";
Expand All @@ -69,6 +70,12 @@ ConfigUpdate parse_dynamic_config(const nlohmann::json& j) {
config_update.report_traces = tracing_enabled_it->get<bool>();
}

if (auto tracing_sampling_rules_it = j.find("tracing_sampling_rules");
tracing_sampling_rules_it != j.cend() &&
tracing_sampling_rules_it->is_array()) {
config_update.trace_sampling_rules = &(*tracing_sampling_rules_it);
}

return config_update;
}

Expand Down
7 changes: 7 additions & 0 deletions src/datadog/sampling_mechanism.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ enum class SamplingMechanism {
// Individual span kept by a matching span sampling rule when the enclosing
// trace was dropped.
SPAN_RULE = 8,
// Reserved for future use.
OTLP_RULE = 9,
// Sampling rule configured by user via remote configuration.
REMOTE_RULE = 11,
// Adaptive sampling rule automatically computed by Datadog backend and sent
// via remote configuration.
REMOTE_ADAPTIVE_RULE = 12,
};

} // namespace tracing
Expand Down
16 changes: 16 additions & 0 deletions src/datadog/span_matcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,23 @@ struct SpanMatcher {
nlohmann::json to_json() const;

static Expected<SpanMatcher> from_json(const nlohmann::json&);

// Two matchers are equal when their service, name, resource, and tags all
// compare equal. Fields are compared in that order, stopping at the first
// mismatch.
bool operator==(const SpanMatcher& other) const {
  if (!(service == other.service)) {
    return false;
  }
  if (!(name == other.name)) {
    return false;
  }
  if (!(resource == other.resource)) {
    return false;
  }
  return tags == other.tags;
}

// Hash functor for `SpanMatcher`. Combines the string hashes of `service`,
// `name` (shifted left by 1), and `resource` (shifted left by 2) with XOR.
// `tags` does not contribute to the hash.
// TODO: add tags
struct Hash {
  size_t operator()(const SpanMatcher& rule) const {
    const std::hash<std::string> string_hash{};
    const size_t service_bits = string_hash(rule.service);
    const size_t name_bits = string_hash(rule.name) << 1;
    const size_t resource_bits = string_hash(rule.resource) << 2;
    return service_bits ^ name_bits ^ resource_bits;
  }
};
};

// Default-constructed matcher; presumably matches every span (per its name) —
// confirm against `SpanMatcher`'s member defaults.
// NOTE(review): declared `static` in a header, so every translation unit that
// includes it gets its own copy — confirm whether `inline` was intended.
static const SpanMatcher catch_all;

} // namespace tracing
} // namespace datadog
7 changes: 6 additions & 1 deletion src/datadog/span_sampler_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ namespace {
std::string to_string(const std::vector<SpanSamplerConfig::Rule> &rules) {
nlohmann::json res;
for (const auto &r : rules) {
res.emplace_back(r.to_json());
auto j = r.to_json();
j["sample_rate"] = r.sample_rate;
if (r.max_per_second) {
j["max_per_second"] = *r.max_per_second;
}
res.emplace_back(std::move(j));
}

return res.dump();
Expand Down
27 changes: 18 additions & 9 deletions src/datadog/trace_sampler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,32 @@ TraceSampler::TraceSampler(const FinalizedTraceSamplerConfig& config,
limiter_(clock, config.max_per_second),
limiter_max_per_second_(config.max_per_second) {}

// Replace the sampler's rule set with `rules`. The argument is taken by value
// and moved into place while `mutex_` is held.
// NOTE(review): this is only safe if readers of `rules_` also hold `mutex_` —
// confirm against the code that reads it.
void TraceSampler::set_rules(
    std::unordered_map<SpanMatcher, TraceSamplerRate, SpanMatcher::Hash>
        rules) {
  const std::lock_guard<std::mutex> guard(mutex_);
  rules_ = std::move(rules);
}

SamplingDecision TraceSampler::decide(const SpanData& span) {
SamplingDecision decision;
decision.origin = SamplingDecision::Origin::LOCAL;

// First check sampling rules.
auto found_rule =
std::find_if(rules_.begin(), rules_.end(),
[&](const auto& rule) { return rule.match(span); });
const auto found_rule =
std::find_if(rules_.cbegin(), rules_.cend(),
[&](const auto& it) { return it.first.match(span); });

// `mutex_` protects `limiter_`, `collector_sample_rates_`, and
// `collector_default_sample_rate_`, so let's lock it here.
std::lock_guard lock(mutex_);

if (found_rule != rules_.end()) {
const auto& rule = *found_rule;
decision.mechanism = int(SamplingMechanism::RULE);
const auto& [rule, rate] = *found_rule;
decision.mechanism = int(rate.mechanism);
decision.limiter_max_per_second = limiter_max_per_second_;
decision.configured_rate = rule.sample_rate;
const std::uint64_t threshold = max_id_from_rate(rule.sample_rate);
decision.configured_rate = rate.value;
const std::uint64_t threshold = max_id_from_rate(rate.value);
if (knuth_hash(span.trace_id.low) < threshold) {
const auto result = limiter_.allow();
if (result.allowed) {
Expand Down Expand Up @@ -99,8 +106,10 @@ void TraceSampler::handle_collector_response(

nlohmann::json TraceSampler::config_json() const {
std::vector<nlohmann::json> rules;
for (const auto& rule : rules_) {
rules.push_back(to_json(rule));
for (const auto& [rule, rate] : rules_) {
nlohmann::json j = rule.to_json();
j["sampling_rate"] = rate.value.value();
rules.push_back(std::move(j));
}

return nlohmann::json::object({
Expand Down
8 changes: 6 additions & 2 deletions src/datadog/trace_sampler.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,22 @@ struct SamplingDecision;
struct SpanData;

class TraceSampler {
private:
std::mutex mutex_;

Optional<Rate> collector_default_sample_rate_;
std::unordered_map<std::string, Rate> collector_sample_rates_;

std::vector<FinalizedTraceSamplerConfig::Rule> rules_;
std::unordered_map<SpanMatcher, TraceSamplerRate, SpanMatcher::Hash> rules_;
Limiter limiter_;
double limiter_max_per_second_;

public:
TraceSampler(const FinalizedTraceSamplerConfig& config, const Clock& clock);

void set_rules(
std::unordered_map<SpanMatcher, TraceSamplerRate, SpanMatcher::Hash>
rules);

// Return a sampling decision for the specified root span.
SamplingDecision decide(const SpanData&);

Expand Down
Loading

0 comments on commit 0ada79d

Please sign in to comment.