Skip to content

Commit

Permalink
Support span links
Browse files Browse the repository at this point in the history
  • Loading branch information
frigus02 committed May 15, 2024
1 parent 18ae326 commit b713bba
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 69 deletions.
87 changes: 82 additions & 5 deletions src/convert.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
use crate::models::{Properties, SeverityLevel};
use crate::models::{serialize_ms_links, Properties, SeverityLevel, MS_LINKS_KEY};
use chrono::{DateTime, SecondsFormat, Utc};
use opentelemetry::{trace::Status, KeyValue, Value};
use opentelemetry::{
trace::{Link, Status},
KeyValue, Value,
};
use opentelemetry_sdk::Resource;
use std::time::{Duration, SystemTime};
use std::{
collections::HashMap,
time::{Duration, SystemTime},
};

pub(crate) fn duration_to_string(duration: Duration) -> String {
let micros = duration.as_micros();
Expand All @@ -24,14 +30,37 @@ pub(crate) fn time_to_string(time: SystemTime) -> String {
pub(crate) fn attrs_to_properties(
attributes: &[KeyValue],
resource: &Resource,
links: &[Link],
) -> Option<Properties> {
let properties = attributes
let mut properties: Properties = attributes
.iter()
.filter(|kv| !kv.key.as_str().starts_with("_MS."))
.map(|kv| ((&kv.key).into(), (&kv.value).into()))
.chain(resource.iter().map(|(k, v)| (k.into(), v.into())))
.collect();

Some(properties).filter(|x: &Properties| !x.is_empty())
if !links.is_empty() {
properties.insert(MS_LINKS_KEY.into(), serialize_ms_links(links).into());
}

Some(properties).filter(|x| !x.is_empty())
}

pub(crate) fn attrs_to_map(attributes: &[KeyValue]) -> HashMap<&str, &Value> {
attributes
.iter()
.map(|kv| (kv.key.as_str(), &kv.value))
.collect()
}

pub(crate) fn attrs_map_to_properties(attributes: HashMap<&str, &Value>) -> Option<Properties> {
let properties: Properties = attributes
.iter()
.filter(|(&k, _)| !k.starts_with("_MS."))
.map(|(&k, &v)| (k.into(), v.into()))
.collect();

Some(properties).filter(|x| !x.is_empty())
}

pub(crate) fn status_to_result_code(status: &Status) -> i32 {
Expand Down Expand Up @@ -61,10 +90,58 @@ pub(crate) fn value_to_severity_level(value: &Value) -> Option<SeverityLevel> {
#[cfg(test)]
mod tests {
use super::*;
use crate::models::MS_LINKS_MAX_LEN;
use opentelemetry::trace::SpanContext;
use test_case::test_case;

#[test_case(Duration::from_micros(123456789123), "1.10:17:36.789123" ; "all")]
fn duration(duration: Duration, expected: &'static str) {
assert_eq!(expected.to_string(), duration_to_string(duration));
}

#[test]
fn attrs_to_properties_filters_ms() {
let attrs = vec![KeyValue::new("a", "b"), KeyValue::new("_MS.a", "b")];
let props = attrs_to_properties(&attrs, &Resource::empty(), &[]).unwrap();
assert_eq!(props.len(), 1);
assert_eq!(props.get(&"a".into()).unwrap().as_ref(), "b");
}

#[test]
fn attrs_to_properties_encodes_links() {
let links = vec![Link::new(SpanContext::empty_context(), Vec::new())];
let props = attrs_to_properties(&[], &Resource::empty(), &links).unwrap();
assert_eq!(props.len(), 1);
assert_eq!(
props.get(&"_MS.links".into()).unwrap().as_ref(),
"[{\"operation_Id\":\"00000000000000000000000000000000\",\"id\":\"0000000000000000\"}]"
);
}

#[test]
fn attrs_to_properties_encodes_many_links() {
let input_len = MS_LINKS_MAX_LEN + 10;
let mut links = Vec::with_capacity(input_len);
for _ in 0..input_len {
links.push(Link::new(SpanContext::empty_context(), Vec::new()));
}
let props = attrs_to_properties(&[], &Resource::empty(), &links).unwrap();
assert_eq!(props.len(), 1);
let encoded_links = props.get(&"_MS.links".into()).unwrap();
let deserialized: serde_json::Value = serde_json::from_str(encoded_links.as_ref()).unwrap();
match deserialized {
serde_json::Value::Array(arr) => assert_eq!(arr.len(), MS_LINKS_MAX_LEN),
_ => panic!("Expected links to be serialized as JSON array"),
}
}

#[test]
fn attrs_map_to_properties_filters_ms() {
let attrs = vec![KeyValue::new("a", "b"), KeyValue::new("_MS.a", "b")];
let attrs_map = attrs_to_map(&attrs);
assert_eq!(attrs_map.len(), 2);
let props = attrs_map_to_properties(attrs_map).unwrap();
assert_eq!(props.len(), 1);
assert_eq!(props.get(&"a".into()), Some(&"b".into()));
}
}
2 changes: 2 additions & 0 deletions src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod exception_details;
mod message_data;
#[cfg(feature = "metrics")]
mod metric_data;
mod ms_link;
#[cfg(feature = "live-metrics")]
mod quick_pulse;
mod remote_dependency_data;
Expand All @@ -26,6 +27,7 @@ pub(crate) use exception_details::*;
pub(crate) use message_data::*;
#[cfg(feature = "metrics")]
pub(crate) use metric_data::*;
pub(crate) use ms_link::*;
#[cfg(feature = "live-metrics")]
pub(crate) use quick_pulse::*;
pub(crate) use remote_dependency_data::*;
Expand Down
45 changes: 45 additions & 0 deletions src/models/ms_link.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//! Serialization for span links.
//!
//! Application Insights supports receiving span links as a JSON string in the property
//! `_MS.links`. This does not appear in swagger API definition, yet, as far as I can tell. Compare
//! with the different SDKs:
//!
//! - [type definition in JS exporter](https://github.com/Azure/azure-sdk-for-js/blob/7f1cb9af148b7ed7331107a3e3cffb37e8ef9409/sdk/monitor/monitor-opentelemetry-exporter/src/types.ts#L21-L28)
//! - [serialization in JS exporter](https://github.com/Azure/azure-sdk-for-js/blob/c66cad23c4b803719db65cb48a453b0adc13307b/sdk/monitor/monitor-opentelemetry-exporter/src/utils/spanUtils.ts#L149-L155)
//! - [serialization in Python exporter](https://github.com/Azure/azure-sdk-for-python/blob/aa3a4b32e4d27f15ffd6429cefacce67f5776162/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py#L517-L527)
use opentelemetry::trace::Link;

pub(crate) const MS_LINKS_KEY: &str = "_MS.links";

/// Maximum number of links that fit into data properties.
///
/// Links are serialized as a JSON array, e.g.
///
/// ```json
/// [{"operation_Id":"77225ad66928295345ea7c9b0a97682e","id":"7c29182f74d01363"}]
/// ```
///
/// Each link is fixed length of 75 (plus 1 for the comma between links). Property values can be a
/// maximum of 8192 characters. Therefore the maximum number of links is:
///
/// ```plain
/// (8192 - 2) / 76 = 107.76...
/// ```
pub(crate) const MS_LINKS_MAX_LEN: usize = 107;

pub(crate) fn serialize_ms_links(links: &[Link]) -> String {
let count = links.len().min(MS_LINKS_MAX_LEN);
let mut res = String::with_capacity(count * 76 + 2);
res.push('[');
for link in links.iter().take(MS_LINKS_MAX_LEN) {
res.push_str(r#"{"operation_Id":""#);
res.push_str(&link.span_context.trace_id().to_string());
res.push_str(r#"","id":""#);
res.push_str(&link.span_context.span_id().to_string());
res.push_str(r#""},"#);
}
res.pop(); // remove trailing comma
res.push(']');
res
}
53 changes: 11 additions & 42 deletions src/trace.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use crate::{
convert::{
attrs_to_properties, duration_to_string, status_to_result_code, time_to_string,
value_to_severity_level,
attrs_map_to_properties, attrs_to_map, attrs_to_properties, duration_to_string,
status_to_result_code, time_to_string, value_to_severity_level,
},
models::{
context_tag_keys::attrs::CUSTOM_EVENT_NAME, Data, Envelope, EventData, ExceptionData,
ExceptionDetails, LimitedLenString, MessageData, Properties, RemoteDependencyData,
RequestData,
ExceptionDetails, LimitedLenString, MessageData, RemoteDependencyData, RequestData,
},
tags::{get_tags_for_event, get_tags_for_span},
Exporter,
Expand Down Expand Up @@ -199,7 +198,7 @@ impl From<&SpanData> for RequestData {
success: is_request_success(span),
source: None,
url: None,
properties: attrs_to_properties(&span.attributes, &span.resource),
properties: attrs_to_properties(&span.attributes, &span.resource, &span.links.links),
};

let attrs: HashMap<&str, &Value> = span
Expand Down Expand Up @@ -287,7 +286,7 @@ impl From<&SpanData> for RemoteDependencyData {
data: None,
target: None,
type_: None,
properties: attrs_to_properties(&span.attributes, &span.resource),
properties: attrs_to_properties(&span.attributes, &span.resource, &span.links.links),
};

let attrs: HashMap<&str, &Value> = span
Expand Down Expand Up @@ -389,11 +388,7 @@ impl From<&SpanData> for RemoteDependencyData {

impl From<&Event> for ExceptionData {
fn from(event: &Event) -> ExceptionData {
let mut attrs: HashMap<&str, &Value> = event
.attributes
.iter()
.map(|kv| (kv.key.as_str(), &kv.value))
.collect();
let mut attrs = attrs_to_map(&event.attributes);
let exception = ExceptionDetails {
type_name: attrs
.remove(semcov::trace::EXCEPTION_TYPE)
Expand All @@ -410,37 +405,21 @@ impl From<&Event> for ExceptionData {
ExceptionData {
ver: 2,
exceptions: vec![exception],
properties: Some(
attrs
.iter()
.map(|(k, v)| ((*k).into(), (*v).into()))
.collect(),
)
.filter(|x: &Properties| !x.is_empty()),
properties: attrs_map_to_properties(attrs),
}
}
}

impl From<&Event> for EventData {
fn from(event: &Event) -> EventData {
let mut attrs: HashMap<&str, &Value> = event
.attributes
.iter()
.map(|kv| (kv.key.as_str(), &kv.value))
.collect();
let mut attrs = attrs_to_map(&event.attributes);
EventData {
ver: 2,
name: attrs
.remove(CUSTOM_EVENT_NAME)
.map(Into::into)
.unwrap_or_else(|| "<no name>".into()),
properties: Some(
attrs
.iter()
.map(|(k, v)| ((*k).into(), (*v).into()))
.collect(),
)
.filter(|x: &Properties| !x.is_empty()),
properties: attrs_map_to_properties(attrs),
}
}
}
Expand All @@ -452,11 +431,7 @@ const LEVEL: &str = "level";

impl From<&Event> for MessageData {
fn from(event: &Event) -> MessageData {
let mut attrs: HashMap<&str, &Value> = event
.attributes
.iter()
.map(|kv| (kv.key.as_str(), &kv.value))
.collect();
let mut attrs = attrs_to_map(&event.attributes);
let severity_level = attrs.get(LEVEL).and_then(|x| value_to_severity_level(x));
if severity_level.is_some() {
attrs.remove(LEVEL);
Expand All @@ -469,13 +444,7 @@ impl From<&Event> for MessageData {
} else {
event.name.clone().into_owned().into()
},
properties: Some(
attrs
.iter()
.map(|(k, v)| ((*k).into(), (*v).into()))
.collect(),
)
.filter(|x: &Properties| !x.is_empty()),
properties: attrs_map_to_properties(attrs),
}
}
}
72 changes: 50 additions & 22 deletions tests/http_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
use format::requests_to_string;
use opentelemetry::{
trace::{
get_active_span, mark_span_as_active, Span, SpanKind, Status, TraceContextExt, Tracer,
TracerProvider,
get_active_span, mark_span_as_active, Link, Span, SpanKind, Status, TraceContextExt,
Tracer, TracerProvider,
},
Context, KeyValue,
};
Expand Down Expand Up @@ -118,6 +118,12 @@ fn traces_simple() {
);
let error: Box<dyn std::error::Error> = "An error".into();
span.record_error(error.as_ref());
let async_op_builder = server_tracer
.span_builder("async operation")
.with_links(vec![Link::new(span.span_context().clone(), Vec::new())]);
let async_op_context = Context::new();
let _span =
server_tracer.build_with_context(async_op_builder, &async_op_context);
});
}

Expand Down Expand Up @@ -309,6 +315,8 @@ mod tick {
}

mod format {
use std::sync::OnceLock;

use flate2::read::GzDecoder;
use http::{HeaderName, Request};
use regex::Regex;
Expand Down Expand Up @@ -348,26 +356,46 @@ mod format {
}

fn strip_changing_values(body: &str) -> String {
let res = vec![
Regex::new(r#""(?P<field>time)": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z""#)
.unwrap(),
Regex::new(r#""(?P<field>duration)": "\d+\.\d{2}:\d{2}:\d{2}\.\d{6}""#).unwrap(),
Regex::new(r#""(?P<field>id|ai\.operation\.parentId)": "[a-z0-9]{16}""#).unwrap(),
Regex::new(r#""(?P<field>ai\.operation\.id)": "[a-z0-9]{32}""#).unwrap(),
Regex::new(r#""(?P<field>StreamId)": "[a-z0-9]{32}""#).unwrap(),
Regex::new(r#""(?P<field>Timestamp)": "/Date\(\d+\)/""#).unwrap(),
Regex::new(
r#"(?P<prefix>"\\\\Processor\(_Total\)\\\\% Processor Time",\s*)"(?P<field>Value)": \d+\.\d+"#,
).unwrap(),
Regex::new(
r#"(?P<prefix>"\\\\Memory\\\\Committed Bytes",\s*)"(?P<field>Value)": \d+\.\d+"#,
).unwrap(),
];

res.into_iter().fold(body.into(), |body, re| {
re.replace_all(&body, r#"$prefix"$field": "STRIPPED""#)
.into()
})
struct Strip {
re: Regex,
replacement: &'static str,
}
impl Strip {
fn new(re: &str) -> Self {
Self {
re: Regex::new(re).unwrap(),
replacement: r#"$prefix"$field": "STRIPPED""#,
}
}

fn json_in_json(mut self) -> Self {
self.replacement = r#"$prefix\"$field\":\"STRIPPED\""#;
self
}

fn strip(&self, s: &str) -> String {
self.re.replace_all(s, self.replacement).into()
}
}
static STRIP_CONFIGS: OnceLock<Vec<Strip>> = OnceLock::new();
let configs = STRIP_CONFIGS.get_or_init(|| {
vec![
Strip::new(r#""(?P<field>time)": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z""#),
Strip::new(r#""(?P<field>duration)": "\d+\.\d{2}:\d{2}:\d{2}\.\d{6}""#),
Strip::new(r#""(?P<field>id|ai\.operation\.parentId)": "[a-f0-9]{16}""#),
Strip::new(r#""(?P<field>ai\.operation\.id)": "[a-f0-9]{32}""#),
Strip::new(r#""(?P<field>StreamId)": "[a-f0-9]{32}""#),
Strip::new(r#"\\"(?P<field>operation_Id)\\":\\"[a-f0-9]{32}\\""#).json_in_json(),
Strip::new(r#"\\"(?P<field>id)\\":\\"[a-f0-9]{16}\\""#).json_in_json(),
Strip::new(r#""(?P<field>Timestamp)": "/Date\(\d+\)/""#),
Strip::new(r#"(?P<prefix>"\\\\Processor\(_Total\)\\\\% Processor Time",\s*)"(?P<field>Value)": \d+\.\d+"#),
Strip::new(r#"(?P<prefix>"\\\\Memory\\\\Committed Bytes",\s*)"(?P<field>Value)": \d+\.\d+"#),
]
});

configs
.iter()
.fold(body.into(), |body, config| config.strip(&body))
}

fn pretty_print_json(body: &[u8]) -> String {
Expand Down
Loading

0 comments on commit b713bba

Please sign in to comment.