Skip to content

Commit

Permalink
Tests, docs, cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
frigus02 committed May 11, 2024
1 parent 77e39f6 commit 3c604ce
Showing 5 changed files with 173 additions and 74 deletions.
80 changes: 54 additions & 26 deletions src/convert.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use crate::{
models::{MsLink, Properties, SeverityLevel, MS_LINKS_KEY, MS_LINKS_MAX_LEN},
Error,
};
use crate::models::{serialize_ms_links, Properties, SeverityLevel, MS_LINKS_KEY};
use chrono::{DateTime, SecondsFormat, Utc};
use opentelemetry::{
trace::{Link, Status, TraceError},
trace::{Link, Status},
KeyValue, Value,
};
use opentelemetry_sdk::Resource;
@@ -37,31 +34,13 @@ pub(crate) fn attrs_to_properties(
) -> Option<Properties> {
let mut properties: Properties = attributes
.iter()
.filter(|kv| !kv.key.as_str().starts_with("_MS."))
.map(|kv| ((&kv.key).into(), (&kv.value).into()))
.chain(resource.iter().map(|(k, v)| (k.into(), v.into())))
.collect();

if !links.is_empty() {
let ms_links: Vec<MsLink> = links
.iter()
.take(MS_LINKS_MAX_LEN)
.map(|link| MsLink {
operation_id: link.span_context.trace_id().to_string(),
id: link.span_context.span_id().to_string(),
})
.collect();
match serde_json::to_string(&ms_links) {
Ok(links_json) => {
properties.insert(MS_LINKS_KEY.into(), links_json.into());
}
Err(err) => {
// Don't want to fail the entire export, but it's probably a good idea to inform
// the developer.
opentelemetry::global::handle_error(TraceError::Other(
Error::SerializeLinks(err).into(),
));
}
};
properties.insert(MS_LINKS_KEY.into(), serialize_ms_links(links).into());
}

Some(properties).filter(|x| !x.is_empty())
@@ -77,7 +56,8 @@ pub(crate) fn attrs_to_map(attributes: &[KeyValue]) -> HashMap<&str, &Value> {
pub(crate) fn attrs_map_to_properties(attributes: HashMap<&str, &Value>) -> Option<Properties> {
let properties: Properties = attributes
.iter()
.map(|(k, v)| ((*k).into(), (*v).into()))
.filter(|(&k, _)| !k.starts_with("_MS."))
.map(|(&k, &v)| (k.into(), v.into()))
.collect();

Some(properties).filter(|x| !x.is_empty())
@@ -110,10 +90,58 @@ pub(crate) fn value_to_severity_level(value: &Value) -> Option<SeverityLevel> {
#[cfg(test)]
mod tests {
use super::*;
use crate::models::MS_LINKS_MAX_LEN;
use opentelemetry::trace::SpanContext;
use test_case::test_case;

#[test_case(Duration::from_micros(123456789123), "1.10:17:36.789123" ; "all")]
fn duration(duration: Duration, expected: &'static str) {
    // The sample input is 1 day, 10 h, 17 min and 36.789123 s expressed in microseconds.
    let formatted = duration_to_string(duration);
    assert_eq!(expected.to_string(), formatted);
}

/// Attributes whose key starts with `_MS.` must not end up in the properties.
#[test]
fn attrs_to_properties_filters_ms() {
    let regular = KeyValue::new("a", "b");
    let reserved = KeyValue::new("_MS.a", "b");
    let attributes = vec![regular, reserved];
    let resource = Resource::empty();

    let properties = attrs_to_properties(&attributes, &resource, &[]).unwrap();

    // Only the regular attribute survives.
    assert_eq!(properties.len(), 1);
    assert_eq!(properties.get(&"a".into()).unwrap().as_ref(), "b");
}

/// A single link is stored under `_MS.links` as a one-element JSON array.
#[test]
fn attrs_to_properties_encodes_links() {
    let empty_context = SpanContext::empty_context();
    let links = vec![Link::new(empty_context, Vec::new())];
    let resource = Resource::empty();

    let properties = attrs_to_properties(&[], &resource, &links).unwrap();

    assert_eq!(properties.len(), 1);
    let encoded = properties.get(&"_MS.links".into()).unwrap();
    assert_eq!(
        encoded.as_ref(),
        "[{\"operation_Id\":\"00000000000000000000000000000000\",\"id\":\"0000000000000000\"}]"
    );
}

/// Passing more than `MS_LINKS_MAX_LEN` links yields exactly `MS_LINKS_MAX_LEN` encoded links.
#[test]
fn attrs_to_properties_encodes_many_links() {
    let input_len = MS_LINKS_MAX_LEN + 10;
    let links: Vec<Link> = (0..input_len)
        .map(|_| Link::new(SpanContext::empty_context(), Vec::new()))
        .collect();

    let props = attrs_to_properties(&[], &Resource::empty(), &links).unwrap();
    assert_eq!(props.len(), 1);

    // Parse the property back as JSON so the element count can be checked.
    let encoded_links = props.get(&"_MS.links".into()).unwrap();
    let deserialized: serde_json::Value = serde_json::from_str(encoded_links.as_ref()).unwrap();
    let arr = deserialized
        .as_array()
        .expect("Expected links to be serialized as JSON array");
    assert_eq!(arr.len(), MS_LINKS_MAX_LEN);
}

/// The map-based conversion also drops attributes whose key starts with `_MS.`.
#[test]
fn attrs_map_to_properties_filters_ms() {
    let attributes = vec![KeyValue::new("a", "b"), KeyValue::new("_MS.a", "b")];

    // The intermediate map still holds both attributes; filtering happens in the conversion.
    let by_key = attrs_to_map(&attributes);
    assert_eq!(by_key.len(), 2);

    let properties = attrs_map_to_properties(by_key).unwrap();
    assert_eq!(properties.len(), 1);
    assert_eq!(properties.get(&"a".into()), Some(&"b".into()));
}
}
4 changes: 0 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -834,10 +834,6 @@ pub enum Error {
#[cfg_attr(docsrs, doc(cfg(feature = "live-metrics")))]
#[error("stop live metrics failed with {0}")]
QuickPulseShutdown(opentelemetry_sdk::runtime::TrySendError),

/// Failed to serialize span links.
#[error("serializing span links failed with {0}")]
SerializeLinks(serde_json::Error),
}

impl ExportError for Error {
56 changes: 34 additions & 22 deletions src/models/ms_link.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,45 @@
use serde::Serialize;
//! Serialization for span links.
//!
//! Application Insights supports receiving span links as a JSON string in the property
//! `_MS.links`. As far as I can tell, this does not appear in the Swagger API definition yet.
//! Compare with the different SDKs:
//!
//! - [type definition in JS exporter](https://github.com/Azure/azure-sdk-for-js/blob/7f1cb9af148b7ed7331107a3e3cffb37e8ef9409/sdk/monitor/monitor-opentelemetry-exporter/src/types.ts#L21-L28)
//! - [serialization in JS exporter](https://github.com/Azure/azure-sdk-for-js/blob/c66cad23c4b803719db65cb48a453b0adc13307b/sdk/monitor/monitor-opentelemetry-exporter/src/utils/spanUtils.ts#L149-L155)
//! - [serialization in Python exporter](https://github.com/Azure/azure-sdk-for-python/blob/aa3a4b32e4d27f15ffd6429cefacce67f5776162/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py#L517-L527)
use opentelemetry::trace::Link;

pub(crate) const MS_LINKS_KEY: &str = "_MS.links";

/// Maximum number of links that fit into data properties.
///
/// Links are serialized as a JSON array. As such they contain 1 start and 1 end character (`[]`)
/// plus 76 characters for each link:
/// - 27 characters for the skeleton: `{"operation_Id":"","id":""}`
/// - 32 characters for the trace id
/// - 16 characters for the span id
/// - 1 character for a comma (except for the last link)
/// Links are serialized as a JSON array, e.g.
///
/// ```json
/// [{"operation_Id":"77225ad66928295345ea7c9b0a97682e","id":"7c29182f74d01363"}]
/// ```
///
/// Property values can be 8192 characters max.
/// Each link has a fixed length of 75 characters (plus 1 for the comma between links). Property
/// values can be a maximum of 8192 characters. Therefore the maximum number of links is:
///
/// ```plain
/// (8192 - 2) / 76 = 107.76...
/// ```
pub(crate) const MS_LINKS_MAX_LEN: usize = 107;

/// Link to another operation.
///
/// Does not appear in swagger API definition, yet, as far as I can tell. Compare to JS SDK:
///
/// - [type definition](https://github.com/Azure/azure-sdk-for-js/blob/7f1cb9af148b7ed7331107a3e3cffb37e8ef9409/sdk/monitor/monitor-opentelemetry-exporter/src/types.ts#L21-L28)
/// - [usage](https://github.com/Azure/azure-sdk-for-js/blob/c66cad23c4b803719db65cb48a453b0adc13307b/sdk/monitor/monitor-opentelemetry-exporter/src/utils/spanUtils.ts#L149-L155)
#[derive(Debug, Serialize)]
pub(crate) struct MsLink {
/// Operation ID.
#[serde(rename = "operation_Id")]
pub(crate) operation_id: String,

/// ID.
pub(crate) id: String,
/// Serializes span links into the JSON array string stored under [`MS_LINKS_KEY`].
///
/// Each link is written as `{"operation_Id":"<trace id>","id":"<span id>"}`. At most
/// [`MS_LINKS_MAX_LEN`] links are written; any links beyond that are dropped.
///
/// An empty slice serializes to `[]`.
pub(crate) fn serialize_ms_links(links: &[Link]) -> String {
    let count = links.len().min(MS_LINKS_MAX_LEN);
    // 75 characters per link plus a separating comma, plus the two brackets.
    let mut res = String::with_capacity(count * 76 + 2);
    res.push('[');
    for (index, link) in links.iter().take(MS_LINKS_MAX_LEN).enumerate() {
        // Write the separator before every element but the first. Popping a trailing comma
        // afterwards would remove the opening bracket when there are no links at all.
        if index > 0 {
            res.push(',');
        }
        res.push_str(r#"{"operation_Id":""#);
        res.push_str(&link.span_context.trace_id().to_string());
        res.push_str(r#"","id":""#);
        res.push_str(&link.span_context.span_id().to_string());
        res.push_str(r#""}"#);
    }
    res.push(']');
    res
}
72 changes: 50 additions & 22 deletions tests/http_requests.rs
Original file line number Diff line number Diff line change
@@ -9,8 +9,8 @@
use format::requests_to_string;
use opentelemetry::{
trace::{
get_active_span, mark_span_as_active, Span, SpanKind, Status, TraceContextExt, Tracer,
TracerProvider,
get_active_span, mark_span_as_active, Link, Span, SpanKind, Status, TraceContextExt,
Tracer, TracerProvider,
},
Context, KeyValue,
};
@@ -118,6 +118,12 @@ fn traces_simple() {
);
let error: Box<dyn std::error::Error> = "An error".into();
span.record_error(error.as_ref());
let async_op_builder = server_tracer
.span_builder("async operation")
.with_links(vec![Link::new(span.span_context().clone(), Vec::new())]);
let async_op_context = Context::new();
let _span =
server_tracer.build_with_context(async_op_builder, &async_op_context);
});
}

@@ -309,6 +315,8 @@ mod tick {
}

mod format {
use std::sync::OnceLock;

use flate2::read::GzDecoder;
use http::{HeaderName, Request};
use regex::Regex;
@@ -348,26 +356,46 @@ mod format {
}

/// Replaces values that differ between test runs with the literal `STRIPPED`.
///
/// The patterns cover timestamps, durations, span/operation/stream ids and the processor and
/// memory counter values, so that the formatted request bodies can be compared against stored
/// snapshots.
fn strip_changing_values(body: &str) -> String {
// NOTE(review): the `let res = vec![...]` statement and the `res.into_iter().fold(...)`
// expression below look like the previous implementation, shown as removed lines in this diff
// view; the `Strip`-based code further down appears to be its replacement. Confirm against the
// repository before relying on either half.
let res = vec![
Regex::new(r#""(?P<field>time)": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z""#)
.unwrap(),
Regex::new(r#""(?P<field>duration)": "\d+\.\d{2}:\d{2}:\d{2}\.\d{6}""#).unwrap(),
Regex::new(r#""(?P<field>id|ai\.operation\.parentId)": "[a-z0-9]{16}""#).unwrap(),
Regex::new(r#""(?P<field>ai\.operation\.id)": "[a-z0-9]{32}""#).unwrap(),
Regex::new(r#""(?P<field>StreamId)": "[a-z0-9]{32}""#).unwrap(),
Regex::new(r#""(?P<field>Timestamp)": "/Date\(\d+\)/""#).unwrap(),
Regex::new(
r#"(?P<prefix>"\\\\Processor\(_Total\)\\\\% Processor Time",\s*)"(?P<field>Value)": \d+\.\d+"#,
).unwrap(),
Regex::new(
r#"(?P<prefix>"\\\\Memory\\\\Committed Bytes",\s*)"(?P<field>Value)": \d+\.\d+"#,
).unwrap(),
];

res.into_iter().fold(body.into(), |body, re| {
re.replace_all(&body, r#"$prefix"$field": "STRIPPED""#)
.into()
})
// Pairs a compiled regex with the replacement template applied to each of its matches.
struct Strip {
re: Regex,
replacement: &'static str,
}
impl Strip {
// Compiles `re` (panicking on an invalid pattern) and selects the default replacement, which
// re-emits the matched field name followed by `"STRIPPED"`.
// NOTE(review): most patterns define no `prefix` group; per the regex crate docs an unknown
// group reference in the replacement expands to an empty string. Confirm.
fn new(re: &str) -> Self {
Self {
re: Regex::new(re).unwrap(),
replacement: r#"$prefix"$field": "STRIPPED""#,
}
}

// Switches to a replacement with backslash-escaped quotes and no space after the colon. It is
// used for the `operation_Id` and `id` patterns below, whose matches sit inside a JSON
// document that is itself embedded as a JSON string.
fn json_in_json(mut self) -> Self {
self.replacement = r#"$prefix\"$field\":\"STRIPPED\""#;
self
}

// Returns `s` with every match of the regex replaced by the configured replacement.
fn strip(&self, s: &str) -> String {
self.re.replace_all(s, self.replacement).into()
}
}
// Initialized on first use and reused afterwards, so the regexes are compiled only once.
static STRIP_CONFIGS: OnceLock<Vec<Strip>> = OnceLock::new();
let configs = STRIP_CONFIGS.get_or_init(|| {
vec![
Strip::new(r#""(?P<field>time)": "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z""#),
Strip::new(r#""(?P<field>duration)": "\d+\.\d{2}:\d{2}:\d{2}\.\d{6}""#),
Strip::new(r#""(?P<field>id|ai\.operation\.parentId)": "[a-f0-9]{16}""#),
Strip::new(r#""(?P<field>ai\.operation\.id)": "[a-f0-9]{32}""#),
Strip::new(r#""(?P<field>StreamId)": "[a-f0-9]{32}""#),
Strip::new(r#"\\"(?P<field>operation_Id)\\":\\"[a-f0-9]{32}\\""#).json_in_json(),
Strip::new(r#"\\"(?P<field>id)\\":\\"[a-f0-9]{16}\\""#).json_in_json(),
Strip::new(r#""(?P<field>Timestamp)": "/Date\(\d+\)/""#),
Strip::new(r#"(?P<prefix>"\\\\Processor\(_Total\)\\\\% Processor Time",\s*)"(?P<field>Value)": \d+\.\d+"#),
Strip::new(r#"(?P<prefix>"\\\\Memory\\\\Committed Bytes",\s*)"(?P<field>Value)": \d+\.\d+"#),
]
});

// Apply every strip config in order, feeding the output of one into the next.
configs
.iter()
.fold(body.into(), |body, config| config.strip(&body))
}

fn pretty_print_json(body: &[u8]) -> String {
35 changes: 35 additions & 0 deletions tests/snapshots/http_requests__traces_simple.snap
Original file line number Diff line number Diff line change
@@ -7,6 +7,41 @@ host: dc.services.visualstudio.com
content-type: application/json
content-encoding: gzip

[
{
"data": {
"baseData": {
"duration": "STRIPPED",
"id": "STRIPPED",
"name": "async operation",
"properties": {
"_MS.links": "[{\"operation_Id\":\"STRIPPED\",\"id\":\"STRIPPED\"}]",
"service.name": "server",
"service.namespace": "test"
},
"resultCode": "0",
"type": "InProc",
"ver": 2
},
"baseType": "RemoteDependencyData"
},
"iKey": "0fdcec70-0ce5-4085-89d9-9ae8ead9af66",
"name": "Microsoft.ApplicationInsights.RemoteDependency",
"sampleRate": 100.0,
"tags": {
"ai.cloud.role": "test.server",
"ai.operation.id": "STRIPPED"
},
"time": "STRIPPED"
}
]


POST /v2/track HTTP/1.1
host: dc.services.visualstudio.com
content-type: application/json
content-encoding: gzip

[
{
"data": {

0 comments on commit 3c604ce

Please sign in to comment.