Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
port to hyper for http
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Oct 7, 2023
1 parent 5a05b33 commit 9e1a7ab
Show file tree
Hide file tree
Showing 21 changed files with 875 additions and 175 deletions.
28 changes: 22 additions & 6 deletions targets/otlp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ version = "0.0.0"
edition = "2021"

[features]
grpc = ["dep:prost"]
default = ["http"]
http = ["dep:hyper", "dep:hyper-util"]
grpc = ["dep:prost", "dep:serde", "emit_core/serde"]

[dependencies.emit_core]
path = "../../core"
Expand All @@ -15,26 +17,40 @@ path = "../../batcher"
features = ["tokio"]

[dependencies.sval]
version = "2.9"
version = "2.10"
features = ["std"]

[dependencies.sval_ref]
version = "2.9"
version = "2.10"

[dependencies.sval_derive]
version = "2.9"
version = "2.10"
features = ["std", "flatten"]

[dependencies.sval_protobuf]
git = "https://github.com/KodrAus/sval_protobuf"
features = ["bytes"]

[dependencies.tokio]
version = "1"
features = ["rt-multi-thread", "sync"]

[dependencies.reqwest]
version = "0.11"
[dependencies.hyper]
optional = true
version = "1.0.0-rc.4"
features = ["client", "http1"]

[dependencies.hyper-util]
optional = true
git = "https://github.com/hyperium/hyper-util"

[dependencies.bytes]
version = "1"

[dependencies.prost]
version = "0.11"
optional = true

[dependencies.serde]
version = "1"
optional = true
2 changes: 1 addition & 1 deletion targets/otlp/gen/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut config = prost_build::Config::new();

config.out_dir("../src/proto");
config.out_dir("../src/data/generated");

config.compile_protos(
&[
Expand Down
28 changes: 11 additions & 17 deletions targets/otlp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use sval_protobuf::buf::ProtoBuf;

use crate::data::{self, PreEncoded};

use self::http::HttpConnection;

mod http;

pub(super) struct OtlpClient<T> {
sender: emit_batcher::Sender<T>,
}
Expand Down Expand Up @@ -65,7 +69,7 @@ impl OtlpClientBuilder {
self,
mut on_batch: impl FnMut(OtlpSender, Vec<T>) -> F + Send + 'static,
) -> OtlpClient<T> {
let (sender, receiver) = emit_batcher::bounded(1024);
let (sender, receiver) = emit_batcher::bounded(10_000);

let client = OtlpSender {
client: Arc::new(match self.dst {
Expand All @@ -74,10 +78,9 @@ impl OtlpClientBuilder {
resource,
scope,
} => RawClient::HttpProto {
url,
http: HttpConnection::new(&url).expect("failed to open connection"),
resource: resource.map(PreEncoded::Proto),
scope: scope.map(PreEncoded::Proto),
client: reqwest::Client::new(),
},
}),
};
Expand All @@ -95,40 +98,31 @@ pub struct OtlpSender {

enum RawClient {
HttpProto {
url: String,
http: HttpConnection,
resource: Option<PreEncoded>,
scope: Option<PreEncoded>,
client: reqwest::Client,
},
}

impl OtlpSender {
pub(crate) async fn emit<T>(
self,
batch: Vec<T>,
// TODO: Encode proto
encode: impl FnOnce(
Option<&PreEncoded>,
Option<&PreEncoded>,
&[T],
) -> Result<Vec<u8>, BatchError<T>>,
) -> Result<PreEncoded, BatchError<T>>,
) -> Result<(), BatchError<T>> {
match *self.client {
RawClient::HttpProto {
ref url,
ref http,
ref resource,
ref scope,
ref client,
} => {
let body = encode(resource.as_ref(), scope.as_ref(), &batch)?;

client
.request(reqwest::Method::POST, url)
.header("content-type", "application/x-protobuf")
.body(body)
.send()
http.send(encode(resource.as_ref(), scope.as_ref(), &batch)?)
.await
.map_err(|e| BatchError::retry(e, batch))?;
.map_err(|e| BatchError::no_retry(e))?;
}
}

Expand Down
152 changes: 152 additions & 0 deletions targets/otlp/src/client/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use std::{
error, fmt,
pin::Pin,
sync::Mutex,
task::{Context, Poll},
};

use bytes::Buf;
use hyper::{
body::{Body, Frame, SizeHint},
client::conn::{http1, http1::SendRequest},
Method, Request, Uri,
};
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;

use crate::data::{PreEncoded, PreEncodedCursor};

pub(crate) struct Error(Box<dyn error::Error + Send + Sync>);

impl Error {
fn new(e: impl error::Error + Send + Sync + 'static) -> Self {
Error(Box::new(e))
}
}

impl error::Error for Error {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
self.0.source()
}
}

impl fmt::Debug for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}

pub(crate) struct HttpConnection {
uri: Uri,
sender: Mutex<Option<SendRequest<HttpBody>>>,
}

impl HttpConnection {
pub fn new(url: &str) -> Result<Self, Error> {
Ok(HttpConnection {
uri: url.parse().map_err(Error::new)?,
sender: Mutex::new(None),
})
}

fn poison(&self) -> Option<SendRequest<HttpBody>> {
self.sender.lock().unwrap().take()
}

fn unpoison(&self, sender: SendRequest<HttpBody>) {
*self.sender.lock().unwrap() = Some(sender);
}

pub async fn send(&self, body: PreEncoded) -> Result<(), Error> {
let mut sender = match self.poison() {
Some(sender) => sender,
None => {
let io = TokioIo::new(
TcpStream::connect((
self.uri.host().unwrap(),
self.uri.port_u16().unwrap_or(80),
))
.await
.map_err(Error::new)?,
);

let (sender, conn) = http1::handshake(io).await.map_err(Error::new)?;

tokio::task::spawn(async move {
let _ = conn.await;
});

sender
}
};

send_request(&mut sender, &self.uri, body).await?;

self.unpoison(sender);

Ok(())
}
}

async fn send_request(
sender: &mut SendRequest<HttpBody>,
uri: &Uri,
body: PreEncoded,
) -> Result<(), Error> {
sender
.send_request(
Request::builder()
.uri(uri)
.method(Method::POST)
.header("host", uri.authority().unwrap().as_str())
.header(
"content-type",
match body {
PreEncoded::Proto(_) => "application/x-protobuf",
},
)
.body(HttpBody(Some(body.into_cursor())))
.map_err(Error::new)?,
)
.await
.map_err(Error::new)?;

Ok(())
}

pub(crate) struct HttpBody(Option<PreEncodedCursor>);

impl Body for HttpBody {
type Data = PreEncodedCursor;

type Error = std::convert::Infallible;

fn poll_frame(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
if let Some(buf) = self.get_mut().0.take() {
Poll::Ready(Some(Ok(Frame::data(buf))))
} else {
Poll::Ready(None)
}
}

fn is_end_stream(&self) -> bool {
self.0.is_none()
}

fn size_hint(&self) -> SizeHint {
if let Some(ref buf) = self.0 {
SizeHint::with_exact(buf.remaining() as u64)
} else {
SizeHint::with_exact(0)
}
}
}
48 changes: 46 additions & 2 deletions targets/otlp/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use std::borrow::Cow;

use bytes::Buf;
use sval_derive::Value;
use sval_protobuf::buf::ProtoBuf;
use sval_protobuf::buf::{ProtoBuf, ProtoBufCursor};

mod any_value;
mod export_logs_service;
mod instrumentation_scope;
mod log_record;
mod resource;

pub use self::{
#[cfg(feature = "grpc")]
pub(crate) mod generated;

pub(crate) use self::{
any_value::*, export_logs_service::*, instrumentation_scope::*, log_record::*, resource::*,
};

Expand All @@ -16,3 +22,41 @@ pub use self::{
pub(crate) enum PreEncoded {
Proto(ProtoBuf),
}

impl PreEncoded {
pub fn into_cursor(self) -> PreEncodedCursor {
match self {
PreEncoded::Proto(buf) => PreEncodedCursor::Proto(buf.into_cursor()),
}
}

pub fn to_vec(&self) -> Cow<[u8]> {
match self {
PreEncoded::Proto(buf) => buf.to_vec(),
}
}
}

pub(crate) enum PreEncodedCursor {
Proto(ProtoBufCursor),
}

impl Buf for PreEncodedCursor {
fn remaining(&self) -> usize {
match self {
PreEncodedCursor::Proto(cursor) => cursor.remaining(),
}
}

fn chunk(&self) -> &[u8] {
match self {
PreEncodedCursor::Proto(cursor) => cursor.chunk(),
}
}

fn advance(&mut self, cnt: usize) {
match self {
PreEncodedCursor::Proto(cursor) => cursor.advance(cnt),
}
}
}
Loading

0 comments on commit 9e1a7ab

Please sign in to comment.