Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
work on re-enabling tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Nov 30, 2023
1 parent 37b6d2a commit 9500213
Show file tree
Hide file tree
Showing 17 changed files with 733 additions and 456 deletions.
10 changes: 10 additions & 0 deletions batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ impl<T: Channel> BatchError<T> {
retryable: T::new(),
}
}

pub fn into_retryable(self) -> T {
self.retryable
}

pub fn map<U>(self, f: impl FnOnce(T) -> U) -> BatchError<U> {
BatchError {
retryable: f(self.retryable),
}
}
}

impl<T: Channel> Receiver<T> {
Expand Down
230 changes: 167 additions & 63 deletions targets/otlp/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use emit_batcher::BatchError;
use std::{future::Future, sync::Arc, time::Duration};
use sval_protobuf::buf::ProtoBuf;
use std::{sync::Arc, time::Duration};

use crate::{
data::{self, PreEncoded},
Expand All @@ -11,54 +10,116 @@ use self::http::HttpConnection;

mod http;

pub(super) struct OtlpClient<T> {
sender: emit_batcher::Sender<Vec<T>>,
pub struct OtlpClient {
emit_logs: bool,
emit_traces: bool,
sender: emit_batcher::Sender<Channel<PreEncoded>>,
}

pub(super) struct OtlpClientBuilder {
dst: Destination,
impl emit_core::emitter::Emitter for OtlpClient {
fn emit<P: emit_core::props::Props>(&self, evt: &emit_core::event::Event<P>) {
if self.emit_traces {
if let Some(encoded) = crate::traces::encode_event(evt) {
return self.sender.send(ChannelItem::Span(encoded));
}
}

if self.emit_logs {
self.sender
.send(ChannelItem::LogRecord(crate::logs::encode_event(evt)));
}
}

fn blocking_flush(&self, timeout: Duration) {
emit_batcher::tokio::blocking_flush(&self.sender, timeout)
}
}

enum Destination {
HttpProto {
resource: Option<ProtoBuf>,
scope: Option<ProtoBuf>,
url: String,
},
struct Channel<T> {
logs: Vec<T>,
traces: Vec<T>,
}

impl<T> OtlpClient<T> {
pub fn emit(&self, value: T) {
self.sender.send(value);
enum ChannelItem<T> {
LogRecord(T),
Span(T),
}

impl<T> emit_batcher::Channel for Channel<T> {
type Item = ChannelItem<T>;

fn new() -> Self {
Channel {
logs: Vec::new(),
traces: Vec::new(),
}
}

pub fn blocking_flush(&self, timeout: Duration) {
emit_batcher::tokio::blocking_flush(&self.sender, timeout)
fn push(&mut self, item: Self::Item) {
match item {
ChannelItem::LogRecord(item) => self.logs.push(item),
ChannelItem::Span(item) => self.traces.push(item),
}
}

fn len(&self) -> usize {
self.logs.len() + self.traces.len()
}

fn clear(&mut self) {
self.logs.clear();
self.traces.clear();
}
}

pub struct OtlpClientBuilder {
resource: Option<PreEncoded>,
scope: Option<PreEncoded>,
encoding: Encoding,
logs: Option<Transport>,
traces: Option<Transport>,
}

enum Encoding {
Proto,
}

enum Transport {
Http { url: String },
}

impl OtlpClientBuilder {
pub fn http_proto(dst: impl Into<String>) -> Self {
pub fn proto() -> Self {
OtlpClientBuilder {
dst: Destination::HttpProto {
url: dst.into(),
resource: None,
scope: None,
},
encoding: Encoding::Proto,
resource: None,
scope: None,
logs: None,
traces: None,
}
}

pub fn logs_http(mut self, dst: impl Into<String>) -> Self {
self.logs = Some(Transport::Http { url: dst.into() });

self
}

pub fn traces_http(mut self, dst: impl Into<String>) -> Self {
self.traces = Some(Transport::Http { url: dst.into() });

self
}

pub fn resource(mut self, attributes: impl emit_core::props::Props) -> Self {
match self.dst {
Destination::HttpProto {
ref mut resource, ..
} => {
match self.encoding {
Encoding::Proto => {
let protobuf = sval_protobuf::stream_to_protobuf(data::Resource {
attributes: &data::EmitResourceAttributes(attributes),
attributes: &data::PropsResourceAttributes(attributes),
dropped_attribute_count: 0,
});

*resource = Some(protobuf);
self.resource = Some(PreEncoded::Proto(protobuf));
}
}

Expand All @@ -71,76 +132,119 @@ impl OtlpClientBuilder {
version: &str,
attributes: impl emit_core::props::Props,
) -> Self {
match self.dst {
Destination::HttpProto { ref mut scope, .. } => {
match self.encoding {
Encoding::Proto => {
let protobuf = sval_protobuf::stream_to_protobuf(data::InstrumentationScope {
name,
version,
attributes: &data::EmitInstrumentationScopeAttributes(attributes),
attributes: &data::PropsInstrumentationScopeAttributes(attributes),
dropped_attribute_count: 0,
});

*scope = Some(protobuf);
self.scope = Some(PreEncoded::Proto(protobuf));
}
}

self
}

pub fn spawn<
T: Send + 'static,
F: Future<Output = Result<(), BatchError<Vec<T>>>> + Send + 'static,
>(
self,
mut on_batch: impl FnMut(OtlpSender, Vec<T>) -> F + Send + 'static,
) -> Result<OtlpClient<T>, Error> {
pub fn spawn(self) -> Result<OtlpClient, Error> {
let (sender, receiver) = emit_batcher::bounded(10_000);

let client = OtlpSender {
client: Arc::new(match self.dst {
Destination::HttpProto {
url,
resource,
scope,
} => RawClient::HttpProto {
logs: match self.logs {
Some(Transport::Http { url }) => Some(Arc::new(RawClient::Http {
http: HttpConnection::new(&url)?,
resource: self.resource.clone(),
scope: self.scope.clone(),
})),
None => None,
},
traces: match self.traces {
Some(Transport::Http { url }) => Some(Arc::new(RawClient::Http {
http: HttpConnection::new(&url)?,
resource: resource.map(PreEncoded::Proto),
scope: scope.map(PreEncoded::Proto),
},
}),
resource: self.resource.clone(),
scope: self.scope.clone(),
})),
None => None,
},
};

emit_batcher::tokio::spawn(receiver, move |batch| on_batch(client.clone(), batch));
let emit_logs = client.logs.is_some();
let emit_traces = client.traces.is_some();

emit_batcher::tokio::spawn(receiver, move |batch: Channel<PreEncoded>| {
let client = client.clone();

async move {
let Channel { logs, traces } = batch;

let mut r = Ok(());

if let Some(client) = client.logs {
if let Err(e) = client.send(logs, crate::logs::encode_request).await {
r = Err(e.map(|logs| Channel {
logs,
traces: Vec::new(),
}));
}
}

if let Some(client) = client.traces {
if let Err(e) = client.send(traces, crate::traces::encode_request).await {
r = if let Err(re) = r {
Err(re.map(|mut channel| {
channel.traces = e.into_retryable();
channel
}))
} else {
Err(e.map(|traces| Channel {
traces,
logs: Vec::new(),
}))
};
}
}

r
}
});

Ok(OtlpClient { sender })
Ok(OtlpClient {
emit_logs,
emit_traces,
sender,
})
}
}

#[derive(Clone)]
pub struct OtlpSender {
client: Arc<RawClient>,
// TODO: Share the client
logs: Option<Arc<RawClient>>,
traces: Option<Arc<RawClient>>,
}

enum RawClient {
HttpProto {
Http {
http: HttpConnection,
resource: Option<PreEncoded>,
scope: Option<PreEncoded>,
},
}

impl OtlpSender {
pub(crate) async fn emit<T>(
self,
batch: Vec<T>,
impl RawClient {
pub(crate) async fn send(
&self,
batch: Vec<PreEncoded>,
encode: impl FnOnce(
Option<&PreEncoded>,
Option<&PreEncoded>,
&[T],
) -> Result<PreEncoded, BatchError<Vec<T>>>,
) -> Result<(), BatchError<Vec<T>>> {
match *self.client {
RawClient::HttpProto {
&[PreEncoded],
) -> Result<PreEncoded, BatchError<Vec<PreEncoded>>>,
) -> Result<(), BatchError<Vec<PreEncoded>>> {
match self {
RawClient::Http {
ref http,
ref resource,
ref scope,
Expand Down
Loading

0 comments on commit 9500213

Please sign in to comment.