From 056d42a8d5a61b393297a7b71381e36ef7ff4c44 Mon Sep 17 00:00:00 2001 From: Lucas Sunsi Abreu Date: Mon, 22 Jul 2024 09:25:30 -0300 Subject: [PATCH] replace tracing with otel (#58) * replace tracing with otel * drop async trait * use span correctly --- Cargo.toml | 3 +- src/lib.rs | 146 +++++++++++++++++++++++++++++++++++------------------ 2 files changed, 98 insertions(+), 51 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c305fdb..e357436 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,12 +9,11 @@ license = "MIT" name = "lnd" [dependencies] -async-trait = { version = "0.1.0", default-features = false } hex = { version = "0.4.0", features = ["std"], default-features = false } +opentelemetry = { version = "0.24.0", features = ["trace"], default-features = false } prost = { version = "0.13.0", features = ["prost-derive"], default-features = false } thiserror = { version = "1.0.0", default-features = false } tonic = { version = "0.12.0", features = ["codegen", "prost", "tls", "transport"], default-features = false } -tracing = { version = "0.1.0", default-features = false } [build-dependencies] tonic-build = "0.12.0" diff --git a/src/lib.rs b/src/lib.rs index 850c20c..c9e2e36 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,7 @@ use gen::{ }, routerrpc::{router_client::RouterClient, SendPaymentRequest, TrackPaymentRequest}, }; +use opentelemetry::trace::{FutureExt, TraceContextExt}; use std::convert::TryInto; use tonic::{ codegen::{InterceptedService, StdError}, @@ -37,13 +38,13 @@ use tonic::{ transport::{Channel, Endpoint}, Response, Status, Streaming, }; -use tracing::Instrument; #[derive(Debug, Clone)] pub struct Lnd { lightning: LightningClient>, invoices: InvoicesClient>, router: RouterClient>, + tracer: std::sync::Arc, } #[derive(Debug, thiserror::Error)] @@ -54,6 +55,20 @@ pub enum LndConnectError { Transport(tonic::transport::Error), } +macro_rules! span { + ($tracer:expr => $package:literal. $service:literal / $method:literal) => { + opentelemetry::trace::SpanBuilder::from_name(concat!($package, ".", $service, "/", $method)) + .with_kind(opentelemetry::trace::SpanKind::Client) + .with_attributes([ + opentelemetry::KeyValue::new("service.name", "lnd"), + opentelemetry::KeyValue::new("rpc.system", "grpc"), + opentelemetry::KeyValue::new("rpc.service", concat!($package, ".", $service)), + opentelemetry::KeyValue::new("rpc.method", $method), + ]) + .start($tracer.as_ref()) + }; +} + impl Lnd { pub async fn connect(destination: D, cert_pem_bytes: &[u8]) -> Result where @@ -133,6 +148,7 @@ impl Lnd { } fn build(channel: Channel, interceptor: LndInterceptor) -> Self { + let tracer = std::sync::Arc::new(opentelemetry::global::tracer("lnd")); let lightning = LightningClient::with_interceptor(channel.clone(), interceptor.clone()); let invoices = InvoicesClient::with_interceptor(channel.clone(), interceptor.clone()); let router = RouterClient::with_interceptor(channel, interceptor); @@ -141,6 +157,7 @@ impl Lnd { lightning, invoices, router, + tracer, } } } @@ -173,17 +190,21 @@ impl Interceptor for LndInterceptor { impl Lnd { pub async fn get_info(&mut self) -> Result { + let span = span!(self.tracer => "lnrpc". "Lightning" / "GetInfo"); + self.lightning .get_info(GetInfoRequest {}) - .instrument(span!("lnrpc". "Lightning" / "GetInfo")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } pub async fn add_invoice(&mut self, invoice: Invoice) -> Result { + let span = span!(self.tracer => "lnrpc". "Lightning" / "AddInvoice"); + self.lightning .add_invoice(invoice) - .instrument(span!("lnrpc". "Lightning" / "AddInvoice")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } @@ -193,13 +214,15 @@ impl Lnd { hash: Vec, value: i64, ) -> Result { + let span = span!(self.tracer => "lnrpc". "Invoices" / "AddHoldInvoice"); + self.invoices .add_hold_invoice(AddHoldInvoiceRequest { hash, value, ..AddHoldInvoiceRequest::default() }) - .instrument(span!("lnrpc". "Invoices" / "AddHoldInvoice")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } @@ -208,25 +231,31 @@ impl Lnd { &mut self, payment_hash: Vec, ) -> Result { + let span = span!(self.tracer => "lnrpc". "Invoices" / "CancelInvoice"); + self.invoices .cancel_invoice(CancelInvoiceMsg { payment_hash }) - .instrument(span!("lnrpc". "Invoices" / "CancelInvoice")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } pub async fn settle_invoice(&mut self, preimage: Vec) -> Result { + let span = span!(self.tracer => "lnrpc". "Invoices" / "SettleInvoice"); + self.invoices .settle_invoice(SettleInvoiceMsg { preimage }) - .instrument(span!("lnrpc". "Invoices" / "SettleInvoice")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } pub async fn channel_balance(&mut self) -> Result { + let span = span!(self.tracer => "lnrpc". "Lightning" / "ChannelBalance"); + self.lightning .channel_balance(ChannelBalanceRequest {}) - .instrument(span!("lnrpc". "Lightning" / "ChannelBalance")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } @@ -238,6 +267,8 @@ impl Lnd { max_payments: u64, reversed: bool, ) -> Result { + let span = span!(self.tracer => "lnrpc". "Lightning" / "ListPayments"); + self.lightning .list_payments(ListPaymentsRequest { include_incomplete, @@ -246,7 +277,7 @@ impl Lnd { reversed, ..ListPaymentsRequest::default() }) - .instrument(span!("lnrpc". "Lightning" / "ListPayments")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } @@ -258,6 +289,8 @@ impl Lnd { num_max_invoices: u64, reversed: bool, ) -> Result { + let span = span!(self.tracer => "lnrpc". "Lightning" / "ListInvoices"); + self.lightning .list_invoices(ListInvoiceRequest { pending_only, @@ -266,7 +299,7 @@ impl Lnd { reversed, ..ListInvoiceRequest::default() }) - .instrument(span!("lnrpc". "Lightning" / "ListInvoices")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } @@ -275,17 +308,21 @@ impl Lnd { &mut self, send_request: SendRequest, ) -> Result { + let span = span!(self.tracer => "lnrpc". "Lightning" / "SendPaymentSync"); + self.lightning .send_payment_sync(send_request) - .instrument(span!("lnrpc". "Lightning" / "SendPaymentSync")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } pub async fn wallet_balance(&mut self) -> Result { + let span = span!(self.tracer => "lnrpc". "Lightning" / "WalletBalance"); + self.lightning .wallet_balance(WalletBalanceRequest::default()) - .instrument(span!("lnrpc". "Lightning" / "WalletBalance")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } @@ -294,9 +331,11 @@ impl Lnd { &mut self, req: ForwardingHistoryRequest, ) -> Result { + let span = span!(self.tracer => "lnrpc". "Lightning" / "ForwardingHistory"); + self.lightning .forwarding_history(req) - .instrument(span!("lnrpc". "Lightning" / "ForwardingHistory")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } @@ -305,9 +344,11 @@ impl Lnd { &mut self, req: ListChannelsRequest, ) -> Result { + let span = span!(self.tracer => "lnrpc". "Lightning" / "ListChannels"); + self.lightning .list_channels(req) - .instrument(span!("lnrpc". "Lightning" / "ListChannels")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } @@ -316,9 +357,11 @@ impl Lnd { &mut self, req: ClosedChannelsRequest, ) -> Result { + let span = span!(self.tracer => "lnrpc". "Lightning" / "ClosedChannels"); + self.lightning .closed_channels(req) - .instrument(span!("lnrpc". "Lightning" / "ClosedChannels")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } @@ -327,17 +370,21 @@ impl Lnd { &mut self, req: NewAddressRequest, ) -> Result { + let span = span!(self.tracer => "lnrpc". "Lightning" / "NewAddress"); + self.lightning .new_address(req) - .instrument(span!("lnrpc". "Lightning" / "NewAddress")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } pub async fn pending_channels(&mut self) -> Result { + let span = span!(self.tracer => "lnrpc". "Lightning" / "PendingChannels"); + self.lightning .pending_channels(PendingChannelsRequest {}) - .instrument(span!("lnrpc". "Lightning" / "PendingChannels")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } @@ -346,9 +393,11 @@ impl Lnd { &mut self, req: impl tonic::IntoStreamingRequest, ) -> Result, Status> { + let span = span!(self.tracer => "lnrpc". "Lightning" / "ChannelAcceptor"); + self.lightning .channel_acceptor(req) - .instrument(span!("lnrpc". "Lightning" / "ChannelAcceptor")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } @@ -357,17 +406,21 @@ impl Lnd { &mut self, req: CloseChannelRequest, ) -> Result, Status> { + let span = span!(self.tracer => "lnrpc". "Lighting" / "CloseChannel"); + self.lightning .close_channel(req) - .instrument(span!("lnrpc". "Lighting" / "CloseChannel")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } pub async fn send_coins(&mut self, req: SendCoinsRequest) -> Result { + let span = span!(self.tracer => "lnrpc". "Lightning" / "SendCoins"); + self.lightning .send_coins(req) - .instrument(span!("lnrpc". "Lightning" / "SendCoins")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } @@ -378,9 +431,11 @@ impl Lnd { &mut self, req: SendPaymentRequest, ) -> Result, Status> { + let span = span!(self.tracer => "lnrpc". "Router" / "SendPaymentV2"); + self.router .send_payment_v2(req) - .instrument(span!("lnrpc". "Router" / "SendPaymentV2")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } @@ -390,85 +445,78 @@ impl Lnd { payment_hash: Vec, no_inflight_updates: bool, ) -> Result, Status> { + let span = span!(self.tracer => "lnrpc". "Router" / "TrackPaymentV2"); + self.router .track_payment_v2(TrackPaymentRequest { no_inflight_updates, payment_hash, }) - .instrument(span!("lnrpc". "Router" / "TrackPaymentV2")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } } -#[async_trait::async_trait] pub trait LookupInvoice { - async fn lookup_invoice(&mut self, payment_hash: Vec) -> Result; + fn lookup_invoice( + &mut self, + payment_hash: Vec, + ) -> impl std::future::Future> + Send; } -#[async_trait::async_trait] impl LookupInvoice for Lnd { async fn lookup_invoice(&mut self, payment_hash: Vec) -> Result { + let span = span!(self.tracer => "lnrpc". "Invoices" / "LookupInvoiceV2"); + self.invoices .lookup_invoice_v2(LookupInvoiceMsg { lookup_modifier: LookupModifier::Default as i32, invoice_ref: Some(InvoiceRef::PaymentHash(payment_hash)), }) - .instrument(span!("lnrpc". "Invoices" / "LookupInvoiceV2")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } } -#[async_trait::async_trait] pub trait SubscribeSingleInvoice { - async fn subscribe_single_invoice( + fn subscribe_single_invoice( &mut self, r_hash: Vec, - ) -> Result, Status>; + ) -> impl std::future::Future, Status>> + Send; } -#[async_trait::async_trait] impl SubscribeSingleInvoice for Lnd { async fn subscribe_single_invoice( &mut self, r_hash: Vec, ) -> Result, Status> { + let span = span!(self.tracer => "lnrpc". "Invoices" / "SubscribeSingleInvoice"); + self.invoices .subscribe_single_invoice(SubscribeSingleInvoiceRequest { r_hash }) - .instrument(span!("lnrpc". "Invoices" / "SubscribeSingleInvoice")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } } -#[async_trait::async_trait] pub trait DecodePayReq { - async fn decode_pay_req(&mut self, pay_req: String) -> Result; + fn decode_pay_req( + &mut self, + pay_req: String, + ) -> impl std::future::Future> + Send; } -#[async_trait::async_trait] impl DecodePayReq for Lnd { async fn decode_pay_req(&mut self, pay_req: String) -> Result { + let span = span!(self.tracer => "lnrpc". "Lightning" / "DecodePayReq"); + self.lightning .decode_pay_req(PayReqString { pay_req }) - .instrument(span!("lnrpc". "Lightning" / "DecodePayReq")) + .with_context(opentelemetry::Context::current_with_span(span)) .await .map(Response::into_inner) } } - -#[macro_export] -macro_rules! span { - ($package:literal. $service:literal / $method:literal) => { - tracing::info_span!( - "lnd", - service.name = "lnd", - otel.name = concat!($package, ".", $service, "/", $method), - otel.kind = "client", - rpc.system = "grpc", - rpc.service = concat!($package, ".", $service), - rpc.method = $method, - ) - }; -}