Skip to content
This repository has been archived by the owner on Jun 8, 2024. It is now read-only.

Commit

Permalink
support switching between http1 and http2
Browse files Browse the repository at this point in the history
  • Loading branch information
KodrAus committed Apr 3, 2024
1 parent 3db5a2d commit 1eefa83
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 30 deletions.
9 changes: 7 additions & 2 deletions targets/otlp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,18 @@ impl OtlpTransportBuilder {

Ok(match self.protocol {
Protocol::Http => OtlpTransport::Http {
http: HttpConnection::new(metrics, url, self.headers, http::HttpBody::raw)?,
http: HttpConnection::http1(metrics, url, self.headers, http::HttpBody::raw)?,
resource,
scope,
request_encoder,
},
Protocol::Grpc => OtlpTransport::Http {
http: HttpConnection::new(metrics, url, self.headers, http::HttpBody::grpc)?,
http: HttpConnection::http2(
metrics,
url,
self.headers,
http::HttpBody::grpc_framed,
)?,
resource,
scope,
request_encoder,
Expand Down
153 changes: 125 additions & 28 deletions targets/otlp/src/client/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use bytes::Buf;
use emit::well_known::{KEY_SPAN_ID, KEY_TRACE_ID};
use hyper::{
body::{self, Body, Frame, SizeHint},
client::conn::http1::{self, SendRequest},
client::conn::{http1, http2},
Method, Request, Uri,
};

Expand All @@ -22,7 +22,11 @@ use crate::{
Error,
};

async fn connect(metrics: &InternalMetrics, uri: &HttpUri) -> Result<SendRequest<HttpBody>, Error> {
async fn connect(
metrics: &InternalMetrics,
version: HttpVersion,
uri: &HttpUri,
) -> Result<HttpSender, Error> {
let io = tokio::net::TcpStream::connect((uri.host(), uri.port()))
.await
.map_err(|e| {
Expand All @@ -38,16 +42,14 @@ async fn connect(metrics: &InternalMetrics, uri: &HttpUri) -> Result<SendRequest
{
let io = tls_handshake(metrics, io, uri).await?;

metrics.otlp_http_conn_tls_handshake.increment();

http_handshake(metrics, io).await
http_handshake(metrics, version, io).await
}
#[cfg(not(feature = "tls"))]
{
return Err(Error::new("https support requires the `tls` Cargo feature"));
}
} else {
http_handshake(metrics, io).await
http_handshake(metrics, version, io).await
}
}

Expand Down Expand Up @@ -83,46 +85,82 @@ async fn tls_handshake(

let conn = TlsConnector::from(tls);

conn.connect(domain, io).await.map_err(|e| {
let io = conn.connect(domain, io).await.map_err(|e| {
metrics.otlp_http_conn_tls_failed.increment();

Error::new("failed to connect TLS stream", e)
})
})?;

metrics.otlp_http_conn_tls_handshake.increment();

Ok(io)
}

async fn http_handshake(
metrics: &InternalMetrics,
version: HttpVersion,
io: impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Sync + Unpin + 'static,
) -> Result<SendRequest<HttpBody>, Error> {
) -> Result<HttpSender, Error> {
match version {
HttpVersion::Http1 => http1_handshake(metrics, io).await,
HttpVersion::Http2 => http2_handshake(metrics, io).await,
}
}

async fn http1_handshake(
metrics: &InternalMetrics,
io: impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Sync + Unpin + 'static,
) -> Result<HttpSender, Error> {
let (sender, conn) = http1::handshake(HttpIo(io)).await.map_err(|e| {
metrics.otlp_http_conn_failed.increment();

Error::new("failed to perform HTTP handshake", e)
Error::new("failed to perform HTTP1 handshake", e)
})?;

tokio::task::spawn(async move {
let _ = conn.await;
});

Ok(sender)
Ok(HttpSender::Http1(sender))
}

async fn http2_handshake(
metrics: &InternalMetrics,
io: impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Sync + Unpin + 'static,
) -> Result<HttpSender, Error> {
let (sender, conn) = http2::handshake(TokioAmbientExecutor, HttpIo(io))
.await
.map_err(|e| {
metrics.otlp_http_conn_failed.increment();

Error::new("failed to perform HTTP2 handshake", e)
})?;

tokio::task::spawn(async move {
let _ = conn.await;
});

Ok(HttpSender::Http2(sender))
}

async fn send_request(
metrics: &InternalMetrics,
sender: &mut SendRequest<HttpBody>,
sender: &mut HttpSender,
uri: &HttpUri,
headers: impl Iterator<Item = (&str, &str)>,
body: HttpBody,
) -> Result<hyper::Response<body::Incoming>, Error> {
) -> Result<HttpResponse, Error> {
let rt = emit::runtime::internal();

let res = sender
.send_request({
.send_request(metrics, {
use emit::{Ctxt as _, Props as _};

let body = {
#[cfg(all(feature = "tls", feature = "gzip"))]
{
// TODO: This is happening at the wrong level
// gzip should be done _before_ framing
if !uri.is_https() {
body.gzip()?
} else {
Expand Down Expand Up @@ -174,23 +212,17 @@ async fn send_request(
Error::new("failed to stream HTTP body", e)
})?
})
.await
.map_err(|e| {
metrics.otlp_http_request_failed.increment();

Error::new("failed to send HTTP request", e)
})?;

metrics.otlp_http_request_sent.increment();
.await?;

Ok(res)
}

pub(crate) struct HttpConnection {
version: HttpVersion,
uri: HttpUri,
headers: Vec<(String, String)>,
body: fn(PreEncoded) -> HttpBody,
sender: Mutex<Option<SendRequest<HttpBody>>>,
sender: Mutex<Option<HttpSender>>,
metrics: Arc<InternalMetrics>,
}

Expand All @@ -199,7 +231,26 @@ pub(crate) struct HttpResponse {
}

impl HttpConnection {
pub fn new(
pub fn http1(
metrics: Arc<InternalMetrics>,
url: impl AsRef<str>,
headers: impl Into<Vec<(String, String)>>,
body: fn(PreEncoded) -> HttpBody,
) -> Result<Self, Error> {
Self::new(HttpVersion::Http1, metrics, url, headers, body)
}

pub fn http2(
metrics: Arc<InternalMetrics>,
url: impl AsRef<str>,
headers: impl Into<Vec<(String, String)>>,
body: fn(PreEncoded) -> HttpBody,
) -> Result<Self, Error> {
Self::new(HttpVersion::Http2, metrics, url, headers, body)
}

fn new(
version: HttpVersion,
metrics: Arc<InternalMetrics>,
url: impl AsRef<str>,
headers: impl Into<Vec<(String, String)>>,
Expand All @@ -212,25 +263,26 @@ impl HttpConnection {
url.parse()
.map_err(|e| Error::new(format_args!("failed to parse {url}"), e))?,
),
version,
body,
headers: headers.into(),
sender: Mutex::new(None),
metrics,
})
}

fn poison(&self) -> Option<SendRequest<HttpBody>> {
fn poison(&self) -> Option<HttpSender> {
self.sender.lock().unwrap().take()
}

fn unpoison(&self, sender: SendRequest<HttpBody>) {
fn unpoison(&self, sender: HttpSender) {
*self.sender.lock().unwrap() = Some(sender);
}

pub async fn send(&self, body: PreEncoded) -> Result<HttpResponse, Error> {
let mut sender = match self.poison() {
Some(sender) => sender,
None => connect(&self.metrics, &self.uri).await?,
None => connect(&self.metrics, self.version, &self.uri).await?,
};

let res = send_request(
Expand All @@ -244,6 +296,39 @@ impl HttpConnection {

self.unpoison(sender);

Ok(res)
}
}

#[derive(Debug, Clone, Copy)]
enum HttpVersion {
Http1,
Http2,
}

enum HttpSender {
Http1(http1::SendRequest<HttpBody>),
Http2(http2::SendRequest<HttpBody>),
}

impl HttpSender {
async fn send_request(
&mut self,
metrics: &InternalMetrics,
req: Request<HttpBody>,
) -> Result<HttpResponse, Error> {
let res = match self {
HttpSender::Http1(sender) => sender.send_request(req).await,
HttpSender::Http2(sender) => sender.send_request(req).await,
}
.map_err(|e| {
metrics.otlp_http_request_failed.increment();

Error::new("failed to send HTTP request", e)
})?;

metrics.otlp_http_request_sent.increment();

Ok(HttpResponse { res })
}
}
Expand Down Expand Up @@ -344,7 +429,7 @@ impl HttpBody {
}
}

pub fn grpc(payload: PreEncoded) -> Self {
pub fn grpc_framed(payload: PreEncoded) -> Self {
let encoding = Encoding::of(&payload);

let payload = HttpBodyPayload::Grpc {
Expand Down Expand Up @@ -581,3 +666,15 @@ impl<T: tokio::io::AsyncWrite> hyper::rt::Write for HttpIo<T> {
tokio::io::AsyncWrite::poll_shutdown(io, cx)
}
}

#[derive(Clone, Copy)]
struct TokioAmbientExecutor;

impl<F: Future + Send + 'static> hyper::rt::Executor<F> for TokioAmbientExecutor
where
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
tokio::spawn(fut);
}
}

0 comments on commit 1eefa83

Please sign in to comment.