Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Links #73

Merged
merged 4 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Set tags Cloud role and Cloud role instance also from resource attributes `k8s.{deployment,replicaset,statefulset,job,cronjob,daemonset,pod}.name`. This matches [the behavior of the JS exporter](https://github.com/Azure/azure-sdk-for-js/blob/c66cad23c4b803719db65cb48a453b0adc13307b/sdk/monitor/monitor-opentelemetry-exporter/src/utils/common.ts#L75-L138).
- Remove option to configure temporality seletor (`with_temporality_selector`). Application Insights supports delta temporality only as defined in the [spec](https://github.com/open-telemetry/opentelemetry-specification/blob/58bfe48eabe887545198d66c43f44071b822373f/specification/metrics/sdk_exporters/otlp.md?plain=1#L46-L47).
- Add support for `ExponentialHistogram` export.
- Add support for span link export.

## [0.31.0] - 2024-05-09

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ opentelemetry-http = { version = "0.11", features = ["reqwest"] }
opentelemetry-application-insights = { path = ".", features = ["live-metrics"] }
rand = "0.8.5"
regex = "1.5.5"
reqwest = { version = "0.11", default-features = false, features = ["blocking"] }
reqwest = { version = "0.11", features = ["blocking"] }
test-case = "3.0.0"
tokio = { version = "1.17.0", features = ["rt-multi-thread", "macros", "process", "time"] }
version-sync = { version = "0.9.4", default-features = false, features = ["html_root_url_updated", "contains_regex"] }
Expand Down
11 changes: 10 additions & 1 deletion examples/attributes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use opentelemetry::{
trace::{
get_active_span, mark_span_as_active, SpanKind, TraceContextExt, Tracer, TracerProvider,
get_active_span, mark_span_as_active, Link, SpanKind, TraceContextExt, Tracer,
TracerProvider,
},
Context, KeyValue,
};
Expand Down Expand Up @@ -110,6 +111,14 @@ fn main() {
log();
custom();
exception();

get_active_span(|span| {
let async_op_builder = server_tracer
.span_builder("async operation")
.with_links(vec![Link::new(span.span_context().clone(), Vec::new())]);
let async_op_context = Context::new();
let _span = server_tracer.build_with_context(async_op_builder, &async_op_context);
})
}
}
}
87 changes: 82 additions & 5 deletions src/convert.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
use crate::models::{Properties, SeverityLevel};
use crate::models::{serialize_ms_links, Properties, SeverityLevel, MS_LINKS_KEY};
use chrono::{DateTime, SecondsFormat, Utc};
use opentelemetry::{trace::Status, KeyValue, Value};
use opentelemetry::{
trace::{Link, Status},
KeyValue, Value,
};
use opentelemetry_sdk::Resource;
use std::time::{Duration, SystemTime};
use std::{
collections::HashMap,
time::{Duration, SystemTime},
};

pub(crate) fn duration_to_string(duration: Duration) -> String {
let micros = duration.as_micros();
Expand All @@ -24,14 +30,37 @@ pub(crate) fn time_to_string(time: SystemTime) -> String {
pub(crate) fn attrs_to_properties(
attributes: &[KeyValue],
resource: &Resource,
links: &[Link],
) -> Option<Properties> {
let properties = attributes
let mut properties: Properties = attributes
.iter()
.filter(|kv| !kv.key.as_str().starts_with("_MS."))
.map(|kv| ((&kv.key).into(), (&kv.value).into()))
.chain(resource.iter().map(|(k, v)| (k.into(), v.into())))
.collect();

Some(properties).filter(|x: &Properties| !x.is_empty())
if !links.is_empty() {
properties.insert(MS_LINKS_KEY.into(), serialize_ms_links(links).into());
}

Some(properties).filter(|x| !x.is_empty())
}

pub(crate) fn attrs_to_map(attributes: &[KeyValue]) -> HashMap<&str, &Value> {
attributes
.iter()
.map(|kv| (kv.key.as_str(), &kv.value))
.collect()
}

pub(crate) fn attrs_map_to_properties(attributes: HashMap<&str, &Value>) -> Option<Properties> {
let properties: Properties = attributes
.iter()
.filter(|(&k, _)| !k.starts_with("_MS."))
.map(|(&k, &v)| (k.into(), v.into()))
.collect();

Some(properties).filter(|x| !x.is_empty())
}

pub(crate) fn status_to_result_code(status: &Status) -> i32 {
Expand Down Expand Up @@ -61,10 +90,58 @@ pub(crate) fn value_to_severity_level(value: &Value) -> Option<SeverityLevel> {
#[cfg(test)]
mod tests {
use super::*;
use crate::models::MS_LINKS_MAX_LEN;
use opentelemetry::trace::SpanContext;
use test_case::test_case;

#[test_case(Duration::from_micros(123456789123), "1.10:17:36.789123" ; "all")]
fn duration(duration: Duration, expected: &'static str) {
assert_eq!(expected.to_string(), duration_to_string(duration));
}

#[test]
fn attrs_to_properties_filters_ms() {
let attrs = vec![KeyValue::new("a", "b"), KeyValue::new("_MS.a", "b")];
let props = attrs_to_properties(&attrs, &Resource::empty(), &[]).unwrap();
assert_eq!(props.len(), 1);
assert_eq!(props.get(&"a".into()).unwrap().as_ref(), "b");
}

#[test]
fn attrs_to_properties_encodes_links() {
let links = vec![Link::new(SpanContext::empty_context(), Vec::new())];
let props = attrs_to_properties(&[], &Resource::empty(), &links).unwrap();
assert_eq!(props.len(), 1);
assert_eq!(
props.get(&"_MS.links".into()).unwrap().as_ref(),
"[{\"operation_Id\":\"00000000000000000000000000000000\",\"id\":\"0000000000000000\"}]"
);
}

#[test]
fn attrs_to_properties_encodes_many_links() {
let input_len = MS_LINKS_MAX_LEN + 10;
let mut links = Vec::with_capacity(input_len);
for _ in 0..input_len {
links.push(Link::new(SpanContext::empty_context(), Vec::new()));
}
let props = attrs_to_properties(&[], &Resource::empty(), &links).unwrap();
assert_eq!(props.len(), 1);
let encoded_links = props.get(&"_MS.links".into()).unwrap();
let deserialized: serde_json::Value = serde_json::from_str(encoded_links.as_ref()).unwrap();
match deserialized {
serde_json::Value::Array(arr) => assert_eq!(arr.len(), MS_LINKS_MAX_LEN),
_ => panic!("Expected links to be serialized as JSON array"),
}
}

#[test]
fn attrs_map_to_properties_filters_ms() {
let attrs = vec![KeyValue::new("a", "b"), KeyValue::new("_MS.a", "b")];
let attrs_map = attrs_to_map(&attrs);
assert_eq!(attrs_map.len(), 2);
let props = attrs_map_to_properties(attrs_map).unwrap();
assert_eq!(props.len(), 1);
assert_eq!(props.get(&"a".into()), Some(&"b".into()));
}
}
2 changes: 2 additions & 0 deletions src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod exception_details;
mod message_data;
#[cfg(feature = "metrics")]
mod metric_data;
mod ms_link;
#[cfg(feature = "live-metrics")]
mod quick_pulse;
mod remote_dependency_data;
Expand All @@ -26,6 +27,7 @@ pub(crate) use exception_details::*;
pub(crate) use message_data::*;
#[cfg(feature = "metrics")]
pub(crate) use metric_data::*;
pub(crate) use ms_link::*;
#[cfg(feature = "live-metrics")]
pub(crate) use quick_pulse::*;
pub(crate) use remote_dependency_data::*;
Expand Down
47 changes: 47 additions & 0 deletions src/models/ms_link.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//! Serialization for span links.
//!
//! Application Insights supports receiving span links as JSON in the property `_MS.links`. This
//! does not appear in swagger API definition, yet, as far as I can tell. Compare with the
//! different SDKs:
//!
//! - [type definition in JS exporter](https://github.com/Azure/azure-sdk-for-js/blob/7f1cb9af148b7ed7331107a3e3cffb37e8ef9409/sdk/monitor/monitor-opentelemetry-exporter/src/types.ts#L21-L28)
//! - [serialization in JS exporter](https://github.com/Azure/azure-sdk-for-js/blob/c66cad23c4b803719db65cb48a453b0adc13307b/sdk/monitor/monitor-opentelemetry-exporter/src/utils/spanUtils.ts#L149-L155)
//! - [serialization in Python exporter](https://github.com/Azure/azure-sdk-for-python/blob/aa3a4b32e4d27f15ffd6429cefacce67f5776162/sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/trace/_exporter.py#L517-L527)

use opentelemetry::trace::Link;

pub(crate) const MS_LINKS_KEY: &str = "_MS.links";

/// Maximum number of links that fit into the property.
///
/// Links are serialized as a JSON array, e.g.
///
/// ```json
/// [{"operation_Id":"77225ad66928295345ea7c9b0a97682e","id":"7c29182f74d01363"}]
/// ```
///
/// Each link is a fixed length of 75 (plus 1 for the comma between links). Property values can be
/// a maximum of 8192 characters. Therefore the maximum number of links is:
///
/// ```plain
/// (8192 - 2) / 76 = 107.76...
/// ```
pub(crate) const MS_LINKS_MAX_LEN: usize = 107;

pub(crate) fn serialize_ms_links(links: &[Link]) -> String {
let count = links.len().min(MS_LINKS_MAX_LEN);
let mut res = String::with_capacity(count * 76 + 2);
res.push('[');
for link in links.iter().take(MS_LINKS_MAX_LEN) {
res.push_str(r#"{"operation_Id":""#);
res.push_str(&link.span_context.trace_id().to_string());
res.push_str(r#"","id":""#);
res.push_str(&link.span_context.span_id().to_string());
res.push_str(r#""},"#);
}
if count > 0 {
res.pop().expect("can remove trailing comma");
}
res.push(']');
res
}
53 changes: 11 additions & 42 deletions src/trace.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use crate::{
convert::{
attrs_to_properties, duration_to_string, status_to_result_code, time_to_string,
value_to_severity_level,
attrs_map_to_properties, attrs_to_map, attrs_to_properties, duration_to_string,
status_to_result_code, time_to_string, value_to_severity_level,
},
models::{
context_tag_keys::attrs::CUSTOM_EVENT_NAME, Data, Envelope, EventData, ExceptionData,
ExceptionDetails, LimitedLenString, MessageData, Properties, RemoteDependencyData,
RequestData,
ExceptionDetails, LimitedLenString, MessageData, RemoteDependencyData, RequestData,
},
tags::{get_tags_for_event, get_tags_for_span},
Exporter,
Expand Down Expand Up @@ -199,7 +198,7 @@ impl From<&SpanData> for RequestData {
success: is_request_success(span),
source: None,
url: None,
properties: attrs_to_properties(&span.attributes, &span.resource),
properties: attrs_to_properties(&span.attributes, &span.resource, &span.links.links),
};

let attrs: HashMap<&str, &Value> = span
Expand Down Expand Up @@ -287,7 +286,7 @@ impl From<&SpanData> for RemoteDependencyData {
data: None,
target: None,
type_: None,
properties: attrs_to_properties(&span.attributes, &span.resource),
properties: attrs_to_properties(&span.attributes, &span.resource, &span.links.links),
};

let attrs: HashMap<&str, &Value> = span
Expand Down Expand Up @@ -389,11 +388,7 @@ impl From<&SpanData> for RemoteDependencyData {

impl From<&Event> for ExceptionData {
fn from(event: &Event) -> ExceptionData {
let mut attrs: HashMap<&str, &Value> = event
.attributes
.iter()
.map(|kv| (kv.key.as_str(), &kv.value))
.collect();
let mut attrs = attrs_to_map(&event.attributes);
let exception = ExceptionDetails {
type_name: attrs
.remove(semcov::trace::EXCEPTION_TYPE)
Expand All @@ -410,37 +405,21 @@ impl From<&Event> for ExceptionData {
ExceptionData {
ver: 2,
exceptions: vec![exception],
properties: Some(
attrs
.iter()
.map(|(k, v)| ((*k).into(), (*v).into()))
.collect(),
)
.filter(|x: &Properties| !x.is_empty()),
properties: attrs_map_to_properties(attrs),
}
}
}

impl From<&Event> for EventData {
fn from(event: &Event) -> EventData {
let mut attrs: HashMap<&str, &Value> = event
.attributes
.iter()
.map(|kv| (kv.key.as_str(), &kv.value))
.collect();
let mut attrs = attrs_to_map(&event.attributes);
EventData {
ver: 2,
name: attrs
.remove(CUSTOM_EVENT_NAME)
.map(Into::into)
.unwrap_or_else(|| "<no name>".into()),
properties: Some(
attrs
.iter()
.map(|(k, v)| ((*k).into(), (*v).into()))
.collect(),
)
.filter(|x: &Properties| !x.is_empty()),
properties: attrs_map_to_properties(attrs),
}
}
}
Expand All @@ -452,11 +431,7 @@ const LEVEL: &str = "level";

impl From<&Event> for MessageData {
fn from(event: &Event) -> MessageData {
let mut attrs: HashMap<&str, &Value> = event
.attributes
.iter()
.map(|kv| (kv.key.as_str(), &kv.value))
.collect();
let mut attrs = attrs_to_map(&event.attributes);
let severity_level = attrs.get(LEVEL).and_then(|x| value_to_severity_level(x));
if severity_level.is_some() {
attrs.remove(LEVEL);
Expand All @@ -469,13 +444,7 @@ impl From<&Event> for MessageData {
} else {
event.name.clone().into_owned().into()
},
properties: Some(
attrs
.iter()
.map(|(k, v)| ((*k).into(), (*v).into()))
.collect(),
)
.filter(|x: &Properties| !x.is_empty()),
properties: attrs_map_to_properties(attrs),
}
}
}
Loading
Loading