Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
collect all grpc specific code together
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Apr 5, 2024
1 parent ba37636 commit ddc35fb
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 115 deletions.
39 changes: 32 additions & 7 deletions targets/otlp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,14 +397,16 @@ impl OtlpTransportBuilder {
metrics,
url,
self.headers,
http::HttpContent::raw,
|req| Ok(req),
|res| async move {
let status = res.http_status();

if status >= 200 && status < 300 {
Ok(vec![])
} else {
Err(Error::msg(format_args!("OTLP HTTP server responded {status}")))
Err(Error::msg(format_args!(
"OTLP HTTP server responded {status}"
)))
}
},
)?,
Expand All @@ -417,7 +419,25 @@ impl OtlpTransportBuilder {
metrics,
url,
self.headers,
http::HttpContent::grpc,
|mut req| {
let len = (u32::try_from(req.payload_len()).unwrap()).to_be_bytes();

Ok(if let Some(compression) = req.take_content_encoding() {
req.with_content_type_header("application/grpc+proto")
.with_headers(match compression {
"gzip" => &[("grpc-encoding", "gzip")],
compression => {
return Err(Error::msg(format_args!(
"unsupported compression '{compression}'"
)))
}
})
.with_frame_header([1, len[0], len[1], len[2], len[3]])
} else {
req.with_content_type_header("application/grpc+proto")
.with_frame_header([0, len[0], len[1], len[2], len[3]])
})
},
|res| async move {
let mut status = 0;
let mut msg = String::new();
Expand All @@ -440,9 +460,13 @@ impl OtlpTransportBuilder {
Ok(vec![])
} else {
if msg.len() > 0 {
Err(Error::msg(format_args!("OTLP gRPC server responded {status} {msg}")))
Err(Error::msg(format_args!(
"OTLP gRPC server responded {status} {msg}"
)))
} else {
Err(Error::msg(format_args!("OTLP gRPC server responded {status}")))
Err(Error::msg(format_args!(
"OTLP gRPC server responded {status}"
)))
}
}
},
Expand Down Expand Up @@ -827,6 +851,7 @@ impl<R: data::RequestEncoder> OtlpTransport<R> {
ref scope,
ref request_encoder,
} => {
let uri = http.uri();
let batch_size = batch.len();

match http
Expand All @@ -843,7 +868,7 @@ impl<R: data::RequestEncoder> OtlpTransport<R> {
rt: emit::runtime::internal(),
when: emit::filter::always(),
extent,
"OTLP batch of {batch_size} events",
"OTLP batch of {batch_size} events to {uri}",
batch_size,
)
});
Expand All @@ -856,7 +881,7 @@ impl<R: data::RequestEncoder> OtlpTransport<R> {
rt: emit::runtime::internal(),
when: emit::filter::always(),
extent,
"OTLP batch of {batch_size} events failed: {err}",
"OTLP batch of {batch_size} events to {uri} failed: {err}",
batch_size,
err,
)
Expand Down
Loading

0 comments on commit ddc35fb

Please sign in to comment.