diff --git a/src/convert.rs b/src/convert.rs index c366cc9..941ec5f 100644 --- a/src/convert.rs +++ b/src/convert.rs @@ -1,8 +1,14 @@ -use crate::models::{Properties, SeverityLevel}; +use crate::models::{serialize_ms_links, Properties, SeverityLevel, MS_LINKS_KEY}; use chrono::{DateTime, SecondsFormat, Utc}; -use opentelemetry::{trace::Status, KeyValue, Value}; +use opentelemetry::{ + trace::{Link, Status}, + KeyValue, Value, +}; use opentelemetry_sdk::Resource; -use std::time::{Duration, SystemTime}; +use std::{ + collections::HashMap, + time::{Duration, SystemTime}, +}; pub(crate) fn duration_to_string(duration: Duration) -> String { let micros = duration.as_micros(); @@ -24,14 +30,37 @@ pub(crate) fn time_to_string(time: SystemTime) -> String { pub(crate) fn attrs_to_properties( attributes: &[KeyValue], resource: &Resource, + links: &[Link], ) -> Option { - let properties = attributes + let mut properties: Properties = attributes .iter() + .filter(|kv| !kv.key.as_str().starts_with("_MS.")) .map(|kv| ((&kv.key).into(), (&kv.value).into())) .chain(resource.iter().map(|(k, v)| (k.into(), v.into()))) .collect(); - Some(properties).filter(|x: &Properties| !x.is_empty()) + if !links.is_empty() { + properties.insert(MS_LINKS_KEY.into(), serialize_ms_links(links).into()); + } + + Some(properties).filter(|x| !x.is_empty()) +} + +pub(crate) fn attrs_to_map(attributes: &[KeyValue]) -> HashMap<&str, &Value> { + attributes + .iter() + .map(|kv| (kv.key.as_str(), &kv.value)) + .collect() +} + +pub(crate) fn attrs_map_to_properties(attributes: HashMap<&str, &Value>) -> Option { + let properties: Properties = attributes + .iter() + .filter(|(&k, _)| !k.starts_with("_MS.")) + .map(|(&k, &v)| (k.into(), v.into())) + .collect(); + + Some(properties).filter(|x| !x.is_empty()) } pub(crate) fn status_to_result_code(status: &Status) -> i32 { @@ -61,10 +90,58 @@ pub(crate) fn value_to_severity_level(value: &Value) -> Option { #[cfg(test)] mod tests { use super::*; + use crate::models::MS_LINKS_MAX_LEN; + use opentelemetry::trace::SpanContext; use test_case::test_case; #[test_case(Duration::from_micros(123456789123), "1.10:17:36.789123" ; "all")] fn duration(duration: Duration, expected: &'static str) { assert_eq!(expected.to_string(), duration_to_string(duration)); } + + #[test] + fn attrs_to_properties_filters_ms() { + let attrs = vec![KeyValue::new("a", "b"), KeyValue::new("_MS.a", "b")]; + let props = attrs_to_properties(&attrs, &Resource::empty(), &[]).unwrap(); + assert_eq!(props.len(), 1); + assert_eq!(props.get(&"a".into()).unwrap().as_ref(), "b"); + } + + #[test] + fn attrs_to_properties_encodes_links() { + let links = vec![Link::new(SpanContext::empty_context(), Vec::new())]; + let props = attrs_to_properties(&[], &Resource::empty(), &links).unwrap(); + assert_eq!(props.len(), 1); + assert_eq!( + props.get(&"_MS.links".into()).unwrap().as_ref(), + "[{\"operation_Id\":\"00000000000000000000000000000000\",\"id\":\"0000000000000000\"}]" + ); + } + + #[test] + fn attrs_to_properties_encodes_many_links() { + let input_len = MS_LINKS_MAX_LEN + 10; + let mut links = Vec::with_capacity(input_len); + for _ in 0..input_len { + links.push(Link::new(SpanContext::empty_context(), Vec::new())); + } + let props = attrs_to_properties(&[], &Resource::empty(), &links).unwrap(); + assert_eq!(props.len(), 1); + let encoded_links = props.get(&"_MS.links".into()).unwrap(); + let deserialized: serde_json::Value = serde_json::from_str(encoded_links.as_ref()).unwrap(); + match deserialized { + serde_json::Value::Array(arr) => assert_eq!(arr.len(), MS_LINKS_MAX_LEN), + _ => panic!("Expected links to be serialized as JSON array"), + } + } + + #[test] + fn attrs_map_to_properties_filters_ms() { + let attrs = vec![KeyValue::new("a", "b"), KeyValue::new("_MS.a", "b")]; + let attrs_map = attrs_to_map(&attrs); + assert_eq!(attrs_map.len(), 2); + let props = attrs_map_to_properties(attrs_map).unwrap(); + assert_eq!(props.len(), 1); + assert_eq!(props.get(&"a".into()), Some(&"b".into())); + } } diff --git a/src/models/mod.rs b/src/models/mod.rs index 35fa9df..694b0ca 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -9,6 +9,7 @@ mod exception_details; mod message_data; #[cfg(feature = "metrics")] mod metric_data; +mod ms_link; #[cfg(feature = "live-metrics")] mod quick_pulse; mod remote_dependency_data; @@ -26,6 +27,7 @@ pub(crate) use exception_details::*; pub(crate) use message_data::*; #[cfg(feature = "metrics")] pub(crate) use metric_data::*; +pub(crate) use ms_link::*; #[cfg(feature = "live-metrics")] pub(crate) use quick_pulse::*; pub(crate) use remote_dependency_data::*; diff --git a/src/models/ms_link.rs b/src/models/ms_link.rs new file mode 100644 index 0000000..10b4fc9 --- /dev/null +++ b/src/models/ms_link.rs @@ -0,0 +1,45 @@ +//! Serialization for span links. +//! +//! Application Insights supports receiving span links as a JSON string in the property +//! `_MS.links`. This does not appear in swagger API definition, yet, as far as I can tell. Compare +//! with the different SDKs: +//! +//! - [type definition in JS exporter](https://github.com/Azure/azure-sdk-for-js/blob/7f1cb9af148b7ed7331107a3e3cffb37e8ef9409/sdk/monitor/monitor-opentelemetry-exporter/src/types.ts#L21-L28) +//! - [serialization in JS exporter](https://github.com/Azure/azure-sdk-for-js/blob/c66cad23c4b803719db65cb48a453b0adc13307b/sdk/monitor/monitor-opentelemetry-exporter/src/utils/spanUtils.ts#L149-L155) +//! - [serialization in Python exporter](https://github.com/Azure/azure-sdk-for-python/blob/aa3a4b32e4d27f15ffd6429cefacce67f5776162/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py#L517-L527) + +use opentelemetry::trace::Link; + +pub(crate) const MS_LINKS_KEY: &str = "_MS.links"; + +/// Maximum number of links that fit into data properties. +/// +/// Links are serialized as a JSON array, e.g. +/// +/// ```json +/// [{"operation_Id":"77225ad66928295345ea7c9b0a97682e","id":"7c29182f74d01363"}] +/// ``` +/// +/// Each link is fixed length of 75 (plus 1 for the comma between links). Property values can be a +/// maximum of 8192 characters. Therefore the maximum number of links is: +/// +/// ```plain +/// (8192 - 2) / 76 = 107.76... +/// ``` +pub(crate) const MS_LINKS_MAX_LEN: usize = 107; + +pub(crate) fn serialize_ms_links(links: &[Link]) -> String { + let count = links.len().min(MS_LINKS_MAX_LEN); + let mut res = String::with_capacity(count * 76 + 2); + res.push('['); + for link in links.iter().take(MS_LINKS_MAX_LEN) { + res.push_str(r#"{"operation_Id":""#); + res.push_str(&link.span_context.trace_id().to_string()); + res.push_str(r#"","id":""#); + res.push_str(&link.span_context.span_id().to_string()); + res.push_str(r#""},"#); + } + res.pop(); // remove trailing comma + res.push(']'); + res +} diff --git a/src/trace.rs b/src/trace.rs index d371229..8c65d89 100644 --- a/src/trace.rs +++ b/src/trace.rs @@ -1,12 +1,11 @@ use crate::{ convert::{ - attrs_to_properties, duration_to_string, status_to_result_code, time_to_string, - value_to_severity_level, + attrs_map_to_properties, attrs_to_map, attrs_to_properties, duration_to_string, + status_to_result_code, time_to_string, value_to_severity_level, }, models::{ context_tag_keys::attrs::CUSTOM_EVENT_NAME, Data, Envelope, EventData, ExceptionData, - ExceptionDetails, LimitedLenString, MessageData, Properties, RemoteDependencyData, - RequestData, + ExceptionDetails, LimitedLenString, MessageData, RemoteDependencyData, RequestData, }, tags::{get_tags_for_event, get_tags_for_span}, Exporter, @@ -199,7 +198,7 @@ impl From<&SpanData> for RequestData { success: is_request_success(span), source: None, url: None, - properties: attrs_to_properties(&span.attributes, &span.resource), + properties: attrs_to_properties(&span.attributes, &span.resource, &span.links.links), }; let attrs: HashMap<&str, &Value> = span @@ -287,7 +286,7 @@ impl From<&SpanData> for RemoteDependencyData { data: None, target: None, type_: None, - properties: attrs_to_properties(&span.attributes, &span.resource), + properties: attrs_to_properties(&span.attributes, &span.resource, &span.links.links), }; let attrs: HashMap<&str, &Value> = span @@ -389,11 +388,7 @@ impl From<&SpanData> for RemoteDependencyData { impl From<&Event> for ExceptionData { fn from(event: &Event) -> ExceptionData { - let mut attrs: HashMap<&str, &Value> = event - .attributes - .iter() - .map(|kv| (kv.key.as_str(), &kv.value)) - .collect(); + let mut attrs = attrs_to_map(&event.attributes); let exception = ExceptionDetails { type_name: attrs .remove(semcov::trace::EXCEPTION_TYPE) @@ -410,37 +405,21 @@ impl From<&Event> for ExceptionData { ExceptionData { ver: 2, exceptions: vec![exception], - properties: Some( - attrs - .iter() - .map(|(k, v)| ((*k).into(), (*v).into())) - .collect(), - ) - .filter(|x: &Properties| !x.is_empty()), + properties: attrs_map_to_properties(attrs), } } } impl From<&Event> for EventData { fn from(event: &Event) -> EventData { - let mut attrs: HashMap<&str, &Value> = event - .attributes - .iter() - .map(|kv| (kv.key.as_str(), &kv.value)) - .collect(); + let mut attrs = attrs_to_map(&event.attributes); EventData { ver: 2, name: attrs .remove(CUSTOM_EVENT_NAME) .map(Into::into) .unwrap_or_else(|| "".into()), - properties: Some( - attrs - .iter() - .map(|(k, v)| ((*k).into(), (*v).into())) - .collect(), - ) - .filter(|x: &Properties| !x.is_empty()), + properties: attrs_map_to_properties(attrs), } } } @@ -452,11 +431,7 @@ const LEVEL: &str = "level"; impl From<&Event> for MessageData { fn from(event: &Event) -> MessageData { - let mut attrs: HashMap<&str, &Value> = event - .attributes - .iter() - .map(|kv| (kv.key.as_str(), &kv.value)) - .collect(); + let mut attrs = attrs_to_map(&event.attributes); let severity_level = attrs.get(LEVEL).and_then(|x| value_to_severity_level(x)); if severity_level.is_some() { attrs.remove(LEVEL); @@ -469,13 +444,7 @@ impl From<&Event> for MessageData { } else { event.name.clone().into_owned().into() }, - properties: Some( - attrs - .iter() - .map(|(k, v)| ((*k).into(), (*v).into())) - .collect(), - ) - .filter(|x: &Properties| !x.is_empty()), + properties: attrs_map_to_properties(attrs), } } } diff --git a/tests/http_requests.rs b/tests/http_requests.rs index 581f729..1f6ff3d 100644 --- a/tests/http_requests.rs +++ b/tests/http_requests.rs @@ -9,8 +9,8 @@ use format::requests_to_string; use opentelemetry::{ trace::{ - get_active_span, mark_span_as_active, Span, SpanKind, Status, TraceContextExt, Tracer, - TracerProvider, + get_active_span, mark_span_as_active, Link, Span, SpanKind, Status, TraceContextExt, + Tracer, TracerProvider, }, Context, KeyValue, }; @@ -118,6 +118,12 @@ fn traces_simple() { ); let error: Box = "An error".into(); span.record_error(error.as_ref()); + let async_op_builder = server_tracer + .span_builder("async operation") + .with_links(vec![Link::new(span.span_context().clone(), Vec::new())]); + let async_op_context = Context::new(); + let _span = + server_tracer.build_with_context(async_op_builder, &async_op_context); }); } @@ -309,6 +315,8 @@ mod tick { } mod format { + use std::sync::OnceLock; + use flate2::read::GzDecoder; use http::{HeaderName, Request}; use regex::Regex; @@ -348,26 +356,46 @@ mod format { } fn strip_changing_values(body: &str) -> String { - let res = vec![ - Regex::new(r#""(?Ptime)": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z""#) - .unwrap(), - Regex::new(r#""(?Pduration)": "\d+\.\d{2}:\d{2}:\d{2}\.\d{6}""#).unwrap(), - Regex::new(r#""(?Pid|ai\.operation\.parentId)": "[a-z0-9]{16}""#).unwrap(), - Regex::new(r#""(?Pai\.operation\.id)": "[a-z0-9]{32}""#).unwrap(), - Regex::new(r#""(?PStreamId)": "[a-z0-9]{32}""#).unwrap(), - Regex::new(r#""(?PTimestamp)": "/Date\(\d+\)/""#).unwrap(), - Regex::new( - r#"(?P"\\\\Processor\(_Total\)\\\\% Processor Time",\s*)"(?PValue)": \d+\.\d+"#, - ).unwrap(), - Regex::new( - r#"(?P"\\\\Memory\\\\Committed Bytes",\s*)"(?PValue)": \d+\.\d+"#, - ).unwrap(), - ]; - - res.into_iter().fold(body.into(), |body, re| { - re.replace_all(&body, r#"$prefix"$field": "STRIPPED""#) - .into() - }) + struct Strip { + re: Regex, + replacement: &'static str, + } + impl Strip { + fn new(re: &str) -> Self { + Self { + re: Regex::new(re).unwrap(), + replacement: r#"$prefix"$field": "STRIPPED""#, + } + } + + fn json_in_json(mut self) -> Self { + self.replacement = r#"$prefix\"$field\":\"STRIPPED\""#; + self + } + + fn strip(&self, s: &str) -> String { + self.re.replace_all(s, self.replacement).into() + } + } + static STRIP_CONFIGS: OnceLock> = OnceLock::new(); + let configs = STRIP_CONFIGS.get_or_init(|| { + vec![ + Strip::new(r#""(?Ptime)": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z""#), + Strip::new(r#""(?Pduration)": "\d+\.\d{2}:\d{2}:\d{2}\.\d{6}""#), + Strip::new(r#""(?Pid|ai\.operation\.parentId)": "[a-f0-9]{16}""#), + Strip::new(r#""(?Pai\.operation\.id)": "[a-f0-9]{32}""#), + Strip::new(r#""(?PStreamId)": "[a-f0-9]{32}""#), + Strip::new(r#"\\"(?Poperation_Id)\\":\\"[a-f0-9]{32}\\""#).json_in_json(), + Strip::new(r#"\\"(?Pid)\\":\\"[a-f0-9]{16}\\""#).json_in_json(), + Strip::new(r#""(?PTimestamp)": "/Date\(\d+\)/""#), + Strip::new(r#"(?P"\\\\Processor\(_Total\)\\\\% Processor Time",\s*)"(?PValue)": \d+\.\d+"#), + Strip::new(r#"(?P"\\\\Memory\\\\Committed Bytes",\s*)"(?PValue)": \d+\.\d+"#), + ] + }); + + configs + .iter() + .fold(body.into(), |body, config| config.strip(&body)) } fn pretty_print_json(body: &[u8]) -> String { diff --git a/tests/snapshots/http_requests__traces_simple.snap b/tests/snapshots/http_requests__traces_simple.snap index 5994031..10ff87f 100644 --- a/tests/snapshots/http_requests__traces_simple.snap +++ b/tests/snapshots/http_requests__traces_simple.snap @@ -7,6 +7,41 @@ host: dc.services.visualstudio.com content-type: application/json content-encoding: gzip +[ + { + "data": { + "baseData": { + "duration": "STRIPPED", + "id": "STRIPPED", + "name": "async operation", + "properties": { + "_MS.links": "[{\"operation_Id\":\"STRIPPED\",\"id\":\"STRIPPED\"}]", + "service.name": "server", + "service.namespace": "test" + }, + "resultCode": "0", + "type": "InProc", + "ver": 2 + }, + "baseType": "RemoteDependencyData" + }, + "iKey": "0fdcec70-0ce5-4085-89d9-9ae8ead9af66", + "name": "Microsoft.ApplicationInsights.RemoteDependency", + "sampleRate": 100.0, + "tags": { + "ai.cloud.role": "test.server", + "ai.operation.id": "STRIPPED" + }, + "time": "STRIPPED" + } +] + + +POST /v2/track HTTP/1.1 +host: dc.services.visualstudio.com +content-type: application/json +content-encoding: gzip + [ { "data": {