Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
throw some debugging aids in
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Sep 29, 2023
1 parent 4956e65 commit 7c23827
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 56 deletions.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ exclude = [
"tests/smoke-test"
]

[patch.crates-io.sval]
git = "https://github.com/sval-rs/sval"

[patch.crates-io.sval_flatten]
git = "https://github.com/sval-rs/sval"

[package]
name = "emit"
version = "0.0.0"
Expand Down
4 changes: 4 additions & 0 deletions targets/otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ features = ["std", "flatten"]
[dependencies.sval_protobuf]
git = "https://github.com/KodrAus/sval_protobuf"

[dependencies.sval_json]
version = "2.9"
features = ["std"]

[dependencies.tokio]
version = "1"
features = ["rt-multi-thread", "sync"]
Expand Down
40 changes: 16 additions & 24 deletions targets/otlp/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use emit_batcher::BatchError;
use std::{future::Future, ops::ControlFlow, sync::Arc, time::Duration};
use sval_protobuf::buf::ProtoBuf;
use std::{future::Future, sync::Arc, time::Duration};

use crate::data::PreEncoded;
use crate::data::{PreEncoded, self};

pub(super) struct OtlpClient<T> {
sender: emit_batcher::Sender<T>,
Expand All @@ -13,8 +14,8 @@ pub(super) struct OtlpClientBuilder {

enum Destination {
HttpProto {
resource: Option<PreEncoded>,
scope: Option<PreEncoded>,
resource: Option<ProtoBuf>,
scope: Option<ProtoBuf>,
url: String,
},
}
Expand All @@ -40,24 +41,15 @@ impl OtlpClientBuilder {
}
}

pub fn resource(mut self, resource: impl emit_core::props::Props) -> Self {
/*
let mut attributes = Vec::new();
resource.for_each(|k, v| {
let key = k.to_string();
let value = value::to_value(v);
attributes.push(KeyValue { key, value });
ControlFlow::Continue(())
});
self.resource = Some(Resource {
attributes,
dropped_attributes_count: 0,
});
*/
pub fn resource(mut self, attributes: impl emit_core::props::Props) -> Self {
match self.dst {
Destination::HttpProto { ref mut resource, .. } => {
*resource = Some(sval_protobuf::stream_to_protobuf(data::Resource {
attributes: data::PropsResourceAttributes(attributes),
dropped_attribute_count: 0,
}));
}
}

self
}
Expand All @@ -79,8 +71,8 @@ impl OtlpClientBuilder {
scope,
} => RawClient::HttpProto {
url,
resource,
scope,
resource: resource.map(PreEncoded::Proto),
scope: scope.map(PreEncoded::Proto),
client: reqwest::Client::new(),
},
}),
Expand Down
115 changes: 84 additions & 31 deletions targets/otlp/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,54 @@ pub struct ResourceLogs<'a, R, SL> {
}

#[derive(Value)]
pub struct Resource<'a> {
#[sval(index = 1)]
pub attributes: &'a [KeyValue<'a>],
pub struct Resource<A> {
#[sval(flatten)]
pub attributes: A,
#[sval(index = 2)]
pub dropped_attribute_count: u32,
}

#[derive(Value)]
pub struct InlineResourceAttributes<'a> {
#[sval(index = 1)]
pub attributes: &'a [KeyValue<'a>],
}

pub struct PropsResourceAttributes<P>(pub P);

impl<P: emit_core::props::Props> sval::Value for PropsResourceAttributes<P> {
fn stream<'sval, S: sval::Stream<'sval> + ?Sized>(&'sval self, stream: &mut S) -> sval::Result {
stream.record_tuple_begin(None, None, None, None)?;

stream.record_tuple_value_begin(None, &sval::Label::new("attributes").with_tag(&sval::tags::VALUE_IDENT), &sval::Index::new(1))?;
stream.seq_begin(None)?;

let mut seen = HashSet::new();
self.0.for_each(|k, v| {
if seen.insert(k.to_owned()) {
stream
.seq_value_begin()
.map(|_| ControlFlow::Continue(()))
.unwrap_or(ControlFlow::Break(()))?;
sval_ref::stream_ref(&mut *stream, EmitValue(v))
.map(|_| ControlFlow::Continue(()))
.unwrap_or(ControlFlow::Break(()))?;
stream
.seq_value_end()
.map(|_| ControlFlow::Continue(()))
.unwrap_or(ControlFlow::Break(()))?;
}

ControlFlow::Continue(())
});

stream.seq_end()?;
stream.record_tuple_value_end(None, &sval::Label::new("attributes").with_tag(&sval::tags::VALUE_IDENT), &sval::Index::new(1))?;

stream.record_tuple_end(None, None, None)
}
}

#[derive(Value)]
pub struct ScopeLogs<'a, IS, LR> {
#[sval(index = 1)]
Expand All @@ -43,17 +84,23 @@ pub struct ScopeLogs<'a, IS, LR> {
}

#[derive(Value)]
pub struct InstrumentationScope<'a> {
pub struct InstrumentationScope<'a, A> {
#[sval(index = 1)]
pub name: &'a str,
#[sval(index = 2)]
pub version: &'a str,
#[sval(index = 3)]
pub attributes: &'a [KeyValue<'a>],
#[sval(flatten)]
pub attributes: A,
#[sval(index = 4)]
pub dropped_attribute_count: u32,
}

#[derive(Value)]
pub struct InlineInstrumentationScopeAttributes<'a> {
#[sval(index = 1)]
pub attributes: &'a [KeyValue<'a>],
}

#[derive(Value)]
#[repr(i32)]
pub enum SeverityNumber {
Expand Down Expand Up @@ -92,19 +139,19 @@ const ANY_VALUE_BYTES_INDEX: sval::Index = sval::Index::new(7);
// TODO: Use the consts here
#[derive(Value)]
pub enum AnyValue<'a> {
#[sval(index = 1)]
#[sval(label = "stringValue", index = 1)]
String(&'a str),
#[sval(index = 2)]
#[sval(label = "boolValue", index = 2)]
Bool(bool),
#[sval(index = 3)]
#[sval(label = "intValue", index = 3)]
Int(i64),
#[sval(index = 4)]
#[sval(label = "doubleValue", index = 4)]
Double(f64),
#[sval(index = 5)]
#[sval(label = "arrayValue", index = 5)]
Array(ArrayValue<'a>),
#[sval(index = 6)]
#[sval(label = "kvlistValue", index = 6)]
Kvlist(KvList<'a>),
#[sval(index = 7)]
#[sval(label = "bytesValue", index = 7)]
Bytes(&'a sval::BinarySlice),
}

Expand Down Expand Up @@ -145,7 +192,7 @@ pub struct LogRecord<B, A> {
}

#[derive(Value)]
pub struct InlineAttributes<'a> {
pub struct InlineLogRecordAttributes<'a> {
#[sval(index = 2)]
pub severity_number: SeverityNumber,
#[sval(index = 3)]
Expand All @@ -158,15 +205,18 @@ pub struct InlineAttributes<'a> {
pub span_id: &'a sval::BinaryArray<8>,
}

pub struct PropsAttributes<P>(pub P);
pub struct PropsLogRecordAttributes<P>(pub P);

impl<P: emit_core::props::Props> sval::Value for PropsAttributes<P> {
impl<P: emit_core::props::Props> sval::Value for PropsLogRecordAttributes<P> {
fn stream<'sval, S: sval::Stream<'sval> + ?Sized>(&'sval self, stream: &mut S) -> sval::Result {
let mut trace_id = [0; 32];
let mut span_id = [0; 16];
let mut level = emit_core::level::Level::default();

stream.tuple_begin(None, None, None, None)?;
stream.record_tuple_begin(None, None, None, None)?;

stream.record_tuple_value_begin(None, &sval::Label::new("attributes").with_tag(&sval::tags::VALUE_IDENT), &sval::Index::new(6))?;
stream.seq_begin(None)?;

let mut seen = HashSet::new();
self.0.for_each(|k, v| {
Expand All @@ -189,14 +239,14 @@ impl<P: emit_core::props::Props> sval::Value for PropsAttributes<P> {
_ => {
if seen.insert(k.to_owned()) {
stream
.tuple_value_begin(None, &sval::Index::new(6))
.seq_value_begin()
.map(|_| ControlFlow::Continue(()))
.unwrap_or(ControlFlow::Break(()))?;
sval_ref::stream_ref(&mut *stream, EmitValue(v))
.map(|_| ControlFlow::Continue(()))
.unwrap_or(ControlFlow::Break(()))?;
stream
.tuple_value_end(None, &sval::Index::new(6))
.seq_value_end()
.map(|_| ControlFlow::Continue(()))
.unwrap_or(ControlFlow::Break(()))?;
}
Expand All @@ -206,44 +256,47 @@ impl<P: emit_core::props::Props> sval::Value for PropsAttributes<P> {
ControlFlow::Continue(())
});

stream.seq_end()?;
stream.record_tuple_value_end(None, &sval::Label::new("attributes").with_tag(&sval::tags::VALUE_IDENT), &sval::Index::new(6))?;

let severity_number = match level {
emit_core::level::Level::Debug => 1i32,
emit_core::level::Level::Info => 2i32,
emit_core::level::Level::Warn => 3i32,
emit_core::level::Level::Error => 4i32,
};

stream.tuple_value_begin(None, &sval::Index::new(2))?;
stream.record_tuple_value_begin(None, &sval::Label::new("severityNumber").with_tag(&sval::tags::VALUE_IDENT), &sval::Index::new(2))?;
stream.i32(severity_number)?;
stream.tuple_value_end(None, &sval::Index::new(2))?;
stream.record_tuple_value_end(None, &sval::Label::new("severityNumber").with_tag(&sval::tags::VALUE_IDENT),&sval::Index::new(2))?;

stream.tuple_value_begin(None, &sval::Index::new(3))?;
stream.record_tuple_value_begin(None, &sval::Label::new("severityText").with_tag(&sval::tags::VALUE_IDENT),&sval::Index::new(3))?;
sval::stream_display(&mut *stream, level)?;
stream.tuple_value_end(None, &sval::Index::new(3))?;
stream.record_tuple_value_end(None, &sval::Label::new("severityText").with_tag(&sval::tags::VALUE_IDENT),&sval::Index::new(3))?;

if trace_id != [0; 32] {
stream.tuple_value_begin(None, &sval::Index::new(9))?;
stream.record_tuple_value_begin(None, &sval::Label::new("traceId").with_tag(&sval::tags::VALUE_IDENT),&sval::Index::new(9))?;
stream.binary_begin(Some(32))?;
stream.binary_fragment_computed(&trace_id)?;
stream.binary_end()?;
stream.tuple_value_end(None, &sval::Index::new(9))?;
stream.record_tuple_value_end(None, &sval::Label::new("traceId").with_tag(&sval::tags::VALUE_IDENT),&sval::Index::new(9))?;
}

if span_id != [0; 16] {
stream.tuple_value_begin(None, &sval::Index::new(10))?;
stream.record_tuple_value_begin(None, &sval::Label::new("spanId").with_tag(&sval::tags::VALUE_IDENT),&sval::Index::new(10))?;
stream.binary_begin(Some(16))?;
stream.binary_fragment_computed(&span_id)?;
stream.binary_end()?;
stream.tuple_value_end(None, &sval::Index::new(10))?;
stream.record_tuple_value_end(None, &sval::Label::new("spanId").with_tag(&sval::tags::VALUE_IDENT),&sval::Index::new(10))?;
}

stream.tuple_end(None, None, None)
stream.record_tuple_end(None, None, None)
}
}

#[derive(Value)]
pub enum DisplayValue<D> {
#[sval(index = 1)]
#[sval(label = "stringValue", index = 1)]
String(D),
}

Expand Down Expand Up @@ -324,7 +377,7 @@ impl<'a> sval_ref::ValueRef<'a> for EmitValue<'a> {

fn binary_begin(&mut self, num_bytes: Option<usize>) -> sval::Result {
self.any_value_begin(&ANY_VALUE_BYTES_LABEL, &ANY_VALUE_BYTES_INDEX)?;
self.stream.text_begin(num_bytes)
self.stream.binary_begin(num_bytes)
}

fn binary_fragment(&mut self, fragment: &'sval [u8]) -> sval::Result {
Expand All @@ -336,7 +389,7 @@ impl<'a> sval_ref::ValueRef<'a> for EmitValue<'a> {
}

fn binary_end(&mut self) -> sval::Result {
self.stream.text_end()?;
self.stream.binary_end()?;
self.any_value_end(&ANY_VALUE_BYTES_LABEL, &ANY_VALUE_BYTES_INDEX)
}

Expand Down
47 changes: 46 additions & 1 deletion targets/otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,30 @@ impl emit_core::emitter::Emitter for OtlpLogsEmitter {

let observed_time_unix_nano = time_unix_nano;

println!("{}", sval_json::stream_to_string(data::LogRecord {
time_unix_nano,
observed_time_unix_nano,
body: Some(data::DisplayValue::String(sval::Display::new(evt.msg()))),
attributes: data::PropsLogRecordAttributes(evt.props()),
dropped_attributes_count: 0,
flags: Default::default(),
}).unwrap());

println!("{}", protoscope(&sval_protobuf::stream_to_protobuf(data::LogRecord {
time_unix_nano,
observed_time_unix_nano,
body: Some(data::DisplayValue::String(sval::Display::new(evt.msg()))),
attributes: data::PropsLogRecordAttributes(evt.props()),
dropped_attributes_count: 0,
flags: Default::default(),
}).to_vec()));

self.inner
.emit(sval_protobuf::stream_to_protobuf(data::LogRecord {
time_unix_nano,
observed_time_unix_nano,
body: Some(data::DisplayValue::String(sval::Display::new(evt.msg()))),
attributes: data::PropsAttributes(evt.props()),
attributes: data::PropsLogRecordAttributes(evt.props()),
dropped_attributes_count: 0,
flags: Default::default(),
}))
Expand All @@ -76,3 +94,30 @@ impl emit_core::emitter::Emitter for OtlpLogsEmitter {
self.inner.blocking_flush(timeout)
}
}

fn protoscope(encoded: &[u8]) -> String {
use std::{
io::{Read, Write},
process::{Command, Stdio},
};

let mut protoscope = Command::new("protoscope")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("failed to call protoscope");

let mut stdin = protoscope.stdin.take().expect("missing stdin");
stdin.write_all(encoded).expect("failed to write");
drop(stdin);

let mut buf = String::new();
protoscope
.stdout
.take()
.expect("missing stdout")
.read_to_string(&mut buf)
.expect("failed to read");

buf
}

0 comments on commit 7c23827

Please sign in to comment.