From e52ba2c29618767a0616bdb8c0d3660e2483f816 Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Mon, 22 Apr 2024 16:50:52 +0200 Subject: [PATCH 01/31] Upgrade to hyper v1 --- Cargo.toml | 18 ++++--- examples/tracing-http-propagator/Cargo.toml | 2 + .../tracing-http-propagator/src/client.rs | 9 ++-- .../tracing-http-propagator/src/server.rs | 52 ++++++++++++------- opentelemetry-http/Cargo.toml | 2 +- opentelemetry-prometheus/Cargo.toml | 2 + opentelemetry-prometheus/examples/hyper.rs | 49 +++++++++-------- .../src/proto/opentelemetry-proto | 2 +- opentelemetry-sdk/src/lib.rs | 2 +- 9 files changed, 81 insertions(+), 57 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 28a0dea98d..52e01e0d97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,23 +23,25 @@ criterion = "0.5" futures-core = "0.3" futures-executor = "0.3" futures-util = { version = "0.3", default-features = false } -hyper = { version = "0.14", default-features = false } -http = { version = "0.2", default-features = false } +hyper = { version = "1.3", default-features = false } +hyper-util = "0.1" +http = { version = "1.1", default-features = false } +http-body-util = "0.1" log = "0.4.21" once_cell = "1.13" ordered-float = "4.0" pin-project-lite = "0.2" -prost = "0.12" -prost-build = "0.12" -prost-types = "0.12" +prost = "0.13" +prost-build = "0.13" +prost-types = "0.13" rand = { version = "0.8", default-features = false } -reqwest = { version = "0.11", default-features = false } +reqwest = { version = "0.12", default-features = false } serde = { version = "1.0", default-features = false } serde_json = "1.0" temp-env = "0.3.6" thiserror = { version = "1", default-features = false } -tonic = { version = "0.11", default-features = false } -tonic-build = "0.11" +tonic = { version = "0.12", default-features = false } +tonic-build = "0.12" tokio = { version = "1", default-features = false } tokio-stream = "0.1.1" tracing = { version = "0.1", default-features = false } diff --git a/examples/tracing-http-propagator/Cargo.toml b/examples/tracing-http-propagator/Cargo.toml index 0c019e14e9..7d13e666ed 100644 --- a/examples/tracing-http-propagator/Cargo.toml +++ b/examples/tracing-http-propagator/Cargo.toml @@ -16,7 +16,9 @@ path = "src/client.rs" doc = false [dependencies] +http-body-util = { workspace = true } hyper = { workspace = true, features = ["full"] } +hyper-util = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] } opentelemetry = { path = "../../opentelemetry" } opentelemetry_sdk = { path = "../../opentelemetry-sdk" } diff --git a/examples/tracing-http-propagator/src/client.rs b/examples/tracing-http-propagator/src/client.rs index 35e2530b4a..e0936fd46b 100644 --- a/examples/tracing-http-propagator/src/client.rs +++ b/examples/tracing-http-propagator/src/client.rs @@ -1,10 +1,11 @@ -use hyper::{body::Body, Client}; +use http_body_util::Full; +use hyper_util::{client::legacy::Client, rt::TokioExecutor}; use opentelemetry::{ global, trace::{SpanKind, TraceContextExt, Tracer}, Context, KeyValue, }; -use opentelemetry_http::HeaderInjector; +use opentelemetry_http::{Bytes, HeaderInjector}; use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider}; use opentelemetry_stdout::SpanExporter; @@ -24,7 +25,7 @@ async fn send_request( body_content: &str, span_name: &str, ) -> std::result::Result<(), Box> { - let client = Client::new(); + let client = Client::builder(TokioExecutor::new()).build_http(); let tracer = global::tracer("example/client"); let span = tracer .span_builder(String::from(span_name)) @@ -37,7 +38,7 @@ async fn send_request( propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut().unwrap())) }); let res = client - .request(req.body(Body::from(String::from(body_content)))?) + .request(req.body(Full::new(Bytes::from(body_content.to_string())))?) .await?; cx.span().add_event( diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index 1ad924c766..0f42d5b5d8 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -1,39 +1,39 @@ -use hyper::{ - service::{make_service_fn, service_fn}, - Body, Request, Response, Server, StatusCode, -}; +use http_body_util::{Either, Full}; +use hyper::{body::Incoming, service::service_fn, Request, Response, StatusCode}; +use hyper_util::rt::{TokioExecutor, TokioIo}; use opentelemetry::{ global, trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer}, Context, KeyValue, }; -use opentelemetry_http::HeaderExtractor; +use opentelemetry_http::{Bytes, HeaderExtractor}; use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider}; use opentelemetry_semantic_conventions::trace; use opentelemetry_stdout::SpanExporter; use std::{convert::Infallible, net::SocketAddr}; +use tokio::net::TcpListener; // Utility function to extract the context from the incoming request headers -fn extract_context_from_request(req: &Request) -> Context { +fn extract_context_from_request(req: &Request) -> Context { global::get_text_map_propagator(|propagator| { propagator.extract(&HeaderExtractor(req.headers())) }) } // Separate async function for the handle endpoint -async fn handle_health_check(_req: Request) -> Result, Infallible> { +async fn handle_health_check(_req: Request) -> Result>, Infallible> { let tracer = global::tracer("example/server"); let mut span = tracer .span_builder("health_check") .with_kind(SpanKind::Internal) .start(&tracer); span.add_event("Health check accessed", vec![]); - let res = Response::new(Body::from("Server is up and running!")); + let res = Response::new(Full::new(Bytes::from_static(b"Server is up and running!"))); Ok(res) } // Separate async function for the echo endpoint -async fn handle_echo(req: Request) -> Result, Infallible> { +async fn handle_echo(req: Request) -> Result, Infallible> { let tracer = global::tracer("example/server"); let mut span = tracer .span_builder("echo") @@ -44,7 +44,9 @@ async fn handle_echo(req: Request) -> Result, Infallible> { Ok(res) } -async fn router(req: Request) -> Result, Infallible> { +async fn router( + req: Request, +) -> Result, Incoming>>, Infallible> { // Extract the context from the incoming request headers let parent_cx = extract_context_from_request(&req); let response = { @@ -59,17 +61,24 @@ async fn router(req: Request) -> Result, Infallible> { let cx = Context::default().with_span(span); match (req.method(), req.uri().path()) { - (&hyper::Method::GET, "/health") => handle_health_check(req).with_context(cx).await, - (&hyper::Method::GET, "/echo") => handle_echo(req).with_context(cx).await, + (&hyper::Method::GET, "/health") => handle_health_check(req) + .with_context(cx) + .await + .map(|response| response.map(Either::Left)), + (&hyper::Method::GET, "/echo") => handle_echo(req) + .with_context(cx) + .await + .map(|response| response.map(Either::Right)), _ => { cx.span() .set_attribute(KeyValue::new(trace::HTTP_RESPONSE_STATUS_CODE, 404)); - let mut not_found = Response::default(); + let mut not_found = Response::new(Either::Left(Full::default())); *not_found.status_mut() = StatusCode::NOT_FOUND; Ok(not_found) } } }; + response } @@ -87,15 +96,18 @@ fn init_tracer() { #[tokio::main] async fn main() { + use hyper_util::server::conn::auto::Builder; + init_tracer(); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + let listener = TcpListener::bind(addr).await.unwrap(); - let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(router)) }); - - let server = Server::bind(&addr).serve(make_svc); - - println!("Listening on {addr}"); - if let Err(e) = server.await { - eprintln!("server error: {e}"); + while let Ok((stream, _addr)) = listener.accept().await { + if let Err(err) = Builder::new(TokioExecutor::new()) + .serve_connection(TokioIo::new(stream), service_fn(router)) + .await + { + eprintln!("{err}"); + } } } diff --git a/opentelemetry-http/Cargo.toml b/opentelemetry-http/Cargo.toml index f7472054df..b4e809d1ab 100644 --- a/opentelemetry-http/Cargo.toml +++ b/opentelemetry-http/Cargo.toml @@ -17,7 +17,7 @@ reqwest-rustls-webpki-roots = ["reqwest", "reqwest/rustls-tls-webpki-roots"] async-trait = { workspace = true } bytes = { workspace = true } http = { workspace = true } -hyper = { workspace = true, features = ["http2", "client", "tcp"], optional = true } +hyper = { workspace = true, features = ["http2", "client"], optional = true } opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["trace"] } reqwest = { workspace = true, features = ["blocking"], optional = true } tokio = { workspace = true, features = ["time"], optional = true } diff --git a/opentelemetry-prometheus/Cargo.toml b/opentelemetry-prometheus/Cargo.toml index 7aa488cfd6..97d9d7abf2 100644 --- a/opentelemetry-prometheus/Cargo.toml +++ b/opentelemetry-prometheus/Cargo.toml @@ -28,7 +28,9 @@ protobuf = "2.14" [dev-dependencies] opentelemetry-semantic-conventions = { version = "0.15" } +http-body-util = { workspace = true } hyper = { workspace = true, features = ["full"] } +hyper-util = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] } [features] diff --git a/opentelemetry-prometheus/examples/hyper.rs b/opentelemetry-prometheus/examples/hyper.rs index 943ba617b6..10d1402e98 100644 --- a/opentelemetry-prometheus/examples/hyper.rs +++ b/opentelemetry-prometheus/examples/hyper.rs @@ -1,8 +1,11 @@ +use http_body_util::Full; use hyper::{ + body::{Bytes, Incoming}, header::CONTENT_TYPE, - service::{make_service_fn, service_fn}, - Body, Method, Request, Response, Server, + service::service_fn, + Method, Request, Response, }; +use hyper_util::rt::{TokioExecutor, TokioIo}; use once_cell::sync::Lazy; use opentelemetry::{ metrics::{Counter, Histogram, MeterProvider as _, Unit}, @@ -10,16 +13,17 @@ use opentelemetry::{ }; use opentelemetry_sdk::metrics::SdkMeterProvider; use prometheus::{Encoder, Registry, TextEncoder}; -use std::convert::Infallible; +use std::net::SocketAddr; use std::sync::Arc; use std::time::SystemTime; +use tokio::net::TcpListener; static HANDLER_ALL: Lazy<[KeyValue; 1]> = Lazy::new(|| [KeyValue::new("handler", "all")]); async fn serve_req( - req: Request, + req: Request, state: Arc, -) -> Result, hyper::Error> { +) -> Result>, hyper::Error> { println!("Receiving request at path {}", req.uri()); let request_start = SystemTime::now(); @@ -38,16 +42,16 @@ async fn serve_req( Response::builder() .status(200) .header(CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer)) + .body(Full::new(Bytes::from(buffer))) .unwrap() } (&Method::GET, "/") => Response::builder() .status(200) - .body(Body::from("Hello World")) + .body(Full::new("Hello World".into())) .unwrap(), _ => Response::builder() .status(404) - .body(Body::from("Missing Page")) + .body(Full::new("Missing Page".into())) .unwrap(), }; @@ -67,6 +71,8 @@ struct AppState { #[tokio::main] pub async fn main() -> Result<(), Box> { + use hyper_util::server::conn::auto::Builder; + let registry = Registry::new(); let exporter = opentelemetry_prometheus::exporter() .with_registry(registry.clone()) @@ -92,23 +98,22 @@ pub async fn main() -> Result<(), Box> { .init(), }); - // For every connection, we must make a `Service` to handle all - // incoming HTTP requests on said connection. - let make_svc = make_service_fn(move |_conn| { - let state = state.clone(); - // This is the `Service` that will handle the connection. - // `service_fn` is a helper to convert a function that - // returns a Response into a `Service`. - async move { Ok::<_, Infallible>(service_fn(move |req| serve_req(req, state.clone()))) } - }); - - let addr = ([127, 0, 0, 1], 3000).into(); - - let server = Server::bind(&addr).serve(make_svc); + let addr: SocketAddr = ([127, 0, 0, 1], 3000).into(); + let listener = TcpListener::bind(addr).await.unwrap(); println!("Listening on http://{addr}"); - server.await?; + while let Ok((stream, _addr)) = listener.accept().await { + if let Err(err) = Builder::new(TokioExecutor::new()) + .serve_connection( + TokioIo::new(stream), + service_fn(|req| serve_req(req, state.clone())), + ) + .await + { + eprintln!("{err}"); + } + } Ok(()) } diff --git a/opentelemetry-proto/src/proto/opentelemetry-proto b/opentelemetry-proto/src/proto/opentelemetry-proto index b3060d2104..24d4bc0020 160000 --- a/opentelemetry-proto/src/proto/opentelemetry-proto +++ b/opentelemetry-proto/src/proto/opentelemetry-proto @@ -1 +1 @@ -Subproject commit b3060d2104df364136d75a35779e6bd48bac449a +Subproject commit 24d4bc002003c74db7aa608c8e254155daf8e49d diff --git a/opentelemetry-sdk/src/lib.rs b/opentelemetry-sdk/src/lib.rs index 852f0b8327..79123b0fcb 100644 --- a/opentelemetry-sdk/src/lib.rs +++ b/opentelemetry-sdk/src/lib.rs @@ -118,7 +118,7 @@ #![doc( html_logo_url = "https://raw.githubusercontent.com/open-telemetry/opentelemetry-rust/main/assets/logo.svg" )] -#![cfg_attr(test, deny(warnings))] +//#![cfg_attr(test, deny(warnings))] pub mod export; mod instrumentation; From 6c75e7c5d8c98ede77c14bece26bb33810ddadc6 Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Mon, 22 Apr 2024 17:00:10 +0200 Subject: [PATCH 02/31] Fix hyper feature --- opentelemetry-http/Cargo.toml | 3 +++ opentelemetry-http/src/lib.rs | 26 ++++++++++++++++---------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/opentelemetry-http/Cargo.toml b/opentelemetry-http/Cargo.toml index b4e809d1ab..8aaf3f7fc5 100644 --- a/opentelemetry-http/Cargo.toml +++ b/opentelemetry-http/Cargo.toml @@ -10,6 +10,7 @@ edition = "2021" rust-version = "1.65" [features] +hyper = ["dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:tokio"] reqwest-rustls = ["reqwest", "reqwest/rustls-tls-native-roots"] reqwest-rustls-webpki-roots = ["reqwest", "reqwest/rustls-tls-webpki-roots"] @@ -17,7 +18,9 @@ reqwest-rustls-webpki-roots = ["reqwest", "reqwest/rustls-tls-webpki-roots"] async-trait = { workspace = true } bytes = { workspace = true } http = { workspace = true } +http-body-util = { workspace = true, optional = true } hyper = { workspace = true, features = ["http2", "client"], optional = true } +hyper-util = { workspace = true, features = ["client-legacy"], optional = true } opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["trace"] } reqwest = { workspace = true, features = ["blocking"], optional = true } tokio = { workspace = true, features = ["time"], optional = true } diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index b921a41d9c..9d1b49982f 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -105,21 +105,23 @@ pub mod hyper { use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response}; use http::HeaderValue; - use hyper::client::connect::Connect; - use hyper::Client; + use hyper::body::Body; + use hyper_util::client::legacy::{connect::Connect, Client}; + use http_body_util::BodyExt; + use std::error::Error; use std::fmt::Debug; use std::time::Duration; use tokio::time; #[derive(Debug, Clone)] - pub struct HyperClient { - inner: Client, + pub struct HyperClient { + inner: Client, timeout: Duration, authorization: Option, } - impl HyperClient { - pub fn new_with_timeout(inner: Client, timeout: Duration) -> Self { + impl HyperClient { + pub fn new_with_timeout(inner: Client, timeout: Duration) -> Self { Self { inner, timeout, @@ -128,7 +130,7 @@ pub mod hyper { } pub fn new_with_timeout_and_authorization_header( - inner: Client, + inner: Client, timeout: Duration, authorization: HeaderValue, ) -> Self { @@ -141,13 +143,16 @@ pub mod hyper { } #[async_trait] - impl HttpClient for HyperClient + impl HttpClient for HyperClient where C: Connect + Send + Sync + Clone + Debug + 'static, + B: From> + Body + Send + Sync + Debug + Unpin + 'static, + ::Data: Send, + ::Error: Into>, { async fn send(&self, request: Request>) -> Result, HttpError> { let (parts, body) = request.into_parts(); - let mut request = Request::from_parts(parts, body.into()); + let mut request: Request = Request::from_parts(parts, body.into()); if let Some(ref authorization) = self.authorization { request .headers_mut() @@ -155,9 +160,10 @@ pub mod hyper { } let mut response = time::timeout(self.timeout, self.inner.request(request)).await??; let headers = std::mem::take(response.headers_mut()); + let mut http_response = Response::builder() .status(response.status()) - .body(hyper::body::to_bytes(response.into_body()).await?)?; + .body(response.into_body().collect().await?.to_bytes())?; *http_response.headers_mut() = headers; Ok(http_response.error_for_status()?) From e22bb0b063fc42fa0813f537df907bd84b3eecba Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Mon, 22 Apr 2024 17:12:18 +0200 Subject: [PATCH 03/31] Fix compilation --- opentelemetry-http/src/lib.rs | 2 +- opentelemetry-sdk/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index 9d1b49982f..2112c873a9 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -105,9 +105,9 @@ pub mod hyper { use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response}; use http::HeaderValue; + use http_body_util::BodyExt; use hyper::body::Body; use hyper_util::client::legacy::{connect::Connect, Client}; - use http_body_util::BodyExt; use std::error::Error; use std::fmt::Debug; use std::time::Duration; diff --git a/opentelemetry-sdk/src/lib.rs b/opentelemetry-sdk/src/lib.rs index 79123b0fcb..852f0b8327 100644 --- a/opentelemetry-sdk/src/lib.rs +++ b/opentelemetry-sdk/src/lib.rs @@ -118,7 +118,7 @@ #![doc( html_logo_url = "https://raw.githubusercontent.com/open-telemetry/opentelemetry-rust/main/assets/logo.svg" )] -//#![cfg_attr(test, deny(warnings))] +#![cfg_attr(test, deny(warnings))] pub mod export; mod instrumentation; From adda11d6d3a55144c74ab850e45a02b77b99c2b2 Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Mon, 22 Apr 2024 21:19:25 +0200 Subject: [PATCH 04/31] Ignore doctest for now --- Cargo.toml | 2 +- opentelemetry-zipkin/Cargo.toml | 2 +- opentelemetry-zipkin/src/lib.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 52e01e0d97..45fa0db400 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ futures-executor = "0.3" futures-util = { version = "0.3", default-features = false } hyper = { version = "1.3", default-features = false } hyper-util = "0.1" -http = { version = "1.1", default-features = false } +http = { version = "1.1", default-features = false, features = ["std"] } http-body-util = "0.1" log = "0.4.21" once_cell = "1.13" diff --git a/opentelemetry-zipkin/Cargo.toml b/opentelemetry-zipkin/Cargo.toml index b163db8088..03bf20d9eb 100644 --- a/opentelemetry-zipkin/Cargo.toml +++ b/opentelemetry-zipkin/Cargo.toml @@ -36,7 +36,7 @@ serde_json = { workspace = true } serde = { workspace = true, features = ["derive"] } typed-builder = "0.18" http = { workspace = true } -reqwest = { workspace = true, optional = true} +reqwest = { workspace = true, optional = true } thiserror = { workspace = true } futures-core = { workspace = true } diff --git a/opentelemetry-zipkin/src/lib.rs b/opentelemetry-zipkin/src/lib.rs index d884ab740a..29e898b130 100644 --- a/opentelemetry-zipkin/src/lib.rs +++ b/opentelemetry-zipkin/src/lib.rs @@ -84,7 +84,7 @@ //! [`ZipkinPipelineBuilder`] docs for details of each option. //! //! -//! ```no_run +//! ```no_run,ignore //! use opentelemetry::{global, KeyValue, trace::Tracer}; //! use opentelemetry_sdk::{trace::{self, RandomIdGenerator, Sampler}, Resource}; //! use opentelemetry_sdk::export::trace::ExportResult; From 0664b648d9dda61739f86dd433c66f3751b42da3 Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Mon, 22 Apr 2024 21:24:02 +0200 Subject: [PATCH 05/31] Enable HTTP 1 and 2 --- opentelemetry-http/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-http/Cargo.toml b/opentelemetry-http/Cargo.toml index 8aaf3f7fc5..34040daffe 100644 --- a/opentelemetry-http/Cargo.toml +++ b/opentelemetry-http/Cargo.toml @@ -20,7 +20,7 @@ bytes = { workspace = true } http = { workspace = true } http-body-util = { workspace = true, optional = true } hyper = { workspace = true, features = ["http2", "client"], optional = true } -hyper-util = { workspace = true, features = ["client-legacy"], optional = true } +hyper-util = { workspace = true, features = ["client-legacy", "http1", "http2"], optional = true } opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["trace"] } reqwest = { workspace = true, features = ["blocking"], optional = true } tokio = { workspace = true, features = ["time"], optional = true } From 75b325927004ccc9f1d0dbba5be98620a12ef000 Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Fri, 3 May 2024 16:47:04 +0200 Subject: [PATCH 06/31] Fix some lints --- opentelemetry-sdk/src/logs/log_emitter.rs | 2 +- opentelemetry-sdk/src/metrics/periodic_reader.rs | 2 +- opentelemetry-sdk/src/trace/span_processor.rs | 2 +- opentelemetry-zipkin/src/lib.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 9914c7b408..faadf473b3 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -312,7 +312,7 @@ mod tests { use opentelemetry::{Key, KeyValue, Value}; use std::fmt::{Debug, Formatter}; use std::sync::atomic::AtomicU64; - use std::sync::{Arc, Mutex}; + use std::sync::Mutex; use std::thread; struct ShutdownTestLogProcessor { diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 81df8d32f7..034053bdbc 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -291,7 +291,7 @@ impl PeriodicReaderWorker { true } - async fn run(mut self, mut messages: impl Unpin + FusedStream) { + async fn run(mut self, mut messages: impl FusedStream + Unpin) { while let Some(message) = messages.next().await { if !self.process_message(message).await { break; diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 214c0e5768..b99dc45311 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -422,7 +422,7 @@ impl BatchSpanProcessorInternal { }) } - async fn run(mut self, mut messages: impl Unpin + FusedStream) { + async fn run(mut self, mut messages: impl FusedStream + Unpin) { loop { select! { // FuturesUnordered implements Fuse intelligently such that it diff --git a/opentelemetry-zipkin/src/lib.rs b/opentelemetry-zipkin/src/lib.rs index 29e898b130..d884ab740a 100644 --- a/opentelemetry-zipkin/src/lib.rs +++ b/opentelemetry-zipkin/src/lib.rs @@ -84,7 +84,7 @@ //! [`ZipkinPipelineBuilder`] docs for details of each option. //! //! -//! ```no_run,ignore +//! ```no_run //! use opentelemetry::{global, KeyValue, trace::Tracer}; //! use opentelemetry_sdk::{trace::{self, RandomIdGenerator, Sampler}, Resource}; //! use opentelemetry_sdk::export::trace::ExportResult; From 5944e90d64cd9fc397ded0b18f13b6550bebb7db Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Fri, 3 May 2024 17:00:42 +0200 Subject: [PATCH 07/31] Update test --- opentelemetry-zipkin/Cargo.toml | 3 ++- opentelemetry-zipkin/src/lib.rs | 20 +++++++++++++++----- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/opentelemetry-zipkin/Cargo.toml b/opentelemetry-zipkin/Cargo.toml index 03bf20d9eb..768a165716 100644 --- a/opentelemetry-zipkin/Cargo.toml +++ b/opentelemetry-zipkin/Cargo.toml @@ -43,6 +43,7 @@ futures-core = { workspace = true } [dev-dependencies] bytes = { workspace = true } futures-util = { workspace = true, features = ["io"] } -hyper = { workspace = true } +http-body-util = { workspace = true } +hyper-util = { workspace = true, features = ["client-legacy", "http1", "tokio"] } opentelemetry_sdk = { default-features = false, features = ["trace", "testing"], path = "../opentelemetry-sdk" } temp-env = { workspace = true } diff --git a/opentelemetry-zipkin/src/lib.rs b/opentelemetry-zipkin/src/lib.rs index d884ab740a..0e8db47dd3 100644 --- a/opentelemetry-zipkin/src/lib.rs +++ b/opentelemetry-zipkin/src/lib.rs @@ -95,28 +95,33 @@ //! use http::{Request, Response}; //! use std::convert::TryInto as _; //! use std::error::Error; -//! use hyper::{client::HttpConnector, Body}; +//! use http_body_util::{BodyExt, Full}; +//! use hyper_util::{ +//! client::legacy::{Client, connect::HttpConnector}, +//! rt::tokio::TokioExecutor, +//! }; //! //! // `reqwest` is supported through a feature, if you prefer an //! // alternate http client you can add support by implementing `HttpClient` as //! // shown here. //! #[derive(Debug)] -//! struct HyperClient(hyper::Client); +//! struct HyperClient(Client>); //! //! #[async_trait] //! impl HttpClient for HyperClient { //! async fn send(&self, req: Request>) -> Result, HttpError> { //! let resp = self //! .0 -//! .request(req.map(|v| Body::from(v))) +//! .request(req.map(|v| Full::new(Bytes::from(v)))) //! .await?; //! //! let response = Response::builder() //! .status(resp.status()) //! .body({ -//! hyper::body::to_bytes(resp.into_body()) +//! resp.collect() //! .await //! .expect("cannot decode response") +//! .to_bytes() //! }) //! .expect("cannot build response"); //! @@ -127,7 +132,12 @@ //! fn main() -> Result<(), Box> { //! global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new()); //! let tracer = opentelemetry_zipkin::new_pipeline() -//! .with_http_client(HyperClient(hyper::Client::new())) +//! .with_http_client( +//! HyperClient( +//! Client::builder(TokioExecutor::new()) +//! .build_http() +//! ) +//! ) //! .with_service_name("my_app") //! .with_service_address("127.0.0.1:8080".parse()?) //! .with_collector_endpoint("http://localhost:9411/api/v2/spans") From 45903d803b529eaed84cf2bfdae676ec6b0d846a Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Fri, 3 May 2024 17:05:48 +0200 Subject: [PATCH 08/31] Remove unused dependencies/features --- examples/logs-basic/Cargo.toml | 1 - examples/metrics-advanced/Cargo.toml | 1 - examples/metrics-basic/Cargo.toml | 1 - opentelemetry-appender-tracing/Cargo.toml | 1 - opentelemetry-otlp/tests/integration_test/Cargo.toml | 1 - stress/Cargo.toml | 1 - 6 files changed, 6 deletions(-) diff --git a/examples/logs-basic/Cargo.toml b/examples/logs-basic/Cargo.toml index ae30dc6779..47a6fcd845 100644 --- a/examples/logs-basic/Cargo.toml +++ b/examples/logs-basic/Cargo.toml @@ -12,4 +12,3 @@ opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["logs" opentelemetry-appender-log = { path = "../../opentelemetry-appender-log", default-features = false} opentelemetry-semantic-conventions = { path = "../../opentelemetry-semantic-conventions" } log = { workspace = true } -serde_json = { workspace = true } diff --git a/examples/metrics-advanced/Cargo.toml b/examples/metrics-advanced/Cargo.toml index a5a8a7c489..f264a2a4f2 100644 --- a/examples/metrics-advanced/Cargo.toml +++ b/examples/metrics-advanced/Cargo.toml @@ -10,4 +10,3 @@ opentelemetry = { path = "../../opentelemetry", features = ["metrics"] } opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["metrics", "rt-tokio"] } opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["metrics"]} tokio = { workspace = true, features = ["full"] } -serde_json = { workspace = true } diff --git a/examples/metrics-basic/Cargo.toml b/examples/metrics-basic/Cargo.toml index 37b79da140..cb5f6b50ad 100644 --- a/examples/metrics-basic/Cargo.toml +++ b/examples/metrics-basic/Cargo.toml @@ -10,7 +10,6 @@ opentelemetry = { path = "../../opentelemetry", features = ["metrics", "otel_uns opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["metrics", "rt-tokio"] } opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["metrics"]} tokio = { workspace = true, features = ["full"] } -serde_json = { workspace = true } [features] default = ["otel_unstable"] diff --git a/opentelemetry-appender-tracing/Cargo.toml b/opentelemetry-appender-tracing/Cargo.toml index d104014662..0e6f0e249d 100644 --- a/opentelemetry-appender-tracing/Cargo.toml +++ b/opentelemetry-appender-tracing/Cargo.toml @@ -12,7 +12,6 @@ rust-version = "1.65" [dependencies] log = { workspace = true, optional = true } -once_cell = { workspace = true } opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["logs"] } tracing = { workspace = true, features = ["std"]} tracing-core = { workspace = true } diff --git a/opentelemetry-otlp/tests/integration_test/Cargo.toml b/opentelemetry-otlp/tests/integration_test/Cargo.toml index 9566576d9b..d7ef8de3d3 100644 --- a/opentelemetry-otlp/tests/integration_test/Cargo.toml +++ b/opentelemetry-otlp/tests/integration_test/Cargo.toml @@ -6,7 +6,6 @@ publish = false [dependencies] -once_cell = { workspace = true } opentelemetry = { path = "../../../opentelemetry", features = ["metrics", "logs"] } opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "logs", "testing"] } opentelemetry-proto = { path = "../../../opentelemetry-proto", features = ["gen-tonic-messages", "trace", "logs", "with-serde"] } diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 43aa8adee7..d7e2c7a1c4 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -43,7 +43,6 @@ opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "log opentelemetry-appender-tracing = { path = "../opentelemetry-appender-tracing"} rand = { version = "0.8.4", features = ["small_rng"] } tracing = { workspace = true, features = ["std"]} -tracing-core = { workspace = true } tracing-subscriber = { workspace = true, features = ["registry", "std"] } num-format = "0.4.4" sysinfo = { version = "0.30.12", optional = true } From 650e688f7f4f6e59e37aa7e83d2b4d5e104da00b Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Fri, 3 May 2024 17:06:48 +0200 Subject: [PATCH 09/31] Prefer `::from` --- opentelemetry-http/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index 2112c873a9..79a72029b3 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -152,7 +152,7 @@ pub mod hyper { { async fn send(&self, request: Request>) -> Result, HttpError> { let (parts, body) = request.into_parts(); - let mut request: Request = Request::from_parts(parts, body.into()); + let mut request = Request::from_parts(parts, B::from(body)); if let Some(ref authorization) = self.authorization { request .headers_mut() From 832b1cb3496be28fc303671f1c9375327464aeab Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Fri, 3 May 2024 17:11:20 +0200 Subject: [PATCH 10/31] Remove some features --- opentelemetry-http/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-http/Cargo.toml b/opentelemetry-http/Cargo.toml index 34040daffe..3e16fb2d16 100644 --- a/opentelemetry-http/Cargo.toml +++ b/opentelemetry-http/Cargo.toml @@ -19,8 +19,8 @@ async-trait = { workspace = true } bytes = { workspace = true } http = { workspace = true } http-body-util = { workspace = true, optional = true } -hyper = { workspace = true, features = ["http2", "client"], optional = true } -hyper-util = { workspace = true, features = ["client-legacy", "http1", "http2"], optional = true } +hyper = { workspace = true, optional = true } +hyper-util = { workspace = true, features = ["client-legacy", "http2"], optional = true } opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["trace"] } reqwest = { workspace = true, features = ["blocking"], optional = true } tokio = { workspace = true, features = ["time"], optional = true } From 40fa5766e3cc2786c1010764e7b841c96cd60c6a Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Fri, 3 May 2024 17:28:00 +0200 Subject: [PATCH 11/31] Use pinned PR version of external types --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f7002f0c0b..9aa05d4601 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -77,7 +77,7 @@ jobs: components: rustfmt - name: external-type-check run: | - cargo install cargo-check-external-types + cargo install cargo-check-external-types --git https://github.com/awslabs/cargo-check-external-types.git --rev 98d6e533ab92540ab12a4ca35acd244dacb04a9f cd ${{ matrix.example }} cargo check-external-types --config allowed-external-types.toml non-default-examples: From 1a3d711b4d983c9d9b17ff7c5547043d0b5069a5 Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Fri, 3 May 2024 17:41:55 +0200 Subject: [PATCH 12/31] Add CHANGELOG entries --- opentelemetry-appender-log/CHANGELOG.md | 2 +- opentelemetry-http/CHANGELOG.md | 1 + opentelemetry-otlp/CHANGELOG.md | 2 +- opentelemetry-sdk/CHANGELOG.md | 1 + opentelemetry-zipkin/CHANGELOG.md | 4 ++++ 5 files changed, 8 insertions(+), 2 deletions(-) diff --git a/opentelemetry-appender-log/CHANGELOG.md b/opentelemetry-appender-log/CHANGELOG.md index 1c92c570cd..0807b4a414 100644 --- a/opentelemetry-appender-log/CHANGELOG.md +++ b/opentelemetry-appender-log/CHANGELOG.md @@ -3,11 +3,11 @@ ## vNext - [1869](https://github.com/open-telemetry/opentelemetry-rust/pull/1869) Utilize the `LogRecord::set_target()` method to pass the log target to the SDK. +- Update to `http` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) ## v0.4.0 - Add log key-values as attributes [#1628](https://github.com/open-telemetry/opentelemetry-rust/pull/1628) -- Update `opentelemetry` dependency version to 0.23 ## v0.3.0 diff --git a/opentelemetry-http/CHANGELOG.md b/opentelemetry-http/CHANGELOG.md index 51880f3c31..3603af5a5b 100644 --- a/opentelemetry-http/CHANGELOG.md +++ b/opentelemetry-http/CHANGELOG.md @@ -4,6 +4,7 @@ - **Breaking** Correct the misspelling of "webkpi" to "webpki" in features [#1842](https://github.com/open-telemetry/opentelemetry-rust/pull/1842) - **Breaking** Remove support for the `isahc` HTTP client [#1924](https://github.com/open-telemetry/opentelemetry-rust/pull/1924) +- Update to `http` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) ## v0.12.0 diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index 5295ae9ee0..be681d9886 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -20,7 +20,7 @@ now use `.with_resource(RESOURCE::default())` to configure Resource when using previous release. - **Breaking** [1869](https://github.com/open-telemetry/opentelemetry-rust/pull/1869) The OTLP logs exporter now overrides the [InstrumentationScope::name](https://github.com/open-telemetry/opentelemetry-proto/blob/b3060d2104df364136d75a35779e6bd48bac449a/opentelemetry/proto/common/v1/common.proto#L73) field with the `target` from `LogRecord`, if target is populated. - Groups batch of `LogRecord` and `Span` by their resource and instrumentation scope before exporting, for better efficiency [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873). - +- **Breaking** Update to `http` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) ## v0.16.0 diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 61d99a629b..0ca10932a4 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -74,6 +74,7 @@ LogData { } ``` The `LogRecord::target` field contains the actual target/component emitting the logs, while the `Instrumentation::name` contains the name of the OpenTelemetry appender. +- Update to `http` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) ## v0.23.0 diff --git a/opentelemetry-zipkin/CHANGELOG.md b/opentelemetry-zipkin/CHANGELOG.md index 68ef3842c5..2b521ff5d6 100644 --- a/opentelemetry-zipkin/CHANGELOG.md +++ b/opentelemetry-zipkin/CHANGELOG.md @@ -2,6 +2,10 @@ ## vNext +### Changed + +- Update to `http` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) + ## v0.21.0 ### Changed From 8e6c5d2a9e7c8839213832b4c62ee7ca9c0b73db Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Fri, 3 May 2024 17:57:57 +0200 Subject: [PATCH 13/31] Add `serde_json` dependencies back --- examples/logs-basic/Cargo.toml | 1 + examples/metrics-advanced/Cargo.toml | 1 + examples/metrics-basic/Cargo.toml | 1 + 3 files changed, 3 insertions(+) diff --git a/examples/logs-basic/Cargo.toml b/examples/logs-basic/Cargo.toml index 47a6fcd845..ae30dc6779 100644 --- a/examples/logs-basic/Cargo.toml +++ b/examples/logs-basic/Cargo.toml @@ -12,3 +12,4 @@ opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["logs" opentelemetry-appender-log = { path = "../../opentelemetry-appender-log", default-features = false} opentelemetry-semantic-conventions = { path = "../../opentelemetry-semantic-conventions" } log = { workspace = true } +serde_json = { workspace = true } diff --git a/examples/metrics-advanced/Cargo.toml b/examples/metrics-advanced/Cargo.toml index f264a2a4f2..a5a8a7c489 100644 --- a/examples/metrics-advanced/Cargo.toml +++ b/examples/metrics-advanced/Cargo.toml @@ -10,3 +10,4 @@ opentelemetry = { path = "../../opentelemetry", features = ["metrics"] } opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["metrics", "rt-tokio"] } opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["metrics"]} tokio = { workspace = true, features = ["full"] } +serde_json = { workspace = true } diff --git a/examples/metrics-basic/Cargo.toml b/examples/metrics-basic/Cargo.toml index cb5f6b50ad..37b79da140 100644 --- a/examples/metrics-basic/Cargo.toml +++ b/examples/metrics-basic/Cargo.toml @@ -10,6 +10,7 @@ opentelemetry = { path = "../../opentelemetry", features = ["metrics", "otel_uns opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["metrics", "rt-tokio"] } opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["metrics"]} tokio = { workspace = true, features = ["full"] } +serde_json = { workspace = true } [features] default = ["otel_unstable"] From 8ad764ac9ea5fef69788f083405560ad0d8b1149 Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Sun, 5 May 2024 11:13:45 +0200 Subject: [PATCH 14/31] Replace `Either` usage with `BoxBody` --- .../tracing-http-propagator/src/server.rs | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index 0f42d5b5d8..bbfa5556cb 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -1,4 +1,4 @@ -use http_body_util::{Either, Full}; +use http_body_util::{combinators::BoxBody, BodyExt, Full}; use hyper::{body::Incoming, service::service_fn, Request, Response, StatusCode}; use hyper_util::rt::{TokioExecutor, TokioIo}; use opentelemetry::{ @@ -21,32 +21,44 @@ fn extract_context_from_request(req: &Request) -> Context { } // Separate async function for the handle endpoint -async fn handle_health_check(_req: Request) -> Result>, Infallible> { +async fn handle_health_check( + _req: Request, +) -> Result>, Infallible> { let tracer = global::tracer("example/server"); let mut span = tracer .span_builder("health_check") .with_kind(SpanKind::Internal) .start(&tracer); span.add_event("Health check accessed", vec![]); - let res = Response::new(Full::new(Bytes::from_static(b"Server is up and running!"))); + + let res = Response::new( + Full::new(Bytes::from_static(b"Server is up and running!")) + .map_err(|err| match err {}) + .boxed(), + ); + Ok(res) } // Separate async function for the echo endpoint -async fn handle_echo(req: Request) -> Result, Infallible> { +async fn handle_echo( + req: Request, +) -> Result>, Infallible> { let tracer = global::tracer("example/server"); let mut span = tracer .span_builder("echo") .with_kind(SpanKind::Internal) .start(&tracer); span.add_event("Echoing back the request", vec![]); - let res = Response::new(req.into_body()); + + let res = Response::new(req.into_body().boxed()); + Ok(res) } async fn router( req: Request, -) -> Result, Incoming>>, Infallible> { +) -> Result>, Infallible> { // Extract the context from the incoming request headers let parent_cx = extract_context_from_request(&req); let response = { @@ -61,18 +73,12 @@ async fn router( let cx = Context::default().with_span(span); match (req.method(), req.uri().path()) { - (&hyper::Method::GET, "/health") => handle_health_check(req) - .with_context(cx) - .await - .map(|response| response.map(Either::Left)), - (&hyper::Method::GET, "/echo") => handle_echo(req) - .with_context(cx) - .await - .map(|response| response.map(Either::Right)), + (&hyper::Method::GET, "/health") => handle_health_check(req).with_context(cx).await, + (&hyper::Method::GET, "/echo") => handle_echo(req).with_context(cx).await, _ => { cx.span() .set_attribute(KeyValue::new(trace::HTTP_RESPONSE_STATUS_CODE, 404)); - let mut not_found = Response::new(Either::Left(Full::default())); + let mut not_found = Response::new(BoxBody::default()); *not_found.status_mut() = StatusCode::NOT_FOUND; Ok(not_found) } From 6f34ee354a31ea1822470c70480047c0283c80f2 Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Tue, 9 Jul 2024 13:02:45 +0200 Subject: [PATCH 15/31] Add allow over generated module --- opentelemetry-proto/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/opentelemetry-proto/src/lib.rs b/opentelemetry-proto/src/lib.rs index 209df125a6..c866559a26 100644 --- a/opentelemetry-proto/src/lib.rs +++ b/opentelemetry-proto/src/lib.rs @@ -29,6 +29,7 @@ // we shouldn't manually change it. Thus skip format and lint check. #[rustfmt::skip] #[allow(warnings)] +#[allow(ambiguous_associated_items)] // TODO: Remove this allow as soon as is resolved and released. #[doc(hidden)] mod proto; From 70a9bc7c6cb215e191f9a57577fdfdd8b937c841 Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Tue, 9 Jul 2024 13:14:37 +0200 Subject: [PATCH 16/31] Fix hyper example --- .../examples/basic-otlp-http/Cargo.toml | 4 +++- .../examples/basic-otlp-http/src/hyper.rs | 18 +++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml index 149a734304..ccbe22e960 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml +++ b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml @@ -8,7 +8,7 @@ publish = false [features] default = ["reqwest"] reqwest = ["opentelemetry-otlp/reqwest-client"] -hyper = ["dep:async-trait", "dep:http", "dep:hyper", "dep:opentelemetry-http", "dep:bytes"] +hyper = ["dep:async-trait", "dep:http", "dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:opentelemetry-http", "dep:bytes"] [dependencies] @@ -23,7 +23,9 @@ opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-c async-trait = { workspace = true, optional = true } bytes = { workspace = true, optional = true } http = { workspace = true, optional = true } +http-body-util = { workspace = true, optional = true } hyper = { workspace = true, features = ["client"], optional = true } +hyper-util = { workspace = true, features = ["client-legacy"], optional = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true, features = ["std"]} tracing-core = { workspace = true } diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs index ff6e84a05d..80a28ae62d 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs @@ -1,20 +1,24 @@ use async_trait::async_trait; use bytes::Bytes; use http::{Request, Response}; -use hyper::{ - client::{connect::Connect, HttpConnector}, - Body, Client, +use http_body_util::{BodyExt, Full}; +use hyper_util::{ + client::legacy::{ + connect::{Connect, HttpConnector}, + Client, + }, + rt::TokioExecutor, }; use opentelemetry_http::{HttpClient, HttpError, ResponseExt}; pub struct HyperClient { - inner: hyper::Client, + inner: hyper_util::client::legacy::Client>, } impl Default for HyperClient { fn default() -> Self { Self { - inner: Client::new(), + inner: Client::builder(TokioExecutor::new()).build_http(), } } } @@ -30,7 +34,7 @@ impl std::fmt::Debug for HyperClient { #[async_trait] impl HttpClient for HyperClient { async fn send(&self, request: Request>) -> Result, HttpError> { - let request = request.map(Body::from); + let request = request.map(|body| Full::new(Bytes::from(body))); let (parts, body) = self .inner @@ -38,7 +42,7 @@ impl HttpClient for HyperClient { .await? .error_for_status()? .into_parts(); - let body = hyper::body::to_bytes(body).await?; + let body = body.collect().await?.to_bytes(); Ok(Response::from_parts(parts, body)) } From 95608c5759d34fa825767e375fc66738b0007b56 Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Tue, 9 Jul 2024 13:42:16 +0200 Subject: [PATCH 17/31] Regenerate protobuf types --- .../opentelemetry.proto.collector.logs.v1.rs | 23 +++++-------------- ...pentelemetry.proto.collector.metrics.v1.rs | 23 +++++-------------- .../opentelemetry.proto.collector.trace.v1.rs | 23 +++++-------------- .../tonic/opentelemetry.proto.metrics.v1.rs | 6 ++--- 4 files changed, 21 insertions(+), 54 deletions(-) diff --git a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.logs.v1.rs b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.logs.v1.rs index 5e808db812..a8f5443978 100644 --- a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.logs.v1.rs +++ b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.logs.v1.rs @@ -207,19 +207,17 @@ pub mod logs_service_server { /// case logs are sent/received to/from multiple Applications). #[derive(Debug)] pub struct LogsServiceServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl LogsServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -282,7 +280,6 @@ pub mod logs_service_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/opentelemetry.proto.collector.logs.v1.LogsService/Export" => { #[allow(non_camel_case_types)] @@ -313,7 +310,6 @@ pub mod logs_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ExportSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -335,8 +331,11 @@ pub mod logs_service_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -357,16 +356,6 @@ pub mod logs_service_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for LogsServiceServer { const NAME: &'static str = "opentelemetry.proto.collector.logs.v1.LogsService"; } diff --git a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.metrics.v1.rs b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.metrics.v1.rs index bc0f036420..052cc810f7 100644 --- a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.metrics.v1.rs +++ b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.metrics.v1.rs @@ -207,19 +207,17 @@ pub mod metrics_service_server { /// central collector. #[derive(Debug)] pub struct MetricsServiceServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl MetricsServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -282,7 +280,6 @@ pub mod metrics_service_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export" => { #[allow(non_camel_case_types)] @@ -313,7 +310,6 @@ pub mod metrics_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ExportSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -335,8 +331,11 @@ pub mod metrics_service_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -357,16 +356,6 @@ pub mod metrics_service_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for MetricsServiceServer { const NAME: &'static str = "opentelemetry.proto.collector.metrics.v1.MetricsService"; } diff --git a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.trace.v1.rs b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.trace.v1.rs index fae4d4dce6..55d0361be0 100644 --- a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.trace.v1.rs +++ b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.collector.trace.v1.rs @@ -207,19 +207,17 @@ pub mod trace_service_server { /// case spans are sent/received to/from multiple Applications). #[derive(Debug)] pub struct TraceServiceServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl TraceServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -282,7 +280,6 @@ pub mod trace_service_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/opentelemetry.proto.collector.trace.v1.TraceService/Export" => { #[allow(non_camel_case_types)] @@ -313,7 +310,6 @@ pub mod trace_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ExportSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -335,8 +331,11 @@ pub mod trace_service_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -357,16 +356,6 @@ pub mod trace_service_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for TraceServiceServer { const NAME: &'static str = "opentelemetry.proto.collector.trace.v1.TraceService"; } diff --git a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.metrics.v1.rs b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.metrics.v1.rs index 462305014f..1a69fcc5c5 100644 --- a/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.metrics.v1.rs +++ b/opentelemetry-proto/src/proto/tonic/opentelemetry.proto.metrics.v1.rs @@ -335,7 +335,7 @@ pub mod number_data_point { #[cfg_attr(feature = "with-serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "with-serde", serde(rename_all = "camelCase"))] #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum Value { #[prost(double, tag = "4")] AsDouble(f64), @@ -624,7 +624,7 @@ pub mod summary_data_point { #[cfg_attr(feature = "with-serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "with-serde", serde(rename_all = "camelCase"))] #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] + #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ValueAtQuantile { /// The quantile of a distribution. Must be in the interval /// \[0.0, 1.0\]. @@ -685,7 +685,7 @@ pub mod exemplar { #[cfg_attr(feature = "with-serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "with-serde", serde(rename_all = "camelCase"))] #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] + #[derive(Clone, Copy, PartialEq, ::prost::Oneof)] pub enum Value { #[prost(double, tag = "3")] AsDouble(f64), From 318bb62b530cdb043191d24998eb6a4d369ee5ff Mon Sep 17 00:00:00 2001 From: aumetra Date: Tue, 9 Jul 2024 14:42:58 +0200 Subject: [PATCH 18/31] Pin cc version --- Cargo.toml | 1 + opentelemetry/Cargo.toml | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 45fa0db400..0c45fd25e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ debug = 1 async-std = "1.10" async-trait = "0.1" bytes = "1" +cc = "=1.0.105" # pinning version supporting rustc 1.65 criterion = "0.5" futures-core = "0.3" futures-executor = "0.3" diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index 3d181fce76..cd7e423a44 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -30,6 +30,11 @@ thiserror = { workspace = true } [target.'cfg(all(target_arch = "wasm32", not(target_os = "wasi")))'.dependencies] js-sys = "0.3.63" +# This cfg can't ever be enabled but cargo will respect it anyway. +# Meaning we can use this to pin unused transitive dependencies. +[target.'cfg(any())'.dependencies] +cc = { workspace = true } + [features] default = ["trace", "metrics", "logs"] trace = ["pin-project-lite"] From c93e43f9ae12868a01ebbb7dda2c958b733209b4 Mon Sep 17 00:00:00 2001 From: aumetra Date: Tue, 9 Jul 2024 15:00:08 +0200 Subject: [PATCH 19/31] Mention tonic in changelog --- opentelemetry-otlp/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index be681d9886..f901e1314b 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -20,7 +20,7 @@ now use `.with_resource(RESOURCE::default())` to configure Resource when using previous release. - **Breaking** [1869](https://github.com/open-telemetry/opentelemetry-rust/pull/1869) The OTLP logs exporter now overrides the [InstrumentationScope::name](https://github.com/open-telemetry/opentelemetry-proto/blob/b3060d2104df364136d75a35779e6bd48bac449a/opentelemetry/proto/common/v1/common.proto#L73) field with the `target` from `LogRecord`, if target is populated. - Groups batch of `LogRecord` and `Span` by their resource and instrumentation scope before exporting, for better efficiency [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873). -- **Breaking** Update to `http` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) +- **Breaking** Update to `http` v1 and `tonic` v0.12 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) ## v0.16.0 From ab7a3a4154bae41780362678e1f697f00ffb6f63 Mon Sep 17 00:00:00 2001 From: aumetra Date: Tue, 9 Jul 2024 15:00:49 +0200 Subject: [PATCH 20/31] Remove mention from appender-log changelog --- opentelemetry-appender-log/CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/opentelemetry-appender-log/CHANGELOG.md b/opentelemetry-appender-log/CHANGELOG.md index 0807b4a414..41d09bd957 100644 --- a/opentelemetry-appender-log/CHANGELOG.md +++ b/opentelemetry-appender-log/CHANGELOG.md @@ -3,7 +3,6 @@ ## vNext - [1869](https://github.com/open-telemetry/opentelemetry-rust/pull/1869) Utilize the `LogRecord::set_target()` method to pass the log target to the SDK. -- Update to `http` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) ## v0.4.0 From af97e0ed5a6de7748c78b99f0e59c110934e2a0b Mon Sep 17 00:00:00 2001 From: aumetra Date: Tue, 9 Jul 2024 15:01:46 +0200 Subject: [PATCH 21/31] Add back entry --- opentelemetry-appender-log/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/opentelemetry-appender-log/CHANGELOG.md b/opentelemetry-appender-log/CHANGELOG.md index 41d09bd957..1c92c570cd 100644 --- a/opentelemetry-appender-log/CHANGELOG.md +++ b/opentelemetry-appender-log/CHANGELOG.md @@ -7,6 +7,7 @@ ## v0.4.0 - Add log key-values as attributes [#1628](https://github.com/open-telemetry/opentelemetry-rust/pull/1628) +- Update `opentelemetry` dependency version to 0.23 ## v0.3.0 From eb2ae3295d3f8c5f1670d21a4c09edb71650bae9 Mon Sep 17 00:00:00 2001 From: aumetra Date: Tue, 9 Jul 2024 15:52:35 +0200 Subject: [PATCH 22/31] Remove git patch for CI --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9aa05d4601..f7002f0c0b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -77,7 +77,7 @@ jobs: components: rustfmt - name: external-type-check run: | - cargo install cargo-check-external-types --git https://github.com/awslabs/cargo-check-external-types.git --rev 98d6e533ab92540ab12a4ca35acd244dacb04a9f + cargo install cargo-check-external-types cd ${{ matrix.example }} cargo check-external-types --config allowed-external-types.toml non-default-examples: From 23c09c781e478dbad5992c3df309b8997e980d54 Mon Sep 17 00:00:00 2001 From: aumetra Date: Tue, 9 Jul 2024 15:55:52 +0200 Subject: [PATCH 23/31] Remove allow annotation --- opentelemetry-proto/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/opentelemetry-proto/src/lib.rs b/opentelemetry-proto/src/lib.rs index c866559a26..209df125a6 100644 --- a/opentelemetry-proto/src/lib.rs +++ b/opentelemetry-proto/src/lib.rs @@ -29,7 +29,6 @@ // we shouldn't manually change it. Thus skip format and lint check. #[rustfmt::skip] #[allow(warnings)] -#[allow(ambiguous_associated_items)] // TODO: Remove this allow as soon as is resolved and released. #[doc(hidden)] mod proto; From f5c73eb240aec3934af0e2fe62a41ce1ca18ee22 Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Wed, 10 Jul 2024 07:40:18 +0200 Subject: [PATCH 24/31] Update zipkin changelog --- opentelemetry-zipkin/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-zipkin/CHANGELOG.md b/opentelemetry-zipkin/CHANGELOG.md index 2b521ff5d6..13e47c4b38 100644 --- a/opentelemetry-zipkin/CHANGELOG.md +++ b/opentelemetry-zipkin/CHANGELOG.md @@ -4,7 +4,7 @@ ### Changed -- Update to `http` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) +- Update examples to `hyper` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) ## v0.21.0 From a87360bb186130e87eb44f37a1baa93fd6a5131a Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Wed, 10 Jul 2024 21:55:06 +0200 Subject: [PATCH 25/31] Remove cc pin --- Cargo.toml | 1 - opentelemetry/Cargo.toml | 5 ----- 2 files changed, 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0c45fd25e2..45fa0db400 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,6 @@ debug = 1 async-std = "1.10" async-trait = "0.1" bytes = "1" -cc = "=1.0.105" # pinning version supporting rustc 1.65 criterion = "0.5" futures-core = "0.3" futures-executor = "0.3" diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index cd7e423a44..3d181fce76 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -30,11 +30,6 @@ thiserror = { workspace = true } [target.'cfg(all(target_arch = "wasm32", not(target_os = "wasi")))'.dependencies] js-sys = "0.3.63" -# This cfg can't ever be enabled but cargo will respect it anyway. -# Meaning we can use this to pin unused transitive dependencies. -[target.'cfg(any())'.dependencies] -cc = { workspace = true } - [features] default = ["trace", "metrics", "logs"] trace = ["pin-project-lite"] From 88ad41698a4e36269fa8668006c9e661d7c9dcfa Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Thu, 11 Jul 2024 09:56:01 +0200 Subject: [PATCH 26/31] Revert submodule update --- opentelemetry-proto/src/proto/opentelemetry-proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-proto/src/proto/opentelemetry-proto b/opentelemetry-proto/src/proto/opentelemetry-proto index 24d4bc0020..b3060d2104 160000 --- a/opentelemetry-proto/src/proto/opentelemetry-proto +++ b/opentelemetry-proto/src/proto/opentelemetry-proto @@ -1 +1 @@ -Subproject commit 24d4bc002003c74db7aa608c8e254155daf8e49d +Subproject commit b3060d2104df364136d75a35779e6bd48bac449a From 7d1c3a1635d95a01fd966fb99e7ef14f3d9b377c Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Thu, 11 Jul 2024 09:57:04 +0200 Subject: [PATCH 27/31] Reorder dependencies --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 45fa0db400..4ae28e5821 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,10 +23,10 @@ criterion = "0.5" futures-core = "0.3" futures-executor = "0.3" futures-util = { version = "0.3", default-features = false } -hyper = { version = "1.3", default-features = false } -hyper-util = "0.1" http = { version = "1.1", default-features = false, features = ["std"] } http-body-util = "0.1" +hyper = { version = "1.3", default-features = false } +hyper-util = "0.1" log = "0.4.21" once_cell = "1.13" ordered-float = "4.0" From 16783065f157ba507670057325938f3a4ff5b1b9 Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Thu, 11 Jul 2024 10:03:15 +0200 Subject: [PATCH 28/31] Update changelogs --- opentelemetry-sdk/CHANGELOG.md | 2 +- opentelemetry-zipkin/CHANGELOG.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 0ca10932a4..60ebfba2fe 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -74,7 +74,7 @@ LogData { } ``` The `LogRecord::target` field contains the actual target/component emitting the logs, while the `Instrumentation::name` contains the name of the OpenTelemetry appender. -- Update to `http` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) +- **Breaking** [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) Update to `http` v1 types (via `opentelemetry-http` update) ## v0.23.0 diff --git a/opentelemetry-zipkin/CHANGELOG.md b/opentelemetry-zipkin/CHANGELOG.md index 13e47c4b38..69d0018785 100644 --- a/opentelemetry-zipkin/CHANGELOG.md +++ b/opentelemetry-zipkin/CHANGELOG.md @@ -4,7 +4,7 @@ ### Changed -- Update examples to `hyper` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) +- Update `opentelemetry-http` (and with that to `http` v1 types) [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674) ## v0.21.0 From 25f9213ab87eeb080df88242aba333c37bbc82ec Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Thu, 11 Jul 2024 10:36:41 +0200 Subject: [PATCH 29/31] Add opaque wrapper around Full body, use wrapper --- opentelemetry-http/src/lib.rs | 51 ++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index 79a72029b3..768c12507d 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -105,23 +105,51 @@ pub mod hyper { use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response}; use http::HeaderValue; - use http_body_util::BodyExt; - use hyper::body::Body; + use http_body_util::{BodyExt, Full}; + use hyper::body::{Body as HttpBody, Frame}; use hyper_util::client::legacy::{connect::Connect, Client}; - use std::error::Error; + use std::convert::Infallible; use std::fmt::Debug; + use std::pin::Pin; + use std::task::{self, Poll}; use std::time::Duration; use tokio::time; + pub struct Body(Full); + + impl HttpBody for Body { + type Data = Bytes; + type Error = Infallible; + + #[inline] + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>>> { + let inner_body = unsafe { self.map_unchecked_mut(|b| &mut b.0) }; + inner_body.poll_frame(cx) + } + + #[inline] + fn is_end_stream(&self) -> bool { + self.0.is_end_stream() + } + + #[inline] + fn size_hint(&self) -> hyper::body::SizeHint { + self.0.size_hint() + } + } + #[derive(Debug, Clone)] - pub struct HyperClient { - inner: Client, + pub struct HyperClient { + inner: Client, timeout: Duration, authorization: Option, } - impl HyperClient { - pub fn new_with_timeout(inner: Client, timeout: Duration) -> Self { + impl HyperClient { + pub fn new_with_timeout(inner: Client, timeout: Duration) -> Self { Self { inner, timeout, @@ -130,7 +158,7 @@ pub mod hyper { } pub fn new_with_timeout_and_authorization_header( - inner: Client, + inner: Client, timeout: Duration, authorization: HeaderValue, ) -> Self { @@ -143,16 +171,13 @@ pub mod hyper { } #[async_trait] - impl HttpClient for HyperClient + impl HttpClient for HyperClient where C: Connect + Send + Sync + Clone + Debug + 'static, - B: From> + Body + Send + Sync + Debug + Unpin + 'static, - ::Data: Send, - ::Error: Into>, { async fn send(&self, request: Request>) -> Result, HttpError> { let (parts, body) = request.into_parts(); - let mut request = Request::from_parts(parts, B::from(body)); + let mut request = Request::from_parts(parts, Body(Full::from(body))); if let Some(ref authorization) = self.authorization { request .headers_mut() From c8409787d4923d1efc557a82fd458adcdaecafaa Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Thu, 11 Jul 2024 10:54:54 +0200 Subject: [PATCH 30/31] Move body impl, use box dyn error type --- opentelemetry-http/src/lib.rs | 53 +++++++++++++++++------------------ 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index 768c12507d..e08fe05fec 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -108,39 +108,12 @@ pub mod hyper { use http_body_util::{BodyExt, Full}; use hyper::body::{Body as HttpBody, Frame}; use hyper_util::client::legacy::{connect::Connect, Client}; - use std::convert::Infallible; use std::fmt::Debug; use std::pin::Pin; use std::task::{self, Poll}; use std::time::Duration; use tokio::time; - pub struct Body(Full); - - impl HttpBody for Body { - type Data = Bytes; - type Error = Infallible; - - #[inline] - fn poll_frame( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll, Self::Error>>> { - let inner_body = unsafe { self.map_unchecked_mut(|b| &mut b.0) }; - inner_body.poll_frame(cx) - } - - #[inline] - fn is_end_stream(&self) -> bool { - self.0.is_end_stream() - } - - #[inline] - fn size_hint(&self) -> hyper::body::SizeHint { - self.0.size_hint() - } - } - #[derive(Debug, Clone)] pub struct HyperClient { inner: Client, @@ -194,6 +167,32 @@ pub mod hyper { Ok(http_response.error_for_status()?) } } + + pub struct Body(Full); + + impl HttpBody for Body { + type Data = Bytes; + type Error = Box; + + #[inline] + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>>> { + let inner_body = unsafe { self.map_unchecked_mut(|b| &mut b.0) }; + inner_body.poll_frame(cx) + } + + #[inline] + fn is_end_stream(&self) -> bool { + self.0.is_end_stream() + } + + #[inline] + fn size_hint(&self) -> hyper::body::SizeHint { + self.0.size_hint() + } + } } /// Methods to make working with responses from the [`HttpClient`] trait easier. From 1d1cef9c6f1d481830ea1448300080abe4e63b56 Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Thu, 11 Jul 2024 11:45:30 +0200 Subject: [PATCH 31/31] Fix compile error --- opentelemetry-http/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index e08fe05fec..f3e5e4f70a 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -180,7 +180,7 @@ pub mod hyper { cx: &mut task::Context<'_>, ) -> Poll, Self::Error>>> { let inner_body = unsafe { self.map_unchecked_mut(|b| &mut b.0) }; - inner_body.poll_frame(cx) + inner_body.poll_frame(cx).map_err(Into::into) } #[inline]