Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
finish off HTTP/JSON support
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Mar 28, 2024
1 parent 89fbed2 commit 89533cf
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 37 deletions.
4 changes: 4 additions & 0 deletions targets/otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ features = ["std", "flatten"]
version = "0.1"
features = ["bytes"]

[dependencies.sval_json]
version = "2"
features = ["std"]

[dependencies.sval_buffer]
version = "2"
features = ["std"]
Expand Down
139 changes: 106 additions & 33 deletions targets/otlp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use emit_batcher::BatchError;
use std::{collections::HashMap, fmt, sync::Arc, time::Duration};

use crate::{
data::{self, logs, metrics, traces, PreEncoded},
data::{self, logs, metrics, traces, PreEncoded, RawEncoder},
internal_metrics::InternalMetrics,
Error,
};
Expand Down Expand Up @@ -150,6 +150,7 @@ struct Scope {
#[derive(Debug, Clone, Copy)]
pub(crate) enum Encoding {
Proto,
Json,
}

pub struct OtlpLogsBuilder {
Expand All @@ -160,19 +161,31 @@ pub struct OtlpLogsBuilder {
}

impl OtlpLogsBuilder {
pub fn proto(transport: OtlpTransportBuilder) -> Self {
fn new(encoding: Encoding, transport: OtlpTransportBuilder) -> Self {
OtlpLogsBuilder {
event_encoder: logs::LogsEventEncoder::default(),
request_encoder: logs::LogsRequestEncoder::default(),
encoding: Encoding::Proto,
encoding,
transport,
}
}

pub fn proto(transport: OtlpTransportBuilder) -> Self {
Self::new(Encoding::Proto, transport)
}

pub fn http_proto(dst: impl Into<String>) -> Self {
Self::proto(OtlpTransportBuilder::http(dst))
}

pub fn json(transport: OtlpTransportBuilder) -> Self {
Self::new(Encoding::Json, transport)
}

pub fn http_json(dst: impl Into<String>) -> Self {
Self::json(OtlpTransportBuilder::http(dst))
}

pub fn body(
mut self,
writer: impl Fn(
Expand All @@ -196,19 +209,31 @@ pub struct OtlpTracesBuilder {
}

impl OtlpTracesBuilder {
pub fn proto(transport: OtlpTransportBuilder) -> Self {
fn new(encoding: Encoding, transport: OtlpTransportBuilder) -> Self {
OtlpTracesBuilder {
event_encoder: traces::TracesEventEncoder::default(),
request_encoder: traces::TracesRequestEncoder::default(),
encoding: Encoding::Proto,
encoding,
transport,
}
}

pub fn proto(transport: OtlpTransportBuilder) -> Self {
Self::new(Encoding::Proto, transport)
}

pub fn http_proto(dst: impl Into<String>) -> Self {
Self::proto(OtlpTransportBuilder::http(dst))
}

pub fn json(transport: OtlpTransportBuilder) -> Self {
Self::new(Encoding::Json, transport)
}

pub fn http_json(dst: impl Into<String>) -> Self {
Self::json(OtlpTransportBuilder::http(dst))
}

pub fn name(
mut self,
writer: impl Fn(
Expand All @@ -232,19 +257,31 @@ pub struct OtlpMetricsBuilder {
}

impl OtlpMetricsBuilder {
pub fn proto(transport: OtlpTransportBuilder) -> Self {
fn new(encoding: Encoding, transport: OtlpTransportBuilder) -> Self {
OtlpMetricsBuilder {
event_encoder: metrics::MetricsEventEncoder::default(),
request_encoder: metrics::MetricsRequestEncoder::default(),
encoding: Encoding::Proto,
encoding,
transport,
}
}

pub fn proto(transport: OtlpTransportBuilder) -> Self {
Self::new(Encoding::Proto, transport)
}

pub fn http_proto(dst: impl Into<String>) -> Self {
Self::proto(OtlpTransportBuilder::http(dst))
}

pub fn json(transport: OtlpTransportBuilder) -> Self {
Self::new(Encoding::Json, transport)
}

pub fn http_json(dst: impl Into<String>) -> Self {
Self::json(OtlpTransportBuilder::http(dst))
}

pub fn name(
mut self,
writer: impl Fn(
Expand Down Expand Up @@ -397,50 +434,68 @@ impl OtlpBuilder {
Some(OtlpLogsBuilder {
event_encoder,
request_encoder,
encoding: encoding @ Encoding::Proto,
encoding,
transport,
}) => {
logs_event_encoder = Some(ClientEventEncoder::new(encoding, event_encoder));

Some(Arc::new(transport.build(
self.resource.as_ref().map(encode_resource::<data::Proto>),
self.scope.as_ref().map(encode_scope::<data::Proto>),
ClientRequestEncoder::new(encoding, request_encoder),
)?))
Some(Arc::new(
transport.build(
self.resource
.as_ref()
.map(|resource| encode_resource(encoding, resource)),
self.scope
.as_ref()
.map(|scope| encode_scope(encoding, scope)),
ClientRequestEncoder::new(encoding, request_encoder),
)?,
))
}
None => None,
},
traces: match self.otlp_traces {
Some(OtlpTracesBuilder {
event_encoder,
request_encoder,
encoding: encoding @ Encoding::Proto,
encoding,
transport,
}) => {
traces_event_encoder = Some(ClientEventEncoder::new(encoding, event_encoder));

Some(Arc::new(transport.build(
self.resource.as_ref().map(encode_resource::<data::Proto>),
self.scope.as_ref().map(encode_scope::<data::Proto>),
ClientRequestEncoder::new(encoding, request_encoder),
)?))
Some(Arc::new(
transport.build(
self.resource
.as_ref()
.map(|resource| encode_resource(encoding, resource)),
self.scope
.as_ref()
.map(|scope| encode_scope(encoding, scope)),
ClientRequestEncoder::new(encoding, request_encoder),
)?,
))
}
None => None,
},
metrics: match self.otlp_metrics {
Some(OtlpMetricsBuilder {
event_encoder,
request_encoder,
encoding: encoding @ Encoding::Proto,
encoding,
transport,
}) => {
metrics_event_encoder = Some(ClientEventEncoder::new(encoding, event_encoder));

Some(Arc::new(transport.build(
self.resource.as_ref().map(encode_resource::<data::Proto>),
self.scope.as_ref().map(encode_scope::<data::Proto>),
ClientRequestEncoder::new(encoding, request_encoder),
)?))
Some(Arc::new(
transport.build(
self.resource
.as_ref()
.map(|resource| encode_resource(encoding, resource)),
self.scope
.as_ref()
.map(|scope| encode_scope(encoding, scope)),
ClientRequestEncoder::new(encoding, request_encoder),
)?,
))
}
None => None,
},
Expand Down Expand Up @@ -590,6 +645,7 @@ impl<E: data::EventEncoder> ClientEventEncoder<E> {
) -> Option<PreEncoded> {
match self.encoding {
Encoding::Proto => self.encoder.encode_event::<data::Proto>(evt),
Encoding::Json => self.encoder.encode_event::<data::Json>(evt),
}
}
}
Expand All @@ -616,24 +672,41 @@ impl<R: data::RequestEncoder> ClientRequestEncoder<R> {
Encoding::Proto => self
.encoder
.encode_request::<data::Proto>(resource, scope, items),
Encoding::Json => self
.encoder
.encode_request::<data::Json>(resource, scope, items),
}
}
}

fn encode_resource<E: data::RawEncoder>(resource: &Resource) -> PreEncoded {
E::encode(data::Resource {
attributes: &data::PropsResourceAttributes(&resource.attributes),
fn encode_resource(encoding: Encoding, resource: &Resource) -> PreEncoded {
let attributes = data::PropsResourceAttributes(&resource.attributes);

let resource = data::Resource {
attributes: &attributes,
dropped_attribute_count: 0,
})
};

match encoding {
Encoding::Proto => data::Proto::encode(&resource),
Encoding::Json => data::Json::encode(&resource),
}
}

fn encode_scope<E: data::RawEncoder>(scope: &Scope) -> PreEncoded {
E::encode(data::InstrumentationScope {
fn encode_scope(encoding: Encoding, scope: &Scope) -> PreEncoded {
let attributes = data::PropsInstrumentationScopeAttributes(&scope.attributes);

let scope = data::InstrumentationScope {
name: &scope.name,
version: &scope.version,
attributes: &data::PropsInstrumentationScopeAttributes(&scope.attributes),
attributes: &attributes,
dropped_attribute_count: 0,
})
};

match encoding {
Encoding::Proto => data::Proto::encode(&scope),
Encoding::Json => data::Json::encode(&scope),
}
}

#[derive(Clone)]
Expand Down
1 change: 1 addition & 0 deletions targets/otlp/src/client/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ async fn send_request(
"content-type",
match body {
PreEncoded::Proto(_) => "application/x-protobuf",
PreEncoded::Json(_) => "application/json",
},
);

Expand Down
63 changes: 62 additions & 1 deletion targets/otlp/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{collections::HashSet, fmt, ops::ControlFlow};
use bytes::Buf;
use emit_batcher::BatchError;
use sval_derive::Value;
use sval_json::JsonStr;
use sval_protobuf::buf::{ProtoBuf, ProtoBufCursor};

pub mod logs;
Expand Down Expand Up @@ -80,40 +81,100 @@ impl RawEncoder for Proto {
}
}

#[derive(Value, Clone)]
pub(crate) struct Json;

pub(crate) struct TextTraceId(emit::trace::TraceId);

impl From<emit::trace::TraceId> for TextTraceId {
fn from(id: emit::trace::TraceId) -> TextTraceId {
TextTraceId(id)
}
}

impl sval::Value for TextTraceId {
fn stream<'sval, S: sval::Stream<'sval> + ?Sized>(&'sval self, stream: &mut S) -> sval::Result {
stream.value_computed(&sval::Display::new(&self.0))
}
}

pub(crate) struct TextSpanId(emit::trace::SpanId);

impl From<emit::trace::SpanId> for TextSpanId {
fn from(id: emit::trace::SpanId) -> TextSpanId {
TextSpanId(id)
}
}

impl sval::Value for TextSpanId {
fn stream<'sval, S: sval::Stream<'sval> + ?Sized>(&'sval self, stream: &mut S) -> sval::Result {
stream.value_computed(&sval::Display::new(&self.0))
}
}

impl RawEncoder for Json {
type TraceId = TextTraceId;
type SpanId = TextSpanId;

fn encode<V: sval::Value>(value: V) -> PreEncoded {
PreEncoded::Json(JsonStr::boxed(
sval_json::stream_to_string(value).expect("failed to stream"),
))
}
}

#[derive(Value)]
#[sval(dynamic)]
pub(crate) enum PreEncoded {
Proto(ProtoBuf),
Json(Box<JsonStr>),
}

impl PreEncoded {
pub fn into_cursor(self) -> PreEncodedCursor {
match self {
PreEncoded::Proto(buf) => PreEncodedCursor::Proto(buf.into_cursor()),
PreEncoded::Json(buf) => PreEncodedCursor::Json(JsonCursor { buf, idx: 0 }),
}
}
}

pub(crate) enum PreEncodedCursor {
Proto(ProtoBufCursor),
Json(JsonCursor),
}

pub(crate) struct JsonCursor {
buf: Box<JsonStr>,
idx: usize,
}

impl Buf for PreEncodedCursor {
fn remaining(&self) -> usize {
match self {
PreEncodedCursor::Proto(cursor) => cursor.remaining(),
PreEncodedCursor::Json(cursor) => cursor.buf.as_str().len() - cursor.idx,
}
}

fn chunk(&self) -> &[u8] {
match self {
PreEncodedCursor::Proto(cursor) => cursor.chunk(),
PreEncodedCursor::Json(cursor) => &cursor.buf.as_str().as_bytes()[cursor.idx..],
}
}

fn advance(&mut self, cnt: usize) {
match self {
PreEncodedCursor::Proto(cursor) => cursor.advance(cnt),
PreEncodedCursor::Json(cursor) => {
let new_idx = cursor.idx + cnt;

if new_idx > cursor.buf.as_str().len() {
panic!("attempt to advance out of bounds");
}

cursor.idx = new_idx;
}
}
}
}
Expand Down
Loading

0 comments on commit 89533cf

Please sign in to comment.