Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
stub out metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Jan 22, 2024
1 parent 415341c commit dcd13af
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl<
where
TCtxt::Frame: Send + 'static,
{
#[must_use = "call `blocking_flush` at the end of `main` to ensure events are flushed."]
#[must_use = "call `blocking_flush` at the end of `main` (after flushing the main runtime) to ensure events are flushed."]
pub fn init_internal(self) -> Init<&'static TEmitter, &'static TCtxt> {
let ambient = emit_core::runtime::internal_slot()
.init(
Expand Down
133 changes: 128 additions & 5 deletions targets/otlp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use emit_batcher::BatchError;
use std::{fmt, sync::Arc, time::Duration};

use crate::{
data::{self, default_message_formatter, logs, traces, PreEncoded},
data::{self, default_message_formatter, logs, metrics, traces, PreEncoded},
Error,
};

Expand All @@ -13,11 +13,18 @@ mod http;
pub struct OtlpClient {
logs: Option<logs::EventEncoder>,
traces: Option<traces::EventEncoder>,
metrics: Option<metrics::EventEncoder>,
sender: emit_batcher::Sender<Channel<PreEncoded>>,
}

impl emit::emitter::Emitter for OtlpClient {
fn emit<P: emit::props::Props>(&self, evt: &emit::event::Event<P>) {
if let Some(ref encoder) = self.metrics {
if let Some(encoded) = encoder.encode_event(evt) {
return self.sender.send(ChannelItem::Metric(encoded));
}
}

if let Some(ref encoder) = self.traces {
if let Some(encoded) = encoder.encode_event(evt) {
return self.sender.send(ChannelItem::Span(encoded));
Expand All @@ -38,11 +45,13 @@ impl emit::emitter::Emitter for OtlpClient {
struct Channel<T> {
logs: Vec<T>,
traces: Vec<T>,
metrics: Vec<T>,
}

enum ChannelItem<T> {
LogRecord(T),
Span(T),
Metric(T),
}

impl<T> emit_batcher::Channel for Channel<T> {
Expand All @@ -52,23 +61,38 @@ impl<T> emit_batcher::Channel for Channel<T> {
Channel {
logs: Vec::new(),
traces: Vec::new(),
metrics: Vec::new(),
}
}

fn push(&mut self, item: Self::Item) {
match item {
ChannelItem::LogRecord(item) => self.logs.push(item),
ChannelItem::Span(item) => self.traces.push(item),
ChannelItem::Metric(item) => self.metrics.push(item),
}
}

fn remaining(&self) -> usize {
self.logs.len() + self.traces.len()
let Channel {
logs,
traces,
metrics,
} = self;

logs.len() + traces.len() + metrics.len()
}

fn clear(&mut self) {
self.logs.clear();
self.traces.clear();
let Channel {
logs,
traces,
metrics,
} = self;

logs.clear();
traces.clear();
metrics.clear();
}
}

Expand All @@ -78,6 +102,7 @@ pub struct OtlpClientBuilder {
encoding: Encoding,
logs: Option<OtlpLogsBuilder>,
traces: Option<OtlpTracesBuilder>,
metrics: Option<OtlpMetricsBuilder>,
}

pub struct OtlpLogsBuilder {
Expand Down Expand Up @@ -112,6 +137,22 @@ impl OtlpTracesBuilder {
}
}

pub struct OtlpMetricsBuilder {
encoder: metrics::EventEncoder,
transport: Transport,
}

impl OtlpMetricsBuilder {
pub fn http(dst: impl Into<String>) -> Self {
OtlpMetricsBuilder {
encoder: metrics::EventEncoder {
name: default_message_formatter(),
},
transport: Transport::Http { url: dst.into() },
}
}
}

enum Encoding {
Proto,
}
Expand All @@ -128,6 +169,7 @@ impl OtlpClientBuilder {
scope: None,
logs: None,
traces: None,
metrics: None,
}
}

Expand All @@ -149,6 +191,15 @@ impl OtlpClientBuilder {
self
}

pub fn metrics_http(self, dst: impl Into<String>) -> Self {
self.metrics(OtlpMetricsBuilder::http(dst))
}

pub fn metrics(mut self, builder: OtlpMetricsBuilder) -> Self {
self.metrics = Some(builder);
self
}

pub fn resource(mut self, attributes: impl emit::props::Props) -> Self {
match self.encoding {
Encoding::Proto => {
Expand Down Expand Up @@ -186,6 +237,7 @@ impl OtlpClientBuilder {

let mut logs = None;
let mut traces = None;
let mut metrics = None;

let client = OtlpSender {
logs: match self.logs {
Expand Down Expand Up @@ -216,13 +268,31 @@ impl OtlpClientBuilder {
}
None => None,
},
metrics: match self.metrics {
Some(OtlpMetricsBuilder {
encoder,
transport: Transport::Http { url },
}) => {
metrics = Some(encoder);
Some(Arc::new(RawClient::Http {
http: HttpConnection::new(&url)?,
resource: self.resource.clone(),
scope: self.scope.clone(),
}))
}
None => None,
},
};

emit_batcher::tokio::spawn(receiver, move |batch: Channel<PreEncoded>| {
let client = client.clone();

async move {
let Channel { logs, traces } = batch;
let Channel {
logs,
traces,
metrics,
} = batch;

let mut r = Ok(());

Expand All @@ -247,6 +317,7 @@ impl OtlpClientBuilder {
r = Err(e.map(|logs| Channel {
logs,
traces: Vec::new(),
metrics: Vec::new(),
}));
}
}
Expand Down Expand Up @@ -278,6 +349,40 @@ impl OtlpClientBuilder {
Err(e.map(|traces| Channel {
traces,
logs: Vec::new(),
metrics: Vec::new(),
}))
};
}
}

if let Some(client) = client.metrics {
if let Err(e) = client
.send(metrics, metrics::encode_request, {
#[cfg(feature = "decode_responses")]
{
if emit::runtime::internal_slot().is_enabled() {
Some(metrics::decode_response)
} else {
None
}
}
#[cfg(not(feature = "decode_responses"))]
{
None::<fn(Result<&[u8], &[u8]>)>
}
})
.await
{
r = if let Err(re) = r {
Err(re.map(|mut channel| {
channel.metrics = e.into_retryable();
channel
}))
} else {
Err(e.map(|metrics| Channel {
metrics,
logs: Vec::new(),
traces: Vec::new(),
}))
};
}
Expand All @@ -290,6 +395,7 @@ impl OtlpClientBuilder {
Ok(OtlpClient {
logs,
traces,
metrics,
sender,
})
}
Expand Down Expand Up @@ -327,11 +433,28 @@ impl OtlpTracesBuilder {
}
}

impl OtlpMetricsBuilder {
pub fn name(
mut self,
writer: impl Fn(
&emit::event::Event<&dyn emit::props::ErasedProps>,
&mut fmt::Formatter,
) -> fmt::Result
+ Send
+ Sync
+ 'static,
) -> Self {
self.encoder.name = Box::new(writer);
self
}
}

#[derive(Clone)]
pub struct OtlpSender {
// TODO: Share the client
logs: Option<Arc<RawClient>>,
traces: Option<Arc<RawClient>>,
metrics: Option<Arc<RawClient>>,
}

enum RawClient {
Expand Down
1 change: 1 addition & 0 deletions targets/otlp/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use sval_derive::Value;
use sval_protobuf::buf::{ProtoBuf, ProtoBufCursor};

pub mod logs;
pub mod metrics;
pub mod traces;

mod any_value;
Expand Down
12 changes: 12 additions & 0 deletions targets/otlp/src/data/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ pub(crate) mod trace {
pub(crate) mod v1;
}

#[path = ""]
pub(crate) mod metrics {
#[path = "./generated/opentelemetry.proto.metrics.v1.rs"]
pub(crate) mod v1;
}

#[path = ""]
pub(crate) mod common {
#[path = "./generated/opentelemetry.proto.common.v1.rs"]
Expand All @@ -35,6 +41,12 @@ pub(crate) mod collector {
#[path = "./generated/opentelemetry.proto.collector.trace.v1.rs"]
pub(crate) mod v1;
}

#[path = ""]
pub(crate) mod metrics {
#[path = "./generated/opentelemetry.proto.collector.metrics.v1.rs"]
pub(crate) mod v1;
}
}

#[cfg(feature = "grpc")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ pub struct ExportMetricsServiceRequest {
/// data from multiple origins typically batch the data before forwarding further and
/// in that case this array will contain multiple elements.
#[prost(message, repeated, tag = "1")]
pub resource_metrics: ::prost::alloc::vec::Vec<
super::super::super::metrics::v1::ResourceMetrics,
>,
pub resource_metrics:
::prost::alloc::vec::Vec<super::super::super::metrics::v1::ResourceMetrics>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,9 +587,7 @@ pub struct Exemplar {
/// recorded alongside the original measurement. Only key/value pairs that were
/// filtered out by the aggregator should be included
#[prost(message, repeated, tag = "7")]
pub filtered_attributes: ::prost::alloc::vec::Vec<
super::super::common::v1::KeyValue,
>,
pub filtered_attributes: ::prost::alloc::vec::Vec<super::super::common::v1::KeyValue>,
/// time_unix_nano is the exact time when this exemplar was recorded
///
/// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January
Expand Down Expand Up @@ -744,9 +742,7 @@ impl DataPointFlags {
pub fn as_str_name(&self) -> &'static str {
match self {
DataPointFlags::DoNotUse => "DATA_POINT_FLAGS_DO_NOT_USE",
DataPointFlags::NoRecordedValueMask => {
"DATA_POINT_FLAGS_NO_RECORDED_VALUE_MASK"
}
DataPointFlags::NoRecordedValueMask => "DATA_POINT_FLAGS_NO_RECORDED_VALUE_MASK",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
Expand Down
48 changes: 48 additions & 0 deletions targets/otlp/src/data/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use emit_batcher::BatchError;

use super::{AnyValue, MessageFormatter, MessageRenderer, PreEncoded};

pub(crate) struct EventEncoder {
pub name: Box<MessageFormatter>,
}

impl EventEncoder {
pub(crate) fn encode_event(
&self,
evt: &emit::event::Event<impl emit::props::Props>,
) -> Option<PreEncoded> {
todo!()
}
}

pub(crate) fn encode_request(
resource: Option<&PreEncoded>,
scope: Option<&PreEncoded>,
log_records: &[PreEncoded],
) -> Result<PreEncoded, BatchError<Vec<PreEncoded>>> {
todo!()
}

#[cfg(feature = "decode_responses")]
pub(crate) fn decode_response(body: Result<&[u8], &[u8]>) {
use prost::Message;

match body {
Ok(body) => {
let response =
crate::data::generated::collector::metrics::v1::ExportMetricsServiceResponse::decode(
body,
)
.unwrap();

emit::debug!(rt: emit::runtime::internal(), "received {#[emit::as_debug] response}");
}
Err(body) => {
let response =
crate::data::generated::collector::metrics::v1::ExportMetricsPartialSuccess::decode(body)
.unwrap();

emit::warn!(rt: emit::runtime::internal(), "received {#[emit::as_debug] response}");
}
}
}
4 changes: 4 additions & 0 deletions targets/otlp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ pub fn logs_http(dst: impl Into<String>) -> OtlpLogsBuilder {
pub fn traces_http(dst: impl Into<String>) -> OtlpTracesBuilder {
OtlpTracesBuilder::http(dst)
}

pub fn metrics_http(dst: impl Into<String>) -> OtlpMetricsBuilder {
OtlpMetricsBuilder::http(dst)
}
Loading

0 comments on commit dcd13af

Please sign in to comment.